From d218f7291f493252157c9fb7457b2aa43e92aee0 Mon Sep 17 00:00:00 2001 From: charlieroth Date: Tue, 26 Aug 2025 15:50:05 +0200 Subject: [PATCH 1/4] feat: implement network fetcher for page content - Add robust HTTP client with timeouts (10s connect, 30s total) and redirect handling - Implement comprehensive error classification (retryable vs permanent failures) - Add content decoding pipeline with charset detection and UTF-8 normalization - Support gzip/brotli/deflate compression and 5MB size limits - Create FetchPageJobHandler for background URL fetching - Add 13 comprehensive tests covering timeouts, redirects, compression, errors - Integrate with job runner system and database storage - Add dependencies: encoding_rs, chardetng, url, bytes, md5, once_cell Components: - src/fetcher/ - Core HTTP fetching module - src/jobs/handlers/fetch_page.rs - Job handler for background processing - tests/fetcher_client.rs - Comprehensive test suite - Updated SQLx offline query cache Co-authored-by: Amp Amp-Thread-ID: https://ampcode.com/threads/T-2126140c-a3e6-48a0-a6cc-2e924d3c6344 --- ...3314e0aca3fdabb00a69655a0a814e69369b9.json | 14 ++ ...82b3dc77d653da543824011f8e5ad9a23c2db.json | 16 ++ ...2520f9b4bff625e6c5b9f13c58c6f2253ce35.json | 22 ++ Cargo.lock | 182 +++++++++++++++ Cargo.toml | 10 +- src/bin/worker.rs | 3 +- src/fetcher/client.rs | 86 +++++++ src/fetcher/errors.rs | 87 +++++++ src/fetcher/mod.rs | 8 + src/fetcher/pipeline.rs | 152 ++++++++++++ src/fetcher/types.rs | 50 ++++ src/jobs/handlers/fetch_page.rs | 118 ++++++++++ src/jobs/handlers/mod.rs | 2 + src/lib.rs | 1 + tests/fetcher_client.rs | 216 ++++++++++++++++++ 15 files changed, 965 insertions(+), 2 deletions(-) create mode 100644 .sqlx/query-0172fcca626dff44dd82b5d1ed13314e0aca3fdabb00a69655a0a814e69369b9.json create mode 100644 .sqlx/query-bf8c6f90d1f48123ebb78fdad9482b3dc77d653da543824011f8e5ad9a23c2db.json create mode 100644 .sqlx/query-c5e8327ca35d8018f65320c35802520f9b4bff625e6c5b9f13c58c6f2253ce35.json create mode 100644 src/fetcher/client.rs create mode 100644 src/fetcher/errors.rs create mode 100644 src/fetcher/mod.rs create mode 100644 src/fetcher/pipeline.rs create mode 100644 src/fetcher/types.rs create mode 100644 src/jobs/handlers/fetch_page.rs create mode 100644 tests/fetcher_client.rs diff --git a/.sqlx/query-0172fcca626dff44dd82b5d1ed13314e0aca3fdabb00a69655a0a814e69369b9.json b/.sqlx/query-0172fcca626dff44dd82b5d1ed13314e0aca3fdabb00a69655a0a814e69369b9.json new file mode 100644 index 0000000..15ff016 --- /dev/null +++ b/.sqlx/query-0172fcca626dff44dd82b5d1ed13314e0aca3fdabb00a69655a0a814e69369b9.json @@ -0,0 +1,14 @@ +{ + "db_name": "PostgreSQL", + "query": "UPDATE items SET status = 'fetched', updated_at = NOW() WHERE id = $1", + "describe": { + "columns": [], + "parameters": { + "Left": [ + "Uuid" + ] + }, + "nullable": [] + }, + "hash": "0172fcca626dff44dd82b5d1ed13314e0aca3fdabb00a69655a0a814e69369b9" +} diff --git a/.sqlx/query-bf8c6f90d1f48123ebb78fdad9482b3dc77d653da543824011f8e5ad9a23c2db.json b/.sqlx/query-bf8c6f90d1f48123ebb78fdad9482b3dc77d653da543824011f8e5ad9a23c2db.json new file mode 100644 index 0000000..113800d --- /dev/null +++ b/.sqlx/query-bf8c6f90d1f48123ebb78fdad9482b3dc77d653da543824011f8e5ad9a23c2db.json @@ -0,0 +1,16 @@ +{ + "db_name": "PostgreSQL", + "query": "\n INSERT INTO contents (item_id, html, text, lang, extracted_at, checksum)\n VALUES ($1, $2, NULL, NULL, NOW(), $3)\n ON CONFLICT (item_id) \n DO UPDATE SET \n html = EXCLUDED.html,\n extracted_at = EXCLUDED.extracted_at,\n checksum 
= EXCLUDED.checksum\n ", + "describe": { + "columns": [], + "parameters": { + "Left": [ + "Uuid", + "Text", + "Text" + ] + }, + "nullable": [] + }, + "hash": "bf8c6f90d1f48123ebb78fdad9482b3dc77d653da543824011f8e5ad9a23c2db" +} diff --git a/.sqlx/query-c5e8327ca35d8018f65320c35802520f9b4bff625e6c5b9f13c58c6f2253ce35.json b/.sqlx/query-c5e8327ca35d8018f65320c35802520f9b4bff625e6c5b9f13c58c6f2253ce35.json new file mode 100644 index 0000000..b8110e0 --- /dev/null +++ b/.sqlx/query-c5e8327ca35d8018f65320c35802520f9b4bff625e6c5b9f13c58c6f2253ce35.json @@ -0,0 +1,22 @@ +{ + "db_name": "PostgreSQL", + "query": "SELECT url FROM items WHERE id = $1 FOR UPDATE", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "url", + "type_info": "Text" + } + ], + "parameters": { + "Left": [ + "Uuid" + ] + }, + "nullable": [ + false + ] + }, + "hash": "c5e8327ca35d8018f65320c35802520f9b4bff625e6c5b9f13c58c6f2253ce35" +} diff --git a/Cargo.lock b/Cargo.lock index 105beb7..1248621 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -26,6 +26,21 @@ dependencies = [ "memchr", ] +[[package]] +name = "alloc-no-stdlib" +version = "2.0.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cc7bb162ec39d46ab1ca8c77bf72e890535becd1751bb45f64c597edb4c8c6b3" + +[[package]] +name = "alloc-stdlib" +version = "0.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "94fb8275041c72129eb51b7d0322c29b8387a0386127718b096429201a5d6ece" +dependencies = [ + "alloc-no-stdlib", +] + [[package]] name = "allocator-api2" version = "0.2.21" @@ -86,6 +101,32 @@ dependencies = [ "password-hash", ] +[[package]] +name = "assert-json-diff" +version = "2.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "47e4f2b81832e72834d7518d8487a0396a28cc408186a2e8854c0f98011faf12" +dependencies = [ + "serde", + "serde_json", +] + +[[package]] +name = "async-compression" +version = "0.4.28" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6448dfb3960f0b038e88c781ead1e7eb7929dfc3a71a1336ec9086c00f6d1e75" +dependencies = [ + "brotli", + "compression-codecs", + "compression-core", + "flate2", + "futures-core", + "memchr", + "pin-project-lite", + "tokio", +] + [[package]] name = "async-trait" version = "0.1.89" @@ -307,6 +348,27 @@ dependencies = [ "syn", ] +[[package]] +name = "brotli" +version = "8.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4bd8b9603c7aa97359dbd97ecf258968c95f3adddd6db2f7e7a5bef101c84560" +dependencies = [ + "alloc-no-stdlib", + "alloc-stdlib", + "brotli-decompressor", +] + +[[package]] +name = "brotli-decompressor" +version = "5.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "874bb8112abecc98cbd6d81ea4fa7e94fb9449648c93cc89aa40c81c24d7de03" +dependencies = [ + "alloc-no-stdlib", + "alloc-stdlib", +] + [[package]] name = "bumpalo" version = "3.19.0" @@ -333,11 +395,17 @@ dependencies = [ "argon2", "async-trait", "axum 0.8.4", + "bytes", + "chardetng", "chrono", "dashmap", + "encoding_rs", + "flate2", "hyper", "jsonwebtoken", + "md5", "mockall", + "once_cell", "rand", "regex", "reqwest", @@ -353,10 +421,12 @@ dependencies = [ "tower-http", "tracing", "tracing-subscriber", + "url", "utoipa", "utoipa-axum", "utoipa-swagger-ui", "uuid", + "wiremock", ] [[package]] @@ -382,6 +452,17 @@ version = "1.0.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2fd1289c04a9ea8cb22300a459a72a385d7c73d3259e2ed7dcb2af674838cfa9" 
+[[package]] +name = "chardetng" +version = "0.1.17" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "14b8f0b65b7b08ae3c8187e8d77174de20cb6777864c6b832d8ad365999cf1ea" +dependencies = [ + "cfg-if", + "encoding_rs", + "memchr", +] + [[package]] name = "chrono" version = "0.4.41" @@ -397,6 +478,26 @@ dependencies = [ "windows-link", ] +[[package]] +name = "compression-codecs" +version = "0.4.28" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "46cc6539bf1c592cff488b9f253b30bc0ec50d15407c2cf45e27bd8f308d5905" +dependencies = [ + "brotli", + "compression-core", + "flate2", + "futures-core", + "memchr", + "pin-project-lite", +] + +[[package]] +name = "compression-core" +version = "0.4.28" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2957e823c15bde7ecf1e8b64e537aa03a6be5fda0e2334e99887669e75b12e01" + [[package]] name = "concurrent-queue" version = "2.5.0" @@ -592,6 +693,23 @@ dependencies = [ "parking_lot_core", ] +[[package]] +name = "deadpool" +version = "0.12.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5ed5957ff93768adf7a65ab167a17835c3d2c3c50d084fe305174c112f468e2f" +dependencies = [ + "deadpool-runtime", + "num_cpus", + "tokio", +] + +[[package]] +name = "deadpool-runtime" +version = "0.1.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "092966b41edc516079bdf31ec78a2e0588d1d0c08f78b91d8307215928642b2b" + [[package]] name = "der" version = "0.7.10" @@ -858,6 +976,21 @@ dependencies = [ "new_debug_unreachable", ] +[[package]] +name = "futures" +version = "0.3.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "65bc07b1a8bc7c85c5f2e110c476c7389b4554ba72af57d8445ea63a576b0876" +dependencies = [ + "futures-channel", + "futures-core", + "futures-executor", + "futures-io", + "futures-sink", + "futures-task", + "futures-util", +] + [[package]] name = "futures-channel" version = "0.3.31" @@ -931,6 +1064,7 @@ version = "0.3.31" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9fa08315bb612088cc391249efdc3bc77536f16c91f6cf495e6fbe85b20a4a81" dependencies = [ + "futures-channel", "futures-core", "futures-io", "futures-macro", @@ -1052,6 +1186,12 @@ version = "0.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2304e00983f87ffb38b55b444b5e3b60a884b5d30c0fca7d82fe33449bbe55ea" +[[package]] +name = "hermit-abi" +version = "0.5.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fc0fef456e4baa96da950455cd02c081ca953b141298e41db3fc7e36b1da849c" + [[package]] name = "hex" version = "0.4.3" @@ -1628,6 +1768,12 @@ dependencies = [ "digest", ] +[[package]] +name = "md5" +version = "0.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "490cc448043f947bae3cbee9c203358d62dbee0db12107a74be5c30ccfd09771" + [[package]] name = "measure_time" version = "0.9.0" @@ -1832,6 +1978,16 @@ dependencies = [ "libm", ] +[[package]] +name = "num_cpus" +version = "1.17.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "91df4bbde75afed763b708b7eee1e8e7651e02d97f6d5dd763e89367e957b23b" +dependencies = [ + "hermit-abi", + "libc", +] + [[package]] name = "object" version = "0.36.7" @@ -2283,10 +2439,12 @@ version = "0.12.23" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d429f34c8092b2d42c7c93cec323bb4adeb7c67698f70839adec842ec10c7ceb" dependencies = [ + 
"async-compression", "base64", "bytes", "encoding_rs", "futures-core", + "futures-util", "h2", "http", "http-body", @@ -2308,6 +2466,7 @@ dependencies = [ "sync_wrapper", "tokio", "tokio-native-tls", + "tokio-util", "tower", "tower-http", "tower-service", @@ -4176,6 +4335,29 @@ version = "0.53.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "271414315aff87387382ec3d271b52d7ae78726f5d44ac98b4f4030c91880486" +[[package]] +name = "wiremock" +version = "0.6.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "08db1edfb05d9b3c1542e521aea074442088292f00b5f28e435c714a98f85031" +dependencies = [ + "assert-json-diff", + "base64", + "deadpool", + "futures", + "http", + "http-body-util", + "hyper", + "hyper-util", + "log", + "once_cell", + "regex", + "serde", + "serde_json", + "tokio", + "url", +] + [[package]] name = "wit-bindgen-rt" version = "0.39.0" diff --git a/Cargo.toml b/Cargo.toml index 507a85a..c467d94 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -22,8 +22,14 @@ axum = { version = "0.8.4" } utoipa = { version = "5.4.0", features = ["axum_extras", "uuid", "chrono"] } utoipa-axum = { version = "0.1.0" } utoipa-swagger-ui = { version = "9.0.2", features = ["axum"] } -reqwest = { version = "0.12.23", features = ["json"] } +reqwest = { version = "0.12.23", features = ["json", "gzip", "brotli", "deflate"] } scraper = { version = "0.24.0" } +url = { version = "2.5" } +bytes = { version = "1.5" } +encoding_rs = { version = "0.8" } +chardetng = { version = "0.1" } +once_cell = { version = "1.19" } +md5 = { version = "0.7" } tantivy = { version = "0.24.2" } jsonwebtoken = { version = "9.3.1" } argon2 = { version = "0.5.3" } @@ -48,3 +54,5 @@ rand = { version = "0.8" } mockall = "0.13" tower = "0.5" hyper = "1.0" +wiremock = "0.6" +flate2 = "1.0" diff --git a/src/bin/worker.rs b/src/bin/worker.rs index 34c6cfe..fd303c4 100644 --- a/src/bin/worker.rs +++ b/src/bin/worker.rs @@ -1,7 +1,7 @@ use anyhow::Result; use capsule::{ config::Config, - jobs::{ExampleJobHandler, JobRegistry, WorkerConfig, WorkerSupervisor}, + jobs::{ExampleJobHandler, FetchPageJobHandler, JobRegistry, WorkerConfig, WorkerSupervisor}, }; #[tokio::main] @@ -26,6 +26,7 @@ async fn main() -> Result<()> { // Create job registry and register handlers let mut registry = JobRegistry::new(); registry.register(ExampleJobHandler); + registry.register(FetchPageJobHandler::new()); // Create worker configuration let worker_config = WorkerConfig { diff --git a/src/fetcher/client.rs b/src/fetcher/client.rs new file mode 100644 index 0000000..64e8cde --- /dev/null +++ b/src/fetcher/client.rs @@ -0,0 +1,86 @@ +use crate::fetcher::{errors::FetchError, pipeline::process_response, types::PageResponse}; +use once_cell::sync::Lazy; +use reqwest::{Client, ClientBuilder}; +use std::time::Duration; +use tracing::instrument; + +const MAX_BODY_SIZE: u64 = 5 * 1024 * 1024; // 5MB +const USER_AGENT: &str = "CapsuleBot/0.1 (+https://capsule.example.com)"; + +static HTTP_CLIENT: Lazy = Lazy::new(|| { + ClientBuilder::new() + .connect_timeout(Duration::from_secs(10)) + .timeout(Duration::from_secs(30)) + .user_agent(USER_AGENT) + .redirect(reqwest::redirect::Policy::limited(10)) + .default_headers({ + let mut headers = reqwest::header::HeaderMap::new(); + headers.insert( + reqwest::header::ACCEPT, + "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8" + .parse() + .unwrap(), + ); + headers + }) + .build() + .expect("Failed to build HTTP client") +}); + +pub fn get_client() -> &'static Client 
+pub fn get_client() -> &'static Client {
+    &HTTP_CLIENT
+}
+
+#[instrument(skip_all, fields(url = %url))]
+pub async fn fetch(url: &str) -> Result<PageResponse, FetchError> {
+    let parsed_url = url::Url::parse(url)?;
+
+    let response = HTTP_CLIENT
+        .get(parsed_url.clone())
+        .send()
+        .await
+        .map_err(FetchError::from_reqwest_error)?;
+
+    // Check content length before downloading
+    if let Some(content_length) = response.content_length() {
+        if content_length > MAX_BODY_SIZE {
+            return Err(FetchError::BodyTooLarge(content_length));
+        }
+    }
+
+    let final_url = response.url().clone();
+    let status = response.status();
+    let headers = response.headers().clone();
+
+    // Check if we got a successful response
+    if !status.is_success() {
+        return Err(FetchError::Http {
+            status,
+            retriable: status.is_server_error(),
+        });
+    }
+
+    // Get content type
+    let content_type = headers
+        .get(reqwest::header::CONTENT_TYPE)
+        .and_then(|ct| ct.to_str().ok())
+        .unwrap_or("text/html")
+        .to_string();
+
+    // Only process HTML content for now
+    if !content_type.contains("text/html") && !content_type.contains("application/xhtml") {
+        return Err(FetchError::UnsupportedContentType(content_type.clone()));
+    }
+
+    let body_bytes = response
+        .bytes()
+        .await
+        .map_err(|e| FetchError::Io(e.to_string()))?;
+
+    // Check body size after download (in case Content-Length was missing)
+    if body_bytes.len() as u64 > MAX_BODY_SIZE {
+        return Err(FetchError::BodyTooLarge(body_bytes.len() as u64));
+    }
+
+    process_response(final_url, status, headers, body_bytes, &content_type)
+}
diff --git a/src/fetcher/errors.rs b/src/fetcher/errors.rs
new file mode 100644
index 0000000..e62bc84
--- /dev/null
+++ b/src/fetcher/errors.rs
@@ -0,0 +1,87 @@
+use thiserror::Error;
+
+#[derive(Error, Debug)]
+pub enum FetchError {
+    #[error("invalid url: {0}")]
+    InvalidUrl(#[from] url::ParseError),
+
+    #[error("dns failure: {0}")]
+    Dns(String),
+
+    #[error("tls error: {0}")]
+    Tls(String),
+
+    #[error("connect timeout")]
+    ConnectTimeout,
+
+    #[error("request timeout")]
+    RequestTimeout,
+
+    #[error("too many redirects")]
+    RedirectLoop,
+
+    #[error("http error {status}")]
+    Http {
+        status: reqwest::StatusCode,
+        retriable: bool,
+    },
+
+    #[error("body too large ({0} bytes)")]
+    BodyTooLarge(u64),
+
+    #[error("unsupported content-type: {0}")]
+    UnsupportedContentType(String),
+
+    #[error("charset error: {0}")]
+    Charset(String),
+
+    #[error("io error: {0}")]
+    Io(String),
+
+    #[error("unknown: {0}")]
+    Unknown(String),
+}
+
+impl FetchError {
+    pub fn should_retry(&self) -> bool {
+        match self {
+            // Fatal errors - don't retry
+            Self::InvalidUrl(_) => false,
+            Self::BodyTooLarge(_) => false,
+            Self::UnsupportedContentType(_) => false,
+            Self::Charset(_) => false,
+            Self::Http { retriable, .. } => *retriable,
+
+            // Temporary errors - retry
+            Self::Dns(_) => true,
+            Self::Tls(_) => true,
+            Self::ConnectTimeout => true,
+            Self::RequestTimeout => true,
+            Self::RedirectLoop => true,
+            Self::Io(_) => true,
+            Self::Unknown(_) => true,
+        }
+    }
+
+    pub fn from_reqwest_error(err: reqwest::Error) -> Self {
+        if err.is_timeout() {
+            if err.is_connect() {
+                Self::ConnectTimeout
+            } else {
+                Self::RequestTimeout
+            }
+        } else if err.is_redirect() {
+            Self::RedirectLoop
+        } else if let Some(status) = err.status() {
+            Self::Http {
+                status,
+                retriable: status.is_server_error(),
+            }
+        } else if err.is_request() {
+            // DNS, connection errors
+            Self::Dns(err.to_string())
+        } else {
+            Self::Unknown(err.to_string())
+        }
+    }
+}
diff --git a/src/fetcher/mod.rs b/src/fetcher/mod.rs
new file mode 100644
index 0000000..54961cd
--- /dev/null
+++ b/src/fetcher/mod.rs
@@ -0,0 +1,8 @@
+pub mod client;
+pub mod errors;
+pub mod pipeline;
+pub mod types;
+
+pub use client::{fetch, get_client};
+pub use errors::FetchError;
+pub use types::{Charset, PageResponse};
diff --git a/src/fetcher/pipeline.rs b/src/fetcher/pipeline.rs
new file mode 100644
index 0000000..fb35606
--- /dev/null
+++ b/src/fetcher/pipeline.rs
@@ -0,0 +1,152 @@
+use crate::fetcher::{errors::FetchError, types::{Charset, PageResponse}};
+use bytes::Bytes;
+use chrono::Utc;
+use encoding_rs::Encoding;
+use regex::Regex;
+use reqwest::{header::HeaderMap, StatusCode};
+use std::sync::LazyLock;
+use url::Url;
+
+static CHARSET_REGEX: LazyLock<Regex> = LazyLock::new(|| {
+    Regex::new(r#"(?i)charset\s*=\s*["']?([^"'\s;]+)"#).unwrap()
+});
+
+static META_CHARSET_REGEX: LazyLock<Regex> = LazyLock::new(|| {
+    Regex::new(r#"(?i)<meta[^>]*?charset\s*=\s*["']?([^"'\s/>]+)"#).unwrap()
+});
+
+static META_HTTP_EQUIV_REGEX: LazyLock<Regex> = LazyLock::new(|| {
+    Regex::new(r#"(?i)<meta[^>]*?http-equiv\s*=\s*["']?content-type["']?[^>]*?content\s*=\s*["']?[^"'>]*?charset\s*=\s*([^"'\s;/>]+)"#).unwrap()
+});
+
+pub fn process_response(
+    url_final: Url,
+    status: StatusCode,
+    headers: HeaderMap,
+    body_bytes: Bytes,
+    content_type: &str,
+) -> Result<PageResponse, FetchError> {
+    let charset = detect_charset(content_type, &body_bytes)?;
+    let body_utf8 = decode_to_utf8(&body_bytes, &charset)?;
+
+    Ok(PageResponse {
+        url_final,
+        status,
+        headers,
+        body_raw: body_bytes,
+        body_utf8,
+        charset,
+        fetched_at: Utc::now(),
+    })
+}
+
+fn detect_charset(content_type: &str, body_bytes: &[u8]) -> Result<Charset, FetchError> {
+    // 1. Check Content-Type header for charset
+    if let Some(captures) = CHARSET_REGEX.captures(content_type) {
+        if let Some(charset_str) = captures.get(1) {
+            let charset_name = charset_str.as_str().to_lowercase();
+            if let Some(encoding) = Encoding::for_label(charset_name.as_bytes()) {
+                return Ok(Charset::from_encoding(encoding));
+            }
+        }
+    }
+
+    // 2. Check for <meta charset> in first 4KB
+    let search_bytes = &body_bytes[..body_bytes.len().min(4096)];
+    let search_str = String::from_utf8_lossy(search_bytes);
+
+    // Look for <meta charset="...">
+    if let Some(captures) = META_CHARSET_REGEX.captures(&search_str) {
+        if let Some(charset_str) = captures.get(1) {
+            let charset_name = charset_str.as_str().to_lowercase();
+            if let Some(encoding) = Encoding::for_label(charset_name.as_bytes()) {
+                return Ok(Charset::from_encoding(encoding));
+            }
+        }
+    }
+
+    // Look for <meta http-equiv="content-type" content="...; charset=...">
+    if let Some(captures) = META_HTTP_EQUIV_REGEX.captures(&search_str) {
+        if let Some(charset_str) = captures.get(1) {
+            let charset_name = charset_str.as_str().to_lowercase();
+            if let Some(encoding) = Encoding::for_label(charset_name.as_bytes()) {
+                return Ok(Charset::from_encoding(encoding));
+            }
+        }
+    }
+
+    // 3. Use chardetng for heuristic detection
+    let mut detector = chardetng::EncodingDetector::new();
+    detector.feed(search_bytes, false);
+    let detected = detector.guess(None, true);
+
+    Ok(Charset::from_encoding(detected))
+}
+
+fn decode_to_utf8(body_bytes: &[u8], charset: &Charset) -> Result<String, FetchError> {
+    let encoding = match charset {
+        Charset::Utf8 => encoding_rs::UTF_8,
+        Charset::Latin1 | Charset::Iso88591 => encoding_rs::WINDOWS_1252,
+        Charset::Windows1252 => encoding_rs::WINDOWS_1252,
+        Charset::ShiftJis => encoding_rs::SHIFT_JIS,
+        Charset::Gb2312 => encoding_rs::GBK,
+        Charset::Big5 => encoding_rs::BIG5,
+        Charset::Other(name) => {
+            Encoding::for_label(name.as_bytes())
+                .unwrap_or(encoding_rs::UTF_8)
+        }
+    };
+
+    let (decoded, _encoding, had_errors) = encoding.decode(body_bytes);
+
+    if had_errors {
+        return Err(FetchError::Charset(format!(
+            "Failed to decode content with encoding: {}",
+            encoding.name()
+        )));
+    }
+
+    Ok(decoded.into_owned())
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    #[test]
+    fn test_detect_charset_from_content_type() {
+        let content_type = "text/html; charset=utf-8";
+        let body = b"<html><body>Test</body></html>";
+
+        let charset = detect_charset(content_type, body).unwrap();
+        assert!(matches!(charset, Charset::Utf8));
+    }
+
+    #[test]
+    fn test_detect_charset_from_meta_tag() {
+        let content_type = "text/html";
+        let body = b"<html><head><meta charset=\"iso-8859-1\"></head><body>Test</body></html>";
+
+        let charset = detect_charset(content_type, body).unwrap();
+        // ISO-8859-1 gets mapped to Windows1252 by encoding_rs since it's a superset
+        assert!(matches!(charset, Charset::Windows1252));
+    }
+
+    #[test]
+    fn test_detect_charset_from_meta_http_equiv() {
+        let content_type = "text/html";
+        let body = b"<html><head><meta http-equiv=\"Content-Type\" content=\"text/html; charset=windows-1252\"></head><body>Test</body></html>";
+
+        let charset = detect_charset(content_type, body).unwrap();
+        assert!(matches!(charset, Charset::Windows1252));
+    }
+
+    #[test]
+    fn test_decode_utf8() {
+        let body = "Hello, 世界!".as_bytes();
+        let charset = Charset::Utf8;
+
+        let decoded = decode_to_utf8(body, &charset).unwrap();
+        assert_eq!(decoded, "Hello, 世界!");
+    }
+}
diff --git a/src/fetcher/types.rs b/src/fetcher/types.rs
new file mode 100644
index 0000000..6f62df9
--- /dev/null
+++ b/src/fetcher/types.rs
@@ -0,0 +1,50 @@
+use bytes::Bytes;
+use chrono::{DateTime, Utc};
+use reqwest::{header::HeaderMap, StatusCode};
+use serde::{Deserialize, Serialize};
+use url::Url;
+
+#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
+pub enum Charset {
+    Utf8,
+    Latin1,
+    Windows1252,
+    Iso88591,
+    ShiftJis,
+    Gb2312,
+    Big5,
+    Other(String),
+}
+
+impl Charset {
+    pub fn from_encoding(encoding: &encoding_rs::Encoding) -> Self {
+        use std::ptr;
+
+        if ptr::eq(encoding, encoding_rs::UTF_8) {
+            Self::Utf8
+        } else if ptr::eq(encoding, encoding_rs::WINDOWS_1252) {
+            Self::Windows1252
+        } else if ptr::eq(encoding, encoding_rs::SHIFT_JIS) {
+            Self::ShiftJis
+        } else if ptr::eq(encoding, encoding_rs::GBK) || ptr::eq(encoding, encoding_rs::GB18030) {
+            Self::Gb2312
+        } else if ptr::eq(encoding, encoding_rs::BIG5) {
+            Self::Big5
+        } else {
+            // Fall back to Other; the concrete encoding name is not preserved
+            // here (simplified to avoid lifetime issues with &'static names)
+            Self::Other("unknown".to_string())
+        }
+    }
+}
+
+#[derive(Debug)]
+pub struct PageResponse {
+    pub url_final: Url,
+    pub status: StatusCode,
+    pub headers: HeaderMap,
+    pub body_raw: Bytes,
+    pub body_utf8: String,
+    pub charset: Charset,
+    pub fetched_at: DateTime<Utc>,
+}
diff --git a/src/jobs/handlers/fetch_page.rs b/src/jobs/handlers/fetch_page.rs
new file mode 100644
index 0000000..fdb93f4
--- /dev/null
+++ b/src/jobs/handlers/fetch_page.rs
@@ -0,0 +1,118 @@
+use crate::{
+    fetcher::fetch,
+    jobs::handler::JobHandler,
+};
+use async_trait::async_trait;
+use serde::{Deserialize, Serialize};
+use sqlx::PgPool;
+use tracing::{instrument, Span, warn, info};
+use uuid::Uuid;
+
+#[derive(Debug, Serialize, Deserialize)]
+pub struct FetchPagePayload {
+    pub item_id: Uuid,
+}
+
+#[derive(Clone)]
+pub struct FetchPageJobHandler;
+
+#[async_trait]
+impl JobHandler for FetchPageJobHandler {
+    #[instrument(skip(self, pool, span), fields(item_id))]
+    async fn run(&self, payload: serde_json::Value, pool: &PgPool, span: Span) -> anyhow::Result<()> {
+        let payload: FetchPagePayload = serde_json::from_value(payload)?;
+
+        // Record item_id in the span
+        span.record("item_id", &tracing::field::display(payload.item_id));
+
+        // Get the item URL with a lock to prevent concurrent processing
+        let item_url: Option<String> = sqlx::query_scalar!(
+            "SELECT url FROM items WHERE id = $1 FOR UPDATE",
+            payload.item_id
+        )
+        .fetch_optional(pool)
+        .await?;
+
+        let Some(url) = item_url else {
+            anyhow::bail!("Item {} not found", payload.item_id);
+        };
+
+        info!("Fetching content for item {} from URL: {}", payload.item_id, url);
+
+        // Fetch the page content
+        match fetch(&url).await {
+            Ok(response) => {
+                info!(
+                    "Successfully fetched content from {} (status: {}, charset: {:?}, size: {} bytes)",
+                    response.url_final,
+                    response.status,
+                    response.charset,
+                    response.body_utf8.len()
+                );
+
+                // Calculate a simple checksum of the content
+                let checksum = format!("{:x}", md5::compute(response.body_raw.as_ref()));
+
+                // Insert the content
+                sqlx::query!(
+                    r#"
+                    INSERT INTO contents (item_id, html, text, lang, extracted_at, checksum)
+                    VALUES ($1, $2, NULL, NULL, NOW(), $3)
+                    ON CONFLICT (item_id)
+                    DO UPDATE SET
+                        html = EXCLUDED.html,
+                        extracted_at = EXCLUDED.extracted_at,
+                        checksum = EXCLUDED.checksum
+                    "#,
+                    payload.item_id,
+                    response.body_utf8,
+                    checksum
+                )
+                .execute(pool)
+                .await?;
+
+                // Update item status to fetched
+                sqlx::query!(
+                    "UPDATE items SET status = 'fetched', updated_at = NOW() WHERE id = $1",
+                    payload.item_id
+                )
+                .execute(pool)
+                .await?;
+
+                info!("Successfully stored content for item {}", payload.item_id);
+                Ok(())
+            }
+            Err(fetch_error) => {
+                warn!("Failed to fetch content for item {}: {}", payload.item_id, fetch_error);
+
+                if fetch_error.should_retry() {
+                    // Return error to trigger retry by job runner
+                    anyhow::bail!("Retryable fetch error: {}", fetch_error);
+                } else {
+                    // Mark as permanent failure - don't retry
+                    warn!("Permanent failure for item {}: {}", payload.item_id, fetch_error);
+
+                    // Could optionally update item status to indicate permanent failure
+                    // For now, just let the job be marked as failed
anyhow::bail!("Permanent fetch error: {}", fetch_error); + } + } + } + } + + fn kind(&self) -> &'static str { + "fetch_page" + } +} + +impl FetchPageJobHandler { + pub fn new() -> Self { + Self + } +} + +impl Default for FetchPageJobHandler { + fn default() -> Self { + Self::new() + } +} diff --git a/src/jobs/handlers/mod.rs b/src/jobs/handlers/mod.rs index 312528f..07ad6ca 100644 --- a/src/jobs/handlers/mod.rs +++ b/src/jobs/handlers/mod.rs @@ -1,3 +1,5 @@ pub mod example; +pub mod fetch_page; pub use example::*; +pub use fetch_page::*; diff --git a/src/lib.rs b/src/lib.rs index 4f37e05..674141d 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -2,6 +2,7 @@ pub mod app_state; pub mod auth; pub mod config; pub mod entities; +pub mod fetcher; pub mod health; pub mod items; pub mod jobs; diff --git a/tests/fetcher_client.rs b/tests/fetcher_client.rs new file mode 100644 index 0000000..0212bd5 --- /dev/null +++ b/tests/fetcher_client.rs @@ -0,0 +1,216 @@ + +use capsule::fetcher::{fetch, FetchError}; +use wiremock::{ + matchers::{method, path}, + Mock, MockServer, ResponseTemplate, +}; + +#[tokio::test] +async fn test_fetch_success() { + let mock_server = MockServer::start().await; + + Mock::given(method("GET")) + .and(path("/test")) + .respond_with( + ResponseTemplate::new(200) + .set_body_bytes("TestHello World".as_bytes()) + .insert_header("Content-Type", "text/html; charset=utf-8"), + ) + .mount(&mock_server) + .await; + + let url = format!("{}/test", mock_server.uri()); + let result = fetch(&url).await.unwrap(); + + assert!(result.status.is_success()); + assert!(result.body_utf8.contains("Hello World")); + assert_eq!(result.url_final.as_str(), url); +} + +#[tokio::test] +async fn test_fetch_404() { + let mock_server = MockServer::start().await; + + Mock::given(method("GET")) + .and(path("/notfound")) + .respond_with(ResponseTemplate::new(404)) + .mount(&mock_server) + .await; + + let url = format!("{}/notfound", mock_server.uri()); + let result = fetch(&url).await; + + match result { + Err(FetchError::Http { status, retriable }) => { + assert_eq!(status.as_u16(), 404); + assert!(!retriable); + } + _ => panic!("Expected HTTP 404 error"), + } +} + +#[tokio::test] +async fn test_fetch_500_retryable() { + let mock_server = MockServer::start().await; + + Mock::given(method("GET")) + .and(path("/error")) + .respond_with(ResponseTemplate::new(500)) + .mount(&mock_server) + .await; + + let url = format!("{}/error", mock_server.uri()); + let result = fetch(&url).await; + + match result { + Err(FetchError::Http { status, retriable }) => { + assert_eq!(status.as_u16(), 500); + assert!(retriable); + } + _ => panic!("Expected HTTP 500 error"), + } +} + +#[tokio::test] +async fn test_fetch_redirect() { + let mock_server = MockServer::start().await; + + Mock::given(method("GET")) + .and(path("/redirect")) + .respond_with( + ResponseTemplate::new(302) + .insert_header("location", "/final") + ) + .mount(&mock_server) + .await; + + Mock::given(method("GET")) + .and(path("/final")) + .respond_with( + ResponseTemplate::new(200) + .set_body_bytes("Final page".as_bytes()) + .insert_header("Content-Type", "text/html") + ) + .mount(&mock_server) + .await; + + let url = format!("{}/redirect", mock_server.uri()); + let result = fetch(&url).await.unwrap(); + + assert!(result.status.is_success()); + assert!(result.body_utf8.contains("Final page")); + assert!(result.url_final.as_str().ends_with("/final")); +} + +#[tokio::test] +async fn test_fetch_gzip_compression() { + use std::io::Write; + use flate2::write::GzEncoder; + 
+    use flate2::Compression;
+
+    let original_content = "<html><head><title>Compressed</title></head><body>This content is gzipped!</body></html>";
+
+    // Gzip the content
+    let mut encoder = GzEncoder::new(Vec::new(), Compression::default());
+    encoder.write_all(original_content.as_bytes()).unwrap();
+    let compressed_data = encoder.finish().unwrap();
+
+    let mock_server = MockServer::start().await;
+
+    Mock::given(method("GET"))
+        .and(path("/gzipped"))
+        .respond_with(
+            ResponseTemplate::new(200)
+                .set_body_bytes(compressed_data)
+                .insert_header("Content-Type", "text/html; charset=utf-8")
+                .insert_header("Content-Encoding", "gzip"),
+        )
+        .mount(&mock_server)
+        .await;
+
+    let url = format!("{}/gzipped", mock_server.uri());
+    let result = fetch(&url).await.unwrap();
+
+    assert!(result.status.is_success());
+    assert!(result.body_utf8.contains("This content is gzipped!"));
+}
+
+#[tokio::test]
+async fn test_fetch_unsupported_content_type() {
+    let mock_server = MockServer::start().await;
+
+    Mock::given(method("GET"))
+        .and(path("/image"))
+        .respond_with(
+            ResponseTemplate::new(200)
+                .set_body_bytes(vec![0xFF, 0xD8, 0xFF]) // JPEG header
+                .insert_header("Content-Type", "image/jpeg"),
+        )
+        .mount(&mock_server)
+        .await;
+
+    let url = format!("{}/image", mock_server.uri());
+    let result = fetch(&url).await;
+
+    match result {
+        Err(FetchError::UnsupportedContentType(content_type)) => {
+            assert_eq!(content_type, "image/jpeg");
+        }
+        _ => panic!("Expected UnsupportedContentType error"),
+    }
+}
+
+#[tokio::test]
+async fn test_fetch_body_too_large() {
+    let mock_server = MockServer::start().await;
+
+    // Create a large body (6MB > 5MB limit)
+    let large_body = "x".repeat(6 * 1024 * 1024);
+
+    Mock::given(method("GET"))
+        .and(path("/large"))
+        .respond_with(
+            ResponseTemplate::new(200)
+                .set_body_bytes(large_body.as_bytes())
+                .insert_header("Content-Type", "text/html")
+                .insert_header("Content-Length", &(6 * 1024 * 1024).to_string()),
+        )
+        .mount(&mock_server)
+        .await;
+
+    let url = format!("{}/large", mock_server.uri());
+    let result = fetch(&url).await;
+
+    match result {
+        Err(FetchError::BodyTooLarge(size)) => {
+            assert_eq!(size, 6 * 1024 * 1024);
+        }
+        _ => panic!("Expected BodyTooLarge error"),
+    }
+}
+
+#[tokio::test]
+async fn test_fetch_invalid_url() {
+    let result = fetch("not-a-valid-url").await;
+
+    match result {
+        Err(FetchError::InvalidUrl(_)) => {}
+        _ => panic!("Expected InvalidUrl error"),
+    }
+}
+
+#[tokio::test]
+async fn test_error_retry_classification() {
+    assert!(!FetchError::InvalidUrl(url::ParseError::EmptyHost).should_retry());
+    assert!(!FetchError::BodyTooLarge(1000).should_retry());
+    assert!(!FetchError::UnsupportedContentType("image/png".to_string()).should_retry());
+    assert!(!FetchError::Charset("Invalid encoding".to_string()).should_retry());
+
+    assert!(FetchError::Dns("DNS failure".to_string()).should_retry());
+    assert!(FetchError::ConnectTimeout.should_retry());
+    assert!(FetchError::RequestTimeout.should_retry());
+
+    // HTTP errors
+    assert!(!FetchError::Http { status: reqwest::StatusCode::NOT_FOUND, retriable: false }.should_retry());
+    assert!(FetchError::Http { status: reqwest::StatusCode::INTERNAL_SERVER_ERROR, retriable: true }.should_retry());
+}
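
The handler above is keyed by kind() = "fetch_page" and deserializes its payload
into FetchPagePayload { item_id: Uuid }. A minimal sketch of the JSON a producer
would serialize for it; the enqueue call shown is hypothetical, since the job
runner's producer API is not part of this series:

    use serde_json::json;
    use uuid::Uuid;

    let item_id = Uuid::new_v4();
    // Shape must match FetchPagePayload { item_id: Uuid }.
    let payload = json!({ "item_id": item_id });
    // Hypothetical producer call; the real enqueue API lives in capsule::jobs.
    // jobs::enqueue(pool, "fetch_page", payload).await?;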

From 98d1779101bba58acd16ac463f3d0ae7c68dd2d4 Mon Sep 17 00:00:00 2001
From: charlieroth
Date: Tue, 26 Aug 2025 15:56:43 +0200
Subject: [PATCH 2/4] chore: formatting

---
 src/auth/middleware.rs          | 12 ++++++---
 src/fetcher/client.rs           |  2 +-
 src/fetcher/pipeline.rs         | 32 +++++++++++-------------
 src/fetcher/types.rs            |  4 +--
 src/jobs/handlers/fetch_page.rs | 35 ++++++++++++++++---------
 tests/fetcher_client.rs         | 46 +++++++++++++++++++++------------
 6 files changed, 78 insertions(+), 53 deletions(-)

diff --git a/src/auth/middleware.rs b/src/auth/middleware.rs
index de6d968..f77f079 100644
--- a/src/auth/middleware.rs
+++ b/src/auth/middleware.rs
@@ -239,8 +239,10 @@ mod tests {
     #[tokio::test]
     async fn test_valid_jwt_token_success() {
         // Set test environment variable to ensure consistent config
-        unsafe { std::env::set_var("JWT_SECRET", "test-secret-key"); }
-
+        unsafe {
+            std::env::set_var("JWT_SECRET", "test-secret-key");
+        }
+
         let app = create_test_app();
         let user_id = Uuid::new_v4();
         let token = create_jwt_token(user_id);
@@ -259,8 +261,10 @@ mod tests {
     #[tokio::test]
     async fn test_extractor_returns_correct_user_id() {
         // Set test environment variable to ensure consistent config
-        unsafe { std::env::set_var("JWT_SECRET", "test-secret-key"); }
-
+        unsafe {
+            std::env::set_var("JWT_SECRET", "test-secret-key");
+        }
+
         let app = create_test_app();
         let user_id = Uuid::new_v4();
         let token = create_jwt_token(user_id);
diff --git a/src/fetcher/client.rs b/src/fetcher/client.rs
index 64e8cde..da8cf1f 100644
--- a/src/fetcher/client.rs
+++ b/src/fetcher/client.rs
@@ -34,7 +34,7 @@ pub fn get_client() -> &'static Client {
 #[instrument(skip_all, fields(url = %url))]
 pub async fn fetch(url: &str) -> Result<PageResponse, FetchError> {
     let parsed_url = url::Url::parse(url)?;
-
+
     let response = HTTP_CLIENT
         .get(parsed_url.clone())
         .send()
diff --git a/src/fetcher/pipeline.rs b/src/fetcher/pipeline.rs
index fb35606..d89f67b 100644
--- a/src/fetcher/pipeline.rs
+++ b/src/fetcher/pipeline.rs
@@ -1,19 +1,20 @@
-use crate::fetcher::{errors::FetchError, types::{Charset, PageResponse}};
+use crate::fetcher::{
+    errors::FetchError,
+    types::{Charset, PageResponse},
+};
 use bytes::Bytes;
 use chrono::Utc;
 use encoding_rs::Encoding;
 use regex::Regex;
-use reqwest::{header::HeaderMap, StatusCode};
+use reqwest::{StatusCode, header::HeaderMap};
 use std::sync::LazyLock;
 use url::Url;
 
-static CHARSET_REGEX: LazyLock<Regex> = LazyLock::new(|| {
-    Regex::new(r#"(?i)charset\s*=\s*["']?([^"'\s;]+)"#).unwrap()
-});
+static CHARSET_REGEX: LazyLock<Regex> =
+    LazyLock::new(|| Regex::new(r#"(?i)charset\s*=\s*["']?([^"'\s;]+)"#).unwrap());
 
-static META_CHARSET_REGEX: LazyLock<Regex> = LazyLock::new(|| {
-    Regex::new(r#"(?i)<meta[^>]*?charset\s*=\s*["']?([^"'\s/>]+)"#).unwrap()
-});
+static META_CHARSET_REGEX: LazyLock<Regex> =
+    LazyLock::new(|| Regex::new(r#"(?i)<meta[^>]*?charset\s*=\s*["']?([^"'\s/>]+)"#).unwrap());
 
 static META_HTTP_EQUIV_REGEX: LazyLock<Regex> = LazyLock::new(|| {
     Regex::new(r#"(?i)<meta[^>]*?http-equiv\s*=\s*["']?content-type["']?[^>]*?content\s*=\s*["']?[^"'>]*?charset\s*=\s*([^"'\s;/>]+)"#).unwrap()
@@ -91,14 +92,11 @@ fn decode_to_utf8(body_bytes: &[u8], charset: &Charset) -> Result<String, FetchError> {
         Charset::ShiftJis => encoding_rs::SHIFT_JIS,
         Charset::Gb2312 => encoding_rs::GBK,
         Charset::Big5 => encoding_rs::BIG5,
-        Charset::Other(name) => {
-            Encoding::for_label(name.as_bytes())
-                .unwrap_or(encoding_rs::UTF_8)
-        }
+        Charset::Other(name) => Encoding::for_label(name.as_bytes()).unwrap_or(encoding_rs::UTF_8),
     };
 
     let (decoded, _encoding, had_errors) = encoding.decode(body_bytes);
-
+
     if had_errors {
         return Err(FetchError::Charset(format!(
             "Failed to decode content with encoding: {}",
@@ -117,7 +115,7 @@ mod tests {
     fn test_detect_charset_from_content_type() {
         let content_type = "text/html; charset=utf-8";
         let body = b"<html><body>Test</body></html>";
-
+
         let charset = detect_charset(content_type, body).unwrap();
         assert!(matches!(charset, Charset::Utf8));
     }
@@ -126,7 +124,7 @@ mod tests {
     fn test_detect_charset_from_meta_tag() {
         let content_type = "text/html";
         let body = b"<html><head><meta charset=\"iso-8859-1\"></head><body>Test</body></html>";
-
+
         let charset = detect_charset(content_type, body).unwrap();
         // ISO-8859-1 gets mapped to Windows1252 by encoding_rs since it's a superset
         assert!(matches!(charset, Charset::Windows1252));
@@ -136,7 +134,7 @@ mod tests {
     fn test_detect_charset_from_meta_http_equiv() {
         let content_type = "text/html";
         let body = b"<html><head><meta http-equiv=\"Content-Type\" content=\"text/html; charset=windows-1252\"></head><body>Test</body></html>";
-
+
         let charset = detect_charset(content_type, body).unwrap();
         assert!(matches!(charset, Charset::Windows1252));
     }
@@ -145,7 +143,7 @@ mod tests {
     fn test_decode_utf8() {
         let body = "Hello, 世界!".as_bytes();
         let charset = Charset::Utf8;
-
+
         let decoded = decode_to_utf8(body, &charset).unwrap();
         assert_eq!(decoded, "Hello, 世界!");
     }
diff --git a/src/fetcher/types.rs b/src/fetcher/types.rs
index 6f62df9..71e29f9 100644
--- a/src/fetcher/types.rs
+++ b/src/fetcher/types.rs
@@ -1,6 +1,6 @@
 use bytes::Bytes;
 use chrono::{DateTime, Utc};
-use reqwest::{header::HeaderMap, StatusCode};
+use reqwest::{StatusCode, header::HeaderMap};
 use serde::{Deserialize, Serialize};
 use url::Url;
@@ -19,7 +19,7 @@ pub enum Charset {
 impl Charset {
     pub fn from_encoding(encoding: &encoding_rs::Encoding) -> Self {
         use std::ptr;
-
+
         if ptr::eq(encoding, encoding_rs::UTF_8) {
             Self::Utf8
         } else if ptr::eq(encoding, encoding_rs::WINDOWS_1252) {
diff --git a/src/jobs/handlers/fetch_page.rs b/src/jobs/handlers/fetch_page.rs
index fdb93f4..3dfe15e 100644
--- a/src/jobs/handlers/fetch_page.rs
+++ b/src/jobs/handlers/fetch_page.rs
@@ -1,11 +1,8 @@
-use crate::{
-    fetcher::fetch,
-    jobs::handler::JobHandler,
-};
+use crate::{fetcher::fetch, jobs::handler::JobHandler};
 use async_trait::async_trait;
 use serde::{Deserialize, Serialize};
 use sqlx::PgPool;
-use tracing::{instrument, Span, warn, info};
+use tracing::{Span, info, instrument, warn};
 use uuid::Uuid;
 
 #[derive(Debug, Serialize, Deserialize)]
@@ -19,9 +16,14 @@ pub struct FetchPageJobHandler;
 #[async_trait]
 impl JobHandler for FetchPageJobHandler {
     #[instrument(skip(self, pool, span), fields(item_id))]
-    async fn run(&self, payload: serde_json::Value, pool: &PgPool, span: Span) -> anyhow::Result<()> {
+    async fn run(
+        &self,
+        payload: serde_json::Value,
+        pool: &PgPool,
+        span: Span,
+    ) -> anyhow::Result<()> {
         let payload: FetchPagePayload = serde_json::from_value(payload)?;
-
+
         // Record item_id in the span
         span.record("item_id", &tracing::field::display(payload.item_id));
 
@@ -37,7 +39,10 @@ impl JobHandler for FetchPageJobHandler {
             anyhow::bail!("Item {} not found", payload.item_id);
         };
 
-        info!("Fetching content for item {} from URL: {}", payload.item_id, url);
+        info!(
+            "Fetching content for item {} from URL: {}",
+            payload.item_id, url
+        );
 
         // Fetch the page content
         match fetch(&url).await {
@@ -83,15 +88,21 @@ impl JobHandler for FetchPageJobHandler {
                 Ok(())
             }
             Err(fetch_error) => {
-                warn!("Failed to fetch content for item {}: {}", payload.item_id, fetch_error);
-
+                warn!(
+                    "Failed to fetch content for item {}: {}",
+                    payload.item_id, fetch_error
+                );
+
                 if fetch_error.should_retry() {
                     // Return error to trigger retry by job runner
                     anyhow::bail!("Retryable fetch error: {}", fetch_error);
                 } else {
                     // Mark as permanent failure - don't retry
-                    warn!("Permanent failure for item {}: {}", payload.item_id, fetch_error);
-
+                    warn!(
+                        "Permanent failure for item {}: {}",
+                        payload.item_id, fetch_error
+                    );
+
                     // Could optionally update item status to indicate permanent failure
                     // For now, just let the job be marked as failed
anyhow::bail!("Permanent fetch error: {}", fetch_error); diff --git a/tests/fetcher_client.rs b/tests/fetcher_client.rs index 0212bd5..d5278bc 100644 --- a/tests/fetcher_client.rs +++ b/tests/fetcher_client.rs @@ -1,8 +1,7 @@ - -use capsule::fetcher::{fetch, FetchError}; +use capsule::fetcher::{FetchError, fetch}; use wiremock::{ - matchers::{method, path}, Mock, MockServer, ResponseTemplate, + matchers::{method, path}, }; #[tokio::test] @@ -13,7 +12,10 @@ async fn test_fetch_success() { .and(path("/test")) .respond_with( ResponseTemplate::new(200) - .set_body_bytes("TestHello World".as_bytes()) + .set_body_bytes( + "TestHello World" + .as_bytes(), + ) .insert_header("Content-Type", "text/html; charset=utf-8"), ) .mount(&mock_server) @@ -77,10 +79,7 @@ async fn test_fetch_redirect() { Mock::given(method("GET")) .and(path("/redirect")) - .respond_with( - ResponseTemplate::new(302) - .insert_header("location", "/final") - ) + .respond_with(ResponseTemplate::new(302).insert_header("location", "/final")) .mount(&mock_server) .await; @@ -89,7 +88,7 @@ async fn test_fetch_redirect() { .respond_with( ResponseTemplate::new(200) .set_body_bytes("Final page".as_bytes()) - .insert_header("Content-Type", "text/html") + .insert_header("Content-Type", "text/html"), ) .mount(&mock_server) .await; @@ -104,12 +103,13 @@ async fn test_fetch_redirect() { #[tokio::test] async fn test_fetch_gzip_compression() { - use std::io::Write; - use flate2::write::GzEncoder; use flate2::Compression; + use flate2::write::GzEncoder; + use std::io::Write; + + let original_content = + "CompressedThis content is gzipped!"; - let original_content = "CompressedThis content is gzipped!"; - // Gzip the content let mut encoder = GzEncoder::new(Vec::new(), Compression::default()); encoder.write_all(original_content.as_bytes()).unwrap(); @@ -205,12 +205,24 @@ async fn test_error_retry_classification() { assert!(!FetchError::BodyTooLarge(1000).should_retry()); assert!(!FetchError::UnsupportedContentType("image/png".to_string()).should_retry()); assert!(!FetchError::Charset("Invalid encoding".to_string()).should_retry()); - + assert!(FetchError::Dns("DNS failure".to_string()).should_retry()); assert!(FetchError::ConnectTimeout.should_retry()); assert!(FetchError::RequestTimeout.should_retry()); - + // HTTP errors - assert!(!FetchError::Http { status: reqwest::StatusCode::NOT_FOUND, retriable: false }.should_retry()); - assert!(FetchError::Http { status: reqwest::StatusCode::INTERNAL_SERVER_ERROR, retriable: true }.should_retry()); + assert!( + !FetchError::Http { + status: reqwest::StatusCode::NOT_FOUND, + retriable: false + } + .should_retry() + ); + assert!( + FetchError::Http { + status: reqwest::StatusCode::INTERNAL_SERVER_ERROR, + retriable: true + } + .should_retry() + ); } From 02df0d855aa184954b2a7be76aced942ef302633 Mon Sep 17 00:00:00 2001 From: charlieroth Date: Wed, 27 Aug 2025 07:40:59 +0200 Subject: [PATCH 3/4] chore: address clippy diagnostics --- src/fetcher/client.rs | 5 ++--- src/fetcher/pipeline.rs | 15 ++++++--------- src/jobs/handlers/fetch_page.rs | 2 +- 3 files changed, 9 insertions(+), 13 deletions(-) diff --git a/src/fetcher/client.rs b/src/fetcher/client.rs index da8cf1f..52de36c 100644 --- a/src/fetcher/client.rs +++ b/src/fetcher/client.rs @@ -42,11 +42,10 @@ pub async fn fetch(url: &str) -> Result { .map_err(FetchError::from_reqwest_error)?; // Check content length before downloading - if let Some(content_length) = response.content_length() { - if content_length > MAX_BODY_SIZE { + if let 

From 02df0d855aa184954b2a7be76aced942ef302633 Mon Sep 17 00:00:00 2001
From: charlieroth
Date: Wed, 27 Aug 2025 07:40:59 +0200
Subject: [PATCH 3/4] chore: address clippy diagnostics

---
 src/fetcher/client.rs           |  5 ++---
 src/fetcher/pipeline.rs         | 15 ++++++---------
 src/jobs/handlers/fetch_page.rs |  2 +-
 3 files changed, 9 insertions(+), 13 deletions(-)

diff --git a/src/fetcher/client.rs b/src/fetcher/client.rs
index da8cf1f..52de36c 100644
--- a/src/fetcher/client.rs
+++ b/src/fetcher/client.rs
@@ -42,11 +42,10 @@ pub async fn fetch(url: &str) -> Result<PageResponse, FetchError> {
         .map_err(FetchError::from_reqwest_error)?;
 
     // Check content length before downloading
-    if let Some(content_length) = response.content_length() {
-        if content_length > MAX_BODY_SIZE {
+    if let Some(content_length) = response.content_length()
+        && content_length > MAX_BODY_SIZE {
             return Err(FetchError::BodyTooLarge(content_length));
         }
-    }
 
     let final_url = response.url().clone();
     let status = response.status();
diff --git a/src/fetcher/pipeline.rs b/src/fetcher/pipeline.rs
index d89f67b..90eb9c5 100644
--- a/src/fetcher/pipeline.rs
+++ b/src/fetcher/pipeline.rs
@@ -43,38 +43,35 @@ pub fn process_response(
 
 fn detect_charset(content_type: &str, body_bytes: &[u8]) -> Result<Charset, FetchError> {
     // 1. Check Content-Type header for charset
-    if let Some(captures) = CHARSET_REGEX.captures(content_type) {
-        if let Some(charset_str) = captures.get(1) {
+    if let Some(captures) = CHARSET_REGEX.captures(content_type)
+        && let Some(charset_str) = captures.get(1) {
         let charset_name = charset_str.as_str().to_lowercase();
         if let Some(encoding) = Encoding::for_label(charset_name.as_bytes()) {
             return Ok(Charset::from_encoding(encoding));
         }
     }
-    }
 
     // 2. Check for <meta charset> in first 4KB
     let search_bytes = &body_bytes[..body_bytes.len().min(4096)];
     let search_str = String::from_utf8_lossy(search_bytes);
 
     // Look for <meta charset="...">
-    if let Some(captures) = META_CHARSET_REGEX.captures(&search_str) {
-        if let Some(charset_str) = captures.get(1) {
+    if let Some(captures) = META_CHARSET_REGEX.captures(&search_str)
+        && let Some(charset_str) = captures.get(1) {
         let charset_name = charset_str.as_str().to_lowercase();
         if let Some(encoding) = Encoding::for_label(charset_name.as_bytes()) {
             return Ok(Charset::from_encoding(encoding));
         }
     }
-    }
 
     // Look for <meta http-equiv="content-type" content="...; charset=...">
-    if let Some(captures) = META_HTTP_EQUIV_REGEX.captures(&search_str) {
-        if let Some(charset_str) = captures.get(1) {
+    if let Some(captures) = META_HTTP_EQUIV_REGEX.captures(&search_str)
+        && let Some(charset_str) = captures.get(1) {
         let charset_name = charset_str.as_str().to_lowercase();
         if let Some(encoding) = Encoding::for_label(charset_name.as_bytes()) {
             return Ok(Charset::from_encoding(encoding));
         }
     }
-    }
 
     // 3. Use chardetng for heuristic detection
     let mut detector = chardetng::EncodingDetector::new();
diff --git a/src/jobs/handlers/fetch_page.rs b/src/jobs/handlers/fetch_page.rs
index 3dfe15e..09ffd2d 100644
--- a/src/jobs/handlers/fetch_page.rs
+++ b/src/jobs/handlers/fetch_page.rs
@@ -25,7 +25,7 @@ impl JobHandler for FetchPageJobHandler {
         let payload: FetchPagePayload = serde_json::from_value(payload)?;
 
         // Record item_id in the span
-        span.record("item_id", &tracing::field::display(payload.item_id));
+        span.record("item_id", tracing::field::display(payload.item_id));
 
         // Get the item URL with a lock to prevent concurrent processing
         let item_url: Option<String> = sqlx::query_scalar!(
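
The clippy pass above rewrites nested if-let blocks as let-chains
(if let ... && let ...), which requires a toolchain where let-chains are
stable (Rust 2024 edition). A standalone before/after sketch of the pattern,
independent of this codebase:

    // Before: two nested conditionals, each adding an indentation level.
    fn first_even(xs: &[i64]) -> Option<i64> {
        if let Some(x) = xs.first() {
            if x % 2 == 0 {
                return Some(*x);
            }
        }
        None
    }

    // After: a let-chain folds the binding and the guard into one condition.
    fn first_even_chained(xs: &[i64]) -> Option<i64> {
        if let Some(x) = xs.first()
            && x % 2 == 0
        {
            return Some(*x);
        }
        None
    }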

From de3c66d57984914c0a8ae5b848a2f7d762420c5c Mon Sep 17 00:00:00 2001
From: charlieroth
Date: Wed, 27 Aug 2025 07:42:49 +0200
Subject: [PATCH 4/4] chore: formatting

---
 src/fetcher/client.rs   |  7 ++++---
 src/fetcher/pipeline.rs | 33 ++++++++++++++++++---------------
 2 files changed, 22 insertions(+), 18 deletions(-)

diff --git a/src/fetcher/client.rs b/src/fetcher/client.rs
index 52de36c..7ba9c3c 100644
--- a/src/fetcher/client.rs
+++ b/src/fetcher/client.rs
@@ -43,9 +43,10 @@ pub async fn fetch(url: &str) -> Result<PageResponse, FetchError> {
 
     // Check content length before downloading
     if let Some(content_length) = response.content_length()
-        && content_length > MAX_BODY_SIZE {
-            return Err(FetchError::BodyTooLarge(content_length));
-        }
+        && content_length > MAX_BODY_SIZE
+    {
+        return Err(FetchError::BodyTooLarge(content_length));
+    }
 
     let final_url = response.url().clone();
     let status = response.status();
diff --git a/src/fetcher/pipeline.rs b/src/fetcher/pipeline.rs
index 90eb9c5..5cf0e72 100644
--- a/src/fetcher/pipeline.rs
+++ b/src/fetcher/pipeline.rs
@@ -44,12 +44,13 @@ fn detect_charset(content_type: &str, body_bytes: &[u8]) -> Result<Charset, FetchError> {
     // 1. Check Content-Type header for charset
     if let Some(captures) = CHARSET_REGEX.captures(content_type)
-        && let Some(charset_str) = captures.get(1) {
-        let charset_name = charset_str.as_str().to_lowercase();
-        if let Some(encoding) = Encoding::for_label(charset_name.as_bytes()) {
-            return Ok(Charset::from_encoding(encoding));
-        }
+        && let Some(charset_str) = captures.get(1)
+    {
+        let charset_name = charset_str.as_str().to_lowercase();
+        if let Some(encoding) = Encoding::for_label(charset_name.as_bytes()) {
+            return Ok(Charset::from_encoding(encoding));
         }
+    }
 
     // 2. Check for <meta charset> in first 4KB
     let search_bytes = &body_bytes[..body_bytes.len().min(4096)];
@@ -57,21 +58,23 @@ fn detect_charset(content_type: &str, body_bytes: &[u8]) -> Result<Charset, FetchError> {
 
     // Look for <meta charset="...">
     if let Some(captures) = META_CHARSET_REGEX.captures(&search_str)
-        && let Some(charset_str) = captures.get(1) {
-        let charset_name = charset_str.as_str().to_lowercase();
-        if let Some(encoding) = Encoding::for_label(charset_name.as_bytes()) {
-            return Ok(Charset::from_encoding(encoding));
-        }
+        && let Some(charset_str) = captures.get(1)
+    {
+        let charset_name = charset_str.as_str().to_lowercase();
+        if let Some(encoding) = Encoding::for_label(charset_name.as_bytes()) {
+            return Ok(Charset::from_encoding(encoding));
         }
+    }
 
     // Look for <meta http-equiv="content-type" content="...; charset=...">
     if let Some(captures) = META_HTTP_EQUIV_REGEX.captures(&search_str)
-        && let Some(charset_str) = captures.get(1) {
-        let charset_name = charset_str.as_str().to_lowercase();
-        if let Some(encoding) = Encoding::for_label(charset_name.as_bytes()) {
-            return Ok(Charset::from_encoding(encoding));
-        }
+        && let Some(charset_str) = captures.get(1)
+    {
+        let charset_name = charset_str.as_str().to_lowercase();
+        if let Some(encoding) = Encoding::for_label(charset_name.as_bytes()) {
+            return Ok(Charset::from_encoding(encoding));
         }
+    }
 
     // 3. Use chardetng for heuristic detection
     let mut detector = chardetng::EncodingDetector::new();
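
Taken together, the series makes retry policy the job runner's concern: fetch()
classifies every failure as retryable or permanent, and FetchPageJobHandler
re-raises retryable ones so the runner can reschedule. A minimal caller-side
sketch of the same contract; the attempt limit and linear backoff here are
illustrative, not values taken from this series:

    use capsule::fetcher::{fetch, FetchError, PageResponse};
    use std::time::Duration;

    async fn fetch_with_retries(url: &str) -> Result<PageResponse, FetchError> {
        let mut attempt: u64 = 0;
        loop {
            match fetch(url).await {
                Ok(page) => return Ok(page),
                // Permanent failures (bad URL, oversized body, non-HTML
                // content-type, undecodable charset) surface immediately.
                Err(e) if !e.should_retry() => return Err(e),
                Err(e) => {
                    attempt += 1;
                    if attempt >= 3 {
                        return Err(e);
                    }
                    // Illustrative linear backoff; a real runner would likely
                    // use jittered exponential delays instead.
                    tokio::time::sleep(Duration::from_secs(attempt)).await;
                }
            }
        }
    }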