diff --git a/GeoPulse Pipeline/KafkaConsumer.cs b/GeoPulse Pipeline/KafkaConsumer.cs index cf26d4c..cd3b096 100644 --- a/GeoPulse Pipeline/KafkaConsumer.cs +++ b/GeoPulse Pipeline/KafkaConsumer.cs @@ -64,8 +64,9 @@ public class KafkaConsumer : BackgroundService await BulkInsertToDatabaseAsync(buffer, stoppingToken); - var latestOffset = buffer.Last(); - consumer.Commit(latestOffset); + var offsets = buffer.GroupBy(msg => msg.TopicPartition) + .Select(g => new TopicPartitionOffset(g.Key, g.Last().Offset + 1)).ToList(); + consumer.Commit(offsets); _logger.LogInformation($"✅ [成功] 已寫入 {buffer.Count} 筆資料至 PostgreSQL。");