GeoPulse_Pipeline/GeoPulse Pipeline/KafkaConsumer.cs

117 lines
3.7 KiB
C#
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

using System.Data;
using System.Text.Json;
using Confluent.Kafka;
using Npgsql;
namespace GeoPulse_Pipeline;
/// <summary>
/// Background service that consumes telemetry events from Kafka and writes them to
/// PostgreSQL in batches.
/// </summary>
/// <remarks>
/// Messages from the <c>telemetry-events</c> topic are buffered in memory and flushed when
/// the buffer holds <c>BatchSize</c> messages or the oldest buffered message has waited
/// <c>MaxWaitTime</c>. Auto-commit is disabled; offsets are committed only after the batch
/// insert has completed. A batch whose insert or commit fails is logged and removed from
/// the buffer without being committed.
/// </remarks>
public class KafkaConsumer : BackgroundService
{
    private const int BatchSize = 1000;

    private const string InsertSql =
        "INSERT INTO telemetry_history (id, device_id, geom, timestamp) VALUES (gen_random_uuid(), $1, ST_SetSRID(ST_MakePoint($2, $3), 4326), $4)";

    private static readonly TimeSpan MaxWaitTime = TimeSpan.FromSeconds(2);
    private static readonly TimeSpan PollTimeout = TimeSpan.FromMilliseconds(500);
    private static readonly TimeSpan ErrorBackoff = TimeSpan.FromSeconds(2);

    // Shared instance: building a new JsonSerializerOptions per batch discards the
    // serializer's cached type metadata each time.
    private static readonly JsonSerializerOptions JsonOptions = new() { PropertyNameCaseInsensitive = true };

    private readonly ILogger<KafkaConsumer> _logger;
    private readonly NpgsqlDataSource _dataSource;
    private readonly string _kafkaHost;

    /// <summary>
    /// Creates the consumer service.
    /// </summary>
    /// <param name="logger">Logger used for progress and error reporting.</param>
    /// <param name="configuration">Application configuration; must contain a <c>KafkaHost</c> value.</param>
    /// <param name="dataSource">Data source used to create the PostgreSQL insert batches.</param>
    /// <exception cref="InvalidOperationException">The <c>KafkaHost</c> configuration value is missing or blank.</exception>
    public KafkaConsumer(ILogger<KafkaConsumer> logger, IConfiguration configuration, NpgsqlDataSource dataSource)
    {
        _logger = logger;
        _dataSource = dataSource;

        var kafkaHost = configuration["KafkaHost"];
        if (string.IsNullOrWhiteSpace(kafkaHost))
        {
            // Fail fast with a clear message instead of starting a consumer with no bootstrap servers.
            throw new InvalidOperationException("Missing required configuration value 'KafkaHost'.");
        }

        _kafkaHost = kafkaHost;
    }

    /// <summary>
    /// Polls Kafka until <paramref name="stoppingToken"/> is cancelled, flushing buffered
    /// messages to the database in batches and committing their offsets afterwards.
    /// </summary>
    /// <param name="stoppingToken">Signals that the service should stop.</param>
    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        // Consume() below blocks synchronously, so yield first to hand the caller its Task
        // before the polling loop starts.
        await Task.Yield();

        var consumerConfig = new ConsumerConfig
        {
            BootstrapServers = _kafkaHost,
            // Tip from the original author: switch to a new group name to force Kafka to reset the offset.
            GroupId = "telemetry-db-writer-group",
            AutoOffsetReset = AutoOffsetReset.Earliest,
            // Offsets are committed manually after each batch has been written.
            EnableAutoCommit = false
        };

        using var consumer = new ConsumerBuilder<Ignore, string>(consumerConfig)
            .SetErrorHandler((_, e) => _logger.LogError("Kafka 底層錯誤: {Reason}", e.Reason))
            .Build();
        consumer.Subscribe("telemetry-events");

        var buffer = new List<ConsumeResult<Ignore, string>>(BatchSize);
        var batchStartedAt = DateTime.UtcNow;
        _logger.LogInformation("🚀 Consumer 已啟動,開始監控 Kafka 訊息...");

        try
        {
            while (!stoppingToken.IsCancellationRequested)
            {
                // The try lives inside the loop so that one failed iteration does not end the service.
                try
                {
                    var consumeResult = consumer.Consume(PollTimeout);
                    if (consumeResult != null)
                    {
                        if (buffer.Count == 0)
                        {
                            // Start the wait clock when a batch begins, so the first message
                            // after an idle period is not flushed on its own.
                            batchStartedAt = DateTime.UtcNow;
                        }

                        buffer.Add(consumeResult);
                    }

                    bool isBatchFull = buffer.Count >= BatchSize;
                    bool isTimeUp = buffer.Count > 0 && (DateTime.UtcNow - batchStartedAt) >= MaxWaitTime;
                    if (!isBatchFull && !isTimeUp)
                    {
                        continue;
                    }

                    _logger.LogInformation("[處理中] 準備批次寫入 {Count} 筆資料...", buffer.Count);
                    int written = await BulkInsertToDatabaseAsync(buffer, stoppingToken);

                    // Commit, per partition, the offset following the last buffered message.
                    // Skipped (unparseable) messages are included on purpose so they are not re-read.
                    var offsets = buffer
                        .GroupBy(msg => msg.TopicPartition)
                        .Select(g => new TopicPartitionOffset(g.Key, g.Last().Offset + 1))
                        .ToList();
                    consumer.Commit(offsets);

                    _logger.LogInformation("✅ [成功] 已寫入 {Count} 筆資料至 PostgreSQL。", written);
                    buffer.Clear();
                }
                catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested)
                {
                    // Shutdown was requested while a batch was in flight; this is not a failure.
                    break;
                }
                catch (ConsumeException ex)
                {
                    // A failed poll says nothing about the messages already buffered, so keep them.
                    _logger.LogError(ex, "Kafka consume failed: {Reason}", ex.Error.Reason);
                    await Task.Delay(ErrorBackoff, stoppingToken);
                }
                catch (Exception ex)
                {
                    // Log and carry on with the next loop iteration rather than stopping the service.
                    _logger.LogError(ex, "❌ [失敗] 批次處理發生錯誤!原因: {Message}", ex.Message);

                    // Drop the batch so that bad data cannot wedge the buffer into endless retries.
                    // NOTE(review): these messages are not committed, but the consumer position is
                    // not rewound either, so they are presumably not re-read by this process —
                    // confirm that this loss is acceptable.
                    buffer.Clear();

                    // Pause briefly so repeated failures do not flood the log.
                    await Task.Delay(ErrorBackoff, stoppingToken);
                }
            }
        }
        catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested)
        {
            // The back-off delay was interrupted by shutdown; fall through to cleanup.
        }
        finally
        {
            try
            {
                // Leave the consumer group explicitly before the consumer is disposed.
                consumer.Close();
            }
            catch (KafkaException ex)
            {
                _logger.LogWarning(ex, "Kafka consumer did not close cleanly: {Reason}", ex.Error.Reason);
            }
        }
    }

    /// <summary>
    /// Deserializes each buffered message and inserts the resulting rows into
    /// <c>telemetry_history</c> using a single batch.
    /// </summary>
    /// <param name="buffer">Consumed messages whose values are JSON telemetry payloads.</param>
    /// <param name="stoppingToken">Cancels the database call.</param>
    /// <returns>The number of insert commands executed; 0 when no message produced a row.</returns>
    private async Task<int> BulkInsertToDatabaseAsync(List<ConsumeResult<Ignore, string>> buffer,
        CancellationToken stoppingToken)
    {
        await using var batch = _dataSource.CreateBatch();

        foreach (var msg in buffer)
        {
            try
            {
                var payload = msg.Message.Value;
                if (payload == null)
                {
                    // No payload to deserialize; skip instead of failing the whole batch.
                    continue;
                }

                var data = JsonSerializer.Deserialize<TelemetryRequest>(payload, JsonOptions);
                if (data == null)
                {
                    continue;
                }

                var cmd = batch.CreateBatchCommand();
                cmd.CommandText = InsertSql;
                // Positional parameters, in the order $1..$4 used by InsertSql.
                cmd.Parameters.AddWithValue(data.DeviceId ?? string.Empty);
                cmd.Parameters.AddWithValue(data.Lng);
                cmd.Parameters.AddWithValue(data.Lat);
                cmd.Parameters.AddWithValue(data.Timestamp);
                batch.BatchCommands.Add(cmd);
            }
            catch (JsonException ex)
            {
                // One malformed message must not cost the rest of the batch.
                _logger.LogWarning(ex, "Skipping malformed telemetry message at {TopicPartitionOffset}.",
                    msg.TopicPartitionOffset);
            }
        }

        if (batch.BatchCommands.Count == 0)
        {
            // Nothing to insert; avoid executing a batch with no commands.
            return 0;
        }

        await batch.ExecuteNonQueryAsync(stoppingToken);
        return batch.BatchCommands.Count;
    }
}