feat: add minimal api
This commit is contained in:
parent
cb84e6118e
commit
610afac511
91
GeoPulse Pipeline/Program.cs
Normal file
91
GeoPulse Pipeline/Program.cs
Normal file
@ -0,0 +1,91 @@
|
||||
using System.Text.Json;
|
||||
using Confluent.Kafka;
|
||||
using GeoPulse_Pipeline;
|
||||
using Microsoft.AspNetCore.Mvc;
|
||||
using Npgsql;
|
||||
|
||||
var builder = WebApplication.CreateBuilder(args);
|
||||
var kafkaHost = builder.Configuration.GetValue<string>("KafkaHost");
|
||||
var producerConfig = new ProducerConfig()
|
||||
{
|
||||
BootstrapServers = kafkaHost,
|
||||
Acks = Acks.Leader
|
||||
};
|
||||
|
||||
|
||||
var kafkaProducer = new ProducerBuilder<Null, string>(producerConfig).Build();
|
||||
builder.Services.AddSingleton(kafkaProducer);
|
||||
builder.Services.AddEndpointsApiExplorer();
|
||||
builder.Services.AddSwaggerGen();
|
||||
builder.Services.AddHostedService<KafkaConsumer>();
|
||||
|
||||
var app = builder.Build();
|
||||
|
||||
if (app.Environment.IsDevelopment())
|
||||
{
|
||||
app.UseSwagger();
|
||||
app.UseSwaggerUI();
|
||||
}
|
||||
|
||||
app.UseHttpsRedirection();
|
||||
|
||||
app.MapPost("/api/telemetry", async (
|
||||
[FromBody] TelemetryRequest request,
|
||||
[FromQuery] bool useKafka,
|
||||
IProducer<Null, string> producer,
|
||||
IConfiguration config) =>
|
||||
{
|
||||
if (string.IsNullOrWhiteSpace(request.DeviceId))
|
||||
return Results.BadRequest("Device ID is required.");
|
||||
|
||||
if (useKafka)
|
||||
{
|
||||
var message = JsonSerializer.Serialize(request);
|
||||
try
|
||||
{
|
||||
await producer.ProduceAsync("telemetry-events", new Message<Null, string> { Value = message });
|
||||
return Results.Ok(new { Status = "Success", Route = "Kafka", Message = "已推入消息佇列" });
|
||||
}
|
||||
catch (ProduceException<Null, string> ex)
|
||||
{
|
||||
Console.WriteLine($"Kafka Delivery failed: {ex.Error.Reason}");
|
||||
return Results.Problem("Kafka 寫入失敗");
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
try
|
||||
{
|
||||
var connString = config.GetConnectionString("DefaultConnection");
|
||||
|
||||
await using var dataSource = NpgsqlDataSource.Create(connString!);
|
||||
await using var cmd = dataSource.CreateCommand(@"
|
||||
INSERT INTO telemetry_table (id, device_id, geom, timestamp)
|
||||
VALUES (gen_random_uuid(), $1, ST_SetSRID(ST_MakePoint($2, $3), 4326), $4)");
|
||||
|
||||
cmd.Parameters.AddWithValue(request.DeviceId);
|
||||
cmd.Parameters.AddWithValue(request.Lng);
|
||||
cmd.Parameters.AddWithValue(request.Lat);
|
||||
cmd.Parameters.AddWithValue(request.Timestamp);
|
||||
|
||||
await cmd.ExecuteNonQueryAsync();
|
||||
|
||||
return Results.Ok(new { Status = "Success", Route = "Direct-DB", Message = "已直接寫入資料庫" });
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
Console.WriteLine($"DB Insert failed: {ex.Message}");
|
||||
return Results.Problem("資料庫寫入失敗");
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
app.Lifetime.ApplicationStopping.Register(() =>
|
||||
{
|
||||
kafkaProducer.Flush(TimeSpan.FromSeconds(10));
|
||||
kafkaProducer.Dispose();
|
||||
});
|
||||
|
||||
app.Run();
|
||||
|
||||
record TelemetryRequest(string DeviceId, double Lng, double Lat, DateTimeOffset Timestamp);
|
||||
Loading…
Reference in New Issue
Block a user