Orchestrator
The Orchestrator manages agent execution, scaling, and observability. It handles task queuing, worker management, and provides real-time monitoring.
Basic Setup
from factorial import Orchestrator, AgentWorkerConfig, MaintenanceWorkerConfig
# Create the orchestrator with its Redis location and OpenAI API key.
orchestrator = Orchestrator(
    redis_host="localhost",
    redis_port=6379,
    openai_api_key="sk-...",
)

# Register the agent along with its worker configurations.
orchestrator.register_runner(
    agent=my_agent,
    agent_worker_config=AgentWorkerConfig(workers=10),
    maintenance_worker_config=MaintenanceWorkerConfig(),
)

# Start the system.
orchestrator.run()
Configuration
Redis Configuration
# Redis settings: server location, database index, and connection limit.
orchestrator = Orchestrator(
    redis_host="localhost",
    redis_port=6379,
    redis_db=0,
    redis_max_connections=50,
)
Worker Configuration
Control how many workers process tasks:
from factorial import AgentWorkerConfig
config = AgentWorkerConfig(
    workers=10,            # concurrent async workers (coroutines)
    batch_size=25,         # tasks processed per batch
    max_retries=5,         # times a failed task is requeued
    heartbeat_interval=5,  # heartbeat frequency, in seconds
    turn_timeout=120,      # timeout for a single agent turn, in seconds
)
Maintenance Worker Configuration
Maintenance workers clean up expired tasks and recover tasks that were dropped when a worker crashed.
from factorial import MaintenanceWorkerConfig, TaskTTLConfig
config = MaintenanceWorkerConfig(
    interval=10,  # maintenance check interval
    workers=1,    # number of maintenance workers
    task_ttl=TaskTTLConfig(
        completed_ttl=3600,  # completed tasks are kept for 1 hour
        failed_ttl=86400,    # failed tasks are kept for 24 hours
        cancelled_ttl=1800,  # cancelled tasks are kept for 30 minutes
    ),
)
Task Management
Enqueue Tasks
import asyncio
from factorial import AgentContext
async def submit_task():
    """Create a task for ``agent`` and enqueue it on the orchestrator.

    Returns:
        The ``id`` attribute of the newly created task.
    """
    # NOTE: `agent` is assumed to be the agent registered with the
    # orchestrator (named `my_agent` in Basic Setup) -- confirm.
    context = AgentContext(query="Analyze this data")
    task = agent.create_task(owner_id="user123", payload=context)
    await orchestrator.enqueue_task(agent, task)
    return task.id


task_id = asyncio.run(submit_task())
Check Task Status
async def check_status(task_id: str) -> None:
    """Fetch the status of the task identified by ``task_id`` and print it."""
    status = await orchestrator.get_task_status(task_id)
    # Possible values: queued, processing, completed, failed, cancelled
    print(f"Status: {status}")


asyncio.run(check_status(task_id))
Get Task Data
async def get_task_data(task_id: str) -> None:
    """Fetch the stored data for ``task_id`` and print it.

    Nothing is printed when the orchestrator returns a falsy value.
    """
    task_data = await orchestrator.get_task_data(task_id)
    if task_data:
        print(f"Data: {task_data}")


asyncio.run(get_task_data(task_id))
Get Task Result
async def get_task_result(task_id: str) -> None:
    """Print the output recorded for the task identified by ``task_id``.

    The result is read from the ``"output"`` entry of the task data's
    ``"payload"``; ``None`` is printed when that entry is absent. Nothing
    is printed when the orchestrator returns a falsy value for the task.
    """
    task_data = await orchestrator.get_task_data(task_id)
    if task_data:
        result = task_data["payload"].get("output")
        print(f"Result: {result}")


# Call the function defined above (the original example referenced an
# undefined name, `get_results`).
asyncio.run(get_task_result(task_id))
Cancel Tasks
async def cancel_task(task_id: str) -> None:
    """Ask the orchestrator to cancel the task identified by ``task_id``."""
    await orchestrator.cancel_task(task_id)


asyncio.run(cancel_task(task_id))
Steer Tasks
Inject messages into running tasks:
async def steer_task(task_id: str) -> None:
    """Send an additional user message to the task identified by ``task_id``."""
    messages = [
        {"role": "user", "content": "Please focus on the financial aspects"},
    ]
    await orchestrator.steer_task(task_id, messages)


asyncio.run(steer_task(task_id))
Observability
Dashboard
The orchestrator includes a built-in web dashboard:
from factorial import ObservabilityConfig
# Enable the built-in dashboard and choose where it is served.
orchestrator = Orchestrator(
    observability_config=ObservabilityConfig(
        enabled=True,
        host="0.0.0.0",  # all network interfaces, not just localhost
        port=8080,
        dashboard_name="My AI System",
    ),
)
Access at: http://localhost:8080/observability
Metrics
Configure metrics collection:
from factorial import MetricsTimelineConfig
config = MetricsTimelineConfig(
    timeline_duration=3600,    # timeline covers 1 hour
    bucket_size="minutes",     # bucket metrics by minute
    retention_multiplier=2.0,  # keep data for 2x the timeline duration
)