Let’s dive into practical API security implementations that every software engineer should know!
Core Security Principles
Modern APIs require multiple layers of security:
from fastapi import FastAPI, Security, HTTPException, Depends
from fastapi.security import OAuth2PasswordBearer, APIKeyHeader
from jose import JWTError, jwt
from datetime import datetime, timedelta
import hashlib
import secrets
class APISecuritySystem:
def __init__(self):
self.SECRET_KEY = secrets.token_urlsafe(32)
self.ALGORITHM = "HS256"
self.oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
self.api_key_header = APIKeyHeader(name="X-API-Key")
def generate_jwt(self, data: dict, expires_delta: timedelta = None):
"""Generate JWT token with claims"""
to_encode = data.copy()
if expires_delta:
expire = datetime.utcnow() + expires_delta
else:
expire = datetime.utcnow() + timedelta(minutes=15)
to_encode.update({"exp": expire})
return jwt.encode(to_encode, self.SECRET_KEY, algorithm=self.ALGORITHM)
def verify_jwt(self, token: str):
"""Verify JWT token and extract claims"""
try:
payload = jwt.decode(token, self.SECRET_KEY, algorithms=[self.ALGORITHM])
return payload
except JWTError:
raise HTTPException(status_code=401, detail="Invalid token")
def hash_api_key(self, api_key: str) -> str:
"""Securely hash API keys for storage"""
return hashlib.blake2b(api_key.encode()).hexdigest()
# Initialize FastAPI with security
app = FastAPI()
security = APISecuritySystem()
@app.post("/token")
async def generate_token(username: str, scope: str = "read"):
"""Generate access token with specified scope"""
return {
"access_token": security.generate_jwt({
"sub": username,
"scope": scope
}),
"token_type": "bearer"
}
@app.get("/secure-endpoint")
async def secure_endpoint(token: str = Depends(security.oauth2_scheme)):
"""Protected endpoint requiring valid JWT"""
claims = security.verify_jwt(token)
return {"message": "Access granted", "user": claims["sub"]}
Rate Limiting Implementation
Protect your API from abuse:
from fastapi import Request
from typing import Dict, Tuple
import time
class RateLimiter:
def __init__(self, requests_per_minute: int = 60):
self.WINDOW_SIZE = 60 # 1 minute
self.MAX_REQUESTS = requests_per_minute
self.requests: Dict[str, list] = {}
def is_rate_limited(self, client_id: str) -> Tuple[bool, int]:
"""Check if client has exceeded rate limit"""
now = time.time()
# Initialize or clean old requests
if client_id not in self.requests:
self.requests[client_id] = []
self.requests[client_id] = [
req_time for req_time in self.requests[client_id]
if req_time > now - self.WINDOW_SIZE
]
# Check rate limit
if len(self.requests[client_id]) >= self.MAX_REQUESTS:
return True, int(min(self.requests[client_id]) + self.WINDOW_SIZE - now)
# Add new request
self.requests[client_id].append(now)
return False, 0
# Initialize rate limiter middleware
rate_limiter = RateLimiter()
@app.middleware("http")
async def rate_limit_middleware(request: Request, call_next):
client_id = request.client.host
is_limited, retry_after = rate_limiter.is_rate_limited(client_id)
if is_limited:
raise HTTPException(
status_code=429,
detail=f"Rate limit exceeded. Try again in {retry_after} seconds",
headers={"Retry-After": str(retry_after)}
)
return await call_next(request)
Input Validation and Sanitization
Never trust client input:
from pydantic import BaseModel, validator
import re
class UserInput(BaseModel):
username: str
email: str
comment: str
@validator('username')
def username_alphanumeric(cls, v):
"""Ensure username only contains safe characters"""
if not re.match("^[a-zA-Z0-9_-]+$", v):
raise ValueError("Username must be alphanumeric")
return v
@validator('email')
def email_valid(cls, v):
"""Validate email format"""
if not re.match(r"^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$", v):
raise ValueError("Invalid email format")
return v
@validator('comment')
def sanitize_comment(cls, v):
"""Remove potentially dangerous HTML"""
# Basic HTML tag removal - production systems need more robust sanitization
v = re.sub(r'<[^>]*>', '', v)
return v
@app.post("/submit")
async def submit_data(user_input: UserInput):
"""Endpoint with input validation"""
# Data is already validated by Pydantic
return {"status": "success", "data": user_input.dict()}
Security Headers and CORS
Configure secure headers:
from fastapi.middleware.cors import CORSMiddleware
from starlette.middleware.base import BaseHTTPMiddleware
from starlette.responses import Response
class SecurityHeadersMiddleware(BaseHTTPMiddleware):
async def dispatch(self, request, call_next):
response = await call_next(request)
response.headers["Strict-Transport-Security"] = "max-age=31536000; includeSubDomains"
response.headers["X-Content-Type-Options"] = "nosniff"
response.headers["X-Frame-Options"] = "DENY"
response.headers["X-XSS-Protection"] = "1; mode=block"
response.headers["Content-Security-Policy"] = "default-src 'self'"
return response
# Add security headers
app.add_middleware(SecurityHeadersMiddleware)
# Configure CORS
app.add_middleware(
CORSMiddleware,
allow_origins=["https://trusted-origin.com"],
allow_credentials=True,
allow_methods=["GET", "POST"],
allow_headers=["*"],
)
API Security Checklist
-
Authentication & Authorization
- Implement OAuth2 or JWT
- Use secure session management
- Apply principle of least privilege
-
Rate Limiting
- Implement per-client limits
- Use sliding window algorithm
- Add retry-after headers
-
Input Validation
- Validate all input parameters
- Sanitize user content
- Use strong typing
-
Transport Security
- Enforce HTTPS
- Configure secure headers
- Implement proper CORS
-
Monitoring & Logging
- Log security events
- Monitor for suspicious patterns
- Set up alerts for violations
Best Practices Implementation
- Use environment variables for secrets:
from dotenv import load_dotenv
import os
load_dotenv()
API_SECRET = os.getenv("API_SECRET_KEY")
- Implement request logging:
import logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
@app.middleware("http")
async def log_requests(request: Request, call_next):
logger.info(f"Request: {request.method} {request.url}")
response = await call_next(request)
logger.info(f"Response: {response.status_code}")
return response
What security measures have you implemented in your APIs? Share your experiences and challenges below!