Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 7 additions & 8 deletions Dockerfile
Original file line number Diff line number Diff line change
@@ -1,17 +1,16 @@
FROM python:3.10-slim-buster
FROM python:3.13-slim-bookworm
ENV ARCHITECTURE=x64
ENV PYTHONDONTWRITEBYTECODE 1 # Keeps Python from generating .pyc files in the container
ENV PYTHONUNBUFFERED 1 # Turns off buffering for easier container logging
ENV SQLALCHEMY_SILENCE_UBER_WARNING 1 # because we really should upgrade to SQLAlchemy 2.x
ENV QUESTDB_CONNECT_HOST "host.docker.internal"

RUN apt-get -y update
RUN apt-get -y upgrade
RUN apt-get -y --no-install-recommends install syslog-ng ca-certificates vim procps unzip less tar gzip iputils-ping gcc build-essential
RUN apt-get clean
RUN rm -rf /var/lib/apt/lists/*
RUN apt-get -y update \
&& apt-get -y upgrade \
&& apt-get -y --no-install-recommends install ca-certificates vim procps unzip less tar gzip iputils-ping gcc build-essential \
&& apt-get clean \
&& rm -rf /var/lib/apt/lists/*

COPY . /app
WORKDIR /app
RUN pip install -U pip && pip install psycopg2-binary 'SQLAlchemy<=1.4.47' .
RUN pip install -U pip && pip install .
CMD ["python", "src/examples/sqlalchemy_orm.py"]
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ compose-down:
echo "y" | docker volume prune

docker-test:
docker run -e QUESTDB_CONNECT_HOST='host.docker.internal' -e SQLALCHEMY_SILENCE_UBER_WARNING=1 questdb/questdb-connect:latest
docker run -e QUESTDB_CONNECT_HOST='host.docker.internal' questdb/questdb-connect:latest

test:
python3 -m pytest
Expand Down
14 changes: 10 additions & 4 deletions pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
[project]
# https://pip.pypa.io/en/stable/reference/build-system/pyproject-toml/
name = 'questdb-connect'
version = '1.1.5' # Standalone production version (with engine)
version = '2.0.0' # SA 2.0 + Superset 4.1+
# version = '0.0.113' # testing version
authors = [{ name = 'questdb.io', email = 'support@questdb.io' }]
description = "SqlAlchemy library"
Expand All @@ -14,8 +14,13 @@ classifiers = [
'Programming Language :: Python :: 3.9',
'Programming Language :: Python :: 3.10',
'Programming Language :: Python :: 3.11',
'Programming Language :: Python :: 3.12',
'Programming Language :: Python :: 3.13',
]
dependencies = [
'SQLAlchemy>=2.0',
'psycopg2-binary>=2.9',
]
dependencies = []

[project.urls]
'Homepage' = "https://github.com/questdb/questdb-connect/"
Expand All @@ -32,8 +37,8 @@ questdb = 'qdb_superset.db_engine_specs.questdb:QuestDbEngineSpec'
[project.optional-dependencies]
test = [
'psycopg2-binary~=2.9.6',
'SQLAlchemy>=1.4, <2',
'apache-superset>=3.0.0',
'SQLAlchemy>=2.0',
'apache-superset>=4.1.0',
'sqlparse==0.4.4',
'pytest~=7.3.0',
'pytest_mock~=3.11.1',
Expand Down Expand Up @@ -63,6 +68,7 @@ max-args = 10
'tests/test_dialect.py' = ['S101', 'PLR2004']
'tests/test_types.py' = ['S101']
'tests/test_superset.py' = ['S101']
'tests/test_sa2_compat.py' = ['S101', 'PLR2004']
'tests/conftest.py' = ['S608']
'src/examples/sqlalchemy_raw.py' = ['S608']
'src/examples/server_utilisation.py' = ['S311']
27 changes: 14 additions & 13 deletions src/qdb_superset/db_engine_specs/questdb.py
Original file line number Diff line number Diff line change
@@ -1,29 +1,31 @@
from __future__ import annotations

import logging
import re
from datetime import datetime
from typing import Any

import questdb_connect.types as qdbc_types
from flask_babel import gettext as __
from marshmallow import fields, Schema
from questdb_connect.common import remove_public_schema
from marshmallow import Schema, fields
from sqlalchemy.engine.base import Engine
from sqlalchemy.engine.reflection import Inspector
from sqlalchemy.sql.expression import text, TextClause
from sqlalchemy.sql.expression import TextClause, text
from sqlalchemy.types import TypeEngine
import logging

import questdb_connect.types as qdbc_types
from questdb_connect.common import remove_public_schema

# Configure the logging
logging.basicConfig(level=logging.ERROR)
logger = logging.getLogger(__name__)

from superset import sql_parse
from superset.db_engine_specs.base import (
BaseEngineSpec,
BasicParametersMixin,
BasicParametersType,
)
from superset import sql_parse
from superset.sql.parse import Table
from superset.utils import core as utils
from superset.utils.core import GenericDataType

Expand Down Expand Up @@ -269,9 +271,8 @@ def get_sqla_column_type(
def select_star( # pylint: disable=too-many-arguments
cls,
database: Any,
table_name: str,
table: Table,
engine: Engine,
schema: str | None = None,
limit: int = 100,
show_cols: bool = False,
indent: bool = True,
Expand All @@ -280,9 +281,8 @@ def select_star( # pylint: disable=too-many-arguments
) -> str:
"""Generate a "SELECT * from table_name" query with appropriate limit.
:param database: Database instance
:param table_name: Table name, unquoted
:param table: Table instance
:param engine: SqlAlchemy Engine instance
:param schema: Schema, unquoted
:param limit: limit to impose on query
:param show_cols: Show columns in query; otherwise use "*"
:param indent: Add indentation to query
Expand All @@ -292,9 +292,8 @@ def select_star( # pylint: disable=too-many-arguments
"""
return super().select_star(
database,
table_name,
Table(table=table.table, schema=None, catalog=table.catalog),
engine,
None,
limit,
show_cols,
indent,
Expand Down Expand Up @@ -332,16 +331,18 @@ def execute( # pylint: disable=unused-argument
cls,
cursor: Any,
query: str,
database: Any,
**kwargs: Any,
) -> None:
"""Execute a SQL query
:param cursor: Cursor instance
:param query: Query to execute
:param database: Database instance
:param kwargs: kwargs to be passed to cursor.execute()
:return:
"""
try:
sql = sql_parse.strip_comments_from_sql(query)
sql = sql_parse.strip_comments_from_sql(query, engine=cls.engine)
cursor.execute(sql)
except Exception as ex:
# Log the exception with traceback
Expand Down
8 changes: 2 additions & 6 deletions src/questdb_connect/dialect.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
from .inspector import QDBInspector

# ===== SQLAlchemy Dialect ======
# https://docs.sqlalchemy.org/en/14/ apache-superset requires SQLAlchemy 1.4
# https://docs.sqlalchemy.org/en/20/


def connection_uri(
Expand All @@ -23,9 +23,7 @@ def create_engine(
):
return sqlalchemy.create_engine(
connection_uri(host, port, username, password, database),
future=True,
hide_parameters=False,
implicit_returning=False,
isolation_level="REPEATABLE READ",
)

Expand All @@ -35,9 +33,7 @@ def create_superset_engine(
):
return sqlalchemy.create_engine(
connection_uri(host, port, username, password, database),
future=False,
hide_parameters=False,
implicit_returning=True,
isolation_level="REPEATABLE READ",
)

Expand Down Expand Up @@ -68,7 +64,7 @@ class QuestDBDialect(PGDialect_psycopg2, abc.ABC):
supports_is_distinct_from = False

@classmethod
def dbapi(cls):
def import_dbapi(cls):
import questdb_connect as dbapi

return dbapi
Expand Down
136 changes: 79 additions & 57 deletions src/questdb_connect/inspector.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,24 @@

import psycopg2
import sqlalchemy
from sqlalchemy.engine import Connection

from .common import PartitionBy
from .table_engine import QDBTableEngine
from .types import resolve_type_from_name


class QDBInspector(sqlalchemy.engine.reflection.Inspector, abc.ABC):
def _get_connection(self):
"""Get a usable connection from self.bind.

In SA 1.4, self.bind may be an Engine (with .execute()).
In SA 2.0, Engine.execute() is removed, so we must use .connect().
"""
if isinstance(self.bind, Connection):
return self.bind
return self.bind.connect()

def reflecttable(
self,
table,
Expand All @@ -32,71 +43,82 @@ def reflect_table(
_reflect_info=None,
):
table_name = table.name
conn = self._get_connection()
try:
result_set = self.bind.execute(
sqlalchemy.text(
"SELECT designatedTimestamp, partitionBy, walEnabled FROM tables() WHERE table_name = :tn"
),
{"tn": table_name},
)
except psycopg2.DatabaseError:
# older version
result_set = self.bind.execute(
try:
result_set = conn.execute(
sqlalchemy.text(
"SELECT designatedTimestamp, partitionBy, walEnabled FROM tables() WHERE table_name = :tn"
),
{"tn": table_name},
)
except psycopg2.DatabaseError:
# older QuestDB version uses 'name' instead of 'table_name'
result_set = conn.execute(
sqlalchemy.text(
"SELECT designatedTimestamp, partitionBy, walEnabled FROM tables() WHERE name = :tn"
),
{"tn": table_name},
)
if not result_set:
self._panic_table(table_name)
table_attrs = result_set.first()
if table_attrs:
col_ts_name = table_attrs[0]
partition_by = PartitionBy[table_attrs[1]]
is_wal = True if table_attrs[2] else False
else:
col_ts_name = None
partition_by = PartitionBy.NONE
is_wal = True
dedup_upsert_keys = []
for row in conn.execute(
sqlalchemy.text(
"SELECT designatedTimestamp, partitionBy, walEnabled FROM tables() WHERE name = :tn"
'SELECT "column", "type", "upsertKey" FROM table_columns(:tn)'
),
{"tn": table_name},
)
if not result_set:
self._panic_table(table_name)
table_attrs = result_set.first()
if table_attrs:
col_ts_name = table_attrs[0]
partition_by = PartitionBy[table_attrs[1]]
is_wal = True if table_attrs[2] else False
else:
col_ts_name = None
partition_by = PartitionBy.NONE
is_wal = True
dedup_upsert_keys = []
for row in self.bind.execute(
sqlalchemy.text(
'SELECT "column", "type", "upsertKey" FROM table_columns(:tn)'
),
{"tn": table_name},
):
col_name = row[0]
if include_columns and col_name not in include_columns:
continue
if exclude_columns and col_name in exclude_columns:
continue
if row[2]: # upsertKey
dedup_upsert_keys.append(col_name)
col_type = resolve_type_from_name(row[1])
table.append_column(
sqlalchemy.Column(
col_name,
col_type,
primary_key=(
col_ts_name and col_ts_name.upper() == col_name.upper()
),
):
col_name = row[0]
if include_columns and col_name not in include_columns:
continue
if exclude_columns and col_name in exclude_columns:
continue
if row[2]: # upsertKey
dedup_upsert_keys.append(col_name)
col_type = resolve_type_from_name(row[1])
table.append_column(
sqlalchemy.Column(
col_name,
col_type,
primary_key=(
col_ts_name and col_ts_name.upper() == col_name.upper()
),
)
)
table.engine = QDBTableEngine(
table_name,
col_ts_name,
partition_by,
is_wal,
tuple(dedup_upsert_keys) if dedup_upsert_keys else None,
)
table.engine = QDBTableEngine(
table_name,
col_ts_name,
partition_by,
is_wal,
tuple(dedup_upsert_keys) if dedup_upsert_keys else None,
)
table.metadata = sqlalchemy.MetaData()
table.metadata = sqlalchemy.MetaData()
finally:
# Close the connection if we opened it (bind was an Engine)
if not isinstance(self.bind, Connection):
conn.close()

def get_columns(self, table_name, schema=None, **kw):
result_set = self.bind.execute(
sqlalchemy.text('SELECT "column", "type" FROM table_columns(:tn)'),
{"tn": table_name},
)
return self.format_table_columns(table_name, result_set)
conn = self._get_connection()
try:
result_set = conn.execute(
sqlalchemy.text('SELECT "column", "type" FROM table_columns(:tn)'),
{"tn": table_name},
)
return self.format_table_columns(table_name, result_set)
finally:
if not isinstance(self.bind, Connection):
conn.close()

def get_schema_names(self):
return ["public"]
Expand Down
Loading
Loading