Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
81 changes: 81 additions & 0 deletions backend/app/embed_preview.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
from pathlib import Path

from fastapi import HTTPException, Request, status
from fastapi.templating import Jinja2Templates
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession

from backend.app import models, utils

templates = Jinja2Templates(directory=str(Path(__file__).resolve().parent / "templates"))

EMBED_BOT_SIGNATURES: tuple[str, ...] = (
"discordbot",
"twitterbot",
"slackbot",
"facebookexternalhit",
"whatsapp",
)


def is_embed_bot(user_agent: str | None) -> bool:
user_agent_lower = (user_agent or "").lower()
return any(bot in user_agent_lower for bot in EMBED_BOT_SIGNATURES)


async def get_token(db: AsyncSession, token_value: str) -> models.UploadToken | None:
stmt = select(models.UploadToken).where((models.UploadToken.token == token_value) | (models.UploadToken.download_token == token_value))
result = await db.execute(stmt)
return result.scalar_one_or_none()


async def render_embed_preview(request: Request, db: AsyncSession, token_row: models.UploadToken, user: bool = False):
uploads_stmt = (
select(models.UploadRecord)
.where(models.UploadRecord.token_id == token_row.id, models.UploadRecord.status == "completed")
.order_by(models.UploadRecord.created_at.desc())
)
uploads_result = await db.execute(uploads_stmt)
uploads: list[models.UploadRecord] = list(uploads_result.scalars().all())

media_files = [upload for upload in uploads if upload.mimetype and utils.is_multimedia(upload.mimetype)]
if not media_files:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="No multimedia uploads found")

first_media = media_files[0]
ffprobe_data = first_media.meta_data.get("ffprobe") if isinstance(first_media.meta_data, dict) else None
video_metadata = utils.extract_video_metadata(ffprobe_data)
mime_type = first_media.mimetype or "application/octet-stream"
is_video = mime_type.startswith("video/")
is_audio = mime_type.startswith("audio/")

return templates.TemplateResponse(
request=request,
name="share_preview.html",
context={
"request": request,
"title": first_media.filename or "Shared Media",
"description": f"{len(uploads)} file(s) shared" if len(uploads) > 1 else "Shared file",
"og_type": "video.other" if is_video else "music.song",
"share_url": str(request.url_for("share_page", token=token_row.download_token)),
"media_url": str(request.url_for("download_file", download_token=token_row.download_token, upload_id=first_media.public_id)),
"mime_type": mime_type,
"is_video": is_video,
"is_audio": is_audio,
"width": video_metadata.get("width"),
"height": video_metadata.get("height"),
"duration": video_metadata.get("duration"),
"duration_formatted": utils.format_duration(video_metadata["duration"]) if video_metadata.get("duration") else None,
"file_size": utils.format_file_size(first_media.size_bytes) if first_media.size_bytes else None,
"other_files": [
{
"name": upload.filename or "Unknown",
"size": utils.format_file_size(upload.size_bytes) if upload.size_bytes else "Unknown",
}
for upload in uploads
if upload.public_id != first_media.public_id
],
"is_user": user,
},
status_code=status.HTTP_200_OK,
)
116 changes: 33 additions & 83 deletions backend/app/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
from fastapi.concurrency import run_in_threadpool
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import FileResponse, JSONResponse
from fastapi.templating import Jinja2Templates

from backend.app import version

Expand All @@ -22,13 +21,17 @@


def create_app() -> FastAPI:
from .embed_preview import (
get_token,
is_embed_bot,
render_embed_preview,
)

logging.basicConfig(level=logging.INFO, format="%(levelname)s [%(name)s] %(message)s")

Path(settings.storage_path).mkdir(parents=True, exist_ok=True)
Path(settings.config_path).mkdir(parents=True, exist_ok=True)

templates = Jinja2Templates(directory=str(Path(__file__).parent / "templates"))

@asynccontextmanager
async def lifespan(app: FastAPI):
"""
Expand Down Expand Up @@ -151,94 +154,41 @@ def app_version() -> dict[str, str]:

frontend_dir: Path = Path(settings.frontend_export_path).resolve()

def serve_static():
if frontend_dir.exists():
index_file = frontend_dir / "index.html"
if index_file.exists():
return FileResponse(index_file, status_code=status.HTTP_200_OK)

raise HTTPException(status_code=status.HTTP_404_NOT_FOUND)

@app.get("/e/{token}", name="token_embed")
@app.get("/e/{token}/")
async def token_embed(request: Request, token: str):
"""Render a static embed preview page."""
if not settings.allow_public_downloads:
return serve_static()

from backend.app.db import get_db

async for db in get_db():
if not (token_row := await get_token(db, token)):
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Token not found")

return await render_embed_preview(request, db, token_row, user=True)

@app.get("/f/{token}", name="share_page")
@app.get("/f/{token}/")
async def share_page(token: str, request: Request, user_agent: Annotated[str | None, Header()] = None):
"""Handle /f/{token} with bot detection for embed preview."""
from sqlalchemy import select

from backend.app import models, utils
from backend.app.db import get_db

user_agent_lower: str = (user_agent or "").lower()
is_bot = any(bot in user_agent_lower for bot in ["discordbot", "twitterbot", "slackbot", "facebookexternalhit", "whatsapp"])

if is_bot and settings.allow_public_downloads:
if is_embed_bot(user_agent) and settings.allow_public_downloads:
async for db in get_db():
stmt = select(models.UploadToken).where((models.UploadToken.token == token) | (models.UploadToken.download_token == token))
result = await db.execute(stmt)
token_row = result.scalar_one_or_none()

if token_row:
uploads_stmt = (
select(models.UploadRecord)
.where(models.UploadRecord.token_id == token_row.id, models.UploadRecord.status == "completed")
.order_by(models.UploadRecord.created_at.desc())
)
uploads_result = await db.execute(uploads_stmt)
uploads = uploads_result.scalars().all()

media_files = [u for u in uploads if u.mimetype and utils.is_multimedia(u.mimetype)]

if media_files:
first_media = media_files[0]

is_video = first_media.mimetype.startswith("video/")
ffprobe_data = None
if first_media.meta_data and isinstance(first_media.meta_data, dict):
ffprobe_data = first_media.meta_data.get("ffprobe")

video_metadata = utils.extract_video_metadata(ffprobe_data)

other_files = [
{
"name": u.filename or "Unknown",
"size": utils.format_file_size(u.size_bytes) if u.size_bytes else "Unknown",
}
for u in uploads
if u.public_id != first_media.public_id
]

media_url = str(
request.url_for("download_file", download_token=token_row.download_token, upload_id=first_media.public_id)
)
share_url = str(request.url_for("share_page", token=token))

is_video = first_media.mimetype.startswith("video/")
is_audio = first_media.mimetype.startswith("audio/")

context = {
"request": request,
"title": first_media.filename or "Shared Media",
"description": f"{len(uploads)} file(s) shared" if len(uploads) > 1 else "Shared file",
"og_type": "video.other" if is_video else "music.song",
"share_url": share_url,
"media_url": media_url,
"mime_type": first_media.mimetype,
"is_video": is_video,
"is_audio": is_audio,
"width": video_metadata.get("width"),
"height": video_metadata.get("height"),
"duration": video_metadata.get("duration"),
"duration_formatted": utils.format_duration(video_metadata["duration"])
if video_metadata.get("duration")
else None,
"file_size": utils.format_file_size(first_media.size_bytes) if first_media.size_bytes else None,
"other_files": other_files,
}

return templates.TemplateResponse(
request=request,
name="share_preview.html",
context=context,
status_code=status.HTTP_200_OK,
)
if token_row := await get_token(db, token):
return await render_embed_preview(request, db, token_row)

if frontend_dir.exists():
index_file = frontend_dir / "index.html"
if index_file.exists():
return FileResponse(index_file, status_code=status.HTTP_200_OK)
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND)
return serve_static()

@app.get("/t/{token}", name="upload_page")
@app.get("/t/{token}/")
Expand Down
26 changes: 25 additions & 1 deletion backend/app/templates/share_preview.html
Original file line number Diff line number Diff line change
Expand Up @@ -279,10 +279,33 @@
padding: 1.5rem 0 1rem;
}
}

.fullscreen {
position: fixed;
inset: 0;
width: 100dvw;
height: 100dvh;
object-fit: contain;
background: #000;
max-height: unset !important;
}
</style>
</head>

<body>
{% if is_user %}
{% if is_video %}
<video class="fullscreen" controls autoplay preload="auto" playsinline>
<source src="{{ media_url }}" mime="{{ mime_type }}" />
Your browser does not support the video tag.
</video>
{% elif is_audio %}
<audio class="fullscreen" controls autoplay preload="auto" playsinline>
<source src="{{ media_url }}" mime="{{ mime_type }}" />
Your browser does not support the audio tag.
</audio>
{% endif %}
{% else %}
<div class="container">
<div class="header">
<h1>{{ title }}</h1>
Expand All @@ -299,7 +322,7 @@ <h1>{{ title }}</h1>
Your browser does not support the video tag.
</video>
{% elif is_audio %}
<audio controls preload="metadata">
<audio controls autoplay preload="metadata" playsinline>
<source src="{{ media_url }}" mime="{{ mime_type }}" />
Your browser does not support the audio tag.
</audio>
Expand Down Expand Up @@ -355,6 +378,7 @@ <h2>File Information</h2>
</div>
{% endif %}
</div>
{% endif %}
</body>

</html>
4 changes: 3 additions & 1 deletion backend/app/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -289,7 +289,7 @@ async def ensure_faststart_mp4(
tmp_dir = src.parent
fd, tmp_out = tempfile.mkstemp(
prefix=src.name + ".",
suffix=".faststart.tmp",
suffix=".part",
dir=tmp_dir,
)
os.close(fd)
Expand All @@ -308,6 +308,8 @@ async def ensure_faststart_mp4(
"copy",
"-movflags",
"+faststart",
"-f",
"mp4",
str(tmp_out_path),
]

Expand Down
1 change: 1 addition & 0 deletions backend/tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
os.environ["FBC_ADMIN_API_KEY"] = "test-admin"
os.environ["FBC_SKIP_MIGRATIONS"] = "1"
os.environ["FBC_SKIP_CLEANUP"] = "1"
os.environ["FBC_ALLOW_PUBLIC_DOWNLOADS"] = "0"

ROOT = Path(__file__).resolve().parent.parent.parent
if str(ROOT) not in sys.path:
Expand Down
49 changes: 49 additions & 0 deletions backend/tests/test_share_view.py
Original file line number Diff line number Diff line change
Expand Up @@ -129,3 +129,52 @@ async def test_share_page_bot_preview_with_video(client):
html_content = response.text
assert "og:video" in html_content or "og:type" in html_content, "Should include OpenGraph video metadata"
assert "sample.mp4" in html_content, "Should include video filename"


@pytest.mark.asyncio
async def test_token_embed_page_renders_preview_for_public_token(client):
"""Test that static embed endpoint renders preview HTML for a shared media token."""
with patch("backend.app.security.settings.allow_public_downloads", True):
token_data = await create_token(client, max_uploads=1)
token_value = token_data["token"]
public_token = token_data["download_token"]

video_file = Path(__file__).parent / "fixtures" / "sample.mp4"
file_size = video_file.stat().st_size

from backend.app.main import app

init_resp = await client.post(
app.url_path_for("initiate_upload"),
json={
"filename": "sample.mp4",
"filetype": "video/mp4",
"size_bytes": file_size,
"meta_data": {},
},
params={"token": token_value},
)
assert init_resp.status_code == status.HTTP_201_CREATED, "Upload initiation should succeed"
upload_data = init_resp.json()
upload_id = upload_data["upload_id"]

patch_resp = await client.patch(
app.url_path_for("tus_patch", upload_id=upload_id),
content=video_file.read_bytes(),
headers={
"Content-Type": "application/offset+octet-stream",
"Upload-Offset": "0",
"Content-Length": str(file_size),
},
)
assert patch_resp.status_code == status.HTTP_204_NO_CONTENT, "Video upload should complete"

completed = await wait_for_processing([upload_id], timeout=10.0)
assert completed, "Video processing should complete within timeout"

response = await client.get(app.url_path_for("token_embed", token=public_token))

assert response.status_code == status.HTTP_200_OK, "Embed endpoint should return HTML for valid shared media"
html_content = response.text
assert "og:video" in html_content or "og:type" in html_content, "Embed page should include OpenGraph video metadata"
assert "sample.mp4" in html_content, "Embed page should include video filename"
Loading