From 0ae3128d431b3d41b956cc75942ed2e2756ca038 Mon Sep 17 00:00:00 2001 From: Chloe Date: Tue, 12 Aug 2025 11:59:40 -0500 Subject: [PATCH 1/2] updates --- src/justinsight/celery.py | 1 - src/nlp/ner_core.py | 4 +++- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/src/justinsight/celery.py b/src/justinsight/celery.py index ce71966..a34af65 100644 --- a/src/justinsight/celery.py +++ b/src/justinsight/celery.py @@ -30,7 +30,6 @@ "args": (), }, - # Checked and good # "check-CBSfeed-every-5-minutes": { # "task": "justinsight.tasks.cbsLogger_task", # "schedule": 5.0, diff --git a/src/nlp/ner_core.py b/src/nlp/ner_core.py index a01903f..85dd2e5 100644 --- a/src/nlp/ner_core.py +++ b/src/nlp/ner_core.py @@ -30,8 +30,10 @@ def process_article(self, article_id: str): # return # Run NER + print("about to run NER") entities = self.pipeline(full_text) # run_ner_hf(full_text) - + print("ran NER yay") + # Update article in DB self.addToEntryInDB(article_id, { "ner": entities, From cd8f93814b218214017a2c8bc6069bdabd4fef62 Mon Sep 17 00:00:00 2001 From: Chloe Date: Wed, 13 Aug 2025 12:38:40 -0500 Subject: [PATCH 2/2] new pipelines --- src/nlp/base_core.py | 7 +++-- src/nlp/cli.py | 2 +- src/nlp/ner_core.py | 4 +-- src/nlp/sent_core.py | 54 +++++++++++++++++++++++++++++++++++ src/nlp/summ_core.py | 58 ++++++++++++++++++++++++++++++++++++++ src/nlp/tests/test_core.py | 2 +- 6 files changed, 120 insertions(+), 7 deletions(-) create mode 100644 src/nlp/sent_core.py create mode 100644 src/nlp/summ_core.py diff --git a/src/nlp/base_core.py b/src/nlp/base_core.py index 36be2f0..5b42a85 100644 --- a/src/nlp/base_core.py +++ b/src/nlp/base_core.py @@ -17,11 +17,12 @@ def __init__(self, task: str, model_name: str, aggregation_strategy: str = None) self.pipeline = self.load_pipeline() def load_pipeline(self): + args = {"model": self.model_name} if self.task == "ner" and self.aggregation_strategy: - return pipeline(self.task, model=self.model_name, 
aggregation_strategy=self.aggregation_strategy) - + args["aggregation_strategy"] = self.aggregation_strategy + print("pipeline loading...") - return pipeline(self.task, model=self.model_name) + return pipeline(self.task, **args) def process_article(self, article_id: str): #Retrieve article by ID diff --git a/src/nlp/cli.py b/src/nlp/cli.py index 4a030b1..303252c 100644 --- a/src/nlp/cli.py +++ b/src/nlp/cli.py @@ -1,5 +1,5 @@ import argparse -from nlp.core import process_article # adjust if your actual import path differs +from nlp.ner_core import process_article # adjust if your actual import path differs def run_cli(article_id: str): entities = process_article(article_id) diff --git a/src/nlp/ner_core.py b/src/nlp/ner_core.py index 85dd2e5..8d2771b 100644 --- a/src/nlp/ner_core.py +++ b/src/nlp/ner_core.py @@ -19,7 +19,7 @@ def process_article(self, article_id: str): print(f"No article found with ID: {article_id}") return [] - if article.get("processed") is True: + if article.get("ner_processed") is True: print(f"Article {article_id} already processed.") return [] @@ -37,7 +37,7 @@ def process_article(self, article_id: str): # Update article in DB self.addToEntryInDB(article_id, { "ner": entities, - "processed": True + "ner_processed": True }) return entities diff --git a/src/nlp/sent_core.py b/src/nlp/sent_core.py new file mode 100644 index 0000000..4a77b9f --- /dev/null +++ b/src/nlp/sent_core.py @@ -0,0 +1,54 @@ +from nlp.base_core import BaseCore +from transformers import pipeline +from bson import ObjectId + +class SentCore(BaseCore): + def __init__(self): + super().__init__( + task="sentiment-analysis", + model_name="distilbert-base-uncased-finetuned-sst-2-english" + ) + + def process_article(self, article_id: str): + #Retrieve article by ID + article = self.collection.find_one({"_id": ObjectId(article_id)}) + + if not article: + print(f"No article found with ID: {article_id}") + return [] + + if article.get("sentiment_processed") is True: + print(f"Article 
{article_id} already processed.") + return [] + + # We have a check running so only articles with full text are saved + full_text = article.get("full_text", "") + # if not full_text: + # print(f"Article {article_id} has no full text.") + # return + + # Run Sentiment Analysis + print("about to run Sentiment Analysis") + results = self.pipeline(full_text) + print("ran Sentiment Analysis yay") + + # Update article in DB + self.addToEntryInDB(article_id, { + "sentiment analysis": results, + "sentiment_processed": True + }) + + return results + + def addToEntryInDB(self, entry_id, updates): + print("Adding Sentiment Analysis results to database\r\r\r") + + for res in updates.get("sentiment analysis", []): + if "score" in res: + res["score"] = float(res["score"]) # convert np.float32 to Python float + + id = ObjectId(entry_id) + self.collection.update_one( + {"_id": id}, + {"$set": updates} + ) \ No newline at end of file diff --git a/src/nlp/summ_core.py b/src/nlp/summ_core.py new file mode 100644 index 0000000..236c23e --- /dev/null +++ b/src/nlp/summ_core.py @@ -0,0 +1,58 @@ +from nlp.base_core import BaseCore +from transformers import pipeline +from bson import ObjectId + +class SummCore(BaseCore): + def __init__(self): + super().__init__( + task="summarization", + model_name="facebook/bart-large-cnn" + ) + + def process_article(self, article_id: str): + #Retrieve article by ID + article = self.collection.find_one({"_id": ObjectId(article_id)}) + + if not article: + print(f"No article found with ID: {article_id}") + return [] + + if article.get("summary_processed") is True: + print(f"Article {article_id} already processed.") + return [] + + # We have a check running so only articles with full text are saved + full_text = article.get("full_text", "") + # if not full_text: + # print(f"Article {article_id} has no full text.") + # return + + # Run Summarization + print("about to run Summarization") + summary = self.pipeline( + full_text, + max_length=150, + min_length=30, + 
do_sample=False) # run_ner_hf(full_text) + print("ran Summarization yay") + + # Update article in DB + self.addToEntryInDB(article_id, { + "hf summary": summary, + "summary_processed": True + }) + + return summary + + def addToEntryInDB(self, entry_id, updates): + print("Adding Summarization results to database\r\r\r") + + if "hf summary" in updates: + for ent in updates["hf summary"]: + if "score" in ent: ent["score"] = float(ent["score"]) # convert np.float32 to Python float + + id = ObjectId(entry_id) + self.collection.update_one( + {"_id": id}, + {"$set": updates} + ) \ No newline at end of file diff --git a/src/nlp/tests/test_core.py b/src/nlp/tests/test_core.py index 147835a..41bdaf5 100644 --- a/src/nlp/tests/test_core.py +++ b/src/nlp/tests/test_core.py @@ -1,4 +1,4 @@ -from nlp.core import run_ner_hf +from nlp.ner_core import run_ner_hf import warnings warnings.filterwarnings("ignore", category=UserWarning)