You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
calculate-utils-3-desktop/pym/desktop/variables/desktop.py

416 lines
14 KiB

#-*- coding: utf-8 -*-
# Copyright 2010-2013 Calculate Ltd. http://www.calculate-linux.org
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
12 years ago
import os
import sys
import re
12 years ago
from os import path
import pwd
from calculate.lib.datavars import Variable,VariableError,ReadonlyVariable, \
ReadonlyTableVariable,FieldValue
from calculate.lib.variables.user import VariableUrLogin
from calculate.lib.utils.files import (readLinesFile,process,isMount,
listDirectory)
from calculate.desktop._cl_keys import getKey
from itertools import *
from calculate.lib.cl_template import (templateFunction,iniParser)
import hashlib
12 years ago
from calculate.lib.cl_lang import setLocalTranslate
setLocalTranslate('cl_desktop3',sys.modules[__name__])
class VariableUrJidHost(ReadonlyVariable):
    """
    Host part of the user's Jabber ID (``ur_jid``).
    """
    def get(self):
        """
        Return the text following the first '@' of ``ur_jid``.

        An empty string is returned when ``ur_jid`` is empty or contains
        no '@'.
        """
        jid = self.Get("ur_jid")
        if not jid:
            return ""
        # partition() always yields three items; the last one is the host
        return jid.partition('@')[-1]
class DomainInfoHelper:
    """
    Helper class for finding out whether a user is a domain user.
    """
    def getUserDataInFile(self, login, filePasswd):
        """
        Return every record of the passwd-style file *filePasswd* whose
        first colon-separated field equals *login*.

        Each record is the list of fields of one line.
        """
        records = [line.strip().split(':')
                   for line in readLinesFile(filePasswd)]
        return [fields for fields in records if fields[0] == login]

    def isDomainUser(self, userName):
        """
        Return True when *userName* is considered a domain user.

        A user missing from /etc/passwd is reported as a domain user; a
        user present there is a domain user only if the calculate-client
        passwd cache holds an identical record. Any error while reading
        either file gives False, as does an empty *userName*.
        """
        # NOTE(review): nesting reconstructed from a flattened listing;
        # the final "else" is assumed to pair with the /etc/passwd lookup.
        if not userName:
            return False
        try:
            localRecords = self.getUserDataInFile(userName, "/etc/passwd")
        except:
            return False
        if not localRecords:
            return True
        try:
            cachedRecords = self.getUserDataInFile(
                userName,
                "/var/lib/calculate/calculate-client/cache/passwd")
        except:
            return False
        # only the first matching record of each file is compared
        return bool(cachedRecords) and cachedRecords[0] == localRecords[0]
class VariableUrDomainSet(ReadonlyVariable,DomainInfoHelper):
    """
    Flag telling whether the current user is a domain user or a local one.
    """
    type = "bool"

    def getUserDataInFile(self, login, filePasswd):
        """
        Return the colon-split records of *filePasswd* whose first field
        equals *login*.
        """
        records = [line.strip().split(':')
                   for line in readLinesFile(filePasswd)]
        return [fields for fields in records if fields[0] == login]

    def get(self):
        """Return "on" when ``ur_login`` is a domain user, else "off"."""
        if self.isDomainUser(self.Get('ur_login')):
            return "on"
        return "off"
12 years ago
class VariableClDesktopXsession(ReadonlyVariable):
    """
    Name of the user's current X session.
    """
    def get(self):
        """
        Return the lower-cased X session name, or "" when none is known.

        The XSESSION environment variable is used first; when it is unset
        the last ``XSESSION=`` assignment in /etc/env.d/90xsession is
        taken. If DESKTOP_SESSION names kde, xfce or gnome it overrides
        that value. Names containing "kde", "gnome" or "xfce" are
        normalised to exactly that word.
        """
        envXsessionFile = "/etc/env.d/90xsession"
        xsession = os.environ.get("XSESSION", None)
        desktopSession = os.environ.get("DESKTOP_SESSION", None)
        if not xsession and os.path.exists(envXsessionFile):
            assigned = []
            # "with" guarantees the descriptor is released
            with open(envXsessionFile, "r") as envFile:
                for rawLine in envFile:
                    # drop the newline first, otherwise the closing quote
                    # is not the last character and survives strip("'\"")
                    line = rawLine.strip()
                    if line.startswith("XSESSION="):
                        assigned.append(
                            line.partition("=")[2].strip("'\""))
            if assigned:
                xsession = assigned[-1]
        if not xsession:
            return ""
        if desktopSession and \
                any(x in desktopSession.lower()
                    for x in ("kde", "xfce", "gnome")):
            xsession = desktopSession
        sessionName = xsession.lower()
        for known in ("kde", "gnome", "xfce"):
            if known in sessionName:
                return known
        return sessionName
class VariableClDesktopGstData(ReadonlyVariable):
    """
    GStreamer data
    """
    def get(self):
        """
        Return a dict describing the ALSA mixer as seen through GStreamer.

        Possible keys: 'device_name', 'long_name', 'internal_name',
        'channels' (list of track labels) and 'master_channel'. An empty
        dict is returned when the ``gst`` module cannot be imported;
        a partially filled or empty dict when probing the mixer fails.
        """
        # Save everything that is tampered with *before* the try block so
        # that the finally clause can always restore it.
        savedArgv = sys.argv
        stderrFd = sys.stderr.fileno()
        savedStderr = os.dup(stderrFd)
        try:
            # sys.argv is emptied for the duration of the import
            sys.argv = []
            # Point stderr at /dev/null during the import instead of
            # leaving fd 2 closed: a closed fd 2 would be reused by the
            # next file opened in the meantime.
            nullFd = os.open(os.devnull, os.O_WRONLY)
            try:
                os.dup2(nullFd, stderrFd)
            finally:
                os.close(nullFd)
            import gst
            import gst.interfaces
        except ImportError:
            gst = None
        finally:
            sys.argv = savedArgv
            os.dup2(savedStderr, stderrFd)
            # the duplicate is no longer needed; not closing it leaked one
            # descriptor per call
            os.close(savedStderr)
        if gst is None:
            return {}
        outdata = {}
        # NOTE(review): nesting below is reconstructed from a flattened
        # listing; confirm against the repository copy.
        try:
            pipeline = "alsamixer"
            alsamixer = gst.element_factory_make(pipeline)
            res = alsamixer.set_state(gst.STATE_PAUSED)
            if res == gst.STATE_CHANGE_SUCCESS:
                outdata['device_name'] = alsamixer.get_property("device-name")
                outdata['long_name'] = alsamixer.get_factory().get_longname()
                fullName = "%s (%s)" % (outdata['device_name'],
                                        outdata['long_name'])
                # keep alphanumeric characters only
                outdata['internal_name'] = "".join(
                    ch for ch in fullName if ch.isalnum())
                outdata['channels'] = []
                for t in alsamixer.list_tracks():
                    if t.flags & gst.interfaces.MIXER_TRACK_OUTPUT:
                        isMaster = t.flags & gst.interfaces.MIXER_TRACK_MASTER
                        if isMaster or \
                                any(x in t.label
                                    for x in ("Wave", "Front", "LFE", "Center",
                                              "Head", "Side", "Speaker",
                                              "Surround", "PCM")):
                            outdata['channels'].append(t.label)
                        if isMaster:
                            outdata['master_channel'] = t.label
        except Exception:
            # best effort: mixer probing problems must not break the caller
            pass
        return outdata
class VariableClDesktopGstCard(ReadonlyVariable):
    """
    Internal sound card name for the xfce mixer.
    """
    def get(self):
        """Return 'internal_name' from ``cl_desktop_gst_data`` or ""."""
        gstData = self.Get('cl_desktop_gst_data')
        return gstData.get('internal_name', '')
class VariableClDesktopGstMasterchannel(ReadonlyVariable):
    """
    Name of the master mixer track.
    """
    def get(self):
        """Return 'master_channel' from ``cl_desktop_gst_data`` or ""."""
        gstData = self.Get('cl_desktop_gst_data')
        return gstData.get('master_channel', '')
class VariableClDesktopXfceMixer(ReadonlyVariable):
    """
    Channel list for the xfce-perchannel mixer configuration.
    """
    def get(self):
        """
        Return one ``<value .../>`` line per channel found in
        ``cl_desktop_gst_data``, joined with newlines.
        """
        lineTemplate = ' <value type="string" value="%s" />'
        channels = self.Get('cl_desktop_gst_data').get('channels', [])
        return "\n".join(lineTemplate % channel for channel in channels)
class VariableClDesktopOnlineData(ReadonlyTableVariable,DomainInfoHelper):
    """
    Information about online users

    Each row is (login, display, uid, domain flag, session count),
    matching the order of ``source``.
    """
    source = ['cl_desktop_online_user',
              'cl_desktop_online_display',
              'cl_desktop_online_uid',
              'cl_desktop_online_domain_set',
              'cl_desktop_online_count']

    # optional "(" followed by ":<number>", e.g. ":0" or "(:0.0"
    reDisplay = re.compile(r"^\(?:(\d+\.?\d*)")

    def _getDisplay(self, *args):
        """
        Return the display number found in the first of *args* that
        matches ``reDisplay``, or "" when none matches.
        """
        for arg in args:
            found = self.reDisplay.search(arg)
            if found:
                return found.group(1)
        return ""

    def get_user_uid(self, username):
        """Return the uid of *username* as a string, "" if lookup fails."""
        try:
            return str(pwd.getpwnam(username).pw_uid)
        except Exception:
            # unknown user (KeyError) or unusable name: report no uid
            return ""

    def get(self, hr=False):
        """
        Build the table of non-root users with an X session from ``who``.

        Lines with fewer than five fields are ignored. A line counts as
        an X session when its last field starts with "(:" or its second
        field starts with ":". Rows are ordered by login; the display is
        the one of the user's first listed session. ``[[]]`` is returned
        when ``who`` fails. *hr* is accepted but unused.
        """
        resWho = process("who")
        if not resWho.success():
            return [[]]
        displaysByLogin = {}
        for line in resWho:
            fields = line.split()
            if len(fields) < 5:
                continue
            login, tty, origin = fields[0], fields[1], fields[-1]
            if not (origin.startswith("(:") or tty.startswith(":")):
                continue
            if login == "root":
                continue
            displaysByLogin.setdefault(login, []).append(
                self._getDisplay(tty, origin))
        return [(login,
                 displaysByLogin[login][0],
                 self.get_user_uid(login),
                 "on" if self.isDomainUser(login) else "off",
                 len(displaysByLogin[login]))
                for login in sorted(displaysByLogin)]

    setValue = Variable.setValue
class VariableClDesktopOnlineUser(FieldValue,ReadonlyVariable):
    """
    User login (column 0 of cl_desktop_online_data).
    """
    source_variable = "cl_desktop_online_data"
    column = 0
    type = "list"
class VariableClDesktopOnlineDisplay(FieldValue,ReadonlyVariable):
    """
    User display (column 1 of cl_desktop_online_data).
    """
    source_variable = "cl_desktop_online_data"
    column = 1
    type = "list"
class VariableClDesktopOnlineUid(FieldValue,ReadonlyVariable):
    """
    User UID (column 2 of cl_desktop_online_data).
    """
    source_variable = "cl_desktop_online_data"
    column = 2
    type = "list"
class VariableClDesktopOnlineDomainSet(FieldValue,ReadonlyVariable):
    """
    Whether the user is a domain user (column 3 of
    cl_desktop_online_data).
    """
    source_variable = "cl_desktop_online_data"
    column = 3
    type = "list"
class VariableClDesktopOnlineCount(FieldValue,ReadonlyVariable):
    """
    Number of sessions of the user (column 4 of cl_desktop_online_data).
    """
    source_variable = "cl_desktop_online_data"
    column = 4
    type = "list"
class VariableClDesktopLogin(VariableUrLogin):
    """
    User Login
    """
    opt = ["cl_desktop_login"]

    def choice(self):
        """
        Return the selectable logins: users with an X session for the
        'logout' action, otherwise whatever VariableUrLogin offers.
        """
        if self.Get('cl_action') == 'logout':
            return self.Get('cl_desktop_online_user')
        return VariableUrLogin.choice(self)

    def check(self, value):
        """
        Validate that *value* names an existing user.

        Raises VariableError when, for the 'logout' action, the user is
        not among the choices; when the name is empty; or when the
        passwd lookup fails.
        """
        if value not in self.choice() and self.Get('cl_action') == 'logout':
            raise VariableError(_("No X session user found"))
        if value == "":
            raise VariableError(_("Please specify the user name"))
        try:
            pwd.getpwnam(value)
        except Exception:
            # "except Exception" lets KeyboardInterrupt/SystemExit through
            # instead of reporting them as a missing user
            raise VariableError(_("User %s does not exist")%value)

    def get(self):
        """Return "" - there is no default login."""
        return ""
class VariableUrMountDirs(ReadonlyVariable):
    """
    Directories mounted inside the user's profile.
    """
    type = "list"

    def get(self):
        """
        Return mount points from /proc/mounts that start with the home
        path, with "<parent>/.<name>" or with "<parent>/.<name>.remote",
        sorted in reverse order. The home path itself is excluded; an
        empty ``ur_home_path`` gives an empty list.
        """
        homeDir = self.Get("ur_home_path")
        if not homeDir:
            return []
        parentDir, userDir = path.split(homeDir)
        # NOTE(review): plain prefix matching, so "/home/user" also
        # matches "/home/user2" - confirm this is intended.
        prefixes = (homeDir,
                    path.join(parentDir, ".%s" % userDir),
                    path.join(parentDir, ".%s.remote" % userDir))
        # second space-separated field of each line is the mount point
        mountPoints = [line.split(" ")[1]
                       for line in readLinesFile('/proc/mounts')]
        found = [point for point in mountPoints
                 if point.startswith(prefixes) and point != homeDir]
        found.sort(reverse=True)
        return found
class VariableUrPassword(ReadonlyVariable):
    """
    User password obtained from the kernel keys.
    """
    def get(self):
        """Return getKey() for ``ur_login``, or "" when it is empty."""
        userKey = getKey(self.Get('ur_login'))
        return userKey if userKey else ""
class VariableClDesktopUpdateProfileSet(Variable):
    """
    Whether the user profile has to be updated because packages were
    updated.
    """
    type = "bool"

    def get(self):
        """
        Return 'on' when any of the following holds, else 'off':
        the user is a domain user, ``profile_setup`` in the user's
        ini.env is 'on', ini.env does not exist, or its stored ``elog``
        value differs from templateFunction.getLastElog().
        """
        lastTimestamp = templateFunction.getLastElog()
        iniEnv = path.join(self.Get('ur_home_path'), '.calculate/ini.env')
        userIni = iniParser(iniEnv)
        userTimestamp = userIni.getVar('main', 'elog')
        if userTimestamp:
            userTimestamp = userTimestamp.encode('utf-8')
        loginSetup = userIni.getVar('main', 'profile_setup') == 'on'
        needUpdate = (self.Get('ur_domain_set') == 'on'
                      or loginSetup
                      or not path.exists(iniEnv)
                      or userTimestamp != lastTimestamp)
        return 'on' if needUpdate else 'off'
class VariableClDesktopForceSetupSet(Variable):
    """
    Force the user profile to be updated.
    """
    value = "off"
    type = "bool"
    metavalue = "ON/OFF"
    opt = ["--force-setup"]

    def init(self):
        """Set the translated label and help text."""
        self.label = _("Force configuration")
        self.help = _("force configuration")
class VariableClDesktopFacePath(Variable):
    """
    Path to the standard user icons.
    """
    value = "/usr/share/pixmaps/faces"
class VariableClDesktopFaceList(Variable):
    """
    List of default icons available to users.
    """
    type = "list"

    def get(self):
        """Return the sorted '.png' entries of ``cl_desktop_face_path``."""
        faceDir = self.Get('cl_desktop_face_path')
        pngNames = [name for name in listDirectory(faceDir)
                    if name.endswith('.png')]
        pngNames.sort()
        return pngNames
class VariableClDesktopHashFace(Variable):
    """
    Icon chosen for the user.

    The icon is picked by the md5 checksum of the user's login.
    """
    def get(self):
        """
        Return the path of the icon selected for ``ur_login``.

        The bytes of the md5 digest of the login are summed and the sum,
        modulo the number of icons in ``cl_desktop_face_list``, indexes
        that list. "" is returned when the list is empty.
        """
        login = self.Get('ur_login')
        icon_list = self.Get('cl_desktop_face_list')
        if not icon_list:
            return ""
        # md5 needs bytes: a text login with non-ASCII characters would
        # otherwise fail to hash
        if not isinstance(login, bytes):
            login = login.encode('utf-8')
        # bytearray yields integers regardless of how the digest iterates
        digestSum = sum(bytearray(hashlib.md5(login).digest()))
        return path.join(self.Get('cl_desktop_face_path'),
                         icon_list[digestSum % len(icon_list)])
class VariableClDesktopFastloginPath(ReadonlyVariable):
    """
    Path to the directory that lists the users allowed a fast session
    login.
    """
    value = "/var/lib/calculate/calculate-desktop/fastlogin"