Integrating AI APIs into your applications opens up powerful capabilities, but it also introduces important considerations around security, performance, and cost. This comprehensive guide will walk you through best practices for building robust, secure, and cost-effective AI API integrations using Python and Flask.
Table of Contents
- Security Best Practices
- Rate Limiting Strategies
- Cost Management
- Error Handling and Resilience
- Monitoring and Observability
1. Security Best Practices
API Key Management
Never hardcode API keys in your source code or commit them to version control. Instead:
```python
# BAD - Hardcoded key
api_key = "sk-ant-1234567890"

# GOOD - Environment variable
import os
api_key = os.environ.get('ANTHROPIC_API_KEY')
```
Use environment variables and secret management systems:
- Development: `.env` files (with `python-dotenv`)
- Production: AWS Secrets Manager, Google Secret Manager, HashiCorp Vault, or Azure Key Vault
```python
# Using python-dotenv
from dotenv import load_dotenv
import os

load_dotenv()
ANTHROPIC_API_KEY = os.getenv('ANTHROPIC_API_KEY')
```
Implement key rotation policies:
- Rotate API keys regularly (every 90 days recommended)
- Maintain multiple keys to enable zero-downtime rotation
- Have emergency revocation procedures ready
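Maintaining multiple keys can be wired into the application with a simple priority lookup. A minimal sketch of zero-downtime rotation (the environment variable names here are illustrative, not an Anthropic convention): during a rotation window both variables are set, and the old key is removed once all traffic uses the new one.

```python
import os

# Hypothetical env var names; during rotation both are set, then the
# secondary (old) key is revoked once the primary has fully rolled out.
KEY_ENV_VARS = ['ANTHROPIC_API_KEY_PRIMARY', 'ANTHROPIC_API_KEY_SECONDARY']

def select_api_key(env=os.environ):
    """Return the first configured key, preferring the primary."""
    for var in KEY_ENV_VARS:
        key = env.get(var)
        if key:
            return key
    raise RuntimeError('No API key configured')
```

The returned key would then be passed to `anthropic.Anthropic(api_key=...)` at client construction time.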
Backend-Only API Calls
AI API keys should never be exposed to client-side code. Always proxy requests through your backend:
```python
from flask import Flask, request, jsonify
import anthropic
import os

app = Flask(__name__)
client = anthropic.Anthropic(api_key=os.environ.get("ANTHROPIC_API_KEY"))

@app.route('/api/generate', methods=['POST'])
def generate_text():
    data = request.get_json()
    prompt = data.get('prompt', '')

    # Validate and sanitize input
    if not prompt or len(prompt) > 10000:
        return jsonify({'error': 'Invalid prompt'}), 400

    try:
        message = client.messages.create(
            model="claude-sonnet-4-20250514",
            max_tokens=1024,
            messages=[{"role": "user", "content": prompt}]
        )
        return jsonify({
            'content': message.content[0].text,
            'usage': {
                'input_tokens': message.usage.input_tokens,
                'output_tokens': message.usage.output_tokens
            }
        })
    except Exception as e:
        return jsonify({'error': str(e)}), 500
```
Input Validation and Sanitization
Protect against injection attacks and abuse:
```python
import re

class InputValidator:
    MAX_LENGTH = 100000
    PROHIBITED_PATTERNS = [
        r'ignore\s+previous\s+instructions',
        r'system\s+prompt',
        r'you\s+are\s+now',
    ]

    @classmethod
    def validate(cls, text):
        # Length validation
        if len(text) > cls.MAX_LENGTH:
            raise ValueError('Input too long')
        # Content filtering
        for pattern in cls.PROHIBITED_PATTERNS:
            if re.search(pattern, text, re.IGNORECASE):
                raise ValueError('Invalid input detected')
        return text.strip()

@app.route('/api/generate', methods=['POST'])
def generate_text():
    data = request.get_json()
    prompt = data.get('prompt', '')
    try:
        validated_prompt = InputValidator.validate(prompt)
    except ValueError as e:
        return jsonify({'error': str(e)}), 400
    # Process validated prompt...
    ...
```
Authentication and Authorization
Implement proper user authentication before allowing API access:
```python
from flask import Flask, request, jsonify
from functools import wraps
import jwt
import os
from datetime import datetime, timedelta

app = Flask(__name__)
app.config['SECRET_KEY'] = os.environ.get('JWT_SECRET_KEY')

def token_required(f):
    @wraps(f)
    def decorated(*args, **kwargs):
        token = request.headers.get('Authorization')
        if not token:
            return jsonify({'error': 'Token is missing'}), 401
        try:
            # Remove 'Bearer ' prefix if present
            if token.startswith('Bearer '):
                token = token[7:]
            data = jwt.decode(token, app.config['SECRET_KEY'], algorithms=["HS256"])
            current_user = data['user_id']
        except jwt.ExpiredSignatureError:
            return jsonify({'error': 'Token has expired'}), 401
        except jwt.InvalidTokenError:
            return jsonify({'error': 'Invalid token'}), 401
        return f(current_user, *args, **kwargs)
    return decorated

@app.route('/api/generate', methods=['POST'])
@token_required
def generate_text(current_user):
    # Only authenticated users reach here
    prompt = request.get_json().get('prompt', '')
    # Process request...
    ...
```
2. Rate Limiting Strategies
Understanding API Rate Limits
Most AI APIs implement rate limits on multiple dimensions:
- Requests per minute (RPM): Number of API calls
- Tokens per minute (TPM): Total tokens processed
- Concurrent requests: Simultaneous connections
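The request limiter shown below only counts calls, but the tokens-per-minute dimension can be budgeted the same way with a sliding window. A rough sketch (the class and method names are illustrative):

```python
import time
from collections import deque
from threading import Lock

class TokenBudget:
    """Track tokens consumed in a sliding one-minute window."""

    def __init__(self, max_tokens_per_minute):
        self.max_tokens = max_tokens_per_minute
        self.window = 60.0
        self.events = deque()  # (timestamp, tokens) pairs
        self.lock = Lock()

    def would_exceed(self, tokens):
        """Return True if spending `tokens` now would break the budget."""
        with self.lock:
            now = time.time()
            # Drop events that have aged out of the window
            while self.events and now - self.events[0][0] > self.window:
                self.events.popleft()
            used = sum(t for _, t in self.events)
            return used + tokens > self.max_tokens

    def record(self, tokens):
        """Record actual token usage after a successful call."""
        with self.lock:
            self.events.append((time.time(), tokens))
```

Before each call, the estimated token count is checked with `would_exceed`; after the response arrives, the real usage from `message.usage` is recorded.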
Client-Side Rate Limiting
Implement your own rate limiting to stay within quotas:
```python
import time
from collections import deque
from threading import Lock

class RateLimiter:
    def __init__(self, max_requests, time_window):
        self.max_requests = max_requests
        self.time_window = time_window  # in seconds
        self.requests = deque()
        self.lock = Lock()

    def acquire(self):
        while True:
            with self.lock:
                now = time.time()
                # Remove old requests outside time window
                while self.requests and now - self.requests[0] > self.time_window:
                    self.requests.popleft()
                if len(self.requests) < self.max_requests:
                    self.requests.append(now)
                    return
                # Calculate wait time until the oldest request expires
                wait_time = self.time_window - (now - self.requests[0])
            # Sleep outside the lock so other threads aren't blocked
            time.sleep(max(wait_time, 0))

# Usage
limiter = RateLimiter(max_requests=50, time_window=60)  # 50 requests per minute

def call_api(prompt):
    limiter.acquire()
    # Make API call
    return client.messages.create(...)
```
Flask-Limiter for Rate Limiting
Use Flask-Limiter for built-in rate limiting:
```python
from flask import Flask
from flask_limiter import Limiter
from flask_limiter.util import get_remote_address

app = Flask(__name__)
limiter = Limiter(
    app=app,
    key_func=get_remote_address,
    default_limits=["200 per day", "50 per hour"],
    storage_uri="redis://localhost:6379"
)

@app.route('/api/generate', methods=['POST'])
@limiter.limit("10 per minute")
@token_required
def generate_text(current_user):
    # This endpoint is limited to 10 requests per minute per IP
    prompt = request.get_json().get('prompt', '')
    # Process request...
    ...
```
Per-User Rate Limiting
```python
import jwt
from flask import request
from flask_limiter import Limiter
from flask_limiter.util import get_remote_address

def get_user_id():
    # Extract user ID from JWT token
    token = request.headers.get('Authorization', '').replace('Bearer ', '')
    try:
        data = jwt.decode(token, app.config['SECRET_KEY'], algorithms=["HS256"])
        return data['user_id']
    except jwt.InvalidTokenError:
        # Fall back to IP-based limiting for unauthenticated requests
        return get_remote_address()

limiter = Limiter(
    app=app,
    key_func=get_user_id,
    storage_uri="redis://localhost:6379"
)

@app.route('/api/generate', methods=['POST'])
@limiter.limit("10 per minute")  # Per user
@token_required
def generate_text(current_user):
    # Process request...
    ...
```
Exponential Backoff
Handle rate limit errors gracefully with exponential backoff:
```python
import time
import random
from anthropic import RateLimitError

def call_with_retry(api_call_func, max_retries=5):
    for attempt in range(max_retries):
        try:
            return api_call_func()
        except RateLimitError:
            if attempt == max_retries - 1:
                raise
            # Exponential backoff with jitter
            delay = min(2 ** attempt, 32) + random.uniform(0, 1)
            time.sleep(delay)
    raise Exception('Max retries exceeded')

# Usage
def make_api_call():
    return client.messages.create(
        model="claude-sonnet-4-20250514",
        max_tokens=1024,
        messages=[{"role": "user", "content": "Hello!"}]
    )

response = call_with_retry(make_api_call)
```
Request Queuing with Celery
Implement a queue system for high-traffic applications:
```python
from celery import Celery
from flask import Flask
import anthropic
import os

app = Flask(__name__)
app.config['CELERY_BROKER_URL'] = 'redis://localhost:6379/0'
app.config['CELERY_RESULT_BACKEND'] = 'redis://localhost:6379/0'

celery = Celery(app.name, broker=app.config['CELERY_BROKER_URL'])
celery.conf.update(app.config)

client = anthropic.Anthropic(api_key=os.environ.get("ANTHROPIC_API_KEY"))

@celery.task(bind=True, max_retries=3)
def process_ai_request(self, prompt, user_id):
    try:
        message = client.messages.create(
            model="claude-sonnet-4-20250514",
            max_tokens=1024,
            messages=[{"role": "user", "content": prompt}]
        )
        return {
            'content': message.content[0].text,
            'tokens': message.usage.input_tokens + message.usage.output_tokens
        }
    except Exception as e:
        # Retry with exponential backoff
        raise self.retry(exc=e, countdown=2 ** self.request.retries)

@app.route('/api/generate', methods=['POST'])
@token_required
def generate_text(current_user):
    prompt = request.get_json().get('prompt', '')
    # Queue the task
    task = process_ai_request.delay(prompt, current_user)
    return jsonify({
        'task_id': task.id,
        'status': 'queued'
    }), 202

@app.route('/api/status/<task_id>', methods=['GET'])
def check_status(task_id):
    task = process_ai_request.AsyncResult(task_id)
    if task.state == 'PENDING':
        response = {'state': task.state, 'status': 'Pending...'}
    elif task.state == 'SUCCESS':
        response = {'state': task.state, 'result': task.result}
    else:
        response = {'state': task.state, 'status': str(task.info)}
    return jsonify(response)
```
3. Cost Management
Token Usage Optimization
Reduce prompt length while maintaining effectiveness:
```python
# ❌ Verbose prompt
verbose_prompt = """
Hello AI assistant. I would like you to please analyze
the following text very carefully and provide me with
a comprehensive summary of the main points...
"""

# ✅ Concise prompt
concise_prompt = "Summarize the main points of this text:"
```
Use appropriate models for different tasks:
```python
class ModelSelector:
    MODELS = {
        'simple': 'claude-3-haiku-20240307',    # Fast, cheap
        'balanced': 'claude-sonnet-4-20250514', # Best balance
        'complex': 'claude-opus-4-20250514'     # Most capable
    }

    @classmethod
    def select_model(cls, task_complexity):
        return cls.MODELS.get(task_complexity, cls.MODELS['balanced'])

# Usage
model = ModelSelector.select_model('simple')
message = client.messages.create(model=model, ...)
```
Implement token counting before sending requests:
```python
def estimate_tokens(text):
    # Rough estimation for Claude models:
    # approximately 4 characters per token for English text
    return len(text) // 4

def validate_request(prompt, max_tokens=1000):
    estimated_tokens = estimate_tokens(prompt)
    if estimated_tokens > max_tokens:
        raise ValueError(f'Request exceeds token limit: {estimated_tokens} > {max_tokens}')
    return True

@app.route('/api/generate', methods=['POST'])
@token_required
def generate_text(current_user):
    prompt = request.get_json().get('prompt', '')
    try:
        validate_request(prompt, max_tokens=5000)
    except ValueError as e:
        return jsonify({'error': str(e)}), 400
    # Process request...
    ...
```
Response Caching
Cache responses for identical or similar requests:
```python
from flask_caching import Cache
import hashlib

app = Flask(__name__)
cache = Cache(app, config={
    'CACHE_TYPE': 'RedisCache',
    'CACHE_REDIS_URL': 'redis://localhost:6379/0',
    'CACHE_DEFAULT_TIMEOUT': 3600  # 1 hour
})

def generate_cache_key(prompt, model):
    # Create a unique hash for the prompt and model
    content = f"{model}:{prompt}"
    return hashlib.md5(content.encode()).hexdigest()

@app.route('/api/generate', methods=['POST'])
@token_required
def generate_text(current_user):
    data = request.get_json()
    prompt = data.get('prompt', '')
    model = data.get('model', 'claude-sonnet-4-20250514')

    # Check cache
    cache_key = generate_cache_key(prompt, model)
    cached_response = cache.get(cache_key)
    if cached_response:
        return jsonify({
            'content': cached_response,
            'cached': True
        })

    # Make API call
    message = client.messages.create(
        model=model,
        max_tokens=1024,
        messages=[{"role": "user", "content": prompt}]
    )
    response_text = message.content[0].text

    # Cache the response
    cache.set(cache_key, response_text)

    return jsonify({
        'content': response_text,
        'cached': False,
        'usage': {
            'input_tokens': message.usage.input_tokens,
            'output_tokens': message.usage.output_tokens
        }
    })
```
Usage Tracking and Budgets
Monitor and limit spending:
```python
from flask_sqlalchemy import SQLAlchemy
from datetime import datetime, timedelta

app = Flask(__name__)
app.config['SQLALCHEMY_DATABASE_URI'] = 'postgresql://localhost/ai_usage'
db = SQLAlchemy(app)

class UsageRecord(db.Model):
    id = db.Column(db.Integer, primary_key=True)
    user_id = db.Column(db.String(50), nullable=False)
    timestamp = db.Column(db.DateTime, default=datetime.utcnow)
    input_tokens = db.Column(db.Integer)
    output_tokens = db.Column(db.Integer)
    cost = db.Column(db.Float)
    model = db.Column(db.String(50))

class UsageTracker:
    # Pricing per 1M tokens (example rates)
    PRICING = {
        'claude-3-haiku-20240307': {'input': 0.25, 'output': 1.25},
        'claude-sonnet-4-20250514': {'input': 3.00, 'output': 15.00},
        'claude-opus-4-20250514': {'input': 15.00, 'output': 75.00}
    }

    @classmethod
    def calculate_cost(cls, model, input_tokens, output_tokens):
        pricing = cls.PRICING.get(model, cls.PRICING['claude-sonnet-4-20250514'])
        input_cost = (input_tokens / 1_000_000) * pricing['input']
        output_cost = (output_tokens / 1_000_000) * pricing['output']
        return input_cost + output_cost

    @classmethod
    def track_usage(cls, user_id, model, input_tokens, output_tokens):
        cost = cls.calculate_cost(model, input_tokens, output_tokens)
        record = UsageRecord(
            user_id=user_id,
            input_tokens=input_tokens,
            output_tokens=output_tokens,
            cost=cost,
            model=model
        )
        db.session.add(record)
        db.session.commit()
        return cost

    @classmethod
    def get_monthly_usage(cls, user_id):
        start_of_month = datetime.utcnow().replace(day=1, hour=0, minute=0, second=0)
        usage = db.session.query(
            db.func.sum(UsageRecord.input_tokens).label('total_input_tokens'),
            db.func.sum(UsageRecord.output_tokens).label('total_output_tokens'),
            db.func.sum(UsageRecord.cost).label('total_cost')
        ).filter(
            UsageRecord.user_id == user_id,
            UsageRecord.timestamp >= start_of_month
        ).first()
        return {
            'input_tokens': usage.total_input_tokens or 0,
            'output_tokens': usage.total_output_tokens or 0,
            'total_cost': float(usage.total_cost or 0)
        }

    @classmethod
    def check_budget(cls, user_id, budget_limit=10.0):
        usage = cls.get_monthly_usage(user_id)
        return usage['total_cost'] < budget_limit

@app.route('/api/generate', methods=['POST'])
@token_required
def generate_text(current_user):
    # Check budget
    if not UsageTracker.check_budget(current_user, budget_limit=10.0):
        return jsonify({'error': 'Monthly budget exceeded'}), 402

    data = request.get_json()
    prompt = data.get('prompt', '')
    model = data.get('model', 'claude-sonnet-4-20250514')

    # Make API call
    message = client.messages.create(
        model=model,
        max_tokens=1024,
        messages=[{"role": "user", "content": prompt}]
    )

    # Track usage
    cost = UsageTracker.track_usage(
        current_user,
        model,
        message.usage.input_tokens,
        message.usage.output_tokens
    )

    return jsonify({
        'content': message.content[0].text,
        'usage': {
            'input_tokens': message.usage.input_tokens,
            'output_tokens': message.usage.output_tokens,
            'cost': cost
        }
    })

@app.route('/api/usage', methods=['GET'])
@token_required
def get_usage(current_user):
    usage = UsageTracker.get_monthly_usage(current_user)
    return jsonify(usage)
```
Streaming Responses
Use streaming to reduce latency and improve UX:
```python
from flask import Response, stream_with_context
import json

@app.route('/api/generate/stream', methods=['POST'])
@token_required
def generate_stream(current_user):
    data = request.get_json()
    prompt = data.get('prompt', '')

    def generate():
        try:
            with client.messages.stream(
                model="claude-sonnet-4-20250514",
                max_tokens=1024,
                messages=[{"role": "user", "content": prompt}]
            ) as stream:
                for text in stream.text_stream:
                    yield f"data: {json.dumps({'content': text})}\n\n"
                # Send final message with usage stats
                final_message = stream.get_final_message()
                payload = {
                    'done': True,
                    'usage': {
                        'input_tokens': final_message.usage.input_tokens,
                        'output_tokens': final_message.usage.output_tokens
                    }
                }
                yield f"data: {json.dumps(payload)}\n\n"
        except Exception as e:
            yield f"data: {json.dumps({'error': str(e)})}\n\n"

    return Response(
        stream_with_context(generate()),
        mimetype='text/event-stream',
        headers={
            'Cache-Control': 'no-cache',
            'X-Accel-Buffering': 'no'
        }
    )
```
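On the consumer side, each event emitted by this endpoint is a `data:` line carrying a JSON payload. A minimal parsing sketch (it assumes the payload shapes produced above; a production client would typically use a dedicated SSE library instead):

```python
import json

def parse_sse_line(line):
    """Parse one SSE line into a dict, or None for non-data lines."""
    if not line.startswith('data: '):
        return None
    return json.loads(line[len('data: '):])

def collect_text(lines):
    """Reassemble streamed content chunks from a sequence of SSE lines."""
    parts = []
    for line in lines:
        event = parse_sse_line(line)
        if event and 'content' in event:
            parts.append(event['content'])
    return ''.join(parts)
```

Blank keep-alive lines and the final `{'done': True, ...}` event are simply skipped by `collect_text`, which only concatenates the `content` chunks.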
4. Error Handling and Resilience
Comprehensive Error Handling
```python
from anthropic import APIError, APIConnectionError, RateLimitError, APIStatusError

class AIAPIError(Exception):
    def __init__(self, message, status_code, retryable=False):
        self.message = message
        self.status_code = status_code
        self.retryable = retryable
        super().__init__(self.message)

def robust_api_call(prompt, model="claude-sonnet-4-20250514"):
    try:
        message = client.messages.create(
            model=model,
            max_tokens=1024,
            messages=[{"role": "user", "content": prompt}]
        )
        return message
    except RateLimitError:
        raise AIAPIError('Rate limit exceeded', 429, retryable=True)
    except APIConnectionError:
        raise AIAPIError('Connection error', 503, retryable=True)
    except APIStatusError as e:
        if e.status_code >= 500:
            raise AIAPIError('Server error', e.status_code, retryable=True)
        else:
            raise AIAPIError(str(e), e.status_code, retryable=False)
    except APIError as e:
        raise AIAPIError(str(e), 500, retryable=False)

@app.route('/api/generate', methods=['POST'])
@token_required
def generate_text(current_user):
    data = request.get_json()
    prompt = data.get('prompt', '')
    try:
        message = robust_api_call(prompt)
        return jsonify({
            'content': message.content[0].text,
            'usage': {
                'input_tokens': message.usage.input_tokens,
                'output_tokens': message.usage.output_tokens
            }
        })
    except AIAPIError as e:
        if e.retryable:
            # Could implement retry logic here
            return jsonify({
                'error': e.message,
                'retryable': True
            }), e.status_code
        else:
            return jsonify({
                'error': e.message,
                'retryable': False
            }), e.status_code
```
Circuit Breaker Pattern
Prevent cascading failures:
```python
import time
from enum import Enum
from threading import Lock

class CircuitState(Enum):
    CLOSED = "closed"
    OPEN = "open"
    HALF_OPEN = "half_open"

class CircuitBreaker:
    def __init__(self, failure_threshold=5, timeout=60):
        self.failure_threshold = failure_threshold
        self.timeout = timeout
        self.failure_count = 0
        self.last_failure_time = None
        self.state = CircuitState.CLOSED
        self.lock = Lock()

    def call(self, func, *args, **kwargs):
        with self.lock:
            if self.state == CircuitState.OPEN:
                if time.time() - self.last_failure_time > self.timeout:
                    self.state = CircuitState.HALF_OPEN
                else:
                    raise Exception('Circuit breaker is OPEN')
        # Release the lock before the (slow) call itself
        try:
            result = func(*args, **kwargs)
            self.on_success()
            return result
        except Exception:
            self.on_failure()
            raise

    def on_success(self):
        with self.lock:
            self.failure_count = 0
            self.state = CircuitState.CLOSED

    def on_failure(self):
        with self.lock:
            self.failure_count += 1
            self.last_failure_time = time.time()
            if self.failure_count >= self.failure_threshold:
                self.state = CircuitState.OPEN

# Global circuit breaker
circuit_breaker = CircuitBreaker(failure_threshold=5, timeout=60)

@app.route('/api/generate', methods=['POST'])
@token_required
def generate_text(current_user):
    data = request.get_json()
    prompt = data.get('prompt', '')
    try:
        message = circuit_breaker.call(robust_api_call, prompt)
        return jsonify({'content': message.content[0].text})
    except Exception as e:
        return jsonify({'error': str(e)}), 503
```
Timeout Handling
```python
from concurrent.futures import ThreadPoolExecutor, TimeoutError

class TimeoutException(Exception):
    pass

def call_with_timeout(func, timeout_seconds=30, *args, **kwargs):
    # Run the call in a worker thread and bound how long we wait for it
    executor = ThreadPoolExecutor(max_workers=1)
    future = executor.submit(func, *args, **kwargs)
    try:
        return future.result(timeout=timeout_seconds)
    except TimeoutError:
        raise TimeoutException("Request timeout")
    finally:
        # Don't block waiting for the worker; it finishes in the background
        executor.shutdown(wait=False)

@app.route('/api/generate', methods=['POST'])
@token_required
def generate_text(current_user):
    data = request.get_json()
    prompt = data.get('prompt', '')
    try:
        message = call_with_timeout(robust_api_call, 30, prompt)
        return jsonify({'content': message.content[0].text})
    except TimeoutException:
        return jsonify({'error': 'Request timeout'}), 504
```
5. Monitoring and Observability
Logging Best Practices
```python
import logging
import time
from logging.handlers import RotatingFileHandler
import json
from datetime import datetime

# Configure logging
def setup_logging():
    formatter = logging.Formatter(
        '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
    )
    # File handler
    file_handler = RotatingFileHandler(
        'app.log',
        maxBytes=10485760,  # 10MB
        backupCount=10
    )
    file_handler.setFormatter(formatter)
    # Console handler
    console_handler = logging.StreamHandler()
    console_handler.setFormatter(formatter)

    logger = logging.getLogger()
    logger.setLevel(logging.INFO)
    logger.addHandler(file_handler)
    logger.addHandler(console_handler)
    return logger

logger = setup_logging()

class StructuredLogger:
    @staticmethod
    def log_api_call(user_id, prompt_length, success, duration, tokens=None, error=None):
        log_data = {
            'timestamp': datetime.utcnow().isoformat(),
            'event': 'api_call',
            'user_id': user_id,
            'prompt_length': prompt_length,
            'success': success,
            'duration_ms': duration,
            'tokens': tokens,
            'error': error
        }
        if success:
            logger.info(json.dumps(log_data))
        else:
            logger.error(json.dumps(log_data))

@app.route('/api/generate', methods=['POST'])
@token_required
def generate_text(current_user):
    data = request.get_json()
    prompt = data.get('prompt', '')
    start_time = time.time()
    try:
        message = robust_api_call(prompt)
        duration = (time.time() - start_time) * 1000
        StructuredLogger.log_api_call(
            user_id=current_user,
            prompt_length=len(prompt),
            success=True,
            duration=duration,
            tokens=message.usage.input_tokens + message.usage.output_tokens
        )
        return jsonify({'content': message.content[0].text})
    except Exception as e:
        duration = (time.time() - start_time) * 1000
        StructuredLogger.log_api_call(
            user_id=current_user,
            prompt_length=len(prompt),
            success=False,
            duration=duration,
            error=str(e)
        )
        return jsonify({'error': str(e)}), 500
```
Metrics Collection
```python
from dataclasses import dataclass
from typing import Dict
import threading

@dataclass
class Metrics:
    total_requests: int = 0
    successful_requests: int = 0
    failed_requests: int = 0
    total_latency: float = 0.0
    total_tokens: int = 0
    total_cost: float = 0.0

class MetricsCollector:
    def __init__(self):
        self.metrics = Metrics()
        self.lock = threading.Lock()

    def record_request(self, success, latency, tokens=0, cost=0.0):
        with self.lock:
            self.metrics.total_requests += 1
            if success:
                self.metrics.successful_requests += 1
                self.metrics.total_tokens += tokens
                self.metrics.total_cost += cost
            else:
                self.metrics.failed_requests += 1
            self.metrics.total_latency += latency

    def get_stats(self) -> Dict:
        with self.lock:
            if self.metrics.total_requests == 0:
                return {
                    'total_requests': 0,
                    'success_rate': 0,
                    'avg_latency_ms': 0,
                    'total_tokens': 0,
                    'total_cost': 0
                }
            return {
                'total_requests': self.metrics.total_requests,
                'successful_requests': self.metrics.successful_requests,
                'failed_requests': self.metrics.failed_requests,
                'success_rate': self.metrics.successful_requests / self.metrics.total_requests,
                'avg_latency_ms': self.metrics.total_latency / self.metrics.total_requests,
                'total_tokens': self.metrics.total_tokens,
                'total_cost': self.metrics.total_cost
            }

    def reset(self):
        with self.lock:
            self.metrics = Metrics()

# Global metrics collector
metrics = MetricsCollector()

@app.route('/api/generate', methods=['POST'])
@token_required
def generate_text(current_user):
    data = request.get_json()
    prompt = data.get('prompt', '')
    start_time = time.time()
    try:
        message = robust_api_call(prompt)
        duration = (time.time() - start_time) * 1000
        tokens = message.usage.input_tokens + message.usage.output_tokens
        cost = UsageTracker.calculate_cost(
            'claude-sonnet-4-20250514',
            message.usage.input_tokens,
            message.usage.output_tokens
        )
        metrics.record_request(True, duration, tokens, cost)
        return jsonify({'content': message.content[0].text})
    except Exception as e:
        duration = (time.time() - start_time) * 1000
        metrics.record_request(False, duration)
        return jsonify({'error': str(e)}), 500

@app.route('/api/metrics', methods=['GET'])
def get_metrics():
    return jsonify(metrics.get_stats())
```
Prometheus Integration
For production environments, integrate with Prometheus for metrics:
```python
from prometheus_client import Counter, Histogram, Gauge, generate_latest
from flask import Response

# Define metrics
api_requests_total = Counter(
    'api_requests_total',
    'Total API requests',
    ['method', 'endpoint', 'status']
)
api_request_duration = Histogram(
    'api_request_duration_seconds',
    'API request duration',
    ['method', 'endpoint']
)
api_tokens_used = Counter(
    'api_tokens_used_total',
    'Total tokens used',
    ['model', 'user_id']
)
api_cost_total = Counter(
    'api_cost_total',
    'Total API cost in USD',
    ['model', 'user_id']
)
active_requests = Gauge(
    'active_requests',
    'Number of active requests'
)

@app.route('/api/generate', methods=['POST'])
@token_required
def generate_text(current_user):
    active_requests.inc()
    start_time = time.time()
    try:
        data = request.get_json()
        prompt = data.get('prompt', '')
        model = 'claude-sonnet-4-20250514'
        message = robust_api_call(prompt, model)

        # Record metrics
        duration = time.time() - start_time
        tokens = message.usage.input_tokens + message.usage.output_tokens
        cost = UsageTracker.calculate_cost(
            model,
            message.usage.input_tokens,
            message.usage.output_tokens
        )
        api_requests_total.labels('POST', '/api/generate', '200').inc()
        api_request_duration.labels('POST', '/api/generate').observe(duration)
        api_tokens_used.labels(model, current_user).inc(tokens)
        api_cost_total.labels(model, current_user).inc(cost)

        return jsonify({'content': message.content[0].text})
    except Exception as e:
        api_requests_total.labels('POST', '/api/generate', '500').inc()
        return jsonify({'error': str(e)}), 500
    finally:
        active_requests.dec()

@app.route('/metrics')
def prometheus_metrics():
    return Response(generate_latest(), mimetype='text/plain')
```
Health Checks
```python
from datetime import datetime
import psutil
from sqlalchemy import text

@app.route('/health', methods=['GET'])
def health_check():
    health_status = {
        'status': 'ok',
        'timestamp': datetime.utcnow().isoformat(),
        'uptime': time.time() - app.config.get('START_TIME', time.time()),
        'version': '1.0.0'
    }

    # Check API connectivity
    try:
        test_message = client.messages.create(
            model="claude-sonnet-4-20250514",
            max_tokens=10,
            messages=[{"role": "user", "content": "test"}]
        )
        health_status['api_status'] = 'connected'
    except Exception as e:
        health_status['api_status'] = 'disconnected'
        health_status['api_error'] = str(e)
        health_status['status'] = 'degraded'

    # Check database connectivity
    try:
        db.session.execute(text('SELECT 1'))
        health_status['database_status'] = 'connected'
    except Exception as e:
        health_status['database_status'] = 'disconnected'
        health_status['database_error'] = str(e)
        health_status['status'] = 'degraded'

    # Check Redis connectivity
    try:
        cache.set('health_check', 'ok', timeout=1)
        health_status['cache_status'] = 'connected'
    except Exception as e:
        health_status['cache_status'] = 'disconnected'
        health_status['cache_error'] = str(e)
        health_status['status'] = 'degraded'

    # System metrics
    health_status['system'] = {
        'cpu_percent': psutil.cpu_percent(),
        'memory_percent': psutil.virtual_memory().percent,
        'disk_percent': psutil.disk_usage('/').percent
    }

    status_code = 200 if health_status['status'] == 'ok' else 503
    return jsonify(health_status), status_code

@app.route('/health/ready', methods=['GET'])
def readiness_check():
    """Kubernetes readiness probe"""
    try:
        # Check if app can handle requests
        db.session.execute(text('SELECT 1'))
        return jsonify({'status': 'ready'}), 200
    except Exception:
        return jsonify({'status': 'not ready'}), 503

@app.route('/health/live', methods=['GET'])
def liveness_check():
    """Kubernetes liveness probe"""
    return jsonify({'status': 'alive'}), 200
```
Application Performance Monitoring (APM)
Integration with APM tools like New Relic or DataDog:
```python
# Example with New Relic
import newrelic.agent

newrelic.agent.initialize('newrelic.ini')

@newrelic.agent.background_task()
def background_ai_task(prompt):
    return client.messages.create(
        model="claude-sonnet-4-20250514",
        max_tokens=1024,
        messages=[{"role": "user", "content": prompt}]
    )

@app.route('/api/generate', methods=['POST'])
@token_required
@newrelic.agent.function_trace()
def generate_text(current_user):
    data = request.get_json()
    prompt = data.get('prompt', '')
    # APM will automatically track this
    with newrelic.agent.FunctionTrace('anthropic_api_call'):
        message = robust_api_call(prompt)
    return jsonify({'content': message.content[0].text})
```
Complete Example Application
Here’s a complete Flask application putting it all together:
# app.py
from flask import Flask, request, jsonify, Response
from flask_sqlalchemy import SQLAlchemy
from flask_caching import Cache
from flask_limiter import Limiter
from flask_limiter.util import get_remote_address
import anthropic
import os
import time
import logging
from functools import wraps
import jwt
from datetime import datetime, timedelta
# Initialize Flask app
app = Flask(__name__)
# Configuration
app.config['SECRET_KEY'] = os.getenv('JWT_SECRET_KEY')
app.config['SQLALCHEMY_DATABASE_URI'] = os.getenv('DATABASE_URL')
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False
app.config['START_TIME'] = time.time()
# Initialize extensions
db = SQLAlchemy(app)
cache = Cache(app, config={
'CACHE_TYPE': 'redis',
'CACHE_REDIS_URL': os.getenv('REDIS_URL', 'redis://localhost:6379/0')
})
# Rate limiter
def get_user_id():
token = request.headers.get('Authorization', '').replace('Bearer ', '')
try:
data = jwt.decode(token, app.config['SECRET_KEY'], algorithms=["HS256"])
return data['user_id']
except:
return get_remote_address()
limiter = Limiter(
app=app,
key_func=get_user_id,
storage_uri=os.getenv('REDIS_URL', 'redis://localhost:6379/0')
)
# Initialize Anthropic client
client = anthropic.Anthropic(api_key=os.getenv("ANTHROPIC_API_KEY"))
# Setup logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)
# Models
class UsageRecord(db.Model):
id = db.Column(db.Integer, primary_key=True)
user_id = db.Column(db.String(50), nullable=False, index=True)
timestamp = db.Column(db.DateTime, default=datetime.utcnow, index=True)
input_tokens = db.Column(db.Integer)
output_tokens = db.Column(db.Integer)
cost = db.Column(db.Float)
model = db.Column(db.String(50))
# Authentication decorator
def token_required(f):
@wraps(f)
def decorated(*args, **kwargs):
token = request.headers.get('Authorization')
if not token or not token.startswith('Bearer '):
return jsonify({'error': 'Token is missing'}), 401
try:
token = token[7:]
data = jwt.decode(token, app.config['SECRET_KEY'], algorithms=["HS256"])
current_user = data['user_id']
except jwt.ExpiredSignatureError:
return jsonify({'error': 'Token has expired'}), 401
except jwt.InvalidTokenError:
return jsonify({'error': 'Invalid token'}), 401
return f(current_user, *args, **kwargs)
return decorated
# Usage tracker
class UsageTracker:
PRICING = {
'claude-haiku-3-20240307': {'input': 0.25, 'output': 1.25},
'claude-sonnet-4-20250514': {'input': 3.00, 'output': 15.00},
'claude-opus-4-20250514': {'input': 15.00, 'output': 75.00}
}
@classmethod
def calculate_cost(cls, model, input_tokens, output_tokens):
pricing = cls.PRICING.get(model, cls.PRICING['claude-sonnet-4-20250514'])
input_cost = (input_tokens / 1_000_000) * pricing['input']
output_cost = (output_tokens / 1_000_000) * pricing['output']
return input_cost + output_cost
@classmethod
def track_usage(cls, user_id, model, input_tokens, output_tokens):
cost = cls.calculate_cost(model, input_tokens, output_tokens)
record = UsageRecord(
user_id=user_id,
input_tokens=input_tokens,
output_tokens=output_tokens,
cost=cost,
model=model
)
db.session.add(record)
db.session.commit()
return cost
@classmethod
def get_monthly_usage(cls, user_id):
start_of_month = datetime.utcnow().replace(day=1, hour=0, minute=0, second=0)
usage = db.session.query(
db.func.sum(UsageRecord.input_tokens).label('total_input_tokens'),
db.func.sum(UsageRecord.output_tokens).label('total_output_tokens'),
db.func.sum(UsageRecord.cost).label('total_cost')
).filter(
UsageRecord.user_id == user_id,
UsageRecord.timestamp >= start_of_month
).first()
return {
'input_tokens': usage.total_input_tokens or 0,
'output_tokens': usage.total_output_tokens or 0,
'total_cost': float(usage.total_cost or 0)
}
@classmethod
def check_budget(cls, user_id, budget_limit=10.0):
usage = cls.get_monthly_usage(user_id)
return usage['total_cost'] < budget_limit
# Routes
@app.route('/api/auth/login', methods=['POST'])
def login():
    """Generate JWT token for demo purposes"""
    data = request.get_json()
    user_id = data.get('user_id')
    if not user_id:
        return jsonify({'error': 'user_id required'}), 400
    token = jwt.encode({
        'user_id': user_id,
        'exp': datetime.utcnow() + timedelta(days=1)
    }, app.config['SECRET_KEY'], algorithm='HS256')
    return jsonify({'token': token})
@app.route('/api/generate', methods=['POST'])
@limiter.limit("10 per minute")
@token_required
def generate_text(current_user):
    """Generate text using Claude API"""
    # Check budget before spending tokens
    if not UsageTracker.check_budget(current_user, budget_limit=10.0):
        return jsonify({'error': 'Monthly budget exceeded'}), 402
    data = request.get_json()
    prompt = data.get('prompt', '')
    model = data.get('model', 'claude-sonnet-4-20250514')
    if not prompt or len(prompt) > 100000:
        return jsonify({'error': 'Invalid prompt'}), 400
    start_time = time.time()
    try:
        # Make API call
        message = client.messages.create(
            model=model,
            max_tokens=1024,
            messages=[{"role": "user", "content": prompt}]
        )
        duration = (time.time() - start_time) * 1000
        # Track usage
        cost = UsageTracker.track_usage(
            current_user,
            model,
            message.usage.input_tokens,
            message.usage.output_tokens
        )
        logger.info(
            f"API call successful - user: {current_user}, duration: {duration:.0f}ms, "
            f"tokens: {message.usage.input_tokens + message.usage.output_tokens}"
        )
        return jsonify({
            'content': message.content[0].text,
            'usage': {
                'input_tokens': message.usage.input_tokens,
                'output_tokens': message.usage.output_tokens,
                'cost': cost
            },
            'duration_ms': duration
        })
    except Exception as e:
        duration = (time.time() - start_time) * 1000
        logger.error(f"API call failed - user: {current_user}, error: {str(e)}, duration: {duration:.0f}ms")
        # Log the detail, but don't leak internal error messages to clients
        return jsonify({'error': 'Generation failed'}), 502
@app.route('/api/usage', methods=['GET'])
@token_required
def get_usage(current_user):
    """Get current month usage for user"""
    usage = UsageTracker.get_monthly_usage(current_user)
    return jsonify(usage)
@app.route('/health', methods=['GET'])
def health_check():
    """Health check endpoint"""
    health_status = {
        'status': 'ok',
        'timestamp': datetime.utcnow().isoformat(),
        'uptime': time.time() - app.config['START_TIME']
    }
    try:
        # SQLAlchemy 2.x requires raw SQL to be wrapped in text()
        db.session.execute(db.text('SELECT 1'))
        health_status['database'] = 'ok'
    except Exception:
        health_status['database'] = 'error'
        health_status['status'] = 'degraded'
    status_code = 200 if health_status['status'] == 'ok' else 503
    return jsonify(health_status), status_code

if __name__ == '__main__':
    with app.app_context():
        db.create_all()
    app.run(debug=True, port=5000)
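The `token_required` decorator leans on `jwt.decode` to verify an HMAC-SHA256 signature before trusting the `user_id` claim. To make the mechanism concrete, here is a simplified stdlib-only sketch of what HS256 signing and verification amount to (the function names are illustrative; use PyJWT in practice, since it also validates `exp` and other claims and handles encoding edge cases):

```python
import base64
import hashlib
import hmac
import json

def _b64url(data: bytes) -> str:
    # JWT uses unpadded base64url encoding
    return base64.urlsafe_b64encode(data).rstrip(b'=').decode()

def sign_hs256(header: dict, payload: dict, secret: bytes) -> str:
    """Build a JWT-style token: b64url(header).b64url(payload).signature"""
    signing_input = (
        _b64url(json.dumps(header, separators=(',', ':')).encode())
        + '.'
        + _b64url(json.dumps(payload, separators=(',', ':')).encode())
    )
    sig = hmac.new(secret, signing_input.encode(), hashlib.sha256).digest()
    return signing_input + '.' + _b64url(sig)

def verify_hs256(token: str, secret: bytes) -> bool:
    """Recompute the signature and compare in constant time."""
    signing_input, _, signature = token.rpartition('.')
    expected = hmac.new(secret, signing_input.encode(), hashlib.sha256).digest()
    return hmac.compare_digest(signature, _b64url(expected))
```

The constant-time comparison via `hmac.compare_digest` matters: a naive `==` can leak signature bytes through timing differences.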
Requirements File
# requirements.txt
Flask==3.0.0
Flask-SQLAlchemy==3.1.1
Flask-Caching==2.1.0
Flask-Limiter==3.5.0
anthropic==0.21.0
PyJWT==2.8.0
python-dotenv==1.0.0
psycopg2-binary==2.9.9
redis==5.0.1
psutil==5.9.6
prometheus-client==0.19.0
gunicorn==21.2.0  # used by the Dockerfile CMD below
Environment Variables
# .env
ANTHROPIC_API_KEY=your_api_key_here
JWT_SECRET_KEY=your_secret_key_here
DATABASE_URL=postgresql://localhost/ai_usage
REDIS_URL=redis://localhost:6379/0
FLASK_ENV=development
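It pays to validate these variables once at startup and fail fast, rather than discovering a missing key on the first API call. A sketch using only the standard library (`python-dotenv` can populate `os.environ` from the `.env` file beforehand; the `load_config` helper is illustrative):

```python
import os

# Variables the app cannot run without
REQUIRED = ('ANTHROPIC_API_KEY', 'JWT_SECRET_KEY')

# Variables with sensible local-development defaults
DEFAULTS = {
    'DATABASE_URL': 'postgresql://localhost/ai_usage',
    'REDIS_URL': 'redis://localhost:6379/0',
}

def load_config(env=os.environ) -> dict:
    """Collect configuration from the environment, failing fast on gaps."""
    missing = [name for name in REQUIRED if not env.get(name)]
    if missing:
        raise RuntimeError(f"Missing required environment variables: {', '.join(missing)}")
    config = {name: env[name] for name in REQUIRED}
    for name, default in DEFAULTS.items():
        config[name] = env.get(name, default)
    return config
```

Passing the environment in as a parameter also makes the loader trivial to unit-test with a plain dict.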
Deployment Best Practices
Docker Configuration
# Dockerfile
FROM python:3.11-slim
WORKDIR /app
# Install dependencies
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
# Copy application
COPY . .
# Create non-root user
RUN useradd -m -u 1000 appuser && chown -R appuser:appuser /app
USER appuser
# Expose port
EXPOSE 5000
# Run with gunicorn
CMD ["gunicorn", "--bind", "0.0.0.0:5000", "--workers", "4", "--timeout", "120", "app:app"]
Docker Compose
# docker-compose.yml
version: '3.8'
services:
  web:
    build: .
    ports:
      - "5000:5000"
    environment:
      - ANTHROPIC_API_KEY=${ANTHROPIC_API_KEY}
      - JWT_SECRET_KEY=${JWT_SECRET_KEY}
      - DATABASE_URL=postgresql://postgres:password@db:5432/ai_usage
      - REDIS_URL=redis://redis:6379/0
    depends_on:
      - db
      - redis
  db:
    image: postgres:15
    environment:
      - POSTGRES_DB=ai_usage
      - POSTGRES_PASSWORD=password
    volumes:
      - postgres_data:/var/lib/postgresql/data
  redis:
    image: redis:7-alpine
    volumes:
      - redis_data:/data
volumes:
  postgres_data:
  redis_data:
Conclusion
Building robust AI API integrations with Python and Flask requires careful attention to:
- Security: Environment variables, backend proxies, input validation, JWT authentication
- Rate Limiting: Flask-Limiter, custom rate limiters, exponential backoff, Celery queues
- Cost Management: Token optimization, caching, usage tracking with SQLAlchemy, budget enforcement
- Error Handling: Custom exceptions, circuit breakers, timeout handling, retry logic
- Monitoring: Structured logging, metrics collection, Prometheus integration, health checks
By implementing these patterns, you’ll create AI-powered applications that are secure, reliable, scalable, and cost-effective.
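The cost-management piece is easy to sanity-check in isolation. A standalone version of the per-token pricing arithmetic used throughout (prices in USD per million tokens, matching the Sonnet and Opus rows of the example app's PRICING table):

```python
# Prices in USD per million tokens, as in the example app
PRICING = {
    'claude-sonnet-4-20250514': {'input': 3.00, 'output': 15.00},
    'claude-opus-4-20250514': {'input': 15.00, 'output': 75.00},
}

def calculate_cost(model: str, input_tokens: int, output_tokens: int) -> float:
    """Convert a token count into dollars: tokens / 1M * price-per-million."""
    p = PRICING[model]
    return (input_tokens / 1_000_000) * p['input'] + (output_tokens / 1_000_000) * p['output']

# 10k input + 1k output tokens on Sonnet:
# 0.01 * 3.00 + 0.001 * 15.00 = $0.045
cost = calculate_cost('claude-sonnet-4-20250514', 10_000, 1_000)
```

Running numbers like these against your expected traffic is the quickest way to choose a sensible per-user budget limit before deploying.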