Source code for syconn.reps.super_segmentation_dataset

# -*- coding: utf-8 -*-
# SyConn - Synaptic connectivity inference toolkit
#
# Copyright (c) 2016 - now
# Max Planck Institute of Neurobiology, Martinsried, Germany
# Authors: Philipp Schubert, Joergen Kornfeld
import copy
import glob
import os
import re
import shutil
from multiprocessing.pool import ThreadPool
from typing import List, Dict, Optional, Union, Tuple, Iterable, Generator
import pickle as pkl

import tqdm
import numpy as np

from . import log_reps
from .rep_helper import SegmentationBase
from .segmentation import SegmentationDataset, SegmentationObject
from .super_segmentation_helper import assemble_from_mergelist
from .super_segmentation_helper import view_embedding_of_sso_nocache
from .super_segmentation_object import SuperSegmentationObject
from .. import global_params
from ..handler.basics import load_pkl2obj, write_obj2pkl, chunkify, kd_factory
from ..handler.config import DynConfig
from ..handler import basics
from ..backend.storage import BinarySearchStore, bss_get_attr_helper
from ..mp import batchjob_utils as qu
from ..mp import mp_utils as sm

try:
    from knossos_utils import mergelist_tools
except ImportError:
    from knossos_utils import mergelist_tools_fallback as mergelist_tools


[docs]class SuperSegmentationDataset(SegmentationBase): """ Represents a set of agglomerated supervoxels, which are represented by `SegmentationObject` instances, and provides methods for accessing and manipulating their data. Examples: After initializing with `run_create_neuron_ssd` and subsequent analysis, SSV properties can be loaded via `load_cached_data` with keys corresponding to `ssv_ids`: * 'id': ID array, identical to `ssv_ids`. * 'bounding_box': Bounding box of every SSV. * 'size': Number of voxels of each SSV. * 'rep_coord': Representative coordinates for each SSV. * 'sv': Supervoxel IDs for every SSV. * 'sample_locations': Rendering locations for each SSV. * 'celltype_cnn_e3': Celltype classifications based on elektronn3 CMN. * 'celltype_cnn_e3_probas': Celltype logits as an array (M, C). * 'syn_ssv': Synapse IDs assigned to each SSV. * 'syn_sign_ratio': Area-weighted ratio of symmetric synapses. * 'sj': Synaptic junction object IDs mapped to each SSV. * 'mapping_sj_ids': Synaptic junction objects overlapping with SSVs. * 'mapping_sj_ratios': Overlap ratio of synaptic junctions. * 'vc': Vesicle clouds mapped to each SSV. * 'mapping_vc_ids': Vesicle cloud objects overlapping with SSVs. * 'mapping_vc_ratios': Overlap ratio of vesicle clouds. * 'mi': Mitochondria mapped to each SSV. * 'mapping_mi_ids': Mitochondria objects overlapping with SSVs. * 'mapping_mi_ratios': Overlap ratio of mitochondria. Initialize `SuperSegmentationDataset` and explore attributes: import numpy as np from syconn.reps.super_segmentation import * ssd = SuperSegmentationDataset(working_dir='~/SyConn/example_cube1/') n_synapses = [len(ssv.syn_ssv) for ssv in ssd.ssvs] path_length = [ssv.total_edge_length() for ssv in ssd.ssvs] syn_densities = np.array(n_synapses) / np.array(path_length) print(np.mean(syn_densities), np.std(syn_densities)) Obtain total number of synapses per cell type: celltypes = ssd.load_numpy_data('celltype_cnn_e3') n_synapses = np.array([len(el) for el in ssd.load_numpy_data('syn_ssv')]) n_synapes_per_type = {ct: np.sum(n_synapses[celltypes==ct]) for ct in range(np.max(celltypes))} print(n_synapes_per_type) Attributes: sso_caching (bool): Enables caching mechanisms in SuperSegmentationObjects returned via `get_super_segmentation_object`. sso_locking (bool): If True, locking is enabled for SSV files. Args: working_dir (Optional[str]): Path to the working directory. version (Optional[str]): Indicates the version of the dataset, e.g. '0', 'groundtruth' etc. ssd_type (str): Changes the directory prefix the dataset is stored in. version_dict (Optional[Dict[str, str]]): Dictionary with versions of other dataset types sharing the same working directory. sv_mapping (Optional[Union[Dict[int, int], str]]): Dictionary mapping supervoxel IDs to their super-supervoxel ID. scaling (Optional[Union[List, Tuple, np.ndarray]]): Array defining the voxel size in XYZ. Default is from `config.yml`. config (DynConfig): Config object, see `~syconn.handler.config.DynConfig`. Will be copied and fixed by setting `~syconn.handler.config.DynConfig.fix_config` to True. create (bool): Create folder if True. sd_lookup (Optional[Dict[str, SegmentationDataset]]): Lookup dict for `SegmentationDataset`, enabling usage of property cache arrays for all attributes specified in `property_cache` during init. of `SegmentationDataset`. cache_properties (Optional[List[str]]): Use numpy cache arrays to populate specified object properties when initializing `SuperSegmentationObject` via `get_super_segmentation_object`. overwrite (bool): Overwrite existing data if True. """ def __init__(self, working_dir: Optional[str] = None, version: Optional[str] = None, ssd_type: str = 'ssv', version_dict: Optional[Dict[str, str]] = None, sv_mapping: Optional[Union[Dict[int, int], str]] = None, scaling: Optional[Union[List, Tuple, np.ndarray]] = None, config: DynConfig = None, sso_caching: bool = False, sso_locking: bool = False, create: bool = False, sd_lookup: Optional[Dict[str, SegmentationDataset]] = None, cache_properties: Optional[List[str]] = None, overwrite: bool = False): """ Args: working_dir: Path to the working directory. version: Indicates the version of the dataset, e.g. '0', 'groundtruth' etc. ssd_type: Changes the directory prefix the dataset is stored in. Currently there is no real use-case for this. version_dict: Dictionary which contains the versions of other dataset types which share the same working directory. sv_mapping: Dictionary mapping supervoxel IDs (key) to their super-supervoxel ID. scaling: Array defining the voxel size in XYZ. Default is taken from the `config.yml` file. config: Config. object, see :class:`~syconn.handler.config.DynConfig`. Will be copied and then fixed by setting :py:attr:`~syconn.handler.config.DynConfig.fix_config` to True. sso_caching: WIP, enables caching mechanism in SuperSegmentationObjects returned via `get_super_segmentation_object` sso_locking: If True, locking is enabled for SSV files. sd_lookup: Lookup dict for :py:class:`~syconn.reps.segmentation.SegmentationDataset`, this will enable usage of property cache arrays for all attributes which have been specified in `property_cache` during init. of `SegmentationDataset` (see :class:`~syconn.reps.segmentation.SegmentationObject`). cache_properties: Use numpy cache arrays to populate the specified object properties when initializing :py:class:`~syconn.reps.super_segmentation_object.SuperSegmentationObject` via :py:func:`~get_super_segmentation_object`. create: Create folder. """ self.ssv_dict = {} self._mapping_dict = None self.sso_caching = sso_caching self.sso_locking = sso_locking self._mapping_lookup_reverse = None self.overwrite = overwrite self._type = ssd_type self._ssv_ids = None # cache mechanism self._ssoid2ix = None self._property_cache = dict() if cache_properties is None: cache_properties = tuple() if version == 'temp': version = 'tmp' self._setup_working_dir(working_dir, config, version, scaling) if version is not 'tmp' and self._config is not None: self._config = copy.copy(self._config) self._config.fix_config = True if version is None: try: self._version = self.config["versions"][self.type] except KeyError: raise Exception(f"Unclear version '{version}' during initialization of {self}.") elif version == "new": other_datasets = glob.glob(self.working_dir + "/%s_*" % self.type) max_version = -1 for other_dataset in other_datasets: try: other_version = \ int(re.findall(r"[\d]+", os.path.basename(other_dataset))[-1]) if max_version < other_version: max_version = other_version # version is not an integer, found version could be e.g. 'tmp' except IndexError: pass self._version = max_version + 1 else: self._version = version # init sd lookup if sd_lookup is None: sd_lookup = {"sv": None, "vc": None, "mi": None, "sj": None, "syn_ssv": None} self.sd_lookup = sd_lookup if version_dict is None: try: self.version_dict = self.config["versions"] except KeyError: raise ValueError("No version dict specified in config") else: if isinstance(version_dict, dict): self.version_dict = version_dict elif isinstance(version_dict, str) and version_dict == "load": if self.version_dict_exists: self.load_version_dict() else: raise ValueError("No version dict specified in config") if create: os.makedirs(self.path, exist_ok=True) if sv_mapping is not None: if type(sv_mapping) is dict and 0 in sv_mapping: raise ValueError('Zero ID in mapping dict which is not supported.') self.apply_mergelist(sv_mapping) self.enable_property_cache(cache_properties) def __repr__(self): """ Represents the SuperSegmentationDataset as a string. Returns: str: String representation of the SuperSegmentationDataset. """ return (f'{type(self).__name__}(ssd_type="{self.type}", ' f'version="{self.version}", working_dir="{self.working_dir}")') @property def type(self) -> str: """ Retrieves the type of the underlying supervoxel objects. This method returns the class type of the supervoxel objects used in the `SuperSegmentationObject`. Returns: str: The type of the supervoxel objects as represented in the `SuperSegmentationObject`. See Also: :class:`~syconn.reps.super_segmentation_object.SuperSegmentationObject`. """ return str(self._type) @property def scaling(self) -> np.ndarray: """ Retrieves the voxel size in nanometers (XYZ). The default values are obtained from the `config.yml` file and can be accessed through the :py:attr:`~config`. Returns: np.ndarray: Voxel size in nanometers as a numpy array. """ return self._scaling @property def working_dir(self) -> str: """ Retrieves the working directory. Returns: str: Working directory path. """ return self._working_dir @property def config(self) -> DynConfig: """ Retrieves the configuration object containing dataset-specific parameters, ensuring all relevant details from the previous documentation are retained. Returns: DynConfig: An object with all dataset-specific parameters. """ if self._config is None: self._config = global_params.config return self._config @property def path(self) -> str: """ Retrieves the full path to the dataset directory. Returns: str: Full path to the dataset directory ensuring to include all relevant information and resolve any conflicts with the old docstring. """ return "%s/%s_%s/" % (self._working_dir, self.type, self.version) @property def version(self) -> str: """ Retrieves the version of the dataset, included in the dataset's folder name. Returns: str: Dataset version, as part of the folder naming convention. """ return str(self._version) @property def version_dict_path(self) -> str: """ Retrieves the path to the version dictionary file. Returns: str: Path to the version dictionary file. """ return self.path + "/version_dict.pkl" @property def mapping_dict_exists(self) -> bool: """ Checks if the mapping dictionary exists (upper-supervoxel ID to supervoxel IDs). Returns: bool: True if the mapping dictionary exists, False otherwise. """ return os.path.exists(self.mapping_dict_path) @property def mapping_dict_path(self) -> str: """ Retrieves the path to the mapping dictionary file. Returns: str: Path to the mapping dictionary .pkl file. """ return self.path + "/mapping_dict.pkl" @property def mapping_lookup_reverse_path(self) -> str: """ Retrieves the path to the data structure storing the lookup from supervoxel ID to cell ID. Returns: str: Path to the data structure that stores the lookup from supervoxel ID to cell ID. """ return self.path + "/mapping_lookup_reverse.h5" @property def version_dict_exists(self) -> bool: """ Checks whether the version dictionary exists at :py:attr:`~version_dict_path`. Returns: bool: True if the version dictionary exists at the specified path, False otherwise. """ return os.path.exists(self.version_dict_path) @property def mapping_dict(self) -> Dict[int, np.ndarray]: """ Retrieves the dictionary containing the supervoxel IDs for each super-supervoxel. Returns: Dict[int, np.ndarray]: A dictionary where each key is the super-supervoxel identifier and the corresponding value is an array of supervoxel IDs. """ if self._mapping_dict is None: if self.mapping_dict_exists: self.load_mapping_dict() else: self._mapping_dict = {} return self._mapping_dict
[docs] def sv2ssv_ids(self, ids: np.ndarray, nb_cpus=1) -> Dict[int, int]: """ Queries the cell ID for a given array of supervoxel IDs using the `mapping_lookup_reverse`. Args: ids (np.ndarray): Array of unique IDs to find the corresponding cell IDs. nb_cpus (int): Number of CPUs to use for the query. IDs that are not in `sv_ids` will not be added to the output dictionary. Returns: Dict[int, int]: Dictionary with supervoxel ID as key and cell ID as value. """ assert np.ndim(ids) == 1 # explicitly cast to uint64 because if `ids` is a list of python int intersect auto-casts to float queries = np.intersect1d(ids, self.sv_ids).astype(np.uint64) log_reps.debug(f'Finished intersection of {len(ids)} query IDs.') if nb_cpus <= 1: query_res = self.mapping_lookup_reverse.get_attributes(queries, 'ssv_ids') else: params = [(self.mapping_lookup_reverse, ch, 'ssv_ids') for ch in np.array_split(queries, nb_cpus)] query_res = sm.start_multiprocess(bss_get_attr_helper, params, nb_cpus=nb_cpus, debug=nb_cpus <= 1) query_res = np.concatenate(query_res) log_reps.debug('Finished queries.') return dict(zip(queries, query_res))
@property def mapping_lookup_reverse(self) -> BinarySearchStore: """ Retrieves the BinarySearchStore for efficient look-ups from supervoxel ID to cell ID. Returns: BinarySearchStore: Lookup store object. """ if self._mapping_lookup_reverse is None: self._mapping_lookup_reverse = BinarySearchStore(self.mapping_lookup_reverse_path) return self._mapping_lookup_reverse
[docs] def create_mapping_lookup_reverse(self): """ Creates a data structure for efficient look-ups from supervoxel ID to cell ID, see :py:class:`syconn.backend.storage.BinarySearchStore`. """ ids, ssv_ids = [], [] for ssv_id, sv_ids in self.mapping_dict.items(): ssv_ids.extend([ssv_id] * len(sv_ids)) ids.extend(list(sv_ids)) ids = np.array(ids, dtype=np.uint64) ssv_ids = np.array(ssv_ids, dtype=np.uint64) BinarySearchStore( self.mapping_lookup_reverse_path, id_array=ids, attr_arrays=dict(ssv_ids=ssv_ids), overwrite=self.overwrite)
@property def ssv_ids(self) -> np.ndarray: """ Retrieves the super-supervoxel IDs that are part of this :class:`~syconn.reps.super_segmentation_dataset.SuperSegmentationDataset` object. Returns: np.ndarray: Array of super-supervoxel IDs. """ if self._ssv_ids is None: # do not change the order of the if statements as it is crucial # for the resulting ordering of self.ssv_ids (only id.npy matches # with all the other cached numpy arrays). self._ssv_ids = self.load_numpy_data('id', suppress_warning=True) if self._ssv_ids is not None: pass elif len(self.mapping_dict) > 0: self._ssv_ids = np.array(list(self.mapping_dict.keys())) elif self.mapping_dict_exists: self.load_mapping_dict() self._ssv_ids = np.array(list(self.mapping_dict.keys())) else: paths = glob.glob(self.path + "/so_storage/*/*/*/") self._ssv_ids = np.array([int(os.path.basename(p.strip("/"))) for p in paths], dtype=np.uint64) return self._ssv_ids @property def ssvs(self) -> Generator[SuperSegmentationObject, None, None]: """ Generator of :class:`~syconn.reps.super_segmentation_object.SuperSegmentationObject` objects which are part of this :class:`~syconn.reps.super_segmentation_dataset. SuperSegmentationDataset` object. Yields: :class:`~syconn.reps.super_segmentation_object.SuperSegmentationObject`: SuperSegmentationObject instances. """ ix = 0 tot_nb_ssvs = len(self.ssv_ids) while ix < tot_nb_ssvs: yield self.get_super_segmentation_object(self.ssv_ids[ix]) ix += 1 @property def sv_ids(self) -> np.ndarray: """ Retrieves a flat array of supervoxel IDs that are part of the cells (:attr:`~.ssv_ids`) in this :class:`~syconn.reps.super_segmentation_dataset.SuperSegmentationDataset` object. Returns: np.ndarray: Array of supervoxel IDs. """ return self.mapping_lookup_reverse.id_array
[docs] def load_numpy_data(self, prop_name: str, allow_nonexisting: bool = True, suppress_warning: bool = False) -> \ Optional[np.ndarray]: """ Loads a numpy array of a cached property. Todo: * remove 's' appendix in file names. Args: prop_name (str): Identifier for the requested cache array. Ordering of the array is the same as :py:attr:`~ssv_ids`. allow_nonexisting (bool): If False, will raise an error for missing numpy files. suppress_warning (bool): If True, suppresses the warning if the property does not exist. Returns: Optional[np.ndarray]: Numpy array of the cached property, or None if not found. """ if os.path.exists(self.path + prop_name + ".npy"): return np.load(self.path + prop_name + ".npy", allow_pickle=True) elif os.path.exists(self.path + prop_name + "s.npy"): log_reps.warning(f'Using "s" appendix in numpy cache file ' f'"{self.path + prop_name + "s.npy"}", this is deprecated.') return np.load(self.path + prop_name + "s.npy", allow_pickle=True) else: msg = f'Requested data cache "{prop_name}" did not exist in {self}.' if not allow_nonexisting: log_reps.error(msg) raise FileNotFoundError(msg) if not suppress_warning: log_reps.warning(msg)
[docs] def get_segmentationdataset(self, obj_type: str) -> SegmentationDataset: """ Retrieves the SegmentationDataset for a given object type. Args: obj_type (str): Object type identifier. Returns: SegmentationDataset: Corresponding SegmentationDataset. """ assert obj_type in self.version_dict return SegmentationDataset(obj_type, version=self.version_dict[obj_type], working_dir=self.working_dir)
[docs] def apply_mergelist(self, sv_mapping: Union[Dict[int, int], str]): """ Applies a supervoxel agglomeration to the dataset. Args: sv_mapping (Union[Dict[int, int], str]): Agglomeration mapping or path to the mapping file. """ os.makedirs(self.path, exist_ok=True) assemble_from_mergelist(self, sv_mapping)
[docs] def get_super_segmentation_object(self, obj_id: Union[int, Iterable[int]], new_mapping: bool = False, caching: Optional[bool] = None, create: bool = False) \ -> Union[SuperSegmentationObject, List[SuperSegmentationObject]]: """ Factory method for creating SuperSegmentationObject instances. Args: obj_id (Union[int, Iterable[int]]): ID(s) of the super-supervoxel(s) to instantiate. Can also be an iterable. new_mapping (bool): If True, uses the latest supervoxel agglomeration. Returns :class:`~syconn.reps.super_segmentation_object.SuperSegmentationObject` based on :py:attr:`~mapping_dict`. caching (Optional[bool], default False): Enables caching of various attributes. create (bool): If True, creates the directory structure for the super-supervoxel in the dataset folder structure. Notes: * `caching` parameter's default value updated to False, as per PS 20Feb2019. Returns: Union[SuperSegmentationObject, List[SuperSegmentationObject]]: The requested SuperSegmentationObject instance(s) corresponding to the given `obj_id` (int or Iterable[int]). """ kwargs_def = dict(ssd_type=self.type, create=create, scaling=self.scaling, object_caching=caching, voxel_caching=caching, mesh_caching=caching, view_caching=caching, enable_locking_so=False, enable_locking=self.sso_locking, config=self.config) if caching is None: caching = self.sso_caching if np.isscalar(obj_id): if new_mapping: sso = SuperSegmentationObject(obj_id, self.version, self.version_dict, self.working_dir, sv_ids=self.mapping_dict[obj_id], **kwargs_def) else: sso = SuperSegmentationObject(obj_id, self.version, self.version_dict, self.working_dir, **kwargs_def) for k, v in self._property_cache.items(): sso.attr_dict[k] = v[self._ssoid2ix[obj_id]] sso._ssd = self else: # TODO: use generator sso = [] for ix in obj_id: # call it with scalar input recursively sso.append(self.get_super_segmentation_object(ix, create=create, new_mapping=new_mapping, caching=caching)) return sso
[docs] def store_total_edge_lengths(self, ax_pred_key: Optional[str] = "axoness_avg10000", overwrite: Optional[bool] = False, nb_cpus: Optional[int] = None): """ Stores total edge lengths of all the cells in this dataset in nanometers. Same ordering as :attr:`~.ssv_ids`. Args: ax_pred_key (Optional[str]): Key of compartment prediction stored in the skeleton attribute. overwrite (Optional[bool]): If True, overwrites the existing `total_edge_lengths.npy` file. Defaults to False. nb_cpus (Optional[int]): Number of CPUs to use for the computation. Defaults to None. """ if os.path.exists(self.path + "total_edge_lengths.npy") and not overwrite: log_reps.warning(f"Total edge lengths already exist in {self.path}. To overwrite, set overwrite=True.") return if nb_cpus is None: import multiprocessing nb_cpus = multiprocessing.cpu_count() params = [(ch, ax_pred_key) for ch in list(basics.chunkify_successive(self.ssv_ids, 500))] total_edge_lengths = np.concatenate(sm.start_multiprocess_imap(get_total_edge_lengths, params=params, nb_cpus=nb_cpus), axis=0) np.save(self.path + "total_edge_lengths.npy", total_edge_lengths)
[docs] def store_path_densities_seg_objs(self, obj_type: str, compartments_of_interest: Optional[list] = None, ax_pred_key: Optional[str] = 'axoness_avg10000', overwrite: Optional[bool] = False, nb_cpus: Optional[int] = None): """ Stores path densities of all cells in the dataset for a given sub-cellular structure. The order corresponds to :attr:`~.ssv_ids`. Args: obj_type (str): Key to any available sub-cellular structure. compartments_of_interest (Optional[list]): Compartments to include in the calculation. Specify axon: 1, dendrite: 0, soma: 2 for filtering. ax_pred_key (Optional[str]): Key of compartment prediction stored in the skeleton. Defaults to 'axoness_avg10000' if `compartments_of_interest` is set. overwrite (Optional[bool]): If True, overwrites the existing file. Defaults to False. nb_cpus (Optional[int]): Number of CPUs to use for the computation. Defaults to None. """ if os.path.exists(self.path + obj_type + "_path_densities.npy") and not overwrite: log_reps.warning(f"Path densities for {obj_type} already exist in {self.path}. To overwrite, set overwrite=True.") return if nb_cpus is None: import multiprocessing nb_cpus = multiprocessing.cpu_count() params = [(obj_type, ch, compartments_of_interest, ax_pred_key) for ch in list(basics.chunkify_successive(self.ssv_ids, 500))] path_densities = np.concatenate(sm.start_multiprocess_imap(get_path_density_seg_obj, params=params, nb_cpus=nb_cpus), axis=0) np.save(self.path + obj_type + "_path_densities" + ".npy", path_densities)
[docs] def save_dataset_shallow(self, overwrite: bool = False): """ Saves :py:attr:`~version_dict`, :py:attr:`~mapping_dict`. Args: overwrite (bool): If True, allows overwriting existing files. Do not replace existing files. """ if not self.version_dict_exists or overwrite: self.save_version_dict() if not self.mapping_dict_exists or overwrite: self.save_mapping_dict()
[docs] def save_dataset_deep(self, extract_only: bool = False, attr_keys: Iterable[str] = (), n_jobs: Optional[int] = None, nb_cpus: Optional[int] = None, use_batchjob=True, new_mapping: bool = True): """ Saves attributes of all SSVs within the given SSD and computes properties like size and representative coordinate. The order of :py:attr:`~ssv_ids` may change each run. Populates the ``sv_ids`` attribute of all SSVs. The behavior and order of operations might differ from run to run. See :func:`~syconn.reps.super_segmentation_dataset.save_dataset_deep`. Args: extract_only (bool): Only cache attributes `attr_keys` from attribute dict. If True, adds suffix '_sel' to the cache array file names, updates will not apply to the :func:`~load_cached_data` method. attr_keys (Iterable[str]): Attributes to cache, only used if `extract_only` is True. n_jobs (Optional[int]): Enables batch job system if set. Requires any string to enable batch job system, will be replaced by a global flag soon. nb_cpus (Optional[int]): Number of CPUs per worker. use_batchjob (bool): If True, uses batchjob processing instead of local multiprocessing. new_mapping (bool): If True, applies new supervoxel agglomeration. """ save_dataset_deep(self, extract_only=extract_only, attr_keys=attr_keys, n_jobs=n_jobs, nb_cpus=nb_cpus, new_mapping=new_mapping, overwrite=self.overwrite, use_batchjob=use_batchjob)
[docs] def save_version_dict(self): """ Saves the version dictionary to a `.pkl` file. """ if len(self.version_dict) > 0: write_obj2pkl(self.version_dict_path, self.version_dict)
[docs] def load_version_dict(self): """ Loads the version dictionary from the specified `.pkl` file. """ assert self.version_dict_exists self.version_dict = load_pkl2obj(self.version_dict_path)
[docs] def save_mapping_dict(self): """ Save the mapping dictionary to a `.pkl` file. """ if len(self.mapping_dict) > 0: write_obj2pkl(self.mapping_dict_path, self.mapping_dict) else: log_reps.warn(f'No entries in mapping dict of {self}.')
[docs] def load_mapping_dict(self): """ Loads the mapping dictionary from the specified `.pkl` file. """ assert self.mapping_dict_exists self._mapping_dict = load_pkl2obj(self.mapping_dict_path)
[docs] def enable_property_cache(self, property_keys: List[str]): """ Enables caching for specified properties. Args: property_keys (List[str]): List of property keys to cache. Numpy cache arrays must exist. """ # look-up for so IDs to index in cache arrays if len(property_keys) == 0: return if self._ssoid2ix is None: self._ssoid2ix = {k: ix for ix, k in enumerate(self.ssv_ids)} self._property_cache.update({k: self.load_numpy_data(k, allow_nonexisting=False) for k in property_keys})
[docs]def save_dataset_deep(ssd: SuperSegmentationDataset, extract_only: bool = False, attr_keys: Iterable = (), n_jobs: Optional[int] = None, nb_cpus: Optional[int] = None, use_batchjob=True, new_mapping: bool = True, overwrite=False): """ Saves attributes of all SSVs within the given SSD and computes properties like size and representative coordinate. The order of `id.npy` may change after repeated runs. Todo: * Extract_only requires refactoring as it stores cache arrays under a different filename. * Allow partial updates of a subset of attributes (e.g. use already existing `id.npy` in case of updating, aka `extract_only=True`). * Check consistency of ordering for different runs. Args: ssd: SuperSegmentationDataset to operate on, representing a collection of super-segmented volume data. extract_only: If True, caches only the specified attributes in `attr_keys`. Adds a suffix `_sel` to the numpy cache array file names, excluding them from the `load_cached_data` method. attr_keys: Iterable of the attribute keys to cache, utilized when `extract_only` is True. n_jobs: The number of jobs to use for batch processing. Defaults to 'Any String' but will be updated with a global flag for easier configuration. nb_cpus: Number of CPUs allocated per worker in the batch job system. use_batchjob: If True, enables batch job processing. If False, uses local multiprocessing, overriding the `n_jobs` setting. new_mapping: If set, applies the mapping from `ssd.mapping_dict` using `ssd.load_mapping_dict` to update the `sv_ids` attribute for all SSVs. overwrite: If True, deletes the existing SSD folder before saving. If False and the folder already exists, triggers a FileExistsError to prevent data loss. """ # This is to only remove files for overwriting that are actually generated here; e.g. mapping_lookup_reverse # is not written here and thus should not be deleted here if overwrite = True. deep_ssd_storage_pths = [ ssd.mapping_dict_path, ssd.mapping_dict_path, ssd.version_dict_path, f'{ssd.path}/id.npy', f'{ssd.path}/size.npy', f'{ssd.path}/sv.npy', f'{ssd.path}/rep_coord.npy', f'{ssd.path}/bounding_box.npy', ] # check if ssv storages already exists if new_mapping and os.path.exists(ssd.path) and len(glob.glob(ssd.path + '/so_storage/*')) > 1: if not overwrite: msg = f'{ssd} already exists and overwrite is False.' log_reps.error(msg) raise FileExistsError(msg) else: for cur_pth in deep_ssd_storage_pths: if not os.path.exists(cur_pth): continue if os.path.isdir(cur_pth): shutil.rmtree(cur_pth) else: os.remove(cur_pth) ssd.save_dataset_shallow(overwrite=overwrite) if n_jobs is None: n_jobs = ssd.config.ncore_total ssv_id_blocks = chunkify(ssd.ssv_ids, n_jobs) # provide SV IDs for every SSV if new_mapping: log_reps.debug('Preparing SSV->SVs look-up.') multi_params = [] for ssv_id_block in tqdm.tqdm(ssv_id_blocks, desc='SSV ID'): id_map = dict() for ssv_id in ssv_id_block: id_map[ssv_id] = ssd.mapping_dict[ssv_id] multi_params.append((ssv_id_block, ssd.version, ssd.version_dict, ssd.working_dir, extract_only, attr_keys, ssd._type, id_map)) else: multi_params = [(ssv_id_block, ssd.version, ssd.version_dict, ssd.working_dir, extract_only, attr_keys, ssd._type, None) for ssv_id_block in ssv_id_blocks] if not qu.batchjob_enabled() or not use_batchjob: results = sm.start_multiprocess(_write_super_segmentation_dataset_thread, multi_params, nb_cpus=nb_cpus) else: path_to_out = qu.batchjob_script(multi_params, "write_super_segmentation_dataset", n_cores=nb_cpus) out_files = glob.glob(path_to_out + "/*") results = [] for out_file in out_files: with open(out_file, 'rb') as f: results.append(pkl.load(f)) shutil.rmtree(os.path.abspath(path_to_out + "/../"), ignore_errors=True) attr_dict = {} for this_attr_dict in results: for attribute in this_attr_dict.keys(): if attribute not in attr_dict: attr_dict[attribute] = [] attr_dict[attribute] += this_attr_dict[attribute] if not ssd.mapping_dict_exists or len(ssd.mapping_dict) == 0: # initialize mapping dict ssd._mapping_dict = dict(zip(attr_dict["id"], attr_dict["sv"])) ssd.save_dataset_shallow() for attribute in attr_dict.keys(): if extract_only: np.save(ssd.path + "/%ss_sel.npy" % attribute, # Why '_sel'? attr_dict[attribute]) else: if len(ssd.ssv_ids) != len(attr_dict[attribute]): log_reps.critical(f'Number of cells ({len(ssd.ssv_ids)}) is different to attribute array ' f'({len(attr_dict[attribute])}, "{attribute}")') np.save(ssd.path + "/%s.npy" % attribute, attr_dict[attribute]) log_reps.info(f'Finished `save_dataset_deep` of {ssd}.')
def _write_super_segmentation_dataset_thread(args): """ Writes attributes of SuperSegmentationObjects to the dataset in a thread-safe manner. Args: args: A tuple containing parameters for the thread function. """ ssv_obj_ids = args[0] version = args[1] version_dict = args[2] working_dir = args[3] extract_only = args[4] attr_keys = args[5] ssd_type = args[6] mapping_dict = args[7] if not extract_only: # TODO: use prepare_so_attr_cache to reduce memory consumption - only works if mapping_dict is not None # needs to be built in downstream in ssv_obj... or overwrite sd_sv cache dict. # from syconn.reps.segmentation_helper import prepare_so_attr_cache # attr_cache = prepare_so_attr_cache(segmentation.SegmentationDataset('sv', working_dir=working_dir), # np.concatenate(list(mapping_dict.values())), ['size', 'bounding_box']) sd_sv = SegmentationDataset(working_dir=working_dir, obj_type='sv', cache_properties=['size', 'bounding_box']) sd_lookup = dict(sv=sd_sv) else: sd_lookup = None ssd = SuperSegmentationDataset(working_dir=working_dir, version=version, sd_lookup=sd_lookup, ssd_type=ssd_type, version_dict=version_dict) new_mapping = False if mapping_dict is not None: ssd._mapping_dict = mapping_dict new_mapping = True attr_dict = dict(id=[]) for ssv_obj_id in ssv_obj_ids: # assume SSV exists already if new_mapping is False to save IO. ssv_obj = ssd.get_super_segmentation_object(ssv_obj_id, new_mapping=new_mapping, create=new_mapping) if ssv_obj.attr_dict_exists: ssv_obj.load_attr_dict() if not extract_only: if len(ssv_obj.attr_dict["sv"]) == 0: raise Exception(f"No mapping information found for {ssv_obj}.") if not extract_only: if "rep_coord" not in ssv_obj.attr_dict: ssv_obj.attr_dict["rep_coord"] = ssv_obj.rep_coord if "bounding_box" not in ssv_obj.attr_dict: ssv_obj.attr_dict["bounding_box"] = ssv_obj.bounding_box if "size" not in ssv_obj.attr_dict: ssv_obj.attr_dict["size"] = ssv_obj.size if extract_only: ignore = False for attribute in attr_keys: if attribute not in ssv_obj.attr_dict: ignore = True break if ignore: continue attr_dict["id"].append(ssv_obj_id) for attribute in attr_keys: if attribute not in attr_dict: attr_dict[attribute] = [] if attribute in ssv_obj.attr_dict: attr_dict[attribute].append(ssv_obj.attr_dict[attribute]) else: attr_dict[attribute].append(None) else: attr_dict["id"].append(ssv_obj_id) for attribute in ssv_obj.attr_dict.keys(): if attribute not in attr_dict: attr_dict[attribute] = [] attr_dict[attribute].append(ssv_obj.attr_dict[attribute]) ssv_obj.save_attr_dict() return attr_dict
[docs]def load_voxels_downsampled(sso, downsampling=(2, 2, 1), nb_threads=10): """ Loads downsampled voxels for a given SuperSegmentationObject. Args: sso: The SuperSegmentationObject to load voxels for. downsampling: The downsampling factor as a tuple (x, y, z). nb_threads: The number of threads to use for loading. Returns: A numpy array of downsampled voxels. """ def _load_sv_voxels_thread(args): sv_id = args[0] sv = SegmentationObject(sv_id, obj_type="sv", version=sso.version_dict["sv"], working_dir=sso.working_dir, config=sso.config, voxel_caching=False) if sv.voxels_exist: box = [np.array(sv.bounding_box[0] - sso.bounding_box[0], dtype=np.int32)] box[0] /= downsampling size = np.array(sv.bounding_box[1] - sv.bounding_box[0], dtype=np.float32) size = np.ceil(size.astype(np.float32) / downsampling).astype(np.int32) box.append(box[0] + size) sv_voxels = sv.voxels if not isinstance(sv_voxels, int): sv_voxels = sv_voxels[::downsampling[0], ::downsampling[1], ::downsampling[2]] voxels[box[0][0]: box[1][0], box[0][1]: box[1][1], box[0][2]: box[1][2]][sv_voxels] = True downsampling = np.array(downsampling, dtype=np.int32) if len(sso.sv_ids) == 0: return None voxel_box_size = sso.bounding_box[1] - sso.bounding_box[0] voxel_box_size = voxel_box_size.astype(np.float32) voxel_box_size = np.ceil(voxel_box_size / downsampling).astype(np.int32) voxels = np.zeros(voxel_box_size, dtype=np.bool) multi_params = [] for sv_id in sso.sv_ids: multi_params.append([sv_id]) if nb_threads > 1: pool = ThreadPool(nb_threads) pool.map(_load_sv_voxels_thread, multi_params) pool.join() pool.close() else: map(_load_sv_voxels_thread, multi_params) return voxels
[docs]def copy_ssvs2new_SSD_simple(ssvs: List[SuperSegmentationObject], new_version: str, target_wd: Optional[str] = None, n_jobs: int = 1, safe: bool = True): """ Creates a new SSD specified with `new_version` and a copy of the given SSVs. Usually used for generating distinct GT SSDs. Super-supervoxel dataset is specified in the `config.yml` file, default: `version=ssv_0`. Args: ssvs: Source SuperSegmentationObjects from default SSD in working directory or a list of SSOs if specified. new_version: Version of the new SSD where SSVs will be copied to. target_wd: Optional target working directory. If None, uses the default from :py:attr:`~syconn.global_params`. n_jobs: Number of jobs to use for the copying process. safe: If set to True, existing data will not be overwritten. Returns: None """ # update existing SSV IDs # TODO: currently this requires a new mapping dict Unclear what to # do in order to enable updates on existing SSD (e.g. after adding new SSVs) # paths = glob.glob(ssd.path + "/so_storage/*/*/*/") # ssd._ssv_ids = np.array([int(os.path.basename(p.strip("/"))) # for p in paths], dtype=np.uint64) if target_wd is None: target_wd = global_params.config.working_dir scaling = ssvs[0].scaling new_ssd = SuperSegmentationDataset(working_dir=target_wd, version=new_version, scaling=scaling) for old_ssv in ssvs: new_ssv = SuperSegmentationObject(old_ssv.id, version=new_version, working_dir=target_wd, sv_ids=old_ssv.sv_ids, scaling=old_ssv.scaling) old_ssv.copy2dir(dest_dir=new_ssv.ssv_dir, safe=safe) log_reps.info("Saving dataset deep.") new_ssd.save_dataset_deep(new_mapping=False, nb_cpus=n_jobs)
[docs]def exctract_ssv_morphology_embedding(args: Union[tuple, list]): """ Infers local morphology embeddings of a cell reconstruction. Args: args (tuple/list): A collection of parameters where `args[0]` contains the cell reconstruction IDs (`ssv_obj_ids`), `args[1:4]` are used to initialize the :class:`~syconn.reps.super_segmentation_dataset .SuperSegmentationDataset`, and `args[4]` (`pred_key_appendix`) is an optional addition to the default key for storing the embeddings. See :func:`~syconn.reps.super_segmentation_object .SuperSegmentationObject.predict_views_embedding` for more details. """ ssv_obj_ids = args[0] nb_cpus = args[1] pred_key_appendix = args[2] use_onthefly_views = global_params.config.use_onthefly_views view_props = global_params.config['views']['view_properties'] ssd = SuperSegmentationDataset() from ..handler.prediction import get_tripletnet_model_e3 for ssv_id in ssv_obj_ids: ssv = ssd.get_super_segmentation_object(ssv_id) m = get_tripletnet_model_e3() ssv.nb_cpus = nb_cpus ssv._view_caching = True if use_onthefly_views: view_embedding_of_sso_nocache(ssv, m, pred_key_appendix=pred_key_appendix, overwrite=True, **view_props) else: ssv.predict_views_embedding(m, pred_key_appendix)
[docs]def get_total_edge_lengths(ssv_ids: Union[np.ndarray, list], ax_pred_key: str) -> np.ndarray: """ Retrieves the total edge lengths of the super-supervoxels' :py:attr:`~skeleton` in nanometers. The compartments used to compute the edge lengths are as follows: axon: 1, axon terminals: 3, 4, dendrite: 0, soma: 2. Args: ssv_ids: Array or list of super-supervoxel IDs for which the edge lengths are calculated. ax_pred_key: Key for compartment prediction stored in the :py:attr:`~skeleton`. Returns: An array of the total edge lengths (L2 norm) for the specified super-supervoxels. """ total_edge_lengths = [] ssd = SuperSegmentationDataset() for ssv_id in ssv_ids: ssv = ssd.get_super_segmentation_object(ssv_id) ssv.load_skeleton() if ax_pred_key not in ssv.skeleton.keys(): ax_pred_key = 'axoness' # fall back to the old key total_edge_lengths.append(ssv.total_edge_length(compartments_of_interest=[0, 1, 2, 3, 4], ax_pred_key=ax_pred_key)) return np.array(total_edge_lengths)
[docs]def get_path_density_seg_obj(args: Union[tuple, list]) -> np.ndarray: """ Retrieves the path density of sub-cellular structures for a set of super-supervoxels. Args: *args: A sequence containing three elements: `obj_type`: Key to any available sub-cellular structure, args[0], `ssv_ids`: Cell reconstruction ids, args[1], `compartments_of_interest`: Compartment labels to calculate path densities for, args[2]. Valid compartment labels are axon (1), dendrite (0), soma (2), en-passant bouton (3), terminal bouton (4). Optionally, `ax_pred_key`: Key of compartment prediction stored in :attr:`~skeleton` attribute, used if `compartments_of_interest` is provided. Returns: An array of average volume per path length (um^3/um) for the specified super-supervoxels, indicating the path density of various sub-cellular structures. """ obj_type = args[0] ssv_ids = args[1] compartments_of_interest = args[2] ax_pred_key = args[3] path_densities = [] ssd = SuperSegmentationDataset() for ssv_id in ssv_ids: ssv = ssd.get_super_segmentation_object(ssv_id) ssv.load_skeleton() path_densities.append(ssv.path_density_seg_obj(obj_type, compartments_of_interest=compartments_of_interest, ax_pred_key=ax_pred_key)) return np.array(path_densities)
[docs]def filter_ssd_by_total_pathlength(ssd: SuperSegmentationDataset, min_edge_length: float) -> np.ndarray: """ Filters cells concurrently based on a minimum skeleton edge length. Args: ssd: The SuperSegmentationDataset to filter. min_edge_length: The minimum total path length of the skeleton in µm. Returns: An array of :class:`~SuperSegmentationObject` IDs where the total skeleton edge length exceeds the `min_edge_length` criterion. """ # TODO: @hashirah adapt numpy cache key total_path_lengths = ssd.load_numpy_data('total_edge_length') if total_path_lengths is not None: return ssd.ssv_ids[total_path_lengths >= min_edge_length] if total_path_lengths == 0: return ssd.ssv_ids params = [(ch, min_edge_length) for ch in chunkify(ssd.ssv_ids, min(len(ssd.ssv_ids), 1000))] filtered_ssv_ids = np.concatenate(sm.start_multiprocess_imap(_filter_ssvs_by_total_pathlength, params)) return filtered_ssv_ids
def _filter_ssvs_by_total_pathlength(args: tuple) -> list: """ Filters a subset of super-supervoxels based on a minimum total path length of their skeletons. Args: args: A tuple containing super-supervoxel IDs and the minimum edge length in micrometers. Returns: A list of super-supervoxel IDs that meet the minimum path length criterion. """ ssv_ids, min_edge_length = args ssv_ids_of_interest = [] ssd = SuperSegmentationDataset() for ssv_id in ssv_ids: ssv = ssd.get_super_segmentation_object(ssv_id) length = ssv.total_edge_length() / 1e3 # nm to µm if length < min_edge_length: continue ssv_ids_of_interest.append(ssv_id) return ssv_ids_of_interest