|
|
import os
|
|
|
from functools import wraps
|
|
|
from abc import ABCMeta, abstractmethod
|
|
|
from inspect import getcallargs
|
|
|
|
|
|
|
|
|
class Cachable:
|
|
|
'''Базовый класс для создания классов, кэширующих вывод своих методов.
|
|
|
Декоратор @Cachable.method_cached() предназначен для указания методов
|
|
|
с кэшируемым выводом.'''
|
|
|
def __init__(self):
|
|
|
self.clear_method_cache()
|
|
|
|
|
|
def clear_method_cache(self):
|
|
|
self._method_cache = {}
|
|
|
|
|
|
@staticmethod
|
|
|
def method_cached(key=lambda *args, **kwargs: hash(args)):
|
|
|
def decorator(function):
|
|
|
function_name = function.__name__
|
|
|
|
|
|
@wraps(function)
|
|
|
def wrapper(self, *args, **kwargs):
|
|
|
keyval = key(*args, **kwargs)
|
|
|
assert isinstance(self, Cachable)
|
|
|
if function_name not in self._method_cache:
|
|
|
self._method_cache[function_name] = {}
|
|
|
cache = self._method_cache[function_name]
|
|
|
if keyval not in cache:
|
|
|
cache[keyval] = function(self, *args, **kwargs)
|
|
|
return cache[keyval]
|
|
|
|
|
|
@wraps(function)
|
|
|
def null_wrapper(self):
|
|
|
assert isinstance(self, Cachable)
|
|
|
if function_name not in self._method_cache:
|
|
|
self._method_cache[function_name] = function(self)
|
|
|
return self._method_cache[function_name]
|
|
|
|
|
|
if len(function.__code__.co_varnames) > 1:
|
|
|
return wrapper
|
|
|
else:
|
|
|
return null_wrapper
|
|
|
|
|
|
return decorator
|
|
|
|
|
|
|
|
|
class Singleton(type):
|
|
|
'''Метакласс для создания синглтонов.'''
|
|
|
_instances = {}
|
|
|
|
|
|
def __call__(cls, *args, **kwargs):
|
|
|
if cls not in cls._instances:
|
|
|
cls._instances[cls] = super().__call__(*args, **kwargs)
|
|
|
return cls._instances[cls]
|
|
|
|
|
|
|
|
|
class SingletonParam(type):
|
|
|
'''Метакласс для создания синглтонов по параметрам.'''
|
|
|
_instances = {}
|
|
|
_init = {}
|
|
|
|
|
|
def __init__(cls, class_name, base_classes, class_dict):
|
|
|
cls._init[cls] = class_dict.get('__init__', None)
|
|
|
|
|
|
def __call__(cls, *args, **kwargs):
|
|
|
init = cls._init[cls]
|
|
|
if init is not None:
|
|
|
key_value = (cls, frozenset(getcallargs(init, None, *args,
|
|
|
**kwargs).items()))
|
|
|
else:
|
|
|
key_value = cls
|
|
|
|
|
|
if key_value not in cls._instances:
|
|
|
cls._instances[key_value] = super().__call__(*args, **kwargs)
|
|
|
return cls._instances[key_value]
|
|
|
|
|
|
|
|
|
class GenericFS(metaclass=ABCMeta):
|
|
|
'''Абстрактный класс для работы с файловыми системами.'''
|
|
|
@abstractmethod
|
|
|
def exists(self, path):
|
|
|
pass
|
|
|
|
|
|
@abstractmethod
|
|
|
def read(self, path):
|
|
|
pass
|
|
|
|
|
|
@abstractmethod
|
|
|
def glob(self, path):
|
|
|
pass
|
|
|
|
|
|
@abstractmethod
|
|
|
def realpath(self, path):
|
|
|
pass
|
|
|
|
|
|
@abstractmethod
|
|
|
def write(self, path, data):
|
|
|
pass
|
|
|
|
|
|
@abstractmethod
|
|
|
def listdir(self, path, fullpath=False):
|
|
|
pass
|
|
|
|
|
|
|
|
|
def get_traceback_caller(exception_type, exception_object,
|
|
|
exception_traceback):
|
|
|
'''Возвращает имя модуля, в котором было сгенерировано исключение,
|
|
|
и соответствующий номер строки.'''
|
|
|
while exception_traceback.tb_next:
|
|
|
exception_traceback = exception_traceback.tb_next
|
|
|
|
|
|
line_number = exception_traceback.tb_lineno
|
|
|
|
|
|
module_path, module_name = os.path.split(exception_traceback.tb_frame.
|
|
|
f_code.co_filename)
|
|
|
if module_name.endswith('.py'):
|
|
|
module_name = module_name[:-3]
|
|
|
full_module_name = [module_name]
|
|
|
|
|
|
while (module_path and module_path != '/' and not
|
|
|
module_path.endswith('site-packages')):
|
|
|
module_path, package_name = os.path.split(module_path)
|
|
|
full_module_name.insert(0, package_name)
|
|
|
|
|
|
if module_path.endswith('site-packages'):
|
|
|
module_name = '.'.join(full_module_name)
|
|
|
|
|
|
return module_name, line_number
|
|
|
|
|
|
|
|
|
def unique(iterable):
|
|
|
'''Возвращает итерируемый объект, содержащий только уникальные элементы
|
|
|
входного объекта, сохраняя их порядок.
|
|
|
'''
|
|
|
output = []
|
|
|
for element in iterable:
|
|
|
if element not in output:
|
|
|
output.append(element)
|
|
|
return output
|
|
|
|
|
|
|
|
|
def flat_iterable(iterable, types=(list, tuple, map, zip, filter)):
|
|
|
'''Распаковывает все вложенные итерируемые объекты во входном объекте.
|
|
|
Например:
|
|
|
[1, 2, [3, 4, [5, 6], 7], [8, 9], 10] -> [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
|
|
|
'''
|
|
|
if isinstance(iterable, types):
|
|
|
for it in iterable:
|
|
|
for sub_iterable in flat_iterable(it, types=types):
|
|
|
yield sub_iterable
|
|
|
else:
|
|
|
yield iterable
|