Рефакторинг: работа с файлами.

develop 3.6.2.1
parent 21e104b364
commit 9e390c4aaa

@ -21,6 +21,7 @@ import OpenSSL, hashlib
from more import show_msg, LabelWordWrap, show_question, get_ip_mac, get_icon
from create_cert import RequestCreate
from client_class import HTTPSClientCertTransport, Client_suds
from calculate.lib.utils.files import readFile
import os
class CertClass (qt.QWidget):
@ -108,10 +109,11 @@ class CertClass (qt.QWidget):
# send request
cert_path = self.default_cert_path
if os.path.exists(cert_path + 'req_id'):
cert_path_req_id = cert_path + 'req_id'
if os.path.exists(cert_path_req_id):
text=_("You have already sent a signature request!")
informative_text = _("Request ID: %s") \
%open(cert_path + 'req_id', 'r').read()+'\n' \
%readFile(cert_path_req_id)+'\n' \
+ _("Send a new request?")
reply = show_question(self, text, informative_text,
title = _('Calculate Console'))
@ -175,7 +177,7 @@ class CertClass (qt.QWidget):
def end_send(self):
ip, mac = get_ip_mac()
data = open(self.csr_file).read()
data = readFile(self.csr_file)
try:
res = self.client.service.post_client_request(request = data, \
ip = ip, mac = mac, client_type = 'gui')
@ -188,9 +190,8 @@ class CertClass (qt.QWidget):
show_msg (_("The server has not signed the certificate!"),
parent=self)
return 1
fc = open(self.default_cert_path + 'req_id', 'w')
fc.write(res)
fc.close()
with open(self.default_cert_path + 'req_id', 'w') as fc:
fc.write(res)
show_msg (_("Your request ID: %s") %res + '\n' + \
_("To submit the certificate request on the server use command") + \
'\n'+'cl-core --sign-client %s' %res,
@ -204,9 +205,8 @@ class CertClass (qt.QWidget):
%(cert_path + 'req_id'),
_("Certificates"), self)
return 1
fc = open(cert_path + 'req_id', 'r')
req_id = fc.read()
fc.close()
with open(cert_path + 'req_id', 'r') as fc:
req_id = fc.read()
from_host = self.send_host.text()
if from_host == '':
@ -247,7 +247,7 @@ class CertClass (qt.QWidget):
%(cert_path + server_host_name + '.csr'))
return 1
request = open(cert_path + server_host_name + '.csr').read()
request = readFile(cert_path + server_host_name + '.csr')
md5 = hashlib.md5()
md5.update(request)
md5sum = md5.hexdigest()
@ -288,7 +288,7 @@ class CertClass (qt.QWidget):
if ca_root:
system_ca_db = self.ClientObj.VarsApi.Get('core.cl_glob_root_cert')
if os.path.exists(system_ca_db):
if ca_root in open(system_ca_db, 'r').read():
if ca_root in readFile(system_ca_db):
return 0
cl_client_cert_dir = self.ClientObj.VarsApi.Get \

@ -137,7 +137,8 @@ class SelectedMethodWgt(qt.QWidget):
selfsize = self.size()
self.config.set('gui', 'size',
"{},{}".format(selfsize.width(), selfsize.height()))
self.config.write(open(self.user_config, 'w'))
with open(self.user_config, 'w') as f:
self.config.write(f)
except (ConfigParser.NoOptionError, ConfigParser.NoSectionError):
return False
@ -569,7 +570,8 @@ class ToolTabWidget(qt.QTabWidget):
selfsize = self.size()
self.config.set('gui', 'size',
"{},{}".format(selfsize.width(), selfsize.height()))
self.config.write(open(self.user_config, 'w'))
with open(self.user_config, 'w') as f:
self.config.write(f)
except (ConfigParser.NoOptionError, ConfigParser.NoSectionError):
return False

@ -33,6 +33,7 @@ import OpenSSL, hashlib
from sudsds.client import Client
from logging import getLogger
from calculate.core.client.cert_verify import verify, VerifyError
from calculate.lib.utils.files import readFile
from more import show_msg, show_question, LabelWordWrap
@ -141,17 +142,15 @@ class AddServerCert (qt.QDialog):
ca_certs = self.parent.trusted_path + "cert.list"
if not os.path.exists(ca_certs):
fc = open(ca_certs,"w")
fc.close()
open(ca_certs,"w").close()
if self.parent.host == '127.0.0.1':
host = 'localhost'
else:
host = self.parent.host
filename = host
fc = open(self.parent.trusted_path + filename,"w")
fc.write(self.cert)
fc.close()
with open(self.parent.trusted_path + filename,"w") as fc:
fc.write(self.cert)
with open(ca_certs) as fd:
t = fd.read()
# for each line
@ -165,9 +164,8 @@ class AddServerCert (qt.QDialog):
return 0
# Open file with compliance server certificates and server hostname
fcl = open(ca_certs,"a")
fcl.write(host + ' ' + filename + '\n')
fcl.close()
with open(ca_certs,"a") as fcl:
fcl.write(host + ' ' + filename + '\n')
show_msg (_('Server certificate added to trusted') +'\n'+ \
(self.parent.trusted_path + filename),_('Certificate added'))
self.flag = 3
@ -282,8 +280,7 @@ class CheckingClientHTTPSConnection(CheckingHTTPSConnection):
except OSError:
pass
if not os.path.exists(ca_certs):
fc = open(ca_certs,"w")
fc.close()
open(ca_certs,"w").close()
filename = None
try:
with open(ca_certs) as fd:
@ -304,9 +301,7 @@ class CheckingClientHTTPSConnection(CheckingHTTPSConnection):
# self.parent.MainWidget.bottom.addMessage(msg)
return None
try:
fd = open(self.trusted_path + filename, 'r')
store_cert = fd.read()
fd.close()
store_cert = readFile(self.trusted_path + filename)
if store_cert == server_cert:
return filename
except:
@ -329,14 +324,14 @@ class CheckingClientHTTPSConnection(CheckingHTTPSConnection):
system_ca_db = \
self.ClientObj.VarsApi.Get('core.cl_glob_root_cert')
if os.path.exists(system_ca_db):
if cert in open(system_ca_db, 'r').read():
if cert in readFile(system_ca_db):
continue
user_root_cert = \
self.ClientObj.VarsApi.Get('core.cl_user_root_cert')
user_root_cert = user_root_cert.replace("~",homePath)
if os.path.exists(user_root_cert):
if cert in open(user_root_cert, 'r').read():
if cert in readFile(user_root_cert):
continue
md5 = hashlib.md5()
@ -344,8 +339,7 @@ class CheckingClientHTTPSConnection(CheckingHTTPSConnection):
md5sum = md5.hexdigest()
if not os.path.exists(root_cert_md5):
fc = open(root_cert_md5,"w")
fc.close()
open(root_cert_md5,"w").close()
filename = None
with open(root_cert_md5) as fd:
@ -364,21 +358,18 @@ class CheckingClientHTTPSConnection(CheckingHTTPSConnection):
if item[0] == 'CN':
filename = item[1]
fc = open(root_cert_md5,"a")
fc.write('%s %s\n' %(md5sum, filename))
fc.close()
with open(root_cert_md5,"a") as fc:
fc.write('%s %s\n' %(md5sum, filename))
if not filename:
show_msg (_('Field "CN" not found in the certificate!'))
return 1
fd = open(cl_client_cert_dir + '/ca/' + filename, 'w')
fd.write(cert)
fd.close()
with open(cl_client_cert_dir + '/ca/' + filename, 'w') as fd:
fd.write(cert)
fa = open(user_root_cert, 'a')
fa.write(cert+'\n')
fa.close()
with open(user_root_cert, 'a') as fa:
fa.write(cert+'\n')
show_msg (_("Filename = %s") %filename, _('Certificate added'))
else:
show_msg (_('A file with the CA certificate now exists'))
@ -657,10 +648,10 @@ def get_CRL(path_to_cert):
glob_root_cert = clVars.Get('core.cl_glob_root_cert')
if os.path.exists(user_root_cert):
user_ca_certs = open(user_root_cert, 'r').read()
user_ca_certs = readFile(user_root_cert)
else: user_ca_certs = ''
if os.path.exists(glob_root_cert):
glob_ca_certs = open(glob_root_cert, 'r').read()
glob_ca_certs = readFile(glob_root_cert)
else: glob_ca_certs = ''
# get certificates list fron text
@ -707,17 +698,16 @@ def get_CRL(path_to_cert):
host = subj[1].split(':')[0]
CRL_file = CRL_path + host
if new_crl == ' ':
open(CRL_file, 'w')
open(CRL_file, 'w').close()
#if os.path.exists(CRL_file):
#os.unlink(CRL_file)
continue
if os.path.exists(CRL_file):
if open(CRL_file, 'r').read() == new_crl:
if readFile(CRL_file) == new_crl:
continue
fd = open(CRL_file, 'w')
fd.write(new_crl)
fd.close()
with open(CRL_file, 'w') as fd:
fd.write(new_crl)
_print (_("CRL added"))
find_ca_in_crl (CRL_path, all_ca_certs_list)
@ -783,7 +773,7 @@ def rm_ca_from_trusted(ca_cert):
words = line.split()
if words[0] == md5sum:
filename = os.path.join(user_ca_dir, words[1])
if ca_cert == open(filename, 'r').read():
if ca_cert == readFile(filename):
os.unlink(filename)
else:
newfile += (line + '\n')
@ -798,7 +788,7 @@ def rm_ca_from_trusted(ca_cert):
p = re.compile('[-]+[\w ]+[-]+\n+[\w\n\+\\=/]+[-]+[\w ]+[-]+\n?')
# open, write and split user ca certificates
user_ca_certs = open(user_ca_db, 'r').read()
user_ca_certs = readFile(user_ca_db)
user_ca_certs_list = p.findall(user_ca_certs)
if ca_cert in user_ca_certs_list:
@ -809,15 +799,14 @@ def rm_ca_from_trusted(ca_cert):
else:
_print (_("CA certificate removed from user trusted"))
fd = open(user_ca_db, 'w')
for cert in new_user_ca_certs:
fd.write(cert)
fd.close()
with open(user_ca_db, 'w') as fd:
for cert in new_user_ca_certs:
fd.write(cert)
if not os.path.exists(system_ca_db):
open(system_ca_db, 'w')
open(system_ca_db, 'w').close()
system_ca_certs = open(system_ca_db, 'r').read()
system_ca_certs = readFile(system_ca_db)
system_ca_certs_list = p.findall(system_ca_certs)
if ca_cert in system_ca_certs_list:
@ -828,8 +817,7 @@ def rm_ca_from_trusted(ca_cert):
else:
_print (_("CA certificate removed from system trusted"))
fd = open(system_ca_db, 'w')
for cert in new_system_ca_certs:
fd.write(cert)
fd.close()
with open(system_ca_db, 'w') as fd:
for cert in new_system_ca_certs:
fd.write(cert)
return 0

@ -22,7 +22,7 @@ import sys
from calculate.consolegui import qt
from calculate.core.client.function import create_obj
from calculate.lib.utils.files import readLinesFile, listDirectory
from calculate.lib.utils.files import readLinesFile, listDirectory, readFile
from PIL import Image
from calculate.lib.utils.colortext.palette import (
GuiPalette as WorkPalette)
@ -2706,7 +2706,7 @@ def getRunProc():
cmdLineFile = '/proc/%s/cmdline'%procNum
try:
if os.path.exists(cmdLineFile):
return [open(cmdLineFile,'r').read().strip(), procNum]
return [readFile(cmdLineFile).strip(), procNum]
except:
pass
return ["", procNum]

Loading…
Cancel
Save