# -*- coding: utf-8 -*-
# SyConn - Synaptic connectivity inference toolkit
#
# Copyright (c) 2016 - now
# Max Planck Institute of Neurobiology, Martinsried, Germany
# Authors: Philipp Schubert, Sven Dorkenwald, Joergen Kornfeld
import os
import shutil
import time
from pickle import UnpicklingError
from .. import global_params
from ..extraction import log_extraction
from ..handler.basics import write_obj2pkl, load_pkl2obj
try:
from lz4.block import compress, decompress
except ImportError:
from lz4 import compress, decompress
# Optional dependency: 'fasteners' provides the inter-process file lock used by
# the storage classes below. LOCKING records whether it is available.
try:
    import fasteners
    LOCKING = True
except ImportError:
    # Adjacent string literals are concatenated verbatim, hence the explicit
    # trailing space after the first sentence.
    print("fasteners could not be imported. Locking will be disabled by default. "
          "Please install fasteners to enable locking (pip install fasteners).")
    LOCKING = False
__all__ = ['FSBase', 'BTBase']
class StorageBase(dict):
    """
    Interface class for data IO.

    Although this class derives from ``dict``, the container protocol methods
    implemented here (``len``, ``in``, iteration, ``repr``, ``keys`` and
    equality) operate on the private dictionary ``self._dc_intern``. Item
    access (``__getitem__``, ``__setitem__``, ``__delitem__``) as well as
    ``update``, ``copy``, ``push`` and ``pull`` are abstract and have to be
    provided by subclasses.
    """

    def __init__(self, cache_decomp):
        """
        Args:
            cache_decomp: Flag stored as ``self._cache_decomp``; it is not
                evaluated by this base class.
        """
        super(StorageBase, self).__init__()
        self._cache_decomp = cache_decomp
        self._cache_dc = {}
        self._dc_intern = {}

    def __getitem__(self, key):
        raise NotImplementedError

    def __setitem__(self, key, value):
        raise NotImplementedError

    def __delitem__(self, key):
        raise NotImplementedError

    def __del__(self):
        # Deliberately a no-op. Exceptions raised inside a finalizer cannot be
        # caught by callers; the interpreter only reports them on stderr
        # ("Exception ignored in ..."). Raising NotImplementedError here would
        # therefore produce that noise for every subclass that does not
        # override __del__.
        pass

    def __len__(self):
        return self._dc_intern.__len__()

    def __eq__(self, other):
        # Only storages can be equal to each other; the comparison is based on
        # the internal dictionaries.
        if not isinstance(other, StorageBase):
            return False
        return self._dc_intern.__eq__(other._dc_intern)

    def __ne__(self, other):
        return not self.__eq__(other)

    def __contains__(self, item):
        return self._dc_intern.__contains__(item)

    def __iter__(self):
        return iter(self._dc_intern)

    def __repr__(self):
        return self._dc_intern.__repr__()

    def update(self, other, **kwargs):
        raise NotImplementedError

    def copy(self):
        raise NotImplementedError

    def items(self):
        """Yields ``(key, self[key])`` pairs, i.e. values are retrieved via
        the subclass' ``__getitem__``."""
        for k in self._dc_intern.keys():
            yield k, self[k]

    def values(self):
        """Yields ``self[key]`` for every key, i.e. values are retrieved via
        the subclass' ``__getitem__``."""
        for k in self._dc_intern.keys():
            yield self[k]

    def keys(self):
        """Returns the key view of the internal dictionary."""
        return self._dc_intern.keys()

    def push(self, dest=None):
        raise NotImplementedError

    def pull(self, source=None):
        raise NotImplementedError
# ---------------------------- BT
# ------------------------------------------------------------------------------
class BTBase(StorageBase):
    """
    Storage base class of the 'BT' backend.

    The constructor only initializes the ``StorageBase`` attributes; the
    arguments ``identifier``, ``read_only`` and ``disable_locking`` are
    accepted but not used here.
    """

    def __init__(self, identifier, cache_decomp=False, read_only=True,
                 disable_locking=False):
        # 'cache_decomp' is likely not necessary, but it is kept in the
        # signature to match the interface of LZ4Dicts. Caching is always
        # switched off, independent of the passed value.
        super().__init__(cache_decomp=False)

    def __eq__(self, other):
        # Equal only to other BTBase instances with the same internal dict.
        return (isinstance(other, BTBase)
                and self._dc_intern.__eq__(other._dc_intern))
# ---------------------------- lz4
# ------------------------------------------------------------------------------
class FSBase(StorageBase):
    """
    Customized dictionary to store compressed numpy arrays, but with an
    intuitive user interface, i.e. compression will happen in background.
    kwarg 'cache_decomp' can be enabled to cache decompressed arrays
    additionally (save decompressing time when accessing items frequently).

    This base class handles the file system part: the internal dictionary is
    loaded from a pickle file with :func:`pull` and written back with
    :func:`push`. If locking is enabled, a lock file ``.<file name>.lk`` next
    to the pickle file guards the access.
    """

    def __init__(self, inp_p: str, cache_decomp: bool = False,
                 read_only: bool = True, max_delay: int = 100,
                 timeout: int = 1000, disable_locking: bool = True,
                 max_nb_attempts: int = 100):
        """
        Args:
            inp_p: Path to file. If None, nothing is loaded and the storage
                starts empty.
            cache_decomp: Cache deserialized arrays.
            read_only: If locking is enabled, the lock is only held while the
                file is loaded in :func:`pull` and released right afterwards.
                Otherwise it is kept until :func:`push` is called.
            max_delay: Maximum delay between two polls of the lock (passed on
                to the lock's ``acquire``).
            timeout: Total time budget in seconds for acquiring the lock; it is
                split evenly across `max_nb_attempts` attempts. A
                `RuntimeError` is raised if the lock could not be acquired.
            disable_locking: Disable file locking
            max_nb_attempts: Number of total attempts

        Raises:
            NotImplementedError: If `inp_p` is neither None nor a string.
        """
        # The base class initializes _cache_decomp, _cache_dc and _dc_intern.
        super(FSBase, self).__init__(cache_decomp)
        if not LOCKING and not disable_locking:
            log_extraction.warning('Locking could not be enabled due to missing "fasteners" package.')
            disable_locking = True
        self.read_only = read_only
        self.a_lock = None
        self.max_delay = max_delay
        self.timeout = timeout
        self.disable_locking = disable_locking
        self._max_nb_attempts = max_nb_attempts
        self._path = inp_p
        if inp_p is not None:
            if isinstance(inp_p, str):
                self.pull(inp_p)
            else:
                msg = "Unsupported initialization type {} for 'FSBase'.".format(type(inp_p))
                log_extraction.error(msg)
                raise NotImplementedError(msg)

    def __delitem__(self, key):
        """
        Removes `key` from the storage.

        Raises:
            AttributeError: If `key` does not exist.
        """
        try:
            # Delete from the internal dict. 'del self[key]' would call this
            # method again and end in infinite recursion.
            del self._dc_intern[key]
        except KeyError:
            msg = "No such attribute {} in dict at {}. Existing keys:" \
                  " {}.".format(key, self._path, list(self.keys()))
            log_extraction.error(msg)
            raise AttributeError(msg)
        # Drop a possibly cached copy as well so that a deleted key cannot be
        # served from the cache (presumably filled by subclasses - confirm).
        self._cache_dc.pop(key, None)

    def __del__(self):
        # Finalizers may run on partially initialized objects (e.g. if
        # __init__ raised), therefore do not assume that attributes exist.
        lock = getattr(self, 'a_lock', None)
        if lock is not None and lock.acquired:
            lock.release()
        self.__dict__.pop('_dc_intern', None)
        self.__dict__.pop('_cache_dc', None)

    def __len__(self):
        return self._dc_intern.__len__()

    def __eq__(self, other):
        # Equal only to other FSBase instances with the same internal dict.
        if not isinstance(other, FSBase):
            return False
        return self._dc_intern.__eq__(other._dc_intern)

    def __ne__(self, other):
        return not self.__eq__(other)

    def __contains__(self, item):
        return self._dc_intern.__contains__(item)

    def __iter__(self):
        return iter(self._dc_intern)

    def __repr__(self):
        return self._dc_intern.__repr__()

    def update(self, other, **kwargs):
        raise NotImplementedError

    def copy(self):
        raise NotImplementedError

    def items(self):
        """Yields ``(key, self[key])`` pairs."""
        for k in self._dc_intern.keys():
            yield k, self[k]

    def values(self):
        """Yields ``self[key]`` for every key."""
        for k in self._dc_intern.keys():
            yield self[k]

    def keys(self):
        """Returns the key view of the internal dictionary."""
        return self._dc_intern.keys()

    def _release_lock(self):
        """Releases the file lock if one exists and is currently held."""
        lock = self.a_lock
        if lock is not None and lock.acquired:
            lock.release()

    def push(self, dest: str = None):
        """
        Pushes data to destination.

        Args:
            dest: storage destination. Defaults to the path the storage was
                initialized with. If both are None, a warning is logged and
                nothing is written.
        """
        if dest is None:
            dest = self._path
        if dest is None:  # support virtual / temporary SSO objects
            log_extraction.warning('"push" called but Storage object was initialized '
                                   'with "None". Content will not be written.')
            return
        write_obj2pkl(dest, self._dc_intern)
        if not self.read_only and not self.disable_locking:
            # Guarded release: there is no lock if 'pull' was never called and
            # releasing an already released lock (repeated 'push') would fail.
            self._release_lock()

    def pull(self, source: str = None):
        """
        Fetches data from source.

        If the file does not exist or cannot be unpickled, the storage is
        reset to an empty dictionary.

        Args:
            source: Source location. Defaults to the path the storage was
                initialized with.

        Raises:
            RuntimeError: If locking is enabled and the lock could not be
                acquired.
        """
        if source is None:
            source = self._path
        fold, fname = os.path.split(source)
        # os.path.join keeps the lock file next to 'source' even if 'source' is
        # a bare file name (empty 'fold'); plain concatenation with "/" would
        # point to the file system root in that case.
        lock_path = os.path.join(fold, "." + fname + ".lk")
        # only create directory if read_only is false. -> support virtual SSO
        if fold and not self.read_only and not os.path.isdir(fold):
            try:
                # exist_ok: another job may create the folder at the same time
                os.makedirs(fold, exist_ok=True)
            except OSError as e:
                # best effort - a missing folder surfaces when writing
                log_extraction.warning("Could not create folder of dict {}. "
                                       "Error {}.".format(source, e))
        # acquires lock until released when saving or after loading if self.read_only
        if not self.disable_locking:
            self.a_lock = fasteners.InterProcessLock(lock_path)
            nb_attempts = 1
            start = time.time()
            while True:
                try:
                    gotten = self.a_lock.acquire(blocking=True, delay=0.1,
                                                 max_delay=self.max_delay,
                                                 timeout=self.timeout / self._max_nb_attempts)
                except ValueError:
                    gotten = False
                # if not gotten and maximum attempts not reached yet keep trying
                if not gotten and (nb_attempts < self._max_nb_attempts):
                    nb_attempts += 1
                else:
                    break
            if not gotten:
                msg = "Unable to acquire file lock for {} after {:.0f}s.".format(source, time.time() - start)
                log_extraction.warning(msg)
                raise RuntimeError(msg)
        if os.path.isfile(source):
            try:
                self._dc_intern = load_pkl2obj(source)
            except (UnpicklingError, EOFError) as e:
                log_extraction.warning("Could not load LZ4Dict ({}). 'push' will"
                                       " overwrite broken .pkl file: {}.".format(source, e))
                self._dc_intern = {}
        else:
            self._dc_intern = {}
        if self.read_only and not self.disable_locking:
            # read-only access does not need the lock beyond loading
            self._release_lock()