# Source code for benchbox.tpcdi

"""TPC-DI (Data Integration) benchmark implementation.

Copyright 2026 Joe Harris / BenchBox Project

TPC Benchmark™ DI (TPC-DI) - Copyright © Transaction Processing Performance Council
This implementation is based on the TPC-DI specification.

Licensed under the MIT License. See LICENSE file in the project root for details.
"""

from pathlib import Path
from typing import Any, Optional, Union

from benchbox.base import BaseBenchmark
from benchbox.core.tpcdi.benchmark import TPCDIBenchmark


class TPCDI(BaseBenchmark):
    """Public wrapper around the TPC-DI benchmark implementation.

    Every operation on this class is forwarded to an internal
    ``TPCDIBenchmark`` instance. The wrapper exposes data generation, ETL
    processing, and access to the validation and analytical queries.

    Official specification: http://www.tpc.org/tpcdi
    """

    def __init__(
        self,
        scale_factor: float = 1.0,
        output_dir: Optional[Union[str, Path]] = None,
        **kwargs,
    ) -> None:
        """Set up the wrapper and the underlying TPC-DI implementation.

        Args:
            scale_factor: Scale factor for the benchmark (1.0 = ~1GB).
            output_dir: Directory where generated data files are written.
            **kwargs: Additional implementation-specific options. The full
                set is handed to the base class; ``verbose`` (default
                ``False``) is then split out and passed to the
                implementation as an explicit argument.
        """
        super().__init__(scale_factor=scale_factor, output_dir=output_dir, **kwargs)

        # "verbose" is separated only after the base class has seen the full
        # option set, so the implementation receives it exactly once.
        impl_options = dict(kwargs)
        verbose = impl_options.pop("verbose", False)
        self._impl = TPCDIBenchmark(
            scale_factor=scale_factor,
            output_dir=output_dir,
            verbose=verbose,
            **impl_options,
        )

    def generate_data(self) -> list[Union[str, Path]]:
        """Produce the TPC-DI benchmark data files.

        Returns:
            The paths of the generated data files.
        """
        generated_files = self._impl.generate_data()
        return generated_files

    def get_queries(self, dialect: Optional[str] = None) -> dict[str, str]:
        """Fetch every TPC-DI benchmark query.

        Args:
            dialect: SQL dialect to translate the queries into. When None,
                the original queries are returned.

        Returns:
            Mapping of query ID to query text.
        """
        all_queries = self._impl.get_queries(dialect=dialect)
        return all_queries

    def get_query(self, query_id: Union[int, str], *, params: Optional[dict[str, Any]] = None) -> str:
        """Fetch a single TPC-DI benchmark query.

        Args:
            query_id: Identifier of the query to retrieve.
            params: Optional parameters used to customize the query.

        Returns:
            The query text.

        Raises:
            ValueError: If ``query_id`` is invalid.
        """
        query_text = self._impl.get_query(query_id, params=params)
        return query_text

    def get_schema(self, dialect: str = "standard") -> dict[str, dict[str, Any]]:
        """Describe the TPC-DI schema.

        Args:
            dialect: Target SQL dialect.

        Returns:
            Mapping of table name to table definition.
        """
        schema = self._impl.get_schema(dialect)
        return schema

    def get_create_tables_sql(self, dialect: str = "standard", tuning_config=None) -> str:
        """Build the SQL script that creates all TPC-DI tables.

        Args:
            dialect: SQL dialect to emit.
            tuning_config: Unified tuning configuration for constraint
                settings.

        Returns:
            SQL script for creating all tables.
        """
        ddl_script = self._impl.get_create_tables_sql(dialect=dialect, tuning_config=tuning_config)
        return ddl_script

    # ETL methods

    def generate_source_data(
        self,
        formats: Optional[list[str]] = None,
        batch_types: Optional[list[str]] = None,
    ) -> dict[str, list[str]]:
        """Produce source data files for ETL processing.

        Args:
            formats: Data formats to generate (csv, xml, fixed_width, json).
            batch_types: Batch types to generate (historical, incremental,
                scd).

        Returns:
            Mapping of format to the list of generated file paths.
        """
        files_by_format = self._impl.generate_source_data(formats, batch_types)
        return files_by_format

    def run_etl_pipeline(
        self,
        connection: Any,
        batch_type: str = "historical",
        validate_data: bool = True,
    ) -> dict[str, Any]:
        """Execute the complete TPC-DI ETL pipeline.

        Args:
            connection: Database connection for the target warehouse.
            batch_type: Batch type to process (historical, incremental, scd).
            validate_data: Whether to run data validation after ETL.

        Returns:
            ETL execution results and metrics.
        """
        etl_results = self._impl.run_etl_pipeline(connection, batch_type, validate_data)
        return etl_results

    def validate_etl_results(self, connection: Any) -> dict[str, Any]:
        """Check ETL output using data quality checks.

        Args:
            connection: Database connection to validate against.

        Returns:
            Validation results and data quality metrics.
        """
        validation = self._impl.validate_etl_results(connection)
        return validation

    def get_etl_status(self) -> dict[str, Any]:
        """Report the current ETL processing state.

        Returns:
            ETL status, metrics, and batch information.
        """
        status = self._impl.get_etl_status()
        return status

    @property
    def etl_mode(self) -> bool:
        """Whether ETL mode is enabled.

        Returns:
            Always True, as TPC-DI is now a pure ETL benchmark.
        """
        return True

    def load_data_to_database(self, connection: Any, tables: Optional[list[str]] = None) -> None:
        """Load the generated data into a database.

        Args:
            connection: Database connection.
            tables: Tables to load. When None, all tables are loaded.

        Raises:
            ValueError: If data has not been generated yet.
        """
        # The delegate's result is passed through as-is.
        return self._impl.load_data_to_database(connection, tables)

    def run_benchmark(
        self,
        connection: Any,
        queries: Optional[list[str]] = None,
        iterations: int = 1,
    ) -> dict[str, Any]:
        """Execute the complete TPC-DI benchmark.

        Args:
            connection: Database connection to use.
            queries: Query IDs to run. When None, all queries are run.
            iterations: Number of times each query is run.

        Returns:
            The benchmark results.
        """
        benchmark_results = self._impl.run_benchmark(connection, queries, iterations)
        return benchmark_results

    def execute_query(
        self,
        query_id: Union[int, str],
        connection: Any,
        params: Optional[dict[str, Any]] = None,
    ) -> Any:
        """Run one TPC-DI query on the given database connection.

        Args:
            query_id: Query identifier (e.g., "V1", "V2", "A1", etc.).
            connection: Database connection used for execution.
            params: Optional parameters to use in the query.

        Returns:
            Query results from the database.

        Raises:
            ValueError: If ``query_id`` is not valid.
        """
        rows = self._impl.execute_query(query_id, connection, params)
        return rows

    # Simplified public API

    def create_schema(self, connection: Any, dialect: str = "duckdb") -> None:
        """Create the TPC-DI schema using the schema manager.

        Args:
            connection: Database connection.
            dialect: Target SQL dialect.
        """
        # The delegate's result is passed through as-is.
        return self._impl.create_schema(connection, dialect)

    def run_full_benchmark(self, connection: Any, dialect: str = "duckdb") -> dict[str, Any]:
        """Execute the complete TPC-DI benchmark with all phases.

        Main entry point for a full run: schema creation, data loading, ETL
        processing, validation, and metrics calculation.

        Args:
            connection: Database connection.
            dialect: SQL dialect of the target database.

        Returns:
            Complete benchmark results with all metrics.
        """
        full_results = self._impl.run_full_benchmark(connection, dialect)
        return full_results

    def run_etl_benchmark(self, connection: Any, dialect: str = "duckdb") -> Any:
        """Execute the ETL benchmark pipeline.

        Args:
            connection: Database connection.
            dialect: SQL dialect.

        Returns:
            ETL execution results.
        """
        etl_result = self._impl.run_etl_benchmark(connection, dialect)
        return etl_result

    def run_data_validation(self, connection: Any) -> Any:
        """Execute data quality validation.

        Args:
            connection: Database connection.

        Returns:
            Data quality validation results.
        """
        validation_result = self._impl.run_data_validation(connection)
        return validation_result

    def calculate_official_metrics(self, etl_result: Any, validation_result: Any) -> Any:
        """Compute the official TPC-DI metrics.

        Args:
            etl_result: ETL execution results.
            validation_result: Data validation results.

        Returns:
            Official TPC-DI benchmark metrics.
        """
        metrics = self._impl.calculate_official_metrics(etl_result, validation_result)
        return metrics

    def optimize_database(self, connection: Any) -> dict[str, Any]:
        """Tune database performance for TPC-DI queries.

        Args:
            connection: Database connection.

        Returns:
            Optimization results.
        """
        optimization = self._impl.optimize_database(connection)
        return optimization

    @property
    def validator(self) -> Any:
        """The TPC-DI validator held by the implementation.

        Returns:
            TPCDIValidator instance.
        """
        return self._impl.validator

    @property
    def schema_manager(self) -> Any:
        """The TPC-DI schema manager held by the implementation.

        Returns:
            TPCDISchemaManager instance.
        """
        return self._impl.schema_manager

    @property
    def metrics_calculator(self) -> Any:
        """The TPC-DI metrics calculator held by the implementation.

        Returns:
            TPCDIMetrics instance.
        """
        return self._impl.metrics_calculator