[Feature/Perf] Ottimizzazioni bulk pre-discovery, batch deletion sync e supporto OLE DB / Salesforce client_credentials

## Bulk Pre-Discovery e riduzione query SQLite/SOQL

### KeyAssociationService — FindAssociationsByKeyValuesBulkAsync (nuovo)
- Aggiunta query bulk 'WHERE KeyValue IN (...)' per recuperare N associazioni con 1 sola query SQLite
  (chunking a 500 chiavi per rispettare il limite ~999 parametri di SQLite)
- Aggiunta interfaccia IKeyAssociationService e delegata in DataConnectionCredentialService / IDataConnectionCredentialService

### AssociationService — BatchFindOrCreateAssociationsAsync (nuovo)
- Nuovo metodo bulk che sostituisce i loop per-record durante l'analisi composite:
  1) 1 query SQLite bulk per tutte le chiavi
  2) Per le chiavi non trovate: SOQL 'IN (...)' su Salesforce in chunk da 200 via BatchExecuteQueriesAsync
     (ceil(K/25) HTTP Composite call invece di K singole)
  3) Salvataggio parallelo delle associazioni pre-discovery scoperte
- Fallback per-record automatico per client REST non Salesforce
- Aggiornata interfaccia IAssociationService con documentazione XML completa

### DataCoupler.razor.cs — STEP A/B nel flusso COMPOSITE
- Pre-Discovery spostata FUORI dal loop parallelo (STEP A, prima dell'analisi)
- associationsByKey pre-popolato con BatchFindOrCreateAssociationsAsync
- STEP B: il loop parallelo usa TryGetValue O(1) invece di query async per record
- Rimozione blocco ~40 righe di per-record lookup / fallback duplicati

## Salesforce Composite API — Batch Delete e Patch

### SalesforceServiceClient — metodi batch (nuovi)
- BatchDeleteEntitiesAsync: elimina N record con ceil(N/25) Composite call invece di N
- BatchPatchSingleFieldAsync: aggiorna un singolo campo su N record tramite BatchUpdateEntitiesAsync

### DeletionSyncService — refactoring batch
- ExecuteBatchedSalesforceDeletionsAsync: orchestrazione batch per Delete / Deactivate / Mark su Salesforce
- ExecuteSequentialDeletionsAsync: loop sequenziale esistente estratto in metodo riutilizzabile
- Dispatcher: Salesforce -> batch Composite, altri client REST -> sequenziale

## Supporto OLE DB (database)

### DatabaseSchemaProviderFactory
- Aggiunto case DatabaseType.OleDb -> new OleDbSchemaProvider() nel factory switch

### DatabaseMethod.cs
- Aggiunto metodo IsOleDbConnection() (parallelo a IsOdbcConnection())
- Query validation e manager temporaneo estesi a OLE DB oltre che ODBC
- GetLimitedQuery: aggiunto case OleDb -> 'SELECT TOP N FROM (subquery)'

## Salesforce OAuth2 — fix client_credentials

### CredentialService.cs
- Aggiunto 'GrantType' alla HashSet serviceSpecificKeys per preservarlo nella serializzazione AdditionalParameters

### DataConnectionCredentialService.cs
- Refactored BuildRestServiceOptions in helper statico riutilizzato da entrambi i metodi GetRestServiceOptions
- Mapping coerente ClientId/ClientSecret/GrantType per Salesforce (allineato a DataConnectionFactory)
- TestSalesforceOAuthLogin: branch esplicito per client_credentials (no username/password/token)
  con validazione preventiva ClientId+ClientSecret obbligatori
- Log flow label (password|client_credentials) in tutti i messaggi di autenticazione

## VS Code tasks

### .vscode/tasks.json
- Rimosso task generico 'Publish Data_Coupler'
- Aggiunti due task separati: win-x64 e win-x86, entrambi SingleFile + Self-Contained + ReadyToRun
This commit is contained in:
Alessio Dal Santo
2026-05-28 11:15:18 +02:00
parent 82e0d6bc77
commit 344853fde9
13 changed files with 886 additions and 223 deletions
+29 -7
View File
@@ -29,17 +29,39 @@
} }
}, },
{ {
"label": "Publish Data_Coupler", "label": "Publish Data_Coupler Temp SingleFile Self-Contained Ready-To-Run win-x64",
"detail": "Publish the Data Coupler 64-bit with a Single File, Self Contained, Ready-To-Run",
"type": "shell", "type": "shell",
"command": "dotnet", "command": "dotnet",
"args": [ "args": [
"publish", "publish",
"--configuration", "Data_Coupler/Data_Coupler.csproj",
"Release", "-c", "Release",
"--output", "-r", "win-x64",
"${workspaceFolder}/publish", "--self-contained", "true",
"--project", "-p:PublishSingleFile=true",
"Data_Coupler/Data_Coupler.csproj" "-p:PublishReadyToRun=true",
"-p:PublishTrimmed=false",
"-o", "C:\\Temp\\Publish\\Data_Coupler"
],
"group": "build",
"problemMatcher": []
},
{
"label": "Publish Data_Coupler Temp SingleFile Self-Contained Ready-To-Run win-x86",
"detail": "Publish the Data Coupler 32-bit with a Single File, Self Contained, Ready-To-Run",
"type": "shell",
"command": "dotnet",
"args": [
"publish",
"Data_Coupler/Data_Coupler.csproj",
"-c", "Release",
"-r", "win-x86",
"--self-contained", "true",
"-p:PublishSingleFile=true",
"-p:PublishReadyToRun=true",
"-p:PublishTrimmed=false",
"-o", "C:\\Temp\\Publish\\Data_Coupler_x86"
], ],
"group": "build", "group": "build",
"problemMatcher": [] "problemMatcher": []
@@ -806,11 +806,11 @@ public class CredentialService : ICredentialService
} }
// Copia tutti i parametri che non sono specifici del servizio // Copia tutti i parametri che non sono specifici del servizio
var serviceSpecificKeys = new HashSet<string> var serviceSpecificKeys = new HashSet<string>
{ {
"CompanyDatabase", "Language", "Version", "UseTrustedConnection", "CompanyDatabase", "Language", "Version", "UseTrustedConnection",
"SecurityToken", "ClientId", "ClientSecret", "ApiVersion", "SecurityToken", "ClientId", "ClientSecret", "ApiVersion",
"IsSandbox", "UseSoapApi", "RefreshToken", "AccessToken", "TokenExpiry" "IsSandbox", "UseSoapApi", "GrantType", "RefreshToken", "AccessToken", "TokenExpiry"
}; };
foreach (var param in additionalParams) foreach (var param in additionalParams)
@@ -112,6 +112,16 @@ public interface IKeyAssociationService
/// </summary> /// </summary>
Task<KeyAssociation?> FindAssociationByKeyValueParallelAsync(string keyValue); Task<KeyAssociation?> FindAssociationByKeyValueParallelAsync(string keyValue);
/// <summary>
/// Versione bulk: ricerca in un colpo solo tutte le associazioni attive per la combinazione
/// (KeyValue ∈ keyValues, DestinationEntity, RestCredentialName) usando una query SQL IN(...).
/// Riduce drasticamente le query SQLite quando si processano molti record.
/// </summary>
Task<Dictionary<string, KeyAssociation>> FindAssociationsByKeyValuesBulkAsync(
IEnumerable<string> keyValues,
string destinationEntity,
string restCredentialName);
/// <summary> /// <summary>
/// Versione thread-safe per operazioni parallele - Elimina associazione /// Versione thread-safe per operazioni parallele - Elimina associazione
/// </summary> /// </summary>
@@ -341,9 +341,9 @@ public class KeyAssociationService : IKeyAssociationService
var options = new DbContextOptionsBuilder<CredentialDbContext>() var options = new DbContextOptionsBuilder<CredentialDbContext>()
.UseSqlite(_context.Database.GetConnectionString()) .UseSqlite(_context.Database.GetConnectionString())
.Options; .Options;
using var parallelContext = new CredentialDbContext(options); using var parallelContext = new CredentialDbContext(options);
try try
{ {
return await parallelContext.KeyAssociations return await parallelContext.KeyAssociations
@@ -358,6 +358,63 @@ public class KeyAssociationService : IKeyAssociationService
} }
} }
/// <summary>
/// Bulk lookup delle associazioni: una sola query con WHERE KeyValue IN (...).
/// Per N chiavi sostituisce fino a 2N query SQLite del flusso per-record.
/// </summary>
public async Task<Dictionary<string, KeyAssociation>> FindAssociationsByKeyValuesBulkAsync(
IEnumerable<string> keyValues,
string destinationEntity,
string restCredentialName)
{
var distinctKeys = keyValues
.Where(k => !string.IsNullOrEmpty(k))
.Distinct()
.ToList();
if (distinctKeys.Count == 0)
return new Dictionary<string, KeyAssociation>(StringComparer.Ordinal);
try
{
// SQLite ha un limite hardcoded di ~999 parametri per query: chunk per sicurezza.
const int chunkSize = 500;
var result = new Dictionary<string, KeyAssociation>(StringComparer.Ordinal);
for (int i = 0; i < distinctKeys.Count; i += chunkSize)
{
var chunk = distinctKeys.Skip(i).Take(chunkSize).ToList();
var associations = await _context.KeyAssociations
.AsNoTracking()
.Where(ka => ka.IsActive &&
ka.DestinationEntity == destinationEntity &&
ka.RestCredentialName == restCredentialName &&
chunk.Contains(ka.KeyValue))
.ToListAsync();
// Se ci sono duplicati (KeyValue ripetuto), tieni il più recente
foreach (var assoc in associations
.GroupBy(a => a.KeyValue)
.Select(g => g.OrderByDescending(a => a.UpdatedAt ?? a.CreatedAt).First()))
{
result[assoc.KeyValue] = assoc;
}
}
_logger.LogDebug("BULK: Ricerca associazioni completata - {Found}/{Total} match per {Entity}/{Credential}",
result.Count, distinctKeys.Count, destinationEntity, restCredentialName);
return result;
}
catch (Exception ex)
{
_logger.LogError(ex, "BULK: Errore nella ricerca bulk delle associazioni ({Count} chiavi, Entity={Entity})",
distinctKeys.Count, destinationEntity);
throw;
}
}
/// <summary> /// <summary>
/// Versione thread-safe per operazioni parallele - Delete association /// Versione thread-safe per operazioni parallele - Delete association
/// </summary> /// </summary>
@@ -85,6 +85,14 @@ public interface IDataConnectionCredentialService
Task<KeyAssociation?> FindKeyAssociationByValueParallelAsync(string keyValue); Task<KeyAssociation?> FindKeyAssociationByValueParallelAsync(string keyValue);
Task<bool> DeleteKeyAssociationParallelAsync(int id); Task<bool> DeleteKeyAssociationParallelAsync(int id);
/// <summary>
/// Bulk lookup associazioni - una sola query SQLite per N chiavi.
/// </summary>
Task<Dictionary<string, KeyAssociation>> FindKeyAssociationsByValuesBulkAsync(
IEnumerable<string> keyValues,
string destinationEntity,
string restCredentialName);
// Deletion synchronization operations // Deletion synchronization operations
Task<int> MarkDeletedAssociationsAsync(List<string> sourceKeyValues, string destinationEntity, string restCredentialName); Task<int> MarkDeletedAssociationsAsync(List<string> sourceKeyValues, string destinationEntity, string restCredentialName);
Task<List<KeyAssociation>> GetPendingDeletionsAsync(string destinationEntity, string restCredentialName); Task<List<KeyAssociation>> GetPendingDeletionsAsync(string destinationEntity, string restCredentialName);
@@ -168,16 +168,7 @@ public class DataConnectionCredentialService : IDataConnectionCredentialService
if (credential == null) if (credential == null)
throw new InvalidOperationException($"REST API credential '{credentialName}' not found"); throw new InvalidOperationException($"REST API credential '{credentialName}' not found");
var options = new DataConnection.REST.Configuration.RestServiceOptions var options = BuildRestServiceOptions(credential);
{
BaseUrl = credential.BaseUrl,
ApiKey = credential.ApiKey,
Username = credential.Username,
Password = credential.Password,
AuthToken = credential.AuthToken,
TimeoutSeconds = credential.TimeoutSeconds,
IgnoreSslErrors = credential.IgnoreSslErrors
};
_logger.LogDebug("Created RestServiceOptions for credential: {Name} ({BaseUrl})", _logger.LogDebug("Created RestServiceOptions for credential: {Name} ({BaseUrl})",
credentialName, credential.BaseUrl); credentialName, credential.BaseUrl);
@@ -191,19 +182,42 @@ public class DataConnectionCredentialService : IDataConnectionCredentialService
if (credential == null) if (credential == null)
throw new InvalidOperationException($"REST API credential with ID '{credentialId}' not found"); throw new InvalidOperationException($"REST API credential with ID '{credentialId}' not found");
var options = BuildRestServiceOptions(credential);
_logger.LogDebug("Created RestServiceOptions for credential ID: {Id} ({BaseUrl})",
credentialId, credential.BaseUrl);
return options;
}
private static DataConnection.REST.Configuration.RestServiceOptions BuildRestServiceOptions(RestApiCredential credential)
{
var options = new DataConnection.REST.Configuration.RestServiceOptions var options = new DataConnection.REST.Configuration.RestServiceOptions
{ {
BaseUrl = credential.BaseUrl, BaseUrl = credential.BaseUrl,
ApiKey = credential.ApiKey,
Username = credential.Username, Username = credential.Username,
Password = credential.Password, Password = credential.Password,
AuthToken = credential.AuthToken,
TimeoutSeconds = credential.TimeoutSeconds, TimeoutSeconds = credential.TimeoutSeconds,
IgnoreSslErrors = credential.IgnoreSslErrors IgnoreSslErrors = credential.IgnoreSslErrors
}; };
_logger.LogDebug("Created RestServiceOptions for credential ID: {Id} ({BaseUrl})", // Mapping coerente con DataConnectionFactory.CreateRestServiceClientAsync
credentialId, credential.BaseUrl); switch (credential.ServiceType)
{
case RestServiceType.Salesforce:
options.ApiKey = credential.ClientId;
options.AuthToken = credential.ClientSecret;
options.SalesforceGrantType = credential.GrantType;
break;
case RestServiceType.SapB1ServiceLayer:
options.ApiKey = credential.CompanyDatabase;
options.AuthToken = credential.AuthToken;
break;
default:
options.ApiKey = credential.ApiKey;
options.AuthToken = credential.AuthToken;
break;
}
return options; return options;
} }
@@ -550,8 +564,8 @@ public class DataConnectionCredentialService : IDataConnectionCredentialService
{ {
try try
{ {
_logger.LogInformation("Testing Salesforce authentication for {Name} ({BaseUrl})", _logger.LogInformation("Testing Salesforce authentication for {Name} ({BaseUrl}, GrantType={GrantType})",
credential.Name, credential.BaseUrl); credential.Name, credential.BaseUrl, credential.GrantType);
_logger.LogDebug("Salesforce credential details: Username={Username}, HasPassword={HasPassword}, HasSecurityToken={HasSecurityToken}, HasClientId={HasClientId}, HasClientSecret={HasClientSecret}", _logger.LogDebug("Salesforce credential details: Username={Username}, HasPassword={HasPassword}, HasSecurityToken={HasSecurityToken}, HasClientId={HasClientId}, HasClientSecret={HasClientSecret}",
credential.Username, !string.IsNullOrEmpty(credential.Password), !string.IsNullOrEmpty(credential.SecurityToken), credential.Username, !string.IsNullOrEmpty(credential.Password), !string.IsNullOrEmpty(credential.SecurityToken),
@@ -560,49 +574,69 @@ public class DataConnectionCredentialService : IDataConnectionCredentialService
using var httpClient = new HttpClient(); using var httpClient = new HttpClient();
httpClient.Timeout = TimeSpan.FromSeconds(credential.TimeoutSeconds); httpClient.Timeout = TimeSpan.FromSeconds(credential.TimeoutSeconds);
// Test di autenticazione OAuth2
var tokenUrl = credential.BaseUrl.TrimEnd('/') + "/services/oauth2/token"; var tokenUrl = credential.BaseUrl.TrimEnd('/') + "/services/oauth2/token";
var tokenData = new List<KeyValuePair<string, string>> List<KeyValuePair<string, string>> tokenData;
{ string flowLabel;
new("grant_type", "password"),
new("username", credential.Username ?? "")
};
// Aggiungiamo password + security token se disponibile if (credential.GrantType == CredentialManager.Models.SalesforceGrantType.ClientCredentials)
var password = credential.Password ?? "";
if (!string.IsNullOrEmpty(credential.SecurityToken))
{ {
password += credential.SecurityToken; // Client Credentials flow — server-to-server, no user
} if (string.IsNullOrEmpty(credential.ClientId) || string.IsNullOrEmpty(credential.ClientSecret))
tokenData.Add(new("password", password)); {
return (false, "Flusso client_credentials richiede ClientId e ClientSecret configurati.");
}
// Aggiungiamo client credentials se disponibili tokenData = new List<KeyValuePair<string, string>>
if (!string.IsNullOrEmpty(credential.ClientId)) {
{ new("grant_type", "client_credentials"),
tokenData.Add(new("client_id", credential.ClientId)); new("client_id", credential.ClientId),
new("client_secret", credential.ClientSecret)
};
flowLabel = "client_credentials";
} }
if (!string.IsNullOrEmpty(credential.ClientSecret)) else
{ {
tokenData.Add(new("client_secret", credential.ClientSecret)); // Password flow (default)
var password = credential.Password ?? "";
if (!string.IsNullOrEmpty(credential.SecurityToken))
{
password += credential.SecurityToken;
}
tokenData = new List<KeyValuePair<string, string>>
{
new("grant_type", "password"),
new("username", credential.Username ?? ""),
new("password", password)
};
if (!string.IsNullOrEmpty(credential.ClientId))
{
tokenData.Add(new("client_id", credential.ClientId));
}
if (!string.IsNullOrEmpty(credential.ClientSecret))
{
tokenData.Add(new("client_secret", credential.ClientSecret));
}
flowLabel = "password";
} }
_logger.LogDebug("Posting to Salesforce token URL: {TokenUrl}", tokenUrl); _logger.LogDebug("Posting to Salesforce token URL: {TokenUrl} (flow={Flow})", tokenUrl, flowLabel);
var tokenContent = new FormUrlEncodedContent(tokenData); var tokenContent = new FormUrlEncodedContent(tokenData);
var response = await httpClient.PostAsync(tokenUrl, tokenContent); var response = await httpClient.PostAsync(tokenUrl, tokenContent);
if (response.IsSuccessStatusCode) if (response.IsSuccessStatusCode)
{ {
var responseContent = await response.Content.ReadAsStringAsync(); _logger.LogInformation("Salesforce authentication ({Flow}) successful for {Name}", flowLabel, credential.Name);
_logger.LogInformation("Salesforce authentication successful for {Name}", credential.Name); return (true, $"Autenticazione Salesforce riuscita!\n\nDettagli:\n- Login URL: {credential.BaseUrl}\n- API Version: {credential.ApiVersion}\n- Sandbox: {credential.IsSandbox}\n- Tipo Auth: OAuth2 ({flowLabel})\n- Timeout: {credential.TimeoutSeconds}s");
return (true, $"Autenticazione Salesforce riuscita!\n\nDettagli:\n- Login URL: {credential.BaseUrl}\n- API Version: {credential.ApiVersion}\n- Sandbox: {credential.IsSandbox}\n- Tipo Auth: OAuth2\n- Timeout: {credential.TimeoutSeconds}s");
} }
else else
{ {
var errorContent = await response.Content.ReadAsStringAsync(); var errorContent = await response.Content.ReadAsStringAsync();
_logger.LogWarning("Salesforce authentication failed for {Name}. Status: {StatusCode}, Response: {Response}", _logger.LogWarning("Salesforce authentication ({Flow}) failed for {Name}. Status: {StatusCode}, Response: {Response}",
credential.Name, response.StatusCode, errorContent); flowLabel, credential.Name, response.StatusCode, errorContent);
return (false, $"Autenticazione Salesforce fallita. Status: {response.StatusCode}\nDettagli: {errorContent}"); return (false, $"Autenticazione Salesforce ({flowLabel}) fallita. Status: {response.StatusCode}\nDettagli: {errorContent}");
} }
} }
catch (HttpRequestException ex) catch (HttpRequestException ex)
@@ -1074,6 +1108,14 @@ public class DataConnectionCredentialService : IDataConnectionCredentialService
return await _keyAssociationService.FindAssociationByKeyValueParallelAsync(keyValue); return await _keyAssociationService.FindAssociationByKeyValueParallelAsync(keyValue);
} }
public async Task<Dictionary<string, KeyAssociation>> FindKeyAssociationsByValuesBulkAsync(
IEnumerable<string> keyValues,
string destinationEntity,
string restCredentialName)
{
return await _keyAssociationService.FindAssociationsByKeyValuesBulkAsync(keyValues, destinationEntity, restCredentialName);
}
public async Task<bool> DeleteKeyAssociationParallelAsync(int id) public async Task<bool> DeleteKeyAssociationParallelAsync(int id)
{ {
return await _keyAssociationService.DeleteAssociationParallelAsync(id); return await _keyAssociationService.DeleteAssociationParallelAsync(id);
@@ -19,7 +19,10 @@ public class DatabaseSchemaProviderFactory
{ {
return databaseType switch return databaseType switch
{ {
DatabaseType.SqlServer => new SqlServerSchemaProvider(), DatabaseType.Odbc => new OdbcSchemaProvider(), // Aggiungere qui altri provider quando implementati DatabaseType.SqlServer => new SqlServerSchemaProvider(),
DatabaseType.Odbc => new OdbcSchemaProvider(),
DatabaseType.OleDb => new OleDbSchemaProvider(),
// Aggiungere qui altri provider quando implementati
// DatabaseType.MySql => new MySqlSchemaProvider(), // DatabaseType.MySql => new MySqlSchemaProvider(),
// DatabaseType.PostgreSql => new PostgreSqlSchemaProvider(), // DatabaseType.PostgreSql => new PostgreSqlSchemaProvider(),
// DatabaseType.Oracle => new OracleSchemaProvider(), // DatabaseType.Oracle => new OracleSchemaProvider(),
@@ -126,13 +126,19 @@ namespace DataConnection.REST.Implementations
{ {
_accessToken = tokenResponse.AccessToken; _accessToken = tokenResponse.AccessToken;
_instanceUrl = tokenResponse.InstanceUrl; _instanceUrl = tokenResponse.InstanceUrl;
_tokenExpiry = DateTime.UtcNow.AddSeconds(3600); // Salesforce doesn't always return expires_in
// Se Salesforce restituisce expires_in (es. client_credentials), usalo con un margine di 60s;
// altrimenti (password flow non restituisce expires_in) fallback al default 1h conservativo.
var ttlSeconds = tokenResponse.ExpiresIn.HasValue && tokenResponse.ExpiresIn.Value > 60
? tokenResponse.ExpiresIn.Value - 60
: 3600;
_tokenExpiry = DateTime.UtcNow.AddSeconds(ttlSeconds);
_httpClient.DefaultRequestHeaders.Authorization = _httpClient.DefaultRequestHeaders.Authorization =
new System.Net.Http.Headers.AuthenticationHeaderValue("Bearer", _accessToken); new System.Net.Http.Headers.AuthenticationHeaderValue("Bearer", _accessToken);
_logger.LogInformation("Salesforce authentication ({Flow}) successful. InstanceUrl={InstanceUrl}, TokenExpiry={Expiry}", _logger.LogInformation("Salesforce authentication ({Flow}) successful. InstanceUrl={InstanceUrl}, TokenExpiry={Expiry} (TTL {Ttl}s, server expires_in={ExpiresIn})",
flowName, _instanceUrl, _tokenExpiry.ToLocalTime()); flowName, _instanceUrl, _tokenExpiry.ToLocalTime(), ttlSeconds, tokenResponse.ExpiresIn?.ToString() ?? "null");
return true; return true;
} }
@@ -1616,6 +1622,13 @@ namespace DataConnection.REST.Implementations
[JsonPropertyName("signature")] [JsonPropertyName("signature")]
public string Signature { get; set; } = string.Empty; public string Signature { get; set; } = string.Empty;
/// <summary>
/// Durata di validità del token in secondi. Presente solo in alcuni flussi OAuth Salesforce
/// (es. client_credentials, JWT bearer); assente in altri (es. username/password) — in tal caso 0/null.
/// </summary>
[JsonPropertyName("expires_in")]
public int? ExpiresIn { get; set; }
} }
private class SalesforceSObjectsResponse private class SalesforceSObjectsResponse
@@ -2161,5 +2174,157 @@ namespace DataConnection.REST.Implementations
}).ToList(); }).ToList();
} }
} }
/// <summary>
/// Elimina N record SObject in batch tramite Salesforce Composite API.
/// Riduce N HTTP calls a ceil(N/25), eseguite in parallelo.
/// </summary>
/// <param name="entityName">Nome SObject (es. "Account").</param>
/// <param name="entityIds">Lista di Id da eliminare.</param>
/// <param name="cancellationToken">Cancellation token.</param>
/// <returns>Risultato per ogni Id (Success/ErrorMessage).</returns>
public async Task<List<CompositeOperationResult>> BatchDeleteEntitiesAsync(
string entityName,
List<string> entityIds,
CancellationToken cancellationToken = default)
{
if (!IsAuthenticated())
{
_logger.LogDebug("Error: Not authenticated to Salesforce. Cannot perform batch delete.");
return new List<CompositeOperationResult>();
}
if (entityIds == null || entityIds.Count == 0)
return new List<CompositeOperationResult>();
const int maxBatchSize = 25;
var batches = new List<(List<string> batch, int startIndex, int batchNumber)>();
for (int i = 0; i < entityIds.Count; i += maxBatchSize)
{
var chunk = entityIds.Skip(i).Take(maxBatchSize).ToList();
batches.Add((chunk, i, (i / maxBatchSize) + 1));
}
_logger.LogDebug($"--- BatchDelete: {entityIds.Count} record in {batches.Count} batch (parallel) ---");
var batchTasks = batches.Select(async b =>
await ExecuteDeleteBatchAsync(entityName, b.batch, b.startIndex, cancellationToken));
var batchResults = await Task.WhenAll(batchTasks);
var allResults = new List<CompositeOperationResult>();
foreach (var r in batchResults) allResults.AddRange(r);
_logger.LogDebug($"All delete batches completed: {allResults.Count(r => r.Success)} success, {allResults.Count(r => !r.Success)} failed");
return allResults;
}
private async Task<List<CompositeOperationResult>> ExecuteDeleteBatchAsync(
string entityName,
List<string> batch,
int startIndex,
CancellationToken cancellationToken)
{
try
{
var compositeUri = $"{_instanceUrl}/services/data/v60.0/composite/";
var compositeRequest = new SalesforceCompositeRequest();
for (int i = 0; i < batch.Count; i++)
{
var entityId = batch[i];
compositeRequest.CompositeRequest.Add(new SalesforceCompositeSubRequest
{
Method = "DELETE",
Url = $"/services/data/v60.0/sobjects/{entityName}/{entityId}",
ReferenceId = $"delete_{startIndex + i}"
// Body intenzionalmente null per DELETE
});
}
var jsonContent = new StringContent(
JsonSerializer.Serialize(compositeRequest, SalesforceJsonOptions),
System.Text.Encoding.UTF8,
"application/json");
var response = await _httpClient.PostAsync(compositeUri, jsonContent, cancellationToken);
if (!response.IsSuccessStatusCode)
{
var errorContent = await response.Content.ReadAsStringAsync(cancellationToken);
_logger.LogDebug($"Salesforce Batch Delete failed: {response.StatusCode} - {errorContent}");
return batch.Select((id, idx) => new CompositeOperationResult
{
ReferenceId = $"delete_{startIndex + idx}",
EntityId = id,
Success = false,
ErrorMessage = $"Batch operation failed: {response.StatusCode} - {errorContent}"
}).ToList();
}
var responseContent = await response.Content.ReadAsStringAsync(cancellationToken);
var compositeResponse = JsonSerializer.Deserialize<SalesforceCompositeResponse>(responseContent, SalesforceJsonOptions);
var results = new List<CompositeOperationResult>();
if (compositeResponse?.CompositeResponse != null)
{
for (int i = 0; i < compositeResponse.CompositeResponse.Count; i++)
{
var sub = compositeResponse.CompositeResponse[i];
var originalId = i < batch.Count ? batch[i] : string.Empty;
var result = new CompositeOperationResult
{
ReferenceId = sub.ReferenceId,
EntityId = originalId,
HttpStatusCode = sub.HttpStatusCode,
// 204 No Content è la risposta di successo standard per DELETE
Success = sub.HttpStatusCode >= 200 && sub.HttpStatusCode < 300
};
if (!result.Success)
result.ErrorMessage = sub.Body?.ToString() ?? "Unknown error";
results.Add(result);
}
}
return results;
}
catch (Exception ex)
{
_logger.LogDebug($"Error during Salesforce batch delete: {ex.Message}");
return batch.Select((id, idx) => new CompositeOperationResult
{
ReferenceId = $"delete_{startIndex + idx}",
EntityId = id,
Success = false,
ErrorMessage = ex.Message
}).ToList();
}
}
/// <summary>
/// Bulk PATCH per aggiornare un singolo campo (es. campo di tombstone/mark-as-deleted) su N record.
/// </summary>
public Task<List<CompositeOperationResult>> BatchPatchSingleFieldAsync(
string entityName,
IEnumerable<string> entityIds,
string fieldName,
object fieldValue,
CancellationToken cancellationToken = default)
{
var updates = entityIds
.Where(id => !string.IsNullOrEmpty(id))
.ToDictionary(
id => id,
_ => (Dictionary<string, object>)new Dictionary<string, object> { { fieldName, fieldValue } });
return BatchUpdateEntitiesAsync(entityName, updates, cancellationToken);
}
} }
} }
@@ -70,15 +70,26 @@ public partial class DataCoupler : ComponentBase
/// <summary> /// <summary>
/// Verifica se la credenziale database selezionata è di tipo ODBC /// Verifica se la credenziale database selezionata è di tipo ODBC
/// </summary> /// </summary>
/// <returns>True se la credenziale è ODBC, altrimenti False</returns>
protected bool IsOdbcConnection() protected bool IsOdbcConnection()
{ {
if (string.IsNullOrEmpty(selectedDatabaseCredential)) if (string.IsNullOrEmpty(selectedDatabaseCredential))
return false; return false;
var credential = databaseCredentials.FirstOrDefault(c => c.Name == selectedDatabaseCredential); var credential = databaseCredentials.FirstOrDefault(c => c.Name == selectedDatabaseCredential);
return credential?.DatabaseType == DatabaseType.Odbc; return credential?.DatabaseType == DatabaseType.Odbc;
} }
/// <summary>
/// Verifica se la credenziale database selezionata è di tipo OLE DB
/// </summary>
protected bool IsOleDbConnection()
{
if (string.IsNullOrEmpty(selectedDatabaseCredential))
return false;
var credential = databaseCredentials.FirstOrDefault(c => c.Name == selectedDatabaseCredential);
return credential?.DatabaseType == DatabaseType.OleDb;
}
/// <summary> /// <summary>
/// Gestisce il cambio di credenziale database selezionata /// Gestisce il cambio di credenziale database selezionata
@@ -621,11 +632,12 @@ public partial class DataCoupler : ComponentBase
return; return;
} }
// Per ODBC, crea un database manager temporaneo se non esiste // Per ODBC e OLE DB, crea un database manager temporaneo se non esiste
var managerToUse = currentDatabaseManager; var managerToUse = currentDatabaseManager;
if (managerToUse == null && IsOdbcConnection()) if (managerToUse == null && (IsOdbcConnection() || IsOleDbConnection()))
{ {
Logger.LogInformation("Creando database manager temporaneo per validazione query ODBC"); Logger.LogInformation("Creando database manager temporaneo per validazione query {Type}",
IsOdbcConnection() ? "ODBC" : "OLE DB");
tempManager = await ConnectionFactory.CreateDatabaseManagerAsync(selectedDatabaseCredential); tempManager = await ConnectionFactory.CreateDatabaseManagerAsync(selectedDatabaseCredential);
managerToUse = tempManager; managerToUse = tempManager;
} }
@@ -661,8 +673,8 @@ public partial class DataCoupler : ComponentBase
Logger.LogInformation("Query validata con successo: {ColumnCount} colonne", queryColumns.Count); Logger.LogInformation("Query validata con successo: {ColumnCount} colonne", queryColumns.Count);
// Per ODBC, salva il manager se non era già presente // Per ODBC e OLE DB, salva il manager temporaneo per riuso
if (IsOdbcConnection() && currentDatabaseManager == null && tempManager != null) if ((IsOdbcConnection() || IsOleDbConnection()) && currentDatabaseManager == null && tempManager != null)
{ {
currentDatabaseManager = tempManager; currentDatabaseManager = tempManager;
tempManager = null; // Non distruggerlo nel finally tempManager = null; // Non distruggerlo nel finally
@@ -747,6 +759,7 @@ public partial class DataCoupler : ComponentBase
return databaseType switch return databaseType switch
{ {
DatabaseType.SqlServer => $"SELECT TOP {limit} * FROM ({baseQuery}) AS subquery", DatabaseType.SqlServer => $"SELECT TOP {limit} * FROM ({baseQuery}) AS subquery",
DatabaseType.OleDb => $"SELECT TOP {limit} * FROM ({baseQuery}) AS subquery",
DatabaseType.Oracle => $"SELECT * FROM ({baseQuery}) WHERE ROWNUM <= {limit}", DatabaseType.Oracle => $"SELECT * FROM ({baseQuery}) WHERE ROWNUM <= {limit}",
DatabaseType.MySql => $"{baseQuery} LIMIT {limit}", DatabaseType.MySql => $"{baseQuery} LIMIT {limit}",
DatabaseType.PostgreSql => $"{baseQuery} LIMIT {limit}", DatabaseType.PostgreSql => $"{baseQuery} LIMIT {limit}",
+35 -44
View File
@@ -3491,10 +3491,42 @@ public partial class DataCoupler : ComponentBase
// Crea lista indicizzata per mantenere il record number // Crea lista indicizzata per mantenere il record number
var indexedRecords = records.Select((record, index) => new { Record = record, RecordNumber = index + 1 }).ToList(); var indexedRecords = records.Select((record, index) => new { Record = record, RecordNumber = index + 1 }).ToList();
Logger.LogInformation("COMPOSITE: Inizio analisi parallela di {RecordCount} record", indexedRecords.Count); Logger.LogInformation("COMPOSITE: Inizio analisi di {RecordCount} record", indexedRecords.Count);
var analysisStartTime = DateTime.UtcNow; var analysisStartTime = DateTime.UtcNow;
// Processa tutti i record in parallelo // === STEP A: Bulk Pre-Discovery (1 query SQLite + poche SOQL IN invece di 2N+N) ===
var sourceKeysForBulk = new List<string>(indexedRecords.Count);
foreach (var idx in indexedRecords)
{
var key = GenerateSourceKey(idx.Record);
if (!string.IsNullOrEmpty(key))
sourceKeysForBulk.Add(key);
}
Dictionary<string, KeyAssociation> associationsByKey = new(StringComparer.Ordinal);
if (currentUseRecordAssociations && !string.IsNullOrEmpty(currentSourceKeyField) && sourceKeysForBulk.Count > 0)
{
var commonRequest = new PreDiscoveryRequest
{
SourceKeyField = currentSourceKeyField,
DestinationEntity = currentEntityName,
CredentialName = currentCredentialName,
DestinationKeyField = GetEntityIdField(),
FieldMappings = currentFieldMappings,
RestClient = currentRestClient,
EnablePreDiscovery = true,
UseParallelMethod = true,
IsScheduledTransfer = false
};
associationsByKey = await AssociationService.BatchFindOrCreateAssociationsAsync(
sourceKeysForBulk, commonRequest);
Logger.LogInformation("COMPOSITE: Bulk Pre-Discovery completata - {Found}/{Total} associazioni risolte",
associationsByKey.Count, sourceKeysForBulk.Count);
}
// === STEP B: Analisi locale parallela (no I/O) ===
var processingTasks = indexedRecords.Select(async indexedRecord => var processingTasks = indexedRecords.Select(async indexedRecord =>
{ {
try try
@@ -3513,48 +3545,7 @@ public partial class DataCoupler : ComponentBase
// Analizza le associazioni per capire se aggiornare, creare o saltare // Analizza le associazioni per capire se aggiornare, creare o saltare
if (currentUseRecordAssociations && !string.IsNullOrEmpty(sourceKey)) if (currentUseRecordAssociations && !string.IsNullOrEmpty(sourceKey))
{ {
Logger.LogDebug("COMPOSITE PARALLEL: Cerco associazione per KeyValue: '{KeyValue}', Entity: '{Entity}', Credential: '{Credential}'", associationsByKey.TryGetValue(sourceKey, out var existingAssociation);
sourceKey, currentEntityName, currentCredentialName);
// Usa i metodi paralleli per le operazioni di database
var existingAssociation = await CredentialService.FindKeyAssociationByValueParallelAsync(
sourceKey, currentEntityName, currentCredentialName);
// FALLBACK: Se non troviamo l'associazione con tutti i parametri, proviamo solo con il KeyValue
if (existingAssociation == null)
{
existingAssociation = await CredentialService.FindKeyAssociationByValueParallelAsync(sourceKey);
if (existingAssociation != null)
{
// Verifica compatibilità
if (existingAssociation.DestinationEntity != currentEntityName ||
existingAssociation.RestCredentialName != currentCredentialName)
{
existingAssociation = null;
}
}
}
// 🔍 PRE-DISCOVERY: Usa il servizio centralizzato
if (existingAssociation == null)
{
var preDiscoveryRequest = new PreDiscoveryRequest
{
SourceKey = sourceKey,
SourceKeyField = currentSourceKeyField,
DestinationEntity = currentEntityName,
CredentialName = currentCredentialName,
DestinationKeyField = GetEntityIdField(),
FieldMappings = currentFieldMappings,
RestClient = currentRestClient,
CurrentDataHash = currentDataHash,
EnablePreDiscovery = true,
UseParallelMethod = true, // Usa metodi paralleli thread-safe
IsScheduledTransfer = false
};
existingAssociation = await AssociationService.FindOrCreateAssociationAsync(preDiscoveryRequest);
}
if (existingAssociation != null && existingAssociation.IsActive) if (existingAssociation != null && existingAssociation.IsActive)
{ {
+239
View File
@@ -1,11 +1,13 @@
using System; using System;
using System.Collections.Generic; using System.Collections.Generic;
using System.Linq; using System.Linq;
using System.Text;
using System.Text.Json; using System.Text.Json;
using System.Threading.Tasks; using System.Threading.Tasks;
using CredentialManager.Models; using CredentialManager.Models;
using CredentialManager.Services; using CredentialManager.Services;
using DataConnection.CredentialManagement.Interfaces; using DataConnection.CredentialManagement.Interfaces;
using DataConnection.REST.Implementations;
using DataConnection.REST.Interfaces; using DataConnection.REST.Interfaces;
using Microsoft.Extensions.Logging; using Microsoft.Extensions.Logging;
@@ -69,6 +71,227 @@ public class AssociationService : IAssociationService
return null; return null;
} }
/// <summary>
/// Versione bulk del find-or-create — vedi <see cref="IAssociationService.BatchFindOrCreateAssociationsAsync"/>.
/// </summary>
public async Task<Dictionary<string, KeyAssociation>> BatchFindOrCreateAssociationsAsync(
IEnumerable<string> sourceKeys,
PreDiscoveryRequest commonRequest)
{
if (commonRequest == null)
throw new ArgumentNullException(nameof(commonRequest));
var distinctKeys = sourceKeys
.Where(k => !string.IsNullOrEmpty(k))
.Distinct()
.ToList();
var result = new Dictionary<string, KeyAssociation>(StringComparer.Ordinal);
if (distinctKeys.Count == 0)
return result;
// STEP 1 — Bulk lookup nel DB locale (1 query SQLite invece di N)
Dictionary<string, KeyAssociation> localMatches;
try
{
localMatches = await _credentialService.FindKeyAssociationsByValuesBulkAsync(
distinctKeys, commonRequest.DestinationEntity, commonRequest.CredentialName);
}
catch (Exception ex)
{
_logger.LogError(ex, "BULK PRE-DISCOVERY: bulk lookup locale fallito, fallback per-record");
// Fallback per-record (mantiene comportamento esistente in caso di problemi)
foreach (var key in distinctKeys)
{
var req = ClonePreDiscoveryRequest(commonRequest, key);
var found = await FindOrCreateAssociationAsync(req);
if (found != null) result[key] = found;
}
return result;
}
foreach (var kvp in localMatches)
result[kvp.Key] = kvp.Value;
var missingKeys = distinctKeys.Where(k => !result.ContainsKey(k)).ToList();
_logger.LogInformation("BULK PRE-DISCOVERY: {Local}/{Total} associazioni trovate localmente, {Missing} da cercare nella destinazione",
localMatches.Count, distinctKeys.Count, missingKeys.Count);
if (missingKeys.Count == 0 || !commonRequest.EnablePreDiscovery)
return result;
// STEP 2 — Pre-Discovery batched sulla destinazione REST
// Verifica che il campo chiave sia mappato
if (string.IsNullOrEmpty(commonRequest.SourceKeyField) ||
!commonRequest.FieldMappings.TryGetValue(commonRequest.SourceKeyField, out var mappedField))
{
_logger.LogWarning("BULK PRE-DISCOVERY: campo chiave '{SourceKeyField}' non mappato; skip discovery",
commonRequest.SourceKeyField);
return result;
}
// Solo SalesforceServiceClient supporta SOQL IN ottimizzata; per altri client si ricade al per-record.
if (commonRequest.RestClient is SalesforceServiceClient sfClient)
{
var discovered = await PerformBulkPreDiscoverySalesforceAsync(
sfClient, missingKeys, mappedField, commonRequest);
foreach (var kvp in discovered)
result[kvp.Key] = kvp.Value;
}
else
{
_logger.LogDebug("BULK PRE-DISCOVERY: client REST non Salesforce, fallback per-record per {Count} chiavi", missingKeys.Count);
foreach (var key in missingKeys)
{
var req = ClonePreDiscoveryRequest(commonRequest, key);
var found = await PerformPreDiscoveryAsync(req);
if (found != null) result[key] = found;
}
}
return result;
}
/// <summary>
/// Pre-Discovery batched specifica per Salesforce: usa SOQL <c>WHERE field IN (...)</c>
/// per recuperare in pochissime chiamate API tutti i record che matchano una qualsiasi delle chiavi mancanti.
/// </summary>
private async Task<Dictionary<string, KeyAssociation>> PerformBulkPreDiscoverySalesforceAsync(
SalesforceServiceClient sfClient,
List<string> missingKeys,
string mappedDestinationField,
PreDiscoveryRequest commonRequest)
{
var output = new Dictionary<string, KeyAssociation>(StringComparer.Ordinal);
// Chunk per stare sotto il limite SOQL/URL (~16 KB GET): ~200 valori per query
const int chunkSize = 200;
var queries = new List<string>();
for (int i = 0; i < missingKeys.Count; i += chunkSize)
{
var chunk = missingKeys.Skip(i).Take(chunkSize).ToList();
var sb = new StringBuilder();
sb.Append("SELECT Id, ");
sb.Append(mappedDestinationField);
sb.Append(" FROM ");
sb.Append(commonRequest.DestinationEntity);
sb.Append(" WHERE ");
sb.Append(mappedDestinationField);
sb.Append(" IN (");
sb.Append(string.Join(",", chunk.Select(v => $"'{v.Replace("'", "\\'")}'")));
sb.Append(')');
queries.Add(sb.ToString());
}
_logger.LogInformation("BULK PRE-DISCOVERY: {QueryCount} SOQL IN-query (~{ChunkSize} chiavi/query, Composite API ceil(N/25) HTTP call)",
queries.Count, chunkSize);
// BatchExecuteQueriesAsync raggruppa fino a 25 query in 1 Composite request
var batchResults = await sfClient.BatchExecuteQueriesAsync(queries);
// Indicizza i risultati per chiave: dal record letto leggiamo il valore di mappedDestinationField
var entityIdByKey = new Dictionary<string, string>(StringComparer.Ordinal);
foreach (var batchResult in batchResults.Where(r => r.Success && r.Records != null))
{
foreach (var record in batchResult.Records)
{
if (!record.TryGetValue(mappedDestinationField, out var keyVal) || keyVal == null)
continue;
var keyStr = keyVal.ToString();
if (string.IsNullOrEmpty(keyStr))
continue;
var idStr = ExtractDestinationId(record);
if (string.IsNullOrEmpty(idStr))
continue;
// In caso di duplicati in Salesforce, prendiamo il primo
if (!entityIdByKey.ContainsKey(keyStr))
entityIdByKey[keyStr] = idStr;
}
}
_logger.LogInformation("BULK PRE-DISCOVERY: trovati {Found}/{Missing} record esistenti nella destinazione",
entityIdByKey.Count, missingKeys.Count);
if (entityIdByKey.Count == 0)
return output;
// Salvataggio associazioni Pre-Discovery in parallelo
var saveTasks = entityIdByKey.Select(async kvp =>
{
try
{
var newAssoc = new KeyAssociation
{
KeyValue = kvp.Key,
SourceKeyField = commonRequest.SourceKeyField,
DestinationKeyField = commonRequest.DestinationKeyField ?? "Id",
MappedDestinationField = mappedDestinationField,
DestinationEntity = commonRequest.DestinationEntity,
DestinationId = kvp.Value,
RestCredentialName = commonRequest.CredentialName,
CreatedAt = DateTime.UtcNow,
LastVerifiedAt = DateTime.UtcNow,
IsActive = true,
AdditionalInfo = JsonSerializer.Serialize(new Dictionary<string, object>
{
{ "CreatedBy", "PreDiscovery" },
{ "DiscoveredAt", DateTime.UtcNow },
{ "MappingCount", commonRequest.FieldMappings.Count },
{ "BulkPreDiscovery", true },
{ "ScheduledTransfer", commonRequest.IsScheduledTransfer },
{ "SourceType", commonRequest.SourceType ?? string.Empty }
})
};
var id = commonRequest.UseParallelMethod
? await _credentialService.SaveKeyAssociationParallelAsync(newAssoc)
: await _credentialService.SaveKeyAssociationAsync(newAssoc);
newAssoc.Id = id;
return (kvp.Key, newAssoc);
}
catch (Exception ex)
{
_logger.LogWarning(ex, "BULK PRE-DISCOVERY: errore nel salvataggio associazione per KeyValue '{KeyValue}'", kvp.Key);
return (kvp.Key, (KeyAssociation?)null);
}
});
var savedResults = await Task.WhenAll(saveTasks);
foreach (var (key, assoc) in savedResults)
{
if (assoc != null) output[key] = assoc;
}
return output;
}
private static PreDiscoveryRequest ClonePreDiscoveryRequest(PreDiscoveryRequest source, string sourceKey)
{
return new PreDiscoveryRequest
{
SourceKey = sourceKey,
SourceKeyField = source.SourceKeyField,
DestinationEntity = source.DestinationEntity,
CredentialName = source.CredentialName,
DestinationKeyField = source.DestinationKeyField,
FieldMappings = source.FieldMappings,
RestClient = source.RestClient,
CurrentDataHash = source.CurrentDataHash,
EnablePreDiscovery = source.EnablePreDiscovery,
UseParallelMethod = source.UseParallelMethod,
IsScheduledTransfer = source.IsScheduledTransfer,
SourceType = source.SourceType
};
}
/// <summary> /// <summary>
/// Verifica se un'associazione è stata creata dal Pre-Discovery /// Verifica se un'associazione è stata creata dal Pre-Discovery
/// controllando il campo AdditionalInfo /// controllando il campo AdditionalInfo
@@ -285,6 +508,22 @@ public interface IAssociationService
{ {
Task<KeyAssociation?> FindOrCreateAssociationAsync(PreDiscoveryRequest request); Task<KeyAssociation?> FindOrCreateAssociationAsync(PreDiscoveryRequest request);
bool IsPreDiscoveryAssociation(KeyAssociation association); bool IsPreDiscoveryAssociation(KeyAssociation association);
/// <summary>
/// Versione bulk del find-or-create.
/// 1) Una sola query SQLite (WHERE KeyValue IN …) per recuperare le associazioni esistenti.
/// 2) Per le chiavi non trovate localmente, una manciata di SOQL "IN" su Salesforce
/// (~200 chiavi per query, Composite API: ceil(K/25) HTTP call) invece di K chiamate singole.
/// 3) Le associazioni Pre-Discovery scoperte vengono salvate e restituite.
/// </summary>
/// <param name="sourceKeys">Lista (non vuota) dei valori chiave sorgente per tutti i record da analizzare.</param>
/// <param name="commonRequest">Parametri condivisi (entity, credential, restClient, mappings, ecc.).
/// <see cref="PreDiscoveryRequest.SourceKey"/> e <see cref="PreDiscoveryRequest.CurrentDataHash"/>
/// sono ignorati; vengono presi dal parametro <paramref name="sourceKeys"/>.</param>
/// <returns>Dizionario KeyValue → KeyAssociation (solo per chiavi trovate/create).</returns>
Task<Dictionary<string, KeyAssociation>> BatchFindOrCreateAssociationsAsync(
IEnumerable<string> sourceKeys,
PreDiscoveryRequest commonRequest);
} }
/// <summary> /// <summary>
+189 -69
View File
@@ -1,5 +1,6 @@
using CredentialManager.Models; using CredentialManager.Models;
using DataConnection.CredentialManagement.Interfaces; using DataConnection.CredentialManagement.Interfaces;
using DataConnection.REST.Implementations;
using DataConnection.REST.Interfaces; using DataConnection.REST.Interfaces;
using Microsoft.Extensions.Logging; using Microsoft.Extensions.Logging;
@@ -79,76 +80,18 @@ public class DeletionSyncService : IDeletionSyncService
_logger.LogInformation("Trovate {Count} cancellazioni in attesa di sincronizzazione", _logger.LogInformation("Trovate {Count} cancellazioni in attesa di sincronizzazione",
pendingDeletions.Count); pendingDeletions.Count);
// Step 3: Esegui le cancellazioni nella destinazione // Step 3: Esegui le cancellazioni nella destinazione.
foreach (var deletion in pendingDeletions) // Per Salesforce usiamo le Composite API in batch (ceil(N/25) HTTP call invece di N);
// per gli altri client REST manteniamo il loop sequenziale (nessun batch supportato).
if (restClient is SalesforceServiceClient salesforceClient)
{ {
try await ExecuteBatchedSalesforceDeletionsAsync(
{ salesforceClient, destinationEntity, pendingDeletions, options, result);
bool syncSuccess = false; }
string errorMessage = ""; else
{
switch (options.Action) await ExecuteSequentialDeletionsAsync(
{ restClient, destinationEntity, pendingDeletions, options, result);
case DeletionAction.Delete:
// Elimina fisicamente il record
syncSuccess = await DeleteRecordAsync(
restClient, destinationEntity, deletion.DestinationId);
break;
case DeletionAction.Deactivate:
// Marca il record come inattivo
syncSuccess = await DeactivateRecordAsync(
restClient, destinationEntity, deletion.DestinationId);
break;
case DeletionAction.Mark:
// Imposta un campo personalizzato
if (string.IsNullOrEmpty(options.MarkField) || string.IsNullOrEmpty(options.MarkValue))
{
errorMessage = "MarkField e MarkValue devono essere specificati per DeletionAction.Mark";
_logger.LogWarning(errorMessage);
result.Errors.Add($"KeyValue: {deletion.KeyValue} - {errorMessage}");
continue;
}
syncSuccess = await MarkRecordAsync(
restClient, destinationEntity, deletion.DestinationId,
options.MarkField, options.MarkValue);
break;
default:
errorMessage = $"Azione di cancellazione non supportata: {options.Action}";
_logger.LogWarning(errorMessage);
result.Errors.Add($"KeyValue: {deletion.KeyValue} - {errorMessage}");
continue;
}
if (syncSuccess)
{
// Marca la cancellazione come sincronizzata
await _credentialService.MarkDeletionSyncedAsync(deletion.Id);
result.DeletedRecordsSynced++;
_logger.LogInformation(
"Cancellazione sincronizzata: KeyValue={KeyValue}, DestinationId={DestinationId}, Action={Action}",
deletion.KeyValue, deletion.DestinationId, options.Action);
}
else
{
result.SyncErrors++;
var error = $"Errore nella sincronizzazione della cancellazione per KeyValue: {deletion.KeyValue}";
result.Errors.Add(error);
_logger.LogWarning(error);
}
}
catch (Exception ex)
{
result.SyncErrors++;
var error = $"Errore durante la sincronizzazione della cancellazione per KeyValue: {deletion.KeyValue} - {ex.Message}";
result.Errors.Add(error);
_logger.LogError(ex, "Errore nella sincronizzazione della cancellazione per {KeyValue}",
deletion.KeyValue);
}
} }
result.IsSuccess = result.SyncErrors == 0; result.IsSuccess = result.SyncErrors == 0;
@@ -170,6 +113,183 @@ public class DeletionSyncService : IDeletionSyncService
return result; return result;
} }
/// <summary>
/// Esegue le cancellazioni in batch via Salesforce Composite API.
/// Riduce N round-trip HTTP a ceil(N/25) batch in parallelo.
/// </summary>
private async Task ExecuteBatchedSalesforceDeletionsAsync(
SalesforceServiceClient salesforceClient,
string destinationEntity,
List<KeyAssociation> pendingDeletions,
DeletionSyncOptions options,
DeletionSyncResult result)
{
// Per Mark serve MarkField e MarkValue: validazione preventiva (un solo log)
if (options.Action == DeletionAction.Mark &&
(string.IsNullOrEmpty(options.MarkField) || string.IsNullOrEmpty(options.MarkValue)))
{
const string err = "MarkField e MarkValue devono essere specificati per DeletionAction.Mark";
_logger.LogWarning(err);
foreach (var d in pendingDeletions)
{
result.SyncErrors++;
result.Errors.Add($"KeyValue: {d.KeyValue} - {err}");
}
return;
}
// Mappa entityId → KeyAssociation per ricostruire l'associazione dal risultato batch
var deletionsById = pendingDeletions
.Where(d => !string.IsNullOrEmpty(d.DestinationId))
.GroupBy(d => d.DestinationId)
.ToDictionary(g => g.Key, g => g.First()); // se duplicati, prima occorrenza
var entityIds = deletionsById.Keys.ToList();
if (entityIds.Count == 0)
return;
_logger.LogInformation("DELETION SYNC (Salesforce batched): {Count} record, action={Action}",
entityIds.Count, options.Action);
List<DataConnection.REST.Implementations.SalesforceServiceClient.CompositeOperationResult> batchResults;
try
{
switch (options.Action)
{
case DeletionAction.Delete:
batchResults = await salesforceClient.BatchDeleteEntitiesAsync(destinationEntity, entityIds);
break;
case DeletionAction.Deactivate:
// Aggiorna IsActive/Active = false in batch.
// Non sappiamo a priori quale dei due campi esista sull'SObject: proviamo IsActive,
// se Salesforce ritorna errore il record verrà segnalato come fallito.
var deactivateUpdates = entityIds.ToDictionary(
id => id,
_ => (Dictionary<string, object>)new Dictionary<string, object>
{
{ "IsActive", false }
});
batchResults = await salesforceClient.BatchUpdateEntitiesAsync(destinationEntity, deactivateUpdates);
break;
case DeletionAction.Mark:
batchResults = await salesforceClient.BatchPatchSingleFieldAsync(
destinationEntity, entityIds, options.MarkField!, options.MarkValue!);
break;
default:
_logger.LogWarning("DELETION SYNC: azione non supportata: {Action}", options.Action);
foreach (var d in pendingDeletions)
{
result.SyncErrors++;
result.Errors.Add($"KeyValue: {d.KeyValue} - Azione non supportata: {options.Action}");
}
return;
}
}
catch (Exception ex)
{
_logger.LogError(ex, "DELETION SYNC: errore nell'esecuzione del batch Salesforce");
foreach (var d in pendingDeletions)
{
result.SyncErrors++;
result.Errors.Add($"KeyValue: {d.KeyValue} - {ex.Message}");
}
return;
}
// Aggiorna lo stato delle cancellazioni in DB in parallelo per i record sincronizzati con successo
var markSyncedTasks = new List<Task>();
foreach (var br in batchResults)
{
if (!deletionsById.TryGetValue(br.EntityId ?? string.Empty, out var deletion))
continue;
if (br.Success)
{
result.DeletedRecordsSynced++;
markSyncedTasks.Add(_credentialService.MarkDeletionSyncedAsync(deletion.Id));
_logger.LogDebug(
"DELETION SYNC: KeyValue={KeyValue}, DestinationId={DestinationId}, Action={Action} OK",
deletion.KeyValue, deletion.DestinationId, options.Action);
}
else
{
result.SyncErrors++;
var msg = $"KeyValue: {deletion.KeyValue} - {br.ErrorMessage ?? "Unknown error"}";
result.Errors.Add(msg);
_logger.LogWarning("DELETION SYNC fallita: {Msg}", msg);
}
}
if (markSyncedTasks.Count > 0)
await Task.WhenAll(markSyncedTasks);
}
/// <summary>
/// Fallback sequenziale per client REST non Salesforce.
/// </summary>
private async Task ExecuteSequentialDeletionsAsync(
IRestServiceClient restClient,
string destinationEntity,
List<KeyAssociation> pendingDeletions,
DeletionSyncOptions options,
DeletionSyncResult result)
{
foreach (var deletion in pendingDeletions)
{
try
{
bool syncSuccess = false;
string errorMessage = "";
switch (options.Action)
{
case DeletionAction.Delete:
syncSuccess = await DeleteRecordAsync(restClient, destinationEntity, deletion.DestinationId);
break;
case DeletionAction.Deactivate:
syncSuccess = await DeactivateRecordAsync(restClient, destinationEntity, deletion.DestinationId);
break;
case DeletionAction.Mark:
if (string.IsNullOrEmpty(options.MarkField) || string.IsNullOrEmpty(options.MarkValue))
{
errorMessage = "MarkField e MarkValue devono essere specificati per DeletionAction.Mark";
_logger.LogWarning(errorMessage);
result.Errors.Add($"KeyValue: {deletion.KeyValue} - {errorMessage}");
continue;
}
syncSuccess = await MarkRecordAsync(restClient, destinationEntity, deletion.DestinationId,
options.MarkField, options.MarkValue);
break;
default:
errorMessage = $"Azione di cancellazione non supportata: {options.Action}";
_logger.LogWarning(errorMessage);
result.Errors.Add($"KeyValue: {deletion.KeyValue} - {errorMessage}");
continue;
}
if (syncSuccess)
{
await _credentialService.MarkDeletionSyncedAsync(deletion.Id);
result.DeletedRecordsSynced++;
}
else
{
result.SyncErrors++;
result.Errors.Add($"Errore nella sincronizzazione della cancellazione per KeyValue: {deletion.KeyValue}");
}
}
catch (Exception ex)
{
result.SyncErrors++;
result.Errors.Add($"KeyValue: {deletion.KeyValue} - {ex.Message}");
_logger.LogError(ex, "Errore nella sincronizzazione della cancellazione per {KeyValue}", deletion.KeyValue);
}
}
}
/// <summary> /// <summary>
/// Elimina fisicamente un record dalla destinazione /// Elimina fisicamente un record dalla destinazione
/// </summary> /// </summary>
@@ -895,10 +895,45 @@ public class ScheduledProfileExecutionService : IScheduledProfileExecutionServic
// Crea lista indicizzata per mantenere il record number // Crea lista indicizzata per mantenere il record number
var indexedRecords = sourceRecords.Select((record, index) => new { Record = record, RecordNumber = index + 1 }).ToList(); var indexedRecords = sourceRecords.Select((record, index) => new { Record = record, RecordNumber = index + 1 }).ToList();
_logger.LogInformation("COMPOSITE SCHEDULED: Inizio analisi parallela di {RecordCount} record", indexedRecords.Count); _logger.LogInformation("COMPOSITE SCHEDULED: Inizio analisi di {RecordCount} record", indexedRecords.Count);
var analysisStartTime = DateTime.UtcNow; var analysisStartTime = DateTime.UtcNow;
// Processa tutti i record in parallelo // === STEP A: Bulk Pre-Discovery (1 query SQLite + poche SOQL IN invece di 2N+N) ===
// Pre-calcolo locale: source key per ogni record (operazione thread-safe)
var sourceKeyByRecordIndex = new Dictionary<int, string>(indexedRecords.Count);
foreach (var idx in indexedRecords)
{
var key = GenerateSourceKey(idx.Record, profile.SourceKeyField);
if (!string.IsNullOrEmpty(key))
sourceKeyByRecordIndex[idx.RecordNumber] = key;
}
Dictionary<string, KeyAssociation> associationsByKey = new(StringComparer.Ordinal);
if (currentUseRecordAssociations && !string.IsNullOrEmpty(profile.SourceKeyField) && sourceKeyByRecordIndex.Count > 0)
{
var commonRequest = new PreDiscoveryRequest
{
SourceKeyField = profile.SourceKeyField,
DestinationEntity = currentEntityName,
CredentialName = currentCredentialName,
DestinationKeyField = "Id",
FieldMappings = fieldMappings,
RestClient = restClient,
EnablePreDiscovery = true,
UseParallelMethod = true,
IsScheduledTransfer = true,
SourceType = profile.SourceType
};
associationsByKey = await _associationService.BatchFindOrCreateAssociationsAsync(
sourceKeyByRecordIndex.Values, commonRequest);
_logger.LogInformation("COMPOSITE SCHEDULED: Bulk Pre-Discovery completata - {Found}/{Total} associazioni risolte",
associationsByKey.Count, sourceKeyByRecordIndex.Count);
}
// === STEP B: Analisi locale parallela per decidere create/update/skip ===
// Nessuna chiamata DB o REST in questo loop — solo memoria.
var processingTasks = indexedRecords.Select(async indexedRecord => var processingTasks = indexedRecords.Select(async indexedRecord =>
{ {
try try
@@ -916,49 +951,7 @@ public class ScheduledProfileExecutionService : IScheduledProfileExecutionServic
// Analizza le associazioni per capire se aggiornare, creare o saltare // Analizza le associazioni per capire se aggiornare, creare o saltare
if (currentUseRecordAssociations && !string.IsNullOrEmpty(sourceKey)) if (currentUseRecordAssociations && !string.IsNullOrEmpty(sourceKey))
{ {
_logger.LogDebug("COMPOSITE SCHEDULED: Cerco associazione per KeyValue: '{KeyValue}', Entity: '{Entity}', Credential: '{Credential}'", associationsByKey.TryGetValue(sourceKey, out var existingAssociation);
sourceKey, currentEntityName, currentCredentialName);
// Cerca associazione esistente usando il metodo parallelo
var existingAssociation = await _dataConnectionCredentialService.FindKeyAssociationByValueParallelAsync(
sourceKey, currentEntityName, currentCredentialName);
// FALLBACK: Se non troviamo l'associazione con tutti i parametri, proviamo solo con il KeyValue
if (existingAssociation == null)
{
existingAssociation = await _dataConnectionCredentialService.FindKeyAssociationByValueParallelAsync(sourceKey);
if (existingAssociation != null)
{
// Verifica compatibilità
if (existingAssociation.DestinationEntity != currentEntityName ||
existingAssociation.RestCredentialName != currentCredentialName)
{
existingAssociation = null;
}
}
}
// 🔍 PRE-DISCOVERY: Usa il servizio centralizzato
if (existingAssociation == null && !string.IsNullOrEmpty(profile.SourceKeyField))
{
var preDiscoveryRequest = new PreDiscoveryRequest
{
SourceKey = sourceKey,
SourceKeyField = profile.SourceKeyField,
DestinationEntity = currentEntityName,
CredentialName = currentCredentialName,
DestinationKeyField = "Id",
FieldMappings = fieldMappings,
RestClient = restClient,
CurrentDataHash = currentDataHash,
EnablePreDiscovery = true,
UseParallelMethod = true, // Usa metodi paralleli thread-safe
IsScheduledTransfer = true,
SourceType = profile.SourceType
};
existingAssociation = await _associationService.FindOrCreateAssociationAsync(preDiscoveryRequest);
}
if (existingAssociation != null && existingAssociation.IsActive) if (existingAssociation != null && existingAssociation.IsActive)
{ {