Results API¶
Complete reference for working with benchmark results, including the BenchmarkResults object, query results, and result analysis.
Overview¶
The results API provides structured access to benchmark execution data:
BenchmarkResults: Top-level container for all execution data
QueryResult: Individual query execution details
ExecutionPhases: Breakdown of setup, execution, and validation phases
Validation: Result correctness verification
All result objects support JSON serialization for storage and analysis.
Quick Start¶
Basic result access:
from benchbox.tpch import TPCH
from benchbox.platforms.duckdb import DuckDBAdapter
from benchbox.core.results.models import BenchmarkResults
# Run benchmark
benchmark = TPCH(scale_factor=0.1)
adapter = DuckDBAdapter()
results = benchmark.run_with_platform(adapter)
# Access result data
print(f"Benchmark: {results.benchmark_name}")
print(f"Platform: {results.platform}")
print(f"Total time: {results.total_execution_time:.2f}s")
print(f"Success rate: {results.successful_queries}/{results.total_queries}")
# Iterate query results
for qr in results.query_results:
print(f"{qr.query_id}: {qr.execution_time:.3f}s ({qr.status})")
# Save to file
results.to_json_file("results.json")
Core Classes¶
BenchmarkResults¶
- class BenchmarkResults(benchmark_name: 'str', platform: 'str', scale_factor: 'float', execution_id: 'str', timestamp: 'datetime', duration_seconds: 'float', total_queries: 'int', successful_queries: 'int', failed_queries: 'int', query_results: 'list[dict[str, Any]]'=<factory>, total_execution_time: 'float' = 0.0, average_query_time: 'float' = 0.0, data_loading_time: 'float' = 0.0, schema_creation_time: 'float' = 0.0, total_rows_loaded: 'int' = 0, data_size_mb: 'float' = 0.0, table_statistics: 'dict[str, int]'=<factory>, per_query_timings: 'list[dict[str, Any]] | None'=<factory>, execution_phases: 'ExecutionPhases | None' = None, query_definitions: 'dict[str, dict[str, QueryDefinition]] | None'=None, test_execution_type: 'str' = 'standard', power_at_size: 'float | None' = None, throughput_at_size: 'float | None' = None, qphh_at_size: 'float | None' = None, geometric_mean_execution_time: 'float | None' = None, validation_status: 'str' = 'PASSED', validation_details: 'dict[str, Any] | None'=None, platform_info: 'dict[str, Any] | None'=None, platform_metadata: 'dict[str, Any] | None'=None, tunings_applied: 'dict[str, Any] | None'=None, tuning_config_hash: 'str | None' = None, tuning_source_file: 'str | None' = None, tuning_validation_status: 'str' = 'NOT_VALIDATED', tuning_metadata_saved: 'bool' = False, system_profile: 'dict[str, Any] | None'=None, database_name: 'str | None' = None, anonymous_machine_id: 'str | None' = None, execution_metadata: 'dict[str, Any] | None'=None, performance_characteristics: 'dict[str, Any]'=<factory>, performance_summary: 'dict[str, Any]'=<factory>, cost_summary: 'dict[str, Any] | None'=None, driver_package: 'str | None' = None, driver_version_requested: 'str | None' = None, driver_version_resolved: 'str | None' = None, driver_auto_install: 'bool' = False, output_filename: 'str | None' = None, resource_utilization: 'dict[str, Any] | None'=None, _benchmark_id_override: 'str | None' = None, summary_metrics: 'dict[str, Any]'=<factory>, query_subset: 'list[str] | None' = None, concurrency_level: 'int | None' = None, benchmark_version: 'str | None' = None, query_plans_captured: 'int' = 0, plan_capture_failures: 'int' = 0, plan_capture_errors: 'list[dict[str, str]]'=<factory>, plan_comparison_summary: 'dict[str, Any] | None'=None, total_plan_capture_time_ms: 'float' = 0.0, avg_plan_capture_overhead_pct: 'float' = 0.0, max_plan_capture_time_ms: 'float' = 0.0)[source]¶
Bases: object
- benchmark_name: str¶
- platform: str¶
- scale_factor: float¶
- execution_id: str¶
- timestamp: datetime¶
- duration_seconds: float¶
- total_queries: int¶
- successful_queries: int¶
- failed_queries: int¶
- query_results: list[dict[str, Any]]¶
- total_execution_time: float = 0.0¶
- average_query_time: float = 0.0¶
- data_loading_time: float = 0.0¶
- schema_creation_time: float = 0.0¶
- total_rows_loaded: int = 0¶
- data_size_mb: float = 0.0¶
- table_statistics: dict[str, int]¶
- per_query_timings: list[dict[str, Any]] | None¶
- execution_phases: ExecutionPhases | None = None¶
- query_definitions: dict[str, dict[str, QueryDefinition]] | None = None¶
- test_execution_type: str = 'standard'¶
- power_at_size: float | None = None¶
- throughput_at_size: float | None = None¶
- qphh_at_size: float | None = None¶
- geometric_mean_execution_time: float | None = None¶
- validation_status: str = 'PASSED'¶
- validation_details: dict[str, Any] | None = None¶
- platform_info: dict[str, Any] | None = None¶
- platform_metadata: dict[str, Any] | None = None¶
- tunings_applied: dict[str, Any] | None = None¶
- tuning_config_hash: str | None = None¶
- tuning_source_file: str | None = None¶
- tuning_validation_status: str = 'NOT_VALIDATED'¶
- tuning_metadata_saved: bool = False¶
- system_profile: dict[str, Any] | None = None¶
- database_name: str | None = None¶
- anonymous_machine_id: str | None = None¶
- execution_metadata: dict[str, Any] | None = None¶
- performance_characteristics: dict[str, Any]¶
- performance_summary: dict[str, Any]¶
- cost_summary: dict[str, Any] | None = None¶
- driver_package: str | None = None¶
- driver_version_requested: str | None = None¶
- driver_version_resolved: str | None = None¶
- driver_auto_install: bool = False¶
- output_filename: str | None = None¶
- resource_utilization: dict[str, Any] | None = None¶
- summary_metrics: dict[str, Any]¶
- query_subset: list[str] | None = None¶
- concurrency_level: int | None = None¶
- benchmark_version: str | None = None¶
- query_plans_captured: int = 0¶
- plan_capture_failures: int = 0¶
- plan_capture_errors: list[dict[str, str]]¶
- plan_comparison_summary: dict[str, Any] | None = None¶
- total_plan_capture_time_ms: float = 0.0¶
- avg_plan_capture_overhead_pct: float = 0.0¶
- max_plan_capture_time_ms: float = 0.0¶
- property benchmark_id: str¶
Return benchmark identifier derived from benchmark name.
- __init__(benchmark_name, platform, scale_factor, execution_id, timestamp, duration_seconds, total_queries, successful_queries, failed_queries, query_results=<factory>, total_execution_time=0.0, average_query_time=0.0, data_loading_time=0.0, schema_creation_time=0.0, total_rows_loaded=0, data_size_mb=0.0, table_statistics=<factory>, per_query_timings=<factory>, execution_phases=None, query_definitions=None, test_execution_type='standard', power_at_size=None, throughput_at_size=None, qphh_at_size=None, geometric_mean_execution_time=None, validation_status='PASSED', validation_details=None, platform_info=None, platform_metadata=None, tunings_applied=None, tuning_config_hash=None, tuning_source_file=None, tuning_validation_status='NOT_VALIDATED', tuning_metadata_saved=False, system_profile=None, database_name=None, anonymous_machine_id=None, execution_metadata=None, performance_characteristics=<factory>, performance_summary=<factory>, cost_summary=None, driver_package=None, driver_version_requested=None, driver_version_resolved=None, driver_auto_install=False, output_filename=None, resource_utilization=None, _benchmark_id_override=None, summary_metrics=<factory>, query_subset=None, concurrency_level=None, benchmark_version=None, query_plans_captured=0, plan_capture_failures=0, plan_capture_errors=<factory>, plan_comparison_summary=None, total_plan_capture_time_ms=0.0, avg_plan_capture_overhead_pct=0.0, max_plan_capture_time_ms=0.0)¶
Key Attributes:
class BenchmarkResults:
    # Identification
    benchmark_name: str           # e.g., "TPC-H"
    platform: str                 # e.g., "DuckDB"
    execution_id: str             # Unique run identifier
    timestamp: datetime           # ISO 8601 timestamp

    # Configuration
    scale_factor: float           # Data size multiplier
    test_execution_type: str      # "standard", "power", "throughput"

    # Timing Summary
    duration_seconds: float       # Total execution time
    total_execution_time: float   # Query execution time only
    average_query_time: float     # Mean query time
    data_loading_time: float      # Time to load data
    schema_creation_time: float   # Time to create schema

    # Query Statistics
    total_queries: int            # Queries attempted
    successful_queries: int       # Queries succeeded
    failed_queries: int           # Queries failed

    # Detailed Results
    query_results: List[QueryResult]
    query_definitions: Dict[str, QueryDefinition]
    execution_phases: ExecutionPhases

    # Validation
    validation_status: str        # "PASSED", "FAILED", "SKIPPED"
    validation_details: dict

    # System Context
    system_profile: dict          # Hardware/software info
    platform_info: dict           # Platform-specific metadata
Note
Individual query results are embedded within the BenchmarkResults object.
See the query_results attribute above for query-level details.
Structure:
class QueryResult:
    query_id: str                  # e.g., "q1", "query15"
    stream_id: str                 # "stream_1", "stream_2", etc.
    execution_time: float          # Seconds
    status: str                    # "SUCCESS", "FAILED", "SKIPPED"
    row_count: int                 # Rows returned
    data_scanned_bytes: int        # Bytes scanned (if available)
    error_message: Optional[str]   # Error details if failed
    query_text: str                # Executed SQL
    parameters: Optional[dict]     # Query parameters
    start_time: datetime           # Query start
    end_time: datetime             # Query completion
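A minimal sketch of per-query access, assuming each entry in query_results exposes the fields listed above (including error_message for failed queries):
# Inspect failed queries and their error details
failed = [qr for qr in results.query_results if qr.status == "FAILED"]
for qr in failed:
    print(f"{qr.query_id} failed after {qr.execution_time:.3f}s: {qr.error_message}")

# Total rows returned by successful queries
total_rows = sum(qr.row_count for qr in results.query_results if qr.status == "SUCCESS")
print(f"Rows returned across successful queries: {total_rows}")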
ExecutionPhases¶
- class ExecutionPhases(setup: 'SetupPhase', power_test: 'PowerTestPhase | None' = None, throughput_test: 'ThroughputTestPhase | None' = None, maintenance_test: 'MaintenanceTestPhase | None' = None)[source]¶
Bases: object
- setup: SetupPhase¶
- power_test: PowerTestPhase | None = None¶
- throughput_test: ThroughputTestPhase | None = None¶
- maintenance_test: MaintenanceTestPhase | None = None¶
- __init__(setup, power_test=None, throughput_test=None, maintenance_test=None)¶
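A quick sketch of checking which phases were recorded for a run; this assumes results.execution_phases is populated, which may not be the case for every execution:
phases = results.execution_phases
if phases is not None:
    print("Setup phase recorded")
    # Optional phases default to None when the corresponding test did not run
    for name in ("power_test", "throughput_test", "maintenance_test"):
        phase = getattr(phases, name)
        print(f"{name}: {'recorded' if phase is not None else 'not run'}")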
Working with Results¶
Loading Results¶
From JSON file:
from benchbox.core.results.models import BenchmarkResults
# Load from file
results = BenchmarkResults.from_json_file("results.json")
# Load from JSON string
import json
with open("results.json") as f:
    json_data = json.load(f)
results = BenchmarkResults.from_dict(json_data)
Saving Results¶
To JSON file:
# Save to file
results.to_json_file("results.json")
# Get as JSON string
json_str = results.to_json()
# Get as dictionary
result_dict = results.to_dict()
# Pretty-print JSON
import json
print(json.dumps(result_dict, indent=2))
Analyzing Results¶
Query Performance Analysis¶
# Find slowest queries
sorted_results = sorted(
    results.query_results,
    key=lambda qr: qr.execution_time,
    reverse=True
)
print("Top 5 slowest queries:")
for qr in sorted_results[:5]:
print(f"{qr.query_id}: {qr.execution_time:.3f}s")
# Calculate percentiles
import statistics
times = [qr.execution_time for qr in results.query_results
if qr.status == "SUCCESS"]
print(f"Median: {statistics.median(times):.3f}s")
print(f"Mean: {statistics.mean(times):.3f}s")
print(f"Stdev: {statistics.stdev(times):.3f}s")
Geometric Mean Calculation¶
Standard for TPC benchmarks:
import math
def geometric_mean(values):
"""Calculate geometric mean of execution times."""
if not values:
return 0.0
product = math.prod(values)
return product ** (1.0 / len(values))
# Calculate for successful queries only
times = [qr.execution_time for qr in results.query_results
if qr.status == "SUCCESS"]
geomean = geometric_mean(times)
print(f"Geometric mean: {geomean:.3f}s")
Result Comparison¶
Compare two benchmark runs:
def compare_results(baseline, current, threshold=1.1):
"""Compare two result sets and identify regressions."""
baseline_times = {
qr.query_id: qr.execution_time
for qr in baseline.query_results
if qr.status == "SUCCESS"
}
current_times = {
qr.query_id: qr.execution_time
for qr in current.query_results
if qr.status == "SUCCESS"
}
regressions = []
improvements = []
for qid in baseline_times:
if qid in current_times:
ratio = current_times[qid] / baseline_times[qid]
if ratio > threshold:
regressions.append({
"query_id": qid,
"baseline": baseline_times[qid],
"current": current_times[qid],
"ratio": ratio
})
elif ratio < (1 / threshold):
improvements.append({
"query_id": qid,
"baseline": baseline_times[qid],
"current": current_times[qid],
"ratio": ratio
})
return {"regressions": regressions, "improvements": improvements}
# Usage
baseline = BenchmarkResults.from_json_file("baseline.json")
current = BenchmarkResults.from_json_file("current.json")
comparison = compare_results(baseline, current, threshold=1.1)
if comparison["regressions"]:
print(f"Found {len(comparison['regressions'])} regressions:")
for r in comparison["regressions"]:
print(f" {r['query_id']}: {r['ratio']:.2f}x slower")
Export to DataFrame¶
For analysis in pandas:
import pandas as pd
# Convert to DataFrame
df = pd.DataFrame([
    {
        "query_id": qr.query_id,
        "execution_time": qr.execution_time,
        "status": qr.status,
        "row_count": qr.row_count,
        "stream_id": qr.stream_id,
    }
    for qr in results.query_results
])
# Analyze
print(df.describe())
print(df.groupby("status").size())
# Filter and visualize
successful = df[df["status"] == "SUCCESS"]
successful.plot(x="query_id", y="execution_time", kind="bar")
Validation Results¶
Check Result Correctness¶
# Check validation status
if results.validation_status == "PASSED":
print("All validation checks passed")
elif results.validation_status == "FAILED":
print("Validation failed:")
for check, details in results.validation_details.items():
if details.get("status") == "FAILED":
print(f" {check}: {details}")
# Access specific validation details
if "row_count" in results.validation_details:
row_count_check = results.validation_details["row_count"]
print(f"Row count validation: {row_count_check['status']}")
Row Count Validation¶
def validate_row_counts(results, expected_counts):
"""Validate query result row counts."""
mismatches = []
for qr in results.query_results:
if qr.query_id in expected_counts:
expected = expected_counts[qr.query_id]
if qr.row_count != expected:
mismatches.append({
"query_id": qr.query_id,
"expected": expected,
"actual": qr.row_count
})
return mismatches
# TPC-H expected row counts (standard parameters)
expected_counts = {
    "q1": 4,  # 4 rows
    "q6": 1,  # 1 row (aggregation)
}
mismatches = validate_row_counts(results, expected_counts)
if mismatches:
print("Row count mismatches found:")
for m in mismatches:
print(f" {m['query_id']}: expected {m['expected']}, got {m['actual']}")
System Information¶
Access Execution Context¶
# System profile
print(f"OS: {results.system_profile['os']}")
print(f"CPU: {results.system_profile['cpu_model']}")
print(f"RAM: {results.system_profile['ram_gb']}GB")
print(f"Python: {results.system_profile['python_version']}")
print(f"BenchBox: {results.system_profile['benchbox_version']}")
# Platform information
print(f"Platform: {results.platform_info['platform_name']}")
print(f"Driver: {results.platform_info['driver_name']}")
print(f"Version: {results.platform_info['driver_version']}")
Execution Metadata¶
# Execution phases
phases = results.execution_phases
if phases.setup:
    setup = phases.setup
    print(f"Data generation: {setup.data_generation.duration_ms}ms")
    print(f"Schema creation: {setup.schema_creation.duration_ms}ms")
    print(f"Data loading: {setup.data_loading.duration_ms}ms")
    print(f"Validation: {setup.validation.duration_ms}ms")

# Power test (if applicable)
if hasattr(phases, 'power_test') and phases.power_test:
    power = phases.power_test
    print(f"Power test geometric mean: {power.geometric_mean:.3f}s")
Performance Tracking¶
Time Series Analysis¶
Track performance over time:
from pathlib import Path
import pandas as pd
from benchbox.core.results.models import BenchmarkResults
def load_historical_results(results_dir):
"""Load all results from directory into DataFrame."""
results_files = Path(results_dir).glob("**/*.json")
data = []
for file_path in results_files:
results = BenchmarkResults.from_json_file(str(file_path))
for qr in results.query_results:
data.append({
"timestamp": results.timestamp,
"query_id": qr.query_id,
"execution_time": qr.execution_time,
"status": qr.status,
"platform": results.platform,
"scale_factor": results.scale_factor
})
return pd.DataFrame(data)
# Usage
df = load_historical_results("benchmark_runs/")
# Calculate trends
trends = df.groupby(["query_id", "timestamp"])["execution_time"].mean()
print(trends)
Regression Detection¶
Automated regression alerting:
def detect_regressions(current_results, baseline_results, threshold=1.15):
"""Detect performance regressions vs baseline."""
alerts = []
baseline_map = {
qr.query_id: qr.execution_time
for qr in baseline_results.query_results
if qr.status == "SUCCESS"
}
for qr in current_results.query_results:
if qr.status != "SUCCESS":
continue
if qr.query_id in baseline_map:
baseline_time = baseline_map[qr.query_id]
ratio = qr.execution_time / baseline_time
if ratio > threshold:
alerts.append({
"severity": "HIGH" if ratio > 1.5 else "MEDIUM",
"query_id": qr.query_id,
"baseline": baseline_time,
"current": qr.execution_time,
"regression": f"{(ratio - 1) * 100:.1f}%"
})
return sorted(alerts, key=lambda x: x["current"], reverse=True)
# Usage
alerts = detect_regressions(current, baseline, threshold=1.15)
if alerts:
print(f"⚠️ {len(alerts)} performance regressions detected:")
for alert in alerts:
print(f" [{alert['severity']}] {alert['query_id']}: "
f"+{alert['regression']} regression")
Best Practices¶
Result Storage¶
Use descriptive file names:
import datetime

timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
filename = f"tpch_sf1_duckdb_{timestamp}.json"
results.to_json_file(f"benchmark_runs/{filename}")
Version control results for important baselines:
git add benchmark_runs/baseline/tpch_sf1_duckdb.json
git commit -m "Add TPC-H SF1 baseline"
Compress old results:
gzip benchmark_runs/archive/*.json
Result Analysis¶
Always check validation status before trusting results
Use geometric mean for TPC benchmarks (not arithmetic mean)
Compare like with like (same scale factor, system profile)
Account for variance with multiple runs (see the sketch below)
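A minimal sketch of accounting for run-to-run variance; it assumes repeated runs of the same configuration are stored under a directory such as benchmark_runs/sf1/ (a placeholder path):
import statistics
from pathlib import Path

from benchbox.core.results.models import BenchmarkResults

# Aggregate per-query timings across repeated runs of the same configuration
runs = [BenchmarkResults.from_json_file(str(p)) for p in Path("benchmark_runs/sf1/").glob("*.json")]
per_query = {}
for run in runs:
    for qr in run.query_results:
        if qr.status == "SUCCESS":
            per_query.setdefault(qr.query_id, []).append(qr.execution_time)

for qid, times in sorted(per_query.items()):
    spread = statistics.stdev(times) if len(times) > 1 else 0.0
    print(f"{qid}: median {statistics.median(times):.3f}s (stdev {spread:.3f}s over {len(times)} runs)")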
Performance Monitoring¶
Establish baseline with known-good configuration
Track over time to detect gradual degradation
Set alert thresholds (e.g., 15% regression)
Investigate outliers before dismissing them as noise (see the sketch below)
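One way to flag outliers before discarding them (a sketch: the 3-sigma cutoff is an arbitrary choice, and load_historical_results is the helper defined in the Time Series Analysis section above):
import pandas as pd

# Historical per-query statistics (see load_historical_results above)
df = load_historical_results("benchmark_runs/")
ok = df[df["status"] == "SUCCESS"]
stats = ok.groupby("query_id")["execution_time"].agg(["mean", "std"])

# Compare the most recent timing of each query against its history
latest = ok.sort_values("timestamp").groupby("query_id").tail(1).set_index("query_id")
for qid, row in latest.iterrows():
    mean, std = stats.loc[qid, "mean"], stats.loc[qid, "std"]
    if pd.notna(std) and std > 0 and abs(row["execution_time"] - mean) > 3 * std:
        print(f"Outlier: {qid} ran in {row['execution_time']:.3f}s vs historical mean {mean:.3f}s")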
See Also¶
Conceptual Documentation¶
Data Model - Complete data model reference
BenchBox Architecture - How results fit into BenchBox
Benchmarking Workflows - Result collection in workflows
API Reference¶
Base Benchmark API - Base benchmark interface
Python API Reference - Python API overview
API Reference - High-level API guide
Guides¶
Performance Monitoring - Performance monitoring
TPC Test Result Validation System - TPC compliance validation
Examples - Result analysis examples
External Resources¶
Benchmark Result Schema (v1.1) - JSON schema specification