#-*- coding: utf-8 -*- # Copyright 2010 Calculate Ltd. http://www.calculate-linux.org # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. __version__ = "2.2.16" __app__ = "calculate-lib" import sys, os, stat, re from shutil import copy2 from cl_log import log import cl_datavars import cl_template from cl_print import color_print as old_color_print from cl_utils import runOsCommand, scanDirectory, pathJoin import cl_overriding from cl_utils import getModeFile import cl_lang # Перевод модуля tr = cl_lang.lang() tr.setLocalDomain('cl_lib') tr.setLanguage(sys.modules[__name__]) cl_overriding.printERROR = lambda x:filter(lambda y:color_print().printERROR(y), str(x).splitlines()) class writeLog: """Класс логгирования""" logger = log("apply-templates", filename="/var/log/calculate/update_config.log", formatter="%(asctime)s - %(levelname)s - %(message)s") class DataVars(cl_datavars.DataVars): flagNotFoundVar = False def Get(self, *args, **kwargs): try: valVars = cl_datavars.DataVars.Get(self, *args, **kwargs) except self.DataVarsError, e: valVars = "" self.flagNotFoundVar = True return valVars def Set(self, *args, **kwargs): try: valVars = cl_datavars.DataVars.Set(self, *args, **kwargs) except self.DataVarsError, e: valVars = "" self.flagNotFoundVar = True return valVars class template(cl_template.template): def getApplyHeadTemplate(self, *args, **kwargs): if self.objVar.flagNotFoundVar: self.objVar.flagNotFoundVar = False return ([], False) filesApply, objHeadNew =\ cl_template.template.getApplyHeadTemplate(self, *args, **kwargs) if self.objVar.flagNotFoundVar: self.objVar.flagNotFoundVar = False return ([], False) else: return filesApply, objHeadNew def getApplyHeadDir(self, *args, **kwargs): if self.objVar.flagNotFoundVar: self.objVar.flagNotFoundVar = False return ("", False, []) pathDir, objHeadDir, createdDirs =\ cl_template.template.getApplyHeadDir(self, *args, **kwargs) if self.objVar.flagNotFoundVar: self.objVar.flagNotFoundVar = False return ("", False, []) else: return pathDir, objHeadDir, createdDirs class templateClt(cl_template.templateClt): def getApplyHeadTemplate(self, *args, **kwargs): if self.objVar.flagNotFoundVar: self.objVar.flagNotFoundVar = False return ([], False) filesApply, objHeadNew =\ cl_template.templateClt.getApplyHeadTemplate(self, *args, **kwargs) if self.objVar.flagNotFoundVar: self.objVar.flagNotFoundVar = False return ([], False) else: return filesApply, objHeadNew def getApplyHeadDir(self, *args, **kwargs): if self.objVar.flagNotFoundVar: self.objVar.flagNotFoundVar = False return ("", False, []) pathDir, objHeadDir, createdDirs =\ cl_template.templateClt.getApplyHeadDir(self, *args, **kwargs) if self.objVar.flagNotFoundVar: self.objVar.flagNotFoundVar = False return ("", False, []) else: return pathDir, objHeadDir, createdDirs class color_print(old_color_print, writeLog): """Класс для переопределения печати сообщений""" _printObj = old_color_print() def printERROR(self, *arg, **argv): """Вывод на печать ошибки""" # Запись в log ошибки self.logger.error(arg[0]) self._printObj.printERROR(*arg, **argv) def printSUCCESS(self, *arg, **argv): """Вывод на печать в случае успеха без [ok] справа""" # Запись в log информации self.logger.info(arg[0]) self._printObj.printSUCCESS(*arg, **argv) def printWARNING(self, *arg, **argv): """Вывод на печать предупреждения""" # Запись в log предупреждения self.logger.warn(arg[0]) self._printObj.printWARNING(*arg, **argv) def printOK(self, *arg, **argv): """Вывод на печать в случае успеха с [ok] справа""" # Запись в log информации self.logger.info(arg[0]) self._printObj.printOK(*arg, **argv) def printNotOK(self, *arg, **argv): """Вывод на печать в случае сбоя""" # Запись в log предупреждения self.logger.error(arg[0]) self._printObj.printNotOK(*arg, **argv) class DataVarsObject(DataVars): """Класс переменных для десктопа""" packagePath = "/usr/lib/calculate-2.2" def __init__(self, nameProgram): DataVars.__init__(self) self.nameProgram = nameProgram def findPathVars(self): pymPath = os.path.join(self.packagePath, self.nameProgram, "pym") if os.path.isdir(pymPath) and\ len(filter(lambda x:x.startswith("cl_vars_") and\ x.endswith(".py") or x.startswith("cl_fill_") and\ x.endswith(".py"), os.listdir(pymPath)))==2: importPath = os.path.abspath(pymPath) if not importPath in sys.path: sys.path.insert(0, importPath) return True return False def importDataObject(self, **args): '''Заполнить конфигурацию переменных, для десктопа''' # заполнить переменные окружения алгоритмом по умолнанию sectName = self.nameProgram.rpartition("-")[2] self.importData(sectName, ('cl_vars_%s' %sectName, 'cl_fill_%s' %sectName)) self.flIniFile() class shareUpdateConfigs(color_print, writeLog): """Общие методы для обновления конфигурационных файлов""" _tupleOn = ("on","On","oN","ON") def getFlagUpdAndInstPrograms(self): """Получаем флаг обновления и установленные программы работающие с шаблонами""" clVars = DataVars() clVars.flIniFile() flagUpdate = clVars.Get("cl_autoupdate_set") in self._tupleOn return flagUpdate, clVars.GetList("cl_merges") class updateUserConfigs(shareUpdateConfigs): """Обновление пользовательских конфигурационных файлов""" def getXUsers(self): """Имена пользователей в X сессии""" xSession = 0 foundTwoSession = False retCode, resWho = runOsCommand("who") xUsers = [] if retCode==0: if resWho: listProcessing = lambda x: (x[0], x[1], x[4])\ if len(x)==5 else [] xUsers = list(set(filter(lambda x: x!="root", map(lambda x: x[0], filter(lambda x: x and\ (x[2].startswith("(:") or \ x[1].startswith(":")), map(lambda x: listProcessing(\ filter(lambda y: y, x.split(" "))), resWho)))))) else: self.printERROR(_("Can not execute 'who'")) return False return xUsers def updateConfig(self, nameProgram, category, version): """Обновление конфигурационных файлов у пользователей""" # флаг обновления и программы используемые для наложения шаблонов flagUpdate, mergePrograms = self.getFlagUpdAndInstPrograms() # Пользователи в X сессии xUsers = self.getXUsers() if not xUsers: self.logger.info(_("Package %s") %nameProgram) self.logger.warn(_("Not found X sessions users")) return True self.logger.info(_("Package %s") %nameProgram) self.logger.info(_("Update desktop configuration files")) if "calculate-desktop" in mergePrograms: mergePrograms = ["calculate-desktop"] else: mergePrograms = [] dictPakkages = {} listIndex = [] # Добавление условия, что программа category/nameProgram # установлена cl_template.templateFunction.installProg.update(\ {"%s/%s"%(category,nameProgram):[version], "%s"%(nameProgram):[version]}) for mergeProgram in mergePrograms: for userName in xUsers: clVars = DataVarsObject(mergeProgram) if not clVars.findPathVars(): continue clVars.importDataObject() clVars.Set("ur_login", userName, True) clVars.Set("cl_action", "desktop", True) clVars.Set("cl_belong_pkg", nameProgram, True) clTempl = template(clVars, printWarning=False) dirsFiles = clTempl.applyTemplates() if dirsFiles is False: self.printERROR(\ _("Error using templates for the user %s")\ %userName) for errMess in clTempl.getError().splitlines(): self.printERROR(errMess) return False if dirsFiles and dirsFiles[1]: nameAndVerPkg = clVars.Get("cl_name")+"-"+\ clVars.Get("cl_ver") if not nameAndVerPkg in dictPakkages: listIndex.append(nameAndVerPkg) dictPakkages[nameAndVerPkg] = [] dictPakkages[nameAndVerPkg].append((userName, sorted(list(set(dirsFiles[1]))))) if dictPakkages: for calcPkg in listIndex: self.printWARNING(_("Package %s has changed files")\ %calcPkg+":") for userName, configFiles in dictPakkages[calcPkg]: self.printWARNING(" "*2 + _("User %s")%userName + ":") for nameConfigFile in configFiles: self.printWARNING(" "*5 + nameConfigFile) if not dictPakkages: self.logger.warn(_("Not found templates")) return True class updateSystemConfigs(shareUpdateConfigs): """Обновление системных конфигурационных файлов""" def isExistsProtectFiles(self, configPath): """Есть ли в защищенных директориях конфигурационные файлы""" if not "CONFIG_PROTECT" in os.environ: self.printERROR(_("Missing environment variable CONFIG_PROTECT")) exit(1) protectPaths = ["/etc"] + filter(lambda x: x.strip(), os.environ["CONFIG_PROTECT"].split(" ")) flagFoundProtect = False for pPath in protectPaths: fPath = os.path.join(configPath, pPath[1:]) if os.path.exists(fPath) and os.listdir(fPath): flagFoundProtect = True break if not flagFoundProtect: return False return True def scanProtectDirs(self, configPath): configFiles = [] scanObj = scanDirectory() scanObj.processingFile = lambda path,prefix:configFiles.append(path) or\ True protectPaths = ["/etc"] + filter(lambda x: x.strip(), os.environ["CONFIG_PROTECT"].split(" ")) configPath = os.path.realpath(configPath) for pPath in protectPaths: realPath = pathJoin(configPath, pPath) if os.path.exists(realPath): scanObj.scanningDirectory(realPath) configFiles = map(lambda x: x.partition(configPath)[2], configFiles) configFiles = map(lambda x: pathJoin('/',x), configFiles) return configFiles def createDir(self, configPath, dstDir): """Создание директории в случае необходимости""" if os.path.exists(dstDir): return True def splPath(path): listPath = [] if path in ("","/"): return [] base, p = os.path.split(path) listPath.append(p) while(not base in ("","/")): base, p = os.path.split(base) listPath.append(p) listPath.reverse() return listPath notFoundPaths = [] path = "/" for p in splPath(dstDir): path = os.path.join(path,p) if not os.path.exists(path): notFoundPaths.append(path) for mkPath in notFoundPaths: srcPath = pathJoin(configPath, mkPath) dMode, dUid, dGid = getModeFile(srcPath) os.mkdir(mkPath, dMode) os.chown(mkPath, dUid, dGid) return True def copyConfigFiles(self, configPath): """Копирование конфигурационных файлов""" configDstFiles = self.scanProtectDirs(configPath) if configDstFiles: self.logger.warn(_("Replace files:")) for dst in configDstFiles: src = pathJoin(configPath, dst) if src != dst: dstDir = os.path.dirname(dst) self.createDir(configPath, dstDir) copy2(src, dst) sMode, sUid, sGid = getModeFile(src) os.chown(dst, sUid, sGid) os.chmod(dst, sMode) self.logger.warn(" "*5 + dst) return True def copyDirOrFile(self, src, dst, configPath): if src != dst: if os.path.isfile(src): dstDir = os.path.dirname(dst) self.createDir(configPath, dstDir) copy2(src, dst) sMode, sUid, sGid = getModeFile(src) os.chown(dst, sUid, sGid) os.chmod(dst, sMode) elif os.path.isdir(src): self.createDir(configPath, dst) sMode, sUid, sGid = getModeFile(src) os.chown(dst, sUid, sGid) os.chmod(dst, sMode) def updateConfig(self, nameProgram, category, version, configPath): """Обновление системных конфигурационных файлов""" # флаг обновления и программы используемые для наложения шаблонов flagUpdate, mergePrograms = self.getFlagUpdAndInstPrograms() self.logger.info(_("Package %s") %nameProgram) self.logger.info(_("Update system cofiguration files")) if not os.path.exists(configPath): self.printERROR(_("Path '%s' does not exist")%configPath) return False dictPakkages = {} listIndex = [] # Добавление условия, что программа category/nameProgram установлена cl_template.templateFunction.installProg.update(\ {"%s/%s"%(category,nameProgram):[version], "%s"%(nameProgram):[version]}) clTempl = False for mergeProgram in mergePrograms: clVars = DataVarsObject(mergeProgram) if not clVars.findPathVars(): continue clVars.importDataObject() clVars.Set("cl_root_path", configPath, True) clVars.Set("cl_belong_pkg", nameProgram, True) clVars.Set("cl_action", 'merge', True) configFiles = [] nameProg = clVars.Get("cl_name") if nameProg == "calculate-install": configFiles = self.scanProtectDirs(configPath) if configFiles: cltObject = templateClt(clVars) cltObject.filterApplyTemplates = configFiles clTempl = template(clVars, cltObj=cltObject, printWarning=False) else: clTempl = template(clVars, cltObj=False, printWarning=False) dirsFiles = clTempl.applyTemplates() nameAndVerPkg = nameProg + "-"+clVars.Get("cl_ver") if dirsFiles is False: self.printERROR(_("Error template in a package %s")\ %nameAndVerPkg) for errMess in clTempl.getError().splitlines(): self.printERROR(errMess) return False copyFiles = clTempl.autoUpdateFiles copyDirs = clTempl.autoUpdateDirs allCopyAutoupdateFiles = copyDirs + copyFiles for fileOrDir in allCopyAutoupdateFiles: dst = "/" + fileOrDir.partition(configPath)[2] self.copyDirOrFile(fileOrDir, dst, configPath) if dirsFiles and dirsFiles[1]: if not nameAndVerPkg in listIndex: listIndex.append(nameAndVerPkg) dictPakkages[nameAndVerPkg] =\ sorted(list(set(dirsFiles[1]))) if dictPakkages: for calcPkg in listIndex: self.printWARNING(_("Package %s has changed files")%calcPkg+":") for nameF in dictPakkages[calcPkg]: nameFile = nameF.partition(configPath)[2] if nameFile: if nameFile[:1] != "/": nameFile = "/" + nameFile else: nameFile = nameF self.printWARNING(" "*5 + nameFile) else: self.logger.warn(_("Not found templates")) if flagUpdate: self.copyConfigFiles(configPath) if clTempl and clTempl.getWarning(): cl_overriding.printSUCCESS("") for warn in clTempl.getWarning().split("\n"): self.printWARNING(warn) cl_overriding.printSUCCESS("") return True