You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
calculate-utils-3-console-gui/pym/consolegui/application/conf_connection.py

425 lines
17 KiB

#-*- coding: utf-8 -*-
# Copyright 2012-2016 Mir Calculate. http://www.calculate-linux.org
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import absolute_import
from calculate.consolegui import qt
import calculate.contrib
from suds import WebFault
import logging, OpenSSL
import os
import calculate.lib.configparser as ConfigParser
import urllib.request as urllib2
import getpass
from calculate.core.client.function import clear as clear_suds_cache
from .session_function import client_post_cert
from calculate.core.client.cert_verify import VerifyError
from .client_class import Client_suds, HTTPSClientCertTransport
from .more import post_connect_action, show_msg, uniq, LabelWordWrap, get_icon, dpivalue
class LocalhostPasswd(qt.QDialog):
def __init__(self, parent):
qt.QDialog.__init__(self)
grid = qt.QGridLayout(self)
grid.setContentsMargins(10, 10, 10, 10)
grid.setSpacing(4)
self.lbl_passwd = qt.QLabel(_("Password"), self)
self.text_passwd = qt.QLineEdit('', self)
self.text_passwd.setEchoMode(self.text_passwd.Password)
grid.addWidget(self.lbl_passwd, 0, 0)
grid.addWidget(self.text_passwd, 0, 1)
layout_button = qt.QHBoxLayout()
self.cmd_ok = qt.QPushButton(_('Ok'), self)
self.cmd_ok.setDefault(True)
self.cmd_ok.setAutoDefault(True)
self.cmd_ok.clicked.connect(self.set_disabled)
self.cmd_ok.clicked.connect(self.onClick)
self.cmd_ok.setFixedWidth(100)
self.cmd_cancel = qt.QPushButton(_('Cancel'), self)
self.cmd_cancel.setShortcut(qt.QKeySequence(qt.Qt.Key_Escape))
self.cmd_cancel.clicked.connect(self.close)
self.cmd_cancel.setFixedWidth(100)
layout_button.addWidget(self.cmd_ok)
layout_button.addWidget(self.cmd_cancel)
grid.addLayout(layout_button, 1,0,1,2, qt.Qt.AlignRight)
self.setLayout(grid)
self.setFixedSize(self.sizeHint())
self._text = None
self.exec_()
def onClick(self):
self._text = self.text_passwd.text()
self.close()
def set_disabled(self):
self.cmd_ok.setDisabled(True)
def text(self):
return self._text
class FrameConnection(qt.QWidget):
def __init__(self, parent, ClientObj):
qt.QWidget.__init__(self)
self.ClientObj = ClientObj
def initUI(self, parent, window):
grid = qt.QGridLayout(self)
grid.setContentsMargins(10, 10, 10, 10)
grid.setSpacing(2)
self.lbl_host = LabelWordWrap(_("Host"), self)
self.lbl_port = LabelWordWrap(_("Port"), self)
self.lbl_passwd = LabelWordWrap(_("Password"), self)
self.text_host = qt.QLineEdit(self.ClientObj.default_host, self)
self.text_port = qt.QLineEdit(self.ClientObj.default_port, self)
self.text_passwd = qt.QLineEdit('', self)
self.text_passwd.setEchoMode(self.text_passwd.Password)
######################
# add completer in 'host name' QLineEdit
def add_wordlist(self):
def wrapper():
self.wordList.append (self.text_host.text())
self.wordList = uniq(self.wordList)
del (self.completer)
self.completer = qt.QCompleter(self.wordList, self)
self.completer.setCaseSensitivity(qt.Qt.CaseInsensitive)
self.completer.setMaxVisibleItems(7)
self.text_host.setCompleter(self.completer)
return wrapper
config = ConfigParser.ConfigParser()
config.read(self.ClientObj.user_config)
try:
wordList = config.get('other', 'connect_list')
self.wordList = wordList.split(',')
except (ConfigParser.NoOptionError, ConfigParser.NoSectionError):
self.wordList = []
self.completer = qt.QCompleter(self.wordList, self)
self.completer.setCaseSensitivity(qt.Qt.CaseInsensitive)
self.completer.setMaxVisibleItems(7)
self.text_host.setCompleter(self.completer)
self.text_host.editingFinished.connect(add_wordlist(self))
#####################
grid.addWidget(self.lbl_host, 0, 0)
grid.addWidget(self.lbl_port, 1, 0)
grid.addWidget(self.text_host, 0, 1)
grid.addWidget(self.text_port, 1, 1)
grid.addWidget(self.lbl_passwd, 2, 0)
grid.addWidget(self.text_passwd, 2, 1)
self.cmd_connect = qt.QPushButton(_('Connect'), self)
self.cmd_connect.setIcon(get_icon("network-connect"))
self.cmd_connect.setDefault(True)
self.cmd_connect.setAutoDefault(True)
# self.cmd_connect.setMaximumWidth(120)
self.cmd_connect.setShortcut(qt.QKeySequence(qt.Qt.Key_Return))
self.cmd_cancel = qt.QPushButton(_('Cancel'), self)
# self.cmd_cancel.setMaximumWidth(120)
self.cmd_cancel.setShortcut(qt.QKeySequence(qt.Qt.Key_Escape))
self.cmd_cancel.clicked.connect(self.close)
layout_button = qt.QHBoxLayout()
layout_button.addWidget(self.cmd_connect)
layout_button.addWidget(self.cmd_cancel)
grid.addLayout(layout_button, 3, 0, 1, 2)
grid.setColumnStretch(0,1)
grid.setColumnStretch(1,5)
self.cmd_connect.clicked.connect (self.set_disabled)
self.cmd_connect.clicked.connect (self.onClick)
# for clear memory after closed this window
self.setAttribute(qt.Qt.WA_DeleteOnClose)
self.setLayout(grid)
self.setFixedSize(dpivalue(300), dpivalue(160))
self.move(window.geometry().x() + window.geometry().width() / 2 \
- self.sizeHint().width() / 2, \
window.geometry().y() + window.geometry().height() / 2 \
- self.sizeHint().height() / 2)
self.setWindowTitle(_('Connection'))
self.setWindowIcon(get_icon("network-connect"))
def set_disabled(self):
self.setDisabled(True)
self.ClientObj.app.processEvents()
# establish a connection
def onClick(self):
# print "DEBUG: onClick"
self.setDisabled(True)
if self.ClientObj.client:
show_msg(_('You are connected to the server!')+'\n' + \
_("Please disconnect") +' '+ _('previous connection!'), \
_('Connection Error'))
return 1
self.ClientObj.app.processEvents()
self.setDisabled(False)
# write parameters
self.str_host = self.text_host.text()
self.str_port = self.text_port.text()
self.str_passwd = self.text_passwd.text()
# add host_name in list ########## start
while len(self.wordList) > 6:
self.wordList.remove(self.wordList[6])
if not self.str_host in self.wordList:
self.wordList.insert(0, self.str_host)
if not os.path.isfile (self.ClientObj.user_config):
f = open (self.ClientObj.user_config, 'w')
f.close()
fc = open (self.ClientObj.user_config, 'r')
config = fc.readlines()
fc.close()
new_config = []
host_list_flag = False
for line in config:
if line.startswith('connect_list '):
host_list_flag = True
new_config.append('connect_list = %s\n' \
%','.join(self.wordList))
else:
new_config.append(line)
if not host_list_flag:
part_flag = False
temp_cfg = []
for line in new_config:
temp_cfg.append(line)
#add new line in config
if line.startswith('[other]'):
temp_cfg.append('%s = %s\n' \
%('connect_list', ','.join(self.wordList)))
part_flag = True
new_config = temp_cfg
# if part not exists
if not part_flag:
new_config.append('\n')
new_config.append('%s\n' %'[other]')
new_config.append('%s = %s\n' \
%('connect_list', ','.join(self.wordList)))
fnc = open(self.ClientObj.user_config, 'w')
for line in new_config:
fnc.write(line)
fnc.close()
# add host_name in list ########## end
try:
int_port = int(self.str_port)
except:
show_msg(_("Enter the port correctly!"))
return 1
self.connect_to_host(self.str_host, int_port)
def connect_to_host(self, host_name, port = None, auto = False,
re_passwd = False):
logging.basicConfig(level=logging.FATAL)
logging.getLogger('suds.client').setLevel(logging.FATAL)
logging.getLogger('suds.transport').setLevel(logging.FATAL)
logging.getLogger('suds.transport.http').setLevel(logging.FATAL)
logging.getLogger('suds.umx.typed').setLevel(logging.ERROR)
clear_suds_cache()
# not twice connect to localhost
if host_name == 'localhost':
host_name = '127.0.0.1'
# raise Exception
if not port:
port = self.ClientObj.VarsApi.GetInteger('core.cl_core_port')
if hasattr (self.ClientObj._parent, 'find_host'):
if self.ClientObj._parent.find_host(host_name, port):
self.close()
return
# get server hostname
url = "https://%s:%d/?wsdl" %(host_name, port)
self.ClientObj.port = port
_print (url)
try:
from calculate.lib.utils.dbus_tools import run_dbus_core
run_dbus_core(host_name, port)
except ImportError:
pass
path_to_cert = self.ClientObj.path_to_cert
try:
client = Client_suds(url, transport =
HTTPSClientCertTransport(None, None, path_to_cert,
parent = self.ClientObj, timeout = 10))
except AttributeError as e:
show_msg (_('This server is not trusted'), _('Not connected!'))
return 1
except Exception as e:
show_msg (e, _("Not connected!"))
return 1
client.wsdl.services[0].setlocation(url)
try:
server_host_name = client.service.get_server_host_name()
except urllib2.URLError as e:
show_msg (e, _("Not connected!"))
return 1
except Exception as e:
_print ('connect_to_host Exception: ', e)
return 1
del (client)
if not hasattr (self, 'str_passwd'):
self.str_passwd = None
else:
passwd = None
try:
import glob
all_cert_list = glob.glob(path_to_cert + '*.crt')
fit_cert_list = []
for client_cert_path in all_cert_list:
client_cert = client_cert_path.replace(path_to_cert, '')
client_cert_name = client_cert.replace('.crt', '')
if server_host_name.endswith(client_cert_name):
fit_cert_list.append(client_cert_name)
fit_cert_list.sort(key = len)
Connect_Error = 1
crypto_Error = 0
for i in range (0, len(fit_cert_list)):
cert_name = fit_cert_list.pop()
CERT_FILE = path_to_cert + cert_name + '.crt'
CERT_KEY = path_to_cert + cert_name + '.key'
# check passwd private_key
import M2Crypto
bio = M2Crypto.BIO.openfile(CERT_KEY)
rsa = M2Crypto.m2.rsa_read_key(bio._ptr(), lambda *unused: b"")
if re_passwd != False:
self.str_passwd = re_passwd, "UTF-8"
if not rsa and not self.str_passwd and auto:
self.passwd_wgt = LocalhostPasswd(self)
passwd = self.passwd_wgt.text()
self.str_passwd = passwd if passwd else None
try:
# if True:
self.ClientObj.client = Client_suds(url,\
transport = HTTPSClientCertTransport(CERT_KEY, \
CERT_FILE, path_to_cert, parent = self.ClientObj, \
timeout = 5, password = bytes(self.str_passwd, "UTF-8")))
self.ClientObj.client.wsdl.services[0].setlocation(url)
self.ClientObj.client.set_parameters \
(path_to_cert, CERT_FILE, CERT_KEY)
self.ClientObj.client.server_host_name = host_name
client_post_cert(self.ClientObj.client, \
self.ClientObj.lang)
Connect_Error = 0
except VerifyError as e:
show_msg (e.value,_("Server certificate verification error"),self)
e.message = 'VerifyError'
Connect_Error = 1
except OpenSSL.crypto.Error as e:
print("OPENSSL ERROR")
Connect_Error = 1
crypto_Error = 1
# except Exception as e:
# print("GENERAL ERROR")
# print(e)
# Connect_Error = 1
if Connect_Error == 0:
break
#If the certificate file misses
if Connect_Error:
self.ClientObj.client = None
if crypto_Error and 'passwd' in locals():
show_msg (_('Invalid password'), \
_('Connection Error'), self)
return
# self.close()
if 'e' not in locals():
mess = _('You do not have a certificate. Please generate '
'a new request and get the new certificate from '
'the server.')
if host_name in ("127.0.0.1", "localhost"):
mess = ("%s %s:\ncl-core -u %s")%(mess,
_("Also you can generate administrator certificate by root command"),
getpass.getuser())
show_msg (mess, parent = self)
elif e.message == 'VerifyError':
pass
elif e.message == 1:
mess = _('You do not have a certificate or your '
'certificate does not match the server certificate.'
' Please generate a new request and get the new '
'certificate from the server.')
if host_name in ("127.0.0.1", "localhost"):
mess = ("%s %s:\ncl-core -u %s")%(mess,
_("Also you can generate administrator certificate by root command"),
getpass.getuser())
show_msg (mess, parent = self)
else:
if e.message or e.args:
show_msg (e, parent = self)
return
CERT_FILE = None
CERT_KEY = None
self.ClientObj.client = Client_suds(url,\
transport = HTTPSClientCertTransport(CERT_KEY, CERT_FILE,\
path_to_cert, parent = self.ClientObj, timeout = 5))
self.ClientObj.client.wsdl.services[0].setlocation(url)
self.ClientObj.client.set_parameters (path_to_cert, CERT_FILE,\
CERT_KEY)
try:
self.ClientObj.client.frame_period = \
int(self.ClientObj.VarsApi.Get \
('core.cl_core_get_frame_period'))
except:
self.ClientObj.client.frame_period = 2
self.ClientObj.host_name = host_name
if CERT_FILE:
if hasattr (self.ClientObj._parent, 'rename_tab'):
self.ClientObj._parent.rename_tab(host_name)
if host_name in ['127.0.0.1', 'localhost']:
self.ClientObj._parent.set_localhost(self.ClientObj)
self.ClientObj.client.server_host_name = host_name
self.ClientObj.client.password = self.str_passwd
# self.ClientObj._parent.connect_count_changed(1)
post_connect_action(self.ClientObj.client, self.ClientObj)
#----------------------------------------------------
except WebFault as f:
show_msg ("Exception: %s" %f)
_print (f.fault)
self.close()