Source code for benchbox.utils.data_validation

"""Data validation utilities for benchmark data generation.

This module provides utilities for validating existing benchmark data
and determining if regeneration is needed.

Copyright 2026 Joe Harris / BenchBox Project

Licensed under the MIT License. See LICENSE file in the project root for details.
"""

import json
import logging
from dataclasses import dataclass
from datetime import datetime, timezone
from pathlib import Path
from typing import Optional, Union

logger = logging.getLogger(__name__)


@dataclass
class DataValidationResult:
    """Results from data validation."""

    valid: bool
    tables_validated: dict[str, bool]
    missing_tables: list[str]
    row_count_mismatches: dict[str, tuple[int, int]]  # table: (expected, actual)
    file_size_info: dict[str, int]
    validation_timestamp: datetime
    issues: list[str]

@dataclass
class TableExpectation:
    """Expected data characteristics for a table."""

    name: str
    expected_rows: int
    expected_files: list[str]
    min_file_size: int = 0  # Minimum file size in bytes
    allow_zero_rows: bool = False

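# Illustrative only: an expectation for a hypothetical custom "events" table.
# The table name, row count, and file list below are assumptions for the sake
# of the example, not part of any benchmark definition in this module:
#
#     TableExpectation(
#         name="events",
#         expected_rows=500_000,
#         expected_files=["events.csv"],
#         min_file_size=1024,
#     )
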
class BenchmarkDataValidator:
    """Validates benchmark data and determines if regeneration is needed."""

    # Standard TPC-H row counts for scale factor 1.0
    # Note: nation and region tables have fixed sizes regardless of scale factor
    TPCH_TABLE_EXPECTATIONS = {
        "customer": TableExpectation("customer", 150000, ["customer.tbl"]),
        "lineitem": TableExpectation("lineitem", 6001215, ["lineitem.tbl"]),
        "nation": TableExpectation("nation", 25, ["nation.tbl"]),  # Fixed-size table
        "orders": TableExpectation("orders", 1500000, ["orders.tbl"]),
        "part": TableExpectation("part", 200000, ["part.tbl"]),
        "partsupp": TableExpectation("partsupp", 800000, ["partsupp.tbl"]),
        "region": TableExpectation("region", 5, ["region.tbl"]),  # Fixed-size table
        "supplier": TableExpectation("supplier", 10000, ["supplier.tbl"]),
    }

    # Standard TPC-DS row counts for scale factor 1.0 (approximate)
    TPCDS_TABLE_EXPECTATIONS = {
        "call_center": TableExpectation("call_center", 6, ["call_center.dat"]),
        "catalog_page": TableExpectation("catalog_page", 11718, ["catalog_page.dat"]),
        "catalog_returns": TableExpectation("catalog_returns", 144067, ["catalog_returns.dat"]),
        "catalog_sales": TableExpectation("catalog_sales", 1441548, ["catalog_sales.dat"]),
        "customer": TableExpectation("customer", 100000, ["customer.dat"]),
        "customer_address": TableExpectation("customer_address", 50000, ["customer_address.dat"]),
        "customer_demographics": TableExpectation("customer_demographics", 1920800, ["customer_demographics.dat"]),
        "date_dim": TableExpectation("date_dim", 73049, ["date_dim.dat"]),
        "household_demographics": TableExpectation("household_demographics", 7200, ["household_demographics.dat"]),
        "income_band": TableExpectation("income_band", 20, ["income_band.dat"]),
        "inventory": TableExpectation("inventory", 11745000, ["inventory.dat"]),
        "item": TableExpectation("item", 18000, ["item.dat"]),
        "promotion": TableExpectation("promotion", 300, ["promotion.dat"]),
        "reason": TableExpectation("reason", 35, ["reason.dat"]),
        "ship_mode": TableExpectation("ship_mode", 20, ["ship_mode.dat"]),
        "store": TableExpectation("store", 12, ["store.dat"]),
        "store_returns": TableExpectation("store_returns", 287514, ["store_returns.dat"]),
        "store_sales": TableExpectation("store_sales", 2880404, ["store_sales.dat"]),
        "time_dim": TableExpectation("time_dim", 86400, ["time_dim.dat"]),
        "warehouse": TableExpectation("warehouse", 5, ["warehouse.dat"]),
        "web_page": TableExpectation("web_page", 60, ["web_page.dat"]),
        "web_returns": TableExpectation("web_returns", 71763, ["web_returns.dat"]),
        "web_sales": TableExpectation("web_sales", 719384, ["web_sales.dat"]),
        "web_site": TableExpectation("web_site", 30, ["web_site.dat"]),
    }

    def __init__(self, benchmark_name: str, scale_factor: float = 1.0):
        """Initialize validator for a specific benchmark.

        Args:
            benchmark_name: Name of the benchmark (tpch, tpcds, etc.)
            scale_factor: Scale factor for row count calculations
        """
        self.benchmark_name = benchmark_name.lower()
        self.scale_factor = scale_factor

        # Get table expectations based on benchmark type
        if self.benchmark_name == "tpch":
            self.table_expectations = self._scale_expectations(self.TPCH_TABLE_EXPECTATIONS)
        elif self.benchmark_name == "tpcds":
            self.table_expectations = self._scale_expectations(self.TPCDS_TABLE_EXPECTATIONS)
        else:
            # For other benchmarks, we'll do basic file existence validation
            self.table_expectations = {}

    def _scale_expectations(
        self, base_expectations: dict[str, TableExpectation]
    ) -> dict[str, TableExpectation]:
        """Scale row count expectations based on the scale factor."""
        scaled = {}

        # Tables that have fixed row counts regardless of scale factor
        fixed_size_tables = {
            "nation",
            "region",
            "call_center",
            "reason",
            "ship_mode",
            "warehouse",
            "income_band",
            "web_site",
            "store",
            "time_dim",
        }

        for table_name, expectation in base_expectations.items():
            if table_name in fixed_size_tables:
                # Don't scale these tables
                scaled_rows = expectation.expected_rows
            else:
                scaled_rows = int(expectation.expected_rows * self.scale_factor)

            scaled[table_name] = TableExpectation(
                name=expectation.name,
                expected_rows=scaled_rows,
                expected_files=expectation.expected_files,
                min_file_size=expectation.min_file_size,
                allow_zero_rows=expectation.allow_zero_rows,
            )

        return scaled

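    # Scaling example: with scale_factor=10.0, "lineitem" scales from 6,001,215
    # to 60,012,150 expected rows, while fixed-size tables such as "nation"
    # (25 rows) and "region" (5 rows) keep their base counts.
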
    def validate_data_directory(self, data_dir: Union[str, Path]) -> DataValidationResult:
        """Validate data in the specified directory.

        Args:
            data_dir: Path to the data directory to validate

        Returns:
            DataValidationResult with validation details
        """
        data_path = Path(data_dir)

        if not data_path.exists():
            return DataValidationResult(
                valid=False,
                tables_validated={},
                missing_tables=[],
                row_count_mismatches={},
                file_size_info={},
                validation_timestamp=datetime.now(),
                issues=[f"Data directory does not exist: {data_path}"],
            )

        # If a manifest exists and is valid for this benchmark/scale, use it for validation
        manifest = self._read_manifest(data_path)
        if manifest and self._manifest_matches_config(manifest):
            return self._validate_with_manifest(data_path, manifest)

        # Otherwise, check for expected tables/files by scanning and rebuild the manifest
        tables_validated = {}
        missing_tables = []
        row_count_mismatches = {}
        file_size_info = {}
        issues = []

        # If we have specific expectations for this benchmark
        if self.table_expectations:
            for table_name, expectation in self.table_expectations.items():
                table_valid = True

                # Resolve data files that may be compressed or chunked.
                resolved_files = self._resolve_table_files(data_path, table_name, expectation.expected_files)

                if not resolved_files:
                    missing_tables.append(table_name)
                    issues.append(f"Missing data files for table {table_name}")
                    table_valid = False
                else:
                    # Record sizes and check for empty files
                    for f in resolved_files:
                        try:
                            size = f.stat().st_size
                            file_size_info[f.name] = size
                            if size == 0 and not expectation.allow_zero_rows:
                                table_valid = False
                                issues.append(f"File {f.name} is empty")
                        except Exception as e:
                            table_valid = False
                            issues.append(f"Failed to stat {f}: {e}")

                    # For TPC-H/TPC-DS, perform row count validation (best-effort for compressed formats)
                    if table_valid and self.benchmark_name in ["tpch", "tpcds"]:
                        try:
                            actual_rows = self._count_rows_paths(resolved_files)
                            expected_rows = expectation.expected_rows

                            # Allow for some variance in row counts (±5%)
                            tolerance = max(1, int(expected_rows * 0.05))
                            if actual_rows > 0 and abs(actual_rows - expected_rows) > tolerance:
                                row_count_mismatches[table_name] = (expected_rows, actual_rows)
                                table_valid = False
                                issues.append(
                                    f"Table {table_name}: expected ~{expected_rows} rows, found {actual_rows} rows"
                                )
                        except Exception as e:
                            # If counting fails (e.g., zstd not available), we do not hard-fail here
                            logger.debug(f"Row counting skipped for {table_name}: {e}")

                tables_validated[table_name] = table_valid
        else:
            # For unknown benchmarks, just check whether the directory has any data files
            data_files = (
                list(data_path.glob("*.tbl"))
                + list(data_path.glob("*.dat"))
                + list(data_path.glob("*.csv"))
                + list(data_path.glob("*.parquet"))
            )

            if not data_files:
                issues.append("No data files found in directory")
            else:
                for data_file in data_files:
                    file_size = data_file.stat().st_size
                    file_size_info[data_file.name] = file_size
                    if file_size == 0:
                        issues.append(f"Empty data file: {data_file.name}")

        # Overall validation result
        all_valid = (
            not missing_tables and not row_count_mismatches and all(tables_validated.values())
            if tables_validated
            else len(file_size_info) > 0
        )

        result = DataValidationResult(
            valid=all_valid,
            tables_validated=tables_validated,
            missing_tables=missing_tables,
            row_count_mismatches=row_count_mismatches,
            file_size_info=file_size_info,
            validation_timestamp=datetime.now(),
            issues=issues,
        )

        # Build a manifest from the scan for future runs (best-effort)
        try:
            self._write_manifest_from_scan(data_path)
        except Exception as e:
            logger.debug(f"Skipping manifest write after scan: {e}")

        return result

    def _count_rows_in_files(self, data_dir: Path, file_names: list[str]) -> int:
        """Count total rows across multiple data files."""
        total_rows = 0

        for file_name in file_names:
            file_path = data_dir / file_name
            if file_path.exists():
                try:
                    # Fast line counting
                    with open(file_path, "rb") as f:
                        total_rows += sum(1 for _ in f)
                except Exception as e:
                    logger.warning(f"Failed to count rows in {file_path}: {e}")

        return total_rows

    def _count_rows_paths(self, files: list[Path]) -> int:
        """Count rows across a list of file paths (supports .dat, .tbl, .csv, .gz; best-effort for .zst)."""
        total = 0
        for f in files:
            name = f.name.lower()
            try:
                if name.endswith(".gz"):
                    import gzip

                    with gzip.open(f, "rt") as g:
                        total += sum(1 for _ in g)
                elif name.endswith(".zst"):
                    try:
                        import io

                        import zstandard as zstd  # type: ignore

                        dctx = zstd.ZstdDecompressor()
                        with open(f, "rb") as fh, dctx.stream_reader(fh) as reader:
                            text = io.TextIOWrapper(reader)
                            total += sum(1 for _ in text)
                    except Exception:
                        # If zstandard is not available, skip row counting for this file
                        logger.debug(f"Skipping zstd row count for {f}")
                        continue
                else:
                    with open(f, "rb") as fh:
                        total += sum(1 for _ in fh)
            except Exception as e:
                logger.debug(f"Failed to count rows in {f}: {e}")
                continue
        return total

    def _resolve_table_files(self, data_dir: Path, table_name: str, expected_files: list[str]) -> list[Path]:
        """Resolve actual files for a table, considering compression and chunk patterns.

        Supports:
        - Exact files from expectations
        - Compressed variants: .gz, .zst
        - Chunked variants: table_N_M.dat[.ext]
        """
        candidates: list[Path] = []

        # 1) Direct expected files
        for ef in expected_files:
            p = data_dir / ef
            if p.exists():
                candidates.append(p)
            # Compressed variants
            gz = data_dir / f"{ef}.gz"
            zst = data_dir / f"{ef}.zst"
            if gz.exists():
                candidates.append(gz)
            if zst.exists():
                candidates.append(zst)

        # 2) Chunked patterns: table_N_M.dat(.ext)
        for ext in [".dat", ".dat.gz", ".dat.zst"]:
            for pf in data_dir.glob(f"{table_name}_*.{ext.split('.')[-1]}"):
                name = pf.name
                # Normalize compression suffix handling
                core = name[:-3] if name.endswith(".gz") else (name[:-4] if name.endswith(".zst") else name)
                stem = Path(core).stem  # removes .dat
                if stem.startswith(f"{table_name}_"):
                    suffix = stem[len(f"{table_name}_") :]
                    parts = suffix.split("_")
                    if len(parts) == 2 and parts[0].isdigit() and parts[1].isdigit():
                        candidates.append(pf)

        # Ensure uniqueness while preserving order
        uniq = []
        seen = set()
        for p in candidates:
            if p.exists() and p not in seen:
                uniq.append(p)
                seen.add(p)
        return uniq

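    # File layouts _resolve_table_files accepts (names are illustrative):
    #   lineitem.tbl          -> direct expected file
    #   lineitem.tbl.gz       -> compressed variant of an expected file
    #   lineitem_1_4.dat.zst  -> chunk 1 of 4, matched by the table_N_M pattern
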
    def should_regenerate_data(
        self, data_dir: Union[str, Path], force_regenerate: bool = False
    ) -> tuple[bool, DataValidationResult]:
        """Determine if data should be regenerated.

        Args:
            data_dir: Path to the data directory
            force_regenerate: If True, always regenerate regardless of validation

        Returns:
            Tuple of (should_regenerate, validation_result)
        """
        if force_regenerate:
            return True, DataValidationResult(
                valid=False,
                tables_validated={},
                missing_tables=[],
                row_count_mismatches={},
                file_size_info={},
                validation_timestamp=datetime.now(),
                issues=["Force regeneration requested"],
            )

        validation_result = self.validate_data_directory(data_dir)
        return not validation_result.valid, validation_result

    def print_validation_report(self, result: DataValidationResult, verbose: bool = True) -> None:
        """Print a human-readable validation report."""
        if result.valid:
            print("✅ Data validation PASSED")
            if verbose:
                print(f"   Validated {len(result.tables_validated)} tables")
                total_size = sum(result.file_size_info.values())
                print(f"   Total data size: {self._format_bytes(total_size)}")
        else:
            print("❌ Data validation FAILED")

            if result.missing_tables:
                print(f"   Missing tables: {', '.join(result.missing_tables)}")

            if result.row_count_mismatches:
                print("   Row count mismatches:")
                for table, (expected, actual) in result.row_count_mismatches.items():
                    print(f"     {table}: expected {expected:,}, found {actual:,}")

            if result.issues and verbose:
                print("   Issues:")
                for issue in result.issues[:5]:  # Show first 5 issues
                    print(f"     - {issue}")
                if len(result.issues) > 5:
                    print(f"     ... and {len(result.issues) - 5} more issues")

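    # Example report output for a passing run (sizes are illustrative):
    #   ✅ Data validation PASSED
    #      Validated 8 tables
    #      Total data size: 1.02 GB
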
    def _format_bytes(self, size_bytes: int) -> str:
        """Format bytes into a human-readable string."""
        if size_bytes == 0:
            return "0 B"

        size_names = ["B", "KB", "MB", "GB", "TB"]
        import math

        i = int(math.floor(math.log(size_bytes, 1024)))
        p = math.pow(1024, i)
        s = round(size_bytes / p, 2)
        return f"{s} {size_names[i]}"

    # ---------- Manifest helpers ----------

    def _manifest_path(self, data_dir: Path) -> Path:
        return Path(data_dir) / "_datagen_manifest.json"

    def _read_manifest(self, data_dir: Path) -> Optional[dict]:
        try:
            mp = self._manifest_path(data_dir)
            if not mp.exists():
                return None
            with open(mp) as f:
                return json.load(f)
        except Exception as e:
            logger.debug(f"Failed to read manifest: {e}")
            return None

    def _manifest_matches_config(self, manifest: dict) -> bool:
        try:
            if str(manifest.get("benchmark", "")).lower() != self.benchmark_name:
                return False
            # Tolerate int/float types for scale factor
            return float(manifest.get("scale_factor", -1)) == float(self.scale_factor)
        except Exception:
            return False

    def _validate_with_manifest(self, data_dir: Path, manifest: dict) -> DataValidationResult:
        from benchbox.utils.datagen_manifest import get_table_files

        tables_validated: dict[str, bool] = {}
        missing_tables: list[str] = []
        row_count_mismatches: dict[str, tuple[int, int]] = {}
        file_size_info: dict[str, int] = {}
        issues: list[str] = []

        # If we have concrete expectations, validate them against manifest entries
        if self.table_expectations:
            for table_name, expectation in self.table_expectations.items():
                # Use get_table_files, which handles both V1 and V2 manifest formats
                entries = get_table_files(manifest, table_name)
                if not entries:
                    missing_tables.append(table_name)
                    issues.append(f"Missing manifest entries for table {table_name}")
                    tables_validated[table_name] = False
                    continue

                table_ok = True
                actual_rows_total = 0
                for e in entries:
                    rel = e.get("path")
                    size = int(e.get("size_bytes", -1))
                    rc = int(e.get("row_count", 0))
                    if rel is None or size < 0:
                        table_ok = False
                        issues.append(f"Invalid manifest entry for {table_name}")
                        continue
                    fp = data_dir / rel
                    if (not fp.exists()) or fp.stat().st_size != size:
                        table_ok = False
                        issues.append(f"File missing or size mismatch: {rel}")
                    file_size_info[rel] = size
                    actual_rows_total += rc

                # Row count tolerance check when available
                if table_ok and expectation.expected_rows > 0 and actual_rows_total > 0:
                    tol = max(1, int(expectation.expected_rows * 0.05))
                    if abs(actual_rows_total - expectation.expected_rows) > tol:
                        row_count_mismatches[table_name] = (
                            expectation.expected_rows,
                            actual_rows_total,
                        )
                        table_ok = False
                        issues.append(
                            f"Table {table_name}: expected ~{expectation.expected_rows} rows, "
                            f"manifest has {actual_rows_total} rows"
                        )

                tables_validated[table_name] = table_ok
        else:
            # Unknown benchmarks: ensure manifest files exist and are non-empty
            any_files = False
            for table_name in manifest.get("tables", {}).keys():
                entries = get_table_files(manifest, table_name)
                table_ok = True
                for e in entries:
                    rel = e.get("path")
                    size = int(e.get("size_bytes", -1))
                    if not rel:
                        table_ok = False
                        issues.append(f"Invalid manifest entry for {table_name}")
                        continue
                    fp = data_dir / rel
                    if (not fp.exists()) or fp.stat().st_size != size or size == 0:
                        table_ok = False
                        issues.append(f"Missing/empty file {rel}")
                    file_size_info[rel] = max(size, 0)
                    any_files = True
                tables_validated[table_name] = table_ok
            if not any_files:
                issues.append("No files listed in manifest")

        all_valid = (
            not missing_tables and not row_count_mismatches and all(tables_validated.values())
            if tables_validated
            else False
        )

        return DataValidationResult(
            valid=all_valid,
            tables_validated=tables_validated,
            missing_tables=missing_tables,
            row_count_mismatches=row_count_mismatches,
            file_size_info=file_size_info,
            validation_timestamp=datetime.now(),
            issues=issues,
        )

    def _write_manifest_from_scan(self, data_dir: Path) -> None:
        """Create a manifest by scanning files in the directory (best-effort)."""
        tables: dict[str, list[Path]] = {}

        # Discover data-like files
        candidates = []
        for pattern in [
            "*.tbl",
            "*.dat",
            "*.csv",
            "*.tbl.gz",
            "*.tbl.zst",
            "*.dat.gz",
            "*.dat.zst",
        ]:
            candidates.extend(list(Path(data_dir).glob(pattern)))

        # Group by table name (stem up to the first dot for tbl/dat; handle chunk names)
        for fp in candidates:
            name = fp.name
            base = name
            # Remove compression suffix
            if base.endswith(".gz"):
                base = base[:-3]
            elif base.endswith(".zst"):
                base = base[:-4]
            stem = Path(base).stem  # removes .tbl/.dat/.csv
            # Normalize chunked variants by table prefix
            table = stem
            # For patterns like table_N_M or table.tbl.N, preserve the table name
            # before the first underscore/dot-digit
            if "_" in stem:
                prefix, rest = stem.split("_", 1)
                if all(p.isdigit() for p in rest.split("_")):
                    table = prefix
            if "." in name:
                parts = name.split(".")
                if len(parts) >= 3 and parts[-1].isdigit():
                    table = parts[0]
            tables.setdefault(table, []).append(fp)

        manifest = {
            "benchmark": self.benchmark_name,
            "scale_factor": self.scale_factor,
            "compression": {},
            "parallel": 1,
            "created_at": datetime.now(timezone.utc).isoformat(),
            "generator_version": "scan-v1",
            "tables": {},
        }

        for table, paths in tables.items():
            # Sort for determinism
            paths = sorted(paths)
            # Count rows across files
            rows = self._count_rows_paths(paths)
            for p in paths:
                manifest["tables"].setdefault(table, []).append(
                    {
                        "path": p.name,
                        "size_bytes": p.stat().st_size if p.exists() else 0,
                        "row_count": rows if len(paths) == 1 else 0,  # store total on single-file entries
                    }
                )

        mp = self._manifest_path(data_dir)
        with open(mp, "w") as f:
            json.dump(manifest, f, indent=2)
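

# The scan-generated manifest (_datagen_manifest.json) has this shape,
# with values shown purely for illustration:
#
#     {
#       "benchmark": "tpch",
#       "scale_factor": 1.0,
#       "compression": {},
#       "parallel": 1,
#       "created_at": "2026-01-01T00:00:00+00:00",
#       "generator_version": "scan-v1",
#       "tables": {
#         "nation": [{"path": "nation.tbl", "size_bytes": 2048, "row_count": 25}]
#       }
#     }


if __name__ == "__main__":
    # Minimal usage sketch. The data path below is an assumption; point it at
    # a directory produced by your data generator.
    validator = BenchmarkDataValidator("tpch", scale_factor=1.0)
    regenerate, result = validator.should_regenerate_data("./data/tpch_sf1")
    validator.print_validation_report(result)
    if regenerate:
        print("Data is missing or invalid; regeneration is recommended.")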