TPC-H Benchmark API

Tags: reference, python-api, tpc-h

Complete Python API reference for the TPC-H benchmark.

Overview

The TPC-H benchmark simulates a decision support system with 22 analytical queries. It models a wholesale supplier with parts, suppliers, customers, and orders.

Key Features:

  • 22 analytical queries covering complex business scenarios

  • 8 tables with realistic relationships

  • Parametrized queries for randomization

  • Official TPC-H specification compliance

  • Multiple scale factors (SF=0.01 to 30000+)

Quick Start

from benchbox.tpch import TPCH
from benchbox.platforms.duckdb import DuckDBAdapter

# Create benchmark
benchmark = TPCH(scale_factor=1.0)

# Generate data
benchmark.generate_data()

# Run on platform
adapter = DuckDBAdapter()
results = benchmark.run_with_platform(adapter)

print(f"Completed in {results.total_execution_time:.2f}s")

API Reference

TPCH Class

class TPCH(scale_factor=1.0, output_dir=None, **kwargs)[source]

Bases: BaseBenchmark

TPC-H benchmark implementation.

Provides data generation and access to the 22 benchmark queries.

Official specification: http://www.tpc.org/tpch

__init__(scale_factor=1.0, output_dir=None, **kwargs)[source]

Initialize TPC-H benchmark instance.

Parameters:
  • scale_factor (float) – Scale factor for the benchmark (1.0 = ~1GB)

  • output_dir (str | Path | None) – Directory to output generated data files

  • **kwargs (Any) – Additional implementation-specific options

Raises:
  • ValueError – If scale_factor is not positive

  • TypeError – If scale_factor is not a number

generate_data()[source]

Generate TPC-H benchmark data.

Returns:

A list of paths to the generated data files

Return type:

list[str | Path]

get_queries(dialect=None, base_dialect=None)[source]

Get all TPC-H benchmark queries.

Parameters:
  • dialect (str | None) – Target SQL dialect for translation (e.g., ‘duckdb’, ‘bigquery’, ‘snowflake’). If None, returns queries in their original format.

  • base_dialect (str | None) – Source SQL dialect for translation.

Returns:

A dictionary mapping query IDs (1-22) to query strings

Return type:

dict[str, str]

get_query(query_id, *, params=None, seed=None, scale_factor=None, dialect=None, base_dialect=None, **kwargs)[source]

Get a specific TPC-H benchmark query.

Parameters:
  • query_id (int) – The ID of the query to retrieve (1-22)

  • params (dict[str, Any] | None) – Optional parameters to customize the query (legacy parameter, mostly ignored)

  • seed (int | None) – Random number generator seed for parameter generation

  • scale_factor (float | None) – Scale factor for parameter calculations

  • dialect (str | None) – Target SQL dialect

  • base_dialect (str | None) – Source SQL dialect (default: netezza)

  • **kwargs – Additional parameters

Returns:

The query string

Raises:
  • ValueError – If the query_id is invalid

  • TypeError – If query_id is not an integer

Return type:

str

get_schema()[source]

Get the TPC-H schema.

Returns:

A list of dictionaries describing the tables in the schema

Return type:

list[dict]

get_create_tables_sql(dialect='standard', tuning_config=None)[source]

Get SQL to create all TPC-H tables.

Parameters:
  • dialect (str) – SQL dialect to use (currently ignored, TPC-H uses standard SQL)

  • tuning_config – Unified tuning configuration for constraint settings

Returns:

SQL script for creating all tables

Return type:

str

generate_streams(num_streams=1, rng_seed=None, streams_output_dir=None)[source]

Generate TPC-H query streams.

Parameters:
  • num_streams (int) – Number of concurrent streams to generate

  • rng_seed (int | None) – Random number generator seed for parameter generation

  • streams_output_dir (str | Path | None) – Directory to output stream files

Returns:

List of paths to generated stream files

Return type:

list[Path]

get_stream_info(stream_id)[source]

Get information about a specific stream.

Parameters:

stream_id (int) – Stream identifier

Returns:

Dictionary containing stream information

Return type:

dict[str, Any]

get_all_streams_info()[source]

Get information about all streams.

Returns:

List of dictionaries containing stream information

Return type:

list[dict[str, Any]]
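
A minimal sketch of inspecting generated streams; the keys inside the returned dictionaries are not enumerated above, so the printed contents are only illustrative.

streams = benchmark.generate_streams(num_streams=2, rng_seed=42)

# Inspect one stream, then all of them (stream ids assumed to be 0-based;
# actual dictionary keys may vary)
print(benchmark.get_stream_info(0))
for info in benchmark.get_all_streams_info():
    print(info)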

property tables: dict[str, Path]

Get the mapping of table names to data file paths.

Returns:

Dictionary mapping table names to paths of generated data files

run_official_benchmark(connection_factory, config=None)[source]

Run the official TPC-H benchmark.

This method provides compatibility for official benchmark examples.

Parameters:
  • connection_factory – Factory function or connection object

  • config – Optional configuration parameters

Returns:

Dictionary with benchmark results
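
A hedged sketch of a connection_factory for this method, using an in-memory DuckDB connection; how the factory is invoked internally is an assumption based on the parameter description above.

import duckdb

def connection_factory():
    # Any factory function (or connection object) accepted by the method
    return duckdb.connect(":memory:")

official_results = benchmark.run_official_benchmark(connection_factory)
print(official_results)  # dictionary with benchmark results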

run_power_test(connection_factory, config=None)[source]

Run the TPC-H power test.

This method provides compatibility for power test examples.

Parameters:
  • connection_factory – Factory function or connection object

  • config – Optional configuration parameters

Returns:

Dictionary with power test results

run_maintenance_test(connection_factory, config=None)[source]

Run the TPC-H maintenance test.

This method provides compatibility for maintenance test examples.

Parameters:
  • connection_factory – Factory function or connection object

  • config – Optional configuration parameters

Returns:

Dictionary with maintenance test results

apply_verbosity(settings)

Apply verbosity settings to the mixin consumer.

property benchmark_name: str

Get the human-readable benchmark name.

create_enhanced_benchmark_result(platform, query_results, execution_metadata=None, phases=None, resource_utilization=None, performance_characteristics=None, **kwargs)

Create a BenchmarkResults object with standardized fields.

This centralizes the logic for creating benchmark results that was previously duplicated across platform adapters and the CLI orchestrator.

Parameters:
  • platform (str) – Platform name (e.g., “DuckDB”, “ClickHouse”)

  • query_results (list[dict[str, Any]]) – List of query execution results

  • execution_metadata (dict[str, Any] | None) – Optional execution metadata

  • phases (dict[str, dict[str, Any]] | None) – Optional phase tracking information

  • resource_utilization (dict[str, Any] | None) – Optional resource usage metrics

  • performance_characteristics (dict[str, Any] | None) – Optional performance analysis

  • **kwargs (Any) – Additional fields to override defaults

Returns:

Fully configured BenchmarkResults object

Return type:

BenchmarkResults
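
An illustrative call; the keys inside each query-result dictionary are assumptions for this sketch, not taken from the reference above.

query_results = [
    # Hypothetical per-query entries; real adapters supply their own fields
    {"query_id": "1", "execution_time": 0.42, "status": "success"},
    {"query_id": "6", "execution_time": 0.08, "status": "success"},
]

results = benchmark.create_enhanced_benchmark_result(
    platform="DuckDB",
    query_results=query_results,
    execution_metadata={"scale_factor": 1.0},
)
print(type(results).__name__)  # BenchmarkResults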

format_results(benchmark_result)

Format benchmark results for display.

Parameters:

benchmark_result (dict[str, Any]) – Result dictionary from run_benchmark()

Returns:

Formatted string representation of the results

Return type:

str
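
A short sketch pairing run_benchmark() with format_results(); conn is assumed to be an open DatabaseConnection, as used by run_benchmark() below.

result = benchmark.run_benchmark(conn)       # dictionary from run_benchmark()
print(benchmark.format_results(result))      # human-readable summary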

get_data_source_benchmark()

Return the canonical source benchmark when data is shared.

Benchmarks that reuse data generated by another benchmark (for example, Primitives reusing TPC-H datasets) should override this method and return the lower-case identifier of the source benchmark. Benchmarks that produce their own data should return None (default).
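
A minimal sketch of the override described above; the class context is omitted and the "tpch" identifier string is an assumption.

# Inside a hypothetical benchmark class that reuses TPC-H data:
def get_data_source_benchmark(self):
    # Lower-case identifier of the benchmark whose data is reused
    return "tpch"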

log_debug_info(context='Debug')

Log comprehensive debug information including version details.

log_error_with_debug_info(error, context='Error')

Log an error with comprehensive debug information.

log_operation_complete(operation, duration=None, details='')

log_operation_start(operation, details='')

log_verbose(message)

log_version_warning()

Log version consistency warnings if any exist.

log_very_verbose(message)

property logger: Logger

Return the logger configured for the verbosity mixin consumer.

quiet: bool = False

run_benchmark(connection, query_ids=None, fetch_results=False, setup_database=True)

Run the complete benchmark suite.

Parameters:
  • connection (DatabaseConnection) – Database connection to execute queries on

  • query_ids (list[int | str] | None) – Optional list of specific query IDs to run (defaults to all)

  • fetch_results (bool) – Whether to fetch and return query results

  • setup_database (bool) – Whether to set up the database first

Returns:

A dictionary containing:

  • benchmark_name: Name of the benchmark

  • total_execution_time: Total time for all queries

  • total_queries: Number of queries executed

  • successful_queries: Number of queries that succeeded

  • failed_queries: Number of queries that failed

  • query_results: List of individual query results

  • setup_time: Time taken for database setup (if performed)

Return type:

dict[str, Any]

Raises:

Exception – If benchmark execution fails
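
A hedged sketch of a direct run on two queries; the connection is opened through the platform adapter as in the "Specific Query Execution" example later on this page.

adapter = DuckDBAdapter()
conn = adapter.create_connection()

result = benchmark.run_benchmark(conn, query_ids=[1, 6])
print(f"{result['successful_queries']}/{result['total_queries']} queries succeeded "
      f"in {result['total_execution_time']:.2f}s")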

run_query(query_id, connection, params=None, fetch_results=False)

Execute a single query and return timing and results.

Parameters:
  • query_id (int | str) – ID of the query to execute

  • connection (DatabaseConnection) – Database connection to execute query on

  • params (dict[str, Any] | None) – Optional parameters for query customization

  • fetch_results (bool) – Whether to fetch and return query results

Returns:

A dictionary containing:

  • query_id: Executed query ID

  • execution_time: Time taken to execute query in seconds

  • query_text: Executed query text

  • results: Query results if fetch_results=True, otherwise None

  • row_count: Number of rows returned (if results fetched)

Return type:

dict[str, Any]

Raises:
  • ValueError – If query_id is invalid

  • Exception – If query execution fails
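
A small sketch reusing conn from the run_benchmark sketch above; the result keys are those listed in the Returns section.

result = benchmark.run_query(6, conn, fetch_results=True)
print(f"Q6: {result['execution_time']:.3f}s, {result['row_count']} rows")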

run_with_platform(platform_adapter, **run_config)

Run complete benchmark using platform-specific optimizations.

This method provides a unified interface for running benchmarks using database platform adapters that handle connection management, data loading optimizations, and query execution.

This is the standard method that all benchmarks should support for integration with the CLI and other orchestration tools.

Parameters:
  • platform_adapter – Platform adapter instance (e.g., DuckDBAdapter)

  • **run_config – Configuration options:
      - categories: List of query categories to run (if benchmark supports)
      - query_subset: List of specific query IDs to run
      - connection: Connection configuration
      - benchmark_type: Type hint for optimizations (‘olap’, ‘oltp’, etc.)

Returns:

BenchmarkResults object with execution results

Example

from benchbox.platforms import DuckDBAdapter

benchmark = SomeBenchmark(scale_factor=0.1)
adapter = DuckDBAdapter()
results = benchmark.run_with_platform(adapter)

setup_database(connection)

Set up database with schema and data.

Creates necessary database schema and loads benchmark data into the database.

Parameters:

connection (DatabaseConnection) – Database connection to set up

Raises:
  • ValueError – If data generation fails

  • Exception – If database setup fails
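
A minimal sketch, reusing the adapter-created connection from the sketches above:

# May trigger data generation; see the Raises entries above
benchmark.setup_database(conn)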

translate_query(query_id, dialect)

Translate a query to a specific SQL dialect.

Parameters:
  • query_id (int | str) – The ID of the query to translate

  • dialect (str) – The target SQL dialect

Returns:

The translated query string

Raises:
  • ValueError – If the query_id is invalid

  • ImportError – If sqlglot is not installed

  • ValueError – If the dialect is not supported

Return type:

str
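
A short sketch; requires sqlglot to be installed, per the ImportError noted above.

q1_snowflake = benchmark.translate_query(1, "snowflake")
print(q1_snowflake[:200])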

verbose: bool = False

verbose_enabled: bool = False

verbose_level: int = 0

property verbosity_settings: VerbositySettings

Return the current verbosity settings.

very_verbose: bool = False

Constructor

TPCH(
    scale_factor: float = 1.0,
    output_dir: Optional[Union[str, Path]] = None,
    **kwargs
)

Parameters:

  • scale_factor (float): Data size multiplier. SF=1.0 generates ~1GB of data. Range: 0.01 to 30000+

  • output_dir (str|Path, optional): Directory for generated data files. Default: temporary directory

  • kwargs: Additional options (e.g., parallel=True for parallel generation)

Raises:

  • ValueError: If scale_factor is not positive

  • TypeError: If scale_factor is not a number
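
A short constructor sketch; parallel=True is the kwarg example mentioned above, and the output directory path is only illustrative.

from benchbox.tpch import TPCH

benchmark = TPCH(
    scale_factor=0.1,                 # ~100 MB of data
    output_dir="./data/tpch_sf01",    # illustrative path
    parallel=True,                    # example kwarg noted above
)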

Methods

generate_data()

Generate TPC-H benchmark data.

data_files = benchmark.generate_data()
print(f"Generated {len(data_files)} table files")
Returns:

List[Union[str, Path]]: Paths to generated data files

get_query(query_id, *, params=None, seed=None, scale_factor=None, dialect=None)

Get a specific TPC-H query.

# Get query 1
q1 = benchmark.get_query(1)

# Get with dialect translation
q1_bq = benchmark.get_query(1, dialect="bigquery")

# Get with custom parameters
q1_param = benchmark.get_query(1, seed=42, scale_factor=10.0)

Parameters:

  • query_id (int): Query ID (1-22)

  • params (dict, optional): Query parameters (legacy, mostly ignored)

  • seed (int, optional): RNG seed for parameter generation

  • scale_factor (float, optional): Override default scale factor

  • dialect (str, optional): Target SQL dialect (duckdb, bigquery, snowflake, etc.)

Returns:

str: Query SQL text

Raises:

  • ValueError: If query_id not in range 1-22

  • TypeError: If query_id is not an integer

get_queries(dialect=None, base_dialect=None)

Get all TPC-H queries.

# Get all queries
queries = benchmark.get_queries()
print(f"Total queries: {len(queries)}")

# Get with dialect translation
queries_bq = benchmark.get_queries(dialect="bigquery")

Parameters:

  • dialect (str, optional): Target SQL dialect

  • base_dialect (str, optional): Source dialect for translation

Returns:

Dict[str, str]: Dictionary mapping query IDs to SQL text

get_schema()

Get TPC-H schema information.

schema = benchmark.get_schema()
for table in schema:
    print(f"{table['name']}: {len(table['columns'])} columns")
Returns:

List[dict]: List of table definitions with columns and types

get_create_tables_sql(dialect="standard", tuning_config=None)

Get CREATE TABLE SQL for all tables.

# Standard SQL
create_sql = benchmark.get_create_tables_sql()

# With dialect
create_sql_bq = benchmark.get_create_tables_sql(dialect="bigquery")

# With tuning configuration
from benchbox.core.tuning.interface import UnifiedTuningConfiguration
tuning = UnifiedTuningConfiguration(...)
create_sql_tuned = benchmark.get_create_tables_sql(tuning_config=tuning)

Parameters:

  • dialect (str): Target SQL dialect. Default: “standard”

  • tuning_config (UnifiedTuningConfiguration, optional): Tuning settings

Returns:

str: SQL script for creating all tables

generate_streams(num_streams=1, rng_seed=None, streams_output_dir=None)

Generate query streams for throughput testing.

# Generate 5 concurrent streams
streams = benchmark.generate_streams(
    num_streams=5,
    rng_seed=42,
    streams_output_dir="./streams"
)

for stream_path in streams:
    print(f"Stream: {stream_path}")

Parameters:

  • num_streams (int): Number of streams to generate. Default: 1

  • rng_seed (int, optional): Seed for reproducible streams

  • streams_output_dir (str|Path, optional): Output directory for stream files

Returns:

List[Path]: Paths to generated stream files

Properties

tables

Mapping of table names to generated data file paths.

for table_name, file_path in benchmark.tables.items():
    size_mb = file_path.stat().st_size / 1024 / 1024
    print(f"{table_name}: {size_mb:.2f} MB at {file_path}")

Returns:

Dict[str, Path]: Dictionary mapping table names to file paths

Usage Examples

Basic Benchmark Run

from benchbox.tpch import TPCH
from benchbox.platforms.duckdb import DuckDBAdapter

# Create benchmark with scale factor 1 (~1GB)
benchmark = TPCH(scale_factor=1.0)

# Generate data
benchmark.generate_data()

# Run on DuckDB
adapter = DuckDBAdapter()
results = benchmark.run_with_platform(adapter)

# Print results
print(f"Benchmark: {results.benchmark_name}")
print(f"Total time: {results.total_execution_time:.2f}s")
print(f"Queries: {results.successful_queries}/{results.total_queries}")

Multi-Platform Comparison

from benchbox.tpch import TPCH
from benchbox.platforms.duckdb import DuckDBAdapter
from benchbox.platforms.clickhouse import ClickHouseAdapter

benchmark = TPCH(scale_factor=1.0, output_dir="./data/tpch_sf1")
benchmark.generate_data()

platforms = {
    "DuckDB": DuckDBAdapter(),
    "ClickHouse": ClickHouseAdapter(host="localhost"),
}

for name, adapter in platforms.items():
    results = benchmark.run_with_platform(adapter)
    print(f"{name}: {results.total_execution_time:.2f}s")

Specific Query Execution

from benchbox.tpch import TPCH
from benchbox.platforms.duckdb import DuckDBAdapter

benchmark = TPCH(scale_factor=0.1)
benchmark.generate_data()

adapter = DuckDBAdapter()
conn = adapter.create_connection()

# Load data
adapter.create_schema(benchmark, conn)
adapter.load_data(benchmark, conn, benchmark.output_dir)

# Execute specific queries
for query_id in [1, 3, 6, 10]:
    query = benchmark.get_query(query_id)
    result = adapter.execute_query(conn, query, f"q{query_id}")
    print(f"Q{query_id}: {result['execution_time']:.3f}s")

Scale Factor Comparison

import pandas as pd
from benchbox.tpch import TPCH
from benchbox.platforms.duckdb import DuckDBAdapter

scale_factors = [0.01, 0.1, 1.0]
results_data = []

adapter = DuckDBAdapter()

for sf in scale_factors:
    benchmark = TPCH(scale_factor=sf)
    benchmark.generate_data()

    results = benchmark.run_with_platform(adapter)

    results_data.append({
        "scale_factor": sf,
        "data_size_gb": sf * 1.0,
        "total_time": results.total_execution_time,
        "avg_query_time": results.average_query_time,
    })

df = pd.DataFrame(results_data)
print(df)

Official Benchmark Tests

from benchbox.tpch import TPCH

benchmark = TPCH(scale_factor=1.0)

# connection_factory: a factory function or connection object,
# as accepted by the official test methods documented above

# Run the complete official benchmark
official_results = benchmark.run_official_benchmark(connection_factory)

# Run the official power test
power_results = benchmark.run_power_test(connection_factory)

# Run the official maintenance test
maintenance_results = benchmark.run_maintenance_test(connection_factory)

See Also