Adding a New DataFrame Platform¶
This guide explains how to add support for a new DataFrame platform to BenchBox.
Overview¶
BenchBox uses a family-based architecture that minimizes code duplication when adding new platforms. Adding a new platform typically involves five steps:
Determine which family the platform belongs to
Implement a DataFrameContext subclass
Implement a platform adapter
Register the platform
Add tests
Step 1: Determine the Family¶
DataFrame platforms fall into two families based on their API style:
Expression Family¶
Use expression objects for column references and operations.
Members: Polars, PySpark, DataFusion
# Expression-style syntax: every step takes expression objects built with col()/lit().
result = (
    df.filter(col('status') == lit('active'))  # predicate is an expression, not a boolean mask
    .group_by('category')  # group rows by the 'category' column
    .agg(col('amount').sum().alias('total'))  # per-group sum of 'amount', exposed as 'total'
)
Key characteristics:
col() function for column references
lit() function for literal values
Method chaining with expression composition
Often supports lazy evaluation
Pandas Family¶
Use string-based column access and boolean indexing.
Members: Pandas, Modin, cuDF, Dask, Vaex
# Pandas-style syntax
# Boolean indexing: keep only the rows whose 'status' equals 'active'.
filtered = df[df['status'] == 'active']
# Dictionary-based aggregation: sum 'amount' within each 'category' group.
result = filtered.groupby('category').agg({'amount': 'sum'})
Key characteristics:
String-based column names: df['column']
Boolean indexing for filtering
Dictionary-based aggregation specifications
Usually eager evaluation (except Dask)
Step 2: Implement DataFrameContext¶
Create a context class that provides table access and family-specific helpers.
Expression Family Context¶
# benchbox/core/dataframe/context.py
class MyPlatformDataFrameContext(DataFrameContext):
    """Expression-family context backed by a name-keyed table registry."""

    def __init__(self):
        # Maps each registered table name to its DataFrame object.
        self._tables: dict[str, Any] = {}

    @property
    def family(self) -> str:
        """Family identifier of this context: always ``"expression"``."""
        return "expression"

    def get_table(self, name: str) -> Any:
        """Return the DataFrame registered under ``name``.

        Raises:
            KeyError: If nothing has been registered under ``name``.
        """
        if name in self._tables:
            return self._tables[name]
        raise KeyError(f"Table '{name}' not registered")

    def register_table(self, name: str, df: Any) -> None:
        """Store ``df`` under ``name``, replacing any earlier entry for that name."""
        self._tables[name] = df

    @property
    def col(self):
        """The ``col`` callable exported by ``myplatform`` (column references)."""
        # Imported inside the property, so myplatform is only needed on access.
        from myplatform import col as column_fn

        return column_fn

    @property
    def lit(self):
        """The ``lit`` callable exported by ``myplatform`` (literal values)."""
        # Imported inside the property, so myplatform is only needed on access.
        from myplatform import lit as literal_fn

        return literal_fn
Pandas Family Context¶
class MyPandasLikeContext(DataFrameContext):
    """Pandas-family context backed by a name-keyed table registry."""

    def __init__(self):
        # Maps each registered table name to its DataFrame object.
        self._tables: dict[str, Any] = {}

    @property
    def family(self) -> str:
        """Family identifier of this context: always ``"pandas"``."""
        return "pandas"

    def get_table(self, name: str) -> Any:
        """Return a copy of the DataFrame registered under ``name``.

        Raises:
            KeyError: If nothing has been registered under ``name``.
        """
        if name in self._tables:
            stored = self._tables[name]
            # Hand out a copy rather than the stored object itself.
            return stored.copy()
        raise KeyError(f"Table '{name}' not registered")

    def register_table(self, name: str, df: Any) -> None:
        """Store ``df`` under ``name``, replacing any earlier entry for that name."""
        self._tables[name] = df

    @property
    def col(self):
        """Always ``None``: the pandas family has no column-expression helper."""
        return None

    @property
    def lit(self):
        """Always ``None``: the pandas family has no literal-expression helper."""
        return None
Step 3: Implement Platform Adapter¶
Create an adapter that handles data loading and query execution.
# benchbox/platforms/dataframe/myplatform.py
from benchbox.platforms.dataframe.base import DataFrameAdapter
class MyPlatformAdapter(DataFrameAdapter):
    """Adapter for MyPlatform DataFrame benchmarking.

    Creates contexts, loads Parquet tables into them, and runs the query
    implementation that matches this adapter's family.

    NOTE(review): this snippet only shows the ``DataFrameAdapter`` import.
    ``Any``, ``DataFrameContext``, ``DataFrameQuery`` and
    ``MyPlatformDataFrameContext`` are used in the annotations below and must
    also be imported at module level.
    """

    platform_name = "myplatform-df"
    family = "expression"  # or "pandas"

    def __init__(self, working_dir: str, **options):
        """Initialise the base adapter and keep platform-specific options.

        Args:
            working_dir: Forwarded to ``DataFrameAdapter.__init__``.
            **options: Extra keyword settings, stored on ``self.options``.
        """
        super().__init__(working_dir)
        self.options = options

    def create_context(self) -> MyPlatformDataFrameContext:
        """Create a new, empty context for query execution."""
        return MyPlatformDataFrameContext()

    def load_tables(self, ctx: DataFrameContext, data_dir: str) -> None:
        """Load every ``*.parquet`` file under ``<data_dir>/parquet`` into ``ctx``.

        Each file is registered under its stem (file name without extension).

        Args:
            ctx: Context that receives the tables via ``register_table``.
            data_dir: Directory that contains the ``parquet`` sub-directory.

        Raises:
            FileNotFoundError: If ``<data_dir>/parquet`` is not a directory.
        """
        from pathlib import Path

        import myplatform as mp

        parquet_dir = Path(data_dir) / "parquet"
        # glob() on a missing directory yields nothing, which would leave the
        # context empty and surface later as an unrelated "not registered" error.
        if not parquet_dir.is_dir():
            raise FileNotFoundError(f"Parquet directory not found: {parquet_dir}")

        # Sorted so tables are registered in the same order on every run.
        for table_file in sorted(parquet_dir.glob("*.parquet")):
            ctx.register_table(table_file.stem, mp.read_parquet(str(table_file)))

    def execute_query(self, ctx: DataFrameContext, query: DataFrameQuery) -> Any:
        """Run the implementation of ``query`` for this adapter's family.

        Args:
            ctx: Context holding the registered tables.
            query: Query whose family-specific implementation is executed.

        Returns:
            The implementation's result; if that result exposes a callable
            ``collect`` attribute, the value returned by calling it.

        Raises:
            ValueError: If ``query`` has no implementation for ``self.family``.
        """
        impl = query.get_impl_for_family(self.family)
        if impl is None:
            raise ValueError(f"No {self.family} implementation for {query.query_id}")

        result = impl(ctx)

        # Collect if lazy. Require a callable so that a result which merely has
        # an attribute named "collect" (e.g. a column) is returned untouched.
        collect = getattr(result, "collect", None)
        if callable(collect):
            result = collect()
        return result

    @staticmethod
    def is_available() -> bool:
        """Return ``True`` when ``myplatform`` can be imported, else ``False``."""
        try:
            import myplatform  # noqa: F401 -- imported only to probe availability
        except ImportError:
            return False
        return True

    @staticmethod
    def get_version() -> str | None:
        """Return ``myplatform.__version__``, or ``None`` when it is unavailable.

        ``None`` covers both a missing package and a package that does not
        define ``__version__``.
        """
        try:
            import myplatform
        except ImportError:
            return None
        return getattr(myplatform, "__version__", None)
Step 4: Register the Platform¶
Add the platform to the registry.
# benchbox/platforms/dataframe/__init__.py
from benchbox.platforms.dataframe.myplatform import MyPlatformAdapter
# Registry mapping each platform name to its DataFrame adapter class.
DATAFRAME_ADAPTERS = {
    "polars-df": PolarsDataFrameAdapter,
    "pandas-df": PandasDataFrameAdapter,
    "myplatform-df": MyPlatformAdapter,  # Add new platform
}
Step 5: Add Tests¶
Unit Tests¶
# tests/unit/platforms/test_myplatform_adapter.py
import pytest
from benchbox.platforms.dataframe.myplatform import MyPlatformAdapter
# Shared marker: skip a test when the optional myplatform package is missing.
_requires_myplatform = pytest.mark.skipif(
    not MyPlatformAdapter.is_available(),
    reason="myplatform not installed",
)


class TestMyPlatformAdapter:
    """Unit tests covering the MyPlatform DataFrame adapter."""

    def test_is_available(self):
        """The availability probe returns a bool whether or not myplatform is installed."""
        assert isinstance(MyPlatformAdapter.is_available(), bool)

    @_requires_myplatform
    def test_create_context(self, tmp_path):
        """A freshly created context reports the adapter's family."""
        context = MyPlatformAdapter(str(tmp_path)).create_context()
        assert context.family == "expression"  # or "pandas"

    @_requires_myplatform
    def test_query_execution(self, tmp_path, sample_data):
        """TPC-H Q1 runs against the sample data and yields a non-empty result."""
        adapter = MyPlatformAdapter(str(tmp_path))
        context = adapter.create_context()
        adapter.load_tables(context, sample_data)

        from benchbox.core.tpch.dataframe_queries import get_tpch_query

        rows = adapter.execute_query(context, get_tpch_query("Q1"))
        assert len(rows) > 0
Integration Tests¶
# tests/integration/test_myplatform_tpch.py
import pytest
from benchbox.platforms.dataframe.myplatform import MyPlatformAdapter
@pytest.mark.integration
@pytest.mark.skipif(
    not MyPlatformAdapter.is_available(),
    reason="myplatform not installed",
)
class TestMyPlatformTPCH:
    """Integration tests for MyPlatform TPC-H execution."""

    def test_all_tpch_queries(self, tpch_data_dir):
        """Run every TPC-H DataFrame query and check that each returns a result."""
        from benchbox.core.tpch.dataframe_queries import TPCH_DATAFRAME_QUERIES

        adapter = MyPlatformAdapter(str(tpch_data_dir))
        ctx = adapter.create_context()
        adapter.load_tables(ctx, str(tpch_data_dir))

        for query in TPCH_DATAFRAME_QUERIES.get_all_queries():
            result = adapter.execute_query(ctx, query)
            # Name the query in the failure message so a failing run is traceable.
            assert result is not None, f"{query.query_id} returned None"
Platform-Specific Considerations¶
PySpark¶
Requires Spark session management
Use spark.createDataFrame() for table registration
Handle distributed execution semantics
Modin/Ray¶
Drop-in Pandas replacement
Import modin.pandas as pd instead of pandas
Context is identical to Pandas family
cuDF (GPU)¶
Requires CUDA-enabled GPU
Memory limited to GPU VRAM
Similar to Pandas family API
Dask¶
Supports larger-than-memory datasets
Lazy evaluation like expression family
Uses Pandas-style API
Testing Checklist¶
Before submitting a PR:
[ ] Unit tests pass: pytest tests/unit/platforms/test_myplatform_adapter.py
[ ] Integration tests pass (if platform installed)
[ ] Platform availability check works correctly
[ ] Version detection works
[ ] All TPC-H queries execute successfully
[ ] Documentation updated
[ ] Example script created in examples/dataframe/