Python & FastAPI Coding Standards Handbook
Production-grade conventions for building scalable, type-safe, and maintainable APIs with Python 3.12+ and FastAPI 0.115+.
Project Structure
Use a domain-driven flat structure for small-to-medium APIs, or a modular package structure for large services. Always separate concerns: routes, schemas, services, and data access.
# Recommended structure — modular domain layout project-root/ ├── app/ │ ├── __init__.py │ ├── main.py # App factory, lifespan │ ├── config.py # Settings via pydantic-settings │ ├── dependencies.py # Shared DI providers │ ├── exceptions.py # Custom exception classes │ ├── middleware.py # CORS, logging, rate limiting │ │ │ ├── auth/ │ │ ├── __init__.py │ │ ├── router.py # POST /auth/login, /auth/register │ │ ├── schemas.py # LoginRequest, TokenResponse │ │ ├── service.py # Business logic │ │ ├── dependencies.py # get_current_user │ │ └── constants.py │ │ │ ├── users/ │ │ ├── __init__.py │ │ ├── router.py │ │ ├── schemas.py │ │ ├── service.py │ │ ├── repository.py # Data access layer │ │ └── models.py # SQLAlchemy ORM models │ │ │ ├── orders/ │ │ ├── ... # Same pattern per domain │ │ │ └── core/ │ ├── database.py # Engine, session factory │ ├── security.py # JWT, hashing utilities │ └── pagination.py # Shared pagination logic │ ├── alembic/ │ ├── env.py │ └── versions/ ├── tests/ │ ├── conftest.py │ ├── test_auth/ │ ├── test_users/ │ └── test_orders/ ├── alembic.ini ├── pyproject.toml ├── Dockerfile ├── docker-compose.yml └── .env.example
Tooling & Environment
| Tool | Purpose | Config |
|---|---|---|
| uv | Package manager & virtual env (replaces pip/venv) | pyproject.toml |
| Ruff | Linter + formatter (replaces flake8, black, isort) | [tool.ruff] in pyproject.toml |
| mypy | Static type checker — strict mode | [tool.mypy] in pyproject.toml |
| pytest | Test runner + async support | [tool.pytest.ini_options] |
| pre-commit | Git hooks for ruff, mypy, tests | .pre-commit-config.yaml |
| Alembic | Database migrations | alembic.ini |
# pyproject.toml — essential config [project] name = "my-api" version = "1.0.0" requires-python = ">=3.12" dependencies = [ "fastapi>=0.115", "uvicorn[standard]>=0.32", "pydantic>=2.10", "pydantic-settings>=2.6", "sqlalchemy>=2.0", "alembic>=1.14", "asyncpg>=0.30", "python-jose[cryptography]>=3.3", "passlib[bcrypt]>=1.7", "httpx>=0.28", ] [dependency-groups] dev = [ "pytest>=8.3", "pytest-asyncio>=0.24", "pytest-cov>=6.0", "httpx>=0.28", "mypy>=1.13", "ruff>=0.8", "pre-commit>=4.0", ] [tool.ruff] target-version = "py312" line-length = 99 [tool.ruff.lint] select = ["E", "F", "W", "I", "N", "UP", "S", "B", "A", "T20", "RUF"] ignore = ["E501"] # line length handled by formatter [tool.ruff.lint.isort] known-first-party = ["app"] [tool.mypy] python_version = "3.12" strict = true plugins = ["pydantic.mypy"] [tool.pytest.ini_options] asyncio_mode = "auto" testpaths = ["tests"] addopts = "--strict-markers --tb=short -q"
# Quick start with uv uv init my-api cd my-api uv add fastapi uvicorn[standard] pydantic-settings sqlalchemy asyncpg uv add --group dev pytest pytest-asyncio ruff mypy uv run uvicorn app.main:app --reload
Type Hints & Annotations
Type hints are mandatory on all function signatures, class attributes, and module-level variables. Use modern Python 3.12+ syntax — no typing.Optional or typing.List.
Use list[str] not List[str], dict[str, int] not Dict[str, int], str | None not Optional[str].
# ✅ Correct — Python 3.12+ style def get_users( skip: int = 0, limit: int = 100, active_only: bool = True, ) -> list[UserResponse]: ... def find_order(order_id: int) -> Order | None: ... # Use type aliases for complex types type UserMap = dict[str, list[UserResponse]] type ErrorDetail = dict[str, str | list[str]] # ❌ Wrong — legacy typing imports from typing import Optional, List, Dict def get_users() -> List[UserResponse]: # Don't do this ...
# Annotated for FastAPI dependency injection + validation from typing import Annotated from fastapi import Depends, Query CurrentUser = Annotated[User, Depends(get_current_user)] PaginationSkip = Annotated[int, Query(ge=0, le=10000)] PaginationLimit = Annotated[int, Query(ge=1, le=100)] @router.get("/") async def list_users( user: CurrentUser, skip: PaginationSkip = 0, limit: PaginationLimit = 20, ) -> PaginatedResponse[UserResponse]: ...
Naming Conventions
| Element | Convention | Example |
|---|---|---|
| Modules | snake_case, short, singular | user_service.py, auth.py |
| Packages | snake_case, plural for domains | users/, orders/ |
| Classes | PascalCase | UserService, OrderRepository |
| Functions / Methods | snake_case, verb-first | get_user_by_id(), create_order() |
| Constants | UPPER_SNAKE_CASE | MAX_RETRIES, DEFAULT_PAGE_SIZE |
| Pydantic Schemas | PascalCase + intent suffix | UserCreate, UserResponse, UserUpdate |
| SQLAlchemy Models | PascalCase, singular noun | User, Order, OrderItem |
| Route paths | kebab-case, plural nouns | /api/v1/order-items |
| Query params | snake_case | ?page_size=20&sort_by=name |
| Env variables | UPPER_SNAKE_CASE, prefixed | APP_DATABASE_URL, APP_SECRET_KEY |
| Private attrs | Leading underscore | _cache, _validate_input() |
id, url, db, api). Write get_authenticated_user not get_auth_usr.Imports & Modules
# Import order: stdlib → third-party → local (Ruff isort handles this) from collections.abc import Sequence # stdlib from datetime import UTC, datetime from fastapi import APIRouter, Depends # third-party from sqlalchemy.ext.asyncio import AsyncSession from app.core.database import get_db # local from app.users.schemas import UserCreate, UserResponse from app.users.service import UserService
- Absolute imports only — no relative imports (
from . import x) except within a package's__init__.py - No wildcard imports —
from module import *is banned - Import what you use — don't import entire modules when you need one symbol
- Avoid circular imports — if service A needs service B, inject via DI, don't cross-import
Error Handling
Use custom exceptions that map to HTTP responses. Never return error dicts manually — let exception handlers do the work.
# app/exceptions.py from fastapi import HTTPException, status class AppException(HTTPException): """Base exception for all application errors.""" def __init__(self, detail: str, status_code: int = status.HTTP_400_BAD_REQUEST) -> None: super().__init__(status_code=status_code, detail=detail) class NotFoundError(AppException): def __init__(self, resource: str, identifier: str | int) -> None: super().__init__( detail=f"{resource} with id '{identifier}' not found", status_code=status.HTTP_404_NOT_FOUND, ) class ConflictError(AppException): def __init__(self, detail: str) -> None: super().__init__(detail=detail, status_code=status.HTTP_409_CONFLICT) class ForbiddenError(AppException): def __init__(self, detail: str = "Insufficient permissions") -> None: super().__init__(detail=detail, status_code=status.HTTP_403_FORBIDDEN)
# Usage in service layer — raise, don't return class UserService: async def get_by_id(self, user_id: int) -> User: user = await self.repo.get(user_id) if not user: raise NotFoundError("User", user_id) return user async def create(self, data: UserCreate) -> User: existing = await self.repo.get_by_email(data.email) if existing: raise ConflictError(f"Email '{data.email}' already registered") return await self.repo.create(data)
# Global exception handler for unhandled errors # app/main.py from fastapi import Request from fastapi.responses import JSONResponse @app.exception_handler(Exception) async def unhandled_exception_handler(request: Request, exc: Exception) -> JSONResponse: logger.exception("Unhandled error on %s %s", request.method, request.url.path) return JSONResponse( status_code=500, content={"detail": "Internal server error"}, )
Async / Await Patterns
FastAPI is async-first. Use async def for all route handlers and DB operations. Use sync def only for CPU-bound helpers that FastAPI will run in a thread pool.
# ✅ Async route — I/O bound (DB, HTTP calls) @router.get("/{user_id}") async def get_user(user_id: int, db: AsyncSession = Depends(get_db)) -> UserResponse: result = await db.execute(select(User).where(User.id == user_id)) user = result.scalar_one_or_none() if not user: raise NotFoundError("User", user_id) return UserResponse.model_validate(user) # ✅ Sync route — CPU bound (FastAPI auto-threadpools this) @router.post("/hash") def compute_hash(payload: HashRequest) -> HashResponse: result = expensive_cpu_computation(payload.data) return HashResponse(hash=result) # ✅ Running sync code inside async context import anyio async def process_upload(file_data: bytes) -> str: # Offload CPU-bound work to thread pool result = await anyio.to_thread.run_sync(parse_csv, file_data) return result
async def. Calling time.sleep(), synchronous requests.get(), or blocking file reads inside an async handler will block the event loop. Use httpx.AsyncClient, aiofiles, or anyio.to_thread.run_sync().# ✅ Parallel async operations import asyncio async def get_dashboard_data(user_id: int) -> DashboardResponse: profile, orders, notifications = await asyncio.gather( user_service.get_profile(user_id), order_service.get_recent(user_id, limit=5), notification_service.get_unread(user_id), ) return DashboardResponse( profile=profile, orders=orders, notifications=notifications, )
App Factory & Lifespan
Use the app factory pattern with lifespan context manager for startup/shutdown. Never use deprecated @app.on_event.
# app/main.py from collections.abc import AsyncIterator from contextlib import asynccontextmanager from fastapi import FastAPI from app.config import settings from app.core.database import engine from app.middleware import setup_middleware from app.auth.router import router as auth_router from app.users.router import router as users_router @asynccontextmanager async def lifespan(app: FastAPI) -> AsyncIterator[None]: # ── Startup ── logger.info("Starting %s in %s mode", settings.app_name, settings.environment) # Initialize connection pools, caches, ML models, etc. yield # ── Shutdown ── await engine.dispose() logger.info("Shutdown complete") def create_app() -> FastAPI: app = FastAPI( title=settings.app_name, version=settings.app_version, lifespan=lifespan, docs_url="/docs" if settings.environment != "production" else None, redoc_url=None, ) setup_middleware(app) app.include_router(auth_router, prefix="/api/v1/auth", tags=["auth"]) app.include_router(users_router, prefix="/api/v1/users", tags=["users"]) return app app = create_app()
docs_url=None and redoc_url=None in production to avoid exposing API schema endpoints publicly.Router Organization
Each domain module has a single router.py. Use APIRouter with consistent patterns. Group by resource, version with URL prefix.
# app/users/router.py from typing import Annotated from fastapi import APIRouter, Depends, status from app.auth.dependencies import get_current_user from app.users.schemas import UserCreate, UserResponse, UserUpdate from app.users.service import UserService router = APIRouter() CurrentUser = Annotated[User, Depends(get_current_user)] @router.get("/", response_model=list[UserResponse]) async def list_users( service: Annotated[UserService, Depends()], skip: int = 0, limit: int = 20, ) -> list[UserResponse]: return await service.list(skip=skip, limit=limit) @router.post("/", status_code=status.HTTP_201_CREATED, response_model=UserResponse) async def create_user( data: UserCreate, service: Annotated[UserService, Depends()], ) -> UserResponse: return await service.create(data) @router.get("/{user_id}", response_model=UserResponse) async def get_user( user_id: int, service: Annotated[UserService, Depends()], ) -> UserResponse: return await service.get_by_id(user_id) @router.patch("/{user_id}", response_model=UserResponse) async def update_user( user_id: int, data: UserUpdate, current_user: CurrentUser, service: Annotated[UserService, Depends()], ) -> UserResponse: return await service.update(user_id, data, actor=current_user) @router.delete("/{user_id}", status_code=status.HTTP_204_NO_CONTENT) async def delete_user( user_id: int, current_user: CurrentUser, service: Annotated[UserService, Depends()], ) -> None: await service.delete(user_id, actor=current_user)
response_model on the decorator and a return type annotation on the function. They should agree. FastAPI uses response_model for serialization filtering.Pydantic Schemas (v2)
Schemas define API contracts. Separate schemas for input (Create/Update) and output (Response). Never expose ORM models directly.
# app/users/schemas.py from datetime import datetime from pydantic import BaseModel, ConfigDict, EmailStr, Field # ── Base (shared fields) ── class UserBase(BaseModel): email: EmailStr full_name: str = Field(min_length=1, max_length=255) # ── Input schemas ── class UserCreate(UserBase): password: str = Field(min_length=8, max_length=128) class UserUpdate(BaseModel): # All fields optional for PATCH full_name: str | None = Field(None, min_length=1, max_length=255) email: EmailStr | None = None # ── Output schemas ── class UserResponse(UserBase): model_config = ConfigDict(from_attributes=True) id: int is_active: bool created_at: datetime # ── Generic paginated wrapper ── from typing import Generic, TypeVar from pydantic import BaseModel T = TypeVar("T") class PaginatedResponse(BaseModel, Generic[T]): items: list[T] total: int page: int page_size: int has_next: bool
ConfigDict(from_attributes=True) for ORM integration.Use
Field() for validation constraints.Use
EmailStr for email validation.Create
model_config = ConfigDict(strict=True) for strict type coercion.
class Config: (Pydantic v1 syntax).Don't expose password fields in response schemas.
Don't use
orm_mode = True (v1 syntax).Don't mix input and output in one schema.
Dependencies & Dependency Injection
FastAPI's DI system is your primary composition tool. Use it for database sessions, auth, services, and cross-cutting concerns.
# app/dependencies.py — shared dependencies from typing import Annotated, AsyncIterator from fastapi import Depends from sqlalchemy.ext.asyncio import AsyncSession from app.core.database import async_session_factory async def get_db() -> AsyncIterator[AsyncSession]: async with async_session_factory() as session: try: yield session await session.commit() except Exception: await session.rollback() raise # Reusable annotated types DbSession = Annotated[AsyncSession, Depends(get_db)]
# app/users/service.py — service with injected dependencies from app.dependencies import DbSession from app.users.repository import UserRepository class UserService: def __init__(self, db: DbSession) -> None: self.repo = UserRepository(db) async def get_by_id(self, user_id: int) -> User: user = await self.repo.get(user_id) if not user: raise NotFoundError("User", user_id) return user
Middleware & CORS
# app/middleware.py import time import uuid from fastapi import FastAPI, Request from fastapi.middleware.cors import CORSMiddleware from starlette.middleware.trustedhost import TrustedHostMiddleware from app.config import settings def setup_middleware(app: FastAPI) -> None: # Order matters — outermost first app.add_middleware( TrustedHostMiddleware, allowed_hosts=settings.allowed_hosts, ) app.add_middleware( CORSMiddleware, allow_origins=settings.cors_origins, # Never ["*"] in production allow_credentials=True, allow_methods=["GET", "POST", "PUT", "PATCH", "DELETE"], allow_headers=["Authorization", "Content-Type"], ) @app.middleware("http") async def request_context_middleware(request: Request, call_next): request_id = uuid.uuid4().hex[:12] start = time.perf_counter() response = await call_next(request) duration_ms = (time.perf_counter() - start) * 1000 response.headers["X-Request-ID"] = request_id response.headers["X-Process-Time-Ms"] = f"{duration_ms:.1f}" logger.info( "%s %s %s %.1fms", request.method, request.url.path, response.status_code, duration_ms, extra={"request_id": request_id}, ) return response
allow_origins=["*"] with allow_credentials=True. This is a security vulnerability. Always explicitly list allowed origins in production.Database & SQLAlchemy 2.0
Use SQLAlchemy 2.0 with async engine and the new Mapped type annotations. All DB access is async.
# app/core/database.py from sqlalchemy.ext.asyncio import ( AsyncSession, async_sessionmaker, create_async_engine, ) from sqlalchemy.orm import DeclarativeBase, MappedAsDataclass from app.config import settings engine = create_async_engine( settings.database_url, echo=settings.db_echo, pool_size=20, max_overflow=10, pool_pre_ping=True, ) async_session_factory = async_sessionmaker( engine, class_=AsyncSession, expire_on_commit=False, ) class Base(DeclarativeBase): pass
# app/users/models.py — SQLAlchemy 2.0 Mapped style from datetime import UTC, datetime from sqlalchemy import String, func from sqlalchemy.orm import Mapped, mapped_column from app.core.database import Base class User(Base): __tablename__ = "users" id: Mapped[int] = mapped_column(primary_key=True) email: Mapped[str] = mapped_column( String(255), unique=True, index=True, ) full_name: Mapped[str] = mapped_column(String(255)) hashed_password: Mapped[str] = mapped_column(String(255)) is_active: Mapped[bool] = mapped_column(default=True) created_at: Mapped[datetime] = mapped_column( server_default=func.now(), ) updated_at: Mapped[datetime | None] = mapped_column( onupdate=func.now(), )
Alembic Migrations
# Initialize Alembic with async template alembic init -t async alembic # Generate migration from model changes alembic revision --autogenerate -m "add users table" # Apply migrations alembic upgrade head # Rollback one step alembic downgrade -1
# alembic/env.py — async setup from app.core.database import Base from app.config import settings # Import all models so autogenerate detects them from app.users.models import User # noqa: F401 from app.orders.models import Order # noqa: F401 target_metadata = Base.metadata config.set_main_option("sqlalchemy.url", settings.database_url)
- One migration per logical change — don't bundle unrelated schema changes
- Always review autogenerated migrations — autogenerate misses constraints, indexes, and data migrations
- Never edit applied migrations — create a new migration to fix issues
- Include downgrade logic — every
upgrade()must have a correspondingdowngrade()
Repository Pattern
Encapsulate all database queries in repository classes. Services call repositories, never raw SQL or ORM queries.
# app/users/repository.py from collections.abc import Sequence from sqlalchemy import select from sqlalchemy.ext.asyncio import AsyncSession from app.users.models import User from app.users.schemas import UserCreate class UserRepository: def __init__(self, db: AsyncSession) -> None: self.db = db async def get(self, user_id: int) -> User | None: return await self.db.get(User, user_id) async def get_by_email(self, email: str) -> User | None: result = await self.db.execute( select(User).where(User.email == email) ) return result.scalar_one_or_none() async def list( self, *, skip: int = 0, limit: int = 20, ) -> Sequence[User]: result = await self.db.execute( select(User).offset(skip).limit(limit).order_by(User.id) ) return result.scalars().all() async def create(self, data: UserCreate, hashed_pw: str) -> User: user = User( email=data.email, full_name=data.full_name, hashed_password=hashed_pw, ) self.db.add(user) await self.db.flush() return user async def delete(self, user: User) -> None: await self.db.delete(user)
Authentication & Authorization
# app/core/security.py from datetime import UTC, datetime, timedelta from jose import JWTError, jwt from passlib.context import CryptContext from app.config import settings pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto") def hash_password(password: str) -> str: return pwd_context.hash(password) def verify_password(plain: str, hashed: str) -> bool: return pwd_context.verify(plain, hashed) def create_access_token( subject: str, expires_delta: timedelta | None = None, ) -> str: expire = datetime.now(UTC) + (expires_delta or timedelta(minutes=30)) payload = {"sub": subject, "exp": expire} return jwt.encode(payload, settings.secret_key, algorithm="HS256")
# app/auth/dependencies.py from typing import Annotated from fastapi import Depends, HTTPException, status from fastapi.security import OAuth2PasswordBearer from jose import JWTError, jwt from app.config import settings from app.dependencies import DbSession from app.users.models import User oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/api/v1/auth/login") async def get_current_user( token: Annotated[str, Depends(oauth2_scheme)], db: DbSession, ) -> User: credentials_exception = HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid authentication credentials", headers={"WWW-Authenticate": "Bearer"}, ) try: payload = jwt.decode(token, settings.secret_key, algorithms=["HS256"]) user_id: str | None = payload.get("sub") if user_id is None: raise credentials_exception except JWTError: raise credentials_exception user = await db.get(User, int(user_id)) if user is None: raise credentials_exception return user # Role-based access control dependency def require_role(*roles: str): async def _check(user: Annotated[User, Depends(get_current_user)]) -> User: if user.role not in roles: raise HTTPException(status_code=403, detail="Insufficient permissions") return user return _check # Usage: AdminUser = Annotated[User, Depends(require_role("admin"))]
Security Hardening
Field() constraints.Path params typed as
int (never raw strings for IDs).Use
Query(ge=0, le=100) for bounds.
Use
bindparam for dynamic queries.
pydantic-settings with .env files.Rotate JWT keys via environment variables.
slowapi or reverse proxy (nginx/Traefik) rate limiting.Limit auth endpoints more aggressively.
# Security headers middleware @app.middleware("http") async def security_headers(request: Request, call_next): response = await call_next(request) response.headers["X-Content-Type-Options"] = "nosniff" response.headers["X-Frame-Options"] = "DENY" response.headers["X-XSS-Protection"] = "1; mode=block" response.headers["Strict-Transport-Security"] = "max-age=31536000; includeSubDomains" response.headers["Content-Security-Policy"] = "default-src 'self'" response.headers["Referrer-Policy"] = "strict-origin-when-cross-origin" return response
Secrets & Configuration
# app/config.py — pydantic-settings from pydantic_settings import BaseSettings, SettingsConfigDict class Settings(BaseSettings): model_config = SettingsConfigDict( env_file=".env", env_prefix="APP_", case_sensitive=False, ) # App app_name: str = "My API" app_version: str = "1.0.0" environment: str = "development" # development | staging | production debug: bool = False # Security secret_key: str access_token_expire_minutes: int = 30 # Database database_url: str db_echo: bool = False # CORS cors_origins: list[str] = ["http://localhost:3000"] allowed_hosts: list[str] = ["*"] settings = Settings() # type: ignore[call-arg]
# .env.example (committed) — actual .env is gitignored
APP_SECRET_KEY=change-me-in-production
APP_DATABASE_URL=postgresql+asyncpg://user:pass@localhost:5432/mydb
APP_ENVIRONMENT=development
APP_CORS_ORIGINS=["http://localhost:3000"]
.env files. Add .env to .gitignore. Commit .env.example with placeholder values. In production, inject secrets via environment variables or a vault service.Testing Strategy
Test pyramid: unit tests (services, utils) → integration tests (routes + DB) → e2e tests (full API). Target 80%+ coverage.
# tests/conftest.py import pytest from httpx import ASGITransport, AsyncClient from sqlalchemy.ext.asyncio import create_async_engine, async_sessionmaker from app.core.database import Base from app.dependencies import get_db from app.main import app TEST_DB_URL = "sqlite+aiosqlite:///./test.db" engine = create_async_engine(TEST_DB_URL) TestSession = async_sessionmaker(engine, expire_on_commit=False) @pytest.fixture(autouse=True) async def setup_db(): async with engine.begin() as conn: await conn.run_sync(Base.metadata.create_all) yield async with engine.begin() as conn: await conn.run_sync(Base.metadata.drop_all) @pytest.fixture async def db(): async with TestSession() as session: yield session @pytest.fixture async def client(db): async def _override_db(): yield db app.dependency_overrides[get_db] = _override_db transport = ASGITransport(app=app) async with AsyncClient(transport=transport, base_url="http://test") as ac: yield ac app.dependency_overrides.clear()
# tests/test_users/test_router.py import pytest from httpx import AsyncClient async def test_create_user(client: AsyncClient) -> None: response = await client.post("/api/v1/users/", json={ "email": "test@example.com", "full_name": "Test User", "password": "secure-pass-123", }) assert response.status_code == 201 data = response.json() assert data["email"] == "test@example.com" assert "password" not in data # Never in response async def test_create_user_duplicate_email(client: AsyncClient) -> None: payload = {"email": "dup@example.com", "full_name": "A", "password": "12345678"} await client.post("/api/v1/users/", json=payload) response = await client.post("/api/v1/users/", json=payload) assert response.status_code == 409 async def test_get_user_not_found(client: AsyncClient) -> None: response = await client.get("/api/v1/users/99999") assert response.status_code == 404
# Run tests uv run pytest # all tests uv run pytest tests/test_users/ -v # specific domain uv run pytest --cov=app --cov-report=term-missing
Logging & Observability
# app/core/logging.py — structured JSON logging import logging import sys import structlog from app.config import settings def setup_logging() -> None: log_level = logging.DEBUG if settings.debug else logging.INFO structlog.configure( processors=[ structlog.contextvars.merge_contextvars, structlog.stdlib.filter_by_level, structlog.stdlib.add_logger_name, structlog.stdlib.add_log_level, structlog.stdlib.PositionalArgumentsFormatter(), structlog.processors.TimeStamper(fmt="iso"), structlog.processors.StackInfoRenderer(), structlog.processors.format_exc_info, structlog.processors.UnicodeDecoder(), structlog.processors.JSONRenderer() if settings.environment == "production" else structlog.dev.ConsoleRenderer(), ], wrapper_class=structlog.stdlib.BoundLogger, context_class=dict, logger_factory=structlog.stdlib.LoggerFactory(), cache_logger_on_first_use=True, ) logging.basicConfig( format="%(message)s", stream=sys.stdout, level=log_level, )
- Use structlog for structured, JSON-format logs in production
- Bind request context — request_id, user_id, path — to every log line
- Never log secrets — passwords, tokens, API keys
- Log at boundaries — on request entry, service calls, external API calls, errors
- Use log levels correctly — DEBUG for dev, INFO for operations, WARNING for recoverable issues, ERROR for failures
Performance & Caching
pool_size=20, max_overflow=10.Use
pool_pre_ping=True for stale connection detection.
fastapi-cache2 for endpoint caching.Cache-Control headers for static/semi-static data.
limit=20, max limit=100.Use cursor-based pagination for large datasets.
BackgroundTasks for non-critical work (emails, logs).Use Celery/ARQ for heavy async processing.
# Background tasks example from fastapi import BackgroundTasks @router.post("/", status_code=201) async def create_user( data: UserCreate, background_tasks: BackgroundTasks, service: Annotated[UserService, Depends()], ) -> UserResponse: user = await service.create(data) background_tasks.add_task(send_welcome_email, user.email, user.full_name) return UserResponse.model_validate(user)
# N+1 query prevention — use joinedload / selectinload from sqlalchemy.orm import selectinload stmt = ( select(Order) .options(selectinload(Order.items)) .where(Order.user_id == user_id) .offset(skip).limit(limit) ) result = await db.execute(stmt)
Deployment & Docker
# Dockerfile — multi-stage, non-root user FROM python:3.12-slim AS base ENV PYTHONDONTWRITEBYTECODE=1 \ PYTHONUNBUFFERED=1 FROM base AS builder WORKDIR /build COPY pyproject.toml . RUN pip install --no-cache-dir uv && \ uv pip install --system --no-cache . FROM base AS runtime RUN groupadd -r appuser && useradd -r -g appuser appuser WORKDIR /app COPY --from=builder /usr/local/lib/python3.12/site-packages /usr/local/lib/python3.12/site-packages COPY --from=builder /usr/local/bin /usr/local/bin COPY app/ app/ COPY alembic/ alembic/ COPY alembic.ini . USER appuser EXPOSE 8000 CMD ["uvicorn", "app.main:app", "--host", "0.0.0.0", "--port", "8000", "--workers", "4"]
# docker-compose.yml services: api: build: . ports: - "8000:8000" env_file: .env depends_on: db: condition: service_healthy db: image: postgres:16-alpine environment: POSTGRES_USER: appuser POSTGRES_PASSWORD: ${DB_PASSWORD} POSTGRES_DB: mydb volumes: - pgdata:/var/lib/postgresql/data healthcheck: test: ["CMD-SHELL", "pg_isready -U appuser"] interval: 5s retries: 5 volumes: pgdata:
# Production launch command
uvicorn app.main:app \
--host 0.0.0.0 \
--port 8000 \
--workers 4 \
--loop uvloop \
--http httptools \
--access-log \
--proxy-headers \
--forwarded-allow-ips="*"
alembic upgrade head && uvicorn ...Reference Links
Official Documentation
- FastAPI fastapi.tiangolo.com
- Pydantic v2 docs.pydantic.dev
- SQLAlchemy 2.0 docs.sqlalchemy.org
- Uvicorn uvicorn.org
- Alembic alembic.sqlalchemy.org
- Ruff docs.astral.sh/ruff
- uv docs.astral.sh/uv
- mypy mypy.readthedocs.io
Style Guides & Best Practices
- PEP 8 peps.python.org/pep-0008 — Style Guide
- PEP 484 peps.python.org/pep-0484 — Type Hints
- PEP 695 peps.python.org/pep-0695 — Type Aliases (3.12)
- Patterns FastAPI — Bigger Applications
- Security FastAPI — Security Tutorial
- Async FastAPI — Async / Await
Recommended Libraries
- HTTP client httpx — async HTTP client
- Logging structlog — structured logging
- Caching fastapi-cache2 — response caching
- Rate limit slowapi — rate limiting
- Tasks arq — async task queue
- Testing pytest-asyncio — async test support