# -*- coding: utf-8 -*-
# SyConn - Synaptic connectivity inference toolkit
#
# Copyright (c) 2016 - now
# Max Planck Institute of Neurobiology, Martinsried, Germany
# Authors: Philipp Schubert, Joergen Kornfeld
# import here, otherwise it might fail if it is imported after importing torch
# see https://github.com/pytorch/pytorch/issues/19739
try:
import open3d as o3d
except ImportError:
pass # for sphinx build
import os
import re
import shutil
from collections import Counter
from logging import Logger
from typing import Iterable, Union, Optional, Any, Tuple, List
import numpy as np
from knossos_utils import knossosdataset
from knossos_utils.chunky import ChunkDataset, save_dataset
from knossos_utils.knossosdataset import KnossosDataset
from scipy.special import softmax
from scipy.stats import entropy
from sklearn.decomposition import PCA
from sklearn.neighbors import KNeighborsClassifier
from .basics import read_txt_from_zip, get_filepaths_from_dir, \
parse_cc_dict_from_kzip
from .compression import load_from_h5py, save_to_h5py
from .. import global_params
from ..handler import log_handler, log_main, basics
from ..handler.basics import chunkify
from ..handler.config import initialize_logging
from ..mp import batchjob_utils as qu
from ..proc.image import apply_morphological_operations
from ..reps import log_reps
# for readthedocs build
try:
import torch
except ImportError:
pass
def load_gt_from_kzip(zip_fname, kd_p, raw_data_offset=75, verbose=False,
mag=1):
"""
Loads ground truth from zip file, generated with Knossos. Corresponding
dataset config file is located at kd_p.
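    Examples:
        A minimal usage sketch (paths are placeholders)::

            raw, label = load_gt_from_kzip('~/gt_cube.k.zip', '~/kd/knossos.conf')
            # raw is float32 scaled to [0, 1]; due to `raw_data_offset` it is
            # spatially larger than label.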
Args:
zip_fname: str
kd_p: str or List[str]
raw_data_offset: int or np.array
number of voxels used for additional raw offset, i.e. the offset for the
raw data will be label_offset - raw_data_offset, while the raw data
volume will be label_volume + 2*raw_data_offset. It will
use 'kd.scaling' to account for dataset anisotropy if scalar or a
list of length 3 has to be provided for a custom x, y, z offset.
verbose: bool
mag: int
Data mag. level.
Returns: np.array, np.array
raw data (float32, scaled by 1/255.), label data (uint16)
"""
if isinstance(kd_p, (str, bytes)):
kd_p = [kd_p]
raw_data = []
label_data = []
for curr_p in kd_p:
kd = basics.kd_factory(curr_p)
bb = kd.get_movement_area(zip_fname)
offset, size = bb[0], bb[1] - bb[0]
scaling = np.array(kd.scale, dtype=np.int32)
if np.isscalar(raw_data_offset):
raw_data_offset = np.array(scaling[0] * raw_data_offset / scaling,
dtype=np.int32)
if verbose:
log_handler.debug(f'Using scale adapted raw offset: {raw_data_offset}')
elif len(raw_data_offset) != 3:
raise ValueError("Offset for raw cubes has to have length 3.")
else:
raw_data_offset = np.array(raw_data_offset)
raw = kd.load_raw(size=(size // mag + 2 * raw_data_offset) * mag,
offset=(offset // mag - raw_data_offset) * mag,
mag=mag).swapaxes(0, 2)
raw_data.append(raw[None,])
label = kd.load_kzip_seg(zip_fname, mag=mag).swapaxes(0, 2)
label_data.append(label[None,])
raw = np.concatenate(raw_data, axis=0).astype(np.float32)
label = np.concatenate(label_data, axis=0)
try:
_ = parse_cc_dict_from_kzip(zip_fname)
except Exception:  # mergelist.txt does not exist
label = np.zeros(size)
return raw.astype(np.float32) / 255., label
return raw.astype(np.float32) / 255., label
def predict_kzip(kzip_p, m_path, kd_path, clf_thresh=0.5, mfp_active=False,
dest_path=None, overwrite=False, gpu_ix=0,
imposed_patch_size=None):
"""
Predicts data contained in k.zip file (defined by bounding box in knossos)
Args:
kzip_p: str
path to kzip containing the raw data cube information
m_path: str
path to predictive model
kd_path: str
path to knossos dataset
clf_thresh: float
classification threshold
mfp_active: bool
dest_path: str
path to destination folder, if None folder of k.zip is used.
overwrite: bool
gpu_ix: int
imposed_patch_size: tuple
Returns:
"""
cube_name = os.path.splitext(os.path.basename(kzip_p))[0]
if dest_path is None:
dest_path = os.path.dirname(kzip_p)
from elektronn2.utils.gpu import initgpu
if not os.path.isfile(dest_path + "/%s_data.h5" % cube_name) or overwrite:
raw, labels = load_gt_from_kzip(kzip_p, kd_p=kd_path,
raw_data_offset=0)
raw = xyz2zxy(raw)
initgpu(gpu_ix)
from elektronn2.neuromancer.model import modelload
m = modelload(m_path, imposed_patch_size=list(imposed_patch_size)
if isinstance(imposed_patch_size, tuple) else imposed_patch_size,
override_mfp_to_active=mfp_active, imposed_batch_size=1)
original_do_rates = m.dropout_rates
m.dropout_rates = ([0.0, ] * len(original_do_rates))
pred = m.predict_dense(raw[None,], pad_raw=True)[1]
# remove area without sufficient FOV
pred = zxy2xyz(pred)
raw = zxy2xyz(raw)
save_to_h5py([pred, raw], dest_path + "/%s_data.h5" % cube_name,
["pred", "raw"])
else:
pred, raw = load_from_h5py(dest_path + "/%s_data.h5" % cube_name,
hdf5_names=["pred", "raw"])
offset = parse_movement_area_from_zip(kzip_p)[0]
overlaycubes2kzip(dest_path + "/%s_pred.k.zip" % cube_name,
(pred >= clf_thresh).astype(np.uint32),
offset, kd_path)
def predict_h5(h5_path, m_path, clf_thresh=None, mfp_active=False,
gpu_ix=0, imposed_patch_size=None, hdf5_data_key=None,
data_is_zxy=True, dest_p=None, dest_hdf5_data_key="pred",
as_uint8=True):
"""
Predicts data from h5 file. Assumes raw data is already float32.
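    Examples:
        A minimal sketch (paths and patch size are placeholders)::

            predict_h5('~/cube.h5', '~/models/cnn.mdl', hdf5_data_key='raw',
                       imposed_patch_size=(22, 140, 140))  # Z, X, Y
            # results are written to '~/cube_pred.h5'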
Args:
h5_path: str
path to h5 containing the raw data
m_path: str
path to predictive model
clf_thresh: float
classification threshold, if None, no thresholding
mfp_active: bool
gpu_ix: int
imposed_patch_size: tuple
hdf5_data_key: str
if None, it uses the first entry in the list returned by
'load_from_h5py'
data_is_zxy: bool
if False, it assumes data is [X, Y, Z]
dest_p: str
dest_hdf5_data_key: str
as_uint8: bool
Returns:
"""
if hdf5_data_key:
raw = load_from_h5py(h5_path, hdf5_names=[hdf5_data_key])[0]
else:
raw = load_from_h5py(h5_path, hdf5_names=None)
assert len(raw) == 1, "'hdf5_data_key' not given but multiple hdf5 " \
"elements found. Please define raw data key."
raw = raw[0]
if not data_is_zxy:
raw = xyz2zxy(raw)
from elektronn2.utils.gpu import initgpu
initgpu(gpu_ix)
if raw.dtype.kind in ('u', 'i'):
raw = raw.astype(np.float32) / 255.
from elektronn2.neuromancer.model import modelload
m = modelload(m_path, imposed_patch_size=list(imposed_patch_size)
if isinstance(imposed_patch_size, tuple) else imposed_patch_size,
override_mfp_to_active=mfp_active, imposed_batch_size=1)
original_do_rates = m.dropout_rates
m.dropout_rates = ([0.0, ] * len(original_do_rates))
pred = m.predict_dense(raw[None,], pad_raw=True)[1]
pred = zxy2xyz(pred)
raw = zxy2xyz(raw)
if as_uint8:
pred = (pred * 255).astype(np.uint8)
raw = (raw * 255).astype(np.uint8)
if clf_thresh:
pred = (pred >= clf_thresh).astype(np.float32)
if dest_p is None:
dest_p = h5_path[:-3] + "_pred.h5"
if hdf5_data_key is None:
hdf5_data_key = "raw"
save_to_h5py([raw, pred], dest_p, [hdf5_data_key, dest_hdf5_data_key])
def overlaycubes2kzip(dest_p: str, vol: np.ndarray, offset: np.ndarray,
kd_path: str):
"""
Writes segmentation volume to kzip.
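    Examples:
        A minimal sketch, assuming ``seg_vol`` is an unsigned integer XYZ array::

            offset = parse_movement_area_from_zip('~/cube.k.zip')[0]
            overlaycubes2kzip('~/cube_pred.k.zip', seg_vol.astype(np.uint32),
                              offset, '~/kd/knossos.conf')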
Args:
dest_p: str
path to k.zip
vol: np.array
Segmentation or prediction (unsigned integer, XYZ).
offset: np.array
kd_path: str
Returns: None
"""
kd = basics.kd_factory(kd_path)
kd.from_matrix_to_cubes(offset=offset, kzip_path=dest_p,
mags=[1], data=vol)
def xyz2zxy(vol: np.ndarray) -> np.ndarray:
"""
Swaps axes to ELEKTRONN convention ([M, .., X, Y, Z] -> [M, .., Z, X, Y]).
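    Examples:
        Shape round trip (illustrative)::

            vol = np.zeros((2, 4, 5, 6))   # [M, X, Y, Z]
            xyz2zxy(vol).shape             # (2, 6, 4, 5), i.e. [M, Z, X, Y]
            zxy2xyz(xyz2zxy(vol)).shape    # (2, 4, 5, 6), round trip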
Args:
vol: np.array [M, .., X, Y, Z]
Returns: np.array [M, .., Z, X, Y]
"""
# assert vol.ndim == 3 # removed for multi-channel support
# adapt data to ELEKTRONN conventions (speed-up)
vol = vol.swapaxes(-2, -3) # y x z
vol = vol.swapaxes(-3, -1) # z x y
return vol
def zxy2xyz(vol: np.ndarray) -> np.ndarray:
"""
Swaps axes from ELEKTRONN convention ([M, .., Z, X, Y] -> [M, .., X, Y, Z]).
Args:
vol: np.array [M, .., Z, X, Y]
Returns: np.array [M, .., X, Y, Z]
"""
# assert vol.ndim == 3 # removed for multi-channel support
vol = vol.swapaxes(-2, -3) # x z y
vol = vol.swapaxes(-2, -1) # x y z
return vol
def xyz2zyx(vol: np.ndarray) -> np.ndarray:
"""
Swaps axes to ELEKTRONN convention ([M, .., X, Y, Z] -> [M, .., Z, Y, X]).
Args:
vol: np.array [M, .., X, Y, Z]
Returns: np.array [M, .., Z, Y, X]
"""
# assert vol.ndim == 3 # removed for multi-channel support
# adapt data to ELEKTRONN conventions (speed-up)
vol = vol.swapaxes(-1, -3) # [..., z, y, x]
return vol
def zyx2xyz(vol: np.ndarray) -> np.ndarray:
"""
Swaps axes from ELEKTRONN convention ([M, .., Z, Y, X] -> [M, .., X, Y, Z]).
Args:
vol: np.array [M, .., Z, Y, X]
Returns: np.array [M, .., X, Y, Z]
"""
# assert vol.ndim == 3 # removed for multi-channel support
vol = vol.swapaxes(-1, -3) # [..., x, y, z]
return vol
def create_h5_from_kzip(zip_fname: str, kd_p: str,
foreground_ids: Optional[Iterable[int]] = None,
overwrite: bool = True, raw_data_offset: int = 75,
debug: bool = False, mag: int = 1,
squeeze_data: bool = True,
target_labels: Optional[Iterable[int]] = None,
apply_mops_seg: Optional[List[str]] = None):
"""
Create .h5 files for elektronn3 (zyx) input. Only supports binary labels
(0=background, 1=foreground).
Examples:
Suppose your k.zip file contains the segmentation GT with two segmentation
IDs 1, 2 and is stored at ``kzip_fname``. The corresponding
``KnossosDataset`` is located at ``kd_path`` .
The following code snippet will create an ``.h5`` file in the folder of
``kzip_fname`` with the raw data (additional offset controlled by
``raw_data_offset``) and the label data (either binary or defined by
``target_labels``) with the keys ``raw`` and ``label`` respectively::
create_h5_from_kzip(kd_p=kd_path, raw_data_offset=75,
zip_fname=kzip_fname, mag=1, foreground_ids=[1, 2],
target_labels=[1, 2])
Args:
zip_fname: Path to the annotated kzip file.
kd_p: Path to the underlying raw data stored as KnossosDataset.
foreground_ids: IDs which have to be converted to foreground, i.e. 1. Everything
else is considered background (0). If None, everything except 0 is
treated as foreground.
overwrite: If True, will overwrite existing .h5 files
raw_data_offset: Number of voxels used for additional raw offset, i.e. the offset for the
raw data will be label_offset - raw_data_offset, while the raw data
volume will be label_volume + 2*raw_data_offset. It will
use 'kd.scaling' to account for dataset anisotropy if scalar or a
list of length 3 has to be provided for a custom x, y, z offset.
debug: If True, file will have an additional 'debug' suffix and
raw_data_offset is set to 0. Also their bit depths are adapted to be the
same.
mag: Data mag. level.
squeeze_data: If True, label and raw data will be squeezed.
target_labels: If set, `foreground_ids` must also be set. Each ID in
`foreground_ids` will be mapped to the corresponding label in
`target_labels`.
apply_mops_seg: List of string identifiers for ndimage morphological operations.
"""
if not squeeze_data and apply_mops_seg is not None:
raise ValueError('Data might have axis with length one if squeeze_data=False.')
if target_labels is not None and foreground_ids is None:
raise ValueError('`target_labels` is set, but `foreground_ids` is None.')
fname, ext = os.path.splitext(zip_fname)
if fname[-2:] == ".k":
fname = fname[:-2]
if debug:
file_appendix = '_debug'
raw_data_offset = 0
else:
file_appendix = ''
fname_dest = fname + file_appendix + ".h5"
if os.path.isfile(fname_dest) and not overwrite:
print("File at {} already exists. Skipping.".format(fname_dest))
return
raw, label = load_gt_from_kzip(zip_fname, kd_p, mag=mag,
raw_data_offset=raw_data_offset)
if squeeze_data:
raw = raw.squeeze()
label = label.squeeze()
if foreground_ids is None:
try:
cc_dc = parse_cc_dict_from_kzip(zip_fname)
foreground_ids = np.concatenate(list(cc_dc.values()))
except Exception:  # mergelist.txt does not exist
foreground_ids = []
print("Foreground IDs not assigned. Inferring from "
"'mergelist.txt' in k.zip.:", foreground_ids)
create_h5_gt_file(fname_dest, raw, label, foreground_ids, debug=debug,
target_labels=target_labels, apply_mops_seg=apply_mops_seg)
def create_h5_gt_file(fname: str, raw: np.ndarray, label: np.ndarray,
foreground_ids: Optional[Iterable[int]] = None,
target_labels: Optional[Iterable[int]] = None,
debug: bool = False,
apply_mops_seg: Optional[List[str]] = None):
"""
Create .h5 files for ELEKTRONN input from two arrays.
Only supports binary labels (0=background, 1=foreground). E.g. for creating
true negative cubes set foreground_ids=[] to be an empty list. If set to
None, everything except 0 is treated as foreground.
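    Examples:
        A minimal sketch, assuming ``raw`` and ``label`` are XYZ arrays::

            # map segmentation IDs 1 and 2 to labels 1 and 2, rest to background
            create_h5_gt_file('~/gt_cube.h5', raw, label,
                              foreground_ids=[1, 2], target_labels=[1, 2])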
Args:
fname: str
Path where h5 file should be saved
raw: np.array
label: np.array
foreground_ids: iterable
ids which have to be converted to foreground, i.e. 1. Everything
else is considered background (0). If None, everything except 0 is
treated as foreground.
target_labels: Iterable
If set, `foreground_ids` must also be set. Each ID in `foreground_ids` will
be mapped to the corresponding label in `target_labels`.
debug: bool
will store labels and raw as uint8 ranging from 0 to 255
apply_mops_seg: List of string identifiers for ndimage morphological operations.
"""
if target_labels is not None and foreground_ids is None:
raise ValueError('`target_labels` is set, but `foreground_ids` is None.')
print(os.path.split(fname)[1])
print("Label (before):", label.shape, label.dtype, label.min(), label.max())
label = binarize_labels(label, foreground_ids, target_labels=target_labels)
label = xyz2zxy(label)
raw = xyz2zxy(raw)
if apply_mops_seg is not None:
label = apply_morphological_operations(label, morph_ops=apply_mops_seg)
label = label.astype(np.uint16)
print("Raw:", raw.shape, raw.dtype, raw.min(), raw.max())
print("Label (after mapping):", label.shape, label.dtype, label.min(), label.max())
print("-----------------\nGT Summary:\n%s\n" % str(Counter(label.flatten()).items()))
if not fname[-2:] == "h5":
fname = fname + ".h5"
if debug:
raw = (raw * 255).astype(np.uint8, copy=False)
label = label.astype(np.uint8) * 255
save_to_h5py([raw, label], fname, hdf5_names=["raw", "label"])
def binarize_labels(labels: np.ndarray, foreground_ids: Iterable[int],
target_labels: Optional[Iterable[int]] = None):
"""
Transforms label array to binary label array (0=background, 1=foreground) or
to the labels provided in `target_labels` by mapping the foreground IDs
accordingly.
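    Examples:
        Mapping two foreground IDs to distinct labels::

            labels = np.array([0, 3, 5, 3])
            binarize_labels(labels, foreground_ids=[3, 5], target_labels=[1, 2])
            # -> array([0, 1, 2, 1], dtype=uint16)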
Args:
labels: np.array
foreground_ids: iterable
target_labels: Iterable
labels used for mapping foreground IDs.
Returns: np.array
"""
new_labels = np.zeros_like(labels)
if foreground_ids is None:
target_labels = [1]
if len(np.unique(labels)) > 2:
print("------------ WARNING -------------\n"
"Found more than two different labels during label "
"conversion\n"
"----------------------------------")
new_labels[labels != 0] = 1
else:
try:
_ = iter(foreground_ids)
except TypeError:
foreground_ids = [foreground_ids]
if target_labels is None:
target_labels = [1 for _ in foreground_ids]
for ii, ix in enumerate(foreground_ids):
new_labels[labels == ix] = target_labels[ii]
labels = new_labels
assert len(np.unique(labels)) <= len(target_labels) + 1
assert 0 <= np.max(labels) <= np.max(target_labels)
assert 0 <= np.min(labels) <= np.max(target_labels)
return labels.astype(np.uint16)
def parse_movement_area_from_zip(zip_fname: str) -> np.ndarray:
"""
Parse MovementArea (e.g. bounding box of labeled volume) from annotation.xml
in (k.)zip file.
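    Examples:
        Typical use to recover offset and size of the annotated volume::

            bb = parse_movement_area_from_zip('~/gt_cube.k.zip')
            offset, size = bb[0], bb[1] - bb[0]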
Args:
zip_fname: str
Returns: np.array
Movement Area [2, 3]
"""
anno_str = read_txt_from_zip(zip_fname, "annotation.xml").decode()
line = re.findall("MovementArea (.*)/>", anno_str)
assert len(line) == 1
line = line[0]
bb_min = np.array([re.findall(r'min.\w="(\d+)"', line)], dtype=np.uint64)
bb_max = np.array([re.findall(r'max.\w="(\d+)"', line)], dtype=np.uint64)
# Movement area is stored with 0-indexing! No adjustment needed
return np.concatenate([bb_min, bb_max])
def pred_dataset(*args, **kwargs):
log_handler.warning("'pred_dataset' will be replaced by 'predict_dense_to_kd' in"
" the near future.")
return _pred_dataset(*args, **kwargs)
def _pred_dataset(kd_p, kd_pred_p, cd_p, model_p, imposed_patch_size=None,
mfp_active=False, gpu_id=0, overwrite=False, i=None, n=None):
"""
Helper function for dataset prediction. Runs prediction on whole or partial
knossos dataset. Imposed patch size has to be given in Z, X, Y!
Args:
kd_p: str
path to knossos dataset .conf file
kd_pred_p: str
path to the knossos dataset head folder which will contain the prediction
cd_p: str
destination folder for chunk dataset containing prediction
model_p: str
path to ELEKTRONN2 model
imposed_patch_size: tuple or None
patch size (Z, X, Y) of the model
mfp_active: bool
activate max-fragment pooling (might be necessary to change patch_size)
gpu_id: int
the GPU used
overwrite: bool
True: start fresh predictions; False: continue an earlier prediction
i:
n:
Returns:
"""
from elektronn2.utils.gpu import initgpu
initgpu(gpu_id)
from elektronn2.neuromancer.model import modelload
kd = KnossosDataset()
kd.initialize_from_knossos_path(kd_p, fixed_mag=1)
m = modelload(model_p, imposed_patch_size=list(imposed_patch_size)
if isinstance(imposed_patch_size, tuple) else imposed_patch_size,
override_mfp_to_active=mfp_active, imposed_batch_size=1)
original_do_rates = m.dropout_rates
m.dropout_rates = ([0.0, ] * len(original_do_rates))
offset = m.target_node.shape.offsets
offset = np.array([offset[1], offset[2], offset[0]], dtype=np.int32)
cd = ChunkDataset()
cd.initialize(kd, kd.boundary, [512, 512, 256], cd_p, overlap=offset,
box_coords=np.zeros(3), fit_box_size=True)
ch_dc = cd.chunk_dict
print('Total number of chunks for GPU/GPUs:', len(ch_dc.keys()))
if i is not None and n is not None:
chunks = list(ch_dc.values())[i::n]
else:
chunks = list(ch_dc.values())
print("Starting prediction of %d chunks in gpu %d\n" % (len(chunks), gpu_id))
if not overwrite:
for chunk in chunks:
try:
_ = chunk.load_chunk("pred")[0]
except Exception as e:
chunk_pred(chunk, m)
else:
for chunk in chunks:
try:
chunk_pred(chunk, m)
except KeyboardInterrupt as e:
print("Exiting out from chunk prediction: ", str(e))
return
save_dataset(cd)
# single gpu processing also exports the cset to kd
"""TODO: Use pyknossos conf like here to support bigger cube size:
target_kd = knossosdataset.KnossosDataset()
target_kd._cube_shape = cube_shape
scale = np.array(global_params.config['scaling'])
target_kd.scales = [scale, ]
target_kd.initialize_without_conf(path, kd.boundary, scale, kd.experiment_name,
mags=[1, ], create_pyk_conf=True, create_knossos_conf=False)
target_kd = basics.kd_factory(path) # test if init is possible
"""
if n is None:
kd_pred = KnossosDataset()
kd_pred.initialize_without_conf(kd_pred_p, kd.boundary, kd.scale,
kd.experiment_name, mags=[1, 2, 4, 8])
cd.export_cset_to_kd(kd_pred, "pred", ["pred"], [4, 4], as_raw=True,
stride=[256, 256, 256])
def predict_dense_to_kd(kd_path: str, target_path: str, model_path: str,
n_channel: int, target_names: Optional[Iterable[str]] = None,
target_channels: Optional[Iterable[Iterable[int]]] = None,
channel_thresholds: Optional[Iterable[Union[float, Any]]] = None,
log: Optional[Logger] = None, mag: int = 1,
overlap_shape_tiles: Tuple[int, int, int] = (40, 40, 20),
cube_of_interest: Optional[Tuple[np.ndarray]] = None,
overwrite: bool = False,
cube_shape_kd: Optional[Tuple[int]] = None):
"""
Helper function for dense dataset prediction. Runs predictions on the whole
knossos dataset located at `kd_path`.
Prediction results will be written to KnossosDatasets called `target_names`
at `target_path`. If no threshold is given and a target has only one
channel, the resulting KnossosDataset will contain a probability map in the
raw channel as uint8 (0..255).
Otherwise the classification results will be written to the overlay channel.
Notes:
* Has a high GPU memory requirement (minimum 12 GB). This should be
controllable from the config or determined automatically.
* Resulting KnossosDatasets currently do not use pyknossos confs.
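    Examples:
        A minimal call sketch (paths are placeholders). With a single target
        channel and no threshold, a probability map of channel 1 is stored in
        a KnossosDataset named 'synapse_fb'::

            predict_dense_to_kd('~/kd/knossos.conf', '~/preds/', '~/models/m.pts',
                                n_channel=2, target_names=['synapse_fb'],
                                target_channels=[(1,)])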
Args:
kd_path: Path to KnossosDataset .conf file of the raw data.
target_path: Destination directory for the output KnossosDataset(s)
which contain the prediction(s).
model_path: Path to elektronn3 model for predictions. Loaded via the
:class:`~elektronn3.inference.inference.Predictor`.
n_channel: Number of channels predicted by the model.
target_names: Names of target knossos datasets, e.g.
``target_names=['synapse_fb', 'synapse_type']``. Defaults to
``['pred']``. Length must match with `target_channels`.
target_channels: Channel IDs in the prediction for each target knossos dataset,
e.g. ``target_channels=[(1, 2)]`` if the prediction has two foreground labels.
Defaults to ``[[ix for ix in range(n_channel)]]``.
Length must match with `target_names`.
channel_thresholds: Thresholds for the channels: If None and the target kd
has only one channel, probabilities are stored. Otherwise 0.5 is used as
default, e.g. ``channel_thresholds=[None, 0.5, 0.5]``.
log: Logger.
mag: Data magnification level.
overlap_shape_tiles: [WIP] Overlap in voxels [XYZ] used for each
tile predicted during inference.
Currently the following chunk/tile properties are used additionally
(`overlap_shape` is the per-chunk overlap)::
chunk_size = np.array([1024, 1024, 256], dtype=np.int32) # XYZ
n_tiles = np.array([4, 4, 16])
tile_shape = (chunk_size / n_tiles).astype(np.int32)
# the final input shape must be a multiple of tile_shape
overlap_shape = tile_shape // 2
cube_of_interest: Bounding box of the volume of interest (minimum and maximum
coordinate in voxels in the respective magnification, see kwarg `mag`).
overwrite: Overwrite existing KDs.
cube_shape_kd: Cube shape used to store sub-volumes in KnossosDataset on the file system.
"""
if log is None:
log = initialize_logging('dense_predictions', global_params.config.working_dir + '/logs/', overwrite=False)
if target_names is None:
target_names = ['pred']
if target_channels is None:
target_channels = [[ix for ix in range(n_channel)]]
if not len(target_names) == len(target_channels):
msg = 'For every target name the target channels have to be specified.'
log_reps.error(msg)
raise ValueError(msg)
if channel_thresholds is None:
channel_thresholds = [None for _ in range(n_channel)]
kd = basics.kd_factory(kd_path)
if cube_of_interest is None:
cube_of_interest = (np.zeros(3, ), kd.boundary // mag)
if cube_shape_kd is None:
cube_shape_kd = (256, 256, 256)
# TODO: these should be config parameters
overlap_shape_tiles = np.array([30, 31, 20])
overlap_shape = overlap_shape_tiles
chunk_size = np.array([482, 481, 236])
# if qu.batchjob_enabled():
# chunk_size *= 2
tile_shape = [271, 181, 138]
cd = ChunkDataset()
cd.initialize(kd, cube_of_interest[1], chunk_size, target_path + '/cd_tmp/',
box_coords=cube_of_interest[0], list_of_coords=[],
fit_box_size=True, overlap=overlap_shape)
chunk_ids = list(cd.chunk_dict.keys())
# init target KnossosDatasets
target_kd_path_list = [target_path + '/{}/'.format(tn) for tn in target_names]
for path in target_kd_path_list:
if os.path.isdir(path):
if not overwrite:
msg = f'Found existing KD at "{path}" but overwrite is set to False.'
log.error(msg)
raise ValueError(msg)
log.debug('Found existing KD at {}. Removing it now.'.format(path))
shutil.rmtree(path)
for path in target_kd_path_list:
target_kd = knossosdataset.KnossosDataset()
target_kd._cube_shape = cube_shape_kd
scale = np.array(global_params.config['scaling'])
target_kd.scales = [scale, ]
# TODO: use pyk conf!
target_kd.initialize_without_conf(path, kd.boundary, kd.scale,
kd.experiment_name, [2 ** x for x in range(6)],
create_pyk_conf=False, create_knossos_conf=True)
try: # make sure init works
basics.kd_factory(path)
except ValueError as e:
log.error(f'Could not initialize KnossosDataset at "{path}". {e}')
# init batchjob parameters
multi_params = chunk_ids
multi_params = chunkify(multi_params, global_params.config.ngpu_total)
multi_params = [(ch_ids, kd_path, target_path, model_path, overlap_shape,
overlap_shape_tiles, tile_shape, chunk_size, n_channel, target_channels,
target_kd_path_list, channel_thresholds, mag, cube_of_interest)
for ch_ids in multi_params]
log.info('Started dense prediction of {} in {:d} chunk(s).'.format(", ".join(target_names), len(chunk_ids)))
n_cores_per_job = global_params.config['ncores_per_node'] // global_params.config['ngpus_per_node'] if \
qu.batchjob_enabled() else global_params.config['ncores_per_node']
qu.batchjob_script(multi_params, "predict_dense", n_cores=n_cores_per_job, suffix='_' + '_'.join(target_names),
remove_jobfolder=True, log=log, additional_flags="--gres=gpu:1")
log.info('Finished dense prediction of {}'.format(", ".join(target_names)))
def dense_predictor(args):
"""
Volumes are transformed by XYZ <-> ZYX before they are passed to the
model.
Args:
args: Tuple(
chunk_ids: list
list of chunks in chunk dataset
kd_p : str
path to knossos dataset .conf file
cd_p : str
destination folder for chunk dataset containing prediction
model_p : str
path to model
offset :
chunk_size:
...
)
Returns:
"""
# TODO: remove chunk necessity
# TODO: clean up (e.g. redundant chunk sizes, ...)
#
chunk_ids, kd_p, target_p, model_p, overlap_shape, overlap_shape_tiles, tile_shape, chunk_size, n_channel, \
target_channels, target_kd_path_list, channel_thresholds, mag, cube_of_interest = args
# init KnossosDataset:
kd = KnossosDataset()
kd.initialize_from_knossos_path(kd_p)
# init ChunkDataset:
cd = ChunkDataset()
cd.initialize(kd, cube_of_interest[1], chunk_size, target_p + '/cd_tmp/',
box_coords=cube_of_interest[0], list_of_coords=[],
fit_box_size=True, overlap=overlap_shape)
# init Target KnossosDataset
target_kd_dict = {}
for path in target_kd_path_list:
target_kd = basics.kd_factory(path)
target_kd_dict[path] = target_kd
# init Predictor
from elektronn3.inference import Predictor
ix = 0
tile_shape = np.array(tile_shape)
while True:
try:
out_shape = (chunk_size + 2 * np.array(overlap_shape)).astype(np.int32)[::-1] # ZYX
out_shape = np.insert(out_shape, 0, n_channel) # output must equal chunk size
predictor = Predictor(model_p, strict_shapes=True, tile_shape=tile_shape[::-1],
out_shape=out_shape, overlap_shape=overlap_shape_tiles[::-1],
apply_softmax=True)
predictor.model.ae = False
_ = predictor.predict(np.zeros(out_shape[1:])[None, None])
break
except RuntimeError: # cuda MemoryError
if np.all(tile_shape % 2):
raise ValueError('Cannot reduce tile shape anymore. Please adapt '
'the tile/overlap/chunk shape in the function '
'that is calling `dense_predictor`.')
while tile_shape[ix] % 2:
    ix = (ix + 1) % 3  # wrap around to avoid an index error
tile_sh_orig = np.array(tile_shape)
tile_shape[ix] = tile_shape[ix] // 2
log_main.warning(f'Changed tile shape from {tile_sh_orig} to '
f'{tile_shape} to reduce memory requirements.')
ix = (ix + 1) % 3 # permute spatial dimension which is reduced
# predict Chunks
for ch_id in chunk_ids:
ch = cd.chunk_dict[ch_id]
ol = ch.overlap
size = np.array(np.array(ch.size) + 2 * np.array(ol),
dtype=np.int32)
coords = np.array(np.array(ch.coordinates) - np.array(ol),
dtype=np.int32)
raw = kd.load_raw(size=size * mag, offset=coords * mag, mag=mag)
pred = dense_prediction_helper(raw.astype(np.float32) / 255., predictor,
is_zyx=True, return_zyx=True)
# slice out the original input volume along ZYX, i.e. the last three axes
pred = pred[..., ol[2]:-ol[2], ol[1]:-ol[1], ol[0]:-ol[0]]
for j in range(len(target_channels)):
ids = target_channels[j]
path = target_kd_path_list[j]
data = np.zeros_like(pred[0]).astype(np.uint64)
save_as_raw = not (len(ids) > 1)
for label in ids:
t = channel_thresholds[label]
# if threshold is given or multiple target labels per dataset
# store classification results
# TODO: argmax might be more reasonable
if not save_as_raw:
if t is None:
t = 255 / 2
if t < 1.:
t = 255 * t
pred_mask = pred[label] > t
data[pred_mask] = label
else:
# no thresholding and only one label in the target KnossosDataset
# -> store probability map.
data = pred[label]
if save_as_raw:
target_kd_dict[path].save_raw(
offset=ch.coordinates * mag, data=data.astype(np.uint8),
data_mag=mag, mags=[mag, mag * 2, mag * 4],
fast_resampling=True, upsample=False)
else:
target_kd_dict[path].save_seg(
offset=ch.coordinates * mag, data=data, data_mag=mag,
mags=[mag, mag * 2, mag * 4],
fast_resampling=True, upsample=False)
def dense_prediction_helper(raw: np.ndarray, predictor: 'Predictor', is_zyx=False,
return_zyx=False) -> np.ndarray:
"""
Args:
raw: The input data array in XYZ (or ZYX if ``is_zyx=True``).
predictor: The model which performs the inference. Requires ``predictor.predict``.
is_zyx: If True, `raw` is already in ZYX axis order.
return_zyx: If True, return the prediction in ZYX axis order.
Returns:
The inference result in CXYZ as uint8 between 0..255.
"""
# transform raw data
if not is_zyx:
raw = xyz2zyx(raw)
# predict: pred of the form (N, C, [D,], H, W)
pred = predictor.predict(raw[None, None]).numpy()
pred = np.array(pred[0]) * 255 # remove N-axis
pred = pred.astype(np.uint8)
if not return_zyx:
pred = zyx2xyz(pred)
return pred
def to_knossos_dataset(kd_p, kd_pred_p, cd_p, model_p,
imposed_patch_size, mfp_active=False):
"""
Args:
kd_p:
kd_pred_p:
cd_p:
model_p:
imposed_patch_size:
mfp_active:
Returns:
"""
from elektronn2.neuromancer.model import modelload
log_reps.warning('Deprecation warning: "to_knossos_dataset" is deprecated and '
                 'will be replaced by "predict_dense_to_kd".')
kd = KnossosDataset()
kd.initialize_from_knossos_path(kd_p, fixed_mag=1)
kd_pred = KnossosDataset()
m = modelload(model_p, imposed_patch_size=list(imposed_patch_size)
if isinstance(imposed_patch_size, tuple) else imposed_patch_size,
override_mfp_to_active=mfp_active, imposed_batch_size=1)
original_do_rates = m.dropout_rates
m.dropout_rates = ([0.0, ] * len(original_do_rates))
offset = m.target_node.shape.offsets
offset = np.array([offset[1], offset[2], offset[0]], dtype=np.int32)
cd = ChunkDataset()
cd.initialize(kd, kd.boundary, [512, 512, 256], cd_p, overlap=offset,
box_coords=np.zeros(3), fit_box_size=True)
"""TODO: Use pyknossos conf like here to support bigger cube size:
target_kd = knossosdataset.KnossosDataset()
target_kd._cube_shape = cube_shape
scale = np.array(global_params.config['scaling'])
target_kd.scales = [scale, ]
target_kd.initialize_without_conf(path, kd.boundary, scale, kd.experiment_name,
mags=[1, ], create_pyk_conf=True, create_knossos_conf=False)
target_kd = basics.kd_factory(path) # test if init is possible
"""
kd_pred.initialize_without_conf(kd_pred_p, kd.boundary, kd.scale,
kd.experiment_name, mags=[1, 2, 4, 8])
cd.export_cset_to_kd(kd_pred, "pred", ["pred"], [4, 4], as_raw=True,
stride=[256, 256, 256])
def prediction_helper(raw, model, override_mfp=True,
imposed_patch_size=None):
"""
Helper function for predicting raw volumes (range: 0 to 255; uint8).
Changes X, Y, Z to ELEKTRONN format (Z, X, Y) and returns the prediction
in standard format [X, Y, Z]. The imposed patch size has to be given in Z, X, Y!
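    Examples:
        A minimal sketch, assuming ``raw_xyz`` is a uint8 volume in [X, Y, Z]
        (model path and patch size are placeholders)::

            pred = prediction_helper(raw_xyz, '~/models/cnn.mdl',
                                     imposed_patch_size=(22, 140, 140))  # Z, X, Y
            # pred is returned in [X, Y, Z], like the input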
Args:
raw: np.array
volume [X, Y, Z]
model: str or model object
path to model (.mdl)
override_mfp: bool
imposed_patch_size: tuple
in Z, X, Y FORMAT!
Returns: np.array
prediction data [X, Y, Z]
"""
if isinstance(model, str):
from elektronn2.neuromancer.model import modelload
m = modelload(model, imposed_patch_size=list(imposed_patch_size)
if isinstance(imposed_patch_size, tuple) else imposed_patch_size,
override_mfp_to_active=override_mfp, imposed_batch_size=1)
original_do_rates = m.dropout_rates
m.dropout_rates = ([0.0, ] * len(original_do_rates))
else:
m = model
raw = xyz2zxy(raw)
if raw.dtype.kind in ('u', 'i'):
# convert to float 32 and scale it
raw = raw.astype(np.float32) / 255.
if not raw.dtype == np.float32:
# assume already normalized between 0 and 1
raw = raw.astype(np.float32)
assert 0 <= np.max(raw) <= 1.0 and 0 <= np.min(raw) <= 1.0
pred = m.predict_dense(raw[None,], pad_raw=True)[1]
return zxy2xyz(pred)
def chunk_pred(ch: 'chunky.Chunk', model: 'torch.nn.Module', debug: bool = False):
"""
Helper function to predict a chunk and store the result.
Args:
ch: Chunk
model: str or model object
debug: bool
Returns:
"""
raw = ch.raw_data()
pred = prediction_helper(raw, model) * 255
pred = pred.astype(np.uint8)
ch.save_chunk(pred, "pred", "pred", overwrite=True)
if debug:
ch.save_chunk(raw, "pred", "raw", overwrite=False)
def get_glia_model_e3():
"""Those networks are typically trained with `naive_view_normalization_new` """
from elektronn3.models.base import InferenceModel
m = InferenceModel(global_params.config.mpath_glia_e3, normalize_func=naive_view_normalization_new)
return m
def get_celltype_model_e3():
"""Those networks are typically trained with `naive_view_normalization_new`
Unlike the other e3 InferenceModel instances, here the view normalization
is applied in the downstream inference method (`predict_sso_celltype`)
because the celltype model also gets scalar values as input which should
not be normalized."""
try:
from elektronn3.models.base import InferenceModel
except ImportError as e:
msg = "elektronn3 could not be imported ({}). Please see 'https://github." \
"com/ELEKTRONN/elektronn3' for more information.".format(e)
log_main.error(msg)
raise ImportError(msg)
m = torch.jit.load(global_params.config.mpath_celltype_e3)
m = InferenceModel(m, bs=40, multi_gpu=True)
return m
def get_semseg_spiness_model():
try:
from elektronn3.models.base import InferenceModel
except ImportError as e:
msg = "elektronn3 could not be imported ({}). Please see 'https://github." \
"com/ELEKTRONN/elektronn3' for more information.".format(e)
log_main.error(msg)
raise ImportError(msg)
path = global_params.config.mpath_spiness
m = torch.jit.load(path)
m = InferenceModel(m)
m._path = path
return m
def get_semseg_axon_model():
try:
from elektronn3.models.base import InferenceModel
except ImportError as e:
msg = "elektronn3 could not be imported ({}). Please see 'https://github." \
"com/ELEKTRONN/elektronn3' for more information.".format(e)
log_main.error(msg)
raise ImportError(msg)
path = global_params.config.mpath_axonsem
m = torch.jit.load(path)
m = InferenceModel(m)
m._path = path
return m
def get_tripletnet_model_e3():
"""Those networks are typically trained with `naive_view_normalization_new` """
try:
from elektronn3.models.base import InferenceModel
except ImportError as e:
msg = "elektronn3 could not be imported ({}). Please see 'https://github." \
"com/ELEKTRONN/elektronn3' for more information.".format(e)
log_main.error(msg)
raise ImportError(msg)
m = torch.jit.load(global_params.config.mpath_tnet)
m = InferenceModel(m)
return m
def get_myelin_cnn():
"""
elektronn3 model trained to predict the binary myelin class.
Returns:
The trained Inference model.
"""
try:
from elektronn3.inference.inference import Predictor
except ImportError as e:
msg = "elektronn3 could not be imported ({}). Please see 'https://github." \
"com/ELEKTRONN/elektronn3' for more information.".format(e)
log_main.error(msg)
raise ImportError(msg)
m = torch.jit.load(global_params.config.mpath_myelin)
m = Predictor(m)
return m
def get_knn_tnet_embedding_e3():
"""OUTDATED"""
tnet_eval_dir = "{}/pred/".format(global_params.config.mpath_tnet)
return knn_clf_tnet_embedding(tnet_eval_dir)
def get_pca_tnet_embedding_e3():
"""OUTDATED"""
tnet_eval_dir = "{}/pred/".format(global_params.config.mpath_tnet)
return pca_tnet_embedding(tnet_eval_dir)
def naive_view_normalization(d):
# TODO: Remove with new dataset, only necessary for backwards compat.
d = d.astype(np.float32)
# perform pseudo-normalization
# (proper normalization: how to store mean and std for inference?)
if not (np.min(d) >= 0 and np.max(d) <= 1.0):
for ii in range(len(d)):
curr_view = d[ii]
if 0 <= np.max(curr_view) <= 1.0:
curr_view = curr_view - 0.5
else:
curr_view = curr_view / 255. - 0.5
d[ii] = curr_view
else:
d = d - 0.5
return d
def naive_view_normalization_new(d):
return d.astype(np.float32) / 255. - 0.5
def knn_clf_tnet_embedding(fold, fit_all=False):
"""
Currently it assumes the embeddings for the GT views have already been
created in 'fold' and stored as l_axoness_train / l_axoness_valid .npy files.
Args:
fold: str
fit_all: bool
Returns:
"""
train_fnames = get_filepaths_from_dir(
fold, fname_includes=["l_axoness_train"], ending=".npy")
valid_fnames = get_filepaths_from_dir(
fold, fname_includes=["l_axoness_valid"], ending=".npy")
train_d = []
train_l = []
valid_d = []
valid_l = []
for tf in train_fnames:
train_l.append(np.load(tf))
tf = tf.replace("l_axoness_train", "ls_axoness_train")
train_d.append(np.load(tf))
for tf in valid_fnames:
valid_l.append(np.load(tf))
tf = tf.replace("l_axoness_valid", "ls_axoness_valid")
valid_d.append(np.load(tf))
train_d = np.concatenate(train_d).astype(dtype=np.float32)
train_l = np.concatenate(train_l).astype(dtype=np.uint16)
valid_d = np.concatenate(valid_d).astype(dtype=np.float32)
valid_l = np.concatenate(valid_l).astype(dtype=np.uint16)
nbrs = KNeighborsClassifier(n_neighbors=5, algorithm='auto',
n_jobs=16, weights='uniform')
if fit_all:
nbrs.fit(np.concatenate([train_d, valid_d]),
np.concatenate([train_l, valid_l]).ravel())
else:
nbrs.fit(train_d, train_l.ravel())
return nbrs
def pca_tnet_embedding(fold, n_components=3, fit_all=False):
"""
Currently it assumes the embeddings for the GT views have already been
created in 'fold' and stored as l_axoness_train / l_axoness_valid .npy files.
Args:
fold: str
n_components: int
fit_all: bool
Returns:
"""
train_fnames = get_filepaths_from_dir(
fold, fname_includes=["l_axoness_train"], ending=".npy")
valid_fnames = get_filepaths_from_dir(
fold, fname_includes=["l_axoness_valid"], ending=".npy")
train_d = []
train_l = []
valid_d = []
valid_l = []
for tf in train_fnames:
train_l.append(np.load(tf))
tf = tf.replace("l_axoness_train", "ls_axoness_train")
train_d.append(np.load(tf))
for tf in valid_fnames:
valid_l.append(np.load(tf))
tf = tf.replace("l_axoness_valid", "ls_axoness_valid")
valid_d.append(np.load(tf))
train_d = np.concatenate(train_d).astype(dtype=np.float32)
train_l = np.concatenate(train_l).astype(dtype=np.uint16)
valid_d = np.concatenate(valid_d).astype(dtype=np.float32)
valid_l = np.concatenate(valid_l).astype(dtype=np.uint16)
pca = PCA(n_components, whiten=True, random_state=0)
if fit_all:
pca.fit(np.concatenate([train_d, valid_d]))
else:
pca.fit(train_d)
return pca
def certainty_estimate(inp: np.ndarray, is_logit: bool = False) -> float:
"""
Estimates the certainty of (independent) predictions of the same sample:
1. If `is_logit` is True, generate pseudo-probabilities from the
input using softmax.
2. Sum the evidence per class and (re-)normalize.
3. Compute the entropy, scale it with the maximum entropy (equal
probabilities) and subtract it from 1.
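    Examples:
        Illustrative values (approximate)::

            certainty_estimate(np.array([[0.99, 0.01], [0.98, 0.02]]))  # ~0.89
            certainty_estimate(np.array([[0.5, 0.5]]))  # 0.0 (maximum entropy)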
Args:
inp: 2D array of prediction results (N: number of samples,
C: Number of classes)
is_logit: If True, applies ``softmax(inp, axis=1)``.
Returns:
Certainty measure based on the entropy of a set of (independent)
predictions.
"""
if not inp.ndim == 2:
raise ValueError('Input is not two dimensional.')
if is_logit:
proba = softmax(inp, axis=1)
else:
proba = inp
# average probabilities across samples (normalized sum of evidence)
proba = np.mean(proba, axis=0)
# maximum entropy at equal probabilities: -sum(1/N*ln(1/N)) = ln(N)
entr_max = np.log(len(proba))
entr_norm = entropy(proba) / entr_max
# convert to certainty estimate
return 1 - entr_norm
def str2int_converter(comment: str, gt_type: str) -> int:
    """Converts a string comment to the corresponding integer label."""
if gt_type == "axgt":
if comment == "gt_axon":
return 1
elif comment == "gt_dendrite":
return 0
elif comment == "gt_soma":
return 2
elif comment == "gt_bouton":
return 3
elif comment == "gt_terminal":
return 4
else:
return -1
elif gt_type == "spgt":
if "head" in comment:
return 1
elif "neck" in comment:
return 0
elif "shaft" in comment:
return 2
elif "other" in comment:
return 3
else:
return -1
elif gt_type == 'ctgt_j0251':
str2int_label = dict(STN=0, DA=1, MSN=2, LMAN=3, HVC=4, TAN=5, GPe=6, GPi=7,
FS=8, LTS=9)
return str2int_label[comment]
elif gt_type == 'ctgt_j0251_v2':
str2int_label = dict(STN=0, DA=1, MSN=2, LMAN=3, HVC=4, TAN=5, GPe=6, GPi=7,
FS=8, LTS=9, NGF=10)
return str2int_label[comment]
else:
raise ValueError("Given groundtruth type is not valid.")
def int2str_converter(label: int, gt_type: str) -> str:
"""
TODO: remove redundant definitions.
Converts integer label into semantic string.
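    Examples:
        Round trip with ``str2int_converter``::

            int2str_converter(1, 'spgt')       # 'head'
            str2int_converter('head', 'spgt')  # 1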
Args:
label: int
gt_type: str
e.g. spgt for spines, axgt for cell compartments or ctgt for cell type
Returns: str
"""
if isinstance(label, str):
label = int(label)
if gt_type == "axgt":
if label == 1:
return "gt_axon"
elif label == 0:
return "gt_dendrite"
elif label == 2:
return "gt_soma"
elif label == 3:
return "gt_bouton"
elif label == 4:
return "gt_terminal"
else:
return -1 # TODO: Check if somewhere -1 is handled, otherwise return "N/A"
elif gt_type == "spgt":
if label == 1:
return "head"
elif label == 0:
return "neck"
elif label == 2:
return "shaft"
elif label == 3:
return "other"
else:
return -1 # TODO: Check if somewhere -1 is already used, otherwise return "N/A"
elif gt_type == 'ctgt':
if label == 1:
return "MSN"
elif label == 0:
return "EA"
elif label == 2:
return "GP"
elif label == 3:
return "INT"
else:
return -1 # TODO: Check if somewhere -1 is already used, otherwise return "N/A"
elif gt_type == 'ctgt_v2':
# DA and TAN are of type modulatory; if this changes, also change `certainty_celltype`
l_dc_inv = dict(STN=0, modulatory=1, MSN=2, LMAN=3, HVC=4, GP=5, INT=6)
l_dc = {v: k for k, v in l_dc_inv.items()}
try:
return l_dc[label]
except KeyError:
print('Unknown label "{}"'.format(label))
return -1
elif gt_type == 'ctgt_j0251':
str2int_label = dict(STN=0, DA=1, MSN=2, LMAN=3, HVC=4, TAN=5, GPe=6, GPi=7,
FS=8, LTS=9)
int2str_label = {v: k for k, v in str2int_label.items()}
return int2str_label[label]
elif gt_type == 'ctgt_j0251_v2':
str2int_label = dict(STN=0, DA=1, MSN=2, LMAN=3, HVC=4, TAN=5, GPe=6, GPi=7,
FS=8, LTS=9, NGF=10)
int2str_label = {v: k for k, v in str2int_label.items()}
return int2str_label[label]
else:
raise ValueError("Given ground truth type is not valid.")