Monitoring Router Performance

Learn how to track, analyze, and optimize router performance in production applications.

Overview

Effective monitoring is essential for understanding how smart routing behaves in production. This guide covers tracking routing decisions, analyzing performance metrics, and optimizing based on real usage data.

What you’ll learn:
  • Tracking routing decisions and model selection
  • Measuring routing latency and overhead
  • Analyzing cost patterns
  • Identifying routing failures
  • Building monitoring dashboards
  • Optimizing based on metrics

Basic Monitoring

Tracking Selected Models

Track which models the router selects for your requests:
import requests
import json
from datetime import datetime
from typing import Optional

class RouterMonitor:
    """Simple router monitoring with tracking."""

    def __init__(self, api_key: str):
        self.api_key = api_key
        self.url = "https://api.edenai.run/v3/llm/chat/completions"
        self.routing_log = []

    def chat(
        self,
        message: str,
        candidates: Optional[list[str]] = None,
        metadata: Optional[dict] = None
    ) -> dict:
        """Chat with routing and tracking."""

        start_time = datetime.now()

        headers = {
            "Authorization": f"Bearer {self.api_key}",
            "Content-Type": "application/json"
        }

        payload = {
            "model": "@edenai",
            "messages": [{"role": "user", "content": message}],
            "stream": True
        }

        if candidates:
            payload["router_candidates"] = candidates

        response = requests.post(
            self.url,
            headers=headers,
            json=payload,
            stream=True
        )

        full_response = ""
        selected_model = None
        first_chunk_time = None

        for line in response.iter_lines():
            if line:
                if not first_chunk_time:
                    first_chunk_time = datetime.now()

                line_str = line.decode('utf-8')
                if line_str.startswith('data: ') and line_str != 'data: [DONE]':
                    data = json.loads(line_str[6:])

                    if not selected_model and 'model' in data:
                        selected_model = data['model']

                    content = data.get('choices', [{}])[0].get('delta', {}).get('content', '')
                    full_response += content

        end_time = datetime.now()

        # Calculate latencies
        routing_latency = (
            (first_chunk_time - start_time).total_seconds() * 1000
            if first_chunk_time else None
        )
        total_latency = (end_time - start_time).total_seconds() * 1000

        # Log routing decision
        log_entry = {
            "timestamp": start_time.isoformat(),
            "message": message[:100] + "..." if len(message) > 100 else message,
            "selected_model": selected_model,
            "candidates": candidates or "default",
            "routing_latency_ms": round(routing_latency, 2) if routing_latency else None,
            "total_latency_ms": round(total_latency, 2),
            "response_length": len(full_response),
            "metadata": metadata or {}
        }

        self.routing_log.append(log_entry)

        return {
            "response": full_response,
            "model": selected_model,
            "metrics": {
                "routing_latency_ms": log_entry["routing_latency_ms"],
                "total_latency_ms": log_entry["total_latency_ms"]
            }
        }

    def get_log(self, limit: Optional[int] = None) -> list[dict]:
        """Get routing log."""
        if limit:
            return self.routing_log[-limit:]
        return self.routing_log.copy()

    def print_summary(self):
        """Print routing summary."""
        if not self.routing_log:
            print("No routing data available")
            return

        from collections import Counter

        model_counts = Counter(entry["selected_model"] for entry in self.routing_log)
        total_requests = len(self.routing_log)

        # Average only over requests that actually recorded a routing latency
        routing_latencies = [
            entry["routing_latency_ms"]
            for entry in self.routing_log
            if entry["routing_latency_ms"] is not None
        ]
        avg_routing_latency = (
            sum(routing_latencies) / len(routing_latencies)
            if routing_latencies else 0.0
        )

        avg_total_latency = sum(
            entry["total_latency_ms"]
            for entry in self.routing_log
        ) / total_requests

        print("\n=== Routing Summary ===")
        print(f"Total requests: {total_requests}")
        print(f"\nModel distribution:")
        for model, count in model_counts.most_common():
            percentage = (count / total_requests) * 100
            print(f"  {model}: {count} ({percentage:.1f}%)")

        print(f"\nAverage routing latency: {avg_routing_latency:.0f}ms")
        print(f"Average total latency: {avg_total_latency:.0f}ms")

# Usage
monitor = RouterMonitor("YOUR_API_KEY")

# Make several requests
queries = [
    "What is Python?",
    "Explain machine learning",
    "Write a haiku about coding",
    "What's the capital of France?",
    "Describe quantum computing"
]

for query in queries:
    result = monitor.chat(
        query,
        candidates=["openai/gpt-4o", "anthropic/claude-sonnet-4-5"],
        metadata={"user_id": "user123", "session": "abc"}
    )
    print(f"Q: {query}")
    print(f"Model: {result['model']}")
    print(f"Routing latency: {result['metrics']['routing_latency_ms']}ms\n")

# Print summary
monitor.print_summary()
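
The routing log above lives in memory only. For dashboards or offline analysis you will usually want to persist it; here is a minimal sketch that writes each entry as JSON Lines (the routing_log.jsonl path is an arbitrary choice):
import json

def dump_routing_log(monitor: RouterMonitor, path: str = "routing_log.jsonl") -> None:
    """Write the in-memory routing log to a JSON Lines file (overwrites)."""
    with open(path, "w", encoding="utf-8") as f:
        for entry in monitor.get_log():
            f.write(json.dumps(entry) + "\n")

dump_routing_log(monitor)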

Advanced Monitoring

Comprehensive Metrics Collection

Collect detailed metrics for analysis:
import requests
import json
from datetime import datetime
from collections import defaultdict, Counter
from typing import Optional, Dict, List
import statistics

class RouterAnalytics:
    """Advanced router monitoring and analytics."""

    def __init__(self, api_key: str):
        self.api_key = api_key
        self.url = "https://api.edenai.run/v3/llm/chat/completions"

        # Detailed logs
        self.routing_events: List[Dict] = []
        self.error_log: List[Dict] = []

        # Metrics by model
        self.metrics_by_model = defaultdict(lambda: {
            "count": 0,
            "latencies": [],
            "response_lengths": [],
            "errors": 0
        })

        # Metrics by candidate pool
        self.metrics_by_pool = defaultdict(lambda: {
            "count": 0,
            "model_distribution": Counter()
        })

    def _pool_key(self, candidates: Optional[List[str]]) -> str:
        """Create key for candidate pool."""
        if not candidates:
            return "default"
        return "|".join(sorted(candidates))

    def chat(
        self,
        message: str,
        candidates: Optional[List[str]] = None,
        metadata: Optional[Dict] = None
    ) -> Dict:
        """Chat with comprehensive monitoring."""

        start_time = datetime.now()
        pool_key = self._pool_key(candidates)

        headers = {
            "Authorization": f"Bearer {self.api_key}",
            "Content-Type": "application/json"
        }

        payload = {
            "model": "@edenai",
            "messages": [{"role": "user", "content": message}],
            "stream": True
        }

        if candidates:
            payload["router_candidates"] = candidates

        try:
            response = requests.post(
                self.url,
                headers=headers,
                json=payload,
                stream=True,
                timeout=30
            )
            response.raise_for_status()

            full_response = ""
            selected_model = None
            first_chunk_time = None

            for line in response.iter_lines():
                if line:
                    if not first_chunk_time:
                        first_chunk_time = datetime.now()

                    line_str = line.decode('utf-8')
                    if line_str.startswith('data: ') and line_str != 'data: [DONE]':
                        data = json.loads(line_str[6:])

                        if not selected_model and 'model' in data:
                            selected_model = data['model']

                        content = data.get('choices', [{}])[0].get('delta', {}).get('content', '')
                        full_response += content

            end_time = datetime.now()

            # Calculate metrics
            routing_latency = (
                (first_chunk_time - start_time).total_seconds() * 1000
                if first_chunk_time else None
            )
            total_latency = (end_time - start_time).total_seconds() * 1000
            response_length = len(full_response)

            # Log event
            event = {
                "timestamp": start_time.isoformat(),
                "message": message[:100],
                "selected_model": selected_model,
                "candidates": candidates,
                "pool_key": pool_key,
                "routing_latency_ms": routing_latency,
                "total_latency_ms": total_latency,
                "response_length": response_length,
                "success": True,
                "metadata": metadata or {}
            }
            self.routing_events.append(event)

            # Update metrics
            if selected_model:
                self.metrics_by_model[selected_model]["count"] += 1
                if routing_latency:
                    self.metrics_by_model[selected_model]["latencies"].append(routing_latency)
                self.metrics_by_model[selected_model]["response_lengths"].append(response_length)

            self.metrics_by_pool[pool_key]["count"] += 1
            self.metrics_by_pool[pool_key]["model_distribution"][selected_model] += 1

            return {
                "success": True,
                "response": full_response,
                "model": selected_model,
                "metrics": {
                    "routing_latency_ms": routing_latency,
                    "total_latency_ms": total_latency,
                    "response_length": response_length
                }
            }

        except Exception as e:
            end_time = datetime.now()
            total_latency = (end_time - start_time).total_seconds() * 1000

            # Log error
            error_event = {
                "timestamp": start_time.isoformat(),
                "message": message[:100],
                "candidates": candidates,
                "pool_key": pool_key,
                "error": str(e),
                "latency_ms": total_latency,
                "metadata": metadata or {}
            }
            self.error_log.append(error_event)

            # Count the failed request against the pool (the model is unknown
            # on failure, so per-model error counts are not updated here)
            self.metrics_by_pool[pool_key]["count"] += 1

            return {
                "success": False,
                "error": str(e),
                "metrics": {
                    "latency_ms": total_latency
                }
            }

    def get_model_statistics(self) -> Dict:
        """Get statistics per model."""
        stats = {}

        for model, metrics in self.metrics_by_model.items():
            latencies = metrics["latencies"]
            response_lengths = metrics["response_lengths"]

            stats[model] = {
                "request_count": metrics["count"],
                "error_count": metrics["errors"],
                "avg_routing_latency_ms": (
                    round(statistics.mean(latencies), 2)
                    if latencies else None
                ),
                "p50_routing_latency_ms": (
                    round(statistics.median(latencies), 2)
                    if latencies else None
                ),
                "p95_routing_latency_ms": (
                    round(statistics.quantiles(latencies, n=20)[18], 2)
                    if len(latencies) > 10 else None
                ),
                "avg_response_length": (
                    round(statistics.mean(response_lengths), 2)
                    if response_lengths else None
                )
            }

        return stats

    def get_pool_statistics(self) -> Dict:
        """Get statistics per candidate pool."""
        stats = {}

        for pool_key, metrics in self.metrics_by_pool.items():
            total = metrics["count"]
            distribution = metrics["model_distribution"]

            stats[pool_key] = {
                "total_requests": total,
                "model_distribution": {
                    model: {
                        "count": count,
                        "percentage": round((count / total) * 100, 1)
                    }
                    for model, count in distribution.items()
                }
            }

        return stats

    def get_error_rate(self) -> float:
        """Calculate overall error rate."""
        total = len(self.routing_events) + len(self.error_log)
        if total == 0:
            return 0.0
        return (len(self.error_log) / total) * 100

    def print_report(self):
        """Print comprehensive analytics report."""
        print("\n" + "="*60)
        print("ROUTER ANALYTICS REPORT")
        print("="*60)

        # Overall metrics
        total_requests = len(self.routing_events) + len(self.error_log)
        successful_requests = len(self.routing_events)
        error_rate = self.get_error_rate()

        print(f"\n📊 Overall Metrics")
        print(f"  Total requests: {total_requests}")
        print(f"  Successful: {successful_requests}")
        print(f"  Failed: {len(self.error_log)}")
        print(f"  Error rate: {error_rate:.2f}%")

        # Model statistics
        print(f"\n🤖 Model Performance")
        model_stats = self.get_model_statistics()

        for model, stats in sorted(
            model_stats.items(),
            key=lambda x: x[1]["request_count"],
            reverse=True
        ):
            print(f"\n  {model}:")
            print(f"    Requests: {stats['request_count']}")
            print(f"    Avg routing latency: {stats['avg_routing_latency_ms']}ms")
            if stats['p50_routing_latency_ms']:
                print(f"    P50 latency: {stats['p50_routing_latency_ms']}ms")
            if stats['p95_routing_latency_ms']:
                print(f"    P95 latency: {stats['p95_routing_latency_ms']}ms")
            print(f"    Avg response length: {stats['avg_response_length']} chars")

        # Pool statistics
        print(f"\n🎯 Candidate Pool Performance")
        pool_stats = self.get_pool_statistics()

        for pool_key, stats in pool_stats.items():
            print(f"\n  Pool: {pool_key}")
            print(f"    Total requests: {stats['total_requests']}")
            print(f"    Model distribution:")
            for model, dist in stats["model_distribution"].items():
                print(f"      {model}: {dist['count']} ({dist['percentage']}%)")

        # Recent errors
        if self.error_log:
            print(f"\n❌ Recent Errors (last 5)")
            for error in self.error_log[-5:]:
                print(f"\n  {error['timestamp']}")
                print(f"    Message: {error['message']}")
                print(f"    Error: {error['error']}")
                print(f"    Pool: {error['pool_key']}")

        print("\n" + "="*60)

# Usage example
analytics = RouterAnalytics("YOUR_API_KEY")

# Simulate various requests
test_cases = [
    ("What is Python?", ["openai/gpt-4o", "anthropic/claude-sonnet-4-5"]),
    ("Explain quantum physics", ["anthropic/claude-opus-4-5", "openai/gpt-4o"]),
    ("Write a function", ["openai/gpt-4o", "anthropic/claude-sonnet-4-5"]),
    ("Tell me a joke", ["openai/gpt-4o-mini", "google/gemini-2.0-flash"]),
    ("Summarize AI trends", None),  # Default routing
]

for message, candidates in test_cases:
    result = analytics.chat(
        message,
        candidates=candidates,
        metadata={"test_case": True}
    )
    if result["success"]:
        print(f"✓ {message[:30]}... → {result['model']}")
    else:
        print(f"✗ {message[:30]}... → Error")

# Print full report
analytics.print_report()

Cost Tracking

Monitoring Routing Costs

Track estimated costs by model and over time:
import requests
import json
from datetime import datetime
from typing import Dict, List, Optional
from collections import defaultdict

class RouterCostTracker:
    """Track routing costs over time."""

    # Estimated costs per 1K tokens (input + output averaged)
    MODEL_COSTS = {
        "openai/gpt-4o-mini": 0.0002,
        "google/gemini-2.0-flash": 0.0002,
        "anthropic/claude-haiku-4-5": 0.0005,
        "openai/gpt-4o": 0.003,
        "anthropic/claude-sonnet-4-5": 0.004,
        "anthropic/claude-opus-4-5": 0.015,
        "google/gemini-2.5-pro": 0.008,
    }

    def __init__(self, api_key: str):
        self.api_key = api_key
        self.url = "https://api.edenai.run/v3/llm/chat/completions"

        # Cost tracking
        self.costs_by_model = defaultdict(float)
        self.costs_by_day = defaultdict(float)
        self.request_log: List[Dict] = []

    def estimate_tokens(self, text: str) -> int:
        """Rough token estimation (4 chars ≈ 1 token)."""
        return len(text) // 4

    def estimate_cost(self, model: str, input_text: str, output_text: str) -> float:
        """Estimate request cost."""
        total_tokens = self.estimate_tokens(input_text + output_text)
        cost_per_1k = self.MODEL_COSTS.get(model, 0.003)  # Default to gpt-4o cost
        return (total_tokens / 1000) * cost_per_1k

    def chat(self, message: str, candidates: Optional[List[str]] = None) -> Dict:
        """Chat with cost tracking."""
        timestamp = datetime.now()
        date_key = timestamp.date().isoformat()

        headers = {
            "Authorization": f"Bearer {self.api_key}",
            "Content-Type": "application/json"
        }

        payload = {
            "model": "@edenai",
            "messages": [{"role": "user", "content": message}],
            "stream": True
        }

        if candidates:
            payload["router_candidates"] = candidates

        response = requests.post(
            self.url,
            headers=headers,
            json=payload,
            stream=True
        )

        full_response = ""
        selected_model = None

        for line in response.iter_lines():
            if line:
                line_str = line.decode('utf-8')
                if line_str.startswith('data: ') and line_str != 'data: [DONE]':
                    data = json.loads(line_str[6:])

                    if not selected_model and 'model' in data:
                        selected_model = data['model']

                    content = data.get('choices', [{}])[0].get('delta', {}).get('content', '')
                    full_response += content

        # Calculate cost
        estimated_cost = self.estimate_cost(selected_model, message, full_response)

        # Track costs
        self.costs_by_model[selected_model] += estimated_cost
        self.costs_by_day[date_key] += estimated_cost

        # Log request
        log_entry = {
            "timestamp": timestamp.isoformat(),
            "date": date_key,
            "message": message[:100],
            "model": selected_model,
            "candidates": candidates,
            "estimated_cost": estimated_cost,
            "input_length": len(message),
            "output_length": len(full_response)
        }
        self.request_log.append(log_entry)

        return {
            "response": full_response,
            "model": selected_model,
            "cost": estimated_cost
        }

    def get_cost_summary(self) -> Dict:
        """Get cost summary."""
        total_cost = sum(self.costs_by_model.values())
        total_requests = len(self.request_log)

        return {
            "total_cost": round(total_cost, 4),
            "total_requests": total_requests,
            "avg_cost_per_request": round(total_cost / total_requests, 4) if total_requests > 0 else 0,
            "costs_by_model": {
                model: round(cost, 4)
                for model, cost in sorted(
                    self.costs_by_model.items(),
                    key=lambda x: x[1],
                    reverse=True
                )
            },
            "costs_by_day": {
                date: round(cost, 4)
                for date, cost in sorted(self.costs_by_day.items())
            }
        }

    def print_cost_report(self):
        """Print cost analysis report."""
        summary = self.get_cost_summary()

        print("\n" + "="*60)
        print("ROUTING COST ANALYSIS")
        print("="*60)

        print(f"\n💰 Overall")
        print(f"  Total cost: ${summary['total_cost']:.4f}")
        print(f"  Total requests: {summary['total_requests']}")
        print(f"  Average cost per request: ${summary['avg_cost_per_request']:.4f}")

        print(f"\n🤖 Cost by Model")
        for model, cost in summary['costs_by_model'].items():
            percentage = (cost / summary['total_cost']) * 100 if summary['total_cost'] > 0 else 0
            print(f"  {model}: ${cost:.4f} ({percentage:.1f}%)")

        print(f"\n📅 Cost by Day")
        for date, cost in summary['costs_by_day'].items():
            print(f"  {date}: ${cost:.4f}")

        # Cost optimization suggestions
        print(f"\n💡 Optimization Suggestions")
        most_expensive = max(
            summary['costs_by_model'].items(),
            key=lambda x: x[1]
        )[0] if summary['costs_by_model'] else None

        if most_expensive and "opus" in most_expensive.lower():
            print(f"  • Consider replacing {most_expensive} with Sonnet for cost savings")
        if summary['avg_cost_per_request'] > 0.01:
            print(f"  • Average cost is high - consider using budget-tier candidates")

        print("\n" + "="*60)

# Usage
tracker = RouterCostTracker("YOUR_API_KEY")

# Make requests
queries = [
    ("What is Python?", ["openai/gpt-4o", "anthropic/claude-sonnet-4-5"]),
    ("Explain quantum computing in detail", ["anthropic/claude-opus-4-5", "openai/gpt-4o"]),
    ("Hello", ["openai/gpt-4o-mini", "google/gemini-2.0-flash"]),
]

for message, candidates in queries:
    result = tracker.chat(message, candidates)
    print(f"Q: {message}")
    print(f"Model: {result['model']}, Cost: ${result['cost']:.4f}\n")

# Print cost report
tracker.print_cost_report()
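
The 4-characters-per-token heuristic in estimate_tokens is deliberately rough. Where pricing accuracy matters, you could swap in a real tokenizer; below is a sketch using the tiktoken package (assumptions: tiktoken is installed, and the cl100k_base encoding only approximates non-OpenAI tokenizers):
import tiktoken

# Hypothetical drop-in replacement for RouterCostTracker.estimate_tokens.
_ENCODING = tiktoken.get_encoding("cl100k_base")

def estimate_tokens_precise(text: str) -> int:
    """Count tokens with a real tokenizer instead of len(text) // 4."""
    return len(_ENCODING.encode(text))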

Integration with Monitoring Tools

Prometheus Metrics Export

Export metrics for Prometheus monitoring:
from prometheus_client import Counter, Histogram, Gauge, generate_latest
import requests
import json
import time
from typing import Optional, List

# Define Prometheus metrics
routing_requests_total = Counter(
    'eden_ai_routing_requests_total',
    'Total routing requests',
    ['model', 'candidate_pool']
)

routing_errors_total = Counter(
    'eden_ai_routing_errors_total',
    'Total routing errors',
    ['error_type']
)

routing_latency_seconds = Histogram(
    'eden_ai_routing_latency_seconds',
    'Routing decision latency in seconds',
    ['model']
)

total_latency_seconds = Histogram(
    'eden_ai_total_latency_seconds',
    'Total request latency in seconds',
    ['model']
)

response_length_bytes = Histogram(
    'eden_ai_response_length_bytes',
    'Response length in bytes',
    ['model']
)

active_models = Gauge(
    'eden_ai_active_models',
    'Number of different models being used'
)

class PrometheusRouterMonitor:
    """Router with Prometheus metrics export."""

    def __init__(self, api_key: str):
        self.api_key = api_key
        self.url = "https://api.edenai.run/v3/llm/chat/completions"
        self.active_models_set = set()

    def _pool_key(self, candidates: Optional[List[str]]) -> str:
        """Create key for candidate pool."""
        if not candidates:
            return "default"
        return ",".join(sorted(candidates)[:3])  # Limit to 3 for cardinality

    def chat(self, message: str, candidates: Optional[List[str]] = None) -> dict:
        """Chat with Prometheus metrics."""
        pool_key = self._pool_key(candidates)

        headers = {
            "Authorization": f"Bearer {self.api_key}",
            "Content-Type": "application/json"
        }

        payload = {
            "model": "@edenai",
            "messages": [{"role": "user", "content": message}],
            "stream": True
        }

        if candidates:
            payload["router_candidates"] = candidates

        try:
            start_time = time.time()

            response = requests.post(
                self.url,
                headers=headers,
                json=payload,
                stream=True,
                timeout=30
            )
            response.raise_for_status()

            full_response = ""
            selected_model = None
            first_chunk_time = None

            for line in response.iter_lines():
                if line:
                    if not first_chunk_time:
                        first_chunk_time = time.time()

                    line_str = line.decode('utf-8')
                    if line_str.startswith('data: ') and line_str != 'data: [DONE]':
                        data = json.loads(line_str[6:])

                        if not selected_model and 'model' in data:
                            selected_model = data['model']

                        content = data.get('choices', [{}])[0].get('delta', {}).get('content', '')
                        full_response += content

            end_time = time.time()

            # Record metrics
            routing_requests_total.labels(
                model=selected_model,
                candidate_pool=pool_key
            ).inc()

            if first_chunk_time:
                routing_latency = first_chunk_time - start_time
                routing_latency_seconds.labels(model=selected_model).observe(routing_latency)

            total_latency = end_time - start_time
            total_latency_seconds.labels(model=selected_model).observe(total_latency)

            response_length_bytes.labels(model=selected_model).observe(len(full_response))

            # Track active models
            self.active_models_set.add(selected_model)
            active_models.set(len(self.active_models_set))

            return {
                "success": True,
                "response": full_response,
                "model": selected_model
            }

        except Exception as e:
            # Record error
            error_type = type(e).__name__
            routing_errors_total.labels(error_type=error_type).inc()

            return {
                "success": False,
                "error": str(e)
            }

    def export_metrics(self) -> str:
        """Export metrics in Prometheus format."""
        return generate_latest().decode('utf-8')

# Usage
monitor = PrometheusRouterMonitor("YOUR_API_KEY")

# Make some requests
for i in range(10):
    result = monitor.chat(
        f"Question {i+1}: Explain AI",
        candidates=["openai/gpt-4o", "anthropic/claude-sonnet-4-5"]
    )

# Export metrics (serve this on /metrics endpoint)
metrics = monitor.export_metrics()
print(metrics)
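
To let Prometheus actually scrape these metrics, expose them over HTTP. A minimal sketch using prometheus_client's built-in start_http_server (the port and the request loop are assumptions; in a web app you would mount a /metrics route instead):
from prometheus_client import start_http_server
import time

if __name__ == "__main__":
    # Serves the default registry on http://localhost:8000/metrics
    start_http_server(8000)

    monitor = PrometheusRouterMonitor("YOUR_API_KEY")
    while True:
        monitor.chat(
            "Explain AI",
            candidates=["openai/gpt-4o", "anthropic/claude-sonnet-4-5"]
        )
        time.sleep(60)  # Pace requests; adjust to your workload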

DataDog Integration

Send routing metrics to DataDog:
from datadog import initialize, statsd
import requests
import json
import time
from typing import Optional, List

# Initialize DataDog
initialize(
    statsd_host='127.0.0.1',
    statsd_port=8125
)

class DataDogRouterMonitor:
    """Router with DataDog metrics."""

    def __init__(self, api_key: str):
        self.api_key = api_key
        self.url = "https://api.edenai.run/v3/llm/chat/completions"

    def chat(
        self,
        message: str,
        candidates: Optional[List[str]] = None,
        tags: Optional[List[str]] = None
    ) -> dict:
        """Chat with DataDog metrics."""

        base_tags = list(tags) if tags else []  # Copy so we don't mutate the caller's list

        headers = {
            "Authorization": f"Bearer {self.api_key}",
            "Content-Type": "application/json"
        }

        payload = {
            "model": "@edenai",
            "messages": [{"role": "user", "content": message}],
            "stream": True
        }

        if candidates:
            payload["router_candidates"] = candidates
            base_tags.append(f"candidate_count:{len(candidates)}")

        try:
            start_time = time.time()

            # Increment request counter
            statsd.increment(
                'eden_ai.routing.requests',
                tags=base_tags
            )

            response = requests.post(
                self.url,
                headers=headers,
                json=payload,
                stream=True,
                timeout=30
            )
            response.raise_for_status()

            full_response = ""
            selected_model = None
            first_chunk_time = None

            for line in response.iter_lines():
                if line:
                    if not first_chunk_time:
                        first_chunk_time = time.time()

                    line_str = line.decode('utf-8')
                    if line_str.startswith('data: ') and line_str != 'data: [DONE]':
                        data = json.loads(line_str[6:])

                        if not selected_model and 'model' in data:
                            selected_model = data['model']

                        content = data.get('choices', [{}])[0].get('delta', {}).get('content', '')
                        full_response += content

            end_time = time.time()

            # Record metrics with model tag
            model_tags = base_tags + [f"model:{selected_model}"]

            # Routing latency
            if first_chunk_time:
                routing_latency_ms = (first_chunk_time - start_time) * 1000
                statsd.histogram(
                    'eden_ai.routing.latency',
                    routing_latency_ms,
                    tags=model_tags
                )

            # Total latency
            total_latency_ms = (end_time - start_time) * 1000
            statsd.histogram(
                'eden_ai.routing.total_latency',
                total_latency_ms,
                tags=model_tags
            )

            # Response length
            statsd.histogram(
                'eden_ai.routing.response_length',
                len(full_response),
                tags=model_tags
            )

            # Success counter
            statsd.increment(
                'eden_ai.routing.success',
                tags=model_tags
            )

            return {
                "success": True,
                "response": full_response,
                "model": selected_model
            }

        except Exception as e:
            # Record error
            error_tags = base_tags + [f"error_type:{type(e).__name__}"]
            statsd.increment(
                'eden_ai.routing.errors',
                tags=error_tags
            )

            return {
                "success": False,
                "error": str(e)
            }

# Usage
monitor = DataDogRouterMonitor("YOUR_API_KEY")

result = monitor.chat(
    "Explain AI",
    candidates=["openai/gpt-4o", "anthropic/claude-sonnet-4-5"],
    tags=["env:production", "service:chatbot"]
)

Alerting and Notifications

Set Up Alerts for Routing Issues

Monitor critical metrics and send alerts:
import requests
import json
import time
from datetime import datetime
from typing import Dict, List, Optional
from collections import deque

class RouterAlertManager:
    """Monitor routing and send alerts."""

    def __init__(
        self,
        api_key: str,
        alert_webhook: Optional[str] = None,  # Slack/Discord webhook
        alert_email: Optional[str] = None  # Reserved for email delivery (not implemented below)
    ):
        self.api_key = api_key
        self.url = "https://api.edenai.run/v3/llm/chat/completions"
        self.alert_webhook = alert_webhook
        self.alert_email = alert_email

        # Metrics windows (last N requests)
        self.recent_latencies = deque(maxlen=100)
        self.recent_errors = deque(maxlen=100)
        self.error_count_5min = 0
        self.last_error_reset = time.time()

        # Alert thresholds
        self.max_error_rate = 0.05  # 5%
        self.max_avg_latency_ms = 2000
        self.max_errors_5min = 10

    def _send_alert(self, alert_type: str, message: str, severity: str = "warning"):
        """Send alert notification."""
        alert = {
            "type": alert_type,
            "severity": severity,
            "message": message,
            "timestamp": datetime.now().isoformat()
        }

        print(f"🚨 ALERT [{severity.upper()}]: {message}")

        # Send to webhook (Slack/Discord)
        if self.alert_webhook:
            try:
                requests.post(
                    self.alert_webhook,
                    json={"text": f"🚨 {alert_type}: {message}"},
                    timeout=5
                )
            except requests.RequestException:
                pass  # Never let alert delivery failures break request handling

    def _check_alerts(self):
        """Check if any alert conditions are met."""
        # Reset error counter every 5 minutes
        if time.time() - self.last_error_reset > 300:
            self.error_count_5min = 0
            self.last_error_reset = time.time()

        # Check error rate
        if len(self.recent_errors) >= 20:
            error_rate = sum(self.recent_errors) / len(self.recent_errors)
            if error_rate > self.max_error_rate:
                self._send_alert(
                    "High Error Rate",
                    f"Routing error rate is {error_rate*100:.1f}% (threshold: {self.max_error_rate*100}%)",
                    severity="critical"
                )

        # Check 5-minute error spike
        if self.error_count_5min >= self.max_errors_5min:
            self._send_alert(
                "Error Spike",
                f"{self.error_count_5min} errors in last 5 minutes",
                severity="critical"
            )

        # Check latency
        if len(self.recent_latencies) >= 20:
            avg_latency = sum(self.recent_latencies) / len(self.recent_latencies)
            if avg_latency > self.max_avg_latency_ms:
                self._send_alert(
                    "High Latency",
                    f"Average routing latency is {avg_latency:.0f}ms (threshold: {self.max_avg_latency_ms}ms)",
                    severity="warning"
                )

    def chat(self, message: str, candidates: Optional[List[str]] = None) -> dict:
        """Chat with alerting."""
        headers = {
            "Authorization": f"Bearer {self.api_key}",
            "Content-Type": "application/json"
        }

        payload = {
            "model": "@edenai",
            "messages": [{"role": "user", "content": message}],
            "stream": True
        }

        if candidates:
            payload["router_candidates"] = candidates

        try:
            start_time = time.time()

            response = requests.post(
                self.url,
                headers=headers,
                json=payload,
                stream=True,
                timeout=30
            )
            response.raise_for_status()

            full_response = ""
            selected_model = None
            first_chunk_time = None

            for line in response.iter_lines():
                if line:
                    if not first_chunk_time:
                        first_chunk_time = time.time()

                    line_str = line.decode('utf-8')
                    if line_str.startswith('data: ') and line_str != 'data: [DONE]':
                        data = json.loads(line_str[6:])

                        if not selected_model and 'model' in data:
                            selected_model = data['model']

                        content = data.get('choices', [{}])[0].get('delta', {}).get('content', '')
                        full_response += content

            end_time = time.time()

            # Record metrics
            if first_chunk_time:
                latency_ms = (first_chunk_time - start_time) * 1000
                self.recent_latencies.append(latency_ms)

            self.recent_errors.append(0)  # Success

            # Check alerts
            self._check_alerts()

            return {
                "success": True,
                "response": full_response,
                "model": selected_model
            }

        except Exception as e:
            # Record error
            self.recent_errors.append(1)
            self.error_count_5min += 1

            # Check alerts
            self._check_alerts()

            return {
                "success": False,
                "error": str(e)
            }

# Usage
alert_manager = RouterAlertManager(
    "YOUR_API_KEY",
    alert_webhook="https://hooks.slack.com/services/YOUR/WEBHOOK/URL"
)

# Simulate requests (some will trigger alerts if thresholds are exceeded)
for i in range(50):
    result = alert_manager.chat(
        f"Question {i+1}",
        candidates=["openai/gpt-4o", "anthropic/claude-sonnet-4-5"]
    )
    time.sleep(0.1)

Best Practices

Monitoring Checklist

Essential Metrics
  • Model selection distribution
  • Routing latency (P50, P95, P99)
  • Error rates and types
  • Request volume over time
  • Cost per model and total

Performance Monitoring
  • Track latency trends
  • Monitor for latency regressions
  • Alert on high latency (>2s routing time)
  • Compare routed vs. fixed-model performance

Cost Monitoring
  • Track daily/weekly spending
  • Monitor cost per model
  • Alert on budget thresholds (see the sketch after this checklist)
  • Analyze cost-optimization opportunities

Error Monitoring
  • Track error rates by pool
  • Monitor routing failures
  • Set up alerts for error spikes
  • Log errors for debugging

Avoid
  • Monitoring without actionable alerts
  • Ignoring cost patterns
  • Missing latency regressions
  • Not tracking model distribution
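
For the budget-threshold alert above, a minimal sketch built on RouterCostTracker (the $5.00 daily budget and the webhook URL are assumptions):
import requests
from datetime import datetime

DAILY_BUDGET_USD = 5.00  # Assumed limit; set to your own budget

def check_daily_budget(tracker: RouterCostTracker, webhook_url: str) -> None:
    """Post a webhook alert if today's estimated spend exceeds the budget."""
    today = datetime.now().date().isoformat()
    spent = tracker.costs_by_day.get(today, 0.0)
    if spent > DAILY_BUDGET_USD:
        requests.post(
            webhook_url,
            json={"text": f"🚨 Daily routing spend ${spent:.4f} exceeds ${DAILY_BUDGET_USD:.2f} budget"},
            timeout=5
        )

check_daily_budget(tracker, "https://hooks.slack.com/services/YOUR/WEBHOOK/URL")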

Next Steps