"""TPC-H benchmark implementation.
Copyright 2026 Joe Harris / BenchBox Project
TPC Benchmark™ H (TPC-H) - Copyright © Transaction Processing Performance Council
This implementation is based on the TPC-H specification.
Licensed under the MIT License. See LICENSE file in the project root for details.
"""
from pathlib import Path
from typing import Any, Optional, Union
from benchbox.base import BaseBenchmark
from benchbox.core.tpch.benchmark import TPCHBenchmark
class TPCH(BaseBenchmark):
    """TPC-H benchmark implementation.

    Provides TPC-H benchmark implementation, including data generation and
    access to the 22 benchmark queries. Most methods are thin wrappers that
    delegate to ``self._impl``, an implementation object built from
    ``TPCHBenchmark`` during ``__init__``.

    Official specification: http://www.tpc.org/tpch
    """

    def __init__(
        self,
        scale_factor: float = 1.0,
        output_dir: Optional[Union[str, Path]] = None,
        **kwargs: Any,
    ) -> None:
        """Initialize TPC-H benchmark instance.

        Args:
            scale_factor: Scale factor for the benchmark (1.0 = ~1GB)
            output_dir: Directory to output generated data files
            **kwargs: Additional implementation-specific options

        Raises:
            ValueError: If scale_factor is not positive
            TypeError: If scale_factor is not a number
        """
        # Validate scale_factor type (positivity already checked in base class)
        self._validate_scale_factor_type(scale_factor)
        super().__init__(scale_factor=scale_factor, output_dir=output_dir, **kwargs)
        # Build the TPCHBenchmark implementation the other methods delegate to.
        self._initialize_benchmark_implementation(TPCHBenchmark, scale_factor, output_dir, **kwargs)

    def generate_data(self) -> list[Union[str, Path]]:
        """Generate TPC-H benchmark data.

        Returns:
            A list of paths to the generated data files
        """
        return self._impl.generate_data()

    def get_queries(self, dialect: Optional[str] = None, base_dialect: Optional[str] = None) -> dict[str, str]:
        """Get all TPC-H benchmark queries.

        Args:
            dialect: Target SQL dialect for translation (e.g., 'duckdb', 'bigquery', 'snowflake').
                If None, returns queries in their original format.
            base_dialect: Source SQL dialect the queries are translated from; forwarded
                unchanged to the underlying implementation.

        Returns:
            A dictionary mapping query IDs (1-22) to query strings
        """
        return self._impl.get_queries(dialect=dialect, base_dialect=base_dialect)

    def get_query(
        self,
        query_id: int,
        *,
        params: Optional[dict[str, Any]] = None,
        seed: Optional[int] = None,
        scale_factor: Optional[float] = None,
        dialect: Optional[str] = None,
        base_dialect: Optional[str] = None,
        **kwargs: Any,
    ) -> str:
        """Get a specific TPC-H benchmark query.

        Args:
            query_id: The ID of the query to retrieve (1-22)
            params: Optional parameters to customize the query (legacy parameter, mostly ignored)
            seed: Random number generator seed for parameter generation
            scale_factor: Scale factor for parameter calculations
            dialect: Target SQL dialect
            base_dialect: Source SQL dialect (default: netezza)
            **kwargs: Additional parameters forwarded to the implementation

        Returns:
            The query string

        Raises:
            ValueError: If the query_id is out of range or scale_factor is not positive
            TypeError: If query_id or seed is not an integer, or scale_factor is not a number
        """
        # bool is a subclass of int, so it must be rejected explicitly;
        # otherwise get_query(True) would silently return query 1.
        if isinstance(query_id, bool) or not isinstance(query_id, int):
            raise TypeError(f"query_id must be an integer, got {type(query_id).__name__}")
        if not (1 <= query_id <= 22):
            raise ValueError(f"Query ID must be 1-22, got {query_id}")

        if scale_factor is not None:
            if isinstance(scale_factor, bool) or not isinstance(scale_factor, (int, float)):
                raise TypeError(f"scale_factor must be a number, got {type(scale_factor).__name__}")
            # Written as `not > 0` (rather than `<= 0`) so NaN is rejected too.
            if not scale_factor > 0:
                raise ValueError(f"scale_factor must be positive, got {scale_factor}")

        if seed is not None and (isinstance(seed, bool) or not isinstance(seed, int)):
            raise TypeError(f"seed must be an integer, got {type(seed).__name__}")

        return self._impl.get_query(
            query_id,
            params=params,
            seed=seed,
            scale_factor=scale_factor,
            dialect=dialect,
            base_dialect=base_dialect,
            **kwargs,
        )

    def get_schema(self) -> list[dict]:
        """Get the TPC-H schema.

        Returns:
            A list of dictionaries describing the tables in the schema
        """
        return self._impl.get_schema()

    def get_create_tables_sql(self, dialect: str = "standard", tuning_config: Any = None) -> str:
        """Get SQL to create all TPC-H tables.

        Args:
            dialect: SQL dialect name, forwarded to the underlying implementation
            tuning_config: Unified tuning configuration for constraint settings

        Returns:
            SQL script for creating all tables
        """
        return self._impl.get_create_tables_sql(dialect=dialect, tuning_config=tuning_config)

    def generate_streams(
        self,
        num_streams: int = 1,
        rng_seed: Optional[int] = None,
        streams_output_dir: Optional[Union[str, Path]] = None,
    ) -> list[Path]:
        """Generate TPC-H query streams.

        Args:
            num_streams: Number of concurrent streams to generate
            rng_seed: Random number generator seed for parameter generation
            streams_output_dir: Directory to output stream files

        Returns:
            List of paths to generated stream files
        """
        return self._impl.generate_streams(
            num_streams=num_streams,
            rng_seed=rng_seed,
            streams_output_dir=streams_output_dir,
        )

    def get_stream_info(self, stream_id: int) -> dict[str, Any]:
        """Get information about a specific stream.

        Args:
            stream_id: Stream identifier

        Returns:
            Dictionary containing stream information
        """
        return self._impl.get_stream_info(stream_id)

    def get_all_streams_info(self) -> list[dict[str, Any]]:
        """Get information about all streams.

        Returns:
            List of dictionaries containing stream information
        """
        return self._impl.get_all_streams_info()

    @property
    def tables(self) -> dict[str, Path]:
        """Get the mapping of table names to data file paths.

        Returns:
            Dictionary mapping table names to paths of generated data files,
            or an empty dict if the implementation exposes no ``tables``.
        """
        return getattr(self._impl, "tables", {})

    @staticmethod
    def _fallback_result(**extra: Any) -> dict[str, Any]:
        """Build the result returned when an optional runner module is missing.

        Args:
            **extra: Additional keys merged into the result dictionary

        Returns:
            Dictionary with ``status`` set to ``"fallback"`` and a hint message
        """
        return {
            "status": "fallback",
            "message": "Use adapter.run_benchmark() instead",
            **extra,
        }

    def run_official_benchmark(self, connection_factory: Any, config: Any = None) -> Any:
        """Run the official TPC-H benchmark.

        This method provides compatibility for official benchmark examples.

        Args:
            connection_factory: Factory function or connection object
            config: Optional configuration parameters

        Returns:
            Dictionary with benchmark results, or a fallback dictionary
            (``status == "fallback"``) if the official-benchmark module
            cannot be imported. The fallback path does not open a connection.
        """
        # Guard only the optional import: an ImportError raised while the
        # benchmark itself runs is a real failure and must not be masked.
        try:
            from benchbox.core.tpch.official_benchmark import TPCHOfficialBenchmark
        except ImportError:
            return self._fallback_result()

        official = TPCHOfficialBenchmark(self)
        return official.run_official_benchmark(connection_factory, config)

    def run_power_test(self, connection_factory: Any, config: Any = None) -> Any:
        """Run the TPC-H power test.

        This method provides compatibility for power test examples.

        Args:
            connection_factory: Factory function or connection object
            config: Optional mapping of options passed as keyword arguments
                to the power-test constructor

        Returns:
            Dictionary with power test results, or a fallback dictionary
            (``status == "fallback"``) if the power-test module cannot be
            imported. The fallback path does not open a connection.
        """
        try:
            from benchbox.core.tpch.power_test import TPCHPowerTest
        except ImportError:
            return self._fallback_result()

        # A falsy config (None or empty) means "no extra constructor options".
        options = config if config else {}
        power_test = TPCHPowerTest(self, connection_factory, **options)
        return power_test.run()

    def run_maintenance_test(self, connection_factory: Any, config: Any = None) -> Any:
        """Run the TPC-H maintenance test.

        This method provides compatibility for maintenance test examples.

        Args:
            connection_factory: Factory function or connection object
            config: Optional configuration parameters passed to the test's ``run``

        Returns:
            Dictionary with maintenance test results, or a fallback dictionary
            (``status == "fallback"``) if the maintenance-test module cannot be
            imported. The fallback path does not open a connection.
        """
        try:
            from benchbox.core.tpch.maintenance_test import TPCHMaintenanceTest
        except ImportError:
            # NOTE(review): these refresh-function figures are fixed placeholder
            # values; nothing is executed against a database on this path. They
            # are kept only so the result shape stays stable for existing callers.
            placeholder_refresh_results = {
                "refresh_function_1": {
                    "status": "completed",
                    "rows_inserted": 150,
                    "duration": 0.5,
                },
                "refresh_function_2": {
                    "status": "completed",
                    "rows_deleted": 75,
                    "duration": 0.3,
                },
            }
            return self._fallback_result(refresh_functions=placeholder_refresh_results)

        maint_test = TPCHMaintenanceTest(self, connection_factory)
        return maint_test.run(config)