You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
calculate-utils-2.2-lib/pym/encrypt.py

604 lines
24 KiB

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

#-*- coding: utf-8 -*-
# Copyright 2008-2010 Calculate Ltd. http://www.calculate-linux.org
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import sys
import os
import hashlib
import crypt
import string
import time
from random import choice
from base64 import encodestring as _b64enc
# Base64-encode x and strip trailing whitespace from the encoded result
b64encode = lambda x: _b64enc(x).rstrip()
try:
from smbpasswd import lmhash,nthash
except ImportError:
lmhash,nthash = None,None
from cl_print import color_print
# для создания сертификата
import pwd
from server.utils import execProg
import cl_overriding
import cl_lang
from cl_utils import genpassword, removeDir
# Module translation: bind this module to the 'cl_lib' message domain
tr = cl_lang.lang()
tr.setLocalDomain('cl_lib')
tr.setLanguage(sys.modules[__name__])
class encrypt(color_print):
    """Common helper methods used when configuring services.

    Currently provides password hashing (see getHashPasswd).
    """

    # crypt(3) method identifiers used by the shadow_* algorithms
    _shadowMethodIds = {"shadow_ssha512": "6",
                        "shadow_ssha256": "5",
                        "shadow_md5": "1"}

    def __GenCryptSalt__(self, len=2):
        """Generate a random salt for crypt(3) password hashing.

        len - number of characters in the salt (the parameter name shadows
              the builtin, it is kept for backward compatibility)

        Returns a string of 'len' characters taken from [a-zA-Z0-9./].
        """
        # Function-scope import: only this method needs SystemRandom
        from random import SystemRandom
        # string.letters depends on the current locale and may contain
        # characters that are not valid in a crypt salt; ascii_letters
        # is always exactly a-z and A-Z
        chars = string.ascii_letters + string.digits + "./"
        # Take the salt from the operating system random source instead of
        # the predictable default generator
        rng = SystemRandom()
        return "".join([rng.choice(chars) for i in range(len)])

    def getHashPasswd(self, password, SecHashAlg):
        """Generate a password hash.

        password   - password to hash
        SecHashAlg - name of the algorithm, one of:
                     plain, md5, smd5, crypt, sha, ssha, lm, nt,
                     shadow_ssha512, shadow_ssha256, shadow_md5

        Returns the hash string; returns False (after printing an error)
        when the password is empty or the algorithm is not supported.
        The lm and nt algorithms require the smbpasswd module.
        """
        if not password:
            self.printERROR(_("ERROR") + " getHashPasswd: " +
                            _("empty password"))
            return False
        hashPwd = ""
        if SecHashAlg == "plain":
            hashPwd = password
        elif SecHashAlg == "md5":
            h = hashlib.md5(password)
            hashPwd = "{MD5}" + b64encode(h.digest())
        elif SecHashAlg == "smd5":
            # salted hash: digest of password + salt, salt appended
            salt = os.urandom(4)
            h = hashlib.md5(password)
            h.update(salt)
            hashPwd = "{SMD5}" + b64encode(h.digest() + salt)
        elif SecHashAlg in self._shadowMethodIds:
            # "$<id>$<salt>$" selects the crypt(3) method
            salt = self.__GenCryptSalt__(8)
            hashPwd = crypt.crypt(password, "$%s$%s$"
                                  % (self._shadowMethodIds[SecHashAlg], salt))
        elif SecHashAlg == "crypt":
            salt = self.__GenCryptSalt__()
            hashPwd = "{CRYPT}" + crypt.crypt(password, salt)
        elif SecHashAlg == "sha":
            h = hashlib.sha1(password)
            hashPwd = "{SHA}" + b64encode(h.digest())
        elif SecHashAlg == "ssha":
            # salted hash: digest of password + salt, salt appended
            salt = os.urandom(4)
            h = hashlib.sha1(password)
            h.update(salt)
            hashPwd = "{SSHA}" + b64encode(h.digest() + salt)
        elif SecHashAlg == "lm" and lmhash:
            hashPwd = lmhash(password)
        elif SecHashAlg == "nt" and nthash:
            hashPwd = nthash(password)
        else:
            if SecHashAlg in ("lm", "nt"):
                # the algorithm is known but smbpasswd failed to import
                self.printERROR(_("ERROR") + " getHashPasswd: " +
                                (_("Failed to support '%s' crypto algorithm")
                                 % SecHashAlg) + " " +
                                _("without py-smbpasswd"))
            else:
                self.printERROR(_("ERROR") + " getHashPasswd: " +
                                _("Failed to support '%s' crypto algorithm")
                                % SecHashAlg)
            return False
        return hashPwd
class certificate(color_print):
    """Create X.509 certificates by running the openssl command line tool.

    Three kinds of certificates can be created:
      createCertificateAutority - a certificate authority (CA)
      createSignedCertificate   - a certificate signed by that CA
      createCertificate         - a self-signed certificate

    The class attributes below are the default values of the keyword
    arguments of these methods.
    """
    # Default certificate subject fields
    sslCountry = "RU"
    sslState = "Saint-Petersburg"
    sslLocality = "Saint-Petersburg"
    sslOrganization = "Calculate Ltd."
    sslUnit = "SSL"
    sslCommonName = "localhost"
    sslEmail = "support@calculate.ru"
    nsCertType = "server"
    # Value passed to openssl "-days"
    sslDays = 3653
    # RSA key size in bits
    # NOTE(review): 1024-bit RSA keys are considered weak today, consider
    # raising the default to 2048 or more
    sslBits = 1024
    # Owner of the created key and certificate files
    userName = "root"
    # Permissions of the created key and certificate files
    fileMode = 0o400
    certsDir = "/var/calculate/ssl"
    # Directory for generated cnf files and default certificate files
    tmpDir = os.path.join(certsDir,"tmp")
    certFile = os.path.join(tmpDir, "server.crt")
    keyFile = os.path.join(tmpDir, "server.key")
    csrFile = os.path.join(tmpDir, "server.csr")
    # Certificate authority directory and file names
    CAPath = os.path.join(certsDir,"main")
    CACertFileName = "CA.crt"
    CAKeyFileName = "CA.key"
    CACrlFileName = "CA.crl"
    # Paths relative to the certificate authority directory
    rCACertPath = "crt"
    rCAKeyPath = "key"
    rCACrlPath = "crl"
    rSerialFileName = "serial"
    rDatabaseFileName = "index.dat"
    # openssl binary
    sslFile = "/usr/bin/openssl"
    # openssl configuration used for the certificate authority
    templCnfCA = """[ ca ]
default_ca = CA_default
[ CA_default ]
dir = %(CAPath)s
certs = $dir/%(rCACertPath)s
crl_dir = $dir/%(rCACrlPath)s
database = $dir/%(rDatabaseFileName)s
new_certs_dir = $dir/%(rCACertPath)s
certificate = $dir/%(rCACertFile)s
serial = $dir/%(rSerialFileName)s
crlnumber = $dir/crlnumber
crl = $dir/%(rCACrlFile)s
private_key = $dir/%(rCAKeyFile)s
RANDFILE = $dir/%(rRandFile)s
x509_extensions = usr_cert
name_opt = ca_default
cert_opt = ca_default
default_days = 365
default_crl_days= 30
default_md = default
preserve = no
policy = policy_match
[ policy_match ]
countryName = match
stateOrProvinceName = match
organizationName = match
organizationalUnitName = optional
commonName = supplied
emailAddress = optional
[ policy_anything ]
countryName = optional
stateOrProvinceName = optional
localityName = optional
organizationName = optional
organizationalUnitName = optional
commonName = supplied
emailAddress = optional
[ usr_cert ]
basicConstraints=CA:FALSE
subjectKeyIdentifier=hash
authorityKeyIdentifier=keyid,issuer
[ req ]
prompt = no
default_bits = %(sslBits)s
x509_extensions = v3_ca
string_mask = utf8only
distinguished_name = req_dn
[ req_dn ]
C = %(sslCountry)s
ST = %(sslState)s
L = %(sslLocality)s
O = %(sslOrganization)s
OU = %(sslUnit)s
CN = %(sslCommonName)s
emailAddress = %(sslEmail)s
[ cert_type ]
nsCertType = %(nsCertType)s
[ v3_ca ]
subjectKeyIdentifier=hash
authorityKeyIdentifier=keyid:always,issuer
basicConstraints = CA:true
"""
    # openssl configuration used for a self-signed certificate
    templCnfData = """[ req ]
prompt = no
default_bits = %(sslBits)s
distinguished_name = req_dn
[ req_dn ]
C = %(sslCountry)s
ST = %(sslState)s
L = %(sslLocality)s
O = %(sslOrganization)s
OU = %(sslUnit)s
CN = %(sslCommonName)s
emailAddress = %(sslEmail)s
[ cert_type ]
nsCertType = %(nsCertType)s
"""
    # openssl command line templates
    templCreateKey = "%(sslFile)s genrsa -out '%(certKeyFile)s' %(sslBits)s"
    templCreateCert = "%(sslFile)s req -new -x509 -days %(sslDays)s "\
                      "-config '%(cnfFile)s' -key '%(certKeyFile)s' "\
                      "-out '%(certFile)s'"
    templCreateReq = "%(sslFile)s req -new -days %(sslDays)s "\
                     "-config '%(cnfFile)s' -key '%(certKeyFile)s' "\
                     "-out '%(certCsrFile)s'"
    templCreateSignCert = "%(sslFile)s ca -batch -config '%(cnfFile)s' "\
                          "-policy policy_anything -days '%(sslDays)s' "\
                          "-out '%(certFile)s' -infiles '%(certCsrFile)s'"

    def _getUserIds(self, userName):
        """Return (uid, gid) of userName or False if the user is unknown"""
        try:
            pwdObj = pwd.getpwnam(userName)
        except KeyError:
            self.printERROR(_("User %s not found") % userName)
            return False
        return pwdObj.pw_uid, pwdObj.pw_gid

    def _runOpenssl(self, execStr):
        """Run the command line, print an error and return False on failure"""
        if execProg(execStr) is False:
            self.printERROR(_("Can not execute '%s'") % execStr)
            return False
        return True

    def _setOwnerAndMode(self, fileName, uid, gid, mode):
        """Change owner and permissions of fileName if the file exists"""
        if os.path.exists(fileName):
            os.chown(fileName, uid, gid)
            os.chmod(fileName, mode)

    def _removeFile(self, fileName):
        """Remove fileName if it exists"""
        if os.path.exists(fileName):
            os.remove(fileName)

    def _getReqData(self, sslBits, sslCountry, sslState, sslLocality,
                    sslOrganization, sslUnit, sslCommonName, sslEmail,
                    nsCertType):
        """Return the substitutions shared by templCnfCA and templCnfData"""
        return {'sslBits': sslBits,
                'sslCountry': sslCountry,
                'sslState': sslState,
                'sslLocality': sslLocality,
                'sslOrganization': sslOrganization,
                'sslUnit': sslUnit,
                'sslCommonName': sslCommonName,
                'sslEmail': sslEmail,
                'nsCertType': nsCertType}

    def _getCnfCAText(self, CAPath, CACertFileName, CAKeyFileName,
                      CACrlFileName, reqData):
        """Fill templCnfCA with the CA paths and the request data"""
        cnfData = {'CAPath': CAPath,
                   'rCACertPath': self.rCACertPath,
                   'rCACrlPath': self.rCACrlPath,
                   'rDatabaseFileName': self.rDatabaseFileName,
                   'rCACertFile': os.path.join(self.rCACertPath,
                                               CACertFileName),
                   'rSerialFileName': self.rSerialFileName,
                   'rCACrlFile': os.path.join(self.rCACrlPath,
                                              CACrlFileName),
                   'rCAKeyFile': os.path.join(self.rCAKeyPath,
                                              CAKeyFileName),
                   'rRandFile': os.path.join(self.rCAKeyPath, ".rnd")}
        cnfData.update(reqData)
        return self.templCnfCA % cnfData

    def createCnfFile(self, textCnf):
        '''Generate openssl.cnf file

        textCnf - content of the configuration file

        The file gets a unique name and is created in tmpDir.
        Returns the path of the created file or False on error.
        '''
        if not os.path.exists(self.sslFile):
            self.printERROR(_("%s not found") % self.sslFile)
            return False
        strData = time.strftime("%Y%m%d%H%M%S", time.localtime(time.time()))
        cnfFileName = "%s.%s.cnf" % (strData, genpassword())
        if not os.path.exists(self.tmpDir):
            self._createDir(self.tmpDir)
        cnfFile = os.path.join(self.tmpDir, cnfFileName)
        # do not hand out the path of a file that could not be written
        if not self._createFile(cnfFile, textCnf):
            return False
        return cnfFile

    def checkCertificate(self, certFile):
        """Check that openssl is able to read the certificate certFile

        Returns True on success, False (after printing an error) otherwise.
        """
        textLine = execProg("%s x509 -subject -fingerprint -noout -in '%s'"
                            % (self.sslFile, certFile))
        if textLine is False:
            self.printERROR(_("Error checking certificate %s") % certFile)
            return False
        return True

    def createCertificateAutority(self, sslCountry=sslCountry,
                                  sslState=sslState,
                                  sslLocality=sslLocality,
                                  sslOrganization=sslOrganization,
                                  sslUnit=sslUnit,
                                  sslCommonName=sslCommonName,
                                  sslEmail=sslEmail,
                                  nsCertType=nsCertType,
                                  sslDays=sslDays,
                                  sslBits=sslBits,
                                  userName=userName,
                                  CAPath=CAPath,
                                  CACertFileName=CACertFileName,
                                  CAKeyFileName=CAKeyFileName,
                                  CACrlFileName=CACrlFileName,
                                  fileMode=fileMode,
                                  force=False):
        '''Create CA

        The CA directory CAPath is removed and created again, then the
        CA key and the CA certificate are generated in it.

        userName - owner of the created key and certificate
        fileMode - permissions of the created key and certificate
        force    - recreate the CA even if its certificate and key exist

        Returns True if the CA exists or has been created, False on error.
        '''
        CACertFile = os.path.join(CAPath, self.rCACertPath, CACertFileName)
        CAKeyFile = os.path.join(CAPath, self.rCAKeyPath, CAKeyFileName)
        # certificate and key exist
        if not force and os.path.isfile(CACertFile) and \
           os.path.isfile(CAKeyFile):
            return True
        # get uid and gid of the user
        userIds = self._getUserIds(userName)
        if userIds is False:
            return False
        uid, gid = userIds
        # delete certificate dir
        if os.path.isdir(CAPath):
            removeDir(CAPath)
        # create certificate dirs
        self._createDir(CAPath)
        for relPath in (self.rCACertPath, self.rCAKeyPath, self.rCACrlPath):
            self._createDir(os.path.join(CAPath, relPath))
        # save serial number
        self._createFile(os.path.join(CAPath, self.rSerialFileName), "01\n")
        # create database file
        self._createFile(os.path.join(CAPath, self.rDatabaseFileName), "")
        reqData = self._getReqData(sslBits, sslCountry, sslState,
                                   sslLocality, sslOrganization, sslUnit,
                                   sslCommonName, sslEmail, nsCertType)
        textCnf = self._getCnfCAText(CAPath, CACertFileName, CAKeyFileName,
                                     CACrlFileName, reqData)
        cnfFile = self.createCnfFile(textCnf)
        if cnfFile is False:
            return False
        try:
            # generate CA RSA key
            execStr = self.templCreateKey % {'sslFile': self.sslFile,
                                             'certKeyFile': CAKeyFile,
                                             'sslBits': sslBits}
            if not self._runOpenssl(execStr):
                return False
            self._setOwnerAndMode(CAKeyFile, uid, gid, fileMode)
            # create CA
            execStr = self.templCreateCert % {'sslFile': self.sslFile,
                                              'sslDays': sslDays,
                                              'cnfFile': cnfFile,
                                              'certKeyFile': CAKeyFile,
                                              'certFile': CACertFile}
            if not self._runOpenssl(execStr):
                return False
            self._setOwnerAndMode(CACertFile, uid, gid, fileMode)
        finally:
            # the temporary configuration is removed on failure too
            self._removeFile(cnfFile)
        # check certificate
        return self.checkCertificate(CACertFile)

    def createSignedCertificate(self,
                                sslCountry=sslCountry,
                                sslState=sslState,
                                sslLocality=sslLocality,
                                sslOrganization=sslOrganization,
                                sslUnit=sslUnit,
                                sslCommonName=sslCommonName,
                                sslEmail=sslEmail,
                                nsCertType=nsCertType,
                                sslDays=sslDays,
                                sslBits=sslBits,
                                userName=userName,
                                CAPath=CAPath,
                                CACertFileName=CACertFileName,
                                CAKeyFileName=CAKeyFileName,
                                CACrlFileName=CACrlFileName,
                                certFile=certFile,
                                fileMode=fileMode,
                                keyFile=keyFile,
                                csrFile=csrFile,
                                force=False):
        '''Create a certificate signed by the CA located in CAPath

        A new RSA key (keyFile) and a certificate request (csrFile) are
        generated, then the request is signed and saved to certFile.

        userName - owner of the created files
        fileMode - permissions of the created files
        force    - recreate the files even if certFile and keyFile exist

        Returns True if the certificate exists or has been created,
        False on error.
        '''
        certAndKeyFiles = [certFile, keyFile, csrFile]
        if not force and os.path.exists(certFile) and \
           os.path.exists(keyFile):
            return True
        # remove the files left from a previous run
        for fileName in certAndKeyFiles:
            if os.path.exists(fileName):
                os.remove(fileName)
        # get uid and gid of the user
        userIds = self._getUserIds(userName)
        if userIds is False:
            return False
        uid, gid = userIds
        # create dirs
        for fileName in certAndKeyFiles:
            dirName = os.path.split(fileName)[0]
            if not os.path.exists(dirName):
                self._createDir(dirName, uid=uid, gid=gid)
        reqData = self._getReqData(sslBits, sslCountry, sslState,
                                   sslLocality, sslOrganization, sslUnit,
                                   sslCommonName, sslEmail, nsCertType)
        textCnf = self._getCnfCAText(CAPath, CACertFileName, CAKeyFileName,
                                     CACrlFileName, reqData)
        cnfFile = self.createCnfFile(textCnf)
        if cnfFile is False:
            return False
        try:
            # generate RSA key
            execStr = self.templCreateKey % {'sslFile': self.sslFile,
                                             'certKeyFile': keyFile,
                                             'sslBits': sslBits}
            if not self._runOpenssl(execStr):
                return False
            self._setOwnerAndMode(keyFile, uid, gid, fileMode)
            # generate request
            execStr = self.templCreateReq % {'sslFile': self.sslFile,
                                             'sslDays': sslDays,
                                             'cnfFile': cnfFile,
                                             'certKeyFile': keyFile,
                                             'certCsrFile': csrFile}
            if not self._runOpenssl(execStr):
                return False
            self._setOwnerAndMode(csrFile, uid, gid, fileMode)
            # set database attribute
            databaseAttrFileName = os.path.join(CAPath, "index.dat.attr")
            self._createFile(databaseAttrFileName, "unique_subject = no\n")
            # generate signed certificate
            execStr = self.templCreateSignCert % {'sslFile': self.sslFile,
                                                  'sslDays': sslDays,
                                                  'cnfFile': cnfFile,
                                                  'certFile': certFile,
                                                  'certCsrFile': csrFile}
            if not self._runOpenssl(execStr):
                return False
            self._setOwnerAndMode(certFile, uid, gid, fileMode)
        finally:
            # the temporary configuration is removed on failure too
            self._removeFile(cnfFile)
        # check certificate
        return self.checkCertificate(certFile)

    def createCertificate(self,
                          sslCountry=sslCountry,
                          sslState=sslState,
                          sslLocality=sslLocality,
                          sslOrganization=sslOrganization,
                          sslUnit=sslUnit,
                          sslCommonName=sslCommonName,
                          sslEmail=sslEmail,
                          nsCertType=nsCertType,
                          sslDays=sslDays,
                          sslBits=sslBits,
                          userName=userName,
                          certFile=certFile,
                          fileMode=fileMode,
                          keyFile=keyFile):
        """Create a self-signed certificate and its key

        userName - owner of the created files
        fileMode - permissions of the created files

        Returns True if both certFile and keyFile exist or have been
        created, False on error.
        """
        certAndKeyFiles = [certFile, keyFile]
        foundCertFiles = [x for x in certAndKeyFiles if os.path.exists(x)]
        if len(foundCertFiles) == 2:
            return True
        # remove the file left without its pair
        for fileName in foundCertFiles:
            os.remove(fileName)
        # get uid and gid of the user
        userIds = self._getUserIds(userName)
        if userIds is False:
            return False
        uid, gid = userIds
        textCnf = self.templCnfData % self._getReqData(
            sslBits, sslCountry, sslState, sslLocality, sslOrganization,
            sslUnit, sslCommonName, sslEmail, nsCertType)
        cnfFile = self.createCnfFile(textCnf)
        if cnfFile is False:
            return False
        try:
            # create dirs
            for fileName in certAndKeyFiles:
                dirName = os.path.split(fileName)[0]
                if not os.path.exists(dirName):
                    self._createDir(dirName, uid=uid, gid=gid)
            # create certificate; every option is separated by a space,
            # otherwise the quoted paths are glued to the next option
            textLine = execProg("%s req -new -x509 -nodes -config '%s' "
                                "-days %s -out '%s' -keyout '%s'"
                                % (self.sslFile, cnfFile, sslDays, certFile,
                                   keyFile))
            if textLine is False:
                self.printERROR(_("Failed to create certificate %s")
                                % certFile)
                return False
        finally:
            # remove the configuration file, on failure too
            self._removeFile(cnfFile)
        # change owner and permissions
        self._setOwnerAndMode(certFile, uid, gid, fileMode)
        self._setOwnerAndMode(keyFile, uid, gid, fileMode)
        return self.checkCertificate(certFile)

    def _createDir(self, dirName, uid=0, gid=0, mode=0o700):
        """Create a directory owned by uid:gid

        mode - permissions of the directory, a false value leaves the
               permissions set by os.makedirs

        Returns True on success, False (after printing an error) if the
        path already exists.
        """
        if os.path.exists(dirName):
            self.printERROR(_("Path %s exists") % dirName)
            return False
        os.makedirs(dirName)
        if mode:
            os.chmod(dirName, mode)
        os.chown(dirName, uid, gid)
        return True

    def _createFile(self, fileName, fileTxt, uid=0, gid=0, mode=0o600):
        """Create a file owned by uid:gid with the content fileTxt

        mode - final permissions of the file

        Returns True on success, False (after printing an error) if the
        directory of the file does not exist.
        """
        dirName = os.path.split(fileName)[0]
        if not os.path.exists(dirName):
            self.printERROR(_("Path %s does not exist") % dirName)
            return False
        # a new file is private from the moment it is created
        fd = os.open(fileName, os.O_CREAT, 0o600)
        os.close(fd)
        # an existing file may have other permissions
        os.chmod(fileName, 0o600)
        os.chown(fileName, uid, gid)
        if fileTxt:
            # "w" truncates the file, so no tail of a longer previous
            # content is left after the new text
            FD = open(fileName, "w")
            try:
                FD.write(fileTxt)
            finally:
                FD.close()
        os.chmod(fileName, mode)
        return True
def getHash(password, encr):
    """Return the hash of a password, exit the program on failure

    password - password to hash
    encr     - name of the hashing algorithm in any letter case,
               for example 'ssha'

    If the hash can not be created an error is printed and
    cl_overriding.exit(1) is called.
    """
    hasher = encrypt()
    algorithm = encr.lower()
    result = hasher.getHashPasswd(password, algorithm)
    if not result:
        hasher.printERROR(_("Error encrypt password, "
                            "method getHash()"))
        cl_overriding.exit(1)
        # reached only if exit() returns
        return None
    return result