You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
calculate-utils-2.2-lib/pym/cl_fill.py

702 lines
24 KiB

#-*- coding: utf-8 -*-
# Copyright 2008-2010 Calculate Ltd. http://www.calculate-linux.org
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import re
import os
import types
import pwd, grp
import cl_overriding
from cl_vars_share import varsShare, clLocale
from os.path import exists as pathexists
from os import path
from cl_utils import isMount, genpassword, getAvailableX11Drivers, \
listDirectory
from encrypt import getHash
class fillVars(varsShare):
"""Auxilary object for creating variables
Contains filling methods"""
def addDn(x, *y):
    """Join the given DN parts with commas (the first argument is ignored)"""
    return ",".join(y)
def genDn(x, *y):
    """Join the given parts with '=' (the first argument is ignored)"""
    return "=".join(y)
def get_cl_env_path(self):
    """Paths to env files

    Collects the second element of every entry of the 'cl_env_data'
    variable. When that variable is empty an error is printed and
    cl_overriding.exit(1) is called.
    """
    entries = self.Get("cl_env_data")
    if not entries:
        cl_overriding.printERROR(_("Error:") + " " +\
            _("Template variable cl_env_data is empty"))
        cl_overriding.exit(1)
        return
    return [entry[1] for entry in entries]
def get_cl_env_location(self):
    """Aliases to env files

    Collects the first element of every entry of the 'cl_env_data'
    variable. When that variable is empty an error is printed and
    cl_overriding.exit(1) is called.
    """
    entries = self.Get("cl_env_data")
    if not entries:
        cl_overriding.printERROR(_("Error:") + " " +\
            _("Template variable cl_env_data is empty"))
        cl_overriding.exit(1)
        return
    return [entry[0] for entry in entries]
def get_cl_env_server_path(self):
    """Path to the server env file"""
    serverEnv = '/var/calculate/remote/server.env'
    return serverEnv
def get_cl_template_path(self):
    """Directories that hold template files (a new list on every call)"""
    templateDirs = []
    templateDirs.append("/usr/share/calculate/templates")
    templateDirs.append("/var/calculate/templates")
    templateDirs.append("/var/calculate/remote/templates")
    return templateDirs
def get_cl_template_clt_path(self):
    """Path to 'clt' files

    The candidates are "/etc" plus the whitespace separated entries of the
    CONFIG_PROTECT environment variable; when the variable is not set a
    built-in list is used. Only candidates that exist are returned, as a
    list.
    """
    if "CONFIG_PROTECT" in os.environ:
        # split() without arguments drops empty fields by itself
        protectPaths = ["/etc"] + os.environ["CONFIG_PROTECT"].split()
    else:
        # every default must be absolute ("var/lib/hsqldb" used to be
        # resolved against the current directory)
        protectPaths = ["/etc", "/usr/share/X11/xkb", "/var/lib/hsqldb",
                        "/usr/share/config"]
    return [x for x in protectPaths if path.exists(x)]
def get_os_net_domain(self):
    """Get net domain name

    Runs 'hostname -d'. Returns "local" when the first output line starts
    with "hostname: ", otherwise that first line. When the command fails
    or prints nothing an error is reported and the result of
    cl_overriding.exit(1) is returned.
    """
    output = self._runos("hostname -d 2>&1")
    if output is False:
        cl_overriding.printERROR(_("Error execute 'hostname -d'"))
        return cl_overriding.exit(1)
    if output:
        domainName = output[0]
    else:
        domainName = ""
    if domainName:
        if re.search("^hostname: ", domainName):
            return "local"
        return domainName
    cl_overriding.printERROR(_("Error:") + " " +\
        _("Not found domain name"))
    cl_overriding.printERROR(\
        _("Command 'hostname -d' returns an empty value"))
    return cl_overriding.exit(1)
def get_os_linux_shortname(self):
    """Get short system name

    Asks getShortnameByMakeprofile, getShortnameByIni and
    detectOtherShortname in that order for the root "/" and returns the
    first non-empty answer, or "Linux" when all of them are empty.
    """
    systemRoot = "/"
    shortName = self.getShortnameByMakeprofile(systemRoot)
    if not shortName:
        shortName = self.getShortnameByIni(systemRoot)
    if not shortName:
        shortName = self.detectOtherShortname(systemRoot)
    if not shortName:
        shortName = "Linux"
    return shortName
def get_os_linux_name(self):
    """Get full system name

    Looks the short name up in self.dictLinuxName; "Linux" is used for
    unknown short names.
    """
    shortName = self.Get("os_linux_shortname")
    fullNames = self.dictLinuxName
    return fullNames.get(shortName, "Linux")
def get_os_linux_subname(self):
    """Get postfix name of system

    Looks the short name up in self.dictLinuxSubName; an empty string is
    used for unknown short names.
    """
    shortName = self.Get("os_linux_shortname")
    subNames = self.dictLinuxSubName
    return subNames.get(shortName, "")
def get_os_linux_ver(self):
    """Get system version

    Asks getVersionFromMetapackage, getVersionFromCalculateIni,
    getVersionFromGentooFiles and getVersionFromUname in that order and
    returns the first non-empty answer, or "0" when all are empty.
    """
    shortName = self.Get("os_linux_shortname")
    version = self.getVersionFromMetapackage('/', shortName)
    if not version:
        version = self.getVersionFromCalculateIni('/')
    if not version:
        version = self.getVersionFromGentooFiles('/')
    if not version:
        version = self.getVersionFromUname()
    if not version:
        version = "0"
    return version
def get_os_net_hostname(self):
    """Get hostname of computer

    Uses the first line of 'hostname -s'. When that line starts with
    "hostname: " plain 'hostname' is tried instead, and if that one
    starts with "hostname: " too the value of 'os_linux_shortname' is
    returned. A short name equal to 'livecd' is also replaced by
    'os_linux_shortname'. An empty string is returned when a command
    gives no output.
    """
    shortOutput = self._runos("hostname -s 2>&1")
    if not shortOutput or not shortOutput[0]:
        return ""
    name = shortOutput[0]
    if not re.search("^hostname: ", name):
        if name == 'livecd':
            return self.Get('os_linux_shortname')
        return name
    # 'hostname -s' printed an error message, retry without the option
    fullOutput = self._runos("hostname 2>&1")
    if not fullOutput:
        return ""
    name = fullOutput[0]
    if re.search("^hostname: ", name):
        return self.Get('os_linux_shortname')
    return name
def get_os_net_ip(self):
    """All computer ip addresses, comma delimiter

    Parses the 'addr:... Bcast:' lines of '/sbin/ifconfig <interface>'
    for every name in 'os_net_interfaces'. Scanning stops at the first
    interface for which ifconfig gives no output.
    """
    addresses = []
    for ifaceName in self.Get("os_net_interfaces"):
        output = self._runos("/sbin/ifconfig %s"%ifaceName)
        if not output:
            break
        for text in output:
            found = re.search(r'addr:([0-9\.]+).+Bcast:', text)
            if found:
                # ip address
                addresses.append(found.group(1))
    return ",".join(addresses)
def get_os_net_interfaces(self):
    """All net interfaces: entries of /sys/class/net except "lo" """
    return [ifaceName for ifaceName in self.getDirList("/sys/class/net")
            if ifaceName != "lo"]
def get_os_net_allow(self):
    """Allowed networks

    For every interface in 'os_net_interfaces' the address and netmask
    are taken from '/sbin/ifconfig <interface>' and turned into
    'network/prefix' notation. Returns the networks joined with commas,
    or an empty string as soon as ifconfig gives no output for one of
    the interfaces.
    """
    def toNumber(dotted):
        """Convert a dotted quad to an integer"""
        number = 0
        for octet in dotted.split("."):
            number = (number << 8) | int(octet)
        return number

    def getNet(ip, mask):
        """By ip and mask get network"""
        maskNumb = toNumber(mask)
        x = toNumber(ip) & maskNumb
        # prefix length = number of bits set in the mask
        prefixLen = bin(maskNumb).count("1")
        return "%s.%s.%s.%s/%s"\
            %(x>>24, x>>16&255, x>>8&255, x&255, prefixLen)

    # compiled once instead of once per output line
    reNet = re.compile(r'addr:([0-9\.]+).+Bcast:.+Mask:([0-9\.]+)')
    networks = []
    for i in self.Get("os_net_interfaces"):
        res = self._runos("/sbin/ifconfig %s"%i)
        if not res:
            # one failing interface invalidates the whole answer
            return ""
        for j in res:
            s_ip = reNet.search(j)
            if s_ip:
                ip, mask = s_ip.groups()
                networks.append(getNet(ip, mask))
    return ",".join(networks)
def get_os_arch_machine(self):
    """Processor architecture: first line of 'uname -m' or "" """
    output = self._runos("uname -m")
    if output:
        return output[0]
    return ""
def get_os_root_dev(self):
    """Root filesystem device

    The kernel command line is examined first; 'real_root=' has priority
    over 'root='. A '/dev/...' value is returned as is, a 'UUID=...'
    value is resolved through the links in /dev/disk/by-uuid. Otherwise
    the first field of the second line of 'df /' is used, with 'none'
    mapped to '/dev/ram0'. Returns "" when nothing could be determined.
    """
    try:
        # text mode, closed right after reading
        with open('/proc/cmdline', 'r') as cmdlineFile:
            record = cmdlineFile.read().strip()
    except (IOError, OSError):
        # no readable command line: fall through to df
        record = ""
    # param real_root priority that root
    re_res = re.search(r'(?:^|\s)real_root=(\S+)(\s|$)', record) or \
             re.search(r'(?:^|\s)root=(\S+)(\s|$)', record)
    if re_res:
        rootparam = re_res.group(1)
        # check root for /dev/sd view
        if re.match(r"^/dev/[a-z]+.*$", rootparam):
            return rootparam
        # check root set by uuid
        uuidpath = '/dev/disk/by-uuid'
        if rootparam.startswith("UUID=") and \
           os.access(uuidpath, os.R_OK):
            uuid = rootparam[5:]
            for name in os.listdir(uuidpath):
                link = path.join(uuidpath, name)
                if name == uuid and path.islink(link):
                    return path.normpath(path.join(uuidpath,
                                                   os.readlink(link)))
    # get device mounted to root
    dfLines = self._runos("LANG=C df /")
    if isinstance(dfLines, list) and len(dfLines) > 1:
        root_dev = dfLines[1].split(" ")[0].strip()
        if root_dev:
            return {'none':'/dev/ram0'}.get(root_dev, root_dev)
    return ""
def get_os_root_type(self):
    """Root device type (livecd, usb-hdd, hdd)

    "livecd" when 'os_root_dev' contains "/dev/ram", "usb-hdd" when the
    /dev/disk/by-id link pointing at the root device contains "usb-",
    otherwise "hdd".
    """
    def link2pair(linkfile):
        """Return pair (target,link) from link"""
        linkDir = os.path.dirname(linkfile)
        linkTarget = os.readlink(linkfile)
        return (path.normpath(os.path.join(linkDir, linkTarget)), linkfile)

    rootDev = self.Get("os_root_dev")
    if not rootDev:
        return "hdd"
    if "/dev/ram" in rootDev:
        return "livecd"
    # map every device to the by-id link that points at it
    idDict = {}
    for name in listDirectory('/dev/disk/by-id'):
        linkfile = path.join('/dev/disk/by-id', name)
        if path.islink(linkfile):
            target, link = link2pair(linkfile)
            idDict[target] = link
    if "usb-" in idDict.get(rootDev, ""):
        return "usb-hdd"
    return "hdd"
def get_hr_cdrom_set(self):
    """Cdrom variable

    "on" when /sys/block/sr0 exists and the udevadm output for
    /dev/dvdrw has a line containing "ID_CDROM=1", otherwise "off".
    """
    if not os.path.exists('/sys/block/sr0'):
        return "off"
    udevInfo = self._runos("udevadm info --query=all --name=/dev/dvdrw")
    if udevInfo is False:
        return "off"
    for text in udevInfo:
        if "ID_CDROM=1" in text:
            return "on"
    return "off"
def get_hr_virtual(self):
    """Virtual machine name (virtualbox, vmware, qemu)

    Searches the output of /usr/sbin/lspci for a known vendor string.
    Returns the matching name, '' when nothing matches and False when
    lspci gives no output (the False value is kept for existing callers).
    """
    pciLines = self._runos("/usr/sbin/lspci")
    if not pciLines:
        return False
    # a tuple keeps the check order fixed when several vendors match
    virtSystems = (('VirtualBox', 'virtualbox'),
                   ('VMware', 'vmware'),
                   ('Qumranet', 'qemu'))
    for vName, virtName in virtSystems:
        if any(vName in x for x in pciLines):
            return virtName
    return ''
def get_hr_board_model(self):
    """Get motherboard model

    Returns the stripped content of /sys/class/dmi/id/board_name, or ""
    when the file cannot be read.
    """
    modelFile = "/sys/class/dmi/id/board_name"
    try:
        with open(modelFile, "r") as dmiFile:
            return dmiFile.read().strip()
    except (IOError, OSError):
        return ""
def get_hr_board_vendor(self):
    """Get motherboard vendor

    Returns the stripped content of /sys/class/dmi/id/board_vendor, or
    "" when the file cannot be read.
    """
    vendorFile = "/sys/class/dmi/id/board_vendor"
    try:
        with open(vendorFile, "r") as dmiFile:
            return dmiFile.read().strip()
    except (IOError, OSError):
        return ""
def get_hr_cpu_num(self):
    """Get processors count

    Counts the lines of /proc/cpuinfo that start with "processor".
    Returns 1 when the file cannot be read or has no such line.
    """
    cpuinfoFile = "/proc/cpuinfo"
    try:
        with open(cpuinfoFile, "r") as cpuinfo:
            # only lines whose key is "processor" describe a cpu; other
            # lines may mention the word in their value
            count = sum(1 for line in cpuinfo
                        if line.startswith("processor"))
    except (IOError, OSError):
        return 1
    # never report zero processors
    return count or 1
def get_os_locale_locale(self):
    """locale (example: ru_RU.UTF-8)

    Sources, in priority order: the language obtained with
    getValueFromCmdLine("calculate", 0) when clLocale knows it, the LANG
    value of /etc/env.d/02locale when clLocale lists it as a locale, a
    non-empty LANG environment variable other than "C", and finally the
    locale of the "default" language.
    """
    locale = clLocale()
    # get locale from boot calculate param
    localeVal = self.getValueFromCmdLine("calculate",0)
    if locale.isLangExists(localeVal):
        return locale.getFieldByLang('locale',localeVal)
    localeVal = self.getValueFromConfig('/etc/env.d/02locale','LANG')
    if locale.isValueInFieldExists('locale',localeVal):
        return localeVal
    # an empty LANG is not a usable locale, treat it like an unset one
    envLang = os.environ.get("LANG")
    if envLang and envLang != "C":
        return envLang
    return locale.getFieldByLang("locale","default")
def get_os_locale_lang(self):
    """lang (example: ru_RU)

    Language whose "locale" field equals the 'os_locale_locale' value.
    """
    localeInfo = clLocale()
    localeName = self.Get('os_locale_locale')
    return localeInfo.getLangByField("locale", localeName)
def get_os_locale_language(self):
    """language (example: ru)

    The "language" field of the 'os_locale_lang' language.
    """
    localeInfo = clLocale()
    langName = self.Get('os_locale_lang')
    return localeInfo.getFieldByLang("language", langName)
def get_os_locale_xkb(self):
    """xkb layouts (example: en,ru)

    The "xkblayout" field of the 'os_locale_lang' language.
    """
    localeInfo = clLocale()
    langName = self.Get('os_locale_lang')
    return localeInfo.getFieldByLang("xkblayout", langName)
def get_os_locale_xkbname(self):
    """Names of the keyboard layouts used for X

    The 'os_locale_xkb' value up to the first "(", or "" when that
    variable is empty.
    """
    layouts = self.Get("os_locale_xkb")
    if not layouts:
        return ""
    return layouts.split("(")[0]
def get_ur_login(self):
    """User login

    Name of the password database entry for the uid of the current
    process, or "" when there is no such entry.
    """
    uid = os.getuid()
    try:
        return pwd.getpwuid(uid).pw_name
    except KeyError:
        # getpwuid raises KeyError for an uid without an entry
        return ""
def get_ur_group(self):
    """User group

    Name of the primary group of the 'ur_login' user, or "" when the
    login is empty or the user or group entry does not exist.
    """
    userName = self.Get('ur_login')
    if not userName:
        return ""
    try:
        gid = pwd.getpwnam(userName).pw_gid
        return grp.getgrgid(gid).gr_name
    except KeyError:
        # raised by both lookups when the entry is missing
        return ""
def get_ur_fullname(self):
    """Full user name

    The gecos field of the 'ur_login' user, or "" when the login is
    empty or unknown.
    """
    userName = self.Get('ur_login')
    if not userName:
        return ""
    try:
        return pwd.getpwnam(userName).pw_gecos
    except KeyError:
        # no password database entry for this login
        return ""
def get_ur_jid(self):
    """Get user Jabber id

    The "jid" item of getUserInfo(), or "" when no user info is
    available.
    """
    info = self.getUserInfo()
    if not info:
        return ""
    return info["jid"]
def get_ur_mail(self):
    """Get user email

    The "mail" item of getUserInfo(), or "" when no user info is
    available.
    """
    info = self.getUserInfo()
    if not info:
        return ""
    return info["mail"]
def get_ur_home_path(self):
    """Get user home directory

    The home directory of the 'ur_login' user, or "" when the login is
    empty or unknown.
    """
    userName = self.Get('ur_login')
    if not userName:
        return ""
    try:
        return pwd.getpwnam(userName).pw_dir
    except KeyError:
        # no password database entry for this login
        return ""
def get_os_linux_system(self):
    """Get linux system (server or desktop)

    Looks the short name up in self.dictNameSystem; an empty string is
    used for unknown short names.
    """
    shortName = self.Get('os_linux_shortname')
    systems = self.dictNameSystem
    return systems.get(shortName, "")
def get_os_x11_video_drv(self):
    """Get video driver used by xorg

    Sources, in priority order:
    1. the last '<driver>_drv.so' line of /var/log/Xorg.<N>.log, where N
       is the display number from the DISPLAY environment variable;
    2. the Driver entry of the first "Device" section of
       /etc/X11/xorg.conf, if it names an available driver;
    3. the value of getValueFromCmdLine("calculate", "video") (with
       'i915' mapped to 'intel'), if it names an available driver;
    4. getVideoByDefault() for the available drivers.
    """
    xorg_conf = '/etc/X11/xorg.conf'
    # Try analize Xorg.{DISPLAY}.log
    display = os.environ.get('DISPLAY')
    list_avialable_drivers = getAvailableX11Drivers()
    if display and list_avialable_drivers:
        # driver names are escaped so they are matched literally
        reDriver = re.compile('|'.join(
            [r"%s_drv\.so" % re.escape(x) for x in list_avialable_drivers]))
        # the screen part is optional: both ":0" and ":0.0" are accepted
        display_number = re.search(r':(\d+)', display)
        if display_number:
            xorg_log_file = '/var/log/Xorg.%s.log' % \
                            display_number.group(1)
            if path.exists(xorg_log_file):
                with open(xorg_log_file) as logFile:
                    matchStrs = [i for i in logFile
                                 if "drv" in i and reDriver.search(i)]
                if matchStrs:
                    resDriver = re.search(r'([^/]+)_drv\.so',
                                          matchStrs[-1])
                    if resDriver:
                        return resDriver.group(1)
    # analize /etc/X11/xorg.conf
    if path.exists(xorg_conf):
        with open(xorg_conf) as confFile:
            matchSect = re.search(r'Section "Device".*?EndSection',
                                  confFile.read(), re.S)
        if matchSect:
            # re.M lets '^' match at every line of the section; without
            # it the Driver line could never be found
            resDriver = re.search(r'^\s*Driver\s*"([^"]+)"',
                                  matchSect.group(0), re.M)
            if resDriver and resDriver.group(1) in list_avialable_drivers:
                return resDriver.group(1)
    videoVal = self.getValueFromCmdLine("calculate","video")
    videoVal = {'i915':'intel'}.get(videoVal,videoVal)
    if videoVal in list_avialable_drivers:
        return videoVal
    return self.getVideoByDefault(list_avialable_drivers)
def get_os_x11_height(self):
    """Get screen height in pixels

    When getX11Resolution() gives a result its first item is also stored
    in 'os_x11_width' and its second item is returned; otherwise "768".
    """
    size = self.getX11Resolution()
    if not size:
        return "768"
    self.Set('os_x11_width', size[0])
    return size[1]
def get_os_x11_width(self):
    """Get screen width in pixels

    When getX11Resolution() gives a result its second item is also
    stored in 'os_x11_height' and its first item is returned; otherwise
    "1024".
    """
    size = self.getX11Resolution()
    if not size:
        return "1024"
    self.Set('os_x11_height', size[1])
    return size[0]
def get_os_x11_standart(self):
    """Get the nearest standard size of image relative current
    screen resolution

    The standard sizes are grouped by aspect ratio and the group whose
    ratio is closest to the screen's is chosen. From that group the
    smallest size that covers the screen is returned as "WxH"; when no
    size covers it, the last size of the group is returned. Returns ""
    when width or height is not known.
    """
    curWidth = self.Get('os_x11_width')
    curHeight = self.Get('os_x11_height')
    if not (curWidth and curHeight):
        return ""
    width = int(curWidth)
    height = int(curHeight)
    # Standard resolutions
    standard = [(1024,600),
                (1024,768),
                (1280,1024),
                (1280,800),
                (1366,768),
                (1440,900),
                (1600,1200),
                (1680,1050),
                (1920,1200)]
    # group the standard sizes by aspect ratio
    ratios = [float(w)/float(h) for w, h in standard]
    byRatio = {}
    for fr in list(set(ratios)):
        byRatio[fr] = []
    for w, h in standard:
        byRatio[float(w)/float(h)].append((w, h))
    # pick the group with the nearest aspect ratio
    screenRatio = float(width)/float(height)
    nearest = {}
    for fr in byRatio.keys():
        nearest[abs(screenRatio - fr)] = fr
    sizes = byRatio[nearest[min(nearest.keys())]]
    # sizes of the group that cover the whole screen
    fitting = [(w, h) for w, h in sizes if w >= width and h >= height]
    if not fitting:
        return "%sx%s"%(sizes[-1][0],sizes[-1][1])
    minHeight = min([h for w, h in fitting])
    minWidth = min([w for w, h in fitting if h == minHeight])
    return "%sx%s"%(minWidth,minHeight)
def get_os_x11_composite(self):
    """On or off composite mode

    The value of get_composite_from_xorgconf(), or "off" when that
    value is empty.
    """
    state = self.get_composite_from_xorgconf()
    if state:
        return state
    return "off"
def get_hr_laptop(self):
    """Laptop vendor

    When both DMI files are readable and the chassis type is '1', '8'
    or '10', returns the lower-cased first word of the board vendor
    ("unknown" if that word is empty). Returns "" otherwise.
    """
    chassisType = '/sys/class/dmi/id/chassis_type'
    boardVendor = '/sys/class/dmi/id/board_vendor'
    if not (os.access(chassisType,os.R_OK) and
            os.access(boardVendor,os.R_OK)):
        return ""
    with open(chassisType,'r') as chassisFile:
        chassis = chassisFile.read().strip()
    notebookChassis = ['1','8','10']
    if chassis not in notebookChassis:
        return ""
    with open(boardVendor,'r') as vendorFile:
        valBoardVendor = vendorFile.read().strip()
    return valBoardVendor.split(" ")[0].lower() or "unknown"
def get_hr_laptop_model(self):
    """Laptop name

    When 'hr_laptop' is set and /sys/class/dmi/id/board_name is
    readable, returns its stripped content ("unknown" if empty).
    Returns "" otherwise.
    """
    boardName = '/sys/class/dmi/id/board_name'
    if self.Get('hr_laptop') and os.access(boardName,os.R_OK):
        with open(boardName,'r') as nameFile:
            valBoardName = nameFile.read().strip()
        return valBoardName or "unknown"
    return ""
def get_hr_video(self):
    """Videocard vendor

    Looks at the first 'lspci' line that mentions "vga" (any case) and
    maps a vendor string found in it to a short name. Returns "vesa"
    when there is no such line or no known vendor in it, and "" when
    lspci gives no output.
    """
    pciLines = self._runos("lspci")
    if not pciLines:
        return ""
    reVGA = re.compile("vga",re.I)
    for vgaLine in pciLines:
        if reVGA.search(vgaLine):
            break
    else:
        # no line mentions a VGA device
        return "vesa"
    # checked in this order, the first hit wins
    vendors = ((("nVidia", "GeForce"), "nvidia"),
               (("ATI",), "ati"),
               (("Intel",), "intel"),
               (("VIA",), "via"),
               (("VMware",), "vmware"))
    for markers, vendor in vendors:
        for marker in markers:
            if marker in vgaLine:
                return vendor
    return "vesa"
def get_cl_kernel_uid(self):
    """Get UID of symlink kernel, initramfs and System.map

    The first 8 characters of the /dev/disk/by-uuid link name that
    points at 'os_root_dev', or "" when the directory is not readable
    or no link points at the root device.
    """
    uuidpath = '/dev/disk/by-uuid'
    if not os.access(uuidpath,os.R_OK):
        return ""
    # map every device to the name of the uuid link pointing at it
    mapDevUuid = {}
    for name in os.listdir(uuidpath):
        link = path.join(uuidpath, name)
        if path.islink(link):
            device = path.normpath(path.join(uuidpath, os.readlink(link)))
            mapDevUuid[device] = path.basename(link)
    rootDev = self.Get('os_root_dev')
    if rootDev in mapDevUuid:
        return mapDevUuid[rootDev][:8]
    return ""
def get_cl_chroot_status(self):
    """Detect chroot mode

    "on" when isChroot() is true for the current process id; "off"
    when it is false or raises.
    """
    status = "off"
    try:
        if self.isChroot(os.getpid()):
            status = "on"
    except:
        status = "off"
    return status
def get_os_scratch(self):
    """Current system is scratch

    "on" when the scratch directory is a mount point, else "off". The
    directory checked is /mnt/scratch/workspace for a 'livecd' root
    type and /mnt/scratch otherwise.
    """
    if self.Get('os_root_type') == 'livecd':
        mounted = isMount('/mnt/scratch/workspace')
    else:
        mounted = isMount('/mnt/scratch')
    if mounted:
        return "on"
    return "off"
def get_cl_root_path(self):
    """Path to the directory relative to which templates are joined
    to system files (sandbox)"""
    rootPath = '/'
    return rootPath
def get_cl_chroot_path(self):
    """Path to the directory which contains the other system"""
    chrootPath = '/'
    return chrootPath
def get_cl_autoupdate_set(self):
    """(on or off) autoupdate config from install program; 'off' here"""
    autoupdate = 'off'
    return autoupdate
def get_cl_api(self):
    """The path to the module api, and additional parameters of
    calculate packages; a new empty dictionary here"""
    return dict()
def get_ld_encrypt(self):
    """Hash crypto algorithm"""
    algorithm = 'ssha'
    return algorithm
def get_ld_bind_login(self):
    """Bind login"""
    bindLogin = 'proxyuser'
    return bindLogin
def get_ld_base_root(self):
    """Base name LDAP"""
    baseRoot = 'calculate'
    return baseRoot
def get_ld_base_dn(self):
    """Base DN LDAP: "dc" paired with the 'ld_base_root' value"""
    baseRoot = self.Get('ld_base_root')
    return self.genDn("dc", baseRoot)
def get_ld_bind_dn(self):
    """Bind DN LDAP: the "cn" of 'ld_bind_login' added to 'ld_base_dn'"""
    bindCn = self.genDn("cn", self.Get('ld_bind_login'))
    baseDn = self.Get('ld_base_dn')
    return self.addDn(bindCn, baseDn)
def get_ld_bind_pw(self):
    """Bind password"""
    bindPassword = 'calculate'
    return bindPassword
def get_ld_bind_hash(self):
    """Hash bind: getHash() of 'ld_bind_pw' with the 'ld_encrypt'
    algorithm"""
    bindPassword = self.Get('ld_bind_pw')
    algorithm = self.Get('ld_encrypt')
    return getHash(bindPassword, algorithm)
def get_ld_admin_login(self):
    """Administrator name"""
    adminLogin = 'ldapadmin'
    return adminLogin
def get_ld_admin_dn(self):
    """Root DN: the "cn" of 'ld_admin_login' added to 'ld_base_dn'"""
    adminCn = self.genDn("cn", self.Get('ld_admin_login'))
    baseDn = self.Get('ld_base_dn')
    return self.addDn(adminCn, baseDn)
def get_ld_admin_hash(self):
    """Root hash: getHash() of 'ld_admin_pw' with the 'ld_encrypt'
    algorithm"""
    adminPassword = self.Get('ld_admin_pw')
    algorithm = self.Get('ld_encrypt')
    return getHash(adminPassword, algorithm)
def get_ld_admin_pw(self):
    """Password root: a value produced by genpassword()"""
    adminPassword = genpassword()
    return adminPassword
def get_ld_services(self):
    """Name for all services"""
    servicesName = 'Services'
    return servicesName
def get_ld_services_dn(self):
    """DN for all services: the "ou" of 'ld_services' added to
    'ld_base_dn'"""
    servicesOu = self.genDn("ou", self.Get('ld_services'))
    baseDn = self.Get('ld_base_dn')
    return self.addDn(servicesOu, baseDn)
def get_cl_ca_cert(self):
    """CA certificate file name"""
    certName = 'CA.crt'
    return certName
def get_cl_ca_key(self):
    """CA key file name"""
    keyName = 'CA.key'
    return keyName
def get_cl_ca_path(self):
    """CA path"""
    caPath = '/var/calculate/ssl/main'
    return caPath