From 33606ca9eb26ff21f8a5cd58425ce782867479bc Mon Sep 17 00:00:00 2001 From: saingchildren <80457007+saingchildren@users.noreply.github.com> Date: Thu, 30 Apr 2026 10:09:12 +0800 Subject: [PATCH] =?UTF-8?q?fix:=20=E8=A7=A3=E6=B1=BAkafka=E6=9C=83?= =?UTF-8?q?=E9=81=BA=E6=BC=8Fcommit=E5=B7=B2=E8=99=95=E7=90=86=E7=9A=84mee?= =?UTF-8?q?sage=E7=9A=84=E5=95=8F=E9=A1=8C?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- GeoPulse Pipeline/KafkaConsumer.cs | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/GeoPulse Pipeline/KafkaConsumer.cs b/GeoPulse Pipeline/KafkaConsumer.cs index cf26d4c..cd3b096 100644 --- a/GeoPulse Pipeline/KafkaConsumer.cs +++ b/GeoPulse Pipeline/KafkaConsumer.cs @@ -64,8 +64,9 @@ public class KafkaConsumer : BackgroundService await BulkInsertToDatabaseAsync(buffer, stoppingToken); - var latestOffset = buffer.Last(); - consumer.Commit(latestOffset); + var offsets = buffer.GroupBy(msg => msg.TopicPartition) + .Select(g => new TopicPartitionOffset(g.Key, g.Last().Offset + 1)).ToList(); + consumer.Commit(offsets); _logger.LogInformation($"✅ [成功] 已寫入 {buffer.Count} 筆資料至 PostgreSQL。");