Source code for syconn.extraction.cs_extraction_steps

# -*- coding: utf-8 -*-
# distutils: language=c++
# SyConn - Synaptic connectivity inference toolkit
#
# Copyright (c) 2016 - now
# Max Planck Institute of Neurobiology, Martinsried, Germany
# Authors: Philipp Schubert, Joergen Kornfeld
import gc
import glob
import os
import pickle as pkl
import shutil
import time
from collections import defaultdict
from logging import Logger
from typing import Optional, Dict, List, Tuple, Union, Callable

from multiprocessing import Process

import numpy as np
import scipy.ndimage
import tqdm
from knossos_utils import chunky
from knossos_utils import knossosdataset

from . import log_extraction
# the cythonized extraction method is optional; the `is None` check in
# `extract_contact_sites` relies on this import being guarded
try:
    from .block_processing_C import extract_cs_syntype
except ImportError:
    extract_cs_syntype = None
from .object_extraction_steps import export_cset_to_kd_batchjob
from .object_extraction_wrapper import calculate_chunk_numbers_for_box
from .. import global_params
from ..backend.storage import AttributeDict, VoxelStorageDyn, VoxelStorage, CompressedStorage
from ..handler import compression, basics
from ..mp import batchjob_utils as qu
from ..mp.mp_utils import start_multiprocess_imap
from ..proc.sd_proc import _cache_storage_paths
from ..proc.sd_proc import merge_prop_dicts, dataset_analysis
from ..proc.image import apply_morphological_operations, get_aniso_struct
from ..reps import rep_helper
from ..reps import segmentation
from .find_object_properties import merge_type_dicts, detect_cs_64bit, detect_cs, \
    find_object_properties, find_object_properties_cs_64bit, merge_voxel_dicts
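
# Note on the property-dict convention used throughout this module: object
# properties are kept as a triplet ``[rep_coords, bounding_boxes, sizes]``, each a
# dict keyed by object ID (see ``find_object_properties`` and ``merge_prop_dicts``).
# A minimal sketch of that layout with a hypothetical object ID 42:
#
#     cs_props = [{}, defaultdict(list), {}]
#     cs_props[0][42] = [10, 20, 30]                     # representative coordinate
#     cs_props[1][42].append([[0, 0, 0], [64, 64, 64]])  # per-chunk bounding boxes
#     cs_props[2][42] = 1234                             # voxel count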


def extract_contact_sites(chunk_size: Optional[Tuple[int, int, int]] = None,
                          log: Optional[Logger] = None,
                          max_n_jobs: Optional[int] = None,
                          cube_of_interest_bb: Optional[np.ndarray] = None,
                          n_folders_fs: int = 1000,
                          cube_shape: Optional[Tuple[int]] = None,
                          overwrite: bool = False,
                          transf_func_sj_seg: Optional[Callable] = None):
    """
    Extracts contact sites and their overlap with ``sj`` objects and stores them in
    a :class:`~syconn.reps.segmentation.SegmentationDataset` of type ``cs`` and
    ``syn`` respectively. If synapse type is available, this information will be
    stored as the voxel-ratio per class in the attribute dictionary of the ``syn``
    objects (keys: ``sym_prop``, ``asym_prop``). These properties will further be
    used by :func:`~syconn.extraction.cs_processing_steps.combine_and_split_syn`
    which aggregates per-SV synapse fragments (syn) to per-SSV synapses (syn_ssv).

    Examples:
        The synapse type labels and KnossosDatasets are defined in the `config.yml`
        file and can be set initially by changing the following attributes,
        depending on how the synapse type prediction is stored.

        (i) The type prediction is stored as segmentation in a single data set with
        three labels (0: background, 1: symmetric, 2: asymmetric)::

            kd_asym_path = root_dir + 'kd_asym_sym/'
            kd_sym_path = root_dir + 'kd_asym_sym/'
            key_val_pairs_conf = [
                ('cell_objects', {'sym_label': 1, 'asym_label': 2,})
            ]
            generate_default_conf(working_dir, kd_sym=kd_sym_path,
                                  kd_asym=kd_asym_path,
                                  key_value_pairs=key_val_pairs_conf)

        (ii) The type prediction is stored as segmentation in two data sets, each
        with two labels (0: background, 1: symmetric and 0: background,
        1: asymmetric)::

            kd_asym_path = root_dir + 'kd_asym/'
            kd_sym_path = root_dir + 'kd_sym/'
            key_val_pairs_conf = [
                ('cell_objects', {'sym_label': 1, 'asym_label': 1,})
            ]
            generate_default_conf(working_dir, kd_sym=kd_sym_path,
                                  kd_asym=kd_asym_path,
                                  key_value_pairs=key_val_pairs_conf)

        (iii) The type prediction is stored as probability map in the raw channel
        (uint8, range: 0..255) in a data set for each type::

            kd_asym_path = root_dir + 'kd_asym/'
            kd_sym_path = root_dir + 'kd_sym/'
            key_val_pairs_conf = [
                ('cell_objects', {'sym_label': None, 'asym_label': None,})
            ]
            generate_default_conf(working_dir, kd_sym=kd_sym_path,
                                  kd_asym=kd_asym_path,
                                  key_value_pairs=key_val_pairs_conf)

    Notes:
        * Deletes existing KnossosDataset and SegmentationDataset of type 'syn'
          and 'cs'!
        * Replaced ``find_contact_sites``, ``extract_agg_contact_sites``,
          ``syn_gen_via_cset`` and ``extract_synapse_type``.

    Args:
        chunk_size: Sub-cube volume which is processed at a time.
        log: Logger.
        max_n_jobs: Maximum number of jobs, only used as a lower bound.
        cube_of_interest_bb: Sub-volume of the data set which is processed.
            Default: Entire data set.
        n_folders_fs: Number of folders used for organizing supervoxel data.
        cube_shape: Cube shape used within 'syn' and 'cs' KnossosDataset.
        overwrite: Overwrite existing cache.
        transf_func_sj_seg: Method that converts the cell organelle segmentation
            into a binary mask of background vs. sj foreground.
    """
    if extract_cs_syntype is None:
        msg = '`extract_contact_sites` requires the cythonized method ' \
              '`extract_cs_syntype`.'
        log_extraction.error(msg)
        raise ImportError(msg)
    kd = basics.kd_factory(global_params.config.kd_seg_path)
    if cube_of_interest_bb is None:
        cube_of_interest_bb = [np.zeros(3, dtype=np.int32), kd.boundary]
    if cube_shape is None:
        cube_shape = (256, 256, 256)
    if chunk_size is None:
        chunk_size = (512, 512, 512)
    if np.any(np.array(chunk_size) % np.array(cube_shape)):
        raise ValueError('Chunk size must be divisible by cube shape.')
    if max_n_jobs is None:
        max_n_jobs = global_params.config.ncore_total * 8
    size = cube_of_interest_bb[1] - cube_of_interest_bb[0] + 1
    offset = cube_of_interest_bb[0]
    if global_params.config.use_new_subfold:
        target_dir_func = rep_helper.subfold_from_ix_new
    else:
        target_dir_func = rep_helper.subfold_from_ix_OLD
    # check for existing SDs
    sd_syn = segmentation.SegmentationDataset(
        working_dir=global_params.config.working_dir, obj_type='syn', version=0)
    sd_cs = segmentation.SegmentationDataset(
        working_dir=global_params.config.working_dir, obj_type='cs', version=0)
    if os.path.exists(sd_syn.path) or os.path.exists(sd_cs.path):
        if overwrite:
            shutil.rmtree(sd_syn.path, ignore_errors=True)
            shutil.rmtree(sd_cs.path, ignore_errors=True)
        else:
            raise FileExistsError('Overwrite was set to False, but '
                                  'SegmentationDataset "syn" or "cs" already exists.')
    # Initial contact site extraction
    cd_dir = global_params.config.temp_path + "/chunkdatasets/cs/"
    # Class that contains a dict of chunks (with coordinates) after initializing it
    cset = chunky.ChunkDataset()
    cset.initialize(kd, kd.boundary, chunk_size, cd_dir, box_coords=[0, 0, 0],
                    fit_box_size=True)
    if log is None:
        log = log_extraction
    if size is not None and offset is not None:
        chunk_list, _ = calculate_chunk_numbers_for_box(cset, offset, size)
    else:
        chunk_list = [ii for ii in range(len(cset.chunk_dict))]
    all_times = []
    step_names = []
    dict_paths_tmp = []
    dir_props = f"{global_params.config.temp_path}/tmp_props_cssyn/"
    # remove previous temporary results.
    if os.path.isdir(dir_props):
        if not overwrite:
            msg = f'Could not start extraction of supervoxel objects because ' \
                  f'temporary files already exist at "{dir_props}" and overwrite ' \
                  f'was set to False.'
            log.error(msg)
            raise FileExistsError(msg)
        log.debug(f'Found existing cache folder at {dir_props}. Removing it now.')
        shutil.rmtree(dir_props)
    os.makedirs(dir_props)
    # init KD for syn and cs
    for ot in ['cs', 'syn']:
        path_kd = f"{global_params.config.working_dir}/knossosdatasets/{ot}_seg/"
        if os.path.isdir(path_kd):
            log.debug('Found existing KD at {}. Removing it now.'.format(path_kd))
            shutil.rmtree(path_kd)
        target_kd = knossosdataset.KnossosDataset()
        target_kd._cube_shape = cube_shape
        scale = np.array(global_params.config['scaling'])
        target_kd.scales = [scale, ]
        target_kd.initialize_without_conf(path_kd, kd.boundary, scale,
                                          kd.experiment_name, mags=[1, ],
                                          create_pyk_conf=True,
                                          create_knossos_conf=False)
    multi_params = []
    iter_params = basics.chunkify(chunk_list, max_n_jobs)
    for ii, chunk_k in enumerate(iter_params):
        multi_params.append([[cset.chunk_dict[k] for k in chunk_k],
                             global_params.config.kd_seg_path, ii, dir_props,
                             transf_func_sj_seg])
    # reduce step
    start = time.time()
    cs_worker_dc_fname = f'{global_params.config.temp_path}/cs_worker_dict.pkl'
    dict_paths_tmp += [cs_worker_dc_fname, dir_props]
    syn_ids = []
    cs_ids = []
    cs_worker_mapping = dict()  # cs include syns
    if qu.batchjob_enabled():
        path_to_out = qu.batchjob_script(multi_params, "contact_site_extraction",
                                         log=log, use_dill=True)
        out_files = glob.glob(path_to_out + "/*")
        for out_file in tqdm.tqdm(out_files, leave=False):
            with open(out_file, 'rb') as f:
                worker_nr, worker_res = pkl.load(f)
            syn_ids_curr = np.array(worker_res['syn'], dtype=np.uint64)
            cs_ids_curr = np.array(worker_res['cs'], dtype=np.uint64)
            syn_ids.append(syn_ids_curr)
            cs_ids.append(cs_ids_curr)
            cs_worker_mapping[worker_nr] = cs_ids_curr
    else:
        results = start_multiprocess_imap(_contact_site_extraction_thread,
                                          multi_params, verbose=False, debug=False,
                                          use_dill=True)
        for worker_nr, worker_res in tqdm.tqdm(results, leave=False):
            syn_ids_curr = np.array(worker_res['syn'], dtype=np.uint64)
            cs_ids_curr = np.array(worker_res['cs'], dtype=np.uint64)
            syn_ids.append(syn_ids_curr)
            cs_ids.append(cs_ids_curr)
            cs_worker_mapping[worker_nr] = cs_ids_curr
        del results
    log_extraction.debug(f'Collected partial results from '
                         f'{len(cs_worker_mapping)} workers.')
    with open(cs_worker_dc_fname, 'wb') as f:
        pkl.dump(cs_worker_mapping, f, protocol=4)
    del cs_worker_mapping, cset
    syn_ids = np.unique(np.concatenate(syn_ids)).astype(np.uint64)
    n_syn = len(syn_ids)
    del syn_ids
    cs_ids = np.unique(np.concatenate(cs_ids)).astype(np.uint64)
    n_cs = len(cs_ids)
    # only required as syn is a subset of cs!
    dest_p = f'{global_params.config.temp_path}/storage_targets_cs.pkl'
    dict_paths_tmp.append(dest_p)
    _ = _cache_storage_paths((dest_p, cs_ids, n_folders_fs))
    del cs_ids
    step_names.append("extract objects and collect properties of cs and syn.")
    all_times.append(time.time() - start)
    log.info(f'Finished extraction of initial contact sites (#objects: {n_cs}) and'
             f' synapses (#objects: {n_syn}).')
    if n_syn == 0:
        log.critical('WARNING: Did not find any synapses during extraction step.')
    # create folders for existing (sub-)cell supervoxels to prevent collisions
    # using makedirs
    for ii, struct in enumerate(['cs', 'syn']):
        sc_sd = segmentation.SegmentationDataset(
            working_dir=global_params.config.working_dir, obj_type=struct,
            version=0, n_folders_fs=n_folders_fs)
        if os.path.isdir(sc_sd.path):
            log.debug('Found existing SD at {}. Removing it now.'.format(sc_sd.path))
            shutil.rmtree(sc_sd.path)
        ids = rep_helper.get_unique_subfold_ixs(n_folders_fs)
        for ix in tqdm.tqdm(ids, leave=False):
            curr_dir = sc_sd.so_storage_path + target_dir_func(ix, n_folders_fs)
            os.makedirs(curr_dir, exist_ok=True)
    # Write SD
    path = "{}/knossosdatasets/syn_seg/".format(global_params.config.working_dir)
    path_cs = "{}/knossosdatasets/cs_seg/".format(global_params.config.working_dir)
    storage_location_ids = rep_helper.get_unique_subfold_ixs(n_folders_fs)
    max_n_jobs = min(max_n_jobs, len(storage_location_ids))
    # use additional cores for loading data from disk; slightly increase n_cores
    # per worker to compensate IO related downtime
    n_cores = 2 if qu.batchjob_enabled() else 1
    multi_params = [(sv_id_block, n_folders_fs, path, path_cs, dir_props, n_cores)
                    for sv_id_block in basics.chunkify(storage_location_ids,
                                                       max_n_jobs)]
    if not qu.batchjob_enabled():
        start_multiprocess_imap(_write_props_to_syn_thread, multi_params, debug=False)
    else:
        qu.batchjob_script(multi_params, "write_props_to_syn", log=log, n_cores=1,
                           remove_jobfolder=True)
    # Mesh props are not computed as this is done for the agglomerated versions
    # (only syn_ssv)
    da_kwargs = dict(recompute=False, compute_meshprops=False)
    procs = [Process(target=dataset_analysis, args=(sd_syn,), kwargs=da_kwargs),
             Process(target=dataset_analysis, args=(sd_cs,), kwargs=da_kwargs)]
    for p in procs:
        p.start()
    for p in procs:
        p.join()
        if p.exitcode != 0:
            raise Exception(f'Worker {p.name} stopped unexpectedly with exit '
                            f'code {p.exitcode}.')
        p.close()
    # remove temporary files
    for p in dict_paths_tmp:
        if os.path.isfile(p):
            os.remove(p)
        elif os.path.isdir(p):
            shutil.rmtree(p)
    shutil.rmtree(cd_dir, ignore_errors=True)
    if qu.batchjob_enabled():
        jobfolder = os.path.abspath(f'{path_to_out}/../')
        try:
            shutil.rmtree(jobfolder, ignore_errors=False)
        except Exception as e:
            log.error(f'Could not delete job folder at "{jobfolder}". {str(e)}')
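
# Example (sketch): a typical invocation of `extract_contact_sites` after the cell
# supervoxel segmentation has been generated. The working directory path is a
# hypothetical placeholder; all KnossosDataset paths are taken from its config.yml.
# Note that `chunk_size` must be divisible by `cube_shape`.
#
#     from syconn import global_params
#     from syconn.extraction.cs_extraction_steps import extract_contact_sites
#
#     global_params.wd = '/path/to/working_dir'  # hypothetical
#     extract_contact_sites(chunk_size=(512, 512, 512), cube_shape=(256, 256, 256),
#                           n_folders_fs=1000, overwrite=True)
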
def _contact_site_extraction_thread(args: Union[tuple, list]) \
        -> Tuple[int, Dict[str, list]]:
    """
    Helper function to extract properties of ``cs`` and ``syn`` objects.

    Args:
        args:
            * ``Chunk`` objects.
            * Path to KnossosDataset containing the cell supervoxels.
            * Worker number.
            * Path to the temporary property storage.
            * Optional transformation function for the sj segmentation.

    Todo:
        * Get rid of the second argument -> use config parameter instead.

    Returns:
        Worker number and a dictionary with the IDs of the extracted ``cs`` and
        ``syn`` objects. Their properties (representative coordinates, bounding
        boxes, voxel counts) and the per-synapse counts of symmetric and
        asymmetric voxels are written to the worker's temporary property storage.
    """
    chunks = args[0]
    knossos_path = args[1]
    worker_nr = args[2]
    dir_props = args[3]
    transf_func_sj_seg = args[4]
    worker_dir_props = f"{dir_props}/{worker_nr}/"
    os.makedirs(worker_dir_props, exist_ok=True)
    morph_ops = global_params.config['cell_objects']['extract_morph_op']
    scaling = np.array(global_params.config['scaling'])
    struct = get_aniso_struct(scaling)
    if global_params.config.syntype_available and \
            (global_params.config.sym_label == global_params.config.asym_label) and \
            (global_params.config.kd_sym_path == global_params.config.kd_asym_path):
        raise ValueError('Both KnossosDatasets and labels for symmetric and '
                         'asymmetric synapses are identical. At least one of them '
                         'must differ.')
    # init target KD for cs and syn segmentation
    kd_cs = basics.kd_factory(
        f"{global_params.config.working_dir}/knossosdatasets/cs_seg/")
    kd_syn = basics.kd_factory(
        f"{global_params.config.working_dir}/knossosdatasets/syn_seg/")
    # init. synaptic junction (sj) KD
    kd_sj = basics.kd_factory(global_params.config.kd_sj_path)
    # init synapse type KD if available
    if global_params.config.syntype_available:
        kd_syntype_sym = basics.kd_factory(global_params.config.kd_sym_path)
        kd_syntype_asym = basics.kd_factory(global_params.config.kd_asym_path)
    else:
        kd_syntype_sym, kd_syntype_asym = None, None
    # cell segmentation
    kd = basics.kd_factory(knossos_path)
    cs_props = [{}, defaultdict(list), {}]
    syn_props = [{}, defaultdict(list), {}]
    syn_voxels = {}
    tot_sym_cnt = {}
    tot_asym_cnt = {}
    cs_filtersize = np.array(global_params.config['cell_objects']['cs_filtersize'])
    cs_dilation = global_params.config['cell_objects']['cs_dilation']
    # stencil_offset is used to load more data as these will be cropped when
    # performing detect_cs(_64bit)
    stencil_offset = cs_filtersize // 2
    # additional overlap, e.g. to prevent boundary artifacts by dilation/closing
    overlap = max(stencil_offset)
    for chunk in chunks:
        # offset and size are also used for loading synapse data
        offset = np.array(chunk.coordinates - overlap)
        size = 2 * overlap + np.array(chunk.size)
        data = kd.load_seg(size=size + 2 * stencil_offset,
                           offset=offset - stencil_offset, mag=1,
                           datatype=np.uint64).astype(np.uint32,
                                                      copy=False).swapaxes(0, 2)
        # contacts has size as given with `size`, because detect_cs performs a
        # valid conv. -> contacts result is cropped by stencil_offset on each side
        contacts = np.asarray(detect_cs(data))
        if transf_func_sj_seg is None:
            sj_d = (kd_sj.load_raw(size=size, offset=offset, mag=1).swapaxes(0, 2) >
                    255 * global_params.config['cell_objects']["probathresholds"]['sj']
                    ).astype('u1')
        else:
            sj_d = transf_func_sj_seg(
                kd_sj.load_seg(size=size, offset=offset, mag=1).swapaxes(0, 2)
            ).astype('u1', copy=False)
        # apply morphological operations on sj binary mask
        if 'sj' in morph_ops:
            sj_d = apply_morphological_operations(
                sj_d.copy(), morph_ops['sj'],
                mop_kwargs=dict(structure=struct)).astype('u1', copy=False)
        # get binary mask for symmetric and asymmetric syn. type per voxel
        if global_params.config.syntype_available:
            if global_params.config.kd_asym_path != global_params.config.kd_sym_path:
                # TODO: add thresholds to global_params
                if global_params.config.sym_label is None:
                    sym_d = (kd_syntype_sym.load_raw(
                        size=size, offset=offset,
                        mag=1).swapaxes(0, 2) >= 123).astype('u1', copy=False)
                else:
                    sym_d = (kd_syntype_sym.load_seg(
                        size=size, offset=offset, mag=1).swapaxes(0, 2) ==
                        global_params.config.sym_label).astype('u1', copy=False)
                if global_params.config.asym_label is None:
                    asym_d = (kd_syntype_asym.load_raw(
                        size=size, offset=offset,
                        mag=1).swapaxes(0, 2) >= 123).astype('u1', copy=False)
                else:
                    asym_d = (kd_syntype_asym.load_seg(
                        size=size, offset=offset, mag=1).swapaxes(0, 2) ==
                        global_params.config.asym_label).astype('u1', copy=False)
            else:
                assert global_params.config.asym_label is not None, \
                    'Label of asymmetric synapses is not set.'
                assert global_params.config.sym_label is not None, \
                    'Label of symmetric synapses is not set.'
                # load synapse type classification results stored in the same KD
                sym_d = kd_syntype_sym.load_seg(size=size, offset=offset,
                                                mag=1).swapaxes(0, 2)
                # create copy
                asym_d = np.array(sym_d == global_params.config.asym_label,
                                  dtype=np.uint8)
                sym_d = np.array(sym_d == global_params.config.sym_label,
                                 dtype=np.uint8)
        else:
            sym_d = np.zeros_like(sj_d)
            asym_d = np.zeros_like(sj_d)
        # close gaps of contact sites prior to overlapping the synaptic junction
        # map with contact sites. find_object_properties returns rep. coords,
        # bounding box and size for every ID in contacts; used to get the location
        # of every contact site to perform the closing operation
        _, bb_dc, _ = find_object_properties(contacts)
        n_closings = overlap
        for ix in bb_dc.keys():
            obj_start, obj_end = np.array(bb_dc[ix])
            obj_start -= n_closings
            obj_start[obj_start < 0] = 0
            obj_end += n_closings
            # create slice obj
            new_obj_slices = tuple(slice(obj_start[ii], obj_end[ii], None)
                                   for ii in range(3))
            sub_vol = contacts[new_obj_slices]
            binary_mask = (sub_vol == ix).astype(np.int8, copy=False)
            if n_closings > 0:
                res = scipy.ndimage.binary_closing(binary_mask,
                                                   iterations=n_closings)
            else:
                res = binary_mask
            # reduce fragmenting of contact sites
            if cs_dilation > 0:
                res = scipy.ndimage.binary_dilation(res, iterations=cs_dilation)
            # only update background or the objects itself and do not remove object
            # voxels (res == 1), e.g. at boundary
            proc_mask = ((binary_mask == 1) | (sub_vol == 0)) & (res == 1)
            contacts[new_obj_slices][proc_mask] = res[proc_mask] * ix
        # this counts SJ foreground voxels overlapping with the CS objects and the
        # asym and sym voxels; do not use overlap here! overlap was removed, so
        # pass the corrected offset for the analysis of the object properties
        curr_cs_p, curr_syn_p, asym_cnt, sym_cnt, curr_syn_vx = extract_cs_syntype(
            contacts[overlap:-overlap, overlap:-overlap, overlap:-overlap],
            sj_d[overlap:-overlap, overlap:-overlap, overlap:-overlap],
            asym_d[overlap:-overlap, overlap:-overlap, overlap:-overlap],
            sym_d[overlap:-overlap, overlap:-overlap, overlap:-overlap],
            offset=offset + overlap)
        kd_cs.save_seg(offset=offset + overlap, mags=[1, ],
                       data=contacts[overlap:-overlap, overlap:-overlap,
                                     overlap:-overlap].swapaxes(0, 2),
                       data_mag=1)
        # syn segmentation contains the intersecting voxels between SJ and CS
        contacts[sj_d == 0] = 0
        kd_syn.save_seg(offset=offset + overlap, mags=[1, ],
                        data=contacts[overlap:-overlap, overlap:-overlap,
                                      overlap:-overlap].swapaxes(0, 2),
                        data_mag=1)
        # overlap was removed; use correct offset for the analysis of the object
        # properties
        merge_prop_dicts([cs_props, curr_cs_p], offset=offset + overlap)
        merge_prop_dicts([syn_props, curr_syn_p], offset=offset + overlap)
        merge_voxel_dicts([syn_voxels, curr_syn_vx], key_to_str=True)
        merge_type_dicts([tot_asym_cnt, asym_cnt])
        merge_type_dicts([tot_sym_cnt, sym_cnt])
        del curr_cs_p, curr_syn_p, asym_cnt, sym_cnt
    basics.write_obj2pkl(f'{worker_dir_props}/cs_props_{worker_nr}.pkl', cs_props)
    basics.write_obj2pkl(f'{worker_dir_props}/syn_props_{worker_nr}.pkl', syn_props)
    np.savez(f'{worker_dir_props}/syn_voxels_{worker_nr}.npz', **syn_voxels)
    basics.write_obj2pkl(f'{worker_dir_props}/tot_asym_cnt_{worker_nr}.pkl',
                         tot_asym_cnt)
    basics.write_obj2pkl(f'{worker_dir_props}/tot_sym_cnt_{worker_nr}.pkl',
                         tot_sym_cnt)
    return worker_nr, dict(cs=list(cs_props[0].keys()), syn=list(syn_props[0].keys()))


def _write_props_to_syn_thread(args):
    """
    Helper function of :func:`extract_contact_sites`. Collects the temporary
    per-worker properties and writes the 'syn' and 'cs' SegmentationDataset
    storages for a batch of storage location IDs.
    """
    cs_ids_ch = args[0]
    n_folders_fs = args[1]
    knossos_path = args[2]
    knossos_path_cs = args[3]
    dir_props = args[4]
    if len(args) < 6:
        nb_cores = 4
    else:
        nb_cores = args[5]
    min_obj_vx_dc = global_params.config['cell_objects']['min_obj_vx']
    tmp_path = global_params.config.temp_path
    if global_params.config.use_new_subfold:
        target_dir_func = rep_helper.subfold_from_ix_new
    else:
        target_dir_func = rep_helper.subfold_from_ix_OLD
    # iterate over the subcellular SV ID chunks
    for obj_id_mod in cs_ids_ch:
        # get destination paths for the current objects
        dest_dc_tmp = CompressedStorage(f'{tmp_path}/storage_targets_cs.pkl',
                                        disable_locking=True)
        k = target_dir_func(obj_id_mod, n_folders_fs)
        if k not in dest_dc_tmp or len(dest_dc_tmp[k]) == 0:
            continue
        obj_keys = dest_dc_tmp[k]
        del dest_dc_tmp
        sd = segmentation.SegmentationDataset(
            n_folders_fs=n_folders_fs, obj_type='syn',
            working_dir=global_params.config.working_dir, version=0)
        sd_cs = segmentation.SegmentationDataset(
            n_folders_fs=n_folders_fs, obj_type='cs',
            working_dir=global_params.config.working_dir, version=0)
        cs_props = [{}, defaultdict(list), {}]
        syn_props = [{}, defaultdict(list), {}]
        cs_sym_cnt = {}
        cs_asym_cnt = {}
        syn_voxels = {}
        # get cached worker lookup
        with open(f'{global_params.config.temp_path}/cs_worker_dict.pkl', "rb") as f:
            cs_workers_tmp = pkl.load(f)
        params = [(dir_props, worker_id, np.intersect1d(obj_ids, obj_keys))
                  for worker_id, obj_ids in cs_workers_tmp.items()]
        del cs_workers_tmp
        res = start_multiprocess_imap(_write_props_collect_helper, params,
                                      nb_cpus=nb_cores, show_progress=False,
                                      debug=False)
        for tmp_dcs_cs, tmp_dcs_syn, tmp_sym_dc, tmp_asym_dc, tmp_syn_vxs in res:
            if len(tmp_dcs_cs) == 0:
                continue
            # cs
            merge_prop_dicts([cs_props, tmp_dcs_cs])
            del tmp_dcs_cs
            # syn
            merge_prop_dicts([syn_props, tmp_dcs_syn])
            del tmp_dcs_syn
            merge_type_dicts([cs_asym_cnt, tmp_asym_dc])
            del tmp_asym_dc
            merge_type_dicts([cs_sym_cnt, tmp_sym_dc])
            del tmp_sym_dc
            merge_voxel_dicts([syn_voxels, tmp_syn_vxs], key_to_str=False)
            del tmp_syn_vxs
        # get dummy segmentation object to fetch attribute dictionary for this
        # batch of object IDs
        dummy_so = sd.get_segmentation_object(obj_id_mod)
        attr_p = dummy_so.attr_dict_path
        vx_p = dummy_so.voxel_path
        this_attr_dc = AttributeDict(attr_p, read_only=False, disable_locking=True)
        # this class is only used to query the voxel data
        voxel_dc = VoxelStorageDyn(vx_p, voxel_mode=False,
                                   voxeldata_path=knossos_path,
                                   read_only=False, disable_locking=True)
        # get dummy CS segmentation object to fetch attribute dictionary for this
        # batch of object IDs
        dummy_so_cs = sd_cs.get_segmentation_object(obj_id_mod)
        attr_p_cs = dummy_so_cs.attr_dict_path
        vx_p_cs = dummy_so_cs.voxel_path
        this_attr_dc_cs = AttributeDict(attr_p_cs, read_only=False,
                                        disable_locking=True)
        voxel_dc_cs = VoxelStorageDyn(vx_p_cs, voxel_mode=False,
                                      voxeldata_path=knossos_path_cs,
                                      read_only=False, disable_locking=True)
        ids_to_load_voxels = []
        for cs_id in obj_keys:
            # write cs to dict
            if cs_props[2][cs_id] < min_obj_vx_dc['cs']:
                continue
            rp_cs = np.array(cs_props[0][cs_id], dtype=np.int32)
            bbs_cs = np.concatenate(cs_props[1][cs_id])
            size_cs = cs_props[2][cs_id]
            this_attr_dc_cs[cs_id]["rep_coord"] = rp_cs
            this_attr_dc_cs[cs_id]["bounding_box"] = np.array(
                [bbs_cs[:, 0].min(axis=0), bbs_cs[:, 1].max(axis=0)])
            this_attr_dc_cs[cs_id]["size"] = size_cs
            voxel_dc_cs[cs_id] = bbs_cs
            voxel_dc_cs.increase_object_size(cs_id, size_cs)
            voxel_dc_cs.set_object_repcoord(cs_id, rp_cs)
            # write syn to dict
            if cs_id not in syn_props[0] or \
                    syn_props[2][cs_id] < min_obj_vx_dc['syn']:
                continue
            rp = np.array(syn_props[0][cs_id], dtype=np.int32)
            bbs = np.concatenate(syn_props[1][cs_id])
            size = syn_props[2][cs_id]
            this_attr_dc[cs_id]["rep_coord"] = rp
            bb = np.array([bbs[:, 0].min(axis=0), bbs[:, 1].max(axis=0)])
            this_attr_dc[cs_id]["bounding_box"] = bb
            this_attr_dc[cs_id]["size"] = size
            try:
                sym_prop = cs_sym_cnt[cs_id] / size
            except KeyError:
                sym_prop = 0
            try:
                asym_prop = cs_asym_cnt[cs_id] / size
            except KeyError:
                asym_prop = 0
            this_attr_dc[cs_id]["sym_prop"] = sym_prop
            this_attr_dc[cs_id]["asym_prop"] = asym_prop
            add_feat_dict = {'cs_id': cs_id, 'cs_size': size_cs}
            this_attr_dc[cs_id].update(add_feat_dict)
            voxel_dc[cs_id] = bbs
            voxel_dc.increase_object_size(cs_id, size)
            voxel_dc.set_object_repcoord(cs_id, rp)
            ids_to_load_voxels.append(cs_id)
            # write voxels explicitly - this assumes reasonably sized synapses
            voxel_dc.set_voxel_cache(cs_id, np.array(syn_voxels[cs_id],
                                                     dtype=np.uint32))
            del syn_voxels[cs_id]
        voxel_dc.push()
        voxel_dc_cs.push()
        this_attr_dc.push()
        this_attr_dc_cs.push()


def _write_props_collect_helper(args) -> Tuple[List[dict], List[dict], dict, dict,
                                               dict]:
    dir_props, worker_id, intersec = args
    if len(intersec) == 0:
        return [{}, {}, {}], [{}, {}, {}], {}, {}, {}
    worker_dir_props = f"{dir_props}/{worker_id}/"
    # cs
    fname = f'{worker_dir_props}/cs_props_{worker_id}.pkl'
    dc = basics.load_pkl2obj(fname)
    # convert lists to numpy arrays
    tmp_dcs_cs = [dict(), defaultdict(list), dict()]
    for k in intersec:
        tmp_dcs_cs[0][k] = np.array(dc[0][k], dtype=np.int32)
        tmp_dcs_cs[1][k] = np.array(dc[1][k], dtype=np.int32)
        tmp_dcs_cs[2][k] = dc[2][k]
    del dc
    # syn
    fname = f'{worker_dir_props}/syn_props_{worker_id}.pkl'
    dc = basics.load_pkl2obj(fname)
    fname = f'{worker_dir_props}/tot_sym_cnt_{worker_id}.pkl'
    curr_sym_cnt = basics.load_pkl2obj(fname)
    fname = f'{worker_dir_props}/tot_asym_cnt_{worker_id}.pkl'
    curr_asym_cnt = basics.load_pkl2obj(fname)
    fname = f'{worker_dir_props}/syn_voxels_{worker_id}.npz'
    curr_syn_vxs = np.load(fname)
    tmp_dcs_syn = [dict(), defaultdict(list), dict()]
    tmp_sym_dc = dict()
    tmp_asym_dc = dict()
    tmp_syn_vx = dict()
    for k in intersec:
        if k not in dc[0]:
            continue
        tmp_dcs_syn[0][k] = dc[0][k]
        tmp_dcs_syn[1][k] = dc[1][k]
        tmp_dcs_syn[2][k] = dc[2][k]
        tmp_syn_vx[k] = curr_syn_vxs[str(k)]  # savez only allows string keys
        if k in curr_sym_cnt:
            tmp_sym_dc[k] = curr_sym_cnt[k]
        if k in curr_asym_cnt:
            tmp_asym_dc[k] = curr_asym_cnt[k]
    return tmp_dcs_cs, tmp_dcs_syn, tmp_sym_dc, tmp_asym_dc, tmp_syn_vx


def _generate_storage_lookup(args):
    """
    Generates a look-up dictionary from storage destinations to the corresponding
    object IDs in `id_chunk` (used for SegmentationObjects) by calling
    `rep_helper.subfold_from_ix`.

    Args:
        args: List or Tuple of
            * id_chunk: SegmentationObject IDs.
            * req_subfold_keys: Keys of requested storages.
            * n_folders_fs: Number of folders.

    Returns:
        dict: Look-up dictionary storage destination -> list of IDs.
    """
    id_chunk, req_subfold_keys, n_folders_fs = args
    dest_dc_tmp = defaultdict(list)
    cs_ids_ch_set = set(req_subfold_keys)
    for obj_id in id_chunk:
        subfold_key = rep_helper.subfold_from_ix(obj_id, n_folders_fs)
        if subfold_key in cs_ids_ch_set:
            dest_dc_tmp[subfold_key].append(obj_id)
    return dest_dc_tmp
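
# Example (sketch): once `extract_contact_sites` has finished, the written 'syn'
# fragments and their type ratios can be inspected through the SegmentationDataset
# API; the attribute keys below are the ones written in `_write_props_to_syn_thread`.
#
#     from syconn.reps.segmentation import SegmentationDataset
#
#     sd_syn = SegmentationDataset(obj_type='syn', version=0,
#                                  working_dir='/path/to/working_dir')  # hypothetical
#     so = sd_syn.get_segmentation_object(next(iter(sd_syn.ids)))
#     so.load_attr_dict()
#     print(so.attr_dict['sym_prop'], so.attr_dict['asym_prop'],
#           so.attr_dict['cs_id'])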