TPC-DI ETL Implementation Guide¶
Overview¶
The TPC-DI ETL implementation in BenchBox provides a systematic framework for testing Extract, Transform, and Load (ETL) processes in data warehousing scenarios. Unlike traditional benchmarks that focus solely on query performance, TPC-DI emphasizes the complete data integration pipeline, including data transformation, quality validation, and loading of both historical and incremental data.
This guide covers the ETL feature set, configuration options, best practices, and implementation patterns for enterprise-grade data integration testing.
ETL Architecture¶
Core Components¶
The TPC-DI ETL implementation consists of several key components:
┌─────────────────┐ ┌─────────────────┐ ┌─────────────────┐
│ Source Data │ │ Staging Area │ │ Data Warehouse │
│ │ │ │ │ │
│ • CSV Files │───▶│ • Transformed │───▶│ • Dimension │
│ • XML Files │ │ Data │ │ Tables │
│ • Fixed-Width │ │ • Data Quality │ │ • Fact Tables │
│ • JSON Files │ │ Checks │ │ • Audit Trails │
└─────────────────┘ └─────────────────┘ └─────────────────┘
ETL Pipeline Phases¶
Extract Phase: Generate and read source data in multiple formats
Transform Phase: Apply business rules, data cleansing, and format conversion
Load Phase: Insert data into target warehouse with proper SCD handling
Validate Phase: Run data quality checks and business rule validation
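Each phase reports into a single result dictionary returned by run_etl_pipeline(), which later sections access directly. The helper below is a minimal sketch for summarizing that structure; the key names (success, error, phases, duration, total_duration) follow the result dictionaries used in the examples throughout this guide and should be treated as illustrative rather than a definitive contract.

def summarize_pipeline_result(pipeline_result: dict) -> None:
    """Print a one-line summary per ETL phase from a pipeline result dict."""
    if not pipeline_result.get("success"):
        print(f"Pipeline failed: {pipeline_result.get('error', 'Unknown error')}")
        return
    for phase_name in ("extract", "transform", "load", "validation"):
        phase = pipeline_result.get("phases", {}).get(phase_name)
        if phase:  # the validation phase is only present when validate_data=True
            print(f"{phase_name}: {phase['duration']:.2f}s")
    print(f"total: {pipeline_result['total_duration']:.2f}s")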
Directory Structure¶
When ETL mode is enabled, the following directory structure is created:
tpcdi_data/
├── source/ # Source data files by format and batch type
│ ├── csv/
│ │ ├── historical/
│ │ ├── incremental/
│ │ └── scd/
│ ├── xml/
│ ├── fixed_width/
│ └── json/
├── staging/ # Transformed data files
└── warehouse/ # Final warehouse data (optional file output)
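Once source data has been generated, the layout above can be inspected directly from Python. The snippet below is a small sketch that assumes the default layout shown above under an output_dir of tpcdi_data; adjust the root path to match your configuration.

from pathlib import Path

# Walk the generated source tree and report file counts per format/batch directory.
# Assumes the directory layout shown above under an output_dir of "tpcdi_data".
source_root = Path("tpcdi_data") / "source"
for format_dir in sorted(p for p in source_root.iterdir() if p.is_dir()):
    for batch_dir in sorted(p for p in format_dir.iterdir() if p.is_dir()):
        file_count = sum(1 for f in batch_dir.iterdir() if f.is_file())
        print(f"{format_dir.name}/{batch_dir.name}: {file_count} files")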
Getting Started¶
Basic ETL Setup¶
from benchbox import TPCDI
# Initialize TPC-DI with ETL mode enabled
tpcdi = TPCDI(
scale_factor=1.0,
output_dir="tpcdi_etl",
etl_mode=True, # Enable ETL capabilities
verbose=True # Enable detailed logging
)
# Check ETL status
etl_status = tpcdi.get_etl_status()
print(f"ETL mode enabled: {etl_status['etl_mode_enabled']}")
print(f"Supported formats: {etl_status['supported_formats']}")
print(f"Batch types: {etl_status['batch_types']}")
Backwards Compatibility¶
The ETL mode is fully backwards compatible with existing TPC-DI usage:
# Traditional mode (default) - generates warehouse tables directly
tpcdi_traditional = TPCDI(scale_factor=1.0, etl_mode=False)
data_files = tpcdi_traditional.generate_data()
# ETL mode - provides full ETL pipeline capabilities
tpcdi_etl = TPCDI(scale_factor=1.0, etl_mode=True)
source_files = tpcdi_etl.generate_source_data()
ETL Mode Configuration¶
Configuration Options¶
tpcdi = TPCDI(
scale_factor=1.0, # Data volume scale factor
output_dir="tpcdi_data", # Base directory for all artifacts
etl_mode=True, # Enable ETL mode
verbose=True, # Enable detailed logging
# ETL-specific tuning options (planned extensions; confirm support in your BenchBox version)
batch_size=10000, # Records per batch for loading
parallel_workers=4, # Parallel processing workers
validate_on_load=True # Run validation after each load
)
Scale Factor Guidelines¶
| Scale Factor | Source Data Size | ETL Complexity | Use Case |
|---|---|---|---|
| 0.01 | ~1 MB | Minimal | Unit testing, development |
| 0.1 | ~10 MB | Standard | Integration testing |
| 1.0 | ~100 MB | Full complexity | Performance testing |
| 3.0+ | ~300+ MB | Enterprise-scale | Stress testing |
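The sizes in the table are approximate and grow roughly linearly with the scale factor (about 100 MB of source data per 1.0 of scale factor, per the table above). The sketch below is purely illustrative and uses that linear rule of thumb; actual sizes depend on which formats and batch types you generate.

# Rough source-data size estimate based on the guideline table above
# (~100 MB of source data per 1.0 of scale factor). Purely illustrative.
def estimate_source_size_mb(scale_factor: float) -> float:
    """Estimate generated source data volume in MB for a given scale factor."""
    return scale_factor * 100.0

for sf in (0.01, 0.1, 1.0, 3.0):
    print(f"scale_factor={sf}: ~{estimate_source_size_mb(sf):.0f} MB of source data")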
Environment Configuration¶
# Development environment
tpcdi_dev = TPCDI(
scale_factor=0.05,
output_dir="dev_etl",
etl_mode=True,
verbose=True
)
# Production-like testing
tpcdi_prod = TPCDI(
scale_factor=3.0,
output_dir="prod_etl",
etl_mode=True,
verbose=False # Reduce logging in production tests
)
Source Data Generation¶
Supported File Formats¶
The ETL implementation supports multiple source data formats to simulate real-world data integration scenarios:
1. CSV Format¶
Use case: Traditional database exports, flat file feeds
Characteristics: Delimited text, header rows, standard data types
Example tables: Customer data, account information
# Generate CSV source files
csv_files = tpcdi.generate_source_data(
formats=['csv'],
batch_types=['historical', 'incremental']
)
2. XML Format¶
Use case: Web services, enterprise application integration
Characteristics: Hierarchical structure, nested elements
Example tables: Company data, reference information
# Generate XML source files
xml_files = tpcdi.generate_source_data(
formats=['xml'],
batch_types=['historical']
)
3. Fixed-Width Format¶
Use case: Legacy mainframe systems, financial data feeds
Characteristics: Fixed column positions, no delimiters
Example tables: Security data, market information
# Generate fixed-width source files
fixed_width_files = tpcdi.generate_source_data(
formats=['fixed_width'],
batch_types=['historical', 'incremental']
)
4. JSON Format¶
Use case: NoSQL databases, REST APIs, modern applications
Characteristics: Schema-flexible, nested objects
Example tables: Account data, transaction logs
# Generate JSON source files
json_files = tpcdi.generate_source_data(
formats=['json'],
batch_types=['incremental', 'scd']
)
Batch Types¶
Historical Batch¶
Purpose: Initial data warehouse population
Characteristics: Full data set, no incremental logic required
Volume: 100% of scale factor data
# Generate historical batch data
historical_files = tpcdi.generate_source_data(
formats=['csv', 'xml', 'fixed_width', 'json'],
batch_types=['historical']
)
Incremental Batch¶
Purpose: Regular updates, new records
Characteristics: Subset of data, append-only operations
Volume: ~10-20% of scale factor data
# Generate incremental batch data
incremental_files = tpcdi.generate_source_data(
formats=['csv', 'json'],
batch_types=['incremental']
)
SCD (Slowly Changing Dimension) Batch¶
Purpose: Dimension updates, historical tracking
Characteristics: Changed records requiring SCD Type 2 processing
Volume: ~5-10% of scale factor data
# Generate SCD batch data
scd_files = tpcdi.generate_source_data(
formats=['csv', 'xml'],
batch_types=['scd']
)
Multi-Format Generation Example¶
from pathlib import Path

# Generate a complete source data set across all formats and batch types
all_source_files = tpcdi.generate_source_data(
formats=['csv', 'xml', 'fixed_width', 'json'],
batch_types=['historical', 'incremental', 'scd']
)
print("Generated source files:")
for format_type, files in all_source_files.items():
print(f"{format_type}: {len(files)} files")
for file_path in files:
file_size = Path(file_path).stat().st_size
print(f" - {Path(file_path).name} ({file_size:,} bytes)")
ETL Pipeline Execution¶
Basic Pipeline Execution¶
import sqlite3
from benchbox import TPCDI
# Initialize ETL instance
tpcdi = TPCDI(scale_factor=0.1, output_dir="tpcdi_pipeline", etl_mode=True)
# Create target database
conn = sqlite3.connect("warehouse.db")
# Create warehouse schema
schema_sql = tpcdi.get_create_tables_sql()
conn.executescript(schema_sql)
# Run ETL pipeline
pipeline_result = tpcdi.run_etl_pipeline(
connection=conn,
batch_type="historical",
validate_data=True
)
# Check results
if pipeline_result['success']:
print(f"ETL completed in {pipeline_result['total_duration']:.2f} seconds")
print(f"Records processed: {pipeline_result['phases']['transform']['records_processed']}")
print(f"Data quality score: {pipeline_result['validation_results']['data_quality_score']}")
else:
print(f"ETL failed: {pipeline_result.get('error', 'Unknown error')}")
conn.close()
Advanced Pipeline Execution¶
import duckdb
from datetime import datetime
# Use DuckDB for better SQL support
conn = duckdb.connect("warehouse.duckdb")
# Initialize schema
schema_sql = tpcdi.get_create_tables_sql()
conn.execute("BEGIN TRANSACTION")
for statement in schema_sql.split(';'):
if statement.strip():
conn.execute(statement)
conn.execute("COMMIT")
# Process multiple batch types in sequence
batch_types = ['historical', 'incremental', 'incremental', 'scd']
pipeline_results = []
for i, batch_type in enumerate(batch_types, 1):
print(f"Processing batch {i}/{len(batch_types)}: {batch_type}")
start_time = datetime.now()
result = tpcdi.run_etl_pipeline(
connection=conn,
batch_type=batch_type,
validate_data=(batch_type in ['historical', 'scd']) # Validate key batches
)
result['batch_number'] = i
result['start_time'] = start_time
pipeline_results.append(result)
if result['success']:
phases = result['phases']
print(f" ✅ Completed in {result['total_duration']:.2f}s")
print(f" Extract: {phases['extract']['duration']:.2f}s")
print(f" Transform: {phases['transform']['duration']:.2f}s")
print(f" Load: {phases['load']['duration']:.2f}s")
if 'validation' in phases:
print(f" Validation: {phases['validation']['duration']:.2f}s")
else:
print(f" ❌ Failed: {result.get('error', 'Unknown error')}")
conn.close()
# Analyze pipeline performance
successful_batches = [r for r in pipeline_results if r['success']]
if successful_batches:
avg_time = sum(r['total_duration'] for r in successful_batches) / len(successful_batches)
print(f"\nPipeline Summary:")
print(f" Successful batches: {len(successful_batches)}/{len(pipeline_results)}")
print(f" Average execution time: {avg_time:.2f} seconds")
Pipeline Phase Details¶
Extract Phase¶
Generates source data files in specified formats
Handles multiple batch types simultaneously
Tracks file generation metrics
# Extract phase results
extract_results = pipeline_result['phases']['extract']
print(f"Files generated: {extract_results['files_generated']}")
print(f"Extract duration: {extract_results['duration']:.2f}s")
Transform Phase¶
Converts source data to staging format
Applies data type conversions and business rules
Handles schema mapping between formats
# Transform phase results
transform_results = pipeline_result['phases']['transform']
print(f"Records processed: {transform_results['records_processed']}")
print(f"Transformations applied: {len(transform_results['transformations_applied'])}")
print(f"Transform duration: {transform_results['duration']:.2f}s")
Load Phase¶
Inserts data into target warehouse tables
Handles SCD Type 2 logic for dimension tables
Maintains referential integrity
# Load phase results
load_results = pipeline_result['phases']['load']
print(f"Records loaded: {load_results['records_loaded']}")
print(f"Tables updated: {load_results['tables_updated']}")
print(f"Load duration: {load_results['duration']:.2f}s")
Data Validation and Quality¶
Built-in Validation Queries¶
The TPC-DI implementation ships with two sets of built-in queries for checking ETL results: validation queries (V1-V5) and analytical queries (A1-A6).
Validation Queries (V1-V5)¶
V1: Customer dimension validation
V2: Account dimension validation
V3: Trade fact validation
V4: Security dimension validation
V5: Cash balance validation
# Run individual validation queries
for query_id in ['V1', 'V2', 'V3']:
query_sql = tpcdi.get_query(query_id)
result = conn.execute(query_sql).fetchall()
print(f"Validation {query_id}: {len(result)} records")
Analytical Queries (A1-A6)¶
A1: Customer trading analysis
A2: Company performance analysis
A3: Broker commission analysis
A4: Portfolio analysis
A5: Market trend analysis
A6: Customer lifecycle analysis
# Run analytical queries
analytical_queries = ['A1', 'A2']
for query_id in analytical_queries:
query_sql = tpcdi.get_query(query_id)
result = conn.execute(query_sql).fetchall()
print(f"Analysis {query_id}: {len(result)} analysis records")
Comprehensive Data Quality Validation¶
# Run systematic validation
validation_results = tpcdi.validate_etl_results(conn)
print("Data Quality Results:")
print(f"Overall score: {validation_results['data_quality_score']:.1f}/100")
print(f"Validation queries: {len(validation_results['validation_queries'])}")
print(f"Quality issues: {len(validation_results['data_quality_issues'])}")
# Detailed validation breakdown
print("\nValidation Query Results:")
for query_id, result in validation_results['validation_queries'].items():
status = "✅" if result['success'] else "❌"
row_count = result.get('row_count', 'N/A')
print(f" {status} {query_id}: {row_count} rows")
# Data quality checks
print("\nData Quality Checks:")
for check_type in ['completeness_checks', 'consistency_checks', 'accuracy_checks']:
if check_type in validation_results:
print(f" {check_type.replace('_', ' ').title()}:")
checks = validation_results[check_type]
for check_name, check_result in checks.items():
if isinstance(check_result, dict) and 'error' not in check_result:
print(f" ✅ {check_name}")
else:
print(f" ❌ {check_name}: {check_result}")
Custom Data Quality Framework¶
class CustomDataQualityValidator:
"""Custom data quality validation framework."""
def __init__(self, connection):
self.connection = connection
def check_referential_integrity(self):
"""Check foreign key relationships."""
checks = {
'customer_account_fk': """
SELECT COUNT(*) FROM DimAccount a
LEFT JOIN DimCustomer c ON a.SK_CustomerID = c.SK_CustomerID
WHERE c.SK_CustomerID IS NULL
""",
'trade_customer_fk': """
SELECT COUNT(*) FROM FactTrade t
LEFT JOIN DimCustomer c ON t.SK_CustomerID = c.SK_CustomerID
WHERE c.SK_CustomerID IS NULL
"""
}
results = {}
for check_name, check_sql in checks.items():
violations = self.connection.execute(check_sql).fetchone()[0]
results[check_name] = {
'violations': violations,
'status': 'PASS' if violations == 0 else 'FAIL'
}
return results
def check_business_rules(self):
"""Check domain-specific business rules."""
rules = {
'positive_trade_prices': """
SELECT COUNT(*) FROM FactTrade WHERE TradePrice <= 0
""",
'valid_customer_tiers': """
SELECT COUNT(*) FROM DimCustomer WHERE Tier NOT IN (1, 2, 3)
""",
'current_records_no_end_date': """
SELECT COUNT(*) FROM DimCustomer
WHERE IsCurrent = 1 AND EndDate IS NOT NULL
"""
}
results = {}
for rule_name, rule_sql in rules.items():
violations = self.connection.execute(rule_sql).fetchone()[0]
results[rule_name] = {
'violations': violations,
'status': 'PASS' if violations == 0 else 'FAIL'
}
return results
def run_complete_validation(self):
"""Run all custom validation checks."""
return {
'referential_integrity': self.check_referential_integrity(),
'business_rules': self.check_business_rules(),
'timestamp': datetime.now().isoformat()
}
# Usage
validator = CustomDataQualityValidator(conn)
custom_results = validator.run_complete_validation()
print("Custom validation results:", custom_results)
Performance Monitoring¶
ETL Metrics Tracking¶
# Get current ETL status and metrics
etl_status = tpcdi.get_etl_status()
metrics = etl_status['metrics']
print("ETL Performance Metrics:")
print(f" Batches processed: {metrics['batches_processed']}")
print(f" Total processing time: {metrics['total_processing_time']:.2f}s")
print(f" Average processing time: {metrics['avg_processing_time']:.2f}s")
print(f" Error count: {metrics['error_count']}")
# Batch status tracking
print("\nBatch Status:")
for batch_type, status in metrics['batch_status'].items():
print(f" {batch_type}: {status['status']} ({status['records']} records)")
Performance Benchmarking¶
import time
def benchmark_etl_performance(tpcdi, connection, iterations=3):
"""Benchmark ETL performance across multiple iterations."""
results = []
batch_types = ['historical', 'incremental', 'scd']
for batch_type in batch_types:
batch_results = []
for i in range(iterations):
# Reset warehouse state (delete fact rows before dimension rows to respect foreign keys)
connection.execute("DELETE FROM FactTrade")
connection.execute("DELETE FROM DimCustomer")
start_time = time.time()
pipeline_result = tpcdi.run_etl_pipeline(
connection=connection,
batch_type=batch_type,
validate_data=False # Skip validation for pure performance
)
total_time = time.time() - start_time
if pipeline_result['success']:
phases = pipeline_result['phases']
records_processed = phases['transform']['records_processed']
batch_results.append({
'iteration': i + 1,
'total_time': total_time,
'extract_time': phases['extract']['duration'],
'transform_time': phases['transform']['duration'],
'load_time': phases['load']['duration'],
'records_processed': records_processed,
'throughput': records_processed / total_time if total_time > 0 else 0
})
if batch_results:
avg_results = {
'batch_type': batch_type,
'iterations': len(batch_results),
'avg_total_time': sum(r['total_time'] for r in batch_results) / len(batch_results),
'avg_throughput': sum(r['throughput'] for r in batch_results) / len(batch_results),
'avg_extract_time': sum(r['extract_time'] for r in batch_results) / len(batch_results),
'avg_transform_time': sum(r['transform_time'] for r in batch_results) / len(batch_results),
'avg_load_time': sum(r['load_time'] for r in batch_results) / len(batch_results)
}
results.append(avg_results)
return results
# Run performance benchmark
benchmark_results = benchmark_etl_performance(tpcdi, conn, iterations=3)
print("Performance Benchmark Results:")
for result in benchmark_results:
print(f"\n{result['batch_type'].title()} Batch:")
print(f" Average time: {result['avg_total_time']:.2f}s")
print(f" Average throughput: {result['avg_throughput']:.0f} records/sec")
print(f" Phase breakdown:")
print(f" Extract: {result['avg_extract_time']:.2f}s")
print(f" Transform: {result['avg_transform_time']:.2f}s")
print(f" Load: {result['avg_load_time']:.2f}s")
Memory and Resource Monitoring¶
import psutil
import threading
import time
class ResourceMonitor:
"""Monitor system resources during ETL execution."""
def __init__(self):
self.monitoring = False
self.metrics = []
def start_monitoring(self):
"""Start resource monitoring in background thread."""
self.monitoring = True
self.monitor_thread = threading.Thread(target=self._monitor_resources)
self.monitor_thread.start()
def stop_monitoring(self):
"""Stop resource monitoring."""
self.monitoring = False
if hasattr(self, 'monitor_thread'):
self.monitor_thread.join()
def _monitor_resources(self):
"""Monitor CPU, memory, and disk usage."""
while self.monitoring:
cpu_percent = psutil.cpu_percent(interval=1)
memory = psutil.virtual_memory()
disk = psutil.disk_usage('/')
self.metrics.append({
'timestamp': time.time(),
'cpu_percent': cpu_percent,
'memory_percent': memory.percent,
'memory_used_gb': memory.used / (1024**3),
'disk_used_percent': disk.percent
})
# psutil.cpu_percent(interval=1) above already blocks for ~1 second, so each loop
# iteration takes roughly one second and no extra sleep is needed
def get_summary(self):
"""Get resource usage summary."""
if not self.metrics:
return {}
cpu_values = [m['cpu_percent'] for m in self.metrics]
memory_values = [m['memory_percent'] for m in self.metrics]
return {
'duration': len(self.metrics),
'avg_cpu': sum(cpu_values) / len(cpu_values),
'max_cpu': max(cpu_values),
'avg_memory': sum(memory_values) / len(memory_values),
'max_memory': max(memory_values),
'peak_memory_gb': max(m['memory_used_gb'] for m in self.metrics)
}
# Usage with resource monitoring
monitor = ResourceMonitor()
monitor.start_monitoring()
# Run ETL pipeline
pipeline_result = tpcdi.run_etl_pipeline(
connection=conn,
batch_type='historical',
validate_data=True
)
monitor.stop_monitoring()
# Get resource usage summary
resource_summary = monitor.get_summary()
print(f"Resource Usage Summary:")
print(f" Duration: {resource_summary['duration']} seconds")
print(f" Average CPU: {resource_summary['avg_cpu']:.1f}%")
print(f" Peak CPU: {resource_summary['max_cpu']:.1f}%")
print(f" Average Memory: {resource_summary['avg_memory']:.1f}%")
print(f" Peak Memory: {resource_summary['peak_memory_gb']:.2f} GB")
Advanced Features¶
Slowly Changing Dimension (SCD) Processing¶
The ETL implementation includes built-in SCD Type 2 logic for dimension tables:
# SCD Type 2 processing example
scd_result = tpcdi.run_etl_pipeline(
connection=conn,
batch_type='scd',
validate_data=True
)
# Verify SCD implementation
scd_validation_sql = """
SELECT
CustomerID,
COUNT(*) as versions,
SUM(CASE WHEN IsCurrent = 1 THEN 1 ELSE 0 END) as current_versions,
MIN(EffectiveDate) as first_effective_date,
MAX(COALESCE(EndDate, '9999-12-31')) as last_end_date
FROM DimCustomer
GROUP BY CustomerID
HAVING COUNT(*) > 1 -- Customers with multiple versions
ORDER BY versions DESC
LIMIT 10
"""
scd_results = conn.execute(scd_validation_sql).fetchall()
print("SCD Type 2 Implementation Verification:")
for result in scd_results:
customer_id, versions, current, first_date, last_date = result
print(f" Customer {customer_id}: {versions} versions, {current} current")
Multi-Database Support¶
# Test ETL across multiple database engines
databases = {
'sqlite': sqlite3.connect('warehouse_sqlite.db'),
'duckdb': duckdb.connect('warehouse_duckdb.db')
}
# Initialize schemas
for db_name, conn in databases.items():
schema_sql = tpcdi.get_create_tables_sql()
if db_name == 'sqlite':
conn.executescript(schema_sql)
else: # DuckDB
conn.execute("BEGIN TRANSACTION")
for statement in schema_sql.split(';'):
if statement.strip():
conn.execute(statement)
conn.execute("COMMIT")
# Run ETL on each database
results = {}
for db_name, conn in databases.items():
print(f"Running ETL on {db_name}...")
start_time = time.time()
pipeline_result = tpcdi.run_etl_pipeline(
connection=conn,
batch_type='historical',
validate_data=True
)
execution_time = time.time() - start_time
results[db_name] = {
'success': pipeline_result['success'],
'execution_time': execution_time,
'data_quality_score': pipeline_result.get('validation_results', {}).get('data_quality_score', 0)
}
# Compare results
print("\nDatabase Comparison:")
for db_name, result in results.items():
status = "✅" if result['success'] else "❌"
print(f" {status} {db_name}: {result['execution_time']:.2f}s, quality score {result['data_quality_score']:.1f}")
# Clean up connections
for conn in databases.values():
conn.close()
Custom Transformation Logic¶
class CustomETLTransformer:
"""Custom transformation logic for domain-specific requirements."""
def __init__(self, tpcdi):
self.tpcdi = tpcdi
def apply_custom_business_rules(self, staging_data):
"""Apply custom business rules to staging data."""
transformed_data = staging_data.copy()
# Example: Custom customer tier calculation
for record in transformed_data:
if 'customer' in record:
# Apply custom tier logic based on business rules
age = self._calculate_age(record.get('DOB', '1980-01-01'))
if age > 65:
record['Tier'] = 3 # Senior tier
elif age > 35:
record['Tier'] = 2 # Standard tier
else:
record['Tier'] = 1 # Basic tier
return transformed_data
def _calculate_age(self, dob_string):
"""Calculate age from date of birth string."""
from datetime import datetime
try:
dob = datetime.strptime(dob_string, '%Y-%m-%d')
today = datetime.now()
return today.year - dob.year - ((today.month, today.day) < (dob.month, dob.day))
except:
return 40 # Default age
def validate_custom_rules(self, connection):
"""Validate custom business rules after ETL."""
validation_results = {}
# Custom validation: Check tier distribution
tier_distribution = connection.execute("""
SELECT Tier, COUNT(*) as count
FROM DimCustomer
WHERE IsCurrent = 1
GROUP BY Tier
ORDER BY Tier
""").fetchall()
validation_results['tier_distribution'] = {
'distribution': tier_distribution,
'has_all_tiers': len(tier_distribution) == 3
}
return validation_results
# Usage
custom_transformer = CustomETLTransformer(tpcdi)
# Run ETL with custom validation
pipeline_result = tpcdi.run_etl_pipeline(
connection=conn,
batch_type='historical',
validate_data=True
)
# Apply custom validation
custom_validation = custom_transformer.validate_custom_rules(conn)
print("Custom validation results:", custom_validation)
Integration Patterns¶
Apache Airflow Integration¶
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.bash import BashOperator
from datetime import datetime, timedelta
from benchbox import TPCDI
def extract_source_data(**context):
"""Extract data from source systems."""
tpcdi = TPCDI(scale_factor=1.0, etl_mode=True, output_dir=f"/data/tpcdi/{context['ds']}")
source_files = tpcdi.generate_source_data(
formats=['csv', 'xml', 'json'],
batch_types=['incremental']
)
return {
'source_files': source_files,
'extraction_date': context['ds']
}
def transform_and_load(**context):
"""Transform and load data into warehouse."""
import duckdb
# Get source files from previous task
ti = context['ti']
extract_output = ti.xcom_pull(task_ids='extract_source_data')
# Initialize TPC-DI and database connection
tpcdi = TPCDI(scale_factor=1.0, etl_mode=True, output_dir=f"/data/tpcdi/{context['ds']}")
conn = duckdb.connect(f"/data/warehouse/tpcdi_{context['ds_nodash']}.duckdb")
# Create schema if needed
schema_sql = tpcdi.get_create_tables_sql()
conn.execute("BEGIN TRANSACTION")
for statement in schema_sql.split(';'):
if statement.strip():
conn.execute(statement)
conn.execute("COMMIT")
# Run ETL pipeline
pipeline_result = tpcdi.run_etl_pipeline(
connection=conn,
batch_type='incremental',
validate_data=True
)
conn.close()
if not pipeline_result['success']:
raise Exception(f"ETL pipeline failed: {pipeline_result.get('error')}")
return {
'records_processed': pipeline_result['phases']['transform']['records_processed'],
'data_quality_score': pipeline_result['validation_results']['data_quality_score']
}
def validate_data_quality(**context):
"""Run systematic data quality validation."""
import duckdb
tpcdi = TPCDI(scale_factor=1.0, etl_mode=True)
conn = duckdb.connect(f"/data/warehouse/tpcdi_{context['ds_nodash']}.duckdb")
# Run validation queries
validation_results = tpcdi.validate_etl_results(conn)
conn.close()
# Check quality thresholds
quality_score = validation_results.get('data_quality_score', 0)
if quality_score < 80:
raise Exception(f"Data quality score {quality_score} below threshold of 80")
return validation_results
# Define DAG
default_args = {
'owner': 'data-engineering',
'depends_on_past': False,
'start_date': datetime(2023, 1, 1),
'email_on_failure': True,
'email_on_retry': False,
'retries': 1,
'retry_delay': timedelta(minutes=5)
}
dag = DAG(
'tpcdi_etl_pipeline',
default_args=default_args,
description='TPC-DI ETL Pipeline',
schedule_interval='@daily',
catchup=False,
max_active_runs=1
)
# Define tasks
extract_task = PythonOperator(
task_id='extract_source_data',
python_callable=extract_source_data,
dag=dag
)
transform_load_task = PythonOperator(
task_id='transform_and_load',
python_callable=transform_and_load,
dag=dag
)
validate_task = PythonOperator(
task_id='validate_data_quality',
python_callable=validate_data_quality,
dag=dag
)
# Archive successful runs
archive_task = BashOperator(
task_id='archive_data',
bash_command='tar -czf /archive/tpcdi_{{ ds_nodash }}.tar.gz /data/tpcdi/{{ ds }}/',
dag=dag
)
# Set task dependencies
extract_task >> transform_load_task >> validate_task >> archive_task
Prefect Integration¶
from prefect import flow, task
from prefect.tasks import task_input_hash
from datetime import timedelta
import duckdb
from benchbox import TPCDI
@task(cache_key_fn=task_input_hash, cache_expiration=timedelta(hours=1))
def extract_tpcdi_data(scale_factor: float, batch_type: str):
"""Extract TPC-DI source data."""
tpcdi = TPCDI(scale_factor=scale_factor, etl_mode=True, output_dir=f"prefect_etl_{batch_type}")
source_files = tpcdi.generate_source_data(
formats=['csv', 'xml', 'json'],
batch_types=[batch_type]
)
return {
'tpcdi_instance': tpcdi,
'source_files': source_files,
'batch_type': batch_type
}
@task
def setup_warehouse_schema(database_path: str):
"""Set up warehouse schema."""
from pathlib import Path
# Remove existing database
db_path = Path(database_path)
if db_path.exists():
db_path.unlink()
# Create new database with schema
tpcdi = TPCDI(scale_factor=1.0)
conn = duckdb.connect(database_path)
schema_sql = tpcdi.get_create_tables_sql()
conn.execute("BEGIN TRANSACTION")
for statement in schema_sql.split(';'):
if statement.strip():
conn.execute(statement)
conn.execute("COMMIT")
conn.close()
return database_path
@task
def run_etl_pipeline(extract_result: dict, database_path: str):
"""Run the ETL pipeline."""
tpcdi = extract_result['tpcdi_instance']
batch_type = extract_result['batch_type']
conn = duckdb.connect(database_path)
pipeline_result = tpcdi.run_etl_pipeline(
connection=conn,
batch_type=batch_type,
validate_data=True
)
conn.close()
if not pipeline_result['success']:
raise Exception(f"ETL pipeline failed: {pipeline_result.get('error')}")
return pipeline_result
@task
def validate_and_report(pipeline_result: dict):
"""Validate results and generate report."""
phases = pipeline_result['phases']
validation = pipeline_result.get('validation_results', {})
report = {
'success': pipeline_result['success'],
'total_duration': pipeline_result['total_duration'],
'records_processed': phases['transform']['records_processed'],
'records_loaded': phases['load']['records_loaded'],
'data_quality_score': validation.get('data_quality_score', 0),
'validation_queries_passed': len([
q for q in validation.get('validation_queries', {}).values()
if q.get('success', False)
])
}
return report
@flow(name="TPC-DI ETL Flow")
def tpcdi_etl_flow(scale_factor: float = 0.1, batch_type: str = "historical"):
"""Main TPC-DI ETL flow."""
# Extract source data
extract_result = extract_tpcdi_data(scale_factor, batch_type)
# Set up warehouse
database_path = setup_warehouse_schema(f"warehouse_{batch_type}.duckdb")
# Run ETL pipeline
pipeline_result = run_etl_pipeline(extract_result, database_path)
# Validate and report
final_report = validate_and_report(pipeline_result)
return final_report
# Run the flow
if __name__ == "__main__":
result = tpcdi_etl_flow(scale_factor=0.1, batch_type="historical")
print("ETL Flow Results:", result)
dbt Integration¶
-- models/staging/stg_tpcdi_customers.sql
-- Staging view over raw TPC-DI customer extracts
{{ config(materialized='view') }}
SELECT
CustomerID,
TaxID,
Status,
LastName,
FirstName,
MiddleInitial,
Gender,
CAST(Tier AS INTEGER) as Tier,
CAST(DOB AS DATE) as DOB,
AddressLine1,
City,
StateProv,
PostalCode,
Country,
Phone1,
Email1,
-- Add data quality flags
CASE
WHEN LastName IS NULL OR FirstName IS NULL THEN 0
ELSE 1
END as name_complete_flag,
CASE
WHEN Email1 LIKE '%@%' THEN 1
ELSE 0
END as email_valid_flag,
-- Add audit columns
1 as IsCurrent,
1 as BatchID,
CURRENT_DATE as EffectiveDate,
NULL as EndDate
FROM {{ source('tpcdi_raw', 'customers_historical') }}
WHERE Status IS NOT NULL
-- models/marts/dim_customer.sql
-- Final customer dimension with SCD Type 2 logic
{{ config(
materialized='incremental',
unique_key='SK_CustomerID',
on_schema_change='fail'
) }}
WITH customer_changes AS (
SELECT
*,
LAG(LastName) OVER (PARTITION BY CustomerID ORDER BY EffectiveDate) as prev_last_name,
LAG(AddressLine1) OVER (PARTITION BY CustomerID ORDER BY EffectiveDate) as prev_address
FROM {{ ref('stg_tpcdi_customers') }}
{% if is_incremental() %}
WHERE EffectiveDate > (SELECT MAX(EffectiveDate) FROM {{ this }})
{% endif %}
),
scd_logic AS (
SELECT
*,
CASE
WHEN prev_last_name IS NULL OR prev_address IS NULL THEN 1 -- New record
WHEN prev_last_name != LastName OR prev_address != AddressLine1 THEN 1 -- Changed record
ELSE 0 -- No change
END as is_new_version
FROM customer_changes
)
SELECT
ROW_NUMBER() OVER (ORDER BY CustomerID, EffectiveDate) as SK_CustomerID,
CustomerID,
TaxID,
Status,
LastName,
FirstName,
MiddleInitial,
Gender,
Tier,
DOB,
AddressLine1,
City,
StateProv,
PostalCode,
Country,
Phone1,
Email1,
IsCurrent,
BatchID,
EffectiveDate,
EndDate
FROM scd_logic
WHERE is_new_version = 1
# dbt_project.yml
name: 'tpcdi_etl'
version: '1.0.0'
model-paths: ["models"]
analysis-paths: ["analysis"]
test-paths: ["tests"]
seed-paths: ["data"]
macro-paths: ["macros"]
snapshot-paths: ["snapshots"]
target-path: "target"
clean-targets:
- "target"
- "dbt_modules"
models:
tpcdi_etl:
staging:
+materialized: view
+docs:
node_color: "lightblue"
marts:
+materialized: table
+docs:
node_color: "lightgreen"
# models/sources.yml (source definitions belong in a properties file, not dbt_project.yml)
version: 2
sources:
- name: tpcdi_raw
description: "Raw TPC-DI source data"
tables:
- name: customers_historical
description: "Historical customer data from TPC-DI ETL"
columns:
- name: CustomerID
description: "Unique customer identifier"
tests:
- not_null
- unique
- name: LastName
description: "Customer last name"
tests:
- not_null
- name: Email1
description: "Primary email address"
tests:
- email_format
Troubleshooting¶
Common Issues and Solutions¶
1. ETL Mode Not Enabled Error¶
Error: ValueError: ETL mode must be enabled to generate source data
Solution:
# Incorrect - ETL mode not enabled
tpcdi = TPCDI(scale_factor=1.0) # etl_mode=False by default
# Correct - Enable ETL mode
tpcdi = TPCDI(scale_factor=1.0, etl_mode=True)
2. Database Connection Errors¶
Error: Database schema not found or table does not exist
Solution:
# Always create schema before running ETL
schema_sql = tpcdi.get_create_tables_sql()
if isinstance(connection, sqlite3.Connection):
connection.executescript(schema_sql)
else: # DuckDB
connection.execute("BEGIN TRANSACTION")
for statement in schema_sql.split(';'):
if statement.strip():
connection.execute(statement)
connection.execute("COMMIT")
3. Memory Issues with Large Scale Factors¶
Error: Out of memory during ETL processing
Solutions:
# Solution 1: Reduce scale factor
tpcdi = TPCDI(scale_factor=0.1, etl_mode=True) # Instead of 1.0+
# Solution 2: Use in-memory database for testing
conn = duckdb.connect(':memory:')
# Solution 3: Process in smaller batches
# Split large batches into smaller incremental batches
for i in range(5):
result = tpcdi.run_etl_pipeline(
connection=conn,
batch_type='incremental',
validate_data=False
)
4. Data Quality Validation Failures¶
Symptoms: Low data quality scores or failing validation queries
Investigation:
# Run detailed validation to identify issues
validation_results = tpcdi.validate_etl_results(conn)
# Check specific validation queries
for query_id, result in validation_results['validation_queries'].items():
if not result['success']:
print(f"Failed validation {query_id}: {result.get('error', 'Unknown error')}")
# Check data quality issues
for issue in validation_results['data_quality_issues']:
print(f"Quality issue: {issue}")
# Check completeness
for table, completeness in validation_results['completeness_checks'].items():
if 'error' in completeness:
print(f"Completeness check failed for {table}: {completeness['error']}")
5. Performance Issues¶
Symptoms: ETL pipeline takes too long to complete
Solutions:
# Solution 1: Skip validation for performance testing
pipeline_result = tpcdi.run_etl_pipeline(
connection=conn,
batch_type='historical',
validate_data=False # Skip time-consuming validation
)
# Solution 2: Use faster database engine
# Use DuckDB instead of SQLite for better performance
conn = duckdb.connect(':memory:') # In-memory for fastest performance
# Solution 3: Monitor resource usage
import psutil
process = psutil.Process()
print(f"Memory usage: {process.memory_info().rss / 1024 / 1024:.1f} MB")
print(f"CPU usage: {process.cpu_percent():.1f}%")
Debugging ETL Pipeline Issues¶
Enable Detailed Logging¶
import logging
# Set up detailed logging
logging.basicConfig(
level=logging.DEBUG,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler('tpcdi_etl_debug.log'),
logging.StreamHandler()
]
)
# Initialize TPC-DI with verbose output
tpcdi = TPCDI(scale_factor=0.1, etl_mode=True, verbose=True)
Inspect Generated Source Files¶
from pathlib import Path

# Check generated source files
source_files = tpcdi.generate_source_data(
formats=['csv'],
batch_types=['historical']
)
for format_type, files in source_files.items():
for file_path in files:
print(f"Inspecting {file_path}:")
# Check file size
file_size = Path(file_path).stat().st_size
print(f" Size: {file_size:,} bytes")
# Check file content
with open(file_path, 'r') as f:
lines = f.readlines()
print(f" Lines: {len(lines)}")
print(f" Header: {lines[0].strip() if lines else 'Empty file'}")
if len(lines) > 1:
print(f" Sample: {lines[1].strip()}")
Validate Database State¶
def inspect_database_state(connection):
"""Inspect database state for debugging."""
# Check table existence
tables = ['DimCustomer', 'FactTrade', 'DimAccount']
for table in tables:
try:
cursor = connection.execute(f"SELECT COUNT(*) FROM {table}")
count = cursor.fetchone()[0]
print(f"{table}: {count} rows")
# Check sample data
cursor = connection.execute(f"SELECT * FROM {table} LIMIT 3")
sample_rows = cursor.fetchall()
print(f" Sample rows: {len(sample_rows)}")
except Exception as e:
print(f"{table}: Error - {e}")
# Usage
inspect_database_state(conn)
Error Recovery Patterns¶
Graceful Pipeline Recovery¶
def robust_etl_pipeline(tpcdi, connection, batch_type, max_retries=3):
"""Run ETL pipeline with retry logic and error recovery."""
for attempt in range(max_retries):
try:
print(f"ETL attempt {attempt + 1}/{max_retries}")
# Run ETL pipeline
pipeline_result = tpcdi.run_etl_pipeline(
connection=connection,
batch_type=batch_type,
validate_data=True
)
if pipeline_result['success']:
print(f"ETL successful on attempt {attempt + 1}")
return pipeline_result
else:
print(f"ETL failed on attempt {attempt + 1}: {pipeline_result.get('error')}")
# Clean up partial data before retry
if attempt < max_retries - 1:
print("Cleaning up for retry...")
connection.execute("DELETE FROM DimCustomer WHERE BatchID = 1")
connection.execute("DELETE FROM FactTrade WHERE 1=1")
connection.commit()
except Exception as e:
print(f"ETL exception on attempt {attempt + 1}: {e}")
# Clean up on exception
if attempt < max_retries - 1:
try:
connection.rollback()
except:
pass
raise Exception(f"ETL failed after {max_retries} attempts")
# Usage
try:
result = robust_etl_pipeline(tpcdi, conn, 'historical')
print("ETL completed successfully with recovery")
except Exception as e:
print(f"ETL failed permanently: {e}")
Best Practices¶
1. Environment Configuration¶
Development Environment¶
# Development setup - fast iteration
tpcdi_dev = TPCDI(
scale_factor=0.01, # Minimal data for fast testing
output_dir="dev_etl",
etl_mode=True,
verbose=True # Enable detailed logging
)
# Use in-memory database for speed
conn = duckdb.connect(':memory:')
Testing Environment¶
# Testing setup - systematic validation
tpcdi_test = TPCDI(
scale_factor=0.1, # Reasonable data size
output_dir="test_etl",
etl_mode=True,
verbose=False # Reduce log noise in tests
)
# Use persistent database for test reproducibility
conn = duckdb.connect('test_warehouse.duckdb')
Production-like Environment¶
# Production setup - full scale validation
tpcdi_prod = TPCDI(
scale_factor=1.0, # Full scale factor
output_dir="/data/etl/tpcdi",
etl_mode=True,
verbose=False
)
# Use production database connection
conn = your_production_db_connection()
2. Data Quality Best Practices¶
Comprehensive Validation Strategy¶
def systematic_etl_validation(tpcdi, connection):
"""Implement systematic ETL validation strategy."""
validation_results = {
'pre_etl_checks': {},
'post_etl_validation': {},
'business_rule_validation': {},
'summary': {}
}
# Pre-ETL checks
print("Running pre-ETL environment checks...")
validation_results['pre_etl_checks'] = {
'database_accessible': check_database_connection(connection),
'schema_exists': check_schema_exists(connection),
'sufficient_space': check_disk_space(),
'dependencies_available': check_dependencies()
}
# Run ETL pipeline
pipeline_result = tpcdi.run_etl_pipeline(
connection=connection,
batch_type='historical',
validate_data=True
)
# Post-ETL validation
if pipeline_result['success']:
print("Running post-ETL validation...")
validation_results['post_etl_validation'] = tpcdi.validate_etl_results(connection)
# Additional business rule validation
validation_results['business_rule_validation'] = validate_business_rules(connection)
# Calculate overall validation score
quality_score = validation_results['post_etl_validation']['data_quality_score']
business_score = calculate_business_rule_score(validation_results['business_rule_validation'])
validation_results['summary'] = {
'overall_success': pipeline_result['success'],
'data_quality_score': quality_score,
'business_rule_score': business_score,
'combined_score': (quality_score + business_score) / 2
}
return validation_results
def check_database_connection(connection):
"""Check if database connection is working."""
try:
connection.execute("SELECT 1")
return True
except:
return False
def check_schema_exists(connection):
"""Check if required schema exists."""
try:
connection.execute("SELECT COUNT(*) FROM DimCustomer LIMIT 1")
return True
except:
return False
def check_disk_space():
"""Check available disk space."""
import shutil
total, used, free = shutil.disk_usage("/")
return free > 1024**3 # At least 1GB free
def check_dependencies():
"""Check if required dependencies are available."""
try:
import duckdb
import sqlite3
return True
except ImportError:
return False
def validate_business_rules(connection):
"""Validate domain-specific business rules."""
rules = {
'positive_prices': connection.execute("SELECT COUNT(*) FROM FactTrade WHERE TradePrice > 0").fetchone()[0],
'valid_tiers': connection.execute("SELECT COUNT(*) FROM DimCustomer WHERE Tier IN (1,2,3)").fetchone()[0],
'current_flags': connection.execute("SELECT COUNT(*) FROM DimCustomer WHERE IsCurrent IN (0,1)").fetchone()[0]
}
return rules
def calculate_business_rule_score(business_rules):
"""Calculate business rule compliance score."""
total_rules = len(business_rules)
passed_rules = sum(1 for value in business_rules.values() if value > 0)
return (passed_rules / total_rules) * 100 if total_rules > 0 else 0
3. Performance Optimization¶
Batch Processing Optimization¶
def configured_batch_processing(tpcdi, connection, batch_types):
"""Implement configured batch processing strategy."""
# Pre-warm database connections and caches
connection.execute("PRAGMA cache_size = 100000") # SQLite optimization
# Process batches in the order supplied by the caller (historical first, then incrementals, then SCD)
ordered_batches = batch_types
performance_metrics = []
for i, batch_type in enumerate(ordered_batches):
print(f"Processing batch {i+1}/{len(ordered_batches)}: {batch_type}")
# Measure performance
start_time = time.time()
start_memory = psutil.Process().memory_info().rss
# Run ETL with configured settings
pipeline_result = tpcdi.run_etl_pipeline(
connection=connection,
batch_type=batch_type,
validate_data=(batch_type == 'historical') # Only validate historical
)
end_time = time.time()
end_memory = psutil.Process().memory_info().rss
# Record metrics
if pipeline_result['success']:
phases = pipeline_result['phases']
metrics = {
'batch_type': batch_type,
'batch_number': i + 1,
'total_time': end_time - start_time,
'memory_delta': (end_memory - start_memory) / 1024 / 1024, # MB
'records_processed': phases['transform']['records_processed'],
'records_loaded': phases['load']['records_loaded'],
'throughput': phases['transform']['records_processed'] / (end_time - start_time)
}
performance_metrics.append(metrics)
# Optimize between batches
if i < len(ordered_batches) - 1:
connection.execute("VACUUM") # Optimize database
time.sleep(0.1) # Brief pause for system recovery
return performance_metrics
# Usage
batch_plan = ['historical', 'incremental', 'incremental', 'scd']
performance_results = configured_batch_processing(tpcdi, conn, batch_plan)
# Analyze performance trends
print("Performance Analysis:")
for metrics in performance_results:
print(f" Batch {metrics['batch_number']} ({metrics['batch_type']}):")
print(f" Time: {metrics['total_time']:.2f}s")
print(f" Throughput: {metrics['throughput']:.0f} records/sec")
print(f" Memory: {metrics['memory_delta']:.1f} MB")
4. Error Handling and Monitoring¶
Production-Grade Error Handling¶
import logging
from datetime import datetime
from pathlib import Path
class ETLMonitor:
"""Production-grade ETL monitoring and error handling."""
def __init__(self, log_dir="etl_logs"):
self.log_dir = Path(log_dir)
self.log_dir.mkdir(exist_ok=True)
# Set up logging
self.logger = logging.getLogger('tpcdi_etl')
self.logger.setLevel(logging.INFO)
# File handler
log_file = self.log_dir / f"etl_{datetime.now().strftime('%Y%m%d_%H%M%S')}.log"
file_handler = logging.FileHandler(log_file)
file_handler.setLevel(logging.INFO)
# Console handler
console_handler = logging.StreamHandler()
console_handler.setLevel(logging.INFO)
# Formatter
formatter = logging.Formatter(
'%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
file_handler.setFormatter(formatter)
console_handler.setFormatter(formatter)
self.logger.addHandler(file_handler)
self.logger.addHandler(console_handler)
def run_monitored_etl(self, tpcdi, connection, batch_type):
"""Run ETL with systematic monitoring."""
run_id = datetime.now().strftime('%Y%m%d_%H%M%S')
self.logger.info(f"Starting ETL run {run_id} for batch type {batch_type}")
try:
# Pre-flight checks
self._pre_flight_checks(connection)
# Run ETL pipeline
start_time = time.time()
pipeline_result = tpcdi.run_etl_pipeline(
connection=connection,
batch_type=batch_type,
validate_data=True
)
execution_time = time.time() - start_time
# Log results
if pipeline_result['success']:
self.logger.info(f"ETL run {run_id} completed successfully in {execution_time:.2f}s")
# Log phase details
phases = pipeline_result['phases']
self.logger.info(f"Phase timings - Extract: {phases['extract']['duration']:.2f}s, "
f"Transform: {phases['transform']['duration']:.2f}s, "
f"Load: {phases['load']['duration']:.2f}s")
# Log data quality
if 'validation_results' in pipeline_result:
quality_score = pipeline_result['validation_results']['data_quality_score']
self.logger.info(f"Data quality score: {quality_score:.1f}/100")
# Post-execution validation
self._post_execution_validation(connection, pipeline_result)
else:
self.logger.error(f"ETL run {run_id} failed: {pipeline_result.get('error')}")
self._handle_etl_failure(connection, pipeline_result)
return pipeline_result
except Exception as e:
self.logger.error(f"ETL run {run_id} failed with exception: {str(e)}", exc_info=True)
self._handle_etl_exception(connection, e)
raise
def _pre_flight_checks(self, connection):
"""Run pre-flight checks before ETL."""
self.logger.info("Running pre-flight checks...")
# Check database connectivity
try:
connection.execute("SELECT 1")
self.logger.info("✅ Database connectivity confirmed")
except Exception as e:
self.logger.error(f"❌ Database connectivity failed: {e}")
raise
# Check schema exists
try:
connection.execute("SELECT COUNT(*) FROM DimCustomer LIMIT 1")
self.logger.info("✅ Schema validation passed")
except Exception as e:
self.logger.error(f"❌ Schema validation failed: {e}")
raise
# Check disk space
import shutil
total, used, free = shutil.disk_usage("/")
free_gb = free / (1024**3)
if free_gb > 1:
self.logger.info(f"✅ Sufficient disk space: {free_gb:.1f} GB available")
else:
self.logger.warning(f"⚠️ Low disk space: {free_gb:.1f} GB available")
def _post_execution_validation(self, connection, pipeline_result):
"""Run post-execution validation."""
self.logger.info("Running post-execution validation...")
# Check record counts
try:
customer_count = connection.execute("SELECT COUNT(*) FROM DimCustomer").fetchone()[0]
trade_count = connection.execute("SELECT COUNT(*) FROM FactTrade").fetchone()[0]
self.logger.info(f"Record counts - Customers: {customer_count}, Trades: {trade_count}")
if customer_count == 0:
self.logger.warning("⚠️ No customer records found after ETL")
if trade_count == 0:
self.logger.warning("⚠️ No trade records found after ETL")
except Exception as e:
self.logger.error(f"Post-execution validation failed: {e}")
def _handle_etl_failure(self, connection, pipeline_result):
"""Handle ETL failure."""
self.logger.info("Handling ETL failure...")
# Log detailed error information
error_msg = pipeline_result.get('error', 'Unknown error')
self.logger.error(f"Detailed error: {error_msg}")
# Save error state for debugging
error_file = self.log_dir / f"error_state_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json"
import json
with open(error_file, 'w') as f:
json.dump(pipeline_result, f, indent=2, default=str)
self.logger.info(f"Error state saved to {error_file}")
def _handle_etl_exception(self, connection, exception):
"""Handle ETL exception."""
self.logger.info("Handling ETL exception...")
# Attempt to rollback any partial transactions
try:
connection.rollback()
self.logger.info("Transaction rolled back successfully")
except:
self.logger.warning("Failed to rollback transaction")
# Usage
monitor = ETLMonitor(log_dir="production_etl_logs")
try:
result = monitor.run_monitored_etl(tpcdi, conn, 'historical')
print("ETL completed with monitoring")
except Exception as e:
print(f"ETL failed: {e}")
5. Testing and Quality Assurance¶
Comprehensive Test Suite¶
import unittest
from pathlib import Path
import tempfile
import duckdb
from benchbox import TPCDI
class TPCDIETLTestSuite(unittest.TestCase):
"""Comprehensive test suite for TPC-DI ETL implementation."""
def setUp(self):
"""Set up test environment."""
self.temp_dir = tempfile.mkdtemp()
self.tpcdi = TPCDI(
scale_factor=0.01, # Minimal scale for fast testing
output_dir=self.temp_dir,
etl_mode=True,
verbose=False
)
# Create in-memory database for testing
self.conn = duckdb.connect(':memory:')
schema_sql = self.tpcdi.get_create_tables_sql()
self.conn.execute("BEGIN TRANSACTION")
for statement in schema_sql.split(';'):
if statement.strip():
self.conn.execute(statement)
self.conn.execute("COMMIT")
def tearDown(self):
"""Clean up test environment."""
if self.conn:
self.conn.close()
# Clean up temporary files
import shutil
shutil.rmtree(self.temp_dir, ignore_errors=True)
def test_etl_mode_initialization(self):
"""Test ETL mode initialization."""
# Test ETL mode enabled
etl_status = self.tpcdi.get_etl_status()
self.assertTrue(etl_status['etl_mode_enabled'])
self.assertIn('csv', etl_status['supported_formats'])
self.assertIn('historical', etl_status['batch_types'])
def test_source_data_generation(self):
"""Test source data generation in multiple formats."""
source_files = self.tpcdi.generate_source_data(
formats=['csv', 'xml'],
batch_types=['historical']
)
# Verify files were generated
self.assertIn('csv', source_files)
self.assertIn('xml', source_files)
# Verify files exist and have content
for format_type, files in source_files.items():
for file_path in files:
file_obj = Path(file_path)
self.assertTrue(file_obj.exists())
self.assertGreater(file_obj.stat().st_size, 0)
def test_historical_etl_pipeline(self):
"""Test historical ETL pipeline execution."""
pipeline_result = self.tpcdi.run_etl_pipeline(
connection=self.conn,
batch_type='historical',
validate_data=True
)
# Verify pipeline success
self.assertTrue(pipeline_result['success'])
self.assertIn('phases', pipeline_result)
self.assertIn('validation_results', pipeline_result)
# Verify phases completed
phases = pipeline_result['phases']
self.assertIn('extract', phases)
self.assertIn('transform', phases)
self.assertIn('load', phases)
self.assertIn('validation', phases)
# Verify data was loaded
customer_count = self.conn.execute("SELECT COUNT(*) FROM DimCustomer").fetchone()[0]
self.assertGreater(customer_count, 0)
def test_incremental_etl_pipeline(self):
"""Test incremental ETL pipeline execution."""
# Run historical first
self.tpcdi.run_etl_pipeline(
connection=self.conn,
batch_type='historical',
validate_data=False
)
# Get initial count
initial_count = self.conn.execute("SELECT COUNT(*) FROM DimCustomer").fetchone()[0]
# Run incremental
pipeline_result = self.tpcdi.run_etl_pipeline(
connection=self.conn,
batch_type='incremental',
validate_data=False
)
# Verify incremental success
self.assertTrue(pipeline_result['success'])
# Verify data was added (should be more records)
final_count = self.conn.execute("SELECT COUNT(*) FROM DimCustomer").fetchone()[0]
self.assertGreaterEqual(final_count, initial_count)
def test_data_validation(self):
"""Test systematic data validation."""
# Run ETL pipeline
pipeline_result = self.tpcdi.run_etl_pipeline(
connection=self.conn,
batch_type='historical',
validate_data=True
)
# Verify validation results
validation_results = pipeline_result['validation_results']
self.assertIn('data_quality_score', validation_results)
self.assertIn('validation_queries', validation_results)
# Data quality score should be reasonable
quality_score = validation_results['data_quality_score']
self.assertGreaterEqual(quality_score, 0)
self.assertLessEqual(quality_score, 100)
# Validation queries should execute
validation_queries = validation_results['validation_queries']
self.assertGreater(len(validation_queries), 0)
def test_scd_implementation(self):
"""Test SCD Type 2 implementation."""
# Run historical load
self.tpcdi.run_etl_pipeline(
connection=self.conn,
batch_type='historical',
validate_data=False
)
# Run SCD batch
pipeline_result = self.tpcdi.run_etl_pipeline(
connection=self.conn,
batch_type='scd',
validate_data=False
)
self.assertTrue(pipeline_result['success'])
# Verify SCD implementation
# Check for customers with multiple versions
multi_version_customers = self.conn.execute("""
SELECT CustomerID, COUNT(*) as versions
FROM DimCustomer
GROUP BY CustomerID
HAVING COUNT(*) > 1
""").fetchall()
# Should have some customers with multiple versions
self.assertGreaterEqual(len(multi_version_customers), 0)
# Verify current flags are correct
current_violations = self.conn.execute("""
SELECT CustomerID, COUNT(*) as current_count
FROM DimCustomer
WHERE IsCurrent = 1
GROUP BY CustomerID
HAVING COUNT(*) > 1
""").fetchall()
# Should not have multiple current records per customer
self.assertEqual(len(current_violations), 0)
def test_error_handling(self):
"""Test error handling scenarios."""
# Test with invalid batch type
with self.assertRaises(Exception):
self.tpcdi.run_etl_pipeline(
connection=self.conn,
batch_type='invalid_batch_type',
validate_data=False
)
# Test with closed connection
bad_conn = duckdb.connect(':memory:')
bad_conn.close()
pipeline_result = self.tpcdi.run_etl_pipeline(
connection=bad_conn,
batch_type='historical',
validate_data=False
)
# Should fail gracefully
self.assertFalse(pipeline_result['success'])
self.assertIn('error', pipeline_result)
def test_performance_within_limits(self):
"""Test that ETL performance is within acceptable limits."""
import time
start_time = time.time()
pipeline_result = self.tpcdi.run_etl_pipeline(
connection=self.conn,
batch_type='historical',
validate_data=True
)
execution_time = time.time() - start_time
# Verify success
self.assertTrue(pipeline_result['success'])
# Verify execution time is reasonable (adjust based on environment)
self.assertLess(execution_time, 30) # Should complete within 30 seconds
# Verify phase timing breakdown
phases = pipeline_result['phases']
total_phase_time = (
phases['extract']['duration'] +
phases['transform']['duration'] +
phases['load']['duration'] +
phases.get('validation', {}).get('duration', 0)
)
# Total phase time should be close to overall execution time
self.assertLessEqual(abs(total_phase_time - pipeline_result['total_duration']), 1.0)
def run_test_suite():
"""Run the complete TPC-DI ETL test suite."""
# Create test suite
suite = unittest.TestLoader().loadTestsFromTestCase(TPCDIETLTestSuite)
# Run tests with verbose output
runner = unittest.TextTestRunner(verbosity=2)
result = runner.run(suite)
# Return test results
return {
'tests_run': result.testsRun,
'failures': len(result.failures),
'errors': len(result.errors),
'success_rate': (result.testsRun - len(result.failures) - len(result.errors)) / result.testsRun if result.testsRun > 0 else 0
}
# Run test suite
if __name__ == "__main__":
test_results = run_test_suite()
print(f"\nTest Suite Results:")
print(f"Tests run: {test_results['tests_run']}")
print(f"Failures: {test_results['failures']}")
print(f"Errors: {test_results['errors']}")
print(f"Success rate: {test_results['success_rate']:.1%}")
Conclusion¶
The TPC-DI ETL implementation in BenchBox provides a systematic framework for testing data integration scenarios in enterprise environments. By following the patterns and best practices outlined in this guide, you can:
Implement robust ETL pipelines with proper error handling and monitoring
Validate data quality thoroughly using built-in and custom validation rules
Optimize performance across different database backends and scale factors
Integrate with orchestration tools like Airflow, Prefect, and dbt
Test SCD implementations with proper Type 2 slowly changing dimension logic
Monitor and debug ETL processes with detailed logging and metrics
The ETL mode maintains full backwards compatibility while providing advanced capabilities for modern data integration testing scenarios. Whether you’re validating a new ETL pipeline, benchmarking database performance, or testing data quality frameworks, the TPC-DI ETL implementation provides the tools and patterns needed for systematic data integration testing.
For additional examples and advanced usage patterns, refer to the example files and integration documentation provided with BenchBox.