# -*- coding: utf-8 -*- # Copyright 2012-2016 Mir Calculate. http://www.calculate-linux.org # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. import os import re import sys import OpenSSL from calculate.console.application.function import _print from calculate.core.datavars import DataVarsCore from calculate.lib.cl_lang import setLocalTranslate _ = lambda x: x setLocalTranslate('cl_console3', sys.modules[__name__]) class VerifyError(Exception): def __init__(self, value): self.value = value def __str__(self): return repr(self.value) # check recall of server certificate def verify(server_cert, crl_path, flag): certobj = OpenSSL.crypto.load_certificate( OpenSSL.SSL.FILETYPE_PEM, server_cert) serverSerial = certobj.get_serial_number() Issuer = certobj.get_issuer().get_components() CN, L = None, None for i in Issuer: if i[0] == 'CN': CN = i[1] elif i[0] == 'L': L = i[1] if CN and len(CN) > 2: crl_file = crl_path + CN elif L: try: host = L.split(':')[0] except: if not flag: print _("fields CN and L in the CA certificate are incorrect!") return 0 crl_file = crl_path + host else: if not flag: print _("fields CN and L in the CA certificate are incorrect!") return 0 if not os.path.exists(crl_file): if not flag: pass # print _("This certificate can not be verified in the CRL.") return 0 with open(crl_file, 'r') as _crl_file: crl = "".join(_crl_file.readlines()) if crl == '': return 0 crl_object = OpenSSL.crypto.load_crl(OpenSSL.crypto.FILETYPE_PEM, crl) revoked_objects = crl_object.get_revoked() for rvk in revoked_objects: if serverSerial == int(rvk.get_serial(), 16): print _("This certificate has been revoked!") print _("Serial") + _(': %s\n') % rvk.get_serial() + _( "Revoke date") + _(': %s') % rvk.get_rev_date() raise VerifyError('CRL Exception') return 0 def get_CRL(path_to_cert): print 'update CRL' """ get new CRL (Certificate Revocation List) from all CA """ # local CRL CRL_path = os.path.join(path_to_cert, 'ca/crl/') if not os.path.exists(CRL_path): if not os.path.exists(os.path.join(path_to_cert, 'ca')): if not os.path.exists(path_to_cert): try: os.makedirs(path_to_cert) except OSError: print _("Failed to create directory %s") % path_to_cert raise Exception(1) try: os.makedirs(os.path.join(path_to_cert, 'ca')) except OSError: print _("Failed to create directory %s") % ( os.path.join(path_to_cert, 'ca')) raise Exception(1) os.makedirs(CRL_path) clVars = DataVarsCore() clVars.importCore() clVars.flIniFile() # user and system ca and root certificates user_root_cert = clVars.Get('core.cl_user_root_cert') homePath = clVars.Get('ur_home_path') user_root_cert = user_root_cert.replace("~", homePath) glob_root_cert = clVars.Get('core.cl_glob_root_cert') if os.path.exists(user_root_cert): user_ca_certs = open(user_root_cert, 'r').read() else: user_ca_certs = '' if os.path.exists(glob_root_cert): glob_ca_certs = open(glob_root_cert, 'r').read() else: glob_ca_certs = '' # get certificates list fron text p = re.compile('[-]+[\w ]+[-]+\n+[\w\n\+\\=/]+[-]+[\w ]+[-]+\n?') user_ca_certs_list = p.findall(user_ca_certs) glob_ca_certs_list = p.findall(glob_ca_certs) # association in one list all_ca_certs_list = user_ca_certs_list + glob_ca_certs_list for ca in all_ca_certs_list: certobj = OpenSSL.crypto.load_certificate(OpenSSL.SSL.FILETYPE_PEM, ca) # get url from certificates url = None CN = None Subject = certobj.get_subject().get_components() last_subj = "" for subj in Subject: if subj[0] == 'L': url = "https://" + subj[1] + "/?wsdl" if subj[0] == 'CN': CN = subj[1] last_subj = subj if url: new_crl = None from client_class import Client_suds from client_class import HTTPSClientCertTransport # connect to ca server (url get from certificates) client = None try: client = Client_suds( url, transport=HTTPSClientCertTransport(None, None, path_to_cert)) client.set_parameters(path_to_cert, None, None) new_crl = client.service.get_crl() except VerifyError, e: _print(e.value) # rm_ca_from_trusted(ca) raise Exception(1) except: pass client.wsdl.services[0].setlocation(url) if new_crl: if CN and len(CN) > 2: CRL_file = CRL_path + CN else: host = last_subj[1].split(':')[0] CRL_file = CRL_path + host if new_crl == ' ': open(CRL_file, 'w') # if os.path.exists(CRL_file): # os.unlink(CRL_file) continue if os.path.exists(CRL_file): if open(CRL_file, 'r').read() == new_crl: continue fd = open(CRL_file, 'w') fd.write(new_crl) fd.close() print _("CRL added") find_ca_in_crl(CRL_path, all_ca_certs_list) def find_ca_in_crl(CRL_path, all_ca_certs_list): for ca in all_ca_certs_list: certobj = OpenSSL.crypto.load_certificate( OpenSSL.SSL.FILETYPE_PEM, ca) Issuer = certobj.get_issuer().get_components() CN = "" for item in Issuer: if item[0] == 'CN': CN = item[1] serverSerial = certobj.get_serial_number() CRL = CRL_path + CN if not os.path.exists(CRL): continue with open(CRL, 'r') as _crl_file: crl = "".join(_crl_file.readlines()) try: crl_object = OpenSSL.crypto.load_crl( OpenSSL.crypto.FILETYPE_PEM, crl) except: continue revoked_objects = crl_object.get_revoked() for rvk in revoked_objects: if serverSerial == int(rvk.get_serial(), 16): rm_ca_from_trusted(ca) def rm_ca_from_trusted(ca_cert): clVars = DataVarsCore() clVars.importCore() clVars.flIniFile() user_ca_dir = clVars.Get('core.cl_client_cert_dir') homePath = clVars.Get('ur_home_path') user_ca_dir = user_ca_dir.replace("~", homePath) user_ca_dir = os.path.join(user_ca_dir, 'ca') user_ca_list = os.path.join(user_ca_dir, 'cert_list') user_ca_db = clVars.Get('core.cl_user_root_cert') homePath = clVars.Get('ur_home_path') user_ca_db = user_ca_db.replace("~", homePath) system_ca_dir = clVars.Get('core.cl_core_cert_path') system_ca_list = os.path.join(system_ca_dir, 'cert_list') system_ca_db = clVars.Get('core.cl_glob_root_cert') import hashlib md5 = hashlib.md5() md5.update(ca_cert) md5sum = md5.hexdigest() # search ca certificate in user ca list newfile = '' with open(user_ca_list) as fd: t = fd.read() # See each line for line in t.splitlines(): newfile = '' # and each word in line words = line.split() if words[0] == md5sum: filename = os.path.join(user_ca_dir, words[1]) if ca_cert == open(filename, 'r').read(): try: os.unlink(filename) except OSError, e: _print(e.message) else: newfile += (line + '\n') else: newfile += (line + '\n') fd.close() fn = open(user_ca_list, 'w') fn.write(newfile) fn.close() p = re.compile('[-]+[\w ]+[-]+\n+[\w\n\+\\=/]+[-]+[\w ]+[-]+\n?') # open, write and split user ca certificates user_ca_certs = open(user_ca_db, 'r').read() user_ca_certs_list = p.findall(user_ca_certs) if ca_cert in user_ca_certs_list: new_user_ca_certs = [] for cert in user_ca_certs_list: if ca_cert != cert: new_user_ca_certs.append(cert) else: print _("CA certificate deleted from the list of user " "trusted certificates") fd = open(user_ca_db, 'w') for cert in new_user_ca_certs: fd.write(cert) fd.close() if not os.path.exists(system_ca_db): open(system_ca_db, 'w') system_ca_certs = open(system_ca_db, 'r').read() system_ca_certs_list = p.findall(system_ca_certs) if ca_cert in system_ca_certs_list: new_system_ca_certs = [] for cert in system_ca_certs_list: if ca_cert != cert: new_system_ca_certs.append(cert) else: print _("CA certificate deleted from the list of system " "trusted certificates") fd = open(system_ca_db, 'w') for cert in new_system_ca_certs: fd.write(cert) fd.close() return 0