using CredentialManager.Services;
using CredentialManager.Models;
using DataConnection.Interfaces;
using DataConnection.REST.Interfaces;
using DataConnection.REST.Models;
using DataConnection.CredentialManagement.Interfaces;
using DataConnection.CredentialManagement.Models;
using Microsoft.Extensions.Logging;
using System.Collections.Concurrent;
using System.Globalization;
using System.Security.Cryptography;
using System.Text;
using System.Text.Json;
using Data_Coupler.Models;
using Data_Coupler.Services;
namespace Data_Coupler.Services;
///
/// Servizio per l'esecuzione di profili Data Coupler schedulati con logica completa di trasferimento dati
/// Implementa la stessa logica avanzata di StartDataTransferWithComposite per le schedulazioni
///
public class ScheduledProfileExecutionService : IScheduledProfileExecutionService
{
private readonly IDataCouplerProfileService _profileService;
private readonly IDataConnectionFactory _connectionFactory;
private readonly ICredentialService _credentialService;
private readonly IDataConnectionCredentialService _dataConnectionCredentialService;
private readonly IKeyAssociationService _keyAssociationService;
private readonly ILogger _logger;
public ScheduledProfileExecutionService(
IDataCouplerProfileService profileService,
IDataConnectionFactory connectionFactory,
ICredentialService credentialService,
IDataConnectionCredentialService dataConnectionCredentialService,
IKeyAssociationService keyAssociationService,
ILogger logger)
{
_profileService = profileService;
_connectionFactory = connectionFactory;
_credentialService = credentialService;
_dataConnectionCredentialService = dataConnectionCredentialService;
_keyAssociationService = keyAssociationService;
_logger = logger;
}
///
/// Esegue un profilo Data Coupler specificato dall'ID
///
public async Task ExecuteProfileAsync(int profileId)
{
var startTime = DateTime.UtcNow;
var result = new ProfileExecutionResult
{
ExecutionTime = startTime // Ora locale per coerenza con l'interfaccia utente
};
try
{
_logger.LogInformation("Inizio esecuzione profilo schedulato ID: {ProfileId}", profileId);
// Carica il profilo
var profile = await _profileService.GetProfileByIdAsync(profileId);
if (profile == null)
{
result.Success = false;
result.Message = $"Profilo con ID {profileId} non trovato";
return result;
}
_logger.LogInformation("Esecuzione profilo: {ProfileName}", profile.Name);
// Aggiorna la data di ultimo utilizzo
await _profileService.UpdateLastUsedAsync(profile.Id);
// Esegue il trasferimento dati con la logica completa
var recordsTransferred = await ExecuteDataTransferAsync(profile);
result.Success = true;
result.RecordsProcessed = recordsTransferred;
result.Message = $"Trasferimento completato con successo. {recordsTransferred} record elaborati.";
_logger.LogInformation("Profilo {ProfileName} eseguito con successo. Record elaborati: {RecordCount}",
profile.Name, recordsTransferred);
}
catch (Exception ex)
{
result.Success = false;
result.Message = $"Errore durante l'esecuzione: {ex.Message}";
_logger.LogError(ex, "Errore durante l'esecuzione del profilo ID: {ProfileId}", profileId);
}
finally
{
result.Duration = DateTime.UtcNow - startTime;
}
return result;
}
///
/// Metodo principale per l'esecuzione del trasferimento dati
/// Implementa la stessa logica di StartDataTransferWithComposite
///
private async Task ExecuteDataTransferAsync(DataCouplerProfile profile)
{
_logger.LogInformation("=== INIZIO TRASFERIMENTO DATI SCHEDULATO ===");
_logger.LogInformation("Esecuzione profilo: {ProfileName} (ID: {ProfileId})", profile.Name, profile.Id);
try
{
// 1. Setup delle connessioni sorgente e destinazione
var (sourceManager, sourceCredential) = await SetupSourceConnectionAsync(profile);
var (restClient, restCredential, restEntity) = await SetupDestinationConnectionAsync(profile);
// 2. Verifica che le connessioni siano valide
if (sourceManager == null)
{
throw new InvalidOperationException("Impossibile stabilire connessione con la sorgente dati");
}
if (restClient == null || restEntity == null)
{
throw new InvalidOperationException("Impossibile stabilire connessione con la destinazione REST");
}
// 3. Ottieni i record dalla sorgente
var sourceRecords = await GetAllRecordsFromSourceAsync(profile, sourceManager);
if (!sourceRecords.Any())
{
_logger.LogWarning("Nessun record trovato nella sorgente per il profilo: {ProfileName}", profile.Name);
return 0;
}
_logger.LogInformation("Ottenuti {RecordCount} record dalla sorgente", sourceRecords.Count());
// 4. Parse field mappings
var fieldMappings = ParseFieldMappings(profile.FieldMappingJson);
if (!fieldMappings.Any())
{
throw new InvalidOperationException("Nessun mapping dei campi configurato per il profilo");
}
// 5. Determina se utilizzare Salesforce Composite API
bool useSalesforceComposite = restClient is DataConnection.REST.Implementations.SalesforceServiceClient;
if (useSalesforceComposite)
{
_logger.LogInformation("Utilizzo Salesforce Composite API per il trasferimento");
return await ExecuteDataTransferWithCompositeAsync(profile, sourceRecords, restClient, restEntity, restCredential!, fieldMappings);
}
else
{
_logger.LogInformation("Utilizzo metodo trasferimento standard per il trasferimento");
return await ExecuteDataTransferStandardAsync(profile, sourceRecords, restClient, restEntity, restCredential!, fieldMappings);
}
}
catch (Exception ex)
{
_logger.LogError(ex, "Errore durante il trasferimento dati per il profilo {ProfileName}", profile.Name);
throw;
}
finally
{
_logger.LogInformation("=== FINE TRASFERIMENTO DATI SCHEDULATO ===");
}
}
///
/// Setup della connessione sorgente (database o file)
///
private async Task<(IDatabaseManager? manager, DatabaseCredential? credential)> SetupSourceConnectionAsync(DataCouplerProfile profile)
{
if (profile.SourceType.ToLower() != "database")
{
return (null, null); // Per i file gestiremo diversamente
}
if (!profile.SourceCredentialId.HasValue)
{
throw new InvalidOperationException("Credenziale sorgente non specificata per il database");
}
var sourceCredential = await _dataConnectionCredentialService.GetDatabaseCredentialAsync(profile.SourceCredentialId.Value);
if (sourceCredential == null)
{
throw new InvalidOperationException($"Credenziale database sorgente con ID {profile.SourceCredentialId.Value} non trovata");
}
// Applica override database se specificato (per le schedulazioni)
if (!string.IsNullOrEmpty(profile.SourceDatabaseName))
{
sourceCredential.DatabaseName = profile.SourceDatabaseName;
}
var databaseManager = await _connectionFactory.CreateDatabaseManagerAsync(sourceCredential.Name);
// Test connessione
var canConnect = await databaseManager.TestConnectionAsync();
if (!canConnect)
{
throw new InvalidOperationException($"Impossibile connettersi al database sorgente: {sourceCredential.Name}");
}
_logger.LogInformation("Connessione database sorgente stabilita: {CredentialName}", sourceCredential.Name);
return (databaseManager, sourceCredential);
}
///
/// Setup della connessione destinazione REST
///
private async Task<(IRestServiceClient? client, RestApiCredential? credential, RestEntitySummary? entity)> SetupDestinationConnectionAsync(DataCouplerProfile profile)
{
if (profile.DestinationType.ToLower() != "rest")
{
throw new NotSupportedException("Solo destinazioni REST sono supportate per le schedulazioni");
}
if (!profile.DestinationCredentialId.HasValue)
{
throw new InvalidOperationException("Credenziale destinazione REST non specificata");
}
var restCredential = await _dataConnectionCredentialService.GetRestApiCredentialAsync(profile.DestinationCredentialId.Value);
if (restCredential == null)
{
throw new InvalidOperationException($"Credenziale REST con ID {profile.DestinationCredentialId.Value} non trovata");
}
var restClient = await _connectionFactory.CreateRestServiceClientAsync(restCredential.Name);
// Autenticazione
var authResult = await restClient.AuthenticateAsync();
if (!authResult)
{
throw new InvalidOperationException($"Autenticazione fallita per il servizio REST: {restCredential.Name}");
}
// Ottieni l'entità REST (per le schedulazioni usiamo solo il nome)
RestEntitySummary? restEntity = null;
if (!string.IsNullOrEmpty(profile.DestinationEndpoint))
{
// Per le schedulazioni, creiamo un'entità fittizia con il nome dal profilo
restEntity = new RestEntitySummary
{
Name = profile.DestinationEndpoint,
Label = profile.DestinationEndpoint
};
}
if (restEntity == null)
{
throw new InvalidOperationException($"Entità REST non specificata nel profilo");
}
_logger.LogInformation("Connessione REST destinazione stabilita: {CredentialName}, Entità: {EntityName}",
restCredential.Name, restEntity.Name);
return (restClient, restCredential, restEntity);
}
///
/// Ottiene tutti i record dalla sorgente
///
private async Task>> GetAllRecordsFromSourceAsync(DataCouplerProfile profile, IDatabaseManager? databaseManager)
{
if (profile.SourceType.ToLower() == "database")
{
return await GetAllRecordsFromDatabaseAsync(profile, databaseManager!);
}
else if (profile.SourceType.ToLower() == "file")
{
return await GetAllRecordsFromFileAsync(profile);
}
throw new NotSupportedException($"Tipo sorgente non supportato: {profile.SourceType}");
}
///
/// Parse del JSON dei field mappings
///
private Dictionary ParseFieldMappings(string? fieldMappingJson)
{
var mappings = new Dictionary();
if (string.IsNullOrEmpty(fieldMappingJson))
{
_logger.LogWarning("Field mapping JSON è vuoto o null");
return mappings;
}
_logger.LogDebug("Parsing field mappings JSON: {FieldMappingJson}", fieldMappingJson);
try
{
// Usa le stesse opzioni di serializzazione del DataCouplerProfileService
var options = new JsonSerializerOptions
{
PropertyNamingPolicy = JsonNamingPolicy.CamelCase
};
var fieldMappingsList = JsonSerializer.Deserialize>(fieldMappingJson, options);
if (fieldMappingsList != null)
{
_logger.LogInformation("Trovati {Count} field mappings nel JSON", fieldMappingsList.Count);
foreach (var mapping in fieldMappingsList)
{
if (!string.IsNullOrEmpty(mapping.SourceField) && !string.IsNullOrEmpty(mapping.DestinationField))
{
mappings[mapping.SourceField] = mapping.DestinationField;
_logger.LogDebug("Mapping aggiunto: {SourceField} -> {DestinationField}",
mapping.SourceField, mapping.DestinationField);
}
else
{
_logger.LogWarning("Mapping incompleto ignorato: SourceField='{SourceField}', DestinationField='{DestinationField}'",
mapping.SourceField, mapping.DestinationField);
}
}
}
else
{
_logger.LogWarning("Deserializzazione ritornato null per field mappings JSON");
}
}
catch (Exception ex)
{
_logger.LogError(ex, "Errore nel parsing dei field mappings: {FieldMappingJson}", fieldMappingJson);
}
_logger.LogInformation("Mappings completati: {Count} associazioni", mappings.Count);
return mappings;
}
///
/// Ottiene tutti i record dal database
///
private async Task>> GetAllRecordsFromDatabaseAsync(DataCouplerProfile profile, IDatabaseManager databaseManager)
{
string query;
if (!string.IsNullOrEmpty(profile.SourceCustomQuery))
{
query = profile.SourceCustomQuery;
}
else if (!string.IsNullOrEmpty(profile.SourceTable))
{
var tableName = !string.IsNullOrEmpty(profile.SourceSchema)
? $"[{profile.SourceSchema}].[{profile.SourceTable}]"
: $"[{profile.SourceTable}]";
query = $"SELECT * FROM {tableName}";
}
else
{
throw new InvalidOperationException("Né query custom né tabella specificata per la sorgente database");
}
_logger.LogDebug("Esecuzione query sorgente: {Query}", query);
var results = await databaseManager.ExecuteRawQueryAsync(query, profile.SourceDatabaseName ?? "");
return results;
}
///
/// Ottiene tutti i record da file (implementazione future)
///
private async Task>> GetAllRecordsFromFileAsync(DataCouplerProfile profile)
{
if (string.IsNullOrEmpty(profile.SourceFilePath))
throw new InvalidOperationException("Percorso file sorgente non specificato");
// TODO: Implementazione per file Excel/CSV per le schedulazioni
// Per ora restituiamo una lista vuota
_logger.LogWarning("Lettura file non ancora implementata per le schedulazioni. File: {FilePath}", profile.SourceFilePath);
await Task.Delay(1); // Placeholder async
return new List>();
}
///
/// Metodo principale per trasferimento dati standard (non Salesforce Composite)
///
private async Task ExecuteDataTransferStandardAsync(
DataCouplerProfile profile,
IEnumerable> sourceRecords,
IRestServiceClient restClient,
RestEntitySummary restEntity,
RestApiCredential restCredential,
Dictionary fieldMappings)
{
_logger.LogInformation("Iniziando trasferimento dati standard per {RecordCount} record", sourceRecords.Count());
int successCount = 0;
int errorCount = 0;
int recordNumber = 1;
foreach (var record in sourceRecords)
{
try
{
// 1. Trasforma il record utilizzando i field mappings
var restData = TransformRecordForRest(record, fieldMappings);
// 2. Gestione associazioni record se abilitata
string? entityId = null;
if (profile.UseRecordAssociations && !string.IsNullOrEmpty(profile.SourceKeyField))
{
entityId = await HandleRecordAssociation(profile, record, restClient, restEntity, restCredential, restData, fieldMappings);
}
// 3. Se non abbiamo un ID esistente, crea un nuovo record
if (string.IsNullOrEmpty(entityId))
{
var result = await restClient.CreateEntityAsync(restEntity.Name, restData);
if (result != null)
{
// Estrai l'ID dal risultato (diversi sistemi REST usano campi diversi)
entityId = result.ContainsKey("id") ? result["id"]?.ToString() :
result.ContainsKey("Id") ? result["Id"]?.ToString() :
result.ContainsKey("DocEntry") ? result["DocEntry"]?.ToString() : null;
successCount++;
_logger.LogDebug("Record {RecordNumber} creato con successo. ID: {EntityId}", recordNumber, entityId);
}
else
{
errorCount++;
_logger.LogWarning("Errore nella creazione del record {RecordNumber}: risultato null", recordNumber);
}
}
else
{
successCount++; // Record aggiornato tramite associazione
}
// 4. Salva associazione se abilitata e abbiamo un ID
if (profile.UseRecordAssociations && !string.IsNullOrEmpty(profile.SourceKeyField) && !string.IsNullOrEmpty(entityId))
{
await SaveRecordAssociation(profile, record, restEntity, restCredential, restData, entityId, fieldMappings);
}
}
catch (Exception ex)
{
errorCount++;
_logger.LogError(ex, "Errore nel trasferimento del record {RecordNumber}", recordNumber);
}
recordNumber++;
}
_logger.LogInformation("Trasferimento completato. Successi: {SuccessCount}, Errori: {ErrorCount}",
successCount, errorCount);
return successCount;
}
///
/// Implementazione completa del trasferimento con Salesforce Composite API
/// Equivalente a StartDataTransferWithComposite del DataCoupler.razor.cs
///
private async Task ExecuteDataTransferWithCompositeAsync(
DataCouplerProfile profile,
IEnumerable> sourceRecords,
IRestServiceClient restClient,
RestEntitySummary restEntity,
RestApiCredential restCredential,
Dictionary fieldMappings)
{
_logger.LogInformation("Iniziando trasferimento dati COMPOSITE per {RecordCount} record", sourceRecords.Count());
// Verifica che sia effettivamente un SalesforceServiceClient
if (!(restClient is DataConnection.REST.Implementations.SalesforceServiceClient salesforceClient))
{
_logger.LogWarning("Client REST non è SalesforceServiceClient, fallback al metodo standard");
return await ExecuteDataTransferStandardAsync(profile, sourceRecords, restClient, restEntity, restCredential, fieldMappings);
}
try
{
// 1. Trasforma i record e analizza le associazioni IN PARALLELO
var recordsForCreate = new ConcurrentBag<(Dictionary transformedData, Dictionary originalRecord, int recordNumber)>();
var recordsForUpdate = new ConcurrentBag<(Dictionary transformedData, string entityId, Dictionary originalRecord, int recordNumber, string newDataHash)>();
var recordsSkipped = new ConcurrentBag<(Dictionary originalRecord, int recordNumber, string reason)>();
var recordErrors = new ConcurrentBag<(Dictionary originalRecord, int recordNumber, string error)>();
// Cattura i valori condivisi per evitare race conditions
var currentEntityName = restEntity.Name;
var currentCredentialName = restCredential.Name; // Usa il nome della credenziale, non l'ID
var currentUseRecordAssociations = profile.UseRecordAssociations;
// Crea lista indicizzata per mantenere il record number
var indexedRecords = sourceRecords.Select((record, index) => new { Record = record, RecordNumber = index + 1 }).ToList();
_logger.LogInformation("COMPOSITE SCHEDULED: Inizio analisi parallela di {RecordCount} record", indexedRecords.Count);
var analysisStartTime = DateTime.UtcNow;
// Processa tutti i record in parallelo
var processingTasks = indexedRecords.Select(async indexedRecord =>
{
try
{
var record = indexedRecord.Record;
var recordNumber = indexedRecord.RecordNumber;
// Trasforma il record in base ai mapping (operazione locale, thread-safe)
var restData = TransformRecordForRest(record, fieldMappings);
// Genera la chiave sorgente e l'hash dei dati per questo record (include MAPPING_SIGNATURE)
var sourceKey = GenerateSourceKey(record, profile.SourceKeyField);
var currentDataHash = GenerateDataHash(restData, fieldMappings);
// Analizza le associazioni per capire se aggiornare, creare o saltare
if (currentUseRecordAssociations && !string.IsNullOrEmpty(sourceKey))
{
_logger.LogDebug("COMPOSITE SCHEDULED: Cerco associazione per KeyValue: '{KeyValue}', Entity: '{Entity}', Credential: '{Credential}'",
sourceKey, currentEntityName, currentCredentialName);
// Cerca associazione esistente usando il metodo parallelo
var existingAssociation = await _dataConnectionCredentialService.FindKeyAssociationByValueParallelAsync(
sourceKey, currentEntityName, currentCredentialName);
// FALLBACK: Se non troviamo l'associazione con tutti i parametri, proviamo solo con il KeyValue
if (existingAssociation == null)
{
existingAssociation = await _dataConnectionCredentialService.FindKeyAssociationByValueParallelAsync(sourceKey);
if (existingAssociation != null)
{
// Verifica compatibilità
if (existingAssociation.DestinationEntity != currentEntityName ||
existingAssociation.RestCredentialName != currentCredentialName)
{
existingAssociation = null;
}
}
}
if (existingAssociation != null && existingAssociation.IsActive)
{
// CONTROLLO HASH: Verifica se i dati sono cambiati
var existingHash = existingAssociation.Data_Hash;
if (!string.IsNullOrEmpty(existingHash) && existingHash.Equals(currentDataHash, StringComparison.OrdinalIgnoreCase))
{
// I dati non sono cambiati, salta questo record
recordsSkipped.Add((record, recordNumber, "Dati non modificati (hash identico)"));
_logger.LogDebug("COMPOSITE SCHEDULED: Record {RecordNumber} saltato - hash identico: {Hash}",
recordNumber, currentDataHash);
}
else
{
// I dati sono cambiati o l'hash è vuoto, procedi con l'aggiornamento
recordsForUpdate.Add((restData, existingAssociation.DestinationId, record, recordNumber, currentDataHash));
_logger.LogDebug("COMPOSITE SCHEDULED: Record {RecordNumber} marcato per aggiornamento (EntityId: {EntityId}) - hash diverso: old={OldHash}, new={NewHash}",
recordNumber, existingAssociation.DestinationId, existingHash ?? "NULL", currentDataHash);
}
}
else
{
// Record da creare (nessuna associazione esistente)
recordsForCreate.Add((restData, record, recordNumber));
_logger.LogDebug("COMPOSITE SCHEDULED: Record {RecordNumber} marcato per creazione", recordNumber);
}
}
else
{
// Record da creare (no associazioni)
recordsForCreate.Add((restData, record, recordNumber));
_logger.LogDebug("COMPOSITE SCHEDULED: Record {RecordNumber} marcato per creazione (no associazioni)", recordNumber);
}
}
catch (Exception ex)
{
_logger.LogError(ex, "COMPOSITE SCHEDULED: Errore nella trasformazione del record {RecordNumber}", indexedRecord.RecordNumber);
recordErrors.Add((indexedRecord.Record, indexedRecord.RecordNumber, ex.Message));
}
});
// Attendi il completamento di tutte le operazioni parallele
await Task.WhenAll(processingTasks);
var analysisEndTime = DateTime.UtcNow;
var analysisElapsed = (analysisEndTime - analysisStartTime).TotalMilliseconds;
// Converti i ConcurrentBag in liste per il resto del processing
var finalRecordsForCreate = recordsForCreate.ToList();
var finalRecordsForUpdate = recordsForUpdate.ToList();
var finalRecordsSkipped = recordsSkipped.ToList();
var finalRecordErrors = recordErrors.ToList();
_logger.LogInformation("COMPOSITE SCHEDULED: Analisi parallela completata in {ElapsedMs}ms - {CreateCount} record da creare, {UpdateCount} record da aggiornare, {SkippedCount} record saltati, {ErrorCount} errori",
analysisElapsed, finalRecordsForCreate.Count, finalRecordsForUpdate.Count, finalRecordsSkipped.Count, finalRecordErrors.Count);
// 2. Esegui le chiamate composite in parallelo
var createTask = Task.FromResult(new List());
var updateTask = Task.FromResult(new List());
if (finalRecordsForCreate.Any())
{
var createData = finalRecordsForCreate.Select(r => r.transformedData).ToList();
createTask = salesforceClient.BatchCreateEntitiesAsync(restEntity.Name, createData);
}
if (finalRecordsForUpdate.Any())
{
var updateData = finalRecordsForUpdate.ToDictionary(
r => r.entityId,
r => r.transformedData);
updateTask = salesforceClient.BatchUpdateEntitiesAsync(restEntity.Name, updateData);
}
// Attendi entrambe le operazioni
await Task.WhenAll(createTask, updateTask);
var createResults = await createTask;
var updateResults = await updateTask;
// 3. Processa i risultati delle creazioni
int successCount = 0;
int errorCount = finalRecordErrors.Count;
int updatedCount = 0;
// Lista per raccogliere le task di creazione associazioni
var createAssociationTasks = new List();
for (int i = 0; i < createResults.Count; i++)
{
var result = createResults[i];
var originalData = finalRecordsForCreate[i];
if (result.Success)
{
successCount++;
_logger.LogDebug("COMPOSITE SCHEDULED: Record {RecordNumber} creato con successo. EntityId: {EntityId}",
originalData.recordNumber, result.EntityId);
// Aggiungi task di creazione associazione alla lista (esecuzione parallela)
if (currentUseRecordAssociations && !string.IsNullOrEmpty(result.EntityId))
{
var dataHashForAssociation = GenerateDataHash(originalData.transformedData, fieldMappings);
var associationTask = CreateAssociationAsync(profile, originalData.originalRecord, restCredential, result.EntityId, originalData.recordNumber, dataHashForAssociation);
createAssociationTasks.Add(associationTask);
}
}
else
{
errorCount++;
_logger.LogError("COMPOSITE SCHEDULED: Errore creazione record {RecordNumber}: {Error}",
originalData.recordNumber, result.ErrorMessage);
}
}
// 4. Processa i risultati degli aggiornamenti
var updateAssociationTasks = new List();
for (int i = 0; i < updateResults.Count; i++)
{
var result = updateResults[i];
var originalData = finalRecordsForUpdate[i];
if (result.Success)
{
updatedCount++;
_logger.LogDebug("COMPOSITE SCHEDULED: Record {RecordNumber} aggiornato con successo. EntityId: {EntityId}",
originalData.recordNumber, result.EntityId);
// Aggiungi task di aggiornamento associazione alla lista (esecuzione parallela)
if (currentUseRecordAssociations && !string.IsNullOrEmpty(result.EntityId))
{
var updateHashTask = UpdateAssociationHashAsync(profile, originalData.originalRecord, restCredential, result.EntityId, originalData.newDataHash);
updateAssociationTasks.Add(updateHashTask);
}
}
else
{
errorCount++;
_logger.LogError("COMPOSITE SCHEDULED: Errore aggiornamento record {RecordNumber}: {Error}",
originalData.recordNumber, result.ErrorMessage);
}
}
// 5. Esegui tutte le operazioni di associazione in parallelo
var allAssociationTasks = createAssociationTasks.Concat(updateAssociationTasks).ToList();
if (allAssociationTasks.Any())
{
_logger.LogInformation("COMPOSITE SCHEDULED: Avvio di {TaskCount} operazioni di associazione in parallelo ({CreateCount} creazioni, {UpdateCount} aggiornamenti)",
allAssociationTasks.Count, createAssociationTasks.Count, updateAssociationTasks.Count);
var startTime = DateTime.UtcNow;
await Task.WhenAll(allAssociationTasks);
var endTime = DateTime.UtcNow;
_logger.LogInformation("COMPOSITE SCHEDULED: Operazioni di associazione completate in {ElapsedMs}ms",
(endTime - startTime).TotalMilliseconds);
}
var skippedCount = finalRecordsSkipped.Count;
var totalProcessed = successCount + updatedCount;
_logger.LogInformation("COMPOSITE SCHEDULED: Trasferimento completato. Creazioni: {SuccessCount}, Aggiornamenti: {UpdatedCount}, Saltati: {SkippedCount}, Errori: {ErrorCount}",
successCount, updatedCount, skippedCount, errorCount);
return totalProcessed;
}
catch (Exception ex)
{
_logger.LogError(ex, "COMPOSITE SCHEDULED: Errore generale nel trasferimento dati");
throw;
}
}
#region Helper Methods
///
/// Trasforma un record sorgente in formato REST utilizzando i field mappings
///
private Dictionary TransformRecordForRest(Dictionary sourceRecord, Dictionary fieldMappings)
{
var restData = new Dictionary();
foreach (var mapping in fieldMappings)
{
if (sourceRecord.ContainsKey(mapping.Key))
{
var value = sourceRecord[mapping.Key];
// Trasforma il valore se necessario
var transformedValue = TransformValueForRest(value);
if (transformedValue != null)
{
restData[mapping.Value] = transformedValue;
}
}
}
return restData;
}
///
/// Trasforma un valore per il formato REST
///
private object? TransformValueForRest(object? value)
{
if (value == null || value == DBNull.Value)
return null;
// Trasformazioni specifiche per tipo
if (value is DateTime dateTime)
{
return dateTime.ToString("yyyy-MM-ddTHH:mm:ss.fffZ", CultureInfo.InvariantCulture);
}
if (value is decimal dec)
{
return dec.ToString(CultureInfo.InvariantCulture);
}
if (value is double dbl)
{
return dbl.ToString(CultureInfo.InvariantCulture);
}
if (value is float flt)
{
return flt.ToString(CultureInfo.InvariantCulture);
}
return value;
}
///
/// Gestisce l'associazione dei record per evitare duplicati
///
private async Task HandleRecordAssociation(
DataCouplerProfile profile,
Dictionary sourceRecord,
IRestServiceClient restClient,
RestEntitySummary restEntity,
RestApiCredential restCredential,
Dictionary restData,
Dictionary fieldMappings)
{
if (string.IsNullOrEmpty(profile.SourceKeyField) || !sourceRecord.ContainsKey(profile.SourceKeyField))
return null;
var sourceKey = sourceRecord[profile.SourceKeyField]?.ToString();
if (string.IsNullOrEmpty(sourceKey))
return null;
try
{
// Cerca associazione esistente usando il metodo parallelo
var existingAssociation = await _dataConnectionCredentialService.FindKeyAssociationByValueParallelAsync(
sourceKey, restEntity.Name, restCredential.Name);
if (existingAssociation != null && existingAssociation.IsActive)
{
// Genera l'hash dei dati correnti (solo dati trasformati/mappati, include MAPPING_SIGNATURE)
var currentDataHash = GenerateDataHash(restData, fieldMappings);
var existingHash = existingAssociation.Data_Hash;
// Verifica se i dati sono cambiati
if (!string.IsNullOrEmpty(existingHash) && existingHash.Equals(currentDataHash, StringComparison.OrdinalIgnoreCase))
{
// I dati non sono cambiati, salta l'aggiornamento ma aggiorna LastVerifiedAt
existingAssociation.LastVerifiedAt = DateTime.Now;
await _dataConnectionCredentialService.UpdateKeyAssociationAsync(existingAssociation);
_logger.LogDebug("Record non modificato, aggiornamento saltato. KeyValue: {KeyValue}, EntityId: {EntityId}, Hash: {Hash}",
sourceKey, existingAssociation.DestinationId, currentDataHash);
return existingAssociation.DestinationId;
}
// I dati sono cambiati, procedi con l'aggiornamento
var updateResult = await restClient.UpdateEntityAsync(restEntity.Name, existingAssociation.DestinationId, restData);
if (updateResult != null)
{
// Aggiorna l'hash e LastVerifiedAt nell'associazione
existingAssociation.Data_Hash = currentDataHash;
existingAssociation.UpdatedAt = DateTime.Now;
existingAssociation.LastVerifiedAt = DateTime.Now;
await _dataConnectionCredentialService.UpdateKeyAssociationAsync(existingAssociation);
_logger.LogDebug("Record aggiornato tramite associazione. KeyValue: {KeyValue}, EntityId: {EntityId}, NewHash: {Hash}",
sourceKey, existingAssociation.DestinationId, currentDataHash);
return existingAssociation.DestinationId;
}
else
{
_logger.LogWarning("Fallimento aggiornamento record associato. KeyValue: {KeyValue}, EntityId: {EntityId}",
sourceKey, existingAssociation.DestinationId);
}
}
}
catch (Exception ex)
{
_logger.LogError(ex, "Errore nella gestione dell'associazione per KeyValue: {KeyValue}", sourceKey);
}
return null;
}
///
/// Salva l'associazione tra record sorgente e destinazione
///
private async Task SaveRecordAssociation(
DataCouplerProfile profile,
Dictionary sourceRecord,
RestEntitySummary restEntity,
RestApiCredential restCredential,
Dictionary restData,
string entityId,
Dictionary fieldMappings)
{
if (string.IsNullOrEmpty(profile.SourceKeyField) || !sourceRecord.ContainsKey(profile.SourceKeyField))
return;
var sourceKey = sourceRecord[profile.SourceKeyField]?.ToString();
if (string.IsNullOrEmpty(sourceKey))
return;
try
{
// Genera l'hash dei dati per il controllo delle modifiche (solo dati trasformati/mappati, include MAPPING_SIGNATURE)
var dataHash = GenerateDataHash(restData, fieldMappings);
var association = new KeyAssociation
{
KeyValue = sourceKey,
SourceKeyField = profile.SourceKeyField,
DestinationKeyField = "Id", // Assumiamo "Id" come campo di default
DestinationId = entityId,
DestinationEntity = restEntity.Name,
RestCredentialName = restCredential.Name, // Usa il nome della credenziale
IsActive = true,
Data_Hash = dataHash,
CreatedAt = DateTime.Now,
UpdatedAt = DateTime.Now,
LastVerifiedAt = DateTime.Now,
AdditionalInfo = JsonSerializer.Serialize(new
{
TransferDate = DateTime.Now,
SourceType = profile.SourceType,
DestinationType = profile.DestinationType,
ProfileName = profile.Name,
ScheduledTransfer = true,
StandardTransfer = true,
DataHashGenerated = true
})
};
await _dataConnectionCredentialService.SaveKeyAssociationParallelAsync(association);
_logger.LogDebug("Associazione salvata. KeyValue: {KeyValue}, EntityId: {EntityId}, Hash: {Hash}",
sourceKey, entityId, dataHash);
}
catch (Exception ex)
{
_logger.LogError(ex, "Errore nel salvaggio dell'associazione per KeyValue: {KeyValue}", sourceKey);
}
}
///
/// Genera la chiave sorgente per un record
///
private string GenerateSourceKey(Dictionary record, string? sourceKeyField)
{
if (string.IsNullOrEmpty(sourceKeyField) || !record.ContainsKey(sourceKeyField))
return string.Empty;
var keyValue = record[sourceKeyField];
return keyValue?.ToString() ?? string.Empty;
}
///
/// Genera un hash SHA256 dei dati del record passato come parametro.
/// Utilizzato per rilevare cambiamenti nei dati e ottimizzare il trasferimento.
/// Calcola l'hash SOLO sui campi presenti nel record, in ordine alfabetico.
/// DEVE essere identico al metodo in DataCoupler.razor.cs per garantire consistenza.
///
private string GenerateDataHash(Dictionary record, Dictionary? fieldMappings = null)
{
try
{
var valuesForHash = new List();
// Ordina le chiavi alfabeticamente per garantire consistenza
var orderedKeys = record.Keys.OrderBy(k => k).ToList();
// Aggiungi i valori dei dati per ogni campo presente nel record
foreach (var key in orderedKeys)
{
var value = record[key];
var normalizedValue = value?.ToString()?.Trim() ?? "";
valuesForHash.Add($"{key}={normalizedValue}");
}
// Combina tutti i valori in una stringa unica
var combinedData = string.Join("|", valuesForHash);
_logger.LogDebug("Hash dei dati generato da: {CombinedData}", combinedData);
// Calcola l'hash SHA256 (stesso algoritmo di DataCoupler.razor.cs)
using (var sha256 = System.Security.Cryptography.SHA256.Create())
{
var hashBytes = sha256.ComputeHash(Encoding.UTF8.GetBytes(combinedData));
var hashString = Convert.ToHexString(hashBytes);
_logger.LogDebug("Hash SHA256 generato: {Hash} per {FieldCount} campi", hashString, orderedKeys.Count);
return hashString;
}
}
catch (Exception ex)
{
_logger.LogWarning(ex, "Errore nella generazione dell'hash per il record");
return string.Empty;
}
}
///
/// Crea una nuova associazione record in modo asincrono
///
private async Task CreateAssociationAsync(DataCouplerProfile profile, Dictionary originalRecord, RestApiCredential restCredential, string entityId, int recordNumber, string dataHash)
{
try
{
if (string.IsNullOrEmpty(profile.SourceKeyField) || !originalRecord.ContainsKey(profile.SourceKeyField))
return;
var sourceKey = originalRecord[profile.SourceKeyField]?.ToString();
if (string.IsNullOrEmpty(sourceKey))
return;
// Calcola il MappingCount in modo sicuro e trova il campo destinazione mappato al campo chiave sorgente
int mappingCount = 0;
string? mappedDestinationField = null;
try
{
if (!string.IsNullOrEmpty(profile.FieldMappingJson))
{
var mappings = ParseFieldMappings(profile.FieldMappingJson);
mappingCount = mappings?.Count ?? 0;
// Cerca il campo destinazione mappato al campo chiave sorgente
if (mappings != null && !string.IsNullOrEmpty(profile.SourceKeyField))
{
if (mappings.TryGetValue(profile.SourceKeyField, out var destinationFieldName))
{
mappedDestinationField = destinationFieldName;
_logger.LogDebug("SCHEDULED MAPPING: Campo sorgente '{SourceField}' è mappato al campo destinazione '{DestField}'",
profile.SourceKeyField, mappedDestinationField);
}
else
{
_logger.LogWarning("SCHEDULED MAPPING: Campo chiave sorgente '{SourceKeyField}' NON trovato nei mappings del profilo {ProfileName}",
profile.SourceKeyField, profile.Name);
}
}
}
}
catch (Exception ex)
{
_logger.LogWarning(ex, "Errore nel calcolo del MappingCount per l'associazione del record {RecordNumber}", recordNumber);
}
var association = new KeyAssociation
{
KeyValue = sourceKey,
SourceKeyField = profile.SourceKeyField ?? "",
DestinationKeyField = "Id", // Campo ID standard per REST
MappedDestinationField = mappedDestinationField, // Campo destinazione mappato al campo chiave sorgente
DestinationEntity = profile.DestinationEndpoint ?? "",
DestinationId = entityId,
RestCredentialName = restCredential.Name, // Usa il nome della credenziale
IsActive = true,
Data_Hash = dataHash,
CreatedAt = DateTime.Now,
UpdatedAt = DateTime.Now,
LastVerifiedAt = DateTime.Now,
AdditionalInfo = JsonSerializer.Serialize(new
{
TransferDate = DateTime.Now,
RecordNumber = recordNumber,
MappingCount = mappingCount,
SourceType = profile.SourceType,
DestinationType = profile.DestinationType,
ProfileName = profile.Name,
ScheduledTransfer = true,
CompositeTransfer = true,
DataHashGenerated = true
})
};
var associationId = await _dataConnectionCredentialService.SaveKeyAssociationParallelAsync(association);
_logger.LogDebug("COMPOSITE SCHEDULED: Associazione creata con ID: {AssociationId} per record {RecordNumber} - Key: {SourceKey}, EntityId: {EntityId}, Hash: {Hash}, MappedField: {MappedField}",
associationId, recordNumber, sourceKey, entityId, dataHash, mappedDestinationField ?? "N/A");
}
catch (Exception ex)
{
_logger.LogError(ex, "COMPOSITE SCHEDULED: Errore nella creazione dell'associazione per il record {RecordNumber}", recordNumber);
}
}
///
/// Aggiorna l'hash di un'associazione esistente
///
private async Task UpdateAssociationHashAsync(DataCouplerProfile profile, Dictionary originalRecord, RestApiCredential restCredential, string entityId, string newDataHash)
{
try
{
if (string.IsNullOrEmpty(profile.SourceKeyField) || !originalRecord.ContainsKey(profile.SourceKeyField))
return;
var sourceKey = originalRecord[profile.SourceKeyField]?.ToString();
if (string.IsNullOrEmpty(sourceKey))
return;
// Trova l'associazione esistente usando il metodo parallelo
var existingAssociation = await _dataConnectionCredentialService.FindKeyAssociationByValueParallelAsync(
sourceKey, profile.DestinationEndpoint ?? "", restCredential.Name);
if (existingAssociation != null)
{
existingAssociation.Data_Hash = newDataHash;
existingAssociation.UpdatedAt = DateTime.Now;
existingAssociation.LastVerifiedAt = DateTime.Now;
// Usa UpdateKeyAssociationAsync per l'aggiornamento invece di SaveKeyAssociationParallelAsync
await _dataConnectionCredentialService.UpdateKeyAssociationAsync(existingAssociation);
_logger.LogDebug("COMPOSITE SCHEDULED: Hash associazione aggiornato per entityId {EntityId} - Key: {SourceKey}, NewHash: {Hash}",
entityId, sourceKey, newDataHash);
}
else
{
_logger.LogWarning("COMPOSITE SCHEDULED: Associazione non trovata per aggiornamento hash - EntityId: {EntityId}, SourceKey: {SourceKey}",
entityId, sourceKey);
}
}
catch (Exception ex)
{
_logger.LogError(ex, "COMPOSITE SCHEDULED: Errore nell'aggiornamento dell'hash per EntityId: {EntityId}", entityId);
}
}
#endregion
}