Executor#

Execute a function on SLURM with a concurrent.futures.Executor implementation called SlurmRestExecutor:

from pyslurmutils.concurrent.futures import SlurmRestExecutor

with SlurmRestExecutor(
    url=url,                               # SLURM REST URL
    user_name=user_name,                   # SLURM user name
    token=token,                           # SLURM access token
    log_directory="/path/to/log",          # for log files
    data_directory="/path/to/data",        # data exchange (TCP used when not provided)
    pre_script="module load ewoks",        # load environment
    parameters={"time_limit": "02:00:00"}, # SLURM job parameters
    python_cmd="python",                   # python command (python3 by default)
) as executor:
    future = executor.submit(sum, [1, 1])
    assert future.result() == 2

The Python environment can be selected with the pre_script argument. Since pyslurmutils only relies on Python builtins on the SLURM side, no specific libraries are required there. When data_directory is not provided, the SLURM job connects back to a TCP port that is opened locally before the job is submitted.
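For example, a pre_script can activate a project environment and python_cmd can point at the interpreter it provides. The snippet below is only a sketch; the activation command and paths are placeholders for your own setup.

from pyslurmutils.concurrent.futures import SlurmRestExecutor

with SlurmRestExecutor(
    url=url,                                         # SLURM REST URL
    user_name=user_name,                             # SLURM user name
    token=token,                                     # SLURM access token
    log_directory="/path/to/log",                    # for log files
    pre_script="source /path/to/venv/bin/activate",  # placeholder environment setup
    python_cmd="python",                             # interpreter provided by that environment
) as executor:
    future = executor.submit(sum, [1, 1])
    assert future.result() == 2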

Warning

The function is pickled locally and unpickled by the SLURM job, which means the job needs access to the module the function is defined in. When the function is defined locally in a script, the source code is sent instead of pickling it. Even so, everything the function uses needs to exist remotely.
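To illustrate the constraint (this is a sketch, not part of pyslurmutils): a function that only uses builtins or the standard library runs anywhere, while a function that imports a third-party package only works if that package is installed in the environment selected by pre_script.

import math

def works_everywhere(x):
    # Only needs the standard library, which is always available remotely.
    return math.sqrt(x)

def needs_remote_numpy(values):
    # numpy must be importable in the remote environment (e.g. loaded via
    # pre_script), otherwise the SLURM job raises an ImportError.
    import numpy
    return numpy.mean(values)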

Multiple tasks per job#

By default, every time you submit a task, a SLURM job is scheduled. To allow SLURM jobs to execute more than one task, use the max_tasks_per_worker argument. In addition, when more than one task is executed in one SLURM job, you may want to initialize the SLURM job when it starts.

For example, run two tasks per SLURM job and initialize a global variable:

import os
import socket

from pyslurmutils.concurrent.futures import SlurmRestExecutor

def initializer():
    global GLOBAL_VAR
    GLOBAL_VAR = 0

def job():
    global GLOBAL_VAR
    GLOBAL_VAR += 1
    host = socket.gethostname()
    pid = os.getpid()
    ncpus = len(os.sched_getaffinity(pid))
    return f"{host=}, {pid=}, {ncpus=}, {GLOBAL_VAR=}"

if __name__ == "__main__":
    from concurrent.futures import as_completed

    with SlurmRestExecutor(
        url=url,
        token=token,
        user_name=user_name,
        max_tasks_per_worker=2,
        initializer=initializer,
        log_directory=log_directory,
    ) as executor:
        futures = [executor.submit(job) for _ in range(16)]
        for future in as_completed(futures):
            print(future.result())

Warning

Note the __name__ guard. Since this is a local script, its source code is sent to the SLURM job and executed there. Without the guard, the submission code itself would also be executed by the SLURM job.

Keep SLURM jobs alive#

Keep SLURM jobs alive within the executor context:

with SlurmRestExecutor(
    url=url,
    token=token,
    user_name=user_name,
    max_workers=4,
    lazy_scheduling=False,
    initializer=initializer,
    log_directory=log_directory,
) as executor:
    futures = [executor.submit(job) for _ in range(16)]
    for future in as_completed(futures):
        print(future.result())

With lazy_scheduling=False, four SLURM jobs are started when entering the context and are restarted whenever they go down, either because max_tasks_per_worker was reached or because of a failure.
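Both limits can also be combined (a sketch assuming the parameters are accepted together, as the paragraph above suggests): four eager workers, each replaced after two tasks, would process the sixteen submissions over roughly eight SLURM jobs.

with SlurmRestExecutor(
    url=url,
    token=token,
    user_name=user_name,
    max_workers=4,            # four SLURM jobs alive at the same time
    max_tasks_per_worker=2,   # each job is replaced after two tasks
    lazy_scheduling=False,    # start the jobs when entering the context
    initializer=initializer,
    log_directory=log_directory,
) as executor:
    futures = [executor.submit(job) for _ in range(16)]
    for future in as_completed(futures):
        print(future.result())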