You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
833 lines
32 KiB
833 lines
32 KiB
13 years ago
|
#-*- coding: utf-8 -*-
|
||
|
|
||
11 years ago
|
# Copyright 2012-2013 Calculate Ltd. http://www.calculate-linux.org
|
||
13 years ago
|
#
|
||
|
# Licensed under the Apache License, Version 2.0 (the "License");
|
||
|
# you may not use this file except in compliance with the License.
|
||
|
# You may obtain a copy of the License at
|
||
|
#
|
||
|
# http://www.apache.org/licenses/LICENSE-2.0
|
||
|
#
|
||
|
# Unless required by applicable law or agreed to in writing, software
|
||
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
||
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||
|
# See the License for the specific language governing permissions and
|
||
|
# limitations under the License.
|
||
|
|
||
13 years ago
|
from PySide import QtGui, QtCore
|
||
13 years ago
|
import urllib2 as u2
|
||
12 years ago
|
import os, re, sys
|
||
|
from calculate.core.datavars import DataVarsCore
|
||
12 years ago
|
|
||
|
from sudsds.transport.http import HttpTransport, SUDSHTTPRedirectHandler, \
|
||
|
CheckingHTTPSConnection, CheckingHTTPSHandler, \
|
||
|
PYOPENSSL_AVAILABLE, PyOpenSSLSocket
|
||
|
from sudsds.transport import Transport
|
||
|
from sudsds.properties import Unskin
|
||
|
from cookielib import CookieJar, DefaultCookiePolicy
|
||
|
|
||
13 years ago
|
import socket, ssl
|
||
|
import OpenSSL, hashlib
|
||
12 years ago
|
from sudsds.client import Client
|
||
|
from logging import getLogger
|
||
12 years ago
|
from calculate.core.client.cert_verify import verify, VerifyError
|
||
13 years ago
|
|
||
|
from more import show_msg, show_question, LabelWordWrap
|
||
|
|
||
|
# Module-wide state shared by connect_trusted_root() and
# connect_trusted_server(): it is passed to verify() and set to 1 once
# verify() has returned a false value.
flag = 0
|
||
12 years ago
|
# Module-level logger, named after this module.
log = getLogger(__name__)
|
||
13 years ago
|
|
||
12 years ago
|
class AddServerCert (QtGui.QDialog):
    """Modal dialog shown when the server certificate is not trusted yet.

    The dialog shows the certificate fingerprint, serial number, issuer and
    subject and offers three choices: add the server certificate, add the
    CA and root certificates (and then the server certificate), or cancel.

    The outcome is published through ``self.flag``: it starts at 4 and is
    set to 3 by add_server().

    parent -- object providing ``trusted_path``, ``host``,
              ``list_ca_certs`` and ``add_ca_cert()``; in this module it is
              the CheckingClientHTTPSConnection that opened the dialog.
    ClientObj -- application object (``app``, ``VarsApi``, ``port``).
    cert -- server certificate as PEM text.
    """

    def __init__(self, parent, ClientObj, cert):
        super(AddServerCert, self).__init__()
        self.ClientObj = ClientObj
        self.parent = parent
        self.cert = cert

        self.grid = QtGui.QGridLayout(self)
        self.lbl_list = []

        self.grid.addWidget(
            LabelWordWrap(_('Untrusted Server Certificate!'), self),
            0, 1, 1, 2)

        certobj = OpenSSL.crypto.load_certificate(
            OpenSSL.SSL.FILETYPE_PEM, cert)
        self.grid.addWidget(
            LabelWordWrap(_('Fingerprint = %s') % certobj.digest('SHA1'),
                          self), 1, 0, 1, 3)
        self.grid.addWidget(
            LabelWordWrap(_('Serial Number = %s')
                          % certobj.get_serial_number(), self), 2, 0, 1, 3)

        self.tab = QtGui.QTabWidget(self)

        # add Issuer tab
        self.issuer_wgt, self.issuer_layout = self._make_components_tab(
            certobj.get_issuer().get_components())
        self.tab.addTab(self.issuer_wgt, _('Issuer'))

        # add Subject tab
        self.subject_wgt, self.subject_layout = self._make_components_tab(
            certobj.get_subject().get_components())
        self.tab.addTab(self.subject_wgt, _('Subject'))

        self.grid.addWidget(self.tab, 3, 0, 3, 3)

        self.lbl_list.append(LabelWordWrap(
            _("Add this server certificate to trusted or ") +
            _('try adding the CA and root certificates to trusted?'), self))

        self.pix_lbl = QtGui.QLabel(self)
        pi = QtGui.QPixmap()
        pi.load('/usr/share/icons/oxygen/48x48/status/security-medium.png')
        self.pix_lbl.setPixmap(pi)
        self.grid.addWidget(self.pix_lbl, 0, 0)

        # The question labels start at grid row 8, below the tab widget.
        for num_lbl, lbl in enumerate(self.lbl_list):
            self.grid.addWidget(lbl, num_lbl + 8, 0, 1, 3)

        # Row holding the three buttons.
        x = len(self.lbl_list) + 8
        self.server_but = QtGui.QPushButton(_('Add the server certificate'),
                                            self)
        self.server_but.clicked.connect(self.add_server)
        self.server_but.clicked.connect(self.close)
        self.grid.addWidget(self.server_but, x, 0)

        self.ca_but = QtGui.QPushButton(_("Add the CA and root certificates"),
                                        self)
        # Slots are connected in this order: CA first, then the server
        # certificate, then close.
        self.ca_but.clicked.connect(self.add_ca)
        self.ca_but.clicked.connect(self.add_server)
        self.ca_but.clicked.connect(self.close)
        self.grid.addWidget(self.ca_but, x, 1)

        self.cancel_but = QtGui.QPushButton(_('Cancel'), self)
        self.cancel_but.clicked.connect(self.close)
        self.grid.addWidget(self.cancel_but, x, 2)

        self.setWindowTitle(_('Add the certificate to trusted'))

        # move to center
        desktop = ClientObj.app.desktop()
        geometry = desktop.screenGeometry(desktop.primaryScreen())
        hint = self.sizeHint()
        self.move(geometry.width() // 2 - hint.width() // 2,
                  geometry.height() // 2 - hint.height() // 2)
        self.setFixedSize(self.sizeHint())

        self.setAttribute(QtCore.Qt.WA_ShowModal)
        # 4 is the initial result; add_server() replaces it with 3.
        self.flag = 4

    def _make_components_tab(self, components):
        """Return (widget, layout) listing each X509 name component.

        components -- sequence of (name, value) pairs as returned by
                      X509Name.get_components().
        """
        widget = QtGui.QWidget(self)
        layout = QtGui.QVBoxLayout()
        for item in components:
            layout.addWidget(
                LabelWordWrap("%s : %s" % (item[0], item[1]), self))
        widget.setLayout(layout)
        return widget, layout

    def add_server(self):
        """Store the server certificate as trusted and reconnect.

        The certificate is written to ``<trusted_path>/<host>`` and, unless
        the host is already listed, a "host filename" line is appended to
        ``<trusted_path>/cert.list``. Sets ``self.flag`` to 3.

        Returns 0 when the host was already listed (no message is shown and
        no reconnect is started in that case), otherwise None.
        """
        ca_certs = self.parent.trusted_path + "cert.list"

        if not os.path.exists(ca_certs):
            # create an empty list file
            with open(ca_certs, "w"):
                pass

        if self.parent.host == '127.0.0.1':
            host = 'localhost'
        else:
            host = self.parent.host
        filename = host
        with open(self.parent.trusted_path + filename, "w") as fc:
            fc.write(self.cert)

        with open(ca_certs) as fd:
            listed = fd.read()
        for line in listed.splitlines():
            words = line.split()
            # a valid entry is "<host> <filename>"
            if len(words) > 1 and words[0] == host:
                self.flag = 3
                return 0

        # Record which file holds the certificate of this host.
        with open(ca_certs, "a") as fcl:
            fcl.write(host + ' ' + filename + '\n')
        show_msg(_('Server certificate added to trusted') + '\n' +
                 (self.parent.trusted_path + filename),
                 _('Certificate added'))
        self.flag = 3
        # Imported here, as in the original code, rather than at module top.
        from conf_connection import FrameConnection
        self.ConnectWidget = FrameConnection(self, self.ClientObj)
        self.ConnectWidget.connect_to_host(host, self.ClientObj.port)

    def add_ca(self):
        """Ask the parent connection to fetch and add the CA certificate.

        Makes sure ``<cl_client_cert_dir>/ca`` exists first. Returns 1 when
        that directory cannot be created, otherwise None.
        """
        cl_client_cert_dir = \
            self.ClientObj.VarsApi.Get('core.cl_client_cert_dir')
        homePath = self.ClientObj.VarsApi.Get('ur_home_path')
        cl_client_cert_dir = cl_client_cert_dir.replace("~", homePath)
        root_cert_dir = cl_client_cert_dir + "/ca"

        if not os.path.exists(root_cert_dir):
            try:
                os.makedirs(root_cert_dir)
            except OSError:
                show_msg(_('Error creating directory %s') % root_cert_dir)
                return 1

        self.parent.list_ca_certs = []
        self.parent.add_ca_cert(self.cert, self.parent.list_ca_certs)
|
||
12 years ago
|
|
||
|
###############################################################################
|
||
|
|
||
13 years ago
|
class Client_suds(Client):
    """suds Client that also records where its certificate files live."""

    def set_parameters (self, path_to_cert, CERT_FILE, PKEY_FILE):
        """Remember the certificate locations and create the CRL directory.

        path_to_cert -- directory prefix holding the client certificates;
                        it is concatenated directly with file names.
        CERT_FILE -- client certificate path; a false value is stored as ''.
        PKEY_FILE -- client private key path, stored unchanged.
        """
        self.path_to_cert = path_to_cert
        self.CERT_FILE = CERT_FILE if CERT_FILE else ''
        self.PKEY_FILE = PKEY_FILE
        self.REQ_FILE = path_to_cert + 'client.csr'
        self.SID_FILE = path_to_cert + 'sids'
        self.CRL_PATH = path_to_cert + 'ca/crl/'
        crl_dir_missing = not os.path.exists(self.CRL_PATH)
        if crl_dir_missing:
            os.makedirs(self.CRL_PATH)
|
||
|
|
||
12 years ago
|
class CheckingClientHTTPSConnection(CheckingHTTPSConnection):
    """based on httplib.HTTPSConnection code - extended to support
    server certificate verification and client certificate authorization"""

    def __init__(self, ClientObj, cert_path, host, ca_certs=None,
                 cert_verifier=None, keyobj=None, certobj=None, **kw):
        """cert_verifier is a function returning either True or False
        based on whether the certificate was found to be OK,
        keyobj and certobj represent internal PyOpenSSL structures holding
        the key and certificate respectively.

        ClientObj is the application object (its ``VarsApi`` and
        ``MainWidget`` are used); cert_path is the directory prefix holding
        the ``ca/`` and ``trusted/`` sub-directories.
        """
        CheckingHTTPSConnection.__init__(self, host, ca_certs, cert_verifier,
                                         keyobj, certobj, **kw)
        self.ClientObj = ClientObj
        self.cert_path = cert_path
        self.CRL_PATH = os.path.join(cert_path, 'ca/crl/')

    def connect(self):
        """Open the TLS connection, trying the trust sources in order.

        Order: the user root certificates, the global root certificates,
        then the per-server trusted certificates. Result codes checked
        here: 0 - connected, 1 - this source failed, 2 - fatal, 3 - the
        user has just trusted the server (start over), 4 - the user
        refused to trust it.

        Raises Exception(1) on a fatal result and
        Exception('This server is not trusted') on a refusal.
        """
        sock = socket.create_connection((self.host, self.port), self.timeout)
        if hasattr(self, '_tunnel_host') and self._tunnel_host:
            self.sock = sock
            self._tunnel()

        user_root_cert = self.ClientObj.VarsApi.Get('core.cl_user_root_cert')
        homePath = self.ClientObj.VarsApi.Get('ur_home_path')
        user_root_cert = user_root_cert.replace("~", homePath)
        result_user_root = 1

        while True:
            if os.path.exists(user_root_cert):
                result_user_root = self.connect_trusted_root(
                    sock, user_root_cert, self.CRL_PATH)
            if result_user_root == 1:
                glob_root_cert = self.ClientObj.VarsApi.Get(
                    'core.cl_glob_root_cert')
                result_root_con = 1
                if os.path.exists(glob_root_cert):
                    # a fresh socket is opened for every attempt
                    sock = socket.create_connection(
                        (self.host, self.port),
                        self.timeout, self.source_address)
                    if self._tunnel_host:
                        self.sock = sock
                        self._tunnel()
                    result_root_con = self.connect_trusted_root(
                        sock, glob_root_cert, self.CRL_PATH)
                if result_root_con == 1:
                    sock = socket.create_connection(
                        (self.host, self.port),
                        self.timeout, self.source_address)
                    if self._tunnel_host:
                        self.sock = sock
                        self._tunnel()
                    result_server_con = self.connect_trusted_server(
                        sock, self.CRL_PATH)
                    if result_server_con in [1, 2]:
                        raise Exception(1)
                    elif result_server_con == 3:
                        # certificate was just added: try again
                        continue
                    elif result_server_con == 4:
                        raise Exception(_('This server is not trusted'))
                elif result_root_con == 2:
                    raise Exception(1)
            elif result_user_root == 2:
                raise Exception(1)
            break

    # get filename store cert server
    def cert_list (self, host, ca_certs, server_cert):
        """Return the file name of the stored certificate for *host*.

        host -- server host; '127.0.0.1' is looked up as 'localhost'.
        ca_certs -- path of the "host filename" list file (created empty
                    when missing).
        server_cert -- PEM text the server presented.

        Returns the file name only when the stored certificate is identical
        to *server_cert*; otherwise None. Read errors are reported with
        show_msg() and also give None.
        """
        if host == '127.0.0.1':
            host = 'localhost'
        if not os.path.exists(self.trusted_path):
            try:
                os.makedirs(self.trusted_path)
            except OSError:
                pass
        if not os.path.exists(ca_certs):
            with open(ca_certs, "w"):
                pass

        filename = None
        try:
            with open(ca_certs) as fd:
                listed = fd.read()
        except (IOError, OSError):
            show_msg(_("Certificate not found on the client's side"))
            return None
        for line in listed.splitlines():
            words = line.split()
            # the last matching "<host> <filename>" entry wins
            if len(words) > 1 and words[0] == host:
                filename = words[1]
        if not filename:
            return None

        try:
            with open(self.trusted_path + filename, 'r') as fd:
                store_cert = fd.read()
        except (IOError, OSError):
            msg = _('Error opening file')
            show_msg(msg + ' %s%s' % (self.trusted_path, filename))
            return None
        if store_cert == server_cert:
            return filename
        return None

    @staticmethod
    def _bundle_contains(bundle_path, cert):
        """Return True when file *bundle_path* exists and contains *cert*."""
        if not os.path.exists(bundle_path):
            return False
        with open(bundle_path, 'r') as bundle:
            return cert in bundle.read()

    def add_all_ca_cert(self, list_ca_certs):
        """Add every certificate of *list_ca_certs* to the user trust store.

        The list is reversed in place first, so the root certificate is
        processed before the intermediate ones. Certificates already in the
        global or user bundle are skipped. A new certificate is stored as
        ``<cl_client_cert_dir>/ca/<issuer CN>``, indexed by MD5 in
        ``ca/cert_list`` and appended to the user bundle. get_CRL() is
        called at the end.

        Returns 1 (without calling get_CRL) when a certificate has no CN in
        its issuer, otherwise None.
        """
        homePath = self.ClientObj.VarsApi.Get('ur_home_path')
        cl_client_cert_dir = \
            self.ClientObj.VarsApi.Get('core.cl_client_cert_dir')
        cl_client_cert_dir = cl_client_cert_dir.replace("~", homePath)
        root_cert_md5 = cl_client_cert_dir + "/ca/cert_list"

        system_ca_db = self.ClientObj.VarsApi.Get('core.cl_glob_root_cert')
        user_root_cert = self.ClientObj.VarsApi.Get('core.cl_user_root_cert')
        user_root_cert = user_root_cert.replace("~", homePath)

        # so root cert be first, ca after
        list_ca_certs.reverse()
        for cert in list_ca_certs:
            # The bundles are re-read each time: the user bundle grows
            # inside this loop.
            if self._bundle_contains(system_ca_db, cert):
                continue
            if self._bundle_contains(user_root_cert, cert):
                continue

            md5sum = hashlib.md5(cert).hexdigest()

            if not os.path.exists(root_cert_md5):
                with open(root_cert_md5, "w"):
                    pass

            filename = None
            with open(root_cert_md5) as fd:
                index = fd.read()
            for line in index.splitlines():
                # entries are "<md5> <filename>"; the name may hold spaces
                words = line.split(' ', 1)
                if len(words) == 2 and words[0] == md5sum:
                    filename = words[1]

            if filename:
                show_msg(_('A file with the CA certificate now exists'))
                continue

            certobj = OpenSSL.crypto.load_certificate(
                OpenSSL.SSL.FILETYPE_PEM, cert)
            for item in certobj.get_issuer().get_components():
                if item[0] == 'CN':
                    filename = item[1]

            # Check before touching the index, so that no "<md5> None"
            # entry is ever recorded.
            if not filename:
                show_msg(_('Field "CN" not found in the certificate!'))
                return 1

            with open(root_cert_md5, "a") as fc:
                fc.write('%s %s\n' % (md5sum, filename))

            # NOTE(review): the CN comes from a certificate received over
            # the network and is used as a file name unchanged - confirm
            # whether it needs sanitizing.
            with open(cl_client_cert_dir + '/ca/' + filename, 'w') as fd:
                fd.write(cert)

            with open(user_root_cert, 'a') as fa:
                fa.write(cert + '\n')
            show_msg(_("Filename = %s") % filename, _('Certificate added'))
        get_CRL(cl_client_cert_dir)

    def add_ca_cert(self, cert, list_ca_certs):
        """Fetch the CA certificate from the server and offer to trust it.

        cert -- ignored: it is replaced by the result of the server's
                get_ca() call.
        list_ca_certs -- list the accepted certificate is appended to
                         before add_all_ca_cert() is called with it.

        get_ca() answers '1' (invalid server certificate) and '2' (no CA
        certificate on the server) are reported with show_msg(), as is a
        connection failure. Always returns None.
        """
        url = 'https://%s:%s/?wsdl' % (self.host, self.port)
        client = Client_suds(
            url,
            transport=HTTPSClientCertTransport(None, None, self.cert_path,
                                               parent=self.ClientObj))
        client.wsdl.services[0].setlocation(url)
        error = None
        try:
            cert = client.service.get_ca()
        except u2.URLError as e:
            _print('client.service.get_ca in client_class Exception')
            # kept in its own name: it is reported after the handler
            error = e
            cert = '0'

        if cert == '1':
            show_msg(_("Invalid server certificate!"))
            return

        if cert == '2':
            show_msg(_('CA certificate not found on the server'))
            return

        if cert == '0':
            show_msg(error, _("Not connected!"))
            return

        try:
            certobj = OpenSSL.crypto.load_certificate(
                OpenSSL.SSL.FILETYPE_PEM, cert)
        except Exception:
            show_msg(_('Error. Certificate not added to trusted'))
            return

        inf_text = _("Fingerprint = %s") % certobj.digest('SHA1')
        inf_text += '\n' + _("Serial Number = %s") \
            % str(certobj.get_serial_number())
        inf_text += '\n' + _("Issuer")
        for i in certobj.get_issuer().get_components():
            inf_text += "\n %s : %s" % (i[0], i[1])
        inf_text += '\n' + _("Subject")
        for subj in certobj.get_subject().get_components():
            inf_text += "\n %s : %s" % (subj[0], subj[1])

        text = _("Add the CA certificate to trusted? ")
        reply = show_question(self.ClientObj.MainWidget, text, inf_text,
                              title=_('Adding the certificate'))

        if reply == QtGui.QMessageBox.No:
            show_msg(_('Certificate not added to trusted'))
        elif reply == QtGui.QMessageBox.Yes:
            list_ca_certs.append(cert)
            self.add_all_ca_cert(list_ca_certs)
        return

    # add certificate server in trusted
    def add_server_cert(self, cert):
        """Show the AddServerCert dialog and return its ``flag`` result."""
        self.add_cert = AddServerCert(self, self.ClientObj, cert)
        self.add_cert.exec_()
        return self.add_cert.flag

    def _check_revocation(self, server_cert, crl_certs):
        """Run verify() on the server certificate, updating global ``flag``.

        A false result sets ``flag`` to 1; a result of 1 terminates the
        program with sys.exit().
        """
        global flag
        f = verify(server_cert, crl_certs, flag)
        if not f:
            flag = 1
        elif f == 1:
            sys.exit()

    def _wrap_socket(self, sock, cert_reqs):
        """Wrap *sock* for TLS with *cert_reqs* and store it in self.sock."""
        add = {}
        if self.FORCE_SSL_VERSION:
            add['ssl_version'] = self.FORCE_SSL_VERSION
        add['cert_reqs'] = cert_reqs
        # try to use PyOpenSSL by default
        if PYOPENSSL_AVAILABLE:
            wrap_class = PyOpenSSLSocket
            add['keyobj'] = self.keyobj
            add['certobj'] = self.certobj
            add['keyfile'] = self.key_file
            add['certfile'] = self.cert_file
        else:
            wrap_class = ssl.SSLSocket
        self.sock = wrap_class(sock, ca_certs=self.ca_certs, **add)

    def connect_trusted_root(self, sock, root_cert, crl_certs):
        """Try a TLS handshake that requires a valid server certificate.

        Returns 0 on success and 1 when wrapping the socket fails.

        NOTE(review): root_cert is never used - the socket is wrapped with
        ``self.ca_certs``. Confirm whether ``ca_certs=root_cert`` was
        intended.
        """
        self.ca_path = self.cert_path + "ca/"
        server_cert = ssl.get_server_certificate(addr=(self.host, self.port))

        if self.cert_file:
            self._check_revocation(server_cert, crl_certs)
        else:
            import time
            time.sleep(0.1)

        try:
            self._wrap_socket(sock, ssl.CERT_REQUIRED)
            return 0
        except Exception:
            return 1

    def connect_trusted_server(self, sock, crl_certs):
        """Connect using the per-server certificates stored in trusted/.

        When the presented certificate equals the stored one, the socket is
        wrapped without certificate verification (CERT_NONE) and 0 is
        returned, or 1 when wrapping fails. Otherwise the AddServerCert
        dialog is shown and its flag is returned.

        HTTPSClientCertTransport.filename is set to the stored certificate
        file name, or to None on failure.
        """
        self.trusted_path = self.cert_path + "trusted/"
        ca_cert_list = self.trusted_path + "cert.list"
        server_cert = ssl.get_server_certificate(addr=(self.host, self.port))

        if self.cert_file:
            self._check_revocation(server_cert, crl_certs)

        HTTPSClientCertTransport.filename = self.cert_list(
            self.host, ca_cert_list, server_cert)
        if not HTTPSClientCertTransport.filename:
            return self.add_server_cert(server_cert)

        try:
            self._wrap_socket(sock, ssl.CERT_NONE)
            return 0
        except OpenSSL.SSL.Error as e:
            message = e.message
            if isinstance(message, list) and message \
                    and isinstance(message[0], tuple):
                # print the parts of the first OpenSSL error on one line
                for part in message[0]:
                    sys.stdout.write(part + ' ')
                sys.stdout.write('\n')
                sys.stdout.flush()
            else:
                _print(message)
            HTTPSClientCertTransport.filename = None
            return 1
        except Exception as e:
            _print(e)
            HTTPSClientCertTransport.filename = None
            return 1
|
||
|
|
||
12 years ago
|
class CheckingClientHTTPSHandler(CheckingHTTPSHandler):
    """HTTPS handler that opens CheckingClientHTTPSConnection objects."""

    def __init__(self, ClientObj, cert_path, ca_certs=None, cert_verifier=None,
                 client_certfile=None, client_keyfile=None,
                 client_keyobj=None, client_certobj=None,
                 *args, **kw):
        """cert_verifier is a function returning either True or False
        based on whether the certificate was found to be OK.

        ClientObj and cert_path are kept and handed to every connection
        created by https_open().
        """
        CheckingHTTPSHandler.__init__(self, ca_certs, cert_verifier,
                                      client_keyfile, client_certfile,
                                      client_keyobj, client_certobj)
        self.cert_path = cert_path
        self.ClientObj = ClientObj

    def https_open(self, req):
        """Open *req* through a CheckingClientHTTPSConnection."""
        def make_connection(*args, **kw):
            # Handler settings are the defaults; explicit keywords win.
            settings = {
                'ca_certs': self.ca_certs,
                'cert_verifier': self.cert_verifier,
                'cert_file': self.client_certfile,
                'key_file': self.client_keyfile,
                'keyobj': self.keyobj,
                'certobj': self.certobj,
            }
            settings.update(kw)
            return CheckingClientHTTPSConnection(
                self.ClientObj, self.cert_path, *args, **settings)
        return self.do_open(make_connection, req)

    https_request = u2.AbstractHTTPHandler.do_request_
|
||
13 years ago
|
|
||
|
class HTTPSClientCertTransport(HttpTransport):
    """suds HTTP transport that can authenticate with a client certificate."""

    def __init__(self, key, cert, path_to_cert, parent=None, password = None,
                 ca_certs=None, cert_verifier=None,
                 client_keyfile=None, client_certfile=None,
                 client_keyobj=None, client_certobj=None,
                 cookie_callback=None, user_agent_string=None,
                 **kwargs):
        """Build the URL opener used for all requests.

        key, cert -- paths of the PEM private key and client certificate;
                     when key is true both are loaded into PyOpenSSL
                     objects.
        path_to_cert -- certificate directory, passed to the HTTPS handler.
        parent -- application object, stored as ``ClientObj``. It defaults
                  to None because a caller in this module (get_CRL) does
                  not supply it.
        password -- pass phrase of the private key, if any.

        A CheckingClientHTTPSHandler is used when ca_certs, or a
        key/certificate pair (as files or as objects), is available;
        otherwise a plain urllib2 HTTPSHandler.

        Raises OpenSSL.crypto.Error when no password is given and the key
        cannot be read without one.
        """
        Transport.__init__(self)
        self.ClientObj = parent
        self.key = key
        self.cert = cert
        self.cert_path = path_to_cert
        if key:
            with open(cert) as cert_file:
                client_certobj = OpenSSL.crypto.load_certificate(
                    OpenSSL.SSL.FILETYPE_PEM, cert_file.read())
            with open(key) as key_file:
                key_pem = key_file.read()
            if password:
                client_keyobj = OpenSSL.crypto.load_privatekey(
                    OpenSSL.SSL.FILETYPE_PEM, key_pem, str(password))
            else:
                import M2Crypto
                bio = M2Crypto.BIO.openfile(key)
                # The pass-phrase callback supplies nothing, so this read
                # fails for a key that needs a password.
                rsa = M2Crypto.m2.rsa_read_key(bio._ptr(),
                                               lambda *unused: None)
                if not rsa:
                    raise OpenSSL.crypto.Error
                client_keyobj = OpenSSL.crypto.load_privatekey(
                    OpenSSL.SSL.FILETYPE_PEM, key_pem)

        Unskin(self.options).update(kwargs)
        self.cookiejar = CookieJar(DefaultCookiePolicy())
        self.cookie_callback = cookie_callback
        self.user_agent_string = user_agent_string
        log.debug("Proxy: %s", self.options.proxy)
        from dslib.network import ProxyManager
        proxy_handler = ProxyManager.HTTPS_PROXY.create_proxy_handler()
        proxy_auth_handler = \
            ProxyManager.HTTPS_PROXY.create_proxy_auth_handler()
        if ca_certs or (client_keyfile and client_certfile) \
                or (client_keyobj and client_certobj):
            https_handler = CheckingClientHTTPSHandler(
                parent,
                cert_path=path_to_cert,
                ca_certs=ca_certs,
                cert_verifier=cert_verifier,
                client_keyfile=client_keyfile,
                client_certfile=client_certfile,
                client_keyobj=client_keyobj,
                client_certobj=client_certobj)
        else:
            https_handler = u2.HTTPSHandler()
        self.urlopener = u2.build_opener(
            SUDSHTTPRedirectHandler(),
            u2.HTTPCookieProcessor(self.cookiejar),
            https_handler)
        if proxy_handler:
            self.urlopener.add_handler(proxy_handler)
        if proxy_auth_handler:
            self.urlopener.add_handler(proxy_auth_handler)
        self.urlopener.addheaders = [('User-agent', self.user_agent_string)]
|
||
12 years ago
|
|
||
12 years ago
|
###############################################################################
|
||
12 years ago
|
|
||
|
def get_CRL(path_to_cert):
    """ get new CRL (Certificate Revocation List) from all CA

    For every certificate in the user and global root bundles whose subject
    has an ``L`` component, that value is used as the host of the CA
    service; its CRL is downloaded and stored in ``<path_to_cert>/ca/crl/``
    under the subject CN (or the host name when the CN is missing or
    shorter than three characters). Afterwards find_ca_in_crl() is run over
    all collected certificates.

    The program exits when a required directory cannot be created or when
    the CA service raises VerifyError. Other download failures skip that
    CA.
    """
    # local CRL
    CRL_path = os.path.join(path_to_cert, 'ca/crl/')
    if not os.path.exists(CRL_path):
        ca_dir = os.path.join(path_to_cert, 'ca')
        if not os.path.exists(ca_dir):
            if not os.path.exists(path_to_cert):
                try:
                    os.makedirs(path_to_cert)
                except OSError:
                    _print(_("Error creating directory %s") % path_to_cert)
                    sys.exit()
            try:
                os.makedirs(ca_dir)
            except OSError:
                _print(_("Error creating directory %s") % ca_dir)
                sys.exit()
        os.makedirs(CRL_path)

    clVars = DataVarsCore()
    clVars.importCore()
    clVars.flIniFile()
    # user and system ca and root certificates
    user_root_cert = clVars.Get('core.cl_user_root_cert')
    homePath = clVars.Get('ur_home_path')
    user_root_cert = user_root_cert.replace("~", homePath)

    glob_root_cert = clVars.Get('core.cl_glob_root_cert')

    user_ca_certs = ''
    if os.path.exists(user_root_cert):
        with open(user_root_cert, 'r') as bundle:
            user_ca_certs = bundle.read()
    glob_ca_certs = ''
    if os.path.exists(glob_root_cert):
        with open(glob_root_cert, 'r') as bundle:
            glob_ca_certs = bundle.read()

    # get certificates list from text
    p = re.compile(r'[-]+[\w ]+[-]+\n+[\w\n\+\=/]+[-]+[\w ]+[-]{5}\n?')
    user_ca_certs_list = p.findall(user_ca_certs)
    glob_ca_certs_list = p.findall(glob_ca_certs)

    # association in one list
    all_ca_certs_list = user_ca_certs_list + glob_ca_certs_list
    for ca in all_ca_certs_list:
        certobj = OpenSSL.crypto.load_certificate(
            OpenSSL.SSL.FILETYPE_PEM, ca)
        # get url from certificates
        location = None
        CN = None
        for subj in certobj.get_subject().get_components():
            if subj[0] == 'L':
                location = subj[1]
            if subj[0] == 'CN':
                CN = subj[1]
        if not location:
            continue
        url = "https://" + location + "/?wsdl"

        # connect to ca server (url get from certificates)
        # Reset for every CA, so a failed download can never reuse the CRL
        # fetched for the previous one.
        new_crl = None
        try:
            client = Client_suds(
                url,
                transport=HTTPSClientCertTransport(None, None, path_to_cert,
                                                   parent=None))
            client.set_parameters(path_to_cert, None, None)
            new_crl = client.service.get_crl()
        except VerifyError as e:
            _print(e.value)
            sys.exit()
        except Exception:
            # best effort: a CA that cannot be reached is skipped
            continue

        if not new_crl:
            continue

        if CN and len(CN) > 2:
            CRL_file = CRL_path + CN
        else:
            # fall back to the host part of the L component
            CRL_file = CRL_path + location.split(':')[0]

        if new_crl == ' ':
            # a single space is stored as an empty CRL file
            with open(CRL_file, 'w'):
                pass
            continue

        if os.path.exists(CRL_file):
            with open(CRL_file, 'r') as current:
                if current.read() == new_crl:
                    continue

        with open(CRL_file, 'w') as fd:
            fd.write(new_crl)
        _print(_("CRL added"))
    find_ca_in_crl(CRL_path, all_ca_certs_list)
|
||
|
|
||
|
def find_ca_in_crl (CRL_path, all_ca_certs_list):
    """Remove from the trust stores every CA certificate that was revoked.

    CRL_path -- directory prefix of the stored CRL files; the CRL of a
                certificate is looked up as ``CRL_path + <issuer CN>``.
    all_ca_certs_list -- list of PEM certificate texts to check.

    A certificate whose issuer has no CN, whose CRL file is missing, or
    whose CRL cannot be parsed is skipped. rm_ca_from_trusted() is called
    for each revoked entry whose serial number equals the certificate's.
    """
    for ca in all_ca_certs_list:
        certobj = OpenSSL.crypto.load_certificate(
            OpenSSL.SSL.FILETYPE_PEM, ca)

        # Reset per certificate, so a missing CN never reuses the previous
        # one (or hits an unbound name on the first certificate).
        CN = None
        for item in certobj.get_issuer().get_components():
            if item[0] == 'CN':
                CN = item[1]
        if CN is None:
            continue
        serverSerial = certobj.get_serial_number()
        CRL = CRL_path + CN
        if not os.path.exists(CRL):
            continue

        with open(CRL, 'r') as _crl_file:
            crl = _crl_file.read()

        try:
            crl_object = OpenSSL.crypto.load_crl(
                OpenSSL.crypto.FILETYPE_PEM, crl)
        except Exception:
            # unreadable CRL: nothing can be checked for this certificate
            continue
        # NOTE(review): get_revoked() may return None for an empty CRL -
        # confirm against the pyOpenSSL version in use.
        revoked_objects = crl_object.get_revoked() or ()

        for rvk in revoked_objects:
            # revoked serial numbers are hexadecimal strings
            if serverSerial == int(rvk.get_serial(), 16):
                rm_ca_from_trusted(ca)
|
||
|
|
||
|
def _remove_cert_from_bundle(bundle_path, ca_cert, pattern, message):
    """Rewrite bundle file *bundle_path* without the certificate *ca_cert*.

    The bundle is split with the compiled regex *pattern*; when *ca_cert*
    is among the pieces, the file is rewritten with the remaining pieces
    and *message* is printed once per removed copy. Otherwise the file is
    left untouched.
    """
    with open(bundle_path, 'r') as bundle:
        certs = pattern.findall(bundle.read())
    if ca_cert not in certs:
        return
    remaining = []
    for cert in certs:
        if ca_cert != cert:
            remaining.append(cert)
        else:
            _print(message)
    with open(bundle_path, 'w') as bundle:
        for cert in remaining:
            bundle.write(cert)


def rm_ca_from_trusted(ca_cert):
    """Remove the CA certificate *ca_cert* (PEM text) from the trust stores.

    Three places are cleaned: the certificate file and its "md5 filename"
    entry in ``<cl_client_cert_dir>/ca/cert_list``, the user root bundle
    and the global root bundle. The global bundle is created empty when it
    does not exist. Always returns 0.
    """
    clVars = DataVarsCore()
    clVars.importCore()
    clVars.flIniFile()

    user_ca_dir = clVars.Get('core.cl_client_cert_dir')
    homePath = clVars.Get('ur_home_path')
    user_ca_dir = user_ca_dir.replace("~", homePath)
    user_ca_dir = os.path.join(user_ca_dir, 'ca')
    user_ca_list = os.path.join(user_ca_dir, 'cert_list')
    user_ca_db = clVars.Get('core.cl_user_root_cert')
    user_ca_db = user_ca_db.replace("~", homePath)

    # NOTE(review): the other lookups of this variable in this module use
    # 'core.cl_glob_root_cert' - confirm the unprefixed name resolves.
    system_ca_db = clVars.Get('cl_glob_root_cert')

    import hashlib
    md5sum = hashlib.md5(ca_cert).hexdigest()

    # search ca certificate in user ca list
    if os.path.exists(user_ca_list):
        with open(user_ca_list) as fd:
            listed = fd.read()
        # Collected across ALL lines, so unrelated entries survive the
        # rewrite below.
        kept_lines = []
        for line in listed.splitlines():
            # "<md5> <filename>", as written by add_all_ca_cert(); the file
            # name may contain spaces
            words = line.split(' ', 1)
            if len(words) == 2 and words[0] == md5sum:
                filename = os.path.join(user_ca_dir, words[1])
                if os.path.exists(filename):
                    with open(filename, 'r') as stored:
                        same_cert = (ca_cert == stored.read())
                    if same_cert:
                        # drop both the file and its index entry
                        os.unlink(filename)
                        continue
            kept_lines.append(line + '\n')

        with open(user_ca_list, 'w') as fn:
            fn.write(''.join(kept_lines))

    p = re.compile(r'[-]+[\w ]+[-]+\n+[\w\n\+\=/]+[-]+[\w ]+[-]+\n?')

    # open, write and split user ca certificates
    if os.path.exists(user_ca_db):
        _remove_cert_from_bundle(
            user_ca_db, ca_cert, p,
            _("CA certificate removed from user trusted"))

    if not os.path.exists(system_ca_db):
        with open(system_ca_db, 'w'):
            pass

    _remove_cert_from_bundle(
        system_ca_db, ca_cert, p,
        _("CA certificate removed from system trusted"))
    return 0
|