RAG Pipelines

---
title: RAG Pipeline
---
flowchart LR
Collector --> AssetURIList
Retriever --- AssetURIList
Retriever --> Assets
Retriever --> MetaData
Converter --- Assets
Converter --> MarkdownFiles
Categorizer --- MarkdownFiles
Categorizer --> MetaData
Tokenizer --- MetaData
Tokenizer --- MarkdownFiles

Tokenizer --> Tokens
Transformer --- Tokens
Transformer --> Vectors
Embeddor --- Vectors
Embeddor --> EmbeddingDB
Prompt --> ChatBot
Question --> ChatBot
Query --- EmbeddingDB
Query --> Context
Context --> ChatBot
ChatBot --> Answer
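
The last stages of the flow above (Query, Context, ChatBot) can be sketched as follows. This is a minimal illustration, not the actual implementation: `EmbeddingDB` is replaced by a hypothetical in-memory list, the vectors are made up, and `query` and `build_chatbot_input` are names invented for this example. A real setup would use one of the vector databases listed further down.

```python
import math
from typing import List, Sequence, Tuple

# Hypothetical in-memory stand-in for EmbeddingDB: (vector, chunk text) pairs.
EmbeddingDB = List[Tuple[Sequence[float], str]]


def cosine_similarity(a: Sequence[float], b: Sequence[float]) -> float:
    """Cosine of the angle between two vectors; 0.0 if either has zero length."""
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    if norm_a == 0.0 or norm_b == 0.0:
        return 0.0
    return dot / (norm_a * norm_b)


def query(db: EmbeddingDB, question_vector: Sequence[float], top_k: int = 2) -> List[str]:
    """Query step: return the text of the top_k chunks most similar to the question."""
    ranked = sorted(
        db,
        key=lambda entry: cosine_similarity(entry[0], question_vector),
        reverse=True,
    )
    return [text for _, text in ranked[:top_k]]


def build_chatbot_input(prompt: str, question: str, context: List[str]) -> str:
    """Combine Prompt, Context and Question into the text handed to the ChatBot."""
    context_block = "\n".join(f"- {chunk}" for chunk in context)
    return f"{prompt}\n\nContext:\n{context_block}\n\nQuestion: {question}"


# Made-up example data; real vectors would come from the Transformer step.
db: EmbeddingDB = [
    ([1.0, 0.0], "Council minutes, budget section"),
    ([0.0, 1.0], "Library opening hours"),
    ([0.9, 0.1], "Council minutes, tax section"),
]
context = query(db, question_vector=[1.0, 0.0], top_k=2)
chatbot_input = build_chatbot_input(
    "Answer using only the context.",
    "What was decided on the budget?",
    context,
)
```

The question vector is assumed to come from the same embedding model as the stored vectors; otherwise the similarity scores are meaningless.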

Collector

Gathers a list of asset URIs and stores it in AssetURIList.
This class is special in that it has no input method.
It is triggered by a schedule or a sensor.

classDiagram
class Collector{
            +Dict config
            getAssetURLs() List~str~
            transform()
            saveAsset()
        }
class councilInsight~Collector~{
            String council_url          
            generateAssetURLs()
            process()
        }
        
class bibRag~Collector~{
            +String bin_url
            scrapeURIs()
            process()
        }
Collector <|-- councilInsight
Collector <|-- bibRag
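
In Python, the class diagram above might translate into something like the sketch below. It assumes the two concrete classes are meant to inherit from `Collector`. The class is renamed to `CouncilInsight` to follow Python naming conventions, and the `file_ids` config key is purely hypothetical: the notes do not say how the URLs are actually discovered.

```python
from abc import ABC, abstractmethod
from typing import Dict, List, Optional


class Collector(ABC):
    """Base class: produces asset URIs and takes no pipeline input."""

    def __init__(self, config: Optional[Dict] = None) -> None:
        self.config: Dict = config or {}

    @abstractmethod
    def getAssetURLs(self) -> List[str]:
        """Return the asset URIs to hand over to AssetURIList."""


class CouncilInsight(Collector):
    """Collector for council documents (councilInsight in the diagram)."""

    def __init__(self, council_url: str, config: Optional[Dict] = None) -> None:
        super().__init__(config)
        self.council_url = council_url

    def generateAssetURLs(self) -> List[str]:
        # Assumption: document ids are supplied via a hypothetical
        # "file_ids" config entry instead of being scraped.
        ids = self.config.get("file_ids", [])
        return [f"{self.council_url}/getfile.asp?id={i}&type=do" for i in ids]

    def getAssetURLs(self) -> List[str]:
        return self.generateAssetURLs()


collector = CouncilInsight(
    "https://www.gemeinderat.heidelberg.de",
    {"file_ids": [5793]},
)
urls = collector.getAssetURLs()
```

Making `getAssetURLs` abstract means a scheduler or sensor can trigger any collector through the same call without knowing which source it wraps.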

AssetURIList

Could be imported as:

{
'retrieval': {
	'url': 'https://www.gemeinderat.heidelberg.de/getfile.asp?id=5793&type=do',
	'last_run': '2025-09-15 14:23:15',
	'status': 200
	},
'storage': {
	'pdf': {
		'path': 's3://bucket_name/folder1/folder2/file1.pdf',
		'created_at': '2025-08-10 15:09:26',
		'updated_at': '2025-09-23 16:18:08'
		},
	'md': {
		'path': 's3://bucket_name/folder1/folder2/file1.md',
		'created_at': '2025-08-10 15:09:26',
		'updated_at': '2025-09-23 16:18:08'
		}
	}
}
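
A record like the one above could be loaded and its timestamps parsed roughly as follows. This is only a sketch: it assumes every timestamp uses the `YYYY-MM-DD HH:MM:SS` format, and `parse_timestamps` is a helper name invented for this example, not part of any existing module.

```python
from datetime import datetime
from typing import Any, Dict

# Assumed timestamp format for last_run, created_at and updated_at.
TIMESTAMP_FORMAT = "%Y-%m-%d %H:%M:%S"
TIMESTAMP_KEYS = ("last_run", "created_at", "updated_at")

entry: Dict[str, Any] = {
    "retrieval": {
        "url": "https://www.gemeinderat.heidelberg.de/getfile.asp?id=5793&type=do",
        "last_run": "2025-09-15 14:23:15",
        "status": 200,
    },
    "storage": {
        "pdf": {
            "path": "s3://bucket_name/folder1/folder2/file1.pdf",
            "created_at": "2025-08-10 15:09:26",
            "updated_at": "2025-09-23 16:18:08",
        },
        "md": {
            "path": "s3://bucket_name/folder1/folder2/file1.md",
            "created_at": "2025-08-10 15:09:26",
            "updated_at": "2025-09-23 16:18:08",
        },
    },
}


def parse_timestamps(record: Dict[str, Any]) -> Dict[str, Any]:
    """Return a copy of the record with timestamp strings turned into datetimes."""
    parsed: Dict[str, Any] = {}
    for key, value in record.items():
        if isinstance(value, dict):
            # Recurse into nested sections such as 'storage' -> 'pdf'.
            parsed[key] = parse_timestamps(value)
        elif key in TIMESTAMP_KEYS:
            parsed[key] = datetime.strptime(value, TIMESTAMP_FORMAT)
        else:
            parsed[key] = value
    return parsed


parsed = parse_timestamps(entry)
retrieved_ok = parsed["retrieval"]["status"] == 200
```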

MetaData

Retriever

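from typing import Dict, Optional

import requests
from path import AssetURIList  # module path as given in these notes

class Retriever:

	def __init__(self, config: Optional[Dict] = None) -> None:
		# Use None as the default to avoid sharing one mutable dict between instances
		self.au = AssetURIList(config or {})
		...
		
	def get_assets(self) -> None:
		for asset_url in self.au.getAssetURLs():
			response = requests.get(asset_url)
			if response.status_code == 404:
				# Record the failure and skip assets that no longer exist
				self.au.set_status(asset_url, response.status_code)
				continue
			self.store_asset(response.content)
	
	...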
		

Evaluation

https://weaviate.io/blog/rag-evaluation

Vector DB

Qdrant

https://docs.dagster.io/integrations/libraries/qdrant

Faiss

Similarity-search library, not a database: no built-in data storage.

Milvus

Open source (OSS).

ChromaDB

Small; can run in memory.

Weaviate

Weaviate is an open-source vector database for storing and managing vector embeddings at scale. You can start with a small dataset and scale up as your needs grow. It supports semantic search and similarity matching, and offers fast vector-based queries and GraphQL APIs, which makes it suitable for AI-powered applications and machine learning workflows.
https://docs.dagster.io/integrations/libraries/weaviate

https://docs.weaviate.io/weaviate/recipes/multi-vector-colipali-rag

Pinecone

Fully managed; an account is required.
https://docs.dagster.io/examples/rag

https://medium.com/@rohitupadhye799/comparing-chroma-db-weaviate-and-pinecone-which-vector-database-is-right-for-you-3b85b561b3a3

PostgreSQL (pgvector Extension)

Multi-purpose: vector search inside a general-purpose PostgreSQL database.

Resources

https://arxiv.org/abs/2407.01449