Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 39 additions & 0 deletions cyberdrop_dl/appdata.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
from __future__ import annotations

import dataclasses
from contextvars import ContextVar
from typing import TYPE_CHECKING

if TYPE_CHECKING:
from pathlib import Path

_appdata: ContextVar[AppData] = ContextVar("_appdata")


@dataclasses.dataclass(slots=True)
class AppData:
path: Path
cookies_dir: Path = dataclasses.field(init=False)
cache_file: Path = dataclasses.field(init=False)
default_config: Path = dataclasses.field(init=False)
db_file: Path = dataclasses.field(init=False)

def __post_init__(self) -> None:
self.cookies_dir = self.path / "cookies"
self.cache_file = self.path / "cache.yaml"
self.default_config = self.path / "config.yaml"
self.db_file = self.path / "cyberdrop.db"

def __fspath__(self) -> str:
return str(self)

def __str__(self) -> str:
return str(self.path)

def mkdirs(self) -> None:
for dir in (self.cookies_dir,):
dir.mkdir(parents=True, exist_ok=True)


def get() -> AppData:
return _appdata.get()
65 changes: 65 additions & 0 deletions cyberdrop_dl/cache.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
from __future__ import annotations

import dataclasses
from collections.abc import Iterator, MutableMapping
from contextvars import ContextVar, Token
from typing import TYPE_CHECKING, Any, Self

from cyberdrop_dl import __version__
from cyberdrop_dl.utils import yaml

if TYPE_CHECKING:
from pathlib import Path

_cache: ContextVar[Cache] = ContextVar("_cache")


@dataclasses.dataclass(slots=True)
class Cache(MutableMapping[str, Any]):
file: Path
_cache: dict[str, Any] = dataclasses.field(init=False)
_token: Token[Cache] | None = None

def __post_init__(self) -> None:
self._cache = yaml.load(self.file)

def __getitem__(self, key: str) -> Any:
return self.get(key)

def __iter__(self) -> Iterator[str]:
return iter(self._cache)

def __len__(self) -> int:
return len(self._cache)

def __delitem__(self, key: str) -> None:
try:
_ = self._cache.pop(key)
except KeyError:
pass
else:
self._save()

def __setitem__(self, key: str, value: Any, /) -> None:
self._cache[key] = value
self._save()

def __enter__(self) -> Self:
self._token = _cache.set(self)
return self

def __exit__(self, *_) -> None:
assert self._token is not None
self._token = _cache.reset(self._token)
self.close()

def _save(self) -> None:
if self._token is None:
yaml.save(self.file, self._cache)

def close(self) -> None:
self["version"] = __version__


def get():
return _cache.get()
199 changes: 59 additions & 140 deletions cyberdrop_dl/cli/__init__.py
Original file line number Diff line number Diff line change
@@ -1,21 +1,14 @@
from __future__ import annotations
import shutil
from typing import Annotated, Literal

import dataclasses
import sys
from argparse import SUPPRESS, ArgumentParser, RawDescriptionHelpFormatter
from shutil import get_terminal_size
from typing import TYPE_CHECKING, Any, Final, NoReturn
import cyclopts
import pydantic
from cyclopts import Parameter

from pydantic import BaseModel, ValidationError

from cyberdrop_dl import __version__, env
from cyberdrop_dl.cli import arguments
from cyberdrop_dl.cli.model import CLIargs, ParsedArgs
from cyberdrop_dl.config import ConfigSettings, GlobalSettings

if TYPE_CHECKING:
from argparse import _ArgumentGroup as ArgGroup # pyright: ignore[reportPrivateUsage]
from collections.abc import Sequence
from cyberdrop_dl import __version__, env, signature
from cyberdrop_dl.cli.model import CLIargs, ParsedArgs, RetryArgs
from cyberdrop_dl.models.types import HttpURL
from cyberdrop_dl.utils.yaml import format_validation_error


def is_terminal_in_portrait() -> bool:
Expand All @@ -24,7 +17,7 @@ def is_terminal_in_portrait() -> bool:
if env.PORTRAIT_MODE:
return True

terminal_size = get_terminal_size()
terminal_size = shutil.get_terminal_size()
width, height = terminal_size.columns, terminal_size.lines
aspect_ratio = width / height

Expand All @@ -33,139 +26,65 @@ def is_terminal_in_portrait() -> bool:
return False

# Check for mobile device in portrait mode
if (aspect_ratio < 1.5 and height >= 40) or (width <= 85 and aspect_ratio < 2.3):
if (aspect_ratio < 1.5 and height >= 40) or (aspect_ratio < 2.3 and width <= 85):
return True

# Assume landscape mode for other cases
return False


class CustomHelpFormatter(RawDescriptionHelpFormatter):
MAX_HELP_POS: Final = 80
INDENT_INCREMENT: Final = 2

def __init__(self, prog: str, width: int | None = None) -> None:
super().__init__(prog, self.INDENT_INCREMENT, self.MAX_HELP_POS, width)

def _get_help_string(self, action) -> str | None:
if action.help:
return action.help.replace("program's", "CDL") # The ' messes up the markdown formatting
return action.help


@dataclasses.dataclass(slots=True)
class CLIParser:
parser: ArgumentParser
groups: dict[str, list[ArgGroup]]

def parse_args(self, args: Sequence[str] | None = None) -> dict[str, dict[str, Any]]:
return self._unflatten(self._parse_args(args))

def _parse_args(self, args: Sequence[str] | None = None) -> dict[str, Any]:
return dict(sorted(vars(self.parser.parse_intermixed_args(args)).items()))

def _unflatten(self, namespace: dict[str, Any]) -> dict[str, dict[str, Any]]:
parsed_args: dict[str, dict[str, Any]] = {}

for name, groups in self.groups.items():
parsed_args[name] = {}
for group in groups:
group_dict = {arg.dest: v for arg in group._group_actions if (v := namespace.get(arg.dest)) is not None}
if group_dict:
assert group.title
parsed_args[name][group.title] = _unflatten_nested_args(group_dict)

parsed_args["cli_only_args"] = parsed_args["cli_only_args"]["CLI-only options"]
return parsed_args


def make_parser() -> CLIParser:
kwargs: dict[str, Any] = {"color": True} if sys.version_info > (3, 14) else {}
parser = ArgumentParser(
description="Bulk asynchronous downloader for multiple file hosts",
usage="cyberdrop-dl [OPTIONS] URL [URL...]",
allow_abbrev=False,
formatter_class=CustomHelpFormatter,
**kwargs,
)
_ = parser.add_argument("-V", "--version", action="version", version=f"%(prog)s {__version__}")

cli_only = parser.add_argument_group("CLI-only options")
_add_args_from_model(cli_only, CLIargs)

groups = {
"config_settings": _create_groups_from_nested_models(parser, ConfigSettings),
"global_settings": _create_groups_from_nested_models(parser, GlobalSettings),
"cli_only_args": [cli_only],
}

return CLIParser(parser, groups)


def parse_args(args: Sequence[str] | None = None) -> ParsedArgs:
"""Parses the command line arguments passed into the program."""
from cyberdrop_dl.utils.yaml import handle_validation_error

parsed_args = make_parser().parse_args(args)
try:
model = ParsedArgs.model_validate(parsed_args, extra="forbid")

except ValidationError as e:
handle_validation_error(e, title="CLI arguments")
sys.exit(1)

if model.cli_only_args.show_supported_sites:
show_supported_sites()

return model


def show_supported_sites() -> NoReturn:
from rich import print

class App(cyclopts.App):
@signature.copy(cyclopts.App._parse_known_args)
def _parse_known_args(self, *args, **kwargs):
try:
return super()._parse_known_args(*args, **kwargs)
except cyclopts.ValidationError as e:
if isinstance(e.__cause__, pydantic.ValidationError):
e.exception_message = format_validation_error(e.__cause__, title="CLI arguments")
raise


app = App(
name="cyberdrop-dl",
help="Bulk asynchronous downloader for multiple file hosts",
version=f"{__version__}.NTFS",
default_parameter=Parameter(negative_iterable=[]),
)


@app.command()
def download(
links: Annotated[
list[HttpURL] | None,
Parameter(
name="links",
negative=[],
help="link(s) to content to download",
),
] = None,
/,
*,
cli_args: CLIargs = CLIargs(), # noqa: B008 # pyright: ignore[reportCallInDefaultInitializer]
parsed_settings: ParsedArgs = ParsedArgs(), # pyright: ignore[reportCallInDefaultInitializer] # noqa: B008
):
"""Scrape and download files from a list of URLs (from a file or stdin)"""
return links, cli_args, parsed_settings


@app.command()
def show_supported_sites() -> None:
"""Show a list of all supported sites"""
from cyberdrop_dl.utils.markdown import get_crawlers_info_as_rich_table

table = get_crawlers_info_as_rich_table()
print(table)
sys.exit(0)


def _unflatten_nested_args(data: dict[str, Any]) -> dict[str, Any]:
result: dict[str, Any] = {}

for command_name, value in data.items():
inner_names = command_name.split(".")
current_level = result
for index, key in enumerate(inner_names):
if index < len(inner_names) - 1:
if key not in current_level:
current_level[key] = {}
current_level = current_level[key]
else:
current_level[key] = value
return result


def _add_args_from_model(parser: ArgumentParser | ArgGroup, model: type[BaseModel]) -> None:
cli_args = model is CLIargs

for arg in arguments.parse(model):
options = arg.compose_options()

if cli_args and arg.arg_type is bool and not (arg.cli_name == "portrait" and env.RUNNING_IN_TERMUX):
default = arg.default if cli_args else SUPPRESS
options["action"] = "store_false" if default else "store_true"
app.console.print(table)

_ = parser.add_argument(*arg.name_or_flags, **options)

@app.command()
def retry(choice: Literal["all", "failed", "maintenance"], /, *, retry: RetryArgs | None = None):
"""Retry failed downloads"""
return choice, retry or RetryArgs()

def _create_groups_from_nested_models(parser: ArgumentParser, model: type[BaseModel]) -> list[ArgGroup]:
groups: list[ArgGroup] = []
for name, field in model.model_fields.items():
submodel = field.annotation
assert submodel and issubclass(submodel, BaseModel)
submodel_group = parser.add_argument_group(name)
_add_args_from_model(submodel_group, submodel)
groups.append(submodel_group)

return groups
if __name__ == "__main__":
app()
Loading
Loading