Apache Spark
Apache Spark is a fast and general-purpose cluster computing system.
Spark Job
Spark Job is a set of scripts that interfaces with Cobalt to automate the job submission process when using Apache Spark. These scripts can be found on Cooley and Theta under /soft/datascience/Spark_Job. If you want to make custom modifications, you can copy or git clone it to your own directory. In the following, we will refer to the installation directory as /PATH/TO/SPARK_JOB.
The guaranteed stable user interface is the file:
/PATH/TO/SPARK_JOB/submit-spark.sh
It is designed to minimize the changes required for deploying Spark applications. For absolute stability, you can use an explicit version number in the path, e.g., /soft/datascience/Spark_Job_v1.0.2.
Usage
submit-spark.sh [options] [JOBFILE [arguments ...]]
JOBFILE (optional) can be:
script.py pyspark scripts
bin.jar java binaries
run-example CLASS run spark example CLASS
scripts other executable scripts (requires `-s`)
Required options:
-A PROJECT Allocation name
-t WALLTIME Max run time in minutes
-n NODES Job node count
-q QUEUE Queue name
Optional options:
-o OUTPUTDIR Directory for COBALT output files (default: current dir)
-s Enable script mode
-m Master uses a separate node
-p <2|3> Python version (default: 3)
-I Start an interactive ssh session
-w WAITTIME Time to wait for prompt in minutes (default: 30)
-h Print this help message
Without specifying a JOBFILE, the script will submit a job via Cobalt, start Spark, and launch an ssh session (Bash) to the master node, with the environment properly set up and the working directory changed to OUTPUTDIR. The Cobalt job will be deleted once you exit the ssh session.
With a JOBFILE and optionally more arguments, the script will submit a job via Cobalt, start Spark, and pass the JOBFILE to $SPARK_HOME/bin/spark-submit automatically, unless -s is given (see below).
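For reference, passing a PySpark JOBFILE has roughly the same effect as running the following on the master node; the exact spark-submit options are assembled by the scripts and may differ from this minimal sketch.
# Illustrative only; the actual options are assembled by the Spark_Job scripts.
$SPARK_HOME/bin/spark-submit --master $SPARK_MASTER_URI script.py arguments ...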
The required options -A, -t, -n, and -q correspond to their counterparts for qsub (part of Cobalt) and will be passed to qsub; see the manual page of qsub for details.
The option -o OUTPUTDIR instructs the script to use OUTPUTDIR to save all the files. By default, Cobalt will save the files $COBALT_JOBID.{cobaltlog,error,output} under this directory.
You can find a list of relevant environment variables in the file $COBALT_JOBID/env. In addition, under this OUTPUTDIR, Spark will use $COBALT_JOBID/conf as SPARK_CONF_DIR, $COBALT_JOBID/logs for logs, and $COBALT_JOBID/workers for Spark temporaries.
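As an illustration, for a job with Cobalt job ID 242842 (the ID used in the examples below), OUTPUTDIR would contain roughly the following; the exact contents may vary.
# Illustrative layout of OUTPUTDIR for COBALT_JOBID=242842
242842.cobaltlog  242842.error  242842.output   # Cobalt output files
242842/env        # relevant environment variables
242842/conf       # used as SPARK_CONF_DIR
242842/logs       # Spark logs
242842/workers    # Spark temporaries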
The option -s instructs the script to run the JOBFILE as a generic executable script, within which you may call or launch YOURFILE, which is a Spark jar file or a PySpark script.
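For example, a script-mode JOBFILE could be a thin Bash wrapper like the sketch below; mycode.py is a placeholder, and SPARK_HOME and SPARK_MASTER_URI are already set by the time the script runs.
#!/usr/bin/env bash
# Hypothetical script-mode (-s) JOBFILE: mycode.py is a placeholder.
# SPARK_HOME and SPARK_MASTER_URI are already set by the Spark_Job scripts.
"$SPARK_HOME/bin/spark-submit" --master "$SPARK_MASTER_URI" mycode.py "$@"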
The option -m instructs the script not to launch Spark executor processes on the master node, so that only the Spark driver runs there. This means that the actual number of nodes used by the executors is one less than the number of nodes requested (by -n).
The option -p <2|3> sets up a default Python environment (Intel Python), either version 2 or 3. If you pass an argument other than 2 or 3, no default Python environment will be set up; in this case, you are responsible for setting one up in env_local.sh if you intend to use PySpark.
The option -I always launches an ssh session to the master node, even if you also pass a JOBFILE. The JOBFILE will start running while the ssh session is established, so that you can inspect the running job. Note that the job will be deleted once you exit the ssh session, even if the JOBFILE is still running.
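For example, the following command (the allocation, queue, and script.py are placeholders) submits script.py and also drops you into an ssh session on the master node while it runs:
# Placeholder allocation, queue, and JOBFILE; adjust for your project.
/PATH/TO/SPARK_JOB/submit-spark.sh -A datascience -t 60 -n 2 -q debug -I script.py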
The option -w WAITTIME instructs the script, if running in interactive mode (no JOBFILE or with -I), to wait at most WAITTIME minutes before it quits and deletes the job.
In addition to the above options, a file env_local.sh, if it exists under the OUTPUTDIR (see Optional options above), will be sourced by the script to set up the environment. The file env_local.sh must be compatible with the Bash installed in the host environment (both login nodes and compute nodes).
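A minimal env_local.sh might look like the sketch below; the Python path is a placeholder, and the file should be safe to source repeatedly since the scripts source it more than once.
# env_local.sh -- sourced (possibly multiple times) by the Spark_Job scripts.
# The path below is a placeholder; point it at the Python you want PySpark to use.
export PYSPARK_PYTHON="/path/to/your/python3"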
Environment Variables
The scripts set a few environment variables for informational purposes and for controlling their behavior.
As an example, the environment variables below show various information taken from the command line, the job scheduler, and the system. Please do not modify these variables unless you know what you are doing.
SPARKJOB_HOST="theta"
SPARKJOB_INTERACTIVE="1"
SPARKJOB_JOBID="242842"
SPARKJOB_PYVERSION="3"
SPARKJOB_SCRIPTMODE="0"
SPARKJOB_SCRIPTS_DIR="/lus/theta-fs0/projects/datascience/xyjin/Spark_Job"
SPARKJOB_SEPARATE_MASTER="0"
SPARKJOB_OUTPUT_DIR="/lus/theta-fs0/projects/datascience/xyjin/Spark_Job/example"
SPARK_MASTER_URI=spark://nid03838:7077
MASTER_HOST=nid03838
User Setup
The file env_theta.sh or env_cooley.sh contains preset configurations for the respective machine. To override these presets, create a file env_local.sh under the OUTPUTDIR, which by default is where you launch submit-spark.sh. Note that the file env_local.sh will be sourced by Bash multiple times. You can change the default output directory, where env_local.sh should reside, with the -o option to submit-spark.sh. Below are some useful customizable variables.
SPARK_HOME="/soft/datascience/apache_spark"
SPARK_CONF_DIR="/lus/theta-fs0/projects/datascience/xyjin/Spark_Job/example/242842/conf"
PYSPARK_PYTHON="/opt/intel/python/2017.0.035/intelpython35/bin/python"
SPARKJOB_WORKING_DIR="/lus/theta-fs0/projects/datascience/xyjin/Spark_Job/example/242842"
SPARKJOB_WORKING_ENVS="/lus/theta-fs0/projects/datascience/xyjin/Spark_Job/example/242842/envs"
SPARKJOB_OUTPUT_DIR="/projects/datascience/xyjin/Spark_Job/example"
The variable SPARKJOB_OUTPUT_DIR contains the directory path, and SPARKJOB_WORKING_DIR and SPARKJOB_WORKING_ENVS depend on SPARKJOB_OUTPUT_DIR. You can set customizable variables in env_local.sh. We provide an example copy of this file under the example directory.
Tuning Parameters
We use env_local.sh to generate spark-defaults.conf for each individual job. Typically, for a Scala job on Theta, you can put the following in the env_local.sh file.
# The created spark-defaults.conf file will only affect spark
# submitted under the current directory where this file resides.
# The parameters here may require tuning depending on the machine and workload.
[[ -s $SPARK_CONF_DIR/spark-defaults.conf ]] ||
cat > "$SPARK_CONF_DIR/spark-defaults.conf" <<'EOF'
spark.task.cpus 4
spark.driver.memory 32g
spark.executor.memory 128g
spark.driver.extraJavaOptions -XX:+UseParallelGC -XX:ParallelGCThreads=8
spark.executor.extraJavaOptions -XX:+UseParallelGC -XX:ParallelGCThreads=8
EOF
Tune These Numbers for Your Workload
spark.task.cpus 4
spark.scheduler.maxRegisteredResourcesWaitingTime 4000s
spark.scheduler.minRegisteredResourcesRatio 1
spark.scheduler.listenerbus.eventqueue.capacity 100000
spark.worker.timeout 24000
spark.executor.heartbeatInterval 4000s
spark.files.fetchTimeout 12000s
spark.network.timeout 24000s
spark.locality.wait 6000s
spark.driver.memory 16g
spark.executor.memory 128g
spark.driver.extraJavaOptions -XX:+UseParallelGC -XX:ParallelGCThreads=8
spark.executor.extraJavaOptions -XX:+UseParallelGC -XX:ParallelGCThreads=8
Other tunings
- Number of partitions for your RDD
- Point spark.local.dir to the local SSD (see the sketch after this list)
- Do not use "Dynamic Allocation" unless you have a strong reason
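As a sketch of the spark.local.dir item above, and assuming the node-local SSD is mounted at /local/scratch (verify the mount point on your system), you could add the setting from env_local.sh in the same way as the spark-defaults.conf snippet above:
# env_local.sh fragment: point Spark's scratch space at the node-local SSD.
# /local/scratch is an assumed mount point; verify it on your system.
grep -qs '^spark\.local\.dir' "$SPARK_CONF_DIR/spark-defaults.conf" ||
    echo 'spark.local.dir /local/scratch' >> "$SPARK_CONF_DIR/spark-defaults.conf"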
Scala Interactive
Start an interactive job: /soft/datascience/Spark_Job/submit-spark.sh -A datascience -t 60 -n 2 -q debug-cache-quad
Launch a scala shell: $SPARK_HOME/bin/spark-shell --master $SPARK_MASTER_URI
In the spawned scala shell, you can enter scala statements as follows:
sc.getExecutorMemoryStatus
(java.net.InetAddress.getLocalHost, Runtime.getRuntime.maxMemory)
sc.parallelize(1 to 10).
map((_, java.net.InetAddress.getLocalHost, Runtime.getRuntime.maxMemory)).
collect
Note that the number of partitions (10, from 1 to 10 above) needs to be larger than the number of worker nodes for the above statement to actually run on all nodes.
Example Submit Commands
$ /PATH/TO/SPARK_JOB/submit-spark.sh -A datascience -t 60 -n 2 -q debug-cache-quad run-example SparkPi
The script will submit a Cobalt job using the datascience allocation, for a maximum wall clock time of 60 minutes, requesting 2 nodes, using the debug-cache-quad queue. The job will run the Scala example SparkPi that comes with the default install of Apache Spark.
$ /PATH/TO/SPARK_JOB/submit-spark.sh -A datascience -t 60 -n 2 -q debug $SPARK_HOME/examples/src/main/python/pi.py 10000
The script will submit a Cobalt job using the datascience allocation, for a maximum wall clock time of 60 minutes, requesting 2 nodes, using the debug queue. The job will run the PySpark example pi.py that comes with the default install of Apache Spark.
$ /PATH/TO/SPARK_JOB/submit-spark.sh -A datascience -t 60 -n 2 -q debug -s SomeExe Args
The script will submit a Cobalt job using the datascience allocation, for a maximum wall clock time of 60 minutes, requesting 2 nodes, using the debug queue. The job will run the executable SomeExe with arguments Args on the compute node that has the Spark master running. Spark-related variables will populate the running environment.
$ /PATH/TO/SPARK_JOB/submit-spark.sh -A datascience -t 60 -n 2 -q debug
The script will submit a Cobalt job using the datascience allocation, for a maximum wall clock time of 60 minutes, requesting 2 nodes, using the debug queue. The job will drop you into a shell environment on the compute node of the Spark master.
Bugs and Limitations
Paths or environment variables containing quotes may break the scripts.
The current JVM on Theta is not aware of NUMA, and we recommend the use of cache mode (--attrs=mcdram=cache).