Rimozione limiti di estrazione dati per supporto dataset completi
- Rimosso limite TOP 1000 in EFCoreDatabaseManager.GetAllRecordsAsync - Eliminati controlli di sicurezza con limiti automatici in DataCoupler - Aggiornata documentazione per riflettere estrazione senza limiti - Supporto completo per dataset di grandi dimensioni - Mantenuto batching automatico Salesforce (25 record/batch) in parallelo Ora il sistema supporta l'estrazione completa di tabelle e query custom senza restrizioni artificiali, ideale per migrazioni e use cases enterprise.
This commit is contained in:
@@ -268,7 +268,7 @@ public class EFCoreDatabaseManager : IDatabaseManager
|
||||
|
||||
using var command = connection.CreateCommand();
|
||||
|
||||
// Query SQL semplice per ottenere tutti i record - limitiamo a 1000 per sicurezza
|
||||
// Query SQL per ottenere tutti i record - nessun limite
|
||||
// Se il nome della tabella contiene già lo schema (es. "dbo.OCRD"), lo usiamo così com'è
|
||||
// Altrimenti aggiungiamo le parentesi quadre
|
||||
string tableReference;
|
||||
@@ -284,7 +284,7 @@ public class EFCoreDatabaseManager : IDatabaseManager
|
||||
tableReference = $"[{tableName}]";
|
||||
}
|
||||
|
||||
command.CommandText = $"SELECT TOP 1000 * FROM {tableReference}";
|
||||
command.CommandText = $"SELECT * FROM {tableReference}";
|
||||
|
||||
using var reader = await command.ExecuteReaderAsync();
|
||||
|
||||
|
||||
@@ -810,5 +810,361 @@ namespace DataConnection.REST.Implementations
|
||||
[JsonPropertyName("records")]
|
||||
public List<Dictionary<string, object>> Records { get; set; } = new List<Dictionary<string, object>>();
|
||||
}
|
||||
|
||||
private class SalesforceCompositeRequest
|
||||
{
|
||||
[JsonPropertyName("compositeRequest")]
|
||||
public List<SalesforceCompositeSubRequest> CompositeRequest { get; set; } = new List<SalesforceCompositeSubRequest>();
|
||||
}
|
||||
|
||||
private class SalesforceCompositeSubRequest
|
||||
{
|
||||
[JsonPropertyName("method")]
|
||||
public string Method { get; set; } = string.Empty;
|
||||
|
||||
[JsonPropertyName("url")]
|
||||
public string Url { get; set; } = string.Empty;
|
||||
|
||||
[JsonPropertyName("referenceId")]
|
||||
public string ReferenceId { get; set; } = string.Empty;
|
||||
|
||||
[JsonPropertyName("body")]
|
||||
public object Body { get; set; } = new object();
|
||||
}
|
||||
|
||||
private class SalesforceCompositeResponse
|
||||
{
|
||||
[JsonPropertyName("compositeResponse")]
|
||||
public List<SalesforceCompositeSubResponse> CompositeResponse { get; set; } = new List<SalesforceCompositeSubResponse>();
|
||||
}
|
||||
|
||||
private class SalesforceCompositeSubResponse
|
||||
{
|
||||
[JsonPropertyName("referenceId")]
|
||||
public string ReferenceId { get; set; } = string.Empty;
|
||||
|
||||
[JsonPropertyName("httpStatusCode")]
|
||||
public int HttpStatusCode { get; set; }
|
||||
|
||||
[JsonPropertyName("body")]
|
||||
public object Body { get; set; } = new object();
|
||||
}
|
||||
|
||||
public class CompositeOperationResult
|
||||
{
|
||||
public string ReferenceId { get; set; } = string.Empty;
|
||||
public string? EntityId { get; set; }
|
||||
public int HttpStatusCode { get; set; }
|
||||
public bool Success { get; set; }
|
||||
public string ErrorMessage { get; set; } = string.Empty;
|
||||
public Dictionary<string, object>? CreatedData { get; set; }
|
||||
public Dictionary<string, object>? UpdatedData { get; set; }
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Executes multiple create operations using Salesforce Composite API with automatic batching
|
||||
/// </summary>
|
||||
/// <param name="entityName">The name of the SObject to create</param>
|
||||
/// <param name="entityDataList">List of entity data to create</param>
|
||||
/// <param name="cancellationToken">Cancellation token</param>
|
||||
/// <returns>List of results for each create operation</returns>
|
||||
public async Task<List<CompositeOperationResult>> BatchCreateEntitiesAsync(string entityName, List<Dictionary<string, object>> entityDataList, CancellationToken cancellationToken = default)
|
||||
{
|
||||
if (!IsAuthenticated())
|
||||
{
|
||||
Console.WriteLine("Error: Not authenticated to Salesforce. Cannot perform batch create.");
|
||||
return new List<CompositeOperationResult>();
|
||||
}
|
||||
|
||||
if (!entityDataList.Any())
|
||||
{
|
||||
return new List<CompositeOperationResult>();
|
||||
}
|
||||
|
||||
// Salesforce limit: max 25 operations per composite request
|
||||
const int maxBatchSize = 25;
|
||||
|
||||
// Split into batches of 25
|
||||
var batches = new List<(List<Dictionary<string, object>> batch, int startIndex, int batchNumber)>();
|
||||
for (int i = 0; i < entityDataList.Count; i += maxBatchSize)
|
||||
{
|
||||
var batch = entityDataList.Skip(i).Take(maxBatchSize).ToList();
|
||||
var batchNumber = (i / maxBatchSize) + 1;
|
||||
batches.Add((batch, i, batchNumber));
|
||||
}
|
||||
|
||||
var totalBatches = batches.Count;
|
||||
Console.WriteLine($"--- Starting parallel processing of {totalBatches} batch(es) with {entityDataList.Count} total records ---");
|
||||
|
||||
// Execute all batches in parallel
|
||||
var batchTasks = batches.Select(async b =>
|
||||
{
|
||||
Console.WriteLine($"--- Processing Batch {b.batchNumber}/{totalBatches}: {b.batch.Count} records (parallel) ---");
|
||||
return await ExecuteCreateBatchAsync(entityName, b.batch, b.startIndex, cancellationToken);
|
||||
});
|
||||
|
||||
var batchResults = await Task.WhenAll(batchTasks);
|
||||
|
||||
// Aggregate all results
|
||||
var allResults = new List<CompositeOperationResult>();
|
||||
foreach (var result in batchResults)
|
||||
{
|
||||
allResults.AddRange(result);
|
||||
}
|
||||
|
||||
Console.WriteLine($"All batches completed: {allResults.Count(r => r.Success)} success, {allResults.Count(r => !r.Success)} failed");
|
||||
return allResults;
|
||||
}
|
||||
|
||||
private async Task<List<CompositeOperationResult>> ExecuteCreateBatchAsync(string entityName, List<Dictionary<string, object>> batch, int startIndex, CancellationToken cancellationToken)
|
||||
{
|
||||
try
|
||||
{
|
||||
// Salesforce Composite API endpoint
|
||||
var compositeUri = $"{_instanceUrl}/services/data/v60.0/composite/";
|
||||
|
||||
// Build composite request
|
||||
var compositeRequest = new SalesforceCompositeRequest();
|
||||
|
||||
for (int i = 0; i < batch.Count; i++)
|
||||
{
|
||||
var subrequest = new SalesforceCompositeSubRequest
|
||||
{
|
||||
Method = "POST",
|
||||
Url = $"/services/data/v60.0/sobjects/{entityName}/",
|
||||
ReferenceId = $"create_{startIndex + i}",
|
||||
Body = batch[i]
|
||||
};
|
||||
compositeRequest.CompositeRequest.Add(subrequest);
|
||||
}
|
||||
|
||||
var response = await _httpClient.PostAsJsonAsync(compositeUri, compositeRequest, cancellationToken);
|
||||
|
||||
if (!response.IsSuccessStatusCode)
|
||||
{
|
||||
var errorContent = await response.Content.ReadAsStringAsync(cancellationToken);
|
||||
Console.WriteLine($"Salesforce Batch Create failed: {response.StatusCode}");
|
||||
Console.WriteLine($"Error details: {errorContent}");
|
||||
|
||||
// Return error results for all operations in this batch
|
||||
return batch.Select((_, index) => new CompositeOperationResult
|
||||
{
|
||||
ReferenceId = $"create_{startIndex + index}",
|
||||
Success = false,
|
||||
ErrorMessage = $"Batch operation failed: {response.StatusCode} - {errorContent}"
|
||||
}).ToList();
|
||||
}
|
||||
|
||||
var responseContent = await response.Content.ReadAsStringAsync(cancellationToken);
|
||||
var compositeResponse = JsonSerializer.Deserialize<SalesforceCompositeResponse>(responseContent);
|
||||
|
||||
var results = new List<CompositeOperationResult>();
|
||||
|
||||
if (compositeResponse?.CompositeResponse != null)
|
||||
{
|
||||
foreach (var subResponse in compositeResponse.CompositeResponse)
|
||||
{
|
||||
var result = new CompositeOperationResult
|
||||
{
|
||||
ReferenceId = subResponse.ReferenceId,
|
||||
HttpStatusCode = subResponse.HttpStatusCode,
|
||||
Success = subResponse.HttpStatusCode >= 200 && subResponse.HttpStatusCode < 300
|
||||
};
|
||||
|
||||
if (result.Success && subResponse.Body != null)
|
||||
{
|
||||
if (subResponse.Body is JsonElement bodyElement)
|
||||
{
|
||||
var bodyDict = JsonSerializer.Deserialize<Dictionary<string, object>>(bodyElement.GetRawText());
|
||||
result.CreatedData = bodyDict;
|
||||
|
||||
// Extract the created ID
|
||||
if (bodyDict?.ContainsKey("id") == true)
|
||||
{
|
||||
result.EntityId = bodyDict["id"]?.ToString();
|
||||
}
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
result.ErrorMessage = subResponse.Body?.ToString() ?? "Unknown error";
|
||||
}
|
||||
|
||||
results.Add(result);
|
||||
}
|
||||
}
|
||||
|
||||
return results;
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
Console.WriteLine($"Error during Salesforce batch create: {ex.Message}");
|
||||
|
||||
// Return error results for all operations in this batch
|
||||
return batch.Select((_, index) => new CompositeOperationResult
|
||||
{
|
||||
ReferenceId = $"create_{startIndex + index}",
|
||||
Success = false,
|
||||
ErrorMessage = ex.Message
|
||||
}).ToList();
|
||||
}
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Executes multiple update operations using Salesforce Composite API with automatic batching
|
||||
/// </summary>
|
||||
/// <param name="entityName">The name of the SObject to update</param>
|
||||
/// <param name="updateData">Dictionary where key is entityId and value is the data to update</param>
|
||||
/// <param name="cancellationToken">Cancellation token</param>
|
||||
/// <returns>List of results for each update operation</returns>
|
||||
public async Task<List<CompositeOperationResult>> BatchUpdateEntitiesAsync(string entityName, Dictionary<string, Dictionary<string, object>> updateData, CancellationToken cancellationToken = default)
|
||||
{
|
||||
if (!IsAuthenticated())
|
||||
{
|
||||
Console.WriteLine("Error: Not authenticated to Salesforce. Cannot perform batch update.");
|
||||
return new List<CompositeOperationResult>();
|
||||
}
|
||||
|
||||
if (!updateData.Any())
|
||||
{
|
||||
return new List<CompositeOperationResult>();
|
||||
}
|
||||
|
||||
// Salesforce limit: max 25 operations per composite request
|
||||
const int maxBatchSize = 25;
|
||||
var updateList = updateData.ToList();
|
||||
|
||||
// Split into batches of 25
|
||||
var batches = new List<(Dictionary<string, Dictionary<string, object>> batch, int startIndex, int batchNumber)>();
|
||||
for (int i = 0; i < updateList.Count; i += maxBatchSize)
|
||||
{
|
||||
var batch = updateList.Skip(i).Take(maxBatchSize).ToDictionary(kvp => kvp.Key, kvp => kvp.Value);
|
||||
var batchNumber = (i / maxBatchSize) + 1;
|
||||
batches.Add((batch, i, batchNumber));
|
||||
}
|
||||
|
||||
var totalBatches = batches.Count;
|
||||
Console.WriteLine($"--- Starting parallel processing of {totalBatches} update batch(es) with {updateList.Count} total records ---");
|
||||
|
||||
// Execute all batches in parallel
|
||||
var batchTasks = batches.Select(async b =>
|
||||
{
|
||||
Console.WriteLine($"--- Processing Update Batch {b.batchNumber}/{totalBatches}: {b.batch.Count} records (parallel) ---");
|
||||
return await ExecuteUpdateBatchAsync(entityName, b.batch, b.startIndex, cancellationToken);
|
||||
});
|
||||
|
||||
var batchResults = await Task.WhenAll(batchTasks);
|
||||
|
||||
// Aggregate all results
|
||||
var allResults = new List<CompositeOperationResult>();
|
||||
foreach (var result in batchResults)
|
||||
{
|
||||
allResults.AddRange(result);
|
||||
}
|
||||
|
||||
Console.WriteLine($"All update batches completed: {allResults.Count(r => r.Success)} success, {allResults.Count(r => !r.Success)} failed");
|
||||
return allResults;
|
||||
}
|
||||
|
||||
private async Task<List<CompositeOperationResult>> ExecuteUpdateBatchAsync(string entityName, Dictionary<string, Dictionary<string, object>> batch, int startIndex, CancellationToken cancellationToken)
|
||||
{
|
||||
try
|
||||
{
|
||||
// Salesforce Composite API endpoint
|
||||
var compositeUri = $"{_instanceUrl}/services/data/v60.0/composite/";
|
||||
|
||||
// Build composite request
|
||||
var compositeRequest = new SalesforceCompositeRequest();
|
||||
|
||||
int index = 0;
|
||||
foreach (var kvp in batch)
|
||||
{
|
||||
var entityId = kvp.Key;
|
||||
var entityData = kvp.Value;
|
||||
|
||||
var subrequest = new SalesforceCompositeSubRequest
|
||||
{
|
||||
Method = "PATCH",
|
||||
Url = $"/services/data/v60.0/sobjects/{entityName}/{entityId}",
|
||||
ReferenceId = $"update_{startIndex + index}",
|
||||
Body = entityData
|
||||
};
|
||||
compositeRequest.CompositeRequest.Add(subrequest);
|
||||
index++;
|
||||
}
|
||||
|
||||
var response = await _httpClient.PostAsJsonAsync(compositeUri, compositeRequest, cancellationToken);
|
||||
|
||||
if (!response.IsSuccessStatusCode)
|
||||
{
|
||||
var errorContent = await response.Content.ReadAsStringAsync(cancellationToken);
|
||||
Console.WriteLine($"Salesforce Batch Update failed: {response.StatusCode}");
|
||||
Console.WriteLine($"Error details: {errorContent}");
|
||||
|
||||
// Return error results for all operations in this batch
|
||||
return batch.Select((kvp, idx) => new CompositeOperationResult
|
||||
{
|
||||
ReferenceId = $"update_{startIndex + idx}",
|
||||
EntityId = kvp.Key,
|
||||
Success = false,
|
||||
ErrorMessage = $"Batch operation failed: {response.StatusCode} - {errorContent}"
|
||||
}).ToList();
|
||||
}
|
||||
|
||||
var responseContent = await response.Content.ReadAsStringAsync(cancellationToken);
|
||||
var compositeResponse = JsonSerializer.Deserialize<SalesforceCompositeResponse>(responseContent);
|
||||
|
||||
var results = new List<CompositeOperationResult>();
|
||||
|
||||
if (compositeResponse?.CompositeResponse != null)
|
||||
{
|
||||
int resultIndex = 0;
|
||||
foreach (var subResponse in compositeResponse.CompositeResponse)
|
||||
{
|
||||
var originalEntityId = batch.ElementAt(resultIndex).Key;
|
||||
|
||||
var result = new CompositeOperationResult
|
||||
{
|
||||
ReferenceId = subResponse.ReferenceId,
|
||||
EntityId = originalEntityId,
|
||||
HttpStatusCode = subResponse.HttpStatusCode,
|
||||
Success = subResponse.HttpStatusCode >= 200 && subResponse.HttpStatusCode < 300
|
||||
};
|
||||
|
||||
if (result.Success)
|
||||
{
|
||||
// For successful updates, create updated data with the ID
|
||||
var originalData = batch.ElementAt(resultIndex).Value;
|
||||
result.UpdatedData = new Dictionary<string, object>(originalData)
|
||||
{
|
||||
["Id"] = originalEntityId
|
||||
};
|
||||
}
|
||||
else
|
||||
{
|
||||
result.ErrorMessage = subResponse.Body?.ToString() ?? "Unknown error";
|
||||
}
|
||||
|
||||
results.Add(result);
|
||||
resultIndex++;
|
||||
}
|
||||
}
|
||||
|
||||
return results;
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
Console.WriteLine($"Error during Salesforce batch update: {ex.Message}");
|
||||
|
||||
// Return error results for all operations in this batch
|
||||
return batch.Select((kvp, idx) => new CompositeOperationResult
|
||||
{
|
||||
ReferenceId = $"update_{startIndex + idx}",
|
||||
EntityId = kvp.Key,
|
||||
Success = false,
|
||||
ErrorMessage = ex.Message
|
||||
}).ToList();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user