Source code for benchbox.core.results.exporter

"""Result exporter for BenchBox schema v2.0.

Provides JSON/CSV/HTML export of benchmark results with optional anonymization,
and utilities to list, load, and compare results. The module is UI-agnostic and can
be used by both CLI and non-CLI runners.

Schema v2.0 Companion Files:

- Primary: ``{run_id}.json`` - Main result with queries, timing, summary
- Plans: ``{run_id}.plans.json`` - Query plans (if captured)
- Tuning: ``{run_id}.tuning.json`` - Tuning clauses applied (if any)
"""

from __future__ import annotations

import csv
import io
import json
import logging
from collections.abc import Iterable, Mapping
from datetime import datetime
from pathlib import Path
from typing import TYPE_CHECKING, Any, Union

from rich.console import Console

if TYPE_CHECKING:
    from cloudpathlib import CloudPath

    from benchbox.utils.cloud_storage import DatabricksPath

from benchbox.core.results.anonymization import (
    AnonymizationConfig,
    AnonymizationManager,
)
from benchbox.core.results.models import BenchmarkResults
from benchbox.core.results.schema import (
    SchemaV2ValidationError,
    SchemaV2Validator,
    build_plans_payload,
    build_result_payload,
    build_tuning_payload,
)
from benchbox.utils.cloud_storage import create_path_handler, is_cloud_path

PathLike = Union[Path, "CloudPath", "DatabricksPath"]

logger = logging.getLogger(__name__)

ResultLike = BenchmarkResults
QueryResultLike = "QueryResult | dict[str, Any]"


class ResultExporter:
    """Export benchmark results with detailed metadata and anonymization.

    Schema v2.0 exports:

    - Primary result file: Contains run, benchmark, platform, summary, queries
    - Companion files (optional): ``.plans.json`` for query plans,
      ``.tuning.json`` for tuning config
    """

    EXPORTER_NAME = "benchbox-exporter"

    def __init__(
        self,
        output_dir: str | Path | None = None,
        anonymize: bool = True,
        anonymization_config: AnonymizationConfig | None = None,
        console: Console | None = None,
    ):
        """Initialize the result exporter.

        Args:
            output_dir: Output directory for results. Defaults to benchmark_runs/results.
            anonymize: Whether to anonymize system information. Defaults to True.
            anonymization_config: Configuration for anonymization.
            console: Rich console for output. Creates a new one if not provided.
        """
        if output_dir is None:
            self.output_dir = Path("benchmark_runs/results")
            self.output_dir.mkdir(parents=True, exist_ok=True)
            self.is_cloud_output = False
        elif is_cloud_path(str(output_dir)):
            self.output_dir = create_path_handler(output_dir)
            self.is_cloud_output = True
        else:
            self.output_dir = Path(output_dir)
            try:
                self.output_dir.mkdir(parents=True, exist_ok=True)
            except OSError as exc:
                # FileNotFoundError and PermissionError are OSError subclasses.
                raise FileNotFoundError(str(exc)) from exc
            self.is_cloud_output = False

        self.console = console or Console()
        self.anonymize = anonymize
        self.anonymization_manager = (
            AnonymizationManager(anonymization_config or AnonymizationConfig()) if anonymize else None
        )
        self._validator = SchemaV2Validator()

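    # Construction sketch (paths illustrative; cloud targets are routed through
    # create_path_handler when is_cloud_path() recognizes the URL scheme):
    #
    #   local = ResultExporter(output_dir="benchmark_runs/results")
    #   cloud = ResultExporter(output_dir="s3://my-bucket/results", anonymize=False)
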
    def _write_file(self, file_path: Path, content: str, mode: str = "w") -> None:
        """Write content to file, handling both local and cloud paths."""
        if self.is_cloud_output and hasattr(file_path, "write_text"):
            file_path.write_text(content)
        elif self.is_cloud_output and hasattr(file_path, "write_bytes"):
            file_path.write_bytes(content.encode("utf-8"))
        else:
            with open(file_path, mode, encoding="utf-8") as handle:
                handle.write(content)

    def _create_file_path(self, filename: str):
        """Create file path, ensuring parent directory exists."""
        if self.is_cloud_output:
            return self.output_dir / filename
        file_path = self.output_dir / filename
        file_path.parent.mkdir(parents=True, exist_ok=True)
        return file_path

    def export_result(
        self,
        result: ResultLike,
        formats: list[str] | None = None,
    ) -> dict[str, Path]:
        """Export benchmark result to specified formats using schema v2.0.

        Args:
            result: The BenchmarkResults to export.
            formats: List of formats to export. Defaults to ["json"].

        Returns:
            Dictionary mapping format names to exported file paths.
        """
        # Add cost estimation if available
        if isinstance(result, BenchmarkResults):
            try:
                from benchbox.core.cost.integration import add_cost_estimation_to_results

                result = add_cost_estimation_to_results(result)
            except Exception as e:
                logger.debug(f"Cost estimation skipped: {e}")

        if formats is None:
            formats = ["json"]

        exported_files: dict[str, Path] = {}
        timestamp = (
            result.timestamp.strftime("%Y%m%d_%H%M%S")
            if hasattr(result, "timestamp") and result.timestamp
            else datetime.now().strftime("%Y%m%d_%H%M%S")
        )
        explicit_name = getattr(result, "output_filename", None)
        filename_base = Path(explicit_name).stem if explicit_name else self._generate_filename_base(result, timestamp)

        for format_name in formats:
            try:
                if format_name == "json":
                    filepath = self._export_json_v2(result, filename_base)
                elif format_name == "csv":
                    filepath = self._export_csv_detailed(result, filename_base)
                elif format_name == "html":
                    filepath = self._export_html_detailed(result, filename_base)
                else:
                    self.console.print(f"[yellow]Unknown export format: {format_name}[/yellow]")
                    continue
                exported_files[format_name] = filepath
                self.console.print(f"[green]Exported {format_name.upper()}:[/green] {filepath}")
            except Exception as exc:
                logger.error("Failed to export %s: %s", format_name, exc)
                self.console.print(f"[red]Failed to export {format_name}: {exc}[/red]")

        return exported_files

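    # Usage sketch (assumes `result` is a populated BenchmarkResults):
    #
    #   files = exporter.export_result(result, formats=["json", "csv", "html"])
    #   # e.g. {"json": Path(".../tpch_sf1_duckdb_20240101_120000.json"), ...}
    #
    # Unknown formats are reported and skipped, and a failing format is logged,
    # so the returned dict may cover only a subset of the requested formats.
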
    def _generate_filename_base(self, result: ResultLike, timestamp: str) -> str:
        """Generate base filename for exports."""
        short_name = getattr(result, "benchmark_id", None) or getattr(result, "benchmark_name", "unknown")
        platform = getattr(result, "platform", "unknown")
        try:
            from benchbox.utils.scale_factor import format_scale_factor

            scale_factor = format_scale_factor(getattr(result, "scale_factor", 1.0))
        except Exception:
            scale_factor = f"sf{getattr(result, 'scale_factor', 1.0)}"
        exec_id = getattr(result, "execution_id", None)
        if exec_id:
            return f"{str(short_name).lower()}_{scale_factor}_{str(platform).lower()}_{timestamp}_{exec_id}"
        return f"{str(short_name).lower()}_{scale_factor}_{str(platform).lower()}_{timestamp}"

    def _export_json_v2(self, result: ResultLike, filename_base: str) -> Path:
        """Export result to JSON using schema v2.0 with companion files."""
        # Build primary payload
        payload = build_result_payload(result)

        # Apply anonymization if enabled
        if self.anonymize and self.anonymization_manager:
            self._apply_anonymization(payload)
            anonymized = True
        else:
            anonymized = False

        # Add export metadata
        payload["export"] = {
            "timestamp": datetime.now().isoformat(),
            "tool": self.EXPORTER_NAME,
            "anonymized": anonymized,
        }

        # Validate before writing
        try:
            self._validator.validate(payload)
        except SchemaV2ValidationError as e:
            logger.warning(f"Schema validation warning: {e}")

        # Write primary result file
        filepath = self._create_file_path(f"{filename_base}.json")
        json_content = json.dumps(
            self._convert_datetimes_to_iso(payload),
            indent=2,
            ensure_ascii=False,
        )
        self._write_file(filepath, json_content)

        # Write companion files
        self._write_companion_files(result, filename_base)

        return filepath

    def _write_companion_files(self, result: ResultLike, filename_base: str) -> None:
        """Write companion files for plans and tuning if present."""
        # Plans companion file
        plans_payload = build_plans_payload(result)
        if plans_payload:
            plans_path = self._create_file_path(f"{filename_base}.plans.json")
            json_content = json.dumps(plans_payload, indent=2, ensure_ascii=False)
            self._write_file(plans_path, json_content)
            self.console.print(f"[dim]Exported plans: {plans_path}[/dim]")

        # Tuning companion file
        tuning_payload = build_tuning_payload(result)
        if tuning_payload:
            tuning_path = self._create_file_path(f"{filename_base}.tuning.json")
            json_content = json.dumps(tuning_payload, indent=2, ensure_ascii=False)
            self._write_file(tuning_path, json_content)
            self.console.print(f"[dim]Exported tuning: {tuning_path}[/dim]")

    def _apply_anonymization(self, payload: dict[str, Any]) -> None:
        """Apply anonymization to environment block."""
        if not self.anonymization_manager:
            return
        system_profile = self.anonymization_manager.anonymize_system_profile()
        if system_profile:
            env_block = payload.get("environment", {})
            # Update with anonymized values
            if system_profile.get("os_type"):
                env_block["os"] = f"{system_profile.get('os_type', '')} {system_profile.get('os_release', '')}".strip()
            if system_profile.get("architecture"):
                env_block["arch"] = system_profile["architecture"]
            if system_profile.get("cpu_count"):
                env_block["cpu_count"] = system_profile["cpu_count"]
            if system_profile.get("memory_gb"):
                env_block["memory_gb"] = system_profile["memory_gb"]
            if system_profile.get("python_version"):
                env_block["python"] = system_profile["python_version"]
            if env_block:
                payload["environment"] = env_block

        # Add anonymous machine ID
        machine_id = self.anonymization_manager.get_anonymous_machine_id()
        if machine_id:
            payload.setdefault("environment", {})["machine_id"] = machine_id

    def _convert_datetimes_to_iso(self, obj: Any) -> Any:
        """Convert datetime objects to ISO format strings."""
        if isinstance(obj, datetime):
            return obj.isoformat()
        if isinstance(obj, dict):
            return {key: self._convert_datetimes_to_iso(value) for key, value in obj.items()}
        if isinstance(obj, list):
            return [self._convert_datetimes_to_iso(item) for item in obj]
        return obj

    def _export_csv_detailed(self, result: ResultLike, filename_base: str) -> Path:
        """Export query results to CSV format."""
        filepath = self._create_file_path(f"{filename_base}.csv")
        headers = [
            "query_id",
            "execution_time_ms",
            "rows_returned",
            "status",
            "error_message",
            "iteration",
            "stream",
        ]

        if self.is_cloud_output:
            buffer = io.StringIO()
            writer = csv.writer(buffer)
            writer.writerow(headers)
            for query in self._iter_query_results(result):
                # Mirror the local branch: fall back from seconds to milliseconds
                # and accept either `error` or `error_message`.
                exec_time_ms = query.get("execution_time_ms")
                exec_time = query.get("execution_time")
                if exec_time_ms is None and exec_time is not None:
                    exec_time_ms = exec_time * 1000
                writer.writerow(
                    [
                        query.get("query_id", ""),
                        exec_time_ms or 0,
                        query.get("rows_returned", 0),
                        query.get("status", "UNKNOWN"),
                        query.get("error") or query.get("error_message", ""),
                        query.get("iteration", ""),
                        query.get("stream_id", ""),
                    ]
                )
            self._write_file(filepath, buffer.getvalue())
            buffer.close()
            return filepath

        with open(filepath, "w", newline="", encoding="utf-8") as handle:
            writer = csv.writer(handle)
            writer.writerow(headers)
            for query in self._iter_query_results(result):
                exec_time_ms = query.get("execution_time_ms")
                exec_time = query.get("execution_time")
                if exec_time_ms is None and exec_time is not None:
                    exec_time_ms = exec_time * 1000
                writer.writerow(
                    [
                        query.get("query_id", ""),
                        exec_time_ms or 0,
                        query.get("rows_returned", 0),
                        query.get("status", "UNKNOWN"),
                        query.get("error") or query.get("error_message", ""),
                        query.get("iteration", ""),
                        query.get("stream_id", ""),
                    ]
                )
        return filepath

    def _export_html_detailed(self, result: ResultLike, filename_base: str) -> Path:
        """Export result to HTML format."""
        filepath = self._create_file_path(f"{filename_base}.html")

        benchmark_name = getattr(result, "benchmark_name", "Unknown Benchmark")
        execution_id = getattr(result, "execution_id", "")
        timestamp = getattr(result, "timestamp", datetime.now())
        duration = getattr(result, "duration_seconds", 0.0)
        scale_factor = getattr(result, "scale_factor", 1.0)
        platform = getattr(result, "platform", "Unknown")

        total_queries, successful_queries = self._count_queries(result)
        failed_queries = max(total_queries - successful_queries, 0)

        if isinstance(result, BenchmarkResults):
            total_time = result.total_execution_time
            avg_time = result.average_query_time
        else:
            successes = [
                query.get("execution_time_ms", 0)
                for query in self._iter_query_results(result)
                if query.get("status") == "SUCCESS"
            ]
            total_time = sum(successes) / 1000 if successes else 0.0
            avg_time = (total_time / len(successes)) if successes else 0.0

        html_content = f"""<!DOCTYPE html>
<html>
<head>
<meta charset="utf-8" />
<title>BenchBox Results - {benchmark_name}</title>
<style>
body {{ font-family: system-ui, -apple-system, sans-serif; margin: 20px; background: #f5f5f5; }}
.container {{ max-width: 1200px; margin: 0 auto; background: white; padding: 24px; border-radius: 8px; box-shadow: 0 1px 3px rgba(0,0,0,0.1); }}
h1 {{ color: #1a1a1a; margin-bottom: 8px; }}
.meta {{ color: #666; font-size: 0.9em; margin-bottom: 24px; }}
.stats {{ display: grid; grid-template-columns: repeat(auto-fit, minmax(150px, 1fr)); gap: 16px; margin-bottom: 24px; }}
.stat {{ background: #f8f9fa; padding: 16px; border-radius: 6px; text-align: center; }}
.stat-value {{ font-size: 1.5em; font-weight: 600; color: #1a1a1a; }}
.stat-label {{ font-size: 0.85em; color: #666; }}
table {{ border-collapse: collapse; width: 100%; margin-top: 16px; }}
th, td {{ border: 1px solid #e5e5e5; padding: 10px 12px; text-align: left; }}
th {{ background: #f8f9fa; font-weight: 500; }}
.success {{ color: #22863a; }}
.failed {{ color: #cb2431; }}
</style>
</head>
<body>
<div class="container">
<h1>{benchmark_name}</h1>
<div class="meta">
<strong>Platform:</strong> {platform} |
<strong>Scale:</strong> {scale_factor} |
<strong>Run:</strong> {execution_id} |
<strong>Time:</strong> {timestamp.isoformat() if timestamp else "N/A"}
</div>
<div class="stats">
<div class="stat">
<div class="stat-value">{total_queries}</div>
<div class="stat-label">Total Queries</div>
</div>
<div class="stat">
<div class="stat-value success">{successful_queries}</div>
<div class="stat-label">Passed</div>
</div>
<div class="stat">
<div class="stat-value failed">{failed_queries}</div>
<div class="stat-label">Failed</div>
</div>
<div class="stat">
<div class="stat-value">{duration:.2f}s</div>
<div class="stat-label">Duration</div>
</div>
<div class="stat">
<div class="stat-value">{total_time:.3f}s</div>
<div class="stat-label">Query Time</div>
</div>
<div class="stat">
<div class="stat-value">{avg_time * 1000:.1f}ms</div>
<div class="stat-label">Avg Query</div>
</div>
</div>
<h2>Query Results</h2>
<table>
<tr><th>Query</th><th>Time (ms)</th><th>Rows</th><th>Status</th><th>Error</th></tr>
{"".join(self._render_query_row(query) for query in self._iter_query_results(result))}
</table>
<p style="margin-top: 24px; color: #666; font-size: 0.85em;">
Generated by BenchBox v2.0 at {datetime.now().strftime("%Y-%m-%d %H:%M:%S")}
</p>
</div>
</body>
</html>"""

        self._write_file(filepath, html_content)
        return filepath

    def _count_queries(self, result: ResultLike) -> tuple[int, int]:
        """Count total and successful queries."""
        successful = 0
        total = 0
        for query in self._iter_query_results(result):
            total += 1
            if query.get("status") == "SUCCESS":
                successful += 1
        return total, successful

    def _render_query_row(self, query: dict[str, Any]) -> str:
        """Render a single query as an HTML table row."""
        status = query.get("status", "UNKNOWN")
        status_class = "success" if status == "SUCCESS" else "failed"
        exec_time_ms = query.get("execution_time_ms")
        exec_time = query.get("execution_time")
        if exec_time_ms is None and exec_time is not None:
            exec_time_ms = exec_time * 1000
        time_display = f"{exec_time_ms:.1f}" if exec_time_ms is not None else ""
        return (
            "<tr>"
            f"<td>{query.get('query_id', '')}</td>"
            f"<td>{time_display}</td>"
            f"<td>{query.get('rows_returned', '')}</td>"
            f"<td class='{status_class}'>{status}</td>"
            f"<td>{query.get('error') or query.get('error_message', '')}</td>"
            "</tr>"
        )

    def _iter_query_results(self, result: ResultLike) -> Iterable[dict[str, Any]]:
        """Iterate over query results, normalizing format."""
        if isinstance(result, BenchmarkResults):
            for query in result.query_results or []:
                yield query
        else:
            for query in getattr(result, "query_results", []) or []:
                if isinstance(query, dict):
                    yield query

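    # Normalization note: query dicts may carry either `execution_time_ms` or a
    # legacy `execution_time` in seconds; the CSV and HTML paths above fall back
    # to seconds * 1000. Illustrative inputs (hypothetical):
    #
    #   {"query_id": "q1", "execution_time_ms": 42.0}   -> 42.0 ms
    #   {"query_id": "q2", "execution_time": 0.042}     -> 42.0 ms
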
    def list_results(self) -> list[dict[str, Any]]:
        """List all exported results in the output directory.

        Returns:
            List of result metadata dictionaries sorted by timestamp (newest first).
        """
        results: list[dict[str, Any]] = []
        for json_file in self.output_dir.glob("*.json"):
            # Skip companion files
            if json_file.name.endswith(".plans.json") or json_file.name.endswith(".tuning.json"):
                continue
            try:
                with open(json_file, encoding="utf-8") as handle:
                    data = json.load(handle)

                # Detect schema version
                version = data.get("version") or data.get("schema_version", "unknown")

                if version == "2.0":
                    # Schema v2.0 format
                    results.append(
                        {
                            "file": json_file,
                            "version": "2.0",
                            "benchmark": data.get("benchmark", {}).get("name", "Unknown"),
                            "platform": data.get("platform", {}).get("name", "Unknown"),
                            "scale_factor": data.get("benchmark", {}).get("scale_factor", 1.0),
                            "execution_id": data.get("run", {}).get("id", ""),
                            "timestamp": data.get("run", {}).get("timestamp", ""),
                            "duration": data.get("run", {}).get("total_duration_ms", 0) / 1000,
                            "queries": data.get("summary", {}).get("queries", {}).get("total", 0),
                            "status": data.get("summary", {}).get("validation", "unknown"),
                        }
                    )
                else:
                    # Legacy v1.x format - still supported for reading
                    results.append(
                        {
                            "file": json_file,
                            "version": version,
                            "benchmark": data.get("benchmark", {}).get("name", "Unknown"),
                            "platform": data.get("execution", {}).get("platform", "Unknown"),
                            "execution_id": data.get("execution", {}).get("id", ""),
                            "timestamp": data.get("execution", {}).get("timestamp", ""),
                            "duration": data.get("execution", {}).get("duration_ms", 0) / 1000,
                            "queries": data.get("results", {}).get("queries", {}).get("total", 0),
                            "status": data.get("validation", {}).get("status", "UNKNOWN"),
                        }
                    )
            except Exception as exc:
                logger.debug("Could not read %s: %s", json_file, exc)

        return sorted(results, key=lambda item: item["timestamp"], reverse=True)

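    # Listing sketch: each entry is a plain dict normalized across schema
    # versions, so callers can render either format uniformly:
    #
    #   for entry in exporter.list_results():
    #       print(entry["benchmark"], entry["timestamp"], entry["queries"])
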
    def show_results_summary(self) -> None:
        """Display a summary of exported results."""
        results = self.list_results()
        if not results:
            self.console.print("[yellow]No exported results found[/yellow]")
            return

        self.console.print(f"\n[bold]Exported Results ({len(results)} total)[/bold]")
        self.console.print(f"Output directory: [cyan]{self.output_dir}[/cyan]")

        from rich.table import Table

        table = Table()
        table.add_column("Benchmark", style="green")
        table.add_column("Platform", style="blue")
        table.add_column("Timestamp", style="dim")
        table.add_column("Duration", style="yellow")
        table.add_column("Queries", style="cyan")
        table.add_column("Version", style="dim")

        for result in results[:10]:
            duration_str = f"{result['duration']:.2f}s"
            timestamp_str = str(result["timestamp"])[:19].replace("T", " ")
            table.add_row(
                result["benchmark"],
                result.get("platform", ""),
                timestamp_str,
                duration_str,
                str(result["queries"]),
                result.get("version", ""),
            )

        self.console.print(table)
        if len(results) > 10:
            self.console.print(f"\n[dim]... and {len(results) - 10} more results[/dim]")

    def load_result_from_file(self, filepath: Path) -> dict[str, Any] | None:
        """Load a result file and return parsed data.

        Args:
            filepath: Path to the result JSON file.

        Returns:
            Dictionary with data, version, and filepath, or None on error.
        """
        try:
            with open(filepath, encoding="utf-8") as handle:
                data = json.load(handle)
            version = data.get("version") or data.get("schema_version", "unknown")
            return {"data": data, "version": version, "filepath": filepath}
        except Exception as exc:
            logger.error("Failed to load result from %s: %s", filepath, exc)
            return None

    def compare_results(self, baseline_path: Path, current_path: Path) -> dict[str, Any]:
        """Compare two result files and return performance analysis.

        Args:
            baseline_path: Path to baseline result file.
            current_path: Path to current result file.

        Returns:
            Comparison dictionary with performance changes and query comparisons.
        """
        baseline_result = self.load_result_from_file(baseline_path)
        current_result = self.load_result_from_file(current_path)

        if not baseline_result or not current_result:
            return {
                "error": "Failed to load one or both result files",
                "baseline_loaded": bool(baseline_result),
                "current_loaded": bool(current_result),
            }

        baseline_data = baseline_result["data"]
        current_data = current_result["data"]
        baseline_version = baseline_result.get("version", "unknown")
        current_version = current_result.get("version", "unknown")

        # Extract metrics based on schema version
        perf_baseline = self._extract_performance_metrics(baseline_data, baseline_version)
        perf_current = self._extract_performance_metrics(current_data, current_version)

        comparison: dict[str, Any] = {
            "baseline_file": str(baseline_path),
            "current_file": str(current_path),
            "baseline_version": baseline_version,
            "current_version": current_version,
            "performance_changes": {},
            "query_comparisons": [],
        }

        # Compare overall metrics
        for metric in ["total_execution_time", "average_query_time"]:
            if metric in perf_baseline and metric in perf_current:
                baseline_value = perf_baseline[metric]
                current_value = perf_current[metric]
                change = ((current_value - baseline_value) / baseline_value * 100) if baseline_value else 0
                comparison["performance_changes"][metric] = {
                    "baseline": baseline_value,
                    "current": current_value,
                    "change_percent": round(change, 2),
                    "improved": current_value < baseline_value,
                }

        # Compare individual queries
        baseline_queries = self._extract_query_map(baseline_data, baseline_version)
        current_queries = self._extract_query_map(current_data, current_version)

        for query_id, baseline_query in baseline_queries.items():
            current_query = current_queries.get(query_id)
            if not current_query:
                continue
            baseline_time = baseline_query.get("execution_time_ms") or 0
            current_time = current_query.get("execution_time_ms") or 0
            change = ((current_time - baseline_time) / baseline_time * 100) if baseline_time else 0
            comparison["query_comparisons"].append(
                {
                    "query_id": query_id,
                    "baseline_time_ms": baseline_time,
                    "current_time_ms": current_time,
                    "change_percent": round(change, 2),
                    "improved": current_time < baseline_time,
                }
            )

        # Generate summary
        if comparison["query_comparisons"]:
            improved = len([q for q in comparison["query_comparisons"] if q["improved"]])
            regressed = len(
                [q for q in comparison["query_comparisons"] if not q["improved"] and q["change_percent"] > 0]
            )
            comparison["summary"] = {
                "total_queries_compared": len(comparison["query_comparisons"]),
                "improved_queries": improved,
                "regressed_queries": regressed,
                "unchanged_queries": len(comparison["query_comparisons"]) - improved - regressed,
                "overall_assessment": self._assess_performance_change(comparison["performance_changes"]),
            }

        return comparison

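    # Comparison sketch (both files are illustrative prior exports):
    #
    #   cmp = exporter.compare_results(Path("baseline.json"), Path("current.json"))
    #   cmp["performance_changes"]["total_execution_time"]["change_percent"]  # e.g. -7.5
    #   cmp["summary"]["overall_assessment"]                                  # e.g. "improvement"
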
    def _extract_performance_metrics(self, data: dict[str, Any], version: str) -> dict[str, Any]:
        """Extract performance metrics from result data."""
        if version == "2.0":
            # Schema v2.0 format
            summary = data.get("summary", {})
            timing = summary.get("timing", {})
            queries = summary.get("queries", {})
            return {
                "total_queries": queries.get("total", 0),
                "successful_queries": queries.get("passed", 0),
                "failed_queries": queries.get("failed", 0),
                "total_execution_time": timing.get("total_ms", 0) / 1000,
                "average_query_time": timing.get("avg_ms", 0) / 1000,
            }
        else:
            # Legacy v1.x format
            results_block = data.get("results", {})
            if not isinstance(results_block, Mapping):
                return {
                    "total_queries": 0,
                    "successful_queries": 0,
                    "failed_queries": 0,
                    "total_execution_time": 0.0,
                    "average_query_time": 0.0,
                }
            queries_block = results_block.get("queries", {})
            timing_block = results_block.get("timing", {})
            return {
                "total_queries": queries_block.get("total", 0) if isinstance(queries_block, Mapping) else 0,
                "successful_queries": queries_block.get("successful", 0) if isinstance(queries_block, Mapping) else 0,
                "failed_queries": queries_block.get("failed", 0) if isinstance(queries_block, Mapping) else 0,
                "total_execution_time": (timing_block.get("total_ms", 0) / 1000)
                if isinstance(timing_block, Mapping)
                else 0.0,
                "average_query_time": (timing_block.get("avg_ms", 0) / 1000)
                if isinstance(timing_block, Mapping)
                else 0.0,
            }

    def _extract_query_map(self, data: dict[str, Any], version: str) -> dict[str, dict[str, Any]]:
        """Extract query results as a map from query ID to query data."""
        if version == "2.0":
            # Schema v2.0 format - queries is a list
            queries = data.get("queries", [])
            result = {}
            for q in queries:
                query_id = q.get("id")
                if query_id:
                    result[query_id] = {
                        "query_id": query_id,
                        "execution_time_ms": q.get("ms", 0),
                        "rows_returned": q.get("rows"),
                    }
            return result
        else:
            # Legacy v1.x format
            results_block = data.get("results", {})
            if not isinstance(results_block, Mapping):
                return {}
            queries_block = results_block.get("queries", {})
            if not isinstance(queries_block, Mapping):
                return {}
            details = queries_block.get("details", [])
            result = {}
            for item in details:
                if isinstance(item, dict):
                    query_id = item.get("id") or item.get("query_id")
                    if query_id:
                        result[str(query_id)] = {
                            "query_id": query_id,
                            "execution_time_ms": item.get("execution_time_ms", 0),
                            "rows_returned": item.get("rows_returned"),
                        }
            return result

    def _assess_performance_change(self, performance_changes: dict[str, Any]) -> str:
        """Assess overall performance change."""
        if not performance_changes:
            return "no_data"
        time_metrics = ["total_execution_time", "average_query_time"]
        time_changes = [performance_changes[m]["change_percent"] for m in time_metrics if m in performance_changes]
        if not time_changes:
            return "unknown"
        avg_change = sum(time_changes) / len(time_changes)
        if avg_change < -10:
            return "significant_improvement"
        if avg_change < -5:
            return "improvement"
        if avg_change > 10:
            return "significant_regression"
        if avg_change > 5:
            return "regression"
        return "no_significant_change"

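    # Worked example of the thresholds above: an average change of -12% across
    # the two timing metrics yields "significant_improvement", -7% yields
    # "improvement", +3% yields "no_significant_change", +8% "regression",
    # and +15% "significant_regression".
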
    def export_comparison_report(
        self,
        comparison: dict[str, Any],
        output_path: PathLike | None = None,
    ) -> PathLike:
        """Export comparison results as an HTML report.

        Args:
            comparison: Comparison dictionary from compare_results().
            output_path: Output file path. Auto-generates if not provided.

        Returns:
            Path to the exported report.
        """
        if output_path is None:
            timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
            output_path = self.output_dir / f"comparison_report_{timestamp}.html"

        summary = comparison.get("summary", {})
        performance_changes = comparison.get("performance_changes", {})
        query_comparisons = comparison.get("query_comparisons", [])

        html_content = f"""<!DOCTYPE html>
<html>
<head>
<title>BenchBox Comparison Report</title>
<style>
body {{ font-family: system-ui, sans-serif; margin: 20px; background: #f5f5f5; }}
.container {{ max-width: 1200px; margin: 0 auto; background: white; padding: 24px; border-radius: 8px; }}
.header {{ background: linear-gradient(135deg, #667eea 0%, #764ba2 100%); color: white; padding: 20px; border-radius: 8px; margin-bottom: 20px; }}
.summary {{ display: grid; grid-template-columns: repeat(auto-fit, minmax(150px, 1fr)); gap: 16px; margin-bottom: 24px; }}
.metric {{ padding: 16px; border-radius: 8px; text-align: center; }}
.metric.improved {{ background: #d4edda; border: 1px solid #c3e6cb; }}
.metric.regressed {{ background: #f8d7da; border: 1px solid #f5c6cb; }}
.metric.neutral {{ background: #f8f9fa; border: 1px solid #e9ecef; }}
.metric h3 {{ margin: 0; font-size: 0.85em; text-transform: uppercase; color: #666; }}
.metric p {{ margin: 8px 0 0 0; font-size: 1.4em; font-weight: bold; }}
table {{ border-collapse: collapse; width: 100%; margin-top: 16px; }}
th, td {{ border: 1px solid #e5e5e5; padding: 10px 12px; }}
th {{ background: #f8f9fa; font-weight: 500; }}
</style>
</head>
<body>
<div class="container">
<div class="header">
<h1>Performance Comparison Report</h1>
<p>Generated: {datetime.now().strftime("%Y-%m-%d %H:%M:%S")}</p>
</div>
<div class="summary">
<div class="metric neutral">
<h3>Queries Compared</h3>
<p>{summary.get("total_queries_compared", 0)}</p>
</div>
<div class="metric improved">
<h3>Improved</h3>
<p>{summary.get("improved_queries", 0)}</p>
</div>
<div class="metric regressed">
<h3>Regressed</h3>
<p>{summary.get("regressed_queries", 0)}</p>
</div>
<div class="metric neutral">
<h3>Unchanged</h3>
<p>{summary.get("unchanged_queries", 0)}</p>
</div>
</div>
<h2>Performance Changes</h2>
<ul>
{
    "".join(
        f"<li>{metric.replace('_', ' ').title()}: {vals['change_percent']:+.1f}% "
        f"({'Improved' if vals['improved'] else 'Regressed'})</li>"
        for metric, vals in performance_changes.items()
    )
}
</ul>
<h2>Query Details</h2>
<table>
<tr><th>Query</th><th>Baseline (ms)</th><th>Current (ms)</th><th>Change</th><th>Status</th></tr>
{
    "".join(
        f"<tr><td>{q['query_id']}</td><td>{q['baseline_time_ms']:.1f}</td>"
        f"<td>{q['current_time_ms']:.1f}</td><td>{q['change_percent']:+.1f}%</td>"
        f"<td>{'Improved' if q['improved'] else 'Regressed'}</td></tr>"
        for q in query_comparisons
    )
}
</table>
</div>
</body>
</html>"""

        self._write_file(output_path, html_content)
        return output_path
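
# End-to-end sketch (hypothetical paths):
#
#   exporter = ResultExporter()
#   comparison = exporter.compare_results(Path("baseline.json"), Path("current.json"))
#   report = exporter.export_comparison_report(comparison)
#   print(f"Comparison report written to {report}")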