Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 7 additions & 8 deletions Dockerfile
Original file line number Diff line number Diff line change
@@ -1,17 +1,16 @@
FROM python:3.10-slim-buster
FROM python:3.13-slim-bookworm
ENV ARCHITECTURE=x64
ENV PYTHONDONTWRITEBYTECODE 1 # Keeps Python from generating .pyc files in the container
ENV PYTHONUNBUFFERED 1 # Turns off buffering for easier container logging
ENV SQLALCHEMY_SILENCE_UBER_WARNING 1 # because we really should upgrade to SQLAlchemy 2.x
ENV QUESTDB_CONNECT_HOST "host.docker.internal"

RUN apt-get -y update
RUN apt-get -y upgrade
RUN apt-get -y --no-install-recommends install syslog-ng ca-certificates vim procps unzip less tar gzip iputils-ping gcc build-essential
RUN apt-get clean
RUN rm -rf /var/lib/apt/lists/*
RUN apt-get -y update \
&& apt-get -y upgrade \
&& apt-get -y --no-install-recommends install ca-certificates vim procps unzip less tar gzip iputils-ping gcc build-essential \
&& apt-get clean \
&& rm -rf /var/lib/apt/lists/*

COPY . /app
WORKDIR /app
RUN pip install -U pip && pip install psycopg2-binary 'SQLAlchemy<=1.4.47' .
RUN pip install -U pip && pip install .
CMD ["python", "src/examples/sqlalchemy_orm.py"]
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ compose-down:
echo "y" | docker volume prune

docker-test:
docker run -e QUESTDB_CONNECT_HOST='host.docker.internal' -e SQLALCHEMY_SILENCE_UBER_WARNING=1 questdb/questdb-connect:latest
docker run -e QUESTDB_CONNECT_HOST='host.docker.internal' questdb/questdb-connect:latest

test:
python3 -m pytest
Expand Down
13 changes: 10 additions & 3 deletions pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
[project]
# https://pip.pypa.io/en/stable/reference/build-system/pyproject-toml/
name = 'questdb-connect'
version = '1.1.5' # Standalone production version (with engine)
version = '1.2.0' # SA 1.4 + 2.0 dual compat
# version = '0.0.113' # testing version
authors = [{ name = 'questdb.io', email = 'support@questdb.io' }]
description = "SqlAlchemy library"
Expand All @@ -14,8 +14,14 @@ classifiers = [
'Programming Language :: Python :: 3.9',
'Programming Language :: Python :: 3.10',
'Programming Language :: Python :: 3.11',
'Programming Language :: Python :: 3.12',
'Programming Language :: Python :: 3.13',
]
dependencies = [
'SQLAlchemy>=1.4',
'psycopg2-binary>=2.9',
'packaging',
]
dependencies = []

[project.urls]
'Homepage' = "https://github.com/questdb/questdb-connect/"
Expand All @@ -32,7 +38,7 @@ questdb = 'qdb_superset.db_engine_specs.questdb:QuestDbEngineSpec'
[project.optional-dependencies]
test = [
'psycopg2-binary~=2.9.6',
'SQLAlchemy>=1.4, <2',
'SQLAlchemy>=1.4',
'apache-superset>=3.0.0',
'sqlparse==0.4.4',
'pytest~=7.3.0',
Expand Down Expand Up @@ -63,6 +69,7 @@ max-args = 10
'tests/test_dialect.py' = ['S101', 'PLR2004']
'tests/test_types.py' = ['S101']
'tests/test_superset.py' = ['S101']
'tests/test_sa2_compat.py' = ['S101', 'PLR2004']
'tests/conftest.py' = ['S608']
'src/examples/sqlalchemy_raw.py' = ['S608']
'src/examples/server_utilisation.py' = ['S311']
34 changes: 25 additions & 9 deletions src/questdb_connect/dialect.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import abc

import sqlalchemy
from packaging.version import Version
from sqlalchemy.dialects.postgresql.psycopg2 import PGDialect_psycopg2
from sqlalchemy.sql.compiler import GenericTypeCompiler

Expand All @@ -9,7 +10,10 @@
from .inspector import QDBInspector

# ===== SQLAlchemy Dialect ======
# https://docs.sqlalchemy.org/en/14/ apache-superset requires SQLAlchemy 1.4
# https://docs.sqlalchemy.org/en/20/

SA_VERSION = Version(sqlalchemy.__version__)
SA_V2 = SA_VERSION >= Version("2.0")


def connection_uri(
Expand All @@ -21,24 +25,32 @@ def connection_uri(
def create_engine(
host: str, port: str, username: str, password: str, database: str = "main"
):
kwargs = {
"hide_parameters": False,
"isolation_level": "REPEATABLE READ",
}
if not SA_V2:
kwargs["future"] = True
kwargs["implicit_returning"] = False
return sqlalchemy.create_engine(
connection_uri(host, port, username, password, database),
future=True,
hide_parameters=False,
implicit_returning=False,
isolation_level="REPEATABLE READ",
**kwargs,
)


def create_superset_engine(
host: str, port: str, username: str, password: str, database: str = "main"
):
kwargs = {
"hide_parameters": False,
"isolation_level": "REPEATABLE READ",
}
if not SA_V2:
kwargs["future"] = False
kwargs["implicit_returning"] = True
return sqlalchemy.create_engine(
connection_uri(host, port, username, password, database),
future=False,
hide_parameters=False,
implicit_returning=True,
isolation_level="REPEATABLE READ",
**kwargs,
)


Expand Down Expand Up @@ -73,6 +85,10 @@ def dbapi(cls):

return dbapi

@classmethod
def import_dbapi(cls):
return cls.dbapi()

def get_schema_names(self, conn, **kw):
return ["public"]

Expand Down
136 changes: 79 additions & 57 deletions src/questdb_connect/inspector.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,24 @@

import psycopg2
import sqlalchemy
from sqlalchemy.engine import Connection

from .common import PartitionBy
from .table_engine import QDBTableEngine
from .types import resolve_type_from_name


class QDBInspector(sqlalchemy.engine.reflection.Inspector, abc.ABC):
def _get_connection(self):
"""Get a usable connection from self.bind.

In SA 1.4, self.bind may be an Engine (with .execute()).
In SA 2.0, Engine.execute() is removed, so we must use .connect().
"""
if isinstance(self.bind, Connection):
return self.bind
return self.bind.connect()

def reflecttable(
self,
table,
Expand All @@ -32,71 +43,82 @@ def reflect_table(
_reflect_info=None,
):
table_name = table.name
conn = self._get_connection()
try:
result_set = self.bind.execute(
sqlalchemy.text(
"SELECT designatedTimestamp, partitionBy, walEnabled FROM tables() WHERE table_name = :tn"
),
{"tn": table_name},
)
except psycopg2.DatabaseError:
# older version
result_set = self.bind.execute(
try:
result_set = conn.execute(
sqlalchemy.text(
"SELECT designatedTimestamp, partitionBy, walEnabled FROM tables() WHERE table_name = :tn"
),
{"tn": table_name},
)
except psycopg2.DatabaseError:
# older QuestDB version uses 'name' instead of 'table_name'
result_set = conn.execute(
sqlalchemy.text(
"SELECT designatedTimestamp, partitionBy, walEnabled FROM tables() WHERE name = :tn"
),
{"tn": table_name},
)
if not result_set:
self._panic_table(table_name)
table_attrs = result_set.first()
if table_attrs:
col_ts_name = table_attrs[0]
partition_by = PartitionBy[table_attrs[1]]
is_wal = True if table_attrs[2] else False
else:
col_ts_name = None
partition_by = PartitionBy.NONE
is_wal = True
dedup_upsert_keys = []
for row in conn.execute(
sqlalchemy.text(
"SELECT designatedTimestamp, partitionBy, walEnabled FROM tables() WHERE name = :tn"
'SELECT "column", "type", "upsertKey" FROM table_columns(:tn)'
),
{"tn": table_name},
)
if not result_set:
self._panic_table(table_name)
table_attrs = result_set.first()
if table_attrs:
col_ts_name = table_attrs[0]
partition_by = PartitionBy[table_attrs[1]]
is_wal = True if table_attrs[2] else False
else:
col_ts_name = None
partition_by = PartitionBy.NONE
is_wal = True
dedup_upsert_keys = []
for row in self.bind.execute(
sqlalchemy.text(
'SELECT "column", "type", "upsertKey" FROM table_columns(:tn)'
),
{"tn": table_name},
):
col_name = row[0]
if include_columns and col_name not in include_columns:
continue
if exclude_columns and col_name in exclude_columns:
continue
if row[2]: # upsertKey
dedup_upsert_keys.append(col_name)
col_type = resolve_type_from_name(row[1])
table.append_column(
sqlalchemy.Column(
col_name,
col_type,
primary_key=(
col_ts_name and col_ts_name.upper() == col_name.upper()
),
):
col_name = row[0]
if include_columns and col_name not in include_columns:
continue
if exclude_columns and col_name in exclude_columns:
continue
if row[2]: # upsertKey
dedup_upsert_keys.append(col_name)
col_type = resolve_type_from_name(row[1])
table.append_column(
sqlalchemy.Column(
col_name,
col_type,
primary_key=(
col_ts_name and col_ts_name.upper() == col_name.upper()
),
)
)
table.engine = QDBTableEngine(
table_name,
col_ts_name,
partition_by,
is_wal,
tuple(dedup_upsert_keys) if dedup_upsert_keys else None,
)
table.engine = QDBTableEngine(
table_name,
col_ts_name,
partition_by,
is_wal,
tuple(dedup_upsert_keys) if dedup_upsert_keys else None,
)
table.metadata = sqlalchemy.MetaData()
table.metadata = sqlalchemy.MetaData()
finally:
# Close the connection if we opened it (bind was an Engine)
if not isinstance(self.bind, Connection):
conn.close()

def get_columns(self, table_name, schema=None, **kw):
result_set = self.bind.execute(
sqlalchemy.text('SELECT "column", "type" FROM table_columns(:tn)'),
{"tn": table_name},
)
return self.format_table_columns(table_name, result_set)
conn = self._get_connection()
try:
result_set = conn.execute(
sqlalchemy.text('SELECT "column", "type" FROM table_columns(:tn)'),
{"tn": table_name},
)
return self.format_table_columns(table_name, result_set)
finally:
if not isinstance(self.bind, Connection):
conn.close()

def get_schema_names(self):
return ["public"]
Expand Down
64 changes: 64 additions & 0 deletions tests/test_sa2_compat.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
"""Tests for SQLAlchemy 2.0 compatibility.

These tests verify the SA 2.0 migration changes work correctly.
"""
import questdb_connect as qdbc
import sqlalchemy
from questdb_connect.dialect import SA_V2, QuestDBDialect


def test_import_dbapi():
"""import_dbapi() must exist and return the questdb_connect module."""
dbapi = QuestDBDialect.import_dbapi()
assert dbapi is qdbc


def test_dbapi_still_works():
"""dbapi() must still work for SA 1.4 backward compat."""
dbapi = QuestDBDialect.dbapi()
assert dbapi is qdbc


def test_create_engine_no_deprecated_params(test_config):
"""create_engine() must not pass deprecated params on SA 2.0."""
import warnings
with warnings.catch_warnings(record=True) as w:
warnings.simplefilter("always")
engine = qdbc.create_engine(
test_config.host,
test_config.port,
test_config.username,
test_config.password,
test_config.database,
)
sa_dep = [x for x in w if issubclass(x.category, DeprecationWarning)
and ("implicit_returning" in str(x.message) or "future" in str(x.message))]
assert len(sa_dep) == 0, f"Unexpected deprecation warnings: {sa_dep}"
engine.dispose()


def test_create_superset_engine_no_deprecated_params(test_config):
"""create_superset_engine() must not pass deprecated params on SA 2.0."""
import warnings
with warnings.catch_warnings(record=True) as w:
warnings.simplefilter("always")
engine = qdbc.create_superset_engine(
test_config.host,
test_config.port,
test_config.username,
test_config.password,
test_config.database,
)
sa_dep = [x for x in w if issubclass(x.category, DeprecationWarning)
and ("implicit_returning" in str(x.message) or "future" in str(x.message))]
assert len(sa_dep) == 0, f"Unexpected deprecation warnings: {sa_dep}"
engine.dispose()


def test_sa_version_detection():
"""SA_V2 flag must match the installed SQLAlchemy version."""
major = int(sqlalchemy.__version__.split(".")[0])
if major >= 2:
assert SA_V2 is True
else:
assert SA_V2 is False