# -*- coding: utf-8 -*-

# Copyright 2012-2016 Mir Calculate. http://www.calculate-linux.org
#
#  Licensed under the Apache License, Version 2.0 (the "License");
#  you may not use this file except in compliance with the License.
#  You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
#  Unless required by applicable law or agreed to in writing, software
#  distributed under the License is distributed on an "AS IS" BASIS,
#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#  See the License for the specific language governing permissions and
#  limitations under the License.

import sys
import os
import re
from calculate.core.datavars import DataVarsCore
from calculate.lib.utils.files import readFile

_ = lambda x: x
from calculate.lib.cl_lang import setLocalTranslate

setLocalTranslate('cl_core3', sys.modules[__name__])

# Matches one PEM-armoured block: a dashed header line, a base64 body and a
# dashed footer line.  Raw-string form of the pattern previously duplicated
# in get_CRL() and rm_ca_from_trusted().
_PEM_BLOCK_RE = re.compile(
    r'[-]+[\w ]+[-]+\n+[\w\n\+\=/]+[-]+[\w ]+[-]+\n?')


class VerifyError(Exception):
    """Raised by verify() when the checked certificate is listed in a CRL.

    The message is kept in the ``value`` attribute.
    """

    def __init__(self, value):
        # Initialise the base class too, so ``args`` carries the message.
        super(VerifyError, self).__init__(value)
        self.value = value

    def __str__(self):
        return repr(self.value)


def _to_text(value):
    """Return ``value`` as a native string, decoding bytes as UTF-8.

    pyOpenSSL hands out name components, serials and dates as bytes; on
    Python 3 those never compare equal to ``'CN'``/``'L'`` and cannot be
    concatenated with a ``str`` path.  Native strings pass through untouched.
    """
    if isinstance(value, str):
        return value
    if isinstance(value, bytes):
        return value.decode('utf-8', 'replace')
    return value


def _name_components(x509_name):
    """Return the components of an X509Name as (key, value) text pairs."""
    return [(_to_text(key), _to_text(val))
            for key, val in x509_name.get_components()]


def _revoked_entries(crl_object):
    """Return the revocations of ``crl_object`` as an iterable.

    ``CRL.get_revoked()`` may return None for a CRL without entries, so
    normalise that to an empty tuple.
    """
    return crl_object.get_revoked() or ()


# check revocation of server certificate
def verify(server_cert, crl_path, flag):
    """Check whether a server certificate is listed in the local CRL.

    The CRL file is looked up as ``crl_path`` + the issuer CN (when it is
    longer than two characters), otherwise ``crl_path`` + the host part of
    the issuer L field.

    :param server_cert: PEM text of the certificate to check
    :param crl_path: directory prefix of the CRL files; it is concatenated
        with the file name as is
    :param flag: when true, the diagnostic messages for "cannot be checked"
        situations are not printed
    :return: always 0 when the certificate is not found in the CRL or the
        check cannot be performed
    :raises VerifyError: when the certificate serial is listed in the CRL
    """
    import OpenSSL

    certobj = OpenSSL.crypto.load_certificate(OpenSSL.SSL.FILETYPE_PEM,
                                              server_cert)
    server_serial = certobj.get_serial_number()

    CN, L = None, None
    for key, val in _name_components(certobj.get_issuer()):
        if key == 'CN':
            CN = val
        elif key == 'L':
            L = val

    if CN and len(CN) > 2:
        crl_file = crl_path + CN
    elif L:
        # L holds "host[:port]"; only the host names the CRL file
        crl_file = crl_path + L.split(':')[0]
    else:
        if not flag:
            print(_('The CN and L fields in the CA certificate are incorrect!'))
        return 0

    if not os.path.exists(crl_file):
        if not flag:
            print(_("This certificate cannot be verified in the CRL."))
        return 0

    with open(crl_file, 'r') as crl_fd:
        crl = crl_fd.read()
    if crl == '':
        return 0

    crl_object = OpenSSL.crypto.load_crl(OpenSSL.crypto.FILETYPE_PEM, crl)
    for rvk in _revoked_entries(crl_object):
        # the CRL stores serial numbers as hexadecimal strings
        if server_serial == int(rvk.get_serial(), 16):
            print(_("This certificate has been revoked!"))
            print(_("Serial") + ': %s\n' % _to_text(rvk.get_serial()) +
                  _("Revoke date") + _(': %s') % _to_text(rvk.get_rev_date()))
            raise VerifyError('CRL Exception')
    return 0


def get_CRL(path_to_cert):
    """Get new CRL (Certificate Revocation List) from all CA.

    For every trusted CA certificate (user and system lists) whose subject
    has an L field, the CA server at ``https://<L>/?wsdl`` is asked for its
    CRL, which is stored under ``<path_to_cert>/ca/crl/``.  Afterwards CA
    certificates that are themselves revoked are removed from the trusted
    lists via find_ca_in_crl().

    Failures to reach a CA server are ignored (best effort).  The process
    exits via sys.exit() if a directory cannot be created or a VerifyError
    is raised while talking to a CA server.

    :param path_to_cert: directory with the client certificates
    """
    # local CRL; os.path.join gives the same result as plain concatenation
    # when path_to_cert ends with a slash and a correct one when it does not
    ca_dir = os.path.join(path_to_cert, 'ca')
    CRL_path = os.path.join(path_to_cert, 'ca/crl/')
    if not os.path.exists(CRL_path):
        for directory in (path_to_cert, ca_dir, CRL_path):
            if os.path.exists(directory):
                continue
            try:
                os.makedirs(directory)
            except OSError:
                print(_("Error creating directory %s") % directory)
                sys.exit()

    clVars = DataVarsCore()
    clVars.importCore()
    clVars.flIniFile()

    # user and system ca and root certificates
    user_root_cert = clVars.Get('cl_user_root_cert')
    home_path = clVars.Get('ur_home_path')
    user_root_cert = user_root_cert.replace("~", home_path)
    glob_root_cert = clVars.Get('cl_glob_root_cert')

    if os.path.exists(user_root_cert):
        user_ca_certs = readFile(user_root_cert)
    else:
        user_ca_certs = ''
    if os.path.exists(glob_root_cert):
        glob_ca_certs = readFile(glob_root_cert)
    else:
        glob_ca_certs = ''

    # get certificates list from text and merge them into one list
    all_ca_certs_list = (_PEM_BLOCK_RE.findall(user_ca_certs) +
                         _PEM_BLOCK_RE.findall(glob_ca_certs))

    import OpenSSL

    for ca in all_ca_certs_list:
        certobj = OpenSSL.crypto.load_certificate(OpenSSL.SSL.FILETYPE_PEM,
                                                  ca)
        # get the CA server address (L) and the CA name (CN)
        location = None
        CN = None
        for key, val in _name_components(certobj.get_subject()):
            if key == 'L':
                location = val
            if key == 'CN':
                CN = val
        if location is None:
            continue
        url = "https://" + location + "/?wsdl"

        # imported lazily, only when a CA server really has to be contacted
        from calculate.core.client.client_class import Client_suds
        from .client_class import HTTPSClientCertTransport
        # connect to ca server (url get from certificates)
        try:
            client = Client_suds(url, transport=HTTPSClientCertTransport(
                None, None, path_to_cert))
            client.set_parameters(path_to_cert, None, None)
            new_crl = client.service.get_crl()
            if new_crl:
                if CN and len(CN) > 2:
                    CRL_file = CRL_path + CN
                else:
                    # use the host of the L field, not whichever subject
                    # component happened to be iterated last
                    CRL_file = CRL_path + location.split(':')[0]
                if new_crl == ' ':
                    # a single space is answered in place of a CRL:
                    # store an empty file
                    with open(CRL_file, 'w'):
                        pass
                    continue
                if os.path.exists(CRL_file):
                    if readFile(CRL_file) == new_crl:
                        continue
                with open(CRL_file, 'w') as fd:
                    fd.write(new_crl)
                print(_("CRL added"))
        except VerifyError as e:
            print(e.value)
            sys.exit()
        except Exception:
            # best effort: an unreachable or misbehaving CA server must not
            # prevent the remaining CAs from being processed
            pass

    find_ca_in_crl(CRL_path, all_ca_certs_list)


def find_ca_in_crl(CRL_path, all_ca_certs_list):
    """Remove from the trusted lists every CA certificate found in a CRL.

    For each certificate the CRL file ``CRL_path`` + issuer CN is consulted;
    certificates without an issuer CN, without a CRL file, or with an empty
    or unparsable CRL file are skipped.

    :param CRL_path: directory prefix of the CRL files (concatenated as is)
    :param all_ca_certs_list: list of CA certificates as PEM text
    """
    import OpenSSL

    for ca in all_ca_certs_list:
        certobj = OpenSSL.crypto.load_certificate(OpenSSL.SSL.FILETYPE_PEM,
                                                  ca)
        CN = None
        for key, val in _name_components(certobj.get_issuer()):
            if key == 'CN':
                CN = val
                break
        if CN is None:
            continue

        server_serial = certobj.get_serial_number()
        crl_file = CRL_path + CN
        if not os.path.exists(crl_file):
            continue

        with open(crl_file, 'r') as crl_fd:
            crl = crl_fd.read()
        if not crl:
            # get_CRL() stores an empty file when the CA has no CRL
            continue

        try:
            crl_object = OpenSSL.crypto.load_crl(OpenSSL.crypto.FILETYPE_PEM,
                                                 crl)
        except (OpenSSL.crypto.Error, OpenSSL.SSL.Error):
            # load_crl() reports parse failures as crypto.Error; SSL.Error
            # is still caught as the original code did
            continue

        for rvk in _revoked_entries(crl_object):
            if server_serial == int(rvk.get_serial(), 16):
                rm_ca_from_trusted(ca)
                # one removal per certificate is enough
                break


def _remove_cert_from_db(db_path, ca_cert, message):
    """Rewrite the PEM bundle ``db_path`` without ``ca_cert``.

    The file is left untouched when the certificate is not present.
    ``message`` is printed once for every removed occurrence.
    """
    certs = _PEM_BLOCK_RE.findall(readFile(db_path))
    if ca_cert not in certs:
        return
    remaining = []
    for cert in certs:
        if cert != ca_cert:
            remaining.append(cert)
        else:
            print(message)
    with open(db_path, 'w') as fd:
        fd.write(''.join(remaining))


def rm_ca_from_trusted(ca_cert):
    """Delete a CA certificate from the user and system trusted lists.

    Removes the certificate file referenced from the user's ``cert_list``
    index (lines of "<md5 of the certificate> <file name>"), drops the
    matching index line, and removes the certificate from the user and
    system root certificate bundles.

    :param ca_cert: PEM text of the CA certificate
    :return: always 0
    """
    clVars = DataVarsCore()
    clVars.importCore()
    clVars.flIniFile()

    user_ca_dir = clVars.Get('cl_client_cert_dir')
    home_path = clVars.Get('ur_home_path')
    user_ca_dir = os.path.join(user_ca_dir.replace("~", home_path), 'ca')
    user_ca_list = os.path.join(user_ca_dir, 'cert_list')
    user_ca_db = clVars.Get('cl_user_root_cert').replace("~", home_path)
    system_ca_db = clVars.Get('cl_glob_root_cert')

    import hashlib

    md5sum = hashlib.md5(ca_cert.encode("UTF-8")).hexdigest()

    # search ca certificate in user ca list
    if os.path.exists(user_ca_list):
        with open(user_ca_list) as fd:
            index_lines = fd.read().splitlines()

        kept_lines = []
        for line in index_lines:
            words = line.split()
            # blank or malformed lines are kept as they are
            if len(words) >= 2 and words[0] == md5sum:
                filename = os.path.join(user_ca_dir, words[1])
                if ca_cert == readFile(filename):
                    os.unlink(filename)
                    continue
            kept_lines.append(line + '\n')

        # every surviving line is written back, not only the last one
        with open(user_ca_list, 'w') as fd:
            fd.write(''.join(kept_lines))

    # user ca certificates
    _remove_cert_from_db(
        user_ca_db, ca_cert,
        _("CA certificate deleted from the user's trusted "
          "certificates list"))

    # system ca certificates
    if not os.path.exists(system_ca_db):
        with open(system_ca_db, 'w'):
            pass
    _remove_cert_from_db(
        system_ca_db, ca_cert,
        _('CA certificate deleted from the system trusted '
          'certificates list'))

    return 0