# -*- coding: utf-8 -*-
# SyConn - Synaptic connectivity inference toolkit
#
# Copyright (c) 2016 - now
# Max Planck Institute of Neurobiology, Martinsried, Germany
# Authors: Philipp Schubert, Sven Dorkenwald, Joergen Kornfeld
import os.path
import shutil
from collections import defaultdict
from typing import Any, Tuple, Optional, Union, List, Iterator, Dict
from ..backend import StorageClass
from ..backend import log_backend
from ..handler.basics import kd_factory
from ..handler.compression import lz4string_listtoarr, arrtolz4string_list
import h5py
import numpy as np
try:
from lz4.block import compress, decompress
except ImportError:
from lz4 import compress, decompress
class AttributeDict(StorageClass):
    """
    General purpose dictionary class inherited from
    :class:`syconn.backend.base.StorageClass`.
    """
    def __init__(self, inp_p, **kwargs):
        super().__init__(inp_p, **kwargs)

    def __getitem__(self, item):
        # Missing keys are auto-initialized with an empty dict
        # (defaultdict-like behavior).
        if item not in self._dc_intern:
            self._dc_intern[item] = {}
        return self._dc_intern[item]

    def __setitem__(self, key, value):
        self._dc_intern[key] = value

    def update(self, other, **kwargs):
        """Merge ``other`` (and any keyword items) into the internal dict."""
        self._dc_intern.update(other, **kwargs)

    def copy_intern(self):
        """Return a shallow copy of the internal dictionary."""
        return dict(self._dc_intern)
class CompressedStorage(StorageClass):
    """
    Dictionary-like storage for numpy arrays with transparent lz4
    compression, i.e. compression happens in the background. The kwarg
    ``cache_decomp`` can be enabled to additionally cache decompressed
    arrays (saves decompression time when accessing items frequently).
    """
    def __init__(self, inp: str, **kwargs):
        super().__init__(inp, **kwargs)

    def __getitem__(self, item: Union[int, str]):
        # Serve from the decompression cache when available.
        if item in self._cache_dc:
            return self._cache_dc[item]
        entry = self._dc_intern[item]
        arr = lz4string_listtoarr(entry["arr"],
                                  dtype=np.dtype(entry["dt"]),
                                  shape=entry["sh"])
        if self._cache_decomp:
            self._cache_dc[item] = arr
        return arr

    def __setitem__(self, key: Union[int, str], value: np.ndarray):
        if type(value) is not np.ndarray:
            msg = "CompressedStorage supports np.array values only."
            log_backend.error(msg)
            raise ValueError(msg)
        if self._cache_decomp:
            self._cache_dc[key] = value
        # The leading axis is stored as -1 so the flat decompressed data can
        # be reshaped regardless of its length.
        stored_shape = list(value.shape)
        stored_shape[0] = -1
        self._dc_intern[key] = {"arr": arrtolz4string_list(value),
                                "sh": tuple(stored_shape),
                                "dt": value.dtype.str}

    def __delitem__(self, key):
        del self._dc_intern[key]
        # Drop any cached decompressed copy as well.
        self._cache_dc.pop(key, None)
class VoxelStorageL(StorageClass):
    """
    Dictionary-like storage for per-object lists of lz4-compressed voxel
    masks together with their offsets. The kwarg ``cache_decomp`` can be
    enabled to additionally cache decompressed arrays (saves decompression
    time).
    """
    def __init__(self, inp: str, **kwargs):
        super().__init__(inp, **kwargs)

    def __getitem__(self, item: Union[int, str]):
        """
        Args:
            item: Object key, e.g. a segmentation object ID.

        Returns:
            Decompressed voxel masks with corresponding offsets.
        """
        if item in self._cache_dc:
            return self._cache_dc[item], self._dc_intern[item]["off"]
        entry = self._dc_intern[item]
        dtype = np.dtype(entry["dt"])
        masks = [lz4string_listtoarr(comp, dtype, shape)
                 for comp, shape in zip(entry["arr"], entry["sh"])]
        if self._cache_decomp:
            self._cache_dc[item] = masks
        return masks, entry["off"]

    def __setitem__(self, key: Union[int, str],
                    values: Tuple[List[np.ndarray], List[np.ndarray]]):
        """
        Args:
            key: E.g. SO ID.
            values: Voxel masks and their offsets.
        """
        voxel_masks, offsets = values
        # All masks must share one dtype and pair up with one offset each.
        assert np.all([voxel_masks[0].dtype == v.dtype for v in voxel_masks])
        assert len(voxel_masks) == len(offsets)
        if self._cache_decomp:
            self._cache_dc[key] = voxel_masks
        # Leading axis is stored as -1 so flat data can be reshaped later.
        shapes = [[-1] + list(v.shape[1:]) for v in voxel_masks]
        self._dc_intern[key] = {
            "arr": [arrtolz4string_list(v) for v in voxel_masks],
            "sh": shapes,
            "dt": voxel_masks[0].dtype.str,
            "off": offsets}

    def append(self, key: int, voxel_mask: np.ndarray, offset: np.ndarray):
        """Append one additional voxel mask (and its offset) to entry `key`."""
        entry = self._dc_intern[key]
        dtype = np.dtype(entry["dt"])
        assert dtype == voxel_mask.dtype.str
        new_shape = [-1] + list(voxel_mask.shape[1:])
        self._dc_intern[key] = {
            "arr": entry["arr"] + [arrtolz4string_list(voxel_mask)],
            "sh": entry["sh"] + [new_shape],
            "dt": dtype,
            "off": entry["off"] + [offset]}
def VoxelStorage(inp, **kwargs):
    """
    Alias for :class:`~VoxelStorageDyn`.

    Args:
        inp: Path to the storage.
        **kwargs: Forwarded to :class:`~VoxelStorageDyn`.

    Returns:
        A :class:`~VoxelStorageDyn` instance.
    """
    return VoxelStorageDyn(inp, **kwargs)
class VoxelStorageClass(VoxelStorageL):
    """
    Customized dictionary to store compressed numpy arrays, but with an
    intuitive user interface, i.e. compression will happen in background.
    kwarg 'cache_decomp' can be enabled to cache decompressed arrays
    additionally (save decompressing time).

    No locking provided in this class!
    """
    def __init__(self, inp: str, **kwargs):
        # Locking must be explicitly disabled if the caller passes the kwarg.
        if "disable_locking" in kwargs:
            assert kwargs["disable_locking"], "Locking must be disabled " \
                                              "in this class. Use VoxelDictL " \
                                              "to enable locking."
        # NOTE(review): `super(VoxelStorageL, self)` intentionally skips
        # `VoxelStorageL.__init__` in the MRO and calls the next base
        # (StorageClass) directly — presumably deliberate; confirm.
        # NOTE(review): the assert message refers to "VoxelDictL", which is
        # not defined in this module — possibly an outdated class name.
        super(VoxelStorageL, self).__init__(inp, **kwargs)
class VoxelStorageDyn(CompressedStorage):
    """
    Similar to `VoxelStorageL` but does not store the voxels explicitly,
    only the information necessary to query the voxels of an object.

    If ``voxel_mode = True`` the getter methods will operate on the
    underlying data set to retrieve voxels of an object. `__setitem__`
    throws `RuntimeError`. `__getitem__` will return a list of 3D binary
    cubes with ones at the object's locations (key: object ID). Note: The
    item ID has to match the object ID in the segmentation.

    Otherwise (``voxel_mode = False``) `__getitem__` and `__setitem__` allow
    manipulation of the object's bounding boxes. In this case `voxeldata_path`
    has to be given or already be existent in the loaded dictionary. Expects
    the source path of a KnossoDataset (see knossos_utils), like::

        kd = KnossoDataset()
        kd.initialize_from_knossos_path(SOURCE_PATH)

    or::

        kd = kd_factory(SOURCE_PATH)

    `__setitem__` requires the object ID as key and a 3 dimensional array with
    all bounding boxes defining the object (N, 2, 3). Those BBs are then used
    to query the object voxels. The bounding box is expected to be two 3D
    coordinates which define the lower and the upper limits.
    """
    def __init__(self, inp: str, voxel_mode: bool = True,
                 voxeldata_path: Optional[str] = None, **kwargs):
        if not inp.endswith('.pkl'):
            inp = inp + '.pkl'
        super().__init__(inp, **kwargs)
        self.voxel_mode = voxel_mode
        if 'meta' not in self._dc_intern:
            # add meta information about underlying voxel data set to internal dictionary
            self._dc_intern['meta'] = dict(voxeldata_path=voxeldata_path)
        if 'size' not in self._dc_intern:
            # voxel counts per object ID
            self._dc_intern['size'] = defaultdict(int)
        if 'rep_coord' not in self._dc_intern:
            # representative coordinate per object ID
            self._dc_intern['rep_coord'] = dict()
        if 'voxel_cache' not in self._dc_intern:
            self._dc_intern['voxel_cache'] = dict()
        if voxeldata_path is not None:
            old_p = self._dc_intern['meta']['voxeldata_path']
            new_p = voxeldata_path
            if old_p != new_p:
                # `.warning` instead of the deprecated `.warn` alias.
                log_backend.warning('Overwriting `voxeldata_path` in `VoxelStorageDyn` object (stored at "{}") '
                                    'from `{}` to `{}`.'.format(inp, old_p, new_p))
                self._dc_intern['meta']['voxeldata_path'] = voxeldata_path
        voxeldata_path = self._dc_intern['meta']['voxeldata_path']
        if voxel_mode:
            if voxeldata_path is None:
                msg = '`voxel_mode` is True but no path to voxeldata given / found.'
                log_backend.error(msg)
                raise ValueError(msg)
            kd = kd_factory(voxeldata_path)
            self.voxeldata = kd
        # independent npz-backed store used by `set_voxel_cache`/`get_voxel_cache`
        self._cache_dc = VoxelStorageLazyLoading(inp.replace('.pkl', '.npz'))

    def __setitem__(self, key: int, value: Any):
        """Store bounding boxes of object `key`; only valid with ``voxel_mode=False``."""
        if self.voxel_mode:
            raise RuntimeError('`VoxelStorageDyn.__setitem__` may only be used when `voxel_mode=False`.')
        else:
            return super().__setitem__(key, value)

    def __getitem__(self, item: int):
        return self.get_voxelmask_offset(item)

    def get_voxelmask_offset(self, item: int, overlap: int = 0):
        """
        Retrieve the binary voxel masks of object `item`.

        Args:
            item: Object ID; must match the ID in the segmentation data.
            overlap: Number of voxels the cutout is enlarged by in every
                direction.

        Returns:
            If ``voxel_mode=True``: list of 3D binary masks and the (N, 3)
            array of bounding box offsets. Otherwise the stored bounding
            boxes (see :func:`~__setitem__`).
        """
        if self.voxel_mode:
            res = []
            bbs = super().__getitem__(item)
            for bb in bbs:  # iterate over all bounding boxes
                size = bb[1] - bb[0] + 2 * overlap
                off = bb[0] - overlap
                curr_mask = self.voxeldata.load_seg(size=size, offset=off, mag=1) == item
                # assumes load_seg returns zyx-ordered cubes — swap to xyz; confirm
                res.append(curr_mask.swapaxes(0, 2))
            return res, bbs[:, 0]  # (N, 3) --> all offset
        else:
            return super().__getitem__(item)

    def iter_voxelmask_offset(self, item: int, overlap: int = 0) -> Iterator[Tuple[np.ndarray, np.ndarray]]:
        """Generator version of :func:`~get_voxelmask_offset` (voxel mode only)."""
        bbs = super().__getitem__(item)
        for bb in bbs:  # iterate over all bounding boxes
            size = bb[1] - bb[0] + 2 * overlap
            off = bb[0] - overlap
            curr_mask = self.voxeldata.load_seg(size=size, offset=off, mag=1) == item
            yield curr_mask.swapaxes(0, 2), bb[0]

    def object_size(self, item):
        """Return the stored voxel count of object `item`."""
        if not self.voxel_mode:
            # typo fix: "sould" -> "should"
            log_backend.warning('`object_size` should only be called during `voxel_mode=True`.')
        if item not in self._dc_intern:
            raise KeyError('KeyError: Could not find key "{}" in `self._dc_intern`.`'.format(item))
        return self._dc_intern['size'][item]

    def increase_object_size(self, item, value):
        """Add `value` to the stored voxel count of object `item`."""
        if self.voxel_mode:
            log_backend.warning('`increase_object_size` should only be called when `voxel_mode=False`.')
        self._dc_intern['size'][item] += value

    def object_repcoord(self, item):
        """Return the stored representative coordinate of object `item`."""
        if not self.voxel_mode:
            log_backend.warning('`object_repcoord` should only be called when `voxel_mode=True`.')
        if item not in self._dc_intern:
            raise KeyError('KeyError: Could not find key "{}" in `self._dc_intern`.`'.format(item))
        return self._dc_intern['rep_coord'][item]

    def set_object_repcoord(self, item, value):
        """Store the representative coordinate of object `item`."""
        if self.voxel_mode:
            log_backend.warning('`set_object_repcoord` should only be called when `voxel_mode=False`.')
        self._dc_intern['rep_coord'][item] = value

    def push(self):
        """Write pending changes (including the voxel cache) to disk."""
        if len(self._cache_dc) > 0:
            self._cache_dc.push()
        super().push()

    def set_voxel_cache(self, key: int, voxel_coords: np.ndarray):
        """
        This is only used to store the voxels during the synapse extraction step. This method operates independent of
        :func:`~__setitem__`.

        Args:
            key: Segment ID.
            voxel_coords: Voxel coordinates.
        """
        self._cache_dc[key] = voxel_coords

    def get_voxel_cache(self, key: int):
        """
        Voxels corresponding to item `key` must have been added to store via :func:`~set_voxel_cache`.
        This implementation operates independent of :func:`~get_voxeldata`.

        Args:
            key: Segment ID.

        Returns:
            Voxel coordinates.
        """
        return self._cache_dc[key]

    def get_voxeldata(self, item: int) -> Tuple[List[np.ndarray], List[np.ndarray]]:
        """
        Get the object binary mask as list of 3D cubes with the respective offsets
        (in voxels). All in xyz.

        Args:
            item: Object ID.

        Returns:
            List of 3D binary masks and offsets (in voxels; xyz).
        """
        old_vx_mode = self.voxel_mode
        # temporarily force voxel mode to query the underlying data set
        self.voxel_mode = True
        if self._dc_intern['meta']['voxeldata_path'] is None:
            msg = '`voxel_mode` is True but no path to' \
                  ' voxeldata given / found.'
            log_backend.error(msg)
            raise ValueError(msg)
        kd = kd_factory(self._dc_intern['meta']['voxeldata_path'])
        self.voxeldata = kd
        res = self[item]
        self.voxel_mode = old_vx_mode
        return res

    def get_voxel_data_cubed(self, item: int) -> Tuple[np.ndarray, np.ndarray]:
        """
        Get the object binary mask as dense 3D array (xyz).

        Args:
            item: Object ID.

        Returns:
            3D mask, cube offset in voxels (xyz).
        """
        bin_arrs, block_offsets = self[item]
        min_off = np.min(block_offsets, axis=0)
        block_extents = np.array([off + np.array(bin_arr.shape) for bin_arr, off in zip(bin_arrs, block_offsets)],
                                 dtype=np.int32)
        max_extent = np.max(block_extents, axis=0)
        size = max_extent - min_off
        block_offsets -= min_off
        # `np.bool` was removed in NumPy 1.24; the builtin `bool` is the
        # equivalent dtype.
        voxel_arr = np.zeros(size, dtype=bool)
        for bin_arr, off in zip(bin_arrs, block_offsets):
            sh = off + np.array(bin_arr.shape, dtype=np.int32)
            voxel_arr[off[0]:sh[0], off[1]:sh[1], off[2]:sh[2]] = bin_arr
        return voxel_arr, min_off

    def get_boundingdata(self, item: int) -> List[np.ndarray]:
        """
        Get the object bounding boxes.

        Args:
            item: Object ID.

        Returns:
            List of bounding boxes (in voxels; xyz).
        """
        old_vx_mode = self.voxel_mode
        # temporarily disable voxel mode so `self[item]` returns the raw BBs
        self.voxel_mode = False
        res = self[item]
        self.voxel_mode = old_vx_mode
        return res

    def keys(self):
        """Return object IDs, excluding 'meta' and other helper entries."""
        # do not return 'meta' and other helper items in self._dc_intern, only object IDs
        # TODO: make this a generator, check usages beforehand!
        obj_elements = list([k for k in self._dc_intern.keys() if (type(k) is str and k.isdigit())
                             or (type(k) is not str)])
        return obj_elements
class VoxelStorageLazyLoading:
    """
    Similar to `VoxelStorage` but uses lazy loading via numpy npz files.

    Notes:
        * Once written, npz storages will not support modification via
          ``__setitem__``.
        * Keys of types other than int are not supported. Internally, keys
          are converted to string, as required by npz, and then always
          converted to int for "external" use (e.g. :attr:`~keys`).
        * Call :attr:`~close` when opening an existing npz file.
    """
    def __init__(self, path: str, overwrite: bool = False):
        if not path.endswith('.npz'):
            path += '.npz'
        self.path = path
        self._dc_intern = {}
        if os.path.isfile(path):
            if overwrite:
                os.remove(path)
            else:
                self.pull()

    def pull(self):
        """Open the npz file; member arrays are loaded lazily on access."""
        self._dc_intern = np.load(self.path)

    def push(self):
        """Write all stored arrays to disk as a compressed npz file."""
        np.savez_compressed(self.path, **self._dc_intern)

    def __setitem__(self, key: int, value: np.ndarray):
        """
        Args:
            key: Segment ID.
            value: Voxel coordinates.
        """
        # npz only allows string keys
        self._dc_intern[str(key)] = value

    def __getitem__(self, item: int) -> np.ndarray:
        """
        Voxels corresponding to `item` (supervoxel ID).

        Args:
            item: Segment ID.

        Returns:
            Voxel coordinates belonging to ID `item`.
        """
        # npz only allows string keys
        return self._dc_intern[str(item)]

    def __contains__(self, item: int) -> bool:
        """
        Args:
            item: Integer key.

        Returns:
            True if item in storage.
        """
        # npz only allows string keys
        return str(item) in self._dc_intern

    def __len__(self):
        return len(self._dc_intern)

    def keys(self):
        """Yield stored keys converted back to int."""
        return (int(k) for k in self._dc_intern.keys())

    def close(self):
        """Close the underlying npz file handle if one was opened."""
        if isinstance(self._dc_intern, np.lib.npyio.NpzFile):
            self._dc_intern.close()
class MeshStorage(StorageClass):
    """
    Customized dictionary to store compressed numpy arrays, but with an
    intuitive user interface, i.e. compression will happen in background.
    kwarg 'cache_decomp' can be enabled to cache decompressed arrays
    additionally (save decompressing time).
    """
    def __init__(self, inp, load_colarr=False, compress=False, **kwargs):
        # NOTE(review): the `compress` parameter shadows the lz4 `compress`
        # imported at module level (within this method's scope only).
        # whether __getitem__ also returns the colors/labels array
        self.load_colarr = load_colarr
        # lz4-compress arrays in __setitem__ if True; raw arrays otherwise
        self.compress = compress
        super().__init__(inp, **kwargs)

    def __getitem__(self, item: Union[int, str]) -> List[np.ndarray]:
        """
        Args:
            item: Key.

        Returns:
            Flat arrays: (indices, vertices, [normals, [colors/labels]])
        """
        try:
            return self._cache_dc[item]
        except KeyError:
            pass
        mesh = self._dc_intern[item]
        # if no normals were given in file / cache append empty array
        if len(mesh) == 2:
            mesh.append([""])
        # if no colors/labels were given in file / cache append empty array
        if len(mesh) == 3:
            mesh.append([""])
        # NOTE(review): decompression is applied unconditionally; assumes
        # `lz4string_listtoarr` passes through entries that were stored with
        # ``compress=False`` unchanged — confirm in handler.compression.
        decomp_arrs = [lz4string_listtoarr(mesh[0], dtype=np.uint32),
                       lz4string_listtoarr(mesh[1], dtype=np.float32),
                       lz4string_listtoarr(mesh[2], dtype=np.float32),
                       lz4string_listtoarr(mesh[3], dtype=np.uint8)]
        if not self.load_colarr:
            # drop the colors/labels array unless explicitly requested
            decomp_arrs = decomp_arrs[:3]
        if self._cache_decomp:
            self._cache_dc[item] = decomp_arrs
        return decomp_arrs

    def __setitem__(self, key: int, mesh: List[np.ndarray]):
        """
        Args:
            key : int/str
            mesh : List[np.array] particulary [indices, vertices, normals, colors/labels]
        """
        # pad missing normals / colors with empty arrays so entries always
        # have four components
        if len(mesh) == 2:
            mesh.append(np.zeros((0,), dtype=np.float32))
        if len(mesh) == 3:
            mesh.append(np.zeros((0,), dtype=np.uint8))
        if self._cache_decomp:
            self._cache_dc[key] = mesh
        if len(mesh[1]) != len(mesh[2]) > 0:
            log_backend.warning('Lengths of vertex array and length of normal'
                                ' array differ!')
        # test if lengths of vertex and color array are identical or test
        # if vertex array length is equal to 3x label array length. Arrays are flattened.
        if len(mesh[3]) > 0 and not (len(mesh[1]) == len(mesh[3]) or
                                     len(mesh[1]) == len(mesh[3]) * 3):
            log_backend.warning('Lengths of vertex array and length of color/'
                                'label array differ!')
        if self.compress:
            transf = arrtolz4string_list
        else:
            # identity: store raw numpy arrays when compression is disabled
            def transf(x): return x
        comp_ind = transf(mesh[0].astype(dtype=np.uint32))
        comp_vert = transf(mesh[1].astype(dtype=np.float32))
        comp_norm = transf(mesh[2].astype(dtype=np.float32))
        comp_col = transf(mesh[3].astype(dtype=np.uint8))
        self._dc_intern[key] = [comp_ind, comp_vert, comp_norm, comp_col]
class SkeletonStorage(StorageClass):
    """
    Stores skeleton dictionaries (keys: "nodes", "diameters", "edges") as
    lz4-compressed numpy arrays; any additional attributes are kept
    uncompressed.
    """
    def __init__(self, inp, **kwargs):
        super().__init__(inp, **kwargs)

    def __getitem__(self, item):
        """
        Args:
            item: int/str

        Returns:
            dict
        """
        if item in self._cache_dc:
            return self._cache_dc[item]
        entry = self._dc_intern[item]
        skel = {"nodes": lz4string_listtoarr(entry[0], dtype=np.uint32),
                "diameters": lz4string_listtoarr(entry[1], dtype=np.float32),
                "edges": lz4string_listtoarr(entry[2], dtype=np.uint32)}
        # additional attributes were stored uncompressed at index 3
        if len(entry) > 3:
            skel.update(entry[3])
        if self._cache_decomp:
            self._cache_dc[item] = skel
        return skel

    def __setitem__(self, key, skeleton):
        """
        Args:
            key: int/str
            skeleton : dict
                keys: nodes diameters edges and other attributes (uncompressed).
        """
        if self._cache_decomp:
            self._cache_dc[key] = skeleton
        # everything besides the three core arrays is stored uncompressed
        extras = {k: v for k, v in skeleton.items()
                  if k not in ('nodes', 'diameters', 'edges')}
        self._dc_intern[key] = [
            arrtolz4string_list(skeleton["nodes"].astype(dtype=np.uint32)),
            arrtolz4string_list(skeleton["diameters"].astype(dtype=np.float32)),
            arrtolz4string_list(skeleton["edges"].astype(dtype=np.uint32)),
            extras]
class BinarySearchStore:
    def __init__(self, fname: str, id_array: Optional[np.ndarray] = None,
                 attr_arrays: Optional[Dict[str, np.ndarray]] = None, overwrite: bool = False,
                 n_shards: Optional[int] = None, rdcc_nbytes: int = 5*2**20):
        """
        Data structure to store properties (values) of a corresponding ID array (keys). Internally a binary search
        is used that uses a sorted representation of keys and values to enable sparse look-ups with a much lower
        memory complexity than python dictionaries.

        Maximum ID is the last element of :attr:`~id_array`.

        Args:
            fname: File name.
            id_array: (Unsorted) ID array.
            attr_arrays: (Unsorted) attribute arrays, must have the same ordering as ID array.
            overwrite: Overwrite existing array files.
            n_shards: Number of shards/chunks the ID and attribute arrays are split into. Defaults to 5.
            rdcc_nbytes: Size of h5 chunks in bytes. Default is 5 MiB.

        Raises:
            ValueError: If `id_array` is given without `attr_arrays`, or is empty.
            FileExistsError: If the store exists and `overwrite` is False.
            FileNotFoundError: If no `id_array` is given and no store exists at `fname`.
        """
        self.fname = fname
        self._h5_file = None
        if id_array is not None:
            if attr_arrays is None:
                raise ValueError('ID array is given, but no attribute array(s).')
            if len(id_array) == 0:
                # would otherwise crash with IndexError on `id_sub[0]` below
                raise ValueError('Cannot create BinarySearchStore from an empty ID array.')
            if isinstance(fname, str) and os.path.isfile(fname):
                if not overwrite:
                    raise FileExistsError(f'BinarySearchStore at "{fname}" already exists and overwrite is False."')
                else:
                    os.remove(fname)
            if n_shards is None:
                n_shards = 5
            # avoid empty shards (and IndexError) when there are fewer IDs than shards
            n_shards = min(n_shards, len(id_array))
            if isinstance(fname, str):
                # guard against `os.makedirs('')` when fname has no directory part
                dirname = os.path.split(self.fname)[0]
                if dirname:
                    os.makedirs(dirname, exist_ok=True)
            # sort keys / ID array
            ixs = np.argsort(id_array)
            id_array = id_array[ixs]
            bucket_ranges = []
            h5_file = h5py.File(fname, 'w', libver='latest', rdcc_nbytes=rdcc_nbytes)
            grp = h5_file.create_group("ids")
            for ii, id_sub in enumerate(np.array_split(id_array, n_shards)):
                # store (min, max) of each shard for bucket lookup
                bucket_ranges.append((id_sub[0], id_sub[-1]))
                grp.create_dataset(f'{ii}', data=id_sub)
            for k, v in attr_arrays.items():
                # apply the same ordering as the sorted ID array
                v_sorted = v[ixs]
                grp = h5_file.create_group(k)
                grp.attrs['shape'] = v_sorted.shape
                grp.attrs['dtype'] = np.dtype(v_sorted.dtype).str
                for ii, attr_sub in enumerate(np.array_split(v_sorted, n_shards)):
                    grp.create_dataset(f'{ii}', data=attr_sub)
            del ixs
            h5_file.attrs['bucket_ranges'] = bucket_ranges
            h5_file.close()
        else:
            if isinstance(fname, str) and not os.path.isfile(fname):
                raise FileNotFoundError(f'Could not find BinarySearchStore at "{self.fname}".')

    @property
    def n_shards(self) -> int:
        """
        Number of shards/chunks the ID and attribute arrays are split into.

        Returns:
            Number of shards.
        """
        with h5py.File(self.fname, 'r', libver='latest') as f:
            n_shards = len(f.attrs['bucket_ranges'])
        return n_shards

    @property
    def id_array(self) -> np.ndarray:
        """
        Returns:
            Flat ID array.
        """
        ids = []
        with h5py.File(self.fname, 'r', libver='latest') as f:
            for bucket_id in range(len(f.attrs['bucket_ranges'])):
                ids.append(f[f'ids/{bucket_id}'][()])
        return np.concatenate(ids)

    def _get_bucket_ids(self, obj_ids: np.ndarray) -> np.ndarray:
        """Map each object ID to the index of the shard whose range contains it."""
        bucket_ids = np.ones(obj_ids.shape, dtype=np.int32) * -1
        for ii, bucket_range in enumerate(self._h5_file.attrs['bucket_ranges']):
            bucket_ids[(bucket_range[0] <= obj_ids) & (obj_ids <= bucket_range[1])] = ii
        if -1 in bucket_ids:
            raise ValueError(f'IDs {obj_ids[bucket_ids == -1]} not in {self.fname}.')
        return bucket_ids

    def get_attributes(self, obj_ids: np.ndarray, attr_key: str) -> np.ndarray:
        """
        Query attributes of given `obj_ids`. Note that this will not raise an Exception if a ID does not exist in the
        store, as the lookup uses binary search.

        Args:
            obj_ids: Object IDs to query.
            attr_key: Value type obtained from the store.

        Returns:
            Value array.
        """
        self._h5_file = h5py.File(self.fname, 'r', libver='latest')
        # try/finally so the h5 handle is closed even if a lookup fails
        # (e.g. missing attr_key or unknown IDs).
        try:
            if attr_key not in self._h5_file.keys():
                raise KeyError(f'Key "{attr_key}" does not exist.')
            bucket_ids = self._get_bucket_ids(obj_ids)
            grp = self._h5_file[f'{attr_key}']
            sh = [len(obj_ids)]
            if len(grp.attrs['shape']) > 1:
                sh += list(grp.attrs['shape'])[1:]
            data = np.zeros(sh, dtype=grp.attrs['dtype'])
            for bucket_id in np.unique(bucket_ids):
                ids = self._h5_file[f'ids/{bucket_id}'][()]
                bucket_mask = bucket_ids == bucket_id
                queries = obj_ids[bucket_mask]
                # sort queries so h5py fancy indexing gets increasing indices
                # NOTE(review): h5py requires strictly increasing indices —
                # duplicate query IDs within one bucket may fail; confirm.
                ixs_sort = np.argsort(queries)
                indices = np.searchsorted(ids, queries[ixs_sort])
                d = grp[f'{bucket_id}'][list(indices)]
                # undo sorting using argsort of argsort to match slicing mask on the left
                data[bucket_mask] = d[np.argsort(ixs_sort)]
        finally:
            self._h5_file.close()
            self._h5_file = None
        return data
def bss_get_attr_helper(args):
    """
    Helper function to query attributes from a BinarySearchStore instance.

    Args:
        args: BinarySearchStore, query_ids, attribute key.

    Returns:
        Query result.
    """
    store, query_ids, attr_key = args
    return store.get_attributes(query_ids, attr_key)