You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
951 lines
36 KiB
951 lines
36 KiB
#-*- coding: utf-8 -*-
|
|
|
|
# Copyright 2012-2016 Mir Calculate. http://www.calculate-linux.org
|
|
#
|
|
# Licensed under the Apache License, Version 2.0 (the "License");
|
|
# you may not use this file except in compliance with the License.
|
|
# You may obtain a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
# See the License for the specific language governing permissions and
|
|
# limitations under the License.
|
|
|
|
from __future__ import absolute_import
|
|
from calculate.consolegui import qt
|
|
import urllib.request as u2
|
|
if hasattr(u2,"ssl"):
|
|
u2.ssl._create_default_https_context = u2.ssl._create_unverified_context
|
|
import os, re, sys
|
|
|
|
|
|
import calculate.contrib
|
|
from suds.transport.http import HttpTransport
|
|
|
|
# try:
|
|
from .pyopenssl_wrapper import PyOpenSSLSocket
|
|
import http.client as httplib
|
|
from suds.transport import Transport
|
|
from suds.properties import Unskin
|
|
from http.cookiejar import CookieJar, DefaultCookiePolicy
|
|
|
|
import socket, ssl
|
|
import OpenSSL, hashlib
|
|
from suds.client import Client
|
|
from logging import getLogger
|
|
# from calculate.core.client.cert_verify import verify, VerifyError
|
|
from calculate.lib.utils.files import readFile
|
|
|
|
from .more import show_msg, show_question, LabelWordWrap
|
|
|
|
flag = 0
|
|
log = getLogger(__name__)
|
|
|
|
# #TODO fix translation for verify
|
|
# _ = lambda x: x
|
|
# from calculate.lib.cl_lang import setLocalTranslate
|
|
# setLocalTranslate('cl_core3', sys.modules["__name__"])
|
|
|
|
from calculate.core.datavars import DataVarsCore
|
|
|
|
class VerifyError(Exception):
|
|
def __init__(self, value):
|
|
self.value = value
|
|
|
|
def __str__(self):
|
|
return repr(self.value)
|
|
|
|
|
|
# check recall of server certificate
|
|
def verify(server_cert, crl_path, flag):
|
|
import OpenSSL
|
|
certobj = OpenSSL.crypto.load_certificate(OpenSSL.SSL.FILETYPE_PEM,
|
|
server_cert)
|
|
serverSerial = certobj.get_serial_number()
|
|
Issuer = certobj.get_issuer().get_components()
|
|
CN, L = None, None
|
|
for i in Issuer:
|
|
if i[0] == 'CN':
|
|
CN = i[1]
|
|
elif i[0] == 'L':
|
|
L = i[1]
|
|
if CN and len(CN) > 2:
|
|
crl_file = crl_path + CN
|
|
elif L:
|
|
try:
|
|
host = L.split(':')[0]
|
|
except Exception:
|
|
if not flag:
|
|
print(_('The CN and L fields in the CA certificate are '
|
|
'incorrect!'))
|
|
return 0
|
|
crl_file = crl_path + host
|
|
else:
|
|
if not flag:
|
|
print(_('The CN and L fields in the CA certificate are incorrect!'))
|
|
return 0
|
|
|
|
if not os.path.exists(crl_file):
|
|
if not flag:
|
|
print(_("This certificate cannot be verified in the CRL."))
|
|
return 0
|
|
|
|
with open(crl_file, 'r') as _crl_file:
|
|
crl = "".join(_crl_file.readlines())
|
|
|
|
if crl == '':
|
|
return 0
|
|
crl_object = OpenSSL.crypto.load_crl(OpenSSL.crypto.FILETYPE_PEM, crl)
|
|
|
|
revoked_objects = crl_object.get_revoked()
|
|
|
|
for rvk in revoked_objects:
|
|
if serverSerial == int(rvk.get_serial(), 16):
|
|
print(_("This certificate has been revoked!"))
|
|
print (_("Serial") + ': %s\n' % rvk.get_serial() +
|
|
_("Revoke date") + _(': %s') % rvk.get_rev_date())
|
|
|
|
raise VerifyError('CRL Exception')
|
|
return 0
|
|
|
|
class AddServerCert (qt.QDialog):
|
|
def __init__(self, parent, ClientObj, cert):
|
|
super().__init__()
|
|
self.ClientObj = ClientObj
|
|
self.parent = parent
|
|
self.cert = cert
|
|
|
|
self.grid = qt.QGridLayout(self)
|
|
self.lbl_list = []
|
|
|
|
self.grid.addWidget(LabelWordWrap(_('Untrusted Server Certificate!'), \
|
|
self), 0, 1, 1, 2)
|
|
|
|
certobj = OpenSSL.crypto.load_certificate \
|
|
(OpenSSL.SSL.FILETYPE_PEM, cert)
|
|
self.grid.addWidget(LabelWordWrap (_('Fingerprint = %s') \
|
|
% certobj.digest('SHA1'), self), 1, 0, 1, 3)
|
|
self.grid.addWidget(LabelWordWrap (_('Serial Number = %s') \
|
|
% certobj.get_serial_number(), self), 2, 0, 1, 3)
|
|
|
|
self.tab = qt.QTabWidget(self)
|
|
# add Issuer tab
|
|
self.issuer_wgt = qt.QWidget(self)
|
|
self.issuer_layout = qt.QVBoxLayout()
|
|
|
|
Issuer = certobj.get_issuer().get_components()
|
|
for i in Issuer:
|
|
self.issuer_layout.addWidget(LabelWordWrap \
|
|
("%s : %s" %(i[0], i[1]),self))
|
|
|
|
self.issuer_wgt.setLayout(self.issuer_layout)
|
|
self.tab.addTab(self.issuer_wgt, _('Issuer'))
|
|
|
|
# add Subject tab
|
|
self.subject_wgt = qt.QWidget(self)
|
|
self.subject_layout = qt.QVBoxLayout()
|
|
|
|
Subject = certobj.get_subject().get_components()
|
|
for item in Subject:
|
|
self.subject_layout.addWidget(LabelWordWrap \
|
|
("%s : %s" %(item[0], item[1]),self))
|
|
|
|
self.subject_wgt.setLayout(self.subject_layout)
|
|
self.tab.addTab(self.subject_wgt, _('Subject'))
|
|
|
|
# add certificate
|
|
# self.cert_lbl = LabelWordWrap (certobj,self)
|
|
# self.tab.addTab(self.cert_lbl, 'Certificate')
|
|
|
|
self.grid.addWidget(self.tab, 3, 0, 3, 3)
|
|
|
|
self.lbl_list.append(LabelWordWrap \
|
|
(_("Add this server certificate to trusted or ") +\
|
|
_('try adding the CA and root certificates to trusted?'),self))
|
|
|
|
self.pix_lbl = qt.QLabel(self)
|
|
pi = qt.QPixmap()
|
|
pi.load('/usr/share/icons/oxygen/48x48/status/security-medium.png')
|
|
self.pix_lbl.setPixmap(pi)
|
|
self.grid.addWidget(self.pix_lbl, 0,0)
|
|
|
|
for num_lbl in range(len(self.lbl_list)):
|
|
self.grid.addWidget(self.lbl_list[num_lbl], num_lbl + 8, 0, 1, 3)
|
|
|
|
x = len (self.lbl_list) + 8
|
|
self.server_but = qt.QPushButton(_('Add the server certificate'),
|
|
self)
|
|
self.server_but.clicked.connect(self.add_server)
|
|
self.server_but.clicked.connect(self.close)
|
|
self.grid.addWidget(self.server_but, x, 0)
|
|
|
|
self.ca_but = qt.QPushButton(_("Add the CA and root certificates"),
|
|
self)
|
|
self.ca_but.clicked.connect(self.add_ca)
|
|
self.ca_but.clicked.connect(self.add_server)
|
|
self.ca_but.clicked.connect(self.close)
|
|
self.grid.addWidget(self.ca_but, x, 1)
|
|
|
|
self.cancel_but = qt.QPushButton(_('Cancel'), self)
|
|
self.cancel_but.clicked.connect(self.close)
|
|
self.grid.addWidget(self.cancel_but, x, 2)
|
|
|
|
# self.setLayout(self.grid)
|
|
self.setWindowTitle (_('Add the certificate to trusted'))
|
|
|
|
# move to center
|
|
prim_screen = ClientObj.app.desktop().primaryScreen()
|
|
|
|
x = ClientObj.app.desktop().screenGeometry(prim_screen).width()/2 - \
|
|
self.sizeHint().width()/2
|
|
y = ClientObj.app.desktop().screenGeometry(prim_screen).height()/2 - \
|
|
self.sizeHint().height()/2
|
|
self.move(x, y)
|
|
self.setFixedSize(self.sizeHint())
|
|
|
|
self.setWindowModality(qt.Qt.WindowModal)
|
|
self.flag = 4
|
|
|
|
def add_server(self):
|
|
ca_certs = self.parent.trusted_path + "cert.list"
|
|
|
|
if not os.path.exists(ca_certs):
|
|
open(ca_certs,"w").close()
|
|
|
|
if self.parent.host == '127.0.0.1':
|
|
host = 'localhost'
|
|
else:
|
|
host = self.parent.host
|
|
filename = host
|
|
with open(self.parent.trusted_path + filename,"w") as fc:
|
|
fc.write(self.cert)
|
|
with open(ca_certs) as fd:
|
|
t = fd.read()
|
|
# for each line
|
|
for line in t.splitlines():
|
|
# Split string into a words list
|
|
words = line.split()
|
|
if len(words) > 1:
|
|
# if first word...
|
|
if words[0] == host:
|
|
self.flag = 3
|
|
return 0
|
|
|
|
# Open file with compliance server certificates and server hostname
|
|
with open(ca_certs,"a") as fcl:
|
|
fcl.write(host + ' ' + filename + '\n')
|
|
show_msg (_('Server certificate added to trusted') +'\n'+ \
|
|
(self.parent.trusted_path + filename),_('Certificate added'))
|
|
self.flag = 3
|
|
from .conf_connection import FrameConnection
|
|
self.ConnectWidget = FrameConnection(self, self.ClientObj)
|
|
self.ConnectWidget.connect_to_host(host, self.ClientObj.port)
|
|
|
|
def add_ca(self):
|
|
cl_client_cert_dir = \
|
|
self.ClientObj.VarsApi.Get('core.cl_client_cert_dir')
|
|
homePath = self.ClientObj.VarsApi.Get('ur_home_path')
|
|
cl_client_cert_dir = cl_client_cert_dir.replace("~",homePath)
|
|
root_cert_dir = cl_client_cert_dir + "/ca"
|
|
|
|
if not os.path.exists(root_cert_dir):
|
|
try:
|
|
os.makedirs(root_cert_dir)
|
|
except OSError:
|
|
show_msg (_('Error creating directory %s') %root_cert_dir)
|
|
return 1
|
|
|
|
self.parent.list_ca_certs = []
|
|
self.parent.add_ca_cert(self.cert, self.parent.list_ca_certs)
|
|
|
|
###############################################################################
|
|
|
|
class Client_suds(Client):
|
|
def set_parameters (self, path_to_cert, CERT_FILE, PKEY_FILE):
|
|
self.path_to_cert = path_to_cert
|
|
if not CERT_FILE:
|
|
CERT_FILE = ''
|
|
self.CERT_FILE = CERT_FILE
|
|
self.REQ_FILE = path_to_cert + 'client.csr'
|
|
self.PKEY_FILE = PKEY_FILE
|
|
self.SID_FILE = path_to_cert + 'sids'
|
|
self.SID_LOCK = path_to_cert + 'sids.lock'
|
|
self.CRL_PATH = path_to_cert + 'ca/crl/'
|
|
if not os.path.exists(self.CRL_PATH):
|
|
os.makedirs(self.CRL_PATH)
|
|
|
|
class SUDSHTTPRedirectHandler(u2.HTTPRedirectHandler):
|
|
def redirect_request(self, req, fp, code, msg, headers, newurl):
|
|
"""Return a Request or None in response to a redirect.
|
|
|
|
This is called by the http_error_30x methods,
|
|
it was taken from the original Python version and modified
|
|
to use POST when redirection takes place.
|
|
This allows a SOAP message to be redirected without a loss
|
|
of content.
|
|
"""
|
|
m = req.get_method()
|
|
if (code in (301, 302, 303, 307) and m in ("GET", "HEAD")
|
|
or code in (301, 302, 303) and m == "POST"):
|
|
newurl = newurl.replace(' ', '%20')
|
|
newheaders = dict((k,v) for k,v in req.headers.items()
|
|
if k.lower() not in ("content-length", "content-type")
|
|
)
|
|
log.debug("Redirecting to %s", newurl)
|
|
return u2.Request(newurl,
|
|
data=req.data, # here we pass the original data
|
|
headers=newheaders,
|
|
origin_req_host=req.get_origin_req_host(),
|
|
unverifiable=True,
|
|
)
|
|
else:
|
|
raise u2.HTTPError(req.get_full_url(), code, msg, headers, fp)
|
|
|
|
class CheckingClientHTTPSConnection(httplib.HTTPSConnection):
|
|
"""based on httplib.HTTPSConnection code"""
|
|
response_class = httplib.HTTPResponse
|
|
|
|
FORCE_SSL_VERSION = None
|
|
SERVER_CERT_CHECK = True # might be turned off when a workaround is needed
|
|
|
|
def __init__(self, ClientObj, cert_path, host, ca_certs=None,
|
|
cert_verifier=None, keyobj=None, certobj=None, **kw):
|
|
"""cert_verifier is a function returning either True or False
|
|
based on whether the certificate was found to be OK,
|
|
keyobj and certobj represent internal PyOpenSSL structures holding
|
|
the key and certificate respectively.
|
|
"""
|
|
httplib.HTTPSConnection.__init__(self, host, **kw)
|
|
self.ca_certs = ca_certs
|
|
self.cert_verifier = cert_verifier
|
|
self.keyobj = keyobj
|
|
self.certobj = certobj
|
|
self.ClientObj = ClientObj
|
|
self.cert_path = cert_path
|
|
self.CRL_PATH = os.path.join(cert_path, 'ca/crl/')
|
|
|
|
def connect(self):
|
|
# print("DEBUG connect checkClientHTTPS")
|
|
sock = socket.create_connection((self.host, self.port), self.timeout)
|
|
if hasattr(self, '_tunnel_host') and self._tunnel_host:
|
|
# print("Tunnel connection")
|
|
self.sock = sock
|
|
self._tunnel()
|
|
|
|
user_root_cert = self.ClientObj.VarsApi.Get('core.cl_user_root_cert')
|
|
# print(f"user_root_cert : \n {user_root_cert}")
|
|
homePath = self.ClientObj.VarsApi.Get('ur_home_path')
|
|
user_root_cert = user_root_cert.replace("~",homePath)
|
|
# print(f"user_root_cert2 : \n {user_root_cert}")
|
|
result_user_root = 1
|
|
|
|
while True:
|
|
if os.path.exists(user_root_cert):
|
|
result_user_root = self.connect_trusted_root(sock,
|
|
user_root_cert, self.CRL_PATH)
|
|
# print(f"result_user_root : \n {result_user_root}")
|
|
if result_user_root == 1:
|
|
glob_root_cert = self.ClientObj.VarsApi.Get(
|
|
'core.cl_glob_root_cert')
|
|
# print(f"glob_root_cert : \n {glob_root_cert}")
|
|
result_root_con = 1
|
|
if os.path.exists(glob_root_cert):
|
|
sock = socket.create_connection((self.host, self.port),
|
|
self.timeout, self.source_address)
|
|
if self._tunnel_host:
|
|
self.sock = sock
|
|
self._tunnel()
|
|
result_root_con = self.connect_trusted_root(sock,
|
|
glob_root_cert, self.CRL_PATH)
|
|
# print(f"result_root_con : \n {result_root_con}")
|
|
if result_root_con == 1:
|
|
sock = socket.create_connection((self.host, self.port),
|
|
self.timeout, self.source_address)
|
|
if self._tunnel_host:
|
|
self.sock = sock
|
|
self._tunnel()
|
|
result_server_con = self.connect_trusted_server(
|
|
sock, self.CRL_PATH)
|
|
if result_server_con in [1,2]:
|
|
raise Exception (1)
|
|
elif result_server_con == 3:
|
|
continue
|
|
elif result_server_con == 4:
|
|
raise Exception (_('This server is not trusted'))
|
|
elif result_root_con == 2:
|
|
raise Exception (1)
|
|
elif result_user_root == 2:
|
|
raise Exception (1)
|
|
break
|
|
|
|
# get filename store cert server
|
|
def cert_list (self, host, ca_certs, server_cert):
|
|
if host == '127.0.0.1':
|
|
host = 'localhost'
|
|
if not os.path.exists(self.trusted_path):
|
|
try:
|
|
os.makedirs(self.trusted_path)
|
|
except OSError:
|
|
pass
|
|
if not os.path.exists(ca_certs):
|
|
open(ca_certs,"w").close()
|
|
filename = None
|
|
try:
|
|
with open(ca_certs) as fd:
|
|
t = fd.read()
|
|
# for each line
|
|
for line in t.splitlines():
|
|
# Split string into a words list
|
|
words = line.split()
|
|
if len(words) > 1:
|
|
# if first word...
|
|
if words[0] == host:
|
|
filename = words[1]
|
|
if not filename:
|
|
return None
|
|
except:
|
|
msg = _("Certificate not found on the client's side")
|
|
show_msg (msg)
|
|
# self.parent.MainWidget.bottom.addMessage(msg)
|
|
return None
|
|
try:
|
|
store_cert = readFile(self.trusted_path + filename)
|
|
if store_cert == server_cert:
|
|
return filename
|
|
except:
|
|
msg = _('Error opening file')
|
|
show_msg (msg + ' %s%s' %(self.trusted_path, filename))
|
|
# self.parent.MainWidget.bottom.addMessage(msg)
|
|
return None
|
|
|
|
def add_all_ca_cert(self, list_ca_certs):
|
|
# so root cert be first, ca after
|
|
homePath = self.ClientObj.VarsApi.Get('ur_home_path')
|
|
cl_client_cert_dir = \
|
|
self.ClientObj.VarsApi.Get('core.cl_client_cert_dir')
|
|
|
|
cl_client_cert_dir = cl_client_cert_dir.replace("~",homePath)
|
|
root_cert_md5 = cl_client_cert_dir + "/ca/cert_list"
|
|
|
|
list_ca_certs.reverse()
|
|
for cert in list_ca_certs:
|
|
system_ca_db = \
|
|
self.ClientObj.VarsApi.Get('core.cl_glob_root_cert')
|
|
if os.path.exists(system_ca_db):
|
|
if cert in readFile(system_ca_db):
|
|
continue
|
|
|
|
user_root_cert = \
|
|
self.ClientObj.VarsApi.Get('core.cl_user_root_cert')
|
|
user_root_cert = user_root_cert.replace("~",homePath)
|
|
if os.path.exists(user_root_cert):
|
|
if cert in readFile(user_root_cert):
|
|
continue
|
|
|
|
md5 = hashlib.md5()
|
|
md5.update(cert)
|
|
md5sum = md5.hexdigest()
|
|
|
|
if not os.path.exists(root_cert_md5):
|
|
open(root_cert_md5,"w").close()
|
|
|
|
filename = None
|
|
with open(root_cert_md5) as fd:
|
|
t = fd.read()
|
|
# for each line
|
|
for line in t.splitlines():
|
|
# Split string into a words list
|
|
words = line.split(' ',1)
|
|
if words[0] == md5sum:
|
|
filename = words[1]
|
|
if not filename:
|
|
certobj = OpenSSL.crypto.load_certificate \
|
|
(OpenSSL.SSL.FILETYPE_PEM, cert)
|
|
Issuer = certobj.get_issuer().get_components()
|
|
for item in Issuer:
|
|
if item[0] == 'CN':
|
|
filename = item[1]
|
|
|
|
with open(root_cert_md5,"a") as fc:
|
|
fc.write('%s %s\n' %(md5sum, filename))
|
|
|
|
if not filename:
|
|
show_msg (_('Field "CN" not found in the certificate!'))
|
|
return 1
|
|
|
|
with open(cl_client_cert_dir + '/ca/' + filename, 'w') as fd:
|
|
fd.write(cert)
|
|
|
|
with open(user_root_cert, 'a') as fa:
|
|
fa.write(cert+'\n')
|
|
show_msg (_("Filename = %s") %filename, _('Certificate added'))
|
|
else:
|
|
show_msg (_('A file with the CA certificate now exists'))
|
|
get_CRL(cl_client_cert_dir)
|
|
|
|
def add_ca_cert(self, cert, list_ca_certs):
|
|
url = 'https://%s:%s/?wsdl' %(self.host, self.port)
|
|
client = Client_suds(url, \
|
|
transport = HTTPSClientCertTransport(None,None, \
|
|
self.cert_path, parent = self.ClientObj))
|
|
client.wsdl.services[0].setlocation(url)
|
|
try:
|
|
cert = client.service.get_ca()
|
|
except u2.URLError as e:
|
|
_print ('client.service.get_ca in client_class Exception')
|
|
cert = '0'
|
|
|
|
if cert == '1':
|
|
msg = _("Invalid server certificate!")
|
|
show_msg (msg)
|
|
return
|
|
|
|
if cert == '2':
|
|
msg = _('CA certificate not found on the server')
|
|
show_msg (msg)
|
|
return
|
|
|
|
if cert == '0':
|
|
show_msg (e, _("Not connected!"))
|
|
return
|
|
|
|
try:
|
|
certobj = OpenSSL.crypto.load_certificate \
|
|
(OpenSSL.SSL.FILETYPE_PEM, cert)
|
|
except:
|
|
msg = _('Error. Certificate not added to trusted')
|
|
show_msg (msg)
|
|
# self.parent.MainWidget.bottom.addMessage(msg)
|
|
return
|
|
|
|
inf_text = ''
|
|
inf_text += _("Fingerprint = %s") % certobj.digest('SHA1')
|
|
inf_text += '\n'+_("Serial Number = %s") \
|
|
%str(certobj.get_serial_number())
|
|
Issuer = certobj.get_issuer().get_components()
|
|
inf_text += '\n'+_("Issuer")
|
|
for i in Issuer:
|
|
inf_text += "\n %s : %s" %(i[0], i[1])
|
|
Subject = certobj.get_subject().get_components()
|
|
inf_text += '\n'+_("Subject")
|
|
for subj in Subject:
|
|
inf_text += "\n %s : %s" %(subj[0], subj[1])
|
|
|
|
text = _("Add the CA certificate to trusted? ")
|
|
reply = show_question(self.ClientObj.MainWidget, text, inf_text,
|
|
title = _('Adding the certificate'))
|
|
|
|
if reply == qt.QMessageBox.No:
|
|
show_msg (_('Certificate not added to trusted'))
|
|
|
|
elif reply == qt.QMessageBox.Yes:
|
|
list_ca_certs.append(cert)
|
|
self.add_all_ca_cert(list_ca_certs)
|
|
return
|
|
|
|
# add certificate server in trusted
|
|
def add_server_cert(self, cert):
|
|
self.add_cert = AddServerCert(self, self.ClientObj, cert)
|
|
self.add_cert.exec_()
|
|
return self.add_cert.flag
|
|
|
|
def connect_trusted_root(self, sock, root_cert, crl_certs):
|
|
self.ca_path = self.cert_path + "ca/"
|
|
server_cert = ssl.get_server_certificate(addr = (self.host, self.port))
|
|
global flag
|
|
|
|
if self.cert_file:
|
|
f = verify(server_cert, crl_certs, flag)
|
|
if not f:
|
|
flag = 1
|
|
elif f == 1:
|
|
sys.exit()
|
|
else:
|
|
import time
|
|
time.sleep(0.1)
|
|
|
|
try:
|
|
if self.FORCE_SSL_VERSION:
|
|
add = {'ssl_version': self.FORCE_SSL_VERSION}
|
|
else:
|
|
add = {}
|
|
add['cert_reqs'] = ssl.CERT_REQUIRED
|
|
add['keyobj'] = self.keyobj
|
|
add['certobj'] = self.certobj
|
|
add['keyfile'] = self.key_file
|
|
add['certfile'] = self.cert_file
|
|
self.sock = PyOpenSSLSocket(sock, ca_certs=self.ca_certs, **add)
|
|
return 0
|
|
except Exception as e:
|
|
# print("DEBUG except in connect trusted root")
|
|
# print(e)
|
|
return 1
|
|
|
|
def connect_trusted_server(self, sock, crl_certs):
|
|
self.trusted_path = self.cert_path + "trusted/"
|
|
ca_cert_list = self.trusted_path + "cert.list"
|
|
server_cert = ssl.get_server_certificate(addr = (self.host, self.port))
|
|
global flag
|
|
if self.cert_file:
|
|
f = verify(server_cert, crl_certs, flag)
|
|
if not f:
|
|
flag = 1
|
|
elif f == 1:
|
|
sys.exit()
|
|
HTTPSClientCertTransport.filename = self.cert_list \
|
|
(self.host, ca_cert_list, server_cert)
|
|
if HTTPSClientCertTransport.filename:
|
|
try:
|
|
if self.FORCE_SSL_VERSION:
|
|
add = {'ssl_version': self.FORCE_SSL_VERSION}
|
|
else:
|
|
add = {}
|
|
add['cert_reqs'] = ssl.CERT_NONE
|
|
add['keyobj'] = self.keyobj
|
|
add['certobj'] = self.certobj
|
|
add['keyfile'] = self.key_file
|
|
add['certfile'] = self.cert_file
|
|
self.sock = PyOpenSSLSocket(sock, ca_certs=self.ca_certs, **add)
|
|
return 0
|
|
|
|
except OpenSSL.SSL.Error as e:
|
|
if type(e.message) == list:
|
|
if type(e.message[0]) == tuple:
|
|
for i in e.message[0]:
|
|
sys.stdout.write(i+' ')
|
|
sys.stdout.flush()
|
|
sys.stdout.write('\n')
|
|
sys.stdout.flush()
|
|
else:
|
|
_print (e.message)
|
|
else:
|
|
_print (e.message)
|
|
HTTPSClientCertTransport.filename = None
|
|
return 1
|
|
|
|
except Exception as e:
|
|
_print (e)
|
|
HTTPSClientCertTransport.filename = None
|
|
return 1
|
|
else:
|
|
return self.add_server_cert(server_cert)
|
|
|
|
class CheckingClientHTTPSHandler(u2.HTTPSHandler):
|
|
def __init__(self, ClientObj, cert_path, ca_certs=None, cert_verifier=None,
|
|
client_certfile=None, client_keyfile=None,
|
|
client_keyobj=None, client_certobj=None,
|
|
*args, **kw):
|
|
"""cert_verifier is a function returning either True or False
|
|
based on whether the certificate was found to be OK"""
|
|
u2.HTTPSHandler.__init__(self, *args, **kw)
|
|
self.ca_certs = ca_certs
|
|
self.cert_verifier = cert_verifier
|
|
self.client_keyfile = client_keyfile # filename
|
|
self.client_certfile = client_certfile # filename
|
|
self.keyobj = client_keyobj
|
|
self.certobj = client_certobj
|
|
# FOR DEBUG
|
|
# self.set_http_debuglevel(100)
|
|
self.ClientObj = ClientObj
|
|
self.cert_path = cert_path
|
|
|
|
def https_open(self, req):
|
|
def open(*args, **kw):
|
|
new_kw = dict(ca_certs=self.ca_certs,
|
|
cert_verifier=self.cert_verifier,
|
|
cert_file=self.client_certfile,
|
|
key_file=self.client_keyfile,
|
|
keyobj=self.keyobj,
|
|
certobj=self.certobj)
|
|
new_kw.update(kw)
|
|
return CheckingClientHTTPSConnection(self.ClientObj,
|
|
self.cert_path, *args, **new_kw)
|
|
return self.do_open(open, req)
|
|
|
|
https_request = u2.AbstractHTTPHandler.do_request_
|
|
|
|
class HTTPSClientCertTransport(HttpTransport):
|
|
def __init__(self, key, cert, path_to_cert, parent, password = None,
|
|
ca_certs=None, cert_verifier=None,
|
|
client_keyfile=None, client_certfile=None,
|
|
client_keyobj=None, client_certobj=None,
|
|
cookie_callback=None, user_agent_string=None,
|
|
**kwargs):
|
|
Transport.__init__(self)
|
|
self.ClientObj = parent
|
|
self.key = key
|
|
self.cert = cert
|
|
self.cert_path = path_to_cert
|
|
if key:
|
|
with open(cert) as cert_file:
|
|
client_certobj = OpenSSL.crypto.load_certificate \
|
|
(OpenSSL.SSL.FILETYPE_PEM, cert_file.read())
|
|
if password:
|
|
with open(key) as key_file:
|
|
client_keyobj = OpenSSL.crypto.load_privatekey \
|
|
(OpenSSL.SSL.FILETYPE_PEM, key_file.read(),
|
|
password)
|
|
else:
|
|
import M2Crypto
|
|
bio = M2Crypto.BIO.openfile(key)
|
|
rsa = M2Crypto.m2.rsa_read_key(bio._ptr(),lambda *unused: b"")
|
|
if not rsa:
|
|
raise OpenSSL.crypto.Error
|
|
with open(key) as key_file:
|
|
client_keyobj = OpenSSL.crypto.load_privatekey(OpenSSL.SSL.FILETYPE_PEM,
|
|
key_file.read())
|
|
|
|
Unskin(self.options).update(kwargs)
|
|
self.cookiejar = CookieJar(DefaultCookiePolicy())
|
|
self.cookie_callback = cookie_callback
|
|
self.user_agent_string = user_agent_string
|
|
log.debug("Proxy: %s", self.options.proxy)
|
|
|
|
|
|
if ca_certs or (client_keyfile and client_certfile) \
|
|
or (client_keyobj and client_certobj):
|
|
https_handler = CheckingClientHTTPSHandler(parent,
|
|
cert_path=path_to_cert,
|
|
ca_certs=ca_certs,
|
|
cert_verifier=cert_verifier,
|
|
client_keyfile=client_keyfile,
|
|
client_certfile=client_certfile,
|
|
client_keyobj=client_keyobj,
|
|
client_certobj=client_certobj)
|
|
else:
|
|
https_handler = u2.HTTPSHandler()
|
|
self.urlopener = u2.build_opener(SUDSHTTPRedirectHandler(),
|
|
u2.HTTPCookieProcessor(self.cookiejar),
|
|
https_handler)
|
|
|
|
# relic from old times:
|
|
# from dslib.network import ProxyManager
|
|
# proxy_handler = ProxyManager.HTTPS_PROXY.create_proxy_handler()
|
|
# proxy_auth_handler = ProxyManager.HTTPS_PROXY.create_proxy_auth_handler()
|
|
|
|
# apparently, dslib simply returned None on create_proxy_auth_handler
|
|
# if this is ever needed, probably use urllib2.ProxyBasicAuthHandler
|
|
# proxy_auth_handler = None
|
|
# and create_proxy_handler SHOULD HAVE eval'd to this:
|
|
# proxy_handler = urllib2.ProxyHandler({"https" : "https://hostname"})
|
|
# but because no hostname was given, it also just returned None
|
|
# proxy_handler = None
|
|
|
|
#these two literally do nothing right now
|
|
# if proxy_handler:
|
|
# self.urlopener.add_handler(proxy_handler)
|
|
# if proxy_auth_handler:
|
|
# self.urlopener.add_handler(proxy_auth_handler)
|
|
|
|
self.urlopener.addheaders = [('User-agent', str(self.user_agent_string))]
|
|
|
|
###############################################################################
|
|
|
|
def get_CRL(path_to_cert):
|
|
""" get new CRL (Certificate Revocation List) from all CA """
|
|
# local CRL
|
|
CRL_path = os.path.join(path_to_cert, 'ca/crl/')
|
|
if not os.path.exists(CRL_path):
|
|
if not os.path.exists(os.path.join(path_to_cert, 'ca')):
|
|
if not os.path.exists(path_to_cert):
|
|
try:
|
|
os.makedirs(path_to_cert)
|
|
except OSError:
|
|
_print (_("Error creating directory %s") %path_to_cert)
|
|
sys.exit()
|
|
try:
|
|
os.makedirs(os.path.join(path_to_cert, 'ca'))
|
|
except OSError:
|
|
_print (_("Error creating directory %s") \
|
|
%(os.path.join(path_to_cert, 'ca')))
|
|
sys.exit()
|
|
os.makedirs(CRL_path)
|
|
|
|
clVars = DataVarsCore()
|
|
clVars.importCore()
|
|
clVars.flIniFile()
|
|
# user and system ca and root certificates
|
|
user_root_cert = clVars.Get('core.cl_user_root_cert')
|
|
homePath = clVars.Get('ur_home_path')
|
|
user_root_cert = user_root_cert.replace("~",homePath)
|
|
|
|
glob_root_cert = clVars.Get('core.cl_glob_root_cert')
|
|
|
|
if os.path.exists(user_root_cert):
|
|
user_ca_certs = readFile(user_root_cert)
|
|
else: user_ca_certs = ''
|
|
if os.path.exists(glob_root_cert):
|
|
glob_ca_certs = readFile(glob_root_cert)
|
|
else: glob_ca_certs = ''
|
|
|
|
# get certificates list fron text
|
|
p = re.compile('[-]+[\w ]+[-]+\n+[\w\n\+\\=/]+[-]+[\w ]+[-]{5}\n?')
|
|
user_ca_certs_list = p.findall(user_ca_certs)
|
|
glob_ca_certs_list = p.findall(glob_ca_certs)
|
|
|
|
# association in one list
|
|
all_ca_certs_list = user_ca_certs_list + glob_ca_certs_list
|
|
for ca in all_ca_certs_list:
|
|
certobj = OpenSSL.crypto.load_certificate \
|
|
(OpenSSL.SSL.FILETYPE_PEM, ca)
|
|
# get url from certificates
|
|
url = None
|
|
CN = None
|
|
Subject = certobj.get_subject().get_components()
|
|
for subj in Subject:
|
|
if subj[0] == 'L':
|
|
url = "https://" + subj[1] +"/?wsdl"
|
|
if subj[0] == 'CN':
|
|
CN = subj[1]
|
|
|
|
if url:
|
|
# connect to ca server (url get from certificates)
|
|
try:
|
|
client = Client_suds(url,\
|
|
transport = HTTPSClientCertTransport(None, None, \
|
|
path_to_cert))
|
|
|
|
client.set_parameters (path_to_cert, None, None)
|
|
new_crl = client.service.get_crl()
|
|
except VerifyError as e:
|
|
_print (e.value)
|
|
#rm_ca_from_trusted(ca)
|
|
sys.exit()
|
|
except:
|
|
pass
|
|
|
|
if 'new_crl' in locals():
|
|
if new_crl:
|
|
if CN and len(CN) > 2:
|
|
CRL_file = CRL_path + CN
|
|
else:
|
|
host = subj[1].split(':')[0]
|
|
CRL_file = CRL_path + host
|
|
if new_crl == ' ':
|
|
open(CRL_file, 'w').close()
|
|
#if os.path.exists(CRL_file):
|
|
#os.unlink(CRL_file)
|
|
continue
|
|
if os.path.exists(CRL_file):
|
|
if readFile(CRL_file) == new_crl:
|
|
continue
|
|
|
|
with open(CRL_file, 'w') as fd:
|
|
fd.write(new_crl)
|
|
_print (_("CRL added"))
|
|
find_ca_in_crl(CRL_path, all_ca_certs_list)
|
|
|
|
def find_ca_in_crl(CRL_path, all_ca_certs_list):
|
|
# CRL_name_list = glob.glob(CRL_path + '*')
|
|
for ca in all_ca_certs_list:
|
|
certobj = OpenSSL.crypto.load_certificate \
|
|
(OpenSSL.SSL.FILETYPE_PEM, ca)
|
|
|
|
Issuer = certobj.get_issuer().get_components()
|
|
for item in Issuer:
|
|
if item[0] == 'CN':
|
|
CN = item[1]
|
|
serverSerial = certobj.get_serial_number()
|
|
CRL = CRL_path + CN
|
|
if not os.path.exists(CRL):
|
|
continue
|
|
|
|
with open(CRL, 'r') as _crl_file:
|
|
crl = "".join(_crl_file.readlines())
|
|
|
|
try:
|
|
crl_object = OpenSSL.crypto.load_crl \
|
|
(OpenSSL.crypto.FILETYPE_PEM, crl)
|
|
except:
|
|
continue
|
|
revoked_objects = crl_object.get_revoked()
|
|
|
|
for rvk in revoked_objects:
|
|
if serverSerial == int(rvk.get_serial(), 16):
|
|
rm_ca_from_trusted(ca)
|
|
|
|
def rm_ca_from_trusted(ca_cert):
|
|
clVars = DataVarsCore()
|
|
clVars.importCore()
|
|
clVars.flIniFile()
|
|
|
|
user_ca_dir = clVars.Get('core.cl_client_cert_dir')
|
|
homePath = clVars.Get('ur_home_path')
|
|
user_ca_dir = user_ca_dir.replace("~",homePath)
|
|
user_ca_dir = os.path.join(user_ca_dir, 'ca')
|
|
user_ca_list = os.path.join(user_ca_dir, 'cert_list')
|
|
user_ca_db = clVars.Get('core.cl_user_root_cert')
|
|
homePath = clVars.Get('ur_home_path')
|
|
user_ca_db = user_ca_db.replace("~",homePath)
|
|
|
|
# system_ca_dir = clVars.Get('cl_core_cert_path')
|
|
# system_ca_list = os.path.join(system_ca_dir, 'cert_list')
|
|
system_ca_db = clVars.Get('cl_glob_root_cert')
|
|
|
|
import hashlib
|
|
md5 = hashlib.md5()
|
|
md5.update(ca_cert)
|
|
md5sum = md5.hexdigest()
|
|
|
|
# search ca certificate in user ca list
|
|
with open(user_ca_list) as fd:
|
|
t = fd.read()
|
|
# See each line
|
|
for line in t.splitlines():
|
|
newfile = ''
|
|
# and each word in line
|
|
words = line.split()
|
|
if words[0] == md5sum:
|
|
filename = os.path.join(user_ca_dir, words[1])
|
|
if ca_cert == readFile(filename):
|
|
os.unlink(filename)
|
|
else:
|
|
newfile += (line + '\n')
|
|
else:
|
|
newfile += (line + '\n')
|
|
|
|
fd.close()
|
|
fn = open(user_ca_list, 'w')
|
|
fn.write(newfile)
|
|
fn.close()
|
|
|
|
p = re.compile('[-]+[\w ]+[-]+\n+[\w\n\+\\=/]+[-]+[\w ]+[-]+\n?')
|
|
|
|
# open, write and split user ca certificates
|
|
user_ca_certs = readFile(user_ca_db)
|
|
user_ca_certs_list = p.findall(user_ca_certs)
|
|
|
|
if ca_cert in user_ca_certs_list:
|
|
new_user_ca_certs = []
|
|
for cert in user_ca_certs_list:
|
|
if ca_cert != cert:
|
|
new_user_ca_certs.append(cert)
|
|
else:
|
|
_print (_("CA certificate removed from user trusted"))
|
|
|
|
with open(user_ca_db, 'w') as fd:
|
|
for cert in new_user_ca_certs:
|
|
fd.write(cert)
|
|
|
|
if not os.path.exists(system_ca_db):
|
|
open(system_ca_db, 'w').close()
|
|
|
|
system_ca_certs = readFile(system_ca_db)
|
|
system_ca_certs_list = p.findall(system_ca_certs)
|
|
|
|
if ca_cert in system_ca_certs_list:
|
|
new_system_ca_certs = []
|
|
for cert in system_ca_certs_list:
|
|
if ca_cert != cert:
|
|
new_system_ca_certs.append(cert)
|
|
else:
|
|
_print (_("CA certificate removed from system trusted"))
|
|
|
|
with open(system_ca_db, 'w') as fd:
|
|
for cert in new_system_ca_certs:
|
|
fd.write(cert)
|
|
return 0
|