Files
Data-Coupler/Data_Coupler/Services/ScheduledProfileExecutionService.cs
T
Alessio Dal Santo 344853fde9 [Feature/Perf] Ottimizzazioni bulk pre-discovery, batch deletion sync e supporto OLE DB / Salesforce client_credentials
## Bulk Pre-Discovery e riduzione query SQLite/SOQL

### KeyAssociationService — FindAssociationsByKeyValuesBulkAsync (nuovo)
- Aggiunta query bulk 'WHERE KeyValue IN (...)' per recuperare N associazioni con 1 sola query SQLite
  (chunking a 500 chiavi per rispettare il limite ~999 parametri di SQLite)
- Aggiunta interfaccia IKeyAssociationService e delegata in DataConnectionCredentialService / IDataConnectionCredentialService

### AssociationService — BatchFindOrCreateAssociationsAsync (nuovo)
- Nuovo metodo bulk che sostituisce i loop per-record durante l'analisi composite:
  1) 1 query SQLite bulk per tutte le chiavi
  2) Per le chiavi non trovate: SOQL 'IN (...)' su Salesforce in chunk da 200 via BatchExecuteQueriesAsync
     (ceil(K/25) HTTP Composite call invece di K singole)
  3) Salvataggio parallelo delle associazioni pre-discovery scoperte
- Fallback per-record automatico per client REST non Salesforce
- Aggiornata interfaccia IAssociationService con documentazione XML completa

### DataCoupler.razor.cs — STEP A/B nel flusso COMPOSITE
- Pre-Discovery spostata FUORI dal loop parallelo (STEP A, prima dell'analisi)
- associationsByKey pre-popolato con BatchFindOrCreateAssociationsAsync
- STEP B: il loop parallelo usa TryGetValue O(1) invece di query async per record
- Rimozione blocco ~40 righe di per-record lookup / fallback duplicati

## Salesforce Composite API — Batch Delete e Patch

### SalesforceServiceClient — metodi batch (nuovi)
- BatchDeleteEntitiesAsync: elimina N record con ceil(N/25) Composite call invece di N
- BatchPatchSingleFieldAsync: aggiorna un singolo campo su N record tramite BatchUpdateEntitiesAsync

### DeletionSyncService — refactoring batch
- ExecuteBatchedSalesforceDeletionsAsync: orchestrazione batch per Delete / Deactivate / Mark su Salesforce
- ExecuteSequentialDeletionsAsync: loop sequenziale esistente estratto in metodo riutilizzabile
- Dispatcher: Salesforce -> batch Composite, altri client REST -> sequenziale

## Supporto OLE DB (database)

### DatabaseSchemaProviderFactory
- Aggiunto case DatabaseType.OleDb -> new OleDbSchemaProvider() nel factory switch

### DatabaseMethod.cs
- Aggiunto metodo IsOleDbConnection() (parallelo a IsOdbcConnection())
- Query validation e manager temporaneo estesi a OLE DB oltre che ODBC
- GetLimitedQuery: aggiunto case OleDb -> 'SELECT TOP N FROM (subquery)'

## Salesforce OAuth2 — fix client_credentials

### CredentialService.cs
- Aggiunto 'GrantType' alla HashSet serviceSpecificKeys per preservarlo nella serializzazione AdditionalParameters

### DataConnectionCredentialService.cs
- Refactored BuildRestServiceOptions in helper statico riutilizzato da entrambi i metodi GetRestServiceOptions
- Mapping coerente ClientId/ClientSecret/GrantType per Salesforce (allineato a DataConnectionFactory)
- TestSalesforceOAuthLogin: branch esplicito per client_credentials (no username/password/token)
  con validazione preventiva ClientId+ClientSecret obbligatori
- Log flow label (password|client_credentials) in tutti i messaggi di autenticazione

## VS Code tasks

### .vscode/tasks.json
- Rimosso task generico 'Publish Data_Coupler'
- Aggiunti due task separati: win-x64 e win-x86, entrambi SingleFile + Self-Contained + ReadyToRun
2026-05-28 11:15:18 +02:00

1629 lines
72 KiB
C#

using CredentialManager.Services;
using CredentialManager.Models;
using DataConnection.Interfaces;
using DataConnection.REST.Interfaces;
using DataConnection.REST.Models;
using DataConnection.CredentialManagement.Interfaces;
using DataConnection.CredentialManagement.Models;
using Microsoft.Extensions.Logging;
using System.Collections.Concurrent;
using System.Globalization;
using System.Security.Cryptography;
using System.Text;
using System.Text.Json;
using Data_Coupler.Models;
using Data_Coupler.Services;
using ExcelDataReader;
namespace Data_Coupler.Services;
/// <summary>
/// Servizio per l'esecuzione di profili Data Coupler schedulati con logica completa di trasferimento dati
/// Implementa la stessa logica avanzata di StartDataTransferWithComposite per le schedulazioni
/// </summary>
public class ScheduledProfileExecutionService : IScheduledProfileExecutionService
{
private readonly IDataCouplerProfileService _profileService;
private readonly IDataConnectionFactory _connectionFactory;
private readonly ICredentialService _credentialService;
private readonly IDataConnectionCredentialService _dataConnectionCredentialService;
private readonly IKeyAssociationService _keyAssociationService;
private readonly IAssociationService _associationService;
private readonly IDeletionSyncService _deletionSyncService;
private readonly ILogger<ScheduledProfileExecutionService> _logger;
public ScheduledProfileExecutionService(
IDataCouplerProfileService profileService,
IDataConnectionFactory connectionFactory,
ICredentialService credentialService,
IDataConnectionCredentialService dataConnectionCredentialService,
IKeyAssociationService keyAssociationService,
IAssociationService associationService,
IDeletionSyncService deletionSyncService,
ILogger<ScheduledProfileExecutionService> logger)
{
_profileService = profileService;
_connectionFactory = connectionFactory;
_credentialService = credentialService;
_dataConnectionCredentialService = dataConnectionCredentialService;
_keyAssociationService = keyAssociationService;
_associationService = associationService;
_deletionSyncService = deletionSyncService;
_logger = logger;
}
/// <summary>
/// Esegue un profilo Data Coupler specificato dall'ID
/// </summary>
public async Task<ProfileExecutionResult> ExecuteProfileAsync(int profileId)
{
return await ExecuteProfileAsync(profileId, enableDeletionSync: false);
}
/// <summary>
/// Esegue un profilo Data Coupler specificato dall'ID con configurazione sincronizzazione eliminazioni
/// </summary>
public async Task<ProfileExecutionResult> ExecuteProfileAsync(int profileId, bool enableDeletionSync)
{
var startTime = DateTime.UtcNow;
var result = new ProfileExecutionResult
{
ExecutionTime = startTime // Ora locale per coerenza con l'interfaccia utente
};
try
{
_logger.LogInformation("Inizio esecuzione profilo schedulato ID: {ProfileId} - DeletionSync: {DeletionSync}", profileId, enableDeletionSync);
// Carica il profilo
var profile = await _profileService.GetProfileByIdAsync(profileId);
if (profile == null)
{
result.Success = false;
result.Message = $"Profilo con ID {profileId} non trovato";
return result;
}
_logger.LogInformation("Esecuzione profilo: {ProfileName}", profile.Name);
// Aggiorna la data di ultimo utilizzo
await _profileService.UpdateLastUsedAsync(profile.Id);
// Esegue il trasferimento dati con la logica completa
var recordsTransferred = await ExecuteDataTransferAsync(profile, enableDeletionSync);
result.Success = true;
result.RecordsProcessed = recordsTransferred;
result.Message = $"Trasferimento completato con successo. {recordsTransferred} record elaborati.";
_logger.LogInformation("Profilo {ProfileName} eseguito con successo. Record elaborati: {RecordCount}",
profile.Name, recordsTransferred);
}
catch (Exception ex)
{
result.Success = false;
result.Message = $"Errore durante l'esecuzione: {ex.Message}";
_logger.LogError(ex, "Errore durante l'esecuzione del profilo ID: {ProfileId}", profileId);
}
finally
{
result.Duration = DateTime.UtcNow - startTime;
}
return result;
}
/// <summary>
/// Metodo principale per l'esecuzione del trasferimento dati
/// Implementa la stessa logica di StartDataTransferWithComposite
/// </summary>
private async Task<int> ExecuteDataTransferAsync(DataCouplerProfile profile, bool enableDeletionSync = false)
{
_logger.LogInformation("=== INIZIO TRASFERIMENTO DATI SCHEDULATO ===");
_logger.LogInformation("Esecuzione profilo: {ProfileName} (ID: {ProfileId}) - DeletionSync: {DeletionSync}",
profile.Name, profile.Id, enableDeletionSync);
try
{
// 1. Setup delle connessioni sorgente e destinazione
var (sourceManager, sourceCredential) = await SetupSourceConnectionAsync(profile);
var (restClient, restCredential, restEntity) = await SetupDestinationConnectionAsync(profile);
// 2. Verifica che le connessioni siano valide
// Per i file, sourceManager sarà null (è normale), per database deve essere presente
if (profile.SourceType.ToLower() == "database" && sourceManager == null)
{
throw new InvalidOperationException("Impossibile stabilire connessione con il database sorgente");
}
if (profile.SourceType.ToLower() == "file" && string.IsNullOrEmpty(profile.SourceFilePath))
{
throw new InvalidOperationException("Percorso file sorgente non specificato nel profilo");
}
if (restClient == null || restEntity == null)
{
throw new InvalidOperationException("Impossibile stabilire connessione con la destinazione REST");
}
// 3. Ottieni i record dalla sorgente
var sourceRecords = await GetAllRecordsFromSourceAsync(profile, sourceManager);
if (!sourceRecords.Any())
{
_logger.LogWarning("Nessun record trovato nella sorgente per il profilo: {ProfileName}", profile.Name);
return 0;
}
_logger.LogInformation("Ottenuti {RecordCount} record dalla sorgente", sourceRecords.Count());
// 4. Parse field mappings
var fieldMappings = ParseFieldMappings(profile.FieldMappingJson);
if (!fieldMappings.Any())
{
throw new InvalidOperationException("Nessun mapping dei campi configurato per il profilo");
}
// 4.5. Parse External ID Relationships (Salesforce)
var externalIdRelationships = ParseExternalIdRelationships(profile.ExternalIdRelationshipsJson);
if (externalIdRelationships.Any())
{
_logger.LogInformation("Caricate {Count} External ID Relationships dal profilo", externalIdRelationships.Count);
}
// 4.6. Parse Default Values
var defaultValues = ParseDefaultValues(profile.DefaultValuesJson);
if (defaultValues.Any())
{
_logger.LogInformation("Caricati {Count} default values dal profilo", defaultValues.Count);
}
// 5. Determina se utilizzare Salesforce Composite API
bool useSalesforceComposite = restClient is DataConnection.REST.Implementations.SalesforceServiceClient;
if (useSalesforceComposite)
{
_logger.LogInformation("Utilizzo Salesforce Composite API per il trasferimento");
return await ExecuteDataTransferWithCompositeAsync(profile, sourceRecords, restClient, restEntity, restCredential!, fieldMappings, defaultValues, externalIdRelationships, enableDeletionSync);
}
else
{
_logger.LogInformation("Utilizzo metodo trasferimento standard per il trasferimento");
return await ExecuteDataTransferStandardAsync(profile, sourceRecords, restClient, restEntity, restCredential!, fieldMappings, defaultValues, externalIdRelationships, enableDeletionSync);
}
}
catch (Exception ex)
{
_logger.LogError(ex, "Errore durante il trasferimento dati per il profilo {ProfileName}", profile.Name);
throw;
}
finally
{
_logger.LogInformation("=== FINE TRASFERIMENTO DATI SCHEDULATO ===");
}
}
/// <summary>
/// Setup della connessione sorgente (database o file)
/// </summary>
private async Task<(IDatabaseManager? manager, DatabaseCredential? credential)> SetupSourceConnectionAsync(DataCouplerProfile profile)
{
_logger.LogInformation("SetupSourceConnectionAsync - SourceType: '{SourceType}', SourceCredentialId: {SourceCredentialId}, SourceFilePath: '{SourceFilePath}'",
profile.SourceType, profile.SourceCredentialId, profile.SourceFilePath);
// Se la sorgente è un file, non servono credenziali database
if (string.IsNullOrEmpty(profile.SourceType) ||
profile.SourceType.Equals("file", StringComparison.OrdinalIgnoreCase))
{
_logger.LogInformation("Sorgente tipo file, nessuna connessione database necessaria");
return (null, null);
}
// Per database, la credenziale è obbligatoria
if (!profile.SourceCredentialId.HasValue)
{
throw new InvalidOperationException("Credenziale sorgente non specificata per il database");
}
var sourceCredential = await _dataConnectionCredentialService.GetDatabaseCredentialAsync(profile.SourceCredentialId.Value);
if (sourceCredential == null)
{
throw new InvalidOperationException($"Credenziale database sorgente con ID {profile.SourceCredentialId.Value} non trovata");
}
// Applica override database se specificato (per le schedulazioni)
if (!string.IsNullOrEmpty(profile.SourceDatabaseName))
{
sourceCredential.DatabaseName = profile.SourceDatabaseName;
}
var databaseManager = await _connectionFactory.CreateDatabaseManagerAsync(sourceCredential.Name);
// Test connessione
var canConnect = await databaseManager.TestConnectionAsync();
if (!canConnect)
{
throw new InvalidOperationException($"Impossibile connettersi al database sorgente: {sourceCredential.Name}");
}
_logger.LogInformation("Connessione database sorgente stabilita: {CredentialName}", sourceCredential.Name);
return (databaseManager, sourceCredential);
}
/// <summary>
/// Setup della connessione destinazione REST
/// </summary>
private async Task<(IRestServiceClient? client, RestApiCredential? credential, RestEntitySummary? entity)> SetupDestinationConnectionAsync(DataCouplerProfile profile)
{
if (profile.DestinationType.ToLower() != "rest")
{
throw new NotSupportedException("Solo destinazioni REST sono supportate per le schedulazioni");
}
if (!profile.DestinationCredentialId.HasValue)
{
throw new InvalidOperationException("Credenziale destinazione REST non specificata");
}
var restCredential = await _dataConnectionCredentialService.GetRestApiCredentialAsync(profile.DestinationCredentialId.Value);
if (restCredential == null)
{
throw new InvalidOperationException($"Credenziale REST con ID {profile.DestinationCredentialId.Value} non trovata");
}
var restClient = await _connectionFactory.CreateRestServiceClientAsync(restCredential.Name);
// Autenticazione
var authResult = await restClient.AuthenticateAsync();
if (!authResult)
{
throw new InvalidOperationException($"Autenticazione fallita per il servizio REST: {restCredential.Name}");
}
// Ottieni l'entità REST (per le schedulazioni usiamo solo il nome)
RestEntitySummary? restEntity = null;
if (!string.IsNullOrEmpty(profile.DestinationEndpoint))
{
// Per le schedulazioni, creiamo un'entità fittizia con il nome dal profilo
restEntity = new RestEntitySummary
{
Name = profile.DestinationEndpoint,
Label = profile.DestinationEndpoint
};
}
if (restEntity == null)
{
throw new InvalidOperationException($"Entità REST non specificata nel profilo");
}
_logger.LogInformation("Connessione REST destinazione stabilita: {CredentialName}, Entità: {EntityName}",
restCredential.Name, restEntity.Name);
return (restClient, restCredential, restEntity);
}
/// <summary>
/// Ottiene tutti i record dalla sorgente
/// </summary>
private async Task<IEnumerable<Dictionary<string, object>>> GetAllRecordsFromSourceAsync(DataCouplerProfile profile, IDatabaseManager? databaseManager)
{
if (profile.SourceType.ToLower() == "database")
{
return await GetAllRecordsFromDatabaseAsync(profile, databaseManager!);
}
else if (profile.SourceType.ToLower() == "file")
{
return await GetAllRecordsFromFileAsync(profile);
}
throw new NotSupportedException($"Tipo sorgente non supportato: {profile.SourceType}");
}
/// <summary>
/// Parse del JSON dei field mappings
/// </summary>
private Dictionary<string, string> ParseFieldMappings(string? fieldMappingJson)
{
var mappings = new Dictionary<string, string>();
if (string.IsNullOrEmpty(fieldMappingJson))
{
_logger.LogWarning("Field mapping JSON è vuoto o null");
return mappings;
}
_logger.LogDebug("Parsing field mappings JSON: {FieldMappingJson}", fieldMappingJson);
try
{
// Usa le stesse opzioni di serializzazione del DataCouplerProfileService
var options = new JsonSerializerOptions
{
PropertyNamingPolicy = JsonNamingPolicy.CamelCase
};
var fieldMappingsList = JsonSerializer.Deserialize<List<FieldMappingDto>>(fieldMappingJson, options);
if (fieldMappingsList != null)
{
_logger.LogInformation("Trovati {Count} field mappings nel JSON", fieldMappingsList.Count);
foreach (var mapping in fieldMappingsList)
{
if (!string.IsNullOrEmpty(mapping.SourceField) && !string.IsNullOrEmpty(mapping.DestinationField))
{
mappings[mapping.SourceField] = mapping.DestinationField;
_logger.LogDebug("Mapping aggiunto: {SourceField} -> {DestinationField}",
mapping.SourceField, mapping.DestinationField);
}
else
{
_logger.LogWarning("Mapping incompleto ignorato: SourceField='{SourceField}', DestinationField='{DestinationField}'",
mapping.SourceField, mapping.DestinationField);
}
}
}
else
{
_logger.LogWarning("Deserializzazione ritornato null per field mappings JSON");
}
}
catch (Exception ex)
{
_logger.LogError(ex, "Errore nel parsing dei field mappings: {FieldMappingJson}", fieldMappingJson);
}
_logger.LogInformation("Mappings completati: {Count} associazioni", mappings.Count);
return mappings;
}
/// <summary>
/// Deserializza gli External ID Relationships dal JSON del profilo
/// </summary>
private List<ExternalIdRelationshipDto> ParseExternalIdRelationships(string? externalIdRelationshipsJson)
{
var relationships = new List<ExternalIdRelationshipDto>();
if (string.IsNullOrEmpty(externalIdRelationshipsJson))
{
_logger.LogDebug("ExternalIdRelationships JSON è vuoto o null");
return relationships;
}
_logger.LogDebug("Parsing ExternalIdRelationships JSON: {Json}", externalIdRelationshipsJson);
try
{
var options = new JsonSerializerOptions
{
PropertyNamingPolicy = JsonNamingPolicy.CamelCase
};
var relationshipsList = JsonSerializer.Deserialize<List<ExternalIdRelationshipDto>>(externalIdRelationshipsJson, options);
if (relationshipsList != null)
{
relationships = relationshipsList;
_logger.LogInformation("Trovati {Count} External ID Relationships nel JSON", relationships.Count);
foreach (var rel in relationships)
{
_logger.LogDebug("External ID Relationship: {RelationshipName} - {RelatedObject}.{ExternalIdField} <- {SourceField}",
rel.RelationshipName, rel.RelatedObjectName, rel.ExternalIdField, rel.SourceField);
}
}
else
{
_logger.LogWarning("Deserializzazione ritornato null per ExternalIdRelationships JSON");
}
}
catch (Exception ex)
{
_logger.LogError(ex, "Errore nel parsing degli ExternalIdRelationships: {Json}", externalIdRelationshipsJson);
}
return relationships;
}
/// <summary>
/// Parse del JSON dei default values
/// </summary>
private Dictionary<string, (object? Value, string? Type)> ParseDefaultValues(string? defaultValuesJson)
{
var defaultValues = new Dictionary<string, (object? Value, string? Type)>();
if (string.IsNullOrEmpty(defaultValuesJson))
{
_logger.LogDebug("DefaultValues JSON è vuoto o null");
return defaultValues;
}
_logger.LogDebug("Parsing DefaultValues JSON: {Json}", defaultValuesJson);
try
{
var options = new JsonSerializerOptions
{
PropertyNamingPolicy = JsonNamingPolicy.CamelCase
};
var deserializedDefaults = JsonSerializer.Deserialize<Dictionary<string, DefaultValueDto>>(defaultValuesJson, options);
if (deserializedDefaults != null)
{
foreach (var entry in deserializedDefaults)
{
defaultValues[entry.Key] = (entry.Value.Value, entry.Value.Type);
_logger.LogDebug("Default value: {Field} = {Value} ({Type})",
entry.Key, entry.Value.Value, entry.Value.Type);
}
_logger.LogInformation("Trovati {Count} default values nel JSON", defaultValues.Count);
}
else
{
_logger.LogWarning("Deserializzazione ritornato null per DefaultValues JSON");
}
}
catch (Exception ex)
{
_logger.LogError(ex, "Errore nel parsing dei default values: {Json}", defaultValuesJson);
}
return defaultValues;
}
/// <summary>
/// Ottiene tutti i record dal database
/// </summary>
private async Task<IEnumerable<Dictionary<string, object>>> GetAllRecordsFromDatabaseAsync(DataCouplerProfile profile, IDatabaseManager databaseManager)
{
string query;
if (!string.IsNullOrEmpty(profile.SourceCustomQuery))
{
query = profile.SourceCustomQuery;
}
else if (!string.IsNullOrEmpty(profile.SourceTable))
{
var tableName = !string.IsNullOrEmpty(profile.SourceSchema)
? $"[{profile.SourceSchema}].[{profile.SourceTable}]"
: $"[{profile.SourceTable}]";
query = $"SELECT * FROM {tableName}";
}
else
{
throw new InvalidOperationException("Né query custom né tabella specificata per la sorgente database");
}
_logger.LogDebug("Esecuzione query sorgente: {Query}", query);
var results = await databaseManager.ExecuteRawQueryAsync(query, profile.SourceDatabaseName ?? "");
return results;
}
/// <summary>
/// Ottiene tutti i record da file CSV o Excel
/// </summary>
private async Task<IEnumerable<Dictionary<string, object>>> GetAllRecordsFromFileAsync(DataCouplerProfile profile)
{
if (string.IsNullOrEmpty(profile.SourceFilePath))
throw new InvalidOperationException("Percorso file sorgente non specificato");
if (!System.IO.File.Exists(profile.SourceFilePath))
throw new FileNotFoundException($"Il file '{profile.SourceFilePath}' non esiste");
var extension = System.IO.Path.GetExtension(profile.SourceFilePath).ToLowerInvariant();
_logger.LogInformation("Lettura file sorgente: {FilePath} (Tipo: {Extension})", profile.SourceFilePath, extension);
if (extension == ".csv")
{
return await ReadCsvFileAsync(profile.SourceFilePath);
}
else if (extension == ".xlsx" || extension == ".xls")
{
return await ReadExcelFileAsync(profile.SourceFilePath);
}
else
{
throw new NotSupportedException($"Formato file non supportato: {extension}. Utilizzare .csv, .xlsx o .xls");
}
}
/// <summary>
/// Legge un file CSV e restituisce i record come dizionari
/// </summary>
private async Task<IEnumerable<Dictionary<string, object>>> ReadCsvFileAsync(string filePath)
{
var dataRows = new List<Dictionary<string, object>>();
try
{
// Apri in modalità condivisa per permettere ad altri processi di accedere al file
using var stream = new System.IO.FileStream(filePath, System.IO.FileMode.Open, System.IO.FileAccess.Read, System.IO.FileShare.ReadWrite);
using var reader = new System.IO.StreamReader(stream);
var firstLine = await reader.ReadLineAsync();
if (string.IsNullOrEmpty(firstLine))
{
_logger.LogWarning("Il file CSV è vuoto: {FilePath}", filePath);
return dataRows;
}
// Detect separator automatically
var separator = DetectCsvSeparator(firstLine);
_logger.LogDebug("CSV separator rilevato: '{Separator}'", separator);
// Parse headers (first row)
var headers = ParseCsvLine(firstLine, separator);
_logger.LogInformation("CSV headers: {Headers} (Totale: {Count})", string.Join(", ", headers), headers.Count);
// Read data rows
string? line;
int rowNumber = 2;
while ((line = await reader.ReadLineAsync()) != null)
{
if (string.IsNullOrWhiteSpace(line)) continue;
var values = ParseCsvLine(line, separator);
var row = new Dictionary<string, object>();
for (int i = 0; i < headers.Count; i++)
{
var value = i < values.Count ? values[i] : "";
row[headers[i]] = string.IsNullOrEmpty(value) ? "" : value;
}
dataRows.Add(row);
rowNumber++;
}
_logger.LogInformation("File CSV letto con successo: {FilePath}, Record: {RecordCount}", filePath, dataRows.Count);
return dataRows;
}
catch (Exception ex)
{
_logger.LogError(ex, "Errore nella lettura del file CSV: {FilePath}", filePath);
throw;
}
}
/// <summary>
/// Legge un file Excel e restituisce i record come dizionari
/// </summary>
private async Task<IEnumerable<Dictionary<string, object>>> ReadExcelFileAsync(string filePath)
{
var dataRows = new List<Dictionary<string, object>>();
try
{
// Registra il provider di encoding per ExcelDataReader
System.Text.Encoding.RegisterProvider(System.Text.CodePagesEncodingProvider.Instance);
// Apri in modalità condivisa per permettere ad altri processi di accedere al file
using var stream = new System.IO.FileStream(filePath, System.IO.FileMode.Open, System.IO.FileAccess.Read, System.IO.FileShare.ReadWrite);
var extension = System.IO.Path.GetExtension(filePath).ToLowerInvariant();
ExcelDataReader.IExcelDataReader reader;
if (extension == ".xlsx")
{
reader = ExcelDataReader.ExcelReaderFactory.CreateOpenXmlReader(stream);
}
else if (extension == ".xls")
{
reader = ExcelDataReader.ExcelReaderFactory.CreateBinaryReader(stream);
}
else
{
throw new NotSupportedException($"Formato Excel non supportato: {extension}");
}
using (reader)
{
var configuration = new ExcelDataReader.ExcelDataSetConfiguration()
{
ConfigureDataTable = (_) => new ExcelDataReader.ExcelDataTableConfiguration()
{
UseHeaderRow = true
}
};
var dataSet = reader.AsDataSet(configuration);
_logger.LogInformation("File Excel letto: {FilePath}, Fogli: {SheetCount}", filePath, dataSet.Tables.Count);
// Legge il primo foglio (o tutti i fogli se necessario)
if (dataSet.Tables.Count > 0)
{
var table = dataSet.Tables[0];
var headers = new List<string>();
foreach (System.Data.DataColumn column in table.Columns)
{
headers.Add(column.ColumnName);
}
foreach (System.Data.DataRow dataRow in table.Rows)
{
var row = new Dictionary<string, object>();
foreach (var header in headers)
{
var value = dataRow[header];
row[header] = value == DBNull.Value ? "" : value?.ToString() ?? "";
}
dataRows.Add(row);
}
_logger.LogInformation("Foglio Excel '{SheetName}' letto con successo: {RecordCount} record",
table.TableName, dataRows.Count);
}
}
await Task.CompletedTask; // Per mantenere il metodo async
return dataRows;
}
catch (Exception ex)
{
_logger.LogError(ex, "Errore nella lettura del file Excel: {FilePath}", filePath);
throw;
}
}
/// <summary>
/// Rileva automaticamente il separatore CSV
/// </summary>
private char DetectCsvSeparator(string line)
{
var separators = new[] { ',', ';', '\t', '|' };
var maxCount = 0;
var detectedSeparator = ',';
foreach (var sep in separators)
{
var count = line.Count(c => c == sep);
if (count > maxCount)
{
maxCount = count;
detectedSeparator = sep;
}
}
return detectedSeparator;
}
/// <summary>
/// Parse di una riga CSV gestendo correttamente le virgolette
/// </summary>
private List<string> ParseCsvLine(string line, char separator = ',')
{
var result = new List<string>();
var current = new System.Text.StringBuilder();
bool inQuotes = false;
for (int i = 0; i < line.Length; i++)
{
char c = line[i];
if (c == '"')
{
if (inQuotes && i + 1 < line.Length && line[i + 1] == '"')
{
current.Append('"');
i++;
}
else
{
inQuotes = !inQuotes;
}
}
else if (c == separator && !inQuotes)
{
result.Add(current.ToString().Trim());
current.Clear();
}
else
{
current.Append(c);
}
}
result.Add(current.ToString().Trim());
return result;
}
/// <summary>
/// Metodo principale per trasferimento dati standard (non Salesforce Composite)
/// </summary>
private async Task<int> ExecuteDataTransferStandardAsync(
DataCouplerProfile profile,
IEnumerable<Dictionary<string, object>> sourceRecords,
IRestServiceClient restClient,
RestEntitySummary restEntity,
RestApiCredential restCredential,
Dictionary<string, string> fieldMappings,
Dictionary<string, (object? Value, string? Type)> defaultValues,
List<ExternalIdRelationshipDto> externalIdRelationships,
bool enableDeletionSync = false)
{
_logger.LogInformation("Iniziando trasferimento dati standard per {RecordCount} record - DeletionSync: {DeletionSync}",
sourceRecords.Count(), enableDeletionSync);
int successCount = 0;
int errorCount = 0;
int recordNumber = 1;
foreach (var record in sourceRecords)
{
try
{
// 1. Trasforma il record utilizzando i field mappings, default values e External ID Relationships
var restData = TransformRecordForRest(record, fieldMappings, defaultValues, externalIdRelationships);
// 2. Gestione associazioni record se abilitata
string? entityId = null;
if (profile.UseRecordAssociations && !string.IsNullOrEmpty(profile.SourceKeyField))
{
entityId = await HandleRecordAssociation(profile, record, restClient, restEntity, restCredential, restData, fieldMappings);
}
// 3. Se non abbiamo un ID esistente, crea un nuovo record
if (string.IsNullOrEmpty(entityId))
{
var result = await restClient.CreateEntityAsync(restEntity.Name, restData);
if (result != null)
{
// Estrai l'ID dal risultato (diversi sistemi REST usano campi diversi)
entityId = result.ContainsKey("id") ? result["id"]?.ToString() :
result.ContainsKey("Id") ? result["Id"]?.ToString() :
result.ContainsKey("DocEntry") ? result["DocEntry"]?.ToString() : null;
successCount++;
_logger.LogDebug("Record {RecordNumber} creato con successo. ID: {EntityId}", recordNumber, entityId);
}
else
{
errorCount++;
_logger.LogWarning("Errore nella creazione del record {RecordNumber}: risultato null", recordNumber);
}
}
else
{
successCount++; // Record aggiornato tramite associazione
}
// 4. Salva associazione se abilitata e abbiamo un ID
if (profile.UseRecordAssociations && !string.IsNullOrEmpty(profile.SourceKeyField) && !string.IsNullOrEmpty(entityId))
{
await SaveRecordAssociation(profile, record, restEntity, restCredential, restData, entityId, fieldMappings);
}
}
catch (Exception ex)
{
errorCount++;
_logger.LogError(ex, "Errore nel trasferimento del record {RecordNumber}", recordNumber);
}
recordNumber++;
}
_logger.LogInformation("Trasferimento completato. Successi: {SuccessCount}, Errori: {ErrorCount}",
successCount, errorCount);
// Sincronizzazione cancellazioni (se abilitata)
if (enableDeletionSync && profile.UseRecordAssociations && !string.IsNullOrEmpty(profile.SourceKeyField))
{
try
{
_logger.LogInformation("SCHEDULED: Inizio sincronizzazione cancellazioni...");
// Estrai tutti i valori chiave presenti nella sorgente
var sourceKeyValues = sourceRecords
.Select(r => r.ContainsKey(profile.SourceKeyField) ? r[profile.SourceKeyField]?.ToString() : null)
.Where(k => !string.IsNullOrEmpty(k))
.Cast<string>()
.Distinct()
.ToList();
_logger.LogInformation("SCHEDULED: Trovati {Count} valori chiave nella sorgente", sourceKeyValues.Count);
// Sincronizza le cancellazioni
var deletionOptions = new DeletionSyncOptions
{
Action = DeletionAction.Delete // Default: elimina fisicamente
};
var deletionResult = await _deletionSyncService.SyncDeletionsAsync(
sourceKeyValues,
restEntity.Name,
restCredential.Name,
restClient,
deletionOptions);
if (deletionResult.DeletedRecordsDetected > 0)
{
_logger.LogInformation("SCHEDULED: Sincronizzazione cancellazioni completata - {Detected} rilevati, {Synced} sincronizzati, {Errors} errori",
deletionResult.DeletedRecordsDetected,
deletionResult.DeletedRecordsSynced,
deletionResult.SyncErrors);
}
}
catch (Exception delEx)
{
_logger.LogError(delEx, "SCHEDULED: Errore durante la sincronizzazione delle cancellazioni");
}
}
return successCount;
}
/// <summary>
/// Implementazione completa del trasferimento con Salesforce Composite API
/// Equivalente a StartDataTransferWithComposite del DataCoupler.razor.cs
/// </summary>
private async Task<int> ExecuteDataTransferWithCompositeAsync(
DataCouplerProfile profile,
IEnumerable<Dictionary<string, object>> sourceRecords,
IRestServiceClient restClient,
RestEntitySummary restEntity,
RestApiCredential restCredential,
Dictionary<string, string> fieldMappings,
Dictionary<string, (object? Value, string? Type)> defaultValues,
List<ExternalIdRelationshipDto> externalIdRelationships,
bool enableDeletionSync = false)
{
_logger.LogInformation("Iniziando trasferimento dati COMPOSITE per {RecordCount} record - DeletionSync: {DeletionSync}",
sourceRecords.Count(), enableDeletionSync);
// Verifica che sia effettivamente un SalesforceServiceClient
if (!(restClient is DataConnection.REST.Implementations.SalesforceServiceClient salesforceClient))
{
_logger.LogWarning("Client REST non è SalesforceServiceClient, fallback al metodo standard");
return await ExecuteDataTransferStandardAsync(profile, sourceRecords, restClient, restEntity, restCredential, fieldMappings, defaultValues, externalIdRelationships, enableDeletionSync);
}
try
{
// 1. Trasforma i record e analizza le associazioni IN PARALLELO
var recordsForCreate = new ConcurrentBag<(Dictionary<string, object> transformedData, Dictionary<string, object> originalRecord, int recordNumber)>();
var recordsForUpdate = new ConcurrentBag<(Dictionary<string, object> transformedData, string entityId, Dictionary<string, object> originalRecord, int recordNumber, string newDataHash)>();
var recordsSkipped = new ConcurrentBag<(Dictionary<string, object> originalRecord, int recordNumber, string reason)>();
var recordErrors = new ConcurrentBag<(Dictionary<string, object> originalRecord, int recordNumber, string error)>();
// Cattura i valori condivisi per evitare race conditions
var currentEntityName = restEntity.Name;
var currentCredentialName = restCredential.Name; // Usa il nome della credenziale, non l'ID
var currentUseRecordAssociations = profile.UseRecordAssociations;
// Crea lista indicizzata per mantenere il record number
var indexedRecords = sourceRecords.Select((record, index) => new { Record = record, RecordNumber = index + 1 }).ToList();
_logger.LogInformation("COMPOSITE SCHEDULED: Inizio analisi di {RecordCount} record", indexedRecords.Count);
var analysisStartTime = DateTime.UtcNow;
// === STEP A: Bulk Pre-Discovery (1 query SQLite + poche SOQL IN invece di 2N+N) ===
// Pre-calcolo locale: source key per ogni record (operazione thread-safe)
var sourceKeyByRecordIndex = new Dictionary<int, string>(indexedRecords.Count);
foreach (var idx in indexedRecords)
{
var key = GenerateSourceKey(idx.Record, profile.SourceKeyField);
if (!string.IsNullOrEmpty(key))
sourceKeyByRecordIndex[idx.RecordNumber] = key;
}
Dictionary<string, KeyAssociation> associationsByKey = new(StringComparer.Ordinal);
if (currentUseRecordAssociations && !string.IsNullOrEmpty(profile.SourceKeyField) && sourceKeyByRecordIndex.Count > 0)
{
var commonRequest = new PreDiscoveryRequest
{
SourceKeyField = profile.SourceKeyField,
DestinationEntity = currentEntityName,
CredentialName = currentCredentialName,
DestinationKeyField = "Id",
FieldMappings = fieldMappings,
RestClient = restClient,
EnablePreDiscovery = true,
UseParallelMethod = true,
IsScheduledTransfer = true,
SourceType = profile.SourceType
};
associationsByKey = await _associationService.BatchFindOrCreateAssociationsAsync(
sourceKeyByRecordIndex.Values, commonRequest);
_logger.LogInformation("COMPOSITE SCHEDULED: Bulk Pre-Discovery completata - {Found}/{Total} associazioni risolte",
associationsByKey.Count, sourceKeyByRecordIndex.Count);
}
// === STEP B: Analisi locale parallela per decidere create/update/skip ===
// Nessuna chiamata DB o REST in questo loop — solo memoria.
var processingTasks = indexedRecords.Select(async indexedRecord =>
{
try
{
var record = indexedRecord.Record;
var recordNumber = indexedRecord.RecordNumber;
// Trasforma il record in base ai mapping e External ID Relationships (operazione locale, thread-safe)
var restData = TransformRecordForRest(record, fieldMappings, defaultValues, externalIdRelationships);
// Genera la chiave sorgente e l'hash dei dati per questo record (include MAPPING_SIGNATURE)
var sourceKey = GenerateSourceKey(record, profile.SourceKeyField);
var currentDataHash = GenerateDataHash(restData, fieldMappings);
// Analizza le associazioni per capire se aggiornare, creare o saltare
if (currentUseRecordAssociations && !string.IsNullOrEmpty(sourceKey))
{
associationsByKey.TryGetValue(sourceKey, out var existingAssociation);
if (existingAssociation != null && existingAssociation.IsActive)
{
// 🔍 PRE-DISCOVERY: Usa il servizio per verificare se è un'associazione Pre-Discovery
var isPreDiscoveryAssociation = _associationService.IsPreDiscoveryAssociation(existingAssociation);
// Se l'associazione è stata appena creata dal Pre-Discovery, FORZA l'aggiornamento
if (isPreDiscoveryAssociation)
{
// Forza aggiornamento senza controllo hash
recordsForUpdate.Add((restData, existingAssociation.DestinationId, record, recordNumber, currentDataHash));
_logger.LogInformation("COMPOSITE SCHEDULED: Record {RecordNumber} marcato per AGGIORNAMENTO FORZATO (Pre-Discovery) - EntityId: {EntityId}",
recordNumber, existingAssociation.DestinationId);
}
else
{
// CONTROLLO HASH: Verifica se i dati sono cambiati (solo per associazioni esistenti)
var existingHash = existingAssociation.Data_Hash;
if (!string.IsNullOrEmpty(existingHash) && existingHash.Equals(currentDataHash, StringComparison.OrdinalIgnoreCase))
{
// I dati non sono cambiati, salta questo record
recordsSkipped.Add((record, recordNumber, "Dati non modificati (hash identico)"));
_logger.LogDebug("COMPOSITE SCHEDULED: Record {RecordNumber} saltato - hash identico: {Hash}",
recordNumber, currentDataHash);
}
else
{
// I dati sono cambiati o l'hash è vuoto, procedi con l'aggiornamento
recordsForUpdate.Add((restData, existingAssociation.DestinationId, record, recordNumber, currentDataHash));
_logger.LogDebug("COMPOSITE SCHEDULED: Record {RecordNumber} marcato per aggiornamento (EntityId: {EntityId}) - hash diverso: old={OldHash}, new={NewHash}",
recordNumber, existingAssociation.DestinationId, existingHash ?? "NULL", currentDataHash);
}
}
}
else
{
// Record da creare (nessuna associazione esistente)
recordsForCreate.Add((restData, record, recordNumber));
_logger.LogDebug("COMPOSITE SCHEDULED: Record {RecordNumber} marcato per creazione", recordNumber);
}
}
else
{
// Record da creare (no associazioni)
recordsForCreate.Add((restData, record, recordNumber));
_logger.LogDebug("COMPOSITE SCHEDULED: Record {RecordNumber} marcato per creazione (no associazioni)", recordNumber);
}
}
catch (Exception ex)
{
_logger.LogError(ex, "COMPOSITE SCHEDULED: Errore nella trasformazione del record {RecordNumber}", indexedRecord.RecordNumber);
recordErrors.Add((indexedRecord.Record, indexedRecord.RecordNumber, ex.Message));
}
});
// Attendi il completamento di tutte le operazioni parallele
await Task.WhenAll(processingTasks);
var analysisEndTime = DateTime.UtcNow;
var analysisElapsed = (analysisEndTime - analysisStartTime).TotalMilliseconds;
// Converti i ConcurrentBag in liste per il resto del processing
var finalRecordsForCreate = recordsForCreate.ToList();
var finalRecordsForUpdate = recordsForUpdate.ToList();
var finalRecordsSkipped = recordsSkipped.ToList();
var finalRecordErrors = recordErrors.ToList();
_logger.LogInformation("COMPOSITE SCHEDULED: Analisi parallela completata in {ElapsedMs}ms - {CreateCount} record da creare, {UpdateCount} record da aggiornare, {SkippedCount} record saltati, {ErrorCount} errori",
analysisElapsed, finalRecordsForCreate.Count, finalRecordsForUpdate.Count, finalRecordsSkipped.Count, finalRecordErrors.Count);
// 2. Esegui le chiamate composite in parallelo
var createTask = Task.FromResult(new List<DataConnection.REST.Implementations.SalesforceServiceClient.CompositeOperationResult>());
var updateTask = Task.FromResult(new List<DataConnection.REST.Implementations.SalesforceServiceClient.CompositeOperationResult>());
if (finalRecordsForCreate.Any())
{
var createData = finalRecordsForCreate.Select(r => r.transformedData).ToList();
createTask = salesforceClient.BatchCreateEntitiesAsync(restEntity.Name, createData);
}
if (finalRecordsForUpdate.Any())
{
var updateData = finalRecordsForUpdate.ToDictionary(
r => r.entityId,
r => r.transformedData);
updateTask = salesforceClient.BatchUpdateEntitiesAsync(restEntity.Name, updateData);
}
// Attendi entrambe le operazioni
await Task.WhenAll(createTask, updateTask);
var createResults = await createTask;
var updateResults = await updateTask;
// 3. Processa i risultati delle creazioni
int successCount = 0;
int errorCount = finalRecordErrors.Count;
int updatedCount = 0;
// Lista per raccogliere le task di creazione associazioni
var createAssociationTasks = new List<Task>();
for (int i = 0; i < createResults.Count; i++)
{
var result = createResults[i];
var originalData = finalRecordsForCreate[i];
if (result.Success)
{
successCount++;
_logger.LogDebug("COMPOSITE SCHEDULED: Record {RecordNumber} creato con successo. EntityId: {EntityId}",
originalData.recordNumber, result.EntityId);
// Aggiungi task di creazione associazione alla lista (esecuzione parallela)
if (currentUseRecordAssociations && !string.IsNullOrEmpty(result.EntityId))
{
var dataHashForAssociation = GenerateDataHash(originalData.transformedData, fieldMappings);
var associationTask = CreateAssociationAsync(profile, originalData.originalRecord, restCredential, result.EntityId, originalData.recordNumber, dataHashForAssociation);
createAssociationTasks.Add(associationTask);
}
}
else
{
errorCount++;
_logger.LogError("COMPOSITE SCHEDULED: Errore creazione record {RecordNumber}: {Error}",
originalData.recordNumber, result.ErrorMessage);
}
}
// 4. Processa i risultati degli aggiornamenti
var updateAssociationTasks = new List<Task>();
for (int i = 0; i < updateResults.Count; i++)
{
var result = updateResults[i];
var originalData = finalRecordsForUpdate[i];
if (result.Success)
{
updatedCount++;
_logger.LogDebug("COMPOSITE SCHEDULED: Record {RecordNumber} aggiornato con successo. EntityId: {EntityId}",
originalData.recordNumber, result.EntityId);
// Aggiungi task di aggiornamento associazione alla lista (esecuzione parallela)
if (currentUseRecordAssociations && !string.IsNullOrEmpty(result.EntityId))
{
var updateHashTask = UpdateAssociationHashAsync(profile, originalData.originalRecord, restCredential, result.EntityId, originalData.newDataHash);
updateAssociationTasks.Add(updateHashTask);
}
}
else
{
errorCount++;
_logger.LogError("COMPOSITE SCHEDULED: Errore aggiornamento record {RecordNumber}: {Error}",
originalData.recordNumber, result.ErrorMessage);
}
}
// 5. Esegui tutte le operazioni di associazione in parallelo
var allAssociationTasks = createAssociationTasks.Concat(updateAssociationTasks).ToList();
if (allAssociationTasks.Any())
{
_logger.LogInformation("COMPOSITE SCHEDULED: Avvio di {TaskCount} operazioni di associazione in parallelo ({CreateCount} creazioni, {UpdateCount} aggiornamenti)",
allAssociationTasks.Count, createAssociationTasks.Count, updateAssociationTasks.Count);
var startTime = DateTime.UtcNow;
await Task.WhenAll(allAssociationTasks);
var endTime = DateTime.UtcNow;
_logger.LogInformation("COMPOSITE SCHEDULED: Operazioni di associazione completate in {ElapsedMs}ms",
(endTime - startTime).TotalMilliseconds);
}
var skippedCount = finalRecordsSkipped.Count;
var totalProcessed = successCount + updatedCount;
_logger.LogInformation("COMPOSITE SCHEDULED: Trasferimento completato. Creazioni: {SuccessCount}, Aggiornamenti: {UpdatedCount}, Saltati: {SkippedCount}, Errori: {ErrorCount}",
successCount, updatedCount, skippedCount, errorCount);
// Sincronizzazione cancellazioni (se abilitata)
if (enableDeletionSync && currentUseRecordAssociations && !string.IsNullOrEmpty(profile.SourceKeyField))
{
try
{
_logger.LogInformation("COMPOSITE SCHEDULED: Inizio sincronizzazione cancellazioni...");
// Estrai tutti i valori chiave presenti nella sorgente
var sourceKeyValues = sourceRecords
.Select(r => r.ContainsKey(profile.SourceKeyField) ? r[profile.SourceKeyField]?.ToString() : null)
.Where(k => !string.IsNullOrEmpty(k))
.Cast<string>()
.Distinct()
.ToList();
_logger.LogInformation("COMPOSITE SCHEDULED: Trovati {Count} valori chiave nella sorgente", sourceKeyValues.Count);
// Sincronizza le cancellazioni
var deletionOptions = new DeletionSyncOptions
{
Action = DeletionAction.Delete // Default: elimina fisicamente
};
var deletionResult = await _deletionSyncService.SyncDeletionsAsync(
sourceKeyValues,
currentEntityName,
currentCredentialName,
restClient,
deletionOptions);
if (deletionResult.DeletedRecordsDetected > 0)
{
_logger.LogInformation("COMPOSITE SCHEDULED: Sincronizzazione cancellazioni completata - {Detected} rilevati, {Synced} sincronizzati, {Errors} errori",
deletionResult.DeletedRecordsDetected,
deletionResult.DeletedRecordsSynced,
deletionResult.SyncErrors);
}
}
catch (Exception delEx)
{
_logger.LogError(delEx, "COMPOSITE SCHEDULED: Errore durante la sincronizzazione delle cancellazioni");
}
}
return totalProcessed;
}
catch (Exception ex)
{
_logger.LogError(ex, "COMPOSITE SCHEDULED: Errore generale nel trasferimento dati");
throw;
}
}
#region Helper Methods
/// <summary>
/// Trasforma un record sorgente in formato REST utilizzando i field mappings
/// </summary>
private Dictionary<string, object> TransformRecordForRest(
Dictionary<string, object> sourceRecord,
Dictionary<string, string> fieldMappings,
Dictionary<string, (object? Value, string? Type)> defaultValues,
List<ExternalIdRelationshipDto>? externalIdRelationships = null)
{
var restData = new Dictionary<string, object>();
// Costruisce un set dei campi sorgente usati esclusivamente come External ID Relationship:
// questi NON devono essere inviati anche come mapping normale (stessa logica della UI manuale).
var externalIdSourceFields = (externalIdRelationships != null)
? externalIdRelationships
.Where(r => !string.IsNullOrWhiteSpace(r.SourceField))
.Select(r => r.SourceField)
.ToHashSet()
: new HashSet<string>();
// 1. Applica field mappings (escludendo i campi sorgente usati per External ID Relationships)
foreach (var mapping in fieldMappings)
{
// Salta il campo se è usato come sorgente in un External ID Relationship
if (externalIdSourceFields.Contains(mapping.Key))
{
_logger.LogDebug("Campo sorgente '{SourceField}' usato in External ID Relationship, escluso dal mapping normale", mapping.Key);
continue;
}
if (sourceRecord.ContainsKey(mapping.Key))
{
var value = sourceRecord[mapping.Key];
// Trasforma il valore se necessario
var transformedValue = TransformValueForRest(value);
if (transformedValue != null)
{
restData[mapping.Value] = transformedValue;
}
}
}
// 2. Applica default values (solo se il campo non è già stato mappato)
foreach (var defaultValue in defaultValues)
{
if (!restData.ContainsKey(defaultValue.Key))
{
var (value, type) = defaultValue.Value;
if (value != null)
{
restData[defaultValue.Key] = value;
_logger.LogDebug("Applicato default value: {Field} = {Value} ({Type})",
defaultValue.Key, value, type);
}
}
}
// 3. Aggiungi External ID Relationships (per Salesforce)
if (externalIdRelationships != null && externalIdRelationships.Any())
{
foreach (var relationship in externalIdRelationships)
{
if (!string.IsNullOrWhiteSpace(relationship.SourceField) &&
sourceRecord.ContainsKey(relationship.SourceField))
{
var sourceValue = sourceRecord[relationship.SourceField];
var transformedValue = TransformValueForRest(sourceValue);
if (transformedValue != null)
{
// Crea il dizionario annidato per l'External ID Relationship
// Formato: { "Account__r": { "Country__c": "US" } }
var externalIdObject = new Dictionary<string, object>
{
{ relationship.ExternalIdField, transformedValue }
};
restData[relationship.RelationshipName] = externalIdObject;
_logger.LogDebug("Aggiunta External ID Relationship: {RelationshipName} → {ExternalIdField} = {Value}",
relationship.RelationshipName, relationship.ExternalIdField, transformedValue);
}
}
}
}
return restData;
}
/// <summary>
/// Trasforma un valore per il formato REST
/// </summary>
private object? TransformValueForRest(object? value)
{
if (value == null || value == DBNull.Value)
return null;
// Trasformazioni specifiche per tipo
if (value is DateTime dateTime)
{
return dateTime.ToString("yyyy-MM-ddTHH:mm:ss.fffZ", CultureInfo.InvariantCulture);
}
if (value is decimal dec)
{
return dec.ToString(CultureInfo.InvariantCulture);
}
if (value is double dbl)
{
return dbl.ToString(CultureInfo.InvariantCulture);
}
if (value is float flt)
{
return flt.ToString(CultureInfo.InvariantCulture);
}
return value;
}
/// <summary>
/// Gestisce l'associazione dei record per evitare duplicati
/// </summary>
private async Task<string?> HandleRecordAssociation(
DataCouplerProfile profile,
Dictionary<string, object> sourceRecord,
IRestServiceClient restClient,
RestEntitySummary restEntity,
RestApiCredential restCredential,
Dictionary<string, object> restData,
Dictionary<string, string> fieldMappings)
{
if (string.IsNullOrEmpty(profile.SourceKeyField) || !sourceRecord.ContainsKey(profile.SourceKeyField))
return null;
var sourceKey = sourceRecord[profile.SourceKeyField]?.ToString();
if (string.IsNullOrEmpty(sourceKey))
return null;
try
{
// Cerca associazione esistente usando il metodo parallelo
var existingAssociation = await _dataConnectionCredentialService.FindKeyAssociationByValueParallelAsync(
sourceKey, restEntity.Name, restCredential.Name);
if (existingAssociation != null && existingAssociation.IsActive)
{
// Genera l'hash dei dati correnti (solo dati trasformati/mappati, include MAPPING_SIGNATURE)
var currentDataHash = GenerateDataHash(restData, fieldMappings);
var existingHash = existingAssociation.Data_Hash;
// Verifica se i dati sono cambiati
if (!string.IsNullOrEmpty(existingHash) && existingHash.Equals(currentDataHash, StringComparison.OrdinalIgnoreCase))
{
// I dati non sono cambiati, salta l'aggiornamento ma aggiorna LastVerifiedAt
existingAssociation.LastVerifiedAt = DateTime.Now;
await _dataConnectionCredentialService.UpdateKeyAssociationAsync(existingAssociation);
_logger.LogDebug("Record non modificato, aggiornamento saltato. KeyValue: {KeyValue}, EntityId: {EntityId}, Hash: {Hash}",
sourceKey, existingAssociation.DestinationId, currentDataHash);
return existingAssociation.DestinationId;
}
// I dati sono cambiati, procedi con l'aggiornamento
var updateResult = await restClient.UpdateEntityAsync(restEntity.Name, existingAssociation.DestinationId, restData);
if (updateResult != null)
{
// Aggiorna l'hash e LastVerifiedAt nell'associazione
existingAssociation.Data_Hash = currentDataHash;
existingAssociation.UpdatedAt = DateTime.Now;
existingAssociation.LastVerifiedAt = DateTime.Now;
await _dataConnectionCredentialService.UpdateKeyAssociationAsync(existingAssociation);
_logger.LogDebug("Record aggiornato tramite associazione. KeyValue: {KeyValue}, EntityId: {EntityId}, NewHash: {Hash}",
sourceKey, existingAssociation.DestinationId, currentDataHash);
return existingAssociation.DestinationId;
}
else
{
_logger.LogWarning("Fallimento aggiornamento record associato. KeyValue: {KeyValue}, EntityId: {EntityId}",
sourceKey, existingAssociation.DestinationId);
}
}
}
catch (Exception ex)
{
_logger.LogError(ex, "Errore nella gestione dell'associazione per KeyValue: {KeyValue}", sourceKey);
}
return null;
}
/// <summary>
/// Salva l'associazione tra record sorgente e destinazione
/// </summary>
private async Task SaveRecordAssociation(
DataCouplerProfile profile,
Dictionary<string, object> sourceRecord,
RestEntitySummary restEntity,
RestApiCredential restCredential,
Dictionary<string, object> restData,
string entityId,
Dictionary<string, string> fieldMappings)
{
if (string.IsNullOrEmpty(profile.SourceKeyField) || !sourceRecord.ContainsKey(profile.SourceKeyField))
return;
var sourceKey = sourceRecord[profile.SourceKeyField]?.ToString();
if (string.IsNullOrEmpty(sourceKey))
return;
try
{
// Genera l'hash dei dati per il controllo delle modifiche (solo dati trasformati/mappati, include MAPPING_SIGNATURE)
var dataHash = GenerateDataHash(restData, fieldMappings);
var association = new KeyAssociation
{
KeyValue = sourceKey,
SourceKeyField = profile.SourceKeyField,
DestinationKeyField = "Id", // Assumiamo "Id" come campo di default
DestinationId = entityId,
DestinationEntity = restEntity.Name,
RestCredentialName = restCredential.Name, // Usa il nome della credenziale
IsActive = true,
Data_Hash = dataHash,
CreatedAt = DateTime.Now,
UpdatedAt = DateTime.Now,
LastVerifiedAt = DateTime.Now,
AdditionalInfo = JsonSerializer.Serialize(new
{
TransferDate = DateTime.Now,
SourceType = profile.SourceType,
DestinationType = profile.DestinationType,
ProfileName = profile.Name,
ScheduledTransfer = true,
StandardTransfer = true,
DataHashGenerated = true
})
};
await _dataConnectionCredentialService.SaveKeyAssociationParallelAsync(association);
_logger.LogDebug("Associazione salvata. KeyValue: {KeyValue}, EntityId: {EntityId}, Hash: {Hash}",
sourceKey, entityId, dataHash);
}
catch (Exception ex)
{
_logger.LogError(ex, "Errore nel salvaggio dell'associazione per KeyValue: {KeyValue}", sourceKey);
}
}
/// <summary>
/// Genera la chiave sorgente per un record
/// </summary>
private string GenerateSourceKey(Dictionary<string, object> record, string? sourceKeyField)
{
if (string.IsNullOrEmpty(sourceKeyField) || !record.ContainsKey(sourceKeyField))
return string.Empty;
var keyValue = record[sourceKeyField];
return keyValue?.ToString() ?? string.Empty;
}
/// <summary>
/// Genera un hash SHA256 dei dati del record passato come parametro.
/// Utilizzato per rilevare cambiamenti nei dati e ottimizzare il trasferimento.
/// Calcola l'hash SOLO sui campi presenti nel record, in ordine alfabetico.
/// DEVE essere identico al metodo in DataCoupler.razor.cs per garantire consistenza.
/// </summary>
private string GenerateDataHash(Dictionary<string, object> record, Dictionary<string, string>? fieldMappings = null)
{
try
{
var valuesForHash = new List<string>();
// Ordina le chiavi alfabeticamente per garantire consistenza
var orderedKeys = record.Keys.OrderBy(k => k).ToList();
// Aggiungi i valori dei dati per ogni campo presente nel record
foreach (var key in orderedKeys)
{
var value = record[key];
var normalizedValue = value?.ToString()?.Trim() ?? "";
valuesForHash.Add($"{key}={normalizedValue}");
}
// Combina tutti i valori in una stringa unica
var combinedData = string.Join("|", valuesForHash);
_logger.LogDebug("Hash dei dati generato da: {CombinedData}", combinedData);
// Calcola l'hash SHA256 (stesso algoritmo di DataCoupler.razor.cs)
using (var sha256 = System.Security.Cryptography.SHA256.Create())
{
var hashBytes = sha256.ComputeHash(Encoding.UTF8.GetBytes(combinedData));
var hashString = Convert.ToHexString(hashBytes);
_logger.LogDebug("Hash SHA256 generato: {Hash} per {FieldCount} campi", hashString, orderedKeys.Count);
return hashString;
}
}
catch (Exception ex)
{
_logger.LogWarning(ex, "Errore nella generazione dell'hash per il record");
return string.Empty;
}
}
/// <summary>
/// Crea una nuova associazione record in modo asincrono
/// </summary>
private async Task CreateAssociationAsync(DataCouplerProfile profile, Dictionary<string, object> originalRecord, RestApiCredential restCredential, string entityId, int recordNumber, string dataHash)
{
try
{
if (string.IsNullOrEmpty(profile.SourceKeyField) || !originalRecord.ContainsKey(profile.SourceKeyField))
return;
var sourceKey = originalRecord[profile.SourceKeyField]?.ToString();
if (string.IsNullOrEmpty(sourceKey))
return;
// Calcola il MappingCount in modo sicuro e trova il campo destinazione mappato al campo chiave sorgente
int mappingCount = 0;
string? mappedDestinationField = null;
try
{
if (!string.IsNullOrEmpty(profile.FieldMappingJson))
{
var mappings = ParseFieldMappings(profile.FieldMappingJson);
mappingCount = mappings?.Count ?? 0;
// Cerca il campo destinazione mappato al campo chiave sorgente
if (mappings != null && !string.IsNullOrEmpty(profile.SourceKeyField))
{
if (mappings.TryGetValue(profile.SourceKeyField, out var destinationFieldName))
{
mappedDestinationField = destinationFieldName;
_logger.LogDebug("SCHEDULED MAPPING: Campo sorgente '{SourceField}' è mappato al campo destinazione '{DestField}'",
profile.SourceKeyField, mappedDestinationField);
}
else
{
_logger.LogWarning("SCHEDULED MAPPING: Campo chiave sorgente '{SourceKeyField}' NON trovato nei mappings del profilo {ProfileName}",
profile.SourceKeyField, profile.Name);
}
}
}
}
catch (Exception ex)
{
_logger.LogWarning(ex, "Errore nel calcolo del MappingCount per l'associazione del record {RecordNumber}", recordNumber);
}
var association = new KeyAssociation
{
KeyValue = sourceKey,
SourceKeyField = profile.SourceKeyField ?? "",
DestinationKeyField = "Id", // Campo ID standard per REST
MappedDestinationField = mappedDestinationField, // Campo destinazione mappato al campo chiave sorgente
DestinationEntity = profile.DestinationEndpoint ?? "",
DestinationId = entityId,
RestCredentialName = restCredential.Name, // Usa il nome della credenziale
IsActive = true,
Data_Hash = dataHash,
CreatedAt = DateTime.Now,
UpdatedAt = DateTime.Now,
LastVerifiedAt = DateTime.Now,
AdditionalInfo = JsonSerializer.Serialize(new
{
TransferDate = DateTime.Now,
RecordNumber = recordNumber,
MappingCount = mappingCount,
SourceType = profile.SourceType,
DestinationType = profile.DestinationType,
ProfileName = profile.Name,
ScheduledTransfer = true,
CompositeTransfer = true,
DataHashGenerated = true
})
};
var associationId = await _dataConnectionCredentialService.SaveKeyAssociationParallelAsync(association);
_logger.LogDebug("COMPOSITE SCHEDULED: Associazione creata con ID: {AssociationId} per record {RecordNumber} - Key: {SourceKey}, EntityId: {EntityId}, Hash: {Hash}, MappedField: {MappedField}",
associationId, recordNumber, sourceKey, entityId, dataHash, mappedDestinationField ?? "N/A");
}
catch (Exception ex)
{
_logger.LogError(ex, "COMPOSITE SCHEDULED: Errore nella creazione dell'associazione per il record {RecordNumber}", recordNumber);
}
}
/// <summary>
/// Aggiorna l'hash di un'associazione esistente
/// </summary>
private async Task UpdateAssociationHashAsync(DataCouplerProfile profile, Dictionary<string, object> originalRecord, RestApiCredential restCredential, string entityId, string newDataHash)
{
try
{
if (string.IsNullOrEmpty(profile.SourceKeyField) || !originalRecord.ContainsKey(profile.SourceKeyField))
return;
var sourceKey = originalRecord[profile.SourceKeyField]?.ToString();
if (string.IsNullOrEmpty(sourceKey))
return;
// Trova l'associazione esistente usando il metodo parallelo
var existingAssociation = await _dataConnectionCredentialService.FindKeyAssociationByValueParallelAsync(
sourceKey, profile.DestinationEndpoint ?? "", restCredential.Name);
if (existingAssociation != null)
{
existingAssociation.Data_Hash = newDataHash;
existingAssociation.UpdatedAt = DateTime.Now;
existingAssociation.LastVerifiedAt = DateTime.Now;
// Usa UpdateKeyAssociationAsync per l'aggiornamento invece di SaveKeyAssociationParallelAsync
await _dataConnectionCredentialService.UpdateKeyAssociationAsync(existingAssociation);
_logger.LogDebug("COMPOSITE SCHEDULED: Hash associazione aggiornato per entityId {EntityId} - Key: {SourceKey}, NewHash: {Hash}",
entityId, sourceKey, newDataHash);
}
else
{
_logger.LogWarning("COMPOSITE SCHEDULED: Associazione non trovata per aggiornamento hash - EntityId: {EntityId}, SourceKey: {SourceKey}",
entityId, sourceKey);
}
}
catch (Exception ex)
{
_logger.LogError(ex, "COMPOSITE SCHEDULED: Errore nell'aggiornamento dell'hash per EntityId: {EntityId}", entityId);
}
}
#endregion
}