You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
calculate-utils-3-console/console/application/cert_func.py

329 lines
11 KiB

#-*- coding: utf-8 -*-
# Copyright 2012 Calculate Ltd. http://www.calculate-linux.org
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
import sys
import urllib2
from sid_func import client_sid
from function import get_sid, get_ip_mac_type
import OpenSSL
import hashlib
from client_class import HTTPSClientCertTransport
from cert_verify import VerifyError
from calculate.core.datavars import DataVarsCore
from calculate.lib.cl_lang import setLocalTranslate
from calculate.lib.utils.common import getpass
setLocalTranslate('calculate_console',sys.modules[__name__])
VERSION = 0.11
def client_post_cert (client, show_info = False):
""" send a certificate server for check """
sid = get_sid(client.SID_FILE)
results = client.service.post_cert()
if results[0][0] == -4:
print _("Certificate not found in Server Database!")
print _('Add certificate to server Database...')
ip, mac, client_type = get_ip_mac_type()
print ip, mac, client_type
cert_id = client.service.cert_add(mac, client_type)
print _("Your certificate ID = %s") %cert_id
sys.exit(1)
client_sid(sid, client, cert_id = results[0][0])
if results[0][0] == -3:
print _("Certificate not send!")
else:
if show_info:
print _(" Your certifitate id = %d") %(results[0][0])
try:
if results[0][1] == -2:
print _("expiry date certificate has passed")
elif results[0][1] > 0:
if show_info:
print _("shelf life expires after %d days") \
%(results[0][1])
except:
pass
#Creation of secret key of the client
def new_key_req(key, cert_path, server_host_name, private_key_passwd = None, \
auto = False):
from create_cert import generateRSAKey, makePKey, makeRequest,\
passphrase_callback
rsa = generateRSAKey()
rsa.save_key(key+'_pub', cipher=None, callback = lambda *unused: None)
pkey = makePKey(rsa)
if not passphrase_callback(private_key_passwd):
pkey.save_key(key, cipher = None, callback = lambda *unused: None)
else:
pkey.save_key(key, callback= lambda *unused: str(private_key_passwd))
req = makeRequest(rsa, pkey, server_host_name, auto)
crtreq = req.as_pem()
req_file = cert_path + '/%s.csr' %server_host_name
crtfile = open(req_file, 'w')
crtfile.write(crtreq)
crtfile.close()
return req_file
def delete_old_cert(client):
os.unlink(client.CERT_FILE)
os.unlink(client.REQ_FILE)
os.unlink(client.PKEY_FILE)
os.unlink(client.PubKEY_FILE)
def get_password():
try:
pass1 = 'password'
pass2 = 'repeat'
while pass1 != pass2:
pass1 = getpass.getpass(_('Password: '))
pass2 = getpass.getpass(_('Repeat: '))
if pass1 != pass2:
print _('Passwords do not match')
except KeyboardInterrupt:
return None
passwd = pass1 if (pass1 and pass1 == pass2) else None
return passwd
def client_post_request (cert_path, args):
if os.path.exists(cert_path + 'req_id'):
print _("You have sent a request to sign the certificate.")
print _("request id = %s") %open(cert_path + 'req_id', 'r').read()
ans = raw_input (_("Send new request? y/[n]: "))
if not ans.lower() in ['y','yes']:
return 0
url = "https://%s:%d/?wsdl" %(args.by_host, args.port)
print '"%s\n'% url, _("connect...")
from client_class import Client_suds
try:
client = Client_suds(url, transport = HTTPSClientCertTransport \
(None, None, cert_path))
except (KeyboardInterrupt, urllib2.URLError), e:
print '\n'+_("Close. Connecting Error.")
print _("Error code: %s") %e
return 0
server_host_name = client.service.get_server_host_name()
key = cert_path + server_host_name + '.key'
csr_file = cert_path + server_host_name +'.csr'
if os.path.exists(key) and os.path.exists(csr_file):
print _('secret key and request exists')
ask = raw_input(_("Create new secret key and request? y/[n]: "))
if ask.lower() in ['y','yes']:
passwd = get_password()
new_key_req(key, cert_path, server_host_name,
private_key_passwd = passwd)
else:
passwd = get_password()
new_key_req(key, cert_path, server_host_name,
private_key_passwd = passwd)
ip, mac, client_type = get_ip_mac_type()
data = open(csr_file).read()
res = client.service.post_client_request(request = data, ip = ip,\
mac = mac, client_type = client_type)
if int(res) < 0:
print _("This server can not sign certificate!")
return 1
fc = open(cert_path + 'req_id', 'w')
fc.write(res)
fc.close()
print _("Your request id = %s") %res
return 0
def client_get_cert(cert_path, args):
if not os.path.exists(cert_path + 'req_id'):
print _("request was not sent or deleted file %s") \
%(cert_path + 'req_id')
return 1
fc = open(cert_path + 'req_id', 'r')
req_id = fc.read()
fc.close()
url = "https://%s:%d/?wsdl" %(args.from_host, args.port)
print '%s\n' %url, _("connect...")
from client_class import Client_suds
try:
client = Client_suds(url, \
transport = HTTPSClientCertTransport(None, None, cert_path))
except KeyboardInterrupt:
print _("Close. Connecting Error.")
server_host_name = client.service.get_server_host_name()
if not os.path.exists(cert_path + server_host_name + '.csr'):
print _('Request %s not found on client side') \
%(cert_path + server_host_name + '.csr')
return 1
request = open(cert_path + server_host_name + '.csr').read()
md5 = hashlib.md5()
md5.update(request)
md5sum = md5.hexdigest()
result = client.service.get_client_cert(req_id, md5sum)
cert = result[0][0]
ca_root = result[0][1]
if cert == '1':
print _('Request to sign is rejected!')
return 1
elif cert == '2':
print _("Request for the signing has not yet reviewed.")
print _("Your request id = %s") %req_id
return 1
elif cert == '3':
print _("Request on signature does not match sent earlier.")
return 1
elif cert == '4':
print _("Request was sent from another ip.")
return 1
fc = open(cert_path + server_host_name + '.crt', 'w')
fc.write(cert)
fc.close()
os.unlink(cert_path + 'req_id')
print 'OK. Certificate save. Your certificate id = %s' %req_id
if ca_root:
clVars = DataVarsCore()
clVars.importCore()
clVars.flIniFile()
system_ca_db = clVars.Get('cl_glob_root_cert')
if os.path.exists(system_ca_db):
if ca_root in open(system_ca_db, 'r').read():
return 0
cl_client_cert_dir = clVars.Get('cl_client_cert_dir')
homePath = clVars.Get('ur_home_path')
cl_client_cert_dir = cl_client_cert_dir.replace("~",homePath)
root_cert_md5 = cl_client_cert_dir + "/ca/cert_list"
md5 = hashlib.md5()
md5.update(ca_root)
md5sum = md5.hexdigest()
print "\n================================================="
print "md5sum = ", md5sum
if not os.path.exists(root_cert_md5):
fc = open(root_cert_md5,"w")
fc.close()
filename = None
with open(root_cert_md5) as fd:
t = fd.read()
# for each line
for line in t.splitlines():
# Split string into a words list
words = line.split(' ',1)
if words[0] == md5sum:
filename = words[1]
if not filename:
certobj = OpenSSL.crypto.load_certificate \
(OpenSSL.SSL.FILETYPE_PEM, ca_root)
Issuer = certobj.get_issuer().get_components()
for item in Issuer:
if item[0] == 'CN':
filename = item[1]
fc = open(root_cert_md5,"a")
fc.write('%s %s\n' %(md5sum, filename))
fc.close()
if not filename:
print _('Not found field "CN" in certificate!')
return 1
fd = open(cl_client_cert_dir + '/ca/' + filename, 'w')
fd.write(ca_root)
fd.close()
user_root_cert = clVars.Get('cl_user_root_cert')
user_root_cert = user_root_cert.replace("~",homePath)
fa = open(user_root_cert, 'a')
fa.write(ca_root)
fa.close()
print _("filename = "), filename
print _("CERTIFICATE ADD")
else:
print _("file with ca certificates exists")
return 0
def client_post_auth(client):
""" authorization client or post request """
sid = get_sid(client.SID_FILE)
client.sid = int(sid)
try:
if os.path.exists(client.CERT_FILE):
pass#client_post_cert(client)
else:
#client_post_request(client)
print _("You do not have a certificate. Use key --gen-cert-by "
"HOST for generate new request or key --get-cert-from "
"HOST for get new certificate from server.")
sys.exit()
# print client.service.versions(sid, VERSION)
except VerifyError, e:
print e.value
sys.exit()
## show rights on requested certificated
#def cert_right_inf(client, sid, cert_id):
# s = client.service.view_cert_right(cert_id, 'console')
# if s[0][0] == "-1":
# print _("Certificate not found in server!")
# return -1
# if s[0][0] == "-2":
# print _("Error input certificate ID!")
# return -2
# if s[0][0] == "Permission denied":
# print _("Permission denied %s") % s[0][1]
# return -3
#
# print _("Certificate with ID %d can execute:") %cert_id
# for meth in s[0]:
# print " - %s" %meth
#
## Viewing rights of any certificate on server by its id
#def client_view_cert_right(client):
# cert_id = raw_input (_("Certificate ID: "))
# try:
# cert_id = int (cert_id)
# except:
# print _("Error certificate id")
# return 1
# try:
# sid = get_sid(client.SID_FILE)
# if cert_id > 0:
# cert_right_inf(client, sid, cert_id)
# else:
# print _("Enter correctly cert id!")
# except Exception, e:
# if e[0][0] == 403:
# print _('Permission denied')
# else:
# print e
# #print _("Error get data")
# return 1
# return 0