You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
calculate-utils-3-core/core/server/cert_cmd.py

1113 lines
38 KiB

#-*- coding: utf-8 -*-
# Copyright 2012 Calculate Ltd. http://www.calculate-linux.org
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
############ Working with the Command Line and Certificates ####################
################################################################################
import os, glob, socket, time
import fcntl, struct, subprocess
import M2Crypto, OpenSSL, hashlib
from func import new_key_req, uniq
from calculate.lib.utils import ip as ip
def getIpLocal():
    """Return the IP address of the first interface that reports one.

    Interfaces are taken from ``ip.getInterfaces()`` and probed in order
    with ``ip.getIp()``; an interface whose lookup raises is skipped.

    Returns an empty string when no interface yields an address.
    """
    for interface in ip.getInterfaces():
        try:
            return ip.getIp(interface)
        except Exception:
            # Best effort: try the next interface, but do not swallow
            # KeyboardInterrupt/SystemExit as a bare ``except`` would.
            continue
    return ""
def getHwAddr():
    """Return the MAC address of the first interface that reports one.

    Interfaces are taken from ``ip.getInterfaces()`` and probed in order
    with ``ip.getMac()``; an interface whose lookup raises is skipped.

    Returns an empty string when no interface yields an address.
    """
    for interface in ip.getInterfaces():
        try:
            return ip.getMac(interface)
        except Exception:
            # Best effort: try the next interface, but do not swallow
            # KeyboardInterrupt/SystemExit as a bare ``except`` would.
            continue
    return ""
# method for generating server certificates
def _ask_port():
    """Prompt for a port number; return it as int, or None if not numeric."""
    answer = raw_input(_("Enter port: "))
    try:
        return int(answer)
    except ValueError:
        print(_('Port must be int'))
        return None


def _open_wsdl_client(host, port):
    """Open a suds client for ``https://host:port/?wsdl``.

    Returns None when the user interrupts the connection attempt.
    """
    url = "https://%s:%d/?wsdl" % (host, port)
    print(_("%s\nconnect...") % url)
    from suds.client import Client
    from client_class import HTTPSClientsCertTransport
    try:
        return Client(url,
                      transport=HTTPSClientsCertTransport(None, None, None))
    except KeyboardInterrupt:
        print(_("\nClose. Connecting Error."))
        return None


def _no_blanks(value):
    """Replace spaces and tabs with underscores."""
    return value.replace(' ', '_').replace('\t', '_')


def _generate_root_cert(cert_path, port, auto):
    """Create root.key/root.crt with openssl and trust the new certificate."""
    import locale
    if auto:
        answer = 'n'
    else:
        answer = raw_input(_("Enter certificate data by hand? [y]/n: "))
    host_name = socket.getfqdn()
    # The first two letters of the locale name are used as the country
    # field; fall back to 'en' when no default locale is configured.
    lang = (locale.getdefaultlocale()[0] or 'en')[:2]
    location = host_name + ':' + str(port)
    if answer.lower() in ['n', 'no']:
        common_name = host_name
        org_unit = 'www.calculate-linux.ru'
        org_name = 'calculate-linux'
        locality = location
        state = 'Spb'
        country = lang
    else:
        print(_('Do not use space characters and tabs'))
        common_name = raw_input(_('Host Name [%s] : ') % host_name) \
            or host_name
        org_unit = _no_blanks(raw_input(_('Organization Unit: ')))
        org_name = _no_blanks(raw_input(_('Organization Name: ')))
        network = _('Full network address (host:port)')
        locality = raw_input(network + ' [%s:%d]: ' % (host_name, port)) \
            or location
        state = _no_blanks(raw_input(_('State Name: ')))
        country = raw_input(
            _('Country (only TWO letters!) [%s]: ') % lang) or lang

    root_key = cert_path + '/root.key'
    root_crt = cert_path + '/root.crt'
    subject = "/C=%s/ST=%s/L=%s/O=%s/OU=%s/CN=%s" % (
        country, state, locality, org_name, org_unit, common_name)
    # An argument list (no shell) keeps user-entered values from being
    # interpreted by the shell.
    cmd = ["openssl", "req", "-new", "-newkey", "rsa:2048", "-nodes",
           "-keyout", root_key, "-x509", "-days", "11000",
           "-subj", subject, "-out", root_crt]
    print(' '.join(cmd))
    try:
        proc = subprocess.Popen(cmd, stdin=subprocess.PIPE,
                                stdout=subprocess.PIPE,
                                stderr=subprocess.STDOUT, close_fds=True)
        # communicate() drains the pipe; wait() alone can block forever
        # once the pipe buffer fills up.
        proc.communicate()
        failed = proc.returncode != 0
    except OSError:
        failed = True

    if not failed:
        # add certificate in trusted
        try:
            with open(root_crt, 'r') as source:
                pem = source.read()
            with open(cert_path + '/ca_root.crt', 'a') as trusted:
                trusted.write(pem)
        except (IOError, OSError):
            failed = True
    if failed:
        print(_('error write (read) file from directory %s') % cert_path)
        return 1
    print(_("OK"))
    return 0


def _use_root_as_server(cert_path):
    """Copy root.crt/root.key to server.crt/server.key."""
    if not os.path.exists(cert_path + '/root.crt'):
        print(_('root certificate not found (use cl-core with key '
                '--gen-root-cert)'))
        return 1
    for source_name, target_name in (('root.crt', 'server.crt'),
                                     ('root.key', 'server.key')):
        with open(cert_path + '/' + source_name, 'rb') as source:
            content = source.read()
        with open(cert_path + '/' + target_name, 'wb') as target:
            target.write(content)
    print(_("OK"))
    return 0


def _send_signing_request(host, key, cert_path):
    """Send the server signing request to ``host`` and store the id."""
    port = _ask_port()
    if port is None:
        return 1
    client = _open_wsdl_client(host, port)
    if client is None:
        return 1
    serv_host_name = client.service.get_server_host_name()
    csr_file = cert_path + '/server.csr'
    if os.path.exists(key) and os.path.exists(csr_file):
        print(_('secret key and request exists'))
        ask = raw_input(_("Create new secret key and request?") + " y/[n]: ")
        if ask.lower() in ['y', 'yes']:
            new_key_req(key, cert_path, serv_host_name, port)
    else:
        new_key_req(key, cert_path, serv_host_name, port)
    local_ip = getIpLocal()
    mac = getHwAddr()
    with open(csr_file) as csr:
        data = csr.read()
    res = client.service.post_server_request(request=data, ip=local_ip,
                                             mac=mac)
    if int(res) < 0:
        print(_("This server can not sign certificate!"))
        return 1
    with open(cert_path + '/req_id', 'w') as id_file:
        id_file.write(str(res))
    print(_("Your request id = %s") % res)
    return 0


def _fetch_signed_cert(root_host, cert_path):
    """Download the signed server certificate from ``root_host``."""
    req_id_file = cert_path + '/req_id'
    if not os.path.exists(req_id_file):
        print(_("request was not sent or deleted file %s") % req_id_file)
        return 1
    with open(req_id_file, 'r') as id_file:
        req_id = id_file.read()
    print("%s %s" % (_("\nURL has form"),
                     "https://%s:[port]/?wsdl" % root_host))
    port = _ask_port()
    if port is None:
        return 1
    client = _open_wsdl_client(root_host, port)
    if client is None:
        return 1
    with open(cert_path + '/server.csr') as csr:
        request = csr.read()
    md5sum = hashlib.md5(request).hexdigest()
    result = client.service.get_server_cert(req_id, md5sum)
    print("result =  %s" % (result,))
    signed_cert = result[0][0]
    ca_root = result[0][1]
    if signed_cert == '1':
        print(_('Request to sign is rejected!'))
        return 1
    elif signed_cert == '2':
        print(_("Request for the signing has not yet reviewed."))
        print(_("Your request id = %s") % req_id)
        return 1
    elif signed_cert == '3':
        print(_("Request on signature does not match sent earlier."))
        return 1
    elif signed_cert == '4':
        print(_("Request was sent from another ip."))
        return 1
    with open(cert_path + '/server.crt', 'w') as cert_file:
        cert_file.write(signed_cert)
    os.unlink(req_id_file)
    print('OK. Certificate save. Your certificate id = %s' % req_id)

    # Put the received CA chain first and keep what was trusted before.
    # The old content must be read BEFORE the file is opened for writing,
    # otherwise it is truncated and lost.
    ca_root_file = cert_path + '/ca_root.crt'
    previous = ''
    if os.path.exists(ca_root_file):
        with open(ca_root_file, 'r') as trusted:
            previous = trusted.read()
    with open(ca_root_file, 'w') as trusted:
        if ca_root and ca_root not in previous:
            trusted.write(ca_root)
        trusted.write(previous)
    return 0


def check_server_certificate(cert, key, cert_path, args, port, auto=False):
    """Create, copy, request or fetch the server certificate.

    Exactly one action is performed, chosen by the first option set in
    ``args``: ``gen_root_cert`` (generate a self-signed root certificate),
    ``use_root_cert`` (use the root certificate as server certificate),
    ``host`` (send a signing request to another server) or ``root_host``
    (fetch the signed certificate from another server).

    cert      -- not used by this function
    key       -- path of the server private key
    cert_path -- directory with certificates; created when missing
    args      -- parsed command line options
    port      -- port offered as default in the certificate locality
    auto      -- when true, root certificate data is not asked from user

    Returns 0 on success, 1 on failure and None when no action was
    requested.
    """
    if not os.path.isdir(cert_path):
        os.makedirs(cert_path)
    if args.gen_root_cert:
        return _generate_root_cert(cert_path, port, auto)
    if args.use_root_cert:
        return _use_root_as_server(cert_path)
    if args.host:
        return _send_signing_request(args.host, key, cert_path)
    if args.root_host:
        return _fetch_signed_cert(args.root_host, cert_path)
    return None
def create_path(data_path, certbase, rights, group_rights):
    """Create the data directories and default files used by the server.

    Makes sure that the certificate database ``certbase`` exists (together
    with ``<data_path>/client_certs``), that ``<data_path>/conf`` exists
    and that the ``rights`` and ``group_rights`` files exist; the two
    latter files are created with a commented example.
    """
    certs_dir = data_path + '/client_certs'
    conf_dir = data_path + '/conf'

    if not os.path.exists(certbase):
        if not os.path.exists(certs_dir):
            try:
                os.makedirs(certs_dir)
            except OSError:
                print(_("cannot create directory %s") % certs_dir)
        # an empty database file
        open(certbase, 'w').close()

    if not os.path.exists(conf_dir):
        try:
            os.makedirs(conf_dir)
        except OSError:
            print(_("cannot create directory %s") % conf_dir)

    if not os.path.isfile(rights):
        example = ''.join([
            '# example of content:\n',
            '# certificate number 2 has right to run method ',
            '"pid_info", and the certificate\n',
            '# number 1 does not have rights. Use key --right-add ',
            'and --right-del. See man.\n',
            '#pid_info 2 -1\n',
        ])
        with open(rights, 'w') as rights_file:
            rights_file.write(example)

    if not os.path.isfile(group_rights):
        example = ''.join([
            '# example of content:\n',
            '#manager pid_info,list_pid,cl_template,cl_login\n',
        ])
        with open(group_rights, 'w') as groups_file:
            groups_file.write(example)
# find a id by certificate
def find_cert_id(certificate, data_path, certbase):
    """Find the id under which ``certificate`` is stored.

    The database ``certbase`` (created empty when missing) holds lines
    whose first word is a certificate id and whose second word is the md5
    sum of the certificate.  Every id with a matching sum is confirmed by
    comparing ``certificate`` with ``<data_path>/client_certs/<id>.crt``.

    Returns the id (as the string found in the database) or 0 when the
    certificate is unknown.
    """
    certs_dir = data_path + '/client_certs'
    if not os.path.exists(certbase):
        if not os.path.exists(certs_dir):
            try:
                os.makedirs(certs_dir)
            except OSError:
                print(_("cannot create directory %s") % certs_dir)
        open(certbase, 'w').close()

    md5sum = hashlib.md5(certificate).hexdigest()

    with open(certbase) as fd:
        lines = fd.read().splitlines()
    candidates = []
    for line in lines:
        words = line.split()
        # blank or truncated lines carry no checksum and are skipped
        if len(words) > 1 and words[1] == md5sum:
            candidates.append(words[0])

    for candidate in candidates:
        cert_file = certs_dir + '/' + candidate + '.crt'
        if not os.path.isfile(cert_file):
            continue
        # binary mode: compare the stored bytes unchanged
        with open(cert_file, 'rb') as fp:
            if certificate == fp.read():
                return candidate
    return 0
# find a certificate by id
def find_id_cert(cert_id, data_path):
    """Return the text of ``<data_path>/client_certs/<cert_id>.crt``.

    Returns 0 when the certificate file does not exist.
    """
    cert_file = '%s/client_certs/%s.crt' % (data_path, cert_id)
    if not os.path.exists(cert_file):
        return 0
    with open(cert_file, 'r') as handle:
        return handle.read()
# delete selected clients certificate
def del_cert(certbase, data_path, cert_id):
    """Delete a client certificate from the database and from disk.

    Every line of ``certbase`` whose first word equals ``cert_id`` is
    dropped, then ``<data_path>/client_certs/<cert_id>.crt`` is removed.
    A message is printed when the certificate file cannot be removed.
    """
    cert_id = str(cert_id)
    with open(certbase) as fd:
        lines = fd.read().splitlines()
    # ``split()[:1]`` is empty for blank lines, so they are kept untouched
    kept = [line for line in lines if line.split()[:1] != [cert_id]]
    with open(certbase, 'w') as fd:
        fd.write(''.join(line + '\n' for line in kept))
    try:
        # certificates are stored as <id>.crt
        os.unlink(data_path + '/client_certs/' + cert_id + '.crt')
    except OSError:
        print(_("delete file certificate error!"))
def add_right(cert_id, method, rights):
    """Grant certificate ``cert_id`` the right to run ``method``.

    In the ``rights`` file every line starts with a method name followed by
    certificate ids; an id prefixed with '-' denies the right.  On the
    line of ``method`` the denying entry of ``cert_id`` is removed and the
    id is appended when it is not listed yet.  Other lines are written
    back unchanged; nothing is added when the file has no line for
    ``method``.
    """
    cert_id = str(cert_id)
    denied = '-' + cert_id
    with open(rights) as fd:
        lines = fd.read().splitlines()

    updated = []
    for line in lines:
        words = line.split()
        if words and words[0] == method:
            # compare whole words: removing '-3' must not damage '-31'
            ids = [word for word in words[1:] if word != denied]
            changed = len(ids) != len(words) - 1
            if cert_id not in ids:
                ids.append(cert_id)
                changed = True
                print("id %s - add %s" % (cert_id, method))
            if changed:
                line = ' '.join([method] + ids)
        updated.append(line)

    with open(rights, 'w') as fd:
        fd.write(''.join(line + '\n' for line in updated))
def del_right(cert_id, method, rights):
    """Deny certificate ``cert_id`` the right to run ``method``.

    In the ``rights`` file every line starts with a method name followed by
    certificate ids; an id prefixed with '-' denies the right.  On the
    line of ``method`` the granting entry of ``cert_id`` is removed and
    '-<cert_id>' is appended when it is not listed yet.  Other lines are
    written back unchanged; nothing is added when the file has no line for
    ``method``.
    """
    cert_id = str(cert_id)
    denied = '-' + cert_id
    with open(rights) as fd:
        lines = fd.read().splitlines()

    updated = []
    for line in lines:
        words = line.split()
        if words and words[0] == method:
            # compare whole words: removing '3' must not damage '31'
            ids = [word for word in words[1:] if word != cert_id]
            changed = len(ids) != len(words) - 1
            if denied not in ids:
                ids.append(denied)
                changed = True
                print("id %s - del %s" % (cert_id, method))
            if changed:
                line = ' '.join([method] + ids)
        updated.append(line)

    with open(rights, 'w') as fd:
        fd.write(''.join(line + '\n' for line in updated))
# add or delete rights a selected certificate
def change_rights_cert(cert_id, right_add, right_del,
                       rights, group_rights, certbase):
    """Add and/or delete rights of the selected certificates.

    cert_id      -- a certificate number, a comma separated list of
                    numbers, or 'all' for every id listed in ``certbase``
    right_add    -- comma separated method names to allow (or empty)
    right_del    -- comma separated method names to deny (or empty)
    rights       -- path of the file with certificate rights
    group_rights -- not used by this function
    certbase     -- path of the certificate database

    Returns 1 when an id is not an integer or the rights file is missing,
    otherwise None.
    """
    if cert_id == 'all':
        with open(certbase) as fd:
            lines = fd.read().splitlines()
        # the first word of each non-blank line is a certificate id
        list_id = [line.split()[0] for line in lines if line.split()]
    else:
        list_id = [item.strip() for item in str(cert_id).split(',')]
        try:
            for item in list_id:
                int(item)
        except ValueError:
            print(_("to change rights certificate number must be integer"))
            return 1

    methods_add = right_add.split(',') if right_add else []
    methods_del = right_del.split(',') if right_del else []
    for current_id in list_id:
        current_id = str(current_id)
        if methods_add:
            if not os.path.exists(rights):
                print(_('file %s not found!') % rights)
                return 1
            for meth in methods_add:
                add_right(current_id, meth, rights)
        if methods_del:
            if not os.path.exists(rights):
                print(_('file %s not found!') % rights)
                return 1
            for meth in methods_del:
                del_right(current_id, meth, rights)
# Detailed view clients certificates
def view_cert_info(cert, cert_id, rights, group_rights):
    """Print details and available methods of a client certificate.

    cert         -- certificate text in PEM format
    cert_id      -- certificate number (int or numeric string)
    rights       -- path of the file with per-certificate rights
    group_rights -- path of the file with group rights; created empty
                    when missing

    The groups of the certificate are read from its last extension, whose
    data is expected to look like ``...group:<name>[,<name>...]``.  The
    printed methods are those of the groups plus those granted to the
    certificate in ``rights``, minus those denied there with a negative
    certificate number.
    """
    certobj = OpenSSL.crypto.load_certificate(OpenSSL.SSL.FILETYPE_PEM, cert)
    last_extension = certobj.get_extension(certobj.get_extension_count() - 1)
    print(last_extension)
    print("%s %s" % (_("Fingerprint = "), certobj.digest('SHA1')))
    print("%s %s" % (_("Serial Number = "), certobj.get_serial_number()))
    print(_("\nIssuer"))
    for item in certobj.get_issuer().get_components():
        print("%s : %s" % (item[0], item[1]))
    print(_("\nSubject"))
    for item in certobj.get_subject().get_components():
        print("%s : %s" % (item[0], item[1]))
    print(_("\nRights: "))

    # text after the first ':' is the comma separated list of groups
    groups_list = last_extension.get_data().partition(':')[2].split(',')

    results = []
    if not os.path.exists(group_rights):
        open(group_rights, 'w').close()
    with open(group_rights) as fd:
        group_lines = fd.read().splitlines()
    for line in group_lines:
        words = line.split(' ', 1)
        # a usable line is "<group> <method>[,<method>...]"
        if len(words) == 2 and words[0] in groups_list:
            results.extend(meth.strip() for meth in words[1].split(','))
    results = uniq(results)

    # ids in the rights file are compared as numbers, so accept the id
    # both as int and as numeric string
    try:
        wanted = int(cert_id)
    except (TypeError, ValueError):
        wanted = None
    add_list_rights = []
    del_list_rights = []
    with open(rights) as fr:
        rights_lines = fr.read().splitlines()
    for line in rights_lines:
        words = line.split()
        if not words:
            continue
        meth = words[0]
        for word in words:
            try:
                number = int(word)
            except ValueError:
                continue
            if wanted == number:
                add_list_rights.append(meth)
            if wanted == -number:
                del_list_rights.append(meth)

    # build a new list instead of removing items from the list being
    # iterated, which would skip elements
    results = [meth for meth in uniq(results + add_list_rights)
               if meth not in del_list_rights]
    if not results:
        print(_("no methods available"))
    else:
        for meth in results:
            print(meth)
# View, change rights, delete clients certificates on server
def view_cert(args, certbase, data_path, rights, group_rights):
    """View, change rights of, or delete client certificates.

    Options read from ``args``: ``Id`` (certificate number or 'all'),
    ``dump``, ``remove``, ``right_add`` and ``right_del``.

    When a right is to be added or deleted only that change is made.
    With ``Id == 'all'`` every certificate listed in ``certbase`` that has
    a certificate file is listed (with details when ``dump`` is set) and,
    with ``remove``, the database is emptied after confirmation.  For a
    single id the certificate is printed raw (``dump``) or in detail and,
    with ``remove``, deleted after confirmation.

    Returns 0 on success and 1 on a wrong or unknown id.
    """
    cert_id = args.Id
    dump = args.dump
    remove = args.remove
    right_add = args.right_add
    right_del = args.right_del

    if right_add or right_del:
        change_rights_cert(cert_id, right_add, right_del,
                           rights, group_rights, certbase)
        return 0

    if not os.path.exists(certbase):
        open(certbase, "w").close()

    if cert_id == 'all':
        count = 0
        with open(certbase) as fd:
            lines = fd.read().splitlines()
        for line in lines:
            words = line.split()
            if not words:
                continue
            cert = find_id_cert(words[0], data_path)
            if cert == 0:
                # listed in the database but the file is gone
                continue
            count += 1
            print(_("Certificate id = %s") % words[0])
            if dump:
                # rights are matched by number, so pass the id as int
                try:
                    shown_id = int(words[0])
                except ValueError:
                    shown_id = words[0]
                view_cert_info(cert, shown_id, rights, group_rights)
                print("#############################################\n")
        print(_("Total %d certificates.") % count)
        if remove:
            answer = raw_input(
                _("Are you sure? Delete all client certificates?") +
                ' y/[n]: ')
            if answer.lower() in ['y', 'yes']:
                open(certbase, "w").close()
        return 0

    try:
        cert_id = int(cert_id)
    except (TypeError, ValueError):
        print(_("certificate number not int and not 'all'"))
        return 1
    cert = find_id_cert(cert_id, data_path)
    if not cert:
        print(_("Certificate not found"))
        return 1
    if dump:
        print(cert)
    else:
        view_cert_info(cert, cert_id, rights, group_rights)
    if remove:
        answer = raw_input(
            "Delete client certificate with id = %d? y/[n]: " % cert_id)
        if answer.lower() in ['y', 'yes']:
            del_cert(certbase, data_path, cert_id)
            print(_("Deleted"))
    return 0
# Sign client request by server certificate
def sing_req_by_server(id_client_req, cert_path, data_path, auto=False):
    """Sign a client request with the root certificate of this server.

    id_client_req -- number of the request ``client_certs/<id>.csr``
    cert_path     -- directory holding ``root.crt`` and ``root.key``
    data_path     -- data directory holding ``client_certs``
    auto          -- when true the certificate gets the group 'all'
                     instead of asking the user

    The group is stored in the ``nsComment`` extension of the certificate.

    Returns 0 when the certificate was created, otherwise 1.
    """
    server_cert = cert_path + '/root.crt'
    server_key = cert_path + '/root.key'
    if id_client_req:
        try:
            int(id_client_req)
        except (TypeError, ValueError):
            print(_("Certificate number (id) must be int"))
            return 1
    cl_req = data_path + '/client_certs/%s.csr' % id_client_req
    cl_cert = data_path + '/client_certs/%s.crt' % id_client_req
    if not os.path.exists(cl_req):
        print(_("Signing Request %s not found") % cl_req)
        return 1
    if os.path.exists(cl_cert):
        print(_("certificate %s already exists") % cl_cert)
        return 1
    if auto:
        group = "group:all"
    else:
        group = "group:%s" % raw_input(_("Enter Group new certificate: "))

    config = data_path + '/client_certs/ssl-client.cfg'
    if os.path.exists(config):
        os.unlink(config)
    cfg_text = ("[ ssl_client ]\n"
                "basicConstraints = CA:FALSE\n"
                "nsCertType = client\n"
                "keyUsage = digitalSignature, keyEncipherment\n"
                "extendedKeyUsage = clientAuth\n"
                "nsComment = %s") % group
    with open(config, 'w') as fc:
        fc.write(cfg_text)

    # An argument list (no shell) keeps paths with special characters
    # from being interpreted by the shell.
    cmd = ["openssl", "x509", "-req", "-days", "11000",
           "-CA", server_cert, "-CAkey", server_key, "-CAcreateserial",
           "-extfile", config, "-extensions", "ssl_client",
           "-in", cl_req, "-out", cl_cert]
    print(' '.join(cmd))
    output = ''
    try:
        proc = subprocess.Popen(cmd, stdin=subprocess.PIPE,
                                stdout=subprocess.PIPE,
                                stderr=subprocess.STDOUT, close_fds=True)
        # communicate() drains the pipe; wait() alone can block forever
        # once the pipe buffer fills up.
        output = proc.communicate()[0]
        signed = proc.returncode == 0 and os.path.exists(cl_cert)
    except OSError:
        signed = False
    if not signed:
        if output:
            print(output)
        print(_("certificate %s was not signed") % cl_cert)
        return 1
    print(_("certificate %s is signed") % cl_cert)
    return 0
# Sign server request by root certificate
def sing_req_by_root(args, cert_path, data_path):
    """Sign a server request with the root certificate.

    args      -- parsed options; ``args.id_server_req`` is the number of
                 the request ``server_certs/<id>.csr``
    cert_path -- directory holding ``root.crt`` and ``root.key``
    data_path -- data directory holding ``server_certs``

    The certificate is signed without an extensions file.

    Returns 0 when the certificate was created, otherwise 1.
    """
    root_cert = cert_path + '/root.crt'
    root_key = cert_path + '/root.key'
    if not os.path.exists(root_cert) or not os.path.exists(root_key):
        print(_("Root certificate or private key not found"))
        print(_("look at %s") % cert_path)
        return 1
    if args.id_server_req:
        try:
            int(args.id_server_req)
        except (TypeError, ValueError):
            print(_("Certificate number (id) must be int"))
            return 1
    sign_req = data_path + '/server_certs/%s.csr' % args.id_server_req
    sign_cert = data_path + '/server_certs/%s.crt' % args.id_server_req
    if not os.path.exists(sign_req):
        print(_("Signing Request %s not found") % sign_req)
        return 1
    if os.path.exists(sign_cert):
        print(_("certificate %s already exists") % sign_cert)
        return 1

    # An argument list (no shell) keeps paths with special characters
    # from being interpreted by the shell.
    cmd = ["openssl", "x509", "-req", "-days", "11000",
           "-CA", root_cert, "-CAkey", root_key, "-CAcreateserial",
           "-in", sign_req, "-out", sign_cert]
    print(' '.join(cmd))
    output = ''
    try:
        proc = subprocess.Popen(cmd, stdin=subprocess.PIPE,
                                stdout=subprocess.PIPE,
                                stderr=subprocess.STDOUT, close_fds=True)
        # communicate() drains the pipe; wait() alone can block forever
        # once the pipe buffer fills up.
        output = proc.communicate()[0]
        signed = proc.returncode == 0 and os.path.exists(sign_cert)
    except OSError:
        signed = False
    if not signed:
        if output:
            print(output)
        print(_("certificate %s was not signed") % sign_cert)
        return 1
    print(_("certificate %s is signed") % sign_cert)
    return 0
# Detailed view server signed certificates
def _print_request_origin(serv_certbase, cert_id):
    """Print ip, mac and date stored in the database for ``cert_id``."""
    if not os.path.exists(serv_certbase):
        open(serv_certbase, "w").close()
    with open(serv_certbase) as fd:
        lines = fd.read().splitlines()
    wanted = str(cert_id)
    for line in lines:
        words = line.split()
        if not words or words[0] != wanted:
            continue
        # pad short lines so that missing fields are printed empty
        words += [''] * (6 - len(words))
        print(_('\nrequest sent from:'))
        print('ip - %s' % words[4])
        print('mac - %s' % words[5])
        print(_('date') + ' - %s %s' % (words[2], words[3]))
        break


def view_signed_cert_info(cert_id, serv_certbase, data_path, mid_path):
    """Print details of a certificate and of its signing request.

    cert_id       -- number of the certificate/request (int)
    serv_certbase -- database with the origin of every request; created
                     empty when it is missing and needed
    data_path     -- data directory
    mid_path      -- subdirectory of ``data_path`` with the files
                     ``<id>.crt`` and ``<id>.csr``; for 'client_certs'
                     the last certificate extension is printed too

    Always returns 0.
    """
    cert_file = data_path + '/%s/%d.crt' % (mid_path, cert_id)
    print(cert_file)
    if os.path.exists(cert_file):
        with open(cert_file, 'r') as fp:
            cert = fp.read()
        certobj = OpenSSL.crypto.load_certificate(
            OpenSSL.SSL.FILETYPE_PEM, cert)
        if mid_path == 'client_certs':
            print(certobj.get_extension(certobj.get_extension_count() - 1))
        print("%s %s" % (_("Fingerprint = "), certobj.digest('SHA1')))
        print("%s %s" % (_("Serial Number = "), certobj.get_serial_number()))
        print(_("\nIssuer"))
        for item in certobj.get_issuer().get_components():
            print("%s : %s" % (item[0], item[1]))
        print(_("\nSubject"))
        for item in certobj.get_subject().get_components():
            print("%s : %s" % (item[0], item[1]))
        _print_request_origin(serv_certbase, cert_id)
    else:
        print(_("Certificate not found!"))

    print("\n###################################################\n")

    # Detailed view server request
    req_file = data_path + '/%s/%d.csr' % (mid_path, cert_id)
    print(req_file)
    if os.path.exists(req_file):
        with open(req_file, 'r') as fp:
            request = fp.read()
        reqobj = OpenSSL.crypto.load_certificate_request(
            OpenSSL.SSL.FILETYPE_PEM, request)
        print(_("\nSubject"))
        for item in reqobj.get_subject().get_components():
            print(" %s : %s" % (item[0], item[1]))
        _print_request_origin(serv_certbase, cert_id)
    else:
        print(_("Request not found!"))
    return 0
# view servers signed certificates
def view_signed_cert(args, serv_certbase, data_path):
    """View server certificates and requests signed by this server.

    ``args.cert_id`` is a number or 'all'.  With 'all' every certificate
    and request file of the ids listed in ``serv_certbase`` (created empty
    when missing) is listed; otherwise the details of one id are printed.

    Returns 0 on success and 1 when the id is neither a number nor 'all'.
    """
    cert_id = args.cert_id
    if cert_id == 'all':
        if not os.path.exists(serv_certbase):
            open(serv_certbase, "w").close()
        with open(serv_certbase) as fd:
            lines = fd.read().splitlines()
        count = 0
        for line in lines:
            words = line.split()
            if not words:
                continue
            cert = '%s/server_certs/%s.crt' % (data_path, words[0])
            req = '%s/server_certs/%s.csr' % (data_path, words[0])
            has_cert = os.path.exists(cert)
            if has_cert:
                print('Certificate \t %s' % cert)
                count += 1
            if os.path.exists(req):
                if not has_cert:
                    print(_('Request \t%s - not signed') % req)
                else:
                    print(_('Request \t%s') % req)
                count += 1
        if not count:
            print(_("Certificates or requests not found!"))
        return 0
    try:
        cert_id = int(cert_id)
    except (TypeError, ValueError):
        print(_("certificate (request) number not int and not 'all'"))
        return 1
    view_signed_cert_info(cert_id, serv_certbase, data_path, 'server_certs')
    return 0
# View clients requests on server
def view_client_request(args, client_certbase, data_path):
    """View client certificates and requests stored on this server.

    ``args.req_id`` is a number or 'all'.  With 'all' every certificate
    and request file of the ids listed in ``client_certbase`` (created
    empty when missing) is listed; otherwise the details of one id are
    printed.

    Returns 0 on success and 1 when the id is neither a number nor 'all'.
    """
    req_id = args.req_id
    if req_id == 'all':
        if not os.path.exists(client_certbase):
            open(client_certbase, "w").close()
        with open(client_certbase) as fd:
            lines = fd.read().splitlines()
        count = 0
        for line in lines:
            words = line.split()
            if not words:
                continue
            cert = '%s/client_certs/%s.crt' % (data_path, words[0])
            req = '%s/client_certs/%s.csr' % (data_path, words[0])
            has_cert = os.path.exists(cert)
            if has_cert:
                print("%s %s" % (_('Certificate \t'), cert))
                count += 1
            if os.path.exists(req):
                if not has_cert:
                    print(_('Request \t%s - not signed') % req)
                else:
                    print(_('Request \t%s') % req)
                count += 1
        if not count:
            print(_("Certificates or requests not found!"))
        return 0
    try:
        req_id = int(req_id)
    except (TypeError, ValueError):
        print(_("certificate (request) number not int and not 'all'"))
        return 1
    view_signed_cert_info(req_id, client_certbase, data_path, 'client_certs')
    return 0
## refuse to sign request
def del_request(id_del_req, serv_certbase, data_path):
    """Refuse to sign a server request: delete it and its certificate.

    id_del_req    -- number of the request
    serv_certbase -- database of server requests; lines whose first word
                     equals the id are removed
    data_path     -- data directory holding ``server_certs``

    The user is asked for confirmation before anything is deleted.

    Returns 1 on a wrong or unknown id or when a file cannot be deleted,
    otherwise 0 (also when the user declines).
    """
    try:
        int(id_del_req)
    except (TypeError, ValueError):
        print(_("id must be int"))
        return 1
    request = data_path + '/server_certs/%s.csr' % id_del_req
    cert = data_path + '/server_certs/%s.crt' % id_del_req
    # check that request and certificate files exist
    print(request)
    if not os.path.exists(request) and not os.path.exists(cert):
        print(_("Request or certificate with id = %s not found!")
              % id_del_req)
        return 1
    if not os.path.exists(request):
        print(_("request %s not found!") % request)
    if os.path.exists(cert):
        print(_("This request has already been signed"))
    ask = raw_input(_("Delete certificate and request? y/[n]: "))
    if ask.lower() not in ['y', 'yes']:
        print(_("Not deleted"))
        return 0

    wanted = str(id_del_req)
    with open(serv_certbase) as fd:
        lines = fd.read().splitlines()
    # ``split()[:1]`` is empty for blank lines, so they are kept untouched
    kept = [line for line in lines if line.split()[:1] != [wanted]]
    with open(serv_certbase, 'w') as fd:
        fd.write(''.join(line + '\n' for line in kept))

    try:
        if os.path.exists(request):
            os.unlink(request)
            print(_("request deleted"))
        if os.path.exists(cert):
            os.unlink(cert)
            print(_("certificate deleted"))
        print(_("successfully removed"))
    except OSError:
        print(_("delete file error!"))
        return 1
    return 0
def revoke_signed_cert(revoke_cert_id, data_path, cert_path):
    """Revoke a signed server certificate and rebuild the CRL.

    revoke_cert_id -- number of the certificate in ``server_certs``, or
                      'rm' to delete the CRL and its database
    data_path      -- data directory holding ``server_certs``
    cert_path      -- directory holding ``root.crt`` and ``root.key``

    The openssl CA configuration ``server_certs/ca.config`` is rewritten
    on every call; afterwards the certificate is revoked, the CRL
    ``server_certs/ca.crl`` is generated and shown.

    Returns 0 for 'rm', 1 on a wrong or unknown id, otherwise None.
    """
    CRL = data_path + '/server_certs/ca.crl'
    CRL_mid_dir = "/server_certs/CRL/"
    CRL_db_dir = data_path + CRL_mid_dir
    if revoke_cert_id == 'rm':
        if os.path.exists(CRL_db_dir):
            for filename in glob.glob(CRL_db_dir + "*"):
                os.unlink(filename)
            if os.path.exists(CRL):
                os.unlink(CRL)
            print(_("CRL deleted"))
            return 0
        print(_("CRL not exists"))
        return 0
    try:
        int(revoke_cert_id)
    except (TypeError, ValueError):
        print(_("Id revocation certificate must be integer!"))
        return 1
    cert_file = data_path + "/server_certs/%s.crt" % revoke_cert_id
    if not os.path.exists(cert_file):
        print(_("Certificate %s not found") % cert_file)
        return 1
    if not os.path.exists(CRL_db_dir):
        os.makedirs(CRL_db_dir)
    index_file = CRL_db_dir + 'index'
    if not os.path.exists(index_file):
        open(index_file, 'w').close()
    serial_file = CRL_db_dir + 'serial'
    if not os.path.exists(serial_file):
        open(serial_file, 'w').close()
    default_crl_days = 14
    conf_file = data_path + "/server_certs/ca.config"
    # NOTE(review): md5 is a weak digest and may be refused by current
    # openssl builds - consider a stronger default_md.
    content_conf = ("[ ca ]\ndefault_ca = CA_CLIENT\n"
                    "[ CA_CLIENT ]\n"
                    "dir = %s\n"
                    "database = %s\n"
                    "serial = %s\n"
                    "certificate = %s\n"
                    "private_key = %s\n"
                    "default_crl_days = %d\n"
                    "default_md = md5\n") % (
                        CRL_db_dir, index_file, serial_file,
                        cert_path + '/root.crt', cert_path + '/root.key',
                        default_crl_days)
    with open(conf_file, 'w') as fd:
        fd.write(content_conf)

    # Argument lists (no shell) keep paths with special characters from
    # being interpreted by the shell.  As before, every step is attempted
    # even when the previous one failed.
    commands = (
        ["openssl", "ca", "-config", conf_file, "-revoke", cert_file],
        ["openssl", "ca", "-gencrl", "-config", conf_file, "-out", CRL],
        ["openssl", "crl", "-text", "-noout", "-in", CRL],
    )
    for number, cmd in enumerate(commands):
        if number == 2:
            # blank line before the CRL listing
            print("")
        try:
            subprocess.call(cmd)
        except OSError:
            # openssl cannot be started; os.system() did not raise either
            pass
#################################################################
def parse(argv=None):
    """Parse the command line options of the server.

    argv -- list of arguments to parse; by default (None) the arguments
            of the running program (``sys.argv[1:]``) are used

    Returns the ``argparse`` namespace with the parsed options.
    """
    import argparse
    parser = argparse.ArgumentParser()
    add = parser.add_argument
    add('-l', '--lang', type=str, dest='lang',
        help=_('language for translate'))
    add('-p', '--port', type=int, default=8888, dest='port',
        help=_('port number'))
    add('-c', '--cert', type=str, dest='Id',
        help=_('operations with certificates (number or "all"). '
               'Server not run'))
    add('-r', '--show-request', type=str, dest='req_id',
        help=_('view clients requests (number or "all")'))
    add('--sc', '--server-cert', type=str, dest='cert_id',
        help=_('view servers certificates (number or "all"). '
               'Server not run'))
    add('-b', '--bootstrap', type=str, dest='bootstrap_user_name',
        help=_('bootstrap action'))
    add('-d', '--dump', action='store_true', default=False, dest='dump',
        help=_('dump (using with -c [ID])'))
    add('--db', '--debug', action='store_true', default=False,
        dest='debug', help=_('debug'))
    add('--rm', '--remove', action='store_true', default=False,
        dest='remove',
        help=_('remove selected certificate (using with -c [ID])'))
    add('--right-add', type=str, dest='right_add',
        help=_("Add right for certificate (or list via ',') "
               "(using with -c [ID])"))
    add('--right-del', type=str, dest='right_del',
        help=_('Delete right(s) for certificate (using with -c [ID])'))
    add('--gen-root-cert', action='store_true', default=False,
        dest='gen_root_cert', help=_('generate a root (CA) certificate'))
    add('--gen-cert-by', type=str, dest='host',
        help=_('send request to sign root certificate'))
    add('--get-cert-from', type=str, dest='root_host',
        help=_('get signed certificate from server'))
    add('--use-root-as-server', action='store_true', default=False,
        dest='use_root_cert',
        help=_('use root certificate as server cert'))
    add('--sign-client', type=str, dest='id_client_req',
        help=_("sign client's request by server certificate"))
    add('--sign-server', type=str, dest='id_server_req',
        help=_("sign server's request by root certificate"))
    add('--del-req', type=str, dest='id_del_req',
        help=_("refuse to sign request"))
    add('--rv', '--revoke-cert', type=str, dest='revoke_cert_id',
        help=_("revoke early signed server (or ca) certificate. "
               "rm to remove CRL"))
    add('--version', action='store_true', default=False, dest='version',
        help=_('prints the version number, then exits'))
    return parser.parse_args(argv)