using System.Text.Json; using Confluent.Kafka; using Npgsql; namespace GeoPulse_Pipeline; public class KafkaConsumer : BackgroundService { private readonly ILogger _logger; private readonly string _connectionString; private readonly string _kafkaHost; private const int BatchSize = 1000; private readonly TimeSpan MaxWaitTime = TimeSpan.FromSeconds(2); public KafkaConsumer(ILogger logger, IConfiguration configuration) { _logger = logger; _connectionString = configuration.GetConnectionString("DefaultConnection") ?? throw new ArgumentNullException("資料庫連線字串未設定"); _kafkaHost = configuration["KafkaHost"]; } protected override async Task ExecuteAsync(CancellationToken stoppingToken) { await Task.Yield(); var consumerConfig = new ConsumerConfig { BootstrapServers = _kafkaHost, GroupId = "telemetry-db-writer-group", AutoOffsetReset = AutoOffsetReset.Earliest, EnableAutoCommit = false }; using var consumer = new ConsumerBuilder(consumerConfig).Build(); consumer.Subscribe("telemetry-events"); var buffer = new List>(); var lastBatchTime = DateTime.UtcNow; try { while (!stoppingToken.IsCancellationRequested) { var consumeResult = consumer.Consume(TimeSpan.FromMilliseconds(500)); if (consumeResult != null) { buffer.Add(consumeResult); } bool isBatchFull = buffer.Count >= BatchSize; bool isTimeUp = buffer.Count > 0 && (DateTime.UtcNow - lastBatchTime) >= MaxWaitTime; if (isBatchFull || isTimeUp) { await BulkInsertToDatabaseAsync(buffer, stoppingToken); var latestOffset = buffer.Last(); consumer.Commit(latestOffset); _logger.LogInformation($"成功批次寫入 {buffer.Count} 筆資料至 PostgreSQL。"); buffer.Clear(); lastBatchTime = DateTime.UtcNow; } } } catch (Exception ex) { _logger.LogError($"發生錯誤: {ex.Message} \n {ex.StackTrace}"); buffer.Clear(); } finally { consumer.Close(); } } private async Task BulkInsertToDatabaseAsync(List> buffer, CancellationToken stoppingToken) { var jsonOptions = new JsonSerializerOptions { PropertyNameCaseInsensitive = true }; await using var dataSource = NpgsqlDataSource.Create(_connectionString); await using var batch = dataSource.CreateBatch(); foreach (var msg in buffer) { var data = JsonSerializer.Deserialize(msg.Message.Value, jsonOptions); if (data == null) continue; var cmd = batch.CreateBatchCommand(); cmd.CommandText = @"INSERT INTO telemetry_history (id, device_id, geom, timestamp) VALUES (gen_random_uuid(), $1, ST_SetSRID(ST_MakePoint($2, $3), 4326), $4)"; cmd.Parameters.AddWithValue(data.DeviceId ?? string.Empty); cmd.Parameters.AddWithValue(data.Lng); cmd.Parameters.AddWithValue(data.Lat); cmd.Parameters.AddWithValue(data.Timestamp); batch.BatchCommands.Add(cmd); } await batch.ExecuteNonQueryAsync(stoppingToken); } }