# -*- coding: utf-8 -*-
# SyConn - Synaptic connectivity inference toolkit
#
# Copyright (c) 2016 - now
# Max-Planck-Institute of Neurobiology, Munich, Germany
# Authors: Philipp Schubert, Sven Dorkenwald, Jörgen Kornfeld
import dill # supports pickling of lambda expressions
from . import log_mp
from .mp_utils import start_multiprocess_imap
from .. import global_params
from ..handler.basics import temp_seed, str_delta_sec
from ..handler.config import initialize_logging
import pickle as pkl
import threading
import getpass
import glob
import os
import io
import re
import datetime
from typing import Dict, Optional
import shutil
import string
import subprocess
import tqdm
import sys
import socket
import time
import numpy as np
from multiprocessing import cpu_count
from logging import Logger
import logging
[docs]def batchjob_enabled() -> bool:
"""
Checks if the active batch processing system is operational. The function
checks if either SLURM or QSUB is active.
Returns:
bool: Returns True if either SLURM or QSUB is active, else returns False.
"""
batch_proc_system = global_params.config['batch_proc_system']
if batch_proc_system is None or batch_proc_system == 'None':
return False
try:
if batch_proc_system == 'QSUB':
cmd_check = 'qstat'
elif batch_proc_system == 'SLURM':
cmd_check = 'squeue'
else:
raise NotImplementedError
with open(os.devnull, 'w') as devnull:
subprocess.check_call(cmd_check, shell=True,
stdout=devnull, stderr=devnull)
except subprocess.CalledProcessError as e:
logging.warning("BatchJobSystem '{}' specified but failed with error '{}' not found,"
" switching to single node multiprocessing.".format(batch_proc_system, e))
return False
return True
path_to_scripts_default = global_params.config.batchjob_script_folder
username = getpass.getuser()
python_path_global = sys.executable
[docs]def batchjob_script(params: list, name: str,
batchjob_folder: Optional[str] = None,
n_cores: int = 1, additional_flags: str = '',
suffix: str = "", job_name: str = "default",
script_folder: Optional[str] = None,
max_iterations: int = 10,
python_path: Optional[str] = None,
disable_batchjob: bool = False,
use_dill: bool = False,
remove_jobfolder: bool = False,
log: Logger = None, sleep_time: Optional[int] = None,
show_progress: bool = True, overwrite: bool = False,
exclude_nodes: Optional[list] = None):
"""
Submits batch jobs to process a list of parameters `params` with a python
script on the specified environment (either None, SLURM or QSUB; run
``global_params.config['batch_proc_system']`` to get the active system).
The memory available for each job is coupled to the number of cores
per job (`n_cores`). This function also supports faster submission by adding
sbatch array support and making script specification more generic.
Args:
params (list): List of all parameter sets to be processed.
name (str): Name of batch job submitted via the batch processing system.
batchjob_folder (str, optional): Directory which contains all submission
relevant files, e.g. bash scripts, logs, output files. Defaults to
``"{}/{}_folder{}/".format(global_params.config.qsub_work_folder, name, suffix)``.
n_cores (int, optional): Number of cores used for each job. Defaults to 1.
additional_flags (str, optional): Used to set additional parameters for each job.
To allocate one GPU for each worker use: ``additional_flags=--gres=gpu:1``.
Defaults to ''.
suffix (str, optional): Suffix added to `batchjob_folder`. Defaults to "".
job_name (str, optional): Name of the jobs submitted via the batch processing
system. Defaults to a random string of 8 letters.
script_folder (str, optional): Directory where to look for the script which is
executed. Looks for ``QSUB_{name}.py``. Defaults to None.
max_iterations (int, optional): Maximum number of retries of failed jobs.
Defaults to 10.
python_path (str, optional): Path to python binary. Defaults to None.
disable_batchjob (bool, optional): Use single node multiprocessing.
Defaults to False.
use_dill (bool, optional): Use dill to enable pickling of lambda expressions.
Defaults to False.
remove_jobfolder (bool, optional): Remove `batchjob_folder` after successful
termination. Defaults to False.
log (Logger, optional): Logger. Defaults to None.
sleep_time (int, optional): Sleep duration before checking batch job states
again. Defaults to None.
show_progress (bool, optional): Only used if ``disabled_batchjob=True``.
Defaults to True.
overwrite (bool, optional): Defaults to False.
exclude_nodes (list, optional): Nodes to exclude during job submission.
Defaults to None.
"""
starttime = datetime.datetime.today().strftime("%m.%d")
# Parameter handling
if n_cores is None:
n_cores = 1
if sleep_time is None:
sleep_time = 5
if python_path is None:
python_path = python_path_global
if job_name == "default":
with temp_seed(hash(time.time()) % (2 ** 32 - 1)):
letters = string.ascii_lowercase
job_name = "".join([letters[le] for le in np.random.randint(0, len(letters), 8)])
if batchjob_folder is None:
batchjob_folder = f"{global_params.config.qsub_work_folder}/{name}{suffix}_{job_name}/"
if os.path.exists(batchjob_folder):
if not overwrite:
raise FileExistsError(f'Batchjob folder already exists at "{batchjob_folder}". Please'
f' make sure it is safe for deletion, then set overwrite=True')
shutil.rmtree(batchjob_folder, ignore_errors=True)
batchjob_folder = batchjob_folder.rstrip('/')
# Check if fallback is required
if disable_batchjob or not batchjob_enabled():
return batchjob_fallback(params, name, n_cores, suffix, script_folder, python_path,
show_progress=show_progress, remove_jobfolder=remove_jobfolder,
log=log, overwrite=True, job_folder=batchjob_folder)
if log is None:
log_batchjob = initialize_logging("{}".format(name + suffix), log_dir=batchjob_folder)
else:
log_batchjob = log
if script_folder is not None:
path_to_scripts = script_folder
else:
path_to_scripts = path_to_scripts_default
path_to_script = f'{path_to_scripts}/QSUB_{name}.py'
if not os.path.exists(path_to_script):
if os.path.exists(f'{path_to_scripts}/batchjob_{name}.py'):
path_to_script = f'{path_to_scripts}/batchjob_{name}.py'
else:
raise FileNotFoundError(f'Specified script does not exist: {path_to_script}')
if global_params.config['batch_proc_system'] != 'SLURM':
msg = ('"batchjob_script" currently does not support any other batch processing '
'system than SLURM.')
log_batchjob.error(msg)
raise NotImplementedError(msg)
cpus_per_node = global_params.config['ncores_per_node']
mem_lim = int(global_params.config['mem_per_node'] /
cpus_per_node)
if '--mem' not in additional_flags:
additional_flags += ' --mem-per-cpu={}M'.format(mem_lim)
if exclude_nodes is None:
exclude_nodes = global_params.config['slurm']['exclude_nodes']
if exclude_nodes is not None:
additional_flags += f' --exclude={",".join(exclude_nodes)}'
log_batchjob.debug(f'Excluding slurm nodes: {",".join(exclude_nodes)}')
# Start SLURM job
if len(job_name) > 8:
msg = "job_name is longer than 8 characters. This is untested."
log_batchjob.error(msg)
raise ValueError(msg)
log_batchjob.info(
'Started BatchJob script "{}" ({}) (suffix="{}") with {} tasks, each'
' using {} core(s).'.format(name, job_name, suffix, len(params), n_cores))
# Create folder structure
path_to_storage = "%s/storage/" % batchjob_folder
path_to_sh = "%s/sh/" % batchjob_folder
path_to_log = "%s/log/" % batchjob_folder
path_to_err = "%s/err/" % batchjob_folder
path_to_out = "%s/out/" % batchjob_folder
if not os.path.exists(path_to_storage):
os.makedirs(path_to_storage)
if not os.path.exists(path_to_sh):
os.makedirs(path_to_sh)
if not os.path.exists(path_to_log):
os.makedirs(path_to_log)
if not os.path.exists(path_to_err):
os.makedirs(path_to_err)
if not os.path.exists(path_to_out):
os.makedirs(path_to_out)
# Submit jobs
pbar = tqdm.tqdm(total=len(params), miniters=1, mininterval=1, leave=False)
dtime_sub = 0
start_all = time.time()
job_exec_dc = {}
job2slurm_dc = {} # stores mapping of internal to SLURM job ID
slurm2job_dc = {} # stores mapping of SLURM to internal job ID
for job_id in range(len(params)):
this_storage_path = path_to_storage + "job_%d.pkl" % job_id
this_sh_path = path_to_sh + "job_%d.sh" % job_id
this_out_path = path_to_out + "job_%d.pkl" % job_id
job_log_path = path_to_log + "job_%d.log" % job_id
job_err_path = path_to_err + "job_%d.log" % job_id
with open(this_sh_path, "w") as f:
f.write("#!/bin/bash -l\n")
f.write('export syconn_wd="{4}"\n{0} {1} {2} {3}'.format(
python_path, path_to_script, this_storage_path,
this_out_path, global_params.config.working_dir))
with open(this_storage_path, "wb") as f:
for param in params[job_id]:
if use_dill:
dill.dump(param, f)
else:
pkl.dump(param, f)
os.chmod(this_sh_path, 0o744)
cmd_exec = "{0} --output={1} --error={2} --job-name={3} {4}".format(
additional_flags, job_log_path, job_err_path, job_name, this_sh_path)
if job_id == 0:
log_batchjob.debug(f'Starting jobs with command "{cmd_exec}".')
job_exec_dc[job_id] = cmd_exec
job_cmd = f'sbatch --cpus-per-task={n_cores} {cmd_exec}'
start = time.time()
max_relaunch_cnt = 0
while True:
process = subprocess.Popen(job_cmd, shell=True, stdout=subprocess.PIPE)
out_str, err = process.communicate()
if process.returncode != 0:
if max_relaunch_cnt == 5:
msg = f'Could not launch job with ID {job_id} and command "{job_cmd}".'
log_batchjob.error(msg)
raise RuntimeError(msg)
log_batchjob.warning(f'Could not launch job with ID {job_id} with command "{job_cmd}"'
f'for the {max_relaunch_cnt}. time.'
f'Attempting again in 5s. Error raised: {err}')
max_relaunch_cnt += 1
time.sleep(5)
else:
break
slurm_id = int(re.findall(r'(\d+)', out_str.decode())[0])
job2slurm_dc[job_id] = slurm_id
slurm2job_dc[slurm_id] = job_id
dtime_sub += time.time() - start
time.sleep(0.01)
# wait for jobs to be in SLURM memory
time.sleep(10)
# requeue failed jobs for `max_iterations`-times
js_dc = jobstates_slurm(job_name, starttime)
requeue_dc = {k: 0 for k in job2slurm_dc} # use internal job IDs!
nb_completed_compare = 0
last_failed = 0
while True:
nb_failed = 0
# get internal job ids from current job dict
job_ids = np.array(list(slurm2job_dc.values()))
# get states of slurm jobs with the same ordering as 'job_ids'
try:
job_states = np.array([js_dc[k] for k in slurm2job_dc.keys()])
except KeyError as e: # sometimes new SLURM job is not yet in the SLURM cache.
log_batchjob.warning(f'Did not find state of worker {e}\nFetching worker states '
f'again, SLURM cache may not have been updated yet.')
time.sleep(5)
js_dc = jobstates_slurm(job_name, starttime)
job_states = np.array([js_dc[k] for k in slurm2job_dc.keys()])
# all jobs which are not running, completed or pending have failed for
# some reason (states: failed, out_out_memory, ..).
for j in job_ids[(job_states != 'COMPLETED') & (job_states != 'PENDING')
& (job_states != 'RUNNING')]:
if requeue_dc[j] == max_iterations:
nb_failed += 1
continue
# restart job
if requeue_dc[j] == cpus_per_node: # TODO: use global_params NCORES_PER_NODE
log_batchjob.warning(f'About to re-submit job {j} ({job2slurm_dc[j]}) '
f'which already was assigned the maximum number '
f'of available CPUs.')
requeue_dc[j] = min(requeue_dc[j] + 1, cpus_per_node - n_cores) # n_cores is the base number of cores
new_core_init = requeue_dc[j] - 1 # do not increase if failed first time
# increment number of cores by one.
job_cmd = f'sbatch --cpus-per-task={new_core_init + n_cores} {job_exec_dc[j]}'
max_relaunch_cnt = 0
err_msg = None
if time.time() - last_failed > 5:
# if a job failed within the last 5 seconds, do not print the error
# message (assume same error)
try:
with open(f"{path_to_err}/job_{j}.log") as f:
err_msg = f.read()
except FileNotFoundError as e:
err_msg = f'FileNotFoundError: {e}'
last_failed = time.time()
if 'exceeded memory limit' in err_msg:
err_msg = None # do not report message of OOM errors
while True:
process = subprocess.Popen(job_cmd, shell=True, stdout=subprocess.PIPE)
out_str, err = process.communicate()
if process.returncode != 0:
if max_relaunch_cnt == 5:
raise RuntimeError(f'Could not launch job with ID {j} ({job2slurm_dc[j]}) '
f'and command "{job_cmd}".')
log_batchjob.warning(f'Could not re-launch job with ID {j} ({job2slurm_dc[j]}) '
f'with command "{job_cmd}" for the {max_relaunch_cnt}. '
f'time. Attempting again in 5s. Error raised: {err}')
max_relaunch_cnt += 1
time.sleep(5)
else:
break
slurm_id = int(re.findall(r'(\d+)', out_str.decode())[0])
slurm_id_orig = job2slurm_dc[j]
del slurm2job_dc[slurm_id_orig]
job2slurm_dc[j] = slurm_id
slurm2job_dc[slurm_id] = j
log_batchjob.info(f'Requeued job {j} ({requeue_dc[j]}/{max_iterations}). SLURM IDs: {slurm_id} (new), '
f'{slurm_id_orig} (old).')
if err_msg is not None:
log_batchjob.warning(f'Job {j} failed with: {err_msg}')
nb_completed = np.sum(job_states == 'COMPLETED')
pbar.update(nb_completed - nb_completed_compare)
nb_completed_compare = nb_completed
nb_finished = nb_completed + nb_failed
# check actually running files
if nb_finished == len(params):
break
time.sleep(sleep_time)
js_dc = jobstates_slurm(job_name, starttime)
pbar.close()
dtime_all = time.time() - start_all
dtime_all = str_delta_sec(dtime_all)
log_batchjob.info(f"All jobs ({name}, {job_name}) have finished after "
f"{dtime_all} ({dtime_sub:.1f}s submission): "
f"{nb_completed} completed, {nb_failed} failed.")
out_files = [fn for fn in glob.glob(path_to_out + "job_*.pkl") if re.search(r'job_(\d+).pkl', fn)]
if len(out_files) < len(params):
msg = f'Batch processing error during execution of {name} in job ' \
f'\"{job_name}\": Found {len(out_files)}, expected {len(params)}.'
log_batchjob.error(msg)
raise ValueError(msg)
if remove_jobfolder:
_delete_folder_daemon(batchjob_folder, log_batchjob, job_name)
return path_to_out
def _delete_folder_daemon(dirname, log, job_name, timeout=60):
"""
Starts a daemon thread to delete a directory after a given timeout.
Args:
dirname (str): The directory to be deleted.
log (Logger): Logger to log the status of the deletion.
job_name (str): The name of the job associated with the directory.
timeout (int): The time in seconds to wait before deleting the directory. Defaults to 60.
Returns:
None
"""
def _delete_folder(dn, lg, to=60):
start = time.time()
while to > time.time() - start:
try:
shutil.rmtree(dn)
break
except OSError as e:
time.sleep(5)
if time.time() - start > to:
shutil.rmtree(dn, ignore_errors=True)
if os.path.exists(dn):
dn_del = f"{os.path.dirname(dn)}/DEL/{os.path.basename(dn)}_DEL"
if os.path.exists(os.path.dirname(dn_del)):
shutil.rmtree(os.path.dirname(dn_del), ignore_errors=True)
os.makedirs(os.path.dirname(dn_del), exist_ok=True)
shutil.move(dn, dn_del)
lg.warning(f'Deletion of job folder "{dn}" timed out after {to}s.')
t = threading.Thread(name=f'jobfold_delete_{job_name}', target=_delete_folder,
args=(dirname, log, timeout))
t.setDaemon(True)
t.start()
[docs]def batchjob_fallback(params, name, n_cores=1, suffix="", script_folder=None, python_path=None, remove_jobfolder=False,
show_progress=True, log=None, overwrite=False, job_folder=None):
"""
Executes a batch job in a fallback mode if no batchjob submission system is available.
Args:
params (list): List of parameters for the batch job.
name (str): Name of the batch job.
n_cores (int): Number of cores to be used for the job. Defaults to 1.
suffix (str): Suffix to be added to the job folder. Defaults to "".
script_folder (str): Directory where the script to be executed is located.
Defaults to None.
python_path (str): Path to the python binary. Defaults to None.
remove_jobfolder (bool): Whether to remove the job folder after execution.
Defaults to False.
show_progress (bool): Whether to show progress during execution. Defaults to True.
log (Logger): Logger to log the status of the job. Defaults to None.
overwrite (bool): Whether to overwrite existing files. Defaults to False.
job_folder (str): Directory where the job files are located. Defaults to None.
Returns:
str: Path to the output of the job.
"""
if python_path is None:
python_path = python_path_global
if job_folder is None:
job_folder = "{}/{}_folder{}/".format(global_params.config.qsub_work_folder,
name, suffix)
if os.path.exists(job_folder):
if not overwrite:
raise FileExistsError(f'Batchjob folder already exists at "{job_folder}". '
f'Please make sure it is safe for deletion, then set overwrite=True')
shutil.rmtree(job_folder, ignore_errors=True)
job_folder = job_folder.rstrip('/')
if log is None:
log_batchjob = initialize_logging("{}".format(name + suffix),
log_dir=job_folder)
else:
log_batchjob = log
n_max_co_processes = cpu_count()
n_max_co_processes = np.min([cpu_count() // n_cores, n_max_co_processes])
n_max_co_processes = np.min([n_max_co_processes, len(params)])
n_max_co_processes = np.max([n_max_co_processes, 1])
log_batchjob.info(f'Started BatchJobFallback script "{name}" with {len(params)} tasks'
f' using {n_max_co_processes} parallel jobs, each using {n_cores} core(s).')
start = time.time()
if script_folder is not None:
path_to_scripts = script_folder
else:
path_to_scripts = path_to_scripts_default
path_to_script = f'{path_to_scripts}/QSUB_{name}.py'
if not os.path.exists(path_to_script):
if os.path.exists(f'{path_to_scripts}/batchjob_{name}.py'):
path_to_script = f'{path_to_scripts}/batchjob_{name}.py'
else:
raise FileNotFoundError(f'Specified script does not exist: {path_to_script}')
path_to_storage = "%s/storage/" % job_folder
path_to_sh = "%s/sh/" % job_folder
path_to_log = "%s/log/" % job_folder
path_to_err = "%s/err/" % job_folder
path_to_out = "%s/out/" % job_folder
if not os.path.exists(path_to_storage):
os.makedirs(path_to_storage)
if not os.path.exists(path_to_sh):
os.makedirs(path_to_sh)
if not os.path.exists(path_to_log):
os.makedirs(path_to_log)
if not os.path.exists(path_to_err):
os.makedirs(path_to_err)
if not os.path.exists(path_to_out):
os.makedirs(path_to_out)
multi_params = []
for i_job in range(len(params)):
job_id = i_job
this_storage_path = path_to_storage + "job_%d.pkl" % job_id
this_sh_path = path_to_sh + "job_%d.sh" % job_id
this_out_path = path_to_out + "job_%d.pkl" % job_id
with open(this_sh_path, "w") as f:
f.write('#!/bin/bash -l\n')
f.write('export syconn_wd="{4}"\n{0} {1} {2} {3}'.format(
python_path, path_to_script, this_storage_path,
this_out_path, global_params.config.working_dir))
with open(this_storage_path, "wb") as f:
for param in params[i_job]:
pkl.dump(param, f)
os.chmod(this_sh_path, 0o744)
cmd_exec = "sh {}".format(this_sh_path)
multi_params.append(cmd_exec)
out_str = start_multiprocess_imap(fallback_exec, multi_params, debug=False,
show_progress=show_progress, nb_cpus=n_max_co_processes)
out_files = glob.glob(path_to_out + "*.pkl")
if len(out_files) < len(params):
# report errors
msg = 'Critical errors occurred during "{}". {}/{} Batchjob fallback worker ' \
'failed.\n{}'.format(name, len(params) - len(out_files),
len(params), out_str)
log_mp.error(msg)
log_batchjob.error(msg)
raise ValueError(msg)
elif len("".join(out_str)) != 0:
msg = 'Warnings/errors occurred during ' \
'"{}".:\n{} See logs at {} for details.'.format(name, out_str, job_folder)
log_mp.warning(msg)
log_batchjob.warning(msg)
if remove_jobfolder:
# nfs might be slow and leaves .nfs files behind (possibly from the slurm worker)
try:
shutil.rmtree(job_folder)
except OSError:
job_folder_old = f"{os.path.dirname(job_folder)}/DEL/{os.path.basename(job_folder)}_DEL"
log_batchjob.warning(f'Deletion of job folder "{job_folder}" was not complete. Moving to '
f'{job_folder_old}')
if os.path.exists(os.path.dirname(job_folder_old)):
shutil.rmtree(os.path.dirname(job_folder_old), ignore_errors=True)
os.makedirs(os.path.dirname(job_folder_old), exist_ok=True)
if os.path.exists(job_folder_old):
shutil.rmtree(job_folder_old, ignore_errors=True)
shutil.move(job_folder, job_folder_old)
log_batchjob.debug('Finished "{}" after {:.2f}s.'.format(name, time.time() - start))
return path_to_out
[docs]def fallback_exec(cmd_exec):
"""
Executes a command using subprocess.Popen and returns any warnings or errors.
Args:
cmd_exec (str): The command to be executed.
Returns:
str: Warnings or errors during the execution of the command. This function
was previously a helper function for executing commands.
"""
ps = subprocess.Popen(cmd_exec, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
out, err = ps.communicate()
out_str = ""
reported = False
if 'error' in out.decode().lower() or 'error' in err.decode().lower() \
or 'killed' in out.decode().lower() or 'killed' in err.decode().lower() \
or 'segmentation fault' in out.decode().lower() \
or 'segmentation fault' in err.decode().lower():
reported = True
out_str = out.decode() + err.decode()
if not reported and ('warning' in out.decode().lower() or
'warning' in err.decode().lower()):
out_str = out.decode() + err.decode()
return out_str
[docs]def jobstates_slurm(job_name: str, start_time: str,
max_retry: int = 10) -> Dict[int, str]:
"""
Generates a dictionary which stores the state of every job belonging to `job_name`.
Args:
job_name (str): Name of the job.
start_time (str): Start time of the job. The following formats are
allowed: MMDD[YY] or MM/DD[/YY] or MM.DD[.YY], e.g.
``datetime.datetime.today().strftime("%m.%d")``.
max_retry (int): Number of retries for ``sacct`` SLURM query if
failing. Defaults to 10. Sleeps for 5s in-between retries.
Returns:
dict: Dictionary with the job states. (key: job ID, value: state)
"""
cmd_stat = f"sacct -b --name {job_name} -u {username} -S {start_time}"
job_states = dict()
cnt_retry = 0
while True:
process = subprocess.Popen(cmd_stat, shell=True,
stdout=subprocess.PIPE)
out, err = process.communicate()
if process.returncode != 0:
log_mp.warning(f'Delaying SLURM job state queries due to an error. '
f'Attempting again in 5s. {err}')
time.sleep(5)
cnt_retry += 1
if cnt_retry == max_retry:
log_mp.error(f'Could not query job states from SLURM: {err}\n'
f'Aborting due to maximum number of retries.')
break
continue
for line in out.decode().split('\n'):
str_parsed = re.findall(r"(\d+)[\s,\t]+([A-Z]+)", line)
if len(str_parsed) == 1:
str_parsed = str_parsed[0]
job_states[int(str_parsed[0])] = str_parsed[1]
break
return job_states
[docs]def nodestates_slurm() -> Dict[int, dict]:
"""
Generates a dictionary which stores the state of every node.
Args:
None
Returns:
dict: Dictionary with the node states. (key: node ID, value: state dict)
"""
cmd_stat = f'sinfo -N -o "%20N %10t %10c %10m %10G"'
# yields e.g.
"""
NODELIST STATE CPUS MEMORY GRES
compute001 idle 32 208990 gpu:GP100G
compute002 mix 32 208990 gpu:GP100G
compute003 idle 32 208990 gpu:GP100G
compute004 idle 32 208990 gpu:GP100G
compute005 idle 32 208990 gpu:GP100G
compute006 mix 32 208990 gpu:GP100G
compute007 idle 32 208990 gpu:GP100G
compute008 idle 32 208990 gpu:GP100G
compute009 alloc 32 208990 gpu:GP100G
compute010 idle 32 208990 gpu:GP100G
compute011 idle 32 208990 gpu:GP100G
compute012 dead 32 208990 gpu:GP100G
"""
node_states = dict()
attr_keys = [('state', str), ('cpus', int), ('memory', int), ('gres', int)]
process = subprocess.Popen(cmd_stat, shell=True, stdout=subprocess.PIPE)
out, err = process.communicate()
if process.returncode != 0:
log_mp.error(f'Error when getting node states with sinfo: {err}')
return node_states
for line in out.decode().split('\n')[1:]:
if len(line) == 0:
continue
node_uri, *str_parsed = re.findall(r"(\S+)", line)
ndc = dict()
for k, v in zip(attr_keys, str_parsed):
if k[0] == 'gres':
v = v.split(':')[-1]
ndc[k[0]] = k[1](v)
node_states[node_uri] = ndc
return node_states
[docs]def number_of_running_processes(job_name):
"""
Calculates the number of running jobs using qstat/squeue
Args:
job_name (str): Name of the job as shown in qstats.
Returns:
int: Number of running jobs.
"""
if global_params.config['batch_proc_system'] == 'QSUB':
cmd_stat = "qstat -u %s" % username
elif global_params.config['batch_proc_system'] == 'SLURM':
cmd_stat = "squeue -u %s" % username
else:
raise NotImplementedError
process = subprocess.Popen(cmd_stat, shell=True,
stdout=subprocess.PIPE)
nb_lines = 0
for line in io.TextIOWrapper(process.stdout, encoding="utf-8"):
if job_name[:10 if global_params.config['batch_proc_system'] == 'QSUB' else 8] in line:
nb_lines += 1
return nb_lines
[docs]def delete_jobs_by_name(job_name):
"""
Deletes a group of jobs that have the same name from the batch processing system. The function
checks the type of batch processing system (either QSUB or SLURM) and executes the appropriate
command to delete the jobs. If the batch processing system is not recognized, a
NotImplementedError is raised.
Args:
job_name (str): The name of the jobs to be deleted as shown in qstats.
Raises:
NotImplementedError: If the batch processing system is not recognized.
"""
if global_params.config['batch_proc_system'] == 'QSUB':
cmd_stat = "qstat -u %s" % username
elif global_params.config['batch_proc_system'] == 'SLURM':
cmd_stat = "squeue -u %s" % username
else:
raise NotImplementedError
process = subprocess.Popen(cmd_stat, shell=True,
stdout=subprocess.PIPE)
job_ids = []
for line in iter(process.stdout.readline, ''):
curr_line = str(line)
if job_name[:10] in curr_line:
job_ids.append(re.findall(r"[\d]+", curr_line)[0])
if global_params.config['batch_proc_system'] == 'QSUB':
cmd_del = "qdel "
for job_id in job_ids:
cmd_del += job_id + ", "
command = cmd_del[:-2]
subprocess.Popen(command, shell=True,
stdout=subprocess.PIPE)
elif global_params.config['batch_proc_system'] == 'SLURM':
cmd_del = "scancel -n {}".format(job_name)
subprocess.Popen(cmd_del, shell=True,
stdout=subprocess.PIPE)
else:
raise NotImplementedError
[docs]def restart_nodes_daemon():
"""
Restarts the nodes of a Google Cloud Engine (GCE) cluster. The function continuously checks the
state of the nodes and restarts any that are down or drained. The function currently only supports
GCE and is a work in progress.
Raises:
ValueError: If the gcloud compute instances stop command cannot be run.
"""
zone = 'us-east1-c'
cluster_name = 'slurm-gluster-gpu-'
"""
gcloud compute instances start slurm-gluster-gpu-client001 --zone us-east1-c"""
process = subprocess.Popen('gcloud compute instances stop --help', shell=True, stdout=subprocess.PIPE)
out, err = process.communicate()
if process.returncode != 0:
log_mp.error(f'Could not run gcloud compute instances stop command. Error: {err}')
raise ValueError
log_mp.debug('Restart-slurm-nodes daemon running..')
while True:
node_states = nodestates_slurm()
ixs = np.array(list(node_states.keys()))
states = np.array(['down' in v['state'] for v in node_states.values()])
if np.sum(states) > 0:
node_ix_str = " ".join([f'{cluster_name}{ix}' for ix in ixs[states]])
log_mp.debug(f'Restarting {np.sum(states)} node(s): "{node_ix_str}".')
process = subprocess.Popen(f'gcloud compute instances stop {node_ix_str} --zone {zone}', shell=True,
stdout=subprocess.PIPE)
out, err = process.communicate()
if process.returncode != 0:
log_mp.warning(f'Could not run "gcloud compute instances stop" command. Error: {err}')
process = subprocess.Popen(f'gcloud compute instances start {node_ix_str} --zone {zone}', shell=True,
stdout=subprocess.PIPE)
out, err = process.communicate()
if process.returncode != 0:
log_mp.warning(f'Could not run "gcloud compute instances start" command. Error: {err}')
time.sleep(30) # wait additional 30 s to give slurm time to start and update status
states = np.array(['drain' in v['state'] for v in node_states.values()])
if np.sum(states) > 0:
nodenames = f'client[{",".join(ix.replace("client", "") for ix in ixs)}]'
process = subprocess.Popen(f'sudo scontrol update nodename={nodenames} state=RESUME', shell=True,
stdout=subprocess.PIPE)
out, err = process.communicate()
if process.returncode != 0:
log_mp.warning(f'Could not run "scontrol update" command. Error: {err}')
time.sleep(30) # wait additional 30 s to give slurm time to start and update status
time.sleep(30)