Source code for benchbox.monitoring.performance

"""Performance monitoring utilities for BenchBox.

This module provides lightweight primitives for recording runtime metrics, taking
snapshots, persisting history, and detecting regressions. It is intentionally
framework-agnostic so both the CLI and tests can reuse the same functionality
without introducing circular imports.
"""

from __future__ import annotations

import json
import statistics
import tempfile
from collections.abc import Iterable
from contextlib import contextmanager
from dataclasses import asdict, dataclass, field
from datetime import datetime, timedelta, timezone
from pathlib import Path
from time import perf_counter
from typing import Any

# Percentage thresholds are expressed as floating point values (e.g. 0.15 == 15%).
DEFAULT_REGRESSION_THRESHOLD = 0.15


@dataclass(frozen=True)
class TimingStats:
    """Aggregate timing statistics for a single metric."""

    count: int
    minimum: float
    maximum: float
    mean: float
    median: float
    p90: float
    p95: float
    p99: float
    total: float

    def to_dict(self) -> dict[str, float]:
        return asdict(self)


@dataclass(frozen=True)
class PerformanceSnapshot:
    """Serializable snapshot of recorded metrics."""

    timestamp: str
    counters: dict[str, int]
    gauges: dict[str, float]
    timings: dict[str, TimingStats]
    metadata: dict[str, Any] = field(default_factory=dict)

    def to_dict(self) -> dict[str, Any]:
        return {
            "timestamp": self.timestamp,
            "counters": dict(self.counters),
            "gauges": dict(self.gauges),
            "timings": {name: stats.to_dict() for name, stats in self.timings.items()},
            "metadata": dict(self.metadata),
        }


@dataclass(frozen=True)
class PerformanceRegressionAlert:
    """Represents a detected performance regression for a metric."""

    metric: str
    baseline: float
    current: float
    change_percent: float
    threshold_percent: float
    direction: str

    def to_dict(self) -> dict[str, Any]:
        return asdict(self)

class PerformanceMonitor:
    """Record counters, gauges, and timing metrics for benchmark execution."""

    def __init__(self) -> None:
        self._counters: dict[str, int] = {}
        self._gauges: dict[str, float] = {}
        self._timings: dict[str, list[float]] = {}
        self._metadata: dict[str, Any] = {}

    def increment_counter(self, name: str, value: int = 1) -> None:
        """Increment a named counter."""
        self._counters[name] = self._counters.get(name, 0) + value

    def get_counter(self, name: str) -> int:
        """Return the current value for counter *name*."""
        return self._counters.get(name, 0)

    def set_gauge(self, name: str, value: float) -> None:
        """Record the latest value for a gauge metric."""
        self._gauges[name] = float(value)

    def record_timing(self, name: str, duration_seconds: float) -> None:
        """Record a single timing observation for *name*."""
        self._timings.setdefault(name, []).append(float(duration_seconds))

    @contextmanager
    def time_operation(self, name: str):
        """Context manager that records timing on exit."""
        start = perf_counter()
        try:
            yield
        finally:
            elapsed = perf_counter() - start
            self.record_timing(name, elapsed)

    def set_metadata(self, key: str, value: Any) -> None:
        """Attach arbitrary metadata to the snapshot."""
        self._metadata[key] = value

    def update_metadata(self, items: dict[str, Any]) -> None:
        """Bulk update metadata with *items*."""
        self._metadata.update(items)
    # --------------------------- Snapshot / summary ---------------------------

    def snapshot(self) -> PerformanceSnapshot:
        """Create an immutable snapshot of the currently recorded metrics."""
        timings_summary = {name: self._summarize_timings(samples) for name, samples in self._timings.items()}
        return PerformanceSnapshot(
            timestamp=datetime.now(timezone.utc).isoformat(),
            counters=dict(self._counters),
            gauges=dict(self._gauges),
            timings=timings_summary,
            metadata=dict(self._metadata),
        )

    def summary(self) -> dict[str, Any]:
        """Return a plain dictionary representation useful for serialization."""
        snapshot = self.snapshot()
        return snapshot.to_dict()

    def reset(self) -> None:
        """Clear all recorded metrics and metadata."""
        self._counters.clear()
        self._gauges.clear()
        self._timings.clear()
        self._metadata.clear()

    @staticmethod
    def _summarize_timings(samples: Iterable[float]) -> TimingStats:
        series = [float(value) for value in samples if value is not None]
        if not series:
            return TimingStats(
                count=0, minimum=0.0, maximum=0.0, mean=0.0, median=0.0, p90=0.0, p95=0.0, p99=0.0, total=0.0
            )
        sorted_series = sorted(series)
        count = len(sorted_series)
        total = sum(sorted_series)
        mean = statistics.fmean(sorted_series)
        median = statistics.median(sorted_series)

        def percentile(p: float) -> float:
            if count == 1:
                return sorted_series[0]
            rank = (p / 100) * (count - 1)
            lower = int(rank)
            upper = min(lower + 1, count - 1)
            weight = rank - lower
            return sorted_series[lower] + weight * (sorted_series[upper] - sorted_series[lower])

        return TimingStats(
            count=count,
            minimum=sorted_series[0],
            maximum=sorted_series[-1],
            mean=mean,
            median=median,
            p90=percentile(90),
            p95=percentile(95),
            p99=percentile(99),
            total=total,
        )
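
# Illustrative sketch of how timing observations roll up into TimingStats (values
# here are made up; ``load_phase`` is a hypothetical metric name):
#
#     >>> monitor = PerformanceMonitor()
#     >>> for duration in (0.12, 0.15, 0.11, 0.42):
#     ...     monitor.record_timing("load_phase", duration)
#     >>> stats = monitor.snapshot().timings["load_phase"]
#     >>> stats.count, stats.minimum, stats.maximum
#     (4, 0.11, 0.42)
#     >>> # p90/p95/p99 are linearly interpolated between the nearest samples.
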

class PerformanceHistory:
    """Persist performance snapshots and surface trends/regressions."""

    def __init__(self, storage_path: Path, max_entries: int = 50) -> None:
        self.storage_path = Path(storage_path)
        self.max_entries = max_entries
        self.storage_path.parent.mkdir(parents=True, exist_ok=True)
        self._history: list[dict[str, Any]] = self._load_history()

    def _load_history(self) -> list[dict[str, Any]]:
        if not self.storage_path.exists():
            return []
        try:
            with self.storage_path.open("r", encoding="utf-8") as handle:
                payload = json.load(handle)
            if isinstance(payload, list):
                return payload
            if isinstance(payload, dict) and "entries" in payload:
                return list(payload["entries"])
        except (json.JSONDecodeError, OSError):
            pass
        return []

    def _write_history(self) -> None:
        payload = {"entries": self._history}
        with self.storage_path.open("w", encoding="utf-8") as handle:
            json.dump(payload, handle, indent=2)

    # ------------------------------ Recording --------------------------------
    def record(
        self,
        snapshot: PerformanceSnapshot,
        regression_thresholds: dict[str, float] | None = None,
        prefer_lower_metrics: list[str] | None = None,
    ) -> list[PerformanceRegressionAlert]:
        """Persist *snapshot* and return any regression alerts.

        Args:
            snapshot: Snapshot to persist.
            regression_thresholds: Optional per-metric thresholds expressed as
                fractions (e.g. ``0.15`` for a 15% change).
            prefer_lower_metrics: Metrics where lower values are better, so an
                increase is flagged as a regression.
        """
        prefer_lower_metrics = prefer_lower_metrics or []
        regression_thresholds = regression_thresholds or {}
        alerts: list[PerformanceRegressionAlert] = []
        if self._history:
            baseline = self._history[-1]
            alerts = self._detect_regressions(baseline, snapshot.to_dict(), regression_thresholds, prefer_lower_metrics)

        # Append and truncate history window
        self._history.append(snapshot.to_dict())
        if len(self._history) > self.max_entries:
            self._history = self._history[-self.max_entries :]
        self._write_history()
        return alerts
    # ------------------------------ Analytics --------------------------------

    def trend(self, metric: str, window: int = 10) -> str:
        """Return simple trend descriptor for *metric* using last *window* entries."""
        if window < 2:
            window = 2
        series = self.metric_history(metric)[-window:]
        if len(series) < 2:
            return "insufficient_data"
        midpoint = len(series) // 2
        first_half = series[:midpoint]
        second_half = series[midpoint:]
        if not first_half or not second_half:
            return "insufficient_data"
        first_avg = statistics.fmean(first_half)
        second_avg = statistics.fmean(second_half)
        if first_avg == 0 and second_avg == 0:
            return "stable"
        change = float("inf") if first_avg == 0 else (second_avg - first_avg) / abs(first_avg)
        if change > 0.1:
            return "degrading"
        if change < -0.1:
            return "improving"
        return "stable"

    def metric_history(self, metric: str) -> list[float]:
        values: list[float] = []
        for entry in self._history:
            if metric in entry.get("counters", {}):
                values.append(float(entry["counters"][metric]))
                continue
            timing = entry.get("timings", {}).get(metric)
            if timing:
                values.append(float(timing.get("mean", 0.0)))
                continue
            gauge = entry.get("gauges", {}).get(metric)
            if gauge is not None:
                values.append(float(gauge))
        return values

    # --------------------------- Regression logic ----------------------------

    @staticmethod
    def _detect_regressions(
        baseline: dict[str, Any],
        current: dict[str, Any],
        thresholds: dict[str, float],
        prefer_lower_metrics: list[str],
    ) -> list[PerformanceRegressionAlert]:
        alerts: list[PerformanceRegressionAlert] = []

        def extract_value(entry: dict[str, Any], key: str) -> float | None:
            if key in entry.get("counters", {}):
                return float(entry["counters"][key])
            if key in entry.get("gauges", {}):
                return float(entry["gauges"][key])
            timing = entry.get("timings", {}).get(key)
            if timing:
                return float(timing.get("mean", 0.0))
            return None

        for metric, threshold in thresholds.items():
            baseline_value = extract_value(baseline, metric)
            current_value = extract_value(current, metric)
            if baseline_value is None or current_value is None:
                continue
            if baseline_value == 0 and current_value == 0:
                continue
            change = 0.0 if baseline_value == 0 else (current_value - baseline_value) / abs(baseline_value)
            prefers_lower = metric in prefer_lower_metrics
            regression = change > threshold if prefers_lower else change < -threshold
            if regression:
                direction = "increase" if prefers_lower else "decrease"
                alerts.append(
                    PerformanceRegressionAlert(
                        metric=metric,
                        baseline=baseline_value,
                        current=current_value,
                        change_percent=change,
                        threshold_percent=threshold,
                        direction=direction,
                    )
                )
        return alerts
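
# Illustrative sketch of history-based regression and trend checks (the file path
# and metric names are hypothetical):
#
#     >>> history = PerformanceHistory(Path("results/perf_history.json"), max_entries=50)
#     >>> alerts = history.record(
#     ...     monitor.snapshot(),
#     ...     regression_thresholds={"query_q1": 0.15},
#     ...     prefer_lower_metrics=["query_q1"],
#     ... )
#     >>> history.metric_history("queries_executed")  # counter values, one per snapshot
#     >>> history.trend("query_q1", window=10)  # "improving", "degrading", "stable", ...
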

class PerformanceTracker:
    """File-backed metric recorder with basic trend/anomaly analysis."""

    def __init__(self, storage_path: Path | None = None) -> None:
        default_path = Path(tempfile.gettempdir()) / "benchbox_performance_history.json"
        self.storage_path = Path(storage_path) if storage_path else default_path
        self.storage_path.parent.mkdir(parents=True, exist_ok=True)
        self.metrics_history = self._load_history()

    def _load_history(self) -> dict[str, list[dict[str, Any]]]:
        if not self.storage_path.exists():
            return {}
        try:
            with self.storage_path.open("r", encoding="utf-8") as handle:
                payload = json.load(handle)
            if isinstance(payload, dict):
                return {key: list(value) for key, value in payload.items()}
        except (json.JSONDecodeError, OSError):
            pass
        return {}

    def _save_history(self) -> None:
        with self.storage_path.open("w", encoding="utf-8") as handle:
            json.dump(self.metrics_history, handle, indent=2)
    def record_metric(self, metric_name: str, value: float, timestamp: datetime | None = None) -> None:
        """Record a metric measurement with optional timestamp."""
        if timestamp is None:
            timestamp = datetime.now(timezone.utc)
        entry = {"timestamp": timestamp.isoformat(), "value": float(value)}
        self.metrics_history.setdefault(metric_name, []).append(entry)
        self._save_history()

    def get_trend(self, metric_name: str, days: int = 30) -> dict[str, Any]:
        """Return trend information for *metric_name* over *days* days."""
        history = self.metrics_history.get(metric_name, [])
        if not history:
            return {"trend": "unknown", "recent_values": [], "average": 0}

        cutoff = datetime.now(timezone.utc) - timedelta(days=days)
        recent_entries = [entry for entry in history if datetime.fromisoformat(entry["timestamp"]) >= cutoff]
        if len(recent_entries) < 2:
            return {"trend": "insufficient_data", "recent_values": recent_entries, "average": 0}

        values = [entry["value"] for entry in recent_entries]
        if not values:
            return {"trend": "unknown", "recent_values": [], "average": 0}

        half = len(values) // 2
        first_half = values[:half] or values
        second_half = values[half:] or values
        first_avg = statistics.fmean(first_half)
        second_avg = statistics.fmean(second_half)
        change = (float("inf") if second_avg else 0.0) if first_avg == 0 else (second_avg - first_avg) / abs(first_avg)

        if change > 0.1:
            trend = "degrading"
        elif change < -0.1:
            trend = "improving"
        else:
            trend = "stable"

        return {
            "trend": trend,
            "recent_values": values,
            "average": statistics.fmean(values),
            "min": min(values),
            "max": max(values),
            "std_dev": statistics.stdev(values) if len(values) > 1 else 0.0,
        }

    def detect_anomalies(self, metric_name: str, threshold_multiplier: float = 2.0) -> list[dict[str, Any]]:
        """Return entries whose deviation exceeds ``threshold_multiplier`` * std dev."""
        history = self.metrics_history.get(metric_name, [])
        if len(history) < 10:
            return []

        values = [entry["value"] for entry in history]
        mean_value = statistics.fmean(values)
        std_dev = statistics.stdev(values)
        threshold = std_dev * threshold_multiplier

        anomalies = []
        for entry in history:
            deviation = abs(entry["value"] - mean_value)
            if deviation > threshold:
                anomalies.append(
                    {
                        "timestamp": entry["timestamp"],
                        "value": entry["value"],
                        "deviation": deviation,
                        "threshold": threshold,
                    }
                )
        return anomalies
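
# Illustrative sketch of the tracker API (the metric name is hypothetical; by
# default history is stored as a JSON file in the system temp directory):
#
#     >>> tracker = PerformanceTracker()
#     >>> tracker.record_metric("tpch_q1_seconds", 1.8)
#     >>> tracker.get_trend("tpch_q1_seconds", days=30)["trend"]
#     >>> tracker.detect_anomalies("tpch_q1_seconds", threshold_multiplier=2.0)
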

class ResourceMonitor:
    """Track system resource usage during benchmark execution.

    Uses psutil to sample memory and CPU usage at regular intervals in a
    background thread. Updates are recorded as gauges in the provided
    PerformanceMonitor instance.

    Example:
        >>> monitor = PerformanceMonitor()
        >>> resource_mon = ResourceMonitor(monitor, sample_interval=2.0)
        >>> resource_mon.start()
        >>> # ... run benchmark ...
        >>> resource_mon.stop()
        >>> snapshot = monitor.snapshot()
        >>> print(f"Peak memory: {snapshot.gauges['peak_memory_mb']:.1f} MB")
    """

    def __init__(self, monitor: PerformanceMonitor, sample_interval: float = 2.0):
        """Initialize resource monitor.

        Args:
            monitor: PerformanceMonitor instance to record metrics into.
            sample_interval: Seconds between resource samples (default: 2.0).
        """
        self.monitor = monitor
        self.sample_interval = sample_interval
        self._stop_event: Any = None  # threading.Event, imported lazily
        self._thread: Any = None  # threading.Thread
        self._peak_memory_mb: float = 0.0
        self._process: Any = None  # psutil.Process

    def start(self) -> None:
        """Start the background resource sampling thread."""
        try:
            import threading

            import psutil
        except ImportError:  # pragma: no cover
            # If psutil is not available, silently skip resource monitoring
            return

        if self._thread is not None and self._thread.is_alive():
            return  # Already running

        self._process = psutil.Process()
        self._stop_event = threading.Event()
        self._peak_memory_mb = 0.0

        def _sample_loop():
            while not self._stop_event.is_set():
                try:
                    # Memory metrics
                    mem_info = self._process.memory_info()
                    memory_mb = mem_info.rss / (1024 * 1024)  # Convert bytes to MB
                    memory_percent = self._process.memory_percent()

                    # CPU metrics
                    cpu_percent = self._process.cpu_percent(interval=0.1)

                    # Set gauge values
                    self.monitor.set_gauge("memory_mb", memory_mb)
                    self.monitor.set_gauge("memory_percent", memory_percent)
                    self.monitor.set_gauge("cpu_percent", cpu_percent)

                    # Track peak memory
                    if memory_mb > self._peak_memory_mb:
                        self._peak_memory_mb = memory_mb
                        self.monitor.set_gauge("peak_memory_mb", memory_mb)
                except Exception:  # pragma: no cover
                    # If sampling fails, silently continue
                    pass

                # Sleep until next sample or stop event
                self._stop_event.wait(self.sample_interval)

        self._thread = threading.Thread(target=_sample_loop, daemon=True, name="ResourceMonitor")
        self._thread.start()

    def stop(self) -> None:
        """Stop background resource sampling and record final metrics."""
        if self._stop_event is not None:
            self._stop_event.set()
        if self._thread is not None:
            self._thread.join(timeout=5.0)  # Wait up to 5 seconds for clean shutdown
            self._thread = None

        # Record final peak memory
        if self._peak_memory_mb > 0:
            self.monitor.set_gauge("peak_memory_mb", self._peak_memory_mb)

    def get_current_usage(self) -> dict[str, float]:
        """Get current resource usage without updating the monitor.

        Returns:
            Dictionary with memory_mb, memory_percent, and cpu_percent keys.
            Returns zeros if psutil is unavailable or sampling has not started.
        """
        if self._process is None:
            return {"memory_mb": 0.0, "memory_percent": 0.0, "cpu_percent": 0.0}
        try:
            mem_info = self._process.memory_info()
            return {
                "memory_mb": mem_info.rss / (1024 * 1024),
                "memory_percent": self._process.memory_percent(),
                "cpu_percent": self._process.cpu_percent(interval=0.1),
            }
        except Exception:  # pragma: no cover
            return {"memory_mb": 0.0, "memory_percent": 0.0, "cpu_percent": 0.0}


def attach_snapshot_to_result(result: Any, snapshot: PerformanceSnapshot) -> dict[str, Any]:
    """Attach performance metrics to a BenchmarkResults-like object.

    The function mutates *result* by setting ``performance_summary`` and
    ``performance_characteristics`` attributes with the snapshot data. It
    returns the dictionary representation for convenience so callers can reuse
    it immediately (e.g. for logging or exporting).
    """
    summary = snapshot.to_dict()
    result.performance_summary = summary

    # Preserve compatibility with existing code that reads performance_characteristics
    existing = getattr(result, "performance_characteristics", None)
    if not existing:
        result.performance_characteristics = summary
    else:
        # Merge without clobbering custom fields
        merged = dict(existing)
        merged.setdefault("monitoring", summary)
        result.performance_characteristics = merged

    return summary
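
# Illustrative sketch of attaching a snapshot to a result object; ``results`` is a
# hypothetical BenchmarkResults-like object with writable attributes:
#
#     >>> monitor = PerformanceMonitor()
#     >>> # ... record counters/gauges/timings during the run ...
#     >>> summary = attach_snapshot_to_result(results, monitor.snapshot())
#     >>> summary["counters"]
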