You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
calculate-utils-3-core/core/server/certificate.py

287 lines
12 KiB

#-*- coding: utf-8 -*-
# Copyright 2010-2012 Calculate Ltd. http://www.calculate-linux.org
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import sys, os
import subprocess
import OpenSSL
from soaplib.serializers.primitive import String, Integer, Boolean
from soaplib.serializers.clazz import Array, ClassSerializer
from soaplib.service import rpc
from calculate.core.server.api_types import ReturnedMessage
from calculate.core.server.api_types import (Field, GroupField,
ViewInfo, ViewParams)
from calculate.lib.datavars import VariableError
from calculate.core import datavars
import traceback
class CertificateInfo(ClassSerializer):
    """Serialized argument of the ``view_cert`` RPC method."""

    # Certificate selector sent by the caller; ``view_cert`` copies it into
    # the ``cl_cert_id`` variable (its error text asks for an int or "all").
    cl_cert_id = String
def catchExcept():
    """Build a decorator that converts exceptions into an error view.

    The returned ``wrapper`` class wraps a callable ``f`` and copies its
    name, code object and docstring. Calling the wrapper forwards to ``f``;
    if ``f`` raises, the traceback is printed to stdout and a ``ViewInfo``
    with a single "Error" group (one field whose label is the exception
    text) is returned instead of letting the exception propagate.
    """
    class wrapper:
        def __init__(self, f):
            self.f = f
            # Python 2 functions expose func_name/func_code; fall back to
            # __name__/__code__ when the old spellings are missing.
            self.func_name = getattr(f, 'func_name', f.__name__)
            self.func_code = getattr(f, 'func_code', None) or f.__code__
            self.__doc__ = f.__doc__
            self.__name__ = f.__name__

        def __call__(self, selfobj, *args, **kwargs):
            try:
                return self.f(selfobj, *args, **kwargs)
            # NOTE(review): BaseException also covers KeyboardInterrupt and
            # SystemExit; kept exactly as the original catches it.
            except BaseException as e:
                view = ViewInfo(groups=[])
                group = GroupField(name=_("Error"), last=True)
                group.fields = []
                group.fields.append(Field(
                    name="error",
                    label=str(e),
                    default='color:red;',
                    element="error"))
                view.groups.append(group)
                print("!!!!EXCEPTION!!!!")
                # Star-call replaces the deprecated apply() builtin.
                for line in traceback.format_exception(*sys.exc_info()):
                    print(line)
                return view
    return wrapper
class CoreWsdl:
def view_cert_meth(self, dv):
    """Print the rights and identity data of the selected certificate(s).

    ``cl_cert_id`` from ``dv`` selects what to show: a numeric value picks
    one certificate, the literal ``'all'`` picks every ID listed in
    ``cl_all_cert_id``. Each certificate is loaded from
    ``<cl_core_data>/client_certs/<id>.crt``. Its groups are taken from the
    certificate's last extension (the text after the first ``:``, split on
    commas).

    A certificate in group ``all`` may run every method unless the
    group-rights file (``cl_core_group_rights``) has a line redefining
    ``all``; otherwise the permitted methods are computed from the
    group-rights and rights (``cl_core_rights``) files. Methods, groups,
    SHA1 fingerprint, serial number, issuer and subject are emitted through
    ``self.printSUCCESS``.

    Returns True when the report finished or stopped after reporting an
    error through ``self.printERROR``; returns False on KeyboardInterrupt.
    """
    try:
        cert_id = dv.Get('cl_cert_id')
        data_path = dv.Get('cl_core_data')
        group_rights = dv.Get('cl_core_group_rights')
        rights = dv.Get('cl_core_rights')

        # Put the data into the process dictionary
        self.briefParams('cert_view')

        if cert_id != 'all':
            try:
                cert_id_list = [int(cert_id)]
            except (TypeError, ValueError):
                self.printERROR("Error input certificate ID!")
                return True
        else:
            cert_id_list = dv.Get('cl_all_cert_id')

        # Convert every ID up front so a malformed entry fails before any
        # output is produced.
        for cert_id in [int(x) for x in cert_id_list]:
            cert_file = data_path + '/client_certs/%s.crt' % str(cert_id)
            if not os.path.exists(cert_file):
                self.printERROR("Certificate not found in server!")
                return True
            with open(cert_file, 'r') as cert_fd:
                cert = cert_fd.read()

            certobj = OpenSSL.crypto.load_certificate(
                OpenSSL.SSL.FILETYPE_PEM, cert)
            last_extension = certobj.get_extension(
                certobj.get_extension_count() - 1)
            com = last_extension.get_data()
            groups_list = com.split(':')[1].split(',')

            group_lines = self._read_group_rights(group_rights)

            # Group "all" grants everything unless the group-rights file
            # redefines "all"; blank lines are ignored by the [:1] slice.
            unrestricted = 'all' in groups_list and not any(
                line.split()[:1] == ['all'] for line in group_lines)

            if unrestricted:
                self.printSUCCESS(
                    _("Certificate %d can run all methods!") % cert_id)
            else:
                results = self._cert_methods(cert_id, groups_list,
                                             group_lines, rights)
                if not results:
                    results.append("No Methods")
                self.printSUCCESS(_("Certificate %d can run: ") % cert_id)
                for name in results:
                    self.printSUCCESS(name)

            b = '<b>'
            _b = '</b>'
            self.printSUCCESS(b + _('Certificate groups') + ':' + _b + '\n' +
                              ', \n'.join(groups_list))
            self.printSUCCESS(b + _("Fingerprint = ") + _b +
                              certobj.digest('SHA1'))
            self.printSUCCESS(b + _("Serial Number = ") + _b +
                              str(certobj.get_serial_number()))
            self.printSUCCESS(' ')

            self.printSUCCESS(b + _("\nIssuer") + _b)
            for item in certobj.get_issuer().get_components():
                self.printSUCCESS("%s : %s" % (item[0], item[1]))

            self.printSUCCESS(b + _("\nSubject") + _b)
            for item in certobj.get_subject().get_components():
                self.printSUCCESS("%s : %s" % (item[0], item[1]))

            self.printSUCCESS(' ')
            self.printSUCCESS(' ')
        return True
    # Handle the process interruption signal
    except KeyboardInterrupt:
        # False must be returned so the data about the process is kept
        return False

def _read_group_rights(self, group_rights):
    """Return the lines of the group-rights file, creating it if absent."""
    if not os.path.exists(group_rights):
        # The original code also left an empty file behind in this case;
        # the handle is now closed immediately instead of leaking.
        open(group_rights, 'w').close()
    with open(group_rights) as fd:
        return fd.read().splitlines()

def _cert_methods(self, cert_id, groups_list, group_lines, rights):
    """Return the methods certificate ``cert_id`` may run, without repeats.

    Methods come from every group-rights line whose first word is one of
    ``groups_list``, plus every line of the ``rights`` file that lists
    ``cert_id``; a line listing ``-cert_id`` removes that method again.
    First-seen order is preserved.
    """
    granted = []
    for line in group_lines:
        words = line.split(' ', 1)
        # A group line without a method list grants nothing.
        if len(words) == 2 and words[0] in groups_list:
            for name in words[1].split(','):
                name = name.strip()
                if name not in granted:
                    granted.append(name)

    denied = set()
    with open(rights) as fr:
        rights_lines = fr.read().splitlines()
    for line in rights_lines:
        words = line.split()
        if not words:
            continue
        meth = words[0]
        for word in words:
            try:
                number = int(word)
            except ValueError:
                continue
            # compare with the certificate number
            if cert_id == number and meth not in granted:
                granted.append(meth)
            if cert_id == -number:
                denied.add(meth)

    # Build a new list instead of removing while iterating, which skipped
    # the element following each removed one.
    return [name for name in granted if name not in denied]
############################################
def check_cert_params(self, dv, info, allvars=False, ordered=None):
    """Copy the fields of ``info`` into ``dv`` and collect variable errors.

    Only all-lowercase names from ``info._type_info`` are considered, in
    sorted order; names listed in ``ordered`` are processed first, in the
    given order.

    For each name: a value present on ``info`` is stored with ``dv.Set``
    after ``dv.Uncompatible`` reports no problem (a problem is raised as
    ``VariableError``). A missing value (``None``) is skipped unless
    ``allvars`` is true, in which case the current ``dv`` value is read
    and stored back when the variable is compatible.

    Returns a list of ``ReturnedMessage(type='error', field=<name>, ...)``,
    one per ``VariableError`` caught; empty when everything was accepted.
    """
    errors = []
    keys = sorted(key for key in info._type_info.keys()
                  if key.lower() == key)
    if ordered:
        keys = list(ordered) + [key for key in keys if key not in ordered]

    for var in keys:
        # get value of variable from info
        val = getattr(info, var)
        # check the value if it was sent, or always when allvars is set
        if val is None and not allvars:
            continue
        try:
            if val is None:
                # value not sent: take the current one from datavars
                val = dv.Get(var)
            else:
                uncomperr = dv.Uncompatible(var)
                if uncomperr:
                    raise VariableError(uncomperr)
            if not dv.Uncompatible(var):
                dv.Set(var, val)
        except VariableError as e:
            # BaseException.message is deprecated; use it when the
            # exception still carries one, otherwise use its text.
            details = getattr(e, 'message', None)
            if details is None:
                details = str(e)
            messages = details if isinstance(details, list) else [details]
            mess = ''.join(str(error) + '\n' for error in messages)
            errors.append(ReturnedMessage(type='error', field=var,
                                          message=mess))
    return errors
from calculate.core.server.baseClass import Basic
from calculate.core.server.decorators import Dec
def certificateCommon(self, sid, info, methodname):
    """Validate ``info`` and start the certificate-view process.

    Takes the cached variables for ``(sid, methodname)`` or builds a fresh
    ``DataVarsCore``, validates ``info`` with ``check_cert_params`` and, if
    that reports errors, returns them. Otherwise starts ``view_cert_meth``
    through ``self.startprocess`` and returns a one-element list holding a
    ``ReturnedMessage`` of type ``'pid'`` with the started process id.

    Whenever ``dv`` is still truthy on exit (for example when validation
    failed), it is written back to the cache.
    """
    # Bound before the try so that the finally clause cannot raise
    # UnboundLocalError and hide an error raised by get_cache itself.
    dv = None
    try:
        dv = self.get_cache(sid, methodname, "vars")
        if not dv:
            dv = datavars.DataVarsCore()
            dv.importCore()
            dv.flIniFile()
        errors = self.check_cert_params(dv, info,
                                        ordered=['cl_cert_id'],
                                        allvars=True)
        if errors:
            return errors
        certificate_meth = type("certificateCommon",
                                (self.Common, CoreWsdl, object), {})
        pid = self.startprocess(sid, target=certificate_meth,
                                method="view_cert_meth",
                                auto_delete=True,
                                args_proc=(dv,))
        returnmess = ReturnedMessage(type='pid', message=pid,
                                     expert=True)
        # NOTE(review): these repeat the constructor arguments; kept in
        # case the constructor does not apply them -- confirm.
        returnmess.type = "pid"
        returnmess.message = pid
        # dv now holds whatever clear_cache returns, which decides whether
        # the finally clause writes the cache again.
        dv = self.clear_cache(sid, methodname)
        return [returnmess]
    finally:
        if dv:
            self.set_cache(sid, methodname, "vars", dv, smart=False)
@rpc(Integer, CertificateInfo, _returns=Array(ReturnedMessage))
@Dec.check_permissions(["view_cert"])
@Dec.console('view_cert')
@Dec.gui('Core', 'View Cert', 'certificate-server,application-certificate')
def view_cert(self, sid, info):
    # RPC entry point: answers with a single error message when no
    # parameters were sent; otherwise stores the requested certificate ID
    # in fresh variables, caches them for this session and hands over to
    # certificateCommon.
    if not info:
        return [ReturnedMessage(
            type='error',
            field='cl_cert_id',
            message='Certificate Identificator must be int or "all"!')]

    variables = datavars.DataVarsCore()
    variables.importCore()
    variables.flIniFile()
    variables.Set('cl_cert_id', info.cl_cert_id)
    self.set_cache(sid, 'view_cert', "vars", variables, smart=False)
    return self.certificateCommon(sid, info, 'view_cert')
@rpc(Integer, ViewParams, _returns=ViewInfo)
@catchExcept()
def view_cert_view(self, sid, params):
    # Builds the view description for the certificate form: one group
    # holding cl_cert_id. The variables are cached under 'view_cert' for
    # this session after the view has been built from them.
    variables = datavars.DataVarsCore()
    variables.importCore()
    variables.flIniFile()
    group_title = _("Certificate View")
    button_label = _("View")
    variables.addGroup(group_title,
                       normal=('cl_cert_id',),
                       next_label=button_label)
    result = ViewInfo(variables)
    self.set_cache(sid, 'view_cert', "vars", variables, smart=False)
    return result