PYTHON
301
The advanced knowledge that separates Python experts from developers who use Python. Covers CPython bytecode, the import system, AST manipulation, the full type system, performance profiling, C interop, exception groups, structured logging, property-based testing, cryptography, packaging, and Pythonic design patterns.
Bytecode & dis
Python source code is compiled to bytecode — a compact representation executed by the CPython virtual machine (a stack-based interpreter). Understanding bytecode explains why certain Python patterns are faster than others, and how the interpreter works under the hood.
import dis def add(x, y): return x + y # Disassemble to human-readable bytecode dis.dis(add) # Offset Opname Argument # 0 RESUME 0 # 2 LOAD_FAST 0 (x) # 4 LOAD_FAST 1 (y) # 6 BINARY_OP 0 (+) # 10 RETURN_VALUE # Code object attributes code = add.__code__ code.co_varnames # ('x', 'y') — local variable names code.co_consts # (None,) — compile-time constants code.co_names # () — global/attr names referenced code.co_code # bytes — raw bytecode code.co_stacksize # max stack depth needed code.co_flags # flags (generator, coroutine, etc.) code.co_filename # source file code.co_firstlineno # line number of first line # Compare two implementations def slow_join(parts): result = "" for p in parts: result += p # Creates new string each iteration! return result def fast_join(parts): return "".join(parts) # Single allocation dis.dis(slow_join) # Shows repeated BINARY_OP with store_fast dis.dis(fast_join) # Shows single LOAD_CONST + LOAD_FAST + CALL # Bytecode comparison of list comprehension vs for loop def loop_version(data): result = [] for x in data: result.append(x * 2) return result def comp_version(data): return [x * 2 for x in data] # comp_version is faster: list comprehension uses LIST_APPEND opcode # which is more efficient than repeatedly calling list.append() # Get the .pyc cache file path import importlib.util spec = importlib.util.find_spec("json") print(importlib.util.cache_from_source(spec.origin)) # ~/.../lib/python3.12/__pycache__/json.cpython-312.pyc
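Local variables use LOAD_FAST (an array index lookup, very fast). Global variables use LOAD_GLOBAL (a dict lookup in module globals, then builtins, which is slower; recent CPython versions cache these lookups, which narrows the gap). This is why caching a global as a local inside a hot loop can speed things up. It is also why closure variables from an enclosing scope, which are loaded through a cell with LOAD_DEREF, are slightly slower than plain locals.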
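To see the difference for yourself, the sketch below disassembles two small functions and collects the opcode names each one uses. The function names (`uses_global`, `uses_local`, `opnames`) are made up for this illustration, and the exact opcode spellings vary between CPython versions (newer releases have specialised variants of LOAD_FAST), so the check only looks for the prefix.

```python
import dis
import math

def uses_global(x):
    # 'math' is resolved through module globals on every call
    return math.sqrt(x)

def uses_local(x, sqrt=math.sqrt):
    # 'sqrt' is bound once as a default argument, so it is a local here
    return sqrt(x)

def opnames(func):
    """Return the set of opcode names that appear in func's bytecode."""
    return {ins.opname for ins in dis.get_instructions(func)}

global_ops = opnames(uses_global)
local_ops = opnames(uses_local)

print("LOAD_GLOBAL" in global_ops)   # the global version needs a global lookup
print("LOAD_GLOBAL" in local_ops)    # the local version does not
print(sorted(n for n in local_ops if n.startswith("LOAD_FAST")))
```

Binding a global as a default argument is one common way to get the local-variable fast path without an extra assignment inside the function body.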
Memory Model
CPython uses reference counting as its primary memory management strategy. Every object has a reference count. When it drops to zero, the object is deallocated immediately. Understanding this explains many Python performance characteristics and memory behaviors.
import sys, tracemalloc

# ── Reference counting ─────────────────────────────────────────
x = []
sys.getrefcount(x)  # 2 (x itself + getrefcount's own argument reference)
y = x
sys.getrefcount(x)  # 3 (x, y, getrefcount)
del y
sys.getrefcount(x)  # 2

# ── Object identity and interning ─────────────────────────────
# Small integers (-5 to 256) are cached singletons in CPython
a = 100; b = 100
a is b   # True: same cached object
a = 1000; b = 1000
a is b   # Usually False in the REPL; can be True inside one compiled file,
         # because the compiler merges equal constants. Never rely on it.
a == b   # True: same value

# String interning: identifier-like string constants are interned
s1 = "hello"; s2 = "hello"
s1 is s2  # True in practice (implementation detail)
s1 = "hello world"; s2 = "hello world"
s1 is s2  # Not guaranteed either way
# Force interning
s1 = sys.intern("hello world")
s2 = sys.intern("hello world")
s1 is s2  # True: both names refer to the interned string

# ── Object sizes (64-bit CPython; exact values vary by version) ─
sys.getsizeof([])        # 56 bytes: empty list
sys.getsizeof([1,2,3])   # 88 bytes: 3-element list
sys.getsizeof({})        # 64 bytes on recent CPython (older releases reported 200+)
sys.getsizeof("hello")   # about 50 bytes: fixed header + 1 byte per ASCII character
# Note: getsizeof doesn't include referenced objects!

# Deep size (include referenced objects)
def deep_sizeof(obj, seen=None):
    from sys import getsizeof
    from collections.abc import Mapping, Sequence
    size = getsizeof(obj)
    if seen is None:
        seen = set()
    obj_id = id(obj)
    if obj_id in seen:
        return 0          # Already counted (shared or cyclic reference)
    seen.add(obj_id)
    if isinstance(obj, Mapping):
        size += sum(deep_sizeof(k, seen) + deep_sizeof(v, seen) for k, v in obj.items())
    elif isinstance(obj, Sequence) and not isinstance(obj, (str, bytes)):
        size += sum(deep_sizeof(i, seen) for i in obj)
    # Sets and attributes of custom objects are not traversed by this sketch
    return size

# ── tracemalloc: track memory allocations ─────────────────────
tracemalloc.start()
# ... code to profile ...
big_list = [dict(i=i, v=i**2) for i in range(10000)]
snapshot = tracemalloc.take_snapshot()
top_stats = snapshot.statistics("lineno")
for stat in top_stats[:5]:
    print(stat)  # Shows file:line, total size, allocation count
tracemalloc.stop()
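The claim that an object is deallocated the moment its reference count reaches zero can be observed directly. The sketch below is a minimal illustration for CPython; `Resource` is a hypothetical placeholder class, and `weakref.finalize` is used only to record when deallocation happens.

```python
import weakref

class Resource:
    """Hypothetical placeholder; any weak-referenceable object works."""

events = []

r = Resource()
# Register a callback that runs when the object is deallocated
weakref.finalize(r, events.append, "freed")
alias = r                         # second strong reference

del r                             # one reference (alias) still remains
events.append("after del r")
del alias                         # last reference gone: freed right here on CPython
events.append("after del alias")

print(events)   # ['after del r', 'freed', 'after del alias']
```

Other implementations such as PyPy do not use reference counting, so the "freed" event may arrive later there; code that needs deterministic cleanup should use a context manager instead of relying on this behaviour.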
Garbage Collector
import gc, time

# Reference counting alone cannot reclaim circular references.
class Node:
    def __init__(self, val):
        self.val = val
        self.next = None

a = Node(1)
b = Node(2)
a.next = b    # a → b
b.next = a    # b → a: a cycle, so neither refcount can drop to 0
del a, b      # Objects still alive: each one holds a reference to the other
gc.collect()  # Runs a full collection; returns the number of unreachable objects found

# GC generations (0=young, 1=middle, 2=old)
gc.get_threshold()              # (700, 10, 10) is the long-standing default; newer releases may differ
gc.set_threshold(1000, 15, 15)  # Tune for your workload
gc.get_count()                  # Current (gen0, gen1, gen2) counts
gc.get_objects()                # All objects tracked by GC (expensive!)

# Disabling the cyclic GC can reduce pauses in allocation-heavy code.
# Reference counting keeps working, but any cycles leak until you call
# gc.collect() yourself. Measure before and after.
gc.disable()

# Get referrers (useful for leak hunting)
x = [1, 2, 3]
print(gc.get_referrers(x))  # Shows what holds references to x

# Before Python 3.4, a cycle containing objects with __del__ was uncollectable.
# Since 3.4 (PEP 442) such cycles are collected normally, but avoid __del__ if you can.

# Estimating GC impact: time the workload with GC disabled,
# then compare against the same workload with GC enabled.
gc.disable()
start = time.perf_counter()
# ... workload ...
elapsed = time.perf_counter() - start
gc.enable(); gc.collect()
print(f"Workload time with GC disabled: {elapsed:.4f}s")
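A weak reference makes the cycle behaviour visible: it lets you check whether an object is still alive without keeping it alive. The sketch below rebuilds the two-node cycle and probes it before and after a collection. Automatic collection is switched off during the demo only so that the timing is deterministic.

```python
import gc
import weakref

class Node:
    def __init__(self, val):
        self.val = val
        self.next = None

gc.disable()                     # keep automatic collections out of the demo
try:
    a, b = Node(1), Node(2)
    a.next, b.next = b, a        # build the cycle
    probe = weakref.ref(a)       # weak reference: does not keep 'a' alive

    del a, b
    alive_before = probe() is not None   # the cycle keeps both nodes alive

    gc.collect()                         # the cycle detector reclaims them
    alive_after = probe() is not None
finally:
    gc.enable()

print(alive_before, alive_after)   # True False
```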
Import System
Python's import system is fully customizable via hooks. Understanding sys.meta_path, finders, and loaders lets you intercept imports, load modules from databases, implement lazy loading, and create import-time transformations.
import sys, importlib, importlib.util, importlib.abc # ── Import mechanics ─────────────────────────────────────────── # When you do 'import foo', Python: # 1. Checks sys.modules (cache — instant if found) # 2. Calls each finder in sys.meta_path # 3. Finder returns a loader # 4. Loader executes module code # 5. Module added to sys.modules sys.meta_path # [BuiltinImporter, FrozenImporter, PathFinder] sys.modules["json"] # Already-loaded modules (import cache) # ── Manual import ───────────────────────────────────────────── spec = importlib.util.spec_from_file_location("mymod", "/path/to/mod.py") module = importlib.util.module_from_spec(spec) sys.modules["mymod"] = module spec.loader.exec_module(module) # ── Reload a module (hot-reload pattern) ────────────────────── importlib.reload(sys.modules["mymod"]) # ── Custom meta path finder ──────────────────────────────────── class DatabaseFinder: """Load modules stored in a database.""" def find_spec(self, fullname, path, target=None): """Return a ModuleSpec if we can load this module, else None.""" source = self._fetch_from_db(fullname) if source is None: return None loader = DatabaseLoader(fullname, source) return importlib.util.spec_from_loader(fullname, loader) def _fetch_from_db(self, name): db = {"dbmod": "MY_VALUE = 42\ndef hello(): return 'from db!'"} return db.get(name) class DatabaseLoader(importlib.abc.Loader): def __init__(self, name, source): self.name, self.source = name, source def create_module(self, spec): return None # Use default def exec_module(self, module): exec(compile(self.source, self.name, "exec"), module.__dict__) sys.meta_path.insert(0, DatabaseFinder()) import dbmod # Now works — loaded from dict! 
print(dbmod.MY_VALUE) # 42 # ── Lazy imports ────────────────────────────────────────────── import importlib.util class LazyModule: """Import only when first accessed — saves startup time.""" def __init__(self, name): self._name = name self._module = None def __getattr__(self, attr): if self._module is None: self._module = importlib.import_module(self._name) return getattr(self._module, attr) # Heavy module not imported until first use numpy = LazyModule("numpy") # At startup: no numpy import # First use: numpy imported transparently arr = numpy.array([1, 2, 3]) # Import happens HERE
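Besides a hand-written proxy class, the standard library ships `importlib.util.LazyLoader`, which defers execution of a module's code until the first attribute access. The sketch below follows the recipe from the importlib documentation; `lazy_import` is just a helper name chosen for this example, and `colorsys` stands in for any heavier module.

```python
import importlib.util
import sys

def lazy_import(name):
    """Return a module object whose code runs on first attribute access."""
    spec = importlib.util.find_spec(name)
    loader = importlib.util.LazyLoader(spec.loader)
    spec.loader = loader
    module = importlib.util.module_from_spec(spec)
    sys.modules[name] = module
    loader.exec_module(module)   # Sets up the lazy module; real execution is deferred
    return module

colorsys = lazy_import("colorsys")      # module body has not been executed yet
hsv = colorsys.rgb_to_hsv(1.0, 0.0, 0.0)  # first attribute access triggers the real import
print(hsv)   # (0.0, 1.0, 1.0)
```

One trade-off to keep in mind: with lazy loading, an error in the module (including a missing dependency) surfaces at first use rather than at the import statement.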
AST Module
The ast module gives you access to Python's Abstract Syntax Tree — the intermediate representation between source text and bytecode. You can parse code, inspect it, transform it, and generate new code.
import ast

source = """
def factorial(n):
    if n <= 1:
        return 1
    return n * factorial(n - 1)
"""

# Parse source to AST
tree = ast.parse(source)
print(ast.dump(tree, indent=2))  # Formatted tree

# ── NodeVisitor: walk and inspect the tree ────────────────────
class FunctionAnalyzer(ast.NodeVisitor):
    """Collect function definitions and direct calls by name."""
    def __init__(self):
        self.functions = []
        self.calls = []

    def visit_FunctionDef(self, node):
        self.functions.append(node.name)
        self.generic_visit(node)  # Visit children

    def visit_Call(self, node):
        if isinstance(node.func, ast.Name):
            self.calls.append(node.func.id)
        self.generic_visit(node)

analyzer = FunctionAnalyzer()
analyzer.visit(tree)
print(analyzer.functions)  # ["factorial"]
print(analyzer.calls)      # ["factorial"]: the function calls itself, so it is recursive

# ── NodeTransformer: modify the AST ──────────────────────────
class ConstantFolder(ast.NodeTransformer):
    """Fold constant expressions at compile time."""
    def visit_BinOp(self, node):
        self.generic_visit(node)  # Transform children first
        if (isinstance(node.left, ast.Constant)
                and isinstance(node.right, ast.Constant)
                and isinstance(node.op, (ast.Add, ast.Mul, ast.Sub))):
            try:
                # Evaluate the constant expression at parse time
                expr = ast.fix_missing_locations(ast.Expression(body=node))
                result = eval(compile(expr, "<folding>", "eval"))
            except Exception:
                return node  # Leave expressions that fail to evaluate untouched
            # Copy line/column info so the new node compiles inside a larger expression
            return ast.copy_location(ast.Constant(value=result), node)
        return node

source = "result = 2 * 3 * 7"
tree = ast.parse(source)
tree = ConstantFolder().visit(tree)
ast.fix_missing_locations(tree)  # Required after transformation
code = compile(tree, "<constant_folding>", "exec")
exec(code)
# result is 42, and the tree now holds a single ast.Constant(42)

# ── Practical: check for missing await ────────────────────────
class MissingAwaitDetector(ast.NodeVisitor):
    def visit_AsyncFunctionDef(self, node):
        # Calls that are the direct operand of an await expression are fine
        awaited = {id(n.value) for n in ast.walk(node) if isinstance(n, ast.Await)}
        for child in ast.walk(node):
            if (isinstance(child, ast.Call)
                    and id(child) not in awaited
                    and isinstance(child.func, ast.Name)
                    # Heuristic: coroutine functions often end with _async
                    and child.func.id.endswith("_async")):
                print(f"Possible missing await at line {child.lineno}")
        self.generic_visit(node)
exec / eval / compile
# ── eval — evaluate a single expression ────────────────────── result = eval("2 ** 10") # 1024 eval("x + y", {"x": 3, "y": 4}) # 7 — with custom namespace # ── exec — execute statements ───────────────────────────────── namespace = {} exec(""" def greet(name): return f'Hello, {name}!' """, namespace) print(namespace["greet"]("World")) # "Hello, World!" # ── compile — compile to code object ───────────────────────── code = compile("x + 1", "<string>", "eval") # mode: eval/exec/single eval(code, {"x": 41}) # 42 — reuse compiled code object, faster # ── Restricted execution (sandbox) ─────────────────────────── def safe_eval(expr: str, extra_vars: dict = None): """Evaluate expression with restricted builtins.""" # Restrict builtins to safe operations safe_builtins = { "abs": abs, "max": max, "min": min, "len": len, "sum": sum, "sorted": sorted, "int": int, "float": float, "str": str, "True": True, "False": False, "None": None, } globals_ = {"__builtins__": safe_builtins} if extra_vars: globals_.update(extra_vars) return eval(expr, globals_, {}) safe_eval("max(1, 2, 3)") # 3 ✅ # safe_eval("__import__('os')") # NameError ✅ — blocked # ── Dynamic function generation from template ───────────────── def make_validator(field_name: str, min_val: int, max_val: int): """Compile a tight validation function at runtime.""" source = f""" def validate_{field_name}(value): if not isinstance(value, (int, float)): raise TypeError(f'{field_name} must be numeric') if not ({min_val} <= value <= {max_val}): raise ValueError(f'{field_name} must be between {min_val} and {max_val}') return value """ namespace = {} exec(compile(source, f"<validator_{field_name}>", "exec"), namespace) return namespace[f"validate_{field_name}"] validate_age = make_validator("age", 0, 150) validate_age(25) # 25 validate_age(200) # ValueError
Calling eval() or exec() on user-controlled input is a critical security vulnerability. Even "restricted" namespaces can be escaped by walking object attributes, for example ().__class__.__mro__[-1].__subclasses__(), which reaches every loaded class without touching builtins. Use ast.literal_eval() for safe parsing of Python literals, or a proper expression parser library.
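As a minimal sketch of the safer route, the helper below wraps `ast.literal_eval`, which accepts only literal syntax (numbers, strings, bytes, tuples, lists, dicts, sets, booleans, None) and rejects calls, names, and attribute access. The name `parse_literal` is made up for this example.

```python
import ast

def parse_literal(text: str):
    """Parse a Python literal from text; reject anything that is not a literal."""
    try:
        return ast.literal_eval(text)
    except (ValueError, SyntaxError) as exc:
        raise ValueError(f"not a plain literal: {text!r}") from exc

config = parse_literal("{'retries': 3, 'hosts': ['a', 'b'], 'debug': None}")
print(config["hosts"])   # ['a', 'b']

try:
    parse_literal("__import__('os').system('id')")
except ValueError as err:
    rejected = str(err)
    print("rejected:", rejected)
```

literal_eval is still not meant for arbitrarily large untrusted input: very deep or very long literals can exhaust memory or hit the recursion limit, so cap the input size first.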
TypeVar, ParamSpec & TypeVarTuple
from typing import TypeVar, Generic, Callable, Concatenate from typing import ParamSpec, TypeVarTuple, Unpack # ── TypeVar with bounds and constraints ─────────────────────── T = TypeVar("T") # Unconstrained N = TypeVar("N", int, float, complex) # Constrained: only these 3 C = TypeVar("C", bound="Comparable") # Bound: must be Comparable or subclass def first(items: list[T]) -> T: return items[0] # Return type matches input element type def add(a: N, b: N) -> N: return a + b # a and b must be same numeric type # ── Generic classes ─────────────────────────────────────────── class Stack(Generic[T]): def __init__(self): self._items: list[T] = [] def push(self, item: T) -> None: self._items.append(item) def pop(self) -> T: return self._items.pop() def peek(self) -> T | None: return self._items[-1] if self._items else None stack: Stack[int] = Stack() stack.push(1) # mypy: OK # stack.push("x") # mypy: error — expected int # ── ParamSpec — preserve callable signatures in decorators ──── P = ParamSpec("P") R = TypeVar("R") def timed(func: Callable[P, R]) -> Callable[P, R]: def wrapper(*args: P.args, **kwargs: P.kwargs) -> R: import time start = time.perf_counter() result = func(*args, **kwargs) print(f"{func.__name__}: {time.perf_counter()-start:.4f}s") return result return wrapper @timed def connect(host: str, port: int, *, ssl: bool = False) -> str: ... # mypy knows: connect(host="x", port=80) — fully typed through decorator! # ── Concatenate — add extra args to callable ────────────────── def with_logging(func: Callable[Concatenate[str, P], R]) -> Callable[P, R]: """Removes the first 'log_prefix' arg from the signature.""" def wrapper(*args: P.args, **kwargs: P.kwargs) -> R: return func("[LOG]", *args, **kwargs) return wrapper # ── TypeVarTuple — variadic generics ────────────────────────── Ts = TypeVarTuple("Ts") def zip_typed(*iterables: Unpack[Ts]) -> list[tuple[Unpack[Ts]]]: return list(zip(*iterables))
Annotated & Literal
from typing import Annotated, Literal, Final, ClassVar, Self, Never, LiteralString # ── Annotated — attach metadata to type hints ──────────────── # Metadata is IGNORED by the type checker (it only uses the base type) # Used by frameworks: pydantic, FastAPI, SQLAlchemy, etc. class Gt: """Greater-than constraint.""" def __init__(self, val): self.val = val class MaxLen: def __init__(self, n): self.n = n PositiveInt = Annotated[int, Gt(0)] ShortStr = Annotated[str, MaxLen(50)] def create_user(name: ShortStr, age: PositiveInt) -> dict: # Pydantic reads Annotated metadata to create validators! return {"name": name, "age": age} # ── Literal — exact value types ────────────────────────────── Direction = Literal["north", "south", "east", "west"] HTTPMethod = Literal["GET", "POST", "PUT", "DELETE"] StatusCode = Literal[200, 201, 400, 401, 403, 404, 500] def move(direction: Direction) -> None: print(f"Moving {direction}") move("north") # ✅ mypy OK # move("up") # ❌ mypy error: not a valid Direction # ── Final — constant declarations ───────────────────────────── MAX_CONNECTIONS: Final = 100 # MAX_CONNECTIONS = 200 # ❌ mypy error: cannot reassign a Final class Config: TIMEOUT: Final[int] = 30 # ── Self — return type for methods that return self ─────────── class QueryBuilder: def where(self, condition: str) -> Self: # Returns same type self._conditions.append(condition) return self # Subclasses get correct return type automatically def limit(self, n: int) -> Self: self._limit = n return self # ── Never — function that never returns ─────────────────────── def abort(msg: str) -> Never: raise SystemExit(msg) # ── LiteralString — only string literals, prevents injection ── def execute_sql(query: LiteralString) -> list: # mypy will error if query could be user-controlled string return db.execute(query) # Safe — query is a literal execute_sql("SELECT * FROM users") # ✅ user_input = input() # execute_sql(user_input) # ❌ mypy: not a LiteralString
TypeGuard & Type Narrowing
from typing import Literal, TypeGuard, assert_type, assert_never  # assert_* need Python 3.11+
# from typing import TypeIs  # Python 3.13+

# ── TypeGuard: narrow type inside if blocks ───────────────────
def is_list_of_str(val: list) -> TypeGuard[list[str]]:
    """When this returns True, mypy knows val is list[str]."""
    return all(isinstance(x, str) for x in val)

def process(items: list[str | int]) -> None:
    if is_list_of_str(items):
        # mypy knows items is list[str] here
        for s in items:
            print(s.upper())  # ✅ s is str

# ── assert_type: verify type at type-check time ───────────────
def get_value() -> int | str: ...  # Hypothetical stub so the example type-checks

x: int | str = get_value()
if isinstance(x, int):
    assert_type(x, int)  # ✅ type narrowed to int

# ── assert_never: exhaustiveness checking ─────────────────────
Shape = Literal["circle", "square", "triangle"]

def get_area(shape: Shape, size: float) -> float:
    if shape == "circle":
        return 3.14 * size ** 2
    elif shape == "square":
        return size ** 2
    elif shape == "triangle":
        return 0.5 * size ** 2
    else:
        assert_never(shape)  # mypy errors here if Shape has an unhandled case!
# If you add "pentagon" to Shape and forget a branch, the type checker
# reports an error here (and assert_never raises at runtime if reached).
@overload — Multiple Signatures
from typing import overload # @overload lets you declare multiple type signatures for ONE function. # The overload definitions are type-checker only — never called. # The actual implementation uses Union types. @overload def process(data: str) -> str: ... @overload def process(data: int) -> int: ... @overload def process(data: list[str]) -> list[str]: ... def process(data: str | int | list[str]) -> str | int | list[str]: """Actual implementation — handles all cases.""" if isinstance(data, str): return data.upper() elif isinstance(data, int): return data * 2 else: return [s.upper() for s in data] # mypy infers correct return type from argument type: result1 = process("hello") # type: str result2 = process(42) # type: int result3 = process(["a", "b"]) # type: list[str] # ── Overload for methods with optional args ──────────────────── class Connection: @overload def get(self, key: str) -> str | None: ... @overload def get(self, key: str, default: T) -> str | T: ... def get(self, key, default=None): return self._data.get(key, default)
Runtime Type Checking
import typing from typing import get_type_hints # ── get_type_hints — resolve annotations at runtime ─────────── def greet(name: str, age: int) -> str: ... hints = get_type_hints(greet) # {"name": str, "age": int, "return": str} # Resolves forward references (strings like "MyClass") class Node: children: list["Node"] # Forward ref get_type_hints(Node) # Resolves "Node" to actual Node class # ── Runtime type enforcement (beartype library) ──────────────── # pip install beartype from beartype import beartype @beartype def sqrt(n: float | int) -> float: return n ** 0.5 sqrt(4) # 2.0 ✅ sqrt("4") # BeartypeException at runtime ✅ # Beartype is O(1) — checks only the outermost type, not deep recursion # Much faster than pydantic for runtime checking of function signatures # ── Roll your own runtime validator using annotations ────────── import functools, inspect def enforce_types(func): hints = get_type_hints(func) params = list(inspect.signature(func).parameters.keys()) @functools.wraps(func) def wrapper(*args, **kwargs): for param, value in zip(params, args): expected = hints.get(param) if expected and not isinstance(value, expected): raise TypeError( f"{param}: expected {expected.__name__}, got {type(value).__name__}") return func(*args, **kwargs) return wrapper
Async Generators
import asyncio # ── Async generator — yields values asynchronously ─────────── async def stream_db_rows(query: str, batch_size: int = 100): """Yield database rows in batches without loading all into memory.""" offset = 0 while True: batch = await db.execute(f"{query} LIMIT {batch_size} OFFSET {offset}") if not batch: break for row in batch: yield row # async generator yield offset += batch_size # Consume with async for async def process_all(): async for row in stream_db_rows("SELECT * FROM events"): await process_row(row) # ── Async generator with aclose and athrow ──────────────────── async def tick(interval: float): while True: yield asyncio.get_event_loop().time() await asyncio.sleep(interval) async def main(): gen = tick(1.0) for _ in range(3): ts = await gen.__anext__() print(f"tick: {ts:.2f}") await gen.aclose() # Properly close the generator # ── Async comprehensions ─────────────────────────────────────── async def get_user_names(): return [user.name async for user in stream_db_rows("SELECT name FROM users")] # async generator expression names_gen = (row["name"] async for row in stream_db_rows("SELECT name FROM users")) # Lazy — processes one row at a time
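The reason `aclose()` matters is that an async generator abandoned part-way (for example after a `break`) stays suspended at its `yield`, with any `finally` block still pending. The sketch below, with the made-up names `ticker` and `log`, shows that the cleanup code runs at the moment `aclose()` is awaited.

```python
import asyncio

log = []

async def ticker(limit):
    try:
        for n in range(limit):
            yield n
            await asyncio.sleep(0)
    finally:
        # Runs when the generator is exhausted, closed, or garbage collected
        log.append("cleanup")

async def main():
    gen = ticker(100)
    seen = []
    async for n in gen:
        seen.append(n)
        if n == 2:
            break                  # generator is left suspended at its yield
    log.append("before aclose")
    await gen.aclose()             # raises GeneratorExit inside ticker: finally runs now
    return seen

seen = asyncio.run(main())
print(seen)   # [0, 1, 2]
print(log)    # ['before aclose', 'cleanup']
```

On Python 3.10+, `contextlib.aclosing(gen)` wraps the same pattern in an `async with` block so the close cannot be forgotten.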
contextvars — Async-Safe Context
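contextvars provides context-local storage that works correctly across async tasks. threading.local() does not help with asyncio, because all tasks on one event loop share a single thread and would see the same value. Each asyncio task runs in its own copy of the context, which makes ContextVar a good fit for request IDs, user sessions, and correlation IDs in async web applications.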
from contextvars import ContextVar, copy_context
import asyncio, logging, uuid

# ── Declare context variables ─────────────────────────────────
request_id: ContextVar[str] = ContextVar("request_id", default="unknown")
current_user: ContextVar[str | None] = ContextVar("current_user", default=None)

# ── Middleware sets context for each request ──────────────────
async def request_middleware(request, handler):
    # Each request gets its OWN context, so there is no cross-contamination
    token_rid = request_id.set(str(uuid.uuid4()))
    token_usr = current_user.set(request.user)
    try:
        return await handler(request)
    finally:
        request_id.reset(token_rid)   # Restore previous value
        current_user.reset(token_usr)

# ── Access context from anywhere in the call stack ────────────
async def log_event(event: str):
    rid = request_id.get()
    user = current_user.get()
    print(f"[{rid}] user={user} event={event}")

async def process_payment(amount: float):
    await log_event(f"payment:{amount}")
    # Gets the correct request_id automatically!

# ── copy_context: snapshot current context for task isolation ──
async def spawn_background_task():
    ctx = copy_context()  # Snapshot the current context
    # Python 3.11+: run the task inside that snapshot explicitly.
    # (create_task already copies the current context by default.)
    asyncio.create_task(background_work(), context=ctx)
    # background_work inherits our request_id, but its changes won't affect us

# ── ContextVar + Structured Logging ───────────────────────────
# Use None as the default: a mutable default dict would be shared by every context
log_context: ContextVar[dict | None] = ContextVar("log_context", default=None)

class ContextFilter(logging.Filter):
    def filter(self, record: logging.LogRecord) -> bool:
        ctx = log_context.get() or {}
        for key, value in ctx.items():
            setattr(record, key, value)
        return True
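The isolation between tasks can be checked with a small self-contained run. In the sketch below (the `handle` coroutine and the request names are invented for the illustration), two concurrent tasks each set the same ContextVar, sleep so that they interleave, and then read it back.

```python
import asyncio
from contextvars import ContextVar

request_id: ContextVar[str] = ContextVar("request_id", default="unknown")

async def handle(name: str, delay: float) -> str:
    request_id.set(name)          # visible only inside this task's context
    await asyncio.sleep(delay)    # let the other task run in between
    return request_id.get()

async def main():
    # gather() wraps each coroutine in a task, and each task gets a context copy
    results = await asyncio.gather(handle("req-A", 0.02), handle("req-B", 0.01))
    return results, request_id.get()

results, outer = asyncio.run(main())
print(results)   # ['req-A', 'req-B']
print(outer)     # unknown
```

Each task sees only its own value, and the parent still sees the default, because values set inside a task never propagate back to the context that spawned it.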
anyio & Trio
# pip install anyio[trio] httpx
import anyio
import httpx  # Assumed HTTP client for this example; anyio itself has no HTTP API

# ── anyio: backend-agnostic async primitives ──────────────────
async def fetch_all(urls: list[str]) -> list[str]:
    results = []
    async with httpx.AsyncClient() as client, anyio.create_task_group() as tg:
        async def fetch_one(url: str):
            resp = await client.get(url)
            results.append(resp.text)
        for url in urls:
            tg.start_soon(fetch_one, url)  # Like asyncio.create_task
    # The task group block exits only after every task has finished
    return results

# Run on asyncio (default)
anyio.run(fetch_all, ["https://example.com"])
# Run on Trio
anyio.run(fetch_all, ["https://example.com"], backend="trio")

# ── anyio synchronization primitives ──────────────────────────
async def with_primitives():
    lock = anyio.Lock()
    event = anyio.Event()
    semaphore = anyio.Semaphore(10)
    send, receive = anyio.create_memory_object_stream()  # Returns a (send, receive) pair
    async with lock:       # Exclusive access
        pass
    async with semaphore:  # Max 10 concurrent
        pass

# ── Cancel scopes: explicit cancellation ──────────────────────
async def with_timeout():
    try:
        with anyio.fail_after(5.0):  # Raises TimeoutError after 5s
            result = await long_operation()
    except TimeoutError:
        print("Timed out")

    with anyio.move_on_after(5.0) as scope:  # Exits the block, no exception
        result = await long_operation()
    if scope.cancelled_caught:
        print("Operation timed out silently")
Profiling
import cProfile, functools, io, pstats, timeit

# ── cProfile: function-level profiling ────────────────────────
def profile(func):
    """Decorator that profiles a function call."""
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        pr = cProfile.Profile()
        pr.enable()
        result = func(*args, **kwargs)
        pr.disable()
        stream = io.StringIO()
        ps = pstats.Stats(pr, stream=stream)
        ps.sort_stats(pstats.SortKey.CUMULATIVE)
        ps.print_stats(20)  # Top 20 functions
        print(stream.getvalue())
        return result
    return wrapper

# CLI: python -m cProfile -s cumulative my_script.py
# Save for visualization: python -m cProfile -o profile.prof my_script.py
# Visualize with: snakeviz (pip install snakeviz)
#   snakeviz profile.prof

# ── line_profiler: line-by-line profiling ─────────────────────
# pip install line_profiler
# @profile  ← decorator injected by the kernprof command
# def slow_function():
#     data = list(range(1_000_000))  # per-line time, e.g. ≈ 100ms
#     total = sum(data)              # per-line time, e.g. ≈ 50ms
#
# Run: kernprof -l -v my_script.py

# ── memory_profiler: memory usage per line ────────────────────
# pip install memory_profiler
# @profile  ← decorator injected when run via python -m memory_profiler
# def build_index():
#     index = {}
#     ...
# Run: python -m memory_profiler my_script.py
# Or:  mprof run script.py && mprof plot

# ── py-spy: sampling profiler, no code changes ────────────────
# pip install py-spy
# py-spy top --pid $PID                             # Live view (like top for Python)
# py-spy record -o profile.svg -- python script.py  # Flame graph!

# ── timeit: micro-benchmarks ──────────────────────────────────
# Each call returns the TOTAL time for `number` executions (10,000 here)
time_loop = timeit.timeit(
    stmt="sum(x**2 for x in range(1000))",
    number=10000
)
time_np = timeit.timeit(
    stmt="(arr**2).sum()",
    setup="import numpy as np; arr = np.arange(1000)",
    number=10000
)
print(f"Loop: {time_loop:.3f}s NumPy: {time_np:.3f}s Ratio: {time_loop/time_np:.1f}x")
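A single timeit.timeit() call gives one total, which is sensitive to whatever else the machine is doing. A common way to get a steadier number is `timeit.repeat`, taking the minimum over several runs. The statement and the repeat counts below are arbitrary choices for this sketch.

```python
import timeit

NUMBER = 1000   # executions per run
REPEAT = 5      # independent runs

# repeat() returns a list with one total time (in seconds) per run
runs = timeit.repeat(
    stmt="sum(x * x for x in range(1000))",
    repeat=REPEAT,
    number=NUMBER,
)

# The minimum is the least disturbed run; higher values mostly reflect noise
best = min(runs)
print(f"best of {REPEAT}: {best / NUMBER * 1e6:.1f} µs per loop")
```

The same measurement is available from the command line as `python -m timeit -r 5 -n 1000 "sum(x * x for x in range(1000))"`.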
Optimization Techniques
import math
from collections import deque

# ── 1. Cache global lookups as locals in hot loops ────────────
# Global name: dict lookup in module globals, then builtins
# Local name:  array index (LOAD_FAST)
def slow(data):
    result = []
    for x in data:
        result.append(math.sqrt(x))  # global, attribute and method lookup every iteration
    return result

def fast(data):
    sqrt = math.sqrt        # Cache as local: one lookup at function entry
    result = []
    append = result.append  # Cache the bound method of THIS list
    for x in data:
        append(sqrt(x))     # Only local loads inside the loop
    return result
# The gain depends on the workload and Python version; benchmark before relying on it.

# ── 2. Use join() for string building ─────────────────────────
def build_str_slow(parts: list[str]) -> str:
    result = ""
    for p in parts:
        result += p  # Worst case O(n²): may copy the string each time
    return result

def build_str_fast(parts: list[str]) -> str:
    return "".join(parts)  # O(n): one allocation

# ── 3. __slots__ for millions of small objects ────────────────
# See Python 201: 60% memory reduction per instance

# ── 4. Avoid repeated attribute lookup in hot loops ───────────
class Particle:
    __slots__ = ("x", "y", "vx", "vy")

def update_slow(particles):
    for p in particles:
        p.x += p.vx  # Attribute lookup each time
        p.y += p.vy

def update_fast(particles):
    for p in particles:
        x, y, vx, vy = p.x, p.y, p.vx, p.vy  # Read each attribute once into locals
        p.x, p.y = x + vx, y + vy            # Pays off most when values are reused

# ── 5. Use sets for membership, not lists ─────────────────────
# O(1) average vs O(n)
bad_words_list = ["spam", "junk", ...]  # O(n) lookup
bad_words_set = {"spam", "junk", ...}   # O(1) lookup

# ── 6. Generator expressions over list comprehensions ─────────
# When you only iterate once and don't need random access
total = sum(x**2 for x in data)  # Generator: O(1) memory
# vs sum([x**2 for x in data])   # List: O(n) memory

# ── 7. Use deque for queues ───────────────────────────────────
queue = deque()  # O(1) append/pop from both ends
# list.pop(0) is O(n), so avoid using a list as a queue
NumPy Vectorization
import numpy as np # ── Vectorization — eliminate Python loops ──────────────────── data = np.random.rand(1_000_000) # Pure Python: ~500ms def normalize_slow(data): mn, mx = min(data), max(data) return [(x - mn) / (mx - mn) for x in data] # NumPy vectorized: ~2ms (250x faster) def normalize_fast(data: np.ndarray) -> np.ndarray: return (data - data.min()) / (data.max() - data.min()) # ── Broadcasting — operations between different shapes ──────── matrix = np.zeros((3, 4)) # (3, 4) row = np.array([1, 2, 3, 4]) # (4,) col = np.array([[1], [2], [3]]) # (3, 1) matrix + row # (3,4) — row broadcast to each row matrix + col # (3,4) — col broadcast to each column row + col # (3,4) — outer sum! # ── Views vs Copies — CRITICAL for performance ──────────────── a = np.arange(10) b = a[2:8] # VIEW — shares memory, no copy! b[0] = 99 # Modifies a too! c = a[2:8].copy() # COPY — independent array a.flags.owndata # True — owns its data b.base is a # True — b is a view of a # ── Fancy indexing ──────────────────────────────────────────── data = np.array([10, 20, 30, 40, 50]) idx = np.array([0, 2, 4]) data[idx] # [10, 30, 50] — index by array mask = data > 25 # Boolean mask: [F,F,T,T,T] data[mask] # [30, 40, 50] — boolean indexing data[data > 25] = 0 # In-place conditional assignment # ── Vectorized string operations ────────────────────────────── names = np.array(["Alice", "Bob", "Charlie"]) np.char.upper(names) # ["ALICE", "BOB", "CHARLIE"] np.char.startswith(names, "A") # [True, False, False] # ── np.where — vectorized conditional ──────────────────────── grades = np.array([45, 72, 89, 56, 91]) results = np.where(grades >= 60, "Pass", "Fail") # ["Fail", "Pass", "Pass", "Fail", "Pass"]
Numba JIT Compilation
# pip install numba
from numba import njit, prange, cuda
import numpy as np

# ── @njit: compile to machine code, no Python fallback ────────
@njit
def fast_sum(arr: np.ndarray) -> float:
    """First call pays a one-time compile cost; later calls run compiled code."""
    total = 0.0
    for i in range(len(arr)):
        total += arr[i]
    return total

arr = np.random.rand(10_000_000)
fast_sum(arr)  # First call triggers JIT compilation
fast_sum(arr)  # Subsequent calls run at compiled-loop speed

# ── On-disk caching of compiled code ──────────────────────────
@njit(cache=True)  # Cache compiled code to disk to skip recompiling in later runs
def cached_jit_func(x):
    ...

# ── Parallel loops ────────────────────────────────────────────
@njit(parallel=True)
def parallel_sum(arr: np.ndarray) -> float:
    total = 0.0
    for i in prange(len(arr)):  # prange = parallel range
        total += arr[i]         # Numba recognises this as a reduction
    return total

# ── GPU kernels with numba.cuda ───────────────────────────────
@cuda.jit
def gpu_add(a, b, result):
    i = cuda.grid(1)  # Get global thread index
    if i < result.size:
        result[i] = a[i] + b[i]

# Copy inputs to the GPU and allocate the output there
a_gpu = cuda.to_device(arr)
b_gpu = cuda.to_device(arr)
result_gpu = cuda.device_array_like(arr)

# Launch on GPU
threads_per_block = 256
blocks = (len(arr) + threads_per_block - 1) // threads_per_block
gpu_add[blocks, threads_per_block](a_gpu, b_gpu, result_gpu)

# ── When to use Numba vs NumPy ────────────────────────────────
# NumPy: vectorized ops, built-in functions, array manipulation
# Numba: custom loops that can't be vectorized, numerical algorithms,
#        physics simulations, anything with complex loop-level logic
Cython & Mypyc
# ── Cython — annotated Python compiled to C ────────────────── # File: fast_math.pyx # Pure Python (slow): def sum_squares_py(n): total = 0 for i in range(n): total += i * i return total # Cython (fast_math.pyx — ~50-100x faster): # def sum_squares_cy(int n): # C int, not Python int # cdef long long total = 0 # C variable, no boxing # cdef int i # C loop variable # for i in range(n): # total += i * i # return total # Build (setup.py): # from setuptools import setup # from Cython.Build import cythonize # setup(ext_modules=cythonize("fast_math.pyx", language_level=3)) # python setup.py build_ext --inplace # ── mypyc — compile typed Python to C extension ────────────── # pip install mypy # mypyc fast_math.py ← compiles to .so / .pyd # Your mypy-typed Python becomes a compiled extension: def sum_squares(n: int) -> int: # Types used by mypyc for optimization total = 0 for i in range(n): total += i * i return total # Compiled: 2-4x speedup on typical code, up to 10x on tight loops # No source changes needed — just compile! # Used by: Black formatter (6x faster after mypyc compilation)
ctypes — Call C Libraries
import ctypes
from ctypes import c_int, c_double, c_char_p, POINTER, Structure

# ── Load a shared library ─────────────────────────────────────
libc = ctypes.CDLL("libc.so.6")       # Linux
# libc = ctypes.CDLL("libc.dylib")    # macOS
# libc = ctypes.CDLL("msvcrt.dll")    # Windows

# ── Call C functions ──────────────────────────────────────────
libc.printf.argtypes = [c_char_p]
libc.printf.restype = c_int
libc.printf(b"Hello from C!\n")

# ── Type mapping: Python → C ──────────────────────────────────
# c_int         → int
# c_double      → double
# c_char_p      → char* (bytes)
# c_void_p      → void*
# POINTER(T)    → T*
# ctypes.c_bool → _Bool

# ── Call custom .so library ───────────────────────────────────
libmath = ctypes.CDLL("./libmath.so")
libmath.fast_sqrt.argtypes = [c_double]
libmath.fast_sqrt.restype = c_double
result = libmath.fast_sqrt(ctypes.c_double(16.0))

# ── C structures ─────────────────────────────────────────────
class Point(Structure):
    _fields_ = [("x", c_double), ("y", c_double)]

libmath.distance.argtypes = [POINTER(Point), POINTER(Point)]
libmath.distance.restype = c_double
p1 = Point(0.0, 0.0)
p2 = Point(3.0, 4.0)
dist = libmath.distance(ctypes.byref(p1), ctypes.byref(p2))   # 5.0

# ── Pass numpy arrays to C ────────────────────────────────────
import numpy as np
arr = np.zeros(1000, dtype=np.float64)
libmath.process_array(
    arr.ctypes.data_as(POINTER(c_double)),
    ctypes.c_int(len(arr)),
)
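The calls above need a compiled library on disk. As a minimal sketch that runs without any external `.so`, the block below shows how a `Structure` is laid out like a C struct and how pointers and C arrays behave from Python. `Vec2` and `scale_in_place` are hypothetical names used only for illustration; `scale_in_place` stands in for a C function that takes a `Vec2*`.

```python
import ctypes
from ctypes import POINTER, Structure, c_double


class Vec2(Structure):
    # Hypothetical struct, equivalent to: struct { double x; double y; }
    _fields_ = [("x", c_double), ("y", c_double)]


def scale_in_place(ptr, factor):
    """Stand-in for a C function taking Vec2*: mutates through the pointer."""
    ptr.contents.x *= factor
    ptr.contents.y *= factor


v = Vec2(3.0, 4.0)
size = ctypes.sizeof(Vec2)  # two 8-byte doubles

# ctypes.pointer() builds a real POINTER(Vec2) instance usable from Python;
# ctypes.byref() is the lighter form intended for foreign function arguments.
scale_in_place(ctypes.pointer(v), 2.0)

# A C array of doubles: the same memory a C function would see as double*.
DoubleArray5 = c_double * 5
buf = DoubleArray5(1.0, 2.0, 3.0, 4.0, 5.0)
as_ptr = ctypes.cast(buf, POINTER(c_double))
first_two = [as_ptr[0], as_ptr[1]]
```

Checking `ctypes.sizeof` against the C side's `sizeof` is a cheap way to catch field-order or padding mismatches before passing a struct to native code.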
ExceptionGroup (Python 3.11+)
ExceptionGroup allows a single raise to contain multiple exceptions simultaneously. This is essential for concurrent code (e.g., multiple async tasks all failing at once) and for comprehensive validation (collect all errors, not just the first).
import asyncio

# ── Creating ExceptionGroups ──────────────────────────────────
errors = ExceptionGroup("multiple validation errors", [
    ValueError("name is required"),
    TypeError("age must be an integer"),
    ValueError("email format invalid"),
])
raise errors

# ── except*: handle matching subgroups ────────────────────────
try:
    raise ExceptionGroup("task errors", [
        ValueError("bad value"),
        TypeError("bad type"),
        RuntimeError("runtime error"),
    ])
except* ValueError as eg:
    # eg.exceptions == (ValueError("bad value"),)
    print(f"ValueError(s): {eg.exceptions}")
except* TypeError as eg:
    print(f"TypeError(s): {eg.exceptions}")
# RuntimeError is NOT caught: it is re-raised as ExceptionGroup(RuntimeError)

# ── Real world: async task failure collection ─────────────────
async def run_tasks(tasks):
    async with asyncio.TaskGroup() as tg:
        for task in tasks:
            tg.create_task(task())
    # TaskGroup raises an ExceptionGroup on exit if any task failed

async def main():
    try:
        await run_tasks(my_tasks)
    except* ConnectionError as eg:
        for exc in eg.exceptions:
            print(f"Connection failed: {exc}")
    except* TimeoutError as eg:
        print(f"{len(eg.exceptions)} tasks timed out")

# ── Validation example: collect ALL errors ────────────────────
def validate_form(data: dict) -> None:
    exceptions = []
    if not data.get("name"):
        exceptions.append(ValueError("name is required"))
    if not isinstance(data.get("age"), int):
        exceptions.append(TypeError("age must be int"))
    if "@" not in data.get("email", ""):
        exceptions.append(ValueError("invalid email"))
    if exceptions:
        raise ExceptionGroup("Validation failed", exceptions)
traceback Module
import traceback, sys

# ── Exception chaining ────────────────────────────────────────
try:
    int("not a number")
except ValueError as e:
    raise RuntimeError("Failed to parse config") from e
# Shows: ValueError, then "The above exception was the direct cause of..."

try:
    int("not a number")
except ValueError:
    raise RuntimeError("Failed to parse config") from None
# 'from None' suppresses original exception context

# ── Working with tracebacks ────────────────────────────────────
try:
    1 / 0
except Exception:
    tb = sys.exc_info()[2]               # Traceback object
    traceback.print_exc()                # Print to stderr
    text = traceback.format_exc()        # Get as string
    frames = traceback.extract_tb(tb)    # StackSummary
    for frame in frames:
        print(f"{frame.filename}:{frame.lineno} in {frame.name}")
        print(f"  {frame.line}")

# ── __tracebackhide__: hide internal frames from pytest ────────
def assert_positive(value: int):
    __tracebackhide__ = True   # pytest hides this frame in output
    if value <= 0:
        raise AssertionError(f"Expected positive, got {value}")

# ── Rich tracebacks (pip install rich) ────────────────────────
from rich.traceback import install
install(show_locals=True)   # Syntax-highlighted tracebacks with local variables
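To see what `raise ... from ...` actually records, the following sketch catches a chained exception and inspects it with the standard library only. `load_config` is a hypothetical function used for illustration.

```python
import traceback


def load_config(raw):
    try:
        return int(raw)
    except ValueError as exc:
        raise RuntimeError("Failed to parse config") from exc


try:
    load_config("not a number")
except RuntimeError as err:
    caught = err  # keep a reference; 'err' is unbound once the except block ends

cause_type = type(caught.__cause__).__name__   # set by 'from exc'
suppressed = caught.__suppress_context__       # True whenever 'from' is used

# Render the full chained traceback as a string (suitable for logging).
report = "".join(
    traceback.format_exception(type(caught), caught, caught.__traceback__)
)

# Function names along the traceback, outermost first.
frame_names = [f.name for f in traceback.extract_tb(caught.__traceback__)]
```

The rendered `report` contains both tracebacks joined by the "direct cause" line, which is usually what you want in a log record.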
Logging Architecture
import json
import logging
from contextvars import ContextVar
from logging.handlers import RotatingFileHandler, TimedRotatingFileHandler

# ── Logger hierarchy ──────────────────────────────────────────
# root logger → myapp → myapp.services → myapp.services.db
# Child loggers propagate to parents by default
logger = logging.getLogger("myapp.services.db")

# ── Production logging config ─────────────────────────────────
class JSONFormatter(logging.Formatter):
    """JSON structured logs for production."""
    def format(self, record: logging.LogRecord) -> str:
        log = {
            "ts": self.formatTime(record),
            "lvl": record.levelname,
            "msg": record.getMessage(),
            "mod": record.module,
            "exc": self.formatException(record.exc_info) if record.exc_info else None,
        }
        # Include extra fields attached to the LogRecord
        for key in ("request_id", "user", "user_id", "duration_ms"):
            if hasattr(record, key):
                log[key] = getattr(record, key)
        return json.dumps({k: v for k, v in log.items() if v is not None})

def configure_logging(level: str = "INFO", json_output: bool = False):
    root = logging.getLogger()
    root.setLevel(level)

    if json_output:
        fmt = JSONFormatter()
    else:
        fmt = logging.Formatter("%(asctime)s %(name)-20s %(levelname)-8s %(message)s")

    # Console handler
    console = logging.StreamHandler()
    console.setLevel(level)
    console.setFormatter(fmt)

    # File handler with rotation (same format as the console)
    file_handler = RotatingFileHandler(
        "app.log", maxBytes=10*1024*1024, backupCount=5
    )
    file_handler.setFormatter(fmt)

    root.addHandler(console)
    root.addHandler(file_handler)

# ── Contextual logging with extra fields ──────────────────────
logger.info("User login", extra={"user_id": 42, "duration_ms": 120})

# ── Log adapter for per-request context ───────────────────────
# Context variables set by the request middleware (names are illustrative)
request_id: ContextVar = ContextVar("request_id", default=None)
current_user: ContextVar = ContextVar("current_user", default=None)

class RequestAdapter(logging.LoggerAdapter):
    def process(self, msg, kwargs):
        ctx = {"request_id": request_id.get(), "user": current_user.get()}
        kwargs.setdefault("extra", {}).update(ctx)
        return msg, kwargs

log = RequestAdapter(logger, {})
log.info("Processing order")   # Includes request_id and user automatically
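A self-contained sketch of the same idea, runnable as-is: a `logging.Filter` copies a context variable onto each record, and a small JSON formatter renders it. The names `ContextFilter`, `JsonLineFormatter`, and `demo.orders` are hypothetical, and the output goes to an in-memory stream so the result can be inspected.

```python
import contextvars
import io
import json
import logging

# Hypothetical per-request context variable, normally set by middleware.
request_id = contextvars.ContextVar("request_id", default=None)


class ContextFilter(logging.Filter):
    """Copy the current request_id onto every record passing through the handler."""

    def filter(self, record):
        record.request_id = request_id.get()
        return True  # never drops a record; it only annotates


class JsonLineFormatter(logging.Formatter):
    def format(self, record):
        payload = {
            "lvl": record.levelname,
            "msg": record.getMessage(),
            "request_id": getattr(record, "request_id", None),
        }
        return json.dumps(payload)


stream = io.StringIO()  # stand-in for stderr or a log file
handler = logging.StreamHandler(stream)
handler.addFilter(ContextFilter())
handler.setFormatter(JsonLineFormatter())

demo_logger = logging.getLogger("demo.orders")
demo_logger.setLevel(logging.INFO)
demo_logger.addHandler(handler)
demo_logger.propagate = False  # keep demo output away from the root logger

request_id.set("req-123")
demo_logger.info("Processing order %s", 42)

entry = json.loads(stream.getvalue())
```

Compared with a `LoggerAdapter`, a handler-level filter also annotates records emitted by third-party loggers that propagate to that handler.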
structlog — Structured Logging
# pip install structlog
import logging   # needed for the level constant below
import structlog

# ── Configure structlog ──────────────────────────────────────
structlog.configure(
    processors=[
        structlog.contextvars.merge_contextvars,        # Add context vars
        structlog.processors.add_log_level,             # Add level field
        structlog.processors.TimeStamper(fmt="iso"),    # ISO timestamp
        structlog.dev.ConsoleRenderer(),                # Pretty in dev
        # structlog.processors.JSONRenderer()           # JSON in prod (use instead of ConsoleRenderer)
    ],
    wrapper_class=structlog.make_filtering_bound_logger(logging.DEBUG),
    context_class=dict,
    logger_factory=structlog.PrintLoggerFactory(),
)

log = structlog.get_logger()

# ── Bind context to logger ────────────────────────────────────
request_log = log.bind(request_id="abc-123", user_id=42)
request_log.info("Request started", method="POST", path="/api/orders")
# Output with JSONRenderer:
# {"event": "Request started", "request_id": "abc-123",
#  "user_id": 42, "method": "POST", "path": "/api/orders",
#  "level": "info", "timestamp": "2025-01-01T..."}

# ── contextvars integration (async-safe) ──────────────────────
structlog.contextvars.bind_contextvars(request_id="req-456")
# All subsequent log calls in this async context include request_id
log.info("Processing")   # Automatically includes request_id
structlog.contextvars.unbind_contextvars("request_id")
OpenTelemetry — Distributed Tracing
# pip install opentelemetry-api opentelemetry-sdk opentelemetry-exporter-otlp
from opentelemetry import trace
from opentelemetry.sdk.resources import Resource
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter

# ── Setup tracing ─────────────────────────────────────────────
# service.name describes the whole process, so it belongs on the Resource
provider = TracerProvider(resource=Resource.create({"service.name": "order-service"}))
processor = BatchSpanProcessor(OTLPSpanExporter(endpoint="http://jaeger:4317"))
provider.add_span_processor(processor)
trace.set_tracer_provider(provider)
tracer = trace.get_tracer("myapp", "1.0.0")

# ── Create spans ─────────────────────────────────────────────
async def process_order(order_id: str):
    with tracer.start_as_current_span("process_order") as span:
        span.set_attribute("order.id", order_id)

        with tracer.start_as_current_span("validate_payment") as child:
            result = await validate_payment(order_id)
            child.set_attribute("payment.valid", result)

        with tracer.start_as_current_span("save_to_db"):
            await save_order(order_id)

# ── Automatic instrumentation ─────────────────────────────────
# pip install opentelemetry-instrumentation-fastapi
# pip install opentelemetry-instrumentation-sqlalchemy
from opentelemetry.instrumentation.fastapi import FastAPIInstrumentor
from opentelemetry.instrumentation.sqlalchemy import SQLAlchemyInstrumentor

FastAPIInstrumentor.instrument_app(app)    # app is your FastAPI instance; auto-traces all routes
SQLAlchemyInstrumentor().instrument()      # Auto-traces DB queries
Advanced match/case
from dataclasses import dataclass

@dataclass
class Point:
    x: float
    y: float

@dataclass
class Circle:
    center: Point
    radius: float

@dataclass
class Rectangle:
    top_left: Point
    width: float
    height: float

# ── Class patterns: match against dataclass fields ────────────
def describe_shape(shape) -> str:
    match shape:
        case Circle(center=Point(x=0, y=0), radius=r):
            return f"Circle of radius {r} centered at origin"
        case Circle(center=Point(x=cx, y=cy), radius=r):
            return f"Circle of radius {r} at ({cx},{cy})"
        case Rectangle(width=w, height=h) if w == h:
            return f"Square of side {w}"   # guard clause
        case Rectangle(width=w, height=h):
            return f"Rectangle {w}×{h}"
        case _:
            return "Unknown shape"

# ── Sequence patterns ─────────────────────────────────────────
def process_command(tokens: list[str]) -> str:
    match tokens:
        case []:
            return "empty"
        case ["quit"] | ["exit"]:
            return "exit"
        case ["get", resource]:
            return f"GET {resource}"
        case ["set", key, value]:
            return f"SET {key}={value}"
        case ["set", *_]:
            return "set requires key and value"
        case [cmd, *args]:
            return f"Unknown command: {cmd} with {args}"

# ── Mapping patterns ──────────────────────────────────────────
def handle_event(event: dict) -> None:
    match event:
        case {"type": "click", "x": x, "y": y}:
            print(f"Click at ({x}, {y})")
        case {"type": "keydown", "key": ("Ctrl" | "Meta") as mod, "code": code}:
            print(f"{mod}+{code} pressed")
        case {"type": event_type, **rest}:   # ** captures remaining keys
            print(f"Other event: {event_type}, data={rest}")

# ── Or patterns + guards ─────────────────────────────────────
match status_code:
    case 200 | 201 | 204:
        print("Success")
    case code if 400 <= code < 500:
        print(f"Client error: {code}")
    case code if 500 <= code < 600:
        print(f"Server error: {code}")
Hypothesis — Property-Based Testing
Instead of writing specific test cases, Hypothesis generates hundreds of random inputs to find edge cases you didn't think of. It then shrinks failing cases to the minimal reproducing example.
# pip install hypothesis
import math
from hypothesis import given, settings, assume, example
from hypothesis import strategies as st

# ── Basic property test ───────────────────────────────────────
@given(st.lists(st.integers()))
def test_sort_preserves_length(lst):
    """Sort should never change the list length."""
    assert len(sorted(lst)) == len(lst)

# ── Multiple strategies ───────────────────────────────────────
@given(
    st.text(min_size=1),
    st.integers(min_value=1, max_value=100)
)
def test_string_repeat(s, n):
    result = s * n
    assert len(result) == len(s) * n

# ── assume(): filter invalid inputs ──────────────────────────
@given(st.floats(), st.floats())
def test_add_commutative(a, b):
    assume(not (math.isnan(a) or math.isnan(b) or math.isinf(a) or math.isinf(b)))
    assert a + b == b + a

# ── Custom strategies ─────────────────────────────────────────
valid_email = st.from_regex(r"[a-z]+@[a-z]+\.[a-z]{2,}", fullmatch=True)
positive_int = st.integers(min_value=1)

# Composite strategy
@st.composite
def user_data(draw):
    name = draw(st.text(alphabet=st.characters(whitelist_categories=("Lu", "Ll")), min_size=2))
    age = draw(st.integers(min_value=18, max_value=120))
    email = draw(valid_email)
    return {"name": name, "age": age, "email": email}

@given(user_data())
def test_user_creation(user):
    created = create_user(**user)   # create_user: the application function under test
    assert created["name"] == user["name"]

# ── Force-run specific examples (regression tests) ───────────
@given(st.integers())
@example(0)    # Always test this case
@example(-1)   # Always test this edge case
def test_absolute(n):
    assert abs(n) >= 0

# ── Settings: tune how many examples ─────────────────────────
@settings(max_examples=1000, deadline=5000)   # deadline is in milliseconds per example
@given(st.text())
def test_intensive(s):
    ...
pytest Advanced
import pytest
from typing import Generator
from sqlalchemy import create_engine
from sqlalchemy.orm import Session
# Base, cache, and clean below are application-specific objects

# ── Fixture scopes ────────────────────────────────────────────
@pytest.fixture(scope="session")   # Created once for entire test session
def db_engine():
    engine = create_engine("sqlite:///:memory:")
    Base.metadata.create_all(engine)
    yield engine
    engine.dispose()

@pytest.fixture(scope="function")   # Default: new instance per test
def db_session(db_engine) -> Generator:
    conn = db_engine.connect()
    trans = conn.begin()
    session = Session(bind=conn)
    yield session
    session.close()
    trans.rollback()   # Rollback after each test keeps the DB clean
    conn.close()

# ── autouse fixtures ──────────────────────────────────────────
@pytest.fixture(autouse=True)   # Applied to ALL tests automatically
def clear_cache():
    cache.clear()
    yield

# ── parametrize with IDs ──────────────────────────────────────
@pytest.mark.parametrize(
    "raw, expected",
    [
        pytest.param("", None, id="empty-string"),
        pytest.param("abc", "abc", id="simple-string"),
        pytest.param(" x ", "x", id="with-whitespace"),
        pytest.param(None, None, id="none-input",
                     marks=pytest.mark.xfail(reason="not yet implemented")),
    ]
)
def test_clean_string(raw, expected):
    assert clean(raw) == expected

# ── Custom markers ────────────────────────────────────────────
@pytest.mark.slow
@pytest.mark.integration
def test_full_pipeline():
    ...
# pyproject.toml (pytest.ini uses a [pytest] section instead):
# [tool.pytest.ini_options]
# markers = ["slow: slow tests", "integration: requires external services"]
# Run: pytest -m "not slow"   or   pytest -m "integration"

# ── conftest.py: shared fixtures without import ───────────────
# Place conftest.py in the test root; its fixtures are available to all tests
# conftest.py files can be stacked in subdirectories

# ── pytest plugins ───────────────────────────────────────────
# pytest-xdist     : parallel test execution (pytest -n auto)
# pytest-cov       : coverage (pytest --cov=myapp --cov-report=html)
# pytest-asyncio   : async test support
# pytest-mock      : mocker fixture
# pytest-benchmark : benchmarks in tests
# pytest-snapshot  : snapshot testing
Mock Deep Dive
from unittest.mock import Mock, MagicMock, patch, AsyncMock, call, sentinel

# ── spec: mock with type safety ───────────────────────────────
from myapp.services import UserService

mock_svc = Mock(spec=UserService)
# mock_svc.non_existent_method()   # AttributeError: spec prevents this
mock_svc.get_user.return_value = {"id": 1, "name": "Alice"}
mock_svc.get_user(1)   # Returns {"id": 1, ...}

# ── side_effect: dynamic return values ───────────────────────
mock = Mock()
mock.side_effect = [1, 2, 3]                # Returns 1, then 2, then 3
mock.side_effect = ValueError("DB down")    # Always raises
mock.side_effect = lambda x: x * 2          # Call a function

# ── patch: replace real objects during test ──────────────────
with patch("myapp.services.send_email") as mock_email:
    process_order(123)
    mock_email.assert_called_once_with(
        to="user@example.com", subject="Order confirmed"
    )

# As decorator (decorators apply bottom-up, so the lowest patch is the first argument)
@patch("myapp.db.Session")
@patch("myapp.services.EmailService.send")
def test_checkout(mock_send, mock_session):
    mock_session.return_value.__enter__.return_value = mock_session
    checkout(cart={})
    mock_send.assert_called_once()

# ── AsyncMock: mock async functions (run inside a coroutine or async test) ──
async_mock = AsyncMock(return_value="async result")
result = await async_mock()   # "async result"

with patch("myapp.client.fetch", new_callable=AsyncMock) as mock_fetch:
    mock_fetch.return_value = {"data": "test"}
    result = await my_async_function()

# ── Assert call history ───────────────────────────────────────
mock = Mock()
mock(1, 2)
mock(key="value")
mock.call_count       # 2
mock.call_args_list   # [call(1, 2), call(key="value")]
mock.assert_any_call(1, 2)
mock.assert_called_with(key="value")   # Last call
mock.assert_has_calls([call(1, 2), call(key="value")])
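The snippets above depend on a `myapp` package. As a self-contained sketch of the same techniques, the block below defines two hypothetical classes (`Mailer`, `OrderService`) and exercises `spec`, `side_effect`, `AsyncMock`, and `patch.object` against them, driving the coroutine with `asyncio.run`.

```python
import asyncio
from unittest.mock import AsyncMock, Mock, call, patch


class Mailer:
    def send(self, to, subject):
        raise RuntimeError("would hit the network")


class OrderService:
    def __init__(self, mailer, fetch_price):
        self.mailer = mailer
        self.fetch_price = fetch_price

    async def checkout(self, user, item):
        price = await self.fetch_price(item)
        self.mailer.send(to=user, subject=f"Order confirmed: {item} ({price})")
        return price


mailer = Mock(spec=Mailer)                      # only Mailer's real attributes exist
fetch_price = AsyncMock(side_effect=[10, 20])   # one value per awaited call
service = OrderService(mailer, fetch_price)

first = asyncio.run(service.checkout("a@example.com", "book"))
second = asyncio.run(service.checkout("a@example.com", "pen"))

fetch_price.assert_has_awaits([call("book"), call("pen")])
mailer.send.assert_called_with(to="a@example.com", subject="Order confirmed: pen (20)")

# patch.object swaps the attribute only inside the with-block.
with patch.object(Mailer, "send", return_value="queued") as fake_send:
    outcome = Mailer().send(to="b@example.com", subject="hi")

try:
    mailer.missing_method()
    spec_enforced = False
except AttributeError:
    spec_enforced = True
```

Passing collaborators into the constructor, as `OrderService` does here, often removes the need for string-based `patch()` targets altogether.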
Security Practices
import secrets, hashlib, hmac, os, subprocess
from pathlib import Path

# ── secrets: cryptographically secure random ─────────────────
token = secrets.token_hex(32)            # 64-char hex token (256-bit)
url_token = secrets.token_urlsafe(32)    # URL-safe base64 token
api_key = secrets.token_bytes(32)        # Raw bytes

# Constant-time comparison (prevents timing attacks)
secrets.compare_digest(token1, token2)   # Use this, NOT ==

# ── subprocess: avoid shell injection ────────────────────────
# NEVER: subprocess.run(f"ls {user_input}", shell=True) ❌
# Safe: pass as list, shell=False
filename = "file.txt"
result = subprocess.run(
    ["ls", "-la", "--", filename],   # List form: no shell interpretation; "--" ends option parsing
    capture_output=True,
    text=True,
    timeout=10,
    check=True,                      # Raises CalledProcessError on non-zero exit
)

# ── SQL injection prevention ──────────────────────────────────
# NEVER: cursor.execute(f"SELECT * FROM users WHERE name='{name}'") ❌
# Safe: use parameterized queries
cursor.execute("SELECT * FROM users WHERE name = ?", (name,))   # ✅
# SQLAlchemy ORM queries are parameterized by default; prefer them over raw SQL strings

# ── File path traversal prevention ──────────────────────────
def safe_read(base_dir: str, user_filename: str) -> bytes:
    base = Path(base_dir).resolve()
    target = (base / user_filename).resolve()
    # Ensure target is within base_dir; blocks ../../etc/passwd
    target.relative_to(base)   # Raises ValueError if outside base
    return target.read_bytes()

# ── Environment variable secrets ─────────────────────────────
# NEVER commit secrets to source. Use:
# os.environ.get("SECRET_KEY")   returns None when missing: fails silently (bad)
# os.environ["SECRET_KEY"]       raises KeyError when missing: fails explicitly (ok)
# Or python-dotenv for development
from dotenv import load_dotenv
load_dotenv()   # Loads .env file; NEVER commit .env
secret = os.environ["SECRET_KEY"]
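To check that the `resolve()` plus `relative_to()` approach really rejects traversal attempts, here is a small sketch that runs against a temporary directory. `resolve_inside` is a hypothetical helper following the same logic as `safe_read`, and the file names are made up for the demo.

```python
import tempfile
from pathlib import Path


def resolve_inside(base_dir, user_filename):
    """Return the resolved path, or raise ValueError if it escapes base_dir."""
    base = Path(base_dir).resolve()
    target = (base / user_filename).resolve()
    target.relative_to(base)  # ValueError when target is outside base
    return target


with tempfile.TemporaryDirectory() as tmp:
    (Path(tmp) / "notes.txt").write_text("hello")
    ok = resolve_inside(tmp, "notes.txt").read_text()

    attempts = {}
    for name in ["../outside.txt", "/etc/passwd", "sub/../../escape"]:
        try:
            resolve_inside(tmp, name)
            attempts[name] = "allowed"
        except ValueError:
            attempts[name] = "blocked"
```

Note that an absolute user path replaces the base entirely when joined with `/`, which is why the containment check has to run on the resolved result and not on the raw input.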
Cryptography
import hashlib, hmac, secrets
# pip install cryptography
from cryptography.fernet import Fernet
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC
from cryptography.hazmat.primitives.asymmetric import rsa, padding
import base64

# ── Password hashing (use bcrypt or Argon2, not plain SHA) ────
# pip install bcrypt
import bcrypt
password = b"correct horse battery staple"
hashed = bcrypt.hashpw(password, bcrypt.gensalt(rounds=12))
bcrypt.checkpw(password, hashed)   # True

# ── HMAC: message authentication ─────────────────────────────
secret_key = secrets.token_bytes(32)
message = b"important data"
mac = hmac.new(secret_key, message, hashlib.sha256).hexdigest()
# Verify (received_message is whatever arrived over the wire):
expected = hmac.new(secret_key, received_message, hashlib.sha256).hexdigest()
secrets.compare_digest(mac, expected)   # Timing-safe comparison

# ── Fernet: symmetric encryption (AES-128-CBC + HMAC-SHA256) ──
key = Fernet.generate_key()   # 32 random bytes, URL-safe base64-encoded
fernet = Fernet(key)
ciphertext = fernet.encrypt(b"secret message")
plaintext = fernet.decrypt(ciphertext)   # b"secret message"

# Token with expiry
token = fernet.encrypt(b"user_id:42")
fernet.decrypt(token, ttl=3600)   # Raises InvalidToken if older than 1 hour

# ── PBKDF2: derive encryption key from password ──────────────
salt = secrets.token_bytes(16)   # Store this with the ciphertext
kdf = PBKDF2HMAC(
    algorithm=hashes.SHA256(),
    length=32,
    salt=salt,
    iterations=600_000,          # OWASP 2023 recommendation for PBKDF2-HMAC-SHA256
)
key = base64.urlsafe_b64encode(kdf.derive(b"user_password"))

# ── RSA: asymmetric encryption ───────────────────────────────
private_key = rsa.generate_private_key(public_exponent=65537, key_size=2048)
public_key = private_key.public_key()
# Encrypt and decrypt must use the same padding configuration
oaep = padding.OAEP(mgf=padding.MGF1(hashes.SHA256()), algorithm=hashes.SHA256(), label=None)
ciphertext = public_key.encrypt(b"secret", oaep)
plaintext = private_key.decrypt(ciphertext, oaep)
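When third-party packages are not available, the standard library covers key derivation and message authentication. The sketch below uses `hashlib.pbkdf2_hmac` and `hmac` only; the helper names are hypothetical, and the iteration count is lowered in the demo call purely so it runs quickly.

```python
import hashlib
import hmac
import secrets


def hash_password(password, *, iterations=600_000):
    """Derive a PBKDF2-HMAC-SHA256 digest; store salt and iterations with it."""
    salt = secrets.token_bytes(16)
    digest = hashlib.pbkdf2_hmac("sha256", password.encode(), salt, iterations)
    return salt, iterations, digest


def verify_password(password, salt, iterations, expected):
    candidate = hashlib.pbkdf2_hmac("sha256", password.encode(), salt, iterations)
    return hmac.compare_digest(candidate, expected)  # constant-time comparison


def sign(key, message):
    return hmac.new(key, message, hashlib.sha256).digest()


# Demo only: a reduced iteration count keeps the example fast.
salt, rounds, stored = hash_password("correct horse battery staple", iterations=10_000)
good = verify_password("correct horse battery staple", salt, rounds, stored)
bad = verify_password("wrong password", salt, rounds, stored)

key = secrets.token_bytes(32)
tag = sign(key, b"important data")
tamper_detected = not hmac.compare_digest(tag, sign(key, b"important dat4"))
```

Storing the iteration count next to the salt lets you raise the work factor later and still verify older hashes.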
Packaging & Distribution
[build-system]
requires = ["hatchling"]   # Build backend (alt: flit, setuptools, pdm)
build-backend = "hatchling.build"

[project]
name = "my-awesome-lib"
version = "1.2.0"
requires-python = ">=3.10"
description = "A useful library"
readme = "README.md"
license = {text = "MIT"}
authors = [{name = "Alice Dev", email = "alice@example.com"}]
keywords = ["utility", "python"]
classifiers = [
    "Development Status :: 5 - Production/Stable",
    "Programming Language :: Python :: 3.10",
    "License :: OSI Approved :: MIT License",
]
dependencies = [   # Runtime deps (like NuGet package deps)
    "httpx>=0.26.0",
    "pydantic>=2.0,<3.0",
]

[project.optional-dependencies]   # pip install "my-awesome-lib[dev,docs]"
dev = ["pytest", "ruff", "mypy"]
docs = ["mkdocs", "mkdocstrings"]

[project.scripts]   # CLI entry points
mylib-cli = "mylib.cli:main"

[project.entry-points."mylib.plugins"]   # Plugin registration
csv = "mylib.plugins.csv:CSVPlugin"

[project.urls]
Homepage = "https://github.com/alice/mylib"
Documentation = "https://mylib.readthedocs.io"
Changelog = "https://github.com/alice/mylib/CHANGELOG.md"
# Install build tools
pip install build twine

# Build wheel + sdist
python -m build
# dist/
#   my_awesome_lib-1.2.0-py3-none-any.whl
#   my_awesome_lib-1.2.0.tar.gz

# Inspect wheel contents
unzip -l dist/my_awesome_lib-*.whl

# Upload to TestPyPI first
twine upload --repository testpypi dist/*
# Dependencies usually live on the real PyPI, so add it as an extra index
pip install --index-url https://test.pypi.org/simple/ --extra-index-url https://pypi.org/simple/ my-awesome-lib

# Upload to PyPI
twine upload dist/*

# Verify installation
pip install my-awesome-lib
python -c "import mylib; print(mylib.__version__)"

# Trusted publisher (no API key) via GitHub Actions + PyPI OIDC
# pyproject.toml sets up the project, then:
# - push tag v1.2.0
# - GitHub Actions builds and publishes with short-lived OIDC token
Pythonic Design Patterns
# ── Observer pattern: event system ───────────────────────────
import asyncio
import inspect
from collections import defaultdict
from typing import Callable, Any

class EventBus:
    def __init__(self):
        self._listeners: dict[str, list[Callable]] = defaultdict(list)

    def on(self, event: str) -> Callable[[Callable], Callable]:
        def register(handler: Callable) -> Callable:
            self._listeners[event].append(handler)
            return handler   # Return the handler so @bus.on(...) leaves it usable
        return register

    async def emit(self, event: str, **data: Any) -> None:
        for handler in self._listeners.get(event, []):
            result = handler(**data)
            if inspect.isawaitable(result):   # Supports sync and async handlers
                await result

bus = EventBus()

@bus.on("order.created")
async def send_confirmation(order_id, user_email, **_):
    await email_service.send(user_email, f"Order {order_id} confirmed")

@bus.on("order.created")
def update_inventory(order_id, items, **_):
    inventory.reserve(order_id, items)

# Inside a coroutine:
await bus.emit("order.created", order_id=42, user_email="a@b.com", items=[])

# ── Command pattern: undo/redo queue ─────────────────────────
from abc import ABC, abstractmethod
from collections import deque

class Command(ABC):
    @abstractmethod
    def execute(self) -> None: ...
    @abstractmethod
    def undo(self) -> None: ...

class CommandHistory:
    def __init__(self, max_size: int = 50):
        self._history: deque[Command] = deque(maxlen=max_size)

    def execute(self, cmd: Command) -> None:
        cmd.execute()
        self._history.append(cmd)

    def undo(self) -> None:
        if self._history:
            self._history.pop().undo()

# ── Chain of Responsibility: middleware chain ────────────────
class Handler(ABC):
    def __init__(self):
        self._next: Handler | None = None

    def set_next(self, handler: "Handler") -> "Handler":
        self._next = handler
        return handler   # Enable chaining: auth.set_next(rate).set_next(cache)

    @abstractmethod
    def handle(self, request: dict) -> dict | None: ...

    def _pass_to_next(self, request):
        if self._next:
            return self._next.handle(request)
        return None

# ── Builder pattern: fluent interface ────────────────────────
class QueryBuilder:
    def __init__(self, table: str):
        self._table = table
        self._conditions: list[str] = []
        self._limit: int | None = None
        self._order: str | None = None

    def where(self, cond: str) -> "QueryBuilder":
        self._conditions.append(cond)   # Trusted SQL fragments only, never raw user input
        return self

    def limit(self, n: int) -> "QueryBuilder":
        self._limit = n
        return self

    def order_by(self, col: str) -> "QueryBuilder":
        self._order = col
        return self

    def build(self) -> str:
        sql = f"SELECT * FROM {self._table}"
        if self._conditions:
            sql += " WHERE " + " AND ".join(self._conditions)
        if self._order:
            sql += f" ORDER BY {self._order}"
        if self._limit is not None:   # 'is not None' so that limit(0) is honored
            sql += f" LIMIT {self._limit}"
        return sql

query = (
    QueryBuilder("users")
    .where("active = 1")
    .where("age > 18")
    .order_by("name")
    .limit(50)
    .build()
)
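The Chain of Responsibility base class is abstract, so here is a minimal runnable sketch with concrete handlers. `AuthHandler`, `RateLimitHandler`, and `EchoHandler` are hypothetical middleware invented for the demo, and the base class is repeated so the block stands alone.

```python
from abc import ABC, abstractmethod


class Handler(ABC):
    def __init__(self):
        self._next = None

    def set_next(self, handler):
        self._next = handler
        return handler  # returning the argument allows a.set_next(b).set_next(c)

    @abstractmethod
    def handle(self, request): ...

    def _pass_to_next(self, request):
        return self._next.handle(request) if self._next else None


class AuthHandler(Handler):
    def handle(self, request):
        if request.get("token") != "secret":
            return {"status": 401}  # short-circuit: later handlers never run
        return self._pass_to_next(request)


class RateLimitHandler(Handler):
    def __init__(self, limit):
        super().__init__()
        self._limit = limit
        self._seen = 0

    def handle(self, request):
        self._seen += 1
        if self._seen > self._limit:
            return {"status": 429}
        return self._pass_to_next(request)


class EchoHandler(Handler):
    def handle(self, request):
        return {"status": 200, "path": request["path"]}


auth = AuthHandler()
auth.set_next(RateLimitHandler(limit=2)).set_next(EchoHandler())

responses = [
    auth.handle({"token": "secret", "path": "/a"}),
    auth.handle({"token": "wrong", "path": "/b"}),   # rejected before rate limiting
    auth.handle({"token": "secret", "path": "/c"}),
    auth.handle({"token": "secret", "path": "/d"}),  # third authenticated call exceeds the limit
]
statuses = [r["status"] for r in responses]
```

Because the rejected request never reaches `RateLimitHandler`, it does not count toward the limit; ordering the chain is therefore part of the design.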
Expert Cheat Sheet
CPython Internals Quick Reference
| Topic | Tool/API | Use For |
|---|---|---|
| Bytecode inspection | dis.dis(func) | Understand why code is fast/slow |
| Memory tracking | tracemalloc.start() / .take_snapshot() | Find memory leaks, measure allocation |
| Reference counting | sys.getrefcount(obj) | Debug GC issues, understand lifetime |
| Object interning | sys.intern(s) | Share identical string objects |
| Import hooks | sys.meta_path.insert(0, finder) | Custom module loading |
| AST inspection | ast.parse() + NodeVisitor | Code analysis, linters, formatters |
| AST transformation | NodeTransformer + compile() | Macros, optimizers, DSLs |
| GC control | gc.disable() / gc.collect() | Performance tuning, cycle detection |
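A short sketch of three of the tools in the table, using only the standard library. The allocation sizes and the `Node` class are arbitrary choices for the demo.

```python
import sys
import tracemalloc

# tracemalloc: diff two snapshots to see which line allocated memory.
tracemalloc.start()
before = tracemalloc.take_snapshot()
payload = [bytes(1000) for _ in range(1000)]  # at least 1,000,000 bytes of new objects
after = tracemalloc.take_snapshot()
tracemalloc.stop()

stats = after.compare_to(before, "lineno")
grew_by = max(stat.size_diff for stat in stats)  # largest per-line growth, in bytes

# sys.intern: equal strings built at runtime end up as one shared object.
a = sys.intern("".join(["py", "thon"]))
b = sys.intern("".join(["pyt", "hon"]))
same_object = a is b


# sys.getrefcount: binding another name adds exactly one reference.
class Node:
    pass


node = Node()
base = sys.getrefcount(node)
alias = node
extra_refs = sys.getrefcount(node) - base
```

Absolute `getrefcount` values vary between interpreter versions, so comparing two readings is more reliable than asserting a specific number.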
Type System at Expert Level
| Construct | Purpose |
|---|---|
| TypeVar("T", bound=X) | Generic constrained to X or subclasses |
| ParamSpec("P") | Preserve callable signature through decorators |
| TypeVarTuple("Ts") | Variadic generics (tuple of arbitrary types) |
| Annotated[T, metadata] | Attach runtime metadata to type hints |
| Literal["a", "b"] | Restrict to exact values |
| Final[T] | Constant; cannot be reassigned |
| LiteralString | Only string literals; helps prevent injection |
| Never | Function that never returns |
| Self | Return type = same type as class |
| TypeGuard[T] | Narrow type when function returns True |
| @overload | Multiple typed signatures for one function |
| assert_never(x) | Exhaustiveness checking at type-check time |
Performance Decision Tree
| Problem | Solution |
|---|---|
| Algorithmic bottleneck | Fix the algorithm first; no micro-optimization matches going from O(n²) to O(n) |
| Python loop over numbers | NumPy vectorization, often 10-100x faster than a pure-Python loop (more for large arrays) |
| Custom numerical algorithm | Numba @njit: near-C speed while still writing Python |
| Tight loop, many small objects | __slots__ + local variable caching + set for membership |
| Need to call C library | ctypes (no compile) or cffi (better for complex APIs) |
| Whole module needs C speed | Cython .pyx or mypyc for typed Python |
| I/O bound concurrency | asyncio + anyio for 10K+ concurrent connections |
| CPU bound parallelism | ProcessPoolExecutor / multiprocessing (bypasses GIL) |
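The "tight loop, many small objects" row combines three small techniques. A minimal sketch of each, with hypothetical class and function names; it shows behavior only and makes no timing claims.

```python
class PointDict:
    """Regular class: every instance carries its own __dict__."""

    def __init__(self, x, y):
        self.x, self.y = x, y


class PointSlots:
    """__slots__ class: fixed attribute storage, no per-instance __dict__."""

    __slots__ = ("x", "y")

    def __init__(self, x, y):
        self.x, self.y = x, y


p = PointSlots(1, 2)
has_dict = hasattr(p, "__dict__")
try:
    p.z = 3  # not listed in __slots__
    can_add_attr = True
except AttributeError:
    can_add_attr = False


def count_hits(queries, allowed):
    allowed_set = set(allowed)            # O(1) average membership vs O(n) for a list
    contains = allowed_set.__contains__   # local binding avoids repeated attribute lookups
    return sum(1 for q in queries if contains(q))


# Multiples of 3 below 100 that are also multiples of 5.
hits = count_hits(range(0, 100, 3), range(0, 100, 5))
```

Measure before and after with `timeit` or a profiler; these changes pay off mainly when millions of instances or lookups are involved.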