Custom Benchmarks

Tags: advanced, guide, custom-benchmark, python-api

A complete guide to creating custom benchmarks and integrating them with the BenchBox framework.


Creating Custom Benchmarks

Basic Custom Benchmark

The simplest way to create a custom benchmark is to inherit from BaseBenchmark:

from benchbox.base import BaseBenchmark
from pathlib import Path
from typing import Dict, Union, Optional
import csv
import random

class SimpleBenchmark(BaseBenchmark):
    """Simple custom benchmark for demonstration."""

    def __init__(self, scale_factor: float = 1.0, output_dir: Optional[Path] = None):
        super().__init__(scale_factor, output_dir)

        # Define queries
        self._queries = {
            "count_all": "SELECT COUNT(*) FROM test_table",
            "sum_values": "SELECT SUM(value) FROM test_table",
            "avg_by_category": "SELECT category, AVG(value) FROM test_table GROUP BY category",
            "top_values": "SELECT * FROM test_table ORDER BY value DESC LIMIT 10"
        }

        # Define schema
        self._schema = {
            "test_table": {
                "columns": [
                    {"name": "id", "type": "INTEGER", "primary_key": True},
                    {"name": "category", "type": "VARCHAR(10)", "nullable": False},
                    {"name": "value", "type": "DOUBLE", "nullable": False},
                    {"name": "created_at", "type": "TIMESTAMP", "nullable": True}
                ]
            }
        }

    def generate_data(self) -> Dict[str, Path]:
        """Generate simple test data."""
        self.output_dir.mkdir(parents=True, exist_ok=True)

        # Calculate row count based on scale factor
        base_rows = 1000
        row_count = int(base_rows * self.scale_factor)

        # Generate test_table data
        test_file = self.output_dir / "test_table.csv"

        categories = ['A', 'B', 'C', 'D']

        with open(test_file, 'w', newline='') as f:
            writer = csv.writer(f, delimiter='|')

            for i in range(row_count):
                category = random.choice(categories)
                value = random.uniform(1.0, 1000.0)
                created_at = f"2023-01-{(i % 30) + 1:02d} 12:00:00"

                writer.writerow([i, category, value, created_at])

        return {"test_table": test_file}

    def get_queries(self) -> Dict[Union[int, str], str]:
        """Get all benchmark queries."""
        return self._queries.copy()

    def get_query(self, query_id: Union[int, str]) -> str:
        """Get specific query by ID."""
        if query_id not in self._queries:
            raise ValueError(f"Query '{query_id}' not found")
        return self._queries[query_id]

    def get_create_tables_sql(self, dialect: str = "duckdb") -> str:
        """Get DDL statements for the benchmark tables (dialect is accepted but not used in this simple example)."""
        return """
        CREATE TABLE test_table (
            id INTEGER PRIMARY KEY,
            category VARCHAR(10) NOT NULL,
            value DOUBLE NOT NULL,
            created_at TIMESTAMP
        );
        """

    def get_table_names(self) -> list:
        """Get list of table names."""
        return list(self._schema.keys())

# Usage example
def test_simple_benchmark():
    """Test the simple custom benchmark."""
    import duckdb

    # Initialize benchmark
    benchmark = SimpleBenchmark(scale_factor=0.1)

    # Generate data
    data_files = benchmark.generate_data()
    print(f"Generated data: {list(data_files.keys())}")

    # Setup database
    conn = duckdb.connect(":memory:")

    # Create tables
    ddl = benchmark.get_create_tables_sql("duckdb")
    conn.execute(ddl)

    # Load data
    for table_name, file_path in data_files.items():
        conn.execute(f"""
            INSERT INTO {table_name}
            SELECT * FROM read_csv('{file_path}', header=false, delimiter='|')
        """)

        # Check row count
        count = conn.execute(f"SELECT COUNT(*) FROM {table_name}").fetchone()[0]
        print(f"Loaded {count} rows into {table_name}")

    # Run queries
    queries = benchmark.get_queries()
    for query_id, query_sql in queries.items():
        result = conn.execute(query_sql).fetchall()
        print(f"{query_id}: {result}")

    conn.close()

if __name__ == "__main__":
    test_simple_benchmark()
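
With scale_factor=0.1 the generator writes 100 rows (1,000 base rows x 0.1) to test_table.csv. The file is pipe-delimited with no header row; a few rows might look like the following (the exact values vary from run to run because the generator is unseeded):

0|C|412.7335281190343|2023-01-01 12:00:00
1|A|88.19101205223432|2023-01-02 12:00:00
2|D|973.5528214800096|2023-01-03 12:00:00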

Benchmark Architecture

Advanced Benchmark Structure

For more complex benchmarks, use a modular architecture:

from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import Dict, List, Any, Optional
import json

@dataclass
class TableSchema:
    """Schema definition for a benchmark table."""
    name: str
    columns: List[Dict[str, Any]]
    primary_key: Optional[List[str]] = None
    foreign_keys: Optional[List[Dict[str, Any]]] = None
    indexes: Optional[List[Dict[str, Any]]] = None

@dataclass
class QueryMetadata:
    """Metadata for benchmark queries."""
    query_id: str
    name: str
    description: str
    category: str
    complexity: str  # 'simple', 'medium', 'complex'
    sql_features: List[str]
    estimated_runtime_ms: Optional[int] = None

class DataGenerator(ABC):
    """Abstract base class for data generators."""

    @abstractmethod
    def generate_table_data(self, table_schema: TableSchema, scale_factor: float, output_path: Path) -> Path:
        """Generate data for a specific table."""
        pass

class QueryManager(ABC):
    """Abstract base class for query managers."""

    @abstractmethod
    def get_query(self, query_id: str) -> str:
        """Get a specific query by ID."""
        pass

    @abstractmethod
    def get_all_queries(self) -> Dict[str, str]:
        """Get all queries keyed by query ID."""
        pass

    @abstractmethod
    def get_query_metadata(self, query_id: str) -> QueryMetadata:
        """Get metadata for a specific query."""
        pass

    @abstractmethod
    def get_queries_by_category(self, category: str) -> Dict[str, str]:
        """Get queries filtered by category."""
        pass

class AdvancedBenchmark(BaseBenchmark):
    """Advanced-level benchmark with modular architecture."""

    def __init__(
        self,
        scale_factor: float = 1.0,
        output_dir: Optional[Path] = None,
        data_generator: Optional[DataGenerator] = None,
        query_manager: Optional[QueryManager] = None
    ):
        super().__init__(scale_factor, output_dir)

        self.data_generator = data_generator or self._create_default_data_generator()
        self.query_manager = query_manager or self._create_default_query_manager()

        # Load benchmark configuration
        self.config = self._load_benchmark_config()
        self.schemas = self._load_table_schemas()

    def _create_default_data_generator(self) -> DataGenerator:
        """Create default data generator."""
        return DefaultDataGenerator()

    def _create_default_query_manager(self) -> QueryManager:
        """Create default query manager."""
        return DefaultQueryManager()

    def _load_benchmark_config(self) -> Dict[str, Any]:
        """Load benchmark configuration."""
        # Override in subclasses to load from config files
        return {
            "name": "Advanced-level Benchmark",
            "version": "1.0.0",
            "description": "Advanced-level benchmark template"
        }

    def _load_table_schemas(self) -> Dict[str, TableSchema]:
        """Load table schemas."""
        # Override in subclasses to define actual schemas
        return {}

    def generate_data(self) -> Dict[str, Path]:
        """Generate data using the configured data generator."""
        self.output_dir.mkdir(parents=True, exist_ok=True)

        data_files = {}

        for table_name, schema in self.schemas.items():
            print(f"Generating data for {table_name}...")

            output_path = self.output_dir / f"{table_name}.csv"
            generated_file = self.data_generator.generate_table_data(
                schema, self.scale_factor, output_path
            )

            data_files[table_name] = generated_file

        return data_files

    def get_queries(self) -> Dict[Union[int, str], str]:
        """Get all queries from the query manager."""
        return self.query_manager.get_all_queries()

    def get_query(self, query_id: Union[int, str]) -> str:
        """Get specific query."""
        return self.query_manager.get_query(str(query_id))

    def get_query_metadata(self, query_id: str) -> QueryMetadata:
        """Get query metadata."""
        return self.query_manager.get_query_metadata(query_id)

    def get_queries_by_category(self, category: str) -> Dict[str, str]:
        """Get queries by category."""
        return self.query_manager.get_queries_by_category(category)

class DefaultDataGenerator(DataGenerator):
    """Default data generator implementation."""

    def generate_table_data(self, table_schema: TableSchema, scale_factor: float, output_path: Path) -> Path:
        """Generate realistic data based on schema."""

        # Calculate row count
        base_rows = self._get_base_row_count(table_schema.name)
        row_count = int(base_rows * scale_factor)

        with open(output_path, 'w', newline='') as f:
            writer = csv.writer(f, delimiter='|')

            for i in range(row_count):
                row = []
                for column in table_schema.columns:
                    value = self._generate_column_value(column, i, row_count)
                    row.append(value)

                writer.writerow(row)

        return output_path

    def _get_base_row_count(self, table_name: str) -> int:
        """Get base row count for table."""
        # Override for specific table sizing
        return 1000

    def _generate_column_value(self, column: Dict[str, Any], row_id: int, total_rows: int) -> Any:
        """Generate value for a specific column."""

        column_type = column["type"].upper()
        column_name = column["name"].lower()

        # Handle different data types
        if "INTEGER" in column_type or "INT" in column_type:
            if column.get("primary_key"):
                return row_id
            else:
                return random.randint(1, 1000)

        elif "VARCHAR" in column_type or "TEXT" in column_type:
            if "category" in column_name:
                return random.choice(['A', 'B', 'C', 'D', 'E'])
            elif "name" in column_name:
                return f"Name_{random.randint(1, 1000)}"
            else:
                return f"Value_{row_id}"

        elif "DOUBLE" in column_type or "FLOAT" in column_type:
            return round(random.uniform(1.0, 1000.0), 2)

        elif "TIMESTAMP" in column_type or "DATE" in column_type:
            return f"2023-{random.randint(1, 12):02d}-{random.randint(1, 28):02d} 12:00:00"

        else:
            return f"default_{row_id}"

class DefaultQueryManager(QueryManager):
    """Default query manager implementation."""

    def __init__(self):
        self.queries = {}
        self.metadata = {}

    def add_query(self, query_id: str, query_sql: str, metadata: QueryMetadata):
        """Add a query to the manager."""
        self.queries[query_id] = query_sql
        self.metadata[query_id] = metadata

    def get_query(self, query_id: str) -> str:
        """Get specific query."""
        if query_id not in self.queries:
            raise ValueError(f"Query '{query_id}' not found")
        return self.queries[query_id]

    def get_all_queries(self) -> Dict[str, str]:
        """Get all queries."""
        return self.queries.copy()

    def get_query_metadata(self, query_id: str) -> QueryMetadata:
        """Get query metadata."""
        if query_id not in self.metadata:
            raise ValueError(f"Metadata for query '{query_id}' not found")
        return self.metadata[query_id]

    def get_queries_by_category(self, category: str) -> Dict[str, str]:
        """Get queries by category."""
        filtered_queries = {}

        for query_id, metadata in self.metadata.items():
            if metadata.category == category:
                filtered_queries[query_id] = self.queries[query_id]

        return filtered_queries
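
A minimal sketch of how these pieces fit together: build a DefaultQueryManager by hand, register a query with its metadata, and pass the manager to AdvancedBenchmark. The query, category, and metadata values below are illustrative only:

def build_example_benchmark() -> AdvancedBenchmark:
    """Wire a hand-built query manager into the modular benchmark."""
    query_manager = DefaultQueryManager()
    query_manager.add_query(
        "count_rows",
        "SELECT COUNT(*) FROM test_table",
        QueryMetadata(
            query_id="count_rows",
            name="Row Count",
            description="Count all rows in test_table",
            category="smoke_test",
            complexity="simple",
            sql_features=["aggregation"]
        )
    )
    return AdvancedBenchmark(scale_factor=0.1, query_manager=query_manager)

benchmark = build_example_benchmark()
print(benchmark.get_queries())                          # {'count_rows': 'SELECT COUNT(*) FROM test_table'}
print(benchmark.get_queries_by_category("smoke_test"))  # same query, filtered by category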

Data Generation Strategies

Realistic Data Generation

import numpy as np
from faker import Faker
from datetime import datetime

class RealisticDataGenerator(DataGenerator):
    """Generate realistic data for benchmarks."""

    def __init__(self, seed: int = 42):
        self.fake = Faker()
        self.fake.seed_instance(seed)
        random.seed(seed)
        np.random.seed(seed)

    def generate_table_data(self, table_schema: TableSchema, scale_factor: float, output_path: Path) -> Path:
        """Generate realistic data based on table schema."""

        # Determine table characteristics
        table_config = self._get_table_config(table_schema.name)
        row_count = int(table_config["base_rows"] * scale_factor)

        print(f"Generating {row_count} rows for {table_schema.name}")

        # Generate data in batches for memory efficiency
        batch_size = 10000

        with open(output_path, 'w', newline='') as f:
            writer = csv.writer(f, delimiter='|')

            for batch_start in range(0, row_count, batch_size):
                batch_end = min(batch_start + batch_size, row_count)
                batch_data = self._generate_batch(
                    table_schema, batch_start, batch_end, table_config
                )

                for row in batch_data:
                    writer.writerow(row)

        return output_path

    def _get_table_config(self, table_name: str) -> Dict[str, Any]:
        """Get configuration for specific table types."""

        configs = {
            "customer": {
                "base_rows": 10000,
                "distributions": {
                    "segment": {"AUTOMOBILE": 0.3, "BUILDING": 0.3, "FURNITURE": 0.25, "MACHINERY": 0.15},
                    "nation": "uniform"
                }
            },
            "orders": {
                "base_rows": 50000,
                "date_range": (datetime(2020, 1, 1), datetime(2023, 12, 31)),
                "order_priority": ["1-URGENT", "2-HIGH", "3-MEDIUM", "4-NOT SPECIFIED", "5-LOW"]
            },
            "lineitem": {
                "base_rows": 200000,
                "lines_per_order": (1, 7),  # Min, max lines per order
                "discount_range": (0.0, 0.10),
                "tax_range": (0.0, 0.08)
            },
            "part": {
                "base_rows": 5000,
                "part_types": ["ECONOMY ANODIZED", "ECONOMY BRUSHED", "ECONOMY BURNISHED"],
                "containers": ["SM CASE", "SM BOX", "SM PACK", "LG CASE", "LG BOX", "LG PACK"]
            }
        }

        return configs.get(table_name, {"base_rows": 1000})

    def _generate_batch(
        self,
        table_schema: TableSchema,
        start_id: int,
        end_id: int,
        table_config: Dict[str, Any]
    ) -> List[List[Any]]:
        """Generate a batch of rows."""

        batch_data = []

        for row_id in range(start_id, end_id):
            row = []

            for column in table_schema.columns:
                value = self._generate_realistic_value(column, row_id, table_config)
                row.append(value)

            batch_data.append(row)

        return batch_data

    def _generate_realistic_value(
        self,
        column: Dict[str, Any],
        row_id: int,
        table_config: Dict[str, Any]
    ) -> Any:
        """Generate realistic value based on column semantics."""

        column_name = column["name"].lower()
        column_type = column["type"].upper()

        # Primary key handling
        if column.get("primary_key"):
            return row_id

        # Foreign key handling (matches names like "custkey" or "o_custkey")
        if column_name.endswith("key") and not column.get("primary_key"):
            # Generate foreign key reference
            return self._generate_foreign_key_value(column_name, row_id, table_config)

        # Name fields
        if "name" in column_name:
            if "customer" in column_name or "supplier" in column_name:
                return self.fake.company()
            elif "nation" in column_name:
                return self.fake.country()
            else:
                return self.fake.name()

        # Address fields
        if "address" in column_name:
            return self.fake.address().replace('\n', ', ')

        # Phone fields
        if "phone" in column_name:
            return self.fake.phone_number()

        # Comment fields
        if "comment" in column_name:
            return self.fake.text(max_nb_chars=100)

        # Date fields
        if "date" in column_name or "TIMESTAMP" in column_type or "DATE" in column_type:
            if "date_range" in table_config:
                start_date, end_date = table_config["date_range"]
                return self.fake.date_between(start_date=start_date, end_date=end_date)
            else:
                return self.fake.date_between(start_date='-2y', end_date='today')

        # Numeric fields with semantic meaning
        if "price" in column_name or "amount" in column_name:
            return round(random.uniform(10.0, 10000.0), 2)

        elif "quantity" in column_name:
            return random.randint(1, 50)

        elif "discount" in column_name:
            if "discount_range" in table_config:
                min_val, max_val = table_config["discount_range"]
                return round(random.uniform(min_val, max_val), 2)
            else:
                return round(random.uniform(0.0, 0.10), 2)

        elif "tax" in column_name:
            if "tax_range" in table_config:
                min_val, max_val = table_config["tax_range"]
                return round(random.uniform(min_val, max_val), 2)
            else:
                return round(random.uniform(0.0, 0.08), 2)

        # Priority and status fields
        elif "priority" in column_name:
            if "order_priority" in table_config:
                return random.choice(table_config["order_priority"])
            else:
                return random.choice(["HIGH", "MEDIUM", "LOW"])

        elif "status" in column_name:
            return random.choice(["F", "O", "P"])  # Fulfilled, Open, Pending

        # Segment and category fields
        elif "segment" in column_name or "mktsegment" in column_name:
            if "distributions" in table_config and "segment" in table_config["distributions"]:
                segments = list(table_config["distributions"]["segment"].keys())
                weights = list(table_config["distributions"]["segment"].values())
                return np.random.choice(segments, p=weights)
            else:
                return random.choice(["AUTOMOBILE", "BUILDING", "FURNITURE", "MACHINERY"])

        # Generic type-based generation
        elif "INTEGER" in column_type or "INT" in column_type:
            return random.randint(1, 100000)

        elif "DOUBLE" in column_type or "FLOAT" in column_type or "DECIMAL" in column_type:
            return round(random.uniform(1.0, 1000.0), 2)

        elif "VARCHAR" in column_type or "TEXT" in column_type:
            max_length = self._extract_varchar_length(column_type)
            text = self.fake.text(max_nb_chars=max_length)
            return text[:max_length] if max_length else text

        else:
            return f"value_{row_id}"

    def _generate_foreign_key_value(self, column_name: str, row_id: int, table_config: Dict[str, Any]) -> int:
        """Generate realistic foreign key values."""

        # Map foreign key columns to their referenced table sizes
        fk_mappings = {
            "custkey": 10000,      # customer table size
            "orderkey": 50000,     # orders table size
            "partkey": 5000,       # part table size
            "suppkey": 1000,       # supplier table size
            "nationkey": 25,       # nation table size
            "regionkey": 5         # region table size
        }

        # Extract the base name (remove prefixes like o_, l_, etc.)
        base_name = column_name.split('_')[-1]

        if base_name in fk_mappings:
            max_value = fk_mappings[base_name]
            return random.randint(0, max_value - 1)
        else:
            # Default foreign key generation
            return random.randint(0, 1000)

    def _extract_varchar_length(self, column_type: str) -> Optional[int]:
        """Extract length from VARCHAR(n) type."""
        import re
        match = re.search(r'VARCHAR\((\d+)\)', column_type)
        return int(match.group(1)) if match else None
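
The generator can also be used on its own, without a benchmark class. The TableSchema below is illustrative; naming the table "customer" matters because it selects the matching entry in _get_table_config:

generator = RealisticDataGenerator(seed=42)

customer_schema = TableSchema(
    name="customer",  # picks up the "customer" config (10,000 base rows)
    columns=[
        {"name": "custkey", "type": "INTEGER", "primary_key": True},
        {"name": "name", "type": "VARCHAR(25)"},
        {"name": "acctbal", "type": "DECIMAL(15,2)"}
    ]
)

# 10,000 base rows * 0.01 scale factor = 100 rows
output_file = generator.generate_table_data(
    customer_schema, scale_factor=0.01, output_path=Path("customer_sample.csv")
)
print(output_file.read_text().splitlines()[:3])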

# Example usage with realistic data
class ECommerceBenchmark(AdvancedBenchmark):
    """E-commerce benchmark with realistic data."""

    def _create_default_data_generator(self) -> DataGenerator:
        return RealisticDataGenerator()

    def _load_table_schemas(self) -> Dict[str, TableSchema]:
        return {
            "customer": TableSchema(
                name="customer",
                columns=[
                    {"name": "custkey", "type": "INTEGER", "primary_key": True},
                    {"name": "name", "type": "VARCHAR(25)", "nullable": False},
                    {"name": "address", "type": "VARCHAR(40)", "nullable": False},
                    {"name": "nationkey", "type": "INTEGER", "nullable": False},
                    {"name": "phone", "type": "VARCHAR(15)", "nullable": False},
                    {"name": "acctbal", "type": "DECIMAL(15,2)", "nullable": False},
                    {"name": "mktsegment", "type": "VARCHAR(10)", "nullable": False},
                    {"name": "comment", "type": "VARCHAR(117)", "nullable": True}
                ]
            ),
            "orders": TableSchema(
                name="orders",
                columns=[
                    {"name": "orderkey", "type": "INTEGER", "primary_key": True},
                    {"name": "custkey", "type": "INTEGER", "nullable": False},
                    {"name": "orderstatus", "type": "VARCHAR(1)", "nullable": False},
                    {"name": "totalprice", "type": "DECIMAL(15,2)", "nullable": False},
                    {"name": "orderdate", "type": "DATE", "nullable": False},
                    {"name": "orderpriority", "type": "VARCHAR(15)", "nullable": False},
                    {"name": "clerk", "type": "VARCHAR(15)", "nullable": False},
                    {"name": "shippriority", "type": "INTEGER", "nullable": False},
                    {"name": "comment", "type": "VARCHAR(79)", "nullable": True}
                ]
            )
        }

    def _create_default_query_manager(self) -> QueryManager:
        query_manager = DefaultQueryManager()

        # Add e-commerce specific queries
        query_manager.add_query(
            "customer_orders",
            """
            SELECT c.name, COUNT(o.orderkey) as order_count, SUM(o.totalprice) as total_spent
            FROM customer c
            JOIN orders o ON c.custkey = o.custkey
            GROUP BY c.custkey, c.name
            ORDER BY total_spent DESC
            LIMIT 10
            """,
            QueryMetadata(
                query_id="customer_orders",
                name="Top Customers by Revenue",
                description="Find top 10 customers by total purchase amount",
                category="analytics",
                complexity="medium",
                sql_features=["JOIN", "GROUP BY", "ORDER BY", "LIMIT"]
            )
        )

        query_manager.add_query(
            "monthly_revenue",
            """
            SELECT
                EXTRACT(YEAR FROM orderdate) as year,
                EXTRACT(MONTH FROM orderdate) as month,
                SUM(totalprice) as monthly_revenue,
                COUNT(*) as order_count
            FROM orders
            GROUP BY EXTRACT(YEAR FROM orderdate), EXTRACT(MONTH FROM orderdate)
            ORDER BY year, month
            """,
            QueryMetadata(
                query_id="monthly_revenue",
                name="Monthly Revenue Trend",
                description="Calculate monthly revenue and order count trends",
                category="time_series",
                complexity="simple",
                sql_features=["EXTRACT", "GROUP BY", "ORDER BY"]
            )
        )

        return query_manager

# Usage example
def test_ecommerce_benchmark():
    """Test the e-commerce benchmark."""

    benchmark = ECommerceBenchmark(scale_factor=0.01)

    # Generate data
    data_files = benchmark.generate_data()

    # Test queries
    queries = benchmark.get_queries()
    for query_id in queries:
        metadata = benchmark.get_query_metadata(query_id)
        print(f"Query: {metadata.name}")
        print(f"Category: {metadata.category}")
        print(f"Complexity: {metadata.complexity}")
        print()

if __name__ == "__main__":
    test_ecommerce_benchmark()
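
ECommerceBenchmark does not override get_create_tables_sql, so loading its data into DuckDB needs DDL from somewhere. The sketch below derives naive CREATE TABLE statements from the TableSchema objects; the ddl_from_schema helper and the type handling are illustrative assumptions, not framework behavior:

import duckdb

def ddl_from_schema(schema: TableSchema) -> str:
    """Illustrative helper: build a naive CREATE TABLE statement from a TableSchema."""
    columns_sql = ", ".join(f"{col['name']} {col['type']}" for col in schema.columns)
    return f"CREATE TABLE {schema.name} ({columns_sql})"

benchmark = ECommerceBenchmark(scale_factor=0.01)
data_files = benchmark.generate_data()

conn = duckdb.connect(":memory:")
for table_name, file_path in data_files.items():
    # Create the table from its schema, then bulk-load the pipe-delimited CSV
    conn.execute(ddl_from_schema(benchmark.schemas[table_name]))
    conn.execute(
        f"INSERT INTO {table_name} "
        f"SELECT * FROM read_csv('{file_path}', header=false, delim='|')"
    )

for query_id, query_sql in benchmark.get_queries().items():
    print(query_id, conn.execute(query_sql).fetchall()[:3])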

Query Management

Advanced Query Management

from typing import Set
import re
import sqlparse

class AdvancedQueryManager(DefaultQueryManager):
    """Advanced query manager with query-analysis capabilities."""

    def __init__(self):
        super().__init__()
        self.query_analysis_cache = {}

    def add_query_from_file(self, query_id: str, file_path: Path, metadata: QueryMetadata):
        """Add query from SQL file."""
        with open(file_path, 'r') as f:
            query_sql = f.read()

        self.add_query(query_id, query_sql, metadata)

    def add_queries_from_directory(self, directory: Path, pattern: str = "*.sql"):
        """Add all queries from a directory."""
        for sql_file in directory.glob(pattern):
            query_id = sql_file.stem

            # Create default metadata
            metadata = QueryMetadata(
                query_id=query_id,
                name=query_id.replace('_', ' ').title(),
                description=f"Query loaded from {sql_file.name}",
                category="unknown",
                complexity="unknown",
                sql_features=[]
            )

            self.add_query_from_file(query_id, sql_file, metadata)

    def analyze_query_features(self, query_id: str) -> Set[str]:
        """Analyze SQL features used in a query."""

        if query_id in self.query_analysis_cache:
            return self.query_analysis_cache[query_id]

        query_sql = self.get_query(query_id)
        features = set()

        # Parse SQL to extract features
        try:
            parsed = sqlparse.parse(query_sql)[0]
            features = self._extract_features_from_tokens(parsed.tokens)
        except Exception:
            # Fallback to regex-based analysis
            features = self._extract_features_regex(query_sql)

        # Cache results
        self.query_analysis_cache[query_id] = features
        return features

    def _extract_features_from_tokens(self, tokens) -> Set[str]:
        """Extract SQL features from parsed tokens."""
        features = set()

        def analyze_token(token):
            if token.ttype is sqlparse.tokens.Keyword:
                keyword = token.value.upper()

                # Map keywords to features
                keyword_mappings = {
                    'JOIN': 'joins',
                    'INNER JOIN': 'joins',
                    'LEFT JOIN': 'joins',
                    'RIGHT JOIN': 'joins',
                    'FULL JOIN': 'joins',
                    'GROUP BY': 'aggregation',
                    'HAVING': 'aggregation',
                    'WINDOW': 'window_functions',
                    'OVER': 'window_functions',
                    'WITH': 'cte',
                    'RECURSIVE': 'recursive_cte',
                    'UNION': 'set_operations',
                    'INTERSECT': 'set_operations',
                    'EXCEPT': 'set_operations',
                    'EXISTS': 'subqueries',
                    'IN': 'subqueries',
                    'ANY': 'subqueries',
                    'ALL': 'subqueries',
                    'CASE': 'conditional_logic',
                    'WHEN': 'conditional_logic',
                    'ORDER BY': 'sorting'
                }

                for kw, feature in keyword_mappings.items():
                    if kw in keyword:
                        features.add(feature)

            # Recursive analysis for nested tokens
            if hasattr(token, 'tokens'):
                for subtoken in token.tokens:
                    analyze_token(subtoken)

        for token in tokens:
            analyze_token(token)

        return features

    def _extract_features_regex(self, query_sql: str) -> Set[str]:
        """Extract SQL features using regex patterns."""
        features = set()

        query_upper = query_sql.upper()

        # Define patterns for SQL features
        patterns = {
            'joins': r'\b(JOIN|INNER\s+JOIN|LEFT\s+JOIN|RIGHT\s+JOIN|FULL\s+JOIN)\b',
            'aggregation': r'\b(GROUP\s+BY|HAVING|COUNT|SUM|AVG|MIN|MAX)\b',
            'window_functions': r'\b(OVER\s*\(|ROW_NUMBER|RANK|DENSE_RANK|LAG|LEAD)\b',
            'cte': r'\bWITH\b',
            'recursive_cte': r'\bWITH\s+RECURSIVE\b',
            'set_operations': r'\b(UNION|INTERSECT|EXCEPT)\b',
            'subqueries': r'\b(EXISTS|IN\s*\(|ANY\s*\(|ALL\s*\()\b',
            'conditional_logic': r'\b(CASE\s+WHEN|IF\()\b',
            'sorting': r'\bORDER\s+BY\b'
        }

        for feature, pattern in patterns.items():
            if re.search(pattern, query_upper):
                features.add(feature)

        return features

    def calculate_query_complexity(self, query_id: str) -> str:
        """Calculate query complexity score."""

        query_sql = self.get_query(query_id)
        features = self.analyze_query_features(query_id)

        # Complexity scoring
        complexity_score = 0

        # Base score from query length
        complexity_score += len(query_sql.split()) // 10

        # Feature-based scoring
        feature_weights = {
            'joins': 3,
            'aggregation': 2,
            'window_functions': 4,
            'cte': 3,
            'recursive_cte': 5,
            'set_operations': 3,
            'subqueries': 2,
            'conditional_logic': 1,
            'sorting': 1
        }

        for feature in features:
            complexity_score += feature_weights.get(feature, 1)

        # Count FROM clauses as a rough proxy for the number of tables referenced
        table_count = len(re.findall(r'\bFROM\s+(\w+)', query_sql, re.IGNORECASE))
        complexity_score += table_count

        # Classify complexity
        if complexity_score <= 5:
            return "simple"
        elif complexity_score <= 15:
            return "medium"
        else:
            return "complex"

    def get_query_statistics(self) -> Dict[str, Any]:
        """Get statistics about all queries."""

        total_queries = len(self.queries)

        # Analyze all queries
        complexities = []
        features_count = {}
        categories_count = {}

        for query_id in self.queries:
            # Complexity analysis
            complexity = self.calculate_query_complexity(query_id)
            complexities.append(complexity)

            # Feature analysis
            features = self.analyze_query_features(query_id)
            for feature in features:
                features_count[feature] = features_count.get(feature, 0) + 1

            # Category analysis
            metadata = self.get_query_metadata(query_id)
            category = metadata.category
            categories_count[category] = categories_count.get(category, 0) + 1

        # Calculate distributions
        complexity_dist = {
            complexity: complexities.count(complexity)
            for complexity in ["simple", "medium", "complex"]
        }

        return {
            "total_queries": total_queries,
            "complexity_distribution": complexity_dist,
            "feature_usage": features_count,
            "category_distribution": categories_count,
            "most_common_features": sorted(features_count.items(), key=lambda x: x[1], reverse=True)[:5]
        }

    def export_query_catalog(self, output_file: Path):
        """Export query catalog to JSON."""

        catalog = {
            "metadata": {
                "total_queries": len(self.queries),
                "generated_at": datetime.now().isoformat()
            },
            "statistics": self.get_query_statistics(),
            "queries": {}
        }

        for query_id in self.queries:
            metadata = self.get_query_metadata(query_id)
            features = self.analyze_query_features(query_id)
            complexity = self.calculate_query_complexity(query_id)

            catalog["queries"][query_id] = {
                "name": metadata.name,
                "description": metadata.description,
                "category": metadata.category,
                "complexity": complexity,
                "sql_features": list(features),
                "estimated_runtime_ms": metadata.estimated_runtime_ms,
                "query_sql": self.get_query(query_id)
            }

        with open(output_file, 'w') as f:
            json.dump(catalog, f, indent=2)

# Example: Web Analytics Benchmark
class WebAnalyticsBenchmark(AdvancedBenchmark):
    """Web analytics benchmark with realistic queries."""

    def _create_default_query_manager(self) -> QueryManager:
        query_manager = AdvancedQueryManager()

        # Page views analysis
        query_manager.add_query(
            "daily_page_views",
            """
            SELECT
                DATE(timestamp) as date,
                page_url,
                COUNT(*) as page_views,
                COUNT(DISTINCT user_id) as unique_visitors
            FROM page_views
            WHERE timestamp >= DATE_SUB(CURRENT_DATE, INTERVAL 30 DAY)
            GROUP BY DATE(timestamp), page_url
            ORDER BY date DESC, page_views DESC
            """,
            QueryMetadata(
                query_id="daily_page_views",
                name="Daily Page Views Analysis",
                description="Analyze daily page views and unique visitors",
                category="analytics",
                complexity="medium",
                sql_features=["aggregation", "date_functions", "filtering"]
            )
        )

        # User journey analysis
        query_manager.add_query(
            "user_journey",
            """
            WITH user_sessions AS (
                SELECT
                    user_id,
                    session_id,
                    page_url,
                    timestamp,
                    ROW_NUMBER() OVER (PARTITION BY user_id, session_id ORDER BY timestamp) as step_number
                FROM page_views
                WHERE timestamp >= DATE_SUB(CURRENT_DATE, INTERVAL 7 DAY)
            ),
            journey_paths AS (
                SELECT
                    user_id,
                    session_id,
                    STRING_AGG(page_url, ' -> ' ORDER BY step_number) as journey_path,
                    COUNT(*) as steps
                FROM user_sessions
                GROUP BY user_id, session_id
            )
            SELECT
                journey_path,
                COUNT(*) as session_count,
                AVG(steps) as avg_steps
            FROM journey_paths
            WHERE steps >= 2
            GROUP BY journey_path
            ORDER BY session_count DESC
            LIMIT 20
            """,
            QueryMetadata(
                query_id="user_journey",
                name="User Journey Analysis",
                description="Analyze most common user journey paths",
                category="behavior_analysis",
                complexity="complex",
                sql_features=["cte", "window_functions", "aggregation", "string_functions"]
            )
        )

        # Conversion funnel
        query_manager.add_query(
            "conversion_funnel",
            """
            WITH funnel_steps AS (
                SELECT
                    user_id,
                    MAX(CASE WHEN page_url LIKE '%/product/%' THEN 1 ELSE 0 END) as viewed_product,
                    MAX(CASE WHEN page_url LIKE '%/cart%' THEN 1 ELSE 0 END) as added_to_cart,
                    MAX(CASE WHEN page_url LIKE '%/checkout%' THEN 1 ELSE 0 END) as started_checkout,
                    MAX(CASE WHEN page_url LIKE '%/confirmation%' THEN 1 ELSE 0 END) as completed_purchase
                FROM page_views
                WHERE timestamp >= DATE_SUB(CURRENT_DATE, INTERVAL 30 DAY)
                GROUP BY user_id
            )
            SELECT
                'Total Users' as step,
                COUNT(*) as users,
                100.0 as conversion_rate
            FROM funnel_steps

            UNION ALL

            SELECT
                'Viewed Product' as step,
                SUM(viewed_product) as users,
                100.0 * SUM(viewed_product) / COUNT(*) as conversion_rate
            FROM funnel_steps

            UNION ALL

            SELECT
                'Added to Cart' as step,
                SUM(added_to_cart) as users,
                100.0 * SUM(added_to_cart) / COUNT(*) as conversion_rate
            FROM funnel_steps

            UNION ALL

            SELECT
                'Started Checkout' as step,
                SUM(started_checkout) as users,
                100.0 * SUM(started_checkout) / COUNT(*) as conversion_rate
            FROM funnel_steps

            UNION ALL

            SELECT
                'Completed Purchase' as step,
                SUM(completed_purchase) as users,
                100.0 * SUM(completed_purchase) / COUNT(*) as conversion_rate
            FROM funnel_steps
            """,
            QueryMetadata(
                query_id="conversion_funnel",
                name="E-commerce Conversion Funnel",
                description="Analyze conversion rates through purchase funnel",
                category="conversion_analysis",
                complexity="complex",
                sql_features=["cte", "conditional_logic", "set_operations", "aggregation"]
            )
        )

        return query_manager

def test_web_analytics_benchmark():
    """Test web analytics benchmark with query analysis."""

    benchmark = WebAnalyticsBenchmark()
    query_manager = benchmark.query_manager

    # Analyze queries
    stats = query_manager.get_query_statistics()

    print("Query Analysis Results:")
    print(f"Total queries: {stats['total_queries']}")
    print(f"Complexity distribution: {stats['complexity_distribution']}")
    print(f"Most common features: {stats['most_common_features']}")

    # Export catalog
    query_manager.export_query_catalog(Path("web_analytics_catalog.json"))
    print("Query catalog exported to web_analytics_catalog.json")

if __name__ == "__main__":
    test_web_analytics_benchmark()
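
After running the test, web_analytics_catalog.json has roughly the following shape (abridged; the statistics and feature lists shown here are illustrative, not computed output):

{
  "metadata": {
    "total_queries": 3,
    "generated_at": "2024-06-01T12:00:00"
  },
  "statistics": {
    "total_queries": 3,
    "complexity_distribution": {"simple": 0, "medium": 1, "complex": 2},
    "feature_usage": {"aggregation": 3, "cte": 2, "window_functions": 1},
    "category_distribution": {"analytics": 1, "behavior_analysis": 1, "conversion_analysis": 1},
    "most_common_features": [["aggregation", 3], ["cte", 2]]
  },
  "queries": {
    "daily_page_views": {
      "name": "Daily Page Views Analysis",
      "description": "Analyze daily page views and unique visitors",
      "category": "analytics",
      "complexity": "medium",
      "sql_features": ["aggregation", "sorting"],
      "estimated_runtime_ms": null,
      "query_sql": "SELECT ..."
    }
  }
}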

This guide provides systematic patterns for creating custom benchmarks in BenchBox. Start with simple examples and gradually implement more advanced features as needed.