# -*- coding: utf-8 -*-

# Copyright 2012-2016 Mir Calculate. http://www.calculate-linux.org
#
#  Licensed under the Apache License, Version 2.0 (the "License");
#  you may not use this file except in compliance with the License.
#  You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
#  Unless required by applicable law or agreed to in writing, software
#  distributed under the License is distributed on an "AS IS" BASIS,
#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#  See the License for the specific language governing permissions and
#  limitations under the License.

import urllib.request as u2

# NOTE(review): this replaces urllib's default HTTPS context factory with the
# unverified one for the whole process, i.e. plain urllib HTTPS requests made
# through the default context are not certificate-checked.
if hasattr(u2, "ssl"):
    u2.ssl._create_default_https_context = u2.ssl._create_unverified_context
import os
import sys
import socket
import ssl
import OpenSSL
import hashlib
import M2Crypto
# The import order below is kept exactly as in the original file:
# calculate.contrib is imported before suds (presumably it makes the bundled
# packages importable - confirm before regrouping these imports).
import calculate.contrib
from calculate.core.datavars import DataVarsCore
from calculate.lib.datavars import DataVars
import calculate.contrib
from suds.client import Client
from .cert_verify import verify, get_CRL
import http.client as httplib
from suds.transport.http import HttpTransport
from .pyopenssl_wrapper import PyOpenSSLSocket
from suds.transport import Transport
from suds.properties import Unskin
from http.cookiejar import CookieJar, DefaultCookiePolicy
from logging import getLogger
from ..datavars import DataVarsConsole
from calculate.lib.cl_lang import setLocalTranslate
from .sid_func import SessionId
from calculate.lib.utils.files import readFile

# Identity placeholder for the translation function; setLocalTranslate() is
# handed this module right after (presumably to install the real "_").
_ = lambda x: x
setLocalTranslate('cl_console3', sys.modules[__name__])

log = getLogger(__name__)
# Module-wide state passed to verify() and set to 1 by the connection methods
# once verify() has returned a false value.
flag = 0


class SUDSHTTPRedirectHandler(u2.HTTPRedirectHandler):
    """Redirect handler that re-sends the request body to the new location."""

    def redirect_request(self, req, fp, code, msg, headers, newurl):
        """Return a Request or None in response to a redirect.

        This is called by the http_error_30x methods,
        it was taken from the original Python version and modified
        to use POST when redirection takes place.
        This allows a SOAP message to be redirected without a loss
        of content.

        Raises urllib's HTTPError when the status code / method combination
        is not one that may be followed automatically.
        """
        m = req.get_method()
        if (code in (301, 302, 303, 307) and m in ("GET", "HEAD")
                or code in (301, 302, 303) and m == "POST"):
            # Be lenient with URLs that contain a literal space.
            newurl = newurl.replace(' ', '%20')
            newheaders = {k: v for k, v in req.headers.items()
                          if k.lower() not in ("content-length",
                                               "content-type")}
            log.debug("Redirecting to %s", newurl)
            # Request.get_origin_req_host() was removed in Python 3.4;
            # the value is available as the origin_req_host attribute.
            return u2.Request(newurl,
                              data=req.data,  # here we pass the original data
                              headers=newheaders,
                              origin_req_host=req.origin_req_host,
                              unverifiable=True)
        else:
            raise u2.HTTPError(req.get_full_url(), code, msg, headers, fp)


# class CheckingHTTPSConnection(httplib.HTTPSConnection):
#     """based on httplib.HTTPSConnection code - extended to support
#     server certificate verification and client certificate authorization"""
#     response_class = MyHTTPResponse
#     FORCE_SSL_VERSION = None
#     SERVER_CERT_CHECK = True  # might be turned off when a workaround is needed
#     def __init__(self, host, ca_certs=None, cert_verifier=None,
#                  keyobj=None, certobj=None, **kw):
#         """cert_verifier is a function returning either True or False
#         based on whether the certificate was found to be OK,
#         keyobj and certobj represent internal PyOpenSSL structures holding
#         the key and certificate respectively.
#         """
#         httplib.HTTPSConnection.__init__(self, host, **kw)
#         self.ca_certs = ca_certs
#         self.cert_verifier = cert_verifier
#         self.keyobj = keyobj
#         self.certobj = certobj
#     def connect(self):
#         sock = socket.create_connection((self.host, self.port), self.timeout)
#         if hasattr(self, '_tunnel_host') and self._tunnel_host:
#             self.sock = sock
#             self._tunnel()
#         if self.FORCE_SSL_VERSION:
#             add = {'ssl_version': self.FORCE_SSL_VERSION}
#         else:
#             add = {}
#         if self.SERVER_CERT_CHECK and self.ca_certs:
#             add['cert_reqs'] = ssl.CERT_REQUIRED
#         else:
#             add['cert_reqs'] = ssl.CERT_NONE
#         # try to use PyOpenSSL by default
#         if PYOPENSSL_AVAILABLE:
#             wrap_class = PyOpenSSLSocket
#             add['keyobj'] = self.keyobj
#             add['certobj'] = self.certobj
#             add['keyfile'] = self.key_file
#             add['certfile'] = self.cert_file
#         else:
#             wrap_class = ssl.SSLSocket
#         self.sock = wrap_class(sock, ca_certs=self.ca_certs, **add)
#         #if self.cert_verifier and self.SERVER_CERT_CHECK:
#         #    if not self.cert_verifier(self.sock.getpeercert()):
#         #        raise Exception("Server certificate did not pass security check.",
#         #                        self.sock.getpeercert())


class Client_suds(SessionId, Client):
    """suds Client combined with the SessionId mixin."""

    def set_parameters(self, path_to_cert, CERT_FILE, PKEY_FILE, HOST):
        """Store certificate-related paths and make sure the CRL dir exists.

        All derived paths are built by plain concatenation, so path_to_cert
        is expected to end with a path separator.
        """
        self.path_to_cert = path_to_cert
        if not CERT_FILE:
            CERT_FILE = ''
        self.CERT_FILE = CERT_FILE
        self.REQ_FILE = path_to_cert + 'client.csr'
        self.PKEY_FILE = PKEY_FILE
        self.SID_FILE = path_to_cert + 'sids'
        self.SID_LOCK = path_to_cert + 'sids.lock'
        self.CRL_PATH = path_to_cert + 'ca/crl/'
        self.HOST = HOST
        # exist_ok avoids failing when the directory appears between a check
        # and the creation.
        os.makedirs(self.CRL_PATH, exist_ok=True)


def _as_text(value):
    """Return value as str; bytes (as handed out by pyOpenSSL) are decoded."""
    if isinstance(value, bytes):
        return value.decode("UTF-8", "replace")
    return value


def _print_cert_details(certobj):
    """Print fingerprint, serial number, issuer and subject of certobj."""
    print('\n' + _("Fingerprint = %s") % _as_text(certobj.digest('SHA1')))
    print(_("Serial Number = "), certobj.get_serial_number())
    print('\n' + _("Issuer"))
    for name, value in certobj.get_issuer().get_components():
        print("%s : %s" % (_as_text(name), _as_text(value)))
    print('\n' + _("Subject"))
    for name, value in certobj.get_subject().get_components():
        print("%s : %s" % (_as_text(name), _as_text(value)))


class CheckingClientHTTPSConnection(httplib.HTTPSConnection):
    """based on httplib.HTTPSConnection code"""
    response_class = httplib.HTTPResponse
    FORCE_SSL_VERSION = None
    SERVER_CERT_CHECK = True  # might be turned off when a workaround is needed

    def __init__(self, cert_path, host, ca_certs=None, cert_verifier=None,
                 keyobj=None, certobj=None, wait_thread=None, **kw):
        """cert_verifier is a function returning either True or False
        based on whether the certificate was found to be OK,
        keyobj and certobj represent internal PyOpenSSL structures holding
        the key and certificate respectively.

        wait_thread, when given, is an object whose stop() method is called
        before the user is prompted or the process exits.
        """
        httplib.HTTPSConnection.__init__(self, host, **kw)
        self.ca_certs = ca_certs
        self.cert_verifier = cert_verifier
        self.keyobj = keyobj
        self.certobj = certobj
        self.cert_path = cert_path
        self.CRL_PATH = os.path.join(cert_path, 'ca/crl/')
        self.wait_thread = wait_thread

    def _stop_wait_thread(self):
        """Stop the wait thread if one was supplied."""
        if self.wait_thread is not None:
            self.wait_thread.stop()

    # get filename store cert server
    def cert_list(self, host, ca_certs, server_cert):
        """Return the stored certificate file name for host, or None.

        ca_certs is the index file ("<host> <filename>" per line). A name is
        returned only when the stored certificate text equals server_cert.
        """
        if host == '127.0.0.1':
            host = 'localhost'
        if not os.path.exists(self.trusted_path):
            try:
                os.makedirs(self.trusted_path)
            except OSError:
                pass
        if not os.path.exists(ca_certs):
            # create an empty index file
            with open(ca_certs, "w"):
                pass
        filename = None
        try:
            with open(ca_certs) as fd:
                for line in fd.read().splitlines():
                    words = line.split()
                    # the last matching line wins
                    if len(words) > 1 and words[0] == host:
                        filename = words[1]
            if not filename:
                return None
        except Exception:
            print(_("Certificate not found on the client`s side"))
            return None
        try:
            with open(self.trusted_path + filename, 'r') as fd:
                store_cert = fd.read()
            if store_cert == server_cert:
                return filename
        except Exception:
            print(_("Failed to open the file"), self.trusted_path, filename)
        return None

    def add_all_ca_cert(self, list_ca_certs):
        """Store every certificate of list_ca_certs that is not yet known.

        The list is reversed in place first. Returns 1 when a certificate
        has no CN in its issuer; otherwise returns None after calling
        get_CRL() for the client certificate directory.
        """
        # so root cert be first, ca after
        clVarsCore = DataVarsCore()
        clVarsCore.importCore()
        clVarsCore.flIniFile()

        list_ca_certs.reverse()
        system_ca_db = clVarsCore.Get('core.cl_glob_root_cert')

        clVars = DataVars()
        clVars.flIniFile()
        homePath = clVars.Get('ur_home_path')
        cl_client_cert_dir = clVarsCore.Get('core.cl_client_cert_dir')
        cl_client_cert_dir = cl_client_cert_dir.replace("~", homePath)
        root_cert_md5 = os.path.join(cl_client_cert_dir, "ca/cert_list")

        user_root_cert = clVarsCore.Get('core.cl_user_root_cert')
        user_root_cert = user_root_cert.replace("~", homePath)

        for cert in list_ca_certs:
            if os.path.exists(system_ca_db):
                if cert in readFile(system_ca_db):
                    continue

            # re-read on every pass: this file is appended to below
            if os.path.exists(user_root_cert):
                if cert in readFile(user_root_cert):
                    continue

            md5 = hashlib.md5()
            md5.update(cert.encode("UTF-8"))
            md5sum = md5.hexdigest()
            print("\n=================================================")
            print("md5sum = ", md5sum)

            if not os.path.exists(root_cert_md5):
                with open(root_cert_md5, "w"):
                    pass

            filename = None
            with open(root_cert_md5) as fd:
                for line in fd.read().splitlines():
                    # "<md5sum> <filename>"; the name may contain spaces
                    words = line.split(' ', 1)
                    # ignore malformed lines that carry no file name
                    if len(words) > 1 and words[0] == md5sum:
                        filename = words[1]

            if filename:
                print(_("The file containing the CA certificate now exists"))
                continue

            certobj = OpenSSL.crypto.load_certificate(
                OpenSSL.SSL.FILETYPE_PEM, cert)
            for name, value in certobj.get_issuer().get_components():
                if name == b'CN':
                    filename = value.decode("UTF-8")

            # Check before touching the index so that no "<md5> None" entry
            # is recorded for a certificate that could not be stored.
            if not filename:
                print(_('Field "CN" not found in the certificate!'))
                return 1

            with open(root_cert_md5, "a") as fc:
                fc.write('%s %s\n' % (md5sum, filename))

            with open(os.path.join(cl_client_cert_dir, 'ca/', filename),
                      'w') as fd:
                fd.write(cert)

            with open(user_root_cert, 'a') as fa:
                fa.write(cert)
            print(_("filename = "), filename)
            print(_("Certificate added"))
        get_CRL(cl_client_cert_dir)

    def add_ca_cert(self, cert, list_ca_certs):
        """Fetch the CA certificate from the server and offer to trust it.

        The cert argument is not used: it is replaced by the value returned
        by the server's get_ca() call. Raises Exception(1) when the server
        reports an error or the returned certificate cannot be parsed.
        """
        url = 'https://%s:%s/?wsdl' % (self.host, self.port)
        client = Client_suds(
            url, transport=HTTPSClientCertTransport(None, None,
                                                    self.cert_path))
        client.wsdl.services[0].setlocation(url)
        cert = client.service.get_ca()
        if cert == '1':
            print(_("Invalid server certificate!"))
            raise Exception(1)
        if cert == '2':
            print(_("CA certificate not found on the server"))
            raise Exception(1)

        try:
            certobj = OpenSSL.crypto.load_certificate(
                OpenSSL.SSL.FILETYPE_PEM, cert)
        except Exception:
            print(_("Error. Certificate not added to trusted"))
            raise Exception(1)
        _print_cert_details(certobj)
        ans = input(_("Add the CA certificate to trusted? y/[n]:"))
        if ans.lower() in ['y', 'yes']:
            list_ca_certs.append(cert)
            self.add_all_ca_cert(list_ca_certs)
        else:
            print(_("Certificate not added to trusted"))

    # add certificate server in trusted
    def add_server_cert(self, cert):
        """Ask the user whether to trust the server certificate cert.

        Returns 3 after the certificate (choice "s") or the certificate plus
        the CA chain (choice "c") has been stored, 4 when the user declines.
        """
        self._stop_wait_thread()
        print(_("Untrusted server certificate!"))
        certobj = OpenSSL.crypto.load_certificate(
            OpenSSL.SSL.FILETYPE_PEM, cert)
        _print_cert_details(certobj)

        print('\n' + _('Add this server certificate to trusted (s) or'))
        print(_('Try to add the CA and root certificates to trusted (c) or'))
        choice = input(_("Quit (q)? s/c/[q]: ")).lower()
        if choice not in ('s', 'c'):
            return 4

        # self.sock = ssl.wrap_socket(sock)
        ca_certs = os.path.join(self.trusted_path, "cert.list")

        if not os.path.exists(ca_certs):
            with open(ca_certs, "w"):
                pass

        if self.host == '127.0.0.1':
            host = 'localhost'
        else:
            host = self.host
        filename = host
        with open(self.trusted_path + filename, "w") as fc:
            fc.write(cert)

        with open(ca_certs) as fd:
            listed = any(
                len(words) > 1 and words[0] == host
                for words in (line.split()
                              for line in fd.read().splitlines()))
        # Only the index entry is skipped for an already listed host: the
        # stored certificate has just been refreshed, so the caller still
        # has to retry (and choice "c" still has to add the CA chain).
        if not listed:
            # Open file with compliance server certificates and server
            # hostname
            with open(ca_certs, "a") as fcl:
                fcl.write(host + ' ' + filename + '\n')

        if choice != 'c':
            return 3

        clVars = DataVarsCore()
        clVars.importCore()
        clVars.flIniFile()
        cl_client_cert_dir = clVars.Get('core.cl_client_cert_dir')
        homePath = clVars.Get('ur_home_path')
        cl_client_cert_dir = cl_client_cert_dir.replace("~", homePath)
        root_cert_dir = os.path.join(cl_client_cert_dir, "ca")

        if not os.path.exists(root_cert_dir):
            try:
                os.makedirs(root_cert_dir)
            except OSError:
                print(_("Failed to create directory %s") % root_cert_dir)
                raise Exception(1)

        print('\n' + _("Add the CA and root certificates"))
        self.list_ca_certs = []
        self.add_ca_cert(cert, self.list_ca_certs)
        return 3

    def _verify_with_crl(self, server_cert, crl_certs):
        """Run verify() on server_cert; raise Exception(1) when it returns 1."""
        global flag
        f = verify(server_cert, crl_certs, flag)
        if not f:
            flag = 1
        elif f == 1:
            raise Exception(1)

    def _wrap_options(self, cert_reqs):
        """Build the keyword arguments passed to PyOpenSSLSocket."""
        options = {}
        if self.FORCE_SSL_VERSION:
            options['ssl_version'] = self.FORCE_SSL_VERSION
        options['cert_reqs'] = cert_reqs
        options['keyobj'] = self.keyobj
        options['certobj'] = self.certobj
        options['keyfile'] = self.key_file
        options['certfile'] = self.cert_file
        return options

    def connect_trusted_root(self, sock, root_cert, crl_certs):
        """Wrap sock requiring a certificate checked against self.ca_certs.

        root_cert is accepted for the caller's convenience but not read
        here. Returns 0 on success and 1 when wrapping the socket fails.
        """
        self.ca_path = self.cert_path + "ca/"
        server_cert = ssl.get_server_certificate(addr=(self.host, self.port))
        if self.cert_file:
            self._verify_with_crl(server_cert, crl_certs)
        else:
            # NOTE(review): short pause when no client certificate is
            # configured; the reason is not documented - confirm it is
            # still needed.
            import time
            time.sleep(0.1)
        try:
            self.sock = PyOpenSSLSocket(
                sock, ca_certs=self.ca_certs,
                **self._wrap_options(ssl.CERT_REQUIRED))
            return 0
        except Exception:
            return 1

    def connect_trusted_server(self, sock, crl_certs):
        """Wrap sock if the server certificate is in the trusted store.

        Returns 0 on success, 1 when wrapping fails, otherwise the result of
        add_server_cert() for a certificate that is not stored yet.
        """
        self.trusted_path = self.cert_path + "trusted/"
        ca_cert_list = self.trusted_path + "cert.list"
        server_cert = ssl.get_server_certificate(addr=(self.host, self.port))
        if self.cert_file:
            self._verify_with_crl(server_cert, crl_certs)
        # if not hasattr(HTTPSClientCertTransport, 'filename') or \
        #         HTTPSClientCertTransport.filename == None:
        HTTPSClientCertTransport.filename = self.cert_list(
            self.host, ca_cert_list, server_cert)
        if not HTTPSClientCertTransport.filename:
            return self.add_server_cert(server_cert)
        try:
            # The certificate text was already compared with the stored
            # copy, so no CA validation is requested here.
            self.sock = PyOpenSSLSocket(
                sock, ca_certs=None, **self._wrap_options(ssl.CERT_NONE))
            return 0
        except Exception:
            HTTPSClientCertTransport.filename = None
            return 1

    def _open_raw_socket(self):
        """Open a TCP connection to the host and set up the tunnel if any."""
        sock = socket.create_connection((self.host, self.port),
                                        self.timeout, self.source_address)
        if getattr(self, '_tunnel_host', None):
            self.sock = sock
            self._tunnel()
        return sock

    def connect(self):
        """Connect, trying user roots, global roots, then trusted servers.

        Raises Exception(1) on an unrecoverable failure and exits the
        process when the user refuses to trust the server.
        """
        # NOTE(review): a socket left over from a failed attempt is not
        # closed before a new one is opened.
        sock = self._open_raw_socket()

        self.Vars = DataVarsConsole()
        self.Vars.importConsole()
        self.Vars.flIniFile()

        user_root_cert = self.Vars.Get('core.cl_user_root_cert')
        homePath = self.Vars.Get('ur_home_path')
        user_root_cert = user_root_cert.replace("~", homePath)
        result_user_root = 1

        while True:
            if os.path.exists(user_root_cert):
                result_user_root = self.connect_trusted_root(
                    sock, user_root_cert, self.CRL_PATH)
            if result_user_root == 1:
                glob_root_cert = self.Vars.Get('core.cl_glob_root_cert')
                result_root_con = 1
                if os.path.exists(glob_root_cert):
                    sock = self._open_raw_socket()
                    result_root_con = self.connect_trusted_root(
                        sock, glob_root_cert, self.CRL_PATH)
                if result_root_con == 1:
                    sock = self._open_raw_socket()
                    result_server_con = self.connect_trusted_server(
                        sock, self.CRL_PATH)
                    if result_server_con in [1, 2]:
                        raise Exception(1)
                    elif result_server_con == 3:
                        # trust store changed: try again from the top
                        continue
                    elif result_server_con == 4:
                        print(_('This server is not trusted'))
                        self._stop_wait_thread()
                        sys.exit(1)
                elif result_root_con == 2:
                    raise Exception(1)
            elif result_user_root == 2:
                raise Exception(1)
            break


class CheckingClientHTTPSHandler(u2.HTTPSHandler):
    """HTTPS handler that opens CheckingClientHTTPSConnection connections."""

    def __init__(self, cert_path, ca_certs=None, cert_verifier=None,
                 client_certfile=None, client_keyfile=None,
                 client_keyobj=None, client_certobj=None,
                 *args, wait_thread=None, **kw):
        """cert_verifier is a function returning either True or False
        based on whether the certificate was found to be OK

        wait_thread (keyword-only) is handed to every connection created.
        """
        u2.HTTPSHandler.__init__(self, *args, **kw)
        self.ca_certs = ca_certs
        self.cert_verifier = cert_verifier
        self.client_keyfile = client_keyfile  # filename
        self.client_certfile = client_certfile  # filename
        self.keyobj = client_keyobj
        self.certobj = client_certobj
        # FOR DEBUG
        # self.set_http_debuglevel(100)
        self.cert_path = cert_path
        self.wait_thread = wait_thread

    def https_open(self, req):
        """Open req through a CheckingClientHTTPSConnection."""
        def make_connection(*args, **kw):
            new_kw = dict(ca_certs=self.ca_certs,
                          cert_verifier=self.cert_verifier,
                          cert_file=self.client_certfile,
                          key_file=self.client_keyfile,
                          keyobj=self.keyobj,
                          certobj=self.certobj,
                          wait_thread=self.wait_thread)
            # arguments supplied by do_open() take precedence
            new_kw.update(kw)
            return CheckingClientHTTPSConnection(self.cert_path,
                                                 *args, **new_kw)

        return self.do_open(make_connection, req)

    https_request = u2.AbstractHTTPHandler.do_request_


class HTTPSClientCertTransport(HttpTransport):
    """suds transport that can authenticate with a client certificate."""

    def __init__(self, key, cert, path_to_cert, password=None,
                 ca_certs=None, cert_verifier=None,
                 client_keyfile=None, client_certfile=None,
                 client_keyobj=None, client_certobj=None,
                 cookie_callback=None, user_agent_string=None,
                 wait_thread=None, **kwargs):
        """Build the URL opener used for all requests of this transport.

        key and cert are PEM file names; when key is given both files are
        loaded into PyOpenSSL objects (password is used for the key when
        set). Remaining keyword arguments are applied to the suds options.
        Raises OpenSSL.crypto.Error when the key cannot be read with an
        empty passphrase and no password was given.
        """
        Transport.__init__(self)
        self.key = key
        self.cert = cert
        self.cert_path = path_to_cert
        self.wait_thread = wait_thread
        if key:
            with open(cert) as cert_file:
                client_certobj = OpenSSL.crypto.load_certificate(
                    OpenSSL.SSL.FILETYPE_PEM, cert_file.read())
            if password:
                with open(key) as key_file:
                    client_keyobj = OpenSSL.crypto.load_privatekey(
                        OpenSSL.SSL.FILETYPE_PEM, key_file.read(), password)
            else:
                import M2Crypto
                # Probe the key with an empty passphrase first; a falsy
                # result is reported as an OpenSSL error.
                bio = M2Crypto.BIO.openfile(key)
                rsa = M2Crypto.m2.rsa_read_key(bio._ptr(),
                                               lambda *unused: b"")
                if not rsa:
                    raise OpenSSL.crypto.Error
                with open(key) as key_file:
                    client_keyobj = OpenSSL.crypto.load_privatekey(
                        OpenSSL.SSL.FILETYPE_PEM, key_file.read())

        Unskin(self.options).update(kwargs)
        self.cookiejar = CookieJar(DefaultCookiePolicy())
        self.cookie_callback = cookie_callback
        self.user_agent_string = user_agent_string
        log.debug("Proxy: %s", self.options.proxy)

        if ca_certs or (client_keyfile and client_certfile) \
                or (client_keyobj and client_certobj):
            https_handler = CheckingClientHTTPSHandler(
                cert_path=path_to_cert,
                ca_certs=ca_certs,
                cert_verifier=cert_verifier,
                client_keyfile=client_keyfile,
                client_certfile=client_certfile,
                client_keyobj=client_keyobj,
                client_certobj=client_certobj,
                wait_thread=wait_thread)
        else:
            https_handler = u2.HTTPSHandler()
        self.urlopener = u2.build_opener(
            SUDSHTTPRedirectHandler(),
            u2.HTTPCookieProcessor(self.cookiejar),
            https_handler)

        # relic from old times:
        # from dslib.network import ProxyManager
        # proxy_handler = ProxyManager.HTTPS_PROXY.create_proxy_handler()
        # proxy_auth_handler = ProxyManager.HTTPS_PROXY.create_proxy_auth_handler()

        # apparently, dslib simply returned None on create_proxy_auth_handler
        # if this is ever needed, probably use urllib2.ProxyBasicAuthHandler
        # proxy_auth_handler = None
        # and create_proxy_handler SHOULD HAVE eval'd to this:
        # proxy_handler = urllib2.ProxyHandler({"https" : "https://hostname"})
        # but because no hostname was given, it also just returned None
        # proxy_handler = None

        # these two literally do nothing right now
        # if proxy_handler:
        #     self.urlopener.add_handler(proxy_handler)
        # if proxy_auth_handler:
        #     self.urlopener.add_handler(proxy_auth_handler)
        self.urlopener.addheaders = [('User-agent',
                                      str(self.user_agent_string))]