From cb63549b580b1f42fe9e55c121f962adb26d4e01 Mon Sep 17 00:00:00 2001 From: jorntx <> Date: Fri, 16 Jan 2026 08:52:11 +0100 Subject: [PATCH 1/4] Enhance filename sanitization to include additional disallowed characters Add additional '#=!~' sanitizing to filenames and subdir foldernames --- pywaybackup/helper.py | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/pywaybackup/helper.py b/pywaybackup/helper.py index ca5005a..8362b82 100644 --- a/pywaybackup/helper.py +++ b/pywaybackup/helper.py @@ -15,7 +15,7 @@ def sanitize_filename(filename: str) -> str: """ Sanitize a string to be used as (part of) a filename. """ - disallowed = ["<", ">", ":", '"', "/", "\\", "|", "?", "*"] + disallowed = ["<", ">", ":", '"', "/", "\\", "|", "?", "*", "=", "#", "!", "~"] for char in disallowed: filename = filename.replace(char, ".") filename = ".".join(filter(None, filename.split("."))) @@ -63,12 +63,12 @@ def url_split(url, index=False): else: filename = "index.html" if index else "" subdir = "/".join(path_parts).strip("/") - # sanitize subdir and filename for windows - if check_nt(): - special_chars = [":", "*", "?", "&", "=", "<", ">", "\\", "|"] - for char in special_chars: - subdir = subdir.replace(char, f"%{ord(char):02x}") - filename = filename.replace(char, f"%{ord(char):02x}") + + # Sanitize special characters that are problematic in file- and foldernames + special_chars = [":", "*", "?", "&", "=", "<", ">", "\\", "|", "#", "!", "~"] + for char in special_chars: + subdir = subdir.replace(char, f"%{ord(char):02x}") + filename = filename.replace(char, f"%{ord(char):02x}") filename = filename.replace("%20", " ") return domain, subdir, filename From 5b0d2de2eb5e219382cbc4ae5c48c0702979092f Mon Sep 17 00:00:00 2001 From: Victor Johnston Date: Mon, 19 Jan 2026 09:39:27 +0100 Subject: [PATCH 2/4] Add atomic claim of snapshots --- pywaybackup/Snapshot.py | 45 ++++++++++++++++++++++++++++++----------- 1 file changed, 33 insertions(+), 12 deletions(-) diff --git a/pywaybackup/Snapshot.py b/pywaybackup/Snapshot.py index 1b343c1..3755501 100644 --- a/pywaybackup/Snapshot.py +++ b/pywaybackup/Snapshot.py @@ -1,7 +1,7 @@ import os import threading -from pywaybackup.db import Database, select, update, waybackup_snapshots +from pywaybackup.db import Database, select, update, waybackup_snapshots, and_ from pywaybackup.helper import url_split @@ -70,20 +70,41 @@ def __on_sqlite(): return False def __get_row(): - with self._db.session.begin(): - row = self._db.session.execute( - select(waybackup_snapshots) + # Atomic claim: find next unprocessed scid, set response='LOCK' only if still unprocessed, + # then fetch that row. This avoids relying on FOR UPDATE or explicit nested transactions + # which can trigger "A transaction is already begun on this Session" errors. + + session = self._db.session + + # get next available SnapshotId + scid = ( + session.execute( + select(waybackup_snapshots.scid) .where(waybackup_snapshots.response.is_(None)) .order_by(waybackup_snapshots.scid) .limit(1) - .with_for_update(skip_locked=True) - ).scalar_one_or_none() - - if row is None: - return None - - row.response = "LOCK" - + ) + .scalar_one_or_none() + ) + + if scid is None: + return None + + # try to atomically claim the row by updating only if still unclaimed + result = session.execute( + update(waybackup_snapshots) + .where(and_(waybackup_snapshots.scid == scid, waybackup_snapshots.response.is_(None))) + .values(response="LOCK") + ) + + # if another worker claimed it first, rowcount will be 0 and cannot be claimed by this worker. + if result.rowcount == 0: + session.commit() + return None + + # The row has been claimed by the worker and can now be fetched. + row = session.execute(select(waybackup_snapshots).where(waybackup_snapshots.scid == scid)).scalar_one_or_none() + session.commit() return row if __on_sqlite(): From e1dd090ca6d499f940eb29a535410cf5a4f29eba Mon Sep 17 00:00:00 2001 From: Victor Johnston Date: Mon, 19 Jan 2026 11:50:22 +0100 Subject: [PATCH 3/4] Harden closing of database connections --- pywaybackup/PyWayBackup.py | 7 ++ pywaybackup/Snapshot.py | 37 +++++++-- pywaybackup/SnapshotCollection.py | 130 +++++++++++++++++------------- pywaybackup/Worker.py | 21 +++++ pywaybackup/db.py | 25 +++++- 5 files changed, 158 insertions(+), 62 deletions(-) diff --git a/pywaybackup/PyWayBackup.py b/pywaybackup/PyWayBackup.py index 0ae84af..75418d1 100644 --- a/pywaybackup/PyWayBackup.py +++ b/pywaybackup/PyWayBackup.py @@ -363,6 +363,7 @@ def _workflow(self): resources after the backup is complete. """ + collection = None try: self._startup() @@ -384,6 +385,12 @@ def _workflow(self): self._keep = True ex.exception(message="", e=e) finally: + # if a collection was created during the workflow, close its DB session cleanly + try: + if collection: + collection.close() + except Exception: + pass self._shutdown() def paths(self, rel: bool = False) -> dict: diff --git a/pywaybackup/Snapshot.py b/pywaybackup/Snapshot.py index 3755501..75c5b54 100644 --- a/pywaybackup/Snapshot.py +++ b/pywaybackup/Snapshot.py @@ -3,6 +3,7 @@ from pywaybackup.db import Database, select, update, waybackup_snapshots, and_ from pywaybackup.helper import url_split +from pywaybackup.Verbosity import Verbosity as vb class Snapshot: @@ -77,6 +78,7 @@ def __get_row(): session = self._db.session # get next available SnapshotId + vb.write(verbose=True, content=f"[Snapshot.fetch] selecting next scid") scid = ( session.execute( select(waybackup_snapshots.scid) @@ -88,6 +90,7 @@ def __get_row(): ) if scid is None: + vb.write(verbose=True, content=f"[Snapshot.fetch] no unprocessed scid found") return None # try to atomically claim the row by updating only if still unclaimed @@ -98,13 +101,25 @@ def __get_row(): ) # if another worker claimed it first, rowcount will be 0 and cannot be claimed by this worker. + vb.write(verbose=True, content=f"[Snapshot.fetch] attempted to claim scid={scid}, rowcount={result.rowcount}") if result.rowcount == 0: - session.commit() + try: + session.commit() + except Exception: + pass + vb.write(verbose=True, content=f"[Snapshot.fetch] scid={scid} already claimed by another worker") return None # The row has been claimed by the worker and can now be fetched. row = session.execute(select(waybackup_snapshots).where(waybackup_snapshots.scid == scid)).scalar_one_or_none() - session.commit() + try: + session.commit() + except Exception: + try: + session.rollback() + except Exception: + pass + vb.write(verbose=True, content=f"[Snapshot.fetch] claimed scid={scid} and fetched row") return row if __on_sqlite(): @@ -122,10 +137,20 @@ def modify(self, column, value): value: New value to set for the column. """ column = getattr(waybackup_snapshots, column) - self._db.session.execute( - update(waybackup_snapshots).where(waybackup_snapshots.scid == self.scid).values({column: value}) - ) - self._db.session.commit() + try: + vb.write(verbose=True, content=f"[Snapshot.modify] updating scid={self.scid} column={column.key}") + self._db.session.execute( + update(waybackup_snapshots).where(waybackup_snapshots.scid == self.scid).values({column: value}) + ) + self._db.session.commit() + vb.write(verbose=True, content=f"[Snapshot.modify] update committed scid={self.scid} column={column.key}") + except Exception as e: + vb.write(verbose=True, content=f"[Snapshot.modify] update failed scid={self.scid} error={e}; rolling back") + try: + self._db.session.rollback() + except Exception: + pass + raise def create_output(self): """ diff --git a/pywaybackup/SnapshotCollection.py b/pywaybackup/SnapshotCollection.py index dcafcf9..81b8055 100644 --- a/pywaybackup/SnapshotCollection.py +++ b/pywaybackup/SnapshotCollection.py @@ -163,45 +163,56 @@ def _insert_batch_safe(line_batch): vb.write(verbose=None, content="\nInserting CDX data into database...") - progressbar = Progressbar( + try: + vb.write(verbose=True, content="[SnapshotCollection._insert_cdx] starting insert_cdx operation") + progressbar = Progressbar( unit=" lines", total=self._cdx_total, desc="process cdx".ljust(15), ascii="░▒█", bar_format="{l_bar}{bar:50}{r_bar}{bar:-10b}", - ) - line_batchsize = 2500 - line_batch = [] - total_inserted = 0 - first_line = True - - with self.cdxfile as f: - for line in f: - if first_line: - first_line = False - continue - line = line.strip() - if line.endswith("]]"): - line = line.rsplit("]", 1)[0] - if line.endswith(","): - line = line.rsplit(",", 1)[0] - - try: - line_batch.append(__parse_line(line)) - except json.decoder.JSONDecodeError: - self._snapshot_faulty += 1 - continue - - if len(line_batch) >= line_batchsize: + ) + line_batchsize = 2500 + line_batch = [] + total_inserted = 0 + first_line = True + + with self.cdxfile as f: + for line in f: + if first_line: + first_line = False + continue + line = line.strip() + if line.endswith("]]"): + line = line.rsplit("]", 1)[0] + if line.endswith(","): + line = line.rsplit(",", 1)[0] + + try: + line_batch.append(__parse_line(line)) + except json.decoder.JSONDecodeError: + self._snapshot_faulty += 1 + continue + + if len(line_batch) >= line_batchsize: + total_inserted += _insert_batch_safe(line_batch=line_batch) + line_batch = [] + progressbar.update(line_batchsize) + + if line_batch: total_inserted += _insert_batch_safe(line_batch=line_batch) - line_batch = [] - progressbar.update(line_batchsize) - - if line_batch: - total_inserted += _insert_batch_safe(line_batch=line_batch) - progressbar.update(len(line_batch)) - - self.db.session.commit() + progressbar.update(len(line_batch)) + + self.db.session.commit() + vb.write(verbose=True, content="[SnapshotCollection._insert_cdx] insert_cdx commit successful") + except Exception as e: + vb.write(verbose=True, content=f"[SnapshotCollection._insert_cdx] exception: {e}; rolling back") + try: + self.db.session.rollback() + vb.write(verbose=True, content="[SnapshotCollection._insert_cdx] rollback successful") + except Exception: + vb.write(verbose=True, content="[SnapshotCollection._insert_cdx] rollback failed") + raise def _index_snapshots(self): """ @@ -297,29 +308,40 @@ def _skip_set(self): """ # ? for now per row / no bulk for compatibility - with self.csvfile as f: - total_skipped = 0 - for row in f: - self.db.session.execute( - update(waybackup_snapshots) - .where( - and_( - waybackup_snapshots.timestamp == row["timestamp"], - waybackup_snapshots.url_origin == row["url_origin"], + try: + vb.write(verbose=True, content="[SnapshotCollection._skip_set] applying CSV skips to DB") + with self.csvfile as f: + total_skipped = 0 + for row in f: + self.db.session.execute( + update(waybackup_snapshots) + .where( + and_( + waybackup_snapshots.timestamp == row["timestamp"], + waybackup_snapshots.url_origin == row["url_origin"], + ) + ) + .values( + url_archive=row["url_archive"], + redirect_url=row["redirect_url"], + redirect_timestamp=row["redirect_timestamp"], + response=row["response"], + file=row["file"], ) ) - .values( - url_archive=row["url_archive"], - redirect_url=row["redirect_url"], - redirect_timestamp=row["redirect_timestamp"], - response=row["response"], - file=row["file"], - ) - ) - total_skipped += 1 - - self.db.session.commit() - self._filter_skip = total_skipped + total_skipped += 1 + + self.db.session.commit() + self._filter_skip = total_skipped + vb.write(verbose=True, content=f"[SnapshotCollection._skip_set] commit successful, total_skipped={total_skipped}") + except Exception as e: + vb.write(verbose=True, content=f"[SnapshotCollection._skip_set] exception: {e}; rolling back") + try: + self.db.session.rollback() + vb.write(verbose=True, content="[SnapshotCollection._skip_set] rollback successful") + except Exception: + vb.write(verbose=True, content="[SnapshotCollection._skip_set] rollback failed") + raise def count_total(self) -> int: return self.db.session.query(waybackup_snapshots.scid).count() diff --git a/pywaybackup/Worker.py b/pywaybackup/Worker.py index 6f66a6c..4a9ff01 100644 --- a/pywaybackup/Worker.py +++ b/pywaybackup/Worker.py @@ -21,6 +21,27 @@ def init(self): self.db = Database() self.connection = http.client.HTTPSConnection("web.archive.org") + def close(self): + """ + Try to close the database and connection. + """ + try: + if hasattr(self, "db") and self.db: + try: + vb.write(verbose=True, content=f"[Worker.close] closing DB for worker {self.id}") + self.db.close() + vb.write(verbose=True, content=f"[Worker.close] DB closed for worker {self.id}") + except Exception: + pass + finally: + try: + if hasattr(self, "connection") and self.connection: + vb.write(verbose=True, content=f"[Worker.close] closing connection for worker {self.id}") + self.connection.close() + vb.write(verbose=True, content=f"[Worker.close] connection closed for worker {self.id}") + except Exception: + pass + def assign_snapshot(self, total_amount: int): self.snapshot = Snapshot(self.db, output=self.output, mode=self.mode) self.total_amount = total_amount diff --git a/pywaybackup/db.py b/pywaybackup/db.py index 508f833..e4fb412 100644 --- a/pywaybackup/db.py +++ b/pywaybackup/db.py @@ -18,6 +18,7 @@ from sqlalchemy.ext.declarative import declarative_base from sqlalchemy.orm import sessionmaker from typing import Optional # python 3.8 +from pywaybackup.Verbosity import Verbosity as vb Base = declarative_base() @@ -129,8 +130,28 @@ def __init__(self): self.session = self.sessman() def close(self): - self.session.commit() - self.session.close() + """ + Try to commit any pending work; if that fails, rollback to avoid leaving open transactions + """ + try: + if self.session.in_transaction(): + vb.write(verbose=True, content=f"[Database.close] session in transaction: committing") + try: + self.session.commit() + vb.write(verbose=True, content=f"[Database.close] commit successful") + except Exception as e: + vb.write(verbose=True, content=f"[Database.close] commit failed: {e}; rolling back") + try: + self.session.rollback() + vb.write(verbose=True, content=f"[Database.close] rollback successful") + except Exception: + vb.write(verbose=True, content=f"[Database.close] rollback failed") + finally: + try: + self.session.close() + vb.write(verbose=True, content=f"[Database.close] session closed") + except Exception as e: + vb.write(verbose=True, content=f"[Database.close] session close failed: {e}") def write_progress(self, done: int, total: int): """ From 1e1599a6ea9c25be099600d7c771dd62b6ac1b61 Mon Sep 17 00:00:00 2001 From: Victor Johnston Date: Wed, 11 Feb 2026 11:50:48 +0100 Subject: [PATCH 4/4] Set verbosity level to high --- pywaybackup/Snapshot.py | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/pywaybackup/Snapshot.py b/pywaybackup/Snapshot.py index 75c5b54..d9c027c 100644 --- a/pywaybackup/Snapshot.py +++ b/pywaybackup/Snapshot.py @@ -78,7 +78,7 @@ def __get_row(): session = self._db.session # get next available SnapshotId - vb.write(verbose=True, content=f"[Snapshot.fetch] selecting next scid") + vb.write(verbose="high", content=f"[Snapshot.fetch] selecting next scid") scid = ( session.execute( select(waybackup_snapshots.scid) @@ -90,7 +90,7 @@ def __get_row(): ) if scid is None: - vb.write(verbose=True, content=f"[Snapshot.fetch] no unprocessed scid found") + vb.write(verbose="high", content=f"[Snapshot.fetch] no unprocessed scid found") return None # try to atomically claim the row by updating only if still unclaimed @@ -101,13 +101,13 @@ def __get_row(): ) # if another worker claimed it first, rowcount will be 0 and cannot be claimed by this worker. - vb.write(verbose=True, content=f"[Snapshot.fetch] attempted to claim scid={scid}, rowcount={result.rowcount}") + vb.write(verbose="high", content=f"[Snapshot.fetch] attempted to claim scid={scid}, rowcount={result.rowcount}") if result.rowcount == 0: try: session.commit() except Exception: pass - vb.write(verbose=True, content=f"[Snapshot.fetch] scid={scid} already claimed by another worker") + vb.write(verbose="high", content=f"[Snapshot.fetch] scid={scid} already claimed by another worker") return None # The row has been claimed by the worker and can now be fetched. @@ -119,7 +119,7 @@ def __get_row(): session.rollback() except Exception: pass - vb.write(verbose=True, content=f"[Snapshot.fetch] claimed scid={scid} and fetched row") + vb.write(verbose="high", content=f"[Snapshot.fetch] claimed scid={scid} and fetched row") return row if __on_sqlite(): @@ -138,14 +138,14 @@ def modify(self, column, value): """ column = getattr(waybackup_snapshots, column) try: - vb.write(verbose=True, content=f"[Snapshot.modify] updating scid={self.scid} column={column.key}") + vb.write(verbose="high", content=f"[Snapshot.modify] updating scid={self.scid} column={column.key}") self._db.session.execute( update(waybackup_snapshots).where(waybackup_snapshots.scid == self.scid).values({column: value}) ) self._db.session.commit() - vb.write(verbose=True, content=f"[Snapshot.modify] update committed scid={self.scid} column={column.key}") + vb.write(verbose="high", content=f"[Snapshot.modify] update committed scid={self.scid} column={column.key}") except Exception as e: - vb.write(verbose=True, content=f"[Snapshot.modify] update failed scid={self.scid} error={e}; rolling back") + vb.write(verbose="high", content=f"[Snapshot.modify] update failed scid={self.scid} error={e}; rolling back") try: self._db.session.rollback() except Exception: