Performance Optimization

Making LionAGI workflows fast and efficient.

Parallel Execution

Execute multiple branches simultaneously:

from lionagi import Session, Branch, iModel
import asyncio

session = Session()

# Multiple branches for parallel processing
researcher = Branch(
    chat_model=iModel(provider="openai", model="gpt-4"),
    system="Research specialist"
)
analyst = Branch(
    chat_model=iModel(provider="openai", model="gpt-4"),
    system="Data analyst"
)

session.include_branches([researcher, analyst])

# Execute in parallel
results = await asyncio.gather(
    researcher.communicate("Research market trends"),
    analyst.communicate("Analyze competitive landscape")
)
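
The top-level await above runs as-is in a notebook or async REPL. In a plain Python script, wrap the calls in an async entry point:

async def main():
    return await asyncio.gather(
        researcher.communicate("Research market trends"),
        analyst.communicate("Analyze competitive landscape")
    )

results = asyncio.run(main())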

Concurrency Control

Control parallel execution with max_concurrent:

from lionagi import Session, Builder, Branch, iModel

session = Session()
builder = Builder("performance_test")

branch = Branch(chat_model=iModel(provider="openai", model="gpt-4"))
session.include_branches([branch])

# Create multiple operations
topics = ["AI trends", "market analysis", "competition"]
for topic in topics:
    builder.add_operation(
        "communicate",
        branch=branch,
        instruction=f"Brief analysis of {topic}"
    )

# Execute with controlled concurrency
result = await session.flow(
    builder.get_graph(),
    max_concurrent=2  # Only 2 operations at once
)
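
With max_concurrent=2, the three operations above run at most two at a time; the third starts as soon as a slot frees. Raising the limit increases parallelism but also the burst of simultaneous API calls, so size it against your provider's rate limits.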

Token Efficiency

Match model capability to task complexity, using a light model for routine steps and a stronger one for deep analysis:

from lionagi import Branch, iModel

# Light model for simple tasks
classifier = Branch(
    chat_model=iModel(provider="openai", model="gpt-4o-mini"),
    system="Classify content briefly"
)

# Powerful model for complex analysis
analyzer = Branch(
    chat_model=iModel(provider="anthropic", model="claude-3-5-sonnet-20240620"),
    system="Provide detailed analysis"
)

content = "Sample content to process"

# Step 1: Quick classification
category = await classifier.communicate(f"Category (simple/complex): {content}")

# Step 2: Use appropriate model based on complexity
if "complex" in category.lower():
    analysis = await analyzer.communicate(f"Detailed analysis: {content}")
else:
    analysis = await classifier.communicate(f"Brief analysis: {content}")
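
This classify-then-route step generalizes to many items. A minimal sketch reusing the classifier and analyzer branches above; route_item is a hypothetical helper name, not part of the LionAGI API:

import asyncio

async def route_item(content: str):
    # Cheap pass: let the light model pick the path
    category = await classifier.communicate(f"Category (simple/complex): {content}")
    if "complex" in category.lower():
        # Expensive pass only when the content warrants it
        return await analyzer.communicate(f"Detailed analysis: {content}")
    return await classifier.communicate(f"Brief analysis: {content}")

documents = ["doc A", "doc B", "doc C"]
analyses = await asyncio.gather(*[route_item(d) for d in documents])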

Batch Processing

Process multiple items efficiently:

import asyncio
from lionagi import Branch, iModel

branch = Branch(chat_model=iModel(provider="openai", model="gpt-4"))

items = [f"item_{i}" for i in range(20)]
batch_size = 5
results = []

for i in range(0, len(items), batch_size):
    batch = items[i:i + batch_size]

    # Process batch in parallel
    batch_results = await asyncio.gather(*[
        branch.communicate(f"Process: {item}")
        for item in batch
    ])

    results.extend(batch_results)
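
By default, a single failed call makes the whole gather raise, losing the batch. A variant of the loop above that keeps partial results, using asyncio.gather's return_exceptions flag:

for i in range(0, len(items), batch_size):
    batch = items[i:i + batch_size]

    batch_results = await asyncio.gather(
        *[branch.communicate(f"Process: {item}") for item in batch],
        return_exceptions=True  # failures come back as exception objects
    )

    # Keep successes; report failures for a later retry
    for item, res in zip(batch, batch_results):
        if isinstance(res, Exception):
            print(f"Failed on {item}: {res}")
        else:
            results.append(res)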

Memory Management

Clear message history in long-running workflows to keep memory bounded:

from lionagi import Branch, iModel

branch = Branch(chat_model=iModel(provider="openai", model="gpt-4"))

large_dataset = [f"document_{i}" for i in range(100)]
chunk_size = 10
results = []

for i in range(0, len(large_dataset), chunk_size):
    chunk = large_dataset[i:i + chunk_size]

    # Process chunk
    chunk_result = await branch.communicate(
        f"Summarize these {len(chunk)} documents: {chunk}"
    )
    results.append(chunk_result)

    # Clear message history to free memory
    branch.messages.clear()
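
Clearing history discards everything the branch has seen, including earlier chunk summaries. If later chunks need some continuity, a minimal sketch is to fold the previous result back into the next instruction instead of keeping full history:

carry = ""
for i in range(0, len(large_dataset), chunk_size):
    chunk = large_dataset[i:i + chunk_size]

    chunk_result = await branch.communicate(
        f"Context so far: {carry}\n"
        f"Summarize these {len(chunk)} documents: {chunk}"
    )
    results.append(chunk_result)

    carry = chunk_result  # small rolling context instead of full history
    branch.messages.clear()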

Best Practices

Choose appropriate patterns for your use case:

# Simple parallel tasks: Use asyncio.gather()
results = await asyncio.gather(*[branch.communicate(task) for task in tasks])

# Complex workflows: Use Builder + session.flow()
result = await session.flow(builder.get_graph(), max_concurrent=5)

Control concurrency based on your needs:

  • Start with max_concurrent=3-5
  • Adjust based on your provider's API rate limits
  • Measure throughput at different settings and tune from there; for plain asyncio.gather calls, a semaphore gives the same cap (see the sketch after this list)
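
session.flow enforces max_concurrent for graph workflows. For plain asyncio.gather calls, an asyncio.Semaphore gives the same cap; a minimal sketch, assuming a branch as defined earlier:

import asyncio

semaphore = asyncio.Semaphore(3)  # at most 3 requests in flight

async def limited(task: str):
    async with semaphore:
        return await branch.communicate(task)

tasks = ["AI trends", "market analysis", "competition", "pricing"]
results = await asyncio.gather(*[limited(t) for t in tasks])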

Optimize token usage:

  • Use light models for simple tasks and stronger models for complex ones
  • Clear message history when earlier context is no longer needed
  • Batch similar operations together; a rough token estimate (sketched after this list) helps size prompts and batches
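
Oversized prompts are the usual token sink. A rough pre-flight check before sending; the 4-characters-per-token figure is a common heuristic for English text, not an exact count:

def approx_tokens(text: str) -> int:
    # Rough heuristic: ~4 characters per token for English text
    return len(text) // 4

documents = [f"document_{i}" for i in range(100)]
prompt = "Summarize these documents: " + ", ".join(documents)
if approx_tokens(prompt) > 8000:
    print("Prompt too large; split into smaller chunks first")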

Monitor performance:

# Use verbose mode for insights
result = await session.flow(graph, verbose=True)

# Track metrics
print(f"Completed: {len(result['completed_operations'])}")

Performance optimization focuses on parallel execution, concurrency control, and efficient resource usage.