Source code for benchbox.core.results.models

"""
Core result models for benchmark execution.

These dataclasses capture detailed execution phases and summary metrics for
benchmarks and are intentionally free of CLI/platform imports to avoid cycles.
"""

from __future__ import annotations

from dataclasses import dataclass, field
from datetime import datetime
from typing import TYPE_CHECKING, Any

if TYPE_CHECKING:
    from benchbox.core.results.query_plan_models import QueryPlanDAG


@dataclass
class TableGenerationStats:
    generation_time_ms: int
    status: str
    rows_generated: int
    data_size_bytes: int
    file_path: str
    error_type: str | None = None
    error_message: str | None = None
    rows_attempted: int | None = None
    bytes_attempted: int | None = None
    error_timestamp: str | None = None


@dataclass
class DataGenerationPhase:
    duration_ms: int
    status: str
    tables_generated: int
    total_rows_generated: int
    total_data_size_bytes: int
    per_table_stats: dict[str, TableGenerationStats]


@dataclass
class TableCreationStats:
    creation_time_ms: int
    status: str
    constraints_applied: int
    indexes_created: int
    error_type: str | None = None
    error_message: str | None = None
    error_timestamp: str | None = None


@dataclass
class SchemaCreationPhase:
    duration_ms: int
    status: str
    tables_created: int
    constraints_applied: int
    indexes_created: int
    per_table_creation: dict[str, TableCreationStats]


@dataclass
class TableLoadingStats:
    rows: int
    load_time_ms: int
    status: str
    error_type: str | None = None
    error_message: str | None = None
    rows_processed: int | None = None
    rows_successful: int | None = None
    error_timestamp: str | None = None


@dataclass
class DataLoadingPhase:
    duration_ms: int
    status: str
    total_rows_loaded: int
    tables_loaded: int
    per_table_stats: dict[str, TableLoadingStats]


@dataclass
class ValidationPhase:
    duration_ms: int
    row_count_validation: str
    schema_validation: str
    data_integrity_checks: str
    validation_details: dict[str, Any] | None = None


@dataclass
class SetupPhase:
    data_generation: DataGenerationPhase | None = None
    schema_creation: SchemaCreationPhase | None = None
    data_loading: DataLoadingPhase | None = None
    validation: ValidationPhase | None = None


@dataclass
class QueryExecution:
    query_id: str
    stream_id: str
    execution_order: int
    execution_time_ms: int
    status: str
    rows_returned: int | None = None
    resource_usage: dict[str, Any] | None = None
    error_message: str | None = None
    iteration: int | None = None
    run_type: str | None = None
    # Row count validation - nested object structure
    row_count_validation: dict[str, Any] | None = None  # Contains: expected, actual, status, error/warning
    # Cost estimation
    cost: float | None = None  # Compute cost in USD for this query
    # Query plan capture (structured DAG representation)
    query_plan: QueryPlanDAG | None = None  # Captured query execution plan
    plan_fingerprint: str | None = None  # SHA256 hash for fast plan comparison
    plan_capture_time_ms: float | None = None  # Time spent capturing plan (EXPLAIN + parse)


@dataclass
class PowerTestPhase:
    start_time: str
    end_time: str
    duration_ms: int
    query_executions: list[QueryExecution]
    geometric_mean_time: float
    power_at_size: float


@dataclass
class ThroughputStream:
    stream_id: int
    start_time: str
    end_time: str
    duration_ms: int
    query_executions: list[QueryExecution]


@dataclass
class ThroughputTestPhase:
    start_time: str
    end_time: str
    duration_ms: int
    num_streams: int
    streams: list[ThroughputStream]
    total_queries_executed: int
    throughput_at_size: float


@dataclass
class MaintenanceOperation:
    operation: str
    operation_type: str
    table: str
    execution_time_ms: int
    rows_affected: int
    status: str
    error_message: str | None = None


@dataclass
class MaintenanceTestPhase:
    start_time: str
    end_time: str
    duration_ms: int
    maintenance_operations: list[MaintenanceOperation]
    query_executions: list[QueryExecution]


[docs] @dataclass class ExecutionPhases: setup: SetupPhase power_test: PowerTestPhase | None = None throughput_test: ThroughputTestPhase | None = None maintenance_test: MaintenanceTestPhase | None = None
@dataclass class QueryDefinition: sql: str parameters: dict[str, Any] | None = None
[docs] @dataclass class BenchmarkResults: benchmark_name: str platform: str scale_factor: float execution_id: str timestamp: datetime duration_seconds: float total_queries: int successful_queries: int failed_queries: int # Summary of queries (flattened list for basic consumers) query_results: list[dict[str, Any]] = field(default_factory=list) # Summary metrics total_execution_time: float = 0.0 average_query_time: float = 0.0 # Setup metrics data_loading_time: float = 0.0 schema_creation_time: float = 0.0 total_rows_loaded: int = 0 data_size_mb: float = 0.0 table_statistics: dict[str, int] = field(default_factory=dict) # Optional detailed per-query timing info (for CSV export and analysis) per_query_timings: list[dict[str, Any]] | None = field(default_factory=list) # Optional detailed structures execution_phases: ExecutionPhases | None = None query_definitions: dict[str, dict[str, QueryDefinition]] | None = None # TPC metrics and execution type test_execution_type: str = "standard" power_at_size: float | None = None throughput_at_size: float | None = None qphh_at_size: float | None = None geometric_mean_execution_time: float | None = None # Validation and metadata validation_status: str = "PASSED" validation_details: dict[str, Any] | None = None platform_info: dict[str, Any] | None = None platform_metadata: dict[str, Any] | None = None tunings_applied: dict[str, Any] | None = None tuning_config_hash: str | None = None # SHA-256 hash for config comparison tuning_source_file: str | None = None # Path to tuning YAML file if applicable tuning_validation_status: str = "NOT_VALIDATED" tuning_metadata_saved: bool = False system_profile: dict[str, Any] | None = None database_name: str | None = None anonymous_machine_id: str | None = None execution_metadata: dict[str, Any] | None = None performance_characteristics: dict[str, Any] = field(default_factory=dict) performance_summary: dict[str, Any] = field(default_factory=dict) # Cost estimation cost_summary: dict[str, Any] | None = None # Contains: total_cost, phase_costs, platform_details driver_package: str | None = None driver_version_requested: str | None = None driver_version_resolved: str | None = None driver_auto_install: bool = False # Additional optional attributes set dynamically output_filename: str | None = None resource_utilization: dict[str, Any] | None = None _benchmark_id_override: str | None = None summary_metrics: dict[str, Any] = field(default_factory=dict) query_subset: list[str] | None = None concurrency_level: int | None = None benchmark_version: str | None = None # Query plan capture statistics query_plans_captured: int = 0 # Count of queries with captured plans plan_capture_failures: int = 0 # Count of plan capture failures plan_capture_errors: list[dict[str, str]] = field(default_factory=list) plan_comparison_summary: dict[str, Any] | None = None # Cross-run/platform plan comparison results # Query plan capture timing (set during result aggregation) total_plan_capture_time_ms: float = 0.0 # Total time spent on plan capture avg_plan_capture_overhead_pct: float = 0.0 # Average overhead as % of query time max_plan_capture_time_ms: float = 0.0 # Maximum single capture time @property def benchmark_id(self) -> str: """Return benchmark identifier derived from benchmark name.""" override = getattr(self, "_benchmark_id_override", None) if override: return override if isinstance(self.execution_metadata, dict): metadata_override = self.execution_metadata.get("benchmark_id") if isinstance(metadata_override, str) and metadata_override: return metadata_override normalized = self.benchmark_name.lower().replace(" ", "_").replace("-", "_") while "__" in normalized: normalized = normalized.replace("__", "_") return normalized