Source code for ewoksdask.schedulers

from dask.distributed import LocalCluster
from dask_jobqueue import SLURMCluster


[docs]def local_scheduler(**kw): """ :param n_workers: :param processes: True by default :param threads_per_worker: :param scheduler_port: :param dashboard_address: :param worker_dashboard_address: """ # Run this on any machine kw.setdefault("n_workers", 1) cluster = LocalCluster(**kw) print("Address:", cluster.scheduler_address) print("Dashboard:", cluster.dashboard_link) return cluster
[docs]def slurm_scheduler(**kw): """ :param address: :param n_workers: :param minimum_jobs: :param maximum_jobs: :param cores: :param processes: :param memory: """ # Run this on slurm-access # Parameters for each execute_graph: kw.setdefault("project", "esrftaskgraph") kw.setdefault("walltime", "01:00:00") kw.setdefault("cores", 2) kw.setdefault("processes", 1) kw.setdefault("memory", "1GB") minimum_jobs = kw.pop("minimum_jobs", 0) maximum_jobs = kw.pop("maximum_jobs", 0) kw.setdefault("n_workers", int(not maximum_jobs)) cluster = SLURMCluster(**kw) if maximum_jobs: cluster.adapt(minimum_jobs=minimum_jobs, maximum_jobs=maximum_jobs) print("Address:", cluster.scheduler_address) print("Dashboard:", cluster.dashboard_link) return cluster