Write Primitives Benchmark API

Tags: reference, python-api, custom-benchmark

Complete Python API reference for the Write Primitives Benchmark.

Overview

The Write Primitives benchmark tests fundamental database write operations using the TPC-H schema as its foundation. It provides comprehensive testing of INSERT, UPDATE, DELETE, BULK_LOAD, MERGE, DDL, and TRANSACTION operations with automatic validation and cleanup.

Key Features:

  • Data Sharing: Reuses TPC-H data (no duplicate generation)

  • Catalog-Driven: Operations defined in YAML catalog

  • 12 Starter Operations: Across 5 categories (INSERT, UPDATE, DELETE, DDL, TRANSACTION)

  • Automatic Validation: Every write validated with read queries

  • Lifecycle Management: Setup, reset, teardown, status checking

  • Extensible: Easy to add more operations to catalog

Reference: See benchbox/core/write_primitives/README.md for full design

Quick Start

from benchbox import TPCH, WritePrimitives
import duckdb

# 1. Load TPC-H data first (required)
tpch = TPCH(scale_factor=1.0)
tpch.generate_data()

conn = duckdb.connect("benchmark.db")
adapter = tpch.create_adapter("duckdb")
adapter.load_data(tpch, conn, tpch.output_dir)

# 2. Setup Write Primitives
bench = WritePrimitives(scale_factor=1.0)
bench.setup(conn)

# 3. Execute operations
result = bench.execute_operation("insert_single_row", conn)
print(f"Success: {result.success}, Time: {result.write_duration_ms:.2f}ms")

API Reference

WritePrimitives Class

class WritePrimitives(scale_factor=1.0, output_dir=None, **kwargs)[source]

Bases: BaseBenchmark, OperationExecutor

Write Primitives benchmark implementation.

Tests fundamental write operations (INSERT, UPDATE, DELETE, BULK_LOAD, MERGE, DDL, TRANSACTION) using the TPC-H schema as a foundation.

The benchmark covers:

  • INSERT operations: single row, batch, INSERT…SELECT

  • UPDATE operations: selective, bulk, with joins/subqueries

  • DELETE operations: selective, bulk, cascading

  • BULK_LOAD operations: CSV, Parquet, various compressions

  • MERGE operations: insert/update/delete combinations

  • DDL operations: CREATE, DROP, TRUNCATE, ALTER

  • TRANSACTION operations: commit, rollback, savepoints

__init__(scale_factor=1.0, output_dir=None, **kwargs)[source]

Initialize Write Primitives benchmark instance.

Parameters:
  • scale_factor (float) – Scale factor for the benchmark (1.0 = ~1GB TPC-H data)

  • output_dir (str | Path | None) – Directory to output generated data files

  • **kwargs (Any) – Additional implementation-specific options

get_data_source_benchmark()[source]

Get the benchmark that provides data for this benchmark.

Returns:

“tpch” to indicate this benchmark reuses TPC-H data

Return type:

str | None

property tables: dict[str, Path]

Get the mapping of table names to data file paths.

Returns:

Dictionary mapping table names to paths of generated data files
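
Example (illustrative; assumes bench is a WritePrimitives instance whose TPC-H data has been generated, as in Quick Start):

for name, path in bench.tables.items():
    print(f"{name}: {path}")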

generate_data(tables=None)[source]

Generate Write Primitives benchmark data.

This generates/reuses TPC-H base data. Staging tables are created during benchmark setup.

Parameters:

tables (list[str] | None) – Optional list of table names to generate. If None, generates all.

Returns:

List of paths to generated data files

Return type:

list[str | Path]
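
Example (illustrative; reuses or generates the underlying TPC-H data files):

paths = bench.generate_data()
print(f"Data files available: {len(paths)}")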

get_queries(dialect=None)[source]

Get all write operation SQL.

Parameters:

dialect (str | None) – Target SQL dialect for query translation. If None, returns original SQL.

Returns:

A dictionary mapping operation IDs to write SQL strings

Return type:

dict[str, str]

get_queries_by_category(category)[source]

Get write operation SQL filtered by category.

Parameters:

category (str) – Operation category (insert, update, delete, ddl, transaction)

Returns:

Dictionary mapping operation IDs to write SQL for the category

Return type:

dict[str, str]
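
Example (illustrative; category names follow the operations catalog described later in this page):

insert_sql = bench.get_queries_by_category("insert")
for op_id, sql in insert_sql.items():
    print(op_id)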

get_query(query_id, **kwargs)[source]

Get a specific write operation SQL.

Parameters:
  • query_id (int | str) – The ID of the operation to retrieve (e.g., ‘insert_single_row’)

  • **kwargs – Additional parameters

Returns:

The write SQL string

Raises:

ValueError – If the query_id is invalid

Return type:

str

get_operation(operation_id)[source]

Get a specific write operation.

Parameters:

operation_id (str) – Operation identifier

Returns:

WriteOperation object

Return type:

Any

get_all_operations()[source]

Get all write operations.

Returns:

Dictionary mapping operation IDs to WriteOperation objects

Return type:

dict[str, Any]

get_operations_by_category(category)[source]

Get operations filtered by category.

Parameters:

category (str) – Category name (e.g., ‘insert’, ‘update’, ‘delete’)

Returns:

Dictionary mapping operation IDs to WriteOperation objects

Return type:

dict[str, Any]

get_operation_categories()[source]

Get list of available operation categories.

Returns:

List of category names

Return type:

list[str]

get_schema(dialect='standard')[source]

Get the Write Primitives schema (staging tables).

Parameters:

dialect (str) – SQL dialect to use for data types

Returns:

A dictionary mapping table names to their schema definitions

Return type:

dict[str, dict]

get_create_tables_sql(dialect='standard', tuning_config=None)[source]

Get SQL to create all required tables.

Includes both TPC-H base tables and Write Primitives staging tables.

Parameters:
  • dialect (str) – SQL dialect to use

  • tuning_config – Unified tuning configuration for constraint settings

Returns:

SQL script for creating all tables

Return type:

str

get_benchmark_info()[source]

Get information about the benchmark.

Returns:

Dictionary containing benchmark metadata

Return type:

dict[str, Any]
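
Example (illustrative; the exact metadata keys are implementation-defined):

info = bench.get_benchmark_info()
for key, value in info.items():
    print(f"{key}: {value}")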

setup(connection, force=False)[source]

Setup benchmark for execution.

Creates staging tables and populates them from TPC-H base tables.

Parameters:
  • connection (Any) – Database connection

  • force (bool) – If True, drop existing staging tables first

Returns:

Dictionary with setup status and details

Raises:

RuntimeError – If TPC-H tables don’t exist or setup fails

Return type:

dict[str, Any]

load_data(connection, **kwargs)[source]

Load data into database (standard benchmark interface).

This wraps the setup() method to integrate with platform adapter’s data loading pipeline.

Parameters:
  • connection (Any) – Database connection

  • **kwargs – Additional arguments (e.g., force)

Returns:

Dictionary with loading results

Return type:

dict[str, Any]
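
Example (illustrative; wraps setup() and accepts the same force flag):

load_result = bench.load_data(conn, force=True)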

teardown(connection)[source]

Clean up all staging tables.

Parameters:

connection (Any) – Database connection

reset(connection)[source]

Reset staging tables to initial state.

Truncates and repopulates staging tables.

Parameters:

connection (Any) – Database connection

is_setup(connection)[source]

Check if staging tables are ready.

Parameters:

connection (Any) – Database connection

Returns:

True if all staging tables exist and have data

Return type:

bool

execute_operation(operation_id, connection, use_transaction=True)[source]

Execute a write operation and validate results.

Parameters:
  • operation_id (str) – ID of operation to execute

  • connection (Any) – Database connection

  • use_transaction (bool) – If True, wrap in transaction and rollback after validation

Returns:

OperationResult with execution metrics

Return type:

Any

run_benchmark(connection, operation_ids=None, categories=None)[source]

Run write operations benchmark.

Parameters:
  • connection (Any) – Database connection

  • operation_ids (list[str] | None) – Optional list of specific operations to run

  • categories (list[str] | None) – Optional list of categories to run

Returns:

List of OperationResult objects

Return type:

list[Any]

apply_verbosity(settings)

Apply verbosity settings to the mixin consumer.

property benchmark_name: str

Get the human-readable benchmark name.

create_enhanced_benchmark_result(platform, query_results, execution_metadata=None, phases=None, resource_utilization=None, performance_characteristics=None, **kwargs)

Create a BenchmarkResults object with standardized fields.

This centralizes the logic for creating benchmark results that was previously duplicated across platform adapters and the CLI orchestrator.

Parameters:
  • platform (str) – Platform name (e.g., “DuckDB”, “ClickHouse”)

  • query_results (list[dict[str, Any]]) – List of query execution results

  • execution_metadata (dict[str, Any] | None) – Optional execution metadata

  • phases (dict[str, dict[str, Any]] | None) – Optional phase tracking information

  • resource_utilization (dict[str, Any] | None) – Optional resource usage metrics

  • performance_characteristics (dict[str, Any] | None) – Optional performance analysis

  • **kwargs (Any) – Additional fields to override defaults

Returns:

Fully configured BenchmarkResults object

Return type:

BenchmarkResults

format_results(benchmark_result)

Format benchmark results for display.

Parameters:

benchmark_result (dict[str, Any]) – Result dictionary from run_benchmark()

Returns:

Formatted string representation of the results

Return type:

str

log_debug_info(context='Debug')

Log comprehensive debug information including version details.

log_error_with_debug_info(error, context='Error')

Log an error with comprehensive debug information.

log_operation_complete(operation, duration=None, details='')
log_operation_start(operation, details='')
log_verbose(message)
log_version_warning()

Log version consistency warnings if any exist.

log_very_verbose(message)
property logger: Logger

Return the logger configured for the verbosity mixin consumer.

quiet: bool = False
run_query(query_id, connection, params=None, fetch_results=False)

Execute single query and return timing and results.

Parameters:
  • query_id (int | str) – ID of the query to execute

  • connection (DatabaseConnection) – Database connection to execute query on

  • params (dict[str, Any] | None) – Optional parameters for query customization

  • fetch_results (bool) – Whether to fetch and return query results

Returns:

Dictionary containing:

  • query_id: Executed query ID

  • execution_time: Time taken to execute query in seconds

  • query_text: Executed query text

  • results: Query results if fetch_results=True, otherwise None

  • row_count: Number of rows returned (if results fetched)

Return type:

dict

Raises:
  • ValueError – If query_id is invalid

  • Exception – If query execution fails
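
Example (illustrative sketch; assumes the generic run_query path accepts write operation IDs such as ‘insert_single_row’):

timing = bench.run_query("insert_single_row", conn)
print(f"Executed in {timing['execution_time']:.4f}s")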

run_with_platform(platform_adapter, **run_config)

Run complete benchmark using platform-specific optimizations.

This method provides a unified interface for running benchmarks using database platform adapters that handle connection management, data loading optimizations, and query execution.

This is the standard method that all benchmarks should support for integration with the CLI and other orchestration tools.

Parameters:
  • platform_adapter – Platform adapter instance (e.g., DuckDBAdapter)

  • **run_config – Configuration options:

      - categories: List of query categories to run (if the benchmark supports categories)

      - query_subset: List of specific query IDs to run

      - connection: Connection configuration

      - benchmark_type: Type hint for optimizations (‘olap’, ‘oltp’, etc.)

Returns:

BenchmarkResults object with execution results

Example

from benchbox.platforms import DuckDBAdapter

benchmark = SomeBenchmark(scale_factor=0.1)
adapter = DuckDBAdapter()
results = benchmark.run_with_platform(adapter)

setup_database(connection)

Set up database with schema and data.

Creates necessary database schema and loads benchmark data into the database.

Parameters:

connection (DatabaseConnection) – Database connection to set up

Raises:
  • ValueError – If data generation fails

  • Exception – If database setup fails
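
Example (illustrative; the generic base-benchmark setup path, assuming the TPC-H prerequisite described under setup() is satisfied):

bench.setup_database(conn)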

translate_query(query_id, dialect)

Translate a query to a specific SQL dialect.

Parameters:
  • query_id (int | str) – The ID of the query to translate

  • dialect (str) – The target SQL dialect

Returns:

The translated query string

Raises:
  • ValueError – If the query_id is invalid

  • ImportError – If sqlglot is not installed

  • ValueError – If the dialect is not supported

Return type:

str
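
Example (illustrative; requires sqlglot and uses an operation ID as the query ID):

translated_sql = bench.translate_query("insert_single_row", "duckdb")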

verbose: bool = False
verbose_enabled: bool = False
verbose_level: int = 0
property verbosity_settings: VerbositySettings

Return the current verbosity settings.

very_verbose: bool = False

Constructor:

WritePrimitives(
    scale_factor: float = 1.0,
    output_dir: Optional[Union[str, Path]] = None,
    **kwargs
)

Parameters:

  • scale_factor (float): TPC-H scale factor (must match TPC-H data)

  • output_dir (str | Path | None): Output directory (defaults to TPC-H path)
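
Example (illustrative; the output_dir value is hypothetical and should point at the existing TPC-H data directory if overridden):

bench = WritePrimitives(scale_factor=1.0)

# Or pin an explicit data directory (hypothetical path):
bench = WritePrimitives(scale_factor=1.0, output_dir="./tpch_data")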

Lifecycle Methods

setup(connection, force=False) → dict

Create staging tables and populate from TPC-H base tables.

Parameters:

  • connection: Database connection

  • force (bool): If True, drop existing staging tables first

Returns: Dict with keys success, tables_created, tables_populated

Raises: RuntimeError if TPC-H tables don’t exist

Example:

setup_result = bench.setup(conn, force=True)
print(f"Tables created: {setup_result['tables_created']}")
print(f"Rows: {setup_result['tables_populated']}")

is_setup(connection) → bool

Check if staging tables are initialized and ready.

Example:

if bench.is_setup(conn):
    print("Ready to execute operations")

reset(connection) → None

Truncate and repopulate staging tables to initial state.

Example:

# Reset after validation failures
bench.reset(conn)

teardown(connection) → None

Drop all staging tables.

Example:

# Cleanup when done
bench.teardown(conn)

Operation Execution Methods

execute_operation(operation_id, connection, use_transaction=True) → OperationResult

Execute a single write operation with validation and cleanup.

Parameters:

  • operation_id (str): Operation ID (e.g., “insert_single_row”)

  • connection: Database connection

  • use_transaction (bool): Wrap in transaction for rollback cleanup

Returns: OperationResult with execution metrics

Raises: ValueError if operation not found

Example:

result = bench.execute_operation("insert_single_row", conn)
print(f"Operation: {result.operation_id}")
print(f"Success: {result.success}")
print(f"Rows affected: {result.rows_affected}")
print(f"Write time: {result.write_duration_ms:.2f}ms")
print(f"Validation: {result.validation_passed}")

run_benchmark(connection, operation_ids=None, categories=None) → list[OperationResult]

Run multiple operations.

Parameters:

  • connection: Database connection

  • operation_ids (list[str] | None): List of specific operation IDs

  • categories (list[str] | None): List of categories (e.g., [“insert”, “update”])

Returns: List of OperationResult objects

Example:

# Run all operations
results = bench.run_benchmark(conn)

# Run only INSERT operations
insert_results = bench.run_benchmark(conn, categories=["insert"])

# Run specific operations
specific_ops = ["insert_single_row", "update_single_row_pk"]
results = bench.run_benchmark(conn, operation_ids=specific_ops)

Operation Query Methods

get_all_operations() → dict[str, WriteOperation]

Get all available operations.

Example:

operations = bench.get_all_operations()
print(f"Total operations: {len(operations)}")

get_operation(operation_id) → WriteOperation

Get specific operation by ID.

Example:

op = bench.get_operation("insert_single_row")
print(f"Category: {op.category}")

get_operations_by_category(category) → dict[str, WriteOperation]

Get operations filtered by category.

Parameters:

  • category (str): Category name (“insert”, “update”, “delete”, “ddl”, “transaction”)

Example:

insert_ops = bench.get_operations_by_category("insert")
print(f"INSERT operations: {len(insert_ops)}")

get_operation_categories() → list[str]

Get list of available categories.

Example:

categories = bench.get_operation_categories()
# Output: ['insert', 'update', 'delete', 'ddl', 'transaction']

get_queries(dialect=None) → dict[str, str]

Get all write operation SQL.

Example:

queries = bench.get_queries()
print(queries["insert_single_row"])

get_query(query_id, **kwargs) → str

Get a specific write operation SQL.

Example:

sql = bench.get_query("insert_single_row")

Schema Methods

get_schema(dialect='standard') → dict[str, dict]

Get staging table schema definitions.

Example:

schema = bench.get_schema(dialect="duckdb")
for table, definition in schema.items():
    print(f"{table}: {definition['columns']}")

get_create_tables_sql(dialect='standard') → str

Get SQL to create staging tables.

Example:

create_sql = bench.get_create_tables_sql(dialect="postgresql")

OperationResult Class

Result object returned by execute_operation and run_benchmark.

Fields:

  • operation_id (str): Operation identifier

  • success (bool): Whether operation succeeded

  • write_duration_ms (float): Write execution time in milliseconds

  • rows_affected (int): Number of rows affected (-1 if unknown)

  • validation_duration_ms (float): Validation time in milliseconds

  • validation_passed (bool): Whether validation passed

  • validation_results (list[dict]): Detailed validation results

  • cleanup_duration_ms (float): Cleanup time in milliseconds

  • cleanup_success (bool): Whether cleanup succeeded

  • error (str | None): Error message if operation failed

  • cleanup_warning (str | None): Warning for transaction cleanup failures
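
Example (illustrative; shows how the fields are typically consumed):

result = bench.execute_operation("insert_single_row", conn)
if result.success and result.validation_passed:
    print(f"{result.operation_id}: {result.rows_affected} rows in {result.write_duration_ms:.2f}ms")
else:
    print(f"{result.operation_id} failed: {result.error}")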

Operations Catalog

The benchmark includes 12 starter operations across 5 categories:

INSERT Operations (3)

  • insert_single_row: Single row INSERT with all columns

  • insert_batch_values_10: Batch INSERT with 10 rows

  • insert_select_simple: INSERT…SELECT with WHERE filter

UPDATE Operations (3)

  • update_single_row_pk: UPDATE single row by primary key

  • update_selective_10pct: UPDATE ~10% of rows

  • update_with_subquery: UPDATE with correlated subquery

DELETE Operations (2)

  • delete_single_row_pk: DELETE single row by primary key

  • delete_selective_10pct: DELETE ~10% of rows

DDL Operations (3)

  • ddl_create_table_simple: CREATE TABLE with simple schema

  • ddl_truncate_table_small: TRUNCATE small staging table

  • ddl_create_table_as_select_simple: CTAS with simple SELECT

TRANSACTION Operations (1)

  • transaction_commit_small: BEGIN + 10 writes + COMMIT

Staging Tables

The benchmark creates these staging tables:

Data Tables:

  • orders_stage: Copy of ORDERS for UPDATE/DELETE testing

  • lineitem_stage: Copy of LINEITEM for write testing

  • orders_new: Source for MERGE testing

  • orders_summary: Target for aggregated INSERT…SELECT

  • lineitem_enriched: Target for joined INSERT…SELECT

Metadata Tables:

  • write_ops_log: Audit log for all write operations

  • batch_metadata: Tracks batch operations
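
Example (illustrative; a quick sanity check against one staging table using a DuckDB connection):

row_count = conn.execute("SELECT COUNT(*) FROM orders_stage").fetchone()[0]
print(f"orders_stage rows: {row_count}")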

Usage Examples

Complete Workflow

from benchbox import TPCH, WritePrimitives
import duckdb

# 1. Load TPC-H data first
tpch = TPCH(scale_factor=1.0)
tpch.generate_data()

conn = duckdb.connect("benchmark.db")
adapter = tpch.create_adapter("duckdb")
adapter.load_data(tpch, conn, tpch.output_dir)

# 2. Setup Write Primitives
bench = WritePrimitives(scale_factor=1.0)
setup_result = bench.setup(conn, force=True)
print(f"Setup: {setup_result['success']}")
print(f"Tables: {setup_result['tables_created']}")

# 3. Execute single operation
result = bench.execute_operation("insert_single_row", conn)
print(f"Success: {result.success}")
print(f"Time: {result.write_duration_ms:.2f}ms")
print(f"Validation: {result.validation_passed}")

# 4. Run all operations
results = bench.run_benchmark(conn)
print(f"Total: {len(results)}")
successful = [r for r in results if r.success]
print(f"Successful: {len(successful)}")

# 5. Cleanup
bench.teardown(conn)

Category-Based Testing

# Test each category separately
categories = bench.get_operation_categories()

for category in categories:
    print(f"\nTesting {category.upper()} operations:")
    results = bench.run_benchmark(conn, categories=[category])

    for result in results:
        status = "✓" if result.success else "✗"
        print(f"  {status} {result.operation_id}: {result.write_duration_ms:.2f}ms")

    # Reset between categories
    bench.reset(conn)

Performance Analysis

import time
from statistics import mean, median

# Run multiple iterations for stable timing
operation_id = "insert_single_row"
iterations = 10
times = []

for i in range(iterations):
    result = bench.execute_operation(operation_id, conn)
    if result.success:
        times.append(result.write_duration_ms)

    # Reset between iterations
    if i < iterations - 1:
        bench.reset(conn)

print(f"Operation: {operation_id}")
print(f"Iterations: {iterations}")
print(f"Mean: {mean(times):.2f}ms")
print(f"Median: {median(times):.2f}ms")
print(f"Min: {min(times):.2f}ms")
print(f"Max: {max(times):.2f}ms")

Error Handling and Recovery

# Handle operation failures
result = bench.execute_operation("insert_batch_values_10", conn)

if not result.success:
    print(f"Operation failed: {result.error}")
    if result.cleanup_warning:
        print(f"Cleanup warning: {result.cleanup_warning}")

    # Reset to recover
    print("Resetting staging tables...")
    bench.reset(conn)

# Handle validation failures
if not result.validation_passed:
    print("Validation failed:")
    for val_result in result.validation_results:
        if not val_result['passed']:
            print(f"  Query: {val_result['query_id']}")
            print(f"  Expected: {val_result['expected_rows']}")
            print(f"  Actual: {val_result['actual_rows']}")

    # Reset and retry
    bench.reset(conn)
    result = bench.execute_operation("insert_batch_values_10", conn)

Best Practices

  1. Always Load TPC-H First

    # Wrong: Will fail
    bench = WritePrimitives()
    bench.setup(conn)  # Error: TPC-H tables not found
    
    # Correct: Load TPC-H first
    tpch = TPCH(scale_factor=1.0)
    tpch.generate_data()
    adapter = tpch.create_adapter("duckdb")
    adapter.load_data(tpch, conn, tpch.output_dir)
    
    bench = WritePrimitives(scale_factor=1.0)
    bench.setup(conn)
    
  2. Check Setup Status

    if not bench.is_setup(conn):
        bench.setup(conn)
    
  3. Use Transactions for Cleanup

    # Automatic rollback cleanup
    result = bench.execute_operation("insert_single_row", conn, use_transaction=True)
    
    # Manual cleanup (for DDL operations)
    result = bench.execute_operation("ddl_create_table_simple", conn, use_transaction=False)
    
  4. Reset Between Test Runs

    # Reset to ensure clean state
    for operation_id in ["insert_single_row", "update_single_row_pk"]:
        result = bench.execute_operation(operation_id, conn)
        bench.reset(conn)  # Clean slate for next operation
    
  5. Validate and Handle Errors

    result = bench.execute_operation("insert_select_simple", conn)
    
    if result.success and result.validation_passed:
        print(f"Success: {result.write_duration_ms:.2f}ms")
    elif not result.success:
        print(f"Failed: {result.error}")
    elif not result.validation_passed:
        print("Validation failed, check data dependencies")
    

Common Issues

Issue: “Required TPC-H table not found”
  • Cause: Write Primitives requires TPC-H base tables

  • Solution: Load TPC-H data before calling setup()

Issue: “Staging tables not initialized”
  • Cause: Trying to execute before calling setup()

  • Solution: Call bench.setup(conn) first

Issue: Transaction cleanup failures
  • Cause: Some operations can’t be rolled back (DDL)

  • Solution: Use bench.reset(conn) to restore clean state

Issue: Validation failures
  • Cause: Data-dependent operations or constraint violations

  • Solution: Check validation_results for details, reset if needed

Issue: rows_affected = -1
  • Cause: Some databases (e.g., DuckDB) don’t return a row count for write statements

  • Impact: Normal behavior, doesn’t affect operation success

Future Expansion

The catalog can be expanded to include all 101 planned operations:

  • More INSERT variants (12 total planned)

  • More UPDATE variants (15 total planned)

  • More DELETE variants (12 total planned)

  • BULK_LOAD operations (24 planned) - CSV, Parquet, compressions

  • MERGE/UPSERT operations (18 planned)

  • More DDL operations (12 total planned)

  • More TRANSACTION operations (8 total planned)

See benchbox/core/write_primitives/README.md for full plan.