Compare commits
No commits in common. "33606ca9eb26ff21f8a5cd58425ce782867479bc" and "1936795778ee352549eabda07fa0534150af8b7c" have entirely different histories.
33606ca9eb
...
1936795778
@ -12,7 +12,6 @@
|
|||||||
<PackageReference Include="Dapper" Version="2.1.72" />
|
<PackageReference Include="Dapper" Version="2.1.72" />
|
||||||
<PackageReference Include="Microsoft.AspNetCore.OpenApi" Version="8.0.5" />
|
<PackageReference Include="Microsoft.AspNetCore.OpenApi" Version="8.0.5" />
|
||||||
<PackageReference Include="Npgsql" Version="10.0.2" />
|
<PackageReference Include="Npgsql" Version="10.0.2" />
|
||||||
<PackageReference Include="Npgsql.DependencyInjection" Version="10.0.2" />
|
|
||||||
<PackageReference Include="Swashbuckle.AspNetCore" Version="6.4.0" />
|
<PackageReference Include="Swashbuckle.AspNetCore" Version="6.4.0" />
|
||||||
</ItemGroup>
|
</ItemGroup>
|
||||||
|
|
||||||
|
|||||||
@ -1,4 +1,3 @@
|
|||||||
using System.Data;
|
|
||||||
using System.Text.Json;
|
using System.Text.Json;
|
||||||
using Confluent.Kafka;
|
using Confluent.Kafka;
|
||||||
using Npgsql;
|
using Npgsql;
|
||||||
@ -8,17 +7,18 @@ namespace GeoPulse_Pipeline;
|
|||||||
public class KafkaConsumer : BackgroundService
|
public class KafkaConsumer : BackgroundService
|
||||||
{
|
{
|
||||||
private readonly ILogger<KafkaConsumer> _logger;
|
private readonly ILogger<KafkaConsumer> _logger;
|
||||||
private readonly NpgsqlDataSource _dataSource;
|
private readonly string _connectionString;
|
||||||
private readonly string _kafkaHost;
|
private readonly string _kafkaHost;
|
||||||
|
|
||||||
private const int BatchSize = 1000;
|
private const int BatchSize = 1000;
|
||||||
private readonly TimeSpan MaxWaitTime = TimeSpan.FromSeconds(2);
|
private readonly TimeSpan MaxWaitTime = TimeSpan.FromSeconds(2);
|
||||||
|
|
||||||
public KafkaConsumer(ILogger<KafkaConsumer> logger, IConfiguration configuration, NpgsqlDataSource dataSource)
|
public KafkaConsumer(ILogger<KafkaConsumer> logger, IConfiguration configuration)
|
||||||
{
|
{
|
||||||
_logger = logger;
|
_logger = logger;
|
||||||
|
_connectionString = configuration.GetConnectionString("DefaultConnection")
|
||||||
|
?? throw new ArgumentNullException("資料庫連線字串未設定");
|
||||||
_kafkaHost = configuration["KafkaHost"];
|
_kafkaHost = configuration["KafkaHost"];
|
||||||
_dataSource = dataSource;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
|
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
|
||||||
@ -28,25 +28,20 @@ public class KafkaConsumer : BackgroundService
|
|||||||
var consumerConfig = new ConsumerConfig
|
var consumerConfig = new ConsumerConfig
|
||||||
{
|
{
|
||||||
BootstrapServers = _kafkaHost,
|
BootstrapServers = _kafkaHost,
|
||||||
GroupId = "telemetry-db-writer-group", // 💡 建議換個名字,強迫 Kafka 重置 Offset
|
GroupId = "telemetry-db-writer-group",
|
||||||
AutoOffsetReset = AutoOffsetReset.Earliest,
|
AutoOffsetReset = AutoOffsetReset.Earliest,
|
||||||
EnableAutoCommit = false
|
EnableAutoCommit = false
|
||||||
};
|
};
|
||||||
|
|
||||||
using var consumer = new ConsumerBuilder<Ignore, string>(consumerConfig)
|
using var consumer = new ConsumerBuilder<Ignore, string>(consumerConfig).Build();
|
||||||
.SetErrorHandler((_, e) => _logger.LogError($"Kafka 底層錯誤: {e.Reason}"))
|
|
||||||
.Build();
|
|
||||||
|
|
||||||
consumer.Subscribe("telemetry-events");
|
consumer.Subscribe("telemetry-events");
|
||||||
|
|
||||||
var buffer = new List<ConsumeResult<Ignore, string>>();
|
var buffer = new List<ConsumeResult<Ignore, string>>();
|
||||||
var lastBatchTime = DateTime.UtcNow;
|
var lastBatchTime = DateTime.UtcNow;
|
||||||
|
|
||||||
_logger.LogInformation("🚀 Consumer 已啟動,開始監控 Kafka 訊息...");
|
try
|
||||||
|
|
||||||
while (!stoppingToken.IsCancellationRequested)
|
|
||||||
{
|
{
|
||||||
try // 💡 關鍵修正:try 放在 while 裡面
|
while (!stoppingToken.IsCancellationRequested)
|
||||||
{
|
{
|
||||||
var consumeResult = consumer.Consume(TimeSpan.FromMilliseconds(500));
|
var consumeResult = consumer.Consume(TimeSpan.FromMilliseconds(500));
|
||||||
|
|
||||||
@ -60,32 +55,27 @@ public class KafkaConsumer : BackgroundService
|
|||||||
|
|
||||||
if (isBatchFull || isTimeUp)
|
if (isBatchFull || isTimeUp)
|
||||||
{
|
{
|
||||||
_logger.LogInformation($"[處理中] 準備批次寫入 {buffer.Count} 筆資料...");
|
|
||||||
|
|
||||||
await BulkInsertToDatabaseAsync(buffer, stoppingToken);
|
await BulkInsertToDatabaseAsync(buffer, stoppingToken);
|
||||||
|
|
||||||
var offsets = buffer.GroupBy(msg => msg.TopicPartition)
|
var latestOffset = buffer.Last();
|
||||||
.Select(g => new TopicPartitionOffset(g.Key, g.Last().Offset + 1)).ToList();
|
consumer.Commit(latestOffset);
|
||||||
consumer.Commit(offsets);
|
|
||||||
|
|
||||||
_logger.LogInformation($"✅ [成功] 已寫入 {buffer.Count} 筆資料至 PostgreSQL。");
|
_logger.LogInformation($"成功批次寫入 {buffer.Count} 筆資料至 PostgreSQL。");
|
||||||
|
|
||||||
buffer.Clear();
|
buffer.Clear();
|
||||||
lastBatchTime = DateTime.UtcNow;
|
lastBatchTime = DateTime.UtcNow;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
catch (Exception ex)
|
}
|
||||||
{
|
catch (Exception ex)
|
||||||
// 💡 就算這裡噴錯,程式也只是印出 Log,然後回到 while 頂部繼續跑下一輪
|
{
|
||||||
_logger.LogError(ex, "❌ [失敗] 批次處理發生錯誤!原因: {Message}", ex.Message);
|
_logger.LogError($"發生錯誤: {ex.Message} \n {ex.StackTrace}");
|
||||||
|
|
||||||
// 發生錯誤後,為了避免壞資料卡死 buffer 導致無窮重試,這裡建議清空
|
buffer.Clear();
|
||||||
buffer.Clear();
|
}
|
||||||
lastBatchTime = DateTime.UtcNow;
|
finally
|
||||||
|
{
|
||||||
// 稍微停 2 秒,避免在連續錯誤時瘋狂刷屏
|
consumer.Close();
|
||||||
await Task.Delay(2000, stoppingToken);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -93,7 +83,8 @@ public class KafkaConsumer : BackgroundService
|
|||||||
CancellationToken stoppingToken)
|
CancellationToken stoppingToken)
|
||||||
{
|
{
|
||||||
var jsonOptions = new JsonSerializerOptions { PropertyNameCaseInsensitive = true };
|
var jsonOptions = new JsonSerializerOptions { PropertyNameCaseInsensitive = true };
|
||||||
await using var batch = _dataSource.CreateBatch();
|
await using var dataSource = NpgsqlDataSource.Create(_connectionString);
|
||||||
|
await using var batch = dataSource.CreateBatch();
|
||||||
|
|
||||||
foreach (var msg in buffer)
|
foreach (var msg in buffer)
|
||||||
{
|
{
|
||||||
|
|||||||
@ -1,7 +1,5 @@
|
|||||||
using System.Data;
|
|
||||||
using System.Text.Json;
|
using System.Text.Json;
|
||||||
using Confluent.Kafka;
|
using Confluent.Kafka;
|
||||||
using Dapper;
|
|
||||||
using GeoPulse_Pipeline;
|
using GeoPulse_Pipeline;
|
||||||
using Microsoft.AspNetCore.Mvc;
|
using Microsoft.AspNetCore.Mvc;
|
||||||
using Npgsql;
|
using Npgsql;
|
||||||
@ -14,14 +12,12 @@ var producerConfig = new ProducerConfig()
|
|||||||
Acks = Acks.Leader
|
Acks = Acks.Leader
|
||||||
};
|
};
|
||||||
|
|
||||||
var connString = builder.Configuration.GetConnectionString("DefaultConnection");
|
|
||||||
var kafkaProducer = new ProducerBuilder<Null, string>(producerConfig).Build();
|
var kafkaProducer = new ProducerBuilder<Null, string>(producerConfig).Build();
|
||||||
builder.Services.AddSingleton(kafkaProducer);
|
builder.Services.AddSingleton(kafkaProducer);
|
||||||
builder.Services.AddEndpointsApiExplorer();
|
builder.Services.AddEndpointsApiExplorer();
|
||||||
builder.Services.AddSwaggerGen();
|
builder.Services.AddSwaggerGen();
|
||||||
builder.Services.AddHostedService<KafkaConsumer>();
|
builder.Services.AddHostedService<KafkaConsumer>();
|
||||||
builder.Services.AddNpgsqlDataSource(connString);
|
|
||||||
builder.Services.AddScoped<IDbConnection>(sp => sp.GetRequiredService<NpgsqlDataSource>().CreateConnection());
|
|
||||||
|
|
||||||
var app = builder.Build();
|
var app = builder.Build();
|
||||||
|
|
||||||
@ -34,44 +30,54 @@ if (app.Environment.IsDevelopment())
|
|||||||
app.UseHttpsRedirection();
|
app.UseHttpsRedirection();
|
||||||
|
|
||||||
app.MapPost("/api/telemetry", async (
|
app.MapPost("/api/telemetry", async (
|
||||||
[FromBody] TelemetryRequest request,
|
[FromBody] TelemetryRequest request,
|
||||||
[FromQuery] bool useKafka,
|
[FromQuery] bool useKafka,
|
||||||
IProducer<Null, string> producer,
|
IProducer<Null, string> producer,
|
||||||
IDbConnection dbConnection) =>
|
IConfiguration config) =>
|
||||||
{
|
{
|
||||||
if (string.IsNullOrWhiteSpace(request.DeviceId))
|
if (string.IsNullOrWhiteSpace(request.DeviceId))
|
||||||
return Results.BadRequest("Device ID is required.");
|
return Results.BadRequest("Device ID is required.");
|
||||||
|
|
||||||
if (useKafka)
|
if (useKafka)
|
||||||
{
|
{
|
||||||
var message = JsonSerializer.Serialize(request);
|
var message = JsonSerializer.Serialize(request);
|
||||||
try
|
try
|
||||||
{
|
{
|
||||||
await producer.ProduceAsync("telemetry-events", new Message<Null, string> { Value = message });
|
await producer.ProduceAsync("telemetry-events", new Message<Null, string> { Value = message });
|
||||||
return Results.Ok(new { Status = "Success", Route = "Kafka", Message = "已推入消息佇列" });
|
return Results.Ok(new { Status = "Success", Route = "Kafka", Message = "已推入消息佇列" });
|
||||||
}
|
}
|
||||||
catch (ProduceException<Null, string> ex)
|
catch (ProduceException<Null, string> ex)
|
||||||
{
|
{
|
||||||
Console.WriteLine($"Kafka Delivery failed: {ex.Error.Reason}");
|
Console.WriteLine($"Kafka Delivery failed: {ex.Error.Reason}");
|
||||||
return Results.Problem("Kafka 寫入失敗");
|
return Results.Problem("Kafka 寫入失敗");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
try
|
try
|
||||||
{
|
{
|
||||||
string sql =
|
var connString = config.GetConnectionString("DefaultConnection");
|
||||||
@"INSERT INTO telemetry_history (id, device_id, geom, timestamp) VALUES (gen_random_uuid(), @DeviceId, @Lng, @Lat, @Timestamp";
|
|
||||||
await dbConnection.ExecuteAsync(sql, request);
|
await using var dataSource = NpgsqlDataSource.Create(connString!);
|
||||||
|
await using var cmd = dataSource.CreateCommand(@"
|
||||||
|
INSERT INTO telemetry_table (id, device_id, geom, timestamp)
|
||||||
|
VALUES (gen_random_uuid(), $1, ST_SetSRID(ST_MakePoint($2, $3), 4326), $4)");
|
||||||
|
|
||||||
return Results.Ok(new { Status = "Success", Route = "Direct-DB", Message = "已直接寫入資料庫" });
|
cmd.Parameters.AddWithValue(request.DeviceId);
|
||||||
}
|
cmd.Parameters.AddWithValue(request.Lng);
|
||||||
catch (Exception ex)
|
cmd.Parameters.AddWithValue(request.Lat);
|
||||||
{
|
cmd.Parameters.AddWithValue(request.Timestamp);
|
||||||
Console.WriteLine($"DB Insert failed: {ex.Message}");
|
|
||||||
return Results.Problem("資料庫寫入失敗");
|
await cmd.ExecuteNonQueryAsync();
|
||||||
}
|
|
||||||
}
|
return Results.Ok(new { Status = "Success", Route = "Direct-DB", Message = "已直接寫入資料庫" });
|
||||||
|
}
|
||||||
|
catch (Exception ex)
|
||||||
|
{
|
||||||
|
Console.WriteLine($"DB Insert failed: {ex.Message}");
|
||||||
|
return Results.Problem("資料庫寫入失敗");
|
||||||
|
}
|
||||||
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
app.Lifetime.ApplicationStopping.Register(() =>
|
app.Lifetime.ApplicationStopping.Register(() =>
|
||||||
|
|||||||
@ -5,8 +5,7 @@
|
|||||||
"Logging": {
|
"Logging": {
|
||||||
"LogLevel": {
|
"LogLevel": {
|
||||||
"Default": "Information",
|
"Default": "Information",
|
||||||
"Microsoft.AspNetCore": "Warning",
|
"Microsoft.AspNetCore": "Warning"
|
||||||
"Npgsql": "Warning"
|
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
"AllowedHosts": "*",
|
"AllowedHosts": "*",
|
||||||
|
|||||||
Loading…
Reference in New Issue
Block a user