Source code for

# -*- coding: utf-8 -*-
# SyConn - Synaptic connectivity inference toolkit
# Copyright (c) 2016 - now
# Max-Planck-Institute of Neurobiology, Munich, Germany
# Authors: Philipp Schubert, Sven Dorkenwald, Jörgen Kornfeld
import dill  # supports pickling of lambda expressions

from . import log_mp
from .mp_utils import start_multiprocess_imap
from .. import global_params
from ..handler.basics import temp_seed, str_delta_sec
from ..handler.config import initialize_logging

import pickle as pkl
import threading
import getpass
import glob
import os
import io
import re
import datetime
from typing import Dict, Optional
import shutil
import string
import subprocess
import tqdm
import sys
import socket
import time
import numpy as np
from multiprocessing import cpu_count
from logging import Logger
import logging

[docs]def batchjob_enabled() -> bool: """ Checks if active batch processing system is actually working. Returns: True if either SLURM or QSUB is active. """ batch_proc_system = global_params.config['batch_proc_system'] if batch_proc_system is None or batch_proc_system == 'None': return False try: if batch_proc_system == 'QSUB': cmd_check = 'qstat' elif batch_proc_system == 'SLURM': cmd_check = 'squeue' else: raise NotImplementedError with open(os.devnull, 'w') as devnull: subprocess.check_call(cmd_check, shell=True, stdout=devnull, stderr=devnull) except subprocess.CalledProcessError as e: logging.warning("BatchJobSystem '{}' specified but failed with error '{}' not found," " switching to single node multiprocessing.".format(batch_proc_system, e)) return False return True
path_to_scripts_default = global_params.config.batchjob_script_folder username = getpass.getuser() python_path_global = sys.executable
[docs]def batchjob_script(params: list, name: str, batchjob_folder: Optional[str] = None, n_cores: int = 1, additional_flags: str = '', suffix: str = "", job_name: str = "default", script_folder: Optional[str] = None, max_iterations: int = 10, python_path: Optional[str] = None, disable_batchjob: bool = False, use_dill: bool = False, remove_jobfolder: bool = False, log: Logger = None, sleep_time: Optional[int] = None, show_progress: bool = True, overwrite: bool = False, exclude_nodes: Optional[list] = None): """ Submits batch jobs to process a list of parameters `params` with a python script on the specified environment (either None, SLURM or QSUB; run ``global_params.config['batch_proc_system']`` to get the active system). Notes: * The memory available for each job is coupled to the number of cores per job (`n_cores`). Todo: * Add sbatch array support -> faster submission * Make script specification more generic Args: params: List of all parameter sets to be processed. name: Name of batch job submitted via the batch processing system. batchjob_folder: Directory which contains all submission relevant files, e.g. bash scripts, logs, output files, .. Defaults to ``"{}/{}_folder{}/".format(global_params.config.qsub_work_folder, name, suffix)``. n_cores: Number of cores used for each job. additional_flags: Used to set additional parameters for each job. To allocate one GPU for each worker use: ``additional_flags=--gres=gpu:1``. suffix: Suffix added to `batchjob_folder`. job_name: Name of the jobs submitted via the batch processing system. Defaults to a random string of 8 letters. script_folder: Directory where to look for the script which is executed. Looks for ``QSUB_{name}.py``. max_iterations: Maximum number of retries of failed jobs. python_path: Path to python binary. disable_batchjob: Use single node multiprocessing. use_dill: Use dill to enable pickling of lambda expressions. remove_jobfolder: Remove `batchjob_folder` after successful termination. remove_jobfolder: log: Logger. sleep_time: Sleep duration before checking batch job states again. show_progress: Only used if ``disabled_batchjob=True``. overwrite: exclude_nodes: Nodes to exclude during job submission. """ starttime ="%m.%d") # Parameter handling if n_cores is None: n_cores = 1 if sleep_time is None: sleep_time = 5 if python_path is None: python_path = python_path_global if job_name == "default": with temp_seed(hash(time.time()) % (2 ** 32 - 1)): letters = string.ascii_lowercase job_name = "".join([letters[le] for le in np.random.randint(0, len(letters), 8)]) if batchjob_folder is None: batchjob_folder = f"{global_params.config.qsub_work_folder}/{name}{suffix}_{job_name}/" if os.path.exists(batchjob_folder): if not overwrite: raise FileExistsError(f'Batchjob folder already exists at "{batchjob_folder}". Please' f' make sure it is safe for deletion, then set overwrite=True') shutil.rmtree(batchjob_folder, ignore_errors=True) batchjob_folder = batchjob_folder.rstrip('/') # Check if fallback is required if disable_batchjob or not batchjob_enabled(): return batchjob_fallback(params, name, n_cores, suffix, script_folder, python_path, show_progress=show_progress, remove_jobfolder=remove_jobfolder, log=log, overwrite=True, job_folder=batchjob_folder) if log is None: log_batchjob = initialize_logging("{}".format(name + suffix), log_dir=batchjob_folder) else: log_batchjob = log if script_folder is not None: path_to_scripts = script_folder else: path_to_scripts = path_to_scripts_default path_to_script = f'{path_to_scripts}/QSUB_{name}.py' if not os.path.exists(path_to_script): if os.path.exists(f'{path_to_scripts}/batchjob_{name}.py'): path_to_script = f'{path_to_scripts}/batchjob_{name}.py' else: raise FileNotFoundError(f'Specified script does not exist: {path_to_script}') if global_params.config['batch_proc_system'] != 'SLURM': msg = ('"batchjob_script" currently does not support any other batch processing ' 'system than SLURM.') log_batchjob.error(msg) raise NotImplementedError(msg) cpus_per_node = global_params.config['ncores_per_node'] mem_lim = int(global_params.config['mem_per_node'] / cpus_per_node) if '--mem' not in additional_flags: additional_flags += ' --mem-per-cpu={}M'.format(mem_lim) if exclude_nodes is None: exclude_nodes = global_params.config['slurm']['exclude_nodes'] if exclude_nodes is not None: additional_flags += f' --exclude={",".join(exclude_nodes)}' log_batchjob.debug(f'Excluding slurm nodes: {",".join(exclude_nodes)}') # Start SLURM job if len(job_name) > 8: msg = "job_name is longer than 8 characters. This is untested." log_batchjob.error(msg) raise ValueError(msg) 'Started BatchJob script "{}" ({}) (suffix="{}") with {} tasks, each' ' using {} core(s).'.format(name, job_name, suffix, len(params), n_cores)) # Create folder structure path_to_storage = "%s/storage/" % batchjob_folder path_to_sh = "%s/sh/" % batchjob_folder path_to_log = "%s/log/" % batchjob_folder path_to_err = "%s/err/" % batchjob_folder path_to_out = "%s/out/" % batchjob_folder if not os.path.exists(path_to_storage): os.makedirs(path_to_storage) if not os.path.exists(path_to_sh): os.makedirs(path_to_sh) if not os.path.exists(path_to_log): os.makedirs(path_to_log) if not os.path.exists(path_to_err): os.makedirs(path_to_err) if not os.path.exists(path_to_out): os.makedirs(path_to_out) # Submit jobs pbar = tqdm.tqdm(total=len(params), miniters=1, mininterval=1, leave=False) dtime_sub = 0 start_all = time.time() job_exec_dc = {} job2slurm_dc = {} # stores mapping of internal to SLURM job ID slurm2job_dc = {} # stores mapping of SLURM to internal job ID for job_id in range(len(params)): this_storage_path = path_to_storage + "job_%d.pkl" % job_id this_sh_path = path_to_sh + "" % job_id this_out_path = path_to_out + "job_%d.pkl" % job_id job_log_path = path_to_log + "job_%d.log" % job_id job_err_path = path_to_err + "job_%d.log" % job_id with open(this_sh_path, "w") as f: f.write("#!/bin/bash -l\n") f.write('export syconn_wd="{4}"\n{0} {1} {2} {3}'.format( python_path, path_to_script, this_storage_path, this_out_path, global_params.config.working_dir)) with open(this_storage_path, "wb") as f: for param in params[job_id]: if use_dill: dill.dump(param, f) else: pkl.dump(param, f) os.chmod(this_sh_path, 0o744) cmd_exec = "{0} --output={1} --error={2} --job-name={3} {4}".format( additional_flags, job_log_path, job_err_path, job_name, this_sh_path) if job_id == 0: log_batchjob.debug(f'Starting jobs with command "{cmd_exec}".') job_exec_dc[job_id] = cmd_exec job_cmd = f'sbatch --cpus-per-task={n_cores} {cmd_exec}' start = time.time() max_relaunch_cnt = 0 while True: process = subprocess.Popen(job_cmd, shell=True, stdout=subprocess.PIPE) out_str, err = process.communicate() if process.returncode != 0: if max_relaunch_cnt == 5: msg = f'Could not launch job with ID {job_id} and command "{job_cmd}".' log_batchjob.error(msg) raise RuntimeError(msg) log_batchjob.warning(f'Could not launch job with ID {job_id} with command "{job_cmd}"' f'for the {max_relaunch_cnt}. time.' f'Attempting again in 5s. Error raised: {err}') max_relaunch_cnt += 1 time.sleep(5) else: break slurm_id = int(re.findall(r'(\d+)', out_str.decode())[0]) job2slurm_dc[job_id] = slurm_id slurm2job_dc[slurm_id] = job_id dtime_sub += time.time() - start time.sleep(0.01) # wait for jobs to be in SLURM memory time.sleep(10) # requeue failed jobs for `max_iterations`-times js_dc = jobstates_slurm(job_name, starttime) requeue_dc = {k: 0 for k in job2slurm_dc} # use internal job IDs! nb_completed_compare = 0 last_failed = 0 while True: nb_failed = 0 # get internal job ids from current job dict job_ids = np.array(list(slurm2job_dc.values())) # get states of slurm jobs with the same ordering as 'job_ids' try: job_states = np.array([js_dc[k] for k in slurm2job_dc.keys()]) except KeyError as e: # sometimes new SLURM job is not yet in the SLURM cache. log_batchjob.warning(f'Did not find state of worker {e}\nFetching worker states ' f'again, SLURM cache may not have been updated yet.') time.sleep(5) js_dc = jobstates_slurm(job_name, starttime) job_states = np.array([js_dc[k] for k in slurm2job_dc.keys()]) # all jobs which are not running, completed or pending have failed for # some reason (states: failed, out_out_memory, ..). for j in job_ids[(job_states != 'COMPLETED') & (job_states != 'PENDING') & (job_states != 'RUNNING')]: if requeue_dc[j] == max_iterations: nb_failed += 1 continue # restart job if requeue_dc[j] == cpus_per_node: # TODO: use global_params NCORES_PER_NODE log_batchjob.warning(f'About to re-submit job {j} ({job2slurm_dc[j]}) ' f'which already was assigned the maximum number ' f'of available CPUs.') requeue_dc[j] = min(requeue_dc[j] + 1, cpus_per_node - n_cores) # n_cores is the base number of cores new_core_init = requeue_dc[j] - 1 # do not increase if failed first time # increment number of cores by one. job_cmd = f'sbatch --cpus-per-task={new_core_init + n_cores} {job_exec_dc[j]}' max_relaunch_cnt = 0 err_msg = None if time.time() - last_failed > 5: # if a job failed within the last 5 seconds, do not print the error # message (assume same error) try: with open(f"{path_to_err}/job_{j}.log") as f: err_msg = except FileNotFoundError as e: err_msg = f'FileNotFoundError: {e}' last_failed = time.time() if 'exceeded memory limit' in err_msg: err_msg = None # do not report message of OOM errors while True: process = subprocess.Popen(job_cmd, shell=True, stdout=subprocess.PIPE) out_str, err = process.communicate() if process.returncode != 0: if max_relaunch_cnt == 5: raise RuntimeError(f'Could not launch job with ID {j} ({job2slurm_dc[j]}) ' f'and command "{job_cmd}".') log_batchjob.warning(f'Could not re-launch job with ID {j} ({job2slurm_dc[j]}) ' f'with command "{job_cmd}" for the {max_relaunch_cnt}. ' f'time. Attempting again in 5s. Error raised: {err}') max_relaunch_cnt += 1 time.sleep(5) else: break slurm_id = int(re.findall(r'(\d+)', out_str.decode())[0]) slurm_id_orig = job2slurm_dc[j] del slurm2job_dc[slurm_id_orig] job2slurm_dc[j] = slurm_id slurm2job_dc[slurm_id] = j'Requeued job {j} ({requeue_dc[j]}/{max_iterations}). SLURM IDs: {slurm_id} (new), ' f'{slurm_id_orig} (old).') if err_msg is not None: log_batchjob.warning(f'Job {j} failed with: {err_msg}') nb_completed = np.sum(job_states == 'COMPLETED') pbar.update(nb_completed - nb_completed_compare) nb_completed_compare = nb_completed nb_finished = nb_completed + nb_failed # check actually running files if nb_finished == len(params): break time.sleep(sleep_time) js_dc = jobstates_slurm(job_name, starttime) pbar.close() dtime_all = time.time() - start_all dtime_all = str_delta_sec(dtime_all)"All jobs ({name}, {job_name}) have finished after " f"{dtime_all} ({dtime_sub:.1f}s submission): " f"{nb_completed} completed, {nb_failed} failed.") out_files = [fn for fn in glob.glob(path_to_out + "job_*.pkl") if'job_(\d+).pkl', fn)] if len(out_files) < len(params): msg = f'Batch processing error during execution of {name} in job ' \ f'\"{job_name}\": Found {len(out_files)}, expected {len(params)}.' log_batchjob.error(msg) raise ValueError(msg) if remove_jobfolder: _delete_folder_daemon(batchjob_folder, log_batchjob, job_name) return path_to_out
def _delete_folder_daemon(dirname, log, job_name, timeout=60): def _delete_folder(dn, lg, to=60): start = time.time() while to > time.time() - start: try: shutil.rmtree(dn) break except OSError as e: time.sleep(5) if time.time() - start > to: shutil.rmtree(dn, ignore_errors=True) if os.path.exists(dn): dn_del = f"{os.path.dirname(dn)}/DEL/{os.path.basename(dn)}_DEL" if os.path.exists(os.path.dirname(dn_del)): shutil.rmtree(os.path.dirname(dn_del), ignore_errors=True) os.makedirs(os.path.dirname(dn_del), exist_ok=True) shutil.move(dn, dn_del) lg.warning(f'Deletion of job folder "{dn}" timed out after {to}s.') t = threading.Thread(name=f'jobfold_delete_{job_name}', target=_delete_folder, args=(dirname, log, timeout)) t.setDaemon(True) t.start()
[docs]def batchjob_fallback(params, name, n_cores=1, suffix="", script_folder=None, python_path=None, remove_jobfolder=False, show_progress=True, log=None, overwrite=False, job_folder=None): """ # TODO: utilize log and error files ('path_to_err', path_to_log') Fallback method in case no batchjob submission system is available. Args: params list[Any]: name (str): n_cores (int): CPUs per job. suffix (str): script_folder (str): python_path (str): remove_jobfolder (bool): show_progress (bool): log: Logger Logger. overwrite: job_folder: Returns: str: Path to output. """ if python_path is None: python_path = python_path_global if job_folder is None: job_folder = "{}/{}_folder{}/".format(global_params.config.qsub_work_folder, name, suffix) if os.path.exists(job_folder): if not overwrite: raise FileExistsError(f'Batchjob folder already exists at "{job_folder}". ' f'Please make sure it is safe for deletion, then set overwrite=True') shutil.rmtree(job_folder, ignore_errors=True) job_folder = job_folder.rstrip('/') if log is None: log_batchjob = initialize_logging("{}".format(name + suffix), log_dir=job_folder) else: log_batchjob = log n_max_co_processes = cpu_count() n_max_co_processes = np.min([cpu_count() // n_cores, n_max_co_processes]) n_max_co_processes = np.min([n_max_co_processes, len(params)]) n_max_co_processes = np.max([n_max_co_processes, 1])'Started BatchJobFallback script "{name}" with {len(params)} tasks' f' using {n_max_co_processes} parallel jobs, each using {n_cores} core(s).') start = time.time() if script_folder is not None: path_to_scripts = script_folder else: path_to_scripts = path_to_scripts_default path_to_script = f'{path_to_scripts}/QSUB_{name}.py' if not os.path.exists(path_to_script): if os.path.exists(f'{path_to_scripts}/batchjob_{name}.py'): path_to_script = f'{path_to_scripts}/batchjob_{name}.py' else: raise FileNotFoundError(f'Specified script does not exist: {path_to_script}') path_to_storage = "%s/storage/" % job_folder path_to_sh = "%s/sh/" % job_folder path_to_log = "%s/log/" % job_folder path_to_err = "%s/err/" % job_folder path_to_out = "%s/out/" % job_folder if not os.path.exists(path_to_storage): os.makedirs(path_to_storage) if not os.path.exists(path_to_sh): os.makedirs(path_to_sh) if not os.path.exists(path_to_log): os.makedirs(path_to_log) if not os.path.exists(path_to_err): os.makedirs(path_to_err) if not os.path.exists(path_to_out): os.makedirs(path_to_out) multi_params = [] for i_job in range(len(params)): job_id = i_job this_storage_path = path_to_storage + "job_%d.pkl" % job_id this_sh_path = path_to_sh + "" % job_id this_out_path = path_to_out + "job_%d.pkl" % job_id with open(this_sh_path, "w") as f: f.write('#!/bin/bash -l\n') f.write('export syconn_wd="{4}"\n{0} {1} {2} {3}'.format( python_path, path_to_script, this_storage_path, this_out_path, global_params.config.working_dir)) with open(this_storage_path, "wb") as f: for param in params[i_job]: pkl.dump(param, f) os.chmod(this_sh_path, 0o744) cmd_exec = "sh {}".format(this_sh_path) multi_params.append(cmd_exec) out_str = start_multiprocess_imap(fallback_exec, multi_params, debug=False, show_progress=show_progress, nb_cpus=n_max_co_processes) out_files = glob.glob(path_to_out + "*.pkl") if len(out_files) < len(params): # report errors msg = 'Critical errors occurred during "{}". {}/{} Batchjob fallback worker ' \ 'failed.\n{}'.format(name, len(params) - len(out_files), len(params), out_str) log_mp.error(msg) log_batchjob.error(msg) raise ValueError(msg) elif len("".join(out_str)) != 0: msg = 'Warnings/errors occurred during ' \ '"{}".:\n{} See logs at {} for details.'.format(name, out_str, job_folder) log_mp.warning(msg) log_batchjob.warning(msg) if remove_jobfolder: # nfs might be slow and leaves .nfs files behind (possibly from the slurm worker) try: shutil.rmtree(job_folder) except OSError: job_folder_old = f"{os.path.dirname(job_folder)}/DEL/{os.path.basename(job_folder)}_DEL" log_batchjob.warning(f'Deletion of job folder "{job_folder}" was not complete. Moving to ' f'{job_folder_old}') if os.path.exists(os.path.dirname(job_folder_old)): shutil.rmtree(os.path.dirname(job_folder_old), ignore_errors=True) os.makedirs(os.path.dirname(job_folder_old), exist_ok=True) if os.path.exists(job_folder_old): shutil.rmtree(job_folder_old, ignore_errors=True) shutil.move(job_folder, job_folder_old) log_batchjob.debug('Finished "{}" after {:.2f}s.'.format(name, time.time() - start)) return path_to_out
[docs]def fallback_exec(cmd_exec): """ Helper function to execute commands via ``subprocess.Popen``. """ ps = subprocess.Popen(cmd_exec, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE) out, err = ps.communicate() out_str = "" reported = False if 'error' in out.decode().lower() or 'error' in err.decode().lower() \ or 'killed' in out.decode().lower() or 'killed' in err.decode().lower() \ or 'segmentation fault' in out.decode().lower() \ or 'segmentation fault' in err.decode().lower(): reported = True out_str = out.decode() + err.decode() if not reported and ('warning' in out.decode().lower() or 'warning' in err.decode().lower()): out_str = out.decode() + err.decode() return out_str
[docs]def jobstates_slurm(job_name: str, start_time: str, max_retry: int = 10) -> Dict[int, str]: """ Generates a dictionary which stores the state of every job belonging to `job_name`. Args: job_name: start_time: The following formats are allowed: MMDD[YY] or MM/DD[/YY] or MM.DD[.YY], e.g. ``"%m.%d")``. max_retry: Number of retries for ``sacct`` SLURM query if failing (5s sleep in-between). Returns: Dictionary with the job states. (key: job ID, value: state) """ cmd_stat = f"sacct -b --name {job_name} -u {username} -S {start_time}" job_states = dict() cnt_retry = 0 while True: process = subprocess.Popen(cmd_stat, shell=True, stdout=subprocess.PIPE) out, err = process.communicate() if process.returncode != 0: log_mp.warning(f'Delaying SLURM job state queries due to an error. ' f'Attempting again in 5s. {err}') time.sleep(5) cnt_retry += 1 if cnt_retry == max_retry: log_mp.error(f'Could not query job states from SLURM: {err}\n' f'Aborting due to maximum number of retries.') break continue for line in out.decode().split('\n'): str_parsed = re.findall(r"(\d+)[\s,\t]+([A-Z]+)", line) if len(str_parsed) == 1: str_parsed = str_parsed[0] job_states[int(str_parsed[0])] = str_parsed[1] break return job_states
[docs]def nodestates_slurm() -> Dict[int, dict]: """ Generates a dictionary which stores the state of every job belonging to `job_name`. Args: Returns: Dictionary with the node states. (key: job ID, value: state dict) """ cmd_stat = f'sinfo -N -o "%20N %10t %10c %10m %10G"' # yields e.g. """ NODELIST STATE CPUS MEMORY GRES compute001 idle 32 208990 gpu:GP100G compute002 mix 32 208990 gpu:GP100G compute003 idle 32 208990 gpu:GP100G compute004 idle 32 208990 gpu:GP100G compute005 idle 32 208990 gpu:GP100G compute006 mix 32 208990 gpu:GP100G compute007 idle 32 208990 gpu:GP100G compute008 idle 32 208990 gpu:GP100G compute009 alloc 32 208990 gpu:GP100G compute010 idle 32 208990 gpu:GP100G compute011 idle 32 208990 gpu:GP100G compute012 dead 32 208990 gpu:GP100G """ node_states = dict() attr_keys = [('state', str), ('cpus', int), ('memory', int), ('gres', int)] process = subprocess.Popen(cmd_stat, shell=True, stdout=subprocess.PIPE) out, err = process.communicate() if process.returncode != 0: log_mp.error(f'Error when getting node states with sinfo: {err}') return node_states for line in out.decode().split('\n')[1:]: if len(line) == 0: continue node_uri, *str_parsed = re.findall(r"(\S+)", line) ndc = dict() for k, v in zip(attr_keys, str_parsed): if k[0] == 'gres': v = v.split(':')[-1] ndc[k[0]] = k[1](v) node_states[node_uri] = ndc return node_states
[docs]def number_of_running_processes(job_name): """ Calculates the number of running jobs using qstat/squeue Args: job_name (str): job_name as shown in qstats Returns: nb_jobs (int): number of running jobs """ if global_params.config['batch_proc_system'] == 'QSUB': cmd_stat = "qstat -u %s" % username elif global_params.config['batch_proc_system'] == 'SLURM': cmd_stat = "squeue -u %s" % username else: raise NotImplementedError process = subprocess.Popen(cmd_stat, shell=True, stdout=subprocess.PIPE) nb_lines = 0 for line in io.TextIOWrapper(process.stdout, encoding="utf-8"): if job_name[:10 if global_params.config['batch_proc_system'] == 'QSUB' else 8] in line: nb_lines += 1 return nb_lines
[docs]def delete_jobs_by_name(job_name): """ Deletes a group of jobs that have the same name Args: job_name (str): job_name as shown in qstats Returns: """ if global_params.config['batch_proc_system'] == 'QSUB': cmd_stat = "qstat -u %s" % username elif global_params.config['batch_proc_system'] == 'SLURM': cmd_stat = "squeue -u %s" % username else: raise NotImplementedError process = subprocess.Popen(cmd_stat, shell=True, stdout=subprocess.PIPE) job_ids = [] for line in iter(process.stdout.readline, ''): curr_line = str(line) if job_name[:10] in curr_line: job_ids.append(re.findall(r"[\d]+", curr_line)[0]) if global_params.config['batch_proc_system'] == 'QSUB': cmd_del = "qdel " for job_id in job_ids: cmd_del += job_id + ", " command = cmd_del[:-2] subprocess.Popen(command, shell=True, stdout=subprocess.PIPE) elif global_params.config['batch_proc_system'] == 'SLURM': cmd_del = "scancel -n {}".format(job_name) subprocess.Popen(cmd_del, shell=True, stdout=subprocess.PIPE) else: raise NotImplementedError
[docs]def restart_nodes_daemon(): """ Only support gce. [WIP] Returns: """ zone = 'us-east1-c' cluster_name = 'slurm-gluster-gpu-' """ gcloud compute instances start slurm-gluster-gpu-client001 --zone us-east1-c""" process = subprocess.Popen('gcloud compute instances stop --help', shell=True, stdout=subprocess.PIPE) out, err = process.communicate() if process.returncode != 0: log_mp.error(f'Could not run gcloud compute instances stop command. Error: {err}') raise ValueError log_mp.debug('Restart-slurm-nodes daemon running..') while True: node_states = nodestates_slurm() ixs = np.array(list(node_states.keys())) states = np.array(['down' in v['state'] for v in node_states.values()]) if np.sum(states) > 0: node_ix_str = " ".join([f'{cluster_name}{ix}' for ix in ixs[states]]) log_mp.debug(f'Restarting {np.sum(states)} node(s): "{node_ix_str}".') process = subprocess.Popen(f'gcloud compute instances stop {node_ix_str} --zone {zone}', shell=True, stdout=subprocess.PIPE) out, err = process.communicate() if process.returncode != 0: log_mp.warning(f'Could not run "gcloud compute instances stop" command. Error: {err}') process = subprocess.Popen(f'gcloud compute instances start {node_ix_str} --zone {zone}', shell=True, stdout=subprocess.PIPE) out, err = process.communicate() if process.returncode != 0: log_mp.warning(f'Could not run "gcloud compute instances start" command. Error: {err}') time.sleep(30) # wait additional 30 s to give slurm time to start and update status states = np.array(['drain' in v['state'] for v in node_states.values()]) if np.sum(states) > 0: nodenames = f'client[{",".join(ix.replace("client", "") for ix in ixs)}]' process = subprocess.Popen(f'sudo scontrol update nodename={nodenames} state=RESUME', shell=True, stdout=subprocess.PIPE) out, err = process.communicate() if process.returncode != 0: log_mp.warning(f'Could not run "scontrol update" command. Error: {err}') time.sleep(30) # wait additional 30 s to give slurm time to start and update status time.sleep(30)