Integrating AI APIs into your applications opens up powerful capabilities, but it also introduces important considerations around security, performance, and cost. This comprehensive guide will walk you through best practices for building robust, secure, and cost-effective AI API integrations using Python and Flask.

Table of Contents

  1. Security Best Practices
  2. Rate Limiting Strategies
  3. Cost Management
  4. Error Handling and Resilience
  5. Monitoring and Observability

1. Security Best Practices

API Key Management

Never hardcode API keys in your source code or commit them to version control. Instead:

# BAD - Hardcoded key
api_key = "sk-ant-1234567890"

# GOOD - Environment variable
import os
api_key = os.environ.get('ANTHROPIC_API_KEY')

Use environment variables and secret management systems:

  • Development: .env files (with python-dotenv)
  • Production: AWS Secrets Manager, Google Secret Manager, HashiCorp Vault, or Azure Key Vault

# Using python-dotenv
from dotenv import load_dotenv
import os

load_dotenv()
ANTHROPIC_API_KEY = os.getenv('ANTHROPIC_API_KEY')

Implement key rotation policies:

  • Rotate API keys regularly (every 90 days recommended)
  • Maintain multiple keys to enable zero-downtime rotation
  • Have emergency revocation procedures ready
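The multiple-key point can be made concrete: keep a standby key configured alongside the active one and prefer whichever is present, so the old key can be revoked without downtime. A minimal sketch (the `ANTHROPIC_API_KEY_NEXT` variable is an illustrative convention, not part of any SDK):

```python
import os

def active_api_key(env=os.environ):
    """Return the active key, falling back to the standby key during a
    rotation window. Env var names here are illustrative."""
    key = env.get("ANTHROPIC_API_KEY") or env.get("ANTHROPIC_API_KEY_NEXT")
    if not key:
        raise RuntimeError("No API key configured")
    return key
```

During rotation, deploy the new key under the standby variable, verify it works, promote it to the primary variable, then revoke the old key.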

Backend-Only API Calls

AI API keys should never be exposed to client-side code. Always proxy requests through your backend:

from flask import Flask, request, jsonify
import anthropic
import os

app = Flask(__name__)
client = anthropic.Anthropic(api_key=os.environ.get("ANTHROPIC_API_KEY"))

@app.route('/api/generate', methods=['POST'])
def generate_text():
    data = request.get_json()
    prompt = data.get('prompt', '')
    
    # Validate and sanitize input
    if not prompt or len(prompt) > 10000:
        return jsonify({'error': 'Invalid prompt'}), 400
    
    try:
        message = client.messages.create(
            model="claude-sonnet-4-20250514",
            max_tokens=1024,
            messages=[{"role": "user", "content": prompt}]
        )
        
        return jsonify({
            'content': message.content[0].text,
            'usage': {
                'input_tokens': message.usage.input_tokens,
                'output_tokens': message.usage.output_tokens
            }
        })
    except Exception as e:
        return jsonify({'error': str(e)}), 500

Input Validation and Sanitization

Protect against injection attacks and abuse:

import re
from flask import abort

class InputValidator:
    MAX_LENGTH = 100000
    # A regex blocklist only catches naive injection attempts;
    # treat it as one defensive layer, not a complete defense
    PROHIBITED_PATTERNS = [
        r'ignore\s+previous\s+instructions',
        r'system\s+prompt',
        r'you\s+are\s+now',
    ]
    
    @classmethod
    def validate(cls, text):
        # Length validation
        if len(text) > cls.MAX_LENGTH:
            raise ValueError('Input too long')
        
        # Content filtering
        for pattern in cls.PROHIBITED_PATTERNS:
            if re.search(pattern, text, re.IGNORECASE):
                raise ValueError('Invalid input detected')
        
        return text.strip()

@app.route('/api/generate', methods=['POST'])
def generate_text():
    data = request.get_json()
    prompt = data.get('prompt', '')
    
    try:
        validated_prompt = InputValidator.validate(prompt)
    except ValueError as e:
        return jsonify({'error': str(e)}), 400
    
    # Process validated prompt...

Authentication and Authorization

Implement proper user authentication before allowing API access:

from flask import Flask, request, jsonify
from functools import wraps
import jwt
import os
from datetime import datetime, timedelta

app = Flask(__name__)
app.config['SECRET_KEY'] = os.environ.get('JWT_SECRET_KEY')

def token_required(f):
    @wraps(f)
    def decorated(*args, **kwargs):
        token = request.headers.get('Authorization')
        
        if not token:
            return jsonify({'error': 'Token is missing'}), 401
        
        try:
            # Remove 'Bearer ' prefix if present
            if token.startswith('Bearer '):
                token = token[7:]
            
            data = jwt.decode(token, app.config['SECRET_KEY'], algorithms=["HS256"])
            current_user = data['user_id']
        except jwt.ExpiredSignatureError:
            return jsonify({'error': 'Token has expired'}), 401
        except jwt.InvalidTokenError:
            return jsonify({'error': 'Invalid token'}), 401
        
        return f(current_user, *args, **kwargs)
    
    return decorated

@app.route('/api/generate', methods=['POST'])
@token_required
def generate_text(current_user):
    # Only authenticated users reach here
    prompt = request.get_json().get('prompt', '')
    # Process request...

2. Rate Limiting Strategies

Understanding API Rate Limits

Most AI APIs implement rate limits on multiple dimensions:

  • Requests per minute (RPM): Number of API calls
  • Tokens per minute (TPM): Total tokens processed
  • Concurrent requests: Simultaneous connections

Client-Side Rate Limiting

Implement your own rate limiting to stay within quotas:

import time
from collections import deque
from threading import Lock

class RateLimiter:
    def __init__(self, max_requests, time_window):
        self.max_requests = max_requests
        self.time_window = time_window  # in seconds
        self.requests = deque()
        self.lock = Lock()
    
    def acquire(self):
        with self.lock:
            now = time.time()
            
            # Remove old requests outside time window
            while self.requests and now - self.requests[0] > self.time_window:
                self.requests.popleft()
            
            if len(self.requests) >= self.max_requests:
                # Calculate wait time
                oldest_request = self.requests[0]
                wait_time = self.time_window - (now - oldest_request)
                time.sleep(wait_time)
                return self.acquire()
            
            self.requests.append(now)

# Usage
limiter = RateLimiter(max_requests=50, time_window=60)  # 50 requests per minute

def call_api(prompt):
    limiter.acquire()
    # Make API call
    return client.messages.create(...)
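The limiter above only tracks request counts (the RPM dimension). The tokens-per-minute dimension can be enforced with the same sliding-window idea by recording an estimated token cost per call — a sketch (class and parameter names are illustrative):

```python
import time
from collections import deque
from threading import Lock

class TokenRateLimiter:
    """Sliding-window limiter over token counts rather than request counts."""
    def __init__(self, max_tokens_per_window, time_window=60):
        self.max_tokens = max_tokens_per_window
        self.time_window = time_window  # in seconds
        self.events = deque()  # (timestamp, tokens) pairs
        self.lock = Lock()

    def acquire(self, tokens):
        if tokens > self.max_tokens:
            raise ValueError('request exceeds the entire window budget')
        while True:
            with self.lock:
                now = time.time()
                # Drop events that have aged out of the window
                while self.events and now - self.events[0][0] > self.time_window:
                    self.events.popleft()
                used = sum(t for _, t in self.events)
                if used + tokens <= self.max_tokens:
                    self.events.append((now, tokens))
                    return
                wait = self.time_window - (now - self.events[0][0])
            time.sleep(max(wait, 0.01))
```

Call `acquire(estimated_tokens)` before each request, using the rough character-based estimate shown in the cost-management section.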

Flask-Limiter for Rate Limiting

Use Flask-Limiter for built-in rate limiting:

from flask import Flask
from flask_limiter import Limiter
from flask_limiter.util import get_remote_address

app = Flask(__name__)
limiter = Limiter(
    app=app,
    key_func=get_remote_address,
    default_limits=["200 per day", "50 per hour"],
    storage_uri="redis://localhost:6379"
)

@app.route('/api/generate', methods=['POST'])
@limiter.limit("10 per minute")
@token_required
def generate_text(current_user):
    # This endpoint is limited to 10 requests per minute per IP
    prompt = request.get_json().get('prompt', '')
    # Process request...

Per-User Rate Limiting

from flask_limiter import Limiter
from flask_limiter.util import get_remote_address

def get_user_id():
    # Extract user ID from JWT token
    token = request.headers.get('Authorization', '').replace('Bearer ', '')
    try:
        data = jwt.decode(token, app.config['SECRET_KEY'], algorithms=["HS256"])
        return data['user_id']
    except jwt.PyJWTError:
        # Fall back to IP-based limiting for unauthenticated requests
        return get_remote_address()

limiter = Limiter(
    app=app,
    key_func=get_user_id,
    storage_uri="redis://localhost:6379"
)

@app.route('/api/generate', methods=['POST'])
@limiter.limit("10 per minute")  # Per user
@token_required
def generate_text(current_user):
    # Process request...

Exponential Backoff

Handle rate limit errors gracefully with exponential backoff:

import time
import random
from anthropic import RateLimitError

def call_with_retry(api_call_func, max_retries=5):
    for attempt in range(max_retries):
        try:
            return api_call_func()
        except RateLimitError as e:
            if attempt == max_retries - 1:
                raise
            
            # Exponential backoff with jitter
            delay = min(2 ** attempt, 32) + random.uniform(0, 1)
            time.sleep(delay)
    
    raise Exception('Max retries exceeded')

# Usage
def make_api_call():
    return client.messages.create(
        model="claude-sonnet-4-20250514",
        max_tokens=1024,
        messages=[{"role": "user", "content": "Hello!"}]
    )

response = call_with_retry(make_api_call)

Request Queuing with Celery

Implement a queue system for high-traffic applications:

from celery import Celery
from flask import Flask
import anthropic
import os

app = Flask(__name__)
app.config['CELERY_BROKER_URL'] = 'redis://localhost:6379/0'
app.config['CELERY_RESULT_BACKEND'] = 'redis://localhost:6379/0'

celery = Celery(
    app.name,
    broker=app.config['CELERY_BROKER_URL'],
    backend=app.config['CELERY_RESULT_BACKEND']  # set explicitly so AsyncResult can fetch results
)
celery.conf.update(app.config)

client = anthropic.Anthropic(api_key=os.environ.get("ANTHROPIC_API_KEY"))

@celery.task(bind=True, max_retries=3)
def process_ai_request(self, prompt, user_id):
    try:
        message = client.messages.create(
            model="claude-sonnet-4-20250514",
            max_tokens=1024,
            messages=[{"role": "user", "content": prompt}]
        )
        return {
            'content': message.content[0].text,
            'tokens': message.usage.input_tokens + message.usage.output_tokens
        }
    except Exception as e:
        # Retry with exponential backoff
        raise self.retry(exc=e, countdown=2 ** self.request.retries)

@app.route('/api/generate', methods=['POST'])
@token_required
def generate_text(current_user):
    prompt = request.get_json().get('prompt', '')
    
    # Queue the task
    task = process_ai_request.delay(prompt, current_user)
    
    return jsonify({
        'task_id': task.id,
        'status': 'queued'
    }), 202

@app.route('/api/status/<task_id>', methods=['GET'])
def check_status(task_id):
    task = process_ai_request.AsyncResult(task_id)
    
    if task.state == 'PENDING':
        # Note: unknown task IDs also report PENDING
        response = {'state': task.state, 'status': 'Pending...'}
    elif task.state == 'SUCCESS':
        response = {'state': task.state, 'result': task.result}
    else:
        response = {'state': task.state, 'status': str(task.info)}
    
    return jsonify(response)

3. Cost Management

Token Usage Optimization

Reduce prompt length while maintaining effectiveness:

# ❌ Verbose prompt
verbose_prompt = """
Hello AI assistant. I would like you to please analyze 
the following text very carefully and provide me with 
a comprehensive summary of the main points...
"""

# ✅ Concise prompt
concise_prompt = "Summarize the main points of this text:"

Use appropriate models for different tasks:

class ModelSelector:
    MODELS = {
        'simple': 'claude-3-haiku-20240307',       # Fast, cheap
        'balanced': 'claude-sonnet-4-20250514',    # Best balance
        'complex': 'claude-opus-4-20250514'        # Most capable
    }
    
    @classmethod
    def select_model(cls, task_complexity):
        return cls.MODELS.get(task_complexity, cls.MODELS['balanced'])

# Usage
model = ModelSelector.select_model('simple')
message = client.messages.create(model=model, ...)

Implement token counting before sending requests:

def estimate_tokens(text):
    # Rough heuristic for Claude models:
    # approximately 4 characters per token for English text.
    # Use the API's token-counting endpoint when you need exact counts.
    return len(text) // 4

def validate_request(prompt, max_tokens=1000):
    estimated_tokens = estimate_tokens(prompt)
    
    if estimated_tokens > max_tokens:
        raise ValueError(f'Request exceeds token limit: {estimated_tokens} > {max_tokens}')
    
    return True

@app.route('/api/generate', methods=['POST'])
@token_required
def generate_text(current_user):
    prompt = request.get_json().get('prompt', '')
    
    try:
        validate_request(prompt, max_tokens=5000)
    except ValueError as e:
        return jsonify({'error': str(e)}), 400
    
    # Process request...

Response Caching

Cache responses for identical or similar requests:

from flask_caching import Cache
import hashlib

app = Flask(__name__)
cache = Cache(app, config={
    'CACHE_TYPE': 'RedisCache',
    'CACHE_REDIS_URL': 'redis://localhost:6379/0',
    'CACHE_DEFAULT_TIMEOUT': 3600  # 1 hour
})

def generate_cache_key(prompt, model):
    # Create a unique hash for the prompt and model
    content = f"{model}:{prompt}"
    return hashlib.md5(content.encode()).hexdigest()

@app.route('/api/generate', methods=['POST'])
@token_required
def generate_text(current_user):
    data = request.get_json()
    prompt = data.get('prompt', '')
    model = data.get('model', 'claude-sonnet-4-20250514')
    
    # Check cache
    cache_key = generate_cache_key(prompt, model)
    cached_response = cache.get(cache_key)
    
    if cached_response:
        return jsonify({
            'content': cached_response,
            'cached': True
        })
    
    # Make API call
    message = client.messages.create(
        model=model,
        max_tokens=1024,
        messages=[{"role": "user", "content": prompt}]
    )
    
    response_text = message.content[0].text
    
    # Cache the response
    cache.set(cache_key, response_text)
    
    return jsonify({
        'content': response_text,
        'cached': False,
        'usage': {
            'input_tokens': message.usage.input_tokens,
            'output_tokens': message.usage.output_tokens
        }
    })
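The key function above only matches byte-identical prompts. Light normalization before hashing lets trivially different requests ("similar" in the sense mentioned earlier) share a cache entry — a sketch (whether this is safe depends on how sensitive your prompts are to whitespace and case):

```python
import hashlib
import re

def normalized_cache_key(prompt, model):
    """Collapse whitespace and lowercase the prompt so near-identical
    requests hash to the same cache key."""
    norm = re.sub(r'\s+', ' ', prompt.strip().lower())
    return hashlib.sha256(f"{model}:{norm}".encode()).hexdigest()
```

Drop this in as a replacement for generate_cache_key if the normalization is acceptable for your use case.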

Usage Tracking and Budgets

Monitor and limit spending:

from flask_sqlalchemy import SQLAlchemy
from datetime import datetime, timedelta

app = Flask(__name__)
app.config['SQLALCHEMY_DATABASE_URI'] = 'postgresql://localhost/ai_usage'
db = SQLAlchemy(app)

class UsageRecord(db.Model):
    id = db.Column(db.Integer, primary_key=True)
    user_id = db.Column(db.String(50), nullable=False)
    timestamp = db.Column(db.DateTime, default=datetime.utcnow)
    input_tokens = db.Column(db.Integer)
    output_tokens = db.Column(db.Integer)
    cost = db.Column(db.Float)
    model = db.Column(db.String(50))

class UsageTracker:
    # Pricing per 1M tokens (example rates; check current pricing)
    PRICING = {
        'claude-3-haiku-20240307': {'input': 0.25, 'output': 1.25},
        'claude-sonnet-4-20250514': {'input': 3.00, 'output': 15.00},
        'claude-opus-4-20250514': {'input': 15.00, 'output': 75.00}
    }
    
    @classmethod
    def calculate_cost(cls, model, input_tokens, output_tokens):
        pricing = cls.PRICING.get(model, cls.PRICING['claude-sonnet-4-20250514'])
        input_cost = (input_tokens / 1_000_000) * pricing['input']
        output_cost = (output_tokens / 1_000_000) * pricing['output']
        return input_cost + output_cost
    
    @classmethod
    def track_usage(cls, user_id, model, input_tokens, output_tokens):
        cost = cls.calculate_cost(model, input_tokens, output_tokens)
        
        record = UsageRecord(
            user_id=user_id,
            input_tokens=input_tokens,
            output_tokens=output_tokens,
            cost=cost,
            model=model
        )
        db.session.add(record)
        db.session.commit()
        
        return cost
    
    @classmethod
    def get_monthly_usage(cls, user_id):
        start_of_month = datetime.utcnow().replace(day=1, hour=0, minute=0, second=0)
        
        usage = db.session.query(
            db.func.sum(UsageRecord.input_tokens).label('total_input_tokens'),
            db.func.sum(UsageRecord.output_tokens).label('total_output_tokens'),
            db.func.sum(UsageRecord.cost).label('total_cost')
        ).filter(
            UsageRecord.user_id == user_id,
            UsageRecord.timestamp >= start_of_month
        ).first()
        
        return {
            'input_tokens': usage.total_input_tokens or 0,
            'output_tokens': usage.total_output_tokens or 0,
            'total_cost': float(usage.total_cost or 0)
        }
    
    @classmethod
    def check_budget(cls, user_id, budget_limit=10.0):
        usage = cls.get_monthly_usage(user_id)
        return usage['total_cost'] < budget_limit

@app.route('/api/generate', methods=['POST'])
@token_required
def generate_text(current_user):
    # Check budget
    if not UsageTracker.check_budget(current_user, budget_limit=10.0):
        return jsonify({'error': 'Monthly budget exceeded'}), 402
    
    data = request.get_json()
    prompt = data.get('prompt', '')
    model = data.get('model', 'claude-sonnet-4-20250514')
    
    # Make API call
    message = client.messages.create(
        model=model,
        max_tokens=1024,
        messages=[{"role": "user", "content": prompt}]
    )
    
    # Track usage
    cost = UsageTracker.track_usage(
        current_user,
        model,
        message.usage.input_tokens,
        message.usage.output_tokens
    )
    
    return jsonify({
        'content': message.content[0].text,
        'usage': {
            'input_tokens': message.usage.input_tokens,
            'output_tokens': message.usage.output_tokens,
            'cost': cost
        }
    })

@app.route('/api/usage', methods=['GET'])
@token_required
def get_usage(current_user):
    usage = UsageTracker.get_monthly_usage(current_user)
    return jsonify(usage)

Streaming Responses

Use streaming to reduce latency and improve UX:

from flask import Response, stream_with_context
import json

@app.route('/api/generate/stream', methods=['POST'])
@token_required
def generate_stream(current_user):
    data = request.get_json()
    prompt = data.get('prompt', '')
    
    def generate():
        try:
            with client.messages.stream(
                model="claude-sonnet-4-20250514",
                max_tokens=1024,
                messages=[{"role": "user", "content": prompt}]
            ) as stream:
                for text in stream.text_stream:
                    yield f"data: {json.dumps({'content': text})}\n\n"
                
                # Send final message with usage stats
                # (built as a dict first: multi-line expressions inside
                # f-strings are a syntax error before Python 3.12)
                final_message = stream.get_final_message()
                final_event = {
                    'done': True,
                    'usage': {
                        'input_tokens': final_message.usage.input_tokens,
                        'output_tokens': final_message.usage.output_tokens
                    }
                }
                yield f"data: {json.dumps(final_event)}\n\n"
        except Exception as e:
            yield f"data: {json.dumps({'error': str(e)})}\n\n"
    
    return Response(
        stream_with_context(generate()),
        mimetype='text/event-stream',
        headers={
            'Cache-Control': 'no-cache',
            'X-Accel-Buffering': 'no'
        }
    )
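On the client side, the text/event-stream body produced above can be read line by line. A sketch using the requests library (the URL, token handling, and helper names are illustrative, not part of any SDK):

```python
import json

def parse_sse_line(line):
    """Parse one 'data: {...}' SSE line into a dict; return None for
    blank lines, comments, and other fields."""
    if line.startswith('data: '):
        return json.loads(line[len('data: '):])
    return None

def consume_stream(url, token, prompt):
    """Print streamed text as it arrives; return the final usage dict."""
    import requests  # third-party; assumed installed

    resp = requests.post(
        url,
        json={'prompt': prompt},
        headers={'Authorization': f'Bearer {token}'},
        stream=True,
    )
    for raw in resp.iter_lines():
        event = parse_sse_line(raw.decode('utf-8')) if raw else None
        if event is None:
            continue
        if event.get('done'):
            return event.get('usage')
        if 'content' in event:
            print(event['content'], end='', flush=True)
```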

4. Error Handling and Resilience

Comprehensive Error Handling

from anthropic import APIError, APIConnectionError, RateLimitError, APIStatusError

class AIAPIError(Exception):
    def __init__(self, message, status_code, retryable=False):
        self.message = message
        self.status_code = status_code
        self.retryable = retryable
        super().__init__(self.message)

def robust_api_call(prompt, model="claude-sonnet-4-20250514"):
    try:
        message = client.messages.create(
            model=model,
            max_tokens=1024,
            messages=[{"role": "user", "content": prompt}]
        )
        return message
    
    except RateLimitError as e:
        raise AIAPIError('Rate limit exceeded', 429, retryable=True)
    
    except APIConnectionError as e:
        raise AIAPIError('Connection error', 503, retryable=True)
    
    except APIStatusError as e:
        if e.status_code >= 500:
            raise AIAPIError('Server error', e.status_code, retryable=True)
        else:
            raise AIAPIError(str(e), e.status_code, retryable=False)
    
    except APIError as e:
        raise AIAPIError(str(e), 500, retryable=False)

@app.route('/api/generate', methods=['POST'])
@token_required
def generate_text(current_user):
    data = request.get_json()
    prompt = data.get('prompt', '')
    
    try:
        message = robust_api_call(prompt)
        return jsonify({
            'content': message.content[0].text,
            'usage': {
                'input_tokens': message.usage.input_tokens,
                'output_tokens': message.usage.output_tokens
            }
        })
    
    except AIAPIError as e:
        if e.retryable:
            # Could implement retry logic here
            return jsonify({
                'error': e.message,
                'retryable': True
            }), e.status_code
        else:
            return jsonify({
                'error': e.message,
                'retryable': False
            }), e.status_code

Circuit Breaker Pattern

Prevent cascading failures:

import time
from enum import Enum
from threading import Lock

class CircuitState(Enum):
    CLOSED = "closed"
    OPEN = "open"
    HALF_OPEN = "half_open"

class CircuitBreaker:
    def __init__(self, failure_threshold=5, timeout=60):
        self.failure_threshold = failure_threshold
        self.timeout = timeout
        self.failure_count = 0
        self.last_failure_time = None
        self.state = CircuitState.CLOSED
        self.lock = Lock()
    
    def call(self, func, *args, **kwargs):
        with self.lock:
            if self.state == CircuitState.OPEN:
                if time.time() - self.last_failure_time > self.timeout:
                    self.state = CircuitState.HALF_OPEN
                else:
                    raise Exception('Circuit breaker is OPEN')
        
        try:
            result = func(*args, **kwargs)
            self.on_success()
            return result
        except Exception:
            self.on_failure()
            raise
    
    def on_success(self):
        with self.lock:
            self.failure_count = 0
            self.state = CircuitState.CLOSED
    
    def on_failure(self):
        with self.lock:
            self.failure_count += 1
            self.last_failure_time = time.time()
            
            if self.failure_count >= self.failure_threshold:
                self.state = CircuitState.OPEN

# Global circuit breaker
circuit_breaker = CircuitBreaker(failure_threshold=5, timeout=60)

@app.route('/api/generate', methods=['POST'])
@token_required
def generate_text(current_user):
    data = request.get_json()
    prompt = data.get('prompt', '')
    
    try:
        message = circuit_breaker.call(robust_api_call, prompt)
        return jsonify({'content': message.content[0].text})
    except Exception as e:
        return jsonify({'error': str(e)}), 503

Timeout Handling

from concurrent.futures import ThreadPoolExecutor, TimeoutError

class TimeoutException(Exception):
    pass

def call_with_timeout(func, timeout_seconds=30, *args, **kwargs):
    # Using threading for timeout
    with ThreadPoolExecutor(max_workers=1) as executor:
        future = executor.submit(func, *args, **kwargs)
        try:
            return future.result(timeout=timeout_seconds)
        except TimeoutError:
            raise TimeoutException("Request timeout")

@app.route('/api/generate', methods=['POST'])
@token_required
def generate_text(current_user):
    data = request.get_json()
    prompt = data.get('prompt', '')
    
    try:
        message = call_with_timeout(robust_api_call, 30, prompt)
        return jsonify({'content': message.content[0].text})
    except TimeoutException:
        return jsonify({'error': 'Request timeout'}), 504

5. Monitoring and Observability

Logging Best Practices

import logging
from logging.handlers import RotatingFileHandler
import json
import time
from datetime import datetime

# Configure logging
def setup_logging():
    formatter = logging.Formatter(
        '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
    )
    
    # File handler
    file_handler = RotatingFileHandler(
        'app.log',
        maxBytes=10485760,  # 10MB
        backupCount=10
    )
    file_handler.setFormatter(formatter)
    
    # Console handler
    console_handler = logging.StreamHandler()
    console_handler.setFormatter(formatter)
    
    logger = logging.getLogger()
    logger.setLevel(logging.INFO)
    logger.addHandler(file_handler)
    logger.addHandler(console_handler)
    
    return logger

logger = setup_logging()

class StructuredLogger:
    @staticmethod
    def log_api_call(user_id, prompt_length, success, duration, tokens=None, error=None):
        log_data = {
            'timestamp': datetime.utcnow().isoformat(),
            'event': 'api_call',
            'user_id': user_id,
            'prompt_length': prompt_length,
            'success': success,
            'duration_ms': duration,
            'tokens': tokens,
            'error': error
        }
        
        if success:
            logger.info(json.dumps(log_data))
        else:
            logger.error(json.dumps(log_data))

@app.route('/api/generate', methods=['POST'])
@token_required
def generate_text(current_user):
    data = request.get_json()
    prompt = data.get('prompt', '')
    start_time = time.time()
    
    try:
        message = robust_api_call(prompt)
        duration = (time.time() - start_time) * 1000
        
        StructuredLogger.log_api_call(
            user_id=current_user,
            prompt_length=len(prompt),
            success=True,
            duration=duration,
            tokens=message.usage.input_tokens + message.usage.output_tokens
        )
        
        return jsonify({'content': message.content[0].text})
    
    except Exception as e:
        duration = (time.time() - start_time) * 1000
        
        StructuredLogger.log_api_call(
            user_id=current_user,
            prompt_length=len(prompt),
            success=False,
            duration=duration,
            error=str(e)
        )
        
        return jsonify({'error': str(e)}), 500

Metrics Collection

from dataclasses import dataclass
from typing import Dict
import threading

@dataclass
class Metrics:
    total_requests: int = 0
    successful_requests: int = 0
    failed_requests: int = 0
    total_latency: float = 0.0
    total_tokens: int = 0
    total_cost: float = 0.0

class MetricsCollector:
    def __init__(self):
        self.metrics = Metrics()
        self.lock = threading.Lock()
    
    def record_request(self, success, latency, tokens=0, cost=0.0):
        with self.lock:
            self.metrics.total_requests += 1
            
            if success:
                self.metrics.successful_requests += 1
                self.metrics.total_tokens += tokens
                self.metrics.total_cost += cost
            else:
                self.metrics.failed_requests += 1
            
            self.metrics.total_latency += latency
    
    def get_stats(self) -> Dict:
        with self.lock:
            if self.metrics.total_requests == 0:
                return {
                    'total_requests': 0,
                    'success_rate': 0,
                    'avg_latency_ms': 0,
                    'total_tokens': 0,
                    'total_cost': 0
                }
            
            return {
                'total_requests': self.metrics.total_requests,
                'successful_requests': self.metrics.successful_requests,
                'failed_requests': self.metrics.failed_requests,
                'success_rate': self.metrics.successful_requests / self.metrics.total_requests,
                'avg_latency_ms': self.metrics.total_latency / self.metrics.total_requests,
                'total_tokens': self.metrics.total_tokens,
                'total_cost': self.metrics.total_cost
            }
    
    def reset(self):
        with self.lock:
            self.metrics = Metrics()

# Global metrics collector
metrics = MetricsCollector()

@app.route('/api/generate', methods=['POST'])
@token_required
def generate_text(current_user):
    data = request.get_json()
    prompt = data.get('prompt', '')
    start_time = time.time()
    
    try:
        message = robust_api_call(prompt)
        duration = (time.time() - start_time) * 1000
        
        tokens = message.usage.input_tokens + message.usage.output_tokens
        cost = UsageTracker.calculate_cost(
            'claude-sonnet-4-20250514',
            message.usage.input_tokens,
            message.usage.output_tokens
        )
        
        metrics.record_request(True, duration, tokens, cost)
        
        return jsonify({'content': message.content[0].text})
    
    except Exception as e:
        duration = (time.time() - start_time) * 1000
        metrics.record_request(False, duration)
        return jsonify({'error': str(e)}), 500

@app.route('/api/metrics', methods=['GET'])
def get_metrics():
    return jsonify(metrics.get_stats())

Prometheus Integration

For production environments, integrate with Prometheus for metrics:

from prometheus_client import Counter, Histogram, Gauge, generate_latest
from flask import Response

# Define metrics
api_requests_total = Counter(
    'api_requests_total',
    'Total API requests',
    ['method', 'endpoint', 'status']
)

api_request_duration = Histogram(
    'api_request_duration_seconds',
    'API request duration',
    ['method', 'endpoint']
)

api_tokens_used = Counter(
    'api_tokens_used_total',
    'Total tokens used',
    ['model', 'user_id']
)

api_cost_total = Counter(
    'api_cost_total',
    'Total API cost in USD',
    ['model', 'user_id']
)

active_requests = Gauge(
    'active_requests',
    'Number of active requests'
)

@app.route('/api/generate', methods=['POST'])
@token_required
def generate_text(current_user):
    active_requests.inc()
    start_time = time.time()
    
    try:
        data = request.get_json()
        prompt = data.get('prompt', '')
        model = 'claude-sonnet-4-20250514'
        
        message = robust_api_call(prompt, model)
        
        # Record metrics
        duration = time.time() - start_time
        tokens = message.usage.input_tokens + message.usage.output_tokens
        cost = UsageTracker.calculate_cost(
            model,
            message.usage.input_tokens,
            message.usage.output_tokens
        )
        
        api_requests_total.labels('POST', '/api/generate', '200').inc()
        api_request_duration.labels('POST', '/api/generate').observe(duration)
        api_tokens_used.labels(model, current_user).inc(tokens)
        api_cost_total.labels(model, current_user).inc(cost)
        
        return jsonify({'content': message.content[0].text})
    
    except Exception as e:
        api_requests_total.labels('POST', '/api/generate', '500').inc()
        return jsonify({'error': str(e)}), 500
    
    finally:
        active_requests.dec()

@app.route('/metrics')
def prometheus_metrics():
    return Response(generate_latest(), mimetype='text/plain')

Health Checks

from datetime import datetime
import time
import psutil

@app.route('/health', methods=['GET'])
def health_check():
    health_status = {
        'status': 'ok',
        'timestamp': datetime.utcnow().isoformat(),
        'uptime': time.time() - app.config.get('START_TIME', time.time()),
        'version': '1.0.0'
    }
    
    # Check API connectivity (note: each probe makes a real, billable API
    # call; consider caching this result or probing less frequently)
    try:
        test_message = client.messages.create(
            model="claude-sonnet-4-20250514",
            max_tokens=10,
            messages=[{"role": "user", "content": "test"}]
        )
        health_status['api_status'] = 'connected'
    except Exception as e:
        health_status['api_status'] = 'disconnected'
        health_status['api_error'] = str(e)
        health_status['status'] = 'degraded'
    
    # Check database connectivity
    try:
        db.session.execute(db.text('SELECT 1'))
        health_status['database_status'] = 'connected'
    except Exception as e:
        health_status['database_status'] = 'disconnected'
        health_status['database_error'] = str(e)
        health_status['status'] = 'degraded'
    
    # Check Redis connectivity
    try:
        cache.set('health_check', 'ok', timeout=1)
        health_status['cache_status'] = 'connected'
    except Exception as e:
        health_status['cache_status'] = 'disconnected'
        health_status['cache_error'] = str(e)
        health_status['status'] = 'degraded'
    
    # System metrics
    health_status['system'] = {
        'cpu_percent': psutil.cpu_percent(),
        'memory_percent': psutil.virtual_memory().percent,
        'disk_percent': psutil.disk_usage('/').percent
    }
    
    status_code = 200 if health_status['status'] == 'ok' else 503
    return jsonify(health_status), status_code

@app.route('/health/ready', methods=['GET'])
def readiness_check():
    """Kubernetes readiness probe"""
    try:
        # Check if app can handle requests
        db.session.execute(db.text('SELECT 1'))
        return jsonify({'status': 'ready'}), 200
    except Exception:
        return jsonify({'status': 'not ready'}), 503

@app.route('/health/live', methods=['GET'])
def liveness_check():
    """Kubernetes liveness probe"""
    return jsonify({'status': 'alive'}), 200
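The split between the two probes matters: liveness should stay trivial (a failing liveness probe makes Kubernetes restart the pod), while readiness gates traffic on dependencies. A minimal, self-contained sketch of that behavior, exercised with Flask's test client (the `dependencies_ok` flag stands in for a real database or cache check):

```python
from flask import Flask, jsonify

probe_app = Flask(__name__)
dependencies_ok = True  # stand-in for a real DB/cache connectivity check

@probe_app.route('/health/live')
def live():
    # Liveness: the process is up; keep this check trivial
    return jsonify({'status': 'alive'}), 200

@probe_app.route('/health/ready')
def ready():
    # Readiness: only accept traffic once dependencies are reachable
    if dependencies_ok:
        return jsonify({'status': 'ready'}), 200
    return jsonify({'status': 'not ready'}), 503

with probe_app.test_client() as c:
    assert c.get('/health/live').status_code == 200
    assert c.get('/health/ready').get_json()['status'] == 'ready'
```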

Application Performance Monitoring (APM)

Integration with APM tools like New Relic or DataDog:

# Example with New Relic
import newrelic.agent
newrelic.agent.initialize('newrelic.ini')

@newrelic.agent.background_task()
def background_ai_task(prompt):
    return client.messages.create(
        model="claude-sonnet-4-20250514",
        max_tokens=1024,
        messages=[{"role": "user", "content": prompt}]
    )

@app.route('/api/generate', methods=['POST'])
@token_required
@newrelic.agent.function_trace()
def generate_text(current_user):
    data = request.get_json()
    prompt = data.get('prompt', '')
    
    # APM will automatically track this
    with newrelic.agent.FunctionTrace('anthropic_api_call'):
        message = robust_api_call(prompt)
    
    return jsonify({'content': message.content[0].text})

Complete Example Application

Here’s a complete Flask application putting it all together:

# app.py
from flask import Flask, request, jsonify, Response
from flask_sqlalchemy import SQLAlchemy
from flask_caching import Cache
from flask_limiter import Limiter
from flask_limiter.util import get_remote_address
import anthropic
import os
import time
import logging
from functools import wraps
import jwt
from datetime import datetime, timedelta

# Initialize Flask app
app = Flask(__name__)

# Configuration
app.config['SECRET_KEY'] = os.getenv('JWT_SECRET_KEY')
app.config['SQLALCHEMY_DATABASE_URI'] = os.getenv('DATABASE_URL')
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False
app.config['START_TIME'] = time.time()

# Initialize extensions
db = SQLAlchemy(app)
cache = Cache(app, config={
    'CACHE_TYPE': 'redis',
    'CACHE_REDIS_URL': os.getenv('REDIS_URL', 'redis://localhost:6379/0')
})

# Rate limiter
def get_user_id():
    token = request.headers.get('Authorization', '').replace('Bearer ', '')
    try:
        data = jwt.decode(token, app.config['SECRET_KEY'], algorithms=["HS256"])
        return data['user_id']
    except Exception:
        return get_remote_address()

limiter = Limiter(
    app=app,
    key_func=get_user_id,
    storage_uri=os.getenv('REDIS_URL', 'redis://localhost:6379/0')
)

# Initialize Anthropic client
client = anthropic.Anthropic(api_key=os.getenv("ANTHROPIC_API_KEY"))

# Setup logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)

# Models
class UsageRecord(db.Model):
    id = db.Column(db.Integer, primary_key=True)
    user_id = db.Column(db.String(50), nullable=False, index=True)
    timestamp = db.Column(db.DateTime, default=datetime.utcnow, index=True)
    input_tokens = db.Column(db.Integer)
    output_tokens = db.Column(db.Integer)
    cost = db.Column(db.Float)
    model = db.Column(db.String(50))

# Authentication decorator
def token_required(f):
    @wraps(f)
    def decorated(*args, **kwargs):
        token = request.headers.get('Authorization')
        
        if not token or not token.startswith('Bearer '):
            return jsonify({'error': 'Token is missing'}), 401
        
        try:
            token = token[7:]
            data = jwt.decode(token, app.config['SECRET_KEY'], algorithms=["HS256"])
            current_user = data['user_id']
        except jwt.ExpiredSignatureError:
            return jsonify({'error': 'Token has expired'}), 401
        except jwt.InvalidTokenError:
            return jsonify({'error': 'Invalid token'}), 401
        
        return f(current_user, *args, **kwargs)
    
    return decorated

# Usage tracker
class UsageTracker:
    PRICING = {
        'claude-3-haiku-20240307': {'input': 0.25, 'output': 1.25},
        'claude-sonnet-4-20250514': {'input': 3.00, 'output': 15.00},
        'claude-opus-4-20250514': {'input': 15.00, 'output': 75.00}
    }
    
    @classmethod
    def calculate_cost(cls, model, input_tokens, output_tokens):
        pricing = cls.PRICING.get(model, cls.PRICING['claude-sonnet-4-20250514'])
        input_cost = (input_tokens / 1_000_000) * pricing['input']
        output_cost = (output_tokens / 1_000_000) * pricing['output']
        return input_cost + output_cost
    
    @classmethod
    def track_usage(cls, user_id, model, input_tokens, output_tokens):
        cost = cls.calculate_cost(model, input_tokens, output_tokens)
        
        record = UsageRecord(
            user_id=user_id,
            input_tokens=input_tokens,
            output_tokens=output_tokens,
            cost=cost,
            model=model
        )
        db.session.add(record)
        db.session.commit()
        
        return cost
    
    @classmethod
    def get_monthly_usage(cls, user_id):
        start_of_month = datetime.utcnow().replace(day=1, hour=0, minute=0, second=0, microsecond=0)
        
        usage = db.session.query(
            db.func.sum(UsageRecord.input_tokens).label('total_input_tokens'),
            db.func.sum(UsageRecord.output_tokens).label('total_output_tokens'),
            db.func.sum(UsageRecord.cost).label('total_cost')
        ).filter(
            UsageRecord.user_id == user_id,
            UsageRecord.timestamp >= start_of_month
        ).first()
        
        return {
            'input_tokens': usage.total_input_tokens or 0,
            'output_tokens': usage.total_output_tokens or 0,
            'total_cost': float(usage.total_cost or 0)
        }
    
    @classmethod
    def check_budget(cls, user_id, budget_limit=10.0):
        usage = cls.get_monthly_usage(user_id)
        return usage['total_cost'] < budget_limit

# Routes
@app.route('/api/auth/login', methods=['POST'])
def login():
    """Generate JWT token for demo purposes"""
    data = request.get_json()
    user_id = data.get('user_id')
    
    if not user_id:
        return jsonify({'error': 'user_id required'}), 400
    
    token = jwt.encode({
        'user_id': user_id,
        'exp': datetime.utcnow() + timedelta(days=1)
    }, app.config['SECRET_KEY'], algorithm='HS256')
    
    return jsonify({'token': token})

@app.route('/api/generate', methods=['POST'])
@limiter.limit("10 per minute")
@token_required
def generate_text(current_user):
    """Generate text using Claude API"""
    
    # Check budget
    if not UsageTracker.check_budget(current_user, budget_limit=10.0):
        return jsonify({'error': 'Monthly budget exceeded'}), 402
    
    data = request.get_json()
    prompt = data.get('prompt', '')
    model = data.get('model', 'claude-sonnet-4-20250514')
    
    if not prompt or len(prompt) > 100000:
        return jsonify({'error': 'Invalid prompt'}), 400
    
    start_time = time.time()
    
    try:
        # Make API call
        message = client.messages.create(
            model=model,
            max_tokens=1024,
            messages=[{"role": "user", "content": prompt}]
        )
        
        duration = (time.time() - start_time) * 1000
        
        # Track usage
        cost = UsageTracker.track_usage(
            current_user,
            model,
            message.usage.input_tokens,
            message.usage.output_tokens
        )
        
        logger.info(f"API call successful - user: {current_user}, duration: {duration}ms, tokens: {message.usage.input_tokens + message.usage.output_tokens}")
        
        return jsonify({
            'content': message.content[0].text,
            'usage': {
                'input_tokens': message.usage.input_tokens,
                'output_tokens': message.usage.output_tokens,
                'cost': cost
            },
            'duration_ms': duration
        })
    
    except Exception as e:
        duration = (time.time() - start_time) * 1000
        logger.error(f"API call failed - user: {current_user}, error: {str(e)}, duration: {duration}ms")
        # Log the detail server-side, but return a generic message to the client
        return jsonify({'error': 'Text generation failed'}), 500

@app.route('/api/usage', methods=['GET'])
@token_required
def get_usage(current_user):
    """Get current month usage for user"""
    usage = UsageTracker.get_monthly_usage(current_user)
    return jsonify(usage)

@app.route('/health', methods=['GET'])
def health_check():
    """Health check endpoint"""
    health_status = {
        'status': 'ok',
        'timestamp': datetime.utcnow().isoformat(),
        'uptime': time.time() - app.config['START_TIME']
    }
    
    try:
        db.session.execute(db.text('SELECT 1'))
        health_status['database'] = 'ok'
    except Exception:
        health_status['database'] = 'error'
        health_status['status'] = 'degraded'
    
    status_code = 200 if health_status['status'] == 'ok' else 503
    return jsonify(health_status), status_code

if __name__ == '__main__':
    with app.app_context():
        db.create_all()
    # Development server only; run behind gunicorn in production
    app.run(debug=True, port=5000)
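The pricing arithmetic inside `UsageTracker` is easy to sanity-check by hand. With rates expressed in dollars per million tokens, a request using 2,000 input and 500 output tokens on Sonnet works out as follows:

```python
# Rates in USD per million tokens, matching the PRICING table above
SONNET = {'input': 3.00, 'output': 15.00}

def calculate_cost(pricing, input_tokens, output_tokens):
    input_cost = (input_tokens / 1_000_000) * pricing['input']
    output_cost = (output_tokens / 1_000_000) * pricing['output']
    return input_cost + output_cost

# 2,000 input tokens -> $0.006; 500 output tokens -> $0.0075; total $0.0135
cost = calculate_cost(SONNET, 2000, 500)
assert abs(cost - 0.0135) < 1e-9
```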

Requirements File

# requirements.txt
Flask==3.0.0
Flask-SQLAlchemy==3.1.1
Flask-Caching==2.1.0
Flask-Limiter==3.5.0
anthropic==0.21.0
PyJWT==2.8.0
python-dotenv==1.0.0
psycopg2-binary==2.9.9
redis==5.0.1
psutil==5.9.6
prometheus-client==0.19.0

Environment Variables

# .env
ANTHROPIC_API_KEY=your_api_key_here
JWT_SECRET_KEY=your_secret_key_here
DATABASE_URL=postgresql://localhost/ai_usage
REDIS_URL=redis://localhost:6379/0
FLASK_ENV=development
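
A missing variable from this file usually surfaces later as a confusing runtime error (for example an authentication failure deep inside the client), so it is worth failing fast at startup. One possible sketch (the helper name is ours; the variable names match the .env above):

```python
import os

REQUIRED_VARS = ['ANTHROPIC_API_KEY', 'JWT_SECRET_KEY', 'DATABASE_URL', 'REDIS_URL']

def validate_env(environ=None):
    """Raise at startup if any required variable is missing or empty."""
    environ = os.environ if environ is None else environ
    missing = [name for name in REQUIRED_VARS if not environ.get(name)]
    if missing:
        raise RuntimeError('Missing required environment variables: ' + ', '.join(missing))

# A partial environment fails loudly instead of breaking mid-request
try:
    validate_env({'ANTHROPIC_API_KEY': 'sk-...'})
except RuntimeError as e:
    print(e)
```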

Deployment Best Practices

Docker Configuration

# Dockerfile
FROM python:3.11-slim

WORKDIR /app

# Install dependencies
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

# Copy application
COPY . .

# Create non-root user
RUN useradd -m -u 1000 appuser && chown -R appuser:appuser /app
USER appuser

# Expose port
EXPOSE 5000

# Run with gunicorn
CMD ["gunicorn", "--bind", "0.0.0.0:5000", "--workers", "4", "--timeout", "120", "app:app"]

Docker Compose

# docker-compose.yml

services:
  web:
    build: .
    ports:
      - "5000:5000"
    environment:
      - ANTHROPIC_API_KEY=${ANTHROPIC_API_KEY}
      - JWT_SECRET_KEY=${JWT_SECRET_KEY}
      - DATABASE_URL=postgresql://postgres:password@db:5432/ai_usage
      - REDIS_URL=redis://redis:6379/0
    depends_on:
      - db
      - redis
  
  db:
    image: postgres:15
    environment:
      - POSTGRES_DB=ai_usage
      - POSTGRES_PASSWORD=password
    volumes:
      - postgres_data:/var/lib/postgresql/data
  
  redis:
    image: redis:7-alpine
    volumes:
      - redis_data:/data

volumes:
  postgres_data:
  redis_data:

Conclusion

Building robust AI API integrations with Python and Flask requires careful attention to:

  1. Security: Environment variables, backend proxies, input validation, JWT authentication
  2. Rate Limiting: Flask-Limiter, custom rate limiters, exponential backoff, Celery queues
  3. Cost Management: Token optimization, caching, usage tracking with SQLAlchemy, budget enforcement
  4. Error Handling: Custom exceptions, circuit breakers, timeout handling, retry logic
  5. Monitoring: Structured logging, metrics collection, Prometheus integration, health checks

By implementing these patterns, you’ll create AI-powered applications that are secure, reliable, scalable, and cost-effective.

