"""Advanced example illustrating the following concepts:
* Maximum number of workers
* Maximum number of tasks per worker
* Worker initialization
* Local log level applied remotely
* Remote logs showing locally
* Interchangeable executors

When using the SLURM executor you need to define the environment variables
below (``SLURM_API_VERSION`` is optional) and also specify a root directory
for the SLURM log files:

.. code:: bash

    export SLURM_TOKEN=$(scontrol token lifespan=3600)
    export SLURM_API_VERSION="v0.0.41"  # optional
    export SLURM_URL=...
    export SLURM_USER=...
    python executor.py slurm --slurm-root-directory=/tmp_14_days

For comparison, run the same example with the thread or process based
executors:

.. code:: bash

    python executor.py thread
    python executor.py process
"""
import os
import sys
import time
import socket
import getpass
import logging
import argparse
from concurrent.futures import as_completed
from concurrent.futures import ProcessPoolExecutor
from concurrent.futures import ThreadPoolExecutor
from pyslurmutils.concurrent.futures import SlurmRestExecutor
def initializer():
    """Create (or reset) the module-level ``GLOBAL_VAR`` counter at zero.

    ``main`` passes this function as the executor ``initializer``. ``job``
    increments the counter.

    Process-based workers each hold their own copy of the counter. Thread
    workers all share this single module global, so every thread start
    resets the shared value.
    """
    global GLOBAL_VAR
    GLOBAL_VAR = 0
def job(*, iterations=5, delay=0.1):
    """Demo task: emit prints and log records, then report where it ran.

    Increments the module-level ``GLOBAL_VAR`` counter, which ``initializer``
    must already have created; otherwise ``NameError`` is raised. For
    process-based workers the counter therefore shows how many tasks the
    worker ran since it was initialized. Thread workers share one counter.

    :param iterations: number of print/log rounds (default 5, as before).
    :param delay: seconds to sleep after each round (default 0.1, as before).
    :returns: ``"host=..., pid=..., ncpus=..., GLOBAL_VAR=..."`` built with
        f-string ``=`` specifiers.
    """
    global GLOBAL_VAR
    GLOBAL_VAR += 1

    host = socket.gethostname()
    pid = os.getpid()
    # os.sched_getaffinity is only available on some platforms (e.g. Linux).
    # Elsewhere, fall back to the machine's CPU count (None if undetermined)
    # instead of crashing with AttributeError.
    if hasattr(os, "sched_getaffinity"):
        ncpus = len(os.sched_getaffinity(pid))
    else:
        ncpus = os.cpu_count()

    for i in range(iterations):
        print(f"print {i} (GLOBAL_VAR={GLOBAL_VAR})")
        logging.debug("log debug %d", i)
        logging.info("log info %d", i)
        logging.warning("log warning %d", i)
        logging.error("log error %d", i)
        time.sleep(delay)
    return f"{host=}, {pid=}, {ncpus=}, {GLOBAL_VAR=}"
def executor_context(executor_type, slurm_root_directory, **kwargs):
    """Create the executor selected on the command line.

    :param executor_type: ``"slurm"``, ``"process"`` or ``"thread"``.
    :param slurm_root_directory: root under which the SLURM log directory
        ``<root>/<user>/slurm_logs`` is placed. Required for ``"slurm"``,
        ignored otherwise.
    :param kwargs: executor options (``max_workers``, ``initializer``, ...).
        ``max_tasks_per_worker`` is forwarded only to the SLURM executor. It
        is dropped for the standard-library pools, which have no option of
        that name.
    :returns: a new executor instance, usable as a context manager.
    :raises ValueError: if ``executor_type`` is unknown, or if
        ``slurm_root_directory`` is empty for the SLURM executor.
    """
    if executor_type == "slurm":
        # Validate first so a missing directory is reported before any
        # environment or user lookup is attempted.
        if not slurm_root_directory:
            raise ValueError("--slurm-root-directory required")
        url = os.environ.get("SLURM_URL")
        token = os.environ.get("SLURM_TOKEN")
        # Only consult the local login name when SLURM_USER is unset:
        # getpass.getuser() can fail in some environments (e.g. containers
        # whose uid has no passwd entry).
        user_name = os.environ.get("SLURM_USER")
        if user_name is None:
            user_name = getpass.getuser()
        log_directory = os.path.join(slurm_root_directory, user_name, "slurm_logs")
        return SlurmRestExecutor(
            url=url,
            token=token,
            user_name=user_name,
            log_directory=log_directory,
            data_directory=None,
            std_split=False,
            **kwargs,
        )

    # ProcessPoolExecutor/ThreadPoolExecutor would reject this keyword.
    # ProcessPoolExecutor's closest analogue, max_tasks_per_child (3.11+), is
    # intentionally not used here.
    kwargs.pop("max_tasks_per_worker", None)
    if executor_type == "process":
        return ProcessPoolExecutor(**kwargs)
    if executor_type == "thread":
        return ThreadPoolExecutor(**kwargs)
    # Raise (not return) so the caller fails here with a clear message rather
    # than later when trying to use an exception object as a context manager.
    raise ValueError(f"unknown executor type: {executor_type!r}")
def main(executor_type, slurm_root_directory, log_level):
    """Run six ``job`` tasks on the requested executor and print the results.

    Logging goes to stdout at ``log_level``. The executor is built by
    ``executor_context`` with two workers, ``max_tasks_per_worker=2`` and
    ``initializer`` as the worker initializer.
    """
    logging.basicConfig(stream=sys.stdout, level=log_level)

    executor_options = dict(
        max_workers=2,
        max_tasks_per_worker=2,
        initializer=initializer,
    )
    with executor_context(
        executor_type, slurm_root_directory, **executor_options
    ) as pool:
        pending = [pool.submit(job) for _ in range(6)]
        # Collected in completion order, not submission order.
        results = [done.result() for done in as_completed(pending)]

    # Same output as an empty print() followed by print("Results:").
    print("\nResults:")
    for line in results:
        print(line)
def _parse_log_level(value):
    """Convert a ``--log-level`` argument into a :mod:`logging` level number.

    Accepts a level name in any case (``debug``, ``INFO``, ``warn``, ...) or a
    non-negative integer such as ``15``.

    :raises argparse.ArgumentTypeError: for anything that is not a level, so
        argparse prints a usage error instead of a traceback.
    """
    if value.isdecimal():
        return int(value)
    level = getattr(logging, value.upper(), None)
    # Other attributes of the logging module (functions, classes, format
    # strings) are not levels; only integer constants are.
    if not isinstance(level, int):
        raise argparse.ArgumentTypeError(f"invalid log level: {value!r}")
    return level


if __name__ == "__main__":
    parser = argparse.ArgumentParser(
        description="Run jobs using different executor types."
    )
    parser.add_argument(
        "executor_type",
        choices=["process", "thread", "slurm"],
        help="The type of executor to use (process, thread, slurm).",
    )
    parser.add_argument(
        "--slurm-root-directory",
        type=str,
        default=None,
        help="For logs files when using slurm.",
    )
    parser.add_argument(
        "--log-level",
        type=_parse_log_level,
        default=logging.INFO,
        help="Logging level name (e.g. DEBUG, info) or number. Default: INFO.",
    )
    args = parser.parse_args()
    main(args.executor_type, args.slurm_root_directory, args.log_level)