Skip to content

Fan-Out/In Pattern

Distribute work to multiple agents in parallel, then combine their results.

Basic Pattern

from lionagi import Session, Builder, Branch, iModel

session = Session()
builder = Builder()

# Fan-out: Create parallel research tasks
topics = ["market analysis", "competitor review", "customer feedback"]
research_nodes = []

for topic in topics:
    node = builder.add_operation("communicate", instruction=f"Research {topic}")
    research_nodes.append(node)

# Fan-in: Synthesize all results
synthesis = builder.add_operation(
    "communicate",
    depends_on=research_nodes,
    instruction="Combine all research into comprehensive report"
)

result = await session.flow(builder.get_graph())

This pattern demonstrates the power of parallel processing in LionAGI. The fan-out phase creates three research operations that run simultaneously, each focusing on a different aspect. The fan-in phase waits for all research to complete, then synthesizes the findings into a comprehensive report. This approach is 3x faster than sequential processing while providing more thorough coverage than any single analysis.

Production Implementation with Cost Tracking

from lionagi import Branch, iModel, Session, Builder
from lionagi.fields import LIST_INSTRUCT_FIELD_MODEL, Instruct
from lionagi.models import AssistantResponse

async def production_fan_out_in():
    """Production version with error handling and cost tracking"""

    try:
        orc_branch = Branch(
            chat_model=iModel(provider="openai", model="gpt-4o-mini"),
            name="orchestrator"
        )
        session = Session(default_branch=orc_branch)
        builder = Builder("ProductionFanOut")

        # Initial decomposition
        root = builder.add_operation(
            "operate",
            instruct=Instruct(
                instruction="Break down the analysis task into parallel components",
                context="market_analysis"
            ),
            reason=True,
            field_models=[LIST_INSTRUCT_FIELD_MODEL]
        )

        result = await session.flow(builder.get_graph())
        instruct_models = result["operation_results"][root].instruct_models

        # Create research nodes
        research_nodes = []
        for instruction in instruct_models:
            node = builder.add_operation(
                "communicate",
                depends_on=[root],
                chat_model=iModel(provider="openai", model="gpt-4o-mini"),
                **instruction.to_dict()
            )
            research_nodes.append(node)

        # Execute research with cost tracking
        costs = 0

        def get_context(node_id):
            nonlocal costs
            graph = builder.get_graph()
            node = graph.internal_nodes[node_id]
            branch = session.get_branch(node.branch_id, None)
            if (branch and len(branch.messages) > 0 and 
                isinstance(msg := branch.messages[-1], AssistantResponse)):
                costs += msg.model_response.get("total_cost_usd") or 0
                return f"""
Response: {msg.model_response.get("result") or "Not available"}
Summary: {msg.model_response.get("summary") or "Not available"}
                """.strip()

        await session.flow(builder.get_graph())
        ctx = [get_context(i) for i in research_nodes]

        # Synthesis
        synthesis = builder.add_operation(
            "communicate",
            depends_on=research_nodes,
            branch=orc_branch,
            instruction="Synthesize researcher findings",
            context=[i for i in ctx if i is not None]
        )

        final_result = await session.flow(builder.get_graph())
        result_synthesis = final_result["operation_results"][synthesis]

        # Optional: Visualize execution graph
        builder.visualize("Fan-out/in execution pattern")

        print(f"Analysis complete. Total cost: ${costs:.4f}")
        return result_synthesis

    except Exception as e:
        print(f"Fan-out-in failed: {e}")
        import traceback
        traceback.print_exc()
        return None

asyncio.run(production_fan_out_in())
````

## When to Use

!!! success "Perfect For"
    - **Complex research**: Multiple perspectives on the same topic
    - **Code reviews**: Security, performance, style analysis in parallel
    - **Market analysis**: Different domain experts working simultaneously  
    - **Large datasets**: Parallel investigation and analysis

!!! tip "Pattern Indicators"
    Use fan-out/in when:

    - Problem benefits from simultaneous analysis
    - Individual tasks can run independently  
    - Final answer requires synthesis of perspectives
    - Time constraints favor parallel over sequential execution

## Execution Flow
[Orchestrator] ↓ (decompose task) [Task Breakdown] ↓ (fan-out) ┌─[Researcher 1]─┐ ├─[Researcher 2]─┤ → (parallel execution) ├─[Researcher 3]─┤ └─[Researcher 4]─┘ ↓ (fan-in) [Synthesis] ↓ [Final Result]
## Key Implementation Notes

### Context Extraction Pattern

```python
def get_context(node_id):
    graph = builder.get_graph()
    node = graph.internal_nodes[node_id]
    branch = session.get_branch(node.branch_id, None)
    if (branch and len(branch.messages) > 0 and 
        isinstance(msg := branch.messages[-1], AssistantResponse)):
        return msg.model_response.get("result") or "Not available"

Cost Tracking Pattern

costs = 0
def track_costs(msg):
    nonlocal costs
    costs += msg.model_response.get("total_cost_usd") or 0

Error Handling

try:
    result = await session.flow(builder.get_graph())
except Exception as e:
    print(f"Execution failed: {e}")
    import traceback
    traceback.print_exc()
    return None

Performance Characteristics

Expected Performance

  • Speed: 3-4x faster than sequential for complex analysis
  • Quality: Higher insights through diverse perspectives
  • Success Rate: ~95% completion rate
  • Scale: Optimal with 3-5 parallel researchers

Cost Considerations

Cost scales proportionally with number of parallel researchers. Balance thoroughness vs. expense based on your use case.

Fan-out/in delivers comprehensive analysis through parallel specialization, making complex investigations both faster and more thorough.