# Fan-Out/In Pattern
Distribute work to multiple agents in parallel, then combine their results.
## Basic Pattern
```python
from lionagi import Session, Builder, Branch, iModel

session = Session()
builder = Builder()

# Fan-out: create parallel research tasks
topics = ["market analysis", "competitor review", "customer feedback"]
research_nodes = []
for topic in topics:
    node = builder.add_operation("communicate", instruction=f"Research {topic}")
    research_nodes.append(node)

# Fan-in: synthesize all results
synthesis = builder.add_operation(
    "communicate",
    depends_on=research_nodes,
    instruction="Combine all research into comprehensive report",
)

result = await session.flow(builder.get_graph())
```
This pattern demonstrates the power of parallel processing in LionAGI. The fan-out phase creates three research operations that run concurrently, each focused on a different aspect. The fan-in phase waits for all research to complete, then synthesizes the findings into a comprehensive report. Because the research tasks overlap in time, total latency approaches that of the slowest single task — here roughly three times faster than running the same work sequentially — while providing more thorough coverage than any single analysis.
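The concurrency at the heart of fan-out/in can be sketched with plain `asyncio`, independent of LionAGI. Here `research` is a hypothetical stand-in for any awaitable agent call:

```python
import asyncio

async def research(topic: str) -> str:
    # Stand-in for an agent/model call; replace with real I/O-bound work.
    await asyncio.sleep(0)
    return f"findings on {topic}"

async def fan_out_in(topics: list[str]) -> str:
    # Fan-out: launch one task per topic; gather runs them concurrently.
    findings = await asyncio.gather(*(research(t) for t in topics))
    # Fan-in: a single synthesis step consumes every result.
    return " | ".join(findings)

topics = ["market analysis", "competitor review", "customer feedback"]
report = asyncio.run(fan_out_in(topics))
```

`asyncio.gather` preserves input order, so the synthesis step sees results in the same order the tasks were created — the same property the dependency graph gives you in LionAGI.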
## Production Implementation with Cost Tracking
```python
import asyncio

from lionagi import Branch, iModel, Session, Builder
from lionagi.fields import LIST_INSTRUCT_FIELD_MODEL, Instruct
from lionagi.models import AssistantResponse


async def production_fan_out_in():
    """Production version with error handling and cost tracking."""
    try:
        orc_branch = Branch(
            chat_model=iModel(provider="openai", model="gpt-4o-mini"),
            name="orchestrator",
        )
        session = Session(default_branch=orc_branch)
        builder = Builder("ProductionFanOut")

        # Initial decomposition
        root = builder.add_operation(
            "operate",
            instruct=Instruct(
                instruction="Break down the analysis task into parallel components",
                context="market_analysis",
            ),
            reason=True,
            field_models=[LIST_INSTRUCT_FIELD_MODEL],
        )
        result = await session.flow(builder.get_graph())
        instruct_models = result["operation_results"][root].instruct_models

        # Create research nodes
        research_nodes = []
        for instruction in instruct_models:
            node = builder.add_operation(
                "communicate",
                depends_on=[root],
                chat_model=iModel(provider="openai", model="gpt-4o-mini"),
                **instruction.to_dict(),
            )
            research_nodes.append(node)

        # Execute research with cost tracking
        costs = 0

        def get_context(node_id):
            nonlocal costs
            graph = builder.get_graph()
            node = graph.internal_nodes[node_id]
            branch = session.get_branch(node.branch_id, None)
            if (branch and len(branch.messages) > 0 and
                    isinstance(msg := branch.messages[-1], AssistantResponse)):
                costs += msg.model_response.get("total_cost_usd") or 0
                return f"""
Response: {msg.model_response.get("result") or "Not available"}
Summary: {msg.model_response.get("summary") or "Not available"}
""".strip()

        await session.flow(builder.get_graph())
        ctx = [get_context(i) for i in research_nodes]

        # Synthesis
        synthesis = builder.add_operation(
            "communicate",
            depends_on=research_nodes,
            branch=orc_branch,
            instruction="Synthesize researcher findings",
            context=[i for i in ctx if i is not None],
        )
        final_result = await session.flow(builder.get_graph())
        result_synthesis = final_result["operation_results"][synthesis]

        # Optional: visualize the execution graph
        builder.visualize("Fan-out/in execution pattern")
        print(f"Analysis complete. Total cost: ${costs:.4f}")
        return result_synthesis
    except Exception as e:
        print(f"Fan-out-in failed: {e}")
        import traceback
        traceback.print_exc()
        return None


asyncio.run(production_fan_out_in())
```
## When to Use
!!! success "Perfect For"
    - **Complex research**: Multiple perspectives on the same topic
    - **Code reviews**: Security, performance, style analysis in parallel
    - **Market analysis**: Different domain experts working simultaneously
    - **Large datasets**: Parallel investigation and analysis

!!! tip "Pattern Indicators"
    Use fan-out/in when:

    - The problem benefits from simultaneous analysis
    - Individual tasks can run independently
    - The final answer requires synthesis of perspectives
    - Time constraints favor parallel over sequential execution
## Key Implementation Notes
### Context Extraction Pattern
```python
def get_context(node_id):
    graph = builder.get_graph()
    node = graph.internal_nodes[node_id]
    branch = session.get_branch(node.branch_id, None)
    if (branch and len(branch.messages) > 0 and
            isinstance(msg := branch.messages[-1], AssistantResponse)):
        return msg.model_response.get("result") or "Not available"
```
### Cost Tracking Pattern

```python
costs = 0

def track_costs(msg):
    nonlocal costs  # requires an enclosing function scope, as in the full example above
    costs += msg.model_response.get("total_cost_usd") or 0
```
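Since `nonlocal` only works inside an enclosing function, the tracker has to live next to the state it mutates. A self-contained sketch of the same closure pattern, with a plain dict standing in for `AssistantResponse.model_response`:

```python
def make_cost_tracker():
    costs = 0.0

    def track(model_response: dict) -> float:
        nonlocal costs
        # Missing or None cost entries count as zero, mirroring `or 0` above.
        costs += model_response.get("total_cost_usd") or 0
        return costs

    return track

track = make_cost_tracker()
track({"total_cost_usd": 0.0012})
track({})  # no cost reported -> total unchanged
total = track({"total_cost_usd": 0.0030})
```

Returning the running total from each call makes the tracker easy to log or assert on without exposing the mutable variable itself.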
### Error Handling

```python
try:
    result = await session.flow(builder.get_graph())
except Exception as e:
    print(f"Execution failed: {e}")
    import traceback
    traceback.print_exc()
    return None
```
## Performance Characteristics

**Expected Performance**

- **Speed**: roughly 3-4x faster than sequential execution for complex analysis
- **Quality**: richer insights through diverse perspectives
- **Success rate**: ~95% completion rate
- **Scale**: works best with 3-5 parallel researchers
**Cost Considerations**

Cost scales proportionally with the number of parallel researchers; balance thoroughness against expense for your use case.
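The trade-off can be made concrete with a back-of-the-envelope model (illustrative numbers, not benchmarks): with `n` researchers each taking about `t` seconds and costing about `c` dollars, sequential wall time is roughly `n * t`, parallel wall time is roughly `t` plus fixed decomposition/synthesis overhead, while spend is about `n * c` either way:

```python
def fan_out_estimates(n_researchers: int, task_secs: float,
                      cost_per_task: float, overhead_secs: float = 0.0):
    # Sequential: tasks run one after another.
    sequential_secs = n_researchers * task_secs
    # Parallel: tasks overlap; only decomposition/synthesis adds latency.
    parallel_secs = task_secs + overhead_secs
    # Parallelism does not reduce spend: every researcher still runs.
    spend = n_researchers * cost_per_task
    return sequential_secs / parallel_secs, spend

# 3 researchers, ~20s and ~$0.002 each, ~5s of orchestration overhead:
speedup, spend = fan_out_estimates(3, task_secs=20.0,
                                   cost_per_task=0.002, overhead_secs=5.0)
```

Under these assumed numbers, 60s of sequential work collapses to about 25s (a 2.4x speedup) while spend stays at 3x a single task — which is why the sweet spot sits at a handful of researchers rather than dozens.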
Fan-out/in delivers comprehensive analysis through parallel specialization, making complex investigations both faster and more thorough.