#-*- coding: utf-8 -*-

# Copyright 2008-2010 Calculate Ltd. http://www.calculate-linux.org
#
#  Licensed under the Apache License, Version 2.0 (the "License");
#  you may not use this file except in compliance with the License.
#  You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
#  Unless required by applicable law or agreed to in writing, software
#  distributed under the License is distributed on an "AS IS" BASIS,
#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#  See the License for the specific language governing permissions and
#  limitations under the License.

import re
import os
import types
import pwd, grp
import cl_overriding
from cl_vars_share import varsShare, clLocale
from os.path import exists as pathexists
from os import path
from cl_utils import isMount, genpassword, getAvailableX11Drivers, \
                     listDirectory
from encrypt import getHash


def _readFile(fileName):
    """Return the whole content of fileName; the file is always closed"""
    fileObj = open(fileName, "r")
    try:
        return fileObj.read()
    finally:
        fileObj.close()


def _readLines(fileName):
    """Return the list of lines of fileName; the file is always closed"""
    fileObj = open(fileName, "r")
    try:
        return fileObj.readlines()
    finally:
        fileObj.close()


def _getUuidDevices(uuidPath='/dev/disk/by-uuid'):
    """Return (uuid, device) pairs for the symlinks found in uuidPath

    The uuid is the link name, the device is the normalized link target.
    An empty list is returned when uuidPath is not readable."""
    if not os.access(uuidPath, os.R_OK):
        return []
    links = [path.join(uuidPath, name) for name in os.listdir(uuidPath)]
    return [(path.basename(link),
             path.normpath(path.join(uuidPath, os.readlink(link))))
            for link in links if path.islink(link)]


class fillVars(varsShare):
    """Auxiliary object for creating variables

    Contains filling methods"""

    def addDn(self, *parts):
        """Join the given DN parts with commas"""
        return ",".join(parts)

    def genDn(self, *parts):
        """Join the given strings with '=' (attribute=value)"""
        return "=".join(parts)

    def _getEnvDataField(self, index):
        """Return field number index of every record of cl_env_data

        When cl_env_data is empty an error is printed and
        cl_overriding.exit(1) is called."""
        envData = self.Get("cl_env_data")
        if envData:
            return [record[index] for record in envData]
        cl_overriding.printERROR(_("Error:") + " " +
            _("Template variable cl_env_data is empty"))
        cl_overriding.exit(1)

    def get_cl_env_path(self):
        """Path to env files (second field of cl_env_data records)"""
        return self._getEnvDataField(1)

    def get_cl_env_location(self):
        """Aliases to env files (first field of cl_env_data records)"""
        return self._getEnvDataField(0)

    def get_cl_env_server_path(self):
        """Path to the server env file"""
        return '/var/calculate/remote/server.env'

    def get_cl_template_path(self):
        """Paths to template files"""
        return ["/usr/share/calculate/templates",
                "/var/calculate/templates",
                "/var/calculate/remote/templates"]

    def get_cl_template_clt_path(self):
        """Path to 'clt' files

        Uses "/etc" plus the space separated CONFIG_PROTECT environment
        variable when it is set, a built-in list otherwise.  Only existing
        paths are returned."""
        if "CONFIG_PROTECT" in os.environ:
            protectPaths = ["/etc"] + \
                [x for x in os.environ["CONFIG_PROTECT"].split(" ")
                 if x.strip()]
        else:
            # every default is an absolute path
            protectPaths = ["/etc", "/usr/share/X11/xkb",
                            "/var/lib/hsqldb", "/usr/share/config"]
        return [x for x in protectPaths if path.exists(x)]

    def get_os_net_domain(self):
        """Get net domain name

        Returns "local" when 'hostname -d' answers with an error message
        (a line starting with "hostname: ").  Prints an error and calls
        cl_overriding.exit(1) when the command fails or returns nothing."""
        textLines = self._runos("hostname -d 2>&1")
        if textLines is False:
            cl_overriding.printERROR(_("Error execute 'hostname -d'"))
            return cl_overriding.exit(1)
        domain = ""
        if textLines:
            domain = textLines[0]
        if not domain:
            cl_overriding.printERROR(_("Error:") + " " +
                _("Not found domain name"))
            cl_overriding.printERROR(
                _("Command 'hostname -d' returns an empty value"))
            return cl_overriding.exit(1)
        if re.search("^hostname: ", domain):
            return "local"
        return domain

    def get_os_linux_shortname(self):
        """Get short system name ("Linux" when nothing is detected)"""
        systemRoot = "/"
        return self.getShortnameByMakeprofile(systemRoot) or \
               self.getShortnameByIni(systemRoot) or \
               self.detectOtherShortname(systemRoot) or \
               "Linux"

    def get_os_linux_name(self):
        """Get full system name"""
        linuxShortName = self.Get("os_linux_shortname")
        return self.dictLinuxName.get(linuxShortName, "Linux")

    def get_os_linux_subname(self):
        """Get postfix name of system"""
        linuxShortName = self.Get("os_linux_shortname")
        return self.dictLinuxSubName.get(linuxShortName, "")

    def get_os_linux_ver(self):
        """Get system version ("0" when no source provides one)"""
        linuxShortName = self.Get("os_linux_shortname")
        return self.getVersionFromMetapackage('/', linuxShortName) or \
               self.getVersionFromCalculateIni('/') or \
               self.getVersionFromGentooFiles('/') or \
               self.getVersionFromUname() or "0"

    def get_os_net_hostname(self):
        """Get hostname of computer

        Tries 'hostname -s' and falls back to 'hostname' when the first
        command answers with an error message.  The short system name is
        returned instead of an error message or of the name 'livecd'."""
        reError = re.compile("^hostname: ")
        textLines = self._runos("hostname -s 2>&1")
        hostname = ""
        if textLines:
            hostname = textLines[0]
        if not hostname:
            return ""
        if reError.search(hostname):
            textLines = self._runos("hostname 2>&1")
            if not textLines:
                return ""
            hostname = textLines[0]
        if reError.search(hostname) or hostname == 'livecd':
            return self.Get('os_linux_shortname')
        return hostname

    def get_os_net_ip(self):
        """All computer ip addresses, comma delimiter"""
        reAddr = re.compile(r'addr:([0-9\.]+).+Bcast:')
        IPs = []
        for interface in self.Get("os_net_interfaces"):
            res = self._runos("/sbin/ifconfig %s" % interface)
            if not res:
                # NOTE(review): the remaining interfaces are skipped after
                # the first failure, addresses found so far are returned
                break
            for line in res:
                searchIP = reAddr.search(line)
                if searchIP:
                    # ip address
                    IPs.append(searchIP.group(1))
        return ",".join(IPs)

    def get_os_net_interfaces(self):
        """All net interfaces except "lo" """
        return [x for x in self.getDirList("/sys/class/net") if x != "lo"]

    def get_os_net_allow(self):
        """Allowed networks

        Comma separated "network/prefix" entries built from the address and
        mask which ifconfig prints for every interface.  An empty string is
        returned when ifconfig gives no output for some interface."""
        octetsMult = (0x1, 0x100, 0x10000, 0x1000000)

        def toNumber(dotted):
            """Convert a dotted quad to an integer"""
            octets = [int(x) for x in dotted.split(".")]
            number = 0
            # the last octet is the least significant one
            for mult in octetsMult:
                number += octets.pop() * mult
            return number

        def getNet(ip, mask):
            """By ip and mask get network"""
            maskNumb = toNumber(mask)
            x = maskNumb & toNumber(ip)
            # prefix length is the number of bits set in the mask
            prefix = sum([(maskNumb >> bit) & 1 for bit in range(32)])
            return "%s.%s.%s.%s/%s" \
                   % (x >> 24, x >> 16 & 255, x >> 8 & 255, x & 255, prefix)

        reAddrMask = re.compile(
            r'addr:([0-9\.]+).+Bcast:.+Mask:([0-9\.]+)')
        networks = []
        for interface in self.Get("os_net_interfaces"):
            res = self._runos("/sbin/ifconfig %s" % interface)
            if not res:
                return ""
            for line in res:
                s_ip = reAddrMask.search(line)
                if s_ip:
                    ip, mask = s_ip.groups()
                    networks.append(getNet(ip, mask))
        return ",".join(networks)

    def get_os_arch_machine(self):
        """Processor architecture (first output line of 'uname -m')"""
        textLines = self._runos("uname -m")
        if not textLines:
            return ""
        return textLines[0]

    def get_os_root_dev(self):
        """Root filesystem device

        The kernel command line is checked first (real_root= has priority
        over root=), then the device reported by 'df /' is used."""
        try:
            record = _readFile('/proc/cmdline').strip()
        except (IOError, OSError):
            # no readable /proc: fall back to df below
            record = ""
        re_resRealRoot = re.search(r'(?:^|\s)real_root=(\S+)(\s|$)', record)
        re_resFakeRoot = re.search(r'(?:^|\s)root=(\S+)(\s|$)', record)
        # param real_root has priority over root
        re_res = re_resRealRoot or re_resFakeRoot
        if re_res:
            rootparam = re_res.group(1)
            # check root for /dev/sd view
            if re.match(r"^\/dev\/[a-z]+.*$", rootparam):
                return rootparam
            # check root set by uuid
            if re.match("^UUID=.*$", rootparam):
                uuid = rootparam[5:]
                mapUuidDev = dict(_getUuidDevices())
                if uuid in mapUuidDev:
                    return mapUuidDev[uuid]
        # get device mounted to root
        dfLines = self._runos("LANG=C df /")
        if not dfLines:
            return ""
        if isinstance(dfLines, list) and len(dfLines) > 1:
            root_dev = dfLines[1].split(" ")[0].strip()
            if root_dev:
                return {'none': '/dev/ram0'}.get(root_dev, root_dev)
        return ""

    def get_os_root_type(self):
        """Root device type (livecd, usb-hdd, hdd)"""
        idPath = '/dev/disk/by-id'

        def link2pair(linkfile):
            """Return pair (target,link) from link"""
            basedir = os.path.dirname(linkfile)
            targetfile = os.readlink(linkfile)
            return (path.normpath(os.path.join(basedir, targetfile)),
                    linkfile)

        rootDev = self.Get("os_root_dev")
        if rootDev:
            if "/dev/ram" in rootDev:
                return "livecd"
            links = [path.join(idPath, x) for x in listDirectory(idPath)]
            idDict = dict([link2pair(x) for x in links if path.islink(x)])
            if "usb-" in idDict.get(rootDev, ""):
                return "usb-hdd"
        return "hdd"

    def get_hr_cdrom_set(self):
        """Cdrom variable ("on" when udev reports ID_CDROM=1)"""
        # NOTE(review): /sys/block/sr0 is tested but /dev/dvdrw is queried
        if os.path.exists('/sys/block/sr0'):
            textLines = self._runos(
                "udevadm info --query=all --name=/dev/dvdrw")
            if textLines is not False:
                for line in textLines:
                    if "ID_CDROM=1" in line:
                        return "on"
        return "off"

    def get_hr_virtual(self):
        """Virtual machine name (virtualbox, vmware, qemu)

        Returns an empty string when no known vendor is found in the lspci
        output and False when lspci gives no output."""
        pciLines = self._runos("/usr/sbin/lspci")
        if not pciLines:
            # NOTE(review): False (not "") is kept for the callers
            return False
        virtSysDict = {'VirtualBox': 'virtualbox',
                       'VMware': 'vmware',
                       'Qumranet': 'qemu'}
        for vName, virtName in virtSysDict.items():
            if any([vName in line for line in pciLines]):
                return virtName
        return ''

    def get_hr_board_model(self):
        """Get motherboard model ("" when it can not be read)"""
        modelFile = "/sys/class/dmi/id/board_name"
        try:
            return _readFile(modelFile).strip()
        except (IOError, OSError):
            return ""

    def get_hr_board_vendor(self):
        """Get motherboard vendor ("" when it can not be read)"""
        vendorFile = "/sys/class/dmi/id/board_vendor"
        try:
            return _readFile(vendorFile).strip()
        except (IOError, OSError):
            return ""

    def get_hr_cpu_num(self):
        """Get processors count (1 when /proc/cpuinfo can not be read)"""
        cpuinfoFile = "/proc/cpuinfo"
        try:
            cpuLines = _readLines(cpuinfoFile)
        except (IOError, OSError):
            return 1
        # match the key only: a "model name" value may contain the word
        # "processor" as well
        return len([line for line in cpuLines
                    if line.startswith("processor")])

    def get_os_locale_locale(self):
        """locale (example: ru_RU.UTF-8)"""
        locale = clLocale()
        # get locale from boot calculate param
        localeVal = self.getValueFromCmdLine("calculate", 0)
        if locale.isLangExists(localeVal):
            return locale.getFieldByLang('locale', localeVal)
        localeVal = self.getValueFromConfig('/etc/env.d/02locale', 'LANG')
        if locale.isValueInFieldExists('locale', localeVal):
            return localeVal
        envLang = os.environ.get("LANG")
        if envLang is not None and envLang != "C":
            return envLang
        return locale.getFieldByLang("locale", "default")

    def get_os_locale_lang(self):
        """lang (example: ru_RU)"""
        locale = clLocale()
        return locale.getLangByField("locale", self.Get('os_locale_locale'))

    def get_os_locale_language(self):
        """language (example: ru)"""
        locale = clLocale()
        return locale.getFieldByLang("language", self.Get('os_locale_lang'))

    def get_os_locale_xkb(self):
        """xkb layouts (example: en,ru)"""
        locale = clLocale()
        return locale.getFieldByLang("xkblayout",
                                     self.Get('os_locale_lang'))

    def get_os_locale_xkbname(self):
        """Names of the keyboard layouts used by X

        This is the part of os_locale_xkb which precedes the first "(" """
        localeXkb = self.Get("os_locale_xkb")
        if localeXkb:
            return localeXkb.split("(")[0]
        return ""

    def get_ur_login(self):
        """User login ("" when the current uid has no passwd entry)"""
        uid = os.getuid()
        try:
            return pwd.getpwuid(uid).pw_name
        except KeyError:
            return ""

    def get_ur_group(self):
        """User group (name of the primary group of ur_login)"""
        userName = self.Get('ur_login')
        if not userName:
            return ""
        try:
            gid = pwd.getpwnam(userName).pw_gid
            return grp.getgrgid(gid).gr_name
        except KeyError:
            return ""

    def get_ur_fullname(self):
        """Full user name (gecos field of ur_login)"""
        userName = self.Get('ur_login')
        if not userName:
            return ""
        try:
            return pwd.getpwnam(userName).pw_gecos
        except KeyError:
            return ""

    def get_ur_jid(self):
        """Get user Jabber id"""
        userInfo = self.getUserInfo()
        if userInfo:
            return userInfo["jid"]
        return ""

    def get_ur_mail(self):
        """Get user email"""
        userInfo = self.getUserInfo()
        if userInfo:
            return userInfo["mail"]
        return ""

    def get_ur_home_path(self):
        """Get user home directory"""
        userName = self.Get('ur_login')
        if not userName:
            return ""
        try:
            return pwd.getpwnam(userName).pw_dir
        except KeyError:
            return ""

    def get_os_linux_system(self):
        """Get linux system (server or desktop)"""
        shortName = self.Get('os_linux_shortname')
        return self.dictNameSystem.get(shortName, "")

    def get_os_x11_video_drv(self):
        """Get video driver used by xorg

        Sources, in order: the Xorg log of the current DISPLAY, the first
        "Device" section of xorg.conf, the "video" value of the calculate
        boot parameter, the default chosen by getVideoByDefault."""
        xorg_conf = '/etc/X11/xorg.conf'
        # Try to analyze Xorg.{DISPLAY}.log
        display = os.environ.get('DISPLAY')
        list_avialable_drivers = getAvailableX11Drivers()
        if display and list_avialable_drivers:
            reDriver = re.compile('|'.join(
                ["%s_drv.so" % x for x in list_avialable_drivers]))
            # the screen suffix is optional: ":0" as well as ":0.0"
            display_number = re.search(r':(\d+)(\..*)?', display)
            if display_number:
                xorg_log_file = '/var/log/Xorg.%s.log' % \
                                display_number.group(1)
                if path.exists(xorg_log_file):
                    matchStrs = [i for i in _readLines(xorg_log_file)
                                 if "drv" in i and reDriver.search(i)]
                    if matchStrs:
                        # the driver loaded last wins
                        resDriver = re.search(r'([^/]+)_drv.so',
                                              matchStrs[-1])
                        if resDriver:
                            return resDriver.group(1)
        # analyze /etc/X11/xorg.conf
        if path.exists(xorg_conf):
            matchSect = re.search(r'Section "Device".*?EndSection',
                                  _readFile(xorg_conf), re.S)
            if matchSect:
                resDriver = re.search(r'Driver\s*"([^"]+)"',
                                      matchSect.group(0), re.S)
                if resDriver and \
                    resDriver.group(1) in list_avialable_drivers:
                    return resDriver.group(1)
        videoVal = self.getValueFromCmdLine("calculate", "video")
        videoVal = {'i915': 'intel'}.get(videoVal, videoVal)
        if videoVal in list_avialable_drivers:
            return videoVal
        return self.getVideoByDefault(list_avialable_drivers)

    def get_os_x11_height(self):
        """Get screen height in pixels ("768" when it is unknown)

        When the resolution is detected os_x11_width is set as well."""
        resolution = self.getX11Resolution()
        if resolution:
            self.Set('os_x11_width', resolution[0])
            return resolution[1]
        return "768"

    def get_os_x11_width(self):
        """Get screen width in pixels ("1024" when it is unknown)

        When the resolution is detected os_x11_height is set as well."""
        resolution = self.getX11Resolution()
        if resolution:
            self.Set('os_x11_height', resolution[1])
            return resolution[0]
        return "1024"

    def get_os_x11_standart(self):
        """Get the nearest standard size of image relative current
        screen resolution

        The standard sizes with the aspect ratio closest to the screen one
        are taken; the smallest of them which covers the screen is returned
        as "WIDTHxHEIGHT", the last of them when none covers it.  An empty
        string is returned when the screen size is unknown."""
        widthVal = self.Get('os_x11_width')
        heightVal = self.Get('os_x11_height')
        if not widthVal or not heightVal:
            return ""
        width = int(widthVal)
        height = int(heightVal)
        if height <= 0:
            # aspect ratio is undefined
            return ""
        # standard resolutions
        res = [(1024,600), (1024,768), (1280,1024), (1280,800),
               (1366,768), (1440,900), (1600,1200), (1680,1050),
               (1920,1200)]
        # group standard resolutions by aspect ratio
        listFormats = {}
        for w, h in res:
            listFormats.setdefault(float(w)/float(h), []).append((w, h))
        screenFormat = float(width)/float(height)
        nearestFormat = min(listFormats.keys(),
                            key=lambda fr: abs(screenFormat - fr))
        resolution = listFormats[nearestFormat]
        stResol = [(w, h) for w, h in resolution
                   if w >= width and h >= height]
        if not stResol:
            return "%sx%s" % (resolution[-1][0], resolution[-1][1])
        # minimal height first, then minimal width among them
        stHeight = min([h for w, h in stResol])
        stWidth = min([w for w, h in stResol if h == stHeight])
        return "%sx%s" % (stWidth, stHeight)

    def get_os_x11_composite(self):
        """On or off composite mode ("off" when xorg.conf gives nothing)"""
        state = self.get_composite_from_xorgconf()
        return state or "off"

    def get_hr_laptop(self):
        """Laptop vendor

        First word (lower case) of the DMI board vendor, or "unknown", when
        the DMI chassis type is one of the listed values; "" otherwise."""
        chassisType = '/sys/class/dmi/id/chassis_type'
        boardVendor = '/sys/class/dmi/id/board_vendor'
        if os.access(chassisType, os.R_OK) and \
            os.access(boardVendor, os.R_OK):
            chassis = _readFile(chassisType).strip()
            # NOTE(review): SMBIOS type 9 (laptop) is not listed - confirm
            notebookChassis = ['1', '8', '10']
            if chassis in notebookChassis:
                valBoardVendor = _readFile(boardVendor).strip()
                return valBoardVendor.split(" ")[0].lower() or \
                       "unknown"
        return ""

    def get_hr_laptop_model(self):
        """Laptop name (DMI board name, "unknown" when it is empty)"""
        boardName = '/sys/class/dmi/id/board_name'
        if self.Get('hr_laptop') and os.access(boardName, os.R_OK):
            valBoardName = _readFile(boardName).strip()
            return valBoardName or "unknown"
        return ""

    def get_hr_video(self):
        """Videocard vendor

        Detected from the first lspci line which mentions "vga"; "vesa" is
        the fallback, "" is returned when lspci gives no output."""
        lines = self._runos("lspci")
        if not lines:
            return ""
        # checked in this order, the first marker found wins
        vendors = (("nVidia", "nvidia"), ("GeForce", "nvidia"),
                   ("ATI", "ati"), ("Intel", "intel"),
                   ("VIA", "via"), ("VMware", "vmware"))
        reVGA = re.compile("vga", re.I)
        for line in lines:
            if reVGA.search(line):
                for marker, vendor in vendors:
                    if marker in line:
                        return vendor
                break
        return "vesa"

    def get_cl_kernel_uid(self):
        """Get UID of symlink kernel, initramfs and System.map

        This is the first 8 characters of the uuid of the root device,
        "" when the uuid is not found."""
        mapDevUuid = dict([(dev, uuid) for uuid, dev in _getUuidDevices()])
        if not mapDevUuid:
            return ""
        return mapDevUuid.get(self.Get('os_root_dev'), "")[:8]

    def get_cl_chroot_status(self):
        """Detect chroot mode ("on" or "off")"""
        try:
            return "on" if self.isChroot(os.getpid()) else "off"
        except Exception:
            # best effort: any failure of the check means "off"
            return "off"

    def get_os_scratch(self):
        """Current system is scratch ("on" or "off")"""
        if self.Get('os_root_type') == 'livecd':
            scratchPath = '/mnt/scratch/workspace'
        else:
            scratchPath = '/mnt/scratch'
        return "on" if isMount(scratchPath) else "off"

    def get_cl_root_path(self):
        """Path to directory relative which perform joining templates to
        system files (sandbox)"""
        return '/'

    def get_cl_chroot_path(self):
        """Path to directory which contain other system"""
        return '/'

    def get_cl_autoupdate_set(self):
        """(on or off) autoupdate config from install program"""
        return 'off'

    def get_cl_api(self):
        """The path to the module api, and additional parameters of
        calculate packages"""
        return {}

    def get_ld_encrypt(self):
        """hash crypto algorithm"""
        return 'ssha'

    def get_ld_bind_login(self):
        """bind login"""
        return 'proxyuser'

    def get_ld_base_root(self):
        """base name LDAP"""
        return 'calculate'

    def get_ld_base_dn(self):
        """base DN LDAP"""
        return self.genDn("dc", self.Get('ld_base_root'))

    def get_ld_bind_dn(self):
        """bind DN LDAP"""
        return self.addDn(self.genDn("cn", self.Get('ld_bind_login')),
                          self.Get('ld_base_dn'))

    def get_ld_bind_pw(self):
        """bind password"""
        return 'calculate'

    def get_ld_bind_hash(self):
        """hash bind"""
        return getHash(self.Get('ld_bind_pw'), self.Get('ld_encrypt'))

    def get_ld_admin_login(self):
        """administrator name"""
        return 'ldapadmin'

    def get_ld_admin_dn(self):
        """root DN"""
        return self.addDn(self.genDn("cn", self.Get('ld_admin_login')),
                          self.Get('ld_base_dn'))

    def get_ld_admin_hash(self):
        """root hash"""
        return getHash(self.Get('ld_admin_pw'), self.Get('ld_encrypt'))

    def get_ld_admin_pw(self):
        """password root"""
        return genpassword()

    def get_ld_services(self):
        """Name from all services"""
        return 'Services'

    def get_ld_services_dn(self):
        """DN from all services"""
        return self.addDn(self.genDn("ou", self.Get('ld_services')),
                          self.Get('ld_base_dn'))

    def get_cl_ca_cert(self):
        """CA certificate"""
        return 'CA.crt'

    def get_cl_ca_key(self):
        """CA key"""
        return 'CA.key'

    def get_cl_ca_path(self):
        """CA path"""
        return '/var/calculate/ssl/main'