Python 201 — Advanced Internals Handbook
Advanced Internals
Python 3.12+ // series 7 of 8
handbook series · vol 7

Py201 Advanced Python Internals & Concurrency

Go beneath the surface. Master the data model, descriptor protocol, metaclasses, generator machinery, the GIL, and full async concurrency. Written for engineers who already know Python and want to understand how it works.

Data Model Descriptors Metaclasses asyncio Concurrency Generators functools itertools
01

The Python Data Model

The Python data model is the system of special (dunder) methods that let your objects participate in the language itself — operators, iteration, attribute access, context managers, and more. Understanding it unlocks idiomatic Python.

▸ Core Concept

When Python evaluates a + b, it does not call a built-in function; it looks up __add__ on type(a) and calls it, roughly type(a).__add__(a, b). Every language feature maps to one or more special methods. Your objects can implement any of them.
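A minimal sketch of that mapping, using a hypothetical Money class that is not part of the handbook. It shows that the operator form and the explicit special-method call agree, and that the lookup happens on the type rather than on the instance.

```python
class Money:
    """Hypothetical value type, used only for illustration."""

    def __init__(self, cents):
        self.cents = cents

    def __add__(self, other):
        if not isinstance(other, Money):
            return NotImplemented
        return Money(self.cents + other.cents)


a, b = Money(150), Money(250)

via_operator = a + b                  # what you normally write
via_dunder = type(a).__add__(a, b)    # roughly what the interpreter does

# An __add__ stored on the instance is ignored by the + operator,
# because special methods are looked up on the type.
a.__add__ = lambda other: Money(0)
still_type_level = a + b

print(via_operator.cents, via_dunder.cents, still_type_level.cents)
```

All three results carry the same amount, which is why patching a dunder on a single instance rarely does what you expect.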

Object Lifecycle

__new__(cls, *args, **kwargs)
TRIGGER: ClassName(...) — before __init__

Allocates and returns the new object. It normally returns an instance of cls; if it returns anything else, __init__ is skipped. Rarely overridden: use it for immutable types (subclassing int, str, tuple) or singleton patterns.

python
class Singleton:
    _instance = None

    def __new__(cls, *args, **kwargs):
        if cls._instance is None:
            # super().__new__ actually allocates memory
            cls._instance = super().__new__(cls)
        return cls._instance

    def __init__(self, value):
        self.value = value  # called even on re-use!

a = Singleton(1)
b = Singleton(2)
print(a is b)       # True — same object
print(a.value)       # 2  — __init__ ran again
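The other common reason to override __new__ is subclassing an immutable built-in. The sketch below assumes a hypothetical Celsius type; the value has to be validated and fixed in __new__, because a float cannot be changed once it exists.

```python
class Celsius(float):
    """Hypothetical float subclass that validates its value on creation."""

    def __new__(cls, degrees):
        if degrees < -273.15:
            raise ValueError("below absolute zero")
        # float is immutable, so the value must be passed to float.__new__;
        # by the time __init__ runs it is too late to change it.
        return super().__new__(cls, degrees)


t = Celsius(21.5)
print(type(t).__name__, t + 1)
```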
__init__(self, *args, **kwargs)
TRIGGER: After __new__ returns an instance of cls

Initialises the already-created object. Must return None. This is where attributes are set.

__del__(self)
TRIGGER: When reference count reaches zero (CPython) or GC collects

Finaliser, not a destructor. Timing is not guaranteed (especially with reference cycles), and exceptions raised here are not propagated; they are reported as warnings on stderr. Prefer context managers.
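As a sketch of the context-manager alternative, the hypothetical TempResource below releases itself in __exit__, which runs at a known point even when the body raises.

```python
class TempResource:
    """Hypothetical resource that must be released deterministically."""

    def __init__(self):
        self.open = False

    def __enter__(self):
        self.open = True
        return self

    def __exit__(self, exc_type, exc, tb):
        self.open = False   # runs even if the with-body raised
        return False        # do not swallow exceptions


with TempResource() as res:
    was_open_inside = res.open

is_open_after = res.open
print(was_open_inside, is_open_after)
```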

__call__(self, *args, **kwargs)
TRIGGER: instance() — calling an object like a function

Makes any object callable. Used extensively in decorators, neural network modules, and function-like classes.

python
class Multiplier:
    def __init__(self, factor):
        self.factor = factor

    def __call__(self, x):
        return self.factor * x

triple = Multiplier(3)
print(triple(10))   # 30
callable(triple)    # True
__repr__(self) / __str__(self)
repr(obj) / str(obj) / f"{obj}" / print(obj)

__repr__ should return an unambiguous string, ideally one that recreates the object. __str__ returns a readable string. print() uses __str__, falling back to __repr__.
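A small illustration with a hypothetical Version class: str() and f-strings pick __str__, while repr() and containers use __repr__.

```python
class Version:
    def __init__(self, major, minor):
        self.major, self.minor = major, minor

    def __repr__(self):
        return f"Version({self.major}, {self.minor})"

    def __str__(self):
        return f"v{self.major}.{self.minor}"


v = Version(3, 12)
as_repr = repr(v)          # developer-facing form
as_str = str(v)            # user-facing form
in_fstring = f"{v}"        # default __format__ delegates to __str__
in_container = str([v])    # containers render their items with __repr__
print(as_repr, as_str, in_fstring, in_container)
```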

__bool__(self) / __len__(self)
bool(obj) / if obj: / while obj:

Truth testing calls __bool__. If absent, calls __len__ — zero means false. If neither, objects are always truthy.
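The fallback chain can be checked with three hypothetical classes: one with only __len__, one where __bool__ overrides __len__, and one with neither.

```python
class Inbox:
    """Only __len__: empty means falsy."""
    def __init__(self, messages):
        self.messages = list(messages)

    def __len__(self):
        return len(self.messages)


class Flag:
    """__bool__ wins over __len__ when both exist."""
    def __init__(self, on):
        self.on = on

    def __len__(self):
        return 5

    def __bool__(self):
        return self.on


class Plain:
    """Neither method: always truthy."""


results = (bool(Inbox([])), bool(Inbox(["hi"])), bool(Flag(False)), bool(Plain()))
print(results)
```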

__hash__(self)
hash(obj) / using as dict key or set member

If you define __eq__, Python sets __hash__ = None (unhashable). To keep hashability, also define __hash__. Objects that compare equal must have equal hashes.
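A sketch of both cases, with hypothetical class names: EqOnly loses its hash, while Point restores it by hashing the same fields that __eq__ compares. Those fields should not change while the object sits in a set or dict.

```python
class EqOnly:
    def __eq__(self, other):
        return isinstance(other, EqOnly)


class Point:
    def __init__(self, x, y):
        self.x, self.y = x, y

    def __eq__(self, other):
        if not isinstance(other, Point):
            return NotImplemented
        return (self.x, self.y) == (other.x, other.y)

    def __hash__(self):
        # Hash exactly the fields used by __eq__.
        return hash((self.x, self.y))


try:
    hash(EqOnly())
    eq_only_hashable = True
except TypeError:
    eq_only_hashable = False

unique_points = {Point(1, 2), Point(1, 2), Point(3, 4)}
print(eq_only_hashable, len(unique_points))
```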

Attribute Access Dunders

Python's attribute system is rich and layered. Understanding the lookup order is essential for descriptors and metaclasses.

attribute lookup order for obj.name
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
1. type(obj).__mro__  → look for DATA descriptor in class hierarchy
2. obj.__dict__       → instance dictionary
3. type(obj).__mro__  → look for NON-DATA descriptor or plain class attr
4. AttributeError

data descriptor     = defines __set__ or __delete__
non-data descriptor = defines __get__ only (e.g., functions/methods)
__getattr__(self, name)
ONLY when normal lookup fails (AttributeError path)

Fallback for missing attributes. Great for lazy loading, proxy objects, or DSL attribute chaining. Avoid infinite recursion by not accessing undefined attrs inside it.
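A minimal lazy-loading sketch, assuming a hypothetical LazyConfig proxy and loader function. The attributes read inside __getattr__ are all set in __init__, which is what keeps the hook from recursing.

```python
class LazyConfig:
    """Hypothetical proxy that loads its settings on the first miss."""

    def __init__(self, loader):
        self._loader = loader
        self._loaded = None

    def __getattr__(self, name):
        # Only reached when normal lookup fails. _loader and _loaded exist
        # in the instance dict, so reading them here does not recurse.
        if self._loaded is None:
            self._loaded = self._loader()
        try:
            return self._loaded[name]
        except KeyError:
            raise AttributeError(name) from None


calls = []

def load():
    calls.append("load")
    return {"host": "localhost", "port": 8080}


cfg = LazyConfig(load)
print(cfg.host, cfg.port, len(calls))
```

Raising AttributeError for unknown names keeps hasattr() and getattr() with a default working as expected.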

__getattribute__(self, name)
EVERY attribute access — obj.anything

Called for every attribute access. Override with extreme care. Must call super().__getattribute__(name) for default behaviour. Easy to create infinite recursion.

python
class AccessLog:
    def __getattribute__(self, name):
        # intercept every access
        print(f"getting '{name}'")
        # MUST delegate to super — NOT self.name (infinite loop!)
        return super().__getattribute__(name)

    def __getattr__(self, name):
        # only called when attr genuinely missing
        return f"<dynamic:{name}>"

obj = AccessLog()
obj.x    # → getting 'x' → "<dynamic:x>"
__setattr__(self, name, value)
obj.name = value — every attribute assignment

Intercepts all attribute writes. Use object.__setattr__(self, name, value) inside to avoid recursion.

__delattr__(self, name)
del obj.name

Intercepts attribute deletion. Same recursion caveat applies.
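As a sketch of both hooks together, the hypothetical Frozen class below blocks writes and deletions after construction. The constructor goes through object.__setattr__ so that it does not trip its own hook.

```python
class Frozen:
    """Hypothetical read-only object: attributes are set in __init__ only."""

    def __init__(self, **fields):
        for key, value in fields.items():
            object.__setattr__(self, key, value)   # bypass our own hook

    def __setattr__(self, name, value):
        raise AttributeError(f"cannot set {name!r}: object is read-only")

    def __delattr__(self, name):
        raise AttributeError(f"cannot delete {name!r}: object is read-only")


f = Frozen(x=1)

errors = []
try:
    f.x = 2
except AttributeError as exc:
    errors.append(str(exc))
try:
    del f.x
except AttributeError as exc:
    errors.append(str(exc))

print(f.x, errors)
```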

Container Protocol

Dunder         Triggered by                                   Notes
__len__        len(obj)                                       Return non-negative int
__getitem__    obj[key]                                       Support slices via isinstance(key, slice)
__setitem__    obj[key] = val
__delitem__    del obj[key]
__contains__   x in obj                                       Falls back to __iter__ linear scan
__iter__       for x in obj / iter(obj)                       Return iterator (object with __next__)
__next__       next(iterator)                                 Raise StopIteration when exhausted
__reversed__   reversed(obj)                                  Falls back to __len__ + __getitem__
__missing__    self[key] raises KeyError in a dict subclass   dict subclasses only
python
class RingBuffer:
    def __init__(self, size):
        self._data = [None] * size
        self._size = size
        self._head = 0

    def __len__(self):
        return self._size

    def __getitem__(self, idx):
        return self._data[(self._head + idx) % self._size]

    def __setitem__(self, idx, val):
        self._data[(self._head + idx) % self._size] = val

    def __iter__(self):
        for i in range(self._size):
            yield self[i]

    def __contains__(self, item):
        return item in self._data

Arithmetic Operators

Op      Forward        Reflected       In-place
+       __add__        __radd__        __iadd__
-       __sub__        __rsub__        __isub__
*       __mul__        __rmul__        __imul__
/       __truediv__    __rtruediv__    __itruediv__
//      __floordiv__   __rfloordiv__   __ifloordiv__
%       __mod__        __rmod__        __imod__
**      __pow__        __rpow__        __ipow__
&       __and__        __rand__        __iand__
|       __or__         __ror__         __ior__
^       __xor__        __rxor__        __ixor__
<<      __lshift__     __rlshift__     __ilshift__
>>      __rshift__     __rrshift__     __irshift__
@       __matmul__     __rmatmul__     __imatmul__
-x      __neg__
+x      __pos__
~x      __invert__
abs()   __abs__
▸ Reflected Dispatch Rule

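For a + b: Python first tries a.__add__(b). If it returns NotImplemented, Python tries b.__radd__(a). If that also returns NotImplemented, Python raises TypeError. One exception to the order: if type(b) is a subclass of type(a) and overrides __radd__, the reflected method is tried first. In-place operators (+=) should return self for mutables or a new object for immutables; if __iadd__ is missing, Python falls back to __add__.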

python
from functools import total_ordering

@total_ordering  # implement __eq__ + one of lt/le/gt/ge → get all 6
class Vector:
    def __init__(self, x, y):
        self.x, self.y = x, y

    def __add__(self, other):
        if not isinstance(other, Vector):
            return NotImplemented  # triggers __radd__
        return Vector(self.x + other.x, self.y + other.y)

    def __mul__(self, scalar):
        return Vector(self.x * scalar, self.y * scalar)

    __rmul__ = __mul__  # 3 * v  works too

    def __iadd__(self, other):
        self.x += other.x
        self.y += other.y
        return self  # mutate + return self

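    def __eq__(self, other):
        if not isinstance(other, Vector):
            return NotImplemented  # avoid AttributeError on non-Vector operands
        return (self.x, self.y) == (other.x, other.y)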

    def __lt__(self, other):
        return (self.x**2 + self.y**2) < (other.x**2 + other.y**2)

    def __repr__(self):
        return f"Vector({self.x}, {self.y})"

__slots__

__slots__ replaces the per-instance __dict__ with a fixed set of slot descriptors stored in the class, significantly reducing memory overhead and slightly speeding up attribute access.

▸ With __slots__

~56 bytes/instance (CPython 3.12; approximate, depends on slot count and build)
No __dict__ overhead
Attribute names fixed at class creation
Can't add arbitrary attrs at runtime

▸ Without __slots__

Larger per-instance footprint (~248 bytes quoted here; varies with CPython version and attribute count)
__dict__ is a hash map per object
Fully dynamic — add any attr
Better for exploratory / generic code

python
class Point:
    __slots__ = ('x', 'y')  # tuple of allowed attr names

    def __init__(self, x, y):
        self.x = x  # uses slot descriptor, not __dict__
        self.y = y

p = Point(1, 2)
p.z = 3   # AttributeError — no __dict__, no slot for 'z'

# Inheritance rules:
class Point3D(Point):
    __slots__ = ('z',)  # only add NEW slots
    # if you omit __slots__ here, a __dict__ is added back!

# Allow __weakref__ support:
class WeakPoint:
    __slots__ = ('x', 'y', '__weakref__')
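Because the byte counts depend on the interpreter, it is worth measuring on your own build. The sketch below uses hypothetical class names and sys.getsizeof, which does not follow references, so the instance dict is added by hand where one exists.

```python
import sys


class Plain:
    def __init__(self, x, y):
        self.x, self.y = x, y


class Slotted:
    __slots__ = ('x', 'y')

    def __init__(self, x, y):
        self.x, self.y = x, y


class SlottedChild(Slotted):
    pass   # no __slots__ here, so instances get a __dict__ again


p, s, c = Plain(1, 2), Slotted(1, 2), SlottedChild(1, 2)

has_dict = [hasattr(obj, '__dict__') for obj in (p, s, c)]

# Rough comparison only: exact numbers vary by CPython version and build.
plain_size = sys.getsizeof(p) + sys.getsizeof(p.__dict__)
slotted_size = sys.getsizeof(s)

print(has_dict, plain_size, slotted_size)
```

For a fuller picture across many instances, tracemalloc gives more realistic totals than summing getsizeof by hand.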
02

Function Internals

Python functions are first-class objects — instances of function type with a rich set of attributes. Understanding the function object and call conventions unlocks advanced metaprogramming.

Function Object Anatomy

function object attributes (fn.__xxx__)
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
__name__          str     function name
__qualname__      str     qualified name (e.g. 'Outer.inner')
__doc__           str     docstring
__module__        str     defining module name
__globals__       dict    global namespace of defining module
__closure__       tuple   cell objects holding free variables
__code__          code    compiled bytecode object
__defaults__      tuple   default values for positional params
__kwdefaults__    dict    default values for keyword-only params
__annotations__   dict    type annotations (param + return)
__dict__          dict    arbitrary attributes set on the function
__wrapped__       fn      original fn (set by functools.wraps)
python
def make_adder(n):
    def adder(x):
        return x + n   # n is a free variable → stored in closure
    return adder

add5 = make_adder(5)
add5.__closure__          # (<cell at 0x...>,)
add5.__closure__[0].cell_contents   # 5
add5.__code__.co_freevars  # ('n',)

# Mutate a closure cell (cell_contents is writable since Python 3.7; educational only):
add5.__closure__[0].cell_contents = 10
add5(1)  # 11, the closed-over value changed

Parameter Kinds

Python 3 has five distinct parameter kinds, distinguished by position and syntax.

def fn(pos1, pos2, /, normal, *, kw_only, **kwargs)

pos1, pos2   → POSITIONAL_ONLY
normal       → POSITIONAL_OR_KEYWORD
kw_only      → KEYWORD_ONLY
**kwargs     → VAR_KEYWORD

POS_ONLY    → before /          (can't be passed by name)
POS_OR_KW   → between / and *   (default if neither / nor *)
VAR_POS     → *args             (captures extra positional)
KW_ONLY     → after *           (must be passed by name)
VAR_KW      → **kwargs          (captures extra keyword)
python
def api(x, y, /, z=0, *, verbose=False, **opts):
    pass

# x, y must be positional — name is implementation detail
api(1, 2)               # OK
api(1, 2, z=3)          # OK — z is pos-or-kw
api(1, y=2)             # TypeError — y is positional-only
api(1, 2, verbose=True) # OK — verbose is kw-only

# / in builtin signatures: len(obj) — obj is pos-only
# Why? Allows renaming params in C impl without breaking callers

*args / **kwargs Deep Dive

python
# TypedDict + Unpack for typed **kwargs (Python 3.12+)
from typing import TypedDict, Unpack

class Options(TypedDict, total=False):
    verbose: bool
    timeout: float
    retries: int

def connect(host: str, **opts: Unpack[Options]) -> None:
    # type checker knows opts keys are verbose/timeout/retries
    timeout = opts.get('timeout', 30.0)
    ...

# Spread / unpack at call site
args   = ('example.com',)
kwargs = {'verbose': True, 'timeout': 5.0}
connect(*args, **kwargs)

# Forwarding all args to another function
def wrapper(*args, **kwargs):
    print("before")
    result = original(*args, **kwargs)  # perfect forwarding
    print("after")
    return result

inspect Module

python
import inspect

def greet(name: str, *, loud: bool = False) -> str: ...

sig = inspect.signature(greet)
for name, param in sig.parameters.items():
    print(name, param.kind, param.default, param.annotation)
# name  POSITIONAL_OR_KEYWORD  <empty>  <class 'str'>
# loud  KEYWORD_ONLY           False    <class 'bool'>

# Bind arguments programmatically:
bound = sig.bind("Alice", loud=True)
bound.apply_defaults()
bound.arguments  # {'name': 'Alice', 'loud': True}  (a plain dict since Python 3.9)

# Useful predicates:
inspect.isfunction(fn)
inspect.ismethod(obj.method)
inspect.iscoroutinefunction(async_fn)
inspect.isgeneratorfunction(gen_fn)

# Get source code:
inspect.getsource(greet)
inspect.getfile(greet)
03

Descriptor Protocol

Descriptors are the mechanism behind property, classmethods, staticmethods, slots, and ORM fields. Any class that defines __get__, __set__, or __delete__ is a descriptor.

Data Descriptor

Defines __set__ and/or __delete__. Takes priority over instance __dict__. property is a data descriptor.

Non-data Descriptor

Defines __get__ only. Instance __dict__ takes priority. Functions (methods) are non-data descriptors.
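The priority difference can be observed directly. In this sketch (all names hypothetical) the same key is written into the instance dict for both kinds of descriptor: the data descriptor still wins, the non-data descriptor is shadowed.

```python
class DataDesc:
    def __get__(self, obj, objtype=None):
        return "from data descriptor"

    def __set__(self, obj, value):
        raise AttributeError("read-only")


class NonDataDesc:
    def __get__(self, obj, objtype=None):
        return "from non-data descriptor"


class Demo:
    d = DataDesc()
    n = NonDataDesc()


obj = Demo()
before = (obj.d, obj.n)

# Write straight into the instance dict, bypassing __set__.
obj.__dict__['d'] = "from instance dict"
obj.__dict__['n'] = "from instance dict"
after = (obj.d, obj.n)

print(before)
print(after)
```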

python
class TypedAttr:
    """Data descriptor that enforces a type."""
    def __set_name__(self, owner, name):
        # called once the owning class has been created (Python 3.6+)
        self.public_name  = name
        self.private_name = '_' + name  # store in instance __dict__

    def __init__(self, expected_type):
        self.expected_type = expected_type

    def __get__(self, obj, objtype=None):
        if obj is None:
            return self  # Class.attr → return descriptor itself
        return getattr(obj, self.private_name, None)

    def __set__(self, obj, value):
        if not isinstance(value, self.expected_type):
            raise TypeError(f"{self.public_name} must be {self.expected_type.__name__}")
        setattr(obj, self.private_name, value)

    def __delete__(self, obj):
        delattr(obj, self.private_name)

class Person:
    name = TypedAttr(str)
    age  = TypedAttr(int)

    def __init__(self, name, age):
        self.name = name  # triggers TypedAttr.__set__
        self.age  = age

p = Person("Alice", 30)
p.age = "old"   # TypeError: age must be int
▸ How property is implemented with descriptors

property is itself a data descriptor written in C. When you write @property, you get a property object assigned to the class. Its __get__ calls your getter, __set__ calls your setter (or raises AttributeError if none), and __delete__ calls your deleter.

python
# Pure-Python reimplementation of @property:
class myproperty:
    def __init__(self, fget=None, fset=None, fdel=None):
        self.fget, self.fset, self.fdel = fget, fset, fdel

    def __get__(self, obj, objtype=None):
        if obj is None: return self
        if self.fget is None: raise AttributeError
        return self.fget(obj)

    def __set__(self, obj, value):
        if self.fset is None: raise AttributeError("can't set")
        self.fset(obj, value)

    def setter(self, fset):
        return myproperty(self.fget, fset, self.fdel)

class Temperature:
    @myproperty
    def celsius(self):
        return self._c

    @celsius.setter
    def celsius(self, val):
        self._c = float(val)
04

Metaclasses

A metaclass is the class of a class. Just as a class defines the behaviour of its instances, a metaclass defines the behaviour of its classes. The default metaclass is type.

isinstance(42, int)      → True   (42 is an instance of int)
isinstance(int, type)    → True   (int is an instance of type)
isinstance(type, type)   → True   (type is its own metaclass)

Hierarchy:
object    ← instance of →   type
int       ← instance of →   type
MyClass   ← instance of →   MyMeta   (if metaclass=MyMeta)
MyMeta    ← instance of →   type

type() — Three-argument Form

python
# type(name, bases, namespace) creates a class dynamically
Dog = type('Dog', (object,), {
    'sound': 'woof',
    'speak': lambda self: f"{self.sound}!"
})

# Equivalent to:
class Dog:
    sound = 'woof'
    def speak(self): return f"{self.sound}!"

# The class statement drives class creation:
# 1. MyMeta.__prepare__(name, bases)  → namespace dict
# 2. Execute class body in namespace
# 3. MyMeta.__new__(meta, name, bases, namespace) → cls
# 4. MyMeta.__init__(cls, name, bases, namespace)
# Steps 3 and 4 run inside type.__call__ when the metaclass is called.
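To see that order in practice, here is a small tracing sketch. TraceMeta and the events list are illustrative names, not part of any library.

```python
events = []


class TraceMeta(type):
    @classmethod
    def __prepare__(mcs, name, bases, **kwargs):
        events.append("__prepare__")
        return {}                      # the namespace the class body runs in

    def __new__(mcs, name, bases, namespace, **kwargs):
        events.append("__new__")
        return super().__new__(mcs, name, bases, namespace)

    def __init__(cls, name, bases, namespace, **kwargs):
        events.append("__init__")
        super().__init__(name, bases, namespace)


class Traced(metaclass=TraceMeta):
    events.append("class body")        # runs between __prepare__ and __new__


print(events)
```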
python
class RegistryMeta(type):
    """Metaclass that auto-registers all subclasses."""
    _registry = {}

    def __new__(mcs, name, bases, namespace):
        cls = super().__new__(mcs, name, bases, namespace)
        if bases:  # don't register the base class itself
            mcs._registry[name] = cls
        return cls

    @classmethod
    def get(mcs, name):
        return mcs._registry[name]

class Plugin(metaclass=RegistryMeta):
    pass

class AudioPlugin(Plugin):  # auto-registered
    pass

class VideoPlugin(Plugin):  # auto-registered
    pass

RegistryMeta._registry  # {'AudioPlugin': <class...>, 'VideoPlugin': ...}

__init_subclass__ — Simpler Alternative

For many metaclass use cases, __init_subclass__ (Python 3.6+) is cleaner and avoids the full metaclass machinery.

python
class Plugin:
    _registry = {}

    def __init_subclass__(cls, alias=None, **kwargs):
        super().__init_subclass__(**kwargs)
        name = alias or cls.__name__
        Plugin._registry[name] = cls

class AudioPlugin(Plugin, alias='audio'):
    pass

class VideoPlugin(Plugin):
    pass

Plugin._registry
# {'audio': AudioPlugin, 'VideoPlugin': VideoPlugin}
⚠ Metaclass Conflicts

If two metaclasses in a hierarchy are not subclasses of each other, Python raises a TypeError: metaclass conflict. Resolve by creating a merged metaclass: class Merged(MetaA, MetaB): pass and using it explicitly.
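A minimal reproduction of the conflict and the merged-metaclass fix, using placeholder names MetaA, MetaB and Merged:

```python
class MetaA(type):
    pass


class MetaB(type):
    pass


class A(metaclass=MetaA):
    pass


class B(metaclass=MetaB):
    pass


try:
    class Broken(A, B):     # MetaA and MetaB are unrelated
        pass
except TypeError as exc:
    conflict_message = str(exc)


class Merged(MetaA, MetaB):  # a metaclass that satisfies both bases
    pass


class Works(A, B, metaclass=Merged):
    pass


print(conflict_message)
print(type(Works).__name__)
```

If both metaclasses override the same hook, Merged only behaves well when each of them calls super() cooperatively.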

05

Structural Subtyping (Protocols)

Protocol (Python 3.8+) enables structural subtyping — "duck typing with type checker support". A class satisfies a Protocol if it has the required methods/attributes, without inheriting from it.

python
from typing import Protocol, runtime_checkable

@runtime_checkable
class Drawable(Protocol):
    def draw(self) -> None: ...
    def resize(self, factor: float) -> None: ...

class Circle:          # no inheritance!
    def draw(self):    print("○")
    def resize(self, f): self.r *= f

def render(shape: Drawable) -> None:
    shape.draw()

render(Circle())            # type checker ✓, works at runtime
isinstance(Circle(), Drawable)  # True (runtime_checkable)

# Protocol with class variables:
class HasVersion(Protocol):
    version: str  # attribute member; annotate as ClassVar[str] to require a class-level attribute

# Protocol inheritance for composition:
class DrawableAndSaveable(Drawable, Protocol):
    def save(self, path: str) -> None: ...
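One caveat worth seeing in code: isinstance() against a runtime_checkable protocol only checks that the members exist, not that their signatures match. The classes below are hypothetical.

```python
from typing import Protocol, runtime_checkable


@runtime_checkable
class Closeable(Protocol):
    def close(self) -> None: ...


class File:
    def close(self) -> None:
        pass


class WrongSignature:
    def close(self, force, retries):   # incompatible with the protocol
        pass


class NoClose:
    pass


checks = (
    isinstance(File(), Closeable),
    isinstance(WrongSignature(), Closeable),   # passes: only the name is checked
    isinstance(NoClose(), Closeable),
)
print(checks)
```

A static type checker would reject WrongSignature where a Closeable is expected; the runtime check cannot.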
06

Dataclasses Deep Dive

The dataclasses module auto-generates boilerplate methods. Understanding all its knobs unlocks elegant, well-typed data classes.

python
from dataclasses import dataclass, field, InitVar, KW_ONLY, replace
from typing import ClassVar

@dataclass(order=True, frozen=False, slots=True)
class Point:
    # Comparison uses (x, y) tuple ordering (order=True)
    x: float = 0.0
    y: float = 0.0
    label: str = field(default='', compare=False, repr=True)
    # field() options: default, default_factory, init, repr, compare, hash, metadata
    tags: list = field(default_factory=list)  # NOT tags=[]!
    total: ClassVar[int] = 0               # excluded from all methods
python
# __post_init__ and InitVar
@dataclass
class Circle:
    radius: float
    unit: InitVar[str] = 'm'   # passed to __init__ but NOT stored

    def __post_init__(self, unit):  # InitVar comes here
        if unit == 'cm':
            self.radius /= 100   # convert to metres

c = Circle(50, unit='cm')   # radius → 0.5

# KW_ONLY sentinel (Python 3.10+)
@dataclass
class Config:
    host: str
    port: int
    _: KW_ONLY              # all fields after this are kw-only
    debug: bool = False
    timeout: float = 30.0

Config("localhost", 8080, debug=True)  # debug must be keyword

# replace() — copy with field overrides (like NamedTuple._replace)
c2 = replace(c, radius=1.0)
print(c is c2)  # False — new object
@dataclass parameter   Default   Effect
init                   True      Generate __init__
repr                   True      Generate __repr__
eq                     True      Generate __eq__
order                  False     Generate __lt__ etc. (requires eq=True)
frozen                 False     Immutable: assignment raises FrozenInstanceError
unsafe_hash            False     Force __hash__ generation
slots                  False     Auto-generate __slots__ (3.10+)
kw_only                False     All fields become keyword-only (3.10+)
match_args             True      Generate __match_args__ for pattern matching
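A short sketch of how a few of these parameters interact, using a hypothetical Version class: frozen=True with the default eq=True also makes instances hashable, and a field marked compare=False is ignored by equality, ordering and hashing.

```python
from dataclasses import dataclass, field, FrozenInstanceError


@dataclass(frozen=True, order=True)
class Version:
    major: int
    minor: int
    label: str = field(default="", compare=False)


v1 = Version(3, 11, label="old")
v2 = Version(3, 12)

try:
    v1.major = 4
    frozen_error = None
except FrozenInstanceError as exc:
    frozen_error = exc

# label is excluded from comparison, so these two collapse into one set entry.
seen = {Version(3, 12), Version(3, 12, label="dup")}

print(v1 < v2, type(frozen_error).__name__, len(seen))
```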
07

Advanced Decorators

Optional-argument Decorators

A decorator that works both with and without arguments — @retry and @retry(times=3) — is tricky but elegant when done right.

python
import functools

def retry(fn=None, *, times=3, exceptions=(Exception,)):
    """Works as @retry or @retry(times=5)."""
    if fn is None:
        # Called WITH args: @retry(times=5) → returns real decorator
        return functools.partial(retry, times=times, exceptions=exceptions)

    @functools.wraps(fn)
    def wrapper(*args, **kwargs):
        for attempt in range(times):
            try:
                return fn(*args, **kwargs)
            except exceptions as e:
                if attempt == times - 1:
                    raise
                print(f"Retry {attempt+1}/{times}: {e}")
    return wrapper

@retry                    # fn is the function → works
def fetch_data(): ...

@retry(times=5)           # fn is None → returns decorator
def connect(): ...

ParamSpec — Preserving Callable Types

python
from typing import ParamSpec, TypeVar, Callable
import functools

P = ParamSpec('P')  # captures parameter spec
T = TypeVar('T')   # captures return type

def logged(fn: Callable[P, T]) -> Callable[P, T]:
    @functools.wraps(fn)
    def wrapper(*args: P.args, **kwargs: P.kwargs) -> T:
        print(f"calling {fn.__name__}")
        result = fn(*args, **kwargs)
        print(f"done")
        return result
    return wrapper

@logged
def greet(name: str, *, loud: bool = False) -> str:
    return name.upper() if loud else name

# Type checker knows greet still accepts (name: str, *, loud: bool)

Class Decorators

Class decorators receive the class object and return a (usually modified) class. Simpler than metaclasses for many use cases.

python
import functools

def singleton(cls):
    """Class decorator that converts any class into a singleton."""
    instances = {}
    @functools.wraps(cls, updated=[])  # keep cls name/doc
    class Wrapper(cls):                # subclass cls so its methods survive
        def __new__(klass, *a, **kw):
            if cls not in instances:
                instances[cls] = super().__new__(klass)
                cls.__init__(instances[cls], *a, **kw)  # initialise once
            return instances[cls]
        def __init__(self, *a, **kw):
            pass                       # skip re-initialisation on later calls
    return Wrapper

def add_repr(cls):
    """Auto-generate __repr__ from annotations."""
    fields = list(cls.__annotations__.keys())
    def __repr__(self):
        attrs = ', '.join(f"{f}={getattr(self,f)!r}" for f in fields)
        return f"{cls.__name__}({attrs})"
    cls.__repr__ = __repr__
    return cls

@add_repr
class Config:
    host: str
    port: int
    def __init__(self, host, port):
        self.host, self.port = host, port

functools Toolkit

Tool                      Use
@lru_cache(maxsize=128)   Memoize with LRU eviction. Use None for unlimited.
@cache                    lru_cache(maxsize=None): simpler, no eviction
@cached_property          Computed once, cached in instance __dict__. No locks, so not thread-safe.
partial(fn, *args)        Freeze positional/keyword arguments
partialmethod             Like partial but for descriptors/methods
reduce(fn, iterable)      Left-fold / accumulate
@total_ordering           Fill in comparison methods from __eq__ + one other
@singledispatch           Function overloading based on first arg type
@singledispatchmethod     Method version (3.8+)
wraps(fn)                 Copy __name__, __doc__, __annotations__, etc.
python
from functools import lru_cache, cached_property, singledispatch

# lru_cache: all arguments must be hashable
@lru_cache(maxsize=256)
def fib(n: int) -> int:
    if n < 2: return n
    return fib(n-1) + fib(n-2)

fib.cache_info()    # CacheInfo(hits=..., misses=..., maxsize=256, currsize=...)
fib.cache_clear()   # empty the cache

# cached_property
class DataSet:
    def __init__(self, data):
        self._data = data

    @cached_property
    def mean(self):   # computed lazily on first access, then stored
        return sum(self._data) / len(self._data)

# singledispatch
@singledispatch
def process(arg):
    raise NotImplementedError(f"No handler for {type(arg)}")

@process.register(int)
def _(n): return n * 2

@process.register(str)
def _(s): return s.upper()

@process.register(list)
@process.register(tuple)  # stack registers for same handler
def _(seq): return [process(x) for x in seq]
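The table also lists partial, partialmethod and reduce, which the block above does not exercise. A brief sketch with made-up function and class names:

```python
from functools import partial, partialmethod, reduce
import operator


def power(base, exponent):
    return base ** exponent


square = partial(power, exponent=2)   # freeze a keyword argument
two_to_the = partial(power, 2)        # freeze the first positional argument


class Cell:
    def __init__(self):
        self.alive = False

    def set_state(self, state):
        self.alive = state

    # partialmethod binds self correctly, unlike a plain partial in a class body
    set_alive = partialmethod(set_state, True)
    set_dead = partialmethod(set_state, False)


cell = Cell()
cell.set_alive()

product = reduce(operator.mul, [1, 2, 3, 4], 1)   # ((((1*1)*2)*3)*4)

print(square(5), two_to_the(10), cell.alive, product)
```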
08

itertools Reference

Infinite Iterators

Function   Signature                 Output
count      count(start=0, step=1)    start, start+step, start+2*step …
cycle      cycle(iterable)           A B C A B C A B C …
repeat     repeat(obj, times=None)   obj obj obj … (finite if times set)

Finite / Combinatoric

Function                                           Use
chain(*iterables)                                  Concatenate iterables lazily
chain.from_iterable(it)                            Flatten one level: [[1,2],[3]] → 1 2 3
islice(it, stop) / islice(it, start, stop, step)   Lazy slice of iterator
zip_longest(*its, fillvalue=None)                  Like zip but pads the shorter iterables with fillvalue
starmap(fn, iterable)                              fn(*item) for item in iterable
takewhile(pred, it)                                Yield while pred is true; stop at the first false
dropwhile(pred, it)                                Skip while pred is true, then yield the rest
filterfalse(pred, it)                              Yield items where pred is falsy
compress(data, selectors)                          Yield data where selector is truthy
groupby(it, key=None)                              Group consecutive equal-key items
pairwise(it)                                       Overlapping pairs (3.10+): ABCDE → AB BC CD DE
batched(it, n)                                     Chunks of n items, last may be shorter (3.12+)
accumulate(it, func=None, *, initial=None)         Running total / prefix sums (addition by default)
product(*its, repeat=1)                            Cartesian product
permutations(it, r=None)                           All ordered r-length arrangements
combinations(it, r)                                All unordered r-length subsets (no repeat)
combinations_with_replacement(it, r)               Like combinations but with repeats
python
from itertools import groupby, accumulate, batched, pairwise
import operator

# groupby — input must be sorted by key!
data = [('a', 1), ('a', 2), ('b', 3), ('b', 4)]
for key, group in groupby(data, key=lambda x: x[0]):
    print(key, list(group))
# a [('a', 1), ('a', 2)]
# b [('b', 3), ('b', 4)]

# accumulate — prefix sums, running product, running max
list(accumulate([1,2,3,4]))                        # [1,3,6,10]
list(accumulate([1,2,3,4], operator.mul))         # [1,2,6,24]
list(accumulate([3,1,4,1,5], max))                # [3,3,4,4,5]

# batched (3.12): chunks of n; the last chunk may be shorter
list(batched(range(7), 3))
# [(0,1,2), (3,4,5), (6,)]

# pairwise (3.10) — overlapping windows of 2
list(pairwise('ABCDE'))
# [('A','B'), ('B','C'), ('C','D'), ('D','E')]

# Manual sliding window of any size:
from collections import deque
def sliding(it, n):
    win = deque(maxlen=n)
    for x in it:
        win.append(x)
        if len(win) == n: yield tuple(win)
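Several table entries are easiest to remember from a concrete run. The sample data below is made up for illustration.

```python
from itertools import (
    chain, compress, count, dropwhile, islice, takewhile, zip_longest,
)

readings = [1, 3, 5, 8, 2, 9]

head = list(takewhile(lambda x: x < 6, readings))    # stops at the first failure
tail = list(dropwhile(lambda x: x < 6, readings))    # starts at the first failure
flat = list(chain.from_iterable([[1, 2], [3], []]))  # one level of flattening
picked = list(compress("ABCDE", [1, 0, 1, 0, 1]))    # keep where selector is truthy
first_evens = list(islice(count(0, 2), 5))           # slice an infinite iterator
padded = list(zip_longest("AB", "xyz", fillvalue="-"))

print(head, tail, flat, picked, first_evens, padded)
```

Note that takewhile and dropwhile test the predicate only until it first fails: the 2 after the 8 is still part of tail.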
09

collections Module

Counter

python
from collections import Counter

c = Counter("abracadabra")
# Counter({'a':5, 'b':2, 'r':2, 'c':1, 'd':1})

c.most_common(3)          # [('a',5), ('b',2), ('r',2)]
c.total()                  # 11  (3.10+)
c + Counter("abc")        # add counts (drop zeros/negatives)
c - Counter("abc")        # subtract (drop zeros/negatives)
c & Counter("abc")        # intersection (min count)
c | Counter("abc")        # union (max count)
c['z']                    # 0 — missing keys return 0, not KeyError

# Word frequency:
words = "the quick brown fox the fox".split()
Counter(words).most_common() # [('the',2),('fox',2),...]

defaultdict

python
from collections import defaultdict

# Group items without KeyError:
graph = defaultdict(list)
edges = [(1,2), (1,3), (2,3)]
for src, dst in edges:
    graph[src].append(dst)  # no need to check/init

# Nested defaultdict:
nested = defaultdict(lambda: defaultdict(int))
nested['a']['x'] += 1    # no KeyError at either level

# Counter implemented with defaultdict:
freq = defaultdict(int)
for word in words:
    freq[word] += 1

deque

python
from collections import deque

dq = deque([1,2,3], maxlen=5)  # circular buffer when maxlen set
dq.appendleft(0)     # O(1) — unlike list.insert(0, x)
dq.popleft()          # O(1) — unlike list.pop(0)
dq.rotate(2)          # rotate right by 2 steps
dq.rotate(-1)         # rotate left by 1
dq.extendleft([10,11]) # each appended left → reverses order

# Use as a bounded queue (maxlen) — oldest items auto-drop
recent = deque(maxlen=3)
for x in range(6): recent.append(x)
# deque([3,4,5], maxlen=3)

ChainMap

python
from collections import ChainMap

defaults = {'color': 'blue', 'size': 10}
overrides = {'color': 'red'}

cfg = ChainMap(overrides, defaults)
cfg['color']  # 'red'  — first map wins
cfg['size']   # 10    — falls through to defaults

# Writes go to FIRST map only:
cfg['weight'] = 5   # added to overrides, not defaults

# Use for nested scopes (an inner scope shadows the outer ones):
base = cfg.new_child()     # new child scope with an empty first map
base.parents               # ChainMap(overrides, defaults), the same maps as cfg
base.maps                  # list of all dicts in chain
Collection                      Best for                             Key advantage
OrderedDict                     Ordered dicts pre-3.7 / reordering   move_to_end(), popitem(last=True/False)
namedtuple                      Lightweight record types             Tuple + named field access
UserDict/UserList/UserString    Subclassing built-ins safely         Avoids C-extension subclass pitfalls
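As a sketch of why move_to_end() and popitem(last=False) matter, here is a tiny least-recently-used cache built on OrderedDict, followed by a namedtuple record. The LRU class is illustrative only; for memoizing functions, functools.lru_cache is the usual choice.

```python
from collections import OrderedDict, namedtuple


class LRU:
    """Tiny least-recently-used cache, for illustration only."""

    def __init__(self, capacity):
        self.capacity = capacity
        self._items = OrderedDict()

    def get(self, key, default=None):
        if key not in self._items:
            return default
        self._items.move_to_end(key)          # mark as most recently used
        return self._items[key]

    def put(self, key, value):
        self._items[key] = value
        self._items.move_to_end(key)
        if len(self._items) > self.capacity:
            self._items.popitem(last=False)   # evict the oldest entry

    def keys(self):
        return list(self._items)


cache = LRU(2)
cache.put("a", 1)
cache.put("b", 2)
cache.get("a")        # "a" becomes the most recent entry
cache.put("c", 3)     # evicts "b"

Point = namedtuple("Point", ["x", "y"])
p = Point(1, 2)
moved = p._replace(x=5)

print(cache.keys(), p.x, p[1], moved)
```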

operator Module

Functions corresponding to built-in Python operators — useful for sorted(), functools.reduce(), and higher-order function patterns.

python
import operator

# Arithmetic
operator.add(2, 3)      # 5
operator.mul(2, 3)      # 6

# Attribute / item access (great for key= args)
sorted(people, key=operator.attrgetter('age', 'name'))
sorted(rows,   key=operator.itemgetter(2, 0))   # col 2 then col 0

# Method call
strip_all = map(operator.methodcaller('strip'), strings)

# Comparison
operator.lt(1, 2)   # True
operator.eq('a', 'a')  # True

# Logical
operator.truth(0)   # False
operator.not_(1)    # False

# Use with functools.reduce:
from functools import reduce
reduce(operator.add, [1,2,3,4])   # 10
reduce(operator.mul, [1,2,3,4])   # 24

copy / weakref

python
import copy

a = [[1,2], [3,4]]
b = copy.copy(a)        # shallow: new list, same inner lists
c = copy.deepcopy(a)    # deep: new list AND new inner lists

b[0].append(99)  # a[0] also changes! (same object)
c[0].append(99)  # a[0] unchanged

# Customise deep copy:
class MyObj:
    def __deepcopy__(self, memo):
        new = MyObj()
        memo[id(self)] = new  # important: register before recursion
        # ... copy fields selectively
        return new

# weakref — reference that doesn't prevent GC
import weakref

class Node: pass
node = Node()
ref = weakref.ref(node)  # weak reference
ref()                     # <Node> — alive
del node
ref()                     # None  — collected

# WeakValueDictionary — auto-remove when values are GC'd
cache = weakref.WeakValueDictionary()
cache['key'] = Node()  # will disappear after this line!

# WeakSet and WeakKeyDictionary also exist
10

Generator Protocol

Generators are iterators produced by generator functions (with yield) or generator expressions. They implement the full iterator protocol plus send(), throw(), and close().

Generator State Machine
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
         ┌─────────────────┐
         │     Created     │  ← gen = gen_fn()
         └────────┬────────┘
                  │ next(gen) or gen.send(None)
         ┌────────▼────────┐
    ┌───►│    Suspended    │  ◄─── yield value
    │    └────────┬────────┘
    │             │ next(gen) / gen.send(v) / gen.throw(exc) / gen.close()
    │    ┌────────▼────────┐
    └────│     Running     │
         └────────┬────────┘
                  │ return / StopIteration
         ┌────────▼────────┐
         │     Closed      │
         └─────────────────┘
python
def accumulator():
    """Demonstrates send(), throw(), return value."""
    total = 0
    while True:
        try:
            value = yield total      # suspend; receive sent value
        except GeneratorExit:
            return total              # reached on gen.close(); the generator finishes cleanly
        except ValueError:
            total = 0               # reset on throw(ValueError)
        else:
            total += value

gen = accumulator()
next(gen)           # advance to first yield → 0
gen.send(10)        # send 10 → 10
gen.send(5)         # send 5  → 15
gen.throw(ValueError)  # resets → 0
gen.send(3)         # → 3

# yield from — delegate to sub-generator
def chain_gen(*iterables):
    for it in iterables:
        yield from it   # passes send/throw/close through!

# Generator return value captured by yield from:
def inner():
    yield 1
    return 'done'

def outer():
    result = yield from inner()   # result = 'done'
    print(f"inner returned: {result}")
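
To see where a generator's return value goes, the sketch below drives the same `inner`/`outer` pair two ways: through `yield from`, and by hand, where the value arrives on `StopIteration.value`. The `outer` here yields its message instead of printing it so the result is easy to inspect.

```python
def inner():
    yield 1
    return "done"

def outer():
    result = yield from inner()        # result receives inner's return value
    yield f"inner returned: {result}"

items = list(outer())

# Driving inner() by hand: the return value rides on StopIteration.
gen = inner()
first = next(gen)
try:
    next(gen)
except StopIteration as stop:
    returned = stop.value

print(items, first, returned)
```

A plain `for` loop or `list()` swallows the `StopIteration`, so the return value is only visible to `yield from` or to code that catches the exception itself.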

Native Coroutines

▸ async def vs generator-based coroutines

Before Python 3.5, coroutines were generator functions decorated with @asyncio.coroutine and driven with yield from. Since 3.5, use async def / await. Support for generator-based coroutines, including @asyncio.coroutine, was removed in 3.11.
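
Native coroutines still share the generator machinery underneath. As a rough illustration (the `answer` coroutine is a made-up example), a coroutine object can be stepped with `send(None)` exactly like a generator, and its return value arrives on `StopIteration`:

```python
import asyncio

async def answer():
    return 42

# Stepping the coroutine by hand, the way an event loop does internally.
coro = answer()
try:
    coro.send(None)
except StopIteration as stop:
    manual_result = stop.value

# The normal way: let the event loop drive it.
loop_result = asyncio.run(answer())

print(manual_result, loop_result)
```

Application code should not drive coroutines by hand; the point is only that `await` suspends and resumes through the same protocol that `yield from` uses.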

python
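import asyncio

import aiofiles  # third-party: pip install aiofiles
import aiohttp   # third-party: pip install aiohttp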

async def fetch(url: str) -> str:
    # awaitable = coroutine | Task | Future
    async with aiohttp.ClientSession() as sess:
        async with sess.get(url) as resp:
            return await resp.text()

# Async iteration:
async def stream_lines(path):
    async with aiofiles.open(path) as f:
        async for line in f:           # __aiter__ / __anext__
            yield line.strip()           # async generator!

# Async context manager protocol:
class AsyncTimer:
    async def __aenter__(self):
        # get_running_loop() is the preferred call inside a coroutine
        self.start = asyncio.get_running_loop().time()
        return self

    async def __aexit__(self, *exc):
        elapsed = asyncio.get_running_loop().time() - self.start
        print(f"{elapsed:.3f}s")
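
The snippets above depend on third-party libraries. As a standard-library-only sketch of the same two protocols, the following combines an async generator (`ticker`, a hypothetical name) with an async context manager. This variant of the timer stores the elapsed time instead of printing it.

```python
import asyncio

class AsyncTimer:
    async def __aenter__(self):
        self.start = asyncio.get_running_loop().time()
        self.elapsed = None
        return self

    async def __aexit__(self, *exc):
        self.elapsed = asyncio.get_running_loop().time() - self.start

async def ticker(n, delay=0.01):
    """Async generator: yields 0..n-1, sleeping between items."""
    for i in range(n):
        await asyncio.sleep(delay)
        yield i

async def main():
    async with AsyncTimer() as timer:
        seen = [i async for i in ticker(3)]   # async comprehension
    return seen, timer.elapsed

seen, elapsed = asyncio.run(main())
print(seen, f"{elapsed:.3f}s")
```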

asyncio Deep Dive

python
import asyncio
import pathlib

# TaskGroup (3.11+) — structured concurrency
async def main():
    async with asyncio.TaskGroup() as tg:
        t1 = tg.create_task(fetch("https://example.com"))
        t2 = tg.create_task(fetch("https://example.org"))
    # both done here; any exception cancels all tasks
    print(t1.result(), t2.result())

# asyncio.shield — prevent cancellation of a subtask
async def safe_save(data):
    try:
        result = await asyncio.shield(write_to_db(data))
    except asyncio.CancelledError:
        print("cancelled — but write_to_db still runs!")
        raise

# timeout (3.11+)
async def with_timeout():
    try:
        async with asyncio.timeout(5.0):
            return await slow_operation()
    except TimeoutError:  # asyncio.TimeoutError is an alias of this since 3.11
        print("timed out")

# asyncio.Semaphore — limit concurrency
sem = asyncio.Semaphore(10)
async def rate_limited(url):
    async with sem:     # max 10 concurrent requests
        return await fetch(url)

# run_in_executor — run blocking code in thread pool
async def read_file(path):
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(
        None,                 # None = default ThreadPoolExecutor
        pathlib.Path(path).read_text
    )

# asyncio.gather vs TaskGroup:
# gather: with return_exceptions=True, failures come back as exception objects in the results list
# TaskGroup: all-or-nothing, strict structured concurrency
# (await is only valid inside a coroutine)
results = await asyncio.gather(*coros, return_exceptions=True)
asyncio Primitive          Purpose
asyncio.Event              Notify multiple waiters (one-shot broadcast)
asyncio.Condition          Wait for a condition (combines an Event and a Lock)
asyncio.Semaphore          Limit concurrent access to a resource
asyncio.BoundedSemaphore   Semaphore that errors if released too many times
asyncio.Queue              Producer-consumer between coroutines
asyncio.PriorityQueue      Queue with priority ordering
asyncio.Lock               Mutual exclusion within event loop
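
A runnable sketch that combines two of the ideas above: a Semaphore capping concurrency, and gather with return_exceptions=True collecting failures alongside results. The `job` coroutine and the `stats` dictionary are illustrative, and `asyncio.sleep` stands in for real I/O.

```python
import asyncio

async def job(i, sem, stats):
    async with sem:
        stats["active"] += 1
        stats["peak"] = max(stats["peak"], stats["active"])
        try:
            await asyncio.sleep(0.01)          # stand-in for real I/O
            if i == 3:
                raise ValueError(f"job {i} failed")
            return i * 2
        finally:
            stats["active"] -= 1

async def main():
    sem = asyncio.Semaphore(2)                 # created inside the running loop
    stats = {"active": 0, "peak": 0}
    results = await asyncio.gather(
        *(job(i, sem, stats) for i in range(6)),
        return_exceptions=True,                # failures become list entries
    )
    return results, stats["peak"]

results, peak = asyncio.run(main())
values = [r for r in results if not isinstance(r, Exception)]
print(values, peak)
```

Results keep the order of the awaitables passed in, so the failed job's exception sits at index 3 regardless of when it finished.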
11

The GIL Explained

The Global Interpreter Lock is a mutex in CPython that prevents multiple native threads from executing Python bytecode simultaneously. It is not a language feature — it's a CPython implementation detail.

CPython Thread Execution (with GIL)
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
Thread 1: ██████ run ██████   (wait)            ██████ run ██████
Thread 2:   (wait)            ██████ run ██████   (wait)            ██████ run
                            ↑                   ↑
                      GIL released         GIL acquired
                      on I/O or at the     by the waiting
                      switch interval      thread
                      (5 ms by default)

GIL is released for:
  ✓ I/O operations
  ✓ time.sleep()
  ✓ C extension code that releases it (NumPy, etc.)
  ✓ Every ~5 ms (sys.getswitchinterval())

The GIL does NOT allow:
  ✗ Pure Python bytecode running in parallel → threads take turns, so they don't help for CPU-bound work
▸ Use threads for I/O-bound

Network calls, file reads, sleep: the GIL is released while waiting. 10 threads fetching 10 URLs can finish close to 10x faster than a sequential loop when network latency dominates.
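
A rough way to see this effect without a network is to let `time.sleep` stand in for the I/O wait. The sketch below times five simulated requests run one after another and then in threads; `fake_request` is a hypothetical placeholder, and exact timings vary by machine.

```python
import threading
import time

def fake_request(delay=0.1):
    time.sleep(delay)          # stand-in for a network call; sleep releases the GIL

def run_sequential(n):
    start = time.perf_counter()
    for _ in range(n):
        fake_request()
    return time.perf_counter() - start

def run_threaded(n):
    start = time.perf_counter()
    threads = [threading.Thread(target=fake_request) for _ in range(n)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return time.perf_counter() - start

sequential = run_sequential(5)
threaded = run_threaded(5)
print(f"sequential: {sequential:.2f}s, threaded: {threaded:.2f}s")
```

Replacing the sleep with a pure Python loop would erase most of the gain, since the threads would then compete for the GIL.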

▸ Use processes for CPU-bound

Image processing, ML inference, number crunching — each process has its own GIL. Use multiprocessing or concurrent.futures.ProcessPoolExecutor.

▸ Python 3.13+ Free-threaded Mode (Experimental)

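Python 3.13 introduced an optional free-threaded build (python3.13t) that can run without the GIL. The choice is made per interpreter, not per module: on that build the GIL can be re-enabled with the PYTHON_GIL=1 environment variable or -X gil=1, and sys._is_gil_enabled() reports the current state. C extension modules declare free-threading support individually, and importing one that does not re-enables the GIL. Not yet production-stable, but it signals the path forward. NumPy and Cython are being updated to support it.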
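
One way to check at runtime which mode an interpreter is in, sketched with a fallback so it also runs on versions before 3.13 (where `sys._is_gil_enabled` does not exist and the GIL is always on):

```python
import sys
import sysconfig

# True only for free-threaded builds such as python3.13t.
free_threaded_build = bool(sysconfig.get_config_var("Py_GIL_DISABLED"))

# sys._is_gil_enabled() exists on 3.13+; older versions always run with the GIL.
gil_check = getattr(sys, "_is_gil_enabled", None)
gil_enabled = gil_check() if gil_check is not None else True

print(f"free-threaded build: {free_threaded_build}, GIL enabled: {gil_enabled}")
```

The two values answer different questions: the first describes how the interpreter was compiled, the second whether the GIL is active right now.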

12

Threading

python
import threading

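# Basic thread (the target must exist before the Thread is created)
def worker(n):
    print(f"worker {n} running")

t = threading.Thread(target=worker, args=(1,), daemon=True)
t.start()
t.join(timeout=5.0)  # wait up to 5s; join() returns None, so check t.is_alive()

# Thread-local storage: each thread has its own value
local = threading.local()
def db_worker():
    # create_db_connection is a placeholder for your own factory
    local.connection = create_db_connection()  # not shared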

# Subclassing Thread:
class Worker(threading.Thread):
    def run(self):
        # runs in new thread
        pass
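
A self-contained sketch of the thread-local behaviour described above. Each thread writes to the same `local.value` attribute yet only ever sees its own value; the `seen` dictionary is an illustrative way to collect what each thread observed.

```python
import threading

local = threading.local()
seen = {}

def worker(name):
    local.value = name          # visible only in the thread that set it
    seen[name] = local.value

threads = [threading.Thread(target=worker, args=(f"t{i}",)) for i in range(3)]
for t in threads:
    t.start()
for t in threads:
    t.join()

main_has_value = hasattr(local, "value")   # the main thread never set it
print(seen, main_has_value)
```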

Synchronisation Primitives

Primitive          When to use
Lock               Basic mutual exclusion. acquire/release. Not reentrant.
RLock              Reentrant lock: the same thread can acquire it multiple times
Event              Signal between threads. wait() blocks until set() is called.
Condition          Wait for a condition to become true. Wraps a lock (an RLock by default).
Semaphore          Limit access to a resource pool (e.g. max N connections)
BoundedSemaphore   Semaphore that raises if released more than acquired
Barrier            Block N threads until all have called wait()
Timer              Call a function after a delay in a background thread
python
import threading
import time

# Lock — always use as context manager
lock = threading.Lock()
counter = 0
def increment():
    global counter
    with lock:              # auto-release even on exception
        counter += 1

# RLock — reentrant (same thread can acquire again)
rlock = threading.RLock()
def recursive(n):
    with rlock:
        if n > 0: recursive(n-1)   # same thread reacquires

# Event — one-shot signal
ready = threading.Event()
def producer():
    time.sleep(1)
    ready.set()             # signal all waiters

def consumer():
    if ready.wait(timeout=5):  # returns True if set, False on timeout
        print("ready!")

# Condition — wait for complex state
cond = threading.Condition()
items = []

def wait_for_item():
    with cond:
        cond.wait_for(lambda: len(items) > 0)  # atomically wait
        return items.pop()

def add_item(item):
    with cond:
        items.append(item)
        cond.notify_all()   # wake all waiters
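
The table also lists Semaphore and Barrier, which the snippets above do not exercise. A minimal sketch of both follows; all names here are illustrative and `time.sleep` stands in for using a pooled resource.

```python
import threading
import time

pool_slots = threading.Semaphore(2)   # at most 2 threads in the guarded section
stats_lock = threading.Lock()
active = 0
peak = 0

def use_resource():
    global active, peak
    with pool_slots:
        with stats_lock:
            active += 1
            peak = max(peak, active)
        time.sleep(0.02)              # stand-in for using a pooled connection
        with stats_lock:
            active -= 1

barrier = threading.Barrier(3)
released = []

def rendezvous(name):
    barrier.wait()                    # blocks until 3 threads have arrived
    with stats_lock:
        released.append(name)

workers = [threading.Thread(target=use_resource) for _ in range(6)]
workers += [threading.Thread(target=rendezvous, args=(i,)) for i in range(3)]
for t in workers:
    t.start()
for t in workers:
    t.join()

print(peak, sorted(released))
```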
13

Multiprocessing

Each process gets its own memory space, GIL, and Python interpreter. Data is passed between processes by serialisation (pickle). Best for CPU-bound work.
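
Because arguments and results cross the process boundary as pickles, what can be sent is limited to what pickle can handle. A short sketch of the most common surprise: functions are pickled by reference, so a lambda cannot be sent to a worker.

```python
import math
import pickle

# Functions are pickled by reference (module name + qualified name), not by value.
payload = pickle.dumps(math.sqrt)
restored = pickle.loads(payload)

try:
    pickle.dumps(lambda n: n * n)     # no importable name to refer to
    lambda_pickled = True
except (pickle.PicklingError, AttributeError):
    lambda_pickled = False

# Plain data round-trips as an independent copy.
original = {"ids": [1, 2, 3]}
copy = pickle.loads(pickle.dumps(original))

print(restored is math.sqrt, lambda_pickled, copy == original, copy is original)
```

This is why worker functions passed to a Pool are normally defined at module level, and why mutating an argument inside a worker never affects the parent's copy.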

python
from multiprocessing import Pool, Process, Queue, Value, Array
import multiprocessing as mp

# Pool: the workhorse (with spawn/forkserver, run this under `if __name__ == '__main__':`)
def square(n): return n ** 2

with Pool(processes=4) as pool:
    results = pool.map(square, range(100))   # blocking

    # Non-blocking variants:
    ar = pool.map_async(square, range(100))
    ar.get(timeout=10)

    # Chunked for large iterables (imap is lazy; consume it inside the with block):
    results = list(pool.imap(square, range(10000), chunksize=100))

    # starmap — multiple args per call:
    pool.starmap(pow, [(2,3), (3,4)])  # [8, 81]

# Queue — IPC between processes
q = mp.Queue()
def producer(q):
    for i in range(10):
        q.put(i)
    q.put(None)  # sentinel

def consumer(q):
    while (item := q.get()) is not None:
        process(item)

# Shared memory (fastest — no pickle)
counter = Value('i', 0)   # 'i' = C int
arr     = Array('d', range(10))  # 'd' = C double

def increment(counter):
    with counter.get_lock():   # += is a read-modify-write, not atomic across processes
        counter.value += 1

# multiprocessing.shared_memory (3.8+) — arbitrary data
from multiprocessing.shared_memory import SharedMemory
import numpy as np

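shm  = SharedMemory(create=True, size=1024)
arr  = np.ndarray((128,), dtype=np.float64, buffer=shm.buf)  # 128 * 8 bytes = 1024
# ... pass shm.name to other processes; they attach with SharedMemory(name=...)
del arr                    # drop views of shm.buf first, or close() raises BufferError
shm.close(); shm.unlink()  # close in every process; unlink once, in the owner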
⚠ start method matters

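fork copies the parent's memory, which is fastest but unsafe when the parent has threads (deadlocks). It is the default on Linux through 3.13; 3.14 changes the POSIX default to forkserver. spawn (default on Windows, and on macOS since 3.8) starts a fresh Python interpreter, which is safest. Always use an if __name__ == '__main__': guard with spawn/forkserver. Set the method globally with mp.set_start_method('spawn'), which should be called at most once per program.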
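
As an alternative to changing the global default, `mp.get_context` returns an object with the same API bound to one start method. The sketch below only inspects the context; `main` is defined but deliberately not called here, because under spawn it belongs behind the guard shown in the trailing comment. `square` and `main` are illustrative names.

```python
import multiprocessing as mp

available = mp.get_all_start_methods()   # 'spawn' is offered on every platform
ctx = mp.get_context("spawn")            # a context; the global default is untouched

def square(n):
    return n * n

def main():
    # ctx.Pool behaves like multiprocessing.Pool but always uses spawn.
    with ctx.Pool(processes=2) as pool:
        return pool.map(square, range(5))

# In a real script, call main() only under the guard, so that spawned
# children can re-import this module without starting pools of their own:
#
# if __name__ == "__main__":
#     print(main())

print(available, ctx.get_start_method())
```

A context is also the safer choice inside libraries, since it does not interfere with whatever start method the application has chosen.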

concurrent.futures

High-level API over threading and multiprocessing, with a unified Future interface.

python
from concurrent.futures import (
    ThreadPoolExecutor, ProcessPoolExecutor,
    as_completed, wait, FIRST_COMPLETED
)

# ThreadPoolExecutor — I/O bound tasks
with ThreadPoolExecutor(max_workers=20) as exe:
    futures = [exe.submit(fetch, url) for url in urls]

    # Process as they complete (order not guaranteed):
    for fut in as_completed(futures):
        try:
            data = fut.result()
        except Exception as e:
            print(f"failed: {e}")

# executor.map: preserves input order; an exception is re-raised when its result is reached
with ProcessPoolExecutor() as exe:
    results = list(exe.map(cpu_work, data_items))

# wait — fine-grained control
done, not_done = wait(futures, timeout=5, return_when=FIRST_COMPLETED)

# Future API:
fut.result(timeout=10)   # blocks; raises exception if fn raised
fut.exception()            # None if successful
fut.done()                 # True if finished (any state)
fut.cancel()               # True if successfully cancelled (pending only)
fut.add_done_callback(cb) # cb(future) called when done
Scenario                           Best choice
Many I/O tasks, async code         asyncio (no thread overhead)
I/O + 3rd party blocking libs      ThreadPoolExecutor
CPU-bound, single machine          ProcessPoolExecutor / multiprocessing.Pool
CPU-bound, C extensions (NumPy)    Threads (NumPy releases GIL)
Mixed CPU + I/O                    asyncio + run_in_executor with a ProcessPoolExecutor for CPU parts
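
A runnable variant of the as_completed pattern from this section, sketched with a dictionary that maps each future back to its input so failures can be attributed. The `fetch` function is a hypothetical stand-in for a blocking HTTP call; it sleeps briefly and fails for one URL on purpose.

```python
from concurrent.futures import ThreadPoolExecutor, as_completed
import time

def fetch(url):
    """Hypothetical stand-in for a blocking HTTP call."""
    time.sleep(0.01)
    if "bad" in url:
        raise ValueError(f"cannot fetch {url}")
    return f"<body of {url}>"

urls = ["https://example.com", "https://example.org", "https://bad.example"]
ok, failed = {}, {}

with ThreadPoolExecutor(max_workers=3) as exe:
    future_to_url = {exe.submit(fetch, url): url for url in urls}
    for fut in as_completed(future_to_url):     # yields futures as they finish
        url = future_to_url[fut]
        try:
            ok[url] = fut.result()              # re-raises the worker's exception
        except ValueError as exc:
            failed[url] = str(exc)

print(sorted(ok), failed)
```

Swapping ThreadPoolExecutor for ProcessPoolExecutor leaves the loop unchanged, provided the submitted function and its arguments can be pickled.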