You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
calculate-utils-3-unix/pym/unix/unix.py

932 lines
34 KiB

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

# -*- coding: utf-8 -*-
# Copyright 2016 Mir Calculate. http://www.calculate-linux.org
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from collections import namedtuple
import sys
from calculate.lib.utils.files import readLinesFile, listDirectory
from calculate.ldap.ldap import Ldap
from calculate.lib.utils.common import getPagesInterval
import os
from os import path
import shutil
_ = lambda x: x
from calculate.lib.cl_lang import (setLocalTranslate, getLazyLocalTranslate)
from calculate.lib.cl_ldap import ldap, LDAPConnectError, LDAPBadSearchFilter
setLocalTranslate('cl_unix3', sys.modules[__name__])
__ = getLazyLocalTranslate(_)
class UnixError(Exception):
pass
UnixGroup = namedtuple("UnixGroup", ['group_name', 'gid', 'user_list',
'comment'])
class GroupFile(object):
def __init__(self, group_fn='/etc/group'):
self.group_fn = group_fn
def __iter__(self):
for line in (x for x in readLinesFile(self.group_fn)
if not x.startswith("#")):
data = line.strip().split(':')[:4]
if len(data) >= 4 and data[2].isdigit():
yield UnixGroup(data[0], int(data[2]),
filter(None, data[3].split(",")),
"/etc/group")
class UnixGroups(object):
"""
Набор методов для нахождения параметров группы и сопаставления GID,
названия
"""
groups = GroupFile()
exception = UnixError
ldap_connect = None
groups_dn = ""
def __init__(self, ldap_connect, groups_dn):
self.ldap_connect = ldap_connect
self.groups_dn = groups_dn
def search_system_group_name(self, name):
for group in (x for x in self.groups if x.group_name == name):
return group
else:
return None
def search_system_group_id(self, gid):
for group in (x for x in self.groups if x.gid == gid):
return group
else:
return None
def iterate_ldap_group(self, search_filter, offset=None, count=None):
ldap_connect = self.ldap_connect
if ldap_connect:
base_dn = self.groups_dn
if offset is not None:
below = lambda x: x < offset
else:
below = lambda x: False
if count is not None:
above = lambda x: x >= offset + count
else:
above = lambda x: False
for i, group in enumerate(x[0][1] for x in ldap_connect.ldap_search(
base_dn, ldap.SCOPE_ONELEVEL, search_filter)):
if below(i):
continue
if above(i):
break
yield UnixGroup(
group['cn'][0],
int(group['gidNumber'][0]),
sorted(group.get('memberUid', [])),
group['description'][0])
def search_ldap_group(self, search_filter):
for group in self.iterate_ldap_group(search_filter):
return group
return None
def search_ldap_group_name(self, value):
return self.search_ldap_group("cn=%s" % value)
def gid_to_name(self, value):
if value.isdigit():
value = int(value)
group = (self.search_ldap_group("gidNumber=%d" % value) or
self.search_system_group_id(value))
if group:
return group.group_name
return value
def _groupname(self, name, comment):
if comment != name:
return "{0} ({1})".format(name, comment)
else:
return name
def ldap_group_list(self):
groups = list(x for x in self.iterate_ldap_group("cn=*"))
return [(x.group_name, self._groupname(x.group_name, x.comment))
for x in sorted(groups, key=lambda k: k.gid)]
def ldap_group_names(self):
return list(x.group_name for x in self.iterate_ldap_group("cn=*"))
def check_group(self, value):
ldap_group = self.search_ldap_group_name(value)
unix_group = ldap_group or self.search_system_group_name(value)
if not unix_group:
raise self.exception(_("Group %s not found") % value)
if not ldap_group and unix_group:
raise self.exception(
_("You should create group %s (%d) in Unix service") %
(unix_group.group_name, unix_group.gid))
def group_dn(self, group):
return "cn=%s,%s" % (group, self.groups_dn)
def add_users_to_group(self, users, group):
try:
self.ldap_connect.ldap_modify_attrs(
self.group_dn(group),
((ldap.MOD_ADD, 'memberUid', user) for user in users))
except LDAPConnectError as e:
raise UnixError(_("Failed to modify group in LDAP: %s") % str(e))
def update_group_id(self, group, gid):
try:
self.ldap_connect.ldap_modify_attrs(
self.group_dn(group),
[(ldap.MOD_REPLACE, 'gidNumber', gid)])
except LDAPConnectError as e:
raise UnixError(_("Failed to update group id in LDAP: %s") % str(e))
def update_group_comment(self, group, comment):
try:
self.ldap_connect.ldap_modify_attrs(
self.group_dn(group),
[(ldap.MOD_REPLACE, 'description', comment)])
except LDAPConnectError as e:
raise UnixError(
_("Failed to update group comment in LDAP: %s") % str(e))
def remove_users_from_group(self, users, group):
try:
self.ldap_connect.ldap_modify_attrs(
self.group_dn(group),
((ldap.MOD_DELETE, 'memberUid', user) for user in users))
except LDAPConnectError as e:
raise UnixError(
_("Failed to remove users from group in LDAP: %s") % str(e))
def rename_group(self, oldname, newname):
try:
old_dn = self.group_dn(oldname)
new_dn = "cn=%s" % newname
self.ldap_connect.ldap_modify_dn(old_dn, new_dn)
except LDAPConnectError as e:
raise UnixError(
_("Failed to rename the group in LDAP: %s") % str(e))
def remove_group(self, group):
try:
self.ldap_connect.ldap_remove_dn(self.group_dn(group))
except LDAPConnectError as e:
raise UnixError(
_("Failed to remove the group in LDAP: %s") % str(e))
UnixUser = namedtuple(
"UnixUser", ['username', 'uid', 'gid', 'comment', 'homedir', 'shell',
'visible', 'lock', 'pass_set'])
class PasswdFile(object):
def __init__(self, passwd_fn='/etc/passwd'):
self.passwd_fn = passwd_fn
def __iter__(self):
for line in (x for x in readLinesFile(self.passwd_fn)
if not x.startswith("#")):
data = line.strip().split(':')[:7]
if len(data) >= 7 and data[2].isdigit() and data[3].isdigit():
yield UnixUser(data[0], int(data[2]),
int(data[3]), data[4], data[5], data[6], True,
False, False)
class UnixUsers(object):
"""
Набор методов для нахождения параметров пользователя и сопаставления UID,
названия
"""
passwd = PasswdFile()
exception = UnixError
ldap_connect = None
users_dn = ""
DeletedPassword = "crypt{xxx}"
def __init__(self, ldap_connect, users_dn):
self.ldap_connect = ldap_connect
self.users_dn = users_dn
def search_system_user_name(self, name):
for user in (x for x in self.passwd if x.username == name):
return user
else:
return None
def search_system_user_id(self, uid):
uid = int(uid)
for user in (x for x in self.passwd if x.uid == uid):
return user
else:
return None
def iterate_ldap_user(self, search_filter, offset=None, count=None):
ldap_connect = self.ldap_connect
if ldap_connect:
base_dn = self.users_dn
if offset is not None:
below = lambda x: x < offset
else:
below = lambda x: False
if count is not None:
above = lambda x: x >= offset + count
else:
above = lambda x: False
for i, user in enumerate(x[0][1] for x in ldap_connect.ldap_search(
base_dn, ldap.SCOPE_ONELEVEL, search_filter)):
if below(i):
continue
if above(i):
break
yield UnixUser(
user['uid'][0],
int(user['uidNumber'][0]),
int(user['gidNumber'][0]),
user['cn'][0],
user['homeDirectory'][0],
user['loginShell'][0],
self.flag_to_visible(user['shadowFlag'][0]),
self.flag_to_lock(user['shadowExpire'][0]),
self.has_password(user['userPassword'][0])
if 'userPassword' in user else None,
)
def search_ldap_user(self, search_filter):
for user in self.iterate_ldap_user(search_filter):
return user
return None
def has_password(self, password):
if password and password != self.DeletedPassword:
return True
return False
def search_ldap_user_name(self, value):
return self.search_ldap_user("uid=%s" % value)
def search_ldap_user_id(self, value):
return self.search_ldap_user("uidNumber=%s" % value)
def get_primary_gids(self):
ldap_connect = self.ldap_connect
if ldap_connect:
base_dn = self.users_dn
return list(ldap_connect.ldap_simple_search(
base_dn, "uid=*", "gidNumber"))
return []
def get_password_hash(self, username):
ldap_connect = self.ldap_connect
if ldap_connect:
base_dn = self.users_dn
for pw in ldap_connect.ldap_simple_search(
self.users_dn, "uid=%s" % username, "userPassword"):
return pw
return ""
def uid_to_name(self, value):
if value.isdigit():
value = int(value)
user = (self.search_ldap_user("uidNumber=%d" % value) or
self.search_system_user_id(value))
if user:
return user.username
return value
def _username(self, name, comment):
if comment != name:
return "{0} ({1})".format(name, comment)
else:
return name
def ldap_user_list(self):
users = list(x for x in self.iterate_ldap_user("uid=*"))
return [(x.username, self._username(x.username, x.comment))
for x in sorted(users, key=lambda y: y.uid)]
def ldap_user_names(self):
return list(x.username for x in self.iterate_ldap_user("uid=*"))
# def check_user(self, value):
# ldap_user = self.search_ldap_user_name(value)
# unix_user = ldap_user or self.search_system_user_name(value)
# if not unix_user:
# raise VariableError(_("User %s not found") % value)
# if not ldap_user and unix_user:
# raise VariableError(
# _("You should create group %s (%d) in Unix service") %
# (unix_group.group_name, unix_group.gid))
def user_dn(self, user):
return "uid=%s,%s" % (user, self.users_dn)
def update_user_group_id(self, user, gid):
try:
self.ldap_connect.ldap_modify_attrs(
self.user_dn(user),
[(ldap.MOD_REPLACE, 'gidNumber', gid)])
except LDAPConnectError as e:
raise UnixError(
_("Failed to modify user primary group in LDAP: %s") % str(e))
def visible_to_flag(self, visible):
return "1" if visible else "0"
def flag_to_visible(self, flag):
return flag == "1"
def lock_to_flag(self, lock):
return "1" if lock else "-1"
def flag_to_lock(self, flag):
return flag == "1"
def modify_user(self, username, pw=None, gid=None, shell=None, visible=None,
lock=None, comment=None, homedir=None):
attributes = []
if pw is not None:
attributes.append((ldap.MOD_REPLACE, 'userPassword', str(pw)))
if gid is not None:
attributes.append((ldap.MOD_REPLACE, 'gidNumber', str(gid)))
if shell is not None:
attributes.append((ldap.MOD_REPLACE, 'loginShell', shell))
if comment is not None:
attributes.append((ldap.MOD_REPLACE, 'cn', comment))
if homedir is not None:
attributes.append((ldap.MOD_REPLACE, 'homeDirectory', homedir))
if visible is not None:
attributes.append(
(ldap.MOD_REPLACE, 'shadowFlag',
self.visible_to_flag(visible)))
if lock is not None:
attributes.append(
(ldap.MOD_REPLACE, 'shadowExpire', self.lock_to_flag(lock)))
try:
self.ldap_connect.ldap_modify_attrs(
self.user_dn(username), attributes)
except LDAPConnectError as e:
raise UnixError(
_("Failed to modify user parameters in LDAP: %s") % str(e))
def remove_user(self, username):
try:
self.ldap_connect.ldap_remove_dn(self.user_dn(username))
except LDAPConnectError as e:
raise UnixError(_("Failed to remove user from LDAP: %s") % str(e))
class Unix(Ldap):
"""Основной объект для выполнения действий связанных
с настройкой Unix сервиса
"""
class Method(object):
Setup = "unix_setup"
UserAdd = "unix_useradd"
UserMod = "unix_usermod"
UserDel = "unix_userdel"
UserShow = "unix_usershow"
GroupAdd = "unix_groupadd"
GroupMod = "unix_groupmod"
GroupDel = "unix_groupdel"
GroupShow = "unix_groupshow"
Passwd = "unix_passwd"
All = (
Setup, UserAdd, UserMod, UserDel, GroupAdd, GroupMod, GroupDel,
Passwd,
UserShow, GroupShow)
service_name = "unix"
def init(self):
pass
def rename_group(self, oldname, newname):
"""
Переименовать группу
"""
if oldname != newname and newname:
ldap_connect = self.clVars.Get('ldap.cl_ldap_connect')
groups_dn = self.clVars.Get('ld_unix_groups_dn')
ug = UnixGroups(ldap_connect, groups_dn)
ug.rename_group(oldname, newname)
return True
def update_group_comment(self, group, comment):
"""
Переименовать группу
"""
ldap_connect = self.clVars.Get('ldap.cl_ldap_connect')
groups_dn = self.clVars.Get('ld_unix_groups_dn')
ug = UnixGroups(ldap_connect, groups_dn)
ug.update_group_comment(group, comment)
return True
def update_group_id(self, group, gid, old_gid):
"""
Обновить gid группы
"""
ldap_connect = self.clVars.Get('ldap.cl_ldap_connect')
groups_dn = self.clVars.Get('ld_unix_groups_dn')
ug = UnixGroups(ldap_connect, groups_dn)
ug.update_group_id(group, str(gid))
users_dn = self.clVars.Get('ld_unix_users_dn')
uu = UnixUsers(ldap_connect, users_dn)
for user in uu.iterate_ldap_user("gidNumber=%d" % old_gid):
self.printSUCCESS(_("Change primary group for user {user}").format(
user=user.username))
uu.update_user_group_id(user.username, str(gid))
return True
def remove_group(self, groupname):
"""
Удалить группу
"""
ldap_connect = self.clVars.Get('ldap.cl_ldap_connect')
groups_dn = self.clVars.Get('ld_unix_groups_dn')
ug = UnixGroups(ldap_connect, groups_dn)
ug.remove_group(groupname)
return True
def remove_user(self, username):
"""
Удалить пользователя
"""
ldap_connect = self.clVars.Get('ldap.cl_ldap_connect')
users_dn = self.clVars.Get('ld_unix_users_dn')
uu = UnixUsers(ldap_connect, users_dn)
uu.remove_user(username)
return True
def _add_users_in_group(self, users, groupname):
"""
Добавить пользователей в группу
"""
ldap_connect = self.clVars.Get('ldap.cl_ldap_connect')
groups_dn = self.clVars.Get('ld_unix_groups_dn')
ug = UnixGroups(ldap_connect, groups_dn)
group = ug.search_ldap_group_name(groupname)
if group:
exists_users = group.user_list
new_users = set(users) - set(exists_users)
ug.add_users_to_group(new_users, groupname)
return True
return False
def remove_users_from_group(self, users, groupname):
self._remove_users_from_group(users, groupname)
self.printSUCCESS(_("Users {logins} was removed "
"from group {group}").format(
logins=", ".join(users), group=groupname))
return True
def add_users_in_group(self, users, groupname):
self._add_users_in_group(users, groupname)
self.printSUCCESS(_("Users {logins} was added "
"to group {group}").format(
logins=", ".join(users), group=groupname))
return True
def _remove_users_from_group(self, users, groupname):
"""
Удалить пользователей из групп
"""
ldap_connect = self.clVars.Get('ldap.cl_ldap_connect')
groups_dn = self.clVars.Get('ld_unix_groups_dn')
ug = UnixGroups(ldap_connect, groups_dn)
group = ug.search_ldap_group_name(groupname)
if group:
exists_users = group.user_list
remove_users = set(exists_users) & set(users)
ug.remove_users_from_group(list(remove_users), groupname)
return True
return False
def add_user_in_groups(self, user, groups):
"""
Добавить пользователя в группы
"""
for group in groups:
self._add_users_in_group([user], group)
self.printSUCCESS(_("User {login} added "
"to groups {groups}").format(
login=user, groups=", ".join(groups)))
return True
def remove_user_from_groups(self, user, groups):
"""
Удалить пользователя из группы
"""
for group in groups:
self._remove_users_from_group([user], group)
self.printSUCCESS(_("User {login} removed "
"from groups {groups}").format(
login=user, groups=", ".join(groups)))
return True
def modify_user(self, login, pw, pw_delete, gid, shell, visible, lock,
comment, homedir):
""""
Изменить параметры пользователя в LDAP
"""
ldap_connect = self.clVars.Get('ldap.cl_ldap_connect')
users_dn = self.clVars.Get('ld_unix_users_dn')
uu = UnixUsers(ldap_connect, users_dn)
user = uu.search_ldap_user_name(login)
if user:
params = {}
if gid is not None and user.gid != gid:
params['gid'] = str(gid)
self.printSUCCESS(
_("Changed primary group for user %s") % user.username)
if comment is not None and user.comment != comment:
params['comment'] = comment
self.printSUCCESS(
_("Changed comment for user %s") % user.username)
if homedir is not None and user.homedir != homedir:
params['homedir'] = homedir
self.printSUCCESS(
_("Changed home directory for user %s") % user.username)
if pw != UnixUsers.DeletedPassword or pw_delete:
params['pw'] = pw
if pw_delete:
self.printSUCCESS(
_("Removed password for user %s") % user.username)
else:
self.printSUCCESS(
_("Changed password for user %s") % user.username)
if shell and user.shell != shell:
params['shell'] = shell
self.printSUCCESS(
_("Changed shell for user %s") % user.username)
if visible is not None:
if user.visible != visible:
params['visible'] = visible
if visible:
self.printSUCCESS(
_("User %s is visible") % user.username)
else:
self.printSUCCESS(
_("User %s is unvisible") % user.username)
else:
if visible:
self.printWARNING(
_("User %s is visible already") % user.username)
else:
self.printWARNING(
_("User %s is unvisible already") % user.username)
if lock is not None:
if user.lock != lock:
params['lock'] = lock
if lock:
self.printSUCCESS(
_("User %s is locked") % user.username)
else:
self.printSUCCESS(
_("User %s is unlocked") % user.username)
else:
if lock:
self.printWARNING(
_("User %s is locked already") % user.username)
else:
self.printWARNING(
_("User %s is unlocked already") % user.username)
uu.modify_user(user.username, **params)
else:
raise UnixError(_("User %s not found") % user)
return True
def move_home_directory(self, homedir, new_homedir):
"""
Преместить домашнюю директорию пользователя
"""
try:
dirname = path.dirname(new_homedir)
if not path.exists(dirname):
os.makedirs(dirname)
os.rename(homedir, new_homedir)
except (OSError, IOError):
raise UnixError(_("Failed to move home directory"))
return True
def create_home_directory(self, homedir, uid, gid, skel):
"""
Создать домашнюю директорию пользователя
"""
try:
os.makedirs(homedir, 0700)
if path.exists(skel):
for fn in listDirectory(skel):
src_fn = path.join(skel, fn)
dst_fn = path.join(homedir, fn)
if path.isdir(src_fn):
shutil.copytree(src_fn, dst_fn, True)
elif path.islink(src_fn):
link_src = os.readlink(src_fn)
os.symlink(link_src, dst_fn)
else:
shutil.copy2(src_fn, dst_fn)
for root, dns, fns in os.walk(homedir):
for fn in (path.join(root, x) for x in dns + fns):
os.lchown(fn, uid, gid)
os.lchown(homedir, uid, gid)
except (OSError, IOError) as e:
self.printWARNING(str(e))
raise UnixError(_("Failed to create user home directory"))
return True
def show_groups(self, groupname, fields, count, offset):
dv = self.clVars
fields = ["ur_unix_group_name"] + list(fields)
head = [dv.getInfo(x).label for x in fields]
body = []
ldap_connect = self.clVars.Get('ldap.cl_ldap_connect')
groups_dn = self.clVars.Get('ld_unix_groups_dn')
ug = UnixGroups(ldap_connect, groups_dn)
users_dn = self.clVars.Get('ld_unix_users_dn')
uu = UnixUsers(ldap_connect, users_dn)
filters = (self.clVars.getInfo(x)
for x in ('cl_unix_group_filter_name',
'cl_unix_group_filter_id',
'cl_unix_group_filter_comment',
'cl_unix_group_filter_users'))
filters = [x.test for x in filters if x.enabled()]
def all_users(group):
for user in sorted(uu.iterate_ldap_user("gidNumber=%d" % group.gid),
key=lambda x: x.username):
yield "<%s>" % user.username
for user in group.user_list:
yield user
variables_mapping = {
'ur_unix_group_name': lambda group: group.group_name,
'ur_unix_group_id': lambda group: str(group.gid),
'ur_unix_group_comment': lambda group: group.comment,
'ur_unix_group_users': lambda group: ", ".join(all_users(group))
}
mapping = {'ur_unix_group_name': 'ur_unix_group_name_exists'}
maxi = 0
try:
for i, group in enumerate(sorted(
(group for group in
ug.iterate_ldap_group("cn=*")
if all(x(group) for x in filters)),
key=lambda x: x.group_name)):
maxi = i
if offset <= i < offset + count:
body.append(
[variables_mapping.get(x)(group) for x in fields])
except LDAPBadSearchFilter:
raise UnixError(_("Wrong group pattern"))
table_fields = [mapping.get(x, '') for x in fields]
if not body:
body = [[]]
dv.Invalidate('ur_unix_group_count')
if any(body):
head_message = _("Groups")
elif dv.GetInteger("ur_unix_group_count"):
head_message = _("Groups not found")
else:
head_message = _("No groups")
self.printTable(
head_message, head, body,
fields=table_fields,
onClick='unix_groupmod' if any(table_fields) else None,
addAction='unix_groupadd',
records=str(maxi))
if any(body):
num_page, count_page = getPagesInterval(
count, offset, maxi)
self.printSUCCESS(_('page %d from ') % num_page + str(count_page))
return True
def show_group(self, groupname):
dv = self.clVars
list_group_name = sorted(dv.Choice('cl_core_group'))
if not list_group_name:
self.printSUCCESS(_("No groups"))
head = [_('Field'), _('Value')]
body = []
fields = [
'ur_unix_group_name',
'ur_unix_group_id',
'ur_unix_group_comment',
'ur_unix_group_users'
]
def get_primary_users(gid):
ldap_connect = self.clVars.Get('ldap.cl_ldap_connect')
users_dn = self.clVars.Get('ld_unix_users_dn')
uu = UnixUsers(ldap_connect, users_dn)
for user in sorted(uu.iterate_ldap_user("gidNumber=%d" % gid),
key=lambda x: x.username):
yield "<%s>" % user.username
self.clVars.Set('ur_unix_group_name', groupname, True)
gid = self.clVars.GetInteger('ur_unix_group_id')
for varname in fields:
varval = self.clVars.Get(varname)
varobj = self.clVars.getInfo(varname)
if varname == 'ur_unix_group_users':
varval = (sorted(get_primary_users(gid))
+ sorted(varval))
if "list" in varobj.type:
varval = "\n".join(varval)
body.append([varobj.label or "", varval])
if body:
self.printTable(_("Group info"), head, body,
onClick="unix_groupmod",
records="0",
fields=["ur_unix_group_name_exists:%s" % groupname])
return True
def show_users(self, login, fields, count, offset):
dv = self.clVars
fields = ["ur_unix_login"] + list(fields)
head = [dv.getInfo(x).label for x in fields]
body = []
ldap_connect = self.clVars.Get('ldap.cl_ldap_connect')
users_dn = self.clVars.Get('ld_unix_users_dn')
uu = UnixUsers(ldap_connect, users_dn)
groups_dn = self.clVars.Get('ld_unix_groups_dn')
ug = UnixGroups(ldap_connect, groups_dn)
yesno_map = {True: _("Yes"),
False: _("No"),
None: _("Unavailable")}
yesno = lambda x: yesno_map.get(x, _("Failed value"))
filters = (self.clVars.getInfo(x)
for x in ('cl_unix_user_filter_login',
'cl_unix_user_filter_pw_set',
'cl_unix_user_filter_comment',
'cl_unix_user_filter_uid',
'cl_unix_user_filter_gid',
'cl_unix_user_filter_home_path',
'cl_unix_user_filter_shell',
'cl_unix_user_filter_groups',
'cl_unix_user_filter_visible_set',
'cl_unix_user_filter_lock_set',
))
filters = [x.test for x in filters if x.enabled()]
variables_mapping = {
'ur_unix_uid': lambda user: user.uid,
'ur_unix_login': lambda user: user.username,
'ur_unix_comment': lambda user: user.comment,
'ur_unix_lock_set': lambda user: yesno(user.lock),
'ur_unix_visible_set': lambda user: yesno(user.visible),
'ur_unix_primary_group': lambda user: ug.gid_to_name(str(user.gid)),
'ur_unix_groups': lambda user: ", ".join(
x.group_name for x in ug.iterate_ldap_group(
"memberUid=%s" % user.username)
),
'ur_unix_home_path': lambda user: user.homedir,
'ur_unix_shell': lambda user: user.shell,
'ur_unix_pw_set': lambda user: yesno(user.pass_set),
}
mapping = {'ur_unix_login': 'ur_unix_login_exists'}
maxi = 0
try:
for i, user in enumerate(sorted(
(user for user in uu.iterate_ldap_user("uid=*")
if all(x(user) for x in filters)),
key=lambda x: x.username)):
maxi = i
if offset <= i < offset + count:
body.append(
[variables_mapping.get(x)(user) for x in fields])
except LDAPBadSearchFilter:
raise UnixError(_("Wrong user pattern"))
table_fields = [mapping.get(x, '') for x in fields]
if not body:
body = [[]]
dv.Invalidate('ur_unix_user_count')
if any(body):
head_message = _("Users")
elif dv.GetInteger("ur_unix_user_count"):
head_message = _("Users not found")
else:
head_message = _("No users")
self.printTable(head_message,
head, body,
fields=table_fields,
onClick='unix_usermod' if any(table_fields) else None,
addAction='unix_useradd',
records=str(maxi))
if any(body):
num_page, count_page = getPagesInterval(
count, offset, maxi)
self.printSUCCESS(_('page %d from ') % num_page + str(count_page))
return True
def show_user(self, user):
dv = self.clVars
head = [_('Field'), _('Value')]
body = []
fields = [
'ur_unix_uid',
'ur_unix_login',
'ur_unix_comment',
'ur_unix_lock_set',
'ur_unix_visible_set',
'ur_unix_primary_group',
'ur_unix_groups',
'ur_unix_home_path',
'ur_unix_shell',
'ur_unix_pw_set',
]
self.clVars.Set('ur_unix_login', user, True)
for varname in fields:
varval = self.clVars.Get(varname)
varobj = self.clVars.getInfo(varname)
if "list" in varobj.type:
varval = "\n".join(varval)
body.append([varobj.label or "", varval])
if body:
self.printTable(_("User info"), head, body,
onClick="unix_usermod",
records="0",
fields=["ur_unix_login_exists:%s" % user])
return True
def try_remove_primary_group(self, user, primary_group):
"""
Primary group
:param primary_group:
:return:
"""
ldap_connect = self.clVars.Get('ldap.cl_ldap_connect')
groups_dn = self.clVars.Get('ld_unix_groups_dn')
ug = UnixGroups(ldap_connect, groups_dn)
filter_str = "gidNumber={0}".format(primary_group)
filter_str2 = "(&(gidNumber={0})(!(uid={1})))".format(
primary_group, user)
group = ug.search_ldap_group(filter_str)
ldap_connect = self.clVars.Get('ldap.cl_ldap_connect')
users_dn = self.clVars.Get('ld_unix_users_dn')
uu = UnixUsers(ldap_connect, users_dn)
if group:
if not group.user_list and not uu.search_ldap_user(filter_str2):
self.remove_group(group.group_name)
self.printSUCCESS(_("Removed user primary group {0}").format(
group.group_name))
return True