You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
calculate-utils-3-core/pym/core/server/local_call.py

681 lines
24 KiB

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

# -*- coding: utf-8 -*-
# Copyright 2012-2016 Mir Calculate. http://www.calculate-linux.org
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import pickle
import sys
import termios
import os
from os import path
from fcntl import ioctl
from array import array
import threading
import argparse
import re
from calculate.lib.utils.colortext import get_terminal_print
from calculate.lib.utils.colortext.palette import TextState
from calculate.lib.utils.text import tableReport
from calculate.lib.cl_print import color_print
from calculate.lib.datavars import VariableError, CriticalError
from calculate.lib.cl_lang import setLocalTranslate
from .api_types import FieldAdapter
from calculate.lib.utils.tools import ignore
from calculate.lib.utils.files import makeDirectory
from ..result_viewer import ResultViewer
from .loaded_methods import LoadedMethods
from ..result_viewer_gui import ProgressGui, ErrorGui, WarningGui
from .gen_pid import ProcessStatus
from .methods_func import (get_method_argparser, collect_object,
check_result_msg, get_param_pwd, _print,
display_error)
from .api_types import ViewInfo, ViewParams
from .cert_cmd import parse
from .methods_func import GotErrorField
from .func import shortTraceback, CommonMethods, CommonLink
_ = lambda x: x
setLocalTranslate('cl_core3', sys.modules[__name__])
class LocalCall():
method_status = ProcessStatus.NotFound
no_progress = None
gui_progress = None
gui_warning = None
no_questions = None
@staticmethod
def startprocess(cls, sid, target=None, method=None, method_name=None,
auto_delete=False, args_proc=()):
""" start process """
if "LANG" in os.environ:
curThread = threading.currentThread()
curThread.lang = os.environ["LANG"]
com = target(cls.no_progress, cls.gui_progress, cls.gui_warning,
cls.no_questions)
if len(com.__class__.__bases__) > 1 and \
hasattr(com.__class__.__bases__[1], '__init__'):
com.__class__.__bases__[1].__init__(com)
com.method_name = method_name
com.method_status = ProcessStatus.Worked
if getattr(com, method)(*args_proc):
cls.method_status = ProcessStatus.SuccessFinished
else:
cls.method_status = ProcessStatus.FailedFinished
com.method_status = cls.method_status
cls.del_pid_file(cls, os.getpid(), com.clVars)
return 0
@staticmethod
def del_pid_file(cls, pid, clVars=None):
if clVars:
pids = clVars.Get('core.cl_core_pids_path')
else:
pids = '/tmp'
pid_file = path.join(pids, '%d.pid' % pid)
with ignore(OSError):
if path.exists(pid_file):
os.unlink(pid_file)
class Common(CommonMethods, CommonLink):
""" class to interact with the processes """
def __init__(self, no_progress, gui_progress, gui_warning,
no_questions):
self.pid = 0
self.method_name = ""
self.method_status = ProcessStatus.Worked
self.color_print = color_print()
self.result_viewer = ResultViewer()
if no_questions:
self.result_viewer.set_no_questions()
if no_progress:
self.result_viewer.set_no_progress()
if gui_progress:
self.result_viewer = ErrorGui(ProgressGui(self.result_viewer))
if gui_warning:
self.result_viewer = WarningGui(self.result_viewer)
self.set_link(self.result_viewer)
def pauseProcess(self):
self.method_status = ProcessStatus.Paused
self.writeFile()
def resumeProcess(self):
self.method_status = ProcessStatus.Worked
self.writeFile()
def writeFile(self):
""" write data in file """
from .gen_pid import ProcessMode
if os.getuid():
return
pid = os.getpid()
pids = self.clVars.Get('core.cl_core_pids_path')
# пропустить создание файла если идет сборка пакета
if self.clVars.Get('cl_ebuild_phase'):
return
if self.clVars.Get('cl_root_readonly') == 'on':
return
build_id = ""
try:
from calculate.builder.variables.action import Actions
if self.clVars.Get('cl_action') in Actions.All:
build_id = self.clVars.Get('builder.cl_builder_id')
except Exception:
pass
if not os.path.exists(pids):
makeDirectory(pids)
pid_file = path.join(pids, '%d.pid' % pid)
try:
with open(pid_file, 'wb') as f:
d = {'name': self.method_name,
'mode': ProcessMode.LocalCall,
'os_pid': pid,
'status': self.method_status,
'id': build_id}
pickle.dump(d, f)
except (IOError, OSError) as e:
print(str(e))
print(_("Failed to write the PID file %s!") % pid_file)
def isInteractive(self):
"""
Check interactive ability
"""
return sys.stdin.isatty()
def cout_progress(string=None):
try:
h, w = array('h', ioctl(sys.stderr, termios.TIOCGWINSZ, '\0' * 8))[:2]
except IOError:
return
sys.stdout.write('\r' + (' ' * w))
if string:
sys.stdout.write('\r' + string)
else:
sys.stdout.write('\r')
sys.stdout.flush()
def local_method(metaObject, args, unknown_args):
"""
Call method from metaclass, check method existing.
Generate help, for method, run method by 'call_method'.
"""
import os
sym_link = os.path.basename(sys.argv[0])
# sym_link = "cl-core"
if not (sym_link == 'cl-core'):
if sym_link in LoadedMethods.conMethods.keys():
args.method = LoadedMethods.conMethods[sym_link][0]
else:
_print(_("Method not found for %s") % sym_link)
sys.exit(1)
if args.list_methods:
for k, v in sorted(LoadedMethods.conMethods.items(),
key=lambda x: x[1]):
name, user, title = v
print("%s - %s" % (name, title))
return 0
colorPrint = color_print()
# metaObject = metaclass()
method_name = args.method
method_view_name = method_name + '_view'
if args.method and args.help:
force_param = args.no_questions or has_force_arg(unknown_args)
while True:
view_obj = ViewParams()
view_obj.step = None
view_obj.expert = True
view_obj.brief = None
view_obj.onlyhelp = True
view_obj.help_set = True
view_obj.conargs = [x[0] for x in args._get_kwargs() if x[1] is not None]
view_obj.dispatch_usenew = \
force_param
try:
view = getattr(metaObject, method_view_name)(metaObject, 0, view_obj)
except AttributeError:
print("DE")
colorPrint.printERROR(_('Method not found: ') + method_view_name)
return 1
try:
method_parser = get_method_argparser(view, args, cl_core=True)
except Exception:
# import traceback
# for i in apply(traceback.format_exception, sys.exc_info()):
# sys.stderr.write(i)
# sys.stderr.flush()
metaObject.clear_cache(0, method_name)
return 1
_unknown_args = method_parser.fixBoolVariables(unknown_args)
_args, _unknown_args = method_parser.parse_known_args(_unknown_args)
if (view_obj.dispatch_usenew == _args.no_questions
or args.no_questions):
method_parser.print_help()
break
else:
force_param = _args.no_questions
metaObject.clear_cache(0, method_name)
else:
try:
call_method(metaObject, args, unknown_args, colorPrint)
metaObject.clear_cache(0, method_name)
return metaObject.method_status
except (VariableError) as e:
colorPrint.printERROR(str(e))
# colorPrint.printERROR(shortTraceback(*sys.exc_info()))
except (ValueError, CriticalError) as e:
colorPrint.printERROR(str(e))
# colorPrint.printERROR(shortTraceback(*sys.exc_info()))
except (KeyboardInterrupt, EOFError):
colorPrint.printERROR(_('Manually interrupted'))
except (GotErrorField,):
pass
except Exception:
colorPrint.printERROR(shortTraceback(*sys.exc_info()))
pass
# print 'Error: ', e
metaObject.clear_cache(0, method_name)
def call_method(metaObject, args, unknown_args, colorPrint):
"""
Function for call method through metaObject and args
"""
method_name = args.method
stdin_passwd = args.stdin_passwd
method_view_name = method_name + '_view'
metaObject.no_progress = args.no_progress
metaObject.gui_progress = args.gui_progress
metaObject.gui_warning = args.gui_warning
metaObject.no_questions = False
view = None
method_parser = None
dispatch_usenew = args.no_questions or has_force_arg(unknown_args)
while True:
view_obj = ViewInfo()
view_obj.step = None
view_obj.expert = True
view_obj.brief = None
view_obj.onlyhelp = True
view_obj.help_set = False
view_obj.conargs = [x for x in unknown_args]
view_obj.dispatch_usenew = dispatch_usenew
try:
view = getattr(metaObject, method_view_name)(metaObject, 0, view_obj)
except AttributeError:
colorPrint.printERROR(_('Method not found: ') + method_name)
return None
method_parser = get_method_argparser(view, args, cl_core=True)
_unknown_args = method_parser.fixBoolVariables(unknown_args)
_args, _unknown_args = method_parser.parse_known_args(_unknown_args)
if (view_obj.dispatch_usenew == _args.no_questions or
args.no_questions):
break
else:
dispatch_usenew = _args.no_questions
metaObject.clear_cache(0, method_name)
no_questions = dispatch_usenew
param_object = create_param_object(view)
try:
unknown_args = method_parser.fixBoolVariables(unknown_args)
args, unknown_args = method_parser.parse_known_args(unknown_args)
metaObject.no_questions = no_questions
except SystemExit:
return 1
except Exception:
import traceback
for i in traceback.format_exception(*sys.exc_info()):
sys.stderr.write(i)
sys.stderr.flush()
raise
for i in unknown_args:
if i.startswith('-'):
if i in parse(True).parse_known_args()[1]:
_print(_('Unknown parameter'), i)
return 1
else:
_print(_('Unknown argument'), i)
return 1
param_object, steps = collect_object(None, param_object, view, args,
stdin_passwd=stdin_passwd)
if view.has_brief:
setattr(param_object, 'CheckOnly', True)
check_res = {}
while True:
method_result = getattr(metaObject, method_name)(metaObject, 0, param_object)
if not method_result:
print(_('Method not available'))
return None
if method_result[0].type and method_result[0].type != "pid":
check_res = check_result_msg(method_result, view, check_res,
args)
if not check_res:
return None
else:
param_object = get_param_pwd(check_res, view,
param_object,
stdin_passwd=stdin_passwd)
else:
break
view_obj = ViewInfo()
view_obj.step = None
view_obj.expert = True
view_obj.brief = True
view_obj.onlyhelp = False
view_obj.help_set = False
view_obj.conargs = [x[0] for x in args._get_kwargs() if x[1] is not None]
view_obj.dispatch_usenew = dispatch_usenew
try:
view = getattr(metaObject, method_view_name)(metaObject, 0, view_obj)
except AttributeError:
colorPrint.printERROR(_('Method not found: ') + method_name)
print_brief(view, steps.label)
for group in view.groups:
for field in group.fields:
if "error" in field.name:
return None
if not no_questions:
if stdin_passwd:
colorPrint.printERROR("Could not use the interactive mode. "
"Use option '-f' for run the process.")
return None
try:
ask = ResultViewer().askConfirm(_("Run process?"))
except KeyboardInterrupt:
ask = "no"
if ask.lower() in ['n', 'no']:
colorPrint.printERROR(_('Manually interrupted'))
return None
setattr(param_object, 'CheckOnly', False)
method_result = []
try:
check_res = {}
while True:
method_result = getattr(metaObject, method_name)(metaObject, 0, param_object)
if not method_result:
colorPrint.printERROR(_('method unavailable'))
return None
if method_result[0].type and method_result[0].type != "pid":
check_res = check_result_msg(method_result, view, check_res,
args)
if not check_res:
return None
else:
param_object = get_param_pwd(check_res, view,
param_object,
stdin_passwd=stdin_passwd)
else:
break
except VariableError as e:
_print(e)
return None
#for ReturnedMessage in method_result:
# if ReturnedMessage.type and ReturnedMessage.type != "pid":
# display_error(ReturnedMessage, args, view.groups)
# # params_text = ''
# # for Group in view.groups:
# # for field in Group.fields:
# # if field.name == ReturnedMessage.field:
# # params_text += getErrorOnParam(args, field)
# # colorPrint.printERROR('\r' + params_text % \
# # str(ReturnedMessage))
# return None
return method_result
def create_param_object(view):
param_object = type('collect_object', (object,), {})
param_object.CheckAll = True
param_object._type_info = {}
for Group in view.groups:
if not Group.fields:
continue
for field in Group.fields:
setattr(param_object, field.name, None)
param_object._type_info[field.name] = None
return param_object
def print_brief(view, brief_label):
for Group in view.groups:
if Group.name:
if not Group.fields:
continue
print_brief_group(Group.fields, Group.name)
class ColorTable(tableReport):
def __init__(self, head, body, printer, head_printer=None,
line_printer=None, body_printer=None):
super().__init__(None, head, body, colSpan=0)
self.default_printer = printer
self.line_printer = line_printer or printer
self.head_printer = head_printer or printer
self.body_printer = body_printer or printer
self.head = head
self.body = body
class Display():
def __init__(self):
self._print = get_terminal_print(color_print().defaultPrint)
def print_info(self, label, value):
GREEN = TextState.Colors.GREEN
self.display_asterisk(GREEN)
self._print(_("%s: ") % label)
WHITE = TextState.Colors.WHITE
self._print.foreground(WHITE)(value)
self._print("\n")
def print_label(self, label):
GREEN = TextState.Colors.GREEN
self.display_asterisk(GREEN)
self._print(_("%s: ") % label)
self._print("\n")
def display_asterisk(self, color):
self._print(" ")
self._print.foreground(color).bold("*")
self._print(" ")
def print_error(self, message):
RED = TextState.Colors.RED
self.display_asterisk(RED)
self._print(message)
self._print("\n")
def print_warning(self, message):
YELLOW = TextState.Colors.YELLOW
self.display_asterisk(YELLOW)
self._print(message)
self._print("\n")
def print_table(self, data, head):
WHITE = TextState.Colors.WHITE
ColorTable(head, data, self._print,
body_printer=self._print.foreground(
WHITE).clone()).printReport(False)
# sys.stdout.write('%s\n' % printTable(data, head))
def print_group(self, label):
self._print(label)
self._print("\n")
class InformationElement():
def __init__(self, field, display):
self.value = ""
self.label = ""
self.display = display
@classmethod
def from_field(cls, field, display):
if field.type == 'steps':
return None
map_elements = {'input': ValueInfo,
'openfile': ValueInfo,
'combo': ChoiceInfo,
'comboEdit': ChoiceInfo,
'radio': ChoiceInfo,
'file': ChoiceInfo,
'multichoice': MultiChoiceInfo,
'multichoice_add': MultiChoiceInfo,
'selecttable': MultiChoiceInfo,
'selecttable_add': MultiChoiceInfo,
'error': ErrorInfo,
'check': CheckInfo,
'check_tristate': CheckInfo,
'table': TableInfo
}
if field.element in map_elements:
return map_elements[field.element](field, display)
return None
def show(self):
self.display.print_info(self.label, self.value)
class ValueInfo(InformationElement):
def __init__(self, field, display):
super().__init__(field, display)
self.value = field.value or ''
self.label = field.label
class CheckInfo(InformationElement):
def __init__(self, field, display):
super().__init__(field, display)
self.label = field.label
map_answer = {'on': _('yes'), 'off': _("no"), 'auto': _('auto')}
self.value = map_answer.get(field.value, field.value)
class ChoiceInfo(InformationElement):
def __init__(self, field, display):
super().__init__(field, display)
self.label = field.label or ''
if field.choice and field.comments:
map_comment = dict(zip(field.choice, field.comments))
self.value = map_comment.get(field.value, field.value) or ''
else:
self.value = field.value if field.value else ''
class MultiChoiceInfo(InformationElement):
def __init__(self, field, display):
super().__init__(field, display)
self.label = field.label or ''
if field.listvalue:
value = field.listvalue
# удалить пустой первый элемент (особенности wsdl)
if value and not value[0]:
value.pop(0)
if field.choice and field.comments:
map_comment = dict(zip(field.choice, field.comments))
else:
map_comment = {}
self.value = ", ".join([map_comment.get(x, x) or '' for x in value])
else:
self.value = field.value or ""
class ErrorInfo(InformationElement):
def __init__(self, field, display):
super().__init__(field, display)
self.label = field.label
def show(self):
self.display.print_error(self.label)
class TableInfo(InformationElement):
"""
Табличная информация
"""
def map_row(self, row, typedata):
map_answer = {'on': _('yes'), 'off': _("no"), 'auto': _('auto')}
for cell, typefield in zip(row, typedata):
if typefield in ['check', 'check_tristate']:
yield map_answer.get(cell, cell) or ""
elif "password" in typefield:
yield "***"
else:
yield cell or ""
def __init__(self, field, display):
super().__init__(field, display)
self.label = field.label
self.head = field.tablevalue.head
# удаление первого элемента строки (для wsdl)
body = [x[1:] if x and not x[0] else x for x in field.tablevalue.body]
if not [x for x in [x for x in body] if x]:
self.body = None
else:
type_values = [x.typefield for x in field.tablevalue.values]
self.body = [list(self.map_row(x, type_values)) for x in body]
def show(self):
if self.body:
self.display.print_label(self.label)
self.display.print_table(self.body, self.head)
def print_brief_group(Fields, group_name):
display = Display()
show_group = True
try:
for element in (x for x
in (InformationElement.from_field(FieldAdapter.from_detect(y),display) for y
in Fields
if not y.uncompatible)
if x):
if show_group:
display.print_group(group_name)
show_group = False
element.show()
except Exception:
import traceback
traceback.print_exc()
raise
class Methods(LocalCall.Common, object):
_instance = None
def __new__(cls, *args, **kwargs):
if not cls._instance:
cls._instance = super().__new__(
cls, *args, **kwargs)
return cls._instance
def __init__(self):
LocalCall.Common.__init__(self, False, False, False, False)
def has_force_arg(args):
"""
Содержат ли аргумент force. Предварительное определение, так как на 100%
невозможно определить является ли -sf двумя опциями -s,-f или это одна
опция -s со значением "f"
:param args:
:return:
"""
force_parser = argparse.ArgumentParser(add_help=False)
force_parser.add_argument(
'-f', '--force', default=False, dest='force', action="store_true")
_args, _drop = force_parser.parse_known_args(args)
if _args.force:
return True
re_force = re.compile("^--force|-[a-zA-Z0-9]*f[a-zA-Z0-9]*$")
for arg in args:
if re_force.search(arg):
return True
else:
return False