You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
calculate-utils-3-console-gui/console/gui/CertificateClass.py

323 lines
12 KiB

#!/usr/bin/python
#-*- coding: utf-8 -*-
# Copyright 2012 Calculate Ltd. http://www.calculate-linux.org
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from PySide import QtGui, QtCore
import urllib2
import OpenSSL, hashlib
from calculate.lib.cl_datavars import ClDataVars
from calculate.api.client.function import get_ip_mac_type
from calculate.api.cl_api import DataVarsApi
from more import show_msg, LabelWordWrap, show_question,ClearLineEdit
from create_cert import RequestCreate
from client_class import HTTPSClientCertTransport
import os
class CertClass (QtGui.QWidget):
def __init__(self, parent, ClientObj):
self.ClientObj = ClientObj
QtGui.QWidget.__init__(self)
# get_cert_path
clVars = ClDataVars()
clVars.flIniFile()
homePath = clVars.Get('ur_home_path')
self.default_cert_path = '~/.calculate/client_cert/'
self.default_cert_path = self.default_cert_path.replace("~",homePath)
self.sendlayout = QtGui.QGridLayout()
self.getlayout = QtGui.QGridLayout()
self.mainlayout = QtGui.QVBoxLayout()
# gen ceth by host
self.sendlayout.addWidget(LabelWordWrap(_('Host')), 1,0)
self.send_host = ClearLineEdit(_('Enter Host'))
self.sendlayout.addWidget(self.send_host, 1, 1)
self.sendlayout.addWidget(LabelWordWrap(_('Port')), 2,0)
self.send_port = ClearLineEdit('8888')
self.send_port.setValidator(QtGui.QIntValidator(self))
self.sendlayout.addWidget(self.send_port, 2, 1)
Send_button = QtGui.QPushButton("Send")
Send_button.clicked.connect(self.send)
self.sendlayout.addWidget(Send_button, 3, 1)
self.GroupBoxSend = QtGui.QGroupBox(_('Send certificate signing request'))
self.GroupBoxSend.setLayout(self.sendlayout)
# GroupBox get certificate
self.getlayout.addWidget(LabelWordWrap(_('Host')), 1,0)
self.get_host = ClearLineEdit(_('Enter Host'))
self.getlayout.addWidget(self.get_host, 1, 1)
self.getlayout.addWidget(LabelWordWrap(_('Port')), 2,0)
self.get_port = ClearLineEdit('8888')
self.get_port.setValidator(QtGui.QIntValidator(self))
self.getlayout.addWidget(self.get_port, 2, 1)
Get_button = QtGui.QPushButton("Get")
Get_button.clicked.connect(self.get)
self.getlayout.addWidget(Get_button, 3, 1)
self.GroupBoxGet = QtGui.QGroupBox(_('Get certificate from server'))
self.GroupBoxGet.setLayout(self.getlayout)
# group all in widget
Quit_button = QtGui.QPushButton("Quit")
self.connect(Quit_button, QtCore.SIGNAL("clicked()"),
self, QtCore.SLOT("close()"))
self.move(100+parent.frameGeometry().x(), \
100+parent.frameGeometry().y())
self.resize(400,200)
self.mainlayout.addWidget(self.GroupBoxSend)
self.mainlayout.addWidget(self.GroupBoxGet)
self.mainlayout.addWidget(Quit_button)
self.setLayout(self.mainlayout)
self.setFocus()
# for clear memory after closed this window
self.setAttribute(QtCore.Qt.WA_DeleteOnClose)
self.setWindowTitle (_('Certificates'))
self.setWindowIcon (QtGui.QIcon.fromTheme("view-certificate"))
def send(self):
### client_post_request analog
# validation correctness of values
by_host = self.send_host.text()
if by_host in [self.send_host.parent, '']:
show_msg (_('Enter Hostname or IP adress'), _('Field "Host" Error!'))
return 1
port = self.send_port.text()
if port == '' or not port.isdigit():
show_msg (_('Enter Port'), _('Field "Port" Error!'))
return 1
# send request
cert_path = self.default_cert_path
if os.path.exists(cert_path + 'req_id'):
text = "You have sent a request to sign the certificate!"
informative_text = "request id = %s\n" %open(cert_path + 'req_id', 'r').read() \
+ "Send new request?"
reply = show_question(self, text, informative_text)
if reply == QtGui.QMessageBox.No:
return 0
elif reply == QtGui.QMessageBox.Yes:
pass
url = "https://%s:%d/?wsdl" %(by_host, int(port))
print 'URL = ', url
from suds.client import Client
try:
self.client = Client(url, \
transport = HTTPSClientCertTransport(None, None, \
cert_path, parent = self.ClientObj))
except (KeyboardInterrupt, urllib2.URLError), e:
show_msg(_("Error code: %s") %e, _("Close. Connecting Error."))
return 1
server_host_name = self.client.service.get_server_host_name()
print "=====>>>>>>>>", server_host_name
key = cert_path + server_host_name + '.key'
self.csr_file = cert_path + server_host_name +'.csr'
if os.path.exists(key) and os.path.exists(self.csr_file):
text = 'Private Key and Request exists!'
informative_text = "Create new Private Key and Request?"
reply = show_question(self, text, informative_text)
if reply == QtGui.QMessageBox.Yes:
self.req_obj = RequestCreate(self, self.ClientObj, key, cert_path, server_host_name)
self.req_obj.show()
elif reply == QtGui.QMessageBox.No:
pass
else:
self.req_obj = RequestCreate(self, self.ClientObj, key, cert_path, server_host_name)
self.req_obj.setAttribute(QtCore.Qt.WA_ShowModal)
self.req_obj.show()
def end_send(self):
ip, mac, client_type = get_ip_mac_type('eth0', 'gui')
data = open(self.csr_file).read()
res = self.client.service.post_client_request(request = data, ip = ip,\
mac = mac, client_type = client_type)
del (self.client)
if int(res) < 0:
show_msg (_("This server can not sign certificate!"))
return 1
fc = open(self.default_cert_path + 'req_id', 'w')
fc.write(res)
fc.close()
show_msg ( _("Your request id = %s") %res)
return 0
def get(self):
cert_path = self.default_cert_path
if not os.path.exists(cert_path + 'req_id'):
show_msg (("request was not sent or deleted file %s") \
%(cert_path + 'req_id'))
return 1
fc = open(cert_path + 'req_id', 'r')
req_id = fc.read()
fc.close()
from_host = self.get_host.text()
if from_host in [self.get_host.parent, '']:
show_msg (_('Enter Hostname or IP adress'), _('Field "Host" Error!'))
return 1
port = self.get_port.text()
if port == '' or not port.isdigit():
show_msg (_('Enter Port'), _('Field "Port" Error!'))
return 1
url = "https://%s:%s/?wsdl" %(from_host, port)
from suds.client import Client
try:
client = Client(url, \
transport = HTTPSClientCertTransport(None, None, \
cert_path, parent = self.ClientObj))
except (KeyboardInterrupt, urllib2.URLError), e:
show_msg(_("Error code: %s") %e, _("Close. Connecting Error."))
return 1
server_host_name = client.service.get_server_host_name()
if not os.path.exists(cert_path + server_host_name + '.csr'):
show_msg('Request %s not found on client side' %(cert_path + server_host_name + '.csr'))
return 1
request = open(cert_path + server_host_name + '.csr').read()
md5 = hashlib.md5()
md5.update(request)
md5sum = md5.hexdigest()
result = client.service.get_client_cert(req_id, md5sum)
cert = result[0][0]
try:
ca_root = result[0][1]
except Exception, e:
ca_root = None
show_msg(e.message)
if cert == '1':
show_msg ('Request to sign is rejected!')
return 1
elif cert == '2':
show_msg ("Request for the signing has not yet reviewed.\n" + \
"Your request id = %s" %req_id)
return 1
elif cert == '3':
show_msg ("Request on signature does not match sent earlier.")
return 1
# elif cert == '4':
# print _("Request was sent from another ip.")
# return 1
fc = open(cert_path + server_host_name + '.crt', 'w')
fc.write(cert)
fc.close()
os.unlink(cert_path + 'req_id')
show_msg ('OK. Certificate save. Your certificate id = %s' %req_id)
if ca_root:
clVars = DataVarsApi()
clVars.importApi()
clVars.flIniFile()
system_ca_db = clVars.Get('cl_glob_root_cert')
if os.path.exists(system_ca_db):
if ca_root in open(system_ca_db, 'r').read():
return 0
cl_client_cert_dir = clVars.Get('cl_client_cert_dir')
homePath = clVars.Get('ur_home_path')
cl_client_cert_dir = cl_client_cert_dir.replace("~",homePath)
root_cert_md5 = cl_client_cert_dir + "/ca/cert_list"
md5 = hashlib.md5()
md5.update(ca_root)
md5sum = md5.hexdigest()
print "\n================================================="
print "md5sum = ", md5sum
if not os.path.exists(root_cert_md5):
fc = open(root_cert_md5,"w")
fc.close()
filename = None
with open(root_cert_md5) as fd:
t = fd.read()
# for each line
for line in t.splitlines():
# Split string into a words list
words = line.split(' ',1)
if words[0] == md5sum:
filename = words[1]
if not filename:
certobj = OpenSSL.crypto.load_certificate \
(OpenSSL.SSL.FILETYPE_PEM, ca_root)
Issuer = certobj.get_issuer().get_components()
for item in Issuer:
if item[0] == 'CN':
filename = item[1]
fc = open(root_cert_md5,"a")
fc.write('%s %s\n' %(md5sum, filename))
fc.close()
if not filename:
show_msg ('Not found field "CN" in root certificate!')
return 1
fd = open(cl_client_cert_dir + '/ca/' + filename, 'w')
fd.write(ca_root)
fd.close()
user_root_cert = clVars.Get('cl_user_root_cert')
user_root_cert = user_root_cert.replace("~",homePath)
fa = open(user_root_cert, 'a')
fa.write(ca_root)
fa.close()
print _("filename = "), filename
show_msg ("Root Certificate Add\n" + filename)
else:
print _("file with ca certificates exists")
return 0