You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
calculate-utils-3-console-gui/consolegui/application/client_class.py

969 lines
38 KiB

#-*- coding: utf-8 -*-
# Copyright 2012 Calculate Ltd. http://www.calculate-linux.org
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from PySide import QtGui, QtCore
import urllib2 as u2
import os, re, sys
from calculate.core.datavars import DataVarsCore
from sudsds.transport.http import HttpTransport, SUDSHTTPRedirectHandler, \
CheckingHTTPSConnection, CheckingHTTPSHandler, \
PYOPENSSL_AVAILABLE, PyOpenSSLSocket
from sudsds.transport import Transport
from sudsds.properties import Unskin
from cookielib import CookieJar, DefaultCookiePolicy
#import httplib
import socket, ssl
import OpenSSL, hashlib
from sudsds.client import Client
#from sudsds.cache import ObjectCache
#from sudsds.options import Options
from logging import getLogger
from calculate.core.client.cert_verify import verify, VerifyError
from more import show_msg, show_question, LabelWordWrap
# Module-wide state used by the connect_trusted_* methods below: it is passed
# as the third argument to verify() and set to 1 once verify() has returned a
# false value.
flag = 0
# Logger named after this module.
log = getLogger(__name__)
class AddServerCert(QtGui.QDialog):
    """Dialog shown when the server presents an untrusted certificate.

    It displays the certificate fingerprint, serial number, issuer and
    subject, and offers three actions: trust the server certificate, try to
    trust the CA/root certificates as well, or cancel.  The constructor runs
    the dialog modally (``exec_()``), so it returns only after the dialog
    has been closed.
    """

    def __init__(self, parent, ClientObj, cert):
        """Build the dialog and execute it.

        parent    -- object owning the connection; its ``host`` and
                     ``trusted_path`` attributes are read and its
                     ``add_ca_cert()`` method is called by this class
        ClientObj -- client application object (``app``, ``port`` and
                     ``VarsApi`` are used here)
        cert      -- server certificate as PEM text
        """
        super(AddServerCert, self).__init__()
        self.ClientObj = ClientObj
        self.parent = parent
        self.cert = cert

        self.grid = QtGui.QGridLayout(self)
        self.lbl_list = []

        self.grid.addWidget(
            LabelWordWrap(_('Untrusted Server Certificate!'), self),
            0, 1, 1, 2)

        certobj = OpenSSL.crypto.load_certificate(
            OpenSSL.SSL.FILETYPE_PEM, cert)
        self.grid.addWidget(
            LabelWordWrap(_('Fingerprint = %s') % certobj.digest('SHA1'),
                          self),
            1, 0, 1, 3)
        self.grid.addWidget(
            LabelWordWrap(_('Serial Number = %s')
                          % certobj.get_serial_number(), self),
            2, 0, 1, 3)

        self.tab = QtGui.QTabWidget(self)

        # add Issuer tab
        self.issuer_wgt = QtGui.QWidget(self)
        self.issuer_layout = QtGui.QVBoxLayout()
        for component in certobj.get_issuer().get_components():
            self.issuer_layout.addWidget(LabelWordWrap(
                "%s : %s" % (component[0], component[1]), self))
        self.issuer_wgt.setLayout(self.issuer_layout)
        self.tab.addTab(self.issuer_wgt, _('Issuer'))

        # add Subject tab
        self.subject_wgt = QtGui.QWidget(self)
        self.subject_layout = QtGui.QVBoxLayout()
        for component in certobj.get_subject().get_components():
            self.subject_layout.addWidget(LabelWordWrap(
                "%s : %s" % (component[0], component[1]), self))
        self.subject_wgt.setLayout(self.subject_layout)
        self.tab.addTab(self.subject_wgt, _('Subject'))

        self.grid.addWidget(self.tab, 3, 0, 3, 3)

        self.lbl_list.append(LabelWordWrap(
            _('Add this Servers certificate to trusted or ') +
            _('try add CA and ROOT certificates to trusted?'), self))

        self.pix_lbl = QtGui.QLabel(self)
        pixmap = QtGui.QPixmap()
        pixmap.load(
            '/usr/share/icons/oxygen/48x48/status/security-medium.png')
        self.pix_lbl.setPixmap(pixmap)
        self.grid.addWidget(self.pix_lbl, 0, 0)

        # Question labels start at grid row 8; the buttons go on the row
        # right after the last label.
        for offset, label in enumerate(self.lbl_list):
            self.grid.addWidget(label, offset + 8, 0, 1, 3)
        button_row = len(self.lbl_list) + 8

        # Slots run in connection order: store the certificate, then close.
        self.server_but = QtGui.QPushButton(_('Add Servers certificate'),
                                            self)
        self.server_but.clicked.connect(self.add_server)
        self.server_but.clicked.connect(self.close)
        self.grid.addWidget(self.server_but, button_row, 0)

        # The CA button runs add_ca() (which closes the dialog itself) and
        # then add_server().
        self.ca_but = QtGui.QPushButton(_('Add CA and ROOT certificate'),
                                        self)
        self.ca_but.clicked.connect(self.add_ca)
        self.ca_but.clicked.connect(self.add_server)
        self.grid.addWidget(self.ca_but, button_row, 1)

        self.cancel_but = QtGui.QPushButton(_('Cancel'), self)
        self.cancel_but.clicked.connect(self.close)
        self.grid.addWidget(self.cancel_but, button_row, 2)

        self.setWindowTitle(_('Add certificate to trusted'))

        # move to center of the primary screen; floor division keeps the
        # coordinates integral
        desktop = ClientObj.app.desktop()
        screen = desktop.screenGeometry(desktop.primaryScreen())
        x = screen.width() // 2 - self.sizeHint().width() // 2
        y = screen.height() // 2 - self.sizeHint().height() // 2
        self.move(x, y)
        self.setFixedSize(self.sizeHint())

        self.setAttribute(QtCore.Qt.WA_ShowModal)
        self.exec_()
        # Set only after exec_() has returned, exactly as before.
        self.setAttribute(QtCore.Qt.WA_DeleteOnClose)

    def add_server(self):
        """Store the server certificate as trusted and reconnect.

        The certificate is written to ``<trusted_path>/<host>`` and a
        ``host filename`` record is appended to ``<trusted_path>/cert.list``
        unless the host is already listed there.

        Returns 0 when the host was already listed (no record is added and
        no reconnect is started); otherwise returns None after starting a
        new connection through FrameConnection.
        """
        trusted_path = self.parent.trusted_path
        ca_certs = trusted_path + "cert.list"
        if not os.path.exists(ca_certs):
            # create an empty list file
            with open(ca_certs, "w"):
                pass

        host = self.parent.host
        if host == '127.0.0.1':
            host = 'localhost'
        filename = host

        with open(trusted_path + filename, "w") as cert_file:
            cert_file.write(self.cert)

        with open(ca_certs) as list_file:
            listing = list_file.read()
        for line in listing.splitlines():
            words = line.split()
            # the first word of a record is the host name
            if len(words) > 1 and words[0] == host:
                return 0

        # Record which file holds the certificate of this host
        with open(ca_certs, "a") as list_file:
            list_file.write(host + ' ' + filename + '\n')

        show_msg(_('Server certificate add to trusted \n%s')
                 % (trusted_path + filename), _('Certificate add'))

        from conf_connection import FrameConnection
        self.ConnectWidget = FrameConnection(self, self.ClientObj)
        self.ConnectWidget.connect_to_host(host, self.ClientObj.port)

    def add_ca(self):
        """Ask the parent to fetch and trust the CA/root certificates.

        Creates ``<cl_client_cert_dir>/ca`` when it is missing.  Returns 1
        if that directory cannot be created; otherwise calls the parent's
        ``add_ca_cert()``, closes the dialog and returns None.
        """
        cl_client_cert_dir = self.ClientObj.VarsApi.Get('cl_client_cert_dir')
        homePath = self.ClientObj.VarsApi.Get('ur_home_path')
        cl_client_cert_dir = cl_client_cert_dir.replace("~", homePath)
        root_cert_dir = cl_client_cert_dir + "/ca"

        if not os.path.exists(root_cert_dir):
            try:
                os.makedirs(root_cert_dir)
            except OSError:
                show_msg(_('error creating directory %s') % root_cert_dir)
                return 1

        self.parent.list_ca_certs = []
        self.parent.add_ca_cert(self.cert, self.parent.list_ca_certs)
        self.close()
###############################################################################
class Client_suds(Client):
    """suds ``Client`` that also remembers the client certificate paths."""

    # def __init__(self, url, **kwargs):
    #     Client.__init__(self, url, **kwargs)
    #     options = Options()
    #     options.cache = ObjectCache(days=0)

    def set_parameters(self, path_to_cert, CERT_FILE, PKEY_FILE):
        """Remember certificate locations derived from *path_to_cert*.

        path_to_cert -- certificate directory; it is concatenated directly
                        with file names, so it is expected to end with a
                        path separator
        CERT_FILE    -- client certificate file; a false value is stored
                        as ''
        PKEY_FILE    -- client private key file

        The CRL directory ``<path_to_cert>ca/crl/`` is created when it does
        not exist.
        """
        crl_dir = path_to_cert + 'ca/crl/'

        self.path_to_cert = path_to_cert
        self.CERT_FILE = CERT_FILE if CERT_FILE else ''
        self.REQ_FILE = path_to_cert + 'client.csr'
        self.PKEY_FILE = PKEY_FILE
        self.SID_FILE = path_to_cert + 'sids'
        self.CRL_PATH = crl_dir

        if os.path.exists(crl_dir):
            return
        os.makedirs(crl_dir)
#class clientHTTPSConnection (httplib.HTTPSConnection):
#
# def __init__(self, host, port=None, key_file=None, cert_file=None,
# strict=None, timeout=socket._GLOBAL_DEFAULT_TIMEOUT,
# source_address=None, cert_path=None, ClientObj = None):
# httplib.HTTPConnection.__init__(self, host, port, strict, timeout,
# source_address)
# self.ClientObj = ClientObj
# self.key_file = key_file
# self.cert_file = cert_file
# self.cert_path = cert_path
# self.CRL_PATH = cert_path + 'ca/crl/'
class CheckingClientHTTPSConnection(CheckingHTTPSConnection):
    """based on httplib.HTTPSConnection code - extended to support
    server certificate verification and client certificate authorization"""

    def __init__(self, ClientObj, cert_path, host, ca_certs=None,
                 cert_verifier=None, keyobj=None, certobj=None, **kw):
        """cert_verifier is a function returning either True or False
        based on whether the certificate was found to be OK,
        keyobj and certobj represent internal PyOpenSSL structures holding
        the key and certificate respectively.

        ClientObj is the client application object (its ``VarsApi`` and
        ``MainWidget`` are used by this class) and cert_path is the
        directory holding the ``ca/`` and ``trusted/`` sub-directories; it
        is concatenated directly with those names.
        """
        CheckingHTTPSConnection.__init__(self, host, ca_certs, cert_verifier,
                                         keyobj, certobj, **kw)
        self.ClientObj = ClientObj
        self.cert_path = cert_path
        self.CRL_PATH = os.path.join(cert_path, 'ca/crl/')

    # ------------------------------------------------------------------
    # private helpers
    # ------------------------------------------------------------------
    def _open_socket(self):
        """Open a new TCP connection and set up the tunnel if one is set."""
        sock = socket.create_connection(
            (self.host, self.port), self.timeout,
            getattr(self, 'source_address', None))
        # getattr: the attribute is not guaranteed to exist (the original
        # code guarded its first use with hasattr()).
        if getattr(self, '_tunnel_host', None):
            self.sock = sock
            self._tunnel()
        return sock

    def _reopen_socket(self, old_sock):
        """Close *old_sock* (a failed attempt) and return a fresh socket."""
        try:
            old_sock.close()
        except socket.error:
            pass
        return self._open_socket()

    def _verify_server_cert(self, server_cert, crl_certs):
        """Run verify() on the certificate and update the module flag.

        A false result sets the module-level ``flag`` to 1; a result equal
        to 1 terminates the program.
        """
        global flag
        result = verify(server_cert, crl_certs, flag)
        if not result:
            flag = 1
        elif result == 1:
            sys.exit()

    def _wrap_socket(self, sock, cert_reqs):
        """Wrap *sock* in TLS and store the result in ``self.sock``."""
        add = {}
        if self.FORCE_SSL_VERSION:
            add['ssl_version'] = self.FORCE_SSL_VERSION
        add['cert_reqs'] = cert_reqs
        # try to use PyOpenSSL by default
        if PYOPENSSL_AVAILABLE:
            wrap_class = PyOpenSSLSocket
            add['keyobj'] = self.keyobj
            add['certobj'] = self.certobj
            add['keyfile'] = self.key_file
            add['certfile'] = self.cert_file
        else:
            wrap_class = ssl.SSLSocket
        self.sock = wrap_class(sock, ca_certs=self.ca_certs, **add)

    @staticmethod
    def _file_contains(path, text):
        """Return True if *path* exists and its content contains *text*."""
        if not os.path.exists(path):
            return False
        with open(path, 'r') as fd:
            return text in fd.read()

    # ------------------------------------------------------------------
    # connection
    # ------------------------------------------------------------------
    def connect(self):
        """Connect, trying user roots, global roots, then trusted servers.

        Each stage that reports failure (1) is followed by the next stage
        on a fresh socket.  When the trusted-server stage reports 3 (the
        "add certificate" dialog was shown) the whole sequence is retried.
        Raises ``Exception(1)`` when no stage succeeds.
        """
        sock = self._open_socket()
        home_path = self.ClientObj.VarsApi.Get('ur_home_path')
        user_root_cert = self.ClientObj.VarsApi.Get('cl_user_root_cert')
        user_root_cert = user_root_cert.replace("~", home_path)
        result_user_root = 1

        while True:
            if os.path.exists(user_root_cert):
                result_user_root = self.connect_trusted_root(
                    sock, user_root_cert, self.CRL_PATH)
            if result_user_root == 2:
                raise Exception(1)
            if result_user_root != 1:
                break

            glob_root_cert = self.ClientObj.VarsApi.Get('cl_glob_root_cert')
            result_root_con = 1
            if os.path.exists(glob_root_cert):
                sock = self._reopen_socket(sock)
                result_root_con = self.connect_trusted_root(
                    sock, glob_root_cert, self.CRL_PATH)
            if result_root_con == 2:
                raise Exception(1)
            if result_root_con != 1:
                break

            sock = self._reopen_socket(sock)
            result_server_con = self.connect_trusted_server(
                sock, self.CRL_PATH)
            if result_server_con in [1, 2]:
                raise Exception(1)
            if result_server_con != 3:
                break
            # 3: the user was asked about the certificate - try again

    # get filename store cert server
    def cert_list(self, host, ca_certs, server_cert):
        """Return the file name of the trusted certificate stored for *host*.

        ca_certs is the path of the ``host filename`` list.  The name is
        returned only when the stored certificate equals *server_cert*;
        otherwise (unknown host, differing certificate, unreadable file)
        None is returned.  Reads ``self.trusted_path``, which
        connect_trusted_server() sets before calling this method.
        """
        if host == '127.0.0.1':
            host = 'localhost'
        if not os.path.exists(self.trusted_path):
            try:
                os.makedirs(self.trusted_path)
            except OSError:
                pass
        if not os.path.exists(ca_certs):
            with open(ca_certs, "w"):
                pass

        filename = None
        try:
            with open(ca_certs) as fd:
                listing = fd.read()
        except EnvironmentError:
            show_msg(_("Certificate not found in client"))
            return None
        for line in listing.splitlines():
            words = line.split()
            # the last record for the host wins
            if len(words) > 1 and words[0] == host:
                filename = words[1]
        if not filename:
            return None

        try:
            with open(self.trusted_path + filename, 'r') as fd:
                store_cert = fd.read()
        except EnvironmentError:
            msg = _('Error open file')
            show_msg(msg + ' %s%s' % (self.trusted_path, filename))
            return None
        if store_cert == server_cert:
            return filename
        return None

    def add_all_ca_cert(self, list_ca_certs):
        """Add every certificate of *list_ca_certs* to the user's trusted CAs.

        The list is reversed in place first (so root cert be first, ca
        after).  Certificates already present in the system or user bundle
        are skipped.  A new certificate is saved under
        ``<cl_client_cert_dir>/ca/<issuer CN>``, recorded in
        ``ca/cert_list`` as ``md5 filename`` and appended to the user
        bundle.  Returns 1 when a certificate has no issuer CN; otherwise
        refreshes the CRLs with get_CRL() and returns None.
        """
        homePath = self.ClientObj.VarsApi.Get('ur_home_path')
        cl_client_cert_dir = self.ClientObj.VarsApi.Get('cl_client_cert_dir')
        cl_client_cert_dir = cl_client_cert_dir.replace("~", homePath)
        root_cert_md5 = cl_client_cert_dir + "/ca/cert_list"
        system_ca_db = self.ClientObj.VarsApi.Get('cl_glob_root_cert')
        user_root_cert = self.ClientObj.VarsApi.Get('cl_user_root_cert')
        user_root_cert = user_root_cert.replace("~", homePath)

        list_ca_certs.reverse()
        for cert in list_ca_certs:
            if self._file_contains(system_ca_db, cert):
                continue
            if self._file_contains(user_root_cert, cert):
                continue

            md5 = hashlib.md5()
            md5.update(cert)
            md5sum = md5.hexdigest()

            if not os.path.exists(root_cert_md5):
                with open(root_cert_md5, "w"):
                    pass

            filename = None
            with open(root_cert_md5) as fd:
                listing = fd.read()
            for line in listing.splitlines():
                # "md5 filename"; the file name may contain spaces
                words = line.split(' ', 1)
                if len(words) > 1 and words[0] == md5sum:
                    filename = words[1]

            if filename:
                show_msg(_('File with ca certificates exists'))
                continue

            certobj = OpenSSL.crypto.load_certificate(
                OpenSSL.SSL.FILETYPE_PEM, cert)
            for item in certobj.get_issuer().get_components():
                if item[0] == 'CN':
                    filename = item[1]
            # Check before anything is written, so that no "md5 None"
            # record ends up in cert_list.
            if not filename:
                show_msg(_('Not found field "CN" in certificate!'))
                return 1

            # NOTE(review): the CN comes from a certificate supplied by the
            # server and is used as a file name without sanitising -
            # consider rejecting names that contain path separators.
            with open(root_cert_md5, "a") as fc:
                fc.write('%s %s\n' % (md5sum, filename))
            with open(cl_client_cert_dir + '/ca/' + filename, 'w') as fd:
                fd.write(cert)
            with open(user_root_cert, 'a') as fa:
                fa.write(cert + '\n')
            show_msg(_("filename = %s") % filename, _('CERTIFICATE ADD'))

        get_CRL(cl_client_cert_dir + '/')

    def add_ca_cert(self, cert, list_ca_certs):
        """Fetch the CA certificate from the server and offer to trust it.

        The *cert* argument is replaced by the result of the server's
        ``get_ca()`` call.  After the user confirms, the certificate is
        appended to *list_ca_certs* and add_all_ca_cert() is called.
        Always returns None.
        """
        url = 'https://%s:%s/?wsdl' % (self.host, self.port)
        client = Client_suds(
            url,
            transport=HTTPSClientCertTransport(None, None, self.cert_path,
                                               parent=self.ClientObj))
        client.wsdl.services[0].setlocation(url)

        # Keep the error in a separate name: the except target is not
        # reliably available after the handler has finished.
        error = None
        try:
            cert = client.service.get_ca()
        except u2.URLError as e:
            _print('client.service.get_ca in client_class Exception')
            error = e
            cert = '0'

        if cert == '1':
            show_msg(_('Server certificate is not valid'))
            return
        if cert == '2':
            show_msg(_('CA not found on server'))
            return
        if cert == '0':
            if error is None:
                # the server itself answered '0'; there is no exception
                error = _("Not connected!")
            show_msg(error, _("Not connected!"))
            return

        try:
            certobj = OpenSSL.crypto.load_certificate(
                OpenSSL.SSL.FILETYPE_PEM, cert)
        except Exception:
            show_msg(_('Error. Certificate not added to trusted'))
            return

        inf_text = ''
        inf_text += _("Fingerprint = %s") % certobj.digest('SHA1')
        inf_text += '\n' + _("Serial Number = %s") \
            % str(certobj.get_serial_number())
        inf_text += '\n' + _("Issuer")
        for item in certobj.get_issuer().get_components():
            inf_text += "\n %s : %s" % (item[0], item[1])
        inf_text += '\n' + _("Subject")
        for item in certobj.get_subject().get_components():
            inf_text += "\n %s : %s" % (item[0], item[1])

        text = _("Add CA certificates to trusted? ")
        reply = show_question(self.ClientObj.MainWidget, text, inf_text)
        if reply == QtGui.QMessageBox.No:
            show_msg(_('Certificate not added to trusted'))
        elif reply == QtGui.QMessageBox.Yes:
            list_ca_certs.append(cert)
            self.add_all_ca_cert(list_ca_certs)
        return

    # add certificate server in trusted
    def add_server_cert(self, cert):
        """Show the AddServerCert dialog for *cert* (PEM text)."""
        self.add_cert = AddServerCert(self, self.ClientObj, cert)

    def connect_trusted_root(self, sock, root_cert, crl_certs):
        """Wrap *sock* in TLS with certificate verification required.

        Returns 0 on success and 1 when wrapping fails.  (connect() also
        handles a result of 2, which this method does not return.)

        NOTE(review): *root_cert* is not used - the wrap is performed with
        ``self.ca_certs``; confirm that this is intended.
        """
        self.ca_path = self.cert_path + "ca/"
        server_cert = ssl.get_server_certificate(addr=(self.host, self.port))

        # NOTE(review): indentation was lost in the copy this was restored
        # from; the "else" is assumed to pair with "if self.cert_file".
        if self.cert_file:
            self._verify_server_cert(server_cert, crl_certs)
        else:
            import time
            time.sleep(0.1)

        # A comparison of the certificate seen after the handshake with
        # server_cert used to follow the wrap; it is currently disabled.
        try:
            self._wrap_socket(sock, ssl.CERT_REQUIRED)
        except Exception:
            return 1
        return 0

    def connect_trusted_server(self, sock, crl_certs):
        """Wrap *sock* in TLS if the server certificate is already trusted.

        Returns 0 on success, 1 when wrapping fails, and 3 when the
        certificate is not in the trusted store, in which case the
        AddServerCert dialog is shown.  The file name found in the trusted
        store is kept in ``HTTPSClientCertTransport.filename``.
        """
        self.trusted_path = self.cert_path + "trusted/"
        ca_cert_list = self.trusted_path + "cert.list"
        server_cert = ssl.get_server_certificate(addr=(self.host, self.port))

        if self.cert_file:
            self._verify_server_cert(server_cert, crl_certs)

        HTTPSClientCertTransport.filename = self.cert_list(
            self.host, ca_cert_list, server_cert)
        if not HTTPSClientCertTransport.filename:
            self.add_server_cert(server_cert)
            return 3

        # The stored copy has already been compared with server_cert in
        # cert_list(), so the wrap itself does not verify the certificate.
        try:
            self._wrap_socket(sock, ssl.CERT_NONE)
        except Exception as e:
            _print(e)
            HTTPSClientCertTransport.filename = None
            return 1
        return 0
#
# def connect(self):
# """Connect to a host on a given (SSL) port."""
# if self.host == '127.0.0.1':
# self.host = 'localhost'
# sock = socket.create_connection((self.host, self.port),
# self.timeout, self.source_address)
# if self._tunnel_host:
# self.sock = sock
# self._tunnel()
# sock.settimeout(self.ClientObj.timeout)
#
# user_root_cert = self.ClientObj.VarsApi.Get('cl_user_root_cert')
# homePath = self.ClientObj.VarsApi.Get('ur_home_path')
# user_root_cert = user_root_cert.replace("~",homePath)
# result_user_root = 1
#
# if os.path.exists(user_root_cert):
# result_user_root = self.connect_trusted_root(sock, user_root_cert,\
# self.CRL_PATH)
# if result_user_root == 1:
# glob_root_cert = self.ClientObj.VarsApi.Get('cl_glob_root_cert')
# result_root_con = 1
# if os.path.exists(glob_root_cert):
# sock = socket.create_connection((self.host, self.port),
# self.timeout, self.source_address)
# if self._tunnel_host:
# self.sock = sock
# self._tunnel()
# result_root_con = self.connect_trusted_root(sock, \
# glob_root_cert, self.CRL_PATH)
# if result_root_con == 1:
# sock = socket.create_connection((self.host, self.port),
# self.timeout, self.source_address)
# if self._tunnel_host:
# self.sock = sock
# self._tunnel()
# result_server_con = self.connect_trusted_server \
# (sock, self.CRL_PATH)
# if result_server_con in [1,2]:
# raise Exception
# elif result_root_con == 2:
# raise Exception
# elif result_user_root == 2:
# raise Exception
#class HTTPSClientAuthHandler(u2.HTTPSHandler):
# def __init__(self, key, cert, cert_path, ClientObj = None):
# u2.HTTPSHandler.__init__(self)
# self.ClientObj = ClientObj
# self.key = key
# self.cert = cert
# self.cert_path = cert_path
#
# def https_open(self, req):
# #Rather than pass in a reference to a connection class, we pass in
# # a reference to a function which, for all intents and purposes,
# # will behave as a constructor
## try:
# return self.do_open(self.getConnection, req)
## except u2.URLError, e:
## _print (e)
## return 1
#
#
# def getConnection(self, host, timeout=300):
# return clientHTTPSConnection(host, key_file=self.key,
# cert_file=self.cert, cert_path = self.cert_path,
# ClientObj = self.ClientObj)
class CheckingClientHTTPSHandler(CheckingHTTPSHandler):
    """HTTPS handler that opens CheckingClientHTTPSConnection connections."""

    def __init__(self, ClientObj, cert_path, ca_certs=None, cert_verifier=None,
                 client_certfile=None, client_keyfile=None,
                 client_keyobj=None, client_certobj=None,
                 *args, **kw):
        """cert_verifier is a function returning either True or False
        based on whether the certificate was found to be OK.

        ClientObj and cert_path are stored and handed to every connection
        this handler creates; the extra positional and keyword arguments
        are accepted but not used.
        """
        # The base class takes the key file before the certificate file.
        CheckingHTTPSHandler.__init__(self, ca_certs, cert_verifier,
                                      client_keyfile, client_certfile,
                                      client_keyobj, client_certobj)
        self.ClientObj = ClientObj
        self.cert_path = cert_path

    def https_open(self, req):
        """Open *req* through a CheckingClientHTTPSConnection."""
        def make_connection(*conn_args, **conn_kw):
            # Handler settings are the defaults; keywords supplied by the
            # caller override them.
            settings = {
                'ca_certs': self.ca_certs,
                'cert_verifier': self.cert_verifier,
                'cert_file': self.client_certfile,
                'key_file': self.client_keyfile,
                'keyobj': self.keyobj,
                'certobj': self.certobj,
            }
            settings.update(conn_kw)
            return CheckingClientHTTPSConnection(
                self.ClientObj, self.cert_path, *conn_args, **settings)

        return self.do_open(make_connection, req)

    https_request = u2.AbstractHTTPHandler.do_request_
class HTTPSClientCertTransport(HttpTransport):
    """suds HTTP transport with optional client-certificate HTTPS handling.

    connect_trusted_server() additionally stores the file name of the
    matching trusted server certificate in the class attribute ``filename``.
    """

    def __init__(self, key, cert, path_to_cert, parent=None, password=None,
                 ca_certs=None, cert_verifier=None,
                 client_keyfile=None, client_certfile=None,
                 client_keyobj=None, client_certobj=None,
                 cookie_callback=None, user_agent_string=None,
                 **kwargs):
        """Create the transport and build its urllib2 opener.

        key, cert    -- paths of the client private key and certificate
                        (PEM); when *key* is given both files are loaded
                        and replace client_keyobj/client_certobj
        path_to_cert -- certificate directory handed to the HTTPS handler
        parent       -- client application object, stored as ``ClientObj``;
                        optional because get_CRL() in this module builds a
                        transport without one
        password     -- passphrase of the private key, if it has one
        kwargs       -- extra transport options

        The certificate-checking handler is installed only when CA
        certificates or a client key/certificate pair are available;
        otherwise urllib2's plain HTTPSHandler is used.

        Raises OpenSSL.crypto.Error when no passphrase is given and the
        key cannot be read without one.
        """
        Transport.__init__(self)
        self.ClientObj = parent
        self.key = key
        self.cert = cert
        self.cert_path = path_to_cert

        if key:
            client_certobj = OpenSSL.crypto.load_certificate(
                OpenSSL.SSL.FILETYPE_PEM, self._read_file(cert))
            if password:
                client_keyobj = OpenSSL.crypto.load_privatekey(
                    OpenSSL.SSL.FILETYPE_PEM, self._read_file(key),
                    str(password))
            else:
                # Probe the key with a passphrase callback that supplies
                # nothing before loading it with PyOpenSSL.
                import M2Crypto
                bio = M2Crypto.BIO.openfile(key)
                rsa = M2Crypto.m2.rsa_read_key(bio._ptr(),
                                               lambda *unused: None)
                if not rsa:
                    raise OpenSSL.crypto.Error
                client_keyobj = OpenSSL.crypto.load_privatekey(
                    OpenSSL.SSL.FILETYPE_PEM, self._read_file(key))

        Unskin(self.options).update(kwargs)
        self.cookiejar = CookieJar(DefaultCookiePolicy())
        self.cookie_callback = cookie_callback
        self.user_agent_string = user_agent_string
        log.debug("Proxy: %s", self.options.proxy)

        from dslib.network import ProxyManager
        proxy_handler = ProxyManager.HTTPS_PROXY.create_proxy_handler()
        proxy_auth_handler = \
            ProxyManager.HTTPS_PROXY.create_proxy_auth_handler()

        if ca_certs or (client_keyfile and client_certfile) \
                or (client_keyobj and client_certobj):
            https_handler = CheckingClientHTTPSHandler(
                parent,
                cert_path=path_to_cert,
                ca_certs=ca_certs,
                cert_verifier=cert_verifier,
                client_keyfile=client_keyfile,
                client_certfile=client_certfile,
                client_keyobj=client_keyobj,
                client_certobj=client_certobj)
        else:
            https_handler = u2.HTTPSHandler()

        self.urlopener = u2.build_opener(
            SUDSHTTPRedirectHandler(),
            u2.HTTPCookieProcessor(self.cookiejar),
            https_handler)
        if proxy_handler:
            self.urlopener.add_handler(proxy_handler)
        if proxy_auth_handler:
            self.urlopener.add_handler(proxy_auth_handler)
        self.urlopener.addheaders = [('User-agent', self.user_agent_string)]

    @staticmethod
    def _read_file(path):
        """Return the content of *path*, closing the file afterwards."""
        with open(path) as fd:
            return fd.read()
# def u2open(self, u2request):
# """
# Open a connection.
# @param u2request: A urllib2 request.
# @type u2request: urllib2.Requet.
# @return: The opened file-like urllib2 object.
# @rtype: fp
# """
# self.options.timeout = self.ClientObj.timeout
# tm = self.options.timeout
# url = u2.build_opener(HTTPSClientAuthHandler(self.key, self.cert,\
# self.cert_path, ClientObj = self.ClientObj))
#
# #from urllib2 import URLError
# #try:
# socket.setdefaulttimeout(tm)
# if self.u2ver() < 2.6:
# socket.setdefaulttimeout(tm)
# return url.open(u2request)
# else:
# return url.open(u2request, timeout=tm)
# #except URLError, e:
# #print _("Excertion URLError: "), e
# #sys.exit(1)
###############################################################################
def get_CRL(path_to_cert):
    """ get new CRL (Certificate Revocation List) from all CA

    path_to_cert is the client certificate directory; CRLs are stored in
    ``<path_to_cert>/ca/crl/``.  For every certificate in the user and
    system CA bundles whose subject has an ``L`` field, that field is used
    as the address of the CA service and its ``get_crl()`` result is saved
    under the subject CN (or, when the CN is missing or shorter than three
    characters, under the host part of ``L``).  A CA that cannot be reached
    is skipped.  Finally find_ca_in_crl() is run over all certificates.

    Terminates the program when a directory cannot be created or when a
    VerifyError is raised.
    """
    # local CRL
    ca_dir = os.path.join(path_to_cert, 'ca')
    CRL_path = os.path.join(path_to_cert, 'ca/crl/')
    if not os.path.exists(CRL_path):
        # 'ca' must be relative: os.path.join() discards path_to_cert when
        # the next component starts with a slash.
        for directory in (path_to_cert, ca_dir):
            if not os.path.exists(directory):
                try:
                    os.makedirs(directory)
                except OSError:
                    _print(_("error creating directory %s") % directory)
                    sys.exit()
        os.makedirs(CRL_path)

    clVars = DataVarsCore()
    clVars.importCore()
    clVars.flIniFile()

    # user and system ca and root certificates
    user_root_cert = clVars.Get('cl_user_root_cert')
    homePath = clVars.Get('ur_home_path')
    user_root_cert = user_root_cert.replace("~", homePath)
    glob_root_cert = clVars.Get('cl_glob_root_cert')

    user_ca_certs = ''
    if os.path.exists(user_root_cert):
        with open(user_root_cert, 'r') as fd:
            user_ca_certs = fd.read()
    glob_ca_certs = ''
    if os.path.exists(glob_root_cert):
        with open(glob_root_cert, 'r') as fd:
            glob_ca_certs = fd.read()

    # get certificates list from text
    p = re.compile('[-]+[\w ]+[-]+\n+[\w\n\+\\=/]+[-]+[\w ]+[-]{5}\n?')
    all_ca_certs_list = p.findall(user_ca_certs) + p.findall(glob_ca_certs)

    for ca in all_ca_certs_list:
        certobj = OpenSSL.crypto.load_certificate(
            OpenSSL.SSL.FILETYPE_PEM, ca)

        # the subject's L field holds the address of the CA service
        location = None
        CN = None
        for name, value in certobj.get_subject().get_components():
            if name == 'L':
                location = value
            if name == 'CN':
                CN = value
        if location is None:
            continue
        url = "https://" + location + "/?wsdl"

        # connect to ca server (url get from certificates)
        new_crl = None
        try:
            # The transport's fourth argument (parent) is required; no
            # client object exists here, so None is passed.
            client = Client_suds(
                url,
                transport=HTTPSClientCertTransport(None, None, path_to_cert,
                                                   None))
            client.set_parameters(path_to_cert, None, None)
            new_crl = client.service.get_crl()
        except VerifyError as e:
            _print(e.value)
            sys.exit()
        except Exception:
            # best effort: skip this CA, do not reuse an earlier result
            continue
        if not new_crl:
            continue

        if CN and len(CN) > 2:
            CRL_file = CRL_path + CN
        else:
            CRL_file = CRL_path + location.split(':')[0]

        if new_crl == ' ':
            # a single space is stored as an empty CRL file
            with open(CRL_file, 'w'):
                pass
            continue

        if os.path.exists(CRL_file):
            with open(CRL_file, 'r') as fd:
                if fd.read() == new_crl:
                    continue

        with open(CRL_file, 'w') as fd:
            fd.write(new_crl)
        _print(_("CRL add"))

    find_ca_in_crl(CRL_path, all_ca_certs_list)
def find_ca_in_crl(CRL_path, all_ca_certs_list):
    """Remove from the trusted stores every CA listed in its issuer's CRL.

    CRL_path          -- directory prefix of the CRL files; the issuer CN
                         is appended to it directly
    all_ca_certs_list -- PEM texts of the CA certificates to check

    For each certificate the CRL file named after the issuer CN is loaded;
    if the certificate's serial number is among the revoked serials,
    rm_ca_from_trusted() is called for it.  Certificates without an issuer
    CN, without a CRL file, or with an unreadable CRL are skipped.
    """
    for ca in all_ca_certs_list:
        certobj = OpenSSL.crypto.load_certificate(
            OpenSSL.SSL.FILETYPE_PEM, ca)

        # Reset per certificate so a CN from a previous one is never reused.
        CN = None
        for name, value in certobj.get_issuer().get_components():
            if name == 'CN':
                CN = value
        if CN is None:
            continue

        CRL = CRL_path + CN
        if not os.path.exists(CRL):
            continue
        with open(CRL, 'r') as crl_file:
            crl = crl_file.read()

        try:
            crl_object = OpenSSL.crypto.load_crl(
                OpenSSL.crypto.FILETYPE_PEM, crl)
        except Exception:
            continue

        serverSerial = certobj.get_serial_number()
        # get_revoked() may return None when nothing is revoked
        for rvk in crl_object.get_revoked() or ():
            # the revoked serial is a hexadecimal string
            if serverSerial == int(rvk.get_serial(), 16):
                rm_ca_from_trusted(ca)
                break
def rm_ca_from_trusted(ca_cert):
    """Remove the CA certificate *ca_cert* (PEM text) from the trusted stores.

    Three places are cleaned up:
      * the ``md5 filename`` record in ``<cl_client_cert_dir>/ca/cert_list``
        together with the certificate file it names (only when the file
        content equals *ca_cert*);
      * the user CA bundle (``cl_user_root_cert``);
      * the system CA bundle (``cl_glob_root_cert``), which is created
        empty when it does not exist.

    Always returns 0.
    """
    import hashlib

    clVars = DataVarsCore()
    clVars.importCore()
    clVars.flIniFile()

    homePath = clVars.Get('ur_home_path')
    user_ca_dir = clVars.Get('cl_client_cert_dir').replace("~", homePath)
    user_ca_dir = os.path.join(user_ca_dir, 'ca')
    user_ca_list = os.path.join(user_ca_dir, 'cert_list')
    user_ca_db = clVars.Get('cl_user_root_cert').replace("~", homePath)
    system_ca_db = clVars.Get('cl_glob_root_cert')

    md5 = hashlib.md5()
    md5.update(ca_cert)
    md5sum = md5.hexdigest()

    # search ca certificate in user ca list
    with open(user_ca_list) as fd:
        records = fd.read().splitlines()

    # Collect the surviving records across the whole loop, so every record
    # except the removed one is written back.
    kept = []
    for line in records:
        # "md5 filename" - split once, as the writer of this list does,
        # because the file name may contain spaces
        words = line.split(' ', 1)
        if len(words) > 1 and words[0] == md5sum:
            filename = os.path.join(user_ca_dir, words[1])
            try:
                with open(filename, 'r') as cert_file:
                    stored = cert_file.read()
            except EnvironmentError:
                # unreadable or missing file: keep the record untouched
                stored = None
            if stored == ca_cert:
                os.unlink(filename)
                continue
        kept.append(line + '\n')

    with open(user_ca_list, 'w') as fn:
        fn.write(''.join(kept))

    p = re.compile('[-]+[\w ]+[-]+\n+[\w\n\+\\=/]+[-]+[\w ]+[-]+\n?')

    # rewrite the user ca certificates without ca_cert
    _drop_cert_from_bundle(
        user_ca_db, ca_cert, p,
        _("CA certificate delete from user"
          " trusted certificate"))

    if not os.path.exists(system_ca_db):
        with open(system_ca_db, 'w'):
            pass
    _drop_cert_from_bundle(
        system_ca_db, ca_cert, p,
        _("CA certificate delete from system"
          " trusted certificate"))
    return 0


def _drop_cert_from_bundle(bundle_path, ca_cert, pattern, message):
    """Rewrite *bundle_path* without *ca_cert*; print *message* per removal.

    The bundle is split into certificates with *pattern*.  The file is
    rewritten only when *ca_cert* is one of them, and then contains just
    the remaining matched certificates.
    """
    with open(bundle_path, 'r') as fd:
        certs = pattern.findall(fd.read())
    if ca_cert not in certs:
        return

    remaining = []
    for cert in certs:
        if cert != ca_cert:
            remaining.append(cert)
        else:
            _print(message)

    with open(bundle_path, 'w') as fd:
        for cert in remaining:
            fd.write(cert)