Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions pywaybackup/PyWayBackup.py
Original file line number Diff line number Diff line change
Expand Up @@ -364,6 +364,7 @@ def _workflow(self):
resources after the backup is complete.

"""
collection = None
try:
self._startup()

Expand All @@ -385,6 +386,12 @@ def _workflow(self):
self._keep = True
ex.exception(message="", e=e)
finally:
# if a collection was created during the workflow, close its DB session cleanly
try:
if collection:
collection.close()
except Exception:
pass
self._shutdown()

def paths(self, rel: bool = False) -> dict:
Expand Down
78 changes: 62 additions & 16 deletions pywaybackup/Snapshot.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
import os
import threading

from pywaybackup.db import Database, select, update, waybackup_snapshots
from pywaybackup.db import Database, select, update, waybackup_snapshots, and_
from pywaybackup.helper import url_split
from pywaybackup.Verbosity import Verbosity as vb


class Snapshot:
Expand Down Expand Up @@ -70,20 +71,55 @@ def __on_sqlite():
return False

def __get_row():
with self._db.session.begin():
row = self._db.session.execute(
select(waybackup_snapshots)
# Atomic claim: find next unprocessed scid, set response='LOCK' only if still unprocessed,
# then fetch that row. This avoids relying on FOR UPDATE or explicit nested transactions
# which can trigger "A transaction is already begun on this Session" errors.

session = self._db.session

# get next available SnapshotId
vb.write(verbose="high", content=f"[Snapshot.fetch] selecting next scid")
scid = (
session.execute(
select(waybackup_snapshots.scid)
.where(waybackup_snapshots.response.is_(None))
.order_by(waybackup_snapshots.scid)
.limit(1)
.with_for_update(skip_locked=True)
).scalar_one_or_none()

if row is None:
return None

row.response = "LOCK"

)
.scalar_one_or_none()
)

if scid is None:
vb.write(verbose="high", content=f"[Snapshot.fetch] no unprocessed scid found")
return None

# try to atomically claim the row by updating only if still unclaimed
result = session.execute(
update(waybackup_snapshots)
.where(and_(waybackup_snapshots.scid == scid, waybackup_snapshots.response.is_(None)))
.values(response="LOCK")
)

# if another worker claimed it first, rowcount will be 0 and cannot be claimed by this worker.
vb.write(verbose="high", content=f"[Snapshot.fetch] attempted to claim scid={scid}, rowcount={result.rowcount}")
if result.rowcount == 0:
try:
session.commit()
except Exception:
pass
vb.write(verbose="high", content=f"[Snapshot.fetch] scid={scid} already claimed by another worker")
return None

# The row has been claimed by the worker and can now be fetched.
row = session.execute(select(waybackup_snapshots).where(waybackup_snapshots.scid == scid)).scalar_one_or_none()
try:
session.commit()
except Exception:
try:
session.rollback()
except Exception:
pass
vb.write(verbose="high", content=f"[Snapshot.fetch] claimed scid={scid} and fetched row")
return row

if __on_sqlite():
Expand All @@ -101,10 +137,20 @@ def modify(self, column, value):
value: New value to set for the column.
"""
column = getattr(waybackup_snapshots, column)
self._db.session.execute(
update(waybackup_snapshots).where(waybackup_snapshots.scid == self.scid).values({column: value})
)
self._db.session.commit()
try:
vb.write(verbose="high", content=f"[Snapshot.modify] updating scid={self.scid} column={column.key}")
self._db.session.execute(
update(waybackup_snapshots).where(waybackup_snapshots.scid == self.scid).values({column: value})
)
self._db.session.commit()
vb.write(verbose="high", content=f"[Snapshot.modify] update committed scid={self.scid} column={column.key}")
except Exception as e:
vb.write(verbose="high", content=f"[Snapshot.modify] update failed scid={self.scid} error={e}; rolling back")
try:
self._db.session.rollback()
except Exception:
pass
raise

def create_output(self):
"""
Expand Down
130 changes: 76 additions & 54 deletions pywaybackup/SnapshotCollection.py
Original file line number Diff line number Diff line change
Expand Up @@ -163,45 +163,56 @@ def _insert_batch_safe(line_batch):

vb.write(verbose=None, content="\nInserting CDX data into database...")

progressbar = Progressbar(
try:
vb.write(verbose=True, content="[SnapshotCollection._insert_cdx] starting insert_cdx operation")
progressbar = Progressbar(
unit=" lines",
total=self._cdx_total,
desc="process cdx".ljust(15),
ascii="░▒█",
bar_format="{l_bar}{bar:50}{r_bar}{bar:-10b}",
)
line_batchsize = 2500
line_batch = []
total_inserted = 0
first_line = True

with self.cdxfile as f:
for line in f:
if first_line:
first_line = False
continue
line = line.strip()
if line.endswith("]]"):
line = line.rsplit("]", 1)[0]
if line.endswith(","):
line = line.rsplit(",", 1)[0]

try:
line_batch.append(__parse_line(line))
except json.decoder.JSONDecodeError:
self._snapshot_faulty += 1
continue

if len(line_batch) >= line_batchsize:
)
line_batchsize = 2500
line_batch = []
total_inserted = 0
first_line = True

with self.cdxfile as f:
for line in f:
if first_line:
first_line = False
continue
line = line.strip()
if line.endswith("]]"):
line = line.rsplit("]", 1)[0]
if line.endswith(","):
line = line.rsplit(",", 1)[0]

try:
line_batch.append(__parse_line(line))
except json.decoder.JSONDecodeError:
self._snapshot_faulty += 1
continue

if len(line_batch) >= line_batchsize:
total_inserted += _insert_batch_safe(line_batch=line_batch)
line_batch = []
progressbar.update(line_batchsize)

if line_batch:
total_inserted += _insert_batch_safe(line_batch=line_batch)
line_batch = []
progressbar.update(line_batchsize)

if line_batch:
total_inserted += _insert_batch_safe(line_batch=line_batch)
progressbar.update(len(line_batch))

self.db.session.commit()
progressbar.update(len(line_batch))

self.db.session.commit()
vb.write(verbose=True, content="[SnapshotCollection._insert_cdx] insert_cdx commit successful")
except Exception as e:
vb.write(verbose=True, content=f"[SnapshotCollection._insert_cdx] exception: {e}; rolling back")
try:
self.db.session.rollback()
vb.write(verbose=True, content="[SnapshotCollection._insert_cdx] rollback successful")
except Exception:
vb.write(verbose=True, content="[SnapshotCollection._insert_cdx] rollback failed")
raise

def _index_snapshots(self):
"""
Expand Down Expand Up @@ -297,29 +308,40 @@ def _skip_set(self):
"""

# ? for now per row / no bulk for compatibility
with self.csvfile as f:
total_skipped = 0
for row in f:
self.db.session.execute(
update(waybackup_snapshots)
.where(
and_(
waybackup_snapshots.timestamp == row["timestamp"],
waybackup_snapshots.url_origin == row["url_origin"],
try:
vb.write(verbose=True, content="[SnapshotCollection._skip_set] applying CSV skips to DB")
with self.csvfile as f:
total_skipped = 0
for row in f:
self.db.session.execute(
update(waybackup_snapshots)
.where(
and_(
waybackup_snapshots.timestamp == row["timestamp"],
waybackup_snapshots.url_origin == row["url_origin"],
)
)
.values(
url_archive=row["url_archive"],
redirect_url=row["redirect_url"],
redirect_timestamp=row["redirect_timestamp"],
response=row["response"],
file=row["file"],
)
)
.values(
url_archive=row["url_archive"],
redirect_url=row["redirect_url"],
redirect_timestamp=row["redirect_timestamp"],
response=row["response"],
file=row["file"],
)
)
total_skipped += 1

self.db.session.commit()
self._filter_skip = total_skipped
total_skipped += 1

self.db.session.commit()
self._filter_skip = total_skipped
vb.write(verbose=True, content=f"[SnapshotCollection._skip_set] commit successful, total_skipped={total_skipped}")
except Exception as e:
vb.write(verbose=True, content=f"[SnapshotCollection._skip_set] exception: {e}; rolling back")
try:
self.db.session.rollback()
vb.write(verbose=True, content="[SnapshotCollection._skip_set] rollback successful")
except Exception:
vb.write(verbose=True, content="[SnapshotCollection._skip_set] rollback failed")
raise

def count_total(self) -> int:
return self.db.session.query(waybackup_snapshots.scid).count()
Expand Down
21 changes: 21 additions & 0 deletions pywaybackup/Worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,27 @@ def init(self):
self.db = Database()
self.connection = http.client.HTTPSConnection("web.archive.org")

def close(self):
"""
Try to close the database and connection.
"""
try:
if hasattr(self, "db") and self.db:
try:
vb.write(verbose=True, content=f"[Worker.close] closing DB for worker {self.id}")
self.db.close()
vb.write(verbose=True, content=f"[Worker.close] DB closed for worker {self.id}")
except Exception:
pass
finally:
try:
if hasattr(self, "connection") and self.connection:
vb.write(verbose=True, content=f"[Worker.close] closing connection for worker {self.id}")
self.connection.close()
vb.write(verbose=True, content=f"[Worker.close] connection closed for worker {self.id}")
except Exception:
pass

def assign_snapshot(self, total_amount: int):
self.snapshot = Snapshot(self.db, output=self.output, mode=self.mode)
self.total_amount = total_amount
Expand Down
25 changes: 23 additions & 2 deletions pywaybackup/db.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
from typing import Optional # python 3.8
from pywaybackup.Verbosity import Verbosity as vb

Base = declarative_base()

Expand Down Expand Up @@ -129,8 +130,28 @@ def __init__(self):
self.session = self.sessman()

def close(self):
self.session.commit()
self.session.close()
"""
Try to commit any pending work; if that fails, rollback to avoid leaving open transactions
"""
try:
if self.session.in_transaction():
vb.write(verbose=True, content=f"[Database.close] session in transaction: committing")
try:
self.session.commit()
vb.write(verbose=True, content=f"[Database.close] commit successful")
except Exception as e:
vb.write(verbose=True, content=f"[Database.close] commit failed: {e}; rolling back")
try:
self.session.rollback()
vb.write(verbose=True, content=f"[Database.close] rollback successful")
except Exception:
vb.write(verbose=True, content=f"[Database.close] rollback failed")
finally:
try:
self.session.close()
vb.write(verbose=True, content=f"[Database.close] session closed")
except Exception as e:
vb.write(verbose=True, content=f"[Database.close] session close failed: {e}")

def write_progress(self, done: int, total: int):
"""
Expand Down
14 changes: 7 additions & 7 deletions pywaybackup/helper.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ def sanitize_filename(filename: str) -> str:
"""
Sanitize a string to be used as (part of) a filename.
"""
disallowed = ["<", ">", ":", '"', "/", "\\", "|", "?", "*"]
disallowed = ["<", ">", ":", '"', "/", "\\", "|", "?", "*", "=", "#", "!", "~"]
for char in disallowed:
filename = filename.replace(char, ".")
filename = ".".join(filter(None, filename.split(".")))
Expand Down Expand Up @@ -63,12 +63,12 @@ def url_split(url, index=False):
else:
filename = "index.html" if index else ""
subdir = "/".join(path_parts).strip("/")
# sanitize subdir and filename for windows
if check_nt():
special_chars = [":", "*", "?", "&", "=", "<", ">", "\\", "|"]
for char in special_chars:
subdir = subdir.replace(char, f"%{ord(char):02x}")
filename = filename.replace(char, f"%{ord(char):02x}")

# Sanitize special characters that are problematic in file- and foldernames
special_chars = [":", "*", "?", "&", "=", "<", ">", "\\", "|", "#", "!", "~"]
for char in special_chars:
subdir = subdir.replace(char, f"%{ord(char):02x}")
filename = filename.replace(char, f"%{ord(char):02x}")
filename = filename.replace("%20", " ")
return domain, subdir, filename

Expand Down