Source code for benchbox.core.results.display

"""
Copyright 2026 Joe Harris / BenchBox Project

Licensed under the MIT License. See LICENSE file in the project root for details.

Result Display Utilities

This module provides reusable result formatting and display functions
for benchmark execution results and system information.
"""

from __future__ import annotations

from typing import Any

from benchbox.utils import format_duration


[docs] def display_results(result_data: dict[str, Any], verbosity: int = 0) -> None: """Display benchmark results in a standardized format. Args: result_data: Dictionary containing benchmark results verbosity: Verbosity level (0=minimal, 1=detailed, 2=verbose) """ benchmark_name = result_data.get("benchmark", "unknown").upper() print(f"Benchmark: {benchmark_name}") print(f"Scale Factor: {result_data.get('scale_factor', 'unknown')}") print(f"Platform: {result_data.get('platform', 'unknown')}") success = result_data.get("success", False) print(f"\nBenchmark Status: {'PASSED' if success else 'FAILED'}") if verbosity > 0 and "total_duration" in result_data: print(f"Total Duration: {format_duration(result_data['total_duration'])}") if "schema_creation_time" in result_data: print(f"Schema Creation: {format_duration(result_data['schema_creation_time'])}") if "data_loading_time" in result_data: print(f"Data Loading: {format_duration(result_data['data_loading_time'])}") if "successful_queries" in result_data: print(f"Queries: {result_data['successful_queries']}/{result_data['total_queries']} successful") if success and "total_execution_time" in result_data: print(f"Query Execution Time: {format_duration(result_data['total_execution_time'])}") if "average_query_time" in result_data: print(f"Average Query Time: {format_duration(result_data['average_query_time'])}") if success: print(f"\n{benchmark_name} benchmark completed!") else: print(f"\n{benchmark_name} benchmark failed.")
def display_platform_list(platforms: dict[str, bool], get_requirements_func=None) -> None: """Display list of available platforms. Args: platforms: Dictionary mapping platform names to availability status get_requirements_func: Optional function to get platform requirements """ print("Available platforms:") for platform, available in platforms.items(): status = "✅ Available" if available else "❌ Not available" if not available and get_requirements_func: requirements = get_requirements_func(platform) status += f" ({requirements})" print(f" {platform}: {status}") def display_benchmark_list(benchmark_classes: dict[str, Any]) -> None: """Display list of available benchmarks. Args: benchmark_classes: Dictionary mapping benchmark names to classes """ print("Available benchmarks:") for benchmark_name in sorted(benchmark_classes.keys()): benchmark_class = benchmark_classes[benchmark_name] description = ( getattr(benchmark_class, "__doc__", "").split("\n")[0] if benchmark_class.__doc__ else "No description available" ) print(f" {benchmark_name}: {description}") def display_configuration_summary(config: dict[str, Any], verbosity: int = 0) -> None: """Display configuration summary. Args: config: Unified configuration dictionary verbosity: Verbosity level """ if verbosity == 0: return print(f"Benchmark: {config['benchmark'].upper()} @ SF{config['scale_factor']} on {config['platform'].title()}") print(f"Phases: {config.get('phases', 'unknown')}") if verbosity > 0: tuning_mode = config.get("tuning_mode", "unknown") print(f"Tuning mode: {tuning_mode}") if config.get("tuning_config"): tuning_type = config["tuning_config"].get("_metadata", {}).get("configuration_type", "unknown") print(f"Tuning type: {tuning_type}") def display_verbose_config_feedback(config: dict[str, Any], platform: str) -> None: """Display verbose configuration feedback after adapter creation. Args: config: Platform adapter configuration platform: Platform name """ if platform in ["duckdb", "sqlite"]: db_path = config.get("database_path", "unknown") print(f"Database file: {db_path}") elif platform == "databricks": schema_name = config.get("schema", "unknown") catalog_name = config.get("catalog", "unknown") print(f"Database schema: {catalog_name}.{schema_name}") elif platform == "bigquery": dataset_id = config.get("dataset_id", "unknown") project_id = config.get("project_id", "unknown") print(f"BigQuery dataset: {project_id}.{dataset_id}") elif platform in ["redshift", "snowflake"]: database_name = config.get("database", "unknown") print(f"Database name: {database_name}") elif platform == "clickhouse": data_path = config.get("data_path", "unknown") print(f"Data path: {data_path}") def print_phase_header(phase: str) -> None: """Print a formatted phase header. Args: phase: Phase name to display """ print(f"\n--- Running Phase: {phase.title()} ---") def print_completion_message(phase: str, output_location: str | None = None) -> None: """Print a completion message for a phase. Args: phase: Phase that completed output_location: Optional output location to display """ if output_location: print(f"✅ {phase.title()} phase complete in {output_location}") else: print(f"✅ {phase.title()} phase complete.") def print_dry_run_summary(result, output_dir, saved_files=None) -> None: """Print a human-friendly summary of dry run artifacts and insights.""" benchmark_info = result.benchmark_config or {} preview = result.query_preview or {} platform_info = result.platform_config or {} resources = result.estimated_resources or {} query_count = preview.get("query_count") or len(result.queries) or 0 print("✅ Dry run completed successfully!") print(f"Artifacts directory: {output_dir}") def _format_metric(value, suffix): try: return f"{float(value):.2f} {suffix}" except (TypeError, ValueError): return f"{value} {suffix}" if value is not None else None if saved_files: print("Artifacts:") for label, path in sorted(saved_files.items()): print(f" • {label}: {path}") else: print("Artifacts: JSON, YAML, and per-query SQL files emitted.") print("\nBenchmark Overview:") display_name = benchmark_info.get("display_name") or benchmark_info.get("name", "unknown").upper() print(f" • Benchmark: {display_name}") if "scale_factor" in benchmark_info: print(f" • Scale factor: {benchmark_info['scale_factor']}") execution_context = ( preview.get("execution_context") or benchmark_info.get("test_execution_type") or "standard execution" ) print(f" • Execution mode: {execution_context}") print(f" • Queries prepared: {query_count}") if preview.get("estimated_time"): print(f" • Estimated run time: {preview['estimated_time']}") if preview.get("data_size_mb"): print(f" • Estimated data size: {preview['data_size_mb']} MB") if platform_info: print("\nPlatform Summary:") platform_name = platform_info.get("platform_name") or platform_info.get("platform_type") if platform_name: print(f" • Platform: {platform_name}") connection_mode = platform_info.get("connection_mode") or platform_info.get("connection_type") if connection_mode: print(f" • Connection: {connection_mode}") configuration = platform_info.get("configuration") or {} if configuration: interesting_keys = [ "database_path", "memory_limit", "schema", "catalog", "staging_root", ] for key in interesting_keys: if key in configuration and configuration[key] is not None: print(f" • {key.replace('_', ' ').title()}: {configuration[key]}") if resources: print("\nResource Estimates:") data_size = _format_metric(resources.get("estimated_data_size_mb"), "MB") if data_size: print(f" • Estimated data size: {data_size}") memory_usage = _format_metric(resources.get("estimated_memory_usage_mb"), "MB") if memory_usage: print(f" • Estimated memory usage: {memory_usage}") runtime = _format_metric(resources.get("estimated_runtime_minutes"), "minutes") if runtime: print(f" • Estimated runtime: {runtime}") if resources.get("cpu_cores_available") is not None: print(f" • Cores available: {resources['cpu_cores_available']}") if result.warnings: print("\nWarnings:") for warning in result.warnings: print(f" • {warning}")