Source code for syconn.mp.mp_utils

# -*- coding: utf-8 -*-
# SyConn - Synaptic connectivity inference toolkit
#
# Copyright (c) 2016 - now
# Max-Planck-Institute of Neurobiology, Munich, Germany
# Authors: Philipp Schubert, Sven Dorkenwald, Jörgen Kornfeld
import multiprocessing.pool
import time
import dill
from concurrent.futures import ProcessPoolExecutor, as_completed
from multiprocessing import cpu_count
from typing import Callable, List, Union

import numpy as np
import tqdm

from . import log_mp

MyPool = multiprocessing.Pool


def parallel_process(array: Union[list, np.ndarray], function: Callable,
                     n_jobs: int, use_kwargs: bool = False,
                     front_num: int = 0, show_progress: bool = True,
                     use_dill: bool = False) -> list:
    """From http://danshiebler.com/2016-09-14-parallel-progress-bar/

    A parallel version of the map function with a progress bar.

    Args:
        array: An array to iterate over.
        function: A python function to apply to the elements of `array`.
        n_jobs: The number of cores to use.
        use_kwargs: Whether to consider the elements of `array` as
            dictionaries of keyword arguments to `function`.
        front_num: The number of iterations to run serially before kicking
            off the parallel job. Useful for catching bugs.
        show_progress: Show progress bar.
        use_dill: Serialize `function` and its arguments with dill; allows
            processing of e.g. lambda expressions.

    Returns:
        [function(array[0]), function(array[1]), ...]
    """
    # Run the first few iterations serially to catch bugs.
    if front_num > 0:
        front = [function(**a) if use_kwargs else function(a)
                 for a in array[:front_num]]
    else:
        front = []
    # Assemble the workers.
    pool = ProcessPoolExecutor(max_workers=n_jobs)
    try:
        # Pass the elements of array into function.
        if use_dill:
            futures = [pool.submit(_run_dill_encoded, dill.dumps((function, a)))
                       for a in array[front_num:]]
        elif use_kwargs:
            futures = [pool.submit(function, **a) for a in array[front_num:]]
        else:
            futures = [pool.submit(function, a) for a in array[front_num:]]
        kwargs = {
            'total': len(futures),
            'unit': 'job',
            'unit_scale': True,
            'leave': False,
            'ncols': 80,
            'dynamic_ncols': False,
            'miniters': 1,
            'mininterval': 1
        }
        # Print out the progress as tasks complete.
        for _ in tqdm.tqdm(as_completed(futures), disable=not show_progress,
                           **kwargs):
            pass
    finally:
        pool.shutdown()
    out = []
    # Get the results from the futures; iterating over `futures` (instead of
    # `as_completed`) preserves the submission order of the results.
    for future in futures:
        try:
            out.append(future.result())
        except Exception as e:
            log_mp.error("In function '{}': {}".format(str(function), e))
            raise
    return front + out

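# Usage sketch for `parallel_process` (illustrative, not part of the original
# module; `_demo_square` is a hypothetical worker). The worker must be defined
# at module level so the executor can pickle it.
def _demo_square(x):
    return x * x


def _demo_parallel_process():
    # Results are collected from the futures in submission order, so the
    # output matches a serial map over the input.
    return parallel_process(list(range(10)), _demo_square, n_jobs=2,
                            front_num=2, show_progress=False)
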
def _run_dill_encoded(payload):
    """Decode a dill-serialized (function, argument) pair and apply it."""
    fun, args = dill.loads(payload)
    return fun(args)

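# Round-trip sketch of the dill encoding used by `parallel_process`
# (illustrative only): dill can serialize objects the standard pickle module
# cannot, e.g. lambda expressions.
def _demo_run_dill_encoded():
    payload = dill.dumps((lambda x: x + 1, 41))
    return _run_dill_encoded(payload)  # -> 42
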
def start_multiprocess(func: Callable, params: list, debug: bool = False,
                       verbose: bool = False, nb_cpus: int = None):
    """
    Args:
        func: Function to apply to each element of `params`.
        params: Function parameters; one element per job.
        debug: If True, process all jobs sequentially in a single process.
        verbose: Log the number of jobs/cpus and the total compute time.
        nb_cpus: Number of cpus to use. Defaults to `cpu_count()` and is
            capped at the number of jobs.

    Returns:
        List of function returns.
    """
    if nb_cpus is None:
        nb_cpus = cpu_count()
    if debug:
        nb_cpus = 1
    nb_cpus = min(nb_cpus, len(params), cpu_count())
    if verbose:
        log_mp.debug("Computing %d parameters with %d cpus." %
                     (len(params), nb_cpus))
    start = time.time()
    if nb_cpus > 1:
        pool = MyPool(nb_cpus)
        try:
            result = pool.map(func, params)
        finally:
            pool.close()
            pool.join()
    else:
        result = list(map(func, params))
    if verbose:
        log_mp.debug("Time to compute: {:.1f} min".format(
            (time.time() - start) / 60.))
    return result

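# Usage sketch for `start_multiprocess` (illustrative; `_demo_double` is
# hypothetical). `pool.map` preserves input order; `debug=True` would force
# a single sequential worker.
def _demo_double(x):
    return 2 * x


def _demo_start_multiprocess():
    return start_multiprocess(_demo_double, list(range(8)), nb_cpus=2)
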
def start_multiprocess_imap(func: Callable, params, debug=False, verbose=False,
                            nb_cpus=None, show_progress=True,
                            ignore_cpu_cnt=False, desc: str = None,
                            use_dill: bool = False):
    """
    Args:
        func: Function to apply to each element of `params`.
        params: Function parameters; one element per job.
        debug: If True, process all jobs sequentially in a single process.
        verbose: Log the number of jobs/cpus and the total compute time.
        nb_cpus: Number of cpus to use. Defaults to `cpu_count()` and is
            capped at the number of jobs.
        show_progress: Show progress bar.
        ignore_cpu_cnt: Do not cap `nb_cpus` at the number of available cpus.
        desc: Task description. Used for the progress bar.
        use_dill: Serialize `func` and its arguments with dill; allows
            processing of e.g. lambda expressions.

    Returns:
        List of function returns.
    """
    if nb_cpus is None:
        nb_cpus = cpu_count()
    if ignore_cpu_cnt:
        cpu_cnt = 999999999
    else:
        cpu_cnt = cpu_count()
    if desc is None:
        if hasattr(func, '__name__'):
            desc = f'{func.__name__}'
        else:
            desc = str(func)
    nb_cpus = min(nb_cpus, len(params), cpu_cnt)
    if debug:
        nb_cpus = 1
    if verbose:
        log_mp.debug("Computing %d parameters with %d cpus." %
                     (len(params), nb_cpus))
    start = time.time()
    if nb_cpus > 1:
        result = parallel_process(params, func, nb_cpus,
                                  show_progress=show_progress,
                                  use_dill=use_dill)
    else:
        if show_progress:
            pbar = tqdm.tqdm(total=len(params), ncols=80, leave=False,
                             miniters=1, mininterval=1, unit='job',
                             unit_scale=True, dynamic_ncols=False, desc=desc)
            result = []
            for p in params:
                result.append(func(p))
                pbar.update(1)
            pbar.close()
        else:
            result = [func(p) for p in params]
    if verbose:
        log_mp.debug("Time to compute: {:.1f} min".format(
            (time.time() - start) / 60.))
    return result

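# Usage sketch for `start_multiprocess_imap` (illustrative). With
# `use_dill=True` even a lambda can be dispatched to worker processes, since
# dill serializes it before submission.
def _demo_start_multiprocess_imap():
    return start_multiprocess_imap(lambda x: x + 1, list(range(10)),
                                   nb_cpus=2, show_progress=False,
                                   use_dill=True)
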
def start_multiprocess_obj(func_name, params, debug=False, verbose=False,
                           nb_cpus=None):
    """
    Args:
        func_name (str): Name of the method to call on each object.
        params (list): List of lists; each element must start with the object
            on which `func_name` is looked up, optionally followed by a
            kwargs dict for the call.
        debug (bool): If True, process all jobs sequentially in a single
            process.
        verbose (bool): Log the number of jobs/cpus and the total compute
            time.
        nb_cpus (int): Number of cpus to use. Defaults to `cpu_count()` and
            is capped at the number of jobs.

    Returns:
        List of function returns.
    """
    if nb_cpus is None:
        nb_cpus = cpu_count()
    if debug:
        nb_cpus = 1
    nb_cpus = min(nb_cpus, len(params), cpu_count())
    if verbose:
        log_mp.debug("Computing %d parameters with %d cpus." %
                     (len(params), nb_cpus))
    for el in params:
        el.insert(0, func_name)
    start = time.time()
    if nb_cpus > 1:
        pool = MyPool(nb_cpus)
        try:
            result = pool.map(multi_helper_obj, params)
        finally:
            pool.close()
            pool.join()
    else:
        result = list(map(multi_helper_obj, params))
    if verbose:
        log_mp.debug("Time to compute: {:.1f} min".format(
            (time.time() - start) / 60.))
    return result

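# Usage sketch for `start_multiprocess_obj` (illustrative; `_DemoCounter` is
# hypothetical). Each params element holds the target object and an optional
# kwargs dict; the method name is prepended internally. The class is defined
# at module level so its instances can be pickled.
class _DemoCounter:
    def __init__(self, start):
        self.value = start

    def increment(self, by=1):
        return self.value + by


def _demo_start_multiprocess_obj():
    params = [[_DemoCounter(i), dict(by=10)] for i in range(4)]
    return start_multiprocess_obj('increment', params, nb_cpus=2)
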
def multi_helper_obj(args):
    """
    Generic helper method for multiprocessed jobs. Calls the given object
    method.

    Args:
        args (Iterable): Method name, object, optional kwargs dict.

    Returns:
        Return value of the method call, or the attribute itself if it is
        not callable.
    """
    attr_str = args[0]
    obj = args[1]
    if len(args) == 3:
        kwargs = args[2]
    else:
        kwargs = {}
    attr = getattr(obj, attr_str)
    # Check if attr is callable, i.e. a method to be called.
    if not callable(attr):
        return attr
    return attr(**kwargs)

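# Serial sanity checks for `multi_helper_obj` (illustrative): the method name
# is looked up on the object and called with the optional kwargs; a
# non-callable attribute is returned directly.
def _demo_multi_helper_obj():
    assert multi_helper_obj(('upper', 'abc')) == 'ABC'
    assert multi_helper_obj(('real', 3.5)) == 3.5  # non-callable attribute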