From 7132d9aaa26e87962e3f72b7cbef693eabbf7375 Mon Sep 17 00:00:00 2001 From: Chloe Date: Wed, 6 Aug 2025 12:54:07 -0500 Subject: [PATCH 1/3] Add BaseCore and NERCore classes, replacing nlp/core.py --- src/nlp/base_core.py | 56 +++++++++++++++++++++++++++++++++++++++ src/nlp/core.py | 61 ------------------------------------------ src/nlp/ner_core.py | 63 ++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 119 insertions(+), 61 deletions(-) create mode 100644 src/nlp/base_core.py delete mode 100644 src/nlp/core.py create mode 100644 src/nlp/ner_core.py diff --git a/src/nlp/base_core.py b/src/nlp/base_core.py new file mode 100644 index 0000000..7e83c45 --- /dev/null +++ b/src/nlp/base_core.py @@ -0,0 +1,56 @@ +from pymongo import MongoClient +from bson import ObjectId +from transformers import pipeline + +class BaseCore: + # Include username, password, and authentication database + client = MongoClient("mongodb://myuser:mypassword@mongo:27017/justinsightdb?authSource=admin") + db = client["justinsightdb"] + collection = db["articles"] + + model = None #Set in subclass + + def __init__(self, task: str, model_name: str, aggregation_strategy: str = None): + self.task = task + self.model_name = model_name + self.aggregation_strategy = self._load_pipeline() + + def _load_pipeline(self): + args = { + "task": self.task, + "model": self.model_name + } + + if self.task == "ner" and self.aggregation_strategy: + args["aggregation_strategy"] = self.aggregation_strategy + + return pipeline(args) + + def process_article(self, article_id: str): + #Retrieve article by ID + article = self.collection.find_one({"_id": ObjectId(article_id)}) + + if not article: + print(f"No article found with ID: {article_id}") + return [] + + if article.get("processed") is True: + print(f"Article {article_id} already processed.") + return + + # We have a check running so only articles with full text are saved + full_text = article.get("full_text", "") + # if not full_text: + # print(f"Article {article_id} has no full text.") + # return + 
+ entities = self.pipeline(full_text) + + self.collection.update_one( + {"_id": ObjectId(article_id)}, + {"$set": { + "processed": True, + "entities": entities + }} + ) + return entities diff --git a/src/nlp/core.py b/src/nlp/core.py deleted file mode 100644 index ce5cece..0000000 --- a/src/nlp/core.py +++ /dev/null @@ -1,61 +0,0 @@ -from transformers import pipeline -from pymongo import MongoClient -from bson import ObjectId - -# Include username, password, and authentication database -client = MongoClient("mongodb://myuser:mypassword@mongo:27017/justinsightdb?authSource=admin") -db = client["justinsightdb"] -collection = db["articles"] - -ner = pipeline("ner", model="dslim/bert-base-NER", aggregation_strategy="simple") - -def process_article(article_id: str): - #Retrieve article by ID - article = collection.find_one({"_id": ObjectId(article_id)}) - - if not article: - print(f"No article found with ID: {article_id}") - return [] - - if article.get("processed") is True: - print(f"Article {article_id} already processed.") - return - - # We have a check running so only articles with full text are saved - full_text = article.get("full_text", "") - # if not full_text: - # print(f"Article {article_id} has no full text.") - # return - - # Run NER - entities = ner(full_text) # run_ner_hf(full_text) - - # Update article in DB - addToEntryInDB(article_id, { - "ner": entities, - "processed": True - }) - - return - -def format_ner_tags(ner_list): - formatted = [] - for ent in ner_list: - label = ent.get("label") or ent.get("entity") or "UNKNOWN" - text = ent.get("text") or ent.get("word") or "" - formatted.append(f"{label}: {text}") - return ", ".join(formatted) - -def addToEntryInDB(entry_id, updates): - print("Adding NER results to database\r\r\r") - - if "ner" in updates: - for ent in updates["ner"]: - ent["score"] = float(ent["score"]) # convert np.float32 to Python float - updates["ner_pretty"] = format_ner_tags(updates["ner"]) # so we can actually read the NER - - id = 
ObjectId(entry_id) - collection.update_one( - {"_id": id}, - {"$set": updates} - ) \ No newline at end of file diff --git a/src/nlp/ner_core.py b/src/nlp/ner_core.py new file mode 100644 index 0000000..0d38ff5 --- /dev/null +++ b/src/nlp/ner_core.py @@ -0,0 +1,63 @@ +from nlp.base_core import BaseCore +from transformers import pipeline +from bson import ObjectId + +class NERCore(BaseCore): + def __init__(self): + super().__init__( + task="ner", + model_name="dslim/bert-base-NER", + aggregation_strategy="simple" + ) + + def process_article(self, article_id: str): + #Retrieve article by ID + article = self.collection.find_one({"_id": ObjectId(article_id)}) + + if not article: + print(f"No article found with ID: {article_id}") + return [] + + if article.get("processed") is True: + print(f"Article {article_id} already processed.") + return [] + + # We have a check running so only articles with full text are saved + full_text = article.get("full_text", "") + # if not full_text: + # print(f"Article {article_id} has no full text.") + # return + + # Run NER + entities = self.pipeline(full_text) # run_ner_hf(full_text) + + # Update article in DB + self.addToEntryInDB(article_id, { + "ner": entities, + "processed": True + }) + + return entities + + def format_ner_tags(self, ner_list): + formatted = [] + for ent in ner_list: + label = ent.get("label") or ent.get("entity") or ent.get("entity_group", "UNKNOWN") + text = ent.get("text") or ent.get("word") or "" + formatted.append(f"{label}: {text}") + return ", ".join(formatted) + + def addToEntryInDB(self, entry_id, updates): + print("Adding NER results to database\r\r\r") + + if "ner" in updates: + for ent in updates["ner"]: + ent["score"] = float(ent["score"]) # convert np.float32 to Python float + + updates["ner_pretty"] = self.format_ner_tags(updates["ner"]) # so we can actually read the NER + + id = ObjectId(entry_id) + self.collection.update_one( + {"_id": id}, + {"$set": updates} + ) \ No newline at end of file From 
d80fa34e73c4d586121e613339abd316401f9891 Mon Sep 17 00:00:00 2001 From: Chloe Date: Mon, 11 Aug 2025 10:05:38 -0500 Subject: [PATCH 2/3] Use NERCore in ner_task and fix BaseCore pipeline loading --- src/justinsight/tasks.py | 5 +++-- src/nlp/base_core.py | 14 +++++--------- 2 files changed, 8 insertions(+), 11 deletions(-) diff --git a/src/justinsight/tasks.py b/src/justinsight/tasks.py index 26385ee..141593c 100644 --- a/src/justinsight/tasks.py +++ b/src/justinsight/tasks.py @@ -10,7 +10,7 @@ from ingest.usnews_ingestor import USNEWSIngestor from .nlpthings import dummy_addToEntryInDB from ingest.save_to_database import collection -from nlp.core import process_article +from nlp.ner_core import NERCore from bson import ObjectId @shared_task @@ -90,8 +90,9 @@ def runNER_task(entry_id): @shared_task def ner_task(article_id): # Process article with NER results + core = NERCore() print("Actually trying to do NER!!!") - process_article(article_id) + core.process_article(article_id) diff --git a/src/nlp/base_core.py b/src/nlp/base_core.py index 7e83c45..0d7a63f 100644 --- a/src/nlp/base_core.py +++ b/src/nlp/base_core.py @@ -13,18 +13,14 @@ class BaseCore: def __init__(self, task: str, model_name: str, aggregation_strategy: str = None): self.task = task self.model_name = model_name - self.aggregation_strategy = self._load_pipeline() - - def _load_pipeline(self): - args = { - "task": self.task, - "model": self.model_name - } + self.aggregation_strategy = aggregation_strategy + self.pipeline = self.load_pipeline() + def load_pipeline(self): if self.task == "ner" and self.aggregation_strategy: - args["aggregation_strategy"] = self.aggregation_strategy + return pipeline(self.task, model=self.model_name, aggregation_strategy=self.aggregation_strategy) - return pipeline(args) + return pipeline(self.task, model=self.model_name) def process_article(self, article_id: str): #Retrieve article by ID From 13627251fecd7aac8b479a07ea7f21282dd07344 Mon Sep 17 00:00:00 2001 From: Chloe Date: Mon, 11 Aug 2025 11:14:02 -0500 Subject: 
[PATCH 3/3] Add NER debug prints, bump torch to 2.4.0, disable BBC feed schedule --- .docker/Dockerfile | 2 +- src/justinsight/celery.py | 10 +++++----- src/justinsight/tasks.py | 1 + src/nlp/base_core.py | 1 + src/nlp/ner_core.py | 1 + 5 files changed, 9 insertions(+), 6 deletions(-) diff --git a/.docker/Dockerfile b/.docker/Dockerfile index 6cf3bf0..cc5db62 100644 --- a/.docker/Dockerfile +++ b/.docker/Dockerfile @@ -13,7 +13,7 @@ RUN apt-get update && \ # Install requirements COPY requirements.txt /workspace/requirements.txt RUN pip install --no-cache-dir -r /workspace/requirements.txt -RUN pip install torch==2.1.2 torchvision --index-url https://download.pytorch.org/whl/cpu +RUN pip install torch==2.4.0 torchvision --index-url https://download.pytorch.org/whl/cpu RUN playwright install diff --git a/src/justinsight/celery.py b/src/justinsight/celery.py index a78de95..7378475 100644 --- a/src/justinsight/celery.py +++ b/src/justinsight/celery.py @@ -23,11 +23,11 @@ # "args": (), # }, - "check-BBCfeed-every-5-minutes": { - "task": "justinsight.tasks.bbcLogger_task", - "schedule": 5.0, - "args": (), - }, + # "check-BBCfeed-every-5-minutes": { + # "task": "justinsight.tasks.bbcLogger_task", + # "schedule": 5.0, + # "args": (), + # }, # "check-CBSfeed-every-5-minutes": { # "task": "justinsight.tasks.cbsLogger_task", diff --git a/src/justinsight/tasks.py b/src/justinsight/tasks.py index 141593c..2cf2975 100644 --- a/src/justinsight/tasks.py +++ b/src/justinsight/tasks.py @@ -90,6 +90,7 @@ def runNER_task(entry_id): @shared_task def ner_task(article_id): # Process article with NER results + print("In the NER task") core = NERCore() print("Actually trying to do NER!!!") core.process_article(article_id) diff --git a/src/nlp/base_core.py b/src/nlp/base_core.py index 0d7a63f..36be2f0 100644 --- a/src/nlp/base_core.py +++ b/src/nlp/base_core.py @@ -20,6 +20,7 @@ def load_pipeline(self): if self.task == "ner" and self.aggregation_strategy: return pipeline(self.task, model=self.model_name, 
aggregation_strategy=self.aggregation_strategy) + print(f"pipeline loading...") return pipeline(self.task, model=self.model_name) def process_article(self, article_id: str): diff --git a/src/nlp/ner_core.py b/src/nlp/ner_core.py index 0d38ff5..671d83f 100644 --- a/src/nlp/ner_core.py +++ b/src/nlp/ner_core.py @@ -4,6 +4,7 @@ class NERCore(BaseCore): def __init__(self): + print("constructing NER Core instance") super().__init__( task="ner", model_name="dslim/bert-base-NER",