diff --git a/GeoPulse Pipeline/Program.cs b/GeoPulse Pipeline/Program.cs new file mode 100644 index 0000000..dbaaa4d --- /dev/null +++ b/GeoPulse Pipeline/Program.cs @@ -0,0 +1,91 @@ +using System.Text.Json; +using Confluent.Kafka; +using GeoPulse_Pipeline; +using Microsoft.AspNetCore.Mvc; +using Npgsql; + +var builder = WebApplication.CreateBuilder(args); +var kafkaHost = builder.Configuration.GetValue("KafkaHost"); +var producerConfig = new ProducerConfig() +{ + BootstrapServers = kafkaHost, + Acks = Acks.Leader +}; + + +var kafkaProducer = new ProducerBuilder(producerConfig).Build(); +builder.Services.AddSingleton(kafkaProducer); +builder.Services.AddEndpointsApiExplorer(); +builder.Services.AddSwaggerGen(); +builder.Services.AddHostedService(); + +var app = builder.Build(); + +if (app.Environment.IsDevelopment()) +{ + app.UseSwagger(); + app.UseSwaggerUI(); +} + +app.UseHttpsRedirection(); + +app.MapPost("/api/telemetry", async ( + [FromBody] TelemetryRequest request, + [FromQuery] bool useKafka, + IProducer producer, + IConfiguration config) => +{ + if (string.IsNullOrWhiteSpace(request.DeviceId)) + return Results.BadRequest("Device ID is required."); + + if (useKafka) + { + var message = JsonSerializer.Serialize(request); + try + { + await producer.ProduceAsync("telemetry-events", new Message { Value = message }); + return Results.Ok(new { Status = "Success", Route = "Kafka", Message = "已推入消息佇列" }); + } + catch (ProduceException ex) + { + Console.WriteLine($"Kafka Delivery failed: {ex.Error.Reason}"); + return Results.Problem("Kafka 寫入失敗"); + } + } + else + { + try + { + var connString = config.GetConnectionString("DefaultConnection"); + + await using var dataSource = NpgsqlDataSource.Create(connString!); + await using var cmd = dataSource.CreateCommand(@" + INSERT INTO telemetry_table (id, device_id, geom, timestamp) + VALUES (gen_random_uuid(), $1, ST_SetSRID(ST_MakePoint($2, $3), 4326), $4)"); + + cmd.Parameters.AddWithValue(request.DeviceId); + cmd.Parameters.AddWithValue(request.Lng); + cmd.Parameters.AddWithValue(request.Lat); + cmd.Parameters.AddWithValue(request.Timestamp); + + await cmd.ExecuteNonQueryAsync(); + + return Results.Ok(new { Status = "Success", Route = "Direct-DB", Message = "已直接寫入資料庫" }); + } + catch (Exception ex) + { + Console.WriteLine($"DB Insert failed: {ex.Message}"); + return Results.Problem("資料庫寫入失敗"); + } + } +}); + +app.Lifetime.ApplicationStopping.Register(() => +{ + kafkaProducer.Flush(TimeSpan.FromSeconds(10)); + kafkaProducer.Dispose(); +}); + +app.Run(); + +record TelemetryRequest(string DeviceId, double Lng, double Lat, DateTimeOffset Timestamp); \ No newline at end of file