Py201 Advanced Python Internals & Concurrency
Go beneath the surface. Master the data model, descriptor protocol, metaclasses, generator machinery, the GIL, and full async concurrency. Written for engineers who already know Python and want to understand how it works.
The Python Data Model
The Python data model is the system of special (dunder) methods that let your objects participate in the language itself — operators, iteration, attribute access, context managers, and more. Understanding it unlocks idiomatic Python.
When Python evaluates a + b, it does not call a built-in function; it calls a.__add__(b). Every language feature maps to one or more special methods. Your objects can implement any of them.
Object Lifecycle
Allocates and returns the new object. Must return an instance of cls (or a different type). Rarely overridden — use for immutable types (subclassing int, str, tuple) or singleton patterns.
class Singleton: _instance = None def __new__(cls, *args, **kwargs): if cls._instance is None: # super().__new__ actually allocates memory cls._instance = super().__new__(cls) return cls._instance def __init__(self, value): self.value = value # called even on re-use! a = Singleton(1) b = Singleton(2) print(a is b) # True — same object print(a.value) # 2 — __init__ ran again
Initialises the already-created object. Must return None. This is where attributes are set.
Finaliser — not a destructor. Timing is not guaranteed (especially with cycles). Prefer context managers. Never raises exceptions from here.
Makes any object callable. Used extensively in decorators, neural network modules, and function-like classes.
class Multiplier: def __init__(self, factor): self.factor = factor def __call__(self, x): return self.factor * x triple = Multiplier(3) print(triple(10)) # 30 callable(triple) # True
__repr__ should return an unambiguous string, ideally one that recreates the object. __str__ returns a readable string. print() uses __str__, falling back to __repr__.
Truth testing calls __bool__. If absent, calls __len__ — zero means false. If neither, objects are always truthy.
If you define __eq__, Python sets __hash__ = None (unhashable). To keep hashability, also define __hash__. Objects that compare equal must have equal hashes.
Attribute Access Dunders
Python's attribute system is rich and layered. Understanding the lookup order is essential for descriptors and metaclasses.
Fallback for missing attributes. Great for lazy loading, proxy objects, or DSL attribute chaining. Avoid infinite recursion by not accessing undefined attrs inside it.
Called for every attribute access. Override with extreme care. Must call super().__getattribute__(name) for default behaviour. Easy to create infinite recursion.
class AccessLog: def __getattribute__(self, name): # intercept every access print(f"getting '{name}'") # MUST delegate to super — NOT self.name (infinite loop!) return super().__getattribute__(name) def __getattr__(self, name): # only called when attr genuinely missing return f"<dynamic:{name}>" obj = AccessLog() obj.x # → getting 'x' → "<dynamic:x>"
Intercepts all attribute writes. Use object.__setattr__(self, name, value) inside to avoid recursion.
Intercepts attribute deletion. Same recursion caveat applies.
Container Protocol
| Dunder | Triggered by | Notes |
|---|---|---|
| __len__ | len(obj) | Return non-negative int |
| __getitem__ | obj[key] | Support slices via isinstance(key, slice) |
| __setitem__ | obj[key] = val | — |
| __delitem__ | del obj[key] | — |
| __contains__ | x in obj | Falls back to __iter__ linear scan |
| __iter__ | for x in obj / iter(obj) | Return iterator (object with __next__) |
| __next__ | next(iterator) | Raise StopIteration when exhausted |
| __reversed__ | reversed(obj) | Falls back to __len__ + __getitem__ |
| __missing__ | self[key] KeyError in dict subclass | dict subclasses only |
class RingBuffer: def __init__(self, size): self._data = [None] * size self._size = size self._head = 0 def __len__(self): return self._size def __getitem__(self, idx): return self._data[(self._head + idx) % self._size] def __setitem__(self, idx, val): self._data[(self._head + idx) % self._size] = val def __iter__(self): for i in range(self._size): yield self[i] def __contains__(self, item): return item in self._data
Arithmetic Operators
| Op | Forward | Reflected | In-place |
|---|---|---|---|
| + | __add__ | __radd__ | __iadd__ |
| - | __sub__ | __rsub__ | __isub__ |
| * | __mul__ | __rmul__ | __imul__ |
| / | __truediv__ | __rtruediv__ | __itruediv__ |
| // | __floordiv__ | __rfloordiv__ | __ifloordiv__ |
| % | __mod__ | __rmod__ | __imod__ |
| ** | __pow__ | __rpow__ | __ipow__ |
| & | __and__ | __rand__ | __iand__ |
| | | __or__ | __ror__ | __ior__ |
| ^ | __xor__ | __rxor__ | __ixor__ |
| << | __lshift__ | __rlshift__ | __ilshift__ |
| >> | __rshift__ | __rrshift__ | __irshift__ |
| @ | __matmul__ | __rmatmul__ | __imatmul__ |
| -x | __neg__ | — | — |
| +x | __pos__ | — | — |
| ~x | __invert__ | — | — |
| abs() | __abs__ | — | — |
For a + b: Python first tries a.__add__(b). If it returns NotImplemented, Python tries b.__radd__(a). If that also fails, raises TypeError. In-place operators (+=) should return self for mutables or a new object for immutables.
from functools import total_ordering @total_ordering # implement __eq__ + one of lt/le/gt/ge → get all 6 class Vector: def __init__(self, x, y): self.x, self.y = x, y def __add__(self, other): if not isinstance(other, Vector): return NotImplemented # triggers __radd__ return Vector(self.x + other.x, self.y + other.y) def __mul__(self, scalar): return Vector(self.x * scalar, self.y * scalar) __rmul__ = __mul__ # 3 * v works too def __iadd__(self, other): self.x += other.x self.y += other.y return self # mutate + return self def __eq__(self, other): return (self.x, self.y) == (other.x, other.y) def __lt__(self, other): return (self.x**2 + self.y**2) < (other.x**2 + other.y**2) def __repr__(self): return f"Vector({self.x}, {self.y})"
__slots__
__slots__ replaces the per-instance __dict__ with a fixed set of slot descriptors stored in the class, significantly reducing memory overhead and slightly speeding up attribute access.
~56 bytes/instance (CPython 3.12)
No __dict__ overhead
Attribute names fixed at class creation
Can't add arbitrary attrs at runtime
~248 bytes/instance minimum
__dict__ is a hash map per object
Fully dynamic — add any attr
Better for exploratory / generic code
class Point: __slots__ = ('x', 'y') # tuple of allowed attr names def __init__(self, x, y): self.x = x # uses slot descriptor, not __dict__ self.y = y p = Point(1, 2) p.z = 3 # AttributeError — no __dict__, no slot for 'z' # Inheritance rules: class Point3D(Point): __slots__ = ('z',) # only add NEW slots # if you omit __slots__ here, a __dict__ is added back! # Allow __weakref__ support: class WeakPoint: __slots__ = ('x', 'y', '__weakref__')
Function Internals
Python functions are first-class objects — instances of function type with a rich set of attributes. Understanding the function object and call conventions unlocks advanced metaprogramming.
Function Object Anatomy
def make_adder(n): def adder(x): return x + n # n is a free variable → stored in closure return adder add5 = make_adder(5) add5.__closure__ # (<cell at 0x...>,) add5.__closure__[0].cell_contents # 5 add5.__code__.co_freevars # ('n',) # Mutate a closure cell (CPython hack — educational only): import ctypes ctypes.cast(id(add5.__closure__[0]), ctypes.py_object).value.cell_contents = 10 add5(1) # 11 — closed-over value changed!
Parameter Kinds
Python 3 has five distinct parameter kinds, distinguished by position and syntax.
def api(x, y, /, z=0, *, verbose=False, **opts): pass # x, y must be positional — name is implementation detail api(1, 2) # OK api(1, 2, z=3) # OK — z is pos-or-kw api(1, y=2) # TypeError — y is positional-only api(1, 2, verbose=True) # OK — verbose is kw-only # / in builtin signatures: len(obj) — obj is pos-only # Why? Allows renaming params in C impl without breaking callers
*args / **kwargs Deep Dive
# TypedDict + Unpack for typed **kwargs (Python 3.12+) from typing import TypedDict, Unpack class Options(TypedDict, total=False): verbose: bool timeout: float retries: int def connect(host: str, **opts: Unpack[Options]) -> None: # type checker knows opts keys are verbose/timeout/retries timeout = opts.get('timeout', 30.0) ... # Spread / unpack at call site args = ('example.com',) kwargs = {'verbose': True, 'timeout': 5.0} connect(*args, **kwargs) # Forwarding all args to another function def wrapper(*args, **kwargs): print("before") result = original(*args, **kwargs) # perfect forwarding print("after") return result
inspect Module
import inspect def greet(name: str, *, loud: bool = False) -> str: ... sig = inspect.signature(greet) for name, param in sig.parameters.items(): print(name, param.kind, param.default, param.annotation) # name POSITIONAL_OR_KEYWORD <empty> <class 'str'> # loud KEYWORD_ONLY False <class 'bool'> # Bind arguments programmatically: bound = sig.bind("Alice", loud=True) bound.apply_defaults() bound.arguments # OrderedDict([('name','Alice'),('loud',True)]) # Useful predicates: inspect.isfunction(fn) inspect.ismethod(obj.method) inspect.iscoroutinefunction(async_fn) inspect.isgeneratorfunction(gen_fn) # Get source code: inspect.getsource(greet) inspect.getfile(greet)
Descriptor Protocol
Descriptors are the mechanism behind property, classmethods, staticmethods, slots, and ORM fields. Any class that defines __get__, __set__, or __delete__ is a descriptor.
Defines __set__ and/or __delete__. Takes priority over instance __dict__. property is a data descriptor.
Defines __get__ only. Instance __dict__ takes priority. Functions (methods) are non-data descriptors.
class TypedAttr: """Data descriptor that enforces a type.""" def __set_name__(self, owner, name): # called when class body is executed (Python 3.6+) self.public_name = name self.private_name = '_' + name # store in instance __dict__ def __init__(self, expected_type): self.expected_type = expected_type def __get__(self, obj, objtype=None): if obj is None: return self # Class.attr → return descriptor itself return getattr(obj, self.private_name, None) def __set__(self, obj, value): if not isinstance(value, self.expected_type): raise TypeError(f"{self.public_name} must be {self.expected_type.__name__}") setattr(obj, self.private_name, value) def __delete__(self, obj): delattr(obj, self.private_name) class Person: name = TypedAttr(str) age = TypedAttr(int) def __init__(self, name, age): self.name = name # triggers TypedAttr.__set__ self.age = age p = Person("Alice", 30) p.age = "old" # TypeError: age must be int
property is itself a data descriptor written in C. When you write @property, you get a property object assigned to the class. Its __get__ calls your getter, __set__ calls your setter (or raises AttributeError if none), and __delete__ calls your deleter.
# Pure-Python reimplementation of @property: class myproperty: def __init__(self, fget=None, fset=None, fdel=None): self.fget, self.fset, self.fdel = fget, fset, fdel def __get__(self, obj, objtype=None): if obj is None: return self if self.fget is None: raise AttributeError return self.fget(obj) def __set__(self, obj, value): if self.fset is None: raise AttributeError("can't set") self.fset(obj, value) def setter(self, fset): return myproperty(self.fget, fset, self.fdel) class Temperature: @myproperty def celsius(self): return self._c @celsius.setter def celsius(self, val): self._c = float(val)
Metaclasses
A metaclass is the class of a class. Just as a class defines the behaviour of its instances, a metaclass defines the behaviour of its classes. The default metaclass is type.
type() — Three-argument Form
# type(name, bases, namespace) creates a class dynamically Dog = type('Dog', (object,), { 'sound': 'woof', 'speak': lambda self: f"{self.sound}!" }) # Equivalent to: class Dog: sound = 'woof' def speak(self): return f"{self.sound}!" # type.__call__ orchestrates class creation: # 1. MyMeta.__prepare__(name, bases) → namespace dict # 2. Execute class body in namespace # 3. MyMeta.__new__(meta, name, bases, namespace) → cls # 4. MyMeta.__init__(cls, name, bases, namespace)
class RegistryMeta(type): """Metaclass that auto-registers all subclasses.""" _registry = {} def __new__(mcs, name, bases, namespace): cls = super().__new__(mcs, name, bases, namespace) if bases: # don't register the base class itself mcs._registry[name] = cls return cls @classmethod def get(mcs, name): return mcs._registry[name] class Plugin(metaclass=RegistryMeta): pass class AudioPlugin(Plugin): # auto-registered pass class VideoPlugin(Plugin): # auto-registered pass RegistryMeta._registry # {'AudioPlugin': <class...>, 'VideoPlugin': ...}
__init_subclass__ — Simpler Alternative
For many metaclass use cases, __init_subclass__ (Python 3.6+) is cleaner and avoids the full metaclass machinery.
class Plugin: _registry = {} def __init_subclass__(cls, alias=None, **kwargs): super().__init_subclass__(**kwargs) name = alias or cls.__name__ Plugin._registry[name] = cls class AudioPlugin(Plugin, alias='audio'): pass class VideoPlugin(Plugin): pass Plugin._registry # {'audio': AudioPlugin, 'VideoPlugin': VideoPlugin}
If two metaclasses in a hierarchy are not subclasses of each other, Python raises a TypeError: metaclass conflict. Resolve by creating a merged metaclass: class Merged(MetaA, MetaB): pass and using it explicitly.
Structural Subtyping (Protocols)
Protocol (Python 3.8+) enables structural subtyping — "duck typing with type checker support". A class satisfies a Protocol if it has the required methods/attributes, without inheriting from it.
from typing import Protocol, runtime_checkable @runtime_checkable class Drawable(Protocol): def draw(self) -> None: ... def resize(self, factor: float) -> None: ... class Circle: # no inheritance! def draw(self): print("○") def resize(self, f): self.r *= f def render(shape: Drawable) -> None: shape.draw() render(Circle()) # type checker ✓, works at runtime isinstance(Circle(), Drawable) # True (runtime_checkable) # Protocol with class variables: class HasVersion(Protocol): version: str # must be a class-level attribute # Protocol inheritance for composition: class DrawableAndSaveable(Drawable, Protocol): def save(self, path: str) -> None: ...
Dataclasses Deep Dive
The dataclasses module auto-generates boilerplate methods. Understanding all its knobs unlocks elegant, well-typed data classes.
from dataclasses import dataclass, field, InitVar, KW_ONLY, replace from typing import ClassVar @dataclass(order=True, frozen=False, slots=True) class Point: # Comparison uses (x, y) tuple ordering (order=True) x: float = 0.0 y: float = 0.0 label: str = field(default='', compare=False, repr=True) # field() options: default, default_factory, init, repr, compare, hash, metadata tags: list = field(default_factory=list) # NOT tags=[]! total: ClassVar[int] = 0 # excluded from all methods
# __post_init__ and InitVar @dataclass class Circle: radius: float unit: InitVar[str] = 'm' # passed to __init__ but NOT stored def __post_init__(self, unit): # InitVar comes here if unit == 'cm': self.radius /= 100 # convert to metres c = Circle(50, unit='cm') # radius → 0.5 # KW_ONLY sentinel (Python 3.10+) @dataclass class Config: host: str port: int _: KW_ONLY # all fields after this are kw-only debug: bool = False timeout: float = 30.0 Config("localhost", 8080, debug=True) # debug must be keyword # replace() — copy with field overrides (like NamedTuple._replace) c2 = replace(c, radius=1.0) print(c is c2) # False — new object
| @dataclass parameter | Default | Effect |
|---|---|---|
| init | True | Generate __init__ |
| repr | True | Generate __repr__ |
| eq | True | Generate __eq__ |
| order | False | Generate __lt__ etc. (requires eq=True) |
| frozen | False | Immutable — raises on __setattr__ |
| unsafe_hash | False | Force __hash__ generation |
| slots | False | Auto-generate __slots__ (3.10+) |
| kw_only | False | All fields become keyword-only (3.10+) |
| match_args | True | Generate __match_args__ for pattern matching |
Advanced Decorators
Optional-argument Decorators
A decorator that works both with and without arguments — @retry and @retry(times=3) — is tricky but elegant when done right.
import functools def retry(fn=None, *, times=3, exceptions=(Exception,)): """Works as @retry or @retry(times=5).""" if fn is None: # Called WITH args: @retry(times=5) → returns real decorator return functools.partial(retry, times=times, exceptions=exceptions) @functools.wraps(fn) def wrapper(*args, **kwargs): for attempt in range(times): try: return fn(*args, **kwargs) except exceptions as e: if attempt == times - 1: raise print(f"Retry {attempt+1}/{times}: {e}") return wrapper @retry # fn is the function → works def fetch_data(): ... @retry(times=5) # fn is None → returns decorator def connect(): ...
ParamSpec — Preserving Callable Types
from typing import ParamSpec, TypeVar, Callable import functools P = ParamSpec('P') # captures parameter spec T = TypeVar('T') # captures return type def logged(fn: Callable[P, T]) -> Callable[P, T]: @functools.wraps(fn) def wrapper(*args: P.args, **kwargs: P.kwargs) -> T: print(f"calling {fn.__name__}") result = fn(*args, **kwargs) print(f"done") return result return wrapper @logged def greet(name: str, *, loud: bool = False) -> str: return name.upper() if loud else name # Type checker knows greet still accepts (name: str, *, loud: bool)
Class Decorators
Class decorators receive the class object and return a (usually modified) class. Simpler than metaclasses for many use cases.
def singleton(cls): """Class decorator that converts any class into a singleton.""" instances = {} @functools.wraps(cls, updated=[]) # keep cls name/doc class Wrapper(*cls.__bases__): def __new__(klass, *a, **kw): if cls not in instances: instances[cls] = super().__new__(klass) cls.__init__(instances[cls], *a, **kw) return instances[cls] return Wrapper def add_repr(cls): """Auto-generate __repr__ from annotations.""" fields = list(cls.__annotations__.keys()) def __repr__(self): attrs = ', '.join(f"{f}={getattr(self,f)!r}" for f in fields) return f"{cls.__name__}({attrs})" cls.__repr__ = __repr__ return cls @add_repr class Config: host: str port: int def __init__(self, host, port): self.host, self.port = host, port
functools Toolkit
| Tool | Use |
|---|---|
| @lru_cache(maxsize=128) | Memoize with LRU eviction. Use None for unlimited. |
| @cache | lru_cache(maxsize=None) — simpler, no eviction |
| @cached_property | Computed once, cached in instance __dict__. No locks — not thread-safe. |
| partial(fn, *args) | Freeze positional/keyword arguments |
| partialmethod | Like partial but for descriptors/methods |
| reduce(fn, iterable) | Left-fold / accumulate |
| @total_ordering | Fill in comparison methods from __eq__ + one other |
| @singledispatch | Function overloading based on first arg type |
| @singledispatchmethod | Method version (3.8+) |
| wraps(fn) | Copy __name__, __doc__, __annotations__, etc. |
from functools import lru_cache, cached_property, singledispatch # lru_cache — function must be hashable args @lru_cache(maxsize=256) def fib(n: int) -> int: if n < 2: return n return fib(n-1) + fib(n-2) fib.cache_info() # CacheInfo(hits=..., misses=..., maxsize=256, currsize=...) fib.cache_clear() # empty the cache # cached_property class DataSet: def __init__(self, data): self._data = data @cached_property def mean(self): # computed lazily on first access, then stored return sum(self._data) / len(self._data) # singledispatch @singledispatch def process(arg): raise NotImplementedError(f"No handler for {type(arg)}") @process.register(int) def _(n): return n * 2 @process.register(str) def _(s): return s.upper() @process.register(list) @process.register(tuple) # stack registers for same handler def _(seq): return [process(x) for x in seq]
itertools Reference
Infinite Iterators
| Function | Signature | Output |
|---|---|---|
| count | count(start=0, step=1) | start, start+step, start+2*step … |
| cycle | cycle(iterable) | A B C A B C A B C … |
| repeat | repeat(obj, times=None) | obj obj obj … (finite if times set) |
Finite / Combinatoric
| Function | Use |
|---|---|
| chain(*iterables) | Concatenate iterables lazily |
| chain.from_iterable(it) | Flatten one level: [[1,2],[3]] → 1 2 3 |
| islice(it, stop) / islice(it, start, stop, step) | Lazy slice of iterator |
| zip_longest(*its, fillvalue=None) | Like zip but pads shortest |
| starmap(fn, iterable) | fn(*item) for item in iterable |
| takewhile(pred, it) | Yield until pred is False |
| dropwhile(pred, it) | Skip until pred is False, yield rest |
| filterfalse(pred, it) | Yield items where pred is falsy |
| compress(data, selectors) | Yield data where selector is truthy |
| groupby(it, key=None) | Group consecutive equal-key items |
| pairwise(it) | Overlapping pairs (3.10+): ABCDE → AB BC CD DE |
| batched(it, n) | Fixed-size chunks (3.12+) |
| accumulate(it, fn=add, initial=None) | Running total / prefix sums |
| product(*its, repeat=1) | Cartesian product |
| permutations(it, r=None) | All ordered r-length subsets |
| combinations(it, r) | All unordered r-length subsets (no repeat) |
| combinations_with_replacement(it, r) | Like combinations but with repeats |
from itertools import groupby, accumulate, batched, pairwise import operator # groupby — input must be sorted by key! data = [('a', 1), ('a', 2), ('b', 3), ('b', 4)] for key, group in groupby(data, key=lambda x: x[0]): print(key, list(group)) # 'a' [('a',1),('a',2)] 'b' [('b',3),('b',4)] # accumulate — prefix sums, running product, running max list(accumulate([1,2,3,4])) # [1,3,6,10] list(accumulate([1,2,3,4], operator.mul)) # [1,2,6,24] list(accumulate([3,1,4,1,5], max)) # [3,3,4,4,5] # batched (3.12) — strict chunking list(batched(range(7), 3)) # [(0,1,2), (3,4,5), (6,)] # pairwise (3.10) — overlapping windows of 2 list(pairwise('ABCDE')) # [('A','B'), ('B','C'), ('C','D'), ('D','E')] # Manual sliding window of any size: from collections import deque def sliding(it, n): win = deque(maxlen=n) for x in it: win.append(x) if len(win) == n: yield tuple(win)
collections Module
Counter
from collections import Counter c = Counter("abracadabra") # Counter({'a':5, 'b':2, 'r':2, 'c':1, 'd':1}) c.most_common(3) # [('a',5), ('b',2), ('r',2)] c.total() # 11 (3.10+) c + Counter("abc") # add counts (drop zeros/negatives) c - Counter("abc") # subtract (drop zeros/negatives) c & Counter("abc") # intersection (min count) c | Counter("abc") # union (max count) c['z'] # 0 — missing keys return 0, not KeyError # Word frequency: words = "the quick brown fox the fox".split() Counter(words).most_common() # [('the',2),('fox',2),...]
defaultdict
from collections import defaultdict # Group items without KeyError: graph = defaultdict(list) edges = [(1,2), (1,3), (2,3)] for src, dst in edges: graph[src].append(dst) # no need to check/init # Nested defaultdict: nested = defaultdict(lambda: defaultdict(int)) nested['a']['x'] += 1 # no KeyError at either level # Counter implemented with defaultdict: freq = defaultdict(int) for word in words: freq[word] += 1
deque
from collections import deque dq = deque([1,2,3], maxlen=5) # circular buffer when maxlen set dq.appendleft(0) # O(1) — unlike list.insert(0, x) dq.popleft() # O(1) — unlike list.pop(0) dq.rotate(2) # rotate right by 2 steps dq.rotate(-1) # rotate left by 1 dq.extendleft([10,11]) # each appended left → reverses order # Use as a bounded queue (maxlen) — oldest items auto-drop recent = deque(maxlen=3) for x in range(6): recent.append(x) # deque([3,4,5], maxlen=3)
ChainMap
from collections import ChainMap defaults = {'color': 'blue', 'size': 10} overrides = {'color': 'red'} cfg = ChainMap(overrides, defaults) cfg['color'] # 'red' — first map wins cfg['size'] # 10 — falls through to defaults # Writes go to FIRST map only: cfg['weight'] = 5 # added to overrides, not defaults # Use for variable scoping (like CPython's implementation): base = ChainMap({}, cfg) # new child scope base.parents # original ChainMap base.maps # list of all dicts in chain
| Collection | Best for | Key advantage |
|---|---|---|
| OrderedDict | Ordered dicts pre-3.7 / reordering | move_to_end(), popitem(last=True/False) |
| namedtuple | Lightweight record types | Tuple + named field access |
| UserDict/UserList/UserString | Subclassing built-ins safely | Avoids C-extension subclass pitfalls |
operator Module
Functions corresponding to built-in Python operators — useful for sorted(), functools.reduce(), and higher-order function patterns.
import operator # Arithmetic operator.add(2, 3) # 5 operator.mul(2, 3) # 6 # Attribute / item access (great for key= args) sorted(people, key=operator.attrgetter('age', 'name')) sorted(rows, key=operator.itemgetter(2, 0)) # col 2 then col 0 # Method call strip_all = map(operator.methodcaller('strip'), strings) # Comparison operator.lt(1, 2) # True operator.eq('a', 'a') # True # Logical operator.truth(0) # False operator.not_(1) # False # Use with functools.reduce: from functools import reduce reduce(operator.add, [1,2,3,4]) # 10 reduce(operator.mul, [1,2,3,4]) # 24
copy / weakref
import copy a = [[1,2], [3,4]] b = copy.copy(a) # shallow: new list, same inner lists c = copy.deepcopy(a) # deep: new list AND new inner lists b[0].append(99) # a[0] also changes! (same object) c[0].append(99) # a[0] unchanged # Customise deep copy: class MyObj: def __deepcopy__(self, memo): new = MyObj() memo[id(self)] = new # important: register before recursion # ... copy fields selectively return new # weakref — reference that doesn't prevent GC import weakref class Node: pass node = Node() ref = weakref.ref(node) # weak reference ref() # <Node> — alive del node ref() # None — collected # WeakValueDictionary — auto-remove when values are GC'd cache = weakref.WeakValueDictionary() cache['key'] = Node() # will disappear after this line! # WeakSet and WeakKeyDictionary also exist
Generator Protocol
Generators are iterators produced by generator functions (with yield) or generator expressions. They implement the full iterator protocol plus send(), throw(), and close().
def accumulator(): """Demonstrates send(), throw(), return value.""" total = 0 while True: try: value = yield total # suspend; receive sent value except GeneratorExit: return total # return value from return except ValueError: total = 0 # reset on throw(ValueError) else: total += value gen = accumulator() next(gen) # advance to first yield → 0 gen.send(10) # send 10 → 10 gen.send(5) # send 5 → 15 gen.throw(ValueError) # resets → 0 gen.send(3) # → 3 # yield from — delegate to sub-generator def chain_gen(*iterables): for it in iterables: yield from it # passes send/throw/close through! # Generator return value captured by yield from: def inner(): yield 1 return 'done' def outer(): result = yield from inner() # result = 'done' print(f"inner returned: {result}")
Native Coroutines
Before Python 3.5, coroutines were generator functions decorated with @asyncio.coroutine using yield from. Since 3.5, use async def / await. Generator-based coroutines are removed in 3.12.
import asyncio async def fetch(url: str) -> str: # awaitable = coroutine | Task | Future async with aiohttp.ClientSession() as sess: async with sess.get(url) as resp: return await resp.text() # Async iteration: async def stream_lines(path): async with aiofiles.open(path) as f: async for line in f: # __aiter__ / __anext__ yield line.strip() # async generator! # Async context manager protocol: class AsyncTimer: async def __aenter__(self): self.start = asyncio.get_event_loop().time() return self async def __aexit__(self, *exc): elapsed = asyncio.get_event_loop().time() - self.start print(f"{elapsed:.3f}s")
asyncio Deep Dive
import asyncio # TaskGroup (3.11+) — structured concurrency async def main(): async with asyncio.TaskGroup() as tg: t1 = tg.create_task(fetch("https://example.com")) t2 = tg.create_task(fetch("https://example.org")) # both done here; any exception cancels all tasks print(t1.result(), t2.result()) # asyncio.shield — prevent cancellation of a subtask async def safe_save(data): try: result = await asyncio.shield(write_to_db(data)) except asyncio.CancelledError: print("cancelled — but write_to_db still runs!") raise # timeout (3.11+) async def with_timeout(): try: async with asyncio.timeout(5.0): return await slow_operation() except asyncio.TimeoutError: print("timed out") # asyncio.Semaphore — limit concurrency sem = asyncio.Semaphore(10) async def rate_limited(url): async with sem: # max 10 concurrent requests return await fetch(url) # run_in_executor — run blocking code in thread pool async def read_file(path): loop = asyncio.get_running_loop() return await loop.run_in_executor( None, # None = default ThreadPoolExecutor pathlib.Path(path).read_text ) # asyncio.gather vs TaskGroup: # gather: partial results on failure (return_exceptions=True) # TaskGroup: all-or-nothing, strict structured concurrency results = await asyncio.gather(*coros, return_exceptions=True)
| asyncio Primitive | Purpose |
|---|---|
| asyncio.Event | Notify multiple waiters (one-shot broadcast) |
| asyncio.Condition | Wait for a condition + Lock combo |
| asyncio.Semaphore | Limit concurrent access to a resource |
| asyncio.BoundedSemaphore | Semaphore that errors if released too many times |
| asyncio.Queue | Producer-consumer between coroutines |
| asyncio.PriorityQueue | Queue with priority ordering |
| asyncio.Lock | Mutual exclusion within event loop |
The GIL Explained
The Global Interpreter Lock is a mutex in CPython that prevents multiple native threads from executing Python bytecode simultaneously. It is not a language feature — it's a CPython implementation detail.
Network calls, file reads, sleep — GIL released during waiting. 10 threads fetching 10 URLs: near 10x faster.
Image processing, ML inference, number crunching — each process has its own GIL. Use multiprocessing or concurrent.futures.ProcessPoolExecutor.
Python 3.13 introduced an optional free-threaded build (python3.13t) that disables the GIL. Opt-in per module with sys.flags.nogil. Not yet production-stable but signifies the path forward. NumPy and Cython are being updated to support it.
Threading
import threading # Basic thread t = threading.Thread(target=worker, args=(1,), daemon=True) t.start() t.join(timeout=5.0) # wait up to 5s # Thread-local storage — each thread has its own value local = threading.local() def worker(): local.connection = create_db_connection() # not shared # Subclassing Thread: class Worker(threading.Thread): def run(self): # runs in new thread pass
Synchronisation Primitives
| Primitive | When to use |
|---|---|
| Lock | Basic mutual exclusion. acquire/release. Not reentrant. |
| RLock | Reentrant lock — same thread can acquire multiple times |
| Event | Signal between threads. wait() blocks until set() called. |
| Condition | Wait for a condition to become true. Wraps a Lock. |
| Semaphore | Limit access to a resource pool (e.g. max N connections) |
| BoundedSemaphore | Semaphore that raises if released more than acquired |
| Barrier | Block N threads until all have called wait() |
| Timer | Call function after delay in a background thread |
import threading # Lock — always use as context manager lock = threading.Lock() counter = 0 def increment(): global counter with lock: # auto-release even on exception counter += 1 # RLock — reentrant (same thread can acquire again) rlock = threading.RLock() def recursive(n): with rlock: if n > 0: recursive(n-1) # same thread reacquires # Event — one-shot signal ready = threading.Event() def producer(): time.sleep(1) ready.set() # signal all waiters def consumer(): ready.wait(timeout=5) # block until set or timeout print("ready!") # Condition — wait for complex state cond = threading.Condition() items = [] def wait_for_item(): with cond: cond.wait_for(lambda: len(items) > 0) # atomically wait return items.pop() def add_item(item): with cond: items.append(item) cond.notify_all() # wake all waiters
Multiprocessing
Each process gets its own memory space, GIL, and Python interpreter. Data is passed between processes by serialisation (pickle). Best for CPU-bound work.
from multiprocessing import Pool, Process, Queue, Value, Array import multiprocessing as mp # Pool — the workhorses def square(n): return n ** 2 with Pool(processes=4) as pool: results = pool.map(square, range(100)) # blocking # Non-blocking variants: ar = pool.map_async(square, range(100)) ar.get(timeout=10) # Chunked for large iterables: results = pool.imap(square, range(10000), chunksize=100) # starmap — multiple args per call: pool.starmap(pow, [(2,3), (3,4)]) # [8, 81] # Queue — IPC between processes q = mp.Queue() def producer(q): for i in range(10): q.put(i) q.put(None) # sentinel def consumer(q): while (item := q.get()) is not None: process(item) # Shared memory (fastest — no pickle) counter = Value('i', 0) # 'i' = C int arr = Array('d', range(10)) # 'd' = C double def increment(counter): with counter.get_lock(): # shared memory is NOT thread-safe! counter.value += 1 # multiprocessing.shared_memory (3.8+) — arbitrary data from multiprocessing.shared_memory import SharedMemory import numpy as np shm = SharedMemory(create=True, size=1024) arr = np.ndarray((128,), dtype=np.float64, buffer=shm.buf) # ... pass shm.name to other processes; they attach with SharedMemory(name=...) shm.close(); shm.unlink() # cleanup!
fork (default on Linux) copies parent memory — fastest but unsafe with threads (deadlocks). spawn (default on macOS 3.8+, Windows) starts fresh Python interpreter — safest. Always use if __name__ == '__main__': guard with spawn/forkserver. Set globally with mp.set_start_method('spawn').
concurrent.futures
High-level API over threading and multiprocessing, with a unified Future interface.
from concurrent.futures import ( ThreadPoolExecutor, ProcessPoolExecutor, as_completed, wait, FIRST_COMPLETED ) # ThreadPoolExecutor — I/O bound tasks with ThreadPoolExecutor(max_workers=20) as exe: futures = [exe.submit(fetch, url) for url in urls] # Process as they complete (order not guaranteed): for fut in as_completed(futures): try: data = fut.result() except Exception as e: print(f"failed: {e}") # executor.map — preserves order, raises on first exception with ProcessPoolExecutor() as exe: results = list(exe.map(cpu_work, data_items)) # wait — fine-grained control done, not_done = wait(futures, timeout=5, return_when=FIRST_COMPLETED) # Future API: fut.result(timeout=10) # blocks; raises exception if fn raised fut.exception() # None if successful fut.done() # True if finished (any state) fut.cancel() # True if successfully cancelled (pending only) fut.add_done_callback(cb) # cb(future) called when done
| Scenario | Best choice |
|---|---|
| Many I/O tasks, async code | asyncio (no thread overhead) |
| I/O + 3rd party blocking libs | ThreadPoolExecutor |
| CPU-bound, single machine | ProcessPoolExecutor / multiprocessing.Pool |
| CPU-bound, C extensions (NumPy) | Threads (NumPy releases GIL) |
| Mixed CPU + I/O | asyncio + run_in_executor for CPU parts |