You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
calculate-utils-3-update/pym/builder/variables/builder.py

812 lines
24 KiB

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

# -*- coding: utf-8 -*-
# Copyright 2015 Calculate Ltd. http://www.calculate-linux.org
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import sys
from os import path
import os
import re
from calculate.install import distr
from calculate.lib.utils.device import getUdevDeviceInfo, humanreadableSize
from calculate.lib.utils.files import isMount, process, typeFile, listDirectory, \
pathJoin
from calculate.lib.utils.kernel import InitrdFile
from calculate.lib.utils.tools import max_default
from ..build_storage import BuildStorage, Build
from ..drive_spool import DriveSpool
from calculate.lib.datavars import Variable, VariableError, ReadonlyVariable
from functools import wraps
from calculate.lib.cl_lang import setLocalTranslate
# Register this module with the 'cl_builder3' translation domain.
# NOTE(review): presumably this is what provides the `_` function used for
# the translated strings below -- confirm against calculate.lib.cl_lang.
setLocalTranslate('cl_builder3',sys.modules[__name__])
def debug(func):
    """
    Debugging decorator: print the wrapped function's return value.

    The result is written to standard output prefixed with "MYDEBUG" and
    is then returned unchanged.
    """
    @wraps(func)
    def _wrapped_func(*args, **kw):
        ret = func(*args, **kw)
        # Write through sys.stdout rather than the ``print`` statement,
        # which exists only on Python 2 and is a syntax error on Python 3.
        sys.stdout.write("MYDEBUG %s\n" % (ret,))
        return ret
    return _wrapped_func
def is_action(*available_action, **action_kwargs):
    """
    Build a decorator that runs a method only for the listed actions.

    The decorated method is called when ``self.Get('cl_action')`` is one of
    ``available_action``. For any other action the ``default_value`` keyword
    argument is returned instead (an empty string when it is not given).
    """
    def decorator(func):
        @wraps(func)
        def _wrapped_func(self, *args, **kw):
            # Guard clause: wrong action -> hand back the default.
            if self.Get('cl_action') not in available_action:
                return action_kwargs.get('default_value', '')
            return func(self, *args, **kw)
        return _wrapped_func
    return decorator
def as_list(func):
    """
    Decorator that turns the iterable returned by a method into a list.
    """
    @wraps(func)
    def _wrapped_func(self, *args, **kw):
        items = func(self, *args, **kw)
        return list(items)
    return _wrapped_func
class VariableClBuilderStorage(ReadonlyVariable):
    """
    Read-only variable whose value is a BuildStorage object
    """
    type = "object"

    def get(self):
        """Return a newly constructed BuildStorage."""
        return BuildStorage()
class VariableClBuilderAvailableDev(Variable):
    """
    List of disks that can be used for building
    """
    type = "list"
class VariableClBuilderDeviceSpool(ReadonlyVariable):
    """
    Spool (stack) of disks that can be used for building
    """
    type = "object"

    def get(self):
        """Create a DriveSpool over the ``cl_builder_available_dev`` list."""
        available_devices = self.Get('cl_builder_available_dev')
        return DriveSpool(available_devices)
class BaseBuildId(Variable):
    """
    Base class for the build id variables
    """
    # Option name and value placeholder shared by the id variables
    # (presumably used for command-line parsing/help -- confirm).
    opt = ["--id"]
    metavalue = "ID"

    def init(self):
        """Set the translated label and help text."""
        self.label = _("Build ID")
        self.help = _("build ID")
class VariableClBuilderSourceFilename(Variable):
    """
    File name of the source distribution
    """
    type = "file"
    element = 'file'
    opt = ["--source"]
    value = ""
    metavalue = "SOURCE"

    def init(self):
        """Set the translated label and help text."""
        self.label = _("Source distribution")
        self.help = _("source distribution")

    def check(self, isoimage):
        """
        Validate the selected image file.

        Raises VariableError when the action is 'create' and no image is
        given, or when the short name, build or architecture variable of
        the image is empty.
        """
        if self.Get('cl_action') == 'create' and not isoimage:
            raise VariableError(_("You need to select a source image"))
        shortname = self.Get('os_builder_linux_shortname')
        build = self.Get('os_builder_linux_build')
        arch = self.Get('os_builder_arch_machine')
        # All three properties must be known for the image to be usable.
        if not (build and shortname and arch):
            raise VariableError(_("Wrong image file"))

    def humanReadable(self):
        """
        Return a description built from the distribution name, subname,
        version, architecture and build.
        """
        name = self.Get('os_builder_linux_name')
        subname = self.Get('os_builder_linux_subname')
        version = self.Get('os_builder_linux_ver')
        # Non-empty optional parts are separated by a leading space.
        subname_part = " %s" % subname if subname else subname
        version_part = " %s" % version if version else version
        arch = self.Get('os_builder_arch_machine')
        build = self.Get('os_builder_linux_build')
        full_title = "%s%s" % (name, subname_part)
        return "{fullname}{ver} {arch} {build}".format(
            fullname=full_title, ver=version_part, arch=arch, build=build)
class VariableClBuilderSource(ReadonlyVariable):
    """
    Source distribution object used to deploy the build
    """
    type = "object"

    def get(self):
        """
        Return the distributive created from ``cl_builder_source_filename``.

        An empty string is returned when no file name is set or when a
        DistributiveError is raised.
        """
        try:
            filename = self.Get('cl_builder_source_filename')
            if not filename:
                return ""
            return distr.Distributive.fromFile(filename)
        except distr.DistributiveError:
            return ""

    def humanReadable(self):
        """Return the distributive's type, or the empty value as is."""
        source = self.Get('cl_builder_source')
        if not source:
            return source
        return source.getType()
class VariableClBuilderDiskDev(Variable):
    """
    Disk or directory where the image will be deployed
    """
    untrusted = True
    opt = ["-d", "--disk"]
    metavalue = "DEST"

    def init(self):
        """Set the translated label and help text."""
        self.label = _("Assemble location")
        self.help = _("partition or directory intended for assemble")

    def get(self):
        """Return the value given by the device spool, or an empty string."""
        spool = self.Get('cl_builder_device_spool')
        return spool.get() or ""

    def check(self, value):
        """
        Validate the build destination.

        Raises VariableError when a /dev path is not listed in
        ``install.os_disk_dev``, when the value is not an absolute path,
        or when the destination is already mounted.
        """
        if value.startswith("/dev"):
            # Look up the udev DEVNAME, falling back to the given value.
            device_name = getUdevDeviceInfo(name=value).get('DEVNAME', value)
            if device_name not in self.Get('install.os_disk_dev'):
                raise VariableError(_("Wrong device '%s'") % value)
        elif not value.startswith('/'):
            raise VariableError(
                _("Wrong directory '%s'") % value)
        if isMount(value):
            raise VariableError(
                _("Destination '%s' is already in use") % value)
class VariableClBuilderDiskSize(ReadonlyVariable):
    """
    Free space available at the build location
    """
    type = "int"

    def init(self):
        """Set the translated label."""
        self.label = _("Free disk space")

    def get_free_for(self, dn):
        """
        Return the available space reported by ``df`` for ``dn``.

        ``dn`` is tried first and then its parent directory. The value
        printed by ``df --output=avail`` is multiplied by 1024. Returns 0
        when ``df`` gives no data line or the value is not an integer.
        """
        try:
            for dfdn in (dn, path.dirname(dn)):
                dfProcess = process("/bin/df", "--output=avail", dfdn)
                data = dfProcess.readlines()
                # data[0] is the column header, data[1] the value.
                if len(data) > 1:
                    return int(data[1].strip()) * 1024
        except ValueError:
            return 0
        return 0

    def get_create(self):
        """
        Return the size for the 'create' action as a string.

        For a /dev device the size is selected from ``install.os_disk_size``;
        for a directory it is the free space found by get_free_for().
        "0" is returned when nothing is known.
        """
        device = self.Get('cl_builder_disk_dev')
        if device:
            if device.startswith('/dev/'):
                return self.Select('install.os_disk_size',
                                   where='install.os_disk_dev',
                                   eq=device, limit=1) or "0"
            else:
                return str(self.get_free_for(device))
        return "0"

    def get(self):
        """Return the size as a numeric string ("0" outside 'create')."""
        if self.Get('cl_action') == 'create':
            return self.get_create()
        # No size is computed for the other actions ('break', 'restore',
        # ...). Return "0" instead of None so the value is always a numeric
        # string, like the fallbacks in get_create().
        return "0"

    def humanReadable(self):
        """Return the size formatted by humanreadableSize()."""
        # ``or 0`` keeps the conversion safe if the value is empty.
        return humanreadableSize(int(self.Get() or 0))
class VariableClBuilderLayeredSet(Variable):
    """
    The build will be performed in a layer
    """
    type = "bool"
    opt = ["--layered"]

    def init(self):
        """Set the translated help text and label."""
        self.help = _("use layer for build")
        self.label = _("Use layer for build")

    def get(self):
        """Layers are off by default."""
        return "off"

    def check(self, value):
        """
        Raise VariableError when the build location is a /dev path or the
        source is an ArchiveDistributive.
        """
        # NOTE(review): ``value`` itself is not inspected here, so the
        # checks also apply when the option is "off" -- confirm intended.
        disk_dev = self.Get('cl_builder_disk_dev')
        if disk_dev.startswith('/dev'):
            raise VariableError(
                _("Layers are used for building in a directory only"))
        source = self.Get('cl_builder_source')
        if isinstance(source, distr.ArchiveDistributive):
            raise VariableError(
                _("Layers are used for building from iso image"))
class VariableClBuilderPath(ReadonlyVariable):
    """
    Path where the distribution will be built
    """
    def get(self):
        """Return the target's directory, or "" when there is no target."""
        target = self.Get('cl_builder_target')
        return target.getDirectory() if target else ""
class VariableClBuilderTarget(Variable):
    """
    Object of the distribution being built
    """
    type = "object"

    def get_create(self):
        """
        Build the target distributive for the 'create' action.

        Returns "" when no build location is set. Otherwise returns a
        LayeredDistributive when layers are enabled, a PartitionDistributive
        for a /dev location, or a DirectoryDistributive for anything else.
        """
        location = self.Get('cl_builder_disk_dev')
        if not location:
            return ""
        build_id = self.Get('cl_builder_id')
        mount_dir = path.join(distr.DefaultMountPath.BaseMountPath, build_id)
        source = self.Get('cl_builder_source_filename')
        if self.GetBool('cl_builder_layered_set'):
            return distr.LayeredDistributive(mount_dir, location, source)
        if location.startswith('/dev'):
            dist_class = distr.PartitionDistributive
        else:
            dist_class = distr.DirectoryDistributive
        return dist_class(location, mdirectory=mount_dir)

    def get_worked(self):
        """Return the distributive of the existing build, or ""."""
        build = self.Get('cl_builder_build')
        return build.distributive if build else ""

    def get(self):
        """Dispatch on ``cl_action``; unknown actions give ""."""
        action = self.Get('cl_action')
        if action == 'create':
            return self.get_create()
        if action in ('break', 'restore', 'image'):
            return self.get_worked()
        return ""
class VariableClBuilderClearSet(Variable):
    """
    Clear the distribution when the build is detached
    """
    type = "bool"
    value = "off"
    opt = ["--clear"]

    def init(self):
        """Set the translated label and help text."""
        self.label = _("Clear after unmount")
        self.help = _("clear data after unmount")
class VariableClBuilderBuild(Variable):
    """
    Build object
    """
    type = "object"

    def get_create(self):
        """
        Return a new Build for the current id, target and storage, or ""
        when there is no target distributive.
        """
        target = self.Get('cl_builder_target')
        if not target:
            return ""
        storage = self.Get('cl_builder_storage')
        build_id = self.Get('cl_builder_id')
        return Build(build_id, target, storage)

    def get_worked(self):
        """
        Return the stored build for the current id, or "" if there is none.

        When the build has a distributive, ``reserve()`` is called on it and
        on each of its children.
        """
        storage = self.Get('cl_builder_storage')
        build_id = self.Get('cl_builder_id')
        build = storage.get_build(build_id)
        if not build:
            return ""
        if build.distributive:
            build.distributive.reserve()
            for child in build.distributive.childs:
                child.reserve()
        return build

    def get(self):
        """Dispatch on ``cl_action``; unknown actions give ""."""
        action = self.Get('cl_action')
        if action == 'create':
            return self.get_create()
        elif action in ('break', 'restore', 'image'):
            return self.get_worked()
        else:
            return ""
class VariableClBuilderNewId(BaseBuildId):
    """
    Build id used on deployment
    """
    value = ""
    untrusted = True

    def check(self, value):
        """
        Raise VariableError when the id is empty or already present in the
        build storage.
        """
        if not value:
            raise VariableError(_("Please specify the id"))
        storage = self.Get('cl_builder_storage')
        if value in storage:
            raise VariableError(_("Assemble %s already exists")%value)
class VariableClBuilderPreparedId(BaseBuildId):
    """
    Id of a deployed build
    """
    type = "choice"
    untrusted = True

    @as_list
    def available(self):
        """
        List the usable build ids: every existing build for the 'break'
        action, otherwise only builds whose status is WORKED.
        """
        storage = self.Get('cl_builder_storage')
        action = self.Get('cl_action')
        for build_id in storage:
            build = storage.get_build(build_id)
            if not build:
                continue
            if action == 'break' or build.status == Build.Status.WORKED:
                yield build_id

    def get(self):
        """Default to the only available id; "" if there is not exactly one."""
        ids = self.available()
        return ids[0] if len(ids) == 1 else ""

    @as_list
    def choice(self):
        """List (id, label) pairs; broken builds get a "(broken)" suffix."""
        storage = self.Get('cl_builder_storage')
        for build_id in storage:
            build = storage.get_build(build_id)
            if not build:
                continue
            if build.status == Build.Status.BROKEN:
                label = "%s (%s)" % (build_id, _("broken"))
            else:
                label = build_id
            yield (build_id, label)

    def check(self, value):
        """
        Raise VariableError when no id is given, when no build is
        available, or when the build is BROKEN and the action is not
        'break'.
        """
        if not value:
            raise VariableError(_("Please select the assemble id"))
        if not self.available():
            raise VariableError(_("Assemble %s is not found") % value)
        build = self.Get('cl_builder_build')
        if build.status == Build.Status.BROKEN:
            if self.Get('cl_action') != 'break':
                raise VariableError(
                    _("Assemble %s is broken, try to restore build") % value)
class VariableClBuilderBrokenId(BaseBuildId):
    """
    Id of a broken build (selected for the 'restore' action)
    """
    type = "choice"
    untrusted = True

    @as_list
    def available(self):
        """List the ids in the build storage whose build is BROKEN."""
        storage = self.Get('cl_builder_storage')
        for build_id in storage:
            build = storage.get_build(build_id)
            if build and build.status == Build.Status.BROKEN:
                yield build_id

    @is_action('restore')
    def get(self):
        """Default to the only broken id; "" if there is not exactly one."""
        ids = self.available()
        if len(ids) == 1:
            return ids[0]
        return ""

    @as_list
    @is_action('restore', default_value=[])
    def choice(self):
        """List (id, label) pairs for the broken builds."""
        storage = self.Get('cl_builder_storage')
        for build_id in storage:
            build = storage.get_build(build_id)
            # Skip ids for which the storage returns no build, exactly as
            # available() does; otherwise ``build.status`` raises
            # AttributeError on a missing build.
            if build and build.status == Build.Status.BROKEN:
                yield (build_id, "%s (%s)" % (build_id, _("broken")))

    def check(self, value):
        """
        Raise VariableError when no id is given or no broken build exists.
        """
        if not value:
            raise VariableError(_("Please select the assemble id"))
        if not self.available():
            raise VariableError(_("Assemble %s is not found") % value)
class VariableClBuilderId(ReadonlyVariable):
    """
    Common build id
    """
    def get(self):
        """
        Return the id from the variable that matches the current action.

        Returns None for an action that is not listed.
        """
        source_by_action = {
            'create': 'cl_builder_new_id',
            'break': 'cl_builder_prepared_id',
            'image': 'cl_builder_prepared_id',
            'restore': 'cl_builder_broken_id',
        }
        varname = source_by_action.get(self.Get('cl_action'))
        if varname is None:
            return None
        return self.Get(varname)
class VariableOsBuilderMakeopts(Variable):
    """
    MAKEOPTS parameters
    """
    def get(self):
        """Default to the value of ``install.os_install_makeopts``."""
        return self.Get('install.os_install_makeopts')
class VariableClBuilderBuildpkgSet(Variable):
    """
    Build binary packages in the build
    """
    type = "bool"
    value = "off"
class VariableClBuilderBasePath(Variable):
    """
    Base path of the builds (the directory where finished iso images,
    binary packages, etc. will be placed)
    """
    value = "/var/calculate/remote/assemble"
class VariableClBuilderParentPath(ReadonlyVariable):
    """
    Path made of ".." components leading up to the parent system
    """
    def get(self):
        """
        Return one ".." per non-empty component of ``cl_builder_path``,
        joined with "/" (an empty string for an empty path).
        """
        builder_path = self.Get('cl_builder_path')
        # Count with a list comprehension: ``len(filter(...))`` only works
        # on Python 2, where filter() returns a list.
        depth = len([part for part in builder_path.split('/') if part])
        return "/".join([".."] * depth)
class VariableClBuilderPkgdir(Variable):
    """
    Path of the binary archives being built
    """
    def fallback(self):
        """Return ``<cl_builder_base_path>/<cl_builder_id>/packages``."""
        return path.join(self.Get('cl_builder_base_path'),
                         self.Get('cl_builder_id'), "packages")

    def get(self):
        """
        Return the package directory for the current action.

        'create' uses the fallback path. 'restore' and 'image' use the
        build's ``pkgdir`` when it is set and the fallback otherwise. Any
        other action gives "".
        """
        action = self.Get('cl_action')
        if action == 'create':
            return self.fallback()
        elif action in ('restore', 'image'):
            build = self.Get('cl_builder_build')
            # cl_builder_build is "" when no build is found; an empty value
            # has no ``pkgdir`` attribute, so use the fallback directly.
            if not build:
                return self.fallback()
            return build.pkgdir or self.fallback()
        else:
            return ""
class VariableClBuilderAction(ReadonlyVariable):
    """
    Additional image creation action: iso, squash.
    """
    value = ""
class VariableClBuilderImageFilename(ReadonlyVariable):
    """
    Name of the iso image
    """
    opt = ["--iso"]
    value = ""
    metavalue = "IMAGE"

    def init(self):
        """Set the translated label and help text."""
        self.label = _("Image name")
        self.help = _("set image name")

    def get(self):
        """
        Return ``<base>/<build id>/linux/<shortname>-<build>-<arch>.iso``,
        or "" when there is no build id.
        """
        base_dn = self.Get('cl_builder_base_path')
        build_id = self.Get('cl_builder_id')
        if not build_id:
            return ""
        name_parts = (self.Get('os_builder_linux_shortname'),
                      self.Get('os_builder_linux_build'),
                      self.Get('os_builder_arch_machine'))
        imagename = "%s-%s-%s.iso" % name_parts
        return path.join(base_dn, build_id, "linux", imagename)
class VariableClBuilderIsoBasePath(Variable):
    """
    Base path where the data to be packed into the iso is prepared
    """
    value = "/var/calculate/tmp"
class VariableClBuilderIsoPath(ReadonlyVariable):
    """
    Path where the data to be packed into the iso is prepared
    """
    def get(self):
        """
        Return the first path that does not exist yet among
        ``<base>/iso-<id>``, ``<base>/iso-<id>.0000``, ``.0001``, ...

        Returns "" when there is no build id. If every checked candidate
        exists, the next numbered name is returned without being checked.
        """
        base_dn = self.Get('cl_builder_iso_base_path')
        build_id = self.Get('cl_builder_id')
        if not build_id:
            return ""
        directory = path.join(base_dn, "iso-%s" % build_id)
        candidate = directory
        for suffix in range(0, 9999):
            if not path.exists(candidate):
                return candidate
            candidate = "%s.%04d" % (directory, suffix)
        return candidate
class VariableClBuilderSquashPath(ReadonlyVariable):
    """
    Path from the iso to the squash contents
    """
    @is_action('image')
    def get(self):
        """Return ``cl_builder_path`` relative to ``cl_builder_iso_path``."""
        builder_path = self.Get('cl_builder_path')
        iso_path = self.Get('cl_builder_iso_path')
        return path.relpath(builder_path, iso_path)
class VariableClBuilderImage(ReadonlyVariable):
    """
    Image being created
    """
    @is_action('image')
    def get(self):
        """
        Return an IsoDistributive for the image file name, the iso
        preparation directory and the iso label.
        """
        image_name = self.Get('cl_builder_image_filename')
        iso_dir = self.Get('cl_builder_iso_path')
        label = self.Get('cl_builder_iso_label')
        return distr.IsoDistributive(image_name, bdirectory=iso_dir,
                                     vol_id=label)
class VariableClBuilderLiveSet(Variable):
    """
    Apply only live templates on the first boot
    """
    type = "bool"
    value = "on"
class VariableClBuilderCdname(ReadonlyVariable):
    """
    Type of iso (CD/DVD)
    """
    @is_action("image")
    def get(self):
        """
        Return "DVD" when the squash image, kernel and initrd plus 2 MiB
        exceed 700 MiB, "CD" otherwise, and "" when any of the three files
        is not readable.
        """
        squashfile = pathJoin(self.Get('cl_builder_iso_path'),
                              self.Get('cl_builder_current_squash'))
        kernelfile = pathJoin(self.Get('cl_builder_iso_path'),
                              self.Get('cl_builder_squash_path'),
                              'boot',
                              self.Get('cl_builder_kernel'))
        initrdfile = pathJoin(self.Get('cl_builder_iso_path'),
                              self.Get('cl_builder_squash_path'),
                              'boot',
                              self.Get('cl_builder_initrd_install'))
        payload = (squashfile, kernelfile, initrdfile)
        if not all(os.access(fn, os.R_OK) for fn in payload):
            return ""
        isosize = sum(path.getsize(fn) for fn in payload) + 2 * 1024 * 1024
        return "DVD" if isosize > 700 * 1024 * 1024 else "CD"
class VariableClBuilderIsoLabel(Variable):
    """
    LABEL for the iso
    """
    @is_action("image")
    def get(self):
        """Return ``<SHORTNAME>-<build>`` with the short name upper-cased."""
        shortname = self.Get('os_builder_linux_shortname').upper()
        build = self.Get('os_builder_linux_build')
        return "%s-%s" % (shortname, build)
class VariableClBuilderRootParam(Variable):
    """
    root= parameter for the livecd
    """
    @is_action("image")
    def get(self):
        """Return ``live:LABEL=<iso label>`` for the 'image' action."""
        return "live:LABEL=%s" % self.Get('cl_builder_iso_label')
class VariableClBuilderCurrentSquash(ReadonlyVariable):
    """
    The livecd.squash being created
    """
    value = "livecd.squashfs"
class VariableClBuilderKernelCmd(ReadonlyVariable):
    """
    Default parameters for calcboot
    """
    value = ""
class VariableClBuilderKernelVer(ReadonlyVariable):
    """
    Current kernel version
    """
    def init(self):
        """Set the translated label."""
        self.label = _("Kernel version")

    def get(self):
        """
        Return the version of the kernel sources that ``usr/src/linux``
        points to inside ``cl_builder_path``.

        Returns None when ``cl_builder_path`` is empty. A ValueError from
        get_current_kernel_src() is not caught here.
        """
        prefix = self.Get('cl_builder_path')
        if prefix:
            current_src = self.get_current_kernel_src(prefix)
            src = path.join(prefix, current_src)
            return self.get_src_kernel_version(src)

    def get_config_version(self, configfile):
        """
        Return the version found in the header of a kernel ``.config``.

        Only the first 200 characters are examined. Returns None when the
        file does not exist or the header does not match.
        """
        re_config = re.compile(r"Automatically generated file;.*\n"
                               r".*?Linux/\S+\s+(\S+)\s", re.M)
        if path.exists(configfile):
            with open(configfile) as f:
                match = re_config.search(f.read(200))
                if match:
                    return match.group(1)

    def get_src_kernel_version(self, kernel_src):
        """
        Get the kernel version from ``.config``, falling back to the
        VERSION/PATCHLEVEL/SUBLEVEL/EXTRAVERSION lines of the Makefile.

        Returns "" when neither source gives a version.
        """
        config_path = path.join(kernel_src, ".config")
        makefile_path = path.join(kernel_src, "Makefile")

        # get version from config
        version = self.get_config_version(config_path)
        if version:
            return version

        # get version from Makefile
        # The separator after "EXTRAVERSION =" is optional: an empty
        # EXTRAVERSION is written as "EXTRAVERSION =" with nothing after
        # the equals sign, which a mandatory space would never match.
        # [ \t]* is used instead of \s* so the match cannot run past the
        # end of the line.
        re_makefile = re.compile(r"^VERSION = (\S+)\n"
                                 r"PATCHLEVEL = (\S+)\n"
                                 r"SUBLEVEL = (\S+)\n"
                                 r"EXTRAVERSION =[ \t]*(\S*)\n", re.M)
        if path.exists(makefile_path):
            with open(makefile_path) as f:
                match = re_makefile.search(f.read(200))
                if match:
                    return "{0}.{1}.{2}{3}".format(*match.groups())
        return ""

    def get_current_kernel_src(self, prefix):
        """
        Return the kernel source directory, relative to ``prefix``, that the
        ``usr/src/linux`` symlink points to.

        Raises ValueError when the symlink is missing, dangling, or not a
        symlink at all.
        """
        src_path = "usr/src"
        current_linux_src = path.join(src_path, "linux")
        symlink_kernel = path.join(prefix, current_linux_src)
        if not path.exists(symlink_kernel) or not path.islink(symlink_kernel):
            raise ValueError("Failed to determine current kernel version")
        return path.join(src_path, os.readlink(symlink_kernel))
class KernelData(ReadonlyVariable):
    """
    Data about the current kernel
    """
    kernel_object = "kernel"
    file_description = ""

    def filter(self, x, version=None):
        """Return the candidates unchanged; subclasses narrow them down."""
        return x

    def list(self, prefix='/', bootdir='boot'):
        """
        Return the files in ``<prefix>/<bootdir>`` whose type description
        contains ``file_description``.
        """
        return self.get_files_by_type(path.join(prefix, bootdir),
                                      self.file_description)

    def get(self):
        """
        Return the base name of the most recently modified matching file,
        or "" when nothing matches.
        """
        prefix = self.Get('cl_builder_path')
        version = self.Get('cl_builder_kernel_ver')
        candidates = self.filter(self.list(prefix), version=version)
        newest = max_default(candidates, key=path.getmtime, default="")
        if not newest:
            return newest
        return path.basename(newest)

    def get_files_by_type(self, pathname, descr):
        """
        Yield the full paths in ``pathname`` whose type description, as
        given by typeFile, contains ``descr``.
        """
        describe = typeFile(magic=0x4).getMType
        for filename in listDirectory(pathname, fullPath=True):
            if descr in describe(filename):
                yield filename
class VariableClBuilderInitrdInstall(KernelData):
    """
    Current initrd
    """
    file_description = "ASCII cpio archive"

    def init(self):
        """Set the translated label."""
        self.label = _("Init RAM fs")

    def filter(self, iterable, version=None):
        """Yield the initrd files whose kernel version equals ``version``."""
        for initrd_path in iterable:
            initrd_version = InitrdFile(initrd_path).get_kernel_version()
            if initrd_version == version:
                yield initrd_path
class VariableClBuilderKernel(KernelData):
    """
    Current kernel
    """
    file_description = "boot executable bzImage"

    def init(self):
        """Set the translated label."""
        self.label = _("Kernel file")

    def filter(self, iterable, version=None):
        """
        Yield the kernel images whose type description reports ``version``.

        Files whose description carries no "bzImage, version ..." part are
        skipped.
        """
        ftype = typeFile(magic=0x4).getMType
        re_kver = re.compile(r"bzImage, version (\S+)\s")
        for fn in iterable:
            m = re_kver.search(ftype(fn))
            # search() returns None when the description has no version;
            # without this check m.group(1) raises AttributeError.
            if m and m.group(1) == version:
                yield fn
class VariableClBuilderTemplateLocation(Variable):
    """
    Only distribution templates are installed
    """
    def get(self):
        """
        Return ``main.cl_template_location`` without the 'remote' and
        'local' entries.
        """
        kept = []
        for location in self.Get('main.cl_template_location'):
            if location in ('remote', 'local'):
                continue
            kept.append(location)
        return kept
class VariableClBuilderBranchName(Variable):
    """
    Overlay branches used for building the system
    """
    type = "list"

    def get(self):
        """Return the value of ``update.cl_update_branch_name``."""
        # TODO: stub
        return self.Get('update.cl_update_branch_name')