DataFrame Performance Optimization Guide¶
This guide covers performance optimization techniques for BenchBox DataFrame adapters.
Overview¶
DataFrame benchmarking performance depends on:
Platform selection - Different platforms excel at different workloads
Configuration options - Platform-specific tuning settings
Query patterns - Writing efficient DataFrame operations
Memory management - Managing data size and memory pressure
Platform-Specific Optimizations¶
Polars¶
Polars is the default expression-family adapter and excels at single-node performance.
Key Optimizations:
Lazy evaluation: Use LazyFrame for query planning optimization
Streaming mode: Enable for large datasets that don’t fit in memory
Predicate pushdown: Filters are automatically pushed to scan operations
from benchbox.platforms.dataframe.polars_df import PolarsDataFrameAdapter
adapter = PolarsDataFrameAdapter(
streaming=True, # Enable streaming for large datasets
rechunk=True, # Rechunk for better memory layout
)
Best Practices:
Use ctx.scalar() instead of .collect()[0, 0] for single-value extraction
Chain operations before calling .collect()
Use select() to limit columns early in the query
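Putting these practices together, the following is a minimal sketch of a lazy Polars query that benefits from predicate pushdown, early column selection, and streaming collection. The Parquet path and column names are illustrative, and older Polars versions spell the streaming option differently than the newest releases.
import polars as pl
from datetime import date

# Lazy scan: the filter and select below are pushed into the Parquet scan,
# so only matching rows and the three listed columns are read.
lf = (
    pl.scan_parquet("lineitem.parquet")  # illustrative path
    .filter(pl.col("l_shipdate") >= date(1994, 1, 1))
    .select(["l_orderkey", "l_quantity", "l_extendedprice"])
)

# Collect with the streaming engine for data larger than memory
# (newer Polars releases use lf.collect(engine="streaming")).
result = lf.collect(streaming=True)

# Single-value extraction: .item() instead of indexing into the frame.
total = lf.select(pl.col("l_extendedprice").sum()).collect().item()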
Pandas¶
Pandas is the reference adapter for the pandas-family API.
Key Optimizations:
Copy-on-write (2.0+): Reduces memory copies for read-heavy workloads
PyArrow backend: Uses Apache Arrow for better memory efficiency
Categorical dtypes: Reduces memory for low-cardinality string columns
from benchbox.platforms.dataframe.pandas_df import PandasDataFrameAdapter
adapter = PandasDataFrameAdapter(
copy_on_write=True, # Enable CoW for Pandas 2.0+
dtype_backend="pyarrow", # Use PyArrow backend
)
Best Practices:
Enable CoW for read-heavy workloads with many intermediate operations
Use ctx.scalar() for efficient single-value extraction
Avoid chained indexing (df['a']['b']) - use .loc[row, col] instead
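For reference, the same settings can be exercised directly in plain pandas. This is a minimal sketch; the Parquet path and column names are illustrative.
import pandas as pd

# Copy-on-write (pandas 2.0+): intermediate views no longer trigger
# defensive copies in read-heavy pipelines.
pd.set_option("mode.copy_on_write", True)

# PyArrow-backed dtypes store data in Arrow memory rather than NumPy object arrays.
df = pd.read_parquet("lineitem.parquet", dtype_backend="pyarrow")

# Low-cardinality string columns shrink considerably as categoricals.
df["l_returnflag"] = df["l_returnflag"].astype("category")

# Prefer .loc over chained indexing (df['a']['b'] may operate on a copy).
first_qty = df.loc[0, "l_quantity"]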
PySpark¶
PySpark excels at distributed processing and large-scale data.
Key Optimizations:
Adaptive Query Execution (AQE): Auto-optimizes joins and aggregations
Shuffle partitions: Tune based on data size
Broadcast joins: Use for small dimension tables
from benchbox.platforms.dataframe.pyspark_df import PySparkDataFrameAdapter
adapter = PySparkDataFrameAdapter(
master="local[*]", # Use all cores locally
driver_memory="8g", # Increase driver memory
shuffle_partitions=200, # Tune for your data size
enable_aqe=True, # Enable Adaptive Query Execution
)
Best Practices:
Use ctx.scalar() instead of .collect()[0][0] for single values
Cache frequently-accessed DataFrames with .cache()
Avoid collecting large datasets to the driver
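The same tuning expressed directly against the PySpark API looks roughly like this; the table paths and join keys are illustrative.
from pyspark.sql import SparkSession, functions as F

spark = (
    SparkSession.builder
    .master("local[*]")
    .config("spark.driver.memory", "8g")
    .config("spark.sql.adaptive.enabled", "true")   # AQE
    .config("spark.sql.shuffle.partitions", "200")
    .getOrCreate()
)

lineitem = spark.read.parquet("lineitem.parquet")
orders = spark.read.parquet("orders.parquet")

# Broadcast the small dimension table so the join avoids a full shuffle.
joined = lineitem.join(F.broadcast(orders),
                       lineitem.l_orderkey == orders.o_orderkey)

# Cache before repeated use; pull single values without collecting all rows.
joined.cache()
total = joined.agg(F.sum("l_extendedprice")).first()[0]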
DataFusion¶
DataFusion provides SQL-like optimization with expression-family syntax.
Key Optimizations:
Parquet pushdown: Filters and projections pushed to file scan
Repartition joins: Parallel hash joins
Batch size tuning: Adjust for memory/throughput tradeoff
from benchbox.platforms.dataframe.datafusion_df import DataFusionDataFrameAdapter
adapter = DataFusionDataFrameAdapter(
repartition_joins=True, # Enable parallel hash joins
parquet_pushdown=True, # Push predicates to Parquet scan
batch_size=8192, # Row batch size
)
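For comparison, the equivalent pushdown behavior with the standalone datafusion Python package looks roughly like the sketch below; the registered table name and file path are illustrative.
from datafusion import SessionContext

ctx = SessionContext()
ctx.register_parquet("lineitem", "lineitem.parquet")

# The date filter and the narrow projection are pushed into the Parquet
# scan, so only matching row groups and columns are decoded.
df = ctx.sql("""
    SELECT l_orderkey, SUM(l_extendedprice) AS total
    FROM lineitem
    WHERE l_shipdate >= DATE '1994-01-01'
    GROUP BY l_orderkey
""")
batches = df.collect()  # list of PyArrow RecordBatches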
Common Optimization Patterns¶
Scalar Extraction¶
When extracting a single value (e.g., for use in a subsequent filter), use the
optimized scalar() method:
# Before (inefficient)
total = df.select(col("value").sum()).collect()[0, 0]
# After (optimized)
total = ctx.scalar(df.select(col("value").sum()))
This uses platform-native methods for efficient scalar extraction:
Polars: .item()
PySpark: .first()[0]
DataFusion: PyArrow column access
Pandas: .iloc[0, 0]
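As a hypothetical illustration only (the actual ctx.scalar() implementation in BenchBox may differ), a per-platform dispatch along these lines would use the native calls listed above:
# Hypothetical sketch of per-platform scalar extraction; not the real
# BenchBox ctx.scalar() implementation.
def extract_scalar(result, platform: str):
    if platform == "polars":
        return result.item()              # 1x1 DataFrame -> Python value
    if platform == "pyspark":
        return result.first()[0]          # first Row, first column
    if platform == "datafusion":
        batch = result.collect()[0]       # PyArrow RecordBatch
        return batch.column(0)[0].as_py()
    if platform == "pandas":
        return result.iloc[0, 0]          # positional indexing
    raise ValueError(f"unsupported platform: {platform}")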
Filter Push-down¶
Write queries that allow predicate push-down to file scans:
# Good: Filter early, before joins
filtered = lineitem.filter(col("l_shipdate") >= lit(start_date))
result = filtered.join(orders, ...)
# Avoid: Filter after expensive operations
result = lineitem.join(orders, ...).filter(col("l_shipdate") >= lit(start_date))
Column Pruning¶
Select only needed columns early in the query:
# Good: Select columns early
subset = lineitem.select("l_orderkey", "l_quantity", "l_extendedprice")
result = subset.join(orders.select("o_orderkey", "o_orderdate"), ...)
# Avoid: Carrying unnecessary columns through joins
result = lineitem.join(orders, ...).select("l_orderkey", "l_quantity", ...)
Memory Management¶
Scale Factor Guidelines¶
| Scale Factor | Memory Required | Recommended Platform |
|---|---|---|
| SF 0.01 | <1 GB | Any |
| SF 0.1 | <4 GB | Any |
| SF 1 | 4-8 GB | Polars, Pandas |
| SF 10 | 32-64 GB | Polars (streaming) |
| SF 100+ | 256+ GB | PySpark, Dask |
Large Dataset Strategies¶
For datasets larger than available memory:
Polars streaming: streaming=True processes data in chunks
Dask: Automatically partitions data across workers
PySpark: Distributed processing with memory spill to disk
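As a point of reference, the out-of-core pattern with Dask is a one-liner; the dataset path below is illustrative.
import dask.dataframe as dd

# Dask splits the Parquet dataset into partitions and processes them in
# parallel, spilling to disk under memory pressure.
lineitem = dd.read_parquet("lineitem/*.parquet")
total = lineitem["l_extendedprice"].sum().compute()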
Profiling Queries¶
BenchBox provides built-in profiling for DataFrame queries:
# Execute with profiling
result, profile = adapter.execute_query_profiled(ctx, query)
# Access timing breakdown
print(f"Planning time: {profile.planning_time_ms}ms")
print(f"Execution time: {profile.execution_time_ms}ms")
print(f"Peak memory: {profile.peak_memory_mb}MB")
# Get query plan (lazy platforms only)
if profile.query_plan:
print(profile.query_plan.plan_text)
Tuning Configuration¶
Use YAML tuning configurations for reproducible optimization:
# tuning/performance.yaml
platform: polars-df
settings:
streaming:
enabled: true
memory:
batch_size: 16384
data_types:
auto_categorize_strings: true
benchbox run --platform polars-df --tuning tuning/performance.yaml ...