GeoPulse_Pipeline/GeoPulse Pipeline/Program.cs
saingchildren a05d5a9050
All checks were successful
Deploy GeoPulse / Build And Deploy (push) Successful in 4s
feat: make sure the partition table exists.
2026-05-09 13:04:44 +08:00

115 lines
3.6 KiB
C#
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

using System.Data;
using System.Text.Json;
using Confluent.Kafka;
using Dapper;
using GeoPulse_Pipeline;
using Microsoft.AspNetCore.Mvc;
using Npgsql;
using Serilog;
using Serilog.Sinks.Elasticsearch;
using Elastic.Clients.Elasticsearch;
// Bootstrap the process-wide Serilog logger before the web host is created.
// Default threshold is Information; the framework/driver namespaces listed
// below are capped at Warning to keep their chatter out of the sinks.
var quietNamespaceLevel = Serilog.Events.LogEventLevel.Warning;

// Elasticsearch sink settings: date-stamped index name and automatic
// registration of an ES v8 index template.
var elasticSinkOptions = new ElasticsearchSinkOptions(new Uri("http://192.168.0.110:9200"))
{
    AutoRegisterTemplate = true,
    AutoRegisterTemplateVersion = AutoRegisterTemplateVersion.ESv8,
    IndexFormat = "geopulse-logs-{0:yyyy.MM.dd}",
    InlineFields = true
};

Log.Logger = new LoggerConfiguration()
    .MinimumLevel.Information()
    .MinimumLevel.Override("Microsoft", quietNamespaceLevel)
    .MinimumLevel.Override("System.Data", quietNamespaceLevel)
    .MinimumLevel.Override("Npgsql", quietNamespaceLevel)
    .Enrich.FromLogContext()
    .WriteTo.Console()
    .WriteTo.Elasticsearch(elasticSinkOptions)
    .CreateLogger();
// Create the host builder and route host logging through Serilog.
var builder = WebApplication.CreateBuilder(args);
builder.Host.UseSerilog();

// Values read from application configuration.
var bootstrapServers = builder.Configuration.GetValue<string>("KafkaHost");
var postgresConnectionString = builder.Configuration.GetConnectionString("DefaultConnection");

// Shared Elasticsearch client (single instance for the whole app).
var searchClient = new ElasticsearchClient(
    new ElasticsearchClientSettings(new Uri("http://192.168.0.110:9200")));

// Shared Kafka producer. It is created here rather than by the container,
// and the ApplicationStopping callback later in this file flushes and disposes it.
var kafkaProducer = new ProducerBuilder<Null, string>(
    new ProducerConfig
    {
        BootstrapServers = bootstrapServers,
        Acks = Acks.Leader
    }).Build();

// Service registrations.
var services = builder.Services;
services.AddSingleton(searchClient);
services.AddSingleton(kafkaProducer);
services
    .AddEndpointsApiExplorer()
    .AddSwaggerGen()
    .AddHostedService<KafkaConsumer>()
    .AddNpgsqlDataSource(postgresConnectionString)
    .AddSingleton<PartitionManager>()
    // One IDbConnection per request scope, created from the registered data source.
    .AddScoped<IDbConnection>(provider =>
    {
        var dataSource = provider.GetRequiredService<NpgsqlDataSource>();
        return dataSource.CreateConnection();
    });

var app = builder.Build();
// Swagger JSON and UI are only exposed when running in the Development environment.
var exposeApiDocs = app.Environment.IsDevelopment();
if (exposeApiDocs)
{
    app.UseSwagger().UseSwaggerUI();
}

// Redirect plain-HTTP requests to HTTPS in every environment.
app.UseHttpsRedirection();
// POST /api/telemetry
// Accepts one telemetry sample and stores it through one of two routes:
//   ?useKafka=true  -> serialize the request to JSON and publish it to the
//                      "telemetry-events" topic;
//   ?useKafka=false -> make sure the target partition exists, then INSERT the
//                      row directly into telemetry_history.
// Responses:
//   400 when DeviceId is null/blank;
//   200 with { Status, Route, Message } on success;
//   problem response when the Kafka publish or the database write fails
//   (the exception is logged, not rethrown).
app.MapPost("/api/telemetry", async (
    [FromBody] TelemetryRequest request,
    [FromQuery] bool useKafka,
    IProducer<Null, string> producer,
    IDbConnection dbConnection,
    PartitionManager partitionManager,
    ILogger<Program> logger) =>
{
    // Reject samples that cannot be attributed to a device.
    if (string.IsNullOrWhiteSpace(request.DeviceId))
    {
        logger.LogWarning("拒絕接收資料DeviceID為空。");
        return Results.BadRequest("Device ID is required.");
    }

    if (useKafka)
    {
        var message = JsonSerializer.Serialize(request);
        try
        {
            await producer.ProduceAsync("telemetry-events", new Message<Null, string> { Value = message });
            return Results.Ok(new { Status = "Success", Route = "Kafka", Message = "已推入消息佇列" });
        }
        catch (ProduceException<Null, string> ex)
        {
            logger.LogError(ex, "Kafka 寫入失敗: {Reason}", ex.Error.Reason);
            return Results.Problem("Kafka 寫入失敗");
        }
    }

    try
    {
        await partitionManager.EnsurePartitionExistsAsync(request.Timestamp);

        // The column list has four targets, so the VALUES list must have four
        // expressions: @Lng/@Lat are combined into the single geom value.
        // NOTE(review): SRID 4326 (WGS 84 lon/lat) is assumed; confirm it
        // matches the SRID declared on telemetry_history.geom.
        const string sql =
            @"INSERT INTO telemetry_history (id, device_id, geom, timestamp) VALUES (gen_random_uuid(), @DeviceId, ST_SetSRID(ST_MakePoint(@Lng, @Lat), 4326), @Timestamp)";

        // Npgsql only writes DateTimeOffset values whose offset is zero to
        // "timestamp with time zone", so normalise to UTC (same instant)
        // instead of passing the client's local offset through.
        var row = new
        {
            request.DeviceId,
            request.Lng,
            request.Lat,
            Timestamp = request.Timestamp.ToUniversalTime()
        };

        await dbConnection.ExecuteAsync(sql, row);
        return Results.Ok(new { Status = "Success", Route = "Direct-DB", Message = "已直接寫入資料庫" });
    }
    catch (Exception ex)
    {
        logger.LogError(ex, "資料庫直接寫入失敗: {Message}", ex.Message);
        return Results.Problem("資料庫寫入失敗");
    }
});
// Shutdown hook for the Kafka producer: flush with a 10-second timeout,
// then dispose it.
void DrainAndReleaseProducer()
{
    var flushWindow = TimeSpan.FromSeconds(10);
    kafkaProducer.Flush(flushWindow);
    kafkaProducer.Dispose();
}

app.Lifetime.ApplicationStopping.Register(DrainAndReleaseProducer);

// Start the web host; this call blocks until the application shuts down.
app.Run();
/// <summary>
/// Request body of <c>POST /api/telemetry</c>: one telemetry sample from a device.
/// </summary>
/// <param name="DeviceId">Identifier of the reporting device; the endpoint rejects null or blank values.</param>
/// <param name="Lng">Presumably the longitude of the sample (units not shown here; verify with the producer).</param>
/// <param name="Lat">Presumably the latitude of the sample (units not shown here; verify with the producer).</param>
/// <param name="Timestamp">Sample time; also passed to the partition manager before a direct database insert.</param>
record TelemetryRequest(
    string DeviceId,
    double Lng,
    double Lat,
    DateTimeOffset Timestamp);