test bootstrap

master3.3
Спиридонов Денис 12 years ago
parent 488b8caa13
commit f15b3be39b

@ -63,7 +63,7 @@ def client_post_cert (client):
#Creation of secret key of the client
def new_key_req(key, cert_path, server_host_name):
def new_key_req(key, cert_path, server_host_name, auto = False):
from create_cert import generateRSAKey, makePKey, makeRequest,\
passphrase_callback
rsa = generateRSAKey()
@ -74,11 +74,14 @@ def new_key_req(key, cert_path, server_host_name):
pkey.save_key(key,\
cipher=None, callback=passphrase_callback)
req = makeRequest(rsa, pkey, server_host_name)
req = makeRequest(rsa, pkey, server_host_name, auto)
crtreq = req.as_pem()
crtfile = open(cert_path + '/%s.csr' %server_host_name, 'w')
req_file = cert_path + '/%s.csr' %server_host_name
crtfile = open(req_file, 'w')
crtfile.write(crtreq)
crtfile.close()
return req_file
def delete_old_cert(client):
os.unlink(client.CERT_FILE)
@ -86,7 +89,7 @@ def delete_old_cert(client):
os.unlink(client.PKEY_FILE)
os.unlink(client.PubKEY_FILE)
def client_post_request (cert_path, args):
def client_post_request (cert_path, by_host):
if os.path.exists(cert_path + 'req_id'):
print _("You have sent a request to sign the certificate.")
print _("request id = %s") %open(cert_path + 'req_id', 'r').read()
@ -100,7 +103,7 @@ def client_post_request (cert_path, args):
except:
print _('Port must be int')
return 1
url = "https://%s:%d/?wsdl" %(args.by_host, port)
url = "https://%s:%d/?wsdl" %(by_host, port)
print _("%s\nconnect...") % url
from suds.client import Client
#try:
@ -260,7 +263,6 @@ def client_get_cert(cert_path, args):
else:
print _("file with ca certificates exists")
return 0
def client_post_auth(client):
""" authorization client or post request """

@ -397,7 +397,7 @@ def main():
getCRL.start()
if args.by_host:
client_post_request (path_to_cert, args)
client_post_request (path_to_cert, args.by_host)
return 0
if args.from_host:
client_get_cert (path_to_cert, args)

@ -18,6 +18,7 @@ import urllib, sys, getopt, os, shutil
import socket
from M2Crypto import RSA, X509, EVP, m2, Rand, Err
from calculate.lib.datavars import DataVars
import gettext
def passphrase_callback(v):
return None
@ -30,16 +31,28 @@ def makePKey(key):
pkey.assign_rsa(key)
return pkey
def makeRequest(pubkey, pkey, serv_host):
def makeRequest(pubkey, pkey, serv_host, auto = False):
""" create query to the signing on server """
req = X509.Request()
# Seems to default to 0, but we can now set it as well, so just API test
req.set_version(req.get_version())
req.set_pubkey(pkey)
name = X509.X509_Name()
c = raw_input (_("Enter certificate data by hand? y/[n]: "))
if auto:
c = 'n'
else:
c = raw_input (_("Enter certificate data by hand? y/[n]: "))
# Get HostName
host_name = socket.getfqdn()
list_host_name = host_name.split('.')
result_host_name = list_host_name[0]+"@"+serv_host
# Get username
clVars = DataVars()
clVars.flIniFile()
username = clVars.Get('ur_fullname')
# Get language
lang = gettext.locale.getdefaultlocale()[0][:2]
if c.lower() in ['y', 'yes']:
host_name = socket.getfqdn()
#if serv_host in host_name:
#host_name = host_name.replace('.'+serv_host, '')
#list_host_name = host_name.split('.')
@ -47,15 +60,10 @@ def makeRequest(pubkey, pkey, serv_host):
#list_host_name[len(list_host_name)-1]+"@"+serv_host
#else:
#host_name = socket.getfqdn()
list_host_name = host_name.split('.')
result_host_name = list_host_name[0]+"@"+serv_host
name.CN = raw_input (_('Host Name [%s] : ') %result_host_name)
if name.CN in ['', None]:
name.CN = result_host_name
clVars = DataVars()
clVars.flIniFile()
username = clVars.Get('ur_fullname')
name.OU = raw_input (_('User Name [%s]: ') %username)
if name.OU in ['', None]:
name.OU = username
@ -63,14 +71,14 @@ def makeRequest(pubkey, pkey, serv_host):
name.L = raw_input (_('Network address (hostname or IP) [%s]: ')\
%host_name)
name.ST = raw_input (_('State Name: '))
name.C = raw_input (_('Country: '))
name.C = raw_input (_('Country: [%s]') %lang)
else:
name.CN = 'calculate' # Имя сертификата (Common Name);
name.OU = 'My Unit' # Название отдела (Organization Unit);
name.CN = result_host_name # Имя сертификата (Common Name);
name.OU = username # Название отдела (Organization Unit);
name.O = 'My Company'# Название организации (Organization Name);
name.L = 'My City' # Название города (Locality Name);
name.L = host_name # Название города (Locality Name);
name.ST = 'My State'# Название региона (State Name);
name.C = 'En' # Двухсимвольный код страны (Country);
name.C = lang # Двухсимвольный код страны (Country);
req.set_subject_name(name)
ext1 = X509.new_extension('Comment', 'Auto Generated')

@ -0,0 +1,194 @@
#!/usr/bin/python
#-*- coding: utf-8 -*-
# Copyright 2012 Calculate Ltd. http://www.calculate-linux.org
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from cert_cmd import check_server_certificate, sing_req_by_server
import post_request
from calculate.api.client.cert_func import new_key_req
from calculate.api.client.function import get_ip_mac_type
from calculate.lib.utils import ip as ip_mod
def init(cert, key, cert_path, data_path, certbase, args, port):
if check():
for step in range (2):
args = change_args(args, step)
create_server_cert(cert, key, cert_path, args, port)
create_client_cert(cert, cert_path, data_path, certbase, port)
def change_args(args, step = None):
if step == 0:
args.host = False
args.gen_root_cert = True
args.root_host = False
args.use_root_cert = False
elif step == 1:
args.gen_root_cert = False
args.use_root_cert = True
return args
def create_server_cert(cert, key, cert_path, args, port):
check_server_certificate(cert, key, cert_path, args, port, auto = True)
def create_client_cert(server_cert, cert_path, data_path, certbase):
req_id = create_request(server_cert, cert_path, data_path, certbase)
sign_certificate(req_id, cert_path, data_path)
get_certificate(cert_path, data_path, certbase)
def create_request(server_cert, cert_path, data_path, certbase):
server_host_name = 'localhost'
#key = cert_path + server_host_name + '.key'
#csr_file = cert_path + server_host_name +'.csr'
#pritn 'request file = ', csr_file
client_req_file = new_key_req(key, cert_path, server_host_name, auto = True)
ip, mac, client_type = get_ip_mac_type()
data = open(client_req_file).read()
req_id = post_request.serv_post_client_request (data, data_path, ip, mac, \
client_type, certbase, cert_path)
fc = open(cert_path + 'req_id', 'w')
fc.write(res)
fc.close()
return req_id
def sign_certificate(req_id, cert_path, data_path):
sing_req_by_server(req_id, cert_path, data_path)
def get_ip_mac_type():
for Interfaces in ip_mod.getInterfaces():
try:
ip, mac, client_type = get_ip_mac_type(Interfaces, 'gui')
return (ip, mac, client_type)
except:
pass
return ('no_ip','no_mac', 'live')
def get_certificate(cert_path, data_path, certbase):
if not os.path.exists(cert_path + 'req_id'):
print _("request was not sent or deleted file %s") \
%(cert_path + 'req_id')
return 1
fc = open(cert_path + 'req_id', 'r')
req_id = fc.read()
fc.close()
server_host_name = 'localhost'
if not os.path.exists(cert_path + server_host_name + '.csr'):
print _('Request %s not found') %(cert_path + server_host_name + '.csr')
return 1
request = open(cert_path + server_host_name + '.csr').read()
md5 = hashlib.md5()
md5.update(request)
md5sum = md5.hexdigest()
result = post_request.serv_get_client_cert (req_id, request, data_path, \
certbase, cert_path)
cert = result[0][0]
ca_root = result[0][1]
if cert == '1':
print _('Request to sign is rejected!')
return 1
elif cert == '2':
print _("Request for the signing has not yet reviewed.")
print _("Your request id = %s") %req_id
return 1
elif cert == '3':
print _("Request on signature does not match sent earlier.")
return 1
elif cert == '4':
print _("Request was sent from another ip.")
return 1
fc = open(cert_path + server_host_name + '.crt', 'w')
fc.write(cert)
fc.close()
os.unlink(cert_path + 'req_id')
print 'OK. Certificate save. Your certificate id = %s' %req_id
if ca_root:
clVars = DataVarsApi()
clVars.importApi()
clVars.flIniFile()
system_ca_db = clVars.Get('cl_glob_root_cert')
if os.path.exists(system_ca_db):
if ca_root in open(system_ca_db, 'r').read():
return 0
cl_client_cert_dir = clVars.Get('cl_client_cert_dir')
homePath = clVars.Get('ur_home_path')
cl_client_cert_dir = cl_client_cert_dir.replace("~",homePath)
root_cert_md5 = cl_client_cert_dir + "/ca/cert_list"
md5 = hashlib.md5()
md5.update(ca_root)
md5sum = md5.hexdigest()
print "\n================================================="
print "md5sum = ", md5sum
if not os.path.exists(root_cert_md5):
fc = open(root_cert_md5,"w")
fc.close()
filename = None
with open(root_cert_md5) as fd:
t = fd.read()
# for each line
for line in t.splitlines():
# Split string into a words list
words = line.split(' ',1)
if words[0] == md5sum:
filename = words[1]
if not filename:
certobj = OpenSSL.crypto.load_certificate \
(OpenSSL.SSL.FILETYPE_PEM, ca_root)
Issuer = certobj.get_issuer().get_components()
for item in Issuer:
if item[0] == 'CN':
filename = item[1]
fc = open(root_cert_md5,"a")
fc.write('%s %s\n' %(md5sum, filename))
fc.close()
if not filename:
print _('Not found field "CN" in certificate!')
return 1
fd = open(cl_client_cert_dir + '/ca/' + filename, 'w')
fd.write(ca_root)
fd.close()
user_root_cert = clVars.Get('cl_user_root_cert')
user_root_cert = user_root_cert.replace("~",homePath)
fa = open(user_root_cert, 'a')
fa.write(ca_root)
fa.close()
print _("filename = "), filename
print _("CERTIFICATE ADD")
else:
print _("file with ca certificates exists")
return 0

@ -39,12 +39,15 @@ def getHwAddr(ifname = 'eth0'):
return ''.join(['%02x:' % ord(char) for char in info[18:24]])[:-1]
# method for generating server certificates
def check_server_certificate(cert, key, cert_path, args, port):
def check_server_certificate(cert, key, cert_path, args, port, auto = False):
if not os.path.isdir(cert_path):
os.makedirs(cert_path)
# generate a root certificate
if args.gen_root_cert:
c = raw_input (_("Enter certificate data by hand? [y]/n: "))
if auto:
c = raw_input (_("Enter certificate data by hand? [y]/n: "))
else:
c = 'n'
from M2Crypto import X509
name = X509.X509_Name()
if c.lower() in ['n', 'no']:
@ -515,6 +518,7 @@ def view_cert_info(cert, cert_id, rights, group_rights):
open(group_rights, 'w')
with open(group_rights) as fd:
t = fd.read()
fd.close()
for line in t.splitlines():
words = line.split()
# first word in line equal name input method
@ -631,18 +635,18 @@ def view_cert(args, certbase, data_path, rights, group_rights):
return 0
# Sign client request by server certificate
def sing_req_by_server(args, cert_path, data_path):
def sing_req_by_server(id_client_req, cert_path, data_path, auto = False):
server_cert = cert_path + '/root.crt'
server_key = cert_path + '/root.key'
if args.id_client_req:
if id_client_req:
try:
int (args.id_client_req)
int (id_client_req)
except:
print _("Certificate number (id) must be int")
return 1
cl_req = data_path + '/client_certs/%s.csr' %args.id_client_req
cl_cert = data_path + '/client_certs/%s.crt' %args.id_client_req
cl_req = data_path + '/client_certs/%s.csr' %id_client_req
cl_cert = data_path + '/client_certs/%s.crt' %id_client_req
if not os.path.exists(cl_req):
print _("Signing Request %s not found") %cl_req
return 1
@ -651,7 +655,10 @@ def sing_req_by_server(args, cert_path, data_path):
print _("certificate %s already exists") %cl_cert
return 1
group = "group:%s" %raw_input(_("Enter Group new certificate: "))
if auto:
group = "group:all"
else:
group = "group:%s" %raw_input(_("Enter Group new certificate: "))
config = data_path + '/client_certs/ssl-client.cfg'
if os.path.exists(config):
os.unlink(config)
@ -1033,7 +1040,6 @@ def parse():
help=_('language for translate'))
parser.add_argument(
'-p', '--port', type=int, default = '8888', dest='port',
#dest='port', action='store_const', const=sum,
help=_('port number'))
parser.add_argument(
'-c', '--cert', type=str, dest='Id',
@ -1044,6 +1050,9 @@ def parse():
parser.add_argument(
'--sc', '--server-cert', type=str, dest='cert_id',
help=_('view servers certificates (number or "all"). Server not run'))
parser.add_argument(
'-b', '--bootstrap', action='store_true', default=False,
dest = 'bootstrap', help=_('bootstrap action'))
parser.add_argument(
'-d', '--dump', action='store_true', default=False, dest = 'dump',
help=_('dump (using with -c [ID])'))

@ -29,11 +29,12 @@ import threading
from calculate.lib.datavars import DataVars
from calculate.api.datavars import DataVarsApi
from calculate.api.server.clean import clean
import calculate.api.server.cert_cmd as cert_cmd
from clean import clean
import cert_cmd
import bootstrap
from calculate.api.server.func import initialization
from calculate.api.server.server_class import ClApplication
from func import initialization
from server_class import ClApplication
class OpenSSLAdapter (pyOpenSSLAdapter):
@ -168,7 +169,9 @@ def main(*args, **keywords):
''' view information about client certificates '''
if args.version:
print cl_ver
return 0
return 0
if args.bootstrap:
bootstrap.init(cert, key, cert_path, data_path, certbase, args, port)
if args.revoke_cert_id:
cert_cmd.revoke_signed_cert(args.revoke_cert_id, data_path, cert_path)
return 0
@ -176,7 +179,7 @@ def main(*args, **keywords):
cert_cmd.check_server_certificate(cert, key, cert_path, args, port)
return 0
if args.id_client_req:
cert_cmd.sing_req_by_server(args, cert_path, data_path)
cert_cmd.sing_req_by_server(args.id_client_req, cert_path, data_path)
return 0
if args.Id:
cert_cmd.view_cert(args, certbase, data_path, rights, group_rights)

@ -0,0 +1,229 @@
##-*- coding: utf-8 -*-
## Copyright 2010-2012 Calculate Ltd. http://www.calculate-linux.org
##
## Licensed under the Apache License, Version 2.0 (the "License");
## you may not use this file except in compliance with the License.
## You may obtain a copy of the License at
##
## http://www.apache.org/licenses/LICENSE-2.0
##
## Unless required by applicable law or agreed to in writing, software
## distributed under the License is distributed on an "AS IS" BASIS,
## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
## See the License for the specific language governing permissions and
## limitations under the License.
#import soaplib, sys, time, os
#import threading
#from soaplib.serializers.primitive import String, Integer, Any, Boolean
#from soaplib.serializers.clazz import Array, ClassSerializer
#from soaplib.service import rpc, DefinitionBase
#from calculate.api.server.api_types import ReturnedMessage, getViewForVariables
#from calculate.api.server.api_types import ChoiceValue, Table, Option, Field, \
#GroupField, ViewInfo
#from calculate.lib.datavars import VariableError
##from cl_install import Install,DataVarsInstall
#import cl_install
#from calculate.lib.cl_lang import setLocalTranslate
#from calculate.api.server.decorators import Dec
#setLocalTranslate('cl_install',sys.modules[__name__])
#import traceback
#class InstallInfo(ClassSerializer):
#"""Parameters for method install"""
#request_id = String
#request_group = String
#def catchExcept():
#class wrapper:
#def __init__(self,f):
#self.f = f
#self.func_name = f.func_name
#self.func_code = f.func_code
#self.__doc__ = f.__doc__
#self.__name__ = f.__name__
#def __call__(self,selfobj,*args,**kwargs):
#try:
#return self.f(selfobj,*args,**kwargs)
#except BaseException as e:
#view = ViewInfo(groups=[])
#group = GroupField(name=_("Error"),last=True)
#group.fields = []
#group.fields.append(Field(
#name = "error",
#label = str(e),
#default = 'color:red;',
#element = "error"))
#view.groups.append(group)
#print "!!!!EXCEPTION!!!!"
#for i in apply(traceback.format_exception, sys.exc_info()):
#print i
#return view
#return wrapper
#class Wsdl:
#def check_params (self, dv, info,allvars=False,ordered=None):
#errors = []
#keys = sorted(filter(lambda x:x.lower() == x,
#info._type_info.keys()))
#if ordered:
#keys = ordered + filter(lambda x:not x in ordered,
#keys)
#for var in keys:
## get value of variable from info
#val = info.__getattribute__(var)
## check value if value send of check allvariables
#if val != None or allvars:
#try:
## if value not send, then get from datavars
#if val == None:
#val = dv.Get(var)
#else:
#uncomperr = dv.Uncompatible(var)
#if uncomperr:
#raise VariableError(uncomperr)
#if not dv.Uncompatible(var):
#dv.Set(var, val)
#except VariableError, e:
#mess = ''
#messages = e.message if type(e.message) == list else [e.message]
#for error in messages:
#mess += str(error) + '\n'
#errors.append(ReturnedMessage(type = 'error', field = var,
#message = mess))
#return errors
#from calculate.api.server.baseClass import Basic
#from calculate.api.server.decorators import Dec
#def installCommon(self,sid,info,methodname,initfunc):
#"""
#Install common method
#"""
#try:
#dv = self.get_cache(sid,methodname,"vars")
#if not dv:
#reload(cl_install)
#dv = cl_install.DataVarsInstall()
#dv.importInstall()
#dv.flIniFile()
#initfunc(dv)
#errors = self.check_params(dv, info,
#ordered=['cl_autopartition_scheme',
#'cl_autopartition_device',
#'cl_autopartition_root_size',
#'cl_image_linux_shortname',
#'cl_image_arch_machine',
#'cl_image_linux_ver',
#'cl_image_linux_build'],
#allvars=not info.CheckOnly)
#if errors:
#return errors
#if info.CheckOnly:
#returnmess = ReturnedMessage(type = '', message = None)
#return [returnmess]
#install_meth = type("CommonInstall",(self.Common,
#cl_install.Install, object), {})
#pid = self.startprocess(sid, target=install_meth,
#method="installSystem",\
#args_proc = (dv,))
#returnmess = ReturnedMessage(type = 'pid', message = pid)
#returnmess.type = "pid"
#returnmess.message = pid
#dv = self.clear_cache(sid,methodname)
#return [returnmess]
#finally:
#if dv:
#self.set_cache(sid,methodname,"vars",dv,smart=False)
#return []
#@rpc(Integer, InstallInfo, _returns = Array(ReturnedMessage))
#@Dec.check_permissions(["install"])
#@Dec.console('cl-install')
#@Dec.gui(_('System'),_('Install'),'drive-harddisk')
#def install ( self, sid, info):
#def init_func(dv):
#dv.Set('cl_action','system',True)
#return self.installCommon(sid,info,'install',init_func)
#@rpc(Integer, Integer, Boolean,_returns = ViewInfo)
#@catchExcept()
#def install_view (self, sid, step,expert):
#curThread = threading.currentThread()
#dv = self.get_cache(sid,"install","vars")
#if not dv:
#reload(cl_install)
#dv = cl_install.DataVarsInstall()
#dv.importInstall()
#dv.flIniFile()
#dv.Set('cl_action','system',True)
#view = getViewForVariables (dv, [
#(_("Requests"), \
#('cl_req_id',),
#('cl_image_linux_shortname','cl_image_arch_machine',
#'cl_image_linux_ver','cl_image_linux_build'),
#_("Next")), \
#(_("Autopartition"), \
#('cl_autopartition_device',
#'cl_autopartition_scheme'),
#('cl_autopartition_table','cl_autopartition_root_size'),
#_("Next")), \
#(_("Partitioning"), \
#('os_location_data','os_install_scratch','cl_uuid_set'),
#('os_install_root_type','os_install_mbr',
#'os_install_kernel_scheduler'),
#_("Next")), \
#(_("Locale"), \
#('os_install_locale_lang','os_install_clock_timezone'),(), \
#_("Next")),
#(_("Networking"), \
#('os_install_net_conf','os_install_net_data','os_install_net_fqdn',
#'os_install_ntp'),('os_install_net_route_data',), \
#_("Next")),
#(_("Users"), \
#('cl_migrate_data','cl_autologin'),(), \
#_("Next")),
#(_("Video"), \
#('os_install_x11_video_drv', 'os_install_x11_composite',
#'os_install_x11_resolution', 'os_install_fb_resolution'),(), \
#_("Install")),
#],step,expert)
#self.set_cache(sid, 'install', "vars",dv,smart=False)
#return view
#@rpc(Integer, InstallInfo, _returns = Array(ReturnedMessage))
#@Dec.check_permissions(["install"])
#@Dec.gui(_('System'),_('Flash install'),
#'drive-removable-media-usb-pendrive,media-removable')
#def install_flash ( self, sid, info):
#"""
#Install to flash
#"""
#def init_func(dv):
#dv.Set('cl_action','system',True)
#dv.Set('cl_install_type','flash')
#return self.installCommon(sid,info,'install_flash',init_func)
#@rpc(Integer, Integer, Boolean,_returns = ViewInfo)
#@catchExcept()
#def install_flash_view (self, sid, step,expert):
#dv = self.get_cache(sid,"install_flash","vars")
#if not dv:
#reload(cl_install)
#dv = cl_install.DataVarsInstall()
#dv.importInstall()
#dv.flIniFile()
#dv.Set('cl_action','system',True)
#dv.Set('cl_install_type','flash')
#view = getViewForVariables (dv, [
#(_("Distribute"), \
#('os_install_disk_single','cl_image_filename'),
#('os_location_data',),
#_("Install")), \
#],step,expert)
#self.set_cache(sid, 'install_flash', "vars",dv,smart=False)
#return view

@ -334,7 +334,7 @@ class ApiWsdl:
if expert:
group.fields.append(Field(
name = "expert_open",
#label = _("Press for advanced settings..."),
label = _("Press for advanced settings..."),
type = "label",
opt = Option(longopt="--bot",
metavalue="BOT"),
@ -360,17 +360,17 @@ class ApiWsdl:
group.fields.append(Field(
name = "combovariable",
label = "check city number: ",
choice = ['Moscow-1','Piter-2','New York-3','London-4'],
comments = ['Mos','Pit','New Yo','Lond'],
choice = ['Moscow-1','Piter-2','---','New York-3','London-4'],
comments = ['Mos','Pit','---','New Yo','Lond'],
type = "str",
default = 'New York-3',
help = \
"you not human? This is a anti-bot test",
element = "comboEdit"))
element = "combo"))
group.fields.append(Field(
name = "combovariable2",
label = "check city number: ",
choice = ['Moscow','Piter','New York','London'],
choice = ['Moscow','Piter','New York','---','London'],
type = "str",
default = 'London',
help = \
@ -379,7 +379,7 @@ class ApiWsdl:
else:
group.fields.append(Field(
name = "expert_close",
#label = _("Press for advanced settings..."),
label = _("Press for advanced settings..."),
type = "label",
opt = Option(longopt="--bot",
metavalue="BOT"),

@ -18,6 +18,7 @@ from calculate.lib.datavars import ReadonlyVariable
from calculate.api.datavars import __version__,__app__
import api
import request
section = "api"

@ -0,0 +1,216 @@
#-*- coding: utf-8 -*-
# Copyright 2010-2012 Calculate Ltd. http://www.calculate-linux.org
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0 #
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# mode - read only or writeable variable
# value - default variable value
# select - list of posible values for variable
# hide - flag, if it is True, then the variable is not printable
# printval - print value of variable
from calculate.lib.datavars import Variable,ReadonlyVariable,VariableError
import os
import sys
from os import path
import OpenSSL
from calculate.lib.cl_lang import setLocalTranslate
from calculate.api.cert_cmd import find_id_cert
setLocalTranslate('cl_api',sys.modules[__name__])
class VariableClReqId(Variable):
"""
Certificate Identification
"""
type = "choice"
value = ""
opt = ["-r"]
metavalue = "REQ_ID"
def init(self):
self.help = _("Request Identification")
self.label = _("Request Identification")
def choice(self):
data_path = self.Get('cl_api_data')
result = []
cert_dir = data_path + '/client_certs/'
for filename in glob.glob(cert_dir+"*"):
if filename.endswith('.csr'):
result.append(filename.split('.')[0])
return result
def check(self, req_id):
try:
int(req_id)
except ValueError:
raise VariableError(_("Group %s does not exist") %group)
class VariableClReqBaseData(Variable):
"""
"""
def get(self):
req_id = self.Get('cl_req_id')
serv_certbase = self.Get('cl_api_serv_database')
for line in readFileLine(serv_certbase):
if line.split()[0] == req_id:
return line.strip().split()
return ['']*7
class VariableClReqData(Variable):
"""
"""
def get(self):
req_id = self.Get('cl_req_id')
data_path = self.Get('cl_api_data')
req_file = data_path + '/client_certs/%s.csr' %req_id
if os.path.exists(req_file):
fp = open(req_file, 'r')
request = fp.read()
fp.close()
reqobj = OpenSSL.crypto.load_certificate_request \
(OpenSSL.SSL.FILETYPE_PEM, request)
Subject = reqobj.get_subject().get_components()
return Subject
return [['','']]*6
class VariableClReqIp(Variable):
"""
Ip Request
"""
def init(self):
self.help = _("Request Ip adress")
self.label = _("Request Ip")
def get(self):
return self.Get('cl_req_base_data')[4]
class VariableClReqMac(Variable):
"""
Mac Adress Request
"""
def init(self):
self.help = _("Request Mac adress")
self.label = _("Request Mac")
def get(self):
return self.Get('cl_req_base_data')[5]
class VariableClReqDate(Variable):
"""
Date send Request
"""
def init(self):
self.help = _("Request Date")
self.label = _("Request Date")
def get(self):
words = self.Get('cl_req_base_data')
return '%s %s' %(words[2], words[3])
class VariableClReqUserName(Variable):
"""
UserName Owner Request
"""
def init(self):
self.help = _("Request Owner UserName")
self.label = _("Request Owner UserName")
def get(self):
Subject = self.Get('cl_req_data')
for item in Subject:
if item[0] == 'OU':
return item[1]
return ''
class VariableClReqLocation(Variable):
"""
Location Owner Request
"""
def init(self):
self.help = _("Request Location")
self.label = _("Request Location")
def get(self):
Subject = self.Get('cl_req_data')
for item in Subject:
if item[0] == 'L':
return item[1]
return ''
class VariableClReqGroup(Variable):
"""
Certificate Group
"""
type = "choice"
value = ""
opt = ["-g"]
metavalue = "REQ_GROUP"
def init(self):
self.help = _("set certificate group")
self.label = _("Certificate group")
def choice(self):
group_rights = self.Get('cl_api_group_rights')
t = open(group_rights, 'r').read()
result = []
for line in t.splitlines():
words = line.split()
if not words[0].startswith('#'):
result.append(words[0])
return result
def check(self, group):
group_rights = self.Get('cl_api_group_rights')
t = open(group_rights, 'r').read()
for line in t.splitlines():
words = line.split()
if group == words[0]:
return
raise VariableError(_("Group %s does not exist") %group)
class VariableClCertActive(Variable):
"""
Certificate Identification
"""
type = "choice"
value = ""
opt = ["-c"]
metavalue = "CERT_ID"
def init(self):
self.help = _("set certificate group")
self.label = _("Certificate group")
def choice(self):
return [""]+self.Get('os_device_dev')
def check(self,value):
if self.Get('cl_autopartition_scheme') and not value:
raise VariableError(_("For autopartition need select install device"))
Loading…
Cancel
Save