選択できるのは25トピックまでです。 トピックは、先頭が英数字で、英数字とダッシュ('-')を使用した35文字以内のものにしてください。
calculate-utils-3-unix/pym/unix/unix.py

942 行
34 KiB

# -*- coding: utf-8 -*-
# Copyright 2016 Mir Calculate. http://www.calculate-linux.org
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
8年前
from collections import namedtuple
import sys
8年前
from calculate.lib.utils.files import readLinesFile, listDirectory
from calculate.ldap.ldap import Ldap
from calculate.lib.utils.common import getPagesInterval
8年前
import os
from os import path
import shutil
_ = lambda x: x
from calculate.lib.cl_lang import (setLocalTranslate, getLazyLocalTranslate)
8年前
from calculate.lib.cl_ldap import ldap, LDAPConnectError, LDAPBadSearchFilter
8年前
setLocalTranslate('cl_unix3', sys.modules[__name__])
__ = getLazyLocalTranslate(_)
8年前
8年前
class UnixError(Exception):
pass
8年前
UnixGroup = namedtuple("UnixGroup", ['group_name', 'gid', 'user_list',
'comment'])
class GroupFile(object):
def __init__(self, group_fn='/etc/group'):
self.group_fn = group_fn
def __iter__(self):
for line in (x for x in readLinesFile(self.group_fn)
if not x.startswith("#")):
data = line.strip().split(':')[:4]
if len(data) >= 4 and data[2].isdigit():
yield UnixGroup(data[0], int(data[2]),
filter(None, data[3].split(",")),
"/etc/group")
class UnixGroups(object):
"""
Набор методов для нахождения параметров группы и сопаставления GID,
названия
"""
groups = GroupFile()
exception = UnixError
ldap_connect = None
groups_dn = ""
def __init__(self, ldap_connect, groups_dn):
self.ldap_connect = ldap_connect
self.groups_dn = groups_dn
def search_system_group_name(self, name):
for group in (x for x in self.groups if x.group_name == name):
return group
else:
return None
def search_system_group_id(self, gid):
for group in (x for x in self.groups if x.gid == gid):
return group
else:
return None
def iterate_ldap_group(self, search_filter, offset=None, count=None):
ldap_connect = self.ldap_connect
if ldap_connect:
base_dn = self.groups_dn
if offset is not None:
below = lambda x: x < offset
else:
below = lambda x: False
if count is not None:
above = lambda x: x >= offset + count
else:
above = lambda x: False
for i, group in enumerate(x[0][1] for x in ldap_connect.ldap_search(
base_dn, ldap.SCOPE_ONELEVEL, search_filter)):
if below(i):
continue
if above(i):
break
yield UnixGroup(
group['cn'][0],
int(group['gidNumber'][0]),
sorted(group.get('memberUid', [])),
8年前
group['description'][0])
def search_ldap_group(self, search_filter):
for group in self.iterate_ldap_group(search_filter):
return group
return None
def search_ldap_group_name(self, value):
return self.search_ldap_group("cn=%s" % value)
def gid_to_name(self, value):
if value.isdigit():
value = int(value)
group = (self.search_ldap_group("gidNumber=%d" % value) or
self.search_system_group_id(value))
if group:
return group.group_name
return value
8年前
def _groupname(self, name, comment):
if comment != name:
return "{0} ({1})".format(name, comment)
else:
return name
8年前
def ldap_group_list(self):
groups = list(x for x in self.iterate_ldap_group("cn=*"))
8年前
return [(x.group_name, self._groupname(x.group_name, x.comment))
8年前
for x in sorted(groups, key=lambda k: k.gid)]
def ldap_group_names(self):
return list(x.group_name for x in self.iterate_ldap_group("cn=*"))
def check_group(self, value):
ldap_group = self.search_ldap_group_name(value)
unix_group = ldap_group or self.search_system_group_name(value)
if not unix_group:
raise self.exception(_("Group %s not found") % value)
if not ldap_group and unix_group:
raise self.exception(
_("You should create group {groupname} ({groupid}) "
"in Unix service").format(groupname=unix_group.group_name,
groupid=unix_group.gid))
8年前
def group_dn(self, group):
return "cn=%s,%s" % (group, self.groups_dn)
def add_users_to_group(self, users, group):
try:
self.ldap_connect.ldap_modify_attrs(
self.group_dn(group),
((ldap.MOD_ADD, 'memberUid', user) for user in users))
except LDAPConnectError as e:
raise UnixError(_("Failed to modify group in LDAP: %s") % str(e))
def update_group_id(self, group, gid):
try:
self.ldap_connect.ldap_modify_attrs(
self.group_dn(group),
[(ldap.MOD_REPLACE, 'gidNumber', gid)])
except LDAPConnectError as e:
raise UnixError(_("Failed to update group ID in LDAP: %s") % str(e))
8年前
def update_group_comment(self, group, comment):
try:
self.ldap_connect.ldap_modify_attrs(
self.group_dn(group),
[(ldap.MOD_REPLACE, 'description', comment)])
except LDAPConnectError as e:
raise UnixError(
_("Failed to update group comment in LDAP: %s") % str(e))
def remove_users_from_group(self, users, group):
try:
self.ldap_connect.ldap_modify_attrs(
self.group_dn(group),
((ldap.MOD_DELETE, 'memberUid', user) for user in users))
except LDAPConnectError as e:
raise UnixError(
_("Failed to remove users from group in LDAP: %s") % str(e))
def rename_group(self, oldname, newname):
try:
old_dn = self.group_dn(oldname)
new_dn = "cn=%s" % newname
self.ldap_connect.ldap_modify_dn(old_dn, new_dn)
except LDAPConnectError as e:
raise UnixError(
_("Failed to rename group in LDAP: %s") % str(e))
8年前
def remove_group(self, group):
try:
self.ldap_connect.ldap_remove_dn(self.group_dn(group))
except LDAPConnectError as e:
raise UnixError(
_("Failed to remove the group in LDAP: %s") % str(e))
UnixUser = namedtuple(
"UnixUser", ['username', 'uid', 'gid', 'comment', 'homedir', 'shell',
8年前
'visible', 'lock', 'pass_set'])
8年前
class PasswdFile(object):
def __init__(self, passwd_fn='/etc/passwd'):
self.passwd_fn = passwd_fn
def __iter__(self):
for line in (x for x in readLinesFile(self.passwd_fn)
if not x.startswith("#")):
data = line.strip().split(':')[:7]
if len(data) >= 7 and data[2].isdigit() and data[3].isdigit():
yield UnixUser(data[0], int(data[2]),
int(data[3]), data[4], data[5], data[6], True,
8年前
False, False)
8年前
class UnixUsers(object):
"""
Набор методов для нахождения параметров пользователя и сопаставления UID,
названия
"""
passwd = PasswdFile()
exception = UnixError
ldap_connect = None
users_dn = ""
DeletedPassword = "crypt{xxx}"
def __init__(self, ldap_connect, users_dn):
self.ldap_connect = ldap_connect
self.users_dn = users_dn
def search_system_user_name(self, name):
for user in (x for x in self.passwd if x.username == name):
return user
else:
return None
def search_system_user_id(self, uid):
uid = int(uid)
for user in (x for x in self.passwd if x.uid == uid):
return user
else:
return None
def iterate_ldap_user(self, search_filter, offset=None, count=None):
ldap_connect = self.ldap_connect
if ldap_connect:
base_dn = self.users_dn
if offset is not None:
below = lambda x: x < offset
else:
below = lambda x: False
if count is not None:
above = lambda x: x >= offset + count
else:
above = lambda x: False
for i, user in enumerate(x[0][1] for x in ldap_connect.ldap_search(
base_dn, ldap.SCOPE_ONELEVEL, search_filter)):
if below(i):
continue
if above(i):
break
yield UnixUser(
user['uid'][0],
int(user['uidNumber'][0]),
int(user['gidNumber'][0]),
user['cn'][0],
user['homeDirectory'][0],
user['loginShell'][0],
self.flag_to_visible(user['shadowFlag'][0]),
self.flag_to_lock(user['shadowExpire'][0]),
self.has_password(user['userPassword'][0])
if 'userPassword' in user else None,
8年前
)
def search_ldap_user(self, search_filter):
for user in self.iterate_ldap_user(search_filter):
return user
return None
8年前
def has_password(self, password):
if password and password != self.DeletedPassword:
return True
return False
8年前
def search_ldap_user_name(self, value):
return self.search_ldap_user("uid=%s" % value)
def search_ldap_user_id(self, value):
return self.search_ldap_user("uidNumber=%s" % value)
def get_primary_gids(self):
ldap_connect = self.ldap_connect
if ldap_connect:
base_dn = self.users_dn
return list(ldap_connect.ldap_simple_search(
base_dn, "uid=*", "gidNumber"))
return []
def get_password_hash(self, username):
ldap_connect = self.ldap_connect
if ldap_connect:
base_dn = self.users_dn
for pw in ldap_connect.ldap_simple_search(
self.users_dn, "uid=%s" % username, "userPassword"):
return pw
return ""
def uid_to_name(self, value):
if value.isdigit():
value = int(value)
user = (self.search_ldap_user("uidNumber=%d" % value) or
self.search_system_user_id(value))
if user:
return user.username
return value
8年前
def _username(self, name, comment):
if comment != name:
return "{0} ({1})".format(name, comment)
else:
return name
8年前
def ldap_user_list(self):
users = list(x for x in self.iterate_ldap_user("uid=*"))
8年前
return [(x.username, self._username(x.username, x.comment))
8年前
for x in sorted(users, key=lambda y: y.uid)]
def ldap_user_names(self):
return list(x.username for x in self.iterate_ldap_user("uid=*"))
# def check_user(self, value):
# ldap_user = self.search_ldap_user_name(value)
# unix_user = ldap_user or self.search_system_user_name(value)
# if not unix_user:
# raise VariableError(_("User %s not found") % value)
# if not ldap_user and unix_user:
# raise VariableError(
# _("You should create group %s (%d) in Unix service") %
# (unix_group.group_name, unix_group.gid))
def user_dn(self, user):
return "uid=%s,%s" % (user, self.users_dn)
def update_user_group_id(self, user, gid):
try:
self.ldap_connect.ldap_modify_attrs(
self.user_dn(user),
[(ldap.MOD_REPLACE, 'gidNumber', gid)])
except LDAPConnectError as e:
raise UnixError(
_("Failed to modify user primary group in LDAP: %s") % str(e))
def visible_to_flag(self, visible):
return "1" if visible else "0"
def flag_to_visible(self, flag):
return flag == "1"
def lock_to_flag(self, lock):
return "1" if lock else "-1"
def flag_to_lock(self, flag):
return flag == "1"
def modify_user(self, username, pw=None, gid=None, shell=None, visible=None,
lock=None, comment=None, homedir=None):
8年前
attributes = []
if pw is not None:
attributes.append((ldap.MOD_REPLACE, 'userPassword', str(pw)))
if gid is not None:
attributes.append((ldap.MOD_REPLACE, 'gidNumber', str(gid)))
if shell is not None:
attributes.append((ldap.MOD_REPLACE, 'loginShell', shell))
if comment is not None:
attributes.append((ldap.MOD_REPLACE, 'cn', comment))
if homedir is not None:
attributes.append((ldap.MOD_REPLACE, 'homeDirectory', homedir))
8年前
if visible is not None:
attributes.append(
(ldap.MOD_REPLACE, 'shadowFlag',
self.visible_to_flag(visible)))
if lock is not None:
attributes.append(
(ldap.MOD_REPLACE, 'shadowExpire', self.lock_to_flag(lock)))
try:
self.ldap_connect.ldap_modify_attrs(
self.user_dn(username), attributes)
except LDAPConnectError as e:
raise UnixError(
_("Failed to modify user parameters in LDAP: %s") % str(e))
def remove_user(self, username):
try:
self.ldap_connect.ldap_remove_dn(self.user_dn(username))
except LDAPConnectError as e:
raise UnixError(_("Failed to remove user from LDAP: %s") % str(e))
8年前
class Unix(Ldap):
"""Основной объект для выполнения действий связанных
8年前
с настройкой Unix сервиса
"""
8年前
class Method(object):
8年前
Setup = "unix_setup"
8年前
UserAdd = "unix_useradd"
UserMod = "unix_usermod"
UserDel = "unix_userdel"
UserShow = "unix_usershow"
GroupAdd = "unix_groupadd"
GroupMod = "unix_groupmod"
GroupDel = "unix_groupdel"
GroupShow = "unix_groupshow"
Passwd = "unix_passwd"
All = (
Setup, UserAdd, UserMod, UserDel, GroupAdd, GroupMod, GroupDel,
Passwd,
UserShow, GroupShow)
service_name = "unix"
def init(self):
pass
8年前
def rename_group(self, oldname, newname):
"""
Переименовать группу
"""
if oldname != newname and newname:
ldap_connect = self.clVars.Get('ldap.cl_ldap_connect')
groups_dn = self.clVars.Get('ld_unix_groups_dn')
ug = UnixGroups(ldap_connect, groups_dn)
ug.rename_group(oldname, newname)
return True
def update_group_comment(self, group, comment):
"""
Переименовать группу
"""
ldap_connect = self.clVars.Get('ldap.cl_ldap_connect')
groups_dn = self.clVars.Get('ld_unix_groups_dn')
ug = UnixGroups(ldap_connect, groups_dn)
ug.update_group_comment(group, comment)
return True
def update_group_id(self, group, gid, old_gid):
"""
Обновить gid группы
"""
ldap_connect = self.clVars.Get('ldap.cl_ldap_connect')
groups_dn = self.clVars.Get('ld_unix_groups_dn')
ug = UnixGroups(ldap_connect, groups_dn)
ug.update_group_id(group, str(gid))
users_dn = self.clVars.Get('ld_unix_users_dn')
uu = UnixUsers(ldap_connect, users_dn)
for user in uu.iterate_ldap_user("gidNumber=%d" % old_gid):
self.printSUCCESS(_("Changed primary group for user {user}").format(
8年前
user=user.username))
uu.update_user_group_id(user.username, str(gid))
return True
def remove_group(self, groupname):
"""
Удалить группу
"""
ldap_connect = self.clVars.Get('ldap.cl_ldap_connect')
groups_dn = self.clVars.Get('ld_unix_groups_dn')
ug = UnixGroups(ldap_connect, groups_dn)
ug.remove_group(groupname)
return True
def remove_user(self, username):
"""
Удалить пользователя
"""
ldap_connect = self.clVars.Get('ldap.cl_ldap_connect')
users_dn = self.clVars.Get('ld_unix_users_dn')
uu = UnixUsers(ldap_connect, users_dn)
uu.remove_user(username)
return True
8年前
def _add_users_in_group(self, users, groupname):
8年前
"""
Добавить пользователей в группу
"""
ldap_connect = self.clVars.Get('ldap.cl_ldap_connect')
groups_dn = self.clVars.Get('ld_unix_groups_dn')
ug = UnixGroups(ldap_connect, groups_dn)
group = ug.search_ldap_group_name(groupname)
if group:
exists_users = group.user_list
new_users = set(users) - set(exists_users)
ug.add_users_to_group(new_users, groupname)
return True
return False
def remove_users_from_group(self, users, groupname):
8年前
self._remove_users_from_group(users, groupname)
self.printSUCCESS(_("Users {logins} were removed from "
"group {group}").format(
8年前
logins=", ".join(users), group=groupname))
return True
def add_users_in_group(self, users, groupname):
self._add_users_in_group(users, groupname)
self.printSUCCESS(_("Users {logins} were added "
8年前
"to group {group}").format(
logins=", ".join(users), group=groupname))
return True
def _remove_users_from_group(self, users, groupname):
8年前
"""
Удалить пользователей из групп
"""
ldap_connect = self.clVars.Get('ldap.cl_ldap_connect')
groups_dn = self.clVars.Get('ld_unix_groups_dn')
ug = UnixGroups(ldap_connect, groups_dn)
group = ug.search_ldap_group_name(groupname)
if group:
exists_users = group.user_list
remove_users = set(exists_users) & set(users)
ug.remove_users_from_group(list(remove_users), groupname)
return True
return False
def add_user_in_groups(self, user, groups):
"""
Добавить пользователя в группы
"""
for group in groups:
8年前
self._add_users_in_group([user], group)
self.printSUCCESS(_("User {login} was added "
8年前
"to groups {groups}").format(
login=user, groups=", ".join(groups)))
8年前
return True
def remove_user_from_groups(self, user, groups):
"""
Удалить пользователя из группы
"""
for group in groups:
8年前
self._remove_users_from_group([user], group)
self.printSUCCESS(_("User {login} was removed "
8年前
"from groups {groups}").format(
login=user, groups=", ".join(groups)))
8年前
return True
def modify_user(self, login, pw, pw_delete, gid, shell, visible, lock,
comment, homedir):
8年前
""""
Изменить параметры пользователя в LDAP
"""
ldap_connect = self.clVars.Get('ldap.cl_ldap_connect')
users_dn = self.clVars.Get('ld_unix_users_dn')
uu = UnixUsers(ldap_connect, users_dn)
user = uu.search_ldap_user_name(login)
if user:
params = {}
if gid is not None and user.gid != gid:
params['gid'] = str(gid)
self.printSUCCESS(
_("Changed primary group for user {user}").format(
user.username))
8年前
if comment is not None and user.comment != comment:
params['comment'] = comment
self.printSUCCESS(
_("Changed comment for user %s") % user.username)
if homedir is not None and user.homedir != homedir:
params['homedir'] = homedir
self.printSUCCESS(
_("Changed home directory for user %s") % user.username)
if pw != UnixUsers.DeletedPassword or pw_delete:
8年前
params['pw'] = pw
if pw_delete:
self.printSUCCESS(
_("Removed password for user %s") % user.username)
else:
self.printSUCCESS(
_("Changed password for user %s") % user.username)
8年前
if shell and user.shell != shell:
params['shell'] = shell
self.printSUCCESS(
_("Changed shell for user %s") % user.username)
if visible is not None:
if user.visible != visible:
params['visible'] = visible
if visible:
self.printSUCCESS(
_("User %s is visible") % user.username)
else:
self.printSUCCESS(
_("User %s is unvisible") % user.username)
8年前
else:
if visible:
self.printWARNING(
_("User %s is visible already") % user.username)
else:
self.printWARNING(
_("User %s is already unvisible") % user.username)
if lock is not None:
if user.lock != lock:
params['lock'] = lock
if lock:
self.printSUCCESS(
_("User %s is locked") % user.username)
else:
self.printSUCCESS(
_("User %s is unlocked") % user.username)
8年前
else:
if lock:
self.printWARNING(
_("User %s is locked already") % user.username)
else:
self.printWARNING(
_("User %s is unlocked already") % user.username)
8年前
uu.modify_user(user.username, **params)
else:
raise UnixError(_("User %s not found") % user)
return True
def move_home_directory(self, homedir, new_homedir):
"""
Преместить домашнюю директорию пользователя
"""
if not path.exists(homedir):
self.printWARNING(_("Previous home directory %s not found")
% homedir)
return True
8年前
try:
self.printSUCCESS(
_("User home directory {homedir} "
"moved to {new_homedir}").format(
homedir=homedir, new_homedir=new_homedir))
8年前
dirname = path.dirname(new_homedir)
if not path.exists(dirname):
os.makedirs(dirname)
os.rename(homedir, new_homedir)
except (OSError, IOError):
raise UnixError(_("Failed to move home directory"))
return True
def create_home_directory(self, homedir, uid, gid, skel):
"""
Создать домашнюю директорию пользователя
"""
try:
os.makedirs(homedir, 0700)
if path.exists(skel):
for fn in listDirectory(skel):
src_fn = path.join(skel, fn)
dst_fn = path.join(homedir, fn)
if path.isdir(src_fn):
shutil.copytree(src_fn, dst_fn, True)
elif path.islink(src_fn):
link_src = os.readlink(src_fn)
os.symlink(link_src, dst_fn)
else:
shutil.copy2(src_fn, dst_fn)
for root, dns, fns in os.walk(homedir):
for fn in (path.join(root, x) for x in dns + fns):
os.lchown(fn, uid, gid)
os.lchown(homedir, uid, gid)
except (OSError, IOError) as e:
self.printWARNING(str(e))
raise UnixError(_("Failed to create user home directory"))
return True
def show_groups(self, groupname, fields, count, offset):
8年前
dv = self.clVars
fields = ["ur_unix_group_name"] + list(fields)
8年前
head = [dv.getInfo(x).label for x in fields]
body = []
ldap_connect = self.clVars.Get('ldap.cl_ldap_connect')
groups_dn = self.clVars.Get('ld_unix_groups_dn')
ug = UnixGroups(ldap_connect, groups_dn)
8年前
users_dn = self.clVars.Get('ld_unix_users_dn')
uu = UnixUsers(ldap_connect, users_dn)
filters = (self.clVars.getInfo(x)
for x in ('cl_unix_group_filter_name',
'cl_unix_group_filter_id',
'cl_unix_group_filter_comment',
'cl_unix_group_filter_users'))
8年前
filters = [x.test for x in filters if x.enabled()]
def all_users(group):
for user in sorted(uu.iterate_ldap_user("gidNumber=%d" % group.gid),
key=lambda x: x.username):
8年前
yield "<%s>" % user.username
for user in group.user_list:
yield user
8年前
variables_mapping = {
'ur_unix_group_name': lambda group: group.group_name,
'ur_unix_group_id': lambda group: str(group.gid),
'ur_unix_group_comment': lambda group: group.comment,
8年前
'ur_unix_group_users': lambda group: ", ".join(all_users(group))
8年前
}
mapping = {'ur_unix_group_name': 'ur_unix_group_name_exists'}
maxi = 0
try:
for i, group in enumerate(sorted(
(group for group in
ug.iterate_ldap_group("cn=*")
if all(x(group) for x in filters)),
key=lambda x: x.group_name)):
maxi = i
if offset <= i < offset + count:
body.append(
[variables_mapping.get(x)(group) for x in fields])
except LDAPBadSearchFilter:
raise UnixError(_("Wrong group pattern"))
8年前
table_fields = [mapping.get(x, '') for x in fields]
8年前
if not body:
body = [[]]
dv.Invalidate('ur_unix_group_count')
if any(body):
head_message = _("Groups")
elif dv.GetInteger("ur_unix_group_count"):
head_message = _("Groups not found")
else:
head_message = _("No groups")
8年前
self.printTable(
head_message, head, body,
fields=table_fields,
onClick='unix_groupmod' if any(table_fields) else None,
addAction='unix_groupadd',
records=str(maxi + 1))
8年前
if any(body):
8年前
num_page, count_page = getPagesInterval(
count, offset, maxi + 1)
self.printSUCCESS(_('page %d of ') % num_page + str(count_page))
8年前
return True
def show_group(self, groupname):
dv = self.clVars
list_group_name = sorted(dv.Choice('cl_core_group'))
if not list_group_name:
self.printSUCCESS(_("No groups"))
head = [_('Field'), _('Value')]
body = []
fields = [
'ur_unix_group_name',
'ur_unix_group_id',
'ur_unix_group_comment',
'ur_unix_group_users'
]
def get_primary_users(gid):
ldap_connect = self.clVars.Get('ldap.cl_ldap_connect')
users_dn = self.clVars.Get('ld_unix_users_dn')
uu = UnixUsers(ldap_connect, users_dn)
for user in sorted(uu.iterate_ldap_user("gidNumber=%d" % gid),
key=lambda x: x.username):
yield "<%s>" % user.username
8年前
self.clVars.Set('ur_unix_group_name', groupname, True)
gid = self.clVars.GetInteger('ur_unix_group_id')
8年前
for varname in fields:
varval = self.clVars.Get(varname)
varobj = self.clVars.getInfo(varname)
if varname == 'ur_unix_group_users':
varval = (sorted(get_primary_users(gid))
+ sorted(varval))
8年前
if "list" in varobj.type:
varval = "\n".join(varval)
body.append([varobj.label or "", varval])
if body:
self.printTable(_("Group info"), head, body,
onClick="unix_groupmod",
records="0",
fields=["ur_unix_group_name_exists:%s" % groupname])
8年前
return True
8年前
def show_users(self, login, fields, count, offset):
8年前
dv = self.clVars
fields = ["ur_unix_login"] + list(fields)
8年前
head = [dv.getInfo(x).label for x in fields]
body = []
ldap_connect = self.clVars.Get('ldap.cl_ldap_connect')
users_dn = self.clVars.Get('ld_unix_users_dn')
uu = UnixUsers(ldap_connect, users_dn)
groups_dn = self.clVars.Get('ld_unix_groups_dn')
ug = UnixGroups(ldap_connect, groups_dn)
yesno_map = {True: _("Yes"),
False: _("No"),
None: _("Unavailable")}
yesno = lambda x: yesno_map.get(x, _("Failed value"))
8年前
8年前
filters = (self.clVars.getInfo(x)
for x in ('cl_unix_user_filter_login',
'cl_unix_user_filter_pw_set',
'cl_unix_user_filter_comment',
'cl_unix_user_filter_uid',
'cl_unix_user_filter_gid',
'cl_unix_user_filter_home_path',
'cl_unix_user_filter_shell',
'cl_unix_user_filter_groups',
'cl_unix_user_filter_visible_set',
'cl_unix_user_filter_lock_set',
))
filters = [x.test for x in filters if x.enabled()]
8年前
variables_mapping = {
'ur_unix_uid': lambda user: user.uid,
'ur_unix_login': lambda user: user.username,
'ur_unix_comment': lambda user: user.comment,
'ur_unix_lock_set': lambda user: yesno(user.lock),
'ur_unix_visible_set': lambda user: yesno(user.visible),
'ur_unix_primary_group': lambda user: ug.gid_to_name(str(user.gid)),
'ur_unix_groups': lambda user: ", ".join(
x.group_name for x in ug.iterate_ldap_group(
"memberUid=%s" % user.username)
),
'ur_unix_home_path': lambda user: user.homedir,
'ur_unix_shell': lambda user: user.shell,
8年前
'ur_unix_pw_set': lambda user: yesno(user.pass_set),
8年前
}
mapping = {'ur_unix_login': 'ur_unix_login_exists'}
maxi = 0
8年前
try:
for i, user in enumerate(sorted(
(user for user in uu.iterate_ldap_user("uid=*")
if all(x(user) for x in filters)),
key=lambda x: x.username)):
maxi = i
if offset <= i < offset + count:
8年前
body.append(
[variables_mapping.get(x)(user) for x in fields])
8年前
except LDAPBadSearchFilter:
raise UnixError(_("Wrong user pattern"))
8年前
table_fields = [mapping.get(x, '') for x in fields]
if not body:
body = [[]]
dv.Invalidate('ur_unix_user_count')
if any(body):
head_message = _("Users")
elif dv.GetInteger("ur_unix_user_count"):
head_message = _("Users not found")
else:
head_message = _("No users")
self.printTable(head_message,
8年前
head, body,
fields=table_fields,
onClick='unix_usermod' if any(table_fields) else None,
addAction='unix_useradd',
records=str(maxi + 1))
8年前
if any(body):
8年前
num_page, count_page = getPagesInterval(
count, offset, maxi + 1)
self.printSUCCESS(_('page %d of ') % num_page + str(count_page))
8年前
return True
def show_user(self, user):
dv = self.clVars
head = [_('Field'), _('Value')]
body = []
fields = [
'ur_unix_uid',
'ur_unix_login',
'ur_unix_comment',
'ur_unix_lock_set',
'ur_unix_visible_set',
'ur_unix_primary_group',
'ur_unix_groups',
'ur_unix_home_path',
'ur_unix_shell',
'ur_unix_pw_set',
]
self.clVars.Set('ur_unix_login', user, True)
for varname in fields:
varval = self.clVars.Get(varname)
varobj = self.clVars.getInfo(varname)
if "list" in varobj.type:
varval = "\n".join(varval)
body.append([varobj.label or "", varval])
if body:
self.printTable(_("User info"), head, body,
onClick="unix_usermod",
records="0",
fields=["ur_unix_login_exists:%s" % user])
8年前
return True
def try_remove_primary_group(self, user, primary_group):
"""
Primary group
:param primary_group:
:return:
"""
ldap_connect = self.clVars.Get('ldap.cl_ldap_connect')
groups_dn = self.clVars.Get('ld_unix_groups_dn')
ug = UnixGroups(ldap_connect, groups_dn)
filter_str = "gidNumber={0}".format(primary_group)
filter_str2 = "(&(gidNumber={0})(!(uid={1})))".format(
primary_group, user)
group = ug.search_ldap_group(filter_str)
ldap_connect = self.clVars.Get('ldap.cl_ldap_connect')
users_dn = self.clVars.Get('ld_unix_users_dn')
uu = UnixUsers(ldap_connect, users_dn)
if group:
if not group.user_list and not uu.search_ldap_user(filter_str2):
self.remove_group(group.group_name)
self.printSUCCESS(_("Removed user primary group {0}").format(
group.group_name))
return True