Source code for syconn.reps.segmentation

# -*- coding: utf-8 -*-
# SyConn - Synaptic connectivity inference toolkit
#
# Copyright (c) 2016 - now
# Max-Planck-Institute of Neurobiology, Munich, Germany
# Authors: Philipp Schubert, Joergen Kornfeld
import copy
import re
from typing import Union, Tuple, List, Optional, Dict, Generator, Any, Iterator

import networkx as nx
from knossos_utils import knossosdataset
from scipy import spatial

from .rep_helper import subfold_from_ix, knossos_ml_from_svixs, SegmentationBase, get_unique_subfold_ixs
from .segmentation_helper import *
from ..handler.basics import get_filepaths_from_dir, safe_copy, \
    write_txt2kzip
from ..handler.basics import load_pkl2obj, write_obj2pkl, kd_factory
from ..handler.config import DynConfig
from ..proc import meshes
from ..proc.meshes import mesh_area_calc
from ..backend.storage import VoxelStorageDyn

MeshType = Union[Tuple[np.ndarray, np.ndarray, np.ndarray], List[np.ndarray],
                 Tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray]]


[docs]class SegmentationObject(SegmentationBase): """ Represents individual supervoxels. Used for cell shape ('sv'), cell organelles, e.g. mitochondria ('mi'), vesicle clouds ('vc') and synaptic junctions ('sj'). Examples: Can be used to initialize a single :class:`~SegmentationObject` of a specific type; it is also returned by :func:`~SegmentationDataset.get_segmentation_object`:: from syconn.reps.segmentation import SegmentationObject, SegmentationDataset cell_sv = SegmentationObject(obj_id=.., obj_type='sv', working_dir='..') cell_sv.load_attr_dict() # populates `cell_sv.attr_dict` cell_sd = SegmentationDataset(obj_type='sv', working_dir='..') cell_sv_from_sd = cell_sd.get_segmentation_object(obj_id=cell_sv.id) cell_sv_from_sd.load_attr_dict() keys1 = set(cell_sv.attr_dict.keys()) keys2 = set(cell_sv_from_sd.attr_dict.keys()) print(keys1 == keys2) Attributes: attr_dict: Attribute dictionary which serves as a general-purpose container. Accessed via the :class:`~syconn.backend.storage.AttributeDict` interface. enable_locking: If True, enables file locking. """ def __init__(self, obj_id: int, obj_type: str = "sv", version: Optional[str] = None, working_dir: Optional[str] = None, rep_coord: Optional[np.ndarray] = None, size: Optional[int] = None, scaling: Optional[np.ndarray] = None, create: bool = False, voxel_caching: bool = True, mesh_caching: bool = False, view_caching: bool = False, config: DynConfig = None, n_folders_fs: int = None, enable_locking: bool = True, skeleton_caching: bool = True, mesh: Optional[MeshType] = None): """ If `working_dir` is given and the directory contains a valid `config.yml` file, all other optional kwargs will be defined by the :class:`~syconn.handler.config.DynConfig` object available in :attr:`~syconn.global_params.config`. Args: obj_id: Unique supervoxel ID. obj_type: Type of the supervoxel, keys used currently are: * 'mi': Mitochondria * 'vc': Vesicle clouds * 'sj': Synaptic junction * 'syn_ssv': Synapses between two :class:`~syconn.reps.super_segmentation_object.SuperSegmentationObject`s. * 'syn': Synapse fragment between two :class:`~syconn.reps.segmentation.SegmentationObject`s. * 'cs': Contact site version: Version string identifier. If 'tmp' is used, no data will be saved to disk. working_dir: Path to folder which contains the SegmentationDataset of type 'obj_type'. rep_coord: Representative coordinate. size: Number of voxels. scaling: Array defining the voxel size in nanometers (XYZ). create: If True, the folder to its storage location :py:attr:`~segobj_dir` will be created. voxel_caching: Enables caching for voxel data. mesh_caching: Enables caching for mesh data. view_caching: Enables caching for view data. skeleton_caching: Enables caching for skeleton data. config: :class:`~syconn.handler.config.DynConfig` object. n_folders_fs: Number of folders within the :class:`~syconn.reps.segmentation.SegmentationDataset`'s folder structure. enable_locking: If True, enables file locking.
mesh: Mesh data as flat arrays: (indices, vertices) or (indices, vertices, normals). """ self._id = int(obj_id) self._type = obj_type self._rep_coord = rep_coord self._size = size self._n_folders_fs = n_folders_fs self.attr_dict = {} self._bounding_box = None self._paths_to_voxels = None self.enable_locking = enable_locking self._voxel_caching = voxel_caching self._mesh_caching = mesh_caching self._mesh_bb = None self._view_caching = view_caching self._voxels = None self._voxel_list = None self._mesh = mesh self._config = config self._views = None self._skeleton = None self._skeleton_caching = skeleton_caching if version == 'temp': version = 'tmp' self._setup_working_dir(working_dir, config, version, scaling) if version is None: try: self._version = self.config["versions"][self.type] except KeyError: raise Exception(f"Unclear version '{version}' during initialization of {self}.") else: self._version = version if create: os.makedirs(self.segobj_dir, exist_ok=True) # IMMEDIATE PARAMETERS def __hash__(self): return hash((self.id, self.type.__hash__())) def __eq__(self, other): if not isinstance(other, self.__class__): return False return self.id == other.id and self.type == other.type def __ne__(self, other): return not self.__eq__(other) def __repr__(self): return (f'{type(self).__name__}(obj_id={self.id}, obj_type="{self.type}", ' f'version="{self.version}", working_dir="{self.working_dir}")') def __reduce__(self): """ Support pickling of class instances. """ return self.__class__, (self._id, self._type, self._version, self._working_dir, self._rep_coord, self._size, self._scaling, False, self._voxel_caching, self._mesh_caching, self._view_caching, self._config, self._n_folders_fs, self.enable_locking, self._skeleton_caching, self._mesh) @property def type(self) -> str: """ The `type` of the supervoxel. Examples: Keys which are currently used: * 'mi': Mitochondria. * 'vc': Vesicle clouds. * 'sj': Synaptic junction. * 'syn_ssv': Synapses between two :class:`~syconn.reps.super_segmentation_object.SuperSegmentationObject`s. * 'syn': Synapse fragment between two :class:`~SegmentationObject` objects. * 'cs': Contact site. Can be used to initialize a single :class:`~SegmentationObject` of a specific type or the corresponding dataset collection handled with the :class:`~SegmentationDataset` class:: from syconn.reps.segmentation import SegmentationObject, SegmentationDataset cell_sv = SegmentationObject(obj_id=.., obj_type='sv', working_dir='..') cell_sv.load_attr_dict() # populates `cell_sv.attr_dict` cell_sd = SegmentationDataset(obj_type='sv', working_dir='..') cell_sv_from_sd = cell_sd.get_segmentation_object(obj_id=cell_sv.id) cell_sv_from_sd.load_attr_dict() keys1 = set(cell_sv.attr_dict.keys()) keys2 = set(cell_sv_from_sd.attr_dict.keys()) print(keys1 == keys2) Returns: String identifier. """ return self._type @property def n_folders_fs(self) -> int: """ Number of folders used to store the data of :class:`~SegmentationObject`s. Defines the hierarchy of the folder structure organized by :class:`~SegmentationDataset`. Returns: The number of (leaf-) folders used for storing supervoxel data.
""" if self._n_folders_fs is None: ps = glob.glob( "%s/%s*/" % (self.segds_dir, self.so_storage_path_base)) if len(ps) == 0: raise Exception("No storage folder found at '{}' and no number of " "subfolders specified (n_folders_fs))".format(self.segds_dir)) bp = os.path.basename(ps[0].strip('/')) for p in ps: bp = os.path.basename(p.strip('/')) if bp == self.so_storage_path_base: bp = os.path.basename(p.strip('/')) break if bp == self.so_storage_path_base: self._n_folders_fs = 100000 else: self._n_folders_fs = int(re.findall(r'[\d]+', bp)[-1]) return self._n_folders_fs @property def id(self) -> int: """ Returns: Globally unique identifier of this object. """ return self._id @property def version(self) -> str: """ Version of the :class:`~SegmentationDataset` this object belongs to. Returns: String identifier of the object's version. """ return str(self._version) @property def voxel_caching(self) -> bool: """If True, voxel data is cached after loading.""" return self._voxel_caching @property def mesh_caching(self) -> bool: """If True, mesh data is cached.""" return self._mesh_caching @property def skeleton_caching(self): """If True, skeleton data is cached.""" return self._skeleton_caching @property def view_caching(self): """If True, view data is cached.""" return self._view_caching @property def scaling(self): """ Voxel size in nanometers (XYZ). Default is taken from the `config.yml` file and accessible via `self.config`. """ if self._scaling is None: try: self._scaling = \ np.array(self.config['scaling'], dtype=np.float32) except: self._scaling = np.array([1, 1, 1]) return self._scaling @property def dataset(self) -> 'SegmentationDataset': """ Factory method for the `~syconn.reps.segmentation.SegmentationDataset` this object belongs to. """ return SegmentationDataset(self.type, self.version, self._working_dir) @property def config(self) -> DynConfig: """ Config. object which contains all dataset-sepcific parameters. """ if self._config is None: self._config = global_params.config return self._config # PATHS @property def working_dir(self) -> str: """ Working directory. """ return self._working_dir @property def identifier(self) -> str: """ Identifier used to create the folder name of the `~syconn.reps.segmentation.SegmentationDataset`. """ return "%s_%s" % (self.type, self.version.lstrip("_")) @property def segds_dir(self) -> str: """ Path to the `~syconn.reps.segmentation.SegmentationDataset` directory. """ return "%s/%s/" % (self.working_dir, self.identifier) @property def so_storage_path_base(self) -> str: """ Base folder name. Todo: * refactor. """ return "so_storage" @property def so_storage_path(self) -> str: """ Path to entry folder of the directory tree where all supervoxel data of the corresponding `~syconn.reps.segmentation.SegmentationDataset` is located. """ if self._n_folders_fs is None and os.path.exists("%s/%s/" % ( self.segds_dir, self.so_storage_path_base)): return "%s/%s/" % (self.segds_dir, self.so_storage_path_base) elif self._n_folders_fs == 100000 and os.path.exists("%s/%s/" % ( self.segds_dir, self.so_storage_path_base)): return "%s/%s/" % (self.segds_dir, self.so_storage_path_base) else: return "%s/%s_%d/" % (self.segds_dir, self.so_storage_path_base, self.n_folders_fs) @property def segobj_dir(self) -> str: """ Path to the folder where the data of this supervoxel is stored. 
""" base_path = f"{self.so_storage_path}/" \ f"{subfold_from_ix(self.id, self.n_folders_fs)}/" if os.path.exists(f"{base_path}/voxel.pkl"): return base_path else: # use old folder scheme with leading 0s, e.g. '09' return "%s/%s/" % (self.so_storage_path, subfold_from_ix( self.id, self.n_folders_fs, old_version=True)) @property def mesh_path(self) -> str: """ Path to the mesh storage. """ return self.segobj_dir + "mesh.pkl" @property def skeleton_path(self) -> str: """ Path to the skeleton storage. """ return self.segobj_dir + "skeletons.pkl" @property def attr_dict_path(self) -> str: """ Path to the attribute storage. """ return self.segobj_dir + "attr_dict.pkl"
[docs] def view_path(self, woglia=True, index_views=False, view_key=None) -> str: """ Path to the view storage. """ if view_key is not None and not (woglia and not index_views): raise ValueError('view_path with custom view key is only allowed for default settings.') # TODO: change bool index_views and bool woglia to respective view_key identifier if view_key is not None: return self.segobj_dir + 'views_{}.pkl'.format(view_key) if index_views: return self.segobj_dir + "views_index.pkl" elif woglia: return self.segobj_dir + "views_woglia.pkl" return self.segobj_dir + "views.pkl"
@property def locations_path(self) -> str: """ Path to the rendering location storage. """ return self.segobj_dir + "locations.pkl" @property def voxel_path(self) -> str: """ Path to the voxel storage. See :class:`~syconn.backend.storage.VoxelStorageDyn` for details. """ # file type is inferred by either VoxelStorageLazyLoading or VoxelStorageDyn return self.segobj_dir + "/voxel" # PROPERTIES @property def cs_partner(self) -> Optional[List[int]]: """ Contact site specific attribute. Returns: None if object is not of type 'cs', else return the IDs to the two supervoxels which are part of the contact site. """ # TODO: use `cs_id_to_partner_ids_vec` (single source of truth) if self.type in ['cs', 'syn']: partner = [self.id >> 32] partner.append(self.id - (partner[0] << 32)) return partner else: return None @property def size(self) -> int: """ Returns: Number of voxels. """ if self._size is None and 'size' in self.attr_dict: self._size = self.attr_dict['size'] elif self._size is None and self.attr_dict_exists: self._size = self.lookup_in_attribute_dict("size") if self._size is None: self.calculate_size() return self._size @property def shape(self) -> np.ndarray: """ The XYZ extent of this object in voxels. Returns: The shape/extent of this object in voxels (XYZ). """ return self.bounding_box[1] - self.bounding_box[0] @property def bounding_box(self) -> np.ndarray: if self._bounding_box is None and 'bounding_box' in self.attr_dict: self._bounding_box = self.attr_dict['bounding_box'] elif self._bounding_box is None and self.attr_dict_exists: self._bounding_box = self.lookup_in_attribute_dict('bounding_box') if self._bounding_box is None: self.calculate_bounding_box() return self._bounding_box @property def rep_coord(self) -> np.ndarray: """ Representative coordinate of this supervoxel. Returns: 1D array of the coordinate (XYZ). """ if self._rep_coord is None and 'rep_coord' in self.attr_dict: self._rep_coord = self.attr_dict['rep_coord'] elif self._rep_coord is None and self.attr_dict_exists: self._rep_coord = self.lookup_in_attribute_dict("rep_coord") if self._rep_coord is None: self.calculate_rep_coord() return self._rep_coord @property def attr_dict_exists(self) -> bool: """ Checks if an attribute dictionary file exists at :py:attr:`~attr_dict_path`. Returns: True if the attribute dictionary file exists. """ if self.version == 'tmp': return False if not os.path.isfile(self.attr_dict_path): return False glob_attr_dc = AttributeDict(self.attr_dict_path, disable_locking=True) # look-up only, PS 12Dec2018 return self.id in glob_attr_dc @property def voxels_exist(self) -> bool: if self.version == 'tmp': return False if self.type in ['syn', 'syn_ssv']: voxel_dc = VoxelStorageLazyLoading(self.voxel_path) exists = self.id in voxel_dc voxel_dc.close() else: voxel_dc = VoxelStorageDyn(self.voxel_path, read_only=True, disable_locking=True) exists = self.id in voxel_dc return exists @property def voxels(self) -> np.ndarray: """ Voxels associated with this object. Returns: 3D binary array indicating voxel locations. """ if self._voxels is None: return self.load_voxels() else: return self._voxels @property def voxel_list(self) -> np.ndarray: """ Voxels associated with this object. Returns: 2D array with sparse voxel coordinates.
""" if self._voxel_list is None: voxel_list = load_voxel_list(self) if self.voxel_caching: self._voxel_list = voxel_list return voxel_list else: return self._voxel_list @property def mesh_exists(self) -> bool: """ Returns: True if mesh exists. """ if self.version == 'tmp': return False mesh_dc = MeshStorage(self.mesh_path, disable_locking=True) return self.id in mesh_dc @property def skeleton_exists(self) -> bool: """ Returns: True if skeleton exists. """ if self.version == 'tmp': return False skeleton_dc = SkeletonStorage(self.skeleton_path, disable_locking=True) return self.id in skeleton_dc @property def mesh(self) -> MeshType: """ Mesh of this object. Returns: Three flat arrays: indices, vertices, normals. """ if self._mesh is None: if self.mesh_caching: self._mesh = load_mesh(self) return self._mesh else: return load_mesh(self) else: return self._mesh @property def skeleton(self) -> dict: """ The skeleton representation of this supervoxel. Returns: Dict of at least three numpy arrays: "nodes", estimated node "diameters" and "edges". """ if self._skeleton is None: if self.skeleton_caching: self._skeleton = load_skeleton(self) return self._skeleton else: return load_skeleton(self) else: return self._skeleton @property def mesh_bb(self) -> np.ndarray: """ Bounding box of the object meshes (in nanometers). Approximately the same as scaled 'bounding_box'. """ if self._mesh_bb is None and 'mesh_bb' in self.attr_dict: self._mesh_bb = self.attr_dict['mesh_bb'] elif self._mesh_bb is None: if len(self.mesh[1]) == 0 or len(self.mesh[0]) == 0: self._mesh_bb = self.bounding_box * self.scaling else: verts = self.mesh[1].reshape(-1, 3) self._mesh_bb = np.array([np.min(verts, axis=0), np.max(verts, axis=0)], dtype=np.float32) return self._mesh_bb @property def mesh_size(self) -> float: """ Length of bounding box diagonal (BBD). Returns: Diagonal length of the mesh bounding box in nanometers. """ return np.linalg.norm(self.mesh_bb[1] - self.mesh_bb[0], ord=2) @property def mesh_area(self) -> float: """ Returns: Mesh surface area in um^2 """ # TODO: decide if caching should be possible mesh_area = self.lookup_in_attribute_dict('mesh_area') if mesh_area is None: mesh_area = mesh_area_calc(self.mesh) if np.isnan(mesh_area) or np.isinf(mesh_area): raise ValueError('Invalid mesh area.') return mesh_area @property def sample_locations_exist(self) -> bool: """ Returns: True if rendering locations have been stored at :py:attr:`~locations_path`. """ if self.version == 'tmp': return False location_dc = CompressedStorage(self.locations_path, disable_locking=True) return self.id in location_dc
[docs] def views_exist(self, woglia: bool, index_views: bool = False, view_key: Optional[str] = None) -> bool: """ True if views have been stored at :func:`~view_path`. Args: woglia: If True, looks for views without glia, i.e. after astrocyte separation. index_views: If True, refers to index views. view_key: Identifier of the requested views. """ if self.version == 'tmp': return False view_dc = CompressedStorage(self.view_path(woglia=woglia, index_views=index_views, view_key=view_key), disable_locking=True) return self.id in view_dc
[docs] def views(self, woglia: bool, index_views: bool = False, view_key: Optional[str] = None) -> Union[np.ndarray, int]: """ Getter method for the views of this supervoxel. Only valid for cell fragments, i.e. :py:attr:`~type` must be `sv`. Args: woglia: If True, looks for views without glia, i.e. after astrocyte separation. index_views: If True, refers to index views. view_key: Identifier of the requested views. Returns: The requested view array or `-1` if it does not exist. """ assert self.type == "sv" if self._views is None: if self.views_exist(woglia): if self.view_caching: self._views = self.load_views(woglia=woglia, index_views=index_views, view_key=view_key) return self._views else: return self.load_views(woglia=woglia, index_views=index_views, view_key=view_key) else: return -1 else: return self._views
[docs] def sample_locations(self, force=False, save=True, ds_factor=None): """ Getter method for the rendering locations of this supervoxel. Only valid for cell fragments, i.e. :py:attr:`~type` must be `sv`. Args: force: Overwrite existing data. save: If True, saves the result at :py:attr:`~locations_path`. Uses :class:`~syconn.backend.storage.CompressedStorage`. ds_factor: Down sampling factor used to generate the rendering locations. Returns: Array of rendering locations (XYZ) with shape (N, 3) in nanometers! """ assert self.type == "sv" if self.sample_locations_exist and not force: return CompressedStorage(self.locations_path, disable_locking=True)[self.id] else: verts = self.mesh[1].reshape(-1, 3) if len(verts) == 0: # only return scaled rep. coord as [1, 3] array return np.array([self.rep_coord, ], dtype=np.float32) * self.scaling if ds_factor is None: ds_factor = 2000 if self.config.use_new_renderings_locs: coords = generate_rendering_locs(verts, ds_factor).astype(np.float32) else: coords = surface_samples(verts, [ds_factor] * 3, r=ds_factor / 2).astype(np.float32) if save: loc_dc = CompressedStorage(self.locations_path, read_only=False, disable_locking=not self.enable_locking) loc_dc[self.id] = coords.astype(np.float32) loc_dc.push() return coords.astype(np.float32)
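Sketch of the rendering-location workflow (the 2000 nm down-sampling factor is the default used above; the working directory is a placeholder)::

    from syconn.reps.segmentation import SegmentationDataset

    sd = SegmentationDataset(obj_type='sv', working_dir='/path/to/wd')
    sv = sd.get_segmentation_object(sd.ids[0])
    locs = sv.sample_locations(ds_factor=2000)  # (N, 3) rendering locations in nm
    # subsequent calls return the cached array from `locations_path` unless force=True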
[docs] def load_voxels(self, voxel_dc: Optional[Union[VoxelStorageDyn, VoxelStorage]] = None) -> np.ndarray: """ Loader method of :py:attr:`~voxels`. Args: voxel_dc: Pre-loaded dictionary which contains the voxel data of this object. Returns: 3D array of all voxels which belong to this supervoxel. """ # syn_ssv do not have a segmentation KD; voxels are cached in their VoxelStorage if self.type in ['syn', 'syn_ssv']: vxs_list = self.voxel_list - self.bounding_box[0] voxels = np.zeros(self.bounding_box[1] - self.bounding_box[0] + 1, dtype=bool) voxels[vxs_list[..., 0], vxs_list[..., 1], vxs_list[..., 2]] = True else: if voxel_dc is None: voxel_dc = VoxelStorageDyn(self.voxel_path, read_only=True, disable_locking=True) voxels = voxel_dc.get_voxel_data_cubed(self.id)[0] if self.voxel_caching: self._voxels = voxels return voxels
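The 'syn'/'syn_ssv' branch above rebuilds a dense voxel cube from the sparse voxel list; the same construction in isolation (self-contained numpy sketch with toy coordinates)::

    import numpy as np

    voxel_list = np.array([[10, 10, 5], [11, 10, 5], [12, 11, 5]])  # sparse coords (XYZ)
    bb_min, bb_max = voxel_list.min(axis=0), voxel_list.max(axis=0)
    rel = voxel_list - bb_min                      # shift into the bounding box
    cube = np.zeros(bb_max - bb_min + 1, dtype=bool)
    cube[rel[:, 0], rel[:, 1], rel[:, 2]] = True   # dense binary mask
    assert cube.sum() == len(voxel_list)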
[docs] def load_voxels_downsampled(self, downsampling=(2, 2, 1)): return load_voxels_downsampled(self, ds=downsampling)
[docs] def load_voxel_list(self): """ Loader method of :py:attr:`~voxel_list`. Returns: Sparse, 2-dimensional array of voxel coordinates. """ return load_voxel_list(self)
[docs] def load_voxel_list_downsampled(self, downsampling=(2, 2, 1)): return load_voxel_list_downsampled(self, downsampling=downsampling)
[docs] def load_voxel_list_downsampled_adapt(self, downsampling=(2, 2, 1)): return load_voxel_list_downsampled_adapt(self, downsampling=downsampling)
[docs] def load_skeleton(self, recompute: bool = False) -> dict: """ Loader method of :py:attr:`~skeleton`. Args: recompute: Recompute the skeleton. Currently not implemented. Returns: Dict with at least the arrays "nodes", "diameters" and "edges". """ return load_skeleton(self, recompute=recompute)
[docs] def save_skeleton(self, overwrite: bool = False): """ Save method of :py:attr:`~skeleton`. Args: overwrite: Overwrite existing skeleton entry. """ return save_skeleton(self, overwrite=overwrite)
[docs] def glia_pred(self, thresh: float, pred_key_appendix: str = "") -> int: """ SV glia prediction (0: neuron, 1: glia). Only valid if :py:attr:`type` is `sv`. Args: thresh: Classification threshold. pred_key_appendix: Identifier for specific glia predictions. Only used during development. Returns: The glia prediction of this supervoxel. """ assert self.type == "sv" if self.config.use_point_models: return int(self.glia_proba(pred_key_appendix) >= thresh) return glia_pred_so(self, thresh, pred_key_appendix)
[docs] def glia_proba(self, pred_key_appendix: str = "") -> float: """ SV glia probability (0: neuron, 1: glia). Only valid if :py:attr:`type` is `sv`. Args: pred_key_appendix: Identifier for specific glia predictions. Only used during development. Returns: The glia probability of this supervoxel. """ assert self.type == "sv" return glia_proba_so(self, pred_key_appendix)
[docs] def axoness_preds(self, pred_key_appendix: str = "") -> np.ndarray: """ Axon prediction (0: dendrite, 1: axon, 2: soma) based on `img2scalar` CMN. Args: pred_key_appendix: Identifier for specific axon predictions. Only used during development. Returns: The axon prediction of this supervoxel at every :py:attr:`~sample_locations`. """ pred = np.argmax(self.axoness_probas(pred_key_appendix), axis=1) return pred
[docs] def axoness_probas(self, pred_key_appendix: str = "") -> np.ndarray: """ Axon probability (0: dendrite, 1: axon, 2: soma) based on `img2scalar` CMN. Probability underlying the attribute :py:attr:`axoness_preds`. Only valid if :py:attr:`type` is `sv`. Args: pred_key_appendix: Identifier for specific axon predictions. Only used during development. Returns: The axon probabilities of this supervoxel at every :py:attr:`~sample_locations`. """ assert self.type == "sv" pred_key = "axoness_probas" + pred_key_appendix if pred_key not in self.attr_dict: self.load_attr_dict() if pred_key not in self.attr_dict: msg = (f"WARNING: Requested axoness {pred_key} for SV {self.id} is not available. Existing " f"keys: {self.attr_dict.keys()}") raise ValueError(msg) return self.attr_dict[pred_key]
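The relation between probabilities and predictions used by ``axoness_preds`` above, spelled out on toy values::

    import numpy as np

    probas = np.array([[0.7, 0.2, 0.1],   # location 0: dendrite
                       [0.1, 0.8, 0.1]])  # location 1: axon
    preds = np.argmax(probas, axis=1)     # -> array([0, 1])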
# FUNCTIONS
[docs] def total_edge_length(self) -> Union[np.ndarray, float]: """ Total edge length of the supervoxel :py:attr:`~skeleton` in nanometers. Returns: Sum of all edge lengths (L2 norm) in :py:attr:`~skeleton`. """ if self.skeleton is None: self.load_skeleton() nodes = self.skeleton['nodes'].astype(np.float32) edges = self.skeleton['edges'] return np.sum([np.linalg.norm(self.scaling * (nodes[e[0]] - nodes[e[1]])) for e in edges])
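The edge-length sum computed by ``total_edge_length``, on a toy skeleton (the scaling vector is a placeholder)::

    import numpy as np

    nodes = np.array([[0, 0, 0], [10, 0, 0], [10, 10, 0]], dtype=np.float32)  # voxel coords
    edges = np.array([[0, 1], [1, 2]])
    scaling = np.array([10, 10, 25], dtype=np.float32)  # nm per voxel (placeholder)
    total = np.sum([np.linalg.norm(scaling * (nodes[a] - nodes[b])) for a, b in edges])
    # two 100 nm segments -> 200.0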
[docs] def mesh_from_scratch(self, ds: Optional[Tuple[int, int, int]] = None, **kwargs: dict) -> List[np.ndarray]: """ Calculate the mesh based on :func:`~syconn.proc.meshes.get_object_mesh`. Args: ds: Downsampling of the object's voxel data. **kwargs: Keyword arguments passed to :func:`~syconn.proc.meshes.triangulation`. Returns: The mesh as a list of flat arrays: indices, vertices, normals. """ _supported_types = ['syn_ssv', 'syn', 'cs_ssv', 'cs'] if self.type not in _supported_types: raise ValueError(f'"mesh_from_scratch" does not support type "{self.type}". Supported types: ' f'{_supported_types}') if ds is None: ds = self.config['meshes']['downsampling'][self.type] return meshes.get_object_mesh(self, ds, mesher_kwargs=kwargs)
def _save_mesh(self, ind: np.ndarray, vert: np.ndarray, normals: np.ndarray): """ Save given mesh at :py:attr:`~mesh_path`. Uses the :class:`~syconn.backend.storage.MeshStorage` interface. Args: ind: Flat index array. vert: Flat vertex array. normals: Flat normal array. """ mesh_dc = MeshStorage(self.mesh_path, read_only=False, disable_locking=not self.enable_locking) mesh_dc[self.id] = [ind, vert, normals] mesh_dc.push()
[docs] def mesh2kzip(self, dest_path: str, ext_color: Optional[Union[ Tuple[int, int, int, int], List, np.ndarray]] = None, ply_name: str = ""): """ Write :py:attr:`~mesh` to k.zip. Args: dest_path: Path to the k.zip file which contains the :py:attr:`~mesh`. ext_color: If set to 0, no color will be written out; use this to adapt the color in KNOSSOS. ply_name: Name of the ply file in the k.zip, must not end with `.ply`. """ mesh = self.mesh if self.type == "sv": color = (130, 130, 130, 160) elif self.type == "cs": color = (100, 200, 30, 255) elif self.type == "syn": color = (150, 50, 200, 255) elif self.type == "syn_ssv": color = (240, 50, 50, 255) elif self.type == "cs_ssv": color = (100, 200, 30, 255) elif self.type == "sj": color = (int(0.849 * 255), int(0.138 * 255), int(0.133 * 255), 255) elif self.type == "vc": color = (int(0.175 * 255), int(0.585 * 255), int(0.301 * 255), 255) elif self.type == "mi": color = (0, 153, 255, 255) else: raise TypeError("Color for object type '{}' does not exist." "".format(self.type)) color = np.array(color, dtype=np.uint8) if ext_color is not None: if ext_color == 0: color = None else: color = ext_color if ply_name == "": ply_name = str(self.id) meshes.write_mesh2kzip(dest_path, mesh[0], mesh[1], mesh[2], color, ply_fname=ply_name + ".ply")
[docs] def mergelist2kzip(self, dest_path: str): """ Writes the supervoxel agglomeration to a KNOSSOS compatible format. Args: dest_path: Path to k.zip file. """ self.load_attr_dict() kml = knossos_ml_from_svixs([self.id], coords=[self.rep_coord]) write_txt2kzip(dest_path, kml, "mergelist.txt")
[docs] def load_views(self, woglia: bool = True, raw_only: bool = False, ignore_missing: bool = False, index_views: bool = False, view_key: Optional[str] = None): """ Loader method of :py:attr:`~views`. Args: woglia: If True, looks for views without glia, i.e. after astrocyte separation. index_views: If True, refers to index views. view_key: Identifier of the requested views. raw_only: If True, ignores cell organelles projections. ignore_missing: If True, will not raise a KeyError if views do not exist. Returns: Views with requested properties. """ view_p = self.view_path(woglia=woglia, index_views=index_views, view_key=view_key) view_dc = CompressedStorage(view_p, disable_locking=not self.enable_locking) try: views = view_dc[self.id] except KeyError as e: if ignore_missing: log_reps.warning("Views of SV {} were missing. Skipping.".format(self.id)) views = np.zeros((0, 4, 2, 128, 256), dtype=np.uint8) else: raise KeyError(e) if raw_only: views = views[:, :1] return views
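Usage sketch for view loading (``sv`` set up as in the earlier sketches; the shape annotation follows the empty-array fallback above and is an assumption)::

    views = sv.load_views(woglia=True)       # per-location multi-view array
    # fallback above suggests shape (N, 4, 2, 128, 256), dtype uint8
    raw = sv.load_views(woglia=True, raw_only=True)  # keep only the first (raw) channel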
[docs] def save_views(self, views: np.ndarray, woglia: bool = True, cellobjects_only: bool = False, index_views: bool = False, view_key: Optional[str] = None, enable_locking: Optional[bool] = None): """ Saves views according to its properties. If view_key is given it has to be a special type of view, e.g. spine predictions. If in this case any other kwarg is not set to default it will raise an error. Todo: * remove `cellobjects_only`. Args: woglia: If True, looks for views without glia, i.e. after astrocyte separation. index_views: If True, refers to index views. view_key: Identifier of the requested views. views: View array. cellobjects_only: Only render cell organelles (deprecated). enable_locking: Enable file locking. """ if not (woglia and not cellobjects_only and not index_views) and view_key is not None: raise ValueError('If views are saved to custom key, all other settings have to be defaults!') if enable_locking is None: enable_locking = self.enable_locking view_dc = CompressedStorage(self.view_path(woglia=woglia, index_views=index_views, view_key=view_key), read_only=False, disable_locking=not enable_locking) if cellobjects_only: assert self.id in view_dc, "SV must already contain raw views " \ "if adding views for cellobjects only." view_dc[self.id] = np.concatenate([view_dc[self.id][:, :1], views], axis=1) else: view_dc[self.id] = views view_dc.push()
[docs] def load_attr_dict(self) -> int: """ Loader method of :py:attr:`~attr_dict`. Returns: 0 if successful, -1 if attribute dictionary storage does not exist. """ try: glob_attr_dc = AttributeDict(self.attr_dict_path, disable_locking=True) # disable locking, PS 07June2019 self.attr_dict = glob_attr_dc[self.id] return 0 except (IOError, EOFError) as e: log_reps.critical("Could not load SO attributes at {} due to " "{}.".format(self.attr_dict_path, e)) return -1
[docs] def save_attr_dict(self): """ Saves :py:attr:`~attr_dict` to :py:attr:`~attr_dict_path`. An already existing dictionary will be updated. """ glob_attr_dc = AttributeDict(self.attr_dict_path, read_only=False, disable_locking=not self.enable_locking) if self.id in glob_attr_dc: orig_dc = glob_attr_dc[self.id] orig_dc.update(self.attr_dict) else: orig_dc = self.attr_dict glob_attr_dc[self.id] = orig_dc glob_attr_dc.push()
[docs] def save_attributes(self, attr_keys: List[str], attr_values: List[Any]): """ Writes attributes to attribute storage. Ignores :py:attr:`~attr_dict`. Values have to be serializable and will be written via the :class:`~syconn.backend.storage.AttributeDict` interface. Args: attr_keys: List of attribute keys which will be written to :py:attr:`~attr_dict_path`. attr_values: List of attribute values which will be written to :py:attr:`~attr_dict_path`. """ if not hasattr(attr_keys, "__len__"): attr_keys = [attr_keys] if not hasattr(attr_values, "__len__"): attr_values = [attr_values] assert len(attr_keys) == len(attr_values), "Key-value lengths did not" \ " agree while saving attri" \ "butes of SSO %d." % self.id glob_attr_dc = AttributeDict(self.attr_dict_path, read_only=False, disable_locking=not self.enable_locking) for k, v in zip(attr_keys, attr_values): glob_attr_dc[self.id][k] = v glob_attr_dc.push()
[docs] def load_attributes(self, attr_keys: List[str]) -> List[Any]: """ Reads attributes from attribute storage. It will ignore self.attr_dict and will always pull it from the storage. Does not throw KeyError, but returns None for missing keys. Args: attr_keys: List of attribute keys which will be loaded from :py:attr:`~attr_dict_path`. Returns: Attribute values corresponding to `attr_keys` """ glob_attr_dc = AttributeDict(self.attr_dict_path, read_only=True, disable_locking=not self.enable_locking) return [glob_attr_dc[self.id][attr_k] if attr_k in glob_attr_dc[self.id] else None for attr_k in attr_keys]
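A round trip through the attribute storage (``sv`` as in the earlier sketches; the key 'my_score' is hypothetical)::

    sv.save_attributes(['my_score'], [0.42])     # written via the AttributeDict interface
    (score,) = sv.load_attributes(['my_score'])  # -> 0.42; missing keys yield None
    sv.load_attr_dict()                          # alternatively pull everything into attr_dict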
[docs] def attr_exists(self, attr_key: str) -> bool: """ Checks if `attr_key` exists in either :py:attr:`~attr_dict` or at :py:attr:`~attr_dict_path`. Args: attr_key: Attribute key to look for. Returns: True if attribute exists, False otherwise. """ if len(self.attr_dict) == 0: self.load_attr_dict() try: _ = self.attr_dict[attr_key] except (KeyError, EOFError): return False return True
[docs] def lookup_in_attribute_dict(self, attr_key: str) -> Any: """ Looks up `attr_key` in :py:attr:`~attr_dict`. If :py:attr:`~attr_dict` is empty, tries to load it from :py:attr:`~attr_dict_path` first. Args: attr_key: Attribute key to look for. Returns: Value of `attr_key` in :py:attr:`~attr_dict` or None if it does not exist. """ if len(self.attr_dict) == 0: self.load_attr_dict() if self.attr_exists(attr_key): return self.attr_dict[attr_key] else: return None
[docs] def calculate_rep_coord(self, voxel_dc: Optional[Dict[int, np.ndarray]] = None): """ Calculate/loads supervoxel representative coordinate. Args: voxel_dc: Pre-loaded dictionary which contains the voxel data of this object. """ if voxel_dc is None: if self.type in ['syn', 'syn_ssv']: voxel_dc = VoxelStorageLazyLoading(self.voxel_path) else: voxel_dc = VoxelStorageDyn(self.voxel_path, read_only=True, disable_locking=True) if self.id not in voxel_dc: self._bounding_box = np.array([[-1, -1, -1], [-1, -1, -1]]) log_reps.warning("No voxels found in VoxelDict!") return if isinstance(voxel_dc, VoxelStorageDyn): self._rep_coord = voxel_dc.object_repcoord(self.id) return elif isinstance(voxel_dc, VoxelStorageLazyLoading): self._rep_coord = voxel_dc[self.id][len(voxel_dc[self.id]) // 2] # any rep coord return else: raise ValueError(f'Invalid voxel storage class: {type(voxel_dc)}')
[docs] def calculate_bounding_box(self, voxel_dc: Optional[Dict[int, np.ndarray]] = None): """ Calculate supervoxel :py:attr:`~bounding_box`. Args: voxel_dc: Pre-loaded dictionary which contains the voxel data of this object. """ if voxel_dc is None: if self.type in ['syn', 'syn_ssv']: voxel_dc = VoxelStorageLazyLoading(self.voxel_path) else: voxel_dc = VoxelStorageDyn(self.voxel_path, read_only=True, disable_locking=True) if not isinstance(voxel_dc, VoxelStorageDyn): _ = self.load_voxels(voxel_dc=voxel_dc) else: bbs = voxel_dc.get_boundingdata(self.id) bb = np.array([bbs[:, 0].min(axis=0), bbs[:, 1].max(axis=0)]) self._bounding_box = bb
[docs] def calculate_size(self, voxel_dc: Optional[Union[VoxelStorageDyn, VoxelStorage]] = None): """ Calculate supervoxel object :py:attr:`~size`. Args: voxel_dc: Pre-loaded dictionary which contains the voxel data of this object. """ if voxel_dc is None: if self.type in ['syn', 'syn_ssv']: voxel_dc = VoxelStorageLazyLoading(self.voxel_path) else: voxel_dc = VoxelStorageDyn(self.voxel_path, read_only=True, disable_locking=True) if not isinstance(voxel_dc, VoxelStorageDyn): _ = self.load_voxels(voxel_dc=voxel_dc) else: size = voxel_dc.object_size(self.id) self._size = size
[docs] def save_kzip(self, path: str, kd: Optional[knossosdataset.KnossosDataset] = None, write_id: Optional[int] = None): """ Write supervoxel segmentation to k.zip. Todo: * Broken, segmentation not rendered in KNOSSOS. Args: path: Path to the k.zip file. kd: KnossosDataset of the segmentation; loaded via :py:attr:`~config` if None. write_id: Supervoxel ID used as voxel label. """ if write_id is None: write_id = self.id if kd is None: try: kd = kd_factory(self.config.kd_seg_path) except Exception: raise ValueError("KnossosDataset could not be loaded") kd.save_to_kzip(offset=self.bounding_box[0], data=self.voxels.astype(np.uint64).swapaxes(0, 2) * write_id, kzip_path=path, data_mag=1, mags=[1])
[docs] def clear_cache(self): """ Clears the following cached data: * :py:attr:`~voxels` * :py:attr:`~voxel_list` * :py:attr:`~mesh` * :py:attr:`~views` * :py:attr:`~skeleton` """ self._voxels = None self._voxel_list = None self._mesh = None self._views = None self._skeleton = None
# SKELETON @property def skeleton_dict_path(self) -> str: """ Returns: Path to skeleton storage. """ return self.segobj_dir + "/skeletons.pkl"
[docs] def copy2dir(self, dest_dir, safe=True): """ Examples: To copy the content of this SV object (``sv_orig``) to the destination of another (e.g. yet not existing) SV (``sv_target``), call ``sv_orig.copy2dir(sv_target.segobj_dir)``. All files contained in the directory :py:attr:`~segobj_dir` of ``sv_orig`` will be copied to ``sv_target.segobj_dir``. Args: dest_dir: Destination directory where all files contained in :py:attr:`~segobj_dir` will be copied to. safe: If ``True``, will not overwrite existing data. """ # get all files in the object's storage directory fps = get_filepaths_from_dir(self.segobj_dir, ending="") fnames = [os.path.split(fname)[1] for fname in fps] if not os.path.isdir(dest_dir): os.makedirs(dest_dir) for i in range(len(fps)): src_filename = fps[i] dest_filename = dest_dir + "/" + fnames[i] try: safe_copy(src_filename, dest_filename, safe=safe) except Exception as e: log_reps.warning("{}. Skipped {}.".format(e, fnames[i])) # copy attr_dict values self.load_attr_dict() if os.path.isfile(dest_dir + "/attr_dict.pkl"): dest_attr_dc = load_pkl2obj(dest_dir + "/attr_dict.pkl") else: dest_attr_dc = {} # overwrite existing keys in the destination attribute dict dest_attr_dc.update(self.attr_dict) self.attr_dict = dest_attr_dc self.save_attr_dict()
[docs] def split_component(self, dist, new_sd, new_id): """ Todo: * refactor -> VoxelStorageDyn Args: dist: Maximum distance in voxels between voxel pairs assigned to the same connected component. new_sd: Target :class:`~SegmentationDataset` for the split objects. new_id: ID assigned to the first new object; incremented for each further component. """ raise NotImplementedError('WORK IN PROGRESS') kdtree = spatial.cKDTree(self.voxel_list) graph = nx.from_edgelist(kdtree.query_pairs(dist)) ccs = list(nx.connected_components(graph)) partner_ids = [self.id - ((self.id >> 32) << 32), self.id >> 32] if len(ccs) == 1: new_so_obj = new_sd.get_segmentation_object(new_id, create=True) new_id += 1 new_so_obj.attr_dict["paths_to_voxels"] = self.paths_to_voxels new_so_obj.attr_dict["%s_partner_ids" % self.type] = partner_ids new_so_obj.save_attr_dict() else: for cc in ccs: new_so_obj = new_sd.get_segmentation_object(new_id, create=True) new_so_obj.attr_dict["%s_partner_ids" % self.type] = partner_ids new_so_obj.save_attr_dict() new_id += 1 voxel_ids = np.array(list(cc), dtype=np.int32) this_voxel_list = self.voxel_list[voxel_ids] bb = [np.min(this_voxel_list, axis=0), np.max(this_voxel_list, axis=0)] this_voxel_list -= bb[0] this_voxels = np.zeros(bb[1] - bb[0] + 1, dtype=bool) this_voxels[this_voxel_list[:, 0], this_voxel_list[:, 1], this_voxel_list[:, 2]] = True save_voxels(new_so_obj, this_voxels, bb[0])
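The splitting above builds a graph from all voxel pairs closer than `dist` and takes its connected components; the same idea on a toy voxel list (self-contained; isolated voxels are added as nodes explicitly so they are not dropped)::

    import networkx as nx
    import numpy as np
    from scipy import spatial

    voxel_list = np.array([[0, 0, 0], [1, 0, 0], [10, 10, 10]])
    kdtree = spatial.cKDTree(voxel_list)
    graph = nx.Graph()
    graph.add_nodes_from(range(len(voxel_list)))
    graph.add_edges_from(kdtree.query_pairs(2))
    print(list(nx.connected_components(graph)))  # -> [{0, 1}, {2}]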
[docs]class SegmentationDataset(SegmentationBase): """ This class represents a set of supervoxels. Examples: To initialize the :class:`~syconn.reps.segmentation.SegmentationDataset` for cell supervoxels you need to call ``sd_cell = SegmentationDataset('sv')``. This requires an initialized working directory; for this, please refer to :class:`~syconn.handler.config.DynConfig` or see:: $ python SyConn/scripts/example_runs/start.py After successfully executing :class:`~syconn.exec.exec_init.init_cell_subcell_sds`, *cell* supervoxel properties can be loaded from numpy arrays via the following keys: * 'id': ID array, identical to :py:attr:`~ids`. * 'bounding_box': Bounding box of every SV. * 'size': Number of voxels of each SV. * 'rep_coord': Representative coordinates for each SV. * 'mesh_area': Surface area as computed from the object mesh triangles. * 'mapping_sj_ids': Synaptic junction objects which overlap with the respective SVs. * 'mapping_sj_ratios': Overlap ratio of the synaptic junctions. * 'mapping_vc_ids': Vesicle cloud objects which overlap with the respective SVs. * 'mapping_vc_ratios': Overlap ratio of the vesicle clouds. * 'mapping_mi_ids': Mitochondria objects which overlap with the respective SVs. * 'mapping_mi_ratios': Overlap ratio of the mitochondria. If astrocyte separation is performed, the following attributes will be stored as numpy arrays as well: * 'glia_probas': Glia probabilities as array of shape (N, 2); N: rendering locations; 0-index: neuron, 1-index: glia. The 'mapping' attributes are only computed for cell supervoxels and not for cellular organelles (e.g. 'mi', 'vc', etc.; see :py:attr:`~syconn.global_params.config['process_cell_organelles']`). For the :class:`~syconn.reps.segmentation.SegmentationDataset` of type 'syn_ssv' (which represent the actual synapses between two cell reconstructions), the following properties are stored as numpy arrays: * 'id': ID array, identical to :py:attr:`~ids`. * 'bounding_box': Bounding box of every SV. * 'size': Number of voxels of each SV. * 'rep_coord': Representative coordinates of each SV. * 'mesh_area': Surface area as computed from the object mesh triangles. * 'mesh_bb': Bounding box of the object meshes (in nanometers). Approximately the same as scaled 'bounding_box'. * 'latent_morph': Latent morphology vector at each rendering location; predicted by the tCMN. * 'neuron_partners': IDs of the two :class:`~syconn.reps.super_segmentation_object.SuperSegmentationObject` forming the synapse. The ordering of the subsequent 'partner' attributes is identical to 'neuron_partners', e.g. 'neuron_partners'=[3, 49] and 'partner_celltypes'=[0, 1] means that SSV with ID 3 is an excitatory axon targeting the MSN SSV with ID 49. * 'partner_celltypes': Celltypes of the two SSVs. * 'partner_spiness': Spine predictions (0: neck, 1: head, 2: shaft, 3: other) of the two sites. * 'partner_axoness': Compartment predictions (0: dendrite, 1: axon, 2: soma, 3: en-passant bouton, 4: terminal bouton) of the two sites. * 'syn_prob': Synapse probability as inferred by the RFC (see the corresponding section of the documentation). * 'asym_prop': Mean probability of the 'syn_ssv' object voxels for the asymmetric type. See :func:`~syconn.extraction.cs_processing_steps._extract_synapse_type_thread` . * 'sym_prop': Mean probability of the 'syn_ssv' object voxels for the symmetric type. See :func:`~syconn.extraction.cs_processing_steps._extract_synapse_type_thread` . * 'syn_type_sym_ratio': ``sym_prop / float(asym_prop + sym_prop)``.
See :func:`~syconn.extraction.cs_processing_steps._extract_synapse_type_thread` . * 'syn_sign': Synaptic "sign" (-1: symmetric, +1: asymmetric). For threshold see :py:attr:`~syconn.global_params.config['cell_objects']['sym_thresh']` . * 'cs_ids': Contact site IDs associated with each 'syn_ssv' synapse. """ def __init__(self, obj_type: str, version: Optional[Union[str, int]] = None, working_dir: Optional[str] = None, scaling: Optional[Union[List, Tuple, np.ndarray]] = None, version_dict: Optional[Dict[str, str]] = None, create: bool = False, config: Optional[Union[str, DynConfig]] = None, n_folders_fs: Optional[int] = None, cache_properties: Optional[List[str]] = None): """ Args: obj_type: Type of :class:`~syconn.reps.segmentation.SegmentationObject`, e.g.: 'vc', 'sj', 'mi', 'cs', 'sv'. version: Version of dataset to distinguish it from others of the same type. working_dir: Path to the working directory. scaling: Scaling of the raw data to nanometers. version_dict: Dictionary which contains the versions of other dataset types which share the same working directory. create: Whether or not to create this dataset's directory. config: Config object, see :class:`~syconn.handler.config.DynConfig`. Will be copied and then fixed by setting :py:attr:`~syconn.handler.config.DynConfig.fix_config` to True. n_folders_fs: Number of folders within the dataset's folder structure. cache_properties: Use numpy arrays to populate the specified object properties when initializing :py:class:`~syconn.reps.segmentation.SegmentationObject` via :py:func:`~get_segmentation_object`. """ self._type = obj_type self._n_folders_fs = n_folders_fs self._sizes = None self._ids = None self._rep_coords = None self._config = config self._soid2ix = None self._property_cache = dict() self._version = None if cache_properties is None: cache_properties = tuple() if n_folders_fs is not None: if n_folders_fs not in [10 ** i for i in range(6)]: raise Exception(f"n_folders_fs must be in {[10 ** i for i in range(6)]}.") if version == 'temp': version = 'tmp' self._setup_working_dir(working_dir, config, version, scaling) if version != 'tmp' and self._config is not None: self._config = copy.copy(self._config) self._config.fix_config = True if create and (version is None): version = 'new' if version is None and create is False: try: self._version = self.config["versions"][self.type] except KeyError: raise Exception(f"Unclear version '{version}' during initialization of {self}.") elif version == "new": other_datasets = \ glob.glob(self.working_dir + "/%s_[0-9]" % self.type) + \ glob.glob(self.working_dir + "/%s_[0-9][0-9]" % self.type) + \ glob.glob(self.working_dir + "/%s_[0-9][0-9][0-9]" % self.type) max_version = -1 for other_dataset in other_datasets: other_version = \ int(re.findall(r"[\d]+", os.path.basename(other_dataset.strip('/')))[-1]) if max_version < other_version: max_version = other_version self._version = max_version + 1 else: self._version = version if version_dict is None: try: self.version_dict = self.config["versions"] except KeyError: raise Exception("No version dict specified in config") else: if isinstance(version_dict, dict): self.version_dict = version_dict elif isinstance(version_dict, str) and version_dict == "load": if self.version_dict_exists: self.load_version_dict() else: raise Exception("No version dict specified in config") if create: os.makedirs(self.path, exist_ok=True) os.makedirs(self.so_storage_path, exist_ok=True) self.enable_property_cache(cache_properties) def __repr__(self): return 
(f'{type(self).__name__}(obj_type="{self.type}", version="{self.version}", ' f'working_dir="{self.working_dir}")') @property def type(self) -> str: """ The type of :class:`~syconn.reps.segmentation.SegmentationObject`s contained in this :class:`~syconn.reps.segmentation.SegmentationDataset`. Returns: String identifier of the object type. """ return self._type @property def n_folders_fs(self) -> int: """ Returns: The number of folders in this :class:`~syconn.reps.segmentation.SegmentationDataset` directory tree. """ if self._n_folders_fs is None: ps = glob.glob("%s/%s*/" % (self.path, self.so_storage_path_base)) if len(ps) == 0: raise Exception("No storage folder found at '{}' and no number of " "subfolders specified (n_folders_fs))".format(self.path)) bp = os.path.basename(ps[0].strip('/')) for p in ps: bp = os.path.basename(p.strip('/')) if bp == self.so_storage_path_base: bp = os.path.basename(p.strip('/')) break if bp == self.so_storage_path_base: self._n_folders_fs = 100000 else: self._n_folders_fs = int(re.findall(r'[\d]+', bp)[-1]) return self._n_folders_fs @property def working_dir(self) -> str: """ Returns: The working directory of this :class:`~syconn.reps.segmentation.SegmentationDataset`. """ return self._working_dir @property def version(self) -> str: """ Returns: String identifier of the version. """ return str(self._version) @property def path(self) -> str: """ Returns: The path to this :class:`~syconn.reps.segmentation.SegmentationDataset`. """ return "%s/%s_%s/" % (self._working_dir, self.type, self.version) @property def exists(self) -> bool: """ Checks whether :py:attr:`~path` exists. """ return os.path.isdir(self.path) @property def path_sizes(self) -> str: """ Path to the cache array of the object voxel sizes. Returns: Path to the numpy file. """ return self.path + "/sizes.npy" @property def path_rep_coords(self) -> str: """ Path to the cache array of the object representative coordinates. Returns: Path to the numpy file. """ return self.path + "/rep_coords.npy" @property def path_ids(self) -> str: """ Path to the cache array of the object IDs. Returns: Path to the numpy file. """ return self.path + "/ids.npy" @property def version_dict_path(self) -> str: """ Path to the version dictionary pickle file. Returns: Path to the pickle file. """ return self.path + "/version_dict.pkl" @property def version_dict_exists(self) -> bool: """ Checks whether :py:attr:`~version_dict_path` exists. """ return os.path.exists(self.version_dict_path) @property def so_storage_path_base(self) -> str: """ Name of the base of the root folder (``'so_storage'``). """ return "so_storage" @property def so_storage_path(self) -> str: """ Path to the root folder. """ if self._n_folders_fs is None and os.path.exists("%s/so_storage/" % self.path): return "%s/so_storage/" % self.path elif self._n_folders_fs == 100000 and os.path.exists("%s/so_storage/" % self.path): return "%s/so_storage/" % self.path else: return "%s/%s_%d/" % (self.path, self.so_storage_path_base, self.n_folders_fs) @property def so_dir_paths(self) -> List[str]: """ Sorted paths to all supervoxel object directories in the directory tree :py:attr:`~so_storage_path`. """ depth = int(np.log10(self.n_folders_fs) // 2 + np.log10(self.n_folders_fs) % 2) p = "".join([self.so_storage_path] + ["/*" for _ in range(depth)]) return sorted(glob.glob(p))
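The directory depth used by ``so_dir_paths`` above follows from ``n_folders_fs``: every level addresses up to 100 two-digit subfolders. Equivalent integer arithmetic, as a worked example::

    n_folders_fs = 100000                # default leaf-folder count
    digits = len(str(n_folders_fs)) - 1  # exact int(log10(...)) for powers of ten -> 5
    depth = digits // 2 + digits % 2     # two digits per directory level -> 3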
[docs] def iter_so_dir_paths(self) -> Iterator[str]: """ Iterator over all possible `SegmentationObject` storage base directories. Notes: In contrast to :attr:`~so_dir_paths` this iterator may return paths to storages that do not exist, in the case that no object fell into its ID bucket. Returns: Path to ID storage base folder. """ storage_location_ids = get_unique_subfold_ixs(self.n_folders_fs) for ix in storage_location_ids: yield self.so_storage_path + subfold_from_ix(ix, self.n_folders_fs)
@property def config(self) -> DynConfig: """ The configuration object which contains all dataset-specific parameters. See :class:`~syconn.handler.config.DynConfig`. Returns: The configuration object. """ if self._config is None: self._config = global_params.config return self._config @property def sizes(self) -> np.ndarray: """ Returns: A size array of all supervoxels which are part of this dataset. The ordering of the returned array will correspond to :py:attr:`~ids`. """ if self._sizes is None: if os.path.exists(self.path_sizes): self._sizes = np.load(self.path_sizes) else: msg = "sizes were not calculated... Please run dataset_analysis" log_reps.error(msg) raise ValueError(msg) return self._sizes @property def rep_coords(self) -> np.ndarray: """ Returns: Representative coordinates of all supervoxels which are part of this dataset. The ordering of the returned array will correspond to :py:attr:`~ids`. """ if self._rep_coords is None: if os.path.exists(self.path_rep_coords): self._rep_coords = np.load(self.path_rep_coords) else: msg = "rep_coords were not calculated... Please run dataset_analysis" log_reps.error(msg) raise ValueError(msg) return self._rep_coords @property def ids(self) -> np.ndarray: """ Returns: All supervoxel IDs which are part of this dataset. """ if self._ids is None: acquire_obj_ids(self) return self._ids @property def scaling(self) -> np.ndarray: """ Returns: Voxel size in nanometers (XYZ). """ if self._scaling is None: self._scaling = np.array(self.config['scaling'], dtype=np.float32) return self._scaling @property def sos(self) -> Generator[SegmentationObject, None, None]: """ Generator for all :class:`~syconn.reps.segmentation.SegmentationObject` objects associated with this dataset. Yields: :class:`~syconn.reps.segmentation.SegmentationObject` """ ix = 0 tot_nb_sos = len(self.ids) while ix < tot_nb_sos: yield self.get_segmentation_object(self.ids[ix]) ix += 1
[docs] def load_numpy_data(self, prop_name, allow_nonexisting: bool = True) -> np.ndarray: """ Load cached array. The ordering of the returned array will correspond to :py:attr:`~ids`. Todo: * remove 's' appendix in file names. * remove 'celltype' replacement for 'celltype_cnn_e3' as soon as 'celltype_cnn_e3' was renamed package-wide Args: prop_name: Identifier of the requested cache array. allow_nonexisting: If False, will fail for missing numpy files. Returns: numpy array of property `prop_name`. """ if prop_name == 'celltype': prop_name = 'celltype_cnn_e3' if os.path.exists(self.path + prop_name + "s.npy"): return np.load(self.path + prop_name + "s.npy", allow_pickle=True) else: msg = f'Requested data cache "{prop_name}" did not exist in {self}.' if not allow_nonexisting: log_reps.error(msg) raise FileNotFoundError(msg) log_reps.warning(msg)
[docs] def get_segmentationdataset(self, obj_type: str) -> 'SegmentationDataset': """ Factory method for :class:`~syconn.reps.segmentation.SegmentationDataset` which are part of this dataset. Args: obj_type: Dataset of supervoxels with type `obj_type`. Returns: The requested :class:`~syconn.reps.segmentation.SegmentationDataset` object. """ if obj_type not in self.version_dict: raise ValueError('Requested object type {} not part of version_dict ' '{}.'.format(obj_type, self.version_dict)) return SegmentationDataset(obj_type, version=self.version_dict[obj_type], working_dir=self.working_dir)
[docs] def get_segmentation_object(self, obj_id: Union[int, List[int]], create: bool = False, **kwargs) -> Union[SegmentationObject, List[SegmentationObject]]: """ Factory method for :class:`~syconn.reps.segmentation.SegmentationObject` which are part of this dataset. Args: obj_id: Supervoxel ID. create: If True, creates the folder hierarchy down to the requested supervoxel. Returns: The requested :class:`~syconn.reps.segmentation.SegmentationObject` object. """ if np.isscalar(obj_id): return self._get_segmentation_object(obj_id, create, **kwargs) else: res = [] for ix in obj_id: obj = self._get_segmentation_object(ix, create, **kwargs) res.append(obj) return res
def _get_segmentation_object(self, obj_id: int, create: bool, **kwargs) -> SegmentationObject: """ Initialize :py:class:`~SegmentationObject`. Args: obj_id: Object ID. create: Create folder structure. Default: False. Returns: Supervoxel object. """ kwargs_def = dict(obj_id=obj_id, obj_type=self.type, version=self.version, working_dir=self.working_dir, scaling=self.scaling, create=create, n_folders_fs=self.n_folders_fs, config=self.config) kwargs_def.update(kwargs) so = SegmentationObject(**kwargs_def) for k, v in self._property_cache.items(): so.attr_dict[k] = v[self.soid2ix[obj_id]] return so
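Sketch of the property-cache mechanism wired up in ``_get_segmentation_object`` above (requires that the corresponding numpy cache files exist, i.e. after ``dataset_analysis``; the working directory is a placeholder)::

    from syconn.reps.segmentation import SegmentationDataset

    sd = SegmentationDataset(obj_type='mi', working_dir='/path/to/wd',
                             cache_properties=['size'])
    mi = sd.get_segmentation_object(sd.ids[0])
    print(mi.attr_dict['size'])  # populated from the cache array, no per-object file lookup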
[docs] def save_version_dict(self): """ Save the version dictionary to the `.pkl` file. """ write_obj2pkl(self.version_dict_path, self.version_dict)
[docs] def load_version_dict(self): """ Load the version dictionary from the `.pkl` file. """ try: self.version_dict = load_pkl2obj(self.version_dict_path) except Exception as e: raise FileNotFoundError('Version dictionary of SegmentationDataset not found. {}'.format(str(e)))
@property def soid2ix(self): if self._soid2ix is None: self._soid2ix = {k: ix for ix, k in enumerate(self.ids)} return self._soid2ix
[docs] def enable_property_cache(self, property_keys: Iterable[str]): """ Add properties to cache. Args: property_keys: Property keys. Numpy cache arrays must exist. """ # look-up for so IDs to index in cache arrays property_keys = list(property_keys) # copy for k in self._property_cache: if k in property_keys: property_keys.remove(k) if len(property_keys) == 0: return # init index array _ = self.soid2ix self._property_cache.update({k: self.load_numpy_data(k, allow_nonexisting=False) for k in property_keys})
[docs] def get_volume(self, source: str = 'total') -> float: """ Calculate the RAG volume. Args: source: Allowed sources: 'total' (all SVs contained in SegmentationDataset('sv')), 'neuron' (use glia-free RAG), 'glia' (use glia RAG). Returns: Volume in mm^3. """ self.enable_property_cache(['size']) if source == 'neuron': g = nx.read_edgelist(global_params.config.pruned_svgraph_path, nodetype=np.uint64) svids = g.nodes() elif source == 'glia': g = nx.read_edgelist(global_params.config.working_dir + "/glia/astrocyte_svgraph.bz2", nodetype=np.uint64) svids = g.nodes() elif source == 'total': svids = self.ids else: raise ValueError(f'Unknown source type "{source}".') total_size = 0 for svid in svids: total_size += self.get_segmentation_object(svid).size total_size_cmm = np.prod(self.scaling) * total_size / 1e18 return total_size_cmm
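The unit conversion at the end of ``get_volume``, spelled out (the scaling vector is a placeholder)::

    import numpy as np

    scaling = np.array([10, 10, 25])  # nm per voxel (placeholder)
    total_size = 4e12                 # summed voxel count
    volume_mm3 = np.prod(scaling) * total_size / 1e18  # 1 mm^3 == 1e18 nm^3
    print(volume_mm3)                 # -> 0.01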