116 lines
3.6 KiB
C#
116 lines
3.6 KiB
C#
using System.Data;
|
||
using System.Text.Json;
|
||
using Confluent.Kafka;
|
||
using Npgsql;
|
||
|
||
namespace GeoPulse_Pipeline;
|
||
|
||
public class KafkaConsumer : BackgroundService
|
||
{
|
||
private readonly ILogger<KafkaConsumer> _logger;
|
||
private readonly NpgsqlDataSource _dataSource;
|
||
private readonly string _kafkaHost;
|
||
|
||
private const int BatchSize = 1000;
|
||
private readonly TimeSpan MaxWaitTime = TimeSpan.FromSeconds(2);
|
||
|
||
public KafkaConsumer(ILogger<KafkaConsumer> logger, IConfiguration configuration, NpgsqlDataSource dataSource)
|
||
{
|
||
_logger = logger;
|
||
_kafkaHost = configuration["KafkaHost"];
|
||
_dataSource = dataSource;
|
||
}
|
||
|
||
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
|
||
{
|
||
await Task.Yield();
|
||
|
||
var consumerConfig = new ConsumerConfig
|
||
{
|
||
BootstrapServers = _kafkaHost,
|
||
GroupId = "telemetry-db-writer-group", // 💡 建議換個名字,強迫 Kafka 重置 Offset
|
||
AutoOffsetReset = AutoOffsetReset.Earliest,
|
||
EnableAutoCommit = false
|
||
};
|
||
|
||
using var consumer = new ConsumerBuilder<Ignore, string>(consumerConfig)
|
||
.SetErrorHandler((_, e) => _logger.LogError($"Kafka 底層錯誤: {e.Reason}"))
|
||
.Build();
|
||
|
||
consumer.Subscribe("telemetry-events");
|
||
|
||
var buffer = new List<ConsumeResult<Ignore, string>>();
|
||
var lastBatchTime = DateTime.UtcNow;
|
||
|
||
_logger.LogInformation("🚀 Consumer 已啟動,開始監控 Kafka 訊息...");
|
||
|
||
while (!stoppingToken.IsCancellationRequested)
|
||
{
|
||
try // 💡 關鍵修正:try 放在 while 裡面
|
||
{
|
||
var consumeResult = consumer.Consume(TimeSpan.FromMilliseconds(500));
|
||
|
||
if (consumeResult != null)
|
||
{
|
||
buffer.Add(consumeResult);
|
||
}
|
||
|
||
bool isBatchFull = buffer.Count >= BatchSize;
|
||
bool isTimeUp = buffer.Count > 0 && (DateTime.UtcNow - lastBatchTime) >= MaxWaitTime;
|
||
|
||
if (isBatchFull || isTimeUp)
|
||
{
|
||
_logger.LogInformation($"[處理中] 準備批次寫入 {buffer.Count} 筆資料...");
|
||
|
||
await BulkInsertToDatabaseAsync(buffer, stoppingToken);
|
||
|
||
var latestOffset = buffer.Last();
|
||
consumer.Commit(latestOffset);
|
||
|
||
_logger.LogInformation($"✅ [成功] 已寫入 {buffer.Count} 筆資料至 PostgreSQL。");
|
||
|
||
buffer.Clear();
|
||
lastBatchTime = DateTime.UtcNow;
|
||
}
|
||
}
|
||
catch (Exception ex)
|
||
{
|
||
// 💡 就算這裡噴錯,程式也只是印出 Log,然後回到 while 頂部繼續跑下一輪
|
||
_logger.LogError(ex, "❌ [失敗] 批次處理發生錯誤!原因: {Message}", ex.Message);
|
||
|
||
// 發生錯誤後,為了避免壞資料卡死 buffer 導致無窮重試,這裡建議清空
|
||
buffer.Clear();
|
||
lastBatchTime = DateTime.UtcNow;
|
||
|
||
// 稍微停 2 秒,避免在連續錯誤時瘋狂刷屏
|
||
await Task.Delay(2000, stoppingToken);
|
||
}
|
||
}
|
||
}
|
||
|
||
private async Task BulkInsertToDatabaseAsync(List<ConsumeResult<Ignore, string>> buffer,
|
||
CancellationToken stoppingToken)
|
||
{
|
||
var jsonOptions = new JsonSerializerOptions { PropertyNameCaseInsensitive = true };
|
||
await using var batch = _dataSource.CreateBatch();
|
||
|
||
foreach (var msg in buffer)
|
||
{
|
||
var data = JsonSerializer.Deserialize<TelemetryRequest>(msg.Message.Value, jsonOptions);
|
||
if (data == null) continue;
|
||
|
||
var cmd = batch.CreateBatchCommand();
|
||
cmd.CommandText =
|
||
@"INSERT INTO telemetry_history (id, device_id, geom, timestamp) VALUES (gen_random_uuid(), $1, ST_SetSRID(ST_MakePoint($2, $3), 4326), $4)";
|
||
|
||
cmd.Parameters.AddWithValue(data.DeviceId ?? string.Empty);
|
||
cmd.Parameters.AddWithValue(data.Lng);
|
||
cmd.Parameters.AddWithValue(data.Lat);
|
||
cmd.Parameters.AddWithValue(data.Timestamp);
|
||
|
||
batch.BatchCommands.Add(cmd);
|
||
}
|
||
|
||
await batch.ExecuteNonQueryAsync(stoppingToken);
|
||
}
|
||
} |