diff --git a/.env.example b/.env.example index 83c8fc8..b855037 100644 --- a/.env.example +++ b/.env.example @@ -16,6 +16,9 @@ DAILY_STATS_MESSAGE_ID=24 BOTSTAT=abcdefg12345 MONETAG_URL=https://example.com/your-monetag-link/ +# Logging settings (optional) +# LOG_LEVEL=INFO # Options: DEBUG, INFO, WARNING, ERROR, CRITICAL + # yt-dlp settings (optional) # YTDLP_COOKIES=cookies.txt diff --git a/.gitignore b/.gitignore index d76896c..f0d4b46 100644 --- a/.gitignore +++ b/.gitignore @@ -9,6 +9,7 @@ config.ini sqlite*.db LICENSE.md cookies.txt +proxies.txt #extensions *.log diff --git a/data/config.py b/data/config.py index 8443bfa..cbbcdc8 100644 --- a/data/config.py +++ b/data/config.py @@ -1,5 +1,6 @@ from __future__ import annotations +import logging import os from json import loads as json_loads from pathlib import Path @@ -44,6 +45,21 @@ def _parse_json_list(key: str) -> list[int]: return [] +def _parse_log_level(key: str, default: str = "INFO") -> int: + """Parse an environment variable as a logging level, returning default if unset/invalid.""" + level_map = { + "DEBUG": logging.DEBUG, + "INFO": logging.INFO, + "WARNING": logging.WARNING, + "ERROR": logging.ERROR, + "CRITICAL": logging.CRITICAL, + } + + default_level = level_map.get(default.upper().strip(), logging.INFO) + value = os.getenv(key, default).upper().strip() + return level_map.get(value, default_level) + + class BotConfig(TypedDict): """Type definition for bot configuration.""" @@ -103,6 +119,18 @@ class PerformanceConfig(TypedDict): max_video_duration: int # Maximum video duration in seconds (0 = no limit) +class TikTokConfig(TypedDict): + """Type definition for TikTok extraction configuration.""" + + cookies_file: str # Path to Netscape-format cookies file (optional) + + +class LoggingConfig(TypedDict): + """Type definition for logging configuration.""" + + log_level: int # Logging level (e.g., logging.INFO, logging.DEBUG) + + class Config(TypedDict): """Type definition for the main configuration.""" 
@@ -112,6 +140,8 @@ class Config(TypedDict): queue: QueueConfig proxy: ProxyConfig performance: PerformanceConfig + tiktok: TikTokConfig + logging: LoggingConfig config: Config = { @@ -159,6 +189,12 @@ class Config(TypedDict): ), "max_video_duration": _parse_int_env("MAX_VIDEO_DURATION", 1800), # 30 minutes }, + "tiktok": { + "cookies_file": os.getenv("YTDLP_COOKIES", ""), + }, + "logging": { + "log_level": _parse_log_level("LOG_LEVEL", "INFO"), + }, } admin_ids: list[int] = config["bot"]["admin_ids"] diff --git a/data/loader.py b/data/loader.py index 3d3eb39..c6932ef 100644 --- a/data/loader.py +++ b/data/loader.py @@ -11,21 +11,32 @@ from data.config import config from data.database import init_db, initialize_database_components -logging.basicConfig(level=logging.INFO, format="%(asctime)s [%(levelname)-5.5s] %(message)s", - handlers=[ - # logging.FileHandler("bot.log"), - logging.StreamHandler() - ]) -logging.getLogger('apscheduler.executors.default').setLevel(logging.WARNING) -logging.getLogger('apscheduler.scheduler').propagate = False -logging.getLogger('aiogram').setLevel(logging.WARNING) - -local_server = AiohttpSession(api=TelegramAPIServer.from_base(config["bot"]["tg_server"])) -bot = Bot(token=config["bot"]["token"], session=local_server, default=DefaultBotProperties(parse_mode=ParseMode.HTML)) +logging.basicConfig( + level=config["logging"]["log_level"], + format="%(asctime)s [%(levelname)-5.5s] %(message)s", + handlers=[ + # logging.FileHandler("bot.log"), + logging.StreamHandler() + ], +) +logging.getLogger("apscheduler.executors.default").setLevel(logging.WARNING) +logging.getLogger("apscheduler.scheduler").propagate = False +logging.getLogger("aiogram").setLevel(logging.WARNING) + +local_server = AiohttpSession( + api=TelegramAPIServer.from_base(config["bot"]["tg_server"]) +) +bot = Bot( + token=config["bot"]["token"], + session=local_server, + default=DefaultBotProperties(parse_mode=ParseMode.HTML), +) dp = Dispatcher(storage=MemoryStorage()) 
-scheduler = AsyncIOScheduler(timezone="America/Los_Angeles", job_defaults={"coalesce": True}) +scheduler = AsyncIOScheduler( + timezone="America/Los_Angeles", job_defaults={"coalesce": True} +) async def setup_db(db_url: str): diff --git a/data/locale/ar.json b/data/locale/ar.json index c63610e..4da392b 100644 --- a/data/locale/ar.json +++ b/data/locale/ar.json @@ -38,6 +38,7 @@ "error_deleted": "تم حذف هذا الفيديو من قبل المنشئ.", "error_private": "هذا الفيديو خاص ولا يمكن الوصول إليه.", "error_region": "هذا الفيديو غير متوفر في منطقتك.", + "error_invalid_link": "رابط الفيديو غير صالح.\nيرجى التحقق من الرابط والمحاولة مرة أخرى.", "error_network": "حدث خطأ في الشبكة.\nيرجى المحاولة مرة أخرى.", "error_rate_limit": "طلبات كثيرة جدًا.\nيرجى الانتظار قليلاً والمحاولة مرة أخرى.", "error_too_long": "هذا الفيديو طويل جدًا.\nالحد الأقصى المسموح به هو 30 دقيقة.", diff --git a/data/locale/en.json b/data/locale/en.json index ec370c4..6f17478 100644 --- a/data/locale/en.json +++ b/data/locale/en.json @@ -38,6 +38,7 @@ "error_deleted": "This video has been deleted by the creator.", "error_private": "This video is private and cannot be accessed.", "error_region": "This video is not available in your region.", + "error_invalid_link": "Invalid video link.\nPlease check the link and try again.", "error_network": "Network error occurred.\nPlease try again.", "error_rate_limit": "Too many requests.\nPlease wait a moment and try again.", "error_too_long": "This video is too long.\nMaximum allowed duration is 30 minutes.", diff --git a/data/locale/hi.json b/data/locale/hi.json index d9c8729..87a965d 100644 --- a/data/locale/hi.json +++ b/data/locale/hi.json @@ -38,6 +38,7 @@ "error_deleted": "यह वीडियो निर्माता द्वारा हटा दिया गया है।", "error_private": "यह वीडियो निजी है और एक्सेस नहीं किया जा सकता।", "error_region": "यह वीडियो आपके क्षेत्र में उपलब्ध नहीं है।", + "error_invalid_link": "अमान्य वीडियो लिंक।\nकृपया लिंक जांचें और फिर से प्रयास करें।", "error_network": "नेटवर्क त्रुटि 
हुई।\nकृपया फिर से प्रयास करें।", "error_rate_limit": "बहुत अधिक अनुरोध।\nकृपया कुछ देर प्रतीक्षा करें और फिर से प्रयास करें।", "error_too_long": "यह वीडियो बहुत लंबा है।\nअधिकतम अनुमत अवधि 30 मिनट है।", diff --git a/data/locale/id.json b/data/locale/id.json index 6097c29..e833703 100644 --- a/data/locale/id.json +++ b/data/locale/id.json @@ -38,6 +38,7 @@ "error_deleted": "Video ini sudah dihapus oleh pembuatnya.", "error_private": "Video ini privat dan tidak bisa diakses.", "error_region": "Video ini tidak tersedia di wilayahmu.", + "error_invalid_link": "Tautan video tidak valid.\nSilakan periksa tautan dan coba lagi.", "error_network": "Terjadi kesalahan jaringan.\nSilakan coba lagi.", "error_rate_limit": "Terlalu banyak permintaan.\nSilakan tunggu sebentar dan coba lagi.", "error_too_long": "Video ini terlalu panjang.\nDurasi maksimum yang diizinkan adalah 30 menit.", diff --git a/data/locale/ru.json b/data/locale/ru.json index 3c27903..47efd14 100644 --- a/data/locale/ru.json +++ b/data/locale/ru.json @@ -38,6 +38,7 @@ "error_deleted": "Это видео было удалено автором.", "error_private": "Это видео приватное и недоступно.", "error_region": "Это видео недоступно в вашем регионе.", + "error_invalid_link": "Неверная ссылка на видео.\nПожалуйста, проверьте ссылку и попробуйте снова.", "error_network": "Произошла сетевая ошибка.\nПожалуйста, попробуйте снова.", "error_rate_limit": "Слишком много запросов.\nПожалуйста, подождите немного и попробуйте снова.", "error_too_long": "Это видео слишком длинное.\nМаксимальная продолжительность — 30 минут.", diff --git a/data/locale/so.json b/data/locale/so.json index 5c2e242..f3a7636 100644 --- a/data/locale/so.json +++ b/data/locale/so.json @@ -38,6 +38,7 @@ "error_deleted": "Fiidiyowgan waxaa tirtiray abuurahiisa.", "error_private": "Fiidiyowgan waa qarsoon yahay mana la heli karo.", "error_region": "Fiidiyowgan laguma heli karo aagaaga.", + "error_invalid_link": "Link-a fiidiyowgu ma shaqeynayo.\nFadlan hubi link-a oo 
isku day mar kale.", "error_network": "Khalad shabakad ayaa dhacay.\nFadlan isku day mar kale.", "error_rate_limit": "Codsiyo badan ayaa la sameeyay.\nFadlan sug wakhti yar oo isku day mar kale.", "error_too_long": "Fiidiyowgan waa dheer yahay.\nMudda ugu badan ee la ogol yahay waa 30 daqiiqo.", diff --git a/data/locale/uk.json b/data/locale/uk.json index c68e610..632b30a 100644 --- a/data/locale/uk.json +++ b/data/locale/uk.json @@ -38,6 +38,7 @@ "error_deleted": "Це відео було видалено автором.", "error_private": "Це відео приватне і недоступне.", "error_region": "Це відео недоступне у вашому регіоні.", + "error_invalid_link": "Невірне посилання на відео.\nБудь ласка, перевірте посилання та спробуйте ще раз.", "error_network": "Сталася мережева помилка.\nБудь ласка, спробуйте ще раз.", "error_rate_limit": "Занадто багато запитів.\nБудь ласка, зачекайте трохи і спробуйте знову.", "error_too_long": "Це відео занадто довге.\nМаксимальна тривалість — 30 хвилин.", diff --git a/data/locale/vi.json b/data/locale/vi.json index dfc3864..11b305f 100644 --- a/data/locale/vi.json +++ b/data/locale/vi.json @@ -38,6 +38,7 @@ "error_deleted": "Video này đã bị người tạo xóa.", "error_private": "Video này ở chế độ riêng tư và không thể truy cập.", "error_region": "Video này không khả dụng ở khu vực của bạn.", + "error_invalid_link": "Liên kết video không hợp lệ.\nVui lòng kiểm tra liên kết và thử lại.", "error_network": "Đã xảy ra lỗi mạng.\nVui lòng thử lại.", "error_rate_limit": "Quá nhiều yêu cầu.\nVui lòng đợi một chút và thử lại.", "error_too_long": "Video này quá dài.\nThời lượng tối đa cho phép là 30 phút.", diff --git a/handlers/get_inline.py b/handlers/get_inline.py index 3920355..474b937 100644 --- a/handlers/get_inline.py +++ b/handlers/get_inline.py @@ -129,9 +129,9 @@ async def update_inline_status(attempt: int): await asyncio.sleep(0.5) try: - # Use queue with bypass_user_limit=True for inline downloads - # Inline downloads bypass the per-user queue limit - async 
with queue.info_queue(user_id, bypass_user_limit=True) as acquired: + # Use queue with bypass=True for inline downloads + # Inline downloads bypass the per-user queue entirely + async with queue.user_queue(user_id, bypass=True) as acquired: if not acquired: # This shouldn't happen with bypass, but handle anyway await bot.edit_message_text( @@ -148,8 +148,6 @@ async def update_inline_status(attempt: int): ) if video_info.is_slideshow: # Process image - # Clean up resources before returning (close YDL context) - video_info.close() return await bot.edit_message_text( inline_message_id=message_id, text=locale[lang]["only_video_supported"] ) @@ -170,10 +168,6 @@ async def update_inline_status(attempt: int): full_name=full_name, ) - # Clean up video_info resources (videos already closed in video() method, - # but call close() for safety - it's idempotent) - video_info.close() - try: # Try to write log into database # Write log into database await add_video( diff --git a/handlers/get_video.py b/handlers/get_video.py index 48a9f07..955c93d 100644 --- a/handlers/get_video.py +++ b/handlers/get_video.py @@ -2,7 +2,7 @@ from aiogram import Router, F from aiogram.exceptions import TelegramBadRequest -from aiogram.types import Message, ReactionTypeEmoji, CallbackQuery +from aiogram.types import Message, ReactionTypeEmoji from aiogram.utils.keyboard import InlineKeyboardBuilder from data.config import locale, second_ids, monetag_url, config @@ -25,19 +25,6 @@ # Note: Must use valid Telegram reaction emojis (🔄 and ⏳ are not valid) RETRY_EMOJIS = ["👀", "🤔", "🙏"] -# Callback data prefix for retry button -RETRY_CALLBACK_PREFIX = "retry_video" - - -def try_again_button(lang: str): - """Create a 'Try Again' button for queue full error.""" - keyb = InlineKeyboardBuilder() - keyb.button( - text=locale[lang]["try_again_button"], - callback_data=RETRY_CALLBACK_PREFIX, - ) - return keyb.as_markup() - @video_router.message(F.text) async def send_tiktok_video(message: Message): @@ -63,12 
+50,12 @@ async def send_tiktok_video(message: Message): else: # Set lang and file mode if in DB lang, file_mode = settings - # Get queue manager and retry config + # Get queue manager queue = QueueManager.get_instance() retry_config = config["queue"] try: - # Check if link is valid + # Check if link is valid (BEFORE queue - quick check) video_link, is_mobile = await api.regex_check(message.text) # If not valid if video_link is None: @@ -77,13 +64,16 @@ async def send_tiktok_video(message: Message): await message.reply(locale[lang]["link_error"]) return - # Check per-user queue limit before proceeding - user_queue_count = queue.get_user_queue_count(message.chat.id) - if user_queue_count >= retry_config["max_user_queue_size"]: + # Check queue limit BEFORE showing reaction + if ( + queue.get_user_queue_count(message.chat.id) + >= retry_config["max_user_queue_size"] + ): if not group_chat: await message.reply( - locale[lang]["error_queue_full"].format(user_queue_count), - reply_markup=try_again_button(lang), + locale[lang]["error_queue_full"].format( + queue.get_user_queue_count(message.chat.id) + ) ) return @@ -114,18 +104,22 @@ async def update_retry_status(attempt: int): except Exception as e: logging.warning(f"Failed to update retry emoji to {emoji}: {e}") - # Acquire info queue slot with per-user limit - async with queue.info_queue(message.chat.id) as acquired: + # ENTIRE operation inside queue - waits silently for turn + async with queue.user_queue(message.chat.id) as acquired: if not acquired: - # User limit exceeded (shouldn't happen due to pre-check, but handle anyway) + # Queue full (race condition - another request got in first) if status_message: await status_message.delete() + else: + try: + await message.react([]) + except TelegramBadRequest: + pass if not group_chat: await message.reply( locale[lang]["error_queue_full"].format( queue.get_user_queue_count(message.chat.id) - ), - reply_markup=try_again_button(lang), + ) ) return @@ -150,19 +144,16 @@ 
async def update_retry_status(attempt: int): await message.reply(get_error_message(e, lang)) return - # Successfully got video info - show processing emoji - if not status_message: - try: - await message.react( - [ReactionTypeEmoji(emoji="👨‍💻")], disable_notification=True - ) - except TelegramBadRequest: - logging.debug("Failed to set processing reaction") + # Successfully got video info - show processing emoji + if not status_message: + try: + await message.react( + [ReactionTypeEmoji(emoji="👨‍💻")], disable_notification=True + ) + except TelegramBadRequest: + logging.debug("Failed to set processing reaction") - # Use try/finally to ensure video_info resources are cleaned up - # (especially download context for slideshows) - try: - # Send video/images (no global send queue - per-user limit only) + # Send video/images (INSIDE queue - next request waits) if video_info.is_slideshow: # Process images # Send upload image action await bot.send_chat_action( @@ -260,9 +251,6 @@ async def update_retry_status(attempt: int): except Exception as e: logging.error("Can't write into database") logging.error(e) - finally: - # Clean up video_info resources (closes YDL context for slideshows) - video_info.close() except Exception as e: # If something went wrong error_text = error_catch(e) @@ -283,32 +271,3 @@ async def update_retry_status(attempt: int): logging.debug("Failed to update UI during error cleanup") except Exception as cleanup_err: logging.warning(f"Unexpected error during cleanup: {cleanup_err}") - - -@video_router.callback_query(F.data == RETRY_CALLBACK_PREFIX) -async def handle_retry_callback(callback: CallbackQuery): - """Handle 'Try Again' button click for queue full error.""" - # Ensure callback.message exists and is accessible - if not callback.message or not hasattr(callback.message, "reply_to_message"): - await callback.answer("Message not accessible", show_alert=True) - return - - # Get the original message that contains the TikTok link - original_message = 
callback.message.reply_to_message - - if not original_message or not original_message.text: - await callback.answer("Original message not found", show_alert=True) - return - - # Delete the error message with the button - try: - if hasattr(callback.message, "delete"): - await callback.message.delete() - except TelegramBadRequest: - logging.debug("Retry button message already deleted") - - # Answer the callback to remove loading state - await callback.answer() - - # Re-process the original message - await send_tiktok_video(original_message) diff --git a/main.py b/main.py index 2698dd2..f34dfd3 100644 --- a/main.py +++ b/main.py @@ -18,13 +18,8 @@ async def main() -> None: await setup_db(config["bot"]["db_url"]) - # Configure TikTokClient executor size for high throughput - # Must be called before any TikTokClient is instantiated - TikTokClient.set_executor_size(config["performance"]["thread_pool_size"]) logging.info( - f"TikTokClient configured: executor={config['performance']['thread_pool_size']} workers, " - f"aiohttp_pool={config['performance']['aiohttp_pool_size']}, " - f"limit_per_host={config['performance']['aiohttp_limit_per_host']}" + f"TikTokClient configured: curl_pool={config['performance']['curl_pool_size']}" ) # Initialize proxy manager if configured @@ -35,6 +30,9 @@ async def main() -> None: ) logging.info("Proxy manager initialized") + # Initialize TikTok client session early to avoid race conditions + await TikTokClient.initialize_session() + scheduler.start() dp.include_routers( user_router, @@ -52,12 +50,12 @@ async def main() -> None: await dp.start_polling(bot) finally: # Cleanup shared resources on shutdown - logging.info("Shutting down: cleaning up TikTokClient resources...") - await TikTokClient.close_curl_session() # curl_cffi session for media downloads - await TikTokClient.close_connector() # aiohttp connector for URL resolution - TikTokClient.shutdown_executor() + logging.info("Shutting down: cleaning up resources...") + await ( + 
TikTokClient.close_curl_session() + ) # curl_cffi session for all TikTok operations await close_http_session() # aiohttp session for thumbnail/cover downloads - logging.info("TikTokClient resources cleaned up") + logging.info("Resources cleaned up") if __name__ == "__main__": diff --git a/misc/queue_manager.py b/misc/queue_manager.py index b685c6a..185c0f4 100644 --- a/misc/queue_manager.py +++ b/misc/queue_manager.py @@ -1,4 +1,4 @@ -"""Global queue manager for controlling concurrent operations.""" +"""Sequential per-user queue manager.""" from __future__ import annotations @@ -14,138 +14,113 @@ class QueueManager: """ - Singleton queue manager for controlling concurrent operations. + Sequential per-user queue with max depth. - Features: - - Per-user tracking for info queue (limits concurrent requests per user) - - Bypass option for inline downloads + Each user's requests are processed one at a time (sequential). + If user exceeds max_queue_size, new requests are rejected. Usage: queue = QueueManager.get_instance() - # For regular video downloads (with per-user limit): - async with queue.info_queue(user_id) as acquired: + async with queue.user_queue(user_id) as acquired: if not acquired: - # User limit exceeded, show error + # Queue full, show error return - video_info = await api.video_with_retry(...) - - # For inline downloads (bypass per-user limit): - async with queue.info_queue(user_id, bypass_user_limit=True) as acquired: - video_info = await api.video_with_retry(...) + # Process video (fetch + send) - next request waits """ _instance: QueueManager | None = None - def __init__(self, max_user_queue: int): - """ - Initialize the queue manager. 
- - Args: - max_user_queue: Maximum videos per user in info queue - """ - self.max_user_queue = max_user_queue - self._user_info_counts: dict[int, int] = {} - self._lock = asyncio.Lock() + def __init__(self, max_queue_size: int = 3): + self.max_queue_size = max_queue_size + self._user_locks: dict[int, asyncio.Lock] = {} + self._user_queue_counts: dict[int, int] = {} + self._dict_lock = asyncio.Lock() # Protects dicts - logger.info(f"QueueManager initialized: max_user_queue={max_user_queue}") + logger.info(f"QueueManager initialized: max_queue_size={max_queue_size}") @classmethod def get_instance(cls) -> QueueManager: """Get or create the singleton instance.""" if cls._instance is None: - queue_config = config["queue"] cls._instance = cls( - max_user_queue=queue_config["max_user_queue_size"], + max_queue_size=config["queue"]["max_user_queue_size"], ) return cls._instance @classmethod def reset_instance(cls) -> None: - """Reset the singleton instance (useful for testing).""" + """Reset singleton (for testing).""" cls._instance = None def get_user_queue_count(self, user_id: int) -> int: """Get current queue count for a user.""" - return self._user_info_counts.get(user_id, 0) - - async def acquire_info_for_user( - self, user_id: int, bypass_user_limit: bool = False - ) -> bool: - """ - Acquire info slot for a user. 
- - Args: - user_id: Telegram user/chat ID - bypass_user_limit: If True, skip per-user limit check (for inline) - - Returns: - True if acquired successfully, False if user limit exceeded - """ - async with self._lock: - if not bypass_user_limit: - current_count = self._user_info_counts.get(user_id, 0) - if current_count >= self.max_user_queue: - logger.debug( - f"User {user_id} rejected: {current_count}/{self.max_user_queue} in queue" - ) - return False - - # Increment user count - self._user_info_counts[user_id] = self._user_info_counts.get(user_id, 0) + 1 - - logger.debug( - f"User {user_id} acquired info slot " - f"(user_count={self._user_info_counts.get(user_id, 0)})" - ) - return True - - async def release_info_for_user(self, user_id: int) -> None: - """Release info slot for a user. - - This method is async to properly acquire the lock and prevent - race conditions when multiple coroutines release concurrently. - """ - async with self._lock: - if user_id in self._user_info_counts: - self._user_info_counts[user_id] -= 1 - if self._user_info_counts[user_id] <= 0: - del self._user_info_counts[user_id] - - logger.debug( - f"User {user_id} released info slot " - f"(user_count={self._user_info_counts.get(user_id, 0)})" - ) + return self._user_queue_counts.get(user_id, 0) + + async def _get_user_lock(self, user_id: int) -> asyncio.Lock: + """Get or create lock for user.""" + async with self._dict_lock: + if user_id not in self._user_locks: + self._user_locks[user_id] = asyncio.Lock() + return self._user_locks[user_id] + + async def _increment_count(self, user_id: int) -> bool: + """Increment queue count. 
Returns False if at max.""" + async with self._dict_lock: + current = self._user_queue_counts.get(user_id, 0) + if current >= self.max_queue_size: + return False + self._user_queue_counts[user_id] = current + 1 + logger.debug(f"User {user_id} queue: {current + 1}/{self.max_queue_size}") + return True + + async def _decrement_count(self, user_id: int) -> None: + """Decrement queue count and cleanup lock if no longer needed.""" + async with self._dict_lock: + if user_id in self._user_queue_counts: + self._user_queue_counts[user_id] -= 1 + if self._user_queue_counts[user_id] <= 0: + del self._user_queue_counts[user_id] + # Clean up lock when user has no active requests + # This prevents unbounded memory growth with many unique users + if user_id in self._user_locks: + del self._user_locks[user_id] + logger.debug( + f"User {user_id} queue: {self._user_queue_counts.get(user_id, 0)}/{self.max_queue_size}" + ) @asynccontextmanager - async def info_queue( - self, user_id: int, bypass_user_limit: bool = False + async def user_queue( + self, user_id: int, bypass: bool = False ) -> AsyncGenerator[bool, None]: """ - Context manager for info queue with per-user limiting. + Sequential queue with max depth. Args: user_id: Telegram user/chat ID - bypass_user_limit: If True, skip per-user limit check (for inline) + bypass: If True, skip queue entirely (for inline mode) Yields: - True if acquired successfully, False if user limit exceeded - - Usage: - async with queue.info_queue(user_id) as acquired: - if not acquired: - await message.reply("Queue full, please wait...") - return - # Do work... 
+ True if acquired, False if queue full """ - acquired = await self.acquire_info_for_user(user_id, bypass_user_limit) + if bypass: + yield True + return + + # Check if queue is full BEFORE waiting + if not await self._increment_count(user_id): + yield False # Queue full + return + + # Get user's lock and wait for turn + lock = await self._get_user_lock(user_id) try: - yield acquired + async with lock: + yield True # Acquired, process request finally: - if acquired: - await self.release_info_for_user(user_id) + await self._decrement_count(user_id) @property def active_users_count(self) -> int: - """Number of users currently with items in the info queue.""" - return len(self._user_info_counts) + """Number of users with items in queue.""" + return len(self._user_queue_counts) diff --git a/misc/video_types.py b/misc/video_types.py index a300061..ca45642 100644 --- a/misc/video_types.py +++ b/misc/video_types.py @@ -27,6 +27,7 @@ TikTokRateLimitError, TikTokRegionError, TikTokExtractionError, + TikTokInvalidLinkError, TikTokVideoTooLongError, VideoInfo, MusicInfo, @@ -212,6 +213,8 @@ def get_error_message(error: TikTokError, lang: str) -> str: return lang_dict.get("error_deleted", "This video has been deleted.") elif isinstance(error, TikTokPrivateError): return lang_dict.get("error_private", "This video is private.") + elif isinstance(error, TikTokInvalidLinkError): + return lang_dict.get("error_invalid_link", "Invalid video link.") elif isinstance(error, TikTokNetworkError): return lang_dict.get("error_network", "Network error occurred.") elif isinstance(error, TikTokRateLimitError): diff --git a/tiktok_api/__init__.py b/tiktok_api/__init__.py index 240ab14..41ca412 100644 --- a/tiktok_api/__init__.py +++ b/tiktok_api/__init__.py @@ -1,7 +1,7 @@ """TikTok API client for extracting video and music information. This module provides a clean interface to extract TikTok video/slideshow data -and music information using yt-dlp internally. 
+and music information using curl_cffi with browser impersonation. Example: >>> from tiktok_api import TikTokClient, ProxyManager, VideoInfo, TikTokDeletedError @@ -25,6 +25,7 @@ TikTokDeletedError, TikTokError, TikTokExtractionError, + TikTokInvalidLinkError, TikTokNetworkError, TikTokPrivateError, TikTokRateLimitError, @@ -51,5 +52,6 @@ "TikTokRateLimitError", "TikTokRegionError", "TikTokExtractionError", + "TikTokInvalidLinkError", "TikTokVideoTooLongError", ] diff --git a/tiktok_api/client.py b/tiktok_api/client.py index 03ba449..fece9ae 100644 --- a/tiktok_api/client.py +++ b/tiktok_api/client.py @@ -1,35 +1,27 @@ """TikTok API client for extracting video and music information.""" import asyncio +import http.cookiejar +import json import logging import os import random import re -import threading -from concurrent.futures import ThreadPoolExecutor -from typing import TYPE_CHECKING, Any, Awaitable, Callable, Optional, Tuple - -# Type alias for download progress callback: (bytes_downloaded, total_bytes or None) -ProgressCallback = Callable[[int, Optional[int]], None] - -import aiohttp -from aiohttp import TCPConnector, ClientTimeout -import yt_dlp - -# Dynamic import of yt-dlp bypass mechanisms (updates automatically with yt-dlp) -from yt_dlp.utils import std_headers as YTDLP_STD_HEADERS +from typing import TYPE_CHECKING, Any, Awaitable, Callable, Optional +from urllib.parse import urlparse # curl_cffi for browser impersonation (TLS fingerprint bypass) import curl_cffi from curl_cffi.requests import AsyncSession as CurlAsyncSession from curl_cffi import CurlError -# Import yt-dlp's browser targets for dynamic impersonation -# This ensures impersonation targets update automatically with yt-dlp +# Static imports from yt-dlp - only used for headers and browser targets +# These are static values, no yt-dlp runtime execution +from yt_dlp.utils import std_headers as YTDLP_STD_HEADERS + try: from yt_dlp.networking._curlcffi import BROWSER_TARGETS, _TARGETS_COMPAT_LOOKUP 
except ImportError: - # Fallback if yt-dlp structure changes or curl_cffi not available during import BROWSER_TARGETS = {} _TARGETS_COMPAT_LOOKUP = {} @@ -37,6 +29,7 @@ TikTokDeletedError, TikTokError, TikTokExtractionError, + TikTokInvalidLinkError, TikTokNetworkError, TikTokPrivateError, TikTokRateLimitError, @@ -50,139 +43,111 @@ logger = logging.getLogger(__name__) -# Regex for extracting video ID from redirected URLs (used by legacy get_id functions) -_redirect_regex = re.compile(r"https?://[^\s]+tiktok\.com/[^\s]+?/([0-9]+)") +# Maximum retries for each operation step +MAX_RETRIES = 3 + + +def _strip_proxy_auth(proxy_url: Optional[str]) -> str: + """Strip authentication info from proxy URL for safe logging.""" + if proxy_url is None: + return "direct connection" + + match = re.match(r"^((?:https?|socks5h?|socks4a?)://)(?:[^@]+@)?(.+)$", proxy_url) + if match: + scheme, host_port = match.groups() + return f"{scheme}{host_port}" + + return proxy_url class TikTokClient: """Client for extracting TikTok video and music information. - This client uses yt-dlp internally to extract video/slideshow data and music - from TikTok URLs. It supports both regular videos and slideshows (image posts). - - All media downloads (video, audio, images) use curl_cffi with browser - impersonation (TLS fingerprint spoofing) derived from yt-dlp's BROWSER_TARGETS. - This automatically updates when you update yt-dlp, ensuring compatibility - with TikTok's anti-bot detection. + Uses curl_cffi with browser impersonation for all TikTok operations. + Proxies are sticky within a single video() or music() call - only rotated on retry. Args: - proxy_manager: Optional ProxyManager instance for round-robin proxy rotation. - If provided, each request will use the next proxy in rotation. - data_only_proxy: If True, proxy is used only for API extraction, not for - media downloads. Defaults to False. - cookies: Optional path to a Netscape-format cookies file (e.g., exported from browser). 
- If not provided, uses YTDLP_COOKIES env var. If the file doesn't exist, - a warning is logged and cookies are not used. - aiohttp_pool_size: Total connection pool size for async downloads. Default: 200. - aiohttp_limit_per_host: Per-host connection limit. Default: 50. + proxy_manager: Optional ProxyManager for proxy rotation. + data_only_proxy: If True, proxy is used only for API, not for media downloads. + cookies: Optional path to Netscape-format cookies file. + max_video_duration: Maximum video duration in seconds (default from config). + Videos longer than this raise TikTokVideoTooLongError. + long_video_threshold: Duration threshold in seconds (default: 60 = 1 min). + Videos longer than this include metadata (thumbnail, dimensions) in VideoInfo. Example: >>> from tiktok_api import TikTokClient, ProxyManager >>> proxy_manager = ProxyManager.initialize("proxies.txt", include_host=True) >>> client = TikTokClient(proxy_manager=proxy_manager, data_only_proxy=True) >>> video_info = await client.video("https://www.tiktok.com/@user/video/123") - >>> print(video_info.author) - >>> print(video_info.duration) - - # With cookies for authenticated requests: - >>> client = TikTokClient(cookies="cookies.txt") """ - # Configurable executor - call set_executor_size() before first use - _executor: Optional[ThreadPoolExecutor] = None - _executor_lock = threading.Lock() - _executor_size: int = 128 # Default, configurable via set_executor_size() - - _aiohttp_connector: Optional[TCPConnector] = None - _connector_lock = threading.Lock() - - # curl_cffi session for browser-impersonated media downloads + # Shared curl_cffi session (class-level singleton) _curl_session: Optional[CurlAsyncSession] = None - _curl_session_lock = threading.Lock() _impersonate_target: Optional[str] = None + _session_lock: Optional[asyncio.Lock] = None @classmethod - def _get_impersonate_target(cls) -> str: - """Get the best impersonation target from yt-dlp's BROWSER_TARGETS. 
- - Uses the same priority as yt-dlp: - 1. Prioritize desktop over mobile (non-ios, non-android) - 2. Prioritize Chrome > Safari > Firefox > Edge > Tor - 3. Prioritize newest version + def _get_session_lock(cls) -> asyncio.Lock: + """Get or create session lock. Safe because Lock() creation is atomic.""" + if cls._session_lock is None: + cls._session_lock = asyncio.Lock() + return cls._session_lock - This ensures the impersonation target updates automatically when you - update yt-dlp, without any hardcoded values. - - Returns: - curl_cffi-compatible impersonate string (e.g., "chrome136") - """ - import itertools - - # Get curl_cffi version as tuple for comparison + @classmethod + def _get_impersonate_target(cls) -> str: + """Get the best impersonation target from yt-dlp's BROWSER_TARGETS.""" try: curl_cffi_version = tuple( int(x) for x in curl_cffi.__version__.split(".")[:2] ) except (ValueError, AttributeError): - curl_cffi_version = (0, 9) # Minimum supported version + curl_cffi_version = (0, 9) - # Collect all available targets for our curl_cffi version available_targets: dict[str, Any] = {} for version, targets in BROWSER_TARGETS.items(): if curl_cffi_version >= version: available_targets.update(targets) if not available_targets: - # Fallback to a common target if BROWSER_TARGETS is empty - logger.warning( - "No BROWSER_TARGETS available from yt-dlp, using 'chrome' fallback" - ) + logger.warning("No BROWSER_TARGETS available, using 'chrome' fallback") return "chrome" - # Sort by yt-dlp's priority (same logic as _curlcffi.py) - # This ensures we pick the same target yt-dlp would use + # Sort by yt-dlp's priority (desktop > mobile, Chrome > Safari > Firefox) sorted_targets = sorted( available_targets.items(), key=lambda x: ( - # deprioritize mobile targets since they give very different behavior x[1].os not in ("ios", "android"), - # prioritize tor < edge < firefox < safari < chrome ("tor", "edge", "firefox", "safari", "chrome").index(x[1].client) if x[1].client 
in ("tor", "edge", "firefox", "safari", "chrome") else -1, - # prioritize newest version float(x[1].version) if x[1].version else 0, - # group by os name x[1].os or "", ), reverse=True, ) - # Get the best target name best_name = sorted_targets[0][0] - # Apply compatibility lookup for older curl_cffi versions if curl_cffi_version < (0, 11): best_name = _TARGETS_COMPAT_LOOKUP.get(best_name, best_name) - logger.debug( - f"Selected impersonation target: {best_name} " - f"(curl_cffi {curl_cffi.__version__})" - ) + logger.debug(f"Selected impersonation target: {best_name}") return best_name @classmethod - def _get_curl_session(cls) -> CurlAsyncSession: - """Get or create shared curl_cffi AsyncSession with browser impersonation. + async def _get_curl_session(cls) -> CurlAsyncSession: + """Get or create shared curl_cffi AsyncSession (async-safe). - The session uses yt-dlp's BROWSER_TARGETS to select the best impersonation - target, ensuring TLS fingerprint matches a real browser. - - Pool size is configurable via CURL_POOL_SIZE environment variable. + Uses double-checked locking to ensure only one session is created + even under concurrent first-use from multiple coroutines. 
""" - with cls._curl_session_lock: - # Check if session needs to be created - # Note: CurlAsyncSession doesn't have is_closed, we track via _curl_session being None + # Fast path - already initialized + if cls._curl_session is not None: + return cls._curl_session + + # Slow path - acquire lock and double-check + async with cls._get_session_lock(): if cls._curl_session is None: from data.config import config @@ -191,19 +156,30 @@ def _get_curl_session(cls) -> CurlAsyncSession: cls._impersonate_target = cls._get_impersonate_target() cls._curl_session = CurlAsyncSession( - impersonate=cls._impersonate_target, + impersonate=cls._impersonate_target, # type: ignore[arg-type] max_clients=pool_size, ) logger.info( - f"Created curl_cffi session with impersonate={cls._impersonate_target}, " + f"Created curl_cffi session: impersonate={cls._impersonate_target}, " f"max_clients={pool_size}" ) - return cls._curl_session + return cls._curl_session + + @classmethod + async def initialize_session(cls) -> None: + """Initialize the shared curl_cffi session. + + Call this at application startup to ensure the session is created + before any concurrent requests. This avoids the lock overhead during + normal operation. + """ + await cls._get_curl_session() + logger.info("TikTokClient session initialized") @classmethod async def close_curl_session(cls) -> None: """Close shared curl_cffi session. Call on application shutdown.""" - with cls._curl_session_lock: + async with cls._get_session_lock(): session = cls._curl_session cls._curl_session = None cls._impersonate_target = None @@ -213,1144 +189,639 @@ async def close_curl_session(cls) -> None: except Exception as e: logger.debug(f"Error closing curl_cffi session: {e}") - @classmethod - def set_executor_size(cls, size: int) -> None: - """Set executor size before first use. Call at app startup. - - Args: - size: Number of worker threads for sync yt-dlp extraction calls. - Higher values allow more concurrent extractions. 
- """ - with cls._executor_lock: - if cls._executor is not None: - logger.warning( - f"Executor already created with {cls._executor._max_workers} workers. " - f"set_executor_size({size}) has no effect. Call before first TikTokClient use." - ) - return - cls._executor_size = size - logger.info(f"TikTokClient executor size set to {size}") - - @classmethod - def _get_executor(cls) -> ThreadPoolExecutor: - """Get or create the shared ThreadPoolExecutor.""" - with cls._executor_lock: - if cls._executor is None: - cls._executor = ThreadPoolExecutor( - max_workers=cls._executor_size, - thread_name_prefix="tiktok_sync_", - ) - logger.info( - f"Created TikTokClient executor with {cls._executor_size} workers" - ) - return cls._executor - - @classmethod - def _get_connector(cls, pool_size: int, limit_per_host: int = 50) -> TCPConnector: - """Get or create shared aiohttp connector for URL resolution. - - Note: This is only used for resolving short URLs (vm.tiktok.com, vt.tiktok.com). - Media downloads use curl_cffi for browser impersonation. - - Args: - pool_size: Total connection pool size - limit_per_host: Maximum connections per host - """ - with cls._connector_lock: - if cls._aiohttp_connector is None or cls._aiohttp_connector.closed: - cls._aiohttp_connector = TCPConnector( - limit=pool_size, - limit_per_host=limit_per_host, - ttl_dns_cache=300, - enable_cleanup_closed=True, - force_close=False, # Keep connections alive for reuse - ) - return cls._aiohttp_connector - - @classmethod - async def close_connector(cls) -> None: - """Close shared aiohttp connector. Call on application shutdown.""" - # Grab and clear connector under lock - with cls._connector_lock: - connector = cls._aiohttp_connector - cls._aiohttp_connector = None - # Close outside lock to avoid blocking - if connector and not connector.closed: - await connector.close() - - @classmethod - def shutdown_executor(cls) -> None: - """Shutdown the shared executor. 
Call on application shutdown.""" - with cls._executor_lock: - if cls._executor is not None: - cls._executor.shutdown(wait=False) - cls._executor = None - def __init__( self, proxy_manager: Optional["ProxyManager"] = None, data_only_proxy: bool = False, cookies: Optional[str] = None, + max_video_duration: Optional[int] = None, + long_video_threshold: int = 60, + # Legacy parameters - kept for compatibility but unused aiohttp_pool_size: int = 200, aiohttp_limit_per_host: int = 50, ): self.proxy_manager = proxy_manager self.data_only_proxy = data_only_proxy - self.aiohttp_pool_size = aiohttp_pool_size - self.aiohttp_limit_per_host = aiohttp_limit_per_host - - # Handle cookies with validation - cookies_path = cookies or os.getenv("YTDLP_COOKIES") - if cookies_path: - # Convert relative path to absolute path - if not os.path.isabs(cookies_path): - cookies_path = os.path.abspath(cookies_path) - - if os.path.isfile(cookies_path): - self.cookies = cookies_path - else: - logger.warning( - f"Cookie file not found: {cookies_path} - cookies will not be used" - ) - self.cookies = None + self.long_video_threshold = long_video_threshold + + # Get max_video_duration from config if not specified + if max_video_duration is None: + from data.config import config + + perf_config = config.get("performance", {}) + self.max_video_duration = perf_config.get("max_video_duration", 1800) else: - self.cookies = None + self.max_video_duration = max_video_duration + + # Load cookies from Netscape file + self._cookies: dict[str, str] = {} + if cookies is None: + from data.config import config + + cookies = config.get("tiktok", {}).get("cookies_file", "") + if cookies: + self._load_cookies(cookies) + # URL patterns self.mobile_regex = re.compile(r"https?://[^\s]+tiktok\.com/[^\s]+") self.web_regex = re.compile(r"https?://www\.tiktok\.com/@[^\s]+?/video/[0-9]+") self.photo_regex = re.compile( r"https?://www\.tiktok\.com/@[^\s]+?/photo/[0-9]+" ) - self.mus_regex = 
re.compile(r"https?://www\.tiktok\.com/music/[^\s]+") - def _get_proxy_info(self) -> str: - """Get proxy configuration info for logging.""" - if self.proxy_manager: - count = self.proxy_manager.get_proxy_count() - return f"rotating ({count} proxies)" - return "None" + # Data extraction patterns (compiled for performance) + # Use .*? with DOTALL to handle JSON containing '<' characters (e.g., "I <3 TikTok") + self._universal_data_re = re.compile( + r']+\bid="__UNIVERSAL_DATA_FOR_REHYDRATION__"[^>]*>(.*?)', + re.DOTALL, + ) + self._sigi_state_re = re.compile( + r']+\bid="(?:SIGI_STATE|sigi-persisted-data)"[^>]*>(.*?)', + re.DOTALL, + ) + self._next_data_re = re.compile( + r']+id="__NEXT_DATA__"[^>]*>(.*?)', + re.DOTALL, + ) - def _get_bypass_headers(self, referer_url: str) -> dict[str, str]: - """Get bypass headers dynamically from yt-dlp. + def _load_cookies(self, cookies_path: str) -> None: + """Load cookies from Netscape-format file.""" + if not os.path.isabs(cookies_path): + cookies_path = os.path.abspath(cookies_path) - Uses yt-dlp's standard headers which are updated with each yt-dlp release. - We add Origin and Referer for CORS compliance with TikTok CDN. 
+ if not os.path.isfile(cookies_path): + logger.warning(f"Cookie file not found: {cookies_path}") + return - Args: - referer_url: The referer URL to set in headers + try: + jar = http.cookiejar.MozillaCookieJar(cookies_path) + jar.load(ignore_discard=True, ignore_expires=True) + for cookie in jar: + if "tiktok" in (cookie.domain or "").lower(): + self._cookies[cookie.name] = cookie.value + if self._cookies: + logger.info(f"Loaded {len(self._cookies)} cookies from {cookies_path}") + except Exception as e: + logger.warning(f"Failed to load cookies from {cookies_path}: {e}") - Returns: - Dict of headers for media download - """ - headers = dict(YTDLP_STD_HEADERS) # Copy to avoid mutation + def _get_headers( + self, referer_url: str = "https://www.tiktok.com/" + ) -> dict[str, str]: + """Get request headers with browser-like values.""" + headers = dict(YTDLP_STD_HEADERS) headers["Referer"] = referer_url headers["Origin"] = "https://www.tiktok.com" headers["Accept"] = "*/*" - # Avoid hardcoding Sec-Fetch-* headers; incorrect values can break - # audio/video downloads. Let the browser/client defaults apply. return headers - def _get_cookies_from_context( - self, download_context: dict[str, Any], media_url: Optional[str] = None - ) -> dict[str, str]: - """Extract cookies from yt-dlp context for media download. - - Uses yt-dlp's InfoExtractor to get cookies for the specific media URL. - This properly handles TikTok's cookie propagation to CDN domains. 
- - Args: - download_context: Dict containing 'ydl', 'ie' with YoutubeDL instance - media_url: Optional media URL to get cookies for (for CDN domain matching) + def _get_initial_proxy(self) -> Optional[str]: + """Get initial proxy for a request.""" + if self.proxy_manager: + return self.proxy_manager.get_next_proxy() + return None - Returns: - Dict of cookie name -> value - """ - cookies: dict[str, str] = {} - try: - ie = download_context.get("ie") - if ie: - # Get cookies from TikTok main domain first - tiktok_cookies = ie._get_cookies("https://www.tiktok.com/") - for cookie_name, cookie in tiktok_cookies.items(): - cookies[cookie_name] = cookie.value - - # If media_url is provided, also get cookies for that specific domain - if media_url: - media_cookies = ie._get_cookies(media_url) - for cookie_name, cookie in media_cookies.items(): - cookies[cookie_name] = cookie.value - else: - # Fallback: extract all cookies from cookiejar - ydl = download_context.get("ydl") - if ydl and hasattr(ydl, "cookiejar"): - for cookie in ydl.cookiejar: - cookies[cookie.name] = cookie.value - except Exception as e: - logger.debug(f"Failed to extract cookies from context: {e}") - return cookies + def _rotate_proxy(self, current_proxy: Optional[str]) -> Optional[str]: + """Rotate to next proxy on failure.""" + if self.proxy_manager: + new_proxy = self.proxy_manager.get_next_proxy() + logger.debug( + f"Rotating proxy: {_strip_proxy_auth(current_proxy)} -> " + f"{_strip_proxy_auth(new_proxy)}" + ) + return new_proxy + return current_proxy - async def _download_media_async( - self, - media_url: str, - download_context: dict[str, Any], - duration: Optional[int] = None, - max_retries: int = 3, - base_delay: float = 1.0, - chunk_size: int = 65536, - progress_callback: Optional[ProgressCallback] = None, - ) -> Optional[bytes]: - """Download media asynchronously using curl_cffi with browser impersonation. 
+ # ------------------------------------------------------------------------- + # URL Resolution + # ------------------------------------------------------------------------- - Features: - - Uses yt-dlp's BROWSER_TARGETS for TLS fingerprint impersonation - - Conditional streaming for long videos (> threshold) to reduce memory spikes - - Automatic retry with exponential backoff for CDN failures - - Optional progress callback for download monitoring + async def _resolve_short_url(self, url: str, proxy: Optional[str]) -> str: + """Resolve short URLs (vm.tiktok.com, vt.tiktok.com, /t/) to full URLs.""" + parsed = urlparse(url) + host = (parsed.hostname or "").lower() + path = parsed.path or "" - Args: - media_url: Direct URL to the media on TikTok CDN - download_context: Dict containing 'ydl', 'ie', 'referer_url', and 'proxy' - duration: Video duration in seconds. If > threshold, uses streaming download. - max_retries: Maximum retry attempts for retryable errors (default: 3) - base_delay: Base delay in seconds for exponential backoff (default: 1.0) - chunk_size: Chunk size for streaming downloads in bytes (default: 64KB) - progress_callback: Optional callback(downloaded_bytes, total_bytes) for progress + is_short_url = host in {"vm.tiktok.com", "vt.tiktok.com"} or ( + host in {"www.tiktok.com", "tiktok.com"} and path.startswith("/t/") + ) - Returns: - Media bytes if successful, None otherwise - """ - from data.config import config + if not is_short_url: + return url - perf_config = config.get("performance", {}) - streaming_threshold = perf_config.get("streaming_duration_threshold", 300) + session = await self._get_curl_session() + headers = self._get_headers() - # Use streaming for long videos (> 5 minutes by default) - use_streaming = duration is not None and duration > streaming_threshold + response = await session.get( + url, + headers=headers, + cookies=self._cookies, + proxy=proxy, + timeout=15, + allow_redirects=True, + ) - referer_url = 
download_context.get("referer_url", "https://www.tiktok.com/") - headers = self._get_bypass_headers(referer_url) + final_url = str(response.url) + logger.debug(f"Resolved short URL: {url} -> {final_url}") + return final_url - # Get cookies using yt-dlp's cookie handling for proper domain matching - cookies = self._get_cookies_from_context(download_context, media_url) + def _extract_video_id(self, url: str) -> Optional[str]: + """Extract video ID from TikTok URL.""" + match = re.search(r"/(?:video|photo)/(\d+)", url) + if match: + return match.group(1) + return None - # Use proxy from context unless data_only_proxy is True - proxy = None - if not self.data_only_proxy: - proxy = download_context.get("proxy") + async def _resolve_and_extract_id( + self, video_link: str, proxy: Optional[str] + ) -> tuple[str, str, Optional[str]]: + """Resolve URL and extract video ID with retries. - session = self._get_curl_session() + Returns: + Tuple of (full_url, video_id, current_proxy) + """ + current_proxy = proxy - for attempt in range(1, max_retries + 1): - response = None + for attempt in range(1, MAX_RETRIES + 1): try: - response = await session.get( - media_url, - headers=headers, - cookies=cookies, - proxy=proxy, - timeout=60, - allow_redirects=True, - stream=use_streaming, - ) + full_url = await self._resolve_short_url(video_link, current_proxy) + video_id = self._extract_video_id(full_url) - if response.status_code == 200: - if use_streaming: - # Stream in chunks for long videos to reduce memory spikes. - # Note: We still buffer all chunks in memory before returning. - # This is intentional for simplicity - the streaming reduces - # peak memory during download by not loading the entire response - # at once, but the final join still requires full size in memory. - # For truly large files, consider streaming to disk instead. 
- total_size = response.headers.get("content-length") - total_size = int(total_size) if total_size else None - - chunks: list[bytes] = [] - downloaded = 0 - async for chunk in response.aiter_content(chunk_size): - chunks.append(chunk) - downloaded += len(chunk) - if progress_callback: - progress_callback(downloaded, total_size) - - if use_streaming and duration: - logger.debug( - f"Streamed {downloaded} bytes for {duration}s video" - ) - return b"".join(chunks) - else: - # Direct content for short videos (faster, less overhead) - return response.content - - elif response.status_code in (403, 429, 500, 502, 503, 504): - # Retryable error - CDN issues, rate limiting, etc. - if attempt < max_retries: - delay = base_delay * (2 ** (attempt - 1)) - jitter = delay * 0.1 * random.random() - logger.warning( - f"CDN returned {response.status_code} for {media_url}, " - f"retry {attempt}/{max_retries} after {delay:.1f}s" - ) - await asyncio.sleep(delay + jitter) - continue - else: - logger.error( - f"Media download failed after {max_retries} attempts " - f"with status {response.status_code} for {media_url}" - ) - return None - else: - # Non-retryable error (e.g., 404) - logger.error( - f"Media download failed with status {response.status_code} " - f"for {media_url}" + if not video_id: + raise TikTokExtractionError( + f"Could not extract video ID from {video_link}" ) - return None - except CurlError as e: - if attempt < max_retries: - delay = base_delay * (2 ** (attempt - 1)) + return full_url, video_id, current_proxy + + except (CurlError, TikTokExtractionError) as e: + if attempt < MAX_RETRIES: + current_proxy = self._rotate_proxy(current_proxy) + delay = 1.0 * (2 ** (attempt - 1)) logger.warning( - f"curl_cffi error for {media_url}, " - f"retry {attempt}/{max_retries} after {delay:.1f}s: {e}" + f"URL resolution attempt {attempt}/{MAX_RETRIES} failed: {e}" ) await asyncio.sleep(delay) continue - logger.error( - f"curl_cffi download failed after {max_retries} attempts " - 
f"for {media_url}: {e}" - ) - return None - - except Exception as e: - # Unexpected errors - don't retry - logger.error(f"Unexpected error downloading media {media_url}: {e}") - return None - - finally: - # Ensure response is properly closed to release connection back to pool. - # curl_cffi responses should be closed to prevent connection leaks. - if response is not None: - try: - response.close() - except Exception: - pass # Ignore errors during cleanup - - return None # Should not reach here, but satisfy type checker - - async def regex_check( - self, video_link: str - ) -> Tuple[Optional[str], Optional[bool]]: - """Check if a link matches known TikTok URL patterns. - - Args: - video_link: URL to check - - Returns: - Tuple of (matched_link, is_mobile) where is_mobile indicates if it's - a short/mobile URL. Returns (None, None) if no pattern matches. - """ - if self.web_regex.search(video_link) is not None: - link = self.web_regex.findall(video_link)[0] - return link, False - elif self.photo_regex.search(video_link) is not None: - link = self.photo_regex.findall(video_link)[0] - return link, False - elif self.mobile_regex.search(video_link) is not None: - link = self.mobile_regex.findall(video_link)[0] - return link, True - else: - return None, None + raise TikTokNetworkError(f"Failed to resolve URL: {e}") from e - async def get_video_id_from_mobile(self, link: str) -> Optional[str]: - """Extract video ID from a mobile/short TikTok URL by following redirects. - - This resolves short URLs like vm.tiktok.com or vt.tiktok.com to get the - actual video ID. - - Args: - link: Mobile/short TikTok URL + except TikTokError: + raise - Returns: - Video ID string if found, None otherwise. 
- """ - async with aiohttp.ClientSession() as client: - try: - async with client.get(link, allow_redirects=True) as response: - return response.url.name except Exception as e: - logger.error(f"Failed to get video ID from mobile link {link}: {e}") - return None + raise TikTokExtractionError(f"URL resolution failed: {e}") from e - async def get_video_id(self, link: str, is_mobile: bool) -> Optional[str]: - """Extract video ID from a TikTok URL. + raise TikTokNetworkError(f"URL resolution failed after {MAX_RETRIES} attempts") - Args: - link: TikTok URL (web or mobile) - is_mobile: Whether the link is a mobile/short URL + # ------------------------------------------------------------------------- + # Video Info Extraction + # ------------------------------------------------------------------------- + + def _parse_webpage_data( + self, html: str, video_id: str + ) -> tuple[dict[str, Any], int]: + """Parse video data from TikTok webpage HTML. Returns: - Video ID string if found, None otherwise. + Tuple of (video_data dict, status_code) """ - video_id: Optional[str] = None - if not is_mobile: - matches = _redirect_regex.findall(link) - if matches: - video_id = matches[0] - else: + # Try UNIVERSAL_DATA first (most common format) + match = self._universal_data_re.search(html) + if match: try: - video_id = await self.get_video_id_from_mobile(link) - except Exception: + data = json.loads(match.group(1)) + scope = data.get("__DEFAULT_SCOPE__", {}) + video_detail = scope.get("webapp.video-detail", {}) + status = video_detail.get("statusCode", 0) + item_info = video_detail.get("itemInfo", {}) + video_data = item_info.get("itemStruct", {}) + if video_data: + logger.debug("Parsed video data from UNIVERSAL_DATA") + return video_data, status + except json.JSONDecodeError: pass - return video_id - async def _resolve_url(self, url: str) -> str: - """Resolve short URLs (vm.tiktok.com, vt.tiktok.com) to full URLs. - - Uses shared connector for connection pooling efficiency. 
- """ - if "vm.tiktok.com" in url or "vt.tiktok.com" in url: - connector = self._get_connector( - self.aiohttp_pool_size, self.aiohttp_limit_per_host - ) - timeout = ClientTimeout(total=15, connect=5, sock_read=10) + # Try SIGI_STATE + match = self._sigi_state_re.search(html) + if match: try: - async with aiohttp.ClientSession( - connector=connector, - timeout=timeout, - connector_owner=False, # Don't close shared connector - ) as session: - async with session.get(url, allow_redirects=True) as response: - return str(response.url) - except Exception as e: - logger.error(f"Failed to resolve URL {url}: {e}") - return url - return url + data = json.loads(match.group(1)) + status = data.get("VideoPage", {}).get("statusCode", 0) + video_data = data.get("ItemModule", {}).get(video_id, {}) + if video_data: + logger.debug("Parsed video data from SIGI_STATE") + return video_data, status + except json.JSONDecodeError: + pass - def _extract_video_id(self, url: str) -> Optional[str]: - """Extract video ID from TikTok URL.""" - match = re.search(r"/(?:video|photo)/(\d+)", url) + # Try Next.js data + match = self._next_data_re.search(html) if match: - return match.group(1) - return None + try: + data = json.loads(match.group(1)) + page_props = data.get("props", {}).get("pageProps", {}) + status = page_props.get("statusCode", 0) + video_data = page_props.get("itemInfo", {}).get("itemStruct", {}) + if video_data: + logger.debug("Parsed video data from NEXT_DATA") + return video_data, status + except json.JSONDecodeError: + pass - def _get_ydl_opts( - self, use_proxy: bool = True, explicit_proxy: Any = ... - ) -> dict[str, Any]: - """Get base yt-dlp options. + return {}, -1 - Args: - use_proxy: If True and no explicit_proxy is given, use proxy from config. - If False, no proxy is used (for media downloads when data_only_proxy=True). - explicit_proxy: If provided (including None), use this specific proxy decision - instead of getting one from rotation. 
Pass None to force direct - connection. Uses sentinel default (...) to distinguish "not provided" - from "provided as None". + def _check_status(self, status: int, video_link: str) -> None: + """Check status code and raise appropriate error.""" + if status == 0: + return # Success - Returns: - Dict of yt-dlp options. - """ - opts: dict[str, Any] = { - "quiet": True, - "no_warnings": True, - } - - # Use explicit proxy decision if it was provided (even if None = direct connection) - if explicit_proxy is not ...: - if explicit_proxy is not None: - opts["proxy"] = explicit_proxy - logger.debug(f"Using explicit proxy: {explicit_proxy}") - else: - logger.debug("Using explicit direct connection (no proxy)") - elif use_proxy and self.proxy_manager: - proxy = self.proxy_manager.get_next_proxy() - if proxy is not None: # None means direct connection - opts["proxy"] = proxy - logger.debug(f"Using proxy: {proxy}") - else: - logger.debug("Using direct connection (no proxy)") + if status in (10216, 10222): + raise TikTokPrivateError(f"Video {video_link} is private") + elif status == 10204: + raise TikTokRegionError(f"Video {video_link} is blocked in your region") + elif status == 10000: + raise TikTokDeletedError(f"Video {video_link} was deleted") + elif status != 0 and status != -1: + logger.warning(f"Unknown TikTok status code: {status}") - if self.cookies: - opts["cookiefile"] = self.cookies - logger.debug(f"yt-dlp using cookie file: {self.cookies}") - return opts + async def _fetch_video_info( + self, url: str, video_id: str, proxy: Optional[str] + ) -> dict[str, Any]: + """Fetch and parse video info from TikTok webpage.""" + # Normalize photo URLs to video URLs (TikTok serves slideshow data on /video/ endpoint) + normalized_url = url.replace("/photo/", "/video/") + + session = await self._get_curl_session() + headers = self._get_headers(normalized_url) + + response = await session.get( + normalized_url, + headers=headers, + cookies=self._cookies, + proxy=proxy, + 
timeout=30, + allow_redirects=True, + ) - def _extract_raw_data_sync( - self, url: str, video_id: str, explicit_proxy: Any = ... - ) -> Tuple[Optional[dict[str, Any]], Optional[str]]: - """ - Extract raw TikTok data using yt-dlp's internal API. - This method supports both videos AND slideshows. + if response.status_code != 200: + if response.status_code == 429: + raise TikTokRateLimitError("Rate limited by TikTok") + raise TikTokNetworkError(f"HTTP {response.status_code}") - Args: - url: The TikTok URL to extract - video_id: The video ID extracted from the URL - explicit_proxy: Explicit proxy decision for this request. If provided - (including None), uses this instead of rotation. Pass None - to force direct connection. - - NOTE: This code relies on yt-dlp's private API (_extract_web_data_and_status), - which may change or be removed in future yt-dlp releases. Keep yt-dlp up-to-date - and be prepared to update this code if the private API changes. - """ - ydl_opts = self._get_ydl_opts(use_proxy=True, explicit_proxy=explicit_proxy) + # Check for login redirect + final_url = str(response.url) + if "/login" in urlparse(final_url).path: + raise TikTokPrivateError("TikTok requires login to access this content") - try: - with yt_dlp.YoutubeDL(ydl_opts) as ydl: - # Get the TikTok extractor - ie = ydl.get_info_extractor("TikTok") - ie.set_downloader(ydl) - - # Convert /photo/ to /video/ URL (yt-dlp requirement) - normalized_url = url.replace("/photo/", "/video/") - - # Guard: Check if the private method exists before calling it. - # This method is part of yt-dlp's internal API and may be absent - # in future releases. - if not hasattr(ie, "_extract_web_data_and_status"): - logger.error( - "yt-dlp's TikTok extractor is missing '_extract_web_data_and_status' method. " - f"Current yt-dlp version: {yt_dlp.version.__version__}. 
" - "Please update yt-dlp to a compatible version: pip install -U yt-dlp" - ) - raise TikTokExtractionError( - "Incompatible yt-dlp version: missing required internal method. " - "Please update yt-dlp: pip install -U yt-dlp" - ) + html = response.text - try: - # Use yt-dlp's internal method to get raw webpage data - video_data, status = ie._extract_web_data_and_status( - normalized_url, video_id - ) - except AttributeError as e: - logger.error( - f"Failed to call yt-dlp internal method: {e}. " - f"Current yt-dlp version: {yt_dlp.version.__version__}. " - "Please update yt-dlp: pip install -U yt-dlp" - ) - raise TikTokExtractionError( - "Incompatible yt-dlp version. Please update yt-dlp: pip install -U yt-dlp" - ) from e + # Debug logging - also serves as a timing buffer for curl_cffi response handling + # (removing this logging has caused intermittent extraction failures) + has_universal = "__UNIVERSAL_DATA_FOR_REHYDRATION__" in html + has_sigi = "SIGI_STATE" in html or "sigi-persisted-data" in html + has_next = "__NEXT_DATA__" in html + logger.debug( + f"Fetched {len(html)} bytes, patterns: UNIVERSAL={has_universal}, " + f"SIGI={has_sigi}, NEXT={has_next}" + ) - return video_data, status - except yt_dlp.utils.DownloadError as e: - error_msg = str(e).lower() - if ( - "unavailable" in error_msg - or "removed" in error_msg - or "deleted" in error_msg - ): - logger.warning(f"Video appears deleted: {e}") - return None, "deleted" - elif "private" in error_msg: - logger.warning(f"Video is private: {e}") - return None, "private" - elif "rate" in error_msg or "too many" in error_msg or "429" in error_msg: - logger.warning(f"Rate limited: {e}") - return None, "rate_limit" - elif ( - "region" in error_msg - or "geo" in error_msg - or "country" in error_msg - or "not available in your" in error_msg - ): - logger.warning(f"Region blocked: {e}") - return None, "region" - # IP blocked and other errors -> generic extraction error - logger.error( - f"yt-dlp download error for 
video {video_id} ({url}): {e}\n" - f" yt-dlp version: {yt_dlp.version.__version__}\n" - f" Proxy: {self._get_proxy_info()}\n" - f" Cookies: {self.cookies or 'None'}" - ) - return None, "extraction" - except yt_dlp.utils.ExtractorError as e: - error_msg = str(e) - logger.error( - f"yt-dlp extractor error for video {video_id} ({url}): {error_msg}\n" - f" yt-dlp version: {yt_dlp.version.__version__}\n" - f" Proxy: {self._get_proxy_info()}\n" - f" Cookies: {self.cookies or 'None'}" - ) - # Log additional guidance for common issues - if "unable to extract" in error_msg.lower(): - logger.error( - "This may indicate TikTok changed their page structure. " - "Try updating yt-dlp: pip install -U yt-dlp\n" - "If the issue persists, check https://github.com/yt-dlp/yt-dlp/issues" - ) - return None, "extraction" - except TikTokError: - raise - except Exception as e: - logger.error( - f"yt-dlp extraction failed for video {video_id} ({url}): {e}\n" - f" yt-dlp version: {yt_dlp.version.__version__}\n" - f" Error type: {type(e).__name__}", - exc_info=True, - ) - return None, "extraction" + if not any([has_universal, has_sigi, has_next]): + # Log preview at DEBUG level to avoid exposing sensitive data in production logs + logger.debug(f"No data patterns found! HTML preview: {html[:2000]}") - def _extract_with_context_sync( - self, url: str, video_id: str, request_proxy: Any = ... - ) -> Tuple[Optional[dict[str, Any]], Optional[str], Optional[dict[str, Any]]]: - """ - Extract TikTok data and return the download context for later media downloads. + video_data, status = self._parse_webpage_data(html, video_id) - This method keeps the YoutubeDL instance alive so it can be reused for - downloading media (videos, images) with the same auth context. + self._check_status(status, url) - Args: - url: The TikTok URL to extract - video_id: The video ID extracted from the URL - request_proxy: Explicit proxy decision for this request. 
async def _fetch_video_info_with_retry(
    self, url: str, video_id: str, proxy: Optional[str]
) -> tuple[dict[str, Any], Optional[str]]:
    """Fetch video info with retries, rotating proxies between attempts.

    Args:
        url: Full (already resolved) TikTok video URL.
        video_id: Video ID extracted from the URL.
        proxy: Proxy to use for the first attempt (None = direct connection).

    Returns:
        Tuple of (video_data, final_proxy) where final_proxy is the proxy
        that ultimately succeeded (callers reuse it for media downloads).

    Raises:
        TikTokNetworkError / TikTokRateLimitError: transient failure after
            all retries are exhausted.
        TikTokError subclasses: permanent failures, re-raised immediately.
        TikTokExtractionError: any unexpected non-TikTok exception.
    """
    current_proxy = proxy
    last_error: Optional[Exception] = None

    for attempt in range(1, MAX_RETRIES + 1):
        try:
            video_data = await self._fetch_video_info(url, video_id, current_proxy)
            return video_data, current_proxy

        except (TikTokNetworkError, TikTokRateLimitError, CurlError) as e:
            # Transient: rotate proxy and back off exponentially (1s, 2s, 4s...).
            last_error = e
            if attempt < MAX_RETRIES:
                current_proxy = self._rotate_proxy(current_proxy)
                delay = 1.0 * (2 ** (attempt - 1))
                logger.warning(
                    f"Video info fetch attempt {attempt}/{MAX_RETRIES} failed: {e}"
                )
                await asyncio.sleep(delay)
                continue
            raise

        except TikTokError:
            # Permanent TikTok errors (deleted/private/region/...) - no retry.
            raise

        except Exception as e:
            raise TikTokExtractionError(f"Video info fetch failed: {e}") from e

    # Defensive fallback: every loop path above returns or raises, but keep
    # this so a future refactor of the loop cannot fall through silently.
    if last_error:
        raise last_error
    raise TikTokExtractionError("Video info fetch failed")

# -------------------------------------------------------------------------
# Media Download
# -------------------------------------------------------------------------

async def _download_media(
    self,
    media_url: str,
    proxy: Optional[str],
    referer_url: str = "https://www.tiktok.com/",
    timeout: int = 60,
) -> bytes:
    """Download media (video, audio, image) from TikTok CDN.

    Args:
        media_url: Direct CDN URL of the media.
        proxy: Proxy to route the request through (None = direct).
        referer_url: Referer header value; CDN checks it for hotlinking.
        timeout: Request timeout in seconds.

    Returns:
        Raw media bytes.

    Raises:
        TikTokNetworkError: on any non-200 HTTP status.
    """
    session = await self._get_curl_session()
    headers = self._get_headers(referer_url)

    response = await session.get(
        media_url,
        headers=headers,
        cookies=self._cookies,
        proxy=proxy,
        timeout=timeout,
        allow_redirects=True,
    )

    if response.status_code != 200:
        raise TikTokNetworkError(
            f"Media download failed: HTTP {response.status_code}"
        )

    return response.content

async def _download_media_with_retry(
    self,
    media_url: str,
    proxy: Optional[str],
    referer_url: str = "https://www.tiktok.com/",
) -> tuple[bytes, Optional[str]]:
    """Download media with retries. Returns (data, final_proxy).

    When ``data_only_proxy`` is set, media downloads always go direct
    (proxy reserved for data/API requests) and no proxy rotation happens.

    Raises:
        TikTokNetworkError: after all retries are exhausted, or on any
            unexpected error.
    """
    current_proxy = proxy if not self.data_only_proxy else None
    last_error: Optional[Exception] = None

    for attempt in range(1, MAX_RETRIES + 1):
        try:
            data = await self._download_media(media_url, current_proxy, referer_url)
            return data, current_proxy

        except (TikTokNetworkError, CurlError) as e:
            last_error = e
            if attempt < MAX_RETRIES:
                if not self.data_only_proxy:
                    current_proxy = self._rotate_proxy(current_proxy)
                delay = 1.0 * (2 ** (attempt - 1))
                logger.warning(
                    f"Media download attempt {attempt}/{MAX_RETRIES} failed: {e}"
                )
                await asyncio.sleep(delay)
                continue
            raise TikTokNetworkError(f"Media download failed: {e}") from e

        except Exception as e:
            raise TikTokNetworkError(f"Media download failed: {e}") from e

    # Unreachable in practice (loop always returns or raises); kept defensively.
    raise TikTokNetworkError("Media download failed after all retries")

# -------------------------------------------------------------------------
# URL Pattern Matching
# -------------------------------------------------------------------------

async def regex_check(
    self, video_link: str
) -> tuple[Optional[str], Optional[bool]]:
    """Check if a link matches known TikTok URL patterns.

    Patterns are tried in priority order: web video, photo post, mobile
    short link. Each pattern is scanned once with ``findall`` (the previous
    implementation scanned twice via ``search`` + ``findall``); ``findall``
    preserves the original group-extraction semantics, so the returned
    match text is identical.

    Returns:
        Tuple of (matched_link, is_mobile); (None, None) when nothing matches.
    """
    for pattern, is_mobile in (
        (self.web_regex, False),
        (self.photo_regex, False),
        (self.mobile_regex, True),
    ):
        matches = pattern.findall(video_link)
        if matches:
            return matches[0], is_mobile
    return None, None
# -------------------------------------------------------------------------
# Public API: Video
# -------------------------------------------------------------------------

async def video(
    self,
    video_link: str,
    _resolved: Optional[tuple[str, str, Optional[str]]] = None,
) -> VideoInfo:
    """Extract video/slideshow data from TikTok URL.

    Args:
        video_link: TikTok video or slideshow URL
        _resolved: Internal fast path used by video_with_retry - a
            pre-resolved (full_url, video_id, proxy) tuple so the URL is
            not re-resolved on every retry attempt.

    Returns:
        VideoInfo: Object containing video/slideshow information.
        - For videos: data contains bytes
        - For slideshows: data contains list of image URLs

    Raises:
        TikTokDeletedError: Video was deleted
        TikTokPrivateError: Video is private
        TikTokNetworkError: Network error
        TikTokRateLimitError: Rate limited
        TikTokRegionError: Video geo-blocked
        TikTokVideoTooLongError: Video exceeds max duration
        TikTokExtractionError: Generic extraction failure
    """
    # Pick a proxy for this entire operation; it is threaded through each
    # step below so extraction and download share the same exit node.
    current_proxy = self._get_initial_proxy()

    try:
        # Step 1: Resolve URL and extract video ID
        if _resolved:
            full_url, video_id, current_proxy = _resolved
        else:
            full_url, video_id, current_proxy = await self._resolve_and_extract_id(
                video_link, current_proxy
            )

        # Step 2: Fetch video info (retries internally, may rotate proxy)
        video_data, current_proxy = await self._fetch_video_info_with_retry(
            full_url, video_id, current_proxy
        )

        # Check for slideshow (imagePost present => photo post, not a video)
        image_post = video_data.get("imagePost")
        if image_post:
            images = image_post.get("images", [])
            image_urls = []

            for img in images:
                url_list = img.get("imageURL", {}).get("urlList", [])
                if url_list:
                    # First URL in the list is the primary CDN
                    image_urls.append(url_list[0])

            if image_urls:
                author = video_data.get("author", {}).get("uniqueId", "")
                # Images are NOT downloaded here; the caller fetches them
                # later via download_image() using the stored _proxy.
                return VideoInfo(
                    type="images",
                    data=image_urls,
                    id=int(video_id),
                    cover=None,
                    width=None,
                    height=None,
                    duration=None,
                    author=author,
                    link=video_link,
                    url=None,
                    _proxy=current_proxy,
                )
            # NOTE(review): imagePost with an empty URL list falls through to
            # the video path and will typically fail with "Could not find
            # video URL" - intentional? confirm.

        # It's a video - extract info and download
        video_info = video_data.get("video", {})

        # Get duration (needed before download for the length check below)
        duration = video_info.get("duration")
        if duration:
            duration = int(duration)

        # Check max duration (0 disables the limit)
        if (
            self.max_video_duration > 0
            and duration
            and duration > self.max_video_duration
        ):
            raise TikTokVideoTooLongError(
                f"Video is {duration // 60} minutes long, "
                f"max allowed is {self.max_video_duration // 60} minutes"
            )

        # Get video URL - TikTok's API structure varies, so try playAddr,
        # then downloadAddr, then the per-bitrate URL list.
        video_url = (
            video_info.get("playAddr")
            or video_info.get("downloadAddr")
            or self._get_video_url_from_bitrate(video_info)
        )

        if not video_url:
            raise TikTokExtractionError("Could not find video URL")

        # Save extraction proxy before download (download may use different proxy
        # when data_only_proxy=True, but we want to preserve the extraction proxy
        # for VideoInfo._proxy which is used for slideshow image downloads)
        extraction_proxy = current_proxy

        # Step 3: Download video
        video_bytes, _ = await self._download_media_with_retry(
            video_url, current_proxy, full_url
        )

        # Extract metadata
        author = video_data.get("author", {}).get("uniqueId", "")

        # Include cover/dimensions only for long videos (> threshold);
        # short videos ship without them to keep the payload minimal.
        if duration and duration > self.long_video_threshold:
            cover = video_info.get("cover") or video_info.get("originCover")
            width = (
                int(video_info.get("width")) if video_info.get("width") else None
            )
            height = (
                int(video_info.get("height")) if video_info.get("height") else None
            )
        else:
            cover = None
            width = None
            height = None

        logger.info(f"Successfully downloaded video {video_id}")

        return VideoInfo(
            type="video",
            data=video_bytes,
            id=int(video_id),
            cover=cover,
            width=width,
            height=height,
            duration=duration,
            author=author,
            link=video_link,
            url=video_url,
            _proxy=extraction_proxy,
        )

    except TikTokError:
        # Domain errors already carry the right type - propagate untouched.
        raise
    except Exception as e:
        logger.error(f"Error extracting video {video_link}: {e}")
        raise TikTokExtractionError(f"Failed to extract video: {e}") from e

def _get_video_url_from_bitrate(self, video_info: dict[str, Any]) -> Optional[str]:
    """Extract video URL from bitrateInfo.

    Returns the first URL of the first bitrate entry that has one
    (entries appear to be ordered by quality - TODO confirm), or None.
    """
    for br in video_info.get("bitrateInfo", []):
        play_addr = br.get("PlayAddr", {})
        url_list = play_addr.get("UrlList", [])
        if url_list:
            return url_list[0]
    return None
Args: video_link: TikTok video URL - max_attempts: Maximum number of attempts (default: 3) - request_timeout: Timeout per request in seconds (default: 10) - base_delay: Base delay for exponential backoff in seconds (default: 1.0) - on_retry: Optional async callback called with attempt number (1, 2, 3...) - before each attempt. Use for updating status (e.g., emoji reactions). + max_attempts: Maximum attempts + request_timeout: Timeout per attempt in seconds + base_delay: Base delay for exponential backoff + on_retry: Optional async callback(attempt_number) before each attempt Returns: - VideoInfo object containing video/slideshow data + VideoInfo object Raises: - TikTokDeletedError: Video was deleted (not retried) - TikTokPrivateError: Video is private (not retried) - TikTokRegionError: Video geo-blocked (not retried) - TikTokNetworkError: Network error after all retries exhausted - TikTokRateLimitError: Rate limited after all retries exhausted - TikTokExtractionError: Extraction error after all retries exhausted - - Example: - async def update_status(attempt: int): - emojis = ["👀", "🔄", "⏳"] - await message.react([ReactionTypeEmoji(emoji=emojis[attempt - 1])]) - - video_info = await client.video_with_retry( - video_link, - max_attempts=3, - request_timeout=10.0, - on_retry=update_status, - ) + TikTokDeletedError, TikTokPrivateError, TikTokRegionError: Not retried + TikTokVideoTooLongError: Not retried + Other errors: After all retries exhausted """ - last_error: Exception | None = None + last_error: Optional[Exception] = None + + # Resolve URL once before retry loop (has its own 3 retries) + current_proxy = self._get_initial_proxy() + try: + resolved = await self._resolve_and_extract_id(video_link, current_proxy) + except (TikTokNetworkError, TikTokExtractionError) as e: + raise TikTokInvalidLinkError(f"Invalid video link: {video_link}") from e for attempt in range(1, max_attempts + 1): try: - # Call status callback before each attempt if on_retry: await 
on_retry(attempt) async with asyncio.timeout(request_timeout): - return await self.video(video_link) + return await self.video(video_link, _resolved=resolved) except asyncio.TimeoutError as e: logger.warning( - f"Attempt {attempt}/{max_attempts} timed out after " - f"{request_timeout}s for {video_link}" + f"Attempt {attempt}/{max_attempts} timed out for {video_link}" ) last_error = e @@ -1364,30 +835,21 @@ async def update_status(attempt: int): ) last_error = e - except (TikTokDeletedError, TikTokPrivateError, TikTokRegionError): - # Permanent errors - don't retry, raise immediately + except ( + TikTokDeletedError, + TikTokPrivateError, + TikTokRegionError, + TikTokVideoTooLongError, + ): raise except TikTokError: - # Any other TikTok error - don't retry raise - # Exponential backoff with jitter (only if not last attempt) if attempt < max_attempts: - # Calculate delay: base_delay * 2^(attempt-1) = 1s, 2s, 4s... delay = base_delay * (2 ** (attempt - 1)) - # Add jitter (±10%) to prevent thundering herd - jitter = delay * 0.1 * (2 * random.random() - 1) - delay = max(0.5, delay + jitter) # Minimum 0.5s delay - logger.debug( - f"Retry backoff: sleeping {delay:.2f}s before attempt {attempt + 1}" - ) - await asyncio.sleep(delay) - - # All attempts exhausted - logger.error( - f"All {max_attempts} attempts failed for {video_link}: {last_error}" - ) + jitter = delay * 0.1 * random.random() + await asyncio.sleep(delay + jitter) if isinstance(last_error, asyncio.TimeoutError): raise TikTokNetworkError(f"Request timed out after {max_attempts} attempts") @@ -1395,16 +857,14 @@ async def update_status(attempt: int): if last_error: raise last_error - raise TikTokExtractionError( - f"Failed to extract video after {max_attempts} attempts" - ) + raise TikTokExtractionError(f"Failed after {max_attempts} attempts") - async def music(self, video_id: int) -> MusicInfo: - """ - Extract music info from a TikTok video. 
# -------------------------------------------------------------------------
# Public API: Music
# -------------------------------------------------------------------------

async def music(self, video_id: int) -> MusicInfo:
    """Extract music info from a TikTok video.

    Args:
        video_id: TikTok video ID

    Returns:
        MusicInfo: Object containing music/audio information

    Raises:
        TikTokExtractionError: No music found or extraction failed
        TikTokNetworkError: Network error
    """
    current_proxy = self._get_initial_proxy()

    try:
        # Construct a canonical URL from the bare ID (the @_ handle is a
        # placeholder; TikTok resolves the video by ID alone).
        url = f"https://www.tiktok.com/@_/video/{video_id}"

        # Fetch video info (retries internally, may rotate proxy)
        video_data, current_proxy = await self._fetch_video_info_with_retry(
            url, str(video_id), current_proxy
        )

        # Extract music info
        music_info = video_data.get("music", {})
        if not music_info:
            raise TikTokExtractionError(f"No music info for video {video_id}")

        music_url = music_info.get("playUrl")
        if not music_url:
            raise TikTokExtractionError(f"No music URL for video {video_id}")

        # Download audio over the same proxy used for extraction
        audio_bytes, _ = await self._download_media_with_retry(
            music_url, current_proxy, url
        )

        logger.info(f"Successfully downloaded music from video {video_id}")

        return MusicInfo(
            data=audio_bytes,
            id=video_id,
            title=music_info.get("title", ""),
            author=music_info.get("authorName", ""),
            # `or 0` guards against an explicit "duration": null in the API
            # response - dict.get's default only applies to a MISSING key,
            # so int(None) would otherwise raise TypeError and surface as a
            # spurious TikTokExtractionError.
            duration=int(music_info.get("duration") or 0),
            # NOTE(review): title/authorName may likewise be explicit null;
            # MusicInfo presumably tolerates None there - confirm.
            cover=(
                music_info.get("coverLarge")
                or music_info.get("coverMedium")
                or music_info.get("coverThumb")
                or ""
            ),
        )

    except TikTokError:
        # Domain errors already carry the right type - propagate untouched.
        raise
    except Exception as e:
        logger.error(f"Error extracting music for video {video_id}: {e}")
        raise TikTokExtractionError(f"Failed to extract music: {e}") from e
- - Returns: - MusicInfo object containing audio data - - Raises: - TikTokDeletedError: Video was deleted (not retried) - TikTokPrivateError: Video is private (not retried) - TikTokRegionError: Video geo-blocked (not retried) - TikTokNetworkError: Network error after all retries exhausted - TikTokRateLimitError: Rate limited after all retries exhausted - TikTokExtractionError: Extraction error after all retries exhausted - - Example: - async def update_status(attempt: int): - emojis = ["👀", "🔄", "⏳"] - await message.react([ReactionTypeEmoji(emoji=emojis[attempt - 1])]) - - music_info = await client.music_with_retry( - video_id, - max_attempts=3, - request_timeout=10.0, - on_retry=update_status, - ) - """ - last_error: Exception | None = None + """Extract music info with top-level retry logic and timeout.""" + last_error: Optional[Exception] = None for attempt in range(1, max_attempts + 1): try: - # Call status callback before each attempt if on_retry: await on_retry(attempt) @@ -1562,8 +944,7 @@ async def update_status(attempt: int): except asyncio.TimeoutError as e: logger.warning( - f"Attempt {attempt}/{max_attempts} timed out after " - f"{request_timeout}s for music from video {video_id}" + f"Attempt {attempt}/{max_attempts} timed out for music {video_id}" ) last_error = e @@ -1573,34 +954,20 @@ async def update_status(attempt: int): TikTokExtractionError, ) as e: logger.warning( - f"Attempt {attempt}/{max_attempts} failed for music from video {video_id}: {e}" + f"Attempt {attempt}/{max_attempts} failed for music {video_id}: {e}" ) last_error = e except (TikTokDeletedError, TikTokPrivateError, TikTokRegionError): - # Permanent errors - don't retry, raise immediately raise except TikTokError: - # Any other TikTok error - don't retry raise - # Exponential backoff with jitter (only if not last attempt) if attempt < max_attempts: - # Calculate delay: base_delay * 2^(attempt-1) = 1s, 2s, 4s... 
delay = base_delay * (2 ** (attempt - 1)) - # Add jitter (±10%) to prevent thundering herd - jitter = delay * 0.1 * (2 * random.random() - 1) - delay = max(0.5, delay + jitter) # Minimum 0.5s delay - logger.debug( - f"Retry backoff: sleeping {delay:.2f}s before attempt {attempt + 1}" - ) - await asyncio.sleep(delay) - - # All attempts exhausted - logger.error( - f"All {max_attempts} attempts failed for music from video {video_id}: {last_error}" - ) + jitter = delay * 0.1 * random.random() + await asyncio.sleep(delay + jitter) if isinstance(last_error, asyncio.TimeoutError): raise TikTokNetworkError(f"Request timed out after {max_attempts} attempts") @@ -1608,9 +975,107 @@ async def update_status(attempt: int): if last_error: raise last_error - raise TikTokExtractionError( - f"Failed to extract music after {max_attempts} attempts" - ) + raise TikTokExtractionError(f"Failed after {max_attempts} attempts") + + # ------------------------------------------------------------------------- + # Public API: Image Download (for slideshows) + # ------------------------------------------------------------------------- + + async def download_image(self, image_url: str, video_info: VideoInfo) -> bytes: + """Download a slideshow image. + + Uses the proxy stored in VideoInfo from the original video() call. 
+ + Args: + image_url: Direct URL to the image + video_info: VideoInfo from video() call (contains proxy info) + + Returns: + Image bytes + + Raises: + TikTokNetworkError: Download failed + """ + # Use proxy from VideoInfo if available and not data_only_proxy + proxy = None + if not self.data_only_proxy and video_info._proxy: + proxy = video_info._proxy + + try: + data, _ = await self._download_media_with_retry( + image_url, proxy, video_info.link + ) + return data + except Exception as e: + raise TikTokNetworkError(f"Failed to download image: {e}") from e + + async def detect_image_format(self, image_url: str, video_info: VideoInfo) -> str: + """Detect image format using HTTP Range request (first 20 bytes). + + Args: + image_url: Direct URL to the image + video_info: VideoInfo from video() call + + Returns: + File extension: ".jpg", ".webp", ".heic", or ".jpg" (default) + """ + proxy = None + if not self.data_only_proxy and video_info._proxy: + proxy = video_info._proxy + + session = await self._get_curl_session() + headers = self._get_headers(video_info.link) + headers["Range"] = "bytes=0-19" + + try: + response = await session.get( + image_url, + headers=headers, + cookies=self._cookies, + proxy=proxy, + timeout=10, + allow_redirects=True, + ) + + if response.status_code in (200, 206): + return self._detect_format_from_bytes(response.content) + return ".heic" + + except Exception as e: + logger.debug(f"Range request failed for {image_url}: {e}") + return ".heic" + + @staticmethod + def _detect_format_from_bytes(data: bytes) -> str: + """Detect image format from magic bytes.""" + if data.startswith(b"\xff\xd8\xff"): + return ".jpg" + elif data.startswith(b"RIFF") and len(data) >= 12 and data[8:12] == b"WEBP": + return ".webp" + elif len(data) >= 12 and ( + data[4:12] == b"ftypheic" or data[4:12] == b"ftypmif1" + ): + return ".heic" + return ".jpg" + + # ------------------------------------------------------------------------- + # Legacy Methods (for backwards 
compatibility) + # ------------------------------------------------------------------------- + + async def get_video_id_from_mobile(self, link: str) -> Optional[str]: + """Extract video ID from mobile URL (legacy method).""" + try: + full_url = await self._resolve_short_url(link, self._get_initial_proxy()) + return self._extract_video_id(full_url) + except Exception as e: + logger.error(f"Failed to get video ID from mobile link: {e}") + return None + + async def get_video_id(self, link: str, is_mobile: bool) -> Optional[str]: + """Extract video ID from URL (legacy method).""" + if not is_mobile: + return self._extract_video_id(link) + return await self.get_video_id_from_mobile(link) # Backwards compatibility alias diff --git a/tiktok_api/exceptions.py b/tiktok_api/exceptions.py index 704541c..146a2a7 100644 --- a/tiktok_api/exceptions.py +++ b/tiktok_api/exceptions.py @@ -47,3 +47,9 @@ class TikTokVideoTooLongError(TikTokError): """Video exceeds the maximum allowed duration.""" pass + + +class TikTokInvalidLinkError(TikTokError): + """Invalid or unrecognized TikTok video link.""" + + pass diff --git a/tiktok_api/models.py b/tiktok_api/models.py index eb42db9..52cc3c9 100644 --- a/tiktok_api/models.py +++ b/tiktok_api/models.py @@ -2,11 +2,8 @@ from __future__ import annotations -import logging from dataclasses import dataclass, field -from typing import Any, List, Optional, Union - -logger = logging.getLogger(__name__) +from typing import List, Optional, Union @dataclass @@ -17,9 +14,9 @@ class VideoInfo: type: Content type - "video" for videos, "images" for slideshows data: Video bytes (for videos) or list of image URLs (for slideshows) id: Unique TikTok video/post ID - cover: Thumbnail/cover image URL - width: Video width in pixels (None for slideshows) - height: Video height in pixels (None for slideshows) + cover: Thumbnail/cover image URL (only for videos > 1 minute) + width: Video width in pixels (only for videos > 1 minute) + height: Video height in pixels 
(only for videos > 1 minute) duration: Video duration in seconds (None for slideshows) author: Author's username link: Original TikTok link @@ -37,53 +34,10 @@ class VideoInfo: link: str url: Optional[str] = None # Only present for videos - # Download context for slideshows (set by TikTokClient). - # Contains yt-dlp YoutubeDL instance and TikTok extractor with cookies/auth - # already configured from the extraction phase. This allows image downloads - # to use the same authentication context as the video info extraction. - # Structure: {'ydl': YoutubeDL, 'ie': TikTokIE, 'referer_url': str} - _download_context: Optional[dict[str, Any]] = field(default=None, repr=False) - - # Track whether close() was called to avoid double-close and __del__ warnings - _closed: bool = field(default=False, repr=False) - - def close(self) -> None: - """Close the download context and release resources. - - Call this method when you're done using the VideoInfo, - especially for slideshows where the download context is kept alive - for image downloads. - - This is idempotent - calling close() multiple times is safe. - """ - if self._closed: - return - - if self._download_context and "ydl" in self._download_context: - try: - self._download_context["ydl"].close() - except Exception: - pass # Ignore errors during cleanup - self._download_context = None - - self._closed = True - - def __enter__(self) -> VideoInfo: - """Enter context manager.""" - return self - - def __exit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None: - """Exit context manager and close resources.""" - self.close() - - def __del__(self) -> None: - """Destructor - warn if resources were not explicitly closed.""" - if self._download_context is not None and not self._closed: - logger.warning( - f"VideoInfo(id={self.id}) was garbage collected without close() - " - "potential resource leak. Call close() or use 'with' statement." 
- ) - self.close() + # Proxy used for this video extraction (for slideshow image downloads) + # This allows image downloads to use the same proxy that successfully + # extracted the video info. + _proxy: Optional[str] = field(default=None, repr=False) @property def is_video(self) -> bool: diff --git a/tiktok_api/proxy_manager.py b/tiktok_api/proxy_manager.py index db835eb..3637911 100644 --- a/tiktok_api/proxy_manager.py +++ b/tiktok_api/proxy_manager.py @@ -2,8 +2,10 @@ import logging import os +import re import threading from typing import Optional +from urllib.parse import quote logger = logging.getLogger(__name__) @@ -40,6 +42,33 @@ def __init__(self, proxy_file: str, include_host: bool = False): self._rotation_lock = threading.Lock() self._load_proxies(proxy_file, include_host) + def _encode_proxy_auth(self, proxy_url: str) -> str: + """URL-encode username and password in proxy URL.""" + from urllib.parse import urlsplit, urlunsplit + + try: + parts = urlsplit(proxy_url) + except Exception: + return proxy_url + + if not parts.username and not parts.password: + return proxy_url + + username = quote(parts.username or "", safe="") + password = quote(parts.password or "", safe="") + + host = parts.hostname or "" + netloc = host + if parts.port: + netloc = f"{host}:{parts.port}" + if parts.username is not None: + if parts.password is None: + netloc = f"{username}@{netloc}" + else: + netloc = f"{username}:{password}@{netloc}" + + return urlunsplit((parts.scheme, netloc, parts.path, parts.query, parts.fragment)) + def _load_proxies(self, file_path: str, include_host: bool) -> None: """Load proxies from file. 
@@ -70,7 +99,9 @@ def _load_proxies(self, file_path: str, include_host: bool) -> None: # Skip empty lines and comments if not line or line.startswith("#"): continue - self._proxies.append(line) + # URL-encode authentication credentials + encoded_proxy = self._encode_proxy_auth(line) + self._proxies.append(encoded_proxy) except Exception as e: logger.error(f"Failed to load proxy file {file_path}: {e}")