LlamaIndex Integration¶
Use LlamaIndex for RAG (Retrieval-Augmented Generation) within LionAGI workflows.
Why LlamaIndex + LionAGI?¶
- LlamaIndex: best-in-class RAG capabilities, document indexing, and retrieval
- LionAGI: superior orchestration, parallel execution, and multi-agent coordination
Together: Powerful RAG workflows with intelligent orchestration.
Basic Integration¶
Wrap LlamaIndex queries as custom operations:
from lionagi import Branch, Builder, Session
from llama_index.core import VectorStoreIndex, SimpleDirectoryReader
# Your existing LlamaIndex setup - no changes needed!
def setup_llamaindex():
    documents = SimpleDirectoryReader("./data").load_data()
    index = VectorStoreIndex.from_documents(documents)
    return index.as_query_engine()
query_engine = setup_llamaindex()
# Custom operation wrapping LlamaIndex
async def rag_query(branch: Branch, question: str, **kwargs):
    """RAG query using LlamaIndex - keeps your existing code."""
    # Your LlamaIndex query logic unchanged
    response = query_engine.query(question)
    # Return in LionAGI format
    return await branch.chat(
        f"Based on this retrieved information: {response}\n\nQuestion: {question}"
    )
# Use in LionAGI workflow
session = Session()
builder = Builder("rag_workflow")
rag_op = builder.add_operation(
    operation=rag_query,
    question="What are the key findings in the research papers?",
)
result = await session.flow(builder.get_graph())
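Rebuilding the index from raw documents on every process start gets slow as the corpus grows. LlamaIndex can persist the index to disk and reload it on later runs; a minimal sketch, assuming a local ./storage directory (the setup_llamaindex_cached name is ours, not a library API):

import os

from llama_index.core import (
    SimpleDirectoryReader,
    StorageContext,
    VectorStoreIndex,
    load_index_from_storage,
)

def setup_llamaindex_cached(persist_dir: str = "./storage"):
    """Load a persisted index if one exists; otherwise build and persist it."""
    if os.path.exists(persist_dir):
        # Reload the previously persisted index
        storage_context = StorageContext.from_defaults(persist_dir=persist_dir)
        index = load_index_from_storage(storage_context)
    else:
        # First run: build from documents, then persist for next time
        documents = SimpleDirectoryReader("./data").load_data()
        index = VectorStoreIndex.from_documents(documents)
        index.storage_context.persist(persist_dir=persist_dir)
    return index.as_query_engine()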
Multi-Source RAG¶
Orchestrate multiple knowledge sources in parallel:
from lionagi import Session, Builder, Branch
# Different RAG sources
def setup_financial_rag():
    # Your financial documents RAG
    return financial_query_engine

def setup_technical_rag():
    # Your technical documents RAG
    return technical_query_engine

# Custom operations for each source
async def financial_rag(branch: Branch, question: str, **kwargs):
    query_engine = setup_financial_rag()
    response = query_engine.query(question)
    return await branch.chat(f"Financial data: {response}\nAnalyze: {question}")

async def technical_rag(branch: Branch, question: str, **kwargs):
    query_engine = setup_technical_rag()
    response = query_engine.query(question)
    return await branch.chat(f"Technical data: {response}\nAnalyze: {question}")
# Orchestrate multiple RAG sources in parallel
session = Session()
builder = Builder("multi_rag")
question = "What are the technical and financial risks of AI adoption?"
# Parallel RAG queries
financial_op = builder.add_operation(
    operation=financial_rag,
    question=question,
)
technical_op = builder.add_operation(
    operation=technical_rag,
    question=question,
)
# Synthesize results
synthesis = builder.add_aggregation(
    "communicate",
    source_node_ids=[financial_op, technical_op],
    instruction="Combine financial and technical analysis into comprehensive risk assessment",
)
result = await session.flow(builder.get_graph())
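The financial_rag and technical_rag wrappers above differ only in which engine they use and the label in the prompt. If you add more sources, a small factory keeps the wrappers consistent; a sketch (make_source_rag is our name, not a LionAGI API):

def make_source_rag(engine_factory, label: str):
    """Build a LionAGI-compatible operation for one RAG source."""
    async def source_rag(branch: Branch, question: str, **kwargs):
        query_engine = engine_factory()
        response = query_engine.query(question)
        return await branch.chat(f"{label} data: {response}\nAnalyze: {question}")
    return source_rag

financial_rag = make_source_rag(setup_financial_rag, "Financial")
technical_rag = make_source_rag(setup_technical_rag, "Technical")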
RAG + Analysis Pipeline¶
Chain RAG retrieval with specialized analysis:
from lionagi import Session, Builder, Branch, iModel
session = Session()
builder = Builder("rag_analysis_pipeline")
# Specialized analysis branches
retriever = Branch(system="Document retrieval specialist")
analyzer = Branch(
    chat_model=iModel(provider="anthropic", model="claude-3-5-sonnet-20240620"),
    system="Deep analysis specialist",
)
synthesizer = Branch(system="Report synthesis specialist")
session.include_branches([retriever, analyzer, synthesizer])
# Phase 1: RAG retrieval
retrieval_op = builder.add_operation(
    operation=rag_query,
    branch=retriever,
    question="What are current market trends in AI?",
)
# Phase 2: Deep analysis (depends on retrieval)
analysis_op = builder.add_operation(
    "communicate",
    branch=analyzer,
    instruction="Provide detailed analysis of the retrieved information",
    depends_on=[retrieval_op],
)
# Phase 3: Final synthesis
synthesis_op = builder.add_operation(
    "communicate",
    branch=synthesizer,
    instruction="Create executive summary of analysis",
    depends_on=[analysis_op],
)
result = await session.flow(builder.get_graph())
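The same retrieve-analyze-synthesize chain scales to several questions: add one chain per question to the graph, and the flow runs independent chains concurrently. A sketch reusing the builder API from above (the example questions are placeholders):

questions = [
    "What are current market trends in AI?",
    "Which sectors are adopting AI fastest?",
]
builder = Builder("rag_analysis_multi")
analysis_ops = []
for q in questions:
    retrieval = builder.add_operation(
        operation=rag_query,
        branch=retriever,
        question=q,
    )
    analysis = builder.add_operation(
        "communicate",
        branch=analyzer,
        instruction=f"Provide detailed analysis of the retrieved information for: {q}",
        depends_on=[retrieval],
    )
    analysis_ops.append(analysis)

# One synthesis node that waits on every analysis
final_op = builder.add_operation(
    "communicate",
    branch=synthesizer,
    instruction="Create an executive summary across all analyses",
    depends_on=analysis_ops,
)
result = await session.flow(builder.get_graph())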
Advanced: Multi-Format RAG¶
Handle different document formats and retrieval channels with specialized processing:
# Different document processors
async def pdf_rag(branch: Branch, query: str, **kwargs):
    # Your LlamaIndex PDF processing
    pdf_engine = setup_pdf_rag()
    response = pdf_engine.query(query)
    return await branch.chat(f"PDF data: {response}\nQuery: {query}")

async def web_rag(branch: Branch, query: str, **kwargs):
    # Your LlamaIndex web scraping
    web_engine = setup_web_rag()
    response = web_engine.query(query)
    return await branch.chat(f"Web data: {response}\nQuery: {query}")

async def database_rag(branch: Branch, query: str, **kwargs):
    # Your LlamaIndex database querying
    db_engine = setup_database_rag()
    response = db_engine.query(query)
    return await branch.chat(f"Database data: {response}\nQuery: {query}")
# Orchestrate all sources
builder = Builder("multiformat_rag")
query = "What are the latest developments in AI regulations?"
# Parallel multi-modal retrieval
pdf_op = builder.add_operation(operation=pdf_rag, query=query)
web_op = builder.add_operation(operation=web_rag, query=query)
db_op = builder.add_operation(operation=database_rag, query=query)
# Cross-reference and validate
validator = Branch(system="Information validation specialist")
validation_op = builder.add_operation(
    "communicate",
    branch=validator,
    instruction="Cross-reference all sources and validate information consistency",
    depends_on=[pdf_op, web_op, db_op],
)
result = await session.flow(builder.get_graph())
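When the set of sources varies by deployment, build the fan-in from a mapping instead of hard-coded variables; the validation step then depends on whatever sources were registered. A sketch using the same builder calls as above:

source_rags = {
    "pdf": pdf_rag,
    "web": web_rag,
    "database": database_rag,
}
retrieval_ops = [
    builder.add_operation(operation=rag_fn, query=query)
    for rag_fn in source_rags.values()
]
validation_op = builder.add_operation(
    "communicate",
    branch=validator,
    instruction="Cross-reference all sources and validate information consistency",
    depends_on=retrieval_ops,
)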
Error Handling with RAG¶
Handle RAG failures gracefully:
async def resilient_rag(branch: Branch, question: str, **kwargs):
    """RAG with fallback strategies."""
    # Try primary RAG source
    try:
        primary_engine = setup_primary_rag()
        response = primary_engine.query(question)
        return await branch.chat(f"Retrieved: {response}\nAnalyze: {question}")
    except Exception as e:
        print(f"Primary RAG failed: {e}")
        # Fall back to secondary source
        try:
            fallback_engine = setup_fallback_rag()
            response = fallback_engine.query(question)
            return await branch.chat(f"Fallback data: {response}\nAnalyze: {question}")
        except Exception as e2:
            print(f"Fallback RAG failed: {e2}")
            # Ultimate fallback: answer from the model's own knowledge
            return await branch.chat(f"Using model knowledge to answer: {question}")
# Use resilient RAG in workflow
builder.add_operation(operation=resilient_rag, question="Complex query here")
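Transient failures (rate limits, network blips) often succeed on retry before a source switch is warranted. A minimal backoff helper you could call inside either try block; the helper name and defaults are ours, and asyncio.to_thread keeps the synchronous query() call from blocking the event loop:

import asyncio

async def query_with_retries(engine, question: str, attempts: int = 3, base_delay: float = 1.0):
    """Retry a synchronous LlamaIndex query with exponential backoff."""
    for attempt in range(attempts):
        try:
            # Run the blocking query in a worker thread
            return await asyncio.to_thread(engine.query, question)
        except Exception:
            if attempt == attempts - 1:
                raise  # let resilient_rag move on to its fallback
            await asyncio.sleep(base_delay * 2**attempt)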
Performance Optimization¶
Optimize RAG workflows for production:
import asyncio
async def batch_rag_queries(branch: Branch, questions: list, **kwargs):
    """Process multiple RAG queries in parallel."""

    async def single_rag_query(question):
        # aquery() is LlamaIndex's async counterpart to query(); the
        # synchronous query() would block the event loop and serialize
        # the gather below.
        response = await query_engine.aquery(question)
        return {"question": question, "response": str(response)}

    # Parallel RAG queries within the operation
    results = await asyncio.gather(*[single_rag_query(q) for q in questions])

    # Synthesize all results
    combined = "\n\n".join(
        f"Q: {r['question']}\nA: {r['response']}" for r in results
    )
    return await branch.chat(f"Synthesize these RAG results:\n{combined}")
# Use in high-throughput scenarios
questions = ["Question 1", "Question 2", "Question 3"]
batch_op = builder.add_operation(
    operation=batch_rag_queries,
    questions=questions,
)
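Unbounded asyncio.gather can overwhelm a rate-limited backend as the question list grows. A semaphore caps the number of in-flight queries; a sketch (the limit of 5 is an arbitrary choice):

import asyncio

async def bounded_rag_queries(questions: list, limit: int = 5):
    """Run RAG queries concurrently, at most `limit` at a time."""
    semaphore = asyncio.Semaphore(limit)

    async def one(question):
        async with semaphore:
            response = await query_engine.aquery(question)
            return {"question": question, "response": str(response)}

    return await asyncio.gather(*(one(q) for q in questions))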
Key Benefits¶
- Zero Migration: Keep your existing LlamaIndex code unchanged
- Superior Orchestration: LionAGI handles parallel execution, dependencies, and error handling
- Multi-Source: Easily orchestrate multiple RAG sources
- Scalable: Built-in performance controls and monitoring
- Flexible: Mix RAG with other AI operations seamlessly
LlamaIndex provides the RAG capabilities, LionAGI provides the orchestration intelligence.