Files
whisper-api/src/services/stats_service.py

135 lines
4.0 KiB
Python

import hashlib
from datetime import datetime, timedelta
from typing import Optional, List
from sqlalchemy.orm import Session
from sqlalchemy import func
from src.database.models import ApiKey, UsageLog
def hash_api_key(key: str) -> str:
    """Return the hex-encoded SHA-256 digest under which an API key is stored."""
    digest = hashlib.sha256(key.encode())
    return digest.hexdigest()
async def log_usage(
    db: Session,
    api_key: str,
    endpoint: str,
    file_size: Optional[int],
    duration: Optional[float],
    processing_time: int,
    model: str,
    status: str,
    error_message: Optional[str] = None
):
    """Record one API call, auto-creating the ApiKey row on first use.

    Args:
        db: Active SQLAlchemy session (sync Session; the ``async`` wrapper
            does not await anything — queries run synchronously).
        api_key: Raw API key; only its SHA-256 hash is persisted.
        endpoint: Endpoint path being logged.
        file_size: Uploaded file size in bytes, if known.
        duration: Audio duration in seconds, if known.
        processing_time: Processing time in milliseconds.
        model: Model identifier used for the request.
        status: Outcome string (e.g. "success") stored on the log row.
        error_message: Optional error detail for failed requests.
    """
    key_hash = hash_api_key(api_key)

    # Find or create the API key record (keyed by hash; raw keys never stored)
    db_key = db.query(ApiKey).filter(ApiKey.key_hash == key_hash).first()
    if not db_key:
        db_key = ApiKey(
            key_hash=key_hash,
            description=f"Auto-created for key {api_key[:8]}..."
        )
        db.add(db_key)
        # flush() (not commit()) assigns db_key.id while keeping key creation,
        # the bookkeeping update below, and the usage log in ONE transaction.
        # The previous commit+refresh here meant a failure later in this
        # function left a committed key with no matching log row.
        # NOTE(review): concurrent first requests for the same key can still
        # race on the unique hash; assumes the caller tolerates that rarity.
        db.flush()

    # Update last-used bookkeeping on the key
    db_key.last_used_at = datetime.utcnow()
    db_key.usage_count += 1

    # Create usage log
    log = UsageLog(
        api_key_id=db_key.id,
        endpoint=endpoint,
        file_size_bytes=file_size,
        duration_seconds=duration,
        processing_time_ms=processing_time,
        model_used=model,
        status=status,
        error_message=error_message
    )
    db.add(log)
    # Single atomic commit for key creation/update + log row
    db.commit()
async def get_usage_stats(db: Session, days: int = 30):
    """Assemble dashboard statistics over the trailing ``days`` window.

    Returns a dict with request totals, success rate, average timings, a
    real-time processing factor, per-day chart data, and the 50 most
    recent log rows (the recent-log list is not window-filtered).
    """
    window_start = datetime.utcnow() - timedelta(days=days)

    # Request counts inside the window
    total_requests = db.query(UsageLog).filter(UsageLog.created_at >= window_start).count()
    succeeded = (
        db.query(UsageLog)
        .filter(UsageLog.created_at >= window_start, UsageLog.status == "success")
        .count()
    )
    if total_requests > 0:
        success_rate = succeeded / total_requests * 100
    else:
        success_rate = 0

    # Mean processing time over successful requests (ms)
    mean_ms = (
        db.query(func.avg(UsageLog.processing_time_ms))
        .filter(UsageLog.created_at >= window_start, UsageLog.status == "success")
        .scalar()
        or 0
    )

    # Mean audio duration over successful requests with a known duration (s)
    mean_duration = (
        db.query(func.avg(UsageLog.duration_seconds))
        .filter(
            UsageLog.created_at >= window_start,
            UsageLog.status == "success",
            UsageLog.duration_seconds.isnot(None),
        )
        .scalar()
        or 0
    )

    # Processing factor: seconds of processing per minute of audio,
    # i.e. (processing_time_ms / 1000) / duration_seconds * 60.
    # Rows with NULL or zero duration are excluded to avoid division by zero.
    factor_expr = (UsageLog.processing_time_ms / 1000.0) / UsageLog.duration_seconds * 60
    processing_factor = (
        db.query(func.avg(factor_expr))
        .filter(
            UsageLog.created_at >= window_start,
            UsageLog.status == "success",
            UsageLog.duration_seconds.isnot(None),
            UsageLog.duration_seconds > 0,
        )
        .scalar()
        or 0
    )

    # One row per calendar day for the chart
    per_day = (
        db.query(
            func.date(UsageLog.created_at).label("date"),
            func.count().label("count"),
            func.avg(UsageLog.processing_time_ms).label("avg_time"),
        )
        .filter(UsageLog.created_at >= window_start)
        .group_by(func.date(UsageLog.created_at))
        .order_by("date")
        .all()
    )

    # Latest activity, newest first (independent of the window)
    latest = db.query(UsageLog).order_by(UsageLog.created_at.desc()).limit(50).all()

    return {
        "total_requests": total_requests,
        "success_rate": round(success_rate, 2),
        "avg_processing_time_ms": round(mean_ms, 2),
        "avg_duration_seconds": round(mean_duration, 2),
        "processing_factor": round(processing_factor, 2),
        "daily_stats": [
            {"date": str(row.date), "count": row.count, "avg_time": round(row.avg_time or 0, 2)}
            for row in per_day
        ],
        "recent_logs": latest,
    }
async def cleanup_old_logs(db: Session, retention_days: int):
    """Delete usage logs older than the retention window; return rows removed."""
    threshold = datetime.utcnow() - timedelta(days=retention_days)
    removed = (
        db.query(UsageLog)
        .filter(UsageLog.created_at < threshold)
        .delete()
    )
    db.commit()
    return removed