syconn.mp package

syconn.mp.batchjob_utils module

syconn.mp.batchjob_utils.batchjob_enabled()[source]

Checks whether the active batch processing system (either SLURM or QSUB) is operational.

Returns

True if either SLURM or QSUB is active, otherwise False.

Return type

bool
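As a rough illustration of this kind of availability check, the sketch below tests whether a scheduler's submission command can be found on the PATH. It is not SyConn's implementation: the helper name scheduler_available and the mapping of system names to commands are assumptions made for this example.

```python
import shutil

# Hypothetical mapping from batch system name to its submission command.
SUBMIT_COMMANDS = {"SLURM": "sbatch", "QSUB": "qsub"}


def scheduler_available(system):
    """Return True if the submission command for `system` is on the PATH."""
    cmd = SUBMIT_COMMANDS.get(system)
    if cmd is None:
        # No batch system configured (or an unknown one): nothing to submit to.
        return False
    return shutil.which(cmd) is not None


if __name__ == "__main__":
    for system in ("SLURM", "QSUB", None):
        print(system, scheduler_available(system))
```

A caller could use such a check to decide between submitting jobs to the scheduler and running them locally.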

syconn.mp.batchjob_utils.batchjob_fallback(params, name, n_cores=1, suffix='', script_folder=None, python_path=None, remove_jobfolder=False, show_progress=True, log=None, overwrite=False, job_folder=None)[source]

Executes a batch job in a fallback mode if no batchjob submission system is available.

Parameters
  • params (list) – List of parameters for the batch job.

  • name (str) – Name of the batch job.

  • n_cores (int) – Number of cores to be used for the job. Defaults to 1.

  • suffix (str) – Suffix to be added to the job folder. Defaults to “”.

  • script_folder (str) – Directory where the script to be executed is located. Defaults to None.

  • python_path (str) – Path to the python binary. Defaults to None.

  • remove_jobfolder (bool) – Whether to remove the job folder after execution. Defaults to False.

  • show_progress (bool) – Whether to show progress during execution. Defaults to True.

  • log (Logger) – Logger to log the status of the job. Defaults to None.

  • overwrite (bool) – Whether to overwrite existing files. Defaults to False.

  • job_folder (str) – Directory where the job files are located. Defaults to None.

Returns

Path to the output of the job.

Return type

str

syconn.mp.batchjob_utils.batchjob_script(params, name, batchjob_folder=None, n_cores=1, additional_flags='', suffix='', job_name='default', script_folder=None, max_iterations=10, python_path=None, disable_batchjob=False, use_dill=False, remove_jobfolder=False, log=None, sleep_time=None, show_progress=True, overwrite=False, exclude_nodes=None)[source]

Submits batch jobs to process a list of parameters params with a python script on the specified environment (either None, SLURM or QSUB; read global_params.config['batch_proc_system'] to get the active system).

The memory available for each job is coupled to the number of cores per job (n_cores). This function also supports faster submission by adding sbatch array support and making script specification more generic.

Parameters
  • params (list) – List of all parameter sets to be processed.

  • name (str) – Name of batch job submitted via the batch processing system.

  • batchjob_folder (str, optional) – Directory which contains all submission relevant files, e.g. bash scripts, logs, output files. Defaults to "{}/{}_folder{}/".format(global_params.config.qsub_work_folder, name, suffix).

  • n_cores (int, optional) – Number of cores used for each job. Defaults to 1.

  • additional_flags (str, optional) – Used to set additional parameters for each job. To allocate one GPU for each worker use: additional_flags='--gres=gpu:1'. Defaults to ''.

  • suffix (str, optional) – Suffix added to batchjob_folder. Defaults to “”.

  • job_name (str, optional) – Name of the jobs submitted via the batch processing system. If left at the default value 'default', a random string of 8 letters is used.

  • script_folder (str, optional) – Directory where to look for the script which is executed. Looks for QSUB_{name}.py. Defaults to None.

  • max_iterations (int, optional) – Maximum number of retries of failed jobs. Defaults to 10.

  • python_path (str, optional) – Path to python binary. Defaults to None.

  • disable_batchjob (bool, optional) – Use single node multiprocessing. Defaults to False.

  • use_dill (bool, optional) – Use dill to enable pickling of lambda expressions. Defaults to False.

  • remove_jobfolder (bool, optional) – Remove batchjob_folder after successful termination. Defaults to False.

  • log (Logger, optional) – Logger. Defaults to None.

  • sleep_time (int, optional) – Sleep duration before checking batch job states again. Defaults to None.

  • show_progress (bool, optional) – Only used if disable_batchjob=True. Defaults to True.

  • overwrite (bool, optional) – Defaults to False.

  • exclude_nodes (list, optional) – Nodes to exclude during job submission. Defaults to None.
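The two naming conventions described above (the default batchjob_folder and the QSUB_{name}.py script lookup) can be sketched as follows. The helper names and the example paths are hypothetical; only the format strings are taken from the parameter descriptions.

```python
import os


def default_batchjob_folder(work_folder, name, suffix=""):
    """Build the documented default value of batchjob_folder."""
    return "{}/{}_folder{}/".format(work_folder, name, suffix)


def script_path(script_folder, name):
    """Path of the worker script that is looked up for a job called `name`."""
    return os.path.join(script_folder, "QSUB_{}.py".format(name))


if __name__ == "__main__":
    # "/tmp/work" and "/opt/scripts" are placeholder directories.
    print(default_batchjob_folder("/tmp/work", "mesh", suffix="_v2"))
    print(script_path("/opt/scripts", "mesh"))
```

Under the signature above, a GPU job would then be submitted with a call such as batchjob_script(params, "mesh", n_cores=2, additional_flags="--gres=gpu:1"), where "mesh" is again a placeholder name.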

syconn.mp.batchjob_utils.delete_jobs_by_name(job_name)[source]

Deletes a group of jobs that have the same name from the batch processing system. The function checks the type of batch processing system (either QSUB or SLURM) and executes the appropriate command to delete the jobs. If the batch processing system is not recognized, a NotImplementedError is raised.

Parameters

job_name (str) – The name of the jobs to be deleted as shown in qstat.

Raises

NotImplementedError – If the batch processing system is not recognized.
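The dispatch described above can be sketched as a function that only builds the delete command without running it. The helper name build_delete_command is hypothetical, and the exact command lines (scancel --name for SLURM, qdel with a job name for QSUB) are assumptions about what is executed, not taken from the SyConn source.

```python
def build_delete_command(batch_system, job_name):
    """Return the argument list that would delete all jobs called `job_name`."""
    if batch_system == "SLURM":
        # scancel can select jobs by name.
        return ["scancel", "--name={}".format(job_name)]
    if batch_system == "QSUB":
        # Assumes a qdel that accepts a job name in place of a job ID.
        return ["qdel", job_name]
    raise NotImplementedError(
        "Unknown batch processing system: {!r}".format(batch_system)
    )


if __name__ == "__main__":
    print(build_delete_command("SLURM", "example_job"))
```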

syconn.mp.batchjob_utils.fallback_exec(cmd_exec)[source]

Executes a command using subprocess.Popen and returns any warnings or errors. It was previously a helper function for executing commands.

Parameters

cmd_exec (str) – The command to be executed.

Returns

Warnings or errors emitted during the execution of the command.

Return type

str
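A minimal sketch of the pattern, assuming the command is run without a shell and that only the standard error stream is returned. The helper name run_and_collect_stderr is hypothetical and the actual function may differ in both respects.

```python
import shlex
import subprocess
import sys


def run_and_collect_stderr(cmd_exec):
    """Run `cmd_exec` and return whatever it wrote to stderr as a string."""
    proc = subprocess.Popen(
        shlex.split(cmd_exec),
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
    )
    # communicate() waits for the process and drains both pipes.
    _, err = proc.communicate()
    return err.decode()


if __name__ == "__main__":
    # Use the current interpreter as a portable command that writes to stderr.
    cmd = shlex.join([sys.executable, "-c", "import sys; sys.stderr.write('oops')"])
    print(run_and_collect_stderr(cmd))
```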

syconn.mp.batchjob_utils.jobstates_slurm(job_name, start_time, max_retry=10)[source]

Generates a dictionary which stores the state of every job belonging to job_name.

Parameters
  • job_name (str) – Name of the job.

  • start_time (str) – Start time of the job. The following formats are allowed: MMDD[YY] or MM/DD[/YY] or MM.DD[.YY], e.g. datetime.datetime.today().strftime("%m.%d").

  • max_retry (int) – Number of retries for sacct SLURM query if failing. Defaults to 10. Sleeps for 5s in-between retries.

Returns

Dictionary with the job states. (key: job ID, value: state)

Return type

dict
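The sketch below shows how a start_time string in the MM.DD format can be produced and how job states could be collected into a dictionary. It assumes pipe-separated text as printed by sacct --noheader --parsable2 --format=JobID,State; the helper names and the sample text are made up for illustration and the real function may query and parse sacct differently.

```python
import datetime


def format_start_time(day):
    """Format a date as MM.DD, one of the accepted start_time formats."""
    return day.strftime("%m.%d")


def parse_job_states(sacct_text):
    """Map job ID to state from 'JobID|State' lines."""
    states = {}
    for line in sacct_text.splitlines():
        line = line.strip()
        if not line:
            continue
        job_id, state = line.split("|", 1)
        if "." in job_id:
            # Skip job steps such as "101.batch"; keep only the job itself.
            continue
        states[job_id] = state
    return states


if __name__ == "__main__":
    sample = "101|COMPLETED\n101.batch|COMPLETED\n102|FAILED\n"  # invented sample
    print(format_start_time(datetime.date.today()))
    print(parse_job_states(sample))
```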

syconn.mp.batchjob_utils.nodestates_slurm()[source]

Generates a dictionary which stores the state of every node.

Parameters

None

Returns

Dictionary with the node states. (key: node ID, value: state dict)

Return type

dict

syconn.mp.batchjob_utils.number_of_running_processes(job_name)[source]

Calculates the number of running jobs using qstat/squeue.

Parameters

job_name (str) – Name of the job as shown in qstat.

Returns

Number of running jobs.

Return type

int

syconn.mp.batchjob_utils.restart_nodes_daemon()[source]

Restarts the nodes of a Google Compute Engine (GCE) cluster. The function continuously checks the state of the nodes and restarts any that are down or drained. It currently supports only GCE and is a work in progress.

Raises

ValueError – If the gcloud compute instances stop command cannot be run.

syconn.mp.mp_utils module

syconn.mp.mp_utils.multi_helper_obj(args)[source]

Helper method for multiprocessed jobs. Calls the given object method.

Parameters

args (Iterable) – Contains object, method name, and optional kwargs.

Returns

The result of the method execution.

Return type

Any
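A minimal sketch of such a helper, assuming args is ordered as (object, method name, optional kwargs dict). The ordering, the helper name call_method and the Counter class are assumptions for this example.

```python
def call_method(args):
    """Look up a method by name on an object and call it with optional kwargs."""
    obj, method_name = args[0], args[1]
    kwargs = args[2] if len(args) > 2 else {}
    return getattr(obj, method_name)(**kwargs)


class Counter:
    """Toy class used only to demonstrate the helper."""

    def __init__(self, start):
        self.start = start

    def add(self, amount=1):
        return self.start + amount


if __name__ == "__main__":
    print(call_method((Counter(3), "add")))
    print(call_method((Counter(3), "add", {"amount": 5})))
```

Packing everything into a single argument keeps the helper compatible with map-style pool interfaces, which pass exactly one item per call.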

syconn.mp.mp_utils.parallel_process(array, function, n_jobs, use_kwargs=False, front_num=0, show_progress=True, use_dill=False)[source]

Executes a function in parallel over an array with a progress bar.

Parameters
  • array (Union[list, np.ndarray]) – An array to iterate over.

  • function (Callable) – A python function to apply to the elements of array.

  • n_jobs (int) – The number of cores to use.

  • use_kwargs (bool, optional) – If True, elements of array are considered as dictionaries of keyword arguments to function. Defaults to False.

  • front_num (int, optional) – The number of iterations to run serially before kicking off the parallel job. Useful for catching bugs. Defaults to 0.

  • show_progress (bool, optional) – If True, shows progress bar. Defaults to True.

  • use_dill (bool, optional) – If True, uses dill to serialize data. Defaults to False.

Returns

Returns a list of results from applying the function to the array.

Return type

list
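The front_num and use_kwargs semantics described above can be sketched with the standard library. This version uses a thread pool and omits the progress bar and dill support, so it illustrates the control flow only and is not the library's implementation; the names parallel_sketch and apply_one are hypothetical.

```python
from concurrent.futures import ThreadPoolExecutor


def apply_one(function, item, use_kwargs):
    """Call `function` on one element, unpacking it as kwargs if requested."""
    return function(**item) if use_kwargs else function(item)


def parallel_sketch(array, function, n_jobs, use_kwargs=False, front_num=0):
    # Run the first `front_num` elements serially so that errors surface
    # immediately instead of inside a worker.
    front = [apply_one(function, item, use_kwargs) for item in array[:front_num]]
    rest = array[front_num:]
    if n_jobs == 1:
        return front + [apply_one(function, item, use_kwargs) for item in rest]
    with ThreadPoolExecutor(max_workers=n_jobs) as pool:
        futures = [pool.submit(apply_one, function, item, use_kwargs) for item in rest]
        # Collect in submission order so results line up with the input.
        return front + [future.result() for future in futures]


if __name__ == "__main__":
    print(parallel_sketch([1, 2, 3, 4], lambda x: x * x, n_jobs=2, front_num=1))
```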

syconn.mp.mp_utils.start_multiprocess(func, params, debug=False, verbose=False, nb_cpus=None)[source]

Executes a function in parallel with multiple parameters.

Parameters
  • func (Callable) – The function to execute.

  • params (list) – The parameters for the function.

  • debug (bool, optional) – If True, uses only one CPU. Defaults to False.

  • verbose (bool, optional) – If True, prints debug information. Defaults to False.

  • nb_cpus (int, optional) – The number of CPUs to use. Defaults to None.

Returns

A list of results from the function execution.

Return type

list

syconn.mp.mp_utils.start_multiprocess_imap(func, params, debug=False, verbose=False, nb_cpus=None, show_progress=True, ignore_cpu_cnt=False, desc=None, use_dill=False)[source]

Executes a function in parallel with multiple parameters using imap.

Parameters
  • func (Callable) – The function to execute.

  • params (list) – The parameters for the function.

  • debug (bool, optional) – If True, uses only one CPU. Defaults to False.

  • verbose (bool, optional) – If True, prints debug information. Defaults to False.

  • nb_cpus (int, optional) – The number of CPUs to use. Defaults to None.

  • show_progress (bool, optional) – If True, shows a progress bar. Defaults to True.

  • ignore_cpu_cnt (bool, optional) – If True, ignores CPU count limit. Defaults to False.

  • desc (str, optional) – Task description. Used for progress bar. Defaults to None.

  • use_dill (bool, optional) – If True, uses dill to serialize data. Defaults to False.

Returns

A list of results from the function execution.

Return type

list
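A sketch of the debug and nb_cpus behaviour using imap. It substitutes multiprocessing.pool.ThreadPool for a process pool so that it runs anywhere, and leaves out the progress bar, ignore_cpu_cnt and dill handling; the name imap_sketch and the choice of os.cpu_count() as the default are assumptions.

```python
import os
from multiprocessing.pool import ThreadPool


def imap_sketch(func, params, debug=False, nb_cpus=None):
    """Apply `func` to every element of `params`, preserving input order."""
    if nb_cpus is None:
        nb_cpus = os.cpu_count() or 1
    if debug:
        # Debug mode: a single worker keeps tracebacks easy to read.
        nb_cpus = 1
    if nb_cpus == 1:
        return [func(p) for p in params]
    with ThreadPool(nb_cpus) as pool:
        # imap yields results lazily and in input order.
        return list(pool.imap(func, params))


if __name__ == "__main__":
    print(imap_sketch(abs, [-1, 2, -3], nb_cpus=2))
```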

syconn.mp.mp_utils.start_multiprocess_obj(func_name, params, debug=False, verbose=False, nb_cpus=None)[source]

Executes a function of an object in parallel with multiple parameters.

Parameters
  • func_name (str) – The name of the function to execute.

  • params (list) – The parameters for the function. Each element in params must be an object with attribute func_name (+ optional kwargs).

  • debug (bool, optional) – If True, uses only one CPU. Defaults to False.

  • verbose (bool, optional) – If True, prints debug information. Defaults to False.

  • nb_cpus (int, optional) – The number of CPUs to use. Defaults to None.

Returns

A list of results from the function execution. Each element in the list is a function return.

Return type

list