Spark Catalyst and AQE on the data engineering interview

Train for your next tech interview
1,500+ real interview questions across engineering, product, design, and data — with worked solutions.
Join the waitlist

Why this comes up on the DE loop

Catalyst is the brain of Spark SQL, and Adaptive Query Execution (AQE) is the headline improvement of the Spark 3.x line. If you are interviewing for a Data Engineering role at Databricks, Netflix, Stripe, or any shop that runs petabyte-scale jobs, expect the loop to drill on how Catalyst optimizes joins, what AQE actually does at runtime, and what skew join optimization is. Senior loops push further: physical plans, broadcast thresholds, codegen, and the runtime knobs you would tune.

The pain you want to avoid in the room is the classic story: a DE turned off AQE because of a stale rumor from the 3.0 era, lost a 5x speedup on a skewed join, and spent two weeks blaming Spark instead of the config. Knowing the four Catalyst phases, the AQE feature set, and the explain output keeps you out of that trap.

This is why the explain-plan question pairs so well with Catalyst: the plan is where the optimizer's decisions become visible.

Catalyst: the four phases

Catalyst is an extensible query optimizer. The input is SQL or the DataFrame DSL; the output is JVM bytecode running over RDDs. There are four phases, and df.explain(mode='extended') will dump all of them.

Phase | Input | Output | What happens
--- | --- | --- | ---
Parse | SQL string or DSL call | Unresolved Logical Plan | Syntax tree built; identifiers not yet bound
Analyze | Unresolved Logical Plan | Resolved Logical Plan | Catalog lookup binds names to columns and types
Optimize | Resolved Logical Plan | Optimized Logical Plan | Rule-based rewrites: pushdown, pruning, folding
Plan | Optimized Logical Plan | Physical Plan | Pick concrete operators (SortMergeJoin vs BroadcastHashJoin)

After the physical plan, Spark generates bytecode through whole-stage codegen, which is technically a fifth step but usually folded into "planning" in interview answers.

SQL or DSL
   |
Unresolved Logical Plan      (after parse)
   |  analyze
Resolved Logical Plan        (catalog resolved)
   |  optimize
Optimized Logical Plan       (rule-based rewrites)
   |  plan
Physical Plan                (operator choice)
   |  codegen
RDD bytecode on the JVM

Load-bearing detail: the analyzer fails fast on bad column names with an AnalysisException, but the optimizer is silent. A missing predicate pushdown can quietly cost you a 10x slowdown without a single error message.

Logical optimization rules

The optimizer runs batches of rule-based rewrites, repeating each batch until the plan stops changing (a fixed point) or an iteration limit is reached. The big six worth naming in an interview:

Predicate pushdown. Filters get pushed as close to the source as possible. For Parquet this becomes row-group statistics filtering, so you read fewer bytes from disk.

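from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
df = spark.read.parquet("/data/orders")  # hypothetical path
# Logical plan: scan all rows, then filter
# Optimized plan: filter pushed into the scan, so only row groups whose max(amount) > 100 are read
df.filter("amount > 100").select("user_id", "amount").explain()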
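To make the row-group idea concrete, here is a minimal pure-Python sketch of min/max statistics skipping. The RowGroup class, make_row_group helper, and sample rows are hypothetical stand-ins, not Parquet's reader API: a row group is opened only if its stored max(amount) could satisfy amount > 100.

```python
from dataclasses import dataclass


@dataclass
class RowGroup:
    """Hypothetical stand-in for a Parquet row group."""
    rows: list
    max_amount: int  # footer statistic, written once when the file is created


def make_row_group(rows):
    # Statistics are computed at write time, so readers can use them without touching the rows.
    return RowGroup(rows=rows, max_amount=max(r["amount"] for r in rows))


def scan_with_pushdown(groups, threshold):
    """Apply `amount > threshold`, skipping row groups whose statistics rule out any match."""
    matches, groups_read = [], 0
    for group in groups:
        if group.max_amount <= threshold:
            continue  # no row in this group can pass the filter: skip the I/O entirely
        groups_read += 1
        # Surviving groups still need the per-row filter.
        matches.extend(r for r in group.rows if r["amount"] > threshold)
    return matches, groups_read


groups = [
    make_row_group([{"user_id": 1, "amount": 20}, {"user_id": 2, "amount": 80}]),
    make_row_group([{"user_id": 3, "amount": 150}, {"user_id": 4, "amount": 90}]),
    make_row_group([{"user_id": 5, "amount": 5}, {"user_id": 6, "amount": 60}]),
]
rows, groups_read = scan_with_pushdown(groups, threshold=100)
```

Only the second row group is read; the other two are skipped on statistics alone, which is where the I/O savings come from.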

Projection pruning. Columns that nothing downstream needs are never read. With Parquet this is column pruning at the file level: in a 50-column table where you select two columns of similar width, the scan reads roughly 1/25 of the bytes a naive full scan would.
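The 25x figure is just column arithmetic. A small sketch, assuming a hypothetical columnar table where every column holds fixed-width 8-byte values, shows a pruned scan reading 2/50 of the bytes:

```python
BYTES_PER_VALUE = 8  # assumption: fixed-width 8-byte values in every column


def make_table(num_columns, num_rows):
    """Hypothetical columnar table: column name -> list of values."""
    return {f"c{i}": list(range(num_rows)) for i in range(num_columns)}


def scan(table, columns=None):
    """Read only the requested columns; None means a naive full scan."""
    wanted = list(table) if columns is None else columns
    data = {name: table[name] for name in wanted}
    bytes_read = sum(len(values) * BYTES_PER_VALUE for values in data.values())
    return data, bytes_read


table = make_table(num_columns=50, num_rows=1_000)
_, full_bytes = scan(table)
_, pruned_bytes = scan(table, ["c0", "c1"])
```

With columns of unequal width the ratio changes, so treat 25x as the equal-width case.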

Constant folding. Expressions like 1 + 2 or concat('a','b') are evaluated at plan time, not per row.
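Constant folding is a bottom-up tree rewrite. The sketch below mirrors the idea behind Catalyst's ConstantFolding rule on a toy expression tree; the Lit, Col, Add, and Concat classes are hypothetical and far simpler than Catalyst's expression types.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Lit:
    value: object


@dataclass(frozen=True)
class Col:
    name: str


@dataclass(frozen=True)
class Add:
    left: object
    right: object


@dataclass(frozen=True)
class Concat:
    left: object
    right: object


def fold(expr):
    """Bottom-up rewrite: evaluate any operator whose inputs are all literals."""
    if isinstance(expr, (Lit, Col)):
        return expr  # leaves are already as simple as they get
    left, right = fold(expr.left), fold(expr.right)
    if isinstance(left, Lit) and isinstance(right, Lit):
        if isinstance(expr, Add):
            return Lit(left.value + right.value)
        return Lit(str(left.value) + str(right.value))  # Concat
    # At least one input depends on a column, so keep the operator with simplified children.
    return type(expr)(left, right)
```

For example, fold(Add(Col("x"), Add(Lit(1), Lit(2)))) becomes Add(Col("x"), Lit(3)): the constant part is computed once at plan time, and only the column-dependent addition remains per row.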

Join reordering. When the Cost-Based Optimizer (CBO) is on (spark.sql.cbo.enabled together with spark.sql.cbo.joinReorder.enabled) and statistics exist, Catalyst reorders joins to minimize intermediate row counts. Without stats, it largely keeps the order you wrote.
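A minimal sketch of cost-based reordering, assuming hypothetical row counts and join selectivities and treating predicates as independent. It brute-forces every left-deep order for three tables and scores each by the sum of intermediate row counts. Spark's CBO uses a dynamic-programming search rather than brute force, but the objective is the same idea.

```python
from fractions import Fraction
from itertools import permutations

# Hypothetical statistics, like those ANALYZE TABLE would collect.
ROWS = {"orders": 1_000_000, "users": 50_000, "countries": 200}
# Selectivity of the join predicate between each pair; 1 means no predicate (cross product).
SELECTIVITY = {
    frozenset({"orders", "users"}): Fraction(1, 50_000),
    frozenset({"users", "countries"}): Fraction(1, 200),
    frozenset({"orders", "countries"}): Fraction(1),
}


def plan_cost(order):
    """Sum of estimated intermediate row counts for a left-deep join order.

    Assumes predicates are independent, so their selectivities simply multiply.
    """
    joined = [order[0]]
    current = Fraction(ROWS[order[0]])
    cost = Fraction(0)
    for table in order[1:]:
        selectivity = Fraction(1)
        for previous in joined:
            selectivity *= SELECTIVITY[frozenset({previous, table})]
        current = current * ROWS[table] * selectivity
        cost += current
        joined.append(table)
    return cost


written_order = ("orders", "countries", "users")  # a poor order someone might write by hand
best_order = min(permutations(ROWS), key=plan_cost)
```

The hand-written order starts with a 200-million-row cross product. The best order joins the two small tables first and touches orders last, which is the kind of rewrite you lose when statistics are missing.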

Predicate combination and simplification. a = 1 AND a = 2 can never be true, so the filter collapses to false and that branch of the plan gets pruned. Tautologies such as x = x simplify to true when x is non-nullable.
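Here is a sketch of the simplification logic for a conjunction of equality predicates. The tuple encoding of predicates is a hypothetical representation chosen for brevity, and it does not reproduce Catalyst's actual rules. Note the nullability check: x = x is not a tautology when x can be NULL, because NULL = NULL is not true.

```python
def simplify_conjunction(predicates, non_nullable=frozenset()):
    """Simplify an AND of predicates.

    Each predicate is ("=", column, literal) or ("=col", column, column).
    Returns False if the conjunction can never be true; otherwise returns the
    remaining predicates with duplicates and tautologies dropped.
    """
    bound = {}  # column -> literal it must equal
    kept = []
    for op, left, right in predicates:
        if op == "=col" and left == right and left in non_nullable:
            continue  # x = x is always true when x can never be NULL
        if op == "=":
            if left in bound:
                if bound[left] != right:
                    return False  # a = 1 AND a = 2: no row can match
                continue  # exact duplicate of an earlier predicate
            bound[left] = right
        kept.append((op, left, right))
    return kept
```

A False result means the filter is statically empty, so the optimizer can replace the whole subtree with an empty relation instead of scanning anything.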

Subquery decorrelation. Simple correlated subqueries get rewritten as joins, which is almost always faster.
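The speedup comes from evaluating the subquery once instead of once per outer row. The sketch below compares a naive evaluation of WHERE EXISTS (SELECT 1 FROM users u WHERE u.id = o.user_id AND u.active) with its decorrelated left-semi-join form. The orders and users data are hypothetical.

```python
def correlated_exists(orders, users):
    # Naive evaluation: re-run the subquery for every outer row, O(len(orders) * len(users)).
    return [o for o in orders
            if any(u["id"] == o["user_id"] and u["active"] for u in users)]


def semi_join(orders, users):
    # Decorrelated form: evaluate the subquery once into a hash set, then probe it.
    # Each outer row is emitted at most once, matching EXISTS semantics.
    active_ids = {u["id"] for u in users if u["active"]}
    return [o for o in orders if o["user_id"] in active_ids]


users = [{"id": 1, "active": True}, {"id": 2, "active": False}, {"id": 3, "active": True}]
orders = [{"order_id": 10, "user_id": 1}, {"order_id": 11, "user_id": 2},
          {"order_id": 12, "user_id": 3}, {"order_id": 13, "user_id": 4}]
```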

All of these run automatically. Your job as a DE is to write queries that let the optimizer help — for instance, filtering on a partition column before joining, not after.

Physical planning and joins

The logical plan does not pick algorithms. Physical planning picks the operator implementations. The join story is the one interviewers care about most.

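Operator | When chosen | Cost
--- | --- | ---
BroadcastHashJoin | Equi-join where one side's estimated size is at or under spark.sql.autoBroadcastJoinThreshold (default 10 MB) | Cheap; the large side is never shuffled, the small side is copied to every executor
SortMergeJoin | Both sides large, equi-join on the join keys | Two shuffles + sort; the workhorse
ShuffleHashJoin | One side small enough to hash per partition, equi-join | Rare by default because spark.sql.join.preferSortMergeJoin is true; request it with a SHUFFLE_HASH hint
BroadcastNestedLoopJoin | Non-equi-join with one small side | Every streamed row is checked against the whole broadcast side (rows × rows)
CartesianProduct | Cross join, no key | Worst case; avoid in production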
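The core of the choice can be sketched as a small decision function. This is a simplification under stated assumptions: it models only inner joins and ignores hints, outer-join build-side restrictions, and ShuffleHashJoin. The function name is hypothetical, not a Spark API.

```python
MB = 1024 * 1024
DEFAULT_THRESHOLD = 10 * MB  # default spark.sql.autoBroadcastJoinThreshold


def choose_join(left_bytes, right_bytes, equi_join, threshold=DEFAULT_THRESHOLD):
    """Pick a join operator for an inner join from estimated side sizes.

    Simplified: ignores hints, outer-join restrictions, and ShuffleHashJoin.
    """
    broadcastable = min(left_bytes, right_bytes) <= threshold
    if equi_join:
        return "BroadcastHashJoin" if broadcastable else "SortMergeJoin"
    return "BroadcastNestedLoopJoin" if broadcastable else "CartesianProduct"
```

An 11 MB dimension side misses the default cutoff and lands on SortMergeJoin; raising the threshold to 50 MB flips the same join to BroadcastHashJoin. This is why a slightly wrong size estimate changes the plan so dramatically.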

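For aggregations the planner picks HashAggregate (fast, the default), ObjectHashAggregate for aggregates with object-typed buffers such as collect_list, or SortAggregate when the aggregation buffer cannot be stored in a hash map. A HashAggregate that runs out of memory at runtime spills and falls back to sort-based aggregation inside the same operator; it does not switch plans to SortAggregate.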
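The two strategies produce the same answer with different memory profiles. This sketch compares them on hypothetical (key, value) rows. It is an illustration, not Spark's operator code: Spark's sort can spill to disk, whereas Python's sorted() here is fully in memory.

```python
from itertools import groupby
from operator import itemgetter


def hash_aggregate(rows):
    """Single pass; one running sum per key lives in a hash map (HashAggregate-style)."""
    sums = {}
    for key, value in rows:
        sums[key] = sums.get(key, 0) + value
    return sums


def sort_aggregate(rows):
    """Sort by key, then sum each contiguous run (SortAggregate-style).

    Only one group's running state is needed at a time, which is why sort-based
    aggregation is the safe choice when a hash map of every key will not fit.
    """
    ordered = sorted(rows, key=itemgetter(0))  # Spark's sort can spill; this one is in memory
    return {key: sum(v for _, v in run) for key, run in groupby(ordered, key=itemgetter(0))}


rows = [("us", 3), ("de", 1), ("us", 4), ("fr", 2), ("de", 5)]
```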

The Cost-Based Optimizer is gated behind spark.sql.cbo.enabled=true and depends on table statistics computed with ANALYZE TABLE ... COMPUTE STATISTICS. Without those, the planner uses size-on-disk heuristics, which are wrong often enough that you should treat CBO as an opt-in feature, not a default.

Tungsten and whole-stage codegen

Tungsten is the project that took Spark from "JVM iterators with boxing overhead" to "close to hand-written native code". Three pieces matter:

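Explicit memory management. Shuffle buffers, sort buffers, and hash tables are stored as raw binary in Spark-managed memory pages (on-heap by default, off-heap when spark.memory.offHeap.enabled is set) instead of as millions of small Java objects. This keeps the garbage collector out of the hot path, so long-running jobs avoid long GC pauses.

Cache-aware computation. Data structures are laid out with the CPU's L1 and L2 caches in mind. Sorting, for example, keeps a key prefix next to each record pointer, so most comparisons never chase a pointer into random memory.

Whole-stage codegen. A chain of operators such as the one below is fused into a single Java method.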

HashAggregate
  Project
    Filter
      Scan

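After codegen this chain becomes one tight loop with no virtual calls between operators, no iterator overhead, and no boxing. In formatted explain output you will see it as WholeStageCodegen (1) wrapping the fused operators; in the default output, each fused operator gets a *(1) prefix. The number is the codegen stage id, not the Spark stage id.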

The practical takeaway: operators marked with *(N) run inside generated code, while operators without the star run in the slower interpreted mode. The usual culprit is a Python UDF or an expression without codegen support that broke the chain.
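The shape of the change can be shown in Python, even though Python cannot reproduce the speedup of Spark's generated Java. Below, the HashAggregate, Project, Filter, and Scan chain is written twice. The first version uses the iterator model, with one generator per operator. The second is the single fused loop that whole-stage codegen aims to produce. Function names and sample events are hypothetical.

```python
def scan(rows):
    for row in rows:
        yield row


def filter_op(child, predicate):
    for row in child:
        if predicate(row):
            yield row


def project(child, columns):
    for row in child:
        yield {c: row[c] for c in columns}


def aggregate(child, key, value):
    sums = {}
    for row in child:
        sums[row[key]] = sums.get(row[key], 0) + row[value]
    return sums


def volcano(rows):
    # Iterator model: each row passes through a chain of generators, one call per operator.
    filtered = filter_op(scan(rows), lambda r: r["amount"] > 100)
    projected = project(filtered, ["user_id", "amount"])
    return aggregate(projected, "user_id", "amount")


def fused(rows):
    # Fused shape codegen aims for: one loop, no per-operator calls, no intermediate rows.
    sums = {}
    for row in rows:
        if row["amount"] > 100:
            sums[row["user_id"]] = sums.get(row["user_id"], 0) + row["amount"]
    return sums


events = [
    {"user_id": 1, "amount": 150, "country": "us"},
    {"user_id": 2, "amount": 90, "country": "de"},
    {"user_id": 1, "amount": 200, "country": "us"},
]
```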


Adaptive Query Execution

AQE landed in Spark 3.0 and is on by default from 3.2 onward. The key insight: Catalyst plans the query before a single byte moves, using whatever statistics exist. AQE re-plans at shuffle boundaries using the actual size of the shuffled data.

Feature | Config | What it does
--- | --- | ---
Dynamic partition coalesce | spark.sql.adaptive.coalescePartitions.enabled | Merges small post-shuffle partitions toward advisoryPartitionSizeInBytes (default 64 MB)
Skew join handling | spark.sql.adaptive.skewJoin.enabled | Detects skewed partitions and splits them, duplicating the matching side
Coalesce target size | spark.sql.adaptive.advisoryPartitionSizeInBytes | Sets the target size after coalesce; tune up for big jobs

Dynamic coalesce. Without AQE you get exactly spark.sql.shuffle.partitions partitions after every shuffle, and the default of 200 rarely matches the data. On small or heavily filtered data most of those partitions end up tiny, so you pay scheduling overhead for tasks that do almost no work. AQE reads the actual shuffle output sizes and merges adjacent small partitions until each one approaches the advisory size (64 MB by default).
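A simplified sketch of the coalesce step: walk the post-shuffle partition sizes in order and greedily pack adjacent ones until the next would overshoot the target. The function name is hypothetical. Spark's real rule also honors a minimum partition size and aligns partitions across every shuffle feeding the same stage, so treat this as the core idea only.

```python
MB = 1024 * 1024


def coalesce_partitions(sizes, target=64 * MB):
    """Greedily merge adjacent shuffle partitions until each group nears `target` bytes.

    `sizes` are the post-shuffle partition sizes in bytes. Returns (start, end)
    index ranges with `end` exclusive; every input partition lands in exactly one range.
    """
    groups = []
    start, current = 0, 0
    for i, size in enumerate(sizes):
        # Close the open group if adding this partition would overshoot the target.
        if i > start and current + size > target:
            groups.append((start, i))
            start, current = i, 0
        current += size
    if sizes:
        groups.append((start, len(sizes)))
    return groups


# 200 shuffle partitions of about 1 MB each collapse into a handful of tasks.
tasks = coalesce_partitions([1 * MB] * 200)
```

Only adjacent partitions are merged, so each task still reads a contiguous range of shuffle blocks, and a partition already larger than the target stays on its own.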

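SortMergeJoin to BroadcastHashJoin conversion. If a filter shrinks one side of a join below the broadcast threshold at runtime, AQE replaces the planned SortMergeJoin with a BroadcastHashJoin. By then the shuffle writes have already happened, but the sort on both sides is skipped, and the shuffle files can be read locally instead of over the network.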

Skew join optimization. When AQE detects a partition that is skewedPartitionFactor (default 5) times the median and over skewedPartitionThresholdInBytes (default 256 MB), it splits that partition into sub-partitions and replicates the matching rows on the other side. The classic case: one country code with 40% of all events stalling the entire stage on one executor.
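The detection rule and the split can be sketched in a few lines. The thresholds match the defaults quoted above; the function names and the partition sizes are hypothetical. Spark cuts a skewed partition at map-output boundaries using the advisory size. The round-robin slicing here is a stand-in that preserves the key property: every piece of the skewed side is joined against a full copy of the matching partition on the other side.

```python
from statistics import median

MB = 1024 * 1024


def find_skewed(sizes, factor=5, threshold=256 * MB):
    """Indices of partitions bigger than factor x median AND bigger than threshold."""
    med = median(sizes)
    return [i for i, size in enumerate(sizes) if size > factor * med and size > threshold]


def num_splits(size, target=64 * MB):
    """How many sub-partitions a skewed partition is cut into (ceiling of size / target)."""
    return -(-size // target)


def skew_join(left_rows, right_rows, splits):
    """Join one skewed partition as `splits` tasks; each task gets a full copy of the right side."""
    chunks = [left_rows[i::splits] for i in range(splits)]
    return [[(l, r) for l in chunk for r in right_rows if l[0] == r[0]] for chunk in chunks]


# Nine ordinary 40 MB partitions and one 2 GB partition holding the hot key.
sizes = [40 * MB] * 9 + [2048 * MB]
skewed = find_skewed(sizes)
```

Both conditions must hold, so a uniformly large dataset (every partition over 256 MB but none far above the median) is not treated as skewed.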

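from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
spark.conf.set("spark.sql.adaptive.enabled", "true")  # master switch; already true by default on 3.2+
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")  # merge small post-shuffle partitions
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")  # split skewed join partitions
spark.conf.set("spark.sql.adaptive.advisoryPartitionSizeInBytes", "64MB")  # target partition size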

Gotcha: AQE only re-plans at shuffle boundaries. If your skew lives in the source Parquet partition layout, AQE cannot help; you need to repartition the data early or fix the partition strategy upstream.

What AQE does not do: it does not replace Catalyst, it does not optimize plans that have zero shuffles, and it does not fix the source partition skew problem.

Reading explain output

Spark 3 supports five explain modes; the fifth, mode='codegen', dumps the generated Java source. These are the four you should be able to identify on a whiteboard:

df.explain()                  # physical plan only
df.explain(mode='extended')   # all four phases
df.explain(mode='cost')       # adds size and row estimates
df.explain(mode='formatted')  # cleanest for screen sharing

The elements to call out by name when the interviewer points at a plan:

Scan parquet [...] is the source read. If PushedFilters: [IsNotNull(amount), GreaterThan(amount,100)] appears on it, predicate pushdown worked.

Project [user_id, amount] is the projection; pruning itself shows up as the narrowed column list in the scan's ReadSchema.

Exchange hashpartitioning(user_id, 200) is a shuffle. The 200 is your partition count, the number AQE may coalesce at runtime.

*(3) HashAggregate: the star means the operator runs inside whole-stage codegen stage 3.

BroadcastExchange followed by BroadcastHashJoin means the small side was broadcast successfully.

AdaptiveSparkPlan wrapping everything means AQE is on. Before execution it reads isFinalPlan=false; the actual runtime changes show up in the Spark UI after the action runs.

Common pitfalls

Turning AQE off because of stale advice. AQE was opt-in in 3.0 and 3.1, and the early releases had rough edges that earned it a bad reputation. By 3.2 it is production-ready and on by default for a reason. If a senior on your team says "we disabled AQE", ask which version they tested on. The decision often predates 3.2, and you are leaving free speedups on the table.

Ignoring the broadcast threshold. The default is 10 MB, and the size estimate Spark compares against it comes from table statistics or, without them, from file sizes. If the right side is estimated at 11 MB, you get a SortMergeJoin with two shuffles instead of a BroadcastHashJoin. The fix is either to raise spark.sql.autoBroadcastJoinThreshold (say to 50 MB on a cluster with enough driver and executor memory) or to wrap the small DataFrame in broadcast(df) explicitly. The explicit hint wins because it does not depend on the size estimate being accurate.

Skipping ANALYZE TABLE. Without statistics, the CBO falls back to file sizes, which say little about row counts once the data is compressed. Run ANALYZE TABLE orders COMPUTE STATISTICS FOR COLUMNS user_id, amount, and both join reordering and broadcast decisions improve. The analyze is a one-off cost that a recurring job can repay many times over.

Inflating spark.sql.shuffle.partitions without AQE. Setting it to 4000 to "handle big shuffles" without enabling AQE means that on ordinary data you get 4000 tasks of a few kilobytes each, and scheduler overhead crushes you. With AQE coalesce on, you can leave it high and let the runtime pick the right number.

Treating explain() as if it runs the query. It does not; explain is pure planning. To see the AQE rewrites, run an action (show(), count(), or a df.write call) and then open the SQL tab of the Spark UI.

Using repartition reflexively. repartition(n) triggers a full shuffle. If you just need fewer partitions before a write and do not care about balance, coalesce(n) merges existing partitions without a shuffle. The distinction matters: a misplaced repartition adds a whole shuffle stage, and because you fixed n explicitly, AQE keeps that partition count instead of coalescing it.
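The trade-off is easy to see on a toy layout. The sketch below models partitions as Python lists; the function names are hypothetical, not the PySpark methods. coalesce concatenates whole neighbouring partitions and moves no individual rows, so an unbalanced input stays unbalanced. repartition deals every row round-robin, which balances the output but moves all the data. Spark's round-robin starts at a random position per input partition; this sketch starts at zero.

```python
def coalesce(partitions, n):
    """Narrow merge into at most n outputs: whole input partitions are concatenated, rows never reshuffled."""
    per_output = -(-len(partitions) // n)  # input partitions per output, rounded up
    return [
        [row for part in partitions[i:i + per_output] for row in part]
        for i in range(0, len(partitions), per_output)
    ]


def repartition(partitions, n):
    """Full shuffle: every row is dealt round-robin to one of n new partitions."""
    out = [[] for _ in range(n)]
    rows = [row for part in partitions for row in part]
    for i, row in enumerate(rows):
        out[i % n].append(row)
    return out


# One large input partition and three tiny ones.
parts = [[1, 2, 3, 4, 5, 6], [7], [8], [9]]
```

Here coalesce(parts, 2) yields partitions of 7 and 2 rows, while repartition(parts, 2) yields 5 and 4 at the cost of moving every row.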

Trusting CBO blindly. Spark's CBO keeps improving but, in our experience, still trails Snowflake and BigQuery by a noticeable margin. Explicit hints such as broadcast(df) and df.hint("merge"), plus a deliberate repartition(n, col), beat the automatic planner on the joins you care about. Use CBO for the 80% of queries you do not hand-tune, and override it on the hot ones.

If you want to drill Spark and DE questions like this every day, NAILDD is launching with hundreds of data engineering problems built around exactly these patterns.

FAQ

Does Catalyst optimize Python UDFs?

No. A Python UDF is opaque to Catalyst: the optimizer cannot see inside the function, so it cannot push filters through it or prune columns based on what it reads. Filters and projections that do not depend on the UDF's output can still be moved around it, but the body itself is a black box. This is why built-in Spark SQL functions are preferred for hot paths. When you do need custom code, Scala UDFs or vectorized Pandas UDFs (pandas_udf) avoid the row-at-a-time Python serialization cost, although they are just as opaque to the optimizer.

Does AQE slow down the first run?

Slightly. AQE adds a small re-planning step at each shuffle boundary, and on short queries with only a couple of small shuffles that overhead can look like a regression. Queries with no shuffles at all are not wrapped in AQE by default, so they pay nothing. On real production queries with joins, aggregations, and skewed keys, the gains from dynamic coalesce and skew handling usually far outweigh the planning cost.

When do I need an explicit broadcast(df) hint?

When the size estimate is wrong or absent. The auto-broadcast path depends on Spark estimating the size of the smaller side accurately. If your table has no statistics, or the size after a filter is much smaller than the source, the planner often misses the broadcast opportunity. Wrapping with broadcast(df) from pyspark.sql.functions forces the decision.

What is Dynamic Partition Pruning?

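DPP is a separate Spark 3 feature, complementary to AQE. It runs the filter on the dimension side of a star-schema join first, then uses the resulting key set to prune partitions on the fact side at runtime. It only applies when the fact table is partitioned on the join key. The win scales with how many partitions the filter eliminates: on a billion-row fact table with 300 partitions, a small dimension filter that keeps only a handful can cut the fact scan by one to two orders of magnitude.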
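A minimal sketch of the pruning step, assuming a hypothetical fact table partitioned by date_key and a hypothetical date dimension where only every 100th day is a holiday. The dimension filter runs first, and its surviving keys decide which fact partitions are ever opened.

```python
# Hypothetical fact table partitioned by date_key: partition value -> rows in that partition.
FACT_PARTITIONS = {d: [{"date_key": d, "sales": d * 10}] for d in range(1, 301)}
# Hypothetical date dimension; only every 100th day is flagged as a holiday.
DIM_DATES = [{"date_key": d, "is_holiday": d % 100 == 0} for d in range(1, 301)]


def scan_with_dpp(fact_partitions, dim_rows, dim_filter):
    """Prune fact partitions using the join keys that survive the dimension filter."""
    # 1. Evaluate the filter on the small dimension side first.
    surviving_keys = {row["date_key"] for row in dim_rows if dim_filter(row)}
    # 2. Only partitions whose value is in the key set are ever opened.
    opened = [key for key in fact_partitions if key in surviving_keys]
    rows = [row for key in opened for row in fact_partitions[key]]
    return rows, len(opened)


rows, partitions_opened = scan_with_dpp(FACT_PARTITIONS, DIM_DATES, lambda r: r["is_holiday"])
```

Three of 300 partitions are opened. If the fact table were not partitioned on date_key, the key set could not skip any files, which is why DPP depends on the partitioning scheme.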

Does Catalyst apply to RDDs?

No. Catalyst only sees DataFrame and Dataset operations. RDD code is opaque to the optimizer, runs without codegen, and misses all the rewrites described above. This is the single biggest reason to migrate any remaining RDD code to DataFrame APIs.

Is this official Spark guidance?

No. This article is built from the Spark 3.x documentation, Databricks engineering blog posts on AQE and Tungsten, and patterns we see in DE interview loops at large data shops. Always verify config defaults against the version of Spark you are actually running.