# DataFrame Performance Benchmarks

This guide covers DataFrame performance profiling, optimization, and benchmarking across platforms.
## Overview

BenchBox provides tools for:

- Profiling DataFrame query execution
- Comparing DataFrame vs SQL performance
- Capturing and analyzing query plans
- Measuring lazy evaluation overhead
## Profiling Queries

### Basic Profiling

```python
from polars import col, lit  # expression helpers used in the query below

from benchbox.core.dataframe.profiling import DataFrameProfiler

profiler = DataFrameProfiler(platform="polars")

# Profile a query (df is assumed to be a Polars LazyFrame)
with profiler.profile_query("q1") as ctx:
    result = df.filter(col("l_shipdate") <= lit(cutoff)).collect()
    ctx.set_rows(len(result))

# Get the profile
profile = ctx.get_profile()
print(f"Query: {profile.query_id}")
print(f"Execution time: {profile.execution_time_ms:.2f}ms")
print(f"Rows processed: {profile.rows_processed}")
```
### Lazy Evaluation Profiling

For platforms with lazy evaluation (Polars, PySpark, DataFusion):

```python
with profiler.profile_query("q1") as ctx:
    # Planning phase
    ctx.start_planning()
    lazy_result = df.filter(...).group_by(...).agg(...)
    ctx.end_planning()

    # Execution/collect phase
    ctx.start_collect()
    result = lazy_result.collect()
    ctx.end_collect()

    ctx.set_rows(len(result))

profile = ctx.get_profile()
print(f"Planning: {profile.planning_time_ms:.2f}ms")
print(f"Collect: {profile.collect_time_ms:.2f}ms")
print(f"Lazy overhead: {profile.lazy_overhead_percent:.1f}%")
```
### Aggregate Statistics

```python
# Profile multiple queries
for query_id in ["q1", "q2", "q3"]:
    with profiler.profile_query(query_id) as ctx:
        execute_query(query_id)

# Get aggregate statistics
stats = profiler.get_statistics()
print(f"Queries: {stats['query_count']}")
print(f"Total time: {stats['total_execution_time_ms']:.0f}ms")
print(f"Average time: {stats['avg_execution_time_ms']:.2f}ms")
print(f"Lazy evaluation queries: {stats['lazy_evaluation_queries']}")
print(f"Avg lazy overhead: {stats['avg_lazy_overhead_percent']:.1f}%")
```
## Query Plan Capture

### Polars

```python
from benchbox.core.dataframe.profiling import capture_polars_plan

# Build lazy query
lazy_df = (
    df.lazy()
    .filter(col("l_shipdate") <= lit(cutoff))
    .group_by("l_returnflag")
    .agg(col("l_quantity").sum())
)

# Capture the plan BEFORE collect
plan = capture_polars_plan(lazy_df)
print(plan.plan_text)
print("Optimization hints:", plan.optimization_hints)

# Execute
result = lazy_df.collect()
```
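
To cross-check the captured text, Polars itself can print both the naive plan and the optimized plan. A minimal sketch using the standard `LazyFrame.explain` API (the Parquet path and date literal here are placeholders, not BenchBox conventions):

```python
import polars as pl
from datetime import date

lazy_df = (
    pl.scan_parquet("lineitem.parquet")  # placeholder input
    .filter(pl.col("l_shipdate") <= date(1998, 9, 2))
    .group_by("l_returnflag")
    .agg(pl.col("l_quantity").sum())
)

# Plan as written vs. plan after Polars' optimizer has run
print(lazy_df.explain(optimized=False))
print(lazy_df.explain(optimized=True))
```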
### DataFusion

```python
from benchbox.core.dataframe.profiling import capture_datafusion_plan

plan = capture_datafusion_plan(df)
print(plan.plan_text)
```
### PySpark

```python
from benchbox.core.dataframe.profiling import capture_pyspark_plan

plan = capture_pyspark_plan(spark_df)
print(plan.plan_text)
```
### Generic Capture

```python
from benchbox.core.dataframe.profiling import capture_query_plan

# Automatically detects platform
plan = capture_query_plan(df, platform="polars")
if plan:
    print(plan.plan_text)
    for hint in plan.optimization_hints:
        print(f"  - {hint}")
```
## SQL vs DataFrame Comparison

Compare execution modes for platforms supporting both:

```python
from benchbox.core.dataframe.profiling import (
    DataFrameProfiler,
    compare_execution_modes,
)

# Collect DataFrame execution times
df_profiler = DataFrameProfiler(platform="polars-df")
for query_id in benchmark.query_ids:
    with df_profiler.profile_query(query_id):
        benchmark.execute_dataframe_query(query_id)

# SQL execution times (from a previous benchmark run)
sql_times = {"q1": 45.2, "q2": 123.4, "q3": 89.1}

# Compare
comparisons = compare_execution_modes(
    df_profiler.get_profiles(),
    sql_times,
)

for comp in comparisons:
    print(f"{comp.query_id}: DataFrame={comp.dataframe_time_ms:.1f}ms, "
          f"SQL={comp.sql_time_ms:.1f}ms, Winner={comp.winner}")
    for note in comp.notes:
        print(f"  - {note}")
```
## Platform Performance Characteristics

### Polars

| Characteristic | Value | Notes |
|---|---|---|
| Execution Model | Lazy with streaming | Query plan optimized before execution |
| Memory Efficiency | High | Columnar, zero-copy operations |
| Parallelism | Automatic | Thread pool based on CPU cores |
| Best For | Medium-large datasets | Up to ~100GB on single node |

Optimization Tips:

- Use `.lazy()` for query optimization
- Enable streaming for very large datasets: `--platform-option streaming=true`
- Minimize `.collect()` calls
- Filter early to reduce data volume
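
The same ideas expressed in raw Polars, outside of BenchBox (a sketch; the Parquet path is a placeholder, and newer Polars releases spell the streaming toggle `collect(engine="streaming")`):

```python
import polars as pl
from datetime import date

result = (
    pl.scan_parquet("lineitem.parquet")  # stay lazy from the start
    .filter(pl.col("l_shipdate") <= date(1998, 9, 2))  # filter early
    .group_by("l_returnflag")
    .agg(pl.col("l_quantity").sum())
    .collect(streaming=True)  # single collect; streaming for large inputs
)
```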
### Pandas

| Characteristic | Value | Notes |
|---|---|---|
| Execution Model | Eager | Each operation executes immediately |
| Memory Efficiency | Moderate | Higher memory footprint |
| Parallelism | Limited | Single-threaded by default |
| Best For | Small-medium datasets | Up to ~10GB |

Optimization Tips:

- Use PyArrow backend: `--platform-option dtype_backend=pyarrow`
- Minimize DataFrame copies
- Use vectorized operations
- Filter data early
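
In raw pandas the same tips look roughly like this (a sketch; `dtype_backend="pyarrow"` requires pandas 2.0+, and the path and date are placeholders):

```python
import pandas as pd
from datetime import date

# PyArrow-backed dtypes reduce memory versus the default NumPy backend
df = pd.read_parquet("lineitem.parquet", dtype_backend="pyarrow")

# Filter early, then rely on vectorized column operations instead of
# row-wise apply(); chaining avoids intermediate copies
shipped = df[df["l_shipdate"] <= date(1998, 9, 2)]
totals = shipped.groupby("l_returnflag")["l_quantity"].sum()
```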
### DataFusion

| Characteristic | Value | Notes |
|---|---|---|
| Execution Model | Lazy | Similar to Polars |
| Memory Efficiency | High | Arrow-native |
| Parallelism | Automatic | Configurable thread pool |
| Best For | Analytical queries | Arrow ecosystem integration |
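
A minimal sketch of the corresponding knobs in the `datafusion` Python package (the Parquet path is a placeholder; `with_target_partitions` controls the degree of parallelism):

```python
from datafusion import SessionConfig, SessionContext, col, functions as f

# Size the partition count (and thus parallelism) explicitly
config = SessionConfig().with_target_partitions(8)
ctx = SessionContext(config)

df = ctx.read_parquet("lineitem.parquet")
result = df.aggregate(
    [col("l_returnflag")],
    [f.sum(col("l_quantity"))],
)
batches = result.collect()  # executes the lazy plan, returns Arrow batches
```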
### PySpark

| Characteristic | Value | Notes |
|---|---|---|
| Execution Model | Lazy | Catalyst optimizer |
| Memory Efficiency | Variable | Depends on configuration |
| Parallelism | Distributed | Cluster-wide execution |
| Best For | Very large datasets | Terabyte+ scale |
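
For PySpark, the Catalyst plan and the lazy/action split are visible directly through the standard DataFrame API (a sketch; the Parquet path is a placeholder and `mode="formatted"` requires Spark 3.0+):

```python
from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.appName("benchbox-profiling").getOrCreate()
df = spark.read.parquet("lineitem.parquet")

agg = (
    df.filter(F.col("l_shipdate") <= "1998-09-02")
    .groupBy("l_returnflag")
    .agg(F.sum("l_quantity"))
)

agg.explain(mode="formatted")  # Catalyst plan; nothing has executed yet
rows = agg.collect()           # the action triggers distributed execution
```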
## Benchmark Results Analysis

### Running Performance Benchmarks

```bash
# Run TPC-H on multiple DataFrame platforms
benchbox run --platform polars-df --benchmark tpch --scale 1 \
    --output results/polars_sf1.json

benchbox run --platform pandas-df --benchmark tpch --scale 1 \
    --output results/pandas_sf1.json

benchbox run --platform duckdb-df --benchmark tpch --scale 1 \
    --output results/duckdb_sf1.json
```
### Comparing Results

```bash
# Compare multiple result files
benchbox compare results/polars_sf1.json results/pandas_sf1.json results/duckdb_sf1.json
```
### Programmatic Analysis

```python
import json
from pathlib import Path

def load_results(path: str) -> dict:
    with open(path) as f:
        return json.load(f)

def compare_platforms(result_paths: list[str]) -> dict:
    results = {Path(p).stem: load_results(p) for p in result_paths}
    comparison = {}
    for platform, data in results.items():
        query_times = {
            q["query_id"]: q["execution_time_ms"]
            for q in data.get("query_results", [])
        }
        comparison[platform] = query_times
    return comparison

# Analyze
comparison = compare_platforms([
    "results/polars_sf1.json",
    "results/pandas_sf1.json",
    "results/duckdb_sf1.json",
])

# Find the fastest platform per query
for query_id in comparison["polars_sf1"]:
    times = {p: comparison[p].get(query_id) for p in comparison}
    fastest = min(times, key=lambda x: times[x] or float("inf"))
    print(f"{query_id}: {fastest} ({times[fastest]:.0f}ms)")
```
## Memory Profiling

### Tracking Memory Usage

```python
import tracemalloc

tracemalloc.start()

# Execute query
result = df.filter(...).collect()

current, peak = tracemalloc.get_traced_memory()
tracemalloc.stop()

print(f"Current memory: {current / 1024 / 1024:.1f} MB")
print(f"Peak memory: {peak / 1024 / 1024:.1f} MB")
```
### With Profiler Context

```python
import tracemalloc

with profiler.profile_query("q1") as ctx:
    tracemalloc.start()
    result = df.filter(...).collect()
    ctx.set_rows(len(result))
    _, peak = tracemalloc.get_traced_memory()
    ctx.set_peak_memory(peak / 1024 / 1024)  # Convert to MB
    tracemalloc.stop()

profile = ctx.get_profile()
print(f"Peak memory: {profile.peak_memory_mb:.1f} MB")
```
## Best Practices

### 1. Profile Before Optimizing

Always measure before making changes:

```python
# Baseline measurement: run the query multiple times
for i in range(3):
    with profiler.profile_query(f"q1_run{i}"):
        execute_query()

stats = profiler.get_statistics()
print(f"Baseline avg: {stats['avg_execution_time_ms']:.2f}ms")
```
### 2. Capture Query Plans

Understand what the optimizer is doing:

```python
plan = capture_query_plan(lazy_df, "polars")
print(plan.plan_text)
```
### 3. Compare Lazy vs Eager

For platforms supporting both:

```python
# Lazy execution
with profiler.profile_query("q1_lazy") as ctx:
    ctx.start_planning()
    lazy_result = df.lazy().filter(...).group_by(...).agg(...)
    ctx.end_planning()
    ctx.start_collect()
    result = lazy_result.collect()
    ctx.end_collect()

# Eager execution (if available)
with profiler.profile_query("q1_eager") as ctx:
    result = df.filter(...).group_by(...).agg(...)

# Compare
lazy_profile = profiler.get_profile("q1_lazy")
eager_profile = profiler.get_profile("q1_eager")

print(f"Lazy: {lazy_profile.execution_time_ms:.2f}ms "
      f"(overhead: {lazy_profile.lazy_overhead_percent:.1f}%)")
print(f"Eager: {eager_profile.execution_time_ms:.2f}ms")
```
### 4. Scale Factor Selection

| Platform | Recommended Max SF | Memory Needed |
|---|---|---|
| Pandas | 1-10 | ~6GB per SF |
| Polars | 10-100 | ~4GB per SF |
| DataFusion | 10-100 | ~4GB per SF |
| PySpark | 100+ | Cluster dependent |
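
The per-SF figures make sizing a back-of-envelope calculation; Pandas at SF 10, for instance, implies roughly 60GB of RAM. A tiny illustrative helper (the constants come from the table above; `fits_in_memory` is hypothetical):

```python
# Approximate memory footprint per scale factor, from the table above (GB)
GB_PER_SF = {"pandas": 6, "polars": 4, "datafusion": 4}

def fits_in_memory(platform: str, scale_factor: int, available_gb: float) -> bool:
    """Rough check: does the dataset fit given the per-SF footprint?"""
    return GB_PER_SF[platform] * scale_factor <= available_gb

print(fits_in_memory("pandas", 10, 64.0))  # needs ~60GB -> True on a 64GB box
```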
### 5. Monitor Optimization Hints

```python
plan = capture_polars_plan(lazy_df)

if plan.optimization_hints:
    print("Optimization opportunities:")
    for hint in plan.optimization_hints:
        print(f"  - {hint}")
```