From 168087166e0ac9ee99602b162ff8dd313a4b7fb1 Mon Sep 17 00:00:00 2001 From: arabcoders Date: Tue, 30 Dec 2025 22:56:59 +0300 Subject: [PATCH 1/9] Fix: updated id logic --- tools/fbc_extractor.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tools/fbc_extractor.py b/tools/fbc_extractor.py index 17d5073..da09e1a 100644 --- a/tools/fbc_extractor.py +++ b/tools/fbc_extractor.py @@ -32,7 +32,7 @@ def _match_id(cls, url) -> None | str: return None if mat.group("fid"): - return f"{mat.group('id')}-{mat.group('fid')}" + return mat.group("fid") return mat.group("id") @@ -97,7 +97,7 @@ def _real_extract(self, url): self.report_warning("Token contains no uploaded files.") return None - if len(playlist) == 1 or self.get_param("noplaylist"): + if is_single or self.get_param("noplaylist"): if self.get_param("noplaylist") and len(playlist) > 1: self.to_screen(f"Downloading 1 video out of '{len(playlist)}' because of --no-playlist option") playlist[0]["_type"] = "video" From 1969825016e7f64a231612191e9a9da854415845 Mon Sep 17 00:00:00 2001 From: arabcoders Date: Sat, 10 Jan 2026 19:15:09 +0300 Subject: [PATCH 2/9] refactor: validate token expire/disabled status --- backend/app/models.py | 6 +- backend/app/routers/tokens.py | 48 ++-- backend/app/routers/uploads.py | 48 ++-- backend/tests/test_download_restrictions.py | 159 ++++++++++++ backend/tests/test_token_status.py | 112 +++++++++ backend/tests/test_tus_token_validation.py | 258 ++++++++++++++++++++ frontend/app/components/AdminTokenForm.vue | 8 +- frontend/app/composables/useTokenInfo.ts | 20 +- frontend/app/pages/f/[token].vue | 16 +- frontend/app/pages/t/[token].vue | 109 +++++---- frontend/app/tests/useTokenInfo.test.ts | 13 +- frontend/app/types/token.ts | 1 + frontend/app/utils/index.ts | 11 +- 13 files changed, 713 insertions(+), 96 deletions(-) create mode 100644 backend/tests/test_download_restrictions.py create mode 100644 backend/tests/test_token_status.py create mode 100644 backend/tests/test_tus_token_validation.py diff --git a/backend/app/models.py b/backend/app/models.py index c793f30..d30d628 100644 --- a/backend/app/models.py +++ b/backend/app/models.py @@ -18,7 +18,7 @@ class UploadToken(Base): uploads_used: Mapped[int] = mapped_column(Integer, nullable=False, default=0) allowed_mime: Mapped[list | None] = mapped_column("allowed_mime", JSON, nullable=True) disabled: Mapped[bool] = mapped_column(Boolean, nullable=False, default=False) - created_at: Mapped[datetime] = mapped_column(DateTime, nullable=False, default=lambda: datetime.now(UTC)) + created_at: Mapped[datetime] = mapped_column(DateTime(timezone=True), nullable=False, default=lambda: datetime.now(UTC)) uploads: Mapped[list["UploadRecord"]] = relationship("UploadRecord", back_populates="token", cascade="all, delete-orphan") @@ -42,7 +42,7 @@ class UploadRecord(Base): upload_length: Mapped[int | None] = mapped_column(BigInteger) upload_offset: Mapped[int] = mapped_column(BigInteger, nullable=False, default=0) status: Mapped[str] = mapped_column(String(32), nullable=False, default="pending") - created_at: Mapped[datetime] = mapped_column(DateTime, nullable=False, default=lambda: datetime.now(UTC)) - completed_at: Mapped[datetime | None] = mapped_column(DateTime) + created_at: Mapped[datetime] = mapped_column(DateTime(timezone=True), nullable=False, default=lambda: datetime.now(UTC)) + completed_at: Mapped[datetime | None] = mapped_column(DateTime(timezone=True)) token: Mapped[UploadToken] = relationship("UploadToken", back_populates="uploads") diff --git a/backend/app/routers/tokens.py b/backend/app/routers/tokens.py index 45d5254..ee06675 100644 --- a/backend/app/routers/tokens.py +++ b/backend/app/routers/tokens.py @@ -116,7 +116,7 @@ async def get_token( db: Annotated[AsyncSession, Depends(get_db)], ) -> schemas.TokenPublicInfo: """ - Get information about an upload token. + Get information about an token. Args: request (Request): The FastAPI request object. @@ -124,7 +124,7 @@ async def get_token( db (AsyncSession): The database session. Returns: - TokenPublicInfo: The upload token information. + TokenPublicInfo: The upload token information """ stmt: Select[tuple[models.UploadToken]] = select(models.UploadToken).where( @@ -135,18 +135,6 @@ async def get_token( if not (token_row := res.scalar_one_or_none()): raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Token not found") - now: datetime = datetime.now(UTC) - expires_at: datetime = token_row.expires_at - - if expires_at.tzinfo is None: - expires_at = expires_at.replace(tzinfo=UTC) - - if token_row.disabled: - raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Token is disabled") - - if expires_at < now: - raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Token has expired") - uploads_stmt: Select[tuple[models.UploadRecord]] = ( select(models.UploadRecord).where(models.UploadRecord.token_id == token_row.id).order_by(models.UploadRecord.created_at.desc()) ) @@ -341,7 +329,7 @@ async def get_file_info( download_token: str, upload_id: str, db: Annotated[AsyncSession, Depends(get_db)], - _: Annotated[bool, Depends(optional_admin_check)], + is_admin: Annotated[bool, Depends(optional_admin_check)], ) -> schemas.UploadRecordResponse: """ Retrieve metadata about a specific uploaded file. @@ -351,6 +339,7 @@ async def get_file_info( download_token (str): The download token associated with the upload. upload_id (str): The public ID of the upload. db (AsyncSession): The database session. + is_admin (bool): Whether the request is authenticated as admin. Returns: UploadRecordResponse: Metadata about the uploaded file. @@ -362,6 +351,19 @@ async def get_file_info( if not (token_row := token_res.scalar_one_or_none()): raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Download token not found") + # Check token status (admin can bypass) + if not is_admin: + now: datetime = datetime.now(UTC) + expires_at: datetime = token_row.expires_at + if expires_at.tzinfo is None: + expires_at = expires_at.replace(tzinfo=UTC) + + if expires_at < now: + raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Token has expired") + + if token_row.disabled: + raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Token is disabled") + upload_stmt: Select[tuple[models.UploadRecord]] = select(models.UploadRecord).where( models.UploadRecord.public_id == upload_id, models.UploadRecord.token_id == token_row.id ) @@ -390,7 +392,7 @@ async def download_file( download_token: str, upload_id: str, db: Annotated[AsyncSession, Depends(get_db)], - _: Annotated[bool, Depends(optional_admin_check)], + is_admin: Annotated[bool, Depends(optional_admin_check)], ) -> FileResponse: """ Download the file associated with a specific upload. @@ -399,6 +401,7 @@ async def download_file( download_token (str): The download token associated with the upload. upload_id (str): The public ID of the upload. db (AsyncSession): The database session. + is_admin (bool): Whether the request is authenticated as admin. Returns: FileResponse: The file response for downloading the file. @@ -409,6 +412,19 @@ async def download_file( if not (token_row := token_res.scalar_one_or_none()): raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Download token not found") + # Check token status (admin can bypass) + if not is_admin: + now: datetime = datetime.now(UTC) + expires_at: datetime = token_row.expires_at + if expires_at.tzinfo is None: + expires_at = expires_at.replace(tzinfo=UTC) + + if expires_at < now: + raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Token has expired") + + if token_row.disabled: + raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Token is disabled") + upload_stmt: Select[tuple[models.UploadRecord]] = select(models.UploadRecord).where( models.UploadRecord.public_id == upload_id, models.UploadRecord.token_id == token_row.id ) diff --git a/backend/app/routers/uploads.py b/backend/app/routers/uploads.py index 40825cf..4f30c4a 100644 --- a/backend/app/routers/uploads.py +++ b/backend/app/routers/uploads.py @@ -25,19 +25,33 @@ router = APIRouter(prefix="/api/uploads", tags=["uploads"]) -async def _ensure_token(db: AsyncSession, token_value: str) -> models.UploadToken: +async def _ensure_token( + db: AsyncSession, + token_value: str | None = None, + token_id: int | None = None, + check_remaining: bool = True, +) -> models.UploadToken: """ - Ensure the upload token is valid, not expired or disabled, and has remaining uploads. + Ensure the upload token is valid, not expired or disabled, and optionally has remaining uploads. Args: db (AsyncSession): Database session. - token_value (str): The upload token string. + token_value (str | None): The upload token string. + token_id (int | None): The upload token ID. + check_remaining (bool): Whether to check remaining uploads. Defaults to True. Returns: UploadToken: The valid upload token object. """ - stmt: Select[tuple[models.UploadToken]] = select(models.UploadToken).where(models.UploadToken.token == token_value) + if token_value: + stmt: Select[tuple[models.UploadToken]] = select(models.UploadToken).where(models.UploadToken.token == token_value) + elif token_id: + stmt: Select[tuple[models.UploadToken]] = select(models.UploadToken).where(models.UploadToken.id == token_id) + else: + msg = "Either token_value or token_id must be provided" + raise ValueError(msg) + res: Result[tuple[models.UploadToken]] = await db.execute(stmt) if not (token := res.scalar_one_or_none()): raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Token not found") @@ -48,10 +62,12 @@ async def _ensure_token(db: AsyncSession, token_value: str) -> models.UploadToke if expires_at.tzinfo is None: expires_at = expires_at.replace(tzinfo=UTC) - if token.disabled or expires_at < now: - raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Token expired or disabled") + if expires_at < now: + raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Token expired") + if token.disabled: + raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Token disabled") - if token.remaining_uploads <= 0: + if check_remaining and token.remaining_uploads <= 0: raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Upload limit reached") return token @@ -166,6 +182,7 @@ async def tus_head(upload_id: str, db: Annotated[AsyncSession, Depends(get_db)]) """ record: models.UploadRecord = await _get_upload_record(db, upload_id) + await _ensure_token(db, token_id=record.token_id, check_remaining=False) if record.upload_length is None: raise HTTPException(status_code=status.HTTP_409_CONFLICT, detail="Upload length unknown") @@ -216,6 +233,7 @@ async def tus_patch( ) record: models.UploadRecord = await _get_upload_record(db, upload_id) + await _ensure_token(db, token_id=record.token_id, check_remaining=False) if record.upload_length is None: raise HTTPException(status_code=status.HTTP_409_CONFLICT, detail="Upload length unknown") @@ -327,6 +345,8 @@ async def tus_delete(upload_id: str, db: Annotated[AsyncSession, Depends(get_db) """ record: models.UploadRecord = await _get_upload_record(db, upload_id) + await _ensure_token(db, token_id=record.token_id, check_remaining=False) + path = Path(record.storage_path or "") if path.exists(): @@ -351,11 +371,8 @@ async def mark_complete(upload_id: str, db: Annotated[AsyncSession, Depends(get_ UploadRecord: The updated upload record. """ - stmt: Select[tuple[models.UploadRecord]] = select(models.UploadRecord).where(models.UploadRecord.public_id == upload_id) - res: Result[tuple[models.UploadRecord]] = await db.execute(stmt) - - if not (record := res.scalar_one_or_none()): - raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Upload not found") + record: models.UploadRecord = await _get_upload_record(db, upload_id) + await _ensure_token(db, token_id=record.token_id, check_remaining=False) record.status = "completed" record.completed_at = datetime.now(UTC) @@ -383,12 +400,7 @@ async def cancel_upload( """ record: models.UploadRecord = await _get_upload_record(db, upload_id) - - stmt: Select[tuple[models.UploadToken]] = select(models.UploadToken).where(models.UploadToken.token == token) - res: Result[tuple[models.UploadToken]] = await db.execute(stmt) - - if not (token_row := res.scalar_one_or_none()): - raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Token not found") + token_row: models.UploadToken = await _ensure_token(db, token_value=token, check_remaining=False) if record.token_id != token_row.id: raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Upload does not belong to this token") diff --git a/backend/tests/test_download_restrictions.py b/backend/tests/test_download_restrictions.py new file mode 100644 index 0000000..eacc69a --- /dev/null +++ b/backend/tests/test_download_restrictions.py @@ -0,0 +1,159 @@ +"""Test download restrictions for expired/disabled tokens.""" + +import pytest +from datetime import UTC, datetime, timedelta +from httpx import ASGITransport, AsyncClient +from fastapi import status + +from backend.app.main import app +from backend.app.config import settings +from backend.tests.utils import create_token, initiate_upload, upload_file_via_tus + + +@pytest.mark.asyncio +async def test_download_blocked_for_disabled_token(): + """Test that downloads are blocked when token is disabled (even with admin key checks).""" + transport = ASGITransport(app=app) + async with AsyncClient(transport=transport, base_url="http://testserver") as client: + # Create token and upload a file + token_data = await create_token(client, max_uploads=1) + upload_token = token_data["token"] + download_token = token_data["download_token"] + + upload_data = await initiate_upload(client, upload_token, "test.txt", 12) + upload_id = upload_data["upload_id"] + await upload_file_via_tus(client, upload_id, b"test content") + + # Disable the token + await client.patch( + app.url_path_for("update_token", token_value=upload_token), + json={"disabled": True}, + headers={"Authorization": f"Bearer {settings.admin_api_key}"}, + ) + + # Download should be blocked even with admin trying without explicit auth + # Since public downloads are disabled by default, we need admin key + # But even with admin key, if token is disabled, it should check that + download_url = app.url_path_for("download_file", download_token=download_token, upload_id=upload_id) + + # Without admin auth, should get 401 (public downloads disabled) + response = await client.get(download_url) + assert response.status_code in [status.HTTP_401_UNAUTHORIZED, status.HTTP_403_FORBIDDEN], ( + "Download should be blocked for disabled token without auth" + ) + + +@pytest.mark.asyncio +async def test_download_blocked_for_expired_token(): + """Test that downloads are blocked when token is expired.""" + transport = ASGITransport(app=app) + async with AsyncClient(transport=transport, base_url="http://testserver") as client: + # Create token and upload a file BEFORE it expires + token_data = await create_token(client, max_uploads=1) + upload_token = token_data["token"] + download_token = token_data["download_token"] + + upload_data = await initiate_upload(client, upload_token, "test.txt", 12) + upload_id = upload_data["upload_id"] + await upload_file_via_tus(client, upload_id, b"test content") + + # Now update token to expire in the past + expired_time = datetime.now(UTC) - timedelta(hours=1) + await client.patch( + app.url_path_for("update_token", token_value=upload_token), + json={"expiry_datetime": expired_time.isoformat()}, + headers={"Authorization": f"Bearer {settings.admin_api_key}"}, + ) + + # Download should be blocked + download_url = app.url_path_for("download_file", download_token=download_token, upload_id=upload_id) + response = await client.get(download_url) + assert response.status_code in [status.HTTP_401_UNAUTHORIZED, status.HTTP_403_FORBIDDEN], ( + "Download should be blocked for expired token without auth" + ) + + +@pytest.mark.asyncio +async def test_download_allowed_for_disabled_token_with_admin_key(): + """Test that admin can download from disabled tokens.""" + transport = ASGITransport(app=app) + async with AsyncClient(transport=transport, base_url="http://testserver") as client: + # Create token and upload a file + token_data = await create_token(client, max_uploads=1) + upload_token = token_data["token"] + download_token = token_data["download_token"] + + upload_data = await initiate_upload(client, upload_token, "test.txt", 12) + upload_id = upload_data["upload_id"] + await upload_file_via_tus(client, upload_id, b"test content") + + # Disable the token + await client.patch( + app.url_path_for("update_token", token_value=upload_token), + json={"disabled": True}, + headers={"Authorization": f"Bearer {settings.admin_api_key}"}, + ) + + # Download should work with admin key + download_url = app.url_path_for("download_file", download_token=download_token, upload_id=upload_id) + response = await client.get(download_url, headers={"Authorization": f"Bearer {settings.admin_api_key}"}) + assert response.status_code == status.HTTP_200_OK, "Admin should be able to download from disabled token" + assert response.content == b"test content", "Downloaded content should match" + + +@pytest.mark.asyncio +async def test_get_file_info_blocked_for_disabled_token(): + """Test that file info is blocked when token is disabled.""" + transport = ASGITransport(app=app) + async with AsyncClient(transport=transport, base_url="http://testserver") as client: + # Create token and upload a file + token_data = await create_token(client, max_uploads=1) + upload_token = token_data["token"] + download_token = token_data["download_token"] + + upload_data = await initiate_upload(client, upload_token, "test.txt", 12) + upload_id = upload_data["upload_id"] + await upload_file_via_tus(client, upload_id, b"test content") + + # Disable the token + await client.patch( + app.url_path_for("update_token", token_value=upload_token), + json={"disabled": True}, + headers={"Authorization": f"Bearer {settings.admin_api_key}"}, + ) + + # File info should be blocked + info_url = app.url_path_for("get_file_info", download_token=download_token, upload_id=upload_id) + response = await client.get(info_url) + assert response.status_code in [status.HTTP_401_UNAUTHORIZED, status.HTTP_403_FORBIDDEN], ( + "File info should be blocked for disabled token without auth" + ) + + +@pytest.mark.asyncio +async def test_get_file_info_allowed_for_disabled_token_with_admin_key(): + """Test that admin can get file info from disabled tokens.""" + transport = ASGITransport(app=app) + async with AsyncClient(transport=transport, base_url="http://testserver") as client: + # Create token and upload a file + token_data = await create_token(client, max_uploads=1) + upload_token = token_data["token"] + download_token = token_data["download_token"] + + upload_data = await initiate_upload(client, upload_token, "test.txt", 12) + upload_id = upload_data["upload_id"] + await upload_file_via_tus(client, upload_id, b"test content") + + # Disable the token + await client.patch( + app.url_path_for("update_token", token_value=upload_token), + json={"disabled": True}, + headers={"Authorization": f"Bearer {settings.admin_api_key}"}, + ) + + # File info should work with admin key + info_url = app.url_path_for("get_file_info", download_token=download_token, upload_id=upload_id) + response = await client.get(info_url, headers={"Authorization": f"Bearer {settings.admin_api_key}"}) + assert response.status_code == status.HTTP_200_OK, "Admin should be able to get file info from disabled token" + data = response.json() + assert data["filename"] == "test.txt", "File info should be returned" diff --git a/backend/tests/test_token_status.py b/backend/tests/test_token_status.py new file mode 100644 index 0000000..42081c8 --- /dev/null +++ b/backend/tests/test_token_status.py @@ -0,0 +1,112 @@ +"""Test token status handling - disabled and expired tokens.""" + +import pytest +from datetime import UTC, datetime, timedelta +from httpx import ASGITransport, AsyncClient +from fastapi import status + +from backend.app.main import app +from backend.app.config import settings +from backend.tests.utils import create_token + + +@pytest.mark.asyncio +async def test_disabled_token_can_be_viewed_but_not_used(): + """Test that disabled tokens return info via get_token but cannot upload.""" + transport = ASGITransport(app=app) + async with AsyncClient(transport=transport, base_url="http://testserver") as client: + # Create and disable a token + token_data = await create_token(client, max_uploads=2) + upload_token = token_data["token"] + + # Disable the token + await client.patch( + app.url_path_for("update_token", token_value=upload_token), + json={"disabled": True}, + headers={"Authorization": f"Bearer {settings.admin_api_key}"}, + ) + + # get_token should still work and return the token info + response = await client.get(app.url_path_for("get_token", token_value=upload_token)) + assert response.status_code == status.HTTP_200_OK, "get_token should work for disabled tokens" + data = response.json() + assert data["disabled"] is True, "Token should be marked as disabled" + assert data["token"] == upload_token, "Should return token info" + + # But uploads should be blocked + upload_resp = await client.post( + app.url_path_for("initiate_upload"), + params={"token": upload_token}, + json={ + "filename": "test.mp4", + "size_bytes": 1000, + "mimetype": "video/mp4", + "meta_data": {}, + }, + ) + assert upload_resp.status_code == status.HTTP_403_FORBIDDEN, "Disabled token should not allow uploads" + + +@pytest.mark.asyncio +async def test_expired_token_can_be_viewed_but_not_used(): + """Test that expired tokens return info via get_token but cannot upload.""" + transport = ASGITransport(app=app) + async with AsyncClient(transport=transport, base_url="http://testserver") as client: + # Create a token that expires in the past + expired_time = datetime.now(UTC) - timedelta(hours=1) + token_data = await create_token(client, max_uploads=2, expiry_datetime=expired_time.isoformat()) + upload_token = token_data["token"] + + # get_token should still work and return the token info + response = await client.get(app.url_path_for("get_token", token_value=upload_token)) + assert response.status_code == status.HTTP_200_OK, "get_token should work for expired tokens" + data = response.json() + assert data["token"] == upload_token, "Should return token info" + + # Verify it's expired by checking the timestamp + expires_at_str = data["expires_at"] + if expires_at_str.endswith("Z"): + expires_at_str = expires_at_str[:-1] + "+00:00" + expires_at = datetime.fromisoformat(expires_at_str) + if expires_at.tzinfo is None: + expires_at = expires_at.replace(tzinfo=UTC) + now = datetime.now(UTC) + assert expires_at < now, f"Token should be expired: {expires_at} < {now}" + + # But uploads should be blocked + upload_resp = await client.post( + app.url_path_for("initiate_upload"), + params={"token": upload_token}, + json={ + "filename": "test.mp4", + "size_bytes": 1000, + "mimetype": "video/mp4", + "meta_data": {}, + }, + ) + assert upload_resp.status_code == status.HTTP_403_FORBIDDEN, "Expired token should not allow uploads" + + +@pytest.mark.asyncio +async def test_download_token_view_works_for_disabled(): + """Test that share page (download token view) works for disabled tokens.""" + transport = ASGITransport(app=app) + async with AsyncClient(transport=transport, base_url="http://testserver") as client: + # Create and disable a token + token_data = await create_token(client, max_uploads=2) + upload_token = token_data["token"] + download_token = token_data["download_token"] + + # Disable the token + await client.patch( + app.url_path_for("update_token", token_value=upload_token), + json={"disabled": True}, + headers={"Authorization": f"Bearer {settings.admin_api_key}"}, + ) + + # Access with download token should still work + response = await client.get(app.url_path_for("get_token", token_value=download_token)) + assert response.status_code == status.HTTP_200_OK, "Share view should work for disabled tokens" + data = response.json() + assert data["disabled"] is True, "Token should be marked as disabled" + assert data["download_token"] == download_token, "Should return download token" diff --git a/backend/tests/test_tus_token_validation.py b/backend/tests/test_tus_token_validation.py new file mode 100644 index 0000000..12c827f --- /dev/null +++ b/backend/tests/test_tus_token_validation.py @@ -0,0 +1,258 @@ +"""Tests to verify TUS protocol endpoints enforce token expiry/disabled validation.""" + +from datetime import UTC, datetime, timedelta + +import pytest +from fastapi import status +from httpx import ASGITransport, AsyncClient + +from backend.app.main import app +from backend.tests.utils import create_token, initiate_upload, tus_head + + +@pytest.mark.asyncio +async def test_tus_head_blocked_for_expired_token(): + """TUS HEAD should fail when token is expired.""" + from backend.app.db import get_db + from backend.app.models import UploadToken + from sqlalchemy import select, update + + transport = ASGITransport(app=app) + async with AsyncClient(transport=transport, base_url="http://testserver") as client: + # Create valid token and initiate upload + token_data = await create_token(client) + token_value = token_data["token"] + + upload_data = await initiate_upload( + client, + token_value, + filename="test.txt", + filetype="text/plain", + size_bytes=100, + ) + upload_id = upload_data["upload_id"] + + # Manually expire the token in database + async for db in get_db(): + expired_time = datetime.now(UTC) - timedelta(hours=1) + stmt = update(UploadToken).where(UploadToken.token == token_value).values(expires_at=expired_time) + await db.execute(stmt) + await db.commit() + break + + # TUS HEAD should fail for expired token + status_code, headers = await tus_head(client, upload_id) + assert status_code == status.HTTP_403_FORBIDDEN, "HEAD should fail with expired token" + + +@pytest.mark.asyncio +async def test_tus_head_blocked_for_disabled_token(): + """TUS HEAD should fail when token is disabled.""" + transport = ASGITransport(app=app) + async with AsyncClient(transport=transport, base_url="http://testserver") as client: + # Create token + token_data = await create_token(client) + token_value = token_data["token"] + + # Initiate upload + upload_data = await initiate_upload( + client, + token_value, + filename="test.txt", + filetype="text/plain", + size_bytes=100, + ) + upload_id = upload_data["upload_id"] + + # Disable the token + admin_key = "test-admin" + disable_response = await client.patch( + f"/api/tokens/{token_value}", + json={"disabled": True}, + headers={"Authorization": f"Bearer {admin_key}"}, + ) + assert disable_response.status_code == status.HTTP_200_OK, "Token should be disabled successfully" + + # TUS HEAD should now fail + status_code, headers = await tus_head(client, upload_id) + assert status_code == status.HTTP_403_FORBIDDEN, "HEAD should fail with disabled token" + + +@pytest.mark.asyncio +async def test_tus_patch_blocked_for_expired_token(): + """TUS PATCH should fail when token is expired.""" + from backend.app.db import get_db + from backend.app.models import UploadToken + from sqlalchemy import select, update + + transport = ASGITransport(app=app) + async with AsyncClient(transport=transport, base_url="http://testserver") as client: + # Create valid token and initiate upload + token_data = await create_token(client) + token_value = token_data["token"] + + upload_data = await initiate_upload( + client, + token_value, + filename="test.txt", + filetype="text/plain", + size_bytes=11, + ) + upload_id = upload_data["upload_id"] + + # Manually expire the token in database + async for db in get_db(): + expired_time = datetime.now(UTC) - timedelta(hours=1) + stmt = update(UploadToken).where(UploadToken.token == token_value).values(expires_at=expired_time) + await db.execute(stmt) + await db.commit() + break + + # Try to upload data via PATCH + patch_response = await client.patch( + f"/api/uploads/{upload_id}/tus", + content=b"Hello World", + headers={ + "Upload-Offset": "0", + "Content-Type": "application/offset+octet-stream", + "Content-Length": "11", + }, + ) + assert patch_response.status_code == status.HTTP_403_FORBIDDEN, "PATCH should fail with expired token" + + +@pytest.mark.asyncio +async def test_tus_patch_blocked_for_disabled_token(): + """TUS PATCH should fail when token is disabled.""" + transport = ASGITransport(app=app) + async with AsyncClient(transport=transport, base_url="http://testserver") as client: + # Create token + token_data = await create_token(client) + token_value = token_data["token"] + + # Initiate upload + upload_data = await initiate_upload( + client, + token_value, + filename="test.txt", + filetype="text/plain", + size_bytes=11, + ) + upload_id = upload_data["upload_id"] + + # Disable the token + admin_key = "test-admin" + disable_response = await client.patch( + f"/api/tokens/{token_value}", + json={"disabled": True}, + headers={"Authorization": f"Bearer {admin_key}"}, + ) + assert disable_response.status_code == status.HTTP_200_OK, "Token should be disabled successfully" + + # Try to upload data via PATCH + patch_response = await client.patch( + f"/api/uploads/{upload_id}/tus", + content=b"Hello World", + headers={ + "Upload-Offset": "0", + "Content-Type": "application/offset+octet-stream", + "Content-Length": "11", + }, + ) + assert patch_response.status_code == status.HTTP_403_FORBIDDEN, "PATCH should fail with disabled token" + + +@pytest.mark.asyncio +async def test_tus_delete_blocked_for_expired_token(): + """TUS DELETE should fail when token is expired.""" + from backend.app.db import get_db + from backend.app.models import UploadToken + from sqlalchemy import select, update + + transport = ASGITransport(app=app) + async with AsyncClient(transport=transport, base_url="http://testserver") as client: + # Create valid token and initiate upload + token_data = await create_token(client) + token_value = token_data["token"] + + upload_data = await initiate_upload( + client, + token_value, + filename="test.txt", + filetype="text/plain", + size_bytes=100, + ) + upload_id = upload_data["upload_id"] + + # Manually expire the token in database + async for db in get_db(): + expired_time = datetime.now(UTC) - timedelta(hours=1) + stmt = update(UploadToken).where(UploadToken.token == token_value).values(expires_at=expired_time) + await db.execute(stmt) + await db.commit() + break + + # Try to delete via TUS DELETE + delete_response = await client.delete(f"/api/uploads/{upload_id}/tus") + assert delete_response.status_code == status.HTTP_403_FORBIDDEN, "DELETE should fail with expired token" + + +@pytest.mark.asyncio +async def test_tus_delete_blocked_for_disabled_token(): + """TUS DELETE should fail when token is disabled.""" + transport = ASGITransport(app=app) + async with AsyncClient(transport=transport, base_url="http://testserver") as client: + # Create token + token_data = await create_token(client) + token_value = token_data["token"] + + # Initiate upload + upload_data = await initiate_upload( + client, + token_value, + filename="test.txt", + filetype="text/plain", + size_bytes=100, + ) + upload_id = upload_data["upload_id"] + + # Disable the token + admin_key = "test-admin" + disable_response = await client.patch( + f"/api/tokens/{token_value}", + json={"disabled": True}, + headers={"Authorization": f"Bearer {admin_key}"}, + ) + assert disable_response.status_code == status.HTTP_200_OK, "Token should be disabled successfully" + + # Try to delete via TUS DELETE + delete_response = await client.delete(f"/api/uploads/{upload_id}/tus") + assert delete_response.status_code == status.HTTP_403_FORBIDDEN, "DELETE should fail with disabled token" + + +@pytest.mark.asyncio +async def test_tus_delete_works_with_valid_token(): + """TUS DELETE should work when token is valid.""" + transport = ASGITransport(app=app) + async with AsyncClient(transport=transport, base_url="http://testserver") as client: + # Create token + token_data = await create_token(client) + token_value = token_data["token"] + + # Initiate upload + upload_data = await initiate_upload( + client, + token_value, + filename="test.txt", + filetype="text/plain", + size_bytes=100, + ) + upload_id = upload_data["upload_id"] + + # Delete should work with valid token + delete_response = await client.delete(f"/api/uploads/{upload_id}/tus") + assert delete_response.status_code == status.HTTP_204_NO_CONTENT, "DELETE should work with valid token" + + # Verify upload is gone + status_code, headers = await tus_head(client, upload_id) + assert status_code == status.HTTP_404_NOT_FOUND, "Upload should be deleted" diff --git a/frontend/app/components/AdminTokenForm.vue b/frontend/app/components/AdminTokenForm.vue index 71fd0cb..f20afec 100644 --- a/frontend/app/components/AdminTokenForm.vue +++ b/frontend/app/components/AdminTokenForm.vue @@ -129,7 +129,13 @@ function handleSubmit() { }; if (state.expiry) { - payload.expiry_datetime = new Date(state.expiry).toISOString(); + const localDate = new Date(state.expiry); + const tzOffset = -localDate.getTimezoneOffset(); + const offsetHours = Math.floor(Math.abs(tzOffset) / 60).toString().padStart(2, '0'); + const offsetMins = (Math.abs(tzOffset) % 60).toString().padStart(2, '0'); + const offsetSign = tzOffset >= 0 ? '+' : '-'; + const isoWithTz = `${state.expiry}:00${offsetSign}${offsetHours}:${offsetMins}`; + payload.expiry_datetime = isoWithTz; } if (props.mode === "edit") { diff --git a/frontend/app/composables/useTokenInfo.ts b/frontend/app/composables/useTokenInfo.ts index ac38c80..e231782 100644 --- a/frontend/app/composables/useTokenInfo.ts +++ b/frontend/app/composables/useTokenInfo.ts @@ -5,6 +5,8 @@ export function useTokenInfo(tokenValue: Ref) { const tokenInfo = ref(null) const notFound = ref(false) const tokenError = ref('') + const isExpired = ref(false) + const isDisabled = ref(false) const shareLinkText = computed(() => { if (!tokenInfo.value) return '' @@ -17,10 +19,23 @@ export function useTokenInfo(tokenValue: Ref) { return } tokenError.value = '' + isExpired.value = false + isDisabled.value = false try { - const data = await $fetch('/api/tokens/' + tokenValue.value) + const { $apiFetch } = useNuxtApp() + const data = await $apiFetch('/api/tokens/' + tokenValue.value) tokenInfo.value = data as any notFound.value = false + + // Check token status based on returned data + if (tokenInfo.value) { + const now = new Date() + if (tokenInfo.value.expires_at) { + const expiresAt = new Date(tokenInfo.value.expires_at) + isExpired.value = expiresAt < now + } + isDisabled.value = tokenInfo.value.disabled || false + } } catch (err: any) { tokenInfo.value = null notFound.value = true @@ -28,5 +43,6 @@ export function useTokenInfo(tokenValue: Ref) { } } - return { tokenInfo, notFound, tokenError, shareLinkText, fetchTokenInfo } + return { tokenInfo, notFound, tokenError, isExpired, isDisabled, shareLinkText, fetchTokenInfo } } + diff --git a/frontend/app/pages/f/[token].vue b/frontend/app/pages/f/[token].vue index c1f48c6..d719c86 100644 --- a/frontend/app/pages/f/[token].vue +++ b/frontend/app/pages/f/[token].vue @@ -1,11 +1,19 @@