GeoPulse_Pipeline/GeoPulse Pipeline/PartitionManager.cs
saingchildren a05d5a9050
All checks were successful
Deploy GeoPulse / Build And Deploy (push) Successful in 4s
feat: make sure the partition table exists.
2026-05-09 13:04:44 +08:00

64 lines
2.3 KiB
C#

using System.Collections.Concurrent;
using Npgsql;
namespace GeoPulse_Pipeline;
/// <summary>
/// Creates the daily partitions of <c>public.telemetry_history</c> on demand and
/// remembers which ones were already ensured so each partition is only checked
/// against the database once per process.
/// </summary>
public class PartitionManager
{
    // Partition days are cut at local midnight in UTC+8, not at UTC midnight.
    private static readonly TimeSpan PartitionOffset = TimeSpan.FromHours(8);

    // Literal format used for the partition bounds embedded in the DDL.
    private const string BoundFormat = "yyyy-MM-dd HH:mm:sszzz";

    private readonly NpgsqlDataSource _dataSource;

    // Names of partitions this instance has already ensured; the value is unused.
    private readonly ConcurrentDictionary<string, bool> _knownPartitions = new();

    private readonly ILogger<PartitionManager> _logger;

    /// <summary>Initializes the manager with the data source used to run DDL and a logger.</summary>
    /// <param name="dataSource">Data source that connections are opened from.</param>
    /// <param name="logger">Logger for success and failure messages.</param>
    public PartitionManager(NpgsqlDataSource dataSource, ILogger<PartitionManager> logger)
    {
        _dataSource = dataSource;
        _logger = logger;
    }

    /// <summary>
    /// Ensures that the daily partition covering <paramref name="timestamp"/> exists,
    /// creating it (and assigning ownership to <c>postgres</c>) when it is missing.
    /// </summary>
    /// <param name="timestamp">Any instant inside the day whose partition is required.</param>
    /// <returns>A task that completes once the partition is known to exist.</returns>
    /// <exception cref="Exception">
    /// Any failure while opening the connection or running the DDL is logged and rethrown,
    /// except a duplicate-table error, which means the partition already exists.
    /// </exception>
    public async Task EnsurePartitionExistsAsync(DateTimeOffset timestamp)
    {
        // Everything that ends up inside the SQL text is formatted with the invariant
        // culture: in a custom format ':' is the culture's time separator and the year
        // follows the culture's calendar, so the process culture must not leak in here.
        var invariant = System.Globalization.CultureInfo.InvariantCulture;

        var date = timestamp.ToOffset(PartitionOffset).Date;
        var partitionName = FormattableString.Invariant(
            $"telemetry_history_y{date:yyyy}m{date:MM}d{date:dd}");

        if (_knownPartitions.ContainsKey(partitionName))
        {
            return;
        }

        // The partition covers [local midnight, next local midnight) at the UTC+8 offset.
        var startTime = new DateTimeOffset(date, PartitionOffset);
        var lowerBound = startTime.ToString(BoundFormat, invariant);
        var upperBound = startTime.AddDays(1).ToString(BoundFormat, invariant);

        // Identifiers and partition bounds cannot be sent as parameters in DDL, so they are
        // embedded as text; all of them are derived from the date above, never from input strings.
        // The existence check is limited to the public schema so that a same-named table in
        // another schema does not suppress creation. The check-then-create is NOT atomic across
        // sessions; the duplicate-table handler below covers that race.
        var sql = $@"
DO $$
BEGIN
IF NOT EXISTS (SELECT 1 FROM pg_tables WHERE schemaname = 'public' AND tablename = '{partitionName}') THEN
CREATE TABLE public.{partitionName} PARTITION OF public.telemetry_history
FOR VALUES FROM ('{lowerBound}') TO ('{upperBound}');
ALTER TABLE public.{partitionName} OWNER TO postgres;
END IF;
END $$;";

        try
        {
            await using var conn = await _dataSource.OpenConnectionAsync();
            await using var cmd = new NpgsqlCommand(sql, conn);
            await cmd.ExecuteNonQueryAsync();
        }
        catch (PostgresException ex) when (ex.SqlState == PostgresErrorCodes.DuplicateTable)
        {
            // Another session created the partition between our existence check and our
            // CREATE TABLE. The partition exists, which is all the caller needs.
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "建立分區表 {PartitionName} 時發生錯誤", partitionName);
            // Rethrow so the caller knows the partition is missing; the subsequent insert
            // would fail anyway.
            throw;
        }

        _knownPartitions.TryAdd(partitionName, true);
        _logger.LogInformation("已確保分區表存在: {PartitionName}", partitionName);
    }
}