using CredentialManager.Services; using CredentialManager.Models; using DataConnection.Interfaces; using DataConnection.REST.Interfaces; using DataConnection.REST.Models; using DataConnection.CredentialManagement.Interfaces; using DataConnection.CredentialManagement.Models; using Microsoft.Extensions.Logging; using System.Collections.Concurrent; using System.Globalization; using System.Security.Cryptography; using System.Text; using System.Text.Json; using Data_Coupler.Models; using Data_Coupler.Services; namespace Data_Coupler.Services; /// /// Servizio per l'esecuzione di profili Data Coupler schedulati con logica completa di trasferimento dati /// Implementa la stessa logica avanzata di StartDataTransferWithComposite per le schedulazioni /// public class ScheduledProfileExecutionService : IScheduledProfileExecutionService { private readonly IDataCouplerProfileService _profileService; private readonly IDataConnectionFactory _connectionFactory; private readonly ICredentialService _credentialService; private readonly IDataConnectionCredentialService _dataConnectionCredentialService; private readonly IKeyAssociationService _keyAssociationService; private readonly ILogger _logger; public ScheduledProfileExecutionService( IDataCouplerProfileService profileService, IDataConnectionFactory connectionFactory, ICredentialService credentialService, IDataConnectionCredentialService dataConnectionCredentialService, IKeyAssociationService keyAssociationService, ILogger logger) { _profileService = profileService; _connectionFactory = connectionFactory; _credentialService = credentialService; _dataConnectionCredentialService = dataConnectionCredentialService; _keyAssociationService = keyAssociationService; _logger = logger; } /// /// Esegue un profilo Data Coupler specificato dall'ID /// public async Task ExecuteProfileAsync(int profileId) { var startTime = DateTime.UtcNow; var result = new ProfileExecutionResult { ExecutionTime = startTime // Ora locale per coerenza con l'interfaccia utente }; try { _logger.LogInformation("Inizio esecuzione profilo schedulato ID: {ProfileId}", profileId); // Carica il profilo var profile = await _profileService.GetProfileByIdAsync(profileId); if (profile == null) { result.Success = false; result.Message = $"Profilo con ID {profileId} non trovato"; return result; } _logger.LogInformation("Esecuzione profilo: {ProfileName}", profile.Name); // Aggiorna la data di ultimo utilizzo await _profileService.UpdateLastUsedAsync(profile.Id); // Esegue il trasferimento dati con la logica completa var recordsTransferred = await ExecuteDataTransferAsync(profile); result.Success = true; result.RecordsProcessed = recordsTransferred; result.Message = $"Trasferimento completato con successo. {recordsTransferred} record elaborati."; _logger.LogInformation("Profilo {ProfileName} eseguito con successo. Record elaborati: {RecordCount}", profile.Name, recordsTransferred); } catch (Exception ex) { result.Success = false; result.Message = $"Errore durante l'esecuzione: {ex.Message}"; _logger.LogError(ex, "Errore durante l'esecuzione del profilo ID: {ProfileId}", profileId); } finally { result.Duration = DateTime.UtcNow - startTime; } return result; } /// /// Metodo principale per l'esecuzione del trasferimento dati /// Implementa la stessa logica di StartDataTransferWithComposite /// private async Task ExecuteDataTransferAsync(DataCouplerProfile profile) { _logger.LogInformation("=== INIZIO TRASFERIMENTO DATI SCHEDULATO ==="); _logger.LogInformation("Esecuzione profilo: {ProfileName} (ID: {ProfileId})", profile.Name, profile.Id); try { // 1. Setup delle connessioni sorgente e destinazione var (sourceManager, sourceCredential) = await SetupSourceConnectionAsync(profile); var (restClient, restCredential, restEntity) = await SetupDestinationConnectionAsync(profile); // 2. Verifica che le connessioni siano valide if (sourceManager == null) { throw new InvalidOperationException("Impossibile stabilire connessione con la sorgente dati"); } if (restClient == null || restEntity == null) { throw new InvalidOperationException("Impossibile stabilire connessione con la destinazione REST"); } // 3. Ottieni i record dalla sorgente var sourceRecords = await GetAllRecordsFromSourceAsync(profile, sourceManager); if (!sourceRecords.Any()) { _logger.LogWarning("Nessun record trovato nella sorgente per il profilo: {ProfileName}", profile.Name); return 0; } _logger.LogInformation("Ottenuti {RecordCount} record dalla sorgente", sourceRecords.Count()); // 4. Parse field mappings var fieldMappings = ParseFieldMappings(profile.FieldMappingJson); if (!fieldMappings.Any()) { throw new InvalidOperationException("Nessun mapping dei campi configurato per il profilo"); } // 5. Determina se utilizzare Salesforce Composite API bool useSalesforceComposite = restClient is DataConnection.REST.Implementations.SalesforceServiceClient; if (useSalesforceComposite) { _logger.LogInformation("Utilizzo Salesforce Composite API per il trasferimento"); return await ExecuteDataTransferWithCompositeAsync(profile, sourceRecords, restClient, restEntity, restCredential!, fieldMappings); } else { _logger.LogInformation("Utilizzo metodo trasferimento standard per il trasferimento"); return await ExecuteDataTransferStandardAsync(profile, sourceRecords, restClient, restEntity, restCredential!, fieldMappings); } } catch (Exception ex) { _logger.LogError(ex, "Errore durante il trasferimento dati per il profilo {ProfileName}", profile.Name); throw; } finally { _logger.LogInformation("=== FINE TRASFERIMENTO DATI SCHEDULATO ==="); } } /// /// Setup della connessione sorgente (database o file) /// private async Task<(IDatabaseManager? manager, DatabaseCredential? credential)> SetupSourceConnectionAsync(DataCouplerProfile profile) { if (profile.SourceType.ToLower() != "database") { return (null, null); // Per i file gestiremo diversamente } if (!profile.SourceCredentialId.HasValue) { throw new InvalidOperationException("Credenziale sorgente non specificata per il database"); } var sourceCredential = await _dataConnectionCredentialService.GetDatabaseCredentialAsync(profile.SourceCredentialId.Value); if (sourceCredential == null) { throw new InvalidOperationException($"Credenziale database sorgente con ID {profile.SourceCredentialId.Value} non trovata"); } // Applica override database se specificato (per le schedulazioni) if (!string.IsNullOrEmpty(profile.SourceDatabaseName)) { sourceCredential.DatabaseName = profile.SourceDatabaseName; } var databaseManager = await _connectionFactory.CreateDatabaseManagerAsync(sourceCredential.Name); // Test connessione var canConnect = await databaseManager.TestConnectionAsync(); if (!canConnect) { throw new InvalidOperationException($"Impossibile connettersi al database sorgente: {sourceCredential.Name}"); } _logger.LogInformation("Connessione database sorgente stabilita: {CredentialName}", sourceCredential.Name); return (databaseManager, sourceCredential); } /// /// Setup della connessione destinazione REST /// private async Task<(IRestServiceClient? client, RestApiCredential? credential, RestEntitySummary? entity)> SetupDestinationConnectionAsync(DataCouplerProfile profile) { if (profile.DestinationType.ToLower() != "rest") { throw new NotSupportedException("Solo destinazioni REST sono supportate per le schedulazioni"); } if (!profile.DestinationCredentialId.HasValue) { throw new InvalidOperationException("Credenziale destinazione REST non specificata"); } var restCredential = await _dataConnectionCredentialService.GetRestApiCredentialAsync(profile.DestinationCredentialId.Value); if (restCredential == null) { throw new InvalidOperationException($"Credenziale REST con ID {profile.DestinationCredentialId.Value} non trovata"); } var restClient = await _connectionFactory.CreateRestServiceClientAsync(restCredential.Name); // Autenticazione var authResult = await restClient.AuthenticateAsync(); if (!authResult) { throw new InvalidOperationException($"Autenticazione fallita per il servizio REST: {restCredential.Name}"); } // Ottieni l'entità REST (per le schedulazioni usiamo solo il nome) RestEntitySummary? restEntity = null; if (!string.IsNullOrEmpty(profile.DestinationEndpoint)) { // Per le schedulazioni, creiamo un'entità fittizia con il nome dal profilo restEntity = new RestEntitySummary { Name = profile.DestinationEndpoint, Label = profile.DestinationEndpoint }; } if (restEntity == null) { throw new InvalidOperationException($"Entità REST non specificata nel profilo"); } _logger.LogInformation("Connessione REST destinazione stabilita: {CredentialName}, Entità: {EntityName}", restCredential.Name, restEntity.Name); return (restClient, restCredential, restEntity); } /// /// Ottiene tutti i record dalla sorgente /// private async Task>> GetAllRecordsFromSourceAsync(DataCouplerProfile profile, IDatabaseManager? databaseManager) { if (profile.SourceType.ToLower() == "database") { return await GetAllRecordsFromDatabaseAsync(profile, databaseManager!); } else if (profile.SourceType.ToLower() == "file") { return await GetAllRecordsFromFileAsync(profile); } throw new NotSupportedException($"Tipo sorgente non supportato: {profile.SourceType}"); } /// /// Parse del JSON dei field mappings /// private Dictionary ParseFieldMappings(string? fieldMappingJson) { var mappings = new Dictionary(); if (string.IsNullOrEmpty(fieldMappingJson)) { _logger.LogWarning("Field mapping JSON è vuoto o null"); return mappings; } _logger.LogDebug("Parsing field mappings JSON: {FieldMappingJson}", fieldMappingJson); try { // Usa le stesse opzioni di serializzazione del DataCouplerProfileService var options = new JsonSerializerOptions { PropertyNamingPolicy = JsonNamingPolicy.CamelCase }; var fieldMappingsList = JsonSerializer.Deserialize>(fieldMappingJson, options); if (fieldMappingsList != null) { _logger.LogInformation("Trovati {Count} field mappings nel JSON", fieldMappingsList.Count); foreach (var mapping in fieldMappingsList) { if (!string.IsNullOrEmpty(mapping.SourceField) && !string.IsNullOrEmpty(mapping.DestinationField)) { mappings[mapping.SourceField] = mapping.DestinationField; _logger.LogDebug("Mapping aggiunto: {SourceField} -> {DestinationField}", mapping.SourceField, mapping.DestinationField); } else { _logger.LogWarning("Mapping incompleto ignorato: SourceField='{SourceField}', DestinationField='{DestinationField}'", mapping.SourceField, mapping.DestinationField); } } } else { _logger.LogWarning("Deserializzazione ritornato null per field mappings JSON"); } } catch (Exception ex) { _logger.LogError(ex, "Errore nel parsing dei field mappings: {FieldMappingJson}", fieldMappingJson); } _logger.LogInformation("Mappings completati: {Count} associazioni", mappings.Count); return mappings; } /// /// Ottiene tutti i record dal database /// private async Task>> GetAllRecordsFromDatabaseAsync(DataCouplerProfile profile, IDatabaseManager databaseManager) { string query; if (!string.IsNullOrEmpty(profile.SourceCustomQuery)) { query = profile.SourceCustomQuery; } else if (!string.IsNullOrEmpty(profile.SourceTable)) { var tableName = !string.IsNullOrEmpty(profile.SourceSchema) ? $"[{profile.SourceSchema}].[{profile.SourceTable}]" : $"[{profile.SourceTable}]"; query = $"SELECT * FROM {tableName}"; } else { throw new InvalidOperationException("Né query custom né tabella specificata per la sorgente database"); } _logger.LogDebug("Esecuzione query sorgente: {Query}", query); var results = await databaseManager.ExecuteRawQueryAsync(query, profile.SourceDatabaseName ?? ""); return results; } /// /// Ottiene tutti i record da file (implementazione future) /// private async Task>> GetAllRecordsFromFileAsync(DataCouplerProfile profile) { if (string.IsNullOrEmpty(profile.SourceFilePath)) throw new InvalidOperationException("Percorso file sorgente non specificato"); // TODO: Implementazione per file Excel/CSV per le schedulazioni // Per ora restituiamo una lista vuota _logger.LogWarning("Lettura file non ancora implementata per le schedulazioni. File: {FilePath}", profile.SourceFilePath); await Task.Delay(1); // Placeholder async return new List>(); } /// /// Metodo principale per trasferimento dati standard (non Salesforce Composite) /// private async Task ExecuteDataTransferStandardAsync( DataCouplerProfile profile, IEnumerable> sourceRecords, IRestServiceClient restClient, RestEntitySummary restEntity, RestApiCredential restCredential, Dictionary fieldMappings) { _logger.LogInformation("Iniziando trasferimento dati standard per {RecordCount} record", sourceRecords.Count()); int successCount = 0; int errorCount = 0; int recordNumber = 1; foreach (var record in sourceRecords) { try { // 1. Trasforma il record utilizzando i field mappings var restData = TransformRecordForRest(record, fieldMappings); // 2. Gestione associazioni record se abilitata string? entityId = null; if (profile.UseRecordAssociations && !string.IsNullOrEmpty(profile.SourceKeyField)) { entityId = await HandleRecordAssociation(profile, record, restClient, restEntity, restCredential, restData, fieldMappings); } // 3. Se non abbiamo un ID esistente, crea un nuovo record if (string.IsNullOrEmpty(entityId)) { var result = await restClient.CreateEntityAsync(restEntity.Name, restData); if (result != null) { // Estrai l'ID dal risultato (diversi sistemi REST usano campi diversi) entityId = result.ContainsKey("id") ? result["id"]?.ToString() : result.ContainsKey("Id") ? result["Id"]?.ToString() : result.ContainsKey("DocEntry") ? result["DocEntry"]?.ToString() : null; successCount++; _logger.LogDebug("Record {RecordNumber} creato con successo. ID: {EntityId}", recordNumber, entityId); } else { errorCount++; _logger.LogWarning("Errore nella creazione del record {RecordNumber}: risultato null", recordNumber); } } else { successCount++; // Record aggiornato tramite associazione } // 4. Salva associazione se abilitata e abbiamo un ID if (profile.UseRecordAssociations && !string.IsNullOrEmpty(profile.SourceKeyField) && !string.IsNullOrEmpty(entityId)) { await SaveRecordAssociation(profile, record, restEntity, restCredential, restData, entityId, fieldMappings); } } catch (Exception ex) { errorCount++; _logger.LogError(ex, "Errore nel trasferimento del record {RecordNumber}", recordNumber); } recordNumber++; } _logger.LogInformation("Trasferimento completato. Successi: {SuccessCount}, Errori: {ErrorCount}", successCount, errorCount); return successCount; } /// /// Implementazione completa del trasferimento con Salesforce Composite API /// Equivalente a StartDataTransferWithComposite del DataCoupler.razor.cs /// private async Task ExecuteDataTransferWithCompositeAsync( DataCouplerProfile profile, IEnumerable> sourceRecords, IRestServiceClient restClient, RestEntitySummary restEntity, RestApiCredential restCredential, Dictionary fieldMappings) { _logger.LogInformation("Iniziando trasferimento dati COMPOSITE per {RecordCount} record", sourceRecords.Count()); // Verifica che sia effettivamente un SalesforceServiceClient if (!(restClient is DataConnection.REST.Implementations.SalesforceServiceClient salesforceClient)) { _logger.LogWarning("Client REST non è SalesforceServiceClient, fallback al metodo standard"); return await ExecuteDataTransferStandardAsync(profile, sourceRecords, restClient, restEntity, restCredential, fieldMappings); } try { // 1. Trasforma i record e analizza le associazioni IN PARALLELO var recordsForCreate = new ConcurrentBag<(Dictionary transformedData, Dictionary originalRecord, int recordNumber)>(); var recordsForUpdate = new ConcurrentBag<(Dictionary transformedData, string entityId, Dictionary originalRecord, int recordNumber, string newDataHash)>(); var recordsSkipped = new ConcurrentBag<(Dictionary originalRecord, int recordNumber, string reason)>(); var recordErrors = new ConcurrentBag<(Dictionary originalRecord, int recordNumber, string error)>(); // Cattura i valori condivisi per evitare race conditions var currentEntityName = restEntity.Name; var currentCredentialName = restCredential.Name; // Usa il nome della credenziale, non l'ID var currentUseRecordAssociations = profile.UseRecordAssociations; // Crea lista indicizzata per mantenere il record number var indexedRecords = sourceRecords.Select((record, index) => new { Record = record, RecordNumber = index + 1 }).ToList(); _logger.LogInformation("COMPOSITE SCHEDULED: Inizio analisi parallela di {RecordCount} record", indexedRecords.Count); var analysisStartTime = DateTime.UtcNow; // Processa tutti i record in parallelo var processingTasks = indexedRecords.Select(async indexedRecord => { try { var record = indexedRecord.Record; var recordNumber = indexedRecord.RecordNumber; // Trasforma il record in base ai mapping (operazione locale, thread-safe) var restData = TransformRecordForRest(record, fieldMappings); // Genera la chiave sorgente e l'hash dei dati per questo record (include MAPPING_SIGNATURE) var sourceKey = GenerateSourceKey(record, profile.SourceKeyField); var currentDataHash = GenerateDataHash(restData, fieldMappings); // Analizza le associazioni per capire se aggiornare, creare o saltare if (currentUseRecordAssociations && !string.IsNullOrEmpty(sourceKey)) { _logger.LogDebug("COMPOSITE SCHEDULED: Cerco associazione per KeyValue: '{KeyValue}', Entity: '{Entity}', Credential: '{Credential}'", sourceKey, currentEntityName, currentCredentialName); // Cerca associazione esistente usando il metodo parallelo var existingAssociation = await _dataConnectionCredentialService.FindKeyAssociationByValueParallelAsync( sourceKey, currentEntityName, currentCredentialName); // FALLBACK: Se non troviamo l'associazione con tutti i parametri, proviamo solo con il KeyValue if (existingAssociation == null) { existingAssociation = await _dataConnectionCredentialService.FindKeyAssociationByValueParallelAsync(sourceKey); if (existingAssociation != null) { // Verifica compatibilità if (existingAssociation.DestinationEntity != currentEntityName || existingAssociation.RestCredentialName != currentCredentialName) { existingAssociation = null; } } } // 🔍 PRE-DISCOVERY: Se non esiste associazione, cerca nella destinazione if (existingAssociation == null && !string.IsNullOrEmpty(profile.SourceKeyField)) { _logger.LogInformation("PRE-DISCOVERY SCHEDULED: Nessuna associazione trovata per '{KeyValue}'. Cerco nella destinazione...", sourceKey); // Cerca il campo destinazione mappato al campo chiave sorgente if (fieldMappings.TryGetValue(profile.SourceKeyField, out var mappedDestinationFieldName)) { try { // Prepara i campi di ricerca: usa il campo mappato + il valore della chiave var searchFields = new Dictionary { { mappedDestinationFieldName, sourceKey } }; _logger.LogInformation("PRE-DISCOVERY SCHEDULED: Cerco in '{Entity}' dove {Field} = '{Value}'", currentEntityName, mappedDestinationFieldName, sourceKey); // Cerca nella destinazione REST var existingEntities = await restClient.FindEntitiesByKeysAsync( currentEntityName, searchFields); if (existingEntities != null && existingEntities.Count > 0) { // Trovato! Prendi il primo risultato var foundEntity = existingEntities[0]; // Estrai l'ID del record trovato var destinationId = foundEntity.ContainsKey("Id") ? foundEntity["Id"]?.ToString() : foundEntity.ContainsKey("id") ? foundEntity["id"]?.ToString() : null; if (!string.IsNullOrEmpty(destinationId)) { _logger.LogInformation("PRE-DISCOVERY SCHEDULED: ✅ Trovato record esistente! KeyValue: '{KeyValue}' -> DestinationId: '{DestinationId}'", sourceKey, destinationId); // Crea l'associazione prima di procedere var newAssociation = new KeyAssociation { KeyValue = sourceKey, SourceKeyField = profile.SourceKeyField, DestinationKeyField = "Id", MappedDestinationField = mappedDestinationFieldName, DestinationEntity = currentEntityName, DestinationId = destinationId, RestCredentialName = currentCredentialName, CreatedAt = DateTime.UtcNow, LastVerifiedAt = DateTime.UtcNow, IsActive = true, Data_Hash = currentDataHash, AdditionalInfo = JsonSerializer.Serialize(new { CreatedBy = "PreDiscovery", DiscoveredAt = DateTime.UtcNow, MappingCount = fieldMappings.Count, ScheduledTransfer = true }) }; // Salva l'associazione (metodo parallelo thread-safe) var associationId = await _dataConnectionCredentialService.SaveKeyAssociationParallelAsync(newAssociation); _logger.LogInformation("PRE-DISCOVERY SCHEDULED: Associazione creata con ID: {AssociationId}", associationId); // Usa l'associazione appena creata per il resto del flusso existingAssociation = newAssociation; existingAssociation.Id = associationId; } else { _logger.LogWarning("PRE-DISCOVERY SCHEDULED: Record trovato ma senza ID valido per KeyValue: '{KeyValue}'", sourceKey); } } else { _logger.LogInformation("PRE-DISCOVERY SCHEDULED: Nessun record esistente trovato per KeyValue: '{KeyValue}'", sourceKey); } } catch (Exception discEx) { _logger.LogWarning(discEx, "PRE-DISCOVERY SCHEDULED: Errore durante la ricerca nella destinazione per KeyValue: '{KeyValue}'", sourceKey); // Continua comunque, il record verrà creato normalmente } } else { _logger.LogWarning("PRE-DISCOVERY SCHEDULED: Campo chiave '{SourceKeyField}' non trovato nei mappings. Skip discovery.", profile.SourceKeyField); } } if (existingAssociation != null && existingAssociation.IsActive) { // Verifica se l'associazione è stata creata dal Pre-Discovery var isPreDiscoveryAssociation = false; if (!string.IsNullOrEmpty(existingAssociation.AdditionalInfo)) { try { var additionalInfo = JsonSerializer.Deserialize>(existingAssociation.AdditionalInfo); if (additionalInfo != null && additionalInfo.ContainsKey("CreatedBy")) { var createdBy = additionalInfo["CreatedBy"]?.ToString(); isPreDiscoveryAssociation = createdBy == "PreDiscovery"; } } catch { // Ignora errori di parsing } } // 🔍 PRE-DISCOVERY: Se l'associazione è stata appena creata dal Pre-Discovery, FORZA l'aggiornamento if (isPreDiscoveryAssociation) { // Forza aggiornamento senza controllo hash recordsForUpdate.Add((restData, existingAssociation.DestinationId, record, recordNumber, currentDataHash)); _logger.LogInformation("COMPOSITE SCHEDULED: Record {RecordNumber} marcato per AGGIORNAMENTO FORZATO (Pre-Discovery) - EntityId: {EntityId}", recordNumber, existingAssociation.DestinationId); } else { // CONTROLLO HASH: Verifica se i dati sono cambiati (solo per associazioni esistenti) var existingHash = existingAssociation.Data_Hash; if (!string.IsNullOrEmpty(existingHash) && existingHash.Equals(currentDataHash, StringComparison.OrdinalIgnoreCase)) { // I dati non sono cambiati, salta questo record recordsSkipped.Add((record, recordNumber, "Dati non modificati (hash identico)")); _logger.LogDebug("COMPOSITE SCHEDULED: Record {RecordNumber} saltato - hash identico: {Hash}", recordNumber, currentDataHash); } else { // I dati sono cambiati o l'hash è vuoto, procedi con l'aggiornamento recordsForUpdate.Add((restData, existingAssociation.DestinationId, record, recordNumber, currentDataHash)); _logger.LogDebug("COMPOSITE SCHEDULED: Record {RecordNumber} marcato per aggiornamento (EntityId: {EntityId}) - hash diverso: old={OldHash}, new={NewHash}", recordNumber, existingAssociation.DestinationId, existingHash ?? "NULL", currentDataHash); } } } else { // Record da creare (nessuna associazione esistente) recordsForCreate.Add((restData, record, recordNumber)); _logger.LogDebug("COMPOSITE SCHEDULED: Record {RecordNumber} marcato per creazione", recordNumber); } } else { // Record da creare (no associazioni) recordsForCreate.Add((restData, record, recordNumber)); _logger.LogDebug("COMPOSITE SCHEDULED: Record {RecordNumber} marcato per creazione (no associazioni)", recordNumber); } } catch (Exception ex) { _logger.LogError(ex, "COMPOSITE SCHEDULED: Errore nella trasformazione del record {RecordNumber}", indexedRecord.RecordNumber); recordErrors.Add((indexedRecord.Record, indexedRecord.RecordNumber, ex.Message)); } }); // Attendi il completamento di tutte le operazioni parallele await Task.WhenAll(processingTasks); var analysisEndTime = DateTime.UtcNow; var analysisElapsed = (analysisEndTime - analysisStartTime).TotalMilliseconds; // Converti i ConcurrentBag in liste per il resto del processing var finalRecordsForCreate = recordsForCreate.ToList(); var finalRecordsForUpdate = recordsForUpdate.ToList(); var finalRecordsSkipped = recordsSkipped.ToList(); var finalRecordErrors = recordErrors.ToList(); _logger.LogInformation("COMPOSITE SCHEDULED: Analisi parallela completata in {ElapsedMs}ms - {CreateCount} record da creare, {UpdateCount} record da aggiornare, {SkippedCount} record saltati, {ErrorCount} errori", analysisElapsed, finalRecordsForCreate.Count, finalRecordsForUpdate.Count, finalRecordsSkipped.Count, finalRecordErrors.Count); // 2. Esegui le chiamate composite in parallelo var createTask = Task.FromResult(new List()); var updateTask = Task.FromResult(new List()); if (finalRecordsForCreate.Any()) { var createData = finalRecordsForCreate.Select(r => r.transformedData).ToList(); createTask = salesforceClient.BatchCreateEntitiesAsync(restEntity.Name, createData); } if (finalRecordsForUpdate.Any()) { var updateData = finalRecordsForUpdate.ToDictionary( r => r.entityId, r => r.transformedData); updateTask = salesforceClient.BatchUpdateEntitiesAsync(restEntity.Name, updateData); } // Attendi entrambe le operazioni await Task.WhenAll(createTask, updateTask); var createResults = await createTask; var updateResults = await updateTask; // 3. Processa i risultati delle creazioni int successCount = 0; int errorCount = finalRecordErrors.Count; int updatedCount = 0; // Lista per raccogliere le task di creazione associazioni var createAssociationTasks = new List(); for (int i = 0; i < createResults.Count; i++) { var result = createResults[i]; var originalData = finalRecordsForCreate[i]; if (result.Success) { successCount++; _logger.LogDebug("COMPOSITE SCHEDULED: Record {RecordNumber} creato con successo. EntityId: {EntityId}", originalData.recordNumber, result.EntityId); // Aggiungi task di creazione associazione alla lista (esecuzione parallela) if (currentUseRecordAssociations && !string.IsNullOrEmpty(result.EntityId)) { var dataHashForAssociation = GenerateDataHash(originalData.transformedData, fieldMappings); var associationTask = CreateAssociationAsync(profile, originalData.originalRecord, restCredential, result.EntityId, originalData.recordNumber, dataHashForAssociation); createAssociationTasks.Add(associationTask); } } else { errorCount++; _logger.LogError("COMPOSITE SCHEDULED: Errore creazione record {RecordNumber}: {Error}", originalData.recordNumber, result.ErrorMessage); } } // 4. Processa i risultati degli aggiornamenti var updateAssociationTasks = new List(); for (int i = 0; i < updateResults.Count; i++) { var result = updateResults[i]; var originalData = finalRecordsForUpdate[i]; if (result.Success) { updatedCount++; _logger.LogDebug("COMPOSITE SCHEDULED: Record {RecordNumber} aggiornato con successo. EntityId: {EntityId}", originalData.recordNumber, result.EntityId); // Aggiungi task di aggiornamento associazione alla lista (esecuzione parallela) if (currentUseRecordAssociations && !string.IsNullOrEmpty(result.EntityId)) { var updateHashTask = UpdateAssociationHashAsync(profile, originalData.originalRecord, restCredential, result.EntityId, originalData.newDataHash); updateAssociationTasks.Add(updateHashTask); } } else { errorCount++; _logger.LogError("COMPOSITE SCHEDULED: Errore aggiornamento record {RecordNumber}: {Error}", originalData.recordNumber, result.ErrorMessage); } } // 5. Esegui tutte le operazioni di associazione in parallelo var allAssociationTasks = createAssociationTasks.Concat(updateAssociationTasks).ToList(); if (allAssociationTasks.Any()) { _logger.LogInformation("COMPOSITE SCHEDULED: Avvio di {TaskCount} operazioni di associazione in parallelo ({CreateCount} creazioni, {UpdateCount} aggiornamenti)", allAssociationTasks.Count, createAssociationTasks.Count, updateAssociationTasks.Count); var startTime = DateTime.UtcNow; await Task.WhenAll(allAssociationTasks); var endTime = DateTime.UtcNow; _logger.LogInformation("COMPOSITE SCHEDULED: Operazioni di associazione completate in {ElapsedMs}ms", (endTime - startTime).TotalMilliseconds); } var skippedCount = finalRecordsSkipped.Count; var totalProcessed = successCount + updatedCount; _logger.LogInformation("COMPOSITE SCHEDULED: Trasferimento completato. Creazioni: {SuccessCount}, Aggiornamenti: {UpdatedCount}, Saltati: {SkippedCount}, Errori: {ErrorCount}", successCount, updatedCount, skippedCount, errorCount); return totalProcessed; } catch (Exception ex) { _logger.LogError(ex, "COMPOSITE SCHEDULED: Errore generale nel trasferimento dati"); throw; } } #region Helper Methods /// /// Trasforma un record sorgente in formato REST utilizzando i field mappings /// private Dictionary TransformRecordForRest(Dictionary sourceRecord, Dictionary fieldMappings) { var restData = new Dictionary(); foreach (var mapping in fieldMappings) { if (sourceRecord.ContainsKey(mapping.Key)) { var value = sourceRecord[mapping.Key]; // Trasforma il valore se necessario var transformedValue = TransformValueForRest(value); if (transformedValue != null) { restData[mapping.Value] = transformedValue; } } } return restData; } /// /// Trasforma un valore per il formato REST /// private object? TransformValueForRest(object? value) { if (value == null || value == DBNull.Value) return null; // Trasformazioni specifiche per tipo if (value is DateTime dateTime) { return dateTime.ToString("yyyy-MM-ddTHH:mm:ss.fffZ", CultureInfo.InvariantCulture); } if (value is decimal dec) { return dec.ToString(CultureInfo.InvariantCulture); } if (value is double dbl) { return dbl.ToString(CultureInfo.InvariantCulture); } if (value is float flt) { return flt.ToString(CultureInfo.InvariantCulture); } return value; } /// /// Gestisce l'associazione dei record per evitare duplicati /// private async Task HandleRecordAssociation( DataCouplerProfile profile, Dictionary sourceRecord, IRestServiceClient restClient, RestEntitySummary restEntity, RestApiCredential restCredential, Dictionary restData, Dictionary fieldMappings) { if (string.IsNullOrEmpty(profile.SourceKeyField) || !sourceRecord.ContainsKey(profile.SourceKeyField)) return null; var sourceKey = sourceRecord[profile.SourceKeyField]?.ToString(); if (string.IsNullOrEmpty(sourceKey)) return null; try { // Cerca associazione esistente usando il metodo parallelo var existingAssociation = await _dataConnectionCredentialService.FindKeyAssociationByValueParallelAsync( sourceKey, restEntity.Name, restCredential.Name); if (existingAssociation != null && existingAssociation.IsActive) { // Genera l'hash dei dati correnti (solo dati trasformati/mappati, include MAPPING_SIGNATURE) var currentDataHash = GenerateDataHash(restData, fieldMappings); var existingHash = existingAssociation.Data_Hash; // Verifica se i dati sono cambiati if (!string.IsNullOrEmpty(existingHash) && existingHash.Equals(currentDataHash, StringComparison.OrdinalIgnoreCase)) { // I dati non sono cambiati, salta l'aggiornamento ma aggiorna LastVerifiedAt existingAssociation.LastVerifiedAt = DateTime.Now; await _dataConnectionCredentialService.UpdateKeyAssociationAsync(existingAssociation); _logger.LogDebug("Record non modificato, aggiornamento saltato. KeyValue: {KeyValue}, EntityId: {EntityId}, Hash: {Hash}", sourceKey, existingAssociation.DestinationId, currentDataHash); return existingAssociation.DestinationId; } // I dati sono cambiati, procedi con l'aggiornamento var updateResult = await restClient.UpdateEntityAsync(restEntity.Name, existingAssociation.DestinationId, restData); if (updateResult != null) { // Aggiorna l'hash e LastVerifiedAt nell'associazione existingAssociation.Data_Hash = currentDataHash; existingAssociation.UpdatedAt = DateTime.Now; existingAssociation.LastVerifiedAt = DateTime.Now; await _dataConnectionCredentialService.UpdateKeyAssociationAsync(existingAssociation); _logger.LogDebug("Record aggiornato tramite associazione. KeyValue: {KeyValue}, EntityId: {EntityId}, NewHash: {Hash}", sourceKey, existingAssociation.DestinationId, currentDataHash); return existingAssociation.DestinationId; } else { _logger.LogWarning("Fallimento aggiornamento record associato. KeyValue: {KeyValue}, EntityId: {EntityId}", sourceKey, existingAssociation.DestinationId); } } } catch (Exception ex) { _logger.LogError(ex, "Errore nella gestione dell'associazione per KeyValue: {KeyValue}", sourceKey); } return null; } /// /// Salva l'associazione tra record sorgente e destinazione /// private async Task SaveRecordAssociation( DataCouplerProfile profile, Dictionary sourceRecord, RestEntitySummary restEntity, RestApiCredential restCredential, Dictionary restData, string entityId, Dictionary fieldMappings) { if (string.IsNullOrEmpty(profile.SourceKeyField) || !sourceRecord.ContainsKey(profile.SourceKeyField)) return; var sourceKey = sourceRecord[profile.SourceKeyField]?.ToString(); if (string.IsNullOrEmpty(sourceKey)) return; try { // Genera l'hash dei dati per il controllo delle modifiche (solo dati trasformati/mappati, include MAPPING_SIGNATURE) var dataHash = GenerateDataHash(restData, fieldMappings); var association = new KeyAssociation { KeyValue = sourceKey, SourceKeyField = profile.SourceKeyField, DestinationKeyField = "Id", // Assumiamo "Id" come campo di default DestinationId = entityId, DestinationEntity = restEntity.Name, RestCredentialName = restCredential.Name, // Usa il nome della credenziale IsActive = true, Data_Hash = dataHash, CreatedAt = DateTime.Now, UpdatedAt = DateTime.Now, LastVerifiedAt = DateTime.Now, AdditionalInfo = JsonSerializer.Serialize(new { TransferDate = DateTime.Now, SourceType = profile.SourceType, DestinationType = profile.DestinationType, ProfileName = profile.Name, ScheduledTransfer = true, StandardTransfer = true, DataHashGenerated = true }) }; await _dataConnectionCredentialService.SaveKeyAssociationParallelAsync(association); _logger.LogDebug("Associazione salvata. KeyValue: {KeyValue}, EntityId: {EntityId}, Hash: {Hash}", sourceKey, entityId, dataHash); } catch (Exception ex) { _logger.LogError(ex, "Errore nel salvaggio dell'associazione per KeyValue: {KeyValue}", sourceKey); } } /// /// Genera la chiave sorgente per un record /// private string GenerateSourceKey(Dictionary record, string? sourceKeyField) { if (string.IsNullOrEmpty(sourceKeyField) || !record.ContainsKey(sourceKeyField)) return string.Empty; var keyValue = record[sourceKeyField]; return keyValue?.ToString() ?? string.Empty; } /// /// Genera un hash SHA256 dei dati del record passato come parametro. /// Utilizzato per rilevare cambiamenti nei dati e ottimizzare il trasferimento. /// Calcola l'hash SOLO sui campi presenti nel record, in ordine alfabetico. /// DEVE essere identico al metodo in DataCoupler.razor.cs per garantire consistenza. /// private string GenerateDataHash(Dictionary record, Dictionary? fieldMappings = null) { try { var valuesForHash = new List(); // Ordina le chiavi alfabeticamente per garantire consistenza var orderedKeys = record.Keys.OrderBy(k => k).ToList(); // Aggiungi i valori dei dati per ogni campo presente nel record foreach (var key in orderedKeys) { var value = record[key]; var normalizedValue = value?.ToString()?.Trim() ?? ""; valuesForHash.Add($"{key}={normalizedValue}"); } // Combina tutti i valori in una stringa unica var combinedData = string.Join("|", valuesForHash); _logger.LogDebug("Hash dei dati generato da: {CombinedData}", combinedData); // Calcola l'hash SHA256 (stesso algoritmo di DataCoupler.razor.cs) using (var sha256 = System.Security.Cryptography.SHA256.Create()) { var hashBytes = sha256.ComputeHash(Encoding.UTF8.GetBytes(combinedData)); var hashString = Convert.ToHexString(hashBytes); _logger.LogDebug("Hash SHA256 generato: {Hash} per {FieldCount} campi", hashString, orderedKeys.Count); return hashString; } } catch (Exception ex) { _logger.LogWarning(ex, "Errore nella generazione dell'hash per il record"); return string.Empty; } } /// /// Crea una nuova associazione record in modo asincrono /// private async Task CreateAssociationAsync(DataCouplerProfile profile, Dictionary originalRecord, RestApiCredential restCredential, string entityId, int recordNumber, string dataHash) { try { if (string.IsNullOrEmpty(profile.SourceKeyField) || !originalRecord.ContainsKey(profile.SourceKeyField)) return; var sourceKey = originalRecord[profile.SourceKeyField]?.ToString(); if (string.IsNullOrEmpty(sourceKey)) return; // Calcola il MappingCount in modo sicuro e trova il campo destinazione mappato al campo chiave sorgente int mappingCount = 0; string? mappedDestinationField = null; try { if (!string.IsNullOrEmpty(profile.FieldMappingJson)) { var mappings = ParseFieldMappings(profile.FieldMappingJson); mappingCount = mappings?.Count ?? 0; // Cerca il campo destinazione mappato al campo chiave sorgente if (mappings != null && !string.IsNullOrEmpty(profile.SourceKeyField)) { if (mappings.TryGetValue(profile.SourceKeyField, out var destinationFieldName)) { mappedDestinationField = destinationFieldName; _logger.LogDebug("SCHEDULED MAPPING: Campo sorgente '{SourceField}' è mappato al campo destinazione '{DestField}'", profile.SourceKeyField, mappedDestinationField); } else { _logger.LogWarning("SCHEDULED MAPPING: Campo chiave sorgente '{SourceKeyField}' NON trovato nei mappings del profilo {ProfileName}", profile.SourceKeyField, profile.Name); } } } } catch (Exception ex) { _logger.LogWarning(ex, "Errore nel calcolo del MappingCount per l'associazione del record {RecordNumber}", recordNumber); } var association = new KeyAssociation { KeyValue = sourceKey, SourceKeyField = profile.SourceKeyField ?? "", DestinationKeyField = "Id", // Campo ID standard per REST MappedDestinationField = mappedDestinationField, // Campo destinazione mappato al campo chiave sorgente DestinationEntity = profile.DestinationEndpoint ?? "", DestinationId = entityId, RestCredentialName = restCredential.Name, // Usa il nome della credenziale IsActive = true, Data_Hash = dataHash, CreatedAt = DateTime.Now, UpdatedAt = DateTime.Now, LastVerifiedAt = DateTime.Now, AdditionalInfo = JsonSerializer.Serialize(new { TransferDate = DateTime.Now, RecordNumber = recordNumber, MappingCount = mappingCount, SourceType = profile.SourceType, DestinationType = profile.DestinationType, ProfileName = profile.Name, ScheduledTransfer = true, CompositeTransfer = true, DataHashGenerated = true }) }; var associationId = await _dataConnectionCredentialService.SaveKeyAssociationParallelAsync(association); _logger.LogDebug("COMPOSITE SCHEDULED: Associazione creata con ID: {AssociationId} per record {RecordNumber} - Key: {SourceKey}, EntityId: {EntityId}, Hash: {Hash}, MappedField: {MappedField}", associationId, recordNumber, sourceKey, entityId, dataHash, mappedDestinationField ?? "N/A"); } catch (Exception ex) { _logger.LogError(ex, "COMPOSITE SCHEDULED: Errore nella creazione dell'associazione per il record {RecordNumber}", recordNumber); } } /// /// Aggiorna l'hash di un'associazione esistente /// private async Task UpdateAssociationHashAsync(DataCouplerProfile profile, Dictionary originalRecord, RestApiCredential restCredential, string entityId, string newDataHash) { try { if (string.IsNullOrEmpty(profile.SourceKeyField) || !originalRecord.ContainsKey(profile.SourceKeyField)) return; var sourceKey = originalRecord[profile.SourceKeyField]?.ToString(); if (string.IsNullOrEmpty(sourceKey)) return; // Trova l'associazione esistente usando il metodo parallelo var existingAssociation = await _dataConnectionCredentialService.FindKeyAssociationByValueParallelAsync( sourceKey, profile.DestinationEndpoint ?? "", restCredential.Name); if (existingAssociation != null) { existingAssociation.Data_Hash = newDataHash; existingAssociation.UpdatedAt = DateTime.Now; existingAssociation.LastVerifiedAt = DateTime.Now; // Usa UpdateKeyAssociationAsync per l'aggiornamento invece di SaveKeyAssociationParallelAsync await _dataConnectionCredentialService.UpdateKeyAssociationAsync(existingAssociation); _logger.LogDebug("COMPOSITE SCHEDULED: Hash associazione aggiornato per entityId {EntityId} - Key: {SourceKey}, NewHash: {Hash}", entityId, sourceKey, newDataHash); } else { _logger.LogWarning("COMPOSITE SCHEDULED: Associazione non trovata per aggiornamento hash - EntityId: {EntityId}, SourceKey: {SourceKey}", entityId, sourceKey); } } catch (Exception ex) { _logger.LogError(ex, "COMPOSITE SCHEDULED: Errore nell'aggiornamento dell'hash per EntityId: {EntityId}", entityId); } } #endregion }