
Data Engineering Core Concepts & Python Handbook

A production-focused guide to modern data engineering fundamentals: data movement, storage design, scalable file formats, orchestration, and Python-first implementation patterns across the modern data stack.

Python First · Pandas · Polars · PySpark · Airflow · Warehouse · Lake · Lakehouse · Complete Edition

How This Handbook Is Organized

This handbook is organized around the core concerns of a production data engineer: how data moves, where it lives, how it is cleaned, how pipelines are scheduled, and how all of those parts combine in real systems used by analytics, machine learning, and business teams.

Writing approach: each module uses direct explanations, real-world analogies, and copy-pasteable Python so the handbook stays practical for engineers building or operating real pipelines.

Module 1: Core Concepts & Architecture

Data engineering is mostly about moving data from where it is created to where it becomes useful, while keeping the process scalable, observable, and cheap enough to run every day. If software engineering is about building services, data engineering is about building reliable delivery systems for data.

Source systems → Ingestion → Storage layer → Transformation → Analytics / ML / Apps

Think in systems: the job is not just to write a script. The job is to design a repeatable system that still works when row counts grow from thousands to millions or billions.

Optimize for downstream use: a good pipeline is shaped by the consumer. Dashboards, data scientists, reverse ETL jobs, and machine learning features all need different data contracts.

Scale changes design: choices that look trivial on a laptop, such as CSV output or reading everything into memory, become operational failures in production.

ETL vs ELT

ETL means Extract, Transform, Load. Data is pulled from source systems, transformed before landing in the destination, and then loaded into a warehouse or database.

ELT means Extract, Load, Transform. Raw data is first loaded into a scalable storage system such as a cloud warehouse or data lake, and transformations happen afterward inside that platform.

| Pattern | How it works | Best fit | Main tradeoff |
| --- | --- | --- | --- |
| ETL | Transform before loading into the target system. | Strict schemas, legacy warehouses, sensitive filtering before storage, smaller or controlled pipelines. | Transformation layer becomes a bottleneck if compute does not scale well. |
| ELT | Load raw data first, then transform inside the warehouse or lakehouse. | Modern cloud analytics stacks such as Snowflake, BigQuery, Redshift, Databricks, and dbt-driven teams. | You must govern raw data carefully because more data lands before cleanup. |
Real-world analogy: factory vs warehouse

ETL is like cleaning, labeling, and boxing products at the factory before shipping them to the warehouse.

ELT is like sending everything to a massive distribution center first, then sorting and repackaging it closer to the buyers using much larger infrastructure.

When should a team use which?

Choose ETL when: you must remove sensitive fields before storage, the target system is not built for heavy transformation, or you need strong control over exactly what lands downstream. ETL still appears in regulated environments and tightly governed integrations.

Choose ELT when: you are using scalable cloud compute, want reproducible SQL or Spark transformations, need raw data retained for reprocessing, or want analytics engineering teams to transform data close to the warehouse. This is the common default in the modern data stack.

💡 Practical rule: if your destination can scale compute separately from storage, ELT is usually the better default. If raw data cannot be retained safely or the target is weak at transformation, ETL can still be the right answer.
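The difference in ordering can be sketched in memory with plain Python lists standing in for the landing and curated zones (all names and values here are hypothetical illustrations, not a real pipeline):

```python
def extract() -> list[dict]:
    """Stand-in for pulling rows from a source system."""
    return [
        {"order_id": 1, "email": "a@example.com", "amount": 120.0},
        {"order_id": 2, "email": "b@example.com", "amount": -5.0},
    ]


def transform(rows: list[dict]) -> list[dict]:
    """Drop the sensitive email field and invalid amounts."""
    return [
        {"order_id": r["order_id"], "amount": r["amount"]}
        for r in rows
        if r["amount"] > 0
    ]


raw_zone: list[dict] = []      # stand-in for a lake or staging schema
curated_zone: list[dict] = []  # stand-in for curated warehouse tables


def run_etl() -> None:
    # ETL: transform BEFORE loading, so only clean rows ever land.
    curated_zone.extend(transform(extract()))


def run_elt() -> None:
    # ELT: load raw rows first, transform later inside the platform.
    raw_zone.extend(extract())
    curated_zone.extend(transform(raw_zone))
```

In the ETL path the raw zone never exists, which is exactly why it suits sensitive-field filtering; in the ELT path the raw rows are retained and can be re-transformed when business logic changes.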

Storage Architectures

Modern data platforms are built around three main storage patterns: data warehouses, data lakes, and data lakehouses. The differences are not just branding. They change how data is stored, queried, and governed.

| Architecture | Definition | Examples | Where it shines |
| --- | --- | --- | --- |
| Data Warehouse | Managed analytical database optimized for structured data, SQL queries, and BI workloads. | Snowflake, BigQuery, Redshift | Fast analytics, dashboarding, governed business metrics, SQL-heavy teams |
| Data Lake | Low-cost object storage that holds raw and processed data in many formats. | AWS S3, Azure Data Lake Storage, Google Cloud Storage | Cheap storage, flexible schemas, ML and archival workloads, raw ingestion |
| Data Lakehouse | A lake plus table-format and governance features that make it behave more like a warehouse. | Databricks with Delta Lake, Apache Iceberg, Apache Hudi | One platform for BI, ML, and large-scale data engineering with open storage |
Data Warehouse: structured analytics

What it is: a warehouse is the polished showroom. It holds curated tables that business users, analysts, and BI tools can trust.

Why teams use it: warehouses separate storage and compute well, scale SQL, and are built for concurrent analytics workloads.

Data Lake: cheap, flexible storage

What it is: a lake is the raw materials yard. It stores CSV, JSON, Parquet, images, logs, and unstructured files at low cost.

Why teams use it: you can land data quickly, keep historical raw files, and support many use cases without forcing everything into rigid schemas upfront.

Data Lakehouse: best of both when done well

What it is: a lakehouse keeps open object storage but adds transaction logs, schema enforcement, table versioning, and ACID-like behaviors.

Why teams use it: it reduces the gap between raw storage and analytics-grade tables, which is especially useful for mixed analytics and machine learning workloads.

Architecture trap: many teams treat S3 alone as a full analytics platform. It is not. A lake without table management, schema control, partitioning discipline, and query engines becomes a file graveyard.
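One piece of that partitioning discipline can be sketched without any cloud SDK: land files under Hive-style `date=YYYY-MM-DD` prefixes so query engines can prune partitions instead of scanning everything. JSON lines stand in for Parquet here, and every path and field name is a hypothetical example:

```python
import json
from collections import defaultdict
from pathlib import Path


def land_events_partitioned(events: list[dict], lake_root: str) -> list[Path]:
    """Group events into Hive-style date=YYYY-MM-DD partition directories."""
    by_date: dict[str, list[dict]] = defaultdict(list)
    for event in events:
        by_date[event["event_date"]].append(event)

    written: list[Path] = []
    for event_date, rows in sorted(by_date.items()):
        partition_dir = Path(lake_root) / f"date={event_date}"
        partition_dir.mkdir(parents=True, exist_ok=True)

        # One file per partition per run; a real pipeline would write Parquet here.
        out_path = partition_dir / "part-000.jsonl"
        with out_path.open("w", encoding="utf-8") as handle:
            for row in rows:
                handle.write(json.dumps(row) + "\n")
        written.append(out_path)
    return written
```

A query engine pointed at `lake_root` can then skip entire `date=...` directories when a filter restricts the date range, which is the difference between a managed lake and a file graveyard.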

File Formats and Why Columnar Storage Matters

File format choice is one of the highest-leverage decisions in analytics engineering. At small scale, CSV is convenient. At production scale, it is usually the wrong default.

Row-oriented formats
CSV and JSON store records row by row. They are easy to inspect, but they are bulky, slow to scan, and expensive for analytical queries that only need a few columns.
Column-oriented formats
Parquet and ORC store data by column. Query engines can read only the columns they need, compress repeated values well, and skip irrelevant chunks faster.
Real-world analogy: warehouse aisles

CSV is like storing every item for an order in one mixed box. To find all prices, you still have to open every box and inspect everything.

Parquet is like storing all prices on one shelf, all customer IDs on another shelf, and all timestamps on a third. If an analyst only needs prices and timestamps, the system reads only those shelves.

Why column-oriented formats are crucial

The snippet below writes a Pandas DataFrame to .parquet with Snappy compression. It also writes CSV for comparison, but the recommendation is clear: use Parquet for analytics and large-scale storage unless you have a specific interoperability reason not to.

from pathlib import Path

import pandas as pd


def save_orders_dataset() -> None:
    """Save the same dataset as CSV and Parquet to show the production-safe default."""
    output_dir = Path("./data_output")
    output_dir.mkdir(parents=True, exist_ok=True)

    orders_df = pd.DataFrame(
        {
            "order_id": [1001, 1002, 1003, 1004],
            "customer_id": [501, 502, 501, 503],
            "order_total": [125.50, 88.10, 42.99, 310.75],
            "currency": ["USD", "USD", "USD", "USD"],
            "order_ts": pd.to_datetime(
                [
                    "2026-03-25 10:15:00",
                    "2026-03-25 11:05:00",
                    "2026-03-26 08:43:00",
                    "2026-03-26 17:20:00",
                ]
            ),
        }
    )

    try:
        # CSV is easy to inspect, but usually inefficient for analytics workloads.
        orders_df.to_csv(output_dir / "orders.csv", index=False)

        # Parquet stores typed columns and compresses efficiently.
        # Snappy is a common production default because it balances speed and compression.
        orders_df.to_parquet(
            output_dir / "orders.parquet",
            index=False,
            compression="snappy",
            engine="pyarrow",
        )

        print("Saved CSV and Parquet versions to ./data_output")
    except ImportError as exc:
        print("Missing optional dependency. Install pyarrow to write Parquet files.")
        print(f"Details: {exc}")
    except OSError as exc:
        print("File write failed. Check directory permissions and available disk space.")
        print(f"Details: {exc}")


if __name__ == "__main__":
    save_orders_dataset()
Scalability note: when data grows to millions of rows, the difference between CSV and Parquet stops being academic. It directly affects storage cost, load times, cloud query spend, and whether downstream jobs finish on time.

Module 2: Data Ingestion (Extraction)

Extraction is where many pipelines quietly fail. A small demo script can make a single request or run one SQL query, but production extraction must handle timeouts, retries, pagination, authentication, and memory pressure. This is where a lot of "works on my laptop" code turns into brittle operations work.

Source API / DB → Reliable extraction → Raw landing zone → Transform layer

API Extraction

REST APIs are a common source for SaaS data such as CRM records, billing events, and support tickets. The mistake beginners make is assuming one requests.get() call is enough. In production, APIs fail transiently, rate limit, and occasionally return malformed payloads. Retries and defensive parsing are not optional.

Scalability concern: API extraction is often network-bound and vendor-limited. The bottleneck is usually not CPU. It is backoff strategy, pagination discipline, and checkpointing progress so a failed run can resume safely.
from __future__ import annotations

from typing import Any

import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry


def build_session() -> requests.Session:
    """Create a requests session with retry behavior for transient API failures."""
    retry_strategy = Retry(
        total=5,
        connect=5,
        read=5,
        backoff_factor=1.0,
        status_forcelist=[429, 500, 502, 503, 504],
        allowed_methods=["GET"],
        raise_on_status=False,
    )

    adapter = HTTPAdapter(max_retries=retry_strategy)
    session = requests.Session()
    session.mount("https://", adapter)
    session.mount("http://", adapter)
    session.headers.update(
        {
            "Accept": "application/json",
            "User-Agent": "data-engineering-handbook/1.0",
        }
    )
    return session


def fetch_all_pages(base_url: str, api_token: str) -> list[dict[str, Any]]:
    """Pull paginated JSON records from an API with retries and basic validation."""
    session = build_session()
    session.headers.update({"Authorization": f"Bearer {api_token}"})

    results: list[dict[str, Any]] = []
    next_url: str | None = base_url

    try:
        while next_url:
            response = session.get(next_url, timeout=30)
            response.raise_for_status()

            payload = response.json()
            if not isinstance(payload, dict):
                raise ValueError("Expected a JSON object at the top level.")

            records = payload.get("results", [])
            if not isinstance(records, list):
                raise ValueError("Expected 'results' to be a list.")

            results.extend(record for record in records if isinstance(record, dict))
            next_url = payload.get("next")

        return results
    except requests.exceptions.Timeout as exc:
        print("API request timed out. Increase timeout or inspect network conditions.")
        raise RuntimeError("API extraction timed out") from exc
    except requests.exceptions.HTTPError as exc:
        print(f"HTTP error while calling API: {exc}")
        raise RuntimeError("API extraction failed with HTTP error") from exc
    except ValueError as exc:
        print(f"Unexpected response shape: {exc}")
        raise RuntimeError("API extraction failed because payload was malformed") from exc
    finally:
        session.close()


if __name__ == "__main__":
    API_URL = "https://api.example.com/v1/orders"
    API_TOKEN = "replace-with-real-token"

    try:
        orders = fetch_all_pages(API_URL, API_TOKEN)
        print(f"Fetched {len(orders)} records from the API")
    except RuntimeError as exc:
        print(f"Extraction job failed: {exc}")

This pattern is production-friendly because it retries transient failures, validates the payload shape, and handles pagination without assuming the dataset fits in a single response.

Database Extraction

Database extraction becomes dangerous when engineers read entire tables into memory. That works for 50,000 rows and fails badly at 50 million rows. The fix is straightforward: stream or chunk reads so memory stays bounded.

from __future__ import annotations

from pathlib import Path

import pandas as pd
from sqlalchemy import create_engine, text
from sqlalchemy.engine import Engine


def build_engine() -> Engine:
    """Build a SQLAlchemy engine. Swap the URL for your real database."""
    connection_url = "postgresql+psycopg2://user:password@localhost:5432/analytics"
    return create_engine(connection_url, pool_pre_ping=True)


def extract_orders_in_chunks(output_dir: str = "./db_extract") -> None:
    """Read rows in chunks so the job does not exhaust memory on large tables."""
    query = text(
        """
        SELECT order_id, customer_id, order_total, order_status, created_at
        FROM fact_orders
        WHERE created_at >= CURRENT_DATE - INTERVAL '30 days'
        ORDER BY created_at
        """
    )

    Path(output_dir).mkdir(parents=True, exist_ok=True)
    engine = build_engine()

    try:
        with engine.connect() as connection:
            for chunk_number, chunk_df in enumerate(
                pd.read_sql_query(query, connection, chunksize=100_000),
                start=1,
            ):
                output_path = Path(output_dir) / f"orders_chunk_{chunk_number:03d}.parquet"

                # Writing each chunk immediately prevents one huge in-memory dataframe.
                chunk_df.to_parquet(output_path, index=False, compression="snappy")
                print(f"Wrote {len(chunk_df):,} rows to {output_path}")
    except Exception as exc:
        print("Database extraction failed. Check connectivity, credentials, and SQL syntax.")
        raise RuntimeError("Chunked database extraction failed") from exc
    finally:
        engine.dispose()


if __name__ == "__main__":
    extract_orders_in_chunks()
💡 Production rule: for large database pulls, extract incrementally if possible, partition by time or primary key, and write chunks out immediately. If a job dies halfway through, you want partial progress on disk instead of an empty result and a blown memory budget.
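The incremental half of that rule usually comes down to a watermark: remember the highest `created_at` you have seen and pull only newer rows on the next run. A sketch with SQLite standing in for the real database (the `fact_orders` table matches the earlier query; the watermark handling is an illustrative assumption):

```python
import sqlite3


def extract_incrementally(
    conn: sqlite3.Connection, last_watermark: str
) -> tuple[list[tuple], str]:
    """Pull only rows newer than the stored watermark and return the new watermark."""
    rows = conn.execute(
        "SELECT order_id, created_at FROM fact_orders "
        "WHERE created_at > ? ORDER BY created_at",
        (last_watermark,),
    ).fetchall()

    # Advance the watermark only when new rows arrived; otherwise keep the old one.
    new_watermark = rows[-1][1] if rows else last_watermark
    return rows, new_watermark
```

The returned watermark would be persisted the same way as the API checkpoint above: only after the extracted rows are safely written out.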

Module 3: Data Cleaning & Transformation (The Heavy Lifting)

This is where raw data becomes usable. Most data engineering effort is not about connecting to the source. It is about making inconsistent, incomplete, and messy data safe for analytics and downstream models.

Data Quality Issues

| Issue | What it looks like | Why it matters |
| --- | --- | --- |
| Nulls | Missing revenue, missing timestamps, empty country values | Breaks aggregations, model features, joins, and KPIs |
| Duplicates | Repeated orders, repeated click events, duplicate customer rows | Inflates counts, revenue, and conversion metrics, and erodes downstream trust |
| Schema drift | Column types or names change unexpectedly | Breaks parsers, silently corrupts pipelines, and causes hard-to-debug failures |
Real-world analogy: receiving dock

Raw data is like truck deliveries arriving with missing labels, duplicate boxes, and slightly different packing slips every day. Cleaning is the receiving dock process that checks, standardizes, and routes goods before they hit inventory systems.
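Schema drift in particular is cheap to guard against explicitly before any cleaning runs. A sketch of a pre-flight check, with the expected schema written as an assumed contract for the messy dataset used in the next section:

```python
import pandas as pd

# Hypothetical contract for the incoming dataset; in production this would
# live in config or a schema registry, not hard-coded in the job.
EXPECTED_SCHEMA = {
    "customer_name": "object",
    "sales_amount": "float64",
    "order_date": "object",
}


def check_schema(df: pd.DataFrame) -> list[str]:
    """Return human-readable drift problems instead of letting them corrupt silently."""
    problems: list[str] = []
    for column, expected_dtype in EXPECTED_SCHEMA.items():
        if column not in df.columns:
            problems.append(f"missing column: {column}")
        elif str(df[column].dtype) != expected_dtype:
            problems.append(
                f"dtype drift in {column}: expected {expected_dtype}, got {df[column].dtype}"
            )
    extra = set(df.columns) - set(EXPECTED_SCHEMA)
    if extra:
        problems.append(f"unexpected columns: {sorted(extra)}")
    return problems
```

A pipeline can then fail loudly (or quarantine the batch) when the list is non-empty, which turns silent corruption into a visible incident.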

Cleaning with Pandas and Polars

For small to medium datasets, Pandas remains a strong default. Polars is increasingly popular because it is faster, more memory-efficient, and designed around a modern execution engine. The business logic is the same in both tools: deduplicate, normalize, fill missing values, and standardize timestamps.

from __future__ import annotations

import pandas as pd


def clean_with_pandas() -> pd.DataFrame:
    """Clean a messy dataset using Pandas with common production-safe steps."""
    raw_df = pd.DataFrame(
        {
            "customer_name": [" Alice ", "bob", "bob", " CHARLIE "],
            "city": [" New York ", "LONDON", "LONDON", " San Francisco "],
            "sales_amount": [100.0, None, None, 250.0],
            "order_date": ["2026-03-01", "2026/03/02", "2026/03/02", "2026-03-04"],
        }
    )

    try:
        cleaned_df = raw_df.drop_duplicates().copy()

        # Fill missing numeric values with the column mean so aggregations remain usable.
        sales_mean = cleaned_df["sales_amount"].mean()
        cleaned_df["sales_amount"] = cleaned_df["sales_amount"].fillna(sales_mean)

        # Standardize strings for safer joins and group-by behavior.
        for column in ["customer_name", "city"]:
            cleaned_df[column] = (
                cleaned_df[column]
                .astype("string")
                .str.strip()
                .str.lower()
            )

        # Coerce invalid values to NaT rather than crashing the whole job.
        cleaned_df["order_date"] = pd.to_datetime(cleaned_df["order_date"], errors="coerce")

        return cleaned_df
    except Exception as exc:
        print("Pandas cleaning step failed. Inspect source schema and values.")
        raise RuntimeError("Pandas cleaning failed") from exc


if __name__ == "__main__":
    print(clean_with_pandas())
from __future__ import annotations

import polars as pl


def clean_with_polars() -> pl.DataFrame:
    """Apply the same cleaning logic with Polars for better performance on larger local workloads."""
    raw_df = pl.DataFrame(
        {
            "customer_name": [" Alice ", "bob", "bob", " CHARLIE "],
            "city": [" New York ", "LONDON", "LONDON", " San Francisco "],
            "sales_amount": [100.0, None, None, 250.0],
            "order_date": ["2026-03-01", "2026/03/02", "2026/03/02", "2026-03-04"],
        }
    )

    try:
        mean_sales = raw_df.select(pl.col("sales_amount").mean()).item()

        cleaned_df = (
            raw_df
            .unique()
            .with_columns(
                [
                    pl.col("sales_amount").fill_null(mean_sales),
                    pl.col("customer_name").str.strip_chars().str.to_lowercase(),
                    pl.col("city").str.strip_chars().str.to_lowercase(),
                    pl.col("order_date").str.strptime(pl.Date, strict=False),
                ]
            )
        )

        return cleaned_df
    except Exception as exc:
        print("Polars cleaning step failed. Verify data types and date formats.")
        raise RuntimeError("Polars cleaning failed") from exc


if __name__ == "__main__":
    print(clean_with_polars())

Big Data Transformation with PySpark

Once datasets exceed a single machine's RAM, local Pandas workflows stop being reliable. PySpark spreads both storage and compute across a cluster, which is why it is common in lakehouse platforms such as Databricks and EMR-backed data lakes.

from __future__ import annotations

from pyspark.sql import SparkSession
from pyspark.sql import functions as F


def transform_large_dataset() -> None:
    """Run cleaning logic in Spark for datasets too large for one machine."""
    spark = (
        SparkSession.builder
        .appName("data-engineering-handbook-cleaning")
        .getOrCreate()
    )

    try:
        df = spark.createDataFrame(
            [
                (1, " Alice ", 100.0, "2026-03-01", "completed"),
                (2, "bob", None, "2026-03-02", "completed"),
                (3, "", 50.0, "2026-03-03", "cancelled"),
                (4, None, 300.0, "2026-03-04", "completed"),
            ],
            ["order_id", "customer_name", "sales_amount", "order_date", "order_status"],
        )

        cleaned_df = (
            df
            .withColumn("customer_name", F.trim(F.lower(F.col("customer_name"))))
            .withColumn("sales_amount", F.coalesce(F.col("sales_amount"), F.lit(0.0)))
            .withColumn("order_date", F.to_timestamp("order_date"))
            .filter(F.col("order_status") == "completed")
            .filter(F.col("customer_name").isNotNull() & (F.col("customer_name") != ""))
        )

        cleaned_df.show(truncate=False)
    except Exception as exc:
        print("PySpark transformation failed. Check Spark configuration and input schema.")
        raise RuntimeError("PySpark transformation failed") from exc
    finally:
        spark.stop()


if __name__ == "__main__":
    transform_large_dataset()
Scalability note: Spark does not just make code faster. It changes the execution model. You stop assuming a dataframe fits in local RAM and instead push transformations to distributed executors that read partitioned files from a lake or lakehouse.

Module 4: Data Orchestration & Scheduling

Once pipelines multiply, the problem is no longer just running code on a schedule. The real problem is coordinating dependencies, knowing what failed, retrying safely, and alerting the right people fast enough for the business to care.

The Need for Orchestration

Cron jobs fail at scale because they only answer one question: "when should this command run?" Production pipelines need answers to much harder questions: what must finish before this task starts, what happens if task 3 fails, which dataset version was produced, and how does the on-call engineer know before downstream dashboards go stale?

| Cron problem | Why it hurts | What orchestrators add |
| --- | --- | --- |
| No dependency graph | Jobs can run out of order or before upstream data is ready | DAGs with explicit task dependencies |
| Poor observability | Failures hide in logs on random machines | Central UI, logs, metadata, alerts |
| Weak retry control | Transient failures cause missed SLAs or duplicate runs | Task retries, backfills, idempotent scheduling |
Real-world analogy: airport control tower

Cron is a timer that tells pilots when to try taking off.

An orchestrator is the control tower coordinating which plane can move, which runway is free, what happens when weather changes, and who gets alerted when a plane is delayed.

Apache Airflow

Airflow is one of the most common orchestrators in the Python ecosystem. It is built around DAGs, task dependencies, retries, schedules, and rich operational metadata. The TaskFlow API makes DAGs feel like plain Python while still giving you orchestration features.

from __future__ import annotations

from datetime import datetime, timedelta

from airflow.decorators import dag, task


@dag(
    dag_id="daily_orders_pipeline",
    start_date=datetime(2026, 3, 1),
    schedule="@daily",
    catchup=False,
    tags=["handbook", "data-engineering"],
)
def daily_orders_pipeline():
    """A simple ELT-style DAG using the TaskFlow API."""

    @task(retries=2, retry_delay=timedelta(minutes=1))
    def extract() -> list[dict[str, object]]:
        # In production this would pull from an API, database, or cloud storage.
        return [
            {"order_id": 1, "amount": 120.0, "status": "completed"},
            {"order_id": 2, "amount": 0.0, "status": "cancelled"},
            {"order_id": 3, "amount": 75.5, "status": "completed"},
        ]

    @task
    def transform(records: list[dict[str, object]]) -> list[dict[str, object]]:
        try:
            # Keep only business-valid rows for the downstream load step.
            return [
                record
                for record in records
                if record.get("status") == "completed" and float(record.get("amount", 0)) > 0
            ]
        except Exception as exc:
            raise RuntimeError("Transform task failed") from exc

    @task
    def load(records: list[dict[str, object]]) -> None:
        try:
            # Replace this with a Snowflake, BigQuery, or warehouse load in real code.
            print(f"Loading {len(records)} cleaned records into analytics storage")
        except Exception as exc:
            raise RuntimeError("Load task failed") from exc

    load(transform(extract()))


dag = daily_orders_pipeline()
💡 Production mindset: Airflow is most useful when tasks are idempotent, inputs are explicit, outputs are traceable, and retries do not create duplicate business records. Orchestration cannot fix unsafe task design.
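The idempotency requirement can be made concrete: a load task that replaces its whole partition inside one transaction is safe to retry any number of times. A sketch using SQLite as a stand-in warehouse (the `daily_metrics` table is a hypothetical example):

```python
import sqlite3


def load_partition_idempotently(
    conn: sqlite3.Connection, run_date: str, rows: list[tuple]
) -> None:
    """Replace the partition for run_date so a retry never duplicates records."""
    with conn:  # one transaction: the delete and insert commit together or not at all
        conn.execute("DELETE FROM daily_metrics WHERE run_date = ?", (run_date,))
        conn.executemany(
            "INSERT INTO daily_metrics (run_date, metric, value) VALUES (?, ?, ?)",
            rows,
        )
```

The same delete-then-insert (or MERGE) pattern, keyed on the run date, is what makes Airflow retries and backfills safe in real warehouses.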

Module 5: Real-World Scenarios & Use Cases

Concepts become easier to retain when you see how they combine in real architectures. The examples below show how ingestion, storage, transformation, and orchestration fit together in common production systems.

Scenario 1: E-commerce Daily Sales Pipeline

Goal: ingest daily transaction logs from AWS S3, clean the data in Databricks with PySpark, and load aggregated metrics into Snowflake so the BI team can power daily revenue dashboards.

App logs land in S3 → Databricks reads Parquet or JSON → PySpark cleans and aggregates → Snowflake serves BI

| Layer | Example technology | Responsibility |
| --- | --- | --- |
| Landing zone | AWS S3 | Store raw transaction files partitioned by date |
| Processing | Databricks + PySpark | Deduplicate, normalize currencies, filter invalid orders, aggregate daily sales |
| Serving | Snowflake | Expose curated fact tables and business metrics to dashboards |
| Orchestration | Airflow | Schedule the pipeline, handle retries, and send alerts |

Architecture cards

Ingress (S3 raw zone): application transaction logs land in S3 partitions such as date=YYYY-MM-DD. Keeping raw files unchanged makes reprocessing possible when business logic changes or quality issues are discovered later.

Compute (Databricks + PySpark): Spark jobs deduplicate transactions, normalize currencies, fill missing fields, and generate curated silver or gold tables. This is where the heavy lifting happens when daily event volume is too large for local Python workflows.

Serving (Snowflake metrics layer): aggregated revenue, refund, and customer metrics land in Snowflake so BI tools can query stable, governed tables without scanning raw event logs every morning.

Control plane (Airflow orchestration): Airflow coordinates the handoff between storage, Spark compute, and warehouse load steps. It also provides retries, SLA visibility, and alerts when the daily pipeline misses its reporting window.

Why this architecture works:

# Pseudo-flow for the daily sales pipeline.
# Assumes an active SparkSession named `spark` and a
# write_dataframe_to_snowflake helper defined elsewhere.

def daily_sales_pipeline(execution_date: str) -> None:
    raw_path = f"s3://company-raw/transactions/date={execution_date}/"

    # 1. Read raw data from the lake.
    raw_df = spark.read.parquet(raw_path)

    # 2. Apply business cleaning rules.
    cleaned_df = (
        raw_df
        .dropDuplicates(["transaction_id"])
        .filter("order_status = 'completed'")
        .fillna({"discount_amount": 0.0})
    )

    # 3. Aggregate for BI consumption.
    agg_df = cleaned_df.groupBy("order_date", "country").sum("net_revenue")

    # 4. Write curated results to Snowflake.
    write_dataframe_to_snowflake(agg_df, table_name="daily_sales_metrics")

Scenario 2: Real-Time Fraud Detection (Streaming)

Batch pipelines assume it is acceptable to wait. Fraud detection does not have that luxury. If you detect fraud six hours later, the money may already be gone. That is the paradigm shift from batch to streaming: process events continuously as they happen, not only in daily or hourly windows.

Batch vs streaming: the mindset change

Batch asks: what happened during the last time window?

Streaming asks: what should we do about this event right now, before the next event arrives?

Architecture cards

Event backbone (Kafka topics): payment events are published to Kafka in near real time. Kafka decouples producers from consumers so scoring services, audit pipelines, and monitoring tools can all react to the same event stream independently.

Decision layer (streaming consumer): a Python consumer or stream-processing job enriches each payment with user context, velocity windows, and geographic checks before deciding whether the transaction should be approved, held, or escalated.

Risk logic (rules or model scoring): simple threshold rules can catch obvious fraud patterns quickly, while ML scoring can rank more subtle risk signals. Mature systems often combine both because deterministic rules are easier to audit.

Feedback loop (case review and labels): analyst review outcomes, chargeback signals, and confirmed fraud labels must flow back into the system so the rules and models improve over time instead of remaining static.

Apache Kafka acts as the event backbone. Producers publish payment events. Consumers read them continuously. A streaming application then enriches events, computes features such as transaction velocity or location mismatch, and decides whether to flag the event.

from __future__ import annotations

import json

from kafka import KafkaConsumer


def consume_fraud_events() -> None:
    """Illustrative streaming consumer that would feed fraud rules or a model."""
    consumer = KafkaConsumer(
        "payments",
        bootstrap_servers=["localhost:9092"],
        auto_offset_reset="earliest",
        enable_auto_commit=True,
        group_id="fraud-detectors",
        value_deserializer=lambda value: json.loads(value.decode("utf-8")),
    )

    try:
        for message in consumer:
            event = message.value

            # In production, replace this with model scoring or a feature store lookup.
            if event.get("amount", 0) > 5000 and event.get("country_mismatch", False):
                print(f"Potential fraud detected for transaction {event.get('transaction_id')}")
    except Exception as exc:
        print("Streaming consumer failed. Check Kafka connectivity and payload schema.")
        raise RuntimeError("Fraud streaming consumer failed") from exc
    finally:
        consumer.close()


if __name__ == "__main__":
    consume_fraud_events()
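The producer side of the same topic can be sketched with kafka-python as well. The topic name and broker address mirror the consumer above; treat the helper names as hypothetical, and note that the serializer must match the consumer's `value_deserializer` exactly:

```python
import json
from typing import Any


def serialize_payment_event(event: dict[str, Any]) -> bytes:
    """Encode an event exactly the way the consumer's deserializer decodes it."""
    return json.dumps(event).encode("utf-8")


def build_payments_producer():
    """Create a kafka-python producer.

    Imported lazily and never called at module import time, because
    construction requires a reachable broker.
    """
    from kafka import KafkaProducer

    return KafkaProducer(
        bootstrap_servers=["localhost:9092"],
        value_serializer=serialize_payment_event,
    )


def publish_payment(producer, event: dict[str, Any]) -> None:
    """Publish one payment event to the 'payments' topic."""
    producer.send("payments", event)
```

Keeping serialization in its own function means the encoding contract between producer and consumer can be unit-tested without a running broker.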
Streaming reality: Python is often used for consumers, rule engines, or inference services, but the bigger design question is event-time handling, late-arriving data, feature freshness, and exactly-once or at-least-once semantics. Streaming is less about syntax and more about operational guarantees.
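One of those operational guarantees can be shown directly in code: switching the consumer from auto-commit to manual commits gives at-least-once delivery. A sketch, assuming kafka-python and a consumer built with enable_auto_commit=False (the risk rule is the same illustrative threshold as above, isolated so it is testable without a broker):

```python
from typing import Any


def is_risky(event: dict[str, Any]) -> bool:
    """Illustrative threshold rule; real systems would score features or models."""
    return event.get("amount", 0) > 5000 and bool(event.get("country_mismatch", False))


def process_with_manual_commits(consumer) -> None:
    """At-least-once processing: commit offsets only after the work is done.

    A crash before commit() means the message is redelivered on restart, so
    the downstream action must be idempotent (for example, keyed on
    transaction_id) to avoid double-flagging.
    """
    for message in consumer:
        event = message.value
        if is_risky(event):
            print(f"Flagging transaction {event.get('transaction_id')}")
        consumer.commit()  # kafka-python: synchronous offset commit
```

The tradeoff is explicit: at-least-once accepts occasional reprocessing in exchange for never silently dropping an event, which is usually the right default for fraud signals.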

How the pieces combine in a modern stack

A modern data platform is rarely one product. It is usually a layered system: sources feed a lake, transformations happen in Spark or warehouse SQL, curated data lands in a warehouse or lakehouse table format, orchestration coordinates jobs, and downstream consumers use BI, reverse ETL, ML features, or streaming decisions.

Reference Links

When moving from handbook examples into production implementation work, keep the official documentation for your stack close at hand: the Python data processing libraries (Pandas, Polars, PySpark), the orchestration and streaming tools (Airflow, Kafka), and the modern data stack platforms you deploy on (Snowflake, BigQuery, Databricks).

Recommended habit: use this handbook for concepts and patterns, then validate implementation details against the current vendor or library docs before building a production pipeline. Platform behavior, SDKs, and configuration defaults change faster than conceptual architecture does.