AMPLab Big Data Benchmark API¶
Complete Python API reference for the AMPLab Big Data Benchmark.
Overview¶
The AMPLab Big Data Benchmark tests the performance of big data processing systems using realistic web analytics workloads. Developed by UC Berkeley’s AMPLab, this benchmark focuses on three core patterns: scanning, joining, and analytics operations on web-scale data.
Key Features:
Web Analytics Workload: Models internet-scale data processing
Three Query Types: Scan, Join, and Analytics patterns
Simple Schema: 3 tables (Rankings, UserVisits, Documents)
Scalable: From 100MB (SF 0.01) to multi-TB (SF 100+)
Big Data Focus: Designed for distributed processing systems
Reference: https://amplab.cs.berkeley.edu/benchmark/
Quick Start¶
from benchbox.amplab import AMPLab
from benchbox.platforms.duckdb import DuckDBAdapter
# Create benchmark
benchmark = AMPLab(scale_factor=0.1)
benchmark.generate_data()
# Run benchmark
adapter = DuckDBAdapter()
results = adapter.run_benchmark(benchmark)
API Reference¶
AMPLab Class¶
- class AMPLab(scale_factor=1.0, output_dir=None, **kwargs)[source]¶
Bases:
BaseBenchmark
AMPLab Big Data Benchmark implementation.
Provides data generation and access to the scan, join, and analytics queries for web analytics data.
Reference: AMPLab Big Data Benchmark - https://amplab.cs.berkeley.edu/benchmark/
- __init__(scale_factor=1.0, output_dir=None, **kwargs)[source]¶
Initialize AMPLab Big Data Benchmark instance.
- Parameters:
scale_factor (float) – Scale factor for the benchmark (1.0 ≈ 10GB)
output_dir (str | Path | None) – Directory to output generated data files
**kwargs – Additional implementation-specific options
- generate_data()[source]¶
Generate AMPLab Big Data Benchmark data.
- Returns:
A list of paths to the generated data files
- Return type:
list[str | Path]
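A minimal sketch of generating data and inspecting the returned paths; the output directory name is illustrative and the exact file layout depends on the generator:
from benchbox.amplab import AMPLab

# Roughly 1 GB of web analytics data written to ./amplab_data
benchmark = AMPLab(scale_factor=0.1, output_dir="amplab_data")
data_files = benchmark.generate_data()

for path in data_files:  # one entry per generated data file
    print(path)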
- get_queries(dialect=None)[source]¶
Get all AMPLab Big Data Benchmark queries.
- Parameters:
dialect (str | None) – Target SQL dialect for query translation. If None, returns original queries.
- Returns:
Dictionary mapping query IDs to query strings
- Return type:
dict[str, str]
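For example, fetching the query set in its original form and translated to a target dialect (translation relies on sqlglot, as noted under translate_query, so dialect names such as "duckdb" follow sqlglot's naming):
# Original queries keyed by ID ('1', '1a', '2', '2a', '3', '3a')
queries = benchmark.get_queries()

# Same queries translated for DuckDB (requires sqlglot)
duckdb_queries = benchmark.get_queries(dialect="duckdb")
print(sorted(duckdb_queries.keys()))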
- get_query(query_id, *, params=None)[source]¶
Get specific AMPLab Big Data Benchmark query.
- Parameters:
query_id (int | str) – ID of the query to retrieve (1-3, or a variant ID such as "1a", "2a", "3a")
params (dict[str, Any] | None) – Optional parameters to customize the query
- Returns:
Query string
- Raises:
ValueError – If query_id is invalid
- Return type:
str
- get_schema()[source]¶
Get AMPLab Big Data Benchmark schema.
- Returns:
List of dictionaries describing the tables in the schema
- Return type:
list[dict]
- get_create_tables_sql(dialect='standard', tuning_config=None)[source]¶
Get SQL to create all AMPLab Big Data Benchmark tables.
- Parameters:
dialect (str) – SQL dialect to use
tuning_config – Unified tuning configuration for constraint settings
- Returns:
SQL script for creating all tables
- Return type:
str
- apply_verbosity(settings)¶
Apply verbosity settings to the mixin consumer.
- property benchmark_name: str¶
Get the human-readable benchmark name.
- create_enhanced_benchmark_result(platform, query_results, execution_metadata=None, phases=None, resource_utilization=None, performance_characteristics=None, **kwargs)¶
Create a BenchmarkResults object with standardized fields.
This centralizes the logic for creating benchmark results that was previously duplicated across platform adapters and CLI orchestrator.
- Parameters:
platform (str) – Platform name (e.g., “DuckDB”, “ClickHouse”)
query_results (list[dict[str, Any]]) – List of query execution results
execution_metadata (dict[str, Any] | None) – Optional execution metadata
phases (dict[str, dict[str, Any]] | None) – Optional phase tracking information
resource_utilization (dict[str, Any] | None) – Optional resource usage metrics
performance_characteristics (dict[str, Any] | None) – Optional performance analysis
**kwargs (Any) – Additional fields to override defaults
- Returns:
Fully configured BenchmarkResults object
- Return type:
BenchmarkResults
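A hedged sketch of assembling a result object; the query_results entries reuse the keys documented for run_query(), and the metadata shown is purely illustrative:
query_results = [
    {"query_id": "1", "execution_time": 0.12, "row_count": 42},
    {"query_id": "2", "execution_time": 0.31, "row_count": 100},
]
results = benchmark.create_enhanced_benchmark_result(
    platform="DuckDB",
    query_results=query_results,
    execution_metadata={"scale_factor": 0.1},  # illustrative metadata
)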
- format_results(benchmark_result)¶
Format benchmark results for display.
- Parameters:
benchmark_result (dict[str, Any]) – Result dictionary from run_benchmark()
- Returns:
Formatted string representation of the results
- Return type:
str
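A short sketch, assuming connection is an existing DatabaseConnection and the database has already been set up:
summary = benchmark.run_benchmark(connection, setup_database=False)
print(benchmark.format_results(summary))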
- get_data_source_benchmark()¶
Return the canonical source benchmark when data is shared.
Benchmarks that reuse data generated by another benchmark (for example, Primitives reusing TPC-H datasets) should override this method and return the lower-case identifier of the source benchmark. Benchmarks that produce their own data should return None (default).
- log_debug_info(context='Debug')¶
Log comprehensive debug information including version details.
- log_error_with_debug_info(error, context='Error')¶
Log an error with comprehensive debug information.
- log_operation_complete(operation, duration=None, details='')¶
- log_operation_start(operation, details='')¶
- log_verbose(message)¶
- log_version_warning()¶
Log version consistency warnings if any exist.
- log_very_verbose(message)¶
- property logger: Logger¶
Return the logger configured for the verbosity mixin consumer.
- quiet: bool = False¶
- run_benchmark(connection, query_ids=None, fetch_results=False, setup_database=True)¶
Run the complete benchmark suite.
- Parameters:
connection (DatabaseConnection) – Database connection to execute queries on
query_ids (list[int | str] | None) – Optional list of specific query IDs to run (defaults to all)
fetch_results (bool) – Whether to fetch and return query results
setup_database (bool) – Whether to set up the database first
- Returns:
Dictionary containing:
benchmark_name: Name of the benchmark
total_execution_time: Total time for all queries
total_queries: Number of queries executed
successful_queries: Number of queries that succeeded
failed_queries: Number of queries that failed
query_results: List of individual query results
setup_time: Time taken for database setup (if performed)
- Return type:
dict[str, Any]
- Raises:
Exception – If benchmark execution fails
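A sketch of running just the scan queries against an existing DatabaseConnection:
summary = benchmark.run_benchmark(
    connection,
    query_ids=["1", "1a"],
    fetch_results=True,
)
print(f"{summary['successful_queries']}/{summary['total_queries']} queries succeeded "
      f"in {summary['total_execution_time']:.2f}s")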
- run_query(query_id, connection, params=None, fetch_results=False)¶
Execute single query and return timing and results.
- Parameters:
query_id (int | str) – ID of the query to execute
connection (DatabaseConnection) – Database connection to execute query on
params (dict[str, Any] | None) – Optional parameters for query customization
fetch_results (bool) – Whether to fetch and return query results
- Returns:
Dictionary containing:
query_id: Executed query ID
execution_time: Time taken to execute the query in seconds
query_text: Executed query text
results: Query results if fetch_results=True, otherwise None
row_count: Number of rows returned (if results fetched)
- Return type:
dict[str, Any]
- Raises:
ValueError – If query_id is invalid
Exception – If query execution fails
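A sketch for a single query, again assuming an existing DatabaseConnection; the parameter values are illustrative:
result = benchmark.run_query(
    "2",
    connection,
    params={"start_date": "1980-01-01", "end_date": "1980-04-01", "limit_rows": 100},
    fetch_results=True,
)
print(f"Query {result['query_id']}: {result['execution_time']:.3f}s, {result['row_count']} rows")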
- run_with_platform(platform_adapter, **run_config)¶
Run complete benchmark using platform-specific optimizations.
This method provides a unified interface for running benchmarks using database platform adapters that handle connection management, data loading optimizations, and query execution.
This is the standard method that all benchmarks should support for integration with the CLI and other orchestration tools.
- Parameters:
platform_adapter – Platform adapter instance (e.g., DuckDBAdapter)
**run_config – Configuration options:
categories: List of query categories to run (if the benchmark supports categories)
query_subset: List of specific query IDs to run
connection: Connection configuration
benchmark_type: Type hint for optimizations ('olap', 'oltp', etc.)
- Returns:
BenchmarkResults object with execution results
Example
from benchbox.platforms import DuckDBAdapter

benchmark = SomeBenchmark(scale_factor=0.1)
adapter = DuckDBAdapter()
results = benchmark.run_with_platform(adapter)
- setup_database(connection)¶
Set up database with schema and data.
Creates necessary database schema and loads benchmark data into the database.
- Parameters:
connection (DatabaseConnection) – Database connection to set up
- Raises:
ValueError – If data generation fails
Exception – If database setup fails
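A sketch using a DuckDB connection; any other supported platform adapter could supply the connection instead:
from benchbox.platforms.duckdb import DuckDBAdapter

adapter = DuckDBAdapter()
conn = adapter.create_connection()
benchmark.setup_database(conn)  # creates the three tables and loads the generated data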
- translate_query(query_id, dialect)¶
Translate a query to a specific SQL dialect.
- Parameters:
query_id (int | str) – The ID of the query to translate
dialect (str) – The target SQL dialect
- Returns:
The translated query string
- Raises:
ValueError – If the query_id is invalid
ImportError – If sqlglot is not installed
ValueError – If the dialect is not supported
- Return type:
str
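For example, translating the scan query for two dialects (requires sqlglot; an unsupported dialect name raises ValueError):
for dialect in ("duckdb", "postgres"):
    sql = benchmark.translate_query("1", dialect)
    print(f"-- {dialect}")
    print(sql)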
- verbose: bool = False¶
- verbose_enabled: bool = False¶
- verbose_level: int = 0¶
- property verbosity_settings: VerbositySettings¶
Return the current verbosity settings.
- very_verbose: bool = False¶
Constructor:
AMPLab(
scale_factor: float = 1.0,
output_dir: Optional[Union[str, Path]] = None,
**kwargs
)
Parameters:
scale_factor (float): Data size multiplier (1.0 ≈ 10GB)
output_dir (str | Path | None): Output directory for generated data
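Example (the output path is illustrative):
# ~1 GB of generated data under data/amplab
benchmark = AMPLab(scale_factor=0.1, output_dir="data/amplab")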
Schema Methods¶
- get_create_tables_sql(dialect='standard', tuning_config=None) str¶
Get SQL to create all AMPLab tables (rankings, uservisits, documents).
Example:
schema_sql = benchmark.get_create_tables_sql(dialect="duckdb")
- get_schema() list[dict]¶
Get detailed schema information.
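Example (each entry is a dictionary describing one table):
for table in benchmark.get_schema():  # rankings, uservisits, documents
    print(table)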
Query Methods¶
- get_query(query_id, *, params=None) str¶
Get specific AMPLab query with optional parameters.
Parameters:
query_id (int | str): Query ID (1-3 or “1a”, “2a”, “3a”)
params (dict | None): Query parameters
Supported Parameters:
pagerank_threshold (int): Minimum page rank
start_date (str): Start date for filtering
end_date (str): End date for filtering
limit_rows (int): Result limit
search_term (str): Search keyword
min_visits (int): Minimum visit count
Example:
# Scan query with custom threshold
scan_query = benchmark.get_query("1", params={
    'pagerank_threshold': 1500
})

# Join query with date range
join_query = benchmark.get_query("2", params={
    'start_date': '1980-01-01',
    'end_date': '1980-04-01',
    'limit_rows': 100
})

# Analytics query with filters
analytics_query = benchmark.get_query("3", params={
    'search_term': 'google',
    'min_visits': 10
})
- get_queries(dialect=None) dict[str, str]¶
Get all AMPLab queries (six queries: '1', '1a', '2', '2a', '3', '3a').
Example:
queries = benchmark.get_queries()
print(f"Available queries: {list(queries.keys())}")
# Output: ['1', '1a', '2', '2a', '3', '3a']
Usage Examples¶
Basic Benchmark Execution¶
from benchbox.amplab import AMPLab
from benchbox.platforms.duckdb import DuckDBAdapter
# Initialize with testing scale
benchmark = AMPLab(scale_factor=0.1)
data_files = benchmark.generate_data()
# Run with DuckDB
adapter = DuckDBAdapter(memory_limit="4GB")
results = adapter.run_benchmark(benchmark)
print(f"Queries: {results.total_queries}")
print(f"Average time: {results.average_query_time:.3f}s")
Query Type Testing¶
from benchbox.amplab import AMPLab
from benchbox.platforms.duckdb import DuckDBAdapter
import time
benchmark = AMPLab(scale_factor=0.01)
benchmark.generate_data()
adapter = DuckDBAdapter()
conn = adapter.create_connection()
adapter.create_schema(benchmark, conn)
adapter.load_data(benchmark, conn, benchmark.output_dir)
# Test query types
query_types = {
'Scan': ['1', '1a'],
'Join': ['2', '2a'],
'Analytics': ['3', '3a']
}
params = {
'pagerank_threshold': 1000,
'start_date': '1980-01-01',
'end_date': '1980-04-01',
'limit_rows': 100
}
for query_type, query_ids in query_types.items():
print(f"\n{query_type} Queries:")
for query_id in query_ids:
query = benchmark.get_query(query_id, params=params)
start = time.time()
result = conn.execute(query).fetchall()
elapsed = time.time() - start
print(f" Query {query_id}: {elapsed*1000:.1f} ms ({len(result)} rows)")
Performance Comparison¶
from benchbox.amplab import AMPLab
from benchbox.platforms.duckdb import DuckDBAdapter
from benchbox.core.results.timing import TimingCollector, TimingAnalyzer
import time
# Test across scale factors
scale_factors = [0.01, 0.1, 0.5]
for sf in scale_factors:
print(f"\n=== Scale Factor {sf} ===")
benchmark = AMPLab(scale_factor=sf)
benchmark.generate_data()
adapter = DuckDBAdapter()
conn = adapter.create_connection()
adapter.create_schema(benchmark, conn)
adapter.load_data(benchmark, conn, benchmark.output_dir)
# Time scan query
scan_query = benchmark.get_query("1", params={'pagerank_threshold': 1000})
start = time.time()
result = conn.execute(scan_query).fetchall()
elapsed = time.time() - start
print(f" Scan query: {elapsed*1000:.1f} ms")
print(f" Rows: {len(result)}")
Best Practices¶
Use Appropriate Scale Factors
# Development
dev = AMPLab(scale_factor=0.01)   # ~100 MB

# Testing
test = AMPLab(scale_factor=0.1)   # ~1 GB

# Production
prod = AMPLab(scale_factor=1.0)   # ~10 GB
Parameterize Queries
params = {
    'pagerank_threshold': 1000,
    'start_date': '1980-01-01',
    'end_date': '1980-04-01',
    'limit_rows': 100,
    'search_term': 'google',
    'min_visits': 10
}
query = benchmark.get_query("2", params=params)
Test Query Types Separately
# Test scan performance
scan_queries = ['1', '1a']

# Test join performance
join_queries = ['2', '2a']

# Test analytics performance
analytics_queries = ['3', '3a']
See Also¶
AMPLab Big Data Benchmark - AMPLab benchmark guide
ClickBench Benchmark API - ClickBench analytics benchmark
TPC-H Benchmark API - TPC-H benchmark
Base Benchmark API - Base benchmark interface
External Resources¶
AMPLab Benchmark - Original specification
Berkeley AMPLab - Research lab