
Replace yt-dlp with own downloader based on #56

Closed
karilaa-dev wants to merge 19 commits into main from dev

Conversation

karilaa-dev (Owner) commented Jan 14, 2026

PR Type

Enhancement, Bug fix


Description

  • Replace yt-dlp extraction with curl_cffi parsing

  • Add async-safe shared curl session initialization

  • Implement per-user sequential queue with max depth

  • Encode proxy credentials and add LOG_LEVEL config


Diagram Walkthrough

flowchart LR
  qm["QueueManager: per-user sequential queue"]
  hv["handlers: get_video/get_inline"]
  tc["TikTokClient: resolve/parse/download"]
  pm["ProxyManager: rotating proxies"]
  cs["curl_cffi AsyncSession: impersonation"]
  rs["Short URL resolution"]
  hp["Fetch HTML + parse embedded JSON"]
  md["Download media (video/audio/images)"]
  hv -- "enqueue request" --> qm
  qm -- "run operation" --> tc
  tc -- "get/rotate proxy" --> pm
  tc -- "HTTP with proxy/cookies" --> cs
  cs -- "redirect follow" --> rs
  cs -- "page HTML" --> hp
  cs -- "CDN bytes" --> md

File Walkthrough

Relevant files

Enhancement (6 files)
  client.py: Replace yt-dlp runtime with curl_cffi pipeline (+660/-1210)
  queue_manager.py: Implement sequential per-user queue with limits (+69/-94)
  get_video.py: Switch to new per-user queue flow (+29/-70)
  models.py: Remove download context; store extraction proxy (+8/-54)
  main.py: Initialize TikTokClient session at startup (+9/-11)
  get_inline.py: Update inline mode to bypass new queue (+3/-9)

Configuration changes (2 files)
  config.py: Add logging level and TikTok cookies config (+36/-0)
  loader.py: Configure logging via config-defined log level (+23/-12)

Bug fix (1 file)
  proxy_manager.py: URL-encode proxy auth credentials when loading (+32/-1)

Documentation (2 files)
  __init__.py: Update module docs to curl_cffi approach (+1/-1)
  .env.example: Document optional LOG_LEVEL environment variable (+3/-0)
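The proxy_manager.py fix (URL-encoding auth credentials) matters because a username or password containing characters like `@` or `:` corrupts the authority section of a naively concatenated proxy URL. A minimal sketch of the idea, assuming a separate user/password/host input; the actual parsing in proxy_manager.py may differ, and `build_proxy_url` is an illustrative name, not from the PR:

```python
from urllib.parse import quote

def build_proxy_url(user: str, password: str, host: str, port: int,
                    scheme: str = "http") -> str:
    """Build a proxy URL with percent-encoded credentials.

    Without quote(), a password like "p@ss:w0rd" would inject extra
    "@"/":" separators into the URL and break proxy parsing downstream.
    """
    # safe="" ensures "/" and ":" inside the credential parts are encoded too
    return (
        f"{scheme}://{quote(user, safe='')}:{quote(password, safe='')}"
        f"@{host}:{port}"
    )
```

For example, `build_proxy_url("bot", "p@ss:w0rd", "1.2.3.4", 8080)` yields `http://bot:p%40ss%3Aw0rd@1.2.3.4:8080`, which curl_cffi can parse unambiguously.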

zam-review bot commented Jan 14, 2026

PR Reviewer Guide 🔍

(Review updated until commit 5c8edd4)

Here are some key observations to aid the review process:

⏱️ Estimated effort to review: 4 🔵🔵🔵🔵⚪
🧪 No relevant tests
🔒 No security concerns identified
⚡ Recommended focus areas for review

Resource Leak

Multiple new curl_cffi requests return response objects that are never explicitly closed. If curl_cffi does not always auto-release connections on garbage collection, this can leak connections and eventually exhaust max_clients, causing intermittent hangs/timeouts under load. Validate whether response.close() (or an async close) is required for CurlAsyncSession.get() responses and add deterministic cleanup where needed (short URL resolution, HTML fetch, media download, range request).

    response = await session.get(
        url,
        headers=headers,
        cookies=self._cookies,
        proxy=proxy,
        timeout=15,
        allow_redirects=True,
    )

    final_url = str(response.url)
    logger.debug(f"Resolved short URL: {url} -> {final_url}")
    return final_url

def _extract_video_id(self, url: str) -> Optional[str]:
    """Extract video ID from TikTok URL."""
    match = re.search(r"/(?:video|photo)/(\d+)", url)
    if match:
        return match.group(1)
    return None

async def _resolve_and_extract_id(
    self, video_link: str, proxy: Optional[str]
) -> tuple[str, str, Optional[str]]:
    """Resolve URL and extract video ID with retries.

    Returns:
        Tuple of (full_url, video_id, current_proxy)
    """
    current_proxy = proxy

    for attempt in range(1, MAX_RETRIES + 1):
        try:
            full_url = await self._resolve_short_url(video_link, current_proxy)
            video_id = self._extract_video_id(full_url)

            if not video_id:
                raise TikTokExtractionError(
                    f"Could not extract video ID from {video_link}"
                )

            return full_url, video_id, current_proxy

        except (CurlError, TikTokExtractionError) as e:
            if attempt < MAX_RETRIES:
                current_proxy = self._rotate_proxy(current_proxy)
                delay = 1.0 * (2 ** (attempt - 1))
                logger.warning(
                    f"URL resolution attempt {attempt}/{MAX_RETRIES} failed: {e}"
                )
                await asyncio.sleep(delay)
                continue
            raise TikTokNetworkError(f"Failed to resolve URL: {e}") from e

        except TikTokError:
            raise

        except Exception as e:
            raise TikTokExtractionError(f"URL resolution failed: {e}") from e

    raise TikTokNetworkError(f"URL resolution failed after {MAX_RETRIES} attempts")

# -------------------------------------------------------------------------
# Video Info Extraction
# -------------------------------------------------------------------------

def _parse_webpage_data(
    self, html: str, video_id: str
) -> tuple[dict[str, Any], int]:
    """Parse video data from TikTok webpage HTML.

    Returns:
        Tuple of (video_data dict, status_code)
    """
    # Try UNIVERSAL_DATA first (most common format)
    match = self._universal_data_re.search(html)
    if match:
        try:
            data = json.loads(match.group(1))
            scope = data.get("__DEFAULT_SCOPE__", {})
            video_detail = scope.get("webapp.video-detail", {})
            status = video_detail.get("statusCode", 0)
            item_info = video_detail.get("itemInfo", {})
            video_data = item_info.get("itemStruct", {})
            if video_data:
                logger.debug("Parsed video data from UNIVERSAL_DATA")
                return video_data, status
        except json.JSONDecodeError:
            pass

    # Try SIGI_STATE
    match = self._sigi_state_re.search(html)
    if match:
        try:
            data = json.loads(match.group(1))
            status = data.get("VideoPage", {}).get("statusCode", 0)
            video_data = data.get("ItemModule", {}).get(video_id, {})
            if video_data:
                logger.debug("Parsed video data from SIGI_STATE")
                return video_data, status
        except json.JSONDecodeError:
            pass

    # Try Next.js data
    match = self._next_data_re.search(html)
    if match:
        try:
            data = json.loads(match.group(1))
            page_props = data.get("props", {}).get("pageProps", {})
            status = page_props.get("statusCode", 0)
            video_data = page_props.get("itemInfo", {}).get("itemStruct", {})
            if video_data:
                logger.debug("Parsed video data from NEXT_DATA")
                return video_data, status
        except json.JSONDecodeError:
            pass

    return {}, -1

def _check_status(self, status: int, video_link: str) -> None:
    """Check status code and raise appropriate error."""
    if status == 0:
        return  # Success

    if status in (10216, 10222):
        raise TikTokPrivateError(f"Video {video_link} is private")
    elif status == 10204:
        raise TikTokRegionError(f"Video {video_link} is blocked in your region")
    elif status == 10000:
        raise TikTokDeletedError(f"Video {video_link} was deleted")
    elif status != 0 and status != -1:
        logger.warning(f"Unknown TikTok status code: {status}")

async def _fetch_video_info(
    self, url: str, video_id: str, proxy: Optional[str]
) -> dict[str, Any]:
    """Fetch and parse video info from TikTok webpage."""
    # Normalize photo URLs to video URLs (TikTok serves slideshow data on /video/ endpoint)
    normalized_url = url.replace("/photo/", "/video/")

    session = await self._get_curl_session()
    headers = self._get_headers(normalized_url)

    response = await session.get(
        normalized_url,
        headers=headers,
        cookies=self._cookies,
        proxy=proxy,
        timeout=30,
        allow_redirects=True,
    )

    if response.status_code != 200:
        if response.status_code == 429:
            raise TikTokRateLimitError("Rate limited by TikTok")
        raise TikTokNetworkError(f"HTTP {response.status_code}")

    # Check for login redirect
    final_url = str(response.url)
    if "/login" in urlparse(final_url).path:
        raise TikTokPrivateError("TikTok requires login to access this content")

    html = response.text

    # Debug logging - also serves as a timing buffer for curl_cffi response handling
    # (removing this logging has caused intermittent extraction failures)
    has_universal = "__UNIVERSAL_DATA_FOR_REHYDRATION__" in html
    has_sigi = "SIGI_STATE" in html or "sigi-persisted-data" in html
    has_next = "__NEXT_DATA__" in html
    logger.debug(
        f"Fetched {len(html)} bytes, patterns: UNIVERSAL={has_universal}, "
        f"SIGI={has_sigi}, NEXT={has_next}"
    )

    if not any([has_universal, has_sigi, has_next]):
        # Log preview at DEBUG level to avoid exposing sensitive data in production logs
        logger.debug(f"No data patterns found! HTML preview: {html[:2000]}")

    video_data, status = self._parse_webpage_data(html, video_id)

    self._check_status(status, url)

    if not video_data:
        raise TikTokExtractionError("Unable to extract video data from webpage")

    return video_data

async def _fetch_video_info_with_retry(
    self, url: str, video_id: str, proxy: Optional[str]
) -> tuple[dict[str, Any], Optional[str]]:
    """Fetch video info with retries. Returns (video_data, final_proxy)."""
    current_proxy = proxy
    last_error: Optional[Exception] = None

    for attempt in range(1, MAX_RETRIES + 1):
        try:
            video_data = await self._fetch_video_info(url, video_id, current_proxy)
            return video_data, current_proxy

        except (TikTokNetworkError, TikTokRateLimitError, CurlError) as e:
            last_error = e
            if attempt < MAX_RETRIES:
                current_proxy = self._rotate_proxy(current_proxy)
                delay = 1.0 * (2 ** (attempt - 1))
                logger.warning(
                    f"Video info fetch attempt {attempt}/{MAX_RETRIES} failed: {e}"
                )
                await asyncio.sleep(delay)
                continue
            raise

        except TikTokError:
            raise

        except Exception as e:
            raise TikTokExtractionError(f"Video info fetch failed: {e}") from e

    if last_error:
        raise last_error
    raise TikTokExtractionError("Video info fetch failed")

# -------------------------------------------------------------------------
# Media Download
# -------------------------------------------------------------------------

async def _download_media(
    self,
    media_url: str,
    proxy: Optional[str],
    referer_url: str = "https://www.tiktok.com/",
    timeout: int = 60,
) -> bytes:
    """Download media (video, audio, image) from TikTok CDN."""
    session = await self._get_curl_session()
    headers = self._get_headers(referer_url)

    response = await session.get(
        media_url,
        headers=headers,
        cookies=self._cookies,
        proxy=proxy,
        timeout=timeout,
        allow_redirects=True,
    )

    if response.status_code != 200:
        raise TikTokNetworkError(
            f"Media download failed: HTTP {response.status_code}"
        )

    return response.content
Race Condition

get_user_queue_count() reads _user_queue_counts without taking _dict_lock, while increments/decrements are locked. This can lead to stale/inconsistent counts in user-facing messages and in pre-check logic under concurrency. Consider either making this read locked (async) or clearly treating it as best-effort and avoiding using it for correctness decisions.

def get_user_queue_count(self, user_id: int) -> int:
    """Get current queue count for a user."""
    return self._user_queue_counts.get(user_id, 0)

async def _get_user_lock(self, user_id: int) -> asyncio.Lock:
    """Get or create lock for user."""
    async with self._dict_lock:
        if user_id not in self._user_locks:
            self._user_locks[user_id] = asyncio.Lock()
        return self._user_locks[user_id]

async def _increment_count(self, user_id: int) -> bool:
    """Increment queue count. Returns False if at max."""
    async with self._dict_lock:
        current = self._user_queue_counts.get(user_id, 0)
        if current >= self.max_queue_size:
            return False
        self._user_queue_counts[user_id] = current + 1
        logger.debug(f"User {user_id} queue: {current + 1}/{self.max_queue_size}")
        return True

async def _decrement_count(self, user_id: int) -> None:
    """Decrement queue count and cleanup lock if no longer needed."""
    async with self._dict_lock:
        if user_id in self._user_queue_counts:
            self._user_queue_counts[user_id] -= 1
            if self._user_queue_counts[user_id] <= 0:
                del self._user_queue_counts[user_id]
                # Clean up lock when user has no active requests
                # This prevents unbounded memory growth with many unique users
                if user_id in self._user_locks:
                    del self._user_locks[user_id]
        logger.debug(
            f"User {user_id} queue: {self._user_queue_counts.get(user_id, 0)}/{self.max_queue_size}"
        )
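One way to address this, as the note suggests, is to make the read acquire the same lock as the increment/decrement paths. A hedged sketch assuming the QueueManager attributes shown above; the constructor shown here is illustrative, since the real one is not part of this excerpt:

```python
import asyncio

class QueueManager:
    def __init__(self, max_queue_size: int = 5) -> None:
        self.max_queue_size = max_queue_size
        self._user_queue_counts: dict[int, int] = {}
        self._dict_lock = asyncio.Lock()

    async def get_user_queue_count(self, user_id: int) -> int:
        """Read the count under _dict_lock so it is always consistent
        with concurrent _increment_count/_decrement_count calls."""
        async with self._dict_lock:
            return self._user_queue_counts.get(user_id, 0)
```

The trade-off is that callers must now `await` the read; if that is too invasive, the alternative is to keep the unlocked read but document it as best-effort and never use it for admission decisions.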
Event Loop Risk

The shared class-level _session_lock is an asyncio.Lock created lazily and reused. If the application ever creates/uses TikTokClient across different event loops (tests, multiprocessing, reloads), reusing a lock bound to a different loop can raise runtime errors. Confirm single-loop usage or make lock/session lifecycle explicitly tied to the running loop.

# Shared curl_cffi session (class-level singleton)
_curl_session: Optional[CurlAsyncSession] = None
_impersonate_target: Optional[str] = None
_session_lock: Optional[asyncio.Lock] = None

@classmethod
def _get_session_lock(cls) -> asyncio.Lock:
    """Get or create session lock. Safe because Lock() creation is atomic."""
    if cls._session_lock is None:
        cls._session_lock = asyncio.Lock()
    return cls._session_lock
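If multi-loop usage (tests, reloads) is a real possibility, one defensive pattern is to remember which loop created the lock and recreate it when the running loop changes. A sketch under that assumption; the `_session_loop` attribute is illustrative and not part of the PR:

```python
import asyncio
from typing import Optional

class TikTokClient:
    _session_lock: Optional[asyncio.Lock] = None
    _session_loop: Optional[asyncio.AbstractEventLoop] = None

    @classmethod
    def _get_session_lock(cls) -> asyncio.Lock:
        """Return a lock bound to the currently running event loop,
        recreating it if the loop has changed since last call."""
        loop = asyncio.get_running_loop()
        if cls._session_lock is None or cls._session_loop is not loop:
            cls._session_lock = asyncio.Lock()
            cls._session_loop = loop
        return cls._session_lock
```

Note the shared curl session would need the same treatment, since a session created on a dead loop is equally unusable; alternatively, the app can simply assert single-loop usage at startup.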

karilaa-dev (Owner, Author)

/describe

zam-review bot commented Jan 14, 2026

PR Description updated to latest commit (21b0751)

karilaa-dev (Owner, Author)

/review

zam-review bot commented Jan 14, 2026

Persistent review updated to latest commit 8e4a187

karilaa-dev (Owner, Author)

/review

zam-review bot commented Jan 14, 2026

Persistent review updated to latest commit f92c774

karilaa-dev (Owner, Author)

/describe

karilaa-dev (Owner, Author)

/review

karilaa-dev (Owner, Author)

/improve

zam-review bot commented Jan 14, 2026

PR Code Suggestions ✨

Latest suggestions up to 1aab51e

Category | Suggestion | Impact
Possible issue
Close HTTP responses reliably

Ensure curl_cffi responses are always closed, otherwise connections can leak and the
shared pool can stall under load. Wrap the request in try/finally and close the
response before returning/raising.

tiktok_api/client.py [548-562]

-response = await session.get(
-    media_url,
-    headers=headers,
-    cookies=self._cookies,
-    proxy=proxy,
-    timeout=timeout,
-    allow_redirects=True,
-)
-
-if response.status_code != 200:
-    raise TikTokNetworkError(
-        f"Media download failed: HTTP {response.status_code}"
+response = None
+try:
+    response = await session.get(
+        media_url,
+        headers=headers,
+        cookies=self._cookies,
+        proxy=proxy,
+        timeout=timeout,
+        allow_redirects=True,
     )
 
-return response.content
+    if response.status_code != 200:
+        raise TikTokNetworkError(
+            f"Media download failed: HTTP {response.status_code}"
+        )
 
+    return response.content
+finally:
+    if response is not None:
+        try:
+            response.close()
+        except Exception:
+            pass
+
Suggestion importance[1-10]: 8


Why: The new _download_media() code never closes the curl_cffi response, which can leak pooled connections and stall the shared CurlAsyncSession under load. Wrapping the request in try/finally and calling response.close() is a concrete reliability fix.

Medium
Stream only header bytes

A server may ignore Range and return the full image; response.content would then
download the entire file just to detect the format. Use stream=True and only read
the first 20 bytes, and always close the response to avoid pool exhaustion.

tiktok_api/client.py [1015-1031]

+response = None
 try:
     response = await session.get(
         image_url,
         headers=headers,
         cookies=self._cookies,
         proxy=proxy,
         timeout=10,
         allow_redirects=True,
+        stream=True,
     )
 
-    if response.status_code in (200, 206):
-        return self._detect_format_from_bytes(response.content)
-    return ".heic"
+    if response.status_code not in (200, 206):
+        return ".heic"
+
+    head = b""
+    async for chunk in response.aiter_content(20):
+        head += chunk
+        if len(head) >= 20:
+            break
+    return self._detect_format_from_bytes(head[:20])
 
 except Exception as e:
     logger.debug(f"Range request failed for {image_url}: {e}")
     return ".heic"
+finally:
+    if response is not None:
+        try:
+            response.close()
+        except Exception:
+            pass
Suggestion importance[1-10]: 7


Why: As written, detect_image_format() may still download the full image via response.content if the server ignores Range, and it also doesn’t close the response. Using stream=True, reading only the first bytes, and ensuring response.close() improves performance and prevents pool exhaustion.

Medium
Retry on ID extraction failures

Treat "could not extract video ID" as a retryable failure, since it commonly happens
when TikTok serves interstitials/block pages that differ per proxy/IP. Rotate the
proxy and retry instead of failing immediately on the first bad HTML.

tiktok_api/client.py [343-370]

 for attempt in range(1, MAX_RETRIES + 1):
     try:
         full_url = await self._resolve_short_url(video_link, current_proxy)
         video_id = self._extract_video_id(full_url)
 
         if not video_id:
             raise TikTokExtractionError(
                 f"Could not extract video ID from {video_link}"
             )
 
         return full_url, video_id, current_proxy
 
-    except CurlError as e:
+    except (CurlError, TikTokExtractionError) as e:
         if attempt < MAX_RETRIES:
             current_proxy = self._rotate_proxy(current_proxy)
             delay = 1.0 * (2 ** (attempt - 1))
             logger.warning(
                 f"URL resolution attempt {attempt}/{MAX_RETRIES} failed: {e}"
             )
             await asyncio.sleep(delay)
             continue
         raise TikTokNetworkError(f"Failed to resolve URL: {e}") from e
 
     except TikTokError:
         raise
 
     except Exception as e:
         raise TikTokExtractionError(f"URL resolution failed: {e}") from e
Suggestion importance[1-10]: 2


Why: Retrying on a missing video_id can be reasonable if redirects land on interstitial/login pages, but the proposed improved code incorrectly converts a TikTokExtractionError into a TikTokNetworkError on the final attempt, changing error semantics for a non-network failure.

Low

Previous suggestions

Suggestions up to commit 21b0751
Category | Suggestion | Impact
Possible issue
Close HTTP responses reliably

curl_cffi responses should be explicitly closed; otherwise connections can leak and
exhaust max_clients under load. Wrap the request in try/finally and call
response.close() after reading response.content (apply the same pattern to
_fetch_video_info(), _resolve_short_url(), and detect_image_format() too).

tiktok_api/client.py [483-509]

 async def _download_media(
     self,
     media_url: str,
     proxy: Optional[str],
     referer_url: str = "https://www.tiktok.com/",
     timeout: int = 60,
 ) -> bytes:
     """Download media (video, audio, image) from TikTok CDN."""
     session = self._get_curl_session()
     headers = self._get_headers(referer_url)
 
-    response = await session.get(
-        media_url,
-        headers=headers,
-        cookies=self._cookies,
-        proxy=proxy,
-        timeout=timeout,
-        allow_redirects=True,
-    )
-
-    if response.status_code != 200:
-        raise TikTokNetworkError(
-            f"Media download failed: HTTP {response.status_code}"
+    response = None
+    try:
+        response = await session.get(
+            media_url,
+            headers=headers,
+            cookies=self._cookies,
+            proxy=proxy,
+            timeout=timeout,
+            allow_redirects=True,
         )
 
-    return response.content
+        if response.status_code != 200:
+            raise TikTokNetworkError(
+                f"Media download failed: HTTP {response.status_code}"
+            )
 
+        return response.content
+    finally:
+        if response is not None:
+            try:
+                response.close()
+            except Exception:
+                pass
+
Suggestion importance[1-10]: 7


Why: The curl_cffi response objects in _download_media() are not explicitly closed, which can realistically exhaust the shared max_clients pool under load. The proposed try/finally with response.close() matches the shown code and is a meaningful stability fix (though the note about detect_image_format() is outdated for this PR).

Medium
Make HTML JSON parsing robust

The current ([^<]+) capture breaks if the JSON contains newlines or a < character inside
strings (e.g., captions like "<3"), causing intermittent extraction failures. Use a
non-greedy DOTALL capture so the JSON blob is reliably captured.

tiktok_api/client.py [339-342]

 match = re.search(
-    r'<script[^>]+\bid="__UNIVERSAL_DATA_FOR_REHYDRATION__"[^>]*>([^<]+)</script>',
+    r'<script[^>]+\bid="__UNIVERSAL_DATA_FOR_REHYDRATION__"[^>]*>(.*?)</script>',
     html,
+    flags=re.DOTALL,
 )
Suggestion importance[1-10]: 7


Why: The current regex capture ([^<]+) can break if the JSON contains newlines or literal < characters inside strings, leading to extraction failures. Switching to a non-greedy DOTALL capture for the script tag content is a solid robustness improvement for parsing __UNIVERSAL_DATA_FOR_REHYDRATION__.

Medium
Validate and close resolver responses

This resolver currently ignores HTTP error statuses and doesn’t close the response,
which can cause silent failures and connection pool exhaustion. Treat non-2xx/3xx as
a retryable network error and close the response in a finally block.

tiktok_api/client.py [249-277]

 async def _resolve_short_url(self, url: str, proxy: Optional[str]) -> str:
     """Resolve short URLs (vm.tiktok.com, vt.tiktok.com, /t/) to full URLs."""
     parsed = urlparse(url)
     host = (parsed.hostname or "").lower()
     path = parsed.path or ""
 
     is_short_url = host in {"vm.tiktok.com", "vt.tiktok.com"} or (
         host in {"www.tiktok.com", "tiktok.com"} and path.startswith("/t/")
     )
 
     if not is_short_url:
         return url
 
     session = self._get_curl_session()
     headers = self._get_headers()
 
-    response = await session.get(
-        url,
-        headers=headers,
-        cookies=self._cookies,
-        proxy=proxy,
-        timeout=15,
-        allow_redirects=True,
-    )
+    response = None
+    try:
+        response = await session.get(
+            url,
+            headers=headers,
+            cookies=self._cookies,
+            proxy=proxy,
+            timeout=15,
+            allow_redirects=True,
+        )
 
-    final_url = str(response.url)
-    logger.debug(f"Resolved short URL: {url} -> {final_url}")
-    return final_url
+        # If TikTok blocks the redirect resolution, fail fast so caller can retry/rotate proxy.
+        if response.status_code >= 400:
+            raise TikTokNetworkError(f"Short URL resolution failed: HTTP {response.status_code}")
 
+        final_url = str(response.url)
+        logger.debug(f"Resolved short URL: {url} -> {final_url}")
+        return final_url
+    finally:
+        if response is not None:
+            try:
+                response.close()
+            except Exception:
+                pass
+
Suggestion importance[1-10]: 5


Why: Adding an HTTP status check and closing the response in _resolve_short_url() is correct and prevents using a bad redirect result while also avoiding connection leaks. However, raising TikTokNetworkError here may not actually be retried by _resolve_and_extract_id() as written (it retries CurlError), so it’s an incomplete “retryable” change.

Low

zam-review bot commented Jan 14, 2026

Persistent review updated to latest commit 1aab51e

zam-review bot commented Jan 14, 2026

PR Description updated to latest commit (1aab51e)

karilaa-dev (Owner, Author)

/review

zam-review bot commented Jan 14, 2026

Persistent review updated to latest commit 5c8edd4
