diff --git a/backend/app/cache/__init__.py b/backend/app/cache/__init__.py new file mode 100644 index 00000000..106b44ad --- /dev/null +++ b/backend/app/cache/__init__.py @@ -0,0 +1,24 @@ +""" +Cache module for Perspective API. + +Provides in-memory caching for API responses to reduce redundant +LLM API calls and improve response times. + +Usage: + from app.cache import cache + + # Check for cached response + cached = cache.get("process", url) + if cached: + return cached + + # Store response in cache + cache.set("process", url, result) +""" + +from app.cache.cache import get_cache, URLCache + +# Export singleton cache instance +cache = get_cache() + +__all__ = ["cache", "get_cache", "URLCache"] diff --git a/backend/app/cache/cache.py b/backend/app/cache/cache.py new file mode 100644 index 00000000..688dffde --- /dev/null +++ b/backend/app/cache/cache.py @@ -0,0 +1,227 @@ +""" +cache.py +-------- +In-memory caching module for Perspective API responses. + +This module provides a thread-safe, TTL-based cache for storing +processed article results, reducing redundant API calls to Groq +and improving response times for repeated URL requests. + +Features: + - TTL-based expiration (configurable via CACHE_TTL_SECONDS) + - Thread-safe operations using locks + - LRU-style eviction when max size is reached + - Configurable via environment variables + +Environment Variables: + CACHE_ENABLED (str): "true" or "false" to enable/disable caching + CACHE_TTL_SECONDS (int): Time-to-live in seconds (default: 86400 = 24h) + CACHE_MAX_SIZE (int): Maximum number of cache entries (default: 1000) + +Usage: + from app.cache import cache + + # Check cache + cached = cache.get("process", "https://example.com/article") + if cached: + return cached + + # Store in cache + cache.set("process", "https://example.com/article", result) +""" + +import hashlib +import os +import threading +import copy +from datetime import datetime, timedelta +from typing import Any, Optional +from collections import OrderedDict +from dotenv import load_dotenv +from app.logging.logging_config import setup_logger + +load_dotenv() +logger = setup_logger(__name__) + + +class CacheEntry: + """Represents a single cache entry with value and expiration time.""" + + def __init__(self, value: Any, ttl_seconds: int): + self.value = value + self.cached_at = datetime.utcnow() + self.expires_at = self.cached_at + timedelta(seconds=ttl_seconds) + + def is_expired(self) -> bool: + """Check if this cache entry has expired.""" + return datetime.utcnow() > self.expires_at + + def to_metadata(self) -> dict: + """Return cache metadata for response.""" + return { + "hit": True, + "cached_at": self.cached_at.isoformat() + "Z", + "expires_at": self.expires_at.isoformat() + "Z", + } + + +class URLCache: + """ + Thread-safe in-memory cache with TTL expiration. + + Uses an OrderedDict for LRU-style eviction when max size is reached. 
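+
+    Note: the cache lives in process memory, so each worker process keeps its
+    own independent copy and all entries are lost on restart. Read and write
+    operations acquire an internal lock, so concurrent request handlers are safe.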
+ """ + + def __init__(self): + self._cache: OrderedDict[str, CacheEntry] = OrderedDict() + self._lock = threading.Lock() + + # Load configuration from environment + self._enabled = os.getenv("CACHE_ENABLED", "true").lower() == "true" + self._ttl_seconds = int(os.getenv("CACHE_TTL_SECONDS", "86400")) + self._max_size = int(os.getenv("CACHE_MAX_SIZE", "1000")) + + logger.info( + f"Cache initialized: enabled={self._enabled}, " + f"ttl={self._ttl_seconds}s, max_size={self._max_size}" + ) + + @property + def enabled(self) -> bool: + """Check if caching is enabled.""" + return self._enabled + + def _generate_key(self, endpoint: str, url: str) -> str: + """Generate a unique cache key from endpoint and URL.""" + normalized_url = url.strip().lower() + url_hash = hashlib.sha256(normalized_url.encode()).hexdigest()[:16] + return f"{endpoint}:{url_hash}" + + def get(self, endpoint: str, url: str) -> Optional[dict]: + """ + Retrieve a cached response if it exists and hasn't expired. + + Args: + endpoint: The API endpoint type ("process" or "bias") + url: The article URL + + Returns: + Cached response dict with _cache metadata, or None if not found/expired + """ + if not self._enabled: + return None + + key = self._generate_key(endpoint, url) + + with self._lock: + entry = self._cache.get(key) + + if entry is None: + logger.debug(f"Cache miss for {endpoint}: {url}") + return None + + if entry.is_expired(): + logger.debug(f"Cache expired for {endpoint}: {url}") + del self._cache[key] + return None + + # Move to end for LRU behavior + self._cache.move_to_end(key) + + logger.info(f"Cache hit for {endpoint}: {url}") + + # Return deep copy with cache metadata to prevent mutation + result = copy.deepcopy(entry.value) if isinstance(entry.value, (dict, list)) else entry.value + if isinstance(result, dict): + result["_cache"] = entry.to_metadata() + + return result + + def set(self, endpoint: str, url: str, value: Any) -> None: + """ + Store a response in the cache. + + Args: + endpoint: The API endpoint type ("process" or "bias") + url: The article URL + value: The response to cache + """ + if not self._enabled: + return + + key = self._generate_key(endpoint, url) + + with self._lock: + # Evict oldest entries if at max size + while len(self._cache) >= self._max_size: + evicted_key, _ = self._cache.popitem(last=False) + logger.debug(f"Evicted cache entry: {evicted_key}") + + safe_value = copy.deepcopy(value) if isinstance(value, (dict, list)) else value + self._cache[key] = CacheEntry(safe_value, self._ttl_seconds) + logger.info(f"Cached response for {endpoint}: {url}") + + def delete(self, endpoint: str, url: str) -> bool: + """ + Remove a specific entry from the cache. + + Args: + endpoint: The API endpoint type + url: The article URL + + Returns: + True if entry was deleted, False if not found + """ + key = self._generate_key(endpoint, url) + + with self._lock: + if key in self._cache: + del self._cache[key] + logger.info(f"Deleted cache entry for {endpoint}: {url}") + return True + return False + + def clear(self) -> int: + """ + Clear all cache entries. + + Returns: + Number of entries cleared + """ + with self._lock: + count = len(self._cache) + self._cache.clear() + logger.info(f"Cleared {count} cache entries") + return count + + def stats(self) -> dict: + """ + Get cache statistics. 
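+        Expired entries are counted here but not purged; they are only removed
+        lazily when get() encounters them.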
+ + Returns: + Dict with cache stats + """ + with self._lock: + # Count expired entries + expired = sum(1 for e in self._cache.values() if e.is_expired()) + + return { + "enabled": self._enabled, + "total_entries": len(self._cache), + "expired_entries": expired, + "active_entries": len(self._cache) - expired, + "max_size": self._max_size, + "ttl_seconds": self._ttl_seconds, + } + + +# Singleton instance +_cache_instance: Optional[URLCache] = None + + +def get_cache() -> URLCache: + """Get or create the singleton cache instance.""" + global _cache_instance + if _cache_instance is None: + _cache_instance = URLCache() + return _cache_instance diff --git a/backend/app/modules/bias_detection/check_bias.py b/backend/app/modules/bias_detection/check_bias.py index a0644529..dee0faa1 100644 --- a/backend/app/modules/bias_detection/check_bias.py +++ b/backend/app/modules/bias_detection/check_bias.py @@ -61,7 +61,7 @@ def check_bias(text): "content": (f"Give bias score to the following article \n\n{text}"), }, ], - model="gemma2-9b-it", + model="llama-3.3-70b-versatile", temperature=0.3, max_tokens=512, ) diff --git a/backend/app/modules/chat/llm_processing.py b/backend/app/modules/chat/llm_processing.py index 2d5134fa..70e8ace1 100644 --- a/backend/app/modules/chat/llm_processing.py +++ b/backend/app/modules/chat/llm_processing.py @@ -55,7 +55,7 @@ def ask_llm(question, docs): """ response = client.chat.completions.create( - model="gemma2-9b-it", + model="llama-3.3-70b-versatile", messages=[ {"role": "system", "content": "Use only the context to answer."}, {"role": "user", "content": prompt}, diff --git a/backend/app/modules/fact_check_tool.py b/backend/app/modules/fact_check_tool.py new file mode 100644 index 00000000..d06481f4 --- /dev/null +++ b/backend/app/modules/fact_check_tool.py @@ -0,0 +1,208 @@ +""" +fact_check_tool.py +------------------ +Fact checking tool node implementation using DuckDuckGo search. + +This module replaces the Google Custom Search with a native DuckDuckGo +based fact checking pipeline that doesn't require any API keys. + +Functions: + extract_claims_node: Extracts verifiable claims from text using LLM + plan_searches_node: Generates search queries for each claim + execute_searches_node: Runs DuckDuckGo searches in parallel + verify_facts_node: Verifies claims against search results using LLM +""" + +import os +import json +import asyncio +from groq import Groq +from langchain_community.tools import DuckDuckGoSearchRun +from app.logging.logging_config import setup_logger +from dotenv import load_dotenv + +load_dotenv() + +client = Groq(api_key=os.getenv("GROQ_API_KEY")) +search_tool = DuckDuckGoSearchRun() + +# Use the same model as other modules +LLM_MODEL = "llama-3.3-70b-versatile" + +# Timeout for DuckDuckGo searches (seconds) +# The underlying duckduckgo_search library has a default timeout (~10s), +# but we add an asyncio-level timeout as a safety net for hung searches. +SEARCH_TIMEOUT_SECONDS = 15 + +logger = setup_logger(__name__) + + +async def extract_claims_node(state): + """Extract verifiable factual claims from the text.""" + logger.info("--- Fact Check Step 1: Extracting Claims ---") + try: + text = state.get("cleaned_text", "") + + response = await asyncio.to_thread( + client.chat.completions.create, + messages=[ + { + "role": "system", + "content": "Extract 2-3 key factual claims. One per line. Be brief." 
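+                    # The user message below truncates the article to its first
+                    # 2000 characters to keep this extraction call small.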
+ }, + {"role": "user", "content": text[:2000]} + ], + model=LLM_MODEL, + temperature=0.0, + max_tokens=200 + ) + + raw_content = response.choices[0].message.content + + claims = [ + line.strip("- *") + for line in raw_content.split("\n") + if len(line.strip()) > 10 + ] + + logger.info(f"Extracted {len(claims)} claims.") + return {"claims": claims} + + except Exception as e: + logger.error(f"Error extracting claims: {e}") + return {"claims": []} + + +async def plan_searches_node(state): + """Generate search queries for each claim.""" + logger.info("--- Fact Check Step 2: Planning Searches ---") + claims = state.get("claims", []) + + if not claims: + return {"search_queries": []} + + claims_text = "\n".join([f"{i}. {c}" for i, c in enumerate(claims)]) + + prompt = f"""Generate search queries for these claims. Return JSON: {{"searches": [{{"query": "...", "claim_id": 0}}]}} + +Claims: +{claims_text}""" + + try: + response = await asyncio.to_thread( + client.chat.completions.create, + messages=[{"role": "user", "content": prompt}], + model=LLM_MODEL, + temperature=0.0, + max_tokens=150, + response_format={"type": "json_object"} + ) + + plan_json = json.loads(response.choices[0].message.content) + queries = plan_json.get("searches", []) + + return {"search_queries": queries} + + except Exception as e: + logger.error(f"Failed to plan searches: {e}") + return {"search_queries": []} + + +async def execute_searches_node(state): + """Execute DuckDuckGo searches in parallel.""" + logger.info("--- Fact Check Step 3: Executing Parallel Searches ---") + queries = state.get("search_queries", []) + + if not queries: + return {"search_results": []} + + async def run_one_search(q): + query_str = q.get("query") + c_id = q.get("claim_id") + + # Guard against malformed LLM responses missing the query key + if not query_str or not isinstance(query_str, str): + logger.warning(f"Skipping invalid search query for claim {c_id}: {query_str}") + return {"claim_id": c_id, "result": "Invalid query"} + + try: + # Wrap the search in asyncio.wait_for to prevent indefinite hangs + res = await asyncio.wait_for( + asyncio.to_thread(search_tool.invoke, query_str), + timeout=SEARCH_TIMEOUT_SECONDS + ) + logger.info(f"Search Result for Claim {c_id}: {res[:200]}...") + return {"claim_id": c_id, "result": res} + except asyncio.TimeoutError: + logger.warning( + f"Search timed out after {SEARCH_TIMEOUT_SECONDS}s for query: {query_str}" + ) + return {"claim_id": c_id, "result": "Search timed out"} + except Exception as e: + logger.error(f"Search failed for query: {query_str}: {e}") + return {"claim_id": c_id, "result": "Search failed"} + + results = await asyncio.gather(*[run_one_search(q) for q in queries]) + + logger.info(f"Completed {len(results)} searches.") + return {"search_results": results} + + +async def verify_facts_node(state): + """Verify claims against search results using LLM.""" + logger.info("--- Fact Check Step 4: Verifying Facts ---") + claims = state.get("claims", []) + results = state.get("search_results", []) + + if not claims: + return {"facts": [], "fact_check_done": True} + + context = "Verify claims:\n" + for item in results: + try: + c_id = int(item["claim_id"]) + except (KeyError, TypeError, ValueError): + continue + if 0 <= c_id < len(claims): + # Limit evidence to first 300 chars + evidence = item['result'][:300] if item.get('result') else 'No evidence' + context += f"Claim: {claims[c_id]}\nEvidence: {evidence}\n" + + try: + response = await asyncio.to_thread( + client.chat.completions.create, + 
messages=[ + { + "role": "system", + "content": "Return JSON: {\"facts\": [{\"claim\": \"...\", \"status\": true/false, \"reason\": \"brief\"}]}" + }, + {"role": "user", "content": context[:1500]} + ], + model=LLM_MODEL, + temperature=0.0, + max_tokens=300, + response_format={"type": "json_object"} + ) + + final_verdict_str = response.choices[0].message.content + + data = json.loads(final_verdict_str) + + facts_list = [] + if isinstance(data, dict): + # Look for common keys if wrapped + if "facts" in data: + facts_list = data["facts"] + elif "verified_claims" in data: + facts_list = data["verified_claims"] + else: + facts_list = [data] + elif isinstance(data, list): + facts_list = data + + logger.info(f"Verified {len(facts_list)} facts.") + return {"facts": facts_list, "fact_check_done": True} + + except Exception as e: + logger.error(f"Verification failed: {e}") + return {"facts": [], "fact_check_done": True} diff --git a/backend/app/modules/facts_check/llm_processing.py b/backend/app/modules/facts_check/llm_processing.py index dc223a85..1fe34979 100644 --- a/backend/app/modules/facts_check/llm_processing.py +++ b/backend/app/modules/facts_check/llm_processing.py @@ -63,7 +63,7 @@ def run_claim_extractor_sdk(state): ), }, ], - model="gemma2-9b-it", + model="llama-3.3-70b-versatile", temperature=0.3, max_tokens=512, ) @@ -128,7 +128,7 @@ def run_fact_verifier_sdk(search_results): ), }, ], - model="gemma2-9b-it", + model="llama-3.3-70b-versatile", temperature=0.3, max_tokens=256, ) diff --git a/backend/app/modules/langgraph_builder.py b/backend/app/modules/langgraph_builder.py index 7729f945..c9c78b27 100644 --- a/backend/app/modules/langgraph_builder.py +++ b/backend/app/modules/langgraph_builder.py @@ -6,12 +6,12 @@ and retry logic. Workflow: - 1. Sentiment analysis on the cleaned text. - 2. Fact-checking detected claims. - 3. Generating a counter-perspective. - 4. Judging the quality of the generated perspective. - 5. Storing results and sending them downstream. - 6. Error handling at any step if failures occur. + 1. Parallel analysis: sentiment analysis and fact checking tool pipeline + (extract_claims -> plan_searches -> execute_searches -> verify_facts) + 2. Generating a counter-perspective. + 3. Judging the quality of the generated perspective. + 4. Storing results and sending. + 5. Error handling at any step if failures occur. 
Core Features: - Uses a TypedDict (`MyState`) to define the shape of the pipeline's @@ -31,18 +31,18 @@ """ +from typing import List, Any from langgraph.graph import StateGraph +from typing_extensions import TypedDict + from app.modules.langgraph_nodes import ( sentiment, - fact_check, generate_perspective, judge, store_and_send, error_handler, ) -from typing_extensions import TypedDict - class MyState(TypedDict): cleaned_text: str @@ -52,29 +52,26 @@ class MyState(TypedDict): score: int retries: int status: str + claims: List[str] + search_queries: List[Any] + search_results: List[Any] def build_langgraph(): graph = StateGraph(MyState) - graph.add_node("sentiment_analysis", sentiment.run_sentiment_sdk) - graph.add_node("fact_checking", fact_check.run_fact_check) + # parallel analysis runs sentiment and fact_check tool pipeline in parallel + graph.add_node("parallel_analysis", sentiment.run_parallel_analysis) + graph.add_node("generate_perspective", generate_perspective.generate_perspective) graph.add_node("judge_perspective", judge.judge_perspective) graph.add_node("store_and_send", store_and_send.store_and_send) graph.add_node("error_handler", error_handler.error_handler) - graph.set_entry_point( - "sentiment_analysis", - ) + graph.set_entry_point("parallel_analysis") graph.add_conditional_edges( - "sentiment_analysis", - lambda x: ("error_handler" if x.get("status") == "error" else "fact_checking"), - ) - - graph.add_conditional_edges( - "fact_checking", + "parallel_analysis", lambda x: ( "error_handler" if x.get("status") == "error" else "generate_perspective" ), @@ -101,6 +98,7 @@ def build_langgraph(): else "store_and_send" ), ) + graph.add_conditional_edges( "store_and_send", lambda x: ("error_handler" if x.get("status") == "error" else "__end__"), @@ -109,3 +107,4 @@ def build_langgraph(): graph.set_finish_point("store_and_send") return graph.compile() + diff --git a/backend/app/modules/langgraph_nodes/generate_perspective.py b/backend/app/modules/langgraph_nodes/generate_perspective.py index be0c81f3..41810e1e 100644 --- a/backend/app/modules/langgraph_nodes/generate_perspective.py +++ b/backend/app/modules/langgraph_nodes/generate_perspective.py @@ -56,17 +56,19 @@ def generate_perspective(state): if not text: raise ValueError("Missing or empty 'cleaned_text' in state") - elif not facts: - raise ValueError("Missing or empty 'facts' in state") - - facts_str = "\n".join( - [ - f"Claim: {f['original_claim']}\n" - "Verdict: {f['verdict']}\nExplanation: " - "{f['explanation']}" - for f in state["facts"] - ] - ) + # Handle both old format (original_claim/verdict/explanation) and + # new format (claim/status/reason) + if not facts: + facts_str = "No facts available." 
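+        # The chain below still runs; the prompt simply receives no verified facts.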
+ else: + facts_str = "\n".join( + [ + f"Claim: {f.get('claim', f.get('original_claim', 'Unknown'))}\n" + f"Verdict: {f.get('status', f.get('verdict', 'Unknown'))}\n" + f"Explanation: {f.get('reason', f.get('explanation', 'N/A'))}" + for f in facts + ] + ) result = chain.invoke( { diff --git a/backend/app/modules/langgraph_nodes/judge.py b/backend/app/modules/langgraph_nodes/judge.py index 57100301..f3eaef06 100644 --- a/backend/app/modules/langgraph_nodes/judge.py +++ b/backend/app/modules/langgraph_nodes/judge.py @@ -24,7 +24,7 @@ # Init once groq_llm = ChatGroq( - model="gemma2-9b-it", + model="llama-3.3-70b-versatile", temperature=0.0, max_tokens=10, ) diff --git a/backend/app/modules/langgraph_nodes/sentiment.py b/backend/app/modules/langgraph_nodes/sentiment.py index fef1d39d..6862aa13 100644 --- a/backend/app/modules/langgraph_nodes/sentiment.py +++ b/backend/app/modules/langgraph_nodes/sentiment.py @@ -11,13 +11,18 @@ Functions: run_sentiment_sdk(state: dict) -> dict: Analyzes sentiment and updates the state with the result. + + run_parallel_analysis(state: dict) -> dict: + Runs sentiment analysis and fact-checking tool nodes in parallel. + Combines claims, search_queries, search_results, and facts into state. """ - +import asyncio import os from groq import Groq from dotenv import load_dotenv from app.logging.logging_config import setup_logger +from app.modules import fact_check_tool logger = setup_logger(__name__) @@ -26,6 +31,74 @@ client = Groq(api_key=os.getenv("GROQ_API_KEY")) +async def run_parallel_analysis(state): + """ + Runs sentiment analysis and fact-checking pipeline in parallel. + + The fact-checking pipeline runs sequentially: + extract_claims -> plan_searches -> execute_searches -> verify_facts + """ + async def run_fact_check_pipeline(state): + try: + claims_result = await fact_check_tool.extract_claims_node(state) + current_state = {**state, **claims_result} + + searches_result = await fact_check_tool.plan_searches_node(current_state) + current_state = {**current_state, **searches_result} + + exec_result = await fact_check_tool.execute_searches_node(current_state) + current_state = {**current_state, **exec_result} + + verify_result = await fact_check_tool.verify_facts_node(current_state) + current_state = {**current_state, **verify_result} + + return { + "claims": current_state.get("claims", []), + "search_queries": current_state.get("search_queries", []), + "search_results": current_state.get("search_results", []), + "facts": current_state.get("facts", []), + "status": "success" + } + except Exception as e: + logger.exception(f"Error in fact_check_pipeline: {e}") + return { + "status": "error", + "error_from": "fact_checking", + "message": str(e) + } + + sentiment_task = asyncio.to_thread(run_sentiment_sdk, state) + fact_check_task = run_fact_check_pipeline(state) + + sentiment_result, fact_check_result = await asyncio.gather( + sentiment_task, fact_check_task + ) + + if sentiment_result.get("status") == "error": + return { + "status": "error", + "error_from": sentiment_result.get("error_from", "sentiment_analysis"), + "message": sentiment_result.get("message", "Unknown error") + } + + if fact_check_result.get("status") == "error": + return { + "status": "error", + "error_from": fact_check_result.get("error_from", "fact_checking"), + "message": fact_check_result.get("message", "Unknown error") + } + + return { + **state, + "sentiment": sentiment_result.get("sentiment"), + "claims": fact_check_result.get("claims", []), + "search_queries": 
fact_check_result.get("search_queries", []), + "search_results": fact_check_result.get("search_results", []), + "facts": fact_check_result.get("facts", []), + "status": "success" + } + + def run_sentiment_sdk(state): try: text = state.get("cleaned_text") @@ -49,7 +122,7 @@ def run_sentiment_sdk(state): ), }, ], - model="gemma2-9b-it", + model="llama-3.3-70b-versatile", temperature=0.2, max_tokens=3, ) diff --git a/backend/app/modules/pipeline.py b/backend/app/modules/pipeline.py index 3e4a844e..c6da8fc2 100644 --- a/backend/app/modules/pipeline.py +++ b/backend/app/modules/pipeline.py @@ -27,8 +27,8 @@ returning a dictionary containing the cleaned text and keywords. run_langgraph_workflow(state: dict) -> dict - Invokes the pre-compiled LangGraph workflow with the provided - state dictionary and returns the result. + Invokes the pre-compiled LangGraph workflow asynchronously with + the provided state dictionary and returns the result. """ @@ -38,6 +38,7 @@ from app.modules.langgraph_builder import build_langgraph from app.logging.logging_config import setup_logger import json +import asyncio logger = setup_logger(__name__) @@ -64,8 +65,9 @@ def run_scraper_pipeline(url: str) -> dict: return result -def run_langgraph_workflow(state: dict): - """Execute the pre-compiled LangGraph workflow.""" - result = _LANGGRAPH_WORKFLOW.invoke(state) +async def run_langgraph_workflow(state: dict): + """Execute the pre-compiled LangGraph workflow asynchronously.""" + result = await _LANGGRAPH_WORKFLOW.ainvoke(state) logger.info("LangGraph workflow executed successfully.") return result + diff --git a/backend/app/modules/scraper/extractor.py b/backend/app/modules/scraper/extractor.py index 15fe0199..c9611fd4 100644 --- a/backend/app/modules/scraper/extractor.py +++ b/backend/app/modules/scraper/extractor.py @@ -41,7 +41,7 @@ def __init__(self, url): def _fetch_html(self): try: - res = requests.get(self.url, self.headers, timeout=10) + res = requests.get(self.url, headers=self.headers, timeout=10) res.raise_for_status() return res.text except requests.RequestException as e: diff --git a/backend/app/modules/vector_store/chunk_rag_data.py b/backend/app/modules/vector_store/chunk_rag_data.py index 7e2a32eb..5cca0ff3 100644 --- a/backend/app/modules/vector_store/chunk_rag_data.py +++ b/backend/app/modules/vector_store/chunk_rag_data.py @@ -73,23 +73,24 @@ def chunk_rag_data(data): ) # Add each fact as a separate chunk + # Handle both old format (original_claim/verdict/explanation/source_link) and + # new DuckDuckGo format (claim/status/reason) for i, fact in enumerate(data["facts"]): - fact_fields = ["original_claim", "verdict", "explanation", "source_link"] - for field in fact_fields: - if field not in fact: - raise ValueError( - f"Missing required fact field: {field} in fact index {i}" - ) + # Get claim text with flexible key lookup + claim_text = fact.get("claim") or fact.get("original_claim") or "Unknown claim" + verdict = fact.get("status") or fact.get("verdict") or "Unknown" + explanation = fact.get("reason") or fact.get("explanation") or "N/A" + source_link = fact.get("source_link", "") chunks.append( { "id": f"{article_id}-fact-{i}", - "text": fact["original_claim"], + "text": claim_text, "metadata": { "type": "fact", - "verdict": fact["verdict"], - "explanation": fact["explanation"], - "source_link": fact["source_link"], + "verdict": str(verdict), + "explanation": explanation, + "source_link": source_link, "article_id": article_id, }, } diff --git a/backend/app/routes/routes.py 
b/backend/app/routes/routes.py index 6988f5e8..989dcc8e 100644 --- a/backend/app/routes/routes.py +++ b/backend/app/routes/routes.py @@ -10,41 +10,53 @@ POST /bias Accepts a URL, scrapes and processes the article content, and runs bias detection - to return a bias score and related insights. + to return a bias score and related insights. Results are cached for faster + repeated requests. POST /process Accepts a URL, scrapes and processes the article content, then executes the LangGraph workflow for sentiment analysis, fact-checking, perspective generation, - and final result assembly. + and final result assembly. Results are cached for faster repeated requests. POST /chat Accepts a user query, searches stored vector data in Pinecone, and queries an LLM to produce a contextual answer. + GET /cache/stats + Returns cache statistics including hit rates and entry counts. + Core Components: - run_scraper_pipeline: Extracts and cleans article text, then identifies keywords. - run_langgraph_workflow: Executes the LangGraph pipeline for deep content analysis. - check_bias: Scores and analyzes potential bias in article content. - search_pinecone: Retrieves relevant RAG data for a given query. - ask_llm: Generates a natural language answer using retrieved context. + - cache: In-memory cache for API responses to reduce LLM API calls. """ -from fastapi import APIRouter +from fastapi import APIRouter, Header, HTTPException from pydantic import BaseModel from app.modules.pipeline import run_scraper_pipeline from app.modules.pipeline import run_langgraph_workflow from app.modules.bias_detection.check_bias import check_bias from app.modules.chat.get_rag_data import search_pinecone from app.modules.chat.llm_processing import ask_llm +from app.cache import cache from app.logging.logging_config import setup_logger import asyncio import json +import os +import hashlib +from pathlib import Path logger = setup_logger(__name__) router = APIRouter() +# Admin API key for destructive cache operations (set via environment variable) +ADMIN_API_KEY = os.getenv("ADMIN_API_KEY", "") + class URlRequest(BaseModel): url: str @@ -59,19 +71,101 @@ async def home(): return {"message": "Perspective API is live!"} +# Directory for saving cache hit responses +CACHE_RESPONSES_DIR = Path(__file__).parent.parent.parent / "cache_responses" +CACHE_RESPONSES_DIR.mkdir(exist_ok=True) +CACHE_RESPONSES_MAX_FILES = 100 # Max files to keep + + +async def _cleanup_old_cache_files() -> None: + """Remove oldest cache files if exceeding max limit.""" + try: + files = list(CACHE_RESPONSES_DIR.glob("*.json")) + if len(files) > CACHE_RESPONSES_MAX_FILES: + # Sort by modification time (oldest first) + files.sort(key=lambda f: f.stat().st_mtime) + # Remove oldest files to stay under limit + files_to_remove = files[:len(files) - CACHE_RESPONSES_MAX_FILES] + for f in files_to_remove: + f.unlink() + logger.debug(f"Evicted old cache file: {f.name}") + except Exception as e: + logger.error(f"Failed to cleanup cache files: {e}") + + +async def _save_cache_response(url: str, response: dict) -> None: + """Save cache hit response to a local JSON file.""" + try: + # Create filename from URL hash + url_hash = hashlib.sha256(url.encode()).hexdigest()[:12] + filename = f"{url_hash}.json" + filepath = CACHE_RESPONSES_DIR / filename + + # Custom JSON encoder for Pydantic and other non-serializable objects + def json_serializer(obj): + if hasattr(obj, "model_dump"): # Pydantic v2 + return obj.model_dump() + if hasattr(obj, "dict"): # Pydantic v1 + return obj.dict() 
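+            # Anything json cannot serialize natively (e.g. datetime) falls back
+            # to its string representation.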
+ return str(obj) + + # Save response as formatted JSON + json_content = json.dumps(response, indent=2, ensure_ascii=False, default=json_serializer) + await asyncio.to_thread( + lambda: filepath.write_text(json_content, encoding="utf-8") + ) + logger.info(f"Saved cache response to: {filepath}") + + # Cleanup old files if over limit + await _cleanup_old_cache_files() + except Exception as e: + logger.error(f"Failed to save cache response: {e}") + + @router.post("/bias") async def bias_detection(request: URlRequest): + # Check cache first + cached_response = cache.get("bias", request.url) + if cached_response: + logger.info(f"Returning cached bias result for: {request.url}") + return cached_response + + # Process if not cached content = await asyncio.to_thread(run_scraper_pipeline, (request.url)) bias_score = await asyncio.to_thread(check_bias, (content)) logger.info(f"Bias detection result: {bias_score}") + + # Store in cache (only if successful) + if bias_score.get("status") == "success": + cache.set("bias", request.url, bias_score) + + # Add cache miss metadata + bias_score["_cache"] = {"hit": False} return bias_score @router.post("/process") async def run_pipelines(request: URlRequest): + # Check cache first + cached_response = cache.get("process", request.url) + if cached_response: + logger.info(f"Returning cached process result for: {request.url}") + # Auto-save cache hit response to local JSON file + await _save_cache_response(request.url, cached_response) + return cached_response + + # Process if not cached article_text = await asyncio.to_thread(run_scraper_pipeline, (request.url)) logger.debug(f"Scraper output: {json.dumps(article_text, indent=2, ensure_ascii=False)}") - data = await asyncio.to_thread(run_langgraph_workflow, (article_text)) + data = await run_langgraph_workflow(article_text) + + # Store in cache (only if successful) + if isinstance(data, dict) and data.get("status") == "success": + cache.set("process", request.url, data) + + # Add cache miss metadata + if isinstance(data, dict): + data["_cache"] = {"hit": False} return data @@ -83,3 +177,49 @@ async def answer_query(request: ChatQuery): logger.info(f"Chat answer generated: {answer}") return {"answer": answer} + + +@router.get("/cache/stats") +async def cache_stats(): + """Return cache statistics.""" + return cache.stats() + + +@router.delete("/cache/clear") +async def cache_clear(x_admin_key: str = Header(None, alias="X-Admin-Key")): + """Clear all cache entries. Requires X-Admin-Key header.""" + if not ADMIN_API_KEY or x_admin_key != ADMIN_API_KEY: + raise HTTPException(status_code=403, detail="Forbidden: Invalid or missing admin key") + count = cache.clear() + logger.info(f"Cache cleared: {count} entries removed") + return {"message": f"Cleared {count} cache entries", "cleared": count} + + +@router.delete("/cache/{endpoint}") +async def cache_delete( + endpoint: str, + request: URlRequest, + x_admin_key: str = Header(None, alias="X-Admin-Key") +): + """ + Delete a specific cache entry. Requires X-Admin-Key header. + + Args: + endpoint: The endpoint type ("process" or "bias") + request: The URL request containing the article URL + """ + if not ADMIN_API_KEY or x_admin_key != ADMIN_API_KEY: + raise HTTPException(status_code=403, detail="Forbidden: Invalid or missing admin key") + + if endpoint not in ["process", "bias"]: + raise HTTPException( + status_code=400, + detail="Invalid endpoint. 
Use 'process' or 'bias'" + ) + + deleted = cache.delete(endpoint, request.url) + if deleted: + logger.info(f"Cache entry deleted for {endpoint}: {request.url}") + return {"message": f"Deleted cache entry for {endpoint}", "deleted": True} + else: + return {"message": "Cache entry not found", "deleted": False}
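
For reviewers who want to exercise the new caching behaviour by hand, here is a minimal illustrative sketch (not something this diff adds). It assumes the API is running locally on http://localhost:8000 and that the requests package is installed; the base URL and article URL are placeholders to adjust for your setup.

    import requests

    BASE = "http://localhost:8000"  # assumed local dev server
    article = {"url": "https://example.com/article"}  # placeholder URL

    # First call runs the full scrape + LangGraph pipeline and stores the result.
    first = requests.post(f"{BASE}/process", json=article, timeout=300).json()
    print(first.get("_cache"))   # expected: {'hit': False}

    # A repeated call for the same URL should be served from the in-memory cache
    # and carry cached_at / expires_at metadata.
    second = requests.post(f"{BASE}/process", json=article, timeout=300).json()
    print(second.get("_cache"))

    # Cache statistics from the new GET endpoint.
    print(requests.get(f"{BASE}/cache/stats", timeout=10).json())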