MCP Server Integration¶
Connecting LionAGI to Model Context Protocol servers for external capabilities.
Memory MCP¶
from lionagi import Branch
# Memory MCP is automatically available
# Save information to memory
branch = Branch(name="researcher")
await branch.communicate("Remember that user prefers concise responses")
# Memory is automatically saved via MCP
# Access with mcp__memory__search(), mcp__memory__save(), etc.
External API Integration Pattern¶
from lionagi import iModel
# Exa search integration
exa_model = iModel(
provider="exa",
endpoint="search",
queue_capacity=5,
capacity_refresh_time=1,
invoke_with_endpoint=False,
)
async def research_with_external_api(query: str):
"""Use external MCP servers for enhanced research"""
researcher = Branch(
chat_model=iModel(provider="openai", model="gpt-4o-mini"),
system="Research specialist with external search capabilities"
)
# Use external search API
search_request = {
"query": query,
"num_results": 5,
"include_domains": ["arxiv.org", "openai.com"],
"is_cached": True
}
# Make API call
api_call = exa_model.create_api_calling(**search_request)
search_result = await exa_model.invoke(api_call)
# Process with LionAGI
analysis = await researcher.communicate(
"Analyze search results and provide insights",
context=search_result.response
)
return analysis
# Usage
insights = await research_with_external_api("AI safety research trends 2024")
Custom MCP Server Pattern¶
# Example: Custom knowledge base MCP
class KnowledgeBaseMCP:
"""Custom MCP server for domain knowledge"""
def __init__(self, knowledge_db_path: str):
self.db_path = knowledge_db_path
async def query_knowledge(self, topic: str) -> str:
"""Query domain-specific knowledge base"""
# Implementation would connect to actual knowledge base
return f"Knowledge about {topic}: ..."
async def add_knowledge(self, topic: str, content: str) -> bool:
"""Add new knowledge to base"""
# Implementation would store in knowledge base
return True
# Integration with LionAGI
knowledge_server = KnowledgeBaseMCP("./knowledge.db")
domain_expert = Branch(
chat_model=iModel(provider="openai", model="gpt-4o-mini"),
tools=[knowledge_server.query_knowledge, knowledge_server.add_knowledge],
system="Domain expert with access to specialized knowledge base"
)
# Use custom MCP
result = await domain_expert.ReAct(
instruct={"instruction": "Research quantum computing applications"},
max_extensions=3
)
MCP Communication Patterns¶
import json
import asyncio
class MCPClient:
"""Generic MCP client for custom protocols"""
def __init__(self, server_endpoint: str):
self.endpoint = server_endpoint
async def call_mcp_method(self, method: str, params: dict):
"""Generic MCP method call"""
# Implementation would handle MCP protocol
request = {
"jsonrpc": "2.0",
"method": method,
"params": params,
"id": 1
}
# Mock response for example
return {"result": f"MCP response for {method}"}
# Use MCP client in agents
mcp_client = MCPClient("http://localhost:8000/mcp")
async def mcp_enhanced_workflow():
"""Workflow using multiple MCP servers"""
coordinator = Branch(
chat_model=iModel(provider="openai", model="gpt-4o-mini"),
system="Coordinates work using MCP servers"
)
# Call multiple MCP servers
tasks = [
mcp_client.call_mcp_method("search", {"query": "AI research"}),
mcp_client.call_mcp_method("analyze", {"data": "research_data"}),
mcp_client.call_mcp_method("summarize", {"content": "long_text"})
]
# Execute MCP calls in parallel
mcp_results = await asyncio.gather(*tasks)
# Process results with LionAGI
synthesis = await coordinator.communicate(
"Synthesize MCP server results",
context=mcp_results
)
return synthesis
# Usage
result = await mcp_enhanced_workflow()
Resource Management¶
from contextlib import asynccontextmanager
class MCPResourceManager:
"""Manage MCP server connections and resources"""
def __init__(self):
self.connections = {}
@asynccontextmanager
async def get_mcp_connection(self, server_name: str):
"""Get managed MCP connection"""
try:
if server_name not in self.connections:
# Initialize connection
self.connections[server_name] = MCPClient(f"http://{server_name}:8000")
yield self.connections[server_name]
except Exception as e:
print(f"MCP connection error: {e}")
raise
finally:
# Cleanup if needed
pass
# Usage in production
mcp_manager = MCPResourceManager()
async def production_mcp_workflow():
"""Production workflow with managed MCP resources"""
async with mcp_manager.get_mcp_connection("memory_server") as memory_mcp:
async with mcp_manager.get_mcp_connection("search_server") as search_mcp:
# Use multiple MCP servers safely
memory_result = await memory_mcp.call_mcp_method("recall", {"query": "user_preferences"})
search_result = await search_mcp.call_mcp_method("search", {"query": "latest_ai_news"})
# Process with LionAGI
processor = Branch(
chat_model=iModel(provider="openai", model="gpt-4o-mini")
)
return await processor.communicate(
"Process MCP results",
context={"memory": memory_result, "search": search_result}
)
Error Handling and Retry¶
async def robust_mcp_call(mcp_client, method: str, params: dict, max_retries: int = 3):
"""Robust MCP call with retry logic"""
for attempt in range(max_retries):
try:
result = await mcp_client.call_mcp_method(method, params)
return result
except Exception as e:
if attempt == max_retries - 1:
print(f"MCP call failed after {max_retries} attempts: {e}")
return None
# Exponential backoff
await asyncio.sleep(2 ** attempt)
return None
# Usage in workflows
async def fault_tolerant_workflow():
"""Workflow with MCP fault tolerance"""
mcp_client = MCPClient("http://unreliable-server:8000")
# Robust MCP calls
results = await asyncio.gather(
robust_mcp_call(mcp_client, "method1", {"param": "value1"}),
robust_mcp_call(mcp_client, "method2", {"param": "value2"}),
return_exceptions=True
)
# Filter successful results
successful_results = [r for r in results if r is not None]
# Process with LionAGI even if some MCP calls failed
if successful_results:
processor = Branch(
chat_model=iModel(provider="openai", model="gpt-4o-mini")
)
return await processor.communicate(
"Process available MCP results",
context=successful_results
)
return "No MCP results available"
Best Practices¶
Connection Management:
- Use connection pooling for high-throughput scenarios
- Implement proper timeout handling
- Cache MCP responses when appropriate
Error Handling:
- Implement retry logic with exponential backoff
- Graceful degradation when MCP servers unavailable
- Comprehensive logging for debugging
Performance:
- Parallel MCP calls when possible
- Optimize payload sizes for network efficiency
- Monitor MCP server response times
Security:
- Validate all MCP server responses
- Use secure connections (TLS) for production
- Implement proper authentication mechanisms