feat: Implementazione completa sistema schedulazione con intervalli personalizzati

- Aggiunto supporto schedulazione con intervalli flessibili (secondi/minuti/ore/giorni/settimane/mesi)
- Esteso modello ProfileSchedule con campi IntervalValue e IntervalUnit
- Ottimizzato ScheduledJobService per controlli ogni 30s con esecuzione parallela
- Implementata interfaccia UI completa con anteprima real-time in italiano
- Aggiunta migrazione database AddIntervalSchedulingFields
- Implementati metodi calcolo NextExecutionTime per intervalli
- Aggiunta gestione tracking anti-duplicati e cleanup automatico
- Creata documentazione completa (6 file, 2500+ righe)

Modifiche tecniche:
- ProfileSchedule.cs: Nuovi campi e metodi CalculateNextInterval/GetScheduleDescription
- ScheduledJobService.cs: Ridotto check interval a 30s, aggiunto parallel processing
- ProfileScheduleService.cs: Supporto calcolo intervalli in UpdateNextExecutionTimeAsync
- Scheduling.razor: Aggiunta sezione UI per configurazione intervalli
- Scheduling.razor.cs: Implementato GetIntervalPreview() e gestione stato campi
This commit is contained in:
2025-10-02 01:12:39 +02:00
parent b76a6760fb
commit d042863a56
71 changed files with 17860 additions and 144 deletions
+168 -46
View File
@@ -10,6 +10,7 @@ using Microsoft.EntityFrameworkCore;
using Microsoft.EntityFrameworkCore.Infrastructure;
using Microsoft.EntityFrameworkCore.Metadata;
using Microsoft.Data.SqlClient;
using CredentialManager.Migrations;
namespace DataConnection.EF;
@@ -101,41 +102,87 @@ public class EFCoreDatabaseManager : IDatabaseManager
return await _context.Set<T>().FromSqlRaw(sql, parameters).ToListAsync();
}
/// <summary>
/// Executes a raw SQL query on the DbContext connection and returns every row as a
/// column-name to value dictionary.
/// </summary>
/// <param name="sql">SQL text to execute. Positional parameters are referenced as <c>@p0</c>, <c>@p1</c>, ...</param>
/// <param name="databaseName">
/// Database the query must run against. When it is non-empty and differs from the connection's
/// current database, the connection is switched to it before the command runs.
/// </param>
/// <param name="parameters">Values bound, in order, to <c>@p0</c>, <c>@p1</c>, ...; a null value is sent as <see cref="DBNull"/>.</param>
/// <returns>One dictionary per row; database NULLs are returned as empty strings.</returns>
public async Task<List<Dictionary<string, object>>> ExecuteRawQueryAsync(string sql, string databaseName, params object[] parameters)
{
    try
    {
        // A caller may pass an explicit null for the params array.
        parameters ??= Array.Empty<object>();

        // Make sure the connection is open
        if (_context.Database.GetDbConnection().State != ConnectionState.Open)
        {
            await _context.Database.OpenConnectionAsync();
        }

        var connection = _context.Database.GetDbConnection();

        // Debug: report the database the connection currently points to.
        // The connection string itself is deliberately NOT logged: it can contain credentials.
        Console.WriteLine($"ExecuteRawQueryAsync: Database connessione prima: {connection.Database}");

        // The connection string is only used as a sanity gate, exactly as before.
        var connectionString = _context.Database.GetConnectionString();

        if (!string.IsNullOrEmpty(connectionString))
        {
            var targetDatabase = databaseName;

            Console.WriteLine($"ExecuteRawQueryAsync: Database richiesto: '{targetDatabase}'");
            Console.WriteLine($"ExecuteRawQueryAsync: Database corrente connessione: '{connection.Database}'");

            // If the connection is not on the requested database, force the switch
            if (!string.IsNullOrEmpty(targetDatabase) && !string.Equals(connection.Database, targetDatabase, StringComparison.OrdinalIgnoreCase))
            {
                Console.WriteLine($"ExecuteRawQueryAsync: MISMATCH RILEVATO! Forzando cambio database da '{connection.Database}' a '{targetDatabase}'");
                await connection.ChangeDatabaseAsync(targetDatabase);
                Console.WriteLine($"ExecuteRawQueryAsync: Database connessione dopo cambio: '{connection.Database}'");
            }
            else
            {
                Console.WriteLine($"ExecuteRawQueryAsync: Database già corretto: '{connection.Database}' = '{targetDatabase}'");
            }
        }
        else
        {
            Console.WriteLine("ExecuteRawQueryAsync: ATTENZIONE! Connection string è vuota!");
        }

        using var command = connection.CreateCommand();
        command.CommandText = sql;

        // Bind the positional parameters
        for (int i = 0; i < parameters.Length; i++)
        {
            var parameter = command.CreateParameter();
            parameter.ParameterName = $"@p{i}";
            parameter.Value = parameters[i] ?? DBNull.Value;
            command.Parameters.Add(parameter);
        }

        var results = new List<Dictionary<string, object>>();

        using var reader = await command.ExecuteReaderAsync();
        while (await reader.ReadAsync())
        {
            var row = new Dictionary<string, object>();
            for (int i = 0; i < reader.FieldCount; i++)
            {
                var columnName = reader.GetName(i);
                var value = reader.IsDBNull(i) ? null : reader.GetValue(i);
                row[columnName] = value ?? ""; // Use an empty string instead of null
            }
            results.Add(row);
        }

        return results;
    }
    catch (Exception ex)
    {
        Console.WriteLine($"Errore nell'esecuzione della query raw: {ex.Message}");
        throw;
    }
}
public async Task<int> ExecuteCommandAsync(string sql, params object[] parameters)
@@ -148,15 +195,20 @@ public class EFCoreDatabaseManager : IDatabaseManager
try
{
// Assicurarsi che il contesto sia connesso
await _context.Database.OpenConnectionAsync();
if (_context.Database.GetDbConnection().State != ConnectionState.Open)
{
await _context.Database.OpenConnectionAsync();
}
// Usa la factory per ottenere il provider appropriato in base al tipo di database
var schemaProvider = DatabaseSchemaProviderFactory.CreateProvider(_options.DatabaseType);
// Usa il provider per ottenere lo schema
// Usa il provider per ottenere lo schema utilizzando la connection string corrente del DbContext
var connectionString = _context.Database.GetConnectionString();
if (connectionString == null)
throw new InvalidOperationException("Connection string is null");
Console.WriteLine($"GetDatabaseSchemaAsync: Utilizzo connection string: {connectionString}");
var result = await schemaProvider.GetDatabaseSchemaAsync(connectionString);
@@ -180,7 +232,7 @@ public class EFCoreDatabaseManager : IDatabaseManager
// Usa la factory per ottenere il provider appropriato in base al tipo di database
var schemaProvider = DatabaseSchemaProviderFactory.CreateProvider(_options.DatabaseType);
// Usa il provider per ottenere la lista dei database
// Usa il provider per ottenere la lista dei database utilizzando la connection string corrente del DbContext
var connectionString = _context.Database.GetConnectionString();
if (connectionString == null)
throw new InvalidOperationException("Connection string is null");
@@ -207,7 +259,7 @@ public class EFCoreDatabaseManager : IDatabaseManager
// Usa la factory per ottenere il provider appropriato in base al tipo di database
var schemaProvider = DatabaseSchemaProviderFactory.CreateProvider(_options.DatabaseType);
// Usa il provider per ottenere la lista delle tabelle
// Usa il provider per ottenere la lista delle tabelle utilizzando la connection string corrente del DbContext
var connectionString = _context.Database.GetConnectionString();
if (connectionString == null)
throw new InvalidOperationException("Connection string is null");
@@ -235,7 +287,7 @@ public class EFCoreDatabaseManager : IDatabaseManager
// Usa la factory per ottenere il provider appropriato in base al tipo di database
var schemaProvider = DatabaseSchemaProviderFactory.CreateProvider(_options.DatabaseType);
// Usa il provider per ottenere lo schema della tabella
// Usa il provider per ottenere lo schema della tabella utilizzando la connection string corrente del DbContext
var connectionString = _context.Database.GetConnectionString();
if (connectionString == null)
throw new InvalidOperationException("Connection string is null");
@@ -257,14 +309,45 @@ public class EFCoreDatabaseManager : IDatabaseManager
{
var records = new List<Dictionary<string, object>>();
// Usa la stessa connection string utilizzata per il discovery dello schema
var connectionString = _context.Database.GetConnectionString();
if (connectionString == null)
throw new InvalidOperationException("Connection string is null");
// Usa la connessione del DbContext che è stata aggiornata se è stato chiamato ChangeDatabaseAsync
if (_context.Database.GetDbConnection().State != ConnectionState.Open)
{
await _context.Database.OpenConnectionAsync();
}
// Determina il tipo di connessione in base al DatabaseType
using var connection = CreateConnection(connectionString);
await connection.OpenAsync();
var connection = _context.Database.GetDbConnection();
// Debug: verifica il database attuale della connessione
Console.WriteLine($"GetAllRecordsAsync: Database connessione prima: {connection.Database}");
// Estrai il database target dalla connection string del DbContext
var connectionString = _context.Database.GetConnectionString();
Console.WriteLine($"GetAllRecordsAsync: Connection string completa: {connectionString}");
if (!string.IsNullOrEmpty(connectionString))
{
var builder = new SqlConnectionStringBuilder(connectionString);
var targetDatabase = builder.InitialCatalog;
Console.WriteLine($"GetAllRecordsAsync: InitialCatalog dalla connection string: '{targetDatabase}'");
Console.WriteLine($"GetAllRecordsAsync: Database corrente connessione: '{connection.Database}'");
// Se il database della connessione non corrisponde al target, forza il cambio
if (!string.IsNullOrEmpty(targetDatabase) && !string.Equals(connection.Database, targetDatabase, StringComparison.OrdinalIgnoreCase))
{
Console.WriteLine($"GetAllRecordsAsync: MISMATCH RILEVATO! Forzando cambio database da '{connection.Database}' a '{targetDatabase}'");
await connection.ChangeDatabaseAsync(targetDatabase);
Console.WriteLine($"GetAllRecordsAsync: Database connessione dopo cambio: '{connection.Database}'");
}
else
{
Console.WriteLine($"GetAllRecordsAsync: Database già corretto: '{connection.Database}' = '{targetDatabase}'");
}
}
else
{
Console.WriteLine("GetAllRecordsAsync: ATTENZIONE! Connection string è vuota!");
}
using var command = connection.CreateCommand();
@@ -319,9 +402,13 @@ public class EFCoreDatabaseManager : IDatabaseManager
if (currentConnectionString == null)
throw new InvalidOperationException("Connection string is null");
Console.WriteLine($"ChangeDatabaseAsync: Connessione corrente: {currentConnectionString}");
// Crea una nuova connection string con il database specificato
var newConnectionString = UpdateConnectionStringDatabase(currentConnectionString, databaseName);
Console.WriteLine($"ChangeDatabaseAsync: Nuova connessione: {newConnectionString}");
// Ricrea il contesto con la nuova connection string
var optionsBuilder = new DbContextOptionsBuilder<ExistingDatabaseContext>();
@@ -350,6 +437,8 @@ public class EFCoreDatabaseManager : IDatabaseManager
// Testa la connessione al nuovo database
await _context.Database.OpenConnectionAsync();
Console.WriteLine($"ChangeDatabaseAsync: Connessione aggiornata verificata: {_context.Database.GetConnectionString()}");
}
catch (Exception ex)
{
@@ -406,12 +495,45 @@ public class EFCoreDatabaseManager : IDatabaseManager
{
try
{
var connectionString = _context.Database.GetConnectionString();
if (connectionString == null)
throw new InvalidOperationException("Connection string is null");
// Usa la connessione del DbContext che è stata aggiornata se è stato chiamato ChangeDatabaseAsync
if (_context.Database.GetDbConnection().State != ConnectionState.Open)
{
await _context.Database.OpenConnectionAsync();
}
using var connection = CreateConnection(connectionString);
await connection.OpenAsync();
var connection = _context.Database.GetDbConnection();
// Debug: verifica il database attuale della connessione
Console.WriteLine($"GetPrimaryKeyFieldAsync: Database connessione prima: {connection.Database}");
// Estrai il database target dalla connection string del DbContext
var connectionString = _context.Database.GetConnectionString();
Console.WriteLine($"GetPrimaryKeyFieldAsync: Connection string completa: {connectionString}");
if (!string.IsNullOrEmpty(connectionString))
{
var builder = new SqlConnectionStringBuilder(connectionString);
var targetDatabase = builder.InitialCatalog;
Console.WriteLine($"GetPrimaryKeyFieldAsync: InitialCatalog dalla connection string: '{targetDatabase}'");
Console.WriteLine($"GetPrimaryKeyFieldAsync: Database corrente connessione: '{connection.Database}'");
// Se il database della connessione non corrisponde al target, forza il cambio
if (!string.IsNullOrEmpty(targetDatabase) && !string.Equals(connection.Database, targetDatabase, StringComparison.OrdinalIgnoreCase))
{
Console.WriteLine($"GetPrimaryKeyFieldAsync: MISMATCH RILEVATO! Forzando cambio database da '{connection.Database}' a '{targetDatabase}'");
await connection.ChangeDatabaseAsync(targetDatabase);
Console.WriteLine($"GetPrimaryKeyFieldAsync: Database connessione dopo cambio: '{connection.Database}'");
}
else
{
Console.WriteLine($"GetPrimaryKeyFieldAsync: Database già corretto: '{connection.Database}' = '{targetDatabase}'");
}
}
else
{
Console.WriteLine("GetPrimaryKeyFieldAsync: ATTENZIONE! Connection string è vuota!");
}
using var command = connection.CreateCommand();
@@ -44,7 +44,7 @@ public interface IDatabaseManager : IDisposable
/// <summary>
/// Esegue una query SQL raw e restituisce i risultati come dictionary
/// </summary>
Task<List<Dictionary<string, object>>> ExecuteRawQueryAsync(string sql, params object[] parameters);
Task<List<Dictionary<string, object>>> ExecuteRawQueryAsync(string sql, string databaseName = "", params object[] parameters);
/// <summary>
/// Esegue un comando SQL che non restituisce risultati
@@ -558,6 +558,652 @@ namespace DataConnection.REST.Implementations
Console.WriteLine($"Error during Salesforce entity search: {ex.Message}");
return new List<Dictionary<string, object>>();
}
}
/// <summary>
/// Runs a list of SOQL queries through the Salesforce Composite API, splitting them into
/// chunks and dispatching all chunks concurrently.
/// </summary>
/// <param name="queries">SOQL queries to execute.</param>
/// <param name="cancellationToken">Token forwarded to every chunk request.</param>
/// <returns>
/// The results of all chunks sorted by <c>QueryIndex</c>; an empty list when the client is
/// not authenticated or <paramref name="queries"/> is empty.
/// </returns>
public async Task<List<BatchQueryResult>> BatchExecuteQueriesAsync(List<string> queries, CancellationToken cancellationToken = default)
{
    if (!IsAuthenticated())
    {
        Console.WriteLine("Error: Not authenticated to Salesforce. Cannot perform batch queries.");
        return new List<BatchQueryResult>();
    }

    if (!queries.Any())
    {
        return new List<BatchQueryResult>();
    }

    // Salesforce limit: max 25 operations per composite request
    const int chunkSize = 25;
    var chunkCount = (queries.Count + chunkSize - 1) / chunkSize;

    Console.WriteLine($"--- Starting parallel execution of {chunkCount} query batch(es) with {queries.Count} total queries ---");

    // Announces a chunk and hands it to the composite executor.
    async Task<List<BatchQueryResult>> RunChunkAsync(List<string> chunk, int offset, int ordinal)
    {
        Console.WriteLine($"--- Processing Query Batch {ordinal}/{chunkCount}: {chunk.Count} queries (parallel) ---");
        return await ExecuteQueryBatchAsync(chunk, offset, cancellationToken);
    }

    // Start every chunk before awaiting any of them so they run in parallel.
    var pending = new List<Task<List<BatchQueryResult>>>(chunkCount);
    for (int offset = 0, ordinal = 1; offset < queries.Count; offset += chunkSize, ordinal++)
    {
        var chunk = queries.GetRange(offset, Math.Min(chunkSize, queries.Count - offset));
        pending.Add(RunChunkAsync(chunk, offset, ordinal));
    }

    var completed = await Task.WhenAll(pending);

    // Flatten the per-chunk lists into one list.
    var merged = completed.SelectMany(chunkResults => chunkResults).ToList();
    var succeeded = merged.Count(r => r.Success);
    var failed = merged.Count - succeeded;

    Console.WriteLine($"All query batches completed: {succeeded} success, {failed} failed");

    return merged.OrderBy(r => r.QueryIndex).ToList();
}
/// <summary>
/// Sends one chunk of SOQL queries to Salesforce as a single Composite API request and
/// converts each sub-response into a <see cref="BatchQueryResult"/>.
/// </summary>
/// <param name="queries">SOQL queries that make up this chunk.</param>
/// <param name="startIndex">
/// Offset added to each query's position in <paramref name="queries"/> to produce its
/// <c>QueryIndex</c> and its composite reference id.
/// </param>
/// <param name="cancellationToken">Token forwarded to the HTTP calls.</param>
/// <returns>
/// At least one result per query. Failures (HTTP error, unparsable payload, missing
/// sub-response, exception) are reported through <c>Success</c>/<c>ErrorMessage</c> rather than thrown.
/// </returns>
private async Task<List<BatchQueryResult>> ExecuteQueryBatchAsync(List<string> queries, int startIndex, CancellationToken cancellationToken)
{
    // Builds a failed result for the query at the given position in this chunk.
    BatchQueryResult Failure(int position, string message) => new BatchQueryResult
    {
        QueryIndex = startIndex + position,
        Query = queries[position],
        Success = false,
        ErrorMessage = message,
        Records = new List<Dictionary<string, object>>()
    };

    // Builds a failed result for every query in this chunk.
    List<BatchQueryResult> FailAll(string message) =>
        Enumerable.Range(0, queries.Count).Select(position => Failure(position, message)).ToList();

    try
    {
        // Salesforce Composite API endpoint
        var compositeUri = $"{_instanceUrl}/services/data/v60.0/composite/";

        // Build composite request: one GET sub-request per query
        var compositeRequest = new SalesforceCompositeRequest();
        for (int i = 0; i < queries.Count; i++)
        {
            var encodedQuery = Uri.EscapeDataString(queries[i]);
            var subrequest = new SalesforceCompositeSubRequest
            {
                Method = "GET",
                Url = $"/services/data/v60.0/query/?q={encodedQuery}",
                ReferenceId = $"query_{startIndex + i}"
            };
            compositeRequest.CompositeRequest.Add(subrequest);
        }

        // Both the request content and the response own disposable resources.
        using var jsonContent = new StringContent(
            JsonSerializer.Serialize(compositeRequest, SalesforceJsonOptions),
            System.Text.Encoding.UTF8,
            "application/json"
        );

        using var response = await _httpClient.PostAsync(compositeUri, jsonContent, cancellationToken);

        if (!response.IsSuccessStatusCode)
        {
            var errorContent = await response.Content.ReadAsStringAsync(cancellationToken);
            Console.WriteLine($"Salesforce Batch Query failed: {response.StatusCode}");
            Console.WriteLine($"Error details: {errorContent}");

            return FailAll($"Batch operation failed: {response.StatusCode} - {errorContent}");
        }

        var responseContent = await response.Content.ReadAsStringAsync(cancellationToken);
        var compositeResponse = JsonSerializer.Deserialize<SalesforceCompositeResponse>(responseContent, SalesforceJsonOptions);

        var subResponses = compositeResponse?.CompositeResponse;
        if (subResponses == null)
        {
            // Without this the whole chunk would vanish from the caller's results.
            Console.WriteLine("Salesforce Batch Query returned an empty or unparsable composite response");
            return FailAll("Composite response was empty or could not be parsed");
        }

        var results = new List<BatchQueryResult>(Math.Max(queries.Count, subResponses.Count));

        for (int i = 0; i < subResponses.Count; i++)
        {
            var subResponse = subResponses[i];
            var originalQuery = i < queries.Count ? queries[i] : "";

            var result = new BatchQueryResult
            {
                QueryIndex = startIndex + i,
                Query = originalQuery,
                Success = subResponse.HttpStatusCode >= 200 && subResponse.HttpStatusCode < 300,
                // Always start from an empty list so consumers never see a null Records.
                Records = new List<Dictionary<string, object>>()
            };

            if (result.Success && subResponse.Body != null)
            {
                try
                {
                    if (subResponse.Body is JsonElement bodyElement)
                    {
                        var queryResponse = JsonSerializer.Deserialize<SalesforceQueryResponse>(bodyElement.GetRawText(), SalesforceJsonOptions);
                        result.Records = queryResponse?.Records ?? new List<Dictionary<string, object>>();
                        result.TotalSize = queryResponse?.TotalSize ?? 0;
                        result.NextRecordsUrl = queryResponse?.NextRecordsUrl;
                    }
                }
                catch (JsonException ex)
                {
                    result.Success = false;
                    result.ErrorMessage = $"Failed to parse query response: {ex.Message}";
                    result.Records = new List<Dictionary<string, object>>();
                }
            }
            else
            {
                result.ErrorMessage = subResponse.Body?.ToString() ?? "Unknown error";
            }

            results.Add(result);
        }

        // Report queries that received no sub-response instead of silently dropping them.
        for (int i = subResponses.Count; i < queries.Count; i++)
        {
            results.Add(Failure(i, "No sub-response returned for this query"));
        }

        return results;
    }
    catch (Exception ex)
    {
        Console.WriteLine($"Error during Salesforce batch query: {ex.Message}");

        // Return error results for all queries in this batch
        return FailAll(ex.Message);
    }
}
/// <summary>
/// Finds multiple entities by different key fields using batch queries for improved performance
/// </summary>
/// <param name="entityName">The name of the SObject to search</param>
/// <param name="keyFieldsList">
/// List of key field combinations to search for. Each entry becomes one
/// <c>SELECT Id ... WHERE k1 = v1 AND k2 = v2</c> query; null or empty entries are skipped.
/// </param>
/// <param name="cancellationToken">Cancellation token</param>
/// <returns>
/// Dictionary keyed by the entry's index in <paramref name="keyFieldsList"/>. Skipped entries
/// have no key. An empty dictionary is returned on authentication failure or on any exception.
/// </returns>
public async Task<Dictionary<int, List<Dictionary<string, object>>>> BatchFindEntitiesByKeysAsync(string entityName, List<Dictionary<string, object>> keyFieldsList, CancellationToken cancellationToken = default)
{
    // Renders a key value as a SOQL literal.
    string ToSoqlLiteral(object? value)
    {
        switch (value)
        {
            case null:
                return "null";
            case string text:
                // Escape the backslash first, otherwise an input ending in "\" followed by a
                // quote would neutralise the quote escape and break out of the literal.
                return "'" + text.Replace("\\", "\\\\").Replace("'", "\\'") + "'";
            case bool flag:
                return flag ? "true" : "false";
            case IFormattable formattable:
                // Numbers must not pick up the current culture's decimal separator.
                return formattable.ToString(null, System.Globalization.CultureInfo.InvariantCulture);
            default:
                return value.ToString() ?? "";
        }
    }

    try
    {
        Console.WriteLine($"--- Starting Salesforce Batch Entity Search: {entityName} ---");
        Console.WriteLine($"Searching for {keyFieldsList.Count} different key combinations");

        if (!await EnsureAuthenticatedAsync(cancellationToken))
        {
            Console.WriteLine("Authentication failed for batch entity search");
            return new Dictionary<int, List<Dictionary<string, object>>>();
        }

        // Build queries for each key field combination. sourceIndices[n] remembers which
        // keyFieldsList entry produced queries[n], because skipped entries shift the positions.
        var queries = new List<string>();
        var sourceIndices = new List<int>();
        for (int i = 0; i < keyFieldsList.Count; i++)
        {
            var keyFields = keyFieldsList[i];
            if (keyFields == null || !keyFields.Any()) continue;

            var whereConditions = keyFields.Select(kvp => $"{kvp.Key} = {ToSoqlLiteral(kvp.Value)}");

            var query = $"SELECT Id FROM {entityName} WHERE {string.Join(" AND ", whereConditions)}";
            queries.Add(query);
            sourceIndices.Add(i);
            Console.WriteLine($"Query {i}: {query}");
        }

        if (!queries.Any())
        {
            Console.WriteLine("No valid queries generated for batch search");
            return new Dictionary<int, List<Dictionary<string, object>>>();
        }

        // Execute batch queries
        var batchResults = await BatchExecuteQueriesAsync(queries, cancellationToken);

        // Map results back to original indices
        var results = new Dictionary<int, List<Dictionary<string, object>>>();
        foreach (var batchResult in batchResults)
        {
            if (batchResult.QueryIndex < 0 || batchResult.QueryIndex >= sourceIndices.Count)
            {
                continue; // not one of the queries built above
            }

            results[sourceIndices[batchResult.QueryIndex]] =
                batchResult.Records ?? new List<Dictionary<string, object>>();
        }

        var totalFound = results.Values.Sum(list => list.Count);
        Console.WriteLine($"Batch entity search completed: {totalFound} total entities found across {results.Count} queries");

        return results;
    }
    catch (Exception ex)
    {
        Console.WriteLine($"Error during Salesforce batch entity search: {ex.Message}");
        return new Dictionary<int, List<Dictionary<string, object>>>();
    }
}
/// <summary>
/// Extracts multiple entities by their IDs using batch queries
/// </summary>
/// <param name="entityName">The name of the SObject to retrieve</param>
/// <param name="entityIds">List of entity IDs to retrieve; null or blank entries are ignored</param>
/// <param name="fieldsToSelect">
/// Fields to select. When null or empty, <c>Id, Name, CreatedDate, LastModifiedDate</c> is used.
/// </param>
/// <param name="cancellationToken">Cancellation token</param>
/// <returns>
/// Records from every query that succeeded. An empty list is returned on authentication failure,
/// when there are no usable IDs, or on any exception.
/// </returns>
public async Task<List<Dictionary<string, object>>> BatchGetEntitiesByIdsAsync(string entityName, List<string> entityIds, List<string>? fieldsToSelect = null, CancellationToken cancellationToken = default)
{
    try
    {
        Console.WriteLine($"--- Starting Salesforce Batch Entity Retrieval: {entityName} ---");
        Console.WriteLine($"Retrieving {entityIds.Count} entities by ID");

        if (!await EnsureAuthenticatedAsync(cancellationToken))
        {
            Console.WriteLine("Authentication failed for batch entity retrieval");
            return new List<Dictionary<string, object>>();
        }

        // A single null ID would otherwise throw below and discard the whole retrieval.
        var usableIds = entityIds.Where(id => !string.IsNullOrWhiteSpace(id)).ToList();
        if (usableIds.Count == 0)
        {
            return new List<Dictionary<string, object>>();
        }

        // Determine fields to select: the caller's list, or a basic set of common fields.
        var commonFields = new[] { "Name", "CreatedDate", "LastModifiedDate" };
        string selectFields = fieldsToSelect?.Any() == true
            ? string.Join(", ", fieldsToSelect)
            : $"Id, {string.Join(", ", commonFields)}";

        // Group several IDs per query with an IN clause.
        const int maxIdsPerQuery = 200; // Conservative limit to avoid URL length issues
        var queries = new List<string>();

        for (int i = 0; i < usableIds.Count; i += maxIdsPerQuery)
        {
            var batchIds = usableIds.Skip(i).Take(maxIdsPerQuery);
            // Escape the backslash before the quote so a trailing "\" cannot cancel the quote escape.
            var idList = string.Join("','", batchIds.Select(id => id.Replace("\\", "\\\\").Replace("'", "\\'")));
            var query = $"SELECT {selectFields} FROM {entityName} WHERE Id IN ('{idList}')";
            queries.Add(query);
        }

        Console.WriteLine($"Created {queries.Count} batch queries for {usableIds.Count} entity IDs");

        // Execute batch queries
        var batchResults = await BatchExecuteQueriesAsync(queries, cancellationToken);

        // Aggregate all records from all successful queries
        var allRecords = new List<Dictionary<string, object>>();
        foreach (var batchResult in batchResults.Where(r => r.Success))
        {
            if (batchResult.Records != null)
            {
                allRecords.AddRange(batchResult.Records);
            }
        }

        Console.WriteLine($"Successfully retrieved {allRecords.Count} entities out of {entityIds.Count} requested");
        return allRecords;
    }
    catch (Exception ex)
    {
        Console.WriteLine($"Error during Salesforce batch entity retrieval: {ex.Message}");
        return new List<Dictionary<string, object>>();
    }
}
/// <summary>
/// Extracts all records from an entity, following Salesforce's query pagination
/// </summary>
/// <param name="entityName">The name of the SObject to extract</param>
/// <param name="fieldsToSelect">
/// Specific fields to select. When null or empty, up to 10 named fields reported by
/// <c>DiscoverEntityDetailsAsync</c> are used (with <c>Id</c> added if absent), or just <c>Id</c>
/// when no field information is available.
/// </param>
/// <param name="whereClause">Optional WHERE clause for filtering (without the WHERE keyword)</param>
/// <param name="maxRecords">Maximum number of records to retrieve (0 = no limit)</param>
/// <param name="cancellationToken">Cancellation token</param>
/// <returns>
/// The extracted records. An empty list is returned on authentication failure or on any exception.
/// </returns>
public async Task<List<Dictionary<string, object>>> ExtractAllEntitiesAsync(string entityName, List<string>? fieldsToSelect = null, string? whereClause = null, int maxRecords = 0, CancellationToken cancellationToken = default)
{
    // Safety cap that prevents an endless pagination loop.
    const int maxPages = 1000;

    try
    {
        Console.WriteLine($"--- Starting Salesforce Full Entity Extraction: {entityName} ---");

        if (!await EnsureAuthenticatedAsync(cancellationToken))
        {
            Console.WriteLine("Authentication failed for entity extraction");
            return new List<Dictionary<string, object>>();
        }

        // Determine fields to select
        string selectFields;
        if (fieldsToSelect?.Any() == true)
        {
            selectFields = string.Join(", ", fieldsToSelect);
        }
        else
        {
            selectFields = "Id"; // Always include Id

            // Get entity details to determine available fields
            var entityDetails = await DiscoverEntityDetailsAsync(entityName, cancellationToken);
            if (entityDetails?.Properties?.Any() == true)
            {
                // Select up to 10 fields to avoid URL length limits
                var availableFields = entityDetails.Properties
                    .Where(p => !string.IsNullOrEmpty(p.Name))
                    .Take(10)
                    .Select(p => p.Name)
                    .ToList();

                // Keep Id in the projection even when it is not among the first 10 fields.
                if (!availableFields.Any(name => string.Equals(name, "Id", StringComparison.OrdinalIgnoreCase)))
                {
                    availableFields.Insert(0, "Id");
                }

                selectFields = string.Join(", ", availableFields);
            }
        }

        // Build initial query
        var query = $"SELECT {selectFields} FROM {entityName}";
        if (!string.IsNullOrWhiteSpace(whereClause))
        {
            query += $" WHERE {whereClause}";
        }
        query += " ORDER BY Id"; // Add ordering for consistent pagination

        Console.WriteLine($"Initial extraction query: {query}");

        var allRecords = new List<Dictionary<string, object>>();
        var pageCount = 0;

        // URL of the next page to fetch; null once the result set is exhausted.
        string? nextRequestUrl = $"{_instanceUrl}/services/data/v60.0/query/?q={Uri.EscapeDataString(query)}";

        while (nextRequestUrl != null && (maxRecords == 0 || allRecords.Count < maxRecords))
        {
            // Prevent infinite loops
            if (pageCount >= maxPages)
            {
                Console.WriteLine("Maximum page limit reached (1000), stopping extraction");
                break;
            }

            pageCount++;
            Console.WriteLine($"Extracting page {pageCount}...");

            var response = await GetAsync<SalesforceQueryResponse>(nextRequestUrl, cancellationToken);

            if (response?.Records == null)
            {
                Console.WriteLine($"No response received for page {pageCount}");
                break;
            }

            var pageRecords = response.Records;

            // Apply max records limit if specified
            if (maxRecords > 0)
            {
                var remainingCapacity = maxRecords - allRecords.Count;
                if (remainingCapacity < pageRecords.Count)
                {
                    pageRecords = pageRecords.Take(remainingCapacity).ToList();
                }
            }

            allRecords.AddRange(pageRecords);
            Console.WriteLine($"Page {pageCount}: Retrieved {pageRecords.Count} records, total: {allRecords.Count}");

            // nextRecordsUrl is a resource path containing a query locator, not SOQL:
            // it has to be requested as-is rather than re-sent through the "?q=" parameter.
            var nextRecordsUrl = response.NextRecordsUrl;
            if (response.Done || string.IsNullOrEmpty(nextRecordsUrl))
            {
                nextRequestUrl = null;
            }
            else if (nextRecordsUrl.StartsWith("http", StringComparison.OrdinalIgnoreCase))
            {
                nextRequestUrl = nextRecordsUrl;
            }
            else
            {
                nextRequestUrl = $"{_instanceUrl}{nextRecordsUrl}";
            }
        }

        Console.WriteLine($"Entity extraction completed: {allRecords.Count} total records extracted in {pageCount} pages");
        return allRecords;
    }
    catch (Exception ex)
    {
        Console.WriteLine($"Error during Salesforce entity extraction: {ex.Message}");
        return new List<Dictionary<string, object>>();
    }
}
/// <summary>
/// Extracts entities using multiple parallel queries with different criteria.
/// Useful for extracting large datasets by splitting them by date ranges or other criteria.
/// </summary>
/// <param name="entityName">The name of the SObject to extract</param>
/// <param name="fieldsToSelect">Specific fields to select; passed through to <c>ExtractAllEntitiesAsync</c></param>
/// <param name="whereClauses">List of WHERE clauses, one per parallel extraction</param>
/// <param name="maxRecordsPerQuery">Maximum records per individual query (0 = no limit)</param>
/// <param name="cancellationToken">Cancellation token</param>
/// <returns>
/// Combined records of all extractions with repeated <c>Id</c> values removed (first occurrence
/// wins). Records without a usable <c>Id</c> are always kept. An empty list is returned on
/// authentication failure or on any exception.
/// </returns>
public async Task<List<Dictionary<string, object>>> ExtractEntitiesParallelAsync(string entityName, List<string>? fieldsToSelect, List<string> whereClauses, int maxRecordsPerQuery = 0, CancellationToken cancellationToken = default)
{
    try
    {
        Console.WriteLine($"--- Starting Salesforce Parallel Entity Extraction: {entityName} ---");
        Console.WriteLine($"Using {whereClauses.Count} parallel extraction criteria");

        if (!await EnsureAuthenticatedAsync(cancellationToken))
        {
            Console.WriteLine("Authentication failed for parallel entity extraction");
            return new List<Dictionary<string, object>>();
        }

        // Create extraction tasks for each WHERE clause. A failing extraction contributes
        // no records but does not abort the others.
        var extractionTasks = whereClauses.Select(async (whereClause, index) =>
        {
            try
            {
                Console.WriteLine($"Starting parallel extraction {index + 1}/{whereClauses.Count}: {whereClause}");
                var records = await ExtractAllEntitiesAsync(entityName, fieldsToSelect, whereClause, maxRecordsPerQuery, cancellationToken);
                Console.WriteLine($"Parallel extraction {index + 1} completed: {records.Count} records");
                return records;
            }
            catch (Exception ex)
            {
                Console.WriteLine($"Error in parallel extraction {index + 1}: {ex.Message}");
                return new List<Dictionary<string, object>>();
            }
        }).ToList();

        // Wait for all parallel extractions to complete
        var allResults = await Task.WhenAll(extractionTasks);

        // Combine all results
        var combinedRecords = new List<Dictionary<string, object>>();
        foreach (var result in allResults)
        {
            combinedRecords.AddRange(result);
        }

        // Remove duplicates based on the Id field, keeping the first occurrence. A record whose
        // Id is missing, null or empty has no identity to compare, so it is never treated as a duplicate.
        var seenIds = new HashSet<string>(StringComparer.Ordinal);
        var deduplicatedRecords = new List<Dictionary<string, object>>(combinedRecords.Count);
        foreach (var record in combinedRecords)
        {
            if (record.TryGetValue("Id", out var idValue)
                && idValue?.ToString() is string id
                && id.Length > 0
                && !seenIds.Add(id))
            {
                continue;
            }

            deduplicatedRecords.Add(record);
        }

        var duplicatesRemoved = combinedRecords.Count - deduplicatedRecords.Count;
        if (duplicatesRemoved > 0)
        {
            Console.WriteLine($"Removed {duplicatesRemoved} duplicate records");
        }

        Console.WriteLine($"Parallel entity extraction completed: {deduplicatedRecords.Count} unique records from {whereClauses.Count} parallel queries");
        return deduplicatedRecords;
    }
    catch (Exception ex)
    {
        Console.WriteLine($"Error during Salesforce parallel entity extraction: {ex.Message}");
        return new List<Dictionary<string, object>>();
    }
}
/// <summary>
/// Helper method to split large datasets into date-based chunks for parallel extraction
/// </summary>
/// <param name="startDate">Start date for extraction (inclusive)</param>
/// <param name="endDate">End date for extraction (exclusive)</param>
/// <param name="dateFieldName">Name of the date field to use for splitting (e.g., "CreatedDate", "LastModifiedDate")</param>
/// <param name="chunkSizeInDays">Size of each date chunk in days; must be at least 1</param>
/// <returns>
/// One <c>field &gt;= start AND field &lt; end</c> clause per chunk, in chronological order;
/// an empty list when <paramref name="startDate"/> is not before <paramref name="endDate"/>.
/// </returns>
/// <exception cref="ArgumentOutOfRangeException"><paramref name="chunkSizeInDays"/> is zero or negative.</exception>
public List<string> CreateDateBasedWhereClauses(DateTime startDate, DateTime endDate, string dateFieldName = "CreatedDate", int chunkSizeInDays = 30)
{
    // With a non-positive chunk size the cursor below would never reach endDate.
    if (chunkSizeInDays <= 0)
    {
        throw new ArgumentOutOfRangeException(nameof(chunkSizeInDays), chunkSizeInDays, "Chunk size must be at least one day.");
    }

    // 'T' and 'Z' are quoted literals; the invariant culture keeps ':' and the calendar stable.
    const string soqlDateTimeFormat = "yyyy-MM-dd'T'HH:mm:ss'Z'";
    var invariant = System.Globalization.CultureInfo.InvariantCulture;

    // The literal carries a 'Z' suffix, so local times must be converted to UTC first.
    // Unspecified-kind values are taken as already being UTC.
    DateTime AsUtc(DateTime value) => value.Kind == DateTimeKind.Local ? value.ToUniversalTime() : value;

    var rangeEnd = AsUtc(endDate);
    var currentDate = AsUtc(startDate);
    var whereClauses = new List<string>();

    while (currentDate < rangeEnd)
    {
        var chunkEndDate = currentDate.AddDays(chunkSizeInDays);
        if (chunkEndDate > rangeEnd)
        {
            chunkEndDate = rangeEnd; // last chunk is clipped to the requested range
        }

        var startDateString = currentDate.ToString(soqlDateTimeFormat, invariant);
        var endDateString = chunkEndDate.ToString(soqlDateTimeFormat, invariant);

        var whereClause = $"{dateFieldName} >= {startDateString} AND {dateFieldName} < {endDateString}";
        whereClauses.Add(whereClause);

        currentDate = chunkEndDate;
    }

    Console.WriteLine($"Created {whereClauses.Count} date-based chunks for parallel extraction between {startDate:yyyy-MM-dd} and {endDate:yyyy-MM-dd}");
    return whereClauses;
}
/// <summary>
/// Extracts an SObject data set, choosing between parallel date-chunked extraction and standard
/// sequential extraction based on a preliminary COUNT() query.
/// </summary>
/// <remarks>
/// When the count exceeds 10,000 the extraction is split into 7-day <c>CreatedDate</c> chunks covering
/// the two years before the current UTC time. If the count query fails or returns no usable
/// <c>totalSize</c>, or the data set is smaller, the standard sequential extraction is used.
/// Errors are logged to the console and reported as an empty list; they are not rethrown.
/// </remarks>
/// <param name="entityName">The name of the SObject to extract</param>
/// <param name="fieldsToSelect">Specific fields to select</param>
/// <param name="baseWhereClause">Base WHERE clause (optional)</param>
/// <param name="maxRecords">Maximum total records to extract (0 = no limit)</param>
/// <param name="cancellationToken">Cancellation token</param>
/// <returns>List of all extracted records; empty when the extraction fails</returns>
public async Task<List<Dictionary<string, object>>> ExtractLargeDatasetAsync(string entityName, List<string>? fieldsToSelect = null, string? baseWhereClause = null, int maxRecords = 0, CancellationToken cancellationToken = default)
{
    try
    {
        Console.WriteLine($"--- Starting Large Dataset Extraction: {entityName} ---");

        // First, try to get a count to determine if we need parallel processing
        var countQuery = $"SELECT COUNT() FROM {entityName}";
        if (!string.IsNullOrWhiteSpace(baseWhereClause))
        {
            countQuery += $" WHERE {baseWhereClause}";
        }
        Console.WriteLine($"Checking dataset size with query: {countQuery}");

        try
        {
            var countResponse = await GetAsync<Dictionary<string, object>>($"{_instanceUrl}/services/data/v60.0/query/?q={Uri.EscapeDataString(countQuery)}", cancellationToken);
            if (countResponse != null
                && countResponse.TryGetValue("totalSize", out var totalSizeValue)
                && int.TryParse(totalSizeValue?.ToString(), out int totalRecords))
            {
                Console.WriteLine($"Dataset contains approximately {totalRecords} records");

                // If dataset is large (>10,000 records), use parallel extraction
                if (totalRecords > 10000)
                {
                    Console.WriteLine("Large dataset detected, using parallel extraction with date-based chunking");

                    // Create date-based chunks for the last 2 years by default.
                    // NOTE(review): records with a CreatedDate older than two years fall outside every
                    // chunk, so this path can return fewer rows than the count reported - confirm intended.
                    var endDate = DateTime.UtcNow;
                    var startDate = endDate.AddYears(-2);
                    var whereClauses = CreateDateBasedWhereClauses(startDate, endDate, "CreatedDate", 7); // 7-day chunks

                    // Add base WHERE clause to each chunk if provided
                    if (!string.IsNullOrWhiteSpace(baseWhereClause))
                    {
                        whereClauses = whereClauses.Select(wc => $"({baseWhereClause}) AND ({wc})").ToList();
                    }

                    // Split the cap across chunks with a ceiling division. Plain integer division
                    // truncates to 0 whenever maxRecords is smaller than the chunk count, and 0 means
                    // "no limit", which would turn a small cap into an unbounded extraction.
                    var perChunkLimit = 0;
                    if (maxRecords > 0)
                    {
                        perChunkLimit = maxRecords / whereClauses.Count
                            + (maxRecords % whereClauses.Count == 0 ? 0 : 1);
                    }

                    var parallelRecords = await ExtractEntitiesParallelAsync(entityName, fieldsToSelect, whereClauses, perChunkLimit, cancellationToken);

                    // Rounding the per-chunk limit up can overshoot the requested total; trim it back.
                    if (maxRecords > 0 && parallelRecords.Count > maxRecords)
                    {
                        parallelRecords = parallelRecords.GetRange(0, maxRecords);
                    }

                    return parallelRecords;
                }
            }
        }
        // Fall back to sequential extraction only for genuine failures. When the caller has cancelled,
        // let the exception reach the outer handler instead of starting another extraction.
        catch (Exception ex) when (!cancellationToken.IsCancellationRequested)
        {
            Console.WriteLine($"Could not determine dataset size, proceeding with standard extraction: {ex.Message}");
        }

        // For smaller datasets or if count failed, use standard extraction
        Console.WriteLine("Using standard sequential extraction");
        return await ExtractAllEntitiesAsync(entityName, fieldsToSelect, baseWhereClause, maxRecords, cancellationToken);
    }
    catch (Exception ex)
    {
        Console.WriteLine($"Error during large dataset extraction: {ex.Message}");
        return new List<Dictionary<string, object>>();
    }
}
/// <summary>
/// Extracts the entities whose <c>LastModifiedDate</c> falls within the last
/// <paramref name="hoursBack"/> hours, measured from the current UTC time.
/// </summary>
/// <remarks>
/// Errors are logged to the console and reported as an empty list; they are not rethrown.
/// </remarks>
/// <param name="entityName">The name of the SObject to extract</param>
/// <param name="fieldsToSelect">Specific fields to select</param>
/// <param name="hoursBack">Number of hours back to look for modifications (default: 24)</param>
/// <param name="cancellationToken">Cancellation token</param>
/// <returns>List of recently modified entities; empty when the extraction fails</returns>
public async Task<List<Dictionary<string, object>>> ExtractRecentlyModifiedAsync(string entityName, List<string>? fieldsToSelect = null, int hoursBack = 24, CancellationToken cancellationToken = default)
{
    try
    {
        var startTime = DateTime.UtcNow.AddHours(-hoursBack);

        // Format with the invariant culture: the ':' specifier and the calendar are culture-sensitive,
        // and a localized value would not be a valid datetime literal in the query.
        var startTimeLiteral = startTime.ToString("yyyy-MM-dd'T'HH:mm:ss'Z'", System.Globalization.CultureInfo.InvariantCulture);
        var whereClause = $"LastModifiedDate >= {startTimeLiteral}";

        Console.WriteLine($"--- Extracting recently modified {entityName} (last {hoursBack} hours) ---");
        Console.WriteLine($"Using WHERE clause: {whereClause}");

        // A limit of 0 is passed so the result is not capped by this method.
        return await ExtractAllEntitiesAsync(entityName, fieldsToSelect, whereClause, 0, cancellationToken);
    }
    catch (Exception ex)
    {
        Console.WriteLine($"Error during recent entities extraction: {ex.Message}");
        return new List<Dictionary<string, object>>();
    }
}
/// <summary>
/// Deletes an entity in Salesforce by its ID.
/// </summary>
@@ -804,6 +1450,7 @@ namespace DataConnection.REST.Implementations
[JsonPropertyName("fields")]
public List<SalesforceField> Fields { get; set; } = new List<SalesforceField>(); } /// <summary>
/// Finds entities by required fields to detect duplicates.
/// Now uses batch operations for improved performance when checking multiple field combinations.
/// </summary>
/// <param name="entityName">The name of the entity to search.</param>
/// <param name="requiredFields">The required fields and their values to search for.</param>
@@ -825,44 +1472,18 @@ namespace DataConnection.REST.Implementations
{
Console.WriteLine("No required fields provided for duplicate search");
return new List<Dictionary<string, object>>();
} // Build WHERE clause with required fields
var whereConditions = new List<string>();
foreach (var field in requiredFields)
{
if (field.Value != null)
{
var value = field.Value.ToString();
// Escape single quotes in string values
if (field.Value is string stringValue)
{
value = stringValue.Replace("'", "\\'");
whereConditions.Add($"{field.Key} = '{value}'");
}
else
{
whereConditions.Add($"{field.Key} = {value}");
}
}
}
if (!whereConditions.Any())
// Use the new batch search functionality for a single key field combination
var keyFieldsList = new List<Dictionary<string, object>> { requiredFields };
var batchResults = await BatchFindEntitiesByKeysAsync(entityName, keyFieldsList, cancellationToken);
// Extract results for the single query (index 0)
if (batchResults.ContainsKey(0))
{
Console.WriteLine("No valid field values provided for duplicate search");
return new List<Dictionary<string, object>>();
}
var whereClause = string.Join(" AND ", whereConditions);
var query = $"SELECT Id, {string.Join(", ", requiredFields.Keys)} FROM {entityName} WHERE {whereClause}";
Console.WriteLine($"Executing duplicate search query: {query}");
var queryEndpoint = $"/services/data/v59.0/query?q={Uri.EscapeDataString(query)}";
var response = await GetAsync<SalesforceQueryResponse>($"{_instanceUrl}{queryEndpoint}", cancellationToken);
if (response?.Records != null)
{
Console.WriteLine($"Found {response.Records.Count} potential duplicates for required fields: {string.Join(", ", requiredFields.Select(kv => $"{kv.Key}={kv.Value}"))}");
return response.Records;
var results = batchResults[0];
Console.WriteLine($"Found {results.Count} potential duplicates for required fields: {string.Join(", ", requiredFields.Select(kv => $"{kv.Key}={kv.Value}"))}");
return results;
}
Console.WriteLine("No duplicates found");
@@ -900,6 +1521,26 @@ namespace DataConnection.REST.Implementations
{
[JsonPropertyName("records")]
public List<Dictionary<string, object>> Records { get; set; } = new List<Dictionary<string, object>>();
[JsonPropertyName("totalSize")]
public int TotalSize { get; set; }
[JsonPropertyName("done")]
public bool Done { get; set; }
[JsonPropertyName("nextRecordsUrl")]
public string? NextRecordsUrl { get; set; }
}
/// <summary>
/// Holds the outcome of one query that was executed as part of a batch.
/// </summary>
public class BatchQueryResult
{
    /// <summary>Index identifying the query within its batch.</summary>
    public int QueryIndex { get; set; }

    /// <summary>Text of the query; defaults to an empty string.</summary>
    public string Query { get; set; } = "";

    /// <summary>Success flag for the query; defaults to <c>false</c>.</summary>
    public bool Success { get; set; }

    /// <summary>Error text associated with the query; defaults to an empty string.</summary>
    public string ErrorMessage { get; set; } = "";

    /// <summary>Records returned by the query; defaults to an empty list, never <c>null</c> by default.</summary>
    public List<Dictionary<string, object>> Records { get; set; } = new();

    /// <summary>Total size reported for the query result.</summary>
    public int TotalSize { get; set; }

    /// <summary>URL of the next page of records, or <c>null</c> when none is set.</summary>
    public string? NextRecordsUrl { get; set; }
}
private class SalesforceCompositeRequest