Write Primitives Benchmark API¶
Complete Python API reference for the Write Primitives Benchmark.
Overview¶
The Write Primitives benchmark tests fundamental database write operations using the TPC-H schema as its foundation. It provides comprehensive testing of INSERT, UPDATE, DELETE, BULK_LOAD, MERGE, DDL, and TRANSACTION operations with automatic validation and cleanup.
Key Features:
Data Sharing: Reuses TPC-H data (no duplicate generation)
Catalog-Driven: Operations defined in YAML catalog
12 Starter Operations: Across 5 categories (INSERT, UPDATE, DELETE, DDL, TRANSACTION)
Automatic Validation: Every write validated with read queries
Lifecycle Management: Setup, reset, teardown, status checking
Extensible: Easy to add more operations to catalog
Reference: See benchbox/core/write_primitives/README.md for full design
Quick Start¶
from benchbox import TPCH, WritePrimitives
import duckdb
# 1. Load TPC-H data first (required)
tpch = TPCH(scale_factor=1.0)
tpch.generate_data()
conn = duckdb.connect("benchmark.db")
adapter = tpch.create_adapter("duckdb")
adapter.load_data(tpch, conn, tpch.output_dir)
# 2. Setup Write Primitives
bench = WritePrimitives(scale_factor=1.0)
bench.setup(conn)
# 3. Execute operations
result = bench.execute_operation("insert_single_row", conn)
print(f"Success: {result.success}, Time: {result.write_duration_ms:.2f}ms")
API Reference¶
WritePrimitives Class¶
- class WritePrimitives(scale_factor=1.0, output_dir=None, **kwargs)[source]¶
Bases:
BaseBenchmark, OperationExecutor
Write Primitives benchmark implementation.
Tests fundamental write operations (INSERT, UPDATE, DELETE, BULK_LOAD, MERGE, DDL, TRANSACTION) using TPC-H schema as foundation.
The benchmark covers:
- INSERT operations: single row, batch, INSERT…SELECT
- UPDATE operations: selective, bulk, with joins/subqueries
- DELETE operations: selective, bulk, cascading
- BULK_LOAD operations: CSV, Parquet, various compressions
- MERGE operations: insert/update/delete combinations
- DDL operations: CREATE, DROP, TRUNCATE, ALTER
- TRANSACTION operations: commit, rollback, savepoints
- __init__(scale_factor=1.0, output_dir=None, **kwargs)[source]¶
Initialize Write Primitives benchmark instance.
- Parameters:
scale_factor (float) – Scale factor for the benchmark (1.0 = ~1GB TPC-H data)
output_dir (str | Path | None) – Directory to output generated data files
**kwargs (Any) – Additional implementation-specific options
- get_data_source_benchmark()[source]¶
Get the benchmark that provides data for this benchmark.
- Returns:
“tpch” to indicate this benchmark reuses TPC-H data
- Return type:
str | None
- property tables: dict[str, Path]¶
Get the mapping of table names to data file paths.
- Returns:
Dictionary mapping table names to paths of generated data files
- generate_data(tables=None)[source]¶
Generate Write Primitives benchmark data.
This generates/reuses TPC-H base data. Staging tables are created during benchmark setup.
- Parameters:
tables (list[str] | None) – Optional list of table names to generate. If None, generates all.
- Returns:
List of paths to generated data files
- Return type:
list[str | Path]
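Example (a minimal sketch; the returned paths follow the description above):
paths = bench.generate_data()
print(f"Data files available: {len(paths)}")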
- get_queries(dialect=None)[source]¶
Get all write operation SQL.
- Parameters:
dialect (str | None) – Target SQL dialect for query translation. If None, returns original SQL.
- Returns:
A dictionary mapping operation IDs to write SQL strings
- Return type:
dict[str, str]
- get_queries_by_category(category)[source]¶
Get write operation SQL filtered by category.
- Parameters:
category (str) – Operation category (insert, update, delete, ddl, transaction)
- Returns:
Dictionary mapping operation IDs to write SQL for the category
- Return type:
dict[str, str]
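Example (a minimal sketch using one of the categories listed above):
insert_sql = bench.get_queries_by_category("insert")
print(f"INSERT operations with SQL: {sorted(insert_sql)}")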
- get_query(query_id, **kwargs)[source]¶
Get a specific write operation SQL.
- Parameters:
query_id (int | str) – The ID of the operation to retrieve (e.g., ‘insert_single_row’)
**kwargs – Additional parameters
- Returns:
The write SQL string
- Raises:
ValueError – If the query_id is invalid
- Return type:
str
- get_operation(operation_id)[source]¶
Get a specific write operation.
- Parameters:
operation_id (str) – Operation identifier
- Returns:
WriteOperation object
- Return type:
Any
- get_all_operations()[source]¶
Get all write operations.
- Returns:
Dictionary mapping operation IDs to WriteOperation objects
- Return type:
dict[str, Any]
- get_operations_by_category(category)[source]¶
Get operations filtered by category.
- Parameters:
category (str) – Category name (e.g., ‘insert’, ‘update’, ‘delete’)
- Returns:
Dictionary mapping operation IDs to WriteOperation objects
- Return type:
dict[str, Any]
- get_operation_categories()[source]¶
Get list of available operation categories.
- Returns:
List of category names
- Return type:
list[str]
- get_schema(dialect='standard')[source]¶
Get the Write Primitives schema (staging tables).
- Parameters:
dialect (str) – SQL dialect to use for data types
- Returns:
A dictionary mapping table names to their schema definitions
- Return type:
dict[str, dict]
- get_create_tables_sql(dialect='standard', tuning_config=None)[source]¶
Get SQL to create all required tables.
Includes both TPC-H base tables and Write Primitives staging tables.
- Parameters:
dialect (str) – SQL dialect to use
tuning_config – Unified tuning configuration for constraint settings
- Returns:
SQL script for creating all tables
- Return type:
str
- get_benchmark_info()[source]¶
Get information about the benchmark.
- Returns:
Dictionary containing benchmark metadata
- Return type:
dict[str, Any]
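Example (a minimal sketch; the exact metadata keys depend on the implementation):
info = bench.get_benchmark_info()
print(info)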
- setup(connection, force=False)[source]¶
Setup benchmark for execution.
Creates staging tables and populates them from TPC-H base tables.
- Parameters:
connection (Any) – Database connection
force (bool) – If True, drop existing staging tables first
- Returns:
Dictionary with setup status and details
- Raises:
RuntimeError – If TPC-H tables don’t exist or setup fails
- Return type:
dict[str, Any]
- load_data(connection, **kwargs)[source]¶
Load data into database (standard benchmark interface).
This wraps the setup() method to integrate with platform adapter’s data loading pipeline.
- Parameters:
connection (Any) – Database connection
**kwargs – Additional arguments (e.g., force)
- Returns:
Dictionary with loading results
- Return type:
dict[str, Any]
- teardown(connection)[source]¶
Clean up all staging tables.
- Parameters:
connection (Any) – Database connection
- reset(connection)[source]¶
Reset staging tables to initial state.
Truncates and repopulates staging tables.
- Parameters:
connection (Any) – Database connection
- is_setup(connection)[source]¶
Check if staging tables are ready.
- Parameters:
connection (Any) – Database connection
- Returns:
True if all staging tables exist and have data
- Return type:
bool
- execute_operation(operation_id, connection, use_transaction=True)[source]¶
Execute a write operation and validate results.
- Parameters:
operation_id (str) – ID of operation to execute
connection (Any) – Database connection
use_transaction (bool) – If True, wrap in transaction and rollback after validation
- Returns:
OperationResult with execution metrics
- Return type:
Any
- run_benchmark(connection, operation_ids=None, categories=None)[source]¶
Run write operations benchmark.
- Parameters:
connection (Any) – Database connection
operation_ids (list[str] | None) – Optional list of specific operations to run
categories (list[str] | None) – Optional list of categories to run
- Returns:
List of OperationResult objects
- Return type:
list[Any]
- apply_verbosity(settings)¶
Apply verbosity settings to the mixin consumer.
- property benchmark_name: str¶
Get the human-readable benchmark name.
- create_enhanced_benchmark_result(platform, query_results, execution_metadata=None, phases=None, resource_utilization=None, performance_characteristics=None, **kwargs)¶
Create a BenchmarkResults object with standardized fields.
This centralizes the logic for creating benchmark results that was previously duplicated across platform adapters and CLI orchestrator.
- Parameters:
platform (str) – Platform name (e.g., “DuckDB”, “ClickHouse”)
query_results (list[dict[str, Any]]) – List of query execution results
execution_metadata (dict[str, Any] | None) – Optional execution metadata
phases (dict[str, dict[str, Any]] | None) – Optional phase tracking information
resource_utilization (dict[str, Any] | None) – Optional resource usage metrics
performance_characteristics (dict[str, Any] | None) – Optional performance analysis
**kwargs (Any) – Additional fields to override defaults
- Returns:
Fully configured BenchmarkResults object
- Return type:
BenchmarkResults
- format_results(benchmark_result)¶
Format benchmark results for display.
- Parameters:
benchmark_result (dict[str, Any]) – Result dictionary from run_benchmark()
- Returns:
Formatted string representation of the results
- Return type:
str
- log_debug_info(context='Debug')¶
Log comprehensive debug information including version details.
- log_error_with_debug_info(error, context='Error')¶
Log an error with comprehensive debug information.
- log_operation_complete(operation, duration=None, details='')¶
- log_operation_start(operation, details='')¶
- log_verbose(message)¶
- log_version_warning()¶
Log version consistency warnings if any exist.
- log_very_verbose(message)¶
- property logger: Logger¶
Return the logger configured for the verbosity mixin consumer.
- quiet: bool = False¶
- run_query(query_id, connection, params=None, fetch_results=False)¶
Execute single query and return timing and results.
- Parameters:
query_id (int | str) – ID of the query to execute
connection (DatabaseConnection) – Database connection to execute query on
params (dict[str, Any] | None) – Optional parameters for query customization
fetch_results (bool) – Whether to fetch and return query results
- Returns:
Dictionary containing:
query_id: Executed query ID
execution_time: Time taken to execute the query, in seconds
query_text: Executed query text
results: Query results if fetch_results=True, otherwise None
row_count: Number of rows returned (if results were fetched)
- Raises:
ValueError – If query_id is invalid
Exception – If query execution fails
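Example (a minimal sketch using the documented return keys):
timing = bench.run_query("insert_single_row", conn)
print(f"{timing['query_id']}: {timing['execution_time']:.3f}s")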
- run_with_platform(platform_adapter, **run_config)¶
Run complete benchmark using platform-specific optimizations.
This method provides a unified interface for running benchmarks using database platform adapters that handle connection management, data loading optimizations, and query execution.
This is the standard method that all benchmarks should support for integration with the CLI and other orchestration tools.
- Parameters:
platform_adapter – Platform adapter instance (e.g., DuckDBAdapter)
**run_config – Configuration options:
- categories: List of query categories to run (if the benchmark supports categories)
- query_subset: List of specific query IDs to run
- connection: Connection configuration
- benchmark_type: Type hint for optimizations (‘olap’, ‘oltp’, etc.)
- Returns:
BenchmarkResults object with execution results
Example
from benchbox.platforms import DuckDBAdapter
benchmark = SomeBenchmark(scale_factor=0.1)
adapter = DuckDBAdapter()
results = benchmark.run_with_platform(adapter)
- setup_database(connection)¶
Set up database with schema and data.
Creates necessary database schema and loads benchmark data into the database.
- Parameters:
connection (DatabaseConnection) – Database connection to set up
- Raises:
ValueError – If data generation fails
Exception – If database setup fails
- translate_query(query_id, dialect)¶
Translate a query to a specific SQL dialect.
- Parameters:
query_id (int | str) – The ID of the query to translate
dialect (str) – The target SQL dialect
- Returns:
The translated query string
- Raises:
ValueError – If the query_id is invalid
ImportError – If sqlglot is not installed
ValueError – If the dialect is not supported
- Return type:
str
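Example (a small sketch, assuming the DuckDB dialect name used elsewhere on this page; requires sqlglot):
duckdb_sql = bench.translate_query("insert_single_row", "duckdb")
print(duckdb_sql)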
- verbose: bool = False¶
- verbose_enabled: bool = False¶
- verbose_level: int = 0¶
- property verbosity_settings: VerbositySettings¶
Return the current verbosity settings.
- very_verbose: bool = False¶
Constructor:
WritePrimitives(
scale_factor: float = 1.0,
output_dir: Optional[Union[str, Path]] = None,
**kwargs
)
Parameters:
scale_factor (float): TPC-H scale factor (must match TPC-H data)
output_dir (str | Path | None): Output directory (defaults to TPC-H path)
Lifecycle Methods¶
- setup(connection, force=False) dict¶
Create staging tables and populate from TPC-H base tables.
Parameters:
connection: Database connection
force (bool): If True, drop existing staging tables first
Returns: Dict with keys success, tables_created, tables_populated
Raises: RuntimeError if TPC-H tables don’t exist
Example:
setup_result = bench.setup(conn, force=True)
print(f"Tables created: {setup_result['tables_created']}")
print(f"Rows: {setup_result['tables_populated']}")
- is_setup(connection) bool¶
Check if staging tables are initialized and ready.
Example:
if bench.is_setup(conn):
    print("Ready to execute operations")
- reset(connection) None¶
Truncate and repopulate staging tables to initial state.
Example:
# Reset after validation failures
bench.reset(conn)
- teardown(connection) None¶
Drop all staging tables.
Example:
# Cleanup when done
bench.teardown(conn)
Operation Execution Methods¶
- execute_operation(operation_id, connection, use_transaction=True) OperationResult¶
Execute a single write operation with validation and cleanup.
Parameters:
operation_id (str): Operation ID (e.g., “insert_single_row”)
connection: Database connection
use_transaction (bool): Wrap in transaction for rollback cleanup
Returns: OperationResult with execution metrics
Raises: ValueError if operation not found
Example:
result = bench.execute_operation("insert_single_row", conn)
print(f"Operation: {result.operation_id}")
print(f"Success: {result.success}")
print(f"Rows affected: {result.rows_affected}")
print(f"Write time: {result.write_duration_ms:.2f}ms")
print(f"Validation: {result.validation_passed}")
- run_benchmark(connection, operation_ids=None, categories=None) list[OperationResult]¶
Run multiple operations.
Parameters:
connection: Database connection
operation_ids (list[str] | None): List of specific operation IDs
categories (list[str] | None): List of categories (e.g., [“insert”, “update”])
Returns: List of OperationResult objects
Example:
# Run all operations
results = bench.run_benchmark(conn)
# Run only INSERT operations
insert_results = bench.run_benchmark(conn, categories=["insert"])
# Run specific operations
specific_ops = ["insert_single_row", "update_single_row_pk"]
results = bench.run_benchmark(conn, operation_ids=specific_ops)
Operation Query Methods¶
- get_all_operations() dict[str, WriteOperation]¶
Get all available operations.
Example:
operations = bench.get_all_operations()
print(f"Total operations: {len(operations)}")
- get_operation(operation_id) WriteOperation¶
Get specific operation by ID.
Example:
op = bench.get_operation("insert_single_row")
print(f"Category: {op.category}")
- get_operations_by_category(category) dict[str, WriteOperation]¶
Get operations filtered by category.
Parameters:
category (str): Category name (“insert”, “update”, “delete”, “ddl”, “transaction”)
Example:
insert_ops = bench.get_operations_by_category("insert")
print(f"INSERT operations: {len(insert_ops)}")
- get_operation_categories() list[str]¶
Get list of available categories.
Example:
categories = bench.get_operation_categories()
# Output: ['insert', 'update', 'delete', 'ddl', 'transaction']
- get_queries(dialect=None) dict[str, str]¶
Get all write operation SQL.
Example:
queries = bench.get_queries()
print(queries["insert_single_row"])
- get_query(query_id, **kwargs) str¶
Get a specific write operation SQL.
Example:
sql = bench.get_query("insert_single_row")
Schema Methods¶
- get_schema(dialect='standard') dict[str, dict]¶
Get staging table schema definitions.
Example:
schema = bench.get_schema(dialect="duckdb")
for table, definition in schema.items():
    print(f"{table}: {definition['columns']}")
- get_create_tables_sql(dialect='standard') str¶
Get SQL to create staging tables.
Example:
create_sql = bench.get_create_tables_sql(dialect="postgresql")
OperationResult Class¶
Result object returned by execute_operation and run_benchmark.
Fields:
operation_id (str): Operation identifier
success (bool): Whether operation succeeded
write_duration_ms (float): Write execution time in milliseconds
rows_affected (int): Number of rows affected (-1 if unknown)
validation_duration_ms (float): Validation time in milliseconds
validation_passed (bool): Whether validation passed
validation_results (list[dict]): Detailed validation results
cleanup_duration_ms (float): Cleanup time in milliseconds
cleanup_success (bool): Whether cleanup succeeded
error (str | None): Error message if operation failed
cleanup_warning (str | None): Warning for transaction cleanup failures
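Example (a short sketch reading the fields listed above):
result = bench.execute_operation("insert_single_row", conn)
if result.success:
    print(f"{result.operation_id}: write {result.write_duration_ms:.2f}ms, "
          f"validation {result.validation_duration_ms:.2f}ms, "
          f"cleanup {result.cleanup_duration_ms:.2f}ms")
else:
    print(f"{result.operation_id} failed: {result.error}")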
Operations Catalog¶
The benchmark includes 12 starter operations across 5 categories:
INSERT Operations (3)¶
insert_single_row: Single row INSERT with all columns
insert_batch_values_10: Batch INSERT with 10 rows
insert_select_simple: INSERT…SELECT with WHERE filter
UPDATE Operations (3)¶
update_single_row_pk: UPDATE single row by primary key
update_selective_10pct: UPDATE ~10% of rows
update_with_subquery: UPDATE with correlated subquery
DELETE Operations (2)¶
delete_single_row_pk: DELETE single row by primary key
delete_selective_10pct: DELETE ~10% of rows
DDL Operations (3)¶
ddl_create_table_simple: CREATE TABLE with simple schema
ddl_truncate_table_small: TRUNCATE small staging table
ddl_create_table_as_select_simple: CTAS with simple SELECT
TRANSACTION Operations (1)¶
transaction_commit_small: BEGIN + 10 writes + COMMIT
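The catalog can also be enumerated programmatically; a small sketch using the query methods documented above:
for category in bench.get_operation_categories():
    ops = bench.get_operations_by_category(category)
    print(f"{category}: {len(ops)} operations -> {sorted(ops)}")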
Staging Tables¶
The benchmark creates these staging tables:
Data Tables:
orders_stage: Copy of ORDERS for UPDATE/DELETE testing
lineitem_stage: Copy of LINEITEM for write testing
orders_new: Source for MERGE testing
orders_summary: Target for aggregated INSERT…SELECT
lineitem_enriched: Target for joined INSERT…SELECT
Metadata Tables:
write_ops_log: Audit log for all write operations
batch_metadata: Tracks batch operations
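After setup(), the staging tables can be spot-checked directly; a small sketch for DuckDB (table names from the list above; adjust the SQL for other databases):
for table in ["orders_stage", "lineitem_stage", "orders_new"]:
    count = conn.execute(f"SELECT COUNT(*) FROM {table}").fetchone()[0]
    print(f"{table}: {count} rows")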
Usage Examples¶
Complete Workflow¶
from benchbox import TPCH, WritePrimitives
import duckdb
# 1. Load TPC-H data first
tpch = TPCH(scale_factor=1.0)
tpch.generate_data()
conn = duckdb.connect("benchmark.db")
adapter = tpch.create_adapter("duckdb")
adapter.load_data(tpch, conn, tpch.output_dir)
# 2. Setup Write Primitives
bench = WritePrimitives(scale_factor=1.0)
setup_result = bench.setup(conn, force=True)
print(f"Setup: {setup_result['success']}")
print(f"Tables: {setup_result['tables_created']}")
# 3. Execute single operation
result = bench.execute_operation("insert_single_row", conn)
print(f"Success: {result.success}")
print(f"Time: {result.write_duration_ms:.2f}ms")
print(f"Validation: {result.validation_passed}")
# 4. Run all operations
results = bench.run_benchmark(conn)
print(f"Total: {len(results)}")
successful = [r for r in results if r.success]
print(f"Successful: {len(successful)}")
# 5. Cleanup
bench.teardown(conn)
Category-Based Testing¶
# Test each category separately
categories = bench.get_operation_categories()
for category in categories:
    print(f"\nTesting {category.upper()} operations:")
    results = bench.run_benchmark(conn, categories=[category])
    for result in results:
        status = "✓" if result.success else "✗"
        print(f"  {status} {result.operation_id}: {result.write_duration_ms:.2f}ms")
    # Reset between categories
    bench.reset(conn)
Performance Analysis¶
from statistics import mean, median
# Run multiple iterations for stable timing
operation_id = "insert_single_row"
iterations = 10
times = []
for i in range(iterations):
    result = bench.execute_operation(operation_id, conn)
    if result.success:
        times.append(result.write_duration_ms)
    # Reset between iterations
    if i < iterations - 1:
        bench.reset(conn)
print(f"Operation: {operation_id}")
print(f"Iterations: {iterations}")
print(f"Mean: {mean(times):.2f}ms")
print(f"Median: {median(times):.2f}ms")
print(f"Min: {min(times):.2f}ms")
print(f"Max: {max(times):.2f}ms")
Error Handling and Recovery¶
# Handle operation failures
result = bench.execute_operation("insert_batch_values_10", conn)
if not result.success:
    print(f"Operation failed: {result.error}")
    if result.cleanup_warning:
        print(f"Cleanup warning: {result.cleanup_warning}")
    # Reset to recover
    print("Resetting staging tables...")
    bench.reset(conn)

# Handle validation failures
if not result.validation_passed:
    print("Validation failed:")
    for val_result in result.validation_results:
        if not val_result['passed']:
            print(f"  Query: {val_result['query_id']}")
            print(f"  Expected: {val_result['expected_rows']}")
            print(f"  Actual: {val_result['actual_rows']}")
    # Reset and retry
    bench.reset(conn)
    result = bench.execute_operation("insert_batch_values_10", conn)
Best Practices¶
Always Load TPC-H First
# Wrong: Will fail
bench = WritePrimitives()
bench.setup(conn)  # Error: TPC-H tables not found

# Correct: Load TPC-H first
tpch = TPCH(scale_factor=1.0)
tpch.generate_data()
adapter = tpch.create_adapter("duckdb")
adapter.load_data(tpch, conn, tpch.output_dir)
bench = WritePrimitives(scale_factor=1.0)
bench.setup(conn)
Check Setup Status
if not bench.is_setup(conn):
    bench.setup(conn)
Use Transactions for Cleanup
# Automatic rollback cleanup
result = bench.execute_operation("insert_single_row", conn, use_transaction=True)

# Manual cleanup (for DDL operations)
result = bench.execute_operation("ddl_create_table_simple", conn, use_transaction=False)
Reset Between Test Runs
# Reset to ensure clean state
for operation_id in ["insert_single_row", "update_single_row_pk"]:
    result = bench.execute_operation(operation_id, conn)
    bench.reset(conn)  # Clean slate for next operation
Validate and Handle Errors
result = bench.execute_operation("insert_select_simple", conn)
if result.success and result.validation_passed:
    print(f"Success: {result.write_duration_ms:.2f}ms")
elif not result.success:
    print(f"Failed: {result.error}")
elif not result.validation_passed:
    print("Validation failed, check data dependencies")
Common Issues¶
- Issue: “Required TPC-H table not found”
Cause: Write Primitives requires TPC-H base tables
Solution: Load TPC-H data before calling setup()
- Issue: “Staging tables not initialized”
Cause: Trying to execute before calling setup()
Solution: Call bench.setup(conn) first
- Issue: Transaction cleanup failures
Cause: Some operations can’t be rolled back (DDL)
Solution: Use bench.reset(conn) to restore clean state
- Issue: Validation failures
Cause: Data-dependent operations or constraint violations
Solution: Check validation_results for details, reset if needed
- Issue: rows_affected = -1
Cause: Some databases (DuckDB) don’t return rowcount
Impact: Normal behavior, doesn’t affect operation success
See Also¶
Write Primitives Benchmark - Write Primitives benchmark guide
primitives - Read primitives benchmark
TPC-H Benchmark API - TPC-H benchmark (data source)
Base Benchmark API - Base benchmark interface
Future Expansion¶
The catalog can be expanded to include all 101 planned operations:
More INSERT variants (12 total planned)
More UPDATE variants (15 total planned)
More DELETE variants (12 total planned)
BULK_LOAD operations (24 planned) - CSV, Parquet, compressions
MERGE/UPSERT operations (18 planned)
More DDL operations (12 total planned)
More TRANSACTION operations (8 total planned)
See benchbox/core/write_primitives/README.md for full plan.