using CredentialManager.Services;
using CredentialManager.Models;
using DataConnection.Interfaces;
using DataConnection.REST.Interfaces;
using DataConnection.REST.Models;
using DataConnection.CredentialManagement.Interfaces;
using DataConnection.CredentialManagement.Models;
using Microsoft.Extensions.Logging;
using System.Collections.Concurrent;
using System.Globalization;
using System.Security.Cryptography;
using System.Text;
using System.Text.Json;
using Data_Coupler.Models;
using Data_Coupler.Services;
using ExcelDataReader;
namespace Data_Coupler.Services;
///
/// Servizio per l'esecuzione di profili Data Coupler schedulati con logica completa di trasferimento dati
/// Implementa la stessa logica avanzata di StartDataTransferWithComposite per le schedulazioni
///
public class ScheduledProfileExecutionService : IScheduledProfileExecutionService
{
private readonly IDataCouplerProfileService _profileService;
private readonly IDataConnectionFactory _connectionFactory;
private readonly ICredentialService _credentialService;
private readonly IDataConnectionCredentialService _dataConnectionCredentialService;
private readonly IKeyAssociationService _keyAssociationService;
private readonly IAssociationService _associationService;
private readonly IDeletionSyncService _deletionSyncService;
private readonly ILogger _logger;
public ScheduledProfileExecutionService(
IDataCouplerProfileService profileService,
IDataConnectionFactory connectionFactory,
ICredentialService credentialService,
IDataConnectionCredentialService dataConnectionCredentialService,
IKeyAssociationService keyAssociationService,
IAssociationService associationService,
IDeletionSyncService deletionSyncService,
ILogger logger)
{
_profileService = profileService;
_connectionFactory = connectionFactory;
_credentialService = credentialService;
_dataConnectionCredentialService = dataConnectionCredentialService;
_keyAssociationService = keyAssociationService;
_associationService = associationService;
_deletionSyncService = deletionSyncService;
_logger = logger;
}
///
/// Esegue un profilo Data Coupler specificato dall'ID
///
public async Task ExecuteProfileAsync(int profileId)
{
return await ExecuteProfileAsync(profileId, enableDeletionSync: false);
}
///
/// Esegue un profilo Data Coupler specificato dall'ID con configurazione sincronizzazione eliminazioni
///
public async Task ExecuteProfileAsync(int profileId, bool enableDeletionSync)
{
var startTime = DateTime.UtcNow;
var result = new ProfileExecutionResult
{
ExecutionTime = startTime // Ora locale per coerenza con l'interfaccia utente
};
try
{
_logger.LogInformation("Inizio esecuzione profilo schedulato ID: {ProfileId} - DeletionSync: {DeletionSync}", profileId, enableDeletionSync);
// Carica il profilo
var profile = await _profileService.GetProfileByIdAsync(profileId);
if (profile == null)
{
result.Success = false;
result.Message = $"Profilo con ID {profileId} non trovato";
return result;
}
_logger.LogInformation("Esecuzione profilo: {ProfileName}", profile.Name);
// Aggiorna la data di ultimo utilizzo
await _profileService.UpdateLastUsedAsync(profile.Id);
// Esegue il trasferimento dati con la logica completa
var recordsTransferred = await ExecuteDataTransferAsync(profile, enableDeletionSync);
result.Success = true;
result.RecordsProcessed = recordsTransferred;
result.Message = $"Trasferimento completato con successo. {recordsTransferred} record elaborati.";
_logger.LogInformation("Profilo {ProfileName} eseguito con successo. Record elaborati: {RecordCount}",
profile.Name, recordsTransferred);
}
catch (Exception ex)
{
result.Success = false;
result.Message = $"Errore durante l'esecuzione: {ex.Message}";
_logger.LogError(ex, "Errore durante l'esecuzione del profilo ID: {ProfileId}", profileId);
}
finally
{
result.Duration = DateTime.UtcNow - startTime;
}
return result;
}
///
/// Metodo principale per l'esecuzione del trasferimento dati
/// Implementa la stessa logica di StartDataTransferWithComposite
///
private async Task ExecuteDataTransferAsync(DataCouplerProfile profile, bool enableDeletionSync = false)
{
_logger.LogInformation("=== INIZIO TRASFERIMENTO DATI SCHEDULATO ===");
_logger.LogInformation("Esecuzione profilo: {ProfileName} (ID: {ProfileId}) - DeletionSync: {DeletionSync}",
profile.Name, profile.Id, enableDeletionSync);
try
{
// 1. Setup delle connessioni sorgente e destinazione
var (sourceManager, sourceCredential) = await SetupSourceConnectionAsync(profile);
var (restClient, restCredential, restEntity) = await SetupDestinationConnectionAsync(profile);
// 2. Verifica che le connessioni siano valide
// Per i file, sourceManager sarà null (è normale), per database deve essere presente
if (profile.SourceType.ToLower() == "database" && sourceManager == null)
{
throw new InvalidOperationException("Impossibile stabilire connessione con il database sorgente");
}
if (profile.SourceType.ToLower() == "file" && string.IsNullOrEmpty(profile.SourceFilePath))
{
throw new InvalidOperationException("Percorso file sorgente non specificato nel profilo");
}
if (restClient == null || restEntity == null)
{
throw new InvalidOperationException("Impossibile stabilire connessione con la destinazione REST");
}
// 3. Ottieni i record dalla sorgente
var sourceRecords = await GetAllRecordsFromSourceAsync(profile, sourceManager);
if (!sourceRecords.Any())
{
_logger.LogWarning("Nessun record trovato nella sorgente per il profilo: {ProfileName}", profile.Name);
return 0;
}
_logger.LogInformation("Ottenuti {RecordCount} record dalla sorgente", sourceRecords.Count());
// 4. Parse field mappings
var fieldMappings = ParseFieldMappings(profile.FieldMappingJson);
if (!fieldMappings.Any())
{
throw new InvalidOperationException("Nessun mapping dei campi configurato per il profilo");
}
// 4.5. Parse External ID Relationships (Salesforce)
var externalIdRelationships = ParseExternalIdRelationships(profile.ExternalIdRelationshipsJson);
if (externalIdRelationships.Any())
{
_logger.LogInformation("Caricate {Count} External ID Relationships dal profilo", externalIdRelationships.Count);
}
// 5. Determina se utilizzare Salesforce Composite API
bool useSalesforceComposite = restClient is DataConnection.REST.Implementations.SalesforceServiceClient;
if (useSalesforceComposite)
{
_logger.LogInformation("Utilizzo Salesforce Composite API per il trasferimento");
return await ExecuteDataTransferWithCompositeAsync(profile, sourceRecords, restClient, restEntity, restCredential!, fieldMappings, externalIdRelationships, enableDeletionSync);
}
else
{
_logger.LogInformation("Utilizzo metodo trasferimento standard per il trasferimento");
return await ExecuteDataTransferStandardAsync(profile, sourceRecords, restClient, restEntity, restCredential!, fieldMappings, externalIdRelationships, enableDeletionSync);
}
}
catch (Exception ex)
{
_logger.LogError(ex, "Errore durante il trasferimento dati per il profilo {ProfileName}", profile.Name);
throw;
}
finally
{
_logger.LogInformation("=== FINE TRASFERIMENTO DATI SCHEDULATO ===");
}
}
///
/// Setup della connessione sorgente (database o file)
///
private async Task<(IDatabaseManager? manager, DatabaseCredential? credential)> SetupSourceConnectionAsync(DataCouplerProfile profile)
{
_logger.LogInformation("SetupSourceConnectionAsync - SourceType: '{SourceType}', SourceCredentialId: {SourceCredentialId}, SourceFilePath: '{SourceFilePath}'",
profile.SourceType, profile.SourceCredentialId, profile.SourceFilePath);
// Se la sorgente è un file, non servono credenziali database
if (string.IsNullOrEmpty(profile.SourceType) ||
profile.SourceType.Equals("file", StringComparison.OrdinalIgnoreCase))
{
_logger.LogInformation("Sorgente tipo file, nessuna connessione database necessaria");
return (null, null);
}
// Per database, la credenziale è obbligatoria
if (!profile.SourceCredentialId.HasValue)
{
throw new InvalidOperationException("Credenziale sorgente non specificata per il database");
}
var sourceCredential = await _dataConnectionCredentialService.GetDatabaseCredentialAsync(profile.SourceCredentialId.Value);
if (sourceCredential == null)
{
throw new InvalidOperationException($"Credenziale database sorgente con ID {profile.SourceCredentialId.Value} non trovata");
}
// Applica override database se specificato (per le schedulazioni)
if (!string.IsNullOrEmpty(profile.SourceDatabaseName))
{
sourceCredential.DatabaseName = profile.SourceDatabaseName;
}
var databaseManager = await _connectionFactory.CreateDatabaseManagerAsync(sourceCredential.Name);
// Test connessione
var canConnect = await databaseManager.TestConnectionAsync();
if (!canConnect)
{
throw new InvalidOperationException($"Impossibile connettersi al database sorgente: {sourceCredential.Name}");
}
_logger.LogInformation("Connessione database sorgente stabilita: {CredentialName}", sourceCredential.Name);
return (databaseManager, sourceCredential);
}
///
/// Setup della connessione destinazione REST
///
private async Task<(IRestServiceClient? client, RestApiCredential? credential, RestEntitySummary? entity)> SetupDestinationConnectionAsync(DataCouplerProfile profile)
{
if (profile.DestinationType.ToLower() != "rest")
{
throw new NotSupportedException("Solo destinazioni REST sono supportate per le schedulazioni");
}
if (!profile.DestinationCredentialId.HasValue)
{
throw new InvalidOperationException("Credenziale destinazione REST non specificata");
}
var restCredential = await _dataConnectionCredentialService.GetRestApiCredentialAsync(profile.DestinationCredentialId.Value);
if (restCredential == null)
{
throw new InvalidOperationException($"Credenziale REST con ID {profile.DestinationCredentialId.Value} non trovata");
}
var restClient = await _connectionFactory.CreateRestServiceClientAsync(restCredential.Name);
// Autenticazione
var authResult = await restClient.AuthenticateAsync();
if (!authResult)
{
throw new InvalidOperationException($"Autenticazione fallita per il servizio REST: {restCredential.Name}");
}
// Ottieni l'entità REST (per le schedulazioni usiamo solo il nome)
RestEntitySummary? restEntity = null;
if (!string.IsNullOrEmpty(profile.DestinationEndpoint))
{
// Per le schedulazioni, creiamo un'entità fittizia con il nome dal profilo
restEntity = new RestEntitySummary
{
Name = profile.DestinationEndpoint,
Label = profile.DestinationEndpoint
};
}
if (restEntity == null)
{
throw new InvalidOperationException($"Entità REST non specificata nel profilo");
}
_logger.LogInformation("Connessione REST destinazione stabilita: {CredentialName}, Entità: {EntityName}",
restCredential.Name, restEntity.Name);
return (restClient, restCredential, restEntity);
}
///
/// Ottiene tutti i record dalla sorgente
///
private async Task>> GetAllRecordsFromSourceAsync(DataCouplerProfile profile, IDatabaseManager? databaseManager)
{
if (profile.SourceType.ToLower() == "database")
{
return await GetAllRecordsFromDatabaseAsync(profile, databaseManager!);
}
else if (profile.SourceType.ToLower() == "file")
{
return await GetAllRecordsFromFileAsync(profile);
}
throw new NotSupportedException($"Tipo sorgente non supportato: {profile.SourceType}");
}
///
/// Parse del JSON dei field mappings
///
private Dictionary ParseFieldMappings(string? fieldMappingJson)
{
var mappings = new Dictionary();
if (string.IsNullOrEmpty(fieldMappingJson))
{
_logger.LogWarning("Field mapping JSON è vuoto o null");
return mappings;
}
_logger.LogDebug("Parsing field mappings JSON: {FieldMappingJson}", fieldMappingJson);
try
{
// Usa le stesse opzioni di serializzazione del DataCouplerProfileService
var options = new JsonSerializerOptions
{
PropertyNamingPolicy = JsonNamingPolicy.CamelCase
};
var fieldMappingsList = JsonSerializer.Deserialize>(fieldMappingJson, options);
if (fieldMappingsList != null)
{
_logger.LogInformation("Trovati {Count} field mappings nel JSON", fieldMappingsList.Count);
foreach (var mapping in fieldMappingsList)
{
if (!string.IsNullOrEmpty(mapping.SourceField) && !string.IsNullOrEmpty(mapping.DestinationField))
{
mappings[mapping.SourceField] = mapping.DestinationField;
_logger.LogDebug("Mapping aggiunto: {SourceField} -> {DestinationField}",
mapping.SourceField, mapping.DestinationField);
}
else
{
_logger.LogWarning("Mapping incompleto ignorato: SourceField='{SourceField}', DestinationField='{DestinationField}'",
mapping.SourceField, mapping.DestinationField);
}
}
}
else
{
_logger.LogWarning("Deserializzazione ritornato null per field mappings JSON");
}
}
catch (Exception ex)
{
_logger.LogError(ex, "Errore nel parsing dei field mappings: {FieldMappingJson}", fieldMappingJson);
}
_logger.LogInformation("Mappings completati: {Count} associazioni", mappings.Count);
return mappings;
}
///
/// Deserializza gli External ID Relationships dal JSON del profilo
///
private List ParseExternalIdRelationships(string? externalIdRelationshipsJson)
{
var relationships = new List();
if (string.IsNullOrEmpty(externalIdRelationshipsJson))
{
_logger.LogDebug("ExternalIdRelationships JSON è vuoto o null");
return relationships;
}
_logger.LogDebug("Parsing ExternalIdRelationships JSON: {Json}", externalIdRelationshipsJson);
try
{
var options = new JsonSerializerOptions
{
PropertyNamingPolicy = JsonNamingPolicy.CamelCase
};
var relationshipsList = JsonSerializer.Deserialize>(externalIdRelationshipsJson, options);
if (relationshipsList != null)
{
relationships = relationshipsList;
_logger.LogInformation("Trovati {Count} External ID Relationships nel JSON", relationships.Count);
foreach (var rel in relationships)
{
_logger.LogDebug("External ID Relationship: {RelationshipName} - {RelatedObject}.{ExternalIdField} <- {SourceField}",
rel.RelationshipName, rel.RelatedObjectName, rel.ExternalIdField, rel.SourceField);
}
}
else
{
_logger.LogWarning("Deserializzazione ritornato null per ExternalIdRelationships JSON");
}
}
catch (Exception ex)
{
_logger.LogError(ex, "Errore nel parsing degli ExternalIdRelationships: {Json}", externalIdRelationshipsJson);
}
return relationships;
}
///
/// Ottiene tutti i record dal database
///
private async Task>> GetAllRecordsFromDatabaseAsync(DataCouplerProfile profile, IDatabaseManager databaseManager)
{
string query;
if (!string.IsNullOrEmpty(profile.SourceCustomQuery))
{
query = profile.SourceCustomQuery;
}
else if (!string.IsNullOrEmpty(profile.SourceTable))
{
var tableName = !string.IsNullOrEmpty(profile.SourceSchema)
? $"[{profile.SourceSchema}].[{profile.SourceTable}]"
: $"[{profile.SourceTable}]";
query = $"SELECT * FROM {tableName}";
}
else
{
throw new InvalidOperationException("Né query custom né tabella specificata per la sorgente database");
}
_logger.LogDebug("Esecuzione query sorgente: {Query}", query);
var results = await databaseManager.ExecuteRawQueryAsync(query, profile.SourceDatabaseName ?? "");
return results;
}
///
/// Ottiene tutti i record da file CSV o Excel
///
private async Task>> GetAllRecordsFromFileAsync(DataCouplerProfile profile)
{
if (string.IsNullOrEmpty(profile.SourceFilePath))
throw new InvalidOperationException("Percorso file sorgente non specificato");
if (!System.IO.File.Exists(profile.SourceFilePath))
throw new FileNotFoundException($"Il file '{profile.SourceFilePath}' non esiste");
var extension = System.IO.Path.GetExtension(profile.SourceFilePath).ToLowerInvariant();
_logger.LogInformation("Lettura file sorgente: {FilePath} (Tipo: {Extension})", profile.SourceFilePath, extension);
if (extension == ".csv")
{
return await ReadCsvFileAsync(profile.SourceFilePath);
}
else if (extension == ".xlsx" || extension == ".xls")
{
return await ReadExcelFileAsync(profile.SourceFilePath);
}
else
{
throw new NotSupportedException($"Formato file non supportato: {extension}. Utilizzare .csv, .xlsx o .xls");
}
}
///
/// Legge un file CSV e restituisce i record come dizionari
///
private async Task>> ReadCsvFileAsync(string filePath)
{
var dataRows = new List>();
try
{
// Apri in modalità condivisa per permettere ad altri processi di accedere al file
using var stream = new System.IO.FileStream(filePath, System.IO.FileMode.Open, System.IO.FileAccess.Read, System.IO.FileShare.ReadWrite);
using var reader = new System.IO.StreamReader(stream);
var firstLine = await reader.ReadLineAsync();
if (string.IsNullOrEmpty(firstLine))
{
_logger.LogWarning("Il file CSV è vuoto: {FilePath}", filePath);
return dataRows;
}
// Detect separator automatically
var separator = DetectCsvSeparator(firstLine);
_logger.LogDebug("CSV separator rilevato: '{Separator}'", separator);
// Parse headers (first row)
var headers = ParseCsvLine(firstLine, separator);
_logger.LogInformation("CSV headers: {Headers} (Totale: {Count})", string.Join(", ", headers), headers.Count);
// Read data rows
string? line;
int rowNumber = 2;
while ((line = await reader.ReadLineAsync()) != null)
{
if (string.IsNullOrWhiteSpace(line)) continue;
var values = ParseCsvLine(line, separator);
var row = new Dictionary();
for (int i = 0; i < headers.Count; i++)
{
var value = i < values.Count ? values[i] : "";
row[headers[i]] = string.IsNullOrEmpty(value) ? "" : value;
}
dataRows.Add(row);
rowNumber++;
}
_logger.LogInformation("File CSV letto con successo: {FilePath}, Record: {RecordCount}", filePath, dataRows.Count);
return dataRows;
}
catch (Exception ex)
{
_logger.LogError(ex, "Errore nella lettura del file CSV: {FilePath}", filePath);
throw;
}
}
///
/// Legge un file Excel e restituisce i record come dizionari
///
private async Task>> ReadExcelFileAsync(string filePath)
{
var dataRows = new List>();
try
{
// Registra il provider di encoding per ExcelDataReader
System.Text.Encoding.RegisterProvider(System.Text.CodePagesEncodingProvider.Instance);
// Apri in modalità condivisa per permettere ad altri processi di accedere al file
using var stream = new System.IO.FileStream(filePath, System.IO.FileMode.Open, System.IO.FileAccess.Read, System.IO.FileShare.ReadWrite);
var extension = System.IO.Path.GetExtension(filePath).ToLowerInvariant();
ExcelDataReader.IExcelDataReader reader;
if (extension == ".xlsx")
{
reader = ExcelDataReader.ExcelReaderFactory.CreateOpenXmlReader(stream);
}
else if (extension == ".xls")
{
reader = ExcelDataReader.ExcelReaderFactory.CreateBinaryReader(stream);
}
else
{
throw new NotSupportedException($"Formato Excel non supportato: {extension}");
}
using (reader)
{
var configuration = new ExcelDataReader.ExcelDataSetConfiguration()
{
ConfigureDataTable = (_) => new ExcelDataReader.ExcelDataTableConfiguration()
{
UseHeaderRow = true
}
};
var dataSet = reader.AsDataSet(configuration);
_logger.LogInformation("File Excel letto: {FilePath}, Fogli: {SheetCount}", filePath, dataSet.Tables.Count);
// Legge il primo foglio (o tutti i fogli se necessario)
if (dataSet.Tables.Count > 0)
{
var table = dataSet.Tables[0];
var headers = new List();
foreach (System.Data.DataColumn column in table.Columns)
{
headers.Add(column.ColumnName);
}
foreach (System.Data.DataRow dataRow in table.Rows)
{
var row = new Dictionary();
foreach (var header in headers)
{
var value = dataRow[header];
row[header] = value == DBNull.Value ? "" : value?.ToString() ?? "";
}
dataRows.Add(row);
}
_logger.LogInformation("Foglio Excel '{SheetName}' letto con successo: {RecordCount} record",
table.TableName, dataRows.Count);
}
}
await Task.CompletedTask; // Per mantenere il metodo async
return dataRows;
}
catch (Exception ex)
{
_logger.LogError(ex, "Errore nella lettura del file Excel: {FilePath}", filePath);
throw;
}
}
///
/// Rileva automaticamente il separatore CSV
///
private char DetectCsvSeparator(string line)
{
var separators = new[] { ',', ';', '\t', '|' };
var maxCount = 0;
var detectedSeparator = ',';
foreach (var sep in separators)
{
var count = line.Count(c => c == sep);
if (count > maxCount)
{
maxCount = count;
detectedSeparator = sep;
}
}
return detectedSeparator;
}
///
/// Parse di una riga CSV gestendo correttamente le virgolette
///
private List ParseCsvLine(string line, char separator = ',')
{
var result = new List();
var current = new System.Text.StringBuilder();
bool inQuotes = false;
for (int i = 0; i < line.Length; i++)
{
char c = line[i];
if (c == '"')
{
if (inQuotes && i + 1 < line.Length && line[i + 1] == '"')
{
current.Append('"');
i++;
}
else
{
inQuotes = !inQuotes;
}
}
else if (c == separator && !inQuotes)
{
result.Add(current.ToString().Trim());
current.Clear();
}
else
{
current.Append(c);
}
}
result.Add(current.ToString().Trim());
return result;
}
///
/// Metodo principale per trasferimento dati standard (non Salesforce Composite)
///
private async Task ExecuteDataTransferStandardAsync(
DataCouplerProfile profile,
IEnumerable> sourceRecords,
IRestServiceClient restClient,
RestEntitySummary restEntity,
RestApiCredential restCredential,
Dictionary fieldMappings,
List externalIdRelationships,
bool enableDeletionSync = false)
{
_logger.LogInformation("Iniziando trasferimento dati standard per {RecordCount} record - DeletionSync: {DeletionSync}",
sourceRecords.Count(), enableDeletionSync);
int successCount = 0;
int errorCount = 0;
int recordNumber = 1;
foreach (var record in sourceRecords)
{
try
{
// 1. Trasforma il record utilizzando i field mappings e External ID Relationships
var restData = TransformRecordForRest(record, fieldMappings, externalIdRelationships);
// 2. Gestione associazioni record se abilitata
string? entityId = null;
if (profile.UseRecordAssociations && !string.IsNullOrEmpty(profile.SourceKeyField))
{
entityId = await HandleRecordAssociation(profile, record, restClient, restEntity, restCredential, restData, fieldMappings);
}
// 3. Se non abbiamo un ID esistente, crea un nuovo record
if (string.IsNullOrEmpty(entityId))
{
var result = await restClient.CreateEntityAsync(restEntity.Name, restData);
if (result != null)
{
// Estrai l'ID dal risultato (diversi sistemi REST usano campi diversi)
entityId = result.ContainsKey("id") ? result["id"]?.ToString() :
result.ContainsKey("Id") ? result["Id"]?.ToString() :
result.ContainsKey("DocEntry") ? result["DocEntry"]?.ToString() : null;
successCount++;
_logger.LogDebug("Record {RecordNumber} creato con successo. ID: {EntityId}", recordNumber, entityId);
}
else
{
errorCount++;
_logger.LogWarning("Errore nella creazione del record {RecordNumber}: risultato null", recordNumber);
}
}
else
{
successCount++; // Record aggiornato tramite associazione
}
// 4. Salva associazione se abilitata e abbiamo un ID
if (profile.UseRecordAssociations && !string.IsNullOrEmpty(profile.SourceKeyField) && !string.IsNullOrEmpty(entityId))
{
await SaveRecordAssociation(profile, record, restEntity, restCredential, restData, entityId, fieldMappings);
}
}
catch (Exception ex)
{
errorCount++;
_logger.LogError(ex, "Errore nel trasferimento del record {RecordNumber}", recordNumber);
}
recordNumber++;
}
_logger.LogInformation("Trasferimento completato. Successi: {SuccessCount}, Errori: {ErrorCount}",
successCount, errorCount);
// Sincronizzazione cancellazioni (se abilitata)
if (enableDeletionSync && profile.UseRecordAssociations && !string.IsNullOrEmpty(profile.SourceKeyField))
{
try
{
_logger.LogInformation("SCHEDULED: Inizio sincronizzazione cancellazioni...");
// Estrai tutti i valori chiave presenti nella sorgente
var sourceKeyValues = sourceRecords
.Select(r => r.ContainsKey(profile.SourceKeyField) ? r[profile.SourceKeyField]?.ToString() : null)
.Where(k => !string.IsNullOrEmpty(k))
.Cast()
.Distinct()
.ToList();
_logger.LogInformation("SCHEDULED: Trovati {Count} valori chiave nella sorgente", sourceKeyValues.Count);
// Sincronizza le cancellazioni
var deletionOptions = new DeletionSyncOptions
{
Action = DeletionAction.Delete // Default: elimina fisicamente
};
var deletionResult = await _deletionSyncService.SyncDeletionsAsync(
sourceKeyValues,
restEntity.Name,
restCredential.Name,
restClient,
deletionOptions);
if (deletionResult.DeletedRecordsDetected > 0)
{
_logger.LogInformation("SCHEDULED: Sincronizzazione cancellazioni completata - {Detected} rilevati, {Synced} sincronizzati, {Errors} errori",
deletionResult.DeletedRecordsDetected,
deletionResult.DeletedRecordsSynced,
deletionResult.SyncErrors);
}
}
catch (Exception delEx)
{
_logger.LogError(delEx, "SCHEDULED: Errore durante la sincronizzazione delle cancellazioni");
}
}
return successCount;
}
///
/// Implementazione completa del trasferimento con Salesforce Composite API
/// Equivalente a StartDataTransferWithComposite del DataCoupler.razor.cs
///
private async Task ExecuteDataTransferWithCompositeAsync(
DataCouplerProfile profile,
IEnumerable> sourceRecords,
IRestServiceClient restClient,
RestEntitySummary restEntity,
RestApiCredential restCredential,
Dictionary fieldMappings,
List externalIdRelationships,
bool enableDeletionSync = false)
{
_logger.LogInformation("Iniziando trasferimento dati COMPOSITE per {RecordCount} record - DeletionSync: {DeletionSync}",
sourceRecords.Count(), enableDeletionSync);
// Verifica che sia effettivamente un SalesforceServiceClient
if (!(restClient is DataConnection.REST.Implementations.SalesforceServiceClient salesforceClient))
{
_logger.LogWarning("Client REST non è SalesforceServiceClient, fallback al metodo standard");
return await ExecuteDataTransferStandardAsync(profile, sourceRecords, restClient, restEntity, restCredential, fieldMappings, externalIdRelationships, enableDeletionSync);
}
try
{
// 1. Trasforma i record e analizza le associazioni IN PARALLELO
var recordsForCreate = new ConcurrentBag<(Dictionary transformedData, Dictionary originalRecord, int recordNumber)>();
var recordsForUpdate = new ConcurrentBag<(Dictionary transformedData, string entityId, Dictionary originalRecord, int recordNumber, string newDataHash)>();
var recordsSkipped = new ConcurrentBag<(Dictionary originalRecord, int recordNumber, string reason)>();
var recordErrors = new ConcurrentBag<(Dictionary originalRecord, int recordNumber, string error)>();
// Cattura i valori condivisi per evitare race conditions
var currentEntityName = restEntity.Name;
var currentCredentialName = restCredential.Name; // Usa il nome della credenziale, non l'ID
var currentUseRecordAssociations = profile.UseRecordAssociations;
// Crea lista indicizzata per mantenere il record number
var indexedRecords = sourceRecords.Select((record, index) => new { Record = record, RecordNumber = index + 1 }).ToList();
_logger.LogInformation("COMPOSITE SCHEDULED: Inizio analisi parallela di {RecordCount} record", indexedRecords.Count);
var analysisStartTime = DateTime.UtcNow;
// Processa tutti i record in parallelo
var processingTasks = indexedRecords.Select(async indexedRecord =>
{
try
{
var record = indexedRecord.Record;
var recordNumber = indexedRecord.RecordNumber;
// Trasforma il record in base ai mapping e External ID Relationships (operazione locale, thread-safe)
var restData = TransformRecordForRest(record, fieldMappings, externalIdRelationships);
// Genera la chiave sorgente e l'hash dei dati per questo record (include MAPPING_SIGNATURE)
var sourceKey = GenerateSourceKey(record, profile.SourceKeyField);
var currentDataHash = GenerateDataHash(restData, fieldMappings);
// Analizza le associazioni per capire se aggiornare, creare o saltare
if (currentUseRecordAssociations && !string.IsNullOrEmpty(sourceKey))
{
_logger.LogDebug("COMPOSITE SCHEDULED: Cerco associazione per KeyValue: '{KeyValue}', Entity: '{Entity}', Credential: '{Credential}'",
sourceKey, currentEntityName, currentCredentialName);
// Cerca associazione esistente usando il metodo parallelo
var existingAssociation = await _dataConnectionCredentialService.FindKeyAssociationByValueParallelAsync(
sourceKey, currentEntityName, currentCredentialName);
// FALLBACK: Se non troviamo l'associazione con tutti i parametri, proviamo solo con il KeyValue
if (existingAssociation == null)
{
existingAssociation = await _dataConnectionCredentialService.FindKeyAssociationByValueParallelAsync(sourceKey);
if (existingAssociation != null)
{
// Verifica compatibilità
if (existingAssociation.DestinationEntity != currentEntityName ||
existingAssociation.RestCredentialName != currentCredentialName)
{
existingAssociation = null;
}
}
}
// 🔍 PRE-DISCOVERY: Usa il servizio centralizzato
if (existingAssociation == null && !string.IsNullOrEmpty(profile.SourceKeyField))
{
var preDiscoveryRequest = new PreDiscoveryRequest
{
SourceKey = sourceKey,
SourceKeyField = profile.SourceKeyField,
DestinationEntity = currentEntityName,
CredentialName = currentCredentialName,
DestinationKeyField = "Id",
FieldMappings = fieldMappings,
RestClient = restClient,
CurrentDataHash = currentDataHash,
EnablePreDiscovery = true,
UseParallelMethod = true, // Usa metodi paralleli thread-safe
IsScheduledTransfer = true,
SourceType = profile.SourceType
};
existingAssociation = await _associationService.FindOrCreateAssociationAsync(preDiscoveryRequest);
}
if (existingAssociation != null && existingAssociation.IsActive)
{
// 🔍 PRE-DISCOVERY: Usa il servizio per verificare se è un'associazione Pre-Discovery
var isPreDiscoveryAssociation = _associationService.IsPreDiscoveryAssociation(existingAssociation);
// Se l'associazione è stata appena creata dal Pre-Discovery, FORZA l'aggiornamento
if (isPreDiscoveryAssociation)
{
// Forza aggiornamento senza controllo hash
recordsForUpdate.Add((restData, existingAssociation.DestinationId, record, recordNumber, currentDataHash));
_logger.LogInformation("COMPOSITE SCHEDULED: Record {RecordNumber} marcato per AGGIORNAMENTO FORZATO (Pre-Discovery) - EntityId: {EntityId}",
recordNumber, existingAssociation.DestinationId);
}
else
{
// CONTROLLO HASH: Verifica se i dati sono cambiati (solo per associazioni esistenti)
var existingHash = existingAssociation.Data_Hash;
if (!string.IsNullOrEmpty(existingHash) && existingHash.Equals(currentDataHash, StringComparison.OrdinalIgnoreCase))
{
// I dati non sono cambiati, salta questo record
recordsSkipped.Add((record, recordNumber, "Dati non modificati (hash identico)"));
_logger.LogDebug("COMPOSITE SCHEDULED: Record {RecordNumber} saltato - hash identico: {Hash}",
recordNumber, currentDataHash);
}
else
{
// I dati sono cambiati o l'hash è vuoto, procedi con l'aggiornamento
recordsForUpdate.Add((restData, existingAssociation.DestinationId, record, recordNumber, currentDataHash));
_logger.LogDebug("COMPOSITE SCHEDULED: Record {RecordNumber} marcato per aggiornamento (EntityId: {EntityId}) - hash diverso: old={OldHash}, new={NewHash}",
recordNumber, existingAssociation.DestinationId, existingHash ?? "NULL", currentDataHash);
}
}
}
else
{
// Record da creare (nessuna associazione esistente)
recordsForCreate.Add((restData, record, recordNumber));
_logger.LogDebug("COMPOSITE SCHEDULED: Record {RecordNumber} marcato per creazione", recordNumber);
}
}
else
{
// Record da creare (no associazioni)
recordsForCreate.Add((restData, record, recordNumber));
_logger.LogDebug("COMPOSITE SCHEDULED: Record {RecordNumber} marcato per creazione (no associazioni)", recordNumber);
}
}
catch (Exception ex)
{
_logger.LogError(ex, "COMPOSITE SCHEDULED: Errore nella trasformazione del record {RecordNumber}", indexedRecord.RecordNumber);
recordErrors.Add((indexedRecord.Record, indexedRecord.RecordNumber, ex.Message));
}
});
// Attendi il completamento di tutte le operazioni parallele
await Task.WhenAll(processingTasks);
var analysisEndTime = DateTime.UtcNow;
var analysisElapsed = (analysisEndTime - analysisStartTime).TotalMilliseconds;
// Converti i ConcurrentBag in liste per il resto del processing
var finalRecordsForCreate = recordsForCreate.ToList();
var finalRecordsForUpdate = recordsForUpdate.ToList();
var finalRecordsSkipped = recordsSkipped.ToList();
var finalRecordErrors = recordErrors.ToList();
_logger.LogInformation("COMPOSITE SCHEDULED: Analisi parallela completata in {ElapsedMs}ms - {CreateCount} record da creare, {UpdateCount} record da aggiornare, {SkippedCount} record saltati, {ErrorCount} errori",
analysisElapsed, finalRecordsForCreate.Count, finalRecordsForUpdate.Count, finalRecordsSkipped.Count, finalRecordErrors.Count);
// 2. Esegui le chiamate composite in parallelo
var createTask = Task.FromResult(new List());
var updateTask = Task.FromResult(new List());
if (finalRecordsForCreate.Any())
{
var createData = finalRecordsForCreate.Select(r => r.transformedData).ToList();
createTask = salesforceClient.BatchCreateEntitiesAsync(restEntity.Name, createData);
}
if (finalRecordsForUpdate.Any())
{
var updateData = finalRecordsForUpdate.ToDictionary(
r => r.entityId,
r => r.transformedData);
updateTask = salesforceClient.BatchUpdateEntitiesAsync(restEntity.Name, updateData);
}
// Attendi entrambe le operazioni
await Task.WhenAll(createTask, updateTask);
var createResults = await createTask;
var updateResults = await updateTask;
// 3. Processa i risultati delle creazioni
int successCount = 0;
int errorCount = finalRecordErrors.Count;
int updatedCount = 0;
// Lista per raccogliere le task di creazione associazioni
var createAssociationTasks = new List();
for (int i = 0; i < createResults.Count; i++)
{
var result = createResults[i];
var originalData = finalRecordsForCreate[i];
if (result.Success)
{
successCount++;
_logger.LogDebug("COMPOSITE SCHEDULED: Record {RecordNumber} creato con successo. EntityId: {EntityId}",
originalData.recordNumber, result.EntityId);
// Aggiungi task di creazione associazione alla lista (esecuzione parallela)
if (currentUseRecordAssociations && !string.IsNullOrEmpty(result.EntityId))
{
var dataHashForAssociation = GenerateDataHash(originalData.transformedData, fieldMappings);
var associationTask = CreateAssociationAsync(profile, originalData.originalRecord, restCredential, result.EntityId, originalData.recordNumber, dataHashForAssociation);
createAssociationTasks.Add(associationTask);
}
}
else
{
errorCount++;
_logger.LogError("COMPOSITE SCHEDULED: Errore creazione record {RecordNumber}: {Error}",
originalData.recordNumber, result.ErrorMessage);
}
}
// 4. Processa i risultati degli aggiornamenti
var updateAssociationTasks = new List();
for (int i = 0; i < updateResults.Count; i++)
{
var result = updateResults[i];
var originalData = finalRecordsForUpdate[i];
if (result.Success)
{
updatedCount++;
_logger.LogDebug("COMPOSITE SCHEDULED: Record {RecordNumber} aggiornato con successo. EntityId: {EntityId}",
originalData.recordNumber, result.EntityId);
// Aggiungi task di aggiornamento associazione alla lista (esecuzione parallela)
if (currentUseRecordAssociations && !string.IsNullOrEmpty(result.EntityId))
{
var updateHashTask = UpdateAssociationHashAsync(profile, originalData.originalRecord, restCredential, result.EntityId, originalData.newDataHash);
updateAssociationTasks.Add(updateHashTask);
}
}
else
{
errorCount++;
_logger.LogError("COMPOSITE SCHEDULED: Errore aggiornamento record {RecordNumber}: {Error}",
originalData.recordNumber, result.ErrorMessage);
}
}
// 5. Esegui tutte le operazioni di associazione in parallelo
var allAssociationTasks = createAssociationTasks.Concat(updateAssociationTasks).ToList();
if (allAssociationTasks.Any())
{
_logger.LogInformation("COMPOSITE SCHEDULED: Avvio di {TaskCount} operazioni di associazione in parallelo ({CreateCount} creazioni, {UpdateCount} aggiornamenti)",
allAssociationTasks.Count, createAssociationTasks.Count, updateAssociationTasks.Count);
var startTime = DateTime.UtcNow;
await Task.WhenAll(allAssociationTasks);
var endTime = DateTime.UtcNow;
_logger.LogInformation("COMPOSITE SCHEDULED: Operazioni di associazione completate in {ElapsedMs}ms",
(endTime - startTime).TotalMilliseconds);
}
var skippedCount = finalRecordsSkipped.Count;
var totalProcessed = successCount + updatedCount;
_logger.LogInformation("COMPOSITE SCHEDULED: Trasferimento completato. Creazioni: {SuccessCount}, Aggiornamenti: {UpdatedCount}, Saltati: {SkippedCount}, Errori: {ErrorCount}",
successCount, updatedCount, skippedCount, errorCount);
// Sincronizzazione cancellazioni (se abilitata)
if (enableDeletionSync && currentUseRecordAssociations && !string.IsNullOrEmpty(profile.SourceKeyField))
{
try
{
_logger.LogInformation("COMPOSITE SCHEDULED: Inizio sincronizzazione cancellazioni...");
// Estrai tutti i valori chiave presenti nella sorgente
var sourceKeyValues = sourceRecords
.Select(r => r.ContainsKey(profile.SourceKeyField) ? r[profile.SourceKeyField]?.ToString() : null)
.Where(k => !string.IsNullOrEmpty(k))
.Cast()
.Distinct()
.ToList();
_logger.LogInformation("COMPOSITE SCHEDULED: Trovati {Count} valori chiave nella sorgente", sourceKeyValues.Count);
// Sincronizza le cancellazioni
var deletionOptions = new DeletionSyncOptions
{
Action = DeletionAction.Delete // Default: elimina fisicamente
};
var deletionResult = await _deletionSyncService.SyncDeletionsAsync(
sourceKeyValues,
currentEntityName,
currentCredentialName,
restClient,
deletionOptions);
if (deletionResult.DeletedRecordsDetected > 0)
{
_logger.LogInformation("COMPOSITE SCHEDULED: Sincronizzazione cancellazioni completata - {Detected} rilevati, {Synced} sincronizzati, {Errors} errori",
deletionResult.DeletedRecordsDetected,
deletionResult.DeletedRecordsSynced,
deletionResult.SyncErrors);
}
}
catch (Exception delEx)
{
_logger.LogError(delEx, "COMPOSITE SCHEDULED: Errore durante la sincronizzazione delle cancellazioni");
}
}
return totalProcessed;
}
catch (Exception ex)
{
_logger.LogError(ex, "COMPOSITE SCHEDULED: Errore generale nel trasferimento dati");
throw;
}
}
#region Helper Methods
///
/// Trasforma un record sorgente in formato REST utilizzando i field mappings
///
private Dictionary TransformRecordForRest(
Dictionary sourceRecord,
Dictionary fieldMappings,
List? externalIdRelationships = null)
{
var restData = new Dictionary();
foreach (var mapping in fieldMappings)
{
if (sourceRecord.ContainsKey(mapping.Key))
{
var value = sourceRecord[mapping.Key];
// Trasforma il valore se necessario
var transformedValue = TransformValueForRest(value);
if (transformedValue != null)
{
restData[mapping.Value] = transformedValue;
}
}
}
// Aggiungi External ID Relationships (per Salesforce)
if (externalIdRelationships != null && externalIdRelationships.Any())
{
foreach (var relationship in externalIdRelationships)
{
if (!string.IsNullOrWhiteSpace(relationship.SourceField) &&
sourceRecord.ContainsKey(relationship.SourceField))
{
var sourceValue = sourceRecord[relationship.SourceField];
var transformedValue = TransformValueForRest(sourceValue);
if (transformedValue != null)
{
// Crea il dizionario annidato per l'External ID Relationship
// Formato: { "Account__r": { "Country__c": "US" } }
var externalIdObject = new Dictionary
{
{ relationship.ExternalIdField, transformedValue }
};
restData[relationship.RelationshipName] = externalIdObject;
_logger.LogDebug("Aggiunta External ID Relationship: {RelationshipName} → {ExternalIdField} = {Value}",
relationship.RelationshipName, relationship.ExternalIdField, transformedValue);
}
}
}
}
return restData;
}
///
/// Trasforma un valore per il formato REST
///
private object? TransformValueForRest(object? value)
{
if (value == null || value == DBNull.Value)
return null;
// Trasformazioni specifiche per tipo
if (value is DateTime dateTime)
{
return dateTime.ToString("yyyy-MM-ddTHH:mm:ss.fffZ", CultureInfo.InvariantCulture);
}
if (value is decimal dec)
{
return dec.ToString(CultureInfo.InvariantCulture);
}
if (value is double dbl)
{
return dbl.ToString(CultureInfo.InvariantCulture);
}
if (value is float flt)
{
return flt.ToString(CultureInfo.InvariantCulture);
}
return value;
}
///
/// Gestisce l'associazione dei record per evitare duplicati
///
private async Task HandleRecordAssociation(
DataCouplerProfile profile,
Dictionary sourceRecord,
IRestServiceClient restClient,
RestEntitySummary restEntity,
RestApiCredential restCredential,
Dictionary restData,
Dictionary fieldMappings)
{
if (string.IsNullOrEmpty(profile.SourceKeyField) || !sourceRecord.ContainsKey(profile.SourceKeyField))
return null;
var sourceKey = sourceRecord[profile.SourceKeyField]?.ToString();
if (string.IsNullOrEmpty(sourceKey))
return null;
try
{
// Cerca associazione esistente usando il metodo parallelo
var existingAssociation = await _dataConnectionCredentialService.FindKeyAssociationByValueParallelAsync(
sourceKey, restEntity.Name, restCredential.Name);
if (existingAssociation != null && existingAssociation.IsActive)
{
// Genera l'hash dei dati correnti (solo dati trasformati/mappati, include MAPPING_SIGNATURE)
var currentDataHash = GenerateDataHash(restData, fieldMappings);
var existingHash = existingAssociation.Data_Hash;
// Verifica se i dati sono cambiati
if (!string.IsNullOrEmpty(existingHash) && existingHash.Equals(currentDataHash, StringComparison.OrdinalIgnoreCase))
{
// I dati non sono cambiati, salta l'aggiornamento ma aggiorna LastVerifiedAt
existingAssociation.LastVerifiedAt = DateTime.Now;
await _dataConnectionCredentialService.UpdateKeyAssociationAsync(existingAssociation);
_logger.LogDebug("Record non modificato, aggiornamento saltato. KeyValue: {KeyValue}, EntityId: {EntityId}, Hash: {Hash}",
sourceKey, existingAssociation.DestinationId, currentDataHash);
return existingAssociation.DestinationId;
}
// I dati sono cambiati, procedi con l'aggiornamento
var updateResult = await restClient.UpdateEntityAsync(restEntity.Name, existingAssociation.DestinationId, restData);
if (updateResult != null)
{
// Aggiorna l'hash e LastVerifiedAt nell'associazione
existingAssociation.Data_Hash = currentDataHash;
existingAssociation.UpdatedAt = DateTime.Now;
existingAssociation.LastVerifiedAt = DateTime.Now;
await _dataConnectionCredentialService.UpdateKeyAssociationAsync(existingAssociation);
_logger.LogDebug("Record aggiornato tramite associazione. KeyValue: {KeyValue}, EntityId: {EntityId}, NewHash: {Hash}",
sourceKey, existingAssociation.DestinationId, currentDataHash);
return existingAssociation.DestinationId;
}
else
{
_logger.LogWarning("Fallimento aggiornamento record associato. KeyValue: {KeyValue}, EntityId: {EntityId}",
sourceKey, existingAssociation.DestinationId);
}
}
}
catch (Exception ex)
{
_logger.LogError(ex, "Errore nella gestione dell'associazione per KeyValue: {KeyValue}", sourceKey);
}
return null;
}
///
/// Salva l'associazione tra record sorgente e destinazione
///
private async Task SaveRecordAssociation(
DataCouplerProfile profile,
Dictionary sourceRecord,
RestEntitySummary restEntity,
RestApiCredential restCredential,
Dictionary restData,
string entityId,
Dictionary fieldMappings)
{
if (string.IsNullOrEmpty(profile.SourceKeyField) || !sourceRecord.ContainsKey(profile.SourceKeyField))
return;
var sourceKey = sourceRecord[profile.SourceKeyField]?.ToString();
if (string.IsNullOrEmpty(sourceKey))
return;
try
{
// Genera l'hash dei dati per il controllo delle modifiche (solo dati trasformati/mappati, include MAPPING_SIGNATURE)
var dataHash = GenerateDataHash(restData, fieldMappings);
var association = new KeyAssociation
{
KeyValue = sourceKey,
SourceKeyField = profile.SourceKeyField,
DestinationKeyField = "Id", // Assumiamo "Id" come campo di default
DestinationId = entityId,
DestinationEntity = restEntity.Name,
RestCredentialName = restCredential.Name, // Usa il nome della credenziale
IsActive = true,
Data_Hash = dataHash,
CreatedAt = DateTime.Now,
UpdatedAt = DateTime.Now,
LastVerifiedAt = DateTime.Now,
AdditionalInfo = JsonSerializer.Serialize(new
{
TransferDate = DateTime.Now,
SourceType = profile.SourceType,
DestinationType = profile.DestinationType,
ProfileName = profile.Name,
ScheduledTransfer = true,
StandardTransfer = true,
DataHashGenerated = true
})
};
await _dataConnectionCredentialService.SaveKeyAssociationParallelAsync(association);
_logger.LogDebug("Associazione salvata. KeyValue: {KeyValue}, EntityId: {EntityId}, Hash: {Hash}",
sourceKey, entityId, dataHash);
}
catch (Exception ex)
{
_logger.LogError(ex, "Errore nel salvaggio dell'associazione per KeyValue: {KeyValue}", sourceKey);
}
}
///
/// Genera la chiave sorgente per un record
///
private string GenerateSourceKey(Dictionary record, string? sourceKeyField)
{
if (string.IsNullOrEmpty(sourceKeyField) || !record.ContainsKey(sourceKeyField))
return string.Empty;
var keyValue = record[sourceKeyField];
return keyValue?.ToString() ?? string.Empty;
}
///
/// Genera un hash SHA256 dei dati del record passato come parametro.
/// Utilizzato per rilevare cambiamenti nei dati e ottimizzare il trasferimento.
/// Calcola l'hash SOLO sui campi presenti nel record, in ordine alfabetico.
/// DEVE essere identico al metodo in DataCoupler.razor.cs per garantire consistenza.
///
private string GenerateDataHash(Dictionary record, Dictionary? fieldMappings = null)
{
try
{
var valuesForHash = new List();
// Ordina le chiavi alfabeticamente per garantire consistenza
var orderedKeys = record.Keys.OrderBy(k => k).ToList();
// Aggiungi i valori dei dati per ogni campo presente nel record
foreach (var key in orderedKeys)
{
var value = record[key];
var normalizedValue = value?.ToString()?.Trim() ?? "";
valuesForHash.Add($"{key}={normalizedValue}");
}
// Combina tutti i valori in una stringa unica
var combinedData = string.Join("|", valuesForHash);
_logger.LogDebug("Hash dei dati generato da: {CombinedData}", combinedData);
// Calcola l'hash SHA256 (stesso algoritmo di DataCoupler.razor.cs)
using (var sha256 = System.Security.Cryptography.SHA256.Create())
{
var hashBytes = sha256.ComputeHash(Encoding.UTF8.GetBytes(combinedData));
var hashString = Convert.ToHexString(hashBytes);
_logger.LogDebug("Hash SHA256 generato: {Hash} per {FieldCount} campi", hashString, orderedKeys.Count);
return hashString;
}
}
catch (Exception ex)
{
_logger.LogWarning(ex, "Errore nella generazione dell'hash per il record");
return string.Empty;
}
}
///
/// Crea una nuova associazione record in modo asincrono
///
private async Task CreateAssociationAsync(DataCouplerProfile profile, Dictionary originalRecord, RestApiCredential restCredential, string entityId, int recordNumber, string dataHash)
{
try
{
if (string.IsNullOrEmpty(profile.SourceKeyField) || !originalRecord.ContainsKey(profile.SourceKeyField))
return;
var sourceKey = originalRecord[profile.SourceKeyField]?.ToString();
if (string.IsNullOrEmpty(sourceKey))
return;
// Calcola il MappingCount in modo sicuro e trova il campo destinazione mappato al campo chiave sorgente
int mappingCount = 0;
string? mappedDestinationField = null;
try
{
if (!string.IsNullOrEmpty(profile.FieldMappingJson))
{
var mappings = ParseFieldMappings(profile.FieldMappingJson);
mappingCount = mappings?.Count ?? 0;
// Cerca il campo destinazione mappato al campo chiave sorgente
if (mappings != null && !string.IsNullOrEmpty(profile.SourceKeyField))
{
if (mappings.TryGetValue(profile.SourceKeyField, out var destinationFieldName))
{
mappedDestinationField = destinationFieldName;
_logger.LogDebug("SCHEDULED MAPPING: Campo sorgente '{SourceField}' è mappato al campo destinazione '{DestField}'",
profile.SourceKeyField, mappedDestinationField);
}
else
{
_logger.LogWarning("SCHEDULED MAPPING: Campo chiave sorgente '{SourceKeyField}' NON trovato nei mappings del profilo {ProfileName}",
profile.SourceKeyField, profile.Name);
}
}
}
}
catch (Exception ex)
{
_logger.LogWarning(ex, "Errore nel calcolo del MappingCount per l'associazione del record {RecordNumber}", recordNumber);
}
var association = new KeyAssociation
{
KeyValue = sourceKey,
SourceKeyField = profile.SourceKeyField ?? "",
DestinationKeyField = "Id", // Campo ID standard per REST
MappedDestinationField = mappedDestinationField, // Campo destinazione mappato al campo chiave sorgente
DestinationEntity = profile.DestinationEndpoint ?? "",
DestinationId = entityId,
RestCredentialName = restCredential.Name, // Usa il nome della credenziale
IsActive = true,
Data_Hash = dataHash,
CreatedAt = DateTime.Now,
UpdatedAt = DateTime.Now,
LastVerifiedAt = DateTime.Now,
AdditionalInfo = JsonSerializer.Serialize(new
{
TransferDate = DateTime.Now,
RecordNumber = recordNumber,
MappingCount = mappingCount,
SourceType = profile.SourceType,
DestinationType = profile.DestinationType,
ProfileName = profile.Name,
ScheduledTransfer = true,
CompositeTransfer = true,
DataHashGenerated = true
})
};
var associationId = await _dataConnectionCredentialService.SaveKeyAssociationParallelAsync(association);
_logger.LogDebug("COMPOSITE SCHEDULED: Associazione creata con ID: {AssociationId} per record {RecordNumber} - Key: {SourceKey}, EntityId: {EntityId}, Hash: {Hash}, MappedField: {MappedField}",
associationId, recordNumber, sourceKey, entityId, dataHash, mappedDestinationField ?? "N/A");
}
catch (Exception ex)
{
_logger.LogError(ex, "COMPOSITE SCHEDULED: Errore nella creazione dell'associazione per il record {RecordNumber}", recordNumber);
}
}
///
/// Aggiorna l'hash di un'associazione esistente
///
private async Task UpdateAssociationHashAsync(DataCouplerProfile profile, Dictionary originalRecord, RestApiCredential restCredential, string entityId, string newDataHash)
{
try
{
if (string.IsNullOrEmpty(profile.SourceKeyField) || !originalRecord.ContainsKey(profile.SourceKeyField))
return;
var sourceKey = originalRecord[profile.SourceKeyField]?.ToString();
if (string.IsNullOrEmpty(sourceKey))
return;
// Trova l'associazione esistente usando il metodo parallelo
var existingAssociation = await _dataConnectionCredentialService.FindKeyAssociationByValueParallelAsync(
sourceKey, profile.DestinationEndpoint ?? "", restCredential.Name);
if (existingAssociation != null)
{
existingAssociation.Data_Hash = newDataHash;
existingAssociation.UpdatedAt = DateTime.Now;
existingAssociation.LastVerifiedAt = DateTime.Now;
// Usa UpdateKeyAssociationAsync per l'aggiornamento invece di SaveKeyAssociationParallelAsync
await _dataConnectionCredentialService.UpdateKeyAssociationAsync(existingAssociation);
_logger.LogDebug("COMPOSITE SCHEDULED: Hash associazione aggiornato per entityId {EntityId} - Key: {SourceKey}, NewHash: {Hash}",
entityId, sourceKey, newDataHash);
}
else
{
_logger.LogWarning("COMPOSITE SCHEDULED: Associazione non trovata per aggiornamento hash - EntityId: {EntityId}, SourceKey: {SourceKey}",
entityId, sourceKey);
}
}
catch (Exception ex)
{
_logger.LogError(ex, "COMPOSITE SCHEDULED: Errore nell'aggiornamento dell'hash per EntityId: {EntityId}", entityId);
}
}
#endregion
}