Executor#
Execute a function on SLURM with a concurrent.futures.Executor
implementation called SlurmRestExecutor:
from pyslurmutils.concurrent.futures import SlurmRestExecutor

with SlurmRestExecutor(
    url=url,                                # SLURM REST URL
    user_name=user_name,                    # SLURM user name
    token=token,                            # SLURM access token
    log_directory="/path/to/log",           # for log files
    data_directory="/path/to/data",         # TCP communication when not provided
    pre_script="module load ewoks",         # load environment
    parameters={"time_limit": "02:00:00"},  # SLURM job parameters
    python_cmd="python",                    # python command (python3 by default)
) as executor:
    future = executor.submit(sum, [1, 1])
    assert future.result() == 2
The Python environment can be selected with the pre_script
argument.
Since pyslurmutils only relies on Python builtins on the SLURM side,
no specific libraries are required there. When data_directory
is not provided,
the SLURM job connects to a TCP port that is opened locally before the job
is submitted.
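For example, a minimal sketch that selects the remote Python environment through pre_script and exchanges data through a shared directory instead of TCP; the environment name and paths are placeholders for your own setup:

```python
from pyslurmutils.concurrent.futures import SlurmRestExecutor

with SlurmRestExecutor(
    url=url,
    user_name=user_name,
    token=token,
    log_directory="/path/to/log",
    data_directory="/path/to/shared/data",  # shared with the cluster: data is exchanged through files instead of TCP
    pre_script="source activate myenv",     # placeholder: select the remote Python environment
) as executor:
    future = executor.submit(sum, [1, 1])
    assert future.result() == 2
```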
Warning
The function is pickled locally and unpickled by the SLURM job, which means the job needs access to the module in which the function is defined. When the function is defined locally in a script, the source code is sent instead of pickling it. Even so, everything the function uses needs to exist remotely.
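For illustration, assuming numpy is installed both locally and in the environment loaded by pre_script, a function imported from an installed package is pickled by reference and imported again by the SLURM job:

```python
import numpy
from pyslurmutils.concurrent.futures import SlurmRestExecutor

with SlurmRestExecutor(
    url=url,
    user_name=user_name,
    token=token,
    log_directory="/path/to/log",
    pre_script="module load ewoks",  # the loaded environment must provide numpy
) as executor:
    # numpy.mean is pickled by reference, so the SLURM job must be able to import numpy
    future = executor.submit(numpy.mean, [1, 2, 3])
    assert future.result() == 2.0
```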
Multiple tasks per job#
By default, every time you submit a task a SLURM job is scheduled. To allow SLURM jobs to execute
more than one task, use the max_tasks_per_worker
argument. In addition, when more than one task
is executed in one SLURM job, you may want to initialize the SLURM job when it starts.
For example, run two tasks per SLURM job and initialize a global variable:
import os
import socket

from pyslurmutils.concurrent.futures import SlurmRestExecutor


def initializer():
    global GLOBAL_VAR
    GLOBAL_VAR = 0


def job():
    global GLOBAL_VAR
    GLOBAL_VAR += 1
    host = socket.gethostname()
    pid = os.getpid()
    ncpus = len(os.sched_getaffinity(pid))
    return f"{host=}, {pid=}, {ncpus=}, {GLOBAL_VAR=}"


if __name__ == "__main__":
    from concurrent.futures import as_completed

    with SlurmRestExecutor(
        url=url,
        token=token,
        user_name=user_name,
        max_tasks_per_worker=2,
        initializer=initializer,
        log_directory=log_directory,
    ) as executor:
        futures = [executor.submit(job) for _ in range(16)]
        for future in as_completed(futures):
            print(future.result())
Warning
Note the if __name__ == "__main__" guard. Since this is a local script, its
source code is sent to the SLURM job and executed there; without the guard,
the job-submission code itself would also be executed by the SLURM job.
Keep SLURM jobs alive#
Keep SLURM jobs alive within the executor context:
with SlurmRestExecutor(
    url=url,
    token=token,
    user_name=user_name,
    max_workers=4,
    lazy_scheduling=False,
    initializer=initializer,
    log_directory=log_directory,
) as executor:
    futures = [executor.submit(job) for _ in range(16)]
    for future in as_completed(futures):
        print(future.result())
This starts four SLURM jobs when entering the context and restarts them
when they go down, either because max_tasks_per_worker was reached or
because of a failure.
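Both mechanisms can be combined. A sketch, assuming the initializer and job functions from the previous example, where each of the four SLURM jobs exits after two tasks and is restarted:

```python
with SlurmRestExecutor(
    url=url,
    token=token,
    user_name=user_name,
    max_workers=4,           # four SLURM jobs are started when entering the context
    max_tasks_per_worker=2,  # each job exits after two tasks ...
    lazy_scheduling=False,   # ... and is restarted until the context exits
    initializer=initializer,
    log_directory=log_directory,
) as executor:
    futures = [executor.submit(job) for _ in range(16)]
    for future in as_completed(futures):
        print(future.result())
```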