py3 datavars changes

master
idziubenko 3 years ago
parent 940b125820
commit 2508450598

@ -41,7 +41,7 @@ def display_variables(variables):
dv.importVariables()
dv.flIniFile()
re_index = re.compile("^([\w.]+)(?:\[(-?\d+)\])?$")
re_index = re.compile(r"^([\w.]+)(?:\[(-?\d+)\])?$")
remove_quotes = lambda x: x if x != "''" else ""
for variable in variables:
try:

@ -280,7 +280,7 @@ class Backup(MethodsInterface):
"""
source_etc_dn = path.join(source_dn, subdn)
root_dn = path.join(target_dn, root_name)
reCfg = re.compile('._cfg\d{4}_')
reCfg = re.compile(r'._cfg\d{4}_')
try:
for fn in find(source_etc_dn, filetype=FindFileType.RegularFile):
if (not reCfg.search(fn) and

@ -132,7 +132,7 @@ def get_CRL(path_to_cert):
glob_ca_certs = ''
# get certificates list fron text
p = re.compile('[-]+[\w ]+[-]+\n+[\w\n\+\\=/]+[-]+[\w ]+[-]+\n?')
p = re.compile(r'[-]+[\w ]+[-]+\n+[\w\n\+\\=/]+[-]+[\w ]+[-]+\n?')
user_ca_certs_list = p.findall(user_ca_certs)
glob_ca_certs_list = p.findall(glob_ca_certs)
@ -270,7 +270,7 @@ def rm_ca_from_trusted(ca_cert):
fn.write(newfile)
fn.close()
p = re.compile('[-]+[\w ]+[-]+\n+[\w\n\+\\=/]+[-]+[\w ]+[-]+\n?')
p = re.compile(r'[-]+[\w ]+[-]+\n+[\w\n\+\\=/]+[-]+[\w ]+[-]+\n?')
# open, write and split user ca certificates
user_ca_certs = readFile(user_ca_db)

@ -68,7 +68,7 @@ class Admins(MutableMapping):
Проверить: выдавался ли сертификат указанному пользователю
"""
certdata = readFileEx(self.cert_database, grab=True)
return bool(re.search("^\S+\s+(?:\S+\s+){6}%s\s*$" % user,
return bool(re.search(r"^\S+\s+(?:\S+\s+){6}%s\s*$" % user,
certdata, flags=re.M))
def __setitem__(self, item, value):

@ -167,7 +167,6 @@ class Table(DataVarsSerializer):
fields=None, onClick=None, addAction=None, step=None,
records=None):
super(Table, self).__init__()
# print("DEBUG Table init")
if dv:
self.head = []
@ -281,7 +280,6 @@ class Field(DataVarsSerializer):
self.default = not varObj.wasSet
self.choice, self.comments = self.getChoice(varObj)
if self.type == "table":
# print("DEBUG_table creation")
self.tablevalue = Table(dv=dv, briefmode=briefmode,
varObj=varObj)
@ -425,9 +423,7 @@ class ViewInfo(DataVarsSerializer):
# interate all vars group
if step in (0, -1, None) or allsteps:
briefData = datavars.getBrief()
# print("DEBUG viewInfo")
# print("var groups: %s " % bool(varGroups))
# print(datavars)
self.groups.append(self.stepGroup(varGroups, brief_label,
help_value=briefData.get(
"help", None),
@ -645,8 +641,6 @@ class CoreWsdl(CoreServiceInterface):
Метод смены локализации интерфейса на лету (во время выбора
параметров метода)
"""
# print("DEBUG fixInstallLocalization")
# print(dv)
if "--start" not in sys.argv:
return False
import threading

@ -285,13 +285,13 @@ def clear_localuser_certificates(certbase):
certdn = os.path.dirname(certbase)
# оставляем только сертификаты, которые не содержат отметки
# для какого локального пользователя они созданы
writedata = "\n".join(x[0] for x in re.findall("^((\S+\s+){6}\S+)\s*$",
writedata = "\n".join(x[0] for x in re.findall(r"^((\S+\s+){6}\S+)\s*$",
certdata, flags=re.M))
with writeFile(certbase) as f:
f.write("%s\n"%writedata)
# удаляем физически сертификаты, созданные для локальных пользователей
for localcert in re.finditer("^(\S+)\s+(\S+\s+){6}\S+\s*$",
for localcert in re.finditer(r"^(\S+)\s+(\S+\s+){6}\S+\s*$",
certdata, flags=re.M):
cert_fn = "%s/%s.crt"%(certdn, localcert.group(1))
try:

@ -443,8 +443,8 @@ class Action(MethodsInterface):
return lambda dv: [dv.Get(x) if x not in (True, False) else x
for x in varnames]
reMethod = re.compile("^([A-Za-z]+)\.([A-Za-z0-9_]+)\(([^)]*)\)$")
reMessageVars = re.compile("\{([^}]+)\}")
reMethod = re.compile(r"^([A-Za-z]+)\.([A-Za-z0-9_]+)\(([^)]*)\)$")
reMessageVars = re.compile(r"\{([^}]+)\}")
def parseMethod(self, objs, dv, s, task):
"""
@ -745,7 +745,7 @@ class Action(MethodsInterface):
hideout = action.get("hideout", False)
cmdParam = map(lambda x: x.strip('"\''),
re.findall(
'["\'][^"\']+["\']|\S+',
r'["\'][^"\']+["\']|\S+',
action["command"]))
cmd = processProgress(*cmdParam)
for line in cmd.progress():
@ -1163,7 +1163,7 @@ class CoreWsdl(CoreServiceInterface):
onlyShow='', default=None):
from .api_types import Message
re_clean = re.compile('\[(?:\d+;)?\d+m')
re_clean = re.compile(r'\[(?:\d+;)?\d+m')
messageObj = Message(
type=type,
message=(
@ -1484,8 +1484,7 @@ class CoreWsdl(CoreServiceInterface):
@staticmethod
def get_lang(cls, sid, method_name=""):
""" get clients lang """
# print("-------------------------------------")
# print("DEBUG get_lang")
lang = None
SIDS_DIR = cls.sids
@ -1497,16 +1496,8 @@ class CoreWsdl(CoreServiceInterface):
with open(sid_file, 'r') as fd:
try:
# print("file exists: %s " % sid_file)
# print("sid: %s" % sid)
list_sid = pickle.load(fd)
list_sid_sid = int(list_sid[0])
# print(list_sid)
# print(list_sid_sid)
# print(sid)
# print(sid == list_sid_sid)
# print(type(list_sid_sid))
# print(type(sid))
if sid == list_sid_sid:
# print("SID FOUND")
lang = list_sid[3]
@ -1756,9 +1747,7 @@ class WsdlMeta(type):
"""
# @staticmethod
def wrapper(cls, sid, params):
# print("DEBUG view constructor")
dv = cls.get_cache(sid, kwargs["method_name"], "vars")
# print("got dv from cache: %s " % dv)
lang_changed = False
if kwargs["groups"]:
def group(*args, **kwargs):

@ -186,10 +186,8 @@ def local_method(metaObject, args, unknown_args):
"""
import os
# sym_link = os.path.basename(sys.argv[0])
#DEBUG - TODO remove this and uncomment thing above
sym_link = "cl-core"
sym_link = os.path.basename(sys.argv[0])
# sym_link = "cl-core"
if sym_link != 'cl-core':
if sym_link in LoadedMethods.conMethods.keys():
args.method = LoadedMethods.conMethods[sym_link][0]
@ -677,7 +675,7 @@ def has_force_arg(args):
if _args.force:
return True
re_force = re.compile("^--force|-[a-zA-Z0-9]*f[a-zA-Z0-9]*$")
re_force = re.compile(r"^--force|-[a-zA-Z0-9]*f[a-zA-Z0-9]*$")
for arg in args:
if re_force.search(arg):
return True

@ -44,8 +44,8 @@ class GotErrorField(Exception):
class BoolAction(argparse.Action):
reTrue = re.compile("^(?:on|yes)$", re.I)
reFalse = re.compile("^(?:off|no)$", re.I)
reTrue = re.compile(r"^(?:on|yes)$", re.I)
reFalse = re.compile(r"^(?:off|no)$", re.I)
available_values = ("on", "off","yes", "no")
def __init__(self, option_strings, dest, nargs="?",
@ -620,6 +620,7 @@ def collect_table(field, val_list, client, wait_thread=None,
return None
# if specified syntax for table row
if hasattr(field.opt, "syntax") and field.opt.syntax:
#TODO this could be buggy in py3, check
reArgs = re.compile(field.opt.syntax)
# check for correct sentense
for wrong in filterfalse(reArgs.search, val_list):

@ -71,7 +71,7 @@ def get_ca(cert_path):
if issuer_CN is None:
return '1'
p = re.compile('[-]+[\w ]+[-]+\n+[\w\n\+\\=/]+[-]+[\w ]+[-]+\n?')
p = re.compile(r'[-]+[\w ]+[-]+\n+[\w\n\+\\=/]+[-]+[\w ]+[-]+\n?')
ca_certs_list = p.findall(ca_certs)
for ca in ca_certs_list:
certobj = OpenSSL.crypto.load_certificate(OpenSSL.SSL.FILETYPE_PEM, ca)

@ -88,7 +88,7 @@ class ClApplication(WsgiApplication):
""" check right client certificate for the method """
import OpenSSL
# rmethod = re.compile('[{\w]+[}]')
# rmethod = re.compile(r'[{\w]+[}]')
# method_rep = rmethod.findall(method_name)
# method_name = method_name.replace(method_rep[0], '')
import threading
@ -304,7 +304,7 @@ class ClApplication(WsgiApplication):
length = http_req_env.get("CONTENT_LENGTH")
input = http_req_env["wsgi.input"]
body = input.read(int(length))
res = re.search("<ns.:sid>(.*?)<\/ns.:sid>", body)
res = re.search(r"<ns.:sid>(.*?)<\/ns.:sid>", body)
#horrbile hack:
#cherrypy provides rfile in req_env which is consumed upon .read() without
# workarounds, and both we and spyne need the data on it

@ -72,7 +72,7 @@ class Variables(MethodsInterface):
"""
dv = self.clVars
removeQuotes = lambda x: x if x != "''" else ""
reIndex = re.compile("((?:\w+\.)?(\w+))(?:\[(\d+)\])")
reIndex = re.compile(r"((?:\w+\.)?(\w+))(?:\[(\d+)\])")
if showVal:
index = reIndex.search(showVal)
if index:

@ -28,8 +28,8 @@ get_pkgname_by_filename = templateFunction.get_pkgname_by_filename
class Cache(object):
reMerge = re.compile("(merge|mergepkg)\(([-\w/]*)(?:\[[^\]]\])?\)[-!=<>]")
rePatch = re.compile("^#\s*Calculate.*ac_install_patch==on")
reMerge = re.compile(r"(merge|mergepkg)\(([-\w/]*)(?:\[[^\]]\])?\)[-!=<>]")
rePatch = re.compile(r"^#\s*Calculate.*ac_install_patch==on")
PATCH_TYPE = "patch"
MERGE_TYPE = "merge"

@ -29,9 +29,9 @@ setLocalTranslate('cl_core3', sys.modules[__name__])
class VariableClListCertId(ReadonlyVariable):
def get(self):
cert_dir = self.Get('cl_core_client_certs_path')
return map(lambda x: x[:-4],
return list(map(lambda x: x[:-4],
filter(lambda x: x.endswith('.crt'),
listDirectory(cert_dir)))
listDirectory(cert_dir))))
class VariableClCertId(Variable):

@ -56,7 +56,7 @@ class VariableClCoreGroup(Variable):
def check(self, group):
if not group:
raise VariableError(_("Group name is a required parameter"))
name_re = re.compile("^[a-zA-Z_0-9]{2,20}$")
name_re = re.compile(r"^[a-zA-Z_0-9]{2,20}$")
if not name_re.findall(group):
raise VariableError(
_('The group name may only contain words, '

@ -155,7 +155,6 @@ class VariableClReqBaseData(Variable):
def get(self):
req_id = self.Get('cl_req_id')
certbase = self.Get('cl_core_database')
for line in readLinesFile(certbase):
data = line.strip().split()
if len(data) < 6:

@ -61,7 +61,7 @@ class VariableClCorePkgName(Variable):
set(chain(*map(lambda x: map(
lambda y: (y[0].rpartition('-')[0]
if y[2].startswith('r') else y[0]),
imap(lambda y: y.rpartition('-'),
map(lambda y: y.rpartition('-'),
listDirectory(x))),
listDirectory('/var/db/pkg', onlyDir=True,
fullPath=True)))))
@ -267,8 +267,8 @@ class VariableClCorePkgCategory(Variable):
if pkgname not in self.pkgCategories:
self.pkgCategories[pkgname] = \
glob.glob('/usr/portage/*/%s' % pkgname)
return map(lambda x: path.split(path.split(x)[0])[1],
self.pkgCategories[pkgname])
return list(map(lambda x: path.split(path.split(x)[0])[1],
self.pkgCategories[pkgname]))
def get(self):
category = isPkgInstalled(self.Get('cl_core_pkg_name'))

@ -62,7 +62,7 @@ class VariableClVariableData(TableVariable):
def fix_cmdline_params(fullname, location, v):
if location.startswith("="):
if re.match("^.*?:[a-z]+$", location):
if re.match(r"^.*?:[a-z]+$", location):
location = location.rpartition(':')[2]
else:
location = "system"
@ -73,7 +73,7 @@ class VariableClVariableData(TableVariable):
prevval = self.Get(fullname)
typevar = self.parent.getInfo(fullname).type
val = self.parent.unserialize(typevar, v)
prevval.extend(filter(lambda x: x not in prevval, val))
prevval.extend(list(filter(lambda x: x not in prevval, val)))
else:
return fullname, location, v
@ -162,8 +162,8 @@ class VariableClVariableType(VarHelper, ReadonlyVariable):
return "%s%s" % (varobj.mode, varobj.getCharType())
def get(self):
return map(self.fill_type,
self.Get('cl_variable_fullname'))
return list(map(self.fill_type,
self.Get('cl_variable_fullname')))
class VariableClVariableValue(VarHelper, Variable):
@ -203,8 +203,8 @@ class VariableClVariableValue(VarHelper, Variable):
return ""
def get(self):
return map(self.fill_var,
self.Get('cl_variable_fullname'))
return list(map(self.fill_var,
self.Get('cl_variable_fullname')))
def check(self, value):
checklist = []
@ -238,15 +238,15 @@ class VariableClVariableLocation(VarHelper, Variable):
self.label = _("Location")
def choice(self):
return [("", "Default")] + map(lambda x: (x, x),
self.Get('cl_env_location'))
return [("", "Default")] + list(map(lambda x: (x, x),
self.Get('cl_env_location')))
def fill_write(self, var):
return self.parent.isFromIni(var)
def get(self):
return map(lambda x: self.parent.isFromIni(x),
self.Get('cl_variable_fullname'))
return list(map(lambda x: self.parent.isFromIni(x),
self.Get('cl_variable_fullname')))
class VariableClVariableFilter(VarHelper, Variable):
@ -294,7 +294,7 @@ class VariableClVariableShow(VarHelper, Variable):
self.help = _("show the variable value")
def raiseWrongChoice(self, name, choice_val, val, error):
re_index = re.compile("((?:\w+\.)?(\w+))(?:\[(\d+)\])?")
re_index = re.compile(r"((?:\w+\.)?(\w+))(?:\[(\d+)\])?")
varname = re_index.search(val)
if varname and not varname.group(1) in choice_val:
raise VariableError(_("Variable %s not found") % val)

Loading…
Cancel
Save