# -*- coding: utf-8 -*-
# SyConn - Synaptic connectivity inference toolkit
#
# Copyright (c) 2016 - now
# Max-Planck-Institute of Neurobiology, Munich, Germany
# Authors: Philipp Schubert
import os
import shutil
import time
from logging import Logger
from multiprocessing import Process
from typing import Optional, Callable, Tuple, Dict, Any, Union
import networkx as nx
import numpy as np
import tqdm
from syconn import global_params
from syconn.extraction import object_extraction_wrapper as oew
from syconn.handler.basics import chunkify, kd_factory
from syconn.handler.config import initialize_logging
from syconn.mp import batchjob_utils as qu
from syconn.mp.mp_utils import start_multiprocess_imap
from syconn.proc import sd_proc
from syconn.proc import ssd_proc
from syconn.proc.graphs import create_ccsize_dict
from syconn.reps.segmentation import SegmentationDataset
from syconn.reps.super_segmentation import SuperSegmentationDataset, SuperSegmentationObject
[docs]def run_create_neuron_ssd(apply_ssv_size_threshold: bool = False, ncores_per_job: int = 1, overwrite: bool = False):
"""
Creates a :class:`~syconn.reps.super_segmentation_dataset.SuperSegmentationDataset` with
``version=0`` at the currently active working directory based on the SV graph
at :attr:`~syconn.handler.config.DynConfig.neuron_svgraph_path`. In case astrocyte splitting is active,
this will be the SV graph after astrocyte removal, if it was disabled it is identical to ``pruned_svgraph.bz2``.
Args:
apply_ssv_size_threshold: Apply filter with minimum bounding box diagonal. This is usually not needed as the
filter is applied either in :func:`~run_create_rag` (prior_astrocyte_removal=False) or during the astrocyte
separation.
ncores_per_job: Number of cores per worker for
:func:`~syconn.reps.super_segmentation_dataset.save_dataset_deep`.
overwrite:
Notes:
* This is a memory intensiv step, consider increasing `ncores_per_job`.
* Requires :func:`~syconn.exec_init.init_cell_subcell_sds` and
optionally :func:`~run_astrocyte_splitting`.
* Networkx requires a lot of memory for >1e9 edges, graph_tool and igraph are not usable for this either.
Currently the work-around is to not use the graph information when storing the cell SV graph, but only the
connected component as a graph with N-1 edges (N nodes). [TODO]
Returns:
"""
working_dir = global_params.config.working_dir
log = initialize_logging('ssd_generation', working_dir + '/logs/', overwrite=False)
cc_dict = {}
if apply_ssv_size_threshold:
g_p = global_params.config.neuron_svgraph_path
sv_g = nx.read_edgelist(g_p, nodetype=np.uint64)
sd = SegmentationDataset("sv", working_dir=working_dir)
sv_size_dict = {}
bbs = sd.load_numpy_data('bounding_box') * sd.scaling
for ii in range(len(sd.ids)):
sv_size_dict[sd.ids[ii]] = bbs[ii]
ccsize_dict = create_ccsize_dict(sv_g, sv_size_dict)
log.info("Finished preparation of SSV size dictionary based "
"on bounding box diagional of corresponding SVs.")
before_cnt = len(sv_g.nodes())
for ix in list(sv_g.nodes()):
if ccsize_dict[ix] < global_params.config['min_cc_size_ssv']:
sv_g.remove_node(ix)
log.info("Removed %d neuron CCs because of size." % (before_cnt - len(sv_g.nodes())))
for cc in nx.connected_components(sv_g):
cc_arr = np.array(list(cc), dtype=np.uint64)
cc_dict[np.min(cc_arr)] = cc_arr
else:
g_p = global_params.config.neuron_svagg_list_path
with open(g_p, 'r') as f:
for line in f:
cc = [int(el) for el in line.split(',')]
cc = np.array(cc, dtype=np.uint64)
cc_dict[np.min(cc)] = cc
cc_dict_inv = {}
for ssv_id, cc in cc_dict.items():
for sv_id in cc:
cc_dict_inv[sv_id] = ssv_id
log.info('Parsed RAG from {} with {} SSVs and {} SVs.'.format(g_p, len(cc_dict), len(cc_dict_inv)))
ssd = SuperSegmentationDataset(working_dir=working_dir, version='0',
ssd_type="ssv", sv_mapping=cc_dict_inv, create=True,
overwrite=overwrite)
# create cache-arrays for frequently used attributes
# also executes 'ssd.save_dataset_shallow()' and populates sv_ids attribute of all SSVs
ssd.save_dataset_deep(nb_cpus=ncores_per_job)
max_n_jobs = global_params.config['ncores_per_node'] * 3
# Write SSV RAGs
multi_params = ssd.ssv_ids[np.argsort(ssd.load_numpy_data('size'))[::-1]]
# split all cells into chunks within upper half and lower half (sorted by size)
# -> process a balanced load of large cells with the first jobs, and then the other, smaller half
half_ix = len(multi_params) // 2
njobs_per_half = max(max_n_jobs // 2, 1)
multi_params = chunkify(multi_params[:half_ix], njobs_per_half) + \
chunkify(multi_params[half_ix:], njobs_per_half)
multi_params = [(ssv_ids, [cc_dict[ssv_id] for ssv_id in ssv_ids]) for ssv_ids in multi_params]
start_multiprocess_imap(_ssv_svgraph_writer, multi_params, debug=False,
nb_cpus=global_params.config['ncores_per_node'])
log.info('Finished saving cell SV graphs.')
log.info('Finished SSD initialization. Starting cellular organelle mapping.')
# map cellular organelles to SSVs
ssd_proc.aggregate_segmentation_object_mappings(ssd, global_params.config['process_cell_organelles'],
nb_cpus=ncores_per_job)
ssd_proc.apply_mapping_decisions(ssd, global_params.config['process_cell_organelles'],
nb_cpus=ncores_per_job)
log.info('Finished mapping of cellular organelles to SSVs.')
def _ssv_svgraph_writer(args):
ssv_ids, sv_lists = args
config = global_params.config
for ssv_id, sv_list in zip(ssv_ids, sv_lists):
ssv = SuperSegmentationObject(ssv_id, config=config)
# create "dummy" graph
if len(sv_list) == 1:
sv_list = [sv_list[0], sv_list[0]] # this will add an edge between the single supervoxel..
sv_graph = nx.from_edgelist([(sv_list[ii], sv_list[ii+1]) for ii in range(len(sv_list) - 1)])
if not nx.is_connected(sv_graph):
print(f'WARNING: SV graph of SSO with ID={ssv_id} is not connected.')
nx.write_edgelist(sv_graph, ssv.edgelist_path)
[docs]def sd_init(co: str, max_n_jobs: int, log: Optional[Logger] = None):
"""
Initialize :class:`~syconn.reps.segmentation.SegmentationDataset` of given
supervoxel type `co`.
Args:
co: Cellular organelle identifier (e.g. 'mi', 'vc', ...).
max_n_jobs: Number of parallel jobs.
log: Logger.
"""
sd_seg = SegmentationDataset(obj_type=co, working_dir=global_params.config.working_dir,
version="0")
multi_params = chunkify(sd_seg.so_dir_paths, max_n_jobs)
so_kwargs = dict(working_dir=global_params.config.working_dir, obj_type=co)
multi_params = [[par, so_kwargs] for par in multi_params]
if not global_params.config.use_new_meshing and \
(co != "sv" or (co == "sv" and global_params.config.allow_mesh_gen_cells)):
_ = qu.batchjob_script(
multi_params, 'mesh_caching', suffix=co, remove_jobfolder=False, log=log)
# write mesh properties to attribute dictionaries if old meshing is active
if not global_params.config.use_new_meshing:
sd_proc.dataset_analysis(sd_seg, recompute=False, compute_meshprops=True)
[docs]def kd_init(co, chunk_size, transf_func_kd_overlay: Optional[Callable],
load_cellorganelles_from_kd_overlaycubes: bool,
cube_of_interest_bb: Tuple[np.ndarray],
log: Logger):
"""
Replaced by a single call of :func:`~generate_subcell_kd_from_proba`.
Initializes a per-object segmentation KnossosDataset for the given supervoxel type
`co` based on an initial prediction which location has to be defined in the config.yml file
for the `co` object, e.g. ``'kd_mi'`` for ``co='mi'``
(see :func:`~syconn.handler.config.generate_default_conf`). Results will be stored as a
KnossosDataset at `"{}/knossosdatasets/{}_seg/".format(global_params.config.working_dir, co)`.
Appropriate parameters have to be set inside the config.yml file, see
:func:`~syconn.extraction.object_extraction_wrapper.generate_subcell_kd_from_proba`
or :func:`~syconn.handler.config.generate_default_conf` for more details.
Examples:
Was used to process sub-cellular structures independently:
ps = [Process(target=kd_init, args=[co, chunk_size, transf_func_kd_overlay,
load_cellorganelles_from_kd_overlaycubes, cube_of_interest_bb, log])
for co in global_params.config['process_cell_organelles']]
for p in ps:
p.start()
time.sleep(5)
for p in ps:
p.join()
if p.exitcode != 0:
raise Exception(f'Worker {p.name} stopped unexpectedly with exit code {p.exitcode}.')
Args:
co: Type of cell organelle supervoxels, e.g 'mi' for mitochondria or 'vc' for
vesicle clouds.
chunk_size: Size of the cube which are processed by each worker.
transf_func_kd_overlay: Transformation applied on the prob. map or segmentation
data.
load_cellorganelles_from_kd_overlaycubes:
cube_of_interest_bb: Bounding of the (sub-) volume of the dataset
which is processed.
log: Logger.
"""
oew.generate_subcell_kd_from_proba(
co, chunk_size=chunk_size, transf_func_kd_overlay=transf_func_kd_overlay,
load_cellorganelles_from_kd_overlaycubes=load_cellorganelles_from_kd_overlaycubes,
cube_of_interest_bb=cube_of_interest_bb, log=log)
[docs]def init_cell_subcell_sds(chunk_size: Optional[Tuple[int, int, int]] = None,
n_folders_fs: int = 10000, n_folders_fs_sc: int = 10000,
max_n_jobs: Optional[int] = None,
load_cellorganelles_from_kd_overlaycubes: bool = False,
transf_func_kd_overlay: Optional[Dict[Any, Callable]] = None,
cube_of_interest_bb: Optional[Union[tuple, np.ndarray]] = None,
overwrite=False):
"""
Convert binary class segmentation mask of sub-cellular structure predictions into an isntance segmentation
using connected components / watershed.
Subsequently, the properties of sub-cellular structures (voxel count, coordinate, bounding box, mesh, ..) and
their associations with cell fragments (calculating the overlap between every sub-cellular structure and
cell fragment instance) are extracted.
Args:
chunk_size: Size of the cube which are processed by each worker.
n_folders_fs: Number of folders used to create the folder structure in
the resulting :class:`~syconn.reps.segmentation.SegmentationDataset`
for the cell supervoxels (``version='sv'``).
n_folders_fs_sc: Number of folders used to create the folder structure in
the resulting :class:`~syconn.reps.segmentation.SegmentationDataset`
for the cell organelle supervxeols (e.g. ``version='mi'``).
max_n_jobs: Number of parallel jobs.
load_cellorganelles_from_kd_overlaycubes:
transf_func_kd_overlay: Transformation applied on the prob. map or segmentation
data.
cube_of_interest_bb: Bounding of the (sub-) volume of the dataset
which is processed (minimum and maximum coordinates in mag1 voxels,
XYZ).
overwrite: If True, will overwrite existing data.
"""
log = initialize_logging('sd_generation', global_params.config.working_dir +
'/logs/', overwrite=True)
if transf_func_kd_overlay is None:
transf_func_kd_overlay = {k: None for k in global_params.config['process_cell_organelles']}
if chunk_size is None:
chunk_size = [512, 512, 512]
chunk_size_kdinit = chunk_size
if max_n_jobs is None:
max_n_jobs = global_params.config.ncore_total * 2
# loading cached data or adapt number of jobs/cache size dynamically,
# dependent on the dataset
kd = kd_factory(global_params.config.kd_seg_path)
if cube_of_interest_bb is None:
cube_of_interest_bb = [np.zeros(3, dtype=np.int32), kd.boundary]
log.info('Converting the predictions of the following cellular organelles to'
' KnossosDatasets: {}.'.format(global_params.config['process_cell_organelles']))
start = time.time()
oew.generate_subcell_kd_from_proba(
global_params.config['process_cell_organelles'],
chunk_size=chunk_size_kdinit, transf_func_kd_overlay=transf_func_kd_overlay,
load_cellorganelles_from_kd_overlaycubes=load_cellorganelles_from_kd_overlaycubes,
cube_of_interest_bb=cube_of_interest_bb, log=log, n_chunk_jobs=max_n_jobs,
overwrite=overwrite)
log.info('Finished KD generation after {:.0f}s.'.format(time.time() - start))
log.info('Generating SegmentationDatasets for subcellular structures {} and'
' cell supervoxels.'.format(global_params.config['process_cell_organelles']))
start = time.time()
sd_proc.map_subcell_extract_props(
global_params.config.kd_seg_path, global_params.config.kd_organelle_seg_paths,
n_folders_fs=n_folders_fs, n_folders_fs_sc=n_folders_fs_sc, n_chunk_jobs=max_n_jobs,
cube_of_interest_bb=cube_of_interest_bb, chunk_size=chunk_size, log=log,
overwrite=overwrite)
log.info('Finished extraction and mapping after {:.2f}s.'
''.format(time.time() - start))
log.info('Caching properties of subcellular structures {} and cell'
' supervoxels'.format(global_params.config['process_cell_organelles']))
start = time.time()
ps = [Process(target=sd_init, args=(co, max_n_jobs, log))
for co in ["sv"] + global_params.config['process_cell_organelles']]
for p in ps:
p.start()
time.sleep(2)
for p in ps:
p.join()
if p.exitcode != 0:
raise Exception(f'Worker {p.name} stopped unexpectedly with exit '
f'code {p.exitcode}.')
p.close()
log.info('Finished SD caching after {:.2f}s.'
''.format(time.time() - start))
[docs]def run_create_rag(graph_node_dtype=None):
# TODO: use BinarySearchStore
"""
If ``global_params.config.prior_astrocyte_removal==True``:
stores pruned RAG at ``global_params.config.pruned_svgraph_path``, required for all glia
removal steps. :func:`~syconn.exec.exec_inference.run_astrocyte_splitting`
will finally store the neuron SV graph.
else:
stores pruned SV graph at :attr:`~syconn.handler.config.DynConfig.pruned_svgraph_path`,
required by :func:`~syconn.exec.exec_init.run_create_neuron_ssd`.
Args:
graph_node_dtype: Defaults to ``np.uint64``.
"""
log = initialize_logging('sd_generation', global_params.config.working_dir +
'/logs/', overwrite=False)
if graph_node_dtype is None:
graph_node_dtype = np.uint64
# Crop RAG according to cell SVs found during SD generation and apply size threshold
G = nx.read_edgelist(global_params.config.init_svgraph_path, nodetype=graph_node_dtype)
if 0 in G.nodes():
G.remove_node(0)
log.warning('Found background node 0 in original graph. Removing.')
all_sv_ids_in_rag = np.array(list(G.nodes()), dtype=np.uint64)
log.info("Found {} SVs in initial RAG.".format(len(all_sv_ids_in_rag)))
# add single SV connected components to initial graph
sd = SegmentationDataset(obj_type='sv', working_dir=global_params.config.working_dir, cache_properties=['size'])
diff = np.setdiff1d(sd.ids, all_sv_ids_in_rag)
log.info(f'Found {len(diff)} single-element connected component SVs which were missing in initial RAG.')
for ix in diff:
G.add_edge(ix, ix)
log.debug("Found {} SVs in initial RAG after adding size-one connected "
"components.".format(G.number_of_nodes()))
# remove small connected components
sv_size_dict = {}
bbs = sd.load_numpy_data('bounding_box') * sd.scaling
for ii in range(len(sd.ids)):
sv_size_dict[sd.ids[ii]] = bbs[ii]
try:
ccsize_dict = create_ccsize_dict(G, sv_size_dict)
except ValueError as e:
raise ValueError from e
log.debug("Finished preparation of SSV size dictionary based "
"on bounding box diagonal of corresponding SVs.")
before_cnt = G.number_of_nodes()
# explicit copy needed, as G is modified in the loop
for ix in tqdm.tqdm(list(G.nodes()), total=before_cnt, desc='CC size filter'):
if ccsize_dict[ix] <= global_params.config['min_cc_size_ssv']:
G.remove_node(ix)
# TODO: check if this loop (despite cache_properties=['size'], see above) is limiting speed
total_size = 0
for n in tqdm.tqdm(G.nodes(), total=G.number_of_nodes(), desc='Total size'):
total_size += sd.get_segmentation_object(n).size
total_size_cmm = np.prod(sd.scaling) * total_size / 1e18
log.info(f"Removed {before_cnt - G.number_of_nodes()} SVs from RAG because of size (bounding box diagonal <= "
f"{global_params.config['min_cc_size_ssv']} nm). Final RAG contains {G.number_of_nodes()} SVs in "
f"{nx.number_connected_components(G)} CCs ({total_size_cmm} mm^3; {total_size / 1e9} Gvx).")
nx.write_edgelist(G, global_params.config.pruned_svgraph_path)
with open(global_params.config.pruned_svagg_list_path, 'w') as f:
for cc in nx.connected_components(G):
f.write(','.join([str(el) for el in cc]) + '\n')
log.debug("SV agglomerations have been written to file.")
if not global_params.config.prior_astrocyte_removal:
os.makedirs(global_params.config.working_dir + '/glia/', exist_ok=True)
shutil.copy(global_params.config.pruned_svgraph_path, global_params.config.neuron_svgraph_path)
shutil.copy(global_params.config.pruned_svagg_list_path, global_params.config.neuron_svagg_list_path)