You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
calculate-utils-3-desktop/pym/desktop/variables/desktop.py

446 lines
14 KiB

# -*- coding: utf-8 -*-
# Copyright 2010-2016 Mir Calculate. http://www.calculate-linux.org
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
import sys
import re
from os import path
import pwd
from calculate.lib.datavars import (Variable, VariableError, ReadonlyVariable,
ReadonlyTableVariable, FieldValue,
VariableInterface)
from calculate.lib.utils.common import getValueFromConfig
from calculate.lib.variables import user
from calculate.lib.utils.files import (readLinesFile, process,
listDirectory)
from calculate.desktop._cl_keys import getKey
from itertools import *
from calculate.lib.cl_template import templateFunction
from calculate.lib.cl_ini_parser import iniParser
import hashlib
from calculate.lib.cl_lang import setLocalTranslate
# Identity placeholder for the translation function `_` used in this module.
# setLocalTranslate() receives this module object; presumably it rebinds `_`
# to the real 'cl_desktop3' translator -- TODO confirm in calculate.lib.cl_lang.
_ = lambda x: x
setLocalTranslate('cl_desktop3', sys.modules[__name__])
class VariableUrJidHost(ReadonlyVariable):
    """
    Jabber host of the user.

    The value is the part of the ``ur_jid`` variable that follows the
    first ``@``.
    """

    def get(self):
        """Return the host part of ``ur_jid`` or "" when the JID is empty."""
        jid = self.Get("ur_jid")
        if not jid:
            return ""
        # partition() yields (before, separator, after); only "after" matters
        _name, _sep, host = jid.partition('@')
        return host
class DomainInfoHelper(VariableInterface):
    """
    Helper class that determines whether a user is a domain user.
    """

    def getUserDataInFile(self, login, filePasswd):
        """
        Return the records of the passwd-style file ``filePasswd`` whose
        first ':'-separated field equals ``login``.

        Every record is the stripped line split on ':'.
        """
        records = (line.strip().split(':')
                   for line in readLinesFile(filePasswd))
        return [fields for fields in records if fields[0] == login]

    def isDomainUser(self, userName):
        """
        Tell whether ``userName`` is considered a domain user.

        True when the login has no entry in /etc/passwd, or when its first
        entry there equals its first entry in the calculate-client passwd
        cache. False for an empty login, when either file cannot be
        processed, or when the entries differ or the cache has none.
        """
        if not userName:
            return False
        try:
            local_entries = self.getUserDataInFile(userName, "/etc/passwd")
        except Exception:
            return False
        if not local_entries:
            # No entry in /etc/passwd at all: reported as a domain user
            return True
        try:
            cached_entries = self.getUserDataInFile(
                userName,
                "/var/lib/calculate/calculate-client/cache/passwd")
        except Exception:
            return False
        if not cached_entries:
            return False
        # Only the first matching record of each file is compared
        return cached_entries[0] == local_entries[0]
class VariableUrDomainSet(ReadonlyVariable, DomainInfoHelper):
    """
    Flag for determining domain user or local.

    The value is "on" for a domain user and "off" otherwise.
    """
    type = "bool"

    def getUserDataInFile(self, login, file_passwd):
        """
        Return the records of ``file_passwd`` that belong to ``login``.

        The override is kept so the method stays defined on this class with
        its ``file_passwd`` parameter name, but the implementation is the one
        in DomainInfoHelper instead of a second copy of the same code.
        """
        return DomainInfoHelper.getUserDataInFile(self, login, file_passwd)

    def get(self):
        """Return "on" when ``ur_login`` is a domain user, else "off"."""
        if self.isDomainUser(self.Get('ur_login')):
            return "on"
        return "off"
class VariableClDesktopXsession(ReadonlyVariable):
    """
    User current X session.
    """
    # Session names recognised inside the XSESSION / DESKTOP_SESSION values
    session_list = ("kde", "gnome", "mate", "xfce", "plasma")

    def get(self):
        """
        Return the session name.

        XSESSION is read from the environment, falling back to
        /etc/env.d/90xsession. When DESKTOP_SESSION mentions a known session
        it replaces that value. The result is the first entry of
        ``session_list`` contained in the lower-cased value, the lower-cased
        value itself when none is contained, or "" when no XSESSION is found.
        """
        env_file = "/etc/env.d/90xsession"
        session_name = os.environ.get("XSESSION", None)
        desktop_session = os.environ.get("DESKTOP_SESSION", None)
        if not session_name:
            session_name = getValueFromConfig(env_file, "XSESSION")
        if not session_name:
            return ""
        if desktop_session:
            desktop_lower = desktop_session.lower()
            if any(known in desktop_lower for known in self.session_list):
                session_name = desktop_session
        session_name = session_name.lower()
        for known in self.session_list:
            if known in session_name:
                return known
        return session_name
class VariableClDesktopGstData(ReadonlyVariable):
    """
    GStreamer data.

    The value is a dict. It is empty when the ``gst`` bindings cannot be
    imported; otherwise it may contain ``device_name``, ``long_name``,
    ``internal_name``, ``channels`` and ``master_channel`` describing the
    "alsamixer" element.
    """

    def _import_gst(self):
        """
        Import ``gst`` and ``gst.interfaces`` with stderr closed and
        ``sys.argv`` emptied for the duration of the import.

        Returns the ``gst`` module, or None when the import fails.
        """
        stderr_fd = sys.stderr.fileno()
        # Duplicate stderr first, so a failure here leaves sys.argv untouched
        saved_stderr = os.dup(stderr_fd)
        saved_argv = sys.argv
        # NOTE(review): presumably gst inspects sys.argv on import -- confirm
        sys.argv = []
        try:
            os.close(stderr_fd)
            import gst
            import gst.interfaces
        except ImportError:
            gst = None
        finally:
            sys.argv = saved_argv
            os.dup2(saved_stderr, stderr_fd)
            # stderr is restored, so the duplicate must be released;
            # otherwise every call leaks one file descriptor
            os.close(saved_stderr)
        return gst

    def get(self):
        """Return the mixer description dict (see the class docstring)."""
        gst = self._import_gst()
        if gst is None:
            return {}
        channel_marks = ("Wave", "Front", "LFE", "Center",
                         "Head", "Side", "Speaker",
                         "Surround", "PCM")
        outdata = {}
        try:
            alsamixer = gst.element_factory_make("alsamixer")
            res = alsamixer.set_state(gst.STATE_PAUSED)
            if res == gst.STATE_CHANGE_SUCCESS:
                outdata['device_name'] = alsamixer.get_property("device-name")
                outdata['long_name'] = alsamixer.get_factory().get_longname()
                # Keep only the alphanumeric characters of "name (long name)"
                outdata['internal_name'] = "".join(
                    filter(str.isalnum,
                           "%s (%s)" % (outdata['device_name'],
                                        outdata['long_name'])))
                outdata['channels'] = []
                for track in alsamixer.list_tracks():
                    if not track.flags & gst.interfaces.MIXER_TRACK_OUTPUT:
                        continue
                    is_master = track.flags & gst.interfaces.MIXER_TRACK_MASTER
                    if is_master or any(mark in track.label
                                        for mark in channel_marks):
                        outdata['channels'].append(track.label)
                    if is_master:
                        outdata['master_channel'] = track.label
        except Exception:
            # Deliberate best effort: whatever was collected before the
            # failure is still returned
            pass
        return outdata
class VariableClDesktopGstCard(ReadonlyVariable):
    """
    Internal card name for xfce mixer.
    """

    def get(self):
        """Return ``internal_name`` of ``cl_desktop_gst_data`` or ""."""
        gst_data = self.Get('cl_desktop_gst_data')
        return gst_data.get('internal_name', '')
class VariableClDesktopGstMasterchannel(ReadonlyVariable):
    """
    Master track name.
    """

    def get(self):
        """Return ``master_channel`` of ``cl_desktop_gst_data`` or ""."""
        gst_data = self.Get('cl_desktop_gst_data')
        return gst_data.get('master_channel', '')
class VariableClDesktopXfceMixer(ReadonlyVariable):
    """
    List of channels for the xfce-perchannel mixer.
    """

    def get(self):
        """
        Return one ``<value .../>`` line per channel found in
        ``cl_desktop_gst_data``, joined with newlines.
        """
        channels = self.Get('cl_desktop_gst_data').get('channels', [])
        return "\n".join(' <value type="string" value="%s" />' % channel
                         for channel in channels)
class VariableClDesktopOnlineData(ReadonlyTableVariable, DomainInfoHelper):
    """
    Information about online users.

    Every row is a tuple (login, display, uid, domain flag, session count),
    in the same order as the variables listed in ``source``.
    """
    source = ['cl_desktop_online_user',
              'cl_desktop_online_display',
              'cl_desktop_online_uid',
              'cl_desktop_online_domain_set',
              'cl_desktop_online_count']

    # Optional "(" then ":<digits>[.<digits>]" at the start of a field
    reDisplay = re.compile(r"^\(?:(\d+\.?\d*)")

    def _getDisplay(self, *args):
        """
        Get DISPLAY from args.

        Returns the display number of the first argument that matches
        ``reDisplay``, or "" when none does.
        """
        for candidate in args:
            found = self.reDisplay.search(candidate)
            if found:
                return found.group(1)
        return ""

    def get_user_uid(self, username):
        """Return the uid of ``username`` as a string, or "" on any error."""
        try:
            return str(pwd.getpwnam(username).pw_uid)
        except Exception:
            return ""

    def _x_sessions(self, who_lines):
        """
        Extract (login, display) pairs of non-root X sessions from the lines
        of ``who`` output, keeping the order of the lines.
        """
        sessions = []
        for line in who_lines:
            fields = line.split()
            # Lines with fewer than five fields are ignored
            if len(fields) < 5:
                continue
            login, tty, origin = fields[0], fields[1], fields[-1]
            # An X session shows ":N" as terminal or "(:N" as last field
            if not (origin.startswith("(:") or tty.startswith(":")):
                continue
            if login == "root":
                continue
            sessions.append((login, self._getDisplay(tty, origin)))
        return sessions

    def get(self, hr=False):
        """
        Return one row per logged-in user, ordered by login.

        The display of a row is the one of the user's first session in
        ``who`` output and the count is the number of that user's sessions.
        When ``who`` fails the result is ``[[]]``. ``hr`` is not used here.
        """
        who = process("who")
        if not who.success():
            return [[]]
        # sorted() is stable, so each group keeps the order of ``who``
        ordered = sorted(self._x_sessions(who), key=lambda item: item[0])
        rows = []
        for login, group in groupby(ordered, lambda item: item[0]):
            user_sessions = list(group)
            rows.append((login,
                         user_sessions[0][1],
                         self.get_user_uid(login),
                         "on" if self.isDomainUser(login) else "off",
                         len(user_sessions)))
        return rows

    # Use the setter of Variable for this table variable
    setValue = Variable.setValue
class VariableClDesktopOnlineUser(FieldValue, ReadonlyVariable):
    """
    User login (column 0 of cl_desktop_online_data).
    """
    source_variable = "cl_desktop_online_data"
    column = 0
    type = "list"
class VariableClDesktopOnlineDisplay(FieldValue, ReadonlyVariable):
    """
    User display (column 1 of cl_desktop_online_data).
    """
    source_variable = "cl_desktop_online_data"
    column = 1
    type = "list"
class VariableClDesktopOnlineUid(FieldValue, ReadonlyVariable):
    """
    User UID (column 2 of cl_desktop_online_data).
    """
    source_variable = "cl_desktop_online_data"
    column = 2
    type = "list"
class VariableClDesktopOnlineDomainSet(FieldValue, ReadonlyVariable):
    """
    Whether the user is a domain user (column 3 of cl_desktop_online_data).
    """
    source_variable = "cl_desktop_online_data"
    column = 3
    type = "list"
class VariableClDesktopOnlineCount(FieldValue, ReadonlyVariable):
    """
    Number of sessions of the user (column 4 of cl_desktop_online_data).
    """
    source_variable = "cl_desktop_online_data"
    column = 4
    type = "list"
class VariableClDesktopLogin(user.VariableUrLogin):
    """
    User login.
    """
    opt = ["cl_desktop_login"]

    def choice(self):
        """
        Return the selectable logins: the online X session users for the
        'logout' action, otherwise the choice of the parent class.
        """
        if self.Get('cl_action') != 'logout':
            return user.VariableUrLogin.choice(self)
        return self.Get('cl_desktop_online_user')

    def check(self, value):
        """
        Check that the user exists.

        Raises VariableError when, for the 'logout' action, ``value`` is not
        among the choices; when ``value`` is empty; or when the passwd
        database lookup for ``value`` fails.
        """
        if value not in self.choice():
            if self.Get('cl_action') == 'logout':
                raise VariableError(_("No X session user found"))
        if value == "":
            raise VariableError(_("Please specify the user name"))
        try:
            # Only the lookup matters; its result is discarded
            pwd.getpwnam(value)
        except Exception:
            raise VariableError(_("User %s does not exist") % value)

    def get(self):
        """Return the default value: an empty login."""
        return ""
class VariableUrMountDirs(ReadonlyVariable):
    """
    Directories mounted inside the user profile.

    Mount points are taken from /proc/mounts and kept when they lie in the
    home directory (the home directory itself excluded), or in the sibling
    directories ".<name>" and ".<name>.remote" of the home directory.
    """
    type = "list"

    def _is_within(self, mount_point, base):
        """
        Tell whether ``mount_point`` is ``base`` itself or a path below it.

        A plain startswith() would also accept siblings sharing the prefix
        (e.g. "/home/bobby" for base "/home/bob"), i.e. the mounts of
        another user.
        """
        prefix = base if base.endswith("/") else base + "/"
        return mount_point == base or mount_point.startswith(prefix)

    def get(self):
        """
        Return the matching mount points sorted in reverse order, or an
        empty list when ``ur_home_path`` is empty.
        """
        homeDir = self.Get("ur_home_path")
        if not homeDir:
            return []
        parent, name = path.split(homeDir)
        bases = (homeDir,
                 path.join(parent, ".%s" % name),
                 path.join(parent, ".%s.remote" % name))
        mounted = []
        for line in readLinesFile('/proc/mounts'):
            fields = line.split(" ")
            # The mount point is the second field; skip malformed lines
            if len(fields) < 2:
                continue
            mount_point = fields[1]
            # The home directory itself is never reported
            if mount_point == homeDir:
                continue
            if any(self._is_within(mount_point, base) for base in bases):
                mounted.append(mount_point)
        # Reverse order lists nested mount points before their parents
        return sorted(mounted, reverse=True)
class VariableUrPassword(ReadonlyVariable):
    """
    User password obtained from the kernel keys.
    """

    def get(self):
        """Return the key stored for ``ur_login`` or "" when there is none."""
        key = getKey(self.Get('ur_login'))
        return key if key else ""
class VariableClDesktopUpdateProfileSet(Variable):
    """
    Whether the user profile has to be updated because packages were
    updated.
    """
    type = "bool"

    def get(self):
        """
        Return 'on' when the user is a domain user, when ``profile_setup``
        in the user's ini.env is 'on', when ini.env does not exist, or when
        its ``elog`` value differs from the last elog; otherwise 'off'.
        """
        last_elog = templateFunction.getLastElog()
        ini_path = path.join(self.Get('ur_home_path'), '.calculate/ini.env')
        ini = iniParser(ini_path)
        user_elog = ini.getVar('main', 'elog')
        if user_elog:
            # Encoded to UTF-8 before the comparison with the last elog
            user_elog = user_elog.encode('utf-8')
        setup_requested = ini.getVar('main', 'profile_setup') == 'on'
        needs_update = (self.Get('ur_domain_set') == 'on' or
                        setup_requested or
                        not path.exists(ini_path) or
                        user_elog != last_elog)
        return 'on' if needs_update else 'off'
class VariableClDesktopForceSetupSet(Variable):
    """
    Force the user profile to be configured.
    """
    type = "bool"
    value = "off"
    opt = ["--force-setup"]
    metavalue = "ON/OFF"

    def init(self):
        """Set the translated label and help text of the option."""
        self.help = _("force configuration")
        self.label = _("Force configuration")
class VariableClDesktopFacePath(Variable):
    """
    Path to the standard user icons.
    """
    value = "/usr/share/pixmaps/faces"
class VariableClDesktopFaceList(Variable):
    """
    List of the default icons available for users.
    """
    type = "list"

    def get(self):
        """Return the sorted '.png' entries of ``cl_desktop_face_path``."""
        entries = listDirectory(self.Get('cl_desktop_face_path'))
        return sorted([name for name in entries if name.endswith('.png')])
class VariableClDesktopHashFace(Variable):
    """
    Icon of the user.

    The icon is selected from ``cl_desktop_face_list`` by the sum of the
    bytes of the md5 digest of the user login, modulo the list length.
    """

    def get(self):
        """
        Return the full path of the icon chosen for ``ur_login``, or "" when
        the icon list is empty.
        """
        login = self.Get('ur_login')
        icon_list = self.Get('cl_desktop_face_list')
        if not icon_list:
            return ""
        # md5 needs a byte string: a text login is encoded explicitly so
        # that non-ASCII names do not fail (byte strings pass unchanged)
        if not isinstance(login, bytes):
            login = login.encode('utf-8')
        digest = hashlib.md5(login).digest()
        # bytearray yields integer byte values on every Python version
        index = sum(bytearray(digest)) % len(icon_list)
        return path.join(self.Get('cl_desktop_face_path'), icon_list[index])
class VariableClDesktopFastloginPath(ReadonlyVariable):
    """
    Path to the directory that lists the users with fast session login.
    """
    value = "/var/lib/calculate/calculate-desktop/fastlogin"