Source code for syconn.backend.storage

# -*- coding: utf-8 -*-
# SyConn - Synaptic connectivity inference toolkit
#
# Copyright (c) 2016 - now
# Max Planck Institute of Neurobiology, Martinsried, Germany
# Authors: Philipp Schubert, Sven Dorkenwald, Joergen Kornfeld
import os.path
import shutil
from collections import defaultdict
from typing import Any, Tuple, Optional, Union, List, Iterator, Dict

from ..backend import StorageClass
from ..backend import log_backend
from ..handler.basics import kd_factory
from ..handler.compression import lz4string_listtoarr, arrtolz4string_list

import h5py
import numpy as np

try:
    from lz4.block import compress, decompress
except ImportError:
    from lz4 import compress, decompress


class AttributeDict(StorageClass):
    """
    General-purpose dictionary built on top of ``StorageClass``.

    It is used to keep attributes of segmentation objects. All data lives in
    ``self._dc_intern`` (presumably set up by ``StorageClass`` - confirm
    against the base class). Reading a key that is not stored yet creates an
    empty ``dict`` under that key and returns it.
    """

    def __init__(self, inp_p, **kwargs):
        """
        Create the attribute dictionary.

        Args:
            inp_p: Path where the dictionary is stored; forwarded to the
                base class constructor.
            **kwargs: Forwarded unchanged to the base class constructor.
        """
        super().__init__(inp_p, **kwargs)

    def __getitem__(self, item):
        """
        Return the value stored under ``item``.

        A missing key is not an error: an empty ``dict`` is stored under
        ``item`` and returned instead.

        Args:
            item: Key to look up.

        Returns:
            The stored value, or the newly created empty ``dict``.
        """
        try:
            entry = self._dc_intern[item]
        except KeyError:
            # First access of this key: register an empty attribute dict.
            entry = self._dc_intern[item] = {}
        return entry

    def __setitem__(self, key, value):
        """
        Store ``value`` under ``key``.

        Args:
            key: Key to write.
            value: Value to associate with ``key``.
        """
        self._dc_intern[key] = value

    def update(self, other, **kwargs):
        """
        Merge key-value pairs into the internal dictionary.

        Args:
            other: Mapping (or iterable of pairs) to merge; passed straight
                to the ``update`` method of the internal dictionary.
            **kwargs: Additional key-value pairs, passed along as well.
        """
        self._dc_intern.update(other, **kwargs)

    def copy_intern(self):
        """
        Return a shallow copy of the internal dictionary.

        Returns:
            A new ``dict`` holding the same keys and value objects.
        """
        snapshot = dict(self._dc_intern)
        return snapshot
class CompressedStorage(StorageClass):
    """
    Dictionary-like storage for numpy arrays which are kept lz4-compressed.

    Compression happens on assignment and decompression on access. When
    ``self._cache_decomp`` is set, decompressed arrays are additionally kept
    in ``self._cache_dc`` so repeated reads skip the decompression step.
    (``_dc_intern``, ``_cache_dc`` and ``_cache_decomp`` are presumably
    provided by ``StorageClass`` - confirm against the base class.)
    """

    def __init__(self, inp: str, **kwargs):
        """
        Create the storage.

        Args:
            inp: Path where the dictionary is stored; forwarded to the base
                class constructor.
            **kwargs: Forwarded unchanged to the base class constructor.
        """
        super().__init__(inp, **kwargs)

    def __getitem__(self, item: Union[int, str]):
        """
        Return the decompressed array stored under ``item``.

        The cache is consulted first. On a cache miss the stored record is
        decompressed and, if caching is enabled, added to the cache.

        Args:
            item: Key of the array.

        Returns:
            The decompressed numpy array.
        """
        try:
            return self._cache_dc[item]
        except KeyError:
            # Not cached: fall through and decompress the stored record.
            pass
        record = self._dc_intern[item]
        stored_shape = record["sh"]
        stored_dtype = np.dtype(record["dt"])
        array = lz4string_listtoarr(record["arr"], dtype=stored_dtype,
                                    shape=stored_shape)
        if self._cache_decomp:
            self._cache_dc[item] = array
        return array

    def __setitem__(self, key: Union[int, str], value: np.ndarray):
        """
        Compress ``value`` and store it under ``key``.

        If caching is enabled, the uncompressed array object itself is also
        placed in the cache.

        Args:
            key: Key to write.
            value: Array to store. Must be exactly of type ``np.ndarray``.

        Raises:
            ValueError: If ``value`` is not of type ``np.ndarray``.
        """
        if type(value) is not np.ndarray:
            msg = "CompressedStorage supports np.array values only."
            log_backend.error(msg)
            raise ValueError(msg)
        if self._cache_decomp:
            self._cache_dc[key] = value
        # The first axis is stored as -1; the remaining axes keep their size.
        flexible_shape = list(value.shape)
        flexible_shape[0] = -1
        self._dc_intern[key] = {
            "arr": arrtolz4string_list(value),
            "sh": tuple(flexible_shape),
            "dt": value.dtype.str,
        }

    def __delitem__(self, key):
        """
        Remove ``key`` from the storage and, if present, from the cache.

        Args:
            key: Key to delete.
        """
        del self._dc_intern[key]
        if key in self._cache_dc:
            del self._cache_dc[key]
class VoxelStorageL(StorageClass):
    """
    Dictionary-like storage for lists of lz4-compressed voxel masks.

    Each key maps to a list of voxel masks and a parallel list of offsets.
    Masks are compressed on assignment and decompressed on access. When
    ``self._cache_decomp`` is set, decompressed mask lists are kept in
    ``self._cache_dc`` to save decompression time. (``_dc_intern``,
    ``_cache_dc`` and ``_cache_decomp`` are presumably provided by
    ``StorageClass`` - confirm against the base class.)

    Stored record layout per key::

        {"arr": [compressed mask, ...],
         "sh":  [[-1, dim1, ...], ...],   # first axis stored as -1
         "dt":  dtype string shared by all masks,
         "off": [offset, ...]}
    """

    def __init__(self, inp: str, **kwargs):
        """
        Create the storage.

        Args:
            inp: Path where the dictionary is stored; forwarded to the base
                class constructor.
            **kwargs: Forwarded unchanged to the base class constructor.
        """
        super().__init__(inp, **kwargs)

    def __getitem__(self, item: Union[int, str]):
        """
        Return the voxel masks and offsets stored under ``item``.

        The cache is consulted first. On a miss all masks are decompressed
        and, if caching is enabled, added to the cache.

        Args:
            item: Key of the entry.

        Returns:
            Tuple ``(masks, offsets)``: the list of decompressed masks and the
            stored offsets.
        """
        try:
            return self._cache_dc[item], self._dc_intern[item]["off"]
        except KeyError:
            # Not cached (or not stored): fall through to decompression.
            pass
        value_intern = self._dc_intern[item]
        dt = np.dtype(value_intern["dt"])
        decomp_arrs = [
            lz4string_listtoarr(comp_arr, dt, shape)
            for comp_arr, shape in zip(value_intern["arr"], value_intern["sh"])
        ]
        if self._cache_decomp:
            self._cache_dc[item] = decomp_arrs
        return decomp_arrs, value_intern["off"]

    def __setitem__(self, key: Union[int, str],
                    values: Tuple[List[np.ndarray], List[np.ndarray]]):
        """
        Compress and store voxel masks together with their offsets.

        If caching is enabled, the uncompressed mask list is also cached.

        Args:
            key: Key to write, e.g. a segmentation object ID.
            values: Tuple ``(voxel_masks, offsets)`` of equal length. All
                masks must share one dtype.
        """
        voxel_masks, offsets = values
        assert np.all([voxel_masks[0].dtype == v.dtype for v in voxel_masks])
        assert len(voxel_masks) == len(offsets)
        if self._cache_decomp:
            self._cache_dc[key] = voxel_masks
        # The first axis of every mask is stored as -1.
        shapes = [[-1, *v.shape[1:]] for v in voxel_masks]
        self._dc_intern[key] = {
            "arr": [arrtolz4string_list(v) for v in voxel_masks],
            "sh": shapes,
            "dt": voxel_masks[0].dtype.str,
            "off": offsets,
        }

    def append(self, key: int, voxel_mask: np.ndarray, offset: np.ndarray):
        """
        Append one voxel mask and its offset to an existing entry.

        The mask is compressed and added to the stored record. A cached
        (decompressed) mask list for ``key`` is dropped, because it would no
        longer match the stored offsets; it is rebuilt on the next read.

        Args:
            key: Key of an existing entry.
            voxel_mask: Mask to append; its dtype must equal the dtype of the
                masks already stored.
            offset: Offset belonging to ``voxel_mask``.

        Raises:
            KeyError: If ``key`` is not stored yet.
        """
        value_intern = self._dc_intern[key]
        dt = np.dtype(value_intern["dt"])
        assert dt == voxel_mask.dtype.str
        curr_sh = [-1, *voxel_mask.shape[1:]]
        self._dc_intern[key] = {
            "arr": value_intern["arr"] + [arrtolz4string_list(voxel_mask)],
            "sh": list(value_intern["sh"]) + [curr_sh],
            # Keep the dtype as a string, matching what __setitem__ writes.
            "dt": dt.str,
            # list(...) guards against offsets stored as an ndarray, where
            # `+` would add element-wise instead of concatenating.
            "off": list(value_intern["off"]) + [offset],
        }
        # Invalidate the stale cache entry so that masks and offsets returned
        # by __getitem__ always have the same length.
        if key in self._cache_dc:
            del self._cache_dc[key]
def VoxelStorage(inp, **kwargs):
    """
    Factory alias for :class:`VoxelStorageDyn`.

    Args:
        inp: Path where the dictionary is stored.
        **kwargs: Forwarded unchanged to ``VoxelStorageDyn``.

    Returns:
        The newly created ``VoxelStorageDyn`` instance.
    """
    return VoxelStorageDyn(inp, **kwargs)
class VoxelStorageClass(VoxelStorageL):
    """
    Variant of :class:`VoxelStorageL` that must be used without locking.

    It stores compressed voxel masks exactly like ``VoxelStorageL``; the only
    difference is that an explicitly passed ``disable_locking`` keyword has
    to be truthy.
    """

    def __init__(self, inp: str, **kwargs):
        """
        Create the storage.

        Args:
            inp: Path where the dictionary is stored.
            **kwargs: Forwarded to the base constructor. If
                ``disable_locking`` is given it must be truthy.

        Raises:
            AssertionError: If ``disable_locking`` is passed with a falsy
                value.
        """
        if "disable_locking" in kwargs:
            assert kwargs["disable_locking"], "Locking must be disabled " \
                                              "in this class. Use VoxelStorageL " \
                                              "to enable locking."
        # super(VoxelStorageL, ...) starts the lookup after VoxelStorageL, so
        # VoxelStorageL.__init__ is skipped and its parent's runs directly.
        super(VoxelStorageL, self).__init__(inp, **kwargs)
class VoxelStorageDyn(CompressedStorage):
    """
    Storage that keeps the information needed to query an object's voxels
    instead of the voxels themselves.

    Per object ID the bounding boxes of the object are stored (via
    ``CompressedStorage``). Behaviour depends on ``voxel_mode``:

    * ``voxel_mode=True``: ``__getitem__`` loads the segmentation inside
      every bounding box from the underlying data set and returns binary
      masks (``segmentation == object ID``) plus offsets. ``__setitem__``
      raises ``RuntimeError``. The key has to equal the object ID in the
      segmentation.
    * ``voxel_mode=False``: ``__getitem__`` / ``__setitem__`` read and write
      the bounding boxes. The setter expects an array of shape (N, 2, 3):
      N boxes, each given by its lower and upper 3D coordinate.

    Besides object IDs, the internal dictionary holds the helper entries
    ``'meta'``, ``'size'``, ``'rep_coord'`` and ``'voxel_cache'``.
    """

    def __init__(self, inp: str, voxel_mode: bool = True,
                 voxeldata_path: Optional[str] = None, **kwargs):
        """
        Open or create the storage.

        ``'.pkl'`` is appended to ``inp`` if missing. Missing helper entries
        (``'meta'``, ``'size'``, ``'rep_coord'``, ``'voxel_cache'``) are
        created. A given ``voxeldata_path`` replaces the stored one (a
        warning is logged if they differ). In voxel mode the data set is
        opened with ``kd_factory``. ``self._cache_dc`` is replaced by a
        ``VoxelStorageLazyLoading`` object backed by the matching ``.npz``
        file.

        Args:
            inp: Path of the storage.
            voxel_mode: Operating mode, see class documentation.
            voxeldata_path: Path to the voxel data set. If ``None``, the path
                stored in ``'meta'`` is used.
            **kwargs: Forwarded unchanged to ``CompressedStorage``.

        Raises:
            ValueError: If ``voxel_mode`` is True and no voxel data path is
                given or stored.
        """
        if not inp.endswith('.pkl'):
            inp = inp + '.pkl'
        super().__init__(inp, **kwargs)
        self.voxel_mode = voxel_mode
        if 'meta' not in self._dc_intern:
            # add meta information about underlying voxel data set to internal dictionary
            self._dc_intern['meta'] = dict(voxeldata_path=voxeldata_path)
        if 'size' not in self._dc_intern:
            self._dc_intern['size'] = defaultdict(int)
        if 'rep_coord' not in self._dc_intern:
            self._dc_intern['rep_coord'] = dict()
        if 'voxel_cache' not in self._dc_intern:
            self._dc_intern['voxel_cache'] = dict()
        if voxeldata_path is not None:
            old_p = self._dc_intern['meta']['voxeldata_path']
            new_p = voxeldata_path
            if old_p != new_p:
                log_backend.warn('Overwriting `voxeldata_path` in `VoxelStorageDyn` object (stored at "{}") '
                                 'from `{}` to `{}`.'.format(inp, old_p, new_p))
                self._dc_intern['meta']['voxeldata_path'] = voxeldata_path
        voxeldata_path = self._dc_intern['meta']['voxeldata_path']
        if voxel_mode:
            if voxeldata_path is None:
                msg = '`voxel_mode` is True but no path to voxeldata given / found.'
                log_backend.error(msg)
                raise ValueError(msg)
            kd = kd_factory(voxeldata_path)
            self.voxeldata = kd
        self._cache_dc = VoxelStorageLazyLoading(inp.replace('.pkl', '.npz'))

    def __setitem__(self, key: int, value: Any):
        """
        Store the bounding boxes of an object (only if ``voxel_mode=False``).

        Args:
            key: Object ID.
            value: Value handed to ``CompressedStorage.__setitem__``.

        Raises:
            RuntimeError: If ``voxel_mode`` is True.
        """
        if self.voxel_mode:
            raise RuntimeError('`VoxelStorageDyn.__setitem__` may only be used when `voxel_mode=False`.')
        else:
            return super().__setitem__(key, value)

    def __getitem__(self, item: int):
        """
        Shortcut for :meth:`get_voxelmask_offset` with default overlap.

        Args:
            item: Object ID.

        Returns:
            See :meth:`get_voxelmask_offset`.
        """
        return self.get_voxelmask_offset(item)

    def get_voxelmask_offset(self, item: int, overlap: int = 0):
        """
        Return the masks and offsets (voxel mode) or the stored bounding
        boxes (otherwise) of an object.

        In voxel mode every bounding box is enlarged by ``overlap`` on both
        sides, the segmentation is loaded for it and compared with ``item``.

        Args:
            item: Object ID.
            overlap: Number of voxels added on each side of every bounding
                box before loading.

        Returns:
            Voxel mode: tuple ``(masks, offsets)`` where ``offsets`` is
            ``bbs[:, 0]``, i.e. the lower corners of the stored boxes.
            Otherwise: the value of ``CompressedStorage.__getitem__``.
        """
        if self.voxel_mode:
            res = []
            bbs = super().__getitem__(item)
            for bb in bbs:  # iterate over all bounding boxes
                size = bb[1] - bb[0] + 2 * overlap
                off = bb[0] - overlap
                curr_mask = self.voxeldata.load_seg(size=size, offset=off, mag=1) == item
                # axes 0 and 2 are swapped; presumably zyx -> xyz (confirm
                # against the data set API).
                res.append(curr_mask.swapaxes(0, 2))
            # NOTE(review): masks start at `bb[0] - overlap`, the returned
            # offsets are not shifted by `overlap` - confirm this is intended.
            return res, bbs[:, 0]  # (N, 3) --> all offset
        else:
            return super().__getitem__(item)

    def iter_voxelmask_offset(self, item: int, overlap: int = 0) -> Iterator[Tuple[np.ndarray, np.ndarray]]:
        """
        Lazily yield one ``(mask, offset)`` pair per bounding box.

        Unlike :meth:`get_voxelmask_offset` this does not check
        ``voxel_mode``; ``self.voxeldata`` has to be available.

        Args:
            item: Object ID.
            overlap: Number of voxels added on each side of every bounding
                box before loading.

        Yields:
            Tuple of the binary mask and the lower corner ``bb[0]`` of the
            stored bounding box.
        """
        bbs = super().__getitem__(item)
        for bb in bbs:  # iterate over all bounding boxes
            size = bb[1] - bb[0] + 2 * overlap
            off = bb[0] - overlap
            curr_mask = self.voxeldata.load_seg(size=size, offset=off, mag=1) == item
            yield curr_mask.swapaxes(0, 2), bb[0]

    def object_size(self, item):
        """
        Return the size stored for an object.

        A warning is logged when called with ``voxel_mode=False``.

        Args:
            item: Object ID.

        Returns:
            The entry of ``item`` in the ``'size'`` helper dictionary.

        Raises:
            KeyError: If ``item`` is not a key of the internal dictionary.
        """
        if not self.voxel_mode:
            log_backend.warn('`object_size` sould only be called during `voxel_mode=True`.')
        if item not in self._dc_intern:
            raise KeyError('KeyError: Could not find key "{}" in `self._dc_intern`.`'.format(item))
        return self._dc_intern['size'][item]

    def increase_object_size(self, item, value):
        """
        Add ``value`` to the size stored for an object.

        A warning is logged when called with ``voxel_mode=True``.

        Args:
            item: Object ID.
            value: Amount added to the stored size.
        """
        if self.voxel_mode:
            log_backend.warn('`increase_object_size` sould only be called when `voxel_mode=False`.')
        self._dc_intern['size'][item] += value

    def object_repcoord(self, item):
        """
        Return the representative coordinate stored for an object.

        A warning is logged when called with ``voxel_mode=False``.

        Args:
            item: Object ID.

        Returns:
            The entry of ``item`` in the ``'rep_coord'`` helper dictionary.

        Raises:
            KeyError: If ``item`` is not a key of the internal dictionary or
                has no representative coordinate.
        """
        if not self.voxel_mode:
            log_backend.warn('`object_repcoord` sould only be called when `voxel_mode=True`.')
        if item not in self._dc_intern:
            raise KeyError('KeyError: Could not find key "{}" in `self._dc_intern`.`'.format(item))
        return self._dc_intern['rep_coord'][item]

    def set_object_repcoord(self, item, value):
        """
        Set the representative coordinate of an object.

        A warning is logged when called with ``voxel_mode=True``.

        Args:
            item: Object ID.
            value: Representative coordinate to store.
        """
        if self.voxel_mode:
            log_backend.warn('`set_object_repcoord` sould only be called when `voxel_mode=False`.')
        self._dc_intern['rep_coord'][item] = value

    def push(self):
        """
        Write the voxel cache (if it holds any entry) and then the storage
        itself via the base class ``push``.
        """
        if len(self._cache_dc) > 0:
            self._cache_dc.push()
        super().push()

    def set_voxel_cache(self, key: int, voxel_coords: np.ndarray):
        """
        Store voxel coordinates for ``key`` in the voxel cache.

        This writes to ``self._cache_dc`` only and does not go through
        ``__setitem__``.

        Args:
            key: Object ID.
            voxel_coords: Voxel coordinates to cache.
        """
        self._cache_dc[key] = voxel_coords

    def get_voxel_cache(self, key: int):
        """
        Return the voxel coordinates cached for ``key``.

        The entry must have been added with :meth:`set_voxel_cache`; this
        does not go through :meth:`get_voxeldata`.

        Args:
            key: Object ID.

        Returns:
            The cached voxel coordinates.
        """
        return self._cache_dc[key]

    def get_voxeldata(self, item: int) -> Tuple[List[np.ndarray], List[np.ndarray]]:
        """
        Return masks and offsets of an object regardless of the current mode.

        ``voxel_mode`` is switched to True for the duration of the call and
        restored afterwards, also when an exception is raised. The data set
        is (re-)opened from the stored ``voxeldata_path`` and assigned to
        ``self.voxeldata``.

        Args:
            item: Object ID.

        Returns:
            Tuple of the binary masks and their offsets.

        Raises:
            ValueError: If no ``voxeldata_path`` is stored.
        """
        old_vx_mode = self.voxel_mode
        self.voxel_mode = True
        try:
            if self._dc_intern['meta']['voxeldata_path'] is None:
                msg = '`voxel_mode` is True but no path to' \
                      ' voxeldata given / found.'
                log_backend.error(msg)
                raise ValueError(msg)
            kd = kd_factory(self._dc_intern['meta']['voxeldata_path'])
            self.voxeldata = kd
            return self[item]
        finally:
            # Never leave the object in the temporary mode.
            self.voxel_mode = old_vx_mode

    def get_voxel_data_cubed(self, item: int) -> Tuple[np.ndarray, np.ndarray]:
        """
        Return the voxels of an object as one dense boolean 3D array.

        All masks returned by ``self[item]`` are pasted into a zero array
        spanning from the minimum offset to the maximum extent.

        Args:
            item: Object ID.

        Returns:
            Tuple of the dense boolean array and the minimum offset (the
            origin of the array).
        """
        bin_arrs, block_offsets = self[item]
        block_offsets = np.asarray(block_offsets)
        min_off = np.min(block_offsets, axis=0)
        block_extents = np.array([off + np.array(bin_arr.shape) for bin_arr, off in zip(bin_arrs, block_offsets)],
                                 dtype=np.int32)
        max_extent = np.max(block_extents, axis=0)
        size = max_extent - min_off
        # Shift into a new array: `block_offsets` may be a view into the
        # stored/cached bounding boxes and must not be modified in place.
        rel_offsets = block_offsets - min_off
        # builtin `bool`: the `np.bool` alias was removed in NumPy 1.24.
        voxel_arr = np.zeros(size, dtype=bool)
        for bin_arr, off in zip(bin_arrs, rel_offsets):
            sh = off + np.array(bin_arr.shape, dtype=np.int32)
            voxel_arr[off[0]:sh[0], off[1]:sh[1], off[2]:sh[2]] = bin_arr
        return voxel_arr, min_off

    def get_boundingdata(self, item: int) -> List[np.ndarray]:
        """
        Return the bounding boxes of an object regardless of the current mode.

        ``voxel_mode`` is switched to False for the duration of the call and
        restored afterwards, also when an exception is raised.

        Args:
            item: Object ID.

        Returns:
            The bounding boxes stored for ``item``.
        """
        old_vx_mode = self.voxel_mode
        self.voxel_mode = False
        try:
            return self[item]
        finally:
            # Never leave the object in the temporary mode.
            self.voxel_mode = old_vx_mode

    def keys(self):
        """
        Return the object IDs stored in this storage.

        Helper entries such as ``'meta'`` are skipped: a key is returned if
        it is not a ``str``, or if it is a ``str`` consisting of digits only.

        Returns:
            List of keys.
        """
        # do not return 'meta' and other helper items in self._dc_intern, only object IDs
        # TODO: make this a generator, check usages beforehand!
        return [k for k in self._dc_intern.keys()
                if type(k) is not str or k.isdigit()]
class VoxelStorageLazyLoading:
    """
    Voxel storage backed by a numpy ``.npz`` file.

    Entries are kept in a plain ``dict`` until :meth:`push` writes them to a
    compressed npz file. When an existing file is opened, the object returned
    by ``np.load`` is used as internal store instead; in that case
    :meth:`close` should be called when done. Modifying an npz storage after
    it was written is not supported.

    Only integer keys are supported. They are converted to strings
    internally (npz requires string keys) and back to integers in
    :meth:`keys`.
    """

    def __init__(self, path: str, overwrite: bool = False):
        """
        Open or create the storage.

        Args:
            path: Path of the npz file; ``'.npz'`` is appended if missing.
            overwrite: If True, an existing file at ``path`` is deleted.
                Otherwise an existing file is loaded via :meth:`pull`.
        """
        self.path = path if path.endswith('.npz') else path + '.npz'
        self._dc_intern = {}
        if not os.path.isfile(self.path):
            return
        if overwrite:
            os.remove(self.path)
        else:
            self.pull()

    def pull(self):
        """
        Replace the internal store with the result of ``np.load(self.path)``.
        """
        loaded = np.load(self.path)
        self._dc_intern = loaded

    def push(self):
        """
        Write all entries of the internal store to a compressed npz file at
        ``self.path``.
        """
        np.savez_compressed(self.path, **self._dc_intern)

    def __setitem__(self, key: int, value: np.ndarray):
        """
        Store ``value`` under the string form of ``key``.

        Args:
            key: Segment ID.
            value: Voxel coordinates.
        """
        # npz only allows string keys
        str_key = str(key)
        self._dc_intern[str_key] = value

    def __getitem__(self, item: int) -> np.ndarray:
        """
        Return the entry stored under the string form of ``item``.

        Args:
            item: Supervoxel ID.

        Returns:
            The voxel coordinates stored for ``item``.
        """
        # npz only allows string keys
        str_key = str(item)
        return self._dc_intern[str_key]

    def __contains__(self, item: int) -> bool:
        """
        Check whether the string form of ``item`` is a key of the store.

        Args:
            item: Key to test.

        Returns:
            True if present, False otherwise.
        """
        str_key = str(item)
        return str_key in self._dc_intern

    def __len__(self):
        """
        Returns:
            Number of entries in the internal store.
        """
        store = self._dc_intern
        return len(store)

    def keys(self):
        """
        Yield all keys of the internal store converted to ``int``.

        Returns:
            A generator over the integer keys.
        """
        yield from (int(raw_key) for raw_key in self._dc_intern.keys())

    def close(self):
        """
        Close the internal store if it is an opened npz file; otherwise do
        nothing.
        """
        store = self._dc_intern
        if not isinstance(store, np.lib.npyio.NpzFile):
            return
        store.close()
class MeshStorage(StorageClass):
    """
    Customized dictionary that stores mesh data as (optionally compressed) numpy arrays.

    Compression and decompression happen in the background when items are set or
    retrieved. Decompressed arrays are kept in ``self._cache_dc`` when
    ``self._cache_decomp`` is set, which saves decompression time on repeated access.
    NOTE(review): ``_cache_decomp``, ``_cache_dc`` and ``_dc_intern`` are not defined in
    this class; presumably they are provided by ``StorageClass`` - confirm.
    """

    def __init__(self, inp, load_colarr=False, compress=False, **kwargs):
        """
        Store the mesh specific options and initialize the underlying storage.

        Args:
            inp: Forwarded to ``StorageClass.__init__``.
            load_colarr (bool): If True, :meth:`__getitem__` also returns the
                color/label array. Defaults to False.
            compress (bool): If True, arrays are compressed with
                ``arrtolz4string_list`` when they are set. Defaults to False.
            **kwargs: Forwarded to ``StorageClass.__init__``.
        """
        self.load_colarr = load_colarr
        self.compress = compress
        super().__init__(inp, **kwargs)

    def __getitem__(self, item: Union[int, str]) -> List[np.ndarray]:
        """
        Return the mesh arrays stored under `item`.

        Args:
            item (Union[int, str]): Key of the mesh.

        Returns:
            List[np.ndarray]: ``[indices, vertices, normals]`` and, if
            ``load_colarr`` is True, additionally the colors/labels. A cached entry is
            returned as it was cached.
        """
        try:
            return self._cache_dc[item]
        except KeyError:
            pass
        # work on a copy: reading must not modify the stored entry
        mesh = list(self._dc_intern[item])
        # if no normals were given in file / cache append empty array
        if len(mesh) == 2:
            mesh.append([""])
        # if no colors/labels were given in file / cache append empty array
        if len(mesh) == 3:
            mesh.append([""])
        decomp_arrs = [lz4string_listtoarr(mesh[0], dtype=np.uint32),
                       lz4string_listtoarr(mesh[1], dtype=np.float32),
                       lz4string_listtoarr(mesh[2], dtype=np.float32),
                       lz4string_listtoarr(mesh[3], dtype=np.uint8)]
        if not self.load_colarr:
            decomp_arrs = decomp_arrs[:3]
        if self._cache_decomp:
            self._cache_dc[item] = decomp_arrs
        return decomp_arrs

    def __setitem__(self, key: int, mesh: List[np.ndarray]):
        """
        Store the mesh arrays under `key`.

        Missing normals and colors/labels are replaced by empty arrays. The passed
        sequence itself is not modified.

        Args:
            key (int/str): Key of the mesh.
            mesh (List[np.ndarray]): Sequence of numpy arrays containing
                [indices, vertices, normals, colors/labels]; the last two are optional.
        """
        # work on a copy: padding must not alter the caller's sequence (this also
        # allows tuples as input)
        mesh = list(mesh)
        if len(mesh) == 2:
            mesh.append(np.zeros((0,), dtype=np.float32))
        if len(mesh) == 3:
            mesh.append(np.zeros((0,), dtype=np.uint8))
        # NOTE(review): the cache receives all four arrays here, whereas __getitem__
        # caches only three if load_colarr is False - confirm this is intended.
        if self._cache_decomp:
            self._cache_dc[key] = mesh
        # only warn about normals if normals were given at all
        if len(mesh[2]) > 0 and len(mesh[1]) != len(mesh[2]):
            log_backend.warning('Lengths of vertex array and length of normal'
                                ' array differ!')
        # test if lengths of vertex and color array are identical or test
        # if vertex array length is equal to 3x label array length. Arrays are flattened.
        if len(mesh[3]) > 0 and not (len(mesh[1]) == len(mesh[3]) or
                                     len(mesh[1]) == len(mesh[3]) * 3):
            log_backend.warning('Lengths of vertex array and length of color/'
                                'label array differ!')
        if self.compress:
            transf = arrtolz4string_list
        else:
            def transf(x):
                return x
        comp_ind = transf(mesh[0].astype(dtype=np.uint32))
        comp_vert = transf(mesh[1].astype(dtype=np.float32))
        comp_norm = transf(mesh[2].astype(dtype=np.float32))
        comp_col = transf(mesh[3].astype(dtype=np.uint8))
        self._dc_intern[key] = [comp_ind, comp_vert, comp_norm, comp_col]
class SkeletonStorage(StorageClass):
    """
    Storage for skeleton dictionaries.

    The arrays stored under the keys "nodes", "diameters" and "edges" are kept as
    compressed numpy arrays; all further entries of a skeleton dictionary are stored
    uncompressed.
    """

    def __init__(self, inp, **kwargs):
        """
        Initialize the underlying storage.

        Args:
            inp: Forwarded to ``StorageClass.__init__``.
            **kwargs: Forwarded to ``StorageClass.__init__``.
        """
        super().__init__(inp, **kwargs)

    def __getitem__(self, item):
        """
        Return the skeleton dictionary stored under `item`.

        Args:
            item: Key of the skeleton (int or str).

        Returns:
            dict: Skeleton with the decompressed arrays "nodes", "diameters" and
            "edges", plus all additionally stored attributes.
        """
        try:
            return self._cache_dc[item]
        except KeyError:
            pass
        stored = self._dc_intern[item]
        skeleton = {
            "nodes": lz4string_listtoarr(stored[0], dtype=np.uint32),
            "diameters": lz4string_listtoarr(stored[1], dtype=np.float32),
            "edges": lz4string_listtoarr(stored[2], dtype=np.uint32),
        }
        # a fourth element holds the uncompressed extra attributes
        if len(stored) > 3:
            for attr_name, attr_value in stored[3].items():
                skeleton[attr_name] = attr_value
        if self._cache_decomp:
            self._cache_dc[item] = skeleton
        return skeleton

    def __setitem__(self, key, skeleton):
        """
        Store a skeleton dictionary under `key`.

        Args:
            key (int/str): Key of the skeleton.
            skeleton (dict): Dictionary with the arrays "nodes", "diameters" and
                "edges"; any other entries are stored uncompressed.
        """
        if self._cache_decomp:
            self._cache_dc[key] = skeleton
        compressed = [
            arrtolz4string_list(skeleton["nodes"].astype(dtype=np.uint32)),
            arrtolz4string_list(skeleton["diameters"].astype(dtype=np.float32)),
            arrtolz4string_list(skeleton["edges"].astype(dtype=np.uint32)),
        ]
        extras = {}
        if len(skeleton) > 3:
            extras.update((name, value) for name, value in skeleton.items()
                          if name not in ('nodes', 'diameters', 'edges'))
        self._dc_intern[key] = compressed + [extras]
class BinarySearchStore:
    """
    A data structure to store properties (values) of a corresponding ID array (keys).

    Keys and values are stored sorted by ID and split into shards inside an h5 file.
    Look-ups use a binary search on the sorted IDs, which enables sparse look-ups with a
    much lower memory complexity than python dictionaries. The maximum ID is the last
    element of the :attr:`~id_array` attribute.

    Args:
        fname (str): The file name.
        id_array (Optional[np.ndarray]): An unsorted ID array. If None, an existing
            store at `fname` is used.
        attr_arrays (Optional[Dict[str, np.ndarray]]): Unsorted attribute arrays, must
            have the same ordering as the ID array.
        overwrite (bool): If True, overwrite an existing file. Defaults to False.
        n_shards (Optional[int]): The number of shards/chunks the ID and attribute
            arrays are split into. None (default) means 5.
        rdcc_nbytes (int): Passed to ``h5py.File`` as ``rdcc_nbytes``. Default is 5 MiB.
    """

    def __init__(self, fname: str, id_array: Optional[np.ndarray] = None,
                 attr_arrays: Optional[Dict[str, np.ndarray]] = None,
                 overwrite: bool = False, n_shards: Optional[int] = None,
                 rdcc_nbytes: int = 5 * 2 ** 20):
        """
        Create a new store from `id_array` and `attr_arrays`, or refer to an existing one.

        Args:
            fname: File name.
            id_array: (Unsorted) ID array. If None, no file is written and `fname` must
                already exist.
            attr_arrays: (Unsorted) attribute arrays, must have the same ordering as
                the ID array.
            overwrite: Overwrite an existing file.
            n_shards: Number of shards/chunks the ID and attribute arrays are split
                into. Defaults to 5. At most one shard per ID is created.
            rdcc_nbytes: Passed to ``h5py.File`` as ``rdcc_nbytes``. Default is 5 MiB.

        Raises:
            ValueError: If `id_array` is given without `attr_arrays`, or if `id_array`
                is empty.
            FileExistsError: If a file exists at `fname` and `overwrite` is False.
            FileNotFoundError: If `id_array` is None and no file exists at `fname`.
        """
        self.fname = fname
        # handle of the store file; only set while `get_attributes` is running
        self._h5_file = None
        if id_array is None:
            if isinstance(fname, str) and not os.path.isfile(fname):
                raise FileNotFoundError(f'Could not find BinarySearchStore at "{self.fname}".')
            return
        if attr_arrays is None:
            raise ValueError('ID array is given, but no attribute array(s).')
        id_array = np.asarray(id_array)
        if len(id_array) == 0:
            # fail before touching the file system; an empty store has no bucket ranges
            raise ValueError('ID array is empty.')
        if isinstance(fname, str):
            if os.path.isfile(fname):
                if not overwrite:
                    raise FileExistsError(f'BinarySearchStore at "{fname}" already exists and overwrite is False."')
                os.remove(fname)
            # a bare file name has no directory part; os.makedirs('') would raise
            dirname = os.path.dirname(fname)
            if dirname:
                os.makedirs(dirname, exist_ok=True)
        if n_shards is None:
            n_shards = 5
        # np.array_split would create empty shards (which have no first/last ID)
        # if there were more shards than IDs
        n_shards = min(n_shards, len(id_array))
        # sort keys / ID array
        ixs = np.argsort(id_array)
        id_array = id_array[ixs]
        bucket_ranges = []
        # the context manager closes the file even if writing fails
        with h5py.File(fname, 'w', libver='latest', rdcc_nbytes=rdcc_nbytes) as h5_file:
            grp = h5_file.create_group("ids")
            for ii, id_sub in enumerate(np.array_split(id_array, n_shards)):
                # smallest and largest ID of the shard, used to route queries
                bucket_ranges.append((id_sub[0], id_sub[-1]))
                grp.create_dataset(f'{ii}', data=id_sub)
            for k, v in attr_arrays.items():
                # bring the values into the same order as the sorted IDs
                v_sorted = np.asarray(v)[ixs]
                grp = h5_file.create_group(k)
                grp.attrs['shape'] = v_sorted.shape
                grp.attrs['dtype'] = np.dtype(v_sorted.dtype).str
                for ii, attr_sub in enumerate(np.array_split(v_sorted, n_shards)):
                    grp.create_dataset(f'{ii}', data=attr_sub)
            h5_file.attrs['bucket_ranges'] = bucket_ranges

    @property
    def n_shards(self) -> int:
        """
        Returns the number of shards/chunks the ID and attribute arrays are split into.

        Returns:
            int: The number of shards, i.e. the number of stored bucket ranges.
        """
        with h5py.File(self.fname, 'r', libver='latest') as f:
            n_shards = len(f.attrs['bucket_ranges'])
        return n_shards

    @property
    def id_array(self) -> np.ndarray:
        """
        Returns the flat ID array.

        Returns:
            np.ndarray: The IDs of all shards, concatenated in shard order.
        """
        ids = []
        with h5py.File(self.fname, 'r', libver='latest') as f:
            for bucket_id in range(len(f.attrs['bucket_ranges'])):
                ids.append(f[f'ids/{bucket_id}'][()])
        return np.concatenate(ids)

    def _get_bucket_ids(self, obj_ids: np.ndarray) -> np.ndarray:
        """
        Returns the bucket IDs for the given object IDs.

        Requires ``self._h5_file`` to be an open store file.

        Args:
            obj_ids (np.ndarray): The object IDs to get the bucket IDs for.

        Returns:
            np.ndarray: For every object ID the index of the bucket whose ID range
            contains it.

        Raises:
            ValueError: If an object ID is not covered by any bucket range.
        """
        obj_ids = np.asarray(obj_ids)
        # -1 marks IDs that are not covered by any bucket
        bucket_ids = np.full(obj_ids.shape, -1, dtype=np.int32)
        for ii, bucket_range in enumerate(self._h5_file.attrs['bucket_ranges']):
            bucket_ids[(bucket_range[0] <= obj_ids) & (obj_ids <= bucket_range[1])] = ii
        if -1 in bucket_ids:
            raise ValueError(f'IDs {obj_ids[bucket_ids == -1]} not in {self.fname}.')
        return bucket_ids

    def get_attributes(self, obj_ids: np.ndarray, attr_key: str) -> np.ndarray:
        """
        Queries attributes of the given object IDs.

        Note that an ID which lies inside the ID range of a bucket but does not exist
        in the store does not raise an Exception, as the lookup uses binary search; the
        value at its insertion position is returned instead. The same ID may be queried
        more than once.

        Args:
            obj_ids (np.ndarray): The object IDs to query.
            attr_key (str): The value type obtained from the store.

        Returns:
            np.ndarray: The value array, in the order of `obj_ids`.

        Raises:
            KeyError: If `attr_key` does not exist in the store.
            ValueError: If an ID is not covered by any bucket range.
        """
        obj_ids = np.asarray(obj_ids)
        self._h5_file = h5py.File(self.fname, 'r', libver='latest')
        try:
            if attr_key not in self._h5_file.keys():
                raise KeyError(f'Key "{attr_key}" does not exist.')
            bucket_ids = self._get_bucket_ids(obj_ids)
            grp = self._h5_file[f'{attr_key}']
            # result shape: number of queries plus the trailing axes of the stored values
            sh = [len(obj_ids)]
            if len(grp.attrs['shape']) > 1:
                sh += list(grp.attrs['shape'])[1:]
            data = np.zeros(sh, dtype=grp.attrs['dtype'])
            for bucket_id in np.unique(bucket_ids):
                ids = self._h5_file[f'ids/{bucket_id}'][()]
                bucket_mask = bucket_ids == bucket_id
                indices = np.searchsorted(ids, obj_ids[bucket_mask])
                # h5py needs strictly increasing indices: read every index once
                # (np.unique sorts and removes duplicates), then expand the result
                # back to the order of the queries
                uniq_indices, inverse = np.unique(indices, return_inverse=True)
                d = grp[f'{bucket_id}'][list(uniq_indices)]
                data[bucket_mask] = d[inverse]
        finally:
            # always release the handle, also if the key or an ID is invalid
            self._h5_file.close()
            self._h5_file = None
        return data
def bss_get_attr_helper(args):
    """
    Unpack a single argument tuple and query a BinarySearchStore with it.

    Args:
        args: A tuple ``(store, query_ids, attr_key)`` containing a
            BinarySearchStore instance, the IDs to query and an attribute key.

    Returns:
        np.ndarray: The result of ``store.get_attributes(query_ids, attr_key)``.
    """
    store, query_ids, attr_key = args
    result = store.get_attributes(query_ids, attr_key)
    return result