
Python & FastAPI Coding Standards Handbook

Production-grade conventions for building scalable, type-safe, and maintainable APIs with Python 3.12+ and FastAPI 0.115+.

Python 3.12+ · FastAPI 0.115+ · Pydantic v2 · SQLAlchemy 2.0 · March 2026

Project Structure

Use a domain-driven flat structure for small-to-medium APIs, or a modular package structure for large services. Always separate concerns: routes, schemas, services, and data access.

# Recommended structure — modular domain layout
project-root/
├── app/
│   ├── __init__.py
│   ├── main.py               # App factory, lifespan
│   ├── config.py              # Settings via pydantic-settings
│   ├── dependencies.py        # Shared DI providers
│   ├── exceptions.py          # Custom exception classes
│   ├── middleware.py           # CORS, logging, rate limiting
│   │
│   ├── auth/
│   │   ├── __init__.py
│   │   ├── router.py          # POST /auth/login, /auth/register
│   │   ├── schemas.py         # LoginRequest, TokenResponse
│   │   ├── service.py         # Business logic
│   │   ├── dependencies.py    # get_current_user
│   │   └── constants.py
│   │
│   ├── users/
│   │   ├── __init__.py
│   │   ├── router.py
│   │   ├── schemas.py
│   │   ├── service.py
│   │   ├── repository.py      # Data access layer
│   │   └── models.py          # SQLAlchemy ORM models
│   │
│   ├── orders/
│   │   ├── ...                # Same pattern per domain
│   │
│   └── core/
│       ├── database.py        # Engine, session factory
│       ├── security.py        # JWT, hashing utilities
│       └── pagination.py      # Shared pagination logic
│
├── alembic/
│   ├── env.py
│   └── versions/
├── tests/
│   ├── conftest.py
│   ├── test_auth/
│   ├── test_users/
│   └── test_orders/
├── alembic.ini
├── pyproject.toml
├── Dockerfile
├── docker-compose.yml
└── .env.example
💡
One domain = one package. Each domain module owns its router, schemas, service, repository, and models. Cross-domain calls go through service interfaces, never direct model imports.

Tooling & Environment

Tool | Purpose | Config
uv | Package manager & virtual env (replaces pip/venv) | pyproject.toml
Ruff | Linter + formatter (replaces flake8, black, isort) | [tool.ruff] in pyproject.toml
mypy | Static type checker (strict mode) | [tool.mypy] in pyproject.toml
pytest | Test runner + async support | [tool.pytest.ini_options]
pre-commit | Git hooks for ruff, mypy, tests | .pre-commit-config.yaml
Alembic | Database migrations | alembic.ini
# pyproject.toml — essential config
[project]
name = "my-api"
version = "1.0.0"
requires-python = ">=3.12"
dependencies = [
    "fastapi>=0.115",
    "uvicorn[standard]>=0.32",
    "pydantic>=2.10",
    "pydantic-settings>=2.6",
    "sqlalchemy>=2.0",
    "alembic>=1.14",
    "asyncpg>=0.30",
    "python-jose[cryptography]>=3.3",
    "passlib[bcrypt]>=1.7",
    "httpx>=0.28",
]

[dependency-groups]
dev = [
    "pytest>=8.3",
    "pytest-asyncio>=0.24",
    "pytest-cov>=6.0",
    "httpx>=0.28",
    "mypy>=1.13",
    "ruff>=0.8",
    "pre-commit>=4.0",
]

[tool.ruff]
target-version = "py312"
line-length = 99

[tool.ruff.lint]
select = ["E", "F", "W", "I", "N", "UP", "S", "B", "A", "T20", "RUF"]
ignore = ["E501"]  # line length handled by formatter

[tool.ruff.lint.isort]
known-first-party = ["app"]

[tool.mypy]
python_version = "3.12"
strict = true
plugins = ["pydantic.mypy"]

[tool.pytest.ini_options]
asyncio_mode = "auto"
testpaths = ["tests"]
addopts = "--strict-markers --tb=short -q"
# Quick start with uv
uv init my-api
cd my-api
uv add fastapi "uvicorn[standard]" pydantic-settings sqlalchemy asyncpg  # quotes keep zsh from globbing the brackets
uv add --group dev pytest pytest-asyncio ruff mypy
uv run uvicorn app.main:app --reload

Type Hints & Annotations

Type hints are mandatory on all function signatures, class attributes, and module-level variables. Use modern Python 3.12+ syntax — no typing.Optional or typing.List.

Rule: Use Built-in Generics Python 3.12+

Use list[str] not List[str], dict[str, int] not Dict[str, int], str | None not Optional[str].

# ✅ Correct — Python 3.12+ style
def get_users(
    skip: int = 0,
    limit: int = 100,
    active_only: bool = True,
) -> list[UserResponse]:
    ...

def find_order(order_id: int) -> Order | None:
    ...

# Use type aliases for complex types
type UserMap = dict[str, list[UserResponse]]
type ErrorDetail = dict[str, str | list[str]]

# ❌ Wrong — legacy typing imports
from typing import Optional, List, Dict
def get_users() -> List[UserResponse]:  # Don't do this
    ...
# Annotated for FastAPI dependency injection + validation
from typing import Annotated
from fastapi import Depends, Query

CurrentUser = Annotated[User, Depends(get_current_user)]
PaginationSkip = Annotated[int, Query(ge=0, le=10000)]
PaginationLimit = Annotated[int, Query(ge=1, le=100)]

@router.get("/")
async def list_users(
    user: CurrentUser,
    skip: PaginationSkip = 0,
    limit: PaginationLimit = 20,
) -> PaginatedResponse[UserResponse]:
    ...

Naming Conventions

Element | Convention | Example
Modules | snake_case, short, singular | user_service.py, auth.py
Packages | snake_case, plural for domains | users/, orders/
Classes | PascalCase | UserService, OrderRepository
Functions / Methods | snake_case, verb-first | get_user_by_id(), create_order()
Constants | UPPER_SNAKE_CASE | MAX_RETRIES, DEFAULT_PAGE_SIZE
Pydantic Schemas | PascalCase + intent suffix | UserCreate, UserResponse, UserUpdate
SQLAlchemy Models | PascalCase, singular noun | User, Order, OrderItem
Route paths | kebab-case, plural nouns | /api/v1/order-items
Query params | snake_case | ?page_size=20&sort_by=name
Env variables | UPPER_SNAKE_CASE, prefixed | APP_DATABASE_URL, APP_SECRET_KEY
Private attrs | Leading underscore | _cache, _validate_input()
Avoid abbreviations except universally understood ones (id, url, db, api). Write get_authenticated_user not get_auth_usr.

Imports & Modules

# Import order: stdlib → third-party → local (Ruff isort handles this)
from collections.abc import Sequence    # stdlib
from datetime import UTC, datetime

from fastapi import APIRouter, Depends   # third-party
from sqlalchemy.ext.asyncio import AsyncSession

from app.core.database import get_db    # local
from app.users.schemas import UserCreate, UserResponse
from app.users.service import UserService

Error Handling

Use custom exceptions that map to HTTP responses. Never return error dicts manually — let exception handlers do the work.

# app/exceptions.py
from fastapi import HTTPException, status


class AppException(HTTPException):
    """Base exception for all application errors."""
    def __init__(self, detail: str, status_code: int = status.HTTP_400_BAD_REQUEST) -> None:
        super().__init__(status_code=status_code, detail=detail)


class NotFoundError(AppException):
    def __init__(self, resource: str, identifier: str | int) -> None:
        super().__init__(
            detail=f"{resource} with id '{identifier}' not found",
            status_code=status.HTTP_404_NOT_FOUND,
        )


class ConflictError(AppException):
    def __init__(self, detail: str) -> None:
        super().__init__(detail=detail, status_code=status.HTTP_409_CONFLICT)


class ForbiddenError(AppException):
    def __init__(self, detail: str = "Insufficient permissions") -> None:
        super().__init__(detail=detail, status_code=status.HTTP_403_FORBIDDEN)
# Usage in service layer: raise, don't return
from app.core.security import hash_password


class UserService:
    async def get_by_id(self, user_id: int) -> User:
        user = await self.repo.get(user_id)
        if not user:
            raise NotFoundError("User", user_id)
        return user

    async def create(self, data: UserCreate) -> User:
        existing = await self.repo.get_by_email(data.email)
        if existing:
            raise ConflictError(f"Email '{data.email}' already registered")
        # The repository stores only the hash, never the plain password
        return await self.repo.create(data, hashed_pw=hash_password(data.password))
# Global exception handler for unhandled errors
# app/main.py
import logging

from fastapi import Request
from fastapi.responses import JSONResponse

logger = logging.getLogger(__name__)

@app.exception_handler(Exception)
async def unhandled_exception_handler(request: Request, exc: Exception) -> JSONResponse:
    logger.exception("Unhandled error on %s %s", request.method, request.url.path)
    return JSONResponse(
        status_code=500,
        content={"detail": "Internal server error"},
    )
🚨
Never expose stack traces to clients. The global handler logs the full exception internally but returns a generic message. Detailed tracebacks go to your logging system, not API responses.

Async / Await Patterns

FastAPI is async-first. Use async def for route handlers that await I/O (database, HTTP calls). Use plain def only for handlers that must call blocking or CPU-bound code; FastAPI runs those in a thread pool so the event loop stays free.

# ✅ Async route — I/O bound (DB, HTTP calls)
@router.get("/{user_id}")
async def get_user(user_id: int, db: AsyncSession = Depends(get_db)) -> UserResponse:
    result = await db.execute(select(User).where(User.id == user_id))
    user = result.scalar_one_or_none()
    if not user:
        raise NotFoundError("User", user_id)
    return UserResponse.model_validate(user)

# ✅ Sync route — CPU bound (FastAPI auto-threadpools this)
@router.post("/hash")
def compute_hash(payload: HashRequest) -> HashResponse:
    result = expensive_cpu_computation(payload.data)
    return HashResponse(hash=result)

# ✅ Running sync code inside async context
import anyio

async def process_upload(file_data: bytes) -> str:
    # Offload CPU-bound work to thread pool
    result = await anyio.to_thread.run_sync(parse_csv, file_data)
    return result
Never use blocking I/O inside async def. Calling time.sleep(), synchronous requests.get(), or blocking file reads inside an async handler will block the event loop. Use httpx.AsyncClient, aiofiles, or anyio.to_thread.run_sync().
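The effect of a blocking call on the event loop can be observed directly. The sketch below is a minimal illustration that uses the standard library's `asyncio.to_thread` in place of `anyio.to_thread.run_sync`; the `heartbeat` and `count_ticks` helpers are hypothetical names introduced for the demonstration.

```python
import asyncio
import time


async def heartbeat(ticks: list[float]) -> None:
    """Record a timestamp every 10 ms for as long as the loop is responsive."""
    while True:
        await asyncio.sleep(0.01)
        ticks.append(time.perf_counter())


async def count_ticks(offload: bool) -> int:
    ticks: list[float] = []
    task = asyncio.create_task(heartbeat(ticks))
    await asyncio.sleep(0)  # give the heartbeat task a chance to start
    if offload:
        await asyncio.to_thread(time.sleep, 0.2)  # loop stays free
    else:
        time.sleep(0.2)  # blocks the loop; the heartbeat cannot run
    task.cancel()
    return len(ticks)


async def main() -> tuple[int, int]:
    blocked = await count_ticks(offload=False)
    offloaded = await count_ticks(offload=True)
    return blocked, offloaded


blocked_ticks, offloaded_ticks = asyncio.run(main())
print(f"blocking call: {blocked_ticks} ticks, offloaded call: {offloaded_ticks} ticks")
```

While the blocking call runs, the heartbeat records nothing, which is what every other request on the same worker would experience.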
# ✅ Parallel async operations
import asyncio

async def get_dashboard_data(user_id: int) -> DashboardResponse:
    profile, orders, notifications = await asyncio.gather(
        user_service.get_profile(user_id),
        order_service.get_recent(user_id, limit=5),
        notification_service.get_unread(user_id),
    )
    return DashboardResponse(
        profile=profile, orders=orders, notifications=notifications,
    )

App Factory & Lifespan

Use the app factory pattern with lifespan context manager for startup/shutdown. Never use deprecated @app.on_event.

# app/main.py
import logging
from collections.abc import AsyncIterator
from contextlib import asynccontextmanager

from fastapi import FastAPI

from app.auth.router import router as auth_router
from app.config import settings
from app.core.database import engine
from app.middleware import setup_middleware
from app.users.router import router as users_router

logger = logging.getLogger(__name__)


@asynccontextmanager
async def lifespan(app: FastAPI) -> AsyncIterator[None]:
    # ── Startup ──
    logger.info("Starting %s in %s mode", settings.app_name, settings.environment)
    # Initialize connection pools, caches, ML models, etc.
    yield
    # ── Shutdown ──
    await engine.dispose()
    logger.info("Shutdown complete")


def create_app() -> FastAPI:
    app = FastAPI(
        title=settings.app_name,
        version=settings.app_version,
        lifespan=lifespan,
        docs_url="/docs" if settings.environment != "production" else None,
        redoc_url=None,
    )

    setup_middleware(app)

    app.include_router(auth_router, prefix="/api/v1/auth", tags=["auth"])
    app.include_router(users_router, prefix="/api/v1/users", tags=["users"])

    return app


app = create_app()
Disable docs in production. Set docs_url=None and redoc_url=None to turn off the interactive docs pages. The raw schema is still served at /openapi.json unless you also pass openapi_url=None.
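The ordering that the lifespan function relies on can be seen without FastAPI at all. The sketch below is a simplified stand-in: `serve` plays the role of the ASGI server, `events` is a hypothetical log, and the try/finally is an addition that the handbook's version does not use.

```python
import asyncio
from collections.abc import AsyncIterator
from contextlib import asynccontextmanager

events: list[str] = []


@asynccontextmanager
async def lifespan(app: object) -> AsyncIterator[None]:
    events.append("startup")  # runs once, before any request is served
    try:
        yield
    finally:
        events.append("shutdown")  # runs once, even if serving raised


async def serve() -> None:
    # Stand-in for the server: enter lifespan, handle traffic, exit
    async with lifespan(app=None):
        events.append("handling requests")


asyncio.run(serve())
print(events)
```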

Router Organization

Each domain module has a single router.py. Use APIRouter with consistent patterns. Group by resource, version with URL prefix.

# app/users/router.py
from typing import Annotated

from fastapi import APIRouter, Depends, status

from app.auth.dependencies import get_current_user
from app.users.models import User
from app.users.schemas import UserCreate, UserResponse, UserUpdate
from app.users.service import UserService

router = APIRouter()
CurrentUser = Annotated[User, Depends(get_current_user)]


@router.get("/", response_model=list[UserResponse])
async def list_users(
    service: Annotated[UserService, Depends()],
    skip: int = 0,
    limit: int = 20,
) -> list[UserResponse]:
    return await service.list(skip=skip, limit=limit)


@router.post("/", status_code=status.HTTP_201_CREATED, response_model=UserResponse)
async def create_user(
    data: UserCreate,
    service: Annotated[UserService, Depends()],
) -> UserResponse:
    return await service.create(data)


@router.get("/{user_id}", response_model=UserResponse)
async def get_user(
    user_id: int,
    service: Annotated[UserService, Depends()],
) -> UserResponse:
    return await service.get_by_id(user_id)


@router.patch("/{user_id}", response_model=UserResponse)
async def update_user(
    user_id: int,
    data: UserUpdate,
    current_user: CurrentUser,
    service: Annotated[UserService, Depends()],
) -> UserResponse:
    return await service.update(user_id, data, actor=current_user)


@router.delete("/{user_id}", status_code=status.HTTP_204_NO_CONTENT)
async def delete_user(
    user_id: int,
    current_user: CurrentUser,
    service: Annotated[UserService, Depends()],
) -> None:
    await service.delete(user_id, actor=current_user)
💡
Return type hints match response_model. Always add both response_model on the decorator and a return type annotation on the function. They should agree. FastAPI uses response_model for serialization filtering.
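The filtering that a response schema provides can be sketched with Pydantic alone. In this example `UserRow` is a hypothetical stand-in for the ORM model and `UserOut` for the response schema; it assumes Pydantic v2 is installed.

```python
from dataclasses import dataclass

from pydantic import BaseModel, ConfigDict


@dataclass
class UserRow:
    """Hypothetical stand-in for the SQLAlchemy User model."""

    id: int
    email: str
    hashed_password: str


class UserOut(BaseModel):
    model_config = ConfigDict(from_attributes=True)

    id: int
    email: str


row = UserRow(id=1, email="a@example.com", hashed_password="not-a-real-hash")
# Only fields declared on the schema survive validation and serialization
payload = UserOut.model_validate(row).model_dump()
print(payload)
```

Because `hashed_password` is not declared on the output schema, it cannot leak into the response even though the source object carries it.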

Pydantic Schemas (v2)

Schemas define API contracts. Separate schemas for input (Create/Update) and output (Response). Never expose ORM models directly.

# app/users/schemas.py
from datetime import datetime
from pydantic import BaseModel, ConfigDict, EmailStr, Field


# ── Base (shared fields) ──
class UserBase(BaseModel):
    email: EmailStr
    full_name: str = Field(min_length=1, max_length=255)


# ── Input schemas ──
class UserCreate(UserBase):
    password: str = Field(min_length=8, max_length=128)


class UserUpdate(BaseModel):
    # All fields optional for PATCH
    full_name: str | None = Field(None, min_length=1, max_length=255)
    email: EmailStr | None = None


# ── Output schemas ──
class UserResponse(UserBase):
    model_config = ConfigDict(from_attributes=True)

    id: int
    is_active: bool
    created_at: datetime


# ── Generic paginated wrapper ──
from typing import Generic, TypeVar
from pydantic import BaseModel

T = TypeVar("T")

class PaginatedResponse(BaseModel, Generic[T]):
    items: list[T]
    total: int
    page: int
    page_size: int
    has_next: bool
Do
Use ConfigDict(from_attributes=True) for ORM integration.
Use Field() for validation constraints.
Use EmailStr for email validation.
Set model_config = ConfigDict(strict=True) to disable implicit type coercion (for example, the string "123" is then rejected for an int field).
Don't
Don't use class Config: (Pydantic v1 syntax).
Don't expose password fields in response schemas.
Don't use orm_mode = True (v1 syntax).
Don't mix input and output in one schema.

Dependencies & Dependency Injection

FastAPI's DI system is your primary composition tool. Use it for database sessions, auth, services, and cross-cutting concerns.

# app/dependencies.py — shared dependencies
from collections.abc import AsyncIterator
from typing import Annotated

from fastapi import Depends
from sqlalchemy.ext.asyncio import AsyncSession

from app.core.database import async_session_factory


async def get_db() -> AsyncIterator[AsyncSession]:
    async with async_session_factory() as session:
        try:
            yield session
            await session.commit()
        except Exception:
            await session.rollback()
            raise


# Reusable annotated types
DbSession = Annotated[AsyncSession, Depends(get_db)]
# app/users/service.py — service with injected dependencies
from app.dependencies import DbSession
from app.exceptions import NotFoundError
from app.users.models import User
from app.users.repository import UserRepository


class UserService:
    def __init__(self, db: DbSession) -> None:
        self.repo = UserRepository(db)

    async def get_by_id(self, user_id: int) -> User:
        user = await self.repo.get(user_id)
        if not user:
            raise NotFoundError("User", user_id)
        return user
Request lifecycle: request arrives → FastAPI resolves Depends() → DB session created → service instantiated → handler runs → session commits (or rolls back on error)
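The commit-or-rollback step of this lifecycle can be exercised without a database. The sketch below is only an approximation of how a yield dependency is driven: `FakeSession` and `run_request` are hypothetical, and `contextlib.asynccontextmanager` stands in for FastAPI's own, more involved resolution machinery.

```python
import asyncio
from collections.abc import AsyncIterator
from contextlib import asynccontextmanager


class FakeSession:
    """Hypothetical stand-in for AsyncSession that records calls."""

    def __init__(self) -> None:
        self.calls: list[str] = []

    async def commit(self) -> None:
        self.calls.append("commit")

    async def rollback(self) -> None:
        self.calls.append("rollback")


async def get_db(session: FakeSession) -> AsyncIterator[FakeSession]:
    # Same try/yield/commit/rollback shape as the get_db dependency
    try:
        yield session
        await session.commit()
    except Exception:
        await session.rollback()
        raise


async def run_request(fail: bool) -> list[str]:
    session = FakeSession()
    try:
        # Enter the dependency, run the "handler", then exit the dependency
        async with asynccontextmanager(get_db)(session):
            if fail:
                raise ValueError("handler failed")
    except ValueError:
        pass  # the error still propagates after the rollback
    return session.calls


ok_calls = asyncio.run(run_request(fail=False))
failed_calls = asyncio.run(run_request(fail=True))
print(ok_calls, failed_calls)
```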

Middleware & CORS

# app/middleware.py
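import logging
import time
import uuid

from fastapi import FastAPI, Request
from fastapi.middleware.cors import CORSMiddleware
from starlette.middleware.trustedhost import TrustedHostMiddleware

from app.config import settings

logger = logging.getLogger(__name__)


def setup_middleware(app: FastAPI) -> None:
    # Order matters: each add_middleware() call wraps the ones before it,
    # so the middleware registered last is the outermost layer.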

    app.add_middleware(
        TrustedHostMiddleware,
        allowed_hosts=settings.allowed_hosts,
    )

    app.add_middleware(
        CORSMiddleware,
        allow_origins=settings.cors_origins,   # Never ["*"] in production
        allow_credentials=True,
        allow_methods=["GET", "POST", "PUT", "PATCH", "DELETE"],
        allow_headers=["Authorization", "Content-Type"],
    )

    @app.middleware("http")
    async def request_context_middleware(request: Request, call_next):
        request_id = uuid.uuid4().hex[:12]
        start = time.perf_counter()

        response = await call_next(request)

        duration_ms = (time.perf_counter() - start) * 1000
        response.headers["X-Request-ID"] = request_id
        response.headers["X-Process-Time-Ms"] = f"{duration_ms:.1f}"

        logger.info(
            "%s %s %s %.1fms",
            request.method, request.url.path,
            response.status_code, duration_ms,
            extra={"request_id": request_id},
        )
        return response
🚨
Never use allow_origins=["*"] with allow_credentials=True. This is a security vulnerability. Always explicitly list allowed origins in production.

Database & SQLAlchemy 2.0

Use SQLAlchemy 2.0 with async engine and the new Mapped type annotations. All DB access is async.

# app/core/database.py
from sqlalchemy.ext.asyncio import (
    AsyncSession,
    async_sessionmaker,
    create_async_engine,
)
from sqlalchemy.orm import DeclarativeBase

from app.config import settings

engine = create_async_engine(
    settings.database_url,
    echo=settings.db_echo,
    pool_size=20,
    max_overflow=10,
    pool_pre_ping=True,
)

async_session_factory = async_sessionmaker(
    engine, class_=AsyncSession, expire_on_commit=False,
)


class Base(DeclarativeBase):
    pass
# app/users/models.py — SQLAlchemy 2.0 Mapped style
from datetime import datetime
from sqlalchemy import String, func
from sqlalchemy.orm import Mapped, mapped_column

from app.core.database import Base


class User(Base):
    __tablename__ = "users"

    id: Mapped[int] = mapped_column(primary_key=True)
    email: Mapped[str] = mapped_column(
        String(255), unique=True, index=True,
    )
    full_name: Mapped[str] = mapped_column(String(255))
    hashed_password: Mapped[str] = mapped_column(String(255))
    is_active: Mapped[bool] = mapped_column(default=True)
    created_at: Mapped[datetime] = mapped_column(
        server_default=func.now(),
    )
    updated_at: Mapped[datetime | None] = mapped_column(
        onupdate=func.now(),
    )

Alembic Migrations

# Initialize Alembic with async template
alembic init -t async alembic

# Generate migration from model changes
alembic revision --autogenerate -m "add users table"

# Apply migrations
alembic upgrade head

# Rollback one step
alembic downgrade -1
# alembic/env.py (async setup)
from alembic import context

from app.config import settings
from app.core.database import Base

# Import all models so autogenerate detects them
from app.users.models import User  # noqa: F401
from app.orders.models import Order  # noqa: F401

config = context.config  # already present in the generated env.py
target_metadata = Base.metadata
config.set_main_option("sqlalchemy.url", settings.database_url)

Repository Pattern

Encapsulate all database queries in repository classes. Services call repositories, never raw SQL or ORM queries.

# app/users/repository.py
from collections.abc import Sequence

from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession

from app.users.models import User
from app.users.schemas import UserCreate


class UserRepository:
    def __init__(self, db: AsyncSession) -> None:
        self.db = db

    async def get(self, user_id: int) -> User | None:
        return await self.db.get(User, user_id)

    async def get_by_email(self, email: str) -> User | None:
        result = await self.db.execute(
            select(User).where(User.email == email)
        )
        return result.scalar_one_or_none()

    async def list(
        self, *, skip: int = 0, limit: int = 20,
    ) -> Sequence[User]:
        result = await self.db.execute(
            select(User).offset(skip).limit(limit).order_by(User.id)
        )
        return result.scalars().all()

    async def create(self, data: UserCreate, hashed_pw: str) -> User:
        user = User(
            email=data.email,
            full_name=data.full_name,
            hashed_password=hashed_pw,
        )
        self.db.add(user)
        await self.db.flush()
        return user

    async def delete(self, user: User) -> None:
        await self.db.delete(user)

Authentication & Authorization

# app/core/security.py
from datetime import UTC, datetime, timedelta

from jose import jwt
from passlib.context import CryptContext

from app.config import settings

pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")


def hash_password(password: str) -> str:
    return pwd_context.hash(password)


def verify_password(plain: str, hashed: str) -> bool:
    return pwd_context.verify(plain, hashed)


def create_access_token(
    subject: str,
    expires_delta: timedelta | None = None,
) -> str:
    expire = datetime.now(UTC) + (expires_delta or timedelta(minutes=30))
    payload = {"sub": subject, "exp": expire}
    return jwt.encode(payload, settings.secret_key, algorithm="HS256")
# app/auth/dependencies.py
from typing import Annotated

from fastapi import Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer
from jose import JWTError, jwt

from app.config import settings
from app.dependencies import DbSession
from app.users.models import User

oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/api/v1/auth/login")


async def get_current_user(
    token: Annotated[str, Depends(oauth2_scheme)],
    db: DbSession,
) -> User:
    credentials_exception = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Invalid authentication credentials",
        headers={"WWW-Authenticate": "Bearer"},
    )
    try:
        payload = jwt.decode(token, settings.secret_key, algorithms=["HS256"])
        subject: str | None = payload.get("sub")
        if subject is None:
            raise credentials_exception
        user_id = int(subject)
    except (JWTError, ValueError):
        # ValueError covers a "sub" claim that is not a numeric id
        raise credentials_exception from None

    user = await db.get(User, user_id)
    if user is None:
        raise credentials_exception
    return user


# Role-based access control dependency
# Assumes the User model has a `role` column, which the model shown earlier does not define
def require_role(*roles: str):
    async def _check(user: Annotated[User, Depends(get_current_user)]) -> User:
        if user.role not in roles:
            raise HTTPException(status_code=403, detail="Insufficient permissions")
        return user
    return _check


# Usage: AdminUser = Annotated[User, Depends(require_role("admin"))]

Security Hardening

🔒
Input Validation
All inputs validated through Pydantic schemas with Field() constraints.
Path params typed as int (never raw strings for IDs).
Use Query(ge=0, le=100) for bounds.
🛡️
SQL Injection Prevention
Always use SQLAlchemy ORM / Core queries — never raw string interpolation.
Use bindparam for dynamic queries.
🔑
Secrets
No secrets in code. Use pydantic-settings with .env files.
Rotate JWT keys via environment variables.
🚫
Rate Limiting
Use slowapi or reverse proxy (nginx/Traefik) rate limiting.
Limit auth endpoints more aggressively.
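To make the SQL injection rule concrete, the sketch below contrasts string interpolation with a bound parameter. It deliberately swaps in the standard library's `sqlite3` driver and its `?` placeholders rather than SQLAlchemy's `bindparam`, so it runs without a database server; the table and values are made up for the demonstration.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE users (id INTEGER PRIMARY KEY, email TEXT)")
conn.executemany(
    "INSERT INTO users (email) VALUES (?)",
    [("a@example.com",), ("b@example.com",)],
)

malicious = "nobody@example.com' OR '1'='1"

# Unsafe: the input becomes part of the SQL text and rewrites the WHERE clause
unsafe_sql = f"SELECT email FROM users WHERE email = '{malicious}'"
unsafe_rows = conn.execute(unsafe_sql).fetchall()

# Safe: the value is bound separately, so it is compared as a plain literal
safe_rows = conn.execute(
    "SELECT email FROM users WHERE email = ?", (malicious,)
).fetchall()

print(len(unsafe_rows), len(safe_rows))
```

The interpolated query returns every row in the table, while the bound version matches nothing.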
# Security headers middleware
from fastapi import Request


@app.middleware("http")
async def security_headers(request: Request, call_next):
    response = await call_next(request)
    response.headers["X-Content-Type-Options"] = "nosniff"
    response.headers["X-Frame-Options"] = "DENY"
    # The legacy XSS auditor is gone from current browsers; "0" disables it where it still exists
    response.headers["X-XSS-Protection"] = "0"
    response.headers["Strict-Transport-Security"] = "max-age=31536000; includeSubDomains"
    # A strict CSP like this blocks the CDN assets used by the default /docs page
    response.headers["Content-Security-Policy"] = "default-src 'self'"
    response.headers["Referrer-Policy"] = "strict-origin-when-cross-origin"
    return response

Secrets & Configuration

# app/config.py — pydantic-settings
from pydantic_settings import BaseSettings, SettingsConfigDict


class Settings(BaseSettings):
    model_config = SettingsConfigDict(
        env_file=".env",
        env_prefix="APP_",
        case_sensitive=False,
    )

    # App
    app_name: str = "My API"
    app_version: str = "1.0.0"
    environment: str = "development"  # development | staging | production
    debug: bool = False

    # Security
    secret_key: str
    access_token_expire_minutes: int = 30

    # Database
    database_url: str
    db_echo: bool = False

    # CORS
    cors_origins: list[str] = ["http://localhost:3000"]
    allowed_hosts: list[str] = ["*"]


settings = Settings()  # type: ignore[call-arg]
# .env.example (committed) — actual .env is gitignored
APP_SECRET_KEY=change-me-in-production
APP_DATABASE_URL=postgresql+asyncpg://user:pass@localhost:5432/mydb
APP_ENVIRONMENT=development
APP_CORS_ORIGINS=["http://localhost:3000"]
🚨
Never commit .env files. Add .env to .gitignore. Commit .env.example with placeholder values. In production, inject secrets via environment variables or a vault service.

Testing Strategy

Test pyramid: unit tests (services, utils) → integration tests (routes + DB) → e2e tests (full API). Target 80%+ coverage.

# tests/conftest.py
import pytest
from httpx import ASGITransport, AsyncClient
from sqlalchemy.ext.asyncio import create_async_engine, async_sessionmaker

from app.core.database import Base
from app.dependencies import get_db
from app.main import app

TEST_DB_URL = "sqlite+aiosqlite:///./test.db"  # requires the aiosqlite driver in the dev group

engine = create_async_engine(TEST_DB_URL)
TestSession = async_sessionmaker(engine, expire_on_commit=False)


@pytest.fixture(autouse=True)
async def setup_db():
    async with engine.begin() as conn:
        await conn.run_sync(Base.metadata.create_all)
    yield
    async with engine.begin() as conn:
        await conn.run_sync(Base.metadata.drop_all)


@pytest.fixture
async def db():
    async with TestSession() as session:
        yield session


@pytest.fixture
async def client(db):
    async def _override_db():
        yield db

    app.dependency_overrides[get_db] = _override_db
    transport = ASGITransport(app=app)
    async with AsyncClient(transport=transport, base_url="http://test") as ac:
        yield ac
    app.dependency_overrides.clear()
# tests/test_users/test_router.py
import pytest
from httpx import AsyncClient


async def test_create_user(client: AsyncClient) -> None:
    response = await client.post("/api/v1/users/", json={
        "email": "test@example.com",
        "full_name": "Test User",
        "password": "secure-pass-123",
    })
    assert response.status_code == 201
    data = response.json()
    assert data["email"] == "test@example.com"
    assert "password" not in data  # Never in response


async def test_create_user_duplicate_email(client: AsyncClient) -> None:
    payload = {"email": "dup@example.com", "full_name": "A", "password": "12345678"}
    await client.post("/api/v1/users/", json=payload)
    response = await client.post("/api/v1/users/", json=payload)
    assert response.status_code == 409


async def test_get_user_not_found(client: AsyncClient) -> None:
    response = await client.get("/api/v1/users/99999")
    assert response.status_code == 404
# Run tests
uv run pytest                        # all tests
uv run pytest tests/test_users/ -v   # specific domain
uv run pytest --cov=app --cov-report=term-missing

Logging & Observability

# app/core/logging.py — structured JSON logging
import logging
import sys

import structlog

from app.config import settings


def setup_logging() -> None:
    log_level = logging.DEBUG if settings.debug else logging.INFO

    structlog.configure(
        processors=[
            structlog.contextvars.merge_contextvars,
            structlog.stdlib.filter_by_level,
            structlog.stdlib.add_logger_name,
            structlog.stdlib.add_log_level,
            structlog.stdlib.PositionalArgumentsFormatter(),
            structlog.processors.TimeStamper(fmt="iso"),
            structlog.processors.StackInfoRenderer(),
            structlog.processors.format_exc_info,
            structlog.processors.UnicodeDecoder(),
            structlog.processors.JSONRenderer() if settings.environment == "production"
            else structlog.dev.ConsoleRenderer(),
        ],
        wrapper_class=structlog.stdlib.BoundLogger,
        context_class=dict,
        logger_factory=structlog.stdlib.LoggerFactory(),
        cache_logger_on_first_use=True,
    )

    logging.basicConfig(
        format="%(message)s",
        stream=sys.stdout,
        level=log_level,
    )

Performance & Caching

Connection Pooling
SQLAlchemy async engine with pool_size=20, max_overflow=10.
Use pool_pre_ping=True for stale connection detection.
Response Caching
Use Redis with fastapi-cache2 for endpoint caching.
Cache-Control headers for static/semi-static data.
Pagination
Always paginate list endpoints. Default limit=20, max limit=100.
Use cursor-based pagination for large datasets.
Background Tasks
Use BackgroundTasks for non-critical work (emails, logs).
Use Celery/ARQ for heavy async processing.
# Background tasks example
from typing import Annotated

from fastapi import BackgroundTasks, Depends

@router.post("/", status_code=201)
async def create_user(
    data: UserCreate,
    background_tasks: BackgroundTasks,
    service: Annotated[UserService, Depends()],
) -> UserResponse:
    user = await service.create(data)
    background_tasks.add_task(send_welcome_email, user.email, user.full_name)
    return UserResponse.model_validate(user)
# N+1 query prevention: use joinedload / selectinload
from sqlalchemy import select
from sqlalchemy.orm import selectinload

stmt = (
    select(Order)
    .options(selectinload(Order.items))
    .where(Order.user_id == user_id)
    .order_by(Order.id)  # stable ordering keeps offset/limit pages consistent
    .offset(skip).limit(limit)
)
result = await db.execute(stmt)
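The query-count difference behind the N+1 problem can be measured with the standard library. This sketch uses `sqlite3` and its statement trace callback rather than SQLAlchemy, with made-up tables; the batched variant mirrors the single `IN (...)` query that `selectinload` issues for the related rows.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript(
    """
    CREATE TABLE orders (id INTEGER PRIMARY KEY);
    CREATE TABLE items (id INTEGER PRIMARY KEY, order_id INTEGER, name TEXT);
    INSERT INTO orders (id) VALUES (1), (2), (3);
    INSERT INTO items (order_id, name) VALUES (1, 'a'), (1, 'b'), (2, 'c'), (3, 'd');
    """
)

statements: list[str] = []
conn.set_trace_callback(statements.append)  # record every executed statement

# N+1: one query for the orders, then one more per order
order_ids = [row[0] for row in conn.execute("SELECT id FROM orders")]
for order_id in order_ids:
    conn.execute("SELECT name FROM items WHERE order_id = ?", (order_id,)).fetchall()
n_plus_one = len(statements)

statements.clear()

# Batched: one query for the orders, one IN (...) query for all their items
order_ids = [row[0] for row in conn.execute("SELECT id FROM orders")]
placeholders = ", ".join("?" for _ in order_ids)
conn.execute(
    f"SELECT order_id, name FROM items WHERE order_id IN ({placeholders})",
    order_ids,
).fetchall()
batched = len(statements)

print(n_plus_one, batched)
```

With three orders the lazy pattern issues four statements and the batched one issues two; the gap widens linearly with page size.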

Deployment & Docker

# Dockerfile — multi-stage, non-root user
FROM python:3.12-slim AS base
ENV PYTHONDONTWRITEBYTECODE=1 \
    PYTHONUNBUFFERED=1

FROM base AS builder
WORKDIR /build
COPY pyproject.toml .
RUN pip install --no-cache-dir uv && \
    uv pip install --system --no-cache .

FROM base AS runtime
RUN groupadd -r appuser && useradd -r -g appuser appuser
WORKDIR /app
COPY --from=builder /usr/local/lib/python3.12/site-packages /usr/local/lib/python3.12/site-packages
COPY --from=builder /usr/local/bin /usr/local/bin
COPY app/ app/
COPY alembic/ alembic/
COPY alembic.ini .

USER appuser
EXPOSE 8000
CMD ["uvicorn", "app.main:app", "--host", "0.0.0.0", "--port", "8000", "--workers", "4"]
# docker-compose.yml
services:
  api:
    build: .
    ports:
      - "8000:8000"
    env_file: .env
    depends_on:
      db:
        condition: service_healthy

  db:
    image: postgres:16-alpine
    environment:
      POSTGRES_USER: appuser
      POSTGRES_PASSWORD: ${DB_PASSWORD}
      POSTGRES_DB: mydb
    volumes:
      - pgdata:/var/lib/postgresql/data
    healthcheck:
      test: ["CMD-SHELL", "pg_isready -U appuser"]
      interval: 5s
      retries: 5

volumes:
  pgdata:
# Production launch command
uvicorn app.main:app \
  --host 0.0.0.0 \
  --port 8000 \
  --workers 4 \
  --loop uvloop \
  --http httptools \
  --access-log \
  --proxy-headers \
  --forwarded-allow-ips="*"
💡
Run Alembic migrations before app start in your deployment pipeline, not inside the app. Use an init container in Kubernetes or a pre-start script: alembic upgrade head && uvicorn ...
