# Benchmark APIs

Complete reference for using benchmark classes programmatically in Python.

## Overview

BenchBox provides Python APIs for all supported benchmarks, allowing you to:

- Instantiate benchmarks with custom scale factors
- Generate data programmatically
- Access benchmark queries with dialect translation
- Run benchmarks on any supported platform
- Customize execution parameters

All benchmark classes extend the Base Benchmark API and share a common interface, sketched below.
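As a rough sketch of that shared surface (method names are inferred from the examples on this page, not the literal base-class definition):

```python
# Hypothetical sketch of the interface shared by all benchmark classes,
# inferred from the examples below -- not the actual base-class source.
class BaseBenchmarkLike:
    def generate_data(self): ...                       # materialize the dataset
    def get_queries(self): ...                         # all benchmark queries
    def get_query(self, query_id, dialect=None): ...   # one query, optionally translated
    def get_schema(self): ...                          # table/column metadata
    def run_with_platform(self, adapter): ...          # execute on a platform adapter
```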
## Available Benchmarks

### Standard Benchmarks

| Benchmark | Description | Query Count |
|---|---|---|
| TPC-H | TPC-H analytical queries | 22 queries |
| TPC-DS | TPC-DS decision support queries | 99 queries |
| TPC-DI | Data integration benchmark | ETL pipelines |
| SSB | Star Schema Benchmark | 13 queries |
| ClickBench | Real-world analytics | 43 queries |
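This page only demonstrates the TPC-H import path; as a hedged sketch, the other benchmarks presumably follow the same pattern (the commented module paths below are assumptions, so verify them against your installation):

```python
from benchbox.tpch import TPCH  # the one import path shown on this page

# The imports below follow the same naming pattern but are assumptions;
# check your installation for the real module paths.
# from benchbox.tpcds import TPCDS
# from benchbox.ssb import SSB

tpch = TPCH(scale_factor=1.0)
print(f"TPC-H ships {len(tpch.get_queries())} queries")  # 22, per the table above
```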
## Quick Start

### Basic Usage

```python
from benchbox.tpch import TPCH
from benchbox.platforms.duckdb import DuckDBAdapter

# Create benchmark
benchmark = TPCH(scale_factor=1.0)

# Generate data
data_files = benchmark.generate_data()

# Get queries
queries = benchmark.get_queries()

# Run on platform
adapter = DuckDBAdapter()
results = benchmark.run_with_platform(adapter)
```
## Common Patterns

### Data Generation

```python
from benchbox.tpch import TPCH
from pathlib import Path

# Generate data to a specific directory
benchmark = TPCH(scale_factor=0.1, output_dir="./data")
data_files = benchmark.generate_data()

# Check generated files
for table_name, file_path in benchmark.tables.items():
    size_mb = Path(file_path).stat().st_size / 1024 / 1024
    print(f"{table_name}: {size_mb:.2f} MB")
```
### Query Access

```python
# Get all queries
queries = benchmark.get_queries()
print(f"Total queries: {len(queries)}")

# Get a specific query
q1 = benchmark.get_query(1)
print(q1)

# Get a query translated to another SQL dialect
q1_bq = benchmark.get_query(1, dialect="bigquery")
q1_sf = benchmark.get_query(1, dialect="snowflake")
```
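Dialect translation pairs naturally with exporting queries for offline inspection. A minimal sketch, assuming query IDs are the integers 1..N as in the examples above (the output directory is illustrative):

```python
from pathlib import Path

out_dir = Path("./queries/bigquery")  # illustrative output location
out_dir.mkdir(parents=True, exist_ok=True)

# Assumes query IDs run 1..N, as in the examples above
for qid in range(1, len(benchmark.get_queries()) + 1):
    sql = benchmark.get_query(qid, dialect="bigquery")
    (out_dir / f"q{qid:02d}.sql").write_text(sql)
```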
### Platform Execution

```python
from benchbox.tpch import TPCH
from benchbox.platforms.duckdb import DuckDBAdapter
from benchbox.platforms.clickhouse import ClickHouseAdapter

benchmark = TPCH(scale_factor=1.0)

# Run on DuckDB
duckdb_adapter = DuckDBAdapter()
duckdb_results = benchmark.run_with_platform(duckdb_adapter)

# Run on ClickHouse
clickhouse_adapter = ClickHouseAdapter(host="localhost")
clickhouse_results = benchmark.run_with_platform(clickhouse_adapter)

# Compare results
print(f"DuckDB: {duckdb_results.total_execution_time:.2f}s")
print(f"ClickHouse: {clickhouse_results.total_execution_time:.2f}s")
```
### Custom Scale Factors

```python
# Small for development
small_bench = TPCH(scale_factor=0.01)  # ~10 MB

# Standard for testing
standard_bench = TPCH(scale_factor=1.0)  # ~1 GB

# Large for production
large_bench = TPCH(scale_factor=100.0)  # ~100 GB
```
### Result Analysis

```python
from benchbox.tpch import TPCH
from benchbox.platforms.duckdb import DuckDBAdapter

benchmark = TPCH(scale_factor=1.0)
adapter = DuckDBAdapter()
results = benchmark.run_with_platform(adapter)

# Overall statistics
print(f"Benchmark: {results.benchmark_name}")
print(f"Platform: {results.platform}")
print(f"Total time: {results.total_execution_time:.2f}s")
print(f"Average query time: {results.average_query_time:.3f}s")
print(f"Success rate: {results.successful_queries}/{results.total_queries}")

# Query-level results
for qr in results.query_results:
    if qr.status == "SUCCESS":
        print(f"{qr.query_id}: {qr.execution_time:.3f}s ({qr.row_count} rows)")
    else:
        print(f"{qr.query_id}: FAILED - {qr.error_message}")

# Save results
results.to_json_file("benchmark_results.json")
```
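Saved results can be loaded back for later comparison via `BenchmarkResults.from_json_file`, which the baseline check under Best Practices also uses. Note that the import path in this sketch is an assumption; adjust it to your installation:

```python
# Import path is an assumption; BenchmarkResults.from_json_file appears
# in the baseline-comparison example under Best Practices below.
from benchbox.results import BenchmarkResults

saved = BenchmarkResults.from_json_file("benchmark_results.json")
print(f"Reloaded {saved.total_queries} query results from disk")
```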
## Advanced Usage

### Query Streams

```python
from benchbox.tpch import TPCH

benchmark = TPCH(scale_factor=1.0)

# Generate multiple query streams for throughput testing
streams = benchmark.generate_streams(
    num_streams=5,
    rng_seed=42,
    streams_output_dir="./streams",
)

# Get stream information
for stream_id in range(1, 6):
    info = benchmark.get_stream_info(stream_id)
    print(f"Stream {stream_id}: {len(info['queries'])} queries")
```
### Custom Parameters

```python
# Generate a query with custom parameters
q1_custom = benchmark.get_query(
    query_id=1,
    seed=42,            # Deterministic parameter substitution
    scale_factor=10.0,  # Override the benchmark's scale factor
)
```
### Schema Access

```python
# Get schema information
schema = benchmark.get_schema()
for table in schema:
    print(f"Table: {table['name']}")
    print(f"  Columns: {len(table['columns'])}")
    for col in table['columns']:
        print(f"    {col['name']}: {col['type']}")

# Get CREATE TABLE SQL
create_sql = benchmark.get_create_tables_sql(dialect="duckdb")
print(create_sql)
```
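The generated DDL can be applied directly with the `duckdb` client library. A minimal sketch, assuming the statements in `create_sql` are semicolon-separated (the database filename is illustrative):

```python
import duckdb

con = duckdb.connect("tpch.duckdb")  # illustrative database file

# Apply each CREATE TABLE statement; assumes the generated SQL
# separates statements with semicolons.
for stmt in create_sql.split(";"):
    if stmt.strip():
        con.execute(stmt)
```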
## Error Handling

```python
from benchbox.tpch import TPCH
from benchbox.platforms.duckdb import DuckDBAdapter

try:
    # Invalid scale factor
    benchmark = TPCH(scale_factor=-1.0)
except ValueError as e:
    print(f"Error: {e}")

try:
    # Invalid query ID
    benchmark = TPCH(scale_factor=1.0)
    query = benchmark.get_query(999)
except ValueError as e:
    print(f"Error: {e}")

try:
    # Platform connection error
    adapter = DuckDBAdapter(database_path="/invalid/path/db.duckdb")
    results = benchmark.run_with_platform(adapter)
except Exception as e:
    print(f"Platform error: {e}")
```
## Best Practices

### Scale Factor Selection

Development and CI: use SF=0.01 to 0.1.

```python
# Fast iteration
dev_benchmark = TPCH(scale_factor=0.01)  # ~10 MB
```

Testing and validation: use SF=1.0.

```python
# Standard reference size
test_benchmark = TPCH(scale_factor=1.0)  # ~1 GB
```

Performance benchmarking: use SF=10 to 100.

```python
# Production-like workload
perf_benchmark = TPCH(scale_factor=10.0)  # ~10 GB
```
### Data Management

Reuse generated data:

```python
# Generate once
benchmark = TPCH(scale_factor=1.0, output_dir="./data/tpch_sf1")
benchmark.generate_data()

# Reuse across runs
benchmark2 = TPCH(scale_factor=1.0, output_dir="./data/tpch_sf1")
# Data already exists, so no regeneration is needed
```

Clean up after testing:

```python
import shutil
from pathlib import Path

# Clean up generated data
data_dir = Path("./data/tpch_sf1")
if data_dir.exists():
    shutil.rmtree(data_dir)
```
### Result Validation

Check row counts:

```python
# Expected base-table row counts for TPC-H SF=1
expected_rows = {
    "LINEITEM": 6001215,
    "ORDERS": 1500000,
    "CUSTOMER": 150000,
    "PART": 200000,
    "SUPPLIER": 10000,
    "PARTSUPP": 800000,
    "NATION": 25,
    "REGION": 5,
}

# Entries match only when a result's query_id is keyed by table name;
# other query results are skipped by the membership check.
for qr in results.query_results:
    if qr.query_id in expected_rows:
        assert qr.row_count == expected_rows[qr.query_id]
```

Verify query results:

```python
# Compare with a baseline run
baseline = BenchmarkResults.from_json_file("baseline.json")
current = results

for qr_current in current.query_results:
    qr_baseline = next(
        (qr for qr in baseline.query_results if qr.query_id == qr_current.query_id),
        None,
    )
    if qr_baseline:
        assert qr_current.row_count == qr_baseline.row_count
```
## See Also

### Conceptual Documentation

- BenchBox Architecture - System architecture overview
- Benchmarking Workflows - Common benchmarking workflows
- Data Model - Result data structures

### API References

- Base Benchmark API - Base benchmark interface
- Results API - Result objects and serialization
- Python API Reference - Python API overview

### Platform Adapters

- DuckDB Platform Adapter - DuckDB adapter
- ClickHouse Platform Adapter - ClickHouse adapter
- Databricks Platform Adapter - Databricks adapter
- BigQuery Platform Adapter - BigQuery adapter

### Benchmark Guides

- TPC-H Benchmark - TPC-H guide
- TPC-DS Benchmark - TPC-DS guide
- ClickBench (ClickHouse Analytics Benchmark) - ClickBench guide
- /benchmarks/README - Benchmark catalog

### External Resources

- TPC Official Website - TPC benchmark specifications
- TPC-H Specification - Official TPC-H spec
- TPC-DS Specification - Official TPC-DS spec