64 lines
2.3 KiB
C#
64 lines
2.3 KiB
C#
using System.Collections.Concurrent;
|
|
using Npgsql;
|
|
|
|
namespace GeoPulse_Pipeline;
|
|
|
|
/// <summary>
/// Creates daily partitions of <c>public.telemetry_history</c> on demand and
/// remembers which partitions this instance has already ensured, so that the
/// database is only contacted once per partition per process.
/// </summary>
public class PartitionManager
{
    // Partitions are daily, with day boundaries taken at UTC+8.
    private static readonly TimeSpan PartitionOffset = TimeSpan.FromHours(8);

    private readonly NpgsqlDataSource _dataSource;
    private readonly ConcurrentDictionary<string, bool> _knownPartitions = new();
    private readonly ILogger<PartitionManager> _logger;

    /// <summary>
    /// Initializes a new <see cref="PartitionManager"/>.
    /// </summary>
    /// <param name="dataSource">Data source used to open connections for the partition DDL.</param>
    /// <param name="logger">Logger that receives success and failure messages.</param>
    public PartitionManager(NpgsqlDataSource dataSource, ILogger<PartitionManager> logger)
    {
        _dataSource = dataSource;
        _logger = logger;
    }

    /// <summary>
    /// Ensures that the daily partition covering <paramref name="timestamp"/> exists.
    /// </summary>
    /// <remarks>
    /// The partition is named <c>telemetry_history_yYYYYmMMdDD</c> after the UTC+8
    /// calendar date of the timestamp and spans that date's midnight (UTC+8) up to
    /// the next midnight. Once a partition has been ensured successfully its name
    /// is cached and later calls for the same day return without touching the database.
    /// NOTE(review): the cache check and the DDL are not serialized, so concurrent
    /// callers for the same new day may each run the DDL — confirm this is acceptable.
    /// </remarks>
    /// <param name="timestamp">Any instant that falls inside the partition to ensure.</param>
    /// <returns>A task that completes once the partition is known to exist.</returns>
    /// <exception cref="Exception">
    /// Any failure while opening the connection or executing the DDL is logged and rethrown.
    /// </exception>
    public async Task EnsurePartitionExistsAsync(DateTimeOffset timestamp)
    {
        var date = timestamp.ToOffset(PartitionOffset).Date;

        // Invariant formatting: "yyyy" follows the current culture's calendar and
        // ":" is the culture's time separator, so culture-sensitive formatting could
        // yield a wrong table name or malformed timestamp literals in the SQL below.
        var partitionName = FormattableString.Invariant(
            $"telemetry_history_y{date:yyyy}m{date:MM}d{date:dd}");

        if (_knownPartitions.ContainsKey(partitionName))
        {
            return;
        }

        var startTime = new DateTimeOffset(date, PartitionOffset);
        var endTime = startTime.AddDays(1);

        // The DO block creates the partition and assigns its owner in one statement.
        // The existence check is restricted to the public schema so that a table of
        // the same name in another schema does not suppress creation.
        // All interpolated values are derived from the timestamp, not from free text.
        var sql = FormattableString.Invariant($@"
DO $$
BEGIN
    IF NOT EXISTS (SELECT 1 FROM pg_tables WHERE schemaname = 'public' AND tablename = '{partitionName}') THEN
        CREATE TABLE public.{partitionName} PARTITION OF public.telemetry_history
            FOR VALUES FROM ('{startTime:yyyy-MM-dd HH:mm:sszzz}') TO ('{endTime:yyyy-MM-dd HH:mm:sszzz}');
        ALTER TABLE public.{partitionName} OWNER TO postgres;
    END IF;
END $$;");

        try
        {
            await using var conn = await _dataSource.OpenConnectionAsync();
            await using var cmd = new NpgsqlCommand(sql, conn);
            await cmd.ExecuteNonQueryAsync();

            _knownPartitions.TryAdd(partitionName, true);
            _logger.LogInformation("已確保分區表存在: {PartitionName}", partitionName);
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "建立分區表 {PartitionName} 時發生錯誤", partitionName);

            // Rethrow so the caller knows; the partition is not cached, so a later
            // call will retry the DDL.
            throw;
        }
    }
}
|