You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
calculate-utils-3-console-gui/pym/consolegui/application/CertificateClass.py

355 lines
14 KiB

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

#-*- coding: utf-8 -*-
# Copyright 2012-2016 Mir Calculate. http://www.calculate-linux.org
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from calculate.consolegui.qtwrapper import QtGuiWidgets, QtCore
import urllib2
import OpenSSL, hashlib
from more import show_msg, LabelWordWrap, show_question, get_ip_mac
from create_cert import RequestCreate
from client_class import HTTPSClientCertTransport, Client_suds
import os
class CertClass (QtGuiWidgets.QWidget):
def __init__(self, parent, ClientObj, window):
self.ClientObj = ClientObj
QtGuiWidgets.QWidget.__init__(self)
# get_cert_path
self.default_cert_path = ClientObj.path_to_cert
# self.default_cert_path = self.default_cert_path.replace("~",homePath)
self.sendlayout = QtGuiWidgets.QGridLayout()
self.sendlayout.setContentsMargins(8,8,8,8)
self.mainlayout = QtGuiWidgets.QVBoxLayout()
# gen ceth by host
self.sendlayout.addWidget(LabelWordWrap(_('Host'), self), 0,0)
self.send_host = QtGuiWidgets.QLineEdit('localhost', self)
self.sendlayout.addWidget(self.send_host, 0, 1, 1, 2)
self.sendlayout.addWidget(LabelWordWrap(_('Port'), self), 1,0)
cl_core_port = self.ClientObj.VarsApi.Get('core.cl_core_port')
self.send_port = QtGuiWidgets.QLineEdit(cl_core_port, self)
self.send_port.setValidator(QtGuiWidgets.QIntValidator(self))
self.sendlayout.addWidget(self.send_port, 1, 1, 1, 2)
layout_button = QtGuiWidgets.QHBoxLayout()
Send_button = QtGuiWidgets.QPushButton(_("Send a request"), self)
# Send_button.setFixedWidth(140)
Send_button.clicked.connect(self.send)
layout_button.addWidget(Send_button)
Get_button = QtGuiWidgets.QPushButton(_("Get the certificate"), self)
# Get_button.setFixedWidth(140)
Get_button.clicked.connect(self.get)
layout_button.addWidget(Get_button)
Quit_button = QtGuiWidgets.QPushButton(_("Quit"), self)
# Quit_button.setFixedWidth(60)
Quit_button.setShortcut(QtGuiWidgets.QKeySequence(QtCore.Qt.Key_Return))
Quit_button.clicked.connect(self.close)
layout_button.addWidget(Quit_button)
self.sendlayout.addLayout(layout_button, 2, 0, 1, 3)
self.sendlayout.setColumnStretch(0,0)
self.sendlayout.setColumnStretch(1,1)
self.sendlayout.setColumnStretch(2,1)
self.GroupBoxSend = QtGuiWidgets.QGroupBox(_("Certificate signature request"))
# self.GroupBoxSend.setContentsMargins(10,0,0,0)
self.GroupBoxSend.setLayout(self.sendlayout)
self.mainlayout.addWidget(self.GroupBoxSend)
self.setLayout(self.mainlayout)
self.setFocus()
# for clear memory after closed this window
self.setAttribute(QtCore.Qt.WA_DeleteOnClose)
self.setFixedSize(self.sizeHint().width(), self.sizeHint().height())
self.move(window.geometry().x() + window.geometry().width() / 2 \
- self.size().width() / 2, \
window.geometry().y() + window.geometry().height() / 2 \
- self.size().height() / 2 - 50)
self.setWindowTitle (_('Certificates'))
self.setWindowIcon (QtGuiWidgets.QIcon.fromTheme("view-certificate"))
def send(self):
### client_post_request analog
# validation correctness of values
by_host = self.send_host.text()
if by_host == '':
show_msg (_('Enter the hostname or the IP address'), \
_('Incorrect "Host" value!'))
return 1
port = self.send_port.text()
if port == '' or not port.isdigit():
show_msg (_('Enter the port number'), _('Incorrect "Port" value!'))
return 1
# send request
cert_path = self.default_cert_path
if os.path.exists(cert_path + 'req_id'):
text=_("You have already sent a signature request!")
informative_text = _("Request ID: %s") \
%open(cert_path + 'req_id', 'r').read()+'\n' \
+ _("Send a new request?")
reply = show_question(self, text, informative_text,
title = _('Calculate Console'))
if reply == QtGuiWidgets.QMessageBox.No:
return 0
elif reply == QtGuiWidgets.QMessageBox.Yes:
pass
url = "https://%s:%d/?wsdl" %(by_host, int(port))
_print ('URL = ', url)
try:
self.client = Client_suds(url, \
transport = HTTPSClientCertTransport(None, None, \
cert_path, parent = self.ClientObj))
except (KeyboardInterrupt, urllib2.URLError), e:
try:
show_msg(e.reason.strerror.decode('utf-8') \
, _("Closing. Connection error."), self)
except (UnicodeDecodeError, UnicodeEncodeError):
show_msg(e.reason.strerror, \
_("Closing. Connection error."), self)
return 1
except (UnicodeDecodeError, UnicodeEncodeError):
show_msg (_('Enter the hostname or the IP address'),
_('Input error'), self)
return 1
self.client.wsdl.services[0].setlocation(url)
try:
server_host_name = self.client.service.get_server_host_name()
except AttributeError:
return 1
except urllib2.URLError, e:
_print ('method send in CertificateClass Exception')
show_msg (e, _("Not connected!"), self)
return 1
key = cert_path + server_host_name + '.key'
self.csr_file = cert_path + server_host_name +'.csr'
if os.path.exists(key) and os.path.exists(self.csr_file):
text = _('The private key and the request now exist!')
informative_text = \
_("Create a new private key and signature request?")
reply = show_question(self, text, informative_text,
title = _('Calculate Console'))
if reply == QtGuiWidgets.QMessageBox.Yes:
self.req_obj = RequestCreate(self, self.ClientObj, key, \
cert_path, server_host_name)
self.req_obj.show()
elif reply == QtGuiWidgets.QMessageBox.No:
pass
else:
self.req_obj = RequestCreate(self, self.ClientObj, key, \
cert_path, server_host_name)
self.req_obj.setAttribute(QtCore.Qt.WA_ShowModal)
self.req_obj.show()
def end_send(self):
ip, mac = get_ip_mac()
data = open(self.csr_file).read()
try:
res = self.client.service.post_client_request(request = data, \
ip = ip, mac = mac, client_type = 'gui')
except urllib2.URLError, e:
_print ('method end_send in CertificateClass Exception')
show_msg (e, _("Not connected!"), self)
return 1
del (self.client)
if int(res) < 0:
show_msg (_("The server has not signed the certificate!"),
parent=self)
return 1
fc = open(self.default_cert_path + 'req_id', 'w')
fc.write(res)
fc.close()
show_msg (_("Your request ID: %s") %res + '\n' + \
_("To submit the certificate request on the server use command") + \
'\n'+'cl-core --sign-client %s' %res,
title = _('Request'), parent=self)
return 0
def get(self):
cert_path = self.default_cert_path
if not os.path.exists(cert_path + 'req_id'):
show_msg (_("Request not sent, or file %s deleted") \
%(cert_path + 'req_id'),
_("Certificates"), self)
return 1
fc = open(cert_path + 'req_id', 'r')
req_id = fc.read()
fc.close()
from_host = self.send_host.text()
if from_host == '':
show_msg (_('Enter the hostname or the IP address'), \
_('Incorrect "Host" value!'), self)
return 1
port = self.send_port.text()
if port == '' or not port.isdigit():
show_msg (_('Enter the port number'), _('Incorrect "Port" value!'),
self)
return 1
url = "https://%s:%s/?wsdl" %(from_host, port)
try:
client = Client_suds(url, \
transport = HTTPSClientCertTransport(None, None, \
cert_path, parent = self.ClientObj))
except (KeyboardInterrupt, urllib2.URLError), e:
show_msg(_("Error code: %s") %e, _("Closing. Connection error."),\
self)
return 1
except (UnicodeDecodeError, UnicodeEncodeError):
show_msg (_('Enter the hostname or the IP address'), \
_('Input error'), self)
return 1
client.wsdl.services[0].setlocation(url)
try:
server_host_name = client.service.get_server_host_name()
except urllib2.URLError, e:
_print ('method get in CertificateClass Exception')
show_msg (e, _("Not connected!"), self)
return 1
if not os.path.exists(cert_path + server_host_name + '.csr'):
show_msg(_('Request %s not found on the clients side') \
%(cert_path + server_host_name + '.csr'))
return 1
request = open(cert_path + server_host_name + '.csr').read()
md5 = hashlib.md5()
md5.update(request)
md5sum = md5.hexdigest()
try:
result = client.service.get_client_cert(req_id, md5sum)
except urllib2.URLError, e:
_print ('get_client_cert in CertificateClass Exception')
show_msg (e, _("Not connected!"))
return 1
cert = result[0][0]
try:
ca_root = result[0][1]
except Exception, e:
ca_root = None
# show_msg(e.message)
if cert == '1':
show_msg (_('Signature request rejected!'))
return 1
elif cert == '2':
show_msg (_("The request has not been reviewed yet.") + '\n' + \
_("Your request ID: %s") %req_id + '\n'+\
_("To submit the certificate request on the server use command")+\
'\n'+'cl-core --sign-client %s'%req_id,
title = _('Request'), parent = self)
return 1
elif cert == '3':
show_msg (_('Either the request or the signature does not match '
'the earlier ones.'))
return 1
fc = open(cert_path + server_host_name + '.crt', 'w')
fc.write(cert)
fc.close()
os.unlink(cert_path + 'req_id')
show_msg (_('Certificate saved. Your certificate ID = %s') %req_id)
if ca_root:
system_ca_db = self.ClientObj.VarsApi.Get('core.cl_glob_root_cert')
if os.path.exists(system_ca_db):
if ca_root in open(system_ca_db, 'r').read():
return 0
cl_client_cert_dir = self.ClientObj.VarsApi.Get \
('core.cl_client_cert_dir')
homePath = self.ClientObj.VarsApi.Get('ur_home_path')
cl_client_cert_dir = cl_client_cert_dir.replace("~",homePath)
if not os.path.isdir(cl_client_cert_dir):
os.mkdir(cl_client_cert_dir)
if not os.path.isdir(cl_client_cert_dir+'/ca'):
os.mkdir(cl_client_cert_dir+'/ca')
root_cert_md5 = cl_client_cert_dir + "/ca/cert_list"
md5 = hashlib.md5()
md5.update(ca_root)
md5sum = md5.hexdigest()
_print ("md5sum = ", md5sum)
if not os.path.isfile(root_cert_md5):
fc = open(root_cert_md5,"w")
fc.close()
filename = None
with open(root_cert_md5) as fd:
t = fd.read()
# for each line
for line in t.splitlines():
# Split string into a words list
words = line.split(' ',1)
if words[0] == md5sum:
filename = words[1]
if not filename:
certobj = OpenSSL.crypto.load_certificate \
(OpenSSL.SSL.FILETYPE_PEM, ca_root)
Issuer = certobj.get_issuer().get_components()
for item in Issuer:
if item[0] == 'CN':
filename = item[1]
fc = open(root_cert_md5,"a")
fc.write('%s %s\n' %(md5sum, filename))
fc.close()
if not filename:
show_msg \
(_('Field "CN" not found in the root certificate!'))
return 1
fd = open(cl_client_cert_dir + '/ca/' + filename, 'w')
fd.write(ca_root)
fd.close()
user_root_cert = \
self.ClientObj.VarsApi.Get('core.cl_user_root_cert')
user_root_cert = user_root_cert.replace("~",homePath)
fa = open(user_root_cert, 'a')
fa.write(ca_root)
fa.close()
_print (_("Filename = "), filename)
show_msg (_("Root certificate added") + '\n' + filename)
else:
_print (_("File with the CA certificate now exists"))
return 0