diff --git a/GeoPulse Pipeline/KafkaConsumer.cs b/GeoPulse Pipeline/KafkaConsumer.cs new file mode 100644 index 0000000..49bcc4d --- /dev/null +++ b/GeoPulse Pipeline/KafkaConsumer.cs @@ -0,0 +1,108 @@ +using System.Text.Json; +using Confluent.Kafka; +using Npgsql; + +namespace GeoPulse_Pipeline; + +public class KafkaConsumer : BackgroundService +{ + private readonly ILogger _logger; + private readonly string _connectionString; + private readonly string _kafkaHost; + + private const int BatchSize = 1000; + private readonly TimeSpan MaxWaitTime = TimeSpan.FromSeconds(2); + + public KafkaConsumer(ILogger logger, IConfiguration configuration) + { + _logger = logger; + _connectionString = configuration.GetConnectionString("DefaultConnection") + ?? throw new ArgumentNullException("資料庫連線字串未設定"); + _kafkaHost = configuration["KafkaHost"]; + } + + protected override async Task ExecuteAsync(CancellationToken stoppingToken) + { + await Task.Yield(); + + var consumerConfig = new ConsumerConfig + { + BootstrapServers = _kafkaHost, + GroupId = "telemetry-db-writer-group", + AutoOffsetReset = AutoOffsetReset.Earliest, + EnableAutoCommit = false + }; + + using var consumer = new ConsumerBuilder(consumerConfig).Build(); + consumer.Subscribe("telemetry-events"); + + var buffer = new List>(); + var lastBatchTime = DateTime.UtcNow; + + try + { + while (!stoppingToken.IsCancellationRequested) + { + var consumeResult = consumer.Consume(TimeSpan.FromMilliseconds(500)); + + if (consumeResult != null) + { + buffer.Add(consumeResult); + } + + bool isBatchFull = buffer.Count >= BatchSize; + bool isTimeUp = buffer.Count > 0 && (DateTime.UtcNow - lastBatchTime) >= MaxWaitTime; + + if (isBatchFull || isTimeUp) + { + await BulkInsertToDatabaseAsync(buffer, stoppingToken); + + var latestOffset = buffer.Last(); + consumer.Commit(latestOffset); + + _logger.LogInformation($"成功批次寫入 {buffer.Count} 筆資料至 PostgreSQL。"); + + buffer.Clear(); + lastBatchTime = DateTime.UtcNow; + } + } + } + catch (Exception ex) + { + _logger.LogError($"發生錯誤: {ex.Message} \n {ex.StackTrace}"); + + buffer.Clear(); + } + finally + { + consumer.Close(); + } + } + + private async Task BulkInsertToDatabaseAsync(List> buffer, + CancellationToken stoppingToken) + { + var jsonOptions = new JsonSerializerOptions { PropertyNameCaseInsensitive = true }; + await using var dataSource = NpgsqlDataSource.Create(_connectionString); + await using var batch = dataSource.CreateBatch(); + + foreach (var msg in buffer) + { + var data = JsonSerializer.Deserialize(msg.Message.Value, jsonOptions); + if (data == null) continue; + + var cmd = batch.CreateBatchCommand(); + cmd.CommandText = + @"INSERT INTO telemetry_history (id, device_id, geom, timestamp) VALUES (gen_random_uuid(), $1, ST_SetSRID(ST_MakePoint($2, $3), 4326), $4)"; + + cmd.Parameters.AddWithValue(data.DeviceId ?? string.Empty); + cmd.Parameters.AddWithValue(data.Lng); + cmd.Parameters.AddWithValue(data.Lat); + cmd.Parameters.AddWithValue(data.Timestamp); + + batch.BatchCommands.Add(cmd); + } + + await batch.ExecuteNonQueryAsync(stoppingToken); + } +} \ No newline at end of file