6452d45b77
Implementata la possibilita' di usare Salesforce come fonte dati e un database relazionale come destinazione nel flusso di trasferimento dati. ## Modifiche principali ### Nuovi file partial class - Data_Coupler/Extensions/DataCoupler/SalesforceSourceMethod.cs - Stato e metodi per Salesforce come fonte (credenziali, connessione, discovery SObject) - ConnectToSalesforceSource(): autenticazione parallela + discovery summaries/details - SelectSalesforceSourceEntity(): selezione SObject e caricamento campi - GetAllRecordsFromSalesforceSource(): estrazione via ExtractAllEntitiesAsync con solo i campi mappati - Filtro/ricerca SObject in tempo reale - Data_Coupler/Extensions/DataCoupler/DatabaseDestinationMethod.cs - Stato e metodi per database come destinazione - ConnectToDestinationDatabase(): connessione e discovery tabelle - SelectDestinationTable(): caricamento schema tabella on-demand - IsDestinationDatabaseReady: proprieta' calcolata per validazione - Toggle UI tra destinazione REST e Database ### IDatabaseManager interface - Aggiunto UpsertRecordAsync(tableName, keyField, keyValue, record): - Esegue SELECT COUNT(*) per verificare esistenza record - UPDATE se esiste, INSERT se non esiste - Implementato in EFCoreDatabaseManager (parametri named @p0..@pN) - Implementato in OdbcDatabaseManager (parametri posizionali ?) ### DataCoupler.razor (UI) - Aggiunto 'Salesforce (REST API)' nel dropdown tipo fonte - Sezione UI Salesforce fonte: selettore credenziali, bottone connessione, lista SObject con ricerca - Toggle destinazione REST/Database nella card destra - Sezione UI Database destinazione: selettore credenziali, bottone connessione, lista tabelle con ricerca - Colonna destra mapping aggiornata: mostra colonne DB se destinazione e' database, proprieta' REST altrimenti - Colonna sinistra mapping: aggiunta sezione campi SObject Salesforce - isSourceReady aggiornato per includere fonte Salesforce - isDestinationReady aggiornato per includere destinazione database - Etichette mapping dinamiche in base ai tipi selezionati ### DataCoupler.razor.cs (logica) - LoadCredentials(): aggiunto caricamento credenziali Salesforce fonte - ResetSourceState(): aggiunto reset stato Salesforce fonte - ResetDestinationState(): aggiunto reset stato database destinazione - GetAllRecordsFromSource(): aggiunto branch Salesforce - StartDataTransfer(): routing verso StartDataTransferToDatabase() se dest=database - Aggiunto StartDataTransferToDatabase(): estrae record fonte, applica mapping e default values, chiama UpsertRecordAsync per ogni record - Rimosso codice duplicato in StartDataTransfer()
411 lines
14 KiB
C#
411 lines
14 KiB
C#
using System;
|
|
using System.Collections.Generic;
|
|
using System.Data;
|
|
using System.Data.Odbc;
|
|
using System.Linq;
|
|
using System.Linq.Expressions;
|
|
using System.Threading.Tasks;
|
|
using DataConnection.EF.SchemaProviders;
|
|
using DataConnection.Interfaces;
|
|
|
|
namespace DataConnection.DB;
|
|
|
|
/// <summary>
|
|
/// Database manager per connessioni ODBC dirette (senza Entity Framework)
|
|
/// </summary>
|
|
public class OdbcDatabaseManager : IDatabaseManager
|
|
{
|
|
private readonly string _connectionString;
|
|
private readonly OdbcSchemaProvider _schemaProvider;
|
|
private string _currentDatabase = string.Empty;
|
|
|
|
public OdbcDatabaseManager(string connectionString)
|
|
{
|
|
_connectionString = connectionString ?? throw new ArgumentNullException(nameof(connectionString));
|
|
_schemaProvider = new OdbcSchemaProvider();
|
|
}
|
|
|
|
public async Task<bool> TestConnectionAsync()
|
|
{
|
|
try
|
|
{
|
|
using var connection = new OdbcConnection(_connectionString);
|
|
await connection.OpenAsync();
|
|
return true;
|
|
}
|
|
catch
|
|
{
|
|
return false;
|
|
}
|
|
}
|
|
|
|
public Task<IEnumerable<T>> GetAsync<T>(
|
|
Expression<Func<T, bool>>? filter = null,
|
|
Func<IQueryable<T>, IOrderedQueryable<T>>? orderBy = null,
|
|
string includeProperties = "",
|
|
int? skip = null,
|
|
int? take = null) where T : class
|
|
{
|
|
throw new NotSupportedException("GetAsync<T> with LINQ expressions is not supported for ODBC. Use ExecuteQueryAsync instead.");
|
|
}
|
|
|
|
public Task<T?> GetByIdAsync<T>(object id) where T : class
|
|
{
|
|
throw new NotSupportedException("GetByIdAsync<T> is not supported for ODBC. Use ExecuteQueryAsync with WHERE clause instead.");
|
|
}
|
|
|
|
public Task<IEnumerable<T>> ExecuteQueryAsync<T>(string sql, params object[] parameters) where T : class
|
|
{
|
|
throw new NotSupportedException("ExecuteQueryAsync<T> with entity type is not supported for ODBC. Use ExecuteRawQueryAsync instead.");
|
|
}
|
|
|
|
public async Task<List<Dictionary<string, object>>> ExecuteRawQueryAsync(string sql, string databaseName = "", params object[] parameters)
|
|
{
|
|
var results = new List<Dictionary<string, object>>();
|
|
|
|
using var connection = new OdbcConnection(_connectionString);
|
|
await connection.OpenAsync();
|
|
|
|
// Cambia database se specificato
|
|
if (!string.IsNullOrEmpty(databaseName) && databaseName != _currentDatabase)
|
|
{
|
|
await connection.ChangeDatabaseAsync(databaseName);
|
|
_currentDatabase = databaseName;
|
|
}
|
|
|
|
using var command = new OdbcCommand(sql, connection);
|
|
|
|
// Aggiungi parametri
|
|
if (parameters != null && parameters.Length > 0)
|
|
{
|
|
for (int i = 0; i < parameters.Length; i++)
|
|
{
|
|
command.Parameters.Add(new OdbcParameter($"@p{i}", parameters[i] ?? DBNull.Value));
|
|
}
|
|
}
|
|
|
|
using var reader = await command.ExecuteReaderAsync();
|
|
|
|
while (await reader.ReadAsync())
|
|
{
|
|
var row = new Dictionary<string, object>();
|
|
for (int i = 0; i < reader.FieldCount; i++)
|
|
{
|
|
var fieldName = reader.GetName(i);
|
|
var value = reader.IsDBNull(i) ? DBNull.Value : reader.GetValue(i);
|
|
row[fieldName] = value;
|
|
}
|
|
results.Add(row);
|
|
}
|
|
|
|
return results;
|
|
}
|
|
|
|
public async Task<int> ExecuteCommandAsync(string sql, params object[] parameters)
|
|
{
|
|
using var connection = new OdbcConnection(_connectionString);
|
|
await connection.OpenAsync();
|
|
|
|
using var command = new OdbcCommand(sql, connection);
|
|
|
|
if (parameters != null && parameters.Length > 0)
|
|
{
|
|
for (int i = 0; i < parameters.Length; i++)
|
|
{
|
|
command.Parameters.Add(new OdbcParameter($"@p{i}", parameters[i] ?? DBNull.Value));
|
|
}
|
|
}
|
|
|
|
return await command.ExecuteNonQueryAsync();
|
|
}
|
|
|
|
public async Task<List<string>> GetAvailableDatabasesAsync()
|
|
{
|
|
var databases = await _schemaProvider.GetAvailableDatabasesAsync(_connectionString);
|
|
return databases.ToList();
|
|
}
|
|
|
|
public async Task ChangeDatabaseAsync(string databaseName)
|
|
{
|
|
using var connection = new OdbcConnection(_connectionString);
|
|
await connection.OpenAsync();
|
|
await connection.ChangeDatabaseAsync(databaseName);
|
|
_currentDatabase = databaseName;
|
|
}
|
|
|
|
public async Task<IDictionary<string, IEnumerable<DbColumnInfo>>> GetDatabaseSchemaAsync()
|
|
{
|
|
return await _schemaProvider.GetDatabaseSchemaAsync(_connectionString);
|
|
}
|
|
|
|
public async Task<IEnumerable<string>> GetTableNamesAsync()
|
|
{
|
|
return await _schemaProvider.GetTableNamesAsync(_connectionString);
|
|
}
|
|
|
|
public async Task<IEnumerable<DbColumnInfo>> GetTableSchemaAsync(string tableName)
|
|
{
|
|
return await _schemaProvider.GetTableSchemaAsync(_connectionString, tableName);
|
|
}
|
|
|
|
public async Task<IEnumerable<Dictionary<string, object>>> GetAllRecordsAsync(string tableName)
|
|
{
|
|
var query = $"SELECT * FROM {tableName}";
|
|
var results = await ExecuteRawQueryAsync(query);
|
|
return results;
|
|
}
|
|
|
|
public async Task<string?> GetPrimaryKeyFieldAsync(string tableName)
|
|
{
|
|
try
|
|
{
|
|
var schema = await GetTableSchemaAsync(tableName);
|
|
var pkColumn = schema.FirstOrDefault(c => c.IsPrimaryKey);
|
|
return pkColumn?.Name;
|
|
}
|
|
catch
|
|
{
|
|
return null;
|
|
}
|
|
}
|
|
|
|
public async Task<IEnumerable<IDictionary<string, object?>>> ExecuteQueryAsync(string query, int? maxRows = null)
|
|
{
|
|
var results = new List<IDictionary<string, object?>>();
|
|
|
|
using var connection = new OdbcConnection(_connectionString);
|
|
await connection.OpenAsync();
|
|
|
|
using var command = new OdbcCommand(query, connection);
|
|
if (maxRows.HasValue)
|
|
{
|
|
command.CommandText = WrapQueryWithLimit(query, maxRows.Value);
|
|
}
|
|
|
|
using var reader = await command.ExecuteReaderAsync();
|
|
|
|
while (await reader.ReadAsync())
|
|
{
|
|
var row = new Dictionary<string, object?>();
|
|
for (int i = 0; i < reader.FieldCount; i++)
|
|
{
|
|
var fieldName = reader.GetName(i);
|
|
var value = reader.IsDBNull(i) ? null : reader.GetValue(i);
|
|
row[fieldName] = value;
|
|
}
|
|
results.Add(row);
|
|
}
|
|
|
|
return results;
|
|
}
|
|
|
|
public async Task<int> ExecuteNonQueryAsync(string query)
|
|
{
|
|
using var connection = new OdbcConnection(_connectionString);
|
|
await connection.OpenAsync();
|
|
|
|
using var command = new OdbcCommand(query, connection);
|
|
return await command.ExecuteNonQueryAsync();
|
|
}
|
|
|
|
public async Task<object?> ExecuteScalarAsync(string query)
|
|
{
|
|
using var connection = new OdbcConnection(_connectionString);
|
|
await connection.OpenAsync();
|
|
|
|
using var command = new OdbcCommand(query, connection);
|
|
return await command.ExecuteScalarAsync();
|
|
}
|
|
|
|
public async Task<int> InsertAsync(string tableName, IDictionary<string, object?> data)
|
|
{
|
|
var columns = string.Join(", ", data.Keys.Select(k => $"[{k}]"));
|
|
var parameters = string.Join(", ", data.Keys.Select((_, i) => $"?"));
|
|
|
|
var query = $"INSERT INTO {tableName} ({columns}) VALUES ({parameters})";
|
|
|
|
using var connection = new OdbcConnection(_connectionString);
|
|
await connection.OpenAsync();
|
|
|
|
using var command = new OdbcCommand(query, connection);
|
|
|
|
foreach (var value in data.Values)
|
|
{
|
|
command.Parameters.Add(new OdbcParameter { Value = value ?? DBNull.Value });
|
|
}
|
|
|
|
return await command.ExecuteNonQueryAsync();
|
|
}
|
|
|
|
public async Task<int> UpdateAsync(string tableName, IDictionary<string, object?> data, IDictionary<string, object?> whereClause)
|
|
{
|
|
var setClause = string.Join(", ", data.Keys.Select(k => $"[{k}] = ?"));
|
|
var whereConditions = string.Join(" AND ", whereClause.Keys.Select(k => $"[{k}] = ?"));
|
|
|
|
var query = $"UPDATE {tableName} SET {setClause} WHERE {whereConditions}";
|
|
|
|
using var connection = new OdbcConnection(_connectionString);
|
|
await connection.OpenAsync();
|
|
|
|
using var command = new OdbcCommand(query, connection);
|
|
|
|
// Aggiungi parametri SET
|
|
foreach (var value in data.Values)
|
|
{
|
|
command.Parameters.Add(new OdbcParameter { Value = value ?? DBNull.Value });
|
|
}
|
|
|
|
// Aggiungi parametri WHERE
|
|
foreach (var value in whereClause.Values)
|
|
{
|
|
command.Parameters.Add(new OdbcParameter { Value = value ?? DBNull.Value });
|
|
}
|
|
|
|
return await command.ExecuteNonQueryAsync();
|
|
}
|
|
|
|
public async Task<int> DeleteAsync(string tableName, IDictionary<string, object?> whereClause)
|
|
{
|
|
var whereConditions = string.Join(" AND ", whereClause.Keys.Select(k => $"[{k}] = ?"));
|
|
var query = $"DELETE FROM {tableName} WHERE {whereConditions}";
|
|
|
|
using var connection = new OdbcConnection(_connectionString);
|
|
await connection.OpenAsync();
|
|
|
|
using var command = new OdbcCommand(query, connection);
|
|
|
|
foreach (var value in whereClause.Values)
|
|
{
|
|
command.Parameters.Add(new OdbcParameter { Value = value ?? DBNull.Value });
|
|
}
|
|
|
|
return await command.ExecuteNonQueryAsync();
|
|
}
|
|
|
|
public async Task<int> BulkInsertAsync(string tableName, IEnumerable<IDictionary<string, object?>> dataList)
|
|
{
|
|
int totalInserted = 0;
|
|
|
|
using var connection = new OdbcConnection(_connectionString);
|
|
await connection.OpenAsync();
|
|
|
|
using var transaction = connection.BeginTransaction();
|
|
|
|
try
|
|
{
|
|
foreach (var data in dataList)
|
|
{
|
|
var columns = string.Join(", ", data.Keys.Select(k => $"[{k}]"));
|
|
var parameters = string.Join(", ", data.Keys.Select((_, i) => $"?"));
|
|
|
|
var query = $"INSERT INTO {tableName} ({columns}) VALUES ({parameters})";
|
|
|
|
using var command = new OdbcCommand(query, connection, transaction);
|
|
|
|
foreach (var value in data.Values)
|
|
{
|
|
command.Parameters.Add(new OdbcParameter { Value = value ?? DBNull.Value });
|
|
}
|
|
|
|
totalInserted += await command.ExecuteNonQueryAsync();
|
|
}
|
|
|
|
transaction.Commit();
|
|
}
|
|
catch
|
|
{
|
|
transaction.Rollback();
|
|
throw;
|
|
}
|
|
|
|
return totalInserted;
|
|
}
|
|
|
|
/// <summary>
|
|
/// Wrappa la query con LIMIT/TOP a seconda del dialetto SQL
|
|
/// Nota: ODBC non ha una sintassi standard, quindi usiamo TOP (SQL Server style)
|
|
/// che è supportato dalla maggior parte dei driver
|
|
/// </summary>
|
|
private string WrapQueryWithLimit(string query, int maxRows)
|
|
{
|
|
// Verifica se la query ha già un LIMIT o TOP
|
|
var upperQuery = query.Trim().ToUpperInvariant();
|
|
|
|
if (upperQuery.Contains("LIMIT ") || upperQuery.Contains("TOP "))
|
|
{
|
|
return query; // Query già limitata
|
|
}
|
|
|
|
// Prova con SELECT TOP (SQL Server, SAP HANA)
|
|
if (upperQuery.StartsWith("SELECT "))
|
|
{
|
|
return query.Insert(7, $"TOP {maxRows} ");
|
|
}
|
|
|
|
// Fallback: aggiungi LIMIT alla fine (MySQL, PostgreSQL style)
|
|
return $"{query} LIMIT {maxRows}";
|
|
}
|
|
|
|
public void Dispose()
|
|
{
|
|
// Nessuna risorsa da rilasciare per ODBC diretto
|
|
}
|
|
|
|
/// <inheritdoc />
|
|
public async Task<bool> UpsertRecordAsync(string tableName, string keyField, object? keyValue, Dictionary<string, object?> record)
|
|
{
|
|
try
|
|
{
|
|
using var connection = new OdbcConnection(_connectionString);
|
|
await connection.OpenAsync();
|
|
|
|
// Controlla se il record esiste già (ODBC usa ? come placeholder)
|
|
using var checkCmd = new OdbcCommand($"SELECT COUNT(*) FROM {tableName} WHERE [{keyField}] = ?", connection);
|
|
checkCmd.Parameters.Add(new OdbcParameter { Value = keyValue ?? DBNull.Value });
|
|
|
|
var countResult = await checkCmd.ExecuteScalarAsync();
|
|
bool exists = Convert.ToInt64(countResult ?? 0L) > 0;
|
|
|
|
if (exists)
|
|
{
|
|
// UPDATE
|
|
var fields = record.Keys.ToList();
|
|
var setClauses = fields.Select(f => $"[{f}] = ?").ToList();
|
|
var updateSql = $"UPDATE {tableName} SET {string.Join(", ", setClauses)} WHERE [{keyField}] = ?";
|
|
|
|
using var updateCmd = new OdbcCommand(updateSql, connection);
|
|
|
|
foreach (var f in fields)
|
|
updateCmd.Parameters.Add(new OdbcParameter { Value = record[f] ?? DBNull.Value });
|
|
|
|
// Parametro per la WHERE
|
|
updateCmd.Parameters.Add(new OdbcParameter { Value = keyValue ?? DBNull.Value });
|
|
|
|
await updateCmd.ExecuteNonQueryAsync();
|
|
}
|
|
else
|
|
{
|
|
// INSERT
|
|
var fields = record.Keys.ToList();
|
|
var fieldNames = string.Join(", ", fields.Select(f => $"[{f}]"));
|
|
var paramPlaceholders = string.Join(", ", fields.Select(_ => "?"));
|
|
var insertSql = $"INSERT INTO {tableName} ({fieldNames}) VALUES ({paramPlaceholders})";
|
|
|
|
using var insertCmd = new OdbcCommand(insertSql, connection);
|
|
|
|
foreach (var f in fields)
|
|
insertCmd.Parameters.Add(new OdbcParameter { Value = record[f] ?? DBNull.Value });
|
|
|
|
await insertCmd.ExecuteNonQueryAsync();
|
|
}
|
|
|
|
return true;
|
|
}
|
|
catch (Exception ex)
|
|
{
|
|
Console.WriteLine($"Errore nell'upsert ODBC in {tableName}: {ex.Message}");
|
|
return false;
|
|
}
|
|
}
|
|
}
|