GeoPulse_Pipeline/GeoPulse Pipeline/KafkaConsumer.cs
saingchildren a05d5a9050
All checks were successful
Deploy GeoPulse / Build And Deploy (push) Successful in 4s
feat: make sure the partition table exists.
2026-05-09 13:04:44 +08:00

174 lines
5.0 KiB
C#
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

using System.Data;
using System.Text.Json;
using Confluent.Kafka;
using Elastic.Clients.Elasticsearch;
using Npgsql;
namespace GeoPulse_Pipeline;
/// <summary>
/// Hosted background service that consumes JSON telemetry messages from the
/// <c>telemetry-events</c> Kafka topic, writes them in batches to the PostgreSQL
/// <c>telemetry_history</c> table and to a date-named Elasticsearch index, and commits
/// Kafka offsets manually only after a batch has been flushed.
/// </summary>
/// <remarks>
/// A batch is flushed when it holds <see cref="BatchSize"/> messages, or when it is non-empty
/// and <see cref="MaxWaitTime"/> has elapsed since the last flush. Offsets are committed only
/// after a successful flush, and a failed batch is re-consumed. Delivery is therefore
/// at-least-once: a retried batch can insert duplicate PostgreSQL rows, because each row gets a
/// fresh <c>gen_random_uuid()</c> id. The Elasticsearch write is best-effort. A failed bulk
/// response is logged, and the batch's offsets are still committed.
/// </remarks>
public class KafkaConsumer : BackgroundService
{
    private const string TopicName = "telemetry-events";
    private const string ConsumerGroupId = "telemetry-db-writer-group";
    private const int BatchSize = 1000;
    private static readonly TimeSpan MaxWaitTime = TimeSpan.FromSeconds(2);
    private static readonly TimeSpan ConsumeTimeout = TimeSpan.FromMilliseconds(500);
    private static readonly TimeSpan RetryDelay = TimeSpan.FromSeconds(2);

    // Reused across batches: JsonSerializerOptions caches type metadata, so creating a new
    // instance per call repeats that work (analyzer rule CA1869).
    private static readonly JsonSerializerOptions JsonOptions = new() { PropertyNameCaseInsensitive = true };

    private readonly ILogger<KafkaConsumer> _logger;
    private readonly NpgsqlDataSource _dataSource;
    private readonly PartitionManager _partitionManager;
    private readonly string _kafkaHost;
    private readonly ElasticsearchClient _esClient;

    /// <summary>Creates the consumer service from DI-provided dependencies.</summary>
    /// <param name="logger">Logger for this service.</param>
    /// <param name="configuration">Source of the <c>KafkaHost</c> bootstrap-servers setting.</param>
    /// <param name="dataSource">PostgreSQL data source used to create insert batches.</param>
    /// <param name="partitionManager">Ensures the target table partitions exist before inserting.</param>
    /// <param name="esClient">Elasticsearch client used for bulk indexing.</param>
    public KafkaConsumer(
        ILogger<KafkaConsumer> logger,
        IConfiguration configuration,
        NpgsqlDataSource dataSource,
        PartitionManager partitionManager,
        ElasticsearchClient esClient)
    {
        _logger = logger;
        // NOTE(review): this is null when "KafkaHost" is not configured. The consumer would then
        // be built without bootstrap servers. Consider failing fast here.
        _kafkaHost = configuration["KafkaHost"];
        _dataSource = dataSource;
        _partitionManager = partitionManager;
        _esClient = esClient;
    }

    /// <summary>
    /// Runs the consume → buffer → flush → commit loop until <paramref name="stoppingToken"/>
    /// is cancelled. Then it closes the consumer.
    /// </summary>
    /// <param name="stoppingToken">Signalled by the host on shutdown.</param>
    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        // Consume(TimeSpan) below is synchronous. Yield first so the blocking loop does not run
        // as part of the host's start-up call.
        await Task.Yield();

        var consumerConfig = new ConsumerConfig
        {
            BootstrapServers = _kafkaHost,
            GroupId = ConsumerGroupId,
            AutoOffsetReset = AutoOffsetReset.Earliest,
            // Offsets are committed manually, only after a batch has been written.
            EnableAutoCommit = false
        };

        using var consumer = new ConsumerBuilder<Ignore, string>(consumerConfig)
            .SetErrorHandler((_, e) => _logger.LogError("Kafka 底層錯誤: {Reason}", e.Reason))
            .Build();
        consumer.Subscribe(TopicName);

        var buffer = new List<ConsumeResult<Ignore, string>>(BatchSize);
        var lastBatchTime = DateTime.UtcNow;
        _logger.LogInformation("Consumer 已啟動,開始監控 Kafka 訊息...");

        try
        {
            while (!stoppingToken.IsCancellationRequested)
            {
                try
                {
                    var consumeResult = consumer.Consume(ConsumeTimeout);
                    if (consumeResult != null)
                    {
                        buffer.Add(consumeResult);
                    }

                    bool isBatchFull = buffer.Count >= BatchSize;
                    bool isTimeUp = buffer.Count > 0 && (DateTime.UtcNow - lastBatchTime) >= MaxWaitTime;
                    if (isBatchFull || isTimeUp)
                    {
                        await FlushBatchAsync(consumer, buffer, stoppingToken);
                        buffer.Clear();
                        lastBatchTime = DateTime.UtcNow;
                    }
                }
                catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested)
                {
                    // Shutdown was requested mid-flush. Nothing in the buffer was committed, so
                    // those messages are delivered again after a restart.
                    break;
                }
                catch (Exception ex)
                {
                    _logger.LogError(ex, "Kafka寫入DB批次處理發生錯誤原因: {Message}", ex.Message);

                    // The consumer's in-memory position is already past every buffered message.
                    // Seek back so the failed batch is consumed again, instead of being silently
                    // skipped until the next restart or rebalance.
                    RewindToBufferedOffsets(consumer, buffer);
                    buffer.Clear();
                    lastBatchTime = DateTime.UtcNow;

                    try
                    {
                        await Task.Delay(RetryDelay, stoppingToken);
                    }
                    catch (OperationCanceledException)
                    {
                        break;
                    }
                }
            }
        }
        finally
        {
            CloseConsumer(consumer);
        }
    }

    /// <summary>
    /// Writes one buffered batch to PostgreSQL and Elasticsearch. Then it commits, for every
    /// partition in the batch, the offset after the last buffered message.
    /// </summary>
    /// <param name="consumer">Consumer that produced <paramref name="buffer"/>; used for the commit.</param>
    /// <param name="buffer">Messages in consume order. This method does not modify it.</param>
    /// <param name="stoppingToken">Cancels the database and Elasticsearch writes.</param>
    private async Task FlushBatchAsync(
        IConsumer<Ignore, string> consumer,
        List<ConsumeResult<Ignore, string>> buffer,
        CancellationToken stoppingToken)
    {
        _logger.LogInformation("[處理中] 準備批次寫入 {Count} 筆資料...", buffer.Count);

        // Parse once and share the result between both sinks.
        var records = DeserializeBatch(buffer);
        await BulkInsertToDatabaseAsync(records, stoppingToken);
        await BulkInsertToElasticsearchAsync(records, stoppingToken);

        // Kafka expects the offset of the *next* message to read, hence the +1. Messages are
        // appended in consume order, so Last() is the highest offset per partition. Offsets of
        // messages skipped as unparseable are committed too, so they are not redelivered.
        var offsets = buffer
            .GroupBy(msg => msg.TopicPartition)
            .Select(g => new TopicPartitionOffset(g.Key, g.Last().Offset + 1))
            .ToList();
        consumer.Commit(offsets);
        _logger.LogInformation("Kafka已寫入DB {Count} 筆資料至 PostgreSQL。", records.Count);
    }

    /// <summary>
    /// Parses each buffered message value as a <c>TelemetryRequest</c>.
    /// </summary>
    /// <remarks>
    /// The following are skipped instead of failing the batch:
    /// <list type="bullet">
    /// <item>messages whose value is <c>null</c>;</item>
    /// <item>values that deserialize to <c>null</c>;</item>
    /// <item>values that are not valid JSON (these are also logged).</item>
    /// </list>
    /// Skipping avoids a malformed message failing every retry of its batch.
    /// </remarks>
    /// <param name="buffer">The consumed messages.</param>
    /// <returns>The successfully parsed records, in buffer order.</returns>
    private List<TelemetryRequest> DeserializeBatch(List<ConsumeResult<Ignore, string>> buffer)
    {
        var records = new List<TelemetryRequest>(buffer.Count);
        foreach (var msg in buffer)
        {
            if (msg.Message.Value is null)
            {
                continue;
            }

            try
            {
                var data = JsonSerializer.Deserialize<TelemetryRequest>(msg.Message.Value, JsonOptions);
                if (data is not null)
                {
                    records.Add(data);
                }
            }
            catch (JsonException ex)
            {
                _logger.LogWarning(ex, "略過無法解析的訊息 {TopicPartitionOffset}", msg.TopicPartitionOffset);
            }
        }

        return records;
    }

    /// <summary>
    /// Seeks each partition in <paramref name="buffer"/> back to its first buffered offset, so
    /// those messages are consumed again.
    /// </summary>
    /// <param name="consumer">The consumer whose position is rewound.</param>
    /// <param name="buffer">The uncommitted messages, in consume order.</param>
    private void RewindToBufferedOffsets(
        IConsumer<Ignore, string> consumer,
        List<ConsumeResult<Ignore, string>> buffer)
    {
        foreach (var partition in buffer.GroupBy(msg => msg.TopicPartition))
        {
            // Consume order means First() holds the lowest buffered offset for this partition.
            var firstUncommitted = partition.First().TopicPartitionOffset;
            try
            {
                consumer.Seek(firstUncommitted);
            }
            catch (KafkaException ex)
            {
                // For example, the partition was revoked by a rebalance. Its new owner should
                // resume from the last committed offset.
                _logger.LogWarning(ex, "無法回溯 {TopicPartitionOffset} 的消費位置", firstUncommitted);
            }
        }
    }

    /// <summary>
    /// Closes the consumer so it leaves the group cleanly before it is disposed. A failure is
    /// logged rather than thrown, because this runs during shutdown.
    /// </summary>
    /// <param name="consumer">The consumer to close.</param>
    private void CloseConsumer(IConsumer<Ignore, string> consumer)
    {
        try
        {
            consumer.Close();
        }
        catch (KafkaException ex)
        {
            _logger.LogWarning(ex, "關閉 Kafka Consumer 時發生錯誤");
        }
    }

    /// <summary>
    /// Ensures the required partitions exist, then inserts every record into
    /// <c>telemetry_history</c> with one batched round trip.
    /// </summary>
    /// <param name="records">Parsed telemetry records; nothing is done when empty.</param>
    /// <param name="stoppingToken">Cancels the batch execution.</param>
    private async Task BulkInsertToDatabaseAsync(
        IReadOnlyList<TelemetryRequest> records,
        CancellationToken stoppingToken)
    {
        if (records.Count == 0)
        {
            return;
        }

        // Make sure the partition tables exist before inserting.
        // NOTE(review): the set is keyed on the full timestamp, so EnsurePartitionExistsAsync
        // runs once per distinct instant. Deduplicating by partition key (e.g. date) would save
        // round trips, but that depends on PartitionManager's partitioning scheme. Confirm it first.
        var uniqueTimestamps = new HashSet<DateTimeOffset>();
        foreach (var data in records)
        {
            uniqueTimestamps.Add(data.Timestamp);
        }

        foreach (var timestamp in uniqueTimestamps)
        {
            await _partitionManager.EnsurePartitionExistsAsync(timestamp);
        }

        await using var batch = _dataSource.CreateBatch();
        foreach (var data in records)
        {
            var cmd = batch.CreateBatchCommand();
            // $2 = longitude, $3 = latitude: ST_MakePoint takes (x, y). 4326 is the SRID
            // assigned to the point.
            cmd.CommandText =
                @"INSERT INTO telemetry_history (id, device_id, geom, timestamp) VALUES (gen_random_uuid(), $1, ST_SetSRID(ST_MakePoint($2, $3), 4326), $4)";
            cmd.Parameters.AddWithValue(data.DeviceId ?? string.Empty);
            cmd.Parameters.AddWithValue(data.Lng);
            cmd.Parameters.AddWithValue(data.Lat);
            cmd.Parameters.AddWithValue(data.Timestamp);
            batch.BatchCommands.Add(cmd);
        }

        await batch.ExecuteNonQueryAsync(stoppingToken);
    }

    /// <summary>
    /// Bulk-creates one Elasticsearch document per record in the
    /// <c>geopulse-telemetry-yyyy.MM.dd</c> index for the current UTC date. The write is
    /// best-effort: a failed response is logged, not thrown.
    /// </summary>
    /// <param name="records">Parsed telemetry records; nothing is sent when empty.</param>
    /// <param name="stoppingToken">Cancels the bulk request.</param>
    private async Task BulkInsertToElasticsearchAsync(
        IReadOnlyList<TelemetryRequest> records,
        CancellationToken stoppingToken)
    {
        // Skip the round trip when there is nothing to index. An empty bulk body is presumably
        // rejected by Elasticsearch anyway.
        if (records.Count == 0)
        {
            return;
        }

        var documents = new List<object>(records.Count);
        foreach (var data in records)
        {
            documents.Add(new
            {
                deviceId = data.DeviceId,
                timestamp = data.Timestamp,
                location = new { lat = data.Lat, lon = data.Lng }
            });
        }

        // NOTE(review): the index is named after processing time (UtcNow), not the event
        // Timestamp. Replayed or late events therefore land in the current day's index, while
        // PostgreSQL partitions by event timestamp. Confirm this is intended.
        var indexName = $"geopulse-telemetry-{DateTime.UtcNow:yyyy.MM.dd}";
        var response = await _esClient.BulkAsync(b => b
            .Index(indexName)
            .CreateMany(documents), stoppingToken);

        if (!response.IsValidResponse)
        {
            _logger.LogError("寫入 Elasticsearch 失敗: {Error}", response.DebugInformation);
        }
    }
}