Advanced Examples#

Advanced executor usage#

"""Advanced example illustrating the following concepts:

* Maximum number of workers
* Maximum number of tasks per worker
* Worker initialization
* Local log level applied remotely
* Remote logs showing locally
* Interchangeable executors

When using the SLURM executor, set the following environment variables and
pass a root directory for the SLURM log files:

.. code:: bash

    export SLURM_TOKEN=$(scontrol token lifespan=3600)
    export SLURM_API_VERSION="v0.0.41"  # optional
    export SLURM_URL=...
    export SLURM_USER=...

    python executor.py slurm --slurm-root-directory=/tmp_14_days

To compare with thread- and process-based executors:

.. code:: bash

    python executor.py thread
    python executor.py process
"""

import os
import sys
import time
import socket
import getpass
import logging
import argparse

from concurrent.futures import as_completed
from concurrent.futures import ProcessPoolExecutor
from concurrent.futures import ThreadPoolExecutor

from pyslurmutils.concurrent.futures import SlurmRestExecutor


def initializer():
    # Runs once in each worker before it executes any task.
    global GLOBAL_VAR
    GLOBAL_VAR = 0


def job():
    # GLOBAL_VAR counts how many tasks this particular worker has executed so far.
    global GLOBAL_VAR
    GLOBAL_VAR += 1
    host = socket.gethostname()
    pid = os.getpid()
    ncpus = len(os.sched_getaffinity(pid))
    for i in range(5):
        print(f"print {i} (GLOBAL_VAR={GLOBAL_VAR})")
        logging.debug("log debug %d", i)
        logging.info("log info %d", i)
        logging.warning("log warning %d", i)
        logging.error("log error %d", i)
        time.sleep(0.1)
    return f"{host=}, {pid=}, {ncpus=}, {GLOBAL_VAR=}"


def executor_context(executor_type, slurm_root_directory, **kwargs):
    if executor_type == "slurm":
        url = os.environ.get("SLURM_URL")
        token = os.environ.get("SLURM_TOKEN")
        user_name = os.environ.get("SLURM_USER", getpass.getuser())

        if not slurm_root_directory:
            raise ValueError("--slurm-root-directory required")
        log_directory = os.path.join(slurm_root_directory, user_name, "slurm_logs")

        return SlurmRestExecutor(
            url=url,
            token=token,
            user_name=user_name,
            log_directory=log_directory,
            data_directory=None,
            std_split=False,
            **kwargs,
        )

    # The standard library pools do not accept max_tasks_per_worker.
    kwargs.pop("max_tasks_per_worker", None)

    if executor_type == "process":
        return ProcessPoolExecutor(**kwargs)
    if executor_type == "thread":
        return ThreadPoolExecutor(**kwargs)

    raise ValueError(f"Unknown executor type: {executor_type}")


def main(executor_type, slurm_root_directory, log_level):
    logging.basicConfig(level=log_level, stream=sys.stdout)

    results = list()
    with executor_context(
        executor_type,
        slurm_root_directory,
        max_workers=2,
        max_tasks_per_worker=2,
        initializer=initializer,
    ) as executor:
        futures = [executor.submit(job) for _ in range(6)]
        for future in as_completed(futures):
            results.append(future.result())

    print()
    print("Results:")
    for result in results:
        print(result)


if __name__ == "__main__":
    parser = argparse.ArgumentParser(
        description="Run jobs using different executor types."
    )
    parser.add_argument(
        "executor_type",
        choices=["process", "thread", "slurm"],
        help="The type of executor to use (process, thread, slurm).",
    )
    parser.add_argument(
        "--slurm-root-directory",
        type=str,
        default=None,
        help="For logs files when using slurm.",
    )
    parser.add_argument(
        "--log-level",
        type=lambda s: getattr(logging, s.upper()),
        default=logging.INFO,
        help="For logs.",
    )

    args = parser.parse_args()
    main(args.executor_type, args.slurm_root_directory, args.log_level)
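
The script above treats the executors as interchangeable: the submit/as_completed
calling code never changes, only the construction of the executor does. Below is a
minimal sketch of that idea with the argument parsing stripped away. It only reuses
constructor arguments that appear in the script above; the environment variables and
the /tmp_14_days root directory are assumptions taken from its docstring, so adapt
them to your cluster.

import os
import getpass

from concurrent.futures import as_completed
from concurrent.futures import ThreadPoolExecutor

from pyslurmutils.concurrent.futures import SlurmRestExecutor


def make_executor(use_slurm: bool):
    # The calling code below is identical for both executor types.
    if not use_slurm:
        return ThreadPoolExecutor(max_workers=2)
    user_name = os.environ.get("SLURM_USER", getpass.getuser())
    return SlurmRestExecutor(
        url=os.environ["SLURM_URL"],
        token=os.environ["SLURM_TOKEN"],
        user_name=user_name,
        log_directory=os.path.join("/tmp_14_days", user_name, "slurm_logs"),
        max_workers=2,
    )


def square(x):
    return x * x


if __name__ == "__main__":
    with make_executor(use_slurm=False) as executor:
        futures = [executor.submit(square, i) for i in range(4)]
        for future in as_completed(futures):
            print(future.result())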

Pre-emptive job scheduling#

"""Advanced example illustrating pre-emptive job scheduling.

.. code:: bash

    export SLURM_TOKEN=$(scontrol token lifespan=3600)
    export SLURM_API_VERSION="v0.0.41"  # optional
    export SLURM_URL=...
    export SLURM_USER=...

    python preemptive.py --slurm-root-directory=/tmp_14_days
"""

import os
import sys
import time
import logging
import getpass
import argparse

from pyslurmutils.concurrent.futures import SlurmRestExecutor

logger = logging.getLogger(__name__)


def main(slurm_root_directory, log_level):
    logging.basicConfig(level=log_level, stream=sys.stdout)

    url = os.environ.get("SLURM_URL")
    token = os.environ.get("SLURM_TOKEN")
    user_name = os.environ.get("SLURM_USER", getpass.getuser())

    log_directory = os.path.join(slurm_root_directory, user_name, "slurm_logs")

    with SlurmRestExecutor(
        url=url,
        token=token,
        user_name=user_name,
        log_directory=log_directory,
        lazy_scheduling=False,
        max_workers=2,
        max_tasks_per_worker=10,
        parameters={"time_limit": "00:01:00"},
    ) as executor:

        logger.info("execute a job ...")
        assert executor.submit(sum, [1, 1]).result() == 2

        logger.info("wait longer than the job time limit ...")
        time.sleep(120)

        logger.info("execute a job ...")
        assert executor.submit(sum, [1, 1]).result() == 2


if __name__ == "__main__":
    parser = argparse.ArgumentParser(
        description="Jobs stay alive within the executor context."
    )
    parser.add_argument(
        "--slurm-root-directory",
        type=str,
        required=True,
        help="For logs files.",
    )
    parser.add_argument(
        "--log-level",
        type=lambda s: getattr(logging, s.upper()),
        default=logging.INFO,
        help="For logs.",
    )

    args = parser.parse_args()
    main(args.slurm_root_directory, args.log_level)
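
The example above submits individual calls with submit(). Since SlurmRestExecutor is
used throughout these examples like a standard concurrent.futures executor, the usual
map() pattern should apply as well. The sketch below is hedged in that sense: it reuses
the constructor arguments from preemptive.py (same environment variables, same
/tmp_14_days root directory) and assumes the standard Executor.map interface.

import os
import getpass

from pyslurmutils.concurrent.futures import SlurmRestExecutor

user_name = os.environ.get("SLURM_USER", getpass.getuser())

with SlurmRestExecutor(
    url=os.environ["SLURM_URL"],
    token=os.environ["SLURM_TOKEN"],
    user_name=user_name,
    log_directory=os.path.join("/tmp_14_days", user_name, "slurm_logs"),
    lazy_scheduling=False,  # schedule the SLURM jobs up front, as above
    max_workers=2,
    max_tasks_per_worker=10,
    parameters={"time_limit": "00:05:00"},
) as executor:
    # map() distributes the calls over the pre-scheduled workers.
    for result in executor.map(pow, [2, 3, 4], [10, 10, 10]):
        print(result)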