Apache DataFusion Platform Adapter¶
The DataFusion adapter provides in-memory analytical query execution using Apache DataFusion’s fast query engine.
Overview¶
DataFusion is a fast, embeddable query engine written in Rust with Python bindings, providing:
In-memory execution - Optimized for analytical workloads
Dual format support - CSV direct loading or Parquet conversion
PostgreSQL-compatible SQL - Broad SQL dialect compatibility
PyArrow integration - Native Arrow columnar format support
Automatic optimization - Query planning and execution optimization
Common use cases:
In-process analytics without database overhead
Rapid prototyping and development
PyArrow-based data workflows
OLAP benchmark testing
Memory-constrained environments (CSV mode)
Quick Start¶
Basic usage:
from benchbox import TPCH
from benchbox.platforms.datafusion import DataFusionAdapter
# In-memory analytics with Parquet (recommended)
adapter = DataFusionAdapter(
working_dir="./datafusion_working",
memory_limit="16G",
data_format="parquet"
)
# Run benchmark
benchmark = TPCH(scale_factor=1.0)
results = benchmark.run_with_platform(adapter)
API Reference¶
DataFusionAdapter Class¶
- class DataFusionAdapter(**config)[source]¶
Bases: PlatformAdapter
Apache DataFusion platform adapter with optimized bulk loading and execution.
- property platform_name: str¶
Return the name of this database platform.
Default implementation returns the class name. Concrete adapters may override to provide a user-friendly display name. Tests may instantiate lightweight mock adapters without overriding this property.
- get_target_dialect()[source]¶
Get the target SQL dialect for DataFusion.
DataFusion uses PostgreSQL-compatible SQL dialect.
- __init__(**config)[source]¶
Initialize the platform adapter with configuration.
- Parameters:
**config – Platform-specific configuration options
- create_connection(**connection_config)[source]¶
Create DataFusion SessionContext with optimized configuration.
- create_schema(benchmark, connection)[source]¶
Create schema using DataFusion.
Note: For DataFusion, actual table creation happens during load_data() via CREATE EXTERNAL TABLE. This method validates the schema is available.
- load_data(benchmark, connection, data_dir)[source]¶
Load data into DataFusion using CSV or Parquet format.
DataFusion supports two loading modes:
1. CSV mode: Direct loading of CSV files via CREATE EXTERNAL TABLE
2. Parquet mode: Convert CSV to Parquet first for 10-50x faster query performance
- configure_for_benchmark(connection, benchmark_type)[source]¶
Apply DataFusion-specific optimizations based on benchmark type.
- execute_query(connection, query, query_id, benchmark_type=None, scale_factor=None, validate_row_count=True, stream_id=None)[source]¶
Execute query with detailed timing and result collection.
- apply_platform_optimizations(platform_config, connection)[source]¶
Apply DataFusion-specific platform optimizations.
DataFusion optimizations are primarily configured at SessionContext creation. This method logs the optimization configuration.
- apply_constraint_configuration(primary_key_config, foreign_key_config, connection)[source]¶
Apply constraint configuration.
Note: DataFusion does not enforce PRIMARY KEY or FOREIGN KEY constraints. This method logs the configuration but does not apply constraints.
Constructor Parameters¶
DataFusionAdapter(
working_dir: str = "./datafusion_working",
memory_limit: str = "16G",
target_partitions: Optional[int] = None,
data_format: str = "parquet",
temp_dir: Optional[str] = None,
batch_size: int = 8192,
force_recreate: bool = False,
tuning_config: Optional[Dict[str, Any]] = None,
verbose_enabled: bool = False,
very_verbose: bool = False
)
Parameters:
working_dir (str): Working directory for DataFusion tables and Parquet data. Default: “./datafusion_working”
memory_limit (str): Maximum memory usage (e.g., “16G”, “8GB”, “4096MB”). Default: “16G”
target_partitions (int, optional): Number of parallel partitions. Default: CPU count
data_format (str): Data format: “parquet” (recommended) or “csv”. Default: “parquet”
temp_dir (str, optional): Temporary directory for disk spilling. Default: None
batch_size (int): RecordBatch size for query execution. Default: 8192
force_recreate (bool): Force recreate existing data. Default: False
tuning_config (dict, optional): Additional tuning configuration
verbose_enabled (bool): Enable verbose logging. Default: False
very_verbose (bool): Enable very verbose logging. Default: False
Configuration Examples¶
Basic Configuration¶
In-memory analytics with default settings:
from benchbox.platforms.datafusion import DataFusionAdapter
# Default configuration (Parquet format, 16G memory)
adapter = DataFusionAdapter()
# Custom working directory
adapter = DataFusionAdapter(
working_dir="/fast/ssd/datafusion"
)
Performance Optimized¶
Optimized for high-performance benchmarks:
import os
adapter = DataFusionAdapter(
working_dir="/fast/nvme/datafusion",
memory_limit="64G",
target_partitions=os.cpu_count(), # Use all cores
data_format="parquet", # Columnar format with compression
batch_size=16384, # Larger batches for throughput
temp_dir="/fast/ssd/temp"
)
Memory Constrained¶
Optimized for memory-limited environments:
adapter = DataFusionAdapter(
memory_limit="4G",
target_partitions=4,
data_format="csv", # Lower memory footprint
batch_size=4096
)
Data Format Selection¶
Choose between CSV and Parquet formats:
# Parquet format (recommended for query performance)
adapter_parquet = DataFusionAdapter(
data_format="parquet",
memory_limit="16G"
)
# CSV format (faster initial load, lower memory)
adapter_csv = DataFusionAdapter(
data_format="csv",
memory_limit="8G"
)
Configuration from Unified Config¶
Create adapter from BenchBox’s unified configuration dictionary:
from benchbox.platforms.datafusion import DataFusionAdapter
config = {
"benchmark": "tpch",
"scale_factor": 10.0,
"output_dir": "/data/benchmarks",
"memory_limit": "32G",
"partitions": 16,
"format": "parquet",
"batch_size": 16384,
"force": False
}
adapter = DataFusionAdapter.from_config(config)
Configuration Keys:
benchmark (str): Benchmark name (e.g., “tpch”, “tpcds”)
scale_factor (float): Benchmark scale factor
output_dir (str, optional): Output directory for benchmark data
memory_limit (str): Memory limit (e.g., “16G”, “32G”)
partitions (int, optional): Number of parallel partitions
format (str): Data format (“csv” or “parquet”)
batch_size (int): RecordBatch size
temp_dir (str, optional): Temporary directory for disk spilling
force (bool): Force recreate existing data
working_dir (str, optional): Explicit working directory path
The from_config() method automatically generates appropriate paths based on benchmark name and scale factor when working_dir is not explicitly provided.
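For example, supplying working_dir in the config dictionary bypasses the automatic path generation (the directory value below is illustrative):
# Explicit working_dir is used verbatim instead of an auto-generated path
config_explicit = {**config, "working_dir": "/fast/nvme/datafusion_tpch_sf10"}
adapter_explicit = DataFusionAdapter.from_config(config_explicit)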
Data Loading¶
DataFusion supports two data loading strategies:
CSV Mode (Direct Loading)¶
Directly registers CSV files as external tables:
adapter = DataFusionAdapter(data_format="csv")
# Automatically handles TPC format:
# - Pipe-delimited (|)
# - Trailing delimiter
# - No header row
Characteristics:
Fast initial load (seconds)
Lower memory usage
Slower query execution
Good for one-time queries or memory-constrained environments
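Behind this mode, each pipe-delimited file is registered directly with the DataFusion SessionContext. A minimal sketch of the equivalent manual registration using the datafusion Python package (the path and the has_header/delimiter/file_extension keyword arguments are assumptions that may vary between datafusion-python versions):
from datafusion import SessionContext

ctx = SessionContext()
# Register a TPC-H table from its pipe-delimited .tbl file (illustrative path).
# Note: the trailing "|" on each row yields one extra empty column here;
# the adapter handles that automatically.
ctx.register_csv(
    "lineitem",
    "./datafusion_working/lineitem.tbl",
    has_header=False,
    delimiter="|",
    file_extension=".tbl",
)
print(ctx.sql("SELECT COUNT(*) FROM lineitem").collect())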
Parquet Mode (Conversion)¶
Converts CSV to Parquet format first:
adapter = DataFusionAdapter(data_format="parquet")
# Conversion process:
# 1. Read CSV files with PyArrow
# 2. Handle trailing delimiters
# 3. Apply schema from benchmark
# 4. Write compressed Parquet files
# 5. Register Parquet tables in DataFusion
Characteristics:
One-time conversion overhead (30-60 seconds for SF=1)
Better query performance due to columnar format
Automatic columnar compression (~50-80% size reduction)
Suited for repeated query execution
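The adapter performs this conversion internally; the following is a simplified per-table sketch using PyArrow directly (file names, column names, and the compression codec are illustrative, not the adapter's exact implementation):
import pyarrow.csv as pacsv
import pyarrow.parquet as pq

# Read a pipe-delimited .tbl file; auto-generated names avoid assuming a header row
table = pacsv.read_csv(
    "region.tbl",
    read_options=pacsv.ReadOptions(autogenerate_column_names=True),
    parse_options=pacsv.ParseOptions(delimiter="|"),
)
# TPC .tbl rows end with a trailing "|", which produces one extra empty column;
# drop it, then apply the benchmark's column names
table = table.remove_column(table.num_columns - 1)
table = table.rename_columns(["r_regionkey", "r_name", "r_comment"])

# Write compressed Parquet for columnar scans
pq.write_table(table, "region.parquet", compression="zstd")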
Performance Comparison¶
# CSV Mode
adapter_csv = DataFusionAdapter(data_format="csv")
# Load time: ~5 seconds (SF=1)
# Query time: Baseline
# Parquet Mode
adapter_parquet = DataFusionAdapter(data_format="parquet")
# Load time: ~30 seconds (SF=1)
# Query time: Faster than CSV (varies by query)
Query Execution¶
Execute Queries¶
Execute SQL queries directly:
from benchbox.platforms.datafusion import DataFusionAdapter
adapter = DataFusionAdapter()
connection = adapter.create_connection()
# Execute query using SessionContext
df = connection.sql("SELECT COUNT(*) FROM lineitem")
result_batches = df.collect()
# Get row count
row_count = result_batches[0].column(0)[0]
print(f"Row count: {row_count}")
Execute with Validation¶
Execute queries with automatic row count validation:
result = adapter.execute_query(
connection,
query="SELECT * FROM lineitem WHERE l_shipdate > '1995-01-01'",
query_id="q1",
benchmark_type="tpch",
scale_factor=1.0,
validate_row_count=True
)
print(f"Status: {result['status']}")
print(f"Execution time: {result['execution_time']:.3f}s")
print(f"Rows returned: {result['rows_returned']}")
if result['validation_result']:
    print(f"Expected rows: {result['validation_result'].expected_row_count}")
Dry-Run Mode¶
Preview queries without execution:
from benchbox import TPCH
adapter = DataFusionAdapter(dry_run_mode=True)
# Queries will be validated but not executed
benchmark = TPCH(scale_factor=1.0)
results = benchmark.run_with_platform(adapter)
# Access captured SQL
for query_id, sql in adapter.captured_sql.items():
    print(f"{query_id}: {sql[:100]}...")
Platform Information¶
Get Platform Details¶
Retrieve DataFusion version and configuration:
adapter = DataFusionAdapter(memory_limit="16G")
connection = adapter.create_connection()
info = adapter.get_platform_info(connection)
print(f"Platform: {info['platform_name']}")
print(f"Version: {info['platform_version']}")
print(f"Memory limit: {info['configuration']['memory_limit']}")
print(f"Partitions: {info['configuration']['target_partitions']}")
print(f"Data format: {info['configuration']['data_format']}")
Validate Capabilities¶
Check platform capabilities before running benchmarks:
validation = adapter.validate_platform_capabilities("tpch")
if validation.is_valid:
    print("Platform ready for TPC-H benchmark")
else:
    print("Validation errors:")
    for error in validation.errors:
        print(f" - {error}")
if validation.warnings:
    print("Warnings:")
    for warning in validation.warnings:
        print(f" - {warning}")
# Access platform details
print(f"DataFusion version: {validation.details.get('datafusion_version')}")
Advanced Features¶
Custom Configuration¶
Configure DataFusion SessionContext options:
# The adapter automatically configures:
# - Target partitions (parallelism)
# - Memory limits
# - Parquet optimizations (pruning, pushdown)
# - Identifier normalization (lowercase for TPC compatibility)
# - Batch size
adapter = DataFusionAdapter(
memory_limit="32G",
target_partitions=16,
batch_size=16384
)
Working Directory Management¶
Manage DataFusion working directory:
# Check if working directory exists with data
exists = adapter.check_database_exists()
if exists:
    print("Existing DataFusion data found")
# Drop existing data if needed
adapter.drop_database()
# Or force recreate
adapter = DataFusionAdapter(force_recreate=True)
PyArrow Integration¶
DataFusion uses PyArrow for data representation:
import pyarrow as pa
import pyarrow.parquet as pq
adapter = DataFusionAdapter(data_format="parquet")
connection = adapter.create_connection()
# Query results are PyArrow RecordBatches
df = connection.sql("SELECT * FROM lineitem LIMIT 10")
batches = df.collect()
# Access as PyArrow Table
table = pa.Table.from_batches(batches)
print(f"Schema: {table.schema}")
print(f"Rows: {table.num_rows}")
# Convert to Pandas
pandas_df = table.to_pandas()
Manual Connection Management¶
For advanced use cases requiring connection reuse:
from benchbox.platforms.datafusion import DataFusionAdapter
adapter = DataFusionAdapter(memory_limit="16G", data_format="parquet")
connection = adapter.create_connection()
# Execute multiple custom queries
result1 = connection.sql("SELECT COUNT(*) FROM lineitem").collect()
result2 = connection.sql("SELECT AVG(l_extendedprice) FROM lineitem").collect()
When to use:
Executing multiple custom queries without benchmark overhead
Testing individual queries during development
Building custom benchmark workflows
Integrating with existing DataFusion SessionContext
Note: benchmark.run_with_platform(adapter) handles connection lifecycle automatically and is recommended for most use cases.
Best Practices¶
Memory Management¶
Set appropriate memory limits for your system:
import psutil
available_memory = psutil.virtual_memory().available
memory_limit = f"{int(available_memory * 0.7 / 1024**3)}G"
adapter = DataFusionAdapter(memory_limit=memory_limit)
Use CSV format for memory-constrained environments:
adapter = DataFusionAdapter(
    data_format="csv",
    memory_limit="4G"
)
Configure temp directory for disk spilling:
adapter = DataFusionAdapter(
    memory_limit="16G",
    temp_dir="/fast/ssd/temp"
)
Performance Optimization¶
Use Parquet format for repeated query execution:
adapter = DataFusionAdapter(data_format="parquet")
Match partitions to CPU cores:
import os
adapter = DataFusionAdapter(
    target_partitions=os.cpu_count()
)
Use fast storage for working directory:
adapter = DataFusionAdapter(
    working_dir="/fast/nvme/datafusion",
    data_format="parquet"
)
Tune batch size for your workload:
# Smaller batches: Lower latency, lower memory
adapter = DataFusionAdapter(
    batch_size=4096,
    memory_limit="4G"
)
# Larger batches: Higher throughput, higher memory
adapter = DataFusionAdapter(
    batch_size=16384,
    memory_limit="32G"
)
Batch Size Guidelines:
4096: Best for interactive queries and memory-constrained environments
8192 (default): Good balance for most analytical workloads
16384: Optimal for high-throughput batch processing with sufficient RAM
Trade-off: Larger batches = higher memory usage but better vectorized execution
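As a rough back-of-envelope check, in-flight memory per batch scales linearly with batch size (the bytes-per-row figure below is an assumed average for illustration, not a measured value):
batch_size = 16384
approx_row_bytes = 150                          # assumed average row width, varies by table
per_batch_mb = batch_size * approx_row_bytes / 1024**2
print(f"~{per_batch_mb:.1f} MB per in-flight batch")   # ~2.3 MB under these assumptions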
Scale Factor Recommendations¶
Small Scale (SF < 1):
adapter = DataFusionAdapter(
memory_limit="4G",
target_partitions=4,
data_format="csv"
)
Medium Scale (SF 1-10):
adapter = DataFusionAdapter(
memory_limit="16G",
target_partitions=8,
data_format="parquet"
)
Large Scale (SF 10+):
adapter = DataFusionAdapter(
memory_limit="64G",
target_partitions=16,
data_format="parquet",
temp_dir="/fast/ssd/temp",
batch_size=16384
)
Common Issues¶
Out of Memory Errors¶
Problem: Query fails with out of memory error
Solution:
# Reduce memory limit or use CSV format
adapter = DataFusionAdapter(
memory_limit="8G",
data_format="csv"
)
# Or enable disk spilling
adapter = DataFusionAdapter(
memory_limit="8G",
temp_dir="/large/disk/temp"
)
Slow Query Performance¶
Problem: Queries execute slowly
Solutions:
# 1. Use Parquet format
adapter = DataFusionAdapter(data_format="parquet")
# 2. Increase parallelism
adapter = DataFusionAdapter(target_partitions=16)
# 3. Use fast storage
adapter = DataFusionAdapter(
working_dir="/fast/nvme/datafusion"
)
SQL Feature Errors¶
Problem: Some queries fail with SQL errors
Solution:
# Validate platform capabilities first
validation = adapter.validate_platform_capabilities("tpcds")
if validation.warnings:
    print("Platform warnings:")
    for warning in validation.warnings:
        print(f" - {warning}")
# DataFusion uses PostgreSQL dialect
# Some advanced SQL features may not be supported
See Also¶
Platform Documentation¶
Apache DataFusion Platform Guide - Comprehensive DataFusion platform guide
Platform Selection Guide - Platform selection guide
Platform Comparison Matrix - Platform comparison
DuckDB Platform Adapter - Similar in-process analytics platform
Benchmark Guides¶
TPC-H Benchmark - TPC-H benchmark
TPC-DS Benchmark - TPC-DS benchmark
Benchmarks - All benchmarks
API Reference¶
Base Benchmark API - Base platform adapter interface
index - Python API overview
External Resources¶
Apache DataFusion Documentation - Official docs
DataFusion Python Bindings - Python API
Apache Arrow - Arrow columnar format