Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,11 @@ git clone git@github.com:schwaho/SimpleTicketing.git
cd SimpleTicketing
```

If you want a known working state, check out the latest release tag (e.g. `v0.1.0`).
```bash
git checkout -b main-v0.1.0 v0.1.0
```

#### 2. Create a virtual environment

```bash
Expand Down
11 changes: 8 additions & 3 deletions src/simple_ticketing/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@
import shutil
from importlib import resources
import typer as typer_module
from simple_ticketing.database import db_init, db_clear
from simple_ticketing.database import open_connection, close_connection, transaction, db_clear
from simple_ticketing.db_schema import create_all_tables
from simple_ticketing.cert import create_ca, create_signed_certificate

app = typer_module.Typer()
Expand Down Expand Up @@ -62,6 +63,7 @@ def init() -> None:
instance_dir = os.path.join("instance")
data_dir = os.path.join(instance_dir, "data")
structur = ["qr_codes", "tickets", "pdf", "tmp"]
database_path = os.path.join("instance", "data", "tickets.db")

if not os.path.exists(data_dir):
for path in structur:
Expand All @@ -72,8 +74,11 @@ def init() -> None:

copy_examples(instance_dir)

db_init()
logger.info("Database has been created.")
open_connection(database_path)
with transaction():
create_all_tables()
logger.info("Database has been created.")
close_connection()

ca_cert, ca_key = create_ca()
create_signed_certificate(ca_cert, ca_key)
Expand Down
209 changes: 208 additions & 1 deletion src/simple_ticketing/database.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,9 @@
import os
import logging
import sqlite3
from typing import List, Tuple, Any, Dict, Optional
from contextlib import contextmanager
from contextvars import ContextVar
from typing import List, Tuple, Any, Dict, Optional, Iterator, Union

logger = logging.getLogger(__name__)

Expand All @@ -30,6 +32,211 @@
"hidden": "BOOLEAN DEFAULT FALSE",
}

_current_connection: ContextVar[Optional[sqlite3.Connection]] = ContextVar(
"current_db_connection",
default=None,
)


def open_connection(db_path: str) -> None:
"""
Open a SQLite database connection and store it in the thread-local context.

Args:
db_path: Path to the SQLite database file.

Raises:
sqlite3.Error: If the connection cannot be established.
"""
conn = sqlite3.connect(
db_path,
isolation_level=None,
)
conn.row_factory = sqlite3.Row
_current_connection.set(conn)


def get_connection() -> sqlite3.Connection:
"""
Retrieve the current SQLite database connection from the thread-local context.

Returns:
sqlite3.Connection: The active database connection.

Raises:
RuntimeError: If no connection has been initialized.
"""
conn = _current_connection.get()
if conn is None:
raise RuntimeError("Database connection not initialized")
return conn


def close_connection() -> None:
"""
Close the current SQLite database connection and clear the thread-local context.

Does nothing if no connection is currently open.
"""
conn = _current_connection.get()
if conn is not None:
conn.close()
_current_connection.set(None)


@contextmanager
def transaction() -> Iterator[None]:
"""
Execute multiple database operations atomically.

All statements executed within this context are part of a single
database transaction. If an exception is raised, the transaction
is rolled back. Otherwise, it is committed.

Raises:
sqlite3.DatabaseError: If committing or rolling back fails.
"""
conn = get_connection()

try:
logger.debug("BEGIN TRANSACTION")
conn.execute("BEGIN")

yield

except sqlite3.DatabaseError as e:
logger.debug("ROLLBACK TRANSACTION")
conn.rollback()
raise e

logger.debug("COMMIT TRANSACTION")
conn.commit()


def execute(sql: str, params: Optional[Union[Tuple[Any, ...], Dict[str, Any]]] = None) -> None:
"""Execute a single SQL statement without returning a result.

This function is the lowest-level execution primitive. It is responsible
for parameter binding, execution, error handling, and logging. It must not
encode any domain knowledge or assumptions about the queried data.

Args:
sql: The SQL statement to execute.
params: Optional positional or named parameters for the SQL statement.

Raises:
sqlite3.DatabaseError: If execution fails for any reason.
"""
conn = get_connection()
try:
if params is not None:
logger.debug("Executing SQL: %s with params: %s", sql, params)
conn.execute(sql, params)
else:
logger.debug("Executing SQL: %s", sql)
conn.execute(sql)
except sqlite3.DatabaseError:
logger.exception("Database execution failed.")
raise


def execute_many(sql: str, params: List[Union[Tuple[Any, ...], Dict[str, Any]]]) -> None:
"""Execute the same SQL statement multiple times with different parameters.

Intended for batch inserts or updates. This function does not implement
any domain-specific batching logic.

Args:
sql: The SQL statement to execute.
params: A list of parameter sets (positional or named) to apply to the SQL statement.

Raises:
sqlite3.DatabaseError: If execution fails for any reason.
"""
if not params:
logger.debug("execute_many called with empty params list.")
return

conn = get_connection()
try:
logger.debug("Executing SQL many times: %s with %d parameter sets", sql, len(params))
conn.executemany(sql, params)
except sqlite3.DatabaseError:
logger.exception("Database batch execution failed.")
raise


def fetch_one(
sql: str, params: Optional[Union[Tuple[Any, ...], Dict[str, Any]]] = None
) -> Optional[Dict[str, Any]]:
"""Execute a SQL query and return a single row.

If the query yields no result, None is returned. If multiple rows are
returned by the query, only the first row is returned.

Args:
sql: The SQL SELECT statement to execute.
params: Optional positional or named parameters for the SQL statement.

Returns:
A single row represented as a mapping, or None if no row was found.

Raises:
sqlite3.DatabaseError: If execution fails for any reason.
"""
conn = get_connection()
cursor = conn.cursor()
try:
logger.debug("Executing SQL (fetch_one): %s with params: %s", sql, params)
if params:
cursor.execute(sql, params)
else:
cursor.execute(sql)
row = cursor.fetchone()
if row is None:
return None
return dict(row)
except sqlite3.DatabaseError:
logger.exception("Database query failed (fetch_one).")
raise
finally:
cursor.close()


def fetch_all(
sql: str, params: Optional[Union[Tuple[Any, ...], Dict[str, Any]]] = None
) -> List[Dict[str, Any]]:
"""Execute a SQL query and return all resulting rows.

The database layer does not interpret or post-process results. Ordering,
grouping, and semantic meaning are the responsibility of the caller.

Args:
sql: The SQL SELECT statement to execute.
params: Optional positional or named parameters for the SQL statement.

Returns:
A list of rows represented as mappings. The list may be empty.

Raises:
sqlite3.DatabaseError: If execution fails for any reason.
"""
conn = get_connection()
cursor = conn.cursor()
try:
logger.debug("Executing SQL (fetch_all): %s with params: %s", sql, params)
if params:
cursor.execute(sql, params)
else:
cursor.execute(sql)
rows = cursor.fetchall()
return [dict(row) for row in rows]
except sqlite3.DatabaseError:
logger.exception("Database query failed (fetch_all).")
raise
finally:
cursor.close()


def db_get_connection() -> sqlite3.Connection:
"""
Expand Down
Loading
Loading