diff --git a/.env.example b/.env.example
index 83c8fc8..b855037 100644
--- a/.env.example
+++ b/.env.example
@@ -16,6 +16,9 @@ DAILY_STATS_MESSAGE_ID=24
BOTSTAT=abcdefg12345
MONETAG_URL=https://example.com/your-monetag-link/
+# Logging settings (optional)
+# LOG_LEVEL=INFO # Options: DEBUG, INFO, WARNING, ERROR, CRITICAL
+
# yt-dlp settings (optional)
# YTDLP_COOKIES=cookies.txt
diff --git a/.gitignore b/.gitignore
index d76896c..f0d4b46 100644
--- a/.gitignore
+++ b/.gitignore
@@ -9,6 +9,7 @@ config.ini
sqlite*.db
LICENSE.md
cookies.txt
+proxies.txt
#extensions
*.log
diff --git a/data/config.py b/data/config.py
index 8443bfa..cbbcdc8 100644
--- a/data/config.py
+++ b/data/config.py
@@ -1,5 +1,6 @@
from __future__ import annotations
+import logging
import os
from json import loads as json_loads
from pathlib import Path
@@ -44,6 +45,21 @@ def _parse_json_list(key: str) -> list[int]:
return []
+def _parse_log_level(key: str, default: str = "INFO") -> int:
+ """Parse an environment variable as a logging level, returning default if unset/invalid."""
+ level_map = {
+ "DEBUG": logging.DEBUG,
+ "INFO": logging.INFO,
+ "WARNING": logging.WARNING,
+ "ERROR": logging.ERROR,
+ "CRITICAL": logging.CRITICAL,
+ }
+
+ default_level = level_map.get(default.upper().strip(), logging.INFO)
+ value = os.getenv(key, default).upper().strip()
+ return level_map.get(value, default_level)
+
+
class BotConfig(TypedDict):
"""Type definition for bot configuration."""
@@ -103,6 +119,18 @@ class PerformanceConfig(TypedDict):
max_video_duration: int # Maximum video duration in seconds (0 = no limit)
+class TikTokConfig(TypedDict):
+ """Type definition for TikTok extraction configuration."""
+
+ cookies_file: str # Path to Netscape-format cookies file (optional)
+
+
+class LoggingConfig(TypedDict):
+ """Type definition for logging configuration."""
+
+ log_level: int # Logging level (e.g., logging.INFO, logging.DEBUG)
+
+
class Config(TypedDict):
"""Type definition for the main configuration."""
@@ -112,6 +140,8 @@ class Config(TypedDict):
queue: QueueConfig
proxy: ProxyConfig
performance: PerformanceConfig
+ tiktok: TikTokConfig
+ logging: LoggingConfig
config: Config = {
@@ -159,6 +189,12 @@ class Config(TypedDict):
),
"max_video_duration": _parse_int_env("MAX_VIDEO_DURATION", 1800), # 30 minutes
},
+ "tiktok": {
+ "cookies_file": os.getenv("YTDLP_COOKIES", ""),
+ },
+ "logging": {
+ "log_level": _parse_log_level("LOG_LEVEL", "INFO"),
+ },
}
admin_ids: list[int] = config["bot"]["admin_ids"]
diff --git a/data/loader.py b/data/loader.py
index 3d3eb39..c6932ef 100644
--- a/data/loader.py
+++ b/data/loader.py
@@ -11,21 +11,32 @@
from data.config import config
from data.database import init_db, initialize_database_components
-logging.basicConfig(level=logging.INFO, format="%(asctime)s [%(levelname)-5.5s] %(message)s",
- handlers=[
- # logging.FileHandler("bot.log"),
- logging.StreamHandler()
- ])
-logging.getLogger('apscheduler.executors.default').setLevel(logging.WARNING)
-logging.getLogger('apscheduler.scheduler').propagate = False
-logging.getLogger('aiogram').setLevel(logging.WARNING)
-
-local_server = AiohttpSession(api=TelegramAPIServer.from_base(config["bot"]["tg_server"]))
-bot = Bot(token=config["bot"]["token"], session=local_server, default=DefaultBotProperties(parse_mode=ParseMode.HTML))
+logging.basicConfig(
+ level=config["logging"]["log_level"],
+ format="%(asctime)s [%(levelname)-5.5s] %(message)s",
+ handlers=[
+ # logging.FileHandler("bot.log"),
+ logging.StreamHandler()
+ ],
+)
+logging.getLogger("apscheduler.executors.default").setLevel(logging.WARNING)
+logging.getLogger("apscheduler.scheduler").propagate = False
+logging.getLogger("aiogram").setLevel(logging.WARNING)
+
+local_server = AiohttpSession(
+ api=TelegramAPIServer.from_base(config["bot"]["tg_server"])
+)
+bot = Bot(
+ token=config["bot"]["token"],
+ session=local_server,
+ default=DefaultBotProperties(parse_mode=ParseMode.HTML),
+)
dp = Dispatcher(storage=MemoryStorage())
-scheduler = AsyncIOScheduler(timezone="America/Los_Angeles", job_defaults={"coalesce": True})
+scheduler = AsyncIOScheduler(
+ timezone="America/Los_Angeles", job_defaults={"coalesce": True}
+)
async def setup_db(db_url: str):
diff --git a/data/locale/ar.json b/data/locale/ar.json
index c63610e..4da392b 100644
--- a/data/locale/ar.json
+++ b/data/locale/ar.json
@@ -38,6 +38,7 @@
"error_deleted": "تم حذف هذا الفيديو من قبل المنشئ.",
"error_private": "هذا الفيديو خاص ولا يمكن الوصول إليه.",
"error_region": "هذا الفيديو غير متوفر في منطقتك.",
+ "error_invalid_link": "رابط الفيديو غير صالح.\nيرجى التحقق من الرابط والمحاولة مرة أخرى.",
"error_network": "حدث خطأ في الشبكة.\nيرجى المحاولة مرة أخرى.",
"error_rate_limit": "طلبات كثيرة جدًا.\nيرجى الانتظار قليلاً والمحاولة مرة أخرى.",
"error_too_long": "هذا الفيديو طويل جدًا.\nالحد الأقصى المسموح به هو 30 دقيقة.",
diff --git a/data/locale/en.json b/data/locale/en.json
index ec370c4..6f17478 100644
--- a/data/locale/en.json
+++ b/data/locale/en.json
@@ -38,6 +38,7 @@
"error_deleted": "This video has been deleted by the creator.",
"error_private": "This video is private and cannot be accessed.",
"error_region": "This video is not available in your region.",
+ "error_invalid_link": "Invalid video link.\nPlease check the link and try again.",
"error_network": "Network error occurred.\nPlease try again.",
"error_rate_limit": "Too many requests.\nPlease wait a moment and try again.",
"error_too_long": "This video is too long.\nMaximum allowed duration is 30 minutes.",
diff --git a/data/locale/hi.json b/data/locale/hi.json
index d9c8729..87a965d 100644
--- a/data/locale/hi.json
+++ b/data/locale/hi.json
@@ -38,6 +38,7 @@
"error_deleted": "यह वीडियो निर्माता द्वारा हटा दिया गया है।",
"error_private": "यह वीडियो निजी है और एक्सेस नहीं किया जा सकता।",
"error_region": "यह वीडियो आपके क्षेत्र में उपलब्ध नहीं है।",
+ "error_invalid_link": "अमान्य वीडियो लिंक।\nकृपया लिंक जांचें और फिर से प्रयास करें।",
"error_network": "नेटवर्क त्रुटि हुई।\nकृपया फिर से प्रयास करें।",
"error_rate_limit": "बहुत अधिक अनुरोध।\nकृपया कुछ देर प्रतीक्षा करें और फिर से प्रयास करें।",
"error_too_long": "यह वीडियो बहुत लंबा है।\nअधिकतम अनुमत अवधि 30 मिनट है।",
diff --git a/data/locale/id.json b/data/locale/id.json
index 6097c29..e833703 100644
--- a/data/locale/id.json
+++ b/data/locale/id.json
@@ -38,6 +38,7 @@
"error_deleted": "Video ini sudah dihapus oleh pembuatnya.",
"error_private": "Video ini privat dan tidak bisa diakses.",
"error_region": "Video ini tidak tersedia di wilayahmu.",
+ "error_invalid_link": "Tautan video tidak valid.\nSilakan periksa tautan dan coba lagi.",
"error_network": "Terjadi kesalahan jaringan.\nSilakan coba lagi.",
"error_rate_limit": "Terlalu banyak permintaan.\nSilakan tunggu sebentar dan coba lagi.",
"error_too_long": "Video ini terlalu panjang.\nDurasi maksimum yang diizinkan adalah 30 menit.",
diff --git a/data/locale/ru.json b/data/locale/ru.json
index 3c27903..47efd14 100644
--- a/data/locale/ru.json
+++ b/data/locale/ru.json
@@ -38,6 +38,7 @@
"error_deleted": "Это видео было удалено автором.",
"error_private": "Это видео приватное и недоступно.",
"error_region": "Это видео недоступно в вашем регионе.",
+ "error_invalid_link": "Неверная ссылка на видео.\nПожалуйста, проверьте ссылку и попробуйте снова.",
"error_network": "Произошла сетевая ошибка.\nПожалуйста, попробуйте снова.",
"error_rate_limit": "Слишком много запросов.\nПожалуйста, подождите немного и попробуйте снова.",
"error_too_long": "Это видео слишком длинное.\nМаксимальная продолжительность — 30 минут.",
diff --git a/data/locale/so.json b/data/locale/so.json
index 5c2e242..f3a7636 100644
--- a/data/locale/so.json
+++ b/data/locale/so.json
@@ -38,6 +38,7 @@
"error_deleted": "Fiidiyowgan waxaa tirtiray abuurahiisa.",
"error_private": "Fiidiyowgan waa qarsoon yahay mana la heli karo.",
"error_region": "Fiidiyowgan laguma heli karo aagaaga.",
+ "error_invalid_link": "Link-a fiidiyowgu ma shaqeynayo.\nFadlan hubi link-a oo isku day mar kale.",
"error_network": "Khalad shabakad ayaa dhacay.\nFadlan isku day mar kale.",
"error_rate_limit": "Codsiyo badan ayaa la sameeyay.\nFadlan sug wakhti yar oo isku day mar kale.",
"error_too_long": "Fiidiyowgan waa dheer yahay.\nMudda ugu badan ee la ogol yahay waa 30 daqiiqo.",
diff --git a/data/locale/uk.json b/data/locale/uk.json
index c68e610..632b30a 100644
--- a/data/locale/uk.json
+++ b/data/locale/uk.json
@@ -38,6 +38,7 @@
"error_deleted": "Це відео було видалено автором.",
"error_private": "Це відео приватне і недоступне.",
"error_region": "Це відео недоступне у вашому регіоні.",
+ "error_invalid_link": "Невірне посилання на відео.\nБудь ласка, перевірте посилання та спробуйте ще раз.",
"error_network": "Сталася мережева помилка.\nБудь ласка, спробуйте ще раз.",
"error_rate_limit": "Занадто багато запитів.\nБудь ласка, зачекайте трохи і спробуйте знову.",
"error_too_long": "Це відео занадто довге.\nМаксимальна тривалість — 30 хвилин.",
diff --git a/data/locale/vi.json b/data/locale/vi.json
index dfc3864..11b305f 100644
--- a/data/locale/vi.json
+++ b/data/locale/vi.json
@@ -38,6 +38,7 @@
"error_deleted": "Video này đã bị người tạo xóa.",
"error_private": "Video này ở chế độ riêng tư và không thể truy cập.",
"error_region": "Video này không khả dụng ở khu vực của bạn.",
+ "error_invalid_link": "Liên kết video không hợp lệ.\nVui lòng kiểm tra liên kết và thử lại.",
"error_network": "Đã xảy ra lỗi mạng.\nVui lòng thử lại.",
"error_rate_limit": "Quá nhiều yêu cầu.\nVui lòng đợi một chút và thử lại.",
"error_too_long": "Video này quá dài.\nThời lượng tối đa cho phép là 30 phút.",
diff --git a/handlers/get_inline.py b/handlers/get_inline.py
index 3920355..474b937 100644
--- a/handlers/get_inline.py
+++ b/handlers/get_inline.py
@@ -129,9 +129,9 @@ async def update_inline_status(attempt: int):
await asyncio.sleep(0.5)
try:
- # Use queue with bypass_user_limit=True for inline downloads
- # Inline downloads bypass the per-user queue limit
- async with queue.info_queue(user_id, bypass_user_limit=True) as acquired:
+ # Use queue with bypass=True for inline downloads
+ # Inline downloads bypass the per-user queue entirely
+ async with queue.user_queue(user_id, bypass=True) as acquired:
if not acquired:
# This shouldn't happen with bypass, but handle anyway
await bot.edit_message_text(
@@ -148,8 +148,6 @@ async def update_inline_status(attempt: int):
)
if video_info.is_slideshow: # Process image
- # Clean up resources before returning (close YDL context)
- video_info.close()
return await bot.edit_message_text(
inline_message_id=message_id, text=locale[lang]["only_video_supported"]
)
@@ -170,10 +168,6 @@ async def update_inline_status(attempt: int):
full_name=full_name,
)
- # Clean up video_info resources (videos already closed in video() method,
- # but call close() for safety - it's idempotent)
- video_info.close()
-
try: # Try to write log into database
# Write log into database
await add_video(
diff --git a/handlers/get_video.py b/handlers/get_video.py
index 48a9f07..955c93d 100644
--- a/handlers/get_video.py
+++ b/handlers/get_video.py
@@ -2,7 +2,7 @@
from aiogram import Router, F
from aiogram.exceptions import TelegramBadRequest
-from aiogram.types import Message, ReactionTypeEmoji, CallbackQuery
+from aiogram.types import Message, ReactionTypeEmoji
from aiogram.utils.keyboard import InlineKeyboardBuilder
from data.config import locale, second_ids, monetag_url, config
@@ -25,19 +25,6 @@
# Note: Must use valid Telegram reaction emojis (🔄 and ⏳ are not valid)
RETRY_EMOJIS = ["👀", "🤔", "🙏"]
-# Callback data prefix for retry button
-RETRY_CALLBACK_PREFIX = "retry_video"
-
-
-def try_again_button(lang: str):
- """Create a 'Try Again' button for queue full error."""
- keyb = InlineKeyboardBuilder()
- keyb.button(
- text=locale[lang]["try_again_button"],
- callback_data=RETRY_CALLBACK_PREFIX,
- )
- return keyb.as_markup()
-
@video_router.message(F.text)
async def send_tiktok_video(message: Message):
@@ -63,12 +50,12 @@ async def send_tiktok_video(message: Message):
else: # Set lang and file mode if in DB
lang, file_mode = settings
- # Get queue manager and retry config
+ # Get queue manager
queue = QueueManager.get_instance()
retry_config = config["queue"]
try:
- # Check if link is valid
+ # Check if link is valid (BEFORE queue - quick check)
video_link, is_mobile = await api.regex_check(message.text)
# If not valid
if video_link is None:
@@ -77,13 +64,16 @@ async def send_tiktok_video(message: Message):
await message.reply(locale[lang]["link_error"])
return
- # Check per-user queue limit before proceeding
- user_queue_count = queue.get_user_queue_count(message.chat.id)
- if user_queue_count >= retry_config["max_user_queue_size"]:
+ # Check queue limit BEFORE showing reaction
+ if (
+ queue.get_user_queue_count(message.chat.id)
+ >= retry_config["max_user_queue_size"]
+ ):
if not group_chat:
await message.reply(
- locale[lang]["error_queue_full"].format(user_queue_count),
- reply_markup=try_again_button(lang),
+ locale[lang]["error_queue_full"].format(
+ queue.get_user_queue_count(message.chat.id)
+ )
)
return
@@ -114,18 +104,22 @@ async def update_retry_status(attempt: int):
except Exception as e:
logging.warning(f"Failed to update retry emoji to {emoji}: {e}")
- # Acquire info queue slot with per-user limit
- async with queue.info_queue(message.chat.id) as acquired:
+ # ENTIRE operation inside queue - waits silently for turn
+ async with queue.user_queue(message.chat.id) as acquired:
if not acquired:
- # User limit exceeded (shouldn't happen due to pre-check, but handle anyway)
+ # Queue full (race condition - another request got in first)
if status_message:
await status_message.delete()
+ else:
+ try:
+ await message.react([])
+ except TelegramBadRequest:
+ pass
if not group_chat:
await message.reply(
locale[lang]["error_queue_full"].format(
queue.get_user_queue_count(message.chat.id)
- ),
- reply_markup=try_again_button(lang),
+ )
)
return
@@ -150,19 +144,16 @@ async def update_retry_status(attempt: int):
await message.reply(get_error_message(e, lang))
return
- # Successfully got video info - show processing emoji
- if not status_message:
- try:
- await message.react(
-            [ReactionTypeEmoji(emoji="👨‍💻")], disable_notification=True
- )
- except TelegramBadRequest:
- logging.debug("Failed to set processing reaction")
+ # Successfully got video info - show processing emoji
+ if not status_message:
+ try:
+ await message.react(
+                [ReactionTypeEmoji(emoji="👨‍💻")], disable_notification=True
+ )
+ except TelegramBadRequest:
+ logging.debug("Failed to set processing reaction")
- # Use try/finally to ensure video_info resources are cleaned up
- # (especially download context for slideshows)
- try:
- # Send video/images (no global send queue - per-user limit only)
+ # Send video/images (INSIDE queue - next request waits)
if video_info.is_slideshow: # Process images
# Send upload image action
await bot.send_chat_action(
@@ -260,9 +251,6 @@ async def update_retry_status(attempt: int):
except Exception as e:
logging.error("Can't write into database")
logging.error(e)
- finally:
- # Clean up video_info resources (closes YDL context for slideshows)
- video_info.close()
except Exception as e: # If something went wrong
error_text = error_catch(e)
@@ -283,32 +271,3 @@ async def update_retry_status(attempt: int):
logging.debug("Failed to update UI during error cleanup")
except Exception as cleanup_err:
logging.warning(f"Unexpected error during cleanup: {cleanup_err}")
-
-
-@video_router.callback_query(F.data == RETRY_CALLBACK_PREFIX)
-async def handle_retry_callback(callback: CallbackQuery):
- """Handle 'Try Again' button click for queue full error."""
- # Ensure callback.message exists and is accessible
- if not callback.message or not hasattr(callback.message, "reply_to_message"):
- await callback.answer("Message not accessible", show_alert=True)
- return
-
- # Get the original message that contains the TikTok link
- original_message = callback.message.reply_to_message
-
- if not original_message or not original_message.text:
- await callback.answer("Original message not found", show_alert=True)
- return
-
- # Delete the error message with the button
- try:
- if hasattr(callback.message, "delete"):
- await callback.message.delete()
- except TelegramBadRequest:
- logging.debug("Retry button message already deleted")
-
- # Answer the callback to remove loading state
- await callback.answer()
-
- # Re-process the original message
- await send_tiktok_video(original_message)
diff --git a/main.py b/main.py
index 2698dd2..f34dfd3 100644
--- a/main.py
+++ b/main.py
@@ -18,13 +18,8 @@
async def main() -> None:
await setup_db(config["bot"]["db_url"])
- # Configure TikTokClient executor size for high throughput
- # Must be called before any TikTokClient is instantiated
- TikTokClient.set_executor_size(config["performance"]["thread_pool_size"])
logging.info(
- f"TikTokClient configured: executor={config['performance']['thread_pool_size']} workers, "
- f"aiohttp_pool={config['performance']['aiohttp_pool_size']}, "
- f"limit_per_host={config['performance']['aiohttp_limit_per_host']}"
+ f"TikTokClient configured: curl_pool={config['performance']['curl_pool_size']}"
)
# Initialize proxy manager if configured
@@ -35,6 +30,9 @@ async def main() -> None:
)
logging.info("Proxy manager initialized")
+ # Initialize TikTok client session early to avoid race conditions
+ await TikTokClient.initialize_session()
+
scheduler.start()
dp.include_routers(
user_router,
@@ -52,12 +50,12 @@ async def main() -> None:
await dp.start_polling(bot)
finally:
# Cleanup shared resources on shutdown
- logging.info("Shutting down: cleaning up TikTokClient resources...")
- await TikTokClient.close_curl_session() # curl_cffi session for media downloads
- await TikTokClient.close_connector() # aiohttp connector for URL resolution
- TikTokClient.shutdown_executor()
+ logging.info("Shutting down: cleaning up resources...")
+ await (
+ TikTokClient.close_curl_session()
+ ) # curl_cffi session for all TikTok operations
await close_http_session() # aiohttp session for thumbnail/cover downloads
- logging.info("TikTokClient resources cleaned up")
+ logging.info("Resources cleaned up")
if __name__ == "__main__":
diff --git a/misc/queue_manager.py b/misc/queue_manager.py
index b685c6a..185c0f4 100644
--- a/misc/queue_manager.py
+++ b/misc/queue_manager.py
@@ -1,4 +1,4 @@
-"""Global queue manager for controlling concurrent operations."""
+"""Sequential per-user queue manager."""
from __future__ import annotations
@@ -14,138 +14,113 @@
class QueueManager:
"""
- Singleton queue manager for controlling concurrent operations.
+ Sequential per-user queue with max depth.
- Features:
- - Per-user tracking for info queue (limits concurrent requests per user)
- - Bypass option for inline downloads
+ Each user's requests are processed one at a time (sequential).
+ If user exceeds max_queue_size, new requests are rejected.
Usage:
queue = QueueManager.get_instance()
- # For regular video downloads (with per-user limit):
- async with queue.info_queue(user_id) as acquired:
+ async with queue.user_queue(user_id) as acquired:
if not acquired:
- # User limit exceeded, show error
+ # Queue full, show error
return
- video_info = await api.video_with_retry(...)
-
- # For inline downloads (bypass per-user limit):
- async with queue.info_queue(user_id, bypass_user_limit=True) as acquired:
- video_info = await api.video_with_retry(...)
+ # Process video (fetch + send) - next request waits
"""
_instance: QueueManager | None = None
- def __init__(self, max_user_queue: int):
- """
- Initialize the queue manager.
-
- Args:
- max_user_queue: Maximum videos per user in info queue
- """
- self.max_user_queue = max_user_queue
- self._user_info_counts: dict[int, int] = {}
- self._lock = asyncio.Lock()
+ def __init__(self, max_queue_size: int = 3):
+ self.max_queue_size = max_queue_size
+ self._user_locks: dict[int, asyncio.Lock] = {}
+ self._user_queue_counts: dict[int, int] = {}
+ self._dict_lock = asyncio.Lock() # Protects dicts
- logger.info(f"QueueManager initialized: max_user_queue={max_user_queue}")
+ logger.info(f"QueueManager initialized: max_queue_size={max_queue_size}")
@classmethod
def get_instance(cls) -> QueueManager:
"""Get or create the singleton instance."""
if cls._instance is None:
- queue_config = config["queue"]
cls._instance = cls(
- max_user_queue=queue_config["max_user_queue_size"],
+ max_queue_size=config["queue"]["max_user_queue_size"],
)
return cls._instance
@classmethod
def reset_instance(cls) -> None:
- """Reset the singleton instance (useful for testing)."""
+ """Reset singleton (for testing)."""
cls._instance = None
def get_user_queue_count(self, user_id: int) -> int:
"""Get current queue count for a user."""
- return self._user_info_counts.get(user_id, 0)
-
- async def acquire_info_for_user(
- self, user_id: int, bypass_user_limit: bool = False
- ) -> bool:
- """
- Acquire info slot for a user.
-
- Args:
- user_id: Telegram user/chat ID
- bypass_user_limit: If True, skip per-user limit check (for inline)
-
- Returns:
- True if acquired successfully, False if user limit exceeded
- """
- async with self._lock:
- if not bypass_user_limit:
- current_count = self._user_info_counts.get(user_id, 0)
- if current_count >= self.max_user_queue:
- logger.debug(
- f"User {user_id} rejected: {current_count}/{self.max_user_queue} in queue"
- )
- return False
-
- # Increment user count
- self._user_info_counts[user_id] = self._user_info_counts.get(user_id, 0) + 1
-
- logger.debug(
- f"User {user_id} acquired info slot "
- f"(user_count={self._user_info_counts.get(user_id, 0)})"
- )
- return True
-
- async def release_info_for_user(self, user_id: int) -> None:
- """Release info slot for a user.
-
- This method is async to properly acquire the lock and prevent
- race conditions when multiple coroutines release concurrently.
- """
- async with self._lock:
- if user_id in self._user_info_counts:
- self._user_info_counts[user_id] -= 1
- if self._user_info_counts[user_id] <= 0:
- del self._user_info_counts[user_id]
-
- logger.debug(
- f"User {user_id} released info slot "
- f"(user_count={self._user_info_counts.get(user_id, 0)})"
- )
+ return self._user_queue_counts.get(user_id, 0)
+
+ async def _get_user_lock(self, user_id: int) -> asyncio.Lock:
+ """Get or create lock for user."""
+ async with self._dict_lock:
+ if user_id not in self._user_locks:
+ self._user_locks[user_id] = asyncio.Lock()
+ return self._user_locks[user_id]
+
+ async def _increment_count(self, user_id: int) -> bool:
+ """Increment queue count. Returns False if at max."""
+ async with self._dict_lock:
+ current = self._user_queue_counts.get(user_id, 0)
+ if current >= self.max_queue_size:
+ return False
+ self._user_queue_counts[user_id] = current + 1
+ logger.debug(f"User {user_id} queue: {current + 1}/{self.max_queue_size}")
+ return True
+
+ async def _decrement_count(self, user_id: int) -> None:
+ """Decrement queue count and cleanup lock if no longer needed."""
+ async with self._dict_lock:
+ if user_id in self._user_queue_counts:
+ self._user_queue_counts[user_id] -= 1
+ if self._user_queue_counts[user_id] <= 0:
+ del self._user_queue_counts[user_id]
+ # Clean up lock when user has no active requests
+ # This prevents unbounded memory growth with many unique users
+ if user_id in self._user_locks:
+ del self._user_locks[user_id]
+ logger.debug(
+ f"User {user_id} queue: {self._user_queue_counts.get(user_id, 0)}/{self.max_queue_size}"
+ )
@asynccontextmanager
- async def info_queue(
- self, user_id: int, bypass_user_limit: bool = False
+ async def user_queue(
+ self, user_id: int, bypass: bool = False
) -> AsyncGenerator[bool, None]:
"""
- Context manager for info queue with per-user limiting.
+ Sequential queue with max depth.
Args:
user_id: Telegram user/chat ID
- bypass_user_limit: If True, skip per-user limit check (for inline)
+ bypass: If True, skip queue entirely (for inline mode)
Yields:
- True if acquired successfully, False if user limit exceeded
-
- Usage:
- async with queue.info_queue(user_id) as acquired:
- if not acquired:
- await message.reply("Queue full, please wait...")
- return
- # Do work...
+ True if acquired, False if queue full
"""
- acquired = await self.acquire_info_for_user(user_id, bypass_user_limit)
+ if bypass:
+ yield True
+ return
+
+ # Check if queue is full BEFORE waiting
+ if not await self._increment_count(user_id):
+ yield False # Queue full
+ return
+
+ # Get user's lock and wait for turn
+ lock = await self._get_user_lock(user_id)
try:
- yield acquired
+ async with lock:
+ yield True # Acquired, process request
finally:
- if acquired:
- await self.release_info_for_user(user_id)
+ await self._decrement_count(user_id)
@property
def active_users_count(self) -> int:
- """Number of users currently with items in the info queue."""
- return len(self._user_info_counts)
+ """Number of users with items in queue."""
+ return len(self._user_queue_counts)
diff --git a/misc/video_types.py b/misc/video_types.py
index a300061..ca45642 100644
--- a/misc/video_types.py
+++ b/misc/video_types.py
@@ -27,6 +27,7 @@
TikTokRateLimitError,
TikTokRegionError,
TikTokExtractionError,
+ TikTokInvalidLinkError,
TikTokVideoTooLongError,
VideoInfo,
MusicInfo,
@@ -212,6 +213,8 @@ def get_error_message(error: TikTokError, lang: str) -> str:
return lang_dict.get("error_deleted", "This video has been deleted.")
elif isinstance(error, TikTokPrivateError):
return lang_dict.get("error_private", "This video is private.")
+ elif isinstance(error, TikTokInvalidLinkError):
+ return lang_dict.get("error_invalid_link", "Invalid video link.")
elif isinstance(error, TikTokNetworkError):
return lang_dict.get("error_network", "Network error occurred.")
elif isinstance(error, TikTokRateLimitError):
diff --git a/tiktok_api/__init__.py b/tiktok_api/__init__.py
index 240ab14..41ca412 100644
--- a/tiktok_api/__init__.py
+++ b/tiktok_api/__init__.py
@@ -1,7 +1,7 @@
"""TikTok API client for extracting video and music information.
This module provides a clean interface to extract TikTok video/slideshow data
-and music information using yt-dlp internally.
+and music information using curl_cffi with browser impersonation.
Example:
>>> from tiktok_api import TikTokClient, ProxyManager, VideoInfo, TikTokDeletedError
@@ -25,6 +25,7 @@
TikTokDeletedError,
TikTokError,
TikTokExtractionError,
+ TikTokInvalidLinkError,
TikTokNetworkError,
TikTokPrivateError,
TikTokRateLimitError,
@@ -51,5 +52,6 @@
"TikTokRateLimitError",
"TikTokRegionError",
"TikTokExtractionError",
+ "TikTokInvalidLinkError",
"TikTokVideoTooLongError",
]
diff --git a/tiktok_api/client.py b/tiktok_api/client.py
index 03ba449..fece9ae 100644
--- a/tiktok_api/client.py
+++ b/tiktok_api/client.py
@@ -1,35 +1,27 @@
"""TikTok API client for extracting video and music information."""
import asyncio
+import http.cookiejar
+import json
import logging
import os
import random
import re
-import threading
-from concurrent.futures import ThreadPoolExecutor
-from typing import TYPE_CHECKING, Any, Awaitable, Callable, Optional, Tuple
-
-# Type alias for download progress callback: (bytes_downloaded, total_bytes or None)
-ProgressCallback = Callable[[int, Optional[int]], None]
-
-import aiohttp
-from aiohttp import TCPConnector, ClientTimeout
-import yt_dlp
-
-# Dynamic import of yt-dlp bypass mechanisms (updates automatically with yt-dlp)
-from yt_dlp.utils import std_headers as YTDLP_STD_HEADERS
+from typing import TYPE_CHECKING, Any, Awaitable, Callable, Optional
+from urllib.parse import urlparse
# curl_cffi for browser impersonation (TLS fingerprint bypass)
import curl_cffi
from curl_cffi.requests import AsyncSession as CurlAsyncSession
from curl_cffi import CurlError
-# Import yt-dlp's browser targets for dynamic impersonation
-# This ensures impersonation targets update automatically with yt-dlp
+# Static imports from yt-dlp - only used for headers and browser targets
+# These are static values, no yt-dlp runtime execution
+from yt_dlp.utils import std_headers as YTDLP_STD_HEADERS
+
try:
from yt_dlp.networking._curlcffi import BROWSER_TARGETS, _TARGETS_COMPAT_LOOKUP
except ImportError:
- # Fallback if yt-dlp structure changes or curl_cffi not available during import
BROWSER_TARGETS = {}
_TARGETS_COMPAT_LOOKUP = {}
@@ -37,6 +29,7 @@
TikTokDeletedError,
TikTokError,
TikTokExtractionError,
+ TikTokInvalidLinkError,
TikTokNetworkError,
TikTokPrivateError,
TikTokRateLimitError,
@@ -50,139 +43,111 @@
logger = logging.getLogger(__name__)
-# Regex for extracting video ID from redirected URLs (used by legacy get_id functions)
-_redirect_regex = re.compile(r"https?://[^\s]+tiktok\.com/[^\s]+?/([0-9]+)")
+# Maximum retries for each operation step
+MAX_RETRIES = 3
+
+
+def _strip_proxy_auth(proxy_url: Optional[str]) -> str:
+ """Strip authentication info from proxy URL for safe logging."""
+ if proxy_url is None:
+ return "direct connection"
+
+ match = re.match(r"^((?:https?|socks5h?|socks4a?)://)(?:[^@]+@)?(.+)$", proxy_url)
+ if match:
+ scheme, host_port = match.groups()
+ return f"{scheme}{host_port}"
+
+ return proxy_url
class TikTokClient:
"""Client for extracting TikTok video and music information.
- This client uses yt-dlp internally to extract video/slideshow data and music
- from TikTok URLs. It supports both regular videos and slideshows (image posts).
-
- All media downloads (video, audio, images) use curl_cffi with browser
- impersonation (TLS fingerprint spoofing) derived from yt-dlp's BROWSER_TARGETS.
- This automatically updates when you update yt-dlp, ensuring compatibility
- with TikTok's anti-bot detection.
+ Uses curl_cffi with browser impersonation for all TikTok operations.
+ Proxies are sticky within a single video() or music() call - only rotated on retry.
Args:
- proxy_manager: Optional ProxyManager instance for round-robin proxy rotation.
- If provided, each request will use the next proxy in rotation.
- data_only_proxy: If True, proxy is used only for API extraction, not for
- media downloads. Defaults to False.
- cookies: Optional path to a Netscape-format cookies file (e.g., exported from browser).
- If not provided, uses YTDLP_COOKIES env var. If the file doesn't exist,
- a warning is logged and cookies are not used.
- aiohttp_pool_size: Total connection pool size for async downloads. Default: 200.
- aiohttp_limit_per_host: Per-host connection limit. Default: 50.
+ proxy_manager: Optional ProxyManager for proxy rotation.
+ data_only_proxy: If True, proxy is used only for API, not for media downloads.
+ cookies: Optional path to Netscape-format cookies file.
+ max_video_duration: Maximum video duration in seconds (default from config).
+ Videos longer than this raise TikTokVideoTooLongError.
+ long_video_threshold: Duration threshold in seconds (default: 60 = 1 min).
+ Videos longer than this include metadata (thumbnail, dimensions) in VideoInfo.
Example:
>>> from tiktok_api import TikTokClient, ProxyManager
>>> proxy_manager = ProxyManager.initialize("proxies.txt", include_host=True)
>>> client = TikTokClient(proxy_manager=proxy_manager, data_only_proxy=True)
>>> video_info = await client.video("https://www.tiktok.com/@user/video/123")
- >>> print(video_info.author)
- >>> print(video_info.duration)
-
- # With cookies for authenticated requests:
- >>> client = TikTokClient(cookies="cookies.txt")
"""
- # Configurable executor - call set_executor_size() before first use
- _executor: Optional[ThreadPoolExecutor] = None
- _executor_lock = threading.Lock()
- _executor_size: int = 128 # Default, configurable via set_executor_size()
-
- _aiohttp_connector: Optional[TCPConnector] = None
- _connector_lock = threading.Lock()
-
- # curl_cffi session for browser-impersonated media downloads
+ # Shared curl_cffi session (class-level singleton)
_curl_session: Optional[CurlAsyncSession] = None
- _curl_session_lock = threading.Lock()
_impersonate_target: Optional[str] = None
+ _session_lock: Optional[asyncio.Lock] = None
@classmethod
- def _get_impersonate_target(cls) -> str:
- """Get the best impersonation target from yt-dlp's BROWSER_TARGETS.
-
- Uses the same priority as yt-dlp:
- 1. Prioritize desktop over mobile (non-ios, non-android)
- 2. Prioritize Chrome > Safari > Firefox > Edge > Tor
- 3. Prioritize newest version
+ def _get_session_lock(cls) -> asyncio.Lock:
+ """Get or create session lock. Safe because Lock() creation is atomic."""
+ if cls._session_lock is None:
+ cls._session_lock = asyncio.Lock()
+ return cls._session_lock
- This ensures the impersonation target updates automatically when you
- update yt-dlp, without any hardcoded values.
-
- Returns:
- curl_cffi-compatible impersonate string (e.g., "chrome136")
- """
- import itertools
-
- # Get curl_cffi version as tuple for comparison
+ @classmethod
+ def _get_impersonate_target(cls) -> str:
+ """Get the best impersonation target from yt-dlp's BROWSER_TARGETS."""
try:
curl_cffi_version = tuple(
int(x) for x in curl_cffi.__version__.split(".")[:2]
)
except (ValueError, AttributeError):
- curl_cffi_version = (0, 9) # Minimum supported version
+ curl_cffi_version = (0, 9)
- # Collect all available targets for our curl_cffi version
available_targets: dict[str, Any] = {}
for version, targets in BROWSER_TARGETS.items():
if curl_cffi_version >= version:
available_targets.update(targets)
if not available_targets:
- # Fallback to a common target if BROWSER_TARGETS is empty
- logger.warning(
- "No BROWSER_TARGETS available from yt-dlp, using 'chrome' fallback"
- )
+ logger.warning("No BROWSER_TARGETS available, using 'chrome' fallback")
return "chrome"
- # Sort by yt-dlp's priority (same logic as _curlcffi.py)
- # This ensures we pick the same target yt-dlp would use
+ # Sort by yt-dlp's priority (desktop > mobile, Chrome > Safari > Firefox)
sorted_targets = sorted(
available_targets.items(),
key=lambda x: (
- # deprioritize mobile targets since they give very different behavior
x[1].os not in ("ios", "android"),
- # prioritize tor < edge < firefox < safari < chrome
("tor", "edge", "firefox", "safari", "chrome").index(x[1].client)
if x[1].client in ("tor", "edge", "firefox", "safari", "chrome")
else -1,
- # prioritize newest version
float(x[1].version) if x[1].version else 0,
- # group by os name
x[1].os or "",
),
reverse=True,
)
- # Get the best target name
best_name = sorted_targets[0][0]
- # Apply compatibility lookup for older curl_cffi versions
if curl_cffi_version < (0, 11):
best_name = _TARGETS_COMPAT_LOOKUP.get(best_name, best_name)
- logger.debug(
- f"Selected impersonation target: {best_name} "
- f"(curl_cffi {curl_cffi.__version__})"
- )
+ logger.debug(f"Selected impersonation target: {best_name}")
return best_name
@classmethod
- def _get_curl_session(cls) -> CurlAsyncSession:
- """Get or create shared curl_cffi AsyncSession with browser impersonation.
+ async def _get_curl_session(cls) -> CurlAsyncSession:
+ """Get or create shared curl_cffi AsyncSession (async-safe).
- The session uses yt-dlp's BROWSER_TARGETS to select the best impersonation
- target, ensuring TLS fingerprint matches a real browser.
-
- Pool size is configurable via CURL_POOL_SIZE environment variable.
+ Uses double-checked locking to ensure only one session is created
+ even under concurrent first-use from multiple coroutines.
"""
- with cls._curl_session_lock:
- # Check if session needs to be created
- # Note: CurlAsyncSession doesn't have is_closed, we track via _curl_session being None
+ # Fast path - already initialized
+ if cls._curl_session is not None:
+ return cls._curl_session
+
+ # Slow path - acquire lock and double-check
+ async with cls._get_session_lock():
if cls._curl_session is None:
from data.config import config
@@ -191,19 +156,30 @@ def _get_curl_session(cls) -> CurlAsyncSession:
cls._impersonate_target = cls._get_impersonate_target()
cls._curl_session = CurlAsyncSession(
- impersonate=cls._impersonate_target,
+ impersonate=cls._impersonate_target, # type: ignore[arg-type]
max_clients=pool_size,
)
logger.info(
- f"Created curl_cffi session with impersonate={cls._impersonate_target}, "
+ f"Created curl_cffi session: impersonate={cls._impersonate_target}, "
f"max_clients={pool_size}"
)
- return cls._curl_session
+ return cls._curl_session
+
+ @classmethod
+ async def initialize_session(cls) -> None:
+ """Initialize the shared curl_cffi session.
+
+ Call this at application startup to ensure the session is created
+ before any concurrent requests. This avoids the lock overhead during
+ normal operation.
+ """
+ await cls._get_curl_session()
+ logger.info("TikTokClient session initialized")
@classmethod
async def close_curl_session(cls) -> None:
"""Close shared curl_cffi session. Call on application shutdown."""
- with cls._curl_session_lock:
+ async with cls._get_session_lock():
session = cls._curl_session
cls._curl_session = None
cls._impersonate_target = None
@@ -213,1144 +189,639 @@ async def close_curl_session(cls) -> None:
except Exception as e:
logger.debug(f"Error closing curl_cffi session: {e}")
- @classmethod
- def set_executor_size(cls, size: int) -> None:
- """Set executor size before first use. Call at app startup.
-
- Args:
- size: Number of worker threads for sync yt-dlp extraction calls.
- Higher values allow more concurrent extractions.
- """
- with cls._executor_lock:
- if cls._executor is not None:
- logger.warning(
- f"Executor already created with {cls._executor._max_workers} workers. "
- f"set_executor_size({size}) has no effect. Call before first TikTokClient use."
- )
- return
- cls._executor_size = size
- logger.info(f"TikTokClient executor size set to {size}")
-
- @classmethod
- def _get_executor(cls) -> ThreadPoolExecutor:
- """Get or create the shared ThreadPoolExecutor."""
- with cls._executor_lock:
- if cls._executor is None:
- cls._executor = ThreadPoolExecutor(
- max_workers=cls._executor_size,
- thread_name_prefix="tiktok_sync_",
- )
- logger.info(
- f"Created TikTokClient executor with {cls._executor_size} workers"
- )
- return cls._executor
-
- @classmethod
- def _get_connector(cls, pool_size: int, limit_per_host: int = 50) -> TCPConnector:
- """Get or create shared aiohttp connector for URL resolution.
-
- Note: This is only used for resolving short URLs (vm.tiktok.com, vt.tiktok.com).
- Media downloads use curl_cffi for browser impersonation.
-
- Args:
- pool_size: Total connection pool size
- limit_per_host: Maximum connections per host
- """
- with cls._connector_lock:
- if cls._aiohttp_connector is None or cls._aiohttp_connector.closed:
- cls._aiohttp_connector = TCPConnector(
- limit=pool_size,
- limit_per_host=limit_per_host,
- ttl_dns_cache=300,
- enable_cleanup_closed=True,
- force_close=False, # Keep connections alive for reuse
- )
- return cls._aiohttp_connector
-
- @classmethod
- async def close_connector(cls) -> None:
- """Close shared aiohttp connector. Call on application shutdown."""
- # Grab and clear connector under lock
- with cls._connector_lock:
- connector = cls._aiohttp_connector
- cls._aiohttp_connector = None
- # Close outside lock to avoid blocking
- if connector and not connector.closed:
- await connector.close()
-
- @classmethod
- def shutdown_executor(cls) -> None:
- """Shutdown the shared executor. Call on application shutdown."""
- with cls._executor_lock:
- if cls._executor is not None:
- cls._executor.shutdown(wait=False)
- cls._executor = None
-
def __init__(
self,
proxy_manager: Optional["ProxyManager"] = None,
data_only_proxy: bool = False,
cookies: Optional[str] = None,
+ max_video_duration: Optional[int] = None,
+ long_video_threshold: int = 60,
+ # Legacy parameters - kept for compatibility but unused
aiohttp_pool_size: int = 200,
aiohttp_limit_per_host: int = 50,
):
self.proxy_manager = proxy_manager
self.data_only_proxy = data_only_proxy
- self.aiohttp_pool_size = aiohttp_pool_size
- self.aiohttp_limit_per_host = aiohttp_limit_per_host
-
- # Handle cookies with validation
- cookies_path = cookies or os.getenv("YTDLP_COOKIES")
- if cookies_path:
- # Convert relative path to absolute path
- if not os.path.isabs(cookies_path):
- cookies_path = os.path.abspath(cookies_path)
-
- if os.path.isfile(cookies_path):
- self.cookies = cookies_path
- else:
- logger.warning(
- f"Cookie file not found: {cookies_path} - cookies will not be used"
- )
- self.cookies = None
+ self.long_video_threshold = long_video_threshold
+
+ # Get max_video_duration from config if not specified
+ if max_video_duration is None:
+ from data.config import config
+
+ perf_config = config.get("performance", {})
+ self.max_video_duration = perf_config.get("max_video_duration", 1800)
else:
- self.cookies = None
+ self.max_video_duration = max_video_duration
+
+ # Load cookies from Netscape file
+ self._cookies: dict[str, str] = {}
+ if cookies is None:
+ from data.config import config
+
+ cookies = config.get("tiktok", {}).get("cookies_file", "")
+ if cookies:
+ self._load_cookies(cookies)
+ # URL patterns
self.mobile_regex = re.compile(r"https?://[^\s]+tiktok\.com/[^\s]+")
self.web_regex = re.compile(r"https?://www\.tiktok\.com/@[^\s]+?/video/[0-9]+")
self.photo_regex = re.compile(
r"https?://www\.tiktok\.com/@[^\s]+?/photo/[0-9]+"
)
- self.mus_regex = re.compile(r"https?://www\.tiktok\.com/music/[^\s]+")
- def _get_proxy_info(self) -> str:
- """Get proxy configuration info for logging."""
- if self.proxy_manager:
- count = self.proxy_manager.get_proxy_count()
- return f"rotating ({count} proxies)"
- return "None"
+ # Data extraction patterns (compiled for performance)
+ # Use .*? with DOTALL to handle JSON containing '<' characters (e.g., "I <3 TikTok")
+ self._universal_data_re = re.compile(
+ r'',
+ re.DOTALL,
+ )
+ self._sigi_state_re = re.compile(
+ r'',
+ re.DOTALL,
+ )
+ self._next_data_re = re.compile(
+ r'',
+ re.DOTALL,
+ )
- def _get_bypass_headers(self, referer_url: str) -> dict[str, str]:
- """Get bypass headers dynamically from yt-dlp.
+ def _load_cookies(self, cookies_path: str) -> None:
+ """Load cookies from Netscape-format file."""
+ if not os.path.isabs(cookies_path):
+ cookies_path = os.path.abspath(cookies_path)
- Uses yt-dlp's standard headers which are updated with each yt-dlp release.
- We add Origin and Referer for CORS compliance with TikTok CDN.
+ if not os.path.isfile(cookies_path):
+ logger.warning(f"Cookie file not found: {cookies_path}")
+ return
- Args:
- referer_url: The referer URL to set in headers
+ try:
+ jar = http.cookiejar.MozillaCookieJar(cookies_path)
+ jar.load(ignore_discard=True, ignore_expires=True)
+ for cookie in jar:
+ if "tiktok" in (cookie.domain or "").lower():
+ self._cookies[cookie.name] = cookie.value
+ if self._cookies:
+ logger.info(f"Loaded {len(self._cookies)} cookies from {cookies_path}")
+ except Exception as e:
+ logger.warning(f"Failed to load cookies from {cookies_path}: {e}")
- Returns:
- Dict of headers for media download
- """
- headers = dict(YTDLP_STD_HEADERS) # Copy to avoid mutation
+ def _get_headers(
+ self, referer_url: str = "https://www.tiktok.com/"
+ ) -> dict[str, str]:
+ """Get request headers with browser-like values."""
+ headers = dict(YTDLP_STD_HEADERS)
headers["Referer"] = referer_url
headers["Origin"] = "https://www.tiktok.com"
headers["Accept"] = "*/*"
- # Avoid hardcoding Sec-Fetch-* headers; incorrect values can break
- # audio/video downloads. Let the browser/client defaults apply.
return headers
- def _get_cookies_from_context(
- self, download_context: dict[str, Any], media_url: Optional[str] = None
- ) -> dict[str, str]:
- """Extract cookies from yt-dlp context for media download.
-
- Uses yt-dlp's InfoExtractor to get cookies for the specific media URL.
- This properly handles TikTok's cookie propagation to CDN domains.
-
- Args:
- download_context: Dict containing 'ydl', 'ie' with YoutubeDL instance
- media_url: Optional media URL to get cookies for (for CDN domain matching)
+ def _get_initial_proxy(self) -> Optional[str]:
+ """Get initial proxy for a request."""
+ if self.proxy_manager:
+ return self.proxy_manager.get_next_proxy()
+ return None
- Returns:
- Dict of cookie name -> value
- """
- cookies: dict[str, str] = {}
- try:
- ie = download_context.get("ie")
- if ie:
- # Get cookies from TikTok main domain first
- tiktok_cookies = ie._get_cookies("https://www.tiktok.com/")
- for cookie_name, cookie in tiktok_cookies.items():
- cookies[cookie_name] = cookie.value
-
- # If media_url is provided, also get cookies for that specific domain
- if media_url:
- media_cookies = ie._get_cookies(media_url)
- for cookie_name, cookie in media_cookies.items():
- cookies[cookie_name] = cookie.value
- else:
- # Fallback: extract all cookies from cookiejar
- ydl = download_context.get("ydl")
- if ydl and hasattr(ydl, "cookiejar"):
- for cookie in ydl.cookiejar:
- cookies[cookie.name] = cookie.value
- except Exception as e:
- logger.debug(f"Failed to extract cookies from context: {e}")
- return cookies
+ def _rotate_proxy(self, current_proxy: Optional[str]) -> Optional[str]:
+ """Rotate to next proxy on failure."""
+ if self.proxy_manager:
+ new_proxy = self.proxy_manager.get_next_proxy()
+ logger.debug(
+ f"Rotating proxy: {_strip_proxy_auth(current_proxy)} -> "
+ f"{_strip_proxy_auth(new_proxy)}"
+ )
+ return new_proxy
+ return current_proxy
- async def _download_media_async(
- self,
- media_url: str,
- download_context: dict[str, Any],
- duration: Optional[int] = None,
- max_retries: int = 3,
- base_delay: float = 1.0,
- chunk_size: int = 65536,
- progress_callback: Optional[ProgressCallback] = None,
- ) -> Optional[bytes]:
- """Download media asynchronously using curl_cffi with browser impersonation.
+ # -------------------------------------------------------------------------
+ # URL Resolution
+ # -------------------------------------------------------------------------
- Features:
- - Uses yt-dlp's BROWSER_TARGETS for TLS fingerprint impersonation
- - Conditional streaming for long videos (> threshold) to reduce memory spikes
- - Automatic retry with exponential backoff for CDN failures
- - Optional progress callback for download monitoring
+ async def _resolve_short_url(self, url: str, proxy: Optional[str]) -> str:
+ """Resolve short URLs (vm.tiktok.com, vt.tiktok.com, /t/) to full URLs."""
+ parsed = urlparse(url)
+ host = (parsed.hostname or "").lower()
+ path = parsed.path or ""
- Args:
- media_url: Direct URL to the media on TikTok CDN
- download_context: Dict containing 'ydl', 'ie', 'referer_url', and 'proxy'
- duration: Video duration in seconds. If > threshold, uses streaming download.
- max_retries: Maximum retry attempts for retryable errors (default: 3)
- base_delay: Base delay in seconds for exponential backoff (default: 1.0)
- chunk_size: Chunk size for streaming downloads in bytes (default: 64KB)
- progress_callback: Optional callback(downloaded_bytes, total_bytes) for progress
+ is_short_url = host in {"vm.tiktok.com", "vt.tiktok.com"} or (
+ host in {"www.tiktok.com", "tiktok.com"} and path.startswith("/t/")
+ )
- Returns:
- Media bytes if successful, None otherwise
- """
- from data.config import config
+ if not is_short_url:
+ return url
- perf_config = config.get("performance", {})
- streaming_threshold = perf_config.get("streaming_duration_threshold", 300)
+ session = await self._get_curl_session()
+ headers = self._get_headers()
- # Use streaming for long videos (> 5 minutes by default)
- use_streaming = duration is not None and duration > streaming_threshold
+ response = await session.get(
+ url,
+ headers=headers,
+ cookies=self._cookies,
+ proxy=proxy,
+ timeout=15,
+ allow_redirects=True,
+ )
- referer_url = download_context.get("referer_url", "https://www.tiktok.com/")
- headers = self._get_bypass_headers(referer_url)
+ final_url = str(response.url)
+ logger.debug(f"Resolved short URL: {url} -> {final_url}")
+ return final_url
- # Get cookies using yt-dlp's cookie handling for proper domain matching
- cookies = self._get_cookies_from_context(download_context, media_url)
+ def _extract_video_id(self, url: str) -> Optional[str]:
+ """Extract video ID from TikTok URL."""
+ match = re.search(r"/(?:video|photo)/(\d+)", url)
+ if match:
+ return match.group(1)
+ return None
- # Use proxy from context unless data_only_proxy is True
- proxy = None
- if not self.data_only_proxy:
- proxy = download_context.get("proxy")
    async def _resolve_and_extract_id(
        self, video_link: str, proxy: Optional[str]
    ) -> tuple[str, str, Optional[str]]:
        """Resolve URL and extract video ID with retries.

        Args:
            video_link: Original (possibly short) TikTok URL.
            proxy: Proxy to start with; rotated via the proxy manager on
                retryable failures.

        Returns:
            Tuple of (full_url, video_id, current_proxy) where current_proxy
            is whichever proxy ultimately succeeded.

        Raises:
            TikTokNetworkError: After exhausting MAX_RETRIES attempts.
            TikTokError subclasses: Re-raised unchanged when not retryable.
            TikTokExtractionError: On unexpected non-network failures.
        """
        current_proxy = proxy

        for attempt in range(1, MAX_RETRIES + 1):
            try:
                full_url = await self._resolve_short_url(video_link, current_proxy)
                video_id = self._extract_video_id(full_url)

                if not video_id:
                    raise TikTokExtractionError(
                        f"Could not extract video ID from {video_link}"
                    )
                return full_url, video_id, current_proxy

            # NOTE: listed before the generic TikTokError handler on purpose —
            # a TikTokExtractionError raised above is retried here, since a
            # failed short-URL resolution may succeed with a rotated proxy.
            except (CurlError, TikTokExtractionError) as e:
                if attempt < MAX_RETRIES:
                    current_proxy = self._rotate_proxy(current_proxy)
                    # Exponential backoff: 1s, 2s, 4s, ...
                    delay = 1.0 * (2 ** (attempt - 1))
                    logger.warning(
                        f"URL resolution attempt {attempt}/{MAX_RETRIES} failed: {e}"
                    )
                    await asyncio.sleep(delay)
                    continue
                raise TikTokNetworkError(f"Failed to resolve URL: {e}") from e

            # Other domain errors (e.g. private/region/deleted) are terminal.
            except TikTokError:
                raise

            except Exception as e:
                raise TikTokExtractionError(f"URL resolution failed: {e}") from e

        # Defensive: the loop always returns or raises on the final attempt.
        raise TikTokNetworkError(f"URL resolution failed after {MAX_RETRIES} attempts")
- Args:
- link: TikTok URL (web or mobile)
- is_mobile: Whether the link is a mobile/short URL
+ # -------------------------------------------------------------------------
+ # Video Info Extraction
+ # -------------------------------------------------------------------------
+
+ def _parse_webpage_data(
+ self, html: str, video_id: str
+ ) -> tuple[dict[str, Any], int]:
+ """Parse video data from TikTok webpage HTML.
Returns:
- Video ID string if found, None otherwise.
+ Tuple of (video_data dict, status_code)
"""
- video_id: Optional[str] = None
- if not is_mobile:
- matches = _redirect_regex.findall(link)
- if matches:
- video_id = matches[0]
- else:
+ # Try UNIVERSAL_DATA first (most common format)
+ match = self._universal_data_re.search(html)
+ if match:
try:
- video_id = await self.get_video_id_from_mobile(link)
- except Exception:
+ data = json.loads(match.group(1))
+ scope = data.get("__DEFAULT_SCOPE__", {})
+ video_detail = scope.get("webapp.video-detail", {})
+ status = video_detail.get("statusCode", 0)
+ item_info = video_detail.get("itemInfo", {})
+ video_data = item_info.get("itemStruct", {})
+ if video_data:
+ logger.debug("Parsed video data from UNIVERSAL_DATA")
+ return video_data, status
+ except json.JSONDecodeError:
pass
- return video_id
- async def _resolve_url(self, url: str) -> str:
- """Resolve short URLs (vm.tiktok.com, vt.tiktok.com) to full URLs.
-
- Uses shared connector for connection pooling efficiency.
- """
- if "vm.tiktok.com" in url or "vt.tiktok.com" in url:
- connector = self._get_connector(
- self.aiohttp_pool_size, self.aiohttp_limit_per_host
- )
- timeout = ClientTimeout(total=15, connect=5, sock_read=10)
+ # Try SIGI_STATE
+ match = self._sigi_state_re.search(html)
+ if match:
try:
- async with aiohttp.ClientSession(
- connector=connector,
- timeout=timeout,
- connector_owner=False, # Don't close shared connector
- ) as session:
- async with session.get(url, allow_redirects=True) as response:
- return str(response.url)
- except Exception as e:
- logger.error(f"Failed to resolve URL {url}: {e}")
- return url
- return url
+ data = json.loads(match.group(1))
+ status = data.get("VideoPage", {}).get("statusCode", 0)
+ video_data = data.get("ItemModule", {}).get(video_id, {})
+ if video_data:
+ logger.debug("Parsed video data from SIGI_STATE")
+ return video_data, status
+ except json.JSONDecodeError:
+ pass
- def _extract_video_id(self, url: str) -> Optional[str]:
- """Extract video ID from TikTok URL."""
- match = re.search(r"/(?:video|photo)/(\d+)", url)
+ # Try Next.js data
+ match = self._next_data_re.search(html)
if match:
- return match.group(1)
- return None
+ try:
+ data = json.loads(match.group(1))
+ page_props = data.get("props", {}).get("pageProps", {})
+ status = page_props.get("statusCode", 0)
+ video_data = page_props.get("itemInfo", {}).get("itemStruct", {})
+ if video_data:
+ logger.debug("Parsed video data from NEXT_DATA")
+ return video_data, status
+ except json.JSONDecodeError:
+ pass
- def _get_ydl_opts(
- self, use_proxy: bool = True, explicit_proxy: Any = ...
- ) -> dict[str, Any]:
- """Get base yt-dlp options.
+ return {}, -1
- Args:
- use_proxy: If True and no explicit_proxy is given, use proxy from config.
- If False, no proxy is used (for media downloads when data_only_proxy=True).
- explicit_proxy: If provided (including None), use this specific proxy decision
- instead of getting one from rotation. Pass None to force direct
- connection. Uses sentinel default (...) to distinguish "not provided"
- from "provided as None".
+ def _check_status(self, status: int, video_link: str) -> None:
+ """Check status code and raise appropriate error."""
+ if status == 0:
+ return # Success
- Returns:
- Dict of yt-dlp options.
- """
- opts: dict[str, Any] = {
- "quiet": True,
- "no_warnings": True,
- }
-
- # Use explicit proxy decision if it was provided (even if None = direct connection)
- if explicit_proxy is not ...:
- if explicit_proxy is not None:
- opts["proxy"] = explicit_proxy
- logger.debug(f"Using explicit proxy: {explicit_proxy}")
- else:
- logger.debug("Using explicit direct connection (no proxy)")
- elif use_proxy and self.proxy_manager:
- proxy = self.proxy_manager.get_next_proxy()
- if proxy is not None: # None means direct connection
- opts["proxy"] = proxy
- logger.debug(f"Using proxy: {proxy}")
- else:
- logger.debug("Using direct connection (no proxy)")
+ if status in (10216, 10222):
+ raise TikTokPrivateError(f"Video {video_link} is private")
+ elif status == 10204:
+ raise TikTokRegionError(f"Video {video_link} is blocked in your region")
+ elif status == 10000:
+ raise TikTokDeletedError(f"Video {video_link} was deleted")
+ elif status != 0 and status != -1:
+ logger.warning(f"Unknown TikTok status code: {status}")
- if self.cookies:
- opts["cookiefile"] = self.cookies
- logger.debug(f"yt-dlp using cookie file: {self.cookies}")
- return opts
    async def _fetch_video_info(
        self, url: str, video_id: str, proxy: Optional[str]
    ) -> dict[str, Any]:
        """Fetch the TikTok webpage and parse the embedded video info.

        Args:
            url: Full TikTok video/photo URL.
            video_id: Numeric video ID (used to index SIGI_STATE data).
            proxy: Optional proxy URL for this request.

        Returns:
            Raw video-data dict extracted from the page.

        Raises:
            TikTokRateLimitError: On HTTP 429.
            TikTokNetworkError: On other non-200 responses.
            TikTokPrivateError: When redirected to a login page.
            TikTokExtractionError: When no video data could be parsed.
        """
        # Normalize photo URLs to video URLs (TikTok serves slideshow data on /video/ endpoint)
        normalized_url = url.replace("/photo/", "/video/")

        session = await self._get_curl_session()
        headers = self._get_headers(normalized_url)

        response = await session.get(
            normalized_url,
            headers=headers,
            cookies=self._cookies,
            proxy=proxy,
            timeout=30,
            allow_redirects=True,
        )

        if response.status_code != 200:
            if response.status_code == 429:
                raise TikTokRateLimitError("Rate limited by TikTok")
            raise TikTokNetworkError(f"HTTP {response.status_code}")

        # Check for login redirect
        final_url = str(response.url)
        if "/login" in urlparse(final_url).path:
            raise TikTokPrivateError("TikTok requires login to access this content")

        html = response.text

        # Probe for the three known embedded-JSON formats (diagnostics).
        # NOTE(review): a prior comment claimed this logging doubled as a
        # "timing buffer" whose removal caused intermittent extraction
        # failures — that is unverified; confirm before removing.
        has_universal = "__UNIVERSAL_DATA_FOR_REHYDRATION__" in html
        has_sigi = "SIGI_STATE" in html or "sigi-persisted-data" in html
        has_next = "__NEXT_DATA__" in html
        logger.debug(
            f"Fetched {len(html)} bytes, patterns: UNIVERSAL={has_universal}, "
            f"SIGI={has_sigi}, NEXT={has_next}"
        )

        if not any([has_universal, has_sigi, has_next]):
            # Log preview at DEBUG level to avoid exposing sensitive data in production logs
            logger.debug(f"No data patterns found! HTML preview: {html[:2000]}")

        video_data, status = self._parse_webpage_data(html, video_id)

        # Raises typed errors for private/region-blocked/deleted videos.
        self._check_status(status, url)

        if not video_data:
            raise TikTokExtractionError("Unable to extract video data from webpage")

        return video_data
- try:
- # Create YDL instance WITHOUT context manager so it stays alive
- ydl = yt_dlp.YoutubeDL(ydl_opts)
-
- # Get the TikTok extractor
- ie = ydl.get_info_extractor("TikTok")
- ie.set_downloader(ydl)
-
- # Convert /photo/ to /video/ URL (yt-dlp requirement)
- normalized_url = url.replace("/photo/", "/video/")
-
- # Guard: Check if the private method exists
- if not hasattr(ie, "_extract_web_data_and_status"):
- logger.error(
- "yt-dlp's TikTok extractor is missing '_extract_web_data_and_status' method. "
- f"Current yt-dlp version: {yt_dlp.version.__version__}. "
- "Please update yt-dlp: pip install -U yt-dlp"
- )
- raise TikTokExtractionError(
- "Incompatible yt-dlp version: missing required internal method. "
- "Please update yt-dlp: pip install -U yt-dlp"
- )
+ async def _fetch_video_info_with_retry(
+ self, url: str, video_id: str, proxy: Optional[str]
+ ) -> tuple[dict[str, Any], Optional[str]]:
+ """Fetch video info with retries. Returns (video_data, final_proxy)."""
+ current_proxy = proxy
+ last_error: Optional[Exception] = None
+ for attempt in range(1, MAX_RETRIES + 1):
try:
- # Use yt-dlp's internal method to get raw webpage data
- # This also sets up all necessary cookies
- video_data, status = ie._extract_web_data_and_status(
- normalized_url, video_id
- )
- except AttributeError as e:
- logger.error(
- f"Failed to call yt-dlp internal method: {e}. "
- f"Current yt-dlp version: {yt_dlp.version.__version__}. "
- "Please update yt-dlp: pip install -U yt-dlp"
- )
- raise TikTokExtractionError(
- "Incompatible yt-dlp version. Please update: pip install -U yt-dlp"
- ) from e
-
- # Create download context with the live instances
- download_context = {
- "ydl": ydl,
- "ie": ie,
- "referer_url": url,
- "proxy": request_proxy, # Store proxy for per-request assignment
- }
-
- # Success - transfer ownership of ydl to caller via download_context
- # Set ydl to None so finally block doesn't close it
- ydl = None
- return video_data, status, download_context
-
- except yt_dlp.utils.DownloadError as e:
- error_msg = str(e).lower()
- if (
- "unavailable" in error_msg
- or "removed" in error_msg
- or "deleted" in error_msg
- ):
- logger.warning(f"Video appears deleted: {e}")
- return None, "deleted", None
- elif "private" in error_msg:
- logger.warning(f"Video is private: {e}")
- return None, "private", None
- elif "rate" in error_msg or "too many" in error_msg or "429" in error_msg:
- logger.warning(f"Rate limited: {e}")
- return None, "rate_limit", None
- elif (
- "region" in error_msg
- or "geo" in error_msg
- or "country" in error_msg
- or "not available in your" in error_msg
- ):
- logger.warning(f"Region blocked: {e}")
- return None, "region", None
- logger.error(
- f"yt-dlp download error for video {video_id} ({url}): {e}\n"
- f" yt-dlp version: {yt_dlp.version.__version__}\n"
- f" Proxy: {self._get_proxy_info()}\n"
- f" Cookies: {self.cookies or 'None'}"
- )
- return None, "extraction", None
- except yt_dlp.utils.ExtractorError as e:
- error_msg = str(e)
- logger.error(
- f"yt-dlp extractor error for video {video_id} ({url}): {error_msg}\n"
- f" yt-dlp version: {yt_dlp.version.__version__}\n"
- f" Proxy: {self._get_proxy_info()}\n"
- f" Cookies: {self.cookies or 'None'}"
- )
- # Log additional guidance for common issues
- if "unable to extract" in error_msg.lower():
- logger.error(
- "This may indicate TikTok changed their page structure. "
- "Try updating yt-dlp: pip install -U yt-dlp\n"
- "If the issue persists, check https://github.com/yt-dlp/yt-dlp/issues"
- )
- return None, "extraction", None
- except TikTokError:
- raise
- except Exception as e:
- logger.error(
- f"yt-dlp extraction failed for video {video_id} ({url}): {e}\n"
- f" yt-dlp version: {yt_dlp.version.__version__}\n"
- f" Error type: {type(e).__name__}",
- exc_info=True,
- )
- return None, "extraction", None
- finally:
- # Close ydl if we still own it (i.e., we didn't successfully transfer
- # ownership to the caller via download_context)
- if ydl is not None:
- try:
- ydl.close()
- except Exception:
- pass
-
- async def _run_sync(self, func: Any, *args: Any) -> Any:
- """Run synchronous function in executor."""
- loop = asyncio.get_event_loop()
- return await loop.run_in_executor(self._get_executor(), func, *args)
-
- def _close_download_context(
- self, download_context: Optional[dict[str, Any]]
- ) -> None:
- """Close the YoutubeDL instance in a download context if present.
+ video_data = await self._fetch_video_info(url, video_id, current_proxy)
+ return video_data, current_proxy
- Args:
- download_context: Dict containing 'ydl' key with YoutubeDL instance, or None
- """
- if download_context and "ydl" in download_context:
- try:
- download_context["ydl"].close()
- except Exception:
- pass # Ignore errors during cleanup
+ except (TikTokNetworkError, TikTokRateLimitError, CurlError) as e:
+ last_error = e
+ if attempt < MAX_RETRIES:
+ current_proxy = self._rotate_proxy(current_proxy)
+ delay = 1.0 * (2 ** (attempt - 1))
+ logger.warning(
+ f"Video info fetch attempt {attempt}/{MAX_RETRIES} failed: {e}"
+ )
+ await asyncio.sleep(delay)
+ continue
+ raise
- def _raise_for_status(self, status: str, video_link: str) -> None:
- """Raise appropriate exception based on status string."""
- if status == "deleted":
- raise TikTokDeletedError(f"Video {video_link} was deleted")
- elif status == "private":
- raise TikTokPrivateError(f"Video {video_link} is private")
- elif status == "rate_limit":
- raise TikTokRateLimitError("Rate limited by TikTok")
- elif status == "network":
- raise TikTokNetworkError("Network error occurred")
- elif status == "region":
- raise TikTokRegionError(
- f"Video {video_link} is not available in your region"
- )
- else:
- # Handle "extraction" and any unknown status values
- raise TikTokExtractionError(f"Failed to extract video {video_link}")
+ except TikTokError:
+ raise
- async def download_image(self, image_url: str, video_info: VideoInfo) -> bytes:
- """
- Download an image using async aiohttp with yt-dlp bypass headers/cookies.
+ except Exception as e:
+ raise TikTokExtractionError(f"Video info fetch failed: {e}") from e
- This method uses the download context that was saved during video extraction,
- applying the same cookies and headers for authentication.
+ if last_error:
+ raise last_error
+ raise TikTokExtractionError("Video info fetch failed")
- Args:
- image_url: Direct URL to the image on TikTok CDN
- video_info: VideoInfo object that was returned by video() method
- (must be a slideshow with _download_context set)
+ # -------------------------------------------------------------------------
+ # Media Download
+ # -------------------------------------------------------------------------
- Returns:
- Image bytes
+ async def _download_media(
+ self,
+ media_url: str,
+ proxy: Optional[str],
+ referer_url: str = "https://www.tiktok.com/",
+ timeout: int = 60,
+ ) -> bytes:
+ """Download media (video, audio, image) from TikTok CDN."""
+ session = await self._get_curl_session()
+ headers = self._get_headers(referer_url)
+
+ response = await session.get(
+ media_url,
+ headers=headers,
+ cookies=self._cookies,
+ proxy=proxy,
+ timeout=timeout,
+ allow_redirects=True,
+ )
- Raises:
- ValueError: If video_info has no download context
- TikTokNetworkError: If the download fails
- """
- if not video_info._download_context:
- raise ValueError(
- "VideoInfo has no download context - was it extracted as a slideshow?"
+ if response.status_code != 200:
+ raise TikTokNetworkError(
+ f"Media download failed: HTTP {response.status_code}"
)
- result = await self._download_media_async(
- image_url, video_info._download_context
- )
+ return response.content
- if result is None:
- raise TikTokNetworkError(f"Failed to download image: {image_url}")
+ async def _download_media_with_retry(
+ self,
+ media_url: str,
+ proxy: Optional[str],
+ referer_url: str = "https://www.tiktok.com/",
+ ) -> tuple[bytes, Optional[str]]:
+ """Download media with retries. Returns (data, final_proxy)."""
+ current_proxy = proxy if not self.data_only_proxy else None
+ last_error: Optional[Exception] = None
+
+ for attempt in range(1, MAX_RETRIES + 1):
+ try:
+ data = await self._download_media(media_url, current_proxy, referer_url)
+ return data, current_proxy
- return result
+ except (TikTokNetworkError, CurlError) as e:
+ last_error = e
+ if attempt < MAX_RETRIES:
+ if not self.data_only_proxy:
+ current_proxy = self._rotate_proxy(current_proxy)
+ delay = 1.0 * (2 ** (attempt - 1))
+ logger.warning(
+ f"Media download attempt {attempt}/{MAX_RETRIES} failed: {e}"
+ )
+ await asyncio.sleep(delay)
+ continue
+ raise TikTokNetworkError(f"Media download failed: {e}") from e
- async def detect_image_format(self, image_url: str, video_info: VideoInfo) -> str:
- """
- Detect image format using HTTP Range request (only fetches first 20 bytes).
+ except Exception as e:
+ raise TikTokNetworkError(f"Media download failed: {e}") from e
- Uses curl_cffi with browser impersonation for TLS fingerprint bypass.
+ raise TikTokNetworkError("Media download failed after all retries")
- Args:
- image_url: Direct URL to the image on TikTok CDN
- video_info: VideoInfo object with download context
+ # -------------------------------------------------------------------------
+ # URL Pattern Matching
+ # -------------------------------------------------------------------------
+
+ async def regex_check(
+ self, video_link: str
+ ) -> tuple[Optional[str], Optional[bool]]:
+ """Check if a link matches known TikTok URL patterns.
Returns:
- File extension string: ".jpg", ".webp", ".heic", or ".jpg" (default)
+ Tuple of (matched_link, is_mobile).
"""
- if not video_info._download_context:
- # Fallback: assume needs processing if no context
- return ".heic"
-
- referer_url = video_info._download_context.get(
- "referer_url", "https://www.tiktok.com/"
- )
- headers = self._get_bypass_headers(referer_url)
- headers["Range"] = "bytes=0-19" # Only fetch first 20 bytes
- cookies = self._get_cookies_from_context(
- video_info._download_context, image_url
- )
-
- # Use proxy from context unless data_only_proxy is True
- proxy = None
- if not self.data_only_proxy:
- proxy = video_info._download_context.get("proxy")
-
- session = self._get_curl_session()
-
- response = None
- try:
- response = await session.get(
- image_url,
- headers=headers,
- cookies=cookies,
- proxy=proxy,
- timeout=10,
- allow_redirects=True,
- )
- if response.status_code in (200, 206): # 206 = Partial Content
- return self._detect_format_from_bytes(response.content)
- else:
- logger.warning(
- f"Range request returned status {response.status_code} for {image_url}"
- )
- return ".heic" # Assume needs processing on error
- except Exception as e:
- logger.debug(f"Range request failed for {image_url}: {e}")
- return ".heic" # Assume needs processing on error
- finally:
- if response is not None:
- response.close()
+ if self.web_regex.search(video_link) is not None:
+ link = self.web_regex.findall(video_link)[0]
+ return link, False
+ elif self.photo_regex.search(video_link) is not None:
+ link = self.photo_regex.findall(video_link)[0]
+ return link, False
+ elif self.mobile_regex.search(video_link) is not None:
+ link = self.mobile_regex.findall(video_link)[0]
+ return link, True
+ return None, None
- @staticmethod
- def _detect_format_from_bytes(data: bytes) -> str:
- """Detect image format from magic bytes."""
- if data.startswith(b"\xff\xd8\xff"):
- return ".jpg"
- elif data.startswith(b"RIFF") and len(data) >= 12 and data[8:12] == b"WEBP":
- return ".webp"
- elif len(data) >= 12 and (
- data[4:12] == b"ftypheic" or data[4:12] == b"ftypmif1"
- ):
- return ".heic"
- else:
- return ".jpg" # Unknown format, default to jpg
+ # -------------------------------------------------------------------------
+ # Public API: Video
+ # -------------------------------------------------------------------------
- async def video(self, video_link: str) -> VideoInfo:
- """
- Extract video/slideshow data from TikTok URL.
+ async def video(
+ self,
+ video_link: str,
+ _resolved: Optional[tuple[str, str, Optional[str]]] = None,
+ ) -> VideoInfo:
+ """Extract video/slideshow data from TikTok URL.
Args:
video_link: TikTok video or slideshow URL
Returns:
VideoInfo: Object containing video/slideshow information.
- - For videos: data contains bytes, url contains direct video URL
+ - For videos: data contains bytes
- For slideshows: data contains list of image URLs
Raises:
- TikTokDeletedError: Video was deleted by creator
+ TikTokDeletedError: Video was deleted
TikTokPrivateError: Video is private
- TikTokNetworkError: Network/connection error
- TikTokRateLimitError: Too many requests
- TikTokRegionError: Video not available in region
+ TikTokNetworkError: Network error
+ TikTokRateLimitError: Rate limited
+ TikTokRegionError: Video geo-blocked
+ TikTokVideoTooLongError: Video exceeds max duration
TikTokExtractionError: Generic extraction failure
"""
- download_context = None
- context_transferred = False # Track if context ownership was transferred
+ # Pick a proxy for this entire operation
+ current_proxy = self._get_initial_proxy()
try:
- # Get proxy once for the entire request (per-request proxy assignment)
- request_proxy: Optional[str] = None
- if self.proxy_manager:
- request_proxy = self.proxy_manager.get_next_proxy()
- if request_proxy:
- logger.info(f"Video attempt using proxy: {request_proxy}")
- else:
- logger.info("Video attempt using direct connection (no proxy)")
-
- # Resolve short URLs
- full_url = await self._resolve_url(video_link)
- video_id = self._extract_video_id(full_url)
-
- if not video_id:
- logger.error(f"Could not extract video ID from {video_link}")
- raise TikTokExtractionError(
- f"Could not extract video ID from {video_link}"
+ # Step 1: Resolve URL and extract video ID
+ if _resolved:
+ full_url, video_id, current_proxy = _resolved
+ else:
+ full_url, video_id, current_proxy = await self._resolve_and_extract_id(
+ video_link, current_proxy
)
- # Extract raw data with download context for authenticated downloads
- # Pass request_proxy for per-request proxy assignment
- video_data, status, download_context = await self._run_sync(
- self._extract_with_context_sync, full_url, video_id, request_proxy
+ # Step 2: Fetch video info
+ video_data, current_proxy = await self._fetch_video_info_with_retry(
+ full_url, video_id, current_proxy
)
- # Check for error status and raise appropriate exception
- if status and status not in ("ok", None):
- self._raise_for_status(status, video_link)
-
- # Check if it's a slideshow (imagePost present in raw data)
- if video_data:
- image_post = video_data.get("imagePost")
- if image_post:
- images = image_post.get("images", [])
- image_urls = []
-
- for img in images:
- url_list = img.get("imageURL", {}).get("urlList", [])
- if url_list:
- # Use first URL (primary CDN)
- image_urls.append(url_list[0])
-
- if image_urls:
- author = video_data.get("author", {}).get("uniqueId", "")
-
- # Transfer context ownership to VideoInfo
- context_transferred = True
- return VideoInfo(
- type="images",
- data=image_urls,
- id=int(video_id),
- cover=None,
- width=None,
- height=None,
- duration=None,
- author=author,
- link=video_link,
- url=None,
- _download_context=download_context,
- )
-
- # It's a video - extract video URL from raw data and download to memory
- # No need to call _extract_video_info_sync again - we already have the data
-
- if not video_data:
- raise TikTokExtractionError(
- f"Failed to extract video info for {video_link}"
- )
-
- if not download_context:
- raise TikTokExtractionError(
- f"No download context available for {video_link}"
- )
+ # Check for slideshow (imagePost)
+ image_post = video_data.get("imagePost")
+ if image_post:
+ images = image_post.get("images", [])
+ image_urls = []
+
+ for img in images:
+ url_list = img.get("imageURL", {}).get("urlList", [])
+ if url_list:
+ image_urls.append(url_list[0])
+
+ if image_urls:
+ author = video_data.get("author", {}).get("uniqueId", "")
+ return VideoInfo(
+ type="images",
+ data=image_urls,
+ id=int(video_id),
+ cover=None,
+ width=None,
+ height=None,
+ duration=None,
+ author=author,
+ link=video_link,
+ url=None,
+ _proxy=current_proxy,
+ )
- # Get video info and extract metadata early (needed for download decisions)
+ # It's a video - extract info and download
video_info = video_data.get("video", {})
- # Extract duration BEFORE download - needed for streaming decision
+ # Get duration
duration = video_info.get("duration")
if duration:
duration = int(duration)
- # Check if video exceeds maximum duration (configurable via MAX_VIDEO_DURATION env)
- # This prevents downloading very large files that would strain resources
- # Default: 1800 seconds (30 minutes). Set to 0 to disable limit.
- from data.config import config
-
- perf_config = config.get("performance", {})
- max_video_duration = perf_config.get("max_video_duration", 1800)
- if max_video_duration > 0 and duration and duration > max_video_duration:
- logger.warning(
- f"Video {video_link} exceeds max duration: {duration}s > {max_video_duration}s"
- )
+ # Check max duration
+ if (
+ self.max_video_duration > 0
+ and duration
+ and duration > self.max_video_duration
+ ):
raise TikTokVideoTooLongError(
- f"Video is {duration // 60} minutes long, max allowed is {max_video_duration // 60} minutes"
+ f"Video is {duration // 60} minutes long, "
+ f"max allowed is {self.max_video_duration // 60} minutes"
)
- # Get video URL from raw TikTok data
- # Try multiple paths as TikTok API structure can vary
- video_url = None
-
- # Try playAddr first (primary playback URL)
- play_addr = video_info.get("playAddr")
- if play_addr:
- video_url = play_addr
+ # Get video URL
+ video_url = (
+ video_info.get("playAddr")
+ or video_info.get("downloadAddr")
+ or self._get_video_url_from_bitrate(video_info)
+ )
- # Try downloadAddr (sometimes has better quality)
if not video_url:
- download_addr = video_info.get("downloadAddr")
- if download_addr:
- video_url = download_addr
+ raise TikTokExtractionError("Could not find video URL")
- # Try bitrateInfo for specific quality URLs
- if not video_url:
- bitrate_info = video_info.get("bitrateInfo", [])
- if bitrate_info:
- # Get the best quality (usually first or last)
- for br in bitrate_info:
- play_addr_obj = br.get("PlayAddr", {})
- url_list = play_addr_obj.get("UrlList", [])
- if url_list:
- video_url = url_list[0]
- break
+ # Save extraction proxy before download (download may use different proxy
+ # when data_only_proxy=True, but we want to preserve the extraction proxy
+ # for VideoInfo._proxy which is used for slideshow image downloads)
+ extraction_proxy = current_proxy
- if not video_url:
- logger.error(f"Could not find video URL in raw data for {video_link}")
- raise TikTokExtractionError(
- f"Could not find video URL for {video_link}"
- )
-
- # Download video using curl_cffi with browser impersonation
- # Pass duration for conditional streaming (streams if > 5 minutes)
- video_bytes = await self._download_media_async(
- video_url, download_context, duration=duration
+ # Step 3: Download video
+ video_bytes, _ = await self._download_media_with_retry(
+ video_url, current_proxy, full_url
)
- # Close the download context - it's no longer needed for videos
- # (unlike slideshows where we keep it for image downloads)
- self._close_download_context(download_context)
- context_transferred = True # Mark as handled
-
- if video_bytes is None:
- raise TikTokExtractionError(f"Failed to download video {video_link}")
+ # Extract metadata
+ author = video_data.get("author", {}).get("uniqueId", "")
- # Log successful download with proxy info
- proxy_info = request_proxy or "direct connection"
- logger.info(
- f"Successfully downloaded video {video_id} using proxy: {proxy_info}"
- )
+ # Include metadata for long videos (> threshold)
+ if duration and duration > self.long_video_threshold:
+ cover = video_info.get("cover") or video_info.get("originCover")
+ width = (
+ int(video_info.get("width")) if video_info.get("width") else None
+ )
+ height = (
+ int(video_info.get("height")) if video_info.get("height") else None
+ )
+ else:
+ cover = None
+ width = None
+ height = None
- # Extract remaining metadata from raw data
- width = video_info.get("width")
- height = video_info.get("height")
- author = video_data.get("author", {}).get("uniqueId", "")
- cover = video_info.get("cover") or video_info.get("originCover")
+ logger.info(f"Successfully downloaded video {video_id}")
return VideoInfo(
type="video",
data=video_bytes,
id=int(video_id),
cover=cover,
- width=int(width) if width else None,
- height=int(height) if height else None,
+ width=width,
+ height=height,
duration=duration,
author=author,
link=video_link,
url=video_url,
+ _proxy=extraction_proxy,
)
except TikTokError:
- # Re-raise TikTok errors as-is
raise
- except asyncio.CancelledError:
- # Handle cancellation (e.g., from timeout) - ensure cleanup happens
- logger.debug(f"Video extraction cancelled for {video_link}")
- raise
- except aiohttp.ClientError as e:
- logger.error(f"Network error extracting video {video_link}: {e}")
- raise TikTokNetworkError(f"Network error: {e}") from e
except Exception as e:
logger.error(f"Error extracting video {video_link}: {e}")
raise TikTokExtractionError(f"Failed to extract video: {e}") from e
- finally:
- # Clean up download context if ownership wasn't transferred
- if not context_transferred:
- self._close_download_context(download_context)
+
+ def _get_video_url_from_bitrate(self, video_info: dict[str, Any]) -> Optional[str]:
+ """Extract video URL from bitrateInfo."""
+ for br in video_info.get("bitrateInfo", []):
+ play_addr = br.get("PlayAddr", {})
+ url_list = play_addr.get("UrlList", [])
+ if url_list:
+ return url_list[0]
+ return None
async def video_with_retry(
self,
video_link: str,
max_attempts: int = 3,
- request_timeout: float = 10.0,
+ request_timeout: float = 30.0,
base_delay: float = 1.0,
- on_retry: Callable[[int], Awaitable[None]] | None = None,
+ on_retry: Optional[Callable[[int], Awaitable[None]]] = None,
) -> VideoInfo:
- """
- Extract video info with retry logic and per-request timeout.
-
- Each request times out after `request_timeout` seconds. On timeout or
- transient errors (network, rate limit, extraction), retries with exponential
- backoff. Does NOT retry on permanent errors (deleted, private, region).
+ """Extract video info with top-level retry logic and timeout.
Args:
video_link: TikTok video URL
- max_attempts: Maximum number of attempts (default: 3)
- request_timeout: Timeout per request in seconds (default: 10)
- base_delay: Base delay for exponential backoff in seconds (default: 1.0)
- on_retry: Optional async callback called with attempt number (1, 2, 3...)
- before each attempt. Use for updating status (e.g., emoji reactions).
+ max_attempts: Maximum attempts
+ request_timeout: Timeout per attempt in seconds
+ base_delay: Base delay for exponential backoff
+ on_retry: Optional async callback(attempt_number) before each attempt
Returns:
- VideoInfo object containing video/slideshow data
+ VideoInfo object
Raises:
- TikTokDeletedError: Video was deleted (not retried)
- TikTokPrivateError: Video is private (not retried)
- TikTokRegionError: Video geo-blocked (not retried)
- TikTokNetworkError: Network error after all retries exhausted
- TikTokRateLimitError: Rate limited after all retries exhausted
- TikTokExtractionError: Extraction error after all retries exhausted
-
- Example:
- async def update_status(attempt: int):
- emojis = ["👀", "🔄", "⏳"]
- await message.react([ReactionTypeEmoji(emoji=emojis[attempt - 1])])
-
- video_info = await client.video_with_retry(
- video_link,
- max_attempts=3,
- request_timeout=10.0,
- on_retry=update_status,
- )
+ TikTokDeletedError, TikTokPrivateError, TikTokRegionError: Not retried
+ TikTokVideoTooLongError: Not retried
+ Other errors: After all retries exhausted
"""
- last_error: Exception | None = None
+ last_error: Optional[Exception] = None
+
+ # Resolve URL once before retry loop (has its own 3 retries)
+ current_proxy = self._get_initial_proxy()
+ try:
+ resolved = await self._resolve_and_extract_id(video_link, current_proxy)
+ except (TikTokNetworkError, TikTokExtractionError) as e:
+ raise TikTokInvalidLinkError(f"Invalid video link: {video_link}") from e
for attempt in range(1, max_attempts + 1):
try:
- # Call status callback before each attempt
if on_retry:
await on_retry(attempt)
async with asyncio.timeout(request_timeout):
- return await self.video(video_link)
+ return await self.video(video_link, _resolved=resolved)
except asyncio.TimeoutError as e:
logger.warning(
- f"Attempt {attempt}/{max_attempts} timed out after "
- f"{request_timeout}s for {video_link}"
+ f"Attempt {attempt}/{max_attempts} timed out for {video_link}"
)
last_error = e
@@ -1364,30 +835,21 @@ async def update_status(attempt: int):
)
last_error = e
- except (TikTokDeletedError, TikTokPrivateError, TikTokRegionError):
- # Permanent errors - don't retry, raise immediately
+ except (
+ TikTokDeletedError,
+ TikTokPrivateError,
+ TikTokRegionError,
+ TikTokVideoTooLongError,
+ ):
raise
except TikTokError:
- # Any other TikTok error - don't retry
raise
- # Exponential backoff with jitter (only if not last attempt)
if attempt < max_attempts:
- # Calculate delay: base_delay * 2^(attempt-1) = 1s, 2s, 4s...
delay = base_delay * (2 ** (attempt - 1))
- # Add jitter (±10%) to prevent thundering herd
- jitter = delay * 0.1 * (2 * random.random() - 1)
- delay = max(0.5, delay + jitter) # Minimum 0.5s delay
- logger.debug(
- f"Retry backoff: sleeping {delay:.2f}s before attempt {attempt + 1}"
- )
- await asyncio.sleep(delay)
-
- # All attempts exhausted
- logger.error(
- f"All {max_attempts} attempts failed for {video_link}: {last_error}"
- )
+ jitter = delay * 0.1 * random.random()
+ await asyncio.sleep(delay + jitter)
if isinstance(last_error, asyncio.TimeoutError):
raise TikTokNetworkError(f"Request timed out after {max_attempts} attempts")
@@ -1395,16 +857,14 @@ async def update_status(attempt: int):
if last_error:
raise last_error
- raise TikTokExtractionError(
- f"Failed to extract video after {max_attempts} attempts"
- )
+ raise TikTokExtractionError(f"Failed after {max_attempts} attempts")
- async def music(self, video_id: int) -> MusicInfo:
- """
- Extract music info from a TikTok video.
+ # -------------------------------------------------------------------------
+ # Public API: Music
+ # -------------------------------------------------------------------------
- All network operations are fully async - uses yt-dlp only for metadata
- extraction, then downloads audio via async aiohttp.
+ async def music(self, video_id: int) -> MusicInfo:
+ """Extract music info from a TikTok video.
Args:
video_id: TikTok video ID
@@ -1413,147 +873,69 @@ async def music(self, video_id: int) -> MusicInfo:
MusicInfo: Object containing music/audio information
Raises:
- TikTokDeletedError: Video was deleted by creator
- TikTokPrivateError: Video is private
- TikTokNetworkError: Network/connection error
- TikTokRateLimitError: Too many requests
- TikTokRegionError: Video not available in region
- TikTokExtractionError: Generic extraction failure
+ TikTokExtractionError: No music found or extraction failed
+ TikTokNetworkError: Network error
"""
- download_context = None
+ current_proxy = self._get_initial_proxy()
+
try:
- # Get proxy once for the entire request (per-request proxy assignment)
- request_proxy: Optional[str] = None
- if self.proxy_manager:
- request_proxy = self.proxy_manager.get_next_proxy()
- if request_proxy:
- logger.info(f"Music attempt using proxy: {request_proxy}")
- else:
- logger.info("Music attempt using direct connection (no proxy)")
-
- # Construct a URL with the video ID
+ # Construct URL
url = f"https://www.tiktok.com/@_/video/{video_id}"
- # Extract with context (keeps YDL alive for authenticated downloads)
- video_data, status, download_context = await self._run_sync(
- self._extract_with_context_sync, url, str(video_id), request_proxy
+ # Fetch video info
+ video_data, current_proxy = await self._fetch_video_info_with_retry(
+ url, str(video_id), current_proxy
)
- # Check for error status and raise appropriate exception
- if status and status not in ("ok", None):
- self._raise_for_status(status, str(video_id))
-
- if video_data is None:
- raise TikTokExtractionError(f"No data returned for video {video_id}")
-
- if not download_context:
- raise TikTokExtractionError(
- f"No download context available for video {video_id}"
- )
-
- # Get music info
- music_info = video_data.get("music")
+ # Extract music info
+ music_info = video_data.get("music", {})
if not music_info:
- raise TikTokExtractionError(f"No music info found for video {video_id}")
+ raise TikTokExtractionError(f"No music info for video {video_id}")
music_url = music_info.get("playUrl")
if not music_url:
- raise TikTokExtractionError(f"No music URL found for video {video_id}")
+ raise TikTokExtractionError(f"No music URL for video {video_id}")
- # Download audio using async aiohttp with yt-dlp context (headers, cookies)
- audio_bytes = await self._download_media_async(music_url, download_context)
- if audio_bytes is None:
- raise TikTokExtractionError(
- f"Failed to download audio for video {video_id}"
- )
-
- # Log successful download with proxy info
- proxy_info = request_proxy or "direct connection"
- logger.info(
- f"Successfully downloaded music from video {video_id} using proxy: {proxy_info}"
+ # Download audio
+ audio_bytes, _ = await self._download_media_with_retry(
+ music_url, current_proxy, url
)
- # Get the music cover URL from music object
- cover_url = (
- music_info.get("coverLarge")
- or music_info.get("coverMedium")
- or music_info.get("coverThumb")
- or ""
- )
+ logger.info(f"Successfully downloaded music from video {video_id}")
return MusicInfo(
data=audio_bytes,
- id=int(video_id),
+ id=video_id,
title=music_info.get("title", ""),
author=music_info.get("authorName", ""),
duration=int(music_info.get("duration", 0)),
- cover=cover_url,
+ cover=(
+ music_info.get("coverLarge")
+ or music_info.get("coverMedium")
+ or music_info.get("coverThumb")
+ or ""
+ ),
)
except TikTokError:
- # Re-raise TikTok errors as-is
raise
- except aiohttp.ClientError as e:
- logger.error(f"Network error extracting music for video {video_id}: {e}")
- raise TikTokNetworkError(f"Network error: {e}") from e
except Exception as e:
logger.error(f"Error extracting music for video {video_id}: {e}")
raise TikTokExtractionError(f"Failed to extract music: {e}") from e
- finally:
- # Always clean up download context
- self._close_download_context(download_context)
async def music_with_retry(
self,
video_id: int,
max_attempts: int = 3,
- request_timeout: float = 10.0,
+ request_timeout: float = 30.0,
base_delay: float = 1.0,
- on_retry: Callable[[int], Awaitable[None]] | None = None,
+ on_retry: Optional[Callable[[int], Awaitable[None]]] = None,
) -> MusicInfo:
- """
- Extract music info with retry logic and per-request timeout.
-
- Each request times out after `request_timeout` seconds. On timeout or
- transient errors (network, rate limit, extraction), retries with exponential
- backoff. Does NOT retry on permanent errors (deleted, private, region).
-
- Args:
- video_id: TikTok video ID
- max_attempts: Maximum number of attempts (default: 3)
- request_timeout: Timeout per request in seconds (default: 10)
- base_delay: Base delay for exponential backoff in seconds (default: 1.0)
- on_retry: Optional async callback called with attempt number (1, 2, 3...)
- before each attempt. Use for updating status (e.g., emoji reactions).
-
- Returns:
- MusicInfo object containing audio data
-
- Raises:
- TikTokDeletedError: Video was deleted (not retried)
- TikTokPrivateError: Video is private (not retried)
- TikTokRegionError: Video geo-blocked (not retried)
- TikTokNetworkError: Network error after all retries exhausted
- TikTokRateLimitError: Rate limited after all retries exhausted
- TikTokExtractionError: Extraction error after all retries exhausted
-
- Example:
- async def update_status(attempt: int):
- emojis = ["👀", "🔄", "⏳"]
- await message.react([ReactionTypeEmoji(emoji=emojis[attempt - 1])])
-
- music_info = await client.music_with_retry(
- video_id,
- max_attempts=3,
- request_timeout=10.0,
- on_retry=update_status,
- )
- """
- last_error: Exception | None = None
+ """Extract music info with top-level retry logic and timeout."""
+ last_error: Optional[Exception] = None
for attempt in range(1, max_attempts + 1):
try:
- # Call status callback before each attempt
if on_retry:
await on_retry(attempt)
@@ -1562,8 +944,7 @@ async def update_status(attempt: int):
except asyncio.TimeoutError as e:
logger.warning(
- f"Attempt {attempt}/{max_attempts} timed out after "
- f"{request_timeout}s for music from video {video_id}"
+ f"Attempt {attempt}/{max_attempts} timed out for music {video_id}"
)
last_error = e
@@ -1573,34 +954,20 @@ async def update_status(attempt: int):
TikTokExtractionError,
) as e:
logger.warning(
- f"Attempt {attempt}/{max_attempts} failed for music from video {video_id}: {e}"
+ f"Attempt {attempt}/{max_attempts} failed for music {video_id}: {e}"
)
last_error = e
except (TikTokDeletedError, TikTokPrivateError, TikTokRegionError):
- # Permanent errors - don't retry, raise immediately
raise
except TikTokError:
- # Any other TikTok error - don't retry
raise
- # Exponential backoff with jitter (only if not last attempt)
if attempt < max_attempts:
- # Calculate delay: base_delay * 2^(attempt-1) = 1s, 2s, 4s...
delay = base_delay * (2 ** (attempt - 1))
- # Add jitter (±10%) to prevent thundering herd
- jitter = delay * 0.1 * (2 * random.random() - 1)
- delay = max(0.5, delay + jitter) # Minimum 0.5s delay
- logger.debug(
- f"Retry backoff: sleeping {delay:.2f}s before attempt {attempt + 1}"
- )
- await asyncio.sleep(delay)
-
- # All attempts exhausted
- logger.error(
- f"All {max_attempts} attempts failed for music from video {video_id}: {last_error}"
- )
+ jitter = delay * 0.1 * random.random()
+ await asyncio.sleep(delay + jitter)
if isinstance(last_error, asyncio.TimeoutError):
raise TikTokNetworkError(f"Request timed out after {max_attempts} attempts")
@@ -1608,9 +975,107 @@ async def update_status(attempt: int):
if last_error:
raise last_error
- raise TikTokExtractionError(
- f"Failed to extract music after {max_attempts} attempts"
- )
+ raise TikTokExtractionError(f"Failed after {max_attempts} attempts")
+
+ # -------------------------------------------------------------------------
+ # Public API: Image Download (for slideshows)
+ # -------------------------------------------------------------------------
+
+ async def download_image(self, image_url: str, video_info: VideoInfo) -> bytes:
+ """Download a slideshow image.
+
+ Uses the proxy stored in VideoInfo from the original video() call.
+
+ Args:
+ image_url: Direct URL to the image
+ video_info: VideoInfo from video() call (contains proxy info)
+
+ Returns:
+ Image bytes
+
+ Raises:
+ TikTokNetworkError: Download failed
+ """
+ # Use proxy from VideoInfo if available and not data_only_proxy
+ proxy = None
+ if not self.data_only_proxy and video_info._proxy:
+ proxy = video_info._proxy
+
+ try:
+ data, _ = await self._download_media_with_retry(
+ image_url, proxy, video_info.link
+ )
+ return data
+ except Exception as e:
+ raise TikTokNetworkError(f"Failed to download image: {e}") from e
+
+ async def detect_image_format(self, image_url: str, video_info: VideoInfo) -> str:
+ """Detect image format using HTTP Range request (first 20 bytes).
+
+ Args:
+ image_url: Direct URL to the image
+ video_info: VideoInfo from video() call
+
+ Returns:
+ File extension: ".jpg", ".webp", ".heic", or ".jpg" (default)
+ """
+ proxy = None
+ if not self.data_only_proxy and video_info._proxy:
+ proxy = video_info._proxy
+
+ session = await self._get_curl_session()
+ headers = self._get_headers(video_info.link)
+ headers["Range"] = "bytes=0-19"
+
+ try:
+ response = await session.get(
+ image_url,
+ headers=headers,
+ cookies=self._cookies,
+ proxy=proxy,
+ timeout=10,
+ allow_redirects=True,
+ )
+
+ if response.status_code in (200, 206):
+ return self._detect_format_from_bytes(response.content)
+ return ".heic"
+
+ except Exception as e:
+ logger.debug(f"Range request failed for {image_url}: {e}")
+ return ".heic"
+
+ @staticmethod
+ def _detect_format_from_bytes(data: bytes) -> str:
+ """Detect image format from magic bytes."""
+ if data.startswith(b"\xff\xd8\xff"):
+ return ".jpg"
+ elif data.startswith(b"RIFF") and len(data) >= 12 and data[8:12] == b"WEBP":
+ return ".webp"
+ elif len(data) >= 12 and (
+ data[4:12] == b"ftypheic" or data[4:12] == b"ftypmif1"
+ ):
+ return ".heic"
+ return ".jpg"
+
+ # -------------------------------------------------------------------------
+ # Legacy Methods (for backwards compatibility)
+ # -------------------------------------------------------------------------
+
+ async def get_video_id_from_mobile(self, link: str) -> Optional[str]:
+ """Extract video ID from mobile URL (legacy method)."""
+ try:
+ full_url = await self._resolve_short_url(link, self._get_initial_proxy())
+ return self._extract_video_id(full_url)
+ except Exception as e:
+ logger.error(f"Failed to get video ID from mobile link: {e}")
+ return None
+
+ async def get_video_id(self, link: str, is_mobile: bool) -> Optional[str]:
+ """Extract video ID from URL (legacy method)."""
+ if not is_mobile:
+ return self._extract_video_id(link)
+ return await self.get_video_id_from_mobile(link)
# Backwards compatibility alias
diff --git a/tiktok_api/exceptions.py b/tiktok_api/exceptions.py
index 704541c..146a2a7 100644
--- a/tiktok_api/exceptions.py
+++ b/tiktok_api/exceptions.py
@@ -47,3 +47,9 @@ class TikTokVideoTooLongError(TikTokError):
"""Video exceeds the maximum allowed duration."""
pass
+
+
+class TikTokInvalidLinkError(TikTokError):
+ """Invalid or unrecognized TikTok video link."""
+
+ pass
diff --git a/tiktok_api/models.py b/tiktok_api/models.py
index eb42db9..52cc3c9 100644
--- a/tiktok_api/models.py
+++ b/tiktok_api/models.py
@@ -2,11 +2,8 @@
from __future__ import annotations
-import logging
from dataclasses import dataclass, field
-from typing import Any, List, Optional, Union
-
-logger = logging.getLogger(__name__)
+from typing import List, Optional, Union
@dataclass
@@ -17,9 +14,9 @@ class VideoInfo:
type: Content type - "video" for videos, "images" for slideshows
data: Video bytes (for videos) or list of image URLs (for slideshows)
id: Unique TikTok video/post ID
- cover: Thumbnail/cover image URL
- width: Video width in pixels (None for slideshows)
- height: Video height in pixels (None for slideshows)
+ cover: Thumbnail/cover image URL (only for videos > 1 minute)
+ width: Video width in pixels (only for videos > 1 minute)
+ height: Video height in pixels (only for videos > 1 minute)
duration: Video duration in seconds (None for slideshows)
author: Author's username
link: Original TikTok link
@@ -37,53 +34,10 @@ class VideoInfo:
link: str
url: Optional[str] = None # Only present for videos
- # Download context for slideshows (set by TikTokClient).
- # Contains yt-dlp YoutubeDL instance and TikTok extractor with cookies/auth
- # already configured from the extraction phase. This allows image downloads
- # to use the same authentication context as the video info extraction.
- # Structure: {'ydl': YoutubeDL, 'ie': TikTokIE, 'referer_url': str}
- _download_context: Optional[dict[str, Any]] = field(default=None, repr=False)
-
- # Track whether close() was called to avoid double-close and __del__ warnings
- _closed: bool = field(default=False, repr=False)
-
- def close(self) -> None:
- """Close the download context and release resources.
-
- Call this method when you're done using the VideoInfo,
- especially for slideshows where the download context is kept alive
- for image downloads.
-
- This is idempotent - calling close() multiple times is safe.
- """
- if self._closed:
- return
-
- if self._download_context and "ydl" in self._download_context:
- try:
- self._download_context["ydl"].close()
- except Exception:
- pass # Ignore errors during cleanup
- self._download_context = None
-
- self._closed = True
-
- def __enter__(self) -> VideoInfo:
- """Enter context manager."""
- return self
-
- def __exit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None:
- """Exit context manager and close resources."""
- self.close()
-
- def __del__(self) -> None:
- """Destructor - warn if resources were not explicitly closed."""
- if self._download_context is not None and not self._closed:
- logger.warning(
- f"VideoInfo(id={self.id}) was garbage collected without close() - "
- "potential resource leak. Call close() or use 'with' statement."
- )
- self.close()
+ # Proxy used for this video extraction (for slideshow image downloads)
+ # This allows image downloads to use the same proxy that successfully
+ # extracted the video info.
+ _proxy: Optional[str] = field(default=None, repr=False)
@property
def is_video(self) -> bool:
diff --git a/tiktok_api/proxy_manager.py b/tiktok_api/proxy_manager.py
index db835eb..3637911 100644
--- a/tiktok_api/proxy_manager.py
+++ b/tiktok_api/proxy_manager.py
@@ -2,8 +2,10 @@
import logging
import os
+import re
import threading
from typing import Optional
+from urllib.parse import quote
logger = logging.getLogger(__name__)
@@ -40,6 +42,33 @@ def __init__(self, proxy_file: str, include_host: bool = False):
self._rotation_lock = threading.Lock()
self._load_proxies(proxy_file, include_host)
+ def _encode_proxy_auth(self, proxy_url: str) -> str:
+ """URL-encode username and password in proxy URL."""
+ from urllib.parse import urlsplit, urlunsplit
+
+ try:
+ parts = urlsplit(proxy_url)
+ except Exception:
+ return proxy_url
+
+ if not parts.username and not parts.password:
+ return proxy_url
+
+ username = quote(parts.username or "", safe="")
+ password = quote(parts.password or "", safe="")
+
+ host = parts.hostname or ""
+ netloc = host
+ if parts.port:
+ netloc = f"{host}:{parts.port}"
+ if parts.username is not None:
+ if parts.password is None:
+ netloc = f"{username}@{netloc}"
+ else:
+ netloc = f"{username}:{password}@{netloc}"
+
+ return urlunsplit((parts.scheme, netloc, parts.path, parts.query, parts.fragment))
+
def _load_proxies(self, file_path: str, include_host: bool) -> None:
"""Load proxies from file.
@@ -70,7 +99,9 @@ def _load_proxies(self, file_path: str, include_host: bool) -> None:
# Skip empty lines and comments
if not line or line.startswith("#"):
continue
- self._proxies.append(line)
+ # URL-encode authentication credentials
+ encoded_proxy = self._encode_proxy_auth(line)
+ self._proxies.append(encoded_proxy)
except Exception as e:
logger.error(f"Failed to load proxy file {file_path}: {e}")