Read Primitives Benchmark API

Tags: reference, python-api, custom-benchmark

Complete Python API reference for the Read Primitives benchmark.

Overview

The Read Primitives benchmark tests 80+ fundamental database operations using the TPC-H schema. It provides focused testing of specific database capabilities without business-logic complexity, making it ideal for performance regression detection and optimization validation.

Key Features:

  • 80+ primitive operation queries - Fundamental database operations

  • 13 operation categories - Aggregation, joins, filters, window functions, and more

  • TPC-H schema - Proven data model

  • Category-based execution - Test specific operation types

  • Performance regression detection - Isolated operation testing

  • Extensible architecture - Custom primitives support

Quick Start

from benchbox.read_primitives import ReadPrimitives
from benchbox.platforms.duckdb import DuckDBAdapter

# Create benchmark
benchmark = ReadPrimitives(scale_factor=1.0)

# Generate data
benchmark.generate_data()

# Run on platform
adapter = DuckDBAdapter()
results = benchmark.run_with_platform(adapter)

print(f"Completed {results.total_queries} queries in {results.total_execution_time:.2f}s")

API Reference

ReadPrimitives Class

class ReadPrimitives(scale_factor=1.0, output_dir=None, **kwargs)[source]

Bases: BaseBenchmark

Read Primitives benchmark implementation.

Provides the Read Primitives benchmark implementation, including data generation and access to 80+ primitive read operation queries that test fundamental database capabilities using the TPC-H schema.

The benchmark covers:

  • Aggregation, joins, filters, window functions

  • OLAP operations, statistical functions

  • JSON operations, full-text search

  • Time series analysis, array operations

  • Graph operations, temporal queries

__init__(scale_factor=1.0, output_dir=None, **kwargs)[source]

Initialize Read Primitives benchmark instance.

Parameters:
  • scale_factor (float) – Scale factor for the benchmark (1.0 = ~6M lineitem rows)

  • output_dir (str | Path | None) – Directory to output generated data files

  • **kwargs (Any) – Additional implementation-specific options

generate_data(tables=None)[source]

Generate Read Primitives benchmark data.

Parameters:

tables (list[str] | None) – Optional list of table names to generate. If None, generates all.

Returns:

A dictionary mapping table names to file paths

Return type:

dict[str, str]

get_queries(dialect=None)[source]

Get all Read Primitives benchmark queries.

Parameters:

dialect (str | None) – Target SQL dialect for query translation. If None, returns original queries.

Returns:

A dictionary mapping query IDs to query strings

Return type:

dict[str, str]

get_query(query_id, *, params=None)[source]

Get a specific Read Primitives benchmark query.

Parameters:
  • query_id (int | str) – The ID of the query to retrieve (e.g., ‘aggregation_simple’)

  • params (dict[str, Any] | None) – Optional parameters to customize the query

Returns:

The query string

Raises:

ValueError – If the query_id is invalid

Return type:

str

get_queries_by_category(category)[source]

Get queries filtered by category.

Parameters:

category (str) – Category name (e.g., ‘aggregation’, ‘window’, ‘join’)

Returns:

Dictionary mapping query IDs to SQL text for the category

Return type:

dict[str, str]

get_query_categories()[source]

Get list of available query categories.

Returns:

List of category names

Return type:

list[str]

get_schema()[source]

Get the Read Primitives benchmark schema (TPC-H).

Returns:

A dictionary mapping table names to their schema definitions

Return type:

dict[str, dict]

get_create_tables_sql(dialect='standard', tuning_config=None)[source]

Get SQL to create all Read Primitives benchmark tables.

Parameters:
  • dialect (str) – SQL dialect to use

  • tuning_config – Unified tuning configuration for constraint settings

Returns:

SQL script for creating all tables

Return type:

str

load_data_to_database(connection, tables=None)[source]

Load generated data into a database.

Parameters:
  • connection (Any) – Database connection

  • tables (list[str] | None) – Optional list of tables to load. If None, loads all.

Raises:

ValueError – If data hasn’t been generated yet

execute_query(query_id, connection, params=None)[source]

Execute a Read Primitives query on the given database connection.

Parameters:
  • query_id (str) – Query identifier (e.g., ‘aggregation_simple’)

  • connection (Any) – Database connection to use for execution

  • params (dict[str, Any] | None) – Optional parameters to use in the query

Returns:

Query results from the database

Raises:

ValueError – If the query_id is not valid

Return type:

Any

run_benchmark(connection, queries=None, iterations=1, categories=None)[source]

Run the complete Read Primitives benchmark.

Parameters:
  • connection (Any) – Database connection to use

  • queries (list[str] | None) – Optional list of query IDs to run. If None, runs all.

  • iterations (int) – Number of times to run each query

  • categories (list[str] | None) – Optional list of categories to run. If specified, overrides queries.

Returns:

Dictionary containing benchmark results

Return type:

dict[str, Any]

run_category_benchmark(connection, category, iterations=1)[source]

Run benchmark for a specific query category.

Parameters:
  • connection (Any) – Database connection to use

  • category (str) – Category name to run (e.g., ‘aggregation’, ‘window’, ‘join’)

  • iterations (int) – Number of times to run each query

Returns:

Dictionary containing benchmark results for the category

Return type:

dict[str, Any]

get_benchmark_info()[source]

Get information about the benchmark.

Returns:

Dictionary containing benchmark metadata

Return type:

dict[str, Any]

apply_verbosity(settings)

Apply verbosity settings to the mixin consumer.

property benchmark_name: str

Get the human-readable benchmark name.

create_enhanced_benchmark_result(platform, query_results, execution_metadata=None, phases=None, resource_utilization=None, performance_characteristics=None, **kwargs)

Create a BenchmarkResults object with standardized fields.

This centralizes the logic for creating benchmark results that was previously duplicated across platform adapters and the CLI orchestrator.

Parameters:
  • platform (str) – Platform name (e.g., “DuckDB”, “ClickHouse”)

  • query_results (list[dict[str, Any]]) – List of query execution results

  • execution_metadata (dict[str, Any] | None) – Optional execution metadata

  • phases (dict[str, dict[str, Any]] | None) – Optional phase tracking information

  • resource_utilization (dict[str, Any] | None) – Optional resource usage metrics

  • performance_characteristics (dict[str, Any] | None) – Optional performance analysis

  • **kwargs (Any) – Additional fields to override defaults

Returns:

Fully configured BenchmarkResults object

Return type:

BenchmarkResults
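
A minimal sketch of assembling a results object from per-query timings; the keys used in each query-result dictionary here are illustrative assumptions, not a documented schema:

# Hypothetical per-query results gathered by custom orchestration code
query_results = [
    {"query_id": "aggregation_simple", "execution_time": 0.042, "status": "success"},
    {"query_id": "window_rank", "execution_time": 0.118, "status": "success"},
]

results = benchmark.create_enhanced_benchmark_result(
    platform="DuckDB",
    query_results=query_results,
    execution_metadata={"scale_factor": 0.1},
)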

format_results(benchmark_result)

Format benchmark results for display.

Parameters:

benchmark_result (dict[str, Any]) – Result dictionary from run_benchmark()

Returns:

Formatted string representation of the results

Return type:

str
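
For example, the result dictionary from run_benchmark() can be rendered directly (assuming conn is an open database connection):

# Run the benchmark, then print a formatted summary
raw_results = benchmark.run_benchmark(conn)
print(benchmark.format_results(raw_results))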

get_data_source_benchmark()

Return the canonical source benchmark when data is shared.

Benchmarks that reuse data generated by another benchmark (for example, Primitives reusing TPC-H datasets) should override this method and return the lower-case identifier of the source benchmark. Benchmarks that produce their own data should return None (default).
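
A brief sketch of a subclass that reuses data from another benchmark; the identifier string returned here is an illustrative assumption:

class SharedDataPrimitives(ReadPrimitives):
    def get_data_source_benchmark(self):
        # Reuses TPC-H data rather than generating its own
        # (identifier shown for illustration only)
        return "tpch"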

log_debug_info(context='Debug')

Log comprehensive debug information including version details.

log_error_with_debug_info(error, context='Error')

Log an error with comprehensive debug information.
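
A short sketch of both logging helpers, assuming an exception caught during data generation:

# Emit debug diagnostics, or attach them to a caught exception
benchmark.log_debug_info(context="Data generation")

try:
    benchmark.generate_data()
except Exception as exc:
    benchmark.log_error_with_debug_info(exc, context="Data generation")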

log_operation_complete(operation, duration=None, details='')

log_operation_start(operation, details='')

log_verbose(message)

log_version_warning()

Log version consistency warnings if any exist.

log_very_verbose(message)

property logger: Logger

Return the logger configured for the verbosity mixin consumer.

quiet: bool = False

run_query(query_id, connection, params=None, fetch_results=False)

Execute single query and return timing and results.

Parameters:
  • query_id (int | str) – ID of the query to execute

  • connection (DatabaseConnection) – Database connection to execute query on

  • params (dict[str, Any] | None) – Optional parameters for query customization

  • fetch_results (bool) – Whether to fetch and return query results

Returns:

Dictionary containing:

  • query_id: Executed query ID

  • execution_time: Time taken to execute query in seconds

  • query_text: Executed query text

  • results: Query results if fetch_results=True, otherwise None

  • row_count: Number of rows returned (if results fetched)

Raises:
  • ValueError – If query_id is invalid

  • Exception – If query execution fails
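
A short example of the returned dictionary (timing value is illustrative), assuming conn is an open database connection:

# Execute one query and inspect the documented result fields
outcome = benchmark.run_query("aggregation_simple", conn, fetch_results=True)

print(outcome["query_id"])        # "aggregation_simple"
print(outcome["execution_time"])  # e.g. 0.042 seconds
print(outcome["row_count"])       # number of rows returned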

run_with_platform(platform_adapter, **run_config)

Run complete benchmark using platform-specific optimizations.

This method provides a unified interface for running benchmarks using database platform adapters that handle connection management, data loading optimizations, and query execution.

This is the standard method that all benchmarks should support for integration with the CLI and other orchestration tools.

Parameters:
  • platform_adapter – Platform adapter instance (e.g., DuckDBAdapter)

  • **run_config – Configuration options:

      - categories: List of query categories to run (if benchmark supports)

      - query_subset: List of specific query IDs to run

      - connection: Connection configuration

      - benchmark_type: Type hint for optimizations (‘olap’, ‘oltp’, etc.)

Returns:

BenchmarkResults object with execution results

Example

from benchbox.platforms import DuckDBAdapter

benchmark = SomeBenchmark(scale_factor=0.1)
adapter = DuckDBAdapter()
results = benchmark.run_with_platform(adapter)
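
The run_config options listed above are passed as keyword arguments; a brief sketch for Read Primitives (category names as documented, selection illustrative):

# Restrict the run to specific operation categories
results = benchmark.run_with_platform(
    adapter,
    categories=["aggregation", "window"],
)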

setup_database(connection)

Set up database with schema and data.

Creates necessary database schema and loads benchmark data into the database.

Parameters:

connection (DatabaseConnection) – Database connection to set up

Raises:
  • ValueError – If data generation fails

  • Exception – If database setup fails
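
A minimal sketch, assuming a connection created through a platform adapter:

from benchbox.platforms.duckdb import DuckDBAdapter

adapter = DuckDBAdapter()
conn = adapter.create_connection()

# Create the TPC-H schema and load the benchmark data
benchmark.setup_database(conn)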

translate_query(query_id, dialect)

Translate a query to a specific SQL dialect.

Parameters:
  • query_id (int | str) – The ID of the query to translate

  • dialect (str) – The target SQL dialect

Returns:

The translated query string

Raises:
  • ValueError – If the query_id is invalid

  • ImportError – If sqlglot is not installed

  • ValueError – If the dialect is not supported

Return type:

str
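
For example, translating a primitive query to another dialect (requires sqlglot; dialect support may vary):

# Translate a primitive query to the Snowflake dialect
snowflake_sql = benchmark.translate_query("aggregation_simple", "snowflake")
print(snowflake_sql)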

verbose: bool = False

verbose_enabled: bool = False

verbose_level: int = 0

property verbosity_settings: VerbositySettings

Return the current verbosity settings.

very_verbose: bool = False

Constructor

ReadPrimitives(
    scale_factor: float = 1.0,
    output_dir: Optional[Union[str, Path]] = None,
    verbose: bool = False,
    **kwargs
)

Parameters:

  • scale_factor (float): Data size multiplier. SF=1.0 generates ~6M lineitem rows (~1GB). Range: 0.01 to 10+

  • output_dir (str|Path, optional): Directory for generated data files. Default: temporary directory

  • verbose (bool): Enable verbose logging. Default: False

  • kwargs: Additional options

Raises:

  • ValueError: If scale_factor is not positive

  • TypeError: If scale_factor is not a number
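
A brief sketch of constructor usage and the documented validation behavior:

from pathlib import Path
from benchbox.read_primitives import ReadPrimitives

# Small dataset written to an explicit directory, with verbose logging
benchmark = ReadPrimitives(
    scale_factor=0.1,
    output_dir=Path("./data/primitives"),
    verbose=True,
)

# Non-positive scale factors are rejected
try:
    ReadPrimitives(scale_factor=0)
except ValueError as exc:
    print(f"Invalid scale factor: {exc}")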

Methods

generate_data(tables=None)

Generate Read Primitives benchmark data (TPC-H schema).

# Generate all tables
table_files = benchmark.generate_data()

# Generate specific tables
table_files = benchmark.generate_data(tables=["lineitem", "orders"])

for table_name, file_path in table_files.items():
    print(f"{table_name}: {file_path}")

Parameters:

  • tables (list[str], optional): Table names to generate. If None, generates all tables

Returns:

dict[str, str]: Dictionary mapping table names to file paths

get_query(query_id, *, params=None)

Get a specific primitive query.

# Get simple aggregation query
q1 = benchmark.get_query("aggregation_simple")

# Get window function query
q2 = benchmark.get_query("window_rank")

# Get join query
q3 = benchmark.get_query("join_inner")

Parameters:

  • query_id (int|str): Query identifier (e.g., “aggregation_simple”, “window_rank”)

  • params (dict, optional): Query parameters

Returns:

str: Query SQL text

Raises:

  • ValueError: If query_id is invalid

get_queries(dialect=None)

Get all primitive queries.

# Get all queries
queries = benchmark.get_queries()
print(f"Total queries: {len(queries)}")

# Get with dialect translation
queries_sf = benchmark.get_queries(dialect="snowflake")

Parameters:

  • dialect (str, optional): Target SQL dialect for translation

Returns:

dict[str, str]: Dictionary mapping query IDs to SQL text

get_queries_by_category(category)

Get queries filtered by category.

# Get all aggregation queries
agg_queries = benchmark.get_queries_by_category("aggregation")

# Get all window function queries
window_queries = benchmark.get_queries_by_category("window")

# Get all join queries
join_queries = benchmark.get_queries_by_category("join")

Parameters:

  • category (str): Category name (e.g., “aggregation”, “window”, “join”)

Returns:

dict[str, str]: Dictionary mapping query IDs to SQL text for the category

Raises:

  • ValueError: If category is invalid

get_query_categories()

Get list of available query categories.

categories = benchmark.get_query_categories()

print("Available categories:")
for category in categories:
    queries = benchmark.get_queries_by_category(category)
    print(f"  {category}: {len(queries)} queries")
Returns:

list[str]: List of category names

Available Categories:

  • aggregation: SUM, COUNT, AVG, MIN, MAX operations

  • window: Window functions (ROW_NUMBER, RANK, DENSE_RANK, LAG, LEAD)

  • join: Inner, left, right, full outer joins

  • filter: WHERE clause filtering operations

  • groupby: GROUP BY and HAVING operations

  • orderby: ORDER BY and TOP-N operations

  • subquery: Subquery and correlated subquery operations

  • union: UNION, UNION ALL, INTERSECT, EXCEPT operations

  • cte: Common table expressions (WITH clauses)

  • string: String manipulation functions

  • date: Date/time operations

  • numeric: Numeric calculations and functions

  • analytical: Advanced analytical operations

get_schema()

Get Read Primitives benchmark schema (TPC-H).

schema = benchmark.get_schema()

for table_name, table_def in schema.items():
    print(f"{table_name}: {len(table_def['columns'])} columns")
Returns:

dict[str, dict]: Dictionary mapping table names to schema definitions

get_create_tables_sql(dialect="standard", tuning_config=None)

Get CREATE TABLE SQL for all tables.

# Standard SQL
create_sql = benchmark.get_create_tables_sql()

# With dialect
create_sql_pg = benchmark.get_create_tables_sql(dialect="postgres")

# With tuning configuration
from benchbox.core.tuning.interface import UnifiedTuningConfiguration
tuning = UnifiedTuningConfiguration(...)
create_sql_tuned = benchmark.get_create_tables_sql(tuning_config=tuning)

Parameters:

  • dialect (str): Target SQL dialect. Default: “standard”

  • tuning_config (UnifiedTuningConfiguration, optional): Tuning settings

Returns:

str: SQL script for creating all tables

run_benchmark(connection, queries=None, iterations=1, categories=None)

Run the complete Read Primitives benchmark.

from benchbox.platforms.duckdb import DuckDBAdapter

adapter = DuckDBAdapter()
conn = adapter.create_connection()

# Run all queries
results = benchmark.run_benchmark(conn)

# Run specific queries
results = benchmark.run_benchmark(
    conn,
    queries=["aggregation_simple", "window_rank"],
    iterations=3
)

# Run specific categories
results = benchmark.run_benchmark(
    conn,
    categories=["aggregation", "window"],
    iterations=3
)

Parameters:

  • connection (Any): Database connection

  • queries (list[str], optional): Query IDs to run. If None, runs all

  • iterations (int): Number of times to run each query. Default: 1

  • categories (list[str], optional): Categories to run. Overrides queries parameter

Returns:

dict[str, Any]: Benchmark results with timing and status

run_category_benchmark(connection, category, iterations=1)

Run benchmark for a specific query category.

# Run all aggregation queries
agg_results = benchmark.run_category_benchmark(conn, "aggregation", iterations=3)

# Run all window function queries
window_results = benchmark.run_category_benchmark(conn, "window", iterations=3)

print(f"Aggregation results: {agg_results}")

Parameters:

  • connection (Any): Database connection

  • category (str): Category name (e.g., “aggregation”, “window”)

  • iterations (int): Number of times to run each query. Default: 1

Returns:

dict[str, Any]: Benchmark results for the category

load_data_to_database(connection, tables=None)

Load generated data into a database.

# Load all tables
benchmark.load_data_to_database(conn)

# Load specific tables
benchmark.load_data_to_database(conn, tables=["lineitem", "orders"])

Parameters:

  • connection (Any): Database connection

  • tables (list[str], optional): Tables to load. If None, loads all

Raises:

  • ValueError: If data hasn’t been generated yet

execute_query(query_id, connection, params=None)

Execute a primitive query on the database.

# Execute single query
result = benchmark.execute_query("aggregation_simple", conn)

# Execute with custom parameters
result = benchmark.execute_query(
    "aggregation_simple",
    conn,
    params={"threshold": 100}
)

Parameters:

  • query_id (str): Query identifier

  • connection (Any): Database connection

  • params (dict, optional): Query parameters

Returns:

Any: Query results from the database

Raises:

  • ValueError: If query_id is invalid

get_benchmark_info()

Get information about the benchmark.

info = benchmark.get_benchmark_info()

print(f"Benchmark: {info['name']}")
print(f"Total queries: {info['total_queries']}")
print(f"Categories: {', '.join(info['categories'])}")
print(f"Scale factor: {info['scale_factor']}")
Returns:

dict[str, Any]: Benchmark metadata

Usage Examples

Basic Benchmark Run

from benchbox.read_primitives import ReadPrimitives
from benchbox.platforms.duckdb import DuckDBAdapter

# Create benchmark with scale factor 0.1 (~100MB)
benchmark = ReadPrimitives(scale_factor=0.1)

# Generate data
benchmark.generate_data()

# Run on DuckDB
adapter = DuckDBAdapter()
results = benchmark.run_with_platform(adapter)

# Print results
print(f"Benchmark: {results.benchmark_name}")
print(f"Total time: {results.total_execution_time:.2f}s")
print(f"Queries: {results.successful_queries}/{results.total_queries}")

Category-Based Execution

from benchbox.read_primitives import ReadPrimitives
from benchbox.platforms.duckdb import DuckDBAdapter

benchmark = ReadPrimitives(scale_factor=0.1)
benchmark.generate_data()

adapter = DuckDBAdapter()
conn = adapter.create_connection()

# Load data
adapter.create_schema(benchmark, conn)
benchmark.load_data_to_database(conn)

# Get all categories
categories = benchmark.get_query_categories()

# Run each category
category_results = {}

for category in categories:
    print(f"\nRunning {category} category...")
    results = benchmark.run_category_benchmark(conn, category, iterations=3)

    category_results[category] = {
        "total_time": results["total_time"],
        "avg_time": results["average_time"],
        "query_count": results["query_count"],
        "successful": results["successful_queries"]
    }

    print(f"  Total time: {results['total_time']:.2f}s")
    print(f"  Avg query time: {results['average_time']:.3f}s")
    print(f"  Successful: {results['successful_queries']}/{results['query_count']}")

# Print category summary
print("\n" + "="*60)
print("Category Performance Summary")
print("="*60)
for category, stats in sorted(category_results.items()):
    print(f"{category:15s}: {stats['avg_time']:6.3f}s avg ({stats['query_count']} queries)")

Selective Query Execution

from benchbox.read_primitives import ReadPrimitives
from benchbox.platforms.duckdb import DuckDBAdapter
import time

benchmark = ReadPrimitives(scale_factor=0.1)
adapter = DuckDBAdapter()

# Setup
benchmark.generate_data()
conn = adapter.create_connection()
adapter.create_schema(benchmark, conn)
benchmark.load_data_to_database(conn)

# Run specific queries
selected_queries = [
    "aggregation_simple",
    "aggregation_group",
    "window_rank",
    "join_inner",
    "filter_simple"
]

print("Running selected primitive queries:")
query_times = {}

for query_id in selected_queries:
    query = benchmark.get_query(query_id)

    start = time.time()
    result = benchmark.execute_query(query_id, conn)
    duration = time.time() - start

    query_times[query_id] = duration
    print(f"  {query_id:25s}: {duration:.3f}s")

# Summary
total_time = sum(query_times.values())
avg_time = total_time / len(query_times)
print(f"\nTotal: {total_time:.2f}s, Average: {avg_time:.3f}s")

Performance Regression Testing

from benchbox.read_primitives import ReadPrimitives
from benchbox.platforms.duckdb import DuckDBAdapter
import json

benchmark = ReadPrimitives(scale_factor=0.1)
adapter = DuckDBAdapter()

# Setup
benchmark.generate_data()
conn = adapter.create_connection()
adapter.create_schema(benchmark, conn)
benchmark.load_data_to_database(conn)

# Run baseline benchmark
print("Running baseline benchmark...")
baseline_results = benchmark.run_benchmark(conn, iterations=3)

# Save baseline
with open("baseline_results.json", "w") as f:
    json.dump(baseline_results, f, indent=2)

# ... make changes to database configuration ...

# Run comparison benchmark
print("\nRunning comparison benchmark...")
comparison_results = benchmark.run_benchmark(conn, iterations=3)

# Compare results
print("\nPerformance Comparison:")
print(f"{'Query':<30s} {'Baseline':<12s} {'Current':<12s} {'Change':<10s}")
print("-" * 70)

for query_id in baseline_results.keys():
    if query_id in ["total_time", "average_time"]:
        continue

    baseline_time = baseline_results[query_id]["average_time"]
    current_time = comparison_results[query_id]["average_time"]
    change_pct = ((current_time - baseline_time) / baseline_time) * 100

    status = "REGRESSION" if change_pct > 10 else "OK"
    print(f"{query_id:<30s} {baseline_time:>10.3f}s  {current_time:>10.3f}s  "
          f"{change_pct:>+7.1f}% {status}")

Multi-Category Analysis

from benchbox.read_primitives import ReadPrimitives
from benchbox.platforms.duckdb import DuckDBAdapter
import pandas as pd

benchmark = ReadPrimitives(scale_factor=0.1)
adapter = DuckDBAdapter()

# Setup
benchmark.generate_data()
conn = adapter.create_connection()
adapter.create_schema(benchmark, conn)
benchmark.load_data_to_database(conn)

# Run selected categories
categories_to_test = ["aggregation", "window", "join", "filter"]

results_data = []

for category in categories_to_test:
    print(f"\nTesting {category} category...")
    results = benchmark.run_category_benchmark(conn, category, iterations=3)

    results_data.append({
        "category": category,
        "total_time": results["total_time"],
        "avg_time": results["average_time"],
        "query_count": results["query_count"],
        "successful": results["successful_queries"],
        "failed": results["failed_queries"],
        "success_rate": (results["successful_queries"] / results["query_count"]) * 100
    })

# Create DataFrame for analysis
df = pd.DataFrame(results_data)

print("\nCategory Analysis:")
print(df.to_string(index=False))

# Identify slowest categories
df_sorted = df.sort_values("avg_time", ascending=False)
print(f"\nSlowest categories:")
for idx, row in df_sorted.head(3).iterrows():
    print(f"  {row['category']}: {row['avg_time']:.3f}s average")

Multi-Platform Comparison

from benchbox.read_primitives import ReadPrimitives
from benchbox.platforms.duckdb import DuckDBAdapter
from benchbox.platforms.clickhouse import ClickHouseAdapter
import pandas as pd

benchmark = ReadPrimitives(scale_factor=0.1, output_dir="./data/primitives")
benchmark.generate_data()

platforms = {
    "DuckDB": DuckDBAdapter(),
    "ClickHouse": ClickHouseAdapter(host="localhost"),
}

results_data = []

for name, adapter in platforms.items():
    print(f"\nBenchmarking {name}...")
    results = benchmark.run_with_platform(adapter)

    results_data.append({
        "platform": name,
        "total_time": results.total_execution_time,
        "avg_query_time": results.average_query_time,
        "successful": results.successful_queries,
        "failed": results.failed_queries,
    })

df = pd.DataFrame(results_data)
print("\nPlatform Comparison:")
print(df)

Custom Primitive Development

from benchbox.read_primitives import ReadPrimitives

# Extend with custom primitives
class CustomPrimitives(ReadPrimitives):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)

        # Add custom queries
        self._custom_queries = {
            "custom_complex_join": """
                SELECT l.l_orderkey, o.o_custkey, c.c_name, p.p_name
                FROM lineitem l
                JOIN orders o ON l.l_orderkey = o.o_orderkey
                JOIN customer c ON o.o_custkey = c.c_custkey
                JOIN part p ON l.l_partkey = p.p_partkey
                WHERE l.l_shipdate > '1995-01-01'
                LIMIT 100
            """,
            "custom_window_aggregation": """
                SELECT
                    l_orderkey,
                    l_partkey,
                    l_quantity,
                    SUM(l_quantity) OVER (PARTITION BY l_partkey) as total_qty,
                    AVG(l_quantity) OVER (PARTITION BY l_partkey) as avg_qty
                FROM lineitem
                WHERE l_shipdate > '1995-01-01'
                LIMIT 1000
            """
        }

    def get_query(self, query_id, *, params=None):
        # Check custom queries first
        if query_id in self._custom_queries:
            return self._custom_queries[query_id]

        # Fall back to standard primitives
        return super().get_query(query_id, params=params)

# Use custom primitives
custom_bench = CustomPrimitives(scale_factor=0.1)
custom_bench.generate_data()

# Execute custom query
custom_query = custom_bench.get_query("custom_complex_join")
print(f"Custom query: {custom_query}")

See Also

External Resources