156 lines
4.4 KiB
C#
156 lines
4.4 KiB
C#
using System.Data;
|
||
using System.Text.Json;
|
||
using Confluent.Kafka;
|
||
using Elastic.Clients.Elasticsearch;
|
||
using Npgsql;
|
||
|
||
namespace GeoPulse_Pipeline;
|
||
|
||
/// <summary>
/// Background worker that reads telemetry JSON messages from the Kafka topic
/// <c>telemetry-events</c>, writes them in batches to PostgreSQL
/// (<c>telemetry_history</c>) and to an Elasticsearch index named after the
/// current UTC day, and commits the Kafka offsets only after a batch has been handled.
/// </summary>
/// <remarks>
/// Delivery is at-least-once: when handling a batch throws (database error,
/// Elasticsearch transport error, commit failure, ...), the consumer is rewound
/// to the start of the batch and the messages are consumed again after
/// <see cref="RetryDelay"/>. Rows written before the failure may therefore be
/// written a second time, and a batch that keeps failing is retried indefinitely.
/// Malformed or empty messages are logged and skipped so they cannot block a
/// partition. Elasticsearch indexing is best-effort: a bulk response reported as
/// invalid is logged but does not fail the batch.
/// </remarks>
public class KafkaConsumer : BackgroundService
{
    private const string TopicName = "telemetry-events";
    private const string ConsumerGroupId = "telemetry-db-writer-group";

    private const string InsertTelemetrySql =
        @"INSERT INTO telemetry_history (id, device_id, geom, timestamp) VALUES (gen_random_uuid(), $1, ST_SetSRID(ST_MakePoint($2, $3), 4326), $4)";

    /// <summary>Flush the buffer once it holds this many messages.</summary>
    private const int BatchSize = 1000;

    /// <summary>Flush a non-empty buffer once this long has passed since the previous flush.</summary>
    private static readonly TimeSpan MaxWaitTime = TimeSpan.FromSeconds(2);

    /// <summary>Maximum time a single <c>Consume</c> call blocks waiting for a message.</summary>
    private static readonly TimeSpan PollTimeout = TimeSpan.FromMilliseconds(500);

    /// <summary>Back-off before consuming again after a failed batch.</summary>
    private static readonly TimeSpan RetryDelay = TimeSpan.FromSeconds(2);

    // One shared instance: JsonSerializerOptions caches serialization metadata,
    // so allocating a new one per batch throws that cache away every time.
    private static readonly JsonSerializerOptions JsonOptions = new() { PropertyNameCaseInsensitive = true };

    private readonly ILogger<KafkaConsumer> _logger;
    private readonly NpgsqlDataSource _dataSource;
    private readonly string _kafkaHost;
    private readonly ElasticsearchClient _esClient;

    /// <summary>Creates the worker.</summary>
    /// <param name="logger">Logger for progress and error messages.</param>
    /// <param name="configuration">Application configuration; must provide <c>KafkaHost</c> (the Kafka bootstrap servers).</param>
    /// <param name="dataSource">PostgreSQL data source used for the batched inserts.</param>
    /// <param name="esClient">Elasticsearch client used for bulk indexing.</param>
    /// <exception cref="InvalidOperationException"><c>KafkaHost</c> is missing or blank.</exception>
    public KafkaConsumer(
        ILogger<KafkaConsumer> logger,
        IConfiguration configuration,
        NpgsqlDataSource dataSource,
        ElasticsearchClient esClient)
    {
        var kafkaHost = configuration["KafkaHost"];
        if (string.IsNullOrWhiteSpace(kafkaHost))
        {
            // Fail fast: without bootstrap servers the consumer has no broker to connect to.
            throw new InvalidOperationException("Configuration value 'KafkaHost' (Kafka bootstrap servers) is missing.");
        }

        _logger = logger;
        _kafkaHost = kafkaHost;
        _dataSource = dataSource;
        _esClient = esClient;
    }

    /// <summary>
    /// Consume loop: buffers messages until <see cref="BatchSize"/> is reached or
    /// <see cref="MaxWaitTime"/> has elapsed, then persists and commits the batch.
    /// </summary>
    /// <param name="stoppingToken">Signalled when the host is shutting down.</param>
    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        // Consume() below blocks; yield first so the host's startup is not held up
        // by this method running synchronously.
        await Task.Yield();

        var consumerConfig = new ConsumerConfig
        {
            BootstrapServers = _kafkaHost,
            GroupId = ConsumerGroupId,
            AutoOffsetReset = AutoOffsetReset.Earliest,
            // Offsets are committed manually, only after a batch has been persisted.
            EnableAutoCommit = false
        };

        using var consumer = new ConsumerBuilder<Ignore, string>(consumerConfig)
            .SetErrorHandler((_, e) => _logger.LogError("Kafka 底層錯誤: {Reason}", e.Reason))
            .Build();

        consumer.Subscribe(TopicName);

        var buffer = new List<ConsumeResult<Ignore, string>>(BatchSize);
        var lastBatchTime = DateTime.UtcNow;

        _logger.LogInformation("Consumer 已啟動,開始監控 Kafka 訊息...");

        try
        {
            while (!stoppingToken.IsCancellationRequested)
            {
                try
                {
                    var consumeResult = consumer.Consume(PollTimeout);
                    if (consumeResult is not null)
                    {
                        buffer.Add(consumeResult);
                    }

                    bool isBatchFull = buffer.Count >= BatchSize;
                    bool isTimeUp = buffer.Count > 0 && DateTime.UtcNow - lastBatchTime >= MaxWaitTime;
                    if (!isBatchFull && !isTimeUp)
                    {
                        continue;
                    }

                    await ProcessBatchAsync(consumer, buffer, stoppingToken);

                    buffer.Clear();
                    lastBatchTime = DateTime.UtcNow;
                }
                catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested)
                {
                    // Shutdown requested mid-batch: the buffered offsets were not committed,
                    // so those messages are consumed again on the next start.
                    break;
                }
                catch (Exception ex)
                {
                    _logger.LogError(ex, "Kafka寫入DB批次處理發生錯誤!原因: {Message}", ex.Message);

                    // Without rewinding, the next successful Commit would move the group
                    // offset past these messages and they would be lost.
                    RewindToBatchStart(consumer, buffer);

                    buffer.Clear();
                    lastBatchTime = DateTime.UtcNow;

                    await Task.Delay(RetryDelay, stoppingToken);
                }
            }
        }
        finally
        {
            try
            {
                // Leave the consumer group cleanly. Auto-commit is off, so this commits nothing.
                consumer.Close();
            }
            catch (KafkaException ex)
            {
                _logger.LogWarning(ex, "Closing the Kafka consumer failed.");
            }
        }
    }

    /// <summary>
    /// Writes one batch to PostgreSQL and Elasticsearch, then commits the offset
    /// following the highest buffered offset of every partition in the batch.
    /// </summary>
    /// <param name="consumer">Consumer the batch was read from; used for the commit.</param>
    /// <param name="buffer">Messages consumed since the previous flush.</param>
    /// <param name="stoppingToken">Cancellation token forwarded to the writes.</param>
    private async Task ProcessBatchAsync(
        IConsumer<Ignore, string> consumer,
        IReadOnlyCollection<ConsumeResult<Ignore, string>> buffer,
        CancellationToken stoppingToken)
    {
        _logger.LogInformation("[處理中] 準備批次寫入 {Count} 筆資料...", buffer.Count);

        var records = ParseMessages(buffer);

        // If every message was skipped there is nothing to write, but the offsets are
        // still committed so the skipped messages are not consumed again.
        if (records.Count > 0)
        {
            await BulkInsertToDatabaseAsync(records, stoppingToken);
            await BulkInsertToElasticsearchAsync(records, stoppingToken);
        }

        // Kafka expects the offset of the NEXT message to read, hence +1.
        var offsets = buffer
            .GroupBy(msg => msg.TopicPartition)
            .Select(g => new TopicPartitionOffset(g.Key, new Offset(g.Max(msg => msg.Offset.Value) + 1)))
            .ToList();
        consumer.Commit(offsets);

        _logger.LogInformation("Kafka已寫入DB {Count} 筆資料至 PostgreSQL。", records.Count);
    }

    /// <summary>
    /// Seeks every partition present in a failed batch back to its lowest buffered
    /// offset, so the batch is consumed again instead of being skipped.
    /// </summary>
    /// <param name="consumer">Consumer whose position is rewound.</param>
    /// <param name="buffer">The uncommitted messages of the failed batch.</param>
    private void RewindToBatchStart(
        IConsumer<Ignore, string> consumer,
        IReadOnlyCollection<ConsumeResult<Ignore, string>> buffer)
    {
        foreach (var group in buffer.GroupBy(msg => msg.TopicPartition))
        {
            var batchStart = new TopicPartitionOffset(group.Key, new Offset(group.Min(msg => msg.Offset.Value)));
            try
            {
                consumer.Seek(batchStart);
            }
            catch (KafkaException ex)
            {
                // e.g. the partition is no longer assigned after a rebalance; presumably
                // its next owner resumes from the last committed offset, which precedes this batch.
                _logger.LogWarning(ex, "Could not rewind to {TopicPartitionOffset} after a failed batch.", batchStart);
            }
        }
    }

    /// <summary>
    /// Deserializes the buffered messages. Null, malformed or JSON-<c>null</c> payloads
    /// are logged and skipped, so a single bad message cannot fail the whole batch.
    /// </summary>
    /// <param name="buffer">Messages consumed since the previous flush.</param>
    /// <returns>The successfully parsed records, in buffer order.</returns>
    private List<TelemetryRequest> ParseMessages(IReadOnlyCollection<ConsumeResult<Ignore, string>> buffer)
    {
        var records = new List<TelemetryRequest>(buffer.Count);

        foreach (var msg in buffer)
        {
            var json = msg.Message?.Value;
            if (json is null)
            {
                _logger.LogWarning("Skipping telemetry message without a value at {TopicPartitionOffset}.", msg.TopicPartitionOffset);
                continue;
            }

            try
            {
                var data = JsonSerializer.Deserialize<TelemetryRequest>(json, JsonOptions);
                if (data is not null)
                {
                    records.Add(data);
                }
            }
            catch (JsonException ex)
            {
                _logger.LogWarning(ex, "Skipping malformed telemetry message at {TopicPartitionOffset}.", msg.TopicPartitionOffset);
            }
        }

        return records;
    }

    /// <summary>
    /// Inserts the records into <c>telemetry_history</c> as a single Npgsql batch
    /// (one round trip), building each point as <c>ST_MakePoint(lng, lat)</c> in SRID 4326.
    /// </summary>
    /// <param name="records">Parsed telemetry records; expected to be non-empty.</param>
    /// <param name="stoppingToken">Cancellation token for the database call.</param>
    private async Task BulkInsertToDatabaseAsync(IReadOnlyList<TelemetryRequest> records,
        CancellationToken stoppingToken)
    {
        await using var batch = _dataSource.CreateBatch();

        foreach (var data in records)
        {
            var cmd = batch.CreateBatchCommand();
            cmd.CommandText = InsertTelemetrySql;

            // Positional parameters $1..$4, in the order they appear in the SQL.
            cmd.Parameters.AddWithValue(data.DeviceId ?? string.Empty);
            cmd.Parameters.AddWithValue(data.Lng);
            cmd.Parameters.AddWithValue(data.Lat);
            cmd.Parameters.AddWithValue(data.Timestamp);

            batch.BatchCommands.Add(cmd);
        }

        await batch.ExecuteNonQueryAsync(stoppingToken);
    }

    /// <summary>
    /// Bulk-indexes the records into <c>geopulse-telemetry-yyyy.MM.dd</c> (the UTC date at
    /// write time) using create operations. An invalid response is logged and not rethrown.
    /// </summary>
    /// <param name="records">Parsed telemetry records; expected to be non-empty.</param>
    /// <param name="stoppingToken">Cancellation token for the bulk request.</param>
    private async Task BulkInsertToElasticsearchAsync(IReadOnlyList<TelemetryRequest> records,
        CancellationToken stoppingToken)
    {
        var documents = new List<object>(records.Count);

        foreach (var data in records)
        {
            documents.Add(new
            {
                deviceId = data.DeviceId,
                timestamp = data.Timestamp,
                location = new { lat = data.Lat, lon = data.Lng }
            });
        }

        var indexName = $"geopulse-telemetry-{DateTime.UtcNow:yyyy.MM.dd}";

        var response = await _esClient.BulkAsync(b => b
            .Index(indexName)
            .CreateMany(documents), stoppingToken);

        if (!response.IsValidResponse)
        {
            _logger.LogError("寫入 Elasticsearch 失敗: {Error}", response.DebugInformation);
        }
    }
}