using CredentialManager.Services; using CredentialManager.Models; using DataConnection.Interfaces; using DataConnection.REST.Interfaces; using DataConnection.REST.Models; using DataConnection.CredentialManagement.Interfaces; using DataConnection.CredentialManagement.Models; using Microsoft.Extensions.Logging; using System.Collections.Concurrent; using System.Globalization; using System.Security.Cryptography; using System.Text; using System.Text.Json; using Data_Coupler.Models; using Data_Coupler.Services; using ExcelDataReader; namespace Data_Coupler.Services; /// /// Servizio per l'esecuzione di profili Data Coupler schedulati con logica completa di trasferimento dati /// Implementa la stessa logica avanzata di StartDataTransferWithComposite per le schedulazioni /// public class ScheduledProfileExecutionService : IScheduledProfileExecutionService { private readonly IDataCouplerProfileService _profileService; private readonly IDataConnectionFactory _connectionFactory; private readonly ICredentialService _credentialService; private readonly IDataConnectionCredentialService _dataConnectionCredentialService; private readonly IKeyAssociationService _keyAssociationService; private readonly IAssociationService _associationService; private readonly IDeletionSyncService _deletionSyncService; private readonly ILogger _logger; public ScheduledProfileExecutionService( IDataCouplerProfileService profileService, IDataConnectionFactory connectionFactory, ICredentialService credentialService, IDataConnectionCredentialService dataConnectionCredentialService, IKeyAssociationService keyAssociationService, IAssociationService associationService, IDeletionSyncService deletionSyncService, ILogger logger) { _profileService = profileService; _connectionFactory = connectionFactory; _credentialService = credentialService; _dataConnectionCredentialService = dataConnectionCredentialService; _keyAssociationService = keyAssociationService; _associationService = associationService; _deletionSyncService = deletionSyncService; _logger = logger; } /// /// Esegue un profilo Data Coupler specificato dall'ID /// public async Task ExecuteProfileAsync(int profileId) { return await ExecuteProfileAsync(profileId, enableDeletionSync: false); } /// /// Esegue un profilo Data Coupler specificato dall'ID con configurazione sincronizzazione eliminazioni /// public async Task ExecuteProfileAsync(int profileId, bool enableDeletionSync) { var startTime = DateTime.UtcNow; var result = new ProfileExecutionResult { ExecutionTime = startTime // Ora locale per coerenza con l'interfaccia utente }; try { _logger.LogInformation("Inizio esecuzione profilo schedulato ID: {ProfileId} - DeletionSync: {DeletionSync}", profileId, enableDeletionSync); // Carica il profilo var profile = await _profileService.GetProfileByIdAsync(profileId); if (profile == null) { result.Success = false; result.Message = $"Profilo con ID {profileId} non trovato"; return result; } _logger.LogInformation("Esecuzione profilo: {ProfileName}", profile.Name); // Aggiorna la data di ultimo utilizzo await _profileService.UpdateLastUsedAsync(profile.Id); // Esegue il trasferimento dati con la logica completa var recordsTransferred = await ExecuteDataTransferAsync(profile, enableDeletionSync); result.Success = true; result.RecordsProcessed = recordsTransferred; result.Message = $"Trasferimento completato con successo. {recordsTransferred} record elaborati."; _logger.LogInformation("Profilo {ProfileName} eseguito con successo. Record elaborati: {RecordCount}", profile.Name, recordsTransferred); } catch (Exception ex) { result.Success = false; result.Message = $"Errore durante l'esecuzione: {ex.Message}"; _logger.LogError(ex, "Errore durante l'esecuzione del profilo ID: {ProfileId}", profileId); } finally { result.Duration = DateTime.UtcNow - startTime; } return result; } /// /// Metodo principale per l'esecuzione del trasferimento dati /// Implementa la stessa logica di StartDataTransferWithComposite /// private async Task ExecuteDataTransferAsync(DataCouplerProfile profile, bool enableDeletionSync = false) { _logger.LogInformation("=== INIZIO TRASFERIMENTO DATI SCHEDULATO ==="); _logger.LogInformation("Esecuzione profilo: {ProfileName} (ID: {ProfileId}) - DeletionSync: {DeletionSync}", profile.Name, profile.Id, enableDeletionSync); try { // 1. Setup delle connessioni sorgente e destinazione var (sourceManager, sourceCredential) = await SetupSourceConnectionAsync(profile); var (restClient, restCredential, restEntity) = await SetupDestinationConnectionAsync(profile); // 2. Verifica che le connessioni siano valide // Per i file, sourceManager sarà null (è normale), per database deve essere presente if (profile.SourceType.ToLower() == "database" && sourceManager == null) { throw new InvalidOperationException("Impossibile stabilire connessione con il database sorgente"); } if (profile.SourceType.ToLower() == "file" && string.IsNullOrEmpty(profile.SourceFilePath)) { throw new InvalidOperationException("Percorso file sorgente non specificato nel profilo"); } if (restClient == null || restEntity == null) { throw new InvalidOperationException("Impossibile stabilire connessione con la destinazione REST"); } // 3. Ottieni i record dalla sorgente var sourceRecords = await GetAllRecordsFromSourceAsync(profile, sourceManager); if (!sourceRecords.Any()) { _logger.LogWarning("Nessun record trovato nella sorgente per il profilo: {ProfileName}", profile.Name); return 0; } _logger.LogInformation("Ottenuti {RecordCount} record dalla sorgente", sourceRecords.Count()); // 4. Parse field mappings var fieldMappings = ParseFieldMappings(profile.FieldMappingJson); if (!fieldMappings.Any()) { throw new InvalidOperationException("Nessun mapping dei campi configurato per il profilo"); } // 4.5. Parse External ID Relationships (Salesforce) var externalIdRelationships = ParseExternalIdRelationships(profile.ExternalIdRelationshipsJson); if (externalIdRelationships.Any()) { _logger.LogInformation("Caricate {Count} External ID Relationships dal profilo", externalIdRelationships.Count); } // 4.6. Parse Default Values var defaultValues = ParseDefaultValues(profile.DefaultValuesJson); if (defaultValues.Any()) { _logger.LogInformation("Caricati {Count} default values dal profilo", defaultValues.Count); } // 5. Determina se utilizzare Salesforce Composite API bool useSalesforceComposite = restClient is DataConnection.REST.Implementations.SalesforceServiceClient; if (useSalesforceComposite) { _logger.LogInformation("Utilizzo Salesforce Composite API per il trasferimento"); return await ExecuteDataTransferWithCompositeAsync(profile, sourceRecords, restClient, restEntity, restCredential!, fieldMappings, defaultValues, externalIdRelationships, enableDeletionSync); } else { _logger.LogInformation("Utilizzo metodo trasferimento standard per il trasferimento"); return await ExecuteDataTransferStandardAsync(profile, sourceRecords, restClient, restEntity, restCredential!, fieldMappings, defaultValues, externalIdRelationships, enableDeletionSync); } } catch (Exception ex) { _logger.LogError(ex, "Errore durante il trasferimento dati per il profilo {ProfileName}", profile.Name); throw; } finally { _logger.LogInformation("=== FINE TRASFERIMENTO DATI SCHEDULATO ==="); } } /// /// Setup della connessione sorgente (database o file) /// private async Task<(IDatabaseManager? manager, DatabaseCredential? credential)> SetupSourceConnectionAsync(DataCouplerProfile profile) { _logger.LogInformation("SetupSourceConnectionAsync - SourceType: '{SourceType}', SourceCredentialId: {SourceCredentialId}, SourceFilePath: '{SourceFilePath}'", profile.SourceType, profile.SourceCredentialId, profile.SourceFilePath); // Se la sorgente è un file, non servono credenziali database if (string.IsNullOrEmpty(profile.SourceType) || profile.SourceType.Equals("file", StringComparison.OrdinalIgnoreCase)) { _logger.LogInformation("Sorgente tipo file, nessuna connessione database necessaria"); return (null, null); } // Per database, la credenziale è obbligatoria if (!profile.SourceCredentialId.HasValue) { throw new InvalidOperationException("Credenziale sorgente non specificata per il database"); } var sourceCredential = await _dataConnectionCredentialService.GetDatabaseCredentialAsync(profile.SourceCredentialId.Value); if (sourceCredential == null) { throw new InvalidOperationException($"Credenziale database sorgente con ID {profile.SourceCredentialId.Value} non trovata"); } // Applica override database se specificato (per le schedulazioni) if (!string.IsNullOrEmpty(profile.SourceDatabaseName)) { sourceCredential.DatabaseName = profile.SourceDatabaseName; } var databaseManager = await _connectionFactory.CreateDatabaseManagerAsync(sourceCredential.Name); // Test connessione var canConnect = await databaseManager.TestConnectionAsync(); if (!canConnect) { throw new InvalidOperationException($"Impossibile connettersi al database sorgente: {sourceCredential.Name}"); } _logger.LogInformation("Connessione database sorgente stabilita: {CredentialName}", sourceCredential.Name); return (databaseManager, sourceCredential); } /// /// Setup della connessione destinazione REST /// private async Task<(IRestServiceClient? client, RestApiCredential? credential, RestEntitySummary? entity)> SetupDestinationConnectionAsync(DataCouplerProfile profile) { if (profile.DestinationType.ToLower() != "rest") { throw new NotSupportedException("Solo destinazioni REST sono supportate per le schedulazioni"); } if (!profile.DestinationCredentialId.HasValue) { throw new InvalidOperationException("Credenziale destinazione REST non specificata"); } var restCredential = await _dataConnectionCredentialService.GetRestApiCredentialAsync(profile.DestinationCredentialId.Value); if (restCredential == null) { throw new InvalidOperationException($"Credenziale REST con ID {profile.DestinationCredentialId.Value} non trovata"); } var restClient = await _connectionFactory.CreateRestServiceClientAsync(restCredential.Name); // Autenticazione var authResult = await restClient.AuthenticateAsync(); if (!authResult) { throw new InvalidOperationException($"Autenticazione fallita per il servizio REST: {restCredential.Name}"); } // Ottieni l'entità REST (per le schedulazioni usiamo solo il nome) RestEntitySummary? restEntity = null; if (!string.IsNullOrEmpty(profile.DestinationEndpoint)) { // Per le schedulazioni, creiamo un'entità fittizia con il nome dal profilo restEntity = new RestEntitySummary { Name = profile.DestinationEndpoint, Label = profile.DestinationEndpoint }; } if (restEntity == null) { throw new InvalidOperationException($"Entità REST non specificata nel profilo"); } _logger.LogInformation("Connessione REST destinazione stabilita: {CredentialName}, Entità: {EntityName}", restCredential.Name, restEntity.Name); return (restClient, restCredential, restEntity); } /// /// Ottiene tutti i record dalla sorgente /// private async Task>> GetAllRecordsFromSourceAsync(DataCouplerProfile profile, IDatabaseManager? databaseManager) { if (profile.SourceType.ToLower() == "database") { return await GetAllRecordsFromDatabaseAsync(profile, databaseManager!); } else if (profile.SourceType.ToLower() == "file") { return await GetAllRecordsFromFileAsync(profile); } throw new NotSupportedException($"Tipo sorgente non supportato: {profile.SourceType}"); } /// /// Parse del JSON dei field mappings /// private Dictionary ParseFieldMappings(string? fieldMappingJson) { var mappings = new Dictionary(); if (string.IsNullOrEmpty(fieldMappingJson)) { _logger.LogWarning("Field mapping JSON è vuoto o null"); return mappings; } _logger.LogDebug("Parsing field mappings JSON: {FieldMappingJson}", fieldMappingJson); try { // Usa le stesse opzioni di serializzazione del DataCouplerProfileService var options = new JsonSerializerOptions { PropertyNamingPolicy = JsonNamingPolicy.CamelCase }; var fieldMappingsList = JsonSerializer.Deserialize>(fieldMappingJson, options); if (fieldMappingsList != null) { _logger.LogInformation("Trovati {Count} field mappings nel JSON", fieldMappingsList.Count); foreach (var mapping in fieldMappingsList) { if (!string.IsNullOrEmpty(mapping.SourceField) && !string.IsNullOrEmpty(mapping.DestinationField)) { mappings[mapping.SourceField] = mapping.DestinationField; _logger.LogDebug("Mapping aggiunto: {SourceField} -> {DestinationField}", mapping.SourceField, mapping.DestinationField); } else { _logger.LogWarning("Mapping incompleto ignorato: SourceField='{SourceField}', DestinationField='{DestinationField}'", mapping.SourceField, mapping.DestinationField); } } } else { _logger.LogWarning("Deserializzazione ritornato null per field mappings JSON"); } } catch (Exception ex) { _logger.LogError(ex, "Errore nel parsing dei field mappings: {FieldMappingJson}", fieldMappingJson); } _logger.LogInformation("Mappings completati: {Count} associazioni", mappings.Count); return mappings; } /// /// Deserializza gli External ID Relationships dal JSON del profilo /// private List ParseExternalIdRelationships(string? externalIdRelationshipsJson) { var relationships = new List(); if (string.IsNullOrEmpty(externalIdRelationshipsJson)) { _logger.LogDebug("ExternalIdRelationships JSON è vuoto o null"); return relationships; } _logger.LogDebug("Parsing ExternalIdRelationships JSON: {Json}", externalIdRelationshipsJson); try { var options = new JsonSerializerOptions { PropertyNamingPolicy = JsonNamingPolicy.CamelCase }; var relationshipsList = JsonSerializer.Deserialize>(externalIdRelationshipsJson, options); if (relationshipsList != null) { relationships = relationshipsList; _logger.LogInformation("Trovati {Count} External ID Relationships nel JSON", relationships.Count); foreach (var rel in relationships) { _logger.LogDebug("External ID Relationship: {RelationshipName} - {RelatedObject}.{ExternalIdField} <- {SourceField}", rel.RelationshipName, rel.RelatedObjectName, rel.ExternalIdField, rel.SourceField); } } else { _logger.LogWarning("Deserializzazione ritornato null per ExternalIdRelationships JSON"); } } catch (Exception ex) { _logger.LogError(ex, "Errore nel parsing degli ExternalIdRelationships: {Json}", externalIdRelationshipsJson); } return relationships; } /// /// Parse del JSON dei default values /// private Dictionary ParseDefaultValues(string? defaultValuesJson) { var defaultValues = new Dictionary(); if (string.IsNullOrEmpty(defaultValuesJson)) { _logger.LogDebug("DefaultValues JSON è vuoto o null"); return defaultValues; } _logger.LogDebug("Parsing DefaultValues JSON: {Json}", defaultValuesJson); try { var options = new JsonSerializerOptions { PropertyNamingPolicy = JsonNamingPolicy.CamelCase }; var deserializedDefaults = JsonSerializer.Deserialize>(defaultValuesJson, options); if (deserializedDefaults != null) { foreach (var entry in deserializedDefaults) { defaultValues[entry.Key] = (entry.Value.Value, entry.Value.Type); _logger.LogDebug("Default value: {Field} = {Value} ({Type})", entry.Key, entry.Value.Value, entry.Value.Type); } _logger.LogInformation("Trovati {Count} default values nel JSON", defaultValues.Count); } else { _logger.LogWarning("Deserializzazione ritornato null per DefaultValues JSON"); } } catch (Exception ex) { _logger.LogError(ex, "Errore nel parsing dei default values: {Json}", defaultValuesJson); } return defaultValues; } /// /// Ottiene tutti i record dal database /// private async Task>> GetAllRecordsFromDatabaseAsync(DataCouplerProfile profile, IDatabaseManager databaseManager) { string query; if (!string.IsNullOrEmpty(profile.SourceCustomQuery)) { query = profile.SourceCustomQuery; } else if (!string.IsNullOrEmpty(profile.SourceTable)) { var tableName = !string.IsNullOrEmpty(profile.SourceSchema) ? $"[{profile.SourceSchema}].[{profile.SourceTable}]" : $"[{profile.SourceTable}]"; query = $"SELECT * FROM {tableName}"; } else { throw new InvalidOperationException("Né query custom né tabella specificata per la sorgente database"); } _logger.LogDebug("Esecuzione query sorgente: {Query}", query); var results = await databaseManager.ExecuteRawQueryAsync(query, profile.SourceDatabaseName ?? ""); return results; } /// /// Ottiene tutti i record da file CSV o Excel /// private async Task>> GetAllRecordsFromFileAsync(DataCouplerProfile profile) { if (string.IsNullOrEmpty(profile.SourceFilePath)) throw new InvalidOperationException("Percorso file sorgente non specificato"); if (!System.IO.File.Exists(profile.SourceFilePath)) throw new FileNotFoundException($"Il file '{profile.SourceFilePath}' non esiste"); var extension = System.IO.Path.GetExtension(profile.SourceFilePath).ToLowerInvariant(); _logger.LogInformation("Lettura file sorgente: {FilePath} (Tipo: {Extension})", profile.SourceFilePath, extension); if (extension == ".csv") { return await ReadCsvFileAsync(profile.SourceFilePath); } else if (extension == ".xlsx" || extension == ".xls") { return await ReadExcelFileAsync(profile.SourceFilePath); } else { throw new NotSupportedException($"Formato file non supportato: {extension}. Utilizzare .csv, .xlsx o .xls"); } } /// /// Legge un file CSV e restituisce i record come dizionari /// private async Task>> ReadCsvFileAsync(string filePath) { var dataRows = new List>(); try { // Apri in modalità condivisa per permettere ad altri processi di accedere al file using var stream = new System.IO.FileStream(filePath, System.IO.FileMode.Open, System.IO.FileAccess.Read, System.IO.FileShare.ReadWrite); using var reader = new System.IO.StreamReader(stream); var firstLine = await reader.ReadLineAsync(); if (string.IsNullOrEmpty(firstLine)) { _logger.LogWarning("Il file CSV è vuoto: {FilePath}", filePath); return dataRows; } // Detect separator automatically var separator = DetectCsvSeparator(firstLine); _logger.LogDebug("CSV separator rilevato: '{Separator}'", separator); // Parse headers (first row) var headers = ParseCsvLine(firstLine, separator); _logger.LogInformation("CSV headers: {Headers} (Totale: {Count})", string.Join(", ", headers), headers.Count); // Read data rows string? line; int rowNumber = 2; while ((line = await reader.ReadLineAsync()) != null) { if (string.IsNullOrWhiteSpace(line)) continue; var values = ParseCsvLine(line, separator); var row = new Dictionary(); for (int i = 0; i < headers.Count; i++) { var value = i < values.Count ? values[i] : ""; row[headers[i]] = string.IsNullOrEmpty(value) ? "" : value; } dataRows.Add(row); rowNumber++; } _logger.LogInformation("File CSV letto con successo: {FilePath}, Record: {RecordCount}", filePath, dataRows.Count); return dataRows; } catch (Exception ex) { _logger.LogError(ex, "Errore nella lettura del file CSV: {FilePath}", filePath); throw; } } /// /// Legge un file Excel e restituisce i record come dizionari /// private async Task>> ReadExcelFileAsync(string filePath) { var dataRows = new List>(); try { // Registra il provider di encoding per ExcelDataReader System.Text.Encoding.RegisterProvider(System.Text.CodePagesEncodingProvider.Instance); // Apri in modalità condivisa per permettere ad altri processi di accedere al file using var stream = new System.IO.FileStream(filePath, System.IO.FileMode.Open, System.IO.FileAccess.Read, System.IO.FileShare.ReadWrite); var extension = System.IO.Path.GetExtension(filePath).ToLowerInvariant(); ExcelDataReader.IExcelDataReader reader; if (extension == ".xlsx") { reader = ExcelDataReader.ExcelReaderFactory.CreateOpenXmlReader(stream); } else if (extension == ".xls") { reader = ExcelDataReader.ExcelReaderFactory.CreateBinaryReader(stream); } else { throw new NotSupportedException($"Formato Excel non supportato: {extension}"); } using (reader) { var configuration = new ExcelDataReader.ExcelDataSetConfiguration() { ConfigureDataTable = (_) => new ExcelDataReader.ExcelDataTableConfiguration() { UseHeaderRow = true } }; var dataSet = reader.AsDataSet(configuration); _logger.LogInformation("File Excel letto: {FilePath}, Fogli: {SheetCount}", filePath, dataSet.Tables.Count); // Legge il primo foglio (o tutti i fogli se necessario) if (dataSet.Tables.Count > 0) { var table = dataSet.Tables[0]; var headers = new List(); foreach (System.Data.DataColumn column in table.Columns) { headers.Add(column.ColumnName); } foreach (System.Data.DataRow dataRow in table.Rows) { var row = new Dictionary(); foreach (var header in headers) { var value = dataRow[header]; row[header] = value == DBNull.Value ? "" : value?.ToString() ?? ""; } dataRows.Add(row); } _logger.LogInformation("Foglio Excel '{SheetName}' letto con successo: {RecordCount} record", table.TableName, dataRows.Count); } } await Task.CompletedTask; // Per mantenere il metodo async return dataRows; } catch (Exception ex) { _logger.LogError(ex, "Errore nella lettura del file Excel: {FilePath}", filePath); throw; } } /// /// Rileva automaticamente il separatore CSV /// private char DetectCsvSeparator(string line) { var separators = new[] { ',', ';', '\t', '|' }; var maxCount = 0; var detectedSeparator = ','; foreach (var sep in separators) { var count = line.Count(c => c == sep); if (count > maxCount) { maxCount = count; detectedSeparator = sep; } } return detectedSeparator; } /// /// Parse di una riga CSV gestendo correttamente le virgolette /// private List ParseCsvLine(string line, char separator = ',') { var result = new List(); var current = new System.Text.StringBuilder(); bool inQuotes = false; for (int i = 0; i < line.Length; i++) { char c = line[i]; if (c == '"') { if (inQuotes && i + 1 < line.Length && line[i + 1] == '"') { current.Append('"'); i++; } else { inQuotes = !inQuotes; } } else if (c == separator && !inQuotes) { result.Add(current.ToString().Trim()); current.Clear(); } else { current.Append(c); } } result.Add(current.ToString().Trim()); return result; } /// /// Metodo principale per trasferimento dati standard (non Salesforce Composite) /// private async Task ExecuteDataTransferStandardAsync( DataCouplerProfile profile, IEnumerable> sourceRecords, IRestServiceClient restClient, RestEntitySummary restEntity, RestApiCredential restCredential, Dictionary fieldMappings, Dictionary defaultValues, List externalIdRelationships, bool enableDeletionSync = false) { _logger.LogInformation("Iniziando trasferimento dati standard per {RecordCount} record - DeletionSync: {DeletionSync}", sourceRecords.Count(), enableDeletionSync); int successCount = 0; int errorCount = 0; int recordNumber = 1; foreach (var record in sourceRecords) { try { // 1. Trasforma il record utilizzando i field mappings, default values e External ID Relationships var restData = TransformRecordForRest(record, fieldMappings, defaultValues, externalIdRelationships); // 2. Gestione associazioni record se abilitata string? entityId = null; if (profile.UseRecordAssociations && !string.IsNullOrEmpty(profile.SourceKeyField)) { entityId = await HandleRecordAssociation(profile, record, restClient, restEntity, restCredential, restData, fieldMappings); } // 3. Se non abbiamo un ID esistente, crea un nuovo record if (string.IsNullOrEmpty(entityId)) { var result = await restClient.CreateEntityAsync(restEntity.Name, restData); if (result != null) { // Estrai l'ID dal risultato (diversi sistemi REST usano campi diversi) entityId = result.ContainsKey("id") ? result["id"]?.ToString() : result.ContainsKey("Id") ? result["Id"]?.ToString() : result.ContainsKey("DocEntry") ? result["DocEntry"]?.ToString() : null; successCount++; _logger.LogDebug("Record {RecordNumber} creato con successo. ID: {EntityId}", recordNumber, entityId); } else { errorCount++; _logger.LogWarning("Errore nella creazione del record {RecordNumber}: risultato null", recordNumber); } } else { successCount++; // Record aggiornato tramite associazione } // 4. Salva associazione se abilitata e abbiamo un ID if (profile.UseRecordAssociations && !string.IsNullOrEmpty(profile.SourceKeyField) && !string.IsNullOrEmpty(entityId)) { await SaveRecordAssociation(profile, record, restEntity, restCredential, restData, entityId, fieldMappings); } } catch (Exception ex) { errorCount++; _logger.LogError(ex, "Errore nel trasferimento del record {RecordNumber}", recordNumber); } recordNumber++; } _logger.LogInformation("Trasferimento completato. Successi: {SuccessCount}, Errori: {ErrorCount}", successCount, errorCount); // Sincronizzazione cancellazioni (se abilitata) if (enableDeletionSync && profile.UseRecordAssociations && !string.IsNullOrEmpty(profile.SourceKeyField)) { try { _logger.LogInformation("SCHEDULED: Inizio sincronizzazione cancellazioni..."); // Estrai tutti i valori chiave presenti nella sorgente var sourceKeyValues = sourceRecords .Select(r => r.ContainsKey(profile.SourceKeyField) ? r[profile.SourceKeyField]?.ToString() : null) .Where(k => !string.IsNullOrEmpty(k)) .Cast() .Distinct() .ToList(); _logger.LogInformation("SCHEDULED: Trovati {Count} valori chiave nella sorgente", sourceKeyValues.Count); // Sincronizza le cancellazioni var deletionOptions = new DeletionSyncOptions { Action = DeletionAction.Delete // Default: elimina fisicamente }; var deletionResult = await _deletionSyncService.SyncDeletionsAsync( sourceKeyValues, restEntity.Name, restCredential.Name, restClient, deletionOptions); if (deletionResult.DeletedRecordsDetected > 0) { _logger.LogInformation("SCHEDULED: Sincronizzazione cancellazioni completata - {Detected} rilevati, {Synced} sincronizzati, {Errors} errori", deletionResult.DeletedRecordsDetected, deletionResult.DeletedRecordsSynced, deletionResult.SyncErrors); } } catch (Exception delEx) { _logger.LogError(delEx, "SCHEDULED: Errore durante la sincronizzazione delle cancellazioni"); } } return successCount; } /// /// Implementazione completa del trasferimento con Salesforce Composite API /// Equivalente a StartDataTransferWithComposite del DataCoupler.razor.cs /// private async Task ExecuteDataTransferWithCompositeAsync( DataCouplerProfile profile, IEnumerable> sourceRecords, IRestServiceClient restClient, RestEntitySummary restEntity, RestApiCredential restCredential, Dictionary fieldMappings, Dictionary defaultValues, List externalIdRelationships, bool enableDeletionSync = false) { _logger.LogInformation("Iniziando trasferimento dati COMPOSITE per {RecordCount} record - DeletionSync: {DeletionSync}", sourceRecords.Count(), enableDeletionSync); // Verifica che sia effettivamente un SalesforceServiceClient if (!(restClient is DataConnection.REST.Implementations.SalesforceServiceClient salesforceClient)) { _logger.LogWarning("Client REST non è SalesforceServiceClient, fallback al metodo standard"); return await ExecuteDataTransferStandardAsync(profile, sourceRecords, restClient, restEntity, restCredential, fieldMappings, defaultValues, externalIdRelationships, enableDeletionSync); } try { // 1. Trasforma i record e analizza le associazioni IN PARALLELO var recordsForCreate = new ConcurrentBag<(Dictionary transformedData, Dictionary originalRecord, int recordNumber)>(); var recordsForUpdate = new ConcurrentBag<(Dictionary transformedData, string entityId, Dictionary originalRecord, int recordNumber, string newDataHash)>(); var recordsSkipped = new ConcurrentBag<(Dictionary originalRecord, int recordNumber, string reason)>(); var recordErrors = new ConcurrentBag<(Dictionary originalRecord, int recordNumber, string error)>(); // Cattura i valori condivisi per evitare race conditions var currentEntityName = restEntity.Name; var currentCredentialName = restCredential.Name; // Usa il nome della credenziale, non l'ID var currentUseRecordAssociations = profile.UseRecordAssociations; // Crea lista indicizzata per mantenere il record number var indexedRecords = sourceRecords.Select((record, index) => new { Record = record, RecordNumber = index + 1 }).ToList(); _logger.LogInformation("COMPOSITE SCHEDULED: Inizio analisi di {RecordCount} record", indexedRecords.Count); var analysisStartTime = DateTime.UtcNow; // === STEP A: Bulk Pre-Discovery (1 query SQLite + poche SOQL IN invece di 2N+N) === // Pre-calcolo locale: source key per ogni record (operazione thread-safe) var sourceKeyByRecordIndex = new Dictionary(indexedRecords.Count); foreach (var idx in indexedRecords) { var key = GenerateSourceKey(idx.Record, profile.SourceKeyField); if (!string.IsNullOrEmpty(key)) sourceKeyByRecordIndex[idx.RecordNumber] = key; } Dictionary associationsByKey = new(StringComparer.Ordinal); if (currentUseRecordAssociations && !string.IsNullOrEmpty(profile.SourceKeyField) && sourceKeyByRecordIndex.Count > 0) { var commonRequest = new PreDiscoveryRequest { SourceKeyField = profile.SourceKeyField, DestinationEntity = currentEntityName, CredentialName = currentCredentialName, DestinationKeyField = "Id", FieldMappings = fieldMappings, RestClient = restClient, EnablePreDiscovery = true, UseParallelMethod = true, IsScheduledTransfer = true, SourceType = profile.SourceType }; associationsByKey = await _associationService.BatchFindOrCreateAssociationsAsync( sourceKeyByRecordIndex.Values, commonRequest); _logger.LogInformation("COMPOSITE SCHEDULED: Bulk Pre-Discovery completata - {Found}/{Total} associazioni risolte", associationsByKey.Count, sourceKeyByRecordIndex.Count); } // === STEP B: Analisi locale parallela per decidere create/update/skip === // Nessuna chiamata DB o REST in questo loop — solo memoria. var processingTasks = indexedRecords.Select(async indexedRecord => { try { var record = indexedRecord.Record; var recordNumber = indexedRecord.RecordNumber; // Trasforma il record in base ai mapping e External ID Relationships (operazione locale, thread-safe) var restData = TransformRecordForRest(record, fieldMappings, defaultValues, externalIdRelationships); // Genera la chiave sorgente e l'hash dei dati per questo record (include MAPPING_SIGNATURE) var sourceKey = GenerateSourceKey(record, profile.SourceKeyField); var currentDataHash = GenerateDataHash(restData, fieldMappings); // Analizza le associazioni per capire se aggiornare, creare o saltare if (currentUseRecordAssociations && !string.IsNullOrEmpty(sourceKey)) { associationsByKey.TryGetValue(sourceKey, out var existingAssociation); if (existingAssociation != null && existingAssociation.IsActive) { // 🔍 PRE-DISCOVERY: Usa il servizio per verificare se è un'associazione Pre-Discovery var isPreDiscoveryAssociation = _associationService.IsPreDiscoveryAssociation(existingAssociation); // Se l'associazione è stata appena creata dal Pre-Discovery, FORZA l'aggiornamento if (isPreDiscoveryAssociation) { // Forza aggiornamento senza controllo hash recordsForUpdate.Add((restData, existingAssociation.DestinationId, record, recordNumber, currentDataHash)); _logger.LogInformation("COMPOSITE SCHEDULED: Record {RecordNumber} marcato per AGGIORNAMENTO FORZATO (Pre-Discovery) - EntityId: {EntityId}", recordNumber, existingAssociation.DestinationId); } else { // CONTROLLO HASH: Verifica se i dati sono cambiati (solo per associazioni esistenti) var existingHash = existingAssociation.Data_Hash; if (!string.IsNullOrEmpty(existingHash) && existingHash.Equals(currentDataHash, StringComparison.OrdinalIgnoreCase)) { // I dati non sono cambiati, salta questo record recordsSkipped.Add((record, recordNumber, "Dati non modificati (hash identico)")); _logger.LogDebug("COMPOSITE SCHEDULED: Record {RecordNumber} saltato - hash identico: {Hash}", recordNumber, currentDataHash); } else { // I dati sono cambiati o l'hash è vuoto, procedi con l'aggiornamento recordsForUpdate.Add((restData, existingAssociation.DestinationId, record, recordNumber, currentDataHash)); _logger.LogDebug("COMPOSITE SCHEDULED: Record {RecordNumber} marcato per aggiornamento (EntityId: {EntityId}) - hash diverso: old={OldHash}, new={NewHash}", recordNumber, existingAssociation.DestinationId, existingHash ?? "NULL", currentDataHash); } } } else { // Record da creare (nessuna associazione esistente) recordsForCreate.Add((restData, record, recordNumber)); _logger.LogDebug("COMPOSITE SCHEDULED: Record {RecordNumber} marcato per creazione", recordNumber); } } else { // Record da creare (no associazioni) recordsForCreate.Add((restData, record, recordNumber)); _logger.LogDebug("COMPOSITE SCHEDULED: Record {RecordNumber} marcato per creazione (no associazioni)", recordNumber); } } catch (Exception ex) { _logger.LogError(ex, "COMPOSITE SCHEDULED: Errore nella trasformazione del record {RecordNumber}", indexedRecord.RecordNumber); recordErrors.Add((indexedRecord.Record, indexedRecord.RecordNumber, ex.Message)); } }); // Attendi il completamento di tutte le operazioni parallele await Task.WhenAll(processingTasks); var analysisEndTime = DateTime.UtcNow; var analysisElapsed = (analysisEndTime - analysisStartTime).TotalMilliseconds; // Converti i ConcurrentBag in liste per il resto del processing var finalRecordsForCreate = recordsForCreate.ToList(); var finalRecordsForUpdate = recordsForUpdate.ToList(); var finalRecordsSkipped = recordsSkipped.ToList(); var finalRecordErrors = recordErrors.ToList(); _logger.LogInformation("COMPOSITE SCHEDULED: Analisi parallela completata in {ElapsedMs}ms - {CreateCount} record da creare, {UpdateCount} record da aggiornare, {SkippedCount} record saltati, {ErrorCount} errori", analysisElapsed, finalRecordsForCreate.Count, finalRecordsForUpdate.Count, finalRecordsSkipped.Count, finalRecordErrors.Count); // 2. Esegui le chiamate composite in parallelo var createTask = Task.FromResult(new List()); var updateTask = Task.FromResult(new List()); if (finalRecordsForCreate.Any()) { var createData = finalRecordsForCreate.Select(r => r.transformedData).ToList(); createTask = salesforceClient.BatchCreateEntitiesAsync(restEntity.Name, createData); } if (finalRecordsForUpdate.Any()) { var updateData = finalRecordsForUpdate.ToDictionary( r => r.entityId, r => r.transformedData); updateTask = salesforceClient.BatchUpdateEntitiesAsync(restEntity.Name, updateData); } // Attendi entrambe le operazioni await Task.WhenAll(createTask, updateTask); var createResults = await createTask; var updateResults = await updateTask; // 3. Processa i risultati delle creazioni int successCount = 0; int errorCount = finalRecordErrors.Count; int updatedCount = 0; // Lista per raccogliere le task di creazione associazioni var createAssociationTasks = new List(); for (int i = 0; i < createResults.Count; i++) { var result = createResults[i]; var originalData = finalRecordsForCreate[i]; if (result.Success) { successCount++; _logger.LogDebug("COMPOSITE SCHEDULED: Record {RecordNumber} creato con successo. EntityId: {EntityId}", originalData.recordNumber, result.EntityId); // Aggiungi task di creazione associazione alla lista (esecuzione parallela) if (currentUseRecordAssociations && !string.IsNullOrEmpty(result.EntityId)) { var dataHashForAssociation = GenerateDataHash(originalData.transformedData, fieldMappings); var associationTask = CreateAssociationAsync(profile, originalData.originalRecord, restCredential, result.EntityId, originalData.recordNumber, dataHashForAssociation); createAssociationTasks.Add(associationTask); } } else { errorCount++; _logger.LogError("COMPOSITE SCHEDULED: Errore creazione record {RecordNumber}: {Error}", originalData.recordNumber, result.ErrorMessage); } } // 4. Processa i risultati degli aggiornamenti var updateAssociationTasks = new List(); for (int i = 0; i < updateResults.Count; i++) { var result = updateResults[i]; var originalData = finalRecordsForUpdate[i]; if (result.Success) { updatedCount++; _logger.LogDebug("COMPOSITE SCHEDULED: Record {RecordNumber} aggiornato con successo. EntityId: {EntityId}", originalData.recordNumber, result.EntityId); // Aggiungi task di aggiornamento associazione alla lista (esecuzione parallela) if (currentUseRecordAssociations && !string.IsNullOrEmpty(result.EntityId)) { var updateHashTask = UpdateAssociationHashAsync(profile, originalData.originalRecord, restCredential, result.EntityId, originalData.newDataHash); updateAssociationTasks.Add(updateHashTask); } } else { errorCount++; _logger.LogError("COMPOSITE SCHEDULED: Errore aggiornamento record {RecordNumber}: {Error}", originalData.recordNumber, result.ErrorMessage); } } // 5. Esegui tutte le operazioni di associazione in parallelo var allAssociationTasks = createAssociationTasks.Concat(updateAssociationTasks).ToList(); if (allAssociationTasks.Any()) { _logger.LogInformation("COMPOSITE SCHEDULED: Avvio di {TaskCount} operazioni di associazione in parallelo ({CreateCount} creazioni, {UpdateCount} aggiornamenti)", allAssociationTasks.Count, createAssociationTasks.Count, updateAssociationTasks.Count); var startTime = DateTime.UtcNow; await Task.WhenAll(allAssociationTasks); var endTime = DateTime.UtcNow; _logger.LogInformation("COMPOSITE SCHEDULED: Operazioni di associazione completate in {ElapsedMs}ms", (endTime - startTime).TotalMilliseconds); } var skippedCount = finalRecordsSkipped.Count; var totalProcessed = successCount + updatedCount; _logger.LogInformation("COMPOSITE SCHEDULED: Trasferimento completato. Creazioni: {SuccessCount}, Aggiornamenti: {UpdatedCount}, Saltati: {SkippedCount}, Errori: {ErrorCount}", successCount, updatedCount, skippedCount, errorCount); // Sincronizzazione cancellazioni (se abilitata) if (enableDeletionSync && currentUseRecordAssociations && !string.IsNullOrEmpty(profile.SourceKeyField)) { try { _logger.LogInformation("COMPOSITE SCHEDULED: Inizio sincronizzazione cancellazioni..."); // Estrai tutti i valori chiave presenti nella sorgente var sourceKeyValues = sourceRecords .Select(r => r.ContainsKey(profile.SourceKeyField) ? r[profile.SourceKeyField]?.ToString() : null) .Where(k => !string.IsNullOrEmpty(k)) .Cast() .Distinct() .ToList(); _logger.LogInformation("COMPOSITE SCHEDULED: Trovati {Count} valori chiave nella sorgente", sourceKeyValues.Count); // Sincronizza le cancellazioni var deletionOptions = new DeletionSyncOptions { Action = DeletionAction.Delete // Default: elimina fisicamente }; var deletionResult = await _deletionSyncService.SyncDeletionsAsync( sourceKeyValues, currentEntityName, currentCredentialName, restClient, deletionOptions); if (deletionResult.DeletedRecordsDetected > 0) { _logger.LogInformation("COMPOSITE SCHEDULED: Sincronizzazione cancellazioni completata - {Detected} rilevati, {Synced} sincronizzati, {Errors} errori", deletionResult.DeletedRecordsDetected, deletionResult.DeletedRecordsSynced, deletionResult.SyncErrors); } } catch (Exception delEx) { _logger.LogError(delEx, "COMPOSITE SCHEDULED: Errore durante la sincronizzazione delle cancellazioni"); } } return totalProcessed; } catch (Exception ex) { _logger.LogError(ex, "COMPOSITE SCHEDULED: Errore generale nel trasferimento dati"); throw; } } #region Helper Methods /// /// Trasforma un record sorgente in formato REST utilizzando i field mappings /// private Dictionary TransformRecordForRest( Dictionary sourceRecord, Dictionary fieldMappings, Dictionary defaultValues, List? externalIdRelationships = null) { var restData = new Dictionary(); // Costruisce un set dei campi sorgente usati esclusivamente come External ID Relationship: // questi NON devono essere inviati anche come mapping normale (stessa logica della UI manuale). var externalIdSourceFields = (externalIdRelationships != null) ? externalIdRelationships .Where(r => !string.IsNullOrWhiteSpace(r.SourceField)) .Select(r => r.SourceField) .ToHashSet() : new HashSet(); // 1. Applica field mappings (escludendo i campi sorgente usati per External ID Relationships) foreach (var mapping in fieldMappings) { // Salta il campo se è usato come sorgente in un External ID Relationship if (externalIdSourceFields.Contains(mapping.Key)) { _logger.LogDebug("Campo sorgente '{SourceField}' usato in External ID Relationship, escluso dal mapping normale", mapping.Key); continue; } if (sourceRecord.ContainsKey(mapping.Key)) { var value = sourceRecord[mapping.Key]; // Trasforma il valore se necessario var transformedValue = TransformValueForRest(value); if (transformedValue != null) { restData[mapping.Value] = transformedValue; } } } // 2. Applica default values (solo se il campo non è già stato mappato) foreach (var defaultValue in defaultValues) { if (!restData.ContainsKey(defaultValue.Key)) { var (value, type) = defaultValue.Value; if (value != null) { restData[defaultValue.Key] = value; _logger.LogDebug("Applicato default value: {Field} = {Value} ({Type})", defaultValue.Key, value, type); } } } // 3. Aggiungi External ID Relationships (per Salesforce) if (externalIdRelationships != null && externalIdRelationships.Any()) { foreach (var relationship in externalIdRelationships) { if (!string.IsNullOrWhiteSpace(relationship.SourceField) && sourceRecord.ContainsKey(relationship.SourceField)) { var sourceValue = sourceRecord[relationship.SourceField]; var transformedValue = TransformValueForRest(sourceValue); if (transformedValue != null) { // Crea il dizionario annidato per l'External ID Relationship // Formato: { "Account__r": { "Country__c": "US" } } var externalIdObject = new Dictionary { { relationship.ExternalIdField, transformedValue } }; restData[relationship.RelationshipName] = externalIdObject; _logger.LogDebug("Aggiunta External ID Relationship: {RelationshipName} → {ExternalIdField} = {Value}", relationship.RelationshipName, relationship.ExternalIdField, transformedValue); } } } } return restData; } /// /// Trasforma un valore per il formato REST /// private object? TransformValueForRest(object? value) { if (value == null || value == DBNull.Value) return null; // Trasformazioni specifiche per tipo if (value is DateTime dateTime) { return dateTime.ToString("yyyy-MM-ddTHH:mm:ss.fffZ", CultureInfo.InvariantCulture); } if (value is decimal dec) { return dec.ToString(CultureInfo.InvariantCulture); } if (value is double dbl) { return dbl.ToString(CultureInfo.InvariantCulture); } if (value is float flt) { return flt.ToString(CultureInfo.InvariantCulture); } return value; } /// /// Gestisce l'associazione dei record per evitare duplicati /// private async Task HandleRecordAssociation( DataCouplerProfile profile, Dictionary sourceRecord, IRestServiceClient restClient, RestEntitySummary restEntity, RestApiCredential restCredential, Dictionary restData, Dictionary fieldMappings) { if (string.IsNullOrEmpty(profile.SourceKeyField) || !sourceRecord.ContainsKey(profile.SourceKeyField)) return null; var sourceKey = sourceRecord[profile.SourceKeyField]?.ToString(); if (string.IsNullOrEmpty(sourceKey)) return null; try { // Cerca associazione esistente usando il metodo parallelo var existingAssociation = await _dataConnectionCredentialService.FindKeyAssociationByValueParallelAsync( sourceKey, restEntity.Name, restCredential.Name); if (existingAssociation != null && existingAssociation.IsActive) { // Genera l'hash dei dati correnti (solo dati trasformati/mappati, include MAPPING_SIGNATURE) var currentDataHash = GenerateDataHash(restData, fieldMappings); var existingHash = existingAssociation.Data_Hash; // Verifica se i dati sono cambiati if (!string.IsNullOrEmpty(existingHash) && existingHash.Equals(currentDataHash, StringComparison.OrdinalIgnoreCase)) { // I dati non sono cambiati, salta l'aggiornamento ma aggiorna LastVerifiedAt existingAssociation.LastVerifiedAt = DateTime.Now; await _dataConnectionCredentialService.UpdateKeyAssociationAsync(existingAssociation); _logger.LogDebug("Record non modificato, aggiornamento saltato. KeyValue: {KeyValue}, EntityId: {EntityId}, Hash: {Hash}", sourceKey, existingAssociation.DestinationId, currentDataHash); return existingAssociation.DestinationId; } // I dati sono cambiati, procedi con l'aggiornamento var updateResult = await restClient.UpdateEntityAsync(restEntity.Name, existingAssociation.DestinationId, restData); if (updateResult != null) { // Aggiorna l'hash e LastVerifiedAt nell'associazione existingAssociation.Data_Hash = currentDataHash; existingAssociation.UpdatedAt = DateTime.Now; existingAssociation.LastVerifiedAt = DateTime.Now; await _dataConnectionCredentialService.UpdateKeyAssociationAsync(existingAssociation); _logger.LogDebug("Record aggiornato tramite associazione. KeyValue: {KeyValue}, EntityId: {EntityId}, NewHash: {Hash}", sourceKey, existingAssociation.DestinationId, currentDataHash); return existingAssociation.DestinationId; } else { _logger.LogWarning("Fallimento aggiornamento record associato. KeyValue: {KeyValue}, EntityId: {EntityId}", sourceKey, existingAssociation.DestinationId); } } } catch (Exception ex) { _logger.LogError(ex, "Errore nella gestione dell'associazione per KeyValue: {KeyValue}", sourceKey); } return null; } /// /// Salva l'associazione tra record sorgente e destinazione /// private async Task SaveRecordAssociation( DataCouplerProfile profile, Dictionary sourceRecord, RestEntitySummary restEntity, RestApiCredential restCredential, Dictionary restData, string entityId, Dictionary fieldMappings) { if (string.IsNullOrEmpty(profile.SourceKeyField) || !sourceRecord.ContainsKey(profile.SourceKeyField)) return; var sourceKey = sourceRecord[profile.SourceKeyField]?.ToString(); if (string.IsNullOrEmpty(sourceKey)) return; try { // Genera l'hash dei dati per il controllo delle modifiche (solo dati trasformati/mappati, include MAPPING_SIGNATURE) var dataHash = GenerateDataHash(restData, fieldMappings); var association = new KeyAssociation { KeyValue = sourceKey, SourceKeyField = profile.SourceKeyField, DestinationKeyField = "Id", // Assumiamo "Id" come campo di default DestinationId = entityId, DestinationEntity = restEntity.Name, RestCredentialName = restCredential.Name, // Usa il nome della credenziale IsActive = true, Data_Hash = dataHash, CreatedAt = DateTime.Now, UpdatedAt = DateTime.Now, LastVerifiedAt = DateTime.Now, AdditionalInfo = JsonSerializer.Serialize(new { TransferDate = DateTime.Now, SourceType = profile.SourceType, DestinationType = profile.DestinationType, ProfileName = profile.Name, ScheduledTransfer = true, StandardTransfer = true, DataHashGenerated = true }) }; await _dataConnectionCredentialService.SaveKeyAssociationParallelAsync(association); _logger.LogDebug("Associazione salvata. KeyValue: {KeyValue}, EntityId: {EntityId}, Hash: {Hash}", sourceKey, entityId, dataHash); } catch (Exception ex) { _logger.LogError(ex, "Errore nel salvaggio dell'associazione per KeyValue: {KeyValue}", sourceKey); } } /// /// Genera la chiave sorgente per un record /// private string GenerateSourceKey(Dictionary record, string? sourceKeyField) { if (string.IsNullOrEmpty(sourceKeyField) || !record.ContainsKey(sourceKeyField)) return string.Empty; var keyValue = record[sourceKeyField]; return keyValue?.ToString() ?? string.Empty; } /// /// Genera un hash SHA256 dei dati del record passato come parametro. /// Utilizzato per rilevare cambiamenti nei dati e ottimizzare il trasferimento. /// Calcola l'hash SOLO sui campi presenti nel record, in ordine alfabetico. /// DEVE essere identico al metodo in DataCoupler.razor.cs per garantire consistenza. /// private string GenerateDataHash(Dictionary record, Dictionary? fieldMappings = null) { try { var valuesForHash = new List(); // Ordina le chiavi alfabeticamente per garantire consistenza var orderedKeys = record.Keys.OrderBy(k => k).ToList(); // Aggiungi i valori dei dati per ogni campo presente nel record foreach (var key in orderedKeys) { var value = record[key]; var normalizedValue = value?.ToString()?.Trim() ?? ""; valuesForHash.Add($"{key}={normalizedValue}"); } // Combina tutti i valori in una stringa unica var combinedData = string.Join("|", valuesForHash); _logger.LogDebug("Hash dei dati generato da: {CombinedData}", combinedData); // Calcola l'hash SHA256 (stesso algoritmo di DataCoupler.razor.cs) using (var sha256 = System.Security.Cryptography.SHA256.Create()) { var hashBytes = sha256.ComputeHash(Encoding.UTF8.GetBytes(combinedData)); var hashString = Convert.ToHexString(hashBytes); _logger.LogDebug("Hash SHA256 generato: {Hash} per {FieldCount} campi", hashString, orderedKeys.Count); return hashString; } } catch (Exception ex) { _logger.LogWarning(ex, "Errore nella generazione dell'hash per il record"); return string.Empty; } } /// /// Crea una nuova associazione record in modo asincrono /// private async Task CreateAssociationAsync(DataCouplerProfile profile, Dictionary originalRecord, RestApiCredential restCredential, string entityId, int recordNumber, string dataHash) { try { if (string.IsNullOrEmpty(profile.SourceKeyField) || !originalRecord.ContainsKey(profile.SourceKeyField)) return; var sourceKey = originalRecord[profile.SourceKeyField]?.ToString(); if (string.IsNullOrEmpty(sourceKey)) return; // Calcola il MappingCount in modo sicuro e trova il campo destinazione mappato al campo chiave sorgente int mappingCount = 0; string? mappedDestinationField = null; try { if (!string.IsNullOrEmpty(profile.FieldMappingJson)) { var mappings = ParseFieldMappings(profile.FieldMappingJson); mappingCount = mappings?.Count ?? 0; // Cerca il campo destinazione mappato al campo chiave sorgente if (mappings != null && !string.IsNullOrEmpty(profile.SourceKeyField)) { if (mappings.TryGetValue(profile.SourceKeyField, out var destinationFieldName)) { mappedDestinationField = destinationFieldName; _logger.LogDebug("SCHEDULED MAPPING: Campo sorgente '{SourceField}' è mappato al campo destinazione '{DestField}'", profile.SourceKeyField, mappedDestinationField); } else { _logger.LogWarning("SCHEDULED MAPPING: Campo chiave sorgente '{SourceKeyField}' NON trovato nei mappings del profilo {ProfileName}", profile.SourceKeyField, profile.Name); } } } } catch (Exception ex) { _logger.LogWarning(ex, "Errore nel calcolo del MappingCount per l'associazione del record {RecordNumber}", recordNumber); } var association = new KeyAssociation { KeyValue = sourceKey, SourceKeyField = profile.SourceKeyField ?? "", DestinationKeyField = "Id", // Campo ID standard per REST MappedDestinationField = mappedDestinationField, // Campo destinazione mappato al campo chiave sorgente DestinationEntity = profile.DestinationEndpoint ?? "", DestinationId = entityId, RestCredentialName = restCredential.Name, // Usa il nome della credenziale IsActive = true, Data_Hash = dataHash, CreatedAt = DateTime.Now, UpdatedAt = DateTime.Now, LastVerifiedAt = DateTime.Now, AdditionalInfo = JsonSerializer.Serialize(new { TransferDate = DateTime.Now, RecordNumber = recordNumber, MappingCount = mappingCount, SourceType = profile.SourceType, DestinationType = profile.DestinationType, ProfileName = profile.Name, ScheduledTransfer = true, CompositeTransfer = true, DataHashGenerated = true }) }; var associationId = await _dataConnectionCredentialService.SaveKeyAssociationParallelAsync(association); _logger.LogDebug("COMPOSITE SCHEDULED: Associazione creata con ID: {AssociationId} per record {RecordNumber} - Key: {SourceKey}, EntityId: {EntityId}, Hash: {Hash}, MappedField: {MappedField}", associationId, recordNumber, sourceKey, entityId, dataHash, mappedDestinationField ?? "N/A"); } catch (Exception ex) { _logger.LogError(ex, "COMPOSITE SCHEDULED: Errore nella creazione dell'associazione per il record {RecordNumber}", recordNumber); } } /// /// Aggiorna l'hash di un'associazione esistente /// private async Task UpdateAssociationHashAsync(DataCouplerProfile profile, Dictionary originalRecord, RestApiCredential restCredential, string entityId, string newDataHash) { try { if (string.IsNullOrEmpty(profile.SourceKeyField) || !originalRecord.ContainsKey(profile.SourceKeyField)) return; var sourceKey = originalRecord[profile.SourceKeyField]?.ToString(); if (string.IsNullOrEmpty(sourceKey)) return; // Trova l'associazione esistente usando il metodo parallelo var existingAssociation = await _dataConnectionCredentialService.FindKeyAssociationByValueParallelAsync( sourceKey, profile.DestinationEndpoint ?? "", restCredential.Name); if (existingAssociation != null) { existingAssociation.Data_Hash = newDataHash; existingAssociation.UpdatedAt = DateTime.Now; existingAssociation.LastVerifiedAt = DateTime.Now; // Usa UpdateKeyAssociationAsync per l'aggiornamento invece di SaveKeyAssociationParallelAsync await _dataConnectionCredentialService.UpdateKeyAssociationAsync(existingAssociation); _logger.LogDebug("COMPOSITE SCHEDULED: Hash associazione aggiornato per entityId {EntityId} - Key: {SourceKey}, NewHash: {Hash}", entityId, sourceKey, newDataHash); } else { _logger.LogWarning("COMPOSITE SCHEDULED: Associazione non trovata per aggiornamento hash - EntityId: {EntityId}, SourceKey: {SourceKey}", entityId, sourceKey); } } catch (Exception ex) { _logger.LogError(ex, "COMPOSITE SCHEDULED: Errore nell'aggiornamento dell'hash per EntityId: {EntityId}", entityId); } } #endregion }