You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
calculate-utils-2.2-lib/pym/cl_ldap.py

241 lines
10 KiB

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

#-*- coding: utf-8 -*-
# Copyright 2008-2010 Calculate Ltd. http://www.calculate-linux.org
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import ldap
from cl_utils import _error
class ldapFun(_error):
    """Helper object for an LDAP server: binds on creation and runs searches.

    The connection is opened in the constructor with a simple bind.  If the
    bind fails, the failure description is recorded with ``setError`` and
    ``conLdap`` stays ``False``.
    """

    def __init__(self, dnUser, password, host="localhost"):
        """Bind to the LDAP server.

        dnUser   -- DN used for the simple bind
        password -- password for that DN
        host     -- host part of the ``ldap://`` URL (default "localhost")
        """
        self.conLdap = False
        # Obtain the LDAP connection
        try:
            self.conLdap = self.__ldapConnect(dnUser, password, host)
        except ldap.LDAPError as e:
            # python-ldap normally passes a dict with a 'desc' entry as the
            # first exception argument; fall back to the plain text when the
            # exception carries something else.
            info = e.args[0] if e.args else None
            if isinstance(info, dict) and 'desc' in info:
                self.setError(info['desc'])
            else:
                self.setError(str(e))

    def __ldapConnect(self, dnUser, password, host):
        """Connect to the LDAP server and return the bound connection."""
        conLdap = ldap.initialize('ldap://%s' % host)
        conLdap.simple_bind_s(dnUser, password)
        return conLdap

    def ldapSearch(self, baseDN, searchScope, searchFilter,
                   retrieveAttributes):
        """Search the directory and collect every matching entry.

        The arguments are passed unchanged to the connection's ``search``
        method.

        Returns a list holding the result data of each search entry,
        ``[]`` when *baseDN* does not exist (``ldap.NO_SUCH_OBJECT``), and
        ``False`` on any other failure, including a missing connection.
        """
        if not self.conLdap:
            # The bind in the constructor failed, there is nothing to query
            return False
        result_set = []
        try:
            ldap_result_id = self.conLdap.search(baseDN, searchScope,
                                                 searchFilter,
                                                 retrieveAttributes)
            while True:
                # Second argument 0: fetch the results one message at a time
                result_type, result_data = self.conLdap.result(
                    ldap_result_id, 0)
                if result_data == []:
                    # An empty data list marks the end of the results
                    break
                if result_type == ldap.RES_SEARCH_ENTRY:
                    result_set.append(result_data)
        except ldap.NO_SUCH_OBJECT:
            return []
        except Exception:
            # Best effort: callers test the result for falseness.  Unlike a
            # bare ``except`` this lets KeyboardInterrupt/SystemExit through.
            return False
        return result_set
class ldapUser(_error):
    """Retrieve user account data from LDAP, configured by /etc/ldap.conf."""
    # Data read from /etc/ldap.conf.  This is a class attribute that is
    # updated in place, so the cache is shared between instances.
    _dictData = {}
    # ldapFun object (False until ldapConnect succeeds)
    ldapObj = False
    # LDAP connection (False until ldapConnect succeeds)
    conLdap = False

    def getDataInLdapConf(self, bindData=True, cache=True):
        """Read the connection settings from /etc/ldap.conf.

        bindData -- also require the ``binddn`` and ``bindpw`` options
        cache    -- reuse previously parsed data when it holds every
                    requested name, and store freshly parsed data

        Returns a dictionary mapping "host", "usersDN", "groupsDN" (and
        "bindDn", "bindPw" when *bindData* is true) to a one-element list
        with the option value.  Returns ``{}`` when any requested option is
        missing from the file and ``False`` (after ``setError``) when the
        file cannot be read.
        """
        data = [("host", 'host'),
                ("usersDN", 'nss_base_passwd'),
                ("groupsDN", 'nss_base_group')]
        if bindData:
            data += [("bindDn", 'binddn'), ("bindPw", 'bindpw')]
        namesData = [name for name, key in data]
        # Use the cached data if it already covers every requested name
        if cache and self._dictData and \
                set(namesData) <= set(self._dictData.keys()):
            return self._dictData
        fileName = "/etc/ldap.conf"
        # Each option is accepted in lower case and in upper case
        workData = [(name, (key, key.upper()), len(key))
                    for name, key in data]
        dictData = {}
        splList = (" ", "\t")
        try:
            with open(fileName) as confFile:
                for line in confFile:
                    for name, keys, lenKey in workData:
                        if name in dictData:
                            # Only the first valid line of an option is used
                            continue
                        if not line.startswith(keys) or len(line) <= lenKey:
                            continue
                        # The option name must be followed by a separator
                        spl = line[lenKey]
                        if spl not in splList:
                            continue
                        # NOTE(review): the value is the text after the LAST
                        # separator character, so a value that itself
                        # contains that character is truncated - confirm
                        # this is intended.
                        value = line.rpartition(spl)[2]
                        if name in ("usersDN", "groupsDN"):
                            # Drop everything from the first '?' onwards
                            value = value.partition('?')[0]
                        dictData[name] = [value.strip()]
        except EnvironmentError:
            self.setError(_("Can not open %s")%fileName)
            return False
        if set(dictData.keys()) != set(namesData):
            return {}
        # Cache the data
        if cache:
            self._dictData.clear()
            self._dictData.update(dictData)
        return dictData

    def getBindConnectData(self):
        """Return (bindDn, bindPw, host) for the bind user, or False."""
        configdata = self.getDataInLdapConf()
        if configdata:
            bindDn = configdata["bindDn"][0]
            bindPw = configdata["bindPw"][0]
            host = configdata["host"][0]
            return bindDn, bindPw, host
        return False

    def getUsersDN(self):
        """Return the users DN, or False when the settings are unavailable."""
        configdata = self.getDataInLdapConf(bindData=False)
        if configdata:
            return configdata["usersDN"][0]
        return False

    def getHost(self):
        """Return the LDAP host, or False when the settings are unavailable."""
        configdata = self.getDataInLdapConf(bindData=False)
        if configdata:
            return configdata["host"][0]
        return False

    def getGroupsDN(self):
        """Return the list of group DNs, or False when unavailable."""
        configdata = self.getDataInLdapConf(bindData=False)
        if configdata:
            return configdata["groupsDN"]
        return False

    def getUserLdapInfo(self, userName, shadowAttr=False):
        """Return information about a user taken from LDAP.

        userName   -- value matched against the ``uid`` attribute
        shadowAttr -- also return loginShell, the shadow* attributes and
                      "groups", a list of (cn, gidNumber) tuples of the
                      groups that list the user in ``memberUid``

        Returns a dictionary with the keys uid, gid, fullName, mail, jid,
        home and group (missing attributes become "").  Returns ``{}`` when
        the connection settings are unavailable or the entry has no uid or
        gid number, and ``False`` when the connection fails or the user is
        not found.
        """
        # Imported here because only this method needs it
        from ldap.filter import escape_filter_chars

        connectData = self.getBindConnectData()
        if not connectData:
            return {}
        bindDn, bindPw, host = connectData
        usersDN = self.getUsersDN()
        groupsDNs = self.getGroupsDN()
        # Connect to LDAP
        if not self.ldapConnect(bindDn, bindPw, host):
            return False
        # Escape the name so that characters such as '*', '(' or ')' cannot
        # change the meaning of the search filter
        safeUserName = escape_filter_chars(userName)
        searchUser = self.ldapObj.ldapSearch(usersDN, ldap.SCOPE_ONELEVEL,
                                             "uid=%s" % safeUserName, None)
        if not searchUser:
            return False
        # Output key -> (entry kind, LDAP attribute name)
        convertDict = {'uid': ('user', 'uidNumber'),
                       'gid': ('user', 'gidNumber'),
                       'fullName': ('user', 'cn'),
                       'mail': ('user', 'mail'),
                       'jid': ('user', 'registeredAddress'),
                       'home': ('user', 'homeDirectory'),
                       'group': ('group', 'cn')}
        if shadowAttr:
            convertDict.update({'loginShell': ('user', 'loginShell'),
                                'shadowLastChange': ('user',
                                                     'shadowLastChange'),
                                'shadowMin': ('user', 'shadowMin'),
                                'shadowMax': ('user', 'shadowMax'),
                                'shadowWarning': ('user', 'shadowWarning'),
                                'shadowExpire': ('user', 'shadowExpire'),
                                'shadowFlag': ('user', 'shadowFlag'),
                                'groups': ('group', 'memberUid')})
        listUserAttr = [key for key, value in convertDict.items()
                        if value[0] == "user"]
        listGroupAttr = [key for key, value in convertDict.items()
                         if value[0] == "group"]
        # Attribute dictionary of the first entry that was found
        userAttrs = searchUser[0][0][1]
        dictOut = {}
        for dictAttr in listUserAttr:
            ldapAttr = convertDict[dictAttr][1]
            if ldapAttr in userAttrs:
                dictOut[dictAttr] = userAttrs[ldapAttr][0]
            else:
                dictOut[dictAttr] = ""
        uid = dictOut['uid']
        gid = dictOut['gid']
        if gid:
            for dictAttr in listGroupAttr:
                ldapAttr = convertDict[dictAttr][1]
                if dictAttr == "group":
                    # Name of the primary group: the first group DN that
                    # holds a group with the user's gidNumber wins
                    searchGroup = []
                    for groupDN in groupsDNs:
                        searchGroup = self.ldapObj.ldapSearch(
                            groupDN, ldap.SCOPE_ONELEVEL,
                            "gidNumber=%s" % escape_filter_chars(gid), None)
                        if searchGroup:
                            break
                    if searchGroup and ldapAttr in searchGroup[0][0][1]:
                        dictOut[dictAttr] = searchGroup[0][0][1][ldapAttr][0]
                    else:
                        dictOut[dictAttr] = ""
                elif dictAttr == "groups":
                    # Every group, in any group DN, that lists the user
                    userGroupsData = []
                    for groupDN in groupsDNs:
                        searchGroup = self.ldapObj.ldapSearch(
                            groupDN, ldap.SCOPE_ONELEVEL,
                            "%s=%s" % (ldapAttr, safeUserName),
                            ["cn", "gidNumber"])
                        if searchGroup:
                            userGroupsData += [(x[0][1]["cn"][0],
                                                x[0][1]["gidNumber"][0])
                                               for x in searchGroup]
                    dictOut[dictAttr] = userGroupsData
        if uid and gid:
            return dictOut
        return {}

    def ldapConnect(self, bindDn, bindPw, host):
        """Connect to LDAP unless a connection object already exists.

        Returns True on success (or when already connected) and False when
        the new ldapFun object reports an error.
        """
        if not self.ldapObj:
            ldapObj = ldapFun(bindDn, bindPw, host)
            if ldapObj.getError():
                return False
            # Store the LDAP helper object and its connection on this object
            self.ldapObj = ldapObj
            self.conLdap = ldapObj.conLdap
        return True