You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
calculate-utils-3-lib/pym/calculate/lib/utils/tools.py

656 lines
18 KiB

9 years ago
# -*- coding: utf-8 -*-
# Copyright 2014-2016 Mir Calculate. http://www.calculate-linux.org
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from itertools import tee
from functools import total_ordering, wraps
import warnings
from contextlib import contextmanager
import re
import sys
3 years ago
# from types import StringTypes
import os
import fcntl
from os import path
3 years ago
import errno
import time
from abc import ABCMeta, abstractmethod
from types import GeneratorType
from math import sqrt
9 years ago
try:
import json as json_module
except ImportError:
json_module = None
class SaveableIterator:
"""
Итератор с возможность сохранять и восстанавливать состояние
"""
9 years ago
def __init__(self, seq):
self.seq = iter(seq)
self.back = None
def __iter__(self):
return self
def save(self):
self.seq, self.back = tee(self.seq)
return self
def restore(self):
if self.back:
self.seq, self.back = tee(self.back)
return self
def next(self):
3 years ago
return next(self.seq)
def __next__(self):
return next(self.seq)
9 years ago
@contextmanager
def ignore(exception):
try:
yield
except exception:
pass
9 years ago
class AddonError(Exception):
"""
Исключение с добавочным сообщением
"""
def __init__(self, msg, addon=None):
self.message = msg
self.addon = addon
Exception.__init__(self, msg)
class FalseObject(object):
"""
Объект-заглушка при любом сравнении возвращает False
"""
9 years ago
def __lt__(self, other):
return False
def __nonzero__(self):
return False
def __bool__(self):
return False
__gt__ = __ge__ = __le__ = __eq__ = __ne__ = __lt__
class Sizes(object):
K = KiB = kibibyte = 1024
kB = kilobyte = 1000
M = MiB = mibibyte = K * 1024
Mb = megabyte = kB * 1000
G = GiB = gibibyte = M * 1024
Gb = gigabyte = Mb * 1000
T = TiB = tibibyte = G * 1024
Tb = terabyte = Gb * 1000
Sector = 512
9 years ago
def __getattr__(self, name):
if name.startswith('from_'):
9 years ago
return lambda x: x * getattr(Sizes, name[5:])
elif name.startswith('to_'):
return lambda x: x // getattr(Sizes, name[3:])
else:
raise AttributeError
def _from(self, name, count):
return count * getattr(Sizes, name)
def _to(self, name, count):
return count // getattr(Sizes, name)
9 years ago
def imap_regexp(re_compiled, l, whole=False):
"""
Обработать список регулярным выражением и вернуть полученные группы
"""
if whole:
retfunc = lambda x: x.group()
else:
retfunc = lambda x: x.groups()
return (retfunc(x) for x in (re_compiled.search(x) for x in l) if x)
9 years ago
def cached(each_instance=False):
"""
Кэширование результата
"""
9 years ago
def cache_wrapper(func):
value = {}
def wrapper(*args, **kwargs):
if each_instance:
if args[0] not in value:
value[args[0]] = func(*args, **kwargs)
return value[args[0]]
else:
9 years ago
if None not in value:
value[None] = func(*args, **kwargs)
return value[None]
9 years ago
return wrapper
9 years ago
if each_instance in (True, False):
return cache_wrapper
else:
return cache_wrapper(each_instance)
class Singleton(type):
"""
Существует только один экземпляр данного класса
"""
_instances = {}
9 years ago
def __call__(cls, *args, **kw):
if cls not in cls._instances:
9 years ago
create_obj = getattr(super(Singleton, cls), '__call__')
cls._instances[cls] = create_obj(*args, **kw)
return cls._instances[cls]
9 years ago
class SingletonParam(type):
"""
Параметризированный Singleton: может быть создано множество объектов одного
и того же класса, но только если у них разные параметры инициализации
"""
9 years ago
def __init__(cls, name, bases, dicts):
super(SingletonParam, cls).__init__(name, bases, dicts)
cls.instance = {}
def __call__(cls, *args, **kw):
if hasattr(cls, "param_id"):
keyarg = cls.param_id(*args, **kw)
else:
keyarg = args[0] if args else ""
9 years ago
if keyarg not in cls.instance:
create_obj = getattr(super(SingletonParam, cls), '__call__')
cls.instance[keyarg] = create_obj(*args, **kw)
return cls.instance[keyarg]
def classificate(iterator):
"""
Классифицировать элементы по признаку (first первый last последний)
"""
9 years ago
class Mark:
def __init__(self, first=False, last=False):
self.first = first
self.last = last
def __repr__(self):
9 years ago
return "Mark(first=%s,last=%s)" % (self.first, self.last)
iterator = iter(iterator)
3 years ago
obj = next(iterator)
try:
3 years ago
obj_next = next(iterator)
9 years ago
yield Mark(first=True, last=False), obj
try:
while True:
obj = obj_next
3 years ago
obj_next = next(iterator)
yield Mark(), obj
except StopIteration:
9 years ago
yield Mark(first=False, last=True), obj
except StopIteration:
9 years ago
yield Mark(first=True, last=True), obj
def debug(prefix="DEBUG {name}({args},{kw})={ret}"):
"""
Декоратор для вывода отладочных сообщений при вызове функции
:param prefix:
:return:
"""
def debug_decor(f):
@wraps(f)
def wrapper(*args, **kw):
ret = f(*args, **kw)
3 years ago
print(prefix.format(name=f.__name__, args=args, kw=kw, ret=ret))
return ret
9 years ago
return wrapper
9 years ago
return debug_decor
class ClassPropertyDescriptor(object):
def __init__(self, fget, fset=None):
self.fget = fget
self.fset = fset
def __get__(self, obj, klass=None):
if klass is None:
klass = type(obj)
return self.fget.__get__(obj, klass)()
def __set__(self, obj, value):
if not self.fset:
raise AttributeError("can't set attribute")
type_ = type(obj)
return self.fset.__get__(obj, type_)(value)
def setter(self, func):
if not isinstance(func, (classmethod, staticmethod)):
func = classmethod(func)
self.fset = func
return self
def classproperty(func):
"""
Классовое свойство
:param func:
:return:
"""
if not isinstance(func, (classmethod, staticmethod)):
func = classmethod(func)
return ClassPropertyDescriptor(func)
9 years ago
class LockError(Exception):
pass
class Locker(object):
locker_directory = '/run/calculate/locker'
def __init__(self, name=None, fn=None, timeout=None):
if name is not None:
if not path.exists(self.locker_directory):
os.makedirs(self.locker_directory)
self.fn = path.join(self.locker_directory, name)
else:
dn = path.dirname(fn)
if not path.exists(dn):
os.makedirs(dn)
self.fn = fn
self.lockf = None
if timeout is None:
self.timeout = 10
else:
self.timeout = 0
def lockfile(self, fn, timeout=None):
"""
Блокировка файла с таймаутом
"""
if timeout is None:
timeout = self.timeout
for i in range(0, timeout * 40 or 1):
try:
self.lockf = os.open(fn, os.O_CREAT | os.O_RDWR)
fcntl.flock(self.lockf, fcntl.LOCK_EX | fcntl.LOCK_NB)
return True
3 years ago
except IOError as e:
3 years ago
if e.errno != errno.EAGAIN:
raise e
os.close(self.lockf)
self.lockf = None
3 years ago
except OSError as e:
3 years ago
if e.errno != errno.EEXIST:
raise e
time.sleep(0.25)
else:
raise LockError("Lock timeout of %s" % fn)
def acquire(self):
if self.lockf is None:
self.lockfile(self.fn)
return True
def release(self):
if self.lockf is not None:
os.close(self.lockf)
self.lockf = None
return True
def remove(self):
self.release()
if path.exists(self.fn):
try:
os.remove(self.fn)
except (OSError, IOError):
pass
def __enter__(self):
""" Activated when used in the with statement.
Should automatically acquire a lock to be used in the with block.
"""
self.acquire()
return self
def __exit__(self, type, value, traceback):
""" Activated at the end of the with statement.
It automatically releases the lock if it isn't locked.
"""
self.release()
def __del__(self):
""" Make sure that the FileLock instance doesn't leave a lockfile
lying around.
"""
self.release()
class Signal(object):
def __init__(self):
self.subscribers = []
def connect(self, subscriber, unique=True):
if not unique or subscriber not in self.subscribers:
self.subscribers.append(subscriber)
def emit(self, *args, **kwargs):
for subscriber in self.subscribers:
subscriber(*args, **kwargs)
def disconnect(self, subscriber, removeall=False):
while True:
if subscriber in self.subscribers:
self.subscribers.remove(subscriber)
if removeall:
continue
break
def max_default(iterable, key=lambda x: x, default=None):
try:
return max(iterable, key=key)
except ValueError:
return default
9 years ago
def traverse(o, tree_types=(list, tuple, GeneratorType)):
9 years ago
"""
Вернуть последовательно все элемены списка, "распаковов" встроенные
[1, 2, [1,2,[4,6,7]], 7] -> (1,2,1,2,4,6,7,7)
:param o:
:param tree_types: типы, которые будут распакованы
:return:
"""
if isinstance(o, tree_types):
for value in o:
for subvalue in traverse(value, tree_types):
yield subvalue
else:
yield o
@total_ordering
class ReverseKey(object):
"""
Объект для создания ключей обратной сортировки
s = [("A","B"), ("A","C"), ("B","A")]
sorted(s, key=lambda x: (x[0], ReverseKey(x[1]))
"""
__slots__ = ['data']
def __init__(self, data):
self.data = data
def __eq__(self, o):
return o.data == self.data
def __lt__(self, o):
return o.data < self.data
def unique(iterable):
"""
Исключить из списка дублирующие элементы, не меняя порядка
"""
seen = {}
for x in iterable:
if x in seen:
continue
seen[x] = 1
yield x
def sorteduniqresult(f):
@wraps(f)
def wrapper(*args, **kw):
for x in sorted(set(f(*args, **kw))):
yield x
return wrapper
def repeater(*args):
"""
Циклы для повторяющихся действий с ожидаением
:param args:
:return:
"""
yield 0
for i, x in enumerate(args):
if x:
time.sleep(x)
yield i+1
def iterate_list(l):
"""
Перебрать последовательность (исключая строки) поэлемено,
или же возвращает последовательность из одно элемента
l1 = 1
l2 = [1,2]
for x in iterate_list(l1):
print x
:param l: Список, кортеж, или прочее
:return:
"""
3 years ago
if not isinstance(l, str):
for x in l:
yield x
else:
yield l
def has_any(a, b):
"""
Содержат ли два списка хотя бы один одинаковый элемент
:param a:
:param b:
:return:
"""
return any(x in a for x in b)
def deprecated(func):
"""This is a decorator which can be used to mark functions
as deprecated. It will result in a warning being emmitted
when the function is used."""
@wraps(func)
def new_func(*args, **kwargs):
warnings.simplefilter('always', DeprecationWarning) #turn off filter
warnings.warn("Call to deprecated function {}.".format(func.__name__), category=DeprecationWarning, stacklevel=2)
warnings.simplefilter('default', DeprecationWarning) #reset filter
return func(*args, **kwargs)
return new_func
def unpack_single_opts(args):
"""
Преобразовать одиночные аргументы
"""
re_single = re.compile("^-[A-Za-z0-9]")
re_right = re.compile("^-[A-Za-z0-9]+$")
for arg in args:
if re_single.search(arg):
if not re_right.match(arg):
sys.stderr.write("Wrong argument %s" % arg)
for letter in arg[1:]:
yield "-%s" % letter
else:
yield arg
drop_slash = lambda x: x[1:] if x.startswith('/') else x
class Cachable(object):
"""
Объект содержащий кэширующие методы
"""
def __init__(self):
self.clear_method_cache()
def clear_method_cache(self):
self._method_cache = {}
@staticmethod
def methodcached(key=lambda *args, **kw: hash(args)):
"""
Кэширование результата по параметрам метода, по ключевой функции
можно выбрать какие параметры играют роль
"""
def decorator(f):
3 years ago
funcname = f.__name__
@wraps(f)
def wrapper(self, *args, **kw):
keyval = key(*args, **kw)
assert isinstance(self, Cachable)
if funcname not in self._method_cache:
self._method_cache[funcname] = {}
cache = self._method_cache[funcname]
if keyval not in cache:
cache[keyval] = f(self, *args, **kw)
return cache[keyval]
@wraps(f)
def null_wrapper(self):
assert isinstance(self, Cachable)
if funcname not in self._method_cache:
self._method_cache[funcname] = f(self)
return self._method_cache[funcname]
3 years ago
if len(f.__code__.co_varnames) > 1:
return wrapper
else:
return null_wrapper
return decorator
def firstelement(i, fallback=''):
for x in i:
return x
return fallback
class GenericFs(metaclass=ABCMeta):
"""
Объект обобщения файловых систем
"""
@abstractmethod
def exists(self, fn):
pass
@abstractmethod
def read(self, fn, encoding="UTF-8", binary=False):
pass
@abstractmethod
def glob(self, fn):
pass
@abstractmethod
def realpath(self, fn):
pass
@abstractmethod
def write(self, fn, data, encoding="UTF-8", binary=False):
pass
@abstractmethod
def listdir(self, dn, fullpath=False):
pass
class Brokenable(object):
@property
def broken(self):
return False
@staticmethod
def broken_result(res=None):
def decor(f):
@wraps(f)
def wrapper(self, *args, **kw):
if self.broken:
return res
return f(self, *args, **kw)
return wrapper
return decor
def get_best_nearest_resolution(preferred_res, support_res, aspect_only=False):
"""
Получить наилучшее ближайшее к preferred_res разрешенение из support_res.
Разрешение берётся ближайшее по размеру диагонали, с применением "штрафного"
коэфициента по разнице аспектов
"""
width, height = map(int, preferred_res.split('x'))
gep = sqrt(height ** 2 + width ** 2)
k = float(width) / float(height)
s = float(width) * float(height)
support_res_int = (tuple(map(int, x.split("x"))) for x in support_res)
support_res_int = ((x, y, sqrt(y*y + x*x), abs(x/float(y)-k))
for x, y in support_res_int)
if not aspect_only:
support_res_int = [(x,y,g,dasp)
for x,y,g,dasp in support_res_int if x <= width and y <= height]
if not support_res_int:
return None
keyfunc = lambda x,y,g, dasp: g -g * dasp
bestRes = max(support_res_int, key=lambda x:keyfunc(*x) )
return "%sx%s" % bestRes[0:2]
def get_traceback_caller(e1, e2, frame):
"""
Получить модуль и строку вызвавшую эту функции с ошибкой
"""
while frame.tb_next:
frame = frame.tb_next
module, part = os.path.split(frame.tb_frame.f_back.f_code.co_filename)
if part.endswith('.py'):
part = part[:-3]
fallbackmod = part
modname = [part]
while module and module != '/' and not module.endswith('site-packages'):
module, part = os.path.split(module)
modname.insert(0, part)
if module.endswith('site-packages'):
modname = ".".join(modname)
else:
modname = fallbackmod
return modname, frame.tb_frame.f_back.f_lineno