"""Cloud storage path utilities for BenchBox.
This module provides a minimal abstraction over cloud storage paths using cloudpathlib,
allowing benchmarks to work with cloud storage locations while maintaining simplicity.
Copyright 2026 Joe Harris / BenchBox Project
Licensed under the MIT License. See LICENSE file in the project root for details.
"""
from __future__ import annotations
import logging
import os
from pathlib import Path
from typing import Any, List, Protocol, Union, runtime_checkable
from urllib.parse import urlparse
try:
from cloudpathlib import CloudPath
from cloudpathlib.exceptions import (
CloudPathNotExistsError,
MissingCredentialsError,
)
except ImportError:
CloudPath = None # type: ignore
CloudPathNotExistsError = Exception # type: ignore
MissingCredentialsError = Exception # type: ignore
logger = logging.getLogger(__name__)
class DatabricksPath:
"""Wrapper around a local path that stores Databricks UC Volume target.
This class uses composition instead of inheritance to avoid Path subclassing
issues. It wraps a local temporary directory path and stores the target
dbfs:// path for later upload by the Databricks adapter.
The class implements the os.PathLike protocol and delegates most operations
to the underlying Path object.
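Example (illustrative; the local and dbfs:/ paths below are hypothetical):
>>> import os
>>> dp = DatabricksPath("/tmp/benchbox_stage", "dbfs:/Volumes/main/bench/raw")
>>> os.fspath(dp)
'/tmp/benchbox_stage'
>>> dp.dbfs_target
'dbfs:/Volumes/main/bench/raw'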
"""
def __init__(self, local_path: Union[str, Path], dbfs_target: str):
"""Create a new DatabricksPath instance.
Args:
local_path: Local filesystem path (usually a temp directory)
dbfs_target: Target dbfs:// path for upload
"""
self._path = Path(local_path) if not isinstance(local_path, Path) else local_path
self._dbfs_target = dbfs_target
def __fspath__(self) -> str:
"""Return the file system path (os.PathLike protocol)."""
return str(self._path)
def __str__(self) -> str:
"""String representation returns the local path."""
return str(self._path)
def __repr__(self) -> str:
"""Repr shows both local and target paths."""
return f"DatabricksPath({self._path!r}, dbfs_target={self._dbfs_target!r})"
def __truediv__(self, other: Union[str, Path]) -> Path:
"""Path joining operator - returns regular Path."""
return self._path / other
def __eq__(self, other: object) -> bool:
"""Equality comparison."""
if isinstance(other, DatabricksPath):
return self._path == other._path and self._dbfs_target == other._dbfs_target
elif isinstance(other, (str, Path)):
return self._path == Path(other)
return False
def __hash__(self) -> int:
"""Hash based on local path."""
return hash(self._path)
@property
def dbfs_target(self) -> str:
"""Get the target dbfs:// path for this local directory."""
return self._dbfs_target
# Delegate common Path operations
def exists(self) -> bool:
"""Check if the local path exists."""
return self._path.exists()
def mkdir(self, parents: bool = True, exist_ok: bool = True) -> None:
"""Create the local directory."""
self._path.mkdir(parents=parents, exist_ok=exist_ok)
def is_dir(self) -> bool:
"""Check if the local path is a directory."""
return self._path.is_dir()
def is_file(self) -> bool:
"""Check if the local path is a file."""
return self._path.is_file()
def iterdir(self):
"""Iterate over directory contents."""
return self._path.iterdir()
def glob(self, pattern: str):
"""Glob for files matching pattern."""
return self._path.glob(pattern)
@property
def name(self) -> str:
"""Get the final path component."""
return self._path.name
@property
def parent(self) -> Path:
"""Get the parent directory."""
return self._path.parent
@property
def parts(self) -> tuple:
"""Get path components."""
return self._path.parts
def as_posix(self) -> str:
"""Return the path as a POSIX string."""
return self._path.as_posix()
def resolve(self, strict: bool = False) -> Path:
"""Resolve to absolute path."""
return self._path.resolve(strict=strict)
class CloudStagingPath:
"""Universal wrapper for persistent local staging + cloud target pattern.
This class provides a consistent approach for all cloud storage providers
(GCS, S3, Azure, etc.) by maintaining:
- local_path: Persistent local directory for data generation and caching
- cloud_target: Target cloud URI for upload (gs://, s3://, etc.)
The class implements the os.PathLike protocol and delegates filesystem
operations to the local path, allowing generators to work with local files
while the platform adapter handles uploads to the cloud target.
This eliminates the need for temporary directories and enables data caching
between runs, significantly improving performance.
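Example (illustrative; the staging directory and bucket names are hypothetical):
>>> sp = CloudStagingPath("/var/cache/benchbox/tpch", "gs://my-bucket/tpch")
>>> str(sp)
'/var/cache/benchbox/tpch'
>>> sp.cloud_target
'gs://my-bucket/tpch'
>>> (sp / "lineitem.parquet").name
'lineitem.parquet'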
"""
def __init__(self, local_path: Union[str, Path], cloud_target: str):
"""Create a new CloudStagingPath instance.
Args:
local_path: Persistent local filesystem path for staging/caching
cloud_target: Target cloud URI (gs://, s3://, etc.) for upload
"""
self._path = Path(local_path) if not isinstance(local_path, Path) else local_path
self._cloud_target = cloud_target
def __fspath__(self) -> str:
"""Return the file system path (os.PathLike protocol)."""
return str(self._path)
def __str__(self) -> str:
"""String representation returns the local path."""
return str(self._path)
def __repr__(self) -> str:
"""Repr shows both local and cloud paths."""
return f"CloudStagingPath({self._path!r}, cloud_target={self._cloud_target!r})"
def __truediv__(self, other: Union[str, Path]) -> Path:
"""Path joining operator - returns regular Path."""
return self._path / other
def __eq__(self, other: object) -> bool:
"""Equality comparison."""
if isinstance(other, CloudStagingPath):
return self._path == other._path and self._cloud_target == other._cloud_target
elif isinstance(other, (str, Path)):
return self._path == Path(other)
return False
def __hash__(self) -> int:
"""Hash based on local path."""
return hash(self._path)
@property
def cloud_target(self) -> str:
"""Get the target cloud URI for this staging directory."""
return self._cloud_target
# Delegate common Path operations to local path
def exists(self) -> bool:
"""Check if the local path exists."""
return self._path.exists()
def mkdir(self, parents: bool = True, exist_ok: bool = True) -> None:
"""Create the local directory."""
self._path.mkdir(parents=parents, exist_ok=exist_ok)
def is_dir(self) -> bool:
"""Check if the local path is a directory."""
return self._path.is_dir()
def is_file(self) -> bool:
"""Check if the local path is a file."""
return self._path.is_file()
def iterdir(self):
"""Iterate over directory contents."""
return self._path.iterdir()
def glob(self, pattern: str):
"""Glob for files matching pattern."""
return self._path.glob(pattern)
@property
def name(self) -> str:
"""Get the final path component."""
return self._path.name
@property
def parent(self) -> Path:
"""Get the parent directory."""
return self._path.parent
@property
def parts(self) -> tuple:
"""Get path components."""
return self._path.parts
def as_posix(self) -> str:
"""Return the path as a POSIX string."""
return self._path.as_posix()
def resolve(self, strict: bool = False) -> Path:
"""Resolve to absolute path."""
return self._path.resolve(strict=strict)
@runtime_checkable
class RemoteFileSystemAdapter(Protocol):
"""Minimal adapter interface for remote file operations used by validation.
Implementations should operate on absolute remote paths, including scheme
(e.g., dbfs:/Volumes/.../file). Paths are treated as opaque strings.
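Example (illustrative; ``NullAdapter`` is a hypothetical stand-in that only
shows structural conformance is enough for ``isinstance`` checks):
>>> class NullAdapter:
...     def file_exists(self, remote_path: str) -> bool: return False
...     def read_file(self, remote_path: str) -> bytes: return b""
...     def write_file(self, remote_path: str, content: bytes) -> None: pass
...     def list_files(self, remote_path: str, pattern: str = "*") -> list[str]: return []
>>> isinstance(NullAdapter(), RemoteFileSystemAdapter)
True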
"""
def file_exists(self, remote_path: str) -> bool: # pragma: no cover - interface
...
def read_file(self, remote_path: str) -> bytes: # pragma: no cover - interface
...
def write_file(self, remote_path: str, content: bytes) -> None: # pragma: no cover - interface
...
def list_files(self, remote_path: str, pattern: str = "*") -> list[str]: # pragma: no cover - interface
...
class DatabricksVolumeAdapter:
"""RemoteFileSystemAdapter implementation for Databricks UC Volumes (dbfs:/Volumes/...)."""
def __init__(self, workspace_client: Any | None = None, *, host: str | None = None, token: str | None = None):
try:
from databricks.sdk import WorkspaceClient # type: ignore
except Exception as e: # pragma: no cover - optional dependency
raise ImportError(
"databricks-sdk required for DatabricksVolumeAdapter. Install with: uv add databricks-sdk"
) from e
if workspace_client is not None:
self._ws = workspace_client
else:
# Let SDK auto-configure if host/token are None
self._ws = WorkspaceClient(host=(f"https://{host}" if host else None), token=token)
def _to_ws_path(self, remote_path: str) -> str:
# Strip the dbfs: scheme prefix only: dbfs:/Volumes/... -> /Volumes/...
return remote_path.removeprefix("dbfs:")
def file_exists(self, remote_path: str) -> bool:
path = self._to_ws_path(remote_path)
try:
info = self._ws.files.get(path) # type: ignore[attr-defined]
return bool(info)
except Exception:
# get() may raise if not found
return False
def read_file(self, remote_path: str) -> bytes:
path = self._to_ws_path(remote_path)
try:
data = self._ws.files.download(path) # type: ignore[attr-defined]
# Some SDKs return bytes directly, others return a stream-like object
if hasattr(data, "read"):
return data.read()
return bytes(data)
except Exception as e: # pragma: no cover - provider specific
raise RuntimeError(f"Failed to read remote file: {remote_path}: {e}")
def write_file(self, remote_path: str, content: bytes) -> None:
path = self._to_ws_path(remote_path)
try:
from io import BytesIO
self._ws.files.upload(path, BytesIO(content), overwrite=True) # type: ignore[attr-defined]
except Exception as e: # pragma: no cover - provider specific
raise RuntimeError(f"Failed to write remote file: {remote_path}: {e}")
def list_files(self, remote_path: str, pattern: str = "*") -> list[str]:
path = self._to_ws_path(remote_path)
try:
items = self._ws.files.list(path) # type: ignore[attr-defined]
names: List[str] = []
for it in items or []:
# Item may be dict or object; extract path/name best-effort
p = getattr(it, "path", None) or getattr(it, "file_path", None) or str(it)
names.append(p)
# Best-effort filter on the file name using fnmatch-style glob patterns
import fnmatch
return [n for n in names if fnmatch.fnmatch(n.split("/")[-1], pattern)]
except Exception: # pragma: no cover - provider specific
return []
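# Example usage (illustrative; the UC Volume path is hypothetical and the calls
# require databricks-sdk credentials, e.g. DATABRICKS_HOST and DATABRICKS_TOKEN):
#   adapter = DatabricksVolumeAdapter()
#   adapter.write_file("dbfs:/Volumes/main/bench/raw/data.csv", b"a,b\n1,2\n")
#   if adapter.file_exists("dbfs:/Volumes/main/bench/raw/data.csv"):
#       data = adapter.read_file("dbfs:/Volumes/main/bench/raw/data.csv")
#   files = adapter.list_files("dbfs:/Volumes/main/bench/raw", pattern="*.csv")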
def is_cloud_path(path: Union[str, Path]) -> bool:
"""Check if a path is a cloud storage path.
Includes dbfs:// paths (Databricks File System / Unity Catalog Volumes)
which require special handling via the Databricks Files API.
Args:
path: Path to check
Returns:
True if path is a cloud storage path (s3://, gs://, abfss://, dbfs://, etc.)
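Example (bucket and volume names are illustrative):
>>> is_cloud_path("s3://my-bucket/tpch/")
True
>>> is_cloud_path("dbfs:/Volumes/main/bench/raw")
True
>>> is_cloud_path("/tmp/benchbox/output")
False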
"""
# Convert Path objects (including CloudPath from cloudpathlib) to strings
if not isinstance(path, str):
path = str(path)
parsed = urlparse(path)
return parsed.scheme in ["s3", "gs", "gcs", "az", "abfss", "azure", "dbfs"]
def is_databricks_path(path: Union[str, Path]) -> bool:
"""Check if a path is a Databricks DBFS or UC Volume path.
Databricks paths use the dbfs:// scheme but are NOT supported by
cloudpathlib. They require special handling via the Databricks Files API.
Args:
path: Path to check
Returns:
True if path is a dbfs:// path
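Example (paths are illustrative):
>>> is_databricks_path("dbfs:/Volumes/main/bench/raw")
True
>>> is_databricks_path("s3://my-bucket/tpch/")
False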
"""
if isinstance(path, Path):
path = str(path)
if not isinstance(path, str):
return False
parsed = urlparse(path)
return parsed.scheme == "dbfs"
def validate_cloud_path_support() -> bool:
"""Validate that cloud path support is available.
Returns:
True if cloudpathlib is available, False otherwise
"""
return CloudPath is not None
def create_path_handler(path: Union[str, Path]) -> Union[Path, CloudPath, DatabricksPath]:
"""Create appropriate path handler for local or cloud paths.
Note: dbfs:// paths (Databricks UC Volumes) cannot be handled directly by
cloudpathlib. For these paths, we create a local temporary directory for
data generation and store the target dbfs:// path as an attribute. The actual
upload is handled by DatabricksAdapter during the load phase.
Args:
path: Local or cloud storage path (or already-created DatabricksPath/CloudPath)
Returns:
Path object for local paths, CloudPath for cloud paths,
DatabricksPath for dbfs:// paths (either created or passed through)
Raises:
ImportError: If cloud path is provided but cloudpathlib not installed
ValueError: If cloud path format is invalid
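Example (illustrative; the dbfs:/ lines are marked ``# doctest: +SKIP`` because
they create a temporary staging directory as a side effect):
>>> local = create_path_handler("./benchmark_output")
>>> isinstance(local, Path)
True
>>> staged = create_path_handler("dbfs:/Volumes/main/bench/raw")  # doctest: +SKIP
>>> staged.dbfs_target  # doctest: +SKIP
'dbfs:/Volumes/main/bench/raw'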
"""
# If already a DatabricksPath instance, return as-is (avoids double-wrapping)
if isinstance(path, DatabricksPath):
return path
# If already a CloudPath instance, return as-is (avoids double-wrapping)
# Check if CloudPath is actually a class type, not None or a mock
if CloudPath is not None and hasattr(CloudPath, "__mro__") and isinstance(path, CloudPath):
return path
# Handle Databricks paths specially - they require local generation + upload
if is_databricks_path(path):
path_str = str(path)
# Validate dbfs:// path format for UC Volumes
if not path_str.startswith("dbfs:/Volumes/"):
raise ValueError(
f"Invalid dbfs:// path: {path_str}. "
f"Unity Catalog Volumes must use format: dbfs:/Volumes/catalog/schema/volume"
)
# Create temporary directory for local data generation
import tempfile
temp_dir_str = tempfile.mkdtemp(prefix="benchbox_dbfs_")
# Create DatabricksPath that wraps the temp directory and stores the target
databricks_path = DatabricksPath(temp_dir_str, path_str)
logger.info(f"Created temporary directory for dbfs:// path: {databricks_path}")
logger.debug(f"Target UC Volume: {path_str}")
return databricks_path
if not is_cloud_path(path):
return Path(path)
if CloudPath is None:
raise ImportError(
"cloudpathlib is required for cloud storage paths. Install with: uv add benchbox --extra cloudstorage"
)
try:
return CloudPath(str(path)) # type: ignore
except Exception as e:
raise ValueError(f"Invalid cloud path format '{path}': {e}")
def get_remote_fs_adapter(remote_path: str) -> RemoteFileSystemAdapter:
"""Create a RemoteFileSystemAdapter for a remote path.
Currently supports Databricks UC Volumes via dbfs:/ scheme. For other
providers, placeholder implementations can be added in the future.
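Example (illustrative; requires databricks-sdk and workspace credentials, so the
calls are not executed as doctests):
>>> adapter = get_remote_fs_adapter("dbfs:/Volumes/main/bench/raw")  # doctest: +SKIP
>>> adapter.list_files("dbfs:/Volumes/main/bench/raw", pattern="*.parquet")  # doctest: +SKIP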
"""
if is_databricks_path(remote_path):
# Lazy import to avoid enforcing dependency when unused
try:
from databricks.sdk import WorkspaceClient # type: ignore
ws = WorkspaceClient()
return DatabricksVolumeAdapter(ws)
except ImportError as e:
raise ImportError(
"databricks-sdk required for Databricks UC Volume operations. Install with: uv add databricks-sdk"
) from e
except Exception as e:
raise RuntimeError(
f"Failed to initialize Databricks workspace client: {e}. "
"Ensure DATABRICKS_HOST and DATABRICKS_TOKEN are set correctly."
) from e
# Placeholder adapters for future S3/GCS/Azure implementations could go here
raise ValueError(f"No RemoteFileSystemAdapter available for path: {remote_path}")
def validate_cloud_credentials(path: Union[str, Path]) -> dict[str, Any]:
"""Validate cloud credentials for the given path.
Args:
path: Cloud storage path to validate
Returns:
Dictionary with validation results:
- valid: bool indicating if credentials are valid
- provider: string cloud provider (s3, gcs, azure, dbfs)
- error: error message if validation failed
- env_vars: list of environment variables checked
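Example (local and dbfs paths validate without probing any provider):
>>> result = validate_cloud_credentials("/tmp/benchbox/output")
>>> result["valid"], result["provider"]
(True, 'local')
>>> validate_cloud_credentials("dbfs:/Volumes/main/bench/raw")["provider"]
'dbfs'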
"""
# Databricks paths use Databricks access tokens, validated separately by the adapter
if is_databricks_path(path):
return {
"valid": True, # Assume valid - will be checked by Databricks adapter
"provider": "dbfs",
"error": None,
"env_vars": ["DATABRICKS_HOST", "DATABRICKS_HTTP_PATH", "DATABRICKS_TOKEN"],
}
if not is_cloud_path(path):
return {"valid": True, "provider": "local", "error": None, "env_vars": []}
if CloudPath is None:
return {
"valid": False,
"provider": "unknown",
"error": "cloudpathlib not installed",
"env_vars": [],
}
parsed = urlparse(str(path))
provider = parsed.scheme
# Define expected environment variables for each provider
env_checks = {
"s3": ["AWS_ACCESS_KEY_ID", "AWS_SECRET_ACCESS_KEY"],
"gs": ["GOOGLE_APPLICATION_CREDENTIALS"],
"gcs": ["GOOGLE_APPLICATION_CREDENTIALS"],
"abfss": ["AZURE_STORAGE_ACCOUNT_NAME", "AZURE_STORAGE_ACCOUNT_KEY"],
"azure": ["AZURE_STORAGE_ACCOUNT_NAME", "AZURE_STORAGE_ACCOUNT_KEY"],
}
expected_vars = env_checks.get(provider, [])
# For S3, check multiple credential sources (not just env vars)
if provider == "s3":
# Check environment variables
has_env_creds = bool(os.getenv("AWS_ACCESS_KEY_ID") and os.getenv("AWS_SECRET_ACCESS_KEY"))
# Check AWS profile
has_profile = bool(os.getenv("AWS_PROFILE"))
# Check credentials file
credentials_file = Path(os.path.expanduser("~/.aws/credentials"))
has_credentials_file = credentials_file.exists()
# Check config file with default profile
config_file = Path(os.path.expanduser("~/.aws/config"))
has_config_file = config_file.exists()
if not any([has_env_creds, has_profile, has_credentials_file, has_config_file]):
return {
"valid": False,
"provider": provider,
"error": (
"No AWS credentials found. Configure via:\n"
" - aws configure (creates ~/.aws/credentials)\n"
" - AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY env vars\n"
" - AWS_PROFILE env var with named profile"
),
"env_vars": expected_vars,
}
else:
# For other providers, check environment variables
missing_vars = [var for var in expected_vars if not os.getenv(var)]
if missing_vars:
return {
"valid": False,
"provider": provider,
"error": f"Missing environment variables: {', '.join(missing_vars)}",
"env_vars": expected_vars,
}
# Try to create a cloud path to test credentials
try:
cloud_path = CloudPath(str(path)) # type: ignore
# Test basic operations
_ = cloud_path.exists()
return {
"valid": True,
"provider": provider,
"error": None,
"env_vars": expected_vars,
}
except MissingCredentialsError as e:
return {
"valid": False,
"provider": provider,
"error": f"Credential validation failed: {e}",
"env_vars": expected_vars,
}
except Exception as e:
return {
"valid": False,
"provider": provider,
"error": f"Cloud path validation failed: {e}",
"env_vars": expected_vars,
}
def ensure_cloud_directory(
path: Union[str, Path, CloudPath],
) -> Union[Path, CloudPath, DatabricksPath]:
"""Ensure cloud or local directory exists.
Args:
path: Directory path to create
Returns:
Path object (local or cloud)
Raises:
Exception: If directory creation fails
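Example (local directories are created immediately; some cloud providers only
materialize a "directory" on the first file write):
>>> import os, tempfile
>>> target = os.path.join(tempfile.mkdtemp(), "results")
>>> ensure_cloud_directory(target).is_dir()
True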
"""
path_handler = create_path_handler(path) if isinstance(path, (str, Path)) else path
try:
if hasattr(path_handler, "mkdir"):
# Local Path or CloudPath with mkdir
path_handler.mkdir(parents=True, exist_ok=True) # type: ignore
elif hasattr(path_handler, "exists"):
# CloudPath - some providers auto-create directories
# Just check if we can access the path
if not path_handler.exists(): # type: ignore
# For cloud paths, we might need to create a marker file
logger.info(f"Cloud directory will be created on first file write: {path_handler}")
except Exception as e:
logger.error(f"Failed to ensure directory exists: {path_handler} - {e}")
raise
return path_handler
def get_cloud_path_info(path: Union[str, Path]) -> dict[str, Any]:
"""Get information about a cloud path.
Args:
path: Path to analyze
Returns:
Dictionary with path information:
- is_cloud: bool
- provider: string provider name
- bucket: bucket/container name (or None for dbfs)
- path: path within bucket
- credentials_valid: bool
- volume_info: dict with catalog/schema/volume (for dbfs only)
"""
# Handle Databricks paths specially - extract UC Volume components
if is_databricks_path(path):
parsed = urlparse(str(path))
# For dbfs:/Volumes/catalog/schema/volume, extract components
path_parts = parsed.path.lstrip("/").split("/")
volume_info = {}
if len(path_parts) >= 4 and path_parts[0] == "Volumes":
volume_info = {
"catalog": path_parts[1] if len(path_parts) > 1 else None,
"schema": path_parts[2] if len(path_parts) > 2 else None,
"volume": path_parts[3] if len(path_parts) > 3 else None,
}
return {
"is_cloud": True,
"provider": "dbfs",
"bucket": None, # Not applicable for DBFS
"path": parsed.path,
"credentials_valid": True, # Checked by Databricks adapter
"volume_info": volume_info,
}
if not is_cloud_path(path):
return {
"is_cloud": False,
"provider": "local",
"bucket": None,
"path": str(path),
"credentials_valid": True,
}
parsed = urlparse(str(path))
scheme = parsed.scheme
bucket = parsed.netloc
cloud_path = parsed.path.lstrip("/")
# Use scheme directly as provider name for consistency with test expectations
provider = scheme
credential_check = validate_cloud_credentials(path)
return {
"is_cloud": True,
"provider": provider,
"bucket": bucket,
"path": cloud_path,
"credentials_valid": credential_check["valid"],
}
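# Example (illustrative; the bucket name is hypothetical):
#   info = get_cloud_path_info("gs://my-bucket/benchmarks/tpch")
#   info["provider"]  # -> "gs"
#   info["bucket"]    # -> "my-bucket"
#   info["path"]      # -> "benchmarks/tpch"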
class CloudPathAdapter:
"""Adapter to provide unified interface for local and cloud paths."""
def __init__(self, path: Union[str, Path]):
"""Initialize path adapter.
Args:
path: Local or cloud storage path
"""
self.original_path = str(path)
self.is_cloud = is_cloud_path(path)
self.path_handler = create_path_handler(path)
if self.is_cloud:
self.path_info = get_cloud_path_info(path)
else:
self.path_info = {"is_cloud": False, "provider": "local"}
def exists(self) -> bool:
"""Check if path exists."""
try:
return self.path_handler.exists() # type: ignore
except Exception:
return False
def mkdir(self, parents: bool = True, exist_ok: bool = True) -> None:
"""Create directory."""
if hasattr(self.path_handler, "mkdir"):
self.path_handler.mkdir(parents=parents, exist_ok=exist_ok) # type: ignore
def __str__(self) -> str:
"""String representation."""
return str(self.path_handler)
def __truediv__(self, other: str) -> CloudPathAdapter:
"""Path joining operator."""
new_path = str(self.path_handler / other)  # type: ignore
return CloudPathAdapter(new_path)
@property
def name(self) -> str:
"""Get the name of the path."""
return self.path_handler.name # type: ignore
@property
def parent(self) -> CloudPathAdapter:
"""Get the parent directory."""
return CloudPathAdapter(str(self.path_handler.parent)) # type: ignore
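# Example (illustrative; local-only, so no cloud credentials are needed):
#   adapter = CloudPathAdapter("/tmp/benchbox")
#   adapter.is_cloud            # -> False
#   results = adapter / "tpch"  # -> CloudPathAdapter wrapping /tmp/benchbox/tpch
#   results.exists()            # -> False until the directory is created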
class CloudStorageGeneratorMixin:
"""Mixin class to add cloud storage upload functionality to data generators.
This mixin provides a standardized way for all data generators to handle cloud storage
uploads without duplicating code. Generators should inherit from this mixin and call
the cloud upload methods when needed.
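Example (illustrative; ``MyGenerator`` and its single-table output are hypothetical):
>>> class MyGenerator(CloudStorageGeneratorMixin):
...     def generate(self, output_dir):
...         return self._handle_cloud_or_local_generation(
...             output_dir, lambda d: {"lineitem": Path(d) / "lineitem.parquet"}
...         )
>>> MyGenerator()._is_cloud_output("s3://my-bucket/tpch")
True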
"""
def _is_cloud_output(self, output_dir) -> bool:
"""Check if output directory is a cloud path."""
return is_cloud_path(str(output_dir))
def _handle_cloud_or_local_generation(self, output_dir, local_generate_func, verbose: bool = False):
"""Handle both cloud and local generation paths.
With CloudStagingPath and DatabricksPath, both cloud and local paths now work the same way:
- CloudStagingPath/DatabricksPath expose their local cache path via __fspath__()
- Generators see the local path and work normally (validation, generation, caching)
- Platform adapters access the cloud_target property for uploads
This eliminates the need for temporary directories and enables persistent caching.
Args:
output_dir: Output directory (local path, CloudStagingPath, or DatabricksPath)
local_generate_func: Function to generate data locally
verbose: Whether to print verbose output
Returns:
Dictionary mapping table names to file paths
"""
# Both local and cloud paths now use the same code path!
# CloudStagingPath/DatabricksPath implement __fspath__() to return local path
return local_generate_func(output_dir)