#-*- coding: utf-8 -*-

# Copyright 2010-2013 Calculate Ltd. http://www.calculate-linux.org
#
#  Licensed under the Apache License, Version 2.0 (the "License");
#  you may not use this file except in compliance with the License.
#  You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
#  Unless required by applicable law or agreed to in writing, software
#  distributed under the License is distributed on an "AS IS" BASIS,
#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#  See the License for the specific language governing permissions and
#  limitations under the License.

import os
import sys
import re
from os import path
import pwd
from calculate.lib.datavars import Variable,VariableError,ReadonlyVariable, \
                    ReadonlyTableVariable,FieldValue
from calculate.lib.variables.user import VariableUrLogin
from calculate.lib.utils.files import (readLinesFile,process,isMount,
                                       listDirectory)
from calculate.desktop._cl_keys import getKey
from itertools import *
from calculate.lib.cl_template import (templateFunction,iniParser)
import hashlib

from calculate.lib.cl_lang import setLocalTranslate
setLocalTranslate('cl_desktop3',sys.modules[__name__])


class VariableUrJidHost(ReadonlyVariable):
    """
    Jabber host for user
    """
    def get(self):
        """Return the part of ``ur_jid`` after the first '@', or ""."""
        userJid = self.Get("ur_jid")
        if not userJid:
            return ""
        return userJid.partition('@')[2]


class DomainInfoHelper:
    """
    Helper mixin for determining whether a user is a domain user
    """
    def getUserDataInFile(self, login, filePasswd):
        """
        Return the passwd-style records of ``filePasswd`` that belong to
        ``login``.

        Each record is the line split on ':'; a record matches when its
        first field equals ``login``. The result is a list (possibly empty).
        """
        records = (line.strip().split(':')
                   for line in readLinesFile(filePasswd))
        return [fields for fields in records if fields[0] == login]

    def isDomainUser(self, userName):
        """
        Tell whether ``userName`` is a domain user.

        Returns True when the user has no record in /etc/passwd, or when the
        /etc/passwd record equals the record in the calculate-client passwd
        cache. Returns False for an empty name, when either file cannot be
        processed, or when the records differ or the cache has no record.
        """
        # NOTE(review): this control flow was reconstructed from source whose
        # indentation was lost; the "user missing from /etc/passwd -> True"
        # branch is assumed to pair with the /etc/passwd lookup - confirm.
        if not userName:
            return False
        try:
            passwdUserData = self.getUserDataInFile(userName, "/etc/passwd")
        except Exception:
            # Best effort: an unreadable or malformed file means "not domain".
            return False
        if not passwdUserData:
            return True
        try:
            cacheUserData = self.getUserDataInFile(userName,
                "/var/lib/calculate/calculate-client/cache/passwd")
        except Exception:
            return False
        if not cacheUserData:
            return False
        # Only the first matching record of each file is compared.
        return cacheUserData[0] == passwdUserData[0]


class VariableUrDomainSet(ReadonlyVariable,DomainInfoHelper):
    """
    Flag for determining domain user or local
    """
    type = "bool"

    def getUserDataInFile(self, login, filePasswd):
        """Delegate to DomainInfoHelper instead of duplicating its logic."""
        return DomainInfoHelper.getUserDataInFile(self, login, filePasswd)

    def get(self):
        """Return "on" when ``ur_login`` is a domain user, otherwise "off"."""
        if self.isDomainUser(self.Get('ur_login')):
            return "on"
        return "off"


class VariableClDesktopXsession(ReadonlyVariable):
    """
    User current X session
    """
    def get(self):
        """
        Return the name of the current X session in lower case.

        The value comes from the XSESSION environment variable or, when that
        is unset or empty, from the last ``XSESSION=`` assignment in
        /etc/env.d/90xsession. If DESKTOP_SESSION mentions kde, xfce or
        gnome it takes precedence. Names containing "kde", "gnome" or "xfce"
        are normalised to exactly that word. Returns "" when nothing is found.
        """
        envXsessionFile = "/etc/env.d/90xsession"
        xsession = os.environ.get("XSESSION",None)
        desktopSession = os.environ.get("DESKTOP_SESSION",None)
        if not xsession and os.path.exists(envXsessionFile):
            # Close the file deterministically instead of leaking the handle.
            with open(envXsessionFile,"r") as envFile:
                # Strip whitespace (notably the trailing newline) before the
                # quotes, otherwise the closing quote is never removed.
                values = [line.partition("=")[2].strip().strip("'\"")
                          for line in envFile
                          if not line.startswith("#")
                          and line.startswith("XSESSION=")]
            # The last assignment in the file wins.
            xsession = values[-1] if values else values
        if not xsession:
            return ""
        knownDesktops = ("kde","xfce","gnome")
        if desktopSession and \
            any(name in desktopSession.lower() for name in knownDesktops):
            xsession = desktopSession
        sessionName = xsession.lower()
        # The order of the checks is significant when several names match.
        for name in ("kde","gnome","xfce"):
            if name in sessionName:
                return name
        return sessionName


class VariableClDesktopGstData(ReadonlyVariable):
    """
    GStreamer data
    """
    def get(self):
        """
        Probe the "alsamixer" GStreamer element and describe the sound card.

        Returns a dict that may contain 'device_name', 'long_name',
        'internal_name' (alphanumeric characters of "device (long name)"),
        'channels' (labels of the selected output tracks) and
        'master_channel'. Returns {} when the gst module cannot be imported;
        a probing failure returns whatever was collected up to that point.
        """
        stderrFd = sys.stderr.fileno()
        savedArgv = sys.argv
        # Keep a duplicate of stderr so that it can be restored after import.
        savedStderr = os.dup(stderrFd)
        try:
            # presumably gst inspects sys.argv and writes to stderr while
            # being imported - hence the empty argv and the closed stderr
            sys.argv = []
            os.close(stderrFd)
            import gst
            import gst.interfaces
        except ImportError:
            gst = None
        finally:
            sys.argv = savedArgv
            os.dup2(savedStderr, stderrFd)
            # Release the temporary descriptor; it used to leak on each call.
            os.close(savedStderr)
        if gst is None:
            return {}
        outdata = {}
        channelHints = ("Wave","Front","LFE","Center",
                        "Head","Side","Speaker",
                        "Surround","PCM")
        try:
            pipeline = "alsamixer"
            alsamixer = gst.element_factory_make(pipeline)
            res = alsamixer.set_state(gst.STATE_PAUSED)
            if res == gst.STATE_CHANGE_SUCCESS:
                outdata['device_name'] = alsamixer.get_property("device-name")
                outdata['long_name'] = alsamixer.get_factory().get_longname()
                fullName = "%s (%s)"%(outdata['device_name'],
                                      outdata['long_name'])
                outdata['internal_name'] = "".join(
                    char for char in fullName if char.isalnum())
                outdata['channels'] = []
                for track in alsamixer.list_tracks():
                    if not track.flags & gst.interfaces.MIXER_TRACK_OUTPUT:
                        continue
                    isMaster = track.flags & gst.interfaces.MIXER_TRACK_MASTER
                    if isMaster or \
                        any(hint in track.label for hint in channelHints):
                        outdata['channels'].append(track.label)
                    if isMaster:
                        outdata['master_channel'] = track.label
        except Exception:
            # Probing is best effort: keep the data gathered so far.
            pass
        return outdata


class VariableClDesktopGstCard(ReadonlyVariable):
    """
    Internal card name for xfce mixer
    """
    def get(self):
        """Return 'internal_name' from ``cl_desktop_gst_data`` or ""."""
        gstData = self.Get('cl_desktop_gst_data')
        return gstData.get('internal_name','')


class VariableClDesktopGstMasterchannel(ReadonlyVariable):
    """
    Master track name
    """
    def get(self):
        """Return 'master_channel' from ``cl_desktop_gst_data`` or ""."""
        gstData = self.Get('cl_desktop_gst_data')
        return gstData.get('master_channel','')


class VariableClDesktopXfceMixer(ReadonlyVariable):
    """
    List of channel for xfce-perchannel mixer
    """
    def get(self):
        """Return one formatted line per channel, joined with newlines."""
        channels = self.Get('cl_desktop_gst_data').get('channels',[])
        # NOTE(review): the format string below has no "%s" placeholder, so
        # the "%" operator raises TypeError for every string channel label.
        # The intended template text is not recoverable from this file;
        # restore the original literal before relying on this variable.
        return "\n".join([' '%label for label in channels])


class VariableClDesktopOnlineData(ReadonlyTableVariable,DomainInfoHelper):
    """
    Information about online users
    """
    source = ['cl_desktop_online_user',
              'cl_desktop_online_display',
              'cl_desktop_online_uid',
              'cl_desktop_online_domain_set',
              'cl_desktop_online_count']

    # Optional "(" followed by ":" and the display number, e.g. ":0" or "(:0.0"
    reDisplay = re.compile(r"^\(?:(\d+\.?\d*)")

    def _getDisplay(self,*args):
        """
        Get DISPLAY from args

        Returns the display number captured from the first argument that
        matches ``reDisplay``, or "" when none of them matches.
        """
        for candidate in args:
            found = self.reDisplay.search(candidate)
            if found:
                return found.group(1)
        return ""

    def get_user_uid(self,username):
        """Return the UID of ``username`` as a string, or "" on failure."""
        try:
            return str(pwd.getpwnam(username).pw_uid)
        except Exception:
            return ""

    def get(self,hr=False):
        """
        Build the table of users that have X sessions according to ``who``.

        Each row is (login, display, uid, "on"/"off" domain flag, number of
        sessions). Only ``who`` lines with at least five fields whose second
        field starts with ":" or whose last field starts with "(:" are used;
        root is skipped. Returns [[]] when ``who`` fails. ``hr`` is unused.
        """
        resWho = process("who")
        if not resWho.success():
            return [[]]
        sessions = []
        for line in resWho:
            fields = line.split()
            if len(fields) < 5:
                continue
            login, terminal, origin = fields[0], fields[1], fields[-1]
            if not (origin.startswith("(:") or terminal.startswith(":")):
                continue
            if login == "root":
                continue
            sessions.append((login, self._getDisplay(terminal, origin)))
        # groupby only merges adjacent items, so sort by login first.
        sessions.sort(key=lambda session: session[0])
        xData = []
        for login, group in groupby(sessions, lambda session: session[0]):
            userSessions = list(group)
            # The display of the user's first listed session is reported.
            xData.append((login,
                          userSessions[0][1],
                          self.get_user_uid(login),
                          "on" if self.isDomainUser(login) else "off",
                          len(userSessions)))
        return xData

    setValue = Variable.setValue


class VariableClDesktopOnlineUser(FieldValue,ReadonlyVariable):
    """
    User login
    """
    type = "list"
    source_variable = "cl_desktop_online_data"
    column = 0


class VariableClDesktopOnlineDisplay(FieldValue,ReadonlyVariable):
    """
    User display
    """
    type = "list"
    source_variable = "cl_desktop_online_data"
    column = 1


class VariableClDesktopOnlineUid(FieldValue,ReadonlyVariable):
    """
    User UID
    """
    type = "list"
    source_variable = "cl_desktop_online_data"
    column = 2


class VariableClDesktopOnlineDomainSet(FieldValue,ReadonlyVariable):
    """
    Whether the user is a domain user
    """
    type = "list"
    source_variable = "cl_desktop_online_data"
    column = 3


class VariableClDesktopOnlineCount(FieldValue,ReadonlyVariable):
    """
    Number of sessions of the user
    """
    type = "list"
    source_variable = "cl_desktop_online_data"
    column = 4


class VariableClDesktopLogin(VariableUrLogin):
    """
    User Login
    """
    opt = ["cl_desktop_login"]

    def choice(self):
        """
        Return the selectable logins: the online X users for the 'logout'
        action, otherwise whatever VariableUrLogin offers.
        """
        if self.Get('cl_action') == 'logout':
            return self.Get('cl_desktop_online_user')
        return VariableUrLogin.choice(self)

    def check(self,value):
        """
        Does user exist

        Raises VariableError when, for the 'logout' action, the value is not
        among the choices, when the value is empty, or when the system has
        no such account.
        """
        if value not in self.choice() and self.Get('cl_action') == 'logout':
            raise VariableError(_("X session users not found"))
        if value == "":
            raise VariableError(_("Need to specify user"))
        try:
            pwd.getpwnam(value)
        except Exception:
            raise VariableError(_("User %s does not exist")%value)

    def get(self):
        """Return "" - there is no default login."""
        return ""


class VariableUrMountDirs(ReadonlyVariable):
    """
    Directories mounted inside the user profile
    """
    type = "list"

    def get(self):
        """
        Return the mount points listed in /proc/mounts that start with the
        home path, with "<parent>/.<name>" or with "<parent>/.<name>.remote",
        sorted in reverse order. The home directory itself is excluded.
        Returns [] when ``ur_home_path`` is empty.
        """
        homeDir = self.Get("ur_home_path")
        if not homeDir:
            return []
        dirStart, dirEnd = path.split(homeDir)
        mountProfileDir = path.join(dirStart, ".%s" %dirEnd)
        mountRemoteProfileDir = path.join(dirStart, ".%s.remote" %dirEnd)
        prefixes = (homeDir, mountProfileDir, mountRemoteProfileDir)
        # The mount point is the second space-separated field of each line.
        mountPoints = (line.split(" ")[1]
                       for line in readLinesFile('/proc/mounts'))
        # NOTE(review): plain prefix matching also accepts sibling paths such
        # as "<home>2/..." - confirm whether a path boundary is required.
        directories = [mountPoint for mountPoint in mountPoints
                       if mountPoint.startswith(prefixes)
                       and mountPoint != homeDir]
        return sorted(directories,reverse=True)


class VariableUrPassword(ReadonlyVariable):
    """
    User password obtained from the kernel keys
    """
    def get(self):
        """Return the key stored for ``ur_login``, or "" when there is none."""
        return getKey(self.Get('ur_login')) or ""


class VariableClDesktopUpdateProfileSet(Variable):
    """
    Whether the user profile must be updated because packages were updated
    """
    type = "bool"

    def get(self):
        """
        Return 'on' when the user is a domain user, when
        ~/.calculate/ini.env does not exist, or when its main/elog value
        differs from the last elog timestamp; otherwise return 'off'.
        """
        lastTimestamp = templateFunction.getLastElog()
        iniEnv = path.join(self.Get('ur_home_path'),'.calculate/ini.env')
        userIni = iniParser(iniEnv)
        userTimestamp = userIni.getVar('main','elog').encode('utf-8')
        needUpdate = (self.Get('ur_domain_set') == 'on' or
                      not path.exists(iniEnv) or
                      userTimestamp != lastTimestamp)
        return 'on' if needUpdate else 'off'


class VariableClDesktopForceSetupSet(Variable):
    """
    Force the update of the user profile
    """
    type = "bool"
    opt = ["--force-setup"]
    metavalue = "ON/OFF"
    value = "off"

    def init(self):
        """Set the translated label and help text of the option."""
        self.label = _("Force setup")
        self.help = _("force setup")


class VariableClDesktopFacePath(Variable):
    """Path to the standard user icons"""
    value = "/usr/share/pixmaps/faces"


class VariableClDesktopFaceList(Variable):
    """List of the default icons available for users"""
    type = "list"

    def get(self):
        """Return the sorted '.png' entries of ``cl_desktop_face_path``."""
        faceDir = self.Get('cl_desktop_face_path')
        return sorted(name for name in listDirectory(faceDir)
                      if name.endswith('.png'))


class VariableClDesktopHashFace(Variable):
    """Path of the default icon selected for the user

    The icon is picked by the sum of the bytes of the md5 digest of the
    user login, modulo the number of available icons.
    """
    def get(self):
        """Return the icon path, or "" when no icons are available."""
        login = self.Get('ur_login')
        icon_list = self.Get('cl_desktop_face_list')
        if not icon_list:
            return ""
        # md5 works on bytes: encode text logins explicitly so that
        # non-ASCII names do not depend on an implicit ASCII conversion.
        if not isinstance(login, bytes):
            login = login.encode('utf-8')
        digestSum = sum(bytearray(hashlib.md5(login).digest()))
        return path.join(self.Get('cl_desktop_face_path'),
                         icon_list[digestSum % len(icon_list)])