You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
calculate-utils-3-lib/pym/calculate/lib/utils/tools.py

204 lines
5.6 KiB

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

#-*- coding: utf-8 -*-
# Copyright 2014 Calculate Ltd. http://www.calculate-linux.org
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
import traceback
from functools import wraps
from itertools import tee
from contextlib import contextmanager
class SavableIterator:
    """
    Iterator whose position can be saved and later restored.

    ``save()`` remembers the current position and ``restore()`` rewinds to
    the most recently saved one. Both return ``self`` so that calls can be
    chained.
    """

    def __init__(self, seq):
        self.seq = iter(seq)
        # Snapshot produced by save(); stays None until save() is called.
        self.back = None

    def __iter__(self):
        return self

    def save(self):
        """Remember the current position and return ``self``."""
        # tee() splits the remaining items into two independent iterators:
        # one keeps being consumed, the other is held as the snapshot.
        self.seq, self.back = tee(self.seq)
        return self

    def restore(self):
        """Rewind to the last saved position; do nothing if never saved."""
        if self.back is not None:
            # Split again so the snapshot survives repeated restore() calls.
            self.seq, self.back = tee(self.back)
        return self

    def __next__(self):
        """Return the next item; raises StopIteration when exhausted."""
        return next(self.seq)

    # Python 2 spells the iterator protocol method ``next``.
    next = __next__
@contextmanager
def ignore(exception):
    """
    Context manager that silently swallows ``exception`` raised in its body.

    ``exception`` is used directly in an ``except`` clause, so it may be a
    single exception class or a tuple of classes. Any other exception
    propagates unchanged.
    """
    try:
        yield
    except exception:
        # Finishing the generator normally tells contextmanager to suppress.
        return
class AddonError(Exception):
    """
    Exception that carries an additional payload next to its message.

    The message is available as ``message`` (and through the usual
    ``args``/``str()``); the extra payload is stored in ``addon``.
    """

    def __init__(self, msg, addon=None):
        Exception.__init__(self, msg)
        self.addon = addon
        self.message = msg
class FalseObject(object):
    """
    Stub object that answers False to every comparison and truth test.
    """

    def __lt__(self, other):
        return False

    def __nonzero__(self):
        return False

    # Python 3 looks up __bool__ instead of __nonzero__; without this alias
    # the object would be truthy there.
    __bool__ = __nonzero__

    # Every rich comparison shares the same "always False" implementation.
    __gt__ = __ge__ = __le__ = __eq__ = __ne__ = __lt__

    # Python 3 sets __hash__ to None for classes that override __eq__;
    # restore identity hashing so instances stay usable in sets and dicts.
    __hash__ = object.__hash__
class Sizes(object):
    """
    Byte-size multipliers plus dynamic unit converters.

    The class attributes are multipliers expressed in bytes. On an
    instance, ``from_<unit>(x)`` multiplies ``x`` by the unit and
    ``to_<unit>(x)`` divides ``x`` by it, where ``<unit>`` is any attribute
    name of this class (for example ``Sizes().from_K(2) == 2048``).
    """
    K = KiB = kibibyte = 1024
    kB = kilobyte = 1000
    # "mibibyte" and "tibibyte" are the original (misspelt) names; they are
    # kept for compatibility next to the correctly spelt aliases.
    M = MiB = mibibyte = mebibyte = K * 1024
    Mb = megabyte = kB * 1000
    G = GiB = gibibyte = M * 1024
    Gb = gigabyte = Mb * 1000
    T = TiB = tibibyte = tebibyte = G * 1024
    Tb = terabyte = Gb * 1000

    def __getattr__(self, name):
        """
        Build ``from_<unit>`` / ``to_<unit>`` converters on demand.

        The unit is resolved when the returned converter is called, so an
        unknown unit raises AttributeError at call time, not at lookup
        time. ``to_<unit>`` uses the ``/`` operator, whose result for
        integers depends on the interpreter's division rules.

        Raises:
            AttributeError: ``name`` has neither the ``from_`` nor the
                ``to_`` prefix.
        """
        if name.startswith('from_'):
            return lambda x: x * getattr(Sizes, name[5:])
        elif name.startswith('to_'):
            return lambda x: x / getattr(Sizes, name[3:])
        else:
            raise AttributeError(
                "%r object has no attribute %r" % (type(self).__name__, name))
def imap_regexp(re_compiled, l, whole=False):
    """
    Lazily apply a compiled regular expression to every item of ``l``.

    Items for which ``re_compiled.search`` finds nothing are skipped. For
    each match the generator yields ``match.group()`` when ``whole`` is
    true, otherwise ``match.groups()``.
    """
    extract = (lambda m: m.group()) if whole else (lambda m: m.groups())
    matches = (re_compiled.search(item) for item in l)
    return (extract(m) for m in matches if m)
def cached(each_instance=False):
    """
    Decorator that caches the result of the decorated callable.

    Can be applied as ``@cached``, ``@cached()`` or
    ``@cached(each_instance=True)``.

    * ``each_instance`` false: the first result is stored once and returned
      for every later call, whatever the arguments.
    * ``each_instance`` true: results are stored per first positional
      argument (``self`` for methods), which must be hashable. The cache
      keeps a reference to every such argument.
    * Bare ``@cached``: ``each_instance`` is the decorated function itself,
      which is truthy, so caching is per first positional argument as well.

    When per-argument caching is active but the call has no positional
    arguments, the single shared slot is used.
    """
    def cache_wrapper(func):
        value = {}

        @wraps(func)
        def wrapper(*args, **kwargs):
            # None is the key of the shared (not per-instance) slot.
            key = args[0] if each_instance and args else None
            if key not in value:
                value[key] = func(*args, **kwargs)
            return value[key]
        return wrapper

    if each_instance in (True, False):
        # Called with parentheses: hand back the real decorator.
        return cache_wrapper
    else:
        # Applied bare: ``each_instance`` is the function being decorated.
        return cache_wrapper(each_instance)
class Singleton(type):
    """
    Metaclass that creates at most one instance per class.

    The first call of a class builds the instance and records it in the
    ``_instances`` dict defined on this metaclass; later calls return the
    recorded instance and ignore their arguments.
    """
    _instances = {}

    def __call__(cls, *args, **kwargs):
        registry = cls._instances
        if cls in registry:
            return registry[cls]
        registry[cls] = super(Singleton, cls).__call__(*args, **kwargs)
        return registry[cls]
class ClassifyingIterator(object):
    """
    Classifying iterator.

    NOTE(review): the class body defines no members; presumably it is a
    placeholder - confirm before relying on it.
    """
def classificate(iterator):
    """
    Tag every element of an iterable as being first and/or last.

    Yields ``(mark, item)`` pairs where ``mark.first`` is true only for the
    first element and ``mark.last`` only for the last one (both are true
    for a single-element input). An empty input yields nothing.

    One element of look-ahead is consumed from the underlying iterator
    before each pair is yielded, because "last" is only known once the
    next element has failed to arrive.
    """
    class Mark:
        def __init__(self, first=False, last=False):
            self.first = first
            self.last = last

        def __repr__(self):
            return "Mark(first=%s,last=%s)" % (self.first, self.last)

    iterator = iter(iterator)
    try:
        current = next(iterator)
    except StopIteration:
        # Empty input: finish cleanly instead of letting StopIteration leak
        # out of the generator (a RuntimeError under PEP 479).
        return

    is_first = True
    for upcoming in iterator:
        # Another element exists, so ``current`` is not the last one.
        yield Mark(first=is_first, last=False), current
        is_first = False
        current = upcoming
    yield Mark(first=is_first, last=True), current
def debug(prefix="DEBUG {name}({args},{kw})={ret}"):
    """
    Decorator factory that prints every call of the decorated function.

    After each call, ``prefix`` is formatted with ``name`` (the function's
    name), ``args`` (positional arguments tuple), ``kw`` (keyword arguments
    dict) and ``ret`` (the return value) and printed to standard output.
    The wrapped function's return value is passed through unchanged;
    nothing is printed if the function raises.
    """
    def debug_decor(f):
        @wraps(f)
        def wrapper(*args, **kw):
            ret = f(*args, **kw)
            # Single-argument print(...) behaves the same on Python 2 and 3;
            # __name__ replaces the Python 2 only ``func_name``.
            print(prefix.format(name=f.__name__, args=args, kw=kw, ret=ret))
            return ret
        return wrapper
    return debug_decor
def short_traceback(e1, e2, e3):
    """
    Return a one-line summary of an exception.

    The arguments are the ``(type, value, traceback)`` triple as returned
    by ``sys.exc_info()``. The full formatted traceback is first written to
    standard output; the returned string has the form
    ``"<ExcName>:<message>(<module>:<line>)"`` and describes the innermost
    traceback entry.

    ``<module>`` is the dotted path below a ``site-packages`` directory
    when the source file lives under one, otherwise just the file name
    without its ``.py`` suffix.
    """
    import sys

    # The formatted lines already end with newlines, so write them verbatim.
    sys.stdout.write("".join(traceback.format_exception(e1, e2, e3)))

    # Walk to the innermost traceback entry (where the exception was raised).
    frame = e3
    while frame.tb_next:
        frame = frame.tb_next

    module, part = os.path.split(frame.tb_frame.f_code.co_filename)
    if part.endswith('.py'):
        part = part[:-3]
    fallback_mod = part
    modname = [part]
    # Collect parent directory names until site-packages is reached.
    while not module.endswith('site-packages'):
        parent, part = os.path.split(module)
        if parent == module:
            # Splitting makes no further progress: filesystem root, or the
            # empty head of a relative/pseudo filename such as "<string>".
            break
        modname.insert(0, part)
        module = parent
    if module.endswith('site-packages'):
        modname = ".".join(modname)
    else:
        modname = fallback_mod
    return "%s:%s(%s:%s)" % (e1.__name__, str(e2), modname, frame.tb_lineno)