Design a Large-Scale Data Pipeline (ETL / Batch + Streaming)
Move and transform petabytes from sources into a warehouse/lake for analytics. DAG orchestration, Spark shuffles, lake vs warehouse, and idempotent, replayable jobs.
The problem
Netflix processes roughly 700 billion events per day. Airbnb's data team runs thousands of Spark jobs every night to produce the dashboards that show hosts how their listings are performing. Meta's internal data warehouse serves petabytes of analytics queries from thousands of engineers. The thing all of them rely on — and the thing you're designing here — is a data pipeline: the infrastructure that takes raw events from dozens of source systems, transforms that data into clean, queryable tables, and delivers it to analysts and ML engineers with a freshness guarantee.
At its core, an ETL (Extract, Transform, Load) pipeline is a continuous assembly line for data. Raw events — user clicks, database row changes, API responses, IoT sensor readings — arrive from many sources in many formats and need to be cleaned, joined, aggregated, and loaded into a fast columnar store where a SQL query can answer "how much revenue did we make in North America last Tuesday?" in under ten seconds. The pipeline must do this reliably, on a schedule, at petabyte scale.
The building blocks — Kafka, object storage, Spark, a DAG orchestrator — are now mostly commodity. The hard part is in the composition. Two core tensions drive almost every design decision in this space. First, idempotency under failure: when a Spark job crashes halfway through processing a 500 GB partition, you need to be able to re-run it without double-counting a single row — and you need that guarantee to hold across backfills spanning a year of history. Second, scale and freshness at the same time: batch jobs are cheap and reliable but deliver data hours late; streaming delivers data in minutes but is expensive and operationally complex. Getting both simultaneously, against a shared dataset, without duplicating transform logic, is the central engineering challenge.
Designing this well means understanding where distributed systems theory and data engineering practice collide — specifically: what makes a shuffle expensive, why partition overwrite semantics save you from a class of correctness bugs, and how an open table format like Apache Iceberg lets a streaming micro-batch and an overnight Spark job coexist on the same table without corrupting each other.
Functional requirements
- Batch ingestion: pull from relational databases (JDBC), object-store files (S3/GCS), event logs, third-party APIs on a schedule.
- Stream ingestion: continuous ingest from a distributed message queue (Kafka) at millions of events/s.
- Transformation: clean, filter, join, and aggregate raw data into business-level tables (facts, dimensions).
- Orchestration: define job dependencies, handle retries, alert on failure, support backfills.
- Serving: land results in a columnar data warehouse (BigQuery, Snowflake, Redshift) or a lakehouse (Iceberg, Delta Lake).
- Data quality: schema validation, null checks, row-count assertions before publishing downstream.
Non-functional requirements
- Idempotency: re-running a job for any past partition produces the same result and does not double-count.
- Fault tolerance: a worker crash or bad machine mid-shuffle must not corrupt output.
- Replayability: landing the same raw data twice to object storage must be safe (deduplication downstream).
- Freshness SLAs: overnight batch jobs complete by 07:00; streaming lag < 60 s end-to-end.
- Cost efficiency: compute is expensive; jobs should read only the partitions they need (partition pruning).
Capacity estimation
| Dimension | Estimate | How we got there |
|---|---|---|
| Raw ingest rate (sustained) | ~58 MB/s | 5 TB/day ÷ 86,400 s = 5 × 10¹² B ÷ 86,400 s ≈ 58 MB/s |
| Raw ingest rate (peak) | ~175 MB/s | Peak is ~3× sustained — midnight batch dumps from source DBs |
| Batch read volume per daily run | ~1.7 TB | 5 TB/day ÷ 3 (assuming ~3:1 columnar compression) ≈ 1.7 TB of compressed reads |
| Spark read throughput | ~10 s per TB | 200 MB/s per executor × 500 executors = 100,000 MB/s; 1 TB (10⁶ MB) ÷ 100,000 MB/s = 10 s, so ~17 s for the 1.7 TB daily read |
| End-to-end batch job time | 30–90 min | Realistic with multi-join shuffles on top of the raw read |
| Kafka ingest rate (streaming, uncompressed) | 2 GB/s | 2M events/s × 1 KB avg event size |
| Kafka ingest rate (on-wire, compressed) | ~200 MB/s | 2 GB/s ÷ 10 (10:1 Kafka compression) |
| Streaming micro-batch window | 10–30 s | Flink or Spark Structured Streaming cadence |
| Warehouse total size | 2 PB | Columnar, compressed 5–10× vs raw |
| Concurrent analysts | 500 | Dashboard queries scan 1–50 GB of columnar data → 2–10 s |
| Warehouse idle cost | $0 target | Serverless BigQuery / Snowflake — compute auto-scales to zero |
| Batch tasks per day | ~10,000 | ~500 DAGs × avg 20 tasks/DAG |
| Concurrent Spark jobs (peak) | ~120 | ~500 DAGs × 20–30% overlap at peak window; cluster sized accordingly |
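The table's arithmetic can be re-derived in a few lines. This is a sketch that assumes decimal units (1 MB = 10^6 bytes, 1 TB = 10^12 bytes) and reuses the table's own inputs: 3:1 batch compression, a 3× peak factor, and 10:1 Kafka compression. In those units the read-throughput row comes to 10 s per TB, so the ~1.7 TB daily read needs roughly 17 s of pure I/O before any shuffle.

```python
# Back-of-envelope capacity math (decimal units: 1 MB = 10**6 B, 1 TB = 10**12 B).
SECONDS_PER_DAY = 86_400
MB = 10**6
TB = 10**12

daily_raw_bytes = 5 * TB
sustained_mb_s = daily_raw_bytes / SECONDS_PER_DAY / MB  # ~57.9 MB/s
peak_mb_s = 3 * sustained_mb_s                           # ~174 MB/s at a 3x peak factor

compressed_read_tb = daily_raw_bytes / 3 / TB            # ~1.67 TB at ~3:1 compression

cluster_mb_s = 200 * 500                                 # 200 MB/s per executor x 500 executors
seconds_per_tb = (TB / MB) / cluster_mb_s                # 10.0 s of pure read time per TB
daily_read_s = compressed_read_tb * seconds_per_tb       # ~16.7 s for the daily batch read

kafka_raw_gb_s = 2_000_000 * 1_000 / 10**9               # 2M events/s x 1 KB = 2.0 GB/s
kafka_wire_mb_s = kafka_raw_gb_s * 1_000 / 10            # 200 MB/s at 10:1 compression

batch_tasks_per_day = 500 * 20                           # 500 DAGs x 20 tasks each

print(f"ingest {sustained_mb_s:.1f} MB/s sustained, {peak_mb_s:.0f} MB/s peak")
print(f"daily read {compressed_read_tb:.2f} TB in ~{daily_read_s:.0f} s of raw I/O")
print(f"Kafka {kafka_raw_gb_s:.1f} GB/s raw, {kafka_wire_mb_s:.0f} MB/s on the wire")
```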
Takeaway: Storage is cheap; compute and network are not. Every major design decision in this pipeline — partition pruning, columnar formats, compression, micro-batching — exists to minimize how much data moves across the wire or through a shuffle.
ETL vs ELT — and why ELT won
Classic ETL (Extract → Transform → Load) transforms data before it enters the destination. You run a Java/Python job, reshape the data, and insert clean rows into the warehouse. This made sense when warehouse compute was expensive and scarce.
ELT (Extract → Load → Transform) inverts the order: load raw data first, then transform it inside the warehouse using SQL or a compute engine that sits next to the storage. ELT is now the dominant pattern for three reasons. First, your raw data is already there, so if a bug slips into the transform logic you re-run it against the original; classic ETL often throws raw data away. Second, warehouse compute has scaled dramatically: BigQuery and Snowflake separate compute from storage and auto-scale, making a CREATE TABLE AS SELECT over 10 TB fast and cheap. Third, SQL transforms are expressive and easy to keep in version control; dbt (data build tool) lets teams write, test, and track transform SQL as a DAG of models with column-level lineage.
The data lake exists to hold the raw immutable layer regardless of which approach you use. It is the insurance policy: even if your warehouse is corrupted or your transform logic had a bug two months ago, you can replay from raw.
flowchart LR
SRC["Source DB"] --> ETLT["ETL: transform<br/>before loading"]
ETLT --> ETLWH[("Warehouse<br/>(no raw copy)")]
SRC --> ELTL["ELT: load raw first"]
ELTL --> LAKE[("Data Lake<br/>raw, immutable")]
LAKE --> ELTT["Transform inside<br/>warehouse / Spark"]
ELTT --> ELTWH[("Warehouse<br/>+ time travel")]
style ETLT fill:#ffaa00,color:#0a0a0f
style ELTL fill:#ff6b1a,color:#0a0a0f
style LAKE fill:#0e7490,color:#fff
style ELTT fill:#15803d,color:#fff
style ELTWH fill:#a855f7,color:#fff
Building up to the design
V1: Cron + Python script + Postgres
A developer writes a Python script that reads from a MySQL production DB, computes a daily summary, and inserts rows into an analytics Postgres. A cron job runs it at 01:00 every night.
This gets you an end-to-end working product, and it's the right starting point for a single team. But the script is not idempotent: re-running it doubles every row. It also does a full-table scan on the production DB, a reliable way to topple a MySQL primary in the middle of the night.
V2: Add idempotency + read replicas
Make the script idempotent: use INSERT ... ON CONFLICT DO UPDATE, or run DELETE ... WHERE dt = today followed by INSERT inside a single transaction. Read from a replica, not the primary. Add alerting when the script fails.
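Here is a minimal sketch of the delete-then-insert pattern. It uses Python's built-in sqlite3 as a stand-in for the analytics Postgres, and the `daily_summary` table and `load_daily_summary` helper are hypothetical. Both statements run in one transaction, so a reader never sees a partially loaded day, and re-running the load leaves exactly one copy of each row.

```python
import sqlite3

def load_daily_summary(conn: sqlite3.Connection, dt: str, rows: list[tuple[str, int]]) -> None:
    """Idempotently (re)load one day's summary rows with delete-then-insert."""
    # `with conn` commits both statements together, or rolls both back on an exception.
    with conn:
        conn.execute("DELETE FROM daily_summary WHERE dt = ?", (dt,))
        conn.executemany(
            "INSERT INTO daily_summary (dt, region, orders) VALUES (?, ?, ?)",
            [(dt, region, orders) for region, orders in rows],
        )

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE daily_summary (dt TEXT, region TEXT, orders INTEGER)")
rows = [("NA", 120), ("EU", 80)]
load_daily_summary(conn, "2026-06-01", rows)
load_daily_summary(conn, "2026-06-01", rows)  # a retry: same result, no duplicates
print(conn.execute("SELECT COUNT(*) FROM daily_summary").fetchone()[0])  # prints 2
```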
Safe retries buy you a lot. What you still can't do is run fast enough. With 50 tables and a 10-minute join each, a serial run takes over 8 hours (50 × 10 min = 500 min), so a job starting at 01:00 misses the 07:00 SLA. You also have no dependency graph: if Table B depends on Table A finishing, you schedule them with a hard-coded 30-minute sleep, which is fragile and slow.
V3: Orchestrated DAGs
Replace cron with an orchestrator (Apache Airflow, Prefect, Dagster). A DAG (Directed Acyclic Graph) describes task dependencies explicitly:
raw_events_extract → staging_events → fact_events
raw_users_extract → staging_users → fact_events
(fact_events waits for both staging tables)
The orchestrator runs tasks in parallel where dependencies allow, retries failed tasks with back-off, sends alerts, and — critically — supports backfill: re-run the DAG for any historical date range by changing the execution_date parameter.
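The orchestrator's scheduling loop can be sketched with Python's standard-library graphlib. This is an illustration only, not how Airflow, Prefect, or Dagster are implemented. The `dag` dictionary mirrors the example above, with each task mapped to its upstream tasks. `execution_waves` is a hypothetical helper that groups tasks into waves that could run in parallel.

```python
from graphlib import TopologicalSorter

# Hypothetical DAG from the example: task -> set of upstream tasks it depends on.
dag = {
    "staging_events": {"raw_events_extract"},
    "staging_users": {"raw_users_extract"},
    "fact_events": {"staging_events", "staging_users"},
}

def execution_waves(graph: dict[str, set[str]]) -> list[list[str]]:
    """Group tasks into waves; every task in a wave has all its dependencies done."""
    sorter = TopologicalSorter(graph)
    sorter.prepare()  # raises graphlib.CycleError if the graph is not acyclic
    waves = []
    while sorter.is_active():
        ready = sorted(sorter.get_ready())  # tasks whose upstreams have all finished
        waves.append(ready)
        sorter.done(*ready)  # a real orchestrator marks each task done as it succeeds
    return waves

for i, wave in enumerate(execution_waves(dag)):
    print(i, wave)
```

Each wave could be handed to a worker pool in parallel. Retries and backfills then rerun individual nodes of the same graph.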
Now you have parallelism, retries, observability, and backfill. The new limit is a single Python process: joins over hundreds of gigabytes of data can't run on one machine.
V4: Distributed processing (Spark)
Replace the single-machine transform with a Spark job submitted by the orchestrator. Spark distributes the compute across a cluster, processes each partition in parallel, and handles the network shuffle internally. A 1 TB join that took hours now takes minutes.
The gap that opens up: all transformed data lives in the warehouse. If you want to re-process from scratch, you need to re-extract from the source — which may no longer have that history. You also have no cheap place to land raw events at scale.
V5: Data lake + lakehouse
Add a data lake: raw events written to object storage (S3/GCS) immediately on arrival, untouched. The lake is the single source of truth. Spark reads from the lake; the warehouse is a derived, queryable view. Add a lakehouse open table format (Iceberg or Delta Lake) on top of the lake for the refined/modeled layer: you get ACID commits, schema evolution, and time travel without leaving object storage.
V6: Streaming path
Add a Kafka → stream processor → warehouse path for near-real-time metrics. The batch and stream paths land into the same Iceberg tables. Analysts see data that is minutes old, not hours old, without any code duplication in the transform logic.
flowchart LR
V1["V1: cron script<br/>single table, not idempotent"] --> V2["V2: + idempotency<br/>safe retries"]
V2 --> V3["V3: + DAG orchestrator<br/>parallelism, backfill"]
V3 --> V4["V4: + Spark<br/>TB-scale transforms"]
V4 --> V5["V5: + data lake / lakehouse<br/>immutable raw, ACID refined"]
V5 --> V6["V6: + Kafka streaming<br/>minutes-fresh + batch unified"]
style V1 fill:#0e7490,color:#fff
style V3 fill:#15803d,color:#fff
style V4 fill:#ff6b1a,color:#0a0a0f
style V6 fill:#a855f7,color:#fff
High-level architecture
flowchart TD
subgraph SOURCES["Source Systems"]
DB["OLTP DBs<br/>(MySQL, Postgres)"]
API["APIs / SaaS"]
EV["Event streams"]
end
subgraph INGEST["Ingestion"]
CDC["CDC / Debezium"]
FIVETRAN["Batch connectors<br/>(Fivetran, custom)"]
KAFKAB["Kafka brokers"]
end
subgraph LAKE["Data Lake (object storage)"]
RAW["raw/<br/>immutable, partitioned by dt"]
STAGING["staging/<br/>cleaned, typed"]
end
subgraph PROC["Processing"]
ORCH["Orchestrator<br/>(Airflow / Dagster)"]
SPARK["Spark cluster"]
end
subgraph SERVE["Serving Layer"]
ICE[("Lakehouse<br/>Iceberg / Delta<br/>ACID + time travel")]
WH[("Data Warehouse<br/>BigQuery / Snowflake")]
end
subgraph STREAM["Streaming Path"]
FLINK["Flink / Spark<br/>Structured Streaming"]
end
DB --> CDC --> KAFKAB
DB --> FIVETRAN --> RAW
API --> FIVETRAN
EV --> KAFKAB --> RAW
KAFKAB --> FLINK --> ICE
RAW --> ORCH --> SPARK --> STAGING --> SPARK --> ICE
ICE --> WH
style CDC fill:#ff6b1a,color:#0a0a0f
style ORCH fill:#ffaa00,color:#0a0a0f
style SPARK fill:#15803d,color:#fff
style ICE fill:#0e7490,color:#fff
style WH fill:#a855f7,color:#fff
style FLINK fill:#ff2e88,color:#fff
Ingestion layer deep-dive
Batch ingestion
For relational databases, two patterns:
- Full snapshot: SELECT * FROM orders WHERE dt = :execution_date. Simple, but the source DB re-runs the whole query every time (a full table scan unless dt is indexed). Fine for small tables (< 10M rows); breaks for large ones.
- CDC (Change Data Capture): stream the database's replication log (binlog for MySQL, WAL for Postgres) via a connector like Debezium into Kafka. Every INSERT/UPDATE/DELETE becomes an event. The data lake receives a stream of changes, not a snapshot.
CDC is strictly better for large tables — the source DB sees only the log read overhead, not a full-scan query — but adds operational complexity (connector management, schema changes in the log format).
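The core of CDC ingestion is replaying the change log, in order, against a copy of the table. The sketch below assumes Debezium-style events reduced to their `op`, `before`, and `after` fields; real events carry more envelope metadata. It also assumes a hypothetical primary-key column `id`, and `apply_change_events` is a hypothetical helper.

```python
def apply_change_events(table: dict[int, dict], events: list[dict]) -> dict[int, dict]:
    """Replay simplified change events, in log order, into a key -> row mirror."""
    for event in events:
        op = event["op"]
        if op in ("c", "r", "u"):  # create, snapshot read, update: upsert the new row image
            row = event["after"]
            table[row["id"]] = row
        elif op == "d":            # delete: remove the row identified by the old image
            table.pop(event["before"]["id"], None)
        else:
            raise ValueError(f"unsupported op in this sketch: {op!r}")
    return table

log = [
    {"op": "c", "before": None, "after": {"id": 1, "status": "placed"}},
    {"op": "u", "before": {"id": 1, "status": "placed"}, "after": {"id": 1, "status": "shipped"}},
    {"op": "c", "before": None, "after": {"id": 2, "status": "placed"}},
    {"op": "d", "before": {"id": 2, "status": "placed"}, "after": None},
]
mirror = apply_change_events({}, log)
print(mirror)  # {1: {'id': 1, 'status': 'shipped'}}
```

In this sketch, each event carries the full row image. Replaying the same log from the start therefore yields the same mirror, which is what makes reprocessing a CDC stream safe.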
Streaming ingestion
Events (user clicks, app logs, IoT telemetry) go to Kafka. A message queue with enough partitions and retention (default 7 days; commonly configured to 30 days for replay safety) acts as the buffer between producers and consumers. Consumers write to object storage in micro-batches (Kafka Connect S3 Sink, or a Flink job). The raw files on object storage are the durable, replayable copy.
One constraint is non-negotiable here: write raw files in append-only batches and never update a raw file. Idempotency is achieved by keying partitions to the Kafka offset range or a time window — writing the same window twice produces the same output files.
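The sketch below shows offset-keyed raw files. It assumes the consumer re-reads exactly the same offset range on retry, for example because it commits Kafka offsets only after the file is written. `raw_object_key`, `write_batch`, and the key layout are hypothetical, and a dict stands in for the bucket.

```python
def raw_object_key(topic: str, partition: int, start_offset: int, end_offset: int, dt: str) -> str:
    """Deterministic key: the same (topic, partition, offset range) always maps to the same key."""
    # Zero-padding keeps keys sortable in offset order within a partition directory.
    return (
        f"raw/{topic}/dt={dt}/"
        f"part-{partition:05d}-{start_offset:020d}-{end_offset:020d}.parquet"
    )

# Stand-in for an object-store bucket: a PUT to an existing key replaces the object.
bucket: dict[str, bytes] = {}

def write_batch(topic: str, partition: int, start_offset: int, end_offset: int,
                dt: str, payload: bytes) -> str:
    key = raw_object_key(topic, partition, start_offset, end_offset, dt)
    bucket[key] = payload
    return key

first = write_batch("events", 3, 1_000, 1_999, "2026-06-01", b"batch")
retry = write_batch("events", 3, 1_000, 1_999, "2026-06-01", b"batch")  # crash, then retry
print(first == retry, len(bucket))  # True 1
```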
DAG orchestration and idempotency
An orchestrator (Apache Airflow is the canonical open-source choice) executes a DAG of tasks. Each task corresponds to one logical step: extract, validate, transform, load.
What makes a task idempotent?
A task is idempotent if running it N times produces exactly the same result as running it once. For data jobs, the standard pattern is partition-overwrite:
-- Instead of: INSERT INTO fact_orders SELECT ... WHERE dt = '{{ ds }}'
-- Do:
DELETE FROM fact_orders WHERE dt = '{{ ds }}';
INSERT INTO fact_orders SELECT ... WHERE dt = '{{ ds }}';
Or with Spark, use SaveMode.Overwrite scoped to the partition:
df.write \
.mode("overwrite") \
.partitionBy("dt") \
.option("partitionOverwriteMode", "dynamic") \
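.parquet("s3://lake/fact_orders/")
"Dynamic partition overwrite" replaces only the partitions present in the current DataFrame, leaving all other partitions untouched. Spark's default mode, static, behaves differently. For a write like this one, with no explicit partition spec, static mode deletes every existing partition under the path before writing, so re-running a single day would wipe the rest of the table. Dynamic overwrite is the right default for partitioned batch writes.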
Backfills
When a new table is introduced or a transform bug is fixed, you need to re-run historical partitions. A good orchestrator handles backfill by generating one DAG run per historical date range and executing them in parallel (up to a configured concurrency limit). The jobs are idempotent, so this is safe.
Backfills are expensive, though. Re-reading and re-transforming 365 days of data costs roughly 365 times a normal daily run of the same job. Keep raw data cheap (object storage), make transforms efficient (partition pruning), and test new logic against a sample partition before committing to the full run.
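Backfill fan-out can be sketched with the standard library. This sketch assumes the job for one date is an idempotent partition overwrite. `date_range`, `backfill`, and `run_partition` are hypothetical, a dict stands in for the table, and `max_concurrency` plays the role of the orchestrator's concurrency limit.

```python
from concurrent.futures import ThreadPoolExecutor
from datetime import date, timedelta
from typing import Callable

def date_range(start: date, end: date) -> list[str]:
    """Inclusive list of ISO dates, one per daily partition."""
    return [(start + timedelta(days=i)).isoformat() for i in range((end - start).days + 1)]

def backfill(start: date, end: date, run_partition: Callable[[str], int],
             max_concurrency: int = 8) -> dict[str, int]:
    """Run one job per date with bounded parallelism; safe only if run_partition is idempotent."""
    dts = date_range(start, end)
    with ThreadPoolExecutor(max_workers=max_concurrency) as pool:
        # map() returns results in input order; collecting them re-raises any task's exception.
        return dict(zip(dts, pool.map(run_partition, dts)))

# Hypothetical job: recompute partition dt and overwrite it.
partitions: dict[str, int] = {}

def run_partition(dt: str) -> int:
    partitions[dt] = int(dt.replace("-", "")) % 97  # stand-in for a real transform
    return partitions[dt]

first = backfill(date(2025, 1, 1), date(2025, 12, 31), run_partition)
second = backfill(date(2025, 1, 1), date(2025, 12, 31), run_partition)
print(len(partitions), first == second)  # 365 True
```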
DAG dependency graph
flowchart TD
A["Extract: raw_events<br/>dt = {{ ds }}"] --> B["Validate: row count > 0"]
C["Extract: raw_users<br/>dt = {{ ds }}"] --> D["Validate: schema check"]
B --> E["Transform: staging_events"]
D --> E
E --> F["Transform: fact_session_summary"]
F --> G["Load: publish to warehouse"]
G --> H["Alert: SLA check<br/>must complete by 07:00"]
style A fill:#0e7490,color:#fff
style E fill:#15803d,color:#fff
style F fill:#ff6b1a,color:#0a0a0f
style H fill:#a855f7,color:#fff
Distributed processing: Spark and the shuffle
MapReduce (2004) proved that you could transform petabytes with commodity hardware by splitting the computation into a Map phase (apply a function per record) and a Reduce phase (aggregate by key). Map output was written to local disk and shuffled to reducers, and every job in a chain had to write its full output to HDFS before the next job could read it. That slow disk I/O made chained operations painfully expensive.
Apache Spark (started at UC Berkeley AMPLab in 2009, open-sourced in 2010, RDD paper published at NSDI 2012) improved this with a lazy execution DAG: Spark parses the entire transformation program (reads, filters, joins, aggregations) into a logical plan, optimizes it (predicate pushdown, broadcast hash joins for small tables), and only when an action is triggered does it run — keeping intermediate data in memory across stages where possible.
The shuffle
The shuffle is the defining bottleneck of distributed data processing. Whenever Spark needs to co-locate all rows with the same key on the same executor (for a GROUP BY, JOIN, or reduceByKey), every executor sends its data for each key to the executor responsible for that key — all-to-all network transfer.
sequenceDiagram
participant E1 as Executor 1
participant E2 as Executor 2
participant E3 as Executor 3
Note over E1,E3: Map stage: each executor reads its partition
E1->>E1: reads rows, hashes key → bucket
E2->>E2: reads rows, hashes key → bucket
E3->>E3: reads rows, hashes key → bucket
Note over E1,E3: Shuffle: send each bucket to the right executor
E1->>E2: rows where hash(key) mod 3 == 1
E1->>E3: rows where hash(key) mod 3 == 2
E2->>E1: rows where hash(key) mod 3 == 0
E2->>E3: rows where hash(key) mod 3 == 2
E3->>E1: rows where hash(key) mod 3 == 0
E3->>E2: rows where hash(key) mod 3 == 1
Note over E1,E3: Reduce stage: each executor now has all rows for its keys
The shuffle is expensive for three compounding reasons: it serializes, network-transfers, and deserializes potentially all the data; if any one executor is slow (a straggler), the entire stage waits; and shuffle data is spilled to local disk when it doesn't fit in memory.
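The exchange in the diagram can be simulated in plain Python to make the data movement concrete. This is a single-process sketch, not Spark's implementation. `owner` hashes each key with zlib.crc32, whereas Spark uses its own hash function. `shuffle_count` is a hypothetical helper that performs a GROUP BY key, COUNT(*) and also counts how many rows had to leave their source executor.

```python
import zlib
from collections import Counter

def owner(key: str, num_executors: int) -> int:
    """Executor responsible for a key; crc32 is stable across processes, unlike str hash()."""
    return zlib.crc32(key.encode()) % num_executors

def shuffle_count(partitions: list[list[str]]) -> tuple[list[Counter], int]:
    """GROUP BY key, COUNT(*) via a hash shuffle; also return how many rows crossed executors."""
    n = len(partitions)
    # Map stage: each executor buckets its local rows by destination executor.
    outboxes = []
    for rows in partitions:
        buckets: dict[int, list[str]] = {}
        for key in rows:
            buckets.setdefault(owner(key, n), []).append(key)
        outboxes.append(buckets)
    # Shuffle: all-to-all exchange; rows bound for another executor cross the network.
    inboxes: list[list[str]] = [[] for _ in range(n)]
    rows_moved = 0
    for src, buckets in enumerate(outboxes):
        for dst, keys in buckets.items():
            inboxes[dst].extend(keys)
            if dst != src:
                rows_moved += len(keys)
    # Reduce stage: each executor now holds every row for the keys it owns.
    return [Counter(inbox) for inbox in inboxes], rows_moved

data = [["a", "b", "c", "a"], ["b", "b", "d"], ["a", "d", "e"]]
per_executor, moved = shuffle_count(data)
print(per_executor, moved)
```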
The rule of thumb is: minimize shuffles. Spark's query optimizer (Catalyst) handles simple SQL automatically, but complex multi-join pipelines benefit from manual partition colocation and broadcast joins for small-side tables. Spark's default auto-broadcast threshold (spark.sql.autoBroadcastJoinThreshold) is 10 MB. When executor memory allows, you have two options: raise that config for tables up to a few hundred MB, or mark a specific small table with the broadcast() hint, which ignores the threshold.
Data skew and stragglers
Skew happens when one key has disproportionately many rows — a user_id of NULL for all unauthenticated events, or a single viral product that drives 40% of orders. After the shuffle, one executor gets all the skewed rows and becomes the straggler that holds up the entire stage.
Three ways to handle it:
- Salting: add a random integer salt (0 to N-1) to the skewed key on the large side of the join, and replicate the other side N times (once per salt value) so every salted key still finds its match. The skewed key's rows are spread across N partitions instead of landing on one.
# Skewed join: orders JOIN products ON product_id,
# where product_id 'ABC' accounts for ~90% of order rows.
from pyspark.sql.functions import col, concat, lit, rand
N = 20  # number of salt values
orders = orders.withColumn("salt", (rand() * N).cast("int"))
orders = orders.withColumn("salted_key", concat(col("product_id"), lit("_"), col("salt").cast("string")))
# Replicate each product row once per salt value, then drop columns that would clash in the join
products_replicated = (
    products.crossJoin(spark.range(N).withColumnRenamed("id", "salt"))
    .withColumn("salted_key", concat(col("product_id"), lit("_"), col("salt").cast("string")))
    .drop("product_id", "salt")
)
result = orders.join(products_replicated, "salted_key")
- Broadcast join: if one side of the join is small (up to ~200 MB), force Spark to broadcast it to every executor with the broadcast() hint, which bypasses the default 10 MB auto-broadcast threshold. No shuffle is needed on the large side.
- Isolate the NULL key: filter WHERE key IS NOT NULL, process the non-null rows normally, then handle the nulls as a special case.
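The effect of salting on reducer load can be checked without a cluster. This pure-Python sketch uses assumed numbers: 1,000 rows, 900 of them for product 'ABC', and 20 reducers. `reducer_loads` is a hypothetical helper that hashes `key_salt` strings the way a shuffle would.

```python
import random
import zlib
from collections import Counter

def reducer_loads(keys: list[str], num_reducers: int, salt_buckets: int = 1, seed: int = 0) -> Counter:
    """Rows landing on each reducer when every key gets a random salt suffix before hashing."""
    rng = random.Random(seed)
    loads: Counter = Counter()
    for key in keys:
        salted = f"{key}_{rng.randrange(salt_buckets)}"  # salt_buckets=1 means no salting
        loads[zlib.crc32(salted.encode()) % num_reducers] += 1
    return loads

# Hypothetical skew: product 'ABC' is 90% of 1,000 order rows.
keys = ["ABC"] * 900 + [f"P{i}" for i in range(100)]
unsalted = reducer_loads(keys, num_reducers=20)
salted = reducer_loads(keys, num_reducers=20, salt_buckets=20)
print(max(unsalted.values()), max(salted.values()))
```

This only models how rows are distributed. In the PySpark example, the join stays correct because the small side is replicated once per salt value.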
Storage targets: lake, warehouse, and lakehouse
Data lake
A data lake is object storage (S3, GCS, Azure Blob) organized by logical paths and partitioned by date:
s3://company-data-lake/
raw/
events/dt=2026-06-01/hour=00/part-00000.snappy.parquet
events/dt=2026-06-01/hour=01/part-00001.snappy.parquet
staging/
events_clean/dt=2026-06-01/...
Raw files are never modified; corrections land in a new partition. No schema is enforced at write time; the reader infers it at query time. And the cost is hard to beat: object storage runs ~$0.023/GB/month (S3 Standard), several times cheaper per GB than SSD-backed database storage, before even counting the replicas and compute a database also needs.
The lake's weakness: without a table format, there is no ACID — two writers can corrupt each other; listing files is expensive at scale (the "small-files problem"); and there is no efficient row-level delete.
Data warehouse
A data warehouse (BigQuery, Snowflake, Amazon Redshift) is a managed, columnar, schema-enforced analytics store. All data is validated against a schema at load time. Each column is stored together, so analytical queries that touch 5 of 100 columns read only 5% of the data. A 10 TB table scan runs in seconds across thousands of cores, and the whole thing is fully transactional — a failed load leaves the table unchanged.
The warehouse's weakness: it is a closed system. Data lives inside the vendor's storage format. You pay for both storage and compute through the vendor, and you cannot use Spark directly against the data without exporting it first (unless using connectors).
Lakehouse
The lakehouse (Apache Iceberg, Delta Lake, Apache Hudi) adds a metadata layer on top of object storage that gives warehouse-like guarantees without vendor lock-in. Metadata files (manifests in Iceberg, the transaction log in Delta) record which data files belong to each snapshot. A commit atomically swaps the table's pointer to the new snapshot, so a partial write is never visible to readers. Schema evolution is tracked in the table's history. Time travel is built in: SELECT * FROM orders VERSION AS OF <snapshot_id> (Spark+Iceberg) or FOR VERSION AS OF (Trino/Presto); syntax varies by engine. And the query engine uses per-file min/max column statistics to skip files that can't contain matching rows.
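A toy model of the commit protocol shows why readers never see partial writes. The sketch below is not Iceberg's or Delta's API; `Catalog`, `Snapshot`, and `append_files` are hypothetical names. Writers prepare new data files first, then attempt a compare-and-swap on the table's current-snapshot pointer. A writer whose base snapshot is stale gets a conflict and retries against the latest state. Real table formats also check whether the concurrent commit actually conflicts before retrying; this sketch simply rebases appends.

```python
import threading
from dataclasses import dataclass

@dataclass(frozen=True)
class Snapshot:
    snapshot_id: int
    data_files: frozenset[str]

class Catalog:
    """Toy catalog holding one 'current snapshot' pointer for a single table."""
    def __init__(self) -> None:
        self._lock = threading.Lock()
        self.current = Snapshot(0, frozenset())
        self.history = [self.current]  # old snapshots stay readable: time travel

    def commit(self, expected: Snapshot, new: Snapshot) -> bool:
        """Compare-and-swap: succeed only if nobody committed since `expected` was read."""
        with self._lock:
            if self.current is not expected:
                return False
            self.current = new
            self.history.append(new)
            return True

def append_files(catalog: Catalog, files: set[str], retries: int = 3) -> Snapshot:
    """Add already-written data files to the table in one atomic commit."""
    for _ in range(retries):
        base = catalog.current
        new = Snapshot(base.snapshot_id + 1, base.data_files | files)
        if catalog.commit(base, new):  # until this swap succeeds, readers can't see `files`
            return new
    raise RuntimeError("commit conflict: retries exhausted")

catalog = Catalog()
append_files(catalog, {"f1.parquet"})  # e.g. a streaming micro-batch
stale = catalog.current
append_files(catalog, {"f2.parquet"})  # another writer commits in between
print(catalog.commit(stale, Snapshot(stale.snapshot_id + 1, stale.data_files | {"f3.parquet"})))  # False
print(sorted(catalog.history[1].data_files))  # ['f1.parquet']
```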
Iceberg and Delta Lake are the two most widely adopted formats. Iceberg originated at Netflix (open-sourced to Apache in 2018, graduated to Apache top-level project in 2020). Delta Lake was open-sourced by Databricks in April 2019 and is hosted by the Linux Foundation (LF AI & Data) — not the Apache Software Foundation. Both have broad ecosystem support (Spark, Flink, Trino, Hive, BigQuery/BigLake).
Storage comparison
| Property | Data Lake (raw) | Data Warehouse | Lakehouse (Iceberg/Delta) |
|---|---|---|---|
| ACID | No | Yes | Yes |
| Schema enforcement | No (schema-on-read) | Yes (schema-on-write) | Yes (with evolution support) |
| Time travel | No (manually version files) | Vendor-specific | Yes (built-in) |
| Cost per TB | ~$23/TB/month | Vendor storage pricing (typically in the same range as object storage) + compute | ~$23/TB/month storage + open compute |
| Compute lock-in | None (open formats) | High (vendor SQL engine) | None (Spark, Flink, Trino, etc.) |
| Row-level deletes | Painful (rewrite partition) | Native | Yes (merge-on-read or copy-on-write) |
| Best for | Raw archive; replayability | Interactive analyst SQL | Everything — the modern default |
Partitioning and partition pruning
Partitioning is the single most important query-performance lever in a data pipeline. Partition by dt (date) for any table that analysts will filter by time — which is almost all of them.
s3://lake/fact_orders/
dt=2026-05-01/... # 8 GB
dt=2026-05-02/... # 9 GB
dt=2026-05-03/... # 8 GB
...
A query WHERE dt = '2026-05-01' reads only 8 GB; without partitioning it reads the entire table. For a table holding years of daily partitions (terabytes in total), that is the difference between a query that finishes in seconds and one that scans for an hour.
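Partition pruning amounts to filtering paths before any file is opened. This sketch assumes Hive-style `column=value` directories and a made-up listing of 4 GB files. `prune` is a hypothetical helper; real engines prune using table metadata or the catalog rather than string matching.

```python
def prune(paths: list[str], column: str, value: str) -> list[str]:
    """Keep files whose Hive-style path has an exact `column=value` directory segment."""
    segment = f"{column}={value}"
    return [p for p in paths if segment in p.split("/")]

# Hypothetical listing: 3 daily partitions x 2 files each, sizes in GB.
files = {
    f"s3://lake/fact_orders/dt=2026-05-0{d}/part-{i}.parquet": 4.0
    for d in (1, 2, 3)
    for i in (0, 1)
}
selected = prune(list(files), "dt", "2026-05-01")
print(len(selected), sum(files[p] for p in selected), "GB read instead of", sum(files.values()), "GB")
```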
For very large tables, add a second partition key — common choices are region, platform (iOS/Android/web), or a bucketed user hash for even distribution. Be careful: too many partition columns creates too many directories, which feeds directly into the small-files problem.
The small-files problem: streaming writes produce many small files (one per micro-batch per partition per Kafka partition). A table with 10,000 partitions × 100 micro-batches/hour × 24 hours = 24M files/day. Listing 24M files in S3 takes minutes and exhausts memory in the query planner. The solution is periodic compaction jobs that merge small files into larger ones (Iceberg's RewriteDataFiles action; Delta's OPTIMIZE command).
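A compaction job first has to plan which small files to merge. This sketch uses greedy bin-packing (next-fit decreasing) and assumes the 512 MB upper end of the usual target range. `plan_compaction` is hypothetical and much simpler than Iceberg's RewriteDataFiles or Delta's OPTIMIZE, which have their own planning rules.

```python
def plan_compaction(file_sizes_mb: list[float], target_mb: float = 512.0) -> list[list[int]]:
    """Group files (by index) into rewrite tasks whose combined size stays <= target_mb.

    Next-fit decreasing: sort largest first and close a group when the next file would
    overflow it. Callers should pass only files below the small-file threshold.
    """
    order = sorted(range(len(file_sizes_mb)), key=lambda i: file_sizes_mb[i], reverse=True)
    groups: list[list[int]] = []
    current: list[int] = []
    current_size = 0.0
    for i in order:
        size = file_sizes_mb[i]
        if current and current_size + size > target_mb:
            groups.append(current)
            current, current_size = [], 0.0
        current.append(i)
        current_size += size
    if current:
        groups.append(current)
    return groups

# 1,000 streaming files of 4 MB each -> 8 rewrite tasks (7 x 128 files + 1 x 104 files).
groups = plan_compaction([4.0] * 1_000)
print(len(groups), [len(g) for g in groups])
```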
Streaming ingestion and batch+stream unification
The streaming path must answer: "what happened in the last 5 minutes?" — before the batch job has run.
flowchart LR
KAFKA["Kafka<br/>2M events/s"] --> SS["Spark Structured<br/>Streaming / Flink"]
SS -->|"micro-batch every 30s"| ICE[("Iceberg table<br/>fact_events")]
BATCH["Nightly Spark<br/>batch job"] -->|"partition overwrite"| ICE
ANA["Analysts / BI tools"] --> ICE
style KAFKA fill:#ff6b1a,color:#0a0a0f
style SS fill:#ff2e88,color:#fff
style ICE fill:#0e7490,color:#fff
style BATCH fill:#15803d,color:#fff
Both the streaming and batch paths write to the same Iceberg table. Analysts query one table and get a unified view. The batch job's partition overwrite replaces the streaming-written micro-batch files for past dates once the batch run completes — this is the Lambda Architecture pattern made clean by Iceberg's atomic swap.
Late and out-of-order data
Streaming events are not always generated in order. A mobile app may buffer events for minutes while offline, then flush them all at once. The event timestamp may be 08:47 but the message arrives in Kafka at 09:12.
Process on event time (when the event happened), not processing time (when Kafka received it). Windowing operations should use event timestamps.
Watermarks are the mechanism for handling this. A watermark is a heuristic assertion that "all events with event_time < T have now arrived." Events arriving after the watermark for their window are either late-handled (updated in the Iceberg table via a merge), or dropped. Typical watermark delay is 5–30 minutes depending on the application's offline-buffering behavior.
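A watermark is easiest to see in a few lines of code. This sketch assumes a bounded-out-of-orderness watermark: the largest event time seen so far, minus a fixed delay. That is one common strategy, not the only one. `split_late` is a hypothetical helper, and the timestamps are made up.

```python
from datetime import datetime, timedelta

def split_late(events: list[tuple[datetime, str]], max_delay: timedelta):
    """Classify (event_time, payload) pairs, given in arrival order, against a watermark.

    watermark = max event_time seen so far - max_delay. An event older than the current
    watermark is late: merge it as a correction or drop it, depending on policy.
    """
    max_seen = None
    on_time, late = [], []
    for event_time, payload in events:
        if max_seen is not None and event_time < max_seen - max_delay:
            late.append((event_time, payload))
        else:
            on_time.append((event_time, payload))
        max_seen = event_time if max_seen is None else max(max_seen, event_time)
    return on_time, late

def t(hh: int, mm: int) -> datetime:
    return datetime(2026, 6, 1, hh, mm)

arrivals = [(t(9, 0), "a"), (t(9, 12), "b"), (t(8, 47), "offline-phone")]
print(len(split_late(arrivals, timedelta(minutes=30))[1]))  # 0: watermark is 08:42
print(len(split_late(arrivals, timedelta(minutes=15))[1]))  # 1: watermark is 08:57
```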
Schema evolution and data quality
Schema registry
With many producers writing to Kafka, schema drift is a constant threat. A producer team adds a new field — or renames one — and downstream consumers break silently or crash.
A schema registry (Confluent Schema Registry is the canonical implementation) stores the schema for each Kafka topic and enforces compatibility rules when a new schema version is registered. Backward compatibility means the new schema can read data written with the old schema. Adding optional fields with defaults and deleting fields are allowed; adding a required field without a default, or making an incompatible type change, is not. Forward compatibility means the old schema can read data written with the new schema. "Full compatible" means both directions.
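A simplified compatibility check shows why the direction matters. In Avro-style resolution, a reader skips fields it doesn't know and fills missing fields from defaults. So under Confluent's BACKWARD rule, deleting a field or adding one with a default is allowed, while adding a field without a default is not. The sketch encodes only those rules, with no type promotions, unions, or nested records. `can_read`, `backward_compatible`, and `forward_compatible` are hypothetical helpers, not the registry's API.

```python
# Simplified schema model: field name -> {"type": ..., optional "default": ...}.
Schema = dict[str, dict]

def can_read(reader: Schema, writer: Schema) -> bool:
    """Can a consumer using `reader` decode records written with `writer`? (simplified)"""
    for name, spec in reader.items():
        if name not in writer:
            if "default" not in spec:
                return False  # reader expects a field the data lacks, with no fallback
        elif spec["type"] != writer[name]["type"]:
            return False      # treat any type change as breaking (ignores Avro promotions)
    return True               # fields only the writer has are skipped by the reader

def backward_compatible(old: Schema, new: Schema) -> bool:
    return can_read(reader=new, writer=old)  # consumers can upgrade first

def forward_compatible(old: Schema, new: Schema) -> bool:
    return can_read(reader=old, writer=new)  # producers can upgrade first

v1 = {"user_id": {"type": "string"}, "amount": {"type": "double"}}
v2 = {**v1, "country": {"type": "string", "default": None}}            # add optional field
v3 = {"user_id": {"type": "string"}}                                    # delete "amount"
v4 = {"user_id": {"type": "string"}, "amount_usd": {"type": "double"}}  # rename, no default
print(backward_compatible(v1, v2), forward_compatible(v1, v2))  # True True
print(backward_compatible(v1, v3), forward_compatible(v1, v3))  # True False
print(backward_compatible(v1, v4))                              # False
```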
At ingest, the schema of every batch is validated against the registered schema. Incompatible batches are rejected to a dead-letter queue, not silently corrupted in the lake.
flowchart LR
PROD["Producer<br/>(new schema v2)"] -->|"serialize with schema id"| REG[("Schema Registry<br/>enforces compatibility")]
REG -->|"compatible: allow"| KAFKA["Kafka topic"]
REG -->|"breaking change: reject"| DLQ["Dead-letter queue"]
KAFKA --> CONS["Consumer<br/>(reads schema id → deserialize)"]
style REG fill:#ffaa00,color:#0a0a0f
style DLQ fill:#ff2e88,color:#fff
style KAFKA fill:#15803d,color:#fff
Data quality checks
Before a Spark job publishes its output partition, run assertions:
# Example quality checks (dbt-test style, runnable in PySpark)
from pyspark.sql import functions as F
assert output.count() > 0, "Empty output: job likely failed"
assert output.filter(F.col("user_id").isNull()).count() == 0, "Unexpected nulls in user_id"
# expected_revenue comes from elsewhere, e.g. a trailing average of past partitions
revenue = output.agg(F.sum("revenue")).first()[0]
assert abs(revenue - expected_revenue) <= 0.05 * expected_revenue, "Revenue > 5% off"
If any assertion fails, the task fails and the orchestrator does not mark the partition as complete. The stale (previous) partition remains visible to downstream consumers, which is usually safer than serving a known-bad partition.
Failure modes
| Failure | Symptom | Mitigation |
|---|---|---|
| Shuffle skew / stragglers | One Spark stage hangs; all other tasks finish in minutes, one task runs for hours | Salting; broadcast joins; isolate the skewed key |
| Out-of-order / late data | Yesterday's partition has lower numbers than expected; corrects itself the next day | Event-time watermarks; allow late arrivals with configurable delay; reprocess partition after watermark |
| Expensive backfill | Reprocessing 1 year of history costs roughly 365× a normal day's compute | Keep raw data on cheap object storage; design jobs for partition-level idempotency; test with a sample partition first |
| Schema drift (poison schema) | A producer renames a field; all downstream jobs fail silently or produce nulls | Schema registry with compatibility enforcement; dead-letter queue for malformed records |
| Poison records | A single malformed event causes a Spark task to throw an unhandled exception | Parse defensively; route bad records to a quarantine table; do not let one bad record fail an entire partition |
| Small-files problem | Streaming writes produce millions of tiny files; S3 listing times explode; query planner OOMs | Periodic compaction via Iceberg RewriteDataFiles or Delta OPTIMIZE; target 128–512 MB per file |
| Task not idempotent | Double-run during orchestrator recovery inserts duplicate rows | Partition overwrite semantics everywhere; no INSERT INTO without a preceding DELETE WHERE dt = ? |
| Source DB overload | JDBC full-scan query runs on OLTP primary and causes production latency spike | Always read from a replica; use CDC for large tables; schedule extracts during off-peak hours |
Lineage and observability
Every analytics table should answer: "where did this data come from, and what transformed it?"
- Column-level lineage: tools like OpenLineage (open standard), Marquez, and dbt's lineage graph trace which source columns contributed to each output column. Essential for compliance (GDPR: "show me all tables that contain user PII").
- Job-level lineage: the orchestrator knows which task produced which output partition. Every Airflow task should emit an OpenLineage event on start, success, and failure.
- Data catalog: a searchable inventory of all tables, schemas, owners, and freshness SLAs (Apache Atlas, DataHub, Amundsen).
- Anomaly detection: track row counts, null rates, and value distributions per partition over time. Alert when today's partition deviates more than 2σ from the 30-day rolling average. This catches silent data quality failures before analysts discover them.
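The 2σ rule above fits in a few lines with Python's statistics module. This sketch assumes one metric value per daily partition (row count, null rate, and so on) and uses the sample standard deviation. `is_anomalous` is a hypothetical helper, and real pipelines may also need to account for weekly seasonality.

```python
from statistics import mean, stdev

def is_anomalous(history: list[float], today: float,
                 window: int = 30, threshold_sigma: float = 2.0) -> bool:
    """Flag today's metric if it is more than threshold_sigma sample standard
    deviations away from the mean of the trailing window."""
    recent = history[-window:]
    if len(recent) < 2:
        return False        # stdev needs at least two points; not enough history yet
    mu, sigma = mean(recent), stdev(recent)
    if sigma == 0:
        return today != mu  # perfectly flat history: any change is a deviation
    return abs(today - mu) > threshold_sigma * sigma

row_counts = [1_000_000.0 + (i % 7) * 5_000 for i in range(30)]  # hypothetical daily counts
print(is_anomalous(row_counts, 1_012_000.0), is_anomalous(row_counts, 400_000.0))
```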
Storage choices
| Data | Store | Why |
|---|---|---|
| Raw events (immutable) | Object storage (S3/GCS) — Parquet/ORC | Cheap, durable, replay-safe; see object storage |
| Refined / modeled tables | Iceberg / Delta on object storage | ACID + time travel + open compute; best of lake + warehouse |
| Warehouse layer | BigQuery / Snowflake / Redshift | Managed SQL, fast interactive queries, BI tool integrations |
| Kafka topic schemas | Confluent Schema Registry | Compatibility enforcement at produce time |
| Job metadata / DAG state | Postgres (orchestrator's metastore) | Transactional; not in the hot path |
| Data catalog | DataHub / Amundsen | Graph of assets, owners, lineage |
For the columnar storage format inside the lake, Parquet is the industry default: efficient column-level compression (dictionary, RLE, delta encoding, ZSTD/Snappy), predicate pushdown via row group statistics, and broad ecosystem support. See LSM trees vs B-trees for background on why columnar formats dramatically outperform row formats for analytical scans.
Things to discuss in an interview
- ETL vs ELT: explain why ELT dominates now (cheap warehouse compute, reprocessing from raw).
- Idempotency: every batch job uses partition-overwrite semantics; re-running any date is safe.
- Backfill strategy: orchestrator runs historical date ranges in parallel; idempotency makes it safe.
- The shuffle: where it's expensive, how to minimize it (predicate pushdown, broadcast joins), and how to handle skew (salting).
- Lake vs warehouse vs lakehouse: when you'd choose each; why open table formats changed the calculus.
- Streaming + batch unification: both paths write to the same Iceberg table; batch overwrites streaming micro-batches for past dates.
- Schema evolution: schema registry, compatibility rules, dead-letter queues for bad records.
- Small-files: compaction jobs are a first-class operational concern for streaming workloads.
Things you should now be able to answer
- Why is ELT now preferred over ETL for most large-scale pipelines?
- What does "idempotent task" mean in the context of a batch data job, and how do you achieve it with partition overwrite?
- What is a shuffle in Spark, why is it expensive, and what are two ways to reduce its cost?
- What is data skew and how do you fix it with salting?
- What is the difference between a data lake, a data warehouse, and a lakehouse? When would you use each?
- What is the small-files problem and how does it arise in a streaming pipeline?
- How do event time and processing time differ, and why does it matter for out-of-order data?
- Why does a schema registry prevent silent data corruption?
Further reading
- "Lakehouse: A New Generation of Open Platforms that Unify Data Warehousing and Advanced Analytics" — Armbrust, Ghodsi, Xin, Zaharia, CIDR 2021
- Apache Iceberg specification — iceberg.apache.org
- "Spark: The Definitive Guide" — Chambers & Zaharia, O'Reilly
- "Fundamentals of Data Engineering" — Reis & Housley, O'Reilly
- Apache Airflow documentation — airflow.apache.org
- OpenLineage specification — openlineage.io
- Design a Distributed Message Queue
- Design Object Storage
- LSM Trees vs B-Trees
- Design a Distributed Job Scheduler
Frequently asked questions
What is the difference between ETL and ELT, and why does ELT dominate modern large-scale pipelines?
ETL transforms data before loading it into the warehouse, often discarding raw copies in the process. ELT loads raw data first into an immutable data lake, then transforms it inside the warehouse or Spark — making reprocessing safe and cheap because the raw source is always preserved. ELT won because warehouse compute (BigQuery, Snowflake) now auto-scales and separates compute from storage, making a CREATE TABLE AS SELECT over 10 TB fast and inexpensive.
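Below is a minimal ELT sketch that uses Python's built-in `sqlite3` as a stand-in for the warehouse. That substitution is an assumption made for illustration, and the table names are hypothetical. Raw rows land untouched, duplicates included. The transform is a `CREATE TABLE AS SELECT`, which can be rerun or rewritten at any time because the raw table is never mutated.

```python
import sqlite3


def load_raw(conn, rows):
    """Extract + Load: land source rows exactly as delivered, duplicates and all."""
    conn.execute("CREATE TABLE IF NOT EXISTS raw_orders "
                 "(order_id INTEGER, region TEXT, amount_cents INTEGER, status TEXT)")
    conn.executemany("INSERT INTO raw_orders VALUES (?, ?, ?, ?)", rows)


def transform(conn):
    """Transform inside the database with CREATE TABLE AS SELECT.

    raw_orders is never modified, so this can be rerun (or rewritten with new
    business logic) at any time and rebuilds the derived table from scratch.
    """
    conn.execute("DROP TABLE IF EXISTS revenue_by_region")
    conn.execute("""
        CREATE TABLE revenue_by_region AS
        SELECT region, SUM(amount_cents) AS revenue_cents
        FROM (SELECT DISTINCT order_id, region, amount_cents
              FROM raw_orders WHERE status = 'completed')
        GROUP BY region
    """)


conn = sqlite3.connect(":memory:")
load_raw(conn, [
    (1, "NA", 1250, "completed"),
    (1, "NA", 1250, "completed"),  # duplicate delivery from the source
    (2, "EU", 800, "completed"),
    (3, "NA", 4000, "cancelled"),
])
transform(conn)
transform(conn)  # rerunning is harmless
print(conn.execute("SELECT * FROM revenue_by_region ORDER BY region").fetchall())
# [('EU', 800), ('NA', 1250)]
```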
What is a shuffle in Spark and why is it expensive?
A shuffle is an all-to-all network transfer that happens whenever Spark needs to co-locate all rows sharing the same key on one executor, which is required for GROUP BY, JOIN, and reduceByKey operations. It is expensive for three compounding reasons. First, every executor must serialize, transfer, and deserialize potentially all of its data. Second, the next stage cannot start until every task in the current stage finishes, so a single slow straggler holds up the whole job. Third, map-side shuffle output is written to local disk, and sort or aggregation buffers spill to disk again whenever they do not fit in memory.
What is the small-files problem in a streaming data pipeline and how do you fix it?
Streaming writes produce one small file per micro-batch for each combination of table partition and Kafka partition. At 10,000 such combinations, 100 micro-batches per hour, and 24 hours, that is 24 million files per day. Listing that many objects in S3 takes minutes, and the query planner can run out of memory tracking them. The fix is periodic compaction jobs that merge small files into larger targets of 128–512 MB, using Iceberg's RewriteDataFiles action or Delta's OPTIMIZE command.
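The arithmetic above and a simple compaction plan can be sketched in Python. `files_per_day` just reproduces the back-of-envelope estimate. `plan_compaction` is a hypothetical first-fit-decreasing bin packer that groups small files into rewrite tasks no larger than a target size. It is similar in spirit to the bin-pack strategy that Iceberg's RewriteDataFiles uses by default, but it ignores partition boundaries, delete files, and sort order, all of which real compaction must handle.

```python
MB = 1024 * 1024


def files_per_day(partition_combinations, microbatches_per_hour, hours=24):
    """Back-of-envelope file count: one file per combination per micro-batch."""
    return partition_combinations * microbatches_per_hour * hours


def plan_compaction(file_sizes, target_bytes=512 * MB):
    """Group small files into rewrite tasks of at most target_bytes each.

    First-fit decreasing: place each file (largest first) into the first
    group with room, opening a new group when none fits. A file larger than
    the target simply gets a group of its own.
    """
    groups = []  # each entry is [total_bytes, [file sizes]]
    for size in sorted(file_sizes, reverse=True):
        for group in groups:
            if group[0] + size <= target_bytes:
                group[0] += size
                group[1].append(size)
                break
        else:
            groups.append([size, [size]])
    return groups


print(files_per_day(10_000, 100))  # 24000000

# 2,000 one-megabyte files from a single partition collapse into 4 rewrite groups
plan = plan_compaction([1 * MB] * 2_000)
print(len(plan), [len(files) for _, files in plan])  # 4 [512, 512, 512, 464]
```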
When should you use a lakehouse (Iceberg or Delta Lake) instead of a plain data lake or a managed data warehouse?
A plain data lake on object storage has no ACID guarantees: two concurrent writers can corrupt each other, and row-level deletes require rewriting entire partitions. A managed warehouse like BigQuery or Snowflake is fully transactional, but it typically keeps data in a proprietary internal format. This creates vendor lock-in and means engines such as Spark reach the data through connectors rather than reading the files directly. A lakehouse adds a metadata layer on top of object storage that provides ACID commits, schema evolution, and time travel while keeping compute open to Spark, Flink, and Trino. That combination makes it the modern default for refined and modeled tables.
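The metadata layer relies on an atomic pointer swap with optimistic concurrency. Each writer builds a new immutable snapshot from the one it read, and the commit succeeds only if the table has not moved in the meantime. The toy Python model below illustrates this. `ToyCatalog`, `Snapshot`, and `append_files` are hypothetical names, not the Iceberg or Delta API. The model shows why concurrent writers cannot silently overwrite each other and how retained snapshots enable time travel.

```python
import threading
from dataclasses import dataclass


@dataclass(frozen=True)
class Snapshot:
    snapshot_id: int
    data_files: frozenset


class CommitConflict(Exception):
    """Raised when another writer committed since this writer read the table."""


class ToyCatalog:
    """Stores the pointer to the current snapshot and swaps it atomically."""

    def __init__(self):
        self._lock = threading.Lock()
        self._history = [Snapshot(0, frozenset())]  # old snapshots stay readable

    def current(self):
        with self._lock:
            return self._history[-1]

    def compare_and_swap(self, expected, new):
        with self._lock:
            if self._history[-1] is not expected:
                raise CommitConflict(f"table moved past snapshot {expected.snapshot_id}")
            self._history.append(new)

    def as_of(self, snapshot_id):
        """Time travel: read the table as it was at an earlier snapshot."""
        with self._lock:
            return next(s for s in self._history if s.snapshot_id == snapshot_id)


def append_files(catalog, new_files, max_retries=3):
    """Optimistic commit: build on the latest snapshot, retry if someone else won."""
    for _ in range(max_retries):
        base = catalog.current()
        candidate = Snapshot(base.snapshot_id + 1, base.data_files | frozenset(new_files))
        try:
            catalog.compare_and_swap(base, candidate)
            return candidate
        except CommitConflict:
            continue  # re-read the latest snapshot and re-apply our change
    raise CommitConflict(f"gave up after {max_retries} attempts")


catalog = ToyCatalog()
stale = catalog.current()             # writer A reads snapshot 0
append_files(catalog, {"b.parquet"})  # writer B commits snapshot 1
try:                                  # A's commit built on snapshot 0 is rejected
    catalog.compare_and_swap(stale, Snapshot(1, frozenset({"a.parquet"})))
except CommitConflict as err:
    print("conflict:", err)           # conflict: table moved past snapshot 0
append_files(catalog, {"a.parquet"})  # A retries on top of snapshot 1
print(sorted(catalog.current().data_files))  # ['a.parquet', 'b.parquet']
print(sorted(catalog.as_of(1).data_files))   # ['b.parquet']
```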
How do you make a batch Spark job idempotent so it can be safely re-run or backfilled?
Use dynamic partition overwrite: set spark.sql.sources.partitionOverwriteMode to dynamic and write with SaveMode.Overwrite, so Spark replaces only the partitions present in the current DataFrame and leaves all others untouched. As long as the job's inputs and logic are deterministic, re-running it for any historical date leaves the table in the same state as a single run, with no double-counted rows. This is the same guarantee that makes orchestrator-driven backfills safe to run in parallel across a full year of history.
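The difference between append, a full (static) overwrite, and dynamic partition overwrite can be modeled with a dict keyed by partition date. This is a toy Python sketch, not Spark, and the `write` function and its mode names are hypothetical. In Spark, the dynamic behavior corresponds to `spark.sql.sources.partitionOverwriteMode=dynamic` combined with an overwrite-mode write partitioned by the date column.

```python
from collections import defaultdict


def write(table, rows, mode):
    """Toy write into a table partitioned by the `ds` (date) column.

    table maps ds -> list of rows. Modes (names are hypothetical):
      append            add rows; rerunning duplicates data
      static_overwrite  unscoped overwrite; drops every existing partition
      dynamic_overwrite replace only the partitions present in `rows`
    """
    incoming = defaultdict(list)
    for row in rows:
        incoming[row["ds"]].append(row)
    if mode == "append":
        for ds, part in incoming.items():
            table.setdefault(ds, []).extend(part)
    elif mode == "static_overwrite":
        table.clear()
        table.update(incoming)
    elif mode == "dynamic_overwrite":
        table.update(incoming)
    else:
        raise ValueError(f"unknown mode: {mode}")
    return table


def job_output(ds):
    """Hypothetical deterministic daily job: same input date, same rows."""
    return [{"ds": ds, "order_id": i} for i in range(3)]


def sizes(table):
    return {ds: len(rows) for ds, rows in table.items()}


dynamic = {}
for ds in ["2024-01-01", "2024-01-02", "2024-01-02"]:  # day 2 is retried
    write(dynamic, job_output(ds), "dynamic_overwrite")
print(sizes(dynamic))  # {'2024-01-01': 3, '2024-01-02': 3}

appended = {}
for ds in ["2024-01-01", "2024-01-02", "2024-01-02"]:
    write(appended, job_output(ds), "append")
print(sizes(appended))  # {'2024-01-01': 3, '2024-01-02': 6}

static = {}
write(static, job_output("2024-01-01"), "static_overwrite")
write(static, job_output("2024-01-02"), "static_overwrite")
print(sizes(static))  # {'2024-01-02': 3}
```

Only the dynamic mode is both idempotent and safe for parallel backfills. Append double-counts on retry, and an unscoped overwrite wipes every date except the one just written.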