diff --git a/src/KafkaNET.Library/Consumers/ZookeeperConsumerConnector.cs b/src/KafkaNET.Library/Consumers/ZookeeperConsumerConnector.cs index 4d7f651..15c510b 100644 --- a/src/KafkaNET.Library/Consumers/ZookeeperConsumerConnector.cs +++ b/src/KafkaNET.Library/Consumers/ZookeeperConsumerConnector.cs @@ -142,7 +142,7 @@ public void CommitOffsets() var topicDirs = new ZKGroupTopicDirs(this.config.GroupId, topic.Key); foreach (KeyValuePair partition in topic.Value) { - var newOffset = partition.Value.ConsumeOffset; + var newOffset = partition.Value.ConsumeOffset + 1; try { if (partition.Value.ConsumeOffsetValid) diff --git a/src/KafkaNET.Library/ZooKeeperIntegration/Listeners/ZKRebalancerListener.cs b/src/KafkaNET.Library/ZooKeeperIntegration/Listeners/ZKRebalancerListener.cs index 64c2210..e2748d8 100644 --- a/src/KafkaNET.Library/ZooKeeperIntegration/Listeners/ZKRebalancerListener.cs +++ b/src/KafkaNET.Library/ZooKeeperIntegration/Listeners/ZKRebalancerListener.cs @@ -501,14 +501,12 @@ private void AddPartitionTopicInfo(ZKGroupTopicDirs topicDirs, string partition, //if first time starting a consumer, set the initial offset based on the config long offset = -1; - long offsetCommited = -1; if (offsetCommitedString != null) { - offsetCommited = long.Parse(offsetCommitedString); - offset = offsetCommited + 1; + offset = long.Parse(offsetCommitedString); } - Logger.InfoFormat("Final offset {0} for topic {1} partition {2} OffsetCommited {3}" - , offset, topic, partition, offsetCommited); + Logger.InfoFormat("Final offset {0} for topic {1} partition {2}." + , offset, topic, partition); var queue = this.queues[new Tuple(topic, consumerThreadId)]; var partTopicInfo = new PartitionTopicInfo( @@ -516,11 +514,11 @@ private void AddPartitionTopicInfo(ZKGroupTopicDirs topicDirs, string partition, leader, partitionId, queue, - offsetCommited, + offset - 1, offset, offset, this.config.FetchSize, - offsetCommited); + offset); partTopicInfoMap[partitionId] = partTopicInfo; Logger.InfoFormat("{0} selected new offset {1}", partTopicInfo, offset); }