Parsl is a parallel programming library for Python. It can be used to run large numbers of tasks in parallel, with complex dependencies between them, on ALCF machines, and it is particularly well suited to high-throughput workflows. While Parsl is a Python library, its tasks can run any compiled application, including MPI applications.
Parsl builds on Python's concurrent futures module: Parsl apps are functions that return future objects. A Parsl workflow operates by creating futures for tasks, which the Parsl executor then fulfills by running the tasks on available compute resources.
A Parsl workflow contains two parts:

- The workflow logic of applications, tasks, and task dependencies
- The configuration of the compute resources that execute the tasks
Here we sketch out some possible configurations for executing workflows on Aurora.
To get Python on Aurora, users can either load the AI frameworks module with `module load frameworks` or the basic Python 3.10 module with `module load python/3.10.13`.
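For example, a Parsl environment matching the `$HOME/_env` path used in the `worker_init` lines of the configs below might be set up like this (the environment location and module choice are illustrative):

```bash
# On an Aurora login node: load a Python module (the AI frameworks module shown here)
module load frameworks

# Create and activate a virtual environment; $HOME/_env matches the worker_init
# used in the example configs below (any path works as long as they agree)
python -m venv $HOME/_env
source $HOME/_env/bin/activate

# Install Parsl into the environment
pip install parsl
```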
When using Parsl to distribute work over many PBS Jobs (first two examples below), your workflow script will be executed on a login node and will not return until all tasks are completed. In this situation, it is advisable to run your script in a screen session on the login node.
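For example (the session name here is illustrative):

```bash
# Start a named screen session on the login node
screen -S parsl_workflow

# Inside the session: activate your environment and launch the workflow script
source $HOME/_env/bin/activate
python my_parsl_workflow.py

# Detach with Ctrl-a d; reattach later with
screen -r parsl_workflow
```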
## Parsl Config for a Large Ensemble of Single-Tile Tasks Run over Many PBS Jobs
A common use case is to run a large ensemble of tasks that each require one GPU tile on Aurora and to spread this workload over multiple PBS Jobs. The reason for spreading this workload over many PBS Jobs may be the size of the ensemble and/or the runtime of the tasks.
The Config object for this case is defined like this:
```python
# config.py
import os

from parsl.config import Config
# PBSPro is the right provider for ALCF:
from parsl.providers import PBSProProvider
# The high throughput executor is for scaling large single core/tile/gpu tasks on HPC systems:
from parsl.executors import HighThroughputExecutor
# Use the MPI launcher to launch worker processes:
from parsl.launchers import MpiExecLauncher

# These options will run work in 1-node batch jobs run one at a time
nodes_per_job = 1
max_num_jobs = 1

tile_names = [f'{gid}.{tid}' for gid in range(6) for tid in range(2)]

# The config will launch workers from this directory
execute_dir = os.getcwd()

aurora_single_tile_config = Config(
    executors=[
        HighThroughputExecutor(
            # Ensures one worker per GPU tile on each node
            available_accelerators=tile_names,
            max_workers_per_node=12,
            # Distributes threads to workers/tiles in a way optimized for Aurora
            cpu_affinity="list:0-7,104-111:8-15,112-119:16-23,120-127:24-31,128-135:32-39,136-143:40-47,144-151:52-59,156-163:60-67,164-171:68-75,172-179:76-83,180-187:84-91,188-195:92-99,196-203",
            # Increase if you have many more tasks than workers
            prefetch_capacity=0,
            # Options that specify properties of PBS Jobs
            provider=PBSProProvider(
                # Project name
                account="Aurora_deployment",
                # Submission queue
                queue="debug",
                # Commands run before workers are launched
                # Make sure to activate your environment where Parsl is installed
                worker_init=f'''source $HOME/_env/bin/activate; cd {execute_dir}''',
                # Wall time for batch jobs
                walltime="0:30:00",
                # Change if data/modules are located on another filesystem
                scheduler_options="#PBS -l filesystems=home:flare",
                # Ensures 1 manager per node; the manager will distribute work to its 12 workers, one per tile
                launcher=MpiExecLauncher(bind_cmd="--cpu-bind", overrides="--ppn 1"),
                # Options added to #PBS -l select aside from ncpus
                select_options="",
                # Number of nodes per PBS job
                nodes_per_block=nodes_per_job,
                # Minimum number of concurrent PBS jobs running the workflow
                min_blocks=0,
                # Maximum number of concurrent PBS jobs running the workflow
                max_blocks=max_num_jobs,
                # Hardware threads per node
                cpus_per_node=208,
            ),
        ),
    ],
    # How many times to retry failed tasks;
    # this is necessary if you have tasks that are interrupted by a PBS job ending,
    # so that they will restart in the next job
    retries=1,
)
```
Import this Config object and use it in a workflow script, e.g.:
```python
# my_parsl_workflow.py
import os

import parsl
from parsl import bash_app, python_app

from config import aurora_single_tile_config

# Bash apps are for executing compiled applications or other shell commands
@bash_app
def hello_affinity(stdout='hello.stdout', stderr='hello.stderr'):
    return f'$HOME/GettingStarted/Examples/Aurora/affinity_gpu/sycl/hello_affinity'

# Python apps are for executing native python functions
@python_app
def hello_world(message, sleep_time=1):
    import time
    time.sleep(sleep_time)
    return f"Hello {message}"

working_directory = os.getcwd()

print("Starting my_parsl_workflow")
with parsl.load(aurora_single_tile_config):
    # Create 12 hello_world tasks
    hello_world_futures = [hello_world(f"Aurora {i}") for i in range(12)]
    print(f"Created {len(hello_world_futures)} hello_world tasks")

    # Create 12 hello_affinity tasks
    hello_affinity_futures = [hello_affinity(stdout=f"{working_directory}/output/hello_{i}.stdout",
                                             stderr=f"{working_directory}/output/hello_{i}.stderr")
                              for i in range(12)]
    print(f"Created {len(hello_affinity_futures)} hello_affinity tasks")

    # This line will block until all hello_world results are returned
    hello_world_results = [tf.result() for tf in hello_world_futures]
    print("hello_world tasks complete")
    print(f"python apps like hello_world return the function result, e.g. {hello_world_results[0]=}")

    # This line will block until all hello_affinity results are returned
    hello_affinity_results = [tf.result() for tf in hello_affinity_futures]
    print("hello_affinity tasks complete")
    print(f"bash apps like hello_affinity return the return code of the executable, e.g. {hello_affinity_results[0]=}")

    print("Read results of hello_affinity from stdout files:")
    for i, tf in enumerate(hello_affinity_futures):
        with open(f"{working_directory}/output/hello_{i}.stdout", "r") as f:
            outputs = f.readlines()
        print(outputs)

print("Tasks done!")
```
Note that a Parsl workflow script must, at some point, block on the results of all tasks it creates in order to ensure that the tasks complete.
When executed, the script will block until all tasks are completed. You may wish to check the scheduler to verify that Parsl has submitted a batch job to execute the tasks.
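For example, from another login-node shell:

```bash
# Check that Parsl has submitted batch jobs on your behalf
qstat -u $USER
```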
## Parsl Config for an Ensemble of Multi-Node MPI Tasks Run over Many PBS Jobs
In the previous example, `mpiexec` was used as a launcher (to start Parsl workers), rather than to launch the tasks themselves. In order to run applications that communicate over MPI, Parsl has to use `mpiexec` in a different way: use the `SimpleLauncher` together with the `MPIExecutor`. Note that the configuration has to set `max_workers_per_block` to align with the resource needs of the application, and that the `MPIExecutor` can only run tasks that use more than one node.
Warning
Ensembles of tasks launched with `mpiexec` on multiple nodes are currently limited to 1000 total tasks run per batch job. This means that when `mpiexec` calls return, the nodes they used can be refilled with new tasks only a limited number of times, rather than an arbitrary number of times as on Polaris. This is due to a known issue with Slingshot and will be fixed in the future. Users running MPI application ensembles on Aurora with Parsl should take this into account when configuring their workflows.
This example Config object can be used to execute MPI tasks that use two nodes each:
```python
import parsl
import os

from parsl.config import Config
# PBSPro is the right provider for ALCF:
from parsl.providers import PBSProProvider
# The MPIExecutor is for running MPI applications:
from parsl.executors import MPIExecutor
# Use the Simple launcher
from parsl.launchers import SimpleLauncher

# These options will run work in 10-node batch jobs run one at a time
nodes_per_task = 2
nodes_per_job = 10
max_num_jobs = 1

# We will save outputs in the current working directory
working_directory = os.getcwd()

mpi_ensemble_config = Config(
    executors=[
        MPIExecutor(
            # This creates 1 worker for each multinode task slot
            max_workers_per_block=nodes_per_job // nodes_per_task,
            provider=PBSProProvider(
                account="Aurora_deployment",
                worker_init=f"""source $HOME/_env/bin/activate; \
cd {working_directory}""",
                walltime="0:30:00",
                queue="lustre_scaling",
                scheduler_options="#PBS -l filesystems=home:flare",
                launcher=SimpleLauncher(),
                select_options="",
                nodes_per_block=nodes_per_job,
                max_blocks=1,
                cpus_per_node=208,
            ),
        ),
    ],
    retries=1,
)
```
This example workflow uses this Config to run an ensemble of 2-node MPI tasks:
```python
# my_parsl_workflow.py
import os

import parsl
from parsl import bash_app

from config import mpi_ensemble_config

# This app will run the hello_affinity application with mpiexec
# Using the set_affinity_gpu_aurora.sh script will bind each mpi rank to a gpu tile
@bash_app
def mpi_hello_affinity(parsl_resource_specification, depth=8,
                       stdout='mpi_hello.stdout', stderr='mpi_hello.stderr'):
    APP_DIR = "$HOME/GettingStarted"
    # PARSL_MPI_PREFIX will resolve to `mpiexec -n num_ranks -ppn ranks_per_node -hosts NODE001,NODE002`
    return (f"$PARSL_MPI_PREFIX --cpu-bind depth --depth={depth} "
            f"{APP_DIR}/HelperScripts/Aurora/set_affinity_gpu_aurora.sh "
            f"{APP_DIR}/Examples/Aurora/affinity_gpu/sycl/hello_affinity")

print("Starting my_parsl_workflow")
working_directory = os.getcwd()

with parsl.load(mpi_ensemble_config):
    task_futures = []

    # Create 2-node tasks
    # We set 12 ranks per node to match the number of gpu tiles on an Aurora node
    resource_specification = {
        'num_nodes': 2,        # Number of nodes required for the application instance
        'ranks_per_node': 12,  # Number of ranks / application elements to be launched per node
        'num_ranks': 24,       # Number of ranks in total
    }

    print(f"Creating mpi tasks with {resource_specification['num_nodes']} nodes per task, "
          f"{resource_specification['num_ranks']} ranks per task, "
          f"and {resource_specification['ranks_per_node']} ranks per node")
    task_futures += [mpi_hello_affinity(parsl_resource_specification=resource_specification,
                                        stdout=f"{working_directory}/mpi_output/{i}/hello.stdout",
                                        stderr=f"{working_directory}/mpi_output/{i}/hello.stderr")
                     for i in range(10)]

    # This loop will block until all task results are returned
    print(f"{len(task_futures)} tasks created, waiting for completion")
    for tf in task_futures:
        tf.result()

print("Tasks done!")
```
If your tasks can be run within a single PBS job, Parsl can be configured to run inside the PBS job, instead of submitting multiple jobs to the scheduler as shown in the examples above.
To run the single tile task ensemble from above in this alternate mode, use this Config object in the workflow script:
```python
# config.py
import os

from parsl.config import Config
# Use LocalProvider to launch workers within a submitted batch job
from parsl.providers import LocalProvider
# The high throughput executor is for scaling large single core/tile/gpu tasks on HPC systems:
from parsl.executors import HighThroughputExecutor
# Use the MPI launcher to launch worker processes:
from parsl.launchers import MpiExecLauncher

tile_names = [f'{gid}.{tid}' for gid in range(6) for tid in range(2)]

# The config will launch workers from this directory
execute_dir = os.getcwd()

# Get the number of nodes:
node_file = os.getenv("PBS_NODEFILE")
with open(node_file, "r") as f:
    node_list = f.readlines()
    num_nodes = len(node_list)

aurora_single_tile_config = Config(
    executors=[
        HighThroughputExecutor(
            # Ensures one worker per GPU tile on each node
            available_accelerators=tile_names,
            max_workers_per_node=12,
            # Distributes threads to workers/tiles in a way optimized for Aurora
            cpu_affinity="list:0-7,104-111:8-15,112-119:16-23,120-127:24-31,128-135:32-39,136-143:40-47,144-151:52-59,156-163:60-67,164-171:68-75,172-179:76-83,180-187:84-91,188-195:92-99,196-203",
            # Increase if you have many more tasks than workers
            prefetch_capacity=0,
            # The LocalProvider launches workers on the nodes of the already-running PBS job
            provider=LocalProvider(
                # Number of nodes in the job
                nodes_per_block=num_nodes,
                launcher=MpiExecLauncher(bind_cmd="--cpu-bind", overrides="--ppn 1"),
                init_blocks=1,
                max_blocks=1,
            ),
        ),
    ],
)
```
Then submit the workflow with a PBS batch script.
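A minimal submission script might look like the following; the node count, walltime, queue, and project name are placeholders to adapt to your own allocation, and `$HOME/_env` is assumed to be the environment where Parsl is installed:

```bash
#!/bin/bash -l
#PBS -l select=2
#PBS -l walltime=0:30:00
#PBS -l filesystems=home:flare
#PBS -q debug
#PBS -A Aurora_deployment

# Run from the directory the job was submitted from
cd ${PBS_O_WORKDIR}

# Activate the environment where Parsl is installed
source $HOME/_env/bin/activate

# Run the workflow script; the LocalProvider launches workers on the job's nodes
python my_parsl_workflow.py
```

Submit it with `qsub`, e.g. `qsub submit_workflow.sh` (the script name here is illustrative).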