"""Read Primitives benchmark implementation.
Copyright 2026 Joe Harris / BenchBox Project
This benchmark combines queries from multiple sources:
1. Apache Impala targeted-perf workload
(https://github.com/apache/impala/tree/master/testdata/workloads/targeted-perf)
Apache License 2.0, Copyright Apache Software Foundation
2. Optimizer sniff test concepts by Justin Jaffray
(https://buttondown.com/jaffray/archive/a-sniff-test-for-some-query-optimizers/)
Data generation uses the TPC-H schema (TPC Benchmark H, Copyright Transaction
Processing Performance Council).
Licensed under the MIT License. See LICENSE file in the project root for details.
"""
from pathlib import Path
from typing import Any, Optional, Union
from benchbox.base import BaseBenchmark
from benchbox.core.read_primitives.benchmark import ReadPrimitivesBenchmark
[docs]
class ReadPrimitives(BaseBenchmark):
"""Read Primitives benchmark implementation.
Provides Read Primitives benchmark implementation, including data generation and access to 80+ primitive read operation queries that test fundamental database capabilities using the TPC-H schema.
The benchmark covers:
- Aggregation, joins, filters, window functions
- OLAP operations, statistical functions
- JSON operations, full-text search
- Time series analysis, array operations
- Graph operations, temporal queries
"""
[docs]
def __init__(
self,
scale_factor: float = 1.0,
output_dir: Optional[Union[str, Path]] = None,
**kwargs: Any,
) -> None:
"""Initialize Read Primitives benchmark instance.
Args:
scale_factor: Scale factor for the benchmark (1.0 = ~6M lineitem rows)
output_dir: Directory to output generated data files
**kwargs: Additional implementation-specific options
"""
super().__init__(scale_factor=scale_factor, output_dir=output_dir, **kwargs)
# Initialize the actual implementation
verbose = kwargs.pop("verbose", False)
self._impl = ReadPrimitivesBenchmark(
scale_factor=scale_factor, output_dir=output_dir, verbose=verbose, **kwargs
)
[docs]
def generate_data(self, tables: Optional[list[str]] = None) -> dict[str, str]:
"""Generate Read Primitives benchmark data.
Args:
tables: Optional list of table names to generate. If None, generates all.
Returns:
A dictionary mapping table names to file paths
"""
# Call the implementation to generate data
self._impl.generate_data(tables)
# Return the tables dictionary (mapping table names to file paths)
return self._impl.tables
[docs]
def get_queries(self, dialect: Optional[str] = None) -> dict[str, str]:
"""Get all Read Primitives benchmark queries.
Args:
dialect: Target SQL dialect for query translation. If None, returns original queries.
Returns:
A dictionary mapping query IDs to query strings
"""
return self._impl.get_queries(dialect=dialect)
[docs]
def get_query(self, query_id: Union[int, str], *, params: Optional[dict[str, Any]] = None) -> str:
"""Get a specific Read Primitives benchmark query.
Args:
query_id: The ID of the query to retrieve (e.g., 'aggregation_simple')
params: Optional parameters to customize the query
Returns:
The query string
Raises:
ValueError: If the query_id is invalid
"""
return self._impl.get_query(query_id, params=params)
[docs]
def get_queries_by_category(self, category: str) -> dict[str, str]:
"""Get queries filtered by category.
Args:
category: Category name (e.g., 'aggregation', 'window', 'join')
Returns:
Dictionary mapping query IDs to SQL text for the category
"""
return self._impl.get_queries_by_category(category)
[docs]
def get_query_categories(self) -> list[str]:
"""Get list of available query categories.
Returns:
List of category names
"""
return self._impl.get_query_categories()
[docs]
def get_schema(self) -> dict[str, dict]:
"""Get the Read Primitives benchmark schema (TPC-H).
Returns:
A dictionary mapping table names to their schema definitions
"""
return self._impl.get_schema()
[docs]
def get_create_tables_sql(self, dialect: str = "standard", tuning_config=None) -> str:
"""Get SQL to create all Read Primitives benchmark tables.
Args:
dialect: SQL dialect to use
tuning_config: Unified tuning configuration for constraint settings
Returns:
SQL script for creating all tables
"""
return self._impl.get_create_tables_sql(dialect=dialect, tuning_config=tuning_config)
[docs]
def load_data_to_database(self, connection: Any, tables: Optional[list[str]] = None) -> None:
"""Load generated data into a database.
Args:
connection: Database connection
tables: Optional list of tables to load. If None, loads all.
Raises:
ValueError: If data hasn't been generated yet
"""
return self._impl.load_data_to_database(connection, tables)
[docs]
def execute_query(self, query_id: str, connection: Any, params: Optional[dict[str, Any]] = None) -> Any:
"""Execute a Read Primitives query on the given database connection.
Args:
query_id: Query identifier (e.g., 'aggregation_simple')
connection: Database connection to use for execution
params: Optional parameters to use in the query
Returns:
Query results from the database
Raises:
ValueError: If the query_id is not valid
"""
return self._impl.execute_query(query_id, connection, params)
[docs]
def run_benchmark(
self,
connection: Any,
queries: Optional[list[str]] = None,
iterations: int = 1,
categories: Optional[list[str]] = None,
) -> dict[str, Any]:
"""Run the complete Read Primitives benchmark.
Args:
connection: Database connection to use
queries: Optional list of query IDs to run. If None, runs all.
iterations: Number of times to run each query
categories: Optional list of categories to run. If specified, overrides queries.
Returns:
Dictionary containing benchmark results
"""
return self._impl.run_benchmark(connection, queries, iterations, categories)
[docs]
def run_category_benchmark(self, connection: Any, category: str, iterations: int = 1) -> dict[str, Any]:
"""Run benchmark for a specific query category.
Args:
connection: Database connection to use
category: Category name to run (e.g., 'aggregation', 'window', 'join')
iterations: Number of times to run each query
Returns:
Dictionary containing benchmark results for the category
"""
return self._impl.run_category_benchmark(connection, category, iterations)
[docs]
def get_benchmark_info(self) -> dict[str, Any]:
"""Get information about the benchmark.
Returns:
Dictionary containing benchmark metadata
"""
return self._impl.get_benchmark_info()