Data Engineering Core Concepts & Python Handbook
A production-focused guide to modern data engineering fundamentals: data movement, storage design, scalable file formats, orchestration, and Python-first implementation patterns across the modern data stack.
Table of Contents
This handbook is organized around the core concerns of a production data engineer: how data moves, where it lives, how it is cleaned, how pipelines are scheduled, and how all of those parts combine in real systems used by analytics, machine learning, and business teams.
- Module 1: Core Concepts & Architecture
  ETL vs ELT, storage architectures, and why columnar formats like Parquet matter at scale.
- Module 2: Data Ingestion (Extraction)
  Reliable API pulls with retries and memory-safe database extraction with SQLAlchemy chunking.
- Module 3: Data Cleaning & Transformation
  Data quality problems, practical cleaning with Pandas or Polars, and PySpark transformations for large datasets.
- Module 4: Data Orchestration & Scheduling
  Why cron breaks at scale and how Apache Airflow coordinates production pipelines.
- Module 5: Real-World Scenarios & Use Cases
  E-commerce batch pipelines, streaming fraud detection, and how the modern data stack fits together.
Module 1: Core Concepts & Architecture
Data engineering is mostly about moving data from where it is created to where it becomes useful, while keeping the process scalable, observable, and cheap enough to run every day. If software engineering is about building services, data engineering is about building reliable delivery systems for data.
ETL vs ELT
ETL means Extract, Transform, Load. Data is pulled from source systems, transformed before landing in the destination, and then loaded into a warehouse or database.
ELT means Extract, Load, Transform. Raw data is first loaded into a scalable storage system such as a cloud warehouse or data lake, and transformations happen afterward inside that platform.
| Pattern | How it works | Best fit | Main tradeoff |
|---|---|---|---|
| ETL | Transform before loading into the target system. | Strict schemas, legacy warehouses, sensitive filtering before storage, smaller or controlled pipelines. | Transformation layer becomes a bottleneck if compute does not scale well. |
| ELT | Load raw data first, then transform inside the warehouse or lakehouse. | Modern cloud analytics stacks such as Snowflake, BigQuery, Redshift, Databricks, and dbt-driven teams. | You must govern raw data carefully because more data lands before cleanup. |
ETL is like cleaning, labeling, and boxing products at the factory before shipping them to the warehouse.
ELT is like sending everything to a massive distribution center first, then sorting and repackaging it closer to the buyers using much larger infrastructure.
When should a team use which? As the table suggests: reach for ETL when data must be filtered, masked, or validated before it is allowed to land, or when the destination cannot scale transformation compute. Reach for ELT when you run on a cloud warehouse or lakehouse with elastic compute and want raw history available so transformation logic can evolve without re-extracting from source systems.
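The two patterns are the same three steps composed in different orders. The sketch below is a toy illustration with invented stand-in functions, not any specific tool's API:

```python
def extract() -> list[dict]:
    """Stand-in for pulling rows from a source system."""
    return [
        {"order_id": 1, "amount": 120.0, "status": "completed"},
        {"order_id": 2, "amount": 0.0, "status": "cancelled"},
    ]

def transform(rows: list[dict]) -> list[dict]:
    """Stand-in for business cleanup: keep only completed, non-zero orders."""
    return [r for r in rows if r["status"] == "completed" and r["amount"] > 0]

def load(rows: list[dict], target: list) -> None:
    """Stand-in for writing rows to a warehouse table."""
    target.extend(rows)

# ETL: transformation happens before anything lands in the target.
etl_target: list[dict] = []
load(transform(extract()), etl_target)

# ELT: raw rows land first; transformation runs later inside the platform,
# so the raw history stays available for re-transformation.
raw_zone: list[dict] = []
load(extract(), raw_zone)
elt_curated = transform(raw_zone)

print(len(etl_target), len(raw_zone), len(elt_curated))  # prints "1 2 1"
```

Note that in the ELT ordering, `raw_zone` still holds both records after the curated view is built, which is exactly the re-transformation advantage the table above describes.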
Storage Architectures
Modern data platforms are built around three main storage patterns: data warehouses, data lakes, and data lakehouses. The differences are not just branding. They change how data is stored, queried, and governed.
| Architecture | Definition | Examples | Where it shines |
|---|---|---|---|
| Data Warehouse | Managed analytical database optimized for structured data, SQL queries, and BI workloads. | Snowflake, BigQuery, Redshift | Fast analytics, dashboarding, governed business metrics, SQL-heavy teams |
| Data Lake | Low-cost object storage that holds raw and processed data in many formats. | AWS S3, Azure Data Lake Storage, Google Cloud Storage | Cheap storage, flexible schemas, ML and archival workloads, raw ingestion |
| Data Lakehouse | A lake plus table-format and governance features that make it behave more like a warehouse. | Databricks with Delta Lake, Apache Iceberg, Apache Hudi | One platform for BI, ML, and large-scale data engineering with open storage |
What it is: a warehouse is the polished showroom. It holds curated tables that business users, analysts, and BI tools can trust.
Why teams use it: warehouses separate storage and compute well, scale SQL, and are built for concurrent analytics workloads.
What it is: a lake is the raw materials yard. It stores CSV, JSON, Parquet, images, logs, and unstructured files at low cost.
Why teams use it: you can land data quickly, keep historical raw files, and support many use cases without forcing everything into rigid schemas upfront.
What it is: a lakehouse keeps open object storage but adds transaction logs, schema enforcement, table versioning, and ACID-like behaviors.
Why teams use it: it reduces the gap between raw storage and analytics-grade tables, which is especially useful for mixed analytics and machine learning workloads.
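The "transaction log" idea behind lakehouse table formats can be illustrated with a toy commit log. This sketch is only an analogy for how formats such as Delta Lake or Iceberg track table versions; the file layout here is invented for illustration and matches no real format:

```python
import json
from pathlib import Path
from tempfile import TemporaryDirectory

def commit(log_dir: Path, version: int, files_added: list[str]) -> None:
    """Append one commit: a JSON entry naming the data files in this version."""
    entry = {"version": version, "files_added": files_added}
    (log_dir / f"{version:05d}.json").write_text(json.dumps(entry))

def files_as_of(log_dir: Path, version: int) -> list[str]:
    """Replay the log up to a version: this is what enables 'time travel'."""
    files: list[str] = []
    for commit_file in sorted(log_dir.glob("*.json")):
        entry = json.loads(commit_file.read_text())
        if entry["version"] <= version:
            files.extend(entry["files_added"])
    return files

with TemporaryDirectory() as tmp:
    log_dir = Path(tmp)
    commit(log_dir, 0, ["part-000.parquet"])
    commit(log_dir, 1, ["part-001.parquet"])
    table_v0 = files_as_of(log_dir, 0)  # the table as it looked at version 0
    table_v1 = files_as_of(log_dir, 1)  # the current table
    print(table_v0, table_v1)
```

Because commits are append-only metadata, readers can reconstruct any historical version of the table without the data files themselves ever being rewritten.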
File Formats and Why Columnar Storage Matters
File format choice is one of the highest-leverage decisions in analytics engineering. At small scale, CSV is convenient. At production scale, it is usually the wrong default.
CSV is like storing every item for an order in one mixed box. To find all prices, you still have to open every box and inspect everything.
Parquet is like storing all prices on one shelf, all customer IDs on another shelf, and all timestamps on a third. If an analyst only needs prices and timestamps, the system reads only those shelves.
Why column-oriented formats are crucial
- Scan efficiency: Analytical queries often select a small subset of columns from wide tables. Columnar storage avoids reading everything.
- Compression: Repeated column values compress far better than mixed row text, which reduces storage cost and IO.
- Speed: Query engines such as Spark, DuckDB, BigQuery external tables, and Snowflake stages work far more efficiently with Parquet.
- Schema: Parquet stores typed metadata, which is much safer than inferring types from CSV strings on every read.
The snippet below writes a Pandas DataFrame to .parquet with Snappy compression. It also writes CSV for comparison, but the recommendation is clear: use Parquet for analytics and large-scale storage unless you have a specific interoperability reason not to.
from pathlib import Path
import pandas as pd
def save_orders_dataset() -> None:
"""Save the same dataset as CSV and Parquet to show the production-safe default."""
output_dir = Path("./data_output")
output_dir.mkdir(parents=True, exist_ok=True)
orders_df = pd.DataFrame(
{
"order_id": [1001, 1002, 1003, 1004],
"customer_id": [501, 502, 501, 503],
"order_total": [125.50, 88.10, 42.99, 310.75],
"currency": ["USD", "USD", "USD", "USD"],
"order_ts": pd.to_datetime(
[
"2026-03-25 10:15:00",
"2026-03-25 11:05:00",
"2026-03-26 08:43:00",
"2026-03-26 17:20:00",
]
),
}
)
try:
# CSV is easy to inspect, but usually inefficient for analytics workloads.
orders_df.to_csv(output_dir / "orders.csv", index=False)
# Parquet stores typed columns and compresses efficiently.
# Snappy is a common production default because it balances speed and compression.
orders_df.to_parquet(
output_dir / "orders.parquet",
index=False,
compression="snappy",
engine="pyarrow",
)
print("Saved CSV and Parquet versions to ./data_output")
except ImportError as exc:
print("Missing optional dependency. Install pyarrow to write Parquet files.")
print(f"Details: {exc}")
except OSError as exc:
print("File write failed. Check directory permissions and available disk space.")
print(f"Details: {exc}")
if __name__ == "__main__":
save_orders_dataset()
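The compression claim is easy to verify with nothing but the standard library. The toy data below lays out the same 1,000 records row-wise and column-wise and compresses both with zlib; grouping identical values together gives the compressor much longer runs to exploit:

```python
import zlib

n = 1000
# Row-oriented layout: each record's fields are interleaved, CSV-style.
row_wise = "\n".join(f"{i},USD,completed" for i in range(n))

# Column-oriented layout: all values of one column are stored contiguously.
col_wise = "\n".join(
    [
        ",".join(str(i) for i in range(n)),  # order ids
        ",".join(["USD"] * n),               # currency column: one long run
        ",".join(["completed"] * n),         # status column: one long run
    ]
)

row_compressed = len(zlib.compress(row_wise.encode()))
col_compressed = len(zlib.compress(col_wise.encode()))
print(f"row-wise: {row_compressed} bytes, column-wise: {col_compressed} bytes")
```

Both layouts hold exactly the same information, but the column-wise layout compresses smaller, and the gap widens as tables get wider and more repetitive. Parquet adds per-column encodings (dictionary, run-length) on top of this effect.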
Module 2: Data Ingestion (Extraction)
Extraction is where many pipelines quietly fail. A small demo script can make a single request or run one SQL query, but production extraction must handle timeouts, retries, pagination, authentication, and memory pressure. This is where a lot of "works on my laptop" code turns into brittle operations work.
API Extraction
REST APIs are a common source for SaaS data such as CRM records, billing events, and support tickets. The mistake beginners make is assuming one requests.get() call is enough. In production, APIs fail transiently, rate limit, and occasionally return malformed payloads. Retries and defensive parsing are not optional.
from __future__ import annotations
from typing import Any
import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
def build_session() -> requests.Session:
"""Create a requests session with retry behavior for transient API failures."""
retry_strategy = Retry(
total=5,
connect=5,
read=5,
backoff_factor=1.0,
status_forcelist=[429, 500, 502, 503, 504],
allowed_methods=["GET"],
raise_on_status=False,
)
adapter = HTTPAdapter(max_retries=retry_strategy)
session = requests.Session()
session.mount("https://", adapter)
session.mount("http://", adapter)
session.headers.update(
{
"Accept": "application/json",
"User-Agent": "data-engineering-handbook/1.0",
}
)
return session
def fetch_all_pages(base_url: str, api_token: str) -> list[dict[str, Any]]:
"""Pull paginated JSON records from an API with retries and basic validation."""
session = build_session()
session.headers.update({"Authorization": f"Bearer {api_token}"})
results: list[dict[str, Any]] = []
next_url: str | None = base_url
try:
while next_url:
response = session.get(next_url, timeout=30)
response.raise_for_status()
payload = response.json()
if not isinstance(payload, dict):
raise ValueError("Expected a JSON object at the top level.")
records = payload.get("results", [])
if not isinstance(records, list):
raise ValueError("Expected 'results' to be a list.")
results.extend(record for record in records if isinstance(record, dict))
next_url = payload.get("next")
return results
except requests.exceptions.Timeout as exc:
print("API request timed out. Increase timeout or inspect network conditions.")
raise RuntimeError("API extraction timed out") from exc
except requests.exceptions.HTTPError as exc:
print(f"HTTP error while calling API: {exc}")
raise RuntimeError("API extraction failed with HTTP error") from exc
except ValueError as exc:
print(f"Unexpected response shape: {exc}")
raise RuntimeError("API extraction failed because payload was malformed") from exc
finally:
session.close()
if __name__ == "__main__":
API_URL = "https://api.example.com/v1/orders"
API_TOKEN = "replace-with-real-token"
try:
orders = fetch_all_pages(API_URL, API_TOKEN)
print(f"Fetched {len(orders)} records from the API")
except RuntimeError as exc:
print(f"Extraction job failed: {exc}")
This pattern is production-friendly because it retries transient failures, validates the payload shape, and handles pagination without assuming the dataset fits in a single response.
Database Extraction
Database extraction becomes dangerous when engineers read entire tables into memory. That works for 50,000 rows and fails badly at 50 million: at a few hundred bytes per row, 50 million rows is tens of gigabytes before DataFrame overhead. The fix is straightforward: stream or chunk reads so memory stays bounded.
from __future__ import annotations
from pathlib import Path
import pandas as pd
from sqlalchemy import create_engine, text
from sqlalchemy.engine import Engine
def build_engine() -> Engine:
"""Build a SQLAlchemy engine. Swap the URL for your real database."""
connection_url = "postgresql+psycopg2://user:password@localhost:5432/analytics"
return create_engine(connection_url, pool_pre_ping=True)
def extract_orders_in_chunks(output_dir: str = "./db_extract") -> None:
"""Read rows in chunks so the job does not exhaust memory on large tables."""
query = text(
"""
SELECT order_id, customer_id, order_total, order_status, created_at
FROM fact_orders
WHERE created_at >= CURRENT_DATE - INTERVAL '30 days'
ORDER BY created_at
"""
)
Path(output_dir).mkdir(parents=True, exist_ok=True)
engine = build_engine()
try:
        # stream_results requests a server-side cursor, so chunking actually bounds
        # memory instead of the driver buffering the full result set client-side.
        with engine.connect().execution_options(stream_results=True) as connection:
for chunk_number, chunk_df in enumerate(
pd.read_sql_query(query, connection, chunksize=100_000),
start=1,
):
output_path = Path(output_dir) / f"orders_chunk_{chunk_number:03d}.parquet"
# Writing each chunk immediately prevents one huge in-memory dataframe.
chunk_df.to_parquet(output_path, index=False, compression="snappy")
print(f"Wrote {len(chunk_df):,} rows to {output_path}")
except Exception as exc:
print("Database extraction failed. Check connectivity, credentials, and SQL syntax.")
raise RuntimeError("Chunked database extraction failed") from exc
finally:
engine.dispose()
if __name__ == "__main__":
extract_orders_in_chunks()
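The same bounded-memory pattern works with any DB-API driver, not just SQLAlchemy. A self-contained sketch using the standard library's sqlite3, where fetchmany keeps at most one chunk in memory at a time:

```python
import sqlite3

def iter_chunks(cursor: sqlite3.Cursor, chunk_size: int = 2):
    """Yield lists of rows, pulling at most chunk_size rows at a time."""
    while True:
        rows = cursor.fetchmany(chunk_size)
        if not rows:
            return
        yield rows

connection = sqlite3.connect(":memory:")
connection.execute("CREATE TABLE orders (order_id INTEGER, total REAL)")
connection.executemany(
    "INSERT INTO orders VALUES (?, ?)",
    [(i, 10.0 * i) for i in range(1, 8)],
)

cursor = connection.execute("SELECT order_id, total FROM orders ORDER BY order_id")
chunk_sizes = [len(chunk) for chunk in iter_chunks(cursor, chunk_size=2)]
print(chunk_sizes)  # 7 rows in chunks of 2 -> [2, 2, 2, 1]
connection.close()
```

In a real pipeline each yielded chunk would be written out immediately, exactly as the SQLAlchemy example above does with per-chunk Parquet files.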
Module 3: Data Cleaning & Transformation (The Heavy Lifting)
This is where raw data becomes usable. Most data engineering effort is not about connecting to the source. It is about making inconsistent, incomplete, and messy data safe for analytics and downstream models.
Data Quality Issues
| Issue | What it looks like | Why it matters |
|---|---|---|
| Nulls | Missing revenue, missing timestamps, empty country values | Breaks aggregations, model features, joins, and KPIs |
| Duplicates | Repeated orders, repeated click events, duplicate customer rows | Inflates counts, revenue, conversion metrics, and downstream trust issues |
| Schema drift | Column types or names change unexpectedly | Breaks parsers, silently corrupts pipelines, and causes hard-to-debug failures |
Raw data is like truck deliveries arriving with missing labels, duplicate boxes, and slightly different packing slips every day. Cleaning is the receiving dock process that checks, standardizes, and routes goods before they hit inventory systems.
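Of the three issues, schema drift is the easiest to catch early with an explicit contract check at the receiving dock. A minimal sketch (the expected schema and column names here are invented for illustration):

```python
EXPECTED_SCHEMA = {
    "order_id": "int",
    "customer_id": "int",
    "order_total": "float",
    "order_ts": "str",
}

def detect_schema_drift(
    expected: dict[str, str], actual: dict[str, str]
) -> dict[str, list[str]]:
    """Compare an incoming batch's columns and types against the contract."""
    missing = sorted(set(expected) - set(actual))
    unexpected = sorted(set(actual) - set(expected))
    type_changes = sorted(
        name for name in set(expected) & set(actual) if expected[name] != actual[name]
    )
    return {"missing": missing, "unexpected": unexpected, "type_changes": type_changes}

# A batch where order_total arrived as a string and a new column appeared.
incoming = {
    "order_id": "int",
    "customer_id": "int",
    "order_total": "str",
    "order_ts": "str",
    "coupon_code": "str",
}
drift = detect_schema_drift(EXPECTED_SCHEMA, incoming)
print(drift)
```

Failing fast on a drift report like this is usually far cheaper than debugging silently corrupted downstream tables weeks later.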
Cleaning with Pandas and Polars
For small to medium datasets, Pandas remains a strong default. Polars is increasingly popular because it is faster, more memory-efficient, and designed around a modern execution engine. The business logic is the same in both tools: deduplicate, normalize, fill missing values, and standardize timestamps.
from __future__ import annotations
import pandas as pd
def clean_with_pandas() -> pd.DataFrame:
"""Clean a messy dataset using Pandas with common production-safe steps."""
raw_df = pd.DataFrame(
{
"customer_name": [" Alice ", "bob", "bob", " CHARLIE "],
"city": [" New York ", "LONDON", "LONDON", " San Francisco "],
"sales_amount": [100.0, None, None, 250.0],
"order_date": ["2026-03-01", "2026/03/02", "2026/03/02", "2026-03-04"],
}
)
try:
cleaned_df = raw_df.drop_duplicates().copy()
# Fill missing numeric values with the column mean so aggregations remain usable.
sales_mean = cleaned_df["sales_amount"].mean()
cleaned_df["sales_amount"] = cleaned_df["sales_amount"].fillna(sales_mean)
# Standardize strings for safer joins and group-by behavior.
for column in ["customer_name", "city"]:
cleaned_df[column] = (
cleaned_df[column]
.astype("string")
.str.strip()
.str.lower()
)
# Coerce invalid values to NaT rather than crashing the whole job.
cleaned_df["order_date"] = pd.to_datetime(cleaned_df["order_date"], errors="coerce")
return cleaned_df
except Exception as exc:
print("Pandas cleaning step failed. Inspect source schema and values.")
raise RuntimeError("Pandas cleaning failed") from exc
if __name__ == "__main__":
print(clean_with_pandas())
The same logic, expressed in Polars:
from __future__ import annotations
import polars as pl
def clean_with_polars() -> pl.DataFrame:
"""Apply the same cleaning logic with Polars for better performance on larger local workloads."""
raw_df = pl.DataFrame(
{
"customer_name": [" Alice ", "bob", "bob", " CHARLIE "],
"city": [" New York ", "LONDON", "LONDON", " San Francisco "],
"sales_amount": [100.0, None, None, 250.0],
"order_date": ["2026-03-01", "2026/03/02", "2026/03/02", "2026-03-04"],
}
)
try:
mean_sales = raw_df.select(pl.col("sales_amount").mean()).item()
cleaned_df = (
raw_df
.unique()
.with_columns(
[
pl.col("sales_amount").fill_null(mean_sales),
pl.col("customer_name").str.strip_chars().str.to_lowercase(),
pl.col("city").str.strip_chars().str.to_lowercase(),
pl.col("order_date").str.strptime(pl.Date, strict=False),
]
)
)
return cleaned_df
except Exception as exc:
print("Polars cleaning step failed. Verify data types and date formats.")
raise RuntimeError("Polars cleaning failed") from exc
if __name__ == "__main__":
print(clean_with_polars())
Big Data Transformation with PySpark
Once datasets exceed a single machine's RAM, local Pandas workflows stop being reliable. PySpark spreads both storage and compute across a cluster, which is why it is common in lakehouse platforms such as Databricks and EMR-backed data lakes.
from __future__ import annotations
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
def transform_large_dataset() -> None:
"""Run cleaning logic in Spark for datasets too large for one machine."""
spark = (
SparkSession.builder
.appName("data-engineering-handbook-cleaning")
.getOrCreate()
)
try:
df = spark.createDataFrame(
[
(1, " Alice ", 100.0, "2026-03-01", "completed"),
(2, "bob", None, "2026-03-02", "completed"),
(3, "", 50.0, "2026-03-03", "cancelled"),
(4, None, 300.0, "2026-03-04", "completed"),
],
["order_id", "customer_name", "sales_amount", "order_date", "order_status"],
)
cleaned_df = (
df
.withColumn("customer_name", F.trim(F.lower(F.col("customer_name"))))
.withColumn("sales_amount", F.coalesce(F.col("sales_amount"), F.lit(0.0)))
.withColumn("order_date", F.to_timestamp("order_date"))
.filter(F.col("order_status") == "completed")
.filter(F.col("customer_name").isNotNull() & (F.col("customer_name") != ""))
)
cleaned_df.show(truncate=False)
except Exception as exc:
print("PySpark transformation failed. Check Spark configuration and input schema.")
raise RuntimeError("PySpark transformation failed") from exc
finally:
spark.stop()
if __name__ == "__main__":
transform_large_dataset()
Module 4: Data Orchestration & Scheduling
Once pipelines multiply, the problem is no longer just running code on a schedule. The real problem is coordinating dependencies, knowing what failed, retrying safely, and alerting the right people fast enough for the business to care.
The Need for Orchestration
Cron jobs fail at scale because they only answer one question: "when should this command run?" Production pipelines need answers to much harder questions: what must finish before this task starts, what happens if task 3 fails, which dataset version was produced, and how does the on-call engineer know before downstream dashboards go stale?
| Cron problem | Why it hurts | What orchestrators add |
|---|---|---|
| No dependency graph | Jobs can run out of order or before upstream data is ready | DAGs with explicit task dependencies |
| Poor observability | Failures hide in logs on random machines | Central UI, logs, metadata, alerts |
| Weak retry control | Transient failures cause missed SLAs or duplicate runs | Task retries, backfills, idempotent scheduling |
Cron is a timer that tells pilots when to try taking off.
An orchestrator is the control tower coordinating which plane can move, which runway is free, what happens when weather changes, and who gets alerted when a plane is delayed.
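The dependency-graph idea is exactly a DAG plus a topological order, which Python's standard library can express directly. The task names below mirror the ELT steps used throughout this handbook:

```python
from graphlib import TopologicalSorter

# Each task maps to the set of tasks that must finish before it starts.
dependencies = {
    "transform": {"extract"},
    "load": {"transform"},
    "notify_bi_team": {"load"},
}

run_order = list(TopologicalSorter(dependencies).static_order())
print(run_order)  # extract runs before transform, transform before load
```

An orchestrator layers retries, schedules, backfills, and observability on top of this ordering; the graph itself is the easy part, which is why cron, having no graph at all, falls so far short.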
Apache Airflow
Airflow is one of the most common orchestrators in the Python ecosystem. It is built around DAGs, task dependencies, retries, schedules, and rich operational metadata. The TaskFlow API makes DAGs feel like plain Python while still giving you orchestration features.
from __future__ import annotations
from datetime import datetime, timedelta
from airflow.decorators import dag, task
@dag(
dag_id="daily_orders_pipeline",
start_date=datetime(2026, 3, 1),
schedule="@daily",
catchup=False,
tags=["handbook", "data-engineering"],
)
def daily_orders_pipeline():
"""A simple ELT-style DAG using the TaskFlow API."""
@task(retries=2, retry_delay=timedelta(minutes=1))
def extract() -> list[dict[str, object]]:
# In production this would pull from an API, database, or cloud storage.
return [
{"order_id": 1, "amount": 120.0, "status": "completed"},
{"order_id": 2, "amount": 0.0, "status": "cancelled"},
{"order_id": 3, "amount": 75.5, "status": "completed"},
]
@task
def transform(records: list[dict[str, object]]) -> list[dict[str, object]]:
try:
# Keep only business-valid rows for the downstream load step.
return [
record
for record in records
if record.get("status") == "completed" and float(record.get("amount", 0)) > 0
]
except Exception as exc:
raise RuntimeError("Transform task failed") from exc
@task
def load(records: list[dict[str, object]]) -> None:
try:
# Replace this with a Snowflake, BigQuery, or warehouse load in real code.
print(f"Loading {len(records)} cleaned records into analytics storage")
except Exception as exc:
raise RuntimeError("Load task failed") from exc
load(transform(extract()))
dag = daily_orders_pipeline()
Module 5: Real-World Scenarios & Use Cases
Concepts become easier to retain when you see how they combine in real architectures. The examples below show how ingestion, storage, transformation, and orchestration fit together in common production systems.
Scenario 1: E-commerce Daily Sales Pipeline
Goal: ingest daily transaction logs from AWS S3, clean the data in Databricks with PySpark, and load aggregated metrics into Snowflake so the BI team can power daily revenue dashboards.
| Layer | Example technology | Responsibility |
|---|---|---|
| Landing zone | AWS S3 | Store raw transaction files partitioned by date |
| Processing | Databricks + PySpark | Deduplicate, normalize currencies, filter invalid orders, aggregate daily sales |
| Serving | Snowflake | Expose curated fact tables and business metrics to dashboards |
| Orchestration | Airflow | Schedule the pipeline, retries, and alerting |
Architecture cards
Raw transaction files land in S3 partitioned by date=YYYY-MM-DD. Keeping raw files unchanged makes reprocessing possible when business logic changes or quality issues are discovered later.
Why this architecture works:
- S3: Cheap raw storage preserves full history for reprocessing.
- Spark: Handles large daily files and wide transformations without relying on one machine.
- Snowflake: Gives the BI team fast SQL performance and governed metrics.
- ELT: Keeps raw data available so transformation logic can evolve without re-extracting from apps.
# Pseudo-flow for the daily sales pipeline
def daily_sales_pipeline(execution_date: str) -> None:
raw_path = f"s3://company-raw/transactions/date={execution_date}/"
# 1. Read raw data from the lake.
raw_df = spark.read.parquet(raw_path)
# 2. Apply business cleaning rules.
cleaned_df = (
raw_df
.dropDuplicates(["transaction_id"])
.filter("order_status = 'completed'")
.fillna({"discount_amount": 0.0})
)
# 3. Aggregate for BI consumption.
agg_df = cleaned_df.groupBy("order_date", "country").sum("net_revenue")
# 4. Write curated results to Snowflake.
write_dataframe_to_snowflake(agg_df, table_name="daily_sales_metrics")
Scenario 2: Real-Time Fraud Detection (Streaming)
Batch pipelines assume it is acceptable to wait. Fraud detection does not have that luxury. If you detect fraud six hours later, the money may already be gone. That is the paradigm shift from batch to streaming: process events continuously as they happen, not only in daily or hourly windows.
Batch asks: what happened during the last time window?
Streaming asks: what should we do about this event right now, before the next event arrives?
Architecture cards
Apache Kafka acts as the event backbone. Producers publish payment events. Consumers read them continuously. A streaming application then enriches events, computes features such as transaction velocity or location mismatch, and decides whether to flag the event.
from __future__ import annotations
import json
from kafka import KafkaConsumer
def consume_fraud_events() -> None:
"""Illustrative streaming consumer that would feed fraud rules or a model."""
consumer = KafkaConsumer(
"payments",
bootstrap_servers=["localhost:9092"],
auto_offset_reset="earliest",
enable_auto_commit=True,
group_id="fraud-detectors",
value_deserializer=lambda value: json.loads(value.decode("utf-8")),
)
try:
for message in consumer:
event = message.value
# In production, replace this with model scoring or a feature store lookup.
if event.get("amount", 0) > 5000 and event.get("country_mismatch", False):
print(f"Potential fraud detected for transaction {event.get('transaction_id')}")
except Exception as exc:
print("Streaming consumer failed. Check Kafka connectivity and payload schema.")
raise RuntimeError("Fraud streaming consumer failed") from exc
finally:
consumer.close()
if __name__ == "__main__":
consume_fraud_events()
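The "transaction velocity" feature mentioned above can be sketched as a sliding window over event timestamps. This is a simplified in-memory version; a production system would keep this state in a stream processor or feature store rather than a Python dict:

```python
from collections import deque

class VelocityTracker:
    """Count events per customer within a sliding time window."""

    def __init__(self, window_seconds: float = 60.0) -> None:
        self.window_seconds = window_seconds
        self.events: dict[str, deque] = {}

    def record(self, customer_id: str, timestamp: float) -> int:
        """Record one event and return how many fell inside the window."""
        window = self.events.setdefault(customer_id, deque())
        window.append(timestamp)
        # Evict timestamps that have aged out of the window.
        while window and timestamp - window[0] > self.window_seconds:
            window.popleft()
        return len(window)

tracker = VelocityTracker(window_seconds=60.0)
for ts in [0.0, 10.0, 20.0, 25.0]:
    velocity = tracker.record("cust-42", ts)
print(velocity)  # four transactions within 60 seconds
```

A consumer like the one above would call `record` per event and combine the returned velocity with other signals, such as the country mismatch flag, before deciding whether to alert.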
How the pieces combine in a modern stack
A modern data platform is rarely one product. It is usually a layered system: sources feed a lake, transformations happen in Spark or warehouse SQL, curated data lands in a warehouse or lakehouse table format, orchestration coordinates jobs, and downstream consumers use BI, reverse ETL, ML features, or streaming decisions.
- Storage: S3 or cloud object storage for raw data, Snowflake or BigQuery for governed analytics, and lakehouse formats for open-table workflows.
- Transformation: Pandas or Polars for small-scale work, PySpark for distributed processing, and SQL or dbt for modeled analytics layers.
- Orchestration: Airflow for dependencies, retries, backfills, and visibility.
- Serving: BI dashboards, ML feature stores, reverse ETL, or streaming applications depending on the consumer.
Reference Links
These are the primary references worth keeping close when you move from handbook examples into production implementation work.
Python Data Processing
- Pandas: pandas.pydata.org docs
- Polars: docs.pola.rs
- PySpark: Spark Python API docs
- SQLAlchemy: docs.sqlalchemy.org
- Requests: requests.readthedocs.io
Orchestration and Streaming
- Airflow: airflow.apache.org docs
- Kafka: kafka.apache.org documentation
- Confluent: developer.confluent.io
Modern Data Stack Platforms
- Snowflake: docs.snowflake.com
- BigQuery: Google BigQuery docs
- Databricks: docs.databricks.com
- Delta Lake: docs.delta.io
- Apache Iceberg: iceberg.apache.org docs