Moving AI applications from prototype to production reveals a harsh truth: API costs spiral unpredictably, rate limits strike at the worst moments, and single-provider dependence creates existential risk. What worked for a demo with 100 requests per day fails spectacularly when real users generate thousands of requests with unpredictable patterns.
An AI gateway sits between your application and LLM providers, handling cross-cutting concerns like caching, rate limiting, fallbacks, and cost tracking. This architectural pattern has become essential for teams running production AI workloads at scale.
TL;DR: An AI gateway centralizes LLM infrastructure concerns: request routing, caching (exact and semantic), rate limiting, fallback handling, and cost tracking. Semantic caching can significantly reduce costs for repetitive workloads (results vary based on query patterns). Multi-provider setups with intelligent routing improve reliability and optimize cost-quality tradeoffs. Build vs buy depends on scale and customization needs — start with open-source tools like LiteLLM or managed services like Portkey.
Why You Need an AI Gateway
Direct API calls to LLM providers work fine for prototypes. In production, this approach creates problems that compound as you scale.
The Problem with Direct API Calls
When every service calls OpenAI directly:
- No visibility — You can't track which teams or features drive costs without instrumenting every call site
- No resilience — A provider outage takes down your entire AI capability
- No cost control — Runaway loops or bugs can burn through budgets in minutes
- Duplicated effort — Every team implements their own retry logic, caching, and error handling
- Vendor lock-in — Switching providers requires changes across your entire codebase
Cross-Cutting Concerns That Gateways Handle
An AI gateway centralizes these responsibilities:
- Request routing — Direct requests to appropriate providers based on model, cost, or latency requirements
- Caching — Store and reuse responses for identical or semantically similar requests
- Rate limiting — Enforce quotas per user, team, or application to prevent cost overruns
- Fallback and retry — Automatically switch providers or retry on failures
- Cost tracking — Attribute costs to specific teams, features, or customers
- Observability — Centralized logging, metrics, and alerting
Core Gateway Capabilities
Let's examine each core capability and how to implement it effectively.
| Capability | Purpose | Typical Impact |
|---|---|---|
| Request Routing | Direct traffic to optimal provider | Varies by use case |
| Caching | Avoid redundant API calls | Significant (varies by query patterns) |
| Rate Limiting | Prevent budget overruns | Prevents runaway costs |
| Fallback/Retry | Handle provider failures | Improved availability |
| Cost Tracking | Attribution and budgeting | Enables chargebacks |
Request Routing
Intelligent routing directs requests to the most appropriate provider based on:
- Model capability — Use GPT-4 for complex reasoning, Claude for long context, Mistral for cost-sensitive tasks
- Cost optimization — Route simple tasks to cheaper models automatically
- Latency requirements — Prefer providers with lower p95 latency for interactive use cases
- Geographic compliance — Route to EU-hosted models for GDPR requirements
Caching Strategies
Caching is the highest-impact optimization for most AI workloads. Two approaches dominate:
Exact match caching stores responses keyed by the exact prompt hash. Simple to implement, zero false positives, but only helps with identical requests.
Semantic caching stores embeddings of prompts and returns cached responses for semantically similar queries. Higher hit rates but requires careful tuning to avoid returning inappropriate cached results.
Here's a semantic cache implementation:
```python
import hashlib
import json

import numpy as np
from openai import OpenAI
from redis import Redis


class SemanticCache:
    def __init__(self, redis_url: str, similarity_threshold: float = 0.95):
        self.redis = Redis.from_url(redis_url)
        self.client = OpenAI()
        self.threshold = similarity_threshold
        self.embedding_model = "text-embedding-3-small"

    def _get_embedding(self, text: str) -> list[float]:
        response = self.client.embeddings.create(
            model=self.embedding_model,
            input=text
        )
        return response.data[0].embedding

    def _cosine_similarity(self, a: list[float], b: list[float]) -> float:
        a, b = np.array(a), np.array(b)
        return float(np.dot(a, b) / (np.linalg.norm(a) * np.linalg.norm(b)))

    def get(self, prompt: str) -> dict | None:
        query_embedding = self._get_embedding(prompt)
        # Linear scan over cached embeddings -- fine for small caches,
        # use a vector database (or Redis vector search) at scale
        for key in self.redis.scan_iter("cache:embedding:*"):
            cached = json.loads(self.redis.get(key))
            similarity = self._cosine_similarity(
                query_embedding, cached["embedding"]
            )
            if similarity >= self.threshold:
                return cached["response"]
        return None

    def set(self, prompt: str, response: dict, ttl: int = 3600):
        embedding = self._get_embedding(prompt)
        cache_key = f"cache:embedding:{hashlib.md5(prompt.encode()).hexdigest()}"
        self.redis.setex(
            cache_key,
            ttl,
            json.dumps({"embedding": embedding, "response": response})
        )
```
Rate Limiting
Rate limiting prevents cost overruns and ensures fair resource allocation. Implement limits at multiple levels:
- Per-user limits — Prevent individual users from consuming excessive resources
- Per-team/department limits — Budget allocation across organizational units
- Per-feature limits — Control costs for specific product features
- Global limits — Hard ceiling on total spend
Token bucket rate limiting implementation:
```python
import time

from redis import Redis


class TokenBucketRateLimiter:
    def __init__(self, redis_url: str):
        self.redis = Redis.from_url(redis_url)

    def check_and_consume(
        self,
        key: str,
        tokens: int,
        max_tokens: int,
        refill_rate: float
    ) -> tuple[bool, dict]:
        """
        Check if a request is allowed and consume tokens.

        Args:
            key: Unique identifier (user_id, team_id, etc.)
            tokens: Tokens to consume (e.g., estimated input + output tokens)
            max_tokens: Bucket capacity
            refill_rate: Tokens added per second

        Returns:
            (allowed, info) tuple
        """
        now = time.time()
        bucket_key = f"ratelimit:{key}"

        # Get current bucket state
        bucket = self.redis.hgetall(bucket_key)
        if bucket:
            current_tokens = float(bucket[b"tokens"])
            last_update = float(bucket[b"last_update"])
            # Refill tokens based on elapsed time
            elapsed = now - last_update
            current_tokens = min(max_tokens, current_tokens + elapsed * refill_rate)
        else:
            current_tokens = max_tokens

        if current_tokens >= tokens:
            # Consume tokens
            new_tokens = current_tokens - tokens
            self.redis.hset(bucket_key, mapping={
                "tokens": new_tokens,
                "last_update": now
            })
            self.redis.expire(bucket_key, 86400)  # 24h TTL
            return True, {"remaining": new_tokens, "limit": max_tokens}

        # Calculate wait time until enough tokens have refilled
        tokens_needed = tokens - current_tokens
        wait_time = tokens_needed / refill_rate
        return False, {
            "remaining": current_tokens,
            "limit": max_tokens,
            "retry_after": wait_time
        }
```
Fallback and Retry Logic
Production systems need graceful degradation when providers fail:
- Automatic retries — Retry transient failures with exponential backoff
- Provider fallback — Switch to backup provider on persistent failures
- Model fallback — Fall back to a different model if the primary is unavailable
- Circuit breaker — Stop calling a failing provider temporarily to prevent cascade failures
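The retry and circuit-breaker pieces can be sketched as follows. This is a simplified illustration, not the implementation of any particular library; the class and function names (`CircuitBreaker`, `call_with_fallback`) and the thresholds are all assumptions:

```python
import time


class CircuitBreaker:
    """Opens after `max_failures` consecutive failures; lets one probe
    request through again after `reset_after` seconds (half-open state)."""

    def __init__(self, max_failures: int = 3, reset_after: float = 30.0):
        self.max_failures = max_failures
        self.reset_after = reset_after
        self.failures = 0
        self.opened_at: float | None = None

    def allow(self) -> bool:
        if self.opened_at is None:
            return True
        if time.time() - self.opened_at >= self.reset_after:
            self.opened_at = None  # half-open: allow one attempt
            self.failures = 0
            return True
        return False

    def record(self, success: bool) -> None:
        if success:
            self.failures = 0
        else:
            self.failures += 1
            if self.failures >= self.max_failures:
                self.opened_at = time.time()


def call_with_fallback(models, call_fn, breakers, retries=2, base_delay=0.5):
    """Try each model in order, retrying transient failures with
    exponential backoff and skipping providers whose breaker is open."""
    last_error = None
    for model in models:
        breaker = breakers.setdefault(model, CircuitBreaker())
        if not breaker.allow():
            continue  # provider is tripped; move to the next in the chain
        for attempt in range(retries + 1):
            try:
                result = call_fn(model)
                breaker.record(success=True)
                return result
            except Exception as e:
                last_error = e
                breaker.record(success=False)
                if attempt < retries:
                    time.sleep(base_delay * (2 ** attempt))  # 0.5s, 1s, 2s...
    raise RuntimeError(f"All providers failed: {last_error}")
```

In practice you would also distinguish retryable errors (timeouts, 429s, 5xx) from permanent ones (invalid request, auth failure), which should fail fast instead of retrying.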
Cost Tracking
Track costs at multiple granularities:
```python
import json
from dataclasses import asdict, dataclass
from datetime import datetime

from redis import Redis


@dataclass
class UsageRecord:
    timestamp: datetime
    user_id: str
    team_id: str
    feature: str
    model: str
    provider: str
    input_tokens: int
    output_tokens: int
    cached: bool
    latency_ms: float

    @property
    def cost_usd(self) -> float:
        # Pricing per 1M tokens (example rates -- check current provider pricing)
        pricing = {
            "gpt-4o": {"input": 2.50, "output": 10.00},
            "gpt-4o-mini": {"input": 0.15, "output": 0.60},
            "claude-3-5-sonnet": {"input": 3.00, "output": 15.00},
            "claude-3-5-haiku": {"input": 0.80, "output": 4.00},
        }
        rates = pricing.get(self.model, {"input": 5.0, "output": 15.0})
        return (
            (self.input_tokens * rates["input"] / 1_000_000) +
            (self.output_tokens * rates["output"] / 1_000_000)
        )


class CostTracker:
    def __init__(self, redis_url: str):
        self.redis = Redis.from_url(redis_url)

    def record(self, usage: UsageRecord):
        # Store the individual record; default=str serializes the datetime
        record_key = f"usage:{usage.timestamp.isoformat()}"
        self.redis.setex(
            record_key, 86400 * 30, json.dumps(asdict(usage), default=str)
        )

        # Update daily aggregates
        date_key = usage.timestamp.strftime("%Y-%m-%d")
        # By team
        self.redis.incrbyfloat(
            f"cost:team:{usage.team_id}:{date_key}",
            usage.cost_usd
        )
        # By feature
        self.redis.incrbyfloat(
            f"cost:feature:{usage.feature}:{date_key}",
            usage.cost_usd
        )
        # By model
        self.redis.incrbyfloat(
            f"cost:model:{usage.model}:{date_key}",
            usage.cost_usd
        )
```
Caching Strategies Deep Dive
Caching delivers the highest ROI for most AI workloads. Understanding when and how to cache effectively is critical.
When Caching Makes Sense
Cache effectiveness depends on your query patterns:
- High cache potential — Customer support (similar questions), code completion (common patterns), content classification
- Medium cache potential — Document Q&A with RAG (depends on query diversity), translation
- Low cache potential — Creative writing, personalized recommendations, real-time data analysis
Semantic vs Exact Match Caching
| Aspect | Exact Match | Semantic Cache |
|---|---|---|
| Hit rate | Lower (identical queries only) | Higher (similar queries match) |
| False positives | None | Possible (tuning required) |
| Implementation | Simple hash lookup | Embedding + similarity search |
| Overhead | Minimal | Embedding generation cost |
| Best for | Structured queries, APIs | Natural language, chat |
Cache Warming
Proactively populate caches with expected queries:
- Historical analysis — Cache responses for your most frequent queries
- Synthetic generation — Generate variations of common questions
- Off-peak processing — Run batch jobs during low-traffic periods
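A warming job built from these steps can be sketched as follows. The helper names (`warm_cache`, `InMemoryCache`) are illustrative, and the in-memory cache is a stand-in for the real store (e.g. Redis) to keep the sketch self-contained:

```python
class InMemoryCache:
    """Stand-in for the production cache store."""

    def __init__(self):
        self._store = {}

    def get(self, key):
        return self._store.get(key)

    def set(self, key, value):
        self._store[key] = value


def warm_cache(top_queries, cache, llm_call, min_frequency=10):
    """Pre-populate the cache with responses to frequent historical queries.

    top_queries: (prompt, frequency) pairs, e.g. mined from last month's logs.
    llm_call: prompt -> response dict; schedule this during off-peak hours.
    Returns the number of entries warmed.
    """
    warmed = 0
    for prompt, frequency in top_queries:
        if frequency < min_frequency:
            continue  # too rare to be worth the warming cost
        if cache.get(prompt) is None:  # don't overwrite fresh entries
            cache.set(prompt, llm_call(prompt))
            warmed += 1
    return warmed
```

The frequency cutoff matters: every warmed entry costs one LLM call up front, so warming only pays off for prompts likely to recur before their TTL expires.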
Cache Invalidation
Stale cache responses can be worse than cache misses. Invalidation strategies:
- Time-based TTL — Simple but may serve stale data or expire valid entries too soon
- Event-driven invalidation — Invalidate when underlying data changes (for RAG caches)
- Version tagging — Include model version in cache key, auto-invalidate on upgrades
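Version tagging is the cheapest of the three to implement: bake the model version into the cache key so a model upgrade naturally orphans every stale entry. A minimal sketch (the function name is illustrative):

```python
import hashlib


def versioned_cache_key(model: str, model_version: str, prompt: str) -> str:
    """Build a cache key that includes the model version, so upgrading the
    model automatically misses every entry written under the old version."""
    digest = hashlib.sha256(prompt.encode()).hexdigest()
    return f"cache:{model}:{model_version}:{digest}"
```

Old entries are never explicitly deleted; they simply stop matching and age out via their TTL.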
Multi-Provider Architecture
Depending on a single LLM provider creates business risk. Multi-provider architectures improve reliability, optimize costs, and prevent vendor lock-in.
Why Multi-Provider Matters
- Reliability — No single provider has 100% uptime. OpenAI, Anthropic, and Google all experience outages.
- Cost optimization — Different providers offer better price/performance for different task types
- Capability matching — Claude excels at long context, GPT-4 at code, Gemini at multimodal
- Negotiating leverage — Multi-provider capability gives you options when negotiating contracts
Provider Abstraction with LiteLLM
LiteLLM provides a unified interface across many LLM providers (including OpenAI, Anthropic, Google, Azure, AWS Bedrock, and dozens more):
```python
import os

from litellm import acompletion, completion

# Set provider API keys
os.environ["OPENAI_API_KEY"] = "sk-..."
os.environ["ANTHROPIC_API_KEY"] = "sk-ant-..."


# Same interface, any provider
def call_llm(prompt: str, model: str = "gpt-4o") -> str:
    response = completion(
        model=model,  # or "claude-3-5-sonnet-20241022", "gemini/gemini-pro"
        messages=[{"role": "user", "content": prompt}],
        temperature=0.7
    )
    return response.choices[0].message.content


# Async support
async def call_llm_async(prompt: str, model: str = "gpt-4o") -> str:
    response = await acompletion(
        model=model,
        messages=[{"role": "user", "content": prompt}]
    )
    return response.choices[0].message.content


# With fallbacks
response = completion(
    model="gpt-4o",
    messages=[{"role": "user", "content": "Hello"}],
    fallbacks=["claude-3-5-sonnet-20241022", "gemini/gemini-1.5-pro"]
)
```
Intelligent Routing
Route requests based on task characteristics:
```python
def route_request(task_type: str, complexity: str, max_tokens: int) -> str:
    """Select the optimal model based on task requirements."""
    if task_type == "code_generation":
        if complexity == "high":
            return "gpt-4o"  # Best for complex code
        return "gpt-4o-mini"  # Cost-effective for simple code

    if task_type == "long_document":
        if max_tokens > 100_000:
            return "claude-3-5-sonnet-20241022"  # 200K context
        return "gpt-4o"  # 128K context

    if task_type == "classification":
        return "gpt-4o-mini"  # Fast and cheap for structured tasks

    if task_type == "creative":
        return "claude-3-5-sonnet-20241022"  # Strong creative writing

    # Default to cost-optimized
    return "gpt-4o-mini"
```
Failover Configuration
Define fallback chains for each primary model:
```python
FAILOVER_CONFIG = {
    "gpt-4o": [
        "claude-3-5-sonnet-20241022",
        "gemini/gemini-1.5-pro",
    ],
    "claude-3-5-sonnet-20241022": [
        "gpt-4o",
        "gemini/gemini-1.5-pro",
    ],
    "gpt-4o-mini": [
        "claude-3-5-haiku-20241022",
        "gemini/gemini-1.5-flash",
    ],
}
```
Cost Optimization Tactics
Multiple techniques combine to significantly reduce LLM costs:
| Tactic | Typical Savings | Implementation Effort |
|---|---|---|
| Semantic caching | Varies (depends on query patterns) | Medium |
| Model tiering | Significant (task-dependent) | Low |
| Prompt optimization | Moderate | Low |
| Batch processing | 50% (Batch API) | Medium |
| Response streaming | Improved UX, same cost | Low |
| Token limits | Prevents overruns | Low |
Model Tiering
Use the cheapest model that meets quality requirements:
- Tier 1 (flagship) — GPT-4o, Claude 3.5 Sonnet for complex reasoning, critical outputs
- Tier 2 (balanced) — GPT-4o-mini, Claude 3.5 Haiku for general tasks
- Tier 3 (economy) — Open-source models for high-volume, low-complexity tasks
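One practical way to apply tiering is an escalating cascade: answer with the cheap tier first and only pay for a flagship model when the cheap answer fails a quality check. A sketch under stated assumptions (the function names, tier list, and validation hook are all illustrative):

```python
def tiered_completion(prompt, call_fn, validate, tiers=("gpt-4o-mini", "gpt-4o")):
    """Try the cheapest tier first; escalate only when validation fails.

    call_fn:  (model, prompt) -> response text
    validate: response text -> bool (e.g. schema check, length check,
              or a cheap judge model)
    """
    response = None
    for model in tiers:
        response = call_fn(model, prompt)
        if validate(response):
            return model, response
    # Every tier failed validation: return the last (strongest) answer anyway
    return tiers[-1], response
```

The cascade wins when most requests pass validation at the cheap tier; if escalation is frequent, you pay for both calls and routing up front (as in `route_request` above) is cheaper.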
Prompt Optimization
Reduce token usage without sacrificing quality:
- Compress system prompts — Remove redundant instructions, use concise language
- Limit context — Only include relevant information in RAG contexts
- Output constraints — Specify maximum response length when appropriate
Implementation Options
Choose based on your scale, customization needs, and operational preferences:
| Solution | Type | Best For | Pricing |
|---|---|---|---|
| LiteLLM Proxy | Open-source | Self-hosted, full control | Free (self-hosted) |
| Portkey | Managed | Fast setup, enterprise features | Usage-based |
| Helicone | Managed | Observability focus | Free tier + usage |
| AWS Bedrock | Cloud | AWS-native, compliance | Usage-based |
| Azure AI Gateway | Cloud | Azure-native, enterprise | Usage-based |
| Custom build | Self-built | Unique requirements | Engineering time |
Minimal Gateway Implementation
A basic gateway skeleton with FastAPI:
```python
import time
from datetime import datetime

from fastapi import FastAPI, HTTPException
from litellm import completion
from pydantic import BaseModel

# SemanticCache, TokenBucketRateLimiter, CostTracker, UsageRecord, and
# FAILOVER_CONFIG are the components defined in the sections above
app = FastAPI()

# Initialize components
cache = SemanticCache(redis_url="redis://localhost:6379")
rate_limiter = TokenBucketRateLimiter(redis_url="redis://localhost:6379")
cost_tracker = CostTracker(redis_url="redis://localhost:6379")


class CompletionRequest(BaseModel):
    model: str
    messages: list[dict]
    user_id: str
    team_id: str
    feature: str
    temperature: float = 0.7
    max_tokens: int | None = None


class CompletionResponse(BaseModel):
    content: str
    model: str
    cached: bool
    usage: dict


@app.post("/v1/completions", response_model=CompletionResponse)
async def create_completion(request: CompletionRequest):
    # Rate limiting (rough estimate: ~4 characters per token, plus output headroom)
    estimated_tokens = sum(len(m["content"]) // 4 for m in request.messages) + 500
    allowed, info = rate_limiter.check_and_consume(
        key=request.team_id,
        tokens=estimated_tokens,
        max_tokens=1_000_000,  # 1M tokens per day
        refill_rate=11.5  # ~1M per day
    )
    if not allowed:
        raise HTTPException(
            status_code=429,
            detail=f"Rate limit exceeded. Retry after {info['retry_after']:.0f}s"
        )

    # Check cache
    cache_key = f"{request.model}:{request.messages[-1]['content']}"
    cached_response = cache.get(cache_key)
    if cached_response:
        return CompletionResponse(
            content=cached_response["content"],
            model=request.model,
            cached=True,
            usage=cached_response["usage"]
        )

    # Call LLM
    start_time = time.time()
    try:
        response = completion(
            model=request.model,
            messages=request.messages,
            temperature=request.temperature,
            max_tokens=request.max_tokens,
            fallbacks=FAILOVER_CONFIG.get(request.model, [])
        )
    except Exception as e:
        raise HTTPException(status_code=502, detail=str(e))
    latency_ms = (time.time() - start_time) * 1000

    # Extract response data
    content = response.choices[0].message.content
    usage = {
        "input_tokens": response.usage.prompt_tokens,
        "output_tokens": response.usage.completion_tokens
    }

    # Cache response
    cache.set(cache_key, {"content": content, "usage": usage})

    # Track costs
    cost_tracker.record(UsageRecord(
        timestamp=datetime.now(),
        user_id=request.user_id,
        team_id=request.team_id,
        feature=request.feature,
        model=request.model,
        provider=response.model.split("/")[0] if "/" in response.model else "openai",
        input_tokens=usage["input_tokens"],
        output_tokens=usage["output_tokens"],
        cached=False,
        latency_ms=latency_ms
    ))

    return CompletionResponse(
        content=content,
        model=request.model,
        cached=False,
        usage=usage
    )
```
Monitoring and Observability
Production AI systems require comprehensive monitoring.
Key Metrics
- Latency — p50, p95, p99 response times by model and provider
- Error rate — Failed requests by error type (rate limit, timeout, API error)
- Cache hit rate — Percentage of requests served from cache
- Token usage — Input and output tokens by team, feature, model
- Cost — Real-time and cumulative spend tracking
- Provider health — Availability and performance by provider
Logging Strategy
Log at appropriate levels:
- Always log — Request metadata, model, tokens, latency, cost, cache status
- Optionally log — Full prompts and responses (consider privacy, storage costs)
- Never log — API keys, PII without consent
Alerting
Configure alerts for:
- Cost thresholds — Daily/weekly spend exceeds budget
- Error rate spikes — Error rate exceeds baseline
- Latency degradation — p95 latency exceeds SLA
- Provider outages — Failover triggered, provider unavailable
- Cache degradation — Hit rate drops significantly
How Virtido Can Help You Build AI Infrastructure
At Virtido, we help companies design and implement production AI infrastructure — from gateway architecture to observability. Our teams combine platform engineering expertise with hands-on AI/ML experience.
What We Offer
- AI infrastructure design — Gateway architecture, caching strategies, multi-provider setups tailored to your requirements
- DevOps and platform engineering — Kubernetes, observability, CI/CD for AI workloads
- Cost optimization — Analyze your LLM usage and implement savings strategies
- MLOps implementation — Model deployment, monitoring, and lifecycle management
- Staff augmentation — Platform engineers and ML specialists to extend your team in 2-4 weeks
We've delivered AI infrastructure solutions across FinTech, SaaS, and enterprise software. Our staff augmentation model provides vetted talent with Swiss contracts and full IP protection.
Final Thoughts
An AI gateway transforms LLM deployments from fragile prototypes into production-grade systems. The pattern addresses real problems that every team hits at scale: unpredictable costs, provider outages, lack of visibility, and vendor lock-in. Starting with basic caching and rate limiting delivers immediate ROI, while multi-provider routing and intelligent cost optimization build long-term resilience.
The implementation path matters as much as the architecture. Managed services like Portkey and Helicone get you running quickly with enterprise features built in. Open-source tools like LiteLLM offer flexibility and control for teams with specific requirements. Building custom makes sense only when existing tools genuinely cannot meet your needs — the engineering investment is significant.
As AI workloads grow, gateway infrastructure becomes a competitive advantage. Teams that invest early in cost control, reliability, and observability can scale confidently while competitors struggle with unpredictable bills and outage-induced downtime. The patterns described here are proven in production across industries — the question is not whether to implement them, but how quickly.
Frequently Asked Questions
How much can semantic caching reduce my LLM costs?
Savings depend heavily on your query distribution, so there is no universal figure, but reductions of 30-50% are commonly reported for workloads with repetitive query patterns, such as customer support, FAQ systems, and code completion. Workloads with high query diversity see much lower cache hit rates. Monitor your cache hit rate and adjust the similarity threshold to optimize the cost-quality tradeoff.
What's the difference between semantic caching and exact match caching?
Exact match caching stores responses keyed by the exact prompt hash and only returns cached results for identical queries. Semantic caching uses embeddings to find similar (not identical) queries, returning cached responses when the semantic similarity exceeds a threshold. Exact match has zero false positives but lower hit rates; semantic caching has higher hit rates but requires tuning to avoid returning inappropriate cached results.
How do I handle different API formats between providers like OpenAI and Anthropic?
Use an abstraction layer like LiteLLM that normalizes requests and responses across providers. LiteLLM supports 100+ providers through a unified OpenAI-compatible interface. You write code once, and the library handles the API translation. This also simplifies failover logic since you can switch providers without changing your application code.
How much latency does an AI gateway add to requests?
A well-implemented gateway adds 5-20ms of latency for cache lookups and rate limiting checks. This overhead is negligible compared to typical LLM response times of 1-5 seconds. Cache hits actually reduce total latency dramatically since they avoid the LLM call entirely. The latency tradeoff is almost always worthwhile for the reliability and cost benefits.
Can I use multiple LLM providers simultaneously for the same request?
Yes, but typically you route each request to a single provider based on the task requirements. Parallel calls to multiple providers are useful for A/B testing, consensus checking (calling multiple models and comparing outputs), or racing providers for lowest latency. However, this multiplies costs, so reserve parallel calls for high-value use cases where the benefits justify the expense.
How do I track and allocate LLM costs to different teams or projects?
Include team_id, project_id, and feature identifiers in every request to your gateway. The gateway logs these alongside token usage and calculates costs based on provider pricing. Aggregate daily or monthly costs per dimension and expose dashboards or reports. This enables chargebacks, budget enforcement, and identifying which features drive the most spend.
Should I build my own AI gateway or use a managed service?
Start with managed services like Portkey or Helicone if you need quick time-to-value and don't require heavy customization. Use open-source tools like LiteLLM Proxy for self-hosted deployments with more control. Build custom only if you have unique requirements that existing tools don't address — the engineering investment is substantial. Most teams should exhaust existing options before building from scratch.
How do I handle provider rate limits without impacting users?
Implement a multi-layered strategy: First, track your usage against provider limits and implement client-side rate limiting to stay within quotas. Second, configure automatic failover to backup providers when rate limits are hit. Third, use queuing for non-urgent requests to smooth out traffic spikes. Fourth, negotiate higher rate limits with providers if you have predictable high-volume workloads.
What metrics should I monitor for a production AI gateway?
Monitor latency (p50, p95, p99), error rates by type, cache hit rate, token usage by model and team, cost per request and aggregated, and provider availability. Set alerts for cost threshold breaches, error rate spikes, latency degradation, and provider outages. Track trends over time to identify optimization opportunities and capacity planning needs.
How do I invalidate cached responses when my underlying data changes?
For RAG-based systems, implement event-driven cache invalidation that clears relevant cached responses when source documents are updated. Tag cached entries with metadata about their data dependencies. For simpler setups, use time-based TTLs appropriate to your data freshness requirements — shorter TTLs for rapidly changing data, longer for stable content.