You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
calculate-utils-3-console-gui/bin/cl-update-checker

266 lines
8.2 KiB

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Copyright 2012-2016 Mir Calculate. http://www.calculate-linux.org
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
# logging.basicConfig(format='%(levelname)s:%(message)s', level=logging.DEBUG)
logger = logging.getLogger(__name__)
import os
from os import path
from PyQt5 import QtCore, QtGui, QtWidgets
import time
from calculate.consolegui.application.user_update import user_can_run_update
from calculate.lib.datavars import DataVars
import dbus
import dbus.service
import dbus.mainloop.glib
import pyinotify
from calculate.lib.utils.tools import debug
from calculate.consolegui.application.dbus_service import (
DBUS_NAME, DBUS_METHOD_APP, DBUS_MAINAPP, DBUS_APP_UPDATER,
DBUS_NAME_UPDATER)
from calculate.update.update_info import UpdateInfo
import sys
from calculate.lib.cl_lang import setLocalTranslate, getLazyLocalTranslate, _
setLocalTranslate('cl_consolegui3', sys.modules[__name__])
__ = getLazyLocalTranslate(_)
GUI_UPDATE_APP = DBUS_METHOD_APP % "update"
MINUTE = 60
def check_livecd_and_autocheck():
    """
    Decide whether cl-update-checker needs to be started.

    The checker must not run on a livecd or when the automatic check for
    updates is switched off.

    :return: False if 'os_root_type' is "livecd" or
        'update.cl_update_autocheck_set' is 'off', True otherwise.
    """
    datavars = DataVars()
    try:
        datavars.importData()
        datavars.flIniFile()
        # The root type is examined first; the autocheck variable is only
        # read when the system is not a livecd.
        if datavars.Get('os_root_type') == "livecd":
            return False
        autocheck_disabled = (
            datavars.Get('update.cl_update_autocheck_set') == 'off')
        return not autocheck_disabled
    finally:
        # Release the variables object on every exit path.
        datavars.close()
class SysTray(QtWidgets.QSystemTrayIcon):
    """
    System tray icon reflecting whether updates are available.

    When the icon is activated, the class first tries to reach an already
    running application over D-Bus (through ``parent.get_dbus_iface()``)
    and, if none answers, starts ``/usr/bin/cl-console-gui-update``.
    """
    # Pixmap used while updates are available.
    ICON = "/usr/share/pixmaps/calculate-console-update.svg"
    # Pixmap used when there are no updates.
    ICON_NO_UPDATE = "/usr/share/pixmaps/calculate-console-update-no.svg"

    def __init__(self, parent):
        """
        :param parent: owner object; this class calls its
            ``get_dbus_iface(app_name)`` and ``step()`` methods.
        """
        super().__init__()
        self.parent = parent
        # time.time() value before which an activation will not launch
        # the update GUI again (see systrayActivate).
        self.timeout = 0
        self.activated.connect(self.systrayActivate)

        self.icon_on = QtGui.QIcon()
        self.icon_on.addPixmap(QtGui.QPixmap(self.ICON))
        self.icon_off = QtGui.QIcon()
        self.icon_off.addPixmap(QtGui.QPixmap(self.ICON_NO_UPDATE))

        self.update(has_updates=False)
        self.setVisible(True)

    def run_cl_gui_update(self):
        """
        Start the update GUI in the background with its output discarded.
        """
        method = "update"
        cmd = '/usr/bin/cl-console-gui-%s' % method
        params = ["--skip-options"]
        # os.system() hands the line to /bin/sh, so the redirection has to
        # be POSIX: the bash-only "&>" form is parsed by other shells as
        # "background the command, then redirect nothing".
        os.system("{cmd} {params} >/dev/null 2>&1 &".format(
            cmd=cmd, params=" ".join(params)))

    def show_update(self):
        """
        Ask an already running application to present the update dialog.

        ``update()`` is called on the DBUS_MAINAPP object first; if that is
        not possible, ``show()`` is called on the GUI_UPDATE_APP object.

        :return: True as soon as one of the calls succeeds, False if
            neither object could be reached.
        """
        attempts = (
            (DBUS_MAINAPP, "update"),
            (GUI_UPDATE_APP, "show"),
        )
        for app_name, method_name in attempts:
            try:
                iface = self.parent.get_dbus_iface(app_name)
                if iface:
                    getattr(iface, method_name)()
                    return True
            except dbus.DBusException as e:
                # An unreachable peer is expected here; keep going with the
                # next candidate but leave a trace for debugging.
                logger.debug("D-Bus call %s() on %s failed: %s",
                             method_name, app_name, e)
        return False

    def systrayActivate(self, reason):
        """
        Slot connected to the ``activated`` signal of the tray icon.

        :param reason: activation reason supplied by Qt (not used).
        """
        if time.time() > self.timeout:
            # Allow at most one GUI launch per 5 seconds, so that repeated
            # clicks do not spawn several instances.
            self.timeout = time.time() + 5
            if not self.show_update():
                self.run_cl_gui_update()
            self.parent.step()
        else:
            self.show_update()

    def update(self, has_updates=False):
        """
        Switch icon, tooltip and visibility according to the update state.

        :param has_updates: True shows the icon with the "updates
            available" pixmap, False sets the "no updates" pixmap and
            hides the icon.
        """
        if has_updates:
            self.setVisible(True)
            self.setIcon(self.icon_on)
            self.setToolTip(_("Updates are available"))
        else:
            self.setIcon(self.icon_off)
            self.setToolTip(_("No updates"))
            self.setVisible(False)
class DBusChecker(dbus.service.Object):
    """
    D-Bus object exposing control methods of the update checker on the
    DBUS_NAME_UPDATER interface.
    """

    def __init__(self, name, session, parent):
        """
        :param name: first argument forwarded to
            ``dbus.service.Object.__init__``.
        :param session: second argument forwarded to
            ``dbus.service.Object.__init__``.
        :param parent: object owning the tray icon (``parent.systray``)
            and providing ``step()``.

        NOTE(review): despite their names, the first two parameters appear
        to carry the bus connection and the object path - confirm against
        the caller.
        """
        self.parent = parent
        # The base-class constructor is what exports this object to dbus.
        super().__init__(name, session)
        self._set_tray_visible(True)

    def _set_tray_visible(self, visible):
        """Show or hide the tray icon owned by the parent object."""
        tray = self.parent.systray
        tray.setVisible(visible)

    @dbus.service.method(DBUS_NAME_UPDATER, in_signature='', out_signature='')
    def hide_systray(self):
        """Hide the tray icon."""
        self._set_tray_visible(False)

    @dbus.service.method(DBUS_NAME_UPDATER, in_signature='', out_signature='')
    def show_systray(self):
        """Make the tray icon visible."""
        self._set_tray_visible(True)

    @dbus.service.method(DBUS_NAME_UPDATER, in_signature='', out_signature='b')
    def ping(self):
        """Always answer True, so callers can test that the service is up."""
        return True

    @dbus.service.method(DBUS_NAME_UPDATER, in_signature='', out_signature='')
    def quit(self):
        """Ask the Qt application to quit."""
        application = QtWidgets.qApp
        application.quit()

    @dbus.service.method(DBUS_NAME_UPDATER, in_signature='', out_signature='')
    def check(self):
        """Trigger an immediate re-check through ``parent.step()``."""
        self.parent.step()
class CheckThread(QtWidgets.QMainWindow, UpdateInfo):
    """
    Owner of the tray icon; decides when the icon has to be refreshed.

    The state is re-evaluated by ``step()``, which runs
      * once at construction time,
      * periodically, every ``interval`` seconds,
      * whenever a file named like ``UpdateInfo.update_file`` is created
        or deleted in its directory (watched through pyinotify).
    """
    # Period of the unconditional re-check, in seconds.
    interval = 5 * MINUTE
    # Period of the inotify queue poll, in milliseconds.
    notify_interval_msec = 1000

    def __init__(self, bus):
        """
        :param bus: D-Bus connection used to look up other applications.
        """
        super().__init__()
        UpdateInfo.__init__(self)
        self.bus = bus

        # Watch the directory of the update file: the file itself may not
        # exist yet, and its creation/removal is what is being detected.
        self.wm = pyinotify.WatchManager()
        self.notifier = pyinotify.Notifier(self.wm, timeout=1)
        self.wm.add_watch(path.dirname(UpdateInfo.update_file),
                          pyinotify.IN_DELETE | pyinotify.IN_CREATE,
                          self.event_step)

        # pyinotify is not integrated with the Qt event loop, so its queue
        # is drained from a timer. QTimer.start() takes milliseconds.
        self.already_timer = QtCore.QTimer(self)
        self.already_timer.timeout.connect(self.notify_step)
        self.already_timer.start(self.notify_interval_msec)

        self.check_timer = QtCore.QTimer(self)
        self.check_timer.timeout.connect(self.step)
        # ``interval`` is expressed in seconds, QTimer wants milliseconds.
        self.check_timer.start(self.interval * 1000)

        self.systray = SysTray(self)
        self.gui_runned = False
        self.systray.update(has_updates=False)
        self.step()

    def get_dbus_iface(self, app_name):
        """
        Return a ``dbus.Interface`` (interface DBUS_NAME) for the object
        ``app_name`` of the DBUS_NAME service.

        :param app_name: object path to look up.
        :return: the interface, or None if the lookup raised DBusException.
        """
        try:
            remote_object = self.bus.get_object(DBUS_NAME, app_name)
            return dbus.Interface(remote_object, DBUS_NAME)
        except dbus.DBusException:
            return None

    def ping_app(self, app_name):
        """
        Check that the object ``app_name`` answers a ``ping()`` call.

        :return: True if the call went through, False if the object is
            unavailable or the call raised DBusException.
        """
        try:
            obj = self.get_dbus_iface(app_name)
            if obj:
                obj.ping()
                return True
        except dbus.DBusException:
            pass
        return False

    def notify_step(self):
        """Timer slot: read and dispatch pending inotify events, if any."""
        if self.notifier.check_events(timeout=1):
            self.notifier.read_events()
            self.notifier.process_events()

    def event_step(self, event):
        """
        inotify callback for the watched directory.

        Only events concerning the update file itself cause a re-check;
        other files in the same directory are ignored.
        """
        if event.name == path.basename(UpdateInfo.update_file):
            self.step()

    def is_console_gui_run(self):
        """Return True if the DBUS_MAINAPP object answers a ping."""
        return self.ping_app(DBUS_MAINAPP)

    def is_gui_update_run(self):
        """Return True if the GUI_UPDATE_APP object answers a ping."""
        return self.ping_app(GUI_UPDATE_APP)

    def step(self):
        """
        Re-evaluate ``need_update()`` and refresh the tray icon to match.
        """
        self.systray.update(has_updates=bool(self.need_update()))
if __name__ == '__main__':
    # Detach from the launcher: the parent process returns at once and
    # only the forked child goes on to become the checker.
    if os.fork() != 0:
        sys.exit(0)

    if not user_can_run_update(True):
        sys.stderr.write(_("User can not to perform the system update") + "\n")
        sys.exit(1)

    if not check_livecd_and_autocheck():
        sys.stderr.write(
            _("No need to run a live cd or disable autoupdates") + "\n")
        sys.exit(1)

    app = QtWidgets.QApplication(sys.argv)

    # Give the desktop some time to bring up its system tray; after the
    # last delay the start-up continues regardless.
    for delay in (0.5, 1, 2, 5):
        if QtWidgets.QSystemTrayIcon.isSystemTrayAvailable():
            break
        time.sleep(delay)

    # Enable glib main loop support
    dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)

    # Get the session bus
    try:
        bus = dbus.SessionBus()
    except dbus.exceptions.DBusException:
        sys.exit(1)

    # Single-instance guard: if the updater object can already be obtained
    # from the bus, leave; any failure is treated as "not running yet".
    try:
        running = bus.get_object(DBUS_NAME_UPDATER, DBUS_APP_UPDATER)
        dbus.Interface(running, DBUS_NAME_UPDATER)
    except Exception:
        pass
    else:
        sys.exit(1)

    QtWidgets.QApplication.setQuitOnLastWindowClosed(False)
    QtWidgets.QApplication.setApplicationName(_("Calculate Update Checker"))

    # Export the service; the result stays bound to a variable for the
    # rest of the run, as in the original script.
    name = dbus.service.BusName(DBUS_NAME_UPDATER, bus)

    # Export the object
    ct = CheckThread(bus)
    DBusChecker(bus, DBUS_APP_UPDATER, ct)

    sys.exit(app.exec_())