Skip to content

fix: Fix InvalidStreamSortException in LOG_BASED replication#747

Merged
edgarrmondragon merged 4 commits intoMeltanoLabs:mainfrom
ksohail22:fix/invalid-stream-sort-exception
Mar 11, 2026
Merged

fix: Fix InvalidStreamSortException in LOG_BASED replication#747
edgarrmondragon merged 4 commits intoMeltanoLabs:mainfrom
ksohail22:fix/invalid-stream-sort-exception

Conversation

@ksohail22
Copy link
Contributor

Problem

After setting is_sorted = True on PostgresLogBasedStream, LOG_BASED streams crash with InvalidStreamSortException on subsequent syncs:

singer_sdk.exceptions.InvalidStreamSortException: Unsorted data detected in stream.
Latest value '3665872270224' is smaller than previous max '3665872270520'.

This happens because the previous _increment_stream_state delegated to the Singer SDK's increment_state() with check_sorted=True. When a stream resumes via start_replication(start_lsn=bookmark_value), PostgreSQL can deliver WAL records with data_start slightly below the stored bookmark — particularly when the replication slot is shared across multiple tables and changes from interleaved transactions straddle the resume point. The SDK interprets this as unsorted data and raises the exception.

Root Cause

The SDK's increment_state() function, when called with is_sorted=True and check_sorted=True (the default), enforces that every incoming record's replication key value must be >= the previous maximum. It initializes the "previous maximum" from the bookmark's replication_key_value. If any WAL record arrives with an LSN even 1 byte below the bookmark, the stream crashes.

This is a false positive: PostgreSQL's WAL is globally ordered, but when resuming from a specific LSN on a shared replication slot, records near the resume boundary can have data_start values marginally below the requested start position.

Fix

Replace the _increment_stream_state override with a self-contained max-forward-only implementation that:

  1. Reads the current replication_key_value from state
  2. Only updates the bookmark if the new value is >= the existing value
  3. Silently skips records with lower LSN (they are still emitted to the target — only bookmark tracking is affected)
  4. Removes the dependency on singer_sdk.helpers._state.increment_state

This preserves the bookmark-advancing behavior introduced in PR #3 while eliminating the crash on minor LSN out-of-order conditions.

Changes

  • tap_postgres/client.py: Rewrote _increment_stream_state to use direct max-forward-only comparison instead of delegating to increment_state(). Removed unused from singer_sdk.helpers._state import increment_state import.

Why not just set check_sorted = False?

Setting check_sorted = False on the class would disable the sort validation but still route through the SDK's increment_state() code path, which uses progress_markers when is_sorted=False and direct state updates when is_sorted=True. By implementing our own simple max-forward-only logic, we avoid any coupling to the SDK's internal state management semantics for this edge case and make the behavior explicit and predictable.

Side Effects & Corner Cases

  • Duplicate records to the target: Records with LSN below the bookmark are still yielded to the target (this method only affects state tracking, not record emission). Targets using merge/upsert handle this idempotently.
  • No data loss: The bookmark only ever moves forward, so no WAL records are permanently skipped on future syncs.
  • First sync (no prior bookmark): When old_value is None, the first record's LSN is always accepted, correctly bootstrapping the bookmark.
  • _sdc_lsn is always an integer: Simple >= comparison is type-safe; no string/datetime comparison edge cases.

Test Plan

  • Verify that LOG_BASED streams no longer crash with InvalidStreamSortException on subsequent syncs
  • Verify that replication_key_value advances correctly after each sync
  • Verify that send_feedback(flush_lsn=...) reports the advanced LSN to PostgreSQL, preventing WAL growth
  • Run against production replication slots shared across multiple tables

@edgarrmondragon edgarrmondragon changed the title Fix InvalidStreamSortException in LOG_BASED replication fix: Fix InvalidStreamSortException in LOG_BASED replication Mar 11, 2026
@edgarrmondragon edgarrmondragon added this pull request to the merge queue Mar 11, 2026
Merged via the queue into MeltanoLabs:main with commit 51a3d11 Mar 11, 2026
11 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants