Read Primitives Benchmark API¶
Complete Python API reference for the Read Primitives benchmark.
Overview¶
The Read Primitives benchmark tests 80+ fundamental database operations using the TPC-H schema. It isolates specific database capabilities from business-logic complexity, which makes it well suited to performance regression detection and optimization validation.
Key Features:
80+ primitive operation queries - Fundamental database operations
13 operation categories - Aggregation, joins, filters, window functions, and more
TPC-H schema - Proven data model
Category-based execution - Test specific operation types
Performance regression detection - Isolated operation testing
Extensible architecture - Custom primitives support
Quick Start¶
from benchbox.read_primitives import ReadPrimitives
from benchbox.platforms.duckdb import DuckDBAdapter
# Create benchmark
benchmark = ReadPrimitives(scale_factor=1.0)
# Generate data
benchmark.generate_data()
# Run on platform
adapter = DuckDBAdapter()
results = benchmark.run_with_platform(adapter)
print(f"Completed {results.total_queries} queries in {results.total_execution_time:.2f}s")
API Reference¶
ReadPrimitives Class¶
- class ReadPrimitives(scale_factor=1.0, output_dir=None, **kwargs)[source]¶
Bases: BaseBenchmark
Read Primitives benchmark implementation.
Provides the Read Primitives benchmark implementation, including data generation and access to 80+ primitive read operation queries that test fundamental database capabilities using the TPC-H schema.
The benchmark covers:
- Aggregation, joins, filters, window functions
- OLAP operations, statistical functions
- JSON operations, full-text search
- Time series analysis, array operations
- Graph operations, temporal queries
- __init__(scale_factor=1.0, output_dir=None, **kwargs)[source]¶
Initialize Read Primitives benchmark instance.
- Parameters:
scale_factor (float) – Scale factor for the benchmark (1.0 = ~6M lineitem rows)
output_dir (str | Path | None) – Directory to output generated data files
**kwargs (Any) – Additional implementation-specific options
- generate_data(tables=None)[source]¶
Generate Read Primitives benchmark data.
- Parameters:
tables (list[str] | None) – Optional list of table names to generate. If None, generates all.
- Returns:
A dictionary mapping table names to file paths
- Return type:
dict[str, str]
- get_queries(dialect=None)[source]¶
Get all Read Primitives benchmark queries.
- Parameters:
dialect (str | None) – Target SQL dialect for query translation. If None, returns original queries.
- Returns:
A dictionary mapping query IDs to query strings
- Return type:
dict[str, str]
- get_query(query_id, *, params=None)[source]¶
Get a specific Read Primitives benchmark query.
- Parameters:
query_id (int | str) – The ID of the query to retrieve (e.g., ‘aggregation_simple’)
params (dict[str, Any] | None) – Optional parameters to customize the query
- Returns:
The query string
- Raises:
ValueError – If the query_id is invalid
- Return type:
str
- get_queries_by_category(category)[source]¶
Get queries filtered by category.
- Parameters:
category (str) – Category name (e.g., ‘aggregation’, ‘window’, ‘join’)
- Returns:
Dictionary mapping query IDs to SQL text for the category
- Return type:
dict[str, str]
- get_query_categories()[source]¶
Get list of available query categories.
- Returns:
List of category names
- Return type:
list[str]
- get_schema()[source]¶
Get the Read Primitives benchmark schema (TPC-H).
- Returns:
A dictionary mapping table names to their schema definitions
- Return type:
dict[str, dict]
- get_create_tables_sql(dialect='standard', tuning_config=None)[source]¶
Get SQL to create all Read Primitives benchmark tables.
- Parameters:
dialect (str) – SQL dialect to use
tuning_config – Unified tuning configuration for constraint settings
- Returns:
SQL script for creating all tables
- Return type:
str
- load_data_to_database(connection, tables=None)[source]¶
Load generated data into a database.
- Parameters:
connection (Any) – Database connection
tables (list[str] | None) – Optional list of tables to load. If None, loads all.
- Raises:
ValueError – If data hasn’t been generated yet
- execute_query(query_id, connection, params=None)[source]¶
Execute a Read Primitives query on the given database connection.
- Parameters:
query_id (str) – Query identifier (e.g., ‘aggregation_simple’)
connection (Any) – Database connection to use for execution
params (dict[str, Any] | None) – Optional parameters to use in the query
- Returns:
Query results from the database
- Raises:
ValueError – If the query_id is not valid
- Return type:
Any
- run_benchmark(connection, queries=None, iterations=1, categories=None)[source]¶
Run the complete Read Primitives benchmark.
- Parameters:
connection (Any) – Database connection to use
queries (list[str] | None) – Optional list of query IDs to run. If None, runs all.
iterations (int) – Number of times to run each query
categories (list[str] | None) – Optional list of categories to run. If specified, overrides queries.
- Returns:
Dictionary containing benchmark results
- Return type:
dict[str, Any]
- run_category_benchmark(connection, category, iterations=1)[source]¶
Run benchmark for a specific query category.
- Parameters:
connection (Any) – Database connection to use
category (str) – Category name to run (e.g., ‘aggregation’, ‘window’, ‘join’)
iterations (int) – Number of times to run each query
- Returns:
Dictionary containing benchmark results for the category
- Return type:
dict[str, Any]
- get_benchmark_info()[source]¶
Get information about the benchmark.
- Returns:
Dictionary containing benchmark metadata
- Return type:
dict[str, Any]
- apply_verbosity(settings)¶
Apply verbosity settings to the mixin consumer.
- property benchmark_name: str¶
Get the human-readable benchmark name.
- create_enhanced_benchmark_result(platform, query_results, execution_metadata=None, phases=None, resource_utilization=None, performance_characteristics=None, **kwargs)¶
Create a BenchmarkResults object with standardized fields.
This centralizes the logic for creating benchmark results that was previously duplicated across platform adapters and the CLI orchestrator.
- Parameters:
platform (str) – Platform name (e.g., “DuckDB”, “ClickHouse”)
query_results (list[dict[str, Any]]) – List of query execution results
execution_metadata (dict[str, Any] | None) – Optional execution metadata
phases (dict[str, dict[str, Any]] | None) – Optional phase tracking information
resource_utilization (dict[str, Any] | None) – Optional resource usage metrics
performance_characteristics (dict[str, Any] | None) – Optional performance analysis
**kwargs (Any) – Additional fields to override defaults
- Returns:
Fully configured BenchmarkResults object
- Return type:
BenchmarkResults
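A hedged sketch of wrapping manually timed queries into a results object; whether the dictionaries returned by run_query() (documented further below) match the expected per-query shape is an assumption, not documented behaviour. Assumes conn is an open database connection.
timings = [benchmark.run_query(q, conn) for q in ["aggregation_simple", "window_rank"]]
bench_results = benchmark.create_enhanced_benchmark_result(
    platform="DuckDB",
    query_results=timings,  # assumed to match the expected per-query dict shape
)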
- format_results(benchmark_result)¶
Format benchmark results for display.
- Parameters:
benchmark_result (dict[str, Any]) – Result dictionary from run_benchmark()
- Returns:
Formatted string representation of the results
- Return type:
str
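A brief sketch, assuming conn is an open database connection and using the dictionary returned by run_benchmark():
results = benchmark.run_benchmark(conn, categories=["aggregation"])
print(benchmark.format_results(results))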
- get_data_source_benchmark()¶
Return the canonical source benchmark when data is shared.
Benchmarks that reuse data generated by another benchmark (for example, Primitives reusing TPC-H datasets) should override this method and return the lower-case identifier of the source benchmark. Benchmarks that produce their own data should return None (default).
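A minimal override sketch for a hypothetical benchmark that reuses TPC-H data; the class name and the exact identifier string are illustrative assumptions:
class SharedDataPrimitives(ReadPrimitives):
    def get_data_source_benchmark(self):
        # Reuse data generated by the TPC-H benchmark (identifier assumed)
        return "tpch"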
- log_debug_info(context='Debug')¶
Log comprehensive debug information including version details.
- log_error_with_debug_info(error, context='Error')¶
Log an error with comprehensive debug information.
- log_operation_complete(operation, duration=None, details='')¶
- log_operation_start(operation, details='')¶
- log_verbose(message)¶
- log_version_warning()¶
Log version consistency warnings if any exist.
- log_very_verbose(message)¶
- property logger: Logger¶
Return the logger configured for the verbosity mixin consumer.
- quiet: bool = False¶
- run_query(query_id, connection, params=None, fetch_results=False)¶
Execute single query and return timing and results.
- Parameters:
query_id (int | str) – ID of the query to execute
connection (DatabaseConnection) – Database connection to execute query on
params (dict[str, Any] | None) – Optional parameters for query customization
fetch_results (bool) – Whether to fetch and return query results
- Returns:
Dictionary containing:
query_id: Executed query ID
execution_time: Time taken to execute query in seconds
query_text: Executed query text
results: Query results if fetch_results=True, otherwise None
row_count: Number of rows returned (if results fetched)
- Return type:
dict[str, Any]
- Raises:
ValueError – If query_id is invalid
Exception – If query execution fails
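A short usage sketch, assuming conn is an open database connection:
# Time a single primitive query and inspect the returned dictionary
timing = benchmark.run_query("aggregation_simple", conn, fetch_results=True)
print(f"{timing['query_id']}: {timing['execution_time']:.3f}s, {timing['row_count']} rows")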
- run_with_platform(platform_adapter, **run_config)¶
Run complete benchmark using platform-specific optimizations.
This method provides a unified interface for running benchmarks using database platform adapters that handle connection management, data loading optimizations, and query execution.
This is the standard method that all benchmarks should support for integration with the CLI and other orchestration tools.
- Parameters:
platform_adapter – Platform adapter instance (e.g., DuckDBAdapter)
**run_config – Configuration options:
- categories: List of query categories to run (if benchmark supports)
- query_subset: List of specific query IDs to run
- connection: Connection configuration
- benchmark_type: Type hint for optimizations (‘olap’, ‘oltp’, etc.)
- Returns:
BenchmarkResults object with execution results
Example
from benchbox.platforms import DuckDBAdapter
benchmark = SomeBenchmark(scale_factor=0.1)
adapter = DuckDBAdapter()
results = benchmark.run_with_platform(adapter)
- setup_database(connection)¶
Set up database with schema and data.
Creates necessary database schema and loads benchmark data into the database.
- Parameters:
connection (DatabaseConnection) – Database connection to set up
- Raises:
ValueError – If data generation fails
Exception – If database setup fails
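A hedged sketch of the one-call setup path, assuming conn is a compatible database connection:
benchmark.generate_data()
benchmark.setup_database(conn)  # creates the TPC-H schema and loads the data
results = benchmark.run_benchmark(conn, iterations=1)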
- translate_query(query_id, dialect)¶
Translate a query to a specific SQL dialect.
- Parameters:
query_id (int | str) – The ID of the query to translate
dialect (str) – The target SQL dialect
- Returns:
The translated query string
- Raises:
ValueError – If the query_id is invalid
ImportError – If sqlglot is not installed
ValueError – If the dialect is not supported
- Return type:
str
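A short sketch; requires sqlglot, and the dialect name follows the get_create_tables_sql() examples later on this page:
# Translate a primitive query for PostgreSQL
pg_sql = benchmark.translate_query("aggregation_simple", "postgres")
print(pg_sql)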
- verbose: bool = False¶
- verbose_enabled: bool = False¶
- verbose_level: int = 0¶
- property verbosity_settings: VerbositySettings¶
Return the current verbosity settings.
- very_verbose: bool = False¶
Constructor¶
ReadPrimitives(
    scale_factor: float = 1.0,
    output_dir: Optional[Union[str, Path]] = None,
    verbose: bool = False,
    **kwargs
)
Parameters:
scale_factor (float): Data size multiplier. SF=1.0 generates ~6M lineitem rows (~1GB). Range: 0.01 to 10+
output_dir (str|Path, optional): Directory for generated data files. Default: temporary directory
verbose (bool): Enable verbose logging. Default: False
kwargs: Additional options
Raises:
ValueError: If scale_factor is not positive
TypeError: If scale_factor is not a number
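A minimal construction example; the output directory path is illustrative:
from benchbox.read_primitives import ReadPrimitives

benchmark = ReadPrimitives(
    scale_factor=0.1,           # ~100MB of generated data
    output_dir="./data/read_primitives",
    verbose=True,
)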
Methods¶
generate_data(tables=None)¶
Generate Read Primitives benchmark data (TPC-H schema).
# Generate all tables
table_files = benchmark.generate_data()
# Generate specific tables
table_files = benchmark.generate_data(tables=["lineitem", "orders"])
for table_name, file_path in table_files.items():
    print(f"{table_name}: {file_path}")
Parameters:
tables (list[str], optional): Table names to generate. If None, generates all tables
- Returns:
dict[str, str]: Dictionary mapping table names to file paths
get_query(query_id, *, params=None)¶
Get a specific primitive query.
# Get simple aggregation query
q1 = benchmark.get_query("aggregation_simple")
# Get window function query
q2 = benchmark.get_query("window_rank")
# Get join query
q3 = benchmark.get_query("join_inner")
Parameters:
query_id (int|str): Query identifier (e.g., “aggregation_simple”, “window_rank”)
params (dict, optional): Query parameters
- Returns:
str: Query SQL text
Raises:
ValueError: If query_id is invalid
get_queries(dialect=None)¶
Get all primitive queries.
# Get all queries
queries = benchmark.get_queries()
print(f"Total queries: {len(queries)}")
# Get with dialect translation
queries_sf = benchmark.get_queries(dialect="snowflake")
Parameters:
dialect (str, optional): Target SQL dialect for translation
- Returns:
dict[str, str]: Dictionary mapping query IDs to SQL text
get_queries_by_category(category)¶
Get queries filtered by category.
# Get all aggregation queries
agg_queries = benchmark.get_queries_by_category("aggregation")
# Get all window function queries
window_queries = benchmark.get_queries_by_category("window")
# Get all join queries
join_queries = benchmark.get_queries_by_category("join")
Parameters:
category (str): Category name (e.g., “aggregation”, “window”, “join”)
- Returns:
dict[str, str]: Dictionary mapping query IDs to SQL text for the category
Raises:
ValueError: If category is invalid
get_query_categories()¶
Get list of available query categories.
categories = benchmark.get_query_categories()
print("Available categories:")
for category in categories:
    queries = benchmark.get_queries_by_category(category)
    print(f" {category}: {len(queries)} queries")
- Returns:
list[str]: List of category names
Available Categories:
aggregation: SUM, COUNT, AVG, MIN, MAX operations
window: Window functions (ROW_NUMBER, RANK, DENSE_RANK, LAG, LEAD)
join: Inner, left, right, full outer joins
filter: WHERE clause filtering operations
groupby: GROUP BY and HAVING operations
orderby: ORDER BY and TOP-N operations
subquery: Subquery and correlated subquery operations
union: UNION, UNION ALL, INTERSECT, EXCEPT operations
cte: Common table expressions (WITH clauses)
string: String manipulation functions
date: Date/time operations
numeric: Numeric calculations and functions
analytical: Advanced analytical operations
get_schema()¶
Get Read Primitives benchmark schema (TPC-H).
schema = benchmark.get_schema()
for table_name, table_def in schema.items():
    print(f"{table_name}: {len(table_def['columns'])} columns")
- Returns:
dict[str, dict]: Dictionary mapping table names to schema definitions
get_create_tables_sql(dialect="standard", tuning_config=None)¶
Get CREATE TABLE SQL for all tables.
# Standard SQL
create_sql = benchmark.get_create_tables_sql()
# With dialect
create_sql_pg = benchmark.get_create_tables_sql(dialect="postgres")
# With tuning configuration
from benchbox.core.tuning.interface import UnifiedTuningConfiguration
tuning = UnifiedTuningConfiguration(...)
create_sql_tuned = benchmark.get_create_tables_sql(tuning_config=tuning)
Parameters:
dialect (str): Target SQL dialect. Default: “standard”
tuning_config (UnifiedTuningConfiguration, optional): Tuning settings
- Returns:
str: SQL script for creating all tables
run_benchmark(connection, queries=None, iterations=1, categories=None)¶
Run the complete Read Primitives benchmark.
from benchbox.platforms.duckdb import DuckDBAdapter
adapter = DuckDBAdapter()
conn = adapter.create_connection()
# Run all queries
results = benchmark.run_benchmark(conn)
# Run specific queries
results = benchmark.run_benchmark(
    conn,
    queries=["aggregation_simple", "window_rank"],
    iterations=3
)
# Run specific categories
results = benchmark.run_benchmark(
    conn,
    categories=["aggregation", "window"],
    iterations=3
)
Parameters:
connection (Any): Database connection
queries (list[str], optional): Query IDs to run. If None, runs all
iterations (int): Number of times to run each query. Default: 1
categories (list[str], optional): Categories to run. Overrides queries parameter
- Returns:
dict[str, Any]: Benchmark results with timing and status
run_category_benchmark(connection, category, iterations=1)¶
Run benchmark for a specific query category.
# Run all aggregation queries
agg_results = benchmark.run_category_benchmark(conn, "aggregation", iterations=3)
# Run all window function queries
window_results = benchmark.run_category_benchmark(conn, "window", iterations=3)
print(f"Aggregation results: {agg_results}")
Parameters:
connection (Any): Database connection
category (str): Category name (e.g., “aggregation”, “window”)
iterations (int): Number of times to run each query. Default: 1
- Returns:
dict[str, Any]: Benchmark results for the category
load_data_to_database(connection, tables=None)¶
Load generated data into a database.
# Load all tables
benchmark.load_data_to_database(conn)
# Load specific tables
benchmark.load_data_to_database(conn, tables=["lineitem", "orders"])
Parameters:
connection (Any): Database connection
tables (list[str], optional): Tables to load. If None, loads all
Raises:
ValueError: If data hasn’t been generated yet
execute_query(query_id, connection, params=None)¶
Execute a primitive query on the database.
# Execute single query
result = benchmark.execute_query("aggregation_simple", conn)
# Execute with custom parameters
result = benchmark.execute_query(
    "aggregation_simple",
    conn,
    params={"threshold": 100}
)
Parameters:
query_id (str): Query identifier
connection (Any): Database connection
params (dict, optional): Query parameters
- Returns:
Any: Query results from the database
Raises:
ValueError: If query_id is invalid
get_benchmark_info()¶
Get information about the benchmark.
info = benchmark.get_benchmark_info()
print(f"Benchmark: {info['name']}")
print(f"Total queries: {info['total_queries']}")
print(f"Categories: {', '.join(info['categories'])}")
print(f"Scale factor: {info['scale_factor']}")
- Returns:
dict[str, Any]: Benchmark metadata
Usage Examples¶
Basic Benchmark Run¶
from benchbox.read_primitives import ReadPrimitives
from benchbox.platforms.duckdb import DuckDBAdapter
# Create benchmark with scale factor 0.1 (~100MB)
benchmark = ReadPrimitives(scale_factor=0.1)
# Generate data
benchmark.generate_data()
# Run on DuckDB
adapter = DuckDBAdapter()
results = benchmark.run_with_platform(adapter)
# Print results
print(f"Benchmark: {results.benchmark_name}")
print(f"Total time: {results.total_execution_time:.2f}s")
print(f"Queries: {results.successful_queries}/{results.total_queries}")
Category-Based Execution¶
from benchbox.read_primitives import ReadPrimitives
from benchbox.platforms.duckdb import DuckDBAdapter
benchmark = ReadPrimitives(scale_factor=0.1)
benchmark.generate_data()
adapter = DuckDBAdapter()
conn = adapter.create_connection()
# Load data
adapter.create_schema(benchmark, conn)
benchmark.load_data_to_database(conn)
# Get all categories
categories = benchmark.get_query_categories()
# Run each category
category_results = {}
for category in categories:
    print(f"\nRunning {category} category...")
    results = benchmark.run_category_benchmark(conn, category, iterations=3)
    category_results[category] = {
        "total_time": results["total_time"],
        "avg_time": results["average_time"],
        "query_count": results["query_count"],
        "successful": results["successful_queries"]
    }
    print(f" Total time: {results['total_time']:.2f}s")
    print(f" Avg query time: {results['average_time']:.3f}s")
    print(f" Successful: {results['successful_queries']}/{results['query_count']}")
# Print category summary
print("\n" + "="*60)
print("Category Performance Summary")
print("="*60)
for category, stats in sorted(category_results.items()):
print(f"{category:15s}: {stats['avg_time']:6.3f}s avg ({stats['query_count']} queries)")
Selective Query Execution¶
from benchbox.read_primitives import ReadPrimitives
from benchbox.platforms.duckdb import DuckDBAdapter
import time
benchmark = ReadPrimitives(scale_factor=0.1)
adapter = DuckDBAdapter()
# Setup
benchmark.generate_data()
conn = adapter.create_connection()
adapter.create_schema(benchmark, conn)
benchmark.load_data_to_database(conn)
# Run specific queries
selected_queries = [
    "aggregation_simple",
    "aggregation_group",
    "window_rank",
    "join_inner",
    "filter_simple"
]
print("Running selected primitive queries:")
query_times = {}
for query_id in selected_queries:
    query = benchmark.get_query(query_id)
    start = time.time()
    result = benchmark.execute_query(query_id, conn)
    duration = time.time() - start
    query_times[query_id] = duration
    print(f" {query_id:25s}: {duration:.3f}s")
# Summary
total_time = sum(query_times.values())
avg_time = total_time / len(query_times)
print(f"\nTotal: {total_time:.2f}s, Average: {avg_time:.3f}s")
Performance Regression Testing¶
from benchbox.read_primitives import ReadPrimitives
from benchbox.platforms.duckdb import DuckDBAdapter
import json
benchmark = ReadPrimitives(scale_factor=0.1)
adapter = DuckDBAdapter()
# Setup
benchmark.generate_data()
conn = adapter.create_connection()
adapter.create_schema(benchmark, conn)
benchmark.load_data_to_database(conn)
# Run baseline benchmark
print("Running baseline benchmark...")
baseline_results = benchmark.run_benchmark(conn, iterations=3)
# Save baseline
with open("baseline_results.json", "w") as f:
    json.dump(baseline_results, f, indent=2)
# ... make changes to database configuration ...
# Run comparison benchmark
print("\nRunning comparison benchmark...")
comparison_results = benchmark.run_benchmark(conn, iterations=3)
# Compare results
print("\nPerformance Comparison:")
print(f"{'Query':<30s} {'Baseline':<12s} {'Current':<12s} {'Change':<10s}")
print("-" * 70)
for query_id in baseline_results.keys():
    if query_id in ["total_time", "average_time"]:
        continue
    baseline_time = baseline_results[query_id]["average_time"]
    current_time = comparison_results[query_id]["average_time"]
    change_pct = ((current_time - baseline_time) / baseline_time) * 100
    status = "REGRESSION" if change_pct > 10 else "OK"
    print(f"{query_id:<30s} {baseline_time:>10.3f}s {current_time:>10.3f}s "
          f"{change_pct:>+7.1f}% {status}")
Multi-Category Analysis¶
from benchbox.read_primitives import ReadPrimitives
from benchbox.platforms.duckdb import DuckDBAdapter
import pandas as pd
benchmark = ReadPrimitives(scale_factor=0.1)
adapter = DuckDBAdapter()
# Setup
benchmark.generate_data()
conn = adapter.create_connection()
adapter.create_schema(benchmark, conn)
benchmark.load_data_to_database(conn)
# Run selected categories
categories_to_test = ["aggregation", "window", "join", "filter"]
results_data = []
for category in categories_to_test:
    print(f"\nTesting {category} category...")
    results = benchmark.run_category_benchmark(conn, category, iterations=3)
    results_data.append({
        "category": category,
        "total_time": results["total_time"],
        "avg_time": results["average_time"],
        "query_count": results["query_count"],
        "successful": results["successful_queries"],
        "failed": results["failed_queries"],
        "success_rate": (results["successful_queries"] / results["query_count"]) * 100
    })
# Create DataFrame for analysis
df = pd.DataFrame(results_data)
print("\nCategory Analysis:")
print(df.to_string(index=False))
# Identify slowest categories
df_sorted = df.sort_values("avg_time", ascending=False)
print(f"\nSlowest categories:")
for idx, row in df_sorted.head(3).iterrows():
print(f" {row['category']}: {row['avg_time']:.3f}s average")
Multi-Platform Comparison¶
from benchbox.read_primitives import ReadPrimitives
from benchbox.platforms.duckdb import DuckDBAdapter
from benchbox.platforms.clickhouse import ClickHouseAdapter
import pandas as pd
benchmark = ReadPrimitives(scale_factor=0.1, output_dir="./data/primitives")
benchmark.generate_data()
platforms = {
    "DuckDB": DuckDBAdapter(),
    "ClickHouse": ClickHouseAdapter(host="localhost"),
}
results_data = []
for name, adapter in platforms.items():
    print(f"\nBenchmarking {name}...")
    results = benchmark.run_with_platform(adapter)
    results_data.append({
        "platform": name,
        "total_time": results.total_execution_time,
        "avg_query_time": results.average_query_time,
        "successful": results.successful_queries,
        "failed": results.failed_queries,
    })
df = pd.DataFrame(results_data)
print("\nPlatform Comparison:")
print(df)
Custom Primitive Development¶
from benchbox.read_primitives import ReadPrimitives
# Extend with custom primitives
class CustomPrimitives(ReadPrimitives):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # Add custom queries
        self._custom_queries = {
            "custom_complex_join": """
                SELECT l.l_orderkey, o.o_custkey, c.c_name, p.p_name
                FROM lineitem l
                JOIN orders o ON l.l_orderkey = o.o_orderkey
                JOIN customer c ON o.o_custkey = c.c_custkey
                JOIN part p ON l.l_partkey = p.p_partkey
                WHERE l.l_shipdate > '1995-01-01'
                LIMIT 100
            """,
            "custom_window_aggregation": """
                SELECT
                    l_orderkey,
                    l_partkey,
                    l_quantity,
                    SUM(l_quantity) OVER (PARTITION BY l_partkey) as total_qty,
                    AVG(l_quantity) OVER (PARTITION BY l_partkey) as avg_qty
                FROM lineitem
                WHERE l_shipdate > '1995-01-01'
                LIMIT 1000
            """
        }

    def get_query(self, query_id, *, params=None):
        # Check custom queries first
        if query_id in self._custom_queries:
            return self._custom_queries[query_id]
        # Fall back to standard primitives
        return super().get_query(query_id, params=params)
# Use custom primitives
custom_bench = CustomPrimitives(scale_factor=0.1)
custom_bench.generate_data()
# Execute custom query
custom_query = custom_bench.get_query("custom_complex_join")
print(f"Custom query: {custom_query}")
See Also¶
Benchmark APIs - Benchmark API overview
TPC-H Benchmark API - TPC-H benchmark API (same schema)
TPC-DS Benchmark API - TPC-DS benchmark API
ClickBench Benchmark API - ClickBench benchmark API
Base Benchmark API - Base benchmark interface
Results API - Results API
/benchmarks/primitives - Primitives implementation plan
External Resources¶
TPC-H Specification - Schema specification
Database Testing Best Practices - VLDB resources