# -*- coding: utf-8 -*- # Copyright 2018 Mir Calculate. http://www.calculate-linux.org # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. import sys import signal from os import environ from multiprocessing import Process, Queue print "Import QtGUI" try: from PyQt5 import QtCore from PyQt5 import QtGui from PyQt5 import QtWidgets class ControlledProgressDialog(QtWidgets.QProgressDialog): """ QProgressDialog controlled by pipe """ def __init__(self, inQueue, outQueue, *args, **kwargs): QtWidgets.QProgressDialog.__init__(self, *args, **kwargs) self.progress = QtWidgets.QProgressBar(self) self.progress.setFormat("%p%") self.setBar(self.progress) self.inQueue = inQueue self.outQueue = outQueue self.timer = QtCore.QTimer(self) if not hasattr(self.timer.timeout, "connect"): self.timer.timeout.connect = lambda x: x self.timer.timeout.connect(self.dispatcher) self.timer.start(50) self.center() def setTextVisible(self, visible): self.progress.setTextVisible(visible) def center(self): screen = QtWidgets.QDesktopWidget().screenGeometry() size = self.geometry() self.move((screen.width() - size.width()) / 2, (screen.height() - size.height()) / 2) @QtCore.pyqtSlot() def dispatcher(self): """ Dispatcher called by 50ms """ while not self.inQueue.empty(): cmd, args, ret = self.inQueue.get() if cmd == "quit": self.timer.stop() self.close() else: res = getattr(self, cmd)(*args) if ret: self.outQueue.put(res) self.center() def sigint_handler(*args): pass class ClMessageBox(object): """ ProgressDialog in other process """ proc = None def runProgress(self, message, typemes="warning"): signal.signal(signal.SIGINT, sigint_handler) app = QtWidgets.QApplication(sys.argv) getattr(QtWidgets.QMessageBox, typemes)( None, "", message, QtWidgets.QMessageBox.Close, QtWidgets.QMessageBox.Close) app.quit() def critical(self, message): self.proc = Process(target=self.runProgress, args=(message, "critical")) self.proc.start() self.proc.join() def warning(self, message): self.proc = Process(target=self.runProgress, args=(message, "warning")) self.proc.start() self.proc.join() class ClProgressDialog: """ ProgressDialog in other process """ homeDir = '/root' def runProgress(self, inQueue, outQueue): signal.signal(signal.SIGINT, sigint_handler) environ['HOME'] = self.homeDir app = QtWidgets.QApplication(sys.argv) progressDialog = ControlledProgressDialog(inQueue, outQueue) progressDialog.exec_() app.quit() setMaximum = None setTextVisible = None setValue = None methods = ["autoClose", "autoReset", "colorCount", "depth", "maximum", "minimum", "minimumDuration", "setLabelText", "setMaximum", "setMinimum", "setMinimumDuration", "setRange", "setValue", "setAutoClose", "setAutoReset", "setWindowTitle", "setCancelButton", "value", "setTextVisible", "adjustSize", "setStyleSheet"] def __init__(self): self.outQueue = Queue() self.finished = False self.inQueue = Queue() Process(target=self.runProgress, args=(self.outQueue, self.inQueue)).start() for method in self.methods: setattr(self, method, self.proxyCall(method)) def proxyCall(self, method): def wrapper(*args, **kwargs): needRet = kwargs.get('needRet', False) self.outQueue.put((method, args, needRet)) if needRet: return self.inQueue.get() return None return wrapper def quit(self): self.outQueue.put(("quit", (), False)) self.finished = True self.outQueue.close() self.inQueue.close() def finish(self): self.quit() def update(self, value): self.setMaximum(100) self.setTextVisible(True) self.setValue(min(value, 99)) except Exception: ControlledProgressDialog = None ClProgressDialog = None ClMessageBox = None