AMPLab Big Data Benchmark API

Tags: reference, python-api, custom-benchmark

Complete Python API reference for the AMPLab Big Data Benchmark.

Overview

The AMPLab Big Data Benchmark tests the performance of big data processing systems using realistic web analytics workloads. Developed by UC Berkeley’s AMPLab, this benchmark focuses on three core query patterns: scans, joins, and analytics over web-scale data.

Key Features:

  • Web Analytics Workload: Models internet-scale data processing

  • Three Query Types: Scan, Join, and Analytics patterns

  • Simple Schema: 3 tables (Rankings, UserVisits, Documents)

  • Scalable: From 100MB (SF 0.01) to multi-TB (SF 100+)

  • Big Data Focus: Designed for distributed processing systems

Reference: https://amplab.cs.berkeley.edu/benchmark/

Quick Start

from benchbox.amplab import AMPLab
from benchbox.platforms.duckdb import DuckDBAdapter

# Create benchmark
benchmark = AMPLab(scale_factor=0.1)
benchmark.generate_data()

# Run benchmark
adapter = DuckDBAdapter()
results = adapter.run_benchmark(benchmark)

API Reference

AMPLab Class

class AMPLab(scale_factor=1.0, output_dir=None, **kwargs)[source]

Bases: BaseBenchmark

AMPLab Big Data Benchmark implementation.

Provides AMPLab Big Data Benchmark implementation, including data generation and access to scan, join, and analytical queries for web analytics data.

Reference: AMPLab Big Data Benchmark - https://amplab.cs.berkeley.edu/benchmark/

__init__(scale_factor=1.0, output_dir=None, **kwargs)[source]

Initialize AMPLab Big Data Benchmark instance.

Parameters:
  • scale_factor (float) – Scale factor for the benchmark (1.0 ≈ 10 GB)

  • output_dir (str | Path | None) – Directory to output generated data files

  • **kwargs – Additional implementation-specific options

generate_data()[source]

Generate AMPLab Big Data Benchmark data.

Returns:

A list of paths to the generated data files

Return type:

list[str | Path]
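
For example, a minimal sketch (based only on the signature above) that generates data and inspects the returned paths; the output directory name is illustrative:

from benchbox.amplab import AMPLab

benchmark = AMPLab(scale_factor=0.01, output_dir="./amplab_data")

# generate_data() returns the list of generated file paths
data_files = benchmark.generate_data()
for path in data_files:
    print(f"Generated: {path}")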

get_queries(dialect=None)[source]

Get all AMPLab Big Data Benchmark queries.

Parameters:

dialect (str | None) – Target SQL dialect for query translation. If None, returns original queries.

Returns:

Dictionary mapping query IDs to query strings

Return type:

dict[str, str]
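
A short sketch of iterating over the returned mapping; the "duckdb" dialect name is an assumption, mirroring the dialect used elsewhere on this page:

# Original queries, keyed by query ID
queries = benchmark.get_queries()

# Optionally translated for a target dialect (dialect name assumed)
duckdb_queries = benchmark.get_queries(dialect="duckdb")

for query_id, sql in queries.items():
    print(f"Query {query_id}: {len(sql)} characters")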

get_query(query_id, *, params=None)[source]

Get specific AMPLab Big Data Benchmark query.

Parameters:
  • query_id (int | str) – ID of the query to retrieve (1-3, or string variants such as "1a")

  • params (dict[str, Any] | None) – Optional parameters to customize the query

Returns:

Query string

Raises:

ValueError – If query_id is invalid

Return type:

str
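
For example, a sketch using the documented parameters and error behaviour:

# Valid ID with a parameter override
scan_sql = benchmark.get_query("1", params={"pagerank_threshold": 1000})

# Invalid IDs raise ValueError
try:
    benchmark.get_query("99")
except ValueError as exc:
    print(f"Rejected: {exc}")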

get_schema()[source]

Get AMPLab Big Data Benchmark schema.

Returns:

List of dictionaries describing the tables in the schema

Return type:

list[dict]
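
The exact dictionary keys are not documented here; the sketch below assumes "name" and "columns" keys purely for illustration:

schema = benchmark.get_schema()

# "name" and "columns" are assumed key names, not documented fields
for table in schema:
    print(table.get("name"), len(table.get("columns", [])))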

get_create_tables_sql(dialect='standard', tuning_config=None)[source]

Get SQL to create all AMPLab Big Data Benchmark tables.

Parameters:
  • dialect (str) – SQL dialect to use

  • tuning_config – Unified tuning configuration for constraint settings

Returns:

SQL script for creating all tables

Return type:

str
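
A rough sketch of applying the generated DDL through an existing connection; splitting the script on ";" is a simplification, and in practice setup_database() or a platform adapter handles this step:

ddl = benchmark.get_create_tables_sql(dialect="duckdb")

# Naive statement split, for illustration only
for statement in ddl.split(";"):
    statement = statement.strip()
    if statement:
        conn.execute(statement)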

apply_verbosity(settings)

Apply verbosity settings to the mixin consumer.

property benchmark_name: str

Get the human-readable benchmark name.

create_enhanced_benchmark_result(platform, query_results, execution_metadata=None, phases=None, resource_utilization=None, performance_characteristics=None, **kwargs)

Create a BenchmarkResults object with standardized fields.

This centralizes the logic for creating benchmark results that was previously duplicated across platform adapters and CLI orchestrator.

Parameters:
  • platform (str) – Platform name (e.g., “DuckDB”, “ClickHouse”)

  • query_results (list[dict[str, Any]]) – List of query execution results

  • execution_metadata (dict[str, Any] | None) – Optional execution metadata

  • phases (dict[str, dict[str, Any]] | None) – Optional phase tracking information

  • resource_utilization (dict[str, Any] | None) – Optional resource usage metrics

  • performance_characteristics (dict[str, Any] | None) – Optional performance analysis

  • **kwargs (Any) – Additional fields to override defaults

Returns:

Fully configured BenchmarkResults object

Return type:

BenchmarkResults
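
A hedged sketch of how an adapter-style caller might assemble a result object; the per-query field names below are assumptions, not part of the documented contract:

# Hypothetical per-query records (field names assumed)
query_results = [
    {"query_id": "1", "execution_time": 0.12, "status": "success"},
    {"query_id": "2", "execution_time": 0.48, "status": "success"},
]

results = benchmark.create_enhanced_benchmark_result(
    platform="DuckDB",
    query_results=query_results,
    execution_metadata={"scale_factor": 0.1},
)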

format_results(benchmark_result)

Format benchmark results for display.

Parameters:

benchmark_result (dict[str, Any]) – Result dictionary from run_benchmark()

Returns:

Formatted string representation of the results

Return type:

str
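
For example, the dictionary returned by run_benchmark() can be passed straight back for display (a sketch; conn is a DatabaseConnection such as one returned by adapter.create_connection()):

result = benchmark.run_benchmark(conn)
print(benchmark.format_results(result))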

get_data_source_benchmark()

Return the canonical source benchmark when data is shared.

Benchmarks that reuse data generated by another benchmark (for example, Primitives reusing TPC-H datasets) should override this method and return the lower-case identifier of the source benchmark. Benchmarks that produce their own data should return None (default).
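
A minimal sketch of such an override, using a hypothetical subclass; the "amplab" identifier is assumed for illustration:

class DerivedBenchmark(AMPLab):
    """Hypothetical benchmark that reuses AMPLab-generated data."""

    def get_data_source_benchmark(self):
        # Lower-case identifier of the benchmark whose data is reused
        return "amplab"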

log_debug_info(context='Debug')

Log comprehensive debug information including version details.

log_error_with_debug_info(error, context='Error')

Log an error with comprehensive debug information.

log_operation_complete(operation, duration=None, details='')
log_operation_start(operation, details='')
log_verbose(message)
log_version_warning()

Log version consistency warnings if any exist.

log_very_verbose(message)
property logger: Logger

Return the logger configured for the verbosity mixin consumer.

quiet: bool = False
run_benchmark(connection, query_ids=None, fetch_results=False, setup_database=True)

Run the complete benchmark suite.

Parameters:
  • connection (DatabaseConnection) – Database connection to execute queries on

  • query_ids (list[int | str] | None) – Optional list of specific query IDs to run (defaults to all)

  • fetch_results (bool) – Whether to fetch and return query results

  • setup_database (bool) – Whether to set up the database first

Returns:

Dictionary containing:

  • benchmark_name: Name of the benchmark

  • total_execution_time: Total time for all queries

  • total_queries: Number of queries executed

  • successful_queries: Number of queries that succeeded

  • failed_queries: Number of queries that failed

  • query_results: List of individual query results

  • setup_time: Time taken for database setup (if performed)

Return type:

dict

Raises:

Exception – If benchmark execution fails
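
A short sketch of calling run_benchmark() directly, mirroring the adapter calls in the Usage Examples below:

adapter = DuckDBAdapter()
conn = adapter.create_connection()

summary = benchmark.run_benchmark(conn, query_ids=["1", "2"], fetch_results=False)
print(f"{summary['successful_queries']} of {summary['total_queries']} queries succeeded")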

run_query(query_id, connection, params=None, fetch_results=False)

Execute single query and return timing and results.

Parameters:
  • query_id (int | str) – ID of the query to execute

  • connection (DatabaseConnection) – Database connection to execute query on

  • params (dict[str, Any] | None) – Optional parameters for query customization

  • fetch_results (bool) – Whether to fetch and return query results

Returns:

Dictionary containing:

  • query_id: Executed query ID

  • execution_time: Time taken to execute query in seconds

  • query_text: Executed query text

  • results: Query results if fetch_results=True, otherwise None

  • row_count: Number of rows returned (if results fetched)

Return type:

dict

Raises:
  • ValueError – If query_id is invalid

  • Exception – If query execution fails
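
For example, a sketch using the documented return keys:

timing = benchmark.run_query("1", conn, params={"pagerank_threshold": 1000}, fetch_results=True)
print(f"Query {timing['query_id']}: {timing['execution_time']:.3f}s, {timing['row_count']} rows")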

run_with_platform(platform_adapter, **run_config)

Run complete benchmark using platform-specific optimizations.

This method provides a unified interface for running benchmarks using database platform adapters that handle connection management, data loading optimizations, and query execution.

This is the standard method that all benchmarks should support for integration with the CLI and other orchestration tools.

Parameters:
  • platform_adapter – Platform adapter instance (e.g., DuckDBAdapter)

  • **run_config – Configuration options:

      - categories: List of query categories to run (if the benchmark supports them)

      - query_subset: List of specific query IDs to run

      - connection: Connection configuration

      - benchmark_type: Type hint for optimizations ('olap', 'oltp', etc.)

Returns:

BenchmarkResults object with execution results

Example

from benchbox.platforms import DuckDBAdapter

benchmark = SomeBenchmark(scale_factor=0.1)
adapter = DuckDBAdapter()
results = benchmark.run_with_platform(adapter)

setup_database(connection)

Set up database with schema and data.

Creates necessary database schema and loads benchmark data into the database.

Parameters:

connection (DatabaseConnection) – Database connection to set up

Raises:
  • ValueError – If data generation fails

  • Exception – If database setup fails
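
A brief sketch of guarding against the documented failure mode (adapter as in the Usage Examples below):

conn = adapter.create_connection()

try:
    benchmark.setup_database(conn)
except ValueError as exc:
    print(f"Data generation failed: {exc}")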

translate_query(query_id, dialect)

Translate a query to a specific SQL dialect.

Parameters:
  • query_id (int | str) – The ID of the query to translate

  • dialect (str) – The target SQL dialect

Returns:

The translated query string

Raises:
  • ValueError – If the query_id is invalid

  • ImportError – If sqlglot is not installed

  • ValueError – If the dialect is not supported

Return type:

str
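
For example (a sketch; "duckdb" is used as the target dialect here, matching the dialect shown elsewhere on this page):

try:
    translated = benchmark.translate_query("1", dialect="duckdb")
except ImportError:
    print("sqlglot is required for dialect translation")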

verbose: bool = False
verbose_enabled: bool = False
verbose_level: int = 0
property verbosity_settings: VerbositySettings

Return the current verbosity settings.

very_verbose: bool = False

Constructor:

AMPLab(
    scale_factor: float = 1.0,
    output_dir: Optional[Union[str, Path]] = None,
    **kwargs
)

Parameters:

  • scale_factor (float): Data size multiplier (1.0 ≈ 10GB)

  • output_dir (str | Path | None): Output directory for generated data

Schema Methods

get_create_tables_sql(dialect='standard', tuning_config=None) → str

Get SQL to create all AMPLab tables (rankings, uservisits, documents).

Example:

schema_sql = benchmark.get_create_tables_sql(dialect="duckdb")

get_schema() → list[dict]

Get detailed schema information.

Query Methods

get_query(query_id, *, params=None) → str

Get specific AMPLab query with optional parameters.

Parameters:

  • query_id (int | str): Query ID (1-3 or “1a”, “2a”, “3a”)

  • params (dict | None): Query parameters

Supported Parameters:

  • pagerank_threshold (int): Minimum page rank

  • start_date (str): Start date for filtering

  • end_date (str): End date for filtering

  • limit_rows (int): Result limit

  • search_term (str): Search keyword

  • min_visits (int): Minimum visit count

Example:

# Scan query with custom threshold
scan_query = benchmark.get_query("1", params={
    'pagerank_threshold': 1500
})

# Join query with date range
join_query = benchmark.get_query("2", params={
    'start_date': '1980-01-01',
    'end_date': '1980-04-01',
    'limit_rows': 100
})

# Analytics query with filters
analytics_query = benchmark.get_query("3", params={
    'search_term': 'google',
    'min_visits': 10
})

get_queries(dialect=None) → dict[str, str]

Get all AMPLab queries (six queries total).

Example:

queries = benchmark.get_queries()
print(f"Available queries: {list(queries.keys())}")
# Output: ['1', '1a', '2', '2a', '3', '3a']

Usage Examples

Basic Benchmark Execution

from benchbox.amplab import AMPLab
from benchbox.platforms.duckdb import DuckDBAdapter

# Initialize with testing scale
benchmark = AMPLab(scale_factor=0.1)
data_files = benchmark.generate_data()

# Run with DuckDB
adapter = DuckDBAdapter(memory_limit="4GB")
results = adapter.run_benchmark(benchmark)

print(f"Queries: {results.total_queries}")
print(f"Average time: {results.average_query_time:.3f}s")

Query Type Testing

from benchbox.amplab import AMPLab
from benchbox.platforms.duckdb import DuckDBAdapter
import time

benchmark = AMPLab(scale_factor=0.01)
benchmark.generate_data()

adapter = DuckDBAdapter()
conn = adapter.create_connection()
adapter.create_schema(benchmark, conn)
adapter.load_data(benchmark, conn, benchmark.output_dir)

# Test query types
query_types = {
    'Scan': ['1', '1a'],
    'Join': ['2', '2a'],
    'Analytics': ['3', '3a']
}

params = {
    'pagerank_threshold': 1000,
    'start_date': '1980-01-01',
    'end_date': '1980-04-01',
    'limit_rows': 100
}

for query_type, query_ids in query_types.items():
    print(f"\n{query_type} Queries:")
    for query_id in query_ids:
        query = benchmark.get_query(query_id, params=params)

        start = time.time()
        result = conn.execute(query).fetchall()
        elapsed = time.time() - start

        print(f"  Query {query_id}: {elapsed*1000:.1f} ms ({len(result)} rows)")

Performance Comparison

import time

from benchbox.amplab import AMPLab
from benchbox.platforms.duckdb import DuckDBAdapter

# Test across scale factors
scale_factors = [0.01, 0.1, 0.5]

for sf in scale_factors:
    print(f"\n=== Scale Factor {sf} ===")

    benchmark = AMPLab(scale_factor=sf)
    benchmark.generate_data()

    adapter = DuckDBAdapter()
    conn = adapter.create_connection()
    adapter.create_schema(benchmark, conn)
    adapter.load_data(benchmark, conn, benchmark.output_dir)

    # Time scan query
    scan_query = benchmark.get_query("1", params={'pagerank_threshold': 1000})

    start = time.time()
    result = conn.execute(scan_query).fetchall()
    elapsed = time.time() - start

    print(f"  Scan query: {elapsed*1000:.1f} ms")
    print(f"  Rows: {len(result)}")

Best Practices

  1. Use Appropriate Scale Factors

    # Development
    dev = AMPLab(scale_factor=0.01)  # ~100 MB
    
    # Testing
    test = AMPLab(scale_factor=0.1)  # ~1 GB
    
    # Production
    prod = AMPLab(scale_factor=1.0)  # ~10 GB
    
  2. Parameterize Queries

    params = {
        'pagerank_threshold': 1000,
        'start_date': '1980-01-01',
        'end_date': '1980-04-01',
        'limit_rows': 100,
        'search_term': 'google',
        'min_visits': 10
    }
    
    query = benchmark.get_query("2", params=params)
    
  3. Test Query Types Separately

    # Test scan performance
    scan_queries = ['1', '1a']
    
    # Test join performance
    join_queries = ['2', '2a']
    
    # Test analytics performance
    analytics_queries = ['3', '3a']
    

See Also

External Resources