using System.Data;
using System.Text.Json;
using Confluent.Kafka;
using Elastic.Clients.Elasticsearch;
using Npgsql;

namespace GeoPulse_Pipeline;

/// <summary>
/// Background service that reads telemetry events from the <c>telemetry-events</c> Kafka topic,
/// accumulates them into batches, writes each batch to PostgreSQL and Elasticsearch, and only
/// then commits the Kafka offsets (auto-commit is disabled).
/// </summary>
/// <remarks>
/// A batch is flushed when it reaches <see cref="BatchSize"/> messages, or when it is non-empty
/// and <see cref="MaxWaitTime"/> has elapsed since the previous flush. If a flush fails, the
/// consumer is rewound to the start of the batch so the messages are read again; the same
/// message can therefore be written more than once.
/// </remarks>
public class KafkaConsumer : BackgroundService
{
    private const int BatchSize = 1000;
    private const string TopicName = "telemetry-events";

    private const string InsertTelemetrySql =
        @"INSERT INTO telemetry_history (id, device_id, geom, timestamp) VALUES (gen_random_uuid(), $1, ST_SetSRID(ST_MakePoint($2, $3), 4326), $4)";

    private static readonly TimeSpan MaxWaitTime = TimeSpan.FromSeconds(2);
    private static readonly TimeSpan PollTimeout = TimeSpan.FromMilliseconds(500);
    private static readonly TimeSpan RetryDelay = TimeSpan.FromMilliseconds(2000);

    // Shared instance: System.Text.Json caches type metadata per options object, so creating a
    // new one for every batch throws that cache away.
    private static readonly JsonSerializerOptions JsonOptions = new() { PropertyNameCaseInsensitive = true };

    private readonly ILogger<KafkaConsumer> _logger;
    private readonly NpgsqlDataSource _dataSource;
    private readonly PartitionManager _partitionManager;
    private readonly string _kafkaHost;
    private readonly ElasticsearchClient _esClient;

    /// <summary>Creates the consumer service.</summary>
    /// <param name="logger">Logger used for progress and error reporting.</param>
    /// <param name="configuration">Application configuration; the <c>KafkaHost</c> key supplies the bootstrap servers.</param>
    /// <param name="dataSource">Npgsql data source used to create insert batches.</param>
    /// <param name="partitionManager">Helper invoked for each distinct event timestamp before inserting.</param>
    /// <param name="esClient">Elasticsearch client used for bulk indexing.</param>
    /// <exception cref="InvalidOperationException">The <c>KafkaHost</c> configuration value is missing.</exception>
    public KafkaConsumer(
        ILogger<KafkaConsumer> logger,
        IConfiguration configuration,
        NpgsqlDataSource dataSource,
        PartitionManager partitionManager,
        ElasticsearchClient esClient)
    {
        _logger = logger;
        // Fail fast: without bootstrap servers the consumer would be built but never receive anything.
        _kafkaHost = configuration["KafkaHost"]
            ?? throw new InvalidOperationException("Missing required configuration value 'KafkaHost'.");
        _dataSource = dataSource;
        _partitionManager = partitionManager;
        _esClient = esClient;
    }

    /// <summary>
    /// Runs the consume / batch / write / commit loop until <paramref name="stoppingToken"/> is cancelled.
    /// </summary>
    /// <param name="stoppingToken">Signalled by the host when the service should stop.</param>
    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        // Consume() below is a blocking call; yield first so the caller is not held up by it.
        await Task.Yield();

        var consumerConfig = new ConsumerConfig
        {
            BootstrapServers = _kafkaHost,
            GroupId = "telemetry-db-writer-group",
            AutoOffsetReset = AutoOffsetReset.Earliest,
            EnableAutoCommit = false
        };

        // NOTE(review): the key type argument was not recoverable from the reviewed text; Ignore is
        // assumed because the message key is never read. Confirm against the producer.
        using var consumer = new ConsumerBuilder<Ignore, string>(consumerConfig)
            .SetErrorHandler((_, e) => _logger.LogError("Kafka 底層錯誤: {Reason}", e.Reason))
            .Build();

        consumer.Subscribe(TopicName);

        var buffer = new List<ConsumeResult<Ignore, string>>(BatchSize);
        var lastBatchTime = DateTime.UtcNow;

        _logger.LogInformation("Consumer 已啟動,開始監控 Kafka 訊息...");

        try
        {
            while (!stoppingToken.IsCancellationRequested)
            {
                try
                {
                    var consumeResult = consumer.Consume(PollTimeout);
                    if (consumeResult is not null)
                    {
                        buffer.Add(consumeResult);
                    }

                    bool isBatchFull = buffer.Count >= BatchSize;
                    bool isTimeUp = buffer.Count > 0 && (DateTime.UtcNow - lastBatchTime) >= MaxWaitTime;
                    if (!isBatchFull && !isTimeUp)
                    {
                        continue;
                    }

                    _logger.LogInformation("[處理中] 準備批次寫入 {Count} 筆資料...", buffer.Count);

                    // Deserialize once and hand the same records to both sinks.
                    var records = DeserializeBatch(buffer);
                    await BulkInsertToDatabaseAsync(records, stoppingToken);
                    await BulkInsertToElasticsearchAsync(records, stoppingToken);

                    // Commit "last processed offset + 1" per partition, i.e. the next offset to read.
                    var offsets = buffer
                        .GroupBy(msg => msg.TopicPartition)
                        .Select(g => new TopicPartitionOffset(g.Key, g.Last().Offset + 1))
                        .ToList();
                    consumer.Commit(offsets);

                    _logger.LogInformation("Kafka已寫入DB {Count} 筆資料至 PostgreSQL。", buffer.Count);

                    buffer.Clear();
                    lastBatchTime = DateTime.UtcNow;
                }
                catch (ConsumeException ex)
                {
                    // A failed poll says nothing about the messages already buffered, so they are
                    // kept and flushed on a later iteration instead of being discarded.
                    _logger.LogError(ex, "Kafka consume failed: {Reason}", ex.Error.Reason);
                    await Task.Delay(RetryDelay, stoppingToken);
                }
                catch (Exception ex) when (ex is not OperationCanceledException || !stoppingToken.IsCancellationRequested)
                {
                    _logger.LogError(ex, "Kafka寫入DB批次處理發生錯誤!原因: {Message}", ex.Message);

                    // The offsets of this batch were not committed, but the consumer's in-memory
                    // position has already moved past it. Rewind so the batch is read again rather
                    // than being skipped by the next successful commit.
                    RewindToBatchStart(consumer, buffer);
                    buffer.Clear();
                    lastBatchTime = DateTime.UtcNow;

                    await Task.Delay(RetryDelay, stoppingToken);
                }
            }
        }
        catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested)
        {
            // Host shutdown: uncommitted messages are picked up again after the next start.
        }
        finally
        {
            try
            {
                // Close() notifies the group coordinator that this member is leaving; Dispose()
                // alone does not, and the group would only rebalance after the session timeout.
                consumer.Close();
            }
            catch (KafkaException ex)
            {
                _logger.LogWarning(ex, "Kafka consumer did not close cleanly: {Reason}", ex.Error.Reason);
            }
        }
    }

    /// <summary>
    /// Seeks every partition represented in <paramref name="buffer"/> back to the first buffered
    /// offset so that a failed batch is consumed again.
    /// </summary>
    /// <param name="consumer">The consumer that produced the buffered results.</param>
    /// <param name="buffer">Messages of the failed batch, in consumption order.</param>
    private void RewindToBatchStart(
        IConsumer<Ignore, string> consumer,
        List<ConsumeResult<Ignore, string>> buffer)
    {
        foreach (var partitionGroup in buffer.GroupBy(msg => msg.TopicPartition))
        {
            var firstOffset = partitionGroup.First().Offset;
            try
            {
                consumer.Seek(new TopicPartitionOffset(partitionGroup.Key, firstOffset));
            }
            catch (KafkaException ex)
            {
                // Seek only works on partitions currently assigned to this consumer. If the
                // partition has moved elsewhere there is nothing to rewind here.
                _logger.LogWarning(
                    ex,
                    "Could not rewind {TopicPartition} to offset {Offset}: {Reason}",
                    partitionGroup.Key,
                    firstOffset,
                    ex.Error.Reason);
            }
        }
    }

    /// <summary>
    /// Deserializes the JSON payload of every buffered message into a <c>TelemetryRequest</c>.
    /// Messages with a null payload, a payload that deserializes to null, or malformed JSON are
    /// skipped (malformed ones are logged) so that one bad message cannot fail the whole batch.
    /// </summary>
    /// <param name="buffer">Raw Kafka messages of the current batch.</param>
    /// <returns>The successfully deserialized records, in message order.</returns>
    private List<TelemetryRequest> DeserializeBatch(List<ConsumeResult<Ignore, string>> buffer)
    {
        var records = new List<TelemetryRequest>(buffer.Count);

        foreach (var msg in buffer)
        {
            var payload = msg.Message.Value;
            if (payload is null)
            {
                continue;
            }

            try
            {
                var data = JsonSerializer.Deserialize<TelemetryRequest>(payload, JsonOptions);
                if (data is not null)
                {
                    records.Add(data);
                }
            }
            catch (JsonException ex)
            {
                _logger.LogWarning(
                    ex,
                    "Skipping malformed telemetry message at {TopicPartitionOffset}",
                    msg.TopicPartitionOffset);
            }
        }

        return records;
    }

    /// <summary>
    /// Inserts all records into <c>telemetry_history</c> using a single Npgsql batch.
    /// </summary>
    /// <param name="records">Deserialized telemetry records; nothing is executed when empty.</param>
    /// <param name="stoppingToken">Cancels the batch execution.</param>
    private async Task BulkInsertToDatabaseAsync(
        IReadOnlyList<TelemetryRequest> records,
        CancellationToken stoppingToken)
    {
        if (records.Count == 0)
        {
            return;
        }

        // First collect the distinct timestamps and make sure their partition tables exist.
        // NOTE(review): this de-duplicates by full timestamp value, not by calendar date, so it
        // may call EnsurePartitionExistsAsync once per record. Collapsing to the partition
        // granularity would be cheaper, but that granularity is not visible here - confirm.
        foreach (var timestamp in records.Select(r => r.Timestamp).Distinct())
        {
            await _partitionManager.EnsurePartitionExistsAsync(timestamp);
        }

        await using var batch = _dataSource.CreateBatch();

        foreach (var data in records)
        {
            var cmd = batch.CreateBatchCommand();
            cmd.CommandText = InsertTelemetrySql;
            // Positional parameters $1..$4: device id, longitude, latitude, timestamp.
            cmd.Parameters.AddWithValue(data.DeviceId ?? string.Empty);
            cmd.Parameters.AddWithValue(data.Lng);
            cmd.Parameters.AddWithValue(data.Lat);
            cmd.Parameters.AddWithValue(data.Timestamp);
            batch.BatchCommands.Add(cmd);
        }

        await batch.ExecuteNonQueryAsync(stoppingToken);
    }

    /// <summary>
    /// Bulk-indexes all records into the Elasticsearch index named after the current UTC day
    /// (<c>geopulse-telemetry-yyyy.MM.dd</c>).
    /// </summary>
    /// <remarks>
    /// An invalid bulk response is only logged; it does not throw, so the caller still commits
    /// the Kafka offsets for the batch.
    /// </remarks>
    /// <param name="records">Deserialized telemetry records; no request is sent when empty.</param>
    /// <param name="stoppingToken">Cancels the bulk request.</param>
    private async Task BulkInsertToElasticsearchAsync(
        IReadOnlyList<TelemetryRequest> records,
        CancellationToken stoppingToken)
    {
        if (records.Count == 0)
        {
            return;
        }

        var documents = new List<object>(records.Count);
        foreach (var data in records)
        {
            documents.Add(new
            {
                deviceId = data.DeviceId,
                timestamp = data.Timestamp,
                location = new { lat = data.Lat, lon = data.Lng }
            });
        }

        // The index is chosen by the time of writing, not by the event timestamp.
        var indexName = $"geopulse-telemetry-{DateTime.UtcNow:yyyy.MM.dd}";

        var response = await _esClient.BulkAsync(
            b => b.Index(indexName).CreateMany(documents),
            stoppingToken);

        if (!response.IsValidResponse)
        {
            _logger.LogError("寫入 Elasticsearch 失敗: {Error}", response.DebugInformation);
        }
    }
}