You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
calculate-utils-3-console-gui/libs_crutch/core/result_viewer.py

693 lines
21 KiB

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

# -*- coding: utf-8 -*-
# Copyright 2011-2016 Mir Calculate. http://www.calculate-linux.org
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import print_function
from itertools import cycle
from calculate.lib.utils.colortext import get_terminal_print, Terminal, \
TextState, convert_xml_to_terminal, Print
from calculate.lib.cl_progressbar import get_progress_bar
import sys
from calculate.lib.utils.files import getch, set_active_tty, get_active_tty
from calculate.lib.utils.text import tableReport
import threading
from calculate.lib.utils.tools import classificate
Colors = TextState.Colors
from calculate.lib.cl_lang import setLocalTranslate
_ = lambda x: x
setLocalTranslate('cl_core3', sys.modules[__name__])
class Spinner(threading.Thread):
def __init__(self, *args, **kwargs):
self.__halt = threading.Event()
self.__main_thread = threading.currentThread()
threading.Thread.__init__(self, *args, **kwargs)
self.start()
def run(self):
Terminal().cursor = False
try:
sys.stdout.write(" |")
for c in cycle('/-\|'):
sys.stdout.write('\b' + c)
sys.stdout.flush()
self.__halt.wait(0.2)
sys.stdout.flush()
if self.__halt.is_set():
sys.stdout.write('\b\b \b\b')
return
if not self.__main_thread.is_alive():
return
finally:
Terminal().cursor = True
def stop(self):
self.__halt.set()
self.join()
class Table(tableReport):
def __init__(self, *args, **kwargs):
self.res = []
tableReport.__init__(self, *args, **kwargs)
def printFunc(self, s):
self.res.append(s)
def printTable(self):
self.setAutosize()
self.printReport(printRows=False)
return "".join(self.res)
def printTable(data, header=None):
try:
if any(data):
return Table(None, header, data, colSpan=0).printTable()
else:
return ""
except Exception:
# print str(e)
raise
def echo_on(f):
def wrapper(self, *args, **kw):
oldecho = self.parent.terminal_info.echo
self.parent.terminal_info.echo = True
try:
return f(self, *args, **kw)
finally:
self.parent.terminal_info.echo = oldecho
return wrapper
class TaskState(object):
"""
Текущее состояние вывода сообщений
"""
def __init__(self, parent):
self.parent = parent
@property
def state(self):
return self.parent.task_state
def process_tags(self, s):
"""
Выполнить текстовое преобразование
"""
s = s or ""
return convert_xml_to_terminal(s).replace(" ", " ")
def display_asterisk(self, color):
"""
Отобразить маркер
"""
self.parent.printer(" ")
self.parent.printer.foreground(color).bold("*")
self.parent.printer(" ")
def _right_indent(self, indent, width=-1):
"""
Выполнить выравнивание от правого края
"""
if width > 0:
self.parent.printer('\r')
self.parent.printer.right(width - indent)
else:
self.parent.printer(" ")
def _change_asterisk(self, color, width=-1):
if width > 0:
self.parent.printer('\r')
self.display_asterisk(color)
def dotting(self):
if self.parent.spinner:
self.parent.spinner.stop()
self.parent.printer(" ...")
self.parent.printer.flush()
def _print_result(self, text, color):
width = self.parent.terminal_info.width
self._change_asterisk(color, width)
self._right_indent(len(text) + 4, width)
self.parent.printer.bold.foreground(TextState.Colors.BLUE)("[ ")
self.parent.printer.bold.foreground(color)(text)
self.parent.printer.bold.foreground(TextState.Colors.BLUE)(" ]")
self.parent.printer("\n")
def _print_ok(self):
self._print_result("ok", TextState.Colors.GREEN)
def _print_failed(self):
self._print_result("!!", TextState.Colors.RED)
def _print_skip(self):
self._print_result("skip", TextState.Colors.YELLOW)
def display_result(self, result):
func_map = {"skip": self._print_skip,
False: self._print_failed}
func_map.get(result, self._print_ok)()
self.parent.printer.flush()
def startTask(self, message, progress, num):
pass
def endTask(self, result, progress_message=None):
pass
def breakTask(self):
pass
def printMessage(self, color, message):
for i, line in classificate(self.process_tags(message).split('\n')):
self.display_asterisk(color)
self.parent.printer(line)
if not i.last:
self.parent.printer('\n')
try:
self.parent.printer.flush()
except IOError:
pass
def printERROR(self, message):
self.printMessage(Colors.RED, message)
def printSUCCESS(self, message):
self.printMessage(Colors.GREEN, message)
def printWARNING(self, message):
self.printMessage(Colors.YELLOW, message)
def startGroup(self, message):
self.parent.printer.foreground(Colors.WHITE)(self.process_tags(message))
self.parent.printer('\n')
def endGroup(self):
pass
def beginFrame(self, message):
self.parent.terminal_info.echo = False
def endFrame(self):
self.parent.terminal_info.echo = True
def addProgress(self, message):
pass
def setProgress(self, percent, short_message, long_message):
pass
@echo_on
def askConfirm(self, message, default):
self.parent.printer("\n")
while True:
try:
_print = Print(output=self.parent.printer.output)
if default in "yes":
yes_color, no_color = Colors.GREEN, Colors.LIGHT_RED
else:
yes_color, no_color = Colors.LIGHT_RED, Colors.GREEN
yes = _print.foreground(yes_color)("Yes")
no = _print.foreground(no_color)("No")
white_message = _print.foreground(Colors.WHITE)(message)
ask = raw_input(white_message + ' (%s/%s): ' % (yes, no))
except (EOFError, KeyboardInterrupt):
ask = 'no'
print()
if ask.lower() in ['n', 'no']:
return "no"
if ask.lower() in ['y', 'yes']:
return "yes"
if ask == '':
return default
def printPre(self, message):
self.parent.printer(self.process_tags(message))
self.parent.printer('\n')
def printDefault(self, message):
self.parent.printer(self.process_tags(message))
self.parent.printer('\n')
@echo_on
def askChoice(self, message, answers):
self.parent.printer("\n")
Colors = TextState.Colors
printer = self.parent.printer
_print = Print(output=printer.output)
# ability answer by first letter
firstletter = 0
i_value, i_comment = 0, 1
answerByChar = map(lambda x: x[i_value][firstletter], answers)
if filter(lambda x: answerByChar.count(x) > 1, answerByChar):
use_getch = False
sa = slice(0, None)
else:
use_getch = True
sa = slice(1)
message = _print.foreground(Colors.WHITE)(message)
full_message = (message +
' (%s): ' % ("/".join(map(
lambda x: "%s[%s]" % (x[i_comment], x[i_value][sa]),
answers))))
while True:
CTRC_C = chr(3)
if use_getch:
printer(full_message)
ask = getch()
printer("\n")
if ask in (CTRC_C, ""):
raise KeyboardInterrupt
else:
try:
ask = raw_input(full_message)
except (EOFError, KeyboardInterrupt):
printer("\n")
raise KeyboardInterrupt
ask = ask.lower()
like_answers = filter(lambda x: x[i_value].startswith(ask),
answers)
if not like_answers:
self.state.printERROR(_('The answer is uncertain'))
continue
if len(like_answers) == 1:
return like_answers[i_value][firstletter]
else:
self.state.printERROR(_('Ambiguous answer:') +
",".join(map(lambda x: x[i_comment],
like_answers)))
@echo_on
def askQuestion(self, message):
self.parent.printer("\n")
return raw_input(message + _(":"))
def askPassword(self, message, twice):
from calculate.lib.utils.common import getpass
old_tty = None
try:
if self.parent.terminal_info.is_boot_console():
old_tty = get_active_tty()
set_active_tty(1)
text1 = _("%s: ") % message
if not twice:
return getpass.getpass(text1)
text2 = _('Repeat: ')
pass1 = 'password'
pass2 = 'repeat'
try:
while pass1 != pass2:
pass1 = getpass.getpass(text1)
pass2 = getpass.getpass(text2)
if pass1 != pass2:
self.state.printERROR(_('Passwords do not match'))
except KeyboardInterrupt:
return None
passwd = pass1 if (pass1 and pass1 == pass2) else None
return passwd
finally:
if old_tty and old_tty.isdigit():
set_active_tty(int(old_tty))
def printTable(self, table_name, head, body):
self.state.printSUCCESS(message=table_name)
self.parent.printer(printTable(body, head))
class CleanState(TaskState):
"""
Ожидается вывод
"""
def startTask(self, message, progress, num):
self.printMessage(Colors.GREEN, message)
self.parent.spinner = Spinner()
self.parent.set_state('start')
if progress:
self.parent.addProgress()
def printERROR(self, message):
super(CleanState, self).printERROR(message)
self.parent.printer('\n')
def printSUCCESS(self, message):
super(CleanState, self).printSUCCESS(message)
self.parent.printer('\n')
def printWARNING(self, message):
super(CleanState, self).printWARNING(message)
self.parent.printer('\n')
class CleanStateNoProgress(CleanState):
"""
... без отображения прогрессов
"""
def startTask(self, message, progress, num):
self.display_asterisk(Colors.GREEN)
self.parent.printer(message)
self.dotting()
self.parent.set_state('start')
class StartState(TaskState):
"""
Выполняется задача (отображается spinner)
"""
def startTask(self, message, progress, num):
self.parent.endTask(True)
self.parent.startTask(message, progress, num)
def endTask(self, result, progress_message=None):
self.dotting()
self.parent.set_state('clean')
self.display_result(result)
def breakTask(self):
self.dotting()
self.parent.set_state('clean')
self.parent.printer('\n')
def printERROR(self, message):
self.dotting()
self.parent.printer('\n')
self.parent.set_state('clean')
self.state.printERROR(message)
def printSUCCESS(self, message):
self.dotting()
self.parent.set_state('breaked')
self.state.printSUCCESS(message)
def printWARNING(self, message):
self.dotting()
self.parent.set_state('breaked')
self.state.printWARNING(message)
def startGroup(self, message):
self.state.endTask(True)
self.state.startGroup(message)
def endGroup(self):
self.state.endTask(True)
self.state.endGroup()
def beginFrame(self, message):
self.state.endTask(True)
self.state.beginFrame(message)
def endFrame(self):
self.state.endTask(True)
self.state.endFrame()
def addProgress(self, message):
self.parent.set_state("pre-progress")
self.state.addProgress(message)
def printPre(self, message):
self.parent.endTask(True)
self.state.printPre(message)
def printDefault(self, message):
self.state.endTask(True)
self.state.printDefault(message)
def askChoice(self, message, answers):
self.breakTask()
return self.state.askChoice(message, answers)
def askQuestion(self, message):
self.breakTask()
return self.state.askQuestion(message)
def askPassword(self, message, twice):
self.breakTask()
return self.state.askPassword(message, twice)
def askConfirm(self, message, default):
self.breakTask()
return self.state.askConfirm(message, default)
def printTable(self, table_name, head, body):
self.breakTask()
self.state.printTable(table_name, head, body)
class StartStateNoProgress(StartState):
"""
... без прогресса
"""
def startTask(self, message, progress, num):
self.parent.endTask(True)
self.parent.startTask(message, progress, num)
def endTask(self, result, progress_message=None):
self.parent.set_state('clean')
self.display_result(result)
def breakTask(self):
self.parent.printer('\n')
def printERROR(self, message):
self.breakTask()
self.parent.set_state('clean')
self.state.printERROR(message)
def printSUCCESS(self, message):
self.breakTask()
self.parent.set_state('clean')
self.state.printSUCCESS(message)
def printWARNING(self, message):
self.breakTask()
self.parent.set_state('clean')
self.state.printWARNING(message)
def addProgress(self, message):
pass
class BreakedState(StartState):
"""
Во время выполнения задачи выведено сообщение
"""
def stop_spinner_newline(self):
self.parent.spinner.stop()
self.parent.printer('\n')
def startTask(self, message, progress, num):
self.state.endTask(True)
self.state.startTask(message, progress, num)
def breakTask(self):
self.stop_spinner_newline()
self.parent.set_state('clean')
def endTask(self, result, progress_message=None):
self.breakTask()
def printERROR(self, message):
self.parent.endTask(True)
self.state.printERROR(message)
def printSUCCESS(self, message):
self.stop_spinner_newline()
TaskState.printSUCCESS(self, message)
self.parent.spinner = Spinner()
def printWARNING(self, message):
self.stop_spinner_newline()
TaskState.printWARNING(self, message)
self.parent.spinner = Spinner()
class PreProgressState(StartState):
"""
Задача запрошена как с прогрессом но проценты еще не обрабатывались
"""
def addProgress(self, message):
pass
def setProgress(self, percent, short_message, long_message):
self.parent.set_state("progress")
self.dotting()
self.parent.printer("\n")
self.parent.add_progressbar()
self.parent.terminal_info.cursor = False
self.state.setProgress(percent, short_message, long_message)
class ProgressState(StartState):
"""
Отображается progressbar
"""
def finish_and_clean(self):
self.parent.printer('\r')
self.parent.printer.flush()
self.parent.progress.finish()
self.parent.terminal_info.cursor = True
self.parent.set_progressbar(None)
self.parent.printer.up(1).clear_line("")
self.parent.printer.up(1)("")
def setProgress(self, percent, short_message, long_message):
if not 0 <= percent <= 100:
self.breakTask()
else:
self.parent.progress.update(percent)
def breakTask(self):
self.finish_and_clean()
self.parent.set_state('clean')
self.parent.printer('\n')
def endTask(self, result, progress_message=None):
self.finish_and_clean()
self.parent.set_state('clean')
self.display_result(result)
def printERROR(self, message):
self.finish_and_clean()
self.parent.printer.down(1)("")
self.parent.set_state('clean')
self.state.printERROR(message)
def printSUCCESS(self, message):
self.finish_and_clean()
self.parent.set_state('breaked')
self.state.printSUCCESS(message)
def printWARNING(self, message):
self.finish_and_clean()
self.parent.set_state('breaked')
self.state.printWARNING(message)
class ResultViewer(object):
"""
Просмотрщик результатов
"""
def __init__(self):
self.printer = \
get_terminal_print(sys.stdout)
self.terminal_info = Terminal()
self.states = {'clean': CleanState(self),
'breaked': BreakedState(self),
'pre-progress': PreProgressState(self),
'progress': ProgressState(self),
'start': StartState(self)}
self.task_state = self.states['clean']
self.spinner = None
self.progress = None
self.no_questions = False
def set_no_progress(self):
self.states = {'clean': CleanStateNoProgress(self),
'start': StartStateNoProgress(self)}
self.set_state('clean')
def set_no_questions(self):
self.no_questions = True
def set_state(self, state):
self.task_state = self.states[state]
def add_progressbar(self):
self.set_progressbar(get_progress_bar())
def set_progressbar(self, pb):
self.progress = pb
def endTask(self, result=None, progress_message=None):
self.task_state.endTask(result, progress_message)
def startTask(self, message, progress=False, num=1):
self.task_state.startTask(message, progress, num)
def printERROR(self, message, onlyShow=None):
if onlyShow != 'gui':
self.task_state.printERROR(message)
def printSUCCESS(self, message, onlyShow=None):
if onlyShow != 'gui':
self.task_state.printSUCCESS(message)
def printWARNING(self, message, onlyShow=None):
if onlyShow != 'gui':
self.task_state.printWARNING(message)
def startGroup(self, message):
self.task_state.startGroup(message)
def endGroup(self):
self.task_state.endGroup()
def beginFrame(self, message=None):
self.task_state.beginFrame(message)
def endFrame(self):
self.task_state.endFrame()
def addProgress(self, message=None):
self.task_state.addProgress(message)
def setProgress(self, percent, short_message=None, long_message=None):
self.task_state.setProgress(percent, short_message, long_message)
def printPre(self, message, onlyShow=None):
if onlyShow != 'gui':
self.task_state.printPre(message)
def printDefault(self, message='', onlyShow=None):
if onlyShow != 'gui':
self.task_state.printDefault(message)
def askConfirm(self, message, default="yes"):
if self.no_questions:
return default
return self.task_state.askConfirm(message, default)
def askChoice(self, message, answers=(("yes", "Yes"), ("no", "No"))):
return self.task_state.askChoice(message, answers)
def askPassword(self, message, twice=False):
return self.task_state.askPassword(message, twice)
def askQuestion(self, message):
return self.task_state.askQuestion(message)
def printTable(self, table_name, head, body, fields=None,
onClick=None, addAction=None, step=None, records=None):
self.task_state.printTable(table_name, head, body)