DuckDB Platform Adapter

Tags: reference, python-api, duckdb

The DuckDB adapter provides fast, embedded analytical database execution for benchmarks.

Overview

DuckDB is included by default with BenchBox, providing:

  • Zero configuration - Works out of the box with no additional setup

  • Columnar query engine - Optimized for analytical queries

  • In-memory or persistent - Flexible storage options

  • ANSI SQL support - Comprehensive analytical SQL features

Common use cases:

  • Development and testing

  • CI/CD pipelines

  • Small to medium datasets (< 100GB)

  • Local benchmarking without cloud infrastructure

Quick Start

Basic usage:

from benchbox.tpch import TPCH
from benchbox.platforms.duckdb import DuckDBAdapter

# In-memory database (default)
adapter = DuckDBAdapter()

# Or persistent database
adapter = DuckDBAdapter(database_path="benchmark.duckdb")

# Run benchmark
benchmark = TPCH(scale_factor=0.1)
results = benchmark.run_with_platform(adapter)

API Reference

DuckDBAdapter Class

class DuckDBAdapter(**config)

Bases: PlatformAdapter

DuckDB platform adapter with optimized bulk loading and execution.

property platform_name: str

Return the name of this database platform.

Default implementation returns the class name. Concrete adapters may override to provide a user-friendly display name. Tests may instantiate lightweight mock adapters without overriding this property.

static add_cli_arguments(parser)

Add DuckDB-specific CLI arguments.

classmethod from_config(config)

Create DuckDB adapter from unified configuration.
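
A hedged sketch of its use (the unified configuration schema isn't documented on this page; the keys shown simply mirror the constructor parameters listed below):

# Keys here are assumed to mirror the constructor parameters
adapter = DuckDBAdapter.from_config({
    "database_path": "benchmark.duckdb",
    "memory_limit": "4GB",
})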

get_platform_info(connection=None)

Get DuckDB platform information.

__init__(**config)

Initialize the platform adapter with configuration.

Parameters:

**config – Platform-specific configuration options

get_database_path(**connection_config)

Get the database file path for DuckDB.

Priority:

  1. connection_config["database_path"] if provided and not None
  2. self.database_path (set during from_config)
  3. ":memory:" as the final fallback
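
A short illustration of that resolution order:

# No per-connection override: falls back to the configured default, ":memory:"
adapter = DuckDBAdapter()
path = adapter.get_database_path()                               # ":memory:"

# A per-connection override takes highest priority
path = adapter.get_database_path(database_path="bench.duckdb")   # "bench.duckdb"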

create_connection(**connection_config)

Create optimized DuckDB connection.

create_schema(benchmark, connection)

Create schema using benchmark’s SQL definitions.

load_data(benchmark, connection, data_dir)

Load data using DuckDB’s optimized CSV reading capabilities.

configure_for_benchmark(connection, benchmark_type)

Apply DuckDB-specific optimizations based on benchmark type.

execute_query(connection, query, query_id, benchmark_type=None, scale_factor=None, validate_row_count=True, stream_id=None)

Execute query with detailed timing and profiling.
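
Taken together, the methods above support a manual, step-by-step run. A hedged sketch follows (normally benchmark.run_with_platform(adapter) drives this for you; the data directory and query_id values are illustrative):

from benchbox.tpch import TPCH
from benchbox.platforms.duckdb import DuckDBAdapter

adapter = DuckDBAdapter()
benchmark = TPCH(scale_factor=0.1)

conn = adapter.create_connection()
adapter.create_schema(benchmark, conn)           # DDL from the benchmark's SQL definitions
adapter.load_data(benchmark, conn, "./data")     # bulk load from generated data files
adapter.configure_for_benchmark(conn, "tpch")    # benchmark-specific optimizations
result = adapter.execute_query(conn, "SELECT COUNT(*) FROM lineitem", "q0")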

get_query_plan(connection, query)

Get DuckDB query execution plan.

get_query_plan_parser()

Get DuckDB query plan parser.

analyze_tables(connection)

Run ANALYZE on all tables for better query optimization.

get_target_dialect()

Get the target SQL dialect for this platform.

supports_tuning_type(tuning_type)

Check if DuckDB supports a specific tuning type.

DuckDB supports:

  • SORTING: via ORDER BY in the table definition (DuckDB 0.10+)
  • PARTITIONING: limited support, mainly through file-based partitions

Parameters:

tuning_type – The type of tuning to check support for

Returns:

True if the tuning type is supported by DuckDB

Return type:

bool

generate_tuning_clause(table_tuning)

Generate DuckDB-specific tuning clauses for CREATE TABLE statements.

DuckDB supports:

  • Sorting optimization hints (no explicit syntax in CREATE TABLE)
  • Partitioning through file organization (handled at the data-loading level)

Parameters:

table_tuning – The tuning configuration for the table

Returns:

SQL clause string to be appended to CREATE TABLE statement

Return type:

str

apply_table_tunings(table_name, table_tuning, connection)

Apply tuning configurations to a DuckDB table.

DuckDB tuning approach:

  • SORTING: create indexes on sort columns for query optimization
  • PARTITIONING: log the partitioning strategy (handled at the data-loading level)
  • CLUSTERING: treated as secondary sorting for optimization hints
  • DISTRIBUTION: not applicable for single-node DuckDB

Parameters:
  • table_tuning – The tuning configuration to apply

  • connection (Any) – DuckDB connection

Raises:

ValueError – If the tuning configuration is invalid for DuckDB

apply_unified_tuning(tuning_config, connection)

Apply unified tuning configuration to DuckDB.

Parameters:
  • tuning_config – UnifiedTuningConfiguration instance

  • connection – DuckDB connection

apply_platform_optimizations(tuning_config, connection)

Apply DuckDB-specific platform optimizations.

Parameters:
  • tuning_config – UnifiedTuningConfiguration instance

  • connection – DuckDB connection

apply_constraint_configuration(tuning_config, table_name, connection)

Apply constraint configuration for a specific table.

Note: Constraints are applied during schema creation in DuckDB, so this method primarily validates the configuration.

Parameters:
  • tuning_config – UnifiedTuningConfiguration instance

  • table_name (str) – Name of the table

  • connection – DuckDB connection

validate_platform_capabilities(benchmark_type)

Validate DuckDB-specific capabilities for the benchmark.

Parameters:

benchmark_type (str) – Type of benchmark (e.g., 'tpcds', 'tpch')

Returns:

ValidationResult with DuckDB capability validation status

validate_connection_health(connection)

Validate DuckDB connection health and capabilities.

Parameters:

connection (Any) – DuckDB connection object

Returns:

ValidationResult with connection health status
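
A minimal pre-flight sketch combining the two validation methods (ValidationResult's fields aren't documented here, so the is_valid attribute is an assumption):

conn = adapter.create_connection()
capabilities = adapter.validate_platform_capabilities("tpch")
health = adapter.validate_connection_health(conn)
if not health.is_valid:  # `is_valid` is an assumed ValidationResult field
    raise RuntimeError("DuckDB connection failed its health check")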

Constructor Parameters

DuckDBAdapter(
    database_path: Optional[str] = ":memory:",
    memory_limit: Optional[str] = None,
    thread_limit: Optional[int] = None,
    temp_directory: Optional[str] = None,
    enable_profiling: bool = False,
    read_only: bool = False,
    config: Optional[Dict[str, Any]] = None
)

Parameters:

  • database_path (str, optional): Database file path or ":memory:" for in-memory. Default: ":memory:"

  • memory_limit (str, optional): Maximum memory usage (e.g., "4GB", "512MB"). Default: No limit

  • thread_limit (int, optional): Maximum number of threads. Default: All available cores

  • temp_directory (str, optional): Directory for temporary files. Default: System temp

  • enable_profiling (bool): Enable query profiling. Default: False

  • read_only (bool): Open database in read-only mode. Default: False

  • config (dict, optional): Additional DuckDB configuration options

Configuration Examples

In-Memory Database

Suitable for small datasets and rapid iteration:

from benchbox.platforms.duckdb import DuckDBAdapter

# Default in-memory configuration
adapter = DuckDBAdapter()

# With memory limit
adapter = DuckDBAdapter(memory_limit="2GB")

# With thread control
adapter = DuckDBAdapter(
    memory_limit="4GB",
    thread_limit=4
)

Persistent Database

For reusable benchmark data:

# Create persistent database
adapter = DuckDBAdapter(database_path="./benchmarks/tpch.duckdb")

# Run benchmark (data persists)
benchmark = TPCH(scale_factor=1.0)
results = benchmark.run_with_platform(adapter)

# Later: reuse the same database
adapter2 = DuckDBAdapter(database_path="./benchmarks/tpch.duckdb")
results2 = benchmark.run_with_platform(adapter2)

Performance Tuning

Configure for optimal performance:

adapter = DuckDBAdapter(
    database_path="benchmark.duckdb",
    memory_limit="16GB",      # Set appropriate for your system
    thread_limit=8,           # Match your CPU cores
    temp_directory="/fast/ssd/temp",  # Use fast storage
    config={
        "default_order": "DESC",
        "preserve_insertion_order": False,
        "enable_object_cache": True
    }
)
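
Setting preserve_insertion_order to False allows DuckDB to reorder rows during scans and loads, trading deterministic row order for better parallelism.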

Profiling and Debugging

Enable query profiling for analysis:

adapter = DuckDBAdapter(
    enable_profiling=True,
    config={
        "enable_profiling": "json",
        "profiling_output": "./profiles"
    }
)

# Run benchmark
results = benchmark.run_with_platform(adapter)

# Profile information saved to ./profiles/

Data Loading

The adapter handles data loading automatically, but you can customize the process:

Bulk Loading from Parquet

from benchbox.platforms.duckdb import DuckDBAdapter

# Create adapter with custom connection
adapter = DuckDBAdapter(database_path="benchmark.duckdb")

# Create the underlying DuckDB connection
conn = adapter.create_connection()

# Custom bulk load from Parquet
conn.execute("""
    CREATE TABLE lineitem AS
    SELECT * FROM read_parquet('data/lineitem/*.parquet')
""")

Loading from CSV

# DuckDB can auto-detect CSV format; here the schema is specified explicitly
conn.execute("""
    CREATE TABLE customer AS
    SELECT * FROM read_csv('data/customer.tbl',
                           delim='|',
                           header=false,
                           columns={
                               'c_custkey': 'INTEGER',
                               'c_name': 'VARCHAR',
                               'c_address': 'VARCHAR',
                               'c_nationkey': 'INTEGER',
                               'c_phone': 'VARCHAR',
                               'c_acctbal': 'DECIMAL(15,2)',
                               'c_mktsegment': 'VARCHAR',
                               'c_comment': 'VARCHAR'
                           })
""")

Query Execution

Execute Queries Directly

from benchbox.platforms.duckdb import DuckDBAdapter

adapter = DuckDBAdapter()
conn = adapter.create_connection()

# Execute arbitrary SQL
result = conn.execute("SELECT COUNT(*) FROM lineitem")
row_count = result.fetchone()[0]

# Execute with parameters
query = "SELECT * FROM orders WHERE o_orderdate > ?"
result = conn.execute(query, ["1995-01-01"])

Query Plans and Optimization

# Get the query plan
explain_result = conn.execute(
    "EXPLAIN SELECT * FROM lineitem WHERE l_shipdate > '1995-01-01'"
)
print(explain_result.fetchall())

# Profile a query (EXPLAIN ANALYZE returns the executed plan with timings)
profiling_info = conn.execute(
    "EXPLAIN ANALYZE SELECT COUNT(*) FROM lineitem"
).fetchall()

Advanced Features

Parallel Query Execution

# DuckDB automatically parallelizes queries
adapter = DuckDBAdapter(
    memory_limit="16GB",
    thread_limit=8  # Use 8 threads for parallel execution
)

# Complex aggregation will use all threads
results = benchmark.run_with_platform(adapter)
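
The thread count can also be changed on an open connection at runtime, e.g. PRAGMA threads=8.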

Extensions and Functions

from benchbox.platforms.duckdb import DuckDBAdapter

adapter = DuckDBAdapter()
conn = adapter.create_connection()

# Load DuckDB extensions
conn.execute("INSTALL httpfs")
conn.execute("LOAD httpfs")

# Now queries can read directly from S3
conn.execute("""
    CREATE TABLE data AS
    SELECT * FROM read_parquet('s3://bucket/data/*.parquet')
""")

Window Functions

# DuckDB supports advanced window functions
query = """
    SELECT
        l_orderkey,
        l_partkey,
        l_extendedprice,
        ROW_NUMBER() OVER (PARTITION BY l_orderkey ORDER BY l_extendedprice DESC) as rn
    FROM lineitem
    WHERE l_shipdate > '1995-01-01'
"""
result = conn.execute(query)

Best Practices

Memory Management

  1. Set memory limits to prevent OOM errors:

    adapter = DuckDBAdapter(memory_limit="8GB")
    
  2. Use persistent databases for large datasets:

    adapter = DuckDBAdapter(database_path="large_dataset.duckdb")
    
  3. Monitor memory usage during execution:

    import psutil
    process = psutil.Process()
    print(f"Memory usage: {process.memory_info().rss / 1024 / 1024:.0f} MB")
    

Performance Optimization

  1. Match thread count to CPU cores:

    import os
    adapter = DuckDBAdapter(thread_limit=os.cpu_count())
    
  2. Use appropriate data types in the schema (see the sketch after this list):

    # Prefer HUGEINT over VARCHAR for large integers
    # Use DATE/TIMESTAMP instead of VARCHAR for dates
    
  3. Create indexes for filtered columns:

    conn.execute("CREATE INDEX idx_shipdate ON lineitem(l_shipdate)")
    
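The sketch referenced in item 2: declare native types up front instead of loading everything as VARCHAR (an abridged TPC-H orders table, shortened for illustration):

conn.execute("""
    CREATE TABLE orders (
        o_orderkey    BIGINT,
        o_custkey     BIGINT,
        o_totalprice  DECIMAL(15, 2),  -- DECIMAL, not VARCHAR
        o_orderdate   DATE             -- DATE, not VARCHAR
    )
""")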

Data Validation

  1. Verify row counts after loading:

    expected_rows = 6_001_215  # TPC-H lineitem row count at SF=1
    actual_rows = conn.execute("SELECT COUNT(*) FROM lineitem").fetchone()[0]
    assert actual_rows == expected_rows, f"Expected {expected_rows}, got {actual_rows}"
    
  2. Check data types:

    schema = conn.execute("PRAGMA table_info('lineitem')").fetchall()
    for column in schema:
        print(f"{column[1]}: {column[2]}")
    

Common Issues

Out of Memory Errors

Problem: A query fails with an out-of-memory error

Solution:

# Set explicit memory limit
adapter = DuckDBAdapter(memory_limit="4GB")

# Or use persistent database with disk spilling
adapter = DuckDBAdapter(
    database_path="benchmark.duckdb",
    memory_limit="4GB",
    temp_directory="/large/disk/temp"
)

Slow Query Performance

Problem: Queries execute slowly

Solutions:

# 1. Increase thread count
adapter = DuckDBAdapter(thread_limit=8)

# 2. Use persistent database to avoid repeated loads
adapter = DuckDBAdapter(database_path="cached.duckdb")

# 3. Enable profiling to identify bottlenecks
adapter = DuckDBAdapter(enable_profiling=True)

Database Lock Errors

Problem: "Database is locked" error

Solution:

# Use separate database files for concurrent access
adapter1 = DuckDBAdapter(database_path="benchmark1.duckdb")
adapter2 = DuckDBAdapter(database_path="benchmark2.duckdb")

# Or open the database in read-only mode (DuckDB allows multiple concurrent readers)
adapter = DuckDBAdapter(database_path="benchmark.duckdb", read_only=True)

See Also

API Reference

  • Base Benchmark API - Base benchmark interface

  • index - Python API overview

  • /usage/api-reference - High-level API guide
