diff --git a/.env.example b/.env.example index f56eb79..da4dce6 100644 --- a/.env.example +++ b/.env.example @@ -1,10 +1,17 @@ +TG_SERVER=http://telegram-bot-api:8081 +TELEGRAM_API_ID=1234567 +TELEGRAM_API_HASH=abc123 + +DB_URL=postgresql://postgres:postgres@db/ttbot-db # tt-bot BOT_TOKEN=12345:abcde ADMIN_IDS=[1234567] # SECOND_IDS=[1234567] JOIN_LOGS=-1234567 STORAGE_CHANNEL_ID=12345 - +# API settings +BOTSTAT=abcdefg12345 +MONETAG_URL=https://example.com/your-monetag-link/ # stats-bot STATS_BOT_TOKEN=12345:abcde STATS_IDS=[-1234567] @@ -12,53 +19,32 @@ STATS_CHAT=-1234567 STATS_MESSAGE_ID=23 DAILY_STATS_MESSAGE_ID=24 -# API settings -BOTSTAT=abcdefg12345 -MONETAG_URL=https://example.com/your-monetag-link/ - # Logging settings (optional) # LOG_LEVEL=INFO # Options: DEBUG, INFO, WARNING, ERROR, CRITICAL # yt-dlp settings (optional) -# YTDLP_COOKIES=cookies.txt +YTDLP_COOKIES=cookies.txt # Proxy settings (load balancing with multiple proxies) # Path to file with proxy list (one proxy URL per line) -# PROXY_FILE=proxies.txt +PROXY_FILE=proxies.txt # Use proxy only for TikTok API requests, not for media downloads # PROXY_DATA_ONLY=false # Include host machine's direct IP in round-robin rotation # PROXY_INCLUDE_HOST=false -# Performance settings (for high-throughput scenarios) -# ThreadPoolExecutor workers for sync yt-dlp extraction (default: 128) -# THREAD_POOL_SIZE=128 -# Total aiohttp connection pool size for URL resolution (default: 200) -# AIOHTTP_POOL_SIZE=200 -# Per-host connection limit (default: 50) -# AIOHTTP_LIMIT_PER_HOST=50 -# Max parallel image downloads per slideshow (default: 20) -# MAX_CONCURRENT_IMAGES=20 -# curl_cffi connection pool size for media downloads (default: 200) -# CURL_POOL_SIZE=200 -# Use streaming for videos longer than this (seconds, default: 300 = 5 min) -# STREAMING_DURATION_THRESHOLD=300 -# Maximum video duration in seconds (default: 1800 = 30 min, 0 = no limit) -# MAX_VIDEO_DURATION=1800 - -# Queue settings (optional, defaults shown) -# MAX_USER_QUEUE_SIZE=3 - # Retry settings - 3-part retry strategy with proxy rotation # Part 1: URL resolution retries (short URLs to full URLs) -# URL_RESOLVE_MAX_RETRIES=3 +URL_RESOLVE_MAX_RETRIES=3 # Part 2: Video info extraction retries (metadata) -# VIDEO_INFO_MAX_RETRIES=3 +VIDEO_INFO_MAX_RETRIES=3 # Part 3: Download retries (video/images/audio) -# DOWNLOAD_MAX_RETRIES=3 - -# Telegram Bot API -TG_SERVER=http://telegram-bot-api:8081 - -# db -DB_URL=postgresql://postgres:postgres@db/ttbot-db +DOWNLOAD_MAX_RETRIES=3 + +# Limits (optional, 0 = no limit) +# Max concurrent videos per user in queue +MAX_USER_QUEUE_SIZE=0 +# Use streaming for videos longer than this (seconds, 0 = never stream) +STREAMING_DURATION_THRESHOLD=300 +# Maximum video duration in seconds (0 = no limit) +MAX_VIDEO_DURATION=0 diff --git a/data/config.py b/data/config.py index 2129c25..c2aedb8 100644 --- a/data/config.py +++ b/data/config.py @@ -118,14 +118,7 @@ class ProxyConfig(TypedDict): class PerformanceConfig(TypedDict): """Type definition for performance configuration.""" - thread_pool_size: int # ThreadPoolExecutor workers for sync yt-dlp calls - aiohttp_pool_size: int # Total aiohttp connection pool size (legacy, for redirects) - aiohttp_limit_per_host: int # Per-host connection limit - max_concurrent_images: int # Max parallel image downloads per slideshow - curl_pool_size: int # curl_cffi max_clients for media downloads - streaming_duration_threshold: ( - int # Use streaming for videos longer than this (seconds) - ) + streaming_duration_threshold: int # Use streaming for videos longer than this (seconds), 0 = never max_video_duration: int # Maximum video duration in seconds (0 = no limit) @@ -173,7 +166,7 @@ class Config(TypedDict): "daily_stats_message_id": os.getenv("DAILY_STATS_MESSAGE_ID", "0"), }, "queue": { - "max_user_queue_size": _parse_int_env("MAX_USER_QUEUE_SIZE", 3), + "max_user_queue_size": _parse_int_env("MAX_USER_QUEUE_SIZE", 0), # 0 = no limit }, "retry": { "url_resolve_max_retries": _parse_int_env("URL_RESOLVE_MAX_RETRIES", 3), @@ -186,15 +179,8 @@ class Config(TypedDict): "include_host": os.getenv("PROXY_INCLUDE_HOST", "false").lower() == "true", }, "performance": { - "thread_pool_size": _parse_int_env("THREAD_POOL_SIZE", 128), - "aiohttp_pool_size": _parse_int_env("AIOHTTP_POOL_SIZE", 200), - "aiohttp_limit_per_host": _parse_int_env("AIOHTTP_LIMIT_PER_HOST", 50), - "max_concurrent_images": _parse_int_env("MAX_CONCURRENT_IMAGES", 20), - "curl_pool_size": _parse_int_env("CURL_POOL_SIZE", 200), - "streaming_duration_threshold": _parse_int_env( - "STREAMING_DURATION_THRESHOLD", 300 - ), - "max_video_duration": _parse_int_env("MAX_VIDEO_DURATION", 1800), # 30 minutes + "streaming_duration_threshold": _parse_int_env("STREAMING_DURATION_THRESHOLD", 300), + "max_video_duration": _parse_int_env("MAX_VIDEO_DURATION", 0), # 0 = no limit }, "logging": { "log_level": _parse_log_level("LOG_LEVEL", "INFO"), diff --git a/handlers/get_music.py b/handlers/get_music.py index 8ed2f21..d0d7414 100644 --- a/handlers/get_music.py +++ b/handlers/get_music.py @@ -24,12 +24,10 @@ async def send_tiktok_sound(callback_query: CallbackQuery): chat_id = call_msg.chat.id video_id = callback_query.data.lstrip("id/") status_message = None - # Api init with proxy support and performance settings + # Api init with proxy support api = TikTokClient( proxy_manager=ProxyManager.get_instance(), data_only_proxy=config["proxy"]["data_only"], - aiohttp_pool_size=config["performance"]["aiohttp_pool_size"], - aiohttp_limit_per_host=config["performance"]["aiohttp_limit_per_host"], ) # Group chat set group_chat = call_msg.chat.type != "private" diff --git a/handlers/get_video.py b/handlers/get_video.py index 1150ee9..47cb205 100644 --- a/handlers/get_video.py +++ b/handlers/get_video.py @@ -41,12 +41,10 @@ def try_again_button(lang: str): @video_router.message(F.text) async def send_tiktok_video(message: Message): - # Api init with proxy support and performance settings + # Api init with proxy support api = TikTokClient( proxy_manager=ProxyManager.get_instance(), data_only_proxy=config["proxy"]["data_only"], - aiohttp_pool_size=config["performance"]["aiohttp_pool_size"], - aiohttp_limit_per_host=config["performance"]["aiohttp_limit_per_host"], ) # Status message var status_message = False @@ -77,15 +75,17 @@ async def send_tiktok_video(message: Message): await message.reply(locale[lang]["link_error"]) return - # Check per-user queue limit before proceeding - user_queue_count = queue.get_user_queue_count(message.chat.id) - if user_queue_count >= queue_config["max_user_queue_size"]: - if not group_chat: - await message.reply( - locale[lang]["error_queue_full"].format(user_queue_count), - reply_markup=try_again_button(lang), - ) - return + # Check per-user queue limit before proceeding (0 = no limit) + max_queue = queue_config["max_user_queue_size"] + if max_queue > 0: + user_queue_count = queue.get_user_queue_count(message.chat.id) + if user_queue_count >= max_queue: + if not group_chat: + await message.reply( + locale[lang]["error_queue_full"].format(user_queue_count), + reply_markup=try_again_button(lang), + ) + return # Try to send initial reaction to show processing started try: diff --git a/main.py b/main.py index 2698dd2..71b43fd 100644 --- a/main.py +++ b/main.py @@ -18,15 +18,6 @@ async def main() -> None: await setup_db(config["bot"]["db_url"]) - # Configure TikTokClient executor size for high throughput - # Must be called before any TikTokClient is instantiated - TikTokClient.set_executor_size(config["performance"]["thread_pool_size"]) - logging.info( - f"TikTokClient configured: executor={config['performance']['thread_pool_size']} workers, " - f"aiohttp_pool={config['performance']['aiohttp_pool_size']}, " - f"limit_per_host={config['performance']['aiohttp_limit_per_host']}" - ) - # Initialize proxy manager if configured if config["proxy"]["proxy_file"]: ProxyManager.initialize( diff --git a/misc/queue_manager.py b/misc/queue_manager.py index b685c6a..a3b3e6d 100644 --- a/misc/queue_manager.py +++ b/misc/queue_manager.py @@ -83,7 +83,7 @@ async def acquire_info_for_user( True if acquired successfully, False if user limit exceeded """ async with self._lock: - if not bypass_user_limit: + if not bypass_user_limit and self.max_user_queue > 0: current_count = self._user_info_counts.get(user_id, 0) if current_count >= self.max_user_queue: logger.debug( diff --git a/misc/video_types.py b/misc/video_types.py index 001a955..36f362b 100644 --- a/misc/video_types.py +++ b/misc/video_types.py @@ -284,9 +284,9 @@ async def send_video_result( if not isinstance(video_data, bytes): raise ValueError("Video data must be bytes for inline messages") - # Download thumbnail for videos > 1 minute + # Download thumbnail for videos > 30 seconds thumbnail = None - if video_duration and video_duration > 60: + if video_duration and video_duration > 30: thumbnail = await download_thumbnail(video_info.cover, video_id) # Upload to storage channel to get file_id @@ -440,35 +440,23 @@ async def download_images_parallel( image_urls: list[str], client: TikTokClient, video_info: VideoInfo, - max_concurrent: int | None = None, ) -> list[bytes | BaseException]: """ - Download multiple images in parallel with concurrency limit. + Download multiple images in parallel with no concurrency limit. - Uses semaphore to prevent overwhelming TikTok CDN while maximizing throughput. + Downloads all images simultaneously for maximum throughput. This is significantly faster than sequential downloads for slideshows. Args: image_urls: List of image URLs to download client: TikTokClient instance for authenticated downloads video_info: VideoInfo containing download context - max_concurrent: Maximum concurrent downloads. If None, uses config value. Returns: List of image bytes (or exceptions for failed downloads) """ - if max_concurrent is None: - perf_config = config.get("performance") - max_concurrent = perf_config["max_concurrent_images"] if perf_config else 20 - - semaphore = asyncio.Semaphore(max_concurrent) - - async def download_with_limit(url: str) -> bytes: - async with semaphore: - return await client.download_image(url, video_info) - return await asyncio.gather( - *[download_with_limit(url) for url in image_urls], + *[client.download_image(url, video_info) for url in image_urls], return_exceptions=True, # Don't fail all if one fails ) diff --git a/tiktok_api/__init__.py b/tiktok_api/__init__.py index b882d73..91665a2 100644 --- a/tiktok_api/__init__.py +++ b/tiktok_api/__init__.py @@ -12,7 +12,6 @@ >>> client = TikTokClient(proxy_manager=proxy_manager, data_only_proxy=True) >>> try: ... video_info = await client.video("https://www.tiktok.com/@user/video/123") - ... print(video_info.author) ... print(video_info.id) ... if video_info.is_video: ... print(f"Duration: {video_info.duration}s") diff --git a/tiktok_api/client.py b/tiktok_api/client.py index ccd422a..c36aab5 100644 --- a/tiktok_api/client.py +++ b/tiktok_api/client.py @@ -159,25 +159,22 @@ class TikTokClient: cookies: Optional path to a Netscape-format cookies file (e.g., exported from browser). If not provided, uses YTDLP_COOKIES env var. If the file doesn't exist, a warning is logged and cookies are not used. - aiohttp_pool_size: Total connection pool size for async downloads. Default: 200. - aiohttp_limit_per_host: Per-host connection limit. Default: 50. Example: >>> from tiktok_api import TikTokClient, ProxyManager >>> proxy_manager = ProxyManager.initialize("proxies.txt", include_host=True) >>> client = TikTokClient(proxy_manager=proxy_manager, data_only_proxy=True) >>> video_info = await client.video("https://www.tiktok.com/@user/video/123") - >>> print(video_info.author) >>> print(video_info.duration) # With cookies for authenticated requests: >>> client = TikTokClient(cookies="cookies.txt") """ - # Configurable executor - call set_executor_size() before first use + # Thread pool for sync yt-dlp extraction calls _executor: Optional[ThreadPoolExecutor] = None _executor_lock = threading.Lock() - _executor_size: int = 128 # Default, configurable via set_executor_size() + _executor_size: int = 500 # High value for maximum throughput _aiohttp_connector: Optional[TCPConnector] = None _connector_lock = threading.Lock() @@ -263,16 +260,12 @@ def _get_curl_session(cls) -> CurlAsyncSession: The session uses yt-dlp's BROWSER_TARGETS to select the best impersonation target, ensuring TLS fingerprint matches a real browser. - - Pool size is configurable via CURL_POOL_SIZE environment variable. """ with cls._curl_session_lock: # Check if session needs to be created # Note: CurlAsyncSession doesn't have is_closed, we track via _curl_session being None if cls._curl_session is None: - perf_config = config.get("performance", {}) - pool_size = perf_config.get("curl_pool_size", 200) - + pool_size = 10000 # High value for maximum throughput cls._impersonate_target = cls._get_impersonate_target() cls._curl_session = CurlAsyncSession( impersonate=cls._impersonate_target, @@ -297,24 +290,6 @@ async def close_curl_session(cls) -> None: except Exception as e: logger.debug(f"Error closing curl_cffi session: {e}") - @classmethod - def set_executor_size(cls, size: int) -> None: - """Set executor size before first use. Call at app startup. - - Args: - size: Number of worker threads for sync yt-dlp extraction calls. - Higher values allow more concurrent extractions. - """ - with cls._executor_lock: - if cls._executor is not None: - logger.warning( - f"Executor already created with {cls._executor._max_workers} workers. " - f"set_executor_size({size}) has no effect. Call before first TikTokClient use." - ) - return - cls._executor_size = size - logger.info(f"TikTokClient executor size set to {size}") - @classmethod def _get_executor(cls) -> ThreadPoolExecutor: """Get or create the shared ThreadPoolExecutor.""" @@ -330,21 +305,17 @@ def _get_executor(cls) -> ThreadPoolExecutor: return cls._executor @classmethod - def _get_connector(cls, pool_size: int, limit_per_host: int = 50) -> TCPConnector: + def _get_connector(cls) -> TCPConnector: """Get or create shared aiohttp connector for URL resolution. Note: This is only used for resolving short URLs (vm.tiktok.com, vt.tiktok.com). Media downloads use curl_cffi for browser impersonation. - - Args: - pool_size: Total connection pool size - limit_per_host: Maximum connections per host """ with cls._connector_lock: if cls._aiohttp_connector is None or cls._aiohttp_connector.closed: cls._aiohttp_connector = TCPConnector( - limit=pool_size, - limit_per_host=limit_per_host, + limit=0, # Unlimited connections + limit_per_host=0, # Unlimited per-host connections ttl_dns_cache=300, enable_cleanup_closed=True, force_close=False, # Keep connections alive for reuse @@ -375,13 +346,9 @@ def __init__( proxy_manager: Optional["ProxyManager"] = None, data_only_proxy: bool = False, cookies: Optional[str] = None, - aiohttp_pool_size: int = 200, - aiohttp_limit_per_host: int = 50, ): self.proxy_manager = proxy_manager self.data_only_proxy = data_only_proxy - self.aiohttp_pool_size = aiohttp_pool_size - self.aiohttp_limit_per_host = aiohttp_limit_per_host # Handle cookies with validation cookies_path = cookies or os.getenv("YTDLP_COOKIES") @@ -729,9 +696,7 @@ async def _resolve_url( if not is_short_url: return url - connector = self._get_connector( - self.aiohttp_pool_size, self.aiohttp_limit_per_host - ) + connector = self._get_connector() timeout = ClientTimeout(total=15, connect=5, sock_read=10) last_error: Optional[Exception] = None @@ -1349,21 +1314,20 @@ async def download_slideshow_images( video_info: VideoInfo, proxy_session: ProxySession, max_retries: Optional[int] = None, - max_concurrent: Optional[int] = None, ) -> list[bytes]: """Download all slideshow images with individual retry per image. Part 3 of the 3-part retry strategy for slideshows. - Downloads all images in parallel. If an individual image fails, - retries only that image with a new proxy (up to max_retries per image). - Successfully downloaded images are kept. + Downloads all images in parallel with no concurrency limit. + If an individual image fails, retries only that image with a + new proxy (up to max_retries per image). Successfully downloaded + images are kept. Args: video_info: VideoInfo object containing image URLs in data field proxy_session: ProxySession for proxy management max_retries: Maximum retry attempts per image (default from config) - max_concurrent: Maximum concurrent downloads (default from config) Returns: List of image bytes in the same order as input URLs @@ -1382,10 +1346,6 @@ async def download_slideshow_images( retry_config = config.get("retry", {}) max_retries = retry_config.get("download_max_retries", 3) - if max_concurrent is None: - perf_config = config.get("performance", {}) - max_concurrent = perf_config.get("max_concurrent_images", 20) - image_urls: list[str] = video_info.data # type: ignore num_images = len(image_urls) @@ -1394,21 +1354,18 @@ async def download_slideshow_images( retry_counts: list[int] = [0] * num_images failed_indices: set[int] = set(range(num_images)) # All start as "to download" - semaphore = asyncio.Semaphore(max_concurrent) - async def download_single_image( index: int, url: str, proxy: Optional[str] ) -> Tuple[int, Optional[bytes], Optional[Exception]]: - """Download a single image with semaphore limiting.""" - async with semaphore: - context_with_proxy = {**video_info._download_context, "proxy": proxy} - try: - result = await self._download_media_async( - url, context_with_proxy, max_retries=1 - ) - return index, result, None - except Exception as e: - return index, None, e + """Download a single image.""" + context_with_proxy = {**video_info._download_context, "proxy": proxy} + try: + result = await self._download_media_async( + url, context_with_proxy, max_retries=1 + ) + return index, result, None + except Exception as e: + return index, None, e # First pass: download all images in parallel proxy = proxy_session.get_proxy() @@ -1661,7 +1618,6 @@ async def video(self, video_link: str) -> VideoInfo: width=None, height=None, duration=None, - author=author, link=video_link, url=None, _download_context=download_context, @@ -1714,7 +1670,6 @@ async def video(self, video_link: str) -> VideoInfo: # Extract remaining metadata from raw data width = video_info_data.get("width") height = video_info_data.get("height") - author = video_data.get("author", {}).get("uniqueId", "") cover = video_info_data.get("cover") or video_info_data.get("originCover") return VideoInfo( @@ -1725,7 +1680,6 @@ async def video(self, video_link: str) -> VideoInfo: width=int(width) if width else None, height=int(height) if height else None, duration=duration, - author=author, link=video_link, url=video_url, ) diff --git a/tiktok_api/models.py b/tiktok_api/models.py index 9d419f3..6d2b155 100644 --- a/tiktok_api/models.py +++ b/tiktok_api/models.py @@ -24,7 +24,6 @@ class VideoInfo: width: Video width in pixels (None for slideshows) height: Video height in pixels (None for slideshows) duration: Video duration in seconds (None for slideshows) - author: Author's username link: Original TikTok link url: Direct video URL (only present for videos, None for slideshows) """ @@ -36,7 +35,6 @@ class VideoInfo: width: Optional[int] height: Optional[int] duration: Optional[int] - author: str link: str url: Optional[str] = None # Only present for videos