Source code for syconn.reps.super_segmentation_object

# -*- coding: utf-8 -*-
# SyConn - Synaptic connectivity inference toolkit
#
# Copyright (c) 2016 - now
# Max Planck Institute of Neurobiology, Martinsried, Germany
# Authors: Philipp Schubert, Joergen Kornfeld
import glob
import os
import re
import shutil
import time
from collections import Counter, defaultdict
from typing import Optional, Dict, List, Tuple, Union, Iterable, Any, TYPE_CHECKING
import pickle as pkl

import networkx as nx
import numpy as np
import scipy.spatial
from scipy import spatial

from . import super_segmentation_helper as ssh
from .rep_helper import knossos_ml_from_sso, colorcode_vertices, knossos_ml_from_svixs, subfold_from_ix_SSO, \
    SegmentationBase
from .segmentation import SegmentationObject, SegmentationDataset
from .segmentation_helper import load_so_attr_bulk
from .. import global_params
from ..backend.storage import CompressedStorage, MeshStorage
from ..handler.basics import write_txt2kzip, get_filepaths_from_dir, safe_copy, coordpath2anno, load_pkl2obj, \
    write_obj2pkl, flatten_list, chunkify, data2kzip
from ..handler.config import DynConfig
from ..handler.prediction import certainty_estimate
from ..mp import batchjob_utils as qu
from ..mp import mp_utils as sm
from ..proc.graphs import split_glia, split_subcc_join, create_graph_from_coords
from ..proc.meshes import write_mesh2kzip, merge_someshes, compartmentalize_mesh, mesh2obj_file, write_meshes2kzip, \
    _calc_pca_components
from ..proc.rendering import render_sampled_sso, load_rendering_func, render_sso_coords, render_sso_coords_index_views
from ..proc.image import normalize_img
from ..proc.sd_proc import predict_sos_views
from ..reps import log_reps

if TYPE_CHECKING:
    from .super_segmentation_dataset import SuperSegmentationDataset
from knossos_utils import skeleton
from knossos_utils.skeleton_utils import load_skeleton as load_skeleton_kzip
from knossos_utils.skeleton_utils import write_skeleton as write_skeleton_kzip

try:
    from knossos_utils import mergelist_tools
except ImportError:
    from knossos_utils import mergelist_tools_fallback as mergelist_tools
from syconn.proc.graphs import stitch_skel_nx

MeshType = Union[Tuple[np.ndarray, np.ndarray, np.ndarray], List[np.ndarray],
                 Tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray]]


[docs]class SuperSegmentationObject(SegmentationBase): """ Class instances represent individual neuron reconstructions, defined by a list of agglomerated supervoxels (see :class:`~syconn.reps.segmentation.SegmentationObject`). This class is used to create a cell reconstruction object after successful execution of `run_create_neuron_ssd`. It can be instantiated directly with an SSV ID or via the `SuperSegmentationDataset`. The class provides methods to access and manipulate various attributes and properties related to the neuron reconstruction, such as meshes, skeletons, and segmentation objects for different cellular components. Examples: This class can be used to create a cell reconstruction object after successful executing :func:`~syconn.exec.exec_inference.run_create_neuron_ssd` as follows:: from syconn import global_params # import SuperSegmentationObject and SuperSegmentationDataset from syconn.reps.super_segmentation import * # set the current working directory SyConn-wide global_params.wd = '~/SyConn/example_cube1/' ssd = SuperSegmentationDataset() cell_reconstr_ids = ssd.ssv_ids # call constructor explicitly ... cell = SuperSegmentationObject(cell_reconstr_ids[0]) # ... or via the SuperSegmentationDataset - both contain the same data cell = ssd.get_super_segmentation_object(cell_reconstr_ids[0]) # inspect existing attributes cell.load_attr_dict() print(cell.attr_dict.keys()) To cache SegmentationObject attributes use the ``cache_properties`` argument during initialization of the :class:`~syconn.reps.segmentation.SegmentationDataset` and pass it on to the ``SuperSegmentationDataset`` instantiation: sd_mi = SegmentationDataset(obj_type='mi', cache_properties=['rep_coord']) ssd = SuperSegmentationDataset(sd_lookup=dict(mi=sd_mi)) ssv = ssd.get_super_segmentation_object(ssd.ssv_ids[0]) # :class:`~syconn.reps.segmentation.SegmentationObject` from ``mis`` don't require loading ``rep_coord`` # from its storage file. for mi in ssv.mis: rc = mi.rep_coord # normally this requires to load the attribute dict storage file. Subsequent analysis steps (see the ``SyConn/scripts/example_run/start.py``) augment the cell reconstruction with more properties:: # to iterate over all cell reconstructions use the generator: for cell in ssd.ssvs: # e.g. collect some analysis results cell.load_attr_dict() n_synapses = len(cell.syn_ssv) celltype = cell.attr_dict["celltype_cnn_e3"] ... # write out cell mesh cell.mesh2kzip('~/cell{}_mesh.k.zip'.format(cell.id)) # write out cell mesh and meshes of all existing cell organelles cell.meshes2kzip('~/cell{}_meshes.k.zip'.format(cell.id)) # color the cell mesh according to a semantic prediction cell.semseg2mesh(semseg_key='spiness', dest_path='~/cell{}_spines.k.zip'.format(cell.id)) See also ``SyConn/docs/api.md`` (WIP). Attributes: attr_dict: Attribute dictionary which serves as a general-purpose container. Accessed via the :class:`~syconn.backend.storage.AttributeDict` interface. After successfully executing :func:`syconn.exec.exec_init.run_create_neuron_ssd` and subsequent analysis steps (see the ``SyConn/scripts/example_run/start.py``) the following keys are present in :attr:`~attr_dict`: * 'id': ID array, identical to :py:attr:`~ssv_ids`. All other properties have the same ordering as this array, i.e. if SSV with ID 1234 has index 42 in the 'id'-array you will find its properties at index 42 in all other cache-arrays. * 'bounding_box': Bounding box of every SSV. * 'size': Number voxels of each SSV. * 'rep_coord': Representative coordinates for each SSV. * 'sv': Supervoxel IDs for every SSV. * 'sample_locations': Lists of rendering locations for each SSV. Each entry is a list (length corresponds to the number of supervoxels) of coordinate arrays for the corresponding SSV. * 'celltype_cnn_e3': Celltype classifications based on the elektronn3 CMN. * 'celltype_cnn_e3_probas': Celltype logits for the different types as an array of shape (M, C; M: Number of predicted random multi-view sets, C: Number of classes). In the example run there are currently 9 predicted classes: STN=0, DA=1, MSN=2, LMAN=3, HVC=4, GP=5, FS=6, TAN=7, INT=8. * 'syn_ssv': Synapse IDs assigned to each SSV. * 'syn_sign_ratio': Area-weighted atio of symmetric synapses, see :func:`~syconn.reps.super_segmentation_object.SuperSegmentationObject.syn_sign_ratio`. * 'sj': Synaptic junction object IDs which were mapped to each SSV. These are used for view rendering and also to generate the 'syn_ssv' objects in combination with contact sites (see corresponding section in the documentation). * 'mapping_sj_ids': Synaptic junction objects which overlap with the respective SSVs. * 'mapping_sj_ratios': Overlap ratio of the synaptic junctions. * 'vc': Vesicle clouds mapped to each SSV. * 'mapping_vc_ids': Vesicle cloud objects which overlap with the respective SSVs. * 'mapping_vc_ratios': Overlap ratio of the vesicle clouds. * 'mi': Mitochondria mapped to each SSV. * 'mapping_mi_ids': Mitochondria objects which overlap with the respective SSVs. * 'mapping_mi_ratios': Overlap ratio of the mitochondria. skeleton: The skeleton representation of this super-supervoxel. Keys which are currently in use: * 'nodes': Array of the node coordinates (in nanometers). * 'edges': Edges between nodes. * 'diameters': Estimated cell diameter at every node. * various node properties, e.g. 'axoness' and 'axoness_avg10000'. Check the available keys ``sso.skeleton.keys()`` of an initialized :class:`~SuperSegmentationObject` object ``sso`` after loading the skeleton (``sso.load_skeleton()``). enable_locking_so: Locking flag for all :class:`syconn.reps.segmentation.SegmentationObject` assigned to this object (e.g. SV, mitochondria, vesicle clouds, ...) nb_cpus: Number of cpus for parallel jobs. will only be used in some processing steps. view_dict: A dictionary for caching 2D projection views. Those are stored as a numpy array of shape (M, N, CH, x, y). M: Length of :py:attr:`~sample_locations` and has the same ordering; N: Number of views per location; CH: Number of channels (1 for glia prediction containing only the cell shape and 4 for neuron analysis containing cell and cell organelle shapes. Stored at :py:attr:`~view_path` and accessed via the :class:`~syconn.backend.storage.CompressedStorage` interface. version_dict: A dictionary which contains the versions of other dataset types which share the same working directory. Defaults to the `Versions` entry in the `config.yml` file. """ def __init__(self, ssv_id: int, version: Optional[str] = None, version_dict: Optional[Dict[str, str]] = None, working_dir: Optional[str] = None, create: bool = False, sv_ids: Optional[Union[np.ndarray, List[int]]] = None, scaling: Optional[np.ndarray] = None, object_caching: bool = True, voxel_caching: bool = True, mesh_caching: bool = True, view_caching: bool = False, config: Optional[DynConfig] = None, nb_cpus: int = 1, enable_locking: bool = False, enable_locking_so: bool = False, ssd_type: Optional[str] = None, ssd: Optional['SuperSegmentationDataset'] = None, sv_graph: Optional[nx.Graph] = None): """ Args: ssv_id: unique SSV ID. version: Version string identifier. if 'tmp' is used, no data will be saved to disk. version_dict: Dictionary which contains the versions of other dataset types which share the same working directory. Defaults to the `versions` entry in the `config.yml`file. working_dir (): Path to the working directory. create: If True, the folder to its storage location :py:attr:`~ssv_dir` will be created. sv_ids: List of agglomerated supervoxels which define the neuron reconstruction. scaling: Array defining the voxel size in nanometers (XYZ). object_caching: :class:`~syconn.reps.segmentation.SegmentationObject` retrieved by :func:`~syconn.reps.segmentation.SegmentationObject.get_seg_objects` will be cached in a dictionary. voxel_caching: Voxel array will be cached at :py:attr:`~syconn.reps.segmentation.SegmentationObject._voxels`. mesh_caching: Meshes (cell fragments, mitos, vesicles, ..) will be cached at :py:attr:`~syconn.reps.segmentation.SegmentationObject._meshes`. view_caching: Views can be cached at :py:attr:`~view_dict`. config: Retrieved from :py:attr:`~syconn.global_params.config`, otherwise must be initialized with a :class:`~syconn.handler.config.DynConfig` nb_cpus: Number of cpus for parallel jobs. will only be used in some processing steps. enable_locking: Enable posix locking for IO operations. enable_locking_so: Locking flag for all :class:`syconn.reps.segmentation.SegmentationObject` assigned. to this object (e.g. SV, mitochondria, vesicle clouds, ...) ssd_type: Type of cell reconstruction. Default: 'ssv'. If speficied and `ssd` is given, types must match. ssd: :py:class:`~syconn.reps.super_segmentation_dataset.SuperSegmentationDataset`; if given it will be used to check if property caching can be used in `:py:class:`~syconn.reps.segmentation.SegmentationDataset``. Property caching can be used by passing the datasets (attributes for caching have to be specified in init. via ``cache_properties``) of interest via the kwarg ``sd_lookup`` during :py:attr:`~syconn.reps.super_segmentation_dataset.SuperSegmentationDataset` initialization. sv_graph: Sueprvoxel graph. Nodes must be uint SV IDs. """ if version == 'temp': version = 'tmp' self._allow_skeleton_calc = False if version == "tmp": self.enable_locking = False create = False self._allow_skeleton_calc = True else: self.enable_locking = enable_locking self._object_caching = object_caching self._voxel_caching = voxel_caching self._mesh_caching = mesh_caching self._view_caching = view_caching self.enable_locking_so = enable_locking_so self.nb_cpus = nb_cpus self._id = ssv_id self.attr_dict = {} self._ssd = ssd if self._ssd is not None: if ssd_type is not None and self._ssd.type != ssd_type: raise TypeError(f'Mis-match between given "ssd_type"={ssd_type} and type of "ssd"={ssd}.') else: ssd_type = self._ssd.type elif ssd_type is None: ssd_type = 'ssv' self._type = ssd_type self._rep_coord = None self._size = None self._bounding_box = None self._objects = {} self.skeleton = None self._voxels = None self._voxels_xy_downsampled = None self._voxels_downsampled = None self._rag = None self._sv_graph_uint = sv_graph # init mesh dicts self._meshes = {"sv": None, "vc": None, "mi": None, "sj": None, "syn_ssv": None, "syn_ssv_sym": None, "syn_ssv_asym": None, "er": None, "golgi": None} self._views = None self._weighted_graph = None self._sample_locations = None self._rot_mat = None self._label_dict = {} self.view_dict = {} if sv_ids is not None: self.attr_dict["sv"] = sv_ids self._setup_working_dir(working_dir, config, version, scaling) if version is None: try: self._version = self.config["versions"][self.type] except KeyError: raise Exception(f"Unclear version '{version}' during initialization of {self}.") elif version == "new": other_datasets = glob.glob(self.working_dir + "/%s_*" % self.type) max_version = -1 for other_dataset in other_datasets: other_version = int(re.findall(r"[\d]+", os.path.basename(other_dataset))[-1]) if max_version < other_version: max_version = other_version self._version = max_version + 1 else: self._version = version if version_dict is None: try: self.version_dict = self.config["versions"] except KeyError: raise ValueError(f"Unclear version '{version}' during initialization of {self}.") else: if isinstance(version_dict, dict): self.version_dict = version_dict else: raise ValueError("No version dict specified in config.") if create: os.makedirs(self.ssv_dir, exist_ok=True) def __hash__(self) -> int: return hash((self.id, self.type, frozenset(self.sv_ids))) def __eq__(self, other: Any) -> bool: if not isinstance(other, self.__class__): return False return self.id == other.id and self.type == other.type and frozenset(self.sv_ids) == frozenset(other.sv_ids) def __ne__(self, other: Any) -> bool: return not self.__eq__(other) def __repr__(self) -> str: return (f'{type(self).__name__}(ssv_id={self.id}, ssd_type="{self.type}", ' f'version="{self.version}", working_dir="{self.working_dir}")') def __getitem__(self, item): return self.attr_dict[item] # IMMEDIATE PARAMETERS @property def type(self) -> str: """ The type of this super-sueprvoxel. Default: 'ssv'. Returns: String identifier of the object type. """ return self._type @property def id(self) -> int: """ Default value is the smalles SV ID which is part of this cell reconstruction. Returns: Globally unique identifier of this object. """ return self._id @property def version(self) -> str: """ Version of the `~SuperSegmentationDataset` this object belongs to. Can be any character or string like '0' or 'axongroundtruth'. Returns: String identifier of the object's version. """ return str(self._version) @property def object_caching(self) -> bool: """If True, :class:`~syconn.reps.segmentation.SegmentationObject`s which are part of this cell reconstruction are cached.""" return self._object_caching @property def voxel_caching(self) -> bool: """If True, voxel data is cached.""" return self._voxel_caching @property def mesh_caching(self) -> bool: """If True, mesh data is cached.""" return self._mesh_caching @property def view_caching(self) -> bool: """If True, view data is cached.""" return self._view_caching @property def scaling(self) -> np.ndarray: """ Voxel size in nanometers (XYZ). Default is taken from the `config.yml` file and accessible via :py:attr:`~config`. """ return self._scaling # PATHS @property def working_dir(self) -> str: """ Working directory. """ return self._working_dir @property def identifier(self) -> str: """ Identifier used to create the folder name of the :class: `~syconn.reps.super_segmentation_dataset.SuperSegmentationDataset` this object belongs to. """ return "%s_%s" % (self.type, self.version.lstrip("_")) @property def ssd_dir(self) -> str: """ Path to the :class:`~syconn.reps.super_segmentation_dataset.SuperSegmentationDataset` directory this object belongs to. """ return "%s/%s/" % (self.working_dir, self.identifier) @property def ssd_kwargs(self) -> dict: return dict(working_dir=self.working_dir, version=self.version, config=self._config, ssd_type=self.type, sso_locking=self.enable_locking) @property def ssv_kwargs(self) -> dict: kw = dict(ssv_id=self.id, working_dir=self.working_dir, version=self.version, config=self._config, ssd_type=self.type, enable_locking=self.enable_locking) if self.version == 'tmp': kw.update(sv_ids=self.sv_ids) return kw @property def ssv_dir(self) -> str: """ Path to the folder where the data of this super-supervoxel is stored. """ return "%s/so_storage/%s/" % (self.ssd_dir, subfold_from_ix_SSO(self.id)) @property def attr_dict_path(self) -> str: """ Path to the attribute storage. :py:attr:`~attr_dict` can be loaded from here. """ # Kept for backwards compatibility, remove if not needed anymore if os.path.isfile(self.ssv_dir + "atrr_dict.pkl"): return self.ssv_dir + "atrr_dict.pkl" return self.ssv_dir + "attr_dict.pkl" @property def skeleton_kzip_path(self) -> str: """ Path to the skeleton storage. """ return self.ssv_dir + "skeleton.k.zip" @property def skeleton_kzip_path_views(self) -> str: """ Path to the skeleton storage. Todo: * probably deprecated. """ return self.ssv_dir + "skeleton_views.k.zip" @property def objects_dense_kzip_path(self) -> str: """Identifier of cell organell overlays""" return self.ssv_dir + "objects_overlay.k.zip" @property def skeleton_path(self) -> str: """Identifier of SSV skeleton""" return self.ssv_dir + "skeleton.pkl" @property def edgelist_path(self) -> str: """Identifier of SSV graph""" return self.ssv_dir + "edge_list.bz2" @property def view_path(self) -> str: """Identifier of view storage""" return self.ssv_dir + "views.pkl" @property def mesh_dc_path(self) -> str: """Identifier of mesh storage""" return self.ssv_dir + "mesh_dc.pkl" @property def vlabel_dc_path(self) -> str: """Identifier of vertex label storage""" return self.ssv_dir + "vlabel_dc.pkl" # IDS @property def sv_ids(self) -> np.ndarray: """ All cell supervoxel IDs which are assigned to this cell reconstruction. """ return np.array(self.lookup_in_attribute_dict("sv"), dtype=np.uint64) @property def sj_ids(self) -> np.ndarray: """ All synaptic junction (sj) supervoxel IDs which are assigned to this cell reconstruction. """ return np.array(self.lookup_in_attribute_dict("sj"), dtype=np.uint64) @property def mi_ids(self) -> np.ndarray: """ All mitochondria (mi) supervoxel IDs which are assigned to this cell reconstruction. """ return np.array(self.lookup_in_attribute_dict("mi"), dtype=np.uint64) @property def vc_ids(self) -> np.ndarray: """ All vesicle cloud (vc) supervoxel IDs which are assigned to this cell reconstruction. """ return np.array(self.lookup_in_attribute_dict("vc"), dtype=np.uint64) @property def dense_kzip_ids(self) -> Dict[str, int]: """ ? """ return dict([("mi", 1), ("vc", 2), ("sj", 3)]) # SegmentationObjects @property def svs(self) -> List[SegmentationObject]: """ All cell :class:`~syconn.reps.segmentation.SegmentationObjects` objects which are assigned to this cell reconstruction. """ return self.get_seg_objects("sv") @property def sjs(self) -> List[SegmentationObject]: """ All synaptic junction (sj) :class:`~syconn.reps.segmentation.SegmentationObjects` objects which are assigned to this cell reconstruction. These objects are based on the initial synapse predictions and may contain synapse-synapse merger. See :py:attr:`~syn_ssv` for merger-free inter-neuron synapses. """ return self.get_seg_objects("sj") @property def mis(self) -> List[SegmentationObject]: """ All mitochondria (mi) :class:`~syconn.reps.segmentation.SegmentationObjects` objects which are assigned to this cell reconstruction. """ return self.get_seg_objects("mi") @property def vcs(self) -> List[SegmentationObject]: """ All vesicle cloud (vc) :class:`~syconn.reps.segmentation.SegmentationObjects` objects which are assigned to this cell reconstruction. """ return self.get_seg_objects("vc") @property def syn_ssv(self) -> List[SegmentationObject]: """ All synaptic junctions :class:`~syconn.reps.segmentation.SegmentationObject` objects which are between super-supervoxels (syn_ssv) and assigned to this cell reconstruction. These objects are generated as an agglomeration of 'syn' objects, which themselves have been generation as a combination of synaptic junction (sj) and contact site (cs) objects to remove merges in the sj objects. """ return self.get_seg_objects("syn_ssv") # MESHES
[docs] def load_mesh(self, mesh_type) -> Optional[MeshType]: """ Load mesh of a specific type, e.g. 'mi', 'sv' (cell supervoxel), 'sj' (connected components of the original synaptic junction predictions), 'syn_ssv' (overlap of 'sj' with cell contact sites), 'syn_ssv_sym' and 'syn_ssv_asym' (only if syn-type predictions are available). Args: mesh_type: Type of :class:`~syconn.reps.segmentation.SegmentationObject` used for mesh retrieval. Returns: Three flat arrays: indices, vertices, normals Raises ValueError: If `mesh_type` does not exist in :py:attr:`~_meshes`. """ if mesh_type in ('syn_ssv_sym', 'syn_ssv_asym'): self.typedsyns2mesh() if mesh_type not in self._meshes: raise ValueError(f'Unknown mesh type for objects "{mesh_type}" in {self}."') if self._meshes[mesh_type] is None: if not self.mesh_caching: return self._load_obj_mesh(mesh_type) self._meshes[mesh_type] = self._load_obj_mesh(mesh_type) return self._meshes[mesh_type]
@property def mesh(self) -> Optional[MeshType]: """ Mesh of all cell supervoxels. """ return self.load_mesh("sv") @property def sj_mesh(self) -> Optional[MeshType]: """ Mesh of all synaptic junction (sj) supervoxels. These objects are based on the original synapse prediction and might contain merger. """ return self.load_mesh("sj") @property def vc_mesh(self) -> Optional[MeshType]: """ Mesh of all vesicle clouds (vc) supervoxels. """ return self.load_mesh("vc") @property def mi_mesh(self) -> Optional[MeshType]: """ Mesh of all mitochondria (mi) supervoxels. """ return self.load_mesh("mi") @property def syn_ssv_mesh(self) -> Optional[MeshType]: """ Mesh of all inter-neuron synapses junction (syn_ssv) supervoxels. These objects are generated as a combination of contact sites and synaptic junctions (sj). """ return self.load_mesh("syn_ssv")
[docs] def label_dict(self, data_type='vertex') -> CompressedStorage: """ Retrieves the compressed storage of labels for a specific data type like 'vertex'. This dictionary stores various predictions, with keys and the associated labels corresponding to the mesh vertices. The ordering is consistent with the vertex order in ``self.mesh[1]``. Uses the :class:`~syconn.backend.storage.CompressedStorage` interface. Args: data_type (str): The key for the stored labels. Returns: CompressedStorage: The storage object containing the requested labels. Raises: ValueError: If the requested data type is not supported. """ if data_type == 'vertex': if data_type in self._label_dict: pass else: self._label_dict[data_type] = CompressedStorage( None if self.version == 'tmp' else self.vlabel_dc_path) return self._label_dict[data_type] else: raise ValueError('Label dict for data type "{}" not supported.' ''.format(data_type))
# PROPERTIES @property def config(self) -> DynConfig: """ Retrieves the configuration object containing all dataset-specific parameters. Refer to :class:`~syconn.handler.config.DynConfig` for more details. Returns: DynConfig: The configuration object. """ if self._config is None: self._config = global_params.config return self._config @property def size(self) -> int: """ Retrieves the number of voxels associated with this SuperSegmentationObject. Returns: int: The number of voxels associated with this SuperSegmentationObject. """ if self._size is None: self._size = self.lookup_in_attribute_dict("size") if self._size is None: self.calculate_size() return self._size @property def bounding_box(self) -> List[np.ndarray]: """ Retrieves the bounding box of this SuperSegmentationObject. Returns: List[np.ndarray]: The bounding box represented by two numpy arrays. """ if self._bounding_box is None: self._bounding_box = self.lookup_in_attribute_dict("bounding_box") if self._bounding_box is None: self.calculate_bounding_box() return self._bounding_box @property def shape(self) -> np.ndarray: """ Retrieves the XYZ extent of this SuperSegmentationObject in voxels. Returns: np.ndarray: The shape/extent of this SSV object in voxels (XYZ). """ return self.bounding_box[1] - self.bounding_box[0] @property def rep_coord(self) -> np.ndarray: """ Retrieves the representative coordinate of this SuperSegmentationObject, which is the representative coordinate of the first supervoxel in `~svs`. Returns: np.ndarray: A 1D array of the representative coordinate (XYZ). """ if self._rep_coord is None: self._rep_coord = self.lookup_in_attribute_dict("rep_coord") if self._rep_coord is None: self._rep_coord = self.svs[0].rep_coord return self._rep_coord @property def attr_dict_exists(self) -> bool: """ Checks if an attribute dictionary file exists at :py:attr:`~attr_dict_path` for this SuperSegmentationObject. Returns: bool: True if the attribute dictionary file exists, False otherwise. """ return os.path.isfile(self.attr_dict_path)
[docs] def mesh_exists(self, obj_type: str) -> bool: """ Checks if the mesh of :class:`~syconn.reps.segmentation.SegmentationObject`s of type `obj_type` is stored in the :class:`~syconn.backend.storage.MeshStorage` at :py:attr:`~mesh_dc_path`. Args: obj_type (str): The type of requested :class:`~syconn.reps.segmentation.SegmentationObject`. Returns: bool: True if the mesh exists, False otherwise. """ mesh_dc = MeshStorage(self.mesh_dc_path, disable_locking=True) return obj_type in mesh_dc
@property def voxels(self) -> Optional[np.ndarray]: """ Retrieves the voxels associated with the SuperSegmentationObject. Returns: Optional[np.ndarray]: A 3D binary array indicating voxel locations. If no voxels are present, returns None. """ if len(self.sv_ids) == 0: return None if self._voxels is None: voxels = np.zeros(self.bounding_box[1] - self.bounding_box[0], dtype=np.bool) for sv in self.svs: sv._voxel_caching = False if sv.voxels_exist: log_reps.debug(np.sum(sv.voxels), sv.size) box = [sv.bounding_box[0] - self.bounding_box[0], sv.bounding_box[1] - self.bounding_box[0]] voxels[box[0][0]: box[1][0], box[0][1]: box[1][1], box[0][2]: box[1][2]][sv.voxels] = True else: log_reps.warning("missing voxels from %d" % sv.id) if self.voxel_caching: self._voxels = voxels else: return voxels return self._voxels @property def voxels_xy_downsampled(self) -> Optional[np.ndarray]: """ Retrieves the XY-downsampled voxels associated with this SuperSegmentationObject. Returns: Optional[np.ndarray]: A 3D binary array indicating downsampled voxel locations, or None if no voxels are present. """ if self._voxels_xy_downsampled is None: if self.voxel_caching: self._voxels_xy_downsampled = \ self.load_voxels_downsampled((2, 2, 1)) else: return self.load_voxels_downsampled((2, 2, 1)) return self._voxels_xy_downsampled @property def rag(self) -> nx.Graph: """ Retrieves the region adjacency graph (defining the supervoxel graph) of this SuperSegmentationObject. Returns: nx.Graph: The supervoxel graph with nodes of type SegmentationObject, as defined in the syconn.reps.segmentation class. """ if self._rag is None: self._rag = self.load_sv_graph() return self._rag @property def compartment_meshes(self) -> dict: """ Retrieves the compartment mesh storage for this SuperSegmentationObject. Returns: dict: A dictionary representing the meshes of each compartment in the SuperSegmentationObject. """ if "axon" not in self._meshes: self._load_compartment_meshes() return {k: self._meshes[k] for k in ["axon", "dendrite", "soma"]} def _load_compartment_meshes(self, overwrite: bool = False): """ Loading compartment meshes in-place as 'axon', 'dendrite', 'soma' to :py:attr:`~syconn.reps.super_segmentation_object.SuperSegmentationObject._meshes`. Args: overwrite: Overwrites existing compartment meshes """ mesh_dc = MeshStorage(self.mesh_dc_path, disable_locking=not self.enable_locking) if "axon" not in mesh_dc or overwrite: mesh_compartments = compartmentalize_mesh(self) mesh_dc["axon"] = mesh_compartments["axon"] mesh_dc["dendrite"] = mesh_compartments["dendrite"] mesh_dc["soma"] = mesh_compartments["soma"] mesh_dc.push() comp_meshes = {k: mesh_dc[k] for k in ["axon", "dendrite", "soma"]} self._meshes.update(comp_meshes)
[docs] def load_voxels_downsampled(self, downsampling: tuple = (2, 2, 1), nb_threads: int = 10) -> np.ndarray: """ Loads all voxels of this SuperSegmentationObject. Args: downsampling (tuple): The downsampling factors for each axis. nb_threads (int): The number of threads to use for loading. Returns: np.ndarray: An array of downsampled voxel coordinates. """ return ssh.load_voxels_downsampled(self, downsampling=downsampling, nb_threads=nb_threads)
[docs] def get_seg_objects(self, obj_type: str) -> List[SegmentationObject]: """ Factory method for creating :class:`~syconn.reps.segmentation.SegmentationObject`s of a specified `obj_type`. Args: obj_type (str): Type of requested :class:`~syconn.reps.segmentation.SegmentationObject`s. Returns: List[:class:`~syconn.reps.segmentation.SegmentationObject`]: A list of :class:`~syconn.reps.segmentation.SegmentationObject`s of the specified `obj_type`, sharing the same working directory as this SSV object. """ if obj_type not in self._objects: objs = [] for obj_id in self.lookup_in_attribute_dict(obj_type): objs.append(self.get_seg_obj(obj_type, obj_id)) if self.object_caching: self._objects[obj_type] = objs else: return objs return self._objects[obj_type]
[docs] def get_seg_obj(self, obj_type: str, obj_id: int) -> SegmentationObject: """ Factory method for creating a SegmentationObject of a specific type and ID. Args: obj_type (str): Type of the requested SegmentationObject from :class:`~syconn.reps.segmentation.SegmentationObject`. obj_id (int): ID of the requested object. Returns: SegmentationObject: The created SegmentationObject of type `obj_type`, sharing the same working directory as this SSV object. """ kwargs = dict(enable_locking=self.enable_locking_so, mesh_caching=self.mesh_caching, voxel_caching=self.voxel_caching) if self._ssd is not None and obj_type in self._ssd.sd_lookup and self._ssd.sd_lookup[obj_type] is not None: sd_obj = self._ssd.sd_lookup[obj_type] if str(sd_obj.version) != str(self.version_dict[obj_type]) or sd_obj.working_dir != self.working_dir: msg = (f'Inconsistent working directory or version for {obj_type} stored in {self} and look up ' f'dataset {sd_obj}.') log_reps.error(msg) raise ValueError(msg) return sd_obj.get_segmentation_object(obj_id, **kwargs) return SegmentationObject(obj_id=obj_id, obj_type=obj_type, version=self.version_dict[obj_type], working_dir=self.working_dir, create=False, scaling=self.scaling, config=self.config)
[docs] def get_seg_dataset(self, obj_type: str) -> SegmentationDataset: """ Factory method for the :class:`~syconn.reps.segmentation.SegmentationDataset` of a specified `obj_type`. Args: obj_type (str): Type of the requested :class:`~syconn.reps.segmentation.SegmentationDataset`. Returns: :class:`~syconn.reps.segmentation.SegmentationDataset`: The SegmentationDataset of type `obj_type`, sharing the same working directory as this SSV object. """ return SegmentationDataset(obj_type, version_dict=self.version_dict, version=self.version_dict[obj_type], scaling=self.scaling, working_dir=self.working_dir)
[docs] def load_attr_dict(self) -> int: """ Loads the attribute dictionary of the SuperSegmentationObject stored at :py:attr:`~ssv_dir` from disk. Returns: int: Status code (0 for success, -1 for failure). """ try: self.attr_dict = load_pkl2obj(self.attr_dict_path) return 0 except (IOError, EOFError, pkl.UnpicklingError) as e: if '[Errno 2] No such file or' not in str(e): log_reps.critical(f"Could not load SSO attributes from {self.attr_dict_path} due to '{e}'.") return -1
@property def sv_graph_uint(self) -> nx.Graph: """ Retrieves the supervoxel graph with unsigned integer node IDs. Returns: nx.Graph: The supervoxel graph with uint node IDs. """ if self._sv_graph_uint is None: if os.path.isfile(self.edgelist_path): self._sv_graph_uint = nx.read_edgelist(self.edgelist_path, nodetype=np.uint64) else: raise ValueError("Could not find graph data for SSV {}.".format(self.id)) return self._sv_graph_uint
[docs] def load_sv_graph(self) -> nx.Graph: """ Loads the supervoxel graph for this SuperSegmentationObject, where node objects are instance of :class:`~syconn.reps.segmentation.SegmentationObject`. The graph is derived from the supervoxel ID graph stored in :py:attr:`_sv_graph` or the edge list stored at :py:attr:`edgelist_path`. Returns: nx.Graph: The supervoxel graph with :class:`~syconn.reps.segmentation.SegmentationObject` nodes. """ G = self.sv_graph_uint # # Might be useful as soon as global graph path is available # else: # if os.path.isfile(global_params.config.neuron_svgraph_path): # G_glob = nx.read_edgelist(global_params.config.neuron_svgraph_path, # nodetype=np.uint64) # G = nx.Graph() # cc = nx.node_connected_component(G_glob, self.sv_ids[0]) # assert len(set(cc).difference(set(self.sv_ids))) == 0, \ # "SV IDs in graph differ from SSV SVs." # for e in G_glob.edges(cc): # G.add_edge(*e) if len(set(list(G.nodes())).difference(set(self.sv_ids))) != 0: msg = "SV IDs in graph differ from SSV SVs." log_reps.error(msg) raise ValueError(msg) # create graph with SV nodes new_G = nx.Graph() for e in G.edges(): new_G.add_edge(self.get_seg_obj("sv", e[0]), self.get_seg_obj("sv", e[1])) return new_G
[docs] def load_sv_edgelist(self) -> List[Tuple[int, int]]: """ Loads the edge list of the supervoxel graph. Returns: Edge list representing the supervoxel graph. """ g = self.load_sv_graph() return list(g.edges())
def _load_obj_mesh(self, obj_type: str = "sv", rewrite: bool = False) -> MeshType: """ Loads the mesh for a specified `obj_type`, merging meshes from the underlying super-voxel objects if :func:`~mesh_exists` is False. Does not currently support color array and sym. asym synapse type. Args: obj_type (str): The type of object for which the mesh is to be loaded. rewrite (bool): Whether to overwrite existing meshes (default: False). Returns: MeshType: A tuple containing indices, vertices, and normals of the mesh. """ if not rewrite and self.mesh_exists(obj_type) and not \ self.version == "tmp": mesh_dc = MeshStorage(self.mesh_dc_path, disable_locking=not self.enable_locking) if len(mesh_dc[obj_type]) == 3: ind, vert, normals = mesh_dc[obj_type] else: ind, vert = mesh_dc[obj_type] normals = np.zeros((0,), dtype=np.float32) else: ind, vert, normals = merge_someshes(self.get_seg_objects(obj_type), nb_cpus=self.nb_cpus, use_new_subfold=self.config.use_new_subfold) if not self.version == "tmp": mesh_dc = MeshStorage(self.mesh_dc_path, read_only=False, disable_locking=not self.enable_locking) mesh_dc[obj_type] = [ind, vert, normals] mesh_dc.push() return np.array(ind, dtype=np.int32), np.array(vert, dtype=np.float32), np.array(normals, dtype=np.float32) def _load_obj_mesh_compr(self, obj_type: str = "sv") -> MeshType: """ Loads a single mesh of all objects of a specified type assigned to this SuperSegmentationObject. Args: obj_type (str): Type of requested objects. Returns: A single mesh of all objects. """ mesh_dc = MeshStorage(self.mesh_dc_path, disable_locking=not self.enable_locking) return mesh_dc._dc_intern[obj_type]
[docs] def save_attr_dict(self): """ Saves the SuperSegmentationObject's attribute dictionary to disk. """ if self.version == 'tmp': log_reps.warning('"save_attr_dict" called but this SSV has version "tmp", attribute dict will' ' not be saved to disk.') return try: orig_dc = load_pkl2obj(self.attr_dict_path) except (IOError, EOFError, FileNotFoundError, pkl.UnpicklingError) as e: if '[Errno 2] No such file or' not in str(e): log_reps.critical(f"Could not load SSO attributes from {self.attr_dict_path} due to '{e}'. Overwriting") orig_dc = {} orig_dc.update(self.attr_dict) write_obj2pkl(self.attr_dict_path, orig_dc)
[docs] def save_attributes(self, attr_keys: List[str], attr_values: List[Any]): """ Writes attributes to the attribute dictionary on the file system. Does not care about self.attr_dict. Args: attr_keys (List[str]): A list of attribute keys to save. attr_values (List[Any]): A list of corresponding attribute values to save. """ if self.version == 'tmp': log_reps.warning('"save_attributes" called but this SSV has version "tmp", attributes will' ' not be saved to disk.') return if not hasattr(attr_keys, "__len__"): attr_keys = [attr_keys] if not hasattr(attr_values, "__len__"): attr_values = [attr_values] try: attr_dict = load_pkl2obj(self.attr_dict_path) except (IOError, EOFError, FileNotFoundError) as e: if not "[Errno 13] Permission denied" in str(e): pass else: log_reps.critical(f"Could not load SSO attributes at {self.attr_dict_path} due to {e}.") attr_dict = {} for k, v in zip(attr_keys, attr_values): attr_dict[k] = v try: write_obj2pkl(self.attr_dict_path, attr_dict) except IOError as e: if not "[Errno 13] Permission denied" in str(e): raise (IOError, e) else: log_reps.warn("Could not save SSO attributes to %s due to missing permissions." % self.attr_dict_path, RuntimeWarning)
[docs] def attr_exists(self, attr_key: str) -> bool: """ Checks if a specified attribute exists for this SuperSegmentationObject. Args: attr_key (str): The attribute key to check. Returns: bool: True if the attribute exists in :py:attr:`~attr_dict`, False otherwise. """ return attr_key in self.attr_dict
[docs] def lookup_in_attribute_dict(self, attr_key: str) -> Optional[Any]: """ Retrieves the value associated with a specified attribute key from the attribute dictionary stored in :py:attr:`~attr_dict`. Args: attr_key (str): Attribute key to look up. Returns: Optional[Any]: The value to the key ``attr_key``, or None if the key is not existent. """ if attr_key in self.attr_dict: return self.attr_dict[attr_key] # TODO: this is somehow arbitrary elif len(self.attr_dict) <= 4: if self.load_attr_dict() == -1: return None if attr_key in self.attr_dict: return self.attr_dict[attr_key] else: return None
[docs] def load_so_attributes(self, obj_type: str, attr_keys: List[str]): """ Collects attributes from instances of the SegmentationObject class of a specified type. The attribute value ordering for each key is the same as the `svs` attribute. Args: obj_type (str): The type of SegmentationObject to collect attributes from. attr_keys (List[str]): A list of attribute keys to collect; these keys must exist for the requested `obj_type`. Returns: List[Any]: A list of attribute values for each key in `attr_keys`. """ attr_values = [[] for _ in range(len(attr_keys))] for obj in self.get_seg_objects(obj_type): for ii, attr_key in enumerate(attr_keys): # lookup_in_attribute_dict uses attribute caching of the obj itself or, if enabled, # the SegmentationDataset cache in the SSD of this SSO. attr = obj.lookup_in_attribute_dict(attr_key) attr_values[ii].append(attr) return attr_values
[docs] def calculate_size(self): """ Calculates the size (number of voxels) of this SuperSegmentationObject, referenced as :py:attr:`size`. """ self._size = np.sum(self.load_so_attributes('sv', ['size']))
[docs] def calculate_bounding_box(self): """ Calculates the bounding box and size of this SuperSegmentationObject (refer :py:attr:`~bounding_box` and :py:attr:`size`). """ if len(self.sv_ids) == 0: self._bounding_box = np.zeros((2, 3), dtype=np.int32) self._size = 0 return self._bounding_box = np.ones((2, 3), dtype=np.int32) * np.inf self._size = np.inf bounding_boxes, sizes = self.load_so_attributes('sv', ['bounding_box', 'size']) self._size = np.sum(sizes) self._bounding_box[0] = np.min(bounding_boxes, axis=0)[0] self._bounding_box[1] = np.max(bounding_boxes, axis=0)[1] self._bounding_box = self._bounding_box.astype(np.int32)
[docs] def calculate_skeleton(self, force: bool = False, **kwargs): """ Merges existing supervoxel skeletons or calculates them from scratch using :func:`~syconn.reps.super_segmentation_helper.create_sso_skeletons_wrapper` if ``allow_ssv_skel_gen=True``. Skeleton will be saved at :py:attr:`~skeleton_path`. Args: force (bool): If True, skips :func:`~load_skeleton` and forces the calculation of the skeleton regardless of existing data. """ if force or self._allow_skeleton_calc: return ssh.create_sso_skeletons_wrapper([self], **kwargs) if self.skeleton is not None and len(self.skeleton["nodes"]) != 0 \ and not force: return ssh.create_sso_skeletons_wrapper([self], **kwargs)
[docs] def save_skeleton_to_kzip(self, dest_path: Optional[str] = None, name: str = 'skeleton', additional_keys: Optional[List[str]] = None, comments: Optional[Union[np.ndarray, List[str]]] = None): """ Saves the skeleton representation of the super-supervoxel to a KNOSSOS compatible k.zip file, including additional properties and comments. Args: dest_path: Destination path for k.zip file. If None, the default skeleton k.zip path is used. name: Identifier or name of saved skeleton that appears in KNOSSOS. additional_keys: Additional skeleton keys to be converted into KNOSSOS skeleton node properties. Always attempts to write out the keys 'axoness', 'cell_type', and 'meta'. comments: np.ndarray of strings or list of strings of length N where N equals the number of skeleton nodes. These comments will be converted into KNOSSOS skeleton node comments. Returns: None. The method saves a KNOSSOS compatible k.zip file containing the super-supervoxel skeleton and its node properties. """ if type(additional_keys) == str: additional_keys = [additional_keys] try: if self.skeleton is None: self.load_skeleton() if additional_keys is not None: for k in additional_keys: assert k in self.skeleton, "Additional key %s is not " \ "part of SSV %d self.skeleton.\nAvailable keys: %s" % \ (k, self.id, repr(self.skeleton.keys())) a = skeleton.SkeletonAnnotation() a.scaling = self.scaling a.comment = name skel_nodes = [] for i_node in range(len(self.skeleton["nodes"])): c = self.skeleton["nodes"][i_node] r = self.skeleton["diameters"][i_node] / 2 skel_nodes.append(skeleton.SkeletonNode(). from_scratch(a, c[0], c[1], c[2], radius=r)) pred_key_ax = "{}_avg{}".format(self.config['compartments'][ 'view_properties_semsegax']['semseg_key'], self.config['compartments'][ 'dist_axoness_averaging']) if pred_key_ax in self.skeleton: skel_nodes[-1].data[pred_key_ax] = self.skeleton[pred_key_ax][ i_node] if "meta" in self.skeleton: skel_nodes[-1].data["meta"] = self.skeleton["meta"][i_node] if additional_keys is not None: for k in additional_keys: skel_nodes[-1].data[k] = self.skeleton[k][i_node] if comments is not None: skel_nodes[-1].setComment(str(comments[i_node])) a.addNode(skel_nodes[-1]) for edge in self.skeleton["edges"]: a.addEdge(skel_nodes[edge[0]], skel_nodes[edge[1]]) if dest_path is None: dest_path = self.skeleton_kzip_path elif not dest_path.endswith('.k.zip'): dest_path += '.k.zip' write_skeleton_kzip(dest_path, [a]) except Exception as e: log_reps.warning("[SSO: %d] Could not load/save skeleton:\n%s" % (self.id, repr(e)))
[docs] def save_objects_to_kzip_sparse(self, obj_types: Optional[Iterable[str]] = None, dest_path: Optional[str] = None): """ Exports cellular organelles as coordinates with size, shape, and overlap properties in a KNOSSOS compatible format to a k.zip file. Args: obj_types: Type identifiers of the supervoxel objects which are exported. dest_path: Path to the destination file. If None, results will be stored at the skeleton k.zip path. Returns: None. The method saves a KNOSSOS compatible k.zip file with the specific cellular organelles. """ if obj_types is None: obj_types = self.config['process_cell_organelles'] annotations = [] for obj_type in obj_types: assert obj_type in self.attr_dict map_ratio_key = "mapping_%s_ratios" % obj_type if not map_ratio_key in self.attr_dict.keys(): log_reps.warning("%s not yet mapped. Object nodes are not " "written to k.zip." % obj_type) continue overlap_ratios = np.array(self.attr_dict[map_ratio_key]) overlap_ids = np.array(self.attr_dict["mapping_%s_ids" % obj_type]) a = skeleton.SkeletonAnnotation() a.scaling = self.scaling a.comment = obj_type so_objs = self.get_seg_objects(obj_type) for so_obj in so_objs: c = so_obj.rep_coord # somewhat approximated from sphere volume: r = np.power(so_obj.size / 3., 1 / 3.) skel_node = skeleton.SkeletonNode(). \ from_scratch(a, c[0], c[1], c[2], radius=r) skel_node.data["overlap"] = \ overlap_ratios[overlap_ids == so_obj.id][0] skel_node.data["size"] = so_obj.size skel_node.data["shape"] = so_obj.shape a.addNode(skel_node) annotations.append(a) if dest_path is None: dest_path = self.skeleton_kzip_path elif not dest_path.endswith('.k.zip'): dest_path += '.k.zip' write_skeleton_kzip(dest_path, annotations)
[docs] def save_objects_to_kzip_dense(self, obj_types: List[str], dest_path: Optional[str] = None): """ Exports cellular organelles as coordinates with size, shape, and overlap properties in a KNOSSOS compatible format. Args: obj_types: Type identifiers of the supervoxel objects which are exported. dest_path: Path to the destination file. If None, result will be stored at the objects_dense_kzip_path. Returns: None. The function saves a KNOSSOS compatible k.zip file with the specified cellular organelles. """ if dest_path is None: dest_path = self.objects_dense_kzip_path if os.path.exists(self.objects_dense_kzip_path[:-6]): shutil.rmtree(self.objects_dense_kzip_path[:-6]) if os.path.exists(self.objects_dense_kzip_path): os.remove(self.objects_dense_kzip_path) for obj_type in obj_types: so_objs = self.get_seg_objects(obj_type) for so_obj in so_objs: so_obj.save_kzip(path=dest_path, write_id=self.dense_kzip_ids[obj_type])
[docs] def total_edge_length(self, compartments_of_interest: Optional[List[int]] = None, ax_pred_key: str = 'axoness_avg10000') -> Union[np.ndarray, float]: """ Calculates the total edge length of the super-supervoxel skeleton in nanometers. Args: compartments_of_interest: A list specifying which compartments to include in the calculation. Options include axon (1), dendrite (0), and soma (2). ax_pred_key: The key of the compartment prediction stored in the skeleton. This is used only if compartments_of_interest is set. Returns: The total sum of all edge lengths (L2 norm) in the skeleton, measured in nanometers. """ if self.skeleton is None: self.load_skeleton() nodes = self.skeleton["nodes"] edges = self.skeleton["edges"] if compartments_of_interest is None: return np.sum([np.linalg.norm( self.scaling * (nodes[e[0]] - nodes[e[1]])) for e in edges]) else: node_labels = self.skeleton[ax_pred_key] edge_length = 0 for e in edges: if (node_labels[e[0]] in compartments_of_interest) and (node_labels[e[1]] in compartments_of_interest): edge_length += np.linalg.norm(self.scaling * (nodes[e[0]] - nodes[e[1]])) return edge_length
[docs] def save_skeleton(self, to_kzip=False, to_object=True): """ Saves the skeleton to default locations as `.pkl` and optionally as `.k.zip`. Args: to_kzip (bool): If True, stores the skeleton as a KNOSSOS compatible XML inside a k.zip file. to_object (bool): If True, stores the skeleton as a dictionary in a pickle file. Returns: None. The skeleton is saved to the specified formats and locations. """ if self.version == 'tmp': log_reps.debug('"save_skeleton" called but this SSV ' 'has version "tmp", skeleton will' ' not be saved to disk.') return if to_object: write_obj2pkl(self.skeleton_path, self.skeleton) if to_kzip: self.save_skeleton_to_kzip()
[docs] def load_skeleton(self) -> bool: """ Loads the skeleton from file or computes it if it does not exist and generation is allowed (requires ``allow_ssv_skel_gen=True``). Returns: A boolean indicating whether the skeleton was successfully loaded or generated. """ if self.skeleton is not None: return True try: self.skeleton = load_pkl2obj(self.skeleton_path) self.skeleton["nodes"] = self.skeleton["nodes"].astype(np.float32) return True except: if global_params.config.allow_ssv_skel_gen: if global_params.config.use_kimimaro: # add per ssv skeleton generation for kimimaro raise NotImplementedError('Individual cells cannot be processed with kimimaro.') else: self.calculate_skeleton() return True return False
[docs] def celltype(self, key: Optional[str] = None) -> int: """ Retrieves the cell type classification result. If `key` is specified, it returns the corresponding value loaded by :func:`~lookup_in_attribute_dict`. Otherwise, the default CMN model is used. Args: key: The key where the classification result is stored. If None, the default key is used. Returns: The cell type classification result. """ if key is None: key = 'celltype_cnn_e3' return self.lookup_in_attribute_dict(key)
[docs] def weighted_graph(self, add_node_attr: Iterable[str] = ()) -> nx.Graph: """ Creates a Euclidean distance (in nanometers) weighted graph representation of the skeleton of this SSV object. The node IDs represent the index in the `node` array part of the skeleton. Weights are stored as 'weight' in the graph, which allows usage of methods like `nx.single_source_dijkstra_path(..)`. Args: add_node_attr: An iterable of node attributes to be added to the graph. These must exist in the skeleton. Returns: A networkx graph representing the SSV object's skeleton with Euclidean distance (in nanometers) weights. """ if self._weighted_graph is None or np.any([len(nx.get_node_attributes( self._weighted_graph, k)) == 0 for k in add_node_attr]): if self.skeleton is None: self.load_skeleton() node_scaled = self.skeleton["nodes"] * self.scaling edges = np.array(self.skeleton["edges"], dtype=np.int64) edge_coords = node_scaled[edges] weights = np.linalg.norm(edge_coords[:, 0] - edge_coords[:, 1], axis=1) self._weighted_graph = nx.Graph() self._weighted_graph.add_nodes_from( [(ix, dict(position=coord)) for ix, coord in enumerate(self.skeleton['nodes'])]) self._weighted_graph.add_weighted_edges_from( [(edges[ii][0], edges[ii][1], weights[ii]) for ii in range(len(weights))]) for k in add_node_attr: dc = {} for n in self._weighted_graph.nodes(): dc[n] = self.skeleton[k][n] nx.set_node_attributes(self._weighted_graph, dc, k) return self._weighted_graph
[docs] def syn_sign_ratio(self, weighted: bool = True, recompute: bool = True, comp_types: Optional[List[int]] = None, comp_types_partner: Optional[List[int]] = None) -> float: """ Computes the ratio of symmetric synapses between specified functional compartments. The ratio ranges between 0 and 1, and -1 is returned if no synapse objects exist. The predicted boutons are converted into axon labels; 3 (en-passant) changes to 1 and 4 (terminal) to 1. Todo: * Propagate the default of synapse type to this method and return -1 if synapse type predictions are unavailable. Args: weighted (bool): If True, computes an area-weighted ratio of symmetric synapses. recompute (bool): If True, disregards any existing value and recomputes the ratio. comp_types (list): The functional compartment types on this cell to consider for the ratio. Default is [1,]. comp_types_partner (list): The types on the partner cell to consider. Default is [0,]. Returns: float: The (area-weighted) ratio of symmetric synapses, or -1 if no synapses are present. """ if comp_types is None: comp_types = [1, ] if comp_types_partner is None: comp_types_partner = [0, ] ratio = self.lookup_in_attribute_dict("syn_sign_ratio") if not recompute and ratio is not None: return ratio syn_signs = [] syn_sizes = [] props = load_so_attr_bulk(self.syn_ssv, ('partner_axoness', 'syn_sign', 'mesh_area', 'neuron_partners'), use_new_subfold=self.config.use_new_subfold) for syn in self.syn_ssv: ax = np.array(props['partner_axoness'][syn.id]) # convert boutons to axon class ax[ax == 3] = 1 ax[ax == 4] = 1 partners = props['neuron_partners'][syn.id] this_cell_ix = list(partners).index(self.id) other_cell_ix = 1 - this_cell_ix if ax[this_cell_ix] not in comp_types: continue if ax[other_cell_ix] not in comp_types_partner: continue syn_signs.append(props['syn_sign'][syn.id]) syn_sizes.append(props['mesh_area'][syn.id] / 2) log_reps.debug(f'Used {len(syn_signs)} synapses with a total size of {np.sum(syn_sizes)} um^2 between {comp_types} ' f'(this cell) and {comp_types_partner} (other cells).') if len(syn_signs) == 0 or np.sum(syn_sizes) == 0: return -1 syn_signs = np.array(syn_signs) syn_sizes = np.array(syn_sizes) if weighted: ratio = np.sum(syn_sizes[syn_signs == -1]) / float(np.sum(syn_sizes)) else: ratio = np.sum(syn_signs == -1) / float(len(syn_signs)) return ratio
[docs] def aggregate_segmentation_object_mappings(self, obj_types: List[str], save: bool = False): """ Aggregates mapping information of cellular organelles from the supervoxels of the SSV. After this step, :func:`~apply_mapping_decision` can be called to apply final assignments. Examples: A mitochondrion can extend over multiple supervoxels, so it will overlap with all of them partially. Here, the overlap information of all supervoxels assigned to this SSV will be aggregated. Args: obj_types: A list of cell organelle types to process. save: If True, saves the attribute dictionary at the end. Returns: None. This method aggregates the mapping information. """ assert isinstance(obj_types, list) mappings = dict((obj_type, Counter()) for obj_type in obj_types) for sv in self.svs: sv.load_attr_dict() for obj_type in obj_types: if "mapping_%s_ids" % obj_type in sv.attr_dict: keys = sv.attr_dict["mapping_%s_ids" % obj_type] values = sv.attr_dict["mapping_%s_ratios" % obj_type] mappings[obj_type] += Counter(dict(zip(keys, values))) for obj_type in obj_types: if obj_type in mappings: self.attr_dict["mapping_%s_ids" % obj_type] = list(mappings[obj_type].keys()) self.attr_dict["mapping_%s_ratios" % obj_type] = list(mappings[obj_type].values()) if save: self.save_attr_dict()
[docs] def apply_mapping_decision(self, obj_type: str, correct_for_background: bool = True, lower_ratio: Optional[float] = None, upper_ratio: Optional[float] = None, sizethreshold: Optional[float] = None, save: bool = True): """ Applies mapping decisions of cellular organelles to the SSV object based on overlap ratios. A SegmentationObject in question is assigned to this SuperSegmentationObject if they share the highest overlap. For more details see `SyConn/docs/object_mapping.md`. Default parameters for the mapping will be taken from the `config.yml` file. Args: obj_type: Type of SegmentationObject which are to be mapped. correct_for_background: If True, ignores the background ID during mapping. lower_ratio: The minimum overlap ratio required for objects to be mapped. upper_ratio: The maximum overlap ratio allowed for objects to be mapped. sizethreshold: The minimum voxel size of an object to be considered for mapping, objects below will be ignored. save: If True, saves the attribute dictionary after mapping. Todo: * check what `correct_for_background` was for. Any usecase for `correct_for_background=False`? * duplicate of ssd_proc._apply_mapping_decisions_thread, implement common-use method. Returns: None. Applies the mapping decisions directly to the SSV. """ assert obj_type in self.version_dict self.load_attr_dict() if not "mapping_%s_ratios" % obj_type in self.attr_dict: log_reps.error("No mapping ratios found") return if not "mapping_%s_ids" % obj_type in self.attr_dict: log_reps.error("no mapping ids found") return if lower_ratio is None: try: lower_ratio = self.config['cell_objects']["lower_mapping_ratios"][ obj_type] except KeyError: msg = "Lower ratio undefined" log_reps.error(msg) raise ValueError(msg) if upper_ratio is None: try: upper_ratio = self.config['cell_objects']["upper_mapping_ratios"][ obj_type] except: log_reps.critical("Upper ratio undefined - 1. assumed") upper_ratio = 1. if sizethreshold is None: try: sizethreshold = self.config['cell_objects']["sizethresholds"][obj_type] except KeyError: msg = "Size threshold undefined" log_reps.error(msg) raise ValueError(msg) obj_ratios = np.array(self.attr_dict["mapping_%s_ratios" % obj_type]) if correct_for_background: for i_so_id in range( len(self.attr_dict["mapping_%s_ids" % obj_type])): so_id = self.attr_dict["mapping_%s_ids" % obj_type][i_so_id] obj_version = self.config["versions"][obj_type] this_so = SegmentationObject(so_id, obj_type, version=obj_version, scaling=self.scaling, working_dir=self.working_dir) this_so.load_attr_dict() if 0 in this_so.attr_dict["mapping_ids"]: ratio_0 = this_so.attr_dict["mapping_ratios"][ this_so.attr_dict["mapping_ids"] == 0][0] obj_ratios[i_so_id] /= (1 - ratio_0) id_mask = obj_ratios > lower_ratio if upper_ratio < 1.: id_mask[obj_ratios > upper_ratio] = False candidate_ids = \ np.array(self.attr_dict["mapping_%s_ids" % obj_type])[id_mask] self.attr_dict[obj_type] = [] for candidate_id in candidate_ids: obj = SegmentationObject(candidate_id, obj_type=obj_type, version=self.version_dict[obj_type], working_dir=self.working_dir, config=self.config) if obj.size > sizethreshold: self.attr_dict[obj_type].append(candidate_id) if save: self.save_attr_dict()
def _map_cellobjects(self, obj_types: Optional[List[str]] = None, save: bool = True): """ Maps all existing cell organelles (as defined in :py:attr:`~config['process_cell_organelles']`) to the SuperSegmentationObject. Args: obj_types: Type of :class:`~syconn.reps.super_segmentation_object.SuperSegmentationObject` which should be mapped. If None, uses the default list. save: If True, saves the attribute dictionary of the SuperSegmentationObject afterwards. Returns: None. The method maps the specified cell organelles to the SuperSegmentationObject. """ if obj_types is None: obj_types = self.config['process_cell_organelles'] self.aggregate_segmentation_object_mappings(obj_types, save=save) for obj_type in obj_types: # TODO: remove handling of sj? self.apply_mapping_decision(obj_type, save=save, correct_for_background=obj_type == "sj")
[docs] def clear_cache(self): """ Clears the following cached data related to the SSV: * :py:attr:`~voxels` * :py:attr:`~voxels_xy_downsampled` * :py:attr:`~sample_locations` * :py:attr:`~_objects` * :py:attr:`~_views` * :py:attr:`~skeleton` * :py:attr:`~_meshes` Returns: None. The method clears various cached attributes of the SSV. """ self._objects = {} self._voxels = None self._voxels_xy_downsampled = None self._views = None self._sample_locations = None self._meshes = {"sv": None, "sj": None, "syn_ssv": None, "vc": None, "mi": None, "conn": None, "syn_ssv_sym": None, "syn_ssv_asym": None} self.skeleton = None
[docs] def preprocess(self): """ Processes object mapping (requires prior assignment of object candidates), caches object meshes, and calculates the SSV skeleton. Returns: None. This method performs preprocessing steps on the SSV. """ self.load_attr_dict() self._map_cellobjects() for sv_type in self.config['process_cell_organelles'] + ["sv", "syn_ssv"]: _ = self._load_obj_mesh(obj_type=sv_type, rewrite=False) self.calculate_skeleton()
[docs] def copy2dir(self, dest_dir: str, safe: bool = True): """ Copies the content at :py:attr:`~ssv_dir` to another directory. Example: To copy the data of this SSV object (``ssv_orig``) to another, yet not existing SSV (``ssv_target``), call ``ssv_orig.copy2dir(ssv_target.ssv_dir)``. All files contained in the directory py:attr:`~ssv_dir` of ``ssv_orig`` will be copied to ``ssv_target.ssv_dir``. Args: dest_dir: The destination directory where the SSV data will be copied. safe: If True, does not overwrite existing data. Returns: None. The method copies the SSV data to the specified directory. """ # get all files in home directory fps = get_filepaths_from_dir(self.ssv_dir, ending=["pkl", "k.zip"]) fnames = [os.path.split(fname)[1] for fname in fps] # Open the file and raise an exception if it exists if not os.path.isdir(dest_dir): os.makedirs(dest_dir) for i in range(len(fps)): src_filename = fps[i] dest_filename = dest_dir + "/" + fnames[i] try: safe_copy(src_filename, dest_filename, safe=safe) log_reps.debug("Copied %s to %s." % (src_filename, dest_filename)) except Exception as e: log_reps.error("Skipped '{}', due to the following error: '{}'" "".format(fnames[i], str(e))) pass self.load_attr_dict() if os.path.isfile(dest_dir + "/attr_dict.pkl"): dest_attr_dc = load_pkl2obj(dest_dir + "/attr_dict.pkl") else: dest_attr_dc = {} dest_attr_dc.update(self.attr_dict) write_obj2pkl(dest_dir + "/attr_dict.pkl", dest_attr_dc)
[docs] def partition_cc(self, max_nb_sv: Optional[int] = None, lo_first_n: Optional[int] = None) -> List[List[Any]]: """ Splits the supervoxel graph of this SSV into subgraphs. These subgraphs are defined based on the specified parameters, with default values generated from :py:attr:`~.config`. Args: max_nb_sv: The maximum number of supervoxels per subgraph. This defines the sub-graph context. lo_first_n: The number of first traversed nodes to exclude from new BFS traversals. This allows the partitioning of the original supervoxel graph of size `N` into ``N//lo_first_n`` sub-graphs. Returns: A list of lists, each sublist representing a partitioned subgraph. """ if lo_first_n is None: lo_first_n = self.config['glia']['subcc_chunk_size_big_ssv'] if max_nb_sv is None: max_nb_sv = self.config['glia']['subcc_size_big_ssv'] + 2 * (lo_first_n - 1) init_g = self.rag partitions = split_subcc_join(init_g, max_nb_sv, lo_first_n=lo_first_n) return partitions
# -------------------------------------------------------------------- VIEWS
[docs] def save_views(self, views: np.ndarray, view_key: str = "views"): """ Saves the view array to the SSV's view storage under the specified view key. However, this will only save views on SSV level and not for each individual SV. If the SSV version is 'tmp', a warning is logged and views are not saved to disk. Args: views: The view array. view_key: The key used for the look-up. Returns: None """ if self.version == 'tmp': log_reps.warning('"save_views" called but this SSV ' 'has version "tmp", views will' ' not be saved to disk.') return view_dc = CompressedStorage(self.view_path, read_only=False, disable_locking=not self.enable_locking) view_dc[view_key] = views view_dc.push()
[docs] def load_views(self, view_key: Optional[str] = None, woglia: bool = True, raw_only: bool = False, force_reload: bool = False, nb_cpus: Optional[int] = None, ignore_missing: bool = False, index_views: bool = False) -> np.ndarray: """ Loads views for the SuperSegmentationObject (SSV) either from the view dictionary, :py:attr:`~view_dict`, or from the view storage, :py:attr:`view_path`, given the key `view_key`. If the specified key does not exist at the SSV level or is None, it tries to load the views from the underlying SegmentationObjects. If views are cached and the key is present, they are returned directly. Args: view_key: The key used for the view retrieval. woglia: If True, loads views rendered from the glia-free agglomeration. raw_only: If True, only returns the cell shape channel in the views. force_reload: If True, forces the views to reload from the storage. nb_cpus: The number of CPUs to use for loading views in parallel. ignore_missing: If True, it will not raise KeyError if SegmentationObject is missing. index_views: If True, loads views that contain the indices of the vertices at the respective pixels, used for semantic label mapping onto mesh vertices. Returns: An array containing concatenated views for each SegmentationObject in self.svs with shape [N_LOCS, N_CH, N_VIEWS, X, Y]. """ if self.view_caching and view_key in self.view_dict: # self.view_dict stores list of views with length of sample_locations return self.view_dict[view_key] view_dc = CompressedStorage(self.view_path, read_only=True, disable_locking=not self.enable_locking) if view_key in view_dc and not force_reload: if self.view_caching: self.view_dict[view_key] = view_dc[view_key] return self.view_dict[view_key] return view_dc[view_key] del view_dc # delete previous initialized view dictionary params = [[sv, {'woglia': woglia, 'raw_only': raw_only, 'index_views': index_views, 'ignore_missing': ignore_missing, 'view_key': view_key}] for sv in self.svs] # load views from underlying SVs views = sm.start_multiprocess_obj("load_views", params, nb_cpus=self.nb_cpus if nb_cpus is None else nb_cpus) views = np.concatenate(views) # stores list of views with length of sample_locations if self.view_caching and view_key is not None: self.view_dict[view_key] = views return views
[docs] def view_existence(self, woglia: bool = True, index_views: bool = False, view_key: Optional[str] = None) -> List[bool]: """ Checks whether a specific set of views exists for this object. Args: woglia: If True, will load the views render from the glia-free agglomeration. index_views: Views which contain the indices of the vertices at the respective pixels. Used as look-up to map the predicted semantic labels onto the mesh vertices. view_key: The key used for the look-up. Returns: True if the specified views exist. """ view_paths = set([sv.view_path(woglia=woglia, index_views=index_views, view_key=view_key) for sv in self.svs]) cached_ids = [] for vp in view_paths: cached_ids += list(CompressedStorage(vp, disable_locking=True).keys()) cached_ids = set(cached_ids).intersection(self.sv_ids) so_views_exist = [svid in cached_ids for svid in self.sv_ids] return so_views_exist
[docs] def render_views(self, add_cellobjects: bool = False, verbose: bool = False, overwrite: bool = True, cellobjects_only: bool = False, woglia: bool = True, skip_indexviews: bool = False): """ Renders views for each SegmentationObject based on the context of the SSV and stores them at the SV level. Typically used for initial glia or axoness prediction, the results are distributed to each SegmentationObject of this object. Views are not cached in the view_dict or view_path of the SSV and are used during initial glia, compartment and cell type predictions. Refer to _render_rawviews for storing views in the SSV storage, which is used during GT generation. Args: add_cellobjects (bool): If True, adds cellular organelle channels to the 2D projection views. verbose (bool): If True, logs additional information. overwrite (bool): If True, re-renders views at all rendering locations. cellobjects_only (bool): If True, renders only cellular organelle channels (currently not used). woglia (bool): If True, loads views from the glia-free agglomeration. skip_indexviews (bool): If True, does not generate index views. Used for initial SSV glia-removal rendering. Returns: None """ # TODO: partial rendering currently does not support index view generation (-> vertex # indices will be different for each partial mesh) if len(self.sv_ids) > self.config['glia']['rendering_max_nb_sv'] and not woglia: if not skip_indexviews: raise ValueError('Index view rendering is currently not supported with partial ' 'cell rendering.') part = self.partition_cc() log_reps.info('Partitioned huge SSV into {} subgraphs with each {}' ' SVs.'.format(len(part), len(part[0]))) log_reps.info("Rendering SSO. {} SVs left to process" ".".format(len(self.sv_ids))) params = [[so.id for so in el] for el in part] params = chunkify(params, self.config.ngpu_total * 2) so_kwargs = {'version': self.svs[0].version, 'working_dir': self.working_dir, 'obj_type': self.svs[0].type} render_kwargs = {"overwrite": overwrite, 'woglia': woglia, "render_first_only": self.config['glia']['subcc_chunk_size_big_ssv'], 'add_cellobjects': add_cellobjects, "cellobjects_only": cellobjects_only, 'skip_indexviews': skip_indexviews} params = [[par, so_kwargs, render_kwargs] for par in params] qu.batchjob_script( params, "render_views_partial", suffix="_SSV{}".format(self.id), n_cores=self.config['ncores_per_node'] // self.config['ngpus_per_node'], remove_jobfolder=True, additional_flags="--gres=gpu:1") else: # render raw data rot_mat = render_sampled_sso( self, add_cellobjects=add_cellobjects, verbose=verbose, overwrite=overwrite, return_rot_mat=True, cellobjects_only=cellobjects_only, woglia=woglia) if skip_indexviews: return # render index views render_sampled_sso(self, verbose=verbose, overwrite=overwrite, index_views=True, rot_mat=rot_mat)
[docs] def render_indexviews(self, nb_views=2, save=True, force_recompute=False, verbose=False, view_key=None, ws=None, comp_window=None): """ Renders SSV raw views with a non-default number of views and stores them in the SSV view dictionary. Default raw/index/prediction views are stored decentralized in corresponding SVs. If the views already exist and recompute is not forced, they are returned directly. Otherwise, they are rendered and optionally saved. Args: nb_views (int): The number of views to render. save (bool): If True, saves the rendered views. force_recompute (bool): If True, forces re-rendering of the views. verbose (bool): If True, logs additional information. view_key (Optional[str]): The key used to store the view array. Default: 'index{}'.format(nb_views) ws (Tuple[int]): The window size in pixels for rendering. comp_window (float): The physical extent in nm of the view-window along the y-axis (see `ws` to infer pixel size). Returns: np.array: An array of the rendered views if save is False, otherwise None. """ if view_key is None: view_key = 'index{}'.format(nb_views) if not force_recompute: try: views = self.load_views(view_key) if not save: return views else: return except KeyError: pass locs = np.concatenate(self.sample_locations(cache=False)) if self._rot_mat is None: index_views, rot_mat = render_sso_coords_index_views( self, locs, nb_views=nb_views, verbose=verbose, return_rot_matrices=True, ws=ws, comp_window=comp_window) self._rot_mat = rot_mat else: index_views = render_sso_coords_index_views(self, locs, nb_views=nb_views, verbose=verbose, rot_mat=self._rot_mat, ws=ws, comp_window=comp_window) if self.view_caching: self.view_dict[view_key] = index_views if not save: return index_views self.save_views(index_views, view_key)
def _render_rawviews(self, nb_views=2, save=True, force_recompute=False, add_cellobjects=True, verbose=False, view_key=None, ws=None, comp_window=None): """ Renders raw views for SSV in case a non-default number of views is needed, and stores them in the SSV view dictionary. Default raw/index/prediction views are stored decentralized in corresponding SVs. If views already exist and recompute is not forced, they are directly returned. Otherwise, they are rendered and optionally saved. Args: nb_views (int): The number of views to render. save (bool): If True, saves the rendered views. force_recompute (bool): If True, forces re-rendering of the views. add_cellobjects (bool): If True, includes cellular organelle channels in the views. verbose (bool): If True, logs additional information. view_key (Optional[str]): Key used for storing view array. Default: 'raw{}'.format(nb_views) ws (Tuple[int]): Window size in pixels [y, x] for rendering. comp_window (float): Physical extent in nm of the view-window along y (see `ws` to infer pixel size). Returns: np.array: An array of rendered raw views if save is False, otherwise None. """ if view_key is None: view_key = 'raw{}'.format(nb_views) if not force_recompute: try: views = self.load_views(view_key) if not save: return views return except KeyError: pass locs = np.concatenate(self.sample_locations(cache=False)) if self._rot_mat is None: views, rot_mat = render_sso_coords(self, locs, verbose=verbose, ws=ws, add_cellobjects=add_cellobjects, comp_window=comp_window, nb_views=nb_views, return_rot_mat=True) self._rot_mat = rot_mat else: views = render_sso_coords(self, locs, verbose=verbose, ws=ws, add_cellobjects=add_cellobjects, comp_window=comp_window, nb_views=nb_views, rot_mat=self._rot_mat) if self.view_caching: self.view_dict[view_key] = views if save: self.save_views(views, view_key) else: return views
[docs] def predict_semseg(self, m, semseg_key, nb_views=None, verbose=False, raw_view_key=None, save=False, ws=None, comp_window=None, add_cellobjects: Union[bool, Iterable] = True, bs: int = 10): """ Generates label views based on an input model and stores it under the 'semseg_key'. If 'nb_views' or 'raw_view_key' are provided, it requires pre-rendered raw views. Otherwise, it uses default views stored at the SSV's SegmentationObjects. Args: semseg_key (str): Key under which the semantic segmentation results are stored. nb_views (Optional[int]): Number of views used for prediction (if non-default). k (int): An unspecified parameter. verbose (bool): If True, logs additional information. raw_view_key (str): Key used for storing raw view arrays. Default: 'raw{}'.format(nb_views). If key does not exist, views will be re-rendered with properties defined in config or as given in the kwargs `ws`, `nb_views`, and `comp_window` (if non-default). save (bool): If True, saves the predicted label views. ws (tuple[int]): Window size in pixels [y, x] for rendering (if non-default). comp_window (float): Physical extent in nm of the view-window along the y-axis (if non-default). add_cellobjects: Adds cell objects to views during rendering. Either bool or a list of structures used for rendering. Only used if `raw_view_key` or `nb_views` is None. bs: Batch size used during inference. Returns: None """ view_props_default = self.config['views']['view_properties'] if (nb_views is not None) or (raw_view_key is not None): # treat as special view rendering if nb_views is None: nb_views = view_props_default['nb_views'] if raw_view_key is None: raw_view_key = 'raw{}'.format(nb_views) if raw_view_key in self.view_dict: views = self.load_views(raw_view_key) else: self._render_rawviews(nb_views, ws=ws, comp_window=comp_window, save=save, view_key=raw_view_key, verbose=verbose, force_recompute=True, add_cellobjects=add_cellobjects) views = self.load_views(raw_view_key) if len(views) != len(np.concatenate(self.sample_locations(cache=False))): raise ValueError("Unequal number of views and redering locations.") labeled_views = ssh.predict_views_semseg(views, m, verbose=verbose, batch_size=bs) assert labeled_views.shape[2] == nb_views, \ "Predictions have wrong shape." if self.view_caching: self.view_dict[semseg_key] = labeled_views if save: self.save_views(labeled_views, semseg_key) else: # treat as default view rendering views = self.load_views() locs = self.sample_locations(cache=False) assert len(views) == len(np.concatenate(locs)), \ "Unequal number of views and rendering locations." # re-order number of views according to SV rendering locations # TODO: move view reordering to 'pred_svs_semseg', check other usages before! reordered_views = [] cumsum = np.cumsum([0] + [len(el) for el in locs]) for ii in range(len(locs)): sv_views = views[cumsum[ii]:cumsum[ii + 1]] reordered_views.append(sv_views) if self.version == 'tmp': log_reps.warning('"predict_semseg" called but this SSV ' 'has version "tmp", results will' ' not be saved to disk.') ssh.pred_svs_semseg(m, reordered_views, semseg_key, self.svs, nb_cpus=self.nb_cpus, verbose=verbose, return_pred=self.version == 'tmp', bs=bs) # do not write to disk
[docs] def semseg2mesh(self, semseg_key: str, dest_path: Optional[str] = None, nb_views: Optional[int] = None, k: int = 1, force_recompute: bool = False, index_view_key: Optional[str] = None): """ Generates vertex labels from semantic segmentation results and stores them in the SSV's label storage. Optionally, stores the labeled mesh as a .ply file in a k.zip at the given path. Examples: Default situation: ``semseg_key = 'spiness'``, ``nb_views=None`` This will load the index and label views stored at the SSV's SVs. Non-default: ``semseg_key = 'spiness4'``, ``nb_views=4`` This requires to run ``self._render_rawviews(nb_views=4)``, ``self.render_indexviews(nb_views=4)`` and ``predict_semseg(MODEL, 'spiness4', nb_views=4)``. This method then has to be called like: ``self.semseg2mesh('spiness4', nb_views=4)`` Args: semseg_key: The key used to retrieve semantic segmentation results. dest_path: The path where the labeled mesh will be stored as a .ply in a k.zip. nb_views: The number of views used for generating the segmentation (if non-default). k: The number of nearest vertices to average over for smoothing the segmentation. If k=0 unpredicted vertices will be treated as 'unpredicted' class. force_recompute: If True, forces re-computation of the vertex labels. index_view_key: The key used to retrieve index views (if non-default). Returns: None """ # colors are only needed if dest_path is given # (last two colors correspond to background and undpredicted vertices (k=0)) cols = None if dest_path is not None: if 'spiness' in semseg_key or 'dnho' in semseg_key or 'do' in semseg_key: cols = np.array([[0.6, 0.6, 0.6, 1], [0.9, 0.2, 0.2, 1], [0.1, 0.1, 0.1, 1], [0.05, 0.6, 0.6, 1], [0.9, 0.9, 0.9, 1], [0.1, 0.1, 0.9, 1]]) elif 'axon' in semseg_key: # cols = np.array([[0.6, 0.6, 0.6, 1], [0.9, 0.2, 0.2, 1], # [0.1, 0.1, 0.1, 1], [0.9, 0.9, 0.9, 1], # [0.1, 0.1, 0.9, 1]]) # dendrite, axon, soma, bouton, terminal, background, unpredicted cols = np.array([[0.6, 0.6, 0.6, 1], [0.9, 0.2, 0.2, 1], [0.1, 0.1, 0.1, 1], [0.05, 0.6, 0.6, 1], [0.8, 0.8, 0.1, 1], [0.9, 0.9, 0.9, 1], [0.1, 0.1, 0.9, 1]]) elif 'ads' in semseg_key: # dendrite, axon, soma, unpredicted cols = np.array([[0.6, 0.6, 0.6, 1], [0.9, 0.2, 0.2, 1], [0.1, 0.1, 0.1, 1], [0.1, 0.1, 0.9, 1]]) elif 'abt' in semseg_key: # axon, bouton, terminal, unpredicted cols = np.array([[0.9, 0.2, 0.2, 1], [0.05, 0.6, 0.6, 1], [0.8, 0.8, 0.1, 1], [0.1, 0.1, 0.9, 1]]) elif 'dnh' in semseg_key: # dendrite, neck, head, unpredicted cols = np.array([[0.6, 0.6, 0.6, 1], [0.1, 0.1, 0.1, 1], [0.9, 0.2, 0.2, 1], [0.1, 0.1, 0.9, 1]]) elif '3models' in semseg_key or 'dasbt' in semseg_key: # dendrite, axon, soma, bouton, terminal, neck, head, unpredicted cols = np.array([[0.6, 0.6, 0.6, 1], [0.6, 0.1, 0.1, 1], [0.1, 0.1, 0.1, 1], [0.05, 0.6, 0.6, 1], [0.4, 0.4, 0.8, 1], [0.8, 0.8, 0.1, 1], [0.9, 0.4, 0.4, 1], [0.1, 0.1, 0.9, 1]]) else: raise ValueError('Semantic segmentation of "{}" is not supported.' ''.format(semseg_key)) cols = (cols * 255).astype(np.uint8) return ssh.semseg2mesh(self, semseg_key, nb_views, dest_path, k, cols, force_recompute=force_recompute, index_view_key=index_view_key)
[docs] def semseg_for_coords(self, coords: np.ndarray, semseg_key: str, k: int = 5, ds_vertices: int = 20, ignore_labels: Optional[Iterable[int]] = None): """ Retrieves the semantic segmentation with key `semseg_key` from the `k` nearest vertices at every coordinate in `coords`. Args: coords: np.array Voxel coordinates, unscaled! [N, 3] semseg_key: str The key of the semantic segmentation to use. k: int Number of nearest neighbors (NN) during k-NN classification. ds_vertices: int Striding factor for vertices. Uses ``max(1, ds_vertices // 10)`` if ``len(vertices) < 5e6``. ignore_labels: List[int] Vertices with labels in `ignore_labels` will be ignored during majority vote, e.g., used to exclude unpredicted vertices. Returns: np.array An array of the same length as `coords`. For every coordinate in `coords` returns the majority label based on its `k`-nearest neighbors. """ # TODO: Allow multiple keys as in self.attr_for_coords, e.g. to # include semseg axoness in a single query if ignore_labels is None: ignore_labels = [] coords = np.array(coords) * self.scaling vertices = self.mesh[1].reshape((-1, 3)) if len(vertices) == 0: return np.zeros((0, ), dtype=np.int32) if len(vertices) < 5e6: ds_vertices = max(1, ds_vertices // 10) vertex_labels = self.label_dict('vertex')[semseg_key][::ds_vertices] if np.ndim(vertex_labels) == 2: vertex_labels = vertex_labels.squeeze(1) vertices = vertices[::ds_vertices] for ign_l in ignore_labels: vertices = vertices[vertex_labels != ign_l] vertex_labels = vertex_labels[vertex_labels != ign_l] if len(vertex_labels) != len(vertices): raise ValueError('Size of vertices and their labels does not match!') if len(vertices) < k: log_reps.warning(f'Number of vertices ({len(vertices)}) is less than the given ' f'value of k ({k}). Setting k to lower value.') k = len(vertices) maj_vote = colorcode_vertices(coords, vertices, vertex_labels, k=k, return_color=False, nb_cpus=self.nb_cpus) return maj_vote
[docs] def get_spine_compartments(self, semseg_key: str = 'spiness', k: int = 1, min_spine_cc_size: Optional[int] = None, dest_folder: Optional[str] = None) \ -> Tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray]: """ Retrieves connected components of vertex spine predictions using a specified semantic segmentation key and k number of nearest neighbors for majority label vote. If a connected component has a minimum number of vertices specified by min_spine_cc_size, it's considered valid. If dest_folder is specified, the mean location and size of the head and neck components are stored as numpy array files in this folder. Args: semseg_key: The key of the semantic segmentation used for spine prediction. k: Number of nearest neighbors for smoothing of classification results. min_spine_cc_size: Minimum number of vertices to consider a connected component a valid object. dest_folder: Default is None, else provide a path (str) to a folder. The mean location and size of the head and neck connected components will be stored as numpy array file (npy). Returns: A tuple containing arrays of neck locations, neck sizes, head locations, and head sizes. Location and size arrays have the same ordering. """ if min_spine_cc_size is None: min_spine_cc_size = self.config['spines']['min_spine_cc_size'] vertex_labels = self.label_dict('vertex')[semseg_key] vertices = self.mesh[1].reshape((-1, 3)) max_dist = self.config['spines']['min_edge_dist_spine_graph'] g = create_graph_from_coords(vertices, force_single_cc=True, max_dist=max_dist) g_orig = g.copy() for e in g_orig.edges(): l0 = vertex_labels[e[0]] l1 = vertex_labels[e[1]] if l0 != l1: g.remove_edge(e[0], e[1]) log_reps.info("Starting connected components for SSV {}." "".format(self.id)) all_ccs = list(sorted(nx.connected_components(g), key=len, reverse=True)) log_reps.info("Finished connected components for SSV {}." "".format(self.id)) sizes = np.array([len(c) for c in all_ccs]) thresh_ix = np.argmax(sizes < min_spine_cc_size) all_ccs = all_ccs[:thresh_ix] sizes = sizes[:thresh_ix] cc_labels = [] cc_coords = [] for c in all_ccs: curr_v_ixs = list(c) curr_v_l = vertex_labels[curr_v_ixs] curr_v_c = vertices[curr_v_ixs] if len(np.unique(curr_v_l)) != 1: msg = '"get_spine_compartments": Connected component ' \ 'contains multiple labels.' log_reps.error(msg) raise ValueError(msg) cc_labels.append(curr_v_l[0]) cc_coords.append(np.mean(curr_v_c, axis=0)) cc_labels = np.array(cc_labels) cc_coords = np.array(cc_coords) np.random.seed(0) neck_c = (cc_coords[cc_labels == 0] / self.scaling).astype(np.uint64) neck_s = sizes[cc_labels == 0] head_c = (cc_coords[cc_labels == 1] / self.scaling).astype(np.uint64) head_s = sizes[cc_labels == 1] if dest_folder is not None: np.save("{}/neck_coords_ssv{}_k{}_{}_ccsize{}.npy".format( dest_folder, self.id, k, semseg_key, min_spine_cc_size), neck_c) np.save("{}/head_coords_ssv{}_k{}_{}_ccsize{}.npy".format( dest_folder, self.id, k, semseg_key, min_spine_cc_size), head_c) return neck_c, neck_s, head_c, head_s
[docs] def sample_locations(self, force=False, cache=True, verbose=False, ds_factor=None): """ Samples coordinates for rendering views for each SegmentationObject in the SSV. Optionally forces resampling and caches the locations. Args: force (bool): If True, forces resampling of locations. cache (bool): If True, saves the sampled locations in the SSV's attribute dictionary. verbose (bool): If True, logs additional information. ds_factor (float): The downscaling factor used to generate locations. Returns: list of array: Contains sampled coordinates for each SegmentationObject in the SSV. """ if self.version == 'tmp' and cache: cache = False if not force and self._sample_locations is not None: return self._sample_locations if not force: if self.attr_exists("sample_locations"): return self.attr_dict["sample_locations"] if verbose: start = time.time() params = [[sv, {"force": force, 'save': cache, 'ds_factor': ds_factor}] for sv in self.svs] # list of arrays # TODO: currently does not support multiprocessing locs = sm.start_multiprocess_obj("sample_locations", params, nb_cpus=1) # self.nb_cpus) if cache: self.save_attributes(["sample_locations"], [locs]) if verbose: dur = time.time() - start log_reps.debug("Sampling locations from {} SVs took {:.2f}s." " {.4f}s/SV (incl. read/write)".format( len(self.sv_ids), dur, dur / len(self.sv_ids))) return locs
# ------------------------------------------------------------------ EXPORTS
[docs] def pklskel2kzip(self): """ Converts the skeleton stored in a pickle file to a KNOSSOS compatible k.zip file. """ self.load_skeleton() es = self.skeleton["edges"] ns = self.skeleton["nodes"] a = skeleton.SkeletonAnnotation() a.scaling = self.scaling a.comment = "skeleton" for e in es: n0 = skeleton.SkeletonNode().from_scratch(a, ns[e[0]][0], ns[e[0]][1], ns[e[0]][2]) n1 = skeleton.SkeletonNode().from_scratch(a, ns[e[1]][0], ns[e[1]][1], ns[e[1]][2]) a.addNode(n0) a.addNode(n1) a.addEdge(n0, n1) write_skeleton_kzip(self.skeleton_kzip_path, a)
[docs] def write_locations2kzip(self, dest_path: Optional[str] = None): """ Writes the sampled locations to a KNOSSOS annotation file within a k.zip file. Args: dest_path (Optional[str]): The destination path for the k.zip file. If None, the default skeleton k.zip path is used. If provided without '.k.zip' extension, it is appended. Returns: None """ if dest_path is None: dest_path = self.skeleton_kzip_path_views elif not dest_path.endswith('.k.zip'): dest_path += '.k.zip' loc = np.concatenate(self.sample_locations()) new_anno = coordpath2anno(loc, add_edges=False) new_anno.setComment("sample_locations") write_skeleton_kzip(dest_path, [new_anno])
[docs] def mergelist2kzip(self, dest_path: Optional[str] = None): """ Writes the mergelist to a text file within a k.zip file. Args: dest_path (Optional[str]): The destination path for the k.zip file. If None, the default skeleton k.zip path is used. Returns: None """ if len(self.attr_dict) == 0: self.load_attr_dict() kml = knossos_ml_from_sso(self) if dest_path is None: dest_path = self.skeleton_kzip_path write_txt2kzip(dest_path, kml, "mergelist.txt")
[docs] def mesh2kzip(self, dest_path: Optional[str] = None, obj_type: str = "sv", ext_color: Optional[np.ndarray] = None, **kwargs): """ Writes the mesh of a specified object type to a k.zip file as a .ply file. Args: dest_path (Optional[str]): The destination path for the k.zip file. If None, the default skeleton k.zip path is used. obj_type (str): The type of object to retrieve the mesh for. Options include 'sv' for cell surface, 'mi' for mitochondria, 'vc' for vesicle clouds, and 'sj' for synaptic junctions. ext_color (Optional[np.ndarray]): An optional color array to apply to the mesh. If scalar, it must be an integer between 0 and 255. If array, it must be of type uint/int and of shape (N, 4) where N is the number of vertices of the SSV cell surface mesh: N = len(self.mesh[1].reshape((-1, 3))). Returns: None """ color = None if dest_path is None: dest_path = self.skeleton_kzip_path # TODO: revisit re-definition of `obj_type` to 'sj'. if obj_type == "syn_ssv": mesh = self.syn_ssv_mesh # also store it as 'sj' s.t. `init_sso_from_kzip` can use it for rendering. # TODO: add option to rendering code which enables rendering of arbitrary cell organelles obj_type = 'sj' else: mesh = self.load_mesh(obj_type) if ext_color is not None: if type(ext_color) is list: ext_color = np.array(ext_color) if np.isscalar(ext_color) and ext_color == 0: color = None elif np.isscalar(ext_color): color = ext_color elif type(ext_color) is np.ndarray: if ext_color.ndim != 2: msg = "'ext_color' is numpy array of dimension {}." \ " Only 2D arrays are allowed.".format(ext_color.ndim) log_reps.error(msg) raise ValueError(msg) if ext_color.shape[1] == 3: # add alpha channel alpha_sh = (len(ext_color), 1) alpha_arr = (np.ones(alpha_sh) * 255).astype(ext_color.dtype) ext_color = np.concatenate([ext_color, alpha_arr], axis=1) color = ext_color.flatten() write_mesh2kzip(dest_path, mesh[0], mesh[1], mesh[2], color, ply_fname=obj_type + ".ply", **kwargs)
[docs] def meshes2kzip(self, dest_path: Optional[str] = None, sv_color: Optional[np.ndarray] = None, synssv_instead_sj: bool = True, object_types: Optional[List[str]] = None, **kwargs): """ Writes multiple object meshes including SV, mito, vesicle cloud, and synaptic junction to a k.zip file. Args: dest_path (Optional[str]): The destination path for the k.zip file. If not provided, the default path is used. sv_color (Optional[np.ndarray]): An optional color array with RGBA values to apply to the supervoxel mesh. If not provided, default values are used (see :func:`~mesh2kzip`). synssv_instead_sj (bool): If True, uses 'syn_ssv' objects instead of 'sj' for synaptic junctions. object_types (Optional[List[str]]): A list of object types to export. Defaults to ['sj', 'vc', 'mi', 'sv']. Returns: None """ if dest_path is None: dest_path = self.skeleton_kzip_path if object_types is None: object_types = ["sj", "vc", "mi", "sv"] for ot in object_types: # determines rendering order in KNOSSOS if ot == "sj" and synssv_instead_sj: ot = 'syn_ssv' self.mesh2kzip(obj_type=ot, dest_path=dest_path, ext_color=sv_color if ot == "sv" else None, **kwargs)
[docs] def mesh2file(self, dest_path=None, center=None, color=None, scale=None, obj_type='sv'): """ Writes a mesh to a file (e.g. .ply, .stl, .obj) using the 'openmesh' library. If possible, the function writes it in binary format. Args: dest_path (str): The destination path for the mesh file. center (Optional[np.ndarray]): Scaled center coordinates in nanometers. color (Optional[np.ndarray]): Either a single color (1D; will be applied to all vertices) or a per-vertex color array (2D). scale (Optional[float]): A scaling factor to apply to vertex locations after centering. obj_type (str): Defines the object type used for loading the mesh with :func:`~load_mesh`. Returns: None """ mesh2obj_file(dest_path, self.load_mesh(obj_type), center=center, color=color, scale=scale)
[docs] def export2kzip(self, dest_path: str, attr_keys: Iterable[str] = ('skeleton',), rag: Optional[nx.Graph] = None, sv_color: Optional[np.ndarray] = None, individual_sv_meshes: bool = True, object_meshes: Optional[tuple] = None, synssv_instead_sj: bool = True): """ Writes the SuperSegmentationObject to a KNOSSOS loadable k.zip file. This includes the mergelist, its meshes, dataset specific information and extra data (`attr_keys`). The range of values for the exported object lies within 0 to 255. The saved SSO can also be reloaded as an SSO instance. Todo: * Switch to .json format for storing meta information. Notes: Will not invoke :func:`~load_attr_dict`. Args: dest_path (str): The destination path for the k.zip file. attr_keys (Iterable[str]): Currently allowed: 'sample_locations', 'skeleton', 'attr_dict', 'rag'. rag (Optional[nx.Graph]): The region adjacency graph of the SuperSegmentationObject. sv_color (Optional[np.ndarray]): Supervoxel colors. Array with RGBA (0...255) values or None to use default values. individual_sv_meshes (bool): If True, exports individual supervoxel meshes. object_meshes (Optional[tuple]): Default to subcellular organelles defined in config.yml ('process_cell_organelles'). synssv_instead_sj (bool): If True, uses 'syn_ssv' objects instead of 'sj'. Returns: None """ # # The next two calls are deprecated but might be useful at some point # self.save_skeleton_to_kzip(dest_path=dest_path) # self.save_objects_to_kzip_sparse(["mi", "sj", "vc"], # dest_path=dest_path) if not dest_path.endswith('.k.zip'): dest_path += '.k.zip' if os.path.isfile(dest_path): raise FileExistsError(f'k.zip file already exists at "{dest_path}".') tmp_dest_p = [] target_fnames = [] attr_keys = list(attr_keys) if 'rag' in attr_keys: if rag is None and not os.path.isfile(self.edgelist_path): log_reps.warn("Could not find SV graph of SSV {}. Please" " pass `sv_graph` as kwarg.".format(self)) else: tmp_dest_p.append('{}_rag.bz2'.format(dest_path)) target_fnames.append('rag.bz2') if rag is None: rag = self.sv_graph_uint nx.write_edgelist(rag, tmp_dest_p[-1]) attr_keys.remove('rag') if object_meshes is None: object_meshes = list(self.config['process_cell_organelles']) + ['sv', 'syn_ssv'] else: object_meshes = list(object_meshes) allowed_attributes = ('sample_locations', 'skeleton', 'attr_dict') for attr in attr_keys: if attr not in allowed_attributes: raise ValueError('Invalid attribute specified. Currently suppor' 'ted attributes for export: {}'.format(allowed_attributes)) if attr == 'skeleton' and self.skeleton is None: self.load_skeleton() tmp_dest_p.append('{}_{}.pkl'.format(dest_path, attr)) target_fnames.append('{}.pkl'.format(attr)) sso_attr = getattr(self, attr) if hasattr(sso_attr, '__call__'): sso_attr = sso_attr() write_obj2pkl(tmp_dest_p[-1], sso_attr) # always write meta dict tmp_dest_p.append('{}_{}.pkl'.format(dest_path, 'meta')) target_fnames.append('{}.pkl'.format('meta')) write_obj2pkl(tmp_dest_p[-1], {'version_dict': self.version_dict, 'scaling': self.scaling, 'working_dir': self.working_dir, 'sso_id': self.id}) # write all data data2kzip(dest_path, tmp_dest_p, target_fnames) if individual_sv_meshes and 'sv' in object_meshes: object_meshes.remove('sv') self.write_svmeshes2kzip(dest_path, force_overwrite=False) self.meshes2kzip(dest_path=dest_path, sv_color=sv_color, force_overwrite=False, synssv_instead_sj=synssv_instead_sj, object_types=object_meshes) self.mergelist2kzip(dest_path=dest_path) if 'skeleton' in attr_keys: self.save_skeleton_to_kzip(dest_path=dest_path)
[docs] def typedsyns2mesh(self, dest_path: Optional[str] = None, rewrite: bool = False): """ Generates typed meshes for 'syn_ssv' and saves them at :py:attr:`~mesh_dc_path` (keys: ``'syn_ssv_sym'`` and ``'syn_ssv_asym'``), and writes them to an optional `dest_path` (if provided). The meshes can be accessed with the respective keys via :py:attr:`~load_mesh`. Synapse types are identified in the 'syn_ssv' AttributeDicts and handled as follows: * excitatory / asymmetric: 1 * inhibitory / symmetric: -1 Args: dest_path (Optional[str]): An optional output path for the synapse meshes. rewrite (bool): If set to True, disregards existing meshes in :py:attr:`~_meshes` or at :py:attr:`~mesh_dc_path` and overwrites them. Returns: None """ if not rewrite and self.mesh_exists('syn_ssv_sym') and self.mesh_exists('syn_ssv_asym') \ and not self.version == "tmp": return syn_signs = load_so_attr_bulk(self.syn_ssv, 'syn_sign', use_new_subfold=self.config.use_new_subfold) sym_syns = [] asym_syns = [] for syn in self.syn_ssv: syn_sign = syn_signs[syn.id] if syn_sign == -1: sym_syns.append(syn) elif syn_sign == 1: asym_syns.append(syn) else: raise ValueError(f'Unknown synapse sign {syn_sign}.') sym_syn_mesh = list(merge_someshes(sym_syns, use_new_subfold=self.config.use_new_subfold)) asym_syn_mesh = list(merge_someshes(asym_syns, use_new_subfold=self.config.use_new_subfold)) if self.version is not "tmp": mesh_dc = MeshStorage(self.mesh_dc_path, read_only=False, disable_locking=not self.enable_locking) mesh_dc['syn_ssv_sym'] = sym_syn_mesh mesh_dc['syn_ssv_asym'] = asym_syn_mesh mesh_dc.push() self._meshes['syn_ssv_sym'] = sym_syn_mesh self._meshes['syn_ssv_asym'] = asym_syn_mesh if dest_path is None: return # TODO: add appropriate ply fname and/or comment write_mesh2kzip(dest_path, asym_syn_mesh[0], asym_syn_mesh[1], asym_syn_mesh[2], color=np.array((240, 50, 50, 255)), ply_fname='10.ply') write_mesh2kzip(dest_path, sym_syn_mesh[0], sym_syn_mesh[1], sym_syn_mesh[2], color=np.array((50, 50, 240, 255)), ply_fname='11.ply')
[docs] def write_svmeshes2kzip(self, dest_path: Optional[str] = None, **kwargs): """ Writes individual cell supervoxel ('sv') meshes in ply format to a k.zip file. Args: dest_path (Optional[str]): Target file name for the k.zip file. Returns: None """ if dest_path is None: dest_path = self.skeleton_kzip_path inds, verts, norms, cols, ply_fnames = [], [], [], [], [] for sv in self.svs: inds.append(sv.mesh[0]) verts.append(sv.mesh[1]) norms.append(sv.mesh[2]) cols.append(None) ply_fnames.append(f"sv_{sv.id}.ply") write_meshes2kzip(dest_path, inds, verts, norms, cols, ply_fnames=ply_fnames, **kwargs)
def _svattr2mesh(self, dest_path, attr_key, cmap, normalize_vals=False): """ Writes a mesh colored by a supervoxel attribute to a k.zip file. Args: dest_path (str): The destination path for the k.zip file. attr_key (str): The key of the supervoxel attribute to use for coloring. cmap: A colormap to apply to the attribute values. normalize_vals (bool): If True, normalizes the attribute values before coloring. Returns: None """ sv_attrs = np.array([sv.lookup_in_attribute_dict(attr_key).squeeze() for sv in self.svs]) if normalize_vals: min_val = sv_attrs.min() sv_attrs -= min_val sv_attrs /= sv_attrs.max() ind, vert, norm, col = merge_someshes(self.svs, color_vals=sv_attrs, cmap=cmap, use_new_subfold=self.config.use_new_subfold) write_mesh2kzip(dest_path, ind, vert, norm, col, "%s.ply" % attr_key)
[docs] def svprobas2mergelist(self, key="glia_probas", dest_path=None): """ Writes the supervoxel probabilities to a mergelist within a k.zip file. Args: key (str): The key of the supervoxel probabilities to write. dest_path (Optional[str]): The destination path for the k.zip file. Returns: None """ if dest_path is None: dest_path = self.skeleton_kzip_path coords = np.array([sv.rep_coord for sv in self.svs]) sv_comments = ["%s; %s" % (str(np.mean(sv.attr_dict[key], axis=0)), str(sv.attr_dict[key]).replace('\n', '')) for sv in self.svs] kml = knossos_ml_from_svixs([sv.id for sv in self.svs], coords, comments=sv_comments) write_txt2kzip(dest_path, kml, "mergelist.txt")
def _pred2mesh(self, pred_coords: np.ndarray, preds: np.ndarray, ply_fname: Optional[str] = None, dest_path: Optional[str] = None, colors: Optional[Union[tuple, np.ndarray, list]] = None, k: int = 1, **kwargs): """ Writes a mesh with predictions to a k.zip file or returns the mesh components. If either `dest_path` or `ply_fname` is None, the indices, vertices, and colors are returned. Otherwise, the Mesh is written to a k.zip file as specified. Args: pred_coords (np.ndarray): The coordinates of the predictions, scaled to nanometers (N x 3). preds (np.ndarray): The label array for the predictions (N x 1). ply_fname (Optional[str]): The file name for the .ply file. dest_path (Optional[str]): The destination path for the k.zip file. colors (Optional[Union[tuple, np.ndarray, list]]): Color for each possible prediction value (range(np.max(preds))). k (int): The number of nearest neighbors for averaging predictions. **kwargs: Keyword arguments passed to `colorcode_vertices`. Returns: None or a tuple of mesh components (indices, vertices, colors) i.e. [np.array, np.array, np.array]. """ if ply_fname is not None and not ply_fname.endswith(".ply"): ply_fname += ".ply" if dest_path is not None and ply_fname is None: msg = "Specify 'ply_fanme' in order to save colored " \ "mesh to k.zip." log_reps.error(msg) raise ValueError(msg) mesh = self.mesh col = colorcode_vertices(mesh[1].reshape((-1, 3)), pred_coords, preds, colors=colors, k=k, **kwargs) if dest_path is None: return mesh[0], mesh[1], col else: write_mesh2kzip(dest_path, mesh[0], mesh[1], mesh[2], col, ply_fname=ply_fname) # --------------------------------------------------------------------- GLIA
[docs] def gliaprobas2mesh(self, dest_path=None, pred_key_appendix=""): """ Writes glia probabilities as a mesh to a k.zip file. Args: dest_path (Optional[str]): The destination path for the k.zip file. pred_key_appendix (str): An appendix to the prediction key. Returns: None """ if dest_path is None: dest_path = self.skeleton_kzip_path_views import seaborn as sns mcmp = sns.diverging_palette(250, 15, s=99, l=60, center="dark", as_cmap=True) self._svattr2mesh(dest_path, "glia_probas" + pred_key_appendix, cmap=mcmp)
[docs] def gliapred2mesh(self, dest_path=None, thresh=None, pred_key_appendix=""): """ Writes glia predictions as a mesh to a k.zip file. Args: dest_path (Optional[str]): The destination path for the k.zip file. thresh (Optional[float]): The threshold for classifying glia. pred_key_appendix (str): An appendix to the prediction key. Returns: None """ if thresh is None: thresh = self.config['glia']['glia_thresh'] astrocyte_svs = [sv for sv in self.svs if sv.glia_pred(thresh, pred_key_appendix) == 1] nonastrocyte_svs = [sv for sv in self.svs if sv.glia_pred(thresh, pred_key_appendix) == 0] if dest_path is None: dest_path = self.skeleton_kzip_path_views mesh = merge_someshes(astrocyte_svs, use_new_subfold=self.config.use_new_subfold) neuron_mesh = merge_someshes(nonastrocyte_svs, use_new_subfold=self.config.use_new_subfold) write_meshes2kzip(dest_path, [mesh[0], neuron_mesh[0]], [mesh[1], neuron_mesh[1]], [mesh[2], neuron_mesh[2]], [None, None], ["glia_%0.2f.ply" % thresh, "nonglia_%0.2f.ply" % thresh])
[docs] def gliapred2mergelist(self, dest_path=None, thresh=None, pred_key_appendix=""): """ Writes glia predictions to a mergelist within a k.zip file. Args: dest_path (Optional[str]): The destination path for the k.zip file. thresh (Optional[float]): The threshold for classifying glia. pred_key_appendix (str): An appendix to the prediction key. Returns: None """ if thresh is None: thresh = self.config['glia']['glia_thresh'] if dest_path is None: dest_path = self.skeleton_kzip_path_views params = [[sv, ] for sv in self.svs] coords = sm.start_multiprocess_obj("rep_coord", params, nb_cpus=self.nb_cpus) coords = np.array(coords) params = [[sv, {"thresh": thresh, "pred_key_appendix": pred_key_appendix}] for sv in self.svs] glia_preds = sm.start_multiprocess_obj("glia_pred", params, nb_cpus=self.nb_cpus) glia_preds = np.array(glia_preds) glia_comments = ["%0.4f" % gp for gp in glia_preds] kml = knossos_ml_from_svixs([sv.id for sv in self.svs], coords, comments=glia_comments) write_txt2kzip(dest_path, kml, "mergelist.txt")
[docs] def gliasplit(self, recompute=False, thresh=None, verbose=False, pred_key_appendix=""): """ Splits the supervoxels into glia and non-glia based on predictions. Args: recompute (bool): If True, recomputes the split even if it already exists. thresh (Optional[float]): The threshold for classifying glia. verbose (bool): If True, logs additional information. pred_key_appendix (str): An appendix to the prediction key. Returns: None """ astrocyte_svs_key = "astrocyte_svs" + pred_key_appendix neuron_svs_key = "neuron_svs" + pred_key_appendix if thresh is None: thresh = self.config['glia']['glia_thresh'] if recompute or not (self.attr_exists(astrocyte_svs_key) and self.attr_exists(neuron_svs_key)): if verbose: log_reps.debug("Splitting glia in SSV {} with {} SV's.".format( self.id, len(self.sv_ids))) start = time.time() nonglia_ccs, astrocyte_ccs = split_glia(self, thresh=thresh, pred_key_appendix=pred_key_appendix) if verbose: log_reps.debug("Splitting glia in SSV %d with %d SV's finished " "after %.4gs." % (self.id, len(self.sv_ids), time.time() - start)) non_glia_ccs_ixs = [[so.id for so in nonglia] for nonglia in nonglia_ccs] astrocyte_ccs_ixs = [[so.id for so in glia] for glia in astrocyte_ccs] self.attr_dict[astrocyte_svs_key] = astrocyte_ccs_ixs self.attr_dict[neuron_svs_key] = non_glia_ccs_ixs self.save_attributes([astrocyte_svs_key, neuron_svs_key], [astrocyte_ccs_ixs, non_glia_ccs_ixs]) else: log_reps.critical('Skipping SSO {}, glia splits already exist' '.'.format(self.id))
[docs] def gliasplit2mesh(self, dest_path=None, pred_key_appendix=""): """ Writes the result of glia splitting to meshes in a k.zip file. Args: dest_path (Optional[str]): The destination path for the k.zip file. pred_key_appendix (str): An appendix to the prediction key. Returns: None """ # TODO: adapt writemesh2kzip to work with multiple writes # to same file or use write_meshes2kzip here. astrocyte_svs_key = "astrocyte_svs" + pred_key_appendix neuron_svs_key = "neuron_svs" + pred_key_appendix if dest_path is None: dest_path = self.skeleton_kzip_path_views # write meshes of CC's astrocyte_ccs = self.attr_dict[astrocyte_svs_key] for kk, astrocyte in enumerate(astrocyte_ccs): mesh = merge_someshes([self.get_seg_obj("sv", ix) for ix in astrocyte], use_new_subfold=self.config.use_new_subfold) write_mesh2kzip(dest_path, mesh[0], mesh[1], mesh[2], None, "astrocyte_cc%d.ply" % kk) non_glia_ccs = self.attr_dict[neuron_svs_key] for kk, nonglia in enumerate(non_glia_ccs): mesh = merge_someshes([self.get_seg_obj("sv", ix) for ix in nonglia], use_new_subfold=self.config.use_new_subfold) write_mesh2kzip(dest_path, mesh[0], mesh[1], mesh[2], None, "nonglia_cc%d.ply" % kk)
[docs] def morphembed2mesh(self, dest_path, pred_key='latent_morph', whiten=True): """ Writes a morphology embedding as RGB to a k.zip file. Args: dest_path (str): The destination path for the k.zip file. pred_key (str): The key for the morphology embedding. whiten (bool): If True, applies whitening to the embedding before coloring. Returns: None """ if self.skeleton is None: self.load_skeleton() d = np.array(self.skeleton[pred_key]) if whiten: d -= d.mean(axis=0) eig = _calc_pca_components(d) d_transf = np.dot(d, eig[:, :3]) d_transf -= d_transf.min(axis=0) d_transf /= d_transf.max(axis=0) vert_col = colorcode_vertices(self.mesh[1].reshape((-1, 3)), self.skeleton['nodes'] * self.scaling, np.arange(len(self.skeleton['nodes'])), normalize_img(d_transf)) self.mesh2kzip(dest_path, ext_color=vert_col)
[docs] def write_gliapred_cnn(self, dest_path=None): """ Writes glia predictions from a CNN to a k.zip file. Args: dest_path (Optional[str]): The destination path for the k.zip file. Returns: None """ if dest_path is None: dest_path = self.skeleton_kzip_path_views skel = load_skeleton_kzip(self.skeleton_kzip_path_views)[ "sample_locations"] n_nodes = [n for n in skel.getNodes()] pred_coords = [n.getCoordinate() * np.array(self.scaling) for n in n_nodes] preds = [int(n.data["glia_pred"]) for n in n_nodes] self._pred2mesh(pred_coords, preds, "gliapred.ply", dest_path=dest_path, colors=[[11, 129, 220, 255], [218, 73, 58, 255]])
[docs] def predict_views_gliaSV(self, model, verbose=True, pred_key_appendix=""): """ Predicts glia views for supervoxels using a given model. Args: model: The model used for prediction. verbose (bool): If True, logs additional information. pred_key_appendix (str): An appendix to the prediction key. Returns: None """ if self.version == 'tmp': log_reps.warning('"predict_views_gliaSV" called but this SSV ' 'has version "tmp", results will' ' not be saved to disk.') start = time.time() pred_key = "glia_probas" pred_key += pred_key_appendix # 'tmp'-version: do not write to disk predict_sos_views(model, self.svs, pred_key, nb_cpus=self.nb_cpus, verbose=verbose, woglia=False, raw_only=True, return_proba=self.version == 'tmp') end = time.time() log_reps.debug("Prediction of %d SV's took %0.2fs (incl. read/write). " "%0.4fs/SV" % (len(self.sv_ids), end - start, float(end - start) / len(self.sv_ids)))
# ------------------------------------------------------------------ AXONESS def _load_skelfeatures(self, key): """ Loads skeleton features from the skeleton dictionary. Args: key (str): The key of the skeleton features to load. Returns: The skeleton features if they exist, otherwise None. """ if not self.skeleton: self.load_skeleton() assert self.skeleton is not None, "Skeleton does not exist." if key in self.skeleton: assert len(self.skeleton["nodes"]) == len(self.skeleton[key]), \ "Length of skeleton features is not equal to number of nodes." return self.skeleton[key] else: return None def _save_skelfeatures(self, k, features, overwrite=False): """ Saves skeleton features to the skeleton dictionary. Args: k (str): The key under which to save the features. features: The features to save. overwrite (bool): If True, overwrites existing features with the same key. Returns: None """ if not self.skeleton: self.load_skeleton() assert self.skeleton is not None, "Skeleton does not exist." if k in self.skeleton and not overwrite: raise ValueError("Key {} already exists in skeleton" " feature dict.".format(k)) self.skeleton[k] = features assert len(self.skeleton["nodes"]) == len(self.skeleton[k]), \ "Length of skeleton features is not equal to number of nodes." self.save_skeleton()
[docs] def write_axpred_rfc(self, dest_path=None, k=1): """ Writes axon predictions from an RFC to a k.zip file. Args: dest_path (Optional[str]): The destination path for the k.zip file. k (int): The number of nearest neighbors for averaging predictions. Returns: None """ if dest_path is None: dest_path = self.skeleton_kzip_path if self.load_skeleton(): if not "axoness" in self.skeleton: return False axoness = self.skeleton["axoness"].copy() axoness[self.skeleton["axoness"] == 1] = 0 axoness[self.skeleton["axoness"] == 0] = 1 self._pred2mesh(self.skeleton["nodes"] * self.scaling, axoness, k=k, dest_path=dest_path)
[docs] def skelproperty2mesh(self, key, dest_path=None, k=1): """ Writes a skeleton property to a mesh in a k.zip file. Args: key (str): The key of the skeleton property to write. dest_path (Optional[str]): The destination path for the k.zip file. k (int): The number of nearest neighbors for averaging predictions. Returns: None """ if self.skeleton is None: self.load_skeleton() if dest_path is None: dest_path = self.skeleton_kzip_path self._pred2mesh(self.skeleton["nodes"] * self.scaling, self.skeleton[key], k=k, dest_path=dest_path, ply_fname=key + ".ply")
[docs] def axoness_for_coords(self, coords, radius_nm=4000, pred_type="axoness"): """ Returns the majority label for given coordinates within a specified radius. This method is not limited to axoness, it supports any attribute stored in self.skeleton. Args: coords (np.ndarray): Voxel coordinates. These should be unscaled. radius_nm (float): The search radius specified in nanometers. pred_type (str): The type of prediction to retrieve. Returns: np.ndarray: An array of the same length as coords. Each element corresponds to the majority label within the specified radius for the corresponding coordinate. """ return np.array(self.attr_for_coords(coords, [pred_type], radius_nm))
[docs] def attr_for_coords(self, coords, attr_keys, radius_nm=None, k=1): """ Queries skeleton node attributes at specified coordinates. Any attribute stored in self.skeleton is supported. If a radius_nm is provided, it will assign the majority attribute value. Args: coords (np.array): Unscaled voxel coordinates, format - [N, 3]. radius_nm (Optional[float]): If None, only the attribute of the nearest node is used, otherwise, the majority attribute value is assigned. attr_keys (List[str]): Identifier for the attribute. k (int): Number of nearest neighbors, applicable only if `radius_nm` is None. Returns: list: Returns a list of the same length as coords. For each coordinate in coords, it returns the majority label within radius_nm or [-1] if the Key does not exist. """ if type(attr_keys) is str: attr_keys = [attr_keys] coords = np.array(coords) if self.skeleton is None: self.load_skeleton() if self.skeleton is None or len(self.skeleton["nodes"]) == 0: log_reps.warn("Skeleton did not exist for SSV {} (size: {}; rep. coord.: " "{}).".format(self.id, self.size, self.rep_coord)) return -1 * np.ones((len(coords), len(attr_keys))) # get close locations if k > 1 and len(self.skeleton["nodes"]) < k: log_reps.warn(f'Number of skeleton nodes ({len(self.skeleton["nodes"])}) ' f'is smaller than k={k} in SSO {self.id}. Lowering k.') k = len(self.skeleton["nodes"]) kdtree = scipy.spatial.cKDTree(self.skeleton["nodes"] * self.scaling) if radius_nm is None: _, close_node_ids = kdtree.query(coords * self.scaling, k=k, n_jobs=self.nb_cpus) else: close_node_ids = kdtree.query_ball_point(coords * self.scaling, radius_nm) attr_dc = defaultdict(list) for i_coord in range(len(coords)): curr_close_node_ids = close_node_ids[i_coord] for attr_key in attr_keys: # e.g. for glia SSV axoness does not exist. if attr_key not in self.skeleton: el = -1 if k == 1 else [-1] * k attr_dc[attr_key].append(el) continue # use nodes within radius_nm, there might be multiple node ids if radius_nm is not None: if len(curr_close_node_ids) == 0: dist, curr_close_node_ids = kdtree.query(coords * self.scaling) log_reps.info( "Couldn't find skeleton nodes within {} nm. Using nearest " "one with distance {} nm. SSV ID {}, coordinate at {}." "".format(radius_nm, dist[0], self.id, coords[i_coord])) cls, cnts = np.unique( np.array(self.skeleton[attr_key])[np.array(curr_close_node_ids)], return_counts=True) if len(cls) > 0: attr_dc[attr_key].append(cls[np.argmax(cnts)]) else: log_reps.info("Did not find any skeleton node within {} nm at {}." " SSV {} (size: {}; rep. coord.: {}).".format( radius_nm, i_coord, self.id, self.size, self.rep_coord)) attr_dc[attr_key].append(-1) else: # only nearest node ID attr_dc[attr_key].append(self.skeleton[attr_key][curr_close_node_ids]) # in case latent morphology was not predicted / needed if "latent_morph" in attr_keys: latent_morph = attr_dc["latent_morph"] for i in range(len(latent_morph)): curr_latent = latent_morph[i] if np.isscalar(curr_latent) and curr_latent == -1: curr_latent = np.array([np.inf] * self.config['tcmn']['ndim_embedding']) latent_morph[i] = curr_latent return [np.array(attr_dc[k]) for k in attr_keys]
[docs] def predict_views_axoness(self, model, verbose=False, pred_key_appendix=""): """ Predicts axoness and stores resulting labels at vertex dictionary. Args: model: The prediction model to be used. verbose (bool): If True, additional log information will be printed. pred_key_appendix (str): Additional key to be appended to the prediction key. """ start = time.time() pred_key = "axoness_probas" pred_key += pred_key_appendix if self.version == 'tmp': log_reps.warning('"predict_views_axoness" called but this SSV ' 'has version "tmp", results will' ' not be saved to disk.') try: predict_sos_views(model, self.svs, pred_key, nb_cpus=self.nb_cpus, verbose=verbose, woglia=True, raw_only=False, return_proba=self.version == 'tmp') # do not write to disk except KeyError: log_reps.error("Re-rendering SSV %d (%d SVs), because views are missing." % (self.id, len(self.sv_ids))) self.render_views(add_cellobjects=True, woglia=True, overwrite=True) predict_sos_views(model, self.svs, pred_key, nb_cpus=self.nb_cpus, verbose=verbose, woglia=True, raw_only=False, return_proba=self.version == 'tmp') # do not write to disk) end = time.time() log_reps.debug("Prediction of %d SV's took %0.2fs (incl. read/write). " "%0.4fs/SV" % (len(self.sv_ids), end - start, float(end - start) / len(self.sv_ids)))
[docs] def predict_views_embedding(self, model, pred_key_appendix="", view_key=None): """ Saves a latent vector capturing a local morphology fingerprint for every skeleton node location based on the nearest rendering location. This method requires existing views. For on the fly view rendering, use `view_embedding_of_sso_nocache`. Todo: * Add option for on the fly rendering and call `view_embedding_of_sso_nocache` here. Args: model: The prediction model to be used. pred_key_appendix (str): Additional key to be appended to the prediction key. view_key (str): View identifier, e.g. if views have been pre-rendered and are stored in `self.view_dict`. """ from ..handler.prediction import naive_view_normalization_new pred_key = "latent_morph" pred_key += pred_key_appendix if self.version == 'tmp': log_reps.warning('"predict_views_embedding" called but this SSV ' 'has version "tmp", results will' ' not be saved to disk.') views = self.load_views(view_key=view_key) # [N, 4, 2, y, x] # TODO: add normalization to model - prevent potentially different normalization! views = naive_view_normalization_new(views) # The inference with TNets can be optimzed, via splititng the views into three equally sized parts. inp = (views[:, :, 0], np.zeros_like(views[:, :, 0]), np.zeros_like(views[:, :, 0])) # return dist1, dist2, inp1, inp2, inp3 latent _, _, latent, _, _ = model.predict_proba(inp) # only use first view for now # map latent vecs at rendering locs to skeleton node locations via nearest neighbor self.load_skeleton() if 'view_ixs' not in self.skeleton: hull_tree = spatial.cKDTree(np.concatenate(self.sample_locations())) dists, ixs = hull_tree.query(self.skeleton["nodes"] * self.scaling, n_jobs=self.nb_cpus, k=1) self.skeleton["view_ixs"] = ixs self.skeleton[pred_key] = latent[self.skeleton["view_ixs"]] self.save_skeleton()
[docs] def cnn_axoness2skel(self, **kwargs): """ Maps CNN axoness predictions to the skeleton. Args: **kwargs: Additional keyword arguments. """ locking_tmp = self.enable_locking self.enable_locking = False # all SV operations are read-only # (enable_locking is inherited by sso.svs); # SSV operations not, but SSO file structure is not chunked res = ssh.cnn_axoness2skel(self, **kwargs) self.enable_locking = locking_tmp return res
[docs] def average_node_axoness_views(self, **kwargs): """ Applies a sliding window averaging along the axon predictions stored at the nodes of the skeleton. See :func:`~syconn.reps.super_segmentation_helper._average_node_axoness_views` for more details. This will call :func:`~save_skeleton`. Args: **kwargs: Key word arguments used for :func:`~syconn.reps.super_segmentation_helper._average_node_axoness_views`. """ locking_tmp = self.enable_locking self.enable_locking = False # all SV operations are read-only # (enable_locking is inherited by sso.svs); # SSV operations not, but SSO file structure is not chunked res = ssh.average_node_axoness_views(self, **kwargs) self.save_skeleton() self.enable_locking = locking_tmp return res
[docs] def axoness2mesh(self, dest_path, k=1, pred_key_appendix=''): """ Deprecated. Writes the per-location CMN axon predictions to a kzip file. See :func:`~semseg2mesh`. Args: dest_path (str): Path to the kzip file. k (int): Number of nearest neighbors used for the majority vote. pred_key_appendix (str): Key to load specific predictions. """ ssh.write_axpred_cnn(self, pred_key_appendix=pred_key_appendix, k=k, dest_path=dest_path)
# --------------------------------------------------------------- CELL TYPES
[docs] def predict_celltype_multiview(self, model, pred_key_appendix, model_tnet=None, view_props=None, onthefly_views=False, overwrite=True, model_props=None, verbose: bool = False, save_to_attr_dict: bool = True): """ Infers celltype classification using `model`, optionally includes cell morphology embedding via `model_tnet`. Stores results as ``celltype_cnn_e3`` and ``celltype_cnn_e3_probas`` in the :py:attr:`~attr_dict`. Args: model (nn.Module): The prediction model for celltype classification. pred_key_appendix (str): Additional key to append to the prediction key. model_tnet (Optional[nn.Module]): Optional model for cell morphology embedding. The resulting embedding is stored as ``latent_morph_ct``. view_props (Optional[dict]): Dictionary containing view properties. If None, defaults defined in :py:attr:`~config` will be used. onthefly_views (bool): If True, views will be rendered on-the-fly. overwrite (bool): If True, overwrites existing predictions. model_props (Optional[dict]): Model properties as defined in config.yml. verbose (bool): If True, prints additional log information. save_to_attr_dict (bool): If True, saves predictions in the attribute dictionary. """ if model_props is None: model_props = {} view_props_def = self.config['views']['view_properties'] if view_props is not None: view_props_def.update(view_props) view_props = view_props_def if not onthefly_views: ssh.predict_sso_celltype(self, model, pred_key_appendix=pred_key_appendix, save_to_attr_dict=save_to_attr_dict, overwrite=overwrite, **model_props) else: ssh.celltype_of_sso_nocache(self, model, pred_key_appendix=pred_key_appendix, save_to_attr_dict=save_to_attr_dict, overwrite=overwrite, verbose=verbose, **view_props, **model_props) if model_tnet is not None: view_props = dict(view_props) # create copy if 'use_syntype' in view_props: del view_props['use_syntype'] ssh.view_embedding_of_sso_nocache(self, model_tnet, pred_key_appendix=pred_key_appendix, overwrite=True, **view_props)
[docs] def predict_cell_morphology_pts(self, **kwargs): """ Stores local cell morphology with key 'latent_morph' (+ `pred_key_appendix`) in the SSV skeleton. Args: **kwargs: Additional keyword arguments. """ from syconn.handler.prediction_pts import infere_cell_morphology_ssd ssd_kwargs = dict(working_dir=self.working_dir, config=self.config) ssv_params = [dict(ssv_id=self.id, **ssd_kwargs)] infere_cell_morphology_ssd(ssv_params, **kwargs)
[docs] def render_ortho_views_vis(self, dest_folder=None, colors=None, ws=(2048, 2048), obj_to_render=("sv",)): """ Renders orthogonal views for visualization. Args: dest_folder (Optional[str]): Destination folder for saving images. colors (Optional[dict]): Dictionary specifying colors for rendering. ws (tuple): Window size for rendering views. obj_to_render (tuple): Objects to render. """ multi_view_sso = load_rendering_func('multi_view_sso') if colors is None: colors = {"sv": (0.5, 0.5, 0.5, 0.5), "mi": (0, 0, 1, 1), "vc": (0, 1, 0, 1), "sj": (1, 0, 0, 1)} views = multi_view_sso(self, colors, ws=ws, obj_to_render=obj_to_render) if dest_folder: from scipy.misc import imsave # TODO: use new imageio package for ii, v in enumerate(views): imsave("%s/SSV_%d_%d.png" % (dest_folder, self.id, ii), v) else: return views
[docs] def certainty_celltype(self, pred_key: Optional[str] = None) -> float: """ Estimates the certainty of the celltype prediction. 1. If `is_logit` is True, generates pseudo-probabilities from the input using softmax. 2. Sums the evidence per class and (re-)normalizes. 3. Computes the entropy, scales it with the maximum entropy (equal probabilities) and subtracts it from 1. Note: See :func:`~syconn.handler.prediction.certainty_estimate` Args: pred_key (Optional[str]): Key of classification results (one C-class probability vector for every multi-view sample). ``pred_key + '_probas'`` must exist in :py:attr:`~attr_dict`. Returns: float: Certainty measure based on the entropy of the cell type logits. """ if pred_key is None: pred_key = 'celltype_cnn_e3' cert = self.lookup_in_attribute_dict(pred_key + '_certainty') if cert is not None: return cert logits = self.lookup_in_attribute_dict(pred_key + '_probas') return certainty_estimate(logits, is_logit=True)
[docs] def majority_vote(self, prop_key: str, max_dist: float) -> np.ndarray: """ Smooths property prediction in annotation using a sliding window of 2 times max_dist and a majority vote. Args: prop_key (str): Property to average. max_dist (float): Maximum distance (in nm) for the sliding window used in majority voting. Returns: np.ndarray: Smoothed property predictions. """ assert prop_key in self.skeleton, "Given key does not exist in self.skeleton" prop_array = self.skeleton[prop_key] assert prop_array.squeeze().ndim == 1, "Property array has to be 1D." maj_votes = np.zeros_like(prop_array) for ii in range(len(self.skeleton["nodes"])): paths = nx.single_source_dijkstra_path(self.weighted_graph(), ii, max_dist) neighs = np.array(list(paths.keys()), dtype=np.int64) labels, cnts = np.unique(prop_array[neighs], return_counts=True) maj_label = labels[np.argmax(cnts)] maj_votes[ii] = maj_label return maj_votes
[docs] def shortestpath2soma(self, coordinates: np.ndarray, axoness_key: Optional[str] = None) -> List[float]: """ Computes the shortest path to the soma along the skeleton. Cell compartment predictions must exist in the skeleton's 'axoness_avg10000', as per the 'syconn.exec.exec_inference.run_semsegaxoness_mapping' function. A populated skeleton is required, e.g., via the 'load_skeleton' function. Args: coordinates (np.ndarray): Starting coordinates in voxel coordinates with shape (N, 3). axoness_key (Optional[str]): Key to axon prediction stored in the skeleton. Raises: KeyError: If axon prediction does not exist. Examples: To get the shortest paths between all synapses and the soma, use the following code: from syconn.reps.super_segmentation import * from syconn import global_params global_params.wd = '~/SyConn/example_cube1/' ssd = SuperSegmentationDataset() # get any cell reconstruction ssv = ssd.get_super_segmentation_object(ssd.ssv_ids[0]) # get synapse coordinates in voxels. syns = np.array([syn.rep_coord for syn in ssv.syn_ssv]) shortest_paths = ssv.shortestpath2soma(syns) Returns: List[float]: The shortest path in nanometers for each start coordinate. """ if axoness_key is None: axoness_key = 'axoness_avg{}'.format(self.config['compartments'][ 'dist_axoness_averaging']) nodes = self.skeleton['nodes'] soma_ixs = np.nonzero(self.skeleton[axoness_key] == 2)[0] if np.sum(soma_ixs) == 0: return [np.inf] * len(coordinates) graph = self.weighted_graph(add_node_attr=[axoness_key]) kdt = scipy.spatial.cKDTree(nodes) dists, start_ixs = kdt.query(coordinates, n_jobs=self.nb_cpus) log_reps.debug(f'Computing shortest paths to soma for {len(start_ixs)} ' f'starting nodes.') shortest_paths_of_interest = [] for ix in start_ixs: shortest_paths = nx.single_source_dijkstra_path_length(graph, ix) # get the shortest path to a soma curr_path = np.min([shortest_paths[soma_ix] for soma_ix in soma_ixs]) shortest_paths_of_interest.append(curr_path) return shortest_paths_of_interest
[docs] def path_density_seg_obj(self, obj_type: str, compartments_of_interest: Optional[List[int]] = None, ax_pred_key: str = 'axoness_avg10000') -> float: """ Calculates the average volume per path length of a segmented object. Args: obj_type (str): Key to any available sub-cellular structure. compartments_of_interest (Optional[List[int]]): Which compartments to take into account for calculation. axon: 1, dendrite: 0, soma: 2 ax_pred_key (str): Key of compartment prediction stored in the skeleton, only used if `compartments_of_interest` was set. Returns: float: Average volume per path length (um^3 / um). """ objs = np.array(self.get_seg_objects(obj_type)) if self.skeleton is None: self.load_skeleton() skel = self.skeleton if compartments_of_interest is not None: node_labels = skel[ax_pred_key] node_labels[node_labels == 3] = 1 node_labels[node_labels == 4] = 1 tree = spatial.cKDTree(skel['nodes'] * self.scaling) _, ixs = tree.query(np.array([obj.rep_coord for obj in objs]) * self.scaling, k=1, n_jobs=self.nb_cpus) obj_labels = node_labels[ixs] mask = np.zeros(len(objs), dtype=np.bool) for comp_label in compartments_of_interest: mask = mask | (obj_labels == comp_label) objs = objs[mask] if len(objs) > 0: vx_count = np.sum([obj.size for obj in objs]) else: vx_count = 0 obj_vol = vx_count * np.prod(self.scaling) / 1e9 # in um^3 path_length = self.total_edge_length(compartments_of_interest) / 1e3 # in um if path_length == 0: return 0.0 else: return obj_vol / path_length
# ------------------------------------------------------------------------------ # SO rendering code
[docs]def render_sampled_sos_cc(sos, ws=(256, 128), verbose=False, woglia=True, render_first_only=0, add_cellobjects=True, overwrite=False, cellobjects_only=False, index_views=False, enable_locking=True): """ Renders views at various sampled locations from the combined mesh of all super voxels (SVs). The number of sampled locations depends on the size of the SV mesh, with a scaling factor considered. Args: sos: A list of SegmentationObject instances representing super voxels. ws: A tuple indicating the window size for rendering views. verbose: A bool, if True enables verbose output. woglia: A bool, if True renders view without glia components. render_first_only: An int, indicating the number of initial SVs to render. If 0, all are rendered. add_cellobjects: A bool, if True adds cellular objects to the views. overwrite: A bool, if True overwrites existing views. cellobjects_only: A bool, if True renders only cellular object channels. index_views: A bool, if True enables rendering of index views. enable_locking: A bool, if True enables system locking when writing views. Returns: None """ # initilaize temporary SSO if not overwrite: if render_first_only: if np.all([sos[ii].views_exist(woglia=woglia) for ii in range(render_first_only)]): return else: if np.all([sv.views_exist(woglia=woglia) for sv in sos]): return sso = SuperSegmentationObject(sos[0].id, create=False, enable_locking=False, working_dir=sos[0].working_dir, version="tmp", scaling=sos[0].scaling) sso._objects["sv"] = sos if render_first_only: coords = [sos[ii].sample_locations() for ii in range(render_first_only)] else: coords = sso.sample_locations(cache=False) if add_cellobjects: sso._map_cellobjects(save=False) part_views = np.cumsum([0] + [len(c) for c in coords]) if index_views: views = render_sso_coords_index_views(sso, flatten_list(coords), ws=ws, verbose=verbose) else: views = render_sso_coords(sso, flatten_list(coords), add_cellobjects=add_cellobjects, ws=ws, verbose=verbose, cellobjects_only=cellobjects_only) for i in range(len(coords)): # TODO: write chunked v = views[part_views[i]:part_views[i + 1]] if np.sum(v) == 0 or np.sum(v) == np.prod(v.shape): log_reps.warn("Empty views detected after rendering.", RuntimeWarning) sv_obj = sos[i] sv_obj.save_views(views=v, woglia=woglia, index_views=index_views, cellobjects_only=cellobjects_only, enable_locking=True)
[docs]def render_so(so, ws=(256, 128), add_cellobjects=True, verbose=False): """ Renders super voxel views at given locations without writing views to so.views_path. Args: so: A SegmentationObject instance representing a super voxel ID. ws: A tuple indicating the rendering window size. add_cellobjects: A boolean flag indicating whether to include cellular objects in the views. verbose: A boolean flag for verbose output. Returns: A numpy array containing the rendered views. """ # initilaize temporary SSO for cellobject mapping purposes sso = SuperSegmentationObject(so.id, create=False, working_dir=so.working_dir, version="tmp", scaling=so.scaling) sso._objects["sv"] = [so] coords = sso.sample_locations(cache=False)[0] if add_cellobjects: sso._map_cellobjects(save=False) views = render_sso_coords(sso, coords, ws=ws, add_cellobjects=add_cellobjects, verbose=verbose) return views
[docs]def celltype_predictor(args) -> Iterable: """ Predicts the cell type for a set of super segmentation objects (SSOs) using a specified model. Args: args: A tuple containing the SSO IDs, number of CPUs, and model properties. Returns: An iterable of missing or failed SSO IDs. """ from ..handler.prediction import get_celltype_model_e3 ssv_ids, nb_cpus, model_props = args use_onthefly_views = global_params.config.use_onthefly_views view_props = global_params.config['views']['view_properties'] m = get_celltype_model_e3() missing_ssvs = [] for ix in ssv_ids: ssv = SuperSegmentationObject(ix, working_dir=global_params.config.working_dir) ssv.nb_cpus = nb_cpus ssv._view_caching = True try: ssv.predict_celltype_multiview(m, pred_key_appendix="", onthefly_views=use_onthefly_views, overwrite=True, view_props=view_props, model_props=model_props) except RuntimeError as e: missing_ssvs.append(ssv.id) msg = 'ERROR during celltype prediction of SSV {}. {}'.format(ssv.id, repr(e)) log_reps.error(msg) return missing_ssvs
[docs]def semsegaxoness_predictor(args) -> List[int]: """ Predicts axoness for a set of super segmentation objects (SSOs) and stores the results in the vertex dictionary. Args: args: A tuple containing the SSO IDs, view properties, number of CPUs, mapping properties, prediction key, and maximum distance for mapping. Returns: A list of missing or failed SSO IDs. """ from ..handler.prediction import get_semseg_axon_model ssv_ids, view_props, nb_cpus, map_properties, pred_key, max_dist, bs = args m = get_semseg_axon_model() missing_ssvs = [] for ix in ssv_ids: ssv = SuperSegmentationObject(ix, working_dir=global_params.config.working_dir) ssv.nb_cpus = nb_cpus ssv._view_caching = True try: ssh.semseg_of_sso_nocache(ssv, m, bs=bs, **view_props) semsegaxoness2skel(ssv, map_properties, pred_key, max_dist) except RuntimeError as e: missing_ssvs.append(ssv.id) msg = 'Error during sem. seg. prediction of SSV {}. {}'.format(ssv.id, repr(e)) log_reps.error(msg) del ssv return missing_ssvs
[docs]def semsegaxoness2skel(sso: SuperSegmentationObject, map_properties: dict, pred_key: str, max_dist: int): """ This function populates the skeleton of the provided SuperSegmentationObject (SSO) with axoness predictions and executes a majority vote smoothing operation. It adds two keys to the SSO's skeleton dictionary: - "{}_avg{}".format(pred_key, max_dist): This holds the raw axoness predictions. - "{}_avg{}_comp_maj".format(pred_key, max_dist): This contains the smoothed axoness predictions after the majority vote. Args: sso (SuperSegmentationObject): The SuperSegmentationObject whose skeleton will be populated with axoness predictions. map_properties (dict): The properties that map the vertex predictions to the skeleton nodes. pred_key (str): The key used for retrieving vertex labels and storing the mapped node labels in the skeleton. max_dist (int): The distance used in the majority vote for the smoothing operation. Notes: - If there are no mesh vertices available or there are no nodes, the node predictions will be zero. - The skeleton must be loaded for this function to work. If it's not, the function will attempt to load it. - If the skeleton has no nodes or the SSO doesn't have any mesh vertices, warnings will be logged and the skeleton keys will be filled with zeros. Returns: None: This function modifies the skeleton of the SSO in-place. """ if sso.skeleton is None: sso.load_skeleton() if sso.skeleton is None: log_reps.warning(f"Skeleton of {sso} hdoes not exist.") return if len(sso.skeleton["nodes"]) == 0 or len(sso.mesh[1]) == 0: log_reps.warning(f"Skeleton of {sso} has zero nodes or no mesh vertices.") sso.skeleton["{}_avg{}".format(pred_key, max_dist)] = np.zeros((len(sso.skeleton['nodes']), 1)) sso.skeleton["{}_avg{}_comp_maj".format(pred_key, max_dist)] = np.zeros((len(sso.skeleton['nodes']), 1)) sso.save_skeleton() return # vertex predictions node_preds = sso.semseg_for_coords( sso.skeleton['nodes'], semseg_key=pred_key, **map_properties) # perform average only on axon dendrite and soma predictions nodes_ax_den_so = np.array(node_preds, dtype=np.int32) # set en-passant and terminal boutons to axon class for averaging # bouton labels are stored in node_preds nodes_ax_den_so[nodes_ax_den_so == 3] = 1 nodes_ax_den_so[nodes_ax_den_so == 4] = 1 sso.skeleton[pred_key] = nodes_ax_den_so # average along skeleton, stored as: "{}_avg{}".format(pred_key, max_dist) ssh.majorityvote_skeleton_property(sso, prop_key=pred_key, max_dist=max_dist) # suffix '_avg{}' is added by `_average_node_axoness_views` nodes_ax_den_so = sso.skeleton["{}_avg{}".format(pred_key, max_dist)] # recover bouton predictions within axons and store smoothed result nodes_ax_den_so[(node_preds == 3) & (nodes_ax_den_so == 1)] = 3 nodes_ax_den_so[(node_preds == 4) & (nodes_ax_den_so == 1)] = 4 sso.skeleton["{}_avg{}".format(pred_key, max_dist)] = nodes_ax_den_so # will create a compartment majority voting after removing all soma nodes # the restul will be written to: ``ax_pred_key + "_comp_maj"`` ssh.majority_vote_compartments(sso, "{}_avg{}".format(pred_key, max_dist)) nodes_ax_den_so = sso.skeleton["{}_avg{}_comp_maj".format(pred_key, max_dist)] # recover bouton predictions within axons and store majority result nodes_ax_den_so[(node_preds == 3) & (nodes_ax_den_so == 1)] = 3 nodes_ax_den_so[(node_preds == 4) & (nodes_ax_den_so == 1)] = 4 sso.skeleton["{}_avg{}_comp_maj".format(pred_key, max_dist)] = nodes_ax_den_so sso.save_skeleton()
[docs]def semsegspiness_predictor(args) -> List[int]: """ Predicts spiness for a list of SuperSegmentationObjects and stores the resulting labels in their vertex dictionaries. Args: args (tuple): A tuple containing the following elements: - ssv_ids (List[int]): List of SuperSegmentationObject IDs to process. - view_props (dict): Properties for the view rendering. - nb_cpus (int): Number of CPUs to use for parallel processing. - kwargs_semseg2mesh (dict): Keyword arguments for the semseg2mesh function. - kwargs_semsegforcoords (dict): Keyword arguments for the semseg_for_coords function. Returns: List[int]: List of IDs of SuperSegmentationObjects where prediction has failed. """ from ..handler.prediction import get_semseg_spiness_model m = get_semseg_spiness_model() ssv_ids, view_props, nb_cpus, kwargs_semseg2mesh, kwargs_semsegforcoords = args missing_ssvs = [] for ix in ssv_ids: ssv = SuperSegmentationObject(ix, working_dir=global_params.config.working_dir) ssv.nb_cpus = nb_cpus ssv._view_caching = True try: ssh.semseg_of_sso_nocache(ssv, m, **view_props, **kwargs_semseg2mesh) # map to skeleton ssv.load_skeleton() if ssv.skeleton is None or len(ssv.skeleton["nodes"]) == 0: log_reps.warning(f"Skeleton of SSV {ssv.id} has zero nodes.") node_preds = np.zeros((0, ), dtype=np.int32) else: # vertex predictions node_preds = ssv.semseg_for_coords(ssv.skeleton['nodes'], kwargs_semseg2mesh['semseg_key'], **kwargs_semsegforcoords) ssv.skeleton[kwargs_semseg2mesh['semseg_key']] = node_preds ssv.save_skeleton() except RuntimeError as e: missing_ssvs.append(ssv.id) msg = 'Error during sem. seg. prediction of SSV {}. {}'.format(ssv.id, repr(e)) log_reps.error(msg) return missing_ssvs