Track and Optimize API Spending
Build a complete cost monitoring and optimization system to track your Eden AI API usage, identify expensive operations, and stay within budget.
What You’ll Build
By the end of this tutorial, you’ll have:
- Cost Monitoring Dashboard - Real-time view of your API spending
- Automated Budget Alerts - Email/Slack notifications when approaching limits
- Provider Comparison Analysis - Identify the most cost-effective AI providers
- Usage Optimization - Spot and reduce expensive API patterns
Prerequisites
- Python 3.8 or higher
- Eden AI API key
- Basic understanding of REST APIs
- Optional: PostgreSQL or SQLite for persistent storage
Problem Statement
As your Eden AI usage grows, you need to:
- Track spending across multiple providers and features
- Set budgets and receive alerts before overspending
- Optimize costs by identifying expensive operations
- Compare providers to find the best value for your use case
- Generate reports for stakeholders
Architecture Overview
┌─────────────────────────────────────────────────────────────┐
│                      Your Application                       │
│  ┌────────────┐    ┌────────────┐    ┌──────────────┐       │
│  │    Cost    │───▶│   Budget   │───▶│    Alert     │       │
│  │  Monitor   │    │  Tracker   │    │    System    │       │
│  └────────────┘    └────────────┘    └──────────────┘       │
│        │                 │                  │               │
│        ▼                 ▼                  ▼               │
│  ┌────────────────────────────────────────────────────┐     │
│  │               Local Database / Cache               │     │
│  └────────────────────────────────────────────────────┘     │
└─────────────────────────────────────────────────────────────┘
                               │
                               ▼
                   ┌───────────────────────┐
                   │   Eden AI Cost API    │
                   └───────────────────────┘
Step 1: Set Up Cost Monitoring Client
Create a Python client to interact with the Eden AI Cost Monitoring API:
import requests
from datetime import datetime, timedelta
from typing import Dict, List, Optional
import json


class EdenAICostMonitor:
    """Client for Eden AI Cost Monitoring API"""

    BASE_URL = "https://api.edenai.run/v2/cost_management"

    def __init__(self, api_key: str):
        """
        Initialize the cost monitor.
        Args:
            api_key: Your Eden AI API key
        """
        self.api_key = api_key
        self.headers = {"Authorization": f"Bearer {api_key}"}

    def get_credits(self) -> float:
        """
        Get current account credit balance.
        Returns:
            Current credit balance as float
        """
        response = requests.get(
            f"{self.BASE_URL}/credits/",
            headers=self.headers
        )
        response.raise_for_status()
        return response.json()['credits']

    def get_usage(
        self,
        begin: str,
        end: str,
        step: int = 1,
        provider: Optional[str] = None,
        subfeature: Optional[str] = None,
        token: Optional[str] = None
    ) -> Dict:
        """
        Get usage and cost data for a date range.
        Args:
            begin: Start date (YYYY-MM-DD)
            end: End date (YYYY-MM-DD)
            step: Aggregation period (1=daily, 2=weekly, 3=monthly, 4=yearly)
            provider: Filter by provider (optional)
            subfeature: Filter by subfeature (optional)
            token: Filter by token (optional)
        Returns:
            Usage data dictionary
        """
        params = {
            "begin": begin,
            "end": end,
            "step": step
        }
        if provider:
            params["provider"] = provider
        if subfeature:
            params["subfeature"] = subfeature
        if token:
            params["token"] = token
        response = requests.get(
            f"{self.BASE_URL}/",
            headers=self.headers,
            params=params
        )
        response.raise_for_status()
        return response.json()

    def get_date_range_cost(
        self,
        begin: str,
        end: str,
        provider: Optional[str] = None
    ) -> float:
        """
        Calculate total cost for a date range.
        Args:
            begin: Start date (YYYY-MM-DD)
            end: End date (YYYY-MM-DD)
            provider: Filter by provider (optional)
        Returns:
            Total cost as float
        """
        data = self.get_usage(begin, end, step=1, provider=provider)
        total_cost = 0.0
        for token_data in data['response']:
            for date, features in token_data['data'].items():
                for feature, details in features.items():
                    total_cost += details['total_cost']
        return total_cost

    def get_provider_breakdown(
        self,
        begin: str,
        end: str
    ) -> Dict[str, float]:
        """
        Get cost breakdown by provider.
        Args:
            begin: Start date (YYYY-MM-DD)
            end: End date (YYYY-MM-DD)
        Returns:
            Dictionary mapping provider names to costs
        """
        data = self.get_usage(begin, end, step=1)
        provider_costs = {}
        for token_data in data['response']:
            for date, features in token_data['data'].items():
                for feature, details in features.items():
                    for provider, cost in details['cost_per_provider'].items():
                        provider_costs[provider] = (
                            provider_costs.get(provider, 0.0) + cost
                        )
        return provider_costs

    def get_feature_breakdown(
        self,
        begin: str,
        end: str
    ) -> Dict[str, Dict]:
        """
        Get detailed breakdown by feature.
        Args:
            begin: Start date (YYYY-MM-DD)
            end: End date (YYYY-MM-DD)
        Returns:
            Dictionary with feature costs and call counts
        """
        data = self.get_usage(begin, end, step=1)
        feature_stats = {}
        for token_data in data['response']:
            for date, features in token_data['data'].items():
                for feature, details in features.items():
                    if feature not in feature_stats:
                        feature_stats[feature] = {
                            'total_cost': 0.0,
                            'total_calls': 0
                        }
                    feature_stats[feature]['total_cost'] += details['total_cost']
                    feature_stats[feature]['total_calls'] += details['details']
        return feature_stats
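The parsing loops in this client imply a nested response shape: a `response` list with one entry per token, each holding a `data` mapping of date to feature to cost details. The sketch below runs the same aggregation against a hand-written sample payload, so the logic can be checked without calling the API. The payload is invented for illustration (the `token` field name, dates, and amounts are assumptions) and mirrors only the keys the client reads; the real response may carry more fields.

```python
from typing import Dict

# Hypothetical payload: mirrors only the keys the client code reads.
SAMPLE_USAGE = {
    "response": [
        {
            "token": "example-token",  # assumed field name, not read below
            "data": {
                "2024-01-01": {
                    "text__chat": {
                        "total_cost": 1.50,
                        "details": 30,  # read as the call count by get_feature_breakdown
                        "cost_per_provider": {"openai": 1.00, "anthropic": 0.50},
                    }
                },
                "2024-01-02": {
                    "ocr__ocr": {
                        "total_cost": 0.25,
                        "details": 5,
                        "cost_per_provider": {"google": 0.25},
                    }
                },
            },
        }
    ]
}


def total_cost(usage: Dict) -> float:
    """Sum total_cost over every token, date, and feature."""
    return sum(
        details["total_cost"]
        for token_data in usage["response"]
        for features in token_data["data"].values()
        for details in features.values()
    )


def cost_by_provider(usage: Dict) -> Dict[str, float]:
    """Accumulate cost_per_provider entries across the whole payload."""
    totals: Dict[str, float] = {}
    for token_data in usage["response"]:
        for features in token_data["data"].values():
            for details in features.values():
                for provider, cost in details["cost_per_provider"].items():
                    totals[provider] = totals.get(provider, 0.0) + cost
    return totals


print(total_cost(SAMPLE_USAGE))
print(cost_by_provider(SAMPLE_USAGE))
```

Keeping the aggregation in pure functions like these makes it easy to unit test separately from the HTTP calls.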
Step 2: Fetch and Cache Daily Usage Data
Implement caching to reduce API calls and improve performance:
import sqlite3
import json
from datetime import datetime, timedelta
from typing import Optional, Dict


class CostCache:
    """SQLite-based cache for cost data"""

    def __init__(self, db_path: str = "cost_cache.db"):
        """Initialize cache database"""
        self.conn = sqlite3.connect(db_path)
        self.create_tables()

    def create_tables(self):
        """Create cache tables if they don't exist"""
        cursor = self.conn.cursor()
        cursor.execute("""
            CREATE TABLE IF NOT EXISTS daily_costs (
                date TEXT PRIMARY KEY,
                data TEXT NOT NULL,
                fetched_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
            )
        """)
        cursor.execute("""
            CREATE TABLE IF NOT EXISTS provider_costs (
                provider TEXT,
                date TEXT,
                cost REAL,
                PRIMARY KEY (provider, date)
            )
        """)
        self.conn.commit()

    def cache_daily_data(self, date: str, data: Dict):
        """
        Cache daily cost data.
        Args:
            date: Date string (YYYY-MM-DD)
            data: Cost data dictionary
        """
        cursor = self.conn.cursor()
        # Store the timestamp as ISO 8601 text: sqlite3's implicit datetime
        # adapter is deprecated since Python 3.12, and get_cached_data
        # parses the value with datetime.fromisoformat.
        cursor.execute("""
            INSERT OR REPLACE INTO daily_costs (date, data, fetched_at)
            VALUES (?, ?, ?)
        """, (date, json.dumps(data), datetime.now().isoformat()))
        self.conn.commit()

    def get_cached_data(self, date: str, max_age_hours: int = 24) -> Optional[Dict]:
        """
        Get cached data if it's fresh enough.
        Args:
            date: Date string (YYYY-MM-DD)
            max_age_hours: Maximum cache age in hours
        Returns:
            Cached data or None if not found/expired
        """
        cursor = self.conn.cursor()
        cursor.execute("""
            SELECT data, fetched_at
            FROM daily_costs
            WHERE date = ?
        """, (date,))
        row = cursor.fetchone()
        if not row:
            return None
        data, fetched_at = row
        fetched_time = datetime.fromisoformat(fetched_at)
        if datetime.now() - fetched_time > timedelta(hours=max_age_hours):
            return None  # Cache expired
        return json.loads(data)

    def cache_provider_cost(self, provider: str, date: str, cost: float):
        """Cache provider-specific cost"""
        cursor = self.conn.cursor()
        cursor.execute("""
            INSERT OR REPLACE INTO provider_costs (provider, date, cost)
            VALUES (?, ?, ?)
        """, (provider, date, cost))
        self.conn.commit()

    def get_provider_total(self, provider: str, begin: str, end: str) -> float:
        """Get total cost for a provider in date range"""
        cursor = self.conn.cursor()
        cursor.execute("""
            SELECT SUM(cost)
            FROM provider_costs
            WHERE provider = ? AND date BETWEEN ? AND ?
        """, (provider, begin, end))
        result = cursor.fetchone()[0]
        return result or 0.0

    def close(self):
        """Close database connection"""
        self.conn.close()
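The cache class stores and retrieves data, but the tutorial does not show the step that decides between the cache and the API. One way to wire the two together is a cache-aside helper: return the cached entry if it is fresh, otherwise fetch and store it. The following is a minimal sketch under stated assumptions: `get_usage_cached` and `fake_fetch` are hypothetical names, the table is a simplified copy of `daily_costs` with `fetched_at` stored as text, and the fetcher stands in for something like `lambda d: monitor.get_usage(d, d)`.

```python
import json
import sqlite3
from datetime import datetime, timedelta
from typing import Callable, Dict, List


def get_usage_cached(
    conn: sqlite3.Connection,
    date: str,
    fetch: Callable[[str], Dict],
    max_age_hours: int = 24,
) -> Dict:
    """Return cached data for `date` if fresh, otherwise fetch and store it."""
    conn.execute(
        "CREATE TABLE IF NOT EXISTS daily_costs ("
        "date TEXT PRIMARY KEY, data TEXT NOT NULL, fetched_at TEXT NOT NULL)"
    )
    row = conn.execute(
        "SELECT data, fetched_at FROM daily_costs WHERE date = ?", (date,)
    ).fetchone()
    if row is not None:
        age = datetime.now() - datetime.fromisoformat(row[1])
        if age <= timedelta(hours=max_age_hours):
            return json.loads(row[0])  # cache hit

    data = fetch(date)  # cache miss or stale entry: call the API
    conn.execute(
        "INSERT OR REPLACE INTO daily_costs (date, data, fetched_at) VALUES (?, ?, ?)",
        (date, json.dumps(data), datetime.now().isoformat()),
    )
    conn.commit()
    return data


# Usage with a stand-in fetcher that records how often it is called.
calls: List[str] = []


def fake_fetch(date: str) -> Dict:
    calls.append(date)
    return {"response": [], "date": date}


conn = sqlite3.connect(":memory:")
first = get_usage_cached(conn, "2024-01-01", fake_fetch)
second = get_usage_cached(conn, "2024-01-01", fake_fetch)
print(len(calls))  # number of real fetches made
```

A short `max_age_hours` suits the current day, whose totals are still changing; completed days could reasonably be cached for much longer.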
Step 3: Analyze Costs by Provider and Feature
Create analysis tools to understand your spending patterns:
from cost_monitor import EdenAICostMonitor
from datetime import datetime, timedelta
from typing import Dict, List, Tuple


class CostAnalyzer:
    """Analyze and report on API costs"""

    def __init__(self, monitor: EdenAICostMonitor):
        self.monitor = monitor

    def compare_providers(
        self,
        begin: str,
        end: str
    ) -> List[Tuple[str, float, float]]:
        """
        Compare costs across providers.
        Returns:
            List of (provider, cost, percentage) tuples
        """
        provider_costs = self.monitor.get_provider_breakdown(begin, end)
        total_cost = sum(provider_costs.values())
        results = [
            (provider, cost, (cost / total_cost * 100) if total_cost > 0 else 0)
            for provider, cost in provider_costs.items()
        ]
        # Sort by cost (highest first)
        return sorted(results, key=lambda x: x[1], reverse=True)

    def find_most_expensive_features(
        self,
        begin: str,
        end: str,
        top_n: int = 5
    ) -> List[Tuple[str, float, int]]:
        """
        Find the most expensive features.
        Args:
            begin: Start date
            end: End date
            top_n: Number of top features to return
        Returns:
            List of (feature, cost, num_calls) tuples
        """
        features = self.monitor.get_feature_breakdown(begin, end)
        results = [
            (feature, stats['total_cost'], stats['total_calls'])
            for feature, stats in features.items()
        ]
        return sorted(results, key=lambda x: x[1], reverse=True)[:top_n]

    def calculate_cost_per_call(
        self,
        begin: str,
        end: str
    ) -> Dict[str, float]:
        """
        Calculate average cost per API call for each feature.
        Returns:
            Dictionary mapping feature to cost-per-call
        """
        features = self.monitor.get_feature_breakdown(begin, end)
        return {
            feature: (
                stats['total_cost'] / stats['total_calls']
                if stats['total_calls'] > 0 else 0
            )
            for feature, stats in features.items()
        }

    def get_daily_trend(
        self,
        begin: str,
        end: str
    ) -> List[Tuple[str, float]]:
        """
        Get daily cost trend.
        Returns:
            List of (date, cost) tuples
        """
        data = self.monitor.get_usage(begin, end, step=1)
        daily_costs = {}
        for token_data in data['response']:
            for date, features in token_data['data'].items():
                daily_total = sum(
                    details['total_cost']
                    for details in features.values()
                )
                daily_costs[date] = daily_costs.get(date, 0.0) + daily_total
        return sorted(daily_costs.items())

    def print_cost_report(self, begin: str, end: str):
        """Print a formatted cost report"""
        print("=" * 60)
        print(f"Cost Report: {begin} to {end}")
        print("=" * 60)
        print()
        # Total cost
        total_cost = self.monitor.get_date_range_cost(begin, end)
        print(f"Total Cost: ${total_cost:.2f}")
        print()
        # Provider breakdown
        print("Cost by Provider:")
        print("-" * 60)
        for provider, cost, pct in self.compare_providers(begin, end):
            print(f"{provider:20s} ${cost:>10.2f} ({pct:>5.1f}%)")
        print()
        # Most expensive features
        print("Top 5 Most Expensive Features:")
        print("-" * 60)
        for feature, cost, calls in self.find_most_expensive_features(begin, end):
            avg_cost = cost / calls if calls > 0 else 0
            print(f"{feature:30s} ${cost:>8.2f} ({calls:>6,} calls, ${avg_cost:.4f}/call)")
        print()
        # Current credits
        credits = self.monitor.get_credits()
        print(f"Current Credits: ${credits:.2f}")
        print("=" * 60)
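To make the share calculation in `compare_providers` concrete, the sketch below applies the same formula to a fixed dictionary instead of a live API response. The function name `provider_shares` is hypothetical, and the provider amounts are the ones shown in this tutorial's example output.

```python
from typing import Dict, List, Tuple


def provider_shares(costs: Dict[str, float]) -> List[Tuple[str, float, float]]:
    """Return (provider, cost, percent of total) rows, most expensive first."""
    total = sum(costs.values())
    rows = [
        (name, cost, (cost / total * 100) if total > 0 else 0.0)
        for name, cost in costs.items()
    ]
    return sorted(rows, key=lambda row: row[1], reverse=True)


# Amounts taken from the example output at the end of this tutorial.
sample = {"anthropic": 28.12, "google": 7.99, "openai": 198.45}
for name, cost, pct in provider_shares(sample):
    print(f"{name:20s} ${cost:>10.2f} ({pct:>5.1f}%)")
```

The `total > 0` guard matters for date ranges with no usage, where every share would otherwise be a division by zero.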
Step 4: Generate Cost Reports and Visualizations
Create visual reports to understand trends:
from cost_analyzer import CostAnalyzer
import matplotlib.pyplot as plt
from datetime import datetime
from typing import Optional


class CostVisualizer:
    """Generate visualizations for cost data"""

    def __init__(self, analyzer: CostAnalyzer):
        self.analyzer = analyzer

    def plot_daily_trend(self, begin: str, end: str, save_path: Optional[str] = None):
        """Plot daily cost trend"""
        trend = self.analyzer.get_daily_trend(begin, end)
        dates = [datetime.strptime(d, "%Y-%m-%d") for d, _ in trend]
        costs = [c for _, c in trend]
        plt.figure(figsize=(12, 6))
        plt.plot(dates, costs, marker='o', linestyle='-', linewidth=2)
        plt.xlabel('Date')
        plt.ylabel('Cost ($)')
        plt.title('Daily API Cost Trend')
        plt.grid(True, alpha=0.3)
        plt.xticks(rotation=45)
        plt.tight_layout()
        if save_path:
            plt.savefig(save_path)
        else:
            plt.show()
        plt.close()  # Release the figure so repeated runs do not accumulate open figures

    def plot_provider_breakdown(self, begin: str, end: str, save_path: Optional[str] = None):
        """Plot provider cost breakdown as pie chart"""
        providers = self.analyzer.compare_providers(begin, end)
        labels = [p for p, _, _ in providers]
        costs = [c for _, c, _ in providers]
        plt.figure(figsize=(10, 8))
        plt.pie(costs, labels=labels, autopct='%1.1f%%', startangle=90)
        plt.title(f'Cost Distribution by Provider\n{begin} to {end}')
        plt.axis('equal')
        if save_path:
            plt.savefig(save_path)
        else:
            plt.show()
        plt.close()

    def plot_feature_costs(self, begin: str, end: str, save_path: Optional[str] = None):
        """Plot top features by cost as horizontal bar chart"""
        features = self.analyzer.find_most_expensive_features(begin, end, top_n=10)
        names = [f.replace('__', '\n') for f, _, _ in features]
        costs = [c for _, c, _ in features]
        plt.figure(figsize=(10, 8))
        plt.barh(names, costs)
        plt.xlabel('Cost ($)')
        plt.title(f'Top 10 Features by Cost\n{begin} to {end}')
        plt.tight_layout()
        if save_path:
            plt.savefig(save_path)
        else:
            plt.show()
        plt.close()
Step 5: Implement Budget Alerts
Add automated alerts when approaching budget limits:
import smtplib
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
from datetime import datetime, timedelta
from typing import Optional, Sequence
from cost_monitor import EdenAICostMonitor


class BudgetAlertSystem:
    """Monitor budgets and send alerts"""

    def __init__(
        self,
        monitor: EdenAICostMonitor,
        monthly_budget: float,
        alert_thresholds: Sequence[float] = (0.5, 0.75, 0.9),
        email_config: Optional[dict] = None
    ):
        """
        Initialize budget alert system.
        Args:
            monitor: EdenAICostMonitor instance
            monthly_budget: Monthly budget limit in dollars
            alert_thresholds: List of budget percentage thresholds (0.0-1.0)
            email_config: Email configuration dict with smtp_server, smtp_port,
                from_email, to_emails, password
        """
        if monthly_budget <= 0:
            # check_budget divides by the budget, so reject zero or negative values early
            raise ValueError("monthly_budget must be greater than zero")
        self.monitor = monitor
        self.monthly_budget = monthly_budget
        self.alert_thresholds = sorted(alert_thresholds)
        self.email_config = email_config
        self.triggered_alerts = set()

    def check_budget(self) -> dict:
        """
        Check current month's spending against budget.
        Returns:
            Dict with budget status information
        """
        # Get current month's date range
        today = datetime.now()
        first_day = today.replace(day=1).strftime("%Y-%m-%d")
        current_day = today.strftime("%Y-%m-%d")
        # Calculate spending
        month_spending = self.monitor.get_date_range_cost(first_day, current_day)
        remaining = self.monthly_budget - month_spending
        percentage_used = (month_spending / self.monthly_budget) * 100
        # Project month-end spending
        days_elapsed = today.day
        # Day 28 plus 4 days always lands in the next month; stepping back one
        # day from the first of that month gives the last day of this month.
        last_day = (today.replace(day=28) + timedelta(days=4)).replace(day=1) - timedelta(days=1)
        days_in_month = last_day.day
        if days_elapsed > 0:
            daily_average = month_spending / days_elapsed
            projected_total = daily_average * days_in_month
        else:
            projected_total = 0
        return {
            'budget': self.monthly_budget,
            'spent': month_spending,
            'remaining': remaining,
            'percentage_used': percentage_used,
            'days_elapsed': days_elapsed,
            'days_in_month': days_in_month,
            'projected_total': projected_total,
            'over_budget': month_spending > self.monthly_budget
        }

    def send_email_alert(self, subject: str, message: str):
        """Send email alert"""
        if not self.email_config:
            print(f"Email not configured. Alert: {subject}")
            print(message)
            return
        msg = MIMEMultipart()
        msg['From'] = self.email_config['from_email']
        msg['To'] = ', '.join(self.email_config['to_emails'])
        msg['Subject'] = subject
        msg.attach(MIMEText(message, 'plain'))
        try:
            # The context manager closes the connection even if sending fails
            with smtplib.SMTP(
                self.email_config['smtp_server'],
                self.email_config['smtp_port']
            ) as server:
                server.starttls()
                server.login(
                    self.email_config['from_email'],
                    self.email_config['password']
                )
                server.send_message(msg)
            print(f"Alert sent: {subject}")
        except Exception as e:
            print(f"Failed to send email: {e}")

    def check_and_alert(self):
        """Check budget and send alerts if thresholds exceeded"""
        status = self.check_budget()
        for threshold in self.alert_thresholds:
            threshold_pct = threshold * 100
            if status['percentage_used'] >= threshold_pct:
                alert_key = f"{datetime.now().strftime('%Y-%m')}-{threshold_pct}"
                if alert_key not in self.triggered_alerts:
                    self.triggered_alerts.add(alert_key)
                    subject = f"⚠️ Budget Alert: {threshold_pct:.0f}% of Monthly Budget Used"
                    if status['projected_total'] > status['budget']:
                        outlook = "⚠️ WARNING: Projected to exceed budget!"
                    else:
                        outlook = "✓ On track"
                    message = "\n".join([
                        "Eden AI Budget Alert",
                        f"You have used {status['percentage_used']:.1f}% of your monthly budget.",
                        "Budget Status:",
                        f"- Monthly Budget: ${status['budget']:.2f}",
                        f"- Spent This Month: ${status['spent']:.2f}",
                        f"- Remaining: ${status['remaining']:.2f}",
                        f"- Days Elapsed: {status['days_elapsed']}/{status['days_in_month']}",
                        f"- Projected Month-End Total: ${status['projected_total']:.2f}",
                        outlook,
                        "View detailed breakdown in your dashboard.",
                    ])
                    self.send_email_alert(subject, message)

    def print_budget_status(self):
        """Print budget status to console"""
        status = self.check_budget()
        print("=" * 60)
        print("Monthly Budget Status")
        print("=" * 60)
        print(f"Budget: ${status['budget']:>10.2f}")
        print(f"Spent: ${status['spent']:>10.2f} ({status['percentage_used']:.1f}%)")
        print(f"Remaining: ${status['remaining']:>10.2f}")
        print(f"Days: {status['days_elapsed']:>3}/{status['days_in_month']}")
        print(f"Projected Total: ${status['projected_total']:>10.2f}")
        if status['over_budget']:
            print("\n⚠️ OVER BUDGET!")
        elif status['projected_total'] > status['budget']:
            print("\n⚠️ WARNING: Projected to exceed budget")
        else:
            print("\n✓ On track")
        print("=" * 60)
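The overview promises Email/Slack notifications, but only the email path is implemented in this step. A Slack alert could be sketched as below, assuming you have created a Slack incoming webhook (which accepts a JSON body with a `text` field). The function names, the message wording, and the example status values are hypothetical; the status keys match what `check_budget` returns.

```python
import json
import urllib.request
from typing import Dict


def build_slack_payload(status: Dict, threshold_pct: float) -> Dict:
    """Format a check_budget()-style status dict as a Slack message body."""
    text = (
        f"Budget alert: {threshold_pct:.0f}% threshold reached. "
        f"Spent ${status['spent']:.2f} of ${status['budget']:.2f} "
        f"({status['percentage_used']:.1f}%), "
        f"projected month-end total ${status['projected_total']:.2f}."
    )
    return {"text": text}


def send_slack_alert(webhook_url: str, payload: Dict, timeout: float = 10.0) -> int:
    """POST the payload to an incoming webhook URL and return the HTTP status."""
    request = urllib.request.Request(
        webhook_url,
        data=json.dumps(payload).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )
    with urllib.request.urlopen(request, timeout=timeout) as response:
        return response.status


# Illustrative status values; in practice pass the dict from check_budget().
example_status = {
    "budget": 500.0,
    "spent": 380.0,
    "percentage_used": 76.0,
    "projected_total": 620.0,
}
payload = build_slack_payload(example_status, 75)
print(payload["text"])
# send_slack_alert(webhook_url, payload)  # needs a real webhook URL
```

Separating message formatting from delivery keeps the formatting testable without network access, and the webhook URL should be treated as a secret, like the API key.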
Step 6: Optimize - Identify Most Expensive Operations
Create an optimizer to find cost-saving opportunities:
from cost_analyzer import CostAnalyzer
from typing import List, Dict, Optional


class CostOptimizer:
    """Identify cost optimization opportunities"""

    def __init__(self, analyzer: CostAnalyzer):
        self.analyzer = analyzer

    def find_expensive_providers(
        self,
        begin: str,
        end: str,
        feature: Optional[str] = None
    ) -> List[Dict]:
        """
        Identify providers that might have cheaper alternatives.
        Returns:
            List of optimization suggestions
        """
        suggestions = []
        if feature:
            # Filter by feature - would need additional API support.
            # Fail explicitly rather than continuing with undefined data.
            raise NotImplementedError("Filtering by feature is not supported yet")
        # Get provider costs
        providers = self.analyzer.compare_providers(begin, end)
        # Check for high-cost providers
        for provider, cost, pct in providers:
            if pct > 50:  # Provider represents >50% of costs
                suggestions.append({
                    'type': 'high_cost_provider',
                    'provider': provider,
                    'cost': cost,
                    'percentage': pct,
                    'suggestion': f"Provider '{provider}' accounts for {pct:.1f}% of costs. "
                                  f"Consider comparing with alternative providers."
                })
        return suggestions

    def find_inefficient_features(
        self,
        begin: str,
        end: str
    ) -> List[Dict]:
        """
        Find features with unusually high cost-per-call.
        Returns:
            List of features that might be inefficient
        """
        suggestions = []
        cost_per_call = self.analyzer.calculate_cost_per_call(begin, end)
        features = self.analyzer.monitor.get_feature_breakdown(begin, end)
        # Calculate average cost per call across all features
        all_costs = list(cost_per_call.values())
        if not all_costs:
            return suggestions
        avg_cost = sum(all_costs) / len(all_costs)
        # Find features significantly above average
        for feature, cpc in cost_per_call.items():
            if cpc > avg_cost * 3:  # More than 3x average
                suggestions.append({
                    'type': 'high_cost_per_call',
                    'feature': feature,
                    'cost_per_call': cpc,
                    'total_calls': features[feature]['total_calls'],
                    'total_cost': features[feature]['total_cost'],
                    'suggestion': f"Feature '{feature}' has high cost-per-call "
                                  f"(${cpc:.4f}, more than 3x the average). Review usage patterns."
                })
        return suggestions

    def suggest_optimizations(self, begin: str, end: str):
        """Print all optimization suggestions"""
        print("=" * 60)
        print("Cost Optimization Suggestions")
        print("=" * 60)
        print()
        provider_suggestions = self.find_expensive_providers(begin, end)
        feature_suggestions = self.find_inefficient_features(begin, end)
        if not provider_suggestions and not feature_suggestions:
            print("✓ No obvious optimization opportunities found.")
            print("  Your usage appears efficient!")
            return
        if provider_suggestions:
            print("Provider Optimization Opportunities:")
            print("-" * 60)
            for i, sugg in enumerate(provider_suggestions, 1):
                print(f"{i}. {sugg['suggestion']}")
                print(f"   Current Cost: ${sugg['cost']:.2f} ({sugg['percentage']:.1f}%)")
                print()
        if feature_suggestions:
            print("Feature Optimization Opportunities:")
            print("-" * 60)
            for i, sugg in enumerate(feature_suggestions, 1):
                print(f"{i}. {sugg['suggestion']}")
                print(f"   Cost: ${sugg['total_cost']:.2f} ({sugg['total_calls']:,} calls)")
                print()
        print("=" * 60)
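The 3x-average rule in `find_inefficient_features` is easier to reason about with numbers. The sketch below applies the same rule to the per-call costs shown in this tutorial's example output; `flag_outliers` and its `multiplier` parameter are hypothetical names introduced for illustration.

```python
from typing import Dict, List


def flag_outliers(cost_per_call: Dict[str, float], multiplier: float = 3.0) -> List[str]:
    """Return features whose cost per call exceeds multiplier x the mean."""
    if not cost_per_call:
        return []
    average = sum(cost_per_call.values()) / len(cost_per_call)
    return [
        feature
        for feature, cpc in cost_per_call.items()
        if cpc > average * multiplier
    ]


# Per-call costs from the example output at the end of this tutorial.
sample = {
    "text__chat": 0.0847,
    "image__generation": 0.2760,
    "text__embeddings": 0.0068,
    "ocr__ocr": 0.0544,
    "audio__text_to_speech": 0.2900,
}
print(flag_outliers(sample))       # default 3x rule
print(flag_outliers(sample, 2.0))  # a stricter 2x rule
```

Two properties of this rule are worth knowing. The mean is unweighted, so a rarely used feature counts as much as a heavily used one. And because no single value can exceed the sum of all values, an account with three or fewer features can never trigger the 3x rule; a lower multiplier or a fixed per-call ceiling may suit small accounts better.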
Step 7: Put It All Together
Create a main script that ties everything together:
#!/usr/bin/env python3
"""
Eden AI Cost Monitoring and Optimization System
"""
import os
from datetime import datetime, timedelta
from cost_monitor import EdenAICostMonitor
from cost_analyzer import CostAnalyzer
from cost_optimizer import CostOptimizer
from budget_alerts import BudgetAlertSystem
from cache_manager import CostCache


def main():
    # Configuration
    API_KEY = os.getenv("EDENAI_API_KEY")
    MONTHLY_BUDGET = 500.0  # Your monthly budget
    if not API_KEY:
        print("Error: EDENAI_API_KEY environment variable not set")
        return

    # Initialize components
    monitor = EdenAICostMonitor(API_KEY)
    analyzer = CostAnalyzer(monitor)
    optimizer = CostOptimizer(analyzer)
    cache = CostCache()

    # Set up budget alerts
    budget_system = BudgetAlertSystem(
        monitor=monitor,
        monthly_budget=MONTHLY_BUDGET,
        alert_thresholds=[0.5, 0.75, 0.9]
    )

    # Date ranges
    today = datetime.now()
    month_start = today.replace(day=1).strftime("%Y-%m-%d")
    current_date = today.strftime("%Y-%m-%d")

    print("\n🔍 Eden AI Cost Monitoring System\n")

    # 1. Check and display budget status
    print("📊 Budget Status")
    budget_system.print_budget_status()
    print()

    # 2. Check for budget alerts
    budget_system.check_and_alert()
    print()

    # 3. Display cost report
    print("📈 Cost Report")
    analyzer.print_cost_report(month_start, current_date)
    print()

    # 4. Show optimization suggestions
    print("💡 Optimization Suggestions")
    optimizer.suggest_optimizations(month_start, current_date)
    print()

    # 5. Show current credits
    credits = monitor.get_credits()
    print(f"💳 Current Account Balance: ${credits:.2f}")
    print()

    # Clean up
    cache.close()


if __name__ == "__main__":
    main()
Testing
Test your cost monitoring system:
import os
from datetime import datetime, timedelta
from cost_monitor import EdenAICostMonitor


def test_cost_monitor():
    """Test basic cost monitoring functionality"""
    API_KEY = os.getenv("EDENAI_API_KEY")
    monitor = EdenAICostMonitor(API_KEY)

    # Test 1: Get credits
    print("Test 1: Get current credits")
    credits = monitor.get_credits()
    print(f"✓ Credits: ${credits:.2f}\n")

    # Test 2: Get usage data
    print("Test 2: Get last 7 days usage")
    end = datetime.now()
    begin = end - timedelta(days=7)
    data = monitor.get_usage(
        begin.strftime("%Y-%m-%d"),
        end.strftime("%Y-%m-%d"),
        step=1
    )
    print(f"✓ Retrieved data for {len(data['response'])} token(s)\n")

    # Test 3: Get provider breakdown
    print("Test 3: Provider breakdown")
    providers = monitor.get_provider_breakdown(
        begin.strftime("%Y-%m-%d"),
        end.strftime("%Y-%m-%d")
    )
    for provider, cost in providers.items():
        print(f"  {provider}: ${cost:.2f}")
    print()
    print("✅ All tests passed!")


if __name__ == "__main__":
    test_cost_monitor()
export EDENAI_API_KEY="your_api_key_here"
python test_system.py
Production Considerations
1. Scheduling
Run the monitoring system on a schedule using cron:
# Run daily at 9 AM
0 9 * * * /usr/bin/python3 /path/to/main.py >> /var/log/edenai_monitor.log 2>&1
Alternatively, use the schedule library:
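import schedule
import time
from main import main  # The script from Step 7, saved as main.py


def job():
    # Run your monitoring
    main()


# Schedule daily at 9 AM
schedule.every().day.at("09:00").do(job)

# Or every hour (use this instead of the daily schedule, not alongside it)
# schedule.every().hour.do(job)

while True:
    schedule.run_pending()
    time.sleep(60)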
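One consequence of running under cron is that each run is a fresh process, so the in-memory `triggered_alerts` set in `BudgetAlertSystem` starts empty every time and the same threshold alert would be sent on every run. A small state file is one way around this. The following is a minimal sketch: the function names and the JSON file layout are assumptions, and the key format matches the `alert_key` built in `check_and_alert`.

```python
import json
import tempfile
from pathlib import Path
from typing import Set


def load_triggered(path: Path) -> Set[str]:
    """Read the set of alert keys already sent, or an empty set if none."""
    if not path.exists():
        return set()
    return set(json.loads(path.read_text(encoding="utf-8")))


def save_triggered(path: Path, keys: Set[str]) -> None:
    """Write the alert keys as a sorted JSON list."""
    path.write_text(json.dumps(sorted(keys)), encoding="utf-8")


def should_alert(path: Path, alert_key: str) -> bool:
    """Return True only the first time alert_key is seen, and record it."""
    keys = load_triggered(path)
    if alert_key in keys:
        return False
    keys.add(alert_key)
    save_triggered(path, keys)
    return True


# Demonstration in a temporary directory; a real deployment would use a
# stable location that the scheduled job can write to.
with tempfile.TemporaryDirectory() as tmp:
    state_file = Path(tmp) / "triggered_alerts.json"
    print(should_alert(state_file, "2024-01-50.0"))  # first run this month
    print(should_alert(state_file, "2024-01-50.0"))  # a later run, same threshold
```

Because the key includes the year and month, alerts reset naturally when a new month starts. The long-running `schedule` loop does not need this as long as it reuses one `BudgetAlertSystem` instance across runs.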
2. Error Handling
Add robust error handling:
import logging
from requests.exceptions import RequestException

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

try:
    data = monitor.get_usage(begin, end)
except RequestException as e:
    logger.error(f"API request failed: {e}")
    # Send alert to admin
except Exception as e:
    logger.exception("Unexpected error")
    # Send alert to admin
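Logging a failure is a start, but transient network errors are often worth retrying before alerting anyone. The sketch below shows a generic retry helper with exponential backoff. It is an illustration, not part of the Eden AI client: `with_retries` and its parameters are hypothetical, and the default `retry_on` uses Python's built-in exception types. With requests you would pass its own classes explicitly, for example `retry_on=(requests.exceptions.ConnectionError, requests.exceptions.Timeout)`, since they do not inherit from the built-in `ConnectionError`.

```python
import logging
import time
from typing import Callable, List, Tuple, Type, TypeVar

T = TypeVar("T")
logger = logging.getLogger(__name__)


def with_retries(
    call: Callable[[], T],
    retry_on: Tuple[Type[BaseException], ...] = (ConnectionError, TimeoutError),
    attempts: int = 3,
    base_delay: float = 1.0,
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Call `call()`, retrying on the given exceptions with doubling delays."""
    for attempt in range(1, attempts + 1):
        try:
            return call()
        except retry_on as exc:
            if attempt == attempts:
                raise  # out of attempts: let the caller handle it
            delay = base_delay * 2 ** (attempt - 1)
            logger.warning(
                "Attempt %d failed (%s); retrying in %.1fs", attempt, exc, delay
            )
            sleep(delay)
    raise RuntimeError("attempts must be at least 1")


# Demonstration with a function that fails twice, then succeeds.
# The sleep function is replaced so the example runs instantly.
state = {"count": 0}
recorded_delays: List[float] = []


def flaky() -> str:
    state["count"] += 1
    if state["count"] < 3:
        raise ConnectionError("temporary failure")
    return "ok"


result = with_retries(flaky, sleep=recorded_delays.append)
print(result, recorded_delays)
```

In the monitoring script this could wrap a call such as `with_retries(lambda: monitor.get_usage(begin, end), retry_on=...)`. HTTP errors raised by `raise_for_status()` are deliberately not retried by default here, since a 401 or 400 will not fix itself.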
3. Security
- Store API keys in environment variables or secret managers
- Use an encrypted database for caching sensitive data
- Implement access controls for reports
- Use HTTPS for all API communications
4. Scaling
For high-volume monitoring:
- Use Redis instead of SQLite for caching
- Implement asynchronous API calls with aiohttp
- Use job queues (Celery) for background processing
- Store historical data in a time-series database
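As a rough illustration of the asynchronous option, the sketch below fetches several date ranges concurrently with a cap on how many requests run at once. To stay dependency-free it does not use aiohttp; instead it runs a blocking fetch function on worker threads through asyncio, which is a different technique with a similar effect for a handful of requests. `fetch_ranges` and `fake_fetch` are hypothetical names, and `fake_fetch` stands in for a call such as `monitor.get_usage`.

```python
import asyncio
from typing import Callable, Dict, List, Tuple

DateRange = Tuple[str, str]


async def fetch_ranges(
    fetch: Callable[[str, str], Dict],
    ranges: List[DateRange],
    max_concurrency: int = 4,
) -> List[Dict]:
    """Run a blocking fetch(begin, end) for each range, a few at a time."""
    loop = asyncio.get_running_loop()
    semaphore = asyncio.Semaphore(max_concurrency)

    async def fetch_one(begin: str, end: str) -> Dict:
        async with semaphore:
            # run_in_executor moves the blocking call onto a worker thread
            return await loop.run_in_executor(None, fetch, begin, end)

    # gather preserves the order of the input ranges
    return await asyncio.gather(*(fetch_one(b, e) for b, e in ranges))


def fake_fetch(begin: str, end: str) -> Dict:
    """Stand-in for a real API call; returns an empty usage payload."""
    return {"begin": begin, "end": end, "response": []}


ranges = [
    ("2024-01-01", "2024-01-31"),
    ("2024-02-01", "2024-02-29"),
    ("2024-03-01", "2024-03-31"),
]
results = asyncio.run(fetch_ranges(fake_fetch, ranges, max_concurrency=2))
print([r["begin"] for r in results])
```

The semaphore keeps the request rate polite; the right limit depends on the rate limits that apply to your account, which this sketch does not know.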
Next Steps
Now that you have a complete cost monitoring system, explore these related guides:
- Monitor Usage and Costs Guide - Deep dive into the API
- Manage Custom Tokens - Track costs per token
- Multi-Environment Token Management - Separate dev/prod costs
Complete Example Output
When you run main.py, you’ll see:
🔍 Eden AI Cost Monitoring System
📊 Budget Status
============================================================
Monthly Budget Status
============================================================
Budget: $500.00
Spent: $234.56 (46.9%)
Remaining: $265.44
Days: 15/31
Projected Total: $484.76
✓ On track
============================================================
📈 Cost Report
============================================================
Cost Report: 2024-01-01 to 2024-01-15
============================================================
Total Cost: $234.56
Cost by Provider:
------------------------------------------------------------
openai $198.45 (84.6%)
anthropic $28.12 (12.0%)
google $7.99 ( 3.4%)
Top 5 Most Expensive Features:
------------------------------------------------------------
text__chat $198.23 (2,341 calls, $0.0847/call)
image__generation $24.56 ( 89 calls, $0.2760/call)
text__embeddings $8.45 (1,234 calls, $0.0068/call)
ocr__ocr $2.45 ( 45 calls, $0.0544/call)
audio__text_to_speech $0.87 ( 3 calls, $0.2900/call)
Current Credits: $156.78
============================================================
💡 Optimization Suggestions
============================================================
Cost Optimization Suggestions
============================================================
Provider Optimization Opportunities:
------------------------------------------------------------
1. Provider 'openai' accounts for 84.6% of costs. Consider comparing with alternative providers.
Current Cost: $198.45 (84.6%)
============================================================
💳 Current Account Balance: $156.78
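The projected total in the budget status is a linear extrapolation: spending so far divided by days elapsed, multiplied by the number of days in the month. The sketch below reproduces that arithmetic for the sample figures in this output ($234.56 spent by day 15 of a 31-day month). `project_month_end` is a hypothetical helper, and it uses `calendar.monthrange` to get the month length rather than the timedelta arithmetic in `check_budget`; both give the same day count.

```python
import calendar
from datetime import date


def project_month_end(spent: float, today: date) -> float:
    """Extrapolate month-to-date spending linearly to the end of the month."""
    days_in_month = calendar.monthrange(today.year, today.month)[1]
    daily_average = spent / today.day
    return daily_average * days_in_month


projected = project_month_end(234.56, date(2024, 1, 15))
print(f"Projected Total: ${projected:.2f}")
```

Linear extrapolation is noisy early in the month: on day 1 or 2, a single large batch job can push the projection far above what the month will actually cost.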