Source code for benchbox.tpcds

"""TPC-DS benchmark implementation.

Copyright 2026 Joe Harris / BenchBox Project

TPC Benchmark™ DS (TPC-DS) - Copyright © Transaction Processing Performance Council
This implementation is based on the TPC-DS specification.

Licensed under the MIT License. See LICENSE file in the project root for details.
"""

from pathlib import Path
from typing import TYPE_CHECKING, Any, Optional, Union

if TYPE_CHECKING:
    from benchbox.core.tpcds.generator.manager import TPCDSDataGenerator
    from benchbox.core.tpcds.queries import TPCDSQueryManager

from benchbox.base import BaseBenchmark
from benchbox.core.tpcds.benchmark import TPCDSBenchmark


[docs] class TPCDS(BaseBenchmark): """TPC-DS benchmark implementation. Provides TPC-DS benchmark implementation, including data generation and access to the benchmark queries. Official specification: http://www.tpc.org/tpcds """
[docs] def __init__( self, scale_factor: float = 1.0, output_dir: Optional[Union[str, Path]] = None, **kwargs: Any, ) -> None: """Initialize TPC-DS benchmark instance. Args: scale_factor: Scale factor for the benchmark (1.0 = ~1GB) output_dir: Directory to output generated data files **kwargs: Additional implementation-specific options Raises: ValueError: If scale_factor is not positive TypeError: If scale_factor is not a number """ # Validate scale_factor type (positivity already checked in base class) self._validate_scale_factor_type(scale_factor) super().__init__(scale_factor=scale_factor, output_dir=output_dir, **kwargs) # Initialize the actual implementation using common pattern self._initialize_benchmark_implementation(TPCDSBenchmark, scale_factor, output_dir, **kwargs)
[docs] def generate_data(self) -> list[Union[str, Path]]: """Generate TPC-DS benchmark data. Returns: A list of paths to the generated data files """ return self._impl.generate_data()
[docs] def get_queries(self, dialect: Optional[str] = None, base_dialect: Optional[str] = None) -> dict[str, str]: """Get all TPC-DS benchmark queries. Args: dialect: Target SQL dialect for translation (e.g., 'duckdb', 'postgres') Returns: A dictionary mapping query IDs to query strings """ return self._impl.get_queries(dialect=dialect, base_dialect=base_dialect)
[docs] def get_query( self, query_id: int, *, params: Optional[dict[str, Any]] = None, seed: Optional[int] = None, scale_factor: Optional[float] = None, dialect: Optional[str] = None, **kwargs, ) -> str: """Get a specific TPC-DS benchmark query. Args: query_id: The ID of the query to retrieve (1-99) params: Optional parameters to customize the query (legacy parameter, mostly ignored) seed: Random number generator seed for parameter generation scale_factor: Scale factor for parameter calculations dialect: Target SQL dialect **kwargs: Additional parameters Returns: The query string Raises: ValueError: If the query_id is invalid TypeError: If query_id is not an integer """ # Validate query_id to match TPC-H patterns if not isinstance(query_id, int): raise TypeError(f"query_id must be an integer, got {type(query_id).__name__}") if not (1 <= query_id <= 99): raise ValueError(f"Query ID must be 1-99, got {query_id}") # Validate scale_factor if provided if scale_factor is not None: if not isinstance(scale_factor, (int, float)): raise TypeError(f"scale_factor must be a number, got {type(scale_factor).__name__}") if scale_factor <= 0: raise ValueError(f"scale_factor must be positive, got {scale_factor}") # Validate seed if provided if seed is not None and not isinstance(seed, int): raise TypeError(f"seed must be an integer, got {type(seed).__name__}") return self._impl.get_query( query_id, params=params, seed=seed, scale_factor=scale_factor, dialect=dialect, **kwargs, )
@property def queries(self) -> "TPCDSQueryManager": """Access to the query manager. Returns: The underlying query manager instance """ return self._impl.query_manager @property def generator(self) -> "TPCDSDataGenerator": """Access to the data generator. Returns: The underlying data generator instance """ return self._impl.data_generator
[docs] def get_available_tables(self) -> list[str]: """Get list of available tables. Returns: List of table names """ return self._impl.get_available_tables()
[docs] def get_available_queries(self) -> list[int]: """Get list of available query IDs. Returns: List of query IDs (1-99) """ return self._impl.get_available_queries()
[docs] def generate_table_data(self, table_name: str, output_dir: Optional[str] = None) -> str: """Generate data for a specific table. Args: table_name: Name of the table to generate data for output_dir: Optional output directory for generated data Returns: Iterator of data rows for the table """ return self._impl.generate_table_data(table_name, output_dir)
[docs] def get_schema(self) -> list[dict]: """Get the TPC-DS schema. Returns: A list of dictionaries describing the tables in the schema """ return self._impl.get_schema()
[docs] def get_create_tables_sql(self, dialect: str = "standard", tuning_config=None) -> str: """Get SQL to create all TPC-DS tables. Args: dialect: SQL dialect to use (currently ignored, TPC-DS uses standard SQL) tuning_config: Unified tuning configuration for constraint settings Returns: SQL script for creating all tables """ return self._impl.get_create_tables_sql(dialect=dialect, tuning_config=tuning_config)
[docs] def generate_streams( self, num_streams: int = 1, rng_seed: Optional[int] = None, streams_output_dir: Optional[Union[str, Path]] = None, ) -> list[Path]: """Generate TPC-DS query streams. Args: num_streams: Number of concurrent streams to generate rng_seed: Random number generator seed for parameter generation streams_output_dir: Directory to output stream files Returns: List of paths to generated stream files """ return self._impl.generate_streams( num_streams=num_streams, rng_seed=rng_seed, streams_output_dir=streams_output_dir, )
[docs] def get_stream_info(self, stream_id: int) -> dict[str, Any]: """Get information about a specific stream. Args: stream_id: Stream identifier Returns: Dictionary containing stream information """ return self._impl.get_stream_info(stream_id)
[docs] def get_all_streams_info(self) -> list[dict[str, Any]]: """Get information about all streams. Returns: List of dictionaries containing stream information """ return self._impl.get_all_streams_info()
[docs] def get_benchmark_info(self) -> dict[str, Any]: """Get benchmark information. Returns: Dictionary with benchmark information including name, scale factor, available tables, queries, and C tools info """ return self._impl.get_benchmark_info()