You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
calculate-utils-3-console/console/application/client_class.py

543 lines
21 KiB

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

#-*- coding: utf-8 -*-
# Copyright 2012 Calculate Ltd. http://www.calculate-linux.org
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import urllib2 as u2
import os, sys
from suds.transport.http import HttpTransport
import httplib
from httplib import HTTPConnection
import socket, ssl
import OpenSSL, hashlib
from calculate.core.datavars import DataVarsCore
from calculate.lib.datavars import DataVars
from suds.client import Client
from suds.cache import ObjectCache
from cert_verify import verify, get_CRL
from suds.options import Options
#import cert_func.verify
flag = 0
class Client_suds(Client):
def __init__(self, url, **kwargs):
Client.__init__(self, url, **kwargs)
options = Options()
options.cache = ObjectCache(days=0)
def set_parameters (self, path_to_cert, CERT_FILE, PKEY_FILE):
self.path_to_cert = path_to_cert
print 'CERT_FILE =', CERT_FILE
if not CERT_FILE:
CERT_FILE = ''
self.CERT_FILE = CERT_FILE
self.REQ_FILE = path_to_cert + 'client.csr'
self.PKEY_FILE = PKEY_FILE
self.SID_FILE = path_to_cert + 'sid.int'
self.CRL_PATH = path_to_cert + 'ca/crl/'
if not os.path.exists(self.CRL_PATH):
os.makedirs(self.CRL_PATH)
class clientHTTPSConnection (httplib.HTTPSConnection):
def __init__(self, host, port=None, key_file=None, cert_file=None,
strict=None, timeout=socket._GLOBAL_DEFAULT_TIMEOUT,
source_address=None, cert_path=None):
HTTPConnection.__init__(self, host, port, strict, timeout,
source_address)
self.key_file = key_file
self.cert_file = cert_file
self.cert_path = cert_path
self.CRL_PATH = cert_path + 'ca/crl/'
# get filename store cert server
def cert_list (self, host, ca_certs, server_cert):
if host == '127.0.0.1':
host = 'localhost'
if not os.path.exists(self.trusted_path):
try:
os.makedirs(self.trusted_path)
except OSError:
pass
if not os.path.exists(ca_certs):
fc = open(ca_certs,"w")
fc.close()
filename = None
try:
with open(ca_certs) as fd:
t = fd.read()
# for each line
for line in t.splitlines():
# Split string into a words list
words = line.split()
if len(words) > 1:
# if first word...
if words[0] == host:
filename = words[1]
if not filename:
return None
except:
print _("Certificate not found in client")
return None
try:
fd = open(self.trusted_path + filename, 'r')
store_cert = fd.read()
fd.close()
if store_cert == server_cert:
return filename
except:
print _("Error open file"), self.trusted_path, filename
return None
def add_all_ca_cert(self, list_ca_certs):
# so root cert be first, ca after
clVarsCore = DataVarsCore()
clVarsCore.importCore()
clVarsCore.flIniFile()
list_ca_certs.reverse()
for cert in list_ca_certs:
system_ca_db = clVarsCore.Get('cl_glob_root_cert')
if os.path.exists(system_ca_db):
if cert in open(system_ca_db, 'r').read():
continue
clVars = DataVars()
clVars.flIniFile()
homePath = clVars.Get('ur_home_path')
user_root_cert = clVarsCore.Get('cl_user_root_cert')
user_root_cert = user_root_cert.replace("~",homePath)
if os.path.exists(user_root_cert):
if cert in open(user_root_cert, 'r').read():
continue
cl_client_cert_dir = clVarsCore.Get('cl_client_cert_dir')
cl_client_cert_dir = cl_client_cert_dir.replace("~",homePath)
root_cert_md5 = cl_client_cert_dir + "/ca/cert_list"
md5 = hashlib.md5()
md5.update(cert)
md5sum = md5.hexdigest()
print "\n================================================="
print "md5sum = ", md5sum
if not os.path.exists(root_cert_md5):
fc = open(root_cert_md5,"w")
fc.close()
filename = None
with open(root_cert_md5) as fd:
t = fd.read()
# for each line
for line in t.splitlines():
# Split string into a words list
words = line.split(' ',1)
if words[0] == md5sum:
filename = words[1]
if not filename:
certobj = OpenSSL.crypto.load_certificate \
(OpenSSL.SSL.FILETYPE_PEM, cert)
Issuer = certobj.get_issuer().get_components()
for item in Issuer:
if item[0] == 'CN':
filename = item[1]
fc = open(root_cert_md5,"a")
fc.write('%s %s\n' %(md5sum, filename))
fc.close()
if not filename:
print _('Not found field "CN" in certificate!')
return 1
fd = open(cl_client_cert_dir + '/ca/' + filename, 'w')
fd.write(cert)
fd.close()
fa = open(user_root_cert, 'a')
fa.write(cert)
fa.close()
print _("filename = "), filename
print _("CERTIFICATE ADD")
else:
print _("file with ca certificates exists")
get_CRL(cl_client_cert_dir + '/')
def add_ca_cert(self, cert, list_ca_certs):
url = 'https://%s:%s/?wsdl' %(self.host, self.port)
client = Client_suds(url, \
transport = HTTPSClientCertTransport(None,None, self.cert_path))
cert = client.service.get_ca()
if cert == '1':
print _('Server certificate is not valid')
sys.exit()
if cert == '2':
print _('CA not found on server')
sys.exit()
try:
certobj = OpenSSL.crypto.load_certificate \
(OpenSSL.SSL.FILETYPE_PEM, cert)
except:
print _("Error. Certificate not added to trusted")
sys.exit()
print '\n', _("Fingerprint = %s") % certobj.digest('SHA1')
print _("Serial Number = "), certobj.get_serial_number()
Issuer = certobj.get_issuer().get_components()
print '\n', _("Issuer")
for i in Issuer:
print "%s : %s" %(i[0], i[1])
Subject = certobj.get_subject().get_components()
print '\n', _("Subject")
for subj in Subject:
print "%s : %s" %(subj[0], subj[1])
ans = raw_input (_("Add CA certificates to trusted? y/[n]:"))
if ans.lower() in ['y','yes']:
list_ca_certs.append(cert)
self.add_all_ca_cert(list_ca_certs)
else:
print _("Certificate not added to trusted")
sys.exit()
#def add_ca_cert(self, cert, list_ca_certs, prev_host = None):
#certobj = OpenSSL.crypto.load_certificate \
#(OpenSSL.SSL.FILETYPE_PEM, cert)
#Issuer = certobj.get_issuer().get_components()
#for item in Issuer:
#if item[0] == 'L':
#print '\nNetwork adress : ', item[1]
#current_host = item[1]
##print prev_host, item[1]
#try:
#host, port = item[1].split(':')
#port = int(port)
#except:
#print _("Network adress must be host:port. port must be int")
#return 1
#try:
#ca_cert = ssl.get_server_certificate(addr = (host, port))
#except:
#print _("Connection aborted!")
#sys.exit()
#certobj = OpenSSL.crypto.load_certificate \
#(OpenSSL.SSL.FILETYPE_PEM, ca_cert)
#print _("\nFingerprint = %s") % certobj.digest('SHA1')
#print _("Serial Number = "), certobj.get_serial_number()
#Issuer = certobj.get_issuer().get_components()
#print _("\nIssuer")
#for i in Issuer:
#print "%s : %s" %(i[0], i[1])
#Subject = certobj.get_subject().get_components()
#print _("\nSubject")
#for subj in Subject:
#print "%s : %s" %(subj[0], subj[1])
#if prev_host == item[1]:
#'''
#получить корневой сертификат и проверить его на рутовость
#'''
#print _("\nThis is root certificate!")
#print _("Add all CA certificates to trusted?\n"
#"ATTENTION! It allows access to all those who "
#"signed these certificates.\n"
#"This may affect your safety!")
#ans = raw_input (_("Add all CA certificates to trusted? "
#"y/[n]:"))
#if ans.lower() in ['y','yes']:
#self.add_all_ca_cert(list_ca_certs)
#else:
#print _("Certificate not added to trusted")
#sys.exit()
#ans = raw_input (_("\nNext? y/[n]: "))
#if not ans.lower() in ['y','yes']:
#print _("Exit")
#sys.exit()
#list_ca_certs.append(ca_cert)
#self.add_ca_cert(cert, self.list_ca_certs, current_host)
#print _("In this certificate not found field 'L' (Network adress)")
# add certificate server in trusted
def add_server_cert(self, cert):
print _("Untrusted Server Certificate!")
certobj = OpenSSL.crypto.load_certificate \
(OpenSSL.SSL.FILETYPE_PEM, cert)
print _("\nFingerprint = %s") % certobj.digest('SHA1')
print _("Serial Number = "), certobj.get_serial_number()
Issuer = certobj.get_issuer().get_components()
print _("\nIssuer")
for i in Issuer:
print "%s : %s" %(i[0], i[1])
Subject = certobj.get_subject().get_components()
print _("\nSubject")
for item in Subject:
print "%s : %s" %(item[0], item[1])
print _('\nAdd this Servers certificate to trusted (s) or')
print _('Try add CA and ROOT certificates to trusted (c) or')
choice = raw_input (_("Quit (q)? s/c/[q]: "))
if choice.lower() in ['s', 'c']:
#self.sock = ssl.wrap_socket(sock)
ca_certs = self.trusted_path + "cert.list"
if not os.path.exists(ca_certs):
fc = open(ca_certs,"w")
fc.close()
if self.host == '127.0.0.1':
host = 'localhost'
else: host = self.host
filename = host
fc = open(self.trusted_path + filename,"w")
fc.write(cert)
fc.close()
with open(ca_certs) as fd:
t = fd.read()
# for each line
for line in t.splitlines():
# Split string into a words list
words = line.split()
if len(words) > 1:
# if first word...
if words[0] == host:
return 0
# Open file with compliance server certificates and server hostname
fcl = open(ca_certs,"a")
fcl.write(host + ' ' + filename + '\n')
fcl.close()
if choice.lower() == 'c':
clVars = DataVarsCore()
clVars.importCore()
clVars.flIniFile()
cl_client_cert_dir = clVars.Get('cl_client_cert_dir')
homePath = clVars.Get('ur_home_path')
cl_client_cert_dir = cl_client_cert_dir.replace("~",homePath)
root_cert_dir = cl_client_cert_dir + "/ca"
if not os.path.exists(root_cert_dir):
try:
os.makedirs(root_cert_dir)
except OSError:
print _("error creating directory %s") %root_cert_dir
sys.exit()
print _("\nTry add CA and ROOT certificates")
self.list_ca_certs = []
self.add_ca_cert(cert, self.list_ca_certs)
sys.exit()
else:
sys.exit()
def connect_trusted_root(self, sock, root_cert, crl_certs):
self.ca_path = self.cert_path + "ca/"
server_cert = ssl.get_server_certificate(addr = (self.host, self.port))
global flag
if self.cert_file:
f = verify(server_cert, crl_certs, flag)
if not f:
flag = 1
elif f == 1:
sys.exit()
else:
import time
time.sleep(1)
try:
self.sock = ssl.wrap_socket(sock,\
certfile = self.cert_file, \
keyfile = self.key_file, \
ca_certs = root_cert, \
ssl_version = ssl.PROTOCOL_SSLv23, \
cert_reqs = ssl.CERT_REQUIRED)
dercert_after_connect = self.sock.getpeercert(True)
cert_after_connect = \
ssl.DER_cert_to_PEM_cert(dercert_after_connect)
if not server_cert == cert_after_connect:
print _("\nWARNING! %s trying to replace certificate!\n")\
%self.host
self.sock.close()
return 2
return 0
except:
return 1
def connect_trusted_server(self, sock, crl_certs):
self.trusted_path = self.cert_path + "trusted/"
ca_cert_list = self.trusted_path + "cert.list"
server_cert = ssl.get_server_certificate(addr = (self.host, self.port))
global flag
if self.cert_file:
f = verify(server_cert, crl_certs, flag)
if not f:
flag = 1
elif f == 1:
sys.exit()
#if not hasattr(HTTPSClientCertTransport, 'filename') or \
#HTTPSClientCertTransport.filename == None:
HTTPSClientCertTransport.filename = self.cert_list \
(self.host, ca_cert_list, server_cert)
if HTTPSClientCertTransport.filename:
try:
self.sock = ssl.wrap_socket(sock,\
certfile = self.cert_file, \
keyfile = self.key_file, \
#ca_certs = self.ca_path + self.filename, \
ssl_version = ssl.PROTOCOL_SSLv23, \
cert_reqs = ssl.CERT_NONE)
dercert_after_connect = self.sock.getpeercert(True)
cert_after_connect = \
ssl.DER_cert_to_PEM_cert(dercert_after_connect)
#filename2 = \
#self.cert_list (self.host, ca_cert_list, cert_after_connect)
#if not HTTPSClientCertTransport.filename == filename2:
#print _("\nWARNING!!! %s trying to replace certificate!\n")\
#%self.host
#self.sock.close()
#return 2
return 0
except Exception, e:
print e
#print _("Error. May be a server cert or secret key not valid ")
#print _("or your certificate %s is not trusted") %self.cert_file
HTTPSClientCertTransport.filename = None
return 1
else:
#self.sock = ssl.wrap_socket(sock, \
#certfile = None, \
#keyfile = None, \
##ca_certs = self.ca_path + self.filename, \
#ssl_version = ssl.PROTOCOL_SSLv23, \
#cert_reqs = ssl.CERT_NONE)
self.add_server_cert (server_cert)
def connect(self):
"""Connect to a host on a given (SSL) port."""
if self.host == '127.0.0.1':
self.host = 'localhost'
sock = socket.create_connection((self.host, self.port),
self.timeout, self.source_address)
if self._tunnel_host:
self.sock = sock
self._tunnel()
clVars = DataVarsCore()
clVars.importCore()
clVars.flIniFile()
user_root_cert = clVars.Get('cl_user_root_cert')
homePath = clVars.Get('ur_home_path')
user_root_cert = user_root_cert.replace("~",homePath)
result_user_root = 1
if os.path.exists(user_root_cert):
result_user_root = self.connect_trusted_root (sock, user_root_cert,\
self.CRL_PATH)
#print 'rur = ',result_user_root
if result_user_root == 1:
glob_root_cert = clVars.Get('cl_glob_root_cert')
result_root_con = 1
if os.path.exists(glob_root_cert):
sock = socket.create_connection((self.host, self.port),
self.timeout, self.source_address)
if self._tunnel_host:
self.sock = sock
self._tunnel()
result_root_con = self.connect_trusted_root(sock, glob_root_cert,\
self.CRL_PATH)
#print 'rrc = ',result_root_con
if result_root_con == 1:
sock = socket.create_connection((self.host, self.port),
self.timeout, self.source_address)
if self._tunnel_host:
self.sock = sock
self._tunnel()
result_server_con = self.connect_trusted_server \
(sock, self.CRL_PATH)
#print 'rsc = ',result_server_con
if result_server_con in [1,2]:
raise Exception
#sys.exit(1)
elif result_root_con == 2:
#sys.exit(1)
raise Exception
elif result_user_root == 2:
#sys.exit(1)
raise Exception
class HTTPSClientAuthHandler(u2.HTTPSHandler):
def __init__(self, key, cert, cert_path):
u2.HTTPSHandler.__init__(self)
self.key = key
self.cert = cert
self.cert_path = cert_path
def https_open(self, req):
#Rather than pass in a reference to a connection class, we pass in
# a reference to a function which, for all intents and purposes,
# will behave as a constructor
return self.do_open(self.getConnection, req)
def getConnection(self, host, timeout=300):
#return httplib.HTTPSConnection(host, key_file=self.key, cert_file=self.cert)
return clientHTTPSConnection(host, key_file=self.key,\
cert_file=self.cert, cert_path = self.cert_path)
class HTTPSClientCertTransport(HttpTransport):
def __init__(self, key, cert, path_to_cert, *args, **kwargs):
HttpTransport.__init__(self, *args, **kwargs)
self.key = key
self.cert = cert
self.cert_path = path_to_cert
def u2open(self, u2request):
"""
Open a connection.
@param u2request: A urllib2 request.
@type u2request: urllib2.Requet.
@return: The opened file-like urllib2 object.
@rtype: fp
"""
tm = self.options.timeout
url = u2.build_opener(HTTPSClientAuthHandler(self.key, self.cert,\
self.cert_path))
#from urllib2 import URLError
#try:
if self.u2ver() < 2.6:
socket.setdefaulttimeout(tm)
return url.open(u2request)
else:
return url.open(u2request, timeout=tm)
#except URLError, e:
#print _("Excertion URLError: "), e
#sys.exit(1)