# -*- coding: utf-8 -*-
# SyConn - Synaptic connectivity inference toolkit
#
# Copyright (c) 2016 - now
# Max-Planck-Institute of Neurobiology, Munich, Germany
# Authors: Philipp Schubert, Joergen Kornfeld
import glob
import os
from collections import defaultdict
from typing import TYPE_CHECKING, Dict, Optional, Tuple, List, Union, Iterable, Any
from scipy import spatial
import numpy as np
from . import log_reps
from . import rep_helper as rh
from .rep_helper import surface_samples
from .. import global_params
from ..backend.storage import AttributeDict, CompressedStorage, MeshStorage, \
VoxelStorage, SkeletonStorage, VoxelStorageDyn, VoxelStorageLazyLoading
from ..handler.basics import chunkify, temp_seed
from ..handler.multiviews import generate_rendering_locs
from ..mp.mp_utils import start_multiprocess_imap
from ..proc.graphs import create_graph_from_coords
if TYPE_CHECKING:
from ..reps.segmentation import SegmentationObject, SegmentationDataset
MeshType = Union[Tuple[np.ndarray, np.ndarray, np.ndarray], List[np.ndarray],
Tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray]]
[docs]def glia_pred_so(so: 'SegmentationObject', thresh: float,
pred_key_appendix: str) -> int:
"""
Classifies a cell supervoxel as neuron or glia based on the provided threshold.
Args:
so: The cell supervoxel object to classify.
thresh: The probability threshold above which the supervoxel is classified as glia.
pred_key_appendix: A string appended to the prediction key used to retrieve glia probabilities.
Returns:
int: The classification result, where 0 indicates neuron and 1 indicates glia.
"""
assert so.type == "sv"
pred_key = "glia_probas" + pred_key_appendix
if pred_key not in so.attr_dict:
so.load_attr_dict()
try:
preds = np.array(so.attr_dict[pred_key][:, 1] > thresh, dtype=np.int32)
pred = np.mean(so.attr_dict[pred_key][:, 1]) > thresh
except KeyError:
raise KeyError('Could not find glia proba key `{}` in so,attr_dict (keys: {})'.format(
pred_key, so.attr_dict.keys()))
if pred == 0:
return 0
glia_votes = np.sum(preds)
if glia_votes > int(len(preds) * 0.7):
return 1
return 0
[docs]def glia_proba_so(so: 'SegmentationObject', pred_key_appendix: str) -> float:
"""
Calculates the mean glia probability for a cell supervoxel.
Args:
so: The cell supervoxel object for which to calculate the probability.
pred_key_appendix: A string appended to the prediction key used to retrieve glia
probabilities.
Returns:
float: The mean glia probability of a cell supervoxel with 0 indicating neuron and
1 indicating glia.
"""
assert so.type == "sv"
pred_key = "glia_probas" + pred_key_appendix
if pred_key not in so.attr_dict:
so.load_attr_dict()
return np.mean(so.attr_dict[pred_key][:, 1])
[docs]def acquire_obj_ids(sd: 'SegmentationDataset'):
"""
Retrieves all object IDs present in the SegmentationDataset. If an id array is
available, it loads it to assemble the id list; otherwise, it iterates over all
voxel / attribute dicts which is slower.
Args:
sd: The SegmentationDataset from which to acquire object IDs.
Returns:
None: The object IDs are stored within the SegmentationDataset instance.
"""
sd._ids = sd.load_numpy_data('id')
if sd._ids is None:
paths = glob.glob(sd.so_storage_path + "/*/*/*/") + \
glob.glob(sd.so_storage_path + "/*/*/") + \
glob.glob(sd.so_storage_path + "/*/")
sd._ids = []
for path in paths:
if os.path.exists(path + "attr_dict.pkl"):
this_ids = list(AttributeDict(path + "attr_dict.pkl", read_only=True).keys())
else:
this_ids = []
sd._ids += this_ids
sd._ids = np.array(sd._ids)
np.save(sd.path_ids, sd._ids)
[docs]def save_voxels(so: 'SegmentationObject', bin_arr: np.ndarray,
offset: np.ndarray, overwrite: bool = False):
"""
Helper function to save SegmentationObject voxels.
Args:
so: SegmentationObject
The SegmentationObject to which the voxel data belongs.
bin_arr: np.array
A binary mask array indicating supervoxel locations. 0: background,
1: supervoxel locations.
offset: np.array
The offset of the binary mask array within the full dataset.
overwrite: bool
If True, existing voxel data will be overwritten.
Returns:
None: The voxel data is saved to the storage associated with the
SegmentationObject.
"""
assert bin_arr.dtype == bool
voxel_dc = VoxelStorage(so.voxel_path, read_only=False,
disable_locking=True)
if so.id in voxel_dc and not overwrite:
voxel_dc.append(so.id, bin_arr, offset)
else:
voxel_dc[so.id] = [bin_arr], [offset]
voxel_dc.push(so.voxel_path)
[docs]def load_voxels_depr(so: 'SegmentationObject',
voxel_dc: Optional[VoxelStorage] = None) -> np.ndarray:
"""
Helper function to load voxels of a SegmentationObject as 3D array.
Also calculates size and bounding box and assigns it to `so._size` and `so._bounding_box`
respectively (deprecated function).
Args:
so: The SegmentationObject from which to load voxels.
voxel_dc: An optional VoxelStorage instance to use for loading.
Returns: np.array
3D binary mask array, 0: background, 1: supervoxel locations.
"""
if voxel_dc is None:
voxel_dc = VoxelStorage(so.voxel_path, read_only=True,
disable_locking=True)
so._size = 0
if so.id not in voxel_dc:
msg = f"Voxels of {so} do not exist!"
log_reps.error(msg)
raise KeyError(msg)
bin_arrs, block_offsets = voxel_dc[so.id]
block_extents = []
for i_bin_arr in range(len(bin_arrs)):
block_extents.append(np.array(bin_arrs[i_bin_arr].shape) +
block_offsets[i_bin_arr])
block_offsets = np.array(block_offsets, dtype=np.int32)
block_extents = np.array(block_extents, dtype=np.int32)
so._bounding_box = np.array([block_offsets.min(axis=0),
block_extents.max(axis=0)])
voxels = np.zeros(so.bounding_box[1] - so.bounding_box[0],
dtype=np.bool)
for i_bin_arr in range(len(bin_arrs)):
box = [block_offsets[i_bin_arr] - so.bounding_box[0],
block_extents[i_bin_arr] - so.bounding_box[0]]
so._size += np.sum(bin_arrs[i_bin_arr])
voxels[box[0][0]: box[1][0],
box[0][1]: box[1][1],
box[0][2]: box[1][2]][bin_arrs[i_bin_arr]] = True
return voxels
[docs]def load_voxels_downsampled(so: 'SegmentationObject',
ds: Tuple[int, int, int] = (2, 2, 1)) -> Union[np.ndarray, List]:
"""
Loads downsampled voxels of a SegmentationObject.
Args:
so: The SegmentationObject from which to load downsampled voxels.
ds: A tuple indicating the downsampling factors along each axis.
Returns:
Union[np.ndarray, List]: A downsampled 3D binary mask or an empty list if no voxels are present.
"""
if isinstance(so.voxels, int):
return []
return so.voxels[::ds[0], ::ds[1], ::ds[2]]
[docs]def load_voxel_list(so: 'SegmentationObject') -> np.ndarray:
"""
Loads a list of voxel coordinates for a SegmentationObject.
Args:
so: SegmentationObject. The SegmentationObject from which to load voxel coordinates.
Returns:
np.ndarray: 2D array of coordinates to all voxels in the SegmentationObject.
"""
if so._voxels is not None:
voxel_list = np.transpose(np.nonzero(so.voxels)) + so.bounding_box[0]
elif so.type in ['syn', 'syn_ssv']:
voxel_dc = VoxelStorageLazyLoading(so.voxel_path)
voxel_list = voxel_dc[so.id]
voxel_dc.close()
else:
voxel_dc = VoxelStorageDyn(so.voxel_path, read_only=True, disable_locking=True)
bin_arrs, block_offsets = voxel_dc[so.id]
voxel_list = []
for i_bin_arr in range(len(bin_arrs)):
block_voxels = np.transpose(np.nonzero(bin_arrs[i_bin_arr]))
block_voxels += block_offsets[i_bin_arr]
voxel_list.append(block_voxels)
voxel_list = np.concatenate(voxel_list)
return voxel_list
[docs]def load_voxel_list_downsampled(so, downsampling=(2, 2, 1)):
"""
Loads a downsampled list of voxel coordinates for a SegmentationObject.
Args:
so: The SegmentationObject from which to load downsampled voxel
coordinates.
downsampling: A tuple indicating the downsampling factors along
each axis.
Returns:
np.ndarray: A downsampled list of voxel coordinates.
TODO: refactor, probably more efficient implementation possible.
"""
downsampling = np.array(downsampling)
dvoxels = so.load_voxels_downsampled(downsampling)
voxel_list = np.array(np.transpose(np.nonzero(dvoxels)), dtype=np.int32)
voxel_list = voxel_list * downsampling + np.array(so.bounding_box[0])
return voxel_list
[docs]def load_voxel_list_downsampled_adapt(so, downsampling=(2, 2, 1)):
"""
Loads a downsampled list of voxel coordinates for a SegmentationObject, adapting the downsampling if necessary.
Args:
so: The SegmentationObject from which to load downsampled voxel coordinates.
downsampling: A tuple indicating the initial downsampling factors along each axis.
Returns:
np.ndarray: An adaptively downsampled list of voxel coordinates.
"""
downsampling = np.array(downsampling, dtype=np.int32)
dvoxels = so.load_voxels_downsampled(downsampling)
if len(dvoxels) == 0:
return []
while True:
if True in dvoxels:
break
downsampling = downsampling // 2
downsampling[downsampling < 1] = 1
dvoxels = so.load_voxels_downsampled(downsampling)
voxel_list = np.array(np.transpose(np.nonzero(dvoxels)), dtype=np.int32)
voxel_list = voxel_list * downsampling + np.array(so.bounding_box[0])
return voxel_list
[docs]def load_mesh(so: 'SegmentationObject', recompute: bool = False) -> MeshType:
"""
Loads the mesh representation of a SegmentationObject.
Args:
so: The SegmentationObject from which to load the mesh.
recompute: If True, the mesh will be recomputed instead of loaded from
storage. If False, the mesh is loaded without re-computation.
Returns:
MeshType: A tuple containing indices, vertices, and normals of the mesh,
all flattened. TODO: Currently ignores potential color/label array.
"""
if not recompute and so.mesh_exists:
try:
mesh = MeshStorage(so.mesh_path,
disable_locking=True)[so.id]
if len(mesh) == 2:
indices, vertices = mesh
normals = np.zeros((0,), dtype=np.float32)
elif len(mesh) == 3:
indices, vertices, normals = mesh
col = np.zeros(0, dtype=np.uint8)
elif len(mesh) == 4:
indices, vertices, normals, col = mesh
except Exception as e:
msg = "\n%s\nException occured when loading mesh.pkl of SO (%s)" \
"with id %d.".format(e, so.type, so.id)
log_reps.error(msg)
return [np.zeros((0,)).astype(np.int32), np.zeros((0,)), np.zeros((0,))]
else:
if so.type == "sv" and not global_params.config.allow_mesh_gen_cells:
log_reps.error("Mesh of SV %d not found.\n" % so.id)
return [np.zeros((0,)).astype(np.int), np.zeros((0,)), np.zeros((0,))]
indices, vertices, normals = so.mesh_from_scratch()
col = np.zeros(0, dtype=np.uint8)
try:
so._save_mesh(indices, vertices, normals)
except Exception as e:
log_reps.error("Mesh of %s %d could not be saved:\n%s\n".format(
so.type, so.id, e))
vertices = np.array(vertices, dtype=np.int32)
indices = np.array(indices, dtype=np.int64)
normals = np.array(normals, dtype=np.float32)
col = np.array(col, dtype=np.uint8)
return [indices, vertices, normals]
[docs]def load_skeleton(so: 'SegmentationObject', recompute: bool = False) -> dict:
"""
Loads the skeleton representation of a SegmentationObject.
Args:
so: The SegmentationObject from which to load the skeleton.
recompute: If True, the skeleton will be recomputed and not
stored in SkeletonStorage. (default=False)
Returns:
dict: A dictionary containing "nodes", "diameters", and "edges" of
the skeleton.
"""
empty_skel = dict(nodes=np.zeros((0, 3)).astype(np.int64), edges=np.zeros((0, 2)),
diameters=np.zeros((0,)).astype(np.int32))
if not recompute and so.skeleton_exists:
try:
skeleton_dc = SkeletonStorage(so.skeleton_path, disable_locking=not so.enable_locking)
skel = skeleton_dc[so.id]
if np.ndim(skel['nodes']) == 1:
skel['nodes'] = skel['nodes'].reshape((-1, 3))
if np.ndim(skel['edges']) == 1:
skel['edges'] = skel['edges'].reshape((-1, 2))
except Exception as e:
log_reps.error("\n{}\nException occured when loading skeletons.pkl"
" of SO ({}) with id {}.".format(e, so.type, so.id))
return empty_skel
elif recompute:
skel = generate_skeleton_sv(so)
else:
msg = f"Skeleton of {so} (size: {so.size}) not found.\n"
if so.type == "sv":
if so.size == 1: # small SVs don't have a skeleton
log_reps.debug(msg)
else:
log_reps.error(msg)
raise ValueError(msg)
return empty_skel
return skel
[docs]def save_skeleton(so: 'SegmentationObject', overwrite: bool = False):
"""
Saves the skeleton data for a SegmentationObject.
Args:
so: The SegmentationObject whose skeleton data is to be saved.
overwrite: If True, existing skeleton data will be overwritten.
Returns:
None: The skeleton data is saved to the storage associated with
the SegmentationObject.
"""
skeleton_dc = SkeletonStorage(so.skeleton_path, read_only=False, disable_locking=not so.enable_locking)
if not overwrite and so.id in skeleton_dc:
raise ValueError(f"Skeleton of {so} already exists.")
skeleton_dc[so.id] = so.skeleton
skeleton_dc.push()
[docs]def sv_view_exists(args):
ps, woglia = args
missing_ids = []
for p in ps:
ad = AttributeDict(p + "/attr_dict.pkl", disable_locking=True)
obj_ixs = ad.keys()
view_dc_p = p + "/views_woglia.pkl" if woglia else p + "/views.pkl"
view_dc = CompressedStorage(view_dc_p, disable_locking=True)
missing_ids = np.setdiff1d(list(obj_ixs), list(view_dc.keys()))
return missing_ids
[docs]def find_missing_sv_views(sd, woglia, n_cores=20):
folders = sd.so_dir_paths
np.random.shuffle(folders)
multi_params = chunkify(folders, 1000)
params = [(ps, woglia) for ps in multi_params]
res = start_multiprocess_imap(sv_view_exists, params, nb_cpus=n_cores,
debug=False)
return np.concatenate(res)
[docs]def sv_skeleton_missing(sv):
if sv.skeleton is None:
sv.load_skeleton()
return (sv.skeleton is None) or (len(sv.skeleton['nodes']) == 0)
[docs]def find_missing_sv_skeletons(svs, n_cores=20):
res = start_multiprocess_imap(sv_skeleton_missing, svs, nb_cpus=n_cores,
debug=False)
return [svs[kk].id for kk in range(len(svs)) if res[kk]]
[docs]def sv_attr_exists(args):
ps, attr_key = args
missing_ids = []
for p in ps:
ad = AttributeDict(p + "/attr_dict.pkl", disable_locking=True)
for k, v in ad.items():
if attr_key not in v:
missing_ids.append(k)
return missing_ids
[docs]def find_missing_sv_attributes(sd: 'SegmentationDataset', attr_key: str, n_cores: int = 20):
"""
Identifies missing attributes for supervoxels in a SegmentationDataset.
Args:
sd: The SegmentationDataset to check for missing attributes.
attr_key (str): The attribute key to check for in each supervoxel.
n_cores (int): The number of CPU cores to use for parallel processing.
Returns:
None: The result is not returned but may be used internally for processing.
"""
multi_params = chunkify(sd.so_dir_paths, 100)
params = [(ps, attr_key) for ps in multi_params]
res = start_multiprocess_imap(sv_attr_exists, params, nb_cpus=n_cores, debug=False)
return np.concatenate(res)
[docs]def load_so_meshes_bulk(sos: Union[List['SegmentationObject'], Iterable['SegmentationObject']],
use_new_subfold: bool = True, cache_decomp=True) -> MeshStorage:
"""
Bulk loader for SegmentationObject (SO) meshes. Minimizes IO by loading IDs from the same storage at the same time.
This will not assign the ``_mesh`` attribute!
Args:
sos: SegmentationObjects or an iterable of SegmentationObjects for which to load meshes.
use_new_subfold: Use new sub-folder structure for storage if True.
cache_decomp: Cache decompressed meshes.
Returns:
Dictionary, key: ID, value: mesh data.
"""
md_out = MeshStorage(None) # in-memory dict with compression
if len(sos) == 0:
return md_out
base_path = sos[0].so_storage_path
nf = sos[0].n_folders_fs
subf_from_ix = rh.subfold_from_ix_new if use_new_subfold else \
rh.subfold_from_ix_OLD
sub2ids = defaultdict(list)
for so in sos:
if so._mesh is None:
subf = subf_from_ix(so.id, nf)
sub2ids[subf].append(so.id)
else:
md_out[so.id] = so._mesh
for subfold, ids in sub2ids.items():
mesh_path = f'{base_path}/{subfold}/mesh.pkl'
md = MeshStorage(mesh_path, disable_locking=True,
cache_decomp=cache_decomp)
for so_id in ids:
md_out._dc_intern[so_id] = md._dc_intern[so_id]
assert len(md_out) == len(sos)
return md_out
[docs]def load_so_attr_bulk(sos: List['SegmentationObject'],
attr_keys: Union[str, Iterable[str]],
use_new_subfold: bool = True,
allow_missing: bool = False) -> Union[Dict[str, Dict[int, Any]], Dict[int, Any]]:
"""
Bulk loader for SegmentationObject (SO) meshes. This method minimizes IO by
loading IDs from the same storage at the same time and organizes them into a
dictionary. If only one attr_key is provided, a single dictionary is returned;
otherwise, a dictionary of dictionaries for multiple keys is returned. Existing
attributes in the object's ``attr_dict`` are checked, leveraging
``cache_properties`` for efficiency with large datasets.
Args:
sos: A list of SegmentationObjects for which to load attributes.
attr_keys: A single attribute key or an iterable of keys to load.
use_new_subfold: If True, uses the new sub-folder structure for storage
paths, beneficial for managing storage.
allow_missing: If True, sets missing attribute values to None; otherwise,
raises a KeyError.
Returns:
Union[Dict[str, Dict[int, Any]], Dict[int, Any]]: A dictionary or a dict
of dicts with object IDs as keys and attribute values as values, based on
the provided attr_keys.
"""
if type(attr_keys) is str:
attr_keys = [attr_keys]
out = {attr_k: dict() for attr_k in attr_keys}
if len(sos) == 0:
if len(attr_keys) == 1:
out = out[attr_keys[0]]
return out
base_path = sos[0].so_storage_path
nf = sos[0].n_folders_fs
subf_from_ix = rh.subfold_from_ix_new if use_new_subfold else rh.subfold_from_ix_OLD
sub2ids = defaultdict(list)
for so in sos:
keys_missing = len(attr_keys)
# use cached/loaded attributes
for k in attr_keys:
if k in so.attr_dict:
out[k][so.id] = so.attr_dict[k]
keys_missing -= 1
if keys_missing == 0:
continue
subf = subf_from_ix(so.id, nf)
sub2ids[subf].append(so.id)
for subfold, ids in sub2ids.items():
attr_p = f'{base_path}/{subfold}/attr_dict.pkl'
ad = AttributeDict(attr_p, disable_locking=True)
for so_id in ids:
so_dict = ad[so_id]
for attr_key in attr_keys:
try:
out[attr_key][so_id] = so_dict[attr_key]
except KeyError as e:
if allow_missing:
out[attr_key][so_id] = None
else:
raise KeyError(e)
if len(attr_keys) == 1:
out = out[attr_keys[0]]
return out
[docs]def prepare_so_attr_cache(sd: 'SegmentationDataset', so_ids: np.ndarray, attr_keys: List[str]) -> Dict[str, dict]:
"""
Prepares a cache of attributes for a subset of SegmentationObjects within a SegmentationDataset.
Args:
sd: The SegmentationDataset containing the SegmentationObjects.
so_ids: An array of SegmentationObject IDs for which to collect attributes.
attr_keys: A list of attribute keys to collect. The corresponding numpy arrays
must exist.
Returns:
Dict[str, dict]: A dictionary with `attr_keys` as keys and dictionaries of
attribute values for the IDs `so_ids`. For example, `attr_cache[attr_keys[0]]
[so_ids[0]]` will return the attribute value of type `attr_keys[0]` for the
first SegmentationObject in `so_ids`.
"""
attr_cache = {k: dict() for k in attr_keys}
# TODO: Use BinarySearchStore
soid2ix = {so_id: sd.soid2ix[so_id] for so_id in so_ids}
sd._soid2ix = None # free memory
for attr in attr_keys:
np_cache = sd.load_numpy_data(attr, allow_nonexisting=False)
for so_id in so_ids:
attr_cache[attr][so_id] = np_cache[soid2ix[so_id]]
del np_cache
return attr_cache
[docs]def load_so_voxels_bulk(sos: List['SegmentationObject'],
use_new_subfold: bool = True, cache_decomp=True):
"""
Args:
sos:
use_new_subfold:
cache_decomp:
Returns:
"""
raise NotImplementedError('WIP')
vd_out = VoxelStorage(None, cache_decomp=cache_decomp) # in-memory dict with compression
if len(sos) == 0:
return vd_out
base_path = sos[0].so_storage_path
nf = sos[0].n_folders_fs
subf_from_ix = rh.subfold_from_ix_new if use_new_subfold else \
rh.subfold_from_ix_OLD
sub2ids = defaultdict(list)
for so in sos:
subf = subf_from_ix(so.id, nf)
sub2ids[subf].append(so.id)
cnt = 0
for subfold, ids in sub2ids.items():
voxel_path = f'{base_path}/{subfold}/voxel.pkl'
vd = VoxelStorage(voxel_path, disable_locking=True)
for so_id in ids:
cnt += 1
vd_out._dc_intern[so_id] = vd._dc_intern[so_id]
assert cnt == len(sos)
return vd_out
def _helper_func(args):
ps, use_vxsize = args
out = []
for p in ps:
if not use_vxsize:
w = len(AttributeDict(p + '/attr_dict.pkl', disable_locking=True))
else:
w = np.sum([v['size'] for v in AttributeDict(p + '/attr_dict.pkl', disable_locking=True).values()])
out.append(w)
return out
[docs]def get_sd_load_distribution(sd: 'SegmentationDataset', use_vxsize: bool = True) -> np.ndarray:
"""
Calculates the load distribution, which is the number of objects per storage, for
the given SegmentationDataset's AttributeDicts. This is useful for understanding
the distribution of data across different storage units and can help in optimizing
data access patterns.
Args:
sd: An instance of SegmentationDataset for which the load distribution is to be
calculated.
use_vxsize: A boolean flag indicating whether to use voxel size for calculating
the load distribution (default is False). If True, the load is
calculated based on the voxel size of each object.
Returns:
A numpy array representing the load distribution across the storage units of
the SegmentationDataset. Each element in the array corresponds to the number of
objects in a storage unit.
"""
n_objects = start_multiprocess_imap(_helper_func, [(ch, use_vxsize) for ch in chunkify(sd.so_dir_paths, 1000)],
nb_cpus=None)
return np.concatenate(n_objects).astype(np.int64)
[docs]def generate_skeleton_sv(so: 'SegmentationObject') -> Dict[str, np.ndarray]:
"""
Generates a simplified skeleton representation for a given SegmentationObject.
This function is a quick solution for creating a skeleton, primarily used for
glia predictions. The skeleton consists of nodes, edges, and uniform diameters.
Args:
so: The SegmentationObject instance for which the skeleton is to be
generated.
Returns:
A dictionary containing the keys "nodes", "edges", and "diameters". Each key
maps to a value of 1, indicating a basic scaffold structure without any
specific measurements or attributes.
"""
verts = so.mesh[1].reshape(-1, 3)
# choose random subset of surface vertices
np.random.seed(0)
ixs = np.arange(len(verts))
np.random.shuffle(ixs)
ixs = ixs[:int(0.5 * len(ixs))]
if global_params.config.use_new_renderings_locs:
locs = generate_rendering_locs(verts[ixs], 1000)
else:
locs = surface_samples(verts[ixs], bin_sizes=(1000, 1000, 1000),
max_nb_samples=10000, r=500)
g = create_graph_from_coords(locs, mst=True)
if g.number_of_edges() == 1:
edge_list = np.array(list(g.edges()))
else:
edge_list = np.array(g.edges())
del g
if edge_list.ndim != 2:
raise ValueError("Edge list ist not a 2D array: {}\n{}".format(
edge_list.shape, edge_list))
skeleton = dict()
skeleton["nodes"] = (locs / np.array(so.scaling)).astype(np.int32)
skeleton["edges"] = edge_list
skeleton["diameters"] = np.ones(len(locs))
return skeleton
[docs]def calc_center_of_mass(point_arr: np.ndarray) -> np.ndarray:
"""
Calculates the closest point to the center of mass from a given array of points.
This function identifies a representative point nearest to the geometric center of
a collection of points. It assumes uniform point distribution in isotropic units.
Args:
point_arr: A numpy array of points, where each point is represented by its
coordinates (x, y, z). The points should conform to isotropic
units preferably in nanometers.
Returns:
A numpy array representing the single point that is closest to the center of
mass of the input array of points.
"""
# downsampling to ensure fast processing - this is deterministic!
if len(point_arr) > 1e5:
with temp_seed(0):
idx = np.random.randint(0, len(point_arr), int(1e5))
point_arr = point_arr[idx]
# calculate mean
center_of_mass = np.mean(point_arr, axis=0)
# ensure that the point is contained inside of the object,
# i.e. use closest existing point to center of mass
kdtree = spatial.cKDTree(point_arr)
dd, ii = kdtree.query(center_of_mass, k=1)
center_point = point_arr[ii]
return center_point