You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
calculate-utils-3-install/pym/install/variables/distr.py

656 lines
25 KiB

# -*- coding: utf-8 -*-
# Copyright 2008-2016 Mir Calculate. http://www.calculate-linux.org
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
import sys
from os import path
import re
import operator
from operator import itemgetter
from calculate.lib.datavars import (Variable, VariableError, ReadonlyVariable,
CommonVariableError)
from calculate.lib.utils.common import (getSupportArch, getTupleVersion,
cmpVersion)
from calculate.lib.utils.files import listDirectory, pathJoin
from calculate.lib.variables.linux import Linux
from calculate.install.distr import (Distributive, PartitionDistributive,
DirectoryDistributive, DefaultMountPath,
DistributiveError, FlashDistributive,
ArchiveDistributive,
MultiPartitions, PxeDistributive)
from calculate.lib.cl_lang import setLocalTranslate, _
from calculate.install.fs_manager import FileSystemManager
setLocalTranslate('cl_install3', sys.modules[__name__])
class DistroRepository(Linux):
    """
    Search and ranking of distribution images and stage archives

    Image files are recognized by their name with ``reDistName``; anything
    that is not a regular file is described by ``Distributive().getInfo``.
    """
    contentCache = {}

    # machine architectures accepted in an image file name
    marches = ['i686', 'x86_64']

    # image file extensions accepted in an image file name
    extensiton = ['iso', 'tar.bz2', 'tar.gz', 'tar.7z', 'tar.lzma']

    # <path>/<shortname>-<version>[-<serial>]-<march>.<ext>
    # the dot before the extension and the dots inside the extensions are
    # escaped so that they match a literal dot only
    reDistName = re.compile(r"""
        ^.*/(?P<os_linux_shortname>%(name)s)
        -(?P<os_linux_ver>%(ver)s)
        (?:-(?P<serial_id>%(ser)s))?
        -(?P<os_arch_machine>%(march)s)
        \.(?P<ext>%(ext)s)$""" %
                            {'name': "[a-z0-9]+",
                             'ver': r"(\d+\.)*\d+",
                             'ser': r"\d+",
                             'march': "|".join(marches),
                             'ext': "|".join(re.escape(ext)
                                             for ext in extensiton)
                             }, re.X)

    # a version made of exactly eight digits is a build number
    _reBuildDate = re.compile(r"^\d{8}$")

    # optional comparison operator followed by a value starting with a digit
    _reOpcompare = re.compile(r"^(!=|=|==|<=|>=|>|<)?(\d+.*)$")

    _comparators = {'!=': operator.ne,
                    '=': operator.eq,
                    '==': operator.eq,
                    '>=': operator.ge,
                    '<=': operator.le,
                    '<': operator.lt,
                    '>': operator.gt}

    @staticmethod
    def _cmp(left, right):
        """Three-way comparison: -1, 0 or 1 (replacement of builtin cmp)"""
        return (left > right) - (left < right)

    def _getDistrInfo(self, filename):
        """Get information by distributive

        A regular file is described by the named groups of ``reDistName``
        plus ``os_linux_build``; an eight-digit version is moved to the
        build and the version is cleared. An empty dict is returned when the
        file name is not recognized. Anything which is not a regular file
        is passed to ``Distributive().getInfo``.
        """
        # if filename is directory
        if not path.isfile(filename):
            return Distributive().getInfo(filename)
        match = self.reDistName.match(filename)
        if not match:
            return {}
        distdic = match.groupdict()
        distdic["os_linux_build"] = ""
        version = distdic.get("os_linux_ver")
        if version and self._reBuildDate.match(version):
            distdic["os_linux_build"] = version
            distdic["os_linux_ver"] = ""
        return distdic

    def getImage(self, scratch, rootType, imagePath, march=None,
                 shortName=None, linuxVer=None, linuxBuild=None):
        """Get image by parameters

        Returns the result of ``getBestDistributive`` for the given filter.
        """
        # exclude directory distributive for flash and scratch install
        if scratch == "on" or rootType == "flash":
            discardType = ["dir"]
        else:
            discardType = []
        return self.getBestDistributive(imagePath,
                                        march=march,
                                        shortname=shortName,
                                        discardType=discardType,
                                        version=linuxVer,
                                        build=linuxBuild)

    def _getAvailableShortnames(self, dirs):
        """Get available distributives shortnames

        Returns the sorted list of unique shortnames of the found
        distributives whose path matches ``reDistName``.
        """
        matches = (self.reDistName.search(distr)
                   for distr in self._getAvailableDistributives(dirs))
        # the group is named 'os_linux_shortname' in reDistName
        return sorted(set(match.group('os_linux_shortname')
                          for match in matches if match))

    def opcompareByString(self, buf):
        """Split a filter string to a comparison function and a value

        Returns ``(function, value)``: the function of the ``operator``
        module for the leading operator (equality when the operator is
        absent or the string does not start with a digit) and the rest of
        the string. Returns ``(None, None)`` for an empty string.
        """
        if not buf:
            return None, None
        res = self._reOpcompare.search(buf)
        if not res:
            return operator.eq, buf
        return (self._comparators.get(res.group(1), operator.eq),
                res.group(2))

    def _getAvailableDistributives(self, dirs, system=None, shortname=None,
                                   march=None, version=None, build=None):
        """Get all distributives by filter

        ``version`` and ``build`` may start with a comparison operator
        (see ``opcompareByString``). Returns the list of paths.
        """

        def systemByName(name):
            return self.dictNameSystem.get(name.upper(), "")

        verCmp, version = self.opcompareByString(version)
        if version:
            version = getTupleVersion(version)
        buildCmp, build = self.opcompareByString(build)
        if build and build.isdigit():
            build = int(build)

        def distfilter(dist):
            d = self._getDistrInfo(dist)
            if not d:
                return False
            # the keys must be checked before any of them is read
            if "os_linux_shortname" not in d or "os_linux_ver" not in d:
                return False
            # check filter conditions
            if system and systemByName(d['os_linux_shortname']) != system:
                return False
            if (shortname and
                    d['os_linux_shortname'].lower() != shortname.lower()):
                return False
            if march and d.get('os_arch_machine') != march:
                return False
            if version and \
                    not verCmp(getTupleVersion(d['os_linux_ver']), version):
                return False
            if build and "os_linux_build" in d and \
                    (not d['os_linux_build'].isdigit() or
                     not buildCmp(int(d['os_linux_build']), build)):
                return False
            return True

        def listdistr(pathname):
            if path.exists(path.join(pathname, 'etc/make.profile')) or \
                    path.exists(
                        path.join(pathname, 'etc/portage/make.profile')) or \
                    path.exists(path.join(pathname, 'livecd')) or \
                    pathname.startswith('/dev/'):
                return [pathname]
            # discard inner directories
            return [entry for entry in listDirectory(pathname)
                    if not path.isdir(path.join(pathname, entry))]

        # get files of all directories as one list
        allFiles = [path.join(dirname, entry)
                    for dirname in dirs
                    for entry in listdistr(dirname)]
        # filter distributives
        return [distr for distr in allFiles if distfilter(distr)]

    def extcomparator(self, *exts):
        """Compare extensions

        Compares the first two arguments by their rank; an unknown
        extension has the rank of 'dir'.
        """
        mapExts = {'iso': 0,
                   'flash': -1,
                   'isodir': -2,
                   'partdir': -3,
                   'dir': -4}
        return self._cmp(mapExts.get(exts[0], -4), mapExts.get(exts[1], -4))

    def sortdistrfunc(self, x, y):
        """Func of comparing two distributive

        ``x`` and ``y`` are pairs whose second item is the info dict. The
        order is defined by the version, then the build, then the serial id
        and finally by the extension.
        """
        ver1, ver2 = x[1].get('os_linux_ver', ""), y[1].get('os_linux_ver', "")
        if ver1 and ver2 and ver1 != "0" and ver2 != "0" and ver1 != ver2:
            return cmpVersion(ver1, ver2)
        build1 = getTupleVersion(x[1].get('os_linux_build', ""))
        build2 = getTupleVersion(y[1].get('os_linux_build', ""))
        if build1 != build2:
            return self._cmp(build1, build2)
        ser1, ser2 = (x[1].get('serial_id') or "0",
                      y[1].get('serial_id') or "0")
        if ser1 != ser2:
            return self._cmp(int(ser1), int(ser2))
        ext1 = x[1].get('ext', "")
        ext2 = y[1].get('ext', "")
        return self.extcomparator(ext1, ext2)

    def getAvailableDristibutives(self, dirs, system=None, shortname=None,
                                  march=None, version=None, build=None,
                                  discardType=()):
        """Get list available distributives

        Returns the paths ordered by ``sortdistrfunc`` in descending order.
        Distributives without 'ext' in their info or with 'ext' listed in
        ``discardType`` are skipped.
        """
        # works on Python 2.7 and 3, unlike the positional cmp of sorted()
        from functools import cmp_to_key

        if shortname:
            shortname = shortname.lower()
        found = self._getAvailableDistributives(dirs, system, shortname,
                                                march, version,
                                                build)
        described = [(distr, self._getDistrInfo(distr)) for distr in found]
        suitable = [item for item in described
                    if item[1] and "ext" in item[1] and
                    item[1]["ext"] not in discardType]
        ordered = sorted(suitable, key=cmp_to_key(self.sortdistrfunc),
                         reverse=True)
        return [item[0] for item in ordered]

    def getBestDistributive(self, dirs, system=None, shortname=None, march=None,
                            version=None, build=None, discardType=()):
        """Get the actualest distributive

        Returns the first item of ``getAvailableDristibutives`` or None
        when nothing is found.
        """
        availDistrs = self.getAvailableDristibutives(dirs, system, shortname,
                                                     march, version, build,
                                                     discardType)
        if availDistrs:
            return availDistrs[0]
        return None

    def _findLatestFile(self, dirs, reMatch, keyfunc):
        """Find latest file in dirs, which match by reMatch,
        comparable part get by keyfunc

        Directories which do not exist are skipped. Returns the matched
        text or an empty string when no file matches.
        """
        listimgs = []
        for dirname in dirs:
            if not path.exists(dirname):
                continue
            for entry in listDirectory(dirname):
                match = reMatch.search(path.join(dirname, entry))
                if match:
                    listimgs.append(match)
        if listimgs:
            return max(listimgs, key=keyfunc).group()
        return ""

    def getBestStage(self, dirs, march=None, hardened=None):
        """Get latest stage by march

        ``hardened`` is None for any stage, True for hardened only and
        False for not hardened only. Stages are compared by the digits
        which follow the architecture in the file name.
        """
        if march:
            march = {'x86_64': 'amd64'}.get(march, march)
        else:
            march = "[^-]+"
        if hardened is None:
            hardened = "(?:-hardened)?"
        elif hardened is True:
            hardened = "-hardened"
        elif hardened is False:
            hardened = ""
        reStage = re.compile(r'^.*/stage3-%s%s-(\d+)\.tar\.bz2$' %
                             (march, hardened), re.S)
        return self._findLatestFile(dirs, reStage, lambda x: x.group(1))
class VariableClImage(ReadonlyVariable):
    """
    Distributive object of the image used for installation
    """
    type = "object"

    def get(self):
        """Create the distributive object of the selected image

        The distributive of '/' is returned for any action except 'system'.
        An empty value of 'cl_image_filename' is returned as is, and an
        empty string is returned on DistributiveError.
        """
        try:
            if self.Get('cl_action') not in ('system',):
                return Distributive.fromFile('/')
            image = self.Get('cl_image_filename')
            if not image:
                return image
            return Distributive.fromFile(image)
        except DistributiveError:
            return ""

    def humanReadable(self):
        """Type of the image, or the empty value when there is no image"""
        image = self.Get('cl_image')
        return image.getType() if image else image
class VariableClImageFilename(DistroRepository, Variable):
    """
    Distributive image filename
    """
    type = 'file'
    element = 'file'
    metavalue = "IMAGE"
    opt = ['--iso']
    check_after = ["os_install_root_type"]
    untrusted = True

    def init(self):
        self.label = _("Installation image")
        self.help = _("ISO image for installation")

    def get(self):
        """Find the best image for the current filter variables

        Returns an empty string when the action is not 'system' or when
        no image is found.
        """
        if self.Get('cl_action') != 'system':
            return ""
        arch = self.Get('cl_image_arch_machine') or self.Get('os_arch_machine')
        shortname = self.Get('cl_image_linux_shortname') or \
                    self.Get('os_linux_shortname')
        ver = self.Get('cl_image_linux_ver') or None
        build = self.Get('cl_image_linux_build') or None
        return self.getImage(self.Get('os_install_scratch'),
                             self.Get('os_install_root_type'),
                             self.Get('cl_image_path'),
                             arch, shortname, ver, build) or ""

    def check(self, isoimage):
        """Validate the selected image

        Raises VariableError when the action is 'system' and no image is
        specified, when the image is an archive distributive, or when the
        image info has no shortname, architecture or non-empty build.
        """
        if self.Get('cl_action') == 'system' and not isoimage:
            raise VariableError(_("You need to select a distribution image"))
        try:
            d = Distributive.fromFile(isoimage)
            if isinstance(d, ArchiveDistributive):
                raise VariableError(_("Wrong image file"))
        except DistributiveError:
            # the image is checked by its info below
            pass
        imageData = Distributive().getInfo(isoimage)
        if not ("os_linux_shortname" in imageData and
                imageData.get('os_linux_build', '') and
                "os_arch_machine" in imageData):
            raise VariableError(_("Wrong image file"))

    def humanImageName(self, distroinfo, filepath):
        """Get the name of the image for displaying

        Returns "<name><subname> <arch> <build>" when ``distroinfo`` has
        the shortname, the architecture and the build, otherwise
        ``filepath``. The version is used in place of an empty build.
        ``distroinfo`` is not modified.
        """
        required = ("os_linux_shortname", "os_arch_machine", "os_linux_build")
        if not all(key in distroinfo for key in required):
            return filepath
        # keep the upper-cased shortname local: the dict belongs to the caller
        shortname = distroinfo['os_linux_shortname'].upper()
        fullname = distroinfo.get('os_linux_name',
                                  Linux.dictLinuxName.get(
                                      shortname, "Calculate"))
        subname = distroinfo.get('os_linux_subname',
                                 Linux.dictLinuxSubName.get(shortname, ""))
        if subname:
            subname = " %s" % subname
        build = distroinfo['os_linux_build'] or \
                distroinfo.get('os_linux_ver', '')
        return "{fullname} {os_arch_machine} {build}".format(
            fullname="%s%s" % (fullname, subname),
            os_arch_machine=distroinfo['os_arch_machine'],
            build=build)

    def humanReadable(self):
        """Name, version, architecture and build of the installed system"""
        fullname = self.Get('os_install_linux_name')
        subname = self.Get('os_install_linux_subname')
        if subname:
            subname = " %s" % subname
        arch = self.Get('os_install_arch_machine')
        build = self.Get('os_install_linux_build')
        ver = self.Get('os_install_linux_ver')
        return "{fullname} {ver} {arch} {build}".format(
            fullname="%s%s" % (fullname, subname),
            build=build, ver=ver, arch=arch)

    def choice(self):
        """Get (path, name) pairs of available images sorted by name

        Directory distributives are excluded for scratch and flash
        installations. A value set by the user is always included.
        """
        scratch = self.Get('os_install_scratch')
        rootType = self.Get('os_install_root_type')
        imagePath = self.Get('cl_image_path')
        if scratch == "on" or rootType == "flash" or \
                self.Get('cl_install_type') == 'flash':
            discardType = ["dir"]
        else:
            discardType = []
        distros = self.getAvailableDristibutives(imagePath,
                                                 discardType=discardType)
        if self.wasSet and self.value not in distros:
            distros.append(self.value)
        named = [(distr, self.humanImageName(self._getDistrInfo(distr), distr))
                 for distr in distros]
        return sorted(named, key=itemgetter(1))
class VariableClImageArchMachine(DistroRepository, Variable):
    """
    Architecture used for filtering the images
    """
    value = ""
    type = 'choice'
    opt = ['--march']
    metavalue = "ARCH"
    available_arch = ["i686", "x86_64"]

    def init(self):
        self.label = "%s %s" % (_("Filter"), _("by processor architecture"))
        self.help = _("select the processor architecture")

    def set(self, march):
        """Replace "auto" by the last item of getSupportArch()"""
        return getSupportArch()[-1] if march == "auto" else march

    def choice(self):
        """Not used, auto and every item of available_arch"""
        variants = [("", _("Not used")), ("auto", _("Auto"))]
        variants.extend((arch, arch) for arch in self.available_arch)
        return variants

    def humanReadable(self):
        current = self.Get()
        if current:
            return current
        return _("Not used")
class VariableClImageLinuxShortname(DistroRepository, Variable):
    """
    Distribution shortname used for filtering the images
    """
    value = ""
    type = 'choiceedit'
    metavalue = "SYSTEM"
    opt = ['--os', '-s']

    def init(self):
        self.label = "%s %s" % (_("Filter"), _("by distribution"))
        self.help = _("select the operation system")

    def choice(self):
        """Not used followed by the known shortnames with their names"""
        variants = [("", _("Not used"))]
        variants.extend([
            ("CLD", "Calculate Linux Desktop KDE"),
            ("CLDM", "Calculate Linux Desktop MATE"),
            ("CLDX", "Calculate Linux Desktop XFCE"),
            ("CLS", "Calculate Linux Scratch"),
            ("CDS", "Calculate Directory Server"),
            ("CSS", "Calculate Scratch Server"),
            ("CMC", "Calculate Media Center"),
        ])
        return variants

    def humanReadable(self):
        current = self.Get()
        if current:
            return current
        return _("Not used")
class VariableClImageLinuxVer(DistroRepository, Variable):
    """
    Version used for filtering the images
    """
    value = ""

    def init(self):
        self.label = "%s %s" % (_("Filter"), _("by version"))
        self.help = _("select the operation system by version")

    def humanReadable(self):
        current = self.Get()
        if current:
            return current
        return _("Not used")
class VariableClImageLinuxBuild(DistroRepository, Variable):
    """
    Build used for filtering the images
    """
    value = ""

    def init(self):
        self.label = "%s %s" % (_("Filter"), _("by build"))
        self.help = _("select the operation system by build")

    def humanReadable(self):
        current = self.Get()
        if current:
            return current
        return _("Not used")
class VariableClImagePath(ReadonlyVariable):
"""
Image search path

The value is the list of existing paths built from the standard
repository directories, the sources of the running live system and the
disk devices whose content description contains " live" (the
installation root device is excluded).
"""
type = "list"
def get(self):
# NOTE(review): the indentation is not visible here, so it can not be told
# whether the ``filter(listDirectory, ...)[:1]`` step below is applied to
# every live branch or to the standard livecd one only - confirm it
# if current distributive is live
if self.Get('os_root_type') == "livecd":
# if the system was built from flash, the source path is '/mnt/flash'
# (this path may become '/mnt/builder' for installing
# a modified system)
if self.Get('os_scratch') == "on" and path.exists('/mnt/flash'):
livedistr = ['/mnt/flash']
# if system boot with kernel param 'docache'
elif path.exists('/mnt/squash'):
livedistr = ['/mnt/livecd']
# standard livecd
else:
# the candidates are the same, only their priority differs
if self.Get('os_install_root_type') == "flash":
livedistr = ['/run/initramfs/live',
'/run/initramfs/squashfs',
'/mnt/cdrom']
else:
livedistr = ['/run/initramfs/squashfs',
'/run/initramfs/live',
'/mnt/cdrom']
# keep the first candidate for which listDirectory returns a non-empty
# result; relies on filter() returning a list (Python 2)
livedistr = filter(listDirectory,
livedistr)[:1]
else:
livedistr = []
# search all partition for source installation distributive
rootDev = self.Get('os_install_root_dev')
livedistr += \
map(lambda x: x[0],
filter(lambda x: " live" in x[1] and x[0] != rootDev,
zip(self.Get('os_disk_dev'),
self.Get('os_disk_content'))))
# add to the standard paths, keep the existing paths only
return filter(path.exists,
['/var/calculate/remote/linux',
'/var/calculate/linux'] + livedistr)
class VariableClSource(ReadonlyVariable):
    """
    Distributive of the current system
    """
    type = "object"

    def get(self):
        """Directory distributive created for the root directory '/'"""
        current_system = DirectoryDistributive('/')
        return current_system
class VariableClTarget(ReadonlyVariable):
"""
Target distributive

The value is a FlashDistributive for the flash root type, a
PxeDistributive when 'os_install_pxe' is "on" and a PartitionDistributive
otherwise. VariableError is raised for a scratch installation.
"""
type = "object"
def get(self):
# NOTE(review): the indentation is not visible here, so the nesting of
# ``target.multipartition = multiPartition`` and of the two ``for`` loops
# relative to ``if diskData or bindData:`` can not be told - confirm it
# columns of the installation disk table, in the order they are unpacked below
listVars = ['os_install_disk_dev', 'os_install_disk_mount',
'os_install_disk_format', 'os_install_disk_perform_format',
'os_install_disk_part', 'os_install_disk_id']
# label of the root partition: <shortname>-<version>
rootLabel = "{short}-{ver}".format(
short=self.Get('os_install_linux_shortname'),
ver=self.Get('os_install_linux_ver'))
# installation on flash: single disk with vfat, label CL-<build>
if self.Get('os_install_root_type') == "flash":
flashLabel = "{short}-{build}".format(
short="CL", build=self.Get('os_install_linux_build'))
disk = self.Get('os_install_disk_single')
fileSystem = "vfat"
# 'msdos' entry of the vfat description, '0b' when it is absent
systemId = FileSystemManager.supportFS.get(
'vfat', {}).get('msdos', '0b')
isFormat = self.GetBool('os_install_format_single_set')
partTable = self.select('os_disk_part',
os_disk_dev=disk, limit=1)
return FlashDistributive(
disk, mdirectory=DefaultMountPath.InstallMount,
check=True, fileSystem=fileSystem,
isFormat=isFormat, systemId=systemId,
rootLabel=flashLabel,
partitionTable=partTable)
osInstallScratch = self.isTrue(self.Get('os_install_scratch'))
# current id of every disk device
mapDevId = dict(self.ZipVars('os_disk_dev', 'os_disk_id'))
# row of the disk table which is mounted to '/'
disk, mount, fileSystem, isFormat, partTable, systemId = \
self.Select(listVars,
where='os_install_disk_mount',
eq='/', limit=1)
# the id is not passed when it is empty or equal to the current one
if not systemId or mapDevId.get(disk, '') == systemId:
systemId = None
if osInstallScratch:
raise VariableError("Scratch is not supported")
if self.Get('os_install_pxe') == "on":
return PxeDistributive(self.Get('os_install_pxe_path'))
else:
target = PartitionDistributive(
disk, mdirectory=DefaultMountPath.InstallMount,
check=True, fileSystem=fileSystem,
rootLabel=rootLabel,
isFormat=self.isTrue(isFormat),
systemId=systemId,
partitionTable=partTable)
multiPartition = None
# rows of the disk table which are not mounted to '/'
diskData = self.Select(listVars,
where='os_install_disk_mount',
ne='/')
# bind mounts which have a mount point
bindData = self.Select(['os_install_bind_path',
'os_install_bind_mountpoint'],
where='os_install_bind_mountpoint',
ne='')
if diskData or bindData:
multiPartition = MultiPartitions()
target.multipartition = multiPartition
for disk, mount, fileSystem, isFormat, partTable, systemId in diskData:
# the same rule for the id as for the root partition
if not systemId or mapDevId.get(disk, '') == systemId:
systemId = None
multiPartition.addPartition(dev=disk,
mountPoint=mount,
fileSystem=fileSystem,
isFormat=self.isTrue(isFormat),
systemId=systemId,
partitionTable=partTable)
# bind mounts are added as never formatted partitions of the 'bind' type
for source, dest in bindData:
multiPartition.addPartition(dev=source,
mountPoint=dest,
fileSystem='bind',
isFormat=False,
systemId=None,
partitionTable='')
return target
class VariableClImageNewOnly(Variable):
    """
    Install the image only if it is newer than the system
    """
    type = 'bool'
    opt = ['-U', '--update']
    value = "off"

    def init(self):
        self.label = _("Install the newer image only")
        self.help = _("install the newer image only")

    def installedBuild(self):
        """
        Get build already installed system
        Need for check update

        An empty string is returned when the installation root device is
        not set or its info can not be read.
        """
        rootDev = self.Get('os_install_root_dev')
        if rootDev:
            try:
                return Distributive().getInfo(rootDev).get(
                    'os_linux_build', '')
            except Exception:
                return ""
        return ""

    def check(self, value):
        """Check that the image is newer than the system

        Raises VariableError when the info of the image can not be read
        and CommonVariableError when the build of the image is not
        greater than the build of the current or the installed system.
        """
        if value != 'on':
            return
        try:
            imageData = Distributive().getInfo(
                self.Get('cl_image_filename'))
        except Exception:
            raise VariableError(_("Wrong image file"))
        imageBuild = imageData.get('os_linux_build', '')
        # the builds are compared as strings
        if (imageBuild <= self.Get('os_linux_build') or
                imageBuild <= self.installedBuild()):
            raise CommonVariableError(_("The image for update not found"))
class VariableClInstallPathFrom(ReadonlyVariable):
    """
    Path from the system being installed to the installing system
    """

    def get(self):
        """Relative path to '/' from the joined chroot and root paths"""
        chroot_path = self.Get('cl_chroot_path')
        root_path = self.Get('cl_root_path')
        return os.path.relpath("/", pathJoin(chroot_path, root_path))