Adding New Platform Adapters¶
This guide provides step-by-step instructions for implementing new platform adapters in BenchBox. Platform adapters enable BenchBox to run benchmarks on different database systems.
Overview¶
Platform adapters in BenchBox follow a consistent architecture based on the PlatformAdapter base class. Each adapter provides:
Database connection management
Schema creation and data loading
Query execution with dialect translation
Performance tuning and optimization
Platform-specific features integration
Prerequisites¶
Before implementing a new platform adapter, ensure you have:
Understanding of the target database platform
Access to the platform for testing
Python client library implementing DB API 2.0 specification (see DB API 2.0 documentation)
Familiarity with BenchBox architecture
Knowledge of SQL dialect differences
DB API 2.0 Requirement¶
BenchBox platform selection is based on the availability of a robust Python library using DB API 2.0 (PEP 249). This standardized interface enables universal database access and simplifies platform integration. See the DB API 2.0 documentation for detailed information on:
Why DB API 2.0 is fundamental to BenchBox
How BenchBox uses DB API 2.0 protocols and patterns
Platform compliance levels and requirements
Testing and validation procedures
Architecture Overview¶
Base Class Structure¶
All platform adapters inherit from PlatformAdapter in benchbox/platforms/base/:
from benchbox.platforms.base import PlatformAdapter
class NewDatabaseAdapter(PlatformAdapter):
"""Adapter for NewDatabase platform."""
@property
def platform_name(self) -> str:
return "NewDatabase"
def get_target_dialect(self) -> str:
return "newdatabase" # SQL dialect identifier
# Implement required abstract methods...
Required Dependencies¶
Add platform dependencies to pyproject.toml via the [project.optional-dependencies] table:
[project.optional-dependencies]
newdatabase = ["newdatabase-python>=2.0.0"]
all = ["newdatabase-python>=2.0.0", "benchbox[cloud]"]
This allows contributors to install the adapter with uv pip install "benchbox[newdatabase]".
Step-by-Step Implementation¶
Step 1: Create Adapter File¶
Create a new file: benchbox/platforms/newdatabase.py
"""NewDatabase platform adapter with optimizations.
Provides NewDatabase-specific functionality for BenchBox benchmarking.
"""
import time
import logging
from pathlib import Path
from typing import Any, Dict, Tuple, Optional
from .base import PlatformAdapter, ConnectionConfig
# Import platform client library
try:
import newdatabase
from newdatabase import Connection
from newdatabase.errors import DatabaseError
except ImportError:
newdatabase = None
Connection = None
DatabaseError = Exception
logger = logging.getLogger(__name__)
class NewDatabaseAdapter(PlatformAdapter):
"""NewDatabase platform adapter with performance optimizations."""
def __init__(self, **config):
super().__init__(**config)
if newdatabase is None:
raise ImportError("NewDatabase client not installed. Install with: pip install newdatabase-python")
# Platform-specific configuration
self.host = config.get('host', 'localhost')
self.port = config.get('port', 5432) # Default port
self.database = config.get('database', 'benchbox')
self.username = config.get('username', 'user')
self.password = config.get('password', '')
# Performance settings
self.connection_pool_size = config.get('connection_pool_size', 5)
self.query_timeout = config.get('query_timeout', 300)
@property
def platform_name(self) -> str:
return "NewDatabase"
def get_target_dialect(self) -> str:
return "newdatabase"
# Continue with required method implementations...
Step 2: Implement Connection Management¶
Important: Ensure the returned connection object is DB API 2.0 compliant. It must support either:
Standard cursor pattern:
connection.cursor()returning a cursor withexecute()methodDirect execute pattern:
connection.execute()method available directly
See DB API 2.0 documentation for details on both patterns.
def create_connection(self, **connection_config) -> Any:
"""Create optimized NewDatabase connection.
Returns a DB API 2.0 compliant connection object supporting either:
- Standard cursor pattern: connection.cursor().execute(query)
- Direct execute pattern: connection.execute(query)
"""
# Handle existing database
self.handle_existing_database(**connection_config)
# Get connection parameters
host = connection_config.get('host', self.host)
port = connection_config.get('port', self.port)
database = connection_config.get('database', self.database)
username = connection_config.get('username', self.username)
password = connection_config.get('password', self.password)
try:
# Create database connection (DB API 2.0 compliant)
connection = newdatabase.connect(
host=host,
port=port,
database=database,
username=username,
password=password,
timeout=self.query_timeout
)
# Test connection using standard DB API 2.0 cursor pattern
cursor = connection.cursor()
cursor.execute("SELECT 1")
cursor.fetchall()
cursor.close()
logger.info(f"Connected to NewDatabase at {host}:{port}")
return connection
except Exception as e:
logger.error(f"Failed to connect to NewDatabase: {e}")
raise
def check_server_database_exists(self, **connection_config) -> bool:
"""Check if database exists on NewDatabase server."""
try:
# Create admin connection (without specifying database)
admin_connection = newdatabase.connect(
host=connection_config.get('host', self.host),
port=connection_config.get('port', self.port),
username=connection_config.get('username', self.username),
password=connection_config.get('password', self.password)
)
database = connection_config.get('database', self.database)
# Check if database exists
cursor = admin_connection.cursor()
cursor.execute("SHOW DATABASES")
databases = [row[0] for row in cursor.fetchall()]
return database in databases
except Exception:
return False
finally:
if 'admin_connection' in locals() and admin_connection:
admin_connection.close()
def drop_database(self, **connection_config) -> None:
"""Drop database on NewDatabase server."""
try:
admin_connection = newdatabase.connect(
host=connection_config.get('host', self.host),
port=connection_config.get('port', self.port),
username=connection_config.get('username', self.username),
password=connection_config.get('password', self.password)
)
database = connection_config.get('database', self.database)
cursor = admin_connection.cursor()
cursor.execute(f"DROP DATABASE IF EXISTS {database}")
cursor.close()
except Exception as e:
raise RuntimeError(f"Failed to drop NewDatabase database: {e}")
finally:
if 'admin_connection' in locals() and admin_connection:
admin_connection.close()
Step 3: Implement Schema Creation¶
def create_schema(
self,
benchmark,
connection: Any,
enable_primary_keys: bool = True,
enable_foreign_keys: bool = True
) -> float:
"""Create schema using NewDatabase-optimized table definitions."""
start_time = time.time()
try:
# Get base schema SQL with constraint settings
schema_sql = benchmark.get_create_tables_sql(
enable_primary_keys=enable_primary_keys,
enable_foreign_keys=enable_foreign_keys
)
# Translate to NewDatabase dialect if needed
if hasattr(self, 'translate_sql'):
schema_sql = self.translate_sql(schema_sql, "duckdb") # From DuckDB dialect
# Split and execute statements
statements = [stmt.strip() for stmt in schema_sql.split(';') if stmt.strip()]
cursor = connection.cursor()
for statement in statements:
# Apply platform-specific optimizations
statement = self._optimize_table_definition(statement)
cursor.execute(statement)
logger.debug(f"Executed schema statement: {statement[:100]}...")
cursor.close()
connection.commit()
logger.info(f"Schema created (PKs: {enable_primary_keys}, FKs: {enable_foreign_keys})")
except Exception as e:
logger.error(f"Schema creation failed: {e}")
raise
return time.time() - start_time
def _optimize_table_definition(self, statement: str) -> str:
"""Apply NewDatabase-specific table optimizations."""
if not statement.upper().startswith('CREATE TABLE'):
return statement
# Example: Add storage engine or other platform-specific options
if 'ENGINE' not in statement.upper():
statement += " ENGINE=InnoDB" # Example for MySQL-like databases
return statement
Step 4: Implement Data Loading¶
def load_data(self, benchmark, connection: Any, data_dir: Path) -> Tuple[Dict[str, int], float]:
"""Load data using NewDatabase bulk loading capabilities."""
start_time = time.time()
table_stats = {}
# Get data files from benchmark
if hasattr(benchmark, 'tables') and benchmark.tables:
data_files = benchmark.tables
else:
raise ValueError("No data files found. Ensure benchmark.generate_data() was called first.")
cursor = connection.cursor()
# Load data for each table
for table_name, file_path in data_files.items():
file_path = Path(file_path)
if not file_path.exists() or file_path.stat().st_size == 0:
logger.warning(f"Skipping {table_name} - no data file or empty file")
table_stats[table_name] = 0
continue
try:
load_start = time.time()
table_name_upper = table_name.upper()
# Use platform-specific bulk loading method
if self._supports_bulk_copy():
# Use COPY command or equivalent
copy_command = self._build_copy_command(table_name_upper, file_path)
cursor.execute(copy_command)
else:
# Fall back to INSERT statements
self._load_via_inserts(cursor, table_name_upper, file_path)
# Get row count
cursor.execute(f"SELECT COUNT(*) FROM {table_name_upper}")
row_count = cursor.fetchone()[0]
table_stats[table_name_upper] = row_count
load_time = time.time() - load_start
logger.info(f"✅ Loaded {row_count:,} rows into {table_name_upper} in {load_time:.2f}s")
except Exception as e:
logger.error(f"Failed to load {table_name}: {str(e)[:100]}...")
table_stats[table_name.upper()] = 0
cursor.close()
connection.commit()
total_time = time.time() - start_time
total_rows = sum(table_stats.values())
logger.info(f"✅ Loaded {total_rows:,} total rows in {total_time:.2f}s")
return table_stats, total_time
def _supports_bulk_copy(self) -> bool:
"""Check if platform supports efficient bulk loading."""
return True # Implement based on platform capabilities
def _build_copy_command(self, table_name: str, file_path: Path) -> str:
"""Build platform-specific COPY command."""
delimiter = '|' if file_path.suffix == '.tbl' else ','
return f"""
COPY {table_name} FROM '{file_path}'
WITH (FORMAT CSV, DELIMITER '{delimiter}', HEADER FALSE)
"""
def _load_via_inserts(self, cursor, table_name: str, file_path: Path):
"""Load data via INSERT statements (fallback method)."""
delimiter = '|' if file_path.suffix == '.tbl' else ','
with open(file_path, 'r') as f:
batch = []
batch_size = 1000
for line in f:
line = line.strip()
if line.endswith(delimiter):
line = line[:-1] # Remove trailing delimiter
values = line.split(delimiter)
placeholders = ','.join(['?' for _ in values])
if not batch:
insert_sql = f"INSERT INTO {table_name} VALUES ({placeholders})"
batch.append(tuple(values))
if len(batch) >= batch_size:
cursor.executemany(insert_sql, batch)
batch = []
if batch:
cursor.executemany(insert_sql, batch)
Step 5: Implement Query Execution¶
Important: Use standard DB API 2.0 cursor methods for query execution. BenchBox’s DatabaseConnection wrapper handles both cursor patterns automatically, but adapter implementations should follow DB API 2.0 conventions.
def execute_query(self, connection: Any, query: str, query_id: str) -> Dict[str, Any]:
"""Execute query with detailed timing and metrics.
Uses DB API 2.0 standard cursor pattern for query execution.
The connection parameter should be a DB API 2.0 compliant connection object.
"""
start_time = time.time()
try:
# DB API 2.0 standard cursor pattern
cursor = connection.cursor()
# Apply any platform-specific query hints or settings
self._apply_query_optimizations(cursor)
# Execute the query using DB API 2.0 execute() method
# (dialect translation handled by base class)
cursor.execute(query)
# Fetch results using DB API 2.0 fetchall() method
results = cursor.fetchall()
execution_time = time.time() - start_time
# Get platform-specific metrics if available
query_metrics = self._get_query_metrics(cursor)
# DB API 2.0 cleanup
cursor.close()
return {
'query_id': query_id,
'status': 'SUCCESS',
'execution_time': execution_time,
'rows_returned': len(results),
'first_row': results[0] if results else None,
'platform_metrics': query_metrics
}
except Exception as e:
execution_time = time.time() - start_time
return {
'query_id': query_id,
'status': 'FAILED',
'execution_time': execution_time,
'rows_returned': 0,
'error': str(e),
'error_type': type(e).__name__
}
def _apply_query_optimizations(self, cursor):
"""Apply platform-specific query optimizations."""
# Example optimizations
cursor.execute("SET query_cache = ON")
cursor.execute("SET optimizer_mode = 'performance'")
def _get_query_metrics(self, cursor) -> Dict[str, Any]:
"""Get platform-specific query execution metrics."""
try:
# Example: Get query stats if platform supports it
cursor.execute("SHOW QUERY STATS")
stats = cursor.fetchall()
return {'query_stats': stats}
except:
return {}
def configure_for_benchmark(self, connection: Any, benchmark_type: str) -> None:
"""Apply platform optimizations based on benchmark type."""
cursor = connection.cursor()
if benchmark_type.lower() in ['olap', 'analytics', 'tpch', 'tpcds']:
# OLAP optimizations
cursor.execute("SET join_algorithm = 'hash'")
cursor.execute("SET parallel_workers = 8")
cursor.execute("SET work_mem = '256MB'")
elif benchmark_type.lower() in ['oltp', 'transactional']:
# OLTP optimizations
cursor.execute("SET synchronous_commit = ON")
cursor.execute("SET random_page_cost = 1.1")
cursor.close()
Step 6: Implement Platform Metadata¶
def _get_platform_metadata(self, connection: Any) -> Dict[str, Any]:
"""Get platform-specific metadata and system information."""
metadata = {
"platform": self.platform_name,
"host": self.host,
"port": self.port,
"database": self.database
}
try:
cursor = connection.cursor()
# Get platform version
cursor.execute("SELECT VERSION()")
version_result = cursor.fetchone()
metadata["version"] = version_result[0] if version_result else "unknown"
# Get system settings
cursor.execute("SHOW VARIABLES LIKE 'max_connections'")
settings = cursor.fetchall()
metadata["settings"] = {name: value for name, value in settings}
# Get database size information
cursor.execute("""
SELECT
table_name,
table_rows,
data_length,
index_length
FROM information_schema.tables
WHERE table_schema = DATABASE()
""")
tables = cursor.fetchall()
metadata["tables"] = [
{
"name": table[0],
"rows": table[1],
"data_size": table[2],
"index_size": table[3]
} for table in tables
]
cursor.close()
except Exception as e:
metadata["metadata_error"] = str(e)
return metadata
Step 7: Implement Performance Tuning (Optional)¶
def supports_tuning_type(self, tuning_type) -> bool:
"""Check if NewDatabase supports a specific tuning type."""
try:
from benchbox.core.tuning.interface import TuningType
# Define supported tuning types for this platform
supported_types = {
TuningType.SORTING, # Supports indexes
TuningType.CLUSTERING, # Supports clustered indexes
TuningType.PARTITIONING # Supports table partitioning
}
return tuning_type in supported_types
except ImportError:
return False
def generate_tuning_clause(self, table_tuning) -> str:
"""Generate platform-specific tuning clauses for CREATE TABLE."""
if not table_tuning or not table_tuning.has_any_tuning():
return ""
clauses = []
try:
from benchbox.core.tuning.interface import TuningType
# Handle partitioning
partition_columns = table_tuning.get_columns_by_type(TuningType.PARTITIONING)
if partition_columns:
sorted_cols = sorted(partition_columns, key=lambda col: col.order)
partition_col = sorted_cols[0].name
clauses.append(f"PARTITION BY HASH({partition_col})")
# Handle clustering
cluster_columns = table_tuning.get_columns_by_type(TuningType.CLUSTERING)
if cluster_columns:
sorted_cols = sorted(cluster_columns, key=lambda col: col.order)
column_names = [col.name for col in sorted_cols]
clauses.append(f"CLUSTERED INDEX ({', '.join(column_names)})")
except ImportError:
pass
return " ".join(clauses)
def apply_table_tunings(self, table_tuning, connection: Any) -> None:
"""Apply tuning configurations to a table after creation."""
if not table_tuning or not table_tuning.has_any_tuning():
return
table_name = table_tuning.table_name
cursor = connection.cursor()
try:
from benchbox.core.tuning.interface import TuningType
# Create indexes for sorting optimization
sort_columns = table_tuning.get_columns_by_type(TuningType.SORTING)
if sort_columns:
sorted_cols = sorted(sort_columns, key=lambda col: col.order)
column_names = [col.name for col in sorted_cols]
index_name = f"idx_{table_name.lower()}_sort"
index_sql = f"CREATE INDEX {index_name} ON {table_name} ({', '.join(column_names)})"
cursor.execute(index_sql)
logger.info(f"Created sort index on {table_name}: {', '.join(column_names)}")
cursor.close()
connection.commit()
except Exception as e:
logger.error(f"Failed to apply tunings to {table_name}: {e}")
raise
Step 8: Add Power/Throughput Tests (Optional)¶
def run_power_test(self, benchmark, **kwargs) -> Dict[str, Any]:
"""Run TPC power test measuring single-stream query performance."""
return self.run_benchmark(benchmark, **kwargs).__dict__
def run_throughput_test(self, benchmark, **kwargs) -> Dict[str, Any]:
"""Run TPC throughput test measuring concurrent multi-stream performance."""
# For now, run as single stream - extend for true multi-stream later
return self.run_power_test(benchmark, **kwargs)
def run_maintenance_test(self, benchmark, **kwargs) -> Dict[str, Any]:
"""Run TPC maintenance test measuring data modification performance."""
return {"status": "NOT_IMPLEMENTED", "message": "Maintenance test not implemented"}
Integration Steps¶
Step 1: Register the Adapter¶
Add your adapter to benchbox/platforms/__init__.py:
from .newdatabase import NewDatabaseAdapter
__all__ = [
'PlatformAdapter',
'DuckDBAdapter',
'SQLiteAdapter',
# ... existing adapters
'NewDatabaseAdapter', # Add your adapter
]
Step 2: Add SQL Dialect Support¶
If your platform has a unique SQL dialect, add it to the SQL translation system in benchbox/core/sql_translation.py:
# Add dialect-specific translation rules
DIALECT_TRANSLATIONS = {
'newdatabase': {
'functions': {
'SUBSTRING': 'SUBSTR', # Example translation
'LENGTH': 'CHAR_LENGTH',
},
'types': {
'INTEGER': 'INT',
'VARCHAR': 'TEXT',
}
}
}
Step 3: Create Tests¶
Create comprehensive tests in tests/unit/platforms/test_newdatabase_adapter.py:
import pytest
from unittest.mock import Mock, patch, MagicMock
from benchbox.platforms.newdatabase import NewDatabaseAdapter
class TestNewDatabaseAdapter:
"""Test cases for NewDatabase platform adapter."""
@pytest.fixture
def adapter(self):
"""Create adapter instance for testing."""
return NewDatabaseAdapter(
host='localhost',
port=5432,
database='test',
username='test',
password='test'
)
def test_platform_name(self, adapter):
"""Test platform name property."""
assert adapter.platform_name == "NewDatabase"
def test_target_dialect(self, adapter):
"""Test SQL dialect identifier."""
assert adapter.get_target_dialect() == "newdatabase"
@patch('newdatabase.connect')
def test_create_connection_success(self, mock_connect, adapter):
"""Test successful connection creation."""
mock_connection = Mock()
mock_cursor = Mock()
mock_connection.cursor.return_value = mock_cursor
mock_connect.return_value = mock_connection
connection = adapter.create_connection()
assert connection is mock_connection
mock_connect.assert_called_once()
mock_cursor.execute.assert_called_with("SELECT 1")
@patch('newdatabase.connect')
def test_create_connection_failure(self, mock_connect, adapter):
"""Test connection failure handling."""
mock_connect.side_effect = Exception("Connection failed")
with pytest.raises(Exception, match="Connection failed"):
adapter.create_connection()
def test_schema_creation(self, adapter):
"""Test schema creation process."""
mock_benchmark = Mock()
mock_benchmark.get_create_tables_sql.return_value = """
CREATE TABLE test_table (id INTEGER, name VARCHAR(50));
CREATE TABLE test_table2 (id INTEGER, value DECIMAL(10,2));
"""
mock_connection = Mock()
mock_cursor = Mock()
mock_connection.cursor.return_value = mock_cursor
duration = adapter.create_schema(mock_benchmark, mock_connection)
assert duration > 0
assert mock_cursor.execute.call_count == 2
mock_connection.commit.assert_called_once()
def test_data_loading(self, adapter, tmp_path):
"""Test data loading functionality."""
# Create test data files
test_data_file = tmp_path / "test_table.tbl"
test_data_file.write_text("1|John Doe\n2|Jane Smith\n")
mock_benchmark = Mock()
mock_benchmark.tables = {"test_table": str(test_data_file)}
mock_connection = Mock()
mock_cursor = Mock()
mock_cursor.fetchone.return_value = (2,) # Row count
mock_connection.cursor.return_value = mock_cursor
with patch.object(adapter, '_supports_bulk_copy', return_value=True):
with patch.object(adapter, '_build_copy_command', return_value="COPY test_table FROM 'file'"):
table_stats, load_time = adapter.load_data(mock_benchmark, mock_connection, tmp_path)
assert table_stats["TEST_TABLE"] == 2
assert load_time > 0
def test_query_execution_success(self, adapter):
"""Test successful query execution."""
mock_connection = Mock()
mock_cursor = Mock()
mock_cursor.fetchall.return_value = [(1, 'test'), (2, 'data')]
mock_connection.cursor.return_value = mock_cursor
result = adapter.execute_query(mock_connection, "SELECT * FROM test", "Q1")
assert result['status'] == 'SUCCESS'
assert result['query_id'] == 'Q1'
assert result['rows_returned'] == 2
assert result['execution_time'] > 0
def test_query_execution_failure(self, adapter):
"""Test query execution error handling."""
mock_connection = Mock()
mock_cursor = Mock()
mock_cursor.execute.side_effect = Exception("SQL error")
mock_connection.cursor.return_value = mock_cursor
result = adapter.execute_query(mock_connection, "INVALID SQL", "Q1")
assert result['status'] == 'FAILED'
assert result['query_id'] == 'Q1'
assert result['rows_returned'] == 0
assert 'SQL error' in result['error']
def test_platform_metadata(self, adapter):
"""Test platform metadata collection."""
mock_connection = Mock()
mock_cursor = Mock()
mock_cursor.fetchone.return_value = ("NewDatabase 1.0",)
mock_cursor.fetchall.return_value = [("max_connections", "100")]
mock_connection.cursor.return_value = mock_cursor
metadata = adapter._get_platform_metadata(mock_connection)
assert metadata['platform'] == 'NewDatabase'
assert metadata['version'] == 'NewDatabase 1.0'
assert 'settings' in metadata
# Integration tests
class TestNewDatabaseIntegration:
"""Integration tests requiring actual NewDatabase instance."""
@pytest.fixture
def integration_adapter(self):
"""Create adapter for integration testing."""
# Only run if NewDatabase is available
pytest.importorskip("newdatabase")
return NewDatabaseAdapter(
host=os.getenv('NEWDATABASE_HOST', 'localhost'),
port=int(os.getenv('NEWDATABASE_PORT', '5432')),
database=os.getenv('NEWDATABASE_DATABASE', 'test'),
username=os.getenv('NEWDATABASE_USERNAME', 'test'),
password=os.getenv('NEWDATABASE_PASSWORD', 'test')
)
@pytest.mark.integration
def test_end_to_end_benchmark(self, integration_adapter):
"""Test complete benchmark execution."""
from benchbox import ReadPrimitivesBenchmark
benchmark = ReadPrimitivesBenchmark(scale_factor=0.001)
results = integration_adapter.run_benchmark(benchmark)
assert results.status == 'SUCCESS'
assert results.total_time > 0
assert len(results.query_results) > 0
Step 4: Add Documentation¶
Create platform documentation following the existing pattern in docs/platforms/newdatabase.md.
Step 5: Update Configuration¶
Add the new platform to configuration files:
# In example configuration files
platforms:
newdatabase:
host: "localhost"
port: 5432
database: "benchbox"
username: "benchbox"
password: "secure_password"
Testing Your Adapter¶
Unit Tests¶
Run unit tests to verify basic functionality:
# Run adapter-specific tests
pytest tests/unit/platforms/test_newdatabase_adapter.py -v
# Run all platform tests
pytest tests/unit/platforms/ -v
Integration Tests¶
Test with actual database instance:
# Set up test database
export NEWDATABASE_HOST=localhost
export NEWDATABASE_PORT=5432
export NEWDATABASE_DATABASE=test
export NEWDATABASE_USERNAME=test
export NEWDATABASE_PASSWORD=test
# Run integration tests
pytest tests/integration/ -k newdatabase -v
Benchmark Tests¶
Test with small benchmark:
from benchbox import ReadPrimitivesBenchmark
from benchbox.platforms.newdatabase import NewDatabaseAdapter
# Test with minimal benchmark
benchmark = ReadPrimitivesBenchmark(scale_factor=0.001)
adapter = NewDatabaseAdapter(
host="localhost",
database="test",
username="test",
password="test"
)
results = adapter.run_benchmark(benchmark)
print(f"Benchmark completed in {results.total_time:.2f}s")
Best Practices¶
Error Handling¶
Implement comprehensive error handling for connection failures
Provide clear error messages with troubleshooting hints
Handle platform-specific exceptions gracefully
Log errors at appropriate levels
Performance Optimization¶
Use platform-native bulk loading methods when available
Implement connection pooling for concurrent workloads
Apply platform-specific query optimizations
Monitor and report platform-specific metrics
Configuration Management¶
Support environment variable configuration
Provide sensible defaults for all parameters
Validate configuration parameters early
Document all configuration options
Testing Strategy¶
Write comprehensive unit tests with mocking
Create integration tests for real database connections
Test error conditions and edge cases
Validate performance characteristics
Documentation¶
Document all configuration parameters
Provide installation and setup instructions
Include troubleshooting guides
Add usage examples and best practices
Common Patterns¶
Connection String Parsing¶
def _parse_connection_string(self, conn_str: str) -> Dict[str, str]:
"""Parse database connection string."""
# Example: "newdatabase://user:pass@host:port/database"
import re
pattern = r"newdatabase://(?:([^:]*):([^@]*)@)?([^:]*):(\d+)/(.+)"
match = re.match(pattern, conn_str)
if not match:
raise ValueError(f"Invalid connection string: {conn_str}")
return {
'username': match.group(1) or '',
'password': match.group(2) or '',
'host': match.group(3),
'port': int(match.group(4)),
'database': match.group(5)
}
Retry Logic¶
def _execute_with_retry(self, cursor, query: str, max_retries: int = 3):
"""Execute query with retry logic for transient failures."""
import time
for attempt in range(max_retries):
try:
return cursor.execute(query)
except TransientError as e:
if attempt == max_retries - 1:
raise
wait_time = 2 ** attempt # Exponential backoff
logger.warning(f"Query failed (attempt {attempt + 1}), retrying in {wait_time}s: {e}")
time.sleep(wait_time)
Resource Management¶
from contextlib import contextmanager
@contextmanager
def managed_connection(self, **config):
"""Context manager for database connections."""
connection = None
try:
connection = self.create_connection(**config)
yield connection
finally:
if connection:
connection.close()
@contextmanager
def managed_cursor(self, connection):
"""Context manager for database cursors."""
cursor = None
try:
cursor = connection.cursor()
yield cursor
finally:
if cursor:
cursor.close()
Advanced Features¶
Connection Pooling¶
class PooledNewDatabaseAdapter(NewDatabaseAdapter):
"""NewDatabase adapter with connection pooling."""
def __init__(self, **config):
super().__init__(**config)
self._connection_pool = None
self._pool_size = config.get('pool_size', 10)
def _get_connection_pool(self):
"""Get or create connection pool."""
if self._connection_pool is None:
from newdatabase.pool import ConnectionPool
self._connection_pool = ConnectionPool(
host=self.host,
port=self.port,
database=self.database,
username=self.username,
password=self.password,
max_connections=self._pool_size
)
return self._connection_pool
def create_connection(self, **connection_config) -> Any:
"""Get connection from pool."""
pool = self._get_connection_pool()
return pool.get_connection()
Async Support¶
import asyncio
from typing import AsyncGenerator
class AsyncNewDatabaseAdapter(NewDatabaseAdapter):
"""Async version of NewDatabase adapter."""
async def create_async_connection(self, **config):
"""Create async database connection."""
import newdatabase.asyncio as async_newdb
return await async_newdb.connect(
host=self.host,
port=self.port,
database=self.database,
username=self.username,
password=self.password
)
async def execute_query_async(self, connection, query: str, query_id: str):
"""Execute query asynchronously."""
start_time = time.time()
try:
cursor = await connection.cursor()
await cursor.execute(query)
results = await cursor.fetchall()
return {
'query_id': query_id,
'status': 'SUCCESS',
'execution_time': time.time() - start_time,
'rows_returned': len(results),
'first_row': results[0] if results else None
}
except Exception as e:
return {
'query_id': query_id,
'status': 'FAILED',
'execution_time': time.time() - start_time,
'error': str(e)
}
Troubleshooting¶
Common Issues¶
Import Errors: Ensure client library is installed and accessible
Connection Failures: Check network connectivity, credentials, and firewall settings
SQL Compatibility: Verify dialect translation is working correctly
Performance Issues: Check for proper indexing and query optimization
Memory Usage: Monitor memory consumption during large data loads
Debugging Tips¶
Enable debug logging to trace execution flow
Use database-specific profiling tools
Test with small datasets first
Validate schema creation independently
Check platform-specific documentation for optimization hints
Contributing Your Adapter¶
When your adapter is complete and tested:
Create a pull request with your implementation
Include comprehensive tests and documentation
Add the platform to the CI/CD pipeline
Update the README with the new platform support
Consider adding examples and tutorials
Your contribution helps make BenchBox more comprehensive and valuable for the analytics community!
Resources¶
Base Platform Adapter:
benchbox/platforms/base/packageExisting Adapters:
benchbox/platforms/directorySQL Translation:
benchbox/core/sql_translation.pyTest Examples:
tests/unit/platforms/Documentation Examples:
docs/platforms/BenchBox Architecture:
docs/development/architecture.md