You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
calculate-utils-3-console/pym/console/application/cert_func.py

499 lines
16 KiB

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

# -*- coding: utf-8 -*-
# Copyright 2012-2016 Mir Calculate. http://www.calculate-linux.org
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
import pwd
import sys
import subprocess
import socket
import time
import urllib.request as urllib2
from urllib.error import URLError
from .function import _print, get_ip_mac_type, parse_error
import OpenSSL
import hashlib
from .client_class import HTTPSClientCertTransport
from .cert_verify import VerifyError
from calculate.core.datavars import DataVarsCore
from calculate.core.server.methods_func import get_password
from calculate.lib.cl_lang import setLocalTranslate
from calculate.lib.utils.common import getpass
from calculate.lib.utils.files import listDirectory, readFile
# Identity placeholder for the translation function used by every message in
# this module. NOTE(review): setLocalTranslate() below presumably installs the
# real translator for this module -- confirm in calculate.lib.cl_lang.
_ = lambda x: x
setLocalTranslate('cl_console3', sys.modules[__name__])
# Version constant; within this file it is only mentioned in commented-out
# code inside client_post_auth.
VERSION = 0.11
def client_post_cert(client, clVars, show_info=False):
    """Open (or resume) a server session and report the certificate status.

    Calls ``client.service.init_session`` with the stored session ID and the
    first two characters of ``LANG``, prints what the server answered about
    the client certificate, and stores the session ID returned by the server
    through ``client.write_sid``.

    :param client: client object providing ``get_sid``, ``write_sid``,
        ``service.init_session`` and ``CERT_FILE``.
    :param clVars: not used by this function; kept for interface
        compatibility.
    :param show_info: when true, also print certificate and session details.
    :raises Exception: ``Exception(3)`` when the server answers -4
        (certificate not found on the server).
    """
    sid = client.get_sid()
    # LANG can be unset (cron jobs, minimal environments); fall back to
    # English instead of failing with KeyError.
    lang = os.environ.get('LANG', 'en')[:2]
    _result_post_cert, _result_sid = client.service.init_session(sid, lang)

    result_post_cert = _result_post_cert[1].integer
    result_sid = _result_sid[1].integer

    if os.environ.get("DEBUG"):
        print(_("The client uses certificate {certfile} "
                "(server ID {cert_id})").format(
            certfile=client.CERT_FILE, cert_id=result_post_cert[0]))
    if result_post_cert[0] == -4:
        print(_("Certificate not found on the server"))
        print(_("the client uses certificate %s") % client.CERT_FILE)
        print(_('You can generate a new certificate '
                'using options --gen-cert-by and '
                '--get-cert-from'))
        raise Exception(3)
    # client_sid(sid, client, cert_id = results[0][0], clVars = clVars)
    if result_post_cert[0] == -3:
        print(_("Certificate not sent!"))
    elif result_post_cert[0] == -2:
        print(_("Using the upstream certificate"))
    else:
        if show_info:
            print(_(" Your certifitate ID = %d") % (result_post_cert[0]))
        # The second element is the expiry information; reporting it is
        # best effort, so a missing or malformed value is ignored. Only
        # ordinary errors are swallowed, not KeyboardInterrupt/SystemExit.
        try:
            if result_post_cert[1] == -2:
                print(_("The certificate has expired"))
            elif result_post_cert[1] > 0:
                if show_info:
                    print(_("The certificate expires after %d days") % (
                        result_post_cert[1]))
        except Exception:
            pass

    # work with sid
    client.write_sid(result_sid[0])
    if show_info:
        if result_sid[1] == 1:
            print(_(" New Session"))
        else:
            print(_(" Old Session"))
        print(_(" Your session ID = %s") % sid)
def new_key_req(key, cert_path, server_host_name, private_key_passwd=None,
                auto=False):
    """Create the client's secret key and a certificate signing request.

    Generates an RSA key, saves it to ``key + '_pub'`` and ``key``, builds a
    request with ``makeRequest`` and writes it in PEM form to
    ``<cert_path>/<server_host_name>.csr``. Finally the key file is chowned
    to the current user and restricted to mode 0600.

    :param key: path of the private key file to write.
    :param cert_path: directory that receives the ``.csr`` file.
    :param server_host_name: host name used for the request file name and
        passed to ``makeRequest``.
    :param private_key_passwd: passphrase for the private key; the key is
        saved without a cipher when ``passphrase_callback`` rejects it.
    :param auto: passed through to ``makeRequest``.
    :return: path of the request file, or ``None`` when the current user has
        no passwd entry (the files have already been written in that case).
    """
    from .create_cert import generateRSAKey, makePKey, makeRequest, \
        passphrase_callback

    rsa = generateRSAKey()
    rsa.save_key(key + '_pub', cipher=None, callback=lambda *unused: "")

    pkey = makePKey(rsa)
    if not passphrase_callback(private_key_passwd):
        pkey.save_key(key, cipher=None, callback=lambda *unused: "")
    else:
        pkey.save_key(key, callback=lambda *unused: str(private_key_passwd))

    req = makeRequest(rsa, pkey, server_host_name, auto)
    crtreq = req.as_pem()

    req_file = cert_path + '/%s.csr' % server_host_name
    # Context manager so the handle is closed even if the write fails.
    with open(req_file, 'wb') as crtfile:
        crtfile.write(crtreq)

    # Both lookups can raise KeyError when the uid or name has no passwd
    # entry, so both are guarded.
    try:
        user_name = pwd.getpwuid(os.getuid()).pw_name
        pwdObj = pwd.getpwnam(user_name)
    except KeyError as e:
        _print(parse_error(e))
        return None

    os.chown(key, pwdObj.pw_uid, pwdObj.pw_gid)
    os.chmod(key, 0o600)
    return req_file
def delete_old_cert(client):
    """Remove the client's certificate, request and key files.

    Every file is removed independently: a file that is already missing (or
    cannot be removed) is reported through ``_print`` and the remaining
    files are still deleted, so a stale private key is never left behind
    just because the certificate was already gone.

    :param client: object providing ``CERT_FILE``, ``REQ_FILE``,
        ``PKEY_FILE`` and ``PubKEY_FILE`` paths.
    """
    for path in (client.CERT_FILE, client.REQ_FILE,
                 client.PKEY_FILE, client.PubKEY_FILE):
        try:
            os.unlink(path)
        except OSError as e:
            _print(parse_error(e))
def client_post_request(cert_path, args):
    """Send a certificate signing request to the server ``args.by_host``.

    Asks for confirmation when a request ID file already exists, connects to
    the server, creates a key and request when needed (or when the user asks
    for new ones), posts the request and stores the returned request ID in
    ``<cert_path>/req_id``.

    :param cert_path: directory holding the client's keys and requests.
    :param args: parsed options; ``args.by_host`` and ``args.port`` are read.
    :return: 1 when the server answers with a negative request ID; 0
        otherwise (success, user declined, or connection error).
    """
    # The same path is used for checking, reading and writing the request
    # ID, whether or not cert_path ends with a slash.
    req_id_file = os.path.join(cert_path, 'req_id')
    if os.path.exists(req_id_file):
        print(_("You already sent a certificate signature request."))
        _print(_("Request ID = %s") % readFile(req_id_file))
        ans = input(_("Send a new request? y/[n]: "))
        if ans.lower() not in ['y', 'yes']:
            return 0

    clVars = DataVarsCore()
    clVars.importCore()
    clVars.flIniFile()

    port = args.port or clVars.Get('core.cl_core_port')
    url = "https://%s:%s/?wsdl" % (args.by_host, port)
    print('%s\n' % url, _("connecting..."))
    from .client_class import Client_suds

    try:
        client = Client_suds(
            url, transport=HTTPSClientCertTransport(None, None, cert_path))
    except (KeyboardInterrupt, URLError) as e:
        print('\n' + _("Closing. Connection error."))
        _print(_("Error: %s") % e)
        return 0
    client.wsdl.services[0].setlocation(url)

    server_host_name = client.service.get_server_host_name()

    key = os.path.join(cert_path, server_host_name + '.key')
    csr_file = os.path.join(cert_path, server_host_name + '.csr')
    # A new key/request pair is always created when one of the files is
    # missing; otherwise only if the user confirms.
    create_new = True
    if os.path.exists(key) and os.path.exists(csr_file):
        print(_("the private key and request now exist"))
        ask = input(_("Create a new private key and request? y/[n]: "))
        create_new = ask.lower() in ['y', 'yes']
    if create_new:
        passwd = get_password()
        new_key_req(key, cert_path, server_host_name,
                    private_key_passwd=passwd)

    ip, mac, client_type = get_ip_mac_type()
    data = readFile(csr_file)
    res = client.service.post_client_request(request=data, ip=ip,
                                             mac=mac, client_type=client_type)
    if int(res) < 0:
        print(_("The server has not signed the certificate!"))
        return 1

    with open(req_id_file, 'w') as fc:
        fc.write(res)
    _print(_("Your request ID = %s") % res + '.\n',
           _("To submit the certificate request on the server use command") + \
           '\n' + 'cl-core --sign-client ID_CLIENT_REQUEST')
    return 0
def _store_ca_root(clVars, ca_root):
    """Add the server's CA root certificate to the user's certificate store.

    Nothing is written when the certificate is already part of the system
    CA database or already listed (by md5 sum) in ``ca/cert_list``.

    :return: 0 on success or when the certificate is already known, 1 when
        the certificate issuer has no "CN" field.
    """
    system_ca_db = clVars.Get('core.cl_glob_root_cert')
    if os.path.exists(system_ca_db) and ca_root in readFile(system_ca_db):
        return 0

    homePath = clVars.Get('ur_home_path')
    cl_client_cert_dir = clVars.Get('core.cl_client_cert_dir')
    cl_client_cert_dir = cl_client_cert_dir.replace("~", homePath)
    root_cert_md5 = os.path.join(cl_client_cert_dir, "ca/cert_list")

    md5sum = hashlib.md5(ca_root.encode("UTF-8")).hexdigest()
    print("\n=================================================")
    print("md5sum = ", md5sum)

    # Make sure the index file exists before reading it.
    if not os.path.exists(root_cert_md5):
        with open(root_cert_md5, "w"):
            pass

    # Each index line is "<md5sum> <file name>"; malformed lines without a
    # file name are skipped instead of raising IndexError.
    filename = None
    with open(root_cert_md5) as fd:
        for line in fd.read().splitlines():
            words = line.split(' ', 1)
            if len(words) == 2 and words[0] == md5sum:
                filename = words[1]
    if filename:
        print(_("The file containing the CA certificate now exists"))
        return 0

    certobj = OpenSSL.crypto.load_certificate(OpenSSL.SSL.FILETYPE_PEM,
                                              ca_root)
    for item in certobj.get_issuer().get_components():
        if item[0] == b'CN':
            filename = item[1].decode("UTF-8")

    # Check before touching the index, so that a certificate without CN
    # does not leave a bogus "<md5sum> None" entry behind.
    if not filename:
        print(_('Field "CN" not found in the certificate!'))
        return 1

    with open(root_cert_md5, "a") as fc:
        fc.write('%s %s\n' % (md5sum, filename))

    with open(os.path.join(cl_client_cert_dir, 'ca', filename), 'w') as fd:
        fd.write(ca_root)

    user_root_cert = clVars.Get('core.cl_user_root_cert')
    user_root_cert = user_root_cert.replace("~", homePath)
    with open(user_root_cert, 'a') as fa:
        fa.write(ca_root)
    print(_("filename = "), filename)
    print(_("Certificate added"))
    return 0


def client_get_cert(cert_path, args):
    """Fetch the signed certificate for a previously sent request.

    Reads the request ID from ``<cert_path>/req_id``, asks the server
    ``args.from_host`` for the certificate (identifying the request by its
    ID and the md5 sum of the local ``.csr`` file), saves it as
    ``<cert_path>/<server_host_name>.crt`` and, when the server also sends
    its CA root certificate, adds that to the user's certificate store.

    :param cert_path: directory holding the client's keys and requests.
    :param args: parsed options; ``args.from_host`` and ``args.port`` are
        read.
    :return: 0 on success; 1 when the request is missing, rejected, not yet
        examined or mismatching, or the connection was interrupted;
        ``None`` when the current user has no passwd entry.
    """
    clVars = DataVarsCore()
    clVars.importCore()
    clVars.flIniFile()

    req_id_file = os.path.join(cert_path, 'req_id')
    if not os.path.exists(req_id_file):
        print(_("Request not sent or file %s deleted") % req_id_file)
        return 1
    with open(req_id_file, 'r') as fc:
        req_id = fc.read()

    port = args.port or clVars.Get('core.cl_core_port')
    url = "https://%s:%s/?wsdl" % (args.from_host, port)
    print('%s\n' % url, _("connecting..."))

    from .client_class import Client_suds

    try:
        client = Client_suds(
            url, transport=HTTPSClientCertTransport(None, None, cert_path))
    except KeyboardInterrupt:
        print(_("Closing. Connection error."))
        return 1
    client.wsdl.services[0].setlocation(url)

    server_host_name = client.service.get_server_host_name()

    csr_file = os.path.join(cert_path, server_host_name + '.csr')
    if not os.path.exists(csr_file):
        print(_("Request %s not found on the client's side") % csr_file)
        return 1
    request = readFile(csr_file, binary=True)
    md5sum = hashlib.md5(request).hexdigest()

    result = client.service.get_client_cert(req_id, md5sum)
    cert = result[0][0]
    try:
        ca_root = result[0][1]
    except IndexError:
        ca_root = None

    # Single-digit answers are status codes rather than a certificate.
    if cert == '1':
        print(_("Signature request rejected!"))
        return 1
    elif cert == '2':
        print(_("Signature request not examined yet."))
        print(_("Your request ID = %s") % req_id + '.\n',
              _("To submit the certificate request on the server use command") + \
              '\n' + 'cl-core --sign-client ID_CLIENT_REQUEST')
        return 1
    elif cert == '3':
        print(_("Request or signature not matching earlier data."))
        return 1
    elif cert == '4':
        print(_("The request was sent from another IP."))
        return 1

    cert_file = os.path.join(cert_path, server_host_name + '.crt')
    with open(cert_file, 'w') as fc:
        fc.write(cert)

    # Remove the same file that was read above, whether or not cert_path
    # ends with a slash.
    try:
        os.unlink(req_id_file)
    except OSError as e:
        _print(parse_error(e))
    print(_('Certificate saved. Your certificate ID: %s') % req_id)

    # Both lookups can raise KeyError when the uid or name has no passwd
    # entry, so both are guarded.
    try:
        user_name = pwd.getpwuid(os.getuid()).pw_name
        pwdObj = pwd.getpwnam(user_name)
    except KeyError as e:
        _print(parse_error(e))
        return None

    os.chown(cert_file, pwdObj.pw_uid, pwdObj.pw_gid)
    os.chmod(cert_file, 0o600)

    if ca_root:
        return _store_ca_root(clVars, ca_root)
    return 0
def client_post_auth(client):
    """Load the stored session ID into *client* and require a certificate.

    ``client.sid`` is set to the integer value of ``client.get_sid()``.
    When ``client.CERT_FILE`` does not exist a hint is printed and
    ``Exception(1)`` is raised; a ``VerifyError`` raised during the check is
    printed and converted to ``Exception(1)`` as well.
    """
    client.sid = int(client.get_sid())
    try:
        if not os.path.exists(client.CERT_FILE):
            # client_post_request(client)
            print(_(
                "You do not have a certificate. Use option --gen-cert-by HOST "
                "to generate a new request or --get-cert-from HOST to get a new "
                "certificate from the server."))
            raise Exception(1)
        # A certificate exists: nothing more to do here
        # (client_post_cert(client) is intentionally not called).
    except VerifyError as err:
        print(err.value)
        raise Exception(1)
########## Get password
def getRunProc():
    """List running processes as ``[command line, pid]`` pairs.

    The pid is the name of the numeric ``/proc`` entry (a string). The
    command line is an empty string when ``/proc/<pid>/cmdline`` is missing
    or cannot be read. An empty list is returned when ``/proc`` is not
    readable.
    """

    def getCmd(procNum):
        cmdLineFile = '/proc/%s/cmdline' % procNum
        # Best effort: the process can disappear at any moment. Only
        # ordinary errors are ignored, so Ctrl+C still interrupts the scan.
        try:
            if os.path.exists(cmdLineFile):
                return [readFile(cmdLineFile).strip(), procNum]
        except Exception:
            pass
        return ["", procNum]

    if not os.access('/proc', os.R_OK):
        return []
    return [getCmd(x) for x in listDirectory('/proc') if x.isdigit()]
def owner(pid):
    """Return the login name of the user owning process *pid*.

    The uid is taken from the first number on the ``Uid:`` line of
    ``/proc/<pid>/status`` (the real uid according to proc(5)).

    :param pid: process ID (int or str).
    :return: the user name, or ``None`` when the status file has no
        ``Uid:`` line.
    :raises OSError: when the status file cannot be opened (for example the
        process no longer exists).
    :raises KeyError: when the uid has no passwd entry.
    """
    UID = 1  # index of the first uid after the "Uid:" label
    # Context manager so the status file is closed on every path.
    with open('/proc/%s/status' % pid) as status:
        for ln in status:
            if ln.startswith('Uid:'):
                uid = int(ln.split()[UID])
                return pwd.getpwuid(uid).pw_name
    return None
def create_socket(file_path, username):
    """Make sure the ``cl-consoled`` daemon runs and return a bound socket.

    When no python ``cl-consoled`` process owned by *username* is found,
    *file_path* (the daemon's port/hash file) is removed and the daemon is
    started. The returned TCP socket is bound to the first free port
    starting at 5501.

    :param file_path: path of the daemon's port/hash file.
    :param username: login name whose daemon process is looked for.
    :return: a bound, not yet connected ``socket.socket``.
    """
    host = ''  # empty host: bind on all local addresses
    port = 5501  # first port to try

    def _owned_by_user(pid):
        # A process can exit between the /proc listing and this check;
        # such a process is simply not a match.
        try:
            return username == owner(pid)
        except (OSError, KeyError):
            return False

    find_proc = any(
        'cl-consoled' in cmdline and 'python' in cmdline
        and _owned_by_user(pid)
        for cmdline, pid in getRunProc())

    if not find_proc:
        try:
            os.unlink(file_path)
        except OSError as e:
            _print(parse_error(e))
        # With shell=True the command is given as a string; a list would
        # pass its extra items to the shell itself, not to the command.
        subprocess.Popen('cl-consoled', shell=True, stdin=subprocess.PIPE,
                         stdout=subprocess.PIPE, stderr=subprocess.PIPE)

    s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    while True:
        try:
            s.bind((host, port))  # associate the address with the socket
            break
        except socket.error:
            # Port is busy: try the next one.
            port += 1
    return s
def set_password(s, req, size):
    """Ask the user for a password and hand it to the password daemon.

    Sends ``"<req>,<password>"`` over the connected socket *s* and prints
    the daemon's answer when it starts with ``Error``.

    :param s: socket connected to the daemon.
    :param req: request string identifying the server/user.
    :param size: maximum number of bytes to read from the daemon.
    :return: the password entered by the user (str).
    """
    password = getpass.getpass(_('Password: '))
    msg = '%s,%s' % (req, password)
    # Sockets carry bytes in Python 3: encode on the way out, compare and
    # decode bytes on the way in.
    s.sendall(msg.encode('utf-8'))
    resp = s.recv(size)
    if resp.startswith(b'Error'):
        _print(resp.decode('utf-8', 'replace'))
    return password
def clear_password(server_host, server_port):
    """Ask the password daemon to forget the password for a server.

    Connects to the daemon whose port and hash are stored in
    ``~/.calculate/passwd_daemon`` and sends a ``delete`` request for
    *server_host*/*server_port* and the current user. Gives up silently
    after ten failed attempts (file missing or connection error).

    :param server_host: host name of the server the password belongs to.
    :param server_port: port of that server.
    """
    size = 1024  # maximum size of the daemon's answer
    pw_entry = pwd.getpwuid(os.getuid())
    username = pw_entry.pw_name
    home_path = pw_entry.pw_dir
    file_path = os.path.join(home_path, '.calculate', 'passwd_daemon')
    s = create_socket(file_path, username)
    try:
        connect_error = 0
        while connect_error < 16:
            try:
                while connect_error < 10:
                    if os.path.isfile(file_path):
                        serv_port, hash_val = readFile(file_path).split()
                        s.connect(('localhost', int(serv_port)))
                        req = 'delete,%s,%s,%s,%s' % (
                            server_host, str(server_port), username, hash_val)
                        # Sockets carry bytes in Python 3.
                        s.sendall(req.encode('utf-8'))
                        s.recv(size)
                        return
                    else:
                        connect_error += 1
                        time.sleep(0.3)
                break
            except socket.error:
                # Count failed connections too, otherwise a daemon that
                # never accepts would make this loop run forever.
                connect_error += 1
                time.sleep(0.3)
    finally:
        s.close()
def socket_connect(s, file_path):
    """Connect socket *s* to the password daemon described by *file_path*.

    The file is expected to hold two whitespace-separated values: the
    daemon's port and a hash. Up to ten attempts are made, 0.3 seconds
    apart, counting both a missing file and a failed connection.

    :param s: unconnected socket.
    :param file_path: path of the daemon's port/hash file.
    :return: ``(s, hash_val)`` once connected, or ``None`` when every
        attempt failed.
    """
    connect_error = 0
    while connect_error < 16:
        try:
            while connect_error < 10:
                if os.path.isfile(file_path):
                    serv_port, hash_val = readFile(file_path).split()
                    s.connect(('localhost', int(serv_port)))
                    return s, hash_val
                else:
                    connect_error += 1
                    time.sleep(0.3)
            break
        except socket.error:
            # Count failed connections too, otherwise a daemon that never
            # accepts would make this loop run forever.
            connect_error += 1
            time.sleep(0.3)
    return None
def get_password_from_daemon(server_host, server_port, wait_thread):
    """Get the password for a server from the password daemon.

    Sends ``"<host>,<port>,<user>,<hash>"`` to the daemon. An ``Error``
    answer mentioning ``timeout`` makes the whole exchange start over; any
    other ``Error`` answer stops *wait_thread* and asks the user for the
    password, which is passed on to the daemon.

    :param server_host: host name of the server the password is for.
    :param server_port: port of that server.
    :param wait_thread: object with a ``stop()`` method, called before the
        user is prompted.
    :return: the password as ``str``, or ``None`` when the daemon answers
        with an empty message.
    :raises OSError: when the daemon cannot be reached.
    """
    size = 1024  # maximum size of the daemon's answer
    pw_entry = pwd.getpwuid(os.getuid())
    username = pw_entry.pw_name
    home_path = pw_entry.pw_dir
    file_path = os.path.join(home_path, '.calculate', 'passwd_daemon')
    while True:
        s = create_socket(file_path, username)
        # The socket is closed on every path, including the retry below,
        # so repeated timeouts do not pile up open sockets.
        try:
            connected = socket_connect(s, file_path)
            if connected is None:
                # socket_connect gives up with None; report that clearly
                # instead of failing on tuple unpacking.
                raise socket.error("cannot connect to the password daemon")
            s, hash_val = connected
            req = '%s,%s,%s,%s' % (
                server_host, str(server_port), username, hash_val)
            # Sockets carry bytes in Python 3: encode on the way out and
            # work with bytes on the way in.
            s.sendall(req.encode('utf-8'))
            resp = s.recv(size)
            if resp.startswith(b'Error'):
                if b'timeout' in resp:
                    continue
                wait_thread.stop()
                sys.stdout.write('\r')
                sys.stdout.flush()
                password = set_password(s, req, size)
            else:
                password = resp.decode('utf-8') if resp else None
            return password
        finally:
            s.close()