From bedaaa800f543ad350fc78d07ce683b4bb644610 Mon Sep 17 00:00:00 2001 From: saingchildren <80457007+saingchildren@users.noreply.github.com> Date: Wed, 29 Apr 2026 16:57:06 +0800 Subject: [PATCH] feat: using dapper operate db --- GeoPulse Pipeline/GeoPulse Pipeline.csproj | 1 + GeoPulse Pipeline/KafkaConsumer.cs | 48 +++++++----- GeoPulse Pipeline/Program.cs | 86 ++++++++++------------ 3 files changed, 69 insertions(+), 66 deletions(-) diff --git a/GeoPulse Pipeline/GeoPulse Pipeline.csproj b/GeoPulse Pipeline/GeoPulse Pipeline.csproj index eddf4fe..d845a22 100644 --- a/GeoPulse Pipeline/GeoPulse Pipeline.csproj +++ b/GeoPulse Pipeline/GeoPulse Pipeline.csproj @@ -12,6 +12,7 @@ + diff --git a/GeoPulse Pipeline/KafkaConsumer.cs b/GeoPulse Pipeline/KafkaConsumer.cs index 49bcc4d..cf26d4c 100644 --- a/GeoPulse Pipeline/KafkaConsumer.cs +++ b/GeoPulse Pipeline/KafkaConsumer.cs @@ -1,3 +1,4 @@ +using System.Data; using System.Text.Json; using Confluent.Kafka; using Npgsql; @@ -7,18 +8,17 @@ namespace GeoPulse_Pipeline; public class KafkaConsumer : BackgroundService { private readonly ILogger _logger; - private readonly string _connectionString; + private readonly NpgsqlDataSource _dataSource; private readonly string _kafkaHost; private const int BatchSize = 1000; private readonly TimeSpan MaxWaitTime = TimeSpan.FromSeconds(2); - public KafkaConsumer(ILogger logger, IConfiguration configuration) + public KafkaConsumer(ILogger logger, IConfiguration configuration, NpgsqlDataSource dataSource) { _logger = logger; - _connectionString = configuration.GetConnectionString("DefaultConnection") - ?? throw new ArgumentNullException("資料庫連線字串未設定"); _kafkaHost = configuration["KafkaHost"]; + _dataSource = dataSource; } protected override async Task ExecuteAsync(CancellationToken stoppingToken) @@ -28,20 +28,25 @@ public class KafkaConsumer : BackgroundService var consumerConfig = new ConsumerConfig { BootstrapServers = _kafkaHost, - GroupId = "telemetry-db-writer-group", + GroupId = "telemetry-db-writer-group", // 💡 建議換個名字,強迫 Kafka 重置 Offset AutoOffsetReset = AutoOffsetReset.Earliest, EnableAutoCommit = false }; - using var consumer = new ConsumerBuilder(consumerConfig).Build(); + using var consumer = new ConsumerBuilder(consumerConfig) + .SetErrorHandler((_, e) => _logger.LogError($"Kafka 底層錯誤: {e.Reason}")) + .Build(); + consumer.Subscribe("telemetry-events"); var buffer = new List>(); var lastBatchTime = DateTime.UtcNow; - try + _logger.LogInformation("🚀 Consumer 已啟動,開始監控 Kafka 訊息..."); + + while (!stoppingToken.IsCancellationRequested) { - while (!stoppingToken.IsCancellationRequested) + try // 💡 關鍵修正:try 放在 while 裡面 { var consumeResult = consumer.Consume(TimeSpan.FromMilliseconds(500)); @@ -55,27 +60,31 @@ public class KafkaConsumer : BackgroundService if (isBatchFull || isTimeUp) { + _logger.LogInformation($"[處理中] 準備批次寫入 {buffer.Count} 筆資料..."); + await BulkInsertToDatabaseAsync(buffer, stoppingToken); var latestOffset = buffer.Last(); consumer.Commit(latestOffset); - _logger.LogInformation($"成功批次寫入 {buffer.Count} 筆資料至 PostgreSQL。"); + _logger.LogInformation($"✅ [成功] 已寫入 {buffer.Count} 筆資料至 PostgreSQL。"); buffer.Clear(); lastBatchTime = DateTime.UtcNow; } } - } - catch (Exception ex) - { - _logger.LogError($"發生錯誤: {ex.Message} \n {ex.StackTrace}"); + catch (Exception ex) + { + // 💡 就算這裡噴錯,程式也只是印出 Log,然後回到 while 頂部繼續跑下一輪 + _logger.LogError(ex, "❌ [失敗] 批次處理發生錯誤!原因: {Message}", ex.Message); - buffer.Clear(); - } - finally - { - consumer.Close(); + // 發生錯誤後,為了避免壞資料卡死 buffer 導致無窮重試,這裡建議清空 + buffer.Clear(); + lastBatchTime = DateTime.UtcNow; + + // 稍微停 2 秒,避免在連續錯誤時瘋狂刷屏 + await Task.Delay(2000, stoppingToken); + } } } @@ -83,8 +92,7 @@ public class KafkaConsumer : BackgroundService CancellationToken stoppingToken) { var jsonOptions = new JsonSerializerOptions { PropertyNameCaseInsensitive = true }; - await using var dataSource = NpgsqlDataSource.Create(_connectionString); - await using var batch = dataSource.CreateBatch(); + await using var batch = _dataSource.CreateBatch(); foreach (var msg in buffer) { diff --git a/GeoPulse Pipeline/Program.cs b/GeoPulse Pipeline/Program.cs index dbaaa4d..adfd00f 100644 --- a/GeoPulse Pipeline/Program.cs +++ b/GeoPulse Pipeline/Program.cs @@ -1,5 +1,7 @@ +using System.Data; using System.Text.Json; using Confluent.Kafka; +using Dapper; using GeoPulse_Pipeline; using Microsoft.AspNetCore.Mvc; using Npgsql; @@ -12,12 +14,14 @@ var producerConfig = new ProducerConfig() Acks = Acks.Leader }; - +var connString = builder.Configuration.GetConnectionString("DefaultConnection"); var kafkaProducer = new ProducerBuilder(producerConfig).Build(); builder.Services.AddSingleton(kafkaProducer); builder.Services.AddEndpointsApiExplorer(); builder.Services.AddSwaggerGen(); builder.Services.AddHostedService(); +builder.Services.AddNpgsqlDataSource(connString); +builder.Services.AddScoped(sp => sp.GetRequiredService().CreateConnection()); var app = builder.Build(); @@ -30,54 +34,44 @@ if (app.Environment.IsDevelopment()) app.UseHttpsRedirection(); app.MapPost("/api/telemetry", async ( - [FromBody] TelemetryRequest request, - [FromQuery] bool useKafka, - IProducer producer, - IConfiguration config) => + [FromBody] TelemetryRequest request, + [FromQuery] bool useKafka, + IProducer producer, + IDbConnection dbConnection) => { - if (string.IsNullOrWhiteSpace(request.DeviceId)) - return Results.BadRequest("Device ID is required."); + if (string.IsNullOrWhiteSpace(request.DeviceId)) + return Results.BadRequest("Device ID is required."); - if (useKafka) - { - var message = JsonSerializer.Serialize(request); - try - { - await producer.ProduceAsync("telemetry-events", new Message { Value = message }); - return Results.Ok(new { Status = "Success", Route = "Kafka", Message = "已推入消息佇列" }); - } - catch (ProduceException ex) - { - Console.WriteLine($"Kafka Delivery failed: {ex.Error.Reason}"); - return Results.Problem("Kafka 寫入失敗"); - } - } - else - { - try - { - var connString = config.GetConnectionString("DefaultConnection"); - - await using var dataSource = NpgsqlDataSource.Create(connString!); - await using var cmd = dataSource.CreateCommand(@" - INSERT INTO telemetry_table (id, device_id, geom, timestamp) - VALUES (gen_random_uuid(), $1, ST_SetSRID(ST_MakePoint($2, $3), 4326), $4)"); + if (useKafka) + { + var message = JsonSerializer.Serialize(request); + try + { + await producer.ProduceAsync("telemetry-events", new Message { Value = message }); + return Results.Ok(new { Status = "Success", Route = "Kafka", Message = "已推入消息佇列" }); + } + catch (ProduceException ex) + { + Console.WriteLine($"Kafka Delivery failed: {ex.Error.Reason}"); + return Results.Problem("Kafka 寫入失敗"); + } + } + else + { + try + { + string sql = + @"INSERT INTO telemetry_history (id, device_id, geom, timestamp) VALUES (gen_random_uuid(), @DeviceId, @Lng, @Lat, @Timestamp"; + await dbConnection.ExecuteAsync(sql, request); - cmd.Parameters.AddWithValue(request.DeviceId); - cmd.Parameters.AddWithValue(request.Lng); - cmd.Parameters.AddWithValue(request.Lat); - cmd.Parameters.AddWithValue(request.Timestamp); - - await cmd.ExecuteNonQueryAsync(); - - return Results.Ok(new { Status = "Success", Route = "Direct-DB", Message = "已直接寫入資料庫" }); - } - catch (Exception ex) - { - Console.WriteLine($"DB Insert failed: {ex.Message}"); - return Results.Problem("資料庫寫入失敗"); - } - } + return Results.Ok(new { Status = "Success", Route = "Direct-DB", Message = "已直接寫入資料庫" }); + } + catch (Exception ex) + { + Console.WriteLine($"DB Insert failed: {ex.Message}"); + return Results.Problem("資料庫寫入失敗"); + } + } }); app.Lifetime.ApplicationStopping.Register(() =>