You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
calculate-utils-2.2-lib/pym/update_config/cl_update_config.py

370 lines
16 KiB

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

#-*- coding: utf-8 -*-
# Copyright 2010 Calculate Ltd. http://www.calculate-linux.org
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
__version__ = "2.2.1"
__app__ = "calculate-lib"
import sys, os, stat, re
from shutil import copy2
from cl_log import log
import cl_datavars
import cl_template
from cl_print import color_print as old_color_print
from cl_utils import runOsCommand, scanDirectory, pathJoin
import cl_overriding
from cl_utils import getModeFile
import cl_lang
# Перевод модуля
tr = cl_lang.lang()
tr.setLocalDomain('cl_lib')
tr.setLanguage(sys.modules[__name__])
cl_overriding.printERROR = lambda x:filter(lambda y:color_print().printERROR(y),
str(x).splitlines())
class writeLog:
"""Класс логгирования"""
logger = log("apply-templates",
filename="/var/log/calculate/update_config.log",
formatter="%(asctime)s - %(levelname)s - %(message)s")
class color_print(old_color_print, writeLog):
"""Класс для переопределения печати сообщений"""
_printObj = old_color_print()
def printERROR(self, *arg, **argv):
"""Вывод на печать ошибки"""
# Запись в log ошибки
self.logger.error(arg[0])
self._printObj.printERROR(*arg, **argv)
def printSUCCESS(self, *arg, **argv):
"""Вывод на печать в случае успеха без [ok] справа"""
# Запись в log информации
self.logger.info(arg[0])
self._printObj.printSUCCESS(*arg, **argv)
def printWARNING(self, *arg, **argv):
"""Вывод на печать предупреждения"""
# Запись в log предупреждения
self.logger.warn(arg[0])
self._printObj.printWARNING(*arg, **argv)
def printOK(self, *arg, **argv):
"""Вывод на печать в случае успеха с [ok] справа"""
# Запись в log информации
self.logger.info(arg[0])
self._printObj.printOK(*arg, **argv)
def printNotOK(self, *arg, **argv):
"""Вывод на печать в случае сбоя"""
# Запись в log предупреждения
self.logger.error(arg[0])
self._printObj.printNotOK(*arg, **argv)
class DataVarsObject(cl_datavars.DataVars):
"""Класс переменных для десктопа"""
packagePath = "/usr/lib/calculate-2.2"
def __init__(self, nameProgram):
cl_datavars.DataVars.__init__(self)
self.nameProgram = nameProgram
def findPathVars(self):
pymPath = os.path.join(self.packagePath, self.nameProgram, "pym")
if os.path.isdir(pymPath) and\
len(filter(lambda x:x.startswith("cl_vars_") and\
x.endswith(".py") or x.startswith("cl_fill_") and\
x.endswith(".py"), os.listdir(pymPath)))==2:
importPath = os.path.abspath(pymPath)
if not importPath in sys.path:
sys.path.insert(0, importPath)
return True
return False
def importDataObject(self, **args):
'''Заполнить конфигурацию переменных, для десктопа'''
# заполнить переменные окружения алгоритмом по умолнанию
sectName = self.nameProgram.rpartition("-")[2]
self.importData(sectName, ('cl_vars_%s' %sectName,
'cl_fill_%s' %sectName))
self.flIniFile()
class shareUpdateConfigs(color_print, writeLog):
"""Общие методы для обновления конфигурационных файлов"""
reCleanVer = re.compile("\d+\.?\d*\.?\d*")
_tupleOn = ("on","On","oN","ON")
def getFlagUpdAndInstPrograms(self):
"""Получаем флаг обновления и
установленные программы работающие с шаблонами"""
clVars = cl_datavars.DataVars()
clVars.flIniFile()
flagUpdate = clVars.Get("cl_autoupdate_set") in self._tupleOn
return flagUpdate, clVars.GetList("cl_merges")
class updateUserConfigs(shareUpdateConfigs):
"""Обновление пользовательских конфигурационных файлов"""
def getXUsers(self):
"""Имена пользователей в X сессии"""
xSession = 0
foundTwoSession = False
retCode, resWho = runOsCommand("who")
xUsers = []
if retCode==0:
if resWho:
listProcessing = lambda x: (x[0], x[1], x[4])\
if len(x)==5 else []
xUsers = list(set(filter(lambda x: x!="root",
map(lambda x: x[0],
filter(lambda x: x and\
(x[2].startswith("(:") or \
x[1].startswith(":")),
map(lambda x: listProcessing(\
filter(lambda y: y, x.split(" "))),
resWho))))))
else:
self.printERROR(_("Can not execute 'who'"))
return False
return xUsers
def updateConfig(self, nameProgram, category, version):
"""Обновление конфигурационных файлов у пользователей"""
# флаг обновления и программы используемые для наложения шаблонов
flagUpdate, mergePrograms = self.getFlagUpdAndInstPrograms()
# Пользователи в X сессии
xUsers = self.getXUsers()
if not xUsers:
self.logger.info(_("Package %s") %nameProgram)
self.logger.warn(_("Not found X sessions users"))
return True
cleanVer = self.reCleanVer.search(version)
if cleanVer:
version = cleanVer.group()
self.logger.info(_("Package %s") %nameProgram)
self.logger.info(_("Update desktop configuration files"))
if "calculate-desktop" in mergePrograms:
mergePrograms = ["calculate-desktop"]
else:
mergePrograms = []
dictPakkages = {}
listIndex = []
# Добавление условия, что программа category/nameProgram
# установлена
cl_template.templateFunction.installProg.update(\
{"%s/%s"%(category,nameProgram):[version],
"%s"%(nameProgram):[version]})
for mergeProgram in mergePrograms:
for userName in xUsers:
clVars = DataVarsObject(mergeProgram)
if not clVars.findPathVars():
continue
clVars.importDataObject()
clVars.Set("ur_login", userName, True)
clVars.Set("cl_action", "desktop", True)
clVars.Set("cl_belong_pkg", nameProgram, True)
clTempl = cl_template.template(clVars, printWarning=False)
dirsFiles = clTempl.applyTemplates()
if dirsFiles is False:
self.printERROR(\
_("Error using templates for the user %s")\
%userName)
for errMess in clTempl.getError().splitlines():
self.printERROR(errMess)
return False
if dirsFiles and dirsFiles[1]:
nameAndVerPkg = clVars.Get("cl_name")+"-"+\
clVars.Get("cl_ver")
if not nameAndVerPkg in dictPakkages:
listIndex.append(nameAndVerPkg)
dictPakkages[nameAndVerPkg] = []
dictPakkages[nameAndVerPkg].append((userName,
sorted(list(set(dirsFiles[1])))))
if dictPakkages:
for calcPkg in listIndex:
self.printWARNING(_("Package %s has changed files")\
%calcPkg+":")
for userName, configFiles in dictPakkages[calcPkg]:
self.printWARNING(" "*2 + _("User %s")%userName + ":")
for nameConfigFile in configFiles:
self.printWARNING(" "*5 + nameConfigFile)
if not dictPakkages:
self.logger.warn(_("Not found templates"))
return True
class updateSystemConfigs(shareUpdateConfigs):
"""Обновление системных конфигурационных файлов"""
def isExistsProtectFiles(self, configPath):
"""Есть ли в защищенных директориях конфигурационные файлы"""
if not "CONFIG_PROTECT" in os.environ:
self.printERROR(_("Missing environment variable CONFIG_PROTECT"))
exit(1)
protectPaths = ["/etc"] + filter(lambda x: x.strip(),
os.environ["CONFIG_PROTECT"].split(" "))
flagFoundProtect = False
for pPath in protectPaths:
fPath = os.path.join(configPath, pPath[1:])
if os.path.exists(fPath) and os.listdir(fPath):
flagFoundProtect = True
break
if not flagFoundProtect:
return False
return True
def scanProtectDirs(self, configPath):
configFiles = []
scanObj = scanDirectory()
scanObj.processingFile = lambda path,prefix:configFiles.append(path) or\
True
protectPaths = ["/etc"] + filter(lambda x: x.strip(),
os.environ["CONFIG_PROTECT"].split(" "))
configPath = os.path.realpath(configPath)
for pPath in protectPaths:
realPath = pathJoin(configPath, pPath)
if os.path.exists(realPath):
scanObj.scanningDirectory(realPath)
configFiles = map(lambda x: x.partition(configPath)[2], configFiles)
configFiles = map(lambda x: pathJoin('/',x), configFiles)
return configFiles
def createDir(self, configPath, dstDir):
"""Создание директории в случае необходимости"""
if os.path.exists(dstDir):
return True
def splPath(path):
listPath = []
if path in ("","/"):
return []
base, p = os.path.split(path)
listPath.append(p)
while(not base in ("","/")):
base, p = os.path.split(base)
listPath.append(p)
listPath.reverse()
return listPath
notFoundPaths = []
path = "/"
for p in splPath(dstDir):
path = os.path.join(path,p)
if not os.path.exists(path):
notFoundPaths.append(path)
for mkPath in notFoundPaths:
srcPath = pathJoin(configPath, mkPath)
dMode, dUid, dGid = getModeFile(srcPath)
os.mkdir(mkPath, dMode)
os.chown(mkPath, dUid, dGid)
return True
def copyConfigFiles(self, configPath):
"""Копирование конфигурационных файлов"""
configDstFiles = self.scanProtectDirs(configPath)
if configDstFiles:
self.logger.warn(_("Replace files:"))
for dst in configDstFiles:
src = pathJoin(configPath, dst)
if src != dst:
dstDir = os.path.dirname(dst)
self.createDir(configPath, dstDir)
copy2(src, dst)
sUid, sGid = getModeFile(src, mode="owner")
os.chown(dst, sUid, sGid)
self.logger.warn(" "*5 + dst)
return True
def updateConfig(self, nameProgram, category, version, configPath):
"""Обновление системных конфигурационных файлов"""
# флаг обновления и программы используемые для наложения шаблонов
flagUpdate, mergePrograms = self.getFlagUpdAndInstPrograms()
# Проверка есть ли в пакете защищенные конфигурационные файлы
if not self.isExistsProtectFiles(configPath):
self.logger.info(_("Package %s") %nameProgram)
self.logger.warn(_("Not found protected files"))
return True
cleanVer = self.reCleanVer.search(version)
if cleanVer:
version = cleanVer.group()
self.logger.info(_("Package %s") %nameProgram)
self.logger.info(_("Update system cofiguration files"))
if not os.path.exists(configPath):
self.printERROR(_("Path '%s' does not exist")%configPath)
return False
dictPakkages = {}
listIndex = []
# Добавление условия, что программа category/nameProgram установлена
cl_template.templateFunction.installProg.update(\
{"%s/%s"%(category,nameProgram):[version],
"%s"%(nameProgram):[version]})
clTempl = False
for mergeProgram in mergePrograms:
clVars = DataVarsObject(mergeProgram)
if not clVars.findPathVars():
continue
clVars.importDataObject()
clVars.Set("cl_root_path", configPath, True)
clVars.Set("cl_belong_pkg", nameProgram, True)
clVars.Set("cl_action", 'merge', True)
configFiles = []
nameProg = clVars.Get("cl_name")
if nameProg == "calculate-install":
configFiles = self.scanProtectDirs(configPath)
if configFiles:
cltObject = cl_template.templateClt(clVars)
cltObject.filterApplyTemplates = configFiles
clTempl = cl_template.template(clVars, cltObj=cltObject,
printWarning=False)
else:
clTempl = cl_template.template(clVars, cltObj=False,
printWarning=False)
dirsFiles = clTempl.applyTemplates()
nameAndVerPkg = nameProg + "-"+clVars.Get("cl_ver")
if dirsFiles is False:
self.printERROR(_("Error template in a package %s")\
%nameAndVerPkg)
for errMess in clTempl.getError().splitlines():
self.printERROR(errMess)
return False
if dirsFiles and dirsFiles[1]:
if not nameAndVerPkg in listIndex:
listIndex.append(nameAndVerPkg)
dictPakkages[nameAndVerPkg] =\
sorted(list(set(dirsFiles[1])))
if dictPakkages:
for calcPkg in listIndex:
self.printWARNING(_("Package %s has changed files")%calcPkg+":")
for nameF in dictPakkages[calcPkg]:
nameFile = nameF.partition(configPath)[2]
if nameFile[:1] != "/":
nameFile = "/" + nameFile
self.printWARNING(" "*5 + nameFile)
else:
self.logger.warn(_("Not found templates"))
if flagUpdate:
self.copyConfigFiles(configPath)
if clTempl and clTempl.getWarning():
cl_overriding.printSUCCESS("")
for warn in clTempl.getWarning().split("\n"):
self.printWARNING(warn)
cl_overriding.printSUCCESS("")
return True