Custom RAG Pipeline System
Provider-agnostic agentic RAG framework built for the BlueCallom platform. An LLM agent receives a query, builds an execution plan, then loops through tools — semantic search, SQL queries, dependency checks, pricing lookups — until it assembles a complete answer. Multi-tenant by design.
Python · Django · OpenAI Responses API · Weaviate · SQLAlchemy · Pydantic · Docker · Pytest
A flexible, provider-agnostic Retrieval-Augmented Generation (RAG) pipeline framework built with Django. It enables building agentic workflows that combine LLMs with tools for database queries, vector search, and external APIs — with full per-request isolation and automatic resource management.
Features
- Dynamic LLM provider selection — per-request model and provider switching (OpenAI supported, extensible to Anthropic, etc.)
- Agentic tool-calling loop — autonomous multi-step reasoning with configurable iteration limits (up to 100)
- Dual data layer — SQL (SQLAlchemy) for structured queries + Weaviate for semantic vector search
- Progressive answer building — incremental result construction to prevent information loss in long chains
- Token tracking — accumulated usage metrics across all iterations for cost monitoring
- Context manager pipelines — per-request resource isolation with guaranteed cleanup
- Pipeline registry — config-driven pipeline discovery and instantiation via pipelines.json
- Reasoning effort control — support for OpenAI o1/o3 reasoning effort levels
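The tool-calling loop described above can be sketched roughly as follows. All names here (`run_agent`, `provider.complete`, `MAX_ITERATIONS`) are illustrative stand-ins, not the project's actual API; the real executor lives in core/agent_executor.py.

```python
# Minimal sketch of an agentic tool-calling loop with an iteration cap
# and accumulated token tracking. Names are illustrative, not the real API.

MAX_ITERATIONS = 100  # configurable upper bound on reasoning steps

def run_agent(provider, tools, query):
    """Loop until the LLM stops requesting tools or the cap is hit."""
    messages = [{"role": "user", "content": query}]
    usage = {"input_tokens": 0, "output_tokens": 0}

    for iteration in range(1, MAX_ITERATIONS + 1):
        response = provider.complete(messages, tools=list(tools))
        usage["input_tokens"] += response["usage"]["input_tokens"]
        usage["output_tokens"] += response["usage"]["output_tokens"]

        tool_call = response.get("tool_call")
        if tool_call is None:  # model produced a final answer
            return {"output": response["content"],
                    "iterations": iteration, "usage": usage}

        # Dispatch the requested tool and feed its result back to the model
        result = tools[tool_call["name"]](**tool_call["arguments"])
        messages.append({"role": "tool",
                         "name": tool_call["name"], "content": result})

    raise RuntimeError(f"Agent did not converge within {MAX_ITERATIONS} iterations")
```

The cap keeps a runaway model from looping forever, while the accumulated `usage` dict is what feeds the per-request cost metrics.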
Architecture
Request → Django View → Registry → Pipeline (Context Manager)
                                        │
                            Provider Factory → LLM Provider
                                        │
                            Agent Executor ⇄ Tools (SQL, Vector DB, APIs)
                                        │
                            Progressive Answer Builder → Response
Core Components
- core/llm_provider.py — Abstract LLM provider interface
- core/openai_provider.py — OpenAI Responses API implementation
- core/provider_factory.py — Factory for dynamic provider creation
- core/agent_executor.py — Agentic loop with tool dispatch
- core/base_pipeline.py — Abstract pipeline with context manager pattern
- core/base_tool.py — Abstract tool base class
- connectors/database.py — SQLAlchemy connector with connection pooling
- connectors/weaviate_connector.py — Weaviate vector DB connector
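The context-manager pattern behind base_pipeline.py can be sketched like this. The method names (`track`, the resource `close()` protocol) are assumptions for illustration; the real base class may differ.

```python
# Sketch of per-request resource isolation via the context-manager protocol.
# Names are assumptions; the real implementation is in core/base_pipeline.py.

class BasePipelineSketch:
    """Opens connectors on entry, guarantees cleanup on exit."""

    def __enter__(self):
        self.resources = []  # e.g. a SQL session, a Weaviate client
        return self

    def track(self, resource):
        """Register a resource so it is always released at request end."""
        self.resources.append(resource)
        return resource

    def __exit__(self, exc_type, exc, tb):
        # Release in reverse acquisition order, even if execution raised
        for resource in reversed(self.resources):
            resource.close()
        return False  # propagate any exception to the caller
```

Because cleanup runs in `__exit__`, a `with pipeline:` block releases every tracked connection even when the agent loop fails mid-request.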
Usage
API Request
curl -X POST http://localhost:8000/custom_rag/execute/ \
-H "Content-Type: application/json" \
-d '{
"function_id": "company_1",
"llm_provider": "openai",
"llm": "gpt-4o",
"reasoning_effort": "medium",
"prompt_objects": {
"query": "Find SAP products for mid-market customers with Swiss data residency"
}
}'
Response
{
"success": true,
"data": {
"output": { ... },
"iterations": 12,
"tools_used": ["semantic_search", "get_product_details", "get_pricing", "create_final_answer"]
},
"usage": {
"input_tokens": 4200,
"output_tokens": 1800,
"total_tokens": 6000
}
}
Request Parameters
- function_id (string, required) — Pipeline identifier (e.g., "company_1")
- llm_provider (string, required) — Provider name ("openai")
- llm (string, required) — Model name ("gpt-4o", "o3-mini", etc.)
- reasoning_effort (string, optional) — For reasoning models: "low", "medium", "high"
- prompt_objects (object, required) — Input data (e.g., {"query": "..."})
Extending
Adding a New Pipeline
- Create a directory under custom_rag/pipelines/your_pipeline/
- Implement a pipeline class extending BasePipeline
- Create tools extending BaseTool
- Write system/planning prompts
- Register in custom_rag/pipelines/pipelines.json
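A registry entry in pipelines.json might look like the following. The field names are illustrative guesses; match them to whatever keys the registry actually reads.

```json
{
  "company_1": {
    "module": "custom_rag.pipelines.company_1.pipeline",
    "class": "Company1Pipeline",
    "tools": ["semantic_search", "get_product_details", "get_pricing", "create_final_answer"],
    "prompts": "custom_rag/pipelines/company_1/prompts"
  }
}
```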
Adding a New LLM Provider
- Create a provider class extending LLMProvider
- Implement execute_with_tools()
- Register in ProviderFactory
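A new provider might follow this shape. The `LLMProvider` interface and factory registry shown here are assumptions based on the file list above, not the project's actual definitions.

```python
# Hypothetical sketch of adding a provider; the real abstract interface is
# defined in core/llm_provider.py and may differ.
from abc import ABC, abstractmethod

class LLMProvider(ABC):
    @abstractmethod
    def execute_with_tools(self, messages, tools):
        """Return the model's response, including any tool calls."""

class EchoProvider(LLMProvider):
    """Stand-in provider that echoes the last user message."""
    def execute_with_tools(self, messages, tools):
        return {"content": messages[-1]["content"], "tool_call": None}

# Registration would happen in ProviderFactory; this dict is an
# illustrative stand-in for the factory's internal registry:
PROVIDERS = {"echo": EchoProvider}

def create_provider(name):
    return PROVIDERS[name]()
```

Because callers only see the abstract interface, the agent executor never needs to know which vendor is behind a request.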
Testing
pytest # Run all tests (38 tests)
pytest tests/test_company_1_pipeline.py -v # Run specific test file
pytest --cov=custom_rag --cov-report=html  # Coverage report
All tests use mocks — no real API calls or database connections required.
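The mock-based approach looks roughly like this: the provider is injected as a dependency, so a `MagicMock` stands in for the real API. The names below are illustrative, not taken from the actual test suite.

```python
# Illustrative mocked test; the real tests live under tests/ and the
# provider interface shown is an assumption.
from unittest.mock import MagicMock

def test_pipeline_uses_mock_provider():
    provider = MagicMock()
    provider.execute_with_tools.return_value = {
        "content": "mocked answer",
        "tool_call": None,
        "usage": {"input_tokens": 5, "output_tokens": 3},
    }
    # The pipeline receives the provider via injection, so the test
    # never touches a real API or database:
    result = provider.execute_with_tools(
        [{"role": "user", "content": "q"}], tools=[]
    )
    assert result["content"] == "mocked answer"
    provider.execute_with_tools.assert_called_once()
```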
Quick Start
git clone <repository-url>
cd custom-rag
python -m venv venv
source venv/bin/activate
pip install -r requirements-dev.txt
cp .env.example .env
# Edit .env with your credentials
python manage.py migrate
python manage.py runserver
The API is available at http://localhost:8000/custom_rag/execute/.