DataFrame Performance Optimization Guide

Tags: advanced, guide, dataframe-platform, performance

This guide covers performance optimization techniques for BenchBox DataFrame adapters.

Overview

DataFrame benchmarking performance depends on:

  1. Platform selection - Different platforms excel at different workloads

  2. Configuration options - Platform-specific tuning settings

  3. Query patterns - Writing efficient DataFrame operations

  4. Memory management - Managing data size and memory pressure

Platform-Specific Optimizations

Polars

Polars is the default expression-family adapter and excels at single-node performance.

Key Optimizations:

  • Lazy evaluation: Use LazyFrame for query planning optimization

  • Streaming mode: Enable for large datasets that don’t fit in memory

  • Predicate pushdown: Filters are automatically pushed to scan operations

from benchbox.platforms.dataframe.polars_df import PolarsDataFrameAdapter

adapter = PolarsDataFrameAdapter(
    streaming=True,      # Enable streaming for large datasets
    rechunk=True,        # Rechunk for better memory layout
)

Best Practices (illustrated in the sketch below):

  • Use ctx.scalar() instead of .collect()[0, 0] for single-value extraction

  • Chain operations before calling .collect()

  • Use select() to limit columns early in the query
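
The sketch below applies these practices with Polars used directly rather than through the adapter; the file path and column names are illustrative only, and the final .item() call is what ctx.scalar() maps to on Polars.

import polars as pl
from datetime import date

# Lazy scan: Polars builds a query plan and pushes the filter and the
# column selection down into the Parquet reader.
lf = pl.scan_parquet("lineitem.parquet")   # illustrative path

total = (
    lf.filter(pl.col("l_shipdate") >= date(1994, 1, 1))   # filter early
    .select("l_extendedprice")                             # prune columns early
    .sum()
    .collect()                                             # single collect at the end
    .item()                                                # native scalar extraction
)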

Pandas

Pandas is the reference implementation of the Pandas-family adapters.

Key Optimizations:

  • Copy-on-write (2.0+): Reduces memory copies for read-heavy workloads

  • PyArrow backend: Uses Apache Arrow for better memory efficiency

  • Categorical dtypes: Reduces memory for low-cardinality string columns

from benchbox.platforms.dataframe.pandas_df import PandasDataFrameAdapter

adapter = PandasDataFrameAdapter(
    copy_on_write=True,              # Enable CoW for Pandas 2.0+
    dtype_backend="pyarrow",         # Use PyArrow backend
)

Best Practices (illustrated in the sketch below):

  • Enable CoW for read-heavy workloads with many intermediate operations

  • Use ctx.scalar() for efficient single-value extraction

  • Avoid chained indexing (df['a']['b']) - use .loc[row, col] instead
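
The sketch below shows these options with plain Pandas 2.x; the file path and column names are illustrative, and the adapter normally applies this configuration for you.

import pandas as pd

pd.set_option("mode.copy_on_write", True)   # CoW, Pandas 2.0+

# PyArrow-backed dtypes reduce memory use, especially for string columns
df = pd.read_parquet("orders.parquet", dtype_backend="pyarrow")   # illustrative path

# Low-cardinality strings compress well as categoricals
df["o_orderpriority"] = df["o_orderpriority"].astype("category")

# One .loc lookup instead of chained indexing like df["a"][0]
value = df.loc[0, "o_totalprice"]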

PySpark

PySpark excels at distributed processing of large-scale datasets.

Key Optimizations:

  • Adaptive Query Execution (AQE): Auto-optimizes joins and aggregations

  • Shuffle partitions: Tune based on data size

  • Broadcast joins: Use for small dimension tables

from benchbox.platforms.dataframe.pyspark_df import PySparkDataFrameAdapter

adapter = PySparkDataFrameAdapter(
    master="local[*]",           # Use all cores locally
    driver_memory="8g",          # Increase driver memory
    shuffle_partitions=200,      # Tune for your data size
    enable_aqe=True,             # Enable Adaptive Query Execution
)

Best Practices (illustrated in the sketch below):

  • Use ctx.scalar() instead of .collect()[0][0] for single values

  • Cache frequently-accessed DataFrames with .cache()

  • Avoid collecting large datasets to the driver
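
A rough sketch with the plain PySpark API (the adapter builds the session for you); the paths, table names, and join key below are illustrative.

from pyspark.sql import SparkSession, functions as F

spark = (
    SparkSession.builder
    .master("local[*]")
    .config("spark.sql.adaptive.enabled", "true")    # AQE
    .config("spark.sql.shuffle.partitions", "200")
    .getOrCreate()
)

facts = spark.read.parquet("facts.parquet")          # illustrative paths
dims = spark.read.parquet("dims.parquet")

# Broadcast the small dimension table and cache the reused join result
joined = facts.join(F.broadcast(dims), "dim_key").cache()

# first()[0] fetches a single value without collecting the full result
total = joined.agg(F.sum("amount")).first()[0]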

DataFusion

DataFusion provides SQL-like optimization with expression-family syntax.

Key Optimizations:

  • Parquet pushdown: Filters and projections pushed to file scan

  • Repartition joins: Parallel hash joins

  • Batch size tuning: Adjust for memory/throughput tradeoff

from benchbox.platforms.dataframe.datafusion_df import DataFusionDataFrameAdapter

adapter = DataFusionDataFrameAdapter(
    repartition_joins=True,      # Enable parallel hash joins
    parquet_pushdown=True,       # Push predicates to Parquet scan
    batch_size=8192,             # Row batch size
)
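
For reference, a minimal query against the DataFusion Python API used directly; the adapter manages the SessionContext, and the path and column names here are illustrative.

from datafusion import SessionContext, col, lit

ctx = SessionContext()
df = ctx.read_parquet("lineitem.parquet")    # illustrative path

# The optimizer pushes this filter and projection into the Parquet scan
result = df.filter(col("l_quantity") > lit(24)).select(
    col("l_orderkey"), col("l_quantity")
)
batches = result.collect()                   # list of Arrow RecordBatches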

Common Optimization Patterns

Scalar Extraction

When extracting a single value (e.g., for use in a subsequent filter), use the optimized scalar() method:

# Before (inefficient)
total = df.select(col("value").sum()).collect()[0, 0]

# After (optimized)
total = ctx.scalar(df.select(col("value").sum()))

This uses platform-native methods for efficient scalar extraction (see the sketch after this list):

  • Polars: .item()

  • PySpark: .first()[0]

  • DataFusion: PyArrow column access

  • Pandas: .iloc[0, 0]
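
A rough sketch of how such a helper can map onto the native calls above; this is illustrative only, not the actual BenchBox implementation.

def scalar(df, platform):
    # Hypothetical helper mirroring the platform-native methods listed above
    if platform == "polars":
        return df.collect().item()          # LazyFrame -> 1x1 DataFrame -> .item()
    if platform == "pyspark":
        return df.first()[0]                # first row, first column
    if platform == "datafusion":
        batch = df.collect()[0]             # first Arrow RecordBatch
        return batch.column(0)[0].as_py()   # PyArrow column access
    if platform == "pandas":
        return df.iloc[0, 0]                # first cell
    raise ValueError(f"unsupported platform: {platform}")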

Filter Pushdown

Write queries that allow predicate pushdown to file scans:

# Good: Filter early, before joins
filtered = lineitem.filter(col("l_shipdate") >= lit(start_date))
result = filtered.join(orders, ...)

# Avoid: Filter after expensive operations
result = lineitem.join(orders, ...).filter(col("l_shipdate") >= lit(start_date))

Column Pruning

Select only needed columns early in the query:

# Good: Select columns early
subset = lineitem.select("l_orderkey", "l_quantity", "l_extendedprice")
result = subset.join(orders.select("o_orderkey", "o_orderdate"), ...)

# Avoid: Carrying unnecessary columns through joins
result = lineitem.join(orders, ...).select("l_orderkey", "l_quantity", ...)

Memory Management

Scale Factor Guidelines

Scale Factor    Memory Required    Recommended Platform
SF 0.01         <1 GB              Any
SF 0.1          <4 GB              Any
SF 1            4-8 GB             Polars, Pandas
SF 10           32-64 GB           Polars (streaming)
SF 100+         256+ GB            PySpark, Dask

Large Dataset Strategies

For datasets larger than available memory (see the sketch after this list):

  1. Polars streaming: streaming=True processes data in chunks

  2. Dask: Automatically partitions data across workers

  3. PySpark: Distributed processing with memory spill to disk
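
As an illustration of the Dask option, a minimal sketch; the path and column names are hypothetical.

import dask.dataframe as dd

# Dask splits the Parquet dataset into partitions and processes them in
# parallel, so the whole dataset never has to fit in memory at once.
ddf = dd.read_parquet("lineitem/*.parquet")            # illustrative path

result = (
    ddf[ddf["l_quantity"] > 24]                        # filter applied per partition
    .groupby("l_returnflag")["l_extendedprice"]
    .sum()
    .compute()                                         # triggers execution
)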

Profiling Queries

BenchBox provides built-in profiling for DataFrame queries:

# Execute with profiling
result, profile = adapter.execute_query_profiled(ctx, query)

# Access timing breakdown
print(f"Planning time: {profile.planning_time_ms}ms")
print(f"Execution time: {profile.execution_time_ms}ms")
print(f"Peak memory: {profile.peak_memory_mb}MB")

# Get query plan (lazy platforms only)
if profile.query_plan:
    print(profile.query_plan.plan_text)

Tuning Configuration

Use YAML tuning configurations for reproducible optimization:

# tuning/performance.yaml
platform: polars-df
settings:
  streaming:
    enabled: true
  memory:
    batch_size: 16384
  data_types:
    auto_categorize_strings: true

Run the benchmark with the tuning file applied:

benchbox run --platform polars-df --tuning tuning/performance.yaml ...