diff --git a/GeoPulse Pipeline/GeoPulse Pipeline.csproj b/GeoPulse Pipeline/GeoPulse Pipeline.csproj index 391d8b4..1803278 100644 --- a/GeoPulse Pipeline/GeoPulse Pipeline.csproj +++ b/GeoPulse Pipeline/GeoPulse Pipeline.csproj @@ -10,6 +10,7 @@ + diff --git a/GeoPulse Pipeline/KafkaConsumer.cs b/GeoPulse Pipeline/KafkaConsumer.cs index 0466806..1607058 100644 --- a/GeoPulse Pipeline/KafkaConsumer.cs +++ b/GeoPulse Pipeline/KafkaConsumer.cs @@ -1,6 +1,7 @@ using System.Data; using System.Text.Json; using Confluent.Kafka; +using Elastic.Clients.Elasticsearch; using Npgsql; namespace GeoPulse_Pipeline; @@ -10,15 +11,22 @@ public class KafkaConsumer : BackgroundService private readonly ILogger _logger; private readonly NpgsqlDataSource _dataSource; private readonly string _kafkaHost; + private readonly ElasticsearchClient _esClient; private const int BatchSize = 1000; private readonly TimeSpan MaxWaitTime = TimeSpan.FromSeconds(2); - public KafkaConsumer(ILogger logger, IConfiguration configuration, NpgsqlDataSource dataSource) + + public KafkaConsumer( + ILogger logger, + IConfiguration configuration, + NpgsqlDataSource dataSource, + ElasticsearchClient esClient) { _logger = logger; _kafkaHost = configuration["KafkaHost"]; _dataSource = dataSource; + _esClient = esClient; } protected override async Task ExecuteAsync(CancellationToken stoppingToken) @@ -28,7 +36,7 @@ public class KafkaConsumer : BackgroundService var consumerConfig = new ConsumerConfig { BootstrapServers = _kafkaHost, - GroupId = "telemetry-db-writer-group", // 💡 建議換個名字,強迫 Kafka 重置 Offset + GroupId = "telemetry-db-writer-group", AutoOffsetReset = AutoOffsetReset.Earliest, EnableAutoCommit = false }; @@ -42,7 +50,7 @@ public class KafkaConsumer : BackgroundService var buffer = new List>(); var lastBatchTime = DateTime.UtcNow; - _logger.LogInformation("🚀 Consumer 已啟動,開始監控 Kafka 訊息..."); + _logger.LogInformation("Consumer 已啟動,開始監控 Kafka 訊息..."); while (!stoppingToken.IsCancellationRequested) { @@ -63,6 +71,7 @@ public class KafkaConsumer : BackgroundService _logger.LogInformation($"[處理中] 準備批次寫入 {buffer.Count} 筆資料..."); await BulkInsertToDatabaseAsync(buffer, stoppingToken); + await BulkInsertToElasticsearchAsync(buffer, stoppingToken); var offsets = buffer.GroupBy(msg => msg.TopicPartition) .Select(g => new TopicPartitionOffset(g.Key, g.Last().Offset + 1)).ToList(); @@ -90,6 +99,7 @@ public class KafkaConsumer : BackgroundService CancellationToken stoppingToken) { var jsonOptions = new JsonSerializerOptions { PropertyNameCaseInsensitive = true }; + await using var batch = _dataSource.CreateBatch(); foreach (var msg in buffer) @@ -111,4 +121,36 @@ public class KafkaConsumer : BackgroundService await batch.ExecuteNonQueryAsync(stoppingToken); } + + private async Task BulkInsertToElasticsearchAsync(List> buffer, + CancellationToken stoppingToken) + { + var jsonOptions = new JsonSerializerOptions { PropertyNameCaseInsensitive = true }; + + var documents = new List(); + + foreach (var msg in buffer) + { + var data = JsonSerializer.Deserialize(msg.Message.Value, jsonOptions); + if (data == null) continue; + + documents.Add(new + { + deviceId = data.DeviceId, + timestamp = data.Timestamp, + location = new { lat = data.Lat, lon = data.Lng } + }); + } + + var indexName = $"geopulse-telemetry-{DateTime.UtcNow:yyyy.MM.dd}"; + + var response = await _esClient.BulkAsync(b => b + .Index(indexName) + .CreateMany(documents), stoppingToken); + + if (!response.IsValidResponse) + { + _logger.LogError("寫入 Elasticsearch 失敗: {Error}", response.DebugInformation); + } + } } \ No newline at end of file diff --git a/GeoPulse Pipeline/Program.cs b/GeoPulse Pipeline/Program.cs index 9941d53..1977f43 100644 --- a/GeoPulse Pipeline/Program.cs +++ b/GeoPulse Pipeline/Program.cs @@ -7,6 +7,7 @@ using Microsoft.AspNetCore.Mvc; using Npgsql; using Serilog; using Serilog.Sinks.Elasticsearch; +using Elastic.Clients.Elasticsearch; Log.Logger = new LoggerConfiguration() .MinimumLevel.Information() @@ -26,6 +27,9 @@ Log.Logger = new LoggerConfiguration() var builder = WebApplication.CreateBuilder(args); builder.Host.UseSerilog(); +var esSettings = new ElasticsearchClientSettings(new Uri("http://192.168.0.110:9200")); +var esClient = new ElasticsearchClient(esSettings); +builder.Services.AddSingleton(esClient); var kafkaHost = builder.Configuration.GetValue("KafkaHost"); var producerConfig = new ProducerConfig() {