Skip to content

Research Synthesis

Parallel research with expert synthesis - the natural pattern for comprehensive analysis.

Basic Research Synthesis

from lionagi import Branch, Builder, Session, iModel
from lionagi.fields import LIST_INSTRUCT_FIELD_MODEL, Instruct
from lionagi.protocols.types import AssistantResponse

# Setup orchestrator
orchestrator = Branch(
    chat_model=iModel(provider="openai", model="gpt-4o-mini"),
    system="Break research into parallel assignments and synthesize findings"
)
session = Session(default_branch=orchestrator)
builder = Builder("ResearchSynthesis")

topic = "AI safety in production systems"

# Decomposition phase
root = builder.add_operation(
    "operate",
    instruct=Instruct(
        instruction=f"Create 3-4 research assignments for: {topic}",
        context=topic
    ),
    reason=True,
    field_models=[LIST_INSTRUCT_FIELD_MODEL]
)

# Execute decomposition
result = await session.flow(builder.get_graph())
instruct_models = result["operation_results"][root].instruct_models

# Fan-out: Create researchers
research_nodes = []
for i, instruction in enumerate(instruct_models):
    researcher = Branch(
        chat_model=iModel(provider="openai", model="gpt-4o-mini"),
        system=f"Research specialist #{i+1} - focused domain expert"
    )

    node = builder.add_operation(
        "communicate",
        depends_on=[root],
        branch=researcher,
        **instruction.to_dict()
    )
    research_nodes.append(node)

# Execute research
await session.flow(builder.get_graph())

# Extract findings with cost tracking
costs = 0


def get_context(node_id):
    """Summarize the last assistant reply recorded on a node's branch.

    Looks the node up in the builder's graph, fetches its branch from the
    session, and inspects the final message on that branch. When that
    message is an ``AssistantResponse``, its ``total_cost_usd`` (treated as
    0 when missing or falsy) is added to the module-level ``costs`` counter.

    Args:
        node_id: Key into ``graph.internal_nodes`` for the operation node.

    Returns:
        A two-line "Response / Summary" string built from the reply's
        ``model_response``, or ``None`` when the branch is missing, has no
        messages, or does not end with an ``AssistantResponse``.
    """
    global costs
    operation = builder.get_graph().internal_nodes[node_id]
    branch = session.get_branch(operation.branch_id, None)

    # Guard clauses: nothing to extract without a branch that has messages
    if not branch or len(branch.messages) == 0:
        return None
    last_message = branch.messages[-1]
    if not isinstance(last_message, AssistantResponse):
        return None

    costs += last_message.model_response.get("total_cost_usd") or 0
    return f"""
Response: {last_message.model_response.get("result") or "Not available"}
Summary: {last_message.model_response.get("summary") or "Not available"}
    """.strip()

ctx = [get_context(i) for i in research_nodes]

# Fan-in: Synthesize
synthesis = builder.add_operation(
    "communicate",
    depends_on=research_nodes,
    branch=orchestrator,
    instruction="Synthesize research findings into comprehensive analysis",
    context=[i for i in ctx if i is not None]
)

final_result = await session.flow(builder.get_graph())
print(f"Research complete. Total cost: ${costs:.4f}")

Literature Review

# Literature review orchestrator
orchestrator = Branch(
    chat_model=iModel(provider="openai", model="gpt-4o-mini"),
    system="Literature review coordinator and synthesizer"
)

papers = ["Attention is All You Need", "BERT", "GPT-3"]
focus = "transformer architecture evolution"

# Generate review plan
planning = builder.add_operation(
    "operate",
    branch=orchestrator,
    instruct=Instruct(
        instruction=f"Create analysis framework for: {focus}",
        context={"papers": papers, "focus": focus}
    ),
    field_models=[LIST_INSTRUCT_FIELD_MODEL]
)

result = await session.flow(builder.get_graph())
review_tasks = result["operation_results"][planning].instruct_models

# Parallel paper analysis
review_nodes = []
for task in review_tasks:
    reviewer = Branch(
        chat_model=iModel(provider="openai", model="gpt-4o-mini"),
        system="Academic paper analysis specialist"
    )

    node = builder.add_operation(
        "communicate",
        depends_on=[planning],
        branch=reviewer,
        **task.to_dict()
    )
    review_nodes.append(node)

await session.flow(builder.get_graph())

# Synthesis
synthesis = builder.add_operation(
    "communicate",
    depends_on=review_nodes,
    branch=orchestrator,
    instruction="Create comprehensive literature review synthesis"
)

final_result = await session.flow(builder.get_graph())

Cost-Efficient Research

# Budget-aware research: running spend (updated by track_costs) and its ceiling,
# both compared as USD amounts in the budget check below
max_cost = 1.0
total_cost = 0

# Branch that performs the final, budget-gated synthesis
coordinator = Branch(
    system="Efficient research coordinator",
    chat_model=iModel(provider="openai", model="gpt-4o-mini"),
)

# Cost accounting used by the budget check before synthesis
def track_costs(node_id):
    """Add the cost of a node's final assistant reply to ``total_cost``.

    Looks the node up in the builder's graph, fetches its branch from the
    session, and, when the branch's last message is an ``AssistantResponse``,
    adds that response's ``total_cost_usd`` to the module-level
    ``total_cost`` counter.

    Args:
        node_id: Key into ``graph.internal_nodes`` for the operation node.

    Returns:
        ``True`` if a positive cost was recorded for the node, ``False``
        otherwise (no branch, no messages, last message is not an
        ``AssistantResponse``, or the reported cost is zero/absent).
    """
    global total_cost
    graph = builder.get_graph()
    node = graph.internal_nodes[node_id]
    branch = session.get_branch(node.branch_id, None)
    if (branch and len(branch.messages) > 0 and
            isinstance(msg := branch.messages[-1], AssistantResponse)):
        # `or 0` (rather than a .get default) also covers a key that is
        # present but None, which would otherwise break the addition below.
        cost = msg.model_response.get("total_cost_usd") or 0
        total_cost += cost
        return cost > 0
    return False

await session.flow(builder.get_graph())

if total_cost < max_cost:
    synthesis = builder.add_operation(
        "communicate",
        depends_on=research_nodes,
        branch=coordinator,
        instruction="Synthesize research findings efficiently"
    )
    final_result = await session.flow(builder.get_graph())
    print(f"Research completed. Cost: ${total_cost:.4f}")
else:
    print(f"Budget exceeded: ${total_cost:.4f}")

Production Error Handling

try:
    # Research execution with error handling
    final_result = await session.flow(builder.get_graph())
    print(f"Research complete. Total cost: ${costs:.4f}")

except Exception as e:
    print(f"Research failed: {e}")
    import traceback
    traceback.print_exc()

When to Use

Perfect for: Complex topics, literature reviews, market research, and technical analysis requiring domain expertise

Execution Flow: Task Decomposition → Parallel Researchers → Context Extraction → Synthesis

Key Benefits:

  • 3-4x faster than sequential research
  • Higher quality through diverse perspectives
  • Systematic evidence gathering
  • Cost-efficient parallel execution

Research synthesis leverages the fan-out/fan-in pattern for comprehensive analysis through specialized parallel research with intelligent synthesis.