From 344853fde9b125f6677f4f04eb0dc0333a566ea1 Mon Sep 17 00:00:00 2001 From: Alessio Dal Santo Date: Thu, 28 May 2026 11:15:18 +0200 Subject: [PATCH] [Feature/Perf] Ottimizzazioni bulk pre-discovery, batch deletion sync e supporto OLE DB / Salesforce client_credentials MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ## Bulk Pre-Discovery e riduzione query SQLite/SOQL ### KeyAssociationService — FindAssociationsByKeyValuesBulkAsync (nuovo) - Aggiunta query bulk 'WHERE KeyValue IN (...)' per recuperare N associazioni con 1 sola query SQLite (chunking a 500 chiavi per rispettare il limite ~999 parametri di SQLite) - Aggiunta interfaccia IKeyAssociationService e delegata in DataConnectionCredentialService / IDataConnectionCredentialService ### AssociationService — BatchFindOrCreateAssociationsAsync (nuovo) - Nuovo metodo bulk che sostituisce i loop per-record durante l'analisi composite: 1) 1 query SQLite bulk per tutte le chiavi 2) Per le chiavi non trovate: SOQL 'IN (...)' su Salesforce in chunk da 200 via BatchExecuteQueriesAsync (ceil(K/25) HTTP Composite call invece di K singole) 3) Salvataggio parallelo delle associazioni pre-discovery scoperte - Fallback per-record automatico per client REST non Salesforce - Aggiornata interfaccia IAssociationService con documentazione XML completa ### DataCoupler.razor.cs — STEP A/B nel flusso COMPOSITE - Pre-Discovery spostata FUORI dal loop parallelo (STEP A, prima dell'analisi) - associationsByKey pre-popolato con BatchFindOrCreateAssociationsAsync - STEP B: il loop parallelo usa TryGetValue O(1) invece di query async per record - Rimozione blocco ~40 righe di per-record lookup / fallback duplicati ## Salesforce Composite API — Batch Delete e Patch ### SalesforceServiceClient — metodi batch (nuovi) - BatchDeleteEntitiesAsync: elimina N record con ceil(N/25) Composite call invece di N - BatchPatchSingleFieldAsync: aggiorna un singolo campo su N record tramite BatchUpdateEntitiesAsync ### DeletionSyncService — refactoring batch - ExecuteBatchedSalesforceDeletionsAsync: orchestrazione batch per Delete / Deactivate / Mark su Salesforce - ExecuteSequentialDeletionsAsync: loop sequenziale esistente estratto in metodo riutilizzabile - Dispatcher: Salesforce -> batch Composite, altri client REST -> sequenziale ## Supporto OLE DB (database) ### DatabaseSchemaProviderFactory - Aggiunto case DatabaseType.OleDb -> new OleDbSchemaProvider() nel factory switch ### DatabaseMethod.cs - Aggiunto metodo IsOleDbConnection() (parallelo a IsOdbcConnection()) - Query validation e manager temporaneo estesi a OLE DB oltre che ODBC - GetLimitedQuery: aggiunto case OleDb -> 'SELECT TOP N FROM (subquery)' ## Salesforce OAuth2 — fix client_credentials ### CredentialService.cs - Aggiunto 'GrantType' alla HashSet serviceSpecificKeys per preservarlo nella serializzazione AdditionalParameters ### DataConnectionCredentialService.cs - Refactored BuildRestServiceOptions in helper statico riutilizzato da entrambi i metodi GetRestServiceOptions - Mapping coerente ClientId/ClientSecret/GrantType per Salesforce (allineato a DataConnectionFactory) - TestSalesforceOAuthLogin: branch esplicito per client_credentials (no username/password/token) con validazione preventiva ClientId+ClientSecret obbligatori - Log flow label (password|client_credentials) in tutti i messaggi di autenticazione ## VS Code tasks ### .vscode/tasks.json - Rimosso task generico 'Publish Data_Coupler' - Aggiunti due task separati: win-x64 e win-x86, entrambi SingleFile + Self-Contained + ReadyToRun --- .vscode/tasks.json | 36 ++- .../Services/CredentialService.cs | 8 +- .../Services/IKeyAssociationService.cs | 10 + .../Services/KeyAssociationService.cs | 61 ++++- .../IDataConnectionCredentialService.cs | 8 + .../DataConnectionCredentialService.cs | 124 ++++++--- .../DB/EF/DatabaseSchemaProviderFactory.cs | 5 +- .../SalesforceServiceClient.cs | 171 +++++++++++- .../Extensions/DataCoupler/DatabaseMethod.cs | 27 +- Data_Coupler/Pages/DataCoupler.razor.cs | 79 +++--- Data_Coupler/Services/AssociationService.cs | 239 ++++++++++++++++ Data_Coupler/Services/DeletionSyncService.cs | 258 +++++++++++++----- .../ScheduledProfileExecutionService.cs | 83 +++--- 13 files changed, 886 insertions(+), 223 deletions(-) diff --git a/.vscode/tasks.json b/.vscode/tasks.json index c0b1c6e..b329ecb 100644 --- a/.vscode/tasks.json +++ b/.vscode/tasks.json @@ -29,17 +29,39 @@ } }, { - "label": "Publish Data_Coupler", + "label": "Publish Data_Coupler Temp SingleFile Self-Contained Ready-To-Run win-x64", + "detail": "Publish the Data Coupler 64-bit with a Single File, Self Contained, Ready-To-Run", "type": "shell", "command": "dotnet", "args": [ "publish", - "--configuration", - "Release", - "--output", - "${workspaceFolder}/publish", - "--project", - "Data_Coupler/Data_Coupler.csproj" + "Data_Coupler/Data_Coupler.csproj", + "-c", "Release", + "-r", "win-x64", + "--self-contained", "true", + "-p:PublishSingleFile=true", + "-p:PublishReadyToRun=true", + "-p:PublishTrimmed=false", + "-o", "C:\\Temp\\Publish\\Data_Coupler" + ], + "group": "build", + "problemMatcher": [] + }, + { + "label": "Publish Data_Coupler Temp SingleFile Self-Contained Ready-To-Run win-x86", + "detail": "Publish the Data Coupler 32-bit with a Single File, Self Contained, Ready-To-Run", + "type": "shell", + "command": "dotnet", + "args": [ + "publish", + "Data_Coupler/Data_Coupler.csproj", + "-c", "Release", + "-r", "win-x86", + "--self-contained", "true", + "-p:PublishSingleFile=true", + "-p:PublishReadyToRun=true", + "-p:PublishTrimmed=false", + "-o", "C:\\Temp\\Publish\\Data_Coupler_x86" ], "group": "build", "problemMatcher": [] diff --git a/CredentialManager/Services/CredentialService.cs b/CredentialManager/Services/CredentialService.cs index ea09c93..83e598b 100644 --- a/CredentialManager/Services/CredentialService.cs +++ b/CredentialManager/Services/CredentialService.cs @@ -806,11 +806,11 @@ public class CredentialService : ICredentialService } // Copia tutti i parametri che non sono specifici del servizio - var serviceSpecificKeys = new HashSet - { + var serviceSpecificKeys = new HashSet + { "CompanyDatabase", "Language", "Version", "UseTrustedConnection", - "SecurityToken", "ClientId", "ClientSecret", "ApiVersion", - "IsSandbox", "UseSoapApi", "RefreshToken", "AccessToken", "TokenExpiry" + "SecurityToken", "ClientId", "ClientSecret", "ApiVersion", + "IsSandbox", "UseSoapApi", "GrantType", "RefreshToken", "AccessToken", "TokenExpiry" }; foreach (var param in additionalParams) diff --git a/CredentialManager/Services/IKeyAssociationService.cs b/CredentialManager/Services/IKeyAssociationService.cs index b2df86d..96607f3 100644 --- a/CredentialManager/Services/IKeyAssociationService.cs +++ b/CredentialManager/Services/IKeyAssociationService.cs @@ -112,6 +112,16 @@ public interface IKeyAssociationService /// Task FindAssociationByKeyValueParallelAsync(string keyValue); + /// + /// Versione bulk: ricerca in un colpo solo tutte le associazioni attive per la combinazione + /// (KeyValue ∈ keyValues, DestinationEntity, RestCredentialName) usando una query SQL IN(...). + /// Riduce drasticamente le query SQLite quando si processano molti record. + /// + Task> FindAssociationsByKeyValuesBulkAsync( + IEnumerable keyValues, + string destinationEntity, + string restCredentialName); + /// /// Versione thread-safe per operazioni parallele - Elimina associazione /// diff --git a/CredentialManager/Services/KeyAssociationService.cs b/CredentialManager/Services/KeyAssociationService.cs index 93ed899..f80d2f6 100644 --- a/CredentialManager/Services/KeyAssociationService.cs +++ b/CredentialManager/Services/KeyAssociationService.cs @@ -341,9 +341,9 @@ public class KeyAssociationService : IKeyAssociationService var options = new DbContextOptionsBuilder() .UseSqlite(_context.Database.GetConnectionString()) .Options; - + using var parallelContext = new CredentialDbContext(options); - + try { return await parallelContext.KeyAssociations @@ -358,6 +358,63 @@ public class KeyAssociationService : IKeyAssociationService } } + /// + /// Bulk lookup delle associazioni: una sola query con WHERE KeyValue IN (...). + /// Per N chiavi sostituisce fino a 2N query SQLite del flusso per-record. + /// + public async Task> FindAssociationsByKeyValuesBulkAsync( + IEnumerable keyValues, + string destinationEntity, + string restCredentialName) + { + var distinctKeys = keyValues + .Where(k => !string.IsNullOrEmpty(k)) + .Distinct() + .ToList(); + + if (distinctKeys.Count == 0) + return new Dictionary(StringComparer.Ordinal); + + try + { + // SQLite ha un limite hardcoded di ~999 parametri per query: chunk per sicurezza. + const int chunkSize = 500; + var result = new Dictionary(StringComparer.Ordinal); + + for (int i = 0; i < distinctKeys.Count; i += chunkSize) + { + var chunk = distinctKeys.Skip(i).Take(chunkSize).ToList(); + + var associations = await _context.KeyAssociations + .AsNoTracking() + .Where(ka => ka.IsActive && + ka.DestinationEntity == destinationEntity && + ka.RestCredentialName == restCredentialName && + chunk.Contains(ka.KeyValue)) + .ToListAsync(); + + // Se ci sono duplicati (KeyValue ripetuto), tieni il più recente + foreach (var assoc in associations + .GroupBy(a => a.KeyValue) + .Select(g => g.OrderByDescending(a => a.UpdatedAt ?? a.CreatedAt).First())) + { + result[assoc.KeyValue] = assoc; + } + } + + _logger.LogDebug("BULK: Ricerca associazioni completata - {Found}/{Total} match per {Entity}/{Credential}", + result.Count, distinctKeys.Count, destinationEntity, restCredentialName); + + return result; + } + catch (Exception ex) + { + _logger.LogError(ex, "BULK: Errore nella ricerca bulk delle associazioni ({Count} chiavi, Entity={Entity})", + distinctKeys.Count, destinationEntity); + throw; + } + } + /// /// Versione thread-safe per operazioni parallele - Delete association /// diff --git a/DataConnection/CredentialManagement/Interfaces/IDataConnectionCredentialService.cs b/DataConnection/CredentialManagement/Interfaces/IDataConnectionCredentialService.cs index f7ad0b0..7b57b0b 100644 --- a/DataConnection/CredentialManagement/Interfaces/IDataConnectionCredentialService.cs +++ b/DataConnection/CredentialManagement/Interfaces/IDataConnectionCredentialService.cs @@ -85,6 +85,14 @@ public interface IDataConnectionCredentialService Task FindKeyAssociationByValueParallelAsync(string keyValue); Task DeleteKeyAssociationParallelAsync(int id); + /// + /// Bulk lookup associazioni - una sola query SQLite per N chiavi. + /// + Task> FindKeyAssociationsByValuesBulkAsync( + IEnumerable keyValues, + string destinationEntity, + string restCredentialName); + // Deletion synchronization operations Task MarkDeletedAssociationsAsync(List sourceKeyValues, string destinationEntity, string restCredentialName); Task> GetPendingDeletionsAsync(string destinationEntity, string restCredentialName); diff --git a/DataConnection/CredentialManagement/Services/DataConnectionCredentialService.cs b/DataConnection/CredentialManagement/Services/DataConnectionCredentialService.cs index e1ff1cc..40808ca 100644 --- a/DataConnection/CredentialManagement/Services/DataConnectionCredentialService.cs +++ b/DataConnection/CredentialManagement/Services/DataConnectionCredentialService.cs @@ -168,16 +168,7 @@ public class DataConnectionCredentialService : IDataConnectionCredentialService if (credential == null) throw new InvalidOperationException($"REST API credential '{credentialName}' not found"); - var options = new DataConnection.REST.Configuration.RestServiceOptions - { - BaseUrl = credential.BaseUrl, - ApiKey = credential.ApiKey, - Username = credential.Username, - Password = credential.Password, - AuthToken = credential.AuthToken, - TimeoutSeconds = credential.TimeoutSeconds, - IgnoreSslErrors = credential.IgnoreSslErrors - }; + var options = BuildRestServiceOptions(credential); _logger.LogDebug("Created RestServiceOptions for credential: {Name} ({BaseUrl})", credentialName, credential.BaseUrl); @@ -191,19 +182,42 @@ public class DataConnectionCredentialService : IDataConnectionCredentialService if (credential == null) throw new InvalidOperationException($"REST API credential with ID '{credentialId}' not found"); + var options = BuildRestServiceOptions(credential); + + _logger.LogDebug("Created RestServiceOptions for credential ID: {Id} ({BaseUrl})", + credentialId, credential.BaseUrl); + return options; + } + + private static DataConnection.REST.Configuration.RestServiceOptions BuildRestServiceOptions(RestApiCredential credential) + { var options = new DataConnection.REST.Configuration.RestServiceOptions { BaseUrl = credential.BaseUrl, - ApiKey = credential.ApiKey, Username = credential.Username, Password = credential.Password, - AuthToken = credential.AuthToken, TimeoutSeconds = credential.TimeoutSeconds, IgnoreSslErrors = credential.IgnoreSslErrors }; - _logger.LogDebug("Created RestServiceOptions for credential ID: {Id} ({BaseUrl})", - credentialId, credential.BaseUrl); + // Mapping coerente con DataConnectionFactory.CreateRestServiceClientAsync + switch (credential.ServiceType) + { + case RestServiceType.Salesforce: + options.ApiKey = credential.ClientId; + options.AuthToken = credential.ClientSecret; + options.SalesforceGrantType = credential.GrantType; + break; + case RestServiceType.SapB1ServiceLayer: + options.ApiKey = credential.CompanyDatabase; + options.AuthToken = credential.AuthToken; + break; + default: + options.ApiKey = credential.ApiKey; + options.AuthToken = credential.AuthToken; + break; + } + return options; } @@ -550,8 +564,8 @@ public class DataConnectionCredentialService : IDataConnectionCredentialService { try { - _logger.LogInformation("Testing Salesforce authentication for {Name} ({BaseUrl})", - credential.Name, credential.BaseUrl); + _logger.LogInformation("Testing Salesforce authentication for {Name} ({BaseUrl}, GrantType={GrantType})", + credential.Name, credential.BaseUrl, credential.GrantType); _logger.LogDebug("Salesforce credential details: Username={Username}, HasPassword={HasPassword}, HasSecurityToken={HasSecurityToken}, HasClientId={HasClientId}, HasClientSecret={HasClientSecret}", credential.Username, !string.IsNullOrEmpty(credential.Password), !string.IsNullOrEmpty(credential.SecurityToken), @@ -560,49 +574,69 @@ public class DataConnectionCredentialService : IDataConnectionCredentialService using var httpClient = new HttpClient(); httpClient.Timeout = TimeSpan.FromSeconds(credential.TimeoutSeconds); - // Test di autenticazione OAuth2 var tokenUrl = credential.BaseUrl.TrimEnd('/') + "/services/oauth2/token"; - var tokenData = new List> - { - new("grant_type", "password"), - new("username", credential.Username ?? "") - }; + List> tokenData; + string flowLabel; - // Aggiungiamo password + security token se disponibile - var password = credential.Password ?? ""; - if (!string.IsNullOrEmpty(credential.SecurityToken)) + if (credential.GrantType == CredentialManager.Models.SalesforceGrantType.ClientCredentials) { - password += credential.SecurityToken; - } - tokenData.Add(new("password", password)); + // Client Credentials flow — server-to-server, no user + if (string.IsNullOrEmpty(credential.ClientId) || string.IsNullOrEmpty(credential.ClientSecret)) + { + return (false, "Flusso client_credentials richiede ClientId e ClientSecret configurati."); + } - // Aggiungiamo client credentials se disponibili - if (!string.IsNullOrEmpty(credential.ClientId)) - { - tokenData.Add(new("client_id", credential.ClientId)); + tokenData = new List> + { + new("grant_type", "client_credentials"), + new("client_id", credential.ClientId), + new("client_secret", credential.ClientSecret) + }; + flowLabel = "client_credentials"; } - if (!string.IsNullOrEmpty(credential.ClientSecret)) + else { - tokenData.Add(new("client_secret", credential.ClientSecret)); + // Password flow (default) + var password = credential.Password ?? ""; + if (!string.IsNullOrEmpty(credential.SecurityToken)) + { + password += credential.SecurityToken; + } + + tokenData = new List> + { + new("grant_type", "password"), + new("username", credential.Username ?? ""), + new("password", password) + }; + + if (!string.IsNullOrEmpty(credential.ClientId)) + { + tokenData.Add(new("client_id", credential.ClientId)); + } + if (!string.IsNullOrEmpty(credential.ClientSecret)) + { + tokenData.Add(new("client_secret", credential.ClientSecret)); + } + flowLabel = "password"; } - _logger.LogDebug("Posting to Salesforce token URL: {TokenUrl}", tokenUrl); + _logger.LogDebug("Posting to Salesforce token URL: {TokenUrl} (flow={Flow})", tokenUrl, flowLabel); var tokenContent = new FormUrlEncodedContent(tokenData); var response = await httpClient.PostAsync(tokenUrl, tokenContent); if (response.IsSuccessStatusCode) { - var responseContent = await response.Content.ReadAsStringAsync(); - _logger.LogInformation("Salesforce authentication successful for {Name}", credential.Name); - return (true, $"Autenticazione Salesforce riuscita!\n\nDettagli:\n- Login URL: {credential.BaseUrl}\n- API Version: {credential.ApiVersion}\n- Sandbox: {credential.IsSandbox}\n- Tipo Auth: OAuth2\n- Timeout: {credential.TimeoutSeconds}s"); + _logger.LogInformation("Salesforce authentication ({Flow}) successful for {Name}", flowLabel, credential.Name); + return (true, $"Autenticazione Salesforce riuscita!\n\nDettagli:\n- Login URL: {credential.BaseUrl}\n- API Version: {credential.ApiVersion}\n- Sandbox: {credential.IsSandbox}\n- Tipo Auth: OAuth2 ({flowLabel})\n- Timeout: {credential.TimeoutSeconds}s"); } else { var errorContent = await response.Content.ReadAsStringAsync(); - _logger.LogWarning("Salesforce authentication failed for {Name}. Status: {StatusCode}, Response: {Response}", - credential.Name, response.StatusCode, errorContent); - return (false, $"Autenticazione Salesforce fallita. Status: {response.StatusCode}\nDettagli: {errorContent}"); + _logger.LogWarning("Salesforce authentication ({Flow}) failed for {Name}. Status: {StatusCode}, Response: {Response}", + flowLabel, credential.Name, response.StatusCode, errorContent); + return (false, $"Autenticazione Salesforce ({flowLabel}) fallita. Status: {response.StatusCode}\nDettagli: {errorContent}"); } } catch (HttpRequestException ex) @@ -1074,6 +1108,14 @@ public class DataConnectionCredentialService : IDataConnectionCredentialService return await _keyAssociationService.FindAssociationByKeyValueParallelAsync(keyValue); } + public async Task> FindKeyAssociationsByValuesBulkAsync( + IEnumerable keyValues, + string destinationEntity, + string restCredentialName) + { + return await _keyAssociationService.FindAssociationsByKeyValuesBulkAsync(keyValues, destinationEntity, restCredentialName); + } + public async Task DeleteKeyAssociationParallelAsync(int id) { return await _keyAssociationService.DeleteAssociationParallelAsync(id); diff --git a/DataConnection/DB/EF/DatabaseSchemaProviderFactory.cs b/DataConnection/DB/EF/DatabaseSchemaProviderFactory.cs index 2726047..b93fae0 100644 --- a/DataConnection/DB/EF/DatabaseSchemaProviderFactory.cs +++ b/DataConnection/DB/EF/DatabaseSchemaProviderFactory.cs @@ -19,7 +19,10 @@ public class DatabaseSchemaProviderFactory { return databaseType switch { - DatabaseType.SqlServer => new SqlServerSchemaProvider(), DatabaseType.Odbc => new OdbcSchemaProvider(), // Aggiungere qui altri provider quando implementati + DatabaseType.SqlServer => new SqlServerSchemaProvider(), + DatabaseType.Odbc => new OdbcSchemaProvider(), + DatabaseType.OleDb => new OleDbSchemaProvider(), + // Aggiungere qui altri provider quando implementati // DatabaseType.MySql => new MySqlSchemaProvider(), // DatabaseType.PostgreSql => new PostgreSqlSchemaProvider(), // DatabaseType.Oracle => new OracleSchemaProvider(), diff --git a/DataConnection/REST/Implementations/SalesforceServiceClient.cs b/DataConnection/REST/Implementations/SalesforceServiceClient.cs index f4791a6..5295ada 100644 --- a/DataConnection/REST/Implementations/SalesforceServiceClient.cs +++ b/DataConnection/REST/Implementations/SalesforceServiceClient.cs @@ -126,13 +126,19 @@ namespace DataConnection.REST.Implementations { _accessToken = tokenResponse.AccessToken; _instanceUrl = tokenResponse.InstanceUrl; - _tokenExpiry = DateTime.UtcNow.AddSeconds(3600); // Salesforce doesn't always return expires_in + + // Se Salesforce restituisce expires_in (es. client_credentials), usalo con un margine di 60s; + // altrimenti (password flow non restituisce expires_in) fallback al default 1h conservativo. + var ttlSeconds = tokenResponse.ExpiresIn.HasValue && tokenResponse.ExpiresIn.Value > 60 + ? tokenResponse.ExpiresIn.Value - 60 + : 3600; + _tokenExpiry = DateTime.UtcNow.AddSeconds(ttlSeconds); _httpClient.DefaultRequestHeaders.Authorization = new System.Net.Http.Headers.AuthenticationHeaderValue("Bearer", _accessToken); - _logger.LogInformation("Salesforce authentication ({Flow}) successful. InstanceUrl={InstanceUrl}, TokenExpiry={Expiry}", - flowName, _instanceUrl, _tokenExpiry.ToLocalTime()); + _logger.LogInformation("Salesforce authentication ({Flow}) successful. InstanceUrl={InstanceUrl}, TokenExpiry={Expiry} (TTL {Ttl}s, server expires_in={ExpiresIn})", + flowName, _instanceUrl, _tokenExpiry.ToLocalTime(), ttlSeconds, tokenResponse.ExpiresIn?.ToString() ?? "null"); return true; } @@ -1616,6 +1622,13 @@ namespace DataConnection.REST.Implementations [JsonPropertyName("signature")] public string Signature { get; set; } = string.Empty; + + /// + /// Durata di validità del token in secondi. Presente solo in alcuni flussi OAuth Salesforce + /// (es. client_credentials, JWT bearer); assente in altri (es. username/password) — in tal caso 0/null. + /// + [JsonPropertyName("expires_in")] + public int? ExpiresIn { get; set; } } private class SalesforceSObjectsResponse @@ -2161,5 +2174,157 @@ namespace DataConnection.REST.Implementations }).ToList(); } } + + /// + /// Elimina N record SObject in batch tramite Salesforce Composite API. + /// Riduce N HTTP calls a ceil(N/25), eseguite in parallelo. + /// + /// Nome SObject (es. "Account"). + /// Lista di Id da eliminare. + /// Cancellation token. + /// Risultato per ogni Id (Success/ErrorMessage). + public async Task> BatchDeleteEntitiesAsync( + string entityName, + List entityIds, + CancellationToken cancellationToken = default) + { + if (!IsAuthenticated()) + { + _logger.LogDebug("Error: Not authenticated to Salesforce. Cannot perform batch delete."); + return new List(); + } + + if (entityIds == null || entityIds.Count == 0) + return new List(); + + const int maxBatchSize = 25; + + var batches = new List<(List batch, int startIndex, int batchNumber)>(); + for (int i = 0; i < entityIds.Count; i += maxBatchSize) + { + var chunk = entityIds.Skip(i).Take(maxBatchSize).ToList(); + batches.Add((chunk, i, (i / maxBatchSize) + 1)); + } + + _logger.LogDebug($"--- BatchDelete: {entityIds.Count} record in {batches.Count} batch (parallel) ---"); + + var batchTasks = batches.Select(async b => + await ExecuteDeleteBatchAsync(entityName, b.batch, b.startIndex, cancellationToken)); + + var batchResults = await Task.WhenAll(batchTasks); + + var allResults = new List(); + foreach (var r in batchResults) allResults.AddRange(r); + + _logger.LogDebug($"All delete batches completed: {allResults.Count(r => r.Success)} success, {allResults.Count(r => !r.Success)} failed"); + return allResults; + } + + private async Task> ExecuteDeleteBatchAsync( + string entityName, + List batch, + int startIndex, + CancellationToken cancellationToken) + { + try + { + var compositeUri = $"{_instanceUrl}/services/data/v60.0/composite/"; + + var compositeRequest = new SalesforceCompositeRequest(); + for (int i = 0; i < batch.Count; i++) + { + var entityId = batch[i]; + compositeRequest.CompositeRequest.Add(new SalesforceCompositeSubRequest + { + Method = "DELETE", + Url = $"/services/data/v60.0/sobjects/{entityName}/{entityId}", + ReferenceId = $"delete_{startIndex + i}" + // Body intenzionalmente null per DELETE + }); + } + + var jsonContent = new StringContent( + JsonSerializer.Serialize(compositeRequest, SalesforceJsonOptions), + System.Text.Encoding.UTF8, + "application/json"); + + var response = await _httpClient.PostAsync(compositeUri, jsonContent, cancellationToken); + + if (!response.IsSuccessStatusCode) + { + var errorContent = await response.Content.ReadAsStringAsync(cancellationToken); + _logger.LogDebug($"Salesforce Batch Delete failed: {response.StatusCode} - {errorContent}"); + + return batch.Select((id, idx) => new CompositeOperationResult + { + ReferenceId = $"delete_{startIndex + idx}", + EntityId = id, + Success = false, + ErrorMessage = $"Batch operation failed: {response.StatusCode} - {errorContent}" + }).ToList(); + } + + var responseContent = await response.Content.ReadAsStringAsync(cancellationToken); + var compositeResponse = JsonSerializer.Deserialize(responseContent, SalesforceJsonOptions); + + var results = new List(); + if (compositeResponse?.CompositeResponse != null) + { + for (int i = 0; i < compositeResponse.CompositeResponse.Count; i++) + { + var sub = compositeResponse.CompositeResponse[i]; + var originalId = i < batch.Count ? batch[i] : string.Empty; + + var result = new CompositeOperationResult + { + ReferenceId = sub.ReferenceId, + EntityId = originalId, + HttpStatusCode = sub.HttpStatusCode, + // 204 No Content è la risposta di successo standard per DELETE + Success = sub.HttpStatusCode >= 200 && sub.HttpStatusCode < 300 + }; + + if (!result.Success) + result.ErrorMessage = sub.Body?.ToString() ?? "Unknown error"; + + results.Add(result); + } + } + + return results; + } + catch (Exception ex) + { + _logger.LogDebug($"Error during Salesforce batch delete: {ex.Message}"); + return batch.Select((id, idx) => new CompositeOperationResult + { + ReferenceId = $"delete_{startIndex + idx}", + EntityId = id, + Success = false, + ErrorMessage = ex.Message + }).ToList(); + } + } + + + + /// + /// Bulk PATCH per aggiornare un singolo campo (es. campo di tombstone/mark-as-deleted) su N record. + /// + public Task> BatchPatchSingleFieldAsync( + string entityName, + IEnumerable entityIds, + string fieldName, + object fieldValue, + CancellationToken cancellationToken = default) + { + var updates = entityIds + .Where(id => !string.IsNullOrEmpty(id)) + .ToDictionary( + id => id, + _ => (Dictionary)new Dictionary { { fieldName, fieldValue } }); + + return BatchUpdateEntitiesAsync(entityName, updates, cancellationToken); + } } } diff --git a/Data_Coupler/Extensions/DataCoupler/DatabaseMethod.cs b/Data_Coupler/Extensions/DataCoupler/DatabaseMethod.cs index 92d8efb..108f007 100644 --- a/Data_Coupler/Extensions/DataCoupler/DatabaseMethod.cs +++ b/Data_Coupler/Extensions/DataCoupler/DatabaseMethod.cs @@ -70,15 +70,26 @@ public partial class DataCoupler : ComponentBase /// /// Verifica se la credenziale database selezionata è di tipo ODBC /// - /// True se la credenziale è ODBC, altrimenti False protected bool IsOdbcConnection() { if (string.IsNullOrEmpty(selectedDatabaseCredential)) return false; - + var credential = databaseCredentials.FirstOrDefault(c => c.Name == selectedDatabaseCredential); return credential?.DatabaseType == DatabaseType.Odbc; } + + /// + /// Verifica se la credenziale database selezionata è di tipo OLE DB + /// + protected bool IsOleDbConnection() + { + if (string.IsNullOrEmpty(selectedDatabaseCredential)) + return false; + + var credential = databaseCredentials.FirstOrDefault(c => c.Name == selectedDatabaseCredential); + return credential?.DatabaseType == DatabaseType.OleDb; + } /// /// Gestisce il cambio di credenziale database selezionata @@ -621,11 +632,12 @@ public partial class DataCoupler : ComponentBase return; } - // Per ODBC, crea un database manager temporaneo se non esiste + // Per ODBC e OLE DB, crea un database manager temporaneo se non esiste var managerToUse = currentDatabaseManager; - if (managerToUse == null && IsOdbcConnection()) + if (managerToUse == null && (IsOdbcConnection() || IsOleDbConnection())) { - Logger.LogInformation("Creando database manager temporaneo per validazione query ODBC"); + Logger.LogInformation("Creando database manager temporaneo per validazione query {Type}", + IsOdbcConnection() ? "ODBC" : "OLE DB"); tempManager = await ConnectionFactory.CreateDatabaseManagerAsync(selectedDatabaseCredential); managerToUse = tempManager; } @@ -661,8 +673,8 @@ public partial class DataCoupler : ComponentBase Logger.LogInformation("Query validata con successo: {ColumnCount} colonne", queryColumns.Count); - // Per ODBC, salva il manager se non era già presente - if (IsOdbcConnection() && currentDatabaseManager == null && tempManager != null) + // Per ODBC e OLE DB, salva il manager temporaneo per riuso + if ((IsOdbcConnection() || IsOleDbConnection()) && currentDatabaseManager == null && tempManager != null) { currentDatabaseManager = tempManager; tempManager = null; // Non distruggerlo nel finally @@ -747,6 +759,7 @@ public partial class DataCoupler : ComponentBase return databaseType switch { DatabaseType.SqlServer => $"SELECT TOP {limit} * FROM ({baseQuery}) AS subquery", + DatabaseType.OleDb => $"SELECT TOP {limit} * FROM ({baseQuery}) AS subquery", DatabaseType.Oracle => $"SELECT * FROM ({baseQuery}) WHERE ROWNUM <= {limit}", DatabaseType.MySql => $"{baseQuery} LIMIT {limit}", DatabaseType.PostgreSql => $"{baseQuery} LIMIT {limit}", diff --git a/Data_Coupler/Pages/DataCoupler.razor.cs b/Data_Coupler/Pages/DataCoupler.razor.cs index 6d3361e..19b3f11 100644 --- a/Data_Coupler/Pages/DataCoupler.razor.cs +++ b/Data_Coupler/Pages/DataCoupler.razor.cs @@ -3491,10 +3491,42 @@ public partial class DataCoupler : ComponentBase // Crea lista indicizzata per mantenere il record number var indexedRecords = records.Select((record, index) => new { Record = record, RecordNumber = index + 1 }).ToList(); - Logger.LogInformation("COMPOSITE: Inizio analisi parallela di {RecordCount} record", indexedRecords.Count); + Logger.LogInformation("COMPOSITE: Inizio analisi di {RecordCount} record", indexedRecords.Count); var analysisStartTime = DateTime.UtcNow; - // Processa tutti i record in parallelo + // === STEP A: Bulk Pre-Discovery (1 query SQLite + poche SOQL IN invece di 2N+N) === + var sourceKeysForBulk = new List(indexedRecords.Count); + foreach (var idx in indexedRecords) + { + var key = GenerateSourceKey(idx.Record); + if (!string.IsNullOrEmpty(key)) + sourceKeysForBulk.Add(key); + } + + Dictionary associationsByKey = new(StringComparer.Ordinal); + if (currentUseRecordAssociations && !string.IsNullOrEmpty(currentSourceKeyField) && sourceKeysForBulk.Count > 0) + { + var commonRequest = new PreDiscoveryRequest + { + SourceKeyField = currentSourceKeyField, + DestinationEntity = currentEntityName, + CredentialName = currentCredentialName, + DestinationKeyField = GetEntityIdField(), + FieldMappings = currentFieldMappings, + RestClient = currentRestClient, + EnablePreDiscovery = true, + UseParallelMethod = true, + IsScheduledTransfer = false + }; + + associationsByKey = await AssociationService.BatchFindOrCreateAssociationsAsync( + sourceKeysForBulk, commonRequest); + + Logger.LogInformation("COMPOSITE: Bulk Pre-Discovery completata - {Found}/{Total} associazioni risolte", + associationsByKey.Count, sourceKeysForBulk.Count); + } + + // === STEP B: Analisi locale parallela (no I/O) === var processingTasks = indexedRecords.Select(async indexedRecord => { try @@ -3513,48 +3545,7 @@ public partial class DataCoupler : ComponentBase // Analizza le associazioni per capire se aggiornare, creare o saltare if (currentUseRecordAssociations && !string.IsNullOrEmpty(sourceKey)) { - Logger.LogDebug("COMPOSITE PARALLEL: Cerco associazione per KeyValue: '{KeyValue}', Entity: '{Entity}', Credential: '{Credential}'", - sourceKey, currentEntityName, currentCredentialName); - - // Usa i metodi paralleli per le operazioni di database - var existingAssociation = await CredentialService.FindKeyAssociationByValueParallelAsync( - sourceKey, currentEntityName, currentCredentialName); - - // FALLBACK: Se non troviamo l'associazione con tutti i parametri, proviamo solo con il KeyValue - if (existingAssociation == null) - { - existingAssociation = await CredentialService.FindKeyAssociationByValueParallelAsync(sourceKey); - if (existingAssociation != null) - { - // Verifica compatibilità - if (existingAssociation.DestinationEntity != currentEntityName || - existingAssociation.RestCredentialName != currentCredentialName) - { - existingAssociation = null; - } - } - } - - // 🔍 PRE-DISCOVERY: Usa il servizio centralizzato - if (existingAssociation == null) - { - var preDiscoveryRequest = new PreDiscoveryRequest - { - SourceKey = sourceKey, - SourceKeyField = currentSourceKeyField, - DestinationEntity = currentEntityName, - CredentialName = currentCredentialName, - DestinationKeyField = GetEntityIdField(), - FieldMappings = currentFieldMappings, - RestClient = currentRestClient, - CurrentDataHash = currentDataHash, - EnablePreDiscovery = true, - UseParallelMethod = true, // Usa metodi paralleli thread-safe - IsScheduledTransfer = false - }; - - existingAssociation = await AssociationService.FindOrCreateAssociationAsync(preDiscoveryRequest); - } + associationsByKey.TryGetValue(sourceKey, out var existingAssociation); if (existingAssociation != null && existingAssociation.IsActive) { diff --git a/Data_Coupler/Services/AssociationService.cs b/Data_Coupler/Services/AssociationService.cs index 3f72f04..3c4ecb1 100644 --- a/Data_Coupler/Services/AssociationService.cs +++ b/Data_Coupler/Services/AssociationService.cs @@ -1,11 +1,13 @@ using System; using System.Collections.Generic; using System.Linq; +using System.Text; using System.Text.Json; using System.Threading.Tasks; using CredentialManager.Models; using CredentialManager.Services; using DataConnection.CredentialManagement.Interfaces; +using DataConnection.REST.Implementations; using DataConnection.REST.Interfaces; using Microsoft.Extensions.Logging; @@ -69,6 +71,227 @@ public class AssociationService : IAssociationService return null; } + /// + /// Versione bulk del find-or-create — vedi . + /// + public async Task> BatchFindOrCreateAssociationsAsync( + IEnumerable sourceKeys, + PreDiscoveryRequest commonRequest) + { + if (commonRequest == null) + throw new ArgumentNullException(nameof(commonRequest)); + + var distinctKeys = sourceKeys + .Where(k => !string.IsNullOrEmpty(k)) + .Distinct() + .ToList(); + + var result = new Dictionary(StringComparer.Ordinal); + if (distinctKeys.Count == 0) + return result; + + // STEP 1 — Bulk lookup nel DB locale (1 query SQLite invece di N) + Dictionary localMatches; + try + { + localMatches = await _credentialService.FindKeyAssociationsByValuesBulkAsync( + distinctKeys, commonRequest.DestinationEntity, commonRequest.CredentialName); + } + catch (Exception ex) + { + _logger.LogError(ex, "BULK PRE-DISCOVERY: bulk lookup locale fallito, fallback per-record"); + // Fallback per-record (mantiene comportamento esistente in caso di problemi) + foreach (var key in distinctKeys) + { + var req = ClonePreDiscoveryRequest(commonRequest, key); + var found = await FindOrCreateAssociationAsync(req); + if (found != null) result[key] = found; + } + return result; + } + + foreach (var kvp in localMatches) + result[kvp.Key] = kvp.Value; + + var missingKeys = distinctKeys.Where(k => !result.ContainsKey(k)).ToList(); + + _logger.LogInformation("BULK PRE-DISCOVERY: {Local}/{Total} associazioni trovate localmente, {Missing} da cercare nella destinazione", + localMatches.Count, distinctKeys.Count, missingKeys.Count); + + if (missingKeys.Count == 0 || !commonRequest.EnablePreDiscovery) + return result; + + // STEP 2 — Pre-Discovery batched sulla destinazione REST + // Verifica che il campo chiave sia mappato + if (string.IsNullOrEmpty(commonRequest.SourceKeyField) || + !commonRequest.FieldMappings.TryGetValue(commonRequest.SourceKeyField, out var mappedField)) + { + _logger.LogWarning("BULK PRE-DISCOVERY: campo chiave '{SourceKeyField}' non mappato; skip discovery", + commonRequest.SourceKeyField); + return result; + } + + // Solo SalesforceServiceClient supporta SOQL IN ottimizzata; per altri client si ricade al per-record. + if (commonRequest.RestClient is SalesforceServiceClient sfClient) + { + var discovered = await PerformBulkPreDiscoverySalesforceAsync( + sfClient, missingKeys, mappedField, commonRequest); + + foreach (var kvp in discovered) + result[kvp.Key] = kvp.Value; + } + else + { + _logger.LogDebug("BULK PRE-DISCOVERY: client REST non Salesforce, fallback per-record per {Count} chiavi", missingKeys.Count); + foreach (var key in missingKeys) + { + var req = ClonePreDiscoveryRequest(commonRequest, key); + var found = await PerformPreDiscoveryAsync(req); + if (found != null) result[key] = found; + } + } + + return result; + } + + /// + /// Pre-Discovery batched specifica per Salesforce: usa SOQL WHERE field IN (...) + /// per recuperare in pochissime chiamate API tutti i record che matchano una qualsiasi delle chiavi mancanti. + /// + private async Task> PerformBulkPreDiscoverySalesforceAsync( + SalesforceServiceClient sfClient, + List missingKeys, + string mappedDestinationField, + PreDiscoveryRequest commonRequest) + { + var output = new Dictionary(StringComparer.Ordinal); + + // Chunk per stare sotto il limite SOQL/URL (~16 KB GET): ~200 valori per query + const int chunkSize = 200; + var queries = new List(); + for (int i = 0; i < missingKeys.Count; i += chunkSize) + { + var chunk = missingKeys.Skip(i).Take(chunkSize).ToList(); + + var sb = new StringBuilder(); + sb.Append("SELECT Id, "); + sb.Append(mappedDestinationField); + sb.Append(" FROM "); + sb.Append(commonRequest.DestinationEntity); + sb.Append(" WHERE "); + sb.Append(mappedDestinationField); + sb.Append(" IN ("); + sb.Append(string.Join(",", chunk.Select(v => $"'{v.Replace("'", "\\'")}'"))); + sb.Append(')'); + + queries.Add(sb.ToString()); + } + + _logger.LogInformation("BULK PRE-DISCOVERY: {QueryCount} SOQL IN-query (~{ChunkSize} chiavi/query, Composite API ceil(N/25) HTTP call)", + queries.Count, chunkSize); + + // BatchExecuteQueriesAsync raggruppa fino a 25 query in 1 Composite request + var batchResults = await sfClient.BatchExecuteQueriesAsync(queries); + + // Indicizza i risultati per chiave: dal record letto leggiamo il valore di mappedDestinationField + var entityIdByKey = new Dictionary(StringComparer.Ordinal); + foreach (var batchResult in batchResults.Where(r => r.Success && r.Records != null)) + { + foreach (var record in batchResult.Records) + { + if (!record.TryGetValue(mappedDestinationField, out var keyVal) || keyVal == null) + continue; + + var keyStr = keyVal.ToString(); + if (string.IsNullOrEmpty(keyStr)) + continue; + + var idStr = ExtractDestinationId(record); + if (string.IsNullOrEmpty(idStr)) + continue; + + // In caso di duplicati in Salesforce, prendiamo il primo + if (!entityIdByKey.ContainsKey(keyStr)) + entityIdByKey[keyStr] = idStr; + } + } + + _logger.LogInformation("BULK PRE-DISCOVERY: trovati {Found}/{Missing} record esistenti nella destinazione", + entityIdByKey.Count, missingKeys.Count); + + if (entityIdByKey.Count == 0) + return output; + + // Salvataggio associazioni Pre-Discovery in parallelo + var saveTasks = entityIdByKey.Select(async kvp => + { + try + { + var newAssoc = new KeyAssociation + { + KeyValue = kvp.Key, + SourceKeyField = commonRequest.SourceKeyField, + DestinationKeyField = commonRequest.DestinationKeyField ?? "Id", + MappedDestinationField = mappedDestinationField, + DestinationEntity = commonRequest.DestinationEntity, + DestinationId = kvp.Value, + RestCredentialName = commonRequest.CredentialName, + CreatedAt = DateTime.UtcNow, + LastVerifiedAt = DateTime.UtcNow, + IsActive = true, + AdditionalInfo = JsonSerializer.Serialize(new Dictionary + { + { "CreatedBy", "PreDiscovery" }, + { "DiscoveredAt", DateTime.UtcNow }, + { "MappingCount", commonRequest.FieldMappings.Count }, + { "BulkPreDiscovery", true }, + { "ScheduledTransfer", commonRequest.IsScheduledTransfer }, + { "SourceType", commonRequest.SourceType ?? string.Empty } + }) + }; + + var id = commonRequest.UseParallelMethod + ? await _credentialService.SaveKeyAssociationParallelAsync(newAssoc) + : await _credentialService.SaveKeyAssociationAsync(newAssoc); + + newAssoc.Id = id; + return (kvp.Key, newAssoc); + } + catch (Exception ex) + { + _logger.LogWarning(ex, "BULK PRE-DISCOVERY: errore nel salvataggio associazione per KeyValue '{KeyValue}'", kvp.Key); + return (kvp.Key, (KeyAssociation?)null); + } + }); + + var savedResults = await Task.WhenAll(saveTasks); + foreach (var (key, assoc) in savedResults) + { + if (assoc != null) output[key] = assoc; + } + + return output; + } + + private static PreDiscoveryRequest ClonePreDiscoveryRequest(PreDiscoveryRequest source, string sourceKey) + { + return new PreDiscoveryRequest + { + SourceKey = sourceKey, + SourceKeyField = source.SourceKeyField, + DestinationEntity = source.DestinationEntity, + CredentialName = source.CredentialName, + DestinationKeyField = source.DestinationKeyField, + FieldMappings = source.FieldMappings, + RestClient = source.RestClient, + CurrentDataHash = source.CurrentDataHash, + EnablePreDiscovery = source.EnablePreDiscovery, + UseParallelMethod = source.UseParallelMethod, + IsScheduledTransfer = source.IsScheduledTransfer, + SourceType = source.SourceType + }; + } + /// /// Verifica se un'associazione è stata creata dal Pre-Discovery /// controllando il campo AdditionalInfo @@ -285,6 +508,22 @@ public interface IAssociationService { Task FindOrCreateAssociationAsync(PreDiscoveryRequest request); bool IsPreDiscoveryAssociation(KeyAssociation association); + + /// + /// Versione bulk del find-or-create. + /// 1) Una sola query SQLite (WHERE KeyValue IN …) per recuperare le associazioni esistenti. + /// 2) Per le chiavi non trovate localmente, una manciata di SOQL "IN" su Salesforce + /// (~200 chiavi per query, Composite API: ceil(K/25) HTTP call) invece di K chiamate singole. + /// 3) Le associazioni Pre-Discovery scoperte vengono salvate e restituite. + /// + /// Lista (non vuota) dei valori chiave sorgente per tutti i record da analizzare. + /// Parametri condivisi (entity, credential, restClient, mappings, ecc.). + /// e + /// sono ignorati; vengono presi dal parametro . + /// Dizionario KeyValue → KeyAssociation (solo per chiavi trovate/create). + Task> BatchFindOrCreateAssociationsAsync( + IEnumerable sourceKeys, + PreDiscoveryRequest commonRequest); } /// diff --git a/Data_Coupler/Services/DeletionSyncService.cs b/Data_Coupler/Services/DeletionSyncService.cs index f011275..8a64760 100644 --- a/Data_Coupler/Services/DeletionSyncService.cs +++ b/Data_Coupler/Services/DeletionSyncService.cs @@ -1,5 +1,6 @@ using CredentialManager.Models; using DataConnection.CredentialManagement.Interfaces; +using DataConnection.REST.Implementations; using DataConnection.REST.Interfaces; using Microsoft.Extensions.Logging; @@ -79,76 +80,18 @@ public class DeletionSyncService : IDeletionSyncService _logger.LogInformation("Trovate {Count} cancellazioni in attesa di sincronizzazione", pendingDeletions.Count); - // Step 3: Esegui le cancellazioni nella destinazione - foreach (var deletion in pendingDeletions) + // Step 3: Esegui le cancellazioni nella destinazione. + // Per Salesforce usiamo le Composite API in batch (ceil(N/25) HTTP call invece di N); + // per gli altri client REST manteniamo il loop sequenziale (nessun batch supportato). + if (restClient is SalesforceServiceClient salesforceClient) { - try - { - bool syncSuccess = false; - string errorMessage = ""; - - switch (options.Action) - { - case DeletionAction.Delete: - // Elimina fisicamente il record - syncSuccess = await DeleteRecordAsync( - restClient, destinationEntity, deletion.DestinationId); - break; - - case DeletionAction.Deactivate: - // Marca il record come inattivo - syncSuccess = await DeactivateRecordAsync( - restClient, destinationEntity, deletion.DestinationId); - break; - - case DeletionAction.Mark: - // Imposta un campo personalizzato - if (string.IsNullOrEmpty(options.MarkField) || string.IsNullOrEmpty(options.MarkValue)) - { - errorMessage = "MarkField e MarkValue devono essere specificati per DeletionAction.Mark"; - _logger.LogWarning(errorMessage); - result.Errors.Add($"KeyValue: {deletion.KeyValue} - {errorMessage}"); - continue; - } - - syncSuccess = await MarkRecordAsync( - restClient, destinationEntity, deletion.DestinationId, - options.MarkField, options.MarkValue); - break; - - default: - errorMessage = $"Azione di cancellazione non supportata: {options.Action}"; - _logger.LogWarning(errorMessage); - result.Errors.Add($"KeyValue: {deletion.KeyValue} - {errorMessage}"); - continue; - } - - if (syncSuccess) - { - // Marca la cancellazione come sincronizzata - await _credentialService.MarkDeletionSyncedAsync(deletion.Id); - result.DeletedRecordsSynced++; - - _logger.LogInformation( - "Cancellazione sincronizzata: KeyValue={KeyValue}, DestinationId={DestinationId}, Action={Action}", - deletion.KeyValue, deletion.DestinationId, options.Action); - } - else - { - result.SyncErrors++; - var error = $"Errore nella sincronizzazione della cancellazione per KeyValue: {deletion.KeyValue}"; - result.Errors.Add(error); - _logger.LogWarning(error); - } - } - catch (Exception ex) - { - result.SyncErrors++; - var error = $"Errore durante la sincronizzazione della cancellazione per KeyValue: {deletion.KeyValue} - {ex.Message}"; - result.Errors.Add(error); - _logger.LogError(ex, "Errore nella sincronizzazione della cancellazione per {KeyValue}", - deletion.KeyValue); - } + await ExecuteBatchedSalesforceDeletionsAsync( + salesforceClient, destinationEntity, pendingDeletions, options, result); + } + else + { + await ExecuteSequentialDeletionsAsync( + restClient, destinationEntity, pendingDeletions, options, result); } result.IsSuccess = result.SyncErrors == 0; @@ -170,6 +113,183 @@ public class DeletionSyncService : IDeletionSyncService return result; } + /// + /// Esegue le cancellazioni in batch via Salesforce Composite API. + /// Riduce N round-trip HTTP a ceil(N/25) batch in parallelo. + /// + private async Task ExecuteBatchedSalesforceDeletionsAsync( + SalesforceServiceClient salesforceClient, + string destinationEntity, + List pendingDeletions, + DeletionSyncOptions options, + DeletionSyncResult result) + { + // Per Mark serve MarkField e MarkValue: validazione preventiva (un solo log) + if (options.Action == DeletionAction.Mark && + (string.IsNullOrEmpty(options.MarkField) || string.IsNullOrEmpty(options.MarkValue))) + { + const string err = "MarkField e MarkValue devono essere specificati per DeletionAction.Mark"; + _logger.LogWarning(err); + foreach (var d in pendingDeletions) + { + result.SyncErrors++; + result.Errors.Add($"KeyValue: {d.KeyValue} - {err}"); + } + return; + } + + // Mappa entityId → KeyAssociation per ricostruire l'associazione dal risultato batch + var deletionsById = pendingDeletions + .Where(d => !string.IsNullOrEmpty(d.DestinationId)) + .GroupBy(d => d.DestinationId) + .ToDictionary(g => g.Key, g => g.First()); // se duplicati, prima occorrenza + + var entityIds = deletionsById.Keys.ToList(); + if (entityIds.Count == 0) + return; + + _logger.LogInformation("DELETION SYNC (Salesforce batched): {Count} record, action={Action}", + entityIds.Count, options.Action); + + List batchResults; + try + { + switch (options.Action) + { + case DeletionAction.Delete: + batchResults = await salesforceClient.BatchDeleteEntitiesAsync(destinationEntity, entityIds); + break; + + case DeletionAction.Deactivate: + // Aggiorna IsActive/Active = false in batch. + // Non sappiamo a priori quale dei due campi esista sull'SObject: proviamo IsActive, + // se Salesforce ritorna errore il record verrà segnalato come fallito. + var deactivateUpdates = entityIds.ToDictionary( + id => id, + _ => (Dictionary)new Dictionary + { + { "IsActive", false } + }); + batchResults = await salesforceClient.BatchUpdateEntitiesAsync(destinationEntity, deactivateUpdates); + break; + + case DeletionAction.Mark: + batchResults = await salesforceClient.BatchPatchSingleFieldAsync( + destinationEntity, entityIds, options.MarkField!, options.MarkValue!); + break; + + default: + _logger.LogWarning("DELETION SYNC: azione non supportata: {Action}", options.Action); + foreach (var d in pendingDeletions) + { + result.SyncErrors++; + result.Errors.Add($"KeyValue: {d.KeyValue} - Azione non supportata: {options.Action}"); + } + return; + } + } + catch (Exception ex) + { + _logger.LogError(ex, "DELETION SYNC: errore nell'esecuzione del batch Salesforce"); + foreach (var d in pendingDeletions) + { + result.SyncErrors++; + result.Errors.Add($"KeyValue: {d.KeyValue} - {ex.Message}"); + } + return; + } + + // Aggiorna lo stato delle cancellazioni in DB in parallelo per i record sincronizzati con successo + var markSyncedTasks = new List(); + foreach (var br in batchResults) + { + if (!deletionsById.TryGetValue(br.EntityId ?? string.Empty, out var deletion)) + continue; + + if (br.Success) + { + result.DeletedRecordsSynced++; + markSyncedTasks.Add(_credentialService.MarkDeletionSyncedAsync(deletion.Id)); + _logger.LogDebug( + "DELETION SYNC: KeyValue={KeyValue}, DestinationId={DestinationId}, Action={Action} OK", + deletion.KeyValue, deletion.DestinationId, options.Action); + } + else + { + result.SyncErrors++; + var msg = $"KeyValue: {deletion.KeyValue} - {br.ErrorMessage ?? "Unknown error"}"; + result.Errors.Add(msg); + _logger.LogWarning("DELETION SYNC fallita: {Msg}", msg); + } + } + + if (markSyncedTasks.Count > 0) + await Task.WhenAll(markSyncedTasks); + } + + /// + /// Fallback sequenziale per client REST non Salesforce. + /// + private async Task ExecuteSequentialDeletionsAsync( + IRestServiceClient restClient, + string destinationEntity, + List pendingDeletions, + DeletionSyncOptions options, + DeletionSyncResult result) + { + foreach (var deletion in pendingDeletions) + { + try + { + bool syncSuccess = false; + string errorMessage = ""; + + switch (options.Action) + { + case DeletionAction.Delete: + syncSuccess = await DeleteRecordAsync(restClient, destinationEntity, deletion.DestinationId); + break; + case DeletionAction.Deactivate: + syncSuccess = await DeactivateRecordAsync(restClient, destinationEntity, deletion.DestinationId); + break; + case DeletionAction.Mark: + if (string.IsNullOrEmpty(options.MarkField) || string.IsNullOrEmpty(options.MarkValue)) + { + errorMessage = "MarkField e MarkValue devono essere specificati per DeletionAction.Mark"; + _logger.LogWarning(errorMessage); + result.Errors.Add($"KeyValue: {deletion.KeyValue} - {errorMessage}"); + continue; + } + syncSuccess = await MarkRecordAsync(restClient, destinationEntity, deletion.DestinationId, + options.MarkField, options.MarkValue); + break; + default: + errorMessage = $"Azione di cancellazione non supportata: {options.Action}"; + _logger.LogWarning(errorMessage); + result.Errors.Add($"KeyValue: {deletion.KeyValue} - {errorMessage}"); + continue; + } + + if (syncSuccess) + { + await _credentialService.MarkDeletionSyncedAsync(deletion.Id); + result.DeletedRecordsSynced++; + } + else + { + result.SyncErrors++; + result.Errors.Add($"Errore nella sincronizzazione della cancellazione per KeyValue: {deletion.KeyValue}"); + } + } + catch (Exception ex) + { + result.SyncErrors++; + result.Errors.Add($"KeyValue: {deletion.KeyValue} - {ex.Message}"); + _logger.LogError(ex, "Errore nella sincronizzazione della cancellazione per {KeyValue}", deletion.KeyValue); + } + } + } + /// /// Elimina fisicamente un record dalla destinazione /// diff --git a/Data_Coupler/Services/ScheduledProfileExecutionService.cs b/Data_Coupler/Services/ScheduledProfileExecutionService.cs index 6fbb4cf..af3362b 100644 --- a/Data_Coupler/Services/ScheduledProfileExecutionService.cs +++ b/Data_Coupler/Services/ScheduledProfileExecutionService.cs @@ -895,10 +895,45 @@ public class ScheduledProfileExecutionService : IScheduledProfileExecutionServic // Crea lista indicizzata per mantenere il record number var indexedRecords = sourceRecords.Select((record, index) => new { Record = record, RecordNumber = index + 1 }).ToList(); - _logger.LogInformation("COMPOSITE SCHEDULED: Inizio analisi parallela di {RecordCount} record", indexedRecords.Count); + _logger.LogInformation("COMPOSITE SCHEDULED: Inizio analisi di {RecordCount} record", indexedRecords.Count); var analysisStartTime = DateTime.UtcNow; - // Processa tutti i record in parallelo + // === STEP A: Bulk Pre-Discovery (1 query SQLite + poche SOQL IN invece di 2N+N) === + // Pre-calcolo locale: source key per ogni record (operazione thread-safe) + var sourceKeyByRecordIndex = new Dictionary(indexedRecords.Count); + foreach (var idx in indexedRecords) + { + var key = GenerateSourceKey(idx.Record, profile.SourceKeyField); + if (!string.IsNullOrEmpty(key)) + sourceKeyByRecordIndex[idx.RecordNumber] = key; + } + + Dictionary associationsByKey = new(StringComparer.Ordinal); + if (currentUseRecordAssociations && !string.IsNullOrEmpty(profile.SourceKeyField) && sourceKeyByRecordIndex.Count > 0) + { + var commonRequest = new PreDiscoveryRequest + { + SourceKeyField = profile.SourceKeyField, + DestinationEntity = currentEntityName, + CredentialName = currentCredentialName, + DestinationKeyField = "Id", + FieldMappings = fieldMappings, + RestClient = restClient, + EnablePreDiscovery = true, + UseParallelMethod = true, + IsScheduledTransfer = true, + SourceType = profile.SourceType + }; + + associationsByKey = await _associationService.BatchFindOrCreateAssociationsAsync( + sourceKeyByRecordIndex.Values, commonRequest); + + _logger.LogInformation("COMPOSITE SCHEDULED: Bulk Pre-Discovery completata - {Found}/{Total} associazioni risolte", + associationsByKey.Count, sourceKeyByRecordIndex.Count); + } + + // === STEP B: Analisi locale parallela per decidere create/update/skip === + // Nessuna chiamata DB o REST in questo loop — solo memoria. var processingTasks = indexedRecords.Select(async indexedRecord => { try @@ -916,49 +951,7 @@ public class ScheduledProfileExecutionService : IScheduledProfileExecutionServic // Analizza le associazioni per capire se aggiornare, creare o saltare if (currentUseRecordAssociations && !string.IsNullOrEmpty(sourceKey)) { - _logger.LogDebug("COMPOSITE SCHEDULED: Cerco associazione per KeyValue: '{KeyValue}', Entity: '{Entity}', Credential: '{Credential}'", - sourceKey, currentEntityName, currentCredentialName); - - // Cerca associazione esistente usando il metodo parallelo - var existingAssociation = await _dataConnectionCredentialService.FindKeyAssociationByValueParallelAsync( - sourceKey, currentEntityName, currentCredentialName); - - // FALLBACK: Se non troviamo l'associazione con tutti i parametri, proviamo solo con il KeyValue - if (existingAssociation == null) - { - existingAssociation = await _dataConnectionCredentialService.FindKeyAssociationByValueParallelAsync(sourceKey); - if (existingAssociation != null) - { - // Verifica compatibilità - if (existingAssociation.DestinationEntity != currentEntityName || - existingAssociation.RestCredentialName != currentCredentialName) - { - existingAssociation = null; - } - } - } - - // 🔍 PRE-DISCOVERY: Usa il servizio centralizzato - if (existingAssociation == null && !string.IsNullOrEmpty(profile.SourceKeyField)) - { - var preDiscoveryRequest = new PreDiscoveryRequest - { - SourceKey = sourceKey, - SourceKeyField = profile.SourceKeyField, - DestinationEntity = currentEntityName, - CredentialName = currentCredentialName, - DestinationKeyField = "Id", - FieldMappings = fieldMappings, - RestClient = restClient, - CurrentDataHash = currentDataHash, - EnablePreDiscovery = true, - UseParallelMethod = true, // Usa metodi paralleli thread-safe - IsScheduledTransfer = true, - SourceType = profile.SourceType - }; - - existingAssociation = await _associationService.FindOrCreateAssociationAsync(preDiscoveryRequest); - } + associationsByKey.TryGetValue(sourceKey, out var existingAssociation); if (existingAssociation != null && existingAssociation.IsActive) {