fix: Fix InvalidStreamSortException in LOG_BASED replication #747
Merged
edgarrmondragon merged 4 commits into MeltanoLabs:main from Mar 11, 2026
Fix: LOG_BASED replication bookmark not advancing between syncs
InvalidStreamSortException in LOG_BASED replication
Problem
After setting `is_sorted = True` on `PostgresLogBasedStream`, LOG_BASED streams crash with `InvalidStreamSortException` on subsequent syncs.

This happens because the previous `_increment_stream_state` delegated to the Singer SDK's `increment_state()` with `check_sorted=True`. When a stream resumes via `start_replication(start_lsn=bookmark_value)`, PostgreSQL can deliver WAL records with `data_start` slightly below the stored bookmark, particularly when the replication slot is shared across multiple tables and changes from interleaved transactions straddle the resume point. The SDK interprets this as unsorted data and raises the exception.

Root Cause
The SDK's `increment_state()` function, when called with `is_sorted=True` and `check_sorted=True` (the default), enforces that every incoming record's replication key value must be `>=` the previous maximum. It initializes the "previous maximum" from the bookmark's `replication_key_value`. If any WAL record arrives with an LSN even 1 byte below the bookmark, the stream crashes.

This is a false positive: PostgreSQL's WAL is globally ordered, but when resuming from a specific LSN on a shared replication slot, records near the resume boundary can have `data_start` values marginally below the requested start position.

Fix
Replace the `_increment_stream_state` override with a self-contained max-forward-only implementation that:

- Reads the current `replication_key_value` from state
- Updates the bookmark only when the new value is `>=` the existing value
- No longer depends on `singer_sdk.helpers._state.increment_state`

This preserves the bookmark-advancing behavior introduced in PR #3 while eliminating the crash on minor LSN out-of-order conditions.
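
A minimal sketch of the max-forward-only idea follows. It is not the code from this PR: the function name `advance_bookmark`, the flat `state` dict, and the `replication_key` field are assumptions made for illustration, and only `replication_key_value` and `_sdc_lsn` are taken from the description above.

```python
from __future__ import annotations

from typing import Any


def advance_bookmark(state: dict[str, Any], key: str, record: dict[str, Any]) -> None:
    """Move the bookmark forward only; never raise on a lower value.

    Hypothetical helper, assuming the stream state is a plain dict.
    """
    new_value = record[key]
    old_value = state.get("replication_key_value")
    # No bookmark yet: accept the first LSN to bootstrap the state.
    # Bookmark present: update only when the new LSN is >= the stored one,
    # silently skipping records that fall slightly below the resume point.
    if old_value is None or new_value >= old_value:
        state["replication_key"] = key
        state["replication_key_value"] = new_value


# Illustrative LSNs: the second record is below the current bookmark.
state: dict[str, Any] = {}
for lsn in (100, 98, 105):
    advance_bookmark(state, "_sdc_lsn", {"_sdc_lsn": lsn})
```

In this sketch a record below the bookmark is still processed by the caller; it simply does not move the bookmark backwards.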
Changes
- `tap_postgres/client.py`: Rewrote `_increment_stream_state` to use direct max-forward-only comparison instead of delegating to `increment_state()`. Removed unused `from singer_sdk.helpers._state import increment_state` import.

Why not just set `check_sorted = False`?

Setting `check_sorted = False` on the class would disable the sort validation but still route through the SDK's `increment_state()` code path, which uses `progress_markers` when `is_sorted=False` and direct state updates when `is_sorted=True`. By implementing our own simple max-forward-only logic, we avoid any coupling to the SDK's internal state management semantics for this edge case and make the behavior explicit and predictable.

Side Effects & Corner Cases
- When `old_value is None`, the first record's LSN is always accepted, correctly bootstrapping the bookmark.
- `_sdc_lsn` is always an integer: simple `>=` comparison is type-safe; no string/datetime comparison edge cases.

Test Plan
- No `InvalidStreamSortException` on subsequent syncs
- `replication_key_value` advances correctly after each sync
- `send_feedback(flush_lsn=...)` reports the advanced LSN to PostgreSQL, preventing WAL growth
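
The first two checks can be approximated without a database by simulating two consecutive syncs. The sketch below is an assumption-laden model, not the tap's test suite: `SortViolation` is a hypothetical stand-in for the SDK's `InvalidStreamSortException`, `strict_update` is a simplified model of a sorted-stream check rather than the SDK's actual code, and the LSN values are made up. The `send_feedback` check needs a live replication slot and is not covered here.

```python
from __future__ import annotations

from typing import Callable, Iterable, Optional


class SortViolation(Exception):
    """Hypothetical stand-in for the SDK's InvalidStreamSortException."""


def strict_update(bookmark: Optional[int], lsn: int) -> int:
    """Simplified sorted check: reject any LSN below the bookmark."""
    if bookmark is not None and lsn < bookmark:
        raise SortViolation(f"LSN {lsn} is below bookmark {bookmark}")
    return lsn


def forward_only_update(bookmark: Optional[int], lsn: int) -> int:
    """Max-forward-only update: keep the bookmark when the LSN is lower."""
    if bookmark is None or lsn >= bookmark:
        return lsn
    return bookmark


def run_sync(
    update: Callable[[Optional[int], int], int],
    bookmark: Optional[int],
    lsns: Iterable[int],
) -> Optional[int]:
    """Feed one sync's worth of LSNs through an update rule."""
    for lsn in lsns:
        bookmark = update(bookmark, lsn)
    return bookmark


# First sync starts with no bookmark.
first = run_sync(forward_only_update, None, [100, 110, 120])

# Second sync resumes at the bookmark; its first record sits just below it.
second_batch = [119, 125, 130]
second = run_sync(forward_only_update, first, second_batch)

# The strict rule fails on the same input.
try:
    run_sync(strict_update, first, second_batch)
    strict_crashed = False
except SortViolation:
    strict_crashed = True
```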