You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
calculate-utils-3-console-gui/pym/consolegui/application/client_class.py

949 lines
36 KiB

#-*- coding: utf-8 -*-
# Copyright 2012-2016 Mir Calculate. http://www.calculate-linux.org
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from .. import qt
import urllib.request as u2
if hasattr(u2,"ssl"):
    # NOTE(review): this swaps the process-wide default HTTPS context for an
    # unverified one, i.e. stock urllib HTTPS requests skip certificate
    # validation.  The classes below perform their own trust checks.
    u2.ssl._create_default_https_context = u2.ssl._create_unverified_context
import os, re, sys
# Imported before suds on purpose: the original import order is preserved.
import calculate.contrib
from suds.transport.http import HttpTransport
from .pyopenssl_wrapper import PyOpenSSLSocket
import http.client as httplib
from suds.transport import Transport
from suds.properties import Unskin
from http.cookiejar import CookieJar, DefaultCookiePolicy
import socket, ssl
import OpenSSL, hashlib
from suds.client import Client
from logging import getLogger
# from calculate.core.client.cert_verify import verify, VerifyError
from calculate.lib.utils.files import readFile
from .more import show_msg, show_question, LabelWordWrap

# Module-wide switch: set to 1 after the first CRL check so that verify()
# stops printing its informational messages on later calls.
flag = 0
log = getLogger(__name__)

# #TODO fix translation for verify
# _ = lambda x: x
# from calculate.lib.cl_lang import setLocalTranslate
# setLocalTranslate('cl_core3', sys.modules["__name__"])
from calculate.core.datavars import DataVarsCore
class VerifyError(Exception):
    """Raised when a certificate is found in a certificate revocation list.

    The constructor argument is kept in ``value``; ``str()`` of the exception
    is the ``repr`` of that value.
    """

    def __init__(self, value):
        # Forward the payload to Exception so that ``args`` is populated and
        # the exception survives pickling / re-raising with its message.
        super().__init__(value)
        self.value = value

    def __str__(self):
        return repr(self.value)
# check revocation of the server certificate
def verify(server_cert, crl_path, flag):
    """Look the server certificate up in the locally stored CRL of its issuer.

    The CRL file name is ``crl_path`` + the issuer's CN (when longer than two
    characters), otherwise ``crl_path`` + the host part of the issuer's L
    field.

    :param server_cert: PEM text of the server certificate.
    :param crl_path: directory prefix (with trailing separator) of CRL files.
    :param flag: when truthy, the informational messages are not printed.
    :returns: always 0 (no CRL available, empty CRL, or serial not listed).
    :raises VerifyError: when the certificate serial is listed in the CRL.
    """
    import OpenSSL

    def as_text(raw):
        # pyOpenSSL returns name components and serials as bytes on Python 3.
        if isinstance(raw, bytes):
            return raw.decode('utf-8', 'replace')
        return raw

    certobj = OpenSSL.crypto.load_certificate(OpenSSL.SSL.FILETYPE_PEM,
                                              server_cert)
    serverSerial = certobj.get_serial_number()

    CN, L = None, None
    for key, value in certobj.get_issuer().get_components():
        key = as_text(key)
        if key == 'CN':
            CN = as_text(value)
        elif key == 'L':
            L = as_text(value)

    if CN and len(CN) > 2:
        crl_file = crl_path + CN
    elif L:
        # L is expected to look like "host[:port]"; only the host names the file
        crl_file = crl_path + L.split(':')[0]
    else:
        if not flag:
            print(_('The CN and L fields in the CA certificate are incorrect!'))
        return 0

    if not os.path.exists(crl_file):
        if not flag:
            print(_("This certificate cannot be verified in the CRL."))
        return 0

    with open(crl_file, 'r') as _crl_file:
        crl = _crl_file.read()
    if crl == '':
        return 0

    crl_object = OpenSSL.crypto.load_crl(OpenSSL.crypto.FILETYPE_PEM, crl)
    # get_revoked() yields None (not an empty tuple) for a CRL without entries
    revoked_objects = crl_object.get_revoked() or ()
    for rvk in revoked_objects:
        if serverSerial == int(rvk.get_serial(), 16):
            print(_("This certificate has been revoked!"))
            print (_("Serial") + ': %s\n' % as_text(rvk.get_serial()) +
                   _("Revoke date") + _(': %s') % as_text(rvk.get_rev_date()))
            raise VerifyError('CRL Exception')
    return 0
class AddServerCert (qt.QDialog):
    """Dialog shown for an untrusted server certificate.

    It displays the certificate fingerprint, serial number, issuer and
    subject, and offers to trust the server certificate, to fetch and trust
    the CA/root certificates as well, or to cancel.

    ``flag`` carries the outcome back to the caller: it starts at 4 and is
    set to 3 once ``add_server`` has run.
    """

    @staticmethod
    def _text(raw):
        """Return *raw* as ``str``; pyOpenSSL hands out ``bytes`` on Python 3."""
        if isinstance(raw, bytes):
            return raw.decode('utf-8', 'replace')
        return raw

    def __init__(self, parent, ClientObj, cert):
        """Build the dialog.

        :param parent: the connection object; must provide ``trusted_path``,
            ``host``, ``list_ca_certs`` and ``add_ca_cert``.
        :param ClientObj: application object (``app``, ``VarsApi``, ``port``).
        :param cert: PEM text of the server certificate.
        """
        super().__init__()
        self.ClientObj = ClientObj
        self.parent = parent
        self.cert = cert

        self.grid = qt.QGridLayout(self)
        self.lbl_list = []

        self.grid.addWidget(
            LabelWordWrap(_('Untrusted Server Certificate!'), self),
            0, 1, 1, 2)
        certobj = OpenSSL.crypto.load_certificate(
            OpenSSL.SSL.FILETYPE_PEM, cert)
        self.grid.addWidget(
            LabelWordWrap(_('Fingerprint = %s')
                          % self._text(certobj.digest('SHA1')), self),
            1, 0, 1, 3)
        self.grid.addWidget(
            LabelWordWrap(_('Serial Number = %s')
                          % certobj.get_serial_number(), self),
            2, 0, 1, 3)

        self.tab = qt.QTabWidget(self)

        # add Issuer tab
        self.issuer_wgt = qt.QWidget(self)
        self.issuer_layout = qt.QVBoxLayout()
        for key, value in certobj.get_issuer().get_components():
            self.issuer_layout.addWidget(LabelWordWrap(
                "%s : %s" % (self._text(key), self._text(value)), self))
        self.issuer_wgt.setLayout(self.issuer_layout)
        self.tab.addTab(self.issuer_wgt, _('Issuer'))

        # add Subject tab
        self.subject_wgt = qt.QWidget(self)
        self.subject_layout = qt.QVBoxLayout()
        for key, value in certobj.get_subject().get_components():
            self.subject_layout.addWidget(LabelWordWrap(
                "%s : %s" % (self._text(key), self._text(value)), self))
        self.subject_wgt.setLayout(self.subject_layout)
        self.tab.addTab(self.subject_wgt, _('Subject'))

        self.grid.addWidget(self.tab, 3, 0, 3, 3)

        self.lbl_list.append(LabelWordWrap(
            _("Add this server certificate to trusted or ") +
            _('try adding the CA and root certificates to trusted?'), self))

        self.pix_lbl = qt.QLabel(self)
        pi = qt.QPixmap()
        pi.load('/usr/share/icons/oxygen/48x48/status/security-medium.png')
        self.pix_lbl.setPixmap(pi)
        self.grid.addWidget(self.pix_lbl, 0, 0)

        # explanatory labels start at grid row 8, the buttons follow them
        for offset, label in enumerate(self.lbl_list):
            self.grid.addWidget(label, offset + 8, 0, 1, 3)
        button_row = len(self.lbl_list) + 8

        self.server_but = qt.QPushButton(_('Add the server certificate'),
                                         self)
        self.server_but.clicked.connect(self.add_server)
        self.server_but.clicked.connect(self.close)
        self.grid.addWidget(self.server_but, button_row, 0)

        self.ca_but = qt.QPushButton(_("Add the CA and root certificates"),
                                     self)
        # slot order matters: fetch the CA first, then trust the server
        self.ca_but.clicked.connect(self.add_ca)
        self.ca_but.clicked.connect(self.add_server)
        self.ca_but.clicked.connect(self.close)
        self.grid.addWidget(self.ca_but, button_row, 1)

        self.cancel_but = qt.QPushButton(_('Cancel'), self)
        self.cancel_but.clicked.connect(self.close)
        self.grid.addWidget(self.cancel_but, button_row, 2)

        self.setWindowTitle (_('Add the certificate to trusted'))

        # move to center; floor division keeps the coordinates integral,
        # which move() requires (true division would produce floats)
        prim_screen = ClientObj.app.desktop().primaryScreen()
        screen = ClientObj.app.desktop().screenGeometry(prim_screen)
        hint = self.sizeHint()
        x = screen.width() // 2 - hint.width() // 2
        y = screen.height() // 2 - hint.height() // 2
        self.move(x, y)
        self.setFixedSize(self.sizeHint())
        self.setWindowModality(qt.Qt.WindowModal)
        self.flag = 4

    def add_server(self):
        """Store the server certificate as trusted and reconnect.

        The certificate is written to ``<trusted_path>/<host>`` and the pair
        "host filename" is appended to ``<trusted_path>/cert.list`` unless the
        host is already listed there (in which case 0 is returned after
        setting ``flag`` to 3 and nothing else happens).
        """
        ca_certs = self.parent.trusted_path + "cert.list"
        if not os.path.exists(ca_certs):
            open(ca_certs, "w").close()

        if self.parent.host == '127.0.0.1':
            host = 'localhost'
        else:
            host = self.parent.host
        filename = host

        with open(self.parent.trusted_path + filename, "w") as fc:
            fc.write(self.cert)

        with open(ca_certs) as fd:
            listed = fd.read()
        for line in listed.splitlines():
            words = line.split()
            # the first word of an entry is the host name
            if len(words) > 1 and words[0] == host:
                self.flag = 3
                return 0

        # record which file holds the certificate of this host
        with open(ca_certs, "a") as fcl:
            fcl.write(host + ' ' + filename + '\n')
        show_msg (_('Server certificate added to trusted') + '\n' +
                  (self.parent.trusted_path + filename),
                  _('Certificate added'))
        self.flag = 3

        from .conf_connection import FrameConnection
        self.ConnectWidget = FrameConnection(self, self.ClientObj)
        self.ConnectWidget.connect_to_host(host, self.ClientObj.port)

    def add_ca(self):
        """Make sure the client ``ca`` directory exists, then ask the parent
        connection to fetch and store the CA certificate.

        Returns 1 when the directory cannot be created.
        """
        cl_client_cert_dir = \
            self.ClientObj.VarsApi.Get('core.cl_client_cert_dir')
        homePath = self.ClientObj.VarsApi.Get('ur_home_path')
        cl_client_cert_dir = cl_client_cert_dir.replace("~", homePath)
        root_cert_dir = cl_client_cert_dir + "/ca"

        if not os.path.exists(root_cert_dir):
            try:
                os.makedirs(root_cert_dir)
            except OSError:
                show_msg (_('Error creating directory %s') % root_cert_dir)
                return 1

        self.parent.list_ca_certs = []
        self.parent.add_ca_cert(self.cert, self.parent.list_ca_certs)
###############################################################################
class Client_suds(Client):
    """suds ``Client`` that additionally remembers the client certificate
    layout below a given directory."""

    def set_parameters (self, path_to_cert, CERT_FILE, PKEY_FILE):
        """Record certificate-related paths and create the CRL directory.

        :param path_to_cert: directory prefix (with trailing separator) under
            which the request, session-id and CRL files live.
        :param CERT_FILE: client certificate file; a falsy value is stored
            as the empty string.
        :param PKEY_FILE: client private key file, stored unchanged.
        """
        self.path_to_cert = path_to_cert
        self.CERT_FILE = CERT_FILE if CERT_FILE else ''
        self.PKEY_FILE = PKEY_FILE

        # everything else is derived from the certificate directory
        self.REQ_FILE = path_to_cert + 'client.csr'
        self.SID_FILE = path_to_cert + 'sids'
        self.SID_LOCK = path_to_cert + 'sids.lock'
        self.CRL_PATH = path_to_cert + 'ca/crl/'

        crl_dir_missing = not os.path.exists(self.CRL_PATH)
        if crl_dir_missing:
            os.makedirs(self.CRL_PATH)
class SUDSHTTPRedirectHandler(u2.HTTPRedirectHandler):
    """Redirect handler that keeps the request body across redirects."""

    def redirect_request(self, req, fp, code, msg, headers, newurl):
        """Return a Request or None in response to a redirect.

        This is called by the http_error_30x methods,
        it was taken from the original Python version and modified
        to use POST when redirection takes place.
        This allows a SOAP message to be redirected without a loss
        of content.

        Raises ``HTTPError`` for any code/method combination that must not
        be followed automatically.
        """
        method = req.get_method()
        followable = (
            (code in (301, 302, 303, 307) and method in ("GET", "HEAD"))
            or (code in (301, 302, 303) and method == "POST")
        )
        if not followable:
            raise u2.HTTPError(req.get_full_url(), code, msg, headers, fp)

        newurl = newurl.replace(' ', '%20')
        # body-describing headers are dropped; urllib recomputes them for
        # the new request
        newheaders = {
            k: v for k, v in req.headers.items()
            if k.lower() not in ("content-length", "content-type")
        }
        log.debug("Redirecting to %s", newurl)
        return u2.Request(
            newurl,
            data=req.data,  # here we pass the original data
            headers=newheaders,
            # Request.get_origin_req_host() was removed in Python 3.4;
            # the attribute carries the same value.
            origin_req_host=req.origin_req_host,
            unverifiable=True,
        )
class CheckingClientHTTPSConnection(httplib.HTTPSConnection):
    """based on httplib.HTTPSConnection code

    The connection is attempted against, in order, the user's root
    certificates, the global root certificates and finally the per-host
    trusted server certificates.  When the server is unknown the user is
    asked through AddServerCert.
    """
    response_class = httplib.HTTPResponse
    FORCE_SSL_VERSION = None
    SERVER_CERT_CHECK = True  # might be turned off when a workaround is needed

    def __init__(self, ClientObj, cert_path, host, ca_certs=None,
                 cert_verifier=None, keyobj=None, certobj=None, **kw):
        """cert_verifier is a function returning either True or False
        based on whether the certificate was found to be OK,
        keyobj and certobj represent internal PyOpenSSL structures holding
        the key and certificate respectively.

        ``key_file`` / ``cert_file`` may be given as keyword arguments; they
        are stored on the instance and not forwarded to the base class.
        """
        # http.client.HTTPSConnection no longer accepts key_file/cert_file
        # (removed in Python 3.12).  This class only needs them as attributes
        # for its own TLS setup, so keep them out of the base constructor.
        key_file = kw.pop('key_file', None)
        cert_file = kw.pop('cert_file', None)
        httplib.HTTPSConnection.__init__(self, host, **kw)
        self.key_file = key_file
        self.cert_file = cert_file
        self.ca_certs = ca_certs
        self.cert_verifier = cert_verifier
        self.keyobj = keyobj
        self.certobj = certobj
        self.ClientObj = ClientObj
        self.cert_path = cert_path
        self.CRL_PATH = os.path.join(cert_path, 'ca/crl/')

    @staticmethod
    def _text(raw):
        """Return *raw* as ``str``; pyOpenSSL hands out ``bytes`` on Python 3."""
        if isinstance(raw, bytes):
            return raw.decode('utf-8', 'replace')
        return raw

    def _ssl_socket_kwargs(self, cert_reqs):
        """Keyword arguments shared by both PyOpenSSLSocket call sites."""
        if self.FORCE_SSL_VERSION:
            add = {'ssl_version': self.FORCE_SSL_VERSION}
        else:
            add = {}
        add['cert_reqs'] = cert_reqs
        add['keyobj'] = self.keyobj
        add['certobj'] = self.certobj
        add['keyfile'] = self.key_file
        add['certfile'] = self.cert_file
        return add

    def _check_revocation(self, server_cert, crl_certs):
        """Run the CRL check when a client certificate is configured.

        Returns True when the check was run, False when it was skipped.
        """
        global flag
        if not self.cert_file:
            return False
        f = verify(server_cert, crl_certs, flag)
        if not f:
            # after the first check verify() no longer prints its messages
            flag = 1
        elif f == 1:
            sys.exit()
        return True

    def connect(self):
        """Open the TLS connection, trying the trust sources in order.

        Result codes of the helpers: 0 = connected, 1 = failed,
        3 = server certificate was just added (retry), 4 = user refused.

        :raises Exception: ``Exception(1)`` on connection failure, or an
            exception carrying a message when the server is not trusted.
        """
        sock = socket.create_connection((self.host, self.port), self.timeout)
        if hasattr(self, '_tunnel_host') and self._tunnel_host:
            self.sock = sock
            self._tunnel()

        user_root_cert = self.ClientObj.VarsApi.Get('core.cl_user_root_cert')
        homePath = self.ClientObj.VarsApi.Get('ur_home_path')
        user_root_cert = user_root_cert.replace("~", homePath)
        result_user_root = 1

        while True:
            # 1) user root certificates
            if os.path.exists(user_root_cert):
                result_user_root = self.connect_trusted_root(
                    sock, user_root_cert, self.CRL_PATH)
            if result_user_root == 1:
                # 2) global root certificates
                glob_root_cert = self.ClientObj.VarsApi.Get(
                    'core.cl_glob_root_cert')
                result_root_con = 1
                if os.path.exists(glob_root_cert):
                    sock = socket.create_connection(
                        (self.host, self.port),
                        self.timeout, self.source_address)
                    if self._tunnel_host:
                        self.sock = sock
                        self._tunnel()
                    result_root_con = self.connect_trusted_root(
                        sock, glob_root_cert, self.CRL_PATH)
                if result_root_con == 1:
                    # 3) explicitly trusted server certificates
                    sock = socket.create_connection(
                        (self.host, self.port),
                        self.timeout, self.source_address)
                    if self._tunnel_host:
                        self.sock = sock
                        self._tunnel()
                    result_server_con = self.connect_trusted_server(
                        sock, self.CRL_PATH)
                    if result_server_con in [1, 2]:
                        raise Exception (1)
                    elif result_server_con == 3:
                        # certificate was added by the user: start over
                        continue
                    elif result_server_con == 4:
                        raise Exception (_('This server is not trusted'))
                elif result_root_con == 2:
                    raise Exception (1)
            elif result_user_root == 2:
                raise Exception (1)
            break

    # get filename store cert server
    def cert_list (self, host, ca_certs, server_cert):
        """Return the name of the stored certificate file for *host*.

        *ca_certs* is the "host filename" list file.  The file name is
        returned only when the stored certificate equals *server_cert*;
        otherwise (unknown host, mismatch, read error) None is returned.
        """
        if host == '127.0.0.1':
            host = 'localhost'
        if not os.path.exists(self.trusted_path):
            try:
                os.makedirs(self.trusted_path)
            except OSError:
                pass
        if not os.path.exists(ca_certs):
            open(ca_certs, "w").close()

        filename = None
        try:
            with open(ca_certs) as fd:
                listed = fd.read()
            for line in listed.splitlines():
                words = line.split()
                # first word is the host, second the certificate file
                if len(words) > 1 and words[0] == host:
                    filename = words[1]
            if not filename:
                return None
        except Exception:
            msg = _("Certificate not found on the client's side")
            show_msg (msg)
            return None

        try:
            store_cert = readFile(self.trusted_path + filename)
            if store_cert == server_cert:
                return filename
        except Exception:
            msg = _('Error opening file')
            show_msg (msg + ' %s%s' % (self.trusted_path, filename))
        return None

    def add_all_ca_cert(self, list_ca_certs):
        """Store every certificate of *list_ca_certs* that is not yet known.

        The list is reversed in place so the root certificate comes first.
        Each new certificate is written to ``<cert dir>/ca/<CN>``, appended
        to the user root certificate file and registered in
        ``<cert dir>/ca/cert_list`` as "md5 filename".  Afterwards the CRLs
        are refreshed.  Returns 1 when a certificate has no CN field.
        """
        homePath = self.ClientObj.VarsApi.Get('ur_home_path')
        cl_client_cert_dir = \
            self.ClientObj.VarsApi.Get('core.cl_client_cert_dir')
        cl_client_cert_dir = cl_client_cert_dir.replace("~", homePath)
        root_cert_md5 = cl_client_cert_dir + "/ca/cert_list"
        system_ca_db = self.ClientObj.VarsApi.Get('core.cl_glob_root_cert')
        user_root_cert = \
            self.ClientObj.VarsApi.Get('core.cl_user_root_cert')
        user_root_cert = user_root_cert.replace("~", homePath)

        # so root cert be first, ca after
        list_ca_certs.reverse()
        for cert in list_ca_certs:
            # the files are re-read every time: earlier iterations append
            if os.path.exists(system_ca_db):
                if cert in readFile(system_ca_db):
                    continue
            if os.path.exists(user_root_cert):
                if cert in readFile(user_root_cert):
                    continue

            # hashlib only accepts bytes
            raw_cert = cert.encode('utf-8') if isinstance(cert, str) else cert
            md5sum = hashlib.md5(raw_cert).hexdigest()

            if not os.path.exists(root_cert_md5):
                open(root_cert_md5, "w").close()

            filename = None
            with open(root_cert_md5) as fd:
                registered = fd.read()
            for line in registered.splitlines():
                # file names may contain spaces, so split only once
                words = line.split(' ', 1)
                if len(words) > 1 and words[0] == md5sum:
                    filename = words[1]

            if not filename:
                certobj = OpenSSL.crypto.load_certificate(
                    OpenSSL.SSL.FILETYPE_PEM, cert)
                for key, value in certobj.get_issuer().get_components():
                    if self._text(key) == 'CN':
                        filename = self._text(value)
                if not filename:
                    # nothing is registered for a certificate without CN
                    show_msg (_('Field "CN" not found in the certificate!'))
                    return 1

                with open(root_cert_md5, "a") as fc:
                    fc.write('%s %s\n' % (md5sum, filename))
                with open(cl_client_cert_dir + '/ca/' + filename, 'w') as fd:
                    fd.write(cert)
                with open(user_root_cert, 'a') as fa:
                    fa.write(cert + '\n')
                show_msg (_("Filename = %s") % filename,
                          _('Certificate added'))
            else:
                show_msg (_('A file with the CA certificate now exists'))
        get_CRL(cl_client_cert_dir)

    def add_ca_cert(self, cert, list_ca_certs):
        """Ask the server for its CA certificate and offer to trust it.

        The *cert* argument is ignored: the certificate is always requested
        from the server.  The server answers '1' (invalid server
        certificate) or '2' (no CA certificate); '0' marks a failed
        connection.
        """
        url = 'https://%s:%s/?wsdl' % (self.host, self.port)
        client = Client_suds(
            url,
            transport=HTTPSClientCertTransport(
                None, None, self.cert_path, parent=self.ClientObj))
        client.wsdl.services[0].setlocation(url)

        # the name bound by "except ... as" is cleared when the handler
        # ends, so keep the error for the message shown further down
        connect_error = None
        try:
            cert = client.service.get_ca()
        except u2.URLError as e:
            _print ('client.service.get_ca in client_class Exception')
            connect_error = e
            cert = '0'

        if cert == '1':
            msg = _("Invalid server certificate!")
            show_msg (msg)
            return
        if cert == '2':
            msg = _('CA certificate not found on the server')
            show_msg (msg)
            return
        if cert == '0':
            if connect_error is None:
                connect_error = _("Not connected!")
            show_msg (connect_error, _("Not connected!"))
            return

        try:
            certobj = OpenSSL.crypto.load_certificate(
                OpenSSL.SSL.FILETYPE_PEM, cert)
        except Exception:
            msg = _('Error. Certificate not added to trusted')
            show_msg (msg)
            return

        inf_text = ''
        inf_text += _("Fingerprint = %s") % self._text(certobj.digest('SHA1'))
        inf_text += '\n' + _("Serial Number = %s") \
            % str(certobj.get_serial_number())
        inf_text += '\n' + _("Issuer")
        for key, value in certobj.get_issuer().get_components():
            inf_text += "\n %s : %s" % (self._text(key), self._text(value))
        inf_text += '\n' + _("Subject")
        for key, value in certobj.get_subject().get_components():
            inf_text += "\n %s : %s" % (self._text(key), self._text(value))

        text = _("Add the CA certificate to trusted? ")
        reply = show_question(self.ClientObj.MainWidget, text, inf_text,
                              title = _('Adding the certificate'))
        if reply == qt.QMessageBox.No:
            show_msg (_('Certificate not added to trusted'))
        elif reply == qt.QMessageBox.Yes:
            list_ca_certs.append(cert)
            self.add_all_ca_cert(list_ca_certs)
        return

    # add certificate server in trusted
    def add_server_cert(self, cert):
        """Show the AddServerCert dialog and return its ``flag``."""
        self.add_cert = AddServerCert(self, self.ClientObj, cert)
        self.add_cert.exec_()
        return self.add_cert.flag

    def connect_trusted_root(self, sock, root_cert, crl_certs):
        """Wrap *sock* with certificate verification required.

        Returns 0 when the TLS socket was set up, 1 on any failure.
        """
        # NOTE(review): root_cert is accepted but the socket is created with
        # self.ca_certs -- confirm whether root_cert was meant to be used.
        self.ca_path = self.cert_path + "ca/"
        server_cert = ssl.get_server_certificate(addr=(self.host, self.port))
        if not self._check_revocation(server_cert, crl_certs):
            import time
            time.sleep(0.1)
        try:
            add = self._ssl_socket_kwargs(ssl.CERT_REQUIRED)
            self.sock = PyOpenSSLSocket(sock, ca_certs=self.ca_certs, **add)
            return 0
        except Exception as e:
            # failure is expected for unknown CAs: the caller falls back to
            # the next trust source
            log.debug("Trusted-root TLS setup failed: %s", e)
            return 1

    def _report_ssl_error(self, error):
        """Print an OpenSSL error; a leading (lib, func, reason) tuple is
        written space separated on one line."""
        details = error.args[0] if error.args else None
        if isinstance(details, list) and details \
                and isinstance(details[0], tuple):
            sys.stdout.write(
                ''.join('%s ' % part for part in details[0]) + '\n')
            sys.stdout.flush()
        else:
            _print (error)

    def connect_trusted_server(self, sock, crl_certs):
        """Connect using a server certificate the user trusted earlier.

        Returns 0 on success and 1 on failure.  When no stored certificate
        matches, the AddServerCert dialog is shown and its flag (3 or 4)
        is returned.
        """
        self.trusted_path = self.cert_path + "trusted/"
        ca_cert_list = self.trusted_path + "cert.list"
        server_cert = ssl.get_server_certificate(addr=(self.host, self.port))
        self._check_revocation(server_cert, crl_certs)

        HTTPSClientCertTransport.filename = self.cert_list(
            self.host, ca_cert_list, server_cert)
        if not HTTPSClientCertTransport.filename:
            return self.add_server_cert(server_cert)

        try:
            # the certificate itself was compared byte for byte above, so
            # chain verification is switched off here
            add = self._ssl_socket_kwargs(ssl.CERT_NONE)
            self.sock = PyOpenSSLSocket(sock, ca_certs=self.ca_certs, **add)
            return 0
        except OpenSSL.SSL.Error as e:
            self._report_ssl_error(e)
            HTTPSClientCertTransport.filename = None
            return 1
        except Exception as e:
            _print (e)
            HTTPSClientCertTransport.filename = None
            return 1
class CheckingClientHTTPSHandler(u2.HTTPSHandler):
    """urllib HTTPS handler that opens CheckingClientHTTPSConnection
    connections configured with the client key and certificate."""

    def __init__(self, ClientObj, cert_path, ca_certs=None, cert_verifier=None,
                 client_certfile=None, client_keyfile=None,
                 client_keyobj=None, client_certobj=None,
                 *args, **kw):
        """cert_verifier is a function returning either True or False
        based on whether the certificate was found to be OK"""
        u2.HTTPSHandler.__init__(self, *args, **kw)
        self.ClientObj = ClientObj
        self.cert_path = cert_path
        self.ca_certs = ca_certs
        self.cert_verifier = cert_verifier
        # file names of the client certificate / key
        self.client_certfile = client_certfile
        self.client_keyfile = client_keyfile
        # already loaded key / certificate objects
        self.keyobj = client_keyobj
        self.certobj = client_certobj
        # FOR DEBUG
        # self.set_http_debuglevel(100)

    def https_open(self, req):
        """Open *req* through a connection carrying this handler's settings."""
        def make_connection(*args, **kw):
            # values supplied by urllib take precedence over the defaults
            settings = {
                'ca_certs': self.ca_certs,
                'cert_verifier': self.cert_verifier,
                'cert_file': self.client_certfile,
                'key_file': self.client_keyfile,
                'keyobj': self.keyobj,
                'certobj': self.certobj,
            }
            settings.update(kw)
            return CheckingClientHTTPSConnection(
                self.ClientObj, self.cert_path, *args, **settings)
        return self.do_open(make_connection, req)

    https_request = u2.AbstractHTTPHandler.do_request_
class HTTPSClientCertTransport(HttpTransport):
    """suds transport that can present a client certificate over HTTPS.

    When a key/certificate pair (or ``ca_certs``) is available the requests
    go through CheckingClientHTTPSHandler, otherwise through the stock
    urllib HTTPS handler.
    """

    # Name of the stored server certificate file; assigned on the class by
    # CheckingClientHTTPSConnection.connect_trusted_server.
    filename = None

    def __init__(self, key, cert, path_to_cert, parent=None, password = None,
                 ca_certs=None, cert_verifier=None,
                 client_keyfile=None, client_certfile=None,
                 client_keyobj=None, client_certobj=None,
                 cookie_callback=None, user_agent_string=None,
                 **kwargs):
        """Set up the opener.

        :param key: path of the client private key (PEM) or a falsy value.
        :param cert: path of the client certificate (PEM); read when *key*
            is given.
        :param path_to_cert: client certificate directory.
        :param parent: application object handed on to the HTTPS handler;
            optional so that plain (certificate-less) transports can be
            created without one.
        :param password: passphrase of the private key, ``str`` or ``bytes``.
        :param kwargs: extra suds transport options.
        :raises OpenSSL.crypto.Error: when no password is given but the key
            cannot be read without one.
        """
        Transport.__init__(self)
        self.ClientObj = parent
        self.key = key
        self.cert = cert
        self.cert_path = path_to_cert

        if key:
            with open(cert) as cert_file:
                client_certobj = OpenSSL.crypto.load_certificate(
                    OpenSSL.SSL.FILETYPE_PEM, cert_file.read())
            if password:
                # pyOpenSSL only accepts a byte string (or a callable) as
                # passphrase
                if isinstance(password, str):
                    passphrase = password.encode('utf-8')
                else:
                    passphrase = password
                with open(key) as key_file:
                    client_keyobj = OpenSSL.crypto.load_privatekey(
                        OpenSSL.SSL.FILETYPE_PEM, key_file.read(),
                        passphrase)
            else:
                # Probe the key with an empty passphrase first so that an
                # encrypted key fails here instead of prompting.
                import M2Crypto
                bio = M2Crypto.BIO.openfile(key)
                rsa = M2Crypto.m2.rsa_read_key(bio._ptr(),
                                               lambda *unused: b"")
                if not rsa:
                    raise OpenSSL.crypto.Error
                with open(key) as key_file:
                    client_keyobj = OpenSSL.crypto.load_privatekey(
                        OpenSSL.SSL.FILETYPE_PEM, key_file.read())

        Unskin(self.options).update(kwargs)
        self.cookiejar = CookieJar(DefaultCookiePolicy())
        self.cookie_callback = cookie_callback
        self.user_agent_string = user_agent_string
        log.debug("Proxy: %s", self.options.proxy)

        if ca_certs or (client_keyfile and client_certfile) \
                or (client_keyobj and client_certobj):
            https_handler = CheckingClientHTTPSHandler(
                parent,
                cert_path=path_to_cert,
                ca_certs=ca_certs,
                cert_verifier=cert_verifier,
                client_keyfile=client_keyfile,
                client_certfile=client_certfile,
                client_keyobj=client_keyobj,
                client_certobj=client_certobj)
        else:
            https_handler = u2.HTTPSHandler()

        self.urlopener = u2.build_opener(
            SUDSHTTPRedirectHandler(),
            u2.HTTPCookieProcessor(self.cookiejar),
            https_handler)

        # relic from old times: proxy and proxy-auth handlers used to be
        # obtained from dslib's ProxyManager here.  Both evaluated to None,
        # so no proxy handler is installed.  If this is ever needed, use
        # urllib's ProxyHandler / ProxyBasicAuthHandler and add them with
        # self.urlopener.add_handler().
        self.urlopener.addheaders = [('User-agent',
                                      str(self.user_agent_string))]
###############################################################################
def get_CRL(path_to_cert):
    """ get new CRL (Certificate Revocation List) from all CA

    For every CA certificate found in the user and global root certificate
    files, the CA server named in the certificate's L field is asked for its
    CRL, which is stored below ``<path_to_cert>/ca/crl/``.  Afterwards CA
    certificates that are themselves revoked are removed from the trusted
    ones.  The process exits when a directory cannot be created or a
    VerifyError is raised while contacting a CA.
    """
    def as_text(raw):
        # pyOpenSSL returns name components as bytes on Python 3
        if isinstance(raw, bytes):
            return raw.decode('utf-8', 'replace')
        return raw

    # local CRL
    CRL_path = os.path.join(path_to_cert, 'ca/crl/')
    if not os.path.exists(CRL_path):
        ca_dir = os.path.join(path_to_cert, 'ca')
        if not os.path.exists(ca_dir):
            if not os.path.exists(path_to_cert):
                try:
                    os.makedirs(path_to_cert)
                except OSError:
                    _print (_("Error creating directory %s") % path_to_cert)
                    sys.exit()
            try:
                os.makedirs(ca_dir)
            except OSError:
                _print (_("Error creating directory %s") % (ca_dir))
                sys.exit()
        os.makedirs(CRL_path)

    clVars = DataVarsCore()
    clVars.importCore()
    clVars.flIniFile()

    # user and system ca and root certificates
    user_root_cert = clVars.Get('core.cl_user_root_cert')
    homePath = clVars.Get('ur_home_path')
    user_root_cert = user_root_cert.replace("~", homePath)
    glob_root_cert = clVars.Get('core.cl_glob_root_cert')

    if os.path.exists(user_root_cert):
        user_ca_certs = readFile(user_root_cert)
    else:
        user_ca_certs = ''
    if os.path.exists(glob_root_cert):
        glob_ca_certs = readFile(glob_root_cert)
    else:
        glob_ca_certs = ''

    # get certificates list from text
    p = re.compile(r'[-]+[\w ]+[-]+\n+[\w\n+=/]+[-]+[\w ]+[-]{5}\n?')
    user_ca_certs_list = p.findall(user_ca_certs)
    glob_ca_certs_list = p.findall(glob_ca_certs)

    # association in one list
    all_ca_certs_list = user_ca_certs_list + glob_ca_certs_list
    for ca in all_ca_certs_list:
        certobj = OpenSSL.crypto.load_certificate(
            OpenSSL.SSL.FILETYPE_PEM, ca)

        # get url from certificates
        location = None
        CN = None
        for key, value in certobj.get_subject().get_components():
            key = as_text(key)
            if key == 'L':
                location = as_text(value)
            elif key == 'CN':
                CN = as_text(value)
        if not location:
            continue
        url = "https://" + location + "/?wsdl"

        # connect to ca server (url get from certificates); new_crl is reset
        # for every CA so a failure never reuses the previous CA's list
        new_crl = None
        try:
            client = Client_suds(
                url,
                transport=HTTPSClientCertTransport(
                    None, None, path_to_cert, parent=None))
            client.set_parameters (path_to_cert, None, None)
            new_crl = client.service.get_crl()
        except VerifyError as e:
            _print (e.value)
            sys.exit()
        except Exception as e:
            # best effort: an unreachable CA must not stop the others
            log.debug("CRL request to %s failed: %s", url, e)
        if not new_crl:
            continue

        if CN and len(CN) > 2:
            CRL_file = CRL_path + CN
        else:
            CRL_file = CRL_path + location.split(':')[0]

        if new_crl == ' ':
            # a single blank means "empty list": keep an empty CRL file
            open(CRL_file, 'w').close()
            continue
        if os.path.exists(CRL_file):
            if readFile(CRL_file) == new_crl:
                continue

        with open(CRL_file, 'w') as fd:
            fd.write(new_crl)
        _print (_("CRL added"))
    find_ca_in_crl(CRL_path, all_ca_certs_list)
def find_ca_in_crl(CRL_path, all_ca_certs_list):
    """Remove every CA certificate whose serial is listed in its issuer's CRL.

    :param CRL_path: directory prefix (with trailing separator) holding one
        CRL file per issuer CN.
    :param all_ca_certs_list: PEM texts of the CA certificates to check.
    """
    for ca in all_ca_certs_list:
        certobj = OpenSSL.crypto.load_certificate(
            OpenSSL.SSL.FILETYPE_PEM, ca)

        # CN is looked up per certificate; pyOpenSSL yields bytes on Python 3
        CN = None
        for key, value in certobj.get_issuer().get_components():
            if isinstance(key, bytes):
                key = key.decode('utf-8', 'replace')
            if key == 'CN':
                if isinstance(value, bytes):
                    value = value.decode('utf-8', 'replace')
                CN = value
        if not CN:
            # no issuer CN means no CRL file can be named for it
            continue

        serverSerial = certobj.get_serial_number()
        CRL = CRL_path + CN
        if not os.path.exists(CRL):
            continue

        with open(CRL, 'r') as _crl_file:
            crl = _crl_file.read()
        try:
            crl_object = OpenSSL.crypto.load_crl(
                OpenSSL.crypto.FILETYPE_PEM, crl)
        except Exception:
            # unreadable or empty CRL file: nothing to check against
            continue

        # get_revoked() yields None for a CRL without entries
        for rvk in crl_object.get_revoked() or ():
            if serverSerial == int(rvk.get_serial(), 16):
                rm_ca_from_trusted(ca)
                break
def rm_ca_from_trusted(ca_cert):
    """Remove a CA certificate from the user and system trusted stores.

    The certificate file registered in ``<cert dir>/ca/cert_list`` under the
    certificate's md5 sum is deleted and its entry dropped from the list,
    then the certificate text is removed from the user and the system root
    certificate files.

    :param ca_cert: PEM text of the CA certificate.
    :returns: 0
    """
    clVars = DataVarsCore()
    clVars.importCore()
    clVars.flIniFile()

    homePath = clVars.Get('ur_home_path')
    user_ca_dir = clVars.Get('core.cl_client_cert_dir')
    user_ca_dir = user_ca_dir.replace("~", homePath)
    user_ca_dir = os.path.join(user_ca_dir, 'ca')
    user_ca_list = os.path.join(user_ca_dir, 'cert_list')
    user_ca_db = clVars.Get('core.cl_user_root_cert')
    user_ca_db = user_ca_db.replace("~", homePath)
    # NOTE(review): read without the "core." prefix used elsewhere in this
    # file -- confirm both spellings resolve to the same variable.
    system_ca_db = clVars.Get('cl_glob_root_cert')

    import hashlib
    # hashlib only accepts bytes
    raw_cert = ca_cert.encode('utf-8') if isinstance(ca_cert, str) else ca_cert
    md5sum = hashlib.md5(raw_cert).hexdigest()

    # search ca certificate in user ca list
    with open(user_ca_list) as fd:
        entries = fd.read().splitlines()
    kept_lines = []
    for line in entries:
        # entries are "md5 filename"; the file name may contain spaces
        words = line.split(' ', 1)
        if len(words) > 1 and words[0] == md5sum:
            filename = os.path.join(user_ca_dir, words[1])
            if ca_cert == readFile(filename):
                os.unlink(filename)
                continue
        kept_lines.append(line + '\n')
    with open(user_ca_list, 'w') as fn:
        fn.write(''.join(kept_lines))

    p = re.compile(r'[-]+[\w ]+[-]+\n+[\w\n+=/]+[-]+[\w ]+[-]+\n?')

    # open, write and split user ca certificates
    user_ca_certs = readFile(user_ca_db)
    user_ca_certs_list = p.findall(user_ca_certs)
    if ca_cert in user_ca_certs_list:
        new_user_ca_certs = []
        for cert in user_ca_certs_list:
            if ca_cert != cert:
                new_user_ca_certs.append(cert)
            else:
                _print (_("CA certificate removed from user trusted"))
        with open(user_ca_db, 'w') as fd:
            fd.writelines(new_user_ca_certs)

    if not os.path.exists(system_ca_db):
        open(system_ca_db, 'w').close()
    system_ca_certs = readFile(system_ca_db)
    system_ca_certs_list = p.findall(system_ca_certs)
    if ca_cert in system_ca_certs_list:
        new_system_ca_certs = []
        for cert in system_ca_certs_list:
            if ca_cert != cert:
                new_system_ca_certs.append(cert)
            else:
                _print (_("CA certificate removed from system trusted"))
        with open(system_ca_db, 'w') as fd:
            fd.writelines(new_system_ca_certs)
    return 0