Orchestration Guide¶
How to build single-step and multi-step workflows with lionagi.
Architecture Overview¶
Branch (single conversation)
|- chat_model: iModel -- LLM for conversation
|- parse_model: iModel -- LLM for structured parsing (defaults to chat_model)
|- messages: Pile[RoledMessage] -- conversation history
|- tools: dict[str, Tool] -- registered callable tools
|- logs: Pile[Log] -- activity logs
Session (multi-branch orchestrator)
|- branches: Pile[Branch] -- all managed branches
|- default_branch: Branch -- fallback branch
|- flow(graph) -> dict -- execute a DAG workflow
Builder (OperationGraphBuilder)
|- add_operation(...) -- add a node to the DAG
|- add_aggregation(...) -- add a node that collects from multiple sources
|- expand_from_result(...) -- dynamically expand based on results
|- get_graph() -> Graph -- return the graph for execution
Branch: The Primary API Surface¶
Branch is a facade over four managers (messages, actions, models, logs). All LLM operations are methods on Branch.
Method Signatures¶
# Low-level call -- does NOT add to history
await branch.chat(
instruction="...", # main prompt
guidance="...", # system-level guidance
context="...", # additional context
response_format=MyModel, # optional structured output
imodel=alt_model, # optional model override
images=[...], # optional images
**kwargs, # passed to LLM (temperature, etc.)
) -> tuple[Instruction, AssistantResponse]
# Returns (instruction_msg, response_msg) -- neither added to history
# Conversational -- DOES add to history
await branch.communicate(
instruction="...", # main prompt (positional OK)
guidance="...", # system-level guidance
context="...", # additional context
response_format=MyModel, # optional structured output
chat_model=alt_model, # optional model override
parse_model=parse_model, # optional parse model override
num_parse_retries=3, # retries for structured parsing
clear_messages=False, # clear history before this call
**kwargs,
) -> str | BaseModel | dict | None
# Tool use + structured output -- DOES add to history
await branch.operate(
instruction="...", # or instruct=Instruct(...)
guidance="...",
context="...",
response_format=MyModel, # optional structured output
actions=True, # enable tool calling
tools=[my_func], # register tools for this call
reason=True, # request chain-of-thought
invoke_actions=True, # auto-invoke requested tools
action_strategy="concurrent", # or "sequential"
**kwargs,
) -> list | BaseModel | None | dict | str
# Multi-step reasoning -- DOES add to history
await branch.ReAct(
instruct=Instruct(instruction="...", guidance="..."),
tools=[my_func], # available tools
response_format=MyModel, # final output format
extension_allowed=True, # allow multi-step expansion
max_extensions=3, # max reasoning steps (capped at 5)
verbose=False, # print intermediate steps
**kwargs,
) -> Any | tuple[Any, list] # result, or (result, analyses) if return_analysis=True
When to Use Which Method¶
Need simple LLM response, no history? -> chat()
Need conversational context? -> communicate()
Need structured output from conversation? -> communicate(response_format=Model)
Need tool calling? -> operate(actions=True)
Need structured output + tools? -> operate(response_format=Model, actions=True)
Need multi-step reasoning with tools? -> ReAct()
Need to parse existing text into a model? -> parse(text, response_format=Model)
Need to refine/rewrite a prompt? -> interpret(text)
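The decision table above can be encoded as a tiny dispatch sketch. This helper is illustrative only and not part of lionagi; it just makes the precedence explicit (multi-step beats tools beats conversation):

```python
def pick_method(history=False, structured=False, tools=False, multi_step=False):
    """Map requirements to a Branch method name, per the table above.
    Illustrative only -- NOT a lionagi API."""
    if multi_step:
        return "ReAct"        # multi-step reasoning with tools
    if tools:
        return "operate"      # tool calling (optionally + structured output)
    if history or structured:
        return "communicate"  # conversational, optional response_format
    return "chat"             # one-shot, no history

print(pick_method())                 # chat
print(pick_method(tools=True))       # operate
print(pick_method(multi_step=True))  # ReAct
```

Note that structured output alone does not require `operate()`; `communicate(response_format=Model)` covers it.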
Concurrency Patterns¶
lionagi provides structured concurrency primitives in lionagi.ln.concurrency.
gather -- Run awaitables concurrently¶
from lionagi.ln.concurrency import gather
results = await gather(
branch.communicate("Task A"),
branch.communicate("Task B"),
branch.communicate("Task C"),
return_exceptions=False, # True to collect errors instead of failing fast
)
# results: list in same order as input
race -- First to complete wins¶
from lionagi.ln.concurrency import race
result = await race(
branch.communicate("Fast approach"),
branch.communicate("Thorough approach"),
)
# Returns the first result; cancels the rest
bounded_map -- Concurrency-limited parallel map¶
from lionagi.ln.concurrency import bounded_map
tasks = ["Analyze module A", "Analyze module B", "Analyze module C"]
results = await bounded_map(
lambda task: branch.communicate(task),
tasks,
limit=2, # max 2 concurrent
)
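If you want the same semantics without lionagi's primitives, the pattern is a few lines of stdlib asyncio. A sketch (`bounded_map_stdlib` and the stand-in `work` coroutine are hypothetical names, used here instead of `branch.communicate` so the snippet runs anywhere):

```python
import asyncio

async def bounded_map_stdlib(func, items, limit):
    """Apply an async func to each item with at most `limit` coroutines
    in flight; results come back in input order (mirrors bounded_map)."""
    sem = asyncio.Semaphore(limit)

    async def run_one(item):
        async with sem:
            return await func(item)

    return await asyncio.gather(*(run_one(i) for i in items))

async def main():
    # Stand-in for branch.communicate so the sketch needs no API key
    async def work(x):
        await asyncio.sleep(0.01)
        return x * 2
    return await bounded_map_stdlib(work, [1, 2, 3, 4], limit=2)

results = asyncio.run(main())
print(results)  # [2, 4, 6, 8]
```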
retry -- Exponential backoff¶
from lionagi.ln.concurrency import retry
result = await retry(
lambda: branch.communicate("Flaky task"),
attempts=3,
base_delay=0.5,
retry_on=(ValueError,),
)
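The backoff schedule (wait `base_delay`, then 2x, 4x, ... between attempts) can be sketched with stdlib asyncio alone. `retry_stdlib` is a hypothetical helper for illustration; lionagi's `retry` is the supported API:

```python
import asyncio

async def retry_stdlib(fn, attempts=3, base_delay=0.5, retry_on=(Exception,)):
    """Call fn(); on a listed exception wait base_delay, then 2x, 4x, ...
    between attempts, re-raising after the final failure."""
    for attempt in range(attempts):
        try:
            return await fn()
        except retry_on:
            if attempt == attempts - 1:
                raise
            await asyncio.sleep(base_delay * (2 ** attempt))

calls = {"n": 0}

async def flaky():
    # Fails twice, then succeeds -- simulates a transient error
    calls["n"] += 1
    if calls["n"] < 3:
        raise ValueError("transient")
    return "ok"

result = asyncio.run(retry_stdlib(flaky, attempts=3, base_delay=0.001,
                                  retry_on=(ValueError,)))
print(result)  # ok
```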
CompletionStream -- Process results as they arrive¶
from lionagi.ln.concurrency import CompletionStream
tasks = [branch.communicate(f"Task {i}") for i in range(10)]
async with CompletionStream(tasks, limit=3) as stream:
async for idx, result in stream:
print(f"Task {idx} completed: {result[:50]}")
alcall / bcall -- Batch processing¶
from lionagi.ln import alcall, bcall
# Apply function to list with concurrency control
results = await alcall(
["item1", "item2", "item3"],
lambda x: branch.communicate(f"Process {x}"),
max_concurrent=2, # max 2 concurrent calls
num_retries=3, # retry each failed call up to 3 times
)
# Batch processing with yields per batch
async for batch_results in bcall(
large_list,
process_fn,
batch_size=10,
):
handle(batch_results)
Session + Builder: DAG Workflows¶
For workflows with dependencies between operations, use Session + Builder.
Basic Sequential Pipeline¶
from lionagi import Branch, Session, Builder, iModel
session = Session()
builder = Builder("pipeline")
branch = Branch(chat_model=iModel(provider="openai", model="gpt-4.1-mini"))
session.include_branches([branch])
step1 = builder.add_operation(
"communicate", branch=branch,
instruction="Research the topic",
)
step2 = builder.add_operation(
"communicate", branch=branch,
instruction="Analyze findings",
depends_on=[step1],
)
step3 = builder.add_operation(
"communicate", branch=branch,
instruction="Write report",
depends_on=[step2],
)
result = await session.flow(builder.get_graph())
# result["operation_results"][step1] -- result from step 1
# result["operation_results"][step2] -- result from step 2
# result["completed_operations"] -- list of completed node IDs
# result["skipped_operations"] -- list of skipped node IDs
Fan-Out / Fan-In¶
builder = Builder("fan_out")
# Independent analyses run in parallel
security = builder.add_operation(
"communicate", branch=sec_branch,
instruction="Security analysis",
)
perf = builder.add_operation(
"communicate", branch=perf_branch,
instruction="Performance analysis",
)
style = builder.add_operation(
"communicate", branch=style_branch,
instruction="Code style analysis",
)
# Aggregation collects all results
synthesis = builder.add_aggregation(
"communicate", branch=main_branch,
source_node_ids=[security, perf, style],
instruction="Synthesize all analyses into a report",
)
result = await session.flow(builder.get_graph(), max_concurrent=3)
Dynamic Expansion¶
from lionagi.operations.fields import LIST_INSTRUCT_FIELD_MODEL
# Step 1: Generate sub-tasks
root = builder.add_operation(
"operate", branch=branch,
instruction="Generate 5 research questions",
field_models=[LIST_INSTRUCT_FIELD_MODEL],
)
result = await session.flow(builder.get_graph())
sub_tasks = result["operation_results"][root].instruct_models
# Step 2: Expand graph dynamically
new_ids = builder.expand_from_result(
sub_tasks,
source_node_id=root,
operation="communicate",
)
# Step 3: Execute expanded graph
final = await session.flow(builder.get_graph(), max_concurrent=5)
Builder API Reference¶
builder = Builder(name="workflow_name")
# Add a single operation
node_id = builder.add_operation(
operation, # "communicate" | "operate" | "chat" | "ReAct"
node_id=None, # optional reference ID
depends_on=None, # list of node IDs this depends on
inherit_context=False, # inherit conversation from dependency
branch=None, # Branch to execute on
**parameters, # passed to the Branch method
)
# Add aggregation node
node_id = builder.add_aggregation(
operation,
source_node_ids=None, # defaults to current head nodes
inherit_context=False,
branch=None,
**parameters,
)
# Expand based on results
new_ids = builder.expand_from_result(
items, # list of items (e.g., Instruct models)
source_node_id, # parent node ID
operation, # operation for each item
strategy=ExpansionStrategy.CONCURRENT,
**shared_params,
)
# Get the graph for execution
graph = builder.get_graph()
# Inspect state
state = builder.visualize_state()
# {"total_nodes": N, "executed_nodes": M, "current_heads": [...]}
Session.flow() Parameters¶
result = await session.flow(
graph, # Graph from builder.get_graph()
context=None, # dict of initial context
parallel=True, # enable parallel execution
max_concurrent=5, # max concurrent operations
verbose=False, # enable logging
default_branch=None, # override default branch
)
Error Handling¶
Branch-Level¶
try:
result = await branch.communicate("Task")
except ValueError as e:
# API call failure, timeout, or parsing error
print(f"Failed: {e}")
Workflow-Level¶
result = await session.flow(builder.get_graph())
# Check for skipped operations
if result.get("skipped_operations"):
for skipped_id in result["skipped_operations"]:
print(f"Skipped: {skipped_id}")
# Check specific operation results
for node_id, op_result in result.get("operation_results", {}).items():
if op_result is None:
print(f"Operation {node_id} returned None")
Structured Output Validation¶
# communicate() with response_format handles parse retries internally
result = await branch.communicate(
"Extract entities",
response_format=EntityList,
num_parse_retries=3, # retry parsing up to 3 times
)
# result is EntityList or None if all retries fail
# operate() with handle_validation control
result = await branch.operate(
instruction="Extract entities",
response_format=EntityList,
handle_validation="return_none", # "raise" | "return_value" | "return_none"
)
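`EntityList` above stands for any Pydantic model you define. A minimal sketch with hypothetical field names:

```python
from pydantic import BaseModel

class Entity(BaseModel):
    name: str
    kind: str  # e.g. "person", "org", "place" -- illustrative only

class EntityList(BaseModel):
    entities: list[Entity]

# The model doubles as a validator for whatever the parse step returns
sample = EntityList(entities=[Entity(name="Ada Lovelace", kind="person")])
print(sample.entities[0].name)  # Ada Lovelace
```

The field names and descriptions you choose become part of the schema the LLM is asked to satisfy, so keep them descriptive.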
Registering Tools¶
# Function tools -- auto-generates schema from signature
def multiply(a: int, b: int) -> int:
"""Multiply two numbers."""
return a * b
async def fetch_data(url: str) -> str:
"""Fetch data from URL."""
...
branch.register_tools([multiply, fetch_data])
# Then use with operate()
result = await branch.operate(
instruction="What is 6 times 7?",
actions=True,
)
System Messages¶
# Set at construction
branch = Branch(system="You are a code reviewer.")
# With datetime
branch = Branch(
system="You are a code reviewer.",
system_datetime=True, # includes current timestamp
)
# With the built-in Lion system message
branch = Branch(
system="Additional instructions here",
use_lion_system_message=True,
)