# Source code for syconn.backend.base

# -*- coding: utf-8 -*-
# SyConn - Synaptic connectivity inference toolkit
#
# Copyright (c) 2016 - now
# Max Planck Institute of Neurobiology, Martinsried, Germany
# Authors: Philipp Schubert, Sven Dorkenwald, Joergen Kornfeld

import os
import shutil
import time
from pickle import UnpicklingError

from .. import global_params
from ..extraction import log_extraction
from ..handler.basics import write_obj2pkl, load_pkl2obj

try:
    from lz4.block import compress, decompress
except ImportError:
    from lz4 import compress, decompress
try:
    import fasteners
    LOCKING = True
except ImportError:
    print("fasteners could not be imported. Locking will be disabled by default."
          "Please install fasteners to enable locking (pip install fasteners).")
    LOCKING = False

__all__ = ['FSBase', 'BTBase']


class StorageBase(dict):
    """
    Abstract base class defining the IO interface shared by the storage
    backends. Concrete subclasses must implement item access, clean-up and
    persistence (``push``/``pull``); dictionary-style introspection (length,
    membership, iteration, comparison, representation) is provided here and
    operates on the internal dictionary ``_dc_intern``.
    """

    def __init__(self, cache_decomp):
        super(StorageBase, self).__init__()
        # whether decompressed values should additionally be kept in '_cache_dc'
        self._cache_decomp = cache_decomp
        # cache for already decompressed/deserialized entries
        self._cache_dc = {}
        # raw (compressed/serialized) payload, keyed like the public dict
        self._dc_intern = {}

    def __getitem__(self, key):
        raise NotImplementedError

    def __setitem__(self, key, value):
        raise NotImplementedError

    def __delitem__(self, key):
        raise NotImplementedError

    def __del__(self):
        raise NotImplementedError

    def __len__(self):
        return len(self._dc_intern)

    def __eq__(self, other):
        # only comparable to other storage objects; compare the raw payloads
        return isinstance(other, StorageBase) and \
            self._dc_intern == other._dc_intern

    def __ne__(self, other):
        return not self == other

    def __contains__(self, item):
        return item in self._dc_intern

    def __iter__(self):
        return iter(self._dc_intern)

    def __repr__(self):
        return repr(self._dc_intern)

    def update(self, other, **kwargs):
        raise NotImplementedError

    def copy(self):
        raise NotImplementedError

    def items(self):
        # values are materialized lazily via the subclass' '__getitem__'
        for key in self._dc_intern:
            yield key, self[key]

    def values(self):
        for key in self._dc_intern:
            yield self[key]

    def keys(self):
        return self._dc_intern.keys()

    def push(self, dest=None):
        raise NotImplementedError

    def pull(self, source=None):
        raise NotImplementedError


# ---------------------------- BT
# ------------------------------------------------------------------------------
class BTBase(StorageBase):
    """
    Stub storage backend exposing the same construction interface as the
    file-system based dictionaries; performs no IO on initialization.
    """

    def __init__(self, identifier, cache_decomp=False, read_only=True,
                 disable_locking=False):
        # 'cache_decomp' is accepted (and ignored) solely to mirror the
        # interface of the LZ4-based dictionaries.
        super(BTBase, self).__init__(cache_decomp=False)

    def __eq__(self, other):
        # equality is restricted to other 'BTBase' instances; compare the
        # internal payload dictionaries
        return isinstance(other, BTBase) and \
            self._dc_intern == other._dc_intern
# ---------------------------- lz4
# ------------------------------------------------------------------------------
class FSBase(StorageBase):
    """
    Customized dictionary to store compressed numpy arrays, but with a
    intuitive user interface, i.e. compression will happen in background.
    kwarg 'cache_decomp' can be enabled to cache decompressed arrays
    additionally (save decompressing time when accessing items frequently).
    """

    def __init__(self, inp_p: str, cache_decomp: bool = False,
                 read_only: bool = True, max_delay: int = 100,
                 timeout: int = 1000, disable_locking: bool = True,
                 max_nb_attempts: int = 100):
        """
        Args:
            inp_p: Path to file.
            cache_decomp: Cache deserialized arrays.
            read_only: In case locking is enabled, no semaphore will be placed.
            max_delay: Attempt delay.
            timeout: Will throw `RuntimeError` after `timeout` seconds.
            disable_locking: Disable file locking.
            max_nb_attempts: Number of total attempts.

        Raises:
            NotImplementedError: If `inp_p` is neither `None` nor a string.
        """
        super(FSBase, self).__init__(cache_decomp)
        # fall back to lock-free operation when 'fasteners' is unavailable
        if not LOCKING and not disable_locking:
            log_extraction.warning('Locking could not be enabled due to missing "fasteners" package.')
            disable_locking = True
        self.read_only = read_only
        self.a_lock = None  # inter-process lock, created lazily in 'pull'
        self.max_delay = max_delay
        self.timeout = timeout
        self.disable_locking = disable_locking
        self._cache_decomp = cache_decomp
        self._max_nb_attempts = max_nb_attempts
        self._cache_dc = {}
        self._dc_intern = {}
        self._path = inp_p
        if inp_p is not None:
            if type(inp_p) is str:
                self.pull(inp_p)
            else:
                msg = "Unsupported initialization type {} for 'FSBase'.".format(type(inp_p))
                log_extraction.error(msg)
                raise NotImplementedError(msg)

    def __delitem__(self, key):
        """Remove `key` from the storage; raises `AttributeError` if absent."""
        # BUG FIX: previous implementation executed ``del self[key]``, which
        # re-invokes ``__delitem__`` itself and recurses infinitely. Delete
        # from the internal payload dict instead, and drop any stale cached
        # decompressed value for the same key.
        try:
            del self._dc_intern[key]
            self._cache_dc.pop(key, None)
        except KeyError:
            msg = "No such attribute {} in dict at {}. Existing keys:" \
                  " {}.".format(key, self._path, list(self.keys()))
            log_extraction.error(msg)
            raise AttributeError(msg)

    def __del__(self):
        # release a still-held file lock before tearing down the object
        if self.a_lock is not None and self.a_lock.acquired:
            self.a_lock.release()
        del self._dc_intern, self._cache_dc

    def __len__(self):
        return self._dc_intern.__len__()

    def __eq__(self, other):
        if not isinstance(other, FSBase):
            return False
        return self._dc_intern.__eq__(other._dc_intern)

    def __ne__(self, other):
        return not self.__eq__(other)

    def __contains__(self, item):
        return self._dc_intern.__contains__(item)

    def __iter__(self):
        return iter(self._dc_intern)

    def __repr__(self):
        return self._dc_intern.__repr__()

    def update(self, other, **kwargs):
        raise NotImplementedError

    def copy(self):
        raise NotImplementedError

    def items(self):
        # values are deserialized lazily via '__getitem__'
        for k in self._dc_intern.keys():
            yield k, self[k]

    def values(self):
        for k in self._dc_intern.keys():
            yield self[k]

    def keys(self):
        return self._dc_intern.keys()

    def push(self, dest: str = None):
        """
        Pushes data to destination.

        Args:
            dest: Storage destination. Defaults to the path given at
                initialization.
        """
        if dest is None:
            dest = self._path
        if dest is None:
            # support virtual / temporary SSO objects
            log_extraction.warning('"push" called but Storage object was initialized '
                                   'with "None". Content will not be written.')
            return
        write_obj2pkl(dest, self._dc_intern)
        # lock acquired in 'pull' is held until the content is written back
        if not self.read_only and not self.disable_locking:
            self.a_lock.release()

    def pull(self, source: str = None):
        """
        Fetches data from source.

        Args:
            source: Source location. Defaults to the path given at
                initialization.

        Raises:
            RuntimeError: If the file lock could not be acquired within the
                configured number of attempts / timeout.
        """
        if source is None:
            source = self._path
        fold, fname = os.path.split(source)
        lock_path = fold + "/." + fname + ".lk"
        # only create directory if read_only is false -> support virtual SSO
        if not os.path.isdir(fold) and not self.read_only:
            try:
                os.makedirs(fold)
            except OSError as e:
                # two jobs may create the folder at the same time
                log_extraction.warning("Tried to create folder of dict {}, but it already existed. "
                                       "Multiple jobs might work on same chunk almost"
                                       " simultaneously. Error {}.".format(self._path, e))
        # acquires lock until released when saving, or after loading if read_only
        if not self.disable_locking:
            self.a_lock = fasteners.InterProcessLock(lock_path)
            nb_attempts = 1
            start = time.time()
            while True:
                try:
                    gotten = self.a_lock.acquire(blocking=True, delay=0.1,
                                                 max_delay=self.max_delay,
                                                 timeout=self.timeout / self._max_nb_attempts)
                except ValueError:
                    gotten = False
                # if not gotten and maximum attempts not reached yet keep trying
                if not gotten and (nb_attempts < self._max_nb_attempts):
                    nb_attempts += 1
                else:
                    break
            if not gotten:
                msg = "Unable to acquire file lock for {} after {:.0f}s.".format(
                    source, time.time() - start)
                log_extraction.warning(msg)
                raise RuntimeError(msg)
        if os.path.isfile(source):
            try:
                self._dc_intern = load_pkl2obj(source)
            except (UnpicklingError, EOFError) as e:
                log_extraction.warning("Could not load LZ4Dict ({}). 'push' will"
                                       " overwrite broken .pkl file: {}.".format(self._path, e))
                self._dc_intern = {}
        else:
            self._dc_intern = {}
        if self.read_only and not self.disable_locking:
            self.a_lock.release()