Source code for syconn.handler.basics

# -*- coding: utf-8 -*-
# SyConn - Synaptic connectivity inference toolkit
#
# Copyright (c) 2016 - now
# Max Planck Institute of Neurobiology, Martinsried, Germany
# Authors: Philipp Schubert, Joergen Kornfeld

import collections
import contextlib
import gc
import glob
import os
import pickle as pkl
import re
import shutil
import signal
import tempfile
import zipfile
from collections import defaultdict
from typing import List, Union

import networkx as nx
import numpy as np
import tqdm
from knossos_utils import KnossosDataset
from knossos_utils.skeleton import SkeletonAnnotation, SkeletonNode
from plyfile import PlyData

from . import log_handler
from .. import global_params


def kd_factory(kd_path: str, channel: str = 'jpg'):
    """
    Create a ``KnossosDataset`` from a config file or a dataset folder.

    The configuration is looked up in this order:

    1. `kd_path` is an existing file: it is passed to
       ``initialize_from_conf``.
    2. `kd_path` contains exactly one ``*.pyk.conf`` file: that file is
       passed to ``initialize_from_pyknossos_path``.
    3. ``<kd_path>/mag1/knossos.conf`` exists: it is passed to
       ``initialize_from_knossos_path``.

    Todo:
        * Setting the channel explicitly currently leads to
          uint32 <-> uint64 issues in the CS segmentation, which is why
          `channel` is ignored for now.

    Args:
        kd_path: Path to a config file or to the dataset folder.
        channel: Currently unused.

    Returns:
        The initialized KnossosDataset.

    Raises:
        ValueError: If none of the three lookups finds a configuration.
    """
    dataset = KnossosDataset()
    # TODO: set appropriate channel
    # dataset.set_channel(channel)

    if os.path.isfile(kd_path):
        dataset.initialize_from_conf(kd_path)
        return dataset

    pyk_confs = glob.glob(f'{kd_path}/*.pyk.conf')
    if len(pyk_confs) == 1:
        dataset.initialize_from_pyknossos_path(pyk_confs[0])
        return dataset

    # Fall back to the knossos.conf inside the "mag1" sub-folder.
    knossos_conf = kd_path + "/mag1/knossos.conf"
    if os.path.isfile(knossos_conf):
        dataset.initialize_from_knossos_path(knossos_conf)
        return dataset

    raise ValueError(f'Could not find KnossosDataset config at {kd_path}.')
def switch_array_entries(this_array, entries):
    """
    Swap two entries of a mutable sequence or array in place.

    Args:
        this_array: Mutable, indexable container (e.g. list or numpy
            array). It is modified in place.
        entries: Sequence whose first two items are the indices to swap.

    Returns:
        `this_array` (the same object) with the two entries exchanged.
    """
    first_ix, second_ix = entries[0], entries[1]
    first_value = this_array[first_ix]
    if isinstance(this_array, np.ndarray) and isinstance(first_value, np.ndarray):
        # Indexing a multi-dimensional array returns a view. Without a copy
        # the first assignment below would overwrite the value that still
        # has to be written to `second_ix`, duplicating one entry.
        first_value = first_value.copy()
    this_array[first_ix] = this_array[second_ix]
    this_array[second_ix] = first_value
    return this_array
def crop_bool_array(arr):
    """
    Crop a 3D array to the bounding box of its non-zero (True) entries.

    For every axis the array is summed over the two other axes; the first
    and last non-zero positions of that projection define the crop range.

    Args:
        arr: 3D boolean array. It must contain at least one True entry,
            otherwise ``min``/``max`` of an empty index array raises.

    Returns:
        Tuple ``(cropped, offset)``: the cropped array and the list
        ``[x_min, y_min, z_min]`` of the lower crop bounds.
    """
    lower, upper = [], []
    for other_axes in ((1, 2), (0, 2), (0, 1)):
        occupied = np.flatnonzero(arr.sum(axis=other_axes))
        lower.append(occupied.min())
        upper.append(occupied.max() + 1)
    cropped = arr[lower[0]:upper[0], lower[1]:upper[1], lower[2]:upper[2]]
    return cropped, lower
def group_ids_to_so_storage(ids, params, significant_digits=5):
    """
    Group IDs (and per-ID parameters) by their trailing decimal digits.

    Every ID is formatted as a zero-padded decimal string and its last
    `significant_digits` characters are used as the group key. The padding
    width is at least 5 (the historic behaviour) and grows with
    `significant_digits`, so keys have a consistent length for small IDs too.

    Args:
        ids: Sequence of integer IDs.
        params: Sequence of parameter sequences; ``params[j][i]`` belongs
            to ``ids[i]``.
        significant_digits: Number of trailing digits used as the key.

    Returns:
        List ``[id_dict, param_dict_0, param_dict_1, ...]`` of
        ``defaultdict(list)`` objects that all share the same keys.
    """
    n_digits = int(significant_digits)
    # Never pad with fewer than 5 digits so results for
    # significant_digits <= 5 are unchanged.
    pad_width = max(5, n_digits)
    id_dict = defaultdict(list)
    param_dicts = [defaultdict(list) for _ in range(len(params))]
    for i_id, this_id in enumerate(ids):
        key = ("%.*d" % (pad_width, this_id))[-n_digits:]
        id_dict[key].append(this_id)
        for param_dict, param in zip(param_dicts, params):
            param_dict[key].append(param[i_id])
    return [id_dict] + param_dicts
def majority_element_1d(arr):
    """
    Return the most frequent value of an array.

    ``np.unique`` returns the distinct values in sorted order, so if several
    values share the highest count the smallest of them is returned.

    Args:
        arr: Array-like of values (flattened by ``np.unique``).

    Returns:
        The value with the highest number of occurrences.
    """
    values, counts = np.unique(arr, return_counts=True)
    most_frequent_ix = np.argmax(counts)
    return values[most_frequent_ix]
def get_paths_of_skelID(id_list, traced_skel_dir):
    """
    Gather the paths of the skeleton k.zip files with the given IDs.

    The ID of a file is taken from the first ``iter_<n>_<ID>`` occurrence
    in its file name. Files without such a pattern are ignored; if several
    files carry the same ID the first one found is used.

    Args:
        id_list: Iterable of skeleton IDs (compared via ``str(ID)``).
        traced_skel_dir: Directory containing the mapped skeletons.

    Returns:
        List with one entry per ID in `id_list`: the path of the matching
        file, or `None` if no file with that ID exists.
    """
    id_pattern = re.compile(r'iter_\d+_(\d+)')
    path_by_id = {}
    for path in get_filepaths_from_dir(traced_skel_dir):
        # Match per file name so that IDs and paths cannot get misaligned
        # when a path has no match (or more than one).
        match = id_pattern.search(os.path.basename(path))
        if match is not None:
            path_by_id.setdefault(match.group(1), path)
    return [path_by_id.get(str(skel_id)) for skel_id in id_list]
def coordpath2anno(coords, scaling=None, add_edges=True):
    """
    Build a SkeletonAnnotation from an ordered path of coordinates.

    Coordinates and scaling are both cast to int32; each coordinate is
    divided element-wise by the scaling and its first three components are
    used to create one node. Optionally, consecutive nodes are connected
    by an edge.

    Args:
        coords: Array-like of scaled coordinates, one row per node, in
            path order.
        scaling: Per-axis scaling. Defaults to
            ``global_params.config['scaling']``.
        add_edges: If True, add an edge between each pair of consecutive
            nodes.

    Returns:
        The SkeletonAnnotation containing the created nodes (and edges).
    """
    if scaling is None:
        scaling = global_params.config['scaling']
    anno = SkeletonAnnotation()
    anno.scaling = scaling

    scaling_arr = np.array(scaling, dtype=np.int32)
    path_nodes = []
    for coord in np.array(coords, dtype=np.int32):
        voxel = coord / scaling_arr
        node = SkeletonNode().from_scratch(anno, voxel[0], voxel[1], voxel[2])
        anno.addNode(node)
        path_nodes.append(node)

    if add_edges:
        for previous, current in zip(path_nodes[:-1], path_nodes[1:]):
            anno.addEdge(previous, current)
    return anno
def get_filepaths_from_dir(directory, ending=('k.zip',), recursively=False,
                           exclude_endings=False, fname_includes=()):
    """
    Collect file paths from a directory, filtered by ending and substring.

    Args:
        directory: Directory to search.
        ending: String or sequence of file endings. An empty sequence
            disables the ending filter.
        recursively: If True, sub-directories are searched as well.
        exclude_endings: If True, the ending filter is inverted, i.e. files
            that have one of the endings are dropped. Only has an effect
            if `ending` is non-empty.
        fname_includes: String or sequence of substrings; if non-empty, a
            file name must contain at least one of them.

    Returns:
        List of paths (``directory``/sub-directory joined with the file
        name) matching the criteria. An empty list is returned if
        `directory` does not exist, in both search modes.
    """
    # make it backwards compatible
    if type(ending) is str:
        ending = [ending]
    if type(fname_includes) is str:
        fname_includes = [fname_includes]

    def is_wanted(fname):
        if len(ending) > 0:
            # An empty-string ending never matches, mirroring the former
            # slice comparison ``fname[-len(end):] == end``.
            has_ending = any(end != '' and fname.endswith(end)
                             for end in ending)
            if has_ending == bool(exclude_endings):
                return False
        if len(fname_includes) > 0:
            return any(substr in fname for substr in fname_includes)
        return True

    if recursively:
        walker = os.walk(directory)
    else:
        # Top level only. The default keeps a missing directory from
        # raising StopIteration; os.walk itself yields nothing in that case.
        walker = [next(os.walk(directory), (directory, [], []))]

    return [os.path.join(root, fname)
            for root, _, fnames in walker
            for fname in fnames
            if is_wanted(fname)]
def read_txt_from_zip(zip_fname, fname_in_zip):
    """
    Read the raw content of one member of a zip archive.

    Args:
        zip_fname (str): Path to the zip archive.
        fname_in_zip (str): Name of the member inside the archive.

    Returns:
        bytes: The undecoded content of the member.
    """
    with zipfile.ZipFile(zip_fname, allowZip64=True) as archive:
        return archive.read(fname_in_zip)
def read_mesh_from_zip(zip_fname, fname_in_zip):
    """
    Read one PLY mesh stored inside a zip archive.

    The vertex records are reinterpreted as a flat float32 array (this
    presumably requires all vertex properties to be float32 - TODO confirm)
    and the face vertex indices are flattened. Normals are not supported.

    Args:
        zip_fname: Path to the zip archive.
        fname_in_zip: Name of the PLY file inside the archive.

    Returns:
        List ``[indices, vertices, None]`` with the flat index array, the
        flat vertex array and a `None` placeholder for the normals.
    """
    with zipfile.ZipFile(zip_fname, allowZip64=True) as archive:
        ply = PlyData.read(archive.open(fname_in_zip))
        vertex_records = ply['vertex'].data
        n_fields = len(vertex_records.dtype.names)
        vertices = vertex_records.view((np.float32, n_fields)).flatten()
        face_indices = ply['face'].data['vertex_indices'].tolist()
        indices = np.array(face_indices).flatten()
        # TODO: support normals
    return [indices, vertices, None]
def read_meshes_from_zip(zip_fname, fnames_in_zip):
    """
    Read several PLY meshes stored inside one zip archive.

    For each file the vertex records are reinterpreted as a flat float32
    array (this presumably requires all vertex properties to be float32 -
    TODO confirm) and the face vertex indices are flattened. Normals are
    not supported.

    Args:
        zip_fname: Path to the zip archive.
        fnames_in_zip: Names of the PLY files inside the archive.

    Returns:
        List with one ``(indices, vertices, None)`` tuple per requested
        file, in the order of `fnames_in_zip`.
    """
    meshes = []
    with zipfile.ZipFile(zip_fname, allowZip64=True) as archive:
        for member_name in fnames_in_zip:
            ply = PlyData.read(archive.open(member_name))
            vertex_records = ply['vertex'].data
            n_fields = len(vertex_records.dtype.names)
            vertices = vertex_records.view((np.float32, n_fields)).flatten()
            face_indices = ply['face'].data['vertex_indices'].tolist()
            indices = np.array(face_indices).flatten()
            # TODO: support normals
            meshes.append((indices, vertices, None))
    return meshes
def write_txt2kzip(kzip_path, text, fname_in_zip, force_overwrite=False):
    """
    Write a single text (or bytes) entry into a k.zip archive.

    Thin convenience wrapper around :func:`texts2kzip` for one entry.

    Args:
        kzip_path: Path to the k.zip archive.
        text: Content (str or bytes) to store.
        fname_in_zip: Name of the entry inside the archive.
        force_overwrite: Forwarded unchanged to :func:`texts2kzip`.

    Returns:
        None.
    """
    texts2kzip(kzip_path, texts=[text], fnames_in_zip=[fname_in_zip],
               force_overwrite=force_overwrite)
def texts2kzip(kzip_path, texts, fnames_in_zip, force_overwrite=False):
    """
    Write several text (or bytes) entries into a k.zip archive.

    Behaviour depends on the state of the archive:

    * archive does not exist: it is created.
    * archive exists and `force_overwrite` is True: the whole archive is
      replaced (opened in ``"w"`` mode), i.e. all previous entries are
      dropped.
    * archive exists and `force_overwrite` is False: entries with the given
      names are removed (in a single rewrite of the archive) and the new
      entries are appended.

    Errors are not raised but reported via ``log_handler.error``.

    Args:
        kzip_path (str): Path to the archive; ``'.k.zip'`` is appended if
            missing.
        texts (List[str]): Contents to write.
        fnames_in_zip (List[str]): Entry names, one per text.
        force_overwrite (bool): See above.

    Returns:
        None.
    """
    if not kzip_path.endswith('.k.zip'):
        kzip_path += '.k.zip'
    archive_exists = os.path.isfile(kzip_path)
    try:
        # Resolve names by index so that a too-short `fnames_in_zip` is
        # reported before the archive is touched.
        entry_names = [fnames_in_zip[i] for i in range(len(texts))]
        if archive_exists and not force_overwrite:
            if entry_names:
                # One pass over the archive instead of one rewrite per entry.
                remove_from_zip(kzip_path, *entry_names)
            mode = "a"
        else:
            mode = "w"
        with zipfile.ZipFile(kzip_path, mode, zipfile.ZIP_DEFLATED) as zf:
            for entry_name, text in zip(entry_names, texts):
                zf.writestr(entry_name, text)
    except Exception as e:
        if archive_exists:
            log_handler.error("Couldn't open file {} for reading and "
                              "overwriting. {}".format(kzip_path, e))
        else:
            log_handler.error("Couldn't open file {} for writing. {}"
                              "".format(kzip_path, e))
def write_data2kzip(kzip_path, fpath, fname_in_zip=None, force_overwrite=False):
    """
    Add a single file to a k.zip archive.

    Thin convenience wrapper around :func:`data2kzip` for one file; note
    that :func:`data2kzip` deletes the source file afterwards.

    Args:
        kzip_path (str): Path to the k.zip archive.
        fpath (str): Path of the file to add.
        fname_in_zip (str): Name inside the archive. If None, the file name
            of `fpath` is used.
        force_overwrite (bool): Forwarded unchanged to :func:`data2kzip`.

    Returns:
        None.
    """
    data2kzip(kzip_path, fpaths=[fpath], fnames_in_zip=[fname_in_zip],
              force_overwrite=force_overwrite)
def data2kzip(kzip_path: str, fpaths, fnames_in_zip=None, force_overwrite=True,
              verbose=False):
    """
    Move several files into a k.zip archive.

    Behaviour depends on the state of the archive:

    * archive does not exist: it is created.
    * archive exists and `force_overwrite` is True: the whole archive is
      replaced (opened in ``"w"`` mode), i.e. all previous entries are
      dropped.
    * archive exists and `force_overwrite` is False: entries with the
      target names are removed (in a single rewrite of the archive) and
      the new files are appended.

    After a successful write the source files in `fpaths` are deleted.
    If writing fails, the error is reported via ``log_handler.error`` and
    the source files are kept.

    Args:
        kzip_path: Path to the archive; ``'.k.zip'`` is appended if missing.
        fpaths: Paths of the files to add.
        fnames_in_zip: Names inside the archive, one per file. A `None`
            entry (or passing `None` for the whole argument) selects the
            file name of the corresponding path.
        force_overwrite: See above.
        verbose: If True, log start/end and show a progress bar.

    Returns:
        None.
    """
    if not kzip_path.endswith('.k.zip'):
        kzip_path += '.k.zip'
    nb_files = len(fpaths)
    if fnames_in_zip is None:
        # The default must behave like "use the original file names".
        fnames_in_zip = [None] * nb_files
    pbar = None
    if verbose:
        log_handler.info('Writing {} files to .zip.'.format(nb_files))
        pbar = tqdm.tqdm(total=nb_files, leave=False)

    archive_exists = os.path.isfile(kzip_path)
    written = False
    try:
        arc_names = [
            os.path.split(fpaths[ii])[1] if fnames_in_zip[ii] is None
            else fnames_in_zip[ii]
            for ii in range(nb_files)
        ]
        if archive_exists and not force_overwrite:
            if arc_names:
                # One pass over the archive instead of one rewrite per file.
                remove_from_zip(kzip_path, *arc_names)
            mode = "a"
        else:
            mode = "w"
        with zipfile.ZipFile(kzip_path, mode, zipfile.ZIP_DEFLATED,
                             allowZip64=True) as zf:
            for fpath, arc_name in zip(fpaths, arc_names):
                zf.write(fpath, arc_name)
                if pbar is not None:
                    pbar.update()
        written = True
    except Exception as e:
        if archive_exists:
            log_handler.error("Couldn't open file {} for reading and"
                              " overwriting. Error: {}".format(kzip_path, e))
        else:
            log_handler.error("Couldn't open file {} for writing. Error: "
                              "{}".format(kzip_path, e))

    if written:
        # Only delete the sources once they are safely inside the archive.
        for fpath in fpaths:
            os.remove(fpath)
    if verbose:
        pbar.close()
        log_handler.info('Done writing files to .zip.')
def remove_from_zip(zipfname, *filenames):
    """
    Remove the named entries from a zip archive.

    All other entries are copied into a new archive inside a temporary
    directory, which then replaces the original file. Names that do not
    exist in the archive are ignored.

    Args:
        zipfname: Path to the zip archive.
        *filenames: Names of the entries to drop.

    Returns:
        None.
    """
    scratch_dir = tempfile.mkdtemp()
    try:
        rewritten = os.path.join(scratch_dir, 'new.zip')
        with zipfile.ZipFile(zipfname, 'r', allowZip64=True) as source, \
                zipfile.ZipFile(rewritten, 'w', allowZip64=True) as target:
            for entry in source.infolist():
                if entry.filename in filenames:
                    continue
                target.writestr(entry, source.read(entry.filename))
        shutil.move(rewritten, zipfname)
    finally:
        # Always clean up the temporary directory, even on failure.
        shutil.rmtree(scratch_dir)
def write_obj2pkl(path, objects):
    """
    Pickle an object to `path`.

    The data is first written to ``path + ".tmp"`` and then moved to
    `path`. The garbage collector is disabled while pickling and restored
    to its previous state afterwards, also if pickling fails.

    For backwards compatibility the two arguments may be passed in swapped
    order (object first, path second); this is detected by `path` not being
    a string and triggers a warning.

    Args:
        path (str): Destination file.
        objects (object): The object to serialize.

    Returns:
        None.
    """
    if not isinstance(path, str):
        log_handler.warn("Write_obj2pkl takes arguments 'path' (str) and "
                         "'objects' (python object).")
        path, objects = objects, path

    gc_was_enabled = gc.isenabled()
    gc.disable()
    try:
        tmp_path = path + ".tmp"
        with open(tmp_path, 'wb') as output:
            pkl.dump(objects, output, protocol=pkl.HIGHEST_PROTOCOL)
        shutil.move(tmp_path, path)
    finally:
        # Restore the collector even when pickling raises.
        if gc_was_enabled:
            gc.enable()
def load_pkl2obj(path):
    """
    Load an object from a pickle file.

    If unpickling raises ``UnicodeDecodeError``, the file is loaded again
    with ``encoding='bytes'`` and byte-string dictionary keys are converted
    to ``str`` via :func:`convert_keys_byte2str`. The garbage collector is
    disabled while loading and restored to its previous state afterwards,
    also if loading fails.

    Note:
        Unpickling executes arbitrary code; only load files from trusted
        sources.

    Args:
        path (str): Path to the pickle file.

    Returns:
        The deserialized object.
    """
    gc_was_enabled = gc.isenabled()
    gc.disable()
    try:
        try:
            with open(path, 'rb') as inp:
                objects = pkl.load(inp)
        except UnicodeDecodeError:  # python3 compatibility
            with open(path, 'rb') as inp:
                objects = pkl.loads(inp.read(), encoding='bytes')
            objects = convert_keys_byte2str(objects)
    finally:
        # Restore the collector even when loading raises.
        if gc_was_enabled:
            gc.enable()
    return objects
def convert_keys_byte2str(dc):
    """
    Recursively replace ``bytes`` keys of a dictionary by ``str`` keys.

    The dictionary is modified in place. Only values that are exactly of
    type ``dict`` are descended into; everything else (including dict
    subclasses and dicts nested in lists) is returned untouched.

    Args:
        dc: Dictionary to convert (any other object is returned as is).

    Returns:
        The same dictionary with all ``bytes`` keys decoded as UTF-8.
    """
    if type(dc) is not dict:
        return dc
    # Iterate over a snapshot of the keys because the dict is mutated.
    for key in list(dc):
        converted = convert_keys_byte2str(dc[key])
        if type(key) is not bytes:
            continue
        dc[key.decode('utf-8')] = converted
        del dc[key]
    return dc
def chunkify(lst: Union[list, np.ndarray], n: int) -> List[list]:
    """
    Split a list or array into at most `n` chunks of nearly equal size.

    Elements are dealt out round-robin: chunk ``i`` is ``lst[i::n]``, so a
    chunk holds every n-th element, not a consecutive range. If `lst` has
    fewer than `n` elements, one chunk per element is created.

    Args:
        lst: The list or numpy array to split.
        n: Desired number of chunks.

    Examples:
        >>> chunkify(np.arange(10), 2)
        >>> chunkify(np.arange(10), 100)

    Returns:
        List of chunks. Its length is `np.min([n, len(lst)])`.
    """
    n_chunks = len(lst) if len(lst) < n else n
    chunks = []
    for offset in range(n_chunks):
        chunks.append(lst[offset::n_chunks])
    return chunks
def chunkify_weighted(lst, n, weights):
    """
    Split a list or array into `n` chunks balanced by element weight.

    The elements are ordered by descending weight and then dealt out
    round-robin (chunk ``i`` gets every n-th element starting at ``i``), so
    heavy elements are spread across the chunks. If `lst` has fewer than
    `n` elements, one chunk per element is returned without sorting.

    Args:
        lst: List or numpy array to split.
        n: Number of chunks.
        weights: One weight per element of `lst`.

    Returns:
        List of ``min(n, len(lst))`` chunks. Chunks are arrays if `lst` is
        a numpy array and lists otherwise.
    """
    if len(lst) < n:
        n = len(lst)
        return [lst[i::n] for i in range(n)]  # no weighting needed
    descending = np.argsort(weights)[::-1]
    if isinstance(lst, np.ndarray):
        lst = lst[descending]
    else:
        # Plain sequences do not support fancy indexing.
        lst = [lst[ix] for ix in descending]
    return [lst[i::n] for i in range(n)]
def chunkify_successive(l, n):
    """
    Yield successive chunks of `n` consecutive elements from `l`.

    The last chunk is shorter if ``len(l)`` is not a multiple of `n`.

    Args:
        l: The sequence to split (must support ``len`` and slicing).
        n: Size of each chunk.

    Yields:
        Slices ``l[start:start + n]`` in order.
    """
    yield from (l[start:start + n] for start in range(0, len(l), n))
def flatten_list(lst):
    """
    Flatten one level of nesting and return the result as a numpy array.

    The elements of all sub-iterables are concatenated in order, similar to
    `np.concatenate`.

    Args:
        lst: Iterable of iterables.

    Returns:
        ``np.ndarray`` built from all elements of the sub-iterables.
    """
    flat = []
    for sub in lst:
        flat.extend(sub)
    return np.array(flat)
def flatten(x):
    """
    Recursively flatten a nested iterable.

    Replacement for the removed ``compiler.ast.flatten``. Strings are
    treated as atomic elements; every other iterable is descended into.

    Originally shared under Public domain code:
    https://stackoverflow.com/questions/16176742/python-3-replacement-for-
    deprecated-compiler-ast-flatten-function

    Args:
        x: The nested iterable to be flattened.

    Yields:
        The non-iterable elements (and strings) of `x` in depth-first
        order.
    """
    # ``collections.Iterable`` was removed in Python 3.10; the ABC lives in
    # ``collections.abc``.
    from collections.abc import Iterable

    for el in x:
        if isinstance(el, Iterable) and not isinstance(el, str):
            yield from flatten(el)
        else:
            yield el
def get_skelID_from_path(skel_path):
    """
    Extract the skeleton ID from a skeleton file path.

    The ID is the number following the first ``iter_0_`` in the path.

    Args:
        skel_path: str
            Path of the skeleton file.

    Returns:
        int: The skeleton ID.

    Raises:
        IndexError: If the path does not contain ``iter_0_<digits>``.
    """
    id_matches = re.findall(r'iter_0_(\d+)', skel_path)
    first_id = id_matches[0]
    return int(first_id)
def safe_copy(src, dest, safe=True):
    """
    Copy a file, optionally refusing to overwrite the destination.

    With `safe` set, the destination is created with
    ``O_CREAT | O_EXCL``, so the call fails if `dest` already exists.
    Otherwise ``shutil.copy`` is used and an existing destination is
    replaced.

    Note:
        Credit to Misandrist on Stackoverflow for the original
        implementation date 03/31/17.

    Args:
        src (str): Source file path.
        dest (str): Destination file path.
        safe (bool): If True, raise if the destination exists; if False,
            overwrite it.

    Returns:
        None.
    """
    if not safe:
        shutil.copy(src, dest)
        return
    # Exclusive creation fails if the destination already exists.
    dest_fd = os.open(dest, os.O_CREAT | os.O_EXCL | os.O_WRONLY)
    with os.fdopen(dest_fd, 'wb') as dest_file, open(src, 'rb') as src_file:
        shutil.copyfileobj(src_file, dest_file)
# https://gist.github.com/tcwalther/ae058c64d5d9078a9f333913718bba95 # class based on: http://stackoverflow.com/a/21919644/487556
class DelayedInterrupt(object):
    """
    Context manager that delays the handling of signals.

    While the context is active, the given signals are only recorded. On
    exit the previous handlers are restored and, for every signal that was
    received in the meantime, the previous handler is called (if it is a
    callable).
    """

    def __init__(self, signals):
        """
        Store the signals to delay.

        Args:
            signals: A signal number or a list/tuple of signal numbers.
        """
        if not isinstance(signals, list) and not isinstance(signals, tuple):
            signals = [signals]
        self.sigs = signals

    def __enter__(self):
        """
        Install recording handlers for all configured signals.

        The previously installed handlers are remembered in
        ``self.old_handlers``; received signals are stored in
        ``self.signal_received``.

        Returns:
            None.
        """
        self.signal_received = {}
        self.old_handlers = {}
        for sig in self.sigs:
            self.signal_received[sig] = False
            self.old_handlers[sig] = signal.getsignal(sig)

            # Bind `sig` as a default argument: a plain closure would see
            # the last value of the loop variable for every handler.
            def handler(s, frame, sig=sig):
                self.signal_received[sig] = (s, frame)
                # Note: in Python 3.5, you can use signal.Signals(sig).name
                log_handler.info('Signal %s received. Delaying KeyboardInterrupt.' % sig)

            signal.signal(sig, handler)

    def __exit__(self, type, value, traceback):
        """
        Restore the previous handlers and replay delayed signals.

        Args:
            type: The type of the exception, if any occurred.
            value: The value of the exception, if any occurred.
            traceback: The traceback of the exception, if any occurred.

        Returns:
            None.
        """
        for sig in self.sigs:
            old_handler = self.old_handlers[sig]
            signal.signal(sig, old_handler)
            pending = self.signal_received[sig]
            # SIG_DFL / SIG_IGN are not callable and cannot be replayed.
            if pending and callable(old_handler):
                old_handler(*pending)
def prase_cc_dict_from_txt(txt):
    """
    Parse connected components from the text of a Knossos mergelist.

    Only every fourth line (starting with the first) is evaluated. All
    integers of such a line are extracted: the first one is used as the
    component ID, the first three are skipped, and the remaining non-zero
    ones are the IDs belonging to the component.

    Args:
        txt (str or bytes): Content of a Knossos mergelist file.

    Returns:
        dict: Maps each component ID (``np.uint64``) to a ``np.uint64``
        array of its member IDs.
    """
    components = {}
    for raw_line in txt.splitlines()[::4]:
        text_line = raw_line.decode() if type(raw_line) is bytes else raw_line
        numbers = np.array(re.findall(r"(\d+)", text_line), dtype=np.uint64)
        member_ids = numbers[3:]
        components[numbers[0]] = member_ids[member_ids != 0]
    return components
def parse_cc_dict_from_kml(kml_path):
    """
    Parse connected components from a Knossos mergelist file.

    The file is read as bytes, decoded and passed to
    :func:`prase_cc_dict_from_txt`.

    Args:
        kml_path (str): Path to the Knossos mergelist file.

    Returns:
        dict: Maps each component ID to the IDs belonging to that
        component, as returned by :func:`prase_cc_dict_from_txt`.
    """
    # Use a context manager so the file handle is closed deterministically.
    with open(kml_path, "rb") as kml_file:
        txt = kml_file.read().decode()
    return prase_cc_dict_from_txt(txt)
def parse_cc_dict_from_g(g):
    """
    Build a connected-component dictionary from a graph.

    Components are processed from largest to smallest; each one is stored
    under the smallest node ID it contains.

    Args:
        g (networkx.Graph): Graph whose connected components are collected.
            Node IDs must be mutually comparable.

    Returns:
        dict: Maps the minimum node ID of each component to the collection
        of node IDs of that component, as yielded by
        ``nx.connected_components``.
    """
    cc_dict = {}
    for cc in sorted(nx.connected_components(g), key=len, reverse=True):
        # use minimum ID in CC as SSV ID; components are sets and thus
        # cannot be indexed with cc[0].
        cc_dict[min(cc)] = cc
    return cc_dict
def parse_cc_dict_from_kzip(k_path):
    """
    Parse connected components from the mergelist inside a k.zip archive.

    Reads the member ``mergelist.txt`` from the archive, decodes it and
    passes the text to :func:`prase_cc_dict_from_txt`.

    Args:
        k_path (str): Path to the zip archive containing ``mergelist.txt``.

    Returns:
        dict: Maps each component ID to the IDs belonging to that
        component, as returned by :func:`prase_cc_dict_from_txt`.
    """
    raw_mergelist = read_txt_from_zip(k_path, "mergelist.txt")
    mergelist_txt = raw_mergelist.decode()
    return prase_cc_dict_from_txt(mergelist_txt)
@contextlib.contextmanager
def temp_seed(seed):
    """
    Temporarily seed numpy's global random number generator.

    The global random state is saved on entry, the generator is seeded
    with `seed`, and the saved state is restored on exit - also if the
    body raises.

    (From https://stackoverflow.com/questions/49555991/can-i-create-a-local-numpy-random-seed)

    Args:
        seed (int): Seed used inside the ``with`` block.

    Yields:
        None.
    """
    saved_state = np.random.get_state()
    np.random.seed(seed)
    try:
        yield
    finally:
        np.random.set_state(saved_state)
def str_delta_sec(seconds: int) -> str:
    """
    Format a duration in seconds as a compact human-readable string.

    Days, hours and minutes are only included if they are greater than
    zero; seconds are always included.

    Examples:
        >>> sec = 2 * 24 * 3600 + 12 * 3600 + 5 * 60 + 1
        >>> str_rep = str_delta_sec(sec)
        >>> assert str_rep == '2d:12h:05min:01s'
        >>> assert str_delta_sec(4 * 3600 + 20 * 60 + 10) == '4h:20min:10s'

    Args:
        seconds (int): Duration in seconds, e.g. the result of a time
            delta. It is truncated with ``int``.

    Returns:
        str: String of the form 'Xd:Xh:XXmin:XXs' without the units that
        are zero, e.g. '2d:12h:05min:01s'.
    """
    minutes, secs = divmod(int(seconds), 60)
    hours, minutes = divmod(minutes, 60)
    days, hours = divmod(hours, 24)

    parts = []
    if days > 0:
        parts.append(f'{days:d}d')
    if hours > 0:
        parts.append(f'{hours:d}h')
    if minutes > 0:
        parts.append(f'{minutes:02d}min')
    parts.append(f'{secs:02d}s')
    return ':'.join(parts)