"""
Copyright 2026 Joe Harris / BenchBox Project
Licensed under the MIT License. See LICENSE file in the project root for details.
Result Display Utilities
This module provides reusable result formatting and display functions
for benchmark execution results and system information.
"""

from __future__ import annotations

from typing import Any

from benchbox.utils import format_duration


def display_results(result_data: dict[str, Any], verbosity: int = 0) -> None:
"""Display benchmark results in a standardized format.
Args:
result_data: Dictionary containing benchmark results
verbosity: Verbosity level (0=minimal, 1=detailed, 2=verbose)
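
    Example:
        An illustrative call (values are made up; the keys mirror the ones this
        function reads, and optional timing keys are only printed when present):

            display_results(
                {"benchmark": "tpch", "scale_factor": 1, "platform": "duckdb",
                 "success": True, "total_duration": 42.5},
                verbosity=1,
            )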
"""
benchmark_name = result_data.get("benchmark", "unknown").upper()
print(f"Benchmark: {benchmark_name}")
print(f"Scale Factor: {result_data.get('scale_factor', 'unknown')}")
print(f"Platform: {result_data.get('platform', 'unknown')}")
success = result_data.get("success", False)
print(f"\nBenchmark Status: {'PASSED' if success else 'FAILED'}")
if verbosity > 0 and "total_duration" in result_data:
print(f"Total Duration: {format_duration(result_data['total_duration'])}")
if "schema_creation_time" in result_data:
print(f"Schema Creation: {format_duration(result_data['schema_creation_time'])}")
if "data_loading_time" in result_data:
print(f"Data Loading: {format_duration(result_data['data_loading_time'])}")
if "successful_queries" in result_data:
print(f"Queries: {result_data['successful_queries']}/{result_data['total_queries']} successful")
if success and "total_execution_time" in result_data:
print(f"Query Execution Time: {format_duration(result_data['total_execution_time'])}")
if "average_query_time" in result_data:
print(f"Average Query Time: {format_duration(result_data['average_query_time'])}")
if success:
print(f"\n✅ {benchmark_name} benchmark completed!")
else:
print(f"\n❌ {benchmark_name} benchmark failed.")


def display_platform_list(platforms: dict[str, bool], get_requirements_func=None) -> None:
"""Display list of available platforms.
Args:
platforms: Dictionary mapping platform names to availability status
get_requirements_func: Optional function to get platform requirements
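
    Example:
        An illustrative call; the callback is any function that maps a platform
        name to a short requirements string:

            display_platform_list(
                {"duckdb": True, "bigquery": False},
                get_requirements_func=lambda name: f"install the {name} extras",
            )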
"""
print("Available platforms:")
for platform, available in platforms.items():
status = "✅ Available" if available else "❌ Not available"
if not available and get_requirements_func:
requirements = get_requirements_func(platform)
status += f" ({requirements})"
print(f" {platform}: {status}")


def display_benchmark_list(benchmark_classes: dict[str, Any]) -> None:
"""Display list of available benchmarks.
Args:
benchmark_classes: Dictionary mapping benchmark names to classes
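
    Example:
        An illustrative call; TPCHBenchmark and TPCDSBenchmark stand in for real
        benchmark classes, whose first docstring line is shown as the description:

            display_benchmark_list({"tpch": TPCHBenchmark, "tpcds": TPCDSBenchmark})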
"""
print("Available benchmarks:")
for benchmark_name in sorted(benchmark_classes.keys()):
benchmark_class = benchmark_classes[benchmark_name]
        doc = benchmark_class.__doc__ or ""
        description = doc.strip().split("\n")[0] or "No description available"
print(f" {benchmark_name}: {description}")


def display_configuration_summary(config: dict[str, Any], verbosity: int = 0) -> None:
"""Display configuration summary.
Args:
config: Unified configuration dictionary
verbosity: Verbosity level
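
    Example:
        An illustrative unified config; the keys mirror the ones read below, and
        nothing is printed when verbosity is 0:

            display_configuration_summary(
                {"benchmark": "tpch", "scale_factor": 10, "platform": "duckdb",
                 "phases": "all", "tuning_mode": "default"},
                verbosity=1,
            )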
"""
if verbosity == 0:
return
print(f"Benchmark: {config['benchmark'].upper()} @ SF{config['scale_factor']} on {config['platform'].title()}")
print(f"Phases: {config.get('phases', 'unknown')}")
if verbosity > 0:
tuning_mode = config.get("tuning_mode", "unknown")
print(f"Tuning mode: {tuning_mode}")
if config.get("tuning_config"):
tuning_type = config["tuning_config"].get("_metadata", {}).get("configuration_type", "unknown")
print(f"Tuning type: {tuning_type}")


def display_verbose_config_feedback(config: dict[str, Any], platform: str) -> None:
"""Display verbose configuration feedback after adapter creation.
Args:
config: Platform adapter configuration
platform: Platform name
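
    Example:
        Illustrative calls (values are made up); the keys match what each
        platform branch reads:

            display_verbose_config_feedback({"database_path": "bench.duckdb"}, "duckdb")
            display_verbose_config_feedback({"catalog": "main", "schema": "tpch_sf1"}, "databricks")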
"""
if platform in ["duckdb", "sqlite"]:
db_path = config.get("database_path", "unknown")
print(f"Database file: {db_path}")
elif platform == "databricks":
schema_name = config.get("schema", "unknown")
catalog_name = config.get("catalog", "unknown")
print(f"Database schema: {catalog_name}.{schema_name}")
elif platform == "bigquery":
dataset_id = config.get("dataset_id", "unknown")
project_id = config.get("project_id", "unknown")
print(f"BigQuery dataset: {project_id}.{dataset_id}")
elif platform in ["redshift", "snowflake"]:
database_name = config.get("database", "unknown")
print(f"Database name: {database_name}")
elif platform == "clickhouse":
data_path = config.get("data_path", "unknown")
print(f"Data path: {data_path}")


def print_phase_header(phase: str) -> None:
"""Print a formatted phase header.
Args:
phase: Phase name to display
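
    Example:
        print_phase_header("load")  # prints a blank line, then "--- Running Phase: Load ---"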
"""
print(f"\n--- Running Phase: {phase.title()} ---")


def print_completion_message(phase: str, output_location: str | None = None) -> None:
"""Print a completion message for a phase.
Args:
phase: Phase that completed
output_location: Optional output location to display
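
    Example:
        Both forms, with an illustrative output path:

            print_completion_message("load")
            print_completion_message("generate", output_location="datasets/tpch_sf1")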
"""
if output_location:
print(f"✅ {phase.title()} phase complete in {output_location}")
else:
print(f"✅ {phase.title()} phase complete.")


def print_dry_run_summary(result, output_dir, saved_files=None) -> None:
"""Print a human-friendly summary of dry run artifacts and insights."""
benchmark_info = result.benchmark_config or {}
preview = result.query_preview or {}
platform_info = result.platform_config or {}
resources = result.estimated_resources or {}
    query_count = preview.get("query_count") or len(result.queries or []) or 0
print("✅ Dry run completed successfully!")
print(f"Artifacts directory: {output_dir}")
    # Format a metric as "<value> <suffix>", rounding numerics to two decimals;
    # returns None when the value is missing.
    def _format_metric(value, suffix):
try:
return f"{float(value):.2f} {suffix}"
except (TypeError, ValueError):
return f"{value} {suffix}" if value is not None else None
if saved_files:
print("Artifacts:")
for label, path in sorted(saved_files.items()):
print(f" • {label}: {path}")
else:
print("Artifacts: JSON, YAML, and per-query SQL files emitted.")
print("\nBenchmark Overview:")
display_name = benchmark_info.get("display_name") or benchmark_info.get("name", "unknown").upper()
print(f" • Benchmark: {display_name}")
if "scale_factor" in benchmark_info:
print(f" • Scale factor: {benchmark_info['scale_factor']}")
execution_context = (
preview.get("execution_context") or benchmark_info.get("test_execution_type") or "standard execution"
)
print(f" • Execution mode: {execution_context}")
print(f" • Queries prepared: {query_count}")
if preview.get("estimated_time"):
print(f" • Estimated run time: {preview['estimated_time']}")
if preview.get("data_size_mb"):
print(f" • Estimated data size: {preview['data_size_mb']} MB")
if platform_info:
print("\nPlatform Summary:")
platform_name = platform_info.get("platform_name") or platform_info.get("platform_type")
if platform_name:
print(f" • Platform: {platform_name}")
connection_mode = platform_info.get("connection_mode") or platform_info.get("connection_type")
if connection_mode:
print(f" • Connection: {connection_mode}")
configuration = platform_info.get("configuration") or {}
if configuration:
interesting_keys = [
"database_path",
"memory_limit",
"schema",
"catalog",
"staging_root",
]
for key in interesting_keys:
if key in configuration and configuration[key] is not None:
print(f" • {key.replace('_', ' ').title()}: {configuration[key]}")
if resources:
print("\nResource Estimates:")
data_size = _format_metric(resources.get("estimated_data_size_mb"), "MB")
if data_size:
print(f" • Estimated data size: {data_size}")
memory_usage = _format_metric(resources.get("estimated_memory_usage_mb"), "MB")
if memory_usage:
print(f" • Estimated memory usage: {memory_usage}")
runtime = _format_metric(resources.get("estimated_runtime_minutes"), "minutes")
if runtime:
print(f" • Estimated runtime: {runtime}")
if resources.get("cpu_cores_available") is not None:
print(f" • Cores available: {resources['cpu_cores_available']}")
if result.warnings:
print("\nWarnings:")
for warning in result.warnings:
print(f" • {warning}")