DSPy Integration¶
Use DSPy for prompt optimization within LionAGI workflows.
Why DSPy + LionAGI?¶
DSPy: Best-in-class prompt optimization, automatic compilation, few-shot learning LionAGI: Superior orchestration, parallel execution, multi-agent coordination
Together: Optimized prompts with intelligent orchestration.
Basic Integration¶
Use your existing DSPy signatures as custom operations:
from lionagi import Branch, Builder, Session
import dspy
# Your existing DSPy setup - no changes needed!
class AnalyzeMarket(dspy.Signature):
"""Analyze market conditions and provide insights."""
market_data = dspy.InputField(desc="Raw market data")
analysis = dspy.OutputField(desc="Market analysis with key insights")
class AssessRisk(dspy.Signature):
"""Assess risks based on market analysis."""
analysis = dspy.InputField(desc="Market analysis")
risk_assessment = dspy.OutputField(desc="Risk assessment with recommendations")
# Your DSPy modules - unchanged!
market_analyzer = dspy.ChainOfThought(AnalyzeMarket)
risk_assessor = dspy.ChainOfThought(AssessRisk)
# Wrap DSPy modules as LionAGI custom operations
async def dspy_market_analysis(branch: Branch, market_data: str, **kwargs):
"""Custom operation using optimized DSPy prompt"""
# Your existing DSPy logic - no changes
result = market_analyzer(market_data=market_data)
# Return to LionAGI workflow
return result.analysis
async def dspy_risk_assessment(branch: Branch, analysis: str, **kwargs):
"""Custom operation using optimized DSPy prompt"""
# Your existing DSPy logic - no changes
result = risk_assessor(analysis=analysis)
# Return to LionAGI workflow
return result.risk_assessment
# Orchestrate DSPy modules with LionAGI
session = Session()
builder = Builder("dspy_workflow")
# Phase 1: DSPy market analysis
analysis_op = builder.add_operation(
operation=dspy_market_analysis,
market_data="Q3 earnings data, market trends, competitor analysis..."
)
# Phase 2: DSPy risk assessment (depends on analysis)
risk_op = builder.add_operation(
operation=dspy_risk_assessment,
analysis="{analysis_op}", # Use result from previous operation
depends_on=[analysis_op]
)
result = await session.flow(builder.get_graph())
Multi-Signature Workflow¶
Orchestrate multiple DSPy signatures in parallel:
import dspy
from lionagi import Session, Builder
# Your existing DSPy signatures - unchanged!
class TechnicalAnalysis(dspy.Signature):
data = dspy.InputField()
technical_insights = dspy.OutputField()
class FundamentalAnalysis(dspy.Signature):
data = dspy.InputField()
fundamental_insights = dspy.OutputField()
class SentimentAnalysis(dspy.Signature):
data = dspy.InputField()
sentiment_insights = dspy.OutputField()
# Your optimized DSPy modules
tech_analyzer = dspy.ChainOfThought(TechnicalAnalysis)
fundamental_analyzer = dspy.ChainOfThought(FundamentalAnalysis)
sentiment_analyzer = dspy.ChainOfThought(SentimentAnalysis)
# Wrap each as custom operations
async def tech_analysis_op(branch: Branch, data: str, **kwargs):
result = tech_analyzer(data=data)
return result.technical_insights
async def fundamental_analysis_op(branch: Branch, data: str, **kwargs):
result = fundamental_analyzer(data=data)
return result.fundamental_insights
async def sentiment_analysis_op(branch: Branch, data: str, **kwargs):
result = sentiment_analyzer(data=data)
return result.sentiment_insights
# Orchestrate all analyses in parallel
session = Session()
builder = Builder("multi_analysis")
market_data = "AAPL Q3 earnings, market conditions, news sentiment..."
# Parallel execution of optimized DSPy modules
tech_op = builder.add_operation(operation=tech_analysis_op, data=market_data)
fundamental_op = builder.add_operation(operation=fundamental_analysis_op, data=market_data)
sentiment_op = builder.add_operation(operation=sentiment_analysis_op, data=market_data)
# Synthesize all optimized insights
synthesis_op = builder.add_aggregation(
"communicate",
source_node_ids=[tech_op, fundamental_op, sentiment_op],
instruction="Combine technical, fundamental, and sentiment analysis into investment recommendation"
)
result = await session.flow(builder.get_graph())
DSPy Optimization in LionAGI¶
Show how to optimize DSPy modules within LionAGI workflows:
import dspy
from dspy.datasets import HotPotQA
from lionagi import Session, Builder
# Your DSPy module to optimize
class GenerateAnswer(dspy.Signature):
"""Answer questions based on context."""
context = dspy.InputField(desc="Background context")
question = dspy.InputField(desc="Question to answer")
answer = dspy.OutputField(desc="Comprehensive answer")
class RAGPipeline(dspy.Module):
def __init__(self, num_passages=3):
super().__init__()
self.retrieve = dspy.Retrieve(k=num_passages)
self.generate_answer = dspy.ChainOfThought(GenerateAnswer)
def forward(self, question):
context = self.retrieve(question).passages
prediction = self.generate_answer(context=context, question=question)
return dspy.Prediction(context=context, answer=prediction.answer)
# Optimization within LionAGI workflow
async def optimize_dspy_pipeline(branch: Branch, **kwargs):
"""Custom operation that optimizes DSPy pipeline"""
# Your existing DSPy optimization - unchanged!
trainset = [{'question': q, 'answer': a} for q, a in training_data]
# Compile/optimize the pipeline
teleprompter = dspy.BootstrapFewShot(metric=answer_correctness)
compiled_rag = teleprompter.compile(RAGPipeline(), trainset=trainset)
# Save optimized pipeline
compiled_rag.save("optimized_rag.json")
return "Pipeline optimization complete"
async def use_optimized_pipeline(branch: Branch, question: str, **kwargs):
"""Custom operation using optimized DSPy pipeline"""
# Load your optimized pipeline
optimized_rag = RAGPipeline()
optimized_rag.load("optimized_rag.json")
# Use optimized pipeline
result = optimized_rag(question)
return result.answer
# Orchestrate optimization and usage
session = Session()
builder = Builder("dspy_optimization")
# Phase 1: Optimize DSPy pipeline
optimize_op = builder.add_operation(
operation=optimize_dspy_pipeline
)
# Phase 2: Use optimized pipeline (depends on optimization)
questions = ["What is quantum computing?", "How does AI work?", "Explain blockchain"]
for i, question in enumerate(questions):
builder.add_operation(
operation=use_optimized_pipeline,
question=question,
depends_on=[optimize_op]
)
result = await session.flow(builder.get_graph())
Hybrid DSPy + LionAGI Agents¶
Combine DSPy optimization with LionAGI's multi-agent capabilities:
# DSPy signatures for different agent types
class ResearchSignature(dspy.Signature):
topic = dspy.InputField()
research_findings = dspy.OutputField()
class AnalysisSignature(dspy.Signature):
findings = dspy.InputField()
analysis = dspy.OutputField()
class CritiqueSignature(dspy.Signature):
analysis = dspy.InputField()
critique = dspy.OutputField()
# Optimized DSPy modules
research_module = dspy.ChainOfThought(ResearchSignature)
analysis_module = dspy.ChainOfThought(AnalysisSignature)
critique_module = dspy.ChainOfThought(CritiqueSignature)
# LionAGI branches using optimized DSPy prompts
class DSPyBranch(Branch):
def __init__(self, dspy_module, **kwargs):
super().__init__(**kwargs)
self.dspy_module = dspy_module
async def dspy_execute(self, **inputs):
"""Execute DSPy module and return to LionAGI context"""
result = self.dspy_module(**inputs)
# Return as standard LionAGI communication
return str(result)
# Create specialized branches with optimized prompts
session = Session()
builder = Builder("hybrid_workflow")
researcher = DSPyBranch(research_module, system="Research specialist with optimized prompts")
analyst = DSPyBranch(analysis_module, system="Analysis specialist with optimized prompts")
critic = DSPyBranch(critique_module, system="Critique specialist with optimized prompts")
session.include_branches([researcher, analyst, critic])
# Workflow using optimized DSPy prompts in LionAGI orchestration
topic = "Impact of AI on healthcare"
research_op = builder.add_operation(
"dspy_execute",
branch=researcher,
topic=topic
)
analysis_op = builder.add_operation(
"dspy_execute",
branch=analyst,
findings="{research_op}",
depends_on=[research_op]
)
critique_op = builder.add_operation(
"dspy_execute",
branch=critic,
analysis="{analysis_op}",
depends_on=[analysis_op]
)
result = await session.flow(builder.get_graph())
Performance with DSPy¶
Optimize both prompts and orchestration:
import asyncio
from lionagi import Session, Builder
# Batch processing with optimized DSPy prompts
async def batch_dspy_analysis(branch: Branch, questions: list, **kwargs):
"""Process multiple questions with optimized DSPy in parallel"""
async def single_analysis(question):
# Your optimized DSPy module - unchanged
result = optimized_analyzer(question=question)
return result.analysis
# Parallel execution within the operation
results = await asyncio.gather(*[
single_analysis(q) for q in questions
])
return results
# High-throughput DSPy processing
questions = [f"Question {i}" for i in range(100)]
batch_size = 10
builder = Builder("high_throughput_dspy")
for i in range(0, len(questions), batch_size):
batch = questions[i:i + batch_size]
builder.add_operation(
operation=batch_dspy_analysis,
questions=batch
)
# Execute with controlled concurrency
result = await session.flow(
builder.get_graph(),
max_concurrent=5 # Control resource usage
)
A/B Testing DSPy Models¶
Compare different DSPy optimizations:
# Different DSPy optimizations to compare
async def dspy_model_a(branch: Branch, input_data: str, **kwargs):
# Your Model A optimization
return model_a_result
async def dspy_model_b(branch: Branch, input_data: str, **kwargs):
# Your Model B optimization
return model_b_result
# A/B test different optimizations in parallel
builder = Builder("dspy_ab_test")
test_data = "Sample input for comparison"
model_a_op = builder.add_operation(operation=dspy_model_a, input_data=test_data)
model_b_op = builder.add_operation(operation=dspy_model_b, input_data=test_data)
# Compare results
comparison_op = builder.add_aggregation(
"communicate",
source_node_ids=[model_a_op, model_b_op],
instruction="Compare Model A and Model B results. Which performs better and why?"
)
result = await session.flow(builder.get_graph())
Key Benefits¶
- Zero Migration: Keep your existing DSPy code unchanged
- Superior Orchestration: LionAGI handles parallel execution, dependencies
- Optimization + Orchestration: Best of both worlds
- A/B Testing: Easy comparison of different optimizations
- Scalable: Built-in performance controls
- Hybrid Workflows: Mix DSPy with other AI operations
DSPy provides the prompt optimization, LionAGI provides the orchestration intelligence.