# -*- coding: utf-8 -*-
# SyConn - Synaptic connectivity inference toolkit
#
# Copyright (c) 2016 - now
# Max Planck Institute of Neurobiology, Martinsried, Germany
# Authors: Philipp Schubert, Sven Dorkenwald, Joergen Kornfeld
import os.path
import shutil
from collections import defaultdict
from typing import Any, Tuple, Optional, Union, List, Iterator, Dict
from ..backend import StorageClass
from ..backend import log_backend
from ..handler.basics import kd_factory
from ..handler.compression import lz4string_listtoarr, arrtolz4string_list
import h5py
import numpy as np
try:
from lz4.block import compress, decompress
except ImportError:
from lz4 import compress, decompress
class AttributeDict(StorageClass):
    """
    Dictionary-like storage for attributes of segmentation objects.

    Values are kept uncompressed in the internal dictionary ``_dc_intern``
    that is provided by :class:`StorageClass`. Reading a key that does not
    exist yet creates an empty dictionary for it.
    """

    def __init__(self, inp_p, **kwargs):
        """
        Create the storage; all arguments are forwarded to ``StorageClass``.

        Args:
            inp_p: Path at which the dictionary is stored.
            **kwargs: Additional keyword arguments for ``StorageClass``.
        """
        super().__init__(inp_p, **kwargs)

    def __getitem__(self, item):
        """
        Return the value stored for `item`.

        A missing key is not an error: an empty dictionary is stored for it
        and returned instead.

        Args:
            item: Key to look up.

        Returns:
            The stored value, or the newly created empty dictionary.
        """
        try:
            stored = self._dc_intern[item]
        except KeyError:
            # first access of this key: register an empty attribute dict
            stored = self._dc_intern[item] = {}
        return stored

    def __setitem__(self, key, value):
        """
        Store `value` for `key`.

        Args:
            key: Key of the entry.
            value: Value to store.
        """
        self._dc_intern[key] = value

    def update(self, other, **kwargs):
        """
        Merge key-value pairs into the internal dictionary.

        Args:
            other: Dictionary (or iterable of pairs) to take the entries from.
            **kwargs: Further entries given as keyword arguments.
        """
        self._dc_intern.update(other, **kwargs)

    def copy_intern(self):
        """
        Return a shallow copy of the internal dictionary.

        Returns:
            A new ``dict`` holding the same keys and values.
        """
        snapshot = dict(self._dc_intern)
        return snapshot
class CompressedStorage(StorageClass):
    """
    Dictionary-like storage for numpy arrays that are kept lz4-compressed.

    Compression happens on assignment and decompression on access. If the
    ``cache_decomp`` option of the storage is enabled, decompressed arrays are
    additionally kept in ``_cache_dc`` so repeated reads skip decompression.
    """

    def __init__(self, inp: str, **kwargs):
        """
        Create the storage; all arguments are forwarded to ``StorageClass``.

        Args:
            inp: Path at which the dictionary is stored.
            **kwargs: Additional keyword arguments for ``StorageClass``.
        """
        super().__init__(inp, **kwargs)

    def __getitem__(self, item: Union[int, str]):
        """
        Return the decompressed array stored for `item`.

        The decompressed cache is consulted first. On a miss the entry is
        decompressed and, if caching is enabled, added to the cache.

        Args:
            item: Key of the requested array.

        Returns:
            The decompressed array.
        """
        try:
            return self._cache_dc[item]
        except KeyError:
            pass  # not cached, decompress below
        entry = self._dc_intern[item]
        stored_shape = entry["sh"]
        stored_dtype = np.dtype(entry["dt"])
        array = lz4string_listtoarr(entry["arr"], dtype=stored_dtype,
                                    shape=stored_shape)
        if self._cache_decomp:
            self._cache_dc[item] = array
        return array

    def __setitem__(self, key: Union[int, str], value: np.ndarray):
        """
        Compress `value` and store it for `key`.

        If caching is enabled the uncompressed array is put into the cache
        as well.

        Args:
            key: Key of the entry.
            value: Array to store; must be exactly of type ``np.ndarray``.

        Raises:
            ValueError: If `value` is not a ``np.ndarray``.
        """
        if type(value) is not np.ndarray:
            msg = "CompressedStorage supports np.array values only."
            log_backend.error(msg)
            raise ValueError(msg)
        if self._cache_decomp:
            self._cache_dc[key] = value
        # the first axis is stored as -1 so it is inferred on decompression
        stored_shape = list(value.shape)
        stored_shape[0] = -1
        self._dc_intern[key] = {
            "arr": arrtolz4string_list(value),
            "sh": tuple(stored_shape),
            "dt": value.dtype.str,
        }

    def __delitem__(self, key):
        """
        Remove `key` from the storage and from the decompressed cache.

        Args:
            key: Key of the entry to delete.
        """
        del self._dc_intern[key]
        if key in self._cache_dc:
            del self._cache_dc[key]
class VoxelStorageL(StorageClass):
    """
    Dictionary-like storage for lists of voxel masks with their offsets.

    Every entry consists of a list of numpy arrays (voxel masks), which are
    kept lz4-compressed, and one offset per mask. If the ``cache_decomp``
    option of the storage is enabled, decompressed masks are kept in
    ``_cache_dc`` to save decompression time on repeated access.
    """

    def __init__(self, inp: str, **kwargs):
        """
        Create the storage; all arguments are forwarded to ``StorageClass``.

        Args:
            inp: Path at which the dictionary is stored.
            **kwargs: Additional keyword arguments for ``StorageClass``.
        """
        super().__init__(inp, **kwargs)

    def __getitem__(self, item: Union[int, str]):
        """
        Return the voxel masks and offsets stored for `item`.

        Cached masks are used if available. Otherwise all masks of the entry
        are decompressed and, if caching is enabled, added to the cache.

        Args:
            item: Key of the requested entry.

        Returns:
            Tuple of (list of decompressed voxel masks, offsets).
        """
        try:
            return self._cache_dc[item], self._dc_intern[item]["off"]
        except KeyError:
            pass  # not cached (or unknown key, which raises below)
        value_intern = self._dc_intern[item]
        dt = np.dtype(value_intern["dt"])
        offsets = value_intern["off"]
        decomp_arrs = [lz4string_listtoarr(comp_arr, dt, sh)
                       for comp_arr, sh in zip(value_intern["arr"], value_intern["sh"])]
        if self._cache_decomp:
            self._cache_dc[item] = decomp_arrs
        return decomp_arrs, offsets

    def __setitem__(self, key: Union[int, str],
                    values: Tuple[List[np.ndarray], List[np.ndarray]]):
        """
        Compress and store voxel masks together with their offsets.

        If caching is enabled, the uncompressed masks are put into the cache.

        Args:
            key: Key of the entry, e.g. the object ID.
            values: Tuple of (voxel masks, offsets). All masks must share one
                dtype and there must be exactly one offset per mask.
        """
        voxel_masks, offsets = values
        assert all(voxel_masks[0].dtype == v.dtype for v in voxel_masks)
        assert len(voxel_masks) == len(offsets)
        if self._cache_decomp:
            self._cache_dc[key] = voxel_masks
        # the first axis is stored as -1 so it is inferred on decompression
        sh = [[-1, *v.shape[1:]] for v in voxel_masks]
        self._dc_intern[key] = {"arr": [arrtolz4string_list(v) for v in voxel_masks],
                                "sh": sh, "dt": voxel_masks[0].dtype.str,
                                "off": offsets}

    def append(self, key: int, voxel_mask: np.ndarray, offset: np.ndarray):
        """
        Append one voxel mask and its offset to an existing entry.

        The stored entry is replaced by a new one (the old lists are not
        modified in place). A cached, decompressed version of the entry is
        dropped because it would no longer match the stored offsets.

        Args:
            key: Key of the existing entry to extend.
            voxel_mask: Voxel mask to append; its dtype must match the entry.
            offset: Offset belonging to `voxel_mask`.

        Raises:
            KeyError: If `key` is not present in the storage.
        """
        value_intern = self._dc_intern[key]
        dt = np.dtype(value_intern["dt"])
        assert dt == voxel_mask.dtype
        curr_sh = list(voxel_mask.shape)
        curr_sh[0] = -1
        self._dc_intern[key] = {
            "arr": list(value_intern["arr"]) + [arrtolz4string_list(voxel_mask)],
            "sh": list(value_intern["sh"]) + [curr_sh],
            # keep the dtype as string, identical to what `__setitem__` stores
            "dt": dt.str,
            # `list(...)` guarantees concatenation even if offsets were stored as array
            "off": list(value_intern["off"]) + [offset],
        }
        # invalidate stale cache entry; it is rebuilt on the next access
        if key in self._cache_dc:
            del self._cache_dc[key]
def VoxelStorage(inp, **kwargs):
    """
    Alias that builds a :class:`VoxelStorageDyn`.

    Args:
        inp: Path at which the dictionary is stored.
        **kwargs: Keyword arguments passed on to ``VoxelStorageDyn``.

    Returns:
        The newly created ``VoxelStorageDyn`` instance.
    """
    return VoxelStorageDyn(inp, **kwargs)
class VoxelStorageClass(VoxelStorageL):
    """
    Variant of :class:`VoxelStorageL` that must not be used with locking.

    Storage, compression and the optional ``cache_decomp`` caching are
    inherited from ``VoxelStorageL``. Passing ``disable_locking=False`` is
    rejected; use ``VoxelStorageL`` if locking is required.
    """

    def __init__(self, inp: str, **kwargs):
        """
        Create the storage after checking the locking option.

        Args:
            inp: Path at which the dictionary is stored.
            **kwargs: Additional keyword arguments for ``StorageClass``. If
                ``disable_locking`` is given it has to be truthy.

        Raises:
            AssertionError: If ``disable_locking`` is passed with a falsy value.
        """
        if "disable_locking" in kwargs:
            assert kwargs["disable_locking"], "Locking must be disabled " \
                                              "in this class. Use VoxelStorageL " \
                                              "to enable locking."
        # deliberately starts the MRO lookup after `VoxelStorageL`
        super(VoxelStorageL, self).__init__(inp, **kwargs)
class VoxelStorageDyn(CompressedStorage):
    """
    Storage that keeps the information needed to query the voxels of objects
    instead of the voxels themselves.

    Per object ID an array of bounding boxes (N, 2, 3; lower and upper corner)
    is stored. The behaviour of item access depends on `voxel_mode`:

    * ``voxel_mode=True``: ``__getitem__`` loads the segmentation inside every
      bounding box from the underlying data set and returns binary masks that
      are True where the segmentation equals the key. The key therefore has to
      match the object ID in the segmentation. ``__setitem__`` raises a
      ``RuntimeError``.
    * ``voxel_mode=False``: ``__getitem__`` and ``__setitem__`` read and write
      the bounding boxes of an object.

    Besides the objects, the internal dictionary holds the helper entries
    'meta', 'size', 'rep_coord' and 'voxel_cache'.
    """

    def __init__(self, inp: str, voxel_mode: bool = True,
                 voxeldata_path: Optional[str] = None, **kwargs):
        """
        Open or create the storage.

        '.pkl' is appended to `inp` if missing. Helper entries that do not
        exist yet are created. A given `voxeldata_path` overwrites the path
        stored in 'meta' (a warning is logged if they differ). In voxel mode
        the data set is opened via ``kd_factory``. ``_cache_dc`` is replaced
        by a :class:`VoxelStorageLazyLoading` located next to the pkl file
        (same path with the '.pkl' suffix exchanged for '.npz').

        Args:
            inp: Path of the storage.
            voxel_mode: Mode of operation, see class description.
            voxeldata_path: Path to the underlying voxel data set.
            **kwargs: Additional keyword arguments for ``CompressedStorage``.

        Raises:
            ValueError: If `voxel_mode` is True and no voxel data path is
                given or stored.
        """
        if not inp.endswith('.pkl'):
            inp = inp + '.pkl'
        super().__init__(inp, **kwargs)
        self.voxel_mode = voxel_mode
        if 'meta' not in self._dc_intern:
            # add meta information about underlying voxel data set to internal dictionary
            self._dc_intern['meta'] = dict(voxeldata_path=voxeldata_path)
        if 'size' not in self._dc_intern:
            self._dc_intern['size'] = defaultdict(int)
        if 'rep_coord' not in self._dc_intern:
            self._dc_intern['rep_coord'] = dict()
        if 'voxel_cache' not in self._dc_intern:
            self._dc_intern['voxel_cache'] = dict()
        if voxeldata_path is not None:
            old_p = self._dc_intern['meta']['voxeldata_path']
            new_p = voxeldata_path
            if old_p != new_p:
                log_backend.warning('Overwriting `voxeldata_path` in `VoxelStorageDyn` object (stored at "{}") '
                                    'from `{}` to `{}`.'.format(inp, old_p, new_p))
                self._dc_intern['meta']['voxeldata_path'] = voxeldata_path
        voxeldata_path = self._dc_intern['meta']['voxeldata_path']
        if voxel_mode:
            if voxeldata_path is None:
                msg = '`voxel_mode` is True but no path to voxeldata given / found.'
                log_backend.error(msg)
                raise ValueError(msg)
            self.voxeldata = kd_factory(voxeldata_path)
        # Only exchange the file suffix; `str.replace` would also alter '.pkl'
        # occurring elsewhere in the path (e.g. in a directory name).
        # NOTE(review): the inherited `CompressedStorage.__getitem__` consults
        # `self._cache_dc` first, i.e. this voxel cache - confirm this is intended.
        self._cache_dc = VoxelStorageLazyLoading(inp[:-len('.pkl')] + '.npz')

    def __setitem__(self, key: int, value: Any):
        """
        Store the bounding boxes of an object; only allowed if `voxel_mode` is False.

        Args:
            key: Object ID.
            value: Value handed to ``CompressedStorage.__setitem__``.

        Raises:
            RuntimeError: If `voxel_mode` is True.
        """
        if self.voxel_mode:
            raise RuntimeError('`VoxelStorageDyn.__setitem__` may only be used when `voxel_mode=False`.')
        return super().__setitem__(key, value)

    def __getitem__(self, item: int):
        """
        Shortcut for :meth:`get_voxelmask_offset` with default overlap.

        Args:
            item: Object ID.

        Returns:
            See :meth:`get_voxelmask_offset`.
        """
        return self.get_voxelmask_offset(item)

    def get_voxelmask_offset(self, item: int, overlap: int = 0):
        """
        Return the voxel masks and offsets (voxel mode) or the stored
        bounding boxes (otherwise) of an object.

        Args:
            item: Object ID.
            overlap: Number of voxels by which every bounding box is enlarged
                on each side before the segmentation is loaded.

        Returns:
            If `voxel_mode` is True a tuple of (list of binary masks, array
            with the lower corner of every bounding box), else the value
            returned by ``CompressedStorage.__getitem__``.
        """
        if not self.voxel_mode:
            return super().__getitem__(item)
        res = []
        bbs = super().__getitem__(item)
        for bb in bbs:  # iterate over all bounding boxes
            size = bb[1] - bb[0] + 2 * overlap
            off = bb[0] - overlap
            curr_mask = self.voxeldata.load_seg(size=size, offset=off, mag=1) == item
            res.append(curr_mask.swapaxes(0, 2))
        return res, bbs[:, 0]  # (N, 3) --> all offset

    def iter_voxelmask_offset(self, item: int, overlap: int = 0) -> Iterator[Tuple[np.ndarray, np.ndarray]]:
        """
        Iterate over the voxel masks of an object, one bounding box at a time.

        Args:
            item: Object ID.
            overlap: Number of voxels by which every bounding box is enlarged
                on each side before the segmentation is loaded.

        Yields:
            Tuple of (binary mask, lower corner of the bounding box).
        """
        bbs = super().__getitem__(item)
        for bb in bbs:  # iterate over all bounding boxes
            size = bb[1] - bb[0] + 2 * overlap
            off = bb[0] - overlap
            curr_mask = self.voxeldata.load_seg(size=size, offset=off, mag=1) == item
            yield curr_mask.swapaxes(0, 2), bb[0]

    def object_size(self, item):
        """
        Return the stored size of an object.

        A warning is logged if called while `voxel_mode` is False.

        Args:
            item: Object ID.

        Returns:
            The value stored for `item` in the 'size' entry.

        Raises:
            KeyError: If `item` is not a key of the internal dictionary.
        """
        if not self.voxel_mode:
            log_backend.warning('`object_size` sould only be called during `voxel_mode=True`.')
        if item not in self._dc_intern:
            raise KeyError('KeyError: Could not find key "{}" in `self._dc_intern`.`'.format(item))
        return self._dc_intern['size'][item]

    def increase_object_size(self, item, value):
        """
        Add `value` to the stored size of an object.

        A warning is logged if called while `voxel_mode` is True.

        Args:
            item: Object ID.
            value: Amount by which the size is increased.
        """
        if self.voxel_mode:
            log_backend.warning('`increase_object_size` sould only be called when `voxel_mode=False`.')
        self._dc_intern['size'][item] += value

    def object_repcoord(self, item):
        """
        Return the representative coordinate of an object.

        A warning is logged if called while `voxel_mode` is False.

        Args:
            item: Object ID.

        Returns:
            The value stored for `item` in the 'rep_coord' entry.

        Raises:
            KeyError: If `item` is not a key of the internal dictionary or has
                no representative coordinate.
        """
        if not self.voxel_mode:
            log_backend.warning('`object_repcoord` sould only be called when `voxel_mode=True`.')
        if item not in self._dc_intern:
            raise KeyError('KeyError: Could not find key "{}" in `self._dc_intern`.`'.format(item))
        return self._dc_intern['rep_coord'][item]

    def set_object_repcoord(self, item, value):
        """
        Set the representative coordinate of an object.

        A warning is logged if called while `voxel_mode` is True.

        Args:
            item: Object ID.
            value: Representative coordinate to store.
        """
        if self.voxel_mode:
            log_backend.warning('`set_object_repcoord` sould only be called when `voxel_mode=False`.')
        self._dc_intern['rep_coord'][item] = value

    def push(self):
        """
        Write the storage to disk.

        The voxel cache is pushed first if it contains at least one item,
        then ``CompressedStorage.push`` is called.
        """
        if len(self._cache_dc) > 0:
            self._cache_dc.push()
        super().push()

    def set_voxel_cache(self, key: int, voxel_coords: np.ndarray):
        """
        Store voxel coordinates in the voxel cache (``_cache_dc``).

        Operates independently of ``__setitem__``.

        Args:
            key: Segment ID.
            voxel_coords: Voxel coordinates to cache.
        """
        self._cache_dc[key] = voxel_coords

    def get_voxel_cache(self, key: int):
        """
        Return voxel coordinates from the voxel cache (``_cache_dc``).

        The entry must have been added via :meth:`set_voxel_cache`. Operates
        independently of :meth:`get_voxeldata`.

        Args:
            key: Segment ID.

        Returns:
            The cached value for `key`.
        """
        return self._cache_dc[key]

    def get_voxeldata(self, item: int) -> Tuple[List[np.ndarray], List[np.ndarray]]:
        """
        Return the voxel masks and offsets of an object regardless of the
        current `voxel_mode`.

        The data set is (re-)opened from the path stored in 'meta' and
        `voxel_mode` is temporarily set to True. The previous mode is restored
        afterwards, also if the look-up fails.

        Args:
            item: Object ID.

        Returns:
            Tuple of (list of 3D binary masks, offsets in voxels).

        Raises:
            ValueError: If no voxel data path is stored.
        """
        voxeldata_path = self._dc_intern['meta']['voxeldata_path']
        if voxeldata_path is None:
            msg = '`voxel_mode` is True but no path to' \
                  ' voxeldata given / found.'
            log_backend.error(msg)
            raise ValueError(msg)
        self.voxeldata = kd_factory(voxeldata_path)
        old_vx_mode = self.voxel_mode
        self.voxel_mode = True
        try:
            return self[item]
        finally:
            # never leave the storage in a changed mode
            self.voxel_mode = old_vx_mode

    def get_voxel_data_cubed(self, item: int) -> Tuple[np.ndarray, np.ndarray]:
        """
        Return the voxels of an object as one dense boolean 3D array.

        All masks of the object are written into an array spanning from the
        minimum offset to the maximum extent of all masks. If `voxel_mode` is
        False the masks are obtained through :meth:`get_voxeldata`.

        Args:
            item: Object ID.

        Returns:
            Tuple of (dense boolean array, minimum offset of all masks).
        """
        if self.voxel_mode:
            bin_arrs, block_offsets = self[item]
        else:
            # plain item access would return bounding boxes in this mode
            bin_arrs, block_offsets = self.get_voxeldata(item)
        min_off = np.min(block_offsets, axis=0)
        block_extents = np.array([off + np.array(bin_arr.shape) for bin_arr, off in zip(bin_arrs, block_offsets)],
                                 dtype=np.int32)
        max_extent = np.max(block_extents, axis=0)
        size = max_extent - min_off
        # new array instead of `-=`: the offsets are a view of the bounding box array
        rel_offsets = block_offsets - min_off
        # `np.bool` was removed from NumPy; the builtin is the equivalent dtype
        voxel_arr = np.zeros(size, dtype=bool)
        for bin_arr, off in zip(bin_arrs, rel_offsets):
            end = off + np.array(bin_arr.shape, dtype=np.int32)
            voxel_arr[off[0]:end[0], off[1]:end[1], off[2]:end[2]] = bin_arr
        return voxel_arr, min_off

    def get_boundingdata(self, item: int) -> List[np.ndarray]:
        """
        Return the stored bounding boxes of an object regardless of the
        current `voxel_mode`.

        `voxel_mode` is temporarily set to False and restored afterwards,
        also if the look-up fails.

        Args:
            item: Object ID.

        Returns:
            The bounding boxes stored for `item`.
        """
        old_vx_mode = self.voxel_mode
        self.voxel_mode = False
        try:
            return self[item]
        finally:
            self.voxel_mode = old_vx_mode

    def keys(self):
        """
        Return the keys of all objects in the storage.

        Helper entries such as 'meta' are skipped: string keys are only
        returned if they consist of digits, keys of any other type are
        always returned.

        Returns:
            List of object keys.
        """
        # do not return 'meta' and other helper items in self._dc_intern, only object IDs
        # TODO: make this a generator, check usages beforehand!
        return [k for k in self._dc_intern.keys()
                if type(k) is not str or k.isdigit()]
class VoxelStorageLazyLoading:
    """
    Voxel storage backed by a numpy npz file that is loaded lazily.

    New storages collect their items in a dictionary which is written with
    :meth:`push`. An existing file is opened with ``np.load``; the resulting
    npz handle does not support item assignment, i.e. a storage cannot be
    modified once written. Keys are integers for the user and are converted
    to strings internally, as required by npz. When an existing file was
    opened, call :meth:`close` afterwards or use the instance as a context
    manager.
    """

    def __init__(self, path: str, overwrite: bool = False):
        """
        Open the storage at `path`.

        '.npz' is appended to `path` if missing. If the file exists it is
        either removed (`overwrite` is True) or loaded.

        Args:
            path: Path of the npz file.
            overwrite: Remove an existing file instead of loading it.
        """
        if not path.endswith('.npz'):
            path = path + '.npz'
        self.path = path
        self._dc_intern = {}
        if os.path.isfile(path):
            if overwrite:
                os.remove(path)
            else:
                self.pull()

    def __enter__(self):
        """Return the storage itself for use in a ``with`` statement."""
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """Close the npz file when leaving the ``with`` block."""
        self.close()

    def pull(self):
        """
        Load the npz file at ``self.path`` as the internal dictionary.

        A previously opened npz handle is closed so repeated calls do not
        leak file handles.
        """
        loaded = np.load(self.path)
        self.close()
        self._dc_intern = loaded

    def push(self):
        """
        Write all items of the internal dictionary to a compressed npz file.
        """
        np.savez_compressed(self.path, **self._dc_intern)

    def __setitem__(self, key: int, value: np.ndarray):
        """
        Store `value` for `key`.

        Args:
            key: Segment ID.
            value: Voxel coordinates.
        """
        # npz only allows string keys
        self._dc_intern[str(key)] = value

    def __getitem__(self, item: int) -> np.ndarray:
        """
        Return the array stored for `item`.

        Args:
            item: Segment ID.

        Returns:
            The stored voxel coordinates.

        Raises:
            KeyError: If `item` is not part of the storage.
        """
        # npz only allows string keys
        return self._dc_intern[str(item)]

    def __contains__(self, item: int) -> bool:
        """
        Check whether `item` is part of the storage.

        Args:
            item: Segment ID.

        Returns:
            True if an entry exists for `item`.
        """
        return str(item) in self._dc_intern

    def __len__(self):
        """
        Return the number of stored items.

        Returns:
            int: Number of items in the internal dictionary.
        """
        return len(self._dc_intern)

    def keys(self):
        """
        Iterate over all keys, converted back to integers.

        Yields:
            int: Key of a stored item.
        """
        for k in self._dc_intern.keys():
            yield int(k)

    def close(self):
        """
        Close the npz file if one is open; a no-op otherwise.
        """
        if isinstance(self._dc_intern, np.lib.npyio.NpzFile):
            self._dc_intern.close()
class MeshStorage(StorageClass):
    """
    Dictionary-like storage for meshes.

    A mesh is a list of numpy arrays [indices, vertices, normals,
    colors/labels]. The arrays are lz4-compressed on assignment if `compress`
    is set. If the ``cache_decomp`` option of the storage is enabled, meshes
    are additionally kept in ``_cache_dc`` for faster repeated access.
    """

    def __init__(self, inp, load_colarr=False, compress=False, **kwargs):
        """
        Create the storage.

        Args:
            inp: Path at which the dictionary is stored.
            load_colarr (bool): Also return the color/label array on access.
            compress (bool): Compress the arrays on assignment.
            **kwargs: Additional keyword arguments for ``StorageClass``.
        """
        self.load_colarr = load_colarr
        self.compress = compress
        super().__init__(inp, **kwargs)

    def __getitem__(self, item: Union[int, str]) -> List[np.ndarray]:
        """
        Return the mesh stored for `item`.

        Args:
            item (Union[int, str]): Key of the requested mesh.

        Returns:
            List[np.ndarray]: [indices (uint32), vertices (float32), normals
            (float32)] and, if `load_colarr` is set, colors/labels (uint8).
        """
        try:
            return self._cache_dc[item]
        except KeyError:
            pass  # not cached, decode below
        # work on a copy so the stored entry is not padded as a side effect
        mesh = list(self._dc_intern[item])
        # if no normals were given in file / cache append empty array
        if len(mesh) == 2:
            mesh.append([""])
        # if no colors/labels were given in file / cache append empty array
        if len(mesh) == 3:
            mesh.append([""])
        decomp_arrs = [lz4string_listtoarr(mesh[0], dtype=np.uint32),
                       lz4string_listtoarr(mesh[1], dtype=np.float32),
                       lz4string_listtoarr(mesh[2], dtype=np.float32),
                       lz4string_listtoarr(mesh[3], dtype=np.uint8)]
        if not self.load_colarr:
            decomp_arrs = decomp_arrs[:3]
        if self._cache_decomp:
            self._cache_dc[item] = decomp_arrs
        return decomp_arrs

    def __setitem__(self, key: int, mesh: List[np.ndarray]):
        """
        Store a mesh for `key`.

        Missing normals and colors/labels are replaced by empty arrays. The
        passed sequence itself is left untouched. Warnings are logged if the
        lengths of normals or colors/labels do not fit the vertex array.

        Args:
            key (int/str): Key of the entry.
            mesh (List[np.ndarray]): [indices, vertices] optionally followed
                by normals and colors/labels.
        """
        # copy: do not pad the caller's list, and accept tuples as well
        mesh = list(mesh)
        if len(mesh) == 2:
            mesh.append(np.zeros((0,), dtype=np.float32))
        if len(mesh) == 3:
            mesh.append(np.zeros((0,), dtype=np.uint8))
        if self._cache_decomp:
            # cache the same number of arrays `__getitem__` returns
            self._cache_dc[key] = mesh if self.load_colarr else mesh[:3]
        if len(mesh[2]) > 0 and len(mesh[1]) != len(mesh[2]):
            log_backend.warning('Lengths of vertex array and length of normal'
                                ' array differ!')
        # test if lengths of vertex and color array are identical or test
        # if vertex array length is equal to 3x label array length. Arrays are flattened.
        if len(mesh[3]) > 0 and not (len(mesh[1]) == len(mesh[3]) or
                                     len(mesh[1]) == len(mesh[3]) * 3):
            log_backend.warning('Lengths of vertex array and length of color/'
                                'label array differ!')
        if self.compress:
            transf = arrtolz4string_list
        else:
            def transf(x): return x
        comp_ind = transf(mesh[0].astype(dtype=np.uint32))
        comp_vert = transf(mesh[1].astype(dtype=np.float32))
        comp_norm = transf(mesh[2].astype(dtype=np.float32))
        comp_col = transf(mesh[3].astype(dtype=np.uint8))
        self._dc_intern[key] = [comp_ind, comp_vert, comp_norm, comp_col]
class SkeletonStorage(StorageClass):
    """
    Dictionary-like storage for skeletons.

    A skeleton is a dictionary with the arrays "nodes", "diameters" and
    "edges", which are stored lz4-compressed. Any further keys of the
    skeleton are stored uncompressed alongside.
    """

    def __init__(self, inp, **kwargs):
        """
        Create the storage; all arguments are forwarded to ``StorageClass``.

        Args:
            inp: Path at which the dictionary is stored.
            **kwargs: Additional keyword arguments for ``StorageClass``.
        """
        super().__init__(inp, **kwargs)

    def __getitem__(self, item):
        """
        Return the skeleton stored for `item`.

        The decompressed cache is consulted first. On a miss the arrays are
        decompressed and, if caching is enabled, the skeleton is cached.

        Args:
            item: Key of the requested skeleton (int or str).

        Returns:
            dict: Skeleton with "nodes" (uint32), "diameters" (float32),
            "edges" (uint32) and all additionally stored attributes.
        """
        try:
            return self._cache_dc[item]
        except KeyError:
            pass  # not cached, decompress below
        stored = self._dc_intern[item]
        nodes = lz4string_listtoarr(stored[0], dtype=np.uint32)
        diameters = lz4string_listtoarr(stored[1], dtype=np.float32)
        edges = lz4string_listtoarr(stored[2], dtype=np.uint32)
        skeleton = {"nodes": nodes, "diameters": diameters, "edges": edges}
        if len(stored) > 3:
            # fourth element holds the uncompressed extra attributes
            skeleton.update(stored[3])
        if self._cache_decomp:
            self._cache_dc[item] = skeleton
        return skeleton

    def __setitem__(self, key, skeleton):
        """
        Store a skeleton for `key`.

        "nodes", "diameters" and "edges" are cast and compressed; all other
        items of `skeleton` are stored without compression.

        Args:
            key (int/str): Key of the entry.
            skeleton (dict): Skeleton dictionary to store.
        """
        if self._cache_decomp:
            self._cache_dc[key] = skeleton
        comp_n = arrtolz4string_list(skeleton["nodes"].astype(dtype=np.uint32))
        comp_d = arrtolz4string_list(skeleton["diameters"].astype(dtype=np.float32))
        comp_e = arrtolz4string_list(skeleton["edges"].astype(dtype=np.uint32))
        compressed_keys = ('nodes', 'diameters', 'edges')
        extras = {name: attr for name, attr in skeleton.items()
                  if name not in compressed_keys}
        self._dc_intern[key] = [comp_n, comp_d, comp_e, extras]
class BinarySearchStore:
    """
    A data structure to store properties (values) of a corresponding ID array (keys). This class uses a binary search
    internally, which uses a sorted representation of keys and values to enable sparse look-ups with a much lower
    memory complexity than python dictionaries. The maximum ID is the last element of the id_array attribute.

    The store is backed by a single HDF5 file: a group ``ids`` holding the sorted IDs split into shards, one group per
    attribute key holding the equally sorted and sharded values, and a file attribute ``bucket_ranges`` with the
    (first ID, last ID) pair of every shard.

    Args:
        fname (str): The file name.
        id_array (Optional[np.ndarray]): An unsorted ID array. If None, an existing store at ``fname`` is opened.
        attr_arrays (Optional[Dict[str, np.ndarray]]): Unsorted attribute arrays, must have the same ordering as
            the ID array.
        overwrite (bool): If True, overwrite existing array files. Defaults to False.
        n_shards (Optional[int]): The number of shards/chunks the ID and attribute arrays are split into.
            Defaults to None, in which case 5 is used.
        rdcc_nbytes (int): Forwarded to ``h5py.File`` as ``rdcc_nbytes`` (raw data chunk cache size in bytes) when
            the store is written. Default is 5 MiB.
    """

    def __init__(self, fname: str, id_array: Optional[np.ndarray] = None,
                 attr_arrays: Optional[Dict[str, np.ndarray]] = None, overwrite: bool = False,
                 n_shards: Optional[int] = None, rdcc_nbytes: int = 5*2**20):
        """
        Creates a new store (if ``id_array`` is given) or attaches to an existing one.

        Args:
            fname: File name.
            id_array: (Unsorted) ID array.
            attr_arrays: (Unsorted) attribute arrays, must have the same ordering as ID array.
            overwrite: Overwrite existing array files.
            n_shards: Number of shards/chunks the ID and attribute arrays are split into. Defaults to 5. The value
                is capped at the number of IDs so that no shard is empty.
            rdcc_nbytes: Forwarded to ``h5py.File`` as ``rdcc_nbytes`` when writing. Default is 5 MiB.

        Raises:
            ValueError: If ``id_array`` is given without ``attr_arrays`` or if ``id_array`` is empty.
            FileExistsError: If a file exists at ``fname`` and ``overwrite`` is False.
            FileNotFoundError: If no ``id_array`` is given and ``fname`` does not exist.
        """
        self.fname = fname
        self._h5_file = None
        if id_array is None:
            # Read-only usage: only verify that the backing file is there.
            if isinstance(fname, str) and not os.path.isfile(fname):
                raise FileNotFoundError(f'Could not find BinarySearchStore at "{self.fname}".')
            return

        # Validate the input before anything on disk is removed or created.
        if attr_arrays is None:
            raise ValueError('ID array is given, but no attribute array(s).')
        if len(id_array) == 0:
            raise ValueError('ID array is empty, cannot create a BinarySearchStore.')
        if isinstance(fname, str):
            if os.path.isfile(fname):
                if not overwrite:
                    raise FileExistsError(f'BinarySearchStore at "{fname}" already exists and overwrite is False."')
                os.remove(fname)
            # A bare file name has an empty directory part; os.makedirs('') would raise.
            dirname = os.path.dirname(fname)
            if dirname:
                os.makedirs(dirname, exist_ok=True)
        if n_shards is None:
            n_shards = 5
        # More shards than IDs would produce empty shards without a first/last ID.
        n_shards = max(1, min(n_shards, len(id_array)))

        # sort keys / ID array
        ixs = np.argsort(id_array)
        id_array = id_array[ixs]
        bucket_ranges = []
        # The context manager closes the file even if writing a dataset fails.
        with h5py.File(fname, 'w', libver='latest', rdcc_nbytes=rdcc_nbytes) as h5_file:
            grp = h5_file.create_group("ids")
            for ii, id_sub in enumerate(np.array_split(id_array, n_shards)):
                bucket_ranges.append((id_sub[0], id_sub[-1]))
                grp.create_dataset(f'{ii}', data=id_sub)
            for k, v in attr_arrays.items():
                # apply the ID ordering to the values so both stay aligned
                v_sorted = v[ixs]
                grp = h5_file.create_group(k)
                grp.attrs['shape'] = v_sorted.shape
                grp.attrs['dtype'] = np.dtype(v_sorted.dtype).str
                for ii, attr_sub in enumerate(np.array_split(v_sorted, n_shards)):
                    grp.create_dataset(f'{ii}', data=attr_sub)
            h5_file.attrs['bucket_ranges'] = bucket_ranges

    @property
    def n_shards(self) -> int:
        """
        Returns the number of shards/chunks the ID and attribute arrays are split into.

        Returns:
            int: The number of shards, read from the stored bucket ranges.
        """
        with h5py.File(self.fname, 'r', libver='latest') as f:
            n_shards = len(f.attrs['bucket_ranges'])
        return n_shards

    @property
    def id_array(self) -> np.ndarray:
        """
        Returns the flat ID array.

        Returns:
            np.ndarray: All shards of the ID array, concatenated in shard order.
        """
        ids = []
        with h5py.File(self.fname, 'r', libver='latest') as f:
            for bucket_id in range(len(f.attrs['bucket_ranges'])):
                ids.append(f[f'ids/{bucket_id}'][()])
        return np.concatenate(ids)

    def _get_bucket_ids(self, obj_ids: np.ndarray) -> np.ndarray:
        """
        Returns the bucket IDs for the given object IDs.

        Requires ``self._h5_file`` to be an open file handle; it is set by :func:`get_attributes`.

        Args:
            obj_ids (np.ndarray): The object IDs to get the bucket IDs for.

        Returns:
            np.ndarray: The bucket IDs.

        Raises:
            ValueError: If an ID is not covered by any bucket range.
        """
        # -1 marks IDs which have not been assigned to a bucket
        bucket_ids = np.full(obj_ids.shape, -1, dtype=np.int32)
        for ii, bucket_range in enumerate(self._h5_file.attrs['bucket_ranges']):
            bucket_ids[(bucket_range[0] <= obj_ids) & (obj_ids <= bucket_range[1])] = ii
        if np.any(bucket_ids == -1):
            raise ValueError(f'IDs {obj_ids[bucket_ids == -1]} not in {self.fname}.')
        return bucket_ids

    def get_attributes(self, obj_ids: np.ndarray, attr_key: str) -> np.ndarray:
        """
        Queries attributes of the given object IDs.

        Note that only IDs outside of all bucket ranges are detected. An ID that lies inside a bucket range but is
        not stored does not raise; the binary search then returns the value of the next larger stored ID.

        Args:
            obj_ids (np.ndarray): The object IDs to query. May contain duplicates.
            attr_key (str): The value type obtained from the store.

        Returns:
            np.ndarray: The value array, in the order of ``obj_ids``.

        Raises:
            KeyError: If ``attr_key`` does not exist in the store.
            ValueError: If an ID is not covered by any bucket range.
        """
        self._h5_file = h5py.File(self.fname, 'r', libver='latest')
        try:
            if attr_key not in self._h5_file.keys():
                raise KeyError(f'Key "{attr_key}" does not exist.')
            bucket_ids = self._get_bucket_ids(obj_ids)
            grp = self._h5_file[f'{attr_key}']
            # result shape: number of queries plus the trailing dimensions of the stored attribute
            sh = [len(obj_ids)]
            if len(grp.attrs['shape']) > 1:
                sh += list(grp.attrs['shape'])[1:]
            data = np.zeros(sh, dtype=grp.attrs['dtype'])
            for bucket_id in np.unique(bucket_ids):
                ids = self._h5_file[f'ids/{bucket_id}'][()]
                bucket_mask = bucket_ids == bucket_id
                # Sorted, de-duplicated queries give strictly increasing indices for the
                # h5py index list; `inverse` maps them back to the original query order.
                uniq_queries, inverse = np.unique(obj_ids[bucket_mask], return_inverse=True)
                indices = np.searchsorted(ids, uniq_queries)
                d = grp[f'{bucket_id}'][list(indices)]
                data[bucket_mask] = d[inverse]
        finally:
            # Always release the handle, also if the key or an ID was not found.
            self._h5_file.close()
            self._h5_file = None
        return data
def bss_get_attr_helper(args):
    """
    Unpacks a single argument tuple and forwards it to ``get_attributes``.

    Taking one tuple instead of three parameters allows the function to be used with
    map-style APIs that pass exactly one argument per call.

    Args:
        args: A tuple ``(store, query_ids, attr_key)`` containing a BinarySearchStore
            instance, the query IDs and an attribute key.

    Returns:
        np.ndarray: The query result, i.e. ``store.get_attributes(query_ids, attr_key)``.
    """
    store, query_ids, attr_key = args
    return store.get_attributes(query_ids, attr_key)