
CS441: Agentic GraphRAG Pipeline w/ Flink & Neo4j

Martin Gawron | mgawr2@uic.edu

Video Demo Link


Quick Links:

Project Overview
Program Design and Architecture
   Pipeline Overview
   Models (Ollama)
   Flink Streaming Jobs
   Component Deployment Decisions
Steps to Run Locally
Steps to Run on AWS EKS
Query Endpoints
Notes


Project Overview

This project implements a scalable GraphRAG (graph-based Retrieval-Augmented Generation) pipeline using Apache Flink for stream processing and Neo4j as the knowledge graph backend.

Unlike traditional RAG which relies solely on vector similarity, this pipeline constructs a structured knowledge graph by extracting entities and relationships from documents in real-time. It utilizes Ollama models for entity extraction and embedding generation, deployed on AWS EKS to handle distributed stream processing. The system supports atomic upserts to Neo4j, ensuring versioned consistency for concept and chunk nodes.


Program Design and Architecture

Pipeline Overview:

The workflow transforms raw document streams into a queryable Knowledge Graph. Desired workflow: Doc Stream → Flink Processing (Extraction/Fusion) → Neo4j Storage → Agentic Search (Akka HTTP)

  1. Ingestion Source: A stream of document chunks (simulated via file monitoring or Kafka) is ingested into the Flink cluster.

  2. Stream Processing (Flink): The Flink job processes chunks in parallel:

  • Entity Extraction: uses llama3:instruct (via Ollama) to extract entities and semantic relationships from text.

  • Relation Fusion: fuses candidate relations and resolves duplicate entities with a rules-based pass.

  • Graph Construction: formats data into Nodes (Concepts, Chunks) and Edges (RELATED_TO, MENTIONS, CO_OCCURS).

  3. Graph Storage (Neo4j): The processed graph elements are atomically upserted into Neo4j. Constraints ensure uniqueness, and versioning is applied to track changes in the corpus over time.

  4. Querying (Akka HTTP Microservice): A REST API acts as the interface for the Agentic Search:

  • Performs dependency queries (finding paths between concepts).

  • Detects logical inconsistencies in the graph.

  • Retrieves context for the LLM to generate final answers.

Models (Ollama)

  • llama3:instruct
    • We use the instruct variant because we can lower the temperature parameter so the model adheres more closely to the given instructions. The pipeline depends on specific reasoning criteria and strict output formatting, so reliable rule-following is a critical necessity (a sketch of such a call follows).
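
For illustration, a minimal sketch of such a call against Ollama's /api/generate endpoint, assuming a JSON-safe prompt (the OllamaCall helper is hypothetical; the endpoint, model name, and temperature option follow Ollama's documented API):

import java.net.URI
import java.net.http.{HttpClient, HttpRequest, HttpResponse}

object OllamaCall {
  // Hypothetical helper; the real pipeline reads the base URL from OLLAMA_BASE_URL.
  val baseUrl = "http://localhost:11434/api"

  def generate(prompt: String): String = {
    // A low temperature pushes llama3:instruct toward deterministic,
    // format-following output, which downstream parsing relies on.
    // NOTE: `prompt` is assumed JSON-safe here; escape it in real code.
    val body =
      s"""{"model": "llama3:instruct", "prompt": "$prompt",
         | "stream": false, "options": {"temperature": 0.1}}""".stripMargin

    val request = HttpRequest.newBuilder()
      .uri(URI.create(s"$baseUrl/generate"))
      .header("Content-Type", "application/json")
      .POST(HttpRequest.BodyPublishers.ofString(body))
      .build()

    HttpClient.newHttpClient()
      .send(request, HttpResponse.BodyHandlers.ofString())
      .body()
  }
}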

Flink Streaming Jobs

Chunk Ingestion

  • Input: Directory (HDFS/S3)
  • Logic: Reads input documents, splits them into manageable token windows, and hashes the content so each chunk gets a stable ID that is easy to identify during graph traversal later (sketched below).
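
A minimal sketch of that chunk-and-hash step (the Chunker helper and the 256-token window are illustrative assumptions, not the job's actual settings):

import java.security.MessageDigest

object Chunker {
  // Split a document into fixed-size token windows and key each window by a
  // content hash, giving every chunk a stable, deterministic ID.
  def chunk(doc: String, windowTokens: Int = 256): Seq[(String, String)] =
    doc.split("\\s+")
      .grouped(windowTokens)
      .map(_.mkString(" "))
      .map(text => (sha256(text), text))
      .toSeq

  private def sha256(s: String): String =
    MessageDigest.getInstance("SHA-256")
      .digest(s.getBytes("UTF-8"))
      .map("%02x".format(_))
      .mkString
}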

Entity Extraction

  • Input: Text Chunks (data stream)
  • Logic: This operator makes use of both NLP and an LLM. Classical NLP extracts entities such as persons, organizations, and locations with high recall, while the LLM is tasked with extracting complex concepts from the text (see the sketch below).
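
A rough sketch of that hybrid pass (the type names, stub helpers, and label set are illustrative, not the project's own):

case class Mention(chunkId: String, surface: String, label: String)

// Stand-in for a classical NER pass that favors recall on
// PERSON / ORGANIZATION / LOCATION spans.
def runNer(text: String): Seq[(String, String)] =
  Seq.empty // plug a real NER library in here

// Stand-in for the llama3:instruct call that surfaces abstract concepts.
def runLlmConcepts(text: String): Seq[String] =
  Seq.empty // plug the Ollama call in here

def extractMentions(chunkId: String, text: String): Seq[Mention] = {
  val nerMentions = runNer(text).map { case (span, label) => Mention(chunkId, span, label) }
  val llmConcepts = runLlmConcepts(text).map(c => Mention(chunkId, c, "CONCEPT"))
  // Deduplicate on the normalized surface form so the two sources don't collide.
  (nerMentions ++ llmConcepts).distinctBy(_.surface.toLowerCase)
}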

Co-Occurrence

  • Input: Mentions (data stream)
  • Logic: Given the stream of mentions (concepts and entities), this job finds mentions that co-occur with each other and records each pairing, tying together closely occurring concepts and entities in the knowledge graph (sketched below).
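
A minimal sketch, assuming co-occurrence is counted per chunk (it reuses the Mention sketch above; a sliding window would work similarly):

case class CoOccurrence(a: String, b: String, count: Int)

def coOccurrences(mentions: Seq[Mention]): Seq[CoOccurrence] =
  mentions.groupBy(_.chunkId).values.toSeq
    .flatMap { inChunk =>
      // Every unordered pair of distinct mentions in a chunk co-occurs once.
      inChunk.map(_.surface).distinct.sorted.combinations(2).map {
        case Seq(a, b) => (a, b)
      }
    }
    .groupBy(identity)
    .map { case ((a, b), hits) => CoOccurrence(a, b, hits.size) }
    .toSeq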

Relationships

  • Input: Co-Occurrences (data stream)
  • Logic: Given the stream of co-occurrences, this job selects the pairs that are most closely related and marks them as relationship candidates. The LLM then evaluates each candidate, with its supporting evidence, to decide whether it forms a genuine relationship (see the sketch below).
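
A minimal sketch of the candidate filter (the minCount threshold and evidence lookup are assumptions, not the project's tuned settings):

case class RelationCandidate(a: String, b: String, evidence: String)

def relationCandidates(cooc: Seq[CoOccurrence],
                       evidenceFor: (String, String) => String,
                       minCount: Int = 3): Seq[RelationCandidate] =
  cooc.filter(_.count >= minCount) // keep only strongly co-occurring pairs
    .map(c => RelationCandidate(c.a, c.b, evidenceFor(c.a, c.b)))
// Each candidate plus its evidence text is then passed to llama3:instruct,
// and only LLM-approved pairs become RELATED_TO edges.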

Neo4j Upsert

  • Input: Graph Objects (Nodes/Edges)
  • Logic: Chunks, entities, co-occurrences, mentions, and relationships make up the nodes and edges of the knowledge graph. This job upserts them in the correct format into AuraDB (the managed Neo4j database) to build the graph (a hedged sketch follows).
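
A hedged sketch of the upsert using the Neo4j Java driver (the object name, node properties, and versioning scheme are assumptions; the MERGE-based idempotent write and the unique constraint mirror what the pipeline and the Notes section describe):

import org.neo4j.driver.{AuthTokens, GraphDatabase, Values}

object Neo4jUpsert {
  val driver = GraphDatabase.driver(
    sys.env("NEO4J_URI"),
    AuthTokens.basic(sys.env("NEO4J_USERNAME"), sys.env("NEO4J_PASSWORD")))

  // Run once so MERGE can rely on a unique key (see the Notes section).
  def ensureConstraint(): Unit = {
    val session = driver.session()
    try session.run(
      "CREATE CONSTRAINT concept_name IF NOT EXISTS " +
      "FOR (c:Concept) REQUIRE c.name IS UNIQUE")
    finally session.close()
  }

  // MERGE makes the write idempotent: re-processing a chunk updates the
  // existing nodes and edge instead of creating duplicates.
  def upsertRelation(a: String, b: String, version: Long): Unit = {
    val session = driver.session()
    try session.executeWrite { tx =>
      tx.run(
        """MERGE (x:Concept {name: $a})
          |MERGE (y:Concept {name: $b})
          |MERGE (x)-[r:RELATED_TO]->(y)
          |SET r.version = $version""".stripMargin,
        Values.parameters("a", a, "b", b, "version", Long.box(version)))
    }
    finally session.close()
  }
}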

Component Deployment Decisions

Stream Processing (AWS EKS / Flink)

  • Why EKS? Graph construction is resource-intensive. EKS allows us to orchestrate a Flink cluster (JobManager + multiple TaskManagers) alongside the Neo4j database and Ollama inferencing services.

  • Scaling: K8s allows us to scale TaskManagers horizontally based on the lag in the ingestion stream.

Graph Storage (Neo4j through AuraDB)

  • Flexible Accessibility: AuraDB was chosen to make the transition from local to cloud code smoother, since both environments can rely on the same endpoints and environment variables, simplifying graph upsertion and retrieval.

Microservices (Akka HTTP)

  • Isolation: The query layer is deployed as a separate stateless deployment exposed via a LoadBalancer service, decoupling the read-path (User Queries) from the write-path (Flink Indexing). A minimal route sketch follows.
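
For illustration, a minimal sketch of such a stateless Akka HTTP service (the route body, port, and response payload are placeholders, not the project's actual handlers):

import akka.actor.typed.ActorSystem
import akka.actor.typed.scaladsl.Behaviors
import akka.http.scaladsl.Http
import akka.http.scaladsl.server.Directives._

object QueryService {
  def main(args: Array[String]): Unit = {
    implicit val system: ActorSystem[Nothing] =
      ActorSystem(Behaviors.empty, "graphrag-api")

    val route =
      pathPrefix("v1") {
        path("query") {
          post {
            entity(as[String]) { body =>
              // Canonicalize the question into a graph pattern, query Neo4j,
              // then ask the LLM to answer from the retrieved context.
              complete("""{"accepted": true}""")
            }
          }
        }
      }

    // Exposed through a Kubernetes LoadBalancer service in the EKS setup.
    Http().newServerAt("0.0.0.0", 8080).bind(route)
  }
}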

Steps to Run Locally

1. Clone the Repository

git clone git@github.com:martingaw11/CS441-CloudComputing-UIC.git
cd CS441-CloudComputing-UIC

2. Pull the llama3:instruct Model

ollama pull llama3:instruct

3. Neo4j Setup

Create an account with Neo4j and activate AuraDB
https://neo4j.com/product/auradb/

4. S3 Setup (Store Corpus here for container access)

In your AWS account, set up a directory in your S3 bucket containing the corpus of PDFs.

5. Build the JAR files for API and Flink Job

Running the following sbt commands inside IntelliJ is easiest:

reload
clean
core/assembly       # API
ingestion/assembly  # Flink Pipeline 

6. Create .env File in Root

Format the .env file as shown below and fill it in with your account information.
For the base URL, use your WSL IP address.
For the model, you can use whatever model you like (just make sure to adjust the Dockerfile.ollama file to pull that model).

NEO4J_URI=...
NEO4J_USERNAME=...
NEO4J_PASSWORD=...

AWS_ACCESS_KEY_ID=...
AWS_SECRET_ACCESS_KEY=...
AWS_REGION=...

OLLAMA_BASE_URL="http://172.xx.xx.xx:11434/api"
OLLAMA_MODEL="phi"

7. Run the Container!

docker-compose up --build

8. Next Steps...

Wait for the knowledge graph to be built and then run queries!


Steps to Run on AWS EKS

1. ECR Repository Setup (CLI)

Create repositories for your custom images (Flink, API, Ollama)

aws ecr create-repository --repository-name graphrag-ingestion
aws ecr create-repository --repository-name graphrag-api
aws ecr create-repository --repository-name graphrag-ollama

2. Build and Push Images

Use sbt to build the JARs that Docker will use

core/assembly
ingestion/assembly

Connect Docker to AWS and build/push images

# Authenticate
aws ecr get-login-password --region us-east-1 | docker login --username AWS --password-stdin <aws_account_id>.dkr.ecr.<region>.amazonaws.com

# Build & Push for each image (point -f at the Dockerfile matching that image)
docker build -f <ingestion Dockerfile> -t <aws_account_id>.dkr.ecr.<region>.amazonaws.com/graphrag-ingestion:v1 .
docker push <aws_account_id>.dkr.ecr.<region>.amazonaws.com/graphrag-ingestion:v1

docker build -f Dockerfile.api -t <aws_account_id>.dkr.ecr.<region>.amazonaws.com/graphrag-api:v1 .
docker push <aws_account_id>.dkr.ecr.<region>.amazonaws.com/graphrag-api:v1

docker build -f Dockerfile.ollama -t <aws_account_id>.dkr.ecr.<region>.amazonaws.com/graphrag-ollama:v1 .
docker push <aws_account_id>.dkr.ecr.<region>.amazonaws.com/graphrag-ollama:v1

3. Download kubectl and eksctl for EKS Management

# 1. Install kubectl
curl -LO "https://dl.k8s.io/release/$(curl -L -s https://dl.k8s.io/release/stable.txt)/bin/linux/amd64/kubectl"
chmod +x kubectl
sudo mv kubectl /usr/local/bin/

# 2. Install eksctl
curl --silent --location "https://github.com/weaveworks/eksctl/releases/latest/download/eksctl_$(uname -s)_amd64.tar.gz" | tar xz -C /tmp
sudo mv /tmp/eksctl /usr/local/bin/

Create a dedicated flink-dev IAM user with Admin Access, then configure your terminal identity:

aws configure # Configure Terminal for Identity

Using the eks-cluster-config.yaml file, replace <> with your information and run:

eksctl create cluster -f eks-cluster-config.yaml

This will take a while...

4. Grant S3 Access Permission

  • Go to IAM Roles in AWS Console
  • Find the role beginning with eksctl
    • It should look something like eksctl-graphrag-cluster-nodegroup-NodeInstanceRole-XXXXX
  • Add permissions via Attach policy for this role
    • Give it the AmazonS3FullAccess policy

5. Connect and Set Up Software Environment

Install helm

curl -fsSL -o get_helm.sh https://raw.githubusercontent.com/helm/helm/main/scripts/get-helm-3

chmod 700 get_helm.sh

./get_helm.sh

helm repo add flink-operator-repo https://downloads.apache.org/flink/flink-kubernetes-operator-1.10.0/

Connect kubectl to the new cluster and install the Flink operator

aws eks update-kubeconfig --region us-east-2 --name graphrag-cluster

helm install flink-kubernetes-operator flink-operator-repo/flink-kubernetes-operator \
  --set webhook.create=false \
  --namespace default

6. Apply Permissions (RBAC)

kubectl delete rolebinding flink-role-binding
kubectl apply -f rbac.yaml

7. Create Secrets

Replace the values with those from the .env file created earlier:

kubectl create secret generic graphrag-secrets \
  --from-literal=AWS_ACCESS_KEY_ID=... \
  --from-literal=AWS_SECRET_ACCESS_KEY="..." \
  --from-literal=AWS_REGION=... \
  --from-literal=NEO4J_URI=... \
  --from-literal=NEO4J_USER=neo4j \
  --from-literal=NEO4J_PASSWORD=...

8. Deploy

kubectl apply -f flink-deployment.yaml
kubectl apply -f api-deployment.yaml

Optional: watch the deployment with these commands

# Watch pods and their status
kubectl get pods --watch

# Get pods and their names
kubectl get pods

# Get the logs of a pod and container
kubectl logs -f <pod> -c <container>
# ex.
kubectl logs -f graphrag-cluster-taskmanager-1-1 -c flink-main-container

# Port forward cluster to view Flink Dashboard
kubectl port-forward svc/graphrag-cluster-rest 8081:8081

# Get public facing IP of API
kubectl get svc graphrag-api-service

# Use the External IP for Requests: ex.
curl -X POST "http://<external-ip>/v1/query" \
     -H "Content-Type: application/json" \
     -d '{"query": "Why is CVS important for MSR?"}'

9. Destroy

Once you are finished, run these commands to tear everything down so you do not accrue costs.

# Delete deployments
kubectl delete -f flink-deployment.yaml
kubectl delete -f api-deployment.yaml

# Destroy the EKS cluster
eksctl delete cluster --name graphrag-cluster --region us-east-2

Query Endpoints

POST /v1/query

The primary entry point. The service canonicalizes requests into graph patterns and returns an LLM response answering the question based on the "knowledge" in the graph.

Request Payload

{
  "query": "What are the main benefits of CVS?",
  "output": {
    "groupBy": [],
    "metrics": [],
    "topKPerGroup": 5,
    "includeCitations": true
  }
}

GET /v1/jobs/{jobId}

Returns the state of a job (RUNNING, SUCCEEDED, FAILED)

GET /v1/jobs/{jobId}/result

Returns the final answer and citations once the job state is SUCCEEDED

GET /v1/evidence/{evidenceId}

Every query response returns citations from the graph. These citations are denoted as evidence, and this endpoint allows you to further explore the evidence behind a citation.

GET /v1/graph/concept/{conceptId}/neighbors

Lightweight neighborhood lookup to see what is connected to a concept and by what edges.

  • Parameters:
    • depth: 1..3 (Traversal depth)
    • direction: in | out | both
    • edgeTypes: Filter for specific relationships (RELATED_TO, MENTIONS, CO_OCCURS)
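
Example request (the external IP and conceptId are placeholders):

curl "http://<external-ip>/v1/graph/concept/<conceptId>/neighbors?depth=2&direction=both&edgeTypes=RELATED_TO"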

GET /v1/explain/trace/{requestId}

Returns the execution plan, the Cypher queries generated, and the LLM prompt versions used. This gives you insight into how an answer was derived.


Notes

  • Ollama Timeout: When running on CPU nodes in EKS, LLM inference may time out. Adjust flink.timeout in application.conf accordingly.

  • Neo4j Constraints: Ensure unique constraints are created on Concept(name) before running the Flink job to prevent duplicate nodes.

  • Async I/O: The Flink job uses Async I/O for Ollama calls. If the "Ordered" mode is used, a slow LLM response will backpressure the entire stream. "Unordered" is recommended for higher throughput.
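
A hedged sketch of that pattern (the Ollama client call and the numeric timeout/capacity values are placeholders; adjust them to match flink.timeout):

import java.util.concurrent.TimeUnit
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.async.{AsyncFunction, ResultFuture}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.Future

class OllamaAsyncFunction extends AsyncFunction[String, String] {
  override def asyncInvoke(chunk: String, resultFuture: ResultFuture[String]): Unit =
    Future {
      callOllama(chunk) // blocking HTTP call moved off the operator thread
    }.foreach(answer => resultFuture.complete(Iterable(answer)))

  private def callOllama(chunk: String): String = ??? // plug the real client in
}

// "unorderedWait" lets fast responses overtake slow ones, so one slow LLM
// call doesn't backpressure the whole stream.
def attachAsyncStage(chunks: DataStream[String]): DataStream[String] =
  AsyncDataStream.unorderedWait(
    chunks, new OllamaAsyncFunction, 60, TimeUnit.SECONDS, 50)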
