feat: add elsaticsearch

This commit is contained in:
saingchildren 2026-05-02 09:55:17 +08:00
parent 069e3cbe96
commit 967020a1c7
3 changed files with 50 additions and 3 deletions

View File

@ -10,6 +10,7 @@
<ItemGroup> <ItemGroup>
<PackageReference Include="Confluent.Kafka" Version="2.14.0" /> <PackageReference Include="Confluent.Kafka" Version="2.14.0" />
<PackageReference Include="Dapper" Version="2.1.72" /> <PackageReference Include="Dapper" Version="2.1.72" />
<PackageReference Include="Elastic.Clients.Elasticsearch" Version="9.3.6" />
<PackageReference Include="Microsoft.AspNetCore.OpenApi" Version="8.0.5" /> <PackageReference Include="Microsoft.AspNetCore.OpenApi" Version="8.0.5" />
<PackageReference Include="Npgsql" Version="10.0.2" /> <PackageReference Include="Npgsql" Version="10.0.2" />
<PackageReference Include="Npgsql.DependencyInjection" Version="10.0.2" /> <PackageReference Include="Npgsql.DependencyInjection" Version="10.0.2" />

View File

@ -1,6 +1,7 @@
using System.Data; using System.Data;
using System.Text.Json; using System.Text.Json;
using Confluent.Kafka; using Confluent.Kafka;
using Elastic.Clients.Elasticsearch;
using Npgsql; using Npgsql;
namespace GeoPulse_Pipeline; namespace GeoPulse_Pipeline;
@ -10,15 +11,22 @@ public class KafkaConsumer : BackgroundService
private readonly ILogger<KafkaConsumer> _logger; private readonly ILogger<KafkaConsumer> _logger;
private readonly NpgsqlDataSource _dataSource; private readonly NpgsqlDataSource _dataSource;
private readonly string _kafkaHost; private readonly string _kafkaHost;
private readonly ElasticsearchClient _esClient;
private const int BatchSize = 1000; private const int BatchSize = 1000;
private readonly TimeSpan MaxWaitTime = TimeSpan.FromSeconds(2); private readonly TimeSpan MaxWaitTime = TimeSpan.FromSeconds(2);
public KafkaConsumer(ILogger<KafkaConsumer> logger, IConfiguration configuration, NpgsqlDataSource dataSource)
public KafkaConsumer(
ILogger<KafkaConsumer> logger,
IConfiguration configuration,
NpgsqlDataSource dataSource,
ElasticsearchClient esClient)
{ {
_logger = logger; _logger = logger;
_kafkaHost = configuration["KafkaHost"]; _kafkaHost = configuration["KafkaHost"];
_dataSource = dataSource; _dataSource = dataSource;
_esClient = esClient;
} }
protected override async Task ExecuteAsync(CancellationToken stoppingToken) protected override async Task ExecuteAsync(CancellationToken stoppingToken)
@ -28,7 +36,7 @@ public class KafkaConsumer : BackgroundService
var consumerConfig = new ConsumerConfig var consumerConfig = new ConsumerConfig
{ {
BootstrapServers = _kafkaHost, BootstrapServers = _kafkaHost,
GroupId = "telemetry-db-writer-group", // 💡 建議換個名字,強迫 Kafka 重置 Offset GroupId = "telemetry-db-writer-group",
AutoOffsetReset = AutoOffsetReset.Earliest, AutoOffsetReset = AutoOffsetReset.Earliest,
EnableAutoCommit = false EnableAutoCommit = false
}; };
@ -42,7 +50,7 @@ public class KafkaConsumer : BackgroundService
var buffer = new List<ConsumeResult<Ignore, string>>(); var buffer = new List<ConsumeResult<Ignore, string>>();
var lastBatchTime = DateTime.UtcNow; var lastBatchTime = DateTime.UtcNow;
_logger.LogInformation("🚀 Consumer 已啟動,開始監控 Kafka 訊息..."); _logger.LogInformation("Consumer 已啟動,開始監控 Kafka 訊息...");
while (!stoppingToken.IsCancellationRequested) while (!stoppingToken.IsCancellationRequested)
{ {
@ -63,6 +71,7 @@ public class KafkaConsumer : BackgroundService
_logger.LogInformation($"[處理中] 準備批次寫入 {buffer.Count} 筆資料..."); _logger.LogInformation($"[處理中] 準備批次寫入 {buffer.Count} 筆資料...");
await BulkInsertToDatabaseAsync(buffer, stoppingToken); await BulkInsertToDatabaseAsync(buffer, stoppingToken);
await BulkInsertToElasticsearchAsync(buffer, stoppingToken);
var offsets = buffer.GroupBy(msg => msg.TopicPartition) var offsets = buffer.GroupBy(msg => msg.TopicPartition)
.Select(g => new TopicPartitionOffset(g.Key, g.Last().Offset + 1)).ToList(); .Select(g => new TopicPartitionOffset(g.Key, g.Last().Offset + 1)).ToList();
@ -90,6 +99,7 @@ public class KafkaConsumer : BackgroundService
CancellationToken stoppingToken) CancellationToken stoppingToken)
{ {
var jsonOptions = new JsonSerializerOptions { PropertyNameCaseInsensitive = true }; var jsonOptions = new JsonSerializerOptions { PropertyNameCaseInsensitive = true };
await using var batch = _dataSource.CreateBatch(); await using var batch = _dataSource.CreateBatch();
foreach (var msg in buffer) foreach (var msg in buffer)
@ -111,4 +121,36 @@ public class KafkaConsumer : BackgroundService
await batch.ExecuteNonQueryAsync(stoppingToken); await batch.ExecuteNonQueryAsync(stoppingToken);
} }
private async Task BulkInsertToElasticsearchAsync(List<ConsumeResult<Ignore, string>> buffer,
CancellationToken stoppingToken)
{
var jsonOptions = new JsonSerializerOptions { PropertyNameCaseInsensitive = true };
var documents = new List<object>();
foreach (var msg in buffer)
{
var data = JsonSerializer.Deserialize<TelemetryRequest>(msg.Message.Value, jsonOptions);
if (data == null) continue;
documents.Add(new
{
deviceId = data.DeviceId,
timestamp = data.Timestamp,
location = new { lat = data.Lat, lon = data.Lng }
});
}
var indexName = $"geopulse-telemetry-{DateTime.UtcNow:yyyy.MM.dd}";
var response = await _esClient.BulkAsync(b => b
.Index(indexName)
.CreateMany(documents), stoppingToken);
if (!response.IsValidResponse)
{
_logger.LogError("寫入 Elasticsearch 失敗: {Error}", response.DebugInformation);
}
}
} }

View File

@ -7,6 +7,7 @@ using Microsoft.AspNetCore.Mvc;
using Npgsql; using Npgsql;
using Serilog; using Serilog;
using Serilog.Sinks.Elasticsearch; using Serilog.Sinks.Elasticsearch;
using Elastic.Clients.Elasticsearch;
Log.Logger = new LoggerConfiguration() Log.Logger = new LoggerConfiguration()
.MinimumLevel.Information() .MinimumLevel.Information()
@ -26,6 +27,9 @@ Log.Logger = new LoggerConfiguration()
var builder = WebApplication.CreateBuilder(args); var builder = WebApplication.CreateBuilder(args);
builder.Host.UseSerilog(); builder.Host.UseSerilog();
var esSettings = new ElasticsearchClientSettings(new Uri("http://192.168.0.110:9200"));
var esClient = new ElasticsearchClient(esSettings);
builder.Services.AddSingleton(esClient);
var kafkaHost = builder.Configuration.GetValue<string>("KafkaHost"); var kafkaHost = builder.Configuration.GetValue<string>("KafkaHost");
var producerConfig = new ProducerConfig() var producerConfig = new ProducerConfig()
{ {