You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
506 lines
21 KiB
506 lines
21 KiB
#-*- coding: utf-8 -*-
|
|
|
|
# Copyright 2012 Calculate Ltd. http://www.calculate-linux.org
|
|
#
|
|
# Licensed under the Apache License, Version 2.0 (the "License");
|
|
# you may not use this file except in compliance with the License.
|
|
# You may obtain a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
# See the License for the specific language governing permissions and
|
|
# limitations under the License.
|
|
|
|
import urllib2 as u2
|
|
import os, sys
|
|
import socket, ssl
|
|
import OpenSSL, hashlib, M2Crypto
|
|
from calculate.core.datavars import DataVarsCore
|
|
from calculate.lib.datavars import DataVars
|
|
from sudsds.client import Client
|
|
from cert_verify import verify, get_CRL
|
|
from sudsds.transport.http import HttpTransport, SUDSHTTPRedirectHandler, \
|
|
CheckingHTTPSConnection, CheckingHTTPSHandler, \
|
|
PYOPENSSL_AVAILABLE, PyOpenSSLSocket
|
|
from sudsds.transport import Transport
|
|
from sudsds.properties import Unskin
|
|
from cookielib import CookieJar, DefaultCookiePolicy
|
|
from logging import getLogger
|
|
from calculate.console.datavars import DataVarsConsole
|
|
from calculate.lib.cl_lang import setLocalTranslate
|
|
# Set up translations for this module; presumably this is what provides the
# _() function used for the user-facing messages below -- TODO confirm.
setLocalTranslate('calculate_console',sys.modules[__name__])
#import cert_func.verify
# Module-level logger.
log = getLogger(__name__)
# Shared state handed to cert_verify.verify(); the connection code sets it to
# 1 once verify() has returned a false value.
flag = 0
|
|
|
|
class Client_suds(Client):
    """suds ``Client`` that also remembers the client's certificate paths."""

    def set_parameters(self, path_to_cert, CERT_FILE, PKEY_FILE):
        """Record certificate-related paths and ensure the CRL directory exists.

        path_to_cert -- directory prefix; it is concatenated directly with
                        file names, so it must end with a path separator
        CERT_FILE    -- client certificate path; any false value is stored
                        as an empty string
        PKEY_FILE    -- private key path, stored unchanged

        Raises OSError when the CRL directory is missing and cannot be
        created.
        """
        self.path_to_cert = path_to_cert
        self.CERT_FILE = CERT_FILE or ''
        self.REQ_FILE = path_to_cert + 'client.csr'
        self.PKEY_FILE = PKEY_FILE
        self.SID_FILE = path_to_cert + 'sid.int'
        self.CRL_PATH = path_to_cert + 'ca/crl/'
        if not os.path.exists(self.CRL_PATH):
            try:
                os.makedirs(self.CRL_PATH)
            except OSError:
                # Another process may have created the directory between the
                # check and the call; only a genuine failure is propagated.
                if not os.path.isdir(self.CRL_PATH):
                    raise
|
|
|
|
class CheckingClientHTTPSConnection(CheckingHTTPSConnection):
|
|
"""based on httplib.HTTPSConnection code - extended to support
|
|
server certificate verification and client certificate authorization"""
|
|
|
|
def __init__(self, cert_path, host, ca_certs=None, cert_verifier=None,
|
|
keyobj=None, certobj=None, wait_thread=None, **kw):
|
|
"""cert_verifier is a function returning either True or False
|
|
based on whether the certificate was found to be OK,
|
|
keyobj and certobj represent internal PyOpenSSL structures holding
|
|
the key and certificate respectively.
|
|
"""
|
|
CheckingHTTPSConnection.__init__(self, host, ca_certs, cert_verifier,
|
|
keyobj, certobj, **kw)
|
|
# self.ClientObj = ClientObj
|
|
self.cert_path = cert_path
|
|
self.ca_certs = ca_certs
|
|
self.CRL_PATH = os.path.join(cert_path, 'ca/crl/')
|
|
self.wait_thread = wait_thread
|
|
|
|
# get filename store cert server
|
|
def cert_list (self, host, ca_certs, server_cert):
|
|
if host == '127.0.0.1':
|
|
host = 'localhost'
|
|
if not os.path.exists(self.trusted_path):
|
|
try:
|
|
os.makedirs(self.trusted_path)
|
|
except OSError:
|
|
pass
|
|
if not os.path.exists(ca_certs):
|
|
fc = open(ca_certs,"w")
|
|
fc.close()
|
|
filename = None
|
|
try:
|
|
with open(ca_certs) as fd:
|
|
t = fd.read()
|
|
# for each line
|
|
for line in t.splitlines():
|
|
# Split string into a words list
|
|
words = line.split()
|
|
if len(words) > 1:
|
|
# if first word...
|
|
if words[0] == host:
|
|
filename = words[1]
|
|
if not filename:
|
|
return None
|
|
except:
|
|
print _("Certificate not found in client")
|
|
return None
|
|
try:
|
|
fd = open(self.trusted_path + filename, 'r')
|
|
store_cert = fd.read()
|
|
fd.close()
|
|
if store_cert == server_cert:
|
|
return filename
|
|
except:
|
|
print _("Error open file"), self.trusted_path, filename
|
|
return None
|
|
|
|
def add_all_ca_cert(self, list_ca_certs):
|
|
# so root cert be first, ca after
|
|
clVarsCore = DataVarsCore()
|
|
clVarsCore.importCore()
|
|
clVarsCore.flIniFile()
|
|
|
|
list_ca_certs.reverse()
|
|
system_ca_db = clVarsCore.Get('cl_glob_root_cert')
|
|
|
|
clVars = DataVars()
|
|
clVars.flIniFile()
|
|
homePath = clVars.Get('ur_home_path')
|
|
cl_client_cert_dir = clVarsCore.Get('cl_client_cert_dir')
|
|
cl_client_cert_dir = cl_client_cert_dir.replace("~",homePath)
|
|
root_cert_md5 = os.path.join(cl_client_cert_dir, "ca/cert_list")
|
|
|
|
user_root_cert = clVarsCore.Get('cl_user_root_cert')
|
|
user_root_cert = user_root_cert.replace("~",homePath)
|
|
|
|
for cert in list_ca_certs:
|
|
if os.path.exists(system_ca_db):
|
|
if cert in open(system_ca_db, 'r').read():
|
|
continue
|
|
|
|
if os.path.exists(user_root_cert):
|
|
if cert in open(user_root_cert, 'r').read():
|
|
continue
|
|
|
|
md5 = hashlib.md5()
|
|
md5.update(cert)
|
|
md5sum = md5.hexdigest()
|
|
print "\n================================================="
|
|
print "md5sum = ", md5sum
|
|
|
|
if not os.path.exists(root_cert_md5):
|
|
fc = open(root_cert_md5,"w")
|
|
fc.close()
|
|
|
|
filename = None
|
|
with open(root_cert_md5) as fd:
|
|
t = fd.read()
|
|
# for each line
|
|
for line in t.splitlines():
|
|
# Split string into a words list
|
|
words = line.split(' ',1)
|
|
if words[0] == md5sum:
|
|
filename = words[1]
|
|
if not filename:
|
|
certobj = OpenSSL.crypto.load_certificate \
|
|
(OpenSSL.SSL.FILETYPE_PEM, cert)
|
|
Issuer = certobj.get_issuer().get_components()
|
|
for item in Issuer:
|
|
if item[0] == 'CN':
|
|
filename = item[1]
|
|
|
|
fc = open(root_cert_md5,"a")
|
|
fc.write('%s %s\n' %(md5sum, filename))
|
|
fc.close()
|
|
|
|
if not filename:
|
|
print _('Not found field "CN" in certificate!')
|
|
return 1
|
|
|
|
fd = open(os.path.join(cl_client_cert_dir,'ca/',filename),'w')
|
|
fd.write(cert)
|
|
fd.close()
|
|
|
|
fa = open(user_root_cert, 'a')
|
|
fa.write(cert)
|
|
fa.close()
|
|
print _("filename = "), filename
|
|
print _("CERTIFICATE ADD")
|
|
else:
|
|
print _("file with ca certificates exists")
|
|
get_CRL(cl_client_cert_dir)
|
|
|
|
def add_ca_cert(self, cert, list_ca_certs):
|
|
url = 'https://%s:%s/?wsdl' %(self.host, self.port)
|
|
client = Client_suds(url, transport = HTTPSClientCertTransport \
|
|
(None, None, self.cert_path))
|
|
client.wsdl.services[0].setlocation(url)
|
|
cert = client.service.get_ca()
|
|
if cert == '1':
|
|
print _('Server certificate is not valid')
|
|
raise Exception(1)
|
|
|
|
if cert == '2':
|
|
print _('CA not found on server')
|
|
raise Exception(1)
|
|
|
|
try:
|
|
certobj = OpenSSL.crypto.load_certificate \
|
|
(OpenSSL.SSL.FILETYPE_PEM, cert)
|
|
except:
|
|
print _("Error. Certificate not added to trusted")
|
|
raise Exception(1)
|
|
print '\n', _("Fingerprint = %s") % certobj.digest('SHA1')
|
|
print _("Serial Number = "), certobj.get_serial_number()
|
|
Issuer = certobj.get_issuer().get_components()
|
|
print '\n', _("Issuer")
|
|
for i in Issuer:
|
|
print "%s : %s" %(i[0], i[1])
|
|
Subject = certobj.get_subject().get_components()
|
|
print '\n', _("Subject")
|
|
for subj in Subject:
|
|
print "%s : %s" %(subj[0], subj[1])
|
|
ans = raw_input (_("Add CA certificates to trusted? y/[n]:"))
|
|
if ans.lower() in ['y','yes']:
|
|
list_ca_certs.append(cert)
|
|
self.add_all_ca_cert(list_ca_certs)
|
|
else:
|
|
print _("Certificate not added to trusted")
|
|
|
|
# add certificate server in trusted
|
|
def add_server_cert(self, cert):
|
|
self.wait_thread.stop()
|
|
print _("Untrusted Server Certificate!")
|
|
certobj = OpenSSL.crypto.load_certificate \
|
|
(OpenSSL.SSL.FILETYPE_PEM, cert)
|
|
print '\n' + _("Fingerprint = %s") % certobj.digest('SHA1')
|
|
print _("Serial Number = "), certobj.get_serial_number()
|
|
Issuer = certobj.get_issuer().get_components()
|
|
print '\n' + _("Issuer")
|
|
for i in Issuer:
|
|
print "%s : %s" %(i[0], i[1])
|
|
Subject = certobj.get_subject().get_components()
|
|
print '\n' + _("Subject")
|
|
for item in Subject:
|
|
print "%s : %s" %(item[0], item[1])
|
|
|
|
print '\n' + _('Add this Servers certificate to trusted (s) or')
|
|
print _('Try add CA and ROOT certificates to trusted (c) or')
|
|
choice = raw_input (_("Quit (q)? s/c/[q]: "))
|
|
if choice.lower() in ['s', 'c']:
|
|
#self.sock = ssl.wrap_socket(sock)
|
|
ca_certs = os.path.join(self.trusted_path, "cert.list")
|
|
|
|
if not os.path.exists(ca_certs):
|
|
fc = open(ca_certs,"w")
|
|
fc.close()
|
|
|
|
if self.host == '127.0.0.1':
|
|
host = 'localhost'
|
|
else: host = self.host
|
|
filename = host
|
|
fc = open(self.trusted_path + filename,"w")
|
|
fc.write(cert)
|
|
fc.close()
|
|
with open(ca_certs) as fd:
|
|
t = fd.read()
|
|
# for each line
|
|
for line in t.splitlines():
|
|
# Split string into a words list
|
|
words = line.split()
|
|
if len(words) > 1:
|
|
# if first word...
|
|
if words[0] == host:
|
|
return 0
|
|
# Open file with compliance server certificates and server hostname
|
|
fcl = open(ca_certs,"a")
|
|
fcl.write(host + ' ' + filename + '\n')
|
|
fcl.close()
|
|
if choice.lower() != 'c':
|
|
return 3
|
|
if choice.lower() == 'c':
|
|
clVars = DataVarsCore()
|
|
clVars.importCore()
|
|
clVars.flIniFile()
|
|
cl_client_cert_dir = clVars.Get('cl_client_cert_dir')
|
|
homePath = clVars.Get('ur_home_path')
|
|
cl_client_cert_dir = cl_client_cert_dir.replace("~",homePath)
|
|
root_cert_dir = os.path.join(cl_client_cert_dir, "ca")
|
|
|
|
if not os.path.exists(root_cert_dir):
|
|
try:
|
|
os.makedirs(root_cert_dir)
|
|
except OSError:
|
|
print _("error creating directory %s") %root_cert_dir
|
|
raise Exception(1)
|
|
|
|
print '\n' + _("Try add CA and ROOT certificates")
|
|
self.list_ca_certs = []
|
|
self.add_ca_cert(cert, self.list_ca_certs)
|
|
return 3
|
|
elif not choice.lower() in ['c','s']:
|
|
return 4
|
|
|
|
def connect_trusted_root(self, sock, root_cert, crl_certs):
|
|
self.ca_path = self.cert_path + "ca/"
|
|
server_cert = ssl.get_server_certificate(addr = (self.host, self.port))
|
|
global flag
|
|
|
|
if self.cert_file:
|
|
f = verify(server_cert, crl_certs, flag)
|
|
if not f:
|
|
flag = 1
|
|
elif f == 1:
|
|
raise Exception(1)
|
|
else:
|
|
import time
|
|
time.sleep(0.1)
|
|
|
|
try:
|
|
if self.FORCE_SSL_VERSION:
|
|
add = {'ssl_version': self.FORCE_SSL_VERSION}
|
|
else:
|
|
add = {}
|
|
add['cert_reqs'] = ssl.CERT_REQUIRED
|
|
# try to use PyOpenSSL by default
|
|
if PYOPENSSL_AVAILABLE:
|
|
wrap_class = PyOpenSSLSocket
|
|
add['keyobj'] = self.keyobj
|
|
add['certobj'] = self.certobj
|
|
add['keyfile'] = self.key_file
|
|
add['certfile'] = self.cert_file
|
|
else:
|
|
wrap_class = ssl.SSLSocket
|
|
self.sock = wrap_class(sock, ca_certs=self.ca_certs, **add)
|
|
return 0
|
|
except:
|
|
return 1
|
|
|
|
def connect_trusted_server(self, sock, crl_certs):
|
|
self.trusted_path = self.cert_path + "trusted/"
|
|
ca_cert_list = self.trusted_path + "cert.list"
|
|
server_cert = ssl.get_server_certificate(addr = (self.host, self.port))
|
|
global flag
|
|
if self.cert_file:
|
|
f = verify(server_cert, crl_certs, flag)
|
|
if not f:
|
|
flag = 1
|
|
elif f == 1:
|
|
raise Exception(1)
|
|
#if not hasattr(HTTPSClientCertTransport, 'filename') or \
|
|
#HTTPSClientCertTransport.filename == None:
|
|
HTTPSClientCertTransport.filename = self.cert_list \
|
|
(self.host, ca_cert_list, server_cert)
|
|
if HTTPSClientCertTransport.filename:
|
|
try:
|
|
if self.FORCE_SSL_VERSION:
|
|
add = {'ssl_version': self.FORCE_SSL_VERSION}
|
|
else:
|
|
add = {}
|
|
add['cert_reqs'] = ssl.CERT_NONE
|
|
# try to use PyOpenSSL by default
|
|
if PYOPENSSL_AVAILABLE:
|
|
wrap_class = PyOpenSSLSocket
|
|
add['keyobj'] = self.keyobj
|
|
add['certobj'] = self.certobj
|
|
add['keyfile'] = self.key_file
|
|
add['certfile'] = self.cert_file
|
|
else:
|
|
wrap_class = ssl.SSLSocket
|
|
self.sock = wrap_class(sock, ca_certs=None, **add)
|
|
|
|
return 0
|
|
except Exception:
|
|
# print (e)
|
|
HTTPSClientCertTransport.filename = None
|
|
return 1
|
|
else:
|
|
return self.add_server_cert(server_cert)
|
|
|
|
def connect(self):
|
|
sock = socket.create_connection((self.host, self.port), self.timeout)
|
|
if hasattr(self, '_tunnel_host') and self._tunnel_host:
|
|
self.sock = sock
|
|
self._tunnel()
|
|
|
|
self.Vars = DataVarsConsole()
|
|
self.Vars.importConsole()
|
|
self.Vars.flIniFile()
|
|
user_root_cert = self.Vars.Get('cl_user_root_cert')
|
|
homePath = self.Vars.Get('ur_home_path')
|
|
user_root_cert = user_root_cert.replace("~",homePath)
|
|
result_user_root = 1
|
|
|
|
while True:
|
|
if os.path.exists(user_root_cert):
|
|
result_user_root = self.connect_trusted_root(sock, \
|
|
user_root_cert, self.CRL_PATH)
|
|
if result_user_root == 1:
|
|
glob_root_cert = self.Vars.Get('cl_glob_root_cert')
|
|
result_root_con = 1
|
|
if os.path.exists(glob_root_cert):
|
|
sock = socket.create_connection((self.host, self.port),
|
|
self.timeout, self.source_address)
|
|
if self._tunnel_host:
|
|
self.sock = sock
|
|
self._tunnel()
|
|
result_root_con = self.connect_trusted_root(sock, \
|
|
glob_root_cert, self.CRL_PATH)
|
|
if result_root_con == 1:
|
|
sock = socket.create_connection((self.host, self.port),
|
|
self.timeout, self.source_address)
|
|
if self._tunnel_host:
|
|
self.sock = sock
|
|
self._tunnel()
|
|
result_server_con = self.connect_trusted_server \
|
|
(sock, self.CRL_PATH)
|
|
if result_server_con in [1,2]:
|
|
raise Exception (1)
|
|
elif result_server_con == 3:
|
|
continue
|
|
elif result_server_con == 4:
|
|
print _('This server is not trusted')
|
|
self.wait_thread.stop()
|
|
sys.exit(1)
|
|
# raise Exception (_('This server is not trusted'))
|
|
elif result_root_con == 2:
|
|
raise Exception (1)
|
|
elif result_user_root == 2:
|
|
raise Exception (1)
|
|
break
|
|
|
|
class CheckingClientHTTPSHandler(CheckingHTTPSHandler):
    """urllib2 HTTPS handler that opens CheckingClientHTTPSConnection objects."""

    def __init__(self, cert_path, ca_certs=None, cert_verifier=None,
                 client_certfile=None, client_keyfile=None,
                 client_keyobj=None, client_certobj=None, wait_thread=None,
                 *args, **kw):
        """cert_verifier is a function returning either True or False
        based on whether the certificate was found to be OK

        cert_path and wait_thread are kept on the handler and handed to
        every connection it creates.  Extra positional and keyword
        arguments are accepted but not used.
        """
        CheckingHTTPSHandler.__init__(
            self, ca_certs, cert_verifier,
            client_keyfile, client_certfile,
            client_keyobj, client_certobj)
        self.wait_thread = wait_thread
        self.cert_path = cert_path

    def https_open(self, req):
        """Open *req* through a connection configured from this handler."""
        def make_connection(*args, **kw):
            # Handler-level settings are the defaults; explicit keyword
            # arguments from the caller take precedence.
            settings = {
                'ca_certs': self.ca_certs,
                'cert_verifier': self.cert_verifier,
                'cert_file': self.client_certfile,
                'key_file': self.client_keyfile,
                'keyobj': self.keyobj,
                'certobj': self.certobj,
                'wait_thread': self.wait_thread,
            }
            settings.update(kw)
            return CheckingClientHTTPSConnection(self.cert_path,
                                                 *args, **settings)
        return self.do_open(make_connection, req)

    https_request = u2.AbstractHTTPHandler.do_request_
|
|
|
|
class HTTPSClientCertTransport(HttpTransport):
    """suds HTTP transport that can authenticate with a client certificate."""

    @staticmethod
    def _read_file(path):
        """Return the whole content of *path*, closing the file afterwards."""
        with open(path) as fd:
            return fd.read()

    def __init__(self, key, cert, path_to_cert, password = None,
                 ca_certs=None, cert_verifier=None,
                 client_keyfile=None, client_certfile=None,
                 client_keyobj=None, client_certobj=None,
                 cookie_callback=None, user_agent_string=None,
                 wait_thread=None, **kwargs):
        """Build the urllib2 opener used for all requests.

        key, cert    -- paths of the PEM private key and certificate; when
                        *key* is given both files are loaded and replace
                        client_keyobj / client_certobj
        path_to_cert -- directory of the client's certificate store
        password     -- passphrase of the private key, if any
        kwargs       -- suds transport options

        The certificate-checking HTTPS handler is installed when ca_certs,
        a client key/certificate file pair, or a client key/certificate
        object pair is available; otherwise the plain urllib2 HTTPSHandler
        is used.

        Raises OpenSSL.crypto.Error when the key cannot be read without a
        passphrase and no password was given.
        """
        Transport.__init__(self)
        self.key = key
        self.cert = cert
        self.cert_path = path_to_cert
        if key:
            client_certobj = OpenSSL.crypto.load_certificate(
                OpenSSL.SSL.FILETYPE_PEM, self._read_file(cert))
            if password:
                client_keyobj = OpenSSL.crypto.load_privatekey(
                    OpenSSL.SSL.FILETYPE_PEM, self._read_file(key),
                    str(password))
            else:
                # The passphrase callback returns None; presumably this makes
                # reading an encrypted key fail here instead of prompting
                # on the terminal -- TODO confirm.
                bio = M2Crypto.BIO.openfile(key)
                rsa = M2Crypto.m2.rsa_read_key(bio._ptr(),
                                               lambda *unused: None)
                if not rsa:
                    raise OpenSSL.crypto.Error
                client_keyobj = OpenSSL.crypto.load_privatekey(
                    OpenSSL.SSL.FILETYPE_PEM, self._read_file(key))

        Unskin(self.options).update(kwargs)
        self.cookiejar = CookieJar(DefaultCookiePolicy())
        self.cookie_callback = cookie_callback
        self.user_agent_string = user_agent_string
        log.debug("Proxy: %s", self.options.proxy)

        from dslib.network import ProxyManager
        proxy_handler = ProxyManager.HTTPS_PROXY.create_proxy_handler()
        proxy_auth_handler = \
            ProxyManager.HTTPS_PROXY.create_proxy_auth_handler()

        if ca_certs or (client_keyfile and client_certfile) \
                or (client_keyobj and client_certobj):
            https_handler = CheckingClientHTTPSHandler(
                cert_path=path_to_cert,
                ca_certs=ca_certs, cert_verifier=cert_verifier,
                client_keyfile=client_keyfile,
                client_certfile=client_certfile,
                client_keyobj=client_keyobj,
                client_certobj=client_certobj,
                wait_thread=wait_thread)
        else:
            https_handler = u2.HTTPSHandler()

        self.urlopener = u2.build_opener(
            SUDSHTTPRedirectHandler(),
            u2.HTTPCookieProcessor(self.cookiejar),
            https_handler)
        if proxy_handler:
            self.urlopener.add_handler(proxy_handler)
        if proxy_auth_handler:
            self.urlopener.add_handler(proxy_auth_handler)
        self.urlopener.addheaders = [('User-agent', self.user_agent_string)]