SSB (Star Schema Benchmark) API¶
Complete Python API reference for the Star Schema Benchmark (SSB).
Overview¶
The Star Schema Benchmark (SSB) is a simplified variant of TPC-H specifically designed for testing OLAP systems and data warehouses. It transforms the normalized TPC-H schema into a denormalized star schema that better represents typical data warehouse designs.
Key Features:
Star schema design - Single fact table with dimension tables
13 analytical queries - Organized into 4 logical flights
Simplified query patterns - Focus on aggregation and filtering
OLAP-oriented - Tests typical data warehouse patterns
Parameterized queries - Configurable selectivity
Performance-focused - Stresses aggregation and scan performance
Wide scale range - Scale factors from 0.01 to 100+
Quick Start¶
from benchbox.ssb import SSB
from benchbox.platforms.duckdb import DuckDBAdapter
# Create benchmark
benchmark = SSB(scale_factor=1.0)
# Generate data
benchmark.generate_data()
# Run on platform
adapter = DuckDBAdapter()
results = benchmark.run_with_platform(adapter)
print(f"Completed in {results.total_execution_time:.2f}s")
API Reference¶
SSB Class¶
- class SSB(scale_factor=1.0, output_dir=None, **kwargs)[source]¶
Bases: BaseBenchmark
Star Schema Benchmark implementation.
This class provides an implementation of the Star Schema Benchmark, including data generation and access to the 13 benchmark queries organized in 4 flights.
Reference: O’Neil et al. “The Star Schema Benchmark and Augmented Fact Table Indexing”
- __init__(scale_factor=1.0, output_dir=None, **kwargs)[source]¶
Initialize a Star Schema Benchmark instance.
- Parameters:
scale_factor (float) – Scale factor for the benchmark (1.0 = ~500MB)
output_dir (str | Path | None) – Directory to output generated data files
**kwargs (Any) – Additional implementation-specific options
- generate_data()[source]¶
Generate Star Schema Benchmark data.
- Returns:
A list of paths to the generated data files
- Return type:
list[str | Path]
- get_queries(dialect=None)[source]¶
Get all Star Schema Benchmark queries.
- Parameters:
dialect (str | None) – Target SQL dialect for query translation. If None, returns original queries.
- Returns:
A dictionary mapping query IDs to query strings
- Return type:
dict[str, str]
- get_query(query_id, *, params=None)[source]¶
Get a specific Star Schema Benchmark query.
- Parameters:
query_id (str) – The ID of the query to retrieve (Q1.1-Q4.3)
params (dict[str, Any] | None) – Optional parameters to customize the query
- Returns:
The query string
- Raises:
ValueError – If the query_id is invalid
- Return type:
str
- get_schema()[source]¶
Get the Star Schema Benchmark schema.
- Returns:
A list of dictionaries describing the tables in the schema
- Return type:
list[dict]
- get_create_tables_sql(dialect='standard', tuning_config=None)[source]¶
Get SQL to create all Star Schema Benchmark tables.
- Parameters:
dialect (str) – SQL dialect to use
tuning_config (UnifiedTuningConfiguration | None) – Unified tuning configuration for constraint settings
- Returns:
SQL script for creating all tables
- Return type:
str
- apply_verbosity(settings)¶
Apply verbosity settings to the mixin consumer.
- property benchmark_name: str¶
Get the human-readable benchmark name.
- create_enhanced_benchmark_result(platform, query_results, execution_metadata=None, phases=None, resource_utilization=None, performance_characteristics=None, **kwargs)¶
Create a BenchmarkResults object with standardized fields.
This centralizes the logic for creating benchmark results that was previously duplicated across platform adapters and CLI orchestrator.
- Parameters:
platform (str) – Platform name (e.g., “DuckDB”, “ClickHouse”)
query_results (list[dict[str, Any]]) – List of query execution results
execution_metadata (dict[str, Any] | None) – Optional execution metadata
phases (dict[str, dict[str, Any]] | None) – Optional phase tracking information
resource_utilization (dict[str, Any] | None) – Optional resource usage metrics
performance_characteristics (dict[str, Any] | None) – Optional performance analysis
**kwargs (Any) – Additional fields to override defaults
- Returns:
Fully configured BenchmarkResults object
- Return type:
BenchmarkResults
- format_results(benchmark_result)¶
Format benchmark results for display.
- Parameters:
benchmark_result (dict[str, Any]) – Result dictionary from run_benchmark()
- Returns:
Formatted string representation of the results
- Return type:
str
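For example, the dictionary returned by run_benchmark() can be passed straight through (a minimal sketch, assuming conn is an open DatabaseConnection):
result = benchmark.run_benchmark(conn)
print(benchmark.format_results(result))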
- get_data_source_benchmark()¶
Return the canonical source benchmark when data is shared.
Benchmarks that reuse data generated by another benchmark (for example, Primitives reusing TPC-H datasets) should override this method and return the lower-case identifier of the source benchmark. Benchmarks that produce their own data should return None (the default).
- log_debug_info(context='Debug')¶
Log comprehensive debug information including version details.
- log_error_with_debug_info(error, context='Error')¶
Log an error with comprehensive debug information.
- log_operation_complete(operation, duration=None, details='')¶
- log_operation_start(operation, details='')¶
- log_verbose(message)¶
- log_version_warning()¶
Log version consistency warnings if any exist.
- log_very_verbose(message)¶
- property logger: Logger¶
Return the logger configured for the verbosity mixin consumer.
- quiet: bool = False¶
- run_benchmark(connection, query_ids=None, fetch_results=False, setup_database=True)¶
Run the complete benchmark suite.
- Parameters:
connection (DatabaseConnection) – Database connection to execute queries on
query_ids (list[int | str] | None) – Optional list of specific query IDs to run (defaults to all)
fetch_results (bool) – Whether to fetch and return query results
setup_database (bool) – Whether to set up the database first
- Returns:
Dictionary containing:
benchmark_name: Name of the benchmark
total_execution_time: Total time for all queries
total_queries: Number of queries executed
successful_queries: Number of queries that succeeded
failed_queries: Number of queries that failed
query_results: List of individual query results
setup_time: Time taken for database setup (if performed)
- Return type:
dict[str, Any]
- Raises:
Exception – If benchmark execution fails
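A minimal usage sketch (assuming conn is a DatabaseConnection obtained from a platform adapter, as in the usage examples below; the result keys are those documented above):
result = benchmark.run_benchmark(conn, query_ids=["Q1.1", "Q2.1"])
print(f"{result['successful_queries']}/{result['total_queries']} queries succeeded")
print(f"Total time: {result['total_execution_time']:.2f}s")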
- run_query(query_id, connection, params=None, fetch_results=False)¶
Execute a single query and return its timing and results.
- Parameters:
query_id (int | str) – ID of the query to execute
connection (DatabaseConnection) – Database connection to execute query on
params (dict[str, Any] | None) – Optional parameters for query customization
fetch_results (bool) – Whether to fetch and return query results
- Returns:
Dictionary containing:
query_id: Executed query ID
execution_time: Time taken to execute the query in seconds
query_text: Executed query text
results: Query results if fetch_results=True, otherwise None
row_count: Number of rows returned (if results fetched)
- Return type:
dict[str, Any]
- Raises:
ValueError – If query_id is invalid
Exception – If query execution fails
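For example, timing a single query on an existing connection (a sketch; conn as above):
result = benchmark.run_query("Q1.1", conn, fetch_results=True)
print(f"Q1.1: {result['execution_time']:.3f}s, {result['row_count']} rows")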
- run_with_platform(platform_adapter, **run_config)¶
Run complete benchmark using platform-specific optimizations.
This method provides a unified interface for running benchmarks using database platform adapters that handle connection management, data loading optimizations, and query execution.
This is the standard method that all benchmarks should support for integration with the CLI and other orchestration tools.
- Parameters:
platform_adapter – Platform adapter instance (e.g., DuckDBAdapter)
**run_config – Configuration options:
categories: List of query categories to run (if the benchmark supports them)
query_subset: List of specific query IDs to run
connection: Connection configuration
benchmark_type: Type hint for optimizations (‘olap’, ‘oltp’, etc.)
- Returns:
BenchmarkResults object with execution results
Example
from benchbox.platforms import DuckDBAdapter
benchmark = SomeBenchmark(scale_factor=0.1)
adapter = DuckDBAdapter()
results = benchmark.run_with_platform(adapter)
- setup_database(connection)¶
Set up database with schema and data.
Creates necessary database schema and loads benchmark data into the database.
- Parameters:
connection (DatabaseConnection) – Database connection to set up
- Raises:
ValueError – If data generation fails
Exception – If database setup fails
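A sketch of manual setup before running queries yourself (assuming a platform adapter's create_connection(), as used in the usage examples below):
conn = adapter.create_connection()
benchmark.setup_database(conn)  # creates the schema and loads generated data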
- translate_query(query_id, dialect)¶
Translate a query to a specific SQL dialect.
- Parameters:
query_id (int | str) – The ID of the query to translate
dialect (str) – The target SQL dialect
- Returns:
The translated query string
- Raises:
ValueError – If the query_id is invalid
ImportError – If sqlglot is not installed
ValueError – If the dialect is not supported
- Return type:
str
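For instance, translating a query to the ClickHouse dialect (a sketch; requires sqlglot to be installed):
q1_1_ch = benchmark.translate_query("Q1.1", "clickhouse")
print(q1_1_ch[:200])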
- verbose: bool = False¶
- verbose_enabled: bool = False¶
- verbose_level: int = 0¶
- property verbosity_settings: VerbositySettings¶
Return the current verbosity settings.
- very_verbose: bool = False¶
Constructor¶
SSB(
scale_factor: float = 1.0,
output_dir: Optional[Union[str, Path]] = None,
**kwargs
)
Parameters:
scale_factor (float): Data size multiplier. SF=1.0 generates ~500MB (~6M rows in fact table). Range: 0.01 to 100+
output_dir (str|Path, optional): Directory for generated data files. Default: temporary directory
kwargs: Additional options (e.g., date_range_years, partition_fact_table)
Raises:
ValueError: If scale_factor is not positive
TypeError: If scale_factor is not a number
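A minimal construction sketch; date_range_years is one of the kwargs listed above (the value shown is illustrative):
from benchbox.ssb import SSB

# Small run for local testing: ~50MB of data in an explicit directory
benchmark = SSB(scale_factor=0.1, output_dir="./data/ssb_sf01", date_range_years=7)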
Methods¶
generate_data()¶
Generate Star Schema Benchmark data.
data_files = benchmark.generate_data()
print(f"Generated {len(data_files)} table files")
- Returns:
List[Union[str, Path]]: Paths to generated data files (5 tables: date, customer, supplier, part, lineorder)
get_query(query_id, *, params=None)¶
Get a specific SSB query.
# Get Flight 1 query
q1_1 = benchmark.get_query("Q1.1")
# Get with custom parameters
q2_1 = benchmark.get_query("Q2.1", params={
    "category": "MFGR#12",
    "region": "AMERICA"
})
# Get Flight 3 query
q3_1 = benchmark.get_query("Q3.1")
Parameters:
query_id (str): Query ID (Q1.1 to Q4.3)
params (dict, optional): Query parameters for customization
- Returns:
str: Query SQL text
Raises:
ValueError: If query_id is invalid
get_queries(dialect=None)¶
Get all SSB benchmark queries.
# Get all queries
queries = benchmark.get_queries()
print(f"Total queries: {len(queries)}") # 13 queries
# Get with dialect translation
queries_ch = benchmark.get_queries(dialect="clickhouse")
Parameters:
dialect (str, optional): Target SQL dialect
- Returns:
dict[str, str]: Dictionary mapping query IDs to SQL text
get_schema()¶
Get SSB schema information.
schema = benchmark.get_schema()
for table in schema:
    print(f"{table['name']}: {len(table['columns'])} columns")
- Returns:
list[dict]: List of table definitions (5 tables: date, customer, supplier, part, lineorder)
get_create_tables_sql(dialect="standard", tuning_config=None)¶
Get CREATE TABLE SQL for all SSB tables.
# Standard SQL
create_sql = benchmark.get_create_tables_sql()
# With dialect
create_sql_ch = benchmark.get_create_tables_sql(dialect="clickhouse")
# With tuning configuration
from benchbox.core.tuning.interface import UnifiedTuningConfiguration
tuning = UnifiedTuningConfiguration(...)
create_sql_tuned = benchmark.get_create_tables_sql(tuning_config=tuning)
Parameters:
dialect (str): Target SQL dialect. Default: “standard”
tuning_config (UnifiedTuningConfiguration, optional): Tuning settings
- Returns:
str: SQL script for creating all tables
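The returned script can be executed on a connection before loading data (a sketch; conn from a platform adapter, and some drivers require splitting the script into individual statements):
create_sql = benchmark.get_create_tables_sql()
for statement in create_sql.split(";"):
    if statement.strip():
        conn.execute(statement)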
Query Flights¶
SSB organizes 13 queries into 4 logical flights:
Flight 1: Simple Aggregation (Q1.1 - Q1.3)¶
Tests basic aggregation and filtering on the fact table.
# Flight 1 focuses on fact table scan performance
flight_1_queries = ["Q1.1", "Q1.2", "Q1.3"]
for query_id in flight_1_queries:
    query = benchmark.get_query(query_id)
    # Execute query...
Query Characteristics:
Q1.1: Year-based aggregation with discount and quantity filters
Q1.2: Month-based aggregation with quantity range
Q1.3: Week-based aggregation with refined quantity range
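For reference, the canonical Q1.1 from the SSB paper has the shape below; the query returned by get_query may differ in formatting and parameter values.
# Canonical Q1.1 (per the SSB paper): one join to the date dimension,
# then a filtered scan-and-aggregate over the fact table
#   SELECT SUM(lo_extendedprice * lo_discount) AS revenue
#   FROM lineorder, date
#   WHERE lo_orderdate = d_datekey
#     AND d_year = 1993
#     AND lo_discount BETWEEN 1 AND 3
#     AND lo_quantity < 25
print(benchmark.get_query("Q1.1"))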
Flight 2: Customer-Supplier Analysis (Q2.1 - Q2.3)¶
Tests dimension table joins and drill-down analysis.
# Flight 2 tests star schema join performance
flight_2_queries = ["Q2.1", "Q2.2", "Q2.3"]
for query_id in flight_2_queries:
    query = benchmark.get_query(query_id, params={
        "category": "MFGR#12",
        "region": "AMERICA"
    })
    # Execute query...
Query Characteristics:
Q2.1: Brand category analysis with supplier region
Q2.2: Brand range analysis with supplier region
Q2.3: Specific brand focus with supplier region
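For reference, the canonical Q2.1 from the SSB paper joins the part and supplier dimensions and groups by year and brand; generated parameter values may differ.
# Canonical Q2.1 (per the SSB paper)
#   SELECT SUM(lo_revenue), d_year, p_brand1
#   FROM lineorder, date, part, supplier
#   WHERE lo_orderdate = d_datekey
#     AND lo_partkey = p_partkey
#     AND lo_suppkey = s_suppkey
#     AND p_category = 'MFGR#12'
#     AND s_region = 'AMERICA'
#   GROUP BY d_year, p_brand1
#   ORDER BY d_year, p_brand1
print(benchmark.get_query("Q2.1"))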
Flight 3: Customer Behavior Analysis (Q3.1 - Q3.4)¶
Tests complex multi-dimension analysis with customer geography.
# Flight 3 tests complex star joins
flight_3_queries = ["Q3.1", "Q3.2", "Q3.3", "Q3.4"]
for query_id in flight_3_queries:
    query = benchmark.get_query(query_id)
    # Execute query...
Query Characteristics:
Q3.1: Customer nation analysis
Q3.2: City-level analysis
Q3.3: Specific city analysis
Q3.4: Regional analysis
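For reference, the canonical Q3.1 from the SSB paper groups revenue by customer nation, supplier nation, and year over a date range; generated parameter values may differ.
# Canonical Q3.1 (per the SSB paper)
#   SELECT c_nation, s_nation, d_year, SUM(lo_revenue) AS revenue
#   FROM customer, lineorder, supplier, date
#   WHERE lo_custkey = c_custkey
#     AND lo_suppkey = s_suppkey
#     AND lo_orderdate = d_datekey
#     AND c_region = 'ASIA' AND s_region = 'ASIA'
#     AND d_year >= 1992 AND d_year <= 1997
#   GROUP BY c_nation, s_nation, d_year
#   ORDER BY d_year ASC, revenue DESC
print(benchmark.get_query("Q3.1"))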
Flight 4: Profit Analysis (Q4.1 - Q4.3)¶
Tests complex aggregation with profit calculations.
# Flight 4 tests analytical queries with calculations
flight_4_queries = ["Q4.1", "Q4.2", "Q4.3"]
for query_id in flight_4_queries:
    query = benchmark.get_query(query_id)
    # Execute query...
Query Characteristics:
Q4.1: Regional profit analysis
Q4.2: Nation-specific profit trends
Q4.3: City-specific profit with part category
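For reference, the canonical Q4.1 from the SSB paper computes profit as revenue minus supply cost across all four dimensions; generated parameter values may differ.
# Canonical Q4.1 (per the SSB paper)
#   SELECT d_year, c_nation, SUM(lo_revenue - lo_supplycost) AS profit
#   FROM date, customer, supplier, part, lineorder
#   WHERE lo_custkey = c_custkey
#     AND lo_suppkey = s_suppkey
#     AND lo_partkey = p_partkey
#     AND lo_orderdate = d_datekey
#     AND c_region = 'AMERICA' AND s_region = 'AMERICA'
#     AND (p_mfgr = 'MFGR#1' OR p_mfgr = 'MFGR#2')
#   GROUP BY d_year, c_nation
#   ORDER BY d_year, c_nation
print(benchmark.get_query("Q4.1"))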
Usage Examples¶
Basic Benchmark Run¶
from benchbox.ssb import SSB
from benchbox.platforms.duckdb import DuckDBAdapter
# Create benchmark with scale factor 1 (~500MB)
benchmark = SSB(scale_factor=1.0)
# Generate data
benchmark.generate_data()
# Run on DuckDB
adapter = DuckDBAdapter()
results = benchmark.run_with_platform(adapter)
# Print results
print(f"Benchmark: {results.benchmark_name}")
print(f"Total time: {results.total_execution_time:.2f}s")
print(f"Queries: {results.successful_queries}/{results.total_queries}")
Flight-Based Execution¶
from benchbox.ssb import SSB
from benchbox.platforms.duckdb import DuckDBAdapter
import time
benchmark = SSB(scale_factor=1.0)
benchmark.generate_data()
adapter = DuckDBAdapter()
conn = adapter.create_connection()
# Load data
adapter.create_schema(benchmark, conn)
adapter.load_data(benchmark, conn, benchmark.output_dir)
# Run Flight 1 (Simple Aggregation)
print("Flight 1: Simple Aggregation")
flight_1_queries = ["Q1.1", "Q1.2", "Q1.3"]
flight_1_times = {}
for query_id in flight_1_queries:
    query = benchmark.get_query(query_id)
    start = time.time()
    result = adapter.execute_query(conn, query, query_id)
    duration = time.time() - start
    flight_1_times[query_id] = duration
    print(f" {query_id}: {duration:.3f}s")
print(f"Flight 1 total: {sum(flight_1_times.values()):.2f}s")
Parameterized Query Execution¶
from benchbox.ssb import SSB
benchmark = SSB(scale_factor=1.0)
# Define custom parameters
custom_params = {
    "year": 1993,
    "year_month": 199401,
    "week": 6,
    "discount_min": 1,
    "discount_max": 3,
    "quantity": 25,
    "quantity_min": 26,
    "quantity_max": 35,
    "category": "MFGR#12",
    "region": "AMERICA",
    "brand_min": "MFGR#2221",
    "brand_max": "MFGR#2228",
    "brand": "MFGR#2221"
}
# Get queries with custom parameters
q1_1 = benchmark.get_query("Q1.1", params=custom_params)
q2_1 = benchmark.get_query("Q2.1", params=custom_params)
print("Q1.1 with custom year:")
print(q1_1[:200])
Complete Flight Benchmark¶
from benchbox.ssb import SSB
from benchbox.platforms.duckdb import DuckDBAdapter
import time
benchmark = SSB(scale_factor=1.0)
adapter = DuckDBAdapter()
# Setup
benchmark.generate_data()
conn = adapter.create_connection()
adapter.create_schema(benchmark, conn)
adapter.load_data(benchmark, conn, benchmark.output_dir)
# Define all flights
flights = {
    "Flight 1": ["Q1.1", "Q1.2", "Q1.3"],
    "Flight 2": ["Q2.1", "Q2.2", "Q2.3"],
    "Flight 3": ["Q3.1", "Q3.2", "Q3.3", "Q3.4"],
    "Flight 4": ["Q4.1", "Q4.2", "Q4.3"]
}
# Run all flights
all_results = {}
for flight_name, query_ids in flights.items():
    print(f"\n{flight_name}:")
    flight_results = {}
    for query_id in query_ids:
        query = benchmark.get_query(query_id)
        start = time.time()
        result = adapter.execute_query(conn, query, query_id)
        duration = time.time() - start
        flight_results[query_id] = {
            "duration": duration,
            "status": result["status"],
            "rows": result.get("rows_returned", 0)
        }
        print(f" {query_id}: {duration:.3f}s ({result['status']})")
    all_results[flight_name] = flight_results
# Summary
print("\n" + "="*60)
print("SSB Benchmark Summary")
print("="*60)
for flight_name, flight_results in all_results.items():
    total_time = sum(r["duration"] for r in flight_results.values())
    print(f"{flight_name}: {total_time:.2f}s total")
Multi-Platform Comparison¶
from benchbox.ssb import SSB
from benchbox.platforms.duckdb import DuckDBAdapter
from benchbox.platforms.clickhouse import ClickHouseAdapter
import pandas as pd
benchmark = SSB(scale_factor=1.0, output_dir="./data/ssb_sf1")
benchmark.generate_data()
platforms = {
    "DuckDB": DuckDBAdapter(),
    "ClickHouse": ClickHouseAdapter(host="localhost"),
}
results_data = []
for name, adapter in platforms.items():
    print(f"\nBenchmarking {name}...")
    results = benchmark.run_with_platform(adapter)
    results_data.append({
        "platform": name,
        "total_time": results.total_execution_time,
        "avg_query_time": results.average_query_time,
        "successful": results.successful_queries,
        "failed": results.failed_queries,
    })
df = pd.DataFrame(results_data)
print("\nBenchmark Results:")
print(df)
Columnar Database Optimization¶
from benchbox.ssb import SSB
from benchbox.platforms.clickhouse import ClickHouseAdapter
# SSB is ideal for columnar databases
benchmark = SSB(scale_factor=10.0)
adapter = ClickHouseAdapter(host="localhost")
# Generate and load data
benchmark.generate_data()
conn = adapter.create_connection()
# Create schema with columnar optimizations
adapter.create_schema(benchmark, conn)
adapter.load_data(benchmark, conn, benchmark.output_dir)
# Columnar-specific tuning: ClickHouse fixes the MergeTree sorting key at
# CREATE TABLE time, so the adapter's schema should already order lineorder
# by (lo_orderdate, lo_custkey, lo_partkey); MODIFY ORDER BY can only append
# newly added columns. A valid post-load step is to force merges to complete:
conn.execute("OPTIMIZE TABLE lineorder FINAL;")
# Run Flight 1 (benefits most from columnar storage)
flight_1_queries = ["Q1.1", "Q1.2", "Q1.3"]
for query_id in flight_1_queries:
    query = benchmark.get_query(query_id)
    result = adapter.execute_query(conn, query, query_id)
    print(f"{query_id}: {result['execution_time']:.3f}s")
Scale Factor Comparison¶
from benchbox.ssb import SSB
from benchbox.platforms.duckdb import DuckDBAdapter
import pandas as pd
adapter = DuckDBAdapter()
scale_factors = [0.1, 1.0, 10.0]
results_data = []
for sf in scale_factors:
    print(f"\nTesting SF={sf}...")
    benchmark = SSB(scale_factor=sf, output_dir=f"./data/ssb_sf{sf}")
    benchmark.generate_data()
    results = benchmark.run_with_platform(adapter)
    results_data.append({
        "scale_factor": sf,
        "data_size_mb": sf * 500,  # Approximate
        "total_time": results.total_execution_time,
        "avg_query_time": results.average_query_time,
        "queries_per_sec": results.total_queries / results.total_execution_time
    })
df = pd.DataFrame(results_data)
print("\nScale Factor Performance:")
print(df)
See Also¶
Benchmark APIs - Benchmark API overview
TPC-H Benchmark API - TPC-H benchmark API (normalized schema)
TPC-DS Benchmark API - TPC-DS benchmark API
Base Benchmark API - Base benchmark interface
Results API - Results API
Star Schema Benchmark (SSB) - SSB guide
TPC-H Benchmark - TPC-H guide (SSB is derived from TPC-H)
External Resources¶
Original SSB Paper - “Star Schema Benchmark” by O’Neil et al.
Star Schema Design - Star schema principles
OLAP Performance - Data warehouse benchmarking