You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
calculate-utils-3-desktop/pym/desktop/variables/desktop.py

492 lines
16 KiB

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

# -*- coding: utf-8 -*-
# Copyright 2010-2016 Mir Calculate. http://www.calculate-linux.org
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
import sys
import re
from os import path
import pwd
from calculate.lib.datavars import (Variable, VariableError, ReadonlyVariable,
ReadonlyTableVariable, FieldValue,
VariableInterface)
from calculate.lib.utils.common import getValueFromConfig
from calculate.lib.variables import user
import calculate.lib.utils.device as device
from calculate.lib.utils.files import (readLinesFile, process,
listDirectory, readFile)
from calculate.desktop._cl_keys import getKey
from itertools import *
import glob
from calculate.lib.cl_template import templateFunction
from calculate.lib.cl_ini_parser import iniParser
from calculate.lib.utils.portage import isPkgInstalled
import hashlib
from calculate.lib.cl_lang import setLocalTranslate
# Identity placeholder for the translation function `_`.
# setLocalTranslate receives the 'cl_desktop3' domain and this module object
# (presumably to install the real translator into the module — confirm
# against calculate.lib.cl_lang).
_ = lambda x: x
setLocalTranslate('cl_desktop3', sys.modules[__name__])
class VariableUrJidHost(ReadonlyVariable):
    """
    Host part of the user's Jabber ID
    """

    def get(self):
        """Return whatever follows the first '@' in ``ur_jid`` ("" if unset)."""
        jid = self.Get("ur_jid")
        if not jid:
            return ""
        _local_part, _separator, host = jid.partition('@')
        return host
class DomainInfoHelper(VariableInterface):
    """
    Helper mixin used to decide whether a user is a domain user
    """

    def getUserDataInFile(self, login, filePasswd):
        """
        Return every record of *filePasswd* whose first ':'-separated
        field equals *login*; each record is the list of split fields.
        """
        records = (line.strip().split(':')
                   for line in readLinesFile(filePasswd))
        return [record for record in records if record[0] == login]

    def isDomainUser(self, userName):
        """
        Return True when *userName* is considered a domain user.

        A user with no record in /etc/passwd counts as a domain user.
        A user that has one counts as a domain user only when the first
        matching record in the calculate-client passwd cache is identical
        to the first matching /etc/passwd record. Any error while reading
        either file yields False.
        """
        if not userName:
            return False
        try:
            local_records = self.getUserDataInFile(userName, "/etc/passwd")
        except Exception:
            return False
        if not local_records:
            # Not present in the local passwd file at all.
            return True
        try:
            cached_records = self.getUserDataInFile(
                userName,
                "/var/lib/calculate/calculate-client/cache/passwd")
        except Exception:
            return False
        if not cached_records:
            return False
        if cached_records[0] == local_records[0]:
            return True
        return False
class VariableUrDomainSet(ReadonlyVariable, DomainInfoHelper):
    """
    Flag telling whether the current user is a domain user or a local one
    """
    type = "bool"

    def getUserDataInFile(self, login, file_passwd):
        """
        Return every record of *file_passwd* whose first ':'-separated
        field equals *login*; each record is the list of split fields.
        """
        matches = []
        for line in readLinesFile(file_passwd):
            fields = line.strip().split(':')
            if fields[0] == login:
                matches.append(fields)
        return matches

    def get(self):
        """Return "on" when ``ur_login`` is a domain user, otherwise "off"."""
        if self.isDomainUser(self.Get('ur_login')):
            return "on"
        return "off"
class VariableClDesktopXsession(ReadonlyVariable):
    """
    Current X session of the user
    """
    session_list = ("kde", "gnome", "mate", "xfce", "plasma")

    def get(self):
        """
        Return the known session name found in the session string, the
        lower-cased session string itself when none matches, or "" when
        no session could be determined.
        """
        current = os.environ.get("XSESSION", None)
        desktop = os.environ.get("DESKTOP_SESSION", None)
        # For root, or when the environment gives nothing, use the
        # system-wide value from the env.d file.
        if self.Get('ur_login') == 'root' or not current:
            current = getValueFromConfig("/etc/env.d/90xsession", "XSESSION")
        if not current:
            return ""
        # DESKTOP_SESSION wins when it names one of the known sessions.
        if desktop:
            desktop_lowered = desktop.lower()
            if any(name in desktop_lowered for name in self.session_list):
                current = desktop
        current = current.lower()
        return next((name for name in self.session_list if name in current),
                    current)
class VariableClDesktopGstData(ReadonlyVariable):
    """
    GStreamer data
    """

    def get(self):
        """
        Query the GStreamer "alsamixer" element for mixer information.

        Returns a dict that may contain:
          * ``device_name`` / ``long_name`` - values read from the element;
          * ``internal_name`` - "<device_name> (<long_name>)" reduced to its
            alphanumeric characters, as a string;
          * ``channels`` - labels of the selected output tracks;
          * ``master_channel`` - label of the master output track.
        An empty dict is returned when the ``gst`` module cannot be imported
        or when querying the mixer fails.
        """
        # The import is done with an empty sys.argv and with stderr closed;
        # both are restored afterwards.
        saved_argv = sys.argv
        stderr_fd = sys.stderr.fileno()
        saved_stderr = os.dup(stderr_fd)
        sys.argv = []
        try:
            os.close(stderr_fd)
            import gst
            import gst.interfaces
        except ImportError:
            gst = None
        finally:
            sys.argv = saved_argv
            os.dup2(saved_stderr, stderr_fd)
            # The duplicate is no longer needed once stderr is restored;
            # leaving it open leaks one descriptor per call.
            os.close(saved_stderr)
        if gst is None:
            return {}

        outdata = {}
        output_labels = ("Wave", "Front", "LFE", "Center", "Head", "Side",
                         "Speaker", "Surround", "PCM")
        try:
            alsamixer = gst.element_factory_make("alsamixer")
            res = alsamixer.set_state(gst.STATE_PAUSED)
            if res == gst.STATE_CHANGE_SUCCESS:
                outdata['device_name'] = alsamixer.get_property("device-name")
                outdata['long_name'] = alsamixer.get_factory().get_longname()
                # Join the filtered characters back into a string: on
                # Python 3 filter() yields an iterator, and wrapping it in
                # list() produced a list of single characters.
                outdata['internal_name'] = "".join(
                    filter(str.isalnum,
                           "%s (%s)" % (outdata['device_name'],
                                        outdata['long_name'])))
                outdata['channels'] = []
                for track in alsamixer.list_tracks():
                    if not track.flags & gst.interfaces.MIXER_TRACK_OUTPUT:
                        continue
                    is_master = (track.flags &
                                 gst.interfaces.MIXER_TRACK_MASTER)
                    if is_master or any(label in track.label
                                        for label in output_labels):
                        outdata['channels'].append(track.label)
                    if is_master:
                        outdata['master_channel'] = track.label
        except Exception:
            # Best effort: any failure while talking to the mixer simply
            # leaves outdata with whatever was collected so far.
            pass
        return outdata
class VariableClDesktopGstCard(ReadonlyVariable):
    """
    Internal sound card name for the xfce mixer
    """

    def get(self):
        """Return 'internal_name' from ``cl_desktop_gst_data`` ('' if absent)."""
        gst_data = self.Get('cl_desktop_gst_data')
        return gst_data.get('internal_name', '')
class VariableClDesktopGstMasterchannel(ReadonlyVariable):
    """
    Name of the master track
    """

    def get(self):
        """Return 'master_channel' from ``cl_desktop_gst_data`` ('' if absent)."""
        gst_data = self.Get('cl_desktop_gst_data')
        return gst_data.get('master_channel', '')
class VariableClDesktopXfceMixer(ReadonlyVariable):
    """
    Channel list for the xfce per-channel mixer
    """

    def get(self):
        """
        Return one ``<value .../>`` line per channel listed in
        ``cl_desktop_gst_data``, joined with newlines.
        """
        channels = self.Get('cl_desktop_gst_data').get('channels', [])
        return "\n".join(
            ' <value type="string" value="%s" />' % channel
            for channel in channels)
class VariableClDesktopXfcePointers(ReadonlyVariable):
    """
    List of pointer devices for xfce
    """
    type = "list"

    def get(self):
        """
        Return the names read from the sysfs ``mouse*/device/name`` entries
        (in sorted entry order), with spaces replaced by underscores and
        slashes removed. Entries that read as empty are skipped.
        """
        name_entries = sorted(
            device.sysfs.glob(device.sysfs.Path.Input,
                              "mouse*/device/name"))
        pointers = []
        for entry in name_entries:
            content = device.sysfs.read(entry)
            if not content:
                continue
            pointers.append(
                content.strip().replace(" ", "_").replace("/", ""))
        return pointers
class VariableClDesktopOnlineData(ReadonlyTableVariable, DomainInfoHelper):
    """
    Table with information about users that have an X session open
    """
    source = ['cl_desktop_online_user',
              'cl_desktop_online_display',
              'cl_desktop_online_uid',
              'cl_desktop_online_domain_set',
              'cl_desktop_online_count']

    reDisplay = re.compile(r"^\(?:(\d+\.?\d*)")

    def _getDisplay(self, *args):
        """
        Return the display number captured by ``reDisplay`` from the first
        argument that matches, or "" when none does.
        """
        for candidate in args:
            found = self.reDisplay.search(candidate)
            if found:
                return found.group(1)
        return ""

    def get_user_uid(self, username):
        """Return the uid of *username* as a string, or "" on any failure."""
        try:
            entry = pwd.getpwnam(username)
            return str(entry.pw_uid)
        except Exception:
            return ""

    def get(self, hr=False):
        """
        Build one row per non-root user found in the output of ``who``
        whose line refers to an X display.

        Each row is (login, display, uid, "on"/"off" domain flag, number
        of sessions); the display is taken from the user's first line.
        Rows are ordered by login. ``[[]]`` is returned when ``who`` fails.
        """
        who = process("who")
        if not who.success():
            return [[]]

        sessions = []
        for line in who:
            fields = line.split()
            # Lines with fewer than five fields are ignored.
            if len(fields) < 5:
                continue
            login, terminal, origin = fields[0], fields[1], fields[-1]
            # Keep only lines whose last field or terminal names a display.
            if not (origin.startswith("(:") or terminal.startswith(":")):
                continue
            if login == "root":
                continue
            sessions.append((login, self._getDisplay(terminal, origin)))

        # groupby needs its input ordered by the grouping key.
        sessions.sort(key=lambda session: session[0])
        rows = []
        for login, grouped in groupby(sessions, lambda session: session[0]):
            entries = list(grouped)
            rows.append((login,
                         entries[0][1],
                         self.get_user_uid(login),
                         "on" if self.isDomainUser(login) else "off",
                         len(entries)))
        return rows

    setValue = Variable.setValue
class VariableClDesktopOnlineUser(FieldValue, ReadonlyVariable):
    """
    User login
    """
    type = "list"
    # Values come from column 0 of the cl_desktop_online_data table.
    source_variable = "cl_desktop_online_data"
    column = 0
class VariableClDesktopOnlineDisplay(FieldValue, ReadonlyVariable):
    """
    User display
    """
    type = "list"
    # Values come from column 1 of the cl_desktop_online_data table.
    source_variable = "cl_desktop_online_data"
    column = 1
class VariableClDesktopOnlineUid(FieldValue, ReadonlyVariable):
    """
    User UID
    """
    type = "list"
    # Values come from column 2 of the cl_desktop_online_data table.
    source_variable = "cl_desktop_online_data"
    column = 2
class VariableClDesktopOnlineDomainSet(FieldValue, ReadonlyVariable):
    """
    Whether the user is a domain user
    """
    type = "list"
    # Values come from column 3 of the cl_desktop_online_data table.
    source_variable = "cl_desktop_online_data"
    column = 3
class VariableClDesktopOnlineCount(FieldValue, ReadonlyVariable):
    """
    Number of sessions of the user
    """
    type = "list"
    # Values come from column 4 of the cl_desktop_online_data table.
    source_variable = "cl_desktop_online_data"
    column = 4
class VariableClDesktopLogin(user.VariableUrLogin):
    """
    Login of the user the action applies to
    """
    opt = ["cl_desktop_login"]

    def choice(self):
        """
        Return the logins with an X session when the action is 'logout',
        otherwise the choices of the parent login variable.
        """
        if self.Get('cl_action') != 'logout':
            return user.VariableUrLogin.choice(self)
        return self.Get('cl_desktop_online_user')

    def check(self, value):
        """
        Validate the login.

        Raises VariableError when the action is 'logout' and the login is
        not among the choices, when the login is empty, or when the
        password database lookup for it fails.
        """
        not_offered = value not in self.choice()
        if not_offered and self.Get('cl_action') == 'logout':
            raise VariableError(_("No X session user found"))
        if value == "":
            raise VariableError(_("Please specify the user name"))
        try:
            pwd.getpwnam(value)
        except Exception:
            raise VariableError(_("User %s does not exist") % value)

    def get(self):
        """Return the empty string (no default login)."""
        return ""
class VariableUrMountDirs(ReadonlyVariable):
    """
    Directories mounted inside the user profile
    """
    type = "list"

    def get(self):
        """
        Return, in reverse sorted order, the mount points listed in
        /proc/mounts that start with the home path, with the dot-prefixed
        sibling of the home directory, or with its ".remote" variant.
        The home path itself is excluded. Returns [] when the home path
        is empty.
        """
        home = self.Get("ur_home_path")
        if not home:
            return []
        parent, base = path.split(home)
        prefixes = (home,
                    path.join(parent, ".%s" % base),
                    path.join(parent, ".%s.remote" % base))
        # The second space-separated field of each line is the mount point.
        mount_points = [line.split(" ")[1]
                        for line in readLinesFile('/proc/mounts')]
        selected = [point for point in mount_points
                    if point.startswith(prefixes) and point != home]
        selected.sort(reverse=True)
        return selected
class VariableUrPassword(ReadonlyVariable):
    """
    User password obtained from the kernel keys
    """

    def get(self):
        """Return the key stored for ``ur_login``, or "" when there is none."""
        key = getKey(self.Get('ur_login'))
        return key if key else ""
class VariableClDesktopUpdateProfileSet(Variable):
    """
    Whether the user profile has to be updated because packages
    were updated
    """
    type = "bool"

    def is_fastlogin(self):
        """
        Return True when an entry named after ``ur_login`` exists in the
        ``cl_desktop_fastlogin_path`` directory.
        """
        login = self.Get('ur_login')
        fastlogin_dir = self.Get('cl_desktop_fastlogin_path')
        return bool(path.exists(path.join(fastlogin_dir, login)))

    def get(self):
        """
        Return 'on' when any of the following holds, otherwise 'off':
        the user is a domain user, the user has no fastlogin entry, the
        user's ini.env has ``profile_setup`` set to 'on', ini.env does not
        exist, or the elog value stored there differs from the last elog.
        """
        last_elog = templateFunction.getLastElog()
        ini_path = path.join(self.Get('ur_home_path'), '.calculate/ini.env')
        user_ini = iniParser(ini_path)
        user_elog = user_ini.getVar('main', 'elog')
        if user_elog:
            # NOTE(review): this produces bytes; if getLastElog() returns
            # str the comparison below can never be equal - confirm.
            user_elog = user_elog.encode('utf-8')
        setup_requested = user_ini.getVar('main', 'profile_setup') == 'on'

        # Conditions are tested one by one, stopping at the first hit.
        if self.Get('ur_domain_set') == 'on':
            return 'on'
        if not self.is_fastlogin():
            return 'on'
        if setup_requested:
            return 'on'
        if not path.exists(ini_path):
            return 'on'
        if user_elog != last_elog:
            return 'on'
        return 'off'
class VariableClDesktopForceSetupSet(Variable):
    """
    Force the user profile configuration to be performed
    """
    value = "off"
    type = "bool"
    metavalue = "ON/OFF"
    opt = ["--force-setup"]

    def init(self):
        """Set the translated help text and label of the option."""
        self.help = _("force configuration")
        self.label = _("Force configuration")
class VariableClDesktopFacePath(Variable):
    """Path to the standard user icons"""
    value = "/usr/share/pixmaps/faces"
class VariableClDesktopFaceList(Variable):
    """List of the default icons available for users"""
    type = "list"

    def get(self):
        """
        Return the sorted names ending in '.png' found in the
        ``cl_desktop_face_path`` directory.
        """
        face_dir = self.Get('cl_desktop_face_path')
        png_names = [name for name in listDirectory(face_dir)
                     if name.endswith('.png')]
        png_names.sort()
        return png_names
class VariableClDesktopHashFace(Variable):
    """User icon

    The icon is picked by the md5 checksum of the user login
    """

    def get(self):
        """
        Return the path of the icon chosen for ``ur_login``, or "" when
        ``cl_desktop_face_list`` is empty.

        The index into the icon list is the sum of the md5 digest bytes of
        the UTF-8 encoded login, modulo the number of icons.
        """
        login = self.Get('ur_login')
        faces = self.Get('cl_desktop_face_list')
        if not faces:
            return ""
        face_dir = self.Get('cl_desktop_face_path')
        digest = hashlib.md5(login.encode("UTF-8")).digest()
        # Iterating bytes yields ints, so the digest can be summed directly.
        index = sum(digest) % len(faces)
        return path.join(face_dir, faces[index])
class VariableClDesktopFastloginPath(ReadonlyVariable):
    """
    Path to the directory that lists the users allowed fast session login
    """
    value = "/var/lib/calculate/calculate-desktop/fastlogin"
class VariableClDesktopElogindSet(ReadonlyVariable):
    """
    Whether the system uses elogind
    """
    type = Variable.Types.Boolean

    elogind = "sys-auth/elogind"
    consolekit = "sys-auth/consolekit"

    def get(self):
        """
        Return Variable.On when the elogind package is installed and the
        consolekit package is not; otherwise Variable.Off.
        """
        uses_elogind = (isPkgInstalled(self.elogind) and
                        not isPkgInstalled(self.consolekit))
        if uses_elogind:
            return Variable.On
        return Variable.Off