Master Level · Production Grade · Python 3.12

PYTHON
301

// CPython Internals · Type System · Performance · Security · Packaging

The advanced knowledge that separates Python experts from developers who use Python. Covers CPython bytecode, the import system, AST manipulation, the full type system, performance profiling, C interop, exception groups, structured logging, property-based testing, cryptography, packaging, and Pythonic design patterns.

CPython Internals Full Type System AST & Bytecode Numba / Cython Hypothesis Testing ExceptionGroup
CPython Internals

Bytecode & dis

Python source code is compiled to bytecode — a compact representation executed by the CPython virtual machine (a stack-based interpreter). Understanding bytecode explains why certain Python patterns are faster than others, and how the interpreter works under the hood.

python · dis module: disassemble Python bytecode
import dis

def add(x, y):
    return x + y

# Disassemble to human-readable bytecode
dis.dis(add)
# Offset  Opname            Argument
#  0      RESUME            0
#  2      LOAD_FAST         0 (x)
#  4      LOAD_FAST         1 (y)
#  6      BINARY_OP         0 (+)
# 10      RETURN_VALUE

# Code object attributes
code = add.__code__
code.co_varnames    # ('x', 'y') — local variable names
code.co_consts      # (None,)    — compile-time constants
code.co_names       # ()         — global/attr names referenced
code.co_code        # bytes — raw bytecode
code.co_stacksize   # max stack depth needed
code.co_flags       # flags (generator, coroutine, etc.)
code.co_filename    # source file
code.co_firstlineno # line number of first line

# Compare two implementations
def slow_join(parts):
    result = ""
    for p in parts:
        result += p         # May build a new string each iteration (O(n²) worst case)
    return result

def fast_join(parts):
    return "".join(parts)  # Single allocation

dis.dis(slow_join)  # Loop body: BINARY_OP (+=) followed by STORE_FAST on every iteration
dis.dis(fast_join)  # No loop: LOAD_CONST, LOAD_ATTR (join), LOAD_FAST, CALL

# Bytecode comparison of list comprehension vs for loop
def loop_version(data):
    result = []
    for x in data:
        result.append(x * 2)
    return result

def comp_version(data):
    return [x * 2 for x in data]

# comp_version is faster: list comprehension uses LIST_APPEND opcode
# which is more efficient than repeatedly calling list.append()

# Get the .pyc cache file path
import importlib.util
spec = importlib.util.find_spec("json")
print(importlib.util.cache_from_source(spec.origin))
# ~/.../lib/python3.12/json/__pycache__/__init__.cpython-312.pyc
# (json is a package, so the cached file belongs to its __init__.py)
⚡ Key Optimization Insights from Bytecode

Local variables use LOAD_FAST, an index into a per-frame array, which is very fast. Global variables use LOAD_GLOBAL, which looks the name up in the module's globals dict and then in builtins, so it is slower. This is why caching a global as a local inside a hot loop can speed things up, and why closure variables from an enclosing scope (LOAD_DEREF) are slightly slower than plain locals.
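The difference is easy to see in the bytecode itself. The sketch below is illustrative only: the function names and the SCALE constant are made up for the demonstration, and exact opcode names can differ between CPython versions.

```python
import dis

SCALE = 3  # module-level (global) name, hypothetical

def use_global(x):
    return x * SCALE              # SCALE is resolved with LOAD_GLOBAL

def use_local(x, scale=SCALE):
    return x * scale              # scale is a local: a LOAD_FAST variant

def opnames(func):
    """Return the opcode names that make up a function's bytecode."""
    return [ins.opname for ins in dis.get_instructions(func)]

global_ops = opnames(use_global)
local_ops = opnames(use_local)

print("LOAD_GLOBAL" in global_ops)  # True
print("LOAD_GLOBAL" in local_ops)   # False
```

Binding the global to a default argument is one way to turn it into a local; assigning it to a local name at the top of the function has the same effect.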

CPython Internals

Memory Model

CPython uses reference counting as its primary memory management strategy. Every object has a reference count. When it drops to zero, the object is deallocated immediately. Understanding this explains many Python performance characteristics and memory behaviors.
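Immediate deallocation can be observed with a weak reference, which tracks an object without keeping it alive. This is a minimal sketch with a hypothetical Resource class; the behavior shown is specific to CPython, since other implementations may free objects later.

```python
import weakref

class Resource:
    """Plain object; instances support weak references."""

r = Resource()
probe = weakref.ref(r)   # Weak reference: does not raise the reference count
alias = r                # Second strong reference

del r
still_alive = probe() is not None   # alias still holds the object

del alias
freed = probe() is None             # Count hit zero, object freed at once

print(still_alive, freed)  # True True
```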

python · sys, id(), object interning, tracemalloc
import sys, tracemalloc, gc

# ── Reference counting ─────────────────────────────────────────
x = []
sys.getrefcount(x)   # 2 (x itself + getrefcount's own ref)

y = x
sys.getrefcount(x)   # 3 (x, y, getrefcount)
del y
sys.getrefcount(x)   # 2

# ── Object identity and interning ─────────────────────────────
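# Small integers (-5 to 256) are cached singletons in CPython
a = 100; b = 100
a is b   # True: same cached object

a = int("1000"); b = int("1000")   # Built at runtime, outside the cached range
a is b   # False: two distinct objects
a == b   # True: same value
# Note: two equal literals in one code object (a = 1000; b = 1000) may still
# share a single constant, so never rely on `is` to compare values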

# String interning — identifiers and small strings are interned
s1 = "hello"; s2 = "hello"
s1 is s2       # True  — interned

s1 = "hello world"; s2 = "hello world"
s1 is s2       # May be False — not guaranteed

# Force interning
s1 = sys.intern("hello world")
s2 = sys.intern("hello world")
s1 is s2  # True — now interned

# ── Object sizes ──────────────────────────────────────────────
# Typical values on 64-bit CPython 3.12; exact numbers vary by version and platform
sys.getsizeof([])              # 56 bytes: empty list
sys.getsizeof([1,2,3])         # 88 bytes: 3-element list
sys.getsizeof({})              # 64 bytes: empty dict
sys.getsizeof("hello")         # 46 bytes (54 before 3.12)
# Note: getsizeof doesn't include referenced objects!

# Deep size (include referenced objects)
def deep_sizeof(obj, seen=None):
    from sys import getsizeof
    from collections.abc import Mapping, Sequence
    size = getsizeof(obj)
    if seen is None: seen = set()
    obj_id = id(obj)
    if obj_id in seen: return 0
    seen.add(obj_id)
    if isinstance(obj, Mapping):
        size += sum(deep_sizeof(k, seen) + deep_sizeof(v, seen) for k,v in obj.items())
    elif isinstance(obj, Sequence) and not isinstance(obj, (str, bytes)):
        size += sum(deep_sizeof(i, seen) for i in obj)
    return size

# ── tracemalloc — track memory allocations ────────────────────
tracemalloc.start()

# ... code to profile ...
big_list = [dict(i=i, v=i**2) for i in range(10000)]

snapshot = tracemalloc.take_snapshot()
top_stats = snapshot.statistics("lineno")

for stat in top_stats[:5]:
    print(stat)  # Shows file:line — size — count

tracemalloc.stop()
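A single snapshot shows what is allocated right now; comparing two snapshots shows what grew in between, which is usually the more useful question when hunting a leak. A minimal sketch, where the payload list stands in for whatever code is under investigation:

```python
import tracemalloc

tracemalloc.start()
before = tracemalloc.take_snapshot()

# Stand-in for the code under investigation: roughly 1 MB of byte strings
payload = [bytes(1000) for _ in range(1000)]

after = tracemalloc.take_snapshot()
tracemalloc.stop()

# Per-line differences between the two snapshots, largest first
diffs = after.compare_to(before, "lineno")
growth = sum(d.size_diff for d in diffs)

for d in diffs[:3]:
    print(d)
print(f"net growth: {growth / 1024:.0f} KiB")
```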
CPython Internals

Garbage Collector

python · gc module: cyclic reference handling
import gc

# Reference counting misses circular references!
class Node:
    def __init__(self, val):
        self.val = val
        self.next = None

a = Node(1)
b = Node(2)
a.next = b    # a → b
b.next = a    # b → a  — CYCLE! Neither refcount drops to 0

del a, b      # Objects still alive — cycle keeps refcounts at 1
gc.collect()  # Returns number of unreachable objects collected

# GC generations (0=young, 1=middle, 2=old)
gc.get_threshold()     # (700, 10, 10) — default collection thresholds
gc.set_threshold(1000, 15, 15)  # Tune for your workload
gc.get_count()         # Current (gen0, gen1, gen2) counters that are compared against the thresholds
gc.get_objects()       # All objects tracked by GC (expensive!)

# Disable the cyclic GC (reference counting still runs). Only safe if the code
# creates no cycles; the gain is workload-dependent, so measure it
gc.disable()

# Get referrers (useful for leak hunting)
x = [1, 2, 3]
print(gc.get_referrers(x))  # Shows what holds references to x

# Finalizers in cycles
# Before Python 3.4, a cycle containing objects with __del__ was uncollectable.
# Since 3.4 (PEP 442) such cycles are collected, but the order in which the
# finalizers run is undefined, so avoid __del__ in cyclic structures if you can.

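# Profiling GC impact: time the workload with the cyclic GC switched off
import gc, time
gc.disable()
start = time.perf_counter()
# ... workload ...
elapsed_no_gc = time.perf_counter() - start   # Stop the clock before re-enabling
gc.enable(); gc.collect()
print(f"Workload with GC disabled: {elapsed_no_gc:.4f}s")
# Time the same workload with GC enabled; the difference is the GC overhead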
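The cycle example above can be made observable with a weak reference. This is a sketch under the assumption that nothing else collects in between, which is why automatic collection is paused for the duration; the Node class here is a simplified stand-in.

```python
import gc
import weakref

class Node:
    def __init__(self):
        self.partner = None

def make_cycle():
    a, b = Node(), Node()
    a.partner, b.partner = b, a   # a -> b -> a
    return weakref.ref(a)         # Weak probe, keeps nothing alive

gc.disable()                      # Keep automatic collection from interfering
try:
    probe = make_cycle()
    alive_before = probe() is not None   # Refcounts never reached zero
    found = gc.collect()                 # Cycle detector reclaims both nodes
    alive_after = probe() is not None
finally:
    gc.enable()

print(alive_before, alive_after)  # True False
```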
CPython Internals

Import System

Python's import system is fully customizable via hooks. Understanding sys.meta_path, finders, and loaders lets you intercept imports, load modules from databases, implement lazy loading, and create import-time transformations.

python · importlib, meta_path hooks, custom finders/loaders
import sys, importlib, importlib.util, importlib.abc

# ── Import mechanics ───────────────────────────────────────────
# When you do 'import foo', Python:
# 1. Checks sys.modules (cache — instant if found)
# 2. Calls each finder in sys.meta_path
# 3. Finder returns a loader
# 4. Loader executes module code
# 5. Module added to sys.modules

sys.meta_path         # [BuiltinImporter, FrozenImporter, PathFinder]
sys.modules["json"]   # Already-loaded modules (import cache)

# ── Manual import ─────────────────────────────────────────────
spec = importlib.util.spec_from_file_location("mymod", "/path/to/mod.py")
module = importlib.util.module_from_spec(spec)
sys.modules["mymod"] = module
spec.loader.exec_module(module)

# ── Reload a module (hot-reload pattern) ──────────────────────
importlib.reload(sys.modules["mymod"])

# ── Custom meta path finder ────────────────────────────────────
class DatabaseFinder:
    """Load modules stored in a database."""
    def find_spec(self, fullname, path, target=None):
        """Return a ModuleSpec if we can load this module, else None."""
        source = self._fetch_from_db(fullname)
        if source is None:
            return None
        loader = DatabaseLoader(fullname, source)
        return importlib.util.spec_from_loader(fullname, loader)

    def _fetch_from_db(self, name):
        db = {"dbmod": "MY_VALUE = 42\ndef hello(): return 'from db!'"}
        return db.get(name)

class DatabaseLoader(importlib.abc.Loader):
    def __init__(self, name, source): self.name, self.source = name, source
    def create_module(self, spec): return None  # Use default
    def exec_module(self, module):
        exec(compile(self.source, self.name, "exec"), module.__dict__)

sys.meta_path.insert(0, DatabaseFinder())
import dbmod            # Now works — loaded from dict!
print(dbmod.MY_VALUE)   # 42

# ── Lazy imports ──────────────────────────────────────────────
import importlib.util

class LazyModule:
    """Import only when first accessed — saves startup time."""
    def __init__(self, name):
        self._name = name
        self._module = None

    def __getattr__(self, attr):
        if self._module is None:
            self._module = importlib.import_module(self._name)
        return getattr(self._module, attr)

# Heavy module not imported until first use
numpy = LazyModule("numpy")
# At startup: no numpy import
# First use: numpy imported transparently
arr = numpy.array([1, 2, 3])  # Import happens HERE
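The standard library also ships a loader for this purpose, importlib.util.LazyLoader, which defers executing the module body until the first attribute access. The helper below follows the recipe in the importlib documentation; the lazy_import name and the choice of colorsys as the demo module are only for illustration.

```python
import importlib.util
import sys

def lazy_import(name):
    """Return a module object whose body runs on first attribute access."""
    spec = importlib.util.find_spec(name)
    loader = importlib.util.LazyLoader(spec.loader)
    spec.loader = loader
    module = importlib.util.module_from_spec(spec)
    sys.modules[name] = module
    loader.exec_module(module)    # Only sets up the lazy machinery
    return module

colorsys = lazy_import("colorsys")            # Module body not executed yet
hue = colorsys.rgb_to_hsv(1.0, 0.0, 0.0)[0]   # First access triggers the load
print(hue)  # 0.0
```

Unlike a hand-written proxy, the result is a real module object, so isinstance checks and tools that inspect sys.modules keep working.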
CPython Internals

AST Module

The ast module gives you access to Python's Abstract Syntax Tree — the intermediate representation between source text and bytecode. You can parse code, inspect it, transform it, and generate new code.

python · ast.parse, NodeVisitor, NodeTransformer, code generation
import ast

source = """
def factorial(n):
    if n <= 1:
        return 1
    return n * factorial(n - 1)
"""

# Parse source to AST
tree = ast.parse(source)
print(ast.dump(tree, indent=2))   # Formatted tree

# ── NodeVisitor — walk and inspect the tree ───────────────────
class FunctionAnalyzer(ast.NodeVisitor):
    """Record the names of defined functions and of directly called names."""
    def __init__(self):
        self.functions = []
        self.calls = []

    def visit_FunctionDef(self, node):
        self.functions.append(node.name)
        self.generic_visit(node)  # Visit children

    def visit_Call(self, node):
        if isinstance(node.func, ast.Name):
            self.calls.append(node.func.id)
        self.generic_visit(node)

analyzer = FunctionAnalyzer()
analyzer.visit(tree)
print(analyzer.functions)  # ["factorial"]
print(analyzer.calls)      # ["factorial"]  — recursive call detected

# ── NodeTransformer — modify the AST ─────────────────────────
class ConstantFolder(ast.NodeTransformer):
    """Fold constant expressions at compile time."""
    def visit_BinOp(self, node):
        self.generic_visit(node)  # Transform children first
        if (isinstance(node.left, ast.Constant) and
                isinstance(node.right, ast.Constant) and
                isinstance(node.op, (ast.Add, ast.Mul, ast.Sub))):
            try:
                # Evaluate the constant expression at transform time
                result = eval(compile(ast.Expression(node), "<folding>", "eval"))
            except Exception:
                return node  # Leave anything that fails to evaluate untouched
            # Copy line/column info so enclosing nodes can still be compiled
            return ast.copy_location(ast.Constant(value=result), node)
        return node

source = "result = 2 * 3 * 7"
tree = ast.parse(source)
tree = ConstantFolder().visit(tree)
ast.fix_missing_locations(tree)  # Required after transformation
code = compile(tree, "<constant_folding>", "exec")
exec(code)
# The tree now holds ast.Constant(42). CPython's own compiler also folds such
# constants, so this transformer is illustrative rather than a speedup.

# ── Practical: check for missing await ────────────────────────
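class MissingAwaitDetector(ast.NodeVisitor):
    def visit_AsyncFunctionDef(self, node):
        # Calls that are the direct operand of an await are fine
        awaited = {id(n.value) for n in ast.walk(node) if isinstance(n, ast.Await)}
        for child in ast.walk(node):
            if (isinstance(child, ast.Call) and id(child) not in awaited
                    and isinstance(child.func, ast.Name)
                    # Heuristic: coroutine functions often end with _async
                    and child.func.id.endswith("_async")):
                print(f"Possible missing await at line {child.lineno}")
        # No generic_visit here: ast.walk already covered nested functions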
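Generating new code from a transformed tree takes one more step: ast.unparse (Python 3.9+) turns the tree back into source text, and compile accepts the tree directly. A small sketch, with a hypothetical RenameFunction transformer and a made-up plus function:

```python
import ast

class RenameFunction(ast.NodeTransformer):
    """Rename every bare-name reference from `old` to `new`."""
    def __init__(self, old, new):
        self.old, self.new = old, new

    def visit_Name(self, node):
        if node.id == self.old:
            renamed = ast.Name(id=self.new, ctx=node.ctx)
            return ast.copy_location(renamed, node)
        return node

tree = ast.parse("total = add(1, 2)")
tree = ast.fix_missing_locations(RenameFunction("add", "plus").visit(tree))

new_source = ast.unparse(tree)      # Back to source text
print(new_source)                   # total = plus(1, 2)

# The transformed tree can also be compiled and run directly
namespace = {"plus": lambda a, b: a + b}
exec(compile(tree, "<renamed>", "exec"), namespace)
print(namespace["total"])           # 3
```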
CPython Internals

exec / eval / compile

python · Dynamic code execution: patterns and dangers
# ── eval — evaluate a single expression ──────────────────────
result = eval("2 ** 10")         # 1024
eval("x + y", {"x": 3, "y": 4})  # 7 — with custom namespace

# ── exec — execute statements ─────────────────────────────────
namespace = {}
exec("""
def greet(name):
    return f'Hello, {name}!'
""", namespace)
print(namespace["greet"]("World"))  # "Hello, World!"

# ── compile — compile to code object ─────────────────────────
code = compile("x + 1", "<string>", "eval")   # mode: eval/exec/single
eval(code, {"x": 41})   # 42 — reuse compiled code object, faster

# ── Restricted builtins (NOT a security boundary) ─────────────
def safe_eval(expr: str, extra_vars: dict | None = None):
    """Evaluate an expression with a reduced set of builtins (trusted input only)."""
    # Restrict builtins to safe operations
    safe_builtins = {
        "abs": abs, "max": max, "min": min,
        "len": len, "sum": sum, "sorted": sorted,
        "int": int, "float": float, "str": str,
        "True": True, "False": False, "None": None,
    }
    globals_ = {"__builtins__": safe_builtins}
    if extra_vars: globals_.update(extra_vars)
    return eval(expr, globals_, {})

safe_eval("max(1, 2, 3)")        # 3 ✅
# safe_eval("__import__('os')")  # NameError ✅ — blocked

# ── Dynamic function generation from template ─────────────────
def make_validator(field_name: str, min_val: int, max_val: int):
    """Compile a tight validation function at runtime."""
    source = f"""
def validate_{field_name}(value):
    if not isinstance(value, (int, float)):
        raise TypeError(f'{field_name} must be numeric')
    if not ({min_val} <= value <= {max_val}):
        raise ValueError(f'{field_name} must be between {min_val} and {max_val}')
    return value
"""
    namespace = {}
    exec(compile(source, f"<validator_{field_name}>", "exec"), namespace)
    return namespace[f"validate_{field_name}"]

validate_age = make_validator("age", 0, 150)
validate_age(25)   # 25
validate_age(200)  # ValueError
Security Warning: Never eval() User Input

Calling eval() or exec() on user-controlled input is a critical security vulnerability. Even "restricted" namespaces can be escaped by walking the object graph from any literal, for example ().__class__.__mro__[-1].__subclasses__(). Use ast.literal_eval() to parse Python literals safely, or a dedicated expression-parser library.
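As a rough illustration of the safer route, ast.literal_eval accepts only literal syntax (numbers, strings, bytes, tuples, lists, dicts, sets, booleans, None) and rejects anything that would call or look up a name. The parse_literal wrapper is a hypothetical helper, not a standard API.

```python
import ast

def parse_literal(text: str):
    """Parse a Python literal; raise ValueError for anything else."""
    try:
        return ast.literal_eval(text)
    except (ValueError, SyntaxError, TypeError, MemoryError, RecursionError) as exc:
        raise ValueError(f"not a Python literal: {text!r}") from exc

config = parse_literal("{'retries': 3, 'hosts': ['a', 'b']}")
print(config["retries"])  # 3

try:
    parse_literal("__import__('os').getcwd()")
    rejected = False
except ValueError:
    rejected = True       # Function calls are not literals
print(rejected)           # True
```

literal_eval still parses the whole input, so very large or deeply nested strings can be expensive; limiting input size before parsing is a sensible extra precaution.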

Type System Mastery

TypeVar, ParamSpec & TypeVarTuple

python · Generic programming with TypeVar, ParamSpec, TypeVarTuple
from typing import TypeVar, Generic, Callable, Concatenate
from typing import ParamSpec, TypeVarTuple, Unpack

# ── TypeVar with bounds and constraints ───────────────────────
T = TypeVar("T")                           # Unconstrained
N = TypeVar("N", int, float, complex)      # Constrained: only these 3
C = TypeVar("C", bound="Comparable")      # Bound: must be Comparable or subclass

def first(items: list[T]) -> T:
    return items[0]  # Return type matches input element type

def add(a: N, b: N) -> N:
    return a + b      # a and b must be same numeric type

# ── Generic classes ───────────────────────────────────────────
class Stack(Generic[T]):
    def __init__(self): self._items: list[T] = []
    def push(self, item: T) -> None: self._items.append(item)
    def pop(self) -> T: return self._items.pop()
    def peek(self) -> T | None: return self._items[-1] if self._items else None

stack: Stack[int] = Stack()
stack.push(1)      # mypy: OK
# stack.push("x")  # mypy: error — expected int

# ── ParamSpec — preserve callable signatures in decorators ────
P = ParamSpec("P")
R = TypeVar("R")

def timed(func: Callable[P, R]) -> Callable[P, R]:
    def wrapper(*args: P.args, **kwargs: P.kwargs) -> R:
        import time
        start = time.perf_counter()
        result = func(*args, **kwargs)
        print(f"{func.__name__}: {time.perf_counter()-start:.4f}s")
        return result
    return wrapper

@timed
def connect(host: str, port: int, *, ssl: bool = False) -> str: ...
# mypy knows: connect(host="x", port=80) — fully typed through decorator!

# ── Concatenate — add extra args to callable ──────────────────
def with_logging(func: Callable[Concatenate[str, P], R]) -> Callable[P, R]:
    """Removes the first 'log_prefix' arg from the signature."""
    def wrapper(*args: P.args, **kwargs: P.kwargs) -> R:
        return func("[LOG]", *args, **kwargs)
    return wrapper

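# ── TypeVarTuple: variadic generics ───────────────────────────
Ts = TypeVarTuple("Ts")

def args_to_tuple(*args: Unpack[Ts]) -> tuple[Unpack[Ts]]:
    """Each positional argument keeps its own type in the result."""
    return args

# args_to_tuple(1, "a", 2.0) is inferred as tuple[int, str, float]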
Type System Mastery

Annotated & Literal

python · Annotated metadata, Literal types, Final, ClassVar, Self
from typing import Annotated, Literal, Final, ClassVar, Self, Never, LiteralString

# ── Annotated — attach metadata to type hints ────────────────
# Metadata is IGNORED by the type checker (it only uses the base type)
# Used by frameworks: pydantic, FastAPI, SQLAlchemy, etc.
class Gt:
    """Greater-than constraint."""
    def __init__(self, val): self.val = val

class MaxLen:
    """Maximum-length constraint."""
    def __init__(self, n): self.n = n

PositiveInt = Annotated[int, Gt(0)]
ShortStr    = Annotated[str, MaxLen(50)]

def create_user(name: ShortStr, age: PositiveInt) -> dict:
    # Pydantic reads Annotated metadata to create validators!
    return {"name": name, "age": age}

# ── Literal — exact value types ──────────────────────────────
Direction   = Literal["north", "south", "east", "west"]
HTTPMethod  = Literal["GET", "POST", "PUT", "DELETE"]
StatusCode  = Literal[200, 201, 400, 401, 403, 404, 500]

def move(direction: Direction) -> None:
    print(f"Moving {direction}")

move("north")    # ✅ mypy OK
# move("up")   # ❌ mypy error: not a valid Direction

# ── Final — constant declarations ─────────────────────────────
MAX_CONNECTIONS: Final = 100
# MAX_CONNECTIONS = 200  # ❌ mypy error: cannot reassign a Final

class Config:
    TIMEOUT: Final[int] = 30

# ── Self: return type for methods that return self ────────────
class QueryBuilder:
    def __init__(self) -> None:
        self._conditions: list[str] = []
        self._limit: int | None = None

    def where(self, condition: str) -> Self:   # Returns same type
        self._conditions.append(condition)
        return self  # Subclasses get correct return type automatically

    def limit(self, n: int) -> Self:
        self._limit = n
        return self

# ── Never — function that never returns ───────────────────────
def abort(msg: str) -> Never:
    raise SystemExit(msg)

# ── LiteralString: only string literals, prevents injection ───
def execute_sql(query: LiteralString) -> list:
    # A checker that implements PEP 675 (e.g. pyright) rejects any query
    # that could be a user-controlled string
    return db.execute(query)  # db: hypothetical database handle

execute_sql("SELECT * FROM users")  # ✅
user_input = input()
# execute_sql(user_input)          # ❌ type error: str is not a LiteralString
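Frameworks read Annotated metadata at runtime through get_type_hints(..., include_extras=True). The sketch below shows the general mechanism only; validate_call and the check method are hypothetical and far simpler than what pydantic or FastAPI actually do.

```python
from typing import Annotated, get_args, get_origin, get_type_hints

class MaxLen:
    """Maximum-length constraint carried as Annotated metadata."""
    def __init__(self, n: int):
        self.n = n

    def check(self, name: str, value) -> None:
        if len(value) > self.n:
            raise ValueError(f"{name}: longer than {self.n} characters")

ShortStr = Annotated[str, MaxLen(5)]

def create_user(name: ShortStr, age: int) -> dict:
    return {"name": name, "age": age}

def validate_call(func, **kwargs):
    """Run every MaxLen rule found in the annotations, then call func."""
    hints = get_type_hints(func, include_extras=True)  # Keep the metadata
    for param, value in kwargs.items():
        hint = hints.get(param)
        if get_origin(hint) is Annotated:
            _base, *metadata = get_args(hint)          # (str, MaxLen(5))
            for rule in metadata:
                if isinstance(rule, MaxLen):
                    rule.check(param, value)
    return func(**kwargs)

user = validate_call(create_user, name="Ann", age=30)

try:
    validate_call(create_user, name="Bartholomew", age=30)
    too_long_rejected = False
except ValueError:
    too_long_rejected = True
```

Without include_extras=True, get_type_hints strips the metadata and returns the bare str, which is also what a static type checker sees.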
Type System Mastery

TypeGuard & Type Narrowing

python · TypeGuard, TypeIs, assert_type, assert_never
from typing import Literal, TypeGuard, assert_type, assert_never
from typing_extensions import TypeIs  # Available as typing.TypeIs from Python 3.13

# ── TypeGuard — narrow type inside if blocks ──────────────────
def is_list_of_str(val: list) -> TypeGuard[list[str]]:
    """When this returns True, mypy knows val is list[str]."""
    return all(isinstance(x, str) for x in val)

def process(items: list[str | int]) -> None:
    if is_list_of_str(items):
        # mypy knows items is list[str] here
        for s in items:
            print(s.upper())  # ✅ — s is str

# ── assert_type — verify type at type-check time ──────────────
x: int | str = get_value()
if isinstance(x, int):
    assert_type(x, int)   # ✅ type narrowed to int

# ── assert_never — exhaustiveness checking ────────────────────
Shape = Literal["circle", "square", "triangle"]

def get_area(shape: Shape, size: float) -> float:
    if shape == "circle":
        return 3.14 * size ** 2
    elif shape == "square":
        return size ** 2
    elif shape == "triangle":
        return 0.5 * size ** 2
    else:
        assert_never(shape)  # Type checker errors here if Shape has an unhandled case
        # If you add "pentagon" to Shape and forget a branch, type checking fails
Type System Mastery

@overload — Multiple Signatures

python · @overload: typed function overloading
from typing import TypeVar, overload

T = TypeVar("T")  # Used by Connection.get further down

# @overload lets you declare multiple type signatures for ONE function.
# The overload definitions are type-checker only — never called.
# The actual implementation uses Union types.

@overload
def process(data: str) -> str: ...

@overload
def process(data: int) -> int: ...

@overload
def process(data: list[str]) -> list[str]: ...

def process(data: str | int | list[str]) -> str | int | list[str]:
    """Actual implementation — handles all cases."""
    if isinstance(data, str):
        return data.upper()
    elif isinstance(data, int):
        return data * 2
    else:
        return [s.upper() for s in data]

# A type checker infers the return type from the argument type:
result1 = process("hello")    # inferred: str
result2 = process(42)         # inferred: int
result3 = process(["a", "b"]) # inferred: list[str]

# ── Overload for methods with optional args ────────────────────
class Connection:
    def __init__(self) -> None:
        self._data: dict[str, str] = {}

    @overload
    def get(self, key: str) -> str | None: ...

    @overload
    def get(self, key: str, default: T) -> str | T: ...

    def get(self, key, default=None):
        return self._data.get(key, default)
Type System Mastery

Runtime Type Checking

python · get_type_hints, beartype, validate at runtime
import typing
from typing import get_type_hints

# ── get_type_hints — resolve annotations at runtime ───────────
def greet(name: str, age: int) -> str: ...

hints = get_type_hints(greet)
# {"name": str, "age": int, "return": str}

# Resolves forward references (strings like "MyClass")
class Node:
    children: list["Node"]   # Forward ref

get_type_hints(Node)  # Resolves "Node" to actual Node class

# ── Runtime type enforcement (beartype library) ────────────────
# pip install beartype
from beartype import beartype

@beartype
def sqrt(n: float | int) -> float:
    return n ** 0.5

sqrt(4)         # 2.0 ✅
sqrt("4")       # Raises a BeartypeException subclass at call time ✅

# Beartype checks in constant time: for containers it samples one item
# per nesting level instead of walking every element.
# That keeps per-call overhead low; benchmark against alternatives for your workload

# ── Roll your own runtime validator using annotations ──────────
import functools, inspect

def enforce_types(func):
    """Check plain-class annotations (int, str, ...) on every call."""
    hints = get_type_hints(func)
    sig = inspect.signature(func)

    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        bound = sig.bind(*args, **kwargs)   # Covers positional AND keyword args
        for param, value in bound.arguments.items():
            expected = hints.get(param)
            # Skip generics such as list[int]: isinstance() cannot check them
            if isinstance(expected, type) and not isinstance(value, expected):
                raise TypeError(
                    f"{param}: expected {expected.__name__}, got {type(value).__name__}")
        return func(*args, **kwargs)
    return wrapper
Advanced Async

Async Generators

python · async def + yield: async generators
import asyncio

# ── Async generator — yields values asynchronously ───────────
async def stream_db_rows(query: str, batch_size: int = 100):
    """Yield database rows in batches without loading all into memory."""
    offset = 0
    while True:
        batch = await db.execute(f"{query} LIMIT {batch_size} OFFSET {offset}")
        if not batch: break
        for row in batch:
            yield row   # async generator yield
        offset += batch_size

# Consume with async for
async def process_all():
    async for row in stream_db_rows("SELECT * FROM events"):
        await process_row(row)

# ── Manual iteration with __anext__ and aclose ────────────────
async def tick(interval: float):
    while True:
        yield asyncio.get_running_loop().time()
        await asyncio.sleep(interval)

async def main():
    gen = tick(1.0)
    for _ in range(3):
        ts = await gen.__anext__()
        print(f"tick: {ts:.2f}")
    await gen.aclose()   # Properly close the generator

# ── Async comprehensions ───────────────────────────────────────
async def get_user_names():
    return [row["name"] async for row in stream_db_rows("SELECT name FROM users")]

# async generator expression
names_gen = (row["name"] async for row in stream_db_rows("SELECT name FROM users"))
# Lazy — processes one row at a time
Advanced Async

contextvars — Async-Safe Context

contextvars provides context-local storage that works correctly across async tasks. threading.local() does not: every task on an event loop runs in the same thread, so all tasks would share one value. This makes contextvars a good fit for request IDs, user sessions, and correlation IDs in async web applications.

python · ContextVar: per-task context in async code
from contextvars import ContextVar, copy_context
import asyncio
import uuid

# ── Declare context variables ─────────────────────────────────
request_id: ContextVar[str] = ContextVar("request_id", default="unknown")
current_user: ContextVar[str | None] = ContextVar("current_user", default=None)

# ── Middleware sets context for each request ──────────────────
async def request_middleware(request, handler):
    # Each request gets its OWN context — no cross-contamination!
    token_rid = request_id.set(str(uuid.uuid4()))
    token_usr = current_user.set(request.user)
    try:
        return await handler(request)
    finally:
        request_id.reset(token_rid)   # Restore previous value
        current_user.reset(token_usr)

# ── Access context from anywhere in the call stack ────────────
async def log_event(event: str):
    rid = request_id.get()
    user = current_user.get()
    print(f"[{rid}] user={user} event={event}")

async def process_payment(amount: float):
    await log_event(f"payment:{amount}")  # Gets correct request_id automatically!

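# ── copy_context: snapshot current context for task isolation ──
async def spawn_background_task():
    ctx = copy_context()   # Snapshot the current context
    # context= needs Python 3.11+; without it create_task copies the context itself
    task = asyncio.create_task(background_work(), context=ctx)
    # background_work inherits our request_id, but changes won't affect us
    return task            # Keep a reference so the task is not garbage-collected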

# ── ContextVar + Structured Logging ───────────────────────────
import logging, contextvars

log_context: ContextVar[dict] = ContextVar("log_context", default={})

class ContextFilter(logging.Filter):
    def filter(self, record: logging.LogRecord) -> bool:
        ctx = log_context.get()
        for key, value in ctx.items():
            setattr(record, key, value)
        return True
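The isolation between tasks can be checked in a few lines. In this sketch each call to the hypothetical handle coroutine runs as its own task, because asyncio.gather wraps coroutines in tasks and every task starts with a copy of the caller's context.

```python
import asyncio
from contextvars import ContextVar

request_id: ContextVar[str] = ContextVar("request_id", default="unknown")

async def handle(rid: str) -> str:
    request_id.set(rid)          # Visible only inside this task's context
    await asyncio.sleep(0)       # Let the other tasks run in between
    return request_id.get()      # Still this task's own value

async def main():
    results = await asyncio.gather(handle("a"), handle("b"), handle("c"))
    return results, request_id.get()   # The parent context is untouched

results, outer = asyncio.run(main())
print(results, outer)  # ['a', 'b', 'c'] unknown
```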
Advanced Async

anyio & Trio

python · anyio: write async code that runs on asyncio or Trio
# pip install anyio[trio] httpx
import anyio
import httpx  # anyio has no HTTP client; httpx runs on both asyncio and Trio

# ── anyio: backend-agnostic async primitives ──────────────────
async def fetch_all(urls: list[str]) -> list[str]:
    results = []
    async with httpx.AsyncClient() as client:
        async def fetch_one(url: str):
            resp = await client.get(url)
            results.append(resp.text)   # Completion order, not input order
        async with anyio.create_task_group() as tg:
            for url in urls:
                tg.start_soon(fetch_one, url)  # Like asyncio.create_task
    return results

# Run on asyncio (default)
anyio.run(fetch_all, ["https://example.com"])

# Run on Trio
anyio.run(fetch_all, ["https://example.com"], backend="trio")

# ── anyio synchronization primitives ─────────────────────────
async def with_primitives():
    lock      = anyio.Lock()
    event     = anyio.Event()
    semaphore = anyio.Semaphore(10)
    send, receive = anyio.create_memory_object_stream(max_buffer_size=10)  # Returns a (send, receive) pair

    async with lock:
        # Exclusive access
        pass

    async with semaphore:
        # Max 10 concurrent
        pass

# ── Cancel scopes — explicit cancellation ────────────────────
async def with_timeout():
    try:
        with anyio.fail_after(5.0):   # Raises TimeoutError after 5s
            result = await long_operation()
    except TimeoutError:
        print("Timed out")

    with anyio.move_on_after(5.0) as scope:  # Exits the block quietly on timeout
        result = await long_operation()
    if scope.cancelled_caught:               # Check after the block has exited
        print("Operation timed out silently")
Performance

Profiling

python · cProfile, line_profiler, memory_profiler, py-spy
import cProfile, pstats, io

# ── cProfile — function-level profiling ──────────────────────
def profile(func):
    """Decorator that profiles a function call."""
    def wrapper(*args, **kwargs):
        pr = cProfile.Profile()
        pr.enable()
        result = func(*args, **kwargs)
        pr.disable()

        stream = io.StringIO()
        ps = pstats.Stats(pr, stream=stream)
        ps.sort_stats(pstats.SortKey.CUMULATIVE)
        ps.print_stats(20)   # Top 20 functions
        print(stream.getvalue())
        return result
    return wrapper

# CLI: python -m cProfile -s cumulative my_script.py
# Save stats for a viewer: python -m cProfile -o profile.prof my_script.py
# Visualize with snakeviz (pip install snakeviz): snakeviz profile.prof

# ── line_profiler — line-by-line profiling ────────────────────
# pip install line_profiler
# @profile  ← decorator injected by kernprof command
# def slow_function():
#     data = list(range(1_000_000))  # Line X  time ≈ 100ms
#     total = sum(data)              # Line X  time ≈ 50ms
# 
# Run: kernprof -l -v my_script.py

# ── memory_profiler — memory usage per line ───────────────────
# pip install memory_profiler
# @profile  ← injected when run via python -m memory_profiler (or: from memory_profiler import profile)
# def build_index():
#     index = {}
#     ...
# Run: python -m memory_profiler my_script.py
# Or: mprof run script.py && mprof plot

# ── py-spy — sampling profiler, no code changes ───────────────
# pip install py-spy
# py-spy top --pid $PID               # Live view (like top for Python)
# py-spy record -o profile.svg -- python script.py  # Flame graph!

# ── timeit — micro-benchmarks ─────────────────────────────────
import timeit

# One timing run of 10,000 iterations each (use timeit.repeat for best-of-N)
time_loop = timeit.timeit(
    stmt="sum(x**2 for x in range(1000))",
    number=10000
)
time_np = timeit.timeit(
    stmt="(arr**2).sum()",
    setup="import numpy as np; arr = np.arange(1000)",
    number=10000
)
print(f"Loop: {time_loop:.3f}s  NumPy: {time_np:.3f}s  Ratio: {time_loop/time_np:.1f}x")
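For a best-of-N measurement, timeit.repeat runs the timing several times, and taking the minimum filters out noise from other processes. The best_of helper and the two statements compared are illustrative choices; no specific timings are claimed here.

```python
import timeit

def best_of(stmt: str, setup: str = "pass", repeat: int = 5, number: int = 1_000) -> float:
    """Return the best per-call time in seconds over `repeat` timing runs."""
    runs = timeit.repeat(stmt, setup=setup, repeat=repeat, number=number)
    return min(runs) / number   # Minimum is the least disturbed run

per_call_genexp = best_of("sum(x * x for x in data)", setup="data = range(100)")
per_call_listcomp = best_of("sum([x * x for x in data])", setup="data = range(100)")

print(f"genexp:   {per_call_genexp * 1e6:.2f} µs per call")
print(f"listcomp: {per_call_listcomp * 1e6:.2f} µs per call")
```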
Performance

Optimization Techniques

python · Practical optimization patterns
import sys, dis, math
from functools import lru_cache

# ── 1. Cache global lookups as locals in hot loops ────────────
# GLOBAL lookup: module globals dict, then builtins on a miss
# ATTRIBUTE lookup: math.sqrt adds a further lookup on the module
# LOCAL lookup:  1 array index (LOAD_FAST)
def slow(data):
    result = []
    for x in data:
        result.append(math.sqrt(x))  # math and .sqrt resolved every iteration
    return result

def fast(data):
    sqrt = math.sqrt               # Cache as local: one lookup at function entry
    result = []
    result_append = result.append  # Cache the bound method of THIS list
    for x in data:
        result_append(sqrt(x))     # LOAD_FAST everywhere; measure the gain,
                                   # 3.11+ specialization has narrowed it
    return result

# ── 2. Use join() for string building ────────────────────────
def build_str_slow(parts: list[str]) -> str:
    result = ""
    for p in parts:
        result += p   # O(n²) — new allocation each time
    return result

def build_str_fast(parts: list[str]) -> str:
    return "".join(parts)   # O(n) — one allocation

# ── 3. __slots__ for millions of small objects ────────────────
# See Python 201 — 60% memory reduction per instance

# ── 4. Avoid repeated attribute lookup in hot loops ───────────
class Particle:
    __slots__ = ("x", "y", "vx", "vy")

# sim.dt is a hypothetical attribute holding the time step
def update_slow(particles, sim):
    for p in particles:
        p.x += p.vx * sim.dt   # sim.dt looked up twice per particle
        p.y += p.vy * sim.dt

def update_fast(particles, sim):
    dt = sim.dt                # Loop-invariant attribute hoisted to a local
    for p in particles:
        p.x += p.vx * dt
        p.y += p.vy * dt

# ── 5. Use sets for membership, not lists ─────────────────────
# O(1) average vs O(n)
bad_words_list = ["spam", "junk", ...]  # O(n) lookup
bad_words_set  = {"spam", "junk", ...}  # O(1) lookup

# ── 6. Generator expressions over list comprehensions ─────────
# When you only iterate once and don't need random access
total = sum(x**2 for x in data)       # Generator: O(1) memory
# vs sum([x**2 for x in data])         # List: O(n) memory

# ── 7. Use deque for queues ──────────────────────────────────
from collections import deque
queue = deque()       # O(1) append/pop from both ends
# list.pop(0) is O(n) — never use a list as a queue!
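The size of each gain above depends on the workload and the interpreter version, so it is worth measuring before committing to a pattern. A minimal sketch using `timeit`, applied to the queue case from item 7; the helper names `drain_list` and `drain_deque` are illustrative and not part of the handbook:

```python
import timeit
from collections import deque

def drain_list(n: int) -> int:
    """Use a list as a FIFO queue: pop(0) shifts every remaining item."""
    queue = list(range(n))
    total = 0
    while queue:
        total += queue.pop(0)      # O(n) per pop
    return total

def drain_deque(n: int) -> int:
    """Use a deque as a FIFO queue: popleft() is O(1)."""
    queue = deque(range(n))
    total = 0
    while queue:
        total += queue.popleft()
    return total

if __name__ == "__main__":
    n = 20_000
    # Both versions must agree before their timings mean anything
    assert drain_list(n) == drain_deque(n)
    t_list = timeit.timeit(lambda: drain_list(n), number=5)
    t_deque = timeit.timeit(lambda: drain_deque(n), number=5)
    print(f"list.pop(0): {t_list:.3f}s  deque.popleft(): {t_deque:.3f}s")
```

Checking that both implementations return the same value first guards against "optimizing" a function into a different one.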
Performance

NumPy Vectorization

pythonBroadcasting, fancy indexing, views vs copies, ufuncs
import numpy as np

# ── Vectorization — eliminate Python loops ────────────────────
data = np.random.rand(1_000_000)

# Pure Python: ~500ms
def normalize_slow(data):
    mn, mx = min(data), max(data)
    return [(x - mn) / (mx - mn) for x in data]

# NumPy vectorized: ~2ms, roughly 250x faster (illustrative; depends on hardware)
def normalize_fast(data: np.ndarray) -> np.ndarray:
    mn = data.min()                       # Compute the minimum once, not twice
    return (data - mn) / (data.max() - mn)

# ── Broadcasting — operations between different shapes ────────
matrix = np.zeros((3, 4))         # (3, 4)
row    = np.array([1, 2, 3, 4])    # (4,)
col    = np.array([[1], [2], [3]]) # (3, 1)

matrix + row   # (3,4) — row broadcast to each row
matrix + col   # (3,4) — col broadcast to each column
row + col      # (3,4) — outer sum!

# ── Views vs Copies — CRITICAL for performance ────────────────
a = np.arange(10)
b = a[2:8]        # VIEW — shares memory, no copy!
b[0] = 99          # Modifies a too!

c = a[2:8].copy()  # COPY — independent array
a.flags.owndata    # True — owns its data
b.base is a        # True — b is a view of a

# ── Fancy indexing ────────────────────────────────────────────
data = np.array([10, 20, 30, 40, 50])
idx  = np.array([0, 2, 4])
data[idx]                    # [10, 30, 50] — index by array

mask = data > 25             # Boolean mask: [F,F,T,T,T]
data[mask]                   # [30, 40, 50] — boolean indexing
data[data > 25] = 0          # In-place conditional assignment

# ── Vectorized string operations ──────────────────────────────
names = np.array(["Alice", "Bob", "Charlie"])
np.char.upper(names)          # ["ALICE", "BOB", "CHARLIE"]
np.char.startswith(names, "A") # [True, False, False]

# ── np.where — vectorized conditional ────────────────────────
grades = np.array([45, 72, 89, 56, 91])
results = np.where(grades >= 60, "Pass", "Fail")
# ["Fail", "Pass", "Pass", "Fail", "Pass"]
Performance

Numba JIT Compilation

pythonnumba @jit, @njit, parallel, CUDA — JIT compile to machine code
# pip install numba
from numba import jit, njit, prange, cuda
import numpy as np

# ── @njit — compile to machine code, no Python fallback ───────
@njit
def fast_sum(arr: np.ndarray) -> float:
    """First call pays a one-time compile cost; later calls run the loop as native code."""
    total = 0.0
    for i in range(len(arr)):
        total += arr[i]
    return total

arr = np.random.rand(10_000_000)
fast_sum(arr)   # First call compiles JIT
fast_sum(arr)   # Subsequent calls: C speed!

# ── On-disk caching (still JIT, not ahead-of-time) ────────────
@njit(cache=True)   # Cache compiled code to disk so later runs skip the recompile
def cached_jit_func(x): ...

# ── Parallel loops ────────────────────────────────────────────
@njit(parallel=True)
def parallel_sum(arr: np.ndarray) -> float:
    total = 0.0
    for i in prange(len(arr)):   # prange = parallel range
        total += arr[i]
    return total

# ── GPU kernels with numba.cuda ───────────────────────────────
@cuda.jit
def gpu_add(a, b, result):
    i = cuda.grid(1)   # Get thread index
    if i < result.size:
        result[i] = a[i] + b[i]

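# Launch on GPU (requires a CUDA-capable device)
a_gpu = cuda.to_device(arr)                  # Copy host arrays to the device
b_gpu = cuda.to_device(arr)
result_gpu = cuda.device_array_like(arr)     # Uninitialized output buffer on the device
threads_per_block = 256
blocks = (len(arr) + threads_per_block - 1) // threads_per_block
gpu_add[blocks, threads_per_block](a_gpu, b_gpu, result_gpu)
result = result_gpu.copy_to_host()           # Bring the result back to host memory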

# ── When to use Numba vs NumPy ───────────────────────────────
# NumPy: vectorized ops, built-in functions, array manipulation
# Numba: custom loops that can't be vectorized, numerical algorithms,
#         physics simulations, anything with complex loop-level logic
Performance

Cython & Mypyc

pythonCython .pyx + mypyc compilation
# ── Cython — annotated Python compiled to C ──────────────────
# File: fast_math.pyx

# Pure Python (slow):
def sum_squares_py(n):
    total = 0
    for i in range(n):
        total += i * i
    return total

# Cython (fast_math.pyx — ~50-100x faster):
# def sum_squares_cy(int n):                # C int, not Python int
#     cdef long long total = 0              # C variable, no boxing
#     cdef int i                            # C loop variable
#     for i in range(n):
#         total += i * i
#     return total

# Build (setup.py):
# from setuptools import setup
# from Cython.Build import cythonize
# setup(ext_modules=cythonize("fast_math.pyx", language_level=3))
# python setup.py build_ext --inplace

# ── mypyc — compile typed Python to C extension ──────────────
# pip install mypy
# mypyc fast_math.py     ← compiles to .so / .pyd

# Your mypy-typed Python becomes a compiled extension:
def sum_squares(n: int) -> int:   # Types used by mypyc for optimization
    total = 0
    for i in range(n):
        total += i * i
    return total
# Compiled: 2-4x speedup on typical code, up to 10x on tight loops
# Often needs few or no source changes, provided the module type-checks under mypy
# Used by: mypy itself and Black (Black's maintainers report roughly a 2x speedup)
C Interop

ctypes — Call C Libraries

pythonctypes — call C without writing C
import ctypes
from ctypes import c_int, c_double, c_char_p, POINTER, Structure

# ── Load a shared library ─────────────────────────────────────
libc = ctypes.CDLL("libc.so.6")         # Linux
# libc = ctypes.CDLL("libc.dylib")      # macOS
# libc = ctypes.CDLL("msvcrt.dll")      # Windows

# ── Call C functions ──────────────────────────────────────────
libc.printf.argtypes = [c_char_p]
libc.printf.restype  = c_int
libc.printf(b"Hello from C!\n")

# ── Type mapping: ctypes → C ──────────────────────────────────
# c_int         → int
# c_double      → double
# c_char_p      → char* (bytes)
# c_void_p      → void*
# POINTER(T)    → T*
# ctypes.c_bool → _Bool

# ── Call custom .so library ───────────────────────────────────
libmath = ctypes.CDLL("./libmath.so")

libmath.fast_sqrt.argtypes = [c_double]
libmath.fast_sqrt.restype  = c_double

result = libmath.fast_sqrt(16.0)   # argtypes converts the Python float to a C double

# ── C structures ─────────────────────────────────────────────
class Point(Structure):
    _fields_ = [("x", c_double), ("y", c_double)]

libmath.distance.argtypes = [POINTER(Point), POINTER(Point)]
libmath.distance.restype  = c_double

p1 = Point(0.0, 0.0)
p2 = Point(3.0, 4.0)
dist = libmath.distance(ctypes.byref(p1), ctypes.byref(p2))  # 5.0

# ── Pass numpy arrays to C ────────────────────────────────────
import numpy as np
arr = np.zeros(1000, dtype=np.float64)
libmath.process_array(
    arr.ctypes.data_as(POINTER(c_double)),
    ctypes.c_int(len(arr))
)
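The pointer and structure mechanics above can be tried without compiling any C. A minimal sketch that uses only `ctypes` itself, with no shared library loaded; the variable names are illustrative:

```python
import ctypes
from ctypes import POINTER, Structure, c_double

class Point(Structure):
    _fields_ = [("x", c_double), ("y", c_double)]

# A C-style array of three doubles, laid out contiguously in memory
values = (c_double * 3)(1.5, 2.5, 3.5)

# Cast the array to double*, which is how a C function taking
# (double*, int) would receive it
ptr = ctypes.cast(values, POINTER(c_double))
ptr[1] = 20.0                  # Writes through the pointer into `values`

p = Point(3.0, 4.0)
p_ref = ctypes.pointer(p)      # Full pointer object; byref() is the lighter, call-only form
p_ref.contents.x = 30.0        # Mutates `p` itself, not a copy

print(list(values), p.x, ctypes.sizeof(Point))
```

Because `ptr` and `p_ref` alias the original objects, anything a C function writes through them is visible from Python afterwards, which is why output parameters work.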
Exception Mastery

ExceptionGroup (Python 3.11+)

ExceptionGroup lets a single raise carry several unrelated exceptions at once, and the companion except* syntax handles each matching subset separately. This is essential for concurrent code (e.g., several async tasks failing together) and for comprehensive validation (collect all errors, not just the first).

pythonExceptionGroup, except*, BaseExceptionGroup
# ── Creating ExceptionGroups ──────────────────────────────────
errors = ExceptionGroup("multiple validation errors", [
    ValueError("name is required"),
    TypeError("age must be an integer"),
    ValueError("email format invalid"),
])
raise errors

# ── except* — handle multiple exceptions in groups ────────────
try:
    raise ExceptionGroup("task errors", [
        ValueError("bad value"),
        TypeError("bad type"),
        RuntimeError("runtime error"),
    ])
except* ValueError as eg:
    # eg.exceptions = (ValueError("bad value"),)
    print(f"ValueError(s): {eg.exceptions}")
except* TypeError as eg:
    print(f"TypeError(s): {eg.exceptions}")
# RuntimeError is NOT caught — re-raised as ExceptionGroup(RuntimeError)

# ── Real world: async task failure collection ─────────────────
import asyncio

async def run_tasks(tasks):
    async with asyncio.TaskGroup() as tg:
        running = [tg.create_task(task()) for task in tasks]
    # TaskGroup automatically raises ExceptionGroup if any task fails!
    return [t.result() for t in running]

async def main():   # await is only valid inside an async function
    try:
        await run_tasks(my_tasks)
    except* ConnectionError as eg:
        for exc in eg.exceptions:
            print(f"Connection failed: {exc}")
    except* TimeoutError as eg:
        print(f"{len(eg.exceptions)} tasks timed out")

# ── Validation example — collect ALL errors ───────────────────
def validate_form(data: dict) -> None:
    exceptions = []

    if not data.get("name"):
        exceptions.append(ValueError("name is required"))
    if not isinstance(data.get("age"), int):
        exceptions.append(TypeError("age must be int"))
    if "@" not in data.get("email", ""):
        exceptions.append(ValueError("invalid email"))

    if exceptions:
        raise ExceptionGroup("Validation failed", exceptions)
Exception Mastery

traceback Module

pythonException chaining, traceback, __tracebackhide__
import traceback, sys

# ── Exception chaining ────────────────────────────────────────
try:
    int("not a number")
except ValueError as e:
    raise RuntimeError("Failed to parse config") from e
# Shows: ValueError, then "The above exception was the direct cause of..."

try:
    int("not a number")
except ValueError:
    raise RuntimeError("Failed to parse config") from None
# 'from None' suppresses original exception context

# ── Working with tracebacks ────────────────────────────────────
try:
    1 / 0
except Exception:
    tb = sys.exc_info()[2]                    # Traceback object
    traceback.print_exc()                      # Print to stderr
    text = traceback.format_exc()              # Get as string
    frames = traceback.extract_tb(tb)          # StackSummary
    for frame in frames:
        print(f"{frame.filename}:{frame.lineno} in {frame.name}")
        print(f"  {frame.line}")

# ── __tracebackhide__ — hide internal frames from pytest ───────
def assert_positive(value: int):
    __tracebackhide__ = True   # pytest hides this frame in output
    if value <= 0:
        raise AssertionError(f"Expected positive, got {value}")

# ── Rich tracebacks (pip install rich) ────────────────────────
from rich.traceback import install
install(show_locals=True)  # Beautiful syntax-highlighted tracebacks
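Chained exceptions can also be inspected programmatically, which is handy when a log line should name the root cause. A small sketch using only the standard library; `load_config` and `cause_chain` are hypothetical helper names:

```python
import traceback

def load_config(raw: str) -> int:
    try:
        return int(raw)
    except ValueError as exc:
        raise RuntimeError("Failed to parse config") from exc

def cause_chain(exc: BaseException) -> list[str]:
    """Follow __cause__ links from the outermost exception to the root cause."""
    names = []
    current: BaseException | None = exc
    while current is not None:
        names.append(type(current).__name__)
        current = current.__cause__
    return names

try:
    load_config("not a number")
except RuntimeError as err:
    chain = cause_chain(err)
    # TracebackException takes a printable snapshot that can be formatted later
    snapshot = traceback.TracebackException.from_exception(err)
    rendered = "".join(snapshot.format())
```

`rendered` holds the same text `traceback.print_exc()` would have written, including the "direct cause" separator between the two exceptions.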
Observability

Logging Architecture

pythonLogging hierarchy, handlers, filters, formatters
import logging
from logging.handlers import RotatingFileHandler, TimedRotatingFileHandler

# ── Logger hierarchy ──────────────────────────────────────────
# root logger → myapp → myapp.services → myapp.services.db
# Child loggers propagate to parents by default

logger = logging.getLogger("myapp.services.db")

# ── Production logging config ─────────────────────────────────
def configure_logging(level: str = "INFO", json_output: bool = False):
    root = logging.getLogger()
    root.setLevel(level)

    # Console handler
    console = logging.StreamHandler()
    console.setLevel(level)

    if json_output:
        # JSON structured logs for production
        import json
        class JSONFormatter(logging.Formatter):
            def format(self, record: logging.LogRecord) -> str:
                log = {
                    "ts":  self.formatTime(record),
                    "lvl": record.levelname,
                    "msg": record.getMessage(),
                    "mod": record.module,
                    "exc": self.formatException(record.exc_info) if record.exc_info else None,
                }
                # Include extra fields from LogRecord
                for key in ("request_id", "user_id", "duration_ms"):
                    if hasattr(record, key):
                        log[key] = getattr(record, key)
                return json.dumps({k: v for k, v in log.items() if v is not None})
        console.setFormatter(JSONFormatter())
    else:
        fmt = logging.Formatter("%(asctime)s %(name)-20s %(levelname)-8s %(message)s")
        console.setFormatter(fmt)

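    # File handler with rotation (10 MB per file, 5 backups)
    file_handler = RotatingFileHandler(
        "app.log", maxBytes=10*1024*1024, backupCount=5
    )
    file_handler.setFormatter(console.formatter)   # Otherwise the file gets bare messages

    root.addHandler(console)
    root.addHandler(file_handler)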

# ── Contextual logging with extra fields ──────────────────────
logger.info("User login", extra={"user_id": 42, "duration_ms": 120})

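# ── Log adapter for per-request context ───────────────────────
from contextvars import ContextVar

request_id: ContextVar[str | None] = ContextVar("request_id", default=None)
current_user_id: ContextVar[int | None] = ContextVar("current_user_id", default=None)

class RequestAdapter(logging.LoggerAdapter):
    def process(self, msg, kwargs):
        # Keys must match the ones JSONFormatter copies: request_id, user_id
        ctx = {"request_id": request_id.get(), "user_id": current_user_id.get()}
        kwargs.setdefault("extra", {}).update(ctx)
        return msg, kwargs

log = RequestAdapter(logger, {})
log.info("Processing order")  # Includes request_id and user_id automatically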
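To see how `extra=` fields reach a formatter, the pieces above can be wired to an in-memory stream. A minimal, self-contained sketch; the logger name `demo.json` and the trimmed-down formatter are assumptions for illustration only:

```python
import io
import json
import logging

class JSONFormatter(logging.Formatter):
    """Render each record as one JSON object per line."""
    EXTRA_KEYS = ("request_id", "user_id", "duration_ms")

    def format(self, record: logging.LogRecord) -> str:
        payload = {"lvl": record.levelname, "msg": record.getMessage()}
        for key in self.EXTRA_KEYS:
            if hasattr(record, key):     # Keys passed via extra= become record attributes
                payload[key] = getattr(record, key)
        return json.dumps(payload)

buffer = io.StringIO()                   # Stands in for stdout or a log file
handler = logging.StreamHandler(buffer)
handler.setFormatter(JSONFormatter())

demo_logger = logging.getLogger("demo.json")
demo_logger.setLevel(logging.INFO)
demo_logger.addHandler(handler)
demo_logger.propagate = False            # Keep the demo output out of the root logger

demo_logger.info("User login", extra={"user_id": 42, "duration_ms": 120})
demo_logger.debug("Filtered out")        # Below INFO, so it never reaches the handler

lines = buffer.getvalue().splitlines()
parsed = json.loads(lines[0])
```

Writing to a `StringIO` is also a convenient way to assert on log output in tests without touching the filesystem.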
Observability

structlog — Structured Logging

pythonstructlog — production structured logging
# pip install structlog
import logging
import structlog

# ── Configure structlog ──────────────────────────────────────
structlog.configure(
    processors=[
        structlog.contextvars.merge_contextvars,    # Add context vars
        structlog.processors.add_log_level,          # Add level field
        structlog.processors.TimeStamper(fmt="iso"), # ISO timestamp
        structlog.dev.ConsoleRenderer()              # Pretty in dev
        # structlog.processors.JSONRenderer()          # JSON in prod
    ],
    wrapper_class=structlog.make_filtering_bound_logger(logging.DEBUG),
    context_class=dict,
    logger_factory=structlog.PrintLoggerFactory(),
)

log = structlog.get_logger()

# ── Bind context to logger ────────────────────────────────────
request_log = log.bind(request_id="abc-123", user_id=42)
request_log.info("Request started", method="POST", path="/api/orders")
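# With JSONRenderer enabled, output resembles:
#   {"event": "Request started", "request_id": "abc-123",
#    "user_id": 42, "method": "POST", "path": "/api/orders",
#    "level": "info", "timestamp": "2025-01-01T..."}
# ConsoleRenderer prints the same fields as a colorized key=value line instead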

# ── contextvars integration (async-safe) ──────────────────────
structlog.contextvars.bind_contextvars(request_id="req-456")
# All subsequent log calls in this async context include request_id!
log.info("Processing")  # Automatically includes request_id
structlog.contextvars.unbind_contextvars("request_id")
Observability

OpenTelemetry — Distributed Tracing

pythonOpenTelemetry spans, metrics, traces
# pip install opentelemetry-api opentelemetry-sdk opentelemetry-exporter-otlp
from opentelemetry import trace, metrics
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter

# ── Setup tracing ─────────────────────────────────────────────
from opentelemetry.sdk.resources import Resource

# service.name describes the whole process, so it belongs on the Resource, not on each span
resource = Resource.create({"service.name": "order-service"})
provider = TracerProvider(resource=resource)
processor = BatchSpanProcessor(OTLPSpanExporter(endpoint="http://jaeger:4317"))
provider.add_span_processor(processor)
trace.set_tracer_provider(provider)

tracer = trace.get_tracer("myapp", "1.0.0")

# ── Create spans ─────────────────────────────────────────────
async def process_order(order_id: str):
    with tracer.start_as_current_span("process_order") as span:
        span.set_attribute("order.id", order_id)

        with tracer.start_as_current_span("validate_payment") as child:
            result = await validate_payment(order_id)
            child.set_attribute("payment.valid", result)

        with tracer.start_as_current_span("save_to_db"):
            await save_order(order_id)

# ── Automatic instrumentation ─────────────────────────────────
# pip install opentelemetry-instrumentation-fastapi
# pip install opentelemetry-instrumentation-sqlalchemy
from opentelemetry.instrumentation.fastapi import FastAPIInstrumentor
from opentelemetry.instrumentation.sqlalchemy import SQLAlchemyInstrumentor

FastAPIInstrumentor.instrument_app(app)   # Auto-traces all routes
SQLAlchemyInstrumentor().instrument()     # Auto-traces DB queries
Pattern Matching

Advanced match/case

pythonClass patterns, mapping patterns, sequence patterns, guards
from dataclasses import dataclass

@dataclass
class Point:
    x: float
    y: float

@dataclass
class Circle:
    center: Point
    radius: float

@dataclass
class Rectangle:
    top_left: Point
    width: float
    height: float

# ── Class patterns — match against dataclass fields ───────────
def describe_shape(shape) -> str:
    match shape:
        case Circle(center=Point(x=0, y=0), radius=r):
            return f"Circle of radius {r} centered at origin"
        case Circle(center=Point(x=cx, y=cy), radius=r):
            return f"Circle of radius {r} at ({cx},{cy})"
        case Rectangle(width=w, height=h) if w == h:
            return f"Square of side {w}"         # guard clause
        case Rectangle(width=w, height=h):
            return f"Rectangle {w}×{h}"
        case _:
            return "Unknown shape"

# ── Sequence patterns ─────────────────────────────────────────
def process_command(tokens: list[str]) -> str:
    match tokens:
        case []:
            return "empty"
        case ["quit"] | ["exit"]:
            return "exit"
        case ["get", resource]:
            return f"GET {resource}"
        case ["set", key, value]:
            return f"SET {key}={value}"
        case ["set", *_]:
            return "set requires key and value"
        case [cmd, *args]:
            return f"Unknown command: {cmd} with {args}"

# ── Mapping patterns ──────────────────────────────────────────
def handle_event(event: dict) -> None:
    match event:
        case {"type": "click", "x": x, "y": y}:
            print(f"Click at ({x}, {y})")
        case {"type": "keydown", "key": ("Ctrl" | "Meta") as mod, "code": code}:
            print(f"{mod}+{code} pressed")
        case {"type": event_type, **rest}:   # ** captures remaining keys
            print(f"Other event: {event_type}, data={rest}")

# ── Or patterns + guards ─────────────────────────────────────
match status_code:
    case 200 | 201 | 204:
        print("Success")
    case code if 400 <= code < 500:
        print(f"Client error: {code}")
    case code if 500 <= code < 600:
        print(f"Server error: {code}")
Testing Expert

Hypothesis — Property-Based Testing

Instead of hand-picking test cases, you state a property and Hypothesis generates many inputs (100 per test by default) to find edge cases you didn't think of. It then shrinks any failing case to a minimal reproducing example.

pythonhypothesis — generate test cases automatically
# pip install hypothesis
import math
from hypothesis import given, settings, assume, example
from hypothesis import strategies as st

# ── Basic property test ───────────────────────────────────────
@given(st.lists(st.integers()))
def test_sort_preserves_length(lst):
    """Sort should never change the list length."""
    assert len(sorted(lst)) == len(lst)

# ── Multiple strategies ───────────────────────────────────────
@given(
    st.text(min_size=1),
    st.integers(min_value=1, max_value=100)
)
def test_string_repeat(s, n):
    result = s * n
    assert len(result) == len(s) * n

# ── assume() — filter invalid inputs ─────────────────────────
# (For floats, st.floats(allow_nan=False, allow_infinity=False) avoids discarding examples)
@given(st.floats(), st.floats())
def test_add_commutative(a, b):
    assume(not (math.isnan(a) or math.isnan(b) or math.isinf(a) or math.isinf(b)))
    assert a + b == b + a

# ── Custom strategies ─────────────────────────────────────────
valid_email = st.from_regex(r"[a-z]+@[a-z]+\.[a-z]{2,}", fullmatch=True)
positive_int = st.integers(min_value=1)

# Composite strategy
@st.composite
def user_data(draw):
    name = draw(st.text(alphabet=st.characters(whitelist_categories=("Lu", "Ll")), min_size=2))
    age  = draw(st.integers(min_value=18, max_value=120))
    email = draw(valid_email)
    return {"name": name, "age": age, "email": email}

@given(user_data())
def test_user_creation(user):
    created = create_user(**user)
    assert created["name"] == user["name"]

# ── Force-run specific examples (regression tests) ───────────
@given(st.integers())
@example(0)        # Always test this case
@example(-1)       # Always test this edge case
def test_absolute(n):
    assert abs(n) >= 0

# ── Settings — tune how many examples ────────────────────────
@settings(max_examples=1000, deadline=5000)
@given(st.text())
def test_intensive(s): ...
Testing Expert

pytest Advanced

pythonFixture scopes, parametrize, markers, conftest, plugins
import pytest
from typing import Generator

# ── Fixture scopes ────────────────────────────────────────────
@pytest.fixture(scope="session")   # Created once for entire test session
def db_engine():
    engine = create_engine("sqlite:///:memory:")
    Base.metadata.create_all(engine)
    yield engine
    engine.dispose()

@pytest.fixture(scope="function")  # Default — new instance per test
def db_session(db_engine) -> Generator:
    conn = db_engine.connect()
    trans = conn.begin()
    session = Session(bind=conn)
    yield session
    session.close()
    trans.rollback()                 # Rollback after each test — DB clean
    conn.close()

# ── autouse fixtures ──────────────────────────────────────────
@pytest.fixture(autouse=True)      # Applied to ALL tests automatically
def clear_cache():
    cache.clear()
    yield

# ── parametrize with IDs ──────────────────────────────────────
@pytest.mark.parametrize(
    "input, expected",
    [
        pytest.param("",      None,      id="empty-string"),
        pytest.param("abc",   "abc",     id="simple-string"),
        pytest.param("  x  ", "x",       id="with-whitespace"),
        pytest.param(None,    None,      id="none-input",
                     marks=pytest.mark.xfail(reason="not yet implemented")),
    ]
)
def test_clean_string(input, expected):
    assert clean(input) == expected

# ── Custom markers ────────────────────────────────────────────
@pytest.mark.slow
@pytest.mark.integration
def test_full_pipeline(): ...

# pyproject.toml (pytest.ini takes the same markers under a [pytest] section):
# [tool.pytest.ini_options]
# markers = ["slow: slow tests", "integration: requires external services"]
# Run: pytest -m "not slow"  or  pytest -m "integration"

# ── conftest.py — shared fixtures without import ──────────────
# Place conftest.py in test root — fixtures auto-available to all tests
# Can stack conftest.py files in subdirectories

# ── pytest plugins ───────────────────────────────────────────
# pytest-xdist    — parallel test execution: pytest -n auto
# pytest-cov      — coverage: pytest --cov=myapp --cov-report=html
# pytest-asyncio  — async test support
# pytest-mock     — mocker fixture  
# pytest-benchmark — benchmarks in tests
# pytest-snapshot — snapshot testing
Testing Expert

Mock Deep Dive

pythonMock, MagicMock, patch, spec, AsyncMock, side_effect
from unittest.mock import Mock, MagicMock, patch, AsyncMock, call, sentinel

# ── spec — mock with type safety ──────────────────────────────
from myapp.services import UserService

mock_svc = Mock(spec=UserService)
# mock_svc.non_existent_method()  # AttributeError — spec prevents this!
mock_svc.get_user.return_value = {"id": 1, "name": "Alice"}
mock_svc.get_user(1)             # Returns {"id": 1, ...}

# ── side_effect — dynamic return values ──────────────────────
mock = Mock()
mock.side_effect = [1, 2, 3]       # Returns 1, then 2, then 3
mock.side_effect = ValueError("DB down")  # Always raises
mock.side_effect = lambda x: x * 2        # Call a function

# ── patch — replace real objects during test ─────────────────
with patch("myapp.services.send_email") as mock_email:
    process_order(123)
    mock_email.assert_called_once_with(
        to="user@example.com",
        subject="Order confirmed"
    )

# As decorator
@patch("myapp.db.Session")
@patch("myapp.services.EmailService.send")
def test_checkout(mock_send, mock_session):
    mock_session.return_value.__enter__.return_value = mock_session
    checkout(cart={})
    mock_send.assert_called_once()

# ── AsyncMock — mock async functions ──────────────────────────
async_mock = AsyncMock(return_value="async result")
result = await async_mock()   # "async result"

with patch("myapp.client.fetch", new_callable=AsyncMock) as mock_fetch:
    mock_fetch.return_value = {"data": "test"}
    result = await my_async_function()

# ── Assert call history ───────────────────────────────────────
mock = Mock()
mock(1, 2)
mock(key="value")

mock.call_count            # 2
mock.call_args_list        # [call(1, 2), call(key="value")]
mock.assert_any_call(1, 2)
mock.assert_called_with(key="value")  # Last call
mock.assert_has_calls([call(1, 2), call(key="value")])
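Because `await` is only valid inside a coroutine, async mocks are usually exercised through `asyncio.run` or an async test runner. A self-contained sketch; `PaymentClient` and `checkout` are hypothetical stand-ins for real application code:

```python
import asyncio
from unittest.mock import MagicMock, call

class PaymentClient:
    """Hypothetical dependency, defined here only so the sketch runs on its own."""
    async def charge(self, order_id: int, amount: float) -> str:
        raise NotImplementedError("would call a real payment API")

async def checkout(client: PaymentClient, order_id: int, amount: float) -> str:
    receipt = await client.charge(order_id, amount)
    return f"order {order_id}: {receipt}"

def run_checkout_test() -> str:
    # With spec= set to a class, its async methods are mocked as AsyncMock
    client = MagicMock(spec=PaymentClient)
    client.charge.return_value = "paid"

    result = asyncio.run(checkout(client, 7, 9.99))

    client.charge.assert_awaited_once_with(7, 9.99)
    assert client.charge.await_args_list == [call(7, 9.99)]
    return result
```

The `assert_awaited_*` family checks that the coroutine was actually awaited, which `assert_called_*` alone would not catch.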
Security

Security Practices

pythonsecrets, subprocess safety, SQL injection prevention
import secrets, hashlib, hmac, os, subprocess

# ── secrets — cryptographically secure random ────────────────
token = secrets.token_hex(32)          # 64-char hex token (256-bit)
url_token = secrets.token_urlsafe(32)  # URL-safe base64 token
api_key = secrets.token_bytes(32)      # Raw bytes

# Constant-time comparison (prevents timing attacks)
secrets.compare_digest(token1, token2)  # Use this, NOT ==

# ── subprocess — avoid shell injection ───────────────────────
# NEVER: subprocess.run(f"ls {user_input}", shell=True)  ❌
# Safe: pass as list, shell=False
filename = "file.txt"
result = subprocess.run(
    ["ls", "-la", filename],   # List form — no shell interpretation
    capture_output=True,
    text=True,
    timeout=10,
    check=True,               # Raises CalledProcessError on non-zero exit
)

# ── SQL injection prevention ──────────────────────────────────
# NEVER: cursor.execute(f"SELECT * FROM users WHERE name='{name}'")  ❌
# Safe: use parameterized queries
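cursor.execute("SELECT * FROM users WHERE name = ?", (name,))  # ✅ sqlite3 style; some drivers use %s
# SQLAlchemy binds parameters for ORM queries and text(":name") placeholders,
# but an f-string inside text() is just as injectable as raw SQL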

# ── File path traversal prevention ──────────────────────────
from pathlib import Path

def safe_read(base_dir: str, user_filename: str) -> bytes:
    base = Path(base_dir).resolve()
    target = (base / user_filename).resolve()
    # Ensure target is within base_dir — prevents ../../etc/passwd
    target.relative_to(base)   # Raises ValueError if outside base
    return target.read_bytes()

# ── Environment variable secrets ─────────────────────────────
# NEVER commit secrets to source. Use:
# os.environ.get("SECRET_KEY")        — fail silently (bad)
# os.environ["SECRET_KEY"]            — fail explicitly (ok)
# Or python-dotenv for development

from dotenv import load_dotenv
load_dotenv()  # Loads .env file — NEVER commit .env!
secret = os.environ["SECRET_KEY"]
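The path traversal check is easy to verify against a throwaway directory. A minimal sketch, assuming Python 3.9+ for `Path.is_relative_to`; the `demo` function and the file names are invented for illustration, and this variant raises `PermissionError` rather than the `ValueError` that `relative_to` produces:

```python
import tempfile
from pathlib import Path

def safe_read(base_dir: str, user_filename: str) -> bytes:
    base = Path(base_dir).resolve()
    target = (base / user_filename).resolve()   # Collapses ".." and follows symlinks
    if not target.is_relative_to(base):
        raise PermissionError(f"{user_filename!r} escapes the base directory")
    return target.read_bytes()

def demo() -> tuple[bytes, bool]:
    with tempfile.TemporaryDirectory() as root:
        uploads = Path(root) / "uploads"
        uploads.mkdir()
        (uploads / "ok.txt").write_bytes(b"public")
        (Path(root) / "secret.txt").write_bytes(b"private")

        allowed = safe_read(str(uploads), "ok.txt")
        try:
            safe_read(str(uploads), "../secret.txt")
            blocked = False
        except PermissionError:
            blocked = True
        return allowed, blocked
```

Resolving both paths before comparing is the important step: a plain string prefix check would be fooled by `..` segments and symlinks.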
Security

Cryptography

pythonhashlib, hmac, cryptography library, Fernet symmetric encryption
import hashlib, hmac, secrets
# pip install cryptography
from cryptography.fernet import Fernet
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC
from cryptography.hazmat.primitives.asymmetric import rsa, padding
import base64

# ── Password hashing (use bcrypt or Argon2, not SHA) ──────────
# pip install bcrypt
import bcrypt
password = b"correct horse battery staple"
hashed = bcrypt.hashpw(password, bcrypt.gensalt(rounds=12))
bcrypt.checkpw(password, hashed)   # True — constant time

# ── HMAC — message authentication ────────────────────────────
secret_key = secrets.token_bytes(32)
message = b"important data"

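mac = hmac.new(secret_key, message, hashlib.sha256).hexdigest()   # Sender attaches this tag
# Verify: recompute the tag over what arrived and compare in constant time
# (received_message and received_mac stand for the values read off the wire)
expected = hmac.new(secret_key, received_message, hashlib.sha256).hexdigest()
secrets.compare_digest(received_mac, expected)   # Timing-safe comparison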

# ── Fernet — symmetric encryption (AES-128-CBC + HMAC-SHA256) ──
key = Fernet.generate_key()    # 32 bytes, URL-safe base64
fernet = Fernet(key)

ciphertext = fernet.encrypt(b"secret message")
plaintext  = fernet.decrypt(ciphertext)    # b"secret message"

# Token with expiry
token = fernet.encrypt(b"user_id:42")
fernet.decrypt(token, ttl=3600)           # Raises InvalidToken if older than 1 hour

# ── PBKDF2 — derive encryption key from password ─────────────
kdf = PBKDF2HMAC(
    algorithm=hashes.SHA256(),
    length=32,
    salt=secrets.token_bytes(16),   # Store this with ciphertext!
    iterations=600_000,             # OWASP 2023 recommendation for PBKDF2-HMAC-SHA256
)
key = base64.urlsafe_b64encode(kdf.derive(b"user_password"))

# ── RSA — asymmetric encryption ───────────────────────────────
private_key = rsa.generate_private_key(public_exponent=65537, key_size=2048)
public_key  = private_key.public_key()

oaep = padding.OAEP(mgf=padding.MGF1(hashes.SHA256()), algorithm=hashes.SHA256(), label=None)
ciphertext = public_key.encrypt(b"secret", oaep)
plaintext = private_key.decrypt(ciphertext, oaep)   # Same padding object on both sides
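Where third-party packages cannot be installed, the standard library alone covers HMAC verification and PBKDF2 key stretching. A sketch under that assumption; the function names are illustrative, and bcrypt or Argon2 remain the preferred choice for password storage when available:

```python
import hashlib
import hmac
import secrets

def sign(key: bytes, message: bytes) -> str:
    """Return a hex HMAC-SHA256 tag for the message."""
    return hmac.new(key, message, hashlib.sha256).hexdigest()

def verify(key: bytes, message: bytes, tag: str) -> bool:
    """Recompute the tag and compare in constant time."""
    return hmac.compare_digest(sign(key, message), tag)

def hash_password(password: str, *, iterations: int = 600_000) -> tuple[bytes, bytes]:
    """Return (salt, digest); store both, along with the iteration count."""
    salt = secrets.token_bytes(16)
    digest = hashlib.pbkdf2_hmac("sha256", password.encode(), salt, iterations)
    return salt, digest

def check_password(password: str, salt: bytes, digest: bytes,
                   *, iterations: int = 600_000) -> bool:
    candidate = hashlib.pbkdf2_hmac("sha256", password.encode(), salt, iterations)
    return hmac.compare_digest(candidate, digest)
```

The iteration count is a keyword argument so that tests can lower it for speed while production code keeps the expensive default.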
Packaging

Packaging & Distribution

tomlpyproject.toml — complete package config
[build-system]
requires      = ["hatchling"]    # Build backend (alt: flit, setuptools, pdm)
build-backend = "hatchling.build"

[project]
name     = "my-awesome-lib"
version  = "1.2.0"
requires-python = ">=3.10"
description     = "A useful library"
readme          = "README.md"
license         = {text = "MIT"}
authors         = [{name = "Alice Dev", email = "alice@example.com"}]
keywords        = ["utility", "python"]
classifiers     = [
    "Development Status :: 5 - Production/Stable",
    "Programming Language :: Python :: 3.10",
    "License :: OSI Approved :: MIT License",
]
dependencies = [                    # Runtime deps (like NuGet package deps)
    "httpx>=0.26.0",
    "pydantic>=2.0,<3.0",
]

[project.optional-dependencies]    # pip install "my-awesome-lib[dev,docs]"
dev  = ["pytest", "ruff", "mypy"]
docs = ["mkdocs", "mkdocstrings"]

[project.scripts]                   # CLI entry points
mylib-cli = "mylib.cli:main"

[project.entry-points."mylib.plugins"]   # Plugin registration
csv = "mylib.plugins.csv:CSVPlugin"

[project.urls]
Homepage      = "https://github.com/alice/mylib"
Documentation = "https://mylib.readthedocs.io"
Changelog     = "https://github.com/alice/mylib/CHANGELOG.md"
bashBuild and publish workflow
# Install build tools
pip install build twine

# Build wheel + sdist
python -m build
# dist/
#   my_awesome_lib-1.2.0-py3-none-any.whl
#   my_awesome_lib-1.2.0.tar.gz

# Inspect wheel contents
unzip -l dist/my_awesome_lib-*.whl

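# Upload to TestPyPI first
twine upload --repository testpypi dist/*
# Dependencies such as httpx may be missing on TestPyPI, so fall back to PyPI for them
pip install --index-url https://test.pypi.org/simple/ --extra-index-url https://pypi.org/simple/ my-awesome-lib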

# Upload to PyPI
twine upload dist/*

# Verify installation
pip install my-awesome-lib
python -c "import mylib; print(mylib.__version__)"

# Trusted publisher (no API key) via GitHub Actions + PyPI OIDC
# pyproject.toml sets up the project, then:
# - push tag v1.2.0
# - GitHub Actions builds and publishes with short-lived OIDC token
Design Patterns

Pythonic Design Patterns

pythonObserver, Command, Chain of Responsibility, Builder — Pythonically
# ── Observer pattern — event system ──────────────────────────
import asyncio
from collections import defaultdict
from typing import Callable, Any

class EventBus:
    def __init__(self):
        self._listeners: dict[str, list[Callable]] = defaultdict(list)

    def on(self, event: str) -> Callable[[Callable], Callable]:
        def register(handler: Callable) -> Callable:
            self._listeners[event].append(handler)
            return handler  # Return the function unchanged so it stays callable
        return register     # @bus.on("name") calls on() first, then register(func)

    async def emit(self, event: str, **data: Any) -> None:
        for handler in self._listeners.get(event, []):
            result = handler(**data)
            if asyncio.iscoroutine(result):
                await result

bus = EventBus()

@bus.on("order.created")
async def send_confirmation(order_id, user_email, **_):
    await email_service.send(user_email, f"Order {order_id} confirmed")

@bus.on("order.created")
def update_inventory(order_id, items, **_):
    inventory.reserve(order_id, items)

await bus.emit("order.created", order_id=42, user_email="a@b.com", items=[])

# ── Command pattern — undo/redo queue ─────────────────────────
from abc import ABC, abstractmethod
from collections import deque

class Command(ABC):
    @abstractmethod
    def execute(self) -> None: ...
    @abstractmethod
    def undo(self) -> None: ...

class CommandHistory:
    def __init__(self, max_size: int = 50):
        self._history: deque[Command] = deque(maxlen=max_size)

    def execute(self, cmd: Command) -> None:
        cmd.execute()
        self._history.append(cmd)

    def undo(self) -> None:
        if self._history:
            self._history.pop().undo()

# ── Chain of Responsibility — middleware chain ─────────────────
class Handler(ABC):
    def __init__(self) -> None:
        self._next: Handler | None = None  # next link in the chain, if any

    def set_next(self, handler: "Handler") -> "Handler":
        self._next = handler
        return handler  # Enable chaining: auth.set_next(rate).set_next(cache)

    @abstractmethod
    def handle(self, request: dict) -> dict | None: ...

    def _pass_to_next(self, request: dict) -> dict | None:
        # Delegate to the next handler, or end the chain with None
        if self._next is not None:
            return self._next.handle(request)
        return None

# ── Builder pattern — fluent interface ────────────────────────
class QueryBuilder:
    def __init__(self, table: str):
        self._table = table
        self._conditions: list[str] = []
        self._limit: int | None = None
        self._order: str | None = None

    def where(self, cond: str) -> "QueryBuilder":
        # cond is inserted verbatim: pass only trusted SQL fragments,
        # and bind user-supplied values as query parameters instead
        self._conditions.append(cond)
        return self

    def limit(self, n: int) -> "QueryBuilder":
        self._limit = n
        return self

    def order_by(self, col: str) -> "QueryBuilder":
        self._order = col
        return self

    def build(self) -> str:
        sql = f"SELECT * FROM {self._table}"
        if self._conditions:
            sql += " WHERE " + " AND ".join(self._conditions)
        if self._order is not None:
            sql += f" ORDER BY {self._order}"
        if self._limit is not None:  # a plain truthiness test would drop LIMIT 0
            sql += f" LIMIT {self._limit}"
        return sql

query = (
    QueryBuilder("users")
    .where("active = 1")
    .where("age > 18")
    .order_by("name")
    .limit(50)
    .build()
)
Reference

Expert Cheat Sheet

CPython Internals Quick Reference

| Topic | Tool/API | Use For |
|---|---|---|
| Bytecode inspection | `dis.dis(func)` | Understand why code is fast/slow |
| Memory tracking | `tracemalloc.start()` / `.take_snapshot()` | Find memory leaks, measure allocation |
| Reference counting | `sys.getrefcount(obj)` | Debug GC issues, understand lifetime |
| Object interning | `sys.intern(s)` | Share identical string objects |
| Import hooks | `sys.meta_path.insert(0, finder)` | Custom module loading |
| AST inspection | `ast.parse()` + `NodeVisitor` | Code analysis, linters, formatters |
| AST transformation | `NodeTransformer` + `compile()` | Macros, optimizers, DSLs |
| GC control | `gc.disable()` / `gc.collect()` | Performance tuning, cycle detection |
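As an illustration of the two AST rows in the table, the sketch below inspects a small function with `ast.NodeVisitor` and then rewrites it with `ast.NodeTransformer` before compiling it. The class names `CallCounter` and `SquareToMultiply`, and the toy rewrite of `name ** 2` into `name * name`, are assumptions made for this example rather than a recommended optimization.

```python
import ast


class CallCounter(ast.NodeVisitor):
    """Count calls to plain names, e.g. abs(...) but not obj.method(...)."""

    def __init__(self) -> None:
        self.counts: dict[str, int] = {}

    def visit_Call(self, node: ast.Call) -> None:
        if isinstance(node.func, ast.Name):
            self.counts[node.func.id] = self.counts.get(node.func.id, 0) + 1
        self.generic_visit(node)  # keep walking into nested calls


class SquareToMultiply(ast.NodeTransformer):
    """Rewrite `name ** 2` into `name * name` (toy transformation)."""

    def visit_BinOp(self, node: ast.BinOp) -> ast.AST:
        self.generic_visit(node)  # transform child expressions first
        if (
            isinstance(node.op, ast.Pow)
            and isinstance(node.left, ast.Name)
            and isinstance(node.right, ast.Constant)
            and type(node.right.value) is int
            and node.right.value == 2
        ):
            return ast.BinOp(
                left=node.left,
                op=ast.Mult(),
                right=ast.Name(id=node.left.id, ctx=ast.Load()),
            )
        return node


source = "def area(r):\n    return round(abs(r) ** 2 + r ** 2)\n"
tree = ast.parse(source)

# Inspection: count calls without changing the tree
counter = CallCounter()
counter.visit(tree)

# Transformation: rewrite, fill in missing line numbers, compile, and run
new_tree = ast.fix_missing_locations(SquareToMultiply().visit(tree))
namespace: dict = {}
exec(compile(new_tree, filename="<ast>", mode="exec"), namespace)
area = namespace["area"]
```

Only `r ** 2` is rewritten here; `abs(r) ** 2` is left alone because its left operand is a call, and duplicating a call could change behavior if the call has side effects.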

Type System at Expert Level

| Construct | Purpose |
|---|---|
| `TypeVar("T", bound=X)` | Generic bounded by X (accepts X or its subclasses) |
| `ParamSpec("P")` | Preserve callable signature through decorators |
| `TypeVarTuple("Ts")` | Variadic generics (tuple of arbitrary types) |
| `Annotated[T, metadata]` | Attach runtime metadata to type hints |
| `Literal["a", "b"]` | Restrict to exact values |
| `Final[T]` | Constant: cannot be reassigned |
| `LiteralString` | Only string literals: helps prevent injection |
| `Never` | Bottom type; return type of a function that never returns |
| `Self` | Return type is the same type as the enclosing class |
| `TypeGuard[T]` | Narrow the argument's type when the function returns True |
| `@overload` | Multiple typed signatures for one function |
| `assert_never(x)` | Exhaustiveness checking at type-check time |

Performance Decision Tree

| Problem | Solution |
|---|---|
| Algorithmic bottleneck | Fix the algorithm first; no micro-optimization makes up for O(n²) where O(n) is possible |
| Python loop over numbers | NumPy vectorization, often 10–100x faster or more depending on the workload |
| Custom numerical algorithm | Numba `@njit`: near-C speed from Python source |
| Tight loop, many small objects | `__slots__`, local variable caching, and `set` for membership tests |
| Need to call a C library | `ctypes` (no compile step) or `cffi` (better for complex APIs) |
| Whole module needs C speed | Cython `.pyx`, or `mypyc` for typed Python |
| I/O-bound concurrency | `asyncio` or `anyio`; scales to 10K+ concurrent connections |
| CPU-bound parallelism | `ProcessPoolExecutor` / `multiprocessing` (separate processes, each with its own GIL) |
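The "tight loop, many small objects" row names three techniques; the sketch below shows each in its smallest form. The class and function names are hypothetical, and the actual gain depends on the workload and interpreter version (recent CPython releases already speed up many attribute lookups), so measure before adopting any of them.

```python
import math


class PointDict:
    """Regular class: every instance carries its own __dict__."""

    def __init__(self, x: float, y: float) -> None:
        self.x = x
        self.y = y


class PointSlots:
    """Slotted class: fixed attribute set, no per-instance __dict__."""

    __slots__ = ("x", "y")

    def __init__(self, x: float, y: float) -> None:
        self.x = x
        self.y = y


def distances(points: list[PointSlots]) -> list[float]:
    sqrt = math.sqrt  # local name avoids repeating the attribute lookup in the loop
    return [sqrt(p.x * p.x + p.y * p.y) for p in points]


def count_allowed(ids: list[int], allowed: list[int]) -> int:
    allowed_set = set(allowed)  # average O(1) membership test instead of O(n)
    total = 0
    for i in ids:
        if i in allowed_set:
            total += 1
    return total


slotted = PointSlots(3, 4)
regular = PointDict(3, 4)
```

The trade-off with `__slots__` is flexibility: assigning an attribute that is not listed, such as `slotted.z = 1`, raises `AttributeError`.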

Expert Badge Summary

dis.dis(): understand bytecode
tracemalloc: track allocations
sys.meta_path: custom imports
ast.NodeTransformer: code generation
ParamSpec: typed decorators
TypeGuard: narrow types
assert_never: exhaustiveness
LiteralString: injection safe
ContextVar: async context
ExceptionGroup: multi-error
asyncio.TaskGroup: structured concurrency
anyio: backend agnostic
Numba @njit: JIT compile
mypyc: compile typed Python
ctypes: call C without wrapping
@njit(parallel=True): multi-core
hypothesis: property tests
@given(strategies): auto-inputs
Mock(spec=): type-safe mocks
AsyncMock: async unit tests
secrets.token_hex(): CSPRNG
bcrypt: password hashing
Fernet: symmetric encryption
subprocess list form: no injection