|
|
# -*- coding: utf-8 -*-
|
|
|
|
|
|
# Copyright 2012-2016 Mir Calculate. http://www.calculate-linux.org
|
|
|
#
|
|
|
# Licensed under the Apache License, Version 2.0 (the "License");
|
|
|
# you may not use this file except in compliance with the License.
|
|
|
# You may obtain a copy of the License at
|
|
|
#
|
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
|
#
|
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
|
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
|
# See the License for the specific language governing permissions and
|
|
|
# limitations under the License.
|
|
|
|
|
|
import logging
|
|
|
import sys
|
|
|
from os import path
|
|
|
import calculate.contrib
|
|
|
from suds.transport import TransportError
|
|
|
from suds import WebFault
|
|
|
from .client_class import Client_suds
|
|
|
import urllib.request as urllib2
|
|
|
|
|
|
from calculate.core.datavars import DataVarsCore
|
|
|
from .client_class import HTTPSClientCertTransport
|
|
|
from calculate.lib.utils.dbus_tools import get_dbus_hostname
|
|
|
from functools import partial
|
|
|
from M2Crypto import X509
|
|
|
from .utils import clear
|
|
|
|
|
|
# Identity fallback for the translation marker ``_``: returns its argument
# unchanged.
# NOTE(review): presumably rebound by the setLocalTranslate() call further
# down in this module -- confirm in calculate.lib.cl_lang.
_ = lambda x: x
|
|
|
from calculate.lib.cl_lang import setLocalTranslate
|
|
|
# Hand this module object to setLocalTranslate together with the 'cl_core3'
# domain name.
# NOTE(review): presumably installs a translating ``_`` into this module --
# confirm in calculate.lib.cl_lang.
setLocalTranslate('cl_core3', sys.modules[__name__])
|
|
|
|
|
|
def get_cert_groups(cert_file):
    """
    Return the group string stored in a certificate's ``nsComment`` extension.

    :param cert_file: path to the certificate file
    :return: the text following the ``group:`` prefix of the extension
        value, or an empty string when the certificate cannot be read or
        parsed, the extension lookup fails, or the value does not start
        with ``group:``
    """
    prefix = 'group:'
    try:
        cert = X509.load_cert(cert_file)
        comment = cert.get_ext('nsComment').get_value()
    except (IOError, LookupError, X509.X509Error):
        # Best-effort lookup: an unreadable file, an unparsable certificate
        # or a failed extension lookup all mean "no groups" rather than a
        # crash in the caller.
        return ""
    if comment.startswith(prefix):
        return comment[len(prefix):]
    return ""
|
|
|
|
|
|
|
|
|
def get_certifactions_for_host(host, use_dbus=False):
    """
    Return the client certificates whose names match the server host name.

    The server host name is requested from the service at ``host`` (or
    taken from ``get_dbus_hostname()`` when ``use_dbus`` is true). A file
    ``<name>.crt`` in the client certificate directory matches when the
    server host name ends with ``<name>``.

    :param host: host used to build the WSDL URL of the service
    :param use_dbus: take the host name from ``get_dbus_hostname()``
        (falling back to "localhost") instead of querying the service
    :return: list of full paths of the matching certificate files, ordered
        by ascending certificate-name length; an empty list on any failure
    """
    import glob

    log = logging.getLogger("certification")
    cl_vars_core = DataVarsCore()
    cl_vars_core.importCore()
    cl_vars_core.flIniFile()

    home_path = cl_vars_core.Get('ur_home_path')
    port = cl_vars_core.GetInteger('core.cl_core_port')
    path_to_cert = cl_vars_core.Get('core.cl_client_cert_dir')
    path_to_cert = path_to_cert.replace("~", home_path)

    url = "https://%s:%d/?wsdl" % (host, port)

    clear()
    if use_dbus:
        server_host_name = get_dbus_hostname() or "localhost"
    else:
        try:
            client = Client_suds(url, transport=HTTPSClientCertTransport(
                None, None, path_to_cert, None))
            client.wsdl.services[0].setlocation(url)
            server_host_name = client.service.get_server_host_name()
            del client
        except (urllib2.URLError, TransportError) as e:
            log.debug(_('Failed to connect') + _(': ') + str(e))
            return []
        except KeyboardInterrupt:
            log.debug(_("Manually interrupted"))
            return []
        except WebFault as f:
            # The service call is the only place a WebFault can come from,
            # so it is reported here.
            log.debug(_("Exception: %s") % f)
            log.debug(f.fault)
            return []
        except Exception as e:
            log.debug(e)
            # Without a server host name nothing can match; stop here
            # instead of continuing with the variable unbound.
            return []

    suffix = '.crt'
    try:
        fit_cert_list = []
        for client_cert_path in glob.glob(
                path.join(path_to_cert, '*' + suffix)):
            # Take the bare file name and drop only the trailing suffix, so
            # the result does not depend on a trailing slash in the
            # directory and names containing ".crt" elsewhere stay intact.
            client_cert_name = path.basename(client_cert_path)[:-len(suffix)]
            if server_host_name.endswith(client_cert_name):
                fit_cert_list.append(client_cert_name)
        fit_cert_list.sort(key=len)
        return [path.join(path_to_cert, name + suffix)
                for name in fit_cert_list]
    except KeyboardInterrupt:
        log.debug(_("Manually interrupted"))
    except Exception as e:
        log.debug(e)
    return []
|
|
|
|
|
|
|
|
|
def user_can_run_update(use_dbus):
    """
    Tell whether the current user may run a system update.

    The answer is True when any certificate returned by
    ``get_certifactions_for_host("localhost", ...)`` lists the group
    ``all`` or ``update``.

    :param use_dbus: passed through to ``get_certifactions_for_host``
    :return: True if a certificate grants the right, otherwise False
    """
    allowed = {"all", "update"}
    for cert in get_certifactions_for_host("localhost", use_dbus=use_dbus):
        # Compare whole group names: a substring test on the raw string
        # would accept e.g. "small" as "all" or "autoupdate" as "update".
        # NOTE(review): assumes group names are separated by commas and/or
        # whitespace -- confirm against the certificate issuer.
        groups = get_cert_groups(cert).replace(",", " ").split()
        if not allowed.isdisjoint(groups):
            return True
    return False
|
|
|
|