Source code for syconn.handler.compression

# -*- coding: utf-8 -*-
# SyConn - Synaptic connectivity inference toolkit
#
# Copyright (c) 2016 - now
# Max Planck Institute of Neurobiology, Martinsried, Germany
# Authors: Philipp Schubert, Sven Dorkenwald, Joergen Kornfeld
import os
from typing import List, Tuple, Optional, Iterable, Union, Dict

import h5py
import numpy as np

from ..handler import log_handler

try:
    from lz4.block import compress, decompress
    from lz4.block import LZ4BlockError
except ImportError:
    from lz4 import compress, decompress
    # older lz4 releases raise a plain ValueError instead of LZ4BlockError
    LZ4BlockError = ValueError

try:
    import fasteners

    LOCKING = True
except ImportError:
    print("fasteners could not be imported. Locking will be disabled by default."
          "Please install fasteners to enable locking (pip install fasteners).")
    LOCKING = False

__all__ = ['arrtolz4string', 'lz4stringtoarr', 'load_lz4_compressed',
           'save_lz4_compressed', 'load_from_h5py',
           'save_to_h5py', 'lz4string_listtoarr', 'arrtolz4string_list']


def arrtolz4string(arr: np.ndarray) -> bytes:
    """
    Converts a (multi-dimensional) array to a single lz4-compressed byte
    string.

    Args:
        arr: Input array.

    Returns:
        lz4-compressed byte string. If compression overflows, a list of
        byte strings is returned instead (see :func:`arrtolz4string_list`).
    """
    if isinstance(arr, list):
        arr = np.array(arr)
    if len(arr) == 0:
        return b""
    try:
        comp_arr = compress(arr.tobytes())
    except OverflowError:
        log_handler.warning("Overflow occurred when compressing the array. "
                            "Using 'arrtolz4string_list' instead.")
        comp_arr = arrtolz4string_list(arr)
    return comp_arr


def lz4stringtoarr(string: bytes, dtype: np.dtype = np.float32,
                   shape: Optional[Tuple[int, ...]] = None) -> np.ndarray:
    """
    Converts an lz4-compressed string back to a numpy array.

    Args:
        string: Serialized array.
        dtype: Data type of the original array.
        shape: Shape of the original array.

    Returns:
        N-dimensional numpy array.
    """
    if len(string) == 0:
        return np.zeros((0,), dtype=dtype)
    try:
        arr_1d = np.frombuffer(decompress(string), dtype=dtype)
    except TypeError:  # python3 compatibility
        arr_1d = np.frombuffer(decompress(str.encode(string)), dtype=dtype)
    if shape is not None:
        arr_1d = arr_1d.reshape(shape)
    return arr_1d


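# A minimal round-trip sketch (illustrative only, not part of the original
# module): the compressed bytes carry no metadata, so dtype and shape must
# be passed explicitly when decompressing.
def _example_lz4_roundtrip():
    arr = np.arange(12, dtype=np.float32).reshape(3, 4)
    comp = arrtolz4string(arr)  # lz4-block compressed raw bytes
    rec = lz4stringtoarr(comp, dtype=np.float32, shape=(3, 4))
    assert np.array_equal(arr, rec)

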
def arrtolz4string_list(arr: np.ndarray) -> List[bytes]:
    """
    Converts a (multi-dimensional) array to a list of lz4-compressed byte
    strings, recursively splitting the array in half whenever a single
    chunk is too large to compress.

    Args:
        arr: Input array.

    Returns:
        List of lz4-compressed byte strings.
    """
    if isinstance(arr, list):
        arr = np.array(arr)
    if len(arr) == 0:
        return [b""]
    try:
        str_lst = [compress(arr.tobytes())]
    # catch ValueError, which is thrown by the py3 lz4 version
    except (OverflowError, ValueError, LZ4BlockError):
        half_ix = len(arr) // 2
        str_lst = arrtolz4string_list(arr[:half_ix]) + \
            arrtolz4string_list(arr[half_ix:])
    return str_lst


def lz4string_listtoarr(str_lst: Union[List[bytes], np.ndarray],
                        dtype: np.dtype = np.float32,
                        shape: Optional[Tuple[int, ...]] = None) -> np.ndarray:
    """
    Converts a list of lz4-compressed strings back to a single array.

    Args:
        str_lst: Binary string representation of the array. If it is
            already a numpy array, it is returned unchanged.
        dtype: Data type of the serialized array.
        shape: Shape of the serialized array.

    Returns:
        Numpy array (concatenation of all decompressed chunks).
    """
    if type(str_lst) is np.ndarray:
        return str_lst
    if len(str_lst) == 0:
        return np.zeros((0,), dtype=dtype)
    arr_lst = []
    for string in str_lst:
        arr_lst.append(lz4stringtoarr(string, dtype=dtype, shape=shape))
    return np.concatenate(arr_lst)


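# Chunked round-trip sketch (illustrative only): arrtolz4string_list splits
# oversized arrays into several compressed chunks, and lz4string_listtoarr
# concatenates the decompressed chunks again. With shape left as None, a
# flat 1d array is returned.
def _example_lz4_list_roundtrip():
    arr = np.random.rand(1000).astype(np.float32)
    chunks = arrtolz4string_list(arr)
    rec = lz4string_listtoarr(chunks, dtype=np.float32)
    assert np.array_equal(arr, rec)

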
def multi_lz4stringtoarr(args: tuple) -> np.ndarray:
    """
    Helper function for multiprocessing.

    Args:
        args: see :func:`~syconn.handler.compression.lz4string_listtoarr`.

    Returns:
        1d numpy array.
    """
    return lz4string_listtoarr(*args)


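# Hypothetical parallel-decompression sketch (not part of the original
# module): multi_lz4stringtoarr packs the (str_lst, dtype, shape) arguments
# into a single tuple so it can be mapped directly with multiprocessing.Pool.
def _example_parallel_decompression(list_of_str_lsts):
    from multiprocessing import Pool
    args = [(str_lst, np.float32, None) for str_lst in list_of_str_lsts]
    with Pool() as pool:
        return pool.map(multi_lz4stringtoarr, args)

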
def save_lz4_compressed(p: str, arr: np.ndarray,
                        dtype: np.dtype = np.float32):
    """
    Saves an array as an lz4-compressed string. To work around overflows
    (known from Python 2), errors are handled by recursively splitting the
    array and saving the halves to suffixed files ("_1", "_2").

    Args:
        p: Path to the destination file.
        arr: Numpy array.
        dtype: Data type in which the array should be stored.
    """
    arr = arr.astype(dtype)
    try:
        with open(p, "wb") as text_file:
            text_file.write(arrtolz4string(arr))
    except (OverflowError, ValueError):
        # save dummy (empty) file, then split the array into two halves
        with open(p, "wb") as text_file:
            text_file.write(b"")
        half_ix = len(arr) // 2
        new_p1 = p[:-4] + "_1" + p[-4:]
        new_p2 = p[:-4] + "_2" + p[-4:]
        save_lz4_compressed(new_p1, arr[:half_ix])
        save_lz4_compressed(new_p2, arr[half_ix:])


def load_lz4_compressed(p: str, shape: Tuple[int, ...] = (-1, 20, 2, 128, 256),
                        dtype: np.dtype = np.float32) -> np.ndarray:
    """
    Loads an lz4-compressed array from a file. The shape must be known in
    order to restore the (multi-dimensional) array from the binary string.
    If the file is empty, the array is assumed to have been split across
    two suffixed files ("_1", "_2"; see :func:`save_lz4_compressed`) and
    is loaded recursively.

    Args:
        p: Path to the lz4 file.
        shape: Shape of the original array.
        dtype: Data type of the original array.

    Returns:
        Numpy array.
    """
    with open(p, "rb") as text_file:
        decomp_arr = lz4stringtoarr(text_file.read(), dtype=dtype,
                                    shape=shape)
    # assume the original array was split due to an overflow error
    if len(decomp_arr) == 0:
        new_p1 = p[:-4] + "_1" + p[-4:]
        new_p2 = p[:-4] + "_2" + p[-4:]
        decomp_arr1 = load_lz4_compressed(new_p1, shape=shape, dtype=dtype)
        decomp_arr2 = load_lz4_compressed(new_p2, shape=shape, dtype=dtype)
        decomp_arr = np.concatenate([decomp_arr1, decomp_arr2])
    return decomp_arr


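# File round-trip sketch (illustrative; file name, directory and shape are
# assumptions): save_lz4_compressed expects a path with a four-character
# extension so the "_1"/"_2" overflow files can be derived from it, and
# load_lz4_compressed needs the original dtype and shape.
def _example_file_roundtrip(tmp_dir="/tmp"):
    arr = np.random.rand(8, 4).astype(np.float32)
    p = os.path.join(tmp_dir, "example.lz4")
    save_lz4_compressed(p, arr)
    rec = load_lz4_compressed(p, shape=(-1, 4))
    assert np.array_equal(arr, rec)

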
# ---------------------------- HDF5
# ------------------------------------------------------------------------------
def load_from_h5py(path: str, hdf5_names: Optional[Iterable[str]] = None,
                   as_dict: bool = False) \
        -> Union[Dict[str, np.ndarray], List[np.ndarray]]:
    """
    Loads data from an h5py file.

    Args:
        path: Path to .h5 file.
        hdf5_names: If None, all keys will be loaded.
        as_dict: If True, returns a dictionary.

    Returns:
        The data stored at `path`, either as a list of arrays (ordered as
        `hdf5_names`) or as a dictionary.
    """
    data = {} if as_dict else []
    try:
        # the context manager closes the file on success and on error
        with h5py.File(path, 'r') as f:
            if hdf5_names is None:
                hdf5_names = list(f.keys())
            for hdf5_name in hdf5_names:
                if as_dict:
                    data[hdf5_name] = f[hdf5_name][()]
                else:
                    data.append(f[hdf5_name][()])
    except Exception as e:
        msg = "Error ({}) raised when loading h5-file at path:" \
              " {}, with labels: {}".format(e, path, hdf5_names)
        log_handler.error(msg)
        raise
    return data


def save_to_h5py(data: Union[Dict[str, np.ndarray], List[np.ndarray]],
                 path: str, hdf5_names: Optional[List[str]] = None,
                 overwrite: bool = False, compression: bool = True):
    """
    Saves data to an h5py file.

    Args:
        data: If a list, `hdf5_names` has to be set.
        path: Forward-slash separated path to the file.
        hdf5_names: Keys used to store the arrays in `data`. Has to be the
            same length as `data`.
        overwrite: Determines whether existing files are overwritten.
        compression: If True, ``compression='gzip'`` is used, which is
            recommended for sparse and ordered data.
    """
    if (not type(data) is dict) and hdf5_names is None:
        raise TypeError("`hdf5_names` has to be set when `data` is a list.")
    if os.path.isfile(path) and overwrite:
        os.remove(path)
    f = h5py.File(path, "w")
    if type(data) is dict:
        for key in data.keys():
            if compression:
                f.create_dataset(key, data=data[key], compression="gzip")
            else:
                f.create_dataset(key, data=data[key])
    else:
        if len(hdf5_names) != len(data):
            f.close()
            msg = "Not enough or too many hdf5-names ({}) given during " \
                  "h5-file save attempt!".format(hdf5_names)
            log_handler.error(msg)
            raise ValueError(msg)
        for nb_data in range(len(data)):
            if compression:
                f.create_dataset(hdf5_names[nb_data], data=data[nb_data],
                                 compression="gzip")
            else:
                f.create_dataset(hdf5_names[nb_data], data=data[nb_data])
    f.close()


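# HDF5 round-trip sketch (illustrative; path and key are assumptions):
# dictionaries carry their own keys, lists need explicit hdf5_names.
def _example_h5_roundtrip(tmp_dir="/tmp"):
    arr = np.random.rand(16, 16).astype(np.float32)
    p = os.path.join(tmp_dir, "example.h5")
    save_to_h5py({"raw": arr}, p, overwrite=True)
    rec = load_from_h5py(p, hdf5_names=["raw"], as_dict=True)["raw"]
    assert np.array_equal(arr, rec)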