Source code for benchbox.platforms.bigquery

"""BigQuery platform adapter with native BigQuery SQL and Cloud Storage integration.

Provides BigQuery-specific optimizations for large-scale analytics,
including efficient data loading via Cloud Storage and native BigQuery features.

Copyright 2026 Joe Harris / BenchBox Project

Licensed under the MIT License. See LICENSE file in the project root for details.
"""

from __future__ import annotations

import argparse
import json
import logging
import time
from pathlib import Path
from typing import TYPE_CHECKING, Any

if TYPE_CHECKING:
    from benchbox.core.tuning.interface import (
        ForeignKeyConfiguration,
        PlatformOptimizationConfiguration,
        PrimaryKeyConfiguration,
        UnifiedTuningConfiguration,
    )

from benchbox.utils.cloud_storage import get_cloud_path_info, is_cloud_path

from ..utils.dependencies import check_platform_dependencies, get_dependency_error_message
from .base import PlatformAdapter

try:
    import google.auth

    google_auth = google.auth  # Store reference for _load_credentials
    from google.cloud import bigquery, storage
    from google.cloud.exceptions import NotFound
    from google.oauth2 import service_account
except ImportError:
    google_auth = None
    bigquery = None
    storage = None
    service_account = None
    NotFound = None


class BigQueryAdapter(PlatformAdapter):
    """BigQuery platform adapter with Cloud Storage integration."""
    def __init__(self, **config):
        super().__init__(**config)

        # Check dependencies with improved error message
        available, missing = check_platform_dependencies("bigquery")
        if not available:
            error_msg = get_dependency_error_message("bigquery", missing)
            raise ImportError(error_msg)

        self._dialect = "bigquery"

        # BigQuery configuration
        self.project_id = config.get("project_id")
        self.dataset_id = config.get("dataset_id") or "benchbox"
        self.location = config.get("location") or "US"
        self.credentials_path = config.get("credentials_path")

        # Cloud Storage settings for data loading
        # Check for staging_root first (set by orchestrator for CloudStagingPath)
        staging_root = config.get("staging_root")
        if staging_root:
            # Parse gs://bucket/path format to extract bucket and prefix
            from benchbox.utils.cloud_storage import get_cloud_path_info

            path_info = get_cloud_path_info(staging_root)
            if path_info["provider"] in ("gs", "gcs"):
                self.storage_bucket = path_info["bucket"]
                # Use the path component if provided, otherwise use default
                self.storage_prefix = path_info["path"].strip("/") if path_info["path"] else "benchbox-data"
                self.logger.info(
                    f"Using staging location from config: gs://{self.storage_bucket}/{self.storage_prefix}"
                )
            else:
                raise ValueError(f"BigQuery requires GCS (gs://) staging location, got: {path_info['provider']}://")
        else:
            # Fall back to explicit storage_bucket configuration
            self.storage_bucket = config.get("storage_bucket")
            self.storage_prefix = config.get("storage_prefix") or "benchbox-data"

        # Query settings
        self.job_priority = config.get("job_priority") or "INTERACTIVE"  # INTERACTIVE or BATCH

        # Disable result cache by default for accurate benchmarking
        # Can be overridden with query_cache=true or disable_result_cache=false
        if config.get("query_cache") is not None:
            self.query_cache = config.get("query_cache")
        elif config.get("disable_result_cache") is not None:
            self.query_cache = not config.get("disable_result_cache")
        else:
            # Default: disable cache for accurate benchmark results
            self.query_cache = False

        self.dry_run = config.get("dry_run") if config.get("dry_run") is not None else False
        self.maximum_bytes_billed = config.get("maximum_bytes_billed")

        # Table settings
        self.clustering_fields = config.get("clustering_fields") or []
        self.partitioning_field = config.get("partitioning_field")

        if not self.project_id:
            from ..core.exceptions import ConfigurationError

            raise ConfigurationError(
                "BigQuery configuration requires project_id.\n"
                "Configure with one of:\n"
                " 1. CLI: benchbox platforms setup --platform bigquery\n"
                " 2. Environment variable: BIGQUERY_PROJECT\n"
                " 3. CLI option: --platform-option project_id=<your-project>\n"
                "Also ensure Google Cloud credentials are configured:\n"
                " - Set GOOGLE_APPLICATION_CREDENTIALS to service account JSON path\n"
                " - Or run 'gcloud auth application-default login'"
            )
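    # Illustrative only: a minimal construction sketch (all values below are
    # hypothetical, not taken from this module). With staging_root set to a gs://
    # path, __init__ above derives storage_bucket="my-bucket" and storage_prefix="stage";
    # without it, the explicit storage_bucket / storage_prefix keys are used instead.
    #
    #   adapter = BigQueryAdapter(
    #       project_id="my-gcp-project",
    #       dataset_id="benchbox_tpch_sf1",
    #       location="US",
    #       staging_root="gs://my-bucket/stage",
    #   )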
    @staticmethod
    def add_cli_arguments(parser: argparse.ArgumentParser) -> None:
        """Add BigQuery-specific CLI arguments."""
        bq_group = parser.add_argument_group("BigQuery Arguments")
        bq_group.add_argument("--project-id", type=str, help="BigQuery project ID")
        bq_group.add_argument("--dataset-id", type=str, help="BigQuery dataset ID")
        bq_group.add_argument("--location", type=str, default="US", help="BigQuery dataset location")
        bq_group.add_argument("--credentials-path", type=str, help="Path to Google Cloud credentials file")
        bq_group.add_argument("--storage-bucket", type=str, help="GCS bucket for data loading")
    @classmethod
    def from_config(cls, config: dict[str, Any]):
        """Create BigQuery adapter from unified configuration."""
        from benchbox.utils.database_naming import generate_database_name

        adapter_config = {}
        very_verbose = config.get("very_verbose", False)

        # Auto-detect project ID if not provided
        if not config.get("project_id"):
            try:
                _, project_id = google.auth.default()
                if project_id:
                    adapter_config["project_id"] = project_id
                    if very_verbose:
                        logging.info(f"Auto-detected BigQuery project ID: {project_id}")
            except google.auth.exceptions.DefaultCredentialsError:
                if very_verbose:
                    logging.warning("Could not auto-detect BigQuery project ID. Please provide --project-id.")

        # Override with explicit config values
        if config.get("project_id"):
            adapter_config["project_id"] = config["project_id"]

        # Generate dataset name
        dataset_name = generate_database_name(
            benchmark_name=config["benchmark"],
            scale_factor=config["scale_factor"],
            platform="bigquery",
            tuning_config=config.get("tuning_config"),
        )
        adapter_config["dataset_id"] = dataset_name

        # Pass through other relevant config
        for key in [
            "location",
            "credentials_path",
            "storage_bucket",
            "tuning_config",
            "verbose_enabled",
            "very_verbose",
        ]:
            if key in config:
                adapter_config[key] = config[key]

        return cls(**adapter_config)
    @property
    def platform_name(self) -> str:
        return "BigQuery"
    def get_platform_info(self, connection: Any = None) -> dict[str, Any]:
        """Get BigQuery platform information.

        Captures comprehensive BigQuery configuration including:
        - Dataset location and region
        - Slot reservation information (best effort)
        - Project and billing configuration
        - Dataset metadata

        BigQuery doesn't expose a version number as it's a fully managed service.
        Gracefully degrades if permissions are insufficient for metadata queries.
        """
        platform_info = {
            "platform_type": "bigquery",
            "platform_name": "BigQuery",
            "connection_mode": "remote",
            "cloud_provider": "GCP",
            "configuration": {
                "project_id": self.project_id,
                "dataset_id": self.dataset_id,
                "location": self.location,
                "storage_bucket": self.storage_bucket,
                "job_timeout": getattr(self, "job_timeout", None),
                "job_priority": self.job_priority,
                "query_cache_enabled": self.query_cache,
            },
        }

        # Get client library version
        try:
            import google.cloud.bigquery

            platform_info["client_library_version"] = getattr(google.cloud.bigquery, "__version__", None)
        except (ImportError, AttributeError):
            platform_info["client_library_version"] = None

        # BigQuery doesn't expose server version info (fully managed service)
        platform_info["platform_version"] = None

        # Try to get extended metadata from BigQuery client
        if connection:
            try:
                # Get dataset metadata
                try:
                    dataset_ref = f"{self.project_id}.{self.dataset_id}"
                    dataset = connection.get_dataset(dataset_ref)
                    platform_info["compute_configuration"] = {
                        "dataset_location": dataset.location,
                        "dataset_default_table_expiration_ms": dataset.default_table_expiration_ms,
                        "dataset_default_partition_expiration_ms": dataset.default_partition_expiration_ms,
                        "dataset_created": dataset.created.isoformat() if dataset.created else None,
                        "dataset_modified": dataset.modified.isoformat() if dataset.modified else None,
                    }
                    self.logger.debug(f"Successfully captured BigQuery dataset metadata for {dataset_ref}")
                except Exception as e:
                    self.logger.debug(f"Could not fetch BigQuery dataset metadata: {e}")

                # Try to get slot reservation information (enhanced with edition and autoscaling)
                reservation_info = None
                try:
                    # Query INFORMATION_SCHEMA for reservation info if available
                    query = f"""
                        SELECT reservation_name, slot_capacity, ignore_idle_slots, edition,
                               autoscale_max_slots, creation_time, update_time
                        FROM `region-{self.location}`.INFORMATION_SCHEMA.RESERVATIONS
                        WHERE project_id = @project_id
                        LIMIT 1
                    """
                    from google.cloud.bigquery import ScalarQueryParameter

                    job_config = bigquery.QueryJobConfig(
                        query_parameters=[ScalarQueryParameter("project_id", "STRING", self.project_id)]
                    )
                    query_job = connection.query(query, job_config=job_config)
                    results = list(query_job.result())
                    if results:
                        row = results[0]
                        reservation_info = {
                            "reservation_name": row.reservation_name if hasattr(row, "reservation_name") else None,
                            "slot_capacity": int(row.slot_capacity)
                            if hasattr(row, "slot_capacity") and row.slot_capacity
                            else None,
                            "ignore_idle_slots": row.ignore_idle_slots if hasattr(row, "ignore_idle_slots") else None,
                            "edition": row.edition if hasattr(row, "edition") else None,
                            "autoscale_max_slots": int(row.autoscale_max_slots)
                            if hasattr(row, "autoscale_max_slots") and row.autoscale_max_slots
                            else None,
                            "creation_time": row.creation_time.isoformat()
                            if hasattr(row, "creation_time") and row.creation_time
                            else None,
                            "update_time": row.update_time.isoformat()
                            if hasattr(row, "update_time") and row.update_time
                            else None,
                        }
                        self.logger.debug(
                            f"Successfully captured BigQuery reservation info: {reservation_info['reservation_name']}"
                        )
                except Exception as e:
                    self.logger.debug(
                        f"Could not fetch BigQuery reservation info (insufficient permissions or not configured): {e}"
                    )

                # Try to get capacity commitment information
                commitment_info = None
                try:
                    commitment_query = f"""
                        SELECT commitment_id, slot_count, commitment_plan, state, renewal_plan,
                               commitment_start_time, commitment_end_time
                        FROM `region-{self.location}`.INFORMATION_SCHEMA.CAPACITY_COMMITMENTS
                        WHERE project_id = @project_id AND state = 'ACTIVE'
                        ORDER BY commitment_start_time DESC
                        LIMIT 1
                    """
                    job_config = bigquery.QueryJobConfig(
                        query_parameters=[ScalarQueryParameter("project_id", "STRING", self.project_id)]
                    )
                    commitment_job = connection.query(commitment_query, job_config=job_config)
                    commitment_results = list(commitment_job.result())
                    if commitment_results:
                        row = commitment_results[0]
                        commitment_info = {
                            "commitment_id": row.commitment_id if hasattr(row, "commitment_id") else None,
                            "slot_count": int(row.slot_count)
                            if hasattr(row, "slot_count") and row.slot_count
                            else None,
                            "commitment_plan": row.commitment_plan if hasattr(row, "commitment_plan") else None,
                            "state": row.state if hasattr(row, "state") else None,
                            "renewal_plan": row.renewal_plan if hasattr(row, "renewal_plan") else None,
                            "commitment_start_time": row.commitment_start_time.isoformat()
                            if hasattr(row, "commitment_start_time") and row.commitment_start_time
                            else None,
                            "commitment_end_time": row.commitment_end_time.isoformat()
                            if hasattr(row, "commitment_end_time") and row.commitment_end_time
                            else None,
                        }
                        self.logger.debug(
                            f"Successfully captured capacity commitment: {commitment_info['commitment_plan']}"
                        )
                except Exception as e:
                    error_msg = str(e).lower()
                    if "permission" in error_msg or "access denied" in error_msg:
                        self.logger.warning(
                            f"Unable to query capacity commitments (insufficient permissions): {e}. "
                            "Grant bigquery.capacityCommitments.list permission for complete platform metadata."
                        )
                    else:
                        self.logger.debug(f"Could not fetch capacity commitment info: {e}")

                # Try to get reservation assignment information
                assignment_info = None
                try:
                    assignment_query = f"""
                        SELECT assignment_id, assignee_id, assignee_type, job_type, reservation_name
                        FROM `region-{self.location}`.INFORMATION_SCHEMA.ASSIGNMENTS_BY_PROJECT
                        WHERE assignee_id = @project_id
                        LIMIT 1
                    """
                    job_config = bigquery.QueryJobConfig(
                        query_parameters=[ScalarQueryParameter("project_id", "STRING", self.project_id)]
                    )
                    assignment_job = connection.query(assignment_query, job_config=job_config)
                    assignment_results = list(assignment_job.result())
                    if assignment_results:
                        row = assignment_results[0]
                        assignment_info = {
                            "assignment_id": row.assignment_id if hasattr(row, "assignment_id") else None,
                            "assignee_type": row.assignee_type if hasattr(row, "assignee_type") else None,
                            "job_type": row.job_type if hasattr(row, "job_type") else None,
                        }
                        self.logger.debug(
                            f"Successfully captured reservation assignment: {assignment_info['assignment_id']}"
                        )
                except Exception as e:
                    error_msg = str(e).lower()
                    if "permission" in error_msg or "access denied" in error_msg:
                        self.logger.warning(
                            f"Unable to query reservation assignments (insufficient permissions): {e}. "
                            "Grant bigquery.reservationAssignments.list permission for complete platform metadata."
                        )
                    else:
                        self.logger.debug(f"Could not fetch reservation assignment info: {e}")

                # Determine granular pricing model and edition
                pricing_model = "on-demand"
                edition = "ON_DEMAND"

                if reservation_info:
                    # Extract edition from reservation
                    edition = reservation_info.get("edition") or "STANDARD"

                    # Determine pricing model from commitment plan
                    if commitment_info:
                        commitment_plan = commitment_info.get("commitment_plan", "")
                        if commitment_plan == "FLEX":
                            pricing_model = "flex-slots"
                        elif commitment_plan == "MONTHLY":
                            pricing_model = "monthly-commitment"
                        elif commitment_plan == "ANNUAL":
                            pricing_model = "annual-commitment"
                        elif commitment_plan == "THREE_YEAR":
                            pricing_model = "three-year-commitment"
                        else:
                            # Has reservation but no recognized commitment
                            pricing_model = "flat-rate"
                    else:
                        # Has reservation but no commitment info (legacy or couldn't query)
                        pricing_model = "flat-rate"
                else:
                    # No reservation = on-demand
                    pricing_model = "on-demand"
                    edition = "ON_DEMAND"

                self.logger.debug(f"Detected BigQuery pricing model: {pricing_model}, edition: {edition}")

                # Update compute_configuration with hybrid structure (flat key fields + nested details)
                if "compute_configuration" not in platform_info:
                    platform_info["compute_configuration"] = {}

                # Add key fields (flat for easy access)
                platform_info["compute_configuration"]["pricing_model"] = pricing_model
                platform_info["compute_configuration"]["edition"] = edition
                platform_info["compute_configuration"]["slot_capacity"] = (
                    reservation_info.get("slot_capacity") if reservation_info else None
                )
                platform_info["compute_configuration"]["autoscale_max_slots"] = (
                    reservation_info.get("autoscale_max_slots") if reservation_info else None
                )

                # Add detailed reservation info (nested)
                if reservation_info:
                    platform_info["compute_configuration"]["reservation_details"] = {
                        "name": reservation_info.get("reservation_name"),
                        "slot_capacity": reservation_info.get("slot_capacity"),
                        "ignore_idle_slots": reservation_info.get("ignore_idle_slots"),
                        "autoscale_max_slots": reservation_info.get("autoscale_max_slots"),
                        "creation_time": reservation_info.get("creation_time"),
                        "update_time": reservation_info.get("update_time"),
                    }

                # Add capacity commitment info (nested)
                if commitment_info:
                    platform_info["compute_configuration"]["capacity_commitment"] = commitment_info

                # Add assignment info (nested)
                if assignment_info:
                    platform_info["compute_configuration"]["assignment"] = assignment_info

            except Exception as e:
                self.logger.debug(f"Error collecting BigQuery platform info: {e}")

        return platform_info
    def get_target_dialect(self) -> str:
        """Return the target SQL dialect for BigQuery."""
        return "bigquery"
    def _get_connection_params(self, **connection_config) -> dict[str, Any]:
        """Get standardized connection parameters."""
        return {
            "project_id": connection_config.get("project_id", self.project_id),
            "location": connection_config.get("location", self.location),
            "credentials_path": connection_config.get("credentials_path", self.credentials_path),
        }

    def _load_credentials(self, credentials_path: str | None) -> Any:
        """Load Google Cloud credentials from service account file.

        This method loads credentials directly from the file without setting
        environment variables, which is more secure as it avoids credential
        exposure via environment inspection.

        Args:
            credentials_path: Path to service account JSON file, or None for default

        Returns:
            Credentials object suitable for BigQuery/Storage clients
        """
        if credentials_path:
            return service_account.Credentials.from_service_account_file(credentials_path)
        # Fall back to default credentials (ADC)
        credentials, _ = google_auth.default()
        return credentials

    def _create_admin_client(self, **connection_config) -> Any:
        """Create BigQuery client for admin operations."""
        params = self._get_connection_params(**connection_config)
        credentials = self._load_credentials(params["credentials_path"])
        return bigquery.Client(
            project=params["project_id"],
            location=params["location"],
            credentials=credentials,
        )
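    # Illustrative only: the credential resolution _load_credentials performs, sketched
    # with the underlying google-auth calls (the file path below is hypothetical).
    #
    #   from google.oauth2 import service_account
    #   import google.auth
    #
    #   creds = service_account.Credentials.from_service_account_file("/path/to/sa.json")
    #   # ...or, when no path is configured, Application Default Credentials:
    #   creds, _ = google.auth.default()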
    def check_server_database_exists(self, **connection_config) -> bool:
        """Check if dataset exists in BigQuery project."""
        try:
            client = self._create_admin_client(**connection_config)
            dataset_id = connection_config.get("dataset", self.dataset_id)

            # Check if dataset exists
            datasets = list(client.list_datasets())
            dataset_names = [d.dataset_id for d in datasets]
            return dataset_id in dataset_names
        except Exception:
            # If we can't connect or check, assume dataset doesn't exist
            return False
    def drop_database(self, **connection_config) -> None:
        """Drop dataset in BigQuery project."""
        # Resolve the dataset name up front so the error message below is always valid
        dataset_id = connection_config.get("dataset", self.dataset_id)
        try:
            client = self._create_admin_client(**connection_config)

            # Create dataset reference
            dataset_ref = client.dataset(dataset_id)

            # Drop dataset and all its tables
            client.delete_dataset(dataset_ref, delete_contents=True, not_found_ok=True)
        except Exception as e:
            raise RuntimeError(f"Failed to drop BigQuery dataset {dataset_id}: {e}")
    def _validate_database_compatibility(self, **connection_config):
        """Validate database compatibility with BigQuery-specific empty table detection.

        Extends base validation to add fast empty table detection using BigQuery's
        table.num_rows metadata property. This catches cases where tables exist but
        have no data (usually from failed previous loads).

        Args:
            **connection_config: Connection configuration

        Returns:
            DatabaseValidationResult with compatibility information
        """
        # First run the standard validation
        from benchbox.platforms.base.validation import DatabaseValidator

        validator = DatabaseValidator(adapter=self, connection_config=connection_config)
        result = validator.validate()

        # If validation already failed, no need for additional checks
        if not result.is_valid:
            return result

        # BigQuery-specific check: detect empty tables using fast num_rows API.
        # This is more comprehensive than the standard row count validation which
        # only samples 2 tables for performance. BigQuery's num_rows is metadata
        # and doesn't require running queries, so we can check all tables quickly.
        try:
            client = self._create_admin_client(**connection_config)
            dataset_ref = client.dataset(self.dataset_id, project=self.project_id)

            try:
                tables = list(client.list_tables(dataset_ref))
            except Exception as e:
                # If we can't list tables, validation already failed in base validator
                self.log_very_verbose(f"Could not list tables for empty table check: {e}")
                return result

            if not tables:
                # No tables means database is empty - base validator should have caught this
                return result

            # Count empty tables using fast metadata API
            empty_count = 0
            empty_tables = []
            for table_info in tables:
                try:
                    table = client.get_table(table_info.reference)
                    if table.num_rows == 0:
                        empty_count += 1
                        empty_tables.append(table.table_id)
                except Exception as e:
                    self.log_very_verbose(f"Failed to check table {table_info.table_id}: {e}")

            # If more than half the tables are empty, mark database as invalid.
            # This indicates a failed previous load where schema was created but data loading failed
            if empty_count > len(tables) / 2:
                self.log_verbose(
                    f"Found {empty_count}/{len(tables)} empty tables - database appears to have failed data load"
                )
                result.issues.append(
                    f"Empty tables detected: {empty_count}/{len(tables)} tables have no rows "
                    f"(indicates failed previous load)"
                )
                result.is_valid = False
                result.can_reuse = False

        except Exception as e:
            # Don't fail validation if empty table check fails - just log it
            self.log_very_verbose(f"Empty table check failed: {e}")

        return result

    def _cleanup_empty_tables_if_needed(self, client) -> None:
        """Fallback cleanup for empty tables (defense-in-depth).

        This is a fallback mechanism that runs after connection is created. The primary
        empty table detection now happens during _validate_database_compatibility(),
        which prevents the database from being marked as reusable in the first place.

        This method is kept as defense-in-depth in case validation is bypassed or tables
        become empty between validation and connection. If more than half the tables are
        empty, it drops all empty tables and forces schema/data recreation by setting
        database_was_reused = False.

        Args:
            client: BigQuery client connection
        """
        try:
            self.logger.debug(f"Starting empty table cleanup check for dataset {self.dataset_id}")
            dataset_ref = client.dataset(self.dataset_id, project=self.project_id)

            self.logger.debug(f"Listing tables in dataset {self.dataset_id}")
            tables = list(client.list_tables(dataset_ref))
            self.logger.debug(f"Found {len(tables)} tables in dataset")

            if not tables:
                # No tables exist - nothing to clean up
                self.logger.debug("No tables found - nothing to clean up")
                return

            empty_count = 0
            empty_tables = []

            # Check each table for emptiness
            self.logger.debug(f"Checking {len(tables)} tables for empty rows")
            for table_info in tables:
                try:
                    table = client.get_table(table_info.reference)
                    if table.num_rows == 0:
                        empty_count += 1
                        empty_tables.append(table.table_id)
                        self.logger.debug(f"Table {table.table_id} is empty (0 rows)")
                except Exception as e:
                    self.logger.warning(f"Failed to check table {table_info.table_id}: {e}")

            self.logger.debug(f"Empty table count: {empty_count}/{len(tables)}")

            # If more than half the tables are empty, likely a failed load - drop them all
            if empty_count > len(tables) / 2:
                self.logger.warning(
                    f"Found {empty_count}/{len(tables)} empty tables - cleaning up failed previous load"
                )

                # Drop all empty tables
                for table_id in empty_tables:
                    try:
                        table_ref = dataset_ref.table(table_id)
                        client.delete_table(table_ref, not_found_ok=True)
                        self.logger.info(f"Dropped empty table: {table_id}")
                    except Exception as e:
                        self.logger.warning(f"Failed to drop empty table {table_id}: {e}")

                # Mark database as NOT reused so schema and data will be loaded
                self.database_was_reused = False
                self.logger.info(f"Dropped {len(empty_tables)} empty tables - forcing schema and data recreation")
            elif empty_count > 0:
                # Some tables are empty but not majority - just log it
                self.logger.info(
                    f"Found {empty_count} empty tables (out of {len(tables)} total) - not enough to trigger cleanup"
                )
            else:
                self.logger.debug("No empty tables found")

        except Exception as e:
            # Don't fail if cleanup check fails - just log and continue
            import traceback

            self.logger.error(f"Empty table cleanup check failed: {e}")
            self.logger.error(f"Traceback: {traceback.format_exc()}")
    def create_connection(self, **connection_config) -> Any:
        """Create optimized BigQuery client connection."""
        self.log_operation_start("BigQuery connection")

        # Handle existing database using base class method
        self.handle_existing_database(**connection_config)

        # Get connection parameters
        params = self._get_connection_params(**connection_config)
        self.log_very_verbose(
            f"BigQuery connection params: project={params.get('project_id')}, location={params.get('location')}"
        )

        try:
            # Load credentials securely (without environment variable exposure)
            credentials = self._load_credentials(params["credentials_path"])

            # Create BigQuery client
            client = bigquery.Client(
                project=params["project_id"],
                location=params["location"],
                credentials=credentials,
            )

            # Test connection
            query = "SELECT 1 as test"
            query_job = client.query(query)
            list(query_job.result())

            self.logger.info(f"Connected to BigQuery project: {params['project_id']}")

            # If database was reused, check for and clean up empty tables from failed previous loads
            self.logger.debug(
                f"Checking if cleanup needed: database_was_reused={getattr(self, 'database_was_reused', False)}"
            )
            if getattr(self, "database_was_reused", False):
                self.logger.info("Database was reused - checking for empty tables from failed previous loads")
                self._cleanup_empty_tables_if_needed(client)

            # Storage client will be created lazily when needed (see load_data)

            self.log_operation_complete("BigQuery connection", details=f"Connected to project {params['project_id']}")
            return client
        except Exception as e:
            self.logger.error(f"Failed to connect to BigQuery: {e}")
            raise
    def _get_existing_tables(self, connection: Any) -> list[str]:
        """Get list of existing tables in BigQuery dataset.

        Returns table names in lowercase for case-insensitive comparison. BigQuery
        stores table names in the case they were created (usually uppercase for TPC
        benchmarks), but we normalize to lowercase for validation.
        """
        try:
            dataset_ref = connection.dataset(self.dataset_id, project=self.project_id)
            tables = list(connection.list_tables(dataset_ref))
            # Normalize to lowercase since BigQuery is case-insensitive
            return [table.table_id.lower() for table in tables]
        except Exception as e:
            self.log_very_verbose(f"Failed to list tables: {e}")
            return []

    def _validate_data_integrity(
        self, benchmark, connection: Any, table_stats: dict[str, int]
    ) -> tuple[str, dict[str, Any]]:
        """Validate data integrity using BigQuery Client API.

        BigQuery Client doesn't support the cursor pattern (connection.cursor()) like
        traditional databases. Instead, it uses connection.query() for SQL execution.
        This override prevents "'Client' object has no attribute 'cursor'" errors
        during database compatibility validation.

        Args:
            benchmark: Benchmark instance
            connection: BigQuery Client
            table_stats: Dictionary of table names to row counts

        Returns:
            Tuple of (status, validation_details)
        """
        validation_details = {}
        try:
            accessible_tables = []
            inaccessible_tables = []

            for table_name in table_stats:
                try:
                    # BigQuery stores tables in uppercase for TPC benchmarks
                    table_upper = table_name.upper()
                    # Use BigQuery's query API instead of cursor pattern
                    query = f"SELECT 1 FROM `{self.project_id}.{self.dataset_id}.{table_upper}` LIMIT 1"
                    query_job = connection.query(query)
                    list(query_job.result())  # Execute query to verify table is accessible
                    accessible_tables.append(table_name)
                except Exception as e:
                    self.log_very_verbose(f"Table {table_name} inaccessible: {e}")
                    inaccessible_tables.append(table_name)

            if inaccessible_tables:
                validation_details["inaccessible_tables"] = inaccessible_tables
                validation_details["constraints_enabled"] = False
                return "FAILED", validation_details
            else:
                validation_details["accessible_tables"] = accessible_tables
                validation_details["constraints_enabled"] = True
                return "PASSED", validation_details
        except Exception as e:
            validation_details["constraints_enabled"] = False
            validation_details["integrity_error"] = str(e)
            return "FAILED", validation_details
    def get_table_row_count(self, connection: Any, table: str) -> int:
        """Get row count using BigQuery Client API.

        Overrides the base implementation that uses the cursor pattern. BigQuery
        Client doesn't have a .cursor() method, so we use .query() instead.

        Args:
            connection: BigQuery Client
            table: Table name

        Returns:
            Row count as integer, or 0 if unable to determine
        """
        try:
            # BigQuery stores tables in uppercase for TPC benchmarks
            table_upper = table.upper()
            # Use BigQuery's query API instead of cursor pattern
            query = f"SELECT COUNT(*) FROM `{self.project_id}.{self.dataset_id}.{table_upper}`"
            query_job = connection.query(query)
            result = list(query_job.result())
            return result[0][0] if result else 0
        except Exception as e:
            self.log_very_verbose(f"Could not get row count for {table}: {e}")
            return 0
    def create_schema(self, benchmark, connection: Any) -> float:
        """Create schema using BigQuery dataset and tables."""
        start_time = time.time()
        try:
            # Create dataset if it doesn't exist
            dataset_ref = connection.dataset(self.dataset_id)
            try:
                connection.get_dataset(dataset_ref)
                self.logger.info(f"Dataset {self.dataset_id} already exists")
            except NotFound:
                dataset = bigquery.Dataset(dataset_ref)
                dataset.location = self.location
                dataset.description = (
                    f"BenchBox benchmark data for {benchmark._name if hasattr(benchmark, '_name') else 'benchmark'}"
                )
                dataset = connection.create_dataset(dataset)
                self.logger.info(f"Created dataset {self.dataset_id}")

            # Use common schema creation helper
            schema_sql = self._create_schema_with_tuning(benchmark, source_dialect="duckdb")

            # Split schema into individual statements and execute
            statements = [stmt.strip() for stmt in schema_sql.split(";") if stmt.strip()]
            for statement in statements:
                # Convert to BigQuery table definition
                bq_statement = self._convert_to_bigquery_table(statement)

                # Execute via query job
                query_job = connection.query(bq_statement)
                query_job.result()  # Wait for completion

                self.logger.debug(f"Executed schema statement: {bq_statement[:100]}...")

            self.logger.info("Schema created")
        except Exception as e:
            self.logger.error(f"Schema creation failed: {e}")
            raise

        return time.time() - start_time
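    # Illustrative only: the kind of rewrite create_schema applies to each statement
    # via _convert_to_bigquery_table (defined later in this module); the statement
    # text, project, and dataset names below are hypothetical.
    #
    #   "CREATE TABLE NATION (N_NATIONKEY INTEGER, N_NAME VARCHAR(25))"
    #   becomes
    #   "CREATE OR REPLACE TABLE `my-project.benchbox.NATION` (N_NATIONKEY INTEGER, N_NAME VARCHAR(25))"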
    def load_data(
        self, benchmark, connection: Any, data_dir: Path
    ) -> tuple[dict[str, int], float, dict[str, Any] | None]:
        """Load data using BigQuery efficient loading via Cloud Storage."""
        logger = logging.getLogger(__name__)
        logger.debug(f"Starting data loading for benchmark: {benchmark.__class__.__name__}")
        logger.debug(f"Data directory: {data_dir}")

        # Check if using cloud storage
        if is_cloud_path(str(data_dir)):
            path_info = get_cloud_path_info(str(data_dir))
            logger.info(f"Loading data from cloud storage: {path_info['provider']} bucket '{path_info['bucket']}'")
            print(f"  Loading data from {path_info['provider']} cloud storage")

        start_time = time.time()
        table_stats = {}

        try:
            # Get data files from benchmark or manifest fallback
            if hasattr(benchmark, "tables") and benchmark.tables:
                data_files = benchmark.tables
            else:
                # Manifest fallback
                data_files = None
                try:
                    manifest_path = Path(data_dir) / "_datagen_manifest.json"
                    if manifest_path.exists():
                        with open(manifest_path) as f:
                            manifest = json.load(f)
                        tables = manifest.get("tables") or {}
                        mapping = {}
                        for table, entries in tables.items():
                            if entries:
                                # Collect ALL chunk files, not just the first one
                                chunk_paths = []
                                for entry in entries:
                                    rel = entry.get("path")
                                    if rel:
                                        chunk_paths.append(Path(data_dir) / rel)
                                if chunk_paths:
                                    mapping[table] = chunk_paths
                        if mapping:
                            data_files = mapping
                            logger.debug("Using data files from _datagen_manifest.json")
                except Exception as e:
                    logger.debug(f"Manifest fallback failed: {e}")

            if not data_files:
                # No data files available - benchmark should have generated data first
                raise ValueError("No data files found. Ensure benchmark.generate_data() was called first.")

            # Check for incompatible compression format (BigQuery only supports gzip or uncompressed for CSV files)
            for table_name, file_paths in data_files.items():
                if not isinstance(file_paths, list):
                    file_paths = [file_paths]
                for file_path in file_paths:
                    if Path(file_path).suffix == ".zst":
                        benchmark_name = getattr(benchmark, "name", "unknown")
                        scale_factor = getattr(benchmark, "scale_factor", "unknown")
                        raise ValueError(
                            f"\n❌ Incompatible data compression detected\n\n"
                            f"BigQuery does not support Zstd (.zst) compression for CSV file loading.\n"
                            f"Found Zstd file: {Path(file_path).name}\n\n"
                            f"To fix this, regenerate the data with gzip compression:\n\n"
                            f"  # Remove existing incompatible data\n"
                            f"  rm -rf {benchmark.data_dir}\n\n"
                            f"  # Regenerate with gzip compression\n"
                            f"  benchbox run --platform bigquery --benchmark {benchmark_name} "
                            f"--scale {scale_factor} --compression-type gzip\n\n"
                            f"Or use uncompressed data (larger files, slower uploads):\n\n"
                            f"  benchbox run --platform bigquery --benchmark {benchmark_name} "
                            f"--scale {scale_factor} --no-compression\n"
                        )

            # Upload files to Cloud Storage if bucket is configured
            if self.storage_bucket:
                # Create Storage client on-demand with same credentials as BigQuery client
                params = self._get_connection_params()
                credentials = self._load_credentials(params["credentials_path"])
                storage_client = storage.Client(project=self.project_id, credentials=credentials)
                bucket = storage_client.bucket(self.storage_bucket)

                # Load data for each table (handle multi-chunk files)
                for table_name, file_paths in data_files.items():
                    # Normalize to list (data resolver should always return lists now)
                    if not isinstance(file_paths, list):
                        file_paths = [file_paths]

                    # Filter out non-existent or empty files
                    valid_files = []
                    for file_path in file_paths:
                        if is_cloud_path(str(file_path)):
                            # For cloud paths, trust the generator created the file
                            valid_files.append(file_path)
                        else:
                            file_path = Path(file_path)
                            if file_path.exists() and file_path.stat().st_size > 0:
                                valid_files.append(file_path)

                    if not valid_files:
                        self.logger.warning(f"Skipping {table_name} - no valid data files")
                        table_stats[table_name] = 0
                        continue

                    logger.debug(f"Loading {table_name} from {len(valid_files)} file(s)")

                    try:
                        self.log_verbose(f"Loading data for table: {table_name}")
                        load_start = time.time()

                        table_name_upper = table_name.upper()
                        table_ref = connection.dataset(self.dataset_id).table(table_name_upper)

                        # Load each file chunk
                        for file_idx, file_path in enumerate(valid_files):
                            file_path = Path(file_path)
                            chunk_info = f" (chunk {file_idx + 1}/{len(valid_files)})" if len(valid_files) > 1 else ""

                            # Detect file format to determine delimiter.
                            # TPC-H uses .tbl (pipe-delimited), TPC-DS uses .dat (pipe-delimited).
                            # Use substring check to handle chunked files like customer.tbl.1 or customer.tbl.1.zst
                            file_str = str(file_path.name)
                            delimiter = "|" if ".tbl" in file_str or ".dat" in file_str else ","

                            # Upload file directly to Cloud Storage with original compression and format.
                            # BigQuery supports compressed files and any delimiter natively.
                            # Preserve file extension so BigQuery can detect compression
                            blob_name = f"{self.storage_prefix}/{table_name}_{file_idx}{file_path.suffix}"
                            self.log_very_verbose(f"Uploading to Cloud Storage{chunk_info}: {blob_name}")
                            blob = bucket.blob(blob_name)
                            blob.upload_from_filename(str(file_path))

                            # Load from Cloud Storage to BigQuery.
                            # First file truncates table, subsequent files append
                            write_disposition = (
                                bigquery.WriteDisposition.WRITE_TRUNCATE
                                if file_idx == 0
                                else bigquery.WriteDisposition.WRITE_APPEND
                            )
                            job_config = bigquery.LoadJobConfig(
                                source_format=bigquery.SourceFormat.CSV,
                                skip_leading_rows=0,
                                autodetect=False,  # Use existing schema
                                field_delimiter=delimiter,
                                allow_quoted_newlines=True,
                                write_disposition=write_disposition,
                            )

                            # Create load job
                            uri = f"gs://{self.storage_bucket}/{blob_name}"
                            load_job = connection.load_table_from_uri(uri, table_ref, job_config=job_config)
                            load_job.result()  # Wait for completion

                        # Get final row count after loading all chunks
                        query = f"SELECT COUNT(*) FROM `{self.project_id}.{self.dataset_id}.{table_name_upper}`"
                        query_job = connection.query(query)
                        result = list(query_job.result())
                        row_count = result[0][0] if result else 0
                        table_stats[table_name_upper] = row_count

                        load_time = time.time() - load_start
                        chunk_info = f" from {len(valid_files)} file(s)" if len(valid_files) > 1 else ""
                        self.logger.info(
                            f"✅ Loaded {row_count:,} rows into {table_name_upper}{chunk_info} in {load_time:.2f}s"
                        )
                    except Exception as e:
                        self.logger.error(f"Failed to load {table_name}: {str(e)[:100]}...")
                        table_stats[table_name.upper()] = 0
            else:
                # Direct loading without Cloud Storage (less efficient)
                self.logger.warning("No Cloud Storage bucket configured, using direct loading")

                for table_name, file_paths in data_files.items():
                    # Normalize to list (handle both single paths and lists for TPC-H vs TPC-DS)
                    if not isinstance(file_paths, list):
                        file_paths = [file_paths]

                    # Filter valid files
                    valid_files = []
                    for file_path in file_paths:
                        file_path = Path(file_path)
                        if file_path.exists() and file_path.stat().st_size > 0:
                            valid_files.append(file_path)

                    if not valid_files:
                        self.logger.warning(f"Skipping {table_name} - no valid data files")
                        table_stats[table_name.upper()] = 0
                        continue

                    try:
                        self.log_verbose(f"Direct loading data for table: {table_name}")
                        load_start = time.time()

                        table_name_upper = table_name.upper()

                        # Load directly from local file(s)
                        table_ref = connection.dataset(self.dataset_id).table(table_name_upper)

                        # Load each chunk file
                        for file_idx, file_path in enumerate(valid_files):
                            chunk_info = f" (chunk {file_idx + 1}/{len(valid_files)})" if len(valid_files) > 1 else ""
                            self.log_very_verbose(f"Loading {table_name}{chunk_info} from {file_path.name}")

                            # Detect delimiter from filename (handle chunked files like customer.tbl.1)
                            file_str = str(file_path.name)
                            delimiter = "|" if ".tbl" in file_str or ".dat" in file_str else ","

                            # Use WRITE_APPEND for subsequent chunks, WRITE_TRUNCATE for first
                            write_disposition = (
                                bigquery.WriteDisposition.WRITE_TRUNCATE
                                if file_idx == 0
                                else bigquery.WriteDisposition.WRITE_APPEND
                            )
                            job_config = bigquery.LoadJobConfig(
                                source_format=bigquery.SourceFormat.CSV,
                                # TPC-H uses .tbl files, TPC-DS uses .dat files - both are pipe-delimited
                                field_delimiter=delimiter,
                                skip_leading_rows=0,
                                autodetect=False,
                                write_disposition=write_disposition,
                            )

                            with open(file_path, "rb") as source_file:
                                load_job = connection.load_table_from_file(
                                    source_file, table_ref, job_config=job_config
                                )
                                load_job.result()

                        # Get final row count after all chunks loaded
                        query = f"SELECT COUNT(*) FROM `{self.project_id}.{self.dataset_id}.{table_name_upper}`"
                        query_job = connection.query(query)
                        result = list(query_job.result())
                        row_count = result[0][0] if result else 0
                        table_stats[table_name_upper] = row_count

                        load_time = time.time() - load_start
                        chunk_info = f" from {len(valid_files)} file(s)" if len(valid_files) > 1 else ""
                        self.logger.info(
                            f"✅ Loaded {row_count:,} rows into {table_name_upper}{chunk_info} in {load_time:.2f}s"
                        )
                    except Exception as e:
                        self.logger.error(f"Failed to load {table_name}: {str(e)[:100]}...")
                        table_stats[table_name.upper()] = 0

            total_time = time.time() - start_time
            total_rows = sum(table_stats.values())
            self.logger.info(f"✅ Loaded {total_rows:,} total rows in {total_time:.2f}s")

        except Exception as e:
            self.logger.error(f"Data loading failed: {e}")
            raise

        # BigQuery doesn't provide detailed per-table timings yet
        return table_stats, total_time, None
    def configure_for_benchmark(self, connection: Any, benchmark_type: str) -> None:
        """Apply BigQuery-specific optimizations based on benchmark type."""
        # Set default query job configuration
        job_config = bigquery.QueryJobConfig(
            priority=getattr(
                bigquery.QueryPriority,
                self.job_priority,
                bigquery.QueryPriority.INTERACTIVE,
            ),
            use_query_cache=self.query_cache,
            dry_run=self.dry_run,
        )

        if self.maximum_bytes_billed:
            job_config.maximum_bytes_billed = self.maximum_bytes_billed

        # Set default dataset to avoid requiring fully-qualified table names
        if self.project_id and self.dataset_id:
            job_config.default_dataset = f"{self.project_id}.{self.dataset_id}"

        if benchmark_type.lower() in ["olap", "analytics", "tpch", "tpcds"]:
            # OLAP optimizations
            job_config.use_legacy_sql = False  # Use Standard SQL
            job_config.flatten_results = False  # Keep nested/repeated fields

        # Store job config for use in execute_query
        connection._default_job_config = job_config
    def execute_query(
        self,
        connection: Any,
        query: str,
        query_id: str,
        benchmark_type: str | None = None,
        scale_factor: float | None = None,
        validate_row_count: bool = True,
        stream_id: int | None = None,
    ) -> dict[str, Any]:
        """Execute query with detailed timing and cost tracking."""
        start_time = time.time()

        try:
            # Replace table references with fully qualified names.
            # Note: Query dialect translation is now handled automatically by the base adapter.
            # Skip qualification if query has backtick-quoted identifiers (from sqlglot)
            # because default_dataset handles unqualified names and regex breaks backticks
            if "`" in query:
                # Query processed by sqlglot with identify=True.
                # Normalize lowercase table names to UPPERCASE to match TPC-DS schema
                # (BigQuery backtick-quoted identifiers are case-sensitive)
                translated_query = self._normalize_table_names_case(query)
            else:
                # Non-translated queries (e.g., raw TPC-H) need explicit qualification
                translated_query = self._qualify_table_names(query)

            # Use default job config if available
            job_config = getattr(connection, "_default_job_config", bigquery.QueryJobConfig())

            # Execute the query
            query_job = connection.query(translated_query, job_config=job_config)
            result = list(query_job.result())

            execution_time = time.time() - start_time
            actual_row_count = len(result) if result else 0

            # Get job statistics
            job_stats = {
                "bytes_processed": query_job.total_bytes_processed,
                "bytes_billed": query_job.total_bytes_billed,
                "slot_ms": query_job.slot_millis,
                "creation_time": query_job.created.isoformat() if query_job.created else None,
                "start_time": query_job.started.isoformat() if query_job.started else None,
                "end_time": query_job.ended.isoformat() if query_job.ended else None,
            }

            # Validate row count if enabled and benchmark type is provided
            validation_result = None
            if validate_row_count and benchmark_type:
                from benchbox.core.validation.query_validation import QueryValidator

                validator = QueryValidator()
                validation_result = validator.validate_query_result(
                    benchmark_type=benchmark_type,
                    query_id=query_id,
                    actual_row_count=actual_row_count,
                    scale_factor=scale_factor,
                    stream_id=stream_id,
                )

                # Log validation result
                if validation_result.warning_message:
                    self.log_verbose(f"Row count validation: {validation_result.warning_message}")
                elif not validation_result.is_valid:
                    self.log_verbose(f"Row count validation FAILED: {validation_result.error_message}")
                else:
                    self.log_very_verbose(
                        f"Row count validation PASSED: {actual_row_count} rows "
                        f"(expected: {validation_result.expected_row_count})"
                    )

            # Use base helper to build result with consistent validation field mapping
            result_dict = self._build_query_result_with_validation(
                query_id=query_id,
                execution_time=execution_time,
                actual_row_count=actual_row_count,
                first_row=result[0] if result else None,
                validation_result=validation_result,
            )

            # Include BigQuery-specific fields
            result_dict["translated_query"] = translated_query if translated_query != query else None
            result_dict["job_statistics"] = job_stats
            result_dict["job_id"] = query_job.job_id
            # Map job_statistics to resource_usage for cost calculation
            result_dict["resource_usage"] = job_stats

            return result_dict
        except Exception as e:
            execution_time = time.time() - start_time
            return {
                "query_id": query_id,
                "status": "FAILED",
                "execution_time": execution_time,
                "rows_returned": 0,
                "error": str(e),
                "error_type": type(e).__name__,
            }
    def _convert_to_bigquery_table(self, statement: str) -> str:
        """Convert CREATE TABLE statement to BigQuery format.

        Makes tables idempotent by using CREATE OR REPLACE TABLE.
        """
        if not statement.upper().startswith("CREATE TABLE"):
            return statement

        # Ensure idempotency with OR REPLACE (defense-in-depth)
        if "CREATE TABLE" in statement and "OR REPLACE" not in statement.upper():
            statement = statement.replace("CREATE TABLE", "CREATE OR REPLACE TABLE", 1)

        # Include dataset qualification
        if f"{self.dataset_id}." not in statement:
            statement = statement.replace(
                "CREATE OR REPLACE TABLE ", f"CREATE OR REPLACE TABLE `{self.project_id}.{self.dataset_id}."
            )
            statement = statement.replace(" (", "` (")

        # Include partitioning and clustering if configured
        if "PARTITION BY" not in statement.upper() and self.partitioning_field:
            statement += f" PARTITION BY DATE({self.partitioning_field})"

        if "CLUSTER BY" not in statement.upper() and self.clustering_fields:
            clustering = ", ".join(self.clustering_fields)
            statement += f" CLUSTER BY {clustering}"

        return statement

    def _qualify_table_names(self, query: str) -> str:
        """Add full qualification to table names in query.

        Note: Only used for non-translated queries (e.g., raw TPC-H queries without
        sqlglot). Queries processed by sqlglot with identify=True should skip this
        method to avoid conflicts with backtick-quoted identifiers. When
        default_dataset is configured, BigQuery automatically resolves unqualified
        table names.
        """
        # Simple table name qualification - could be replaced with proper SQL parsing
        table_names = [
            "REGION",
            "NATION",
            "CUSTOMER",
            "SUPPLIER",
            "PART",
            "PARTSUPP",
            "ORDERS",
            "LINEITEM",
        ]

        for table_name in table_names:
            # Replace unqualified table names
            qualified_name = f"`{self.project_id}.{self.dataset_id}.{table_name}`"
            # Simple replacement - in production would use a proper SQL parser
            import re

            pattern = rf"\b{table_name}\b"
            query = re.sub(pattern, qualified_name, query, flags=re.IGNORECASE)

        return query

    def _normalize_table_names_case(self, query: str) -> str:
        """Normalize backtick-quoted table names to UPPERCASE for case-sensitive matching.

        BigQuery stores TPC-DS tables in UPPERCASE (per schema), but sqlglot generates
        lowercase table names with backticks. Since backtick-quoted identifiers are
        case-sensitive in BigQuery, we need to normalize to UPPERCASE to match the schema.

        Only processes backtick-quoted identifiers to avoid affecting string literals.

        Args:
            query: SQL query with backtick-quoted identifiers

        Returns:
            Query with lowercase table names normalized to UPPERCASE
        """
        import re

        # Pattern: backtick, lowercase word characters (table names), backtick.
        # This safely matches table identifiers without affecting string literals
        pattern = r"`([a-z_][a-z0-9_]*)`"

        def uppercase_table(match: re.Match[str]) -> str:
            return f"`{match.group(1).upper()}`"

        return re.sub(pattern, uppercase_table, query)

    def _get_platform_metadata(self, connection: Any) -> dict[str, Any]:
        """Get BigQuery-specific metadata and system information."""
        metadata = {
            "platform": self.platform_name,
            "project_id": self.project_id,
            "dataset_id": self.dataset_id,
            "location": self.location,
            "result_cache_enabled": self.query_cache,
        }

        try:
            # Get dataset information
            dataset_ref = connection.dataset(self.dataset_id)
            dataset = connection.get_dataset(dataset_ref)

            metadata["dataset_info"] = {
                "created": dataset.created.isoformat() if dataset.created else None,
                "modified": dataset.modified.isoformat() if dataset.modified else None,
                "location": dataset.location,
                "description": dataset.description,
            }

            # Get table list and sizes
            tables = list(connection.list_tables(dataset))
            table_info = []
            for table in tables:
                table_ref = dataset.table(table.table_id)
                table_obj = connection.get_table(table_ref)
                table_info.append(
                    {
                        "table_id": table.table_id,
                        "num_rows": table_obj.num_rows,
                        "num_bytes": table_obj.num_bytes,
                        "created": table_obj.created.isoformat() if table_obj.created else None,
                        "modified": table_obj.modified.isoformat() if table_obj.modified else None,
                    }
                )
            metadata["tables"] = table_info

            # Get project information
            try:
                # This requires additional permissions
                project = connection.get_project(self.project_id)
                metadata["project_info"] = {
                    "display_name": project.display_name,
                    "project_number": project.project_number,
                }
            except Exception:
                pass  # Skip if no permissions

        except Exception as e:
            metadata["metadata_error"] = str(e)

        return metadata
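    # Illustrative only: the backtick-identifier normalization performed by
    # _normalize_table_names_case above (the query text is hypothetical). Identifiers
    # outside backticks, such as column names, are left untouched.
    #
    #   _normalize_table_names_case("SELECT ss_item_sk FROM `store_sales` JOIN `item` USING (ss_item_sk)")
    #   # -> "SELECT ss_item_sk FROM `STORE_SALES` JOIN `ITEM` USING (ss_item_sk)"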
    def get_query_plan(self, connection: Any, query: str) -> dict[str, Any]:
        """Get query execution plan for analysis."""
        try:
            # Use dry run to get query plan without execution
            job_config = bigquery.QueryJobConfig(dry_run=True)

            # Note: Query dialect translation is now handled automatically by the base adapter
            qualified_query = self._qualify_table_names(query)

            query_job = connection.query(qualified_query, job_config=job_config)

            return {
                "bytes_processed": query_job.total_bytes_processed,
                "estimated_cost": query_job.total_bytes_processed / (1024**4) * 5,  # Rough cost estimate
                "query_plan": "Dry run completed",
                "job_id": query_job.job_id,
            }
        except Exception as e:
            return {"error": str(e)}
    def close_connection(self, connection: Any) -> None:
        """Close BigQuery connection.

        Handles credential refresh errors gracefully during connection cleanup.
        Suppresses all credential-related errors as they are non-fatal during cleanup.
        """
        if not connection:
            return

        try:
            if hasattr(connection, "close"):
                connection.close()
        except Exception as e:
            # Suppress credential refresh errors during cleanup (anonymous credentials, expired tokens, etc.)
            # These are non-fatal - the connection is being closed anyway
            error_msg = str(e).lower()
            if any(keyword in error_msg for keyword in ["credential", "refresh", "anonymous", "auth", "token"]):
                # Silently suppress credential/auth errors during cleanup
                self.log_very_verbose(f"Credential cleanup warning (non-fatal, suppressed): {e}")
            else:
                self.logger.warning(f"Error closing connection: {e}")

        # Also try to clean up any transport/channel resources that might have credential issues
        try:
            if hasattr(connection, "_transport"):
                transport = connection._transport
                if hasattr(transport, "close"):
                    transport.close()
        except Exception:
            # Silently ignore transport cleanup errors
            pass
    def supports_tuning_type(self, tuning_type) -> bool:
        """Check if BigQuery supports a specific tuning type.

        BigQuery supports:
        - PARTITIONING: Via PARTITION BY clause (date/timestamp/integer columns)
        - CLUSTERING: Via CLUSTER BY clause (up to 4 columns)

        Args:
            tuning_type: The type of tuning to check support for

        Returns:
            True if the tuning type is supported by BigQuery
        """
        # Import here to avoid circular imports
        try:
            from benchbox.core.tuning.interface import TuningType

            return tuning_type in {TuningType.PARTITIONING, TuningType.CLUSTERING}
        except ImportError:
            return False
    def generate_tuning_clause(self, table_tuning) -> str:
        """Generate BigQuery-specific tuning clauses for CREATE TABLE statements.

        BigQuery supports:
        - PARTITION BY DATE(column), DATETIME_TRUNC(column, DAY), column (for date/integer)
        - CLUSTER BY column1, column2, ... (up to 4 columns)

        Args:
            table_tuning: The tuning configuration for the table

        Returns:
            SQL clause string to be appended to CREATE TABLE statement
        """
        if not table_tuning or not table_tuning.has_any_tuning():
            return ""

        clauses = []

        try:
            # Import here to avoid circular imports
            from benchbox.core.tuning.interface import TuningType

            # Handle partitioning
            partition_columns = table_tuning.get_columns_by_type(TuningType.PARTITIONING)
            if partition_columns:
                # Sort by order and use first column for partitioning
                sorted_cols = sorted(partition_columns, key=lambda col: col.order)
                partition_col = sorted_cols[0]  # BigQuery typically uses single column partitioning

                # Determine partition strategy based on column type
                col_type = partition_col.type.upper()
                if any(date_type in col_type for date_type in ["DATE", "TIMESTAMP", "DATETIME"]):
                    if "DATE" in col_type:
                        partition_clause = f"PARTITION BY {partition_col.name}"
                    else:
                        partition_clause = f"PARTITION BY DATE({partition_col.name})"
                elif "INT" in col_type:
                    # Integer partitioning with range
                    partition_clause = (
                        f"PARTITION BY RANGE_BUCKET({partition_col.name}, GENERATE_ARRAY(0, 1000000, 10000))"
                    )
                else:
                    # Default date-based partitioning
                    partition_clause = f"PARTITION BY DATE({partition_col.name})"

                clauses.append(partition_clause)

            # Handle clustering (up to 4 columns)
            cluster_columns = table_tuning.get_columns_by_type(TuningType.CLUSTERING)
            if cluster_columns:
                # Sort by order and take up to 4 columns
                sorted_cols = sorted(cluster_columns, key=lambda col: col.order)
                cluster_cols = sorted_cols[:4]  # BigQuery limit of 4 clustering columns

                column_names = [col.name for col in cluster_cols]
                cluster_clause = f"CLUSTER BY {', '.join(column_names)}"
                clauses.append(cluster_clause)

            # Distribution and sorting not directly supported in BigQuery CREATE TABLE
            # but handled through partitioning and clustering

        except ImportError:
            # If tuning interface not available, return empty string
            pass

        return " ".join(clauses)
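    # Illustrative only: clauses generate_tuning_clause above might emit for a single
    # table (the column names below are hypothetical).
    #
    #   DATE partition column       -> "PARTITION BY o_orderdate"
    #   TIMESTAMP partition column  -> "PARTITION BY DATE(event_ts)"
    #   INTEGER partition column    -> "PARTITION BY RANGE_BUCKET(o_custkey, GENERATE_ARRAY(0, 1000000, 10000))"
    #   clustering columns          -> "CLUSTER BY l_shipdate, l_orderkey"  (at most 4 columns are used)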
    def apply_table_tunings(self, table_tuning, connection: Any) -> None:
        """Apply tuning configurations to a BigQuery table.

        BigQuery tuning approach:
        - PARTITIONING: Handled in CREATE TABLE via PARTITION BY
        - CLUSTERING: Handled in CREATE TABLE via CLUSTER BY
        - Additional optimization via table options

        Args:
            table_tuning: The tuning configuration to apply
            connection: BigQuery client connection

        Raises:
            ValueError: If the tuning configuration is invalid for BigQuery
        """
        if not table_tuning or not table_tuning.has_any_tuning():
            return

        table_name = table_tuning.table_name
        self.logger.info(f"Applying BigQuery tunings for table: {table_name}")

        try:
            # Import here to avoid circular imports
            from benchbox.core.tuning.interface import TuningType

            # BigQuery tuning is primarily handled at table creation time.
            # Post-creation optimizations are limited

            # Get table reference
            dataset_ref = connection.dataset(self.dataset_id)
            table_ref = dataset_ref.table(table_name)

            try:
                table_obj = connection.get_table(table_ref)

                # Log current table configuration
                if table_obj.time_partitioning:
                    self.logger.info(f"Table {table_name} has partitioning: {table_obj.time_partitioning.type_}")
                if table_obj.clustering_fields:
                    self.logger.info(f"Table {table_name} has clustering: {table_obj.clustering_fields}")

                # Check if table needs to be recreated with new tuning
                partition_columns = table_tuning.get_columns_by_type(TuningType.PARTITIONING)
                cluster_columns = table_tuning.get_columns_by_type(TuningType.CLUSTERING)

                needs_recreation = False

                # Check if partitioning configuration changed
                if partition_columns and not table_obj.time_partitioning:
                    needs_recreation = True
                    self.logger.info(f"Table {table_name} needs recreation for partitioning")

                # Check if clustering configuration changed
                if cluster_columns:
                    sorted_cols = sorted(cluster_columns, key=lambda col: col.order)
                    desired_clustering = [col.name for col in sorted_cols[:4]]
                    current_clustering = table_obj.clustering_fields or []
                    if desired_clustering != current_clustering:
                        needs_recreation = True
                        self.logger.info(f"Table {table_name} needs recreation for clustering")

                if needs_recreation:
                    self.logger.warning(
                        f"Table {table_name} configuration differs from desired tuning. "
                        "Consider recreating the table with proper tuning configuration."
                    )
            except Exception as e:
                self.logger.warning(f"Could not verify table configuration for {table_name}: {e}")

            # Handle unsupported tuning types
            distribution_columns = table_tuning.get_columns_by_type(TuningType.DISTRIBUTION)
            if distribution_columns:
                self.logger.warning(f"Distribution tuning not directly supported in BigQuery for table: {table_name}")

            sorting_columns = table_tuning.get_columns_by_type(TuningType.SORTING)
            if sorting_columns:
                # In BigQuery, sorting is achieved through clustering
                sorted_cols = sorted(sorting_columns, key=lambda col: col.order)
                column_names = [col.name for col in sorted_cols]
                self.logger.info(
                    f"Sorting in BigQuery achieved via clustering for table {table_name}: {', '.join(column_names)}"
                )

        except ImportError:
            self.logger.warning("Tuning interface not available - skipping tuning application")
        except Exception as e:
            raise ValueError(f"Failed to apply tunings to BigQuery table {table_name}: {e}")
    def apply_unified_tuning(self, unified_config: UnifiedTuningConfiguration, connection: Any) -> None:
        """Apply unified tuning configuration to BigQuery.

        Args:
            unified_config: Unified tuning configuration to apply
            connection: BigQuery connection
        """
        if not unified_config:
            return

        # Apply constraint configurations
        self.apply_constraint_configuration(unified_config.primary_keys, unified_config.foreign_keys, connection)

        # Apply platform optimizations
        if unified_config.platform_optimizations:
            self.apply_platform_optimizations(unified_config.platform_optimizations, connection)

        # Apply table-level tunings
        for _table_name, table_tuning in unified_config.table_tunings.items():
            self.apply_table_tunings(table_tuning, connection)
    def apply_platform_optimizations(
        self, platform_config: PlatformOptimizationConfiguration, connection: Any
    ) -> None:
        """Apply BigQuery-specific platform optimizations.

        Args:
            platform_config: Platform optimization configuration
            connection: BigQuery connection
        """
        if not platform_config:
            return

        # BigQuery optimizations are mostly applied at query/job level.
        # Store optimizations for use during query execution
        self.logger.info("BigQuery platform optimizations stored for query execution")
    def apply_constraint_configuration(
        self,
        primary_key_config: PrimaryKeyConfiguration,
        foreign_key_config: ForeignKeyConfiguration,
        connection: Any,
    ) -> None:
        """Apply constraint configurations to BigQuery.

        Note: BigQuery has limited constraint support. Constraints are mainly for
        metadata/optimization.

        Args:
            primary_key_config: Primary key constraint configuration
            foreign_key_config: Foreign key constraint configuration
            connection: BigQuery connection
        """
        # BigQuery constraints are applied at table creation time.
        # This method is called after tables are created, so log the configurations
        if primary_key_config and primary_key_config.enabled:
            self.logger.info("Primary key constraints enabled for BigQuery (applied during table creation)")

        if foreign_key_config and foreign_key_config.enabled:
            self.logger.info("Foreign key constraints enabled for BigQuery (applied during table creation)")

        # BigQuery doesn't support ALTER TABLE to add constraints after creation,
        # so there's no additional work to do here


def _build_bigquery_config(
    platform: str,
    options: dict[str, Any],
    overrides: dict[str, Any],
    info: Any,
) -> Any:
    """Build BigQuery database configuration with credential loading.

    This function loads saved credentials from the CredentialManager and merges
    them with CLI options and runtime overrides.

    Args:
        platform: Platform name (should be 'bigquery')
        options: CLI platform options from --platform-option flags
        overrides: Runtime overrides from orchestrator
        info: Platform info from registry

    Returns:
        DatabaseConfig with credentials loaded and platform-specific fields at top-level
    """
    from benchbox.core.config import DatabaseConfig
    from benchbox.security.credentials import CredentialManager

    # Load saved credentials
    cred_manager = CredentialManager()
    saved_creds = cred_manager.get_platform_credentials("bigquery") or {}

    # Build merged options: saved_creds < options < overrides
    merged_options = {}
    merged_options.update(saved_creds)
    merged_options.update(options)
    merged_options.update(overrides)

    # Extract credential fields for DatabaseConfig
    name = info.display_name if info else "Google BigQuery"
    driver_package = info.driver_package if info else "google-cloud-bigquery"

    # Handle staging_root from orchestrator (overrides) or fall back to default_output_location
    staging_root = overrides.get("staging_root")
    if not staging_root:
        # Fall back to default_output_location from credentials/options
        default_output = merged_options.get("default_output_location")
        if default_output and isinstance(default_output, str) and default_output.startswith("gs://"):
            staging_root = default_output

    # Parse staging_root to extract storage_bucket and storage_prefix
    storage_bucket = merged_options.get("storage_bucket")
    storage_prefix = merged_options.get("storage_prefix")
    if staging_root:
        try:
            path_info = get_cloud_path_info(staging_root)
            if path_info and path_info.get("bucket"):
                storage_bucket = path_info["bucket"]
                storage_prefix = path_info.get("path", "")
        except Exception:
            # If parsing fails, fall back to merged_options values
            pass

    # Build config dict with platform-specific fields at top-level.
    # This allows BigQueryAdapter.__init__() to access them via config.get()
    config_dict = {
        "type": "bigquery",
        "name": name,
        "options": merged_options or {},  # Ensure options is never None (Pydantic v2 uses None if explicitly passed)
        "driver_package": driver_package,
        "driver_version": overrides.get("driver_version") or options.get("driver_version"),
        "driver_auto_install": bool(overrides.get("driver_auto_install", options.get("driver_auto_install", False))),
        # Platform-specific fields at top-level (adapters expect these here)
        "project_id": merged_options.get("project_id"),
        "dataset_id": merged_options.get("dataset_id"),
        "location": merged_options.get("location"),
        "credentials_path": merged_options.get("credentials_path"),
        "staging_root": staging_root,  # Pass through staging_root
        "storage_bucket": storage_bucket,  # Use parsed bucket or fallback
        "storage_prefix": storage_prefix,  # Use parsed prefix or fallback
    }

    return DatabaseConfig(**config_dict)


# Register the config builder with the platform hook registry.
# This must happen when the module is imported
try:
    from benchbox.cli.platform_hooks import PlatformHookRegistry

    PlatformHookRegistry.register_config_builder("bigquery", _build_bigquery_config)
except ImportError:
    # Platform hooks may not be available in all contexts (e.g., core-only usage)
    pass