Snowflake Platform Adapter¶
The Snowflake adapter provides cloud-native data warehouse execution with elastic compute and automatic optimization.
Overview¶
Snowflake is a fully managed, multi-cloud data platform that provides:
Multi-cloud support - Available on AWS, Azure, and GCP
Storage-compute separation - Independent scaling of storage and compute
Elastic compute - Scale warehouses up/down
Multi-cluster warehouses - Concurrency scaling capabilities
Zero-copy cloning - Instant data cloning for testing
Time Travel - Query historical data (up to 90 days)
Micro-partitions - Self-optimizing data organization
Common use cases:
Multi-cloud analytics workloads
Variable workload patterns (auto-suspend/resume)
Multi-tenant benchmarking environments
Enterprise-scale data warehousing
Testing with per-second billing
Quick Start¶
Basic Configuration¶
from benchbox.tpch import TPCH
from benchbox.platforms.snowflake import SnowflakeAdapter
# Connect to Snowflake
adapter = SnowflakeAdapter(
account="xy12345.us-east-1",
username="benchbox_user",
password="secure_password_123",
warehouse="COMPUTE_WH",
database="BENCHBOX",
schema="PUBLIC"
)
# Run benchmark
benchmark = TPCH(scale_factor=1.0)
results = benchmark.run_with_platform(adapter)
print(f"Completed in {results.total_execution_time:.2f}s")
API Reference¶
SnowflakeAdapter Class¶
- class SnowflakeAdapter(**config)[source]¶
Bases: PlatformAdapter
Snowflake platform adapter with cloud data warehouse optimizations.
- __init__(**config)[source]¶
Initialize the platform adapter with configuration.
- Parameters:
**config – Platform-specific configuration options
- property platform_name: str¶
Return the name of this database platform.
Default implementation returns the class name. Concrete adapters may override to provide a user-friendly display name. Tests may instantiate lightweight mock adapters without overriding this property.
- get_platform_info(connection=None)[source]¶
Get Snowflake platform information.
Captures comprehensive Snowflake configuration including:
- Snowflake version
- Warehouse size and auto-suspend/resume settings
- Multi-cluster warehouse configuration
- Cloud provider and region
- Account edition (best effort)
Gracefully degrades if permissions are insufficient for metadata queries.
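A minimal usage sketch; the exact keys of the returned structure are not documented here, so the example simply prints it:
conn = adapter.create_connection()
info = adapter.get_platform_info(conn)
print(info)  # e.g. version, warehouse settings, cloud provider/region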
- check_server_database_exists(**connection_config)[source]¶
Check if database exists in Snowflake account.
Also checks for existing schemas and tables, since they may exist from a previous run even if the database doesn’t formally exist at account level.
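Illustrative sketch only; the keyword arguments and the boolean return shown here are assumptions based on the adapter configuration above:
exists = adapter.check_server_database_exists(
    account="xy12345.us-east-1",
    username="benchbox_user",
    password="secure_password_123",
    database="BENCHBOX"
)
if not exists:
    print("BENCHBOX not found; it will be created before loading")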
- load_data(benchmark, connection, data_dir)[source]¶
Load data using Snowflake PUT and COPY INTO commands.
- configure_for_benchmark(connection, benchmark_type)[source]¶
Apply Snowflake-specific optimizations based on benchmark type.
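A hedged example; the "tpch" value passed for benchmark_type is an assumption about the accepted identifiers:
conn = adapter.create_connection()
adapter.configure_for_benchmark(conn, benchmark_type="tpch")  # assumed identifier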
- validate_session_cache_control(connection)[source]¶
Validate that session-level cache control settings were successfully applied.
- Parameters:
connection (Any) – Active Snowflake database connection
- Returns:
A dict with the following keys:
validated: bool - Whether validation passed
cache_disabled: bool - Whether cache is actually disabled
settings: dict - Actual session settings
warnings: list[str] - Any validation warnings
errors: list[str] - Any validation errors
- Return type:
dict
- Raises:
ConfigurationError – If cache control validation fails and strict_validation=True
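A short sketch using the documented return keys:
conn = adapter.create_connection()
status = adapter.validate_session_cache_control(conn)
if not status["cache_disabled"]:
    print("Result cache still enabled:", status["warnings"])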
- execute_query(connection, query, query_id, benchmark_type=None, scale_factor=None, validate_row_count=True, stream_id=None)[source]¶
Execute query with detailed timing and performance tracking.
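Illustrative call following the signature above; the benchmark_type value and the shape of the returned result object are assumptions:
conn = adapter.create_connection()
result = adapter.execute_query(
    conn,
    query="SELECT COUNT(*) FROM lineitem",
    query_id="q_count_lineitem",
    benchmark_type="tpch",   # assumed identifier
    validate_row_count=False
)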
- analyze_table(connection, table_name)[source]¶
Trigger table analysis for better query optimization.
- supports_tuning_type(tuning_type)[source]¶
Check if Snowflake supports a specific tuning type.
Snowflake supports:
- CLUSTERING: via the CLUSTER BY clause and automatic clustering
- PARTITIONING: via micro-partitions (automatic) and manual clustering keys
- Parameters:
tuning_type – The type of tuning to check support for
- Returns:
True if the tuning type is supported by Snowflake
- Return type:
bool
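A hedged sketch; whether tuning_type is passed as a string or an enum member is an assumption:
if adapter.supports_tuning_type("CLUSTERING"):  # assumed string form
    print("Clustering keys can be applied on Snowflake")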
- generate_tuning_clause(table_tuning)[source]¶
Generate Snowflake-specific tuning clauses for CREATE TABLE statements.
Snowflake supports:
- CLUSTER BY (column1, column2, …) for clustering keys
- Micro-partitions, which are automatic based on ingestion order and clustering
- Parameters:
table_tuning – The tuning configuration for the table
- Returns:
SQL clause string to be appended to CREATE TABLE statement
- Return type:
str
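Sketch of how the generated clause might be used; the table_tuning object and the exact clause text are assumptions based on the CLUSTER BY form described above:
clause = adapter.generate_tuning_clause(table_tuning)  # table_tuning defined elsewhere
create_sql = f"CREATE TABLE orders (o_orderkey NUMBER, o_orderdate DATE) {clause}"
# clause might look like "CLUSTER BY (o_orderdate, o_orderkey)"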
- apply_table_tunings(table_tuning, connection)[source]¶
Apply tuning configurations to a Snowflake table.
Snowflake tuning approach:
- CLUSTERING: handled via CLUSTER BY in CREATE TABLE or ALTER TABLE
- PARTITIONING: automatic micro-partitions with optional clustering keys
- Automatic clustering can be enabled for ongoing maintenance
- Parameters:
table_tuning – The tuning configuration to apply
connection (Any) – Snowflake connection
- Raises:
ValueError – If the tuning configuration is invalid for Snowflake
- apply_unified_tuning(unified_config, connection)[source]¶
Apply unified tuning configuration to Snowflake.
- Parameters:
unified_config (UnifiedTuningConfiguration) – Unified tuning configuration to apply
connection (Any) – Snowflake connection
- apply_platform_optimizations(platform_config, connection)[source]¶
Apply Snowflake-specific platform optimizations.
Snowflake optimizations include:
- Warehouse scaling and multi-cluster configuration
- Query acceleration service settings
- Result set caching configuration
- Session-level optimization parameters
- Parameters:
platform_config (PlatformOptimizationConfiguration) – Platform optimization configuration
connection (Any) – Snowflake connection
- apply_constraint_configuration(primary_key_config, foreign_key_config, connection)[source]¶
Apply constraint configurations to Snowflake.
Note: Snowflake supports PRIMARY KEY and FOREIGN KEY constraints but they are not enforced (informational only). They are used for query optimization and must be applied during table creation time.
- Parameters:
primary_key_config (PrimaryKeyConfiguration) – Primary key constraint configuration
foreign_key_config (ForeignKeyConfiguration) – Foreign key constraint configuration
connection (Any) – Snowflake connection
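For reference, this is what informational (unenforced) constraints look like when declared at table creation time; the table and column names are illustrative:
conn = adapter.create_connection()
cursor = conn.cursor()
cursor.execute("""
    CREATE OR REPLACE TABLE orders_with_keys (
        o_orderkey NUMBER PRIMARY KEY,
        o_custkey NUMBER,
        CONSTRAINT fk_cust FOREIGN KEY (o_custkey) REFERENCES customer (c_custkey)
    )
""")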
Constructor Parameters¶
SnowflakeAdapter(
account: str,
username: str,
password: str,
warehouse: str = "COMPUTE_WH",
database: str = "BENCHBOX",
schema: str = "PUBLIC",
role: Optional[str] = None,
authenticator: str = "snowflake",
private_key_path: Optional[str] = None,
private_key_passphrase: Optional[str] = None,
warehouse_size: str = "MEDIUM",
auto_suspend: int = 300,
auto_resume: bool = True,
multi_cluster_warehouse: bool = False,
query_tag: str = "BenchBox",
timezone: str = "UTC",
file_format: str = "CSV",
compression: str = "AUTO"
)
Parameters:
Connection (Required):
account (str): Snowflake account identifier (e.g., “xy12345.us-east-1”)
username (str): Snowflake username
password (str): User password
warehouse (str): Virtual warehouse name. Default: “COMPUTE_WH”
database (str): Database name. Default: “BENCHBOX”
schema (str): Schema name. Default: “PUBLIC”
Authentication:
role (str, optional): Role to assume for the session
authenticator (str): Authentication method (“snowflake”, “oauth”, etc.). Default: “snowflake”
private_key_path (str, optional): Path to private key for key-pair authentication
private_key_passphrase (str, optional): Passphrase for encrypted private key
Warehouse Configuration:
warehouse_size (str): Warehouse size (X-SMALL, SMALL, MEDIUM, LARGE, X-LARGE, 2X-LARGE, etc.). Default: “MEDIUM”
auto_suspend (int): Auto-suspend timeout in seconds. Default: 300 (5 minutes)
auto_resume (bool): Enable automatic warehouse resume. Default: True
multi_cluster_warehouse (bool): Enable multi-cluster configuration. Default: False
Session Settings:
query_tag (str): Tag for query tracking and monitoring. Default: “BenchBox”
timezone (str): Session timezone. Default: “UTC”
Data Loading:
file_format (str): Default file format. Default: “CSV”
compression (str): Compression type (AUTO, GZIP, BROTLI, ZSTD, etc.). Default: “AUTO”
Configuration Examples¶
Password Authentication¶
adapter = SnowflakeAdapter(
account="xy12345.us-east-1",
username="benchbox_user",
password="secure_password_123",
warehouse="COMPUTE_WH",
database="BENCHBOX"
)
Key-Pair Authentication (Recommended for Production)¶
# Generate key pair
openssl genrsa 2048 | openssl pkcs8 -topk8 -inform PEM -out rsa_key.p8 -nocrypt
# Extract public key
openssl rsa -in rsa_key.p8 -pubout -out rsa_key.pub
# In Snowflake, assign public key to user
ALTER USER benchbox_user SET RSA_PUBLIC_KEY='MIIBIjANBgkqh...';
adapter = SnowflakeAdapter(
account="xy12345.us-east-1",
username="benchbox_user",
password="", # Not needed with key-pair
private_key_path="/path/to/rsa_key.p8",
warehouse="COMPUTE_WH",
database="BENCHBOX"
)
OAuth Authentication¶
adapter = SnowflakeAdapter(
account="xy12345.us-east-1",
username="benchbox_user",
password="oauth_token_here",
authenticator="oauth",
warehouse="COMPUTE_WH",
database="BENCHBOX"
)
Warehouse Sizing¶
# Small for development (1 credit/hour)
adapter = SnowflakeAdapter(
account="xy12345.us-east-1",
username="user",
password="password",
warehouse="DEV_WH",
warehouse_size="X-SMALL"
)
# Large for production (8 credits/hour)
adapter = SnowflakeAdapter(
account="xy12345.us-east-1",
username="user",
password="password",
warehouse="PROD_WH",
warehouse_size="LARGE"
)
# 4X-Large for heavy workloads (128 credits/hour)
adapter = SnowflakeAdapter(
account="xy12345.us-east-1",
username="user",
password="password",
warehouse="HEAVY_WH",
warehouse_size="4X-LARGE"
)
Multi-Cluster Warehouse¶
# Auto-scale for concurrent workloads
adapter = SnowflakeAdapter(
account="xy12345.us-east-1",
username="user",
password="password",
warehouse="MULTI_CLUSTER_WH",
warehouse_size="LARGE",
multi_cluster_warehouse=True,
auto_suspend=60, # Suspend after 1 minute idle
auto_resume=True
)
Data Loading¶
PUT and COPY INTO (Recommended)¶
Snowflake uses internal stages for efficient data loading:
from benchbox.platforms.snowflake import SnowflakeAdapter
from benchbox.tpch import TPCH
from pathlib import Path
adapter = SnowflakeAdapter(
account="xy12345.us-east-1",
username="user",
password="password",
warehouse="LOAD_WH",
database="BENCHBOX"
)
# Generate data locally
benchmark = TPCH(scale_factor=1.0)
data_dir = Path("./tpch_data")
benchmark.generate_data(data_dir)
# Load data (automatically uses PUT + COPY INTO)
conn = adapter.create_connection()
table_stats, load_time = adapter.load_data(benchmark, conn, data_dir)
# Data uploaded to internal stage, then bulk loaded
print(f"Loaded {sum(table_stats.values()):,} rows in {load_time:.2f}s")
External Stage (S3/GCS/Azure)¶
# Create external stage
conn = adapter.create_connection()
cursor = conn.cursor()
# S3 external stage
cursor.execute("""
CREATE OR REPLACE STAGE benchbox_stage
URL = 's3://my-bucket/benchbox-data/'
CREDENTIALS = (
AWS_KEY_ID = 'AKIAIOSFODNN7EXAMPLE'
AWS_SECRET_KEY = 'wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY'
)
""")
# Load from external stage
cursor.execute("""
COPY INTO lineitem
FROM @benchbox_stage/lineitem.tbl
FILE_FORMAT = (
TYPE = 'CSV'
FIELD_DELIMITER = '|'
SKIP_HEADER = 0
)
""")
Compressed Data¶
# Snowflake automatically handles compressed files
adapter = SnowflakeAdapter(
account="xy12345.us-east-1",
username="user",
password="password",
compression="GZIP" # GZIP, BROTLI, ZSTD, etc.
)
# GZIP files automatically decompressed during COPY INTO
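A hedged sketch of loading a pre-compressed file through a table stage manually; the local path and file name are illustrative:
cursor = conn.cursor()
cursor.execute("PUT file:///data/lineitem.tbl.gz @%lineitem AUTO_COMPRESS = FALSE")
cursor.execute("""
    COPY INTO lineitem
    FROM @%lineitem/lineitem.tbl.gz
    FILE_FORMAT = (TYPE = 'CSV' FIELD_DELIMITER = '|' COMPRESSION = 'GZIP')
""")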
Query Execution¶
Basic Query Execution¶
adapter = SnowflakeAdapter(account="...", username="...", password="...")
conn = adapter.create_connection()
# Execute SQL query
cursor = conn.cursor()
cursor.execute("""
SELECT
l_returnflag,
l_linestatus,
sum(l_quantity) as sum_qty,
count(*) as count_order
FROM lineitem
WHERE l_shipdate <= '1998-09-01'
GROUP BY l_returnflag, l_linestatus
ORDER BY l_returnflag, l_linestatus
""")
results = cursor.fetchall()
for row in results:
print(row)
Query Statistics¶
# Execute with query tag for tracking
cursor.execute("ALTER SESSION SET QUERY_TAG = 'benchmark_q1'")
cursor.execute(query)
results = cursor.fetchall()
# Get query history with performance metrics
cursor.execute("""
SELECT
QUERY_ID,
QUERY_TEXT,
TOTAL_ELAPSED_TIME,
EXECUTION_TIME,
COMPILATION_TIME,
BYTES_SCANNED,
ROWS_PRODUCED,
CREDITS_USED_CLOUD_SERVICES,
WAREHOUSE_SIZE
FROM TABLE(INFORMATION_SCHEMA.QUERY_HISTORY())
WHERE QUERY_TAG = 'benchmark_q1'
ORDER BY START_TIME DESC
LIMIT 1
""")
stats = cursor.fetchone()
print(f"Execution time: {stats[3]}ms")
print(f"Bytes scanned: {stats[5]:,}")
print(f"Credits used: {stats[7]}")
Query Plans¶
# Get query execution plan
cursor.execute("""
EXPLAIN
SELECT * FROM lineitem
WHERE l_shipdate > '1995-01-01'
""")
plan = cursor.fetchall()
for step in plan:
print(step[0])
Advanced Features¶
Clustering¶
# Create table with clustering key
cursor.execute("""
CREATE OR REPLACE TABLE orders_clustered (
o_orderkey NUMBER,
o_custkey NUMBER,
o_orderstatus STRING,
o_totalprice NUMBER(15,2),
o_orderdate DATE
)
CLUSTER BY (o_orderdate, o_orderkey)
""")
# Snowflake automatically maintains clustering
# Manual recluster (deprecated on current accounts; prefer automatic clustering)
cursor.execute("ALTER TABLE orders_clustered RECLUSTER")
# Resume automatic clustering if it was suspended
cursor.execute("ALTER TABLE orders_clustered RESUME RECLUSTER")
# Check clustering quality
cursor.execute("""
SELECT SYSTEM$CLUSTERING_INFORMATION('orders_clustered')
""")
Time Travel¶
# Query data as of 1 hour ago
cursor.execute("""
SELECT * FROM lineitem
AT(OFFSET => -3600)
WHERE l_shipdate = '1995-01-01'
""")
# Query data at specific timestamp
cursor.execute("""
SELECT * FROM lineitem
AT(TIMESTAMP => '2025-01-01 00:00:00'::TIMESTAMP)
WHERE l_shipdate = '1995-01-01'
""")
# View table changes (before/after)
cursor.execute("""
SELECT * FROM lineitem
BEFORE(STATEMENT => '01a12345-6789-abcd-ef01-234567890abc')
""")
Zero-Copy Cloning¶
# Clone database instantly (no data copy)
cursor.execute("""
CREATE DATABASE benchbox_clone
CLONE benchbox
""")
# Clone table
cursor.execute("""
CREATE TABLE lineitem_clone
CLONE lineitem
""")
# Clone at specific time
cursor.execute("""
CREATE TABLE lineitem_yesterday
CLONE lineitem
AT(OFFSET => -86400)  -- 24 hours ago
""")
Result Set Caching¶
# Enable result caching (default)
cursor.execute("ALTER SESSION SET USE_CACHED_RESULT = TRUE")
# First execution computes result
cursor.execute("SELECT COUNT(*) FROM lineitem")
result1 = cursor.fetchone() # Executes query
# Second execution uses cached result (instant, no credits)
cursor.execute("SELECT COUNT(*) FROM lineitem")
result2 = cursor.fetchone() # Returns cached result
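For benchmarking you typically want the opposite: disable the result cache so every run recomputes the query (this is what the adapter's session cache control targets):
cursor.execute("ALTER SESSION SET USE_CACHED_RESULT = FALSE")
cursor.execute("SELECT COUNT(*) FROM lineitem")
result3 = cursor.fetchone()  # Recomputed, not served from the result cache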
Best Practices¶
Warehouse Management¶
Right-size warehouses for workload:
# Development: X-SMALL to SMALL
# Testing: MEDIUM to LARGE
# Production: LARGE to 4X-LARGE
adapter = SnowflakeAdapter(
    warehouse_size="MEDIUM",  # Balance of cost and performance
    auto_suspend=300,         # Suspend after 5 min idle
    auto_resume=True          # Auto-resume on query
)
Use separate warehouses for different workloads:
# Loading warehouse
load_adapter = SnowflakeAdapter(warehouse="LOAD_WH", warehouse_size="LARGE")
# Query warehouse
query_adapter = SnowflakeAdapter(warehouse="QUERY_WH", warehouse_size="MEDIUM")
Enable multi-cluster for concurrent workloads:
adapter = SnowflakeAdapter( warehouse="CONCURRENT_WH", multi_cluster_warehouse=True )
Cost Optimization¶
Suspend idle warehouses:
adapter = SnowflakeAdapter(
    auto_suspend=60,  # Aggressive suspension (1 minute)
    auto_resume=True
)
Use result caching:
# Enabled by default - reuses results for identical queries
cursor.execute("ALTER SESSION SET USE_CACHED_RESULT = TRUE")
Start small, scale up as needed:
# Start with smallest warehouse
adapter = SnowflakeAdapter(warehouse_size="X-SMALL")
# Monitor and resize if needed
cursor.execute(f"ALTER WAREHOUSE {warehouse} SET WAREHOUSE_SIZE = 'MEDIUM'")
Monitor credit usage:
# Check warehouse credit usage
cursor.execute("""
    SELECT
        WAREHOUSE_NAME,
        SUM(CREDITS_USED) as total_credits,
        SUM(CREDITS_USED_COMPUTE) as compute_credits,
        SUM(CREDITS_USED_CLOUD_SERVICES) as cloud_services_credits
    FROM SNOWFLAKE.ACCOUNT_USAGE.WAREHOUSE_METERING_HISTORY
    WHERE START_TIME >= DATEADD('day', -7, CURRENT_TIMESTAMP())
    GROUP BY WAREHOUSE_NAME
    ORDER BY total_credits DESC
""")
Data Organization¶
Use clustering keys for filtered columns:
CREATE TABLE lineitem (...)
CLUSTER BY (l_shipdate, l_orderkey)
Partition large tables by date:
-- Snowflake automatically creates micro-partitions;
-- clustering by a date expression provides similar benefits
CLUSTER BY (DATE_TRUNC('month', order_date))
Analyze clustering quality:
cursor.execute(""" SELECT SYSTEM$CLUSTERING_INFORMATION('lineitem') """) # Recluster if quality degrades if clustering_depth > 10: cursor.execute("ALTER TABLE lineitem RECLUSTER")
Common Issues¶
Warehouse Not Running¶
Problem: “Warehouse is suspended” error
Solutions:
# 1. Enable auto-resume
adapter = SnowflakeAdapter(
auto_resume=True # Warehouse starts automatically
)
# 2. Manually resume warehouse
cursor.execute(f"ALTER WAREHOUSE {warehouse} RESUME")
# 3. Check warehouse status
cursor.execute(f"SHOW WAREHOUSES LIKE '{warehouse}'")
status = cursor.fetchall()
print(f"Warehouse state: {status[0][1]}")
Authentication Failed¶
Problem: “Incorrect username or password” error
Solutions:
# 1. Verify account identifier format
# Correct: "xy12345.us-east-1" or "xy12345.us-east-1.aws"
# Incorrect: "https://xy12345.snowflakecomputing.com"
# 2. Check username (case-insensitive but must exist)
# In Snowflake UI: SHOW USERS;
# 3. Use key-pair auth for better security
adapter = SnowflakeAdapter(
account="xy12345.us-east-1",
username="benchbox_user",
password="",
private_key_path="/path/to/key.p8"
)
Insufficient Privileges¶
Problem: “Insufficient privileges” error
Solutions:
-- Grant required privileges in Snowflake (run as an admin role)
GRANT USAGE ON WAREHOUSE COMPUTE_WH TO ROLE benchbox_role;
GRANT USAGE ON DATABASE BENCHBOX TO ROLE benchbox_role;
GRANT CREATE SCHEMA ON DATABASE BENCHBOX TO ROLE benchbox_role;
GRANT USAGE ON SCHEMA BENCHBOX.PUBLIC TO ROLE benchbox_role;
GRANT CREATE TABLE ON SCHEMA BENCHBOX.PUBLIC TO ROLE benchbox_role;
# Specify role with sufficient privileges
adapter = SnowflakeAdapter(
account="xy12345.us-east-1",
username="user",
password="password",
role="BENCHBOX_ROLE" # Role with required privileges
)
High Costs¶
Problem: Unexpected credit consumption
Solutions:
# 1. Check query history for expensive queries
cursor.execute("""
SELECT
QUERY_TEXT,
TOTAL_ELAPSED_TIME,
BYTES_SCANNED,
CREDITS_USED_CLOUD_SERVICES
FROM TABLE(INFORMATION_SCHEMA.QUERY_HISTORY())
WHERE START_TIME >= DATEADD('hour', -24, CURRENT_TIMESTAMP())
ORDER BY CREDITS_USED_CLOUD_SERVICES DESC
LIMIT 10
""")
# 2. Use smaller warehouse
adapter = SnowflakeAdapter(warehouse_size="X-SMALL")
# 3. Enable aggressive auto-suspend
adapter = SnowflakeAdapter(auto_suspend=60) # 1 minute
# 4. Set resource monitors
cursor.execute("""
CREATE RESOURCE MONITOR daily_limit WITH CREDIT_QUOTA = 100
TRIGGERS ON 75 PERCENT DO NOTIFY
ON 100 PERCENT DO SUSPEND
""")
cursor.execute(f"""
ALTER WAREHOUSE {warehouse} SET RESOURCE_MONITOR = daily_limit
""")
Slow Query Performance¶
Problem: Queries slower than expected
Solutions:
# 1. Resize warehouse
cursor.execute(f"""
ALTER WAREHOUSE {warehouse} SET WAREHOUSE_SIZE = 'LARGE'
""")
# 2. Check clustering quality
cursor.execute("""
SELECT SYSTEM$CLUSTERING_INFORMATION('lineitem')
""")
# 3. Add clustering keys
cursor.execute("""
ALTER TABLE lineitem CLUSTER BY (l_shipdate, l_orderkey)
""")
# 4. Enable automatic clustering
cursor.execute("ALTER TABLE lineitem RESUME RECLUSTER")
# 5. Check query profile
# In Snowflake UI: Query History → Click query → View Profile
See Also¶
Platform Documentation¶
Platform Selection Guide - Choosing Snowflake vs other platforms
Multi-Platform Database Support - Quick setup for all platforms
Platform Comparison Matrix - Feature comparison
Benchmark Guides¶
TPC-H Benchmark - TPC-H on Snowflake
TPC-DS Benchmark - TPC-DS on Snowflake
API Reference¶
DuckDB Platform Adapter - DuckDB adapter
ClickHouse Platform Adapter - ClickHouse adapter
Databricks Platform Adapter - Databricks adapter
BigQuery Platform Adapter - BigQuery adapter
Base Benchmark API - Base benchmark interface
Python API Reference - Python API overview
External Resources¶
Snowflake Documentation - Official Snowflake docs
Warehouse Sizing - Sizing guidance
Clustering Keys - Clustering best practices
Cost Optimization - Cost management
Time Travel - Time Travel guide