TPC-DI Benchmark API

Tags: reference, python-api, tpc-di

Complete Python API reference for the TPC-DI (Data Integration) benchmark.

Overview

The TPC-DI benchmark evaluates data integration and ETL (Extract, Transform, Load) processes in data warehousing scenarios. It models a financial services environment with customer data, trading activities, and complex transformation logic including slowly changing dimensions (SCD).
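
As a concrete illustration, an SCD Type 2 update never overwrites a dimension row: the existing version is end-dated and flagged non-current, and a new current version is inserted alongside it. Using the DimCustomer columns that appear in the SCD example later on this page (the values below are purely hypothetical):

  CustomerID | LastName | IsCurrent | EffectiveDate | EndDate
  1000       | Smith    | 0         | 2015-01-01    | 2016-06-15   <- superseded version
  1000       | Smith    | 1         | 2016-06-15    | (open)       <- current version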

Key Features:

  • Data Integration focus - Tests ETL processes, not just queries

  • Financial services domain - Trading, customer, and company data

  • Slowly Changing Dimensions - SCD Type 1 and Type 2 implementations

  • Data quality validation - Systematic validation queries

  • Multiple data sources - CSV, XML, fixed-width formats

  • Historical and incremental loading - Full loads and updates

  • Complex transformations - Business rules and data cleansing

  • Audit and lineage tracking - Data governance capabilities

Quick Start

from benchbox.tpcdi import TPCDI
from benchbox.platforms.duckdb import DuckDBAdapter

# Create benchmark
benchmark = TPCDI(scale_factor=1.0)

# Generate data
benchmark.generate_data()

# Run on platform
adapter = DuckDBAdapter()
results = benchmark.run_with_platform(adapter)

print(f"Completed in {results.total_execution_time:.2f}s")

API Reference

TPCDI Class

class TPCDI(scale_factor=1.0, output_dir=None, **kwargs)

Bases: BaseBenchmark

TPC-DI benchmark implementation.

This class provides an implementation of the TPC-DI benchmark, including data generation and access to validation and analytical queries.

Official specification: http://www.tpc.org/tpcdi

__init__(scale_factor=1.0, output_dir=None, **kwargs)

Initialize a TPC-DI benchmark instance.

Parameters:
  • scale_factor (float) – Scale factor for the benchmark (1.0 = ~100MB)

  • output_dir (str | Path | None) – Directory to output generated data files

  • **kwargs – Additional implementation-specific options

generate_data()

Generate TPC-DI benchmark data.

Returns:

A list of paths to the generated data files

Return type:

list[str | Path]

get_queries(dialect=None)

Get all TPC-DI benchmark queries.

Parameters:

dialect (str | None) – Target SQL dialect for query translation. If None, returns original queries.

Returns:

A dictionary mapping query IDs to query strings

Return type:

dict[str, str]

get_query(query_id, *, params=None)

Get a specific TPC-DI benchmark query.

Parameters:
  • query_id (int | str) – The ID of the query to retrieve

  • params (dict[str, Any] | None) – Optional parameters to customize the query

Returns:

The query string

Raises:

ValueError – If the query_id is invalid

Return type:

str

get_schema(dialect='standard')

Get the TPC-DI schema.

Parameters:

dialect (str) – Target SQL dialect

Returns:

A dictionary mapping table names to table definitions

Return type:

dict[str, dict[str, Any]]

get_create_tables_sql(dialect='standard', tuning_config=None)

Get SQL to create all TPC-DI tables.

Parameters:
  • dialect (str) – SQL dialect to use

  • tuning_config – Unified tuning configuration for constraint settings

Returns:

SQL script for creating all tables

Return type:

str

generate_source_data(formats=None, batch_types=None)

Generate source data in various formats for ETL processing.

Parameters:
  • formats (list[str] | None) – List of data formats to generate (csv, xml, fixed_width, json)

  • batch_types (list[str] | None) – List of batch types to generate (historical, incremental, scd)

Returns:

Dictionary mapping formats to lists of generated file paths

Return type:

dict[str, list[str]]

run_etl_pipeline(connection, batch_type='historical', validate_data=True)

Run the complete ETL pipeline for TPC-DI.

Parameters:
  • connection (Any) – Database connection for target warehouse

  • batch_type (str) – Type of batch to process (historical, incremental, scd)

  • validate_data (bool) – Whether to run data validation after ETL

Returns:

Dictionary containing ETL execution results and metrics

Return type:

dict[str, Any]

validate_etl_results(connection)

Validate ETL results using data quality checks.

Parameters:

connection (Any) – Database connection to validate against

Returns:

Dictionary containing validation results and data quality metrics

Return type:

dict[str, Any]

get_etl_status()

Get current ETL processing status and metrics.

Returns:

Dictionary containing ETL status, metrics, and batch information

Return type:

dict[str, Any]

property etl_mode: bool

Check if ETL mode is enabled.

Returns:

Always True as TPC-DI is now a pure ETL benchmark

load_data_to_database(connection, tables=None)

Load generated data into a database.

Parameters:
  • connection (Any) – Database connection

  • tables (list[str] | None) – Optional list of tables to load. If None, loads all.

Raises:

ValueError – If data hasn’t been generated yet
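
A minimal sketch of the generate-then-load flow (conn is a database connection obtained via adapter.create_connection() as in the examples below; the DimCustomer table name is taken from the SCD example later on this page):

# Data must be generated before loading, or ValueError is raised
benchmark.generate_data()

# Load every table, or restrict the load to specific tables
benchmark.load_data_to_database(conn)
benchmark.load_data_to_database(conn, tables=["DimCustomer"])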

run_benchmark(connection, queries=None, iterations=1)

Run the complete TPC-DI benchmark.

Parameters:
  • connection (Any) – Database connection to use

  • queries (list[str] | None) – Optional list of query IDs to run. If None, runs all.

  • iterations (int) – Number of times to run each query

Returns:

Dictionary containing benchmark results

Return type:

dict[str, Any]
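
For example, running a subset of the validation queries for several iterations and printing a formatted report (format_results() is documented below):

results = benchmark.run_benchmark(conn, queries=["V1", "V2"], iterations=3)
print(benchmark.format_results(results))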

execute_query(query_id, connection, params=None)

Execute a TPC-DI query on the given database connection.

Parameters:
  • query_id (int | str) – Query identifier (e.g., “V1”, “V2”, “A1”, etc.)

  • connection (Any) – Database connection to use for execution

  • params (dict[str, Any] | None) – Optional parameters to use in the query

Returns:

Query results from the database

Raises:

ValueError – If the query_id is not valid

Return type:

Any
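
A short sketch, reusing a connection created via adapter.create_connection():

# Execute validation query V1; the shape of the returned rows depends
# on the underlying database connection
rows = benchmark.execute_query("V1", conn)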

create_schema(connection, dialect='duckdb')

Create TPC-DI schema using the schema manager.

Parameters:
  • connection (Any) – Database connection

  • dialect (str) – Target SQL dialect

run_full_benchmark(connection, dialect='duckdb')

Run the complete TPC-DI benchmark with all phases.

This is the main entry point for running a complete TPC-DI benchmark including schema creation, data loading, ETL processing, validation, and metrics calculation.

Parameters:
  • connection (Any) – Database connection

  • dialect (str) – SQL dialect for the target database

Returns:

Complete benchmark results with all metrics

Return type:

dict[str, Any]

run_etl_benchmark(connection, dialect='duckdb')

Run the ETL benchmark pipeline.

Parameters:
  • connection (Any) – Database connection

  • dialect (str) – SQL dialect

Returns:

ETL execution results

Return type:

Any

run_data_validation(connection)

Run data quality validation.

Parameters:

connection (Any) – Database connection

Returns:

Data quality validation results

Return type:

Any

calculate_official_metrics(etl_result, validation_result)

Calculate official TPC-DI metrics.

Parameters:
  • etl_result (Any) – ETL execution results

  • validation_result (Any) – Data validation results

Returns:

Official TPC-DI benchmark metrics

Return type:

Any

optimize_database(connection)

Optimize database performance for TPC-DI queries.

Parameters:

connection (Any) – Database connection

Returns:

Optimization results

Return type:

dict[str, Any]

property validator: Any

Get the TPC-DI validator instance.

Returns:

TPCDIValidator instance

property schema_manager: Any

Get the TPC-DI schema manager instance.

Returns:

TPCDISchemaManager instance

property metrics_calculator: Any

Get the TPC-DI metrics calculator instance.

Returns:

TPCDIMetrics instance

apply_verbosity(settings)

Apply verbosity settings to the mixin consumer.

property benchmark_name: str

Get the human-readable benchmark name.

create_enhanced_benchmark_result(platform, query_results, execution_metadata=None, phases=None, resource_utilization=None, performance_characteristics=None, **kwargs)

Create a BenchmarkResults object with standardized fields.

This centralizes the logic for creating benchmark results that was previously duplicated across platform adapters and CLI orchestrator.

Parameters:
  • platform (str) – Platform name (e.g., “DuckDB”, “ClickHouse”)

  • query_results (list[dict[str, Any]]) – List of query execution results

  • execution_metadata (dict[str, Any] | None) – Optional execution metadata

  • phases (dict[str, dict[str, Any]] | None) – Optional phase tracking information

  • resource_utilization (dict[str, Any] | None) – Optional resource usage metrics

  • performance_characteristics (dict[str, Any] | None) – Optional performance analysis

  • **kwargs (Any) – Additional fields to override defaults

Returns:

Fully configured BenchmarkResults object

Return type:

BenchmarkResults
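
A hedged sketch for adapter authors; the query-result dictionary below reuses the keys documented for run_query() further down this page, and the timing value is illustrative:

result = benchmark.create_enhanced_benchmark_result(
    platform="DuckDB",
    query_results=[
        {"query_id": "V1", "execution_time": 0.42, "row_count": 1},  # illustrative values
    ],
    execution_metadata={"scale_factor": 1.0},
)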

format_results(benchmark_result)

Format benchmark results for display.

Parameters:

benchmark_result (dict[str, Any]) – Result dictionary from run_benchmark()

Returns:

Formatted string representation of the results

Return type:

str
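
For example, assuming benchmark_result came from run_benchmark():

report = benchmark.format_results(benchmark_result)
print(report)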

get_data_source_benchmark()

Return the canonical source benchmark when data is shared.

Benchmarks that reuse data generated by another benchmark (for example, Primitives reusing TPC-H datasets) should override this method and return the lower-case identifier of the source benchmark. Benchmarks that produce their own data should return None (default).
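
A hypothetical override (the class name is illustrative, following the Primitives/TPC-H case mentioned above):

class PrimitivesBenchmark(BaseBenchmark):  # BaseBenchmark as documented under Bases above
    def get_data_source_benchmark(self):
        # This benchmark reuses TPC-H data rather than generating its own
        return "tpch"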

log_debug_info(context='Debug')

Log comprehensive debug information including version details.

log_error_with_debug_info(error, context='Error')

Log an error with comprehensive debug information.

log_operation_complete(operation, duration=None, details='')
log_operation_start(operation, details='')
log_verbose(message)
log_version_warning()

Log version consistency warnings if any exist.

log_very_verbose(message)

property logger: Logger

Return the logger configured for the verbosity mixin consumer.

quiet: bool = False

run_query(query_id, connection, params=None, fetch_results=False)

Execute single query and return timing and results.

Parameters:
  • query_id (int | str) – ID of the query to execute

  • connection (DatabaseConnection) – Database connection to execute query on

  • params (dict[str, Any] | None) – Optional parameters for query customization

  • fetch_results (bool) – Whether to fetch and return query results

Returns:

A dictionary containing:

  • query_id: Executed query ID

  • execution_time: Time taken to execute the query, in seconds

  • query_text: Executed query text

  • results: Query results if fetch_results=True, otherwise None

  • row_count: Number of rows returned (if results fetched)

Return type:

dict[str, Any]

Raises:
  • ValueError – If query_id is invalid

  • Exception – If query execution fails
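
For example, using the documented keys of the returned dictionary:

timing = benchmark.run_query("V1", conn, fetch_results=True)
print(f"{timing['query_id']}: {timing['execution_time']:.3f}s "
      f"({timing['row_count']} rows)")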

run_with_platform(platform_adapter, **run_config)

Run complete benchmark using platform-specific optimizations.

This method provides a unified interface for running benchmarks using database platform adapters that handle connection management, data loading optimizations, and query execution.

This is the standard method that all benchmarks should support for integration with the CLI and other orchestration tools.

Parameters:
  • platform_adapter – Platform adapter instance (e.g., DuckDBAdapter)

  • **run_config – Configuration options:

      - categories: List of query categories to run (if the benchmark supports categories)

      - query_subset: List of specific query IDs to run

      - connection: Connection configuration

      - benchmark_type: Type hint for optimizations ('olap', 'oltp', etc.)

Returns:

BenchmarkResults object with execution results

Example

from benchbox.platforms import DuckDBAdapter

benchmark = SomeBenchmark(scale_factor=0.1)
adapter = DuckDBAdapter()
results = benchmark.run_with_platform(adapter)

setup_database(connection)

Set up database with schema and data.

Creates necessary database schema and loads benchmark data into the database.

Parameters:

connection (DatabaseConnection) – Database connection to set up

Raises:
  • ValueError – If data generation fails

  • Exception – If database setup fails
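
A minimal sketch; this single call replaces separate schema-creation and data-loading steps:

benchmark.setup_database(conn)  # creates the schema, then loads the generated data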

translate_query(query_id, dialect)

Translate a query to a specific SQL dialect.

Parameters:
  • query_id (int | str) – The ID of the query to translate

  • dialect (str) – The target SQL dialect

Returns:

The translated query string

Raises:
  • ValueError – If the query_id is invalid

  • ImportError – If sqlglot is not installed

  • ValueError – If the dialect is not supported

Return type:

str
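
For example (requires sqlglot; the "bigquery" dialect follows the get_queries() example later on this page):

# Translate validation query V1 to the BigQuery dialect
bq_sql = benchmark.translate_query("V1", "bigquery")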

verbose: bool = False
verbose_enabled: bool = False
verbose_level: int = 0
property verbosity_settings: VerbositySettings

Return the current verbosity settings.

very_verbose: bool = False

Constructor

TPCDI(
    scale_factor: float = 1.0,
    output_dir: Optional[Union[str, Path]] = None,
    verbose: bool = False,
    **kwargs
)

Parameters:

  • scale_factor (float): Data size multiplier. SF=1.0 generates ~100MB of data. Range: 0.1 to 30+

  • output_dir (str|Path, optional): Directory for generated data files. Default: temporary directory

  • verbose (bool): Enable verbose logging. Default: False

  • kwargs: Additional options (e.g., batch_size, enable_scd, validate_data)

Raises:

  • ValueError: If scale_factor is not positive

  • TypeError: If scale_factor is not a number
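
A construction sketch using the options above (enable_scd also appears in the SCD example later on this page; the output directory is illustrative):

from benchbox.tpcdi import TPCDI

benchmark = TPCDI(
    scale_factor=0.1,               # ~10MB of generated data
    output_dir="./data/tpcdi_sf01",
    verbose=True,
    enable_scd=True,                # forwarded via **kwargs
)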

Methods

generate_data()

Generate TPC-DI data warehouse tables.

data_files = benchmark.generate_data()
print(f"Generated {len(data_files)} table files")
Returns:

List[Union[str, Path]]: Paths to generated data files

generate_source_data(formats=None, batch_types=None)

Generate source data files in various formats for ETL processing.

# Generate all source formats
source_files = benchmark.generate_source_data()

# Generate specific formats
source_files = benchmark.generate_source_data(
    formats=["csv", "xml"],
    batch_types=["historical", "incremental"]
)

for format_type, files in source_files.items():
    print(f"{format_type}: {len(files)} files")

Parameters:

  • formats (list[str], optional): Data formats to generate. Options: “csv”, “xml”, “fixed_width”, “json”

  • batch_types (list[str], optional): Batch types to generate. Options: “historical”, “incremental”, “scd”

Returns:

dict[str, list[str]]: Dictionary mapping formats to file paths

get_query(query_id, *, params=None)

Get a specific TPC-DI query.

# Get validation query
v1 = benchmark.get_query("V1")

# Get analytical query
a1 = benchmark.get_query("A1")

# Get data quality query
dq1 = benchmark.get_query("DQ1")

Parameters:

  • query_id (int|str): Query ID (e.g., “V1”, “A1”, “DQ1”)

  • params (dict, optional): Query parameters

Returns:

str: Query SQL text

Raises:

  • ValueError: If query_id is invalid

get_queries(dialect=None)

Get all TPC-DI benchmark queries.

# Get all queries
queries = benchmark.get_queries()
print(f"Total queries: {len(queries)}")

# Get with dialect translation
queries_bq = benchmark.get_queries(dialect="bigquery")

Parameters:

  • dialect (str, optional): Target SQL dialect

Returns:

dict[str, str]: Dictionary mapping query IDs to SQL text

get_schema(dialect="standard")

Get TPC-DI schema information.

schema = benchmark.get_schema()
for name, table in schema.items():
    print(f"{name}: {len(table['columns'])} columns")

Returns:

dict[str, dict[str, Any]]: Dictionary mapping table names to table definitions

get_create_tables_sql(dialect="standard", tuning_config=None)

Get CREATE TABLE SQL for all TPC-DI tables.

# Standard SQL
create_sql = benchmark.get_create_tables_sql()

# With dialect
create_sql_sf = benchmark.get_create_tables_sql(dialect="snowflake")

# With tuning configuration
from benchbox.core.tuning.interface import UnifiedTuningConfiguration
tuning = UnifiedTuningConfiguration(...)
create_sql_tuned = benchmark.get_create_tables_sql(tuning_config=tuning)

Parameters:

  • dialect (str): Target SQL dialect. Default: “standard”

  • tuning_config (UnifiedTuningConfiguration, optional): Tuning settings

Returns:

str: SQL script for creating all tables

ETL Methods

run_etl_pipeline(connection, batch_type="historical", validate_data=True)

Run the complete ETL pipeline for TPC-DI.

from benchbox.platforms.duckdb import DuckDBAdapter

adapter = DuckDBAdapter()
conn = adapter.create_connection()

# Run historical batch
etl_result = benchmark.run_etl_pipeline(
    conn,
    batch_type="historical",
    validate_data=True
)

print(f"ETL duration: {etl_result['duration']:.2f}s")
print(f"Records loaded: {etl_result['records_loaded']:,}")

Parameters:

  • connection (Any): Database connection for target warehouse

  • batch_type (str): Type of batch. Options: “historical”, “incremental”, “scd”. Default: “historical”

  • validate_data (bool): Run data validation after ETL. Default: True

Returns:

dict[str, Any]: ETL execution results and metrics

validate_etl_results(connection)

Validate ETL results using data quality checks.

validation_result = benchmark.validate_etl_results(conn)

print(f"Validation status: {validation_result['status']}")
print(f"Passed checks: {validation_result['passed_checks']}")
print(f"Failed checks: {validation_result['failed_checks']}")

Parameters:

  • connection (Any): Database connection to validate against

Returns:

dict[str, Any]: Validation results and data quality metrics

get_etl_status()

Get current ETL processing status and metrics.

status = benchmark.get_etl_status()

print(f"Current batch: {status['batch_id']}")
print(f"Tables loaded: {status['tables_loaded']}")
print(f"Total records: {status['total_records']:,}")
Returns:

dict[str, Any]: ETL status, metrics, and batch information

Benchmark Methods

run_full_benchmark(connection, dialect="duckdb")

Run the complete TPC-DI benchmark with all phases.

from benchbox.platforms.duckdb import DuckDBAdapter

adapter = DuckDBAdapter()
conn = adapter.create_connection()

# Run complete benchmark
results = benchmark.run_full_benchmark(conn, dialect="duckdb")

print(f"Total duration: {results['total_duration']:.2f}s")
print(f"ETL duration: {results['etl_duration']:.2f}s")
print(f"Query duration: {results['query_duration']:.2f}s")
print(f"Data quality score: {results['quality_score']:.2f}")

Parameters:

  • connection (Any): Database connection

  • dialect (str): SQL dialect for the target database. Default: “duckdb”

Returns:

dict[str, Any]: Complete benchmark results with all metrics

create_schema(connection, dialect="duckdb")

Create TPC-DI schema using the schema manager.

benchmark.create_schema(conn, dialect="duckdb")

Parameters:

  • connection (Any): Database connection

  • dialect (str): Target SQL dialect. Default: “duckdb”

run_etl_benchmark(connection, dialect="duckdb")

Run the ETL benchmark pipeline.

etl_results = benchmark.run_etl_benchmark(conn, dialect="duckdb")

Parameters:

  • connection (Any): Database connection

  • dialect (str): SQL dialect. Default: “duckdb”

Returns:

Any: ETL execution results

run_data_validation(connection)

Run data quality validation.

validation_results = benchmark.run_data_validation(conn)

Parameters:

  • connection (Any): Database connection

Returns:

Any: Data quality validation results

calculate_official_metrics(etl_result, validation_result)

Calculate official TPC-DI metrics.

# Run ETL and validation
etl_result = benchmark.run_etl_benchmark(conn)
validation_result = benchmark.run_data_validation(conn)

# Calculate official metrics
metrics = benchmark.calculate_official_metrics(etl_result, validation_result)

print(f"TPC-DI Metric: {metrics['tpcdi_metric']}")
print(f"Throughput: {metrics['throughput']:.2f} records/sec")

Parameters:

  • etl_result (Any): ETL execution results

  • validation_result (Any): Data validation results

Returns:

Any: Official TPC-DI benchmark metrics

optimize_database(connection)

Optimize database performance for TPC-DI queries.

optimization_result = benchmark.optimize_database(conn)

print(f"Indexes created: {optimization_result['indexes_created']}")
print(f"Statistics updated: {optimization_result['stats_updated']}")

Parameters:

  • connection (Any): Database connection

Returns:

dict[str, Any]: Optimization results

Properties

etl_mode

Check if ETL mode is enabled.

if benchmark.etl_mode:
    print("ETL mode enabled")
Returns:

bool: Always True (TPC-DI is an ETL benchmark)

validator

Access to the TPC-DI validator instance.

validator = benchmark.validator
validation_result = validator.run_all_validations(conn)

Returns:

TPCDIValidator: Validator instance for data quality checks

schema_manager

Access to the TPC-DI schema manager instance.

schema_mgr = benchmark.schema_manager
schema_info = schema_mgr.get_table_info("DimCustomer")

Returns:

TPCDISchemaManager: Schema manager instance

metrics_calculator

Access to the TPC-DI metrics calculator instance.

metrics_calc = benchmark.metrics_calculator
official_metrics = metrics_calc.calculate_metrics(etl_result)

Returns:

TPCDIMetrics: Metrics calculator instance

Usage Examples

Basic ETL Pipeline

from benchbox.tpcdi import TPCDI
from benchbox.platforms.duckdb import DuckDBAdapter

# Create benchmark with scale factor 1 (~100MB)
benchmark = TPCDI(scale_factor=1.0)

# Generate source data
source_files = benchmark.generate_source_data()

# Setup database
adapter = DuckDBAdapter()
conn = adapter.create_connection()

# Create schema
benchmark.create_schema(conn)

# Run ETL pipeline
etl_result = benchmark.run_etl_pipeline(
    conn,
    batch_type="historical",
    validate_data=True
)

print(f"ETL completed in {etl_result['duration']:.2f}s")
print(f"Validation status: {etl_result['validation_status']}")

Incremental Batch Processing

from benchbox.tpcdi import TPCDI
from benchbox.platforms.duckdb import DuckDBAdapter

benchmark = TPCDI(scale_factor=1.0)
adapter = DuckDBAdapter()
conn = adapter.create_connection()

# Initial historical load
print("Running historical load...")
hist_result = benchmark.run_etl_pipeline(
    conn,
    batch_type="historical",
    validate_data=True
)

# Process incremental batches
for batch_id in range(1, 4):
    print(f"Processing incremental batch {batch_id}...")
    inc_result = benchmark.run_etl_pipeline(
        conn,
        batch_type="incremental",
        validate_data=True
    )
    print(f"Batch {batch_id} duration: {inc_result['duration']:.2f}s")

Data Quality Validation

from benchbox.tpcdi import TPCDI
from benchbox.platforms.duckdb import DuckDBAdapter

benchmark = TPCDI(scale_factor=1.0)
adapter = DuckDBAdapter()
conn = adapter.create_connection()

# Run ETL
etl_result = benchmark.run_etl_pipeline(conn)

# Run comprehensive validation
validation = benchmark.validate_etl_results(conn)

# Check validation results
print(f"Validation Queries:")
for query_id in ["V1", "V2", "V3", "V4", "V5"]:
    status = validation['queries'][query_id]['status']
    print(f"  {query_id}: {status}")

print(f"\nData Quality Checks:")
for check_id in ["DQ1", "DQ2", "DQ3", "DQ4", "DQ5"]:
    status = validation['quality_checks'][check_id]['status']
    violations = validation['quality_checks'][check_id]['violations']
    print(f"  {check_id}: {status} ({violations} violations)")

# Overall quality score
print(f"\nOverall quality score: {validation['quality_score']:.2f}%")

SCD Type 2 Processing Example

from benchbox.tpcdi import TPCDI
from benchbox.platforms.duckdb import DuckDBAdapter

benchmark = TPCDI(scale_factor=0.1, enable_scd=True)
adapter = DuckDBAdapter()
conn = adapter.create_connection()

# Create schema with SCD support
benchmark.create_schema(conn)

# Load initial data
benchmark.run_etl_pipeline(conn, batch_type="historical")

# Query current customer records
current_customers = conn.execute("""
    SELECT CustomerID, LastName, FirstName, IsCurrent, EffectiveDate
    FROM DimCustomer
    WHERE IsCurrent = 1
    ORDER BY CustomerID
    LIMIT 5
""").fetchall()

print("Current customers:")
for customer in current_customers:
    print(f"  {customer}")

# Process SCD batch (creates new versions)
benchmark.run_etl_pipeline(conn, batch_type="scd")

# Query historical records
historical_customers = conn.execute("""
    SELECT CustomerID, LastName, FirstName, IsCurrent,
           EffectiveDate, EndDate
    FROM DimCustomer
    WHERE CustomerID = 1000
    ORDER BY EffectiveDate
""").fetchall()

print("\nCustomer history (CustomerID=1000):")
for record in historical_customers:
    print(f"  {record}")

Multi-Platform Comparison

from benchbox.tpcdi import TPCDI
from benchbox.platforms.duckdb import DuckDBAdapter
from benchbox.platforms.clickhouse import ClickHouseAdapter
import pandas as pd

benchmark = TPCDI(scale_factor=1.0, output_dir="./data/tpcdi_sf1")

platforms = {
    "DuckDB": DuckDBAdapter(),
    "ClickHouse": ClickHouseAdapter(host="localhost"),
}

results_data = []

for name, adapter in platforms.items():
    print(f"\nBenchmarking {name}...")
    conn = adapter.create_connection()

    # Run full benchmark
    result = benchmark.run_full_benchmark(conn)

    results_data.append({
        "platform": name,
        "etl_duration": result['etl_duration'],
        "query_duration": result['query_duration'],
        "total_duration": result['total_duration'],
        "quality_score": result['quality_score'],
        "validation_passed": result['validation_passed'],
    })

df = pd.DataFrame(results_data)
print("\nBenchmark Results:")
print(df)

Complete Official Benchmark

from benchbox.tpcdi import TPCDI
from benchbox.platforms.duckdb import DuckDBAdapter

# Setup
benchmark = TPCDI(scale_factor=3.0, verbose=True)
adapter = DuckDBAdapter()
conn = adapter.create_connection()

# Phase 1: Schema Creation
print("Phase 1: Creating schema...")
benchmark.create_schema(conn)

# Phase 2: Historical Load
print("Phase 2: Historical data load...")
etl_result = benchmark.run_etl_benchmark(conn)

# Phase 3: Data Validation
print("Phase 3: Data quality validation...")
validation_result = benchmark.run_data_validation(conn)

# Phase 4: Database Optimization
print("Phase 4: Database optimization...")
opt_result = benchmark.optimize_database(conn)

# Phase 5: Query Execution
print("Phase 5: Running analytical queries...")
query_results = {}
for query_id in ["A1", "A2", "A3", "A4", "A5", "A6"]:
    query = benchmark.get_query(query_id)
    result = adapter.execute_query(conn, query, query_id)
    query_results[query_id] = result

# Phase 6: Calculate Official Metrics
print("Phase 6: Calculating official metrics...")
official_metrics = benchmark.calculate_official_metrics(
    etl_result,
    validation_result
)

# Report results
print("\n" + "="*60)
print("TPC-DI Benchmark Results")
print("="*60)
print(f"Scale Factor: {benchmark.scale_factor}")
print(f"ETL Duration: {etl_result['duration']:.2f}s")
print(f"Data Quality Score: {validation_result['quality_score']:.2f}%")
print(f"TPC-DI Metric: {official_metrics['tpcdi_metric']}")
print(f"Throughput: {official_metrics['throughput']:.2f} records/sec")
print("="*60)

See Also

External Resources