From c504f73f805596ab1486fdc507f55f38f433a981 Mon Sep 17 00:00:00 2001 From: Kashif Sohail Date: Wed, 25 Feb 2026 21:57:05 +0500 Subject: [PATCH 1/3] fix: Initialize connector before accessing connection_parameters in discover_streams MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The `connection_parameters` attribute is set as a side effect of the `connector` cached property. In `discover_streams`, accessing `self.connection_parameters` (for LOG_BASED streams) without first ensuring `self.connector` has been called could raise an AttributeError. This change accesses the connector once at the top of the method, stores it in a local variable, and reuses it — guaranteeing initialization order and avoiding redundant cached property lookups in the loop. Co-authored-by: Cursor --- tap_postgres/tap.py | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/tap_postgres/tap.py b/tap_postgres/tap.py index 3171511c..8c8cd597 100644 --- a/tap_postgres/tap.py +++ b/tap_postgres/tap.py @@ -715,6 +715,10 @@ def discover_streams(self) -> Sequence[Stream]: Returns: List of discovered Stream objects. """ + # Access connector first to ensure connection_parameters is initialized. + # connection_parameters is set as a side effect of the connector property. + connector = self.connector + streams: list[SQLStream] = [] for catalog_entry in self.catalog_dict["streams"]: if catalog_entry["replication_method"] == "LOG_BASED": @@ -723,9 +727,9 @@ def discover_streams(self) -> Sequence[Stream]: self, catalog_entry, connection_parameters=self.connection_parameters, - connector=self.connector, + connector=connector, ) ) else: - streams.append(PostgresStream(self, catalog_entry, connector=self.connector)) + streams.append(PostgresStream(self, catalog_entry, connector=connector)) return streams From 81bc0b2562a700cbbe8d217c1c7b64e7aa25a8d8 Mon Sep 17 00:00:00 2001 From: Kashif Sohail Date: Thu, 26 Feb 2026 16:02:58 +0500 Subject: [PATCH 2/3] fix: Handle wal2json enum type quoting issue in LOG_BASED replication --- tap_postgres/client.py | 33 ++++++++++++++++++++++++++++----- 1 file changed, 28 insertions(+), 5 deletions(-) diff --git a/tap_postgres/client.py b/tap_postgres/client.py index b2a538d9..a1b8628f 100644 --- a/tap_postgres/client.py +++ b/tap_postgres/client.py @@ -8,6 +8,7 @@ import datetime import functools import json +import re import select import typing as t from types import MappingProxyType @@ -393,11 +394,18 @@ def consume(self, message, cursor) -> dict | None: try: message_payload = json.loads(message.payload) except json.JSONDecodeError: - self.logger.warning( - "A message payload of %s could not be converted to JSON", - message.payload, - ) - return {} + # wal2json outputs PostgreSQL enum types with unescaped quotes + # e.g., "type":""EnumName"" instead of "type":"EnumName" + # Try to fix this by removing the extra quotes around type values + fixed_payload = self._fix_wal2json_enum_quotes(message.payload) + try: + message_payload = json.loads(fixed_payload) + except json.JSONDecodeError: + self.logger.warning( + "A message payload of %s could not be converted to JSON", + message.payload, + ) + return {} row = {} @@ -445,6 +453,21 @@ def consume(self, message, cursor) -> dict | None: return row + def _fix_wal2json_enum_quotes(self, payload: str) -> str: + """Fix malformed JSON from wal2json for PostgreSQL enum types. + + wal2json outputs enum type names with unescaped quotes, e.g.: + "type":""EnumName"" + This is invalid JSON. We fix it by removing the extra quotes: + "type":"EnumName" + """ + # Pattern matches "type":"" followed by a word and then "" + # e.g., "type":""OrderStatus"" -> "type":"OrderStatus" + # The malformed output has literal double-double quotes around enum names + pattern = r'"type":""([^"]+)""' + replacement = r'"type":"\1"' + return re.sub(pattern, replacement, payload) + def _parse_column_value(self, column, cursor): # When using log based replication, the wal2json output for columns of # array types returns a string encoded in sql format, e.g. '{a,b}' From c74735d6f59013a01026f842a068f1354da1ed2c Mon Sep 17 00:00:00 2001 From: Kashif Sohail Date: Tue, 3 Mar 2026 21:49:50 +0500 Subject: [PATCH 3/3] perf: Compile wal2json enum quote regex as a class-level constant --- tap_postgres/client.py | 9 +++------ 1 file changed, 3 insertions(+), 6 deletions(-) diff --git a/tap_postgres/client.py b/tap_postgres/client.py index a1b8628f..dd281c92 100644 --- a/tap_postgres/client.py +++ b/tap_postgres/client.py @@ -258,6 +258,8 @@ class PostgresLogBasedStream(SQLStream): replication_key = "_sdc_lsn" + _WAL2JSON_ENUM_QUOTE_RE = re.compile(r'"type":""([^"]+)""') + connection_parameters: ConnectionParameters def __init__( @@ -461,12 +463,7 @@ def _fix_wal2json_enum_quotes(self, payload: str) -> str: This is invalid JSON. We fix it by removing the extra quotes: "type":"EnumName" """ - # Pattern matches "type":"" followed by a word and then "" - # e.g., "type":""OrderStatus"" -> "type":"OrderStatus" - # The malformed output has literal double-double quotes around enum names - pattern = r'"type":""([^"]+)""' - replacement = r'"type":"\1"' - return re.sub(pattern, replacement, payload) + return self._WAL2JSON_ENUM_QUOTE_RE.sub(r'"type":"\1"', payload) def _parse_column_value(self, column, cursor): # When using log based replication, the wal2json output for columns of