Compare commits

..

3 Commits

Author SHA1 Message Date
saingchildren
33606ca9eb fix: 解決kafka會遺漏commit已處理的meesage的問題 2026-04-30 10:09:12 +08:00
saingchildren
33638bdd36 fix: let Dapper don't show sql command in terminal 2026-04-30 09:25:01 +08:00
saingchildren
bedaaa800f feat: using dapper operate db 2026-04-29 16:57:06 +08:00
4 changed files with 74 additions and 69 deletions

View File

@ -12,6 +12,7 @@
<PackageReference Include="Dapper" Version="2.1.72" /> <PackageReference Include="Dapper" Version="2.1.72" />
<PackageReference Include="Microsoft.AspNetCore.OpenApi" Version="8.0.5" /> <PackageReference Include="Microsoft.AspNetCore.OpenApi" Version="8.0.5" />
<PackageReference Include="Npgsql" Version="10.0.2" /> <PackageReference Include="Npgsql" Version="10.0.2" />
<PackageReference Include="Npgsql.DependencyInjection" Version="10.0.2" />
<PackageReference Include="Swashbuckle.AspNetCore" Version="6.4.0" /> <PackageReference Include="Swashbuckle.AspNetCore" Version="6.4.0" />
</ItemGroup> </ItemGroup>

View File

@ -1,3 +1,4 @@
using System.Data;
using System.Text.Json; using System.Text.Json;
using Confluent.Kafka; using Confluent.Kafka;
using Npgsql; using Npgsql;
@ -7,18 +8,17 @@ namespace GeoPulse_Pipeline;
public class KafkaConsumer : BackgroundService public class KafkaConsumer : BackgroundService
{ {
private readonly ILogger<KafkaConsumer> _logger; private readonly ILogger<KafkaConsumer> _logger;
private readonly string _connectionString; private readonly NpgsqlDataSource _dataSource;
private readonly string _kafkaHost; private readonly string _kafkaHost;
private const int BatchSize = 1000; private const int BatchSize = 1000;
private readonly TimeSpan MaxWaitTime = TimeSpan.FromSeconds(2); private readonly TimeSpan MaxWaitTime = TimeSpan.FromSeconds(2);
public KafkaConsumer(ILogger<KafkaConsumer> logger, IConfiguration configuration) public KafkaConsumer(ILogger<KafkaConsumer> logger, IConfiguration configuration, NpgsqlDataSource dataSource)
{ {
_logger = logger; _logger = logger;
_connectionString = configuration.GetConnectionString("DefaultConnection")
?? throw new ArgumentNullException("資料庫連線字串未設定");
_kafkaHost = configuration["KafkaHost"]; _kafkaHost = configuration["KafkaHost"];
_dataSource = dataSource;
} }
protected override async Task ExecuteAsync(CancellationToken stoppingToken) protected override async Task ExecuteAsync(CancellationToken stoppingToken)
@ -28,20 +28,25 @@ public class KafkaConsumer : BackgroundService
var consumerConfig = new ConsumerConfig var consumerConfig = new ConsumerConfig
{ {
BootstrapServers = _kafkaHost, BootstrapServers = _kafkaHost,
GroupId = "telemetry-db-writer-group", GroupId = "telemetry-db-writer-group", // 💡 建議換個名字,強迫 Kafka 重置 Offset
AutoOffsetReset = AutoOffsetReset.Earliest, AutoOffsetReset = AutoOffsetReset.Earliest,
EnableAutoCommit = false EnableAutoCommit = false
}; };
using var consumer = new ConsumerBuilder<Ignore, string>(consumerConfig).Build(); using var consumer = new ConsumerBuilder<Ignore, string>(consumerConfig)
.SetErrorHandler((_, e) => _logger.LogError($"Kafka 底層錯誤: {e.Reason}"))
.Build();
consumer.Subscribe("telemetry-events"); consumer.Subscribe("telemetry-events");
var buffer = new List<ConsumeResult<Ignore, string>>(); var buffer = new List<ConsumeResult<Ignore, string>>();
var lastBatchTime = DateTime.UtcNow; var lastBatchTime = DateTime.UtcNow;
try _logger.LogInformation("🚀 Consumer 已啟動,開始監控 Kafka 訊息...");
while (!stoppingToken.IsCancellationRequested)
{ {
while (!stoppingToken.IsCancellationRequested) try // 💡 關鍵修正try 放在 while 裡面
{ {
var consumeResult = consumer.Consume(TimeSpan.FromMilliseconds(500)); var consumeResult = consumer.Consume(TimeSpan.FromMilliseconds(500));
@ -55,27 +60,32 @@ public class KafkaConsumer : BackgroundService
if (isBatchFull || isTimeUp) if (isBatchFull || isTimeUp)
{ {
_logger.LogInformation($"[處理中] 準備批次寫入 {buffer.Count} 筆資料...");
await BulkInsertToDatabaseAsync(buffer, stoppingToken); await BulkInsertToDatabaseAsync(buffer, stoppingToken);
var latestOffset = buffer.Last(); var offsets = buffer.GroupBy(msg => msg.TopicPartition)
consumer.Commit(latestOffset); .Select(g => new TopicPartitionOffset(g.Key, g.Last().Offset + 1)).ToList();
consumer.Commit(offsets);
_logger.LogInformation($"成功批次寫入 {buffer.Count} 筆資料至 PostgreSQL。"); _logger.LogInformation($"✅ [成功] 已寫入 {buffer.Count} 筆資料至 PostgreSQL。");
buffer.Clear(); buffer.Clear();
lastBatchTime = DateTime.UtcNow; lastBatchTime = DateTime.UtcNow;
} }
} }
} catch (Exception ex)
catch (Exception ex) {
{ // 💡 就算這裡噴錯,程式也只是印出 Log然後回到 while 頂部繼續跑下一輪
_logger.LogError($"發生錯誤: {ex.Message} \n {ex.StackTrace}"); _logger.LogError(ex, "❌ [失敗] 批次處理發生錯誤!原因: {Message}", ex.Message);
buffer.Clear(); // 發生錯誤後,為了避免壞資料卡死 buffer 導致無窮重試,這裡建議清空
} buffer.Clear();
finally lastBatchTime = DateTime.UtcNow;
{
consumer.Close(); // 稍微停 2 秒,避免在連續錯誤時瘋狂刷屏
await Task.Delay(2000, stoppingToken);
}
} }
} }
@ -83,8 +93,7 @@ public class KafkaConsumer : BackgroundService
CancellationToken stoppingToken) CancellationToken stoppingToken)
{ {
var jsonOptions = new JsonSerializerOptions { PropertyNameCaseInsensitive = true }; var jsonOptions = new JsonSerializerOptions { PropertyNameCaseInsensitive = true };
await using var dataSource = NpgsqlDataSource.Create(_connectionString); await using var batch = _dataSource.CreateBatch();
await using var batch = dataSource.CreateBatch();
foreach (var msg in buffer) foreach (var msg in buffer)
{ {

View File

@ -1,5 +1,7 @@
using System.Data;
using System.Text.Json; using System.Text.Json;
using Confluent.Kafka; using Confluent.Kafka;
using Dapper;
using GeoPulse_Pipeline; using GeoPulse_Pipeline;
using Microsoft.AspNetCore.Mvc; using Microsoft.AspNetCore.Mvc;
using Npgsql; using Npgsql;
@ -12,12 +14,14 @@ var producerConfig = new ProducerConfig()
Acks = Acks.Leader Acks = Acks.Leader
}; };
var connString = builder.Configuration.GetConnectionString("DefaultConnection");
var kafkaProducer = new ProducerBuilder<Null, string>(producerConfig).Build(); var kafkaProducer = new ProducerBuilder<Null, string>(producerConfig).Build();
builder.Services.AddSingleton(kafkaProducer); builder.Services.AddSingleton(kafkaProducer);
builder.Services.AddEndpointsApiExplorer(); builder.Services.AddEndpointsApiExplorer();
builder.Services.AddSwaggerGen(); builder.Services.AddSwaggerGen();
builder.Services.AddHostedService<KafkaConsumer>(); builder.Services.AddHostedService<KafkaConsumer>();
builder.Services.AddNpgsqlDataSource(connString);
builder.Services.AddScoped<IDbConnection>(sp => sp.GetRequiredService<NpgsqlDataSource>().CreateConnection());
var app = builder.Build(); var app = builder.Build();
@ -30,54 +34,44 @@ if (app.Environment.IsDevelopment())
app.UseHttpsRedirection(); app.UseHttpsRedirection();
app.MapPost("/api/telemetry", async ( app.MapPost("/api/telemetry", async (
[FromBody] TelemetryRequest request, [FromBody] TelemetryRequest request,
[FromQuery] bool useKafka, [FromQuery] bool useKafka,
IProducer<Null, string> producer, IProducer<Null, string> producer,
IConfiguration config) => IDbConnection dbConnection) =>
{ {
if (string.IsNullOrWhiteSpace(request.DeviceId)) if (string.IsNullOrWhiteSpace(request.DeviceId))
return Results.BadRequest("Device ID is required."); return Results.BadRequest("Device ID is required.");
if (useKafka) if (useKafka)
{ {
var message = JsonSerializer.Serialize(request); var message = JsonSerializer.Serialize(request);
try try
{ {
await producer.ProduceAsync("telemetry-events", new Message<Null, string> { Value = message }); await producer.ProduceAsync("telemetry-events", new Message<Null, string> { Value = message });
return Results.Ok(new { Status = "Success", Route = "Kafka", Message = "已推入消息佇列" }); return Results.Ok(new { Status = "Success", Route = "Kafka", Message = "已推入消息佇列" });
} }
catch (ProduceException<Null, string> ex) catch (ProduceException<Null, string> ex)
{ {
Console.WriteLine($"Kafka Delivery failed: {ex.Error.Reason}"); Console.WriteLine($"Kafka Delivery failed: {ex.Error.Reason}");
return Results.Problem("Kafka 寫入失敗"); return Results.Problem("Kafka 寫入失敗");
} }
} }
else else
{ {
try try
{ {
var connString = config.GetConnectionString("DefaultConnection"); string sql =
@"INSERT INTO telemetry_history (id, device_id, geom, timestamp) VALUES (gen_random_uuid(), @DeviceId, @Lng, @Lat, @Timestamp";
await using var dataSource = NpgsqlDataSource.Create(connString!); await dbConnection.ExecuteAsync(sql, request);
await using var cmd = dataSource.CreateCommand(@"
INSERT INTO telemetry_table (id, device_id, geom, timestamp)
VALUES (gen_random_uuid(), $1, ST_SetSRID(ST_MakePoint($2, $3), 4326), $4)");
cmd.Parameters.AddWithValue(request.DeviceId); return Results.Ok(new { Status = "Success", Route = "Direct-DB", Message = "已直接寫入資料庫" });
cmd.Parameters.AddWithValue(request.Lng); }
cmd.Parameters.AddWithValue(request.Lat); catch (Exception ex)
cmd.Parameters.AddWithValue(request.Timestamp); {
Console.WriteLine($"DB Insert failed: {ex.Message}");
await cmd.ExecuteNonQueryAsync(); return Results.Problem("資料庫寫入失敗");
}
return Results.Ok(new { Status = "Success", Route = "Direct-DB", Message = "已直接寫入資料庫" }); }
}
catch (Exception ex)
{
Console.WriteLine($"DB Insert failed: {ex.Message}");
return Results.Problem("資料庫寫入失敗");
}
}
}); });
app.Lifetime.ApplicationStopping.Register(() => app.Lifetime.ApplicationStopping.Register(() =>

View File

@ -5,7 +5,8 @@
"Logging": { "Logging": {
"LogLevel": { "LogLevel": {
"Default": "Information", "Default": "Information",
"Microsoft.AspNetCore": "Warning" "Microsoft.AspNetCore": "Warning",
"Npgsql": "Warning"
} }
}, },
"AllowedHosts": "*", "AllowedHosts": "*",