GeoPulse_Pipeline/GeoPulse Pipeline/KafkaConsumer.cs
2026-04-30 11:40:45 +08:00

114 lines
3.3 KiB
C#
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

using System.Data;
using System.Text.Json;
using Confluent.Kafka;
using Npgsql;
namespace GeoPulse_Pipeline;
/// <summary>
/// Background worker that reads JSON telemetry messages from the Kafka topic
/// <c>telemetry-events</c> and inserts them into the PostgreSQL table
/// <c>telemetry_history</c> in batches.
/// </summary>
/// <remarks>
/// Auto-commit is disabled; offsets are committed manually and only after the batch
/// that contains them has been written. When a batch fails, the consumer is rewound to
/// the first buffered offset of every partition so the same messages are delivered
/// again (at-least-once). A database error that never goes away will therefore be
/// retried, and logged, until it is resolved instead of silently dropping data.
/// </remarks>
public class KafkaConsumer : BackgroundService
{
    /// <summary>Number of buffered messages that triggers a flush.</summary>
    private const int BatchSize = 1000;

    /// <summary>Longest time the oldest buffered message may wait before a flush.</summary>
    private static readonly TimeSpan MaxWaitTime = TimeSpan.FromSeconds(2);

    /// <summary>Upper bound for a single blocking <c>Consume</c> call.</summary>
    private static readonly TimeSpan PollTimeout = TimeSpan.FromMilliseconds(500);

    /// <summary>Pause after a failed iteration before trying again.</summary>
    private static readonly TimeSpan ErrorBackoff = TimeSpan.FromSeconds(2);

    // Created once and reused: building JsonSerializerOptions per call throws away its
    // internal metadata cache.
    private static readonly JsonSerializerOptions JsonOptions = new() { PropertyNameCaseInsensitive = true };

    private readonly ILogger<KafkaConsumer> _logger;
    private readonly NpgsqlDataSource _dataSource;
    private readonly string _kafkaHost;

    /// <summary>Creates the worker.</summary>
    /// <param name="logger">Logger used for progress and error messages.</param>
    /// <param name="configuration">Configuration; the <c>KafkaHost</c> key supplies the bootstrap servers.</param>
    /// <param name="dataSource">Npgsql data source used to create the insert batches.</param>
    /// <exception cref="InvalidOperationException">The <c>KafkaHost</c> setting is missing or blank.</exception>
    public KafkaConsumer(ILogger<KafkaConsumer> logger, IConfiguration configuration, NpgsqlDataSource dataSource)
    {
        _logger = logger;
        _dataSource = dataSource;

        // Fail fast: without bootstrap servers the consumer would start but never receive anything.
        var kafkaHost = configuration["KafkaHost"];
        if (string.IsNullOrWhiteSpace(kafkaHost))
        {
            throw new InvalidOperationException("Configuration value 'KafkaHost' is missing or empty.");
        }

        _kafkaHost = kafkaHost;
    }

    /// <summary>
    /// Polls Kafka until <paramref name="stoppingToken"/> is cancelled, buffering messages and
    /// flushing them to the database when the buffer holds <see cref="BatchSize"/> messages or
    /// the oldest buffered message has waited <see cref="MaxWaitTime"/>.
    /// </summary>
    /// <param name="stoppingToken">Signalled by the host when the service should stop.</param>
    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        // Consume() below is a blocking call; yield first so the caller that starts the
        // service gets control back before the loop begins.
        await Task.Yield();

        var consumerConfig = new ConsumerConfig
        {
            BootstrapServers = _kafkaHost,
            // Tip: change this group name to make Kafka start over from AutoOffsetReset.
            GroupId = "telemetry-db-writer-group",
            AutoOffsetReset = AutoOffsetReset.Earliest,
            EnableAutoCommit = false
        };

        using var consumer = new ConsumerBuilder<Ignore, string>(consumerConfig)
            .SetErrorHandler((_, e) => _logger.LogError("Kafka 底層錯誤: {Reason}", e.Reason))
            .Build();

        consumer.Subscribe("telemetry-events");

        var buffer = new List<ConsumeResult<Ignore, string>>(BatchSize);
        var batchStartedAt = DateTime.UtcNow;

        _logger.LogInformation("🚀 Consumer 已啟動,開始監控 Kafka 訊息...");

        try
        {
            while (!stoppingToken.IsCancellationRequested)
            {
                try
                {
                    var consumeResult = consumer.Consume(PollTimeout);
                    if (consumeResult is { Message: not null })
                    {
                        if (buffer.Count == 0)
                        {
                            // Start the timer with the first message of a batch, so a message
                            // arriving after an idle period is not flushed on its own.
                            batchStartedAt = DateTime.UtcNow;
                        }

                        buffer.Add(consumeResult);
                    }

                    bool isBatchFull = buffer.Count >= BatchSize;
                    bool isTimeUp = buffer.Count > 0 && (DateTime.UtcNow - batchStartedAt) >= MaxWaitTime;
                    if (!isBatchFull && !isTimeUp)
                    {
                        continue;
                    }

                    _logger.LogInformation("[處理中] 準備批次寫入 {Count} 筆資料...", buffer.Count);
                    await BulkInsertToDatabaseAsync(buffer, stoppingToken);
                    CommitProcessedOffsets(consumer, buffer);
                    _logger.LogInformation("Kafka已寫入DB {Count} 筆資料至 PostgreSQL。", buffer.Count);

                    buffer.Clear();
                    batchStartedAt = DateTime.UtcNow;
                }
                catch (Exception ex) when (!(ex is OperationCanceledException && stoppingToken.IsCancellationRequested))
                {
                    _logger.LogError(ex, "Kafka寫入DB批次處理發生錯誤原因: {Message}", ex.Message);

                    // The buffered messages were read but never committed. Rewind before
                    // dropping them; otherwise the next successful commit would move the
                    // group offset past them and they would be lost for good.
                    RewindToFirstBufferedOffsets(consumer, buffer);
                    buffer.Clear();
                    batchStartedAt = DateTime.UtcNow;

                    await Task.Delay(ErrorBackoff, stoppingToken);
                }
            }
        }
        catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested)
        {
            // Normal shutdown. Anything still buffered is uncommitted and will be
            // delivered again on the next start.
        }
        finally
        {
            try
            {
                // Leave the consumer group explicitly so partitions are reassigned right
                // away instead of after the session timeout.
                consumer.Close();
            }
            catch (KafkaException ex)
            {
                _logger.LogWarning(ex, "Kafka consumer did not close cleanly: {Message}", ex.Message);
            }
        }
    }

    /// <summary>
    /// Commits, for every partition present in <paramref name="buffer"/>, the offset following
    /// the last buffered message. Messages of one partition are buffered in offset order.
    /// </summary>
    /// <remarks>
    /// A commit failure is logged and not rethrown: the rows are already stored, and the
    /// next successful commit covers these offsets as well.
    /// </remarks>
    private void CommitProcessedOffsets(IConsumer<Ignore, string> consumer, List<ConsumeResult<Ignore, string>> buffer)
    {
        var offsets = buffer
            .GroupBy(msg => msg.TopicPartition)
            .Select(g => new TopicPartitionOffset(g.Key, g.Last().Offset + 1))
            .ToList();

        try
        {
            consumer.Commit(offsets);
        }
        catch (KafkaException ex)
        {
            _logger.LogWarning(ex, "Kafka offset commit failed after a successful DB write: {Message}", ex.Message);
        }
    }

    /// <summary>
    /// Moves the consumer back to the first buffered offset of each partition so the
    /// messages in <paramref name="buffer"/> are consumed again.
    /// </summary>
    private void RewindToFirstBufferedOffsets(IConsumer<Ignore, string> consumer, List<ConsumeResult<Ignore, string>> buffer)
    {
        foreach (var partitionGroup in buffer.GroupBy(msg => msg.TopicPartition))
        {
            var firstOffset = partitionGroup.First().TopicPartitionOffset;
            try
            {
                consumer.Seek(firstOffset);
            }
            catch (KafkaException ex)
            {
                // Seek is only valid for partitions this consumer is currently assigned
                // (e.g. it fails after a rebalance took the partition away).
                _logger.LogWarning(ex, "Could not rewind Kafka consumer to {Offset}: {Message}", firstOffset, ex.Message);
            }
        }
    }

    /// <summary>
    /// Deserializes every buffered message into a <c>TelemetryRequest</c> and inserts the
    /// results into <c>telemetry_history</c> with a single Npgsql batch.
    /// </summary>
    /// <param name="buffer">Messages to store.</param>
    /// <param name="stoppingToken">Cancels the database round trip.</param>
    /// <remarks>
    /// Messages with an empty payload, invalid JSON, or a JSON <c>null</c> are skipped; they
    /// can never succeed, so retrying the whole batch because of them would not help.
    /// </remarks>
    private async Task BulkInsertToDatabaseAsync(List<ConsumeResult<Ignore, string>> buffer,
        CancellationToken stoppingToken)
    {
        await using var batch = _dataSource.CreateBatch();

        foreach (var msg in buffer)
        {
            var payload = msg.Message.Value;
            if (string.IsNullOrWhiteSpace(payload))
            {
                _logger.LogWarning("Skipping Kafka message with empty payload at {Offset}", msg.TopicPartitionOffset);
                continue;
            }

            TelemetryRequest? data;
            try
            {
                data = JsonSerializer.Deserialize<TelemetryRequest>(payload, JsonOptions);
            }
            catch (JsonException ex)
            {
                _logger.LogError(ex, "Skipping Kafka message with invalid JSON at {Offset}", msg.TopicPartitionOffset);
                continue;
            }

            if (data == null)
            {
                continue;
            }

            var cmd = batch.CreateBatchCommand();
            // Positional parameters $1..$4 bind in the order they are added below:
            // device id, longitude, latitude, timestamp.
            cmd.CommandText =
                @"INSERT INTO telemetry_history (id, device_id, geom, timestamp) VALUES (gen_random_uuid(), $1, ST_SetSRID(ST_MakePoint($2, $3), 4326), $4)";
            cmd.Parameters.AddWithValue(data.DeviceId ?? string.Empty);
            cmd.Parameters.AddWithValue(data.Lng);
            cmd.Parameters.AddWithValue(data.Lat);
            cmd.Parameters.AddWithValue(data.Timestamp);
            batch.BatchCommands.Add(cmd);
        }

        if (batch.BatchCommands.Count == 0)
        {
            // Every message was skipped; there is nothing to send to the database.
            return;
        }

        await batch.ExecuteNonQueryAsync(stoppingToken);
    }
}