TPC-H Benchmark API¶
Complete Python API reference for the TPC-H benchmark.
Overview¶
The TPC-H benchmark simulates a decision support system with 22 analytical queries. It models a wholesale supplier with parts, suppliers, customers, and orders.
Key Features:
22 analytical queries covering complex business scenarios
8 tables with realistic relationships
Parametrized queries for randomization
Official TPC-H specification compliance
Multiple scale factors (SF=0.01 to 30000+)
Quick Start¶
from benchbox.tpch import TPCH
from benchbox.platforms.duckdb import DuckDBAdapter
# Create benchmark
benchmark = TPCH(scale_factor=1.0)
# Generate data
benchmark.generate_data()
# Run on platform
adapter = DuckDBAdapter()
results = benchmark.run_with_platform(adapter)
print(f"Completed in {results.total_execution_time:.2f}s")
API Reference¶
TPCH Class¶
- class TPCH(scale_factor=1.0, output_dir=None, **kwargs)[source]¶
Bases: BaseBenchmark
TPC-H benchmark implementation.
Provides data generation and access to the 22 benchmark queries.
Official specification: http://www.tpc.org/tpch
- __init__(scale_factor=1.0, output_dir=None, **kwargs)[source]¶
Initialize TPC-H benchmark instance.
- Parameters:
scale_factor (float) – Scale factor for the benchmark (1.0 = ~1GB)
output_dir (str | Path | None) – Directory to output generated data files
**kwargs (Any) – Additional implementation-specific options
- Raises:
ValueError – If scale_factor is not positive
TypeError – If scale_factor is not a number
- generate_data()[source]¶
Generate TPC-H benchmark data.
- Returns:
A list of paths to the generated data files
- Return type:
list[str | Path]
- get_queries(dialect=None, base_dialect=None)[source]¶
Get all TPC-H benchmark queries.
- Parameters:
dialect (str | None) – Target SQL dialect for translation (e.g., ‘duckdb’, ‘bigquery’, ‘snowflake’). If None, returns queries in their original format.
base_dialect (str | None) – Source SQL dialect for translation
- Returns:
A dictionary mapping query IDs (1-22) to query strings
- Return type:
dict[str, str]
- get_query(query_id, *, params=None, seed=None, scale_factor=None, dialect=None, base_dialect=None, **kwargs)[source]¶
Get a specific TPC-H benchmark query.
- Parameters:
query_id (int) – The ID of the query to retrieve (1-22)
params (dict[str, Any] | None) – Optional parameters to customize the query (legacy parameter, mostly ignored)
seed (int | None) – Random number generator seed for parameter generation
scale_factor (float | None) – Scale factor for parameter calculations
dialect (str | None) – Target SQL dialect
base_dialect (str | None) – Source SQL dialect (default: netezza)
**kwargs – Additional parameters
- Returns:
The query string
- Raises:
ValueError – If the query_id is invalid
TypeError – If query_id is not an integer
- Return type:
str
- get_schema()[source]¶
Get the TPC-H schema.
- Returns:
A list of dictionaries describing the tables in the schema
- Return type:
list[dict]
- get_create_tables_sql(dialect='standard', tuning_config=None)[source]¶
Get SQL to create all TPC-H tables.
- Parameters:
dialect (str) – SQL dialect to use (currently ignored, TPC-H uses standard SQL)
tuning_config – Unified tuning configuration for constraint settings
- Returns:
SQL script for creating all tables
- Return type:
str
- generate_streams(num_streams=1, rng_seed=None, streams_output_dir=None)[source]¶
Generate TPC-H query streams.
- Parameters:
num_streams (int) – Number of concurrent streams to generate
rng_seed (int | None) – Random number generator seed for parameter generation
streams_output_dir (str | Path | None) – Directory to output stream files
- Returns:
List of paths to generated stream files
- Return type:
list[Path]
- get_stream_info(stream_id)[source]¶
Get information about a specific stream.
- Parameters:
stream_id (int) – Stream identifier
- Returns:
Dictionary containing stream information
- Return type:
dict[str, Any]
- get_all_streams_info()[source]¶
Get information about all streams.
- Returns:
List of dictionaries containing stream information
- Return type:
list[dict[str, Any]]
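Example (a minimal sketch showing generate_streams() combined with the stream-info accessors; assumes benchmark is a TPCH instance and that stream IDs are 0-based):
# Generate two reproducible query streams, then inspect them
streams = benchmark.generate_streams(num_streams=2, rng_seed=42)
first = benchmark.get_stream_info(0)
print(first)
for info in benchmark.get_all_streams_info():
    print(info)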
- property tables: dict[str, Path]¶
Get the mapping of table names to data file paths.
- Returns:
Dictionary mapping table names to paths of generated data files
- run_official_benchmark(connection_factory, config=None)[source]¶
Run the official TPC-H benchmark.
This method provides compatibility for official benchmark examples.
- Parameters:
connection_factory – Factory function or connection object
config – Optional configuration parameters
- Returns:
Dictionary with benchmark results
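Example (a minimal sketch; assumes benchmark is a TPCH instance with generated data and that a plain DuckDB connection satisfies the expected connection interface):
import duckdb

def connection_factory():
    # Hypothetical factory: each call returns a fresh in-memory DuckDB connection
    return duckdb.connect(":memory:")

results = benchmark.run_official_benchmark(connection_factory)
print(results)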
- run_power_test(connection_factory, config=None)[source]¶
Run the TPC-H power test.
This method provides compatibility for power test examples.
- Parameters:
connection_factory – Factory function or connection object
config – Optional configuration parameters
- Returns:
Dictionary with power test results
- run_maintenance_test(connection_factory, config=None)[source]¶
Run the TPC-H maintenance test.
This method provides compatibility for maintenance test examples.
- Parameters:
connection_factory – Factory function or connection object
config – Optional configuration parameters
- Returns:
Dictionary with maintenance test results
- apply_verbosity(settings)¶
Apply verbosity settings to the mixin consumer.
- property benchmark_name: str¶
Get the human-readable benchmark name.
- create_enhanced_benchmark_result(platform, query_results, execution_metadata=None, phases=None, resource_utilization=None, performance_characteristics=None, **kwargs)¶
Create a BenchmarkResults object with standardized fields.
This centralizes the logic for creating benchmark results that was previously duplicated across platform adapters and the CLI orchestrator.
- Parameters:
platform (str) – Platform name (e.g., “DuckDB”, “ClickHouse”)
query_results (list[dict[str, Any]]) – List of query execution results
execution_metadata (dict[str, Any] | None) – Optional execution metadata
phases (dict[str, dict[str, Any]] | None) – Optional phase tracking information
resource_utilization (dict[str, Any] | None) – Optional resource usage metrics
performance_characteristics (dict[str, Any] | None) – Optional performance analysis
**kwargs (Any) – Additional fields to override defaults
- Returns:
Fully configured BenchmarkResults object
- Return type:
BenchmarkResults
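Example (a minimal sketch of how an adapter or orchestrator might assemble results; the platform name and query_results entries are illustrative, mirroring the run_query() result keys):
query_results = [
    {"query_id": 1, "execution_time": 0.42, "row_count": 4},
    {"query_id": 6, "execution_time": 0.08, "row_count": 1},
]
result = benchmark.create_enhanced_benchmark_result(
    platform="DuckDB",
    query_results=query_results,
    execution_metadata={"scale_factor": 1.0},
)
print(result)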
- format_results(benchmark_result)¶
Format benchmark results for display.
- Parameters:
benchmark_result (dict[str, Any]) – Result dictionary from run_benchmark()
- Returns:
Formatted string representation of the results
- Return type:
str
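Example (a minimal sketch; assumes connection is a DatabaseConnection obtained from a platform adapter):
raw = benchmark.run_benchmark(connection)
print(benchmark.format_results(raw))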
- get_data_source_benchmark()¶
Return the canonical source benchmark when data is shared.
Benchmarks that reuse data generated by another benchmark (for example, Primitives reusing TPC-H datasets) should override this method and return the lower-case identifier of the source benchmark. Benchmarks that produce their own data should return None (default).
- log_debug_info(context='Debug')¶
Log comprehensive debug information including version details.
- log_error_with_debug_info(error, context='Error')¶
Log an error with comprehensive debug information.
- log_operation_complete(operation, duration=None, details='')¶
- log_operation_start(operation, details='')¶
- log_verbose(message)¶
- log_version_warning()¶
Log version consistency warnings if any exist.
- log_very_verbose(message)¶
- property logger: Logger¶
Return the logger configured for the verbosity mixin consumer.
- quiet: bool = False¶
- run_benchmark(connection, query_ids=None, fetch_results=False, setup_database=True)¶
Run the complete benchmark suite.
- Parameters:
connection (DatabaseConnection) – Database connection to execute queries on
query_ids (list[int | str] | None) – Optional list of specific query IDs to run (defaults to all)
fetch_results (bool) – Whether to fetch and return query results
setup_database (bool) – Whether to set up the database first
- Returns:
A dictionary containing:
benchmark_name: Name of the benchmark
total_execution_time: Total time for all queries
total_queries: Number of queries executed
successful_queries: Number of queries that succeeded
failed_queries: Number of queries that failed
query_results: List of individual query results
setup_time: Time taken for database setup (if performed)
- Return type:
dict
- Raises:
Exception – If benchmark execution fails
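Example (a minimal sketch; assumes connection is a DatabaseConnection obtained from a platform adapter):
result = benchmark.run_benchmark(connection, query_ids=[1, 6], fetch_results=False)
print(f"{result['successful_queries']}/{result['total_queries']} queries succeeded")
print(f"Total time: {result['total_execution_time']:.2f}s")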
- run_query(query_id, connection, params=None, fetch_results=False)¶
Execute a single query and return timing and results.
- Parameters:
query_id (int | str) – ID of the query to execute
connection (DatabaseConnection) – Database connection to execute query on
params (dict[str, Any] | None) – Optional parameters for query customization
fetch_results (bool) – Whether to fetch and return query results
- Returns:
A dictionary containing:
query_id: Executed query ID
execution_time: Time taken to execute the query in seconds
query_text: Executed query text
results: Query results if fetch_results=True, otherwise None
row_count: Number of rows returned (if results fetched)
- Return type:
dict
- Raises:
ValueError – If query_id is invalid
Exception – If query execution fails
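Example (a minimal sketch; assumes the same connection as in run_benchmark() above):
row = benchmark.run_query(1, connection, fetch_results=True)
print(f"Q1: {row['execution_time']:.3f}s, {row['row_count']} rows")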
- run_with_platform(platform_adapter, **run_config)¶
Run complete benchmark using platform-specific optimizations.
This method provides a unified interface for running benchmarks using database platform adapters that handle connection management, data loading optimizations, and query execution.
This is the standard method that all benchmarks should support for integration with the CLI and other orchestration tools.
- Parameters:
platform_adapter – Platform adapter instance (e.g., DuckDBAdapter)
**run_config – Configuration options:
categories: List of query categories to run (if the benchmark supports them)
query_subset: List of specific query IDs to run
connection: Connection configuration
benchmark_type: Type hint for optimizations (‘olap’, ‘oltp’, etc.)
- Returns:
BenchmarkResults object with execution results
Example
from benchbox.platforms import DuckDBAdapter

benchmark = SomeBenchmark(scale_factor=0.1)
adapter = DuckDBAdapter()
results = benchmark.run_with_platform(adapter)
- setup_database(connection)¶
Set up database with schema and data.
Creates necessary database schema and loads benchmark data into the database.
- Parameters:
connection (DatabaseConnection) – Database connection to set up
- Raises:
ValueError – If data generation fails
Exception – If database setup fails
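Example (a minimal sketch; assumes connection is a DatabaseConnection from a platform adapter):
# Creates the TPC-H schema and loads the generated data files
benchmark.generate_data()
benchmark.setup_database(connection)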
- translate_query(query_id, dialect)¶
Translate a query to a specific SQL dialect.
- Parameters:
query_id (int | str) – The ID of the query to translate
dialect (str) – The target SQL dialect
- Returns:
The translated query string
- Raises:
ValueError – If the query_id is invalid
ImportError – If sqlglot is not installed
ValueError – If the dialect is not supported
- Return type:
str
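Example (a minimal sketch; requires sqlglot to be installed):
# Translate query 6 to the DuckDB dialect
print(benchmark.translate_query(6, "duckdb"))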
- verbose: bool = False¶
- verbose_enabled: bool = False¶
- verbose_level: int = 0¶
- property verbosity_settings: VerbositySettings¶
Return the current verbosity settings.
- very_verbose: bool = False¶
Constructor¶
TPCH(
scale_factor: float = 1.0,
output_dir: Optional[Union[str, Path]] = None,
**kwargs
)
Parameters:
scale_factor (float): Data size multiplier. SF=1.0 generates ~1GB of data. Range: 0.01 to 30000+
output_dir (str|Path, optional): Directory for generated data files. Default: temporary directory
kwargs: Additional options (e.g., parallel=True for parallel generation)
Raises:
ValueError: If scale_factor is not positive
TypeError: If scale_factor is not a number
Methods¶
generate_data()¶
Generate TPC-H benchmark data.
data_files = benchmark.generate_data()
print(f"Generated {len(data_files)} table files")
- Returns:
List[Union[str, Path]]: Paths to generated data files
get_query(query_id, *, params=None, seed=None, scale_factor=None, dialect=None)¶
Get a specific TPC-H query.
# Get query 1
q1 = benchmark.get_query(1)
# Get with dialect translation
q1_bq = benchmark.get_query(1, dialect="bigquery")
# Get with custom parameters
q1_param = benchmark.get_query(1, seed=42, scale_factor=10.0)
Parameters:
query_id (int): Query ID (1-22)
params (dict, optional): Query parameters (legacy, mostly ignored)
seed (int, optional): RNG seed for parameter generation
scale_factor (float, optional): Override default scale factor
dialect (str, optional): Target SQL dialect (duckdb, bigquery, snowflake, etc.)
- Returns:
str: Query SQL text
Raises:
ValueError: If query_id not in range 1-22
TypeError: If query_id is not an integer
get_queries(dialect=None, base_dialect=None)¶
Get all TPC-H queries.
# Get all queries
queries = benchmark.get_queries()
print(f"Total queries: {len(queries)}")
# Get with dialect translation
queries_bq = benchmark.get_queries(dialect="bigquery")
Parameters:
dialect (str, optional): Target SQL dialect
base_dialect (str, optional): Source dialect for translation
- Returns:
Dict[str, str]: Dictionary mapping query IDs to SQL text
get_schema()¶
Get TPC-H schema information.
schema = benchmark.get_schema()
for table in schema:
print(f"{table['name']}: {len(table['columns'])} columns")
- Returns:
List[dict]: List of table definitions with columns and types
get_create_tables_sql(dialect="standard", tuning_config=None)¶
Get CREATE TABLE SQL for all tables.
# Standard SQL
create_sql = benchmark.get_create_tables_sql()
# With dialect
create_sql_bq = benchmark.get_create_tables_sql(dialect="bigquery")
# With tuning configuration
from benchbox.core.tuning.interface import UnifiedTuningConfiguration
tuning = UnifiedTuningConfiguration(...)
create_sql_tuned = benchmark.get_create_tables_sql(tuning_config=tuning)
Parameters:
dialect (str): Target SQL dialect. Default: “standard”
tuning_config (UnifiedTuningConfiguration, optional): Tuning settings
- Returns:
str: SQL script for creating all tables
generate_streams(num_streams=1, rng_seed=None, streams_output_dir=None)¶
Generate query streams for throughput testing.
# Generate 5 concurrent streams
streams = benchmark.generate_streams(
num_streams=5,
rng_seed=42,
streams_output_dir="./streams"
)
for stream_path in streams:
print(f"Stream: {stream_path}")
Parameters:
num_streams (int): Number of streams to generate. Default: 1
rng_seed (int, optional): Seed for reproducible streams
streams_output_dir (str|Path, optional): Output directory for stream files
- Returns:
List[Path]: Paths to generated stream files
Properties¶
tables¶
Mapping of table names to generated data file paths.
from pathlib import Path

for table_name, file_path in benchmark.tables.items():
    size_mb = Path(file_path).stat().st_size / 1024 / 1024
    print(f"{table_name}: {size_mb:.2f} MB at {file_path}")
- Returns:
Dict[str, Path]: Dictionary mapping table names to file paths
Usage Examples¶
Basic Benchmark Run¶
from benchbox.tpch import TPCH
from benchbox.platforms.duckdb import DuckDBAdapter
# Create benchmark with scale factor 1 (~1GB)
benchmark = TPCH(scale_factor=1.0)
# Generate data
benchmark.generate_data()
# Run on DuckDB
adapter = DuckDBAdapter()
results = benchmark.run_with_platform(adapter)
# Print results
print(f"Benchmark: {results.benchmark_name}")
print(f"Total time: {results.total_execution_time:.2f}s")
print(f"Queries: {results.successful_queries}/{results.total_queries}")
Multi-Platform Comparison¶
from benchbox.tpch import TPCH
from benchbox.platforms.duckdb import DuckDBAdapter
from benchbox.platforms.clickhouse import ClickHouseAdapter
benchmark = TPCH(scale_factor=1.0, output_dir="./data/tpch_sf1")
benchmark.generate_data()
platforms = {
"DuckDB": DuckDBAdapter(),
"ClickHouse": ClickHouseAdapter(host="localhost"),
}
for name, adapter in platforms.items():
results = benchmark.run_with_platform(adapter)
print(f"{name}: {results.total_execution_time:.2f}s")
Specific Query Execution¶
from benchbox.tpch import TPCH
from benchbox.platforms.duckdb import DuckDBAdapter
benchmark = TPCH(scale_factor=0.1)
benchmark.generate_data()
adapter = DuckDBAdapter()
conn = adapter.create_connection()
# Load data
adapter.create_schema(benchmark, conn)
adapter.load_data(benchmark, conn, benchmark.output_dir)
# Execute specific queries
for query_id in [1, 3, 6, 10]:
query = benchmark.get_query(query_id)
result = adapter.execute_query(conn, query, f"q{query_id}")
print(f"Q{query_id}: {result['execution_time']:.3f}s")
Scale Factor Comparison¶
import pandas as pd
from benchbox.tpch import TPCH
from benchbox.platforms.duckdb import DuckDBAdapter
scale_factors = [0.01, 0.1, 1.0]
results_data = []
adapter = DuckDBAdapter()
for sf in scale_factors:
benchmark = TPCH(scale_factor=sf)
benchmark.generate_data()
results = benchmark.run_with_platform(adapter)
results_data.append({
"scale_factor": sf,
"data_size_gb": sf * 1.0,
"total_time": results.total_execution_time,
"avg_query_time": results.average_query_time,
})
df = pd.DataFrame(results_data)
print(df)
Official Benchmark Tests¶
from benchbox.tpch import TPCH
benchmark = TPCH(scale_factor=1.0)
# Assumes connection_factory is a callable returning a database connection for your platform
# Run official power test
power_results = benchmark.run_power_test(connection_factory)
# Run official throughput test
throughput_results = benchmark.run_throughput_test(connection_factory)
# Run official maintenance test
maintenance_results = benchmark.run_maintenance_test(connection_factory)
See Also¶
Benchmark APIs - Benchmark API overview
TPC-DS Benchmark API - TPC-DS benchmark API
Base Benchmark API - Base benchmark interface
Results API - Results API
TPC-H Benchmark - TPC-H guide
TPC-H Official Benchmark Guide - Official benchmark guide