diff --git a/videodb/__about__.py b/videodb/__about__.py
index 407daa2..89bbe9e 100644
--- a/videodb/__about__.py
+++ b/videodb/__about__.py
@@ -2,7 +2,7 @@
-__version__ = "0.2.17"
+__version__ = "0.3.0"
 __title__ = "videodb"
 __author__ = "videodb"
 __email__ = "contact@videodb.io"
diff --git a/videodb/_constants.py b/videodb/_constants.py
index e62a70d..0654846 100644
--- a/videodb/_constants.py
+++ b/videodb/_constants.py
@@ -33,6 +33,11 @@ class Workflows:
     add_subtitles = "add_subtitles"
 
 
+class ReframeMode:
+    simple = "simple"
+    smart = "smart"
+
+
 class SemanticSearchDefaultValues:
     result_threshold = 5
     score_threshold = 0.2
@@ -85,6 +90,7 @@ class ApiPath:
     meeting = "meeting"
     record = "record"
     editor = "editor"
+    reframe = "reframe"
 
 
 class Status:
diff --git a/videodb/audio.py b/videodb/audio.py
index 21c250c..0e29fbe 100644
--- a/videodb/audio.py
+++ b/videodb/audio.py
@@ -1,5 +1,7 @@
+from typing import Dict, List, Union
 from videodb._constants import (
     ApiPath,
+    Segmenter,
 )
 
 
@@ -10,6 +12,8 @@ class Audio:
     :ivar str collection_id: ID of the collection this audio belongs to
     :ivar str name: Name of the audio file
     :ivar float length: Duration of the audio in seconds
+    :ivar list transcript: Timestamped transcript segments
+    :ivar str transcript_text: Full transcript text
     """
 
     def __init__(
@@ -20,6 +24,8 @@ def __init__(
         self.collection_id = collection_id
         self.name = kwargs.get("name", None)
         self.length = kwargs.get("length", None)
+        self.transcript = kwargs.get("transcript", None)
+        self.transcript_text = kwargs.get("transcript_text", None)
 
     def __repr__(self) -> str:
         return (
@@ -43,6 +49,97 @@ def generate_url(self) -> str:
         )
         return url_data.get("signed_url", None)
 
+    def _fetch_transcript(
+        self,
+        start: int = None,
+        end: int = None,
+        segmenter: str = Segmenter.word,
+        length: int = 1,
+        force: bool = None,
+    ) -> None:
+        if self.transcript and not force and not start and not end:
+            return
+        transcript_data = self._connection.get(
+            path=f"{ApiPath.audio}/{self.id}/{ApiPath.transcription}",
+            params={
+                "start": start,
+                "end": end,
+                "segmenter": segmenter,
+                "length": length,
+                "force": "true" if force else "false",
+            },
+            show_progress=True,
+        )
+        self.transcript = transcript_data.get("word_timestamps", [])
+        self.transcript_text = transcript_data.get("text", "")
+
+    def get_transcript(
+        self,
+        start: int = None,
+        end: int = None,
+        segmenter: Segmenter = Segmenter.word,
+        length: int = 1,
+        force: bool = None,
+    ) -> List[Dict[str, Union[float, str]]]:
+        """Get timestamped transcript segments for the audio.
+
+        :param int start: Start time in seconds
+        :param int end: End time in seconds
+        :param Segmenter segmenter: Segmentation type (:class:`Segmenter.word`,
+            :class:`Segmenter.sentence`, :class:`Segmenter.time`)
+        :param int length: Length of segments when using time segmenter
+        :param bool force: Force fetch new transcript
+        :return: List of dicts with keys: start (float), end (float), text (str)
+        :rtype: List[Dict[str, Union[float, str]]]
+        """
+        self._fetch_transcript(
+            start=start, end=end, segmenter=segmenter, length=length, force=force
+        )
+        return self.transcript
+
+    def get_transcript_text(
+        self,
+        start: int = None,
+        end: int = None,
+    ) -> str:
+        """Get plain text transcript for the audio.
+
+        :param int start: Start time in seconds to get transcript from
+        :param int end: End time in seconds to get transcript until
+        :return: Full transcript text as string
+        :rtype: str
+        """
+        self._fetch_transcript(start=start, end=end)
+        return self.transcript_text
+
+    def generate_transcript(
+        self,
+        force: bool = None,
+        language_code: str = None,
+    ) -> dict:
+        """Generate transcript for the audio.
+
+        :param bool force: Force generate new transcript
+        :param str language_code: Language code of the spoken audio. If not provided, the language is automatically detected.
+        :return: Success dict if the transcript was generated or already exists
+        :rtype: dict
+        """
+        transcript_data = self._connection.post(
+            path=f"{ApiPath.audio}/{self.id}/{ApiPath.transcription}",
+            data={
+                "force": True if force else False,
+                "language_code": language_code,
+            },
+        )
+        transcript = transcript_data.get("word_timestamps", [])
+        if transcript:
+            return {
+                "success": True,
+                "message": "Transcript generated successfully",
+            }
+        return transcript_data
+
     def delete(self) -> None:
         """Delete the audio.
diff --git a/videodb/editor.py b/videodb/editor.py
index 9efc793..4c3ae25 100644
--- a/videodb/editor.py
+++ b/videodb/editor.py
@@ -16,8 +16,8 @@ class AssetType(str, Enum):
 class Fit(str, Enum):
     """Set how the asset should be scaled to fit the viewport using one of the following options:
-    crop (default) - scale the asset to fill the viewport while maintaining the aspect ratio. The asset will be cropped if it exceeds the bounds of the viewport.
+    crop (default) - scale the asset to fill the viewport while maintaining the aspect ratio. The asset will be cropped if it exceeds the bounds of the viewport.
     cover - stretch the asset to fill the viewport without maintaining the aspect ratio.
     contain - fit the entire asset within the viewport while maintaining the original aspect ratio.
     none - preserves the original asset dimensions and does not apply any scaling."""
@@ -25,6 +25,7 @@ class Fit(str, Enum):
     crop = "crop"
     cover = "cover"
     contain = "contain"
+    none = None
 
 
 class Position(str, Enum):
diff --git a/videodb/video.py b/videodb/video.py
index 430e297..34af19d 100644
--- a/videodb/video.py
+++ b/videodb/video.py
@@ -3,6 +3,7 @@ from videodb._constants import (
     ApiPath,
     IndexType,
+    ReframeMode,
     SceneExtractionType,
     SearchType,
     Segmenter,
@@ -232,9 +233,6 @@ def get_transcript_text(
         self,
         start: int = None,
         end: int = None,
-        segmenter: str = Segmenter.word,
-        length: int = 1,
-        force: bool = None,
     ) -> str:
         """Get plain text transcript for the video.
 
         :return: Full transcript text as string
         :rtype: str
         """
-        self._fetch_transcript(
-            start=start, end=end, segmenter=segmenter, length=length, force=force
-        )
+        self._fetch_transcript(start=start, end=end)
         return self.transcript_text
 
     def generate_transcript(
@@ -654,3 +650,77 @@ def get_meeting(self):
                 **meeting_data,
             )
         return None
+
+    def reframe(
+        self,
+        start: Optional[float] = None,
+        end: Optional[float] = None,
+        target: Union[str, Dict[str, int]] = "vertical",
+        mode: str = ReframeMode.smart,
+        callback_url: Optional[str] = None,
+    ) -> Optional["Video"]:
+        """Reframe video to a new aspect ratio with optional object tracking.
+
+        :param float start: Start time in seconds (optional)
+        :param float end: End time in seconds (optional)
+        :param Union[str, dict] target: Target format - preset string (e.g., "vertical", "square", "landscape") or {"width": int, "height": int}
+        :param str mode: Reframing mode - "simple" or "smart" (default: "smart")
+        :param str callback_url: URL to receive callback when processing completes (optional)
+        :raises InvalidRequestError: If the reframe request fails
+        :return: :class:`Video
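
Usage note (not part of the patch): a minimal sketch of the new Audio transcript methods added in videodb/audio.py above. The connect, collection, and upload calls are the pre-existing SDK surface rather than something this diff introduces, and the API key, source URL, and media_type value are placeholder assumptions.

import videodb
from videodb._constants import Segmenter

conn = videodb.connect(api_key="sk-xxxx")  # placeholder API key
coll = conn.get_collection()

# Placeholder source URL; assumes upload() accepts media_type for audio assets.
audio = coll.upload(url="https://example.com/podcast.mp3", media_type="audio")

# Generate (or reuse) the transcript, optionally pinning the spoken language.
audio.generate_transcript(language_code="en")

# Timestamped segments: word-level by default, sentence- or time-based on request.
words = audio.get_transcript()
sentences = audio.get_transcript(segmenter=Segmenter.sentence)
print(words[0])  # e.g. {"start": 0.0, "end": 0.4, "text": "Hello"}

# Plain text, optionally limited to a window in seconds.
intro_text = audio.get_transcript_text(start=0, end=60)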
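
Usage note (not part of the patch): a sketch of the new Video.reframe() call wired through ReframeMode. The API key and video ID are placeholders, the mode semantics are taken from the docstring above, and the return handling simply follows the Optional["Video"] annotation since the method body is truncated in this excerpt.

import videodb
from videodb._constants import ReframeMode

conn = videodb.connect(api_key="sk-xxxx")          # placeholder API key
video = conn.get_collection().get_video("m-xxxx")  # placeholder video ID

# Preset target: reframe a landscape source into a vertical cut, with smart
# mode providing the object tracking described in the docstring.
vertical = video.reframe(target="vertical", mode=ReframeMode.smart)

# Explicit pixel target with a simple crop over a 30-second sub-range.
square = video.reframe(
    start=10,
    end=40,
    target={"width": 1080, "height": 1080},
    mode=ReframeMode.simple,
)

if vertical:
    print(vertical.generate_stream())  # stream URL of the reframed video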