#-*- coding: utf-8 -*-

# Copyright 2008-2010 Calculate Ltd. http://www.calculate-linux.org
#
#  Licensed under the Apache License, Version 2.0 (the "License");
#  you may not use this file except in compliance with the License.
#  You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
#  Unless required by applicable law or agreed to in writing, software
#  distributed under the License is distributed on an "AS IS" BASIS,
#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#  See the License for the specific language governing permissions and
#  limitations under the License.

import re
import os
import types
import pwd, grp
import cl_overriding

from cl_vars_share import varsShare, clLocale
from os.path import exists as pathexists
from os import path
from cl_utils import isMount, genpassword, \
                     getAvailableVideo, process, \
                     listDirectory,isPkgInstalled,lspci, readLinesFile, \
                     getUdevDeviceInfo
from utils import ip
from encrypt import getHash


def _readFile(fileName, mode="r"):
    """Return the whole content of fileName and close the file afterwards.

    Raises IOError/OSError like the builtin open() when the file cannot
    be opened or read.
    """
    with open(fileName, mode) as fileObj:
        return fileObj.read()


class fillVars(varsShare):
    """Auxiliary object for creating variables.

    Contains the filling methods: each ``get_<name>`` method computes the
    value of the variable ``<name>``. Values of other variables are read
    with ``self.Get(<name>)``.
    NOTE(review): ``Get``, ``_runos``, ``getValueFromCmdLine`` and the other
    ``self.*`` helpers are not defined in this file (presumably inherited
    from varsShare); their contracts are assumed from how they are used here.
    """

    def addDn(self, *parts):
        """Join DN components with a comma (e.g. "cn=a", "dc=b" -> "cn=a,dc=b")"""
        return ",".join(parts)

    def genDn(self, *parts):
        """Join an attribute name and its value with "=" (e.g. "dc=calculate")"""
        return "=".join(parts)

    def get_cl_env_path(self):
        """Paths to env files (second item of every cl_env_data record).

        Prints an error and calls cl_overriding.exit(1) when cl_env_data
        is empty.
        """
        envData = self.Get("cl_env_data")
        if envData:
            return [record[1] for record in envData]
        else:
            cl_overriding.printERROR(_("Error:") + " " +\
                _("Template variable cl_env_data is empty"))
            cl_overriding.exit(1)

    def get_cl_env_location(self):
        """Aliases of env files (first item of every cl_env_data record).

        Prints an error and calls cl_overriding.exit(1) when cl_env_data
        is empty.
        """
        envData = self.Get("cl_env_data")
        if envData:
            return [record[0] for record in envData]
        else:
            cl_overriding.printERROR(_("Error:") + " " +\
                _("Template variable cl_env_data is empty"))
            cl_overriding.exit(1)

    def get_cl_env_server_path(self):
        """Path to the server env file"""
        return '/var/calculate/remote/server.env'

    def get_cl_template_path(self):
        """Paths to template files"""
        return ["/usr/share/calculate/templates",
                "/var/calculate/templates",
                "/var/calculate/remote/templates"]

    def get_cl_template_clt_path(self):
        """Existing directories which are searched for 'clt' files.

        Uses "/etc" plus the entries of the CONFIG_PROTECT environment
        variable when it is set, otherwise a built-in default list.
        """
        if "CONFIG_PROTECT" in os.environ:
            protectPaths = ["/etc"] + \
                [item for item in os.environ["CONFIG_PROTECT"].split(" ")
                 if item.strip()]
        else:
            # "/var/lib/hsqldb" used to be written without the leading
            # slash, which made the result depend on the current directory
            protectPaths = ["/etc", "/usr/share/X11/xkb", "/var/lib/hsqldb",
                            "/usr/share/config"]
        return [item for item in protectPaths if path.exists(item)]

    def get_os_net_domain(self):
        """Get net domain name.

        Returns "local" when 'hostname -d' reports an error. Prints an
        error and calls cl_overriding.exit(1) when the command fails or
        returns an empty value.
        """
        textLines = self._runos("hostname -d 2>&1")
        if textLines is False:
            cl_overriding.printERROR(_("Error executing 'hostname -d'"))
            return cl_overriding.exit(1)
        domain = ""
        if textLines:
            domain = textLines[0]
        if not domain:
            cl_overriding.printERROR(_("Error:") + " " +\
                                     _("Domain name not found"))
            cl_overriding.printERROR(\
                _("Command 'hostname -d' returns an empty value"))
            return cl_overriding.exit(1)
        elif re.search(r"^hostname: ", domain):
            # stderr is redirected to stdout, so this is an error message
            return "local"
        else:
            return domain

    def get_os_linux_shortname(self):
        """Get short system name ("Linux" when nothing is detected)"""
        systemRoot = "/"
        return self.getShortnameByMakeprofile(systemRoot) or \
               self.getShortnameByIni(systemRoot) or \
               self.detectOtherShortname(systemRoot) or \
               "Linux"

    def get_os_linux_name(self):
        """Get full system name"""
        linuxShortName = self.Get("os_linux_shortname")
        return self.dictLinuxName.get(linuxShortName, "Linux")

    def get_os_linux_subname(self):
        """Get postfix name of system"""
        linuxShortName = self.Get("os_linux_shortname")
        return self.dictLinuxSubName.get(linuxShortName, "")

    def get_os_linux_ver(self):
        """Get system version ("0" when nothing is detected)"""
        linuxShortName = self.Get("os_linux_shortname")
        return self.getVersionFromMetapackage('/', linuxShortName) or \
               self.getVersionFromCalculateIni('/') or \
               self.getVersionFromGentooFiles('/') or \
               self.getVersionFromUname() or "0"

    def get_os_net_hostname(self):
        """Get hostname of computer.

        Tries 'hostname -s' and then 'hostname'. The short system name is
        used instead when both report an error or the host is named
        'livecd'. Returns "" when the command gives no output.
        """
        textLines = self._runos("hostname -s 2>&1")
        hostname = ""
        if textLines:
            hostname = textLines[0]
        if not hostname:
            return ""
        if re.search(r"^hostname: ", hostname):
            # 'hostname -s' printed an error, retry without the option
            textLines = self._runos("hostname 2>&1")
            if not textLines:
                return ""
            hostname = textLines[0]
        if re.search(r"^hostname: ", hostname) or hostname == 'livecd':
            return self.Get('os_linux_shortname')
        return hostname

    def get_os_net_ip(self):
        """All computer ip addresses, comma delimited"""
        IPs = []
        for interface in self.Get("os_net_interfaces"):
            ipaddr, mask = ip.receiveIpAndMask(interface)
            if ipaddr:
                IPs.append(ipaddr)
        return ",".join(IPs)

    def get_os_net_interfaces(self):
        """All net interfaces from /sys/class/net except the loopback"""
        return [interface for interface in self.getDirList("/sys/class/net")
                if interface != "lo"]

    def get_os_net_allow(self):
        """Allowed networks, comma delimited (one per configured interface)"""
        networks = []
        for interface in self.Get("os_net_interfaces"):
            ipaddr, mask = ip.receiveIpAndMask(interface)
            if ipaddr and mask:
                networks.append(ip.getIpNet(ipaddr, mask))
        # skip empty values which ip.getIpNet may return
        return ",".join([net for net in networks if net])

    def get_os_arch_machine(self):
        """Processor architecture (first line of 'uname -m' or "")"""
        textLines = self._runos("uname -m")
        if not textLines:
            return ""
        return textLines[0]

    def get_os_root_dev(self):
        """Root filesystem device.

        Uses the kernel parameters real_root= / root= (plain device,
        UUID= or LABEL=) and falls back to the output of 'df /'.
        Returns "" when the device cannot be detected.
        """
        record = _readFile('/proc/cmdline', 'rb').strip()
        re_resRealRoot = re.search(r'(?:^|\s)real_root=(\S+)(\s|$)', record)
        re_resFakeRoot = re.search(r'(?:^|\s)root=(\S+)(\s|$)', record)
        # parameter real_root takes priority over root
        re_res = re_resRealRoot or re_resFakeRoot
        if re_res:
            rootparam = re_res.group(1)
            # root is specified as a device node (/dev/sd...)
            if re.match(r"^\/dev\/[a-z]+.*$", rootparam):
                return getUdevDeviceInfo(
                   name=rootparam.strip()).get('DEVNAME', rootparam)
            # root is specified by uuid or by label: resolve it with blkid
            for prefix, blkidOpt in (("UUID=", "-U"), ("LABEL=", "-L")):
                if rootparam.startswith(prefix):
                    ident = rootparam[len(prefix):].strip("\"'")
                    blkidProcess = process('/sbin/blkid', '-c', '/dev/null',
                                           blkidOpt, ident)
                    if blkidProcess.success():
                        return getUdevDeviceInfo(
                           name=blkidProcess.read().strip()).get('DEVNAME',
                                                                 '')
        # get device mounted to root
        dfLines = self._runos("LANG=C df /")
        if not dfLines:
            return ""
        if isinstance(dfLines, list) and len(dfLines) > 1:
            # the first line of df output is the table header
            root_dev = dfLines[1].split(" ")[0].strip()
            if root_dev:
                return {'none':'/dev/ram0'}.get(root_dev, root_dev)
        return ""

    def get_os_root_type(self):
        """Root device type ("livecd", "usb-hdd" or "hdd")"""
        def link2pair(linkfile):
            """Return pair (target,link) from link"""
            basedir = os.path.dirname(linkfile)
            targetfile = os.readlink(linkfile)
            return (path.normpath(os.path.join(basedir, targetfile)),
                    linkfile)

        rootDev = self.Get("os_root_dev")
        if rootDev:
            if "/dev/ram" in rootDev or "/dev/nfs" in rootDev:
                return "livecd"
            # map device node -> its symlink in /dev/disk/by-id
            byIdDir = '/dev/disk/by-id'
            idDict = {}
            for name in listDirectory(byIdDir):
                linkfile = path.join(byIdDir, name)
                if path.islink(linkfile):
                    target, link = link2pair(linkfile)
                    idDict[target] = link
            if "usb-" in idDict.get(rootDev, ""):
                return "usb-hdd"
        return "hdd"

    def get_hr_cdrom_set(self):
        """Cdrom variable ("on" when udev reports ID_CDROM=1, else "off")"""
        if os.path.exists('/sys/block/sr0'):
            textLines = self._runos(
                "udevadm info --query=all --name=/dev/dvdrw")
            if textLines:
                for line in textLines:
                    if "ID_CDROM=1" in line:
                        return "on"
        return "off"

    def get_hr_virtual(self):
        """Virtual machine name (virtualbox, vmware, qemu).

        Returns "" when no known hypervisor is found in the lspci output
        and False when lspci gives no output at all.
        """
        pciLines = self._runos("/usr/sbin/lspci")
        if not pciLines:
            return False
        virtSysDict = {'VirtualBox':'virtualbox',
                       'VMware':'vmware',
                       'Qumranet':'qemu'}
        for vName in virtSysDict:
            if any(vName in line for line in pciLines):
                return virtSysDict[vName]
        return ''

    def get_hr_board_model(self):
        """Get motherboard model ("" when it cannot be read)"""
        modelFile = "/sys/class/dmi/id/board_name"
        try:
            return _readFile(modelFile).strip()
        except (IOError, OSError):
            return ""

    def get_hr_board_vendor(self):
        """Get motherboard vendor ("" when it cannot be read)"""
        vendorFile = "/sys/class/dmi/id/board_vendor"
        try:
            return _readFile(vendorFile).strip()
        except (IOError, OSError):
            return ""

    def get_hr_cpu_num(self):
        """Get processors count (1 when /proc/cpuinfo cannot be read)"""
        cpuinfoFile = "/proc/cpuinfo"
        try:
            with open(cpuinfoFile, "r") as cpuinfo:
                return len([line for line in cpuinfo
                            if line.startswith("processor")])
        except (IOError, OSError):
            return 1

    def get_os_locale_locale(self):
        """locale (example: ru_RU.UTF-8).

        Sources in order: kernel 'calculate' boot parameter,
        /etc/env.d/02locale, the LANG environment variable, the default.
        """
        locale = clLocale()
        # get locale from boot calculate param
        localeVal = self.getValueFromCmdLine("calculate", 0)
        if locale.isLangExists(localeVal):
            return locale.getFieldByLang('locale', localeVal)
        else:
            localeVal = self.getValueFromConfig('/etc/env.d/02locale',
                                                'LANG')
            if locale.isValueInFieldExists('locale', localeVal):
                return localeVal
        if "LANG" in os.environ and os.environ["LANG"] != "C":
            return os.environ["LANG"]
        return locale.getFieldByLang("locale", "default")

    def get_os_locale_lang(self):
        """lang (example: ru_RU)"""
        locale = clLocale()
        return locale.getLangByField("locale", self.Get('os_locale_locale'))

    def get_os_locale_language(self):
        """language (example: ru)"""
        locale = clLocale()
        return locale.getFieldByLang("language", self.Get('os_locale_lang'))

    def get_os_locale_xkb(self):
        """xkb layouts (example: en,ru)"""
        locale = clLocale()
        return locale.getFieldByLang("xkblayout",
                                     self.Get('os_locale_lang'))

    def get_os_locale_xkbname(self):
        """Names of the keyboard layouts used by X.

        This is the part of os_locale_xkb before the first "(".
        """
        localeXkb = self.Get("os_locale_xkb")
        if localeXkb:
            return localeXkb.split("(")[0]
        return ""

    def get_ur_login(self):
        """User login ("" when the current uid has no passwd entry)"""
        uid = os.getuid()
        try:
            return pwd.getpwuid(uid).pw_name
        except KeyError:
            return ""

    def get_ur_group(self):
        """User primary group name ("" when it cannot be resolved)"""
        userName = self.Get('ur_login')
        groupName = ""
        if userName:
            try:
                gid = pwd.getpwnam(userName).pw_gid
                groupName = grp.getgrgid(gid).gr_name
            except KeyError:
                return ""
        return groupName

    def get_ur_fullname(self):
        """Full user name (GECOS field, "" when it cannot be resolved)"""
        userName = self.Get('ur_login')
        fullName = ""
        if userName:
            try:
                fullName = pwd.getpwnam(userName).pw_gecos
            except KeyError:
                return ""
        return fullName

    def get_ur_jid(self):
        """Get user Jabber id ("" when there is no user info)"""
        userInfo = self.getUserInfo()
        if userInfo:
            return userInfo["jid"]
        return ""

    def get_ur_mail(self):
        """Get user email ("" when there is no user info)"""
        userInfo = self.getUserInfo()
        if userInfo:
            return userInfo["mail"]
        return ""

    def get_ur_home_path(self):
        """Get user home directory ("" when it cannot be resolved)"""
        userName = self.Get('ur_login')
        homeDir = ""
        if userName:
            try:
                homeDir = pwd.getpwnam(userName).pw_dir
            except KeyError:
                return ""
        return homeDir

    def get_os_linux_system(self):
        """Get linux system (server or desktop)"""
        shortName = self.Get('os_linux_shortname')
        return self.dictNameSystem.get(shortName, "")

    def get_os_x11_video_drv(self):
        """Get video driver used by xorg.

        Sources in order: Xorg.{DISPLAY}.log, the Device section of
        xorg.conf, the kernel 'calculate' boot parameter, the default
        driver chosen by getVideoByDefault.
        """
        xorg_conf = '/etc/X11/xorg.conf'
        # Try to analyze Xorg.{DISPLAY}.log
        display = os.environ.get('DISPLAY')
        list_available_drivers = \
            getAvailableVideo(prefix=self.Get('cl_chroot_path'))
        if display and list_available_drivers:
            reDriver = re.compile('|'.join(["%s_drv.so" % drv
                                  for drv in list_available_drivers]))
            display_number = re.search(r':(\d+)(\..*)?', display)
            reDriverName = re.compile(r'([^/]+)_drv.so')
            if display_number:
                xorg_log_file = '/var/log/Xorg.%s.log' % \
                                display_number.group(1)
                if path.exists(xorg_log_file):
                    # the last mentioned available driver wins
                    lastDriver = ""
                    for line in readLinesFile(xorg_log_file):
                        if "drv" in line and reDriver.search(line):
                            nameMatch = reDriverName.search(line)
                            if nameMatch:
                                lastDriver = nameMatch.group(1)
                    if lastDriver:
                        return lastDriver

        # analyze /etc/X11/xorg.conf
        if path.exists(xorg_conf):
            matchSect = re.search(r'Section "Device".*?EndSection',
                                  _readFile(xorg_conf), re.S)
            if matchSect:
                # re.M is required: the Driver entry is on its own line
                # inside the section, not at the start of the section text
                # (the former '^\S*Driver' pattern could never match)
                resDriver = re.search(r'^\s*Driver\s*"([^"]+)"',
                                      matchSect.group(0), re.M)
                if resDriver and \
                    resDriver.group(1) in list_available_drivers:
                    return resDriver.group(1)

        videoVal = self.getValueFromCmdLine("calculate", "video")
        videoVal = {'i915':'intel'}.get(videoVal, videoVal)
        if not isPkgInstalled('xorg-server') or \
            videoVal in list_available_drivers:
            return videoVal
        return self.getVideoByDefault(list_available_drivers)

    def getResByXDpyInfo(self):
        """Get resolution ("WIDTHxHEIGHT") by xdpyinfo utility, "" on fail"""
        lines = self._runos("xdpyinfo")
        if not lines:
            return ""
        reRes = re.compile(r"dimensions:\s+(\d+)x(\d+)\s+pixels")
        for line in lines:
            searchRes = reRes.search(line)
            if searchRes:
                return "%sx%s" % (searchRes.group(1), searchRes.group(2))
        return ""

    def get_os_x11_resolution(self):
        """
        Return current screen resolution as "WIDTHxHEIGHT".
        Try detect by xdpyinfo, then Xorg.log, xorg.conf (the files are
        skipped when the root device type is 'usb-hdd'), then by the kernel
        'calculate' boot parameter. Returns "" when nothing is detected.
        """
        resolution = self.getResByXDpyInfo()
        if resolution:
            return resolution
        if self.Get('os_root_type') != 'usb-hdd':
            xlog = "/var/log/Xorg.0.log"
            if os.access(xlog, os.R_OK):
                # "[0-9\@]*" accepts an optional refresh rate suffix
                # ("1280x1024@60"); with exactly one such character the
                # pattern cut the last digit of the height ("1024x768"
                # was parsed as 1024x76)
                reXorgLogParser = re.compile(r"""
                    Virtual\ screen\ size\ determined\ to\ be
                    \ ([0-9]+)\s*x\s*([0-9]+)|
                    Setting\ mode\ "(\d+)x(\d+)[0-9\@]*"|
                    Output\ [\S]+\ using\ initial\ mode\ (\d+)x(\d+)|
                    Virtual\ size\ is\ (\d+)x(\d+)""", re.X | re.S)
                resXorgLogParser = reXorgLogParser.search(_readFile(xlog))
                if resXorgLogParser:
                    # only the groups of the matched alternative are set
                    found = [value for value in resXorgLogParser.groups()
                             if value]
                    return "%sx%s" % tuple(found[:2])

            # get resolution from xorg.conf
            xorgconf = "/etc/X11/xorg.conf"
            reScreenSections = re.compile('Section "Screen"(.*?)EndSection',
                                          re.S)
            reModes = re.compile(r'Modes\s+"(\d+x\d+)')
            if os.access(xorgconf, os.R_OK):
                modes = []
                for section in reScreenSections.findall(
                        _readFile(xorgconf)):
                    modeMatch = reModes.search(section)
                    if modeMatch:
                        modes.append(modeMatch.group(1))
                if modes:
                    # choose the widest mode
                    return max(modes,
                               key=lambda mode: int(mode.partition('x')[0]))

        # get resolution from command line
        reRightResolution = re.compile(r"^(\d+x\d+|auto)$", re.S)
        kernelResolution = self.getValueFromCmdLine("calculate", 3)
        if kernelResolution and reRightResolution.match(kernelResolution):
            return {'auto':''}.get(kernelResolution, kernelResolution)
        else:
            return ""

    def get_os_x11_height(self):
        """Get screen height in pixels ("768" when resolution is unknown)"""
        return self.Get('os_x11_resolution').partition('x')[2] or "768"

    def get_os_x11_width(self):
        """Get screen width in pixels ("1024" when resolution is unknown)"""
        return self.Get('os_x11_resolution').partition('x')[0] or "1024"

    def get_os_x11_standart(self):
        """Get the nearest standard size of image relative current
        screen resolution.

        Among the standard resolutions with the closest aspect ratio the
        smallest one covering the screen is returned as "WIDTHxHEIGHT"; the
        last one of that aspect ratio is used when none covers the screen.
        Returns "" when width or height is not set or height is not positive.
        """
        widthVal = self.Get('os_x11_width')
        heightVal = self.Get('os_x11_height')
        if not widthVal or not heightVal:
            return ""
        width = int(widthVal)
        height = int(heightVal)
        if height <= 0:
            # the aspect ratio cannot be calculated
            return ""
        # standard resolutions
        res = [(1024,600),
               (1024,768),
               (1280,1024),
               (1280,800),
               (1366,768),
               (1440,900),
               (1600,1200),
               (1680,1050),
               (1920,1200)]
        # group standard resolutions by aspect ratio (order is kept)
        listFormats = {}
        for w, h in res:
            listFormats.setdefault(float(w)/float(h), []).append((w, h))
        screenFormat = float(width)/float(height)
        nearestFormat = min(listFormats,
                            key=lambda fr: abs(screenFormat - fr))
        resolution = listFormats[nearestFormat]
        # resolutions which are not smaller than the screen
        stResol = [(w, h) for w, h in resolution
                   if w >= width and h >= height]
        if not stResol:
            return "%sx%s" % (resolution[-1][0], resolution[-1][1])
        # the lowest height first, then the lowest width
        stWidth, stHeight = min(stResol, key=lambda wh: (wh[1], wh[0]))
        return "%sx%s" % (stWidth, stHeight)

    def get_os_x11_composite(self):
        """On or off composite mode ("off" when xorg.conf gives nothing)"""
        state = self.get_composite_from_xorgconf()
        return state or "off"

    def get_hr_laptop(self):
        """Laptop vendor.

        Lower-cased first word of the DMI board vendor (or "unknown") when
        the DMI chassis type is 1, 8 or 10, otherwise "".
        """
        chassisType = '/sys/class/dmi/id/chassis_type'
        boardVendor = '/sys/class/dmi/id/board_vendor'
        if os.access(chassisType, os.R_OK) and \
            os.access(boardVendor, os.R_OK):
            chassis = _readFile(chassisType).strip()
            notebookChassis = ['1','8','10']
            if chassis in notebookChassis:
                valBoardVendor = _readFile(boardVendor).strip()
                return valBoardVendor.split(" ")[0].lower() or \
                       "unknown"
        return ""

    def get_hr_laptop_model(self):
        """Laptop name (DMI board name or "unknown"), "" for non-laptops"""
        boardName = '/sys/class/dmi/id/board_name'
        if self.Get('hr_laptop') and os.access(boardName, os.R_OK):
            valBoardName = _readFile(boardName).strip()
            return valBoardName or "unknown"
        return ""

    def get_hr_video_name(self):
        """Get video name ("<vendor> <name>" of the first VGA device).

        When the device name contains a part in square brackets only that
        part is used. Returns "" when lspci reports no VGA device.
        """
        pciVideo = sorted(lspci("VGA compatible").items())
        if pciVideo:
            pciVideo = pciVideo[0][1]
            vendor = pciVideo.get("vendor", "").split(" ")[0]
            name = pciVideo.get("name", "")
            if "[" in name and "]" in name:
                name = name.partition("[")[2].partition("]")[0]
            return "{vendor} {name}".format(vendor=vendor, name=name)
        return ""

    def get_hr_video(self):
        """Videocard vendor (nvidia, ati, intel, via, vmware or other)"""
        line = self.Get('hr_video_name')
        # the order matters: the first vendor whose marker is found wins
        vendors = ((("nVidia", "GeForce"), "nvidia"),
                   (("ATI",), "ati"),
                   (("Intel",), "intel"),
                   (("VIA",), "via"),
                   (("VMware",), "vmware"))
        for markers, vendor in vendors:
            if any(marker in line for marker in markers):
                return vendor
        return "other"

    def get_cl_kernel_uid(self):
        """Get UID of symlink kernel, initramfs and System.map"""
        return self.getKernelUid(self.Get('os_root_dev'))

    def get_cl_chroot_status(self):
        """Detect chroot mode ("on"/"off"; "off" when detection fails)"""
        try:
            return "on" if self.isChroot(os.getpid()) else "off"
        except Exception:
            # best effort: any detection error means "not in chroot"
            return "off"

    def get_os_scratch(self):
        """Current system is scratch ("on" or "off")"""
        if self.Get('os_root_type') == 'livecd':
            return "on" if isMount('/mnt/scratch/workspace') else "off"
        else:
            return "on" if isMount('/mnt/scratch') else "off"

    def get_cl_root_path(self):
        """Path to directory relative which perform joining templates to
        system files (sandbox)"""
        return '/'

    def get_cl_chroot_path(self):
        """Path to directory which contain other system"""
        return '/'

    def get_cl_autoupdate_set(self):
        """(on or off) autoupdate config from install program"""
        return 'off'

    def get_cl_api(self):
        """The path to the module api, and additional parameters of
        calculate packages"""
        return {}

    def get_ld_encrypt(self):
        """hash crypto algorithm"""
        return 'ssha'

    def get_ld_bind_login(self):
        """bind login"""
        return 'proxyuser'

    def get_ld_base_root(self):
        """base name LDAP"""
        return 'calculate'

    def get_ld_base_dn(self):
        """base DN LDAP"""
        return self.genDn("dc", self.Get('ld_base_root'))

    def get_ld_bind_dn(self):
        """bind DN LDAP"""
        return self.addDn(self.genDn("cn", self.Get('ld_bind_login')),
                          self.Get('ld_base_dn'))

    def get_ld_bind_pw(self):
        """bind password"""
        return 'calculate'

    def get_ld_bind_hash(self):
        """hash bind"""
        return getHash(self.Get('ld_bind_pw'), self.Get('ld_encrypt'))

    def get_ld_admin_login(self):
        """administrator name"""
        return 'ldapadmin'

    def get_ld_admin_dn(self):
        """root DN"""
        return self.addDn(self.genDn("cn", self.Get('ld_admin_login')),
                          self.Get('ld_base_dn'))

    def get_ld_admin_hash(self):
        """root hash"""
        return getHash(self.Get('ld_admin_pw'), self.Get('ld_encrypt'))

    def get_ld_admin_pw(self):
        """password root (generated by genpassword)"""
        return genpassword()

    def get_ld_services(self):
        """Name from all services"""
        return 'Services'

    def get_ld_services_dn(self):
        """DN from all services"""
        return self.addDn(self.genDn("ou", self.Get('ld_services')),
                          self.Get('ld_base_dn'))

    def get_cl_ca_cert(self):
        """CA certificate"""
        return 'CA.crt'

    def get_cl_ca_key(self):
        """CA key"""
        return 'CA.key'

    def get_cl_ca_path(self):
        """CA path"""
        return '/var/calculate/ssl/main'

    def get_os_clock_timezone(self):
        """Current clock timezone.

        Sources in order: kernel 'calculate' boot parameter (when such zone
        exists in /usr/share/zoneinfo), /etc/timezone, "UTC".
        """
        zoneinfodir = "/usr/share/zoneinfo/"
        timezonefile = "/etc/timezone"
        # try get timezone from kernel calculate param
        timezone = self.getValueFromCmdLine("calculate", 2)
        if timezone and \
            path.exists(path.join(zoneinfodir, timezone)):
            return timezone
        # get timezone from /etc/timezone
        if path.exists(timezonefile):
            return _readFile(timezonefile).strip()
        return "UTC"

    def get_os_lang(self):
        """Supported languages (sorted list)"""
        supported = set(["en_US","de_DE","es_ES","fr_FR","it_IT","pl_PL",
                         "pt_BR","uk_UA","bg_BG","ru_RU","ro_RO","pt_PT"])
        return sorted(set(clLocale().getLangs()) & supported)