Apache DataFusion Platform Guide¶
Apache DataFusion is a fast, embeddable query engine for building high-quality data-centric systems in Rust, Python, and other languages. BenchBox’s DataFusion adapter enables in-memory OLAP benchmarking with support for both CSV and Parquet formats.
Overview¶
Type: In-memory Query Engine
Common Use Cases: In-process analytics, rapid prototyping, OLAP workload testing
Installation: pip install datafusion
Key Features¶
In-memory execution: Fast analytical queries without persistent database overhead
PostgreSQL-compatible SQL: Uses PostgreSQL dialect for broad compatibility
Dual format support: Native CSV loading or high-performance Parquet conversion
Automatic optimization: Query optimization, predicate pushdown, and partition pruning
Python-native: Direct integration with Python data ecosystem (PyArrow, Pandas)
Configurable parallelism: Multi-threaded execution with tunable partition count
Quick Start¶
Installation¶
Install DataFusion support:
pip install datafusion
Or with BenchBox:
uv add benchbox datafusion
Version Compatibility: BenchBox supports DataFusion 0.x through the latest releases. The adapter automatically handles API differences between versions:
Older versions: Uses the RuntimeEnv class
Newer versions: Uses the RuntimeEnvBuilder class
Recommendation: Install the latest version with pip install datafusion for the best performance and features
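A minimal sketch of that fallback, assuming both classes are exported from the top-level datafusion module (exact import paths are an assumption and vary across datafusion-python releases):

```python
# Minimal sketch of the version fallback described above; import paths
# are an assumption and differ across datafusion-python releases.
try:
    from datafusion import RuntimeEnvBuilder  # newer releases
    USE_BUILDER = True
except ImportError:
    from datafusion import RuntimeEnv  # older releases
    USE_BUILDER = False
```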
Basic Usage¶
from benchbox.platforms.datafusion import DataFusionAdapter
from benchbox import TPCH
# Create adapter with default settings
adapter = DataFusionAdapter(
working_dir="./datafusion_working",
memory_limit="16G",
data_format="parquet" # Columnar format with compression and predicate pushdown
)
# Run TPC-H benchmark
benchmark = TPCH(scale_factor=1.0)
results = benchmark.run_with_platform(adapter)
print(f"Completed in {results.duration_seconds:.2f}s")
print(f"Average query time: {results.average_query_time:.3f}s")
CLI Usage¶
# Run TPC-H benchmark with DataFusion
benchbox run --platform datafusion --benchmark tpch --scale 1.0 \
--datafusion-memory-limit 16G \
--datafusion-format parquet \
--datafusion-partitions 8
# Use CSV format (direct loading, lower memory)
benchbox run --platform datafusion --benchmark tpch --scale 1.0 \
--datafusion-format csv
# Specify custom working directory
benchbox run --platform datafusion --benchmark tpch --scale 1.0 \
--datafusion-working-dir /fast/ssd/datafusion
Configuration¶
Constructor Parameters¶
DataFusionAdapter(
working_dir: str = "./datafusion_working",
memory_limit: str = "16G",
target_partitions: int = None, # Defaults to CPU count
data_format: str = "parquet", # "parquet" or "csv"
temp_dir: str = None, # For disk spilling
batch_size: int = 8192, # RecordBatch size
force_recreate: bool = False, # Force rebuild existing data
)
CLI Arguments¶
| CLI Argument | Python Parameter | Type | Default | Description |
|---|---|---|---|---|
| `--datafusion-memory-limit` | `memory_limit` | str | "16G" | Memory limit (e.g., '16G', '8GB', '4096MB') |
| `--datafusion-partitions` | `target_partitions` | int | CPU count | Number of parallel partitions |
| `--datafusion-format` | `data_format` | str | "parquet" | Data format: "csv" or "parquet" |
| `--datafusion-temp-dir` | `temp_dir` | str | None | Temporary directory for disk spilling |
| `--datafusion-batch-size` | `batch_size` | int | 8192 | RecordBatch size for query execution |
| `--datafusion-working-dir` | `working_dir` | str | Auto | Working directory for tables and data |
Performance Tuning¶
Memory Configuration¶
# Conservative memory limit for constrained environments
adapter = DataFusionAdapter(memory_limit="4G")
# Aggressive memory allocation for large-scale benchmarks
adapter = DataFusionAdapter(memory_limit="64G")
# Memory limit with disk spilling
adapter = DataFusionAdapter(
memory_limit="16G",
temp_dir="/fast/nvme/temp" # Fast SSD for spilling
)
Parallelism Configuration¶
# Match CPU core count (default)
import os
adapter = DataFusionAdapter(target_partitions=os.cpu_count())
# Conservative for multi-tenant systems
adapter = DataFusionAdapter(target_partitions=4)
# Aggressive for dedicated benchmark server
adapter = DataFusionAdapter(target_partitions=32)
Batch Size Tuning¶
The batch_size parameter controls RecordBatch size for query execution:
# Default (recommended for most workloads)
adapter = DataFusionAdapter(batch_size=8192)
# Smaller batches: Lower latency, lower memory
adapter = DataFusionAdapter(
batch_size=4096,
memory_limit="4G"
)
# Use when: Memory-constrained, interactive queries, lower latency required
# Larger batches: Higher throughput, higher memory
adapter = DataFusionAdapter(
batch_size=16384,
memory_limit="32G"
)
# Use when: Batch processing, maximum throughput, ample memory available
Batch Size Guidelines:
4096: Best for interactive queries and memory-constrained environments
8192 (default): Good balance for most analytical workloads
16384: Optimal for high-throughput batch processing with sufficient RAM
Trade-off: Larger batches = higher memory usage but better vectorized execution
Data Format Selection¶
Parquet Format (Recommended):
Better query performance due to columnar format
Automatic columnar compression
Predicate pushdown optimization
Higher memory usage during conversion
adapter = DataFusionAdapter(
data_format="parquet",
working_dir="/fast/ssd/datafusion" # Fast storage for conversion
)
CSV Format:
Direct loading (no conversion step)
Lower memory footprint
Slower query execution
Good for memory-constrained environments
adapter = DataFusionAdapter(
data_format="csv",
memory_limit="4G" # Lower memory requirements
)
Configuration from Unified Config¶
For programmatic configuration, use BenchBox’s unified config pattern:
from benchbox.platforms.datafusion import DataFusionAdapter
config = {
"benchmark": "tpch",
"scale_factor": 10.0,
"memory_limit": "32G",
"partitions": 16,
"format": "parquet",
"batch_size": 16384
}
adapter = DataFusionAdapter.from_config(config)
The from_config() method automatically generates appropriate working directory paths based on benchmark name and scale factor.
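For illustration, a hypothetical equivalent of that path derivation (the exact directory layout is an assumption, not documented adapter behavior):

```python
# Hypothetical path scheme; the real from_config() layout may differ.
config = {"benchmark": "tpch", "scale_factor": 10.0}
working_dir = f"./datafusion_working/{config['benchmark']}_sf{config['scale_factor']}"
print(working_dir)  # ./datafusion_working/tpch_sf10.0
```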
Data Loading¶
DataFusion supports two data loading strategies:
CSV Mode (Direct Loading)¶
Directly registers CSV files as external tables without a conversion step (a registration sketch follows the list below):
adapter = DataFusionAdapter(data_format="csv")
Characteristics:
Fast initial load
Lower memory usage
Slower query performance
Handles TPC format (pipe-delimited with trailing delimiter)
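Under the hood this is similar to registering CSV directly with the datafusion Python API. A sketch, where the file path and table name are placeholders and the keyword names reflect recent datafusion-python releases:

```python
from datafusion import SessionContext

ctx = SessionContext()

# Register a pipe-delimited TPC .tbl file as an external table.
ctx.register_csv(
    "lineitem",
    "./data/lineitem.tbl",  # placeholder path
    has_header=False,        # TPC .tbl files have no header row
    delimiter="|",           # TPC format is pipe-delimited
    file_extension=".tbl",
)
print(ctx.sql("SELECT COUNT(*) FROM lineitem").collect())
```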
Parquet Mode (Conversion)¶
Converts CSV to Parquet format first, then registers the Parquet tables (a conversion sketch follows the list below):
adapter = DataFusionAdapter(data_format="parquet")
Characteristics:
One-time conversion overhead
Better query performance due to columnar format
Higher initial memory usage
Automatic columnar compression
Optimized for repeated query execution
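One way to approximate the conversion step with PyArrow (a sketch under assumed paths, not the adapter's exact implementation):

```python
import pyarrow.csv as pv
import pyarrow.parquet as pq

# Read a pipe-delimited TPC file; column names are auto-generated here,
# whereas the adapter would apply the benchmark schema.
table = pv.read_csv(
    "./data/lineitem.tbl",  # placeholder path
    read_options=pv.ReadOptions(autogenerate_column_names=True),
    parse_options=pv.ParseOptions(delimiter="|"),
)

# Write compressed Parquet so queries get columnar scans and pushdown.
pq.write_table(table, "./datafusion_working/lineitem.parquet", compression="snappy")
```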
Format Comparison:
| Aspect | CSV Mode | Parquet Mode |
|---|---|---|
| Initial Load | Faster (streaming) | Slower (conversion required) |
| Query Execution | Row-by-row parsing | Columnar with pushdown |
| Memory Usage | Lower | Higher |
| Disk Storage | N/A (external) | Compressed (~50-80% of CSV) |
| Best For | Quick testing | Repeated query execution |
Parquet format provides better query performance due to columnar compression and predicate pushdown optimization. The magnitude of improvement varies depending on query characteristics, data selectivity, and hardware. Run benchmarks to measure performance for your specific environment.
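A minimal harness for such a measurement, assuming the adapter APIs shown in this guide and tables already loaded in each working directory:

```python
import time

from benchbox.platforms.datafusion import DataFusionAdapter

QUERY = "SELECT COUNT(*) FROM lineitem WHERE l_shipdate > '1995-01-01'"

# Time the same query under each format; results depend on data volume,
# selectivity, and hardware, so treat this as a harness, not a verdict.
for fmt in ("csv", "parquet"):
    adapter = DataFusionAdapter(data_format=fmt)
    connection = adapter.create_connection()
    start = time.perf_counter()
    connection.sql(QUERY).collect()
    print(f"{fmt}: {time.perf_counter() - start:.3f}s")
```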
Benchmark Support¶
TPC-H¶
Full support for all TPC-H queries at all scale factors:
from benchbox import TPCH
benchmark = TPCH(scale_factor=10.0)
adapter = DataFusionAdapter(
memory_limit="32G",
data_format="parquet"
)
results = benchmark.run_with_platform(adapter)
Recommended Configuration (see the helper sketch after this list):
Scale Factor 0.1-1: 8G memory, 4-8 partitions
Scale Factor 1-10: 16G memory, 8-16 partitions
Scale Factor 10+: 32G+ memory, 16+ partitions
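These guidelines can be folded into a small convenience helper; a sketch, not part of the BenchBox API:

```python
from benchbox.platforms.datafusion import DataFusionAdapter

def adapter_for_scale(scale_factor: float) -> DataFusionAdapter:
    """Map a TPC-H scale factor to the settings recommended above."""
    if scale_factor <= 1:
        return DataFusionAdapter(memory_limit="8G", target_partitions=8)
    if scale_factor <= 10:
        return DataFusionAdapter(memory_limit="16G", target_partitions=16)
    return DataFusionAdapter(memory_limit="32G", target_partitions=16)

adapter = adapter_for_scale(10.0)
```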
TPC-DS¶
Good support for most TPC-DS queries (some complex queries may fail due to SQL feature limitations):
from benchbox import TPCDS
benchmark = TPCDS(scale_factor=1.0)
adapter = DataFusionAdapter(
memory_limit="32G",
data_format="parquet",
target_partitions=16
)
results = benchmark.run_with_platform(adapter)
Known Limitations:
Some window function edge cases
Complex correlated subqueries
Advanced SQL features (platform will validate before execution)
Other Benchmarks¶
DataFusion supports:
SSB (Star Schema Benchmark): Full support
ClickBench: Full support
H2ODB: Full support
AMPLab: Full support
Custom benchmarks: PostgreSQL-compatible SQL
Advanced Features¶
Manual Connection Management¶
For advanced use cases requiring connection reuse or custom query execution:
from benchbox.platforms.datafusion import DataFusionAdapter
# Create adapter
adapter = DataFusionAdapter(
memory_limit="16G",
data_format="parquet"
)
# Create and manage connection manually
connection = adapter.create_connection()
# Execute multiple custom queries on same connection
query1 = "SELECT COUNT(*) FROM lineitem WHERE l_shipdate > '1995-01-01'"
result1 = connection.sql(query1).collect()
query2 = "SELECT AVG(l_extendedprice) FROM lineitem"
result2 = connection.sql(query2).collect()
# Connection is automatically cleaned up when adapter is garbage collected
# or you can explicitly close if needed
When to use manual connection management:
Executing multiple custom queries without benchmark overhead
Testing individual queries during development
Building custom benchmark workflows
Integrating with existing DataFusion SessionContext
Note: benchmark.run_with_platform(adapter) handles connection lifecycle automatically and is recommended for most use cases.
Query Execution Options¶
# Execute with row count validation
result = adapter.execute_query(
connection,
query="SELECT * FROM lineitem WHERE l_shipdate > '1995-01-01'",
query_id="custom_query_1",
benchmark_type="tpch",
scale_factor=1.0,
validate_row_count=True
)
print(f"Query completed in {result['execution_time']:.3f}s")
print(f"Returned {result['rows_returned']} rows")
Dry-Run Mode¶
Preview queries without executing them:
adapter = DataFusionAdapter(dry_run_mode=True)
# Queries will be validated but not executed
results = benchmark.run_with_platform(adapter)
# Check generated SQL in results
Platform Validation¶
# Validate platform capabilities before running
validation = adapter.validate_platform_capabilities("tpch")
if validation.is_valid:
print("Platform ready for benchmarking")
else:
print("Validation errors:", validation.errors)
print("Warnings:", validation.warnings)
Performance Optimization¶
Best Practices¶
Use Parquet format for repeated query execution:
adapter = DataFusionAdapter(data_format="parquet")
Set appropriate memory limits to prevent OOM:
adapter = DataFusionAdapter(memory_limit="16G")
Match partitions to CPU cores:
import os
adapter = DataFusionAdapter(target_partitions=os.cpu_count())
Use fast storage for working directory:
adapter = DataFusionAdapter(working_dir="/fast/nvme/datafusion")
Rely on automatic query optimization (see the plan-inspection sketch below):
Predicate pushdown
Partition pruning
Join reordering
Aggregate pushdown
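To see whether these optimizations apply to a given query, inspect the plan with EXPLAIN; a sketch assuming benchmark tables are already registered on the connection:

```python
from benchbox.platforms.datafusion import DataFusionAdapter

adapter = DataFusionAdapter(data_format="parquet")
connection = adapter.create_connection()

# The optimized plan shows pushed-down filters inside the scan node.
for batch in connection.sql(
    "EXPLAIN SELECT l_orderkey FROM lineitem WHERE l_shipdate > '1995-01-01'"
).collect():
    print(batch)
```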
Performance Tuning by Scale Factor¶
Small Scale (SF < 1):
adapter = DataFusionAdapter(
memory_limit="4G",
target_partitions=4,
data_format="csv" # CSV is fine for small datasets
)
Medium Scale (SF 1-10):
adapter = DataFusionAdapter(
memory_limit="16G",
target_partitions=8,
data_format="parquet"
)
Large Scale (SF 10+):
adapter = DataFusionAdapter(
memory_limit="64G",
target_partitions=16,
data_format="parquet",
temp_dir="/fast/ssd/temp",
batch_size=16384
)
Comparison with Other Platforms¶
DataFusion vs DuckDB¶
| Feature | DataFusion | DuckDB |
|---|---|---|
| Architecture | Python bindings to Rust engine | Embedded C++ database |
| Query Performance | Excellent | Excellent |
| Data Format | In-memory tables | Persistent/in-memory |
| Python Integration | PyArrow native | Python API |
| Maturity | Growing | Mature |
| SQL Compatibility | PostgreSQL | PostgreSQL-like |
| Persistence | None (working directory only) | Native database files |
When to use DataFusion:
Testing Rust-based query engine
PyArrow-centric workflows
In-memory analytics only
Rapid prototyping
When to use DuckDB:
Production analytics workloads
Persistent database required
Mature feature set needed
File-based workflows
DataFusion vs ClickHouse¶
| Feature | DataFusion | ClickHouse |
|---|---|---|
| Deployment | Embedded | Client-server |
| Scale | Single-node in-memory | Distributed clusters |
| Setup Complexity | Minimal | Moderate |
| Query Performance | Excellent (small-medium) | Excellent (all scales) |
| Operational Overhead | None | Moderate-high |
Troubleshooting¶
Common Issues¶
Out of Memory Errors¶
Problem: Query fails with OOM error
Solution:
# Reduce memory limit or use CSV format
adapter = DataFusionAdapter(
memory_limit="8G",
data_format="csv", # Lower memory footprint
temp_dir="/large/disk/temp"
)
Slow Queries on CSV Data¶
Problem: Queries against CSV tables execute slowly
Solution:
# Use Parquet format for better query performance
adapter = DataFusionAdapter(
data_format="parquet",
working_dir="/fast/ssd/datafusion" # Use fast storage
)
Query Failures¶
Problem: Some queries fail with SQL errors
Solution:
# Validate platform capabilities first
validation = adapter.validate_platform_capabilities("tpcds")
if validation.warnings:
print("Platform warnings:", validation.warnings)
# Check query compatibility with PostgreSQL dialect
Trailing Delimiter Issues¶
Problem: TPC files with trailing pipe delimiters cause extra columns
Solution: The adapter automatically handles trailing delimiters in TPC format files. No action needed.
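For reference, a hypothetical version of that cleanup in PyArrow (the adapter's actual logic may differ; the path is a placeholder):

```python
import pyarrow.csv as pv

# A trailing '|' on every row yields one extra, entirely-null column when
# the file is parsed; detect and drop it.
table = pv.read_csv(
    "./data/lineitem.tbl",
    read_options=pv.ReadOptions(autogenerate_column_names=True),
    parse_options=pv.ParseOptions(delimiter="|"),
)
last = table.column_names[-1]
if table.column(last).null_count == table.num_rows:
    table = table.drop_columns([last])
```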
Limitations¶
DataFusion has some limitations compared to full database systems:
No persistence: Data is session-scoped (working directory contains Parquet conversions only)
No transactions: ACID properties not enforced
No constraints: PRIMARY KEY and FOREIGN KEY are informational only
SQL feature gaps: Some advanced SQL features may not be supported
Single-node: No distributed query execution
See Also¶
Platform Documentation¶
Platform Selection Guide - Choosing the right platform
Quick Reference - Multi-platform quick start
Comparison Matrix - Detailed platform comparison
DuckDB Platform Guide - Similar in-process analytics
API Reference¶
DataFusion Adapter API - Complete API documentation
Platform Base API - Platform adapter interface
Benchmark Guides¶
TPC-H Benchmark - TPC-H on DataFusion
TPC-DS Benchmark - TPC-DS on DataFusion
Benchmark Catalog - All available benchmarks
External Resources¶
Apache DataFusion Documentation - Official DataFusion docs
DataFusion Python Bindings - Python API reference
Apache Arrow - Arrow columnar format