diff --git a/.editorconfig b/.editorconfig new file mode 100644 index 0000000..bdaa458 --- /dev/null +++ b/.editorconfig @@ -0,0 +1,22 @@ +; http://editorconfig.org/ + +root = true + +[*] +indent_style = space +indent_size = 4 +end_of_line = lf +charset = utf-8 +trim_trailing_whitespace = true +insert_final_newline = true + +[*.{yml,yaml}] +indent_size = 2 + +[{vendor,inc/phpseclib}/**] +; Use editor default (possible autodetection). +indent_style = +indent_size = +end_of_line = +trim_trailing_whitespace = +insert_final_newline = \ No newline at end of file diff --git a/.github/FUNDING.yml b/.github/FUNDING.yml new file mode 100644 index 0000000..b702f28 --- /dev/null +++ b/.github/FUNDING.yml @@ -0,0 +1,3 @@ +# These are supported funding model platforms + +github: [LaswitchTech] diff --git a/.github/ISSUE_TEMPLATE/bug_report.md b/.github/ISSUE_TEMPLATE/bug_report.md new file mode 100644 index 0000000..3b6bc43 --- /dev/null +++ b/.github/ISSUE_TEMPLATE/bug_report.md @@ -0,0 +1,31 @@ +--- +name: Bug report +about: Create a report to help us improve +title: '' +labels: bug +assignees: '' + +--- + +### Description + +[Description of the bug or feature] + +### Steps to reproduce + +1. [First Step] +2. [Second Step] +3. [and so on...] + +**Expected behavior:** [What you expected to happen] + +**Actual behavior:** [What actually happened] + +### Versions + +* [PHP] +* [Browser] + +### Screenshots or Logs + +[Paste your logs or attach the screenshot] diff --git a/.github/ISSUE_TEMPLATE/feature_request.md b/.github/ISSUE_TEMPLATE/feature_request.md new file mode 100644 index 0000000..191ad50 --- /dev/null +++ b/.github/ISSUE_TEMPLATE/feature_request.md @@ -0,0 +1,20 @@ +--- +name: Feature request +about: Suggest an idea for this project +title: '' +labels: feature request +assignees: '' + +--- + +**Is your feature request related to a problem? Please describe.** +A clear and concise description of what the problem is. Ex. I'm always frustrated when [...] + +**Describe the solution you'd like** +A clear and concise description of what you want to happen. + +**Describe alternatives you've considered** +A clear and concise description of any alternative solutions or features you've considered. + +**Additional context** +Add any other context or screenshots about the feature request here. \ No newline at end of file diff --git a/.github/no-response.yml b/.github/no-response.yml new file mode 100644 index 0000000..c7c7e69 --- /dev/null +++ b/.github/no-response.yml @@ -0,0 +1,13 @@ +# Configuration for probot-no-response - https://github.com/probot/no-response + +# Number of days of inactivity before an Issue is closed for lack of response +daysUntilClose: 14 +# Label requiring a response +responseRequiredLabel: need more info +# Comment to post when closing an Issue for lack of response. Set to `false` to disable +closeComment: > + This issue has been automatically closed because there has been no response + to our request for more information from the original author. With only the + information that is currently in the issue, we don't have enough information + to take action. Please reach out if you have or find the answers we need so + that we can investigate further. \ No newline at end of file diff --git a/.github/workflows/release.yml b/.github/workflows/release.yml new file mode 100644 index 0000000..4cde3ed --- /dev/null +++ b/.github/workflows/release.yml @@ -0,0 +1,60 @@ +name: Release + +on: + push: + tags: + - 'v*' + +jobs: + release: + runs-on: ubuntu-latest + steps: + - name: Checkout code + uses: actions/checkout@v2 + with: + fetch-depth: 0 + + - name: Set Tag as Filename + id: tag_name + run: echo "TAG_NAME=${GITHUB_REF##*/}" >> $GITHUB_ENV + + - name: Create ZIP file + run: zip -r "${{ env.TAG_NAME }}.zip" . + + - name: Generate Changelog + id: generate_changelog + run: | + # Find the most recent tag before the current one + PREV_TAG=$(git describe --tags --abbrev=0 HEAD^) + + # Create a new CHANGELOG.md file with headers + echo -e "# Changelog\n" > CHANGELOG.md + + # List commit messages between the previous tag and current HEAD + git log ${PREV_TAG}..HEAD --pretty=format:"* %s" >> CHANGELOG.md + + # List unique contributors for these commits + echo -e "\n\n# Contributors\n" >> CHANGELOG.md + git log ${PREV_TAG}..HEAD --format='%aN' | sort -u | awk '{print "* " $0}' >> CHANGELOG.md + + - name: Create Release + id: create_release + uses: actions/create-release@v1 + env: + GITHUB_TOKEN: ${{ secrets.GH_PAT }} + with: + tag_name: ${{ github.ref }} + release_name: Release ${{ github.ref }} + draft: false + prerelease: false + body_path: ./CHANGELOG.md + + - name: Upload Asset + uses: actions/upload-release-asset@v1 + env: + GITHUB_TOKEN: ${{ secrets.GH_PAT }} + with: + upload_url: ${{ steps.create_release.outputs.upload_url }} + asset_path: ./${{ env.TAG_NAME }}.zip + asset_name: source.zip + asset_content_type: application/zip diff --git a/.gitignore b/.gitignore index b7faf40..1b4c0e1 100644 --- a/.gitignore +++ b/.gitignore @@ -1,16 +1,44 @@ +# Python +/build/ + +# Environments +.env +.venv +env/ +venv/ +ENV/ +env.bak/ +venv.bak/ +TOKEN +runtime + +# Mac OS X +.DS_Store +*.DS_Store + +# Git +.git + +# Filetypes +*.cfg +*.log +*.db + +# Unique Directories +/tmp/ +/data/ + # Byte-compiled / optimized / DLL files __pycache__/ -*.py[codz] +*.py[cod] *$py.class -# C extensions -*.so - # Distribution / packaging .Python build/ develop-eggs/ -dist/ +config/ +logs/ downloads/ eggs/ .eggs/ @@ -46,7 +74,7 @@ htmlcov/ nosetests.xml coverage.xml *.cover -*.py.cover +*.py,cover .hypothesis/ .pytest_cache/ cover/ @@ -82,47 +110,13 @@ target/ profile_default/ ipython_config.py -# pyenv -# For a library or package, you might want to ignore these files since the code is -# intended to run in multiple environments; otherwise, check them in: -# .python-version - -# pipenv -# According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control. -# However, in case of collaboration, if having platform-specific dependencies or dependencies -# having no cross-platform support, pipenv may install dependencies that don't work, or not -# install all needed dependencies. -#Pipfile.lock - -# UV -# Similar to Pipfile.lock, it is generally recommended to include uv.lock in version control. -# This is especially recommended for binary packages to ensure reproducibility, and is more -# commonly ignored for libraries. -#uv.lock - -# poetry -# Similar to Pipfile.lock, it is generally recommended to include poetry.lock in version control. -# This is especially recommended for binary packages to ensure reproducibility, and is more -# commonly ignored for libraries. -# https://python-poetry.org/docs/basic-usage/#commit-your-poetrylock-file-to-version-control -#poetry.lock -#poetry.toml - # pdm # Similar to Pipfile.lock, it is generally recommended to include pdm.lock in version control. -# pdm recommends including project-wide configuration in pdm.toml, but excluding .pdm-python. -# https://pdm-project.org/en/latest/usage/project/#working-with-version-control #pdm.lock -#pdm.toml -.pdm-python -.pdm-build/ - -# pixi -# Similar to Pipfile.lock, it is generally recommended to include pixi.lock in version control. -#pixi.lock -# Pixi creates a virtual environment in the .pixi directory, just like venv module creates one -# in the .venv directory. It is recommended not to include this directory in version control. -.pixi +# pdm stores project-wide configurations in .pdm.toml, but it is recommended to not include it +# in version control. +# https://pdm.fming.dev/#use-with-ide +.pdm.toml # PEP 582; used by e.g. github.com/David-OConnor/pyflow and github.com/pdm-project/pdm __pypackages__/ @@ -134,74 +128,11 @@ celerybeat.pid # SageMath parsed files *.sage.py -# Environments -.env -.envrc -.venv -env/ -venv/ -ENV/ -env.bak/ -venv.bak/ +# building source files +src/bin/freerdp/source +src/bin/freerdp/install +src/bin/freerdp/build -# Spyder project settings -.spyderproject -.spyproject - -# Rope project settings -.ropeproject - -# mkdocs documentation -/site - -# mypy -.mypy_cache/ -.dmypy.json -dmypy.json - -# Pyre type checker -.pyre/ - -# pytype static type analyzer -.pytype/ - -# Cython debug symbols -cython_debug/ - -# PyCharm -# JetBrains specific template is maintained in a separate JetBrains.gitignore that can -# be found at https://github.com/github/gitignore/blob/main/Global/JetBrains.gitignore -# and can be added to the global gitignore or merged into this file. For a more nuclear -# option (not recommended) you can uncomment the following to ignore the entire idea folder. -#.idea/ - -# Abstra -# Abstra is an AI-powered process automation framework. -# Ignore directories containing user credentials, local state, and settings. -# Learn more at https://abstra.io/docs -.abstra/ - -# Visual Studio Code -# Visual Studio Code specific template is maintained in a separate VisualStudioCode.gitignore -# that can be found at https://github.com/github/gitignore/blob/main/Global/VisualStudioCode.gitignore -# and can be added to the global gitignore or merged into this file. However, if you prefer, -# you could uncomment the following to ignore the entire vscode folder -# .vscode/ - -# Ruff stuff: -.ruff_cache/ - -# PyPI configuration file -.pypirc - -# Cursor -# Cursor is an AI-powered code editor. `.cursorignore` specifies files/directories to -# exclude from AI features like autocomplete and code analysis. Recommended for sensitive data -# refer to https://docs.cursor.com/context/ignore-files -.cursorignore -.cursorindexingignore - -# Marimo -marimo/_static/ -marimo/_lsp/ -__marimo__/ +# Exclusions +!src/bin/freerdp/*/*/lib/ +!src/bin/freerdp/*/*/xfreerdp diff --git a/.gitmodules b/.gitmodules new file mode 100644 index 0000000..137e0dc --- /dev/null +++ b/.gitmodules @@ -0,0 +1,4 @@ +[submodule "src/core"] + path = src/core + url = https://github.com/LaswitchTech/corePY.git + branch = dev diff --git a/README.md b/README.md index 714570a..add0fce 100644 --- a/README.md +++ b/README.md @@ -1,2 +1,45 @@ +

+ # Replicator -Data Replication Software, Replacement for Windows DFSR with support for Local, SMB and FTP filesystems. +![License](https://img.shields.io/github/license/LaswitchTech/Replicator?style=for-the-badge) +![GitHub repo size](https://img.shields.io/github/repo-size/LaswitchTech/Replicator?style=for-the-badge&logo=github) +![GitHub top language](https://img.shields.io/github/languages/top/LaswitchTech/Replicator?style=for-the-badge) +![GitHub Downloads](https://img.shields.io/github/downloads/LaswitchTech/Replicator/total?style=for-the-badge) +![Version](https://img.shields.io/github/v/release/LaswitchTech/Replicator?label=Version&style=for-the-badge) + +## Description +Replicator is a cross-platform Python application designed to provide replication services similar to Windows DFSR. It supports local filesystems, SMB, FTP and SSHFS, making it a versatile solution for data replication needs across different operating systems. + +## Features + - **Cross-Platform Compatibility**: Replicator is compatible with Windows, macOS and Linux, with specific adjustments made to ensure seamless operation on both operating systems. + - **Customizable Interface**: The application uses a customizable UI that allows users to define their preferred settings. + - **Logging and Debugging**: The application includes logging features for easier debugging and tracking of issues during the replication process. + +## License +This software is distributed under the [GPLv3](LICENSE) license. + +## Security +Please disclose any vulnerabilities found responsibly – report security issues to the maintainers privately. See [SECURITY.md](SECURITY.md) for more information. + +## Contributing +Contributions to Replicator are welcome! If you have ideas for new features or have found bugs, please open an issue or submit a pull request. + +### How to Contribute + - **Fork the Repository**: Create a fork of the repository on GitHub. + - **Create a New Branch**: For new features or bug fixes, create a new branch in your fork. + - **Submit a Pull Request**: Once your changes are ready, submit a pull request to the main repository. + +## To Do + - ~~**Support for Local**: Add support for Local filesystems.~~ + - ~~**Support for SMB**: Add support for SMB Shares.~~ + - ~~**Support for FTP**: Add support for FTP Shares.~~ --- REMOVED --- + - ~~**Support for SSHFS**: Add support for SSH filesystems.~~ --- REMOVED --- + - ~~**Mode Mirror**: Add replication mode Mirror.~~ + - **Mode Incremental**: Add replication mode Incremental. + - ~~**Direction**: Add replication direction (One way, Two way).~~ + - ~~**Scheduling**: Add scheduling capabilities for replication tasks.~~ + - ~~**Conflict Resolution**: Implement conflict resolution strategies for file changes.~~ + - ~~**Service Integration**: Integrate with system services for background operation.~~ + +## Wait, where is the documentation? +Review the [Documentation](https://laswitchtech.com/en/blog/projects/replicator/index). diff --git a/SECURITY.md b/SECURITY.md new file mode 100644 index 0000000..86201e0 --- /dev/null +++ b/SECURITY.md @@ -0,0 +1,10 @@ +# Security Policy + +Security vulnerabilities can be reported for the current stable release and the `stable` branch. + +## Reporting a Vulnerability + +You have multiple options on reporting vulnerabilities + +* Send an e-mail to [Support Team](mailto:support@laswitchtech.com) +* Open a [Github Issue](https://github.com/LaswitchTech/Replicator/issues) diff --git a/VERSION b/VERSION new file mode 100644 index 0000000..45c7a58 --- /dev/null +++ b/VERSION @@ -0,0 +1 @@ +v0.0.1 diff --git a/build.sh b/build.sh new file mode 100755 index 0000000..87b99ac --- /dev/null +++ b/build.sh @@ -0,0 +1,397 @@ +#!/bin/bash +set -euo pipefail + +# Function to print messages with a timestamp +log() { + echo "$(date +'%Y-%m-%d %H:%M:%S') - $1" +} + +# Function to detect the operating system +detect_os() { + case "$(uname -s)" in + Darwin) + echo "macos" + ;; + Linux) + echo "linux" + ;; + *) + echo "unsupported" + ;; + esac +} + +# Set the name of the application +NAME="PyRDPConnect" + +# Determine the operating system +OS=$(detect_os) + +if [ "$OS" == "unsupported" ]; then + log "Unsupported operating system. Exiting." + exit 1 +fi + +# Determine arch and whether we should use system PyQt5 (APT) on Linux ARM +ARCH="$(uname -m)" +USE_SYSTEM_PYQT=0 +if [ "$OS" = "linux" ] && { [ "$ARCH" = "aarch64" ] || [ "$ARCH" = "armv7l" ] || [ "$ARCH" = "armhf" ]; }; then + USE_SYSTEM_PYQT=1 +fi + +# Path to a vendored FreeRDP binary (set on macOS; Linux uses --add-data and resolves at runtime) +FREERDP_BIN="" + +# Create a directory to store the final output based on the OS +FINAL_DIR="dist/$OS" +if [ -d "$FINAL_DIR" ]; then + rm -rf "$FINAL_DIR" +fi +mkdir -p "$FINAL_DIR" + +# Require python3.11 on PATH +PYTHON_BIN="$(command -v python3.11 || true)" +if [ -z "$PYTHON_BIN" ]; then + log "python3.11 not found. On macOS, run: brew install python@3.11" + exit 1 +fi + +# (Re)create venv if missing or wrong version +NEED_RECREATE=0 +if [ ! -x "env/bin/python" ]; then + NEED_RECREATE=1 +else + VENV_VER="$(env/bin/python -c 'import sys; print(f"{sys.version_info.major}.{sys.version_info.minor}")' || echo unknown)" + if [ "$VENV_VER" != "3.11" ]; then + NEED_RECREATE=1 + fi +fi + +if [ "$NEED_RECREATE" -eq 1 ]; then + log "Creating fresh Python 3.11 virtual environment..." + rm -rf env + if [ "$USE_SYSTEM_PYQT" -eq 1 ]; then + # allow APT-installed PyQt5 to be visible in the venv + "$PYTHON_BIN" -m venv --system-site-packages env + else + "$PYTHON_BIN" -m venv env + fi +fi + +# Activate venv +# shellcheck disable=SC1091 +source env/bin/activate + +# Double-check version (hard fail if not 3.11) +ACTIVE_VER="$(python -c 'import sys; print(f"{sys.version_info.major}.{sys.version_info.minor}")')" +if [ "$ACTIVE_VER" != "3.11" ]; then + log "Active Python is $ACTIVE_VER, expected 3.11. Aborting." + exit 1 +fi +log "Using Python $(python -V)" + +# Ensure that pip is updated +log "Updating pip..." +python -m pip install --upgrade pip wheel + +log "Installing build dependencies..." +# PyInstaller 6.9+ supports 3.11 well; lock to <7 to avoid future surprizes. +# PyQt5 5.15.x is stable for Qt5 on macOS/Linux; lock <6. +python -m pip install "pyinstaller>=6.9,<7" "sip>=6.9,<7" + +if [ "$USE_SYSTEM_PYQT" -eq 1 ]; then + # Ensure system PyQt5 (and QtSvg) are present + if command -v apt-get >/dev/null 2>&1; then + log "Installing system PyQt5 via APT (requires sudo)..." + sudo apt-get update + sudo apt-get install -y python3-pyqt5 python3-pyqt5.qtsvg + else + log "ERROR: APT not found; cannot install system PyQt5. Install PyQt5 manually or switch to a distro with APT." + exit 1 + fi + + # ensure system dist-packages are visible inside the venv (Debian/RPi) + SYS_PYTHON="$(command -v python3)" + + # Collect all system "dist-packages" dirs that might contain APT's PyQt5 + SYS_DIST_DIRS="$("$SYS_PYTHON" - <<'PY' +import site, sys +paths=set() + +# Prefer entries that actually end with dist-packages +for p in getattr(site, 'getsitepackages', lambda: [])() or []: + if p.endswith('dist-packages'): + paths.add(p) + +up = getattr(site, 'getusersitepackages', lambda: None)() +if up and str(up).endswith('dist-packages'): + paths.add(up) + +# Common Debian/RPi locations +for c in ('/usr/lib/python3/dist-packages', '/usr/local/lib/python3/dist-packages'): + paths.add(c) + +print('\\n'.join(sorted(paths))) +PY + )" + + # Path to *this venv's* site-packages + VENV_SITE_PKGS="$(python - <<'PY' +import sysconfig +print(sysconfig.get_paths()["purelib"]) +PY + )" + + # Write a .pth pointing to each system dist-packages dir + : > "$VENV_SITE_PKGS/_system_dist_packages.pth" + while IFS= read -r d; do + [ -n "$d" ] && echo "$d" >> "$VENV_SITE_PKGS/_system_dist_packages.pth" + done <<< "$SYS_DIST_DIRS" + + # Hard guarantee via sitecustomize.py + cat > "$VENV_SITE_PKGS/sitecustomize.py" <<'PY' +import sys +NEED = [ + '/usr/lib/python3/dist-packages', + '/usr/local/lib/python3/dist-packages', +] +for p in NEED: + if p not in sys.path: + sys.path.append(p) +PY + + # Sanity check (now should succeed) + python - <<'PY' +import sys +try: + import PyQt5, PyQt5.QtCore, PyQt5.QtWidgets, PyQt5.QtSvg + print("OK: System PyQt5 detected at:", PyQt5.__file__) +except Exception as e: + print("sys.path =", sys.path) + raise SystemExit(f"PyQt5 missing after setup: {e}") +PY +else + # Non-ARM / macOS etc: keep using PyPI wheels + python -m pip install "PyQt5>=5.15,<6" +fi + +# Optional tools you had; keeping them only if you need them: +python -m pip install PySide6-Addons +# python -m pip install importlib PySide6-Addons + +# Check if the .spec file exists +SPEC_FILE="$NAME.spec" +ICON_FILE="src/icons/icon.icns" + +# Cleanup: Remove the leftover dist/$NAME directory on macOS +log "Cleaning up..." +if [ -d "dist/$NAME" ]; then + rm -rf "dist/$NAME" +fi +if [ -f "$SPEC_FILE" ]; then + rm -f "$SPEC_FILE" +fi + +log "Verifying required PyQt5 modules are present..." +python - <<'PY' +from importlib.util import find_spec +missing = [m for m in ("PyQt5", "PyQt5.QtSvg") if find_spec(m) is None] +if missing: + raise SystemExit(f"Missing modules before build: {missing}") +print("Qt check passed.") +PY + +log ".spec file not found. Generating a new one with PyInstaller..." +if [ "$OS" == "macos" ]; then + pyinstaller --windowed --name "$NAME" src/PyRDPConnect.py +elif [ "$OS" == "linux" ]; then + # Linux build: bundle data and hidden import directly + pyinstaller \ + --onefile \ + --name "$NAME" \ + --hidden-import PyQt5.QtSvg \ + --add-data "src/app:app" \ + --add-data "src/styles:styles" \ + --add-data "src/icons:icons" \ + --add-data "src/img:img" \ + --add-data "src/freerdp/linux:freerdp/linux" \ + src/PyRDPConnect.py +fi + +# Ensure the spec file now exists +if [ ! -f "$SPEC_FILE" ]; then + log "Failed to create .spec file. Exiting." + exit 1 +fi + +log "Generated .spec file: $SPEC_FILE" + +# Update the .spec file to include the custom icon, data files, and hidden imports +log "Updating the .spec file to include the custom icon, data files, and hidden imports..." +if [ "$OS" == "macos" ]; then + sed -i '' "s|icon=None|icon='$ICON_FILE'|g" $SPEC_FILE + sed -i '' "/Analysis/s/(.*)/\0, hiddenimports=['PyQt5.QtSvg']/" $SPEC_FILE + sed -i '' "/a.datas +=/a \\ + datas=[('src/styles', 'styles'), ('src/icons', 'icons'), ('src/app', 'app'), ('src/img', 'img')], + " $SPEC_FILE +elif [ "$OS" == "linux" ]; then + sed -i "s|icon=None|icon='$ICON_FILE'|g" $SPEC_FILE + sed -i "/Analysis/s/(.*)/\0, hiddenimports=['PyQt5.QtSvg']/" $SPEC_FILE + sed -i "/a.datas +=/a \\ + datas=[('src/styles', 'styles'), ('src/icons', 'icons'), ('src/app', 'app'), ('src/img', 'img')], + " $SPEC_FILE +fi + +# Build the project with PyInstaller using the updated .spec file +log "Building the project with PyInstaller..." +pyinstaller --noconfirm $SPEC_FILE + +# Copy resources into the appropriate location +if [ "$OS" == "macos" ]; then + APP_ROOT="dist/$NAME.app/Contents" + APP_MACOS="$APP_ROOT/MacOS" + APP_RES="$APP_ROOT/Resources" + + log "Creating app resource directories..." + mkdir -p "$APP_RES/styles" "$APP_RES/img" "$APP_RES/icons" + cp -R src/styles/* "$APP_RES/styles/" + cp -R src/img/* "$APP_RES/img/" + cp -R src/icons/* "$APP_RES/icons/" + + # ---- Bundle FreeRDP as PyRDPConnect expects (Resources/freerdp/macos/...) ---- + VENDOR_DIR_SRC="src/freerdp/macos" + VENDOR_DIR_DST="$APP_RES/freerdp/macos" + mkdir -p "$VENDOR_DIR_DST" + log "Copying FreeRDP tree to Resources..." + rsync -a "$VENDOR_DIR_SRC/" "$VENDOR_DIR_DST/" + + FREERDP_BIN="$VENDOR_DIR_DST/xfreerdp" + LIB_DIR="$VENDOR_DIR_DST/lib" + X11_DIR="$VENDOR_DIR_DST/x11" # optional (if you vendored X11 libs) + PLUGINS_DIR="$VENDOR_DIR_DST/plugins" # optional + + chmod +x "$FREERDP_BIN" + + # ---- Patch rpaths + rewrite absolute Homebrew refs -> @rpath/ ---- + BREW_PREFIX="$(brew --prefix 2>/dev/null || echo /opt/homebrew)" + + add_rpath_if_missing() { + local bin="$1" r="$2" + if ! otool -l "$bin" | awk '/LC_RPATH/{getline; print $2}' | grep -qx "$r"; then + install_name_tool -add_rpath "$r" "$bin" 2>/dev/null || true + fi + } + + # We want dyld to look inside ../lib and ../x11 relative to xfreerdp + add_rpath_if_missing "$FREERDP_BIN" "@loader_path/../lib" + if [ -d "$X11_DIR" ]; then + add_rpath_if_missing "$FREERDP_BIN" "@loader_path/../x11" + fi + + # For each lib we ship, set id to @rpath/ and rewrite any Homebrew absolute deps to @rpath/ + patch_one_file() { + local file="$1" + # set its own id (for dylibs) + if [[ "$file" == *.dylib ]]; then + install_name_tool -id "@rpath/$(basename "$file")" "$file" 2>/dev/null || true + fi + # rewrite deps + otool -L "$file" | awk 'NR>1{print $1}' | while read -r dep; do + [ -z "$dep" ] && continue + case "$dep" in + /System/*|/usr/lib/*) continue ;; # keep system libs + esac + if echo "$dep" | grep -Eq "^$BREW_PREFIX/(opt|Cellar)/"; then + base="$(basename "$dep")" + install_name_tool -change "$dep" "@rpath/$base" "$file" 2>/dev/null || true + fi + done + } + + log "Patching bundled dylibs..." + if [ -d "$LIB_DIR" ]; then + find "$LIB_DIR" -type f -name "*.dylib" -print0 | while IFS= read -r -d '' f; do + chmod u+w "$f" + patch_one_file "$f" + done + fi + if [ -d "$X11_DIR" ]; then + find "$X11_DIR" -type f -name "*.dylib" -print0 | while IFS= read -r -d '' f; do + chmod u+w "$f" + patch_one_file "$f" + done + fi + + log "Patching xfreerdp to use @rpath for Homebrew deps..." + patch_one_file "$FREERDP_BIN" + + # Ad-hoc sign so dyld doesn’t complain + log "Ad-hoc codesigning bundled libs and app..." + if [ -d "$LIB_DIR" ]; then + find "$LIB_DIR" -type f -name "*.dylib" -exec codesign --force --timestamp=none -s - {} \; + fi + if [ -d "$X11_DIR" ]; then + find "$X11_DIR" -type f -name "*.dylib" -exec codesign --force --timestamp=none -s - {} \; + fi + [ -f "$FREERDP_BIN" ] && codesign --force --timestamp=none -s - "$FREERDP_BIN" + codesign --force --deep --timestamp=none -s - "dist/$NAME.app" + + log "Verify xfreerdp linkage (should show @rpath -> ../lib and ../x11):" + otool -L "$FREERDP_BIN" | sed 's/^/ /' +else + log "Moving the executable to the $FINAL_DIR directory..." + mv "dist/$NAME" "$FINAL_DIR/" +fi + +# Verify the vendored version (macOS and linux) +expect_major="3" # adjust if you vendor 2.x +if [ -x "${FREERDP_BIN:-}" ]; then + vend_ver="$("$FREERDP_BIN" +version 2>/dev/null | head -n1 | grep -Eo '[0-9]+\.[0-9]+(\.[0-9]+)?' || true)" + if [ -z "$vend_ver" ]; then + log "WARN: Could not detect vendored FreeRDP version from +version" + else + vmaj="${vend_ver%%.*}" + if [ "$vmaj" != "$expect_major" ]; then + log "ERROR: Vendored FreeRDP major version is $vmaj, expected $expect_major" + exit 1 + fi + log "Vendored FreeRDP version: $vend_ver" + fi +fi + +# On Linux, fail fast if vendored xfreerdp has unresolved deps +if [ "$OS" = "linux" ] && [ -x "${FREERDP_BIN:-}" ]; then + if command -v ldd >/dev/null 2>&1; then + if ldd "$FREERDP_BIN" | grep -q "not found"; then + log "ERROR: Missing shared libraries for vendored xfreerdp:" + ldd "$FREERDP_BIN" | grep "not found" || true + exit 1 + fi + else + log "WARN: ldd not available; skipping shared-library check." + fi +fi + +# Move the built application or executable to the appropriate directory +if [ "$OS" == "macos" ]; then + log "Moving the .app bundle to the $FINAL_DIR directory..." + mv "dist/$NAME.app" "$FINAL_DIR/" + + # Create a DMG image + log "Creating a DMG image for macOS..." + DMG_NAME="$FINAL_DIR/$NAME.dmg" + hdiutil create "$DMG_NAME" -volname "$NAME" -srcfolder "$FINAL_DIR/$NAME.app" -ov -format UDZO + + log "DMG image created at $DMG_NAME" +fi + +# Cleanup: Remove the leftover dist/$NAME directory on macOS +if [ -d "dist/$NAME" ]; then + log "Cleaning up the dist directory..." + rm -rf "dist/$NAME" +fi + +log "Build completed successfully." + +# Deactivate the virtual environment +deactivate diff --git a/src/bin/rclone b/src/bin/rclone new file mode 120000 index 0000000..276b3ba --- /dev/null +++ b/src/bin/rclone @@ -0,0 +1 @@ +../core/bin/rclone \ No newline at end of file diff --git a/src/core b/src/core new file mode 160000 index 0000000..4e2ad24 --- /dev/null +++ b/src/core @@ -0,0 +1 @@ +Subproject commit 4e2ad24fd643103251d1a33290226ae5a7096601 diff --git a/src/icons/icon.icns b/src/icons/icon.icns new file mode 100644 index 0000000..c989200 Binary files /dev/null and b/src/icons/icon.icns differ diff --git a/src/icons/icon.png b/src/icons/icon.png new file mode 100644 index 0000000..e683fe2 Binary files /dev/null and b/src/icons/icon.png differ diff --git a/src/icons/icon.svg b/src/icons/icon.svg new file mode 100644 index 0000000..d29f626 --- /dev/null +++ b/src/icons/icon.svg @@ -0,0 +1,51 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/src/main.py b/src/main.py new file mode 100755 index 0000000..b9c688e --- /dev/null +++ b/src/main.py @@ -0,0 +1,55 @@ +#!/usr/bin/env python3 +# src/main.py + +import sys + +from core.application import Application +from core.cli import CommandLine +from replicator.replicator import Replicator + +# --------------------------------------------------------------------------- +# Customization and start of the application +# --------------------------------------------------------------------------- + +name = "Replicator" + +# --------------------------------------------------------------------------- +# Helpers +# --------------------------------------------------------------------------- + +def has_option(): + return any(arg.startswith('--') for arg in sys.argv) + +# --------------------------------------------------------------------------- +# Start of the application +# --------------------------------------------------------------------------- + +def start_app(): + app = Application(name,sys.argv) + + # Create main window and register it with Application + win = Replicator() + app.set_mainWindow(win) + + # All other code gets app via QApplication.instance() + sys.exit(app.exec_()) + +def start_cli(): + cli = CommandLine(name,sys.argv) + + # Create main command line handler and register it with CommandLine + handler = Replicator() + handler.cli(cli) + + # All other code gets app via QApplication.instance() + sys.exit(cli.exec()) + +# --------------------------------------------------------------------------- +# Main entry point +# --------------------------------------------------------------------------- + +if __name__ == "__main__": + if has_option(): + start_cli() + else: + start_app() diff --git a/src/replicator/__init__.py b/src/replicator/__init__.py new file mode 100644 index 0000000..edab12c --- /dev/null +++ b/src/replicator/__init__.py @@ -0,0 +1,11 @@ +#!/usr/bin/env python3 +# src/core/__init__.py + +from .replicator import Replicator +from .ui import JobDialog, ScheduleDialog +from .job import Schedule, Endpoint, Job, JobRunResult, JobStore +from .migration import Migration + +__version__ = "1.0.0" + +__all__ = ["Replicator", "JobDialog", "ScheduleDialog", "Schedule", "Endpoint", "Job", "JobRunResult", "JobStore", "Migration"] diff --git a/src/replicator/job.py b/src/replicator/job.py new file mode 100644 index 0000000..c7ae25c --- /dev/null +++ b/src/replicator/job.py @@ -0,0 +1,1023 @@ +#!/usr/bin/env python3 +# src/replicator/job.py + +from __future__ import annotations + +from dataclasses import dataclass, field +from datetime import datetime, timedelta, timezone, time +from typing import Any, Callable, Dict, Iterable, List, Mapping, Optional, Tuple, Union +import json +import sqlite3 + +import tempfile +import time +from pathlib import Path +from urllib.parse import urlparse + +import os +import re + +try: + # corePY SQLite wrapper (preferred) + from core.database.sqlite import SQLite # type: ignore +except Exception: # pragma: no cover + SQLite = None # type: ignore + +try: + # corePY Share mount helper + from core.filesystem.share import Share, ShareAuth, ShareError # type: ignore +except Exception: # pragma: no cover + Share = None # type: ignore + ShareAuth = None # type: ignore + ShareError = Exception # type: ignore + + + +JsonDict = Dict[str, Any] + + +# --------------------------------------------------------------------------- +# Logging helpers (Share adapter + secret redaction) +# --------------------------------------------------------------------------- + +def _redact_secrets(s: str) -> str: + """Redact obvious credentials from command strings/log lines.""" + if not s: + return s + s = re.sub(r"(pass=)([^,\s]+)", r"\1***", s, flags=re.IGNORECASE) + s = re.sub(r"(password=)([^,\s]+)", r"\1***", s, flags=re.IGNORECASE) + # common flags + s = re.sub(r"(--password\s+)(\S+)", r"\1***", s, flags=re.IGNORECASE) + s = re.sub(r"(--pass\s+)(\S+)", r"\1***", s, flags=re.IGNORECASE) + return s + + +class _ShareLogger: + """Adapter that exposes debug/info/warning/error and forwards to Job.run(logger=...).""" + + def __init__(self, fn: Optional[Callable[[str, str], None]]): + self._fn = fn + + def debug(self, msg: str) -> None: + if self._fn: + self._fn(_redact_secrets(msg), "debug") + + def info(self, msg: str) -> None: + if self._fn: + self._fn(_redact_secrets(msg), "info") + + def warning(self, msg: str) -> None: + if self._fn: + self._fn(_redact_secrets(msg), "warning") + + def error(self, msg: str) -> None: + if self._fn: + self._fn(_redact_secrets(msg), "error") + + + +# --------------------------------------------------------------------------- +# Remote endpoints (Share mount helpers) +# --------------------------------------------------------------------------- + +class RemoteMountError(RuntimeError): + pass + +def _parse_smb_location(location: str) -> Tuple[str, str, str]: + """Return (host, share, subpath) from location. + + Accepts forms: + - //host/share/subpath + - \\host\\share\\subpath + - host/share/subpath + """ + loc = (location or "").strip() + loc = loc.replace("\\", "/") + loc = loc.lstrip("/") + parts = [p for p in loc.split("/") if p] + if len(parts) < 2: + raise RemoteMountError("SMB location must include host and share (e.g. //server/Share[/path]).") + host = parts[0] + share = parts[1] + subpath = "/".join(parts[2:]) if len(parts) > 2 else "" + return host, share, subpath + + +@dataclass +class _MountedEndpoint: + local_path: str + mount_point: Optional[str] = None + share: Any = None # Share instance when mounted via core.filesystem.share + + def cleanup(self) -> None: + if self.share is not None and self.mount_point: + try: + self.share.umount(self.mount_point, elevate=False) + except Exception: + pass + + +def _mount_endpoint_if_remote( + endpoint: "Endpoint", + job_id: Union[int, None], + role: str, + *, + logger: Optional[Callable[[str, str], None]] = None, + timeout: int = 15, +) -> _MountedEndpoint: + """If endpoint is remote (smb), mount it and return local path; otherwise return local location. + + Uses core.filesystem.share.Share (rclone-backed on macOS when only rclone is bundled). + """ + t = (endpoint.type or "local").lower() + if t == "local": + return _MountedEndpoint(local_path=endpoint.location) + + if t != "smb": + raise RemoteMountError(f"Unsupported endpoint type: {t}") + + if Share is None: + raise RemoteMountError("Share support not available (failed to import core.filesystem.share.Share)") + + # Mount under a stable temp folder for the job + base = Path(tempfile.gettempdir()) / "replicator" / "mounts" / (str(job_id or "new")) / role + base.mkdir(parents=True, exist_ok=True) + mount_point = str(base) + + # Parse endpoint location into host + remote path + host = "" + remote = "" + auth_in = dict(endpoint.auth or {}) if isinstance(endpoint.auth, dict) else {} + + if t == "smb": + host, share, subpath = _parse_smb_location(endpoint.location) + remote = share + (f"/{subpath}" if subpath else "") + + # Build ShareAuth + try: + share_auth = ShareAuth( + username=str(auth_in.get("username") or ""), + password=str(auth_in.get("password") or ""), + domain=str(auth_in.get("domain") or ""), + port=int(auth_in.get("port") or 445) or None, + guest=bool(auth_in.get("guest", True)), + ) + except Exception: + # Fallback if ShareAuth signature changes + share_auth = None + + share_logger = _ShareLogger(logger) + share = Share(logger=share_logger) + + # Allow passing through extra rclone args (e.g. VFS cache mode) via auth['rcloneArgs'] + extra_args = auth_in.get("rcloneArgs") if isinstance(auth_in.get("rcloneArgs"), list) else None + if extra_args: + share_logger.debug(f"[Share] Extra mount args provided: {extra_args}") + + # Mount + try: + share_logger.debug(f"[Share] Mount request: protocol={t} host={host} remote={remote} mount_point={mount_point}") + # Share.mount supports a generic signature; pass known fields. + share.mount( + protocol=t, + host=host, + remote=remote, + mount_point=mount_point, + auth=share_auth or auth_in, + timeout=timeout, + read_only=False, + elevate=False, + ) + except Exception as e: + raise RemoteMountError(f"Failed to mount {t} endpoint via Share: {e}") + + # Share performs its own probe logging; return the mounted endpoint. + return _MountedEndpoint(local_path=mount_point, mount_point=mount_point, share=share) + + +# --------------------------------------------------------------------------- +# Data models +# --------------------------------------------------------------------------- + +@dataclass(frozen=True) +class Endpoint: + type: str = "local" + location: str = "" + auth: JsonDict = field(default_factory=dict) + + def validate(self, role: str) -> List[str]: + errs: List[str] = [] + if not self.type: + errs.append(f"{role} endpoint type is required.") + if not self.location: + errs.append(f"{role} endpoint location is required.") + t = (self.type or "").lower() + + if t == "smb": + port = self.auth.get("port") + if port is not None: + try: + p = int(port) + if p <= 0 or p > 65535: + errs.append(f"{role} endpoint port must be between 1 and 65535.") + except Exception: + errs.append(f"{role} endpoint port must be an integer.") + return errs + + def to_db_fields(self, role: str) -> JsonDict: + t = (self.type or "local").lower() + auth = dict(self.auth or {}) + + port: Optional[int] = None + guest: int = 1 + username: Optional[str] = None + password: Optional[str] = None + useKey: int = 0 + sshKey: Optional[str] = None + options: JsonDict = {} + + if t == "local": + pass + elif t == "smb": + guest = 1 if bool(auth.get("guest", True)) else 0 + username = auth.get("username") or None + password = auth.get("password") or None + # Optional SMB domain + options["domain"] = auth.get("domain") + # Optional rclone args (applies to all remote types) + if isinstance(auth.get("rcloneArgs"), list): + options["rcloneArgs"] = auth.get("rcloneArgs") + + known_keys = {"guest", "username", "password", "port", "domain"} + for k, v in auth.items(): + if k not in known_keys: + options[k] = v + + return { + "role": role, + "type": t, + "location": self.location, + "port": port, + "guest": guest, + "username": username, + "password": password, + "useKey": useKey, + "sshKey": sshKey, + "options": json.dumps(options) if options else None, + } + + @staticmethod + def from_db_row(row: Mapping[str, Any]) -> "Endpoint": + t = (row.get("type") or "local").lower() + auth: JsonDict = {} + + if t == "local": + auth = {} + elif t == "smb": + auth = { + "guest": bool(row.get("guest", 1)), + "username": row.get("username") or "", + "password": row.get("password") or "", + } + auth["port"] = row.get("port") or 445 + # domain is stored in options JSON when present + + opt = row.get("options") + if opt: + try: + extra = json.loads(opt) + if isinstance(extra, dict): + auth.update(extra) + except Exception: + pass + + return Endpoint(type=t, location=row.get("location") or "", auth=auth) + + +@dataclass +class Schedule: + enabled: bool = True + intervalSeconds: int = 3600 + windows: Dict[str, Any] = field(default_factory=dict) + + def to_dict(self) -> Dict[str, Any]: + return { + "enabled": bool(self.enabled), + "intervalSeconds": int(self.intervalSeconds or 0), + "windows": self.windows if isinstance(self.windows, dict) else {}, + } + + @staticmethod + def from_dict(d: Optional[Dict[str, Any]]) -> "Schedule": + d = d or {} + enabled = bool(d.get("enabled", True)) + # intervalSeconds can be on the schedule itself, or in per-day window objects + try: + interval_s = int(d.get("intervalSeconds") or 0) + except Exception: + interval_s = 0 + windows = d.get("windows") if isinstance(d.get("windows"), dict) else {} + if interval_s <= 0: + # best-effort: derive from first window on any day + try: + for _k, _v in windows.items(): + if isinstance(_v, list) and _v and isinstance(_v[0], dict): + v = _v[0].get("intervalSeconds") + if v is not None: + interval_s = int(v or 0) + break + except Exception: + pass + if interval_s <= 0: + interval_s = 3600 + return Schedule(enabled=enabled, intervalSeconds=interval_s, windows=windows) + + def validate(self) -> List[str]: + errs: List[str] = [] + if self.enabled: + try: + iv = int(self.intervalSeconds or 0) + except Exception: + iv = 0 + if iv <= 0: + errs.append("Schedule intervalSeconds must be > 0 when schedule is enabled.") + # windows is optional; if provided, validate structure best-effort + if self.windows and not isinstance(self.windows, dict): + errs.append("Schedule windows must be an object/dict.") + return errs + + def _parse_hhmm(val: Any) -> Optional[Tuple[int, int]]: + try: + parts = str(val).strip().split(":") + if len(parts) != 2: + return None + hh = int(parts[0]); mm = int(parts[1]) + if hh < 0 or hh > 23 or mm < 0 or mm > 59: + return None + return (hh, mm) + except Exception: + return None + + try: + for _day, arr in (self.windows or {}).items(): + if not isinstance(arr, list): + errs.append("Schedule windows entries must be lists.") + continue + for w in arr: + if not isinstance(w, dict): + errs.append("Schedule window must be an object.") + continue + if _parse_hhmm(w.get("start")) is None: + errs.append("Schedule window start must be HH:MM.") + if _parse_hhmm(w.get("end")) is None: + errs.append("Schedule window end must be HH:MM.") + if "intervalSeconds" in w: + try: + if int(w.get("intervalSeconds") or 0) <= 0: + errs.append("Schedule window intervalSeconds must be > 0 when provided.") + except Exception: + errs.append("Schedule window intervalSeconds must be an integer.") + except Exception: + # best-effort validation only + pass + + return errs + + def _window_allows_now_local(self, now_local: datetime) -> bool: + # If windows is empty/missing => allowed. + if not self.windows or not isinstance(self.windows, dict): + return True + + wd = now_local.weekday() # 0..6 (Mon..Sun) + day_windows = self.windows.get(str(wd)) or self.windows.get(wd) + if not isinstance(day_windows, list) or not day_windows: + return False + + tnow = now_local.time().replace(tzinfo=None) + + def _parse_hhmm(val: Any) -> Optional[Tuple[int, int]]: + try: + parts = str(val).strip().split(":") + if len(parts) != 2: + return None + hh = int(parts[0]); mm = int(parts[1]) + if hh < 0 or hh > 23 or mm < 0 or mm > 59: + return None + return (hh, mm) + except Exception: + return None + + for w in day_windows: + if not isinstance(w, dict): + continue + ps = _parse_hhmm(w.get("start")) + pe = _parse_hhmm(w.get("end")) + if ps is None or pe is None: + continue + sh, sm = ps; eh, em = pe + ts = datetime(2000, 1, 1, sh, sm).time() + te = datetime(2000, 1, 1, eh, em).time() + + if ts <= te: + if ts <= tnow <= te: + return True + else: + # overnight window (e.g. 22:00-06:00) + if tnow >= ts or tnow <= te: + return True + + return False + + def interval_seconds_for_now(self, now: Optional[datetime] = None) -> int: + now_local = (now or datetime.now(timezone.utc)).astimezone() + # Priority: + # 1) per-day window intervalSeconds (first window entry) + # 2) schedule.intervalSeconds + try: + wd = now_local.weekday() + day = None + if isinstance(self.windows, dict): + day = self.windows.get(str(wd)) or self.windows.get(wd) + if isinstance(day, list) and day and isinstance(day[0], dict): + v = day[0].get("intervalSeconds") + if v is not None: + iv = int(v or 0) + if iv > 0: + return iv + except Exception: + pass + + try: + iv = int(self.intervalSeconds or 0) + return iv if iv > 0 else 3600 + except Exception: + return 3600 + + def should_run_now(self, now: Optional[datetime] = None) -> bool: + if not self.enabled: + return False + now_local = (now or datetime.now(timezone.utc)).astimezone() + return self._window_allows_now_local(now_local) + + def next_run_at(self, now: Optional[datetime] = None) -> Optional[datetime]: + # This domain object doesn't store lastScheduledRunAt; caller should decide cadence. + # We return "now + interval" if windows allow now; otherwise the next allowed window start. + if not self.enabled: + return None + + now_utc = now or datetime.now(timezone.utc) + now_local = now_utc.astimezone() + + if self._window_allows_now_local(now_local): + return now_utc + timedelta(seconds=int(self.interval_seconds_for_now(now_utc))) + + # Find next allowed window start within the next 7 days (best-effort). + if not self.windows or not isinstance(self.windows, dict): + return now_utc + timedelta(seconds=int(self.interval_seconds_for_now(now_utc))) + + def _parse_hhmm(val: Any) -> Optional[Tuple[int, int]]: + try: + parts = str(val).strip().split(":") + if len(parts) != 2: + return None + hh = int(parts[0]); mm = int(parts[1]) + if hh < 0 or hh > 23 or mm < 0 or mm > 59: + return None + return (hh, mm) + except Exception: + return None + + base_local = now_local.replace(second=0, microsecond=0) + for add_days in range(0, 8): + day_local = base_local + timedelta(days=add_days) + wd = day_local.weekday() + day_windows = self.windows.get(str(wd)) or self.windows.get(wd) + if not isinstance(day_windows, list) or not day_windows: + continue + # use the first window as the start candidate (UI currently writes a single window per day) + w0 = day_windows[0] if isinstance(day_windows[0], dict) else None + if not w0: + continue + ps = _parse_hhmm(w0.get("start")) + if ps is None: + continue + sh, sm = ps + candidate_local = day_local.replace(hour=sh, minute=sm) + candidate_utc = candidate_local.astimezone(timezone.utc) + if candidate_utc > now_utc: + return candidate_utc + + return None + + +@dataclass +class JobRunResult: + ok: bool + started_at: str + ended_at: str + result: str + message: Optional[str] = None + stats: JsonDict = field(default_factory=dict) + + +@dataclass +class Job: + id: Optional[int] = None + name: str = "" + enabled: bool = True + mode: str = "mirror" + direction: str = "unidirectional" + allowDeletion: bool = False + preserveMetadata: bool = True + conflictPolicy: str = "newest" + pairId: Optional[str] = None + + sourceEndpoint: Endpoint = field(default_factory=Endpoint) + targetEndpoint: Endpoint = field(default_factory=Endpoint) + + schedule: Schedule = field(default_factory=Schedule) + + lastRun: Optional[str] = None + lastResult: Optional[str] = None + lastError: Optional[str] = None + + def validate(self) -> List[str]: + errs: List[str] = [] + if not self.name.strip(): + errs.append("Job name is required.") + if not self.mode: + errs.append("Job mode is required.") + if not self.direction: + errs.append("Job direction is required.") + + errs.extend(self.sourceEndpoint.validate("source")) + errs.extend(self.targetEndpoint.validate("target")) + errs.extend(self.schedule.validate()) + + d = (self.direction or "").lower() + if d not in ("unidirectional", "bidirectional"): + errs.append("Job direction must be 'unidirectional' or 'bidirectional'.") + + cp = (self.conflictPolicy or "newest").lower() + if cp not in ("newest", "keepa", "a", "keepb", "b"): + errs.append("conflictPolicy must be one of: newest, keepA/a, keepB/b.") + + return errs + + def should_run_now(self, now: Optional[datetime] = None) -> bool: + if not self.enabled: + return False + return self.schedule.should_run_now(now) + + def next_run_at(self, now: Optional[datetime] = None) -> Optional[datetime]: + if not self.enabled: + return None + return self.schedule.next_run_at(now) + + def run( + self, + *, + now: Optional[datetime] = None, + copy_func: Optional[Callable[..., bool]] = None, + bidirectional_func: Optional[Callable[..., Tuple[bool, JsonDict]]] = None, + logger: Optional[Callable[[str, str], None]] = None, + ) -> JobRunResult: + now_dt = now or datetime.now(timezone.utc) + started_at = now_dt.isoformat() + + def _log(msg: str, level: str = "info") -> None: + if logger: + try: + logger(msg, level) + except Exception: + pass + + errs = self.validate() + if errs: + msg = "; ".join(errs) + ended = datetime.now(timezone.utc).isoformat() + self.lastRun = ended + self.lastResult = "fail" + self.lastError = msg + return JobRunResult( + ok=False, + started_at=started_at, + ended_at=ended, + result="fail", + message=msg, + stats={}, + ) + + if not self.enabled: + ended = datetime.now(timezone.utc).isoformat() + return JobRunResult( + ok=True, + started_at=started_at, + ended_at=ended, + result="ok", + message="Job disabled; skipped.", + stats={}, + ) + + preserve = bool(self.preserveMetadata) + allow_del = bool(self.allowDeletion) + + # Resolve endpoints (mount SMB endpoints to local paths for the duration of the run) + mounted: List[_MountedEndpoint] = [] + src_m = _mount_endpoint_if_remote(self.sourceEndpoint, self.id, "source", logger=logger) + mounted.append(src_m) + dst_m = _mount_endpoint_if_remote(self.targetEndpoint, self.id, "target", logger=logger) + mounted.append(dst_m) + + src = src_m.local_path + dst = dst_m.local_path + + ok: bool = False + stats: JsonDict = {} + + try: + if (self.direction or "").lower() == "bidirectional": + if not bidirectional_func: + raise NotImplementedError("Bidirectional engine not provided.") + + # IMPORTANT: + # The bidirectional engine historically enforced local endpoints only by checking + # endpoint types. Since remote endpoints are mounted to local paths for the duration + # of the run, we provide a local-view of this job to the engine. + job_local_view = Job( + id=self.id, + name=self.name, + enabled=self.enabled, + mode=self.mode, + direction=self.direction, + allowDeletion=self.allowDeletion, + preserveMetadata=self.preserveMetadata, + conflictPolicy=self.conflictPolicy, + pairId=self.pairId, + sourceEndpoint=Endpoint(type="local", location=src, auth={}), + targetEndpoint=Endpoint(type="local", location=dst, auth={}), + schedule=self.schedule, + lastRun=self.lastRun, + lastResult=self.lastResult, + lastError=self.lastError, + ) + + _log(f"[Job] Running bidirectional job '{self.name}': {src} <-> {dst}", "info") + ok, stats = bidirectional_func(job_local_view, None) + else: + if not copy_func: + raise NotImplementedError("Copy function not provided.") + _log(f"[Job] Running unidirectional job '{self.name}': {src} -> {dst}", "info") + ok = bool(copy_func(src, dst, preserve_metadata=preserve, allow_deletion=allow_del)) + except NotImplementedError as e: + ok = False + self.lastError = str(e) + _log(f"[Job] Not supported: {e}", "error") + except Exception as e: + ok = False + self.lastError = str(e) + _log(f"[Job] Execution failed: {e}", "error") + finally: + # Always unmount remote endpoints + for m in reversed(mounted): + try: + m.cleanup() + except Exception: + pass + + ended_at = datetime.now(timezone.utc).isoformat() + self.lastRun = ended_at + self.lastResult = "ok" if ok else "fail" + if ok: + self.lastError = None + else: + self.lastError = self.lastError or "Failed" + + return JobRunResult( + ok=ok, + started_at=started_at, + ended_at=ended_at, + result="ok" if ok else "fail", + message=None if ok else self.lastError, + stats=stats or {}, + ) + + def to_row_dicts(self) -> Dict[str, Any]: + job_row = { + "id": self.id, + "name": self.name, + "enabled": 1 if self.enabled else 0, + "mode": self.mode or "mirror", + "direction": self.direction or "unidirectional", + "allowDeletion": 1 if self.allowDeletion else 0, + "preserveMetadata": 1 if self.preserveMetadata else 0, + "pairId": self.pairId, + "conflictPolicy": self.conflictPolicy or "newest", + "lastRun": self.lastRun, + "lastResult": self.lastResult, + "lastError": self.lastError, + } + + endpoint_rows = [ + self.sourceEndpoint.to_db_fields("source"), + self.targetEndpoint.to_db_fields("target"), + ] + + sched_row = { + "enabled": 1 if self.schedule and self.schedule.enabled else 0, + "intervalSeconds": int(self.schedule.intervalSeconds if self.schedule else 3600), + "windows": json.dumps(self.schedule.windows if self.schedule and isinstance(self.schedule.windows, dict) else {}), + } + + return { + "job_row": job_row, + "endpoint_rows": endpoint_rows, + "schedule_row": sched_row, + } + + @staticmethod + def from_db_rows( + job_row: Mapping[str, Any], + endpoint_rows: Iterable[Mapping[str, Any]], + schedule_row: Optional[Mapping[str, Any]] = None, + ) -> "Job": + j = Job( + id=int(job_row.get("id")) if job_row.get("id") is not None else None, + name=str(job_row.get("name") or ""), + enabled=bool(job_row.get("enabled", 1)), + mode=str(job_row.get("mode") or "mirror"), + direction=str(job_row.get("direction") or "unidirectional"), + allowDeletion=bool(job_row.get("allowDeletion", 0)), + preserveMetadata=bool(job_row.get("preserveMetadata", 1)), + conflictPolicy=str(job_row.get("conflictPolicy") or "newest"), + pairId=job_row.get("pairId"), + lastRun=job_row.get("lastRun"), + lastResult=job_row.get("lastResult"), + lastError=job_row.get("lastError"), + ) + + src = None + tgt = None + for r in endpoint_rows: + role = (r.get("role") or "").lower() + ep = Endpoint.from_db_row(r) + if role == "source": + src = ep + elif role == "target": + tgt = ep + + j.sourceEndpoint = src or Endpoint() + j.targetEndpoint = tgt or Endpoint() + + if schedule_row: + # Read intervalSeconds and windows as new model, fallback as needed + try: + interval_s = int(schedule_row.get("intervalSeconds") or 0) + except Exception: + interval_s = 0 + # Remove legacy everyMinutes fallback + if interval_s <= 0: + interval_s = 3600 + windows = {} + try: + raw = schedule_row.get("windows") + if raw: + windows = json.loads(raw) if isinstance(raw, str) else (raw if isinstance(raw, dict) else {}) + except Exception: + windows = {} + j.schedule = Schedule( + enabled=bool(schedule_row.get("enabled", 1)), + intervalSeconds=int(interval_s), + windows=windows, + ) + else: + j.schedule = Schedule() + + return j + + def to_legacy_dict(self) -> JsonDict: + return { + "id": self.id, + "name": self.name, + "enabled": self.enabled, + "mode": self.mode, + "direction": self.direction, + "allowDeletion": self.allowDeletion, + "preserveMetadata": self.preserveMetadata, + "pairId": self.pairId, + "conflictPolicy": self.conflictPolicy, + "sourceEndpoint": {"type": self.sourceEndpoint.type, "location": self.sourceEndpoint.location, "auth": dict(self.sourceEndpoint.auth)}, + "targetEndpoint": {"type": self.targetEndpoint.type, "location": self.targetEndpoint.location, "auth": dict(self.targetEndpoint.auth)}, + "schedule": self.schedule.to_dict() if self.schedule else {"enabled": True, "intervalSeconds": 3600, "windows": {}}, + "lastRun": self.lastRun, + "lastResult": self.lastResult, + "lastError": self.lastError, + } + + +# --------------------------------------------------------------------------- +# JobStore (DB persistence) +# --------------------------------------------------------------------------- + +class JobStore: + """SQLite persistence for Job domain objects (UI-agnostic).""" + + def __init__(self, db: Any): + self._db = db + + def _is_core_sqlite(self) -> bool: + return SQLite is not None and isinstance(self._db, SQLite) + + def _conn(self) -> sqlite3.Connection: + if isinstance(self._db, sqlite3.Connection): + return self._db + if callable(self._db): + c = self._db() + if not isinstance(c, sqlite3.Connection): + raise TypeError("JobStore connection provider must return sqlite3.Connection") + return c + raise TypeError("JobStore requires core.database.sqlite.SQLite or sqlite3.Connection") + + def _select(self, table: str, where: Optional[str] = None, params: Any = None, *, order_by: Optional[str] = None) -> List[Dict[str, Any]]: + if self._is_core_sqlite(): + return self._db.select(table, where=where, params=params, order_by=order_by) # type: ignore[union-attr] + conn = self._conn() + sql = f'SELECT * FROM "{table}"' + if where: + sql += f" WHERE {where}" + if order_by: + sql += f" ORDER BY {order_by}" + sql += ";" + cur = conn.execute(sql, params or ()) + rows = cur.fetchall() + return [dict(r) for r in rows] + + def _one(self, table: str, where: str, params: Any) -> Optional[Dict[str, Any]]: + if self._is_core_sqlite(): + return self._db.one(f'SELECT * FROM "{table}" WHERE {where} LIMIT 1;', params) # type: ignore[union-attr] + conn = self._conn() + cur = conn.execute(f'SELECT * FROM "{table}" WHERE {where} LIMIT 1;', params) + r = cur.fetchone() + return dict(r) if r else None + + def _insert(self, table: str, data: Dict[str, Any]) -> int: + if self._is_core_sqlite(): + return int(self._db.insert(table, data)) # type: ignore[union-attr] + conn = self._conn() + keys = list(data.keys()) + cols = ", ".join([f'"{k}"' for k in keys]) + placeholders = ", ".join(["?" for _ in keys]) + sql = f'INSERT INTO "{table}" ({cols}) VALUES ({placeholders});' + cur = conn.execute(sql, tuple(data[k] for k in keys)) + return int(cur.lastrowid or 0) + + def _update(self, table: str, data: Dict[str, Any], where: str, params: Any) -> None: + if self._is_core_sqlite(): + if not isinstance(params, dict): + raise ValueError("JobStore._update with core SQLite requires dict params") + self._db.update(table, data, where, params) # type: ignore[union-attr] + return + conn = self._conn() + keys = list(data.keys()) + set_clause = ", ".join([f'"{k}"=?' for k in keys]) + sql = f'UPDATE "{table}" SET {set_clause} WHERE {where};' + conn.execute(sql, tuple(data[k] for k in keys) + tuple(params if isinstance(params, tuple) else ())) + + def _upsert(self, table: str, data: Dict[str, Any], conflict_columns: List[str], update_columns: Optional[List[str]] = None) -> None: + if self._is_core_sqlite(): + self._db.upsert(table, data, conflict_columns, update_columns) # type: ignore[union-attr] + return + + keys = list(data.keys()) + cols = ", ".join([f'"{k}"' for k in keys]) + placeholders = ", ".join(["?" for _ in keys]) + conflict = ", ".join([f'"{c}"' for c in conflict_columns]) + if update_columns is None: + update_columns = [k for k in keys if k not in conflict_columns] + + if update_columns: + set_clause = ", ".join([f'"{k}"=excluded."{k}"' for k in update_columns]) + sql = f'INSERT INTO "{table}" ({cols}) VALUES ({placeholders}) ON CONFLICT({conflict}) DO UPDATE SET {set_clause};' + else: + sql = f'INSERT INTO "{table}" ({cols}) VALUES ({placeholders}) ON CONFLICT({conflict}) DO NOTHING;' + + conn = self._conn() + conn.execute(sql, tuple(data[k] for k in keys)) + + def _delete(self, table: str, where: str, params: Any) -> None: + if self._is_core_sqlite(): + self._db.delete(table, where, params) # type: ignore[union-attr] + return + conn = self._conn() + conn.execute(f'DELETE FROM "{table}" WHERE {where};', params) + + def _transaction(self): + if self._is_core_sqlite(): + return self._db.transaction() # type: ignore[union-attr] + return self._conn() + + # ------------------------------- + # Reads + # ------------------------------- + + def fetch_all(self) -> List[Job]: + job_rows = self._select("jobs", order_by="id ASC") + jobs: List[Job] = [] + for jr in job_rows: + jid = int(jr.get("id") or 0) + ep_rows = self.fetch_endpoints_rows(jid) + sched_row = self.fetch_schedule_row(jid) + jobs.append(Job.from_db_rows(jr, ep_rows, sched_row)) + return jobs + + def fetch_by_id(self, job_id: int) -> Optional[Job]: + jr = self._one("jobs", "id = ?", (int(job_id),)) + if not jr: + return None + ep_rows = self.fetch_endpoints_rows(int(job_id)) + sched_row = self.fetch_schedule_row(int(job_id)) + return Job.from_db_rows(jr, ep_rows, sched_row) + + def fetch_endpoints_rows(self, job_id: int) -> List[Dict[str, Any]]: + return self._select("endpoints", "jobId = ?", (int(job_id),)) + + def fetch_schedule_row(self, job_id: int) -> Optional[Dict[str, Any]]: + return self._one("schedule", "jobId = ?", (int(job_id),)) + + # ------------------------------- + # Writes + # ------------------------------- + + def upsert(self, job: Job) -> int: + row_dicts = job.to_row_dicts() + job_row: Dict[str, Any] = row_dicts["job_row"] + endpoint_rows: List[Dict[str, Any]] = row_dicts["endpoint_rows"] + schedule_row: Dict[str, Any] = row_dicts["schedule_row"] + + with self._transaction(): + # --- jobs --- + data = { + "name": job_row.get("name"), + "enabled": int(job_row.get("enabled") or 0), + "mode": job_row.get("mode") or "mirror", + "direction": job_row.get("direction") or "unidirectional", + "allowDeletion": int(job_row.get("allowDeletion") or 0), + "preserveMetadata": int(job_row.get("preserveMetadata") or 0), + "pairId": job_row.get("pairId"), + "conflictPolicy": job_row.get("conflictPolicy") or "newest", + "lastRun": job_row.get("lastRun"), + "lastResult": job_row.get("lastResult"), + "lastError": job_row.get("lastError"), + } + + if job.id: + self._update("jobs", data, "id = :id", {"id": int(job.id)}) + job_id = int(job.id) + else: + job_id = self._insert("jobs", data) + job.id = job_id + + # --- endpoints (unique: jobId+role) --- + for ep in endpoint_rows: + role = ep.get("role") + if role not in ("source", "target"): + continue + ep_data = { + "jobId": job_id, + "role": role, + "type": ep.get("type"), + "location": ep.get("location"), + "port": ep.get("port"), + "guest": ep.get("guest"), + "username": ep.get("username"), + "password": ep.get("password"), + "useKey": ep.get("useKey"), + "sshKey": ep.get("sshKey"), + "options": ep.get("options"), + } + self._upsert( + "endpoints", + ep_data, + ["jobId", "role"], + update_columns=["type", "location", "port", "guest", "username", "password", "useKey", "sshKey", "options"], + ) + + # --- schedule (unique: jobId) --- + s_data = { + "jobId": job_id, + "enabled": int(schedule_row.get("enabled") or 0), + "intervalSeconds": int(schedule_row.get("intervalSeconds") or 0), + "windows": schedule_row.get("windows"), + } + self._upsert( + "schedule", + s_data, + ["jobId"], + update_columns=["enabled", "intervalSeconds", "windows"], + ) + + return int(job.id or 0) + + def delete(self, job_id: int) -> None: + with self._transaction(): + self._delete("jobs", "id = ?", (int(job_id),)) diff --git a/src/replicator/migration.py b/src/replicator/migration.py new file mode 100644 index 0000000..622fe89 --- /dev/null +++ b/src/replicator/migration.py @@ -0,0 +1,258 @@ +#!/usr/bin/env python3 +# src/replicator/migration.py + +from __future__ import annotations + +from typing import List, Optional, Sequence, Tuple + +try: + from core.database.sqlite import SQLite + from core.log import Log +except ImportError: + from database.sqlite import SQLite + from log import Log + + +class Migration: + """Database schema creation + migrations for Replicator. + + This class intentionally *does not* re-implement a DB wrapper. + It relies on corePY's `SQLite` for connections/transactions and only + owns Replicator's schema + migration history + small meta KV helpers. + """ + + def __init__(self, db: SQLite, logger: Optional[Log] = None): + self._db = db + self._logger = logger + + # ------------------------------------------------------------------ + # Public API + # ------------------------------------------------------------------ + + def ensure(self) -> None: + """Ensure base tables exist and apply all pending migrations.""" + self._ensure_schema_migrations_table() + self._apply_migrations(self._migrations()) + + def get_meta(self, key: str, default: Optional[str] = None) -> Optional[str]: + """Read a value from the `meta` table. Returns default if missing.""" + try: + row = self._db.one("SELECT value FROM meta WHERE key = ?", (key,)) + if not row: + return default + val = row.get("value") + return default if val is None else str(val) + except Exception: + return default + + def set_meta(self, key: str, value: Optional[str]) -> None: + """Upsert a value into the `meta` table.""" + try: + with self._db.transaction(): + exists = self._db.scalar("SELECT 1 FROM meta WHERE key = ? LIMIT 1", (key,)) + if exists: + self._db.execute( + "UPDATE meta SET value = ?, modified = CURRENT_TIMESTAMP WHERE key = ?", + (value, key), + ) + else: + self._db.execute( + "INSERT INTO meta (key, value) VALUES (?, ?)", + (key, value), + ) + except Exception: + # best effort + pass + + # ------------------------------------------------------------------ + # Internals + # ------------------------------------------------------------------ + + def _log(self, msg: str, *, level: str = "info", channel: str = "migration") -> None: + if self._logger is not None and hasattr(self._logger, "append"): + self._logger.append(msg, level=level, channel=channel) # type: ignore[call-arg] + else: + print(msg) + + def _ensure_schema_migrations_table(self) -> None: + self._db.execute( + """ + CREATE TABLE IF NOT EXISTS schema_migrations ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + created DATETIME DEFAULT CURRENT_TIMESTAMP, + modified DATETIME DEFAULT CURRENT_TIMESTAMP, + name TEXT NOT NULL UNIQUE + ); + """ + ) + + def _is_applied(self, name: str) -> bool: + r = self._db.scalar( + "SELECT 1 FROM schema_migrations WHERE name = ? LIMIT 1;", + (name,), + ) + return bool(r) + + def _apply_migrations(self, migrations: Sequence[Tuple[str, Sequence[str]]]) -> None: + for name, stmts in migrations: + if self._is_applied(name): + continue + + try: + with self._db.transaction(): + for stmt in stmts: + try: + self._db.execute(stmt) + except Exception as e: + # SQLite: ignore duplicate column errors when applying additive migrations on fresh schemas + msg = str(e).lower() + if "duplicate column name" in msg: + continue + raise + self._db.execute("INSERT INTO schema_migrations (name) VALUES (?)", (name,)) + + self._log(f"[Migration] applied {name}", level="debug") + except Exception as e: + raise RuntimeError(f"Failed to apply migration {name}: {e}") + + def _migrations(self) -> List[Tuple[str, List[str]]]: + return [ + ( + "0001_init", + [ + # jobs + """ + CREATE TABLE IF NOT EXISTS jobs ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + created DATETIME DEFAULT CURRENT_TIMESTAMP, + modified DATETIME DEFAULT CURRENT_TIMESTAMP, + name TEXT NOT NULL, + enabled INTEGER NOT NULL DEFAULT 1, + mode TEXT NOT NULL DEFAULT 'mirror', + direction TEXT NOT NULL DEFAULT 'unidirectional', + allowDeletion INTEGER NOT NULL DEFAULT 0, + preserveMetadata INTEGER NOT NULL DEFAULT 1, + pairId TEXT NULL, + conflictPolicy TEXT NOT NULL DEFAULT 'newest', + lastRun TEXT NULL, + lastResult TEXT NULL, + lastError TEXT NULL + ); + """, + "CREATE INDEX IF NOT EXISTS idx_jobs_enabled ON jobs(enabled);", + + # endpoints + """ + CREATE TABLE IF NOT EXISTS endpoints ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + created DATETIME DEFAULT CURRENT_TIMESTAMP, + modified DATETIME DEFAULT CURRENT_TIMESTAMP, + jobId INTEGER NOT NULL, + role TEXT NOT NULL, + type TEXT NOT NULL, + location TEXT NOT NULL, + port INTEGER NULL, + guest INTEGER NOT NULL DEFAULT 1, + username TEXT NULL, + password TEXT NULL, + useKey INTEGER NOT NULL DEFAULT 0, + sshKey TEXT NULL, + options TEXT NULL, + FOREIGN KEY(jobId) REFERENCES jobs(id) ON DELETE CASCADE + ); + """, + "CREATE UNIQUE INDEX IF NOT EXISTS uq_endpoints_job_role ON endpoints(jobId, role);", + + # schedule + """ + CREATE TABLE IF NOT EXISTS schedule ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + created DATETIME DEFAULT CURRENT_TIMESTAMP, + modified DATETIME DEFAULT CURRENT_TIMESTAMP, + jobId INTEGER NOT NULL UNIQUE, + -- Schedule is controlled via per-day windows and intervalSeconds. + enabled INTEGER NOT NULL DEFAULT 1, + intervalSeconds INTEGER NOT NULL DEFAULT 3600, + nextRunAt TEXT NULL, + lastScheduledRunAt TEXT NULL, + windows TEXT NULL, + FOREIGN KEY(jobId) REFERENCES jobs(id) ON DELETE CASCADE + ); + """, + + # runs + """ + CREATE TABLE IF NOT EXISTS runs ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + created DATETIME DEFAULT CURRENT_TIMESTAMP, + modified DATETIME DEFAULT CURRENT_TIMESTAMP, + jobId INTEGER NOT NULL, + startedAt TEXT NOT NULL, + endedAt TEXT NULL, + result TEXT NOT NULL, + message TEXT NULL, + stats TEXT NULL, + FOREIGN KEY(jobId) REFERENCES jobs(id) ON DELETE CASCADE + ); + """, + "CREATE INDEX IF NOT EXISTS idx_runs_job_started ON runs(jobId, startedAt);", + + # file_state + """ + CREATE TABLE IF NOT EXISTS file_state ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + created DATETIME DEFAULT CURRENT_TIMESTAMP, + modified DATETIME DEFAULT CURRENT_TIMESTAMP, + jobId INTEGER NOT NULL, + side TEXT NOT NULL, + relPath TEXT NOT NULL, + size INTEGER NOT NULL DEFAULT 0, + mtime INTEGER NOT NULL DEFAULT 0, + hash TEXT NULL, + isDir INTEGER NOT NULL DEFAULT 0, + deleted INTEGER NOT NULL DEFAULT 0, + deletedAt TEXT NULL, + meta TEXT NULL, + lastSeenAt TEXT NULL, + lastSeenRunId INTEGER NULL, + FOREIGN KEY(jobId) REFERENCES jobs(id) ON DELETE CASCADE + ); + """, + "CREATE UNIQUE INDEX IF NOT EXISTS uq_file_state ON file_state(jobId, side, relPath);", + + # conflicts + """ + CREATE TABLE IF NOT EXISTS conflicts ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + created DATETIME DEFAULT CURRENT_TIMESTAMP, + modified DATETIME DEFAULT CURRENT_TIMESTAMP, + jobId INTEGER NOT NULL, + runId INTEGER NULL, + relPath TEXT NOT NULL, + a_size INTEGER NULL, + a_mtime INTEGER NULL, + a_hash TEXT NULL, + b_size INTEGER NULL, + b_mtime INTEGER NULL, + b_hash TEXT NULL, + status TEXT NOT NULL DEFAULT 'open', + resolution TEXT NULL, + note TEXT NULL, + FOREIGN KEY(jobId) REFERENCES jobs(id) ON DELETE CASCADE, + FOREIGN KEY(runId) REFERENCES runs(id) ON DELETE SET NULL + ); + """, + "CREATE INDEX IF NOT EXISTS idx_conflicts_job_status ON conflicts(jobId, status);", + + # meta (simple KV store) + """ + CREATE TABLE IF NOT EXISTS meta ( + key TEXT PRIMARY KEY, + created DATETIME DEFAULT CURRENT_TIMESTAMP, + modified DATETIME DEFAULT CURRENT_TIMESTAMP, + value TEXT NULL + ); + """, + ], + ), + ] diff --git a/src/replicator/replicator.py b/src/replicator/replicator.py new file mode 100644 index 0000000..96680a1 --- /dev/null +++ b/src/replicator/replicator.py @@ -0,0 +1,1618 @@ +#!/usr/bin/env python3 +# src/replicator/replicator.py + +from __future__ import annotations + +from typing import Optional, Any, Dict, List + +import os +import json +import shutil +import tempfile +import time +from pathlib import Path + +# Add datetime import for lastRun/lastResult +from datetime import datetime, timezone, timedelta + +from PyQt5.QtCore import Qt +from PyQt5.QtGui import QPixmap +from PyQt5.QtWidgets import ( + QApplication, + QMainWindow, + QWidget, + QVBoxLayout, + QHBoxLayout, + QLabel, + QTableWidget, + QTableWidgetItem, + QHeaderView, + QDialog, +) + +from .ui import JobDialog, ScheduleDialog +from .migration import Migration +from .job import Job, Endpoint, Schedule, JobStore + + +try: + from core.helper import Helper + from core.configuration import Configuration + from core.log import Log + from core.ui import MsgBox, Form + from core.filesystem.filesystem import FileSystem + from core.filesystem.share import Share, ShareAuth + from core.database.sqlite import SQLite +except ImportError: + from helper import Helper + from configuration import Configuration + from log import Log + from ui import MsgBox, Form + from filesystem.filesystem import FileSystem + from filesystem.share import Share, ShareAuth + from database.sqlite import SQLite + + + +# --------------------------------------------------------------------------- +# Remote endpoints (Share mount) +# --------------------------------------------------------------------------- + +class RemoteMountError(RuntimeError): + pass + + +class _ShareLogger: + """Adapter that forwards Share logs into Replicator's logger.""" + + def __init__(self, append_fn): + self._append = append_fn + + def debug(self, msg: str) -> None: + self._append(f"[Share] {msg}", level="debug") + + def info(self, msg: str) -> None: + self._append(f"[Share] {msg}", level="info") + + def warning(self, msg: str) -> None: + self._append(f"[Share] {msg}", level="warning") + + def error(self, msg: str) -> None: + self._append(f"[Share] {msg}", level="error") + + +class _MountedEndpoint: + def __init__(self, local_path: str, mount_point: str | None = None, share: Share | None = None): + self.local_path = local_path + self.mount_point = mount_point + self.share = share + + def cleanup(self) -> None: + if self.share is not None and self.mount_point: + try: + self.share.umount(self.mount_point, elevate=False) + except Exception: + pass + + + +# --- Location parsing helpers for remote endpoints --- + +def _parse_smb_location(loc: str) -> tuple[str, str]: + """Parse SMB location into (host, remote). + + Accepts forms: + - \\host\Share + - \\host\Share\dir\sub + - //host/Share/dir + - host/Share/dir + Returns: + host, remote where remote is "Share" or "Share/dir/sub". + """ + s = (loc or "").strip() + s = s.replace("/", "\\") + while s.startswith("\\"): + s = s[1:] + parts = [p for p in s.split("\\") if p] + if len(parts) < 2: + raise RemoteMountError(f"Invalid SMB location: {loc}") + host = parts[0] + remote = "/".join(parts[1:]) + return host, remote + + + + +def _mount_endpoint_if_remote( + endpoint: dict[str, Any], + job_id: int | None, + role: str, + *, + share: Share, + log_append, +) -> _MountedEndpoint: + """Mount SMB endpoints using Share and return a local path usable by the sync engine.""" + + t = (endpoint.get("type") or "local").lower() + loc = str(endpoint.get("location") or "").strip() + + if t == "local": + return _MountedEndpoint(local_path=loc) + + if t != "smb": + raise RemoteMountError(f"Unsupported endpoint type: {t}") + + base = Path(tempfile.gettempdir()) / "replicator" / "mounts" / (str(job_id or "new")) / role + base.mkdir(parents=True, exist_ok=True) + mount_point = str(base) + + # Build auth/options payload (do NOT log secrets) + auth: dict[str, Any] = {} + if isinstance(endpoint.get("auth"), dict): + auth.update(endpoint.get("auth") or {}) + + # Backward-compatible: endpoints table columns may be stored at top-level + for k in ("port", "guest", "username", "password", "useKey", "sshKey", "options"): + if k in endpoint and endpoint.get(k) is not None and k not in auth: + auth[k] = endpoint.get(k) + + # Build ShareAuth object + share_auth = ShareAuth( + username=auth.get("username"), + password=auth.get("password"), + domain=auth.get("domain") or auth.get("workgroup"), + ) + + # Parse host and remote from location (SMB only) + host, remote = _parse_smb_location(loc) + + # Determine port and options + port = None + try: + if auth.get("port") is not None: + port = int(auth.get("port")) + except Exception: + port = None + if port is None: + port = 445 + + opts = {} + if isinstance(auth.get("options"), dict): + opts.update(auth.get("options") or {}) + + # Emit a safe debug line (mask secrets, include host/remote) + safe_auth = dict(auth) + for k in ("password", "pass"): + if k in safe_auth and safe_auth[k]: + safe_auth[k] = "***" + log_append( + f"[Replicator][Share] mount protocol={t} host={host} remote={remote} mount_point={mount_point} auth={safe_auth}", + level="debug", + ) + + # Call Share.mount with correct signature + share.mount( + t, + host, + remote, + mount_point, + auth=share_auth, + port=port, + options=opts, + read_only=bool(auth.get("read_only") or auth.get("ro") or False), + elevate=False, + timeout=60, + ) + + return _MountedEndpoint(local_path=mount_point, mount_point=mount_point, share=share) + + +class Replicator(QMainWindow): + + # ------------------------------------------------------------------ + # Initialization + # ------------------------------------------------------------------ + + def __init__( + self, + helper: Optional[Helper] = None, + configuration: Optional[Configuration] = None, + logger: Optional[Log] = None, + ): + super().__init__() + + self._app = QApplication.instance() + if self._app is None: + raise RuntimeError("Replicator must be created after QApplication/Application.") + + helper = helper or getattr(self._app, "helper", None) + configuration = configuration or getattr(self._app, "configuration", None) + logger = logger or getattr(self._app, "logger", None) + + if helper is None or configuration is None: + raise RuntimeError("Replicator requires corePY Helper + Configuration.") + + self._helper: Helper = helper + self._configuration: Configuration = configuration + self._logger: Optional[Log] = logger + + self._fs = FileSystem(helper=self._helper, logger=self._logger) + + # --- Database path setup --- + # Use Helper.get_cwd() if present, else os.getcwd() + if hasattr(self._helper, "get_cwd") and callable(getattr(self._helper, "get_cwd", None)): + base_dir = self._helper.get_cwd() + else: + base_dir = os.getcwd() + data_dir = os.path.join(base_dir, "data") + db_path = os.path.join(data_dir, "replicator.db") + + # --- Database (corePY SQLite wrapper) + migrations --- + self._db = SQLite(db_path=db_path) + self._migration = Migration(self._db, logger=self._logger) + self._migration.ensure() + + self._store = JobStore(self._db) + + self._log(f"[Replicator] Database: {db_path}", level="debug") + self._log("[Replicator] Database migrations applied.", level="info") + + # Domain jobs (Job objects) + self._jobs: List[Job] = [] + self._table: Optional[QTableWidget] = None + + # After DB is ready, log a snapshot of DB layout/counts (non-verbose) + self._db_debug_snapshot(verbose=False) + + # ------------------------------------------------------------------ + # Scheduler (service mode) + # ------------------------------------------------------------------ + + def _parse_iso_dt(self, s: Any) -> Optional[datetime]: + if not s: + return None + try: + dt = datetime.fromisoformat(str(s)) + if dt.tzinfo is None: + dt = dt.replace(tzinfo=timezone.utc) + return dt + except Exception: + return None + + def _window_allows_now_local(self, windows: Dict[str, Any], now_local: datetime) -> bool: + """ + Determine if 'now_local' falls inside an allowed window for its weekday. + Windows are interpreted as LOCAL times (America/Montreal via system tz). + Supports overnight windows like 22:00-06:00. + If windows is empty/missing => allowed. + """ + if not windows or not isinstance(windows, dict): + return True + + wd = now_local.weekday() # 0..6 (Mon..Sun) + day_windows = windows.get(str(wd)) or windows.get(wd) + if not isinstance(day_windows, list) or not day_windows: + return False + + tnow = now_local.time().replace(tzinfo=None) + + def _parse_hhmm(val: Any) -> Optional[tuple[int, int]]: + try: + parts = str(val).strip().split(":") + if len(parts) != 2: + return None + hh = int(parts[0]); mm = int(parts[1]) + if hh < 0 or hh > 23 or mm < 0 or mm > 59: + return None + return hh, mm + except Exception: + return None + + for w in day_windows: + if not isinstance(w, dict): + continue + s = w.get("start"); e = w.get("end") + if not s or not e: + continue + ps = _parse_hhmm(s); pe = _parse_hhmm(e) + if ps is None or pe is None: + continue + sh, sm = ps; eh, em = pe + ts = datetime(2000, 1, 1, sh, sm).time() + te = datetime(2000, 1, 1, eh, em).time() + + if ts <= te: + if ts <= tnow <= te: + return True + else: + # overnight + if tnow >= ts or tnow <= te: + return True + + return False + + def _interval_seconds_for_schedule_row(self, sched_row: Dict[str, Any], now_local: datetime) -> int: + """ + Return interval seconds for this schedule row. + Priority: + 1) schedule.intervalSeconds column + 2) windows[weekday][0].intervalSeconds + 3) service.defaultInterval + """ + default_iv = self._default_interval_seconds() + + try: + col = sched_row.get("intervalSeconds") + if col is not None: + iv = int(col or 0) + if iv > 0: + return iv + except Exception: + pass + + try: + windows = sched_row.get("windows") or {} + if isinstance(windows, str): + windows = json.loads(windows) if windows else {} + if isinstance(windows, dict): + wd = now_local.weekday() + day = windows.get(str(wd)) or windows.get(wd) + if isinstance(day, list) and day and isinstance(day[0], dict): + v = day[0].get("intervalSeconds") + if v is not None: + iv = int(v or 0) + if iv > 0: + return iv + except Exception: + pass + + return default_iv + + def _should_run_scheduled(self, job: Job, *, now_utc: datetime, now_local: datetime) -> bool: + """ + Decide if a job should run in SERVICE/SCHEDULED mode. + Manual 'Run now' intentionally bypasses this. + """ + if not job.id: + return False + + sched = self._db.one( + "SELECT enabled, nextRunAt, lastScheduledRunAt, windows, intervalSeconds FROM schedule WHERE jobId=?", + (int(job.id),), + ) + if not sched: + # No schedule row => do not auto-run (manual run still works) + return False + + if not bool(sched.get("enabled", 0)): + return False + + # windows are local-time based + windows: Dict[str, Any] = {} + try: + raw = sched.get("windows") + if raw: + windows = json.loads(raw) if isinstance(raw, str) else (raw if isinstance(raw, dict) else {}) + except Exception: + windows = {} + + if not self._window_allows_now_local(windows, now_local): + return False + + # Respect nextRunAt if present + next_dt = self._parse_iso_dt(sched.get("nextRunAt")) + if next_dt and now_utc < next_dt: + return False + + interval_s = self._interval_seconds_for_schedule_row(sched, now_local) + last_dt = self._parse_iso_dt(sched.get("lastScheduledRunAt")) + + if last_dt is None: + # first scheduled run in an allowed window + return True + + elapsed = (now_utc - last_dt).total_seconds() + return elapsed >= float(interval_s) + # Bidirectional sync helpers (local-only for now) + # ------------------------------------------------------------------ + + def _endpoint_local_root(self, ep: Dict[str, Any]) -> str: + """Return the local root path for an endpoint or raise. + This is a simple extractor for the already-mounted path.""" + if not isinstance(ep, dict): + raise ValueError("Endpoint is missing") + loc = (ep.get("location") or "").strip() + if not loc: + raise ValueError("Endpoint location is required") + return loc + + def _scan_local_tree(self, root: str) -> Dict[str, Dict[str, Any]]: + """Return a map relPath -> {isDir,size,mtime} for a local filesystem root.""" + root = os.path.abspath(root) + out: Dict[str, Dict[str, Any]] = {} + if not os.path.exists(root): + return out + + # Walk directories; include dirs as entries so deletions can be propagated. + for dirpath, dirnames, filenames in os.walk(root): + # Normalize and skip hidden special entries if needed (keep simple for now) + rel_dir = os.path.relpath(dirpath, root) + if rel_dir == ".": + rel_dir = "" + + # Record directory itself (except root) + if rel_dir: + try: + st = os.stat(dirpath) + out[rel_dir] = { + "isDir": True, + "size": 0, + "mtime": int(st.st_mtime), + } + except Exception: + # If stat fails, still record directory + out[rel_dir] = {"isDir": True, "size": 0, "mtime": 0} + + for fn in filenames: + full = os.path.join(dirpath, fn) + rel = os.path.relpath(full, root) + try: + st = os.stat(full) + out[rel] = { + "isDir": False, + "size": int(st.st_size), + "mtime": int(st.st_mtime), + } + except Exception: + out[rel] = {"isDir": False, "size": 0, "mtime": 0} + + return out + + def _load_prev_file_state(self, job_id: int, side: str) -> Dict[str, Dict[str, Any]]: + rows = self._db.query( + "SELECT relPath, size, mtime, isDir, deleted, deletedAt, lastSeenAt, lastSeenRunId FROM file_state WHERE jobId=? AND side=?", + (job_id, side), + ) + prev: Dict[str, Dict[str, Any]] = {} + for r in rows or []: + prev[str(r.get("relPath"))] = { + "size": int(r.get("size") or 0), + "mtime": int(r.get("mtime") or 0), + "isDir": bool(r.get("isDir")), + "deleted": bool(r.get("deleted")), + "deletedAt": r.get("deletedAt"), + "lastSeenAt": r.get("lastSeenAt"), + "lastSeenRunId": r.get("lastSeenRunId"), + } + return prev + + def _persist_file_state(self, job_id: int, side: str, cur: Dict[str, Dict[str, Any]], run_id: Optional[int]) -> None: + # Use wrapper transaction; no raw connection needed. + now_iso = datetime.now(timezone.utc).isoformat() + with self._db.transaction(): + # Upsert all current entries + for rel, meta in cur.items(): + is_dir = 1 if meta.get("isDir") else 0 + size = int(meta.get("size", 0) or 0) + mtime = int(meta.get("mtime", 0) or 0) + + exists = self._db.scalar( + "SELECT id FROM file_state WHERE jobId=? AND side=? AND relPath=?", + (job_id, side, rel), + ) + if exists: + self._db.execute( + "UPDATE file_state SET size=?, mtime=?, isDir=?, deleted=0, deletedAt=NULL, lastSeenAt=?, lastSeenRunId=? WHERE jobId=? AND side=? AND relPath=?", + (size, mtime, is_dir, now_iso, run_id, job_id, side, rel), + ) + else: + self._db.execute( + "INSERT INTO file_state (jobId, side, relPath, size, mtime, isDir, deleted, deletedAt, lastSeenAt, lastSeenRunId) VALUES (?,?,?,?,?,?,0,NULL,?,?)", + (job_id, side, rel, size, mtime, is_dir, now_iso, run_id), + ) + + # Mark missing as deleted (single UPDATE) + rels = list(cur.keys()) + if rels: + placeholders = ",".join(["?"] * len(rels)) + self._db.execute( + f"UPDATE file_state SET deleted=1, deletedAt=COALESCE(deletedAt, ?) WHERE jobId=? AND side=? AND deleted=0 AND relPath NOT IN ({placeholders})", + (now_iso, job_id, side, *rels), + ) + else: + # cur is empty: mark all as deleted for this job/side + self._db.execute( + "UPDATE file_state SET deleted=1, deletedAt=COALESCE(deletedAt, ?) WHERE jobId=? AND side=? AND deleted=0", + (now_iso, job_id, side), + ) + + def _ensure_parent_dir(self, path: str) -> None: + parent = os.path.dirname(path) + if parent and not os.path.exists(parent): + os.makedirs(parent, exist_ok=True) + + def _copy_local_path(self, src_root: str, dst_root: str, rel: str, is_dir: bool, preserve_metadata: bool) -> None: + src_full = os.path.join(src_root, rel) + dst_full = os.path.join(dst_root, rel) + + if is_dir: + os.makedirs(dst_full, exist_ok=True) + return + + self._ensure_parent_dir(dst_full) + if preserve_metadata: + shutil.copy2(src_full, dst_full) + else: + shutil.copy(src_full, dst_full) + + def _delete_local_path(self, root: str, rel: str, is_dir: bool) -> None: + full = os.path.join(root, rel) + if not os.path.exists(full): + return + if is_dir: + # Only remove if empty; never rmtree blindly in sync engine. + try: + os.rmdir(full) + except OSError: + pass + else: + try: + os.remove(full) + except Exception: + pass + + def _record_conflict( + self, + job_id: int, + run_id: Optional[int], + rel: str, + a: Dict[str, Any], + b: Dict[str, Any], + note: str = "", + ) -> None: + try: + # Avoid inserting duplicate open conflicts for the same path/note. + existing = self._db.one( + "SELECT id FROM conflicts WHERE jobId=? AND relPath=? AND status='open' AND COALESCE(note,'')=COALESCE(?, '') ORDER BY id DESC LIMIT 1", + (job_id, rel, note or ""), + ) + if existing: + return + self._db.execute( + """ + INSERT INTO conflicts (jobId, runId, relPath, a_size, a_mtime, a_hash, b_size, b_mtime, b_hash, status, note) + VALUES (?,?,?,?,?,?,?,?,?,'open',?) + """, + ( + job_id, + run_id, + rel, + int(a.get("size", 0) or 0), + int(a.get("mtime", 0) or 0), + a.get("hash"), + int(b.get("size", 0) or 0), + int(b.get("mtime", 0) or 0), + b.get("hash"), + note or None, + ), + ) + except Exception as e: + self._log(f"[Replicator][DB] Failed to record conflict for '{rel}': {e}", level="warning") + + def _run_job_bidirectional(self, job: Dict[str, Any], run_id: Optional[int]) -> tuple[bool, Dict[str, Any]]: + """Bidirectional sync using `file_state` as the persistent baseline. + + Currently implemented for local<->local only. + """ + job_id = int(job.get("id") or 0) + if job_id <= 0: + raise ValueError("Job must have an id to run bidirectional sync") + + src_ep = job.get("sourceEndpoint") + dst_ep = job.get("targetEndpoint") + a_root = self._endpoint_local_root(src_ep) + b_root = self._endpoint_local_root(dst_ep) + + allow_deletion = bool(job.get("allowDeletion", False)) + preserve_metadata = bool(job.get("preserveMetadata", True)) + conflict_policy = (job.get("conflictPolicy") or "newest").lower() + + # Load previous baseline before scanning/persisting. + prev_a = self._load_prev_file_state(job_id, "A") + prev_b = self._load_prev_file_state(job_id, "B") + + # Scan current trees + cur_a = self._scan_local_tree(a_root) + cur_b = self._scan_local_tree(b_root) + + def _meta_changed(cur: Dict[str, Any], prev: Dict[str, Any]) -> bool: + return ( + bool(cur.get("isDir")) != bool(prev.get("isDir")) + or int(cur.get("size", 0) or 0) != int(prev.get("size", 0) or 0) + or int(cur.get("mtime", 0) or 0) != int(prev.get("mtime", 0) or 0) + ) + + def _changed_set(cur: Dict[str, Dict[str, Any]], prev: Dict[str, Dict[str, Any]]) -> set[str]: + out = set() + for rel, meta in cur.items(): + p = prev.get(rel) + if p is None or p.get("deleted", False): + out.add(rel) + else: + if _meta_changed(meta, p): + out.add(rel) + return out + + def _deleted_set(cur: Dict[str, Dict[str, Any]], prev: Dict[str, Dict[str, Any]]) -> set[str]: + # deleted since last baseline means it existed (not deleted) and now missing + out = set() + for rel, p in prev.items(): + if p.get("deleted", False): + continue + if rel not in cur: + out.add(rel) + return out + + changed_a = _changed_set(cur_a, prev_a) + changed_b = _changed_set(cur_b, prev_b) + deleted_a = _deleted_set(cur_a, prev_a) + deleted_b = _deleted_set(cur_b, prev_b) + + self._log( + f"[Replicator][BiDi] Snapshot A={len(cur_a)} entries, B={len(cur_b)} entries; changedA={len(changed_a)} changedB={len(changed_b)} deletedA={len(deleted_a)} deletedB={len(deleted_b)}", + level="debug", + ) + + # Build unified rel set + all_paths = set(cur_a.keys()) | set(cur_b.keys()) | set(prev_a.keys()) | set(prev_b.keys()) + + actions_copy_a_to_b: list[tuple[str, bool]] = [] + actions_copy_b_to_a: list[tuple[str, bool]] = [] + actions_del_a: list[tuple[str, bool]] = [] + actions_del_b: list[tuple[str, bool]] = [] + + # Helper for newest + def _winner_newest(a: Dict[str, Any], b: Dict[str, Any]) -> str: + am = int(a.get("mtime", 0) or 0) + bm = int(b.get("mtime", 0) or 0) + if am == bm: + # tie-breaker: larger size wins + return "A" if int(a.get("size", 0) or 0) >= int(b.get("size", 0) or 0) else "B" + return "A" if am > bm else "B" + + for rel in sorted(all_paths): + a_cur = cur_a.get(rel) + b_cur = cur_b.get(rel) + + a_exists = a_cur is not None + b_exists = b_cur is not None + + a_changed = rel in changed_a + b_changed = rel in changed_b + + a_deleted = rel in deleted_a + b_deleted = rel in deleted_b + + # If exists only on one side. + # When deletions are enabled and the missing side deleted it since the last baseline, + # deletion should win unless the existing side also changed (conflict). + if a_exists and not b_exists: + if b_deleted and allow_deletion: + if a_changed: + # conflict: B deleted while A changed + self._record_conflict(job_id, run_id, rel, a_cur, {"deleted": True}, note="B deleted, A changed") + winner = "A" if conflict_policy == "newest" else "A" + if winner == "A": + actions_copy_a_to_b.append((rel, bool(a_cur.get("isDir")))) + else: + actions_del_a.append((rel, bool(a_cur.get("isDir")))) + else: + # propagate deletion (keep B deleted, delete A) + actions_del_a.append((rel, bool(a_cur.get("isDir")))) + elif b_deleted and not allow_deletion: + # If B was deleted and deletions are not allowed, do NOT copy A->B (do nothing) + pass + else: + # no deletion involved: treat as create on A, copy to B + actions_copy_a_to_b.append((rel, bool(a_cur.get("isDir")))) + continue + + if b_exists and not a_exists: + if a_deleted and allow_deletion: + if b_changed: + # conflict: A deleted while B changed + self._record_conflict(job_id, run_id, rel, {"deleted": True}, b_cur, note="A deleted, B changed") + winner = "B" if conflict_policy == "newest" else "B" + if winner == "B": + actions_copy_b_to_a.append((rel, bool(b_cur.get("isDir")))) + else: + actions_del_b.append((rel, bool(b_cur.get("isDir")))) + else: + # propagate deletion (keep A deleted, delete B) + actions_del_b.append((rel, bool(b_cur.get("isDir")))) + elif a_deleted and not allow_deletion: + # If A was deleted and deletions are not allowed, do NOT copy B->A (do nothing) + pass + else: + actions_copy_b_to_a.append((rel, bool(b_cur.get("isDir")))) + continue + + # Missing on both sides: maybe deletion propagation; nothing to do. + if not a_exists and not b_exists: + continue + + # Exists on both: check if identical + if a_exists and b_exists: + same = ( + bool(a_cur.get("isDir")) == bool(b_cur.get("isDir")) + and int(a_cur.get("size", 0) or 0) == int(b_cur.get("size", 0) or 0) + and int(a_cur.get("mtime", 0) or 0) == int(b_cur.get("mtime", 0) or 0) + ) + + if same: + # Maybe propagate deletions if one side deleted previously (should not happen if same exists) + continue + + # Not same: detect changes since baseline + if a_changed and b_changed: + # true conflict + self._record_conflict(job_id, run_id, rel, a_cur, b_cur, note="Changed on both sides") + if conflict_policy == "newest": + winner = _winner_newest(a_cur, b_cur) + elif conflict_policy in ("keepa", "a"): + winner = "A" + elif conflict_policy in ("keepb", "b"): + winner = "B" + else: + winner = _winner_newest(a_cur, b_cur) + + if winner == "A": + actions_copy_a_to_b.append((rel, bool(a_cur.get("isDir")))) + else: + actions_copy_b_to_a.append((rel, bool(b_cur.get("isDir")))) + elif a_changed and not b_changed: + actions_copy_a_to_b.append((rel, bool(a_cur.get("isDir")))) + elif b_changed and not a_changed: + actions_copy_b_to_a.append((rel, bool(b_cur.get("isDir")))) + else: + # differs but we can't prove which changed vs baseline (e.g., first run with baseline empty) + # Choose newest as default. + winner = _winner_newest(a_cur, b_cur) + if winner == "A": + actions_copy_a_to_b.append((rel, bool(a_cur.get("isDir")))) + else: + actions_copy_b_to_a.append((rel, bool(b_cur.get("isDir")))) + + # Deletions propagation (safe rules) + if allow_deletion: + # If A deleted and B did not change since baseline, delete on B + for rel in sorted(deleted_a): + if rel in changed_b: + # conflict: deleted on A but changed on B + self._record_conflict(job_id, run_id, rel, {"deleted": True}, cur_b.get(rel, {}), note="A deleted, B changed") + continue + pb = prev_b.get(rel) + if pb and not pb.get("deleted", False) and rel in cur_b: + actions_del_b.append((rel, bool(cur_b[rel].get("isDir")))) + + # If B deleted and A did not change since baseline, delete on A + for rel in sorted(deleted_b): + if rel in changed_a: + self._record_conflict(job_id, run_id, rel, cur_a.get(rel, {}), {"deleted": True}, note="B deleted, A changed") + continue + pa = prev_a.get(rel) + if pa and not pa.get("deleted", False) and rel in cur_a: + actions_del_a.append((rel, bool(cur_a[rel].get("isDir")))) + + # Execute actions + try: + # Copy directories first to ensure parents exist + for rel, is_dir in actions_copy_a_to_b: + self._copy_local_path(a_root, b_root, rel, is_dir, preserve_metadata) + for rel, is_dir in actions_copy_b_to_a: + self._copy_local_path(b_root, a_root, rel, is_dir, preserve_metadata) + + # Then copy files (copy method handles both, but ordering helps for deep paths) + # (Already handled in _copy_local_path) + + # Deletions last + for rel, is_dir in actions_del_a: + self._delete_local_path(a_root, rel, is_dir) + for rel, is_dir in actions_del_b: + self._delete_local_path(b_root, rel, is_dir) + + except Exception as e: + self._log(f"[Replicator][BiDi] Execution failed: {e}", level="error") + return False, {} + + # After actions, scan again and persist final file state as baseline + final_a = self._scan_local_tree(a_root) + final_b = self._scan_local_tree(b_root) + self._persist_file_state(job_id, "A", final_a, run_id) + self._persist_file_state(job_id, "B", final_b, run_id) + + stats = { + "copyAtoB": len(actions_copy_a_to_b), + "copyBtoA": len(actions_copy_b_to_a), + "delA": len(actions_del_a), + "delB": len(actions_del_b), + "changedA": len(changed_a), + "changedB": len(changed_b), + "deletedA": len(deleted_a), + "deletedB": len(deleted_b), + } + self._log(f"[Replicator][BiDi] Actions: {stats}", level="debug") + + # Update run stats if possible + if run_id: + try: + self._db.execute( + "UPDATE runs SET stats=? WHERE id=?", + (json.dumps(stats), run_id), + ) + except Exception: + pass + + return True, stats + + # ------------------------------------------------------------------ + # CLI integration + # ------------------------------------------------------------------ + + def cli(self, cli: QApplication = None) -> None: + """ + Called by corePY CommandLine when running in CLI mode. + """ + if cli is None: + return + cli.add("run", "Run all replication jobs.", self.run) + cli.service.add("run", "Run all replication jobs", self.run_scheduled, interval=1) + + # ------------------------------------------------------------------ + # UI + # ------------------------------------------------------------------ + + def show(self): + self.init() + super().show() + + def init(self): + self.setWindowTitle(getattr(self._app, "name", "Replicator")) + self.setObjectName("Replicator") + # Ensure minimum window width for table visibility + self.setMinimumWidth(800) + + central = QWidget(self) + self.setCentralWidget(central) + root = QVBoxLayout(central) + root.setContentsMargins(16, 16, 16, 16) + root.setSpacing(12) + + # Logo (top, centered) + logo = QLabel() + logo.setAlignment(Qt.AlignCenter) + + # Use app icon/logo if you have one; otherwise harmless + # Adjust path to wherever you store icons in Replicator + candidate = self._helper.get_path("icons/icon.png") or self._helper.get_path("core/icons/info.svg") + name = getattr(self._app, "name", None) or self._app.applicationName() + if candidate and self._helper.file_exists(candidate) and candidate.lower().endswith(".png"): + pm = QPixmap(candidate) + if not pm.isNull(): + logo.setPixmap(pm.scaled(256, 256, Qt.KeepAspectRatio, Qt.SmoothTransformation)) + else: + logo.setText(name) + logo.setStyleSheet("font-size: 32px; font-weight: 200; opacity: 0.8; padding: 0px; margin: 0px;") + + root.addWidget(logo) + + # Application name (under logo) + app_name = QLabel() + app_name.setText(str(name) if name else "") + app_name.setAlignment(Qt.AlignCenter) + app_name.setStyleSheet("font-size: 32px; font-weight: 200; opacity: 0.8; padding: 0px; margin: 0px;") + root.addWidget(app_name) + + # --- Actions row (top, after logo) --- + actions_row = QHBoxLayout() + # No stretch at start; left-aligned by default + self._add_btn = Form.button(label="", icon="plus-circle", action=self._add_job) + self._add_btn.setToolTip("Add job") + self._edit_btn = Form.button(label="", icon="pencil-square", action=self._edit_job) + self._edit_btn.setToolTip("Edit job") + self._edit_btn.setVisible(False) + self._dup_btn = Form.button(label="", icon="files", action=self._duplicate_job) + self._dup_btn.setToolTip("Duplicate job") + self._dup_btn.setVisible(False) + self._del_btn = Form.button(label="", icon="trash", action=self._delete_job) + self._del_btn.setToolTip("Delete job") + self._del_btn.setVisible(False) + self._schedule_btn = Form.button(label="", icon="calendar-event", action=self._edit_schedule) + self._schedule_btn.setToolTip("Schedule") + self._schedule_btn.setVisible(False) + actions_row.addWidget(self._add_btn) + actions_row.addWidget(self._edit_btn) + actions_row.addWidget(self._dup_btn) + actions_row.addWidget(self._del_btn) + actions_row.addWidget(self._schedule_btn) + actions_row.addStretch(1) + root.addLayout(actions_row) + + # Table + self._table = QTableWidget(0, 6, self) + self._table.setHorizontalHeaderLabels([ + "Name", "Enabled", "Source", "Target", "Last run", "Result" + ]) + self._table.setSelectionBehavior(QTableWidget.SelectRows) + self._table.setEditTriggers(QTableWidget.NoEditTriggers) + + # Hide row numbers + self._table.verticalHeader().setVisible(False) + + # Connect selection change to update visibility of action buttons + self._table.selectionModel().selectionChanged.connect(self._on_selection_changed) + + # Ensure columns never shrink below their content (use scrollbar for overflow) + header = self._table.horizontalHeader() + for i in range(self._table.columnCount()): + header.setSectionResizeMode(i, QHeaderView.ResizeToContents) + + # Keep horizontal scrollbar available for overflow + header.setStretchLastSection(False) + + root.addWidget(self._table) + + # --- Bottom row: Configurator and Run now (right aligned) --- + bottom_row = QHBoxLayout() + bottom_row.addStretch(1) + config_btn = Form.button(label="", icon="gear-fill", action=self._configuration.show) + config_btn.setToolTip("Configurator") + run_btn = Form.button(label="", icon="play-fill", action=lambda: self._run_with_ui_feedback()) + run_btn.setToolTip("Run now") + bottom_row.addWidget(config_btn) + bottom_row.addWidget(run_btn) + root.addLayout(bottom_row) + + self._reload_jobs() + self._on_selection_changed() + + + def _on_selection_changed(self, *_args): + has = self._selected_index() >= 0 + if hasattr(self, "_edit_btn"): + self._edit_btn.setVisible(has) + if hasattr(self, "_dup_btn"): + self._dup_btn.setVisible(has) + if hasattr(self, "_del_btn"): + self._del_btn.setVisible(has) + if hasattr(self, "_schedule_btn"): + self._schedule_btn.setVisible(has) + + def _selected_index(self) -> int: + if not self._table: + return -1 + rows = self._table.selectionModel().selectedRows() + if not rows: + return -1 + return rows[0].row() + + def _reload_jobs(self): + self._jobs = self._store.fetch_all() + self._refresh_table() + + def _save_jobs(self): + return + + def _refresh_table(self): + if not self._table: + return + self._table.setRowCount(0) + + for job in self._jobs: + r = self._table.rowCount() + self._table.insertRow(r) + + def _it(v: Any) -> QTableWidgetItem: + item = QTableWidgetItem(str(v) if v is not None else "") + item.setFlags(item.flags() ^ Qt.ItemIsEditable) + return item + + self._table.setItem(r, 0, _it(job.name)) + self._table.setItem(r, 1, _it("On" if job.enabled else "Off")) + + src_str = f"{job.sourceEndpoint.type}:{job.sourceEndpoint.location}" + tgt_str = f"{job.targetEndpoint.type}:{job.targetEndpoint.location}" + self._table.setItem(r, 2, _it(src_str)) + self._table.setItem(r, 3, _it(tgt_str)) + + last_run = job.lastRun + self._table.setItem(r, 4, _it(last_run if last_run else "Never")) + + self._table.setItem(r, 5, _it(job.lastResult or "")) + + def _default_interval_seconds(self) -> int: + """Return the configured default interval in seconds (service.defaultInterval), fallback to 3600.""" + try: + v = self._configuration.get("service.defaultInterval", 3600) + iv = int(v) + return iv if iv > 0 else 3600 + except Exception: + return 3600 + + def _default_schedule_dict(self) -> Dict[str, Any]: + interval = self._default_interval_seconds() + windows: Dict[str, Any] = {} + for wd in range(7): + windows[str(wd)] = [{"start": "00:00", "end": "23:59", "intervalSeconds": interval}] + return { + "enabled": True, + "intervalSeconds": interval, + "windows": windows, + } + + def _schedule_interval_seconds_for_now(self, sched: Dict[str, Any]) -> int: + """Best-effort interval seconds for logging (supports per-day interval stored in window dicts).""" + try: + windows = sched.get("windows") if isinstance(sched.get("windows"), dict) else {} + wd = datetime.now().astimezone().weekday() + day = windows.get(str(wd)) or windows.get(wd) # type: ignore[index] + if isinstance(day, list) and day and isinstance(day[0], dict): + v = day[0].get("intervalSeconds") + if v is not None: + return int(v) + except Exception: + pass + + default_interval = self._default_interval_seconds() + + try: + if "intervalSeconds" in sched: + v = int(sched.get("intervalSeconds") or 0) + return v if v > 0 else default_interval + except Exception: + pass + + return default_interval + + def _job_from_legacy_dict(self, d: Dict[str, Any], *, existing_id: Optional[int] = None) -> Job: + src = d.get("sourceEndpoint") or {"type": "local", "location": d.get("source", ""), "auth": {}} + tgt = d.get("targetEndpoint") or {"type": "local", "location": d.get("target", ""), "auth": {}} + + # Schedule defaults: + # - enabled + # - every day, all day + # - interval 3600 seconds (1h) + sched = d.get("schedule") + if not isinstance(sched, dict): + sched = self._default_schedule_dict() + + default_interval = self._default_interval_seconds() + + # Normalize intervalSeconds (no everyMinutes support) + try: + iv = int(sched.get("intervalSeconds") or 0) + except Exception: + iv = 0 + if iv <= 0: + iv = default_interval + sched["intervalSeconds"] = iv + + # Ensure windows exist; if missing, make it "every day, all day" with per-day interval + windows = sched.get("windows") + if not isinstance(windows, dict) or not windows: + windows = {} + for wd in range(7): + windows[str(wd)] = [{"start": "00:00", "end": "23:59", "intervalSeconds": iv}] + sched["windows"] = windows + else: + # Ensure each enabled day window includes intervalSeconds (per-day) for the upcoming scheduler logic + for k, v in list(windows.items()): + if isinstance(v, list) and v and isinstance(v[0], dict): + if "intervalSeconds" not in v[0]: + v[0]["intervalSeconds"] = iv + + j = Job( + id=existing_id, + name=str(d.get("name") or ""), + enabled=bool(d.get("enabled", True)), + mode=str(d.get("mode") or "mirror"), + direction=str(d.get("direction") or "unidirectional"), + allowDeletion=bool(d.get("allowDeletion", False)), + preserveMetadata=bool(d.get("preserveMetadata", True)), + conflictPolicy=str(d.get("conflictPolicy") or "newest"), + pairId=d.get("pairId"), + lastRun=d.get("lastRun"), + lastResult=d.get("lastResult"), + lastError=d.get("lastError"), + ) + + j.sourceEndpoint = Endpoint( + type=str(src.get("type") or "local"), + location=str(src.get("location") or ""), + auth=dict(src.get("auth") or {}), + ) + j.targetEndpoint = Endpoint( + type=str(tgt.get("type") or "local"), + location=str(tgt.get("location") or ""), + auth=dict(tgt.get("auth") or {}), + ) + + windows = sched.get("windows") if isinstance(sched.get("windows"), dict) else {} + + j.schedule = Schedule( + enabled=bool(sched.get("enabled", True)), + intervalSeconds=int(iv), + windows=windows, + ) + + return j + + def _add_job(self): + # Pass configured service.defaultInterval into the dialog so Schedule defaults match user config + dlg = JobDialog(self, default_interval_seconds=self._default_interval_seconds()) + if dlg.exec_() == QDialog.Accepted: + new_job_dict = dlg.value() + if "schedule" not in new_job_dict or not isinstance(new_job_dict.get("schedule"), dict): + new_job_dict["schedule"] = self._default_schedule_dict() + job_obj = self._job_from_legacy_dict(new_job_dict) + job_id = self._store.upsert(job_obj) + job_obj.id = job_id + self._reload_jobs() + + def _edit_job(self): + idx = self._selected_index() + if idx < 0: + MsgBox.show(self, "Jobs", "Select a job to edit.", icon="info") + return + + current = self._jobs[idx] + # Pass configured service.defaultInterval into the dialog so Schedule defaults remain consistent + dlg = JobDialog(self, current.to_legacy_dict(), default_interval_seconds=self._default_interval_seconds()) + if dlg.exec_() == QDialog.Accepted: + updated_dict = dlg.value() + job_obj = self._job_from_legacy_dict(updated_dict, existing_id=current.id) + self._store.upsert(job_obj) + self._reload_jobs() + + def _duplicate_job(self): + idx = self._selected_index() + if idx < 0: + MsgBox.show(self, "Jobs", "Select a job to duplicate.", icon="info") + return + + orig = self._jobs[idx] + d = orig.to_legacy_dict() + d.pop("id", None) + d["name"] = f"{orig.name} (copy)" if orig.name else "Copy" + + job_obj = self._job_from_legacy_dict(d) + self._store.upsert(job_obj) + self._reload_jobs() + + if self._table: + # Select the newest row (best-effort) + self._table.selectRow(self._table.rowCount() - 1) + + def _delete_job(self): + idx = self._selected_index() + if idx < 0: + MsgBox.show(self, "Jobs", "Select a job to delete.", icon="info") + return + + job = self._jobs[idx] + choice = MsgBox.show( + self, + title="Delete", + message=f"Delete job '{job.name}'?", + icon="question", + buttons=("Cancel", "Delete"), + default="Cancel", + ) + if choice == "Delete": + if job.id: + self._store.delete(int(job.id)) + self._reload_jobs() + + def _edit_schedule(self): + idx = self._selected_index() + if idx < 0: + MsgBox.show(self, "Jobs", "Select a job to edit schedule.", icon="info") + return + + current = self._jobs[idx] + # Pass configured service.defaultInterval into the schedule editor so defaults match user config + dlg = ScheduleDialog(self, job=current.to_legacy_dict(), default_interval_seconds=self._default_interval_seconds()) + if dlg.exec_() == QDialog.Accepted: + d = current.to_legacy_dict() + d["schedule"] = dlg.value() + job_obj = self._job_from_legacy_dict(d, existing_id=current.id) + self._store.upsert(job_obj) + self._reload_jobs() + + def _run_with_ui_feedback(self): + ok = self.run() + MsgBox.show( + self, + title="Replicator", + message="Replication completed successfully." if ok else "Replication finished with errors.", + icon="info" if ok else "warning", + ) + + # ------------------------------------------------------------------ + # Execution + # ------------------------------------------------------------------ + + def run_scheduled(self) -> bool: + """ + Scheduled/service execution entrypoint. + Respects per-job schedule (enabled/windows/interval). + Returns True if all eligible jobs succeeded. + """ + self._reload_jobs() + + verbose_db = False + try: + verbose_db = bool(self._configuration.get("log.verbose", False)) + except Exception: + verbose_db = False + self._db_debug_snapshot(verbose=verbose_db) + + if not self._jobs: + self._log("[Replicator] No jobs configured.", level="warning") + return False + + now_utc = datetime.now(timezone.utc) + now_local = now_utc.astimezone() # interpret windows as local times + + any_ran = False + all_ok = True + + for job in self._jobs: + if not bool(job.enabled): + continue + + if not self._should_run_scheduled(job, now_utc=now_utc, now_local=now_local): + continue + + any_ran = True + ok = self._run_job(job) + all_ok = all_ok and ok + + # Update schedule bookkeeping for this job + try: + sched_row = self._db.one( + "SELECT enabled, nextRunAt, lastScheduledRunAt, windows, intervalSeconds FROM schedule WHERE jobId=?", + (int(job.id),), + ) or {} + interval_s = self._interval_seconds_for_schedule_row(sched_row, now_local) + next_run = (datetime.now(timezone.utc) + timedelta(seconds=int(interval_s))).isoformat() + self._db.update( + "schedule", + { + "lastScheduledRunAt": datetime.now(timezone.utc).isoformat(), + "nextRunAt": next_run, + }, + "jobId = :jobId", + {"jobId": int(job.id)}, + ) + except Exception: + pass + + if not any_ran: + self._log("[Replicator] No jobs due per schedule.", level="debug") + + return all_ok + + def run(self) -> bool: + """ + Run all configured jobs (from DB). + Returns True if all jobs succeeded. + """ + self._reload_jobs() + # After reloading jobs, log a DB snapshot for debugging. + # Only log verbose DB details when log.verbose is enabled. + verbose_db = False + try: + verbose_db = bool(self._configuration.get("log.verbose", False)) + except Exception: + verbose_db = False + + self._db_debug_snapshot(verbose=verbose_db) + jobs: List[Job] = self._jobs + if not jobs: + self._log("[Replicator] No jobs configured.", level="warning") + return False + + all_ok = True + for job in jobs: + if not bool(job.enabled): + self._log(f"[Replicator] Skipping disabled job '{job.name or 'Unnamed'}'.", level="info") + continue + ok = self._run_job(job) + all_ok = all_ok and ok + + # Run DB maintenance at most once per day. + try: + last = self._migration.get_meta("maintenance.lastRunAt") + should = True + if last: + try: + last_dt = datetime.fromisoformat(str(last)) + if last_dt.tzinfo is None: + last_dt = last_dt.replace(tzinfo=timezone.utc) + should = (datetime.now(timezone.utc) - last_dt).total_seconds() >= 86400 + except Exception: + should = True + if should: + self._db_maintenance() + except Exception: + pass + + return all_ok + + def _run_job(self, job: Job) -> bool: + name = job.name or "Unnamed" + if not bool(job.enabled): + self._log(f"[Replicator] Job '{name}' is disabled; skipping.", level="info") + return True + job_id = job.id + src = job.sourceEndpoint.location + src_type = job.sourceEndpoint.type + dst = job.targetEndpoint.location + dst_type = job.targetEndpoint.type + allow_deletion = bool(job.allowDeletion) + preserve_metadata = bool(job.preserveMetadata) + mode = job.mode + direction = job.direction + schedule = job.to_legacy_dict().get("schedule") or {} + + if not src or not dst: + self._log(f"[Replicator] Job '{name}' invalid: missing source/target.", level="error") + return False + + sched_str = "" + if isinstance(schedule, dict) and bool(schedule.get("enabled", True)): + interval_s = self._schedule_interval_seconds_for_now(schedule) + sched_str = f", schedule=Interval {interval_s}s" + logline = f"[Replicator] Running job '{name}': {src} -> {dst} (mode={mode}, direction={direction}, delete={allow_deletion}, meta={preserve_metadata}{sched_str}" + logline += f", srcType={src_type}, dstType={dst_type})" + self._log(logline) + + # Insert a run record at start + started_at = datetime.now(timezone.utc).isoformat() + run_id = None + bidi_stats = None + try: + run_id = None + if job_id: + try: + run_id = int(self._db.insert("runs", {"jobId": int(job_id), "startedAt": started_at, "result": "running"})) + except Exception as e: + self._log(f"[Replicator] Failed to insert run record: {e}", level="warning") + except Exception as e: + self._log(f"[Replicator] Failed to insert run record: {e}", level="warning") + + record_noop_runs = bool(self._configuration.get("db.recordNoopRuns", False)) + + ok = False + try: + if str(direction).lower() == "bidirectional": + # Mount remote endpoints (SMB only) via Share so the bidirectional engine can + # operate on local filesystem paths AND we get proper connectivity debug logs. + mounted: list[_MountedEndpoint] = [] + job_dict = job.to_legacy_dict() + + src_ep = job_dict.get("sourceEndpoint") or {"type": "local", "location": src, "auth": {}} + dst_ep = job_dict.get("targetEndpoint") or {"type": "local", "location": dst, "auth": {}} + + share_logger = _ShareLogger(self._log) + share = Share(logger=share_logger) + + src_m = _mount_endpoint_if_remote(src_ep, job_id, "source", share=share, log_append=self._log) + mounted.append(src_m) + dst_m = _mount_endpoint_if_remote(dst_ep, job_id, "target", share=share, log_append=self._log) + mounted.append(dst_m) + + # Local view for the bidirectional sync engine + job_dict["sourceEndpoint"] = {"type": "local", "location": src_m.local_path, "auth": {}} + job_dict["targetEndpoint"] = {"type": "local", "location": dst_m.local_path, "auth": {}} + + try: + ok, bidi_stats = self._run_job_bidirectional(job_dict, run_id) + finally: + for m in reversed(mounted): + try: + m.cleanup() + except Exception: + pass + else: + ok = self._fs.copy( + src, + dst, + preserve_metadata=preserve_metadata, + allow_deletion=allow_deletion, + ) + except NotImplementedError as e: + self._log(f"[Replicator] Job '{name}' not supported: {e}", level="error") + ok = False + except RemoteMountError as e: + self._log(f"[Replicator] Job '{name}' failed to mount remote endpoint: {e}", level="error") + ok = False + except Exception as e: + self._log(f"[Replicator] Job '{name}' failed: {e}", level="error") + ok = False + + # If this was a bidirectional run with no actions/changes, optionally drop the run row. + if run_id and bidi_stats is not None and not record_noop_runs: + try: + noop = ( + int(bidi_stats.get("copyAtoB", 0) or 0) == 0 + and int(bidi_stats.get("copyBtoA", 0) or 0) == 0 + and int(bidi_stats.get("delA", 0) or 0) == 0 + and int(bidi_stats.get("delB", 0) or 0) == 0 + and int(bidi_stats.get("changedA", 0) or 0) == 0 + and int(bidi_stats.get("changedB", 0) or 0) == 0 + and int(bidi_stats.get("deletedA", 0) or 0) == 0 + and int(bidi_stats.get("deletedB", 0) or 0) == 0 + ) + if noop: + self._db.execute("DELETE FROM runs WHERE id = ?", (run_id,)) + run_id = None + except Exception: + pass + + # Persist lastRun and lastResult, refresh UI, save to DB + now_str = datetime.now(timezone.utc).isoformat() + job.lastRun = now_str + job.lastResult = "ok" if ok else "fail" + job.lastError = None if ok else (job.lastError or "Failed") + self._store.upsert(job) + if run_id: + try: + self._db.update( + "runs", + {"endedAt": now_str, "result": ("ok" if ok else "fail"), "message": (None if ok else (job.lastError or "Failed"))}, + "id = :id", + {"id": int(run_id)}, + ) + except Exception as e: + self._log(f"[Replicator] Failed to update run record: {e}", level="warning") + if self._table: + self._reload_jobs() + + self._log(f"[Replicator] Job '{job.name}' result: {'OK' if ok else 'FAIL'} (lastResult={job.lastResult})", level="info" if ok else "warning") + return ok + + def _log(self, msg: str, level: str = "info", channel: str = "replicator") -> None: + if self._logger is not None and hasattr(self._logger, "append"): + self._logger.append(msg, level=level, channel=channel) # type: ignore[call-arg] + else: + print(msg) + + def _db_debug_snapshot(self, verbose: bool = False) -> None: + """ + Log a compact snapshot of the current DB layout + row counts. + If verbose=True, also logs the most recent rows for key tables. + """ + try: + # List tables + tables = [r["name"] for r in self._db.query("SELECT name FROM sqlite_master WHERE type='table' AND name NOT LIKE 'sqlite_%' ORDER BY name")] + + self._log(f"[Replicator][DB] Tables: {', '.join(tables) if tables else '(none)'}", level="debug") + + # Layout + counts + for t in tables: + try: + cols = self._db.query(f"PRAGMA table_info({t})") + col_names = [c.get("name") for c in cols] + count = int(self._db.scalar(f"SELECT COUNT(*) FROM {t}") or 0) + self._log(f"[Replicator][DB] {t}: columns={len(col_names)} rows={count}", level="debug") + if verbose: + self._log(f"[Replicator][DB] {t}: {', '.join(col_names)}", level="debug") + except Exception as e: + self._log(f"[Replicator][DB] Failed introspecting {t}: {e}", level="warning") + + if not verbose: + return + + for t, order_col in (("jobs", "id"), ("runs", "id"), ("schedule", "id"), ("conflicts", "id")): + if t in tables: + try: + rows = self._db.query(f"SELECT * FROM {t} ORDER BY {order_col} DESC LIMIT 3") + self._log(f"[Replicator][DB] {t}: latest {len(rows)} row(s)", level="debug") + for r in rows: + dr = dict(r) + for k in ("password", "pass", "sshKey", "key_file"): + if k in dr and dr[k]: + dr[k] = "***" + self._log(f"[Replicator][DB] {t}: {dr}", level="debug") + except Exception as e: + self._log(f"[Replicator][DB] Failed reading latest rows from {t}: {e}", level="warning") + + except Exception as e: + self._log(f"[Replicator][DB] Snapshot failed: {e}", level="warning") + + + + + def _db_maintenance(self) -> None: + """Prune old history and run lightweight SQLite maintenance. + + Safe defaults: + - Keep last N runs per job (db.retention.runs.keepLast) + - Also prune runs older than X days (db.retention.runs.keepDays) + - Keep open conflicts; prune resolved conflicts older than X days + - Optionally VACUUM (off by default) + + This is designed to be called periodically (e.g. daily) by the service loop. + """ + try: + keep_last = int(self._configuration.get("db.retention.runs.keepLast", 500)) + keep_days = int(self._configuration.get("db.retention.runs.keepDays", 30)) + prune_conflict_days = int(self._configuration.get("db.retention.conflicts.keepDays", 90)) + prune_deleted_state_days = int(self._configuration.get("db.retention.fileState.deletedKeepDays", 30)) + do_vacuum = bool(self._configuration.get("db.maintenance.vacuum", False)) + except Exception: + keep_last, keep_days = 500, 30 + prune_conflict_days, prune_deleted_state_days = 90, 30 + do_vacuum = False + + now = datetime.now(timezone.utc) + now_iso = now.isoformat() + + # Runs: keep last N per job + prune anything older than keep_days + try: + job_ids = [int(r.get("id")) for r in (self._db.query("SELECT id FROM jobs") or [])] + with self._db.transaction(): + for jid in job_ids: + if keep_last > 0: + self._db.execute( + """ + DELETE FROM runs + WHERE jobId = ? + AND id NOT IN ( + SELECT id FROM runs WHERE jobId = ? ORDER BY id DESC LIMIT ? + ) + """, + (jid, jid, keep_last), + ) + + if keep_days > 0: + self._db.execute( + "DELETE FROM runs WHERE julianday(created) < julianday('now', ?) ", + (f'-{keep_days} days',), + ) + except Exception as e: + self._log(f"[Replicator][DB] Maintenance: runs prune failed: {e}", level="warning") + + # Conflicts: keep all open; prune non-open older than prune_conflict_days + try: + if prune_conflict_days > 0: + with self._db.transaction(): + self._db.execute( + "DELETE FROM conflicts WHERE status <> 'open' AND julianday(created) < julianday('now', ?) ", + (f'-{prune_conflict_days} days',), + ) + except Exception as e: + self._log(f"[Replicator][DB] Maintenance: conflicts prune failed: {e}", level="warning") + + # file_state: prune deleted entries that have been deleted for a long time + try: + if prune_deleted_state_days > 0: + with self._db.transaction(): + self._db.execute( + "DELETE FROM file_state WHERE deleted = 1 AND deletedAt IS NOT NULL AND julianday(deletedAt) < julianday('now', ?) ", + (f'-{prune_deleted_state_days} days',), + ) + except Exception as e: + self._log(f"[Replicator][DB] Maintenance: file_state prune failed: {e}", level="warning") + + # SQLite maintenance: optimize; optionally vacuum + try: + self._db.execute("PRAGMA optimize;") + if do_vacuum: + self._db.execute("VACUUM;") + self._migration.set_meta("maintenance.lastRunAt", now_iso) + self._log("[Replicator][DB] Maintenance completed.", level="debug") + except Exception as e: + self._log(f"[Replicator][DB] Maintenance failed: {e}", level="warning") diff --git a/src/replicator/ui.py b/src/replicator/ui.py new file mode 100644 index 0000000..9457cab --- /dev/null +++ b/src/replicator/ui.py @@ -0,0 +1,775 @@ +#!/usr/bin/env python3 +# src/replicator/ui.py + +from __future__ import annotations + +from typing import Optional, Any, Dict, List, Tuple + +from PyQt5.QtCore import Qt, QTime, pyqtSignal +from PyQt5.QtWidgets import ( + QWidget, + QVBoxLayout, + QHBoxLayout, + QLabel, + QPushButton, + QDialog, + QFormLayout, + QLineEdit, + QCheckBox, + QComboBox, + QSpinBox, + QTextEdit, + QFrame, + QSizePolicy, + QLayout, + QTimeEdit, + QFileDialog, +) + +try: + from core.ui import MsgBox +except ImportError: + from ui import MsgBox + + +# ------------------------------------------------------------------ +# Helpers +# ------------------------------------------------------------------ +class BrowseLineEdit(QLineEdit): + """QLineEdit that emits a signal when clicked (used to open browse dialogs).""" + clicked = pyqtSignal() + + def mousePressEvent(self, event): + try: + self.clicked.emit() + except Exception: + pass + super().mousePressEvent(event) + + +# ------------------------------------------------------------------ +# Job Dialog +# ------------------------------------------------------------------ +class JobDialog(QDialog): + def __init__(self, parent=None, job: Optional[Dict[str, Any]] = None, *, default_interval_seconds: Optional[int] = None): + super().__init__(parent) + self.setWindowTitle("Replication Job") + self.setModal(True) + self.setFixedHeight(400) + self.setMinimumHeight(400) + self.setMinimumWidth(1200) + + job = job or {} + self._original_job = job + + # Default interval for new schedules (seconds). Provided by the app (e.g. Configuration service.defaultInterval). + try: + di = int(default_interval_seconds) if default_interval_seconds is not None else 3600 + except Exception: + di = 3600 + if di <= 0: + di = 3600 + self._default_interval_seconds = di + + layout = QVBoxLayout(self) + layout.setAlignment(Qt.AlignTop) + layout.setSizeConstraint(QLayout.SetMinimumSize) + + # ------------------------------ + # Name row + # ------------------------------ + name_row = QHBoxLayout() + name_row.addWidget(QLabel("Name")) + self.name = QLineEdit(job.get("name", "")) + name_row.addWidget(self.name, 1) + layout.addLayout(name_row) + + # Helper to get endpoint dict from job or fallback + def _get_endpoint(key_endpoint: str, key_str: str, default_type: str = "local") -> Dict[str, Any]: + ep = job.get(key_endpoint) + if isinstance(ep, dict): + return dict(ep) + return {"type": default_type, "location": job.get(key_str, ""), "auth": {}} + + self._source_ep = _get_endpoint("sourceEndpoint", "source", "local") + self._target_ep = _get_endpoint("targetEndpoint", "target", "local") + + # ------------------------------ + # Two-column Source / Destination + # ------------------------------ + cols = QHBoxLayout() + cols.setSpacing(20) + cols.setAlignment(Qt.AlignTop) + + src_frame = QFrame() + src_frame.setObjectName("EndpointFrame") + src_frame.setFrameShape(QFrame.NoFrame) + src_frame.setSizePolicy(QSizePolicy.Expanding, QSizePolicy.Maximum) + src_frame.setMinimumHeight(0) + src_frame.setStyleSheet( + "#EndpointFrame { border: 1px solid rgba(255,255,255,0.12); border-radius: 8px; padding: 10px; }" + ) + src_col = QVBoxLayout(src_frame) + src_col.setContentsMargins(10, 10, 10, 10) + src_col.setSpacing(8) + src_col.setAlignment(Qt.AlignTop) + + dst_frame = QFrame() + dst_frame.setObjectName("EndpointFrame") + dst_frame.setFrameShape(QFrame.NoFrame) + dst_frame.setSizePolicy(QSizePolicy.Expanding, QSizePolicy.Maximum) + dst_frame.setMinimumHeight(0) + dst_frame.setStyleSheet( + "#EndpointFrame { border: 1px solid rgba(255,255,255,0.12); border-radius: 8px; padding: 10px; }" + ) + dst_col = QVBoxLayout(dst_frame) + dst_col.setContentsMargins(10, 10, 10, 10) + dst_col.setSpacing(8) + dst_col.setAlignment(Qt.AlignTop) + + src_title = QLabel("Source") + src_title.setStyleSheet("font-weight: 600;") + dst_title = QLabel("Destination") + dst_title.setStyleSheet("font-weight: 600;") + + src_col.addWidget(src_title) + dst_col.addWidget(dst_title) + + ( + self._source_type_combo, + self._source_location_edit, + self._source_port_spin, + self._source_endpoint_row, + self._source_auth_widget, + self._source_auth_widgets, + ) = self._build_endpoint(existing=self._source_ep) + + ( + self._target_type_combo, + self._target_location_edit, + self._target_port_spin, + self._target_endpoint_row, + self._target_auth_widget, + self._target_auth_widgets, + ) = self._build_endpoint(existing=self._target_ep) + + src_col.addWidget(self._source_endpoint_row) + src_col.addWidget(self._source_auth_widget) + + dst_col.addWidget(self._target_endpoint_row) + dst_col.addWidget(self._target_auth_widget) + + cols.addWidget(src_frame, 1) + cols.addWidget(dst_frame, 1) + layout.addLayout(cols) + + # ------------------------------ + # Mode + Direction on same line + # ------------------------------ + self.mode = QComboBox() + self.mode.addItems(["mirror"]) + mode_val = job.get("mode", "mirror") + idx = self.mode.findText(mode_val) + if idx >= 0: + self.mode.setCurrentIndex(idx) + + self.direction = QComboBox() + self.direction.addItems(["unidirectional", "bidirectional"]) + direction_val = job.get("direction", "unidirectional") + idx = self.direction.findText(direction_val) + if idx >= 0: + self.direction.setCurrentIndex(idx) + + mode_dir_row = QHBoxLayout() + + mode_wrap = QWidget() + mode_wrap.setSizePolicy(QSizePolicy.Expanding, QSizePolicy.Fixed) + mode_lay = QHBoxLayout(mode_wrap) + mode_lay.setContentsMargins(0, 0, 0, 0) + mode_lay.setSpacing(8) + mode_lay.addWidget(QLabel("Mode")) + mode_lay.addWidget(self.mode, 1) + + dir_wrap = QWidget() + dir_wrap.setSizePolicy(QSizePolicy.Expanding, QSizePolicy.Fixed) + dir_lay = QHBoxLayout(dir_wrap) + dir_lay.setContentsMargins(0, 0, 0, 0) + dir_lay.setSpacing(8) + dir_lay.addWidget(QLabel("Direction")) + dir_lay.addWidget(self.direction, 1) + + mode_dir_row.addWidget(mode_wrap, 1) + mode_dir_row.addWidget(dir_wrap, 1) + layout.addLayout(mode_dir_row) + + # ------------------------------ + # Other options + # ------------------------------ + opts_row = QHBoxLayout() + self.allow_deletion = QCheckBox("Allow deletion") + self.allow_deletion.setChecked(bool(job.get("allowDeletion", False))) + self.preserve_metadata = QCheckBox("Preserve metadata") + self.preserve_metadata.setChecked(bool(job.get("preserveMetadata", True))) + self.enabled = QCheckBox("Enabled") + self.enabled.setChecked(bool(job.get("enabled", True))) + opts_row.addWidget(self.allow_deletion) + opts_row.addSpacing(16) + opts_row.addWidget(self.preserve_metadata) + opts_row.addWidget(self.enabled) + opts_row.addStretch(1) + + # Schedule button (UI stays here) + self._schedule_btn = QPushButton("Schedule…") + self._schedule_btn.clicked.connect(self._open_schedule) + opts_row.addWidget(self._schedule_btn) + + layout.addLayout(opts_row) + + # ------------------------------ + # Buttons + # ------------------------------ + btn_row = QHBoxLayout() + btn_row.addStretch(1) + cancel_btn = QPushButton("Cancel") + ok_btn = QPushButton("Save") + cancel_btn.clicked.connect(self.reject) + ok_btn.clicked.connect(self._on_ok) + btn_row.addWidget(cancel_btn) + btn_row.addWidget(ok_btn) + layout.addLayout(btn_row) + + # Wire type changes + self._source_type_combo.currentIndexChanged.connect(lambda _i: self._on_type_changed( + self._source_type_combo, + self._source_location_edit, + self._source_port_spin, + self._source_auth_widget, + self._source_auth_widgets, + )) + self._target_type_combo.currentIndexChanged.connect(lambda _i: self._on_type_changed( + self._target_type_combo, + self._target_location_edit, + self._target_port_spin, + self._target_auth_widget, + self._target_auth_widgets, + )) + + # Trigger initial state + self._on_type_changed( + self._source_type_combo, + self._source_location_edit, + self._source_port_spin, + self._source_auth_widget, + self._source_auth_widgets, + initial=True, + ) + self._on_type_changed( + self._target_type_combo, + self._target_location_edit, + self._target_port_spin, + self._target_auth_widget, + self._target_auth_widgets, + initial=True, + ) + + def _open_schedule(self) -> None: + job = self._original_job if isinstance(self._original_job, dict) else {} + dlg = ScheduleDialog(self, job=job, default_interval_seconds=self._default_interval_seconds) + if dlg.exec_() == QDialog.Accepted: + # store schedule back into original job dict so value() preserves it + if not isinstance(self._original_job, dict): + self._original_job = {} + self._original_job["schedule"] = dlg.value() + + # ------------------------------------------------------------------ + # Endpoint UI + # ------------------------------------------------------------------ + + def _build_endpoint(self, existing: Dict[str, Any]): + type_combo = QComboBox() + type_combo.addItems(["local", "smb"]) + idx = type_combo.findText(existing.get("type", "local")) + if idx >= 0: + type_combo.setCurrentIndex(idx) + if idx < 0: + type_combo.setCurrentIndex(0) + type_combo.setFixedWidth(110) + type_combo.setProperty("existing_type", existing.get("type", "local")) + type_combo.setProperty("existing_auth", existing.get("auth", {}) or {}) + + location_edit = BrowseLineEdit(existing.get("location", "")) + + port_spin = QSpinBox() + port_spin.setRange(1, 65535) + port_spin.setFixedWidth(110) + # Ports are not used for local/SMB endpoints. + port_spin.setValue(1) + port_spin.setVisible(False) + + endpoint_fields = QWidget() + fields_lay = QHBoxLayout(endpoint_fields) + fields_lay.setContentsMargins(0, 0, 0, 0) + fields_lay.setSpacing(8) + fields_lay.addWidget(type_combo) + fields_lay.addWidget(location_edit, 1) + fields_lay.addWidget(port_spin) + + def _browse_location(): + try: + typ = (type_combo.currentText() or "").lower() + if typ != "local": + return + start_dir = location_edit.text().strip() or "" + # If the user typed a file path, prefer its directory. + if start_dir and not start_dir.endswith("/") and not start_dir.endswith("\\"): + try: + import os + if os.path.isfile(start_dir): + start_dir = os.path.dirname(start_dir) + except Exception: + pass + selected = QFileDialog.getExistingDirectory(self, "Select folder", start_dir) + if selected: + location_edit.setText(selected) + except Exception: + return + + # Click-to-browse for local paths (no extra button needed). + location_edit.clicked.connect(_browse_location) + + endpoint_row = QWidget() + endpoint_row.setSizePolicy(QSizePolicy.Expanding, QSizePolicy.Fixed) + endpoint_form = QFormLayout(endpoint_row) + endpoint_form.setContentsMargins(0, 0, 0, 0) + endpoint_form.setSpacing(6) + endpoint_form.addRow(endpoint_fields) + + auth_widget = QFrame() + auth_widget.setFrameShape(QFrame.NoFrame) + auth_widget.setSizePolicy(QSizePolicy.Expanding, QSizePolicy.Maximum) + auth_widget.setMinimumHeight(0) + + auth_lay = QVBoxLayout(auth_widget) + auth_lay.setContentsMargins(0, 0, 0, 0) + auth_lay.setSpacing(6) + auth_lay.setAlignment(Qt.AlignTop) + + auth_title = QLabel("Authentication") + auth_title.setStyleSheet("font-weight: 600;") + + widgets: Dict[str, Any] = {} + + # SMB auth + smb_wrap = QWidget() + smb_wrap.setSizePolicy(QSizePolicy.Expanding, QSizePolicy.Maximum) + smb_form = QFormLayout(smb_wrap) + smb_form.setContentsMargins(0, 0, 0, 0) + smb_guest = QCheckBox("Login as Guest") + smb_user_lbl = QLabel("Username") + smb_username = QLineEdit() + smb_pass_lbl = QLabel("Password") + smb_password = QLineEdit() + smb_password.setEchoMode(QLineEdit.Password) + + smb_auth = existing.get("auth", {}) if existing.get("type") == "smb" else {} + smb_guest.setChecked(bool(smb_auth.get("guest", True))) + smb_username.setText(smb_auth.get("username", "")) + smb_password.setText(smb_auth.get("password", "")) + + smb_form.addRow(smb_guest) + smb_form.addRow(smb_user_lbl, smb_username) + smb_form.addRow(smb_pass_lbl, smb_password) + + def _smb_guest_update(): + guest = smb_guest.isChecked() + smb_user_lbl.setVisible(not guest) + smb_username.setVisible(not guest) + smb_pass_lbl.setVisible(not guest) + smb_password.setVisible(not guest) + + smb_guest.stateChanged.connect(_smb_guest_update) + _smb_guest_update() + + auth_lay.addWidget(auth_title) + auth_lay.addWidget(smb_wrap) + + widgets["smb"] = { + "wrap": smb_wrap, + "guest": smb_guest, + "user_lbl": smb_user_lbl, + "username": smb_username, + "pass_lbl": smb_pass_lbl, + "password": smb_password, + } + + return type_combo, location_edit, port_spin, endpoint_row, auth_widget, widgets + + def _on_type_changed( + self, + type_combo: QComboBox, + location_edit: QLineEdit, + port_spin: QSpinBox, + auth_widget: QWidget, + widgets: Dict[str, Any], + initial: bool = False, + ): + typ = type_combo.currentText() + + placeholder = { + "local": "Local path (e.g. /data or C:\\Data)", + "smb": "SMB path (e.g. \\\\SERVER\\Share\\Folder)", + }.get(typ, "") + location_edit.setPlaceholderText(placeholder) + # For local endpoints, clicking the location field opens a folder chooser. + # For non-local endpoints, keep normal typing behavior. + if (typ or "").lower() == "local": + location_edit.setCursorPosition(len(location_edit.text())) + # Ports are not used for local/SMB endpoints. + port_spin.setVisible(False) + + if typ == "local": + auth_widget.setVisible(False) + auth_widget.setMaximumHeight(0) + else: + auth_widget.setVisible(True) + auth_widget.setMaximumHeight(16777215) + + # Hide all auth sections + for key in ("smb",): + if key in widgets and "wrap" in widgets[key]: + widgets[key]["wrap"].setVisible(False) + + # Show only the relevant auth section + if typ == "smb": + widgets["smb"]["wrap"].setVisible(True) + w = widgets["smb"] + if w["guest"].isChecked() is False: + if not w["username"].text().strip() and not w["password"].text().strip(): + w["guest"].setChecked(True) + + auth_widget.adjustSize() + if auth_widget.parentWidget() is not None: + auth_widget.parentWidget().adjustSize() + self.adjustSize() + + # ------------------------------------------------------------------ + # Validation + # ------------------------------------------------------------------ + + def _on_ok(self): + if not self.name.text().strip(): + MsgBox.show(self, "Job", "Name is required.", icon="warning") + return + + src_type = self._source_type_combo.currentText() + tgt_type = self._target_type_combo.currentText() + + src_loc = self._source_location_edit.text().strip() + tgt_loc = self._target_location_edit.text().strip() + + if not src_loc: + MsgBox.show(self, "Job", "Source location is required.", icon="warning") + return + if not tgt_loc: + MsgBox.show(self, "Job", "Target location is required.", icon="warning") + return + + if src_type == "smb": + w = self._source_auth_widgets["smb"] + if not w["guest"].isChecked(): + if not w["username"].text().strip() or not w["password"].text().strip(): + MsgBox.show(self, "Job", "Source username and password are required.", icon="warning") + return + + if tgt_type == "smb": + w = self._target_auth_widgets["smb"] + if not w["guest"].isChecked(): + if not w["username"].text().strip() or not w["password"].text().strip(): + MsgBox.show(self, "Job", "Target username and password are required.", icon="warning") + return + + self.accept() + + # ------------------------------------------------------------------ + # Value extraction + # ------------------------------------------------------------------ + + def value(self) -> Dict[str, Any]: + def _extract(type_combo: QComboBox, location_edit: QLineEdit, port_spin: QSpinBox, widgets: Dict[str, Any]) -> Dict[str, Any]: + typ = type_combo.currentText() + location = location_edit.text().strip() + auth: Dict[str, Any] = {} + + if typ == "local": + auth = {} + elif typ == "smb": + w = widgets["smb"] + auth = { + "guest": bool(w["guest"].isChecked()), + "username": w["username"].text().strip(), + "password": w["password"].text(), + } + else: + # Unsupported/legacy types fall back to local + typ = "local" + auth = {} + + return {"type": typ, "location": location, "auth": auth} + + source_ep = _extract(self._source_type_combo, self._source_location_edit, self._source_port_spin, self._source_auth_widgets) + target_ep = _extract(self._target_type_combo, self._target_location_edit, self._target_port_spin, self._target_auth_widgets) + + val: Dict[str, Any] = { + "name": self.name.text().strip(), + "sourceEndpoint": source_ep, + "targetEndpoint": target_ep, + "source": source_ep["location"], # backward compatibility + "target": target_ep["location"], # backward compatibility + "allowDeletion": bool(self.allow_deletion.isChecked()), + "preserveMetadata": bool(self.preserve_metadata.isChecked()), + "mode": self.mode.currentText(), + "direction": self.direction.currentText(), + "enabled": bool(self.enabled.isChecked()), + } + + # Preserve schedule (and update if ScheduleDialog was used) + if self._original_job and isinstance(self._original_job, dict): + sched = self._original_job.get("schedule") + if sched is not None: + val["schedule"] = sched + + return val + + +# ------------------------------------------------------------------ +# Schedule Dialog +# ------------------------------------------------------------------ +class ScheduleDialog(QDialog): + """Schedule editor UI. + + Persists as: + schedule = { + "enabled": bool, + "intervalSeconds": int, # computed fallback (minimum per-day interval if enabled, else default) + "windows": { + "0":[{"start":"22:00","end":"06:00","intervalSeconds":3600}], + ... + } + } + """ + + _DAYS: List[Tuple[int, str]] = [ + (0, "Monday"), + (1, "Tuesday"), + (2, "Wednesday"), + (3, "Thursday"), + (4, "Friday"), + (5, "Saturday"), + (6, "Sunday"), + ] + + def __init__(self, parent=None, job: Optional[Dict[str, Any]] = None, *, default_interval_seconds: Optional[int] = None): + super().__init__(parent) + self.setWindowTitle("Schedule") + self.setModal(True) + + # Default interval for new schedules (seconds). Provided by the app (e.g. Configuration service.defaultInterval). + try: + di = int(default_interval_seconds) if default_interval_seconds is not None else 3600 + except Exception: + di = 3600 + if di <= 0: + di = 3600 + self._default_interval_seconds = di + + job = job or {} + schedule = job.get("schedule") if isinstance(job.get("schedule"), dict) else {} + if not schedule: + # Default schedule: enabled, every day, all day, default interval + schedule = {"enabled": True, "intervalSeconds": int(self._default_interval_seconds), "windows": {}} + for wd in range(7): + schedule["windows"][str(wd)] = [{"start": "00:00", "end": "23:59", "intervalSeconds": int(self._default_interval_seconds)}] + else: + # Ensure windows exists; if missing, default to every day all day + if not isinstance(schedule.get("windows"), dict) or not schedule.get("windows"): + schedule["windows"] = {} + for wd in range(7): + schedule["windows"][str(wd)] = [{"start": "00:00", "end": "23:59", "intervalSeconds": int(schedule.get("intervalSeconds") or self._default_interval_seconds)}] + else: + # Ensure each day window has intervalSeconds + try: + for _k, _v in list(schedule["windows"].items()): + if isinstance(_v, list) and _v and isinstance(_v[0], dict) and "intervalSeconds" not in _v[0]: + _v[0]["intervalSeconds"] = int(schedule.get("intervalSeconds") or self._default_interval_seconds) + except Exception: + pass + + self._windows: Dict[str, Any] = schedule.get("windows", {}) if isinstance(schedule.get("windows", {}), dict) else {} + + layout = QVBoxLayout(self) + layout.setAlignment(Qt.AlignTop) + layout.setSizeConstraint(QLayout.SetMinimumSize) + + form = QFormLayout() + layout.addLayout(form) + + self.enabled = QCheckBox() + self.enabled.setChecked(bool(schedule.get("enabled", False))) + + form.addRow("Enabled", self.enabled) + + # Windows editor + win_frame = QFrame() + win_frame.setFrameShape(QFrame.NoFrame) + win_lay = QVBoxLayout(win_frame) + win_lay.setContentsMargins(0, 0, 0, 0) + win_lay.setSpacing(6) + + title = QLabel("Allowed run windows") + title.setStyleSheet("font-weight: 600;") + win_lay.addWidget(title) + + self._day_controls: Dict[int, Dict[str, Any]] = {} + + for wd, label in self._DAYS: + row = QHBoxLayout() + row.setSpacing(10) + + day_enabled = QCheckBox(label) + start = QTimeEdit() + end = QTimeEdit() + start.setDisplayFormat("HH:mm") + end.setDisplayFormat("HH:mm") + start.setTime(QTime(0, 0)) + end.setTime(QTime(23, 59)) + + interval = QSpinBox() + interval.setRange(1, 604800) # 1s .. 7 days + interval.setFixedWidth(130) + interval.setValue(int(schedule.get("intervalSeconds", self._default_interval_seconds) or self._default_interval_seconds)) + + # Load from existing windows if present + key = str(wd) + day_windows = self._windows.get(key) + if isinstance(day_windows, list) and len(day_windows) > 0 and isinstance(day_windows[0], dict): + w0 = day_windows[0] + s = str(w0.get("start", "00:00")) + e = str(w0.get("end", "23:59")) + day_enabled.setChecked(True) + start.setTime(self._parse_qtime(s, QTime(0, 0))) + end.setTime(self._parse_qtime(e, QTime(23, 59))) + try: + interval.setValue(int(w0.get("intervalSeconds", schedule.get("intervalSeconds", self._default_interval_seconds)) or self._default_interval_seconds)) + except Exception: + interval.setValue(int(schedule.get("intervalSeconds", self._default_interval_seconds) or self._default_interval_seconds)) + else: + day_enabled.setChecked(False) + try: + interval.setValue(int(schedule.get("intervalSeconds", self._default_interval_seconds) or self._default_interval_seconds)) + except Exception: + interval.setValue(int(self._default_interval_seconds)) + + # Disable edits when not enabled + def _apply_enabled_state(_=None, *, _day_enabled=day_enabled, _start=start, _end=end, _interval=interval): + en = _day_enabled.isChecked() + _start.setEnabled(en) + _end.setEnabled(en) + _interval.setEnabled(en) + + day_enabled.stateChanged.connect(_apply_enabled_state) + _apply_enabled_state() + + row.addWidget(day_enabled, 1) + row.addWidget(QLabel("Start")) + row.addWidget(start) + row.addWidget(QLabel("End")) + row.addWidget(end) + row.addWidget(QLabel("Interval (s)")) + row.addWidget(interval) + + win_lay.addLayout(row) + + self._day_controls[wd] = {"enabled": day_enabled, "start": start, "end": end, "interval": interval} + + layout.addWidget(win_frame) + + btn_row = QHBoxLayout() + btn_row.addStretch(1) + cancel_btn = QPushButton("Cancel") + ok_btn = QPushButton("Save") + cancel_btn.clicked.connect(self.reject) + ok_btn.clicked.connect(self._on_ok) + btn_row.addWidget(cancel_btn) + btn_row.addWidget(ok_btn) + layout.addLayout(btn_row) + + self._sync_enabled_state() + + self.enabled.stateChanged.connect(lambda _i: self._sync_enabled_state()) + + def _sync_enabled_state(self): + en = self.enabled.isChecked() + for wd, ctrls in self._day_controls.items(): + ctrls["enabled"].setEnabled(en) + # if schedule disabled, visually disable time edits too + if not en: + ctrls["start"].setEnabled(False) + ctrls["end"].setEnabled(False) + ctrls["interval"].setEnabled(False) + else: + # restore per-day checkbox logic + day_en = ctrls["enabled"].isChecked() + ctrls["start"].setEnabled(day_en) + ctrls["end"].setEnabled(day_en) + ctrls["interval"].setEnabled(day_en) + + def _parse_qtime(self, s: str, default: QTime) -> QTime: + try: + parts = s.strip().split(":") + if len(parts) != 2: + return default + hh = int(parts[0]) + mm = int(parts[1]) + if hh < 0 or hh > 23 or mm < 0 or mm > 59: + return default + return QTime(hh, mm) + except Exception: + return default + + def _on_ok(self): + if self.enabled.isChecked(): + for _wd, ctrls in self._day_controls.items(): + if not ctrls["enabled"].isChecked(): + continue + if int(ctrls["interval"].value() or 0) < 1: + MsgBox.show(self, "Schedule", "Per-day interval must be at least 1 second for enabled days.", icon="warning") + return + + # Basic validation: ensure enabled days have valid times (QTimeEdit ensures format) + # Allow overnight windows naturally (start > end) — that's intended. + self.accept() + + def value(self) -> Dict[str, Any]: + windows: Dict[str, Any] = {} + enabled_intervals = [] + if self.enabled.isChecked(): + for wd, ctrls in self._day_controls.items(): + if not ctrls["enabled"].isChecked(): + continue + s = ctrls["start"].time().toString("HH:mm") + e = ctrls["end"].time().toString("HH:mm") + itv = int(ctrls["interval"].value()) + windows[str(wd)] = [{"start": s, "end": e, "intervalSeconds": itv}] + enabled_intervals.append(itv) + + # Compute fallback intervalSeconds: min per-day interval if enabled, else default + if self.enabled.isChecked() and enabled_intervals: + computed_interval_seconds = min(enabled_intervals) + else: + computed_interval_seconds = int(self._default_interval_seconds) + + return { + "enabled": bool(self.enabled.isChecked()), + "intervalSeconds": int(computed_interval_seconds), + "windows": windows, + } diff --git a/test.sh b/test.sh new file mode 100755 index 0000000..4d981c8 --- /dev/null +++ b/test.sh @@ -0,0 +1,3 @@ +#!/bin/bash + +touch tmp/a/file-$(date +%H-%M).txt diff --git a/update.sh b/update.sh new file mode 100755 index 0000000..86308c9 --- /dev/null +++ b/update.sh @@ -0,0 +1,7 @@ +#!/bin/sh + +BRANCH=$(git config -f .gitmodules submodule.src/core.branch) +cd src/core && git checkout $BRANCH && git pull && cd ../.. +git add src/core +git commit -m "General: Update submodule corePY" || echo "No changes to commit" +git push