using System.Collections.Concurrent;
using System.Globalization;
using Npgsql;

namespace GeoPulse_Pipeline;

/// <summary>
/// Creates the daily partitions of <c>public.telemetry_history</c> on demand and
/// remembers which partitions have already been ensured by this instance so the
/// database is only queried once per partition.
/// </summary>
public class PartitionManager
{
    /// <summary>Offset that defines the partition day boundaries (UTC+8).</summary>
    private static readonly TimeSpan PartitionOffset = TimeSpan.FromHours(8);

    /// <summary>Timestamp layout used for the partition bounds embedded in the DDL.</summary>
    private const string BoundFormat = "yyyy-MM-dd HH:mm:sszzz";

    private readonly NpgsqlDataSource _dataSource;

    // Names of partitions this instance has already ensured; the value is unused.
    private readonly ConcurrentDictionary<string, bool> _knownPartitions = new();

    private readonly ILogger _logger;

    /// <summary>Initializes a new <see cref="PartitionManager"/>.</summary>
    /// <param name="dataSource">Data source used to open connections for the DDL statements.</param>
    /// <param name="logger">Logger that receives success and failure messages.</param>
    public PartitionManager(NpgsqlDataSource dataSource, ILogger logger)
    {
        _dataSource = dataSource;
        _logger = logger;
    }

    /// <summary>
    /// Ensures that the daily partition covering <paramref name="timestamp"/> exists,
    /// creating it (and assigning ownership to <c>postgres</c>) when it does not.
    /// </summary>
    /// <param name="timestamp">Any instant inside the day whose partition is required.</param>
    /// <returns>A task that completes once the partition is known to exist.</returns>
    /// <exception cref="Exception">
    /// Any failure while opening the connection or executing the DDL is logged and rethrown,
    /// except a duplicate-table error, which means the partition already exists.
    /// </exception>
    public async Task EnsurePartitionExistsAsync(DateTimeOffset timestamp)
    {
        // Partitions are daily, with day boundaries taken at UTC+8.
        var day = timestamp.ToOffset(PartitionOffset).Date;

        // Format with the invariant culture: the current culture may use a non-Gregorian
        // calendar or a different time separator, which would corrupt names and literals.
        var invariant = CultureInfo.InvariantCulture;
        var partitionName =
            "telemetry_history_y" + day.ToString("yyyy", invariant) +
            "m" + day.ToString("MM", invariant) +
            "d" + day.ToString("dd", invariant);

        if (_knownPartitions.TryGetValue(partitionName, out _))
        {
            return;
        }

        var rangeStart = new DateTimeOffset(day, PartitionOffset);
        var rangeEnd = rangeStart.AddDays(1);
        var lowerBound = rangeStart.ToString(BoundFormat, invariant);
        var upperBound = rangeEnd.ToString(BoundFormat, invariant);

        // The DO block keeps CREATE and ALTER OWNER in one statement. The existence probe is
        // restricted to the public schema so a same-named table elsewhere cannot mask a
        // missing partition. Every interpolated value is derived from the date, not from
        // caller-supplied text.
        var sql = $@"
DO $$
BEGIN
    IF NOT EXISTS (
        SELECT 1 FROM pg_tables
        WHERE schemaname = 'public' AND tablename = '{partitionName}'
    ) THEN
        CREATE TABLE public.{partitionName}
            PARTITION OF public.telemetry_history
            FOR VALUES FROM ('{lowerBound}') TO ('{upperBound}');
        ALTER TABLE public.{partitionName} OWNER TO postgres;
    END IF;
END $$;";

        try
        {
            await using var conn = await _dataSource.OpenConnectionAsync();
            await using var cmd = new NpgsqlCommand(sql, conn);
            await cmd.ExecuteNonQueryAsync();
        }
        catch (PostgresException ex) when (ex.SqlState == PostgresErrorCodes.DuplicateTable)
        {
            // The probe and the CREATE are not atomic across sessions: another writer created
            // the partition in between. It exists now, which is all this method guarantees.
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "建立分區表 {PartitionName} 時發生錯誤", partitionName);

            // Rethrow so the caller learns the partition is unavailable instead of hitting
            // a less obvious failure on the subsequent insert.
            throw;
        }

        _knownPartitions.TryAdd(partitionName, true);
        _logger.LogInformation("已確保分區表存在: {PartitionName}", partitionName);
    }
}