Performance Monitoring Utilities API¶
Complete Python API reference for performance monitoring utilities.
Overview¶
BenchBox provides lightweight performance monitoring primitives for recording runtime metrics, taking snapshots, persisting history, and detecting regressions. The monitoring system is framework-agnostic and can be used by CLI tools, tests, and custom benchmark runners.
Key Features:
Multiple Metric Types: Counters, gauges, and timing measurements
Statistical Analysis: Mean, median, percentiles (P90, P95, P99)
Snapshot System: Immutable snapshots with timestamps
Performance History: Persistent storage with rolling window
Regression Detection: Automatic detection with configurable thresholds
Trend Analysis: Identify improving, degrading, or stable trends
Anomaly Detection: Statistical outlier detection
Quick Start¶
from benchbox.monitoring.performance import PerformanceMonitor
# Create monitor
monitor = PerformanceMonitor()
# Record metrics
monitor.increment_counter("queries_executed")
monitor.set_gauge("memory_usage_mb", 2048.5)
with monitor.time_operation("query_execution"):
    # Run benchmark query
    result = execute_query(query)
# Get snapshot
snapshot = monitor.snapshot()
print(f"Queries: {snapshot.counters['queries_executed']}")
print(f"Avg time: {snapshot.timings['query_execution'].mean:.3f}s")
API Reference¶
PerformanceMonitor Class¶
- class PerformanceMonitor[source]¶
Bases: object
Record counters, gauges, and timing metrics for benchmark execution.
Constructor:
PerformanceMonitor()
Recording Methods¶
- increment_counter(name, value=1) None¶
Increment a named counter.
Parameters:
name (str): Counter name
value (int): Amount to increment (default: 1)
Example:
monitor.increment_counter("queries_executed")
monitor.increment_counter("rows_processed", 1000)
- set_gauge(name, value) None¶
Record the latest value for a gauge metric.
Parameters:
name (str): Gauge name
value (float): Gauge value
Example:
monitor.set_gauge("memory_usage_mb", 2048.5)
monitor.set_gauge("cpu_percent", 75.2)
- record_timing(name, duration_seconds) None¶
Record a single timing observation.
Parameters:
name (str): Timing name
duration_seconds (float): Duration in seconds
Example:
import time

start = time.perf_counter()
execute_query(query)
elapsed = time.perf_counter() - start
monitor.record_timing("query_execution", elapsed)
- time_operation(name) ContextManager¶
Context manager that records timing on exit.
Parameters:
name (str): Operation name
Example:
with monitor.time_operation("data_loading"):
    load_data_to_database(data_files)

with monitor.time_operation("query_Q1"):
    result = conn.execute(query_1).fetchall()
- set_metadata(key, value) None¶
Attach arbitrary metadata to the snapshot.
Parameters:
key (str): Metadata key
value (Any): Metadata value
Example:
monitor.set_metadata("benchmark", "tpch")
monitor.set_metadata("scale_factor", 1.0)
monitor.set_metadata("database", "duckdb")
- update_metadata(items) None¶
Bulk update metadata with dict.
Parameters:
items (dict[str, Any]): Metadata dict
Example:
monitor.update_metadata({
    "benchmark": "tpcds",
    "scale_factor": 10.0,
    "queries": 99,
    "platform": "databricks"
})
Snapshot Methods¶
- snapshot() PerformanceSnapshot¶
Create an immutable snapshot of currently recorded metrics.
Returns: PerformanceSnapshot with all metrics
Example:
snapshot = monitor.snapshot()
print(f"Timestamp: {snapshot.timestamp}")
print(f"Counters: {snapshot.counters}")
print(f"Timings: {snapshot.timings}")
- summary() dict¶
Return a plain dictionary representation for serialization.
Returns: Dict representation of snapshot
Example:
import json

summary = monitor.summary()
with open("metrics.json", "w") as f:
    json.dump(summary, f, indent=2)
- reset() None¶
Clear all recorded metrics and metadata.
Example:
# Reset between benchmark runs
for benchmark in benchmarks:
    monitor.reset()
    run_benchmark(benchmark)
    snapshot = monitor.snapshot()
    save_results(snapshot)
PerformanceSnapshot Class¶
Immutable snapshot of recorded metrics.
- class PerformanceSnapshot(timestamp, counters, gauges, timings, metadata=<factory>)[source]¶
Bases: object
Serializable snapshot of recorded metrics.
- timestamp: str¶
- counters: dict[str, int]¶
- gauges: dict[str, float]¶
- timings: dict[str, TimingStats]¶
- metadata: dict[str, Any]¶
- __init__(timestamp, counters, gauges, timings, metadata=<factory>)¶
Fields:
timestamp (str): ISO 8601 timestamp
counters (dict[str, int]): Counter values
gauges (dict[str, float]): Gauge values
timings (dict[str, TimingStats]): Timing statistics
metadata (dict[str, Any]): Attached metadata
- to_dict() dict¶
Convert snapshot to dictionary.
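Example (illustrative sketch; assumes, as with summary() above, that the returned dict is directly JSON-serializable):
import json

data = monitor.snapshot().to_dict()
with open("snapshot.json", "w") as f:  # hypothetical output path
    json.dump(data, f, indent=2)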
TimingStats Class¶
Aggregate timing statistics for a metric.
- class TimingStats(count, minimum, maximum, mean, median, p90, p95, p99, total)[source]¶
Bases: object
Aggregate timing statistics for a single metric.
- count: int¶
- minimum: float¶
- maximum: float¶
- mean: float¶
- median: float¶
- p90: float¶
- p95: float¶
- p99: float¶
- total: float¶
- __init__(count, minimum, maximum, mean, median, p90, p95, p99, total)¶
Fields:
count (int): Number of observations
minimum (float): Minimum value
maximum (float): Maximum value
mean (float): Arithmetic mean
median (float): Median value
p90 (float): 90th percentile
p95 (float): 95th percentile
p99 (float): 99th percentile
total (float): Sum of all values
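TimingStats values are typically produced by snapshot() rather than constructed by hand; they appear as the values of snapshot.timings. A brief illustrative read of the fields above (the metric name is a placeholder):
stats = monitor.snapshot().timings["query_execution"]
print(f"{stats.count} runs: mean {stats.mean:.3f}s, p95 {stats.p95:.3f}s, max {stats.maximum:.3f}s")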
PerformanceHistory Class¶
Persist performance snapshots and detect regressions.
- class PerformanceHistory(storage_path, max_entries=50)[source]¶
Bases: object
Persist performance snapshots and surface trends/regressions.
Constructor:
PerformanceHistory(
    storage_path: Path,
    max_entries: int = 50
)
Parameters:
storage_path (Path): Path to JSON history file
max_entries (int): Maximum snapshots to keep (rolling window)
- record(snapshot, regression_thresholds=None, prefer_lower_metrics=None) list[PerformanceRegressionAlert]¶
Persist snapshot and return any regression alerts.
Parameters:
snapshot (PerformanceSnapshot): Snapshot to persist
regression_thresholds (dict[str, float] | None): Per-metric thresholds (as percentages)
prefer_lower_metrics (list[str] | None): Metrics where higher values indicate regressions
Returns: List of PerformanceRegressionAlert objects
Example:
from pathlib import Path

history = PerformanceHistory(Path("performance_history.json"))
snapshot = monitor.snapshot()
alerts = history.record(
    snapshot,
    regression_thresholds={
        "query_execution": 0.15,  # 15% threshold
        "memory_usage_mb": 0.20   # 20% threshold
    },
    prefer_lower_metrics=["query_execution", "memory_usage_mb"]
)
for alert in alerts:
    print(f"⚠️ {alert.metric}: {alert.change_percent:.1%} {alert.direction}")
- trend(metric, window=10) str¶
Return simple trend descriptor for metric.
Parameters:
metric (str): Metric name
window (int): Number of recent entries to analyze
Returns: Trend descriptor (“improving”, “degrading”, “stable”, “insufficient_data”)
Example:
trend = history.trend("query_execution", window=10)
if trend == "degrading":
    print("⚠️ Performance is degrading")
elif trend == "improving":
    print("✅ Performance is improving")
- metric_history(metric) list[float]¶
Get historical values for a metric.
Parameters:
metric (str): Metric name
Returns: List of historical values
Example:
import matplotlib.pyplot as plt

values = history.metric_history("query_execution")
plt.plot(values)
plt.title("Query Execution Time Trend")
plt.show()
PerformanceRegressionAlert Class¶
Represents a detected performance regression.
- class PerformanceRegressionAlert(metric, baseline, current, change_percent, threshold_percent, direction)[source]¶
Bases: object
Represents a detected performance regression for a metric.
- metric: str¶
- baseline: float¶
- current: float¶
- change_percent: float¶
- threshold_percent: float¶
- direction: str¶
- __init__(metric, baseline, current, change_percent, threshold_percent, direction)¶
Fields:
metric (str): Metric name
baseline (float): Baseline value
current (float): Current value
change_percent (float): Change as percentage (e.g., 0.15 = 15%)
threshold_percent (float): Threshold that was exceeded
direction (str): “increase” or “decrease”
PerformanceTracker Class¶
Simplified file-backed metric recorder.
- class PerformanceTracker(storage_path=None)[source]¶
Bases: object
File-backed metric recorder with basic trend/anomaly analysis.
Constructor:
PerformanceTracker(storage_path: Path | None = None)
Parameters:
storage_path (Path | None): Storage path (defaults to temp directory)
- record_metric(metric_name, value, timestamp=None) None¶
Record a metric measurement.
- get_trend(metric_name, days=30) dict¶
Get trend information for metric over specified days.
- detect_anomalies(metric_name, threshold_multiplier=2.0) list[dict]¶
Return entries whose deviation exceeds threshold * std dev.
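Example (minimal sketch using only the signatures documented above; it assumes PerformanceTracker is importable from the same module as the other classes, and the storage path and metric name are placeholders; the keys of the returned trend/anomaly dictionaries are not documented here, so the sketch just prints them):
from pathlib import Path
from benchbox.monitoring.performance import PerformanceTracker  # assumed export location

tracker = PerformanceTracker(Path("tracker_metrics.json"))  # hypothetical file name

# One measurement per benchmark run (metric name is illustrative)
tracker.record_metric("query_q1_seconds", 1.42)

# Trend over the last 30 days
print(tracker.get_trend("query_q1_seconds", days=30))

# Entries deviating more than 2x the standard deviation
for entry in tracker.detect_anomalies("query_q1_seconds", threshold_multiplier=2.0):
    print(entry)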
Usage Examples¶
Basic Monitoring¶
from benchbox.monitoring.performance import PerformanceMonitor
monitor = PerformanceMonitor()
# Set benchmark metadata
monitor.update_metadata({
    "benchmark": "tpch",
    "scale_factor": 1.0,
    "database": "duckdb"
})

# Run benchmark and record metrics
for query_id in range(1, 23):
    with monitor.time_operation(f"query_Q{query_id}"):
        result = execute_query(query_id)
    rows = len(result)
    monitor.increment_counter("queries_executed")
    monitor.increment_counter("rows_returned", rows)
# Get summary
snapshot = monitor.snapshot()
print(f"Executed {snapshot.counters['queries_executed']} queries")
print(f"Total time: {snapshot.timings['query_Q1'].total:.2f}s")
Statistical Analysis¶
# Run query multiple times for stable timing
monitor = PerformanceMonitor()
for iteration in range(10):
    with monitor.time_operation("query_performance"):
        execute_query(query)
snapshot = monitor.snapshot()
stats = snapshot.timings["query_performance"]
print(f"Count: {stats.count}")
print(f"Mean: {stats.mean:.3f}s")
print(f"Median: {stats.median:.3f}s")
print(f"P95: {stats.p95:.3f}s")
print(f"P99: {stats.p99:.3f}s")
print(f"Min: {stats.minimum:.3f}s")
print(f"Max: {stats.maximum:.3f}s")
Regression Detection¶
from pathlib import Path
from benchbox.monitoring.performance import (
    PerformanceMonitor,
    PerformanceHistory
)
# Setup history
history = PerformanceHistory(
    Path("benchbox_performance.json"),
    max_entries=100
)
# Run benchmark
monitor = PerformanceMonitor()
monitor.set_metadata("version", "1.2.3")
with monitor.time_operation("full_benchmark"):
    run_full_benchmark()
# Record and check for regressions
snapshot = monitor.snapshot()
alerts = history.record(
    snapshot,
    regression_thresholds={
        "full_benchmark": 0.10,  # 10% threshold
    },
    prefer_lower_metrics=["full_benchmark"]
)
if alerts:
    print("⚠️ Performance regressions detected:")
    for alert in alerts:
        print(f"  {alert.metric}: {alert.baseline:.2f}s → {alert.current:.2f}s "
              f"({alert.change_percent:.1%} {alert.direction})")
else:
    print("✅ No regressions detected")
Trend Analysis¶
history = PerformanceHistory(Path("performance.json"))
# Analyze trends
metrics_to_check = [
    "query_execution",
    "data_loading",
    "memory_usage_mb"
]
for metric in metrics_to_check:
    trend = history.trend(metric, window=20)
    if trend == "degrading":
        print(f"⚠️ {metric}: Performance degrading")
    elif trend == "improving":
        print(f"✅ {metric}: Performance improving")
    elif trend == "stable":
        print(f"➖ {metric}: Performance stable")
    else:
        print(f"❓ {metric}: Insufficient data")
Performance Dashboard¶
import json
from pathlib import Path
def create_performance_dashboard(history_path: Path):
    """Create performance dashboard from history."""
    history = PerformanceHistory(history_path)
    dashboard = {
        "metrics": {},
        "trends": {},
        "latest_snapshot": None
    }
    # Get all metrics from latest snapshot
    if history._history:
        latest = history._history[-1]
        dashboard["latest_snapshot"] = latest
        # Analyze each timing metric
        for metric_name in latest.get("timings", {}).keys():
            values = history.metric_history(metric_name)
            trend = history.trend(metric_name, window=10)
            dashboard["metrics"][metric_name] = {
                "current": values[-1] if values else 0,
                "history": values[-20:],  # Last 20 values
                "trend": trend
            }
    return dashboard
# Generate dashboard
dashboard = create_performance_dashboard(Path("performance.json"))
with open("dashboard.json", "w") as f:
json.dump(dashboard, f, indent=2)
CI/CD Integration¶
import os
import sys
from pathlib import Path
from benchbox.monitoring.performance import (
    PerformanceMonitor,
    PerformanceHistory
)
def ci_performance_check():
    """Performance check for CI/CD pipeline."""
    monitor = PerformanceMonitor()
    monitor.set_metadata("ci_run", True)
    monitor.set_metadata("commit", os.getenv("CI_COMMIT_SHA"))
    # Run benchmarks
    with monitor.time_operation("ci_benchmark"):
        run_ci_benchmarks()
    # Check for regressions
    history = PerformanceHistory(
        Path("ci_performance_history.json"),
        max_entries=50
    )
    snapshot = monitor.snapshot()
    alerts = history.record(
        snapshot,
        regression_thresholds={"ci_benchmark": 0.15},
        prefer_lower_metrics=["ci_benchmark"]
    )
    if alerts:
        print("❌ Performance regression detected!")
        for alert in alerts:
            print(f"  {alert.metric}: {alert.change_percent:.1%} slower")
        sys.exit(1)
    else:
        print("✅ Performance check passed")
        sys.exit(0)
ci_performance_check()
Best Practices¶
Use Context Managers for Timing
# Good: Automatic timing
with monitor.time_operation("operation"):
    do_work()

# Avoid: Manual timing (error-prone)
start = time.time()
do_work()
monitor.record_timing("operation", time.time() - start)
Record Metadata
# Always record context
monitor.update_metadata({
    "benchmark": "tpch",
    "scale_factor": 1.0,
    "database": "duckdb",
    "version": "0.9.0",
    "date": datetime.now().isoformat()
})
Use Appropriate Metric Types
# Counters: Things that accumulate
monitor.increment_counter("queries_executed")

# Gauges: Current values
monitor.set_gauge("memory_usage_mb", current_memory)

# Timings: Operations with duration
with monitor.time_operation("query"):
    execute_query()
Set Appropriate Regression Thresholds
# Conservative: 10% threshold
thresholds = {"query_time": 0.10}

# Moderate: 15% threshold
thresholds = {"query_time": 0.15}

# Permissive: 25% threshold
thresholds = {"query_time": 0.25}
Maintain History Rolling Window
# Keep manageable history (50-100 entries)
history = PerformanceHistory(
    Path("performance.json"),
    max_entries=100  # ~100 recent runs
)
Common Issues¶
- Issue: “Insufficient data for trend”
Cause: Less than 2 data points
Solution: Run more iterations or reduce window size
- Issue: False regression alerts
Cause: Natural variance or aggressive thresholds
Solution: Increase threshold (e.g., 0.15 → 0.20) or run more iterations
- Issue: Missing timings in snapshot
Cause: Operation not recorded or exception during timing
Solution: Ensure all operations use time_operation() and handle exceptions explicitly (see the sketch after this list)
- Issue: History file grows too large
Cause: max_entries set too high
Solution: Reduce max_entries (default: 50, max recommended: 200)
- Issue: Percentile calculations unstable
Cause: Too few samples (count < 10)
Solution: Record more timing observations per snapshot
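For the "Missing timings in snapshot" issue above, one defensive pattern is to keep the timed block small and count failures explicitly. A minimal sketch, assuming a placeholder execute_query and an illustrative query_failures counter; whether a timing is still recorded for a failed run depends on how time_operation handles exceptions:
try:
    with monitor.time_operation("query_execution"):
        execute_query(query)
except Exception:
    # Count the failure so any missing timing can be explained from the counters
    monitor.increment_counter("query_failures")
    raise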
See Also¶
Result Analysis API - Result analysis and comparison utilities
Advanced Performance Optimization Guide - Performance optimization guide
CI/CD Integration Guide - CI/CD integration guide
Troubleshooting Guide - Troubleshooting reference