GeoPulse_Pipeline/GeoPulse Pipeline/KafkaConsumer.cs
2026-05-02 09:55:17 +08:00

156 lines
4.4 KiB
C#
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

using System.Data;
using System.Text.Json;
using Confluent.Kafka;
using Elastic.Clients.Elasticsearch;
using Npgsql;
namespace GeoPulse_Pipeline;
/// <summary>
/// Background worker that consumes telemetry events from the <c>telemetry-events</c> Kafka topic
/// in batches, writes each batch to PostgreSQL and Elasticsearch, and then commits the Kafka
/// offsets manually (auto-commit is disabled).
/// </summary>
/// <remarks>
/// A batch is flushed when it reaches <see cref="BatchSize"/> messages or when
/// <see cref="MaxWaitTime"/> has elapsed since the previous flush. A batch whose PostgreSQL write
/// fails is kept in memory and retried up to <see cref="MaxBatchAttempts"/> times before it is
/// dropped. Elasticsearch failures are logged only and never block the offset commit.
/// </remarks>
public class KafkaConsumer : BackgroundService
{
    /// <summary>Maximum number of messages buffered before a flush is forced.</summary>
    private const int BatchSize = 1000;

    /// <summary>Number of consecutive failed attempts after which the buffered batch is dropped.</summary>
    private const int MaxBatchAttempts = 5;

    /// <summary>Maximum time a non-empty buffer may wait before it is flushed.</summary>
    private static readonly TimeSpan MaxWaitTime = TimeSpan.FromSeconds(2);

    /// <summary>Back-off applied after a failed loop iteration.</summary>
    private static readonly TimeSpan RetryDelay = TimeSpan.FromSeconds(2);

    // Cached once: creating JsonSerializerOptions per call discards the serializer's metadata cache.
    private static readonly JsonSerializerOptions JsonOptions = new() { PropertyNameCaseInsensitive = true };

    private readonly ILogger<KafkaConsumer> _logger;
    private readonly NpgsqlDataSource _dataSource;
    private readonly string _kafkaHost;
    private readonly ElasticsearchClient _esClient;

    /// <summary>Creates the consumer.</summary>
    /// <param name="logger">Logger used for progress and error reporting.</param>
    /// <param name="configuration">Configuration; the <c>KafkaHost</c> key supplies the bootstrap servers.</param>
    /// <param name="dataSource">Npgsql data source used for the PostgreSQL batch inserts.</param>
    /// <param name="esClient">Elasticsearch client used for the bulk indexing requests.</param>
    public KafkaConsumer(
        ILogger<KafkaConsumer> logger,
        IConfiguration configuration,
        NpgsqlDataSource dataSource,
        ElasticsearchClient esClient)
    {
        _logger = logger;
        _kafkaHost = configuration["KafkaHost"];
        _dataSource = dataSource;
        _esClient = esClient;
    }

    /// <summary>
    /// Runs the consume / batch / write / commit loop until <paramref name="stoppingToken"/> is cancelled.
    /// </summary>
    /// <param name="stoppingToken">Signalled by the host when the service should stop.</param>
    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        // Hand control back to the caller right away; Consume() below blocks its thread.
        await Task.Yield();

        if (string.IsNullOrWhiteSpace(_kafkaHost))
        {
            _logger.LogError("Configuration value 'KafkaHost' is missing; the Kafka consumer will not start.");
            return;
        }

        var consumerConfig = new ConsumerConfig
        {
            BootstrapServers = _kafkaHost,
            GroupId = "telemetry-db-writer-group",
            AutoOffsetReset = AutoOffsetReset.Earliest,
            EnableAutoCommit = false
        };

        using var consumer = new ConsumerBuilder<Ignore, string>(consumerConfig)
            .SetErrorHandler((_, e) => _logger.LogError("Kafka 底層錯誤: {Reason}", e.Reason))
            .Build();
        consumer.Subscribe("telemetry-events");

        var buffer = new List<ConsumeResult<Ignore, string>>(BatchSize);
        var lastBatchTime = DateTime.UtcNow;
        var failedAttempts = 0;
        _logger.LogInformation("Consumer 已啟動,開始監控 Kafka 訊息...");

        try
        {
            while (!stoppingToken.IsCancellationRequested)
            {
                try
                {
                    var consumeResult = consumer.Consume(TimeSpan.FromMilliseconds(500));
                    if (consumeResult != null)
                    {
                        buffer.Add(consumeResult);
                    }

                    var isBatchFull = buffer.Count >= BatchSize;
                    var isTimeUp = buffer.Count > 0 && (DateTime.UtcNow - lastBatchTime) >= MaxWaitTime;
                    if (!isBatchFull && !isTimeUp)
                    {
                        continue;
                    }

                    await ProcessBatchAsync(consumer, buffer, stoppingToken);

                    buffer.Clear();
                    lastBatchTime = DateTime.UtcNow;
                    failedAttempts = 0;
                }
                catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested)
                {
                    // Normal shutdown, not a processing error.
                    break;
                }
                catch (Exception ex)
                {
                    _logger.LogError(ex, "Kafka寫入DB批次處理發生錯誤原因: {Message}", ex.Message);

                    // Keep the buffered messages so the next iteration retries them. Clearing the
                    // buffer here would lose them for good, because later commits move the group
                    // offset past messages that were never written.
                    failedAttempts++;
                    if (failedAttempts >= MaxBatchAttempts)
                    {
                        // Give up so a permanently failing batch cannot stall the pipeline forever.
                        _logger.LogError(
                            "Dropping {Count} buffered messages after {Attempts} consecutive failed attempts.",
                            buffer.Count,
                            failedAttempts);
                        buffer.Clear();
                        lastBatchTime = DateTime.UtcNow;
                        failedAttempts = 0;
                    }

                    try
                    {
                        await Task.Delay(RetryDelay, stoppingToken);
                    }
                    catch (OperationCanceledException)
                    {
                        break;
                    }
                }
            }
        }
        finally
        {
            try
            {
                // Leave the consumer group explicitly before the consumer is disposed.
                consumer.Close();
            }
            catch (KafkaException ex)
            {
                _logger.LogError(ex, "Failed to close the Kafka consumer cleanly: {Reason}", ex.Error.Reason);
            }
        }
    }

    /// <summary>
    /// Writes one buffered batch to PostgreSQL and Elasticsearch and commits its offsets.
    /// </summary>
    /// <remarks>
    /// Only parsing and the PostgreSQL write may throw out of this method (apart from cancellation),
    /// so a retry by the caller never follows a successful PostgreSQL write.
    /// </remarks>
    private async Task ProcessBatchAsync(
        IConsumer<Ignore, string> consumer,
        List<ConsumeResult<Ignore, string>> buffer,
        CancellationToken stoppingToken)
    {
        _logger.LogInformation("[處理中] 準備批次寫入 {Count} 筆資料...", buffer.Count);

        // Parse once and share the result between both sinks.
        var records = ParseTelemetry(buffer);

        await BulkInsertToDatabaseAsync(records, stoppingToken);

        try
        {
            await BulkInsertToElasticsearchAsync(records, stoppingToken);
        }
        catch (Exception ex) when (ex is not OperationCanceledException)
        {
            // Elasticsearch is best-effort, in line with the log-only handling of invalid responses.
            _logger.LogError(ex, "寫入 Elasticsearch 失敗: {Error}", ex.Message);
        }

        CommitOffsets(consumer, buffer);
        _logger.LogInformation("Kafka已寫入DB {Count} 筆資料至 PostgreSQL。", buffer.Count);
    }

    /// <summary>
    /// Commits, for every partition present in <paramref name="buffer"/>, the offset following the
    /// last buffered message. Commit failures are logged and not rethrown.
    /// </summary>
    private void CommitOffsets(IConsumer<Ignore, string> consumer, List<ConsumeResult<Ignore, string>> buffer)
    {
        var offsets = buffer
            .GroupBy(msg => msg.TopicPartition)
            .Select(g => new TopicPartitionOffset(g.Key, g.Last().Offset + 1))
            .ToList();

        try
        {
            consumer.Commit(offsets);
        }
        catch (KafkaException ex)
        {
            // The data is already stored; a later successful commit will cover these offsets.
            _logger.LogError(ex, "Kafka offset commit failed: {Reason}", ex.Error.Reason);
        }
    }

    /// <summary>
    /// Deserializes the buffered messages. Empty or malformed payloads are skipped with a warning;
    /// payloads that deserialize to <c>null</c> are skipped silently.
    /// </summary>
    private List<TelemetryRequest> ParseTelemetry(List<ConsumeResult<Ignore, string>> buffer)
    {
        var records = new List<TelemetryRequest>(buffer.Count);
        foreach (var msg in buffer)
        {
            var payload = msg.Message.Value;
            if (string.IsNullOrWhiteSpace(payload))
            {
                _logger.LogWarning("Skipping empty telemetry message at {Position}.", msg.TopicPartitionOffset);
                continue;
            }

            try
            {
                var data = JsonSerializer.Deserialize<TelemetryRequest>(payload, JsonOptions);
                if (data != null)
                {
                    records.Add(data);
                }
            }
            catch (JsonException ex)
            {
                _logger.LogWarning(ex, "Skipping malformed telemetry message at {Position}.", msg.TopicPartitionOffset);
            }
        }

        return records;
    }

    /// <summary>
    /// Inserts the records into <c>telemetry_history</c> as a single Npgsql batch.
    /// Does nothing when <paramref name="records"/> is empty.
    /// </summary>
    private async Task BulkInsertToDatabaseAsync(List<TelemetryRequest> records, CancellationToken stoppingToken)
    {
        if (records.Count == 0)
        {
            return;
        }

        await using var batch = _dataSource.CreateBatch();
        foreach (var data in records)
        {
            var cmd = batch.CreateBatchCommand();
            cmd.CommandText =
                @"INSERT INTO telemetry_history (id, device_id, geom, timestamp) VALUES (gen_random_uuid(), $1, ST_SetSRID(ST_MakePoint($2, $3), 4326), $4)";
            // Positional parameters: $1 device id, $2 longitude, $3 latitude, $4 timestamp.
            cmd.Parameters.AddWithValue(data.DeviceId ?? string.Empty);
            cmd.Parameters.AddWithValue(data.Lng);
            cmd.Parameters.AddWithValue(data.Lat);
            cmd.Parameters.AddWithValue(data.Timestamp);
            batch.BatchCommands.Add(cmd);
        }

        await batch.ExecuteNonQueryAsync(stoppingToken);
    }

    /// <summary>
    /// Sends the records to the daily <c>geopulse-telemetry-yyyy.MM.dd</c> index in one bulk request.
    /// Does nothing when <paramref name="records"/> is empty; an invalid response is logged only.
    /// </summary>
    private async Task BulkInsertToElasticsearchAsync(List<TelemetryRequest> records, CancellationToken stoppingToken)
    {
        if (records.Count == 0)
        {
            return;
        }

        var documents = new List<object>(records.Count);
        foreach (var data in records)
        {
            documents.Add(new
            {
                deviceId = data.DeviceId,
                timestamp = data.Timestamp,
                location = new { lat = data.Lat, lon = data.Lng }
            });
        }

        // Invariant culture keeps the index name independent of the host's calendar settings.
        var indexDate = DateTime.UtcNow.ToString("yyyy.MM.dd", System.Globalization.CultureInfo.InvariantCulture);
        var indexName = $"geopulse-telemetry-{indexDate}";

        var response = await _esClient.BulkAsync(b => b
            .Index(indexName)
            .CreateMany(documents), stoppingToken);

        if (!response.IsValidResponse)
        {
            _logger.LogError("寫入 Elasticsearch 失敗: {Error}", response.DebugInformation);
        }
    }
}