Moving AI applications from prototype to production reveals a harsh truth: API costs spiral unpredictably, rate limits strike at the worst moments, and single-provider dependence creates existential risk. What worked for a demo with 100 requests per day fails spectacularly when real users generate thousands of requests with unpredictable patterns.
An AI gateway sits between your application and LLM providers, handling cross-cutting concerns like caching, rate limiting, fallbacks, and cost tracking. This architectural pattern has become essential for teams running production AI workloads at scale.
TL;DR: An AI gateway centralizes LLM infrastructure concerns: request routing, caching (exact and semantic), rate limiting, fallback handling, and cost tracking. Semantic caching can significantly reduce costs for repetitive workloads (results vary based on query patterns). Multi-provider setups with intelligent routing improve reliability and optimize cost-quality tradeoffs. Build vs buy depends on scale and customization needs — start with open-source tools like LiteLLM or managed services like Portkey.
Direct API calls to LLM providers work fine for prototypes. In production, this approach creates problems that compound as you scale.
When every service calls OpenAI directly:

- Costs are invisible until the invoice arrives, with no per-team or per-feature attribution
- Each service re-implements retries, timeouts, and rate-limit handling
- A single provider outage takes every feature down at once
- Switching or adding providers means touching every call site
An AI gateway centralizes these responsibilities in one place: request routing, caching, rate limiting, fallback handling, and cost tracking.
Let's examine each core capability and how to implement it effectively.
| Capability | Purpose | Typical Impact |
|---|---|---|
| Request Routing | Direct traffic to optimal provider | Varies by use case |
| Caching | Avoid redundant API calls | Significant (varies by query patterns) |
| Rate Limiting | Prevent budget overruns | Prevents runaway costs |
| Fallback/Retry | Handle provider failures | Maintains availability during outages |
| Cost Tracking | Attribution and budgeting | Enables chargebacks |
Intelligent routing directs requests to the most appropriate provider based on:

- Task type and complexity (simple classification vs. complex code generation)
- Required context window length
- Cost and latency targets
- Current provider availability
Caching is the highest-impact optimization for most AI workloads. Two approaches dominate:
Exact match caching stores responses keyed by the exact prompt hash. Simple to implement, zero false positives, but only helps with identical requests.
Semantic caching stores embeddings of prompts and returns cached responses for semantically similar queries. Higher hit rates but requires careful tuning to avoid returning inappropriate cached results.
Here's a semantic cache implementation:
import hashlib
import json
import numpy as np
from redis import Redis
from openai import OpenAI
class SemanticCache:
def __init__(self, redis_url: str, similarity_threshold: float = 0.95):
self.redis = Redis.from_url(redis_url)
self.client = OpenAI()
self.threshold = similarity_threshold
self.embedding_model = "text-embedding-3-small"
def _get_embedding(self, text: str) -> list[float]:
response = self.client.embeddings.create(
model=self.embedding_model,
input=text
)
return response.data[0].embedding
def _cosine_similarity(self, a: list[float], b: list[float]) -> float:
a, b = np.array(a), np.array(b)
return np.dot(a, b) / (np.linalg.norm(a) * np.linalg.norm(b))
def get(self, prompt: str) -> dict | None:
query_embedding = self._get_embedding(prompt)
# Scan cached embeddings (use vector DB for scale)
for key in self.redis.scan_iter("cache:embedding:*"):
cached = json.loads(self.redis.get(key))
similarity = self._cosine_similarity(
query_embedding, cached["embedding"]
)
if similarity >= self.threshold:
return cached["response"]
return None
def set(self, prompt: str, response: dict, ttl: int = 3600):
embedding = self._get_embedding(prompt)
cache_key = f"cache:embedding:{hashlib.md5(prompt.encode()).hexdigest()}"
self.redis.setex(
cache_key,
ttl,
json.dumps({"embedding": embedding, "response": response})
)
Rate limiting prevents cost overruns and ensures fair resource allocation. Implement limits at multiple levels:

- Per user, to contain abusive or buggy clients
- Per team, for budget allocation and chargebacks
- Per feature, so one product surface cannot starve the others
- Globally, to stay within provider quotas
Token bucket rate limiting implementation:
import time
from redis import Redis
class TokenBucketRateLimiter:
def __init__(self, redis_url: str):
self.redis = Redis.from_url(redis_url)
def check_and_consume(
self,
key: str,
tokens: int,
max_tokens: int,
refill_rate: float
) -> tuple[bool, dict]:
"""
Check if request is allowed and consume tokens.
Args:
key: Unique identifier (user_id, team_id, etc.)
tokens: Tokens to consume (e.g., estimated input + output tokens)
max_tokens: Bucket capacity
refill_rate: Tokens added per second
Returns:
(allowed, info) tuple
"""
now = time.time()
bucket_key = f"ratelimit:{key}"
# Get current bucket state
bucket = self.redis.hgetall(bucket_key)
if bucket:
current_tokens = float(bucket[b"tokens"])
last_update = float(bucket[b"last_update"])
# Refill tokens based on elapsed time
elapsed = now - last_update
current_tokens = min(max_tokens, current_tokens + elapsed * refill_rate)
else:
current_tokens = max_tokens
if current_tokens >= tokens:
# Consume tokens
new_tokens = current_tokens - tokens
self.redis.hset(bucket_key, mapping={
"tokens": new_tokens,
"last_update": now
})
self.redis.expire(bucket_key, 86400) # 24h TTL
return True, {"remaining": new_tokens, "limit": max_tokens}
# Calculate wait time
tokens_needed = tokens - current_tokens
wait_time = tokens_needed / refill_rate
return False, {
"remaining": current_tokens,
"limit": max_tokens,
"retry_after": wait_time
}
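The same refill arithmetic can be exercised without Redis. This single-process variant (illustrative only — it is not shared across workers, which is why the gateway uses Redis) is handy for unit-testing limit configurations:

```python
import time

class InMemoryTokenBucket:
    """Single-process token bucket using the same refill math as the Redis version."""

    def __init__(self, max_tokens: float, refill_rate: float):
        self.max_tokens = max_tokens
        self.refill_rate = refill_rate  # tokens added per second
        self.tokens = max_tokens
        self.last_update = time.monotonic()

    def check_and_consume(self, tokens: float) -> bool:
        # Refill based on elapsed time, capped at bucket capacity
        now = time.monotonic()
        elapsed = now - self.last_update
        self.tokens = min(self.max_tokens, self.tokens + elapsed * self.refill_rate)
        self.last_update = now
        if self.tokens >= tokens:
            self.tokens -= tokens
            return True
        return False
```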
Production systems need graceful degradation when providers fail:

- Retries with exponential backoff for transient errors
- Fallback chains that reroute to an alternate provider or model
- Circuit breakers that stop sending traffic to a provider that is down
- Timeouts so a slow provider cannot stall the request path
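A minimal sketch of the retry-plus-fallback policy, with the actual provider call injected so the logic is testable without network access (`call_fn` is a stand-in for something like `litellm.completion`):

```python
import time

def call_with_fallbacks(models: list[str], call_fn, max_retries: int = 2,
                        base_delay: float = 0.5):
    """Try each model in order; retry transient failures with exponential backoff."""
    last_error = None
    for model in models:
        for attempt in range(max_retries):
            try:
                return model, call_fn(model)
            except Exception as exc:  # in production, catch provider-specific errors
                last_error = exc
                time.sleep(base_delay * (2 ** attempt))
        # Retries exhausted for this model; fall through to the next in the chain
    raise RuntimeError(f"All providers failed: {last_error}")
```

A circuit breaker would sit in front of this loop, skipping models whose recent error rate exceeds a threshold instead of retrying them at all.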
Track costs at multiple granularities:
from dataclasses import dataclass
from datetime import datetime
import json

from redis import Redis
@dataclass
class UsageRecord:
timestamp: datetime
user_id: str
team_id: str
feature: str
model: str
provider: str
input_tokens: int
output_tokens: int
cached: bool
latency_ms: float
@property
def cost_usd(self) -> float:
# Pricing per 1M tokens (example rates)
pricing = {
"gpt-4o": {"input": 2.50, "output": 10.00},
"gpt-4o-mini": {"input": 0.15, "output": 0.60},
"claude-3-5-sonnet": {"input": 3.00, "output": 15.00},
"claude-3-5-haiku": {"input": 0.80, "output": 4.00},
}
rates = pricing.get(self.model, {"input": 5.0, "output": 15.0})
return (
(self.input_tokens * rates["input"] / 1_000_000) +
(self.output_tokens * rates["output"] / 1_000_000)
)
class CostTracker:
def __init__(self, redis_url: str):
self.redis = Redis.from_url(redis_url)
def record(self, usage: UsageRecord):
# Store individual record
record_key = f"usage:{usage.timestamp.isoformat()}"
        # default=str makes the datetime field JSON-serializable
        self.redis.setex(record_key, 86400 * 30, json.dumps(usage.__dict__, default=str))
# Update aggregates
date_key = usage.timestamp.strftime("%Y-%m-%d")
# By team
self.redis.incrbyfloat(
f"cost:team:{usage.team_id}:{date_key}",
usage.cost_usd
)
# By feature
self.redis.incrbyfloat(
f"cost:feature:{usage.feature}:{date_key}",
usage.cost_usd
)
# By model
self.redis.incrbyfloat(
f"cost:model:{usage.model}:{date_key}",
usage.cost_usd
)
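As a quick sanity check on the cost arithmetic, here is a worked example using the gpt-4o-mini rates from the pricing table above:

```python
# Worked example: 1,000 input tokens + 500 output tokens on gpt-4o-mini
input_tokens, output_tokens = 1_000, 500
input_rate, output_rate = 0.15, 0.60  # USD per 1M tokens (example rates from above)

cost_usd = (input_tokens * input_rate + output_tokens * output_rate) / 1_000_000
print(f"${cost_usd:.6f}")  # → $0.000450
```

Individual requests cost fractions of a cent; the aggregates only become meaningful at thousands of requests per day, which is exactly why per-team and per-feature rollups matter.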
Caching delivers the highest ROI for most AI workloads. Understanding when and how to cache effectively is critical.
Cache effectiveness depends on your query patterns:
| Aspect | Exact Match | Semantic Cache |
|---|---|---|
| Hit rate | Lower (identical queries only) | Higher (similar queries match) |
| False positives | None | Possible (tuning required) |
| Implementation | Simple hash lookup | Embedding + similarity search |
| Overhead | Minimal | Embedding generation cost |
| Best for | Structured queries, APIs | Natural language, chat |
Proactively populate caches with expected queries:

- Precompute responses for known FAQs and onboarding flows
- Warm the cache at deploy time from the previous period's most frequent queries
- Refresh popular entries before their TTL expires rather than waiting for a miss
Stale cache responses can be worse than cache misses. Invalidation strategies:

- Time-based TTLs matched to how quickly the underlying data changes
- Event-driven invalidation when source documents are updated (essential for RAG)
- Versioned cache keys that roll over when prompts or models change
Depending on a single LLM provider creates business risk. Multi-provider architectures improve reliability, optimize costs, and prevent vendor lock-in.
LiteLLM provides a unified interface across many LLM providers (including OpenAI, Anthropic, Google, Azure, AWS Bedrock, and dozens more):
from litellm import completion, acompletion
import os
# Set provider API keys
os.environ["OPENAI_API_KEY"] = "sk-..."
os.environ["ANTHROPIC_API_KEY"] = "sk-ant-..."
# Same interface, any provider
def call_llm(prompt: str, model: str = "gpt-4o") -> str:
response = completion(
model=model, # or "claude-3-5-sonnet-20241022", "gemini/gemini-pro"
messages=[{"role": "user", "content": prompt}],
temperature=0.7
)
return response.choices[0].message.content
# Async support
async def call_llm_async(prompt: str, model: str = "gpt-4o") -> str:
response = await acompletion(
model=model,
messages=[{"role": "user", "content": prompt}]
)
return response.choices[0].message.content
# With fallbacks
response = completion(
model="gpt-4o",
messages=[{"role": "user", "content": "Hello"}],
fallbacks=["claude-3-5-sonnet-20241022", "gemini/gemini-1.5-pro"]
)
Route requests based on task characteristics:
def route_request(task_type: str, complexity: str, max_tokens: int) -> str:
"""Select optimal model based on task requirements."""
if task_type == "code_generation":
if complexity == "high":
return "gpt-4o" # Best for complex code
return "gpt-4o-mini" # Cost-effective for simple code
if task_type == "long_document":
if max_tokens > 100000:
return "claude-3-5-sonnet-20241022" # 200K context
return "gpt-4o" # 128K context
if task_type == "classification":
return "gpt-4o-mini" # Fast and cheap for structured tasks
if task_type == "creative":
return "claude-3-5-sonnet-20241022" # Strong creative writing
# Default to cost-optimized
return "gpt-4o-mini"
Define fallback chains for each primary model:
FAILOVER_CONFIG = {
"gpt-4o": [
"claude-3-5-sonnet-20241022",
"gemini/gemini-1.5-pro",
],
"claude-3-5-sonnet-20241022": [
"gpt-4o",
"gemini/gemini-1.5-pro",
],
"gpt-4o-mini": [
"claude-3-5-haiku-20241022",
"gemini/gemini-1.5-flash",
],
}
Multiple techniques combine to significantly reduce LLM costs:
| Tactic | Typical Savings | Implementation Effort |
|---|---|---|
| Semantic caching | Varies (depends on query patterns) | Medium |
| Model tiering | Significant (task-dependent) | Low |
| Prompt optimization | Moderate | Low |
| Batch processing | 50% (Batch API) | Medium |
| Response streaming | Improved UX, same cost | Low |
| Token limits | Prevents overruns | Low |
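The batch processing row refers to asynchronous APIs such as OpenAI's Batch API, where you upload a JSONL file of requests and receive results within a completion window at a 50% discount. A sketch of building one request line (the format shown matches OpenAI's Batch API at the time of writing; verify against current documentation):

```python
import json

def build_batch_line(custom_id: str, model: str, prompt: str) -> str:
    """One JSONL line in the shape the OpenAI Batch API expects."""
    return json.dumps({
        "custom_id": custom_id,          # your identifier, echoed back in results
        "method": "POST",
        "url": "/v1/chat/completions",
        "body": {
            "model": model,
            "messages": [{"role": "user", "content": prompt}],
        },
    })

# Upload the assembled .jsonl with client.files.create(purpose="batch"), then
# submit via client.batches.create(input_file_id=..., endpoint="/v1/chat/completions",
# completion_window="24h") and poll for completion.
```

Batching only fits workloads that tolerate hours of latency — nightly summarization, evaluation runs, backfills — not interactive features.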
Use the cheapest model that meets quality requirements:

- Route classification and extraction to small models like gpt-4o-mini or claude-3-5-haiku
- Reserve frontier models for complex reasoning and high-stakes generation
- Evaluate quality per task so tiering decisions are data-driven, not guesses
Reduce token usage without sacrificing quality:

- Trim boilerplate from system prompts and few-shot examples
- Cap max_tokens to the longest output the feature actually needs
- Truncate or summarize conversation history instead of resending it in full
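One concrete tactic is trimming conversation history to a budget instead of resending every turn. A sketch using a characters-per-token heuristic (`trim_history` is illustrative; swap in a real tokenizer such as tiktoken for accurate budgets):

```python
def trim_history(messages: list[dict], max_chars: int = 8000) -> list[dict]:
    """Keep the system prompt plus the most recent turns that fit a rough budget.

    Uses characters as a cheap proxy for tokens (~4 chars/token in English).
    """
    system = [m for m in messages if m["role"] == "system"]
    rest = [m for m in messages if m["role"] != "system"]

    kept: list[dict] = []
    used = sum(len(m["content"]) for m in system)
    # Walk backwards from the newest message, stopping when the budget is spent
    for msg in reversed(rest):
        if used + len(msg["content"]) > max_chars:
            break
        kept.append(msg)
        used += len(msg["content"])
    return system + list(reversed(kept))
```

Dropping the oldest turns first preserves the system prompt and recent context, which is usually what the model needs most.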
Choose based on your scale, customization needs, and operational preferences:
| Solution | Type | Best For | Pricing |
|---|---|---|---|
| LiteLLM Proxy | Open-source | Self-hosted, full control | Free (self-hosted) |
| Portkey | Managed | Fast setup, enterprise features | Usage-based |
| Helicone | Managed | Observability focus | Free tier + usage |
| AWS Bedrock | Cloud | AWS-native, compliance | Usage-based |
| Azure AI Gateway | Cloud | Azure-native, enterprise | Usage-based |
| Custom build | Self-built | Unique requirements | Engineering time |
A basic gateway skeleton with FastAPI:
from datetime import datetime
import time

from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from litellm import completion
app = FastAPI()
# Initialize components
cache = SemanticCache(redis_url="redis://localhost:6379")
rate_limiter = TokenBucketRateLimiter(redis_url="redis://localhost:6379")
cost_tracker = CostTracker(redis_url="redis://localhost:6379")
class CompletionRequest(BaseModel):
model: str
messages: list[dict]
user_id: str
team_id: str
feature: str
temperature: float = 0.7
max_tokens: int | None = None
class CompletionResponse(BaseModel):
content: str
model: str
cached: bool
usage: dict
@app.post("/v1/completions", response_model=CompletionResponse)
async def create_completion(request: CompletionRequest):
# Rate limiting
    # Rough estimate: ~4 characters per token, plus headroom for the completion
    estimated_tokens = sum(len(m["content"]) // 4 for m in request.messages) + 500
allowed, info = rate_limiter.check_and_consume(
key=request.team_id,
tokens=estimated_tokens,
max_tokens=1_000_000, # 1M tokens per day
refill_rate=11.5 # ~1M per day
)
if not allowed:
raise HTTPException(
status_code=429,
detail=f"Rate limit exceeded. Retry after {info['retry_after']:.0f}s"
)
# Check cache
cache_key = f"{request.model}:{request.messages[-1]['content']}"
cached_response = cache.get(cache_key)
if cached_response:
return CompletionResponse(
content=cached_response["content"],
model=request.model,
cached=True,
usage=cached_response["usage"]
)
# Call LLM
start_time = time.time()
try:
response = completion(
model=request.model,
messages=request.messages,
temperature=request.temperature,
max_tokens=request.max_tokens,
fallbacks=FAILOVER_CONFIG.get(request.model, [])
)
except Exception as e:
raise HTTPException(status_code=502, detail=str(e))
latency_ms = (time.time() - start_time) * 1000
# Extract response data
content = response.choices[0].message.content
usage = {
"input_tokens": response.usage.prompt_tokens,
"output_tokens": response.usage.completion_tokens
}
# Cache response
cache.set(cache_key, {"content": content, "usage": usage})
# Track costs
cost_tracker.record(UsageRecord(
timestamp=datetime.now(),
user_id=request.user_id,
team_id=request.team_id,
feature=request.feature,
model=request.model,
provider=response.model.split("/")[0] if "/" in response.model else "openai",
input_tokens=usage["input_tokens"],
output_tokens=usage["output_tokens"],
cached=False,
latency_ms=latency_ms
))
return CompletionResponse(
content=content,
model=request.model,
cached=False,
usage=usage
)
Production AI systems require comprehensive monitoring.
Log at appropriate levels:

- Request metadata (model, latency, token counts, cache status) at INFO
- Full prompts and responses at DEBUG only, with sensitive data redacted
- Provider errors with enough context to distinguish rate limits from outages
Configure alerts for:

- Cost threshold breaches (daily and monthly budgets)
- Error rate spikes by provider and model
- Latency degradation at p95/p99
- Provider outages and repeated fallback activations
At Virtido, we help companies design and implement production AI infrastructure — from gateway architecture to observability. Our teams combine platform engineering expertise with hands-on AI/ML experience.
We've delivered AI infrastructure solutions across FinTech, SaaS, and enterprise software. Our staff augmentation model provides vetted talent with Swiss contracts and full IP protection.
An AI gateway transforms LLM deployments from fragile prototypes into production-grade systems. The pattern addresses real problems that every team hits at scale: unpredictable costs, provider outages, lack of visibility, and vendor lock-in. Starting with basic caching and rate limiting delivers immediate ROI, while multi-provider routing and intelligent cost optimization build long-term resilience.
The implementation path matters as much as the architecture. Managed services like Portkey and Helicone get you running quickly with enterprise features built in. Open-source tools like LiteLLM offer flexibility and control for teams with specific requirements. Building custom makes sense only when existing tools genuinely cannot meet your needs — the engineering investment is significant.
As AI workloads grow, gateway infrastructure becomes a competitive advantage. Teams that invest early in cost control, reliability, and observability can scale confidently while competitors struggle with unpredictable bills and outage-induced downtime. The patterns described here are proven in production across industries — the question is not whether to implement them, but how quickly.
Semantic caching typically reduces costs by 30-50% for workloads with repetitive query patterns, such as customer support, FAQ systems, and code completion. The savings depend on your query distribution — workloads with high query diversity see lower cache hit rates. Monitor your cache hit rate and adjust the similarity threshold to optimize the cost-quality tradeoff.
Exact match caching stores responses keyed by the exact prompt hash and only returns cached results for identical queries. Semantic caching uses embeddings to find similar (not identical) queries, returning cached responses when the semantic similarity exceeds a threshold. Exact match has zero false positives but lower hit rates; semantic caching has higher hit rates but requires tuning to avoid returning inappropriate cached results.
Use an abstraction layer like LiteLLM that normalizes requests and responses across providers. LiteLLM supports 100+ providers through a unified OpenAI-compatible interface. You write code once, and the library handles the API translation. This also simplifies failover logic since you can switch providers without changing your application code.
A well-implemented gateway adds 5-20ms of latency for cache lookups and rate limiting checks. This overhead is negligible compared to typical LLM response times of 1-5 seconds. Cache hits actually reduce total latency dramatically since they avoid the LLM call entirely. The latency tradeoff is almost always worthwhile for the reliability and cost benefits.
Yes, but typically you route each request to a single provider based on the task requirements. Parallel calls to multiple providers are useful for A/B testing, consensus checking (calling multiple models and comparing outputs), or racing providers for lowest latency. However, this multiplies costs, so reserve parallel calls for high-value use cases where the benefits justify the expense.
Include team_id, project_id, and feature identifiers in every request to your gateway. The gateway logs these alongside token usage and calculates costs based on provider pricing. Aggregate daily or monthly costs per dimension and expose dashboards or reports. This enables chargebacks, budget enforcement, and identifying which features drive the most spend.
Start with managed services like Portkey or Helicone if you need quick time-to-value and don't require heavy customization. Use open-source tools like LiteLLM Proxy for self-hosted deployments with more control. Build custom only if you have unique requirements that existing tools don't address — the engineering investment is substantial. Most teams should exhaust existing options before building from scratch.
Implement a multi-layered strategy: First, track your usage against provider limits and implement client-side rate limiting to stay within quotas. Second, configure automatic failover to backup providers when rate limits are hit. Third, use queuing for non-urgent requests to smooth out traffic spikes. Fourth, negotiate higher rate limits with providers if you have predictable high-volume workloads.
Monitor latency (p50, p95, p99), error rates by type, cache hit rate, token usage by model and team, cost per request and aggregated, and provider availability. Set alerts for cost threshold breaches, error rate spikes, latency degradation, and provider outages. Track trends over time to identify optimization opportunities and capacity planning needs.
For RAG-based systems, implement event-driven cache invalidation that clears relevant cached responses when source documents are updated. Tag cached entries with metadata about their data dependencies. For simpler setups, use time-based TTLs appropriate to your data freshness requirements — shorter TTLs for rapidly changing data, longer for stable content.