You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
calculate-utils-3-lib/pym/calculate/lib/utils/tools.py

629 lines
17 KiB

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

# -*- coding: utf-8 -*-
# Copyright 2014-2016 Mir Calculate. http://www.calculate-linux.org
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from itertools import tee
from functools import total_ordering, wraps
import warnings
from contextlib import contextmanager
import re
import sys
from types import StringTypes
import os
import fcntl
from os import path
import time
from abc import ABCMeta, abstractmethod
from types import GeneratorType
from math import sqrt
try:
import json as json_module
except ImportError:
json_module = None
class SavableIterator:
"""
Итератор с возможность сохранять и восстанавливать состояние
"""
def __init__(self, seq):
self.seq = iter(seq)
self.back = None
def __iter__(self):
return self
def save(self):
self.seq, self.back = tee(self.seq)
return self
def restore(self):
if self.back:
self.seq, self.back = tee(self.back)
return self
def next(self):
return self.seq.next()
@contextmanager
def ignore(exception):
try:
yield
except exception:
pass
class AddonError(Exception):
"""
Исключение с добавочным сообщением
"""
def __init__(self, msg, addon=None):
self.message = msg
self.addon = addon
Exception.__init__(self, msg)
class FalseObject(object):
"""
Объект-заглушка при любом сравнении возвращает False
"""
def __lt__(self, other):
return False
def __nonzero__(self):
return False
__gt__ = __ge__ = __le__ = __eq__ = __ne__ = __lt__
class Sizes(object):
K = KiB = kibibyte = 1024
kB = kilobyte = 1000
M = MiB = mibibyte = K * 1024
Mb = megabyte = kB * 1000
G = GiB = gibibyte = M * 1024
Gb = gigabyte = Mb * 1000
T = TiB = tibibyte = G * 1024
Tb = terabyte = Gb * 1000
Sector = 512
def __getattr__(self, name):
if name.startswith('from_'):
return lambda x: x * getattr(Sizes, name[5:])
elif name.startswith('to_'):
return lambda x: x / getattr(Sizes, name[3:])
else:
raise AttributeError
def _from(self, name, count):
return count * getattr(Sizes, name)
def _to(self, name, count):
return count / getattr(Sizes, name)
def imap_regexp(re_compiled, l, whole=False):
"""
Обработать список регулярным выражением и вернуть полученные группы
"""
if whole:
retfunc = lambda x: x.group()
else:
retfunc = lambda x: x.groups()
return (retfunc(x) for x in (re_compiled.search(x) for x in l) if x)
def cached(each_instance=False):
"""
Кэширование результата
"""
def cache_wrapper(func):
value = {}
def wrapper(*args, **kwargs):
if each_instance:
if args[0] not in value:
value[args[0]] = func(*args, **kwargs)
return value[args[0]]
else:
if None not in value:
value[None] = func(*args, **kwargs)
return value[None]
return wrapper
if each_instance in (True, False):
return cache_wrapper
else:
return cache_wrapper(each_instance)
class Singleton(type):
"""
Существует только один экземпляр данного класса
"""
_instances = {}
def __call__(cls, *args, **kw):
if cls not in cls._instances:
create_obj = getattr(super(Singleton, cls), '__call__')
cls._instances[cls] = create_obj(*args, **kw)
return cls._instances[cls]
class SingletonParam(type):
"""
Параметризированный Singleton: может быть создано множество объектов одного
и того же класса, но только если у них разные параметры инициализации
"""
def __init__(cls, name, bases, dicts):
super(SingletonParam, cls).__init__(name, bases, dicts)
cls.instance = {}
def __call__(cls, *args, **kw):
if hasattr(cls, "param_id"):
keyarg = cls.param_id(*args, **kw)
else:
keyarg = args[0] if args else ""
if keyarg not in cls.instance:
create_obj = getattr(super(SingletonParam, cls), '__call__')
cls.instance[keyarg] = create_obj(*args, **kw)
return cls.instance[keyarg]
def classificate(iterator):
"""
Классифицировать элементы по признаку (first первый last последний)
"""
class Mark:
def __init__(self, first=False, last=False):
self.first = first
self.last = last
def __repr__(self):
return "Mark(first=%s,last=%s)" % (self.first, self.last)
iterator = iter(iterator)
obj = iterator.next()
try:
obj_next = iterator.next()
yield Mark(first=True, last=False), obj
try:
while True:
obj = obj_next
obj_next = iterator.next()
yield Mark(), obj
except StopIteration:
yield Mark(first=False, last=True), obj
except StopIteration:
yield Mark(first=True, last=True), obj
def debug(prefix="DEBUG {name}({args},{kw})={ret}"):
"""
Декоратор для вывода отладочных сообщений при вызове функции
:param prefix:
:return:
"""
def debug_decor(f):
@wraps(f)
def wrapper(*args, **kw):
ret = f(*args, **kw)
print prefix.format(name=f.func_name, args=args, kw=kw, ret=ret)
return ret
return wrapper
return debug_decor
class ClassPropertyDescriptor(object):
def __init__(self, fget, fset=None):
self.fget = fget
self.fset = fset
def __get__(self, obj, klass=None):
if klass is None:
klass = type(obj)
return self.fget.__get__(obj, klass)()
def __set__(self, obj, value):
if not self.fset:
raise AttributeError("can't set attribute")
type_ = type(obj)
return self.fset.__get__(obj, type_)(value)
def setter(self, func):
if not isinstance(func, (classmethod, staticmethod)):
func = classmethod(func)
self.fset = func
return self
def classproperty(func):
"""
Классовое свойство
:param func:
:return:
"""
if not isinstance(func, (classmethod, staticmethod)):
func = classmethod(func)
return ClassPropertyDescriptor(func)
class LockError(Exception):
pass
class Locker(object):
locker_directory = '/run/calculate/locker'
def __init__(self, name=None, fn=None, timeout=None):
if name is not None:
if not path.exists(self.locker_directory):
os.makedirs(self.locker_directory)
self.fn = path.join(self.locker_directory, name)
else:
dn = path.dirname(fn)
if not path.exists(dn):
os.makedirs(dn)
self.fn = fn
self.lockf = None
if timeout is None:
self.timeout = 10
else:
self.timeout = 0
def lockfile(self, fn, timeout=None):
"""
Блокировка файла с таймаутом
"""
if timeout is None:
timeout = self.timeout
for i in range(0, timeout * 40 or 1):
try:
self.lockf = os.open(fn, os.O_CREAT | os.O_RDWR)
fcntl.flock(self.lockf, fcntl.LOCK_EX | fcntl.LOCK_NB)
return True
except IOError, e:
if e.errno != os.errno.EAGAIN:
raise e
os.close(self.lockf)
self.lockf = None
except OSError, e:
if e.errno != os.errno.EEXIST:
raise e
time.sleep(0.25)
else:
raise LockError("Lock timeout of %s" % fn)
def acquire(self):
if self.lockf is None:
self.lockfile(self.fn)
return True
def release(self):
if self.lockf is not None:
os.close(self.lockf)
self.lockf = None
return True
def remove(self):
self.release()
if path.exists(self.fn):
try:
os.remove(self.fn)
except (OSError, IOError):
pass
def __enter__(self):
""" Activated when used in the with statement.
Should automatically acquire a lock to be used in the with block.
"""
self.acquire()
return self
def __exit__(self, type, value, traceback):
""" Activated at the end of the with statement.
It automatically releases the lock if it isn't locked.
"""
self.release()
def __del__(self):
""" Make sure that the FileLock instance doesn't leave a lockfile
lying around.
"""
self.release()
class Signal(object):
def __init__(self):
self.subscribers = []
def connect(self, subscriber, unique=True):
if not unique or subscriber not in self.subscribers:
self.subscribers.append(subscriber)
def emit(self, *args, **kwargs):
for subscriber in self.subscribers:
subscriber(*args, **kwargs)
def disconnect(self, subscriber, removeall=False):
while True:
if subscriber in self.subscribers:
self.subscribers.remove(subscriber)
if removeall:
continue
break
def max_default(iterable, key=lambda x: x, default=None):
try:
return max(iterable, key=key)
except ValueError:
return default
def traverse(o, tree_types=(list, tuple, GeneratorType)):
"""
Вернуть последовательно все элемены списка, "распаковов" встроенные
[1, 2, [1,2,[4,6,7]], 7] -> (1,2,1,2,4,6,7,7)
:param o:
:param tree_types: типы, которые будут распакованы
:return:
"""
if isinstance(o, tree_types):
for value in o:
for subvalue in traverse(value, tree_types):
yield subvalue
else:
yield o
@total_ordering
class ReverseKey(object):
"""
Объект для создания ключей обратной сортировки
s = [("A","B"), ("A","C"), ("B","A")]
sorted(s, key=lambda x: (x[0], ReverseKey(x[1]))
"""
__slots__ = ['data']
def __init__(self, data):
self.data = data
def __eq__(self, o):
return o.data == self.data
def __lt__(self, o):
return o.data < self.data
def unique(iterable):
"""
Исключить из списка дублирующие элементы, не меняя порядка
"""
seen = {}
for x in iterable:
if x in seen:
continue
seen[x] = 1
yield x
def sorteduniqresult(f):
@wraps(f)
def wrapper(*args, **kw):
for x in sorted(set(f(*args, **kw))):
yield x
return wrapper
def repeater(*args):
"""
Циклы для повторяющихся действий с ожидаением
:param args:
:return:
"""
yield 0
for i, x in enumerate(args):
if x:
time.sleep(x)
yield i+1
def iterate_list(l):
"""
Перебрать последовательность (исключая строки) поэлемено,
или же возвращает последовательность из одно элемента
l1 = 1
l2 = [1,2]
for x in iterate_list(l1):
print x
:param l: Список, кортеж, или прочее
:return:
"""
if not isinstance(l, StringTypes):
for x in l:
yield x
else:
yield l
def has_any(a, b):
"""
Содержат ли два списка хотя бы один одинаковый элемент
:param a:
:param b:
:return:
"""
return any(x in a for x in b)
def deprecated(func):
"""This is a decorator which can be used to mark functions
as deprecated. It will result in a warning being emmitted
when the function is used."""
@wraps(func)
def new_func(*args, **kwargs):
warnings.simplefilter('always', DeprecationWarning) #turn off filter
warnings.warn("Call to deprecated function {}.".format(func.__name__), category=DeprecationWarning, stacklevel=2)
warnings.simplefilter('default', DeprecationWarning) #reset filter
return func(*args, **kwargs)
return new_func
def unpack_single_opts(args):
"""
Преобразовать одиночные аргументы
"""
re_single = re.compile("^-[A-Za-z0-9]")
re_right = re.compile("^-[A-Za-z0-9]+$")
for arg in args:
if re_single.search(arg):
if not re_right.match(arg):
sys.stderr.write("Wrong argument %s" % arg)
for letter in arg[1:]:
yield "-%s" % letter
else:
yield arg
drop_slash = lambda x: x[1:] if x.startswith('/') else x
class Cachable(object):
"""
Объект содержащий кэширующие методы
"""
def __init__(self):
self.clear_method_cache()
def clear_method_cache(self):
self._method_cache = {}
@staticmethod
def methodcached(key=lambda *args, **kw: hash(args)):
"""
Кэширование результата по параметрам метода, по ключевой функции
можно выбрать какие параметры играют роль
"""
def decorator(f):
funcname = f.func_name
@wraps(f)
def wrapper(self, *args, **kw):
keyval = key(*args, **kw)
assert isinstance(self, Cachable)
if funcname not in self._method_cache:
self._method_cache[funcname] = {}
cache = self._method_cache[funcname]
if keyval not in cache:
cache[keyval] = f(self, *args, **kw)
return cache[keyval]
@wraps(f)
def null_wrapper(self):
assert isinstance(self, Cachable)
if funcname not in self._method_cache:
self._method_cache[funcname] = f(self)
return self._method_cache[funcname]
if len(f.func_code.co_varnames) > 1:
return wrapper
else:
return null_wrapper
return decorator
def firstelement(i, fallback=''):
for x in i:
return x
return fallback
class GenericFs(object):
"""
Объект обобщения файловых систем
"""
__metaclass__ = ABCMeta
@abstractmethod
def exists(self, fn):
pass
@abstractmethod
def read(self, fn):
pass
@abstractmethod
def glob(self, fn):
pass
@abstractmethod
def realpath(self, fn):
pass
@abstractmethod
def write(self, fn, data):
pass
@abstractmethod
def listdir(self, dn, fullpath=False):
pass
class Brokenable(object):
@property
def broken(self):
return False
@staticmethod
def broken_result(res=None):
def decor(f):
@wraps(f)
def wrapper(self, *args, **kw):
if self.broken:
return res
return f(self, *args, **kw)
return wrapper
return decor
def get_best_nearest_resolution(preferred_res, support_res, aspect_only=False):
"""
Получить наилучшее ближайшее к preferred_res разрешенение из support_res.
Разрешение берётся ближайшее по размеру диагонали, с применением "штрафного"
коэфициента по разнице аспектов
"""
width, height = map(int, preferred_res.split('x'))
gep = sqrt(height ** 2 + width ** 2)
k = float(width) / float(height)
s = float(width) * float(height)
support_res_int = (tuple(map(int, x.split("x"))) for x in support_res)
support_res_int = ((x, y, sqrt(y*y + x*x), abs(x/float(y)-k))
for x, y in support_res_int)
if not aspect_only:
support_res_int = [(x,y,g,dasp)
for x,y,g,dasp in support_res_int if x <= width and y <= height]
if not support_res_int:
return None
keyfunc = lambda x,y,g, dasp: g -g * dasp
bestRes = max(support_res_int, key=lambda x:keyfunc(*x) )
return "%sx%s" % bestRes[0:2]