You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
calculate-utils-3-core/pym/core/client/client_class.py

485 lines
18 KiB

# -*- coding: utf-8 -*-
# Copyright 2012-2016 Mir Calculate. http://www.calculate-linux.org
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import urllib2 as u2
import os
import sys
# from sudsds.transport.http import HttpTransport
from suds.transport.http import HttpTransport
import httplib #http.client in python3
from httplib import HTTPConnection
import socket
import ssl
import hashlib
from calculate.core.datavars import DataVarsCore
from calculate.lib.datavars import DataVars
from calculate.lib.utils.files import readFile
# from sudsds.client import Client
from suds.client import Client
from cert_verify import verify, get_CRL
# Protocol constant passed to ssl.wrap_socket(); None when this ssl module
# has no TLSv1.2 support (the connection code checks for that).
ssl_version = getattr(ssl, "PROTOCOL_TLSv1_2", None)


def _(x):
    # Pass-through placeholder, defined before setLocalTranslate() runs.
    return x


from calculate.lib.cl_lang import setLocalTranslate

# NOTE(review): presumably replaces `_` in this module with the real
# translator for the 'cl_core3' catalogue -- confirm in cl_lang.
setLocalTranslate('cl_core3', sys.modules[__name__])

# Module-wide state handed to cert_verify.verify(); the connection code sets
# it to 1 after verify() returns a false value.
flag = 0
class Client_suds(Client):
    """suds client that also records where the client's key files live."""

    def set_parameters(self, path_to_cert, CERT_FILE, PKEY_FILE):
        """Store the certificate-related paths on the client.

        Every derived path is built by plain string concatenation with
        ``path_to_cert``. The CRL directory is created when it is missing.
        """
        crl_dir = path_to_cert + 'ca/crl/'

        self.path_to_cert = path_to_cert
        # Any false value (None, '') is stored as an empty string.
        self.CERT_FILE = CERT_FILE or ''
        self.REQ_FILE = path_to_cert + 'client.csr'
        self.PKEY_FILE = PKEY_FILE
        self.SID_FILE = path_to_cert + 'sid.int'
        self.CRL_PATH = crl_dir

        if not os.path.exists(crl_dir):
            os.makedirs(crl_dir)
class clientHTTPSConnection(httplib.HTTPSConnection):
def __init__(self, host, port=None, key_file=None, cert_file=None,
strict=None, timeout=socket._GLOBAL_DEFAULT_TIMEOUT,
source_address=None, cert_path=None):
HTTPConnection.__init__(self, host, port, strict, timeout,
source_address)
self.key_file = key_file
self.cert_file = cert_file
self.cert_path = cert_path
self.CRL_PATH = cert_path + 'ca/crl/'
# get filename store cert server
def cert_list(self, host, ca_certs, server_cert):
if host == '127.0.0.1':
host = 'localhost'
if not os.path.exists(self.trusted_path):
try:
os.makedirs(self.trusted_path)
except OSError:
pass
if not os.path.exists(ca_certs):
fc = open(ca_certs, "w")
fc.close()
filename = None
try:
with open(ca_certs) as fd:
t = fd.read()
# for each line
for line in t.splitlines():
# Split string into a words list
words = line.split()
if len(words) > 1:
# if first word...
if words[0] == host:
filename = words[1]
if not filename:
return None
except (IndexError, IOError):
print _("Certificate not found on the client's side")
return None
try:
with open(self.trusted_path + filename, 'r') as fd:
store_cert = fd.read()
if store_cert == server_cert:
return filename
except IOError:
print _("Failed to open the file"), self.trusted_path, filename
return None
def add_all_ca_cert(self, list_ca_certs):
# so root cert be first, ca after
clVarsCore = DataVarsCore()
clVarsCore.importCore()
clVarsCore.flIniFile()
list_ca_certs.reverse()
import OpenSSL
for cert in list_ca_certs:
system_ca_db = clVarsCore.Get('cl_glob_root_cert')
if os.path.exists(system_ca_db):
if cert in readFile(system_ca_db):
continue
clVars = DataVars()
clVars.flIniFile()
homePath = clVars.Get('ur_home_path')
user_root_cert = clVarsCore.Get('cl_user_root_cert')
user_root_cert = user_root_cert.replace("~", homePath)
if os.path.exists(user_root_cert):
if cert in readFile(user_root_cert):
continue
cl_client_cert_dir = clVarsCore.Get('cl_client_cert_dir')
cl_client_cert_dir = cl_client_cert_dir.replace("~", homePath)
root_cert_md5 = cl_client_cert_dir + "/ca/cert_list"
md5 = hashlib.md5()
md5.update(cert)
md5sum = md5.hexdigest()
print "\n================================================="
print "md5sum = ", md5sum
if not os.path.exists(root_cert_md5):
fc = open(root_cert_md5, "w")
fc.close()
filename = None
with open(root_cert_md5) as fd:
t = fd.read()
# for each line
for line in t.splitlines():
# Split string into a words list
words = line.split(' ', 1)
if words[0] == md5sum:
filename = words[1]
if not filename:
certobj = OpenSSL.crypto.load_certificate(
OpenSSL.SSL.FILETYPE_PEM, cert)
Issuer = certobj.get_issuer().get_components()
for item in Issuer:
if item[0] == 'CN':
filename = item[1]
fc = open(root_cert_md5, "a")
fc.write('%s %s\n' % (md5sum, filename))
fc.close()
if not filename:
print _('Field "CN" not found in the certificate!')
return 1
fd = open(cl_client_cert_dir + '/ca/' + filename, 'w')
fd.write(cert)
fd.close()
fa = open(user_root_cert, 'a')
fa.write(cert)
fa.close()
print _("filename = "), filename
print _("Certificate added")
else:
print _("file containing the CA certificate now exists")
get_CRL(cl_client_cert_dir + '/')
def add_ca_cert(self, cert, list_ca_certs):
url = 'https://%s:%s/?wsdl' % (self.host, self.port)
client = Client_suds(url, transport=HTTPSClientCertTransport(
None, None, self.cert_path))
cert = client.service.get_ca()
if cert == '1':
print _("Invalid server certificate!")
sys.exit()
if cert == '2':
print _("CA certificate not found on the server")
sys.exit()
import OpenSSL
try:
certobj = OpenSSL.crypto.load_certificate(
OpenSSL.SSL.FILETYPE_PEM, cert)
except (OpenSSL.SSL.Error, IOError):
print _("Error. Certificate not added to the trusted list.")
sys.exit()
print '\n' + _("Fingerprint = %s") % certobj.digest('SHA1')
print _("Serial number = "), certobj.get_serial_number()
Issuer = certobj.get_issuer().get_components()
print '\n' + _("Issuer")
for i in Issuer:
print "%s : %s" % (i[0], i[1])
Subject = certobj.get_subject().get_components()
print '\n' + _("Subject")
for subj in Subject:
print "%s : %s" % (subj[0], subj[1])
ans = raw_input(_("Add the CA certificate to trusted? y/[n]:"))
if ans.lower() in ['y', 'yes']:
list_ca_certs.append(cert)
self.add_all_ca_cert(list_ca_certs)
else:
print _("Certificate not added to trusted")
sys.exit()
# add certificate server in trusted
def add_server_cert(self, cert):
import OpenSSL
print _("Untrusted server certificate!")
certobj = OpenSSL.crypto.load_certificate(OpenSSL.SSL.FILETYPE_PEM,
cert)
print '\n' + _("Fingerprint = %s") % certobj.digest('SHA1')
print _("Serial number = "), certobj.get_serial_number()
Issuer = certobj.get_issuer().get_components()
print '\n' + _("Issuer")
for i in Issuer:
print "%s : %s" % (i[0], i[1])
Subject = certobj.get_subject().get_components()
print '\n' + _("Subject")
for item in Subject:
print "%s : %s" % (item[0], item[1])
print '\n' + _('Add this server certificate to trusted (s) or')
print _('Try to add the CA and root certificates to trusted (c) or')
choice = raw_input(_("Quit (q)? s/c/[q]: "))
if choice.lower() in ['s', 'c']:
# self.sock = ssl.wrap_socket(sock)
ca_certs = self.trusted_path + "cert.list"
if not os.path.exists(ca_certs):
fc = open(ca_certs, "w")
fc.close()
if self.host == '127.0.0.1':
host = 'localhost'
else:
host = self.host
filename = host
fc = open(self.trusted_path + filename, "w")
fc.write(cert)
fc.close()
with open(ca_certs) as fd:
t = fd.read()
# for each line
for line in t.splitlines():
# Split string into a words list
words = line.split()
if len(words) > 1:
# if first word...
if words[0] == host:
return 0
# Open file with compliance server certificates and server hostname
fcl = open(ca_certs, "a")
fcl.write(host + ' ' + filename + '\n')
fcl.close()
if choice.lower() == 'c':
clVars = DataVarsCore()
clVars.importCore()
clVars.flIniFile()
cl_client_cert_dir = clVars.Get('cl_client_cert_dir')
homePath = clVars.Get('ur_home_path')
cl_client_cert_dir = cl_client_cert_dir.replace("~", homePath)
root_cert_dir = cl_client_cert_dir + "/ca"
if not os.path.exists(root_cert_dir):
try:
os.makedirs(root_cert_dir)
except OSError:
print _("Error creating directory %s") % root_cert_dir
sys.exit()
print '\n' + _("Add the CA and root certificates")
self.list_ca_certs = []
self.add_ca_cert(cert, self.list_ca_certs)
sys.exit()
else:
sys.exit()
def connect_trusted_root(self, sock, root_cert, crl_certs):
self.ca_path = self.cert_path + "ca/"
server_cert = ssl.get_server_certificate(addr=(self.host, self.port))
global flag
if self.cert_file:
f = verify(server_cert, crl_certs, flag)
if not f:
flag = 1
elif f == 1:
sys.exit()
else:
import time
time.sleep(1)
try:
if ssl_version is None:
print _("SSL library is not support TLSv1_2")
self.sock.close()
return 2
self.sock = ssl.wrap_socket(sock,
certfile=self.cert_file,
keyfile=self.key_file,
ca_certs=root_cert,
ssl_version=ssl_version,
cert_reqs=ssl.CERT_REQUIRED)
dercert_after_connect = self.sock.getpeercert(True)
cert_after_connect = \
ssl.DER_cert_to_PEM_cert(dercert_after_connect)
if not server_cert == cert_after_connect:
print ('\n' +
_("WARNING! %s trying to replace the certificate!")
% self.host + '\n')
self.sock.close()
return 2
return 0
except Exception:
return 1
def connect_trusted_server(self, sock, crl_certs):
self.trusted_path = self.cert_path + "trusted/"
ca_cert_list = self.trusted_path + "cert.list"
server_cert = ssl.get_server_certificate(addr=(self.host, self.port))
global flag
if self.cert_file:
f = verify(server_cert, crl_certs, flag)
if not f:
flag = 1
elif f == 1:
sys.exit()
HTTPSClientCertTransport.filename = self.cert_list(
self.host, ca_cert_list, server_cert)
if HTTPSClientCertTransport.filename:
try:
if ssl_version is None:
print _("SSL library is not support TLSv1_2")
HTTPSClientCertTransport.filename = None
self.sock.close()
return 1
self.sock = ssl.wrap_socket(sock,
certfile=self.cert_file,
keyfile=self.key_file,
ssl_version=ssl_version,
cert_reqs=ssl.CERT_NONE)
dercert_after_connect = self.sock.getpeercert(True)
ssl.DER_cert_to_PEM_cert(dercert_after_connect)
return 0
except Exception as e:
print e
HTTPSClientCertTransport.filename = None
return 1
else:
self.add_server_cert(server_cert)
def connect(self):
"""Connect to a host on a given (SSL) port."""
if self.host == '127.0.0.1':
self.host = 'localhost'
sock = socket.create_connection((self.host, self.port),
self.timeout, self.source_address)
if self._tunnel_host:
self.sock = sock
self._tunnel()
clVars = DataVarsCore()
clVars.importCore()
clVars.flIniFile()
user_root_cert = clVars.Get('cl_user_root_cert')
homePath = clVars.Get('ur_home_path')
user_root_cert = user_root_cert.replace("~", homePath)
result_user_root = 1
if os.path.exists(user_root_cert):
result_user_root = self.connect_trusted_root(sock, user_root_cert,
self.CRL_PATH)
# print 'rur = ',result_user_root
if result_user_root == 1:
glob_root_cert = clVars.Get('cl_glob_root_cert')
result_root_con = 1
if os.path.exists(glob_root_cert):
sock = socket.create_connection((self.host, self.port),
self.timeout,
self.source_address)
if self._tunnel_host:
self.sock = sock
self._tunnel()
result_root_con = self.connect_trusted_root(sock,
glob_root_cert,
self.CRL_PATH)
# print 'rrc = ',result_root_con
if result_root_con == 1:
sock = socket.create_connection((self.host, self.port),
self.timeout,
self.source_address)
if self._tunnel_host:
self.sock = sock
self._tunnel()
result_server_con = self.connect_trusted_server(
sock, self.CRL_PATH)
# print 'rsc = ',result_server_con
if result_server_con in [1, 2]:
raise Exception
# sys.exit(1)
elif result_root_con == 2:
# sys.exit(1)
raise Exception
elif result_user_root == 2:
# sys.exit(1)
raise Exception
class HTTPSClientAuthHandler(u2.HTTPSHandler):
    """urllib2 HTTPS handler that opens clientHTTPSConnection objects."""

    def __init__(self, key, cert, cert_path):
        """Keep the client key, certificate and certificate directory."""
        u2.HTTPSHandler.__init__(self)
        self.key = key
        self.cert = cert
        self.cert_path = cert_path

    def https_open(self, req):
        """Open ``req`` through a connection built by ``getConnection``."""
        # Rather than pass in a reference to a connection class, we pass in
        # a reference to a function which, for all intents and purposes,
        # will behave as a constructor
        return self.do_open(self.getConnection, req)

    def getConnection(self, host, timeout=300):
        """Build the connection object for ``host``.

        ``timeout`` is forwarded to the connection so that the timeout of
        the request (given by the caller of ``do_open``) is actually applied
        to the socket instead of being dropped.
        """
        return clientHTTPSConnection(host, key_file=self.key,
                                     cert_file=self.cert,
                                     timeout=timeout,
                                     cert_path=self.cert_path)
class HTTPSClientCertTransport(HttpTransport):
    """suds HTTP transport that opens URLs via HTTPSClientAuthHandler."""

    def __init__(self, key, cert, path_to_cert, *args, **kwargs):
        """Keep the client key/certificate and the certificate directory.

        Remaining arguments are passed to ``HttpTransport.__init__``.
        """
        HttpTransport.__init__(self, *args, **kwargs)
        self.cert_path = path_to_cert
        self.cert = cert
        self.key = key

    def u2open(self, u2request):
        """
        Open a connection.
        @param u2request: A urllib2 request.
        @type u2request: urllib2.Request.
        @return: The opened file-like urllib2 object.
        @rtype: fp
        """
        timeout = self.options.timeout
        handler = HTTPSClientAuthHandler(self.key, self.cert, self.cert_path)
        opener = u2.build_opener(handler)
        # Only when u2ver() exists and reports a version below 2.6 is the
        # timeout set process-wide instead of being passed to open().
        if hasattr(self, "u2ver") and self.u2ver() < 2.6:
            socket.setdefaulttimeout(timeout)
            return opener.open(u2request)
        return opener.open(u2request, timeout=timeout)