Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion videodb/__about__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@



__version__ = "0.2.17"
__version__ = "0.3.0"
__title__ = "videodb"
__author__ = "videodb"
__email__ = "contact@videodb.io"
Expand Down
6 changes: 6 additions & 0 deletions videodb/_constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,11 @@ class Workflows:
add_subtitles = "add_subtitles"


class ReframeMode:
    """String constants naming the available reframing modes."""

    simple = "simple"
    smart = "smart"


class SemanticSearchDefaultValues:
result_threshold = 5
score_threshold = 0.2
Expand Down Expand Up @@ -85,6 +90,7 @@ class ApiPath:
meeting = "meeting"
record = "record"
editor = "editor"
reframe = "reframe"


class Status:
Expand Down
97 changes: 97 additions & 0 deletions videodb/audio.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
from typing import Dict, List, Union
from videodb._constants import (
ApiPath,
Segmenter,
)


Expand All @@ -10,6 +12,8 @@ class Audio:
:ivar str collection_id: ID of the collection this audio belongs to
:ivar str name: Name of the audio file
:ivar float length: Duration of the audio in seconds
:ivar list transcript: Timestamped transcript segments
:ivar str transcript_text: Full transcript text
"""

def __init__(
Expand All @@ -20,6 +24,8 @@ def __init__(
self.collection_id = collection_id
self.name = kwargs.get("name", None)
self.length = kwargs.get("length", None)
self.transcript = kwargs.get("transcript", None)
self.transcript_text = kwargs.get("transcript_text", None)

def __repr__(self) -> str:
return (
Expand All @@ -43,6 +49,97 @@ def generate_url(self) -> str:
)
return url_data.get("signed_url", None)

def _fetch_transcript(
    self,
    start: int = None,
    end: int = None,
    segmenter: str = Segmenter.word,
    length: int = 1,
    force: bool = None,
) -> None:
    """Fetch the transcript from the API and cache it on the instance.

    The response's ``word_timestamps`` field is stored in ``self.transcript``
    and its ``text`` field in ``self.transcript_text``. The cached copy is
    reused only for an unranged request (no ``start``/``end``) whose
    ``segmenter`` and ``length`` match the request that filled the cache;
    ranged requests and ``force`` always hit the API.

    :param int start: Start time in seconds
    :param int end: End time in seconds
    :param str segmenter: Segmentation type sent to the API
    :param int length: Segment length sent to the API
    :param bool force: Bypass the cached transcript and fetch again
    :return: None
    :rtype: None
    """
    # Falsy start/end (None or 0) both count as "no range".
    is_full_request = not start and not end
    # Tag describing what the cache holds; None marks a partial (ranged)
    # transcript that must never satisfy a later full request.
    requested_tag = (segmenter, length) if is_full_request else None
    # NOTE(review): a transcript injected through the constructor has no tag;
    # it is assumed to be the full, default-segmented one - confirm.
    cached_tag = getattr(self, "_transcript_cache_tag", (Segmenter.word, 1))
    if (
        self.transcript
        and not force
        and is_full_request
        and cached_tag == requested_tag
    ):
        return
    transcript_data = self._connection.get(
        path=f"{ApiPath.audio}/{self.id}/{ApiPath.transcription}",
        params={
            "start": start,
            "end": end,
            "segmenter": segmenter,
            "length": length,
            "force": "true" if force else "false",
        },
        show_progress=True,
    )
    self.transcript = transcript_data.get("word_timestamps", [])
    self.transcript_text = transcript_data.get("text", "")
    self._transcript_cache_tag = requested_tag

def get_transcript(
    self,
    start: int = None,
    end: int = None,
    segmenter: Segmenter = Segmenter.word,
    length: int = 1,
    force: bool = None,
) -> List[Dict[str, Union[float, str]]]:
    """Return the timestamped transcript segments of the audio.

    Delegates to ``_fetch_transcript`` with the given options and then
    returns whatever is stored in ``self.transcript``.

    :param int start: Start time in seconds
    :param int end: End time in seconds
    :param Segmenter segmenter: Segmentation type (:class:`Segmenter.word`,
        :class:`Segmenter.sentence`, :class:`Segmenter.time`)
    :param int length: Length of segments when using time segmenter
    :param bool force: Force fetch new transcript
    :return: List of dicts with keys: start (float), end (float), text (str)
    :rtype: List[Dict[str, Union[float, str]]]
    """
    fetch_options = {
        "start": start,
        "end": end,
        "segmenter": segmenter,
        "length": length,
        "force": force,
    }
    self._fetch_transcript(**fetch_options)
    return self.transcript

def get_transcript_text(
    self,
    start: int = None,
    end: int = None,
) -> str:
    """Get plain text transcript for the audio.

    Calls ``_fetch_transcript`` with the given range and returns the text
    stored in ``self.transcript_text``. This method takes no ``force``
    argument.

    :param int start: Start time in seconds to get transcript from
    :param int end: End time in seconds to get transcript until
    :return: Full transcript text as string
    :rtype: str
    """
    self._fetch_transcript(start=start, end=end)
    return self.transcript_text

def generate_transcript(
    self,
    force: bool = None,
    language_code: str = None,
) -> dict:
    """Request transcript generation for the audio.

    Posts to the audio's transcription endpoint. When the response carries
    non-empty ``word_timestamps`` a short success dict is returned;
    otherwise the raw response is returned unchanged.

    :param bool force: Force generate new transcript
    :param str language_code: Language code of the spoken audio. If not provided, language is automatically detected.
    :return: Success dict if transcript generated or already exists
    :rtype: dict
    """
    payload = {
        "force": bool(force),
        "language_code": language_code,
    }
    response = self._connection.post(
        path=f"{ApiPath.audio}/{self.id}/{ApiPath.transcription}",
        data=payload,
    )
    # No word timestamps in the response: hand the response back as-is.
    if not response.get("word_timestamps", []):
        return response
    return {
        "success": True,
        "message": "Transcript generated successfully",
    }

def delete(self) -> None:
"""Delete the audio.

Expand Down
3 changes: 2 additions & 1 deletion videodb/editor.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,16 @@ class AssetType(str, Enum):

class Fit(str, Enum):
    """Set how the asset should be scaled to fit the viewport using one of the following options:

    crop (default) - scale the asset to fill the viewport while maintaining the aspect ratio. The asset will be cropped if it exceeds the bounds of the viewport.
    cover - stretch the asset to fill the viewport without maintaining the aspect ratio.
    contain - fit the entire asset within the viewport while maintaining the original aspect ratio.
    none - preserves the original asset dimensions and does not apply any scaling."""

    crop = "crop"
    cover = "cover"
    contain = "contain"
    # NOTE(review): because of the ``str`` mixin, ``None`` is converted by
    # ``str()`` and this member compares equal to the string "None", not to
    # the ``None`` object - confirm this is what the API expects.
    none = None


class Position(str, Enum):
Expand Down
82 changes: 76 additions & 6 deletions videodb/video.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
from videodb._constants import (
ApiPath,
IndexType,
ReframeMode,
SceneExtractionType,
SearchType,
Segmenter,
Expand Down Expand Up @@ -232,9 +233,6 @@ def get_transcript_text(
self,
start: int = None,
end: int = None,
segmenter: str = Segmenter.word,
length: int = 1,
force: bool = None,
) -> str:
"""Get plain text transcript for the video.

Expand All @@ -244,9 +242,7 @@ def get_transcript_text(
:return: Full transcript text as string
:rtype: str
"""
self._fetch_transcript(
start=start, end=end, segmenter=segmenter, length=length, force=force
)
self._fetch_transcript(start=start, end=end)
return self.transcript_text

def generate_transcript(
Expand Down Expand Up @@ -654,3 +650,77 @@ def get_meeting(self):
**meeting_data,
)
return None

def reframe(
    self,
    start: Optional[float] = None,
    end: Optional[float] = None,
    target: Union[str, Dict[str, int]] = "vertical",
    mode: str = ReframeMode.smart,
    callback_url: Optional[str] = None,
) -> Optional["Video"]:
    """Request a reframed copy of the video in a new aspect ratio.

    Posts the options to the video's reframe endpoint. When a
    ``callback_url`` is supplied, or the response is empty, ``None`` is
    returned; otherwise a new :class:`Video` is built from the response.

    :param float start: Start time in seconds (optional)
    :param float end: End time in seconds (optional)
    :param Union[str, dict] target: Target format - preset string (e.g., "vertical", "square", "landscape") or {"width": int, "height": int}
    :param str mode: Reframing mode - "simple" or "smart" (default: "smart")
    :param str callback_url: URL to receive callback when processing completes (optional)
    :raises InvalidRequestError: If the reframe request fails
    :return: :class:`Video <Video>` object if no callback_url, None otherwise
    :rtype: Optional[:class:`videodb.video.Video`]
    """
    payload = {
        "start": start,
        "end": end,
        "target": target,
        "mode": mode,
        "callback_url": callback_url,
    }
    response = self._connection.post(
        path=f"{ApiPath.video}/{self.id}/{ApiPath.reframe}",
        data=payload,
    )

    # With a callback the result is not returned from this call.
    if callback_url or not response:
        return None
    return Video(self._connection, **response)

def smart_vertical_reframe(
    self,
    start: Optional[float] = None,
    end: Optional[float] = None,
    callback_url: Optional[str] = None,
) -> Optional["Video"]:
    """Shortcut for a vertical reframe in smart mode.

    Forwards to :meth:`reframe` with ``target="vertical"`` and
    ``mode=ReframeMode.smart``; the remaining arguments are passed through.

    :param float start: Start time in seconds (optional)
    :param float end: End time in seconds (optional)
    :param str callback_url: URL to receive callback when processing completes (optional)
    :return: :class:`Video <Video>` object if no callback_url, None otherwise
    :rtype: Optional[:class:`videodb.video.Video`]
    """
    fixed_options = {"target": "vertical", "mode": ReframeMode.smart}
    return self.reframe(
        start=start,
        end=end,
        callback_url=callback_url,
        **fixed_options,
    )

def download(self, name: Optional[str] = None) -> dict:
    """Download the video from its stream URL.

    The file name falls back from ``name`` to ``self.name`` and finally to
    ``video_<id>``.

    :param str name: Name for the downloaded file (optional, defaults to video name)
    :raises ValueError: If the video has no ``stream_url``
    :raises InvalidRequestError: If the download request fails
    :return: Download response data
    :rtype: dict
    """
    if not self.stream_url:
        raise ValueError("Video does not have a stream_url")

    download_name = name or self.name or f"video_{self.id}"
    return self._connection.download(self.stream_url, download_name)