# -*- coding: utf-8 -*-

# Copyright 2012-2016 Mir Calculate. http://www.calculate-linux.org
#
#  Licensed under the Apache License, Version 2.0 (the "License");
#  you may not use this file except in compliance with the License.
#  You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
#  Unless required by applicable law or agreed to in writing, software
#  distributed under the License is distributed on an "AS IS" BASIS,
#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#  See the License for the specific language governing permissions and
#  limitations under the License.

import pickle
import sys
import termios
import os
from os import path
from fcntl import ioctl
from array import array
import threading
import argparse
import re

from calculate.lib.utils.colortext import get_terminal_print
from calculate.lib.utils.colortext.palette import TextState
from calculate.lib.utils.text import tableReport
from calculate.lib.cl_print import color_print
from calculate.lib.datavars import VariableError, CriticalError
from calculate.lib.cl_lang import setLocalTranslate
from calculate.core.server.api_types import FieldAdapter
from calculate.lib.utils.tools import ignore
from calculate.lib.utils.files import makeDirectory
from calculate.core.result_viewer import ResultViewer
from calculate.core.server.loaded_methods import LoadedMethods
from calculate.core.result_viewer_gui import ProgressGui, ErrorGui, WarningGui
from .gen_pid import ProcessStatus
from .methods_func import (get_method_argparser, collect_object,
                           check_result_msg, get_param_pwd, _print,
                           display_error)
from .api_types import ViewInfo, ViewParams
from .cert_cmd import parse
from .methods_func import GotErrorField
from .func import shortTraceback, CommonMethods, CommonLink

_ = lambda x: x
setLocalTranslate('cl_core3', sys.modules[__name__])


class LocalCall():
    """
    Holder of the state and helpers used to run a method in-process.

    The helpers are declared as static methods that take the class as an
    explicit first argument: callers in this module pass it by hand
    (for example ``cls.del_pid_file(cls, ...)``).
    """
    method_status = ProcessStatus.NotFound
    no_progress = None
    gui_progress = None
    gui_warning = None
    no_questions = None

    @staticmethod
    def startprocess(cls, sid, target=None, method=None, method_name=None,
                     auto_delete=False, args_proc=()):
        """
        Instantiate ``target``, run its ``method`` and record the outcome.

        :param cls: class object that carries the progress/question flags
                    and receives the resulting ``method_status``
        :param sid: not used by this function
        :param target: callable building the worker object from the four
                       flags (no_progress, gui_progress, gui_warning,
                       no_questions)
        :param method: name of the worker attribute to call
        :param method_name: value stored in the worker's ``method_name``
        :param auto_delete: not used by this function
        :param args_proc: positional arguments passed to the worker method
        :return: always 0; the real result is stored in ``method_status``
        """
        if "LANG" in os.environ:
            # current_thread() replaces the deprecated currentThread() alias
            cur_thread = threading.current_thread()
            cur_thread.lang = os.environ["LANG"]
        com = target(cls.no_progress, cls.gui_progress, cls.gui_warning,
                     cls.no_questions)
        # explicitly initialise the second base class of the worker
        bases = com.__class__.__bases__
        if len(bases) > 1 and hasattr(bases[1], '__init__'):
            bases[1].__init__(com)
        com.method_name = method_name
        com.method_status = ProcessStatus.Worked
        if getattr(com, method)(*args_proc):
            cls.method_status = ProcessStatus.SuccessFinished
        else:
            cls.method_status = ProcessStatus.FailedFinished
        com.method_status = cls.method_status
        cls.del_pid_file(cls, os.getpid(), com.clVars)
        return 0

    @staticmethod
    def del_pid_file(cls, pid, clVars=None):
        """
        Remove the ``<pid>.pid`` file, ignoring OSError.

        :param cls: not used by this function
        :param pid: process id used to build the file name
        :param clVars: variables object providing 'core.cl_core_pids_path';
                       when falsy the directory defaults to '/tmp'
        """
        if clVars:
            pids = clVars.Get('core.cl_core_pids_path')
        else:
            pids = '/tmp'
        pid_file = path.join(pids, '%d.pid' % pid)
        with ignore(OSError):
            if path.exists(pid_file):
                os.unlink(pid_file)

    class Common(CommonMethods, CommonLink):
        """
        class to interact with the processes
        """

        def __init__(self, no_progress, gui_progress, gui_warning,
                     no_questions):
            """
            Build the result viewer chain according to the given flags.

            :param no_progress: call ``set_no_progress()`` on the viewer
            :param gui_progress: wrap the viewer in ErrorGui(ProgressGui())
            :param gui_warning: wrap the viewer in WarningGui()
            :param no_questions: call ``set_no_questions()`` on the viewer
            """
            self.pid = 0
            self.method_name = ""
            self.method_status = ProcessStatus.Worked
            self.color_print = color_print()

            self.result_viewer = ResultViewer()
            if no_questions:
                self.result_viewer.set_no_questions()
            if no_progress:
                self.result_viewer.set_no_progress()
            if gui_progress:
                self.result_viewer = ErrorGui(ProgressGui(self.result_viewer))
            if gui_warning:
                self.result_viewer = WarningGui(self.result_viewer)
            self.set_link(self.result_viewer)

        def pauseProcess(self):
            """Mark the process as paused and rewrite the PID file."""
            self.method_status = ProcessStatus.Paused
            self.writeFile()

        def resumeProcess(self):
            """Mark the process as working and rewrite the PID file."""
            self.method_status = ProcessStatus.Worked
            self.writeFile()

        def writeFile(self):
            """
            Write the pickled process description into the PID file.

            Nothing is written when the effective user is not root, when
            'cl_ebuild_phase' is set, or when 'cl_root_readonly' is 'on'.
            Write failures are printed, not raised.
            """
            from calculate.core.server.gen_pid import ProcessMode

            if os.getuid():
                return
            pid = os.getpid()
            pids = self.clVars.Get('core.cl_core_pids_path')
            # skip creating the file while a package is being built
            if self.clVars.Get('cl_ebuild_phase'):
                return
            if self.clVars.Get('cl_root_readonly') == 'on':
                return
            build_id = ""
            try:
                from calculate.builder.variables.action import Actions

                if self.clVars.Get('cl_action') in Actions.All:
                    build_id = self.clVars.Get('builder.cl_builder_id')
            except Exception:
                # best effort: the builder id stays empty if the builder
                # import or either variable lookup fails
                pass

            if not os.path.exists(pids):
                makeDirectory(pids)
            pid_file = path.join(pids, '%d.pid' % pid)
            try:
                with open(pid_file, 'wb') as f:
                    d = {'name': self.method_name,
                         'mode': ProcessMode.LocalCall,
                         'os_pid': pid,
                         'status': self.method_status,
                         'id': build_id}
                    pickle.dump(d, f)
            except (IOError, OSError) as e:
                print(str(e))
                print(_("Failed to write the PID file %s!") % pid_file)

        def isInteractive(self):
            """
            Check interactive ability
            """
            return sys.stdin.isatty()


def cout_progress(string=None):
    """
    Blank the current terminal line and optionally print ``string`` on it.

    The terminal width is queried with TIOCGWINSZ on stderr; when that
    ioctl fails nothing is printed.

    :param string: text to write after the line has been blanked
    """
    try:
        _rows, width = array(
            'h', ioctl(sys.stderr, termios.TIOCGWINSZ, '\0' * 8))[:2]
    except IOError:
        return
    sys.stdout.write('\r' + (' ' * width))
    if string:
        sys.stdout.write('\r' + string)
    else:
        sys.stdout.write('\r')
    sys.stdout.flush()


def local_method(metaObject, args, unknown_args):
    """
    Call method from metaclass, check method existing.
    Generate help, for method, run method by 'call_method'.

    :param metaObject: object exposing the methods and their '<name>_view'
                       counterparts, plus ``clear_cache``
    :param args: parsed command line namespace (method, help,
                 list_methods, no_questions, ...)
    :param unknown_args: command line arguments not consumed by the
                         top-level parser
    :return: 0 after listing methods, 1 when the help view or parser
             cannot be built, ``metaObject.method_status`` after a
             successful call, otherwise None
    """
    sym_link = os.path.basename(sys.argv[0])
    # when started through a symlink the link name selects the method
    if not (sym_link == 'cl-core' or sym_link == 'cl-core_py3'):
        if sym_link in LoadedMethods.conMethods:
            args.method = LoadedMethods.conMethods[sym_link][0]
        else:
            _print(_("Method not found for %s") % sym_link)
            sys.exit(1)

    if args.list_methods:
        for k, v in sorted(LoadedMethods.conMethods.items(),
                           key=lambda x: x[1]):
            name, user, title = v
            print("%s - %s" % (name, title))
        return 0

    colorPrint = color_print()
    method_name = args.method
    method_view_name = method_name + '_view'
    if args.method and args.help:
        force_param = args.no_questions or has_force_arg(unknown_args)
        # the view is requested again until the guessed force flag agrees
        # with the one produced by the method's own parser
        while True:
            view_obj = ViewParams()
            view_obj.step = None
            view_obj.expert = True
            view_obj.brief = None
            view_obj.onlyhelp = True
            view_obj.help_set = True
            view_obj.conargs = [x[0] for x in args._get_kwargs()
                                if x[1] is not None]
            view_obj.dispatch_usenew = force_param
            try:
                view = getattr(metaObject, method_view_name)(
                    metaObject, 0, view_obj)
            except AttributeError:
                colorPrint.printERROR(_('Method not found: ') +
                                      method_view_name)
                return 1
            try:
                method_parser = get_method_argparser(view, args,
                                                     cl_core=True)
            except Exception:
                metaObject.clear_cache(0, method_name)
                return 1
            _unknown_args = method_parser.fixBoolVariables(unknown_args)
            _args, _unknown_args = method_parser.parse_known_args(
                _unknown_args)
            if (view_obj.dispatch_usenew == _args.no_questions
                    or args.no_questions):
                method_parser.print_help()
                break
            else:
                force_param = _args.no_questions
                metaObject.clear_cache(0, method_name)
    else:
        try:
            call_method(metaObject, args, unknown_args, colorPrint)
            metaObject.clear_cache(0, method_name)
            return metaObject.method_status
        except (VariableError) as e:
            colorPrint.printERROR(str(e))
        except (ValueError, CriticalError) as e:
            colorPrint.printERROR(str(e))
        except (KeyboardInterrupt, EOFError):
            colorPrint.printERROR(_('Manually interrupted'))
        except (GotErrorField,):
            pass
        except Exception:
            colorPrint.printERROR(shortTraceback(*sys.exc_info()))
        metaObject.clear_cache(0, method_name)


def call_method(metaObject, args, unknown_args, colorPrint):
    """
    Function for call method through metaObject and args

    :param metaObject: object exposing the method, its '<name>_view'
                       counterpart and ``clear_cache``
    :param args: parsed top-level command line namespace
    :param unknown_args: arguments to be parsed by the method's parser
    :param colorPrint: printer providing ``printERROR``
    :return: the method result list on success, 1 on argument errors,
             None on other failures or when the user declines
    """
    method_name = args.method
    stdin_passwd = args.stdin_passwd
    method_view_name = method_name + '_view'
    metaObject.no_progress = args.no_progress
    metaObject.gui_progress = args.gui_progress
    metaObject.gui_warning = args.gui_warning
    metaObject.no_questions = False
    view = None
    method_parser = None

    dispatch_usenew = args.no_questions or has_force_arg(unknown_args)
    # the view is requested again until the guessed force flag agrees
    # with the one produced by the method's own parser
    while True:
        view_obj = ViewInfo()
        view_obj.step = None
        view_obj.expert = True
        view_obj.brief = None
        view_obj.onlyhelp = True
        view_obj.help_set = False
        view_obj.conargs = list(unknown_args)
        view_obj.dispatch_usenew = dispatch_usenew
        try:
            view = getattr(metaObject, method_view_name)(
                metaObject, 0, view_obj)
        except AttributeError:
            colorPrint.printERROR(_('Method not found: ') + method_name)
            return None
        method_parser = get_method_argparser(view, args, cl_core=True)
        _unknown_args = method_parser.fixBoolVariables(unknown_args)
        _args, _unknown_args = method_parser.parse_known_args(_unknown_args)
        if (view_obj.dispatch_usenew == _args.no_questions
                or args.no_questions):
            break
        else:
            dispatch_usenew = _args.no_questions
            metaObject.clear_cache(0, method_name)

    no_questions = dispatch_usenew
    param_object = create_param_object(view)
    try:
        unknown_args = method_parser.fixBoolVariables(unknown_args)
        args, unknown_args = method_parser.parse_known_args(unknown_args)
        metaObject.no_questions = no_questions
    except SystemExit:
        return 1
    except Exception:
        import traceback

        for i in traceback.format_exception(*sys.exc_info()):
            sys.stderr.write(i)
        sys.stderr.flush()
        raise

    for i in unknown_args:
        if i.startswith('-'):
            if i in parse(True).parse_known_args()[1]:
                _print(_('Unknown parameter'), i)
                return 1
        else:
            _print(_('Unknown argument'), i)
            return 1

    param_object, steps = collect_object(None, param_object, view, args,
                                         stdin_passwd=stdin_passwd)
    if view.has_brief:
        # first pass: only validate the parameters and show the summary
        setattr(param_object, 'CheckOnly', True)
        check_res = {}
        while True:
            method_result = getattr(metaObject, method_name)(
                metaObject, 0, param_object)
            if not method_result:
                print(_('Method not available'))
                return None
            if method_result[0].type and method_result[0].type != "pid":
                check_res = check_result_msg(method_result, view,
                                             check_res, args)
                if not check_res:
                    return None
                else:
                    param_object = get_param_pwd(check_res, view,
                                                 param_object,
                                                 stdin_passwd=stdin_passwd)
            else:
                break

        view_obj = ViewInfo()
        view_obj.step = None
        view_obj.expert = True
        view_obj.brief = True
        view_obj.onlyhelp = False
        view_obj.help_set = False
        view_obj.conargs = [x[0] for x in args._get_kwargs()
                            if x[1] is not None]
        view_obj.dispatch_usenew = dispatch_usenew
        try:
            view = getattr(metaObject, method_view_name)(
                metaObject, 0, view_obj)
        except AttributeError:
            # the error is only reported; the previously obtained view
            # is used below
            colorPrint.printERROR(_('Method not found: ') + method_name)

        print_brief(view, steps.label)
        for group in view.groups:
            for field in group.fields:
                if "error" in field.name:
                    return None
        if not no_questions:
            if stdin_passwd:
                colorPrint.printERROR("Could not use the interactive mode. "
                                      "Use option '-f' for run the process.")
                return None
            try:
                ask = ResultViewer().askConfirm(_("Run process?"))
            except KeyboardInterrupt:
                ask = "no"
            if ask.lower() in ['n', 'no']:
                colorPrint.printERROR(_('Manually interrupted'))
                return None

    # second pass: actually run the method
    setattr(param_object, 'CheckOnly', False)
    method_result = []
    try:
        check_res = {}
        while True:
            method_result = getattr(metaObject, method_name)(
                metaObject, 0, param_object)
            if not method_result:
                colorPrint.printERROR(_('method unavailable'))
                return None
            if method_result[0].type and method_result[0].type != "pid":
                check_res = check_result_msg(method_result, view,
                                             check_res, args)
                if not check_res:
                    return None
                else:
                    param_object = get_param_pwd(check_res, view,
                                                 param_object,
                                                 stdin_passwd=stdin_passwd)
            else:
                break
    except VariableError as e:
        _print(e)
        return None
    return method_result


def create_param_object(view):
    """
    Build an empty parameter class for the fields of ``view``.

    :param view: view whose groups/fields define the attributes
    :return: a new class named 'collect_object' with ``CheckAll = True``,
             a ``_type_info`` dict and one attribute per field set to None
    """
    param_object = type('collect_object', (object,), {})
    param_object.CheckAll = True
    param_object._type_info = {}
    for Group in view.groups:
        if not Group.fields:
            continue
        for field in Group.fields:
            setattr(param_object, field.name, None)
            param_object._type_info[field.name] = None
    return param_object


def print_brief(view, brief_label):
    """
    Print every named, non-empty group of ``view``.

    :param view: view whose groups are printed
    :param brief_label: not used by this function
    """
    for Group in view.groups:
        if Group.name:
            if not Group.fields:
                continue
            print_brief_group(Group.fields, Group.name)


class ColorTable(tableReport):
    """Table report whose head, lines and body use separate printers."""

    def __init__(self, head, body, printer,
                 head_printer=None, line_printer=None, body_printer=None):
        """
        :param head: table header
        :param body: table rows
        :param printer: default printer, also used for any part whose
                        dedicated printer is not given
        :param head_printer: printer for the header
        :param line_printer: printer for the lines
        :param body_printer: printer for the body
        """
        super().__init__(None, head, body, colSpan=0)
        self.default_printer = printer
        self.line_printer = line_printer or printer
        self.head_printer = head_printer or printer
        self.body_printer = body_printer or printer
        self.head = head
        self.body = body


class Display():
    """Terminal output helper used to print the brief summary."""

    def __init__(self):
        self._print = get_terminal_print(color_print().defaultPrint)

    def print_info(self, label, value):
        """Print a green asterisk, the label and the value in white."""
        GREEN = TextState.Colors.GREEN
        self.display_asterisk(GREEN)
        self._print(_("%s: ") % label)
        WHITE = TextState.Colors.WHITE
        self._print.foreground(WHITE)(value)
        self._print("\n")

    def print_label(self, label):
        """Print a green asterisk followed by the label."""
        GREEN = TextState.Colors.GREEN
        self.display_asterisk(GREEN)
        self._print(_("%s: ") % label)
        self._print("\n")

    def display_asterisk(self, color):
        """Print a bold asterisk of the given color surrounded by spaces."""
        self._print(" ")
        self._print.foreground(color).bold("*")
        self._print(" ")

    def print_error(self, message):
        """Print a red asterisk followed by the message."""
        RED = TextState.Colors.RED
        self.display_asterisk(RED)
        self._print(message)
        self._print("\n")

    def print_warning(self, message):
        """Print a yellow asterisk followed by the message."""
        YELLOW = TextState.Colors.YELLOW
        self.display_asterisk(YELLOW)
        self._print(message)
        self._print("\n")

    def print_table(self, data, head):
        """Print ``data`` as a table with a white body."""
        WHITE = TextState.Colors.WHITE
        ColorTable(head, data, self._print,
                   body_printer=self._print.foreground(
                       WHITE).clone()).printReport(False)

    def print_group(self, label):
        """Print the group title on its own line."""
        self._print(label)
        self._print("\n")


class InformationElement():
    """Base class of the printable summary elements."""

    def __init__(self, field, display):
        """
        :param field: source field (not read by the base class)
        :param display: Display used by ``show``
        """
        self.value = ""
        self.label = ""
        self.display = display

    @classmethod
    def from_field(cls, field, display):
        """
        Create the element matching ``field.element``.

        :return: an element instance, or None for 'steps' fields and for
                 element kinds that have no mapping
        """
        if field.type == 'steps':
            return None
        map_elements = {'input': ValueInfo,
                        'openfile': ValueInfo,
                        'combo': ChoiceInfo,
                        'comboEdit': ChoiceInfo,
                        'radio': ChoiceInfo,
                        'file': ChoiceInfo,
                        'multichoice': MultiChoiceInfo,
                        'multichoice_add': MultiChoiceInfo,
                        'selecttable': MultiChoiceInfo,
                        'selecttable_add': MultiChoiceInfo,
                        'error': ErrorInfo,
                        'check': CheckInfo,
                        'check_tristate': CheckInfo,
                        'table': TableInfo
                        }
        if field.element in map_elements:
            return map_elements[field.element](field, display)
        return None

    def show(self):
        """Print the element as 'label: value'."""
        self.display.print_info(self.label, self.value)


class ValueInfo(InformationElement):
    """Element showing the plain field value."""

    def __init__(self, field, display):
        super().__init__(field, display)
        self.value = field.value or ''
        self.label = field.label


class CheckInfo(InformationElement):
    """Element showing an on/off/auto value as yes/no/auto."""

    def __init__(self, field, display):
        super().__init__(field, display)
        self.label = field.label
        map_answer = {'on': _('yes'), 'off': _("no"), 'auto': _('auto')}
        self.value = map_answer.get(field.value, field.value)


class ChoiceInfo(InformationElement):
    """Element showing the comment of the chosen value when available."""

    def __init__(self, field, display):
        super().__init__(field, display)
        self.label = field.label or ''
        if field.choice and field.comments:
            map_comment = dict(zip(field.choice, field.comments))
            self.value = map_comment.get(field.value, field.value) or ''
        else:
            self.value = field.value if field.value else ''


class MultiChoiceInfo(InformationElement):
    """Element showing a comma separated list of chosen values."""

    def __init__(self, field, display):
        super().__init__(field, display)
        self.label = field.label or ''
        if field.listvalue:
            value = field.listvalue
            # drop the empty first element (wsdl peculiarity); a slice is
            # used so the field's own list is left untouched
            if value and not value[0]:
                value = value[1:]
            if field.choice and field.comments:
                map_comment = dict(zip(field.choice, field.comments))
            else:
                map_comment = {}
            self.value = ", ".join([map_comment.get(x, x) or ''
                                    for x in value])
        else:
            self.value = field.value or ""


class ErrorInfo(InformationElement):
    """Element printing its label as an error."""

    def __init__(self, field, display):
        super().__init__(field, display)
        self.label = field.label

    def show(self):
        """Print the label through ``Display.print_error``."""
        self.display.print_error(self.label)


class TableInfo(InformationElement):
    """
    Tabular information
    """

    def map_row(self, row, typedata):
        """
        Yield the printable cells of ``row``.

        :param row: cell values
        :param typedata: field type of each column; check columns are
                         mapped to yes/no/auto, password columns are
                         masked with "***"
        """
        map_answer = {'on': _('yes'), 'off': _("no"), 'auto': _('auto')}
        for cell, typefield in zip(row, typedata):
            if typefield in ['check', 'check_tristate']:
                yield map_answer.get(cell, cell) or ""
            elif "password" in typefield:
                yield "***"
            else:
                yield cell or ""

    def __init__(self, field, display):
        super().__init__(field, display)
        self.label = field.label
        self.head = field.tablevalue.head

        # drop the empty first element of each row (for wsdl)
        body = [x[1:] if x and not x[0] else x
                for x in field.tablevalue.body]

        # a table without any non-empty row is not shown at all
        if not any(body):
            self.body = None
        else:
            type_values = [x.typefield for x in field.tablevalue.values]
            self.body = [list(self.map_row(x, type_values)) for x in body]

    def show(self):
        """Print the label and the table when the table has rows."""
        if self.body:
            self.display.print_label(self.label)
            self.display.print_table(self.body, self.head)


def print_brief_group(Fields, group_name):
    """
    Print the group title and every displayable field of the group.

    The title is printed only once, before the first displayable element.
    Any exception is printed with its traceback and re-raised.

    :param Fields: fields of the group; fields flagged ``uncompatible``
                   are skipped
    :param group_name: title printed before the first element
    """
    display = Display()
    show_group = True
    try:
        for raw_field in Fields:
            if raw_field.uncompatible:
                continue
            element = InformationElement.from_field(
                FieldAdapter.from_detect(raw_field), display)
            if not element:
                continue
            if show_group:
                display.print_group(group_name)
                show_group = False
            element.show()
    except Exception:
        import traceback

        traceback.print_exc()
        raise


class Methods(LocalCall.Common, object):
    """Single shared instance of ``LocalCall.Common`` with all flags off."""
    _instance = None

    def __new__(cls, *args, **kwargs):
        # reuse the instance stored on the class once it has been created
        if not cls._instance:
            cls._instance = super().__new__(
                cls, *args, **kwargs)
        return cls._instance

    def __init__(self):
        LocalCall.Common.__init__(self, False, False, False, False)


def has_force_arg(args):
    """
    Tell whether the arguments contain the force option.

    This is a preliminary guess: it is impossible to be 100% sure whether
    -sf is the two options -s,-f or the single option -s with value "f".

    :param args: list of command line arguments
    :return: True when -f/--force or a short-option cluster containing
             "f" is present, False otherwise
    """
    force_parser = argparse.ArgumentParser(add_help=False)
    force_parser.add_argument(
        '-f', '--force', default=False, dest='force', action="store_true")
    _args, _drop = force_parser.parse_known_args(args)
    if _args.force:
        return True
    # both alternatives are anchored at both ends; without the group the
    # second one matched the tail of any argument (e.g. "--config")
    re_force = re.compile(r"^(?:--force|-[a-zA-Z0-9]*f[a-zA-Z0-9]*)$")
    return any(re_force.search(arg) for arg in args)