Compare commits
No commits in common. "967020a1c762efbdf563d514add35310d6eae230" and "33606ca9eb26ff21f8a5cd58425ce782867479bc" have entirely different histories.
967020a1c7
...
33606ca9eb
@ -10,12 +10,9 @@
|
|||||||
<ItemGroup>
|
<ItemGroup>
|
||||||
<PackageReference Include="Confluent.Kafka" Version="2.14.0" />
|
<PackageReference Include="Confluent.Kafka" Version="2.14.0" />
|
||||||
<PackageReference Include="Dapper" Version="2.1.72" />
|
<PackageReference Include="Dapper" Version="2.1.72" />
|
||||||
<PackageReference Include="Elastic.Clients.Elasticsearch" Version="9.3.6" />
|
|
||||||
<PackageReference Include="Microsoft.AspNetCore.OpenApi" Version="8.0.5" />
|
<PackageReference Include="Microsoft.AspNetCore.OpenApi" Version="8.0.5" />
|
||||||
<PackageReference Include="Npgsql" Version="10.0.2" />
|
<PackageReference Include="Npgsql" Version="10.0.2" />
|
||||||
<PackageReference Include="Npgsql.DependencyInjection" Version="10.0.2" />
|
<PackageReference Include="Npgsql.DependencyInjection" Version="10.0.2" />
|
||||||
<PackageReference Include="Serilog.AspNetCore" Version="10.0.0" />
|
|
||||||
<PackageReference Include="Serilog.Sinks.Elasticsearch" Version="10.0.0" />
|
|
||||||
<PackageReference Include="Swashbuckle.AspNetCore" Version="6.4.0" />
|
<PackageReference Include="Swashbuckle.AspNetCore" Version="6.4.0" />
|
||||||
</ItemGroup>
|
</ItemGroup>
|
||||||
|
|
||||||
|
|||||||
@ -1,7 +1,6 @@
|
|||||||
using System.Data;
|
using System.Data;
|
||||||
using System.Text.Json;
|
using System.Text.Json;
|
||||||
using Confluent.Kafka;
|
using Confluent.Kafka;
|
||||||
using Elastic.Clients.Elasticsearch;
|
|
||||||
using Npgsql;
|
using Npgsql;
|
||||||
|
|
||||||
namespace GeoPulse_Pipeline;
|
namespace GeoPulse_Pipeline;
|
||||||
@ -11,22 +10,15 @@ public class KafkaConsumer : BackgroundService
|
|||||||
private readonly ILogger<KafkaConsumer> _logger;
|
private readonly ILogger<KafkaConsumer> _logger;
|
||||||
private readonly NpgsqlDataSource _dataSource;
|
private readonly NpgsqlDataSource _dataSource;
|
||||||
private readonly string _kafkaHost;
|
private readonly string _kafkaHost;
|
||||||
private readonly ElasticsearchClient _esClient;
|
|
||||||
|
|
||||||
private const int BatchSize = 1000;
|
private const int BatchSize = 1000;
|
||||||
private readonly TimeSpan MaxWaitTime = TimeSpan.FromSeconds(2);
|
private readonly TimeSpan MaxWaitTime = TimeSpan.FromSeconds(2);
|
||||||
|
|
||||||
|
public KafkaConsumer(ILogger<KafkaConsumer> logger, IConfiguration configuration, NpgsqlDataSource dataSource)
|
||||||
public KafkaConsumer(
|
|
||||||
ILogger<KafkaConsumer> logger,
|
|
||||||
IConfiguration configuration,
|
|
||||||
NpgsqlDataSource dataSource,
|
|
||||||
ElasticsearchClient esClient)
|
|
||||||
{
|
{
|
||||||
_logger = logger;
|
_logger = logger;
|
||||||
_kafkaHost = configuration["KafkaHost"];
|
_kafkaHost = configuration["KafkaHost"];
|
||||||
_dataSource = dataSource;
|
_dataSource = dataSource;
|
||||||
_esClient = esClient;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
|
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
|
||||||
@ -36,7 +28,7 @@ public class KafkaConsumer : BackgroundService
|
|||||||
var consumerConfig = new ConsumerConfig
|
var consumerConfig = new ConsumerConfig
|
||||||
{
|
{
|
||||||
BootstrapServers = _kafkaHost,
|
BootstrapServers = _kafkaHost,
|
||||||
GroupId = "telemetry-db-writer-group",
|
GroupId = "telemetry-db-writer-group", // 💡 建議換個名字,強迫 Kafka 重置 Offset
|
||||||
AutoOffsetReset = AutoOffsetReset.Earliest,
|
AutoOffsetReset = AutoOffsetReset.Earliest,
|
||||||
EnableAutoCommit = false
|
EnableAutoCommit = false
|
||||||
};
|
};
|
||||||
@ -50,11 +42,11 @@ public class KafkaConsumer : BackgroundService
|
|||||||
var buffer = new List<ConsumeResult<Ignore, string>>();
|
var buffer = new List<ConsumeResult<Ignore, string>>();
|
||||||
var lastBatchTime = DateTime.UtcNow;
|
var lastBatchTime = DateTime.UtcNow;
|
||||||
|
|
||||||
_logger.LogInformation("Consumer 已啟動,開始監控 Kafka 訊息...");
|
_logger.LogInformation("🚀 Consumer 已啟動,開始監控 Kafka 訊息...");
|
||||||
|
|
||||||
while (!stoppingToken.IsCancellationRequested)
|
while (!stoppingToken.IsCancellationRequested)
|
||||||
{
|
{
|
||||||
try
|
try // 💡 關鍵修正:try 放在 while 裡面
|
||||||
{
|
{
|
||||||
var consumeResult = consumer.Consume(TimeSpan.FromMilliseconds(500));
|
var consumeResult = consumer.Consume(TimeSpan.FromMilliseconds(500));
|
||||||
|
|
||||||
@ -71,13 +63,12 @@ public class KafkaConsumer : BackgroundService
|
|||||||
_logger.LogInformation($"[處理中] 準備批次寫入 {buffer.Count} 筆資料...");
|
_logger.LogInformation($"[處理中] 準備批次寫入 {buffer.Count} 筆資料...");
|
||||||
|
|
||||||
await BulkInsertToDatabaseAsync(buffer, stoppingToken);
|
await BulkInsertToDatabaseAsync(buffer, stoppingToken);
|
||||||
await BulkInsertToElasticsearchAsync(buffer, stoppingToken);
|
|
||||||
|
|
||||||
var offsets = buffer.GroupBy(msg => msg.TopicPartition)
|
var offsets = buffer.GroupBy(msg => msg.TopicPartition)
|
||||||
.Select(g => new TopicPartitionOffset(g.Key, g.Last().Offset + 1)).ToList();
|
.Select(g => new TopicPartitionOffset(g.Key, g.Last().Offset + 1)).ToList();
|
||||||
consumer.Commit(offsets);
|
consumer.Commit(offsets);
|
||||||
|
|
||||||
_logger.LogInformation($"Kafka已寫入DB {buffer.Count} 筆資料至 PostgreSQL。");
|
_logger.LogInformation($"✅ [成功] 已寫入 {buffer.Count} 筆資料至 PostgreSQL。");
|
||||||
|
|
||||||
buffer.Clear();
|
buffer.Clear();
|
||||||
lastBatchTime = DateTime.UtcNow;
|
lastBatchTime = DateTime.UtcNow;
|
||||||
@ -85,11 +76,14 @@ public class KafkaConsumer : BackgroundService
|
|||||||
}
|
}
|
||||||
catch (Exception ex)
|
catch (Exception ex)
|
||||||
{
|
{
|
||||||
_logger.LogError(ex, "Kafka寫入DB批次處理發生錯誤!原因: {Message}", ex.Message);
|
// 💡 就算這裡噴錯,程式也只是印出 Log,然後回到 while 頂部繼續跑下一輪
|
||||||
|
_logger.LogError(ex, "❌ [失敗] 批次處理發生錯誤!原因: {Message}", ex.Message);
|
||||||
|
|
||||||
|
// 發生錯誤後,為了避免壞資料卡死 buffer 導致無窮重試,這裡建議清空
|
||||||
buffer.Clear();
|
buffer.Clear();
|
||||||
lastBatchTime = DateTime.UtcNow;
|
lastBatchTime = DateTime.UtcNow;
|
||||||
|
|
||||||
|
// 稍微停 2 秒,避免在連續錯誤時瘋狂刷屏
|
||||||
await Task.Delay(2000, stoppingToken);
|
await Task.Delay(2000, stoppingToken);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -99,7 +93,6 @@ public class KafkaConsumer : BackgroundService
|
|||||||
CancellationToken stoppingToken)
|
CancellationToken stoppingToken)
|
||||||
{
|
{
|
||||||
var jsonOptions = new JsonSerializerOptions { PropertyNameCaseInsensitive = true };
|
var jsonOptions = new JsonSerializerOptions { PropertyNameCaseInsensitive = true };
|
||||||
|
|
||||||
await using var batch = _dataSource.CreateBatch();
|
await using var batch = _dataSource.CreateBatch();
|
||||||
|
|
||||||
foreach (var msg in buffer)
|
foreach (var msg in buffer)
|
||||||
@ -121,36 +114,4 @@ public class KafkaConsumer : BackgroundService
|
|||||||
|
|
||||||
await batch.ExecuteNonQueryAsync(stoppingToken);
|
await batch.ExecuteNonQueryAsync(stoppingToken);
|
||||||
}
|
}
|
||||||
|
|
||||||
private async Task BulkInsertToElasticsearchAsync(List<ConsumeResult<Ignore, string>> buffer,
|
|
||||||
CancellationToken stoppingToken)
|
|
||||||
{
|
|
||||||
var jsonOptions = new JsonSerializerOptions { PropertyNameCaseInsensitive = true };
|
|
||||||
|
|
||||||
var documents = new List<object>();
|
|
||||||
|
|
||||||
foreach (var msg in buffer)
|
|
||||||
{
|
|
||||||
var data = JsonSerializer.Deserialize<TelemetryRequest>(msg.Message.Value, jsonOptions);
|
|
||||||
if (data == null) continue;
|
|
||||||
|
|
||||||
documents.Add(new
|
|
||||||
{
|
|
||||||
deviceId = data.DeviceId,
|
|
||||||
timestamp = data.Timestamp,
|
|
||||||
location = new { lat = data.Lat, lon = data.Lng }
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
var indexName = $"geopulse-telemetry-{DateTime.UtcNow:yyyy.MM.dd}";
|
|
||||||
|
|
||||||
var response = await _esClient.BulkAsync(b => b
|
|
||||||
.Index(indexName)
|
|
||||||
.CreateMany(documents), stoppingToken);
|
|
||||||
|
|
||||||
if (!response.IsValidResponse)
|
|
||||||
{
|
|
||||||
_logger.LogError("寫入 Elasticsearch 失敗: {Error}", response.DebugInformation);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
@ -5,31 +5,8 @@ using Dapper;
|
|||||||
using GeoPulse_Pipeline;
|
using GeoPulse_Pipeline;
|
||||||
using Microsoft.AspNetCore.Mvc;
|
using Microsoft.AspNetCore.Mvc;
|
||||||
using Npgsql;
|
using Npgsql;
|
||||||
using Serilog;
|
|
||||||
using Serilog.Sinks.Elasticsearch;
|
|
||||||
using Elastic.Clients.Elasticsearch;
|
|
||||||
|
|
||||||
Log.Logger = new LoggerConfiguration()
|
|
||||||
.MinimumLevel.Information()
|
|
||||||
.MinimumLevel.Override("Microsoft", Serilog.Events.LogEventLevel.Warning)
|
|
||||||
.MinimumLevel.Override("System.Data", Serilog.Events.LogEventLevel.Warning)
|
|
||||||
.MinimumLevel.Override("Npgsql", Serilog.Events.LogEventLevel.Warning)
|
|
||||||
.Enrich.FromLogContext()
|
|
||||||
.WriteTo.Console()
|
|
||||||
.WriteTo.Elasticsearch(new ElasticsearchSinkOptions(new Uri("http://192.168.0.110:9200"))
|
|
||||||
{
|
|
||||||
AutoRegisterTemplate = true,
|
|
||||||
AutoRegisterTemplateVersion = AutoRegisterTemplateVersion.ESv8,
|
|
||||||
IndexFormat = "geopulse-logs-{0:yyyy.MM.dd}",
|
|
||||||
InlineFields = true
|
|
||||||
})
|
|
||||||
.CreateLogger();
|
|
||||||
|
|
||||||
var builder = WebApplication.CreateBuilder(args);
|
var builder = WebApplication.CreateBuilder(args);
|
||||||
builder.Host.UseSerilog();
|
|
||||||
var esSettings = new ElasticsearchClientSettings(new Uri("http://192.168.0.110:9200"));
|
|
||||||
var esClient = new ElasticsearchClient(esSettings);
|
|
||||||
builder.Services.AddSingleton(esClient);
|
|
||||||
var kafkaHost = builder.Configuration.GetValue<string>("KafkaHost");
|
var kafkaHost = builder.Configuration.GetValue<string>("KafkaHost");
|
||||||
var producerConfig = new ProducerConfig()
|
var producerConfig = new ProducerConfig()
|
||||||
{
|
{
|
||||||
@ -60,14 +37,10 @@ app.MapPost("/api/telemetry", async (
|
|||||||
[FromBody] TelemetryRequest request,
|
[FromBody] TelemetryRequest request,
|
||||||
[FromQuery] bool useKafka,
|
[FromQuery] bool useKafka,
|
||||||
IProducer<Null, string> producer,
|
IProducer<Null, string> producer,
|
||||||
IDbConnection dbConnection,
|
IDbConnection dbConnection) =>
|
||||||
ILogger<Program> logger) =>
|
|
||||||
{
|
{
|
||||||
if (string.IsNullOrWhiteSpace(request.DeviceId))
|
if (string.IsNullOrWhiteSpace(request.DeviceId))
|
||||||
{
|
|
||||||
logger.LogWarning("拒絕接收資料:DeviceID為空。");
|
|
||||||
return Results.BadRequest("Device ID is required.");
|
return Results.BadRequest("Device ID is required.");
|
||||||
}
|
|
||||||
|
|
||||||
if (useKafka)
|
if (useKafka)
|
||||||
{
|
{
|
||||||
@ -79,7 +52,7 @@ app.MapPost("/api/telemetry", async (
|
|||||||
}
|
}
|
||||||
catch (ProduceException<Null, string> ex)
|
catch (ProduceException<Null, string> ex)
|
||||||
{
|
{
|
||||||
logger.LogError(ex, "Kafka 寫入失敗: {Reason}", ex.Error.Reason);
|
Console.WriteLine($"Kafka Delivery failed: {ex.Error.Reason}");
|
||||||
return Results.Problem("Kafka 寫入失敗");
|
return Results.Problem("Kafka 寫入失敗");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -95,7 +68,7 @@ app.MapPost("/api/telemetry", async (
|
|||||||
}
|
}
|
||||||
catch (Exception ex)
|
catch (Exception ex)
|
||||||
{
|
{
|
||||||
logger.LogError(ex, "資料庫直接寫入失敗: {Message}", ex.Message);
|
Console.WriteLine($"DB Insert failed: {ex.Message}");
|
||||||
return Results.Problem("資料庫寫入失敗");
|
return Results.Problem("資料庫寫入失敗");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
Loading…
Reference in New Issue
Block a user