diff --git a/GeoPulse Pipeline/KafkaConsumer.cs b/GeoPulse Pipeline/KafkaConsumer.cs index 1607058..aa6beb8 100644 --- a/GeoPulse Pipeline/KafkaConsumer.cs +++ b/GeoPulse Pipeline/KafkaConsumer.cs @@ -10,6 +10,7 @@ public class KafkaConsumer : BackgroundService { private readonly ILogger _logger; private readonly NpgsqlDataSource _dataSource; + private readonly PartitionManager _partitionManager; private readonly string _kafkaHost; private readonly ElasticsearchClient _esClient; @@ -21,11 +22,13 @@ public class KafkaConsumer : BackgroundService ILogger logger, IConfiguration configuration, NpgsqlDataSource dataSource, + PartitionManager partitionManager, ElasticsearchClient esClient) { _logger = logger; _kafkaHost = configuration["KafkaHost"]; _dataSource = dataSource; + _partitionManager = partitionManager; _esClient = esClient; } @@ -100,13 +103,28 @@ public class KafkaConsumer : BackgroundService { var jsonOptions = new JsonSerializerOptions { PropertyNameCaseInsensitive = true }; - await using var batch = _dataSource.CreateBatch(); + // 先收集所有不重複的日期並確保分區表存在 + var uniqueDates = new HashSet(); + var processedMessages = new List<(TelemetryRequest Data, ConsumeResult Msg)>(); foreach (var msg in buffer) { var data = JsonSerializer.Deserialize(msg.Message.Value, jsonOptions); if (data == null) continue; + + uniqueDates.Add(data.Timestamp); + processedMessages.Add((data, msg)); + } + foreach (var timestamp in uniqueDates) + { + await _partitionManager.EnsurePartitionExistsAsync(timestamp); + } + + await using var batch = _dataSource.CreateBatch(); + + foreach (var (data, msg) in processedMessages) + { var cmd = batch.CreateBatchCommand(); cmd.CommandText = @"INSERT INTO telemetry_history (id, device_id, geom, timestamp) VALUES (gen_random_uuid(), $1, ST_SetSRID(ST_MakePoint($2, $3), 4326), $4)"; diff --git a/GeoPulse Pipeline/PartitionManager.cs b/GeoPulse Pipeline/PartitionManager.cs new file mode 100644 index 0000000..f96bfd2 --- /dev/null +++ b/GeoPulse Pipeline/PartitionManager.cs @@ -0,0 +1,63 @@ +using System.Collections.Concurrent; +using Npgsql; + +namespace GeoPulse_Pipeline; + +public class PartitionManager +{ + private readonly NpgsqlDataSource _dataSource; + private readonly ConcurrentDictionary _knownPartitions = new(); + private readonly ILogger _logger; + + public PartitionManager(NpgsqlDataSource dataSource, ILogger logger) + { + _dataSource = dataSource; + _logger = logger; + } + + public async Task EnsurePartitionExistsAsync(DateTimeOffset timestamp) + { + // According to user example, partitions are daily based on UTC+8 + var localTime = timestamp.ToOffset(TimeSpan.FromHours(8)); + var date = localTime.Date; + var partitionName = $"telemetry_history_y{date:yyyy}m{date:MM}d{date:dd}"; + + if (_knownPartitions.ContainsKey(partitionName)) + { + return; + } + + var startTime = new DateTimeOffset(date, TimeSpan.FromHours(8)); + var endTime = startTime.AddDays(1); + + // SQL to create partition if it doesn't exist + // We use a DO block to ensure atomicity and handle the OWNER change + var sql = $@" +DO $$ +BEGIN + IF NOT EXISTS (SELECT 1 FROM pg_tables WHERE tablename = '{partitionName}') THEN + CREATE TABLE public.{partitionName} PARTITION OF public.telemetry_history + FOR VALUES FROM ('{startTime:yyyy-MM-dd HH:mm:sszzz}') TO ('{endTime:yyyy-MM-dd HH:mm:sszzz}'); + ALTER TABLE public.{partitionName} OWNER TO postgres; + END IF; +END $$;"; + + try + { + await using var conn = await _dataSource.OpenConnectionAsync(); + await using var cmd = new NpgsqlCommand(sql, conn); + await cmd.ExecuteNonQueryAsync(); + + _knownPartitions.TryAdd(partitionName, true); + _logger.LogInformation("已確保分區表存在: {PartitionName}", partitionName); + } + catch (Exception ex) + { + _logger.LogError(ex, "建立分區表 {PartitionName} 時發生錯誤", partitionName); + // We don't throw here to avoid blocking the whole batch if one partition fails, + // but the subsequent insert will fail anyway if it's needed. + // Actually, it's better to let it throw so the caller knows. + throw; + } + } +} diff --git a/GeoPulse Pipeline/Program.cs b/GeoPulse Pipeline/Program.cs index 1977f43..26da335 100644 --- a/GeoPulse Pipeline/Program.cs +++ b/GeoPulse Pipeline/Program.cs @@ -44,6 +44,7 @@ builder.Services.AddEndpointsApiExplorer(); builder.Services.AddSwaggerGen(); builder.Services.AddHostedService(); builder.Services.AddNpgsqlDataSource(connString); +builder.Services.AddSingleton(); builder.Services.AddScoped(sp => sp.GetRequiredService().CreateConnection()); var app = builder.Build(); @@ -61,6 +62,7 @@ app.MapPost("/api/telemetry", async ( [FromQuery] bool useKafka, IProducer producer, IDbConnection dbConnection, + PartitionManager partitionManager, ILogger logger) => { if (string.IsNullOrWhiteSpace(request.DeviceId)) @@ -87,8 +89,9 @@ app.MapPost("/api/telemetry", async ( { try { + await partitionManager.EnsurePartitionExistsAsync(request.Timestamp); string sql = - @"INSERT INTO telemetry_history (id, device_id, geom, timestamp) VALUES (gen_random_uuid(), @DeviceId, @Lng, @Lat, @Timestamp"; + @"INSERT INTO telemetry_history (id, device_id, geom, timestamp) VALUES (gen_random_uuid(), @DeviceId, @Lng, @Lat, @Timestamp)"; await dbConnection.ExecuteAsync(sql, request); return Results.Ok(new { Status = "Success", Route = "Direct-DB", Message = "已直接寫入資料庫" });