Monitoring Router Performance
Learn how to track, analyze, and optimize router performance in production applications.

Overview
Effective monitoring is essential for understanding how smart routing behaves in production. This guide covers tracking routing decisions, analyzing performance metrics, and optimizing based on real usage data.

What you’ll learn:
- Tracking routing decisions and model selection
- Measuring routing latency and overhead
- Analyzing cost patterns
- Identifying routing failures
- Building monitoring dashboards
- Optimizing based on metrics
Basic Monitoring
Tracking Selected Models
Track which models the router selects for your requests:
import requests
import json
from datetime import datetime
from typing import Optional
class RouterMonitor:
"""Simple router monitoring with tracking."""
def __init__(self, api_key: str):
self.api_key = api_key
self.url = "https://api.edenai.run/v3/llm/chat/completions"
self.routing_log = []
def chat(
self,
message: str,
        candidates: Optional[list[str]] = None,
        metadata: Optional[dict] = None
) -> dict:
"""Chat with routing and tracking."""
start_time = datetime.now()
headers = {
"Authorization": f"Bearer {self.api_key}",
"Content-Type": "application/json"
}
payload = {
"model": "@edenai",
"messages": [{"role": "user", "content": message}],
"stream": True
}
if candidates:
payload["router_candidates"] = candidates
response = requests.post(
self.url,
headers=headers,
json=payload,
stream=True
)
full_response = ""
selected_model = None
first_chunk_time = None
for line in response.iter_lines():
if line:
if not first_chunk_time:
first_chunk_time = datetime.now()
line_str = line.decode('utf-8')
if line_str.startswith('data: ') and line_str != 'data: [DONE]':
data = json.loads(line_str[6:])
if not selected_model and 'model' in data:
selected_model = data['model']
content = data.get('choices', [{}])[0].get('delta', {}).get('content', '')
full_response += content
end_time = datetime.now()
# Calculate latencies
routing_latency = (
(first_chunk_time - start_time).total_seconds() * 1000
if first_chunk_time else None
)
total_latency = (end_time - start_time).total_seconds() * 1000
# Log routing decision
log_entry = {
"timestamp": start_time.isoformat(),
"message": message[:100] + "..." if len(message) > 100 else message,
"selected_model": selected_model,
"candidates": candidates or "default",
"routing_latency_ms": round(routing_latency, 2) if routing_latency else None,
"total_latency_ms": round(total_latency, 2),
"response_length": len(full_response),
"metadata": metadata or {}
}
self.routing_log.append(log_entry)
return {
"response": full_response,
"model": selected_model,
"metrics": {
"routing_latency_ms": log_entry["routing_latency_ms"],
"total_latency_ms": log_entry["total_latency_ms"]
}
}
    def get_log(self, limit: Optional[int] = None) -> list[dict]:
"""Get routing log."""
if limit:
return self.routing_log[-limit:]
return self.routing_log.copy()
def print_summary(self):
"""Print routing summary."""
if not self.routing_log:
print("No routing data available")
return
from collections import Counter
model_counts = Counter(entry["selected_model"] for entry in self.routing_log)
total_requests = len(self.routing_log)
        routing_latencies = [
            entry["routing_latency_ms"]
            for entry in self.routing_log
            if entry["routing_latency_ms"] is not None
        ]
        # Average only over requests that actually recorded a routing latency
        avg_routing_latency = (
            sum(routing_latencies) / len(routing_latencies)
            if routing_latencies else 0.0
        )
avg_total_latency = sum(
entry["total_latency_ms"]
for entry in self.routing_log
) / total_requests
print("\n=== Routing Summary ===")
print(f"Total requests: {total_requests}")
print(f"\nModel distribution:")
for model, count in model_counts.most_common():
percentage = (count / total_requests) * 100
print(f" {model}: {count} ({percentage:.1f}%)")
print(f"\nAverage routing latency: {avg_routing_latency:.0f}ms")
print(f"Average total latency: {avg_total_latency:.0f}ms")
# Usage
monitor = RouterMonitor("YOUR_API_KEY")
# Make several requests
queries = [
"What is Python?",
"Explain machine learning",
"Write a haiku about coding",
"What's the capital of France?",
"Describe quantum computing"
]
for query in queries:
result = monitor.chat(
query,
candidates=["openai/gpt-4o", "anthropic/claude-sonnet-4-5"],
metadata={"user_id": "user123", "session": "abc"}
)
print(f"Q: {query}")
print(f"Model: {result['model']}")
print(f"Routing latency: {result['metrics']['routing_latency_ms']}ms\n")
# Print summary
monitor.print_summary()
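For longer-running services you will usually want to persist the log rather than keep it in memory. Here is a minimal sketch (the JSONL file name is an arbitrary choice, not an SDK convention) that writes each routing decision to a file for offline analysis:

import json

def export_routing_log(monitor: RouterMonitor, path: str = "routing_log.jsonl"):
    """Append the in-memory routing log to a JSONL file, one decision per line."""
    with open(path, "a") as f:
        for entry in monitor.get_log():
            f.write(json.dumps(entry) + "\n")

export_routing_log(monitor)

Note that calling this repeatedly re-appends the whole log; in practice you would export once per batch or track an offset to avoid duplicate entries.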
Advanced Monitoring
Comprehensive Metrics Collection
Collect detailed metrics for analysis:
import requests
import json
from datetime import datetime
from collections import defaultdict, Counter
from typing import Optional, Dict, List
import statistics
class RouterAnalytics:
"""Advanced router monitoring and analytics."""
def __init__(self, api_key: str):
self.api_key = api_key
self.url = "https://api.edenai.run/v3/llm/chat/completions"
# Detailed logs
self.routing_events: List[Dict] = []
self.error_log: List[Dict] = []
# Metrics by model
self.metrics_by_model = defaultdict(lambda: {
"count": 0,
"latencies": [],
"response_lengths": [],
"errors": 0
})
# Metrics by candidate pool
self.metrics_by_pool = defaultdict(lambda: {
"count": 0,
"model_distribution": Counter()
})
def _pool_key(self, candidates: Optional[List[str]]) -> str:
"""Create key for candidate pool."""
if not candidates:
return "default"
return "|".join(sorted(candidates))
def chat(
self,
message: str,
candidates: Optional[List[str]] = None,
metadata: Optional[Dict] = None
) -> Dict:
"""Chat with comprehensive monitoring."""
start_time = datetime.now()
pool_key = self._pool_key(candidates)
headers = {
"Authorization": f"Bearer {self.api_key}",
"Content-Type": "application/json"
}
payload = {
"model": "@edenai",
"messages": [{"role": "user", "content": message}],
"stream": True
}
if candidates:
payload["router_candidates"] = candidates
try:
response = requests.post(
self.url,
headers=headers,
json=payload,
stream=True,
timeout=30
)
response.raise_for_status()
full_response = ""
selected_model = None
first_chunk_time = None
for line in response.iter_lines():
if line:
if not first_chunk_time:
first_chunk_time = datetime.now()
line_str = line.decode('utf-8')
if line_str.startswith('data: ') and line_str != 'data: [DONE]':
data = json.loads(line_str[6:])
if not selected_model and 'model' in data:
selected_model = data['model']
content = data.get('choices', [{}])[0].get('delta', {}).get('content', '')
full_response += content
end_time = datetime.now()
# Calculate metrics
routing_latency = (
(first_chunk_time - start_time).total_seconds() * 1000
if first_chunk_time else None
)
total_latency = (end_time - start_time).total_seconds() * 1000
response_length = len(full_response)
# Log event
event = {
"timestamp": start_time.isoformat(),
"message": message[:100],
"selected_model": selected_model,
"candidates": candidates,
"pool_key": pool_key,
"routing_latency_ms": routing_latency,
"total_latency_ms": total_latency,
"response_length": response_length,
"success": True,
"metadata": metadata or {}
}
self.routing_events.append(event)
# Update metrics
if selected_model:
self.metrics_by_model[selected_model]["count"] += 1
if routing_latency:
self.metrics_by_model[selected_model]["latencies"].append(routing_latency)
self.metrics_by_model[selected_model]["response_lengths"].append(response_length)
self.metrics_by_pool[pool_key]["count"] += 1
self.metrics_by_pool[pool_key]["model_distribution"][selected_model] += 1
return {
"success": True,
"response": full_response,
"model": selected_model,
"metrics": {
"routing_latency_ms": routing_latency,
"total_latency_ms": total_latency,
"response_length": response_length
}
}
except Exception as e:
end_time = datetime.now()
total_latency = (end_time - start_time).total_seconds() * 1000
# Log error
error_event = {
"timestamp": start_time.isoformat(),
"message": message[:100],
"candidates": candidates,
"pool_key": pool_key,
"error": str(e),
"latency_ms": total_latency,
"metadata": metadata or {}
}
self.error_log.append(error_event)
# Update error count for pool
self.metrics_by_pool[pool_key]["count"] += 1
return {
"success": False,
"error": str(e),
"metrics": {
"latency_ms": total_latency
}
}
def get_model_statistics(self) -> Dict:
"""Get statistics per model."""
stats = {}
for model, metrics in self.metrics_by_model.items():
latencies = metrics["latencies"]
response_lengths = metrics["response_lengths"]
stats[model] = {
"request_count": metrics["count"],
"error_count": metrics["errors"],
"avg_routing_latency_ms": (
round(statistics.mean(latencies), 2)
if latencies else None
),
"p50_routing_latency_ms": (
round(statistics.median(latencies), 2)
if latencies else None
),
"p95_routing_latency_ms": (
round(statistics.quantiles(latencies, n=20)[18], 2)
if len(latencies) > 10 else None
),
"avg_response_length": (
round(statistics.mean(response_lengths), 2)
if response_lengths else None
)
}
return stats
def get_pool_statistics(self) -> Dict:
"""Get statistics per candidate pool."""
stats = {}
for pool_key, metrics in self.metrics_by_pool.items():
total = metrics["count"]
distribution = metrics["model_distribution"]
stats[pool_key] = {
"total_requests": total,
"model_distribution": {
model: {
"count": count,
"percentage": round((count / total) * 100, 1)
}
for model, count in distribution.items()
}
}
return stats
def get_error_rate(self) -> float:
"""Calculate overall error rate."""
total = len(self.routing_events) + len(self.error_log)
if total == 0:
return 0.0
return (len(self.error_log) / total) * 100
def print_report(self):
"""Print comprehensive analytics report."""
print("\n" + "="*60)
print("ROUTER ANALYTICS REPORT")
print("="*60)
# Overall metrics
total_requests = len(self.routing_events) + len(self.error_log)
successful_requests = len(self.routing_events)
error_rate = self.get_error_rate()
print(f"\n📊 Overall Metrics")
print(f" Total requests: {total_requests}")
print(f" Successful: {successful_requests}")
print(f" Failed: {len(self.error_log)}")
print(f" Error rate: {error_rate:.2f}%")
# Model statistics
print(f"\n🤖 Model Performance")
model_stats = self.get_model_statistics()
for model, stats in sorted(
model_stats.items(),
key=lambda x: x[1]["request_count"],
reverse=True
):
print(f"\n {model}:")
print(f" Requests: {stats['request_count']}")
print(f" Avg routing latency: {stats['avg_routing_latency_ms']}ms")
if stats['p50_routing_latency_ms']:
print(f" P50 latency: {stats['p50_routing_latency_ms']}ms")
if stats['p95_routing_latency_ms']:
print(f" P95 latency: {stats['p95_routing_latency_ms']}ms")
print(f" Avg response length: {stats['avg_response_length']} chars")
# Pool statistics
print(f"\n🎯 Candidate Pool Performance")
pool_stats = self.get_pool_statistics()
for pool_key, stats in pool_stats.items():
print(f"\n Pool: {pool_key}")
print(f" Total requests: {stats['total_requests']}")
print(f" Model distribution:")
for model, dist in stats["model_distribution"].items():
print(f" {model}: {dist['count']} ({dist['percentage']}%)")
# Recent errors
if self.error_log:
print(f"\n❌ Recent Errors (last 5)")
for error in self.error_log[-5:]:
print(f"\n {error['timestamp']}")
print(f" Message: {error['message']}")
print(f" Error: {error['error']}")
print(f" Pool: {error['pool_key']}")
print("\n" + "="*60)
# Usage example
analytics = RouterAnalytics("YOUR_API_KEY")
# Simulate various requests
test_cases = [
("What is Python?", ["openai/gpt-4o", "anthropic/claude-sonnet-4-5"]),
("Explain quantum physics", ["anthropic/claude-opus-4-5", "openai/gpt-4o"]),
("Write a function", ["openai/gpt-4o", "anthropic/claude-sonnet-4-5"]),
("Tell me a joke", ["openai/gpt-4o-mini", "google/gemini-2.0-flash"]),
("Summarize AI trends", None), # Default routing
]
for message, candidates in test_cases:
result = analytics.chat(
message,
candidates=candidates,
metadata={"test_case": True}
)
if result["success"]:
print(f"✓ {message[:30]}... → {result['model']}")
else:
print(f"✗ {message[:30]}... → Error")
# Print full report
analytics.print_report()
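The raw events in routing_events are also easy to hand off to external analysis tools. A minimal sketch (field names follow the event dicts built above) that dumps successful events to CSV with only the standard library:

import csv

def export_events_csv(analytics: RouterAnalytics, path: str = "routing_events.csv"):
    """Write successful routing events to CSV for spreadsheet or pandas analysis."""
    if not analytics.routing_events:
        return
    fields = ["timestamp", "selected_model", "pool_key",
              "routing_latency_ms", "total_latency_ms", "response_length"]
    with open(path, "w", newline="") as f:
        writer = csv.DictWriter(f, fieldnames=fields, extrasaction="ignore")
        writer.writeheader()
        writer.writerows(analytics.routing_events)

export_events_csv(analytics)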
Cost Tracking
Monitoring Routing Costs
Track estimated costs by model and over time:
import requests
import json
from datetime import datetime
from typing import Dict, List, Optional
from collections import defaultdict
class RouterCostTracker:
"""Track routing costs over time."""
# Estimated costs per 1K tokens (input + output averaged)
MODEL_COSTS = {
"openai/gpt-4o-mini": 0.0002,
"google/gemini-2.0-flash": 0.0002,
"anthropic/claude-haiku-4-5": 0.0005,
"openai/gpt-4o": 0.003,
"anthropic/claude-sonnet-4-5": 0.004,
"anthropic/claude-opus-4-5": 0.015,
"google/gemini-2.5-pro": 0.008,
}
def __init__(self, api_key: str):
self.api_key = api_key
self.url = "https://api.edenai.run/v3/llm/chat/completions"
# Cost tracking
self.costs_by_model = defaultdict(float)
self.costs_by_day = defaultdict(float)
self.request_log: List[Dict] = []
def estimate_tokens(self, text: str) -> int:
"""Rough token estimation (4 chars ≈ 1 token)."""
return len(text) // 4
def estimate_cost(self, model: str, input_text: str, output_text: str) -> float:
"""Estimate request cost."""
total_tokens = self.estimate_tokens(input_text + output_text)
cost_per_1k = self.MODEL_COSTS.get(model, 0.003) # Default to gpt-4o cost
return (total_tokens / 1000) * cost_per_1k
    def chat(self, message: str, candidates: Optional[List[str]] = None) -> Dict:
"""Chat with cost tracking."""
timestamp = datetime.now()
date_key = timestamp.date().isoformat()
headers = {
"Authorization": f"Bearer {self.api_key}",
"Content-Type": "application/json"
}
payload = {
"model": "@edenai",
"messages": [{"role": "user", "content": message}],
"stream": True
}
if candidates:
payload["router_candidates"] = candidates
response = requests.post(
self.url,
headers=headers,
json=payload,
stream=True
)
full_response = ""
selected_model = None
for line in response.iter_lines():
if line:
line_str = line.decode('utf-8')
if line_str.startswith('data: ') and line_str != 'data: [DONE]':
data = json.loads(line_str[6:])
if not selected_model and 'model' in data:
selected_model = data['model']
content = data.get('choices', [{}])[0].get('delta', {}).get('content', '')
full_response += content
        # Estimate cost; attribute to "unknown" if the stream never reported a model
        selected_model = selected_model or "unknown"
        estimated_cost = self.estimate_cost(selected_model, message, full_response)
# Track costs
self.costs_by_model[selected_model] += estimated_cost
self.costs_by_day[date_key] += estimated_cost
# Log request
log_entry = {
"timestamp": timestamp.isoformat(),
"date": date_key,
"message": message[:100],
"model": selected_model,
"candidates": candidates,
"estimated_cost": estimated_cost,
"input_length": len(message),
"output_length": len(full_response)
}
self.request_log.append(log_entry)
return {
"response": full_response,
"model": selected_model,
"cost": estimated_cost
}
def get_cost_summary(self) -> Dict:
"""Get cost summary."""
total_cost = sum(self.costs_by_model.values())
total_requests = len(self.request_log)
return {
"total_cost": round(total_cost, 4),
"total_requests": total_requests,
"avg_cost_per_request": round(total_cost / total_requests, 4) if total_requests > 0 else 0,
"costs_by_model": {
model: round(cost, 4)
for model, cost in sorted(
self.costs_by_model.items(),
key=lambda x: x[1],
reverse=True
)
},
"costs_by_day": {
date: round(cost, 4)
for date, cost in sorted(self.costs_by_day.items())
}
}
def print_cost_report(self):
"""Print cost analysis report."""
summary = self.get_cost_summary()
print("\n" + "="*60)
print("ROUTING COST ANALYSIS")
print("="*60)
print(f"\n💰 Overall")
print(f" Total cost: ${summary['total_cost']:.4f}")
print(f" Total requests: {summary['total_requests']}")
print(f" Average cost per request: ${summary['avg_cost_per_request']:.4f}")
print(f"\n🤖 Cost by Model")
for model, cost in summary['costs_by_model'].items():
percentage = (cost / summary['total_cost']) * 100 if summary['total_cost'] > 0 else 0
print(f" {model}: ${cost:.4f} ({percentage:.1f}%)")
print(f"\n📅 Cost by Day")
for date, cost in summary['costs_by_day'].items():
print(f" {date}: ${cost:.4f}")
# Cost optimization suggestions
print(f"\n💡 Optimization Suggestions")
most_expensive = max(
summary['costs_by_model'].items(),
key=lambda x: x[1]
)[0] if summary['costs_by_model'] else None
if most_expensive and "opus" in most_expensive.lower():
print(f" • Consider replacing {most_expensive} with Sonnet for cost savings")
if summary['avg_cost_per_request'] > 0.01:
print(f" • Average cost is high - consider using budget-tier candidates")
print("\n" + "="*60)
# Usage
tracker = RouterCostTracker("YOUR_API_KEY")
# Make requests
queries = [
("What is Python?", ["openai/gpt-4o", "anthropic/claude-sonnet-4-5"]),
("Explain quantum computing in detail", ["anthropic/claude-opus-4-5", "openai/gpt-4o"]),
("Hello", ["openai/gpt-4o-mini", "google/gemini-2.0-flash"]),
]
for message, candidates in queries:
result = tracker.chat(message, candidates)
print(f"Q: {message}")
print(f"Model: {result['model']}, Cost: ${result['cost']:.4f}\n")
# Print cost report
tracker.print_cost_report()
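Once you track cost per day, a natural next step is to enforce a budget before sending a request. Here is a minimal sketch built on the tracker above (the daily limit and fallback pool are illustrative choices, not API features):

BUDGET_CANDIDATES = ["openai/gpt-4o-mini", "google/gemini-2.0-flash"]  # cheap fallback pool

def chat_within_budget(tracker: RouterCostTracker, message: str,
                       candidates=None, daily_limit_usd: float = 1.00):
    """Route normally until today's estimated spend hits the limit, then restrict to budget models."""
    from datetime import datetime
    today = datetime.now().date().isoformat()
    spent = tracker.costs_by_day.get(today, 0.0)
    if spent >= daily_limit_usd:
        candidates = BUDGET_CANDIDATES  # over budget: only cheap candidates
    return tracker.chat(message, candidates)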
Integration with Monitoring Tools
Prometheus Metrics Export
Export metrics for Prometheus monitoring:
from prometheus_client import Counter, Histogram, Gauge, generate_latest
import requests
import json
import time
from typing import Optional, List
# Define Prometheus metrics
routing_requests_total = Counter(
'eden_ai_routing_requests_total',
'Total routing requests',
['model', 'candidate_pool']
)
routing_errors_total = Counter(
'eden_ai_routing_errors_total',
'Total routing errors',
['error_type']
)
routing_latency_seconds = Histogram(
'eden_ai_routing_latency_seconds',
'Routing decision latency in seconds',
['model']
)
total_latency_seconds = Histogram(
'eden_ai_total_latency_seconds',
'Total request latency in seconds',
['model']
)
response_length_bytes = Histogram(
'eden_ai_response_length_bytes',
'Response length in bytes',
['model']
)
active_models = Gauge(
'eden_ai_active_models',
'Number of different models being used'
)
class PrometheusRouterMonitor:
"""Router with Prometheus metrics export."""
def __init__(self, api_key: str):
self.api_key = api_key
self.url = "https://api.edenai.run/v3/llm/chat/completions"
self.active_models_set = set()
def _pool_key(self, candidates: Optional[List[str]]) -> str:
"""Create key for candidate pool."""
if not candidates:
return "default"
return ",".join(sorted(candidates)[:3]) # Limit to 3 for cardinality
def chat(self, message: str, candidates: Optional[List[str]] = None) -> dict:
"""Chat with Prometheus metrics."""
pool_key = self._pool_key(candidates)
headers = {
"Authorization": f"Bearer {self.api_key}",
"Content-Type": "application/json"
}
payload = {
"model": "@edenai",
"messages": [{"role": "user", "content": message}],
"stream": True
}
if candidates:
payload["router_candidates"] = candidates
        try:
            start_time = time.time()
response = requests.post(
self.url,
headers=headers,
json=payload,
stream=True,
timeout=30
)
response.raise_for_status()
full_response = ""
selected_model = None
first_chunk_time = None
for line in response.iter_lines():
if line:
if not first_chunk_time:
first_chunk_time = time.time()
line_str = line.decode('utf-8')
if line_str.startswith('data: ') and line_str != 'data: [DONE]':
data = json.loads(line_str[6:])
if not selected_model and 'model' in data:
selected_model = data['model']
content = data.get('choices', [{}])[0].get('delta', {}).get('content', '')
full_response += content
            end_time = time.time()
            # Record metrics; label with "unknown" if the stream never reported a model
            selected_model = selected_model or "unknown"
routing_requests_total.labels(
model=selected_model,
candidate_pool=pool_key
).inc()
if first_chunk_time:
routing_latency = first_chunk_time - start_time
routing_latency_seconds.labels(model=selected_model).observe(routing_latency)
total_latency = end_time - start_time
total_latency_seconds.labels(model=selected_model).observe(total_latency)
response_length_bytes.labels(model=selected_model).observe(len(full_response))
# Track active models
self.active_models_set.add(selected_model)
active_models.set(len(self.active_models_set))
return {
"success": True,
"response": full_response,
"model": selected_model
}
except Exception as e:
# Record error
error_type = type(e).__name__
routing_errors_total.labels(error_type=error_type).inc()
return {
"success": False,
"error": str(e)
}
def export_metrics(self) -> str:
"""Export metrics in Prometheus format."""
return generate_latest().decode('utf-8')
# Usage
monitor = PrometheusRouterMonitor("YOUR_API_KEY")
# Make some requests
for i in range(10):
result = monitor.chat(
f"Question {i+1}: Explain AI",
candidates=["openai/gpt-4o", "anthropic/claude-sonnet-4-5"]
)
# Export metrics (serve this on /metrics endpoint)
metrics = monitor.export_metrics()
print(metrics)
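To make these metrics scrapeable, prometheus_client can serve the default registry over HTTP instead of printing it. A minimal sketch (port 9100 is an arbitrary choice):

from prometheus_client import start_http_server
import time

# Serve the default registry on http://localhost:9100/metrics
start_http_server(9100)

monitor = PrometheusRouterMonitor("YOUR_API_KEY")
while True:
    monitor.chat("Explain AI", candidates=["openai/gpt-4o", "anthropic/claude-sonnet-4-5"])
    time.sleep(60)  # point your Prometheus scrape config at :9100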
DataDog Integration
Send routing metrics to DataDog:
from datadog import initialize, statsd
import requests
import json
import time
from typing import Optional, List
# Initialize DataDog
initialize(
statsd_host='127.0.0.1',
statsd_port=8125
)
class DataDogRouterMonitor:
"""Router with DataDog metrics."""
def __init__(self, api_key: str):
self.api_key = api_key
self.url = "https://api.edenai.run/v3/llm/chat/completions"
def chat(
self,
message: str,
candidates: Optional[List[str]] = None,
tags: Optional[List[str]] = None
) -> dict:
"""Chat with DataDog metrics."""
        base_tags = list(tags) if tags else []  # copy so the caller's list isn't mutated
headers = {
"Authorization": f"Bearer {self.api_key}",
"Content-Type": "application/json"
}
payload = {
"model": "@edenai",
"messages": [{"role": "user", "content": message}],
"stream": True
}
if candidates:
payload["router_candidates"] = candidates
base_tags.append(f"candidate_count:{len(candidates)}")
try:
start_time = time.time()
# Increment request counter
statsd.increment(
'eden_ai.routing.requests',
tags=base_tags
)
response = requests.post(
self.url,
headers=headers,
json=payload,
stream=True,
timeout=30
)
response.raise_for_status()
full_response = ""
selected_model = None
first_chunk_time = None
for line in response.iter_lines():
if line:
if not first_chunk_time:
first_chunk_time = time.time()
line_str = line.decode('utf-8')
if line_str.startswith('data: ') and line_str != 'data: [DONE]':
data = json.loads(line_str[6:])
if not selected_model and 'model' in data:
selected_model = data['model']
content = data.get('choices', [{}])[0].get('delta', {}).get('content', '')
full_response += content
end_time = time.time()
# Record metrics with model tag
model_tags = base_tags + [f"model:{selected_model}"]
# Routing latency
if first_chunk_time:
routing_latency_ms = (first_chunk_time - start_time) * 1000
statsd.histogram(
'eden_ai.routing.latency',
routing_latency_ms,
tags=model_tags
)
# Total latency
total_latency_ms = (end_time - start_time) * 1000
statsd.histogram(
'eden_ai.routing.total_latency',
total_latency_ms,
tags=model_tags
)
# Response length
statsd.histogram(
'eden_ai.routing.response_length',
len(full_response),
tags=model_tags
)
# Success counter
statsd.increment(
'eden_ai.routing.success',
tags=model_tags
)
return {
"success": True,
"response": full_response,
"model": selected_model
}
except Exception as e:
# Record error
error_tags = base_tags + [f"error_type:{type(e).__name__}"]
statsd.increment(
'eden_ai.routing.errors',
tags=error_tags
)
return {
"success": False,
"error": str(e)
}
# Usage
monitor = DataDogRouterMonitor("YOUR_API_KEY")
result = monitor.chat(
"Explain AI",
candidates=["openai/gpt-4o", "anthropic/claude-sonnet-4-5"],
tags=["env:production", "service:chatbot"]
)
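These statsd calls assume a DogStatsD agent listening on 127.0.0.1:8125 (the host and port passed to initialize above); without a running agent the metrics are silently dropped. The datadog library also provides a statsd.timed helper, so a sketch of timing whole calls without manual timestamps might look like:

from datadog import statsd

@statsd.timed('eden_ai.routing.wall_time', tags=["service:chatbot"])
def monitored_chat(message: str):
    """Wall-clock timing for the full call, recorded automatically via DogStatsD."""
    return monitor.chat(message, candidates=["openai/gpt-4o", "anthropic/claude-sonnet-4-5"])

monitored_chat("Explain AI")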
Alerting and Notifications
Set Up Alerts for Routing Issues
Monitor critical metrics and send alerts:
import requests
import json
import time
from datetime import datetime, timedelta
from typing import List, Optional
from collections import deque
class RouterAlertManager:
"""Monitor routing and send alerts."""
def __init__(
self,
api_key: str,
        alert_webhook: Optional[str] = None,  # Slack/Discord webhook
        alert_email: Optional[str] = None
):
self.api_key = api_key
self.url = "https://api.edenai.run/v3/llm/chat/completions"
self.alert_webhook = alert_webhook
self.alert_email = alert_email
# Metrics windows (last N requests)
self.recent_latencies = deque(maxlen=100)
self.recent_errors = deque(maxlen=100)
self.error_count_5min = 0
self.last_error_reset = time.time()
# Alert thresholds
self.max_error_rate = 0.05 # 5%
self.max_avg_latency_ms = 2000
self.max_errors_5min = 10
def _send_alert(self, alert_type: str, message: str, severity: str = "warning"):
"""Send alert notification."""
alert = {
"type": alert_type,
"severity": severity,
"message": message,
"timestamp": datetime.now().isoformat()
}
print(f"🚨 ALERT [{severity.upper()}]: {message}")
# Send to webhook (Slack/Discord)
if self.alert_webhook:
try:
requests.post(
self.alert_webhook,
json={"text": f"🚨 {alert_type}: {message}"},
timeout=5
)
            except requests.RequestException:
                pass  # never let alert delivery failures break request handling
def _check_alerts(self):
"""Check if any alert conditions are met."""
# Reset error counter every 5 minutes
if time.time() - self.last_error_reset > 300:
self.error_count_5min = 0
self.last_error_reset = time.time()
# Check error rate
if len(self.recent_errors) >= 20:
error_rate = sum(self.recent_errors) / len(self.recent_errors)
if error_rate > self.max_error_rate:
self._send_alert(
"High Error Rate",
f"Routing error rate is {error_rate*100:.1f}% (threshold: {self.max_error_rate*100}%)",
severity="critical"
)
# Check 5-minute error spike
if self.error_count_5min >= self.max_errors_5min:
self._send_alert(
"Error Spike",
f"{self.error_count_5min} errors in last 5 minutes",
severity="critical"
)
# Check latency
if len(self.recent_latencies) >= 20:
avg_latency = sum(self.recent_latencies) / len(self.recent_latencies)
if avg_latency > self.max_avg_latency_ms:
self._send_alert(
"High Latency",
f"Average routing latency is {avg_latency:.0f}ms (threshold: {self.max_avg_latency_ms}ms)",
severity="warning"
)
    def chat(self, message: str, candidates: Optional[List[str]] = None) -> dict:
"""Chat with alerting."""
headers = {
"Authorization": f"Bearer {self.api_key}",
"Content-Type": "application/json"
}
payload = {
"model": "@edenai",
"messages": [{"role": "user", "content": message}],
"stream": True
}
if candidates:
payload["router_candidates"] = candidates
try:
start_time = time.time()
response = requests.post(
self.url,
headers=headers,
json=payload,
stream=True,
timeout=30
)
response.raise_for_status()
full_response = ""
selected_model = None
first_chunk_time = None
for line in response.iter_lines():
if line:
if not first_chunk_time:
first_chunk_time = time.time()
line_str = line.decode('utf-8')
if line_str.startswith('data: ') and line_str != 'data: [DONE]':
data = json.loads(line_str[6:])
if not selected_model and 'model' in data:
selected_model = data['model']
content = data.get('choices', [{}])[0].get('delta', {}).get('content', '')
full_response += content
end_time = time.time()
# Record metrics
if first_chunk_time:
latency_ms = (first_chunk_time - start_time) * 1000
self.recent_latencies.append(latency_ms)
self.recent_errors.append(0) # Success
# Check alerts
self._check_alerts()
return {
"success": True,
"response": full_response,
"model": selected_model
}
except Exception as e:
# Record error
self.recent_errors.append(1)
self.error_count_5min += 1
# Check alerts
self._check_alerts()
return {
"success": False,
"error": str(e)
}
# Usage
alert_manager = RouterAlertManager(
"YOUR_API_KEY",
alert_webhook="https://hooks.slack.com/services/YOUR/WEBHOOK/URL"
)
# Simulate requests (some will trigger alerts if thresholds are exceeded)
for i in range(50):
result = alert_manager.chat(
f"Question {i+1}",
candidates=["openai/gpt-4o", "anthropic/claude-sonnet-4-5"]
)
time.sleep(0.1)
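Because the alert logic only reads the deques and counters, you can exercise it offline before wiring it to live traffic. A minimal sketch that injects synthetic failures to confirm the error-rate alert fires (no API calls are made):

test_manager = RouterAlertManager("unused-key")  # no requests are sent below

# Simulate 20 requests with a 25% failure rate (threshold is 5%)
for i in range(20):
    test_manager.recent_errors.append(1 if i % 4 == 0 else 0)
    test_manager.recent_latencies.append(150.0)

test_manager._check_alerts()  # expected: a "High Error Rate" alert is printed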
Best Practices
Monitoring Checklist
✅ Essential Metrics
- Model selection distribution
- Routing latency (P50, P95, P99)
- Error rates and types
- Request volume over time
- Cost per model and total
✅ Latency Monitoring
- Track latency trends
- Monitor for latency regressions (see the sketch after this checklist)
- Alert on high latency (>2s routing time)
- Compare routed vs. fixed-model performance
✅ Cost Monitoring
- Track daily/weekly spending
- Monitor cost per model
- Alert on budget thresholds
- Analyze cost optimization opportunities
✅ Error Monitoring
- Track error rates by pool
- Monitor routing failures
- Set up alerts for error spikes
- Log errors for debugging
❌ Common Pitfalls
- Monitoring without actionable alerts
- Ignoring cost patterns
- Missing latency regressions
- Not tracking model distribution
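As a concrete example of regression detection, here is a minimal sketch (not part of any SDK; window sizes and the 1.5x threshold are illustrative) that compares a short recent latency window against a longer baseline:

from collections import deque

class LatencyRegressionDetector:
    """Flag routing-latency regressions by comparing a recent window to a baseline."""

    def __init__(self, baseline_size: int = 500, window_size: int = 50, ratio: float = 1.5):
        self.baseline = deque(maxlen=baseline_size)  # long-term history
        self.window = deque(maxlen=window_size)      # most recent requests
        self.ratio = ratio                           # alert if recent mean > ratio * baseline mean

    def record(self, latency_ms: float) -> bool:
        """Record one latency sample; return True if a regression is detected."""
        self.baseline.append(latency_ms)
        self.window.append(latency_ms)
        if len(self.baseline) < self.baseline.maxlen or len(self.window) < self.window.maxlen:
            return False  # not enough data yet
        baseline_mean = sum(self.baseline) / len(self.baseline)
        window_mean = sum(self.window) / len(self.window)
        return window_mean > self.ratio * baseline_mean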
Next Steps
- Getting Started - Learn router basics
- Advanced Usage - Master advanced patterns
- Cost Optimization Tutorial - Complete cost optimization guide
- LLM Smart Routing - Practical LLM examples