From 610afac5110de962e92718b81218227bc200f3f0 Mon Sep 17 00:00:00 2001 From: saingchildren <80457007+saingchildren@users.noreply.github.com> Date: Wed, 29 Apr 2026 10:54:40 +0800 Subject: [PATCH] feat: add minimal api --- GeoPulse Pipeline/Program.cs | 91 ++++++++++++++++++++++++++++++++++++ 1 file changed, 91 insertions(+) create mode 100644 GeoPulse Pipeline/Program.cs diff --git a/GeoPulse Pipeline/Program.cs b/GeoPulse Pipeline/Program.cs new file mode 100644 index 0000000..dbaaa4d --- /dev/null +++ b/GeoPulse Pipeline/Program.cs @@ -0,0 +1,91 @@ +using System.Text.Json; +using Confluent.Kafka; +using GeoPulse_Pipeline; +using Microsoft.AspNetCore.Mvc; +using Npgsql; + +var builder = WebApplication.CreateBuilder(args); +var kafkaHost = builder.Configuration.GetValue("KafkaHost"); +var producerConfig = new ProducerConfig() +{ + BootstrapServers = kafkaHost, + Acks = Acks.Leader +}; + + +var kafkaProducer = new ProducerBuilder(producerConfig).Build(); +builder.Services.AddSingleton(kafkaProducer); +builder.Services.AddEndpointsApiExplorer(); +builder.Services.AddSwaggerGen(); +builder.Services.AddHostedService(); + +var app = builder.Build(); + +if (app.Environment.IsDevelopment()) +{ + app.UseSwagger(); + app.UseSwaggerUI(); +} + +app.UseHttpsRedirection(); + +app.MapPost("/api/telemetry", async ( + [FromBody] TelemetryRequest request, + [FromQuery] bool useKafka, + IProducer producer, + IConfiguration config) => +{ + if (string.IsNullOrWhiteSpace(request.DeviceId)) + return Results.BadRequest("Device ID is required."); + + if (useKafka) + { + var message = JsonSerializer.Serialize(request); + try + { + await producer.ProduceAsync("telemetry-events", new Message { Value = message }); + return Results.Ok(new { Status = "Success", Route = "Kafka", Message = "已推入消息佇列" }); + } + catch (ProduceException ex) + { + Console.WriteLine($"Kafka Delivery failed: {ex.Error.Reason}"); + return Results.Problem("Kafka 寫入失敗"); + } + } + else + { + try + { + var connString = config.GetConnectionString("DefaultConnection"); + + await using var dataSource = NpgsqlDataSource.Create(connString!); + await using var cmd = dataSource.CreateCommand(@" + INSERT INTO telemetry_table (id, device_id, geom, timestamp) + VALUES (gen_random_uuid(), $1, ST_SetSRID(ST_MakePoint($2, $3), 4326), $4)"); + + cmd.Parameters.AddWithValue(request.DeviceId); + cmd.Parameters.AddWithValue(request.Lng); + cmd.Parameters.AddWithValue(request.Lat); + cmd.Parameters.AddWithValue(request.Timestamp); + + await cmd.ExecuteNonQueryAsync(); + + return Results.Ok(new { Status = "Success", Route = "Direct-DB", Message = "已直接寫入資料庫" }); + } + catch (Exception ex) + { + Console.WriteLine($"DB Insert failed: {ex.Message}"); + return Results.Problem("資料庫寫入失敗"); + } + } +}); + +app.Lifetime.ApplicationStopping.Register(() => +{ + kafkaProducer.Flush(TimeSpan.FromSeconds(10)); + kafkaProducer.Dispose(); +}); + +app.Run(); + +record TelemetryRequest(string DeviceId, double Lng, double Lat, DateTimeOffset Timestamp); \ No newline at end of file