syconn.mp package
syconn.mp.batchjob_utils module
- syconn.mp.batchjob_utils.batchjob_enabled()
Checks if the active batch processing system is operational. The function checks if either SLURM or QSUB is active.
- Returns
Returns True if either SLURM or QSUB is active, else returns False.
- Return type
bool
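A minimal sketch of such a check. It assumes availability can be approximated by whether the SLURM (sbatch) or QSUB (qsub) submission command is on the PATH. The name batchjob_available is hypothetical. A command on the PATH does not guarantee a working scheduler, and SyConn may instead rely on its configuration (global_params.config['batch_proc_system']).

```python
import shutil


def batchjob_available() -> bool:
    """Hypothetical check: is a SLURM or QSUB submission command on the PATH?"""
    # sbatch submits SLURM jobs; qsub is the submission command of PBS/SGE-style systems.
    return any(shutil.which(cmd) is not None for cmd in ("sbatch", "qsub"))


print("batch system available:", batchjob_available())
```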
- syconn.mp.batchjob_utils.batchjob_fallback(params, name, n_cores=1, suffix='', script_folder=None, python_path=None, remove_jobfolder=False, show_progress=True, log=None, overwrite=False, job_folder=None)
Executes a batch job in a fallback mode if no batchjob submission system is available.
- Parameters
params (list) – List of parameters for the batch job.
name (str) – Name of the batch job.
n_cores (int) – Number of cores to be used for the job. Defaults to 1.
suffix (str) – Suffix to be added to the job folder. Defaults to “”.
script_folder (str) – Directory where the script to be executed is located. Defaults to None.
python_path (str) – Path to the python binary. Defaults to None.
remove_jobfolder (bool) – Whether to remove the job folder after execution. Defaults to False.
show_progress (bool) – Whether to show progress during execution. Defaults to True.
log (Logger) – Logger to log the status of the job. Defaults to None.
overwrite (bool) – Whether to overwrite existing files. Defaults to False.
job_folder (str) – Directory where the job files are located. Defaults to None.
- Returns
Path to the output of the job.
- Return type
str
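The fallback mode can be pictured as running the worker script locally, once per parameter set, with at most n_cores scripts running at a time. The following is a minimal sketch under these assumptions:

- Each parameter set is pickled to an input file.
- The script is called as python script in.pkl out.pkl.
- Results are read back from the output pickles.

The function run_fallback and this folder layout are hypothetical and are not SyConn's actual implementation.

```python
import pickle
import subprocess
import sys
import tempfile
from concurrent.futures import ThreadPoolExecutor
from pathlib import Path


def run_fallback(params, script, n_cores=1, python_path=None, job_folder=None):
    """Hypothetical fallback: run `script` locally once per parameter set."""
    python_path = python_path or sys.executable
    job_folder = Path(job_folder or tempfile.mkdtemp(prefix="fallback_"))
    (job_folder / "storage").mkdir(parents=True, exist_ok=True)
    (job_folder / "out").mkdir(exist_ok=True)

    def run_one(i):
        in_path = job_folder / "storage" / f"job_{i}.pkl"
        out_path = job_folder / "out" / f"job_{i}.pkl"
        with open(in_path, "wb") as f:
            pickle.dump(params[i], f)
        # Each job runs in its own Python process; the thread only waits for it.
        proc = subprocess.run([python_path, str(script), str(in_path), str(out_path)],
                              capture_output=True, text=True)
        return proc.returncode, proc.stderr

    # At most n_cores jobs run concurrently.
    with ThreadPoolExecutor(max_workers=n_cores) as pool:
        outcomes = list(pool.map(run_one, range(len(params))))
    errors = [err for code, err in outcomes if code != 0]
    if errors:
        raise RuntimeError(f"{len(errors)} job(s) failed; first error:\n{errors[0]}")
    return str(job_folder / "out")


# Usage: a tiny worker script following the QSUB_{name}.py naming convention.
worker = Path(tempfile.mkdtemp()) / "QSUB_square.py"
worker.write_text(
    "import pickle, sys\n"
    "with open(sys.argv[1], 'rb') as f:\n"
    "    x = pickle.load(f)\n"
    "with open(sys.argv[2], 'wb') as f:\n"
    "    pickle.dump(x * x, f)\n"
)
out_dir = run_fallback([1, 2, 3], worker, n_cores=2)
results = []
for i in range(3):
    with open(Path(out_dir) / f"job_{i}.pkl", "rb") as f:
        results.append(pickle.load(f))
print(results)  # [1, 4, 9]
```

The jobs are separate processes, so a thread pool is enough to cap concurrency.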
- syconn.mp.batchjob_utils.batchjob_script(params, name, batchjob_folder=None, n_cores=1, additional_flags='', suffix='', job_name='default', script_folder=None, max_iterations=10, python_path=None, disable_batchjob=False, use_dill=False, remove_jobfolder=False, log=None, sleep_time=None, show_progress=True, overwrite=False, exclude_nodes=None)
Submits batch jobs that process the list of parameter sets params with a Python script on the configured environment (None, SLURM or QSUB; check global_params.config['batch_proc_system'] to get the active system). The memory available to each job is coupled to the number of cores per job (n_cores). This function also supports faster submission through sbatch array support and a more generic script specification.
- Parameters
params (list) – List of all parameter sets to be processed.
name (str) – Name of batch job submitted via the batch processing system.
batchjob_folder (str, optional) – Directory which contains all submission-relevant files, e.g. bash scripts, logs and output files. Defaults to "{}/{}_folder{}/".format(global_params.config.qsub_work_folder, name, suffix).
n_cores (int, optional) – Number of cores used for each job. Defaults to 1.
additional_flags (str, optional) – Used to set additional parameters for each job. To allocate one GPU for each worker, use additional_flags='--gres=gpu:1'. Defaults to ‘’.
suffix (str, optional) – Suffix added to batchjob_folder. Defaults to “”.
job_name (str, optional) – Name of the jobs submitted via the batch processing system. Defaults to 'default', in which case a random string of 8 letters is used.
script_folder (str, optional) – Directory in which to look for the executed script, QSUB_{name}.py. Defaults to None.
max_iterations (int, optional) – Maximum number of retries of failed jobs. Defaults to 10.
python_path (str, optional) – Path to python binary. Defaults to None.
disable_batchjob (bool, optional) – Use single node multiprocessing. Defaults to False.
use_dill (bool, optional) – Use dill to enable pickling of lambda expressions. Defaults to False.
remove_jobfolder (bool, optional) – Remove batchjob_folder after successful termination. Defaults to False.
log (Logger, optional) – Logger. Defaults to None.
sleep_time (int, optional) – Sleep duration before checking batch job states again. Defaults to None.
show_progress (bool, optional) – Only used if disable_batchjob=True. Defaults to True.
overwrite (bool, optional) – Defaults to False.
exclude_nodes (list, optional) – Nodes to exclude during job submission. Defaults to None.
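This sketch shows how array submission and the coupling of memory to n_cores could look. It only assembles an sbatch command line.

- Hypothetical names: the helper build_sbatch_array_cmd, its mem_per_core_mb parameter and the wrapper script batchjob_array.sh. SyConn derives memory from its own configuration.
- Real options: --array, --cpus-per-task, --mem, --job-name and --exclude are standard sbatch options.

```python
import random
import shlex
import string


def build_sbatch_array_cmd(n_jobs, script, n_cores=1, mem_per_core_mb=2000,
                           job_name="default", additional_flags="", exclude_nodes=None):
    """Hypothetical helper: one sbatch call submitting an array of n_jobs tasks."""
    if job_name == "default":
        # Mirror the documented behaviour: fall back to 8 random letters.
        job_name = "".join(random.choices(string.ascii_letters, k=8))
    cmd = [
        "sbatch",
        f"--array=0-{n_jobs - 1}",              # one array task per parameter set
        f"--cpus-per-task={n_cores}",
        f"--mem={n_cores * mem_per_core_mb}M",  # memory scales with n_cores
        f"--job-name={job_name}",
    ]
    if exclude_nodes:
        cmd.append("--exclude=" + ",".join(exclude_nodes))
    cmd += shlex.split(additional_flags)  # e.g. "--gres=gpu:1"
    cmd.append(script)  # options must precede the batch script
    return cmd


print(shlex.join(build_sbatch_array_cmd(
    100, "batchjob_array.sh", n_cores=4, job_name="syn_ssv",
    additional_flags="--gres=gpu:1", exclude_nodes=["node07"])))
# sbatch --array=0-99 --cpus-per-task=4 --mem=8000M --job-name=syn_ssv --exclude=node07 --gres=gpu:1 batchjob_array.sh
```

One array submission replaces n_jobs separate sbatch calls, which is where the faster submission comes from.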
- syconn.mp.batchjob_utils.delete_jobs_by_name(job_name)
Deletes a group of jobs that have the same name from the batch processing system. The function checks the type of batch processing system (either QSUB or SLURM) and executes the appropriate command to delete the jobs. If the batch processing system is not recognized, a NotImplementedError is raised.
- Parameters
job_name (str) – The name of the jobs to be deleted, as shown by qstat or squeue.
- Raises
NotImplementedError – If the batch processing system is not recognized.
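The dispatch described above can be sketched as follows.

- The SLURM branch uses scancel --name, a standard option.
- The QSUB branch assumes a PBS/Torque-style qselect -N ... | xargs qdel pipeline. This may differ from what SyConn actually runs on a given scheduler.
- The function names and the "SLURM"/"QSUB" strings are illustrative.

```python
import shlex
import subprocess


def delete_jobs_cmd(job_name, batch_proc_system):
    """Hypothetical: shell command deleting all jobs called `job_name`."""
    name = shlex.quote(job_name)
    if batch_proc_system == "SLURM":
        return f"scancel --name={name}"
    if batch_proc_system == "QSUB":
        # Assumption: PBS/Torque-style selection of job IDs by name.
        return f"qselect -N {name} | xargs qdel"
    raise NotImplementedError(f"Unsupported batch processing system: {batch_proc_system}")


def delete_jobs_by_name_sketch(job_name, batch_proc_system):
    # Not called here: this would actually cancel jobs on a cluster.
    subprocess.run(delete_jobs_cmd(job_name, batch_proc_system), shell=True, check=False)


print(delete_jobs_cmd("syn_ssv", "SLURM"))  # scancel --name=syn_ssv
```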
- syconn.mp.batchjob_utils.fallback_exec(cmd_exec)
Executes a command using subprocess.Popen and returns any warnings or errors. This function was previously a helper function for executing commands.
- Parameters
cmd_exec (str) – The command to be executed.
- Returns
Warnings or errors emitted during the execution of the command.
- Return type
str
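A minimal sketch of this pattern with subprocess.Popen. It assumes a POSIX shell runs the command and that "warnings or errors" means whatever the command wrote to stderr. The name fallback_exec_sketch is hypothetical.

```python
import subprocess


def fallback_exec_sketch(cmd_exec: str) -> str:
    """Run `cmd_exec` in a shell and return its stderr output (hypothetical)."""
    proc = subprocess.Popen(cmd_exec, shell=True,
                            stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    _, err = proc.communicate()  # waits for completion and avoids pipe deadlocks
    return err.decode(errors="replace")


print(repr(fallback_exec_sketch("echo oops 1>&2")))  # 'oops\n'
```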
- syconn.mp.batchjob_utils.jobstates_slurm(job_name, start_time, max_retry=10)
Generates a dictionary which stores the state of every job belonging to job_name.
- Parameters
job_name (str) – Name of the job.
start_time (str) – Start time of the job. The following formats are allowed: MMDD[YY] or MM/DD[/YY] or MM.DD[.YY], e.g. datetime.datetime.today().strftime("%m.%d").
max_retry (int) – Number of retries of the sacct SLURM query if it fails. Defaults to 10. Sleeps for 5 s between retries.
- Returns
Dictionary with the job states (key: job ID, value: state).
- Return type
dict
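One way to build such a dictionary is to query sacct and parse its pipe-separated output. The sketch below uses the flags --name, --starttime, --format=JobID,State, --noheader and --parsable2, which are all standard sacct options. The function names are hypothetical. Job steps such as 123.batch are skipped so that only one state per job ID remains.

```python
import datetime
import subprocess
import time


def parse_sacct(output: str) -> dict:
    """Map job ID -> state from `sacct --parsable2 --noheader` output."""
    states = {}
    for line in output.splitlines():
        if not line.strip():
            continue
        job_id, state = line.split("|")[:2]
        if "." in job_id:  # skip job steps such as 123.batch or 123.0
            continue
        states[job_id] = state
    return states


def jobstates_sketch(job_name, start_time=None, max_retry=10):
    start_time = start_time or datetime.datetime.today().strftime("%m.%d")
    cmd = ["sacct", f"--name={job_name}", f"--starttime={start_time}",
           "--format=JobID,State", "--noheader", "--parsable2"]
    last_err = ""
    for _ in range(max_retry):
        proc = subprocess.run(cmd, capture_output=True, text=True)
        if proc.returncode == 0:
            return parse_sacct(proc.stdout)
        last_err = proc.stderr
        time.sleep(5)  # documented 5 s pause between retries
    raise RuntimeError(f"sacct failed {max_retry} times: {last_err}")


print(parse_sacct("4711_0|COMPLETED\n4711_0.batch|COMPLETED\n4711_1|RUNNING\n"))
# {'4711_0': 'COMPLETED', '4711_1': 'RUNNING'}
```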
- syconn.mp.batchjob_utils.nodestates_slurm()
Generates a dictionary which stores the state of every node.
- Returns
Dictionary with the node states (key: node ID, value: state dict).
- Return type
dict
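A sketch of how such a mapping could be collected with sinfo. It assumes one line per node via --Node --noheader --format="%N %T", which gives the node name and the extended state. The nested {"state": ...} layout is a guess at what "state dict" contains, and the real function may store more fields. The function names are hypothetical.

```python
import subprocess


def parse_sinfo_nodes(output: str) -> dict:
    """Map node name -> {'state': ...} from `sinfo -N -h -o '%N %T'` output."""
    nodes = {}
    for line in output.splitlines():
        parts = line.split()
        if len(parts) >= 2:
            # A node listed in several partitions appears several times; keep one entry.
            nodes[parts[0]] = {"state": parts[1]}
    return nodes


def nodestates_sketch() -> dict:
    out = subprocess.run(["sinfo", "--Node", "--noheader", "--format=%N %T"],
                         capture_output=True, text=True, check=True).stdout
    return parse_sinfo_nodes(out)


print(parse_sinfo_nodes("node01 idle\nnode02 drained\nnode02 drained\n"))
# {'node01': {'state': 'idle'}, 'node02': {'state': 'drained'}}
```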
- syconn.mp.batchjob_utils.number_of_running_processes(job_name)
Counts the running jobs with the given name using qstat (QSUB) or squeue (SLURM).
- Parameters
job_name (str) – Name of the job, as shown by qstat or squeue.
- Returns
Number of running jobs.
- Return type
int
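For SLURM, the count can be obtained from squeue restricted to the running jobs of one name. The sketch below assumes that approach and leaves out the qstat branch. It uses --name, --states=RUNNING, --array (one array element per line), --noheader and --format=%i, which are all standard squeue options. The function names are hypothetical.

```python
import subprocess


def count_job_lines(output: str) -> int:
    """Count non-empty lines, i.e. one per job ID printed by squeue."""
    return sum(1 for line in output.splitlines() if line.strip())


def running_jobs_slurm(job_name: str) -> int:
    out = subprocess.run(
        ["squeue", f"--name={job_name}", "--states=RUNNING", "--array",
         "--noheader", "--format=%i"],
        capture_output=True, text=True, check=True,
    ).stdout
    return count_job_lines(out)


print(count_job_lines("4711_0\n4711_1\n\n"))  # 2
```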
- syconn.mp.batchjob_utils.restart_nodes_daemon()
Restarts the nodes of a Google Compute Engine (GCE) cluster. The function continuously checks the state of the nodes and restarts any that are down or drained. It currently only supports GCE and is a work in progress.
- Raises
ValueError – If the gcloud compute instances stop command cannot be run.
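The core decision of such a daemon, which nodes to restart and how, can be sketched independently of the polling loop. The sketch rests on these assumptions:

- Node states come from a sinfo-style query.
- down and drained nodes, including the non-responding variant such as down*, need a restart.
- An instance is restarted with gcloud compute instances stop followed by start.

The function name and zone value are hypothetical.

```python
import shlex


def gce_restart_commands(node_states: dict, zone: str) -> list:
    """Hypothetical: gcloud commands that stop and start every down or drained node."""
    cmds = []
    for node, info in sorted(node_states.items()):
        # SLURM appends '*' to the state of nodes that do not respond (e.g. "down*").
        state = info["state"].lower().rstrip("*")
        if state in ("down", "drained"):
            for action in ("stop", "start"):
                cmds.append(f"gcloud compute instances {action} "
                            f"{shlex.quote(node)} --zone={shlex.quote(zone)}")
    return cmds


print(gce_restart_commands(
    {"node01": {"state": "idle"}, "node02": {"state": "down*"}}, zone="europe-west1-b"))
```

A daemon would call this in a loop with fresh node states and a pause between iterations.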
syconn.mp.mp_utils module
- syconn.mp.mp_utils.multi_helper_obj(args)
Helper method for multiprocessed jobs. Calls the given object method.
- Parameters
args (Iterable) – Contains object, method name, and optional kwargs.
- Returns
The result of the method execution.
- Return type
Any
- syconn.mp.mp_utils.parallel_process(array, function, n_jobs, use_kwargs=False, front_num=0, show_progress=True, use_dill=False)
Executes a function in parallel over an array with a progress bar.
- Parameters
array (Union[list, np.ndarray]) – An array to iterate over.
function (Callable) – A python function to apply to the elements of array.
n_jobs (int) – The number of cores to use.
use_kwargs (bool, optional) – If True, elements of array are considered as dictionaries of keyword arguments to function. Defaults to False.
front_num (int, optional) – The number of iterations to run serially before kicking off the parallel job. Useful for catching bugs. Defaults to 0.
show_progress (bool, optional) – If True, shows progress bar. Defaults to True.
use_dill (bool, optional) – If True, uses dill to serialize data. Defaults to False.
- Returns
Returns a list of results from applying the function to the array.
- Return type
list
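A sketch of the documented behaviour using concurrent.futures.ProcessPoolExecutor:

- The first front_num elements run serially, so bugs surface with a readable traceback.
- The remaining elements are submitted to the pool.
- Results keep the input order.

The dill and progress-bar options are left out, and the name parallel_process_sketch is hypothetical. With process pools, function must be picklable (defined at module level). On platforms that spawn workers, the call belongs under if __name__ == "__main__":.

```python
from concurrent.futures import ProcessPoolExecutor


def _call(function, item, use_kwargs):
    return function(**item) if use_kwargs else function(item)


def parallel_process_sketch(array, function, n_jobs, use_kwargs=False, front_num=0):
    """Apply `function` to every element of `array`, keeping the input order."""
    # Serial warm-up: errors in the first elements are raised immediately.
    front = [_call(function, a, use_kwargs) for a in array[:front_num]]
    rest = array[front_num:]
    if n_jobs == 1 or len(rest) == 0:
        return front + [_call(function, a, use_kwargs) for a in rest]
    with ProcessPoolExecutor(max_workers=n_jobs) as pool:
        futures = [pool.submit(function, **a) if use_kwargs else pool.submit(function, a)
                   for a in rest]
        # result() re-raises any exception raised in a worker process.
        return front + [f.result() for f in futures]


if __name__ == "__main__":
    print(parallel_process_sketch([1, -2, 3, -4], abs, n_jobs=2, front_num=1))  # [1, 2, 3, 4]
```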
- syconn.mp.mp_utils.start_multiprocess(func, params, debug=False, verbose=False, nb_cpus=None)
Executes a function in parallel with multiple parameters.
- Parameters
func (Callable) – The function to execute.
params (list) – The parameters for the function.
debug (bool, optional) – If True, uses only one CPU. Defaults to False.
verbose (bool, optional) – If True, prints debug information. Defaults to False.
nb_cpus (int, optional) – The number of CPUs to use. Defaults to None.
- Returns
A list of results from the function execution.
- Return type
list
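A sketch of the documented parameters on top of multiprocessing.Pool.map. Setting debug=True bypasses the pool. The sketch assumes nb_cpus=None means "all available CPUs" (os.cpu_count()), which the source does not state. The name start_multiprocess_sketch is hypothetical.

```python
import os
from multiprocessing import Pool


def start_multiprocess_sketch(func, params, debug=False, verbose=False, nb_cpus=None):
    """Apply `func` to each element of `params`, in parallel unless debug=True."""
    if nb_cpus is None:
        nb_cpus = os.cpu_count() or 1  # assumption: None means all CPUs
    if debug:
        nb_cpus = 1
    if verbose:
        print(f"Computing {len(params)} parameter sets with {nb_cpus} CPU(s).")
    if nb_cpus == 1:
        return list(map(func, params))  # serial path, easy to debug
    with Pool(processes=min(nb_cpus, len(params) or 1)) as pool:
        return pool.map(func, params)  # blocks until all results are ready


if __name__ == "__main__":
    print(start_multiprocess_sketch(abs, [-1, 2, -3], nb_cpus=2))  # [1, 2, 3]
```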
- syconn.mp.mp_utils.start_multiprocess_imap(func, params, debug=False, verbose=False, nb_cpus=None, show_progress=True, ignore_cpu_cnt=False, desc=None, use_dill=False)
Executes a function in parallel with multiple parameters using imap.
- Parameters
func (Callable) – The function to execute.
params (list) – The parameters for the function.
debug (bool, optional) – If True, uses only one CPU. Defaults to False.
verbose (bool, optional) – If True, prints debug information. Defaults to False.
nb_cpus (int, optional) – The number of CPUs to use. Defaults to None.
show_progress (bool, optional) – If True, shows a progress bar. Defaults to True.
ignore_cpu_cnt (bool, optional) – If True, ignores the CPU count limit. Defaults to False.
desc (str, optional) – Task description. Used for progress bar. Defaults to None.
use_dill (bool, optional) – If True, uses dill to serialize data. Defaults to False.
- Returns
A list of results from the function execution.
- Return type
list
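Unlike map, Pool.imap yields results one at a time and in input order, which is what makes a progress display possible. The sketch below illustrates that idea. It uses a plain counter on stderr in place of the progress bar the real function presumably shows. ignore_cpu_cnt and use_dill are left out, and the function names are hypothetical.

```python
import os
import sys
from multiprocessing import Pool


def _consume(iterator, total, desc, show_progress):
    """Collect results while printing a simple done/total counter."""
    results = []
    for done, res in enumerate(iterator, start=1):
        results.append(res)
        if show_progress:
            print(f"\r{desc or 'progress'}: {done}/{total}", end="", file=sys.stderr)
    if show_progress:
        print(file=sys.stderr)
    return results


def start_multiprocess_imap_sketch(func, params, debug=False, nb_cpus=None,
                                   show_progress=True, desc=None):
    if debug or nb_cpus == 1:
        return _consume(map(func, params), len(params), desc, show_progress)
    with Pool(processes=nb_cpus or os.cpu_count()) as pool:
        # imap hands results back lazily and in input order.
        return _consume(pool.imap(func, params), len(params), desc, show_progress)


if __name__ == "__main__":
    print(start_multiprocess_imap_sketch(abs, [-1, 2, -3], nb_cpus=2, desc="abs"))  # [1, 2, 3]
```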
- syconn.mp.mp_utils.start_multiprocess_obj(func_name, params, debug=False, verbose=False, nb_cpus=None)
Executes a function of an object in parallel with multiple parameters.
- Parameters
func_name (str) – The name of the function to execute.
params (list) – The parameters for the function. Each element in params must be an object with attribute func_name (+ optional kwargs).
debug (bool, optional) – If True, uses only one CPU. Defaults to False.
verbose (bool, optional) – If True, prints debug information. Defaults to False.
nb_cpus (int, optional) – The number of CPUs to use. Defaults to None.
- Returns
A list of results from the function execution. Each element in the list is a function return.
- Return type
list