SSB (Star Schema Benchmark) API

Tags reference python-api ssb

Complete Python API reference for the Star Schema Benchmark (SSB).

Overview

The Star Schema Benchmark (SSB) is a simplified variant of TPC-H specifically designed for testing OLAP systems and data warehouses. It transforms the normalized TPC-H schema into a denormalized star schema that better represents typical data warehouse designs.

Key Features:

  • Star schema design - Single fact table with dimension tables

  • 13 analytical queries - Organized into 4 logical flights

  • Simplified query patterns - Focus on aggregation and filtering

  • OLAP-oriented - Tests typical data warehouse patterns

  • Parameterized queries - Configurable selectivity

  • Performance-focused - Stresses aggregation and scan performance

  • Scale factors from 0.01 to 100+

Quick Start

from benchbox.ssb import SSB
from benchbox.platforms.duckdb import DuckDBAdapter

# Create benchmark
benchmark = SSB(scale_factor=1.0)

# Generate data
benchmark.generate_data()

# Run on platform
adapter = DuckDBAdapter()
results = benchmark.run_with_platform(adapter)

print(f"Completed in {results.total_execution_time:.2f}s")

API Reference

SSB Class

class SSB(scale_factor=1.0, output_dir=None, **kwargs)

Bases: BaseBenchmark

Star Schema Benchmark implementation.

This class provides an implementation of the Star Schema Benchmark, including data generation and access to the 13 benchmark queries organized in 4 flights.

Reference: O’Neil et al. “The Star Schema Benchmark and Augmented Fact Table Indexing”

__init__(scale_factor=1.0, output_dir=None, **kwargs)

Initialize a Star Schema Benchmark instance.

Parameters:
  • scale_factor (float) – Scale factor for the benchmark (1.0 generates roughly 500MB of data)

  • output_dir (str | Path | None) – Directory to output generated data files

  • **kwargs (Any) – Additional implementation-specific options

generate_data()

Generate Star Schema Benchmark data.

Returns:

A list of paths to the generated data files

Return type:

list[str | Path]

get_queries(dialect=None)

Get all Star Schema Benchmark queries.

Parameters:

dialect (str | None) – Target SQL dialect for query translation. If None, returns original queries.

Returns:

A dictionary mapping query IDs to query strings

Return type:

dict[str, str]

get_query(query_id, *, params=None)

Get a specific Star Schema Benchmark query.

Parameters:
  • query_id (str) – The ID of the query to retrieve (Q1.1-Q4.3)

  • params (dict[str, Any] | None) – Optional parameters to customize the query

Returns:

The query string

Raises:

ValueError – If the query_id is invalid

Return type:

str

get_schema()

Get the Star Schema Benchmark schema.

Returns:

A list of dictionaries describing the tables in the schema

Return type:

list[dict]

get_create_tables_sql(dialect='standard', tuning_config=None)

Get SQL to create all Star Schema Benchmark tables.

Parameters:
  • dialect (str) – SQL dialect to use

  • tuning_config – Unified tuning configuration for constraint settings

Returns:

SQL script for creating all tables

Return type:

str

apply_verbosity(settings)

Apply verbosity settings to the mixin consumer.

property benchmark_name: str

Get the human-readable benchmark name.

create_enhanced_benchmark_result(platform, query_results, execution_metadata=None, phases=None, resource_utilization=None, performance_characteristics=None, **kwargs)

Create a BenchmarkResults object with standardized fields.

This centralizes the logic for creating benchmark results that was previously duplicated across platform adapters and the CLI orchestrator.

Parameters:
  • platform (str) – Platform name (e.g., “DuckDB”, “ClickHouse”)

  • query_results (list[dict[str, Any]]) – List of query execution results

  • execution_metadata (dict[str, Any] | None) – Optional execution metadata

  • phases (dict[str, dict[str, Any]] | None) – Optional phase tracking information

  • resource_utilization (dict[str, Any] | None) – Optional resource usage metrics

  • performance_characteristics (dict[str, Any] | None) – Optional performance analysis

  • **kwargs (Any) – Additional fields to override defaults

Returns:

Fully configured BenchmarkResults object

Return type:

BenchmarkResults
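
An illustrative sketch only: the keys inside each query_results entry mirror run_query()'s documented output, and the metadata values below are placeholders rather than required fields.

results = benchmark.create_enhanced_benchmark_result(
    platform="DuckDB",
    query_results=[{"query_id": "Q1.1", "execution_time": 0.12}],
    execution_metadata={"scale_factor": 1.0},  # placeholder metadata
)
print(results.total_queries)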

format_results(benchmark_result)

Format benchmark results for display.

Parameters:

benchmark_result (dict[str, Any]) – Result dictionary from run_benchmark()

Returns:

Formatted string representation of the results

Return type:

str
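
For example, with conn being a database connection as in the later examples and summary the dictionary returned by run_benchmark() (documented below):

summary = benchmark.run_benchmark(conn)
print(benchmark.format_results(summary))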

get_data_source_benchmark()

Return the canonical source benchmark when data is shared.

Benchmarks that reuse data generated by another benchmark (for example, Primitives reusing TPC-H datasets) should override this method and return the lower-case identifier of the source benchmark. Benchmarks that produce their own data should return None (default).
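
A hypothetical override sketch (SSB generates its own data, so it keeps the default None; the "tpch" identifier below is only an assumed example):

class MyTpchDerivedBenchmark(BaseBenchmark):
    def get_data_source_benchmark(self):
        # Reuse data generated by the TPC-H benchmark (assumed identifier)
        return "tpch"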

log_debug_info(context='Debug')

Log comprehensive debug information including version details.

log_error_with_debug_info(error, context='Error')

Log an error with comprehensive debug information.

log_operation_complete(operation, duration=None, details='')
log_operation_start(operation, details='')
log_verbose(message)
log_version_warning()

Log version consistency warnings if any exist.

log_very_verbose(message)
property logger: Logger

Return the logger configured for the verbosity mixin consumer.

quiet: bool = False
run_benchmark(connection, query_ids=None, fetch_results=False, setup_database=True)

Run the complete benchmark suite.

Parameters:
  • connection (DatabaseConnection) – Database connection to execute queries on

  • query_ids (list[int | str] | None) – Optional list of specific query IDs to run (defaults to all)

  • fetch_results (bool) – Whether to fetch and return query results

  • setup_database (bool) – Whether to set up the database first

Returns:

A dictionary containing:

  • benchmark_name: Name of the benchmark

  • total_execution_time: Total time for all queries

  • total_queries: Number of queries executed

  • successful_queries: Number of queries that succeeded

  • failed_queries: Number of queries that failed

  • query_results: List of individual query results

  • setup_time: Time taken for database setup (if performed)

Return type:

dict[str, Any]

Raises:

Exception – If benchmark execution fails
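
A minimal sketch, assuming the adapter-created connection used in the Usage Examples below satisfies the DatabaseConnection interface:

conn = adapter.create_connection()
summary = benchmark.run_benchmark(conn, query_ids=["Q1.1", "Q2.1"])
print(f"{summary['successful_queries']}/{summary['total_queries']} queries "
      f"in {summary['total_execution_time']:.2f}s")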

run_query(query_id, connection, params=None, fetch_results=False)

Execute single query and return timing and results.

Parameters:
  • query_id (int | str) – ID of the query to execute

  • connection (DatabaseConnection) – Database connection to execute query on

  • params (dict[str, Any] | None) – Optional parameters for query customization

  • fetch_results (bool) – Whether to fetch and return query results

Returns:

A dictionary containing:

  • query_id: Executed query ID

  • execution_time: Time taken to execute query in seconds

  • query_text: Executed query text

  • results: Query results if fetch_results=True, otherwise None

  • row_count: Number of rows returned (if results fetched)

Return type:

dict[str, Any]

Raises:
  • ValueError – If query_id is invalid

  • Exception – If query execution fails
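
A minimal sketch using the documented return keys (same connection as in the run_benchmark() example above):

result = benchmark.run_query("Q1.1", conn, fetch_results=True)
print(f"{result['query_id']}: {result['execution_time']:.3f}s, "
      f"{result['row_count']} rows")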

run_with_platform(platform_adapter, **run_config)

Run complete benchmark using platform-specific optimizations.

This method provides a unified interface for running benchmarks using database platform adapters that handle connection management, data loading optimizations, and query execution.

This is the standard method that all benchmarks should support for integration with the CLI and other orchestration tools.

Parameters:
  • platform_adapter – Platform adapter instance (e.g., DuckDBAdapter)

  • **run_config – Configuration options:

      - categories: List of query categories to run (if the benchmark supports them)

      - query_subset: List of specific query IDs to run

      - connection: Connection configuration

      - benchmark_type: Type hint for optimizations (‘olap’, ‘oltp’, etc.)

Returns:

BenchmarkResults object with execution results

Example

from benchbox.platforms import DuckDBAdapter

benchmark = SomeBenchmark(scale_factor=0.1)
adapter = DuckDBAdapter()
results = benchmark.run_with_platform(adapter)

setup_database(connection)

Set up database with schema and data.

Creates necessary database schema and loads benchmark data into the database.

Parameters:

connection (DatabaseConnection) – Database connection to set up

Raises:
  • ValueError – If data generation fails

  • Exception – If database setup fails
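
A minimal sketch, assuming an adapter-created connection as in the Usage Examples below:

conn = adapter.create_connection()
benchmark.setup_database(conn)  # creates the SSB tables and loads generated data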

translate_query(query_id, dialect)

Translate a query to a specific SQL dialect.

Parameters:
  • query_id (int | str) – The ID of the query to translate

  • dialect (str) – The target SQL dialect

Returns:

The translated query string

Raises:
  • ValueError – If the query_id is invalid

  • ImportError – If sqlglot is not installed

  • ValueError – If the dialect is not supported

Return type:

str
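
For illustration (requires sqlglot; "clickhouse" is one of the dialects used elsewhere on this page):

q1_1_clickhouse = benchmark.translate_query("Q1.1", "clickhouse")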

verbose: bool = False
verbose_enabled: bool = False
verbose_level: int = 0
property verbosity_settings: VerbositySettings

Return the current verbosity settings.

very_verbose: bool = False

Constructor

SSB(
    scale_factor: float = 1.0,
    output_dir: Optional[Union[str, Path]] = None,
    **kwargs
)

Parameters:

  • scale_factor (float): Data size multiplier. SF=1.0 generates ~500MB (~6M rows in fact table). Range: 0.01 to 100+

  • output_dir (str|Path, optional): Directory for generated data files. Default: temporary directory

  • kwargs: Additional options (e.g., date_range_years, partition_fact_table)

Raises:

  • ValueError: If scale_factor is not positive

  • TypeError: If scale_factor is not a number
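
For example (the output_dir path is illustrative; extra keyword options are implementation-specific):

benchmark = SSB(scale_factor=0.1, output_dir="./data/ssb_sf01")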

Methods

generate_data()

Generate Star Schema Benchmark data.

data_files = benchmark.generate_data()
print(f"Generated {len(data_files)} table files")

Returns:

List[Union[str, Path]]: Paths to generated data files (5 tables: date, customer, supplier, part, lineorder)

get_query(query_id, *, params=None)

Get a specific SSB query.

# Get Flight 1 query
q1_1 = benchmark.get_query("Q1.1")

# Get with custom parameters
q2_1 = benchmark.get_query("Q2.1", params={
    "category": "MFGR#12",
    "region": "AMERICA"
})

# Get Flight 3 query
q3_1 = benchmark.get_query("Q3.1")

Parameters:

  • query_id (str): Query ID (Q1.1 to Q4.3)

  • params (dict, optional): Query parameters for customization

Returns:

str: Query SQL text

Raises:

  • ValueError: If query_id is invalid

get_queries(dialect=None)

Get all SSB benchmark queries.

# Get all queries
queries = benchmark.get_queries()
print(f"Total queries: {len(queries)}")  # 13 queries

# Get with dialect translation
queries_ch = benchmark.get_queries(dialect="clickhouse")

Parameters:

  • dialect (str, optional): Target SQL dialect

Returns:

dict[str, str]: Dictionary mapping query IDs to SQL text

get_schema()

Get SSB schema information.

schema = benchmark.get_schema()
for table in schema:
    print(f"{table['name']}: {len(table['columns'])} columns")

Returns:

list[dict]: List of table definitions (5 tables: date, customer, supplier, part, lineorder)

get_create_tables_sql(dialect="standard", tuning_config=None)

Get CREATE TABLE SQL for all SSB tables.

# Standard SQL
create_sql = benchmark.get_create_tables_sql()

# With dialect
create_sql_ch = benchmark.get_create_tables_sql(dialect="clickhouse")

# With tuning configuration
from benchbox.core.tuning.interface import UnifiedTuningConfiguration
tuning = UnifiedTuningConfiguration(...)
create_sql_tuned = benchmark.get_create_tables_sql(tuning_config=tuning)

Parameters:

  • dialect (str): Target SQL dialect. Default: “standard”

  • tuning_config (UnifiedTuningConfiguration, optional): Tuning settings

Returns:

str: SQL script for creating all tables

Query Flights

SSB organizes 13 queries into 4 logical flights:

Flight 1: Simple Aggregation (Q1.1 - Q1.3)

Tests basic aggregation and filtering on the fact table.

# Flight 1 focuses on fact table scan performance
flight_1_queries = ["Q1.1", "Q1.2", "Q1.3"]

for query_id in flight_1_queries:
    query = benchmark.get_query(query_id)
    # Execute query...

Query Characteristics:

  • Q1.1: Year-based aggregation with discount and quantity filters

  • Q1.2: Month-based aggregation with quantity range

  • Q1.3: Week-based aggregation with refined quantity range
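
To make the flight concrete, the sketch below prints Q1.1; the comment shows the canonical shape of that query as published by O'Neil et al. (the literals produced by the parameterized version may differ):

# Canonical Q1.1 shape (per the SSB paper), roughly:
#   SELECT SUM(lo_extendedprice * lo_discount) AS revenue
#   FROM lineorder, date
#   WHERE lo_orderdate = d_datekey
#     AND d_year = 1993
#     AND lo_discount BETWEEN 1 AND 3
#     AND lo_quantity < 25
print(benchmark.get_query("Q1.1"))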

Flight 2: Customer-Supplier Analysis (Q2.1 - Q2.3)

Tests dimension table joins and drill-down analysis.

# Flight 2 tests star schema join performance
flight_2_queries = ["Q2.1", "Q2.2", "Q2.3"]

for query_id in flight_2_queries:
    query = benchmark.get_query(query_id, params={
        "category": "MFGR#12",
        "region": "AMERICA"
    })
    # Execute query...

Query Characteristics:

  • Q2.1: Brand category analysis with supplier region

  • Q2.2: Brand range analysis with supplier region

  • Q2.3: Specific brand focus with supplier region

Flight 3: Customer Behavior Analysis (Q3.1 - Q3.4)

Tests complex multi-dimension analysis with customer geography.

# Flight 3 tests complex star joins
flight_3_queries = ["Q3.1", "Q3.2", "Q3.3", "Q3.4"]

for query_id in flight_3_queries:
    query = benchmark.get_query(query_id)
    # Execute query...

Query Characteristics:

  • Q3.1: Customer nation analysis

  • Q3.2: City-level analysis

  • Q3.3: Specific city analysis

  • Q3.4: Regional analysis

Flight 4: Profit Analysis (Q4.1 - Q4.3)

Tests complex aggregation with profit calculations.

# Flight 4 tests analytical queries with calculations
flight_4_queries = ["Q4.1", "Q4.2", "Q4.3"]

for query_id in flight_4_queries:
    query = benchmark.get_query(query_id)
    # Execute query...

Query Characteristics:

  • Q4.1: Regional profit analysis

  • Q4.2: Nation-specific profit trends

  • Q4.3: City-specific profit with part category
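
In this flight, "profit" is the standard SSB measure derived from fact-table columns; a quick way to inspect the generated SQL:

# Flight 4 queries aggregate SUM(lo_revenue - lo_supplycost) AS profit
# across the customer, supplier, part, and date dimensions (per the SSB paper).
print(benchmark.get_query("Q4.1"))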

Usage Examples

Basic Benchmark Run

from benchbox.ssb import SSB
from benchbox.platforms.duckdb import DuckDBAdapter

# Create benchmark with scale factor 1 (~500MB)
benchmark = SSB(scale_factor=1.0)

# Generate data
benchmark.generate_data()

# Run on DuckDB
adapter = DuckDBAdapter()
results = benchmark.run_with_platform(adapter)

# Print results
print(f"Benchmark: {results.benchmark_name}")
print(f"Total time: {results.total_execution_time:.2f}s")
print(f"Queries: {results.successful_queries}/{results.total_queries}")

Flight-Based Execution

from benchbox.ssb import SSB
from benchbox.platforms.duckdb import DuckDBAdapter
import time

benchmark = SSB(scale_factor=1.0)
benchmark.generate_data()

adapter = DuckDBAdapter()
conn = adapter.create_connection()

# Load data
adapter.create_schema(benchmark, conn)
adapter.load_data(benchmark, conn, benchmark.output_dir)

# Run Flight 1 (Simple Aggregation)
print("Flight 1: Simple Aggregation")
flight_1_queries = ["Q1.1", "Q1.2", "Q1.3"]

flight_1_times = {}
for query_id in flight_1_queries:
    query = benchmark.get_query(query_id)

    start = time.time()
    result = adapter.execute_query(conn, query, query_id)
    duration = time.time() - start

    flight_1_times[query_id] = duration
    print(f"  {query_id}: {duration:.3f}s")

print(f"Flight 1 total: {sum(flight_1_times.values()):.2f}s")

Parameterized Query Execution

from benchbox.ssb import SSB

benchmark = SSB(scale_factor=1.0)

# Define custom parameters
custom_params = {
    "year": 1993,
    "year_month": 199401,
    "week": 6,
    "discount_min": 1,
    "discount_max": 3,
    "quantity": 25,
    "quantity_min": 26,
    "quantity_max": 35,
    "category": "MFGR#12",
    "region": "AMERICA",
    "brand_min": "MFGR#2221",
    "brand_max": "MFGR#2228",
    "brand": "MFGR#2221"
}

# Get queries with custom parameters
q1_1 = benchmark.get_query("Q1.1", params=custom_params)
q2_1 = benchmark.get_query("Q2.1", params=custom_params)

print("Q1.1 with custom year:")
print(q1_1[:200])

Complete Flight Benchmark

from benchbox.ssb import SSB
from benchbox.platforms.duckdb import DuckDBAdapter
import time

benchmark = SSB(scale_factor=1.0)
adapter = DuckDBAdapter()

# Setup
benchmark.generate_data()
conn = adapter.create_connection()
adapter.create_schema(benchmark, conn)
adapter.load_data(benchmark, conn, benchmark.output_dir)

# Define all flights
flights = {
    "Flight 1": ["Q1.1", "Q1.2", "Q1.3"],
    "Flight 2": ["Q2.1", "Q2.2", "Q2.3"],
    "Flight 3": ["Q3.1", "Q3.2", "Q3.3", "Q3.4"],
    "Flight 4": ["Q4.1", "Q4.2", "Q4.3"]
}

# Run all flights
all_results = {}

for flight_name, query_ids in flights.items():
    print(f"\n{flight_name}:")
    flight_results = {}

    for query_id in query_ids:
        query = benchmark.get_query(query_id)

        start = time.time()
        result = adapter.execute_query(conn, query, query_id)
        duration = time.time() - start

        flight_results[query_id] = {
            "duration": duration,
            "status": result["status"],
            "rows": result.get("rows_returned", 0)
        }

        print(f"  {query_id}: {duration:.3f}s ({result['status']})")

    all_results[flight_name] = flight_results

# Summary
print("\n" + "="*60)
print("SSB Benchmark Summary")
print("="*60)
for flight_name, flight_results in all_results.items():
    total_time = sum(r["duration"] for r in flight_results.values())
    print(f"{flight_name}: {total_time:.2f}s total")

Multi-Platform Comparison

from benchbox.ssb import SSB
from benchbox.platforms.duckdb import DuckDBAdapter
from benchbox.platforms.clickhouse import ClickHouseAdapter
import pandas as pd

benchmark = SSB(scale_factor=1.0, output_dir="./data/ssb_sf1")
benchmark.generate_data()

platforms = {
    "DuckDB": DuckDBAdapter(),
    "ClickHouse": ClickHouseAdapter(host="localhost"),
}

results_data = []

for name, adapter in platforms.items():
    print(f"\nBenchmarking {name}...")
    results = benchmark.run_with_platform(adapter)

    results_data.append({
        "platform": name,
        "total_time": results.total_execution_time,
        "avg_query_time": results.average_query_time,
        "successful": results.successful_queries,
        "failed": results.failed_queries,
    })

df = pd.DataFrame(results_data)
print("\nBenchmark Results:")
print(df)

Columnar Database Optimization

from benchbox.ssb import SSB
from benchbox.platforms.clickhouse import ClickHouseAdapter

# SSB is ideal for columnar databases
benchmark = SSB(scale_factor=10.0)
adapter = ClickHouseAdapter(host="localhost")

# Generate and load data
benchmark.generate_data()
conn = adapter.create_connection()

# Create schema with columnar optimizations
adapter.create_schema(benchmark, conn)
adapter.load_data(benchmark, conn, benchmark.output_dir)

# Post-load, columnar-specific housekeeping. Note: in ClickHouse the sorting
# key of a MergeTree table (e.g. ORDER BY (lo_orderdate, lo_custkey, lo_partkey))
# is fixed when the table is created, so row ordering belongs in the schema DDL
# rather than in an ALTER TABLE statement. A safe post-load step is to force
# background merges of the freshly inserted parts:
conn.execute("OPTIMIZE TABLE lineorder FINAL")

# Run Flight 1 (benefits most from columnar storage)
flight_1_queries = ["Q1.1", "Q1.2", "Q1.3"]
for query_id in flight_1_queries:
    query = benchmark.get_query(query_id)
    result = adapter.execute_query(conn, query, query_id)
    print(f"{query_id}: {result['execution_time']:.3f}s")

Scale Factor Comparison

from benchbox.ssb import SSB
from benchbox.platforms.duckdb import DuckDBAdapter
import pandas as pd

adapter = DuckDBAdapter()
scale_factors = [0.1, 1.0, 10.0]
results_data = []

for sf in scale_factors:
    print(f"\nTesting SF={sf}...")

    benchmark = SSB(scale_factor=sf, output_dir=f"./data/ssb_sf{sf}")
    benchmark.generate_data()

    results = benchmark.run_with_platform(adapter)

    results_data.append({
        "scale_factor": sf,
        "data_size_mb": sf * 500,  # Approximate
        "total_time": results.total_execution_time,
        "avg_query_time": results.average_query_time,
        "queries_per_sec": results.total_queries / results.total_execution_time
    })

df = pd.DataFrame(results_data)
print("\nScale Factor Performance:")
print(df)

See Also

External Resources