"""BigQuery platform adapter with native BigQuery SQL and Cloud Storage integration.
Provides BigQuery-specific optimizations for large-scale analytics,
including efficient data loading via Cloud Storage and native BigQuery features.
Copyright 2026 Joe Harris / BenchBox Project
Licensed under the MIT License. See LICENSE file in the project root for details.
"""
from __future__ import annotations
import argparse
import json
import logging
import time
from pathlib import Path
from typing import TYPE_CHECKING, Any
if TYPE_CHECKING:
from benchbox.core.tuning.interface import (
ForeignKeyConfiguration,
PlatformOptimizationConfiguration,
PrimaryKeyConfiguration,
UnifiedTuningConfiguration,
)
from benchbox.utils.cloud_storage import get_cloud_path_info, is_cloud_path
from ..utils.dependencies import check_platform_dependencies, get_dependency_error_message
from .base import PlatformAdapter
try:
import google.auth
google_auth = google.auth # Store reference for _load_credentials
from google.cloud import bigquery, storage
from google.cloud.exceptions import NotFound
from google.oauth2 import service_account
except ImportError:
google_auth = None
bigquery = None
storage = None
service_account = None
class BigQueryAdapter(PlatformAdapter):
"""BigQuery platform adapter with Cloud Storage integration."""
def __init__(self, **config):
super().__init__(**config)
# Check dependencies with improved error message
available, missing = check_platform_dependencies("bigquery")
if not available:
error_msg = get_dependency_error_message("bigquery", missing)
raise ImportError(error_msg)
self._dialect = "bigquery"
# BigQuery configuration
self.project_id = config.get("project_id")
self.dataset_id = config.get("dataset_id") or "benchbox"
self.location = config.get("location") or "US"
self.credentials_path = config.get("credentials_path")
# Cloud Storage settings for data loading
# Check for staging_root first (set by orchestrator for CloudStagingPath)
staging_root = config.get("staging_root")
if staging_root:
# Parse gs://bucket/path format to extract bucket and prefix
path_info = get_cloud_path_info(staging_root)
if path_info["provider"] in ("gs", "gcs"):
self.storage_bucket = path_info["bucket"]
# Use the path component if provided, otherwise use default
self.storage_prefix = path_info["path"].strip("/") if path_info["path"] else "benchbox-data"
self.logger.info(
f"Using staging location from config: gs://{self.storage_bucket}/{self.storage_prefix}"
)
else:
raise ValueError(f"BigQuery requires GCS (gs://) staging location, got: {path_info['provider']}://")
else:
# Fall back to explicit storage_bucket configuration
self.storage_bucket = config.get("storage_bucket")
self.storage_prefix = config.get("storage_prefix") or "benchbox-data"
# Query settings
self.job_priority = config.get("job_priority") or "INTERACTIVE" # INTERACTIVE or BATCH
# Disable result cache by default for accurate benchmarking
# Can be overridden with query_cache=true or disable_result_cache=false
if config.get("query_cache") is not None:
self.query_cache = config.get("query_cache")
elif config.get("disable_result_cache") is not None:
self.query_cache = not config.get("disable_result_cache")
else:
# Default: disable cache for accurate benchmark results
self.query_cache = False
self.dry_run = config.get("dry_run") if config.get("dry_run") is not None else False
self.maximum_bytes_billed = config.get("maximum_bytes_billed")
# Table settings
self.clustering_fields = config.get("clustering_fields") or []
self.partitioning_field = config.get("partitioning_field")
if not self.project_id:
from ..core.exceptions import ConfigurationError
raise ConfigurationError(
"BigQuery configuration requires project_id.\n"
"Configure with one of:\n"
" 1. CLI: benchbox platforms setup --platform bigquery\n"
" 2. Environment variable: BIGQUERY_PROJECT\n"
" 3. CLI option: --platform-option project_id=<your-project>\n"
"Also ensure Google Cloud credentials are configured:\n"
" - Set GOOGLE_APPLICATION_CREDENTIALS to service account JSON path\n"
" - Or run 'gcloud auth application-default login'"
)
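# Illustrative sketch (not part of the adapter): constructing the adapter directly with
# the configuration keys read above. The project, dataset, and bucket names are
# placeholders, not values shipped with BenchBox.
#
#   adapter = BigQueryAdapter(
#       project_id="my-gcp-project",
#       dataset_id="benchbox_demo",
#       location="US",
#       staging_root="gs://my-staging-bucket/benchbox-data",
#       query_cache=False,
#   )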
@staticmethod
def add_cli_arguments(parser: argparse.ArgumentParser) -> None:
"""Add BigQuery-specific CLI arguments."""
bq_group = parser.add_argument_group("BigQuery Arguments")
bq_group.add_argument("--project-id", type=str, help="BigQuery project ID")
bq_group.add_argument("--dataset-id", type=str, help="BigQuery dataset ID")
bq_group.add_argument("--location", type=str, default="US", help="BigQuery dataset location")
bq_group.add_argument("--credentials-path", type=str, help="Path to Google Cloud credentials file")
bq_group.add_argument("--storage-bucket", type=str, help="GCS bucket for data loading")
@classmethod
def from_config(cls, config: dict[str, Any]):
"""Create BigQuery adapter from unified configuration."""
from benchbox.utils.database_naming import generate_database_name
adapter_config = {}
very_verbose = config.get("very_verbose", False)
# Auto-detect project ID if not provided
if not config.get("project_id"):
try:
_, project_id = google.auth.default()
if project_id:
adapter_config["project_id"] = project_id
if very_verbose:
logging.info(f"Auto-detected BigQuery project ID: {project_id}")
except google.auth.exceptions.DefaultCredentialsError:
if very_verbose:
logging.warning("Could not auto-detect BigQuery project ID. Please provide --project-id.")
# Override with explicit config values
if config.get("project_id"):
adapter_config["project_id"] = config["project_id"]
# Generate dataset name
dataset_name = generate_database_name(
benchmark_name=config["benchmark"],
scale_factor=config["scale_factor"],
platform="bigquery",
tuning_config=config.get("tuning_config"),
)
adapter_config["dataset_id"] = dataset_name
# Pass through other relevant config
for key in [
"location",
"credentials_path",
"storage_bucket",
"tuning_config",
"verbose_enabled",
"very_verbose",
]:
if key in config:
adapter_config[key] = config[key]
return cls(**adapter_config)
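# Illustrative sketch: the unified config dict expected by from_config(). Only keys read
# above are shown; all values are placeholders, and project_id may be omitted when ADC
# can supply a default project.
#
#   adapter = BigQueryAdapter.from_config({
#       "benchmark": "tpch",
#       "scale_factor": 1,
#       "project_id": "my-gcp-project",   # optional - auto-detected via ADC if omitted
#       "location": "US",
#       "storage_bucket": "my-staging-bucket",
#   })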
@property
def platform_name(self) -> str:
return "BigQuery"
def get_target_dialect(self) -> str:
"""Return the target SQL dialect for BigQuery."""
return "bigquery"
def _get_connection_params(self, **connection_config) -> dict[str, Any]:
"""Get standardized connection parameters."""
return {
"project_id": connection_config.get("project_id", self.project_id),
"location": connection_config.get("location", self.location),
"credentials_path": connection_config.get("credentials_path", self.credentials_path),
}
def _load_credentials(self, credentials_path: str | None) -> Any:
"""Load Google Cloud credentials from service account file.
This method loads credentials directly from the file without setting
environment variables, which is more secure as it avoids credential
exposure via environment inspection.
Args:
credentials_path: Path to service account JSON file, or None for default
Returns:
Credentials object suitable for BigQuery/Storage clients
"""
if credentials_path:
return service_account.Credentials.from_service_account_file(credentials_path)
# Fall back to default credentials (ADC)
credentials, _ = google_auth.default()
return credentials
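# Illustrative sketch of the two credential paths supported above. The JSON path is a
# placeholder; the ADC branch assumes `gcloud auth application-default login` has been
# run or GOOGLE_APPLICATION_CREDENTIALS is set.
#
#   creds = adapter._load_credentials("/path/to/service-account.json")  # explicit key file
#   creds = adapter._load_credentials(None)                             # fall back to ADC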
def _create_admin_client(self, **connection_config) -> Any:
"""Create BigQuery client for admin operations."""
params = self._get_connection_params(**connection_config)
credentials = self._load_credentials(params["credentials_path"])
return bigquery.Client(
project=params["project_id"],
location=params["location"],
credentials=credentials,
)
def check_server_database_exists(self, **connection_config) -> bool:
"""Check if dataset exists in BigQuery project."""
try:
client = self._create_admin_client(**connection_config)
dataset_id = connection_config.get("dataset", self.dataset_id)
# Check if dataset exists
datasets = list(client.list_datasets())
dataset_names = [d.dataset_id for d in datasets]
return dataset_id in dataset_names
except Exception:
# If we can't connect or check, assume dataset doesn't exist
return False
def drop_database(self, **connection_config) -> None:
"""Drop dataset in BigQuery project."""
# Resolve the dataset name before the try block so the error message below can use it
dataset_id = connection_config.get("dataset", self.dataset_id)
try:
client = self._create_admin_client(**connection_config)
# Create dataset reference
dataset_ref = client.dataset(dataset_id)
# Drop dataset and all its tables
client.delete_dataset(dataset_ref, delete_contents=True, not_found_ok=True)
except Exception as e:
raise RuntimeError(f"Failed to drop BigQuery dataset {dataset_id}: {e}") from e
def _validate_database_compatibility(self, **connection_config):
"""Validate database compatibility with BigQuery-specific empty table detection.
Extends base validation to add fast empty table detection using BigQuery's
table.num_rows metadata property. This catches cases where tables exist but
have no data (usually from failed previous loads).
Args:
**connection_config: Connection configuration
Returns:
DatabaseValidationResult with compatibility information
"""
# First run the standard validation
from benchbox.platforms.base.validation import DatabaseValidator
validator = DatabaseValidator(adapter=self, connection_config=connection_config)
result = validator.validate()
# If validation already failed, no need for additional checks
if not result.is_valid:
return result
# BigQuery-specific check: detect empty tables using fast num_rows API
# This is more comprehensive than the standard row count validation which
# only samples 2 tables for performance. BigQuery's num_rows is metadata
# and doesn't require running queries, so we can check all tables quickly.
try:
client = self._create_admin_client(**connection_config)
dataset_ref = client.dataset(self.dataset_id, project=self.project_id)
try:
tables = list(client.list_tables(dataset_ref))
except Exception as e:
# If we can't list tables, validation already failed in base validator
self.log_very_verbose(f"Could not list tables for empty table check: {e}")
return result
if not tables:
# No tables means database is empty - base validator should have caught this
return result
# Count empty tables using fast metadata API
empty_count = 0
empty_tables = []
for table_info in tables:
try:
table = client.get_table(table_info.reference)
if table.num_rows == 0:
empty_count += 1
empty_tables.append(table.table_id)
except Exception as e:
self.log_very_verbose(f"Failed to check table {table_info.table_id}: {e}")
# If more than half the tables are empty, mark database as invalid
# This indicates a failed previous load where schema was created but data loading failed
if empty_count > len(tables) / 2:
self.log_verbose(
f"Found {empty_count}/{len(tables)} empty tables - database appears to have failed data load"
)
result.issues.append(
f"Empty tables detected: {empty_count}/{len(tables)} tables have no rows "
f"(indicates failed previous load)"
)
result.is_valid = False
result.can_reuse = False
except Exception as e:
# Don't fail validation if empty table check fails - just log it
self.log_very_verbose(f"Empty table check failed: {e}")
return result
def _cleanup_empty_tables_if_needed(self, client) -> None:
"""Fallback cleanup for empty tables (defense-in-depth).
This is a fallback mechanism that runs after connection is created.
The primary empty table detection now happens during _validate_database_compatibility(),
which prevents the database from being marked as reusable in the first place.
This method is kept as defense-in-depth in case validation is bypassed or
tables become empty between validation and connection.
If more than half the tables are empty, it drops all empty tables and forces
schema/data recreation by setting database_was_reused = False.
Args:
client: BigQuery client connection
"""
try:
self.logger.debug(f"Starting empty table cleanup check for dataset {self.dataset_id}")
dataset_ref = client.dataset(self.dataset_id, project=self.project_id)
self.logger.debug(f"Listing tables in dataset {self.dataset_id}")
tables = list(client.list_tables(dataset_ref))
self.logger.debug(f"Found {len(tables)} tables in dataset")
if not tables:
# No tables exist - nothing to clean up
self.logger.debug("No tables found - nothing to clean up")
return
empty_count = 0
empty_tables = []
# Check each table for emptiness
self.logger.debug(f"Checking {len(tables)} tables for empty rows")
for table_info in tables:
try:
table = client.get_table(table_info.reference)
if table.num_rows == 0:
empty_count += 1
empty_tables.append(table.table_id)
self.logger.debug(f"Table {table.table_id} is empty (0 rows)")
except Exception as e:
self.logger.warning(f"Failed to check table {table_info.table_id}: {e}")
self.logger.debug(f"Empty table count: {empty_count}/{len(tables)}")
# If more than half the tables are empty, likely a failed load - drop them all
if empty_count > len(tables) / 2:
self.logger.warning(
f"Found {empty_count}/{len(tables)} empty tables - cleaning up failed previous load"
)
# Drop all empty tables
for table_id in empty_tables:
try:
table_ref = dataset_ref.table(table_id)
client.delete_table(table_ref, not_found_ok=True)
self.logger.info(f"Dropped empty table: {table_id}")
except Exception as e:
self.logger.warning(f"Failed to drop empty table {table_id}: {e}")
# Mark database as NOT reused so schema and data will be loaded
self.database_was_reused = False
self.logger.info(f"Dropped {len(empty_tables)} empty tables - forcing schema and data recreation")
elif empty_count > 0:
# Some tables are empty but not majority - just log it
self.logger.info(
f"Found {empty_count} empty tables (out of {len(tables)} total) - not enough to trigger cleanup"
)
else:
self.logger.debug("No empty tables found")
except Exception as e:
# Don't fail if cleanup check fails - just log and continue
import traceback
self.logger.error(f"Empty table cleanup check failed: {e}")
self.logger.error(f"Traceback: {traceback.format_exc()}")
def create_connection(self, **connection_config) -> Any:
"""Create optimized BigQuery client connection."""
self.log_operation_start("BigQuery connection")
# Handle existing database using base class method
self.handle_existing_database(**connection_config)
# Get connection parameters
params = self._get_connection_params(**connection_config)
self.log_very_verbose(
f"BigQuery connection params: project={params.get('project_id')}, location={params.get('location')}"
)
try:
# Load credentials securely (without environment variable exposure)
credentials = self._load_credentials(params["credentials_path"])
# Create BigQuery client
client = bigquery.Client(
project=params["project_id"],
location=params["location"],
credentials=credentials,
)
# Test connection
query = "SELECT 1 as test"
query_job = client.query(query)
list(query_job.result())
self.logger.info(f"Connected to BigQuery project: {params['project_id']}")
# If database was reused, check for and clean up empty tables from failed previous loads
self.logger.debug(
f"Checking if cleanup needed: database_was_reused={getattr(self, 'database_was_reused', False)}"
)
if getattr(self, "database_was_reused", False):
self.logger.info("Database was reused - checking for empty tables from failed previous loads")
self._cleanup_empty_tables_if_needed(client)
# Storage client will be created lazily when needed (see load_data)
self.log_operation_complete("BigQuery connection", details=f"Connected to project {params['project_id']}")
return client
except Exception as e:
self.logger.error(f"Failed to connect to BigQuery: {e}")
raise
def _get_existing_tables(self, connection: Any) -> list[str]:
"""Get list of existing tables in BigQuery dataset.
Returns table names in lowercase for case-insensitive comparison.
BigQuery stores table names in the case they were created (usually uppercase
for TPC benchmarks), but we normalize to lowercase for validation.
"""
try:
dataset_ref = connection.dataset(self.dataset_id, project=self.project_id)
tables = list(connection.list_tables(dataset_ref))
# Normalize to lowercase so comparisons against expected table names are case-insensitive
return [table.table_id.lower() for table in tables]
except Exception as e:
self.log_very_verbose(f"Failed to list tables: {e}")
return []
def _validate_data_integrity(
self, benchmark, connection: Any, table_stats: dict[str, int]
) -> tuple[str, dict[str, Any]]:
"""Validate data integrity using BigQuery Client API.
BigQuery Client doesn't support cursor pattern (connection.cursor()) like
traditional databases. Instead, it uses connection.query() for SQL execution.
This override prevents "'Client' object has no attribute 'cursor'" errors
during database compatibility validation.
Args:
benchmark: Benchmark instance
connection: BigQuery Client
table_stats: Dictionary of table names to row counts
Returns:
Tuple of (status, validation_details)
"""
validation_details = {}
try:
accessible_tables = []
inaccessible_tables = []
for table_name in table_stats:
try:
# BigQuery stores tables in uppercase for TPC benchmarks
table_upper = table_name.upper()
# Use BigQuery's query API instead of cursor pattern
query = f"SELECT 1 FROM `{self.project_id}.{self.dataset_id}.{table_upper}` LIMIT 1"
query_job = connection.query(query)
list(query_job.result()) # Execute query to verify table is accessible
accessible_tables.append(table_name)
except Exception as e:
self.log_very_verbose(f"Table {table_name} inaccessible: {e}")
inaccessible_tables.append(table_name)
if inaccessible_tables:
validation_details["inaccessible_tables"] = inaccessible_tables
validation_details["constraints_enabled"] = False
return "FAILED", validation_details
else:
validation_details["accessible_tables"] = accessible_tables
validation_details["constraints_enabled"] = True
return "PASSED", validation_details
except Exception as e:
validation_details["constraints_enabled"] = False
validation_details["integrity_error"] = str(e)
return "FAILED", validation_details
def get_table_row_count(self, connection: Any, table: str) -> int:
"""Get row count using BigQuery Client API.
Overrides the base implementation, which uses the cursor pattern.
The BigQuery Client does not have a .cursor() method, so we use .query() instead.
Args:
connection: BigQuery Client
table: Table name
Returns:
Row count as integer, or 0 if unable to determine
"""
try:
# BigQuery stores tables in uppercase for TPC benchmarks
table_upper = table.upper()
# Use BigQuery's query API instead of cursor pattern
query = f"SELECT COUNT(*) FROM `{self.project_id}.{self.dataset_id}.{table_upper}`"
query_job = connection.query(query)
result = list(query_job.result())
return result[0][0] if result else 0
except Exception as e:
self.log_very_verbose(f"Could not get row count for {table}: {e}")
return 0
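# Illustrative sketch of the call and the SQL it issues (placeholder project/dataset names):
#
#   count = adapter.get_table_row_count(client, "lineitem")
#   # issues: SELECT COUNT(*) FROM `my-gcp-project.benchbox_demo.LINEITEM`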
def create_schema(self, benchmark, connection: Any) -> float:
"""Create schema using BigQuery dataset and tables."""
start_time = time.time()
try:
# Create dataset if it doesn't exist
dataset_ref = connection.dataset(self.dataset_id)
try:
connection.get_dataset(dataset_ref)
self.logger.info(f"Dataset {self.dataset_id} already exists")
except NotFound:
dataset = bigquery.Dataset(dataset_ref)
dataset.location = self.location
dataset.description = (
f"BenchBox benchmark data for {benchmark._name if hasattr(benchmark, '_name') else 'benchmark'}"
)
dataset = connection.create_dataset(dataset)
self.logger.info(f"Created dataset {self.dataset_id}")
# Use common schema creation helper
schema_sql = self._create_schema_with_tuning(benchmark, source_dialect="duckdb")
# Split schema into individual statements and execute
statements = [stmt.strip() for stmt in schema_sql.split(";") if stmt.strip()]
for statement in statements:
# Convert to BigQuery table definition
bq_statement = self._convert_to_bigquery_table(statement)
# Execute via query job
query_job = connection.query(bq_statement)
query_job.result() # Wait for completion
self.logger.debug(f"Executed schema statement: {bq_statement[:100]}...")
self.logger.info("Schema created")
except Exception as e:
self.logger.error(f"Schema creation failed: {e}")
raise
return time.time() - start_time
def load_data(
self, benchmark, connection: Any, data_dir: Path
) -> tuple[dict[str, int], float, dict[str, Any] | None]:
"""Load data using BigQuery efficient loading via Cloud Storage."""
logger = logging.getLogger(__name__)
logger.debug(f"Starting data loading for benchmark: {benchmark.__class__.__name__}")
logger.debug(f"Data directory: {data_dir}")
# Check if using cloud storage
if is_cloud_path(str(data_dir)):
path_info = get_cloud_path_info(str(data_dir))
logger.info(f"Loading data from cloud storage: {path_info['provider']} bucket '{path_info['bucket']}'")
print(f" Loading data from {path_info['provider']} cloud storage")
start_time = time.time()
table_stats = {}
try:
# Get data files from benchmark or manifest fallback
if hasattr(benchmark, "tables") and benchmark.tables:
data_files = benchmark.tables
else:
# Manifest fallback
data_files = None
try:
manifest_path = Path(data_dir) / "_datagen_manifest.json"
if manifest_path.exists():
with open(manifest_path) as f:
manifest = json.load(f)
tables = manifest.get("tables") or {}
mapping = {}
for table, entries in tables.items():
if entries:
# Collect ALL chunk files, not just the first one
chunk_paths = []
for entry in entries:
rel = entry.get("path")
if rel:
chunk_paths.append(Path(data_dir) / rel)
if chunk_paths:
mapping[table] = chunk_paths
if mapping:
data_files = mapping
logger.debug("Using data files from _datagen_manifest.json")
except Exception as e:
logger.debug(f"Manifest fallback failed: {e}")
if not data_files:
# No data files available - benchmark should have generated data first
raise ValueError("No data files found. Ensure benchmark.generate_data() was called first.")
# Check for incompatible compression format (BigQuery only supports gzip or uncompressed for CSV files)
for table_name, file_paths in data_files.items():
if not isinstance(file_paths, list):
file_paths = [file_paths]
for file_path in file_paths:
if Path(file_path).suffix == ".zst":
benchmark_name = getattr(benchmark, "name", "unknown")
scale_factor = getattr(benchmark, "scale_factor", "unknown")
raise ValueError(
f"\n❌ Incompatible data compression detected\n\n"
f"BigQuery does not support Zstd (.zst) compression for CSV file loading.\n"
f"Found Zstd file: {Path(file_path).name}\n\n"
f"To fix this, regenerate the data with gzip compression:\n\n"
f" # Remove existing incompatible data\n"
f" rm -rf {benchmark.data_dir}\n\n"
f" # Regenerate with gzip compression\n"
f" benchbox run --platform bigquery --benchmark {benchmark_name} "
f"--scale {scale_factor} --compression-type gzip\n\n"
f"Or use uncompressed data (larger files, slower uploads):\n\n"
f" benchbox run --platform bigquery --benchmark {benchmark_name} "
f"--scale {scale_factor} --no-compression\n"
)
# Upload files to Cloud Storage if bucket is configured
if self.storage_bucket:
# Create Storage client on-demand with same credentials as BigQuery client
params = self._get_connection_params()
credentials = self._load_credentials(params["credentials_path"])
storage_client = storage.Client(project=self.project_id, credentials=credentials)
bucket = storage_client.bucket(self.storage_bucket)
# Load data for each table (handle multi-chunk files)
for table_name, file_paths in data_files.items():
# Normalize to list (data resolver should always return lists now)
if not isinstance(file_paths, list):
file_paths = [file_paths]
# Filter out non-existent or empty files
valid_files = []
for file_path in file_paths:
if is_cloud_path(str(file_path)):
# For cloud paths, trust the generator created the file
valid_files.append(file_path)
else:
file_path = Path(file_path)
if file_path.exists() and file_path.stat().st_size > 0:
valid_files.append(file_path)
if not valid_files:
self.logger.warning(f"Skipping {table_name} - no valid data files")
table_stats[table_name] = 0
continue
logger.debug(f"Loading {table_name} from {len(valid_files)} file(s)")
try:
self.log_verbose(f"Loading data for table: {table_name}")
load_start = time.time()
table_name_upper = table_name.upper()
table_ref = connection.dataset(self.dataset_id).table(table_name_upper)
# Load each file chunk
for file_idx, file_path in enumerate(valid_files):
file_path = Path(file_path)
chunk_info = f" (chunk {file_idx + 1}/{len(valid_files)})" if len(valid_files) > 1 else ""
# Detect file format to determine delimiter
# TPC-H uses .tbl (pipe-delimited), TPC-DS uses .dat (pipe-delimited)
# Use substring check to handle chunked files like customer.tbl.1 or customer.tbl.1.zst
file_str = str(file_path.name)
delimiter = "|" if ".tbl" in file_str or ".dat" in file_str else ","
# Upload file directly to Cloud Storage with original compression and format
# BigQuery supports compressed files and any delimiter natively
# Preserve file extension so BigQuery can detect compression
blob_name = f"{self.storage_prefix}/{table_name}_{file_idx}{file_path.suffix}"
self.log_very_verbose(f"Uploading to Cloud Storage{chunk_info}: {blob_name}")
blob = bucket.blob(blob_name)
blob.upload_from_filename(str(file_path))
# Load from Cloud Storage to BigQuery
# First file truncates table, subsequent files append
write_disposition = (
bigquery.WriteDisposition.WRITE_TRUNCATE
if file_idx == 0
else bigquery.WriteDisposition.WRITE_APPEND
)
job_config = bigquery.LoadJobConfig(
source_format=bigquery.SourceFormat.CSV,
skip_leading_rows=0,
autodetect=False, # Use existing schema
field_delimiter=delimiter,
allow_quoted_newlines=True,
write_disposition=write_disposition,
)
# Create load job
uri = f"gs://{self.storage_bucket}/{blob_name}"
load_job = connection.load_table_from_uri(uri, table_ref, job_config=job_config)
load_job.result() # Wait for completion
# Get final row count after loading all chunks
query = f"SELECT COUNT(*) FROM `{self.project_id}.{self.dataset_id}.{table_name_upper}`"
query_job = connection.query(query)
result = list(query_job.result())
row_count = result[0][0] if result else 0
table_stats[table_name_upper] = row_count
load_time = time.time() - load_start
chunk_info = f" from {len(valid_files)} file(s)" if len(valid_files) > 1 else ""
self.logger.info(
f"✅ Loaded {row_count:,} rows into {table_name_upper}{chunk_info} in {load_time:.2f}s"
)
except Exception as e:
self.logger.error(f"Failed to load {table_name}: {str(e)[:100]}...")
table_stats[table_name.upper()] = 0
else:
# Direct loading without Cloud Storage (less efficient)
self.logger.warning("No Cloud Storage bucket configured, using direct loading")
for table_name, file_paths in data_files.items():
# Normalize to list (handle both single paths and lists for TPC-H vs TPC-DS)
if not isinstance(file_paths, list):
file_paths = [file_paths]
# Filter valid files
valid_files = []
for file_path in file_paths:
file_path = Path(file_path)
if file_path.exists() and file_path.stat().st_size > 0:
valid_files.append(file_path)
if not valid_files:
self.logger.warning(f"Skipping {table_name} - no valid data files")
table_stats[table_name.upper()] = 0
continue
try:
self.log_verbose(f"Direct loading data for table: {table_name}")
load_start = time.time()
table_name_upper = table_name.upper()
# Load directly from local file(s)
table_ref = connection.dataset(self.dataset_id).table(table_name_upper)
# Load each chunk file
for file_idx, file_path in enumerate(valid_files):
chunk_info = f" (chunk {file_idx + 1}/{len(valid_files)})" if len(valid_files) > 1 else ""
self.log_very_verbose(f"Loading {table_name}{chunk_info} from {file_path.name}")
# Detect delimiter from filename (handle chunked files like customer.tbl.1)
file_str = str(file_path.name)
delimiter = "|" if ".tbl" in file_str or ".dat" in file_str else ","
# Use WRITE_APPEND for subsequent chunks, WRITE_TRUNCATE for first
write_disposition = (
bigquery.WriteDisposition.WRITE_TRUNCATE
if file_idx == 0
else bigquery.WriteDisposition.WRITE_APPEND
)
job_config = bigquery.LoadJobConfig(
source_format=bigquery.SourceFormat.CSV,
# TPC-H uses .tbl files, TPC-DS uses .dat files - both are pipe-delimited
field_delimiter=delimiter,
skip_leading_rows=0,
autodetect=False,
write_disposition=write_disposition,
)
with open(file_path, "rb") as source_file:
load_job = connection.load_table_from_file(
source_file, table_ref, job_config=job_config
)
load_job.result()
# Get final row count after all chunks loaded
query = f"SELECT COUNT(*) FROM `{self.project_id}.{self.dataset_id}.{table_name_upper}`"
query_job = connection.query(query)
result = list(query_job.result())
row_count = result[0][0] if result else 0
table_stats[table_name_upper] = row_count
load_time = time.time() - load_start
chunk_info = f" from {len(valid_files)} file(s)" if len(valid_files) > 1 else ""
self.logger.info(
f"✅ Loaded {row_count:,} rows into {table_name_upper}{chunk_info} in {load_time:.2f}s"
)
except Exception as e:
self.logger.error(f"Failed to load {table_name}: {str(e)[:100]}...")
table_stats[table_name.upper()] = 0
total_time = time.time() - start_time
total_rows = sum(table_stats.values())
self.logger.info(f"✅ Loaded {total_rows:,} total rows in {total_time:.2f}s")
except Exception as e:
self.logger.error(f"Data loading failed: {e}")
raise
# BigQuery doesn't provide detailed per-table timings yet
return table_stats, total_time, None
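# Illustrative sketch: when storage_bucket is configured, each chunk is staged at
# gs://<storage_bucket>/<storage_prefix>/<table>_<chunk_index><original suffix> before the
# load job runs, e.g. (placeholder names, uncompressed TPC-H chunk):
#
#   gs://my-staging-bucket/benchbox-data/lineitem_0.tbl
#
# The return value maps uppercase table names to final row counts:
#
#   ({"LINEITEM": <row_count>, "ORDERS": <row_count>, ...}, <elapsed_seconds>, None)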
def execute_query(
self,
connection: Any,
query: str,
query_id: str,
benchmark_type: str | None = None,
scale_factor: float | None = None,
validate_row_count: bool = True,
stream_id: int | None = None,
) -> dict[str, Any]:
"""Execute query with detailed timing and cost tracking."""
start_time = time.time()
try:
# Replace table references with fully qualified names
# Note: Query dialect translation is now handled automatically by the base adapter
# Skip qualification if query has backtick-quoted identifiers (from sqlglot)
# because default_dataset handles unqualified names and regex breaks backticks
if "`" in query:
# Query processed by sqlglot with identify=True
# Normalize lowercase table names to UPPERCASE to match TPC-DS schema
# (BigQuery backtick-quoted identifiers are case-sensitive)
translated_query = self._normalize_table_names_case(query)
else:
# Non-translated queries (e.g., raw TPC-H) need explicit qualification
translated_query = self._qualify_table_names(query)
# Use default job config if available
job_config = getattr(connection, "_default_job_config", bigquery.QueryJobConfig())
# Execute the query
query_job = connection.query(translated_query, job_config=job_config)
result = list(query_job.result())
execution_time = time.time() - start_time
actual_row_count = len(result) if result else 0
# Get job statistics
job_stats = {
"bytes_processed": query_job.total_bytes_processed,
"bytes_billed": query_job.total_bytes_billed,
"slot_ms": query_job.slot_millis,
"creation_time": query_job.created.isoformat() if query_job.created else None,
"start_time": query_job.started.isoformat() if query_job.started else None,
"end_time": query_job.ended.isoformat() if query_job.ended else None,
}
# Validate row count if enabled and benchmark type is provided
validation_result = None
if validate_row_count and benchmark_type:
from benchbox.core.validation.query_validation import QueryValidator
validator = QueryValidator()
validation_result = validator.validate_query_result(
benchmark_type=benchmark_type,
query_id=query_id,
actual_row_count=actual_row_count,
scale_factor=scale_factor,
stream_id=stream_id,
)
# Log validation result
if validation_result.warning_message:
self.log_verbose(f"Row count validation: {validation_result.warning_message}")
elif not validation_result.is_valid:
self.log_verbose(f"Row count validation FAILED: {validation_result.error_message}")
else:
self.log_very_verbose(
f"Row count validation PASSED: {actual_row_count} rows "
f"(expected: {validation_result.expected_row_count})"
)
# Use base helper to build result with consistent validation field mapping
result_dict = self._build_query_result_with_validation(
query_id=query_id,
execution_time=execution_time,
actual_row_count=actual_row_count,
first_row=result[0] if result else None,
validation_result=validation_result,
)
# Include BigQuery-specific fields
result_dict["translated_query"] = translated_query if translated_query != query else None
result_dict["job_statistics"] = job_stats
result_dict["job_id"] = query_job.job_id
# Map job_statistics to resource_usage for cost calculation
result_dict["resource_usage"] = job_stats
return result_dict
except Exception as e:
execution_time = time.time() - start_time
return {
"query_id": query_id,
"status": "FAILED",
"execution_time": execution_time,
"rows_returned": 0,
"error": str(e),
"error_type": type(e).__name__,
}
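# Illustrative sketch of the success payload returned above. The base fields come from
# _build_query_result_with_validation(); values shown are placeholders.
#
#   {
#       "query_id": "q1",
#       "execution_time": 3.42,
#       "rows_returned": 4,
#       "translated_query": None,   # set only when qualification changed the SQL
#       "job_statistics": {"bytes_processed": ..., "bytes_billed": ..., "slot_ms": ...},
#       "job_id": "bquxjob_...",
#       "resource_usage": {...},    # same dict as job_statistics, for cost calculation
#   }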
def _convert_to_bigquery_table(self, statement: str) -> str:
"""Convert CREATE TABLE statement to BigQuery format.
Makes tables idempotent by using CREATE OR REPLACE TABLE.
"""
if not statement.upper().startswith("CREATE TABLE"):
return statement
# Ensure idempotency with OR REPLACE (defense-in-depth)
if "CREATE TABLE" in statement and "OR REPLACE" not in statement.upper():
statement = statement.replace("CREATE TABLE", "CREATE OR REPLACE TABLE", 1)
# Include dataset qualification
if f"{self.dataset_id}." not in statement:
statement = statement.replace(
"CREATE OR REPLACE TABLE ", f"CREATE OR REPLACE TABLE `{self.project_id}.{self.dataset_id}."
)
# Close the identifier backtick at the first column-list paren only
statement = statement.replace(" (", "` (", 1)
# Include partitioning and clustering if configured
if "PARTITION BY" not in statement.upper() and self.partitioning_field:
statement += f" PARTITION BY DATE({self.partitioning_field})"
if "CLUSTER BY" not in statement.upper() and self.clustering_fields:
clustering = ", ".join(self.clustering_fields)
statement += f" CLUSTER BY {clustering}"
return statement
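# Illustrative before/after for the conversion above (placeholder identifiers and types):
#
#   in:  CREATE TABLE NATION (N_NATIONKEY INTEGER, N_NAME STRING, ...)
#   out: CREATE OR REPLACE TABLE `my-gcp-project.benchbox_demo.NATION` (N_NATIONKEY INTEGER, N_NAME STRING, ...)
#        [plus " PARTITION BY DATE(col)" / " CLUSTER BY ..." when configured]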
def _qualify_table_names(self, query: str) -> str:
"""Add full qualification to table names in query.
Note: Only used for non-translated queries (e.g., raw TPC-H queries without sqlglot).
Queries processed by sqlglot with identify=True should skip this method to avoid
conflicts with backtick-quoted identifiers. When default_dataset is configured,
BigQuery automatically resolves unqualified table names.
"""
# Simple table name qualification - could be made more robust with proper SQL parsing
table_names = [
"REGION",
"NATION",
"CUSTOMER",
"SUPPLIER",
"PART",
"PARTSUPP",
"ORDERS",
"LINEITEM",
]
import re
for table_name in table_names:
# Replace unqualified table names with fully qualified, backtick-quoted names
qualified_name = f"`{self.project_id}.{self.dataset_id}.{table_name}`"
# Simple whole-word replacement - in production this would use a proper SQL parser
pattern = rf"\b{table_name}\b"
query = re.sub(pattern, qualified_name, query, flags=re.IGNORECASE)
return query
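# Illustrative sketch: on a raw (non-backticked) TPC-H query, the rewrite above turns
#
#   SELECT * FROM lineitem, orders WHERE l_orderkey = o_orderkey
#
# into (placeholder project/dataset names)
#
#   SELECT * FROM `my-gcp-project.benchbox_demo.LINEITEM`, `my-gcp-project.benchbox_demo.ORDERS` WHERE ...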
def _normalize_table_names_case(self, query: str) -> str:
"""Normalize backtick-quoted table names to UPPERCASE for case-sensitive matching.
BigQuery stores TPC-DS tables in UPPERCASE (per schema), but sqlglot generates
lowercase table names with backticks. Since backtick-quoted identifiers are
case-sensitive in BigQuery, we need to normalize to UPPERCASE to match the schema.
Only processes backtick-quoted identifiers to avoid affecting string literals.
Args:
query: SQL query with backtick-quoted identifiers
Returns:
Query with lowercase table names normalized to UPPERCASE
"""
import re
# Pattern: backtick, lowercase word characters (table names), backtick
# This safely matches table identifiers without affecting string literals
pattern = r"`([a-z_][a-z0-9_]*)`"
def uppercase_table(match: re.Match[str]) -> str:
return f"`{match.group(1).upper()}`"
return re.sub(pattern, uppercase_table, query)
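# Illustrative sketch of the normalization above:
#
#   `store_sales`  ->  `STORE_SALES`
#   'store_sales'  ->  unchanged, because only backtick-quoted identifiers are matched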
def _get_platform_metadata(self, connection: Any) -> dict[str, Any]:
"""Get BigQuery-specific metadata and system information."""
metadata = {
"platform": self.platform_name,
"project_id": self.project_id,
"dataset_id": self.dataset_id,
"location": self.location,
"result_cache_enabled": self.query_cache,
}
try:
# Get dataset information
dataset_ref = connection.dataset(self.dataset_id)
dataset = connection.get_dataset(dataset_ref)
metadata["dataset_info"] = {
"created": dataset.created.isoformat() if dataset.created else None,
"modified": dataset.modified.isoformat() if dataset.modified else None,
"location": dataset.location,
"description": dataset.description,
}
# Get table list and sizes
tables = list(connection.list_tables(dataset))
table_info = []
for table in tables:
table_ref = dataset.table(table.table_id)
table_obj = connection.get_table(table_ref)
table_info.append(
{
"table_id": table.table_id,
"num_rows": table_obj.num_rows,
"num_bytes": table_obj.num_bytes,
"created": table_obj.created.isoformat() if table_obj.created else None,
"modified": table_obj.modified.isoformat() if table_obj.modified else None,
}
)
metadata["tables"] = table_info
# Get project information
try:
# This requires additional permissions
project = connection.get_project(self.project_id)
metadata["project_info"] = {
"display_name": project.display_name,
"project_number": project.project_number,
}
except Exception:
pass # Skip if no permissions
except Exception as e:
metadata["metadata_error"] = str(e)
return metadata
def get_query_plan(self, connection: Any, query: str) -> dict[str, Any]:
"""Get query execution plan for analysis."""
try:
# Use dry run to get query plan without execution
job_config = bigquery.QueryJobConfig(dry_run=True)
# Note: Query dialect translation is now handled automatically by the base adapter
qualified_query = self._qualify_table_names(query)
query_job = connection.query(qualified_query, job_config=job_config)
return {
"bytes_processed": query_job.total_bytes_processed,
"estimated_cost": query_job.total_bytes_processed / (1024**4) * 5,  # Rough estimate assuming ~$5 per TiB scanned (on-demand)
"query_plan": "Dry run completed",
"job_id": query_job.job_id,
}
except Exception as e:
return {"error": str(e)}
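# Illustrative arithmetic for the estimate above: a dry run reporting 1.5 TiB scanned yields
# estimated_cost = 1.5 * 5 = 7.5 (assumes roughly $5 per TiB of on-demand scan; actual
# BigQuery pricing varies by edition and region).
#
#   bytes_processed = 1.5 * 1024**4
#   estimated_cost = bytes_processed / (1024**4) * 5   # -> 7.5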
def close_connection(self, connection: Any) -> None:
"""Close BigQuery connection.
Handles credential refresh errors gracefully during connection cleanup.
Suppresses all credential-related errors as they are non-fatal during cleanup.
"""
if not connection:
return
try:
if hasattr(connection, "close"):
connection.close()
except Exception as e:
# Suppress credential refresh errors during cleanup (anonymous credentials, expired tokens, etc.)
# These are non-fatal - the connection is being closed anyway
error_msg = str(e).lower()
if any(keyword in error_msg for keyword in ["credential", "refresh", "anonymous", "auth", "token"]):
# Silently suppress credential/auth errors during cleanup
self.log_very_verbose(f"Credential cleanup warning (non-fatal, suppressed): {e}")
else:
self.logger.warning(f"Error closing connection: {e}")
# Also try to clean up any transport/channel resources that might have credential issues
try:
if hasattr(connection, "_transport"):
transport = connection._transport
if hasattr(transport, "close"):
transport.close()
except Exception:
# Silently ignore transport cleanup errors
pass
def supports_tuning_type(self, tuning_type) -> bool:
"""Check if BigQuery supports a specific tuning type.
BigQuery supports:
- PARTITIONING: Via PARTITION BY clause (date/timestamp/integer columns)
- CLUSTERING: Via CLUSTER BY clause (up to 4 columns)
Args:
tuning_type: The type of tuning to check support for
Returns:
True if the tuning type is supported by BigQuery
"""
# Import here to avoid circular imports
try:
from benchbox.core.tuning.interface import TuningType
return tuning_type in {TuningType.PARTITIONING, TuningType.CLUSTERING}
except ImportError:
return False
def generate_tuning_clause(self, table_tuning) -> str:
"""Generate BigQuery-specific tuning clauses for CREATE TABLE statements.
BigQuery supports:
- PARTITION BY DATE(column), DATETIME_TRUNC(column, DAY), column (for date/integer)
- CLUSTER BY column1, column2, ... (up to 4 columns)
Args:
table_tuning: The tuning configuration for the table
Returns:
SQL clause string to be appended to CREATE TABLE statement
"""
if not table_tuning or not table_tuning.has_any_tuning():
return ""
clauses = []
try:
# Import here to avoid circular imports
from benchbox.core.tuning.interface import TuningType
# Handle partitioning
partition_columns = table_tuning.get_columns_by_type(TuningType.PARTITIONING)
if partition_columns:
# Sort by order and use first column for partitioning
sorted_cols = sorted(partition_columns, key=lambda col: col.order)
partition_col = sorted_cols[0] # BigQuery typically uses single column partitioning
# Determine partition strategy based on column type
col_type = partition_col.type.upper()
if any(date_type in col_type for date_type in ["DATE", "TIMESTAMP", "DATETIME"]):
if "DATE" in col_type:
partition_clause = f"PARTITION BY {partition_col.name}"
else:
partition_clause = f"PARTITION BY DATE({partition_col.name})"
elif "INT" in col_type:
# Integer partitioning with range
partition_clause = (
f"PARTITION BY RANGE_BUCKET({partition_col.name}, GENERATE_ARRAY(0, 1000000, 10000))"
)
else:
# Default date-based partitioning
partition_clause = f"PARTITION BY DATE({partition_col.name})"
clauses.append(partition_clause)
# Handle clustering (up to 4 columns)
cluster_columns = table_tuning.get_columns_by_type(TuningType.CLUSTERING)
if cluster_columns:
# Sort by order and take up to 4 columns
sorted_cols = sorted(cluster_columns, key=lambda col: col.order)
cluster_cols = sorted_cols[:4] # BigQuery limit of 4 clustering columns
column_names = [col.name for col in cluster_cols]
cluster_clause = f"CLUSTER BY {', '.join(column_names)}"
clauses.append(cluster_clause)
# Distribution and sorting not directly supported in BigQuery CREATE TABLE
# but handled through partitioning and clustering
except ImportError:
# If tuning interface not available, return empty string
pass
return " ".join(clauses)
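# Illustrative sketch: for a hypothetical table tuned with partitioning on a DATE column
# o_orderdate and clustering on (o_custkey, o_orderstatus), the method above returns:
#
#   "PARTITION BY o_orderdate CLUSTER BY o_custkey, o_orderstatus"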
def apply_table_tunings(self, table_tuning, connection: Any) -> None:
"""Apply tuning configurations to a BigQuery table.
BigQuery tuning approach:
- PARTITIONING: Handled in CREATE TABLE via PARTITION BY
- CLUSTERING: Handled in CREATE TABLE via CLUSTER BY
- Additional optimization via table options
Args:
table_tuning: The tuning configuration to apply
connection: BigQuery client connection
Raises:
ValueError: If the tuning configuration is invalid for BigQuery
"""
if not table_tuning or not table_tuning.has_any_tuning():
return
table_name = table_tuning.table_name
self.logger.info(f"Applying BigQuery tunings for table: {table_name}")
try:
# Import here to avoid circular imports
from benchbox.core.tuning.interface import TuningType
# BigQuery tuning is primarily handled at table creation time
# Post-creation optimizations are limited
# Get table reference
dataset_ref = connection.dataset(self.dataset_id)
table_ref = dataset_ref.table(table_name)
try:
table_obj = connection.get_table(table_ref)
# Log current table configuration
if table_obj.time_partitioning:
self.logger.info(f"Table {table_name} has partitioning: {table_obj.time_partitioning.type_}")
if table_obj.clustering_fields:
self.logger.info(f"Table {table_name} has clustering: {table_obj.clustering_fields}")
# Check if table needs to be recreated with new tuning
partition_columns = table_tuning.get_columns_by_type(TuningType.PARTITIONING)
cluster_columns = table_tuning.get_columns_by_type(TuningType.CLUSTERING)
needs_recreation = False
# Check if partitioning configuration changed
if partition_columns and not table_obj.time_partitioning:
needs_recreation = True
self.logger.info(f"Table {table_name} needs recreation for partitioning")
# Check if clustering configuration changed
if cluster_columns:
sorted_cols = sorted(cluster_columns, key=lambda col: col.order)
desired_clustering = [col.name for col in sorted_cols[:4]]
current_clustering = table_obj.clustering_fields or []
if desired_clustering != current_clustering:
needs_recreation = True
self.logger.info(f"Table {table_name} needs recreation for clustering")
if needs_recreation:
self.logger.warning(
f"Table {table_name} configuration differs from desired tuning. "
"Consider recreating the table with proper tuning configuration."
)
except Exception as e:
self.logger.warning(f"Could not verify table configuration for {table_name}: {e}")
# Handle unsupported tuning types
distribution_columns = table_tuning.get_columns_by_type(TuningType.DISTRIBUTION)
if distribution_columns:
self.logger.warning(f"Distribution tuning not directly supported in BigQuery for table: {table_name}")
sorting_columns = table_tuning.get_columns_by_type(TuningType.SORTING)
if sorting_columns:
# In BigQuery, sorting is achieved through clustering
sorted_cols = sorted(sorting_columns, key=lambda col: col.order)
column_names = [col.name for col in sorted_cols]
self.logger.info(
f"Sorting in BigQuery achieved via clustering for table {table_name}: {', '.join(column_names)}"
)
except ImportError:
self.logger.warning("Tuning interface not available - skipping tuning application")
except Exception as e:
raise ValueError(f"Failed to apply tunings to BigQuery table {table_name}: {e}")
def apply_unified_tuning(self, unified_config: UnifiedTuningConfiguration, connection: Any) -> None:
"""Apply unified tuning configuration to BigQuery.
Args:
unified_config: Unified tuning configuration to apply
connection: BigQuery connection
"""
if not unified_config:
return
# Apply constraint configurations
self.apply_constraint_configuration(unified_config.primary_keys, unified_config.foreign_keys, connection)
# Apply platform optimizations
if unified_config.platform_optimizations:
self.apply_platform_optimizations(unified_config.platform_optimizations, connection)
# Apply table-level tunings
for _table_name, table_tuning in unified_config.table_tunings.items():
self.apply_table_tunings(table_tuning, connection)
def apply_constraint_configuration(
self,
primary_key_config: PrimaryKeyConfiguration,
foreign_key_config: ForeignKeyConfiguration,
connection: Any,
) -> None:
"""Apply constraint configurations to BigQuery.
Note: BigQuery has limited constraint support. Constraints are mainly for metadata/optimization.
Args:
primary_key_config: Primary key constraint configuration
foreign_key_config: Foreign key constraint configuration
connection: BigQuery connection
"""
# BigQuery constraints are applied at table creation time
# This method is called after tables are created, so log the configurations
if primary_key_config and primary_key_config.enabled:
self.logger.info("Primary key constraints enabled for BigQuery (applied during table creation)")
if foreign_key_config and foreign_key_config.enabled:
self.logger.info("Foreign key constraints enabled for BigQuery (applied during table creation)")
# BigQuery doesn't support ALTER TABLE to add constraints after creation
# So there's no additional work to do here
def _build_bigquery_config(
platform: str,
options: dict[str, Any],
overrides: dict[str, Any],
info: Any,
) -> Any:
"""Build BigQuery database configuration with credential loading.
This function loads saved credentials from the CredentialManager and
merges them with CLI options and runtime overrides.
Args:
platform: Platform name (should be 'bigquery')
options: CLI platform options from --platform-option flags
overrides: Runtime overrides from orchestrator
info: Platform info from registry
Returns:
DatabaseConfig with credentials loaded and platform-specific fields at top-level
"""
from benchbox.core.config import DatabaseConfig
from benchbox.security.credentials import CredentialManager
# Load saved credentials
cred_manager = CredentialManager()
saved_creds = cred_manager.get_platform_credentials("bigquery") or {}
# Build merged options: saved_creds < options < overrides
merged_options = {}
merged_options.update(saved_creds)
merged_options.update(options)
merged_options.update(overrides)
# Resolve display name and driver package from the platform registry info
name = info.display_name if info else "Google BigQuery"
driver_package = info.driver_package if info else "google-cloud-bigquery"
# Handle staging_root from orchestrator (overrides) or fall back to default_output_location
staging_root = overrides.get("staging_root")
if not staging_root:
# Fall back to default_output_location from credentials/options
default_output = merged_options.get("default_output_location")
if default_output and isinstance(default_output, str) and default_output.startswith("gs://"):
staging_root = default_output
# Parse staging_root to extract storage_bucket and storage_prefix
storage_bucket = merged_options.get("storage_bucket")
storage_prefix = merged_options.get("storage_prefix")
if staging_root:
try:
path_info = get_cloud_path_info(staging_root)
if path_info and path_info.get("bucket"):
storage_bucket = path_info["bucket"]
storage_prefix = path_info.get("path", "")
except Exception:
# If parsing fails, fall back to merged_options values
pass
# Build config dict with platform-specific fields at top-level
# This allows BigQueryAdapter.__init__() to access them via config.get()
config_dict = {
"type": "bigquery",
"name": name,
"options": merged_options or {},  # Ensure options is never None (an explicitly passed None would bypass the Pydantic default)
"driver_package": driver_package,
"driver_version": overrides.get("driver_version") or options.get("driver_version"),
"driver_auto_install": bool(overrides.get("driver_auto_install", options.get("driver_auto_install", False))),
# Platform-specific fields at top-level (adapters expect these here)
"project_id": merged_options.get("project_id"),
"dataset_id": merged_options.get("dataset_id"),
"location": merged_options.get("location"),
"credentials_path": merged_options.get("credentials_path"),
"staging_root": staging_root, # Pass through staging_root
"storage_bucket": storage_bucket, # Use parsed bucket or fallback
"storage_prefix": storage_prefix, # Use parsed prefix or fallback
}
return DatabaseConfig(**config_dict)
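# Illustrative sketch of the precedence applied above: saved credentials are overridden by
# CLI --platform-option values, which are overridden by orchestrator overrides
# (placeholder values).
#
#   saved_creds = {"project_id": "saved-project", "location": "US"}
#   options     = {"location": "EU"}
#   overrides   = {"staging_root": "gs://my-bucket/stage"}
#   # merged_options -> {"project_id": "saved-project", "location": "EU", "staging_root": "gs://my-bucket/stage"}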
# Register the config builder with the platform hook registry
# This must happen when the module is imported
try:
from benchbox.cli.platform_hooks import PlatformHookRegistry
PlatformHookRegistry.register_config_builder("bigquery", _build_bigquery_config)
except ImportError:
# Platform hooks may not be available in all contexts (e.g., core-only usage)
pass