Data Validation Utilities API¶
Complete Python API reference for data validation utilities.
Overview¶
BenchBox provides comprehensive data validation utilities for benchmark data generation. These utilities validate existing data, detect issues, and determine if regeneration is needed. The validation system supports TPC-H, TPC-DS, and generic benchmarks with features like row count validation, file size checking, and compression support.
Key Features:
Automatic Validation: Validates data files against expected row counts
Manifest Support: Uses _datagen_manifest.json for fast validation
Compression Support: Handles .gz and .zst compressed files
Chunked Files: Supports parallel data generation with chunked files
Row Count Tolerance: Allows ±5% variance in row counts
Scale Factor Awareness: Adjusts expectations based on scale factor
Multiple Formats: Supports .tbl, .dat, .csv, .parquet
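The ±5% row count tolerance listed above can be pictured with a small check. This is a minimal sketch, not BenchBox's implementation: `rows_within_tolerance` is a hypothetical helper, and treating the bound as inclusive is an assumption.

```python
def rows_within_tolerance(expected: int, actual: int, tolerance: float = 0.05) -> bool:
    """Return True when `actual` lies within ±tolerance of `expected`."""
    if expected == 0:
        # Relative variance is undefined for an empty expectation,
        # so only an exact match passes (assumption for this sketch).
        return actual == 0
    return abs(actual - expected) <= tolerance * expected


print(rows_within_tolerance(150_000, 145_000))  # True  (about 3.3% low)
print(rows_within_tolerance(150_000, 140_000))  # False (about 6.7% low)
```

The second call mirrors the customer mismatch (expected 150,000, found 140,000) shown in the validation report example in this reference.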
Quick Start¶
from benchbox.utils.data_validation import BenchmarkDataValidator
# Validate TPC-H data
validator = BenchmarkDataValidator("tpch", scale_factor=1.0)
result = validator.validate_data_directory("data/tpch_sf1")
if result.valid:
    print("✅ Data validation passed")
else:
    print("❌ Data validation failed")
    validator.print_validation_report(result)
API Reference¶
BenchmarkDataValidator Class¶
- class BenchmarkDataValidator(benchmark_name, scale_factor=1.0)[source]¶
Bases: object
Validates benchmark data and determines if regeneration is needed.
- TPCH_TABLE_EXPECTATIONS = {'customer': TableExpectation(name='customer', expected_rows=150000, expected_files=['customer.tbl'], min_file_size=0, allow_zero_rows=False), 'lineitem': TableExpectation(name='lineitem', expected_rows=6001215, expected_files=['lineitem.tbl'], min_file_size=0, allow_zero_rows=False), 'nation': TableExpectation(name='nation', expected_rows=25, expected_files=['nation.tbl'], min_file_size=0, allow_zero_rows=False), 'orders': TableExpectation(name='orders', expected_rows=1500000, expected_files=['orders.tbl'], min_file_size=0, allow_zero_rows=False), 'part': TableExpectation(name='part', expected_rows=200000, expected_files=['part.tbl'], min_file_size=0, allow_zero_rows=False), 'partsupp': TableExpectation(name='partsupp', expected_rows=800000, expected_files=['partsupp.tbl'], min_file_size=0, allow_zero_rows=False), 'region': TableExpectation(name='region', expected_rows=5, expected_files=['region.tbl'], min_file_size=0, allow_zero_rows=False), 'supplier': TableExpectation(name='supplier', expected_rows=10000, expected_files=['supplier.tbl'], min_file_size=0, allow_zero_rows=False)}¶
- TPCDS_TABLE_EXPECTATIONS = {'call_center': TableExpectation(name='call_center', expected_rows=6, expected_files=['call_center.dat'], min_file_size=0, allow_zero_rows=False), 'catalog_page': TableExpectation(name='catalog_page', expected_rows=11718, expected_files=['catalog_page.dat'], min_file_size=0, allow_zero_rows=False), 'catalog_returns': TableExpectation(name='catalog_returns', expected_rows=144067, expected_files=['catalog_returns.dat'], min_file_size=0, allow_zero_rows=False), 'catalog_sales': TableExpectation(name='catalog_sales', expected_rows=1441548, expected_files=['catalog_sales.dat'], min_file_size=0, allow_zero_rows=False), 'customer': TableExpectation(name='customer', expected_rows=100000, expected_files=['customer.dat'], min_file_size=0, allow_zero_rows=False), 'customer_address': TableExpectation(name='customer_address', expected_rows=50000, expected_files=['customer_address.dat'], min_file_size=0, allow_zero_rows=False), 'customer_demographics': TableExpectation(name='customer_demographics', expected_rows=1920800, expected_files=['customer_demographics.dat'], min_file_size=0, allow_zero_rows=False), 'date_dim': TableExpectation(name='date_dim', expected_rows=73049, expected_files=['date_dim.dat'], min_file_size=0, allow_zero_rows=False), 'household_demographics': TableExpectation(name='household_demographics', expected_rows=7200, expected_files=['household_demographics.dat'], min_file_size=0, allow_zero_rows=False), 'income_band': TableExpectation(name='income_band', expected_rows=20, expected_files=['income_band.dat'], min_file_size=0, allow_zero_rows=False), 'inventory': TableExpectation(name='inventory', expected_rows=11745000, expected_files=['inventory.dat'], min_file_size=0, allow_zero_rows=False), 'item': TableExpectation(name='item', expected_rows=18000, expected_files=['item.dat'], min_file_size=0, allow_zero_rows=False), 'promotion': TableExpectation(name='promotion', expected_rows=300, expected_files=['promotion.dat'], 
min_file_size=0, allow_zero_rows=False), 'reason': TableExpectation(name='reason', expected_rows=35, expected_files=['reason.dat'], min_file_size=0, allow_zero_rows=False), 'ship_mode': TableExpectation(name='ship_mode', expected_rows=20, expected_files=['ship_mode.dat'], min_file_size=0, allow_zero_rows=False), 'store': TableExpectation(name='store', expected_rows=12, expected_files=['store.dat'], min_file_size=0, allow_zero_rows=False), 'store_returns': TableExpectation(name='store_returns', expected_rows=287514, expected_files=['store_returns.dat'], min_file_size=0, allow_zero_rows=False), 'store_sales': TableExpectation(name='store_sales', expected_rows=2880404, expected_files=['store_sales.dat'], min_file_size=0, allow_zero_rows=False), 'time_dim': TableExpectation(name='time_dim', expected_rows=86400, expected_files=['time_dim.dat'], min_file_size=0, allow_zero_rows=False), 'warehouse': TableExpectation(name='warehouse', expected_rows=5, expected_files=['warehouse.dat'], min_file_size=0, allow_zero_rows=False), 'web_page': TableExpectation(name='web_page', expected_rows=60, expected_files=['web_page.dat'], min_file_size=0, allow_zero_rows=False), 'web_returns': TableExpectation(name='web_returns', expected_rows=71763, expected_files=['web_returns.dat'], min_file_size=0, allow_zero_rows=False), 'web_sales': TableExpectation(name='web_sales', expected_rows=719384, expected_files=['web_sales.dat'], min_file_size=0, allow_zero_rows=False), 'web_site': TableExpectation(name='web_site', expected_rows=30, expected_files=['web_site.dat'], min_file_size=0, allow_zero_rows=False)}¶
- __init__(benchmark_name, scale_factor=1.0)[source]¶
Initialize validator for a specific benchmark.
- Parameters:
benchmark_name (str) – Name of the benchmark (tpch, tpcds, etc.)
scale_factor (float) – Scale factor for row count calculations
- validate_data_directory(data_dir)[source]¶
Validate data in the specified directory.
- Parameters:
data_dir (str | Path) – Path to the data directory to validate
- Returns:
DataValidationResult with validation details
- Return type:
DataValidationResult
- should_regenerate_data(data_dir, force_regenerate=False)[source]¶
Determine if data should be regenerated.
- Parameters:
data_dir (str | Path) – Path to the data directory
force_regenerate (bool) – If True, always regenerate regardless of validation
- Returns:
Tuple of (should_regenerate, validation_result)
- Return type:
tuple[bool, DataValidationResult]
Constructor:
BenchmarkDataValidator(
benchmark_name: str,
scale_factor: float = 1.0
)
Parameters:
benchmark_name (str): Benchmark name (“tpch”, “tpcds”, or other)
scale_factor (float): Scale factor for row count calculations
Supported Benchmarks:
tpch: TPC-H with 8 tables and known row counts
tpcds: TPC-DS with 24 tables and known row counts
Other benchmarks use generic file existence validation
Validation Methods¶
- validate_data_directory(data_dir) DataValidationResult¶
Validate data in the specified directory.
Parameters:
data_dir (str | Path): Path to data directory to validate
Returns:
DataValidationResult with validation details
Validation Checks:
Directory existence
Table/file presence
File size (non-zero)
Row counts (±5% tolerance)
Compression support (gz, zst)
Chunked file detection
Example:
validator = BenchmarkDataValidator("tpch", scale_factor=1.0)
result = validator.validate_data_directory("data/tpch_sf1")
print(f"Valid: {result.valid}")
print(f"Tables: {len(result.tables_validated)}")
print(f"Missing: {result.missing_tables}")
print(f"Mismatches: {result.row_count_mismatches}")
- should_regenerate_data(data_dir, force_regenerate=False) tuple[bool, DataValidationResult]¶
Determine if data should be regenerated.
Parameters:
data_dir (str | Path): Path to data directory
force_regenerate (bool): If True, always regenerate
Returns: Tuple of (should_regenerate, validation_result)
Example:
should_regen, result = validator.should_regenerate_data("data/tpch_sf1")
if should_regen:
    print("Data regeneration needed")
    print(f"Reasons: {result.issues}")
else:
    print("Existing data is valid")
- print_validation_report(result, verbose=True) None¶
Print a human-readable validation report.
Parameters:
result (DataValidationResult): Validation result to report
verbose (bool): Include detailed issue listing
Example:
result = validator.validate_data_directory("data/tpch_sf1")
validator.print_validation_report(result, verbose=True)
# Output:
# ❌ Data validation FAILED
# Missing tables: lineitem, orders
# Row count mismatches:
#   customer: expected 150,000, found 140,000
# Issues:
#   - Missing data files for table lineitem
#   - Table customer: expected ~150000 rows, found 140000 rows
DataValidationResult Class¶
Result object returned by validation operations.
- class DataValidationResult(valid, tables_validated, missing_tables, row_count_mismatches, file_size_info, validation_timestamp, issues)[source]¶
Bases: object
Results from data validation.
- valid: bool¶
- tables_validated: dict[str, bool]¶
- missing_tables: list[str]¶
- row_count_mismatches: dict[str, tuple[int, int]]¶
- file_size_info: dict[str, int]¶
- validation_timestamp: datetime¶
- issues: list[str]¶
- __init__(valid, tables_validated, missing_tables, row_count_mismatches, file_size_info, validation_timestamp, issues)¶
Fields:
valid (bool): Whether data passed all validations
tables_validated (dict[str, bool]): Per-table validation status
missing_tables (list[str]): Tables with missing data files
row_count_mismatches (dict[str, tuple[int, int]]): Tables with row count issues (expected, actual)
file_size_info (dict[str, int]): File sizes in bytes
validation_timestamp (datetime): When validation was performed
issues (list[str]): Human-readable issue descriptions
Example:
result = validator.validate_data_directory("data/tpch_sf1")
if not result.valid:
    print("Validation issues:")
    for issue in result.issues:
        print(f" - {issue}")
    if result.missing_tables:
        print(f"\nMissing tables: {', '.join(result.missing_tables)}")
    if result.row_count_mismatches:
        print("\nRow count mismatches:")
        for table, (expected, actual) in result.row_count_mismatches.items():
            diff_pct = abs(actual - expected) / expected * 100
            print(f" {table}: expected {expected:,}, actual {actual:,} ({diff_pct:.1f}% diff)")
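For CI logs it can help to turn the documented fields into a machine-readable summary. The sketch below is illustrative only: `result_to_json` is a hypothetical helper, and a `SimpleNamespace` stands in for a real `DataValidationResult` so the example runs without BenchBox installed. It relies only on the attribute names listed under Fields.

```python
import json
from datetime import datetime
from types import SimpleNamespace


def result_to_json(result) -> str:
    """Serialize the documented result fields into a JSON summary string."""
    summary = {
        "valid": result.valid,
        "missing_tables": sorted(result.missing_tables),
        # Each mismatch is documented as an (expected, actual) tuple.
        "row_count_mismatches": {
            table: {"expected": expected, "actual": actual}
            for table, (expected, actual) in result.row_count_mismatches.items()
        },
        "issue_count": len(result.issues),
        # datetime is not JSON serializable, so store an ISO 8601 string.
        "validated_at": result.validation_timestamp.isoformat(),
    }
    return json.dumps(summary, indent=2, sort_keys=True)


# Stand-in object with the same attribute names (not a real BenchBox result).
example = SimpleNamespace(
    valid=False,
    missing_tables=["orders", "lineitem"],
    row_count_mismatches={"customer": (150_000, 140_000)},
    issues=["Missing data files for table lineitem"],
    validation_timestamp=datetime(2024, 1, 1, 12, 0, 0),
)
print(result_to_json(example))
```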
TableExpectation Class¶
Expected data characteristics for a table.
- class TableExpectation(name, expected_rows, expected_files, min_file_size=0, allow_zero_rows=False)[source]¶
Bases: object
Expected data characteristics for a table.
- name: str¶
- expected_rows: int¶
- expected_files: list[str]¶
- min_file_size: int = 0¶
- allow_zero_rows: bool = False¶
- __init__(name, expected_rows, expected_files, min_file_size=0, allow_zero_rows=False)¶
Fields:
name (str): Table name
expected_rows (int): Expected row count at scale factor 1.0
expected_files (list[str]): Expected file names
min_file_size (int): Minimum file size in bytes (default: 0)
allow_zero_rows (bool): Whether zero-row tables are valid (default: False)
Usage Examples¶
Basic TPC-H Validation¶
from benchbox.utils.data_validation import BenchmarkDataValidator
# Validate TPC-H SF 1.0 data
validator = BenchmarkDataValidator("tpch", scale_factor=1.0)
result = validator.validate_data_directory("data/tpch_sf1")
if result.valid:
    print("✅ All TPC-H tables valid")
    total_size = sum(result.file_size_info.values())
    print(f"Total size: {total_size / (1024**3):.2f} GB")
else:
    print("❌ Validation failed")
    validator.print_validation_report(result)
TPC-DS Validation with Scale Factor¶
# Validate TPC-DS SF 0.1 data
validator = BenchmarkDataValidator("tpcds", scale_factor=0.1)
result = validator.validate_data_directory("data/tpcds_sf0.1")
# Check specific tables
if not result.tables_validated.get("store_sales", False):
    print("store_sales table has issues")
    if "store_sales" in result.missing_tables:
        print(" - Missing data files")
    if "store_sales" in result.row_count_mismatches:
        expected, actual = result.row_count_mismatches["store_sales"]
        print(f" - Row count: expected {expected:,}, found {actual:,}")
Compressed Data Validation¶
# Validator automatically handles .gz and .zst compression
validator = BenchmarkDataValidator("tpch", scale_factor=1.0)
# Works with both compressed and uncompressed files
# - customer.tbl
# - customer.tbl.gz
# - customer.tbl.zst
result = validator.validate_data_directory("data/tpch_compressed")
if result.valid:
    print("Compressed data is valid")
    for file, size in result.file_size_info.items():
        print(f" {file}: {size / (1024**2):.2f} MB")
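To make the idea of counting rows in compressed files concrete, here is a minimal standard-library sketch for plain and `.gz` files. `count_rows` is a hypothetical helper rather than part of BenchBox. `.zst` files are deliberately left out because they need the third-party `zstandard` package.

```python
import gzip
import tempfile
from pathlib import Path


def count_rows(path: Path) -> int:
    """Count newline-terminated rows in a plain or gzip-compressed file."""
    # Pick the opener from the final suffix: "customer.tbl.gz" ends in ".gz".
    opener = gzip.open if path.suffix == ".gz" else open
    with opener(path, "rb") as handle:
        # Iterating a binary file yields one chunk per line without
        # loading the whole file into memory.
        return sum(1 for _ in handle)


with tempfile.TemporaryDirectory() as tmp:
    plain = Path(tmp) / "customer.tbl"
    plain.write_bytes(b"1|Alice|\n2|Bob|\n3|Carol|\n")
    packed = Path(tmp) / "customer.tbl.gz"
    with gzip.open(packed, "wb") as handle:
        handle.write(plain.read_bytes())
    print(count_rows(plain), count_rows(packed))  # 3 3
```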
Chunked/Parallel Data Validation¶
# Validator handles chunked files from parallel generation
# - lineitem_1_4.dat
# - lineitem_2_4.dat
# - lineitem_3_4.dat
# - lineitem_4_4.dat
validator = BenchmarkDataValidator("tpch", scale_factor=10.0)
result = validator.validate_data_directory("data/tpch_sf10_parallel")
if result.valid:
    print("Chunked data validated successfully")
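One way to picture chunk detection is to group file names of the form `<table>_<index>_<total>.<ext>` by table. The pattern and the `group_chunks` helper below are assumptions based on the file names in the comment above, not the validator's actual matching logic.

```python
import re
from collections import defaultdict

# Hypothetical pattern: table name, chunk index, chunk total, then a data
# extension with an optional compression suffix.
CHUNK_PATTERN = re.compile(
    r"^(?P<table>.+)_(?P<index>\d+)_(?P<total>\d+)\.(?:tbl|dat)(?:\.gz|\.zst)?$"
)


def group_chunks(file_names):
    """Map table name -> chunk file names ordered by numeric chunk index."""
    chunks = defaultdict(list)
    for name in file_names:
        match = CHUNK_PATTERN.match(name)
        if match:
            chunks[match["table"]].append((int(match["index"]), name))
    # Sort numerically so chunk 10 comes after chunk 2.
    return {table: [name for _, name in sorted(parts)] for table, parts in chunks.items()}


names = ["lineitem_2_4.dat", "lineitem_1_4.dat", "orders.tbl", "store_sales_10_16.dat.gz"]
print(group_chunks(names))
```

Files that do not match the chunk pattern, such as `orders.tbl`, are simply ignored by this sketch.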
Data Regeneration Decision¶
from benchbox.utils.data_validation import BenchmarkDataValidator

def ensure_valid_data(benchmark_name, scale_factor, data_dir):
    """Ensure data is valid, regenerating if needed."""
    validator = BenchmarkDataValidator(benchmark_name, scale_factor)
    should_regen, result = validator.should_regenerate_data(data_dir)
    if should_regen:
        print(f"Data needs regeneration: {', '.join(result.issues[:3])}")
        # Generate data
        if benchmark_name == "tpch":
            from benchbox.tpch import TPCH
            bench = TPCH(scale_factor=scale_factor, output_dir=data_dir)
            bench.generate_data()
        elif benchmark_name == "tpcds":
            from benchbox.tpcds import TPCDS
            bench = TPCDS(scale_factor=scale_factor, output_dir=data_dir)
            bench.generate_data()
        # Validate after generation
        result = validator.validate_data_directory(data_dir)
        if result.valid:
            print("✅ Data generation successful")
        else:
            print("❌ Data generation failed validation")
            validator.print_validation_report(result)
    else:
        print("Existing data is valid")

ensure_valid_data("tpch", 1.0, "data/tpch_sf1")
Custom Benchmark Validation¶
# For custom benchmarks, validation checks for any data files
validator = BenchmarkDataValidator("custom_benchmark", scale_factor=1.0)
result = validator.validate_data_directory("data/custom")
# Checks for .tbl, .dat, .csv, .parquet files
if result.valid:
    print(f"Found {len(result.file_size_info)} data files")
    for file, size in result.file_size_info.items():
        print(f" {file}: {size / 1024:.2f} KB")
else:
    print("No valid data files found")
Manifest-Based Validation¶
# Validator uses _datagen_manifest.json for fast validation
# Manifest is auto-generated during data generation
validator = BenchmarkDataValidator("tpch", scale_factor=1.0)
result = validator.validate_data_directory("data/tpch_sf1")
# With manifest: fast validation (reads JSON, checks file sizes)
# Without manifest: full validation (counts rows, scans directory)
if result.valid:
    print(f"Validated at {result.validation_timestamp}")
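Before relying on the fast path, it can be useful to check whether a directory actually contains a readable manifest. The helper below is a hypothetical sketch that only tests for the `_datagen_manifest.json` file and confirms it parses as JSON. It makes no assumptions about the manifest's internal schema, which this reference does not document.

```python
import json
import tempfile
from pathlib import Path

MANIFEST_NAME = "_datagen_manifest.json"


def manifest_is_readable(data_dir) -> bool:
    """Return True if the data directory holds a manifest that parses as JSON."""
    manifest_path = Path(data_dir) / MANIFEST_NAME
    if not manifest_path.is_file():
        return False
    try:
        json.loads(manifest_path.read_text(encoding="utf-8"))
    except (OSError, ValueError):
        # ValueError covers both JSON and text decoding errors.
        return False
    return True


with tempfile.TemporaryDirectory() as tmp:
    print(manifest_is_readable(tmp))  # False: no manifest yet
    (Path(tmp) / MANIFEST_NAME).write_text("{}", encoding="utf-8")
    print(manifest_is_readable(tmp))  # True: file exists and parses
```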
Validation Report Integration¶
import sys
validator = BenchmarkDataValidator("tpcds", scale_factor=1.0)
result = validator.validate_data_directory("data/tpcds_sf1")
# Exit with error code if validation fails
if not result.valid:
    validator.print_validation_report(result, verbose=True)
    sys.exit(1)
print(f"All {len(result.tables_validated)} tables validated")
Best Practices¶
Always Validate Before Benchmarking
# Check data validity before running benchmarks
validator = BenchmarkDataValidator("tpch", scale_factor=1.0)
should_regen, _ = validator.should_regenerate_data("data/tpch_sf1")
if should_regen:
    # Regenerate data first
    benchmark.generate_data()
# Now run benchmark
results = adapter.run_benchmark(benchmark)
Use Manifest for Performance
# Manifest-based validation avoids re-scanning files
# Let data generation create manifest automatically
benchmark.generate_data()  # Creates _datagen_manifest.json
# Future validations will be fast
result = validator.validate_data_directory(data_dir)
Handle Compressed Data
# Validator handles compression automatically
# Use compression for large datasets
benchmark = TPCH(scale_factor=10.0, compression="zstd")
benchmark.generate_data()
# Validation works transparently
result = validator.validate_data_directory(benchmark.output_dir)
Check Specific Tables
result = validator.validate_data_directory(data_dir)
# Check critical tables only
critical_tables = ["customer", "orders", "lineitem"]
all_critical_valid = all(
    result.tables_validated.get(t, False)
    for t in critical_tables
)
Tolerate Small Variances
# Validator allows ±5% row count variance
# This is normal for some data generators
if result.row_count_mismatches:
    for table, (expected, actual) in result.row_count_mismatches.items():
        variance_pct = abs(actual - expected) / expected * 100
        if variance_pct > 10:
            print(f"⚠️ Large variance in {table}: {variance_pct:.1f}%")
Common Issues¶
- Issue: “Missing data files for table X”
Cause: Data file not found in directory
Solution: Regenerate data or check file name format
Check: Look for X.tbl, X.dat, X.tbl.gz, X_1_N.dat variants
- Issue: “Row count mismatch”
Cause: File has different row count than expected
Solution: Regenerate data if variance > 5%
Note: Some variance is normal due to sampling or scale factor rounding
- Issue: “Empty data file”
Cause: File exists but has 0 bytes
Solution: Regenerate data; likely a generation failure
- Issue: “Skipping zstd row count”
Cause: zstandard library not installed
Solution: Install zstandard:
pip install zstandard
Impact: Validation skips row counting for .zst files (file existence still checked)
- Issue: “No data files found in directory”
Cause: Wrong directory or no data generated
Solution: Verify directory path and generate data first
- Issue: “Manifest mismatch”
Cause: Manifest is for different scale factor or benchmark
Solution: Delete manifest and re-validate, or regenerate data
See Also¶
Data Generation - Data generation guide
Base Benchmark API - Base benchmark interface
Cloud Storage Integration API - Cloud storage utilities
Troubleshooting - Troubleshooting guide
Standard Row Counts¶
TPC-H Tables (Scale Factor 1.0)¶
| Table | Rows | Notes |
|---|---|---|
| customer | 150,000 | Scales with SF |
| lineitem | 6,001,215 | Scales with SF (largest table) |
| nation | 25 | Fixed size (does not scale) |
| orders | 1,500,000 | Scales with SF |
| part | 200,000 | Scales with SF |
| partsupp | 800,000 | Scales with SF |
| region | 5 | Fixed size (does not scale) |
| supplier | 10,000 | Scales with SF |
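The table can be turned into a rough estimate of expected rows at other scale factors. The sketch below assumes simple linear scaling for every table except the two fixed-size ones. `expected_rows` is a hypothetical helper and not necessarily how BenchBox computes its expectations. Real generators can deviate slightly from a linear estimate (lineitem in particular), which is the kind of variance a row count tolerance absorbs.

```python
# Row counts at scale factor 1.0, copied from the table above.
TPCH_SF1_ROWS = {
    "customer": 150_000,
    "lineitem": 6_001_215,
    "nation": 25,
    "orders": 1_500_000,
    "part": 200_000,
    "partsupp": 800_000,
    "region": 5,
    "supplier": 10_000,
}

# Tables marked "Fixed size (does not scale)".
FIXED_SIZE_TABLES = {"nation", "region"}


def expected_rows(table: str, scale_factor: float) -> int:
    """Estimate the expected row count for a TPC-H table at a scale factor."""
    base = TPCH_SF1_ROWS[table]
    if table in FIXED_SIZE_TABLES:
        return base
    # Linear scaling is an assumption of this sketch.
    return round(base * scale_factor)


print(expected_rows("customer", 10.0))  # 1500000
print(expected_rows("nation", 10.0))    # 25
```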
TPC-DS Tables (Scale Factor 1.0)¶
24 tables with varying row counts. Key tables:
| Table | Rows (Approx) | Notes |
|---|---|---|
| store_sales | 2,880,404 | Largest sales fact table |
| catalog_sales | 1,441,548 | Fact table |
| web_sales | 719,384 | Fact table |
| inventory | 11,745,000 | Very large table (most rows at SF 1.0) |
| customer | 100,000 | Dimension table |
| date_dim | 73,049 | Fixed size (does not scale) |
| time_dim | 86,400 | Fixed size (does not scale) |
See the TPC-DS specification for complete row counts.