TPC Patterns Usage Guide¶
This guide explains how to use the TPC patterns and utilities provided in benchbox.core.tpc_patterns for running TPC-H and TPC-DS benchmarks with proper stream execution, parameter management, and result aggregation.
Overview¶
The tpc_patterns module provides common test execution patterns and utilities that are shared between TPC-H and TPC-DS implementations. It follows TPC specification requirements for:
Stream independence and isolation: Each stream runs independently with its own parameters
Query parameter generation: Deterministic, seed-based parameter substitution
Concurrent execution management: Thread-safe concurrent query execution
Result aggregation and validation: Comprehensive result collection and validation
Core Components¶
1. StreamExecutor¶
Manages concurrent query stream execution with proper isolation.
from benchbox.core.tpc_patterns import StreamExecutor, create_tpc_stream_configs
# Create connection factory
def connection_factory():
    return create_database_connection()
# Create stream executor
executor = StreamExecutor(connection_factory, max_concurrent_streams=4)
# Create stream configurations
stream_configs = create_tpc_stream_configs(
    num_streams=2,
    query_ids=[1, 2, 3, 4, 5],
    base_seed=42,
    permutation_mode="tpc_standard"
)
# Execute streams
results = executor.execute_streams(stream_configs, query_executor)
2. QueryPermutator¶
Creates TPC-compliant query permutations for stream execution.
from benchbox.core.tpc_patterns import QueryPermutator, PermutationConfig
# Create permutation configuration
config = PermutationConfig(
    mode="tpc_standard",  # or "random", "sequential"
    seed=42,
    ensure_unique=True
)
# Create permutator
permutator = QueryPermutator(config)
# Generate permutation
query_ids = [1, 2, 3, 4, 5]
permuted = permutator.generate_permutation(query_ids)
3. ParameterManager¶
Handles query parameter substitution with deterministic seeding.
from benchbox.core.tpc_patterns import ParameterManager
# Create parameter manager
manager = ParameterManager(base_seed=42)
# Generate parameters
params = manager.generate_parameters(
    query_id=1,
    stream_id=0,
    scale_factor=1.0
)
4. TransactionManager¶
Manages database transactions and isolation levels.
from benchbox.core.tpc_patterns import TransactionManager
# Create transaction manager
transaction_manager = TransactionManager(connection)
# Use transaction context
with transaction_manager.transaction(isolation_level="READ_COMMITTED", timeout=30.0):
    # Execute queries within transaction
    cursor = connection.execute("SELECT * FROM table")
    results = connection.fetchall(cursor)
5. TestRunner¶
Provides common test execution patterns.
from benchbox.core.tpc_patterns import TestRunner
# Create test runner (query_executor is a callable such as one returned by create_basic_query_executor; see Usage Patterns)
runner = TestRunner(connection_factory)
# Run single query test
result = runner.run_single_query_test(query_id=1, query_executor=query_executor)
# Run sequential test
results = runner.run_sequential_test([1, 2, 3], query_executor=query_executor)
# Run concurrent test
results = runner.run_concurrent_test(stream_configs, query_executor=query_executor)
6. ErrorHandler¶
Standardized error handling with retry policies.
from benchbox.core.tpc_patterns import ErrorHandler, RetryPolicy
# Create error handler
handler = ErrorHandler(
    retry_policy=RetryPolicy.EXPONENTIAL_BACKOFF,
    max_retries=3
)
# Handle errors
context = {'query_id': 1, 'stream_id': 0, 'retry_count': 0}
should_retry = handler.handle_error(exception, context)
7. ProgressTracker¶
Tracks test progress and provides logging.
from benchbox.core.tpc_patterns import ProgressTracker
# Create progress tracker
tracker = ProgressTracker(total_items=100, description="Processing queries")
# Update progress
tracker.update(completed=1, item_description="Query 1 completed")
# Get summary
summary = tracker.get_summary()
8. ResultAggregator¶
Combines results from multiple streams/queries.
from benchbox.core.tpc_patterns import ResultAggregator
# Create aggregator
aggregator = ResultAggregator()
# Add results
aggregator.add_stream_result(stream_result)
aggregator.add_query_result(query_result)
# Get aggregated results
results = aggregator.get_aggregated_results()
Usage Patterns¶
Pattern 1: Simple Sequential Execution¶
from benchbox.core.tpc_patterns import TestRunner, create_basic_query_executor
from benchbox import TPCH
# Create benchmark
benchmark = TPCH(scale_factor=1.0)
# Create connection factory
def connection_factory():
    return create_database_connection()
# Create query executor
query_executor = create_basic_query_executor(benchmark)
# Create test runner
runner = TestRunner(connection_factory)
# Run sequential test
results = runner.run_sequential_test([1, 2, 3, 4, 5], query_executor)
Pattern 2: Concurrent Stream Execution¶
from benchbox.core.tpc_patterns import (
    StreamExecutor, create_tpc_stream_configs, create_basic_query_executor
)
from benchbox import TPCH
# Create benchmark
benchmark = TPCH(scale_factor=1.0)
# Create connection factory
def connection_factory():
    return create_database_connection()
# Create query executor
query_executor = create_basic_query_executor(benchmark)
# Create stream configurations
stream_configs = create_tpc_stream_configs(
    num_streams=4,
    query_ids=list(range(1, 23)),  # TPC-H queries 1-22
    base_seed=42,
    permutation_mode="tpc_standard"
)
# Execute streams
executor = StreamExecutor(connection_factory, max_concurrent_streams=4)
results = executor.execute_streams(stream_configs, query_executor)
Pattern 3: Custom Query Execution with Validation¶
from benchbox.core.tpc_patterns import (
    TestRunner, create_basic_query_executor, create_validation_function
)
# Reuses benchmark, query_executor, and runner as set up in Pattern 1
# Create validation function
expected_results = {
    1: {"row_count": 4},
    2: {"row_count": 460},
    3: {"row_count": 10}
}
validator = create_validation_function(expected_results)
# Run validation test
results = runner.run_validation_test(
    query_ids=[1, 2, 3],
    query_executor=query_executor,
    validator=validator
)
Pattern 4: Custom Parameter Generation¶
from benchbox.core.tpc_patterns import ParameterManager
class CustomParameterManager(ParameterManager):
    def _generate_query_parameters(self, query_id, rng, scale_factor):
        # Custom parameter generation logic
        params = super()._generate_query_parameters(query_id, rng, scale_factor)
        # Add custom parameters
        params.update({
            'custom_param1': rng.randint(1, 100),
            'custom_param2': rng.choice(['A', 'B', 'C']),
            'date_range': f"{rng.randint(1990, 2020)}-{rng.randint(1, 12):02d}-{rng.randint(1, 28):02d}"
        })
        return params
# Use custom parameter manager
manager = CustomParameterManager(base_seed=42)
params = manager.generate_parameters(query_id=1, stream_id=0)
Configuration Options¶
Stream Configuration¶
from benchbox.core.tpc_patterns import StreamConfig, PermutationConfig, RetryPolicy
stream_config = StreamConfig(
    stream_id=0,
    query_ids=[1, 2, 3, 4, 5],
    permutation_config=PermutationConfig(
        mode="tpc_standard",
        seed=42,
        ensure_unique=True
    ),
    parameter_seed=1042,
    isolation_level="READ_COMMITTED",
    timeout=300.0,  # 5 minutes
    retry_policy=RetryPolicy.EXPONENTIAL_BACKOFF,
    max_retries=3
)
Permutation Configuration¶
from benchbox.core.tpc_patterns import PermutationConfig
# Sequential permutation
config = PermutationConfig(mode="sequential")
# Random permutation with seed
config = PermutationConfig(mode="random", seed=42)
# TPC standard permutation
config = PermutationConfig(mode="tpc_standard", seed=42, ensure_unique=True)
Result Structure¶
Query Result¶
{
    'query_id': 1,
    'stream_id': 0,
    'execution_time': 0.125,
    'status': 'completed',
    'error': None,
    'row_count': 4,
    'retry_count': 0,
    'parameters': {...}
}
Aggregated Results¶
{
    'total_streams': 2,
    'total_queries': 10,
    'successful_queries': 9,
    'failed_queries': 1,
    'success_rate': 0.9,
    'total_execution_time': 2.5,
    'average_execution_time': 0.25,
    'min_execution_time': 0.1,
    'max_execution_time': 0.5,
    'stream_statistics': {
        0: {
            'total_queries': 5,
            'successful_queries': 5,
            'failed_queries': 0,
            'total_execution_time': 1.2,
            'status': 'completed'
        },
        1: {
            'total_queries': 5,
            'successful_queries': 4,
            'failed_queries': 1,
            'total_execution_time': 1.3,
            'status': 'completed'
        }
    },
    'query_results': [...]
}
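As a quick illustration of how these fields can be consumed, the following minimal reporting sketch uses only the keys shown in the structure above:
results = aggregator.get_aggregated_results()
print(f"Success rate: {results['success_rate']:.0%}")
for stream_id, stats in results['stream_statistics'].items():
    print(f"Stream {stream_id}: {stats['successful_queries']}/{stats['total_queries']} "
          f"queries in {stats['total_execution_time']:.2f}s")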
Error Handling¶
The module provides systematic error handling with configurable retry policies (a retry-loop sketch follows the lists below):
NONE: No retries
FIXED_DELAY: Fixed delay between retries
LINEAR_BACKOFF: Increasing delay (1s, 2s, 3s, …)
EXPONENTIAL_BACKOFF: Exponential delay (1s, 2s, 4s, 8s, …)
Retryable errors are detected automatically based on common patterns such as:
Connection timeouts
Network errors
Database locks
Temporary failures
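Putting these pieces together, a retry loop around a single query might look like the sketch below. It assumes handle_error applies the configured backoff delay and returns whether to retry; execute_query is a hypothetical stand-in for whatever callable actually runs the query.
from benchbox.core.tpc_patterns import ErrorHandler, RetryPolicy

def run_with_retries(execute_query, query_id, stream_id):
    # Sketch only: execute_query is a hypothetical callable that runs one query
    handler = ErrorHandler(retry_policy=RetryPolicy.EXPONENTIAL_BACKOFF, max_retries=3)
    context = {'query_id': query_id, 'stream_id': stream_id, 'retry_count': 0}
    while True:
        try:
            return execute_query(query_id)
        except Exception as exc:
            # handle_error is assumed to apply the backoff delay and report whether to retry
            if not handler.handle_error(exc, context):
                raise
            context['retry_count'] += 1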
Performance Considerations¶
Connection Pooling: Use connection pooling to avoid per-query connection overhead (see the sketch after this list)
Concurrent Streams: Limit concurrent streams based on database capacity
Memory Management: Monitor memory usage for large result sets
Timeout Configuration: Set appropriate timeouts for long-running queries
Retry Strategy: Choose appropriate retry policies for your environment
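As one way to apply the pooling advice, the connection factory can hand out connections from a small pre-created pool. The ConnectionPool class below is purely illustrative and not part of benchbox; create_database_connection is the same placeholder used in the earlier examples, and when connections are returned to the pool depends on how the executor manages connection lifetimes.
import queue

class ConnectionPool:
    # Illustrative fixed-size pool; not provided by benchbox.core.tpc_patterns
    def __init__(self, create_connection, size=4):
        self._pool = queue.Queue()
        for _ in range(size):
            self._pool.put(create_connection())

    def acquire(self, timeout=30.0):
        return self._pool.get(timeout=timeout)

    def release(self, connection):
        self._pool.put(connection)

pool = ConnectionPool(create_database_connection, size=4)

def connection_factory():
    # Same interface as the factories above, but reuses pooled connections
    return pool.acquire()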
Best Practices¶
Use Deterministic Seeds: Always use seeds for reproducible results (illustrated after this list)
Proper Transaction Management: Use appropriate isolation levels
Monitor Progress: Use progress tracking for long-running tests
Validate Results: Implement proper result validation
Error Logging: Enable systematic logging for debugging
Resource Cleanup: Ensure connections are properly closed
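To illustrate the deterministic-seed practice with the API shown earlier: two ParameterManager instances built from the same base_seed should produce identical parameters for the same query and stream, which is what makes runs reproducible.
from benchbox.core.tpc_patterns import ParameterManager

# Same base_seed and same (query_id, stream_id) -> identical parameters
manager_a = ParameterManager(base_seed=42)
manager_b = ParameterManager(base_seed=42)
params_a = manager_a.generate_parameters(query_id=1, stream_id=0, scale_factor=1.0)
params_b = manager_b.generate_parameters(query_id=1, stream_id=0, scale_factor=1.0)
assert params_a == params_b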
Integration with Existing Benchmarks¶
The TPC patterns can be easily integrated with existing TPC-H and TPC-DS benchmarks:
# With TPC-H
from benchbox import TPCH
from benchbox.core.tpc_patterns import create_basic_query_executor
benchmark = TPCH(scale_factor=1.0)
query_executor = create_basic_query_executor(benchmark)
# With TPC-DS
from benchbox.tpcds import TPCDSBenchmark
from benchbox.core.tpc_patterns import create_basic_query_executor
benchmark = TPCDSBenchmark(scale_factor=1.0)
query_executor = create_basic_query_executor(benchmark)
This provides a consistent interface for both benchmarks while maintaining their specific implementations.