diff --git a/tap_postgres/client.py b/tap_postgres/client.py
index d3a93145..adaa086d 100644
--- a/tap_postgres/client.py
+++ b/tap_postgres/client.py
@@ -8,6 +8,7 @@
 import datetime
 import functools
 import json
+import re
 import select
 import typing as t
 from types import MappingProxyType
@@ -213,6 +214,13 @@ class PostgresLogBasedStream(SQLStream):
 
     replication_key = "_sdc_lsn"
 
+    # Matches a wal2json ``"type":"..."`` member whose value may contain
+    # unescaped double quotes, e.g. ``"type":""EnumName""``. The value may
+    # not span ``:``, ``{``, ``}`` or a newline and must be closed by a quote
+    # directly followed by ``,`` or ``}``, so well-formed members match too
+    # but are left untouched by the repair.
+    _WAL2JSON_ENUM_QUOTE_RE = re.compile(r'"type":"([^:{}\n]*?)"(?=[,}])')
+
     connection_parameters: ConnectionParameters
 
     def __init__(
@@ -349,11 +357,18 @@ def consume(self, message, cursor) -> dict | None:
         try:
             message_payload = json.loads(message.payload)
         except json.JSONDecodeError:
-            self.logger.warning(
-                "A message payload of %s could not be converted to JSON",
-                message.payload,
-            )
-            return {}
+            # wal2json outputs PostgreSQL enum types with unescaped quotes
+            # e.g., "type":""EnumName"" instead of "type":"EnumName"
+            # Try to fix this by removing the extra quotes around type values
+            fixed_payload = self._fix_wal2json_enum_quotes(message.payload)
+            try:
+                message_payload = json.loads(fixed_payload)
+            except json.JSONDecodeError:
+                self.logger.warning(
+                    "A message payload of %s could not be converted to JSON",
+                    message.payload,
+                )
+                return {}
 
         row = {}
 
@@ -403,6 +418,30 @@ def consume(self, message, cursor) -> dict | None:
 
         return row
 
+    def _fix_wal2json_enum_quotes(self, payload: str) -> str:
+        """Repair wal2json output whose type names contain raw double quotes.
+
+        wal2json emits some type names (e.g. mixed-case enum names) wrapped
+        in unescaped double quotes, which makes the payload invalid JSON::
+
+            "type":""EnumName""
+
+        Every double quote inside a ``"type"`` value is dropped, so the
+        example becomes ``"type":"EnumName"``. Values without extra quotes
+        are returned unchanged. NOTE(review): quoted names that are schema
+        qualified or array typed are presumably possible too; they are
+        repaired the same way (``app."E"[]`` -> ``app.E[]``) -- confirm.
+
+        Args:
+            payload: Raw wal2json message text that failed to parse.
+
+        Returns:
+            The payload with the quotes inside ``"type"`` values removed.
+        """
+        return self._WAL2JSON_ENUM_QUOTE_RE.sub(
+            lambda match: '"type":"' + match[1].replace('"', "") + '"',
+            payload,
+        )
+
     def _parse_column_value(self, column, cursor):
         # When using log based replication, the wal2json output for columns of
         # array types returns a string encoded in sql format, e.g. '{a,b}'