Martin Gawron | mgawr2@uic.edu
Project Overview
Program Design and Architecture
Pipeline Overview
Models (Ollama)
Flink Streaming Jobs
Component Deployment Decisions
Steps to Run Locally
Steps to Run on AWS EKS
Query Endpoints
Notes
Project Overview

This project implements a scalable GraphRAG (Graph Retrieval-Augmented Generation) pipeline using Apache Flink for stream processing and Neo4j as the knowledge graph backend.
Unlike traditional RAG, which relies solely on vector similarity, this pipeline constructs a structured knowledge graph by extracting entities and relationships from documents in real time. It uses Ollama models for entity extraction and embedding generation, deployed on AWS EKS to handle distributed stream processing. The system supports atomic upserts to Neo4j, ensuring versioned consistency for concept and chunk nodes.
Pipeline Overview

The workflow transforms raw document streams into a queryable knowledge graph. Desired workflow:
Doc Stream → Flink Processing (Extraction/Fusion) → Neo4j Storage → Agentic Search (Akka HTTP)
- Ingestion Source: A stream of document chunks (simulated via file monitoring or Kafka) is ingested into the Flink cluster.
- Stream Processing (Flink): The Flink job processes chunks in parallel (sketched below):
  - Entity Extraction: uses llama3:instruct (via Ollama) to extract entities and semantic relationships from text.
  - Relation Fusion: fuses candidate relations and resolves entity duplicates using a rules-based pass.
  - Graph Construction: formats data into nodes (Concepts, Chunks) and edges (RELATED_TO, MENTIONS, CO_OCCURS).
- Graph Storage (Neo4j): The processed graph elements are atomically upserted into Neo4j. Constraints ensure uniqueness, and versioning is applied to track changes in the corpus over time.
- Querying (Akka HTTP Microservice): A REST API acts as the interface for the agentic search. It:
  - performs dependency queries (finding paths between concepts);
  - detects logical inconsistencies in the graph;
  - retrieves context for the LLM to generate final answers.
Models (Ollama)

- llama3:instruct: We use the instruct model because we can tune the temperature parameter so the model adheres more closely to the given instructions. The pipeline depends on specific reasoning criteria and strict output formatting, so rule-following is a critical necessity.
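For example, a deterministic extraction call can pin the temperature near zero. Below is a minimal sketch using Java's built-in HTTP client against Ollama's standard /api/generate endpoint; the base URL and prompt are placeholders:

```scala
import java.net.URI
import java.net.http.{HttpClient, HttpRequest, HttpResponse}

// Sketch of a low-temperature Ollama call. The endpoint and options are
// standard Ollama API; the prompt and base URL are placeholders.
object OllamaExtractSketch {
  def main(args: Array[String]): Unit = {
    val body =
      """{
        |  "model": "llama3:instruct",
        |  "prompt": "Extract entities as JSON: ...",
        |  "format": "json",
        |  "stream": false,
        |  "options": { "temperature": 0.0 }
        |}""".stripMargin

    val request = HttpRequest.newBuilder()
      .uri(URI.create("http://localhost:11434/api/generate"))
      .header("Content-Type", "application/json")
      .POST(HttpRequest.BodyPublishers.ofString(body))
      .build()

    val response = HttpClient.newHttpClient()
      .send(request, HttpResponse.BodyHandlers.ofString())
    println(response.body()) // strictly formatted JSON comes back
  }
}
```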
Flink Streaming Jobs

Chunking
- Input: Directory (HDFS/S3)
- Logic: Reads input documents, splits them into manageable token windows, and hashes the content to create easily identifiable chunks for graph traversal later.
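A minimal sketch of how stable chunk IDs can be derived, assuming a simple whitespace tokenizer; the window size and overlap are illustrative, not the project's actual configuration:

```scala
import java.security.MessageDigest

// Sketch of deterministic chunk IDs: hash each token window so the same
// content always maps to the same node ID for graph traversal.
object ChunkIds {
  def sha256(s: String): String =
    MessageDigest.getInstance("SHA-256")
      .digest(s.getBytes("UTF-8"))
      .map("%02x".format(_)).mkString

  // Overlapping token windows; sizes here are illustrative defaults.
  def windows(text: String, size: Int = 256, overlap: Int = 32): Seq[(String, String)] =
    text.split("\\s+").toSeq
      .sliding(size, size - overlap)
      .map { tokens =>
        val chunk = tokens.mkString(" ")
        (sha256(chunk), chunk) // (stable id, chunk content)
      }.toSeq
}
```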
Mention Extraction
- Input: Text Chunks (data stream)
- Logic: This operator uses both NLP and an LLM. NLP extracts high-recall entities such as persons, organizations, and locations, while the LLM extracts more complex concepts from the text.
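A toy sketch of the hybrid pass: the capitalized-phrase regex stands in for a real NLP extractor, and llmConcepts is a hypothetical placeholder for the LLM call:

```scala
// Toy stand-in for the hybrid NLP + LLM pass; not the project's extractor.
object MentionSketch {
  case class Mention(chunkId: String, text: String, source: String)

  private val ProperNoun = "([A-Z][a-z]+(?: [A-Z][a-z]+)*)".r

  def extract(chunkId: String, text: String): Seq[Mention] = {
    // High-recall heuristic in place of the NLP library pass
    val heuristic = ProperNoun.findAllIn(text).toSeq.distinct
      .map(Mention(chunkId, _, "nlp"))
    // Complex concepts come from the LLM (placeholder below)
    val concepts = llmConcepts(text).map(Mention(chunkId, _, "llm"))
    heuristic ++ concepts
  }

  private def llmConcepts(text: String): Seq[String] = Seq.empty // placeholder
}
```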
Co-Occurrence Detection
- Input: Mentions (data stream)
- Logic: Given the stream of mentions (concepts and entities), this job finds mentions that co-occur and records the pairing, tying together closely occurring concepts and entities in the knowledge graph.
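A sketch of within-chunk pairing; grouping by chunk, and ordering each pair so (A, B) and (B, A) count as one edge, are assumptions about the implementation:

```scala
// Sketch of co-occurrence pairing over the mentions found in one chunk.
object CoOccurrenceSketch {
  case class Mention(chunkId: String, text: String)
  case class CoOccurrence(a: String, b: String, chunkId: String)

  def pairs(mentionsInChunk: Seq[Mention]): Seq[CoOccurrence] =
    mentionsInChunk.combinations(2).collect {
      case Seq(m1, m2) if m1.text != m2.text =>
        // Sort the pair so (A,B) and (B,A) map to the same edge
        val Seq(a, b) = Seq(m1.text, m2.text).sorted
        CoOccurrence(a, b, m1.chunkId)
    }.toSeq
}
```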
Relation Candidate Generation
- Input: Co-Occurrences (data stream)
- Logic: Given the stream of co-occurrences, this job selects pairs that are very closely related and marks them as relationship candidates. The LLM then evaluates each candidate, with supporting evidence, to decide whether it forms a genuine relationship.
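A sketch of candidate selection by co-occurrence count; the threshold is illustrative, not the project's actual cutoff:

```scala
// Sketch: pairs that co-occur often enough become LLM-reviewed candidates,
// carrying their source chunks as evidence.
object CandidateSketch {
  case class CoOccurrence(a: String, b: String, chunkId: String)
  case class Candidate(a: String, b: String, evidence: Seq[String])

  def select(cooc: Seq[CoOccurrence], minCount: Int = 3): Seq[Candidate] =
    cooc.groupBy(c => (c.a, c.b)).collect {
      case ((a, b), hits) if hits.size >= minCount =>
        Candidate(a, b, evidence = hits.map(_.chunkId)) // chunks cited to the LLM
    }.toSeq
}
```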
Graph Upsert
- Input: Graph Objects (Nodes/Edges)
- Logic: Chunks, entities, co-occurrences, mentions, and relationships make up the nodes and edges of the knowledge graph. This job upserts them, in the correct format, into AuraDB (the Neo4j database) to build the graph.
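A minimal sketch of an idempotent MERGE-based upsert using the official Neo4j Java driver (5.x); the exact node properties and versioning scheme are assumptions based on the description above:

```scala
import org.neo4j.driver.{AuthTokens, GraphDatabase, Values}

// Sketch of an atomic, versioned upsert. MERGE makes the write idempotent;
// the Concept/Chunk properties shown here are assumptions.
object UpsertSketch {
  def main(args: Array[String]): Unit = {
    val driver = GraphDatabase.driver(
      sys.env("NEO4J_URI"),
      AuthTokens.basic(sys.env("NEO4J_USERNAME"), sys.env("NEO4J_PASSWORD")))
    val session = driver.session()
    try {
      session.executeWrite { tx =>
        tx.run(
          """MERGE (c:Concept {name: $name})
            |ON CREATE SET c.version = 1
            |ON MATCH  SET c.version = c.version + 1
            |MERGE (k:Chunk {id: $chunkId})
            |MERGE (k)-[:MENTIONS]->(c)""".stripMargin,
          Values.parameters("name", "CVS", "chunkId", "abc123"))
      }
    } finally {
      session.close()
      driver.close()
    }
  }
}
```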
Component Deployment Decisions

- Why EKS? Graph construction is resource-intensive. EKS lets us orchestrate a Flink cluster (JobManager + multiple TaskManagers) alongside the Neo4j database and the Ollama inference service.
- Scaling: Kubernetes allows us to scale TaskManagers horizontally based on lag in the ingestion stream.
- Flexible Accessibility: AuraDB was chosen to smooth the transition from local to cloud deployments, since both can rely on the same endpoints and environment variables, simplifying graph upsertion and retrieval.
- Isolation: The query layer is deployed as a separate stateless deployment exposed via a LoadBalancer service, decoupling the read path (user queries) from the write path (Flink indexing).
Steps to Run Locally

git clone git@github.com:martingaw11/CS441-CloudComputing-UIC.git
cd CS441-CloudComputing-UIC
ollama pull llama3:instruct

Create an account with Neo4j and activate AuraDB: https://neo4j.com/product/auradb/
In your AWS account, set up a directory in your S3 bucket with the corpus of PDFs.
Build the JARs with sbt (easiest from inside IntelliJ's sbt shell):

reload
clean
core/assembly       # API
ingestion/assembly  # Flink pipeline

Format the .env file like this and fill it in with your account information.
For the base URL, match it to your WSL IP address.
For the model, you can use whichever model you like (just make sure to adjust Dockerfile.ollama to pull that model).
NEO4J_URI=...
NEO4J_USERNAME=...
NEO4J_PASSWORD=...
AWS_ACCESS_KEY_ID=...
AWS_SECRET_ACCESS_KEY=...
AWS_REGION=...
OLLAMA_BASE_URL="http://172.xx.xx.xx:11434/api"
OLLAMA_MODEL="phi"
docker-compose up --build

Wait for the knowledge graph to be built, then run queries!
Steps to Run on AWS EKS

Create ECR repositories for your custom images (Flink, API, Ollama):
aws ecr create-repository --repository-name graphrag-ingestion
aws ecr create-repository --repository-name graphrag-api
aws ecr create-repository --repository-name graphrag-ollama

Use sbt to build the JARs that Docker will use:
core/assembly
ingestion/assembly
Connect Docker to AWS and build/push images
# Authenticate
aws ecr get-login-password --region us-east-1 | docker login --username AWS --password-stdin <aws_account_id>.dkr.ecr.<region>.amazonaws.com
# Build & push each image. The API image uses Dockerfile.api and the Ollama
# image uses Dockerfile.ollama; substitute the matching Dockerfile for ingestion.
docker build -f <ingestion-dockerfile> -t <aws_account_id>.dkr.ecr.<region>.amazonaws.com/graphrag-ingestion:v1 .
docker push <aws_account_id>.dkr.ecr.<region>.amazonaws.com/graphrag-ingestion:v1
docker build -f Dockerfile.api -t <aws_account_id>.dkr.ecr.<region>.amazonaws.com/graphrag-api:v1 .
docker push <aws_account_id>.dkr.ecr.<region>.amazonaws.com/graphrag-api:v1
docker build -f Dockerfile.ollama -t <aws_account_id>.dkr.ecr.<region>.amazonaws.com/graphrag-ollama:v1 .
docker push <aws_account_id>.dkr.ecr.<region>.amazonaws.com/graphrag-ollama:v1

# 1. Install kubectl
curl -LO "https://dl.k8s.io/release/$(curl -L -s https://dl.k8s.io/release/stable.txt)/bin/linux/amd64/kubectl"
chmod +x kubectl
sudo mv kubectl /usr/local/bin/
# 2. Install eksctl
curl --silent --location "https://github.com/weaveworks/eksctl/releases/latest/download/eksctl_$(uname -s)_amd64.tar.gz" | tar xz -C /tmp
sudo mv /tmp/eksctl /usr/local/bin/

Create a dedicated flink-dev IAM User with Admin Access...
aws configure # Configure terminal for identity

Using the eks-cluster-config.yaml file, replace <> with your information and run:
eksctl create cluster -f eks-cluster-config.yaml

This will take a while...
- Go to IAM Roles in the AWS Console
- Find the role beginning with eksctl; it should look something like eksctl-graphrag-cluster-nodegroup-NodeInstanceRole-XXXXX
- Add permissions via Attach policy for this role
- Give it the AmazonS3FullAccess policy
Install helm
curl -fsSL -o get_helm.sh https://raw.githubusercontent.com/helm/helm/main/scripts/get-helm-3
chmod 700 get_helm.sh
./get_helm.sh
helm repo add flink-operator-repo https://downloads.apache.org/flink/flink-kubernetes-operator-1.10.0/

Connect kubectl to the new cluster and install the Flink operator:
aws eks update-kubeconfig --region us-east-2 --name graphrag-cluster
helm install flink-kubernetes-operator flink-operator-repo/flink-kubernetes-operator \
--set webhook.create=false \
  --namespace default

kubectl delete rolebinding flink-role-binding
kubectl apply -f rbac.yaml

Replace the values with the values from the .env file from before...
kubectl create secret generic graphrag-secrets \
--from-literal=AWS_ACCESS_KEY_ID=... \
--from-literal=AWS_SECRET_ACCESS_KEY="..." \
--from-literal=AWS_REGION=... \
--from-literal=NEO4J_URI=... \
--from-literal=NEO4J_USER=neo4j \
  --from-literal=NEO4J_PASSWORD=...

kubectl apply -f flink-deployment.yaml
kubectl apply -f api-deployment.yaml

Optional: watch progress with these commands
# Watch pods and their status
kubectl get pods --watch
# Get pods and their names
kubectl get pods
# Get the logs of a pod and container
kubectl logs -f <pod> -c <container>
# ex.
kubectl logs -f graphrag-cluster-taskmanager-1-1 -c flink-main-container
# Port forward cluster to view Flink Dashboard
kubectl port-forward svc/graphrag-cluster-rest 8081:8081
# Get public facing IP of API
kubectl get svc graphrag-api-service
# Use the External IP for Requests: ex.
curl -X POST "http://<external-ip>/v1/query" \
-H "Content-Type: application/json" \
-d '{"query": "Why is CVS important for MSR?"}'After all is done and you are finished, you can go ahead and do these commands to delete everything to not accrue costs.
# Delete deployments
kubectl delete -f flink-deployment.yaml
kubectl delete -f api-deployment.yaml
# Destroy EKS cluster
eksctl delete cluster --name graphrag-cluster --region us-east-2

Query Endpoints

POST /v1/query
The primary entry point. The service canonicalizes requests into graph patterns and returns an LLM response answering the question based on the "knowledge" in the graph.
{
"query": "What are the main benefits of CVS?",
"output": {
"groupBy": [],
"metrics": [],
"topKPerGroup": 5,
"includeCitations": true
}
}

Returns the state of a job (RUNNING, SUCCEEDED, FAILED).
Returns the final answer and citations once the job state is SUCCEEDED.
Every query response returns citations from the graph. These citations are denoted as evidence, and this endpoint allows you to explore citation evidence further.
Lightweight neighborhood lookup to see what is connected to a concept and by what edges; an illustrative query sketch follows the parameter list.
- Parameters:
  - depth: 1..3 (traversal depth)
  - direction: in | out | both
  - edgeTypes: filter specific relationships (RELATES_TO, MENTIONS, CO_OCCURS)
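For intuition, a depth-2, direction=both lookup might compile to Cypher roughly like the following; this is a hypothetical reconstruction, not the service's actual generated query:

```scala
// Hypothetical Cypher for a depth-2, direction=both neighborhood lookup.
object NeighborhoodCypher {
  val query: String =
    """MATCH (c:Concept {name: $name})-[r:RELATES_TO|MENTIONS|CO_OCCURS*1..2]-(n)
      |RETURN n, r""".stripMargin
}
```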
Returns the execution plan, the Cypher queries generated, and the LLM prompt versions used. This gives you insight into how an answer was derived.
Notes

- Ollama Timeout: When running on CPU nodes in EKS, LLM inference may time out. Adjust flink.timeout in application.conf accordingly.
- Neo4j Constraints: Ensure unique constraints are created on Concept(name) before running the Flink job to prevent duplicate nodes (see the first sketch after this list).
- Async I/O: The Flink job uses Async I/O for Ollama calls. If "Ordered" mode is used, a slow LLM response will backpressure the entire stream; "Unordered" is recommended for higher throughput (see the second sketch after this list).
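A one-off way to create that uniqueness constraint with the Neo4j driver (Neo4j 5 / AuraDB syntax), reusing the same environment variables as the .env file; the constraint name is an illustrative choice:

```scala
import org.neo4j.driver.{AuthTokens, GraphDatabase}

// Creates the uniqueness constraint on Concept(name) before the Flink job runs.
object CreateConstraints {
  def main(args: Array[String]): Unit = {
    val driver = GraphDatabase.driver(
      sys.env("NEO4J_URI"),
      AuthTokens.basic(sys.env("NEO4J_USERNAME"), sys.env("NEO4J_PASSWORD")))
    val session = driver.session()
    try {
      session.run(
        "CREATE CONSTRAINT concept_name_unique IF NOT EXISTS " +
        "FOR (c:Concept) REQUIRE c.name IS UNIQUE").consume()
    } finally {
      session.close()
      driver.close()
    }
  }
}
```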
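And a minimal sketch of the unordered Async I/O wiring in Flink's Scala API; callOllama is a placeholder for the job's actual HTTP client, and the timeout and capacity values are illustrative:

```scala
import java.util.concurrent.TimeUnit
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.async.{AsyncFunction, ResultFuture}
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global
import scala.util.{Failure, Success}

// Sketch of non-blocking LLM calls; the HTTP call runs off the operator thread.
class OllamaAsync extends AsyncFunction[String, String] {
  override def asyncInvoke(chunk: String, resultFuture: ResultFuture[String]): Unit =
    Future(callOllama(chunk)).onComplete {
      case Success(json) => resultFuture.complete(Iterable(json))
      case Failure(err)  => resultFuture.completeExceptionally(err)
    }

  private def callOllama(chunk: String): String =
    """{"entities": []}""" // placeholder for the blocking HTTP call
}

object AsyncWiring {
  // unorderedWait: a slow response delays only its own element, not the stream
  def wire(chunks: DataStream[String]): DataStream[String] =
    AsyncDataStream.unorderedWait(chunks, new OllamaAsync, 60, TimeUnit.SECONDS, 10)
}
```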