You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
calculate-utils-3-core/pym/core/server/cert_cmd.py

1252 lines
44 KiB

# -*- coding: utf-8 -*-
# Copyright 2012-2016 Mir Calculate. http://www.calculate-linux.org
#
# Job with Command Line and Certificates
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import print_function
from __future__ import absolute_import
import os
import glob
import socket
import subprocess
import hashlib
import sys
from .func import new_key_req, uniq
from calculate.lib.utils import ip as ip
from calculate.core.datavars import DataVarsCore
from calculate.lib.utils.files import readFile
from calculate.lib.utils.text import _u8
import urllib.request as u2
# When this urllib build exposes the ssl module, replace ssl's default HTTPS
# context factory with the unverified one.
# NOTE(review): this disables server certificate verification for every
# consumer of the default HTTPS context in this process - confirm it is
# still intended.
if hasattr(u2, "ssl"):
    u2.ssl._create_default_https_context = u2.ssl._create_unverified_context
# Identity placeholder so that `_()` is callable at import time.
# NOTE(review): setLocalTranslate() below presumably rebinds `_` in this
# module to the real translator - confirm against calculate.lib.cl_lang.
_ = lambda x: x
from calculate.lib.cl_lang import setLocalTranslate
setLocalTranslate('cl_core3', sys.modules[__name__])
def getIpLocal():
    """Return the IP address of the first interface listed by ``ip``.

    Only the first item produced by ``ip.getInterfaces()`` is looked at.
    An empty string is returned when no interface is listed at all.
    """
    for iface in ip.getInterfaces():
        # Deliberately stop at the very first interface.
        return ip.getIp(iface)
    return ""
def getHwAddr():
    """Return the MAC address of the first interface listed by ``ip``.

    Only the first item produced by ``ip.getInterfaces()`` is looked at.
    An empty string is returned when no interface is listed at all.
    """
    for iface in ip.getInterfaces():
        # Deliberately stop at the very first interface.
        return ip.getMac(iface)
    return ""
# method for generating server certificates
def _copy_binary_file(src, dst):
    """Copy ``src`` to ``dst`` byte for byte, always closing both files."""
    with open(src, 'rb') as source, open(dst, 'wb') as target:
        target.write(source.read())


def _clean_subject_field(value):
    """Replace spaces and tabs in an interactively entered subject field."""
    return value.replace(' ', '_').replace('\t', '_')


def _prompt_ca_subject(host_name, port, lang):
    """Ask the operator for the subject fields of the root certificate."""
    print(_('Do not use spaces or tabs.'))
    subject = {}
    # An empty answer falls back to the default shown in brackets.
    subject['CN'] = input(_('Hostname [%s] : ') % host_name) or host_name
    subject['OU'] = _clean_subject_field(input(_('Organization unit: ')))
    subject['O'] = _clean_subject_field(input(_('Organization name: ')))
    network = _('Full network address (host:port)')
    subject['L'] = (input(network + ' [%s:%d]: ' % (host_name, port))
                    or host_name + ':' + str(port))
    subject['ST'] = _clean_subject_field(input(_('City: ')))
    subject['C'] = (input(_('Country (two letters only!) [%s]: ') % lang)
                    or lang)
    return subject


def _generate_root_cert(cert_path, port, auto):
    """Create the root key pair and a self-signed root certificate.

    The certificate is also appended to ``ca_root.crt`` in ``cert_path``.
    Returns 1 when the certificate could not be created, None otherwise.
    """
    if auto:
        answer = 'n'
    else:
        answer = input(_("Enter the certificate date manually? [y]/n: "))
    ob = DataVarsCore()
    ob.importCore()
    if not ob.flIniFile():
        sys.exit(1)
    lang = ob.Get('os_locale_locale')[:2]
    host_name = socket.getfqdn()
    from .create_cert import (passphrase_callback, generateRSAKey,
                              makePKey, CreateCertError, create_selfsigned_ca)
    root_key = cert_path + '/root.key'
    root_crt = cert_path + '/root.crt'
    # Generating public key
    rsa = generateRSAKey()
    rsa.save_key(root_key + '_pub', cipher=None,
                 callback=passphrase_callback)
    # Generating private key
    pkey = makePKey(rsa)
    pkey.save_key(root_key, cipher=None, callback=passphrase_callback)
    if answer.lower() in ['n', 'no']:
        subject = {
            'CN': host_name,  # (Common Name);
            'OU': 'www.calculate-linux.ru',  # (Organization Unit);
            'O': 'calculate-linux',  # (Organization Name);
            'L': host_name + ':' + str(port),  # (Locality Name);
            'ST': 'Spb',  # (State Name);
            'C': lang,  # (Country);
        }
    else:
        subject = _prompt_ca_subject(host_name, port, lang)
    try:
        create_selfsigned_ca(subject, root_key, root_crt)
    except CreateCertError:
        print(_('Failed to create root certificate'))
        # Without a fresh root.crt there is nothing valid to trust below,
        # and reporting "OK" would be misleading.
        return 1
    # add certificate in trusted
    try:
        with open(root_crt, 'r') as fd_r:
            with open(cert_path + '/ca_root.crt', 'a') as fd_w:
                fd_w.write(fd_r.read())
    except IOError:
        print(_('error writing to (reading from) files in directory %s')
              % cert_path)
    print(_("OK"))
    return None


def _use_root_as_server_cert(cert_path):
    """Copy the root certificate and key over the server certificate/key."""
    if not os.path.exists(cert_path + '/root.crt'):
        print(_('root certificate not found (use cl-core with '
                'option --gen-root-cert)'))
        return 1
    print(_('Using the root certificate as the server certificate'))
    _copy_binary_file(cert_path + '/root.crt', cert_path + '/server.crt')
    _copy_binary_file(cert_path + '/root.key', cert_path + '/server.key')
    print(_("OK"))
    return 0


def _send_sign_request(key, cert_path, host, port):
    """Send the server signing request to ``host`` and store the request ID."""
    url = "https://%s:%d/?wsdl" % (host, port)
    print(url + '\n' + _("connecting..."))
    import calculate.contrib
    from suds.client import Client
    from .client_class import HTTPSClientsCertTransport
    from urllib.error import URLError
    try:
        client = Client(url, transport=HTTPSClientsCertTransport(
            None, None, None))
    except (KeyboardInterrupt, URLError):
        print('\n' + _("Close. Connection Error."))
        return 1
    serv_host_name = client.service.get_server_host_name()
    if os.path.exists(key) and os.path.exists(cert_path + '/server.csr'):
        print(_("the private key and request now exist"))
        ask = input(_("Create a new private key and request?") +
                    " y/[n]: ")
        if ask.lower() in ['y', 'yes']:
            new_key_req(key, cert_path, serv_host_name, port)
    else:
        new_key_req(key, cert_path, serv_host_name, port)
    local_ip = getIpLocal()
    mac = getHwAddr()
    data = readFile(cert_path + '/server.csr')
    res = client.service.post_server_request(request=data, ip=local_ip,
                                             mac=mac)
    if int(res) < 0:
        print(_("This server is not enabled to sign certificates!"))
        return 1
    with open(cert_path + '/req_id', 'w') as fc:
        fc.write(res)
    print(_("Your request ID = %s") % _u8(res))
    return 0


def _fetch_signed_cert(cert_path, root_host, port):
    """Fetch the signed server certificate from ``root_host``."""
    req_id_file = cert_path + '/req_id'
    if not os.path.exists(req_id_file):
        print(_("request not sent or file %s deleted") % req_id_file)
        return 1
    req_id = readFile(req_id_file)
    url = "https://%s:%d/?wsdl" % (root_host, port)
    print(url + '\n' + _("connecting..."))
    import calculate.contrib
    from suds.client import Client
    from .client_class import HTTPSClientsCertTransport
    from urllib.error import URLError
    try:
        client = Client(
            url, transport=HTTPSClientsCertTransport(None, None, None))
    except (KeyboardInterrupt, URLError):
        # URLError is handled as in the request-sending mode.
        print('\n' + _("Close. Connection Error."))
        return 1
    request = readFile(cert_path + '/server.csr')
    # hashlib only accepts bytes-like input.
    # NOTE(review): readFile presumably returns text here - confirm; bytes
    # are passed through untouched.
    if isinstance(request, str):
        request = request.encode('utf-8')
    md5sum = hashlib.md5(request).hexdigest()
    result = client.service.get_server_cert(req_id, md5sum)
    cert = result[0][0]
    if cert == '1':
        print(_('The signature request was rejected!'))
        return 1
    elif cert == '2':
        print(_("The signature request has not been examined yet."))
        print(_("Your request ID = %s") % req_id)
        return 1
    elif cert == '3':
        print(_('The signature request does not match earlier data.'))
        return 1
    elif cert == '4':
        print(_("The request was sent from another IP."))
        return 1
    with open(cert_path + '/server.crt', 'w') as fc:
        fc.write(cert)
    ca_root = result[0][1]
    os.unlink(req_id_file)
    print(_('Certificate saved. Your certificate ID = %s') % req_id)
    ca_root_file = cert_path + '/ca_root.crt'
    # Read the existing bundle BEFORE opening it for writing: mode 'w'
    # truncates the file, so reading afterwards always yielded nothing.
    previous = ''
    if os.path.exists(ca_root_file):
        previous = readFile(ca_root_file)
    with open(ca_root_file, 'w') as fd:
        # Do not store the same CA twice when it is already in the bundle.
        if ca_root and ca_root not in previous:
            fd.write(ca_root)
        fd.write(previous)
    return 0


def check_server_certificate(cert, key, cert_path, args, port, auto=False):
    """Run the certificate action selected on the command line.

    ``cert_path`` is created when missing. Exactly one action runs, chosen
    by the first truthy attribute of ``args`` in this order:

    * ``gen_root_cert``  - generate a self-signed root certificate
      (non-interactive defaults are used when ``auto`` is true);
    * ``use_root_cert``  - use the root certificate/key as server ones;
    * ``host``           - send a signing request to another server;
    * ``root_host``      - fetch the signed certificate from that server.

    Returns 1 on a reported failure and 0 on success. Successful root
    certificate generation, and the case where no action is selected,
    return None.
    """
    if not os.path.isdir(cert_path):
        os.makedirs(cert_path)
    # generate a root certificate
    if args.gen_root_cert:
        return _generate_root_cert(cert_path, port, auto)
    # use self root certificate as server certificate
    if args.use_root_cert:
        return _use_root_as_server_cert(cert_path)
    # send a certificate signing request to another server
    if args.host:
        return _send_sign_request(key, cert_path, args.host, port)
    # get a signed certificate from another server
    if args.root_host:
        return _fetch_signed_cert(cert_path, args.root_host, port)
    return None
def create_path(data_path, certbase, rights, group_rights, local_data_path):
    """Create the directories and default files of the certificate store.

    Creates ``certbase`` (empty) together with ``data_path``/client_certs
    when the database is missing, the ``conf`` directories below
    ``data_path`` and ``local_data_path``, and the ``rights`` and
    ``group_rights`` files pre-filled with example content. A directory
    that cannot be created is only reported.
    """

    def ensure_dir(path):
        # Best effort: report the failure and carry on.
        if os.path.exists(path):
            return
        try:
            os.makedirs(path)
        except OSError:
            print(_("cannot create directory %s") % path)

    if not os.path.exists(certbase):
        ensure_dir(data_path + '/client_certs')
        with open(certbase, 'w'):
            pass
    ensure_dir(data_path + '/conf')
    ensure_dir(local_data_path + '/conf')
    if not os.path.isfile(rights):
        with open(rights, 'w') as rights_file:
            rights_file.write(
                '# example of content:\n'
                '# certificate number 2 has right to run method '
                '"pid_info", and the certificate\n'
                '# number 1 does not have rights. Use key --right-add '
                'and --right-del. See man.\n'
                '#pid_info 2 -1\n')
    if not os.path.isfile(group_rights):
        with open(group_rights, 'w') as groups_file:
            groups_file.write(
                '# example of content:\n'
                '#manager pid_info,list_pid,cl_template,install\n'
                'system_update update,setupsystem,configure'
                '\n')
# if not os.path.exists(data_path+'/server_certs/CRL'):
# open(data_path+'/server_certs/CRL', 'w')
# find a id by certificate
def find_cert_id(certificate, data_path, certbase):
    """Return the ID under which ``certificate`` is stored, or 0.

    The database ``certbase`` (created empty when missing) is scanned for
    lines whose second word is the MD5 hex digest of ``certificate``. For
    each such ID the file ``data_path``/client_certs/<id>.crt is compared
    with ``certificate``; the first exact match wins.

    ``certificate`` may be text or bytes for hashing.
    NOTE(review): stored certificates are read as text, so a bytes
    ``certificate`` can never match (unchanged behaviour) - confirm what
    callers pass.
    """
    certs_dir = data_path + '/client_certs'
    if not os.path.exists(certbase):
        if not os.path.exists(certs_dir):
            try:
                os.makedirs(certs_dir)
            except OSError:
                print(_("cannot create directory %s") % certs_dir)
        with open(certbase, 'w'):
            pass
    # hashlib rejects str input, so text is encoded before hashing.
    if isinstance(certificate, str):
        digest_input = certificate.encode('utf-8')
    else:
        digest_input = certificate
    md5sum = hashlib.md5(digest_input).hexdigest()
    candidates = []
    with open(certbase) as fd:
        for line in fd.read().splitlines():
            words = line.split()
            # A usable line has at least "<id> <md5>".
            if len(words) > 1 and words[1] == md5sum:
                candidates.append(words[0])
    for cert_id in candidates:
        cert_file = certs_dir + '/' + cert_id + '.crt'
        if not os.path.isfile(cert_file):
            continue
        with open(cert_file, 'r') as fp:
            if fp.read() == certificate:
                return cert_id
    return 0
# find a certificate by id
def find_id_cert(cert_id, data_path):
    """Return the text of client certificate ``cert_id``, or 0 if absent.

    The certificate is read from ``data_path``/client_certs/<cert_id>.crt.
    """
    location = '%s/client_certs/%s.crt' % (data_path, str(cert_id))
    if not os.path.exists(location):
        return 0
    with open(location, 'r') as handle:
        return handle.read()
# delete selected clients certificate
def del_cert(certbase, data_path, cert_id):
    """Remove certificate ``cert_id`` from the database and from disk.

    Every line of ``certbase`` whose first word equals ``cert_id`` is
    dropped; all other lines (blank ones included) are kept. Then
    ``data_path``/client_certs/<cert_id>.crt is deleted; a failure to
    delete the file is only reported.
    """
    cert_id = str(cert_id)
    with open(certbase) as fd:
        lines = fd.read().splitlines()
    kept = []
    for line in lines:
        words = line.split()
        # Blank lines have no ID to compare and are kept as they are.
        if words and words[0] == cert_id:
            continue
        kept.append(line + '\n')
    with open(certbase, 'w') as fd:
        fd.writelines(kept)
    try:
        # Certificates are stored as "<id>.crt"; unlinking the bare ID
        # never matched an existing file.
        os.unlink(data_path + '/client_certs/' + cert_id + '.crt')
    except OSError:
        print(_("failed to delete the certificate!"))
def add_right(cert_id, method, rights):
    """Grant certificate ``cert_id`` the right to run ``method``.

    ``rights`` is a text file of lines ``<method> <id> <-id> ...``. On
    every line for ``method`` an explicit denial ``-<cert_id>`` is removed
    and ``cert_id`` is appended unless already listed. When no line for
    ``method`` exists a new one is appended. Empty lines are dropped and
    all other lines are written back unchanged.
    """
    cert_id = str(cert_id)
    denied = '-' + cert_id
    method_found = False
    output = []
    with open(rights) as fd:
        lines = fd.read().splitlines()
    for line in lines:
        words = line.split()
        if not words:
            continue
        if words[0] == method:
            method_found = True
            # Work on whole tokens: a substring replace of " -1" would
            # also mangle " -12".
            tokens = [word for word in words[1:] if word != denied]
            changed = len(tokens) != len(words) - 1
            if cert_id not in tokens:
                tokens.append(cert_id)
                changed = True
                print("id %s - add %s" % (cert_id, method))
            if changed:
                # Same "<method> <ids...> " layout (trailing space) that
                # newly created lines use.
                line = ' '.join([method] + tokens) + ' '
        output.append(line + '\n')
    if not method_found:
        output.append('%s %s \n' % (method, cert_id))
        print("id %s - add %s" % (cert_id, method))
    with open(rights, 'w') as fd:
        fd.writelines(output)
def del_right(cert_id, method, rights):
    """Deny certificate ``cert_id`` the right to run ``method``.

    ``rights`` is a text file of lines ``<method> <id> <-id> ...``. On
    every line for ``method`` the grant ``cert_id`` is removed and the
    denial ``-<cert_id>`` is appended unless already listed. When no line
    for ``method`` exists a new one is appended. Empty lines are dropped
    (as ``add_right`` does) and all other lines are written back
    unchanged.
    """
    cert_id = str(cert_id)
    denied = '-' + cert_id
    method_found = False
    output = []
    with open(rights) as fd:
        lines = fd.read().splitlines()
    for line in lines:
        words = line.split()
        if not words:
            continue
        if words[0] == method:
            method_found = True
            # Work on whole tokens: a substring replace of " 1" would
            # also mangle " 12".
            tokens = [word for word in words[1:] if word != cert_id]
            changed = len(tokens) != len(words) - 1
            if denied not in tokens:
                tokens.append(denied)
                changed = True
                print("id %s - remove %s" % (cert_id, method))
            if changed:
                # Same "<method> <ids...> " layout (trailing space) that
                # newly created lines use.
                line = ' '.join([method] + tokens) + ' '
        output.append(line + '\n')
    if not method_found:
        output.append('%s -%s \n' % (method, cert_id))
        print("id %s - remove %s" % (cert_id, method))
    with open(rights, 'w') as fd:
        fd.writelines(output)
# add or delete rights a selected certificate
def change_rights_cert(cert_id, right_add, right_del,
                       rights, group_rights, certbase):
    """Add and/or remove method rights for one or more certificates.

    ``cert_id`` is a single number, a comma-separated list of numbers, or
    ``'all'`` (every ID found in ``certbase``). ``right_add`` and
    ``right_del`` are comma-separated method names, or empty/None.

    Returns 1 when an ID is not an integer or the ``rights`` file is
    missing, None otherwise.
    """
    if cert_id == 'all':
        list_id = []
        with open(certbase) as fd:
            for line in fd.read().splitlines():
                words = line.split()
                # Blank lines carry no ID.
                if words:
                    list_id.append(words[0])
    else:
        list_id = []
        for item in str(cert_id).split(','):
            # Strip so that "1, 2" stores the same tokens the rights file
            # uses.
            item = item.strip()
            try:
                int(item)
            except ValueError:
                print(_('to change permissions, the certificate number must '
                        'be integer'))
                return 1
            list_id.append(item)
    for current_id in list_id:
        current_id = str(current_id)
        if right_add:
            if not os.path.exists(rights):
                print(_('file %s not found!') % rights)
                return 1
            for meth in right_add.split(','):
                add_right(current_id, meth, rights)
        if right_del:
            if not os.path.exists(rights):
                print(_('file %s not found!') % rights)
                return 1
            for meth in right_del.split(','):
                del_right(current_id, meth, rights)
    return None
# Detailed view clients certificates
def view_cert_info(cert, cert_id, rights, group_rights):
    """Print details and effective permissions of a client certificate.

    ``cert`` is the PEM text of the certificate and ``cert_id`` its
    number (int or numeric string). Permissions are the methods of the
    groups named in the certificate's last extension (looked up in
    ``group_rights``, created empty when missing) plus the grants in
    ``rights``, minus the denials in ``rights``. Group ``all`` means every
    method except the denied ones.
    """
    import OpenSSL
    certobj = OpenSSL.crypto.load_certificate(OpenSSL.SSL.FILETYPE_PEM, cert)
    last_ext = certobj.get_extension(certobj.get_extension_count() - 1)
    print(last_ext)
    print(_("Fingerprint = "), certobj.digest('SHA1'))
    print(_("Serial number = "), certobj.get_serial_number())
    issuer = certobj.get_issuer().get_components()
    print('\n' + _("Issuer"))
    for item in issuer:
        print("  %s : %s" % (item[0], item[1]))
    subject = certobj.get_subject().get_components()
    print('\n' + _("Subject"))
    for item in subject:
        print("  %s : %s" % (item[0], item[1]))
    print('\n' + _("Permissions: "))
    com = last_ext.get_data()
    # The extension payload is bytes on Python 3; decode before text ops.
    if isinstance(com, bytes):
        com = com.decode('utf-8', 'replace')
    parts = com.split(':')
    groups_list = parts[1].split(',') if len(parts) > 1 else []

    results = []
    if not os.path.exists(group_rights):
        with open(group_rights, 'w'):
            pass
    with open(group_rights) as fd:
        group_lines = fd.read().splitlines()
    for line in group_lines:
        if not line:
            continue
        words = line.split(' ', 1)
        # first word in line equal name input method
        if words[0] in groups_list and len(words) > 1:
            results.extend(meth.strip() for meth in words[1].split(','))
    results = uniq(results)

    # IDs in the rights file are parsed as ints, so compare as int even
    # when the caller hands over the ID as text.
    try:
        numeric_id = int(cert_id)
    except (TypeError, ValueError):
        numeric_id = cert_id
    add_list_rights = []
    del_list_rights = []
    with open(rights) as fr:
        rights_lines = fr.read().splitlines()
    for line in rights_lines:
        words = line.split()
        if not words:
            continue
        meth = words[0]
        for word in words:
            try:
                word = int(word)
            except ValueError:
                continue
            # compare with certificate number
            if numeric_id == word:
                add_list_rights.append(meth)
            if numeric_id == -word:
                del_list_rights.append(meth)

    if 'all' in groups_list:
        sys.stdout.write(' ' + _('all methods'))
        sys.stdout.flush()
        if del_list_rights:
            sys.stdout.write(_(', except:') + '\n')
            sys.stdout.flush()
            for meth in del_list_rights:
                print(' ' + meth)
        return

    results += add_list_rights
    # Build a new list: removing items while iterating the same list
    # skips the element that follows each removal.
    results = [meth for meth in uniq(results)
               if meth not in del_list_rights]
    if not results:
        print(_("No methods available"))
    else:
        for meth in results:
            print(' ' + meth)
# View, change rights, delete clients certificates on server
def view_cert(args, certbase, data_path, rights, group_rights):
    """View, change rights of, or delete client certificates.

    Reads ``Id``, ``dump``, ``remove``, ``right_add`` and ``right_del``
    from ``args``:

    * with ``right_add``/``right_del`` the rights are changed and nothing
      else is done;
    * ``Id == 'all'`` lists every certificate of ``certbase`` that has a
      file on disk (with details when ``dump`` is set) and, with
      ``remove``, empties the database after confirmation;
    * otherwise ``Id`` must be a number: the certificate is printed raw
      (``dump``) or in detail, and deleted after confirmation when
      ``remove`` is set.

    Returns 0 on success and 1 on a reported error.
    """
    cert_id = args.Id
    dump = args.dump
    remove = args.remove
    right_add = args.right_add
    right_del = args.right_del
    if right_add or right_del:
        change_rights_cert(cert_id, right_add, right_del,
                           rights, group_rights, certbase)
        return 0
    if not os.path.exists(certbase):
        with open(certbase, "w"):
            pass
    if cert_id == 'all':
        count = 0
        with open(certbase) as fd:
            lines = fd.read().splitlines()
        for line in lines:
            words = line.split()
            # Blank lines carry no ID.
            if not words:
                continue
            cert = find_id_cert(words[0], data_path)
            if cert == 0:
                continue
            count += 1
            print(_("Certificate ID = %s") % words[0])
            if dump:
                view_cert_info(cert, words[0], rights, group_rights)
                print("#############################################\n")
        print(_("Total: %d certificates.") % count)
        if remove:
            answer = input(
                _("Are you sure? Delete all client certificates?") + ' y/[n]: ')
            if answer.lower() in ['y', 'yes']:
                # Only the database is emptied here.
                with open(certbase, "w"):
                    pass
        return 0
    try:
        cert_id = int(cert_id)
    except (TypeError, ValueError):
        print(_("certificate number not int and not 'all'"))
        return 1
    cert = find_id_cert(cert_id, data_path)
    if not cert:
        print(_("Certificate not found"))
        return 1
    if dump:
        print(cert)
    else:
        view_cert_info(cert, cert_id, rights, group_rights)
    if remove:
        answer = input(
            _("Delete the client certificate with ID %d? y/[n]: ") % cert_id)
        if answer.lower() in ['y', 'yes']:
            del_cert(certbase, data_path, cert_id)
            print(_("Deleted"))
    return 0
# Sign client request by server certificate
def sing_req_by_server(id_client_req, cert_path, data_path, auto=False,
                       group_name="all"):
    """Sign a client certificate request with the root certificate.

    The request ``data_path``/client_certs/<id>.csr is signed with
    ``cert_path``/root.key and root.crt, and the result is written next
    to it as <id>.crt. Unless ``auto`` is set, the operator is asked for
    the certificate group (``group_name`` is then ignored).

    Returns 1 when the ID is not an integer, the request is missing or
    the certificate already exists; 0 otherwise (also when signing
    failed - the outcome is only printed).
    """
    server_cert = cert_path + '/root.crt'
    server_key = cert_path + '/root.key'
    if id_client_req:
        try:
            int(id_client_req)
        except ValueError:
            print(_("The certificate number must be int"))
            return 1
        cl_req = data_path + '/client_certs/%s.csr' % id_client_req
        cl_cert = data_path + '/client_certs/%s.crt' % id_client_req
        if not os.path.exists(cl_req):
            print(_("Signature request %s not found") % cl_req)
            return 1
        if os.path.exists(cl_cert):
            print(_("Certificate %s now exists") % cl_cert)
            return 1
        if not auto:
            group_name = ""
            # Keep asking until something other than whitespace is given.
            while not group_name.split():
                group_name = "%s" % input(
                    _("Enter the group of the new certificate "
                      "(group name or 'all'): "))
        # Remove a stale openssl config if one is still lying around.
        config = data_path + '/client_certs/ssl-client.cfg'
        if os.path.exists(config):
            os.unlink(config)
        from .create_cert import (sign_client_certifacation_request,
                                  CreateCertError)
        try:
            sign_client_certifacation_request(
                server_key, server_cert, cl_req, cl_cert, group_name)
        except CreateCertError:
            print(_('Failed to sign client certificate'))
        if os.path.exists(cl_cert):
            print(_("Certificate %s is signed") % cl_cert)
        else:
            print(_("Certificate %s has not been signed") % cl_cert)
    return 0
# Sign server request by root certificate
def sing_req_by_root(args, cert_path, data_path):
    """Sign a server certificate request with the root certificate.

    The request ``data_path``/server_certs/<id>.csr (``id`` is
    ``args.id_server_req``) is signed by running ``openssl x509 -req``
    with ``cert_path``/root.crt and root.key; the result is written next
    to it as <id>.crt.

    Returns 1 when the root certificate/key or the request is missing,
    the ID is not an integer or the certificate already exists; 0
    otherwise (also when openssl failed - the outcome is only printed).
    """
    root_cert = cert_path + '/root.crt'
    root_key = cert_path + '/root.key'
    if not os.path.exists(root_cert) or not os.path.exists(root_key):
        print(_("Root certificate or private key not found"))
        print(_("see %s") % cert_path)
        return 1
    if args.id_server_req:
        try:
            int(args.id_server_req)
        except ValueError:
            print(_("The certificate number must be int"))
            return 1
        sign_req = data_path + '/server_certs/%s.csr' % args.id_server_req
        sign_cert = data_path + '/server_certs/%s.crt' % args.id_server_req
        if not os.path.exists(sign_req):
            print(_("Signature request %s not found") % sign_req)
            return 1
        if os.path.exists(sign_cert):
            print(_("Certificate %s now exists") % sign_cert)
            return 1
        # Argument list instead of a shell string: paths containing
        # spaces or shell metacharacters are passed through verbatim.
        cmd = ["openssl", "x509", "-req", "-days", "11000",
               "-CA", root_cert, "-CAkey", root_key, "-CAcreateserial",
               "-in", sign_req, "-out", sign_cert]
        print(" ".join(cmd))
        PIPE = subprocess.PIPE
        try:
            p = subprocess.Popen(cmd, stdin=PIPE, stdout=PIPE,
                                 stderr=subprocess.STDOUT, close_fds=True)
            # communicate() drains the pipe; wait() alone can block
            # forever once the pipe buffer fills up.
            p.communicate()
            signed = p.returncode == 0 and os.path.exists(sign_cert)
        except OSError:
            # openssl could not be started at all.
            signed = False
        if signed:
            print(_("Certificate %s is signed") % sign_cert)
        else:
            print(_("Certificate %s has not been signed") % sign_cert)
    return 0
# Detailed view server signed certificates
def _print_request_origin(serv_certbase, cert_id):
    """Print who sent request ``cert_id`` according to the database.

    The first line of ``serv_certbase`` (created empty when missing) whose
    first word equals ``cert_id`` is used: words 2-3 are printed as the
    date, word 4 as the IP and word 5 as the MAC. Missing words are
    printed as empty strings.
    """
    if not os.path.exists(serv_certbase):
        with open(serv_certbase, "w"):
            pass
    wanted = str(cert_id)
    with open(serv_certbase) as fd:
        lines = fd.read().splitlines()
    for line in lines:
        words = line.split()
        if not words or words[0] != wanted:
            continue
        # Pad so that a truncated record cannot raise IndexError.
        words += [''] * (6 - len(words))
        print('\n' + _('request sent from:'))
        print('IP: %s' % words[4])
        print('MAC: %s' % words[5])
        print(_('date') + ' - %s %s' % (words[2], words[3]))
        break


def view_signed_cert_info(cert_id, serv_certbase, data_path, mid_path):
    """Print details of a signed certificate and of its signing request.

    Looks at ``data_path``/``mid_path``/<cert_id>.crt and .csr
    (``cert_id`` must be an int). For each file that exists its fields
    and the origin recorded in ``serv_certbase`` are printed; for each
    missing file a "not found" message is printed. Always returns 0.
    """
    cert_file = data_path + '/%s/%d.crt' % (mid_path, cert_id)
    print(cert_file)
    import OpenSSL
    if os.path.exists(cert_file):
        with open(cert_file, 'r') as fp:
            cert = fp.read()
        certobj = OpenSSL.crypto.load_certificate(
            OpenSSL.SSL.FILETYPE_PEM, cert)
        if mid_path == 'client_certs':
            print(certobj.get_extension(certobj.get_extension_count() - 1))
        print(_("Fingerprint = "), certobj.digest('SHA1'))
        print(_("Serial number = "), certobj.get_serial_number())
        issuer = certobj.get_issuer().get_components()
        print('\n' + _("Issuer"))
        for item in issuer:
            print("%s : %s" % (item[0], item[1]))
        subject = certobj.get_subject().get_components()
        print('\n' + _("Subject"))
        for item in subject:
            print("%s : %s" % (item[0], item[1]))
        _print_request_origin(serv_certbase, cert_id)
    else:
        print(_("Certificate not found!"))
    print("\n###################################################\n")
    # Detailed view server request
    req_file = data_path + '/%s/%d.csr' % (mid_path, cert_id)
    print(req_file)
    if os.path.exists(req_file):
        with open(req_file, 'r') as fp:
            request = fp.read()
        reqobj = OpenSSL.crypto.load_certificate_request(
            OpenSSL.SSL.FILETYPE_PEM, request)
        subject = reqobj.get_subject().get_components()
        print('\n' + _("Subject"))
        for item in subject:
            print("  %s : %s" % (item[0], item[1]))
        _print_request_origin(serv_certbase, cert_id)
    else:
        print(_("Request for signature not found!"))
    return 0
# view servers signed certificates
def view_signed_cert(args, serv_certbase, data_path):
    """List server certificates/requests or show one of them in detail.

    With ``args.cert_id == 'all'`` every ID of ``serv_certbase`` (created
    empty when missing) is checked for a certificate and a request file
    below ``data_path``/server_certs and the existing ones are listed.
    Otherwise ``args.cert_id`` must be a number and the details of that
    certificate/request are printed.

    Returns 1 when the ID is neither a number nor 'all', 0 otherwise.
    """
    cert_id = args.cert_id
    if cert_id == 'all':
        if not os.path.exists(serv_certbase):
            with open(serv_certbase, "w"):
                pass
        count = 0
        with open(serv_certbase) as fd:
            lines = fd.read().splitlines()
        for line in lines:
            words = line.split()
            # Blank lines carry no ID.
            if not words:
                continue
            cert = '%s/server_certs/%s.crt' % (data_path, words[0])
            req = '%s/server_certs/%s.csr' % (data_path, words[0])
            has_cert = os.path.exists(cert)
            if has_cert:
                # Translated like the same label in view_client_request.
                print(_('Certificate \t'), cert)
                count += 1
            if os.path.exists(req):
                if has_cert:
                    print(_('Request \t%s') % req)
                else:
                    print(_('Request \t%s: not signed') % req)
                count += 1
        if not count:
            print(_("Certificate or request not found!"))
        return 0
    try:
        cert_id = int(cert_id)
    except (TypeError, ValueError):
        print(_("certificate (request) number not int and not 'all'"))
        return 1
    view_signed_cert_info(cert_id, serv_certbase, data_path, 'server_certs')
    return 0
# View clients requests on server
def view_client_request(args, client_certbase, data_path):
    """List client certificates/requests or show one of them in detail.

    With ``args.req_id == 'all'`` every ID of ``client_certbase`` (created
    empty when missing) is checked for a certificate and a request file
    below ``data_path``/client_certs and the existing ones are listed.
    Otherwise ``args.req_id`` must be a number and the details of that
    certificate/request are printed.

    Returns 1 when the ID is neither a number nor 'all', 0 otherwise.
    """
    req_id = args.req_id
    if req_id == 'all':
        if not os.path.exists(client_certbase):
            with open(client_certbase, "w"):
                pass
        count = 0
        with open(client_certbase) as fd:
            lines = fd.read().splitlines()
        for line in lines:
            words = line.split()
            # Blank lines carry no ID.
            if not words:
                continue
            cert = '%s/client_certs/%s.crt' % (data_path, words[0])
            req = '%s/client_certs/%s.csr' % (data_path, words[0])
            has_cert = os.path.exists(cert)
            if has_cert:
                print(_('Certificate \t'), cert)
                count += 1
            if os.path.exists(req):
                if has_cert:
                    print(_('Request \t%s') % req)
                else:
                    print(_('Request \t%s: not signed') % req)
                count += 1
        if not count:
            print(_("Certificate or request not found!"))
        return 0
    try:
        req_id = int(req_id)
    except (TypeError, ValueError):
        print(_("certificate (request) number not int and not 'all'"))
        return 1
    view_signed_cert_info(req_id, client_certbase, data_path, 'client_certs')
    return 0
# refuse to sign request
def del_request(id_del_serv_req, id_del_client_req, serv_certbase,
                client_certbase, data_path):
    """Refuse a signing request: drop it from the database and from disk.

    A truthy ``id_del_serv_req`` selects the server store
    (``serv_certbase``, ``data_path``/server_certs); otherwise
    ``id_del_client_req`` and the client store are used. The matching
    database line is removed and the <id>.csr / <id>.crt files are
    deleted. When a certificate already exists the operator is asked for
    confirmation first.

    Returns 1 when the ID is not an integer or neither file exists, 0
    when the operator declines, None after deleting.
    """
    if id_del_serv_req:
        sub_path = 'server_certs'
        id_del_req = id_del_serv_req
        certbase = serv_certbase
    else:
        sub_path = 'client_certs'
        id_del_req = id_del_client_req
        certbase = client_certbase
    try:
        int(id_del_req)
    except (TypeError, ValueError):
        print(_("The ID must be int."))
        return 1
    # Database words are strings; normalise so an int ID matches too.
    id_del_req = str(id_del_req)
    request = os.path.join(data_path, sub_path, '%s.csr' % id_del_req)
    cert = os.path.join(data_path, sub_path, '%s.crt' % id_del_req)
    # check that the request and certificate files exist
    print(request)
    if not os.path.exists(request) and not os.path.exists(cert):
        print(_("Request or certificate with ID = %s not found!") % id_del_req)
        return 1
    if not os.path.exists(request):
        print(_("request %s not found!") % request)
    if os.path.exists(cert):
        print(_("Request duly signed"))
        ask = input(
            _("Delete the certificate and the signature request? y/[n]: "))
        if ask.lower() not in ['y', 'yes']:
            print(_("Not deleted"))
            return 0
    if not os.path.isfile(certbase):
        with open(certbase, 'w'):
            pass
    with open(certbase) as fd:
        lines = fd.read().splitlines()
    kept = []
    for line in lines:
        words = line.split()
        # Blank lines have no ID to compare and are kept as they are.
        if words and words[0] == id_del_req:
            continue
        kept.append(line + '\n')
    with open(certbase, 'w') as fd:
        fd.writelines(kept)
    try:
        if os.path.exists(request):
            os.unlink(request)
            print(_("signature request deleted"))
        if os.path.exists(cert):
            os.unlink(cert)
            print(_("certificate deleted"))
    except OSError:
        print(_("failed to delete the file!"))
    return None
def revoke_signed_cert(revoke_cert_id, data_path, cert_path):
    """Revoke a signed server certificate and regenerate the CRL.

    ``revoke_cert_id == 'rm'`` deletes the CRL database directory content
    and the CRL file instead. Otherwise ``revoke_cert_id`` must be the
    number of a certificate in ``data_path``/server_certs; an openssl CA
    config is (re)written, the certificate is revoked with ``openssl ca``
    and the resulting CRL is generated and printed.

    Returns 0 in the 'rm' mode, 1 when the ID is not an integer or the
    certificate is missing, None after running openssl.
    """
    CRL = data_path + '/server_certs/ca.crl'
    CRL_mid_dir = "/server_certs/CRL/"
    CRL_db_dir = data_path + CRL_mid_dir
    if revoke_cert_id == 'rm':
        if os.path.exists(CRL_db_dir):
            for filename in glob.glob(CRL_db_dir + "*"):
                os.unlink(filename)
        if os.path.exists(CRL):
            os.unlink(CRL)
            print(_("CRL deleted"))
            return 0
        print(_("CRL does not exist"))
        return 0
    try:
        int(revoke_cert_id)
    except (TypeError, ValueError):
        print(_("The ID of a certificate to be revoked must be integer!"))
        return 1
    cert_file = data_path + "/server_certs/%s.crt" % revoke_cert_id
    if not os.path.exists(cert_file):
        print(_("Certificate %s not found") % cert_file)
        return 1
    if not os.path.exists(CRL_db_dir):
        os.makedirs(CRL_db_dir)
    index_file = CRL_db_dir + 'index'
    serial_file = CRL_db_dir + 'serial'
    # openssl needs both files to exist, even if empty.
    for required in (index_file, serial_file):
        if not os.path.exists(required):
            with open(required, 'w'):
                pass
    default_crl_days = 14
    conf_file = data_path + "/server_certs/ca.config"
    # NOTE(review): default_md = md5 is a weak digest and may be refused by
    # current OpenSSL builds - kept unchanged, confirm before switching.
    content_conf = ("[ ca ]\ndefault_ca = CA_CLIENT\n"
                    "[ CA_CLIENT ]\n"
                    "dir = %s\n"
                    "database = %s\n"
                    "serial = %s\n"
                    "certificate = %s\n"
                    "private_key = %s\n"
                    "default_crl_days = %d\n"
                    "default_md = md5\n") % (
                       CRL_db_dir, index_file,
                       serial_file, cert_path + '/root.crt',
                       cert_path + '/root.key',
                       default_crl_days)
    # The config is rewritten on every call so that it matches the paths.
    with open(conf_file, 'w') as fd:
        fd.write(content_conf)

    def run_openssl(arguments):
        # Argument list instead of a shell string: paths with spaces or
        # shell metacharacters are passed through verbatim.
        try:
            subprocess.call(["openssl"] + arguments)
        except OSError as error:
            # openssl could not be started; report and carry on.
            print(error, file=sys.stderr)

    run_openssl(["ca", "-config", conf_file, "-revoke", cert_file])
    run_openssl(["ca", "-gencrl", "-config", conf_file, "-out", CRL])
    print()
    run_openssl(["crl", "-text", "-noout", "-in", CRL])
    return None
#################################################################
def parse(full=False):
    """Build the command-line argument parser of the core utility.

    :param full: when true, the server/certificate management options are
        added in addition to the common ones.
    :return: the configured ``argparse.ArgumentParser``. It is created
        with ``add_help=False``; ``-h/--help`` is a plain boolean flag.

    NOTE(review): the file this was recovered from had lost its
    indentation, so which options sit under each ``if full:`` below is a
    reconstruction - confirm against the repository.
    """
    import argparse
    from .methods_func import RawAndDefaultsHelpFormatter
    parser = argparse.ArgumentParser(
        add_help=False, formatter_class=RawAndDefaultsHelpFormatter)
    parser.add_argument(
        '-h', '--help', action='store_true', default=False,
        dest='help', help=_("show this help message and exit"))
    # Server start-up and certificate viewing/bootstrap options.
    if full:
        parser.add_argument(
            '--start', action='store_true', default=False, dest='start',
            help=_('server started'))
        parser.add_argument(
            '--port', type=int, dest='port',
            help=_('port number'))
        parser.add_argument(
            '--pid-file', type=str, dest='pidfile',
            help=_('Specify the location of the PID file'))
        parser.add_argument(
            '--close-on-inactive', action='store_true', dest='inactiveclose',
            default=False,
            help=_('stop the server if there are no active sessions'))
        parser.add_argument(
            '--cert', type=str, dest='Id',
            help=_('operations with certificates (number or "all"). Server not'
                   ' running.'))
        parser.add_argument(
            '--show-request', type=str, dest='req_id',
            help=_("view client requests (number or \"all\")"))
        parser.add_argument(
            '--sc', '--server-cert', type=str, dest='cert_id',
            help=_('view servers certificates (number or "all"). '
                   'Server not running.'))
        parser.add_argument(
            '-b', '--bootstrap', action='store_true', default=False,
            dest='bootstrap_user_name', help=_('bootstrap action'))
        parser.add_argument(
            '-u', '--create-localuser-cert', type=str, dest='cert_user_name',
            help=_('create certificate for local user'))
        parser.add_argument(
            '--clear-localuser-cert', action='store_true', default=False,
            dest='clear_user_cert',
            help=_('remove all local user certificates'))
        parser.add_argument(
            '--remove-certs', action='store_true', default=False,
            dest='remove_certificates',
            help=_('use with option --bootstrap to remove '
                   'all certificates, requests and config files on the server'))
        parser.add_argument(
            '--dump', action='store_true', default=False, dest='dump',
            help=_('dump (use with option -c [ID])'))
    parser.add_argument(
        '--db', '--debug', action='store_true', default=False, dest='debug',
        help=_('debug'))
    # Certificate rights, signing, refusal and revocation options.
    if full:
        parser.add_argument(
            '--rm', '--remove', action='store_true', default=False,
            dest='remove',
            help=_('remove the selected certificate (use with option -c [ID])'))
        parser.add_argument(
            '--right-add', type=str, dest='right_add',
            help=_('add permissions for a certificate (or certificates, '
                   'comma-separated) (use with option -c [ID])'))
        parser.add_argument(
            '--right-del', type=str, dest='right_del', help=_(
                'remove permissions for a certificate '
                '(use with option -c [ID])'))
        parser.add_argument(
            '--gen-root-cert', action='store_true', default=False,
            dest='gen_root_cert', help=_('generate a root (or CA) certificate'))
        parser.add_argument(
            '--gen-cert-by', type=str, dest='host',
            help=_("send a signature request for the root certificate"))
        parser.add_argument(
            '--get-cert-from', type=str, dest='root_host',
            help=_('fetch the signed certificate from the server'))
        parser.add_argument(
            '--use-root-as-server', action='store_true', default=False,
            dest='use_root_cert', help=_('use the root certificate as the '
                                         'server certificate'))
        parser.add_argument(
            '--sign-client', type=str, dest='id_client_req',
            help=_("sign the client's request with the server certificate"))
        parser.add_argument(
            '--sign-server', type=str, dest='id_server_req',
            help=_("sign the server's request with the root certificate"))
        parser.add_argument(
            '--del-server-req', type=str, dest='id_del_req',
            metavar='id_req', help=_("refuse to sign the certificate at "
                                     "the server's request"))
        parser.add_argument(
            '--del-client-req', type=str, dest='id_del_client_req',
            metavar='id_req', help=_("decline client's request for signature"))
        parser.add_argument(
            '--rv', '--revoke-cert', type=str, dest='revoke_cert_id', help=_(
                'revoke an earlier signed server (or CA) certificate. '
                'Run rm to remove the CRL'))
    # Method invocation and output options.
    parser.add_argument(
        '--method', type=str, dest='method', help=_('call method'))
    parser.add_argument(
        '--list-methods', action='store_true', default=False,
        dest='list_methods', help=_('display all available methods'))
    parser.add_argument(
        '--no-progress', action='store_true', default=False,
        dest='no_progress', help=_('do not display the progress bar'))
    parser.add_argument(
        '--gui-progress', action='store_true', default=False,
        dest='gui_progress', help=_("display the GUI progress bar"))
    parser.add_argument(
        '--gui-warning', action='store_true', default=False,
        dest='gui_warning', help=_('display warnings at the end'))
    parser.add_argument(
        '-f', '--force', action='store_true', default=False,
        dest='no_questions', help=_('silent during the process'))
    parser.add_argument(
        '-P', action='store_true', default=False,
        dest='stdin_passwd',
        help=_('use passwords from standard input for users accounts'))
    # Maintenance options.
    if full:
        parser.add_argument(
            '--check', action='store_true', default=False, dest='check',
            help=_('configuration check'))
        parser.add_argument(
            '--create-symlink', action='store_true', default=False,
            dest='create_symlink', help=_("Create symlinks for methods"))
        parser.add_argument(
            '--log-path', type=str, dest='log_path',
            help=_('path to log files'))
    parser.add_argument(
        '--version', action='store_true', default=False,
        dest='version', help=_('print the version number, then exit'))
    return parser