# -*- coding: utf-8 -*-

# Copyright 2008-2016 Mir Calculate. http://www.calculate-linux.org
#
#  Licensed under the Apache License, Version 2.0 (the "License");
#  you may not use this file except in compliance with the License.
#  You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
#  Unless required by applicable law or agreed to in writing, software
#  distributed under the License is distributed on an "AS IS" BASIS,
#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#  See the License for the specific language governing permissions and
#  limitations under the License.

import os
import sys
from os import path
import re
import operator
from operator import itemgetter
from calculate.lib.datavars import (Variable, VariableError,
                                    ReadonlyVariable,
                                    CommonVariableError)
from calculate.lib.utils.common import (getSupportArch, getTupleVersion,
                                        cmpVersion, cmp)
from calculate.lib.utils.files import listDirectory, pathJoin
from calculate.lib.variables.linux import Linux
from ..distr import (Distributive, PartitionDistributive,
                     DirectoryDistributive, DefaultMountPath,
                     DistributiveError, FlashDistributive,
                     ArchiveDistributive,
                     MultiPartitions, PxeDistributive)

from calculate.lib.cl_lang import setLocalTranslate, _
from ..fs_manager import FileSystemManager
from functools import reduce, cmp_to_key

setLocalTranslate('cl_install3', sys.modules[__name__])


class DistroRepository(Linux):
    """
    Mixin that searches directories for distributive images and ranks them.

    Image file names are parsed with ``reDistName``; directories and devices
    are inspected through ``Distributive().getInfo``.
    """
    contentCache = {}

    marches = ['i686', 'x86_64']

    extensiton = ['iso', 'tar.bz2', 'tar.gz', 'tar.7z', 'tar.lzma']

    # <shortname>-<version>[-<serial>]-<march>.<ext>
    # The group names are the keys the rest of this module reads from the
    # match dictionary (os_linux_ver, os_arch_machine, serial_id, ext, ...).
    reDistName = re.compile(r"""
        ^.*/(?P<os_linux_shortname>%(name)s)
        -(?P<os_linux_ver>%(ver)s)
        (?:-(?P<serial_id>%(ser)s))?
        -(?P<os_arch_machine>%(march)s)
        \.(?P<ext>%(ext)s)$""" %
                            {'name': "[a-z0-9]+",
                             'ver': r"(\d+\.)*\d+",
                             'ser': r"\d+",
                             'march': "|".join(marches),
                             'ext': "|".join(map(re.escape, extensiton))
                             }, re.X)

    def _getDistrInfo(self, filename):
        """
        Get information about a distributive.

        For anything that is not a regular file the information comes from
        ``Distributive().getInfo``. For a regular file the name is parsed with
        ``reDistName``; an empty dict is returned when the name does not match.
        A version made of exactly eight digits is treated as a build number:
        it is moved to ``os_linux_build`` and ``os_linux_ver`` is cleared.
        """
        # if filename is directory
        if not path.isfile(filename):
            return Distributive().getInfo(filename)
        match = self.reDistName.match(filename)
        if not match:
            return {}
        distdic = match.groupdict()
        distdic["os_linux_build"] = ""
        if "os_linux_ver" in distdic:
            if re.match(r"^\d{8}$", distdic["os_linux_ver"]):
                distdic["os_linux_build"] = distdic["os_linux_ver"]
                distdic["os_linux_ver"] = ""
        return distdic

    def getImage(self, scratch, rootType, imagePath, march=None,
                 shortName=None, linuxVer=None, linuxBuild=None):
        """
        Get the best image matching the parameters.

        Directory distributives are excluded when ``scratch`` is "on" or
        ``rootType`` is "flash". Returns the image path or None.
        """
        # exclude directory distributive for flash and scratch install
        if scratch == "on" or rootType == "flash":
            discardType = ["dir"]
        else:
            discardType = []
        return self.getBestDistributive(imagePath,
                                        march=march,
                                        shortname=shortName,
                                        discardType=discardType,
                                        version=linuxVer,
                                        build=linuxBuild)

    def _getAvailableShortnames(self, dirs):
        """Get the sorted, unique shortnames of available image files"""
        matches = (self.reDistName.search(x)
                   for x in self._getAvailableDistributives(dirs))
        # the shortname group of reDistName is 'os_linux_shortname'
        return sorted({m.group('os_linux_shortname') for m in matches if m})

    def opcompareByString(self, buf):
        """
        Split a filter string like ">=17.12" into (operator function, value).

        A missing operator means equality. A string that does not start with
        an optional operator followed by a digit is returned whole with the
        equality operator. An empty value yields ``(None, None)``.
        """
        if buf:
            reOp = re.compile(r"^(!=|=|==|<=|>=|>|<)?(\d+.*)$")
            res = reOp.search(buf)
            if res:
                return ({'!=': operator.ne,
                         '=': operator.eq,
                         '==': operator.eq,
                         '>=': operator.ge,
                         '<=': operator.le,
                         '<': operator.lt,
                         '>': operator.gt}.get(res.group(1), operator.eq),
                        res.group(2))
            else:
                return operator.eq, buf
        return None, None

    def _getAvailableDistributives(self, dirs, system=None, shortname=None,
                                   march=None, version=None, build=None):
        """
        Get all distributives found in ``dirs`` that pass the filters.

        ``version`` and ``build`` may carry a comparison operator prefix
        (see ``opcompareByString``). Returns a list of paths.
        """

        def systemByName(name):
            return self.dictNameSystem.get(name.upper(), "")

        verCmp, version = self.opcompareByString(version)
        if version:
            version = getTupleVersion(version)
        buildCmp, build = self.opcompareByString(build)
        if build and build.isdigit():
            build = int(build)

        def distfilter(dist):
            d = self._getDistrInfo(dist)
            if not d:
                return False
            # the mandatory keys are verified first so that the filters
            # below never index a missing key
            if "os_linux_shortname" not in d or "os_linux_ver" not in d:
                return False
            # check filter conditions
            if system and systemByName(d['os_linux_shortname']) != system:
                return False
            if (shortname and
                    d['os_linux_shortname'].lower() != shortname.lower()):
                return False
            if march and d.get('os_arch_machine') != march:
                return False
            if version and \
                    not verCmp(getTupleVersion(d['os_linux_ver']), version):
                return False
            if build and "os_linux_build" in d and \
                    (not d['os_linux_build'].isdigit() or
                     not buildCmp(int(d['os_linux_build']), build)):
                return False
            return True

        def listdistr(pathname):
            # a directory holding a system (or a device) is a distributive
            # itself, otherwise its plain files are the candidates
            if path.exists(path.join(pathname, 'etc/make.profile')) or \
                    path.exists(
                        path.join(pathname, 'etc/portage/make.profile')) or \
                    path.exists(path.join(pathname, 'livecd')) or \
                    pathname.startswith('/dev/'):
                return [pathname]
            else:
                # discard inner directories
                return [x for x in listDirectory(pathname)
                        if not path.isdir(path.join(pathname, x))]

        # get lists files in directories
        allFiles = [[path.join(x, y) for y in listdistr(x)] for x in dirs]
        # join files lists to one list and filter distributives
        return [x for files in allFiles for x in files if distfilter(x)]

    @staticmethod
    def extcomparator(*exts):
        """
        Compare two distributive types (cmp-style result).

        The last two positional arguments are compared, so both
        ``extcomparator(ext1, ext2)`` and the older three-argument form
        ``extcomparator(obj, ext1, ext2)`` work. Unknown types rank as 'dir'.
        """
        mapExts = {'iso': 0,
                   'flash': -1,
                   'isodir': -2,
                   'partdir': -3,
                   'dir': -4}
        ext1, ext2 = exts[-2:]
        return cmp(mapExts.get(ext1, -4), mapExts.get(ext2, -4))

    @staticmethod
    def sortdistrfunc(x, y):
        """
        Compare two (path, info) pairs (cmp-style result).

        Order of criteria: version (when both are set, not "0" and
        different), build, serial id, then distributive type.
        """
        ver1, ver2 = x[1].get('os_linux_ver', ""), y[1].get('os_linux_ver', "")
        if ver1 and ver2 and ver1 != "0" and ver2 != "0" and ver1 != ver2:
            return cmpVersion(ver1, ver2)
        build1 = getTupleVersion(x[1].get('os_linux_build', ""))
        build2 = getTupleVersion(y[1].get('os_linux_build', ""))
        if build1 != build2:
            return cmp(build1, build2)
        ser1, ser2 = (x[1].get('serial_id') or "0",
                      y[1].get('serial_id') or "0")
        if ser1 != ser2:
            return cmp(int(ser1), int(ser2))
        ext1 = x[1].get('ext', "")
        ext2 = y[1].get('ext', "")
        return DistroRepository.extcomparator(ext1, ext2)

    def getAvailableDristibutives(self, dirs, system=None, shortname=None,
                                  march=None, version=None, build=None,
                                  discardType=()):
        """
        Get the list of available distributives, best first.

        Entries without an "ext" key or whose "ext" is in ``discardType``
        are dropped.
        """
        if shortname:
            shortname = shortname.lower()
        candidates = self._getAvailableDistributives(dirs, system, shortname,
                                                     march, version,
                                                     build)
        pairs = ((y, self._getDistrInfo(y)) for y in candidates)
        availDistrs = [x for x in pairs
                       if x[1] and "ext" in x[1] and
                       x[1]["ext"] not in discardType]
        return [x[0] for x in sorted(
            availDistrs,
            key=cmp_to_key(DistroRepository.sortdistrfunc),
            reverse=True)]

    def getBestDistributive(self, dirs, system=None, shortname=None,
                            march=None, version=None, build=None,
                            discardType=()):
        """Get the most recent matching distributive, or None"""
        availDistrs = self.getAvailableDristibutives(dirs, system, shortname,
                                                     march, version, build,
                                                     discardType)
        if availDistrs:
            return availDistrs[0]
        else:
            return None

    def _findLatestFile(self, dirs, reMatch, keyfunc):
        """
        Find the latest file in the existing ``dirs`` matching ``reMatch``.

        ``keyfunc`` receives a match object and returns the comparable part.
        Returns the matched path, or "" when nothing matches.
        """
        # a list is built explicitly: map()/filter() are lazy on Python 3
        # and cannot be concatenated to a list
        listimgs = [reMatch.search(path.join(dirname, name))
                    for dirname in dirs if path.exists(dirname)
                    for name in listDirectory(dirname)]
        listimgs = [x for x in listimgs if x]
        if listimgs:
            return max(listimgs, key=keyfunc).group()
        return ""

    def getBestStage(self, dirs, march=None, hardened=None):
        """
        Get the latest stage3 tarball.

        ``march`` 'x86_64' is looked up as 'amd64'; without ``march`` any
        architecture matches. ``hardened``: True - hardened only,
        False - non-hardened only, None - both.
        """
        if march:
            march = {'x86_64': 'amd64'}.get(march, march)
        else:
            march = "[^-]+"
        if hardened is None:
            hardened = "(?:-hardened)?"
        elif hardened is True:
            hardened = "-hardened"
        elif hardened is False:
            hardened = ""
        reStage = re.compile(r'^.*/stage3-%s%s-(\d+)\.tar\.bz2$' %
                             (march, hardened), re.S)
        return self._findLatestFile(dirs, reStage, lambda x: x.groups()[0])


class VariableClImage(ReadonlyVariable):
    """
    System image for installation
    """
    type = "object"

    def get(self):
        """
        Get the distributive object for the installation image.

        For any action other than 'system' the distributive of '/' is
        returned. Returns "" when the distributive cannot be created.
        """
        try:
            action = self.Get('cl_action')
            if action not in ('system',):
                return Distributive.fromFile('/')
            filename = self.Get('cl_image_filename')
            if filename:
                filename = Distributive.fromFile(filename)
        except DistributiveError:
            return ""
        return filename

    def humanReadable(self):
        """Return the distributive type, or the empty value as is"""
        filename = self.Get('cl_image')
        if filename:
            return filename.getType()
        return filename


class VariableClImageFilename(DistroRepository, Variable):
    """
    Distributive image filename
    """
    type = 'file'
    element = 'file'
    metavalue = "IMAGE"
    opt = ['--iso']
    check_after = ["os_install_root_type"]
    untrusted = True

    def init(self):
        self.label = _("Installation image")
        self.help = _("ISO image for installation")

    def get(self):
        """
        Pick the best image using the cl_image_* filters.

        The architecture and shortname filters fall back to the values of
        the running system. Returns "" when the action is not 'system' or no
        image is found.
        """
        if self.Get('cl_action') != 'system':
            return ""
        arch = self.Get('cl_image_arch_machine') or self.Get('os_arch_machine')
        shortname = self.Get('cl_image_linux_shortname') or \
                    self.Get('os_linux_shortname')
        ver = self.Get('cl_image_linux_ver') or None
        build = self.Get('cl_image_linux_build') or None
        return self.getImage(self.Get('os_install_scratch'),
                             self.Get('os_install_root_type'),
                             self.Get('cl_image_path'),
                             arch, shortname, ver, build) or ""

    def check(self, isoimage):
        """
        Validate the selected image.

        Raises VariableError when no image is given for the 'system' action,
        when the image is an archive, lacks shortname/build/architecture
        information, or is marked as not chrootable.
        """
        if self.Get('cl_action') == 'system' and not isoimage:
            raise VariableError(_("You need to select a distribution image"))
        try:
            d = Distributive.fromFile(isoimage)
            if isinstance(d, ArchiveDistributive):
                raise VariableError(_("Wrong image file"))
        except DistributiveError:
            # the content check below reports an unusable image
            pass

        imageData = Distributive().getInfo(isoimage)
        if not ("os_linux_shortname" in imageData and
                imageData.get('os_linux_build', '') and
                "os_arch_machine" in imageData):
            raise VariableError(_("Wrong image file"))
        if imageData["os_chrootable_set"] == 'off':
            raise VariableError(
                _("The image is not compatible with the current kernel"))

    def humanImageName(self, distroinfo, filepath):
        """
        Build a display name "<name> <arch> <build>" for an image.

        Falls back to ``filepath`` when the info lacks shortname,
        architecture or build. Note: upper-cases
        ``distroinfo['os_linux_shortname']`` in place.
        """
        if all(x in distroinfo for x in ("os_linux_shortname",
                                         "os_arch_machine",
                                         "os_linux_build")):
            distroinfo['os_linux_shortname'] = \
                distroinfo['os_linux_shortname'].upper()
            fullname = distroinfo.get('os_linux_name',
                                      Linux.dictLinuxName.get(
                                          distroinfo['os_linux_shortname'],
                                          "Calculate"))
            subname = distroinfo.get('os_linux_subname',
                                     Linux.dictLinuxSubName.get(
                                         distroinfo['os_linux_shortname'], ""))
            if subname:
                subname = " %s" % subname
            build = distroinfo['os_linux_build'] or \
                    distroinfo.get('os_linux_ver', '')
            ver = distroinfo.get('os_linux_ver', '')
            return "{fullname} {os_arch_machine} {build}".format(
                fullname="%s%s" % (fullname, subname), filepath=filepath,
                build=build, ver=ver, **distroinfo)
        else:
            return filepath

    def humanReadable(self):
        """Describe the system to be installed: name, version, arch, build"""
        fullname = self.Get('os_install_linux_name')
        subname = self.Get('os_install_linux_subname')
        if subname:
            subname = " %s" % subname
        arch = self.Get('os_install_arch_machine')
        build = self.Get('os_install_linux_build')
        ver = self.Get('os_install_linux_ver')
        return "{fullname} {ver} {arch} {build}".format(
            fullname="%s%s" % (fullname, subname),
            build=build, ver=ver, arch=arch)

    def choice(self):
        """
        List (path, display name) pairs of available images sorted by name.

        Directory distributives are excluded for scratch and flash installs;
        a value set by the user is always included.
        """
        scratch = self.Get('os_install_scratch')
        rootType = self.Get('os_install_root_type')
        imagePath = self.Get('cl_image_path')
        if scratch == "on" or rootType == "flash" or \
                self.Get('cl_install_type') == 'flash':
            discardType = ["dir"]
        else:
            discardType = []
        distros = self.getAvailableDristibutives(imagePath,
                                                 discardType=discardType)
        if self.wasSet and self.value not in distros:
            distros.append(self.value)
        return sorted(((x, self.humanImageName(self._getDistrInfo(x), x))
                       for x in distros), key=itemgetter(1))


class VariableClImageArchMachine(DistroRepository, Variable):
    """
    Filter by architecture
    """
    value = ""
    type = 'choice'
    opt = ['--march']
    metavalue = "ARCH"
    available_arch = ["i686", "x86_64"]

    def init(self):
        self.label = "%s %s" % (_("Filter"), _("by processor architecture"))
        self.help = _("select the processor architecture")

    def set(self, march):
        """Replace "auto" with the last entry of getSupportArch()"""
        if march == "auto":
            march = getSupportArch()[-1]
        return march

    def choice(self):
        return [("", _("Not used"))] + \
               [("auto", _("Auto"))] + \
               [(x, x) for x in self.available_arch]

    def humanReadable(self):
        return self.Get() or _("Not used")


class VariableClImageLinuxShortname(DistroRepository, Variable):
    """
    Filter by shortname
    """
    value = ""
    type = 'choiceedit'
    metavalue = "SYSTEM"
    opt = ['--os', '-s']

    def init(self):
        self.label = "%s %s" % (_("Filter"), _("by distribution"))
        self.help = _("select the operation system")

    def choice(self):
        return [("", _("Not used"))] + [
            ("CLD", "Calculate Linux Desktop KDE"),
            ("CLDM", "Calculate Linux Desktop MATE"),
            ("CLDX", "Calculate Linux Desktop XFCE"),
            ("CLS", "Calculate Linux Scratch"),
            ("CDS", "Calculate Directory Server"),
            ("CSS", "Calculate Scratch Server"),
            ("CMC", "Calculate Media Center"),
        ]

    def humanReadable(self):
        return self.Get() or _("Not used")


class VariableClImageLinuxVer(DistroRepository, Variable):
    """
    Filter by version
    """
    value = ""

    def init(self):
        self.label = "%s %s" % (_("Filter"), _("by version"))
        self.help = _("select the operation system by version")

    def humanReadable(self):
        return self.Get() or _("Not used")


class VariableClImageLinuxBuild(DistroRepository, Variable):
    """
    Filter by build
    """
    value = ""

    def init(self):
        self.label = "%s %s" % (_("Filter"), _("by build"))
        self.help = _("select the operation system by build")

    def humanReadable(self):
        return self.Get() or _("Not used")


class VariableClImagePath(ReadonlyVariable):
    """
    Image search path
    """
    type = "list"

    def get(self):
        """
        Build the list of existing paths to search for images.

        The standard repository directories come first, followed by the
        live media location (when running from a livecd) and partitions
        whose content is marked " live", except the install root device.
        """
        # if current distributive is live
        if self.Get('os_root_type') == "livecd":
            # if builder from flash then this source path '/mnt/flash'
            # may be this path will be '/mnt/builder' for install
            # modified system
            if self.Get('os_scratch') == "on" and path.exists('/mnt/flash'):
                livedistr = ['/mnt/flash']
            # if system boot with kernel param 'docache'
            elif path.exists('/mnt/squash'):
                livedistr = ['/mnt/livecd']
            # standard livecd
            else:
                if self.Get('os_install_root_type') == "flash":
                    livedistr = ['/run/initramfs/live',
                                 '/run/initramfs/squashfs',
                                 '/mnt/cdrom']
                else:
                    livedistr = ['/run/initramfs/squashfs',
                                 '/run/initramfs/live',
                                 '/mnt/cdrom']
                # keep only the first non-empty candidate
                # NOTE(review): the nesting of this statement was lost in the
                # collapsed source; it is assumed to belong to the standard
                # livecd branch only - confirm against upstream.
                livedistr = [x for x in livedistr
                             if listDirectory(x)][:1]
        else:
            livedistr = []
        # search all partition for source installation distributive
        rootDev = self.Get('os_install_root_dev')
        livedistr += [x[0] for x in zip(self.Get('os_disk_dev'),
                                        self.Get('os_disk_content'))
                      if " live" in x[1] and x[0] != rootDev]
        # add to standard path
        return [x for x in ['/var/calculate/remote/linux',
                            '/var/calculate/linux'] + livedistr
                if path.exists(x)]


class VariableClSource(ReadonlyVariable):
    """
    Distributive of the current system
    """
    type = "object"

    def get(self):
        return DirectoryDistributive('/')


class VariableClTarget(ReadonlyVariable):
    """
    Target distributive
    """
    type = "object"

    def get(self):
        """
        Build the target distributive for the installation.

        Returns a FlashDistributive for a flash install, a PxeDistributive
        when PXE install is on, otherwise a PartitionDistributive with the
        additional partitions and bind mounts attached.

        Raises VariableError for a scratch install.
        """
        listVars = ['os_install_disk_dev', 'os_install_disk_mount',
                    'os_install_disk_format', 'os_install_disk_perform_format',
                    'os_install_disk_part', 'os_install_disk_id']
        rootLabel = "{short}-{ver}".format(
            short=self.Get('os_install_linux_shortname'),
            ver=self.Get('os_install_linux_ver'))

        if self.Get('os_install_root_type') == "flash":
            flashLabel = "{short}-{build}".format(
                short="CL", build=self.Get('os_install_linux_build'))
            disk = self.Get('os_install_disk_single')
            fileSystem = "vfat"
            systemId = FileSystemManager.supportFS.get(
                'vfat', {}).get('msdos', '0b')
            isFormat = self.GetBool('os_install_format_single_set')
            partTable = self.select('os_disk_part',
                                    os_disk_dev=disk, limit=1)
            return FlashDistributive(
                disk, mdirectory=DefaultMountPath.InstallMount,
                check=True, fileSystem=fileSystem,
                isFormat=isFormat, systemId=systemId,
                rootLabel=flashLabel,
                partitionTable=partTable)
        osInstallScratch = self.isTrue(self.Get('os_install_scratch'))
        mapDevId = dict(self.ZipVars('os_disk_dev', 'os_disk_id'))

        disk, mount, fileSystem, isFormat, partTable, systemId = \
            self.Select(listVars,
                        where='os_install_disk_mount',
                        eq='/', limit=1)
        # a system id equal to the current one needs no change
        if not systemId or mapDevId.get(disk, '') == systemId:
            systemId = None
        if osInstallScratch:
            raise VariableError("Scratch is not supported")
        if self.Get('os_install_pxe') == "on":
            return PxeDistributive(self.Get('os_install_pxe_path'))
        else:
            target = PartitionDistributive(
                disk, mdirectory=DefaultMountPath.InstallMount,
                check=True, fileSystem=fileSystem,
                rootLabel=rootLabel,
                isFormat=self.isTrue(isFormat),
                systemId=systemId,
                partitionTable=partTable,
                compression=self.Get('os_install_btrfs_compression'))
        multiPartition = None
        diskData = self.Select(listVars,
                               where='os_install_disk_mount',
                               ne='/')
        bindData = self.Select(['os_install_bind_path',
                                'os_install_bind_mountpoint'],
                               where='os_install_bind_mountpoint',
                               ne='')
        if diskData or bindData:
            multiPartition = MultiPartitions()
            # NOTE(review): nesting reconstructed from the collapsed source;
            # the attribute is assumed to be set only when extra partitions
            # or bind mounts exist - confirm against upstream.
            target.multipartition = multiPartition
        for disk, mount, fileSystem, isFormat, partTable, systemId in diskData:
            if not systemId or mapDevId.get(disk, '') == systemId:
                systemId = None
            multiPartition.addPartition(dev=disk,
                                        mountPoint=mount,
                                        fileSystem=fileSystem,
                                        isFormat=self.isTrue(isFormat),
                                        systemId=systemId,
                                        partitionTable=partTable)
        for source, dest in bindData:
            multiPartition.addPartition(dev=source,
                                        mountPoint=dest,
                                        fileSystem='bind',
                                        isFormat=False,
                                        systemId=None,
                                        partitionTable='')
        return target


class VariableClImageNewOnly(Variable):
    """
    Install the image only if it is newer than the current systems
    """
    type = 'bool'
    opt = ['-U', '--update']
    value = "off"

    def init(self):
        self.label = _("Install the newer image only")
        self.help = _("install the newer image only")

    def installedBuild(self):
        """
        Get the build of the system found on the install root device.

        Needed for the update check. Returns "" when there is no root device
        or its information cannot be read (best effort).
        """
        rootDev = self.Get('os_install_root_dev')
        if not rootDev:
            return ""
        try:
            imageData = Distributive().getInfo(rootDev)
            return imageData.get('os_linux_build', '')
        except Exception:
            # an unreadable target simply has no known build
            pass
        return ""

    def check(self, value):
        """
        Refuse "on" unless the image build is newer than both the running
        system build and the build on the install root device.

        Raises VariableError when the image information cannot be read and
        CommonVariableError when the image is not newer.
        """
        if value == 'on':
            try:
                imageData = Distributive().getInfo(
                    self.Get('cl_image_filename'))
            except Exception:
                raise VariableError(_("Wrong image file"))
            if imageData.get('os_linux_build', '') <= \
                    self.Get('os_linux_build') or \
                    imageData.get('os_linux_build', '') <= \
                    self.installedBuild():
                raise CommonVariableError(_("The image for update not found"))


class VariableClInstallPathFrom(ReadonlyVariable):
    """
    Relative path from the system being installed to the installing system
    """

    def get(self):
        """Return the path to "/" relative to cl_chroot_path/cl_root_path"""
        template_path = pathJoin(self.Get('cl_chroot_path'),
                                 self.Get('cl_root_path'))
        return os.path.relpath("/", template_path)