Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 25 additions & 5 deletions tap_postgres/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import datetime
import functools
import json
import re
import select
import typing as t
from types import MappingProxyType
Expand Down Expand Up @@ -213,6 +214,8 @@ class PostgresLogBasedStream(SQLStream):

replication_key = "_sdc_lsn"

_WAL2JSON_ENUM_QUOTE_RE = re.compile(r'"type":""([^"]+)""')

connection_parameters: ConnectionParameters

def __init__(
Expand Down Expand Up @@ -349,11 +352,18 @@ def consume(self, message, cursor) -> dict | None:
try:
message_payload = json.loads(message.payload)
except json.JSONDecodeError:
self.logger.warning(
"A message payload of %s could not be converted to JSON",
message.payload,
)
return {}
# wal2json outputs PostgreSQL enum types with unescaped quotes
# e.g., "type":""EnumName"" instead of "type":"EnumName"
# Try to fix this by removing the extra quotes around type values
fixed_payload = self._fix_wal2json_enum_quotes(message.payload)
try:
message_payload = json.loads(fixed_payload)
except json.JSONDecodeError:
self.logger.warning(
"A message payload of %s could not be converted to JSON",
message.payload,
)
Comment on lines +359 to +365
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

perf: this adds another try/except block to each iteration of the log message loop. Even if the performance hit is measurable, having the sync crash is worse.

I %100 need to add some sort of automatic benchmarks for general and log-based performance here.

return {}

row = {}

Expand Down Expand Up @@ -403,6 +413,16 @@ def consume(self, message, cursor) -> dict | None:

return row

def _fix_wal2json_enum_quotes(self, payload: str) -> str:
"""Fix malformed JSON from wal2json for PostgreSQL enum types.

wal2json outputs enum type names with unescaped quotes, e.g.:
"type":""EnumName""
This is invalid JSON. We fix it by removing the extra quotes:
"type":"EnumName"
"""
return self._WAL2JSON_ENUM_QUOTE_RE.sub(r'"type":"\1"', payload)

def _parse_column_value(self, column, cursor):
# When using log based replication, the wal2json output for columns of
# array types returns a string encoded in sql format, e.g. '{a,b}'
Expand Down