
Parsl on Aurora

Parsl is a parallel programming library for Python. It can be used to deploy large numbers of tasks, in parallel and with complex dependencies, on ALCF machines, and it is particularly well suited to running high-throughput workflows. While Parsl is a Python library, it can execute tasks that run any compiled application. Parsl can also execute tasks that run MPI applications.

Parsl uses Python's concurrent.futures module: calling a Parsl app returns a Python future object. A Parsl workflow operates by creating futures for tasks, which the Parsl executor then fulfills by running the tasks on available compute resources.
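
As a minimal illustration (a sketch using Parsl's built-in ThreadPoolExecutor rather than an Aurora configuration; the file name is purely illustrative), calling a Parsl app returns a future immediately, and calling .result() blocks until the task has run:

futures_example.py
# futures_example.py -- minimal sketch of Parsl futures (local threads, not Aurora)
import parsl
from parsl import python_app
from parsl.config import Config
from parsl.executors import ThreadPoolExecutor

@python_app
def add(x, y):
    return x + y

# A local thread-pool configuration is enough to demonstrate futures
with parsl.load(Config(executors=[ThreadPoolExecutor()])):
    future = add(1, 2)      # returns an AppFuture immediately
    print(future.result())  # blocks until the task completes, then prints 3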

A Parsl workflow contains two parts:

* the workflow logic of applications, tasks, and task dependencies
* the configuration of compute resources that execute tasks

Here we sketch out some possible configurations for executing workflows on Aurora. These docs were written for Parsl 2025.1.13.

Installation and Setup

Parsl is a Python library and can be installed with pip. For example, in a Python virtual environment:

python -m venv $HOME/_env
source $HOME/_env/bin/activate
pip install parsl

When using Parsl to distribute work over many PBS Jobs (first two examples below), your workflow script will be executed on a login node and will not return until all tasks are completed. In this situation, it is advisable to run your script in a screen session on the login node.
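
For example, a possible sequence of commands on a login node (assuming the virtual environment created above and the workflow script name used later on this page) is:

# Start a named screen session on the login node
screen -S parsl_workflow

# Inside the screen session, activate the environment and run the workflow
source $HOME/_env/bin/activate
python my_parsl_workflow.py

# Detach with Ctrl-a d; reattach later with:
screen -r parsl_workflow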

Parsl Config for a Large Ensemble of Single-Tile Tasks Run over Many PBS Jobs

A common use case is to run a large ensemble of tasks that each require one GPU tile on Aurora and to spread this workload over multiple PBS Jobs. The reason for spreading this workload over many PBS Jobs may be the size of the ensemble and/or the runtime of the tasks.

The Config object for this case is defined like this:

config.py
# config.py
import os
from parsl.config import Config

# PBSPro is the right provider for ALCF:
from parsl.providers import PBSProProvider
# The high throughput executor is for scaling large single core/tile/gpu tasks on HPC system:
from parsl.executors import HighThroughputExecutor
# Use the MPI launcher to launch worker processes:
from parsl.launchers import MpiExecLauncher

# These options will run work in 1-node batch jobs, run one at a time
nodes_per_job = 1
max_num_jobs = 1
tile_names = [f'{gid}.{tid}' for gid in range(6) for tid in range(2)]

# The config will launch workers from this directory
execute_dir = os.getcwd()

aurora_single_tile_config = Config(
    executors=[
        HighThroughputExecutor(
            # Ensures one worker per GPU tile on each node
            available_accelerators=tile_names,
            max_workers_per_node=12,
            # Distributes threads to workers/tiles in a way optimized for Aurora 
            cpu_affinity="list:0-7,104-111:8-15,112-119:16-23,120-127:24-31,128-135:32-39,136-143:40-47,144-151:52-59,156-163:60-67,164-171:68-75,172-179:76-83,180-187:84-91,188-195:92-99,196-203",
            # Increase if you have many more tasks than workers
            prefetch_capacity=0,
            # Options that specify properties of PBS Jobs
            provider=PBSProProvider(
                # Project name
                account="Aurora_deployment",
                # Submission queue
                queue="debug",
                # Commands run before workers launched
                # Make sure to activate your environment where Parsl is installed
                worker_init=f'''source $HOME/_env/bin/activate; cd {execute_dir}''',
                # Wall time for batch jobs
                walltime="0:30:00",
                # Change if data/modules located on other filesystem
                scheduler_options="#PBS -l filesystems=home:flare",
                # Ensures 1 manager per node; the manager will distribute work to its 12 workers, one per tile
                launcher=MpiExecLauncher(bind_cmd="--cpu-bind", overrides="--ppn 1"),
                # options added to #PBS -l select aside from ncpus
                select_options="",
                # Number of nodes per PBS job
                nodes_per_block=nodes_per_job,
                # Minimum number of concurrent PBS jobs running workflow
                min_blocks=0,
                # Maximum number of concurrent PBS jobs running workflow
                max_blocks=max_num_jobs,
                # Hardware threads per node
                cpus_per_node=208,
            ),
        ),
    ],
    # How many times to retry failed tasks
    # this is necessary if you have tasks that are interrupted by a PBS job ending
    # so that they will restart in the next job
    retries=1,
)

Import this Config object and use it in a workflow script, e.g.:

my_parsl_workflow.py
# my_parsl_workflow.py
import os
import parsl
from parsl import bash_app, python_app

from config import aurora_single_tile_config

# Bash apps are for executing compiled applications or other shell commands
@bash_app
def hello_affinity(stdout='hello.stdout', stderr='hello.stderr'):
    return f'$HOME/GettingStarted/Examples/Aurora/affinity_gpu/sycl/hello_affinity'

# Python apps are for executing native python functions
@python_app
def hello_world(message, sleep_time=1):
    import time
    time.sleep(sleep_time)
    return f"Hello {message}"

working_directory = os.getcwd()

print("Starting my_parsl_workflow")

with parsl.load(aurora_single_tile_config):

    # Create 12 hello_world tasks
    hello_world_futures = [hello_world(f"Aurora {i}") for i in range(12)]
    print(f"Created {len(hello_world_futures)} hello_world tasks")

    # Create 12 hello_affinity tasks
    hello_affinity_futures = [hello_affinity(stdout=f"{working_directory}/output/hello_{i}.stdout",
                                             stderr=f"{working_directory}/output/hello_{i}.stderr")
                              for i in range(12)]
    print(f"Created {len(hello_world_futures)} hello_affinity tasks")

    # This line will block until all hello_world results are returned
    hello_world_results = [tf.result() for tf in hello_world_futures]
    print("hello_world tasks complete")
    print(f"python apps like hello_world return the function result, e.g. {hello_world_results[0]=}")

    # This line will block until all hello_affinity results are returned
    hello_affinity_results = [tf.result() for tf in hello_affinity_futures]
    print("hello_affinity tasks complete")
    print(f"bash apps like hello_affinity return the return code of the executable, e.g. {hello_affinity_results[0]=}")

    print(f"Read results of hello_affinity from stdout file:")
    for i,tf in enumerate(hello_affinity_futures):
        with open(f"{working_directory}/output/hello_{i}.stdout", "r") as f:
            outputs = f.readlines()
            print(outputs)

    print("Tasks done!")

Note that a Parsl workflow script must, at some point, block on the results of all tasks it creates in order to ensure that the tasks complete.
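
Because Parsl app futures are standard concurrent.futures futures, a workflow can also block on results as they finish; for example, this sketch (an alternative to the list comprehensions above, not part of the example script) processes the hello_world results in completion order:

# Sketch: consume results as tasks finish, rather than in list order
from concurrent.futures import as_completed

for future in as_completed(hello_world_futures):
    print(future.result())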

To run this workflow script:

source $HOME/_env/bin/activate
python my_parsl_workflow.py

The script will block until all tasks have completed. You may wish to check the scheduler to verify that Parsl has queued a PBS job to execute the tasks.
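
For example, from another shell on a login node you can list your queued and running PBS jobs with:

qstat -u $USER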

Parsl Config for an Ensemble of Multinode MPI Tasks Run over Many PBS Jobs

In the previous example, mpiexec was used only as a launcher for Parsl worker processes, not to run the tasks themselves. To run applications that use MPI communication, Parsl has to invoke mpiexec differently: use the SimpleLauncher together with the MPIExecutor. Note that the configuration has to set max_workers_per_block to align with the resource needs of the application. The MPIExecutor can only run tasks that use more than one node.

Warning

Ensembles of tasks launched with mpiexec on multiple nodes are currently limited to 1000 total tasks per batch job. This means that when mpiexec calls return, the nodes they used can be refilled only a limited number of times, rather than an arbitrary number of times as on Polaris. This is due to a known issue with Slingshot and will be fixed in the future. Users running MPI application ensembles on Aurora with Parsl should take this into account when configuring their workflows.

This example Config object can be used to execute MPI tasks that use two nodes each:

config.py
import parsl
import os
from parsl.config import Config
# PBSPro is the right provider for ALCF:
from parsl.providers import PBSProProvider
# The MPIExecutor is for running MPI applications:
from parsl.executors import MPIExecutor
# Use the Simple launcher
from parsl.launchers import SimpleLauncher

# These options will run work in 10-node batch jobs, run one at a time
nodes_per_task = 2
nodes_per_job = 10
max_num_jobs = 1

# We will save outputs in the current working directory
working_directory = os.getcwd()

mpi_ensemble_config = Config(
    executors=[
        MPIExecutor(
            # This creates 1 worker for each multinode task slot
            max_workers_per_block=nodes_per_job//nodes_per_task, 
            provider=PBSProProvider(
                account="Aurora_deployment",
                worker_init=f"""source $HOME/_env/bin/activate; \
                                cd {working_directory}""",
                walltime="0:30:00",
                queue="lustre_scaling",
                scheduler_options="#PBS -l filesystems=home:flare",
                launcher=SimpleLauncher(),
                select_options="",
                nodes_per_block=nodes_per_job,
                max_blocks=1,
                cpus_per_node=208,
            ),
        ),
    ],
    retries=1,
)

This example workflow uses this Config to run an ensemble of 2-node MPI tasks:

my_parsl_workflow.py
# my_parsl_workflow.py
import os
import parsl
from parsl import bash_app

from config import mpi_ensemble_config

# This app will run the hello_affinity application with mpiexec
# Using the set_affinity_gpu_aurora.sh script will bind each MPI rank to a GPU tile
@bash_app
def mpi_hello_affinity(parsl_resource_specification, depth=8, stdout='mpi_hello.stdout', stderr='mpi_hello.stderr'):
    APP_DIR = "$HOME/GettingStarted"
    # PARSL_MPI_PREFIX will resolve to `mpiexec -n num_ranks -ppn ranks_per_node -hosts NODE001,NODE002`
    return f"$PARSL_MPI_PREFIX --cpu-bind depth --depth={depth} {APP_DIR}/HelperScripts/Aurora/set_affinity_gpu_aurora.sh {APP_DIR}/Examples/Aurora/affinity_gpu/sycl/hello_affinity"

print("Starting my_parsl_workflow")

working_directory = os.getcwd()

with parsl.load(mpi_ensemble_config):

    task_futures = []

    # Create 2-node tasks
    # We set 12 ranks per node to match the number of GPU tiles on an Aurora node
    resource_specification = {'num_nodes': 2, # Number of nodes required for the application instance
                              'ranks_per_node': 12, # Number of ranks / application elements to be launched per node
                              'num_ranks': 24, # Number of ranks in total
                             }

    print(f"Creating mpi tasks with {resource_specification['num_nodes']} nodes per task, {resource_specification['num_ranks']} ranks per task, and {resource_specification['ranks_per_node']} ranks per node")
    task_futures += [mpi_hello_affinity(
                            parsl_resource_specification=resource_specification,
                            stdout=f"{working_directory}/mpi_output/{i}/hello.stdout",
                            stderr=f"{working_directory}/mpi_output/{i}/hello.stderr")
                        for i in range(10)]

    # This loop will block until all task results are returned
    print(f"{len(task_futures)} tasks created, wating for completion")
    for tf in task_futures:
        tf.result()

    print("Tasks done!")

Run Parsl Workflow within a single PBS Job

If your tasks can be run within a single PBS job, Parsl can be configured to run inside the PBS job, instead of submitting multiple jobs to the scheduler as shown in the examples above.

To run the single tile task ensemble from above in this alternate mode, use this Config object in the workflow script:

config.py
# config.py
import os
from parsl.config import Config

# Use LocalProvider to launch workers within a submitted batch job
from parsl.providers import LocalProvider
# The high throughput executor is for scaling large single core/tile/gpu tasks on HPC system:
from parsl.executors import HighThroughputExecutor
# Use the MPI launcher to launch worker processes:
from parsl.launchers import MpiExecLauncher

tile_names = [f'{gid}.{tid}' for gid in range(6) for tid in range(2)]

# The config will launch workers from this directory
execute_dir = os.getcwd()

# Get the number of nodes:
node_file = os.getenv("PBS_NODEFILE")
with open(node_file,"r") as f:
    node_list = f.readlines()
    num_nodes = len(node_list)

aurora_single_tile_config = Config(
    executors=[
        HighThroughputExecutor(
            # Ensures one worker per GPU tile on each node
            available_accelerators=tile_names,
            max_workers_per_node=12,
            # Distributes threads to workers/tiles in a way optimized for Aurora 
            cpu_affinity="list:0-7,104-111:8-15,112-119:16-23,120-127:24-31,128-135:32-39,136-143:40-47,144-151:52-59,156-163:60-67,164-171:68-75,172-179:76-83,180-187:84-91,188-195:92-99,196-203",
            # Increase if you have many more tasks than workers
            prefetch_capacity=0,
            # The LocalProvider launches workers inside the already-running PBS job
            provider=LocalProvider(
                # Number of nodes in the job
                nodes_per_block=num_nodes,
                launcher=MpiExecLauncher(bind_cmd="--cpu-bind", overrides="--ppn 1"),
                init_blocks=1,
                max_blocks=1,
            ),
        ),
    ],
)

Then submit the workflow with a PBS batch script:

#!/bin/bash -l
#PBS -l select=1
#PBS -l place=scatter
#PBS -l walltime=0:30:00
#PBS -q debug
#PBS -A Aurora_deployment

cd ${PBS_O_WORKDIR}

source $HOME/_env/bin/activate
python my_parsl_workflow.py

Note that if the workflow does not complete before the end of the PBS job, outstanding tasks will not complete.