#-*- coding: utf-8 -*- # Copyright 2012 Calculate Ltd. http://www.calculate-linux.org # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. import os, re, glob, sys import OpenSSL from calculate.core.datavars import DataVarsCore from calculate.lib.cl_lang import setLocalTranslate setLocalTranslate('cl_console3',sys.modules[__name__]) class VerifyError(Exception): def __init__(self, value): self.value = value def __str__(self): return repr(self.value) # check recall of server certificate def verify(server_cert, crl_path, flag): certobj = OpenSSL.crypto.load_certificate \ (OpenSSL.SSL.FILETYPE_PEM, server_cert) serverSerial = certobj.get_serial_number() Issuer = certobj.get_issuer().get_components() CN, L = None, None for i in Issuer: if i[0] == 'CN': CN = i[1] elif i[0] == 'L': L = i[1] if CN and len(CN) > 2: crl_file = crl_path + CN elif L: try: host = L.split(':')[0] except: if not flag: print _("fields CN and L in the CA certificate are incorrect!") return 0 crl_file = crl_path + host else: if not flag: print _( "fields CN and L in the CA certificate are incorrect!") return 0 if not os.path.exists(crl_file): if not flag: pass # print _("This certificate can not be verified in the CRL.") return 0 with open(crl_file, 'r') as _crl_file: crl = "".join(_crl_file.readlines()) if crl == '': return 0 crl_object = OpenSSL.crypto.load_crl(OpenSSL.crypto.FILETYPE_PEM, crl) revoked_objects = crl_object.get_revoked() for rvk in revoked_objects: if serverSerial == int(rvk.get_serial(), 16): print _("This certificate has been revoked!") print _("Serial")+ ': %s\n' %rvk.get_serial() + _("Revoke date") + \ ': %s' %rvk.get_rev_date() raise VerifyError('CRL Exception') return 0 def get_CRL(path_to_cert): print 'update CRL' """ get new CRL (Certificate Revocation List) from all CA """ # local CRL CRL_path = os.path.join(path_to_cert, 'ca/crl/') if not os.path.exists(CRL_path): if not os.path.exists(os.path.join(path_to_cert, 'ca')): if not os.path.exists(path_to_cert): try: os.makedirs(path_to_cert) except OSError: print _("Failed to create directory %s") %path_to_cert raise Exception(1) try: os.makedirs(os.path.join(path_to_cert, 'ca')) except OSError: print _("Failed to create directory %s") \ %(os.path.join(path_to_cert, 'ca')) raise Exception(1) os.makedirs(CRL_path) clVars = DataVarsCore() clVars.importCore() clVars.flIniFile() # user and system ca and root certificates user_root_cert = clVars.Get('core.cl_user_root_cert') homePath = clVars.Get('ur_home_path') user_root_cert = user_root_cert.replace("~",homePath) glob_root_cert = clVars.Get('core.cl_glob_root_cert') if os.path.exists(user_root_cert): user_ca_certs = open(user_root_cert, 'r').read() else: user_ca_certs = '' if os.path.exists(glob_root_cert): glob_ca_certs = open(glob_root_cert, 'r').read() else: glob_ca_certs = '' # get certificates list fron text p = re.compile('[-]+[\w ]+[-]+\n+[\w\n\+\\=/]+[-]+[\w ]+[-]+\n?') user_ca_certs_list = p.findall(user_ca_certs) glob_ca_certs_list = p.findall(glob_ca_certs) # association in one list all_ca_certs_list = user_ca_certs_list + glob_ca_certs_list for ca in all_ca_certs_list: certobj = OpenSSL.crypto.load_certificate \ (OpenSSL.SSL.FILETYPE_PEM, ca) # get url from certificates url = None CN = None Subject = certobj.get_subject().get_components() for subj in Subject: if subj[0] == 'L': url = "https://" + subj[1] +"/?wsdl" if subj[0] == 'CN': CN = subj[1] if url: from client_class import Client_suds from client_class import HTTPSClientCertTransport # connect to ca server (url get from certificates) try: client = Client_suds(url,\ transport = HTTPSClientCertTransport(None, None, \ path_to_cert)) client.set_parameters (path_to_cert, None, None) new_crl = client.service.get_crl() except VerifyError, e: _print (e.value) #rm_ca_from_trusted(ca) raise Exception(1) except: pass client.wsdl.services[0].setlocation(url) if 'new_crl' in locals(): if new_crl: if CN and len(CN) > 2: CRL_file = CRL_path + CN else: host = subj[1].split(':')[0] CRL_file = CRL_path + host if new_crl == ' ': open(CRL_file, 'w') #if os.path.exists(CRL_file): #os.unlink(CRL_file) continue if os.path.exists(CRL_file): if open(CRL_file, 'r').read() == new_crl: continue fd = open(CRL_file, 'w') fd.write(new_crl) fd.close() print _("CRL added") find_ca_in_crl (CRL_path, all_ca_certs_list) def find_ca_in_crl (CRL_path, all_ca_certs_list): CRL_name_list = glob.glob(CRL_path + '*') for ca in all_ca_certs_list: certobj = OpenSSL.crypto.load_certificate \ (OpenSSL.SSL.FILETYPE_PEM, ca) Issuer = certobj.get_issuer().get_components() for item in Issuer: if item[0] == 'CN': CN = item[1] serverSerial = certobj.get_serial_number() CRL = CRL_path + CN if not os.path.exists(CRL): continue with open(CRL, 'r') as _crl_file: crl = "".join(_crl_file.readlines()) try: crl_object = OpenSSL.crypto.load_crl \ (OpenSSL.crypto.FILETYPE_PEM, crl) except: continue revoked_objects = crl_object.get_revoked() for rvk in revoked_objects: if serverSerial == int(rvk.get_serial(), 16): rm_ca_from_trusted(ca) def rm_ca_from_trusted(ca_cert): clVars = DataVarsCore() clVars.importCore() clVars.flIniFile() user_ca_dir = clVars.Get('core.cl_client_cert_dir') homePath = clVars.Get('ur_home_path') user_ca_dir = user_ca_dir.replace("~",homePath) user_ca_dir = os.path.join(user_ca_dir, 'ca') user_ca_list = os.path.join(user_ca_dir, 'cert_list') user_ca_db = clVars.Get('core.cl_user_root_cert') homePath = clVars.Get('ur_home_path') user_ca_db = user_ca_db.replace("~",homePath) system_ca_dir = clVars.Get('core.cl_core_cert_path') system_ca_list = os.path.join(system_ca_dir, 'cert_list') system_ca_db = clVars.Get('core.cl_glob_root_cert') import hashlib md5 = hashlib.md5() md5.update(ca_cert) md5sum = md5.hexdigest() # search ca certificate in user ca list with open(user_ca_list) as fd: t = fd.read() # See each line for line in t.splitlines(): newfile = '' # and each word in line words = line.split() if words[0] == md5sum: filename = os.path.join(user_ca_dir, words[1]) if ca_cert == open(filename, 'r').read(): try: os.unlink(filename) except OSError, e: _print (e.message) else: newfile += (line + '\n') else: newfile += (line + '\n') fd.close() fn = open(user_ca_list, 'w') fn.write(newfile) fn.close() p = re.compile('[-]+[\w ]+[-]+\n+[\w\n\+\\=/]+[-]+[\w ]+[-]+\n?') # open, write and split user ca certificates user_ca_certs = open(user_ca_db, 'r').read() user_ca_certs_list = p.findall(user_ca_certs) if ca_cert in user_ca_certs_list: new_user_ca_certs = [] for cert in user_ca_certs_list: if ca_cert != cert: new_user_ca_certs.append(cert) else: print _("CA certificate deleted from the list of user " \ "trusted certificates") fd = open(user_ca_db, 'w') for cert in new_user_ca_certs: fd.write(cert) fd.close() if not os.path.exists(system_ca_db): open(system_ca_db, 'w') system_ca_certs = open(system_ca_db, 'r').read() system_ca_certs_list = p.findall(system_ca_certs) if ca_cert in system_ca_certs_list: new_system_ca_certs = [] for cert in system_ca_certs_list: if ca_cert != cert: new_system_ca_certs.append(cert) else: print _("CA certificate deleted from the list of system " \ "trusted certificates") fd = open(system_ca_db, 'w') for cert in new_system_ca_certs: fd.write(cert) fd.close() return 0