diff --git a/.docker/Dockerfile b/.docker/Dockerfile index d00a424..1f0542a 100644 --- a/.docker/Dockerfile +++ b/.docker/Dockerfile @@ -1,6 +1,10 @@ # Use an official PyTorch image with CUDA support FROM pytorch/pytorch:2.0.1-cuda11.7-cudnn8-runtime +#Need this to prevent the playwright deps from hanging during the install +ENV DEBIAN_FRONTEND=noninteractive +ENV TZ=Etc/UTC + # Install Git (and any other system tools you need) RUN apt-get update && \ apt-get install -y python3 python3-pip git && \ @@ -10,6 +14,8 @@ RUN apt-get update && \ COPY requirements.txt /workspace/requirements.txt RUN pip install --no-cache-dir -r /workspace/requirements.txt +RUN playwright install +RUN playwright install-deps # Set working directory WORKDIR /workspace diff --git a/.gitignore b/.gitignore index 290f23e..4c1e893 100644 --- a/.gitignore +++ b/.gitignore @@ -3,6 +3,7 @@ src/celerybeat-schedule.* feed*.json .DS_Store .venv +.env # Ignore all __pycache__ directories at any level **/__pycache__/ diff --git a/docker-compose.yml b/docker-compose.yml index b6fe913..6a681a9 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -1,12 +1,10 @@ -version: '3.8' - services: justinsight_service: build: context: . dockerfile: .docker/Dockerfile - environment: - - MONGODB_URI=mongodb://myuser:mypassword@mongo:27017/justinsightdb?authSource=admin + # environment: + # - MONGODB_URI=mongodb://myuser:mypassword@mongo:27017/justinsightdb?authSource=admin # bind-mount your repo and the shared EBS volume volumes: - ./:/workspace:cached @@ -27,8 +25,8 @@ services: build: context: . dockerfile: .docker/Dockerfile - environment: - - MONGODB_URI=mongodb://myuser:mypassword@mongo:27017/justinsightdb?authSource=admin + # environment: + # - MONGODB_URI=mongodb://myuser:mypassword@mongo:27017/justinsightdb?authSource=admin volumes: - .:/workspace:cached - mongo_data:/data/db @@ -42,8 +40,8 @@ services: build: context: . dockerfile: .docker/Dockerfile - environment: - - MONGODB_URI=mongodb://myuser:mypassword@mongo:27017/justinsightdb?authSource=admin + # environment: + # - MONGODB_URI=mongodb://myuser:mypassword@mongo:27017/justinsightdb?authSource=admin volumes: - .:/workspace:cached depends_on: @@ -64,5 +62,41 @@ services: - mongo_data:/data/db - .:/workspace:cached + streamlitapp: + build: + context: . + dockerfile: .docker/Dockerfile + container_name: streamlit + ports: + - "8501:8501" + depends_on: + - mongo + environment: + - MONGODB_URI=mongodb://myuser:mypassword@mongo:27017/justinsightdb?authSource=admin + volumes: + # - mongo_data:/data/db + - .:/workspace:cached + command: ["streamlit", "run", "src/justinsight/streamlitapp.py"] + + #will go in the docker-compose.gpu.yml file once we deploy + # celery_gpu_worker: + # build: + # context: . + # dockerfile: .docker/Dockerfile + # command: ["celery", "-A", justinsight.celery, "worker", "-Q", "gpu", "--loglevel=info"] #the lowercase j is actually so important + # environment: + # - CELERY_BROKER_URL=redis://redis:6379/0 + # volumes: + # - .:/workspace:cached + # - mongo_data:/data/db + # depends_on: + # - redis + # - mongo + # deploy: + # resources: + # reservations: + # devices: + # - capabilities: [gpu] + volumes: mongo_data: \ No newline at end of file diff --git a/readme.md b/readme.md index 8335988..ee9efa3 100644 --- a/readme.md +++ b/readme.md @@ -49,8 +49,9 @@ Note: When running in background you can use "docker logs ## How to check what's in the database -Please run: docker compose up -d +Please run: docker compose up -d --build Then: docker exec -it mongo mongosh -u myuser -p mypassword Then: use justinsightdb Then: db.articles.find().pretty() -Note - you may need to download mongosh for this to work and to exit the mongosh environment just run 'exit'. Remember to 'docker compose down' as the containers will be running in the background. \ No newline at end of file +Note - you may need to download mongosh for this to work and to exit the mongosh environment just run 'exit'. Remember to 'docker compose down' as the containers will be running in the background. +Note - to delete everything in your database run the docker in detached mode and then run ./scripts/clear_db.sh \ No newline at end of file diff --git a/requirements.txt b/requirements.txt index 18115a6..034333a 100644 --- a/requirements.txt +++ b/requirements.txt @@ -3,4 +3,5 @@ celery[redis] feedparser playwright pymongo -requests \ No newline at end of file +requests +streamlit \ No newline at end of file diff --git a/scripts/clear_db.sh b/scripts/clear_db.sh new file mode 100755 index 0000000..c8d584a --- /dev/null +++ b/scripts/clear_db.sh @@ -0,0 +1,6 @@ +#!/bin/bash + +docker exec mongo mongosh -u myuser -p mypassword --authenticationDatabase admin --eval " + db = db.getSiblingDB('justinsightdb'); + db.dropDatabase(); + db.stats()" \ No newline at end of file diff --git a/src/ingest/ap_ingestor.py b/src/ingest/ap_ingestor.py index 92bb2db..c790913 100644 --- a/src/ingest/ap_ingestor.py +++ b/src/ingest/ap_ingestor.py @@ -1,18 +1,46 @@ +import re +import requests +import json +from bs4 import BeautifulSoup from playwright.sync_api import sync_playwright from ingest.base_ingestor import BaseIngestor class APIngestor(BaseIngestor): RSS_URL = "https://news.google.com/rss/search?q=when:24h+allinurl:apnews.com&hl=en-US&gl=US&ceid=US:en" + def resolve_google_news_redirect(self, url): + resp = requests.get(url) + data = BeautifulSoup(resp.text, 'html.parser').select_one('c-wiz[data-p]').get('data-p') + obj = json.loads(data.replace('%.@.', '["garturlreq",')) + + payload = { + 'f.req': json.dumps([[['Fbv4je', json.dumps(obj[:-6] + obj[-2:]), 'null', 'generic']]]) + } + + headers = { + 'content-type': 'application/x-www-form-urlencoded;charset=UTF-8', + 'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/132.0.0.0 Safari/537.36', + } + + url = "https://news.google.com/_/DotsSplashUi/data/batchexecute" + response = requests.post(url, headers=headers, data=payload) + array_string = json.loads(response.text.replace(")]}'", ""))[0][2] + return json.loads(array_string)[1] + + def fetch_full_text(self, article_url): try: with sync_playwright() as p: browser = p.chromium.launch(headless=True) page = browser.new_page() - page.goto(article_url, timeout=15000) + + #change the article_url to the redirected one (or just the same) + article_url = self.resolve_google_news_redirect(article_url) + #print(article_url) + page.goto(article_url, wait_until="domcontentloaded", timeout=15000) # Wait for the main article body to load - page.wait_for_selector('div.RichTextStoryBody', timeout=5000) + page.wait_for_selector('div.RichTextStoryBody', timeout=3000) # Extract the text content from the paragraphs inside the body content = page.query_selector_all('div.RichTextStoryBody p') @@ -22,6 +50,6 @@ def fetch_full_text(self, article_url): return full_text.strip() except Exception as e: - print(f"Playwright error fetching {url}: {e}") + #print(f"Playwright error fetching {article_url}: {e}") return "" diff --git a/src/ingest/base_ingestor.py b/src/ingest/base_ingestor.py index e399c46..3feca58 100644 --- a/src/ingest/base_ingestor.py +++ b/src/ingest/base_ingestor.py @@ -37,8 +37,18 @@ def format_entry(self, entry): } return data - def check_and_save_new_entries(self): + def check_and_save_new_entries(self, using_celery=False): feed = feedparser.parse(self.RSS_URL) for entry in feed.entries: - save_entry(self.format_entry(entry)) + formattedEntry = self.format_entry(entry) + save_entry(formattedEntry, using_celery) + + def check_no_save_new_entries(self): + feed = feedparser.parse(self.RSS_URL) + all_entries = [] + + for entry in feed.entries: + all_entries.append(self.format_entry(entry)) + + return all_entries diff --git a/src/ingest/save_to_database.py b/src/ingest/save_to_database.py index 72ac544..e40b91b 100644 --- a/src/ingest/save_to_database.py +++ b/src/ingest/save_to_database.py @@ -1,13 +1,18 @@ import os from pymongo import MongoClient +from celery import current_app +from celery.exceptions import NotRegistered # Default to local MongoDB when not using docker -mongodb_uri = os.getenv("MONGODB_URI", "mongodb://localhost:27017") - -#mongodb_uri = os.getenv("mongodb://localhost:27017", "mongodb://myuser:mypassword@mongo:27017/justinsightdb?authSource=admin") - +#mongodb_uri = os.getenv("MONGODB_URI", "mongodb://localhost:27017") +#mongodb_uri = os.getenv("MONGODB_URI", "mongodb://localhost:27017") +# mongodb_uri = os.getenv( +# "MONGODB_URI", +# "mongodb://myuser:mypassword@localhost:27017/justinsightdb?authSource=admin" +# ) # Include username, password, and authentication database -client = MongoClient(mongodb_uri) + +client = MongoClient("mongodb://myuser:mypassword@mongo:27017/justinsightdb?authSource=admin") # Get (or create) a database db = client["justinsightdb"] @@ -15,9 +20,31 @@ # Get (or create) a collection collection = db["articles"] -def save_entry(entry): +def save_entry(entry, using_celery): + #locally import tasks just in this method to prevent circular import + from justinsight.tasks import runNER_task + + #dont save entries without body text + if entry["full_text"] == "" or entry["full_text"] == None: + return + #check if the entry has already been saved and if it has not then save it entry_hash = entry["id"] if collection.count_documents({"id": entry_hash}) == 0: - collection.insert_one(entry) - print(f"I have now saved: {entry['title']}") + result = collection.insert_one(entry) + inserted_id = result.inserted_id + + if using_celery: + # Ensure we’re inside a celery app context and the task is known + try: + current_app.tasks[runNER_task.name] + runNER_task.apply_async(args=[str(inserted_id)], queue='gpu') + except NotRegistered: + # fallback to inline + runNER_task(str(inserted_id)) + else: + # Inline execution + runNER_task(str(inserted_id)) + + + #print(f"I have now saved: {entry['title']}") diff --git a/src/justinsight/celery.py b/src/justinsight/celery.py index 6c6dda0..c1d30f6 100644 --- a/src/justinsight/celery.py +++ b/src/justinsight/celery.py @@ -16,18 +16,21 @@ # "args": (), # }, + #BUGGY Not fixing right now "check-APfeed-every-5-minutes": { "task": "justinsight.tasks.apLogger_task", "schedule": 5.0, "args": (), }, + #this feed is WORKING "check-BBCfeed-every-5-minutes": { "task": "justinsight.tasks.bbcLogger_task", "schedule": 5.0, "args": (), }, + #ALSO BUGGY "check-CBSfeed-every-5-minutes": { "task": "justinsight.tasks.cbsLogger_task", "schedule": 5.0, diff --git a/src/justinsight/nlpthings.py b/src/justinsight/nlpthings.py new file mode 100644 index 0000000..201a5fc --- /dev/null +++ b/src/justinsight/nlpthings.py @@ -0,0 +1,24 @@ +import os +from pymongo import MongoClient +from bson import ObjectId + +# Default to local MongoDB when not using docker +mongodb_uri = os.getenv("MONGODB_URI", "mongodb://localhost:27017") + +# Include username, password, and authentication database +#client = MongoClient(mongodb_uri) +client = MongoClient("mongodb://myuser:mypassword@mongo:27017/justinsightdb?authSource=admin") + +# Get (or create) a database +db = client["justinsightdb"] + +# Get (or create) a collection +collection = db["articles"] + +def dummy_addToEntryInDB(entry_id): + id = ObjectId(entry_id) + collection.update_one( + {"_id": id}, + {"$set": {"DummyField": "I added something!"}} + ) + print("\n\n I am trying to add a row to an entry \n\n") diff --git a/src/justinsight/streamlitapp.py b/src/justinsight/streamlitapp.py new file mode 100644 index 0000000..8beb783 --- /dev/null +++ b/src/justinsight/streamlitapp.py @@ -0,0 +1,29 @@ +import os +import streamlit as st +from pymongo import MongoClient +import pandas as pd + +# Default to local MongoDB when not using docker --- NO I dont want a local database at all +#mongo_uri = os.getenv("MONGODB_URI", "mongodb://localhost:27017") + +# Include username, password, and authentication database +#client = MongoClient(mongo_uri) +client = MongoClient(os.getenv("MONGODB_URI")) + +# Get (or create) a database +db = client["justinsightdb"] + +# Get (or create) a collection +collection = db["articles"] + +st.title("Database Viewing") + +# Load data from MongoDB +data = list(collection.find()) + +# Convert ObjectId to string for each document +for doc in data: + doc['_id'] = str(doc['_id']) + +df = pd.DataFrame(data) +st.dataframe(df) diff --git a/src/justinsight/tasks.py b/src/justinsight/tasks.py index ad55038..bb07cff 100644 --- a/src/justinsight/tasks.py +++ b/src/justinsight/tasks.py @@ -8,6 +8,7 @@ from ingest.npr_ingestor import NPRIngestor from ingest.nyt_ingestor import NYTIngestor from ingest.usnews_ingestor import USNEWSIngestor +from .nlpthings import dummy_addToEntryInDB @shared_task def sample_task(): @@ -18,62 +19,70 @@ def sample_task(): def apLogger_task(): # Create an instance of the class ingestor = APIngestor() - ingestor.check_and_save_new_entries() # this will invoke the inherited logic + ingestor.check_and_save_new_entries(using_celery=True) # this will invoke the inherited logic return "AP RSS Feed checked." @shared_task def bbcLogger_task(): # Create an instance of the class ingestor = BBCIngestor() - ingestor.check_and_save_new_entries() # this will invoke the inherited logic + ingestor.check_and_save_new_entries(using_celery=True) # this will invoke the inherited logic return "BBC RSS Feed checked." @shared_task def cbsLogger_task(): # Create an instance of the class ingestor = CBSIngestor() - ingestor.check_and_save_new_entries() # this will invoke the inherited logic + ingestor.check_and_save_new_entries(using_celery=True) # this will invoke the inherited logic return "CBS RSS Feed checked." @shared_task def cnnLogger_task(): ingestor = CNNIngestor() - ingestor.check_and_save_new_entries() # this will invoke the inherited logic + ingestor.check_and_save_new_entries(using_celery=True) # this will invoke the inherited logic return "CNN RSS Feed checked." @shared_task def latimesLogger_task(): # Create an instance of the class ingestor = LATIMESIngestor() - ingestor.check_and_save_new_entries() # this will invoke the inherited logic + ingestor.check_and_save_new_entries(using_celery=True) # this will invoke the inherited logic return "LATIMES RSS Feed checked." @shared_task def nbcLogger_task(): # Create an instance of the class ingestor = NBCIngestor() - ingestor.check_and_save_new_entries() # this will invoke the inherited logic + ingestor.check_and_save_new_entries(using_celery=True) # this will invoke the inherited logic return "NBC RSS Feed checked." @shared_task def nprLogger_task(): # Create an instance of the class ingestor = NPRIngestor() - ingestor.check_and_save_new_entries() # this will invoke the inherited logic + ingestor.check_and_save_new_entries(using_celery=True) # this will invoke the inherited logic return "NPR RSS Feed checked." @shared_task def nytLogger_task(): # Create an instance of the class ingestor = NYTIngestor() - ingestor.check_and_save_new_entries() # this will invoke the inherited logic + ingestor.check_and_save_new_entries(using_celery=True) # this will invoke the inherited logic return "NYT RSS Feed checked." @shared_task def usnewsLogger_task(): # Create an instance of the class ingestor = USNEWSIngestor() - ingestor.check_and_save_new_entries() # this will invoke the inherited logic + ingestor.check_and_save_new_entries(using_celery=True) # this will invoke the inherited logic return "USNEWS RSS Feed checked." + +@shared_task +def runNER_task(entry_id): + print(f"New worker so we can use GPU on this entry id: {entry_id}") + dummy_addToEntryInDB(entry_id) + #Do GPU-dependent processing here + + #Add more tasks here in the format of the one above \ No newline at end of file diff --git a/tests/test_rssFeeds.py b/tests/test_rssFeeds.py new file mode 100644 index 0000000..4af792e --- /dev/null +++ b/tests/test_rssFeeds.py @@ -0,0 +1,11 @@ +from ingest.ap_ingestor import APIngestor +from pprint import pprint + +def apLogger_task(): + # Create an instance of the class + ingestor = APIngestor() + entries = ingestor.check_no_save_new_entries() #this will not save to a database + pprint(entries) + +if __name__ == '__main__': + apLogger_task() \ No newline at end of file diff --git a/tests/test_trivial.py b/tests/test_trivial.py index 728f100..9efc569 100644 --- a/tests/test_trivial.py +++ b/tests/test_trivial.py @@ -8,6 +8,7 @@ #brew tap mongodb/brew #brew install mongodb-community #brew services start mongodb-community +#playwright install #deactivate #then we can start testing @@ -15,11 +16,6 @@ #then, run: pytest #or if you want to see printed output from your tests: pytest -s -from justinsight.tasks import bbcLogger_task - def test_add(): assert 1 + 1 == 2 - -def test_bbcCheck(): - bbcLogger_task()