From 0ad1c4b58bcb123b1483aa18d4b75bd1a7f8932e Mon Sep 17 00:00:00 2001 From: Krishnanand V P Date: Sun, 11 Jan 2026 17:08:05 +0400 Subject: [PATCH] Remove incorrect duplicate batch skipping in ReorgAwareStream The _is_duplicate_batch check was incorrectly skipping batches based only on comparing ranges to the previous batch. This is wrong because: 1. Proper duplicate detection is handled by state_store.is_processed() in load_stream_continuous, which checks against ALL processed batches 2. The range-only comparison does not check hashes, so batches with the same ranges but different data (after reorg) would be incorrectly skipped 3. After crash/restart, prev_ranges_by_network is empty anyway --- src/amp/streaming/reorg.py | 7 ------- 1 file changed, 7 deletions(-) diff --git a/src/amp/streaming/reorg.py b/src/amp/streaming/reorg.py index 9083db7..81702b1 100644 --- a/src/amp/streaming/reorg.py +++ b/src/amp/streaming/reorg.py @@ -49,13 +49,6 @@ def __next__(self) -> ResponseBatch: # Get next batch from underlying stream batch = next(self.stream_iterator) - # Note: ranges_complete flag is handled by CheckpointStore in load_stream_continuous - # Check if this batch contains only duplicate ranges - if self._is_duplicate_batch(batch.metadata.ranges): - self.logger.debug(f'Skipping duplicate batch with ranges: {batch.metadata.ranges}') - # Recursively call to get the next non-duplicate batch - return self.__next__() - # Detect reorgs by comparing with previous ranges invalidation_ranges = self._detect_reorg(batch.metadata.ranges)