diff --git a/GeoPulse Pipeline/GeoPulse Pipeline.csproj b/GeoPulse Pipeline/GeoPulse Pipeline.csproj
index d845a22..391d8b4 100644
--- a/GeoPulse Pipeline/GeoPulse Pipeline.csproj
+++ b/GeoPulse Pipeline/GeoPulse Pipeline.csproj
@@ -13,6 +13,8 @@
+
+
diff --git a/GeoPulse Pipeline/KafkaConsumer.cs b/GeoPulse Pipeline/KafkaConsumer.cs
index cd3b096..0466806 100644
--- a/GeoPulse Pipeline/KafkaConsumer.cs
+++ b/GeoPulse Pipeline/KafkaConsumer.cs
@@ -46,7 +46,7 @@ public class KafkaConsumer : BackgroundService
while (!stoppingToken.IsCancellationRequested)
{
- try // 💡 關鍵修正:try 放在 while 裡面
+ try
{
var consumeResult = consumer.Consume(TimeSpan.FromMilliseconds(500));
@@ -68,7 +68,7 @@ public class KafkaConsumer : BackgroundService
.Select(g => new TopicPartitionOffset(g.Key, g.Last().Offset + 1)).ToList();
consumer.Commit(offsets);
- _logger.LogInformation($"✅ [成功] 已寫入 {buffer.Count} 筆資料至 PostgreSQL。");
+ _logger.LogInformation($"Kafka已寫入DB {buffer.Count} 筆資料至 PostgreSQL。");
buffer.Clear();
lastBatchTime = DateTime.UtcNow;
@@ -76,14 +76,11 @@ public class KafkaConsumer : BackgroundService
}
catch (Exception ex)
{
- // 💡 就算這裡噴錯,程式也只是印出 Log,然後回到 while 頂部繼續跑下一輪
- _logger.LogError(ex, "❌ [失敗] 批次處理發生錯誤!原因: {Message}", ex.Message);
+ _logger.LogError(ex, "Kafka寫入DB批次處理發生錯誤!原因: {Message}", ex.Message);
- // 發生錯誤後,為了避免壞資料卡死 buffer 導致無窮重試,這裡建議清空
buffer.Clear();
lastBatchTime = DateTime.UtcNow;
- // 稍微停 2 秒,避免在連續錯誤時瘋狂刷屏
await Task.Delay(2000, stoppingToken);
}
}
diff --git a/GeoPulse Pipeline/Program.cs b/GeoPulse Pipeline/Program.cs
index adfd00f..9941d53 100644
--- a/GeoPulse Pipeline/Program.cs
+++ b/GeoPulse Pipeline/Program.cs
@@ -5,8 +5,27 @@ using Dapper;
using GeoPulse_Pipeline;
using Microsoft.AspNetCore.Mvc;
using Npgsql;
+using Serilog;
+using Serilog.Sinks.Elasticsearch;
+
+Log.Logger = new LoggerConfiguration()
+ .MinimumLevel.Information()
+ .MinimumLevel.Override("Microsoft", Serilog.Events.LogEventLevel.Warning)
+ .MinimumLevel.Override("System.Data", Serilog.Events.LogEventLevel.Warning)
+ .MinimumLevel.Override("Npgsql", Serilog.Events.LogEventLevel.Warning)
+ .Enrich.FromLogContext()
+ .WriteTo.Console()
+ .WriteTo.Elasticsearch(new ElasticsearchSinkOptions(new Uri("http://192.168.0.110:9200"))
+ {
+ AutoRegisterTemplate = true,
+ AutoRegisterTemplateVersion = AutoRegisterTemplateVersion.ESv8,
+ IndexFormat = "geopulse-logs-{0:yyyy.MM.dd}",
+ InlineFields = true
+ })
+ .CreateLogger();
var builder = WebApplication.CreateBuilder(args);
+builder.Host.UseSerilog();
var kafkaHost = builder.Configuration.GetValue("KafkaHost");
var producerConfig = new ProducerConfig()
{
@@ -37,10 +56,14 @@ app.MapPost("/api/telemetry", async (
[FromBody] TelemetryRequest request,
[FromQuery] bool useKafka,
IProducer producer,
- IDbConnection dbConnection) =>
+ IDbConnection dbConnection,
+ ILogger logger) =>
{
if (string.IsNullOrWhiteSpace(request.DeviceId))
+ {
+ logger.LogWarning("拒絕接收資料:DeviceID為空。");
return Results.BadRequest("Device ID is required.");
+ }
if (useKafka)
{
@@ -52,7 +75,7 @@ app.MapPost("/api/telemetry", async (
}
catch (ProduceException ex)
{
- Console.WriteLine($"Kafka Delivery failed: {ex.Error.Reason}");
+ logger.LogError(ex, "Kafka 寫入失敗: {Reason}", ex.Error.Reason);
return Results.Problem("Kafka 寫入失敗");
}
}
@@ -68,7 +91,7 @@ app.MapPost("/api/telemetry", async (
}
catch (Exception ex)
{
- Console.WriteLine($"DB Insert failed: {ex.Message}");
+ logger.LogError(ex, "資料庫直接寫入失敗: {Message}", ex.Message);
return Results.Problem("資料庫寫入失敗");
}
}