You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
calculate-utils-3-console/pym/console/application/client_class.py

506 lines
21 KiB

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

#-*- coding: utf-8 -*-
# Copyright 2012-2013 Calculate Ltd. http://www.calculate-linux.org
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import urllib2 as u2
import os, sys
import socket, ssl
import OpenSSL, hashlib, M2Crypto
from calculate.core.datavars import DataVarsCore
from calculate.lib.datavars import DataVars
from sudsds.client import Client
from cert_verify import verify, get_CRL
from sudsds.transport.http import HttpTransport, SUDSHTTPRedirectHandler, \
CheckingHTTPSConnection, CheckingHTTPSHandler, \
PYOPENSSL_AVAILABLE, PyOpenSSLSocket
from sudsds.transport import Transport
from sudsds.properties import Unskin
from cookielib import CookieJar, DefaultCookiePolicy
from logging import getLogger
from calculate.console.datavars import DataVarsConsole
from calculate.lib.cl_lang import setLocalTranslate
# Register the 'cl_console3' translation domain for this module.  The code
# below calls ``_()`` without importing it, so this call presumably installs
# that function into the module namespace -- confirm in calculate.lib.cl_lang.
setLocalTranslate('cl_console3',sys.modules[__name__])
# Module-level logger named after this module.
log = getLogger(__name__)
# Module-wide state handed to cert_verify.verify() as its third argument and
# set to 1 by the connection code after verify() returns a falsy result.
# NOTE(review): the exact meaning of the value is defined by verify().
flag = 0
class Client_suds(Client):
    """suds client that also remembers where the client certificate files live."""

    def set_parameters(self, path_to_cert, CERT_FILE, PKEY_FILE):
        """Record certificate-related paths and make sure the CRL directory exists.

        ``path_to_cert`` is used as a plain string prefix (no separator is
        inserted), so it is expected to end with a path separator.
        ``CERT_FILE`` is stored as ``''`` when a falsy value is given.
        """
        crl_dir = path_to_cert + 'ca/crl/'

        self.path_to_cert = path_to_cert
        self.CERT_FILE = CERT_FILE or ''
        self.REQ_FILE = path_to_cert + 'client.csr'
        self.PKEY_FILE = PKEY_FILE
        self.SID_FILE = path_to_cert + 'sid.int'
        self.CRL_PATH = crl_dir

        # Create the CRL directory (and parents) only when it is missing.
        if os.path.exists(crl_dir):
            return
        os.makedirs(crl_dir)
class CheckingClientHTTPSConnection(CheckingHTTPSConnection):
"""based on httplib.HTTPSConnection code - extended to support
server certificate verification and client certificate authorization"""
def __init__(self, cert_path, host, ca_certs=None, cert_verifier=None,
keyobj=None, certobj=None, wait_thread=None, **kw):
"""cert_verifier is a function returning either True or False
based on whether the certificate was found to be OK,
keyobj and certobj represent internal PyOpenSSL structures holding
the key and certificate respectively.
"""
CheckingHTTPSConnection.__init__(self, host, ca_certs, cert_verifier,
keyobj, certobj, **kw)
# self.ClientObj = ClientObj
self.cert_path = cert_path
self.ca_certs = ca_certs
self.CRL_PATH = os.path.join(cert_path, 'ca/crl/')
self.wait_thread = wait_thread
# get filename store cert server
def cert_list (self, host, ca_certs, server_cert):
if host == '127.0.0.1':
host = 'localhost'
if not os.path.exists(self.trusted_path):
try:
os.makedirs(self.trusted_path)
except OSError:
pass
if not os.path.exists(ca_certs):
fc = open(ca_certs,"w")
fc.close()
filename = None
try:
with open(ca_certs) as fd:
t = fd.read()
# for each line
for line in t.splitlines():
# Split string into a words list
words = line.split()
if len(words) > 1:
# if first word...
if words[0] == host:
filename = words[1]
if not filename:
return None
except:
print _("Certificate not found on the clients side")
return None
try:
fd = open(self.trusted_path + filename, 'r')
store_cert = fd.read()
fd.close()
if store_cert == server_cert:
return filename
except:
print _("Failed to open the file"), self.trusted_path, filename
return None
def add_all_ca_cert(self, list_ca_certs):
# so root cert be first, ca after
clVarsCore = DataVarsCore()
clVarsCore.importCore()
clVarsCore.flIniFile()
list_ca_certs.reverse()
system_ca_db = clVarsCore.Get('core.cl_glob_root_cert')
clVars = DataVars()
clVars.flIniFile()
homePath = clVars.Get('ur_home_path')
cl_client_cert_dir = clVarsCore.Get('core.cl_client_cert_dir')
cl_client_cert_dir = cl_client_cert_dir.replace("~",homePath)
root_cert_md5 = os.path.join(cl_client_cert_dir, "ca/cert_list")
user_root_cert = clVarsCore.Get('core.cl_user_root_cert')
user_root_cert = user_root_cert.replace("~",homePath)
for cert in list_ca_certs:
if os.path.exists(system_ca_db):
if cert in open(system_ca_db, 'r').read():
continue
if os.path.exists(user_root_cert):
if cert in open(user_root_cert, 'r').read():
continue
md5 = hashlib.md5()
md5.update(cert)
md5sum = md5.hexdigest()
print "\n================================================="
print "md5sum = ", md5sum
if not os.path.exists(root_cert_md5):
fc = open(root_cert_md5,"w")
fc.close()
filename = None
with open(root_cert_md5) as fd:
t = fd.read()
# for each line
for line in t.splitlines():
# Split string into a words list
words = line.split(' ',1)
if words[0] == md5sum:
filename = words[1]
if not filename:
certobj = OpenSSL.crypto.load_certificate \
(OpenSSL.SSL.FILETYPE_PEM, cert)
Issuer = certobj.get_issuer().get_components()
for item in Issuer:
if item[0] == 'CN':
filename = item[1]
fc = open(root_cert_md5,"a")
fc.write('%s %s\n' %(md5sum, filename))
fc.close()
if not filename:
print _('Field "CN" not found in the certificate!')
return 1
fd = open(os.path.join(cl_client_cert_dir,'ca/',filename),'w')
fd.write(cert)
fd.close()
fa = open(user_root_cert, 'a')
fa.write(cert)
fa.close()
print _("filename = "), filename
print _("Certificate added")
else:
print _("The file containing the CA certificate now exists")
get_CRL(cl_client_cert_dir)
def add_ca_cert(self, cert, list_ca_certs):
url = 'https://%s:%s/?wsdl' %(self.host, self.port)
client = Client_suds(url, transport = HTTPSClientCertTransport \
(None, None, self.cert_path))
client.wsdl.services[0].setlocation(url)
cert = client.service.get_ca()
if cert == '1':
print _("Invalid server certificate!")
raise Exception(1)
if cert == '2':
print _("CA certificate not found on the server")
raise Exception(1)
try:
certobj = OpenSSL.crypto.load_certificate \
(OpenSSL.SSL.FILETYPE_PEM, cert)
except:
print _("Error. Certificate not added to trusted")
raise Exception(1)
print '\n', _("Fingerprint = %s") % certobj.digest('SHA1')
print _("Serial Number = "), certobj.get_serial_number()
Issuer = certobj.get_issuer().get_components()
print '\n', _("Issuer")
for i in Issuer:
print "%s : %s" %(i[0], i[1])
Subject = certobj.get_subject().get_components()
print '\n', _("Subject")
for subj in Subject:
print "%s : %s" %(subj[0], subj[1])
ans = raw_input (_("Add the CA certificate to trusted? y/[n]:"))
if ans.lower() in ['y','yes']:
list_ca_certs.append(cert)
self.add_all_ca_cert(list_ca_certs)
else:
print _("Certificate not added to trusted")
# add certificate server in trusted
def add_server_cert(self, cert):
self.wait_thread.stop()
print _("Untrusted server certificate!")
certobj = OpenSSL.crypto.load_certificate \
(OpenSSL.SSL.FILETYPE_PEM, cert)
print '\n' + _("Fingerprint = %s") % certobj.digest('SHA1')
print _("Serial Number = "), certobj.get_serial_number()
Issuer = certobj.get_issuer().get_components()
print '\n' + _("Issuer")
for i in Issuer:
print "%s : %s" %(i[0], i[1])
Subject = certobj.get_subject().get_components()
print '\n' + _("Subject")
for item in Subject:
print "%s : %s" %(item[0], item[1])
print '\n' + _('Add this server certificate to trusted (s) or')
print _('Try to add the CA and root certificates to trusted (c) or')
choice = raw_input (_("Quit (q)? s/c/[q]: "))
if choice.lower() in ['s', 'c']:
#self.sock = ssl.wrap_socket(sock)
ca_certs = os.path.join(self.trusted_path, "cert.list")
if not os.path.exists(ca_certs):
fc = open(ca_certs,"w")
fc.close()
if self.host == '127.0.0.1':
host = 'localhost'
else: host = self.host
filename = host
fc = open(self.trusted_path + filename,"w")
fc.write(cert)
fc.close()
with open(ca_certs) as fd:
t = fd.read()
# for each line
for line in t.splitlines():
# Split string into a words list
words = line.split()
if len(words) > 1:
# if first word...
if words[0] == host:
return 0
# Open file with compliance server certificates and server hostname
fcl = open(ca_certs,"a")
fcl.write(host + ' ' + filename + '\n')
fcl.close()
if choice.lower() != 'c':
return 3
if choice.lower() == 'c':
clVars = DataVarsCore()
clVars.importCore()
clVars.flIniFile()
cl_client_cert_dir = clVars.Get('core.cl_client_cert_dir')
homePath = clVars.Get('ur_home_path')
cl_client_cert_dir = cl_client_cert_dir.replace("~",homePath)
root_cert_dir = os.path.join(cl_client_cert_dir, "ca")
if not os.path.exists(root_cert_dir):
try:
os.makedirs(root_cert_dir)
except OSError:
print _("Failed to create directory %s") %root_cert_dir
raise Exception(1)
print '\n' + _("Add the CA and root certificates")
self.list_ca_certs = []
self.add_ca_cert(cert, self.list_ca_certs)
return 3
elif not choice.lower() in ['c','s']:
return 4
def connect_trusted_root(self, sock, root_cert, crl_certs):
self.ca_path = self.cert_path + "ca/"
server_cert = ssl.get_server_certificate(addr = (self.host, self.port))
global flag
if self.cert_file:
f = verify(server_cert, crl_certs, flag)
if not f:
flag = 1
elif f == 1:
raise Exception(1)
else:
import time
time.sleep(0.1)
try:
if self.FORCE_SSL_VERSION:
add = {'ssl_version': self.FORCE_SSL_VERSION}
else:
add = {}
add['cert_reqs'] = ssl.CERT_REQUIRED
# try to use PyOpenSSL by default
if PYOPENSSL_AVAILABLE:
wrap_class = PyOpenSSLSocket
add['keyobj'] = self.keyobj
add['certobj'] = self.certobj
add['keyfile'] = self.key_file
add['certfile'] = self.cert_file
else:
wrap_class = ssl.SSLSocket
self.sock = wrap_class(sock, ca_certs=self.ca_certs, **add)
return 0
except:
return 1
def connect_trusted_server(self, sock, crl_certs):
self.trusted_path = self.cert_path + "trusted/"
ca_cert_list = self.trusted_path + "cert.list"
server_cert = ssl.get_server_certificate(addr = (self.host, self.port))
global flag
if self.cert_file:
f = verify(server_cert, crl_certs, flag)
if not f:
flag = 1
elif f == 1:
raise Exception(1)
#if not hasattr(HTTPSClientCertTransport, 'filename') or \
#HTTPSClientCertTransport.filename == None:
HTTPSClientCertTransport.filename = self.cert_list \
(self.host, ca_cert_list, server_cert)
if HTTPSClientCertTransport.filename:
try:
if self.FORCE_SSL_VERSION:
add = {'ssl_version': self.FORCE_SSL_VERSION}
else:
add = {}
add['cert_reqs'] = ssl.CERT_NONE
# try to use PyOpenSSL by default
if PYOPENSSL_AVAILABLE:
wrap_class = PyOpenSSLSocket
add['keyobj'] = self.keyobj
add['certobj'] = self.certobj
add['keyfile'] = self.key_file
add['certfile'] = self.cert_file
else:
wrap_class = ssl.SSLSocket
self.sock = wrap_class(sock, ca_certs=None, **add)
return 0
except Exception:
# print (e)
HTTPSClientCertTransport.filename = None
return 1
else:
return self.add_server_cert(server_cert)
def connect(self):
sock = socket.create_connection((self.host, self.port), self.timeout)
if hasattr(self, '_tunnel_host') and self._tunnel_host:
self.sock = sock
self._tunnel()
self.Vars = DataVarsConsole()
self.Vars.importConsole()
self.Vars.flIniFile()
user_root_cert = self.Vars.Get('core.cl_user_root_cert')
homePath = self.Vars.Get('ur_home_path')
user_root_cert = user_root_cert.replace("~",homePath)
result_user_root = 1
while True:
if os.path.exists(user_root_cert):
result_user_root = self.connect_trusted_root(sock, \
user_root_cert, self.CRL_PATH)
if result_user_root == 1:
glob_root_cert = self.Vars.Get('core.cl_glob_root_cert')
result_root_con = 1
if os.path.exists(glob_root_cert):
sock = socket.create_connection((self.host, self.port),
self.timeout, self.source_address)
if self._tunnel_host:
self.sock = sock
self._tunnel()
result_root_con = self.connect_trusted_root(sock, \
glob_root_cert, self.CRL_PATH)
if result_root_con == 1:
sock = socket.create_connection((self.host, self.port),
self.timeout, self.source_address)
if self._tunnel_host:
self.sock = sock
self._tunnel()
result_server_con = self.connect_trusted_server \
(sock, self.CRL_PATH)
if result_server_con in [1,2]:
raise Exception (1)
elif result_server_con == 3:
continue
elif result_server_con == 4:
print _('This server is not trusted')
self.wait_thread.stop()
sys.exit(1)
# raise Exception (_('This server is not trusted'))
elif result_root_con == 2:
raise Exception (1)
elif result_user_root == 2:
raise Exception (1)
break
class CheckingClientHTTPSHandler(CheckingHTTPSHandler):
    """urllib2 HTTPS handler that opens CheckingClientHTTPSConnection objects."""

    def __init__(self, cert_path, ca_certs=None, cert_verifier=None,
                 client_certfile=None, client_keyfile=None,
                 client_keyobj=None, client_certobj=None, wait_thread=None,
                 *args, **kw):
        """cert_verifier is a function returning either True or False
        based on whether the certificate was found to be OK

        The extra ``*args`` / ``**kw`` are accepted but not forwarded to the
        base class.
        """
        # The base class takes the key file before the certificate file.
        CheckingHTTPSHandler.__init__(
            self, ca_certs, cert_verifier,
            client_keyfile, client_certfile,
            client_keyobj, client_certobj)
        self.cert_path = cert_path
        self.wait_thread = wait_thread

    def https_open(self, req):
        """Open ``req`` over a connection carrying this handler's settings."""

        def build_connection(*conn_args, **conn_kw):
            # Handler-level settings are the defaults; keyword arguments
            # supplied by the caller of the factory override them.
            settings = {
                'ca_certs': self.ca_certs,
                'cert_verifier': self.cert_verifier,
                'cert_file': self.client_certfile,
                'key_file': self.client_keyfile,
                'keyobj': self.keyobj,
                'certobj': self.certobj,
                'wait_thread': self.wait_thread,
            }
            settings.update(conn_kw)
            return CheckingClientHTTPSConnection(self.cert_path,
                                                 *conn_args, **settings)

        return self.do_open(build_connection, req)

    # Reuse urllib2's standard request pre-processing for HTTPS requests.
    https_request = u2.AbstractHTTPHandler.do_request_
class HTTPSClientCertTransport(HttpTransport):
    """suds HTTP transport that authenticates with a client certificate."""

    def __init__(self, key, cert, path_to_cert, password = None,
                 ca_certs=None, cert_verifier=None,
                 client_keyfile=None, client_certfile=None,
                 client_keyobj=None, client_certobj=None,
                 cookie_callback=None, user_agent_string=None,
                 wait_thread=None, **kwargs):
        """Build the URL opener used for all requests of this transport.

        ``key`` / ``cert`` are paths of PEM files.  When ``key`` is truthy
        both files are loaded and replace whatever was passed as
        ``client_keyobj`` / ``client_certobj``; ``password`` (if given) is
        used to decrypt the private key.  ``path_to_cert`` is the client
        certificate directory handed to the HTTPS handler.  Remaining
        keyword arguments update the transport options.

        A certificate-checking HTTPS handler is installed only when
        ``ca_certs``, a key/cert file pair or a key/cert object pair is
        available; otherwise the plain urllib2 HTTPS handler is used.

        Raises ``OpenSSL.crypto.Error`` when the key cannot be loaded.
        """
        Transport.__init__(self)
        self.key = key
        self.cert = cert
        self.cert_path = path_to_cert
        if key:
            # Files are read through ``with`` so the handles are closed
            # deterministically.
            with open(cert) as cert_fd:
                client_certobj = OpenSSL.crypto.load_certificate(
                    OpenSSL.SSL.FILETYPE_PEM, cert_fd.read())
            if password:
                with open(key) as key_fd:
                    client_keyobj = OpenSSL.crypto.load_privatekey(
                        OpenSSL.SSL.FILETYPE_PEM, key_fd.read(),
                        str(password))
            else:
                # Pre-read the key through M2Crypto with a callback that
                # supplies no passphrase.
                # NOTE(review): presumably this makes an encrypted key fail
                # here instead of prompting on the terminal -- confirm.
                bio = M2Crypto.BIO.openfile(key)
                rsa = M2Crypto.m2.rsa_read_key(bio._ptr(),
                                               lambda *unused: None)
                if not rsa:
                    raise OpenSSL.crypto.Error
                with open(key) as key_fd:
                    client_keyobj = OpenSSL.crypto.load_privatekey(
                        OpenSSL.SSL.FILETYPE_PEM, key_fd.read())

        Unskin(self.options).update(kwargs)
        self.cookiejar = CookieJar(DefaultCookiePolicy())
        self.cookie_callback = cookie_callback
        self.user_agent_string = user_agent_string
        log.debug("Proxy: %s", self.options.proxy)

        from dslib.network import ProxyManager
        proxy_handler = ProxyManager.HTTPS_PROXY.create_proxy_handler()
        proxy_auth_handler = \
            ProxyManager.HTTPS_PROXY.create_proxy_auth_handler()

        if ca_certs or (client_keyfile and client_certfile) \
                or (client_keyobj and client_certobj):
            https_handler = CheckingClientHTTPSHandler(
                cert_path=path_to_cert,
                ca_certs=ca_certs, cert_verifier=cert_verifier,
                client_keyfile=client_keyfile,
                client_certfile=client_certfile,
                client_keyobj=client_keyobj,
                client_certobj=client_certobj,
                wait_thread=wait_thread)
        else:
            https_handler = u2.HTTPSHandler()

        self.urlopener = u2.build_opener(SUDSHTTPRedirectHandler(),
                                         u2.HTTPCookieProcessor(self.cookiejar),
                                         https_handler)
        if proxy_handler:
            self.urlopener.add_handler(proxy_handler)
        if proxy_auth_handler:
            self.urlopener.add_handler(proxy_auth_handler)
        self.urlopener.addheaders = [('User-agent', self.user_agent_string)]