Apache Spark Platform¶
Apache Spark is one of the most widely deployed distributed computing frameworks for large-scale data processing. It provides a unified analytics engine that supports batch, streaming, and machine learning workloads.
Features¶
- Distributed execution - Scale across thousands of nodes
- Multiple deployment modes - Local, standalone, YARN, Kubernetes
- Rich format support - Parquet, ORC, CSV, Delta Lake, Iceberg
- Adaptive Query Execution - Dynamic runtime optimization
- Catalyst optimizer - Sophisticated query planning
- DataFrame and SQL APIs - Flexible programming models (see the sketch after this list)
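As a quick taste of those two APIs, here is a minimal standalone PySpark example (not BenchBox-specific):

from pyspark.sql import SparkSession

# Local session using all available cores
spark = SparkSession.builder.master("local[*]").appName("demo").getOrCreate()

# DataFrame API
df = spark.createDataFrame([(1, "a"), (2, "b")], ["id", "label"])
df.filter(df.id > 1).show()

# Equivalent SQL
df.createOrReplaceTempView("t")
spark.sql("SELECT * FROM t WHERE id > 1").show()

spark.stop()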
Installation¶
# Install PySpark
pip install pyspark
# Or pin a specific version
pip install pyspark==3.5.0
# For Delta Lake support
pip install delta-spark
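To confirm the install, check the version from Python:

# Print the installed PySpark version
import pyspark
print(pyspark.__version__)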
Configuration¶
Environment Variables¶
# Spark configuration
export SPARK_HOME=/path/to/spark
export PYSPARK_PYTHON=python3
# For distributed mode
export SPARK_MASTER=spark://master:7077
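How BenchBox consumes these variables isn't shown here; as a sketch, the same values can be read explicitly when building an adapter (the SPARK_MASTER fallback below is an assumption, not a documented BenchBox behavior):

import os
from benchbox.platforms.spark import SparkAdapter

# Fall back to local mode when SPARK_MASTER is unset (assumed convention)
adapter = SparkAdapter(master=os.environ.get("SPARK_MASTER", "local[*]"))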
CLI Options¶
benchbox run --platform spark --benchmark tpch --scale 1.0 \
--platform-option master=local[*] \
--platform-option driver_memory=4g \
--platform-option executor_memory=8g
Platform Options¶
| Option | Default | Description |
|---|---|---|
| `master` | `local[*]` | Spark master URL |
| `app_name` | `BenchBox` | Spark application name |
| `driver_memory` | `4g` | Driver memory allocation |
| `executor_memory` | `8g` | Executor memory allocation |
| `executor_cores` | `4` | Cores per executor |
| `num_executors` | `auto` | Number of executors |
| `warehouse_dir` | `/tmp/spark-warehouse` | Hive warehouse location |
| `catalog_name` | `spark_catalog` | Catalog name |
| `table_format` | `parquet` | Default table format |
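The same options can be passed as keyword arguments to SparkAdapter in the Python API; a sketch, assuming the argument names mirror the CLI option names:

from benchbox.platforms.spark import SparkAdapter

adapter = SparkAdapter(
    master="spark://master:7077",
    app_name="BenchBox",
    driver_memory="4g",
    executor_memory="8g",
    num_executors=4,
    table_format="parquet",
)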
Usage Examples¶
Local Mode¶
# Run locally with all cores
benchbox run --platform spark --benchmark tpch --scale 0.1 \
--platform-option master="local[*]"
Cluster Mode¶
# Run on Spark cluster
benchbox run --platform spark --benchmark tpch --scale 10.0 \
--platform-option master=spark://master:7077 \
--platform-option num_executors=10 \
--platform-option executor_memory=16g
Python API¶
from benchbox import TPCH
from benchbox.platforms.spark import SparkAdapter
# Initialize adapter
adapter = SparkAdapter(
master="local[*]",
driver_memory="4g",
app_name="TPC-H Benchmark",
)
# Load and run benchmark
benchmark = TPCH(scale_factor=1.0)
benchmark.generate_data()
adapter.load_benchmark(benchmark)
results = adapter.run_benchmark(benchmark)
Delta Lake Integration¶
from benchbox.platforms.spark import SparkAdapter
# Initialize with Delta Lake
adapter = SparkAdapter(
master="local[*]",
table_format="delta",
spark_config={
"spark.sql.extensions": "io.delta.sql.DeltaSparkSessionExtension",
"spark.sql.catalog.spark_catalog": "org.apache.spark.sql.delta.catalog.DeltaCatalog",
}
)
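For reference, the same two settings enable Delta Lake on a plain PySpark session; the delta-spark Maven coordinates below are an example and must match your Spark release:

from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .master("local[*]")
    # Standard Delta Lake session settings (same as spark_config above)
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
    # Fetch the Delta jars at startup; pick the version matching your Spark
    .config("spark.jars.packages", "io.delta:delta-spark_2.12:3.1.0")
    .getOrCreate()
)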
Deployment Modes¶
Local Mode¶
Best for development and small datasets:
benchbox run --platform spark --benchmark tpch --scale 0.1 \
--platform-option master="local[4]" # 4 cores
Standalone Cluster¶
# Start master
$SPARK_HOME/sbin/start-master.sh
# Start workers
$SPARK_HOME/sbin/start-worker.sh spark://master:7077
# Run benchmark
benchbox run --platform spark --benchmark tpch --scale 10.0 \
--platform-option master=spark://master:7077
Kubernetes¶
benchbox run --platform spark --benchmark tpch --scale 10.0 \
--platform-option master=k8s://https://kubernetes:443 \
--platform-option deploy_mode=cluster
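Spark on Kubernetes also requires a container image for the driver and executors; a sketch passing it through spark_config (image tag and namespace are illustrative):

from benchbox.platforms.spark import SparkAdapter

adapter = SparkAdapter(
    master="k8s://https://kubernetes:443",
    spark_config={
        # Required on Kubernetes: the image executors run in
        "spark.kubernetes.container.image": "apache/spark:3.5.0",
        "spark.kubernetes.namespace": "default",
    },
)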
Performance Tuning¶
Adaptive Query Execution (AQE)¶
Enable AQE for dynamic optimization:
adapter = SparkAdapter(
spark_config={
"spark.sql.adaptive.enabled": "true",
"spark.sql.adaptive.coalescePartitions.enabled": "true",
"spark.sql.adaptive.skewJoin.enabled": "true",
}
)
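To confirm the flags took effect, read them back from the live session (assuming a SparkSession handle named spark):

# Each returns the string "true" when AQE is active
print(spark.conf.get("spark.sql.adaptive.enabled"))
print(spark.conf.get("spark.sql.adaptive.skewJoin.enabled"))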
Memory Configuration¶
adapter = SparkAdapter(
driver_memory="8g",
executor_memory="16g",
spark_config={
"spark.memory.fraction": "0.8",
"spark.memory.storageFraction": "0.3",
}
)
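To see what these fractions mean: Spark's unified memory model reserves 300 MB of heap, then splits the remainder by spark.memory.fraction; spark.memory.storageFraction is the share of that pool protected for caching. A worked example for the 16g executor above:

# Unified memory for a 16 GB executor heap with the settings above
heap_mb = 16 * 1024
reserved_mb = 300                            # fixed reserved memory
unified_mb = (heap_mb - reserved_mb) * 0.8   # spark.memory.fraction
storage_mb = unified_mb * 0.3                # spark.memory.storageFraction
print(f"unified: {unified_mb:.0f} MB, storage: {storage_mb:.0f} MB")
# unified: 12867 MB, storage: 3860 MB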
Shuffle Optimization¶
adapter = SparkAdapter(
spark_config={
"spark.sql.shuffle.partitions": "200",
"spark.sql.files.maxPartitionBytes": "128m",
}
)
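A rough rule for spark.sql.shuffle.partitions is expected shuffle volume divided by a target partition size in the 128 MB to 1 GB range:

# Sketch: partition count for ~100 GB of shuffle data at ~128 MB each
shuffle_gb = 100
target_mb = 128
print((shuffle_gb * 1024) // target_mb)  # 800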
Benchmark Recommendations¶
Scale Factor Guidelines¶
| Scale Factor | Recommended Mode | Executors | Memory |
|---|---|---|---|
| 0.1 | Local | 1 | 4 GB |
| 1.0 | Local / small cluster | 2-4 | 8 GB each |
| 10.0 | Cluster | 10+ | 16 GB each |
| 100.0 | Large cluster | 50+ | 32 GB each |
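These guidelines map directly onto adapter settings; a sketch (the lookup table is illustrative, not a BenchBox API):

from benchbox.platforms.spark import SparkAdapter

# Illustrative settings per scale factor, following the table above
CONFIGS = {
    0.1: {"master": "local[*]", "driver_memory": "4g"},
    1.0: {"num_executors": 4, "executor_memory": "8g"},
    10.0: {"num_executors": 10, "executor_memory": "16g"},
    100.0: {"num_executors": 50, "executor_memory": "32g"},
}

adapter = SparkAdapter(**CONFIGS[10.0])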
Best Practices¶
- Use columnar formats - Parquet or ORC for best performance
- Enable AQE - Adaptive execution handles skew automatically
- Right-size partitions - 128 MB to 1 GB per partition
- Cache wisely - Only cache frequently accessed data (see the sketch below)
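A minimal caching pattern for the last point, assuming a SparkSession handle named spark and a loaded TPC-H lineitem table:

from pyspark import StorageLevel

# Persist a DataFrame that several queries reuse
lineitem = spark.table("lineitem")
lineitem.persist(StorageLevel.MEMORY_AND_DISK)
lineitem.count()  # materialize the cache

# ... run the queries that reuse it ...

lineitem.unpersist()  # release executor memory when finished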
Query Plan Analysis¶
benchbox run --platform spark --benchmark tpch \
--show-query-plans
Spark provides detailed physical and logical plans:
# Get query plan
spark.sql("SELECT ...").explain(extended=True)
Limitations¶
- Infrastructure overhead - Requires a cluster for large datasets
- Startup time - JVM warm-up affects small benchmarks
- Memory management - Complex tuning for optimal performance
Troubleshooting¶
Out of Memory¶
# Increase memory and split the shuffle across more partitions
adapter = SparkAdapter(
driver_memory="8g",
executor_memory="16g",
spark_config={
"spark.sql.shuffle.partitions": "400",
}
)
Shuffle Spill¶
# Reduce shuffle partition size
adapter = SparkAdapter(
spark_config={
"spark.sql.shuffle.partitions": "400",
"spark.shuffle.spill.compress": "true",
}
)
Connection to Master Failed¶
# Verify master is running
curl http://master:8080 # Web UI
# Check firewall rules
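A quick reachability check for the master's RPC port from Python (hostname taken from the master URL):

import socket

# 7077 is the standalone master RPC port; 8080 serves the web UI
with socket.create_connection(("master", 7077), timeout=5):
    print("master reachable on 7077")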