# Copyright (C) 2014 Dignity Health
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
# NO CLINICAL USE. THE SOFTWARE IS NOT INTENDED FOR COMMERCIAL PURPOSES
# AND SHOULD BE USED ONLY FOR NON-COMMERCIAL RESEARCH PURPOSES. THE
# SOFTWARE MAY NOT IN ANY EVENT BE USED FOR ANY CLINICAL OR DIAGNOSTIC
# PURPOSES. YOU ACKNOWLEDGE AND AGREE THAT THE SOFTWARE IS NOT INTENDED FOR
# USE IN ANY HIGH RISK OR STRICT LIABILITY ACTIVITY, INCLUDING BUT NOT
# LIMITED TO LIFE SUPPORT OR EMERGENCY MEDICAL OPERATIONS OR USES. LICENSOR
# MAKES NO WARRANTY AND HAS NO LIABILITY ARISING FROM ANY USE OF THE
# SOFTWARE IN ANY HIGH RISK OR STRICT LIABILITY ACTIVITIES.
import gc
import time
import numpy as np # for 32bit-Pipe hack
import traceback
import multiprocessing
import gpi
from gpi import QtCore
from .dataproxy import DataProxy, ProxyType
from .defines import GPI_PROCESS, GPI_THREAD, GPI_APPLOOP
from .logger import manager
from .sysspecs import Specs
# start logger for this module
log = manager.getLogger(__name__)
class ReturnCodes(object):
# Return codes from the functor have specific meaning to the node internals.
InitUIError = 2
ValidateError = 1
Success = 0
ComputeError = -1
def isComputeError(self, ret):
return ret == self.ComputeError
def isValidateError(self, ret):
return ret == self.ValidateError
def isInitUIError(self, ret):
return ret == self.InitUIError
# Return codes from the nodeAPI (i.e. compute(), validate() and initUI())
# are either success or failure for each function.
def isSuccess(self, ret):
return (ret is None) or (ret == 0)
def isError(self, ret):
return (ret is not None) and (ret != 0)
Return = ReturnCodes() # make a global copy
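# Illustrative usage sketch (not part of the module API): node hook return
# values are funneled through this singleton elsewhere in this file, e.g.:
#   if Return.isError(retcode):
#       retcode = Return.ComputeError
#   if Return.isSuccess(retcode):
#       pass  # compute()/validate()/initUI() returning None or 0 is success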
def ExecRunnable(runnable):
tp = QtCore.QThreadPool.globalInstance()
#print 'active threads: ', tp.activeThreadCount()
#print 'expiry timeout: ', tp.expiryTimeout()
#print 'maxThreadCount: ', tp.maxThreadCount()
tp.start(runnable)
class GPIRunnable(QtCore.QRunnable):
def __init__(self, func):
super(GPIRunnable, self).__init__()
self.run = func
self.setAutoDelete(True)
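# Usage sketch (illustrative only): GPIRunnable wraps a zero-argument
# callable so it can be scheduled on the global Qt thread pool, e.g.:
#   runnable = GPIRunnable(lambda: log.debug("background work"))
#   ExecRunnable(runnable)
# This is how GPIFunctor defers its post-process setData() calls below.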
class GPIFunctor(QtCore.QObject):
'''A common parent API for each execution type (i.e. ATask, PTask, TTask).
Handles data communication to and from each task type. '''
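# Rough lifecycle, as implemented below: start() runs validate() in the
# app loop, then launches compute() through ATask, TTask or PTask depending
# on the node's execution type; computeFinished() collects the return code,
# and for GPI_PROCESS the queued proxy items are replayed onto the node
# (applyQueuedData) before the finished signal is emitted.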
finished = gpi.Signal(int)
terminated = gpi.Signal()
applyQueuedData_finished = gpi.Signal()
_setData_finished = gpi.Signal()
def __init__(self, node, parent=None):
super(GPIFunctor, self).__init__(parent)
self._node = node
self._title = node.name
self._func = node.getModuleCompute()
self._validate = node.getModuleValidate()
self._retcode = None
self._validate_retcode = None
# for applying data when a GPI_PROCESS is finished
# this is done in a thread to keep the GUI responsive
self._applyData_thread = None
self._setData_finished.connect(self.applyQueuedData_finalMatter)
self.applyQueuedData_finished.connect(self.finalMatter)
self._ap_st_time = 0
# flag for segmented types that need reconstitution on this side
self._segmentedDataProxy = False
# On Windows, force GPI_PROCESS execution to GPI_THREAD for now to be safe
self._execType = node._nodeIF.execType()
if Specs.inWindows() and (self._execType == GPI_PROCESS):
#if (self._execType == GPI_PROCESS):
log.info("init(): <<< WINDOWS Detected >>> Forcing GPI_PROCESS -> GPI_THREAD")
self._execType = GPI_THREAD
#self._execType = GPI_APPLOOP
self._label = node._nodeIF.getLabel()
self._isTerminated = False
self._compute_start = 0
self._manager = None
self._proxy = None
self._proc = None
if self._execType == GPI_PROCESS:
log.debug("init(): set as GPI_PROCESS: "+str(self._title))
self._manager = multiprocessing.Manager()
self._proxy = self._manager.list()
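# The Manager-backed list is the cross-process return channel: the forked
# compute() appends items to it (see addToQueue() and PTask.run()), and
# they are replayed on this side in applyQueuedData() after the process
# exits.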
self._proc = PTask(self._func, self._title, self._label, self._proxy)
# apply data in a thread to make the GUI more responsive
self._applyData_thread = GPIRunnable(self.applyQueuedData_setData)
elif self._execType == GPI_THREAD:
log.debug("init(): set as GPI_THREAD: "+str(self._title))
self._proc = TTask(self._func, self._title, self._label, self._proxy)
else: # default to GPI_APPLOOP
log.debug("init(): set as GPI_APPLOOP: "+str(self._title))
self._proc = ATask(self._func, self._title, self._label, self._proxy)
self._proc.finished.connect(self.computeFinished)
self._proc.terminated.connect(self.computeTerminated)
def execType(self):
return self._execType
def terminate(self):
self._isTerminated = True
self.cleanup()
self._proc.terminate()
# self.wait() # so that something is waiting
self.computeTerminated()
def cleanup(self):
# make sure the proxy manager for processes is shut down.
if self._manager:
self._manager.shutdown()
# try to minimize leftover memory from the segmented array transfers
# force cleanup of mmap
#if self._segmentedDataProxy:
gc.collect()
def curTime(self):
return time.time() - self._compute_start
def start(self):
self._compute_start = time.time()
# VALIDATE
# temporarily trick all widget calls to use GPI_APPLOOP for validate()
tmp_exec = self._execType
self._execType = GPI_APPLOOP
try:
self._validate_retcode = self._validate()
except:
self._validate_retcode = 1 # validate error
self._execType = tmp_exec
# treat None as zero
# send the validate() return code through the same channels
if self._validate_retcode != 0 and self._validate_retcode is not None:
self._node.appendWallTime(time.time() - self._compute_start)
self.finished.emit(1) # validate error
return
# COMPUTE
if self._execType == GPI_PROCESS:
log.debug("start(): buffer process parms")
self._node._nodeIF.bufferParmSettings()
# Keep objects on death-row from being copied into the child process
# before they've finally terminated; otherwise they'll try to terminate
# within the child process and cause a fork error.
log.debug('start(): garbage collect before spawning GPI_PROCESS')
gc.collect()
log.debug("start(): call task.start()")
self._proc.start()
def wait(self):
self._proc.wait()
def isRunning(self):
return self._proc.isRunning()
def returnCode(self):
return self._retcode
# GPI_PROCESS support
def addToQueue(self, item):
# add internal calls (port, widget, retcode...)
# to a queue that is processed after compute()
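# Illustrative item shapes, inferred from how applyQueuedData() and
# applyQueuedData_setData() unpack them:
#   ['setData', <portname>, <data or DataProxy>]
#   ['modifyWdg', <arg1>, <arg2>]   (forwarded to self._node.modifyWdg())
#   ['setReQueue', <flag>]
#   ['retcode', <return value>]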
self._proc._proxy.append(item)
def computeTerminated(self):
self.terminated.emit()
def computeFinished(self):
if self._execType == GPI_PROCESS:
self.applyQueuedData()
else:
self._retcode = 0 # success
if Return.isError(self._proc._retcode):
self._retcode = Return.ComputeError
self.finalMatter()
def finalMatter(self):
log.info("computeFinished():Node \'"+str(self._title)+"\': compute time:"+str(time.time() - self._compute_start)+" sec.")
self._node.appendWallTime(time.time() - self._compute_start)
self.finished.emit(self._retcode) # success
def applyQueuedData_setData(self):
for o in self._proxy:
try:
log.debug("applyQueuedData_setData(): apply object "+str(o[0])+', '+str(o[1]))
if o[0] == 'setData':
# DataProxy is used for complex data types like numpy
if type(o[2]) is DataProxy:
# segmented types must be gathered before reassembly
if o[2].isSegmented():
log.debug("seg proxy is True")
self._segmentedDataProxy = True
else:
log.debug("o[2].getData()")
self._node.setData(o[1], o[2].getData())
# all other simple types get set directly
else:
log.debug("direct setData()")
self._node.setData(o[1], o[2])
except:
log.error("applyQueuedData() failed. "+str(traceback.format_exc()))
self._retcode = Return.ComputeError
self._setData_finished.emit()
# Assemble Segmented Data
if self._segmentedDataProxy:
log.warn("Using segmented data proxy...")
# group all segmented types
oportData = [ o for o in self._proxy if (o[0] == 'setData') and (type(o[2]) is DataProxy) ]
# take only those that are segmented
oportData = [ o for o in oportData if o[2].isSegmented() ]
# consolidate all outports with large data
largeports = set([ o[1] for o in oportData ])
for port in largeports:
log.info("applyQueuedData(): ------ APPENDING SEGMENTED PROXY OBJECTS")
# gather port segs
curport = [o for o in oportData if o[1] == port]
# gather all DataProxy segs
segs = [ o[2] for o in curport ]
buf = DataProxy().getDataFromSegments(segs)
# if the pieces fail to assemble move on
if buf is None:
log.warn("applyQueuedData(): segmented proxy object failed to assemble, skipping...")
continue
self._node.setData(port, buf)
# run self.applyQueuedData_finalMatter()
self._setData_finished.emit()
def applyQueuedData(self):
# Replay all external compute() events after execution.
# This ensures that the target is not being used by another set method.
self._ap_st_time = time.time()
if self._isTerminated:
return
log.debug("applyQueuedData(): Sending data to main loop...")
if len(self._proxy) == 0:
log.debug("applyQueuedData(): no data in output queue. Terminated.")
self.computeTerminated()
return
self._segmentedDataProxy = False
for o in self._proxy:
try:
log.debug("applyQueuedData(): apply object "+str(o[0])+', '+str(o[1]) )
if o[0] == 'retcode':
self._retcode = o[1]
if Return.isError(self._retcode):
self._retcode = Return.ComputeError
else:
self._retcode = 0 # squash Nones
if o[0] == 'modifyWdg':
self._node.modifyWdg(o[1], o[2])
if o[0] == 'setReQueue':
self._node.setReQueue(o[1])
except:
log.error("applyQueuedData() failed. "+str(traceback.format_exc()))
self._retcode = Return.ComputeError
# transfer all setData() calls to a thread
log.debug("applyQueuedData(): run _applyData_thread")
ExecRunnable(self._applyData_thread)
def applyQueuedData_finalMatter(self):
if Return.isComputeError(self._retcode):
self.finished.emit(self._retcode)
elapsed = (time.time() - self._ap_st_time)
log.info("applyQueuedData(): time (total queue): "+str(elapsed)+" sec")
# shutdown the proxy manager
self.cleanup()
# start self.finalMatter
self.applyQueuedData_finished.emit()
class PTask(multiprocessing.Process, QtCore.QObject):
'''A forked process node task. Memmaps are used to communicate data.
NOTE: The spawning process has to check this task periodically to see if
it's still alive.
'''
finished = gpi.Signal()
terminated = gpi.Signal()
def __init__(self, func, title, label, proxy):
multiprocessing.Process.__init__(self)
QtCore.QObject.__init__(self)
self._func = func
self._title = title
self._label = label
self._proxy = proxy
self._cnt = 0
# Since we don't know when the process finishes, probe at regular
# intervals. It would be nicer to have the process check in with the
# main GPI process when it's done.
self._timer = QtCore.QTimer()
self._timer.timeout.connect(self.checkProcess)
self._timer.start(10) # 10msec update
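# checkProcess() polls is_alive() at this interval; once the child has
# exited, the presence of a 'retcode' entry in the proxy distinguishes a
# normal finish from an external termination.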
def run(self):
# This try/except is only good for catching compute() exceptions,
# not run() terminations.
try:
self._proxy.append(['retcode', self._func()])
except:
log.error('PROCESS: \''+str(self._title)+'\':\''+str(self._label)+'\' compute() failed.\n'+str(traceback.format_exc()))
self._proxy.append(['retcode', Return.ComputeError])
def terminate(self):
self._timer.stop()
super(PTask, self).terminate()
def wait(self):
self.join()
def isRunning(self):
return self.is_alive()
def retcodeExists(self):
for o in self._proxy:
if o[0] == 'retcode':
return True
return False
def checkProcess(self):
if self.is_alive():
return
# otherwise it's not alive:
self._timer.stop()
if self.retcodeExists():
# a retcode means compute() returned, so we assume the exit was deliberate.
self.finished.emit()
else:
self.terminated.emit()
class TTask(QtCore.QThread):
'''A QThread-based node runner. Data is communicated directly.
NOTE: The thread-type emits a signal when it's finished:
gpi.Signal.finished()
gpi.Signal.terminated()
'''
def __init__(self, func, title, label, proxy):
super(TTask, self).__init__()
self._func = func
self._title = title
self._label = label
self._proxy = proxy
self._retcode = None
# allow thread to terminate immediately
# NOTE: doesn't seem to work
self.setTerminationEnabled(True)
def terminate(self):
# Threads can't be terminated as cleanly as processes right now,
# so just let them run off in the background.
log.warn("WARNING: Terminated QThread-Node is backgrounded as a zombie.")
self.exit() # terminate when finished
def run(self):
# This try/except is only good for catching compute() exceptions,
# not run() terminations.
try:
self._retcode = self._func()
log.info("TTask _func() finished")
except:
log.error('THREAD: \''+str(self._title)+'\':\''+str(self._label)+'\' compute() failed.\n'+str(traceback.format_exc()))
self._retcode = Return.ComputeError
class ATask(QtCore.QObject):
'''App-Loop or Main-Loop executed task. This will block GUI updates. Data
is communicated directly.
NOTE: The apploop-type blocks until finished, obviating the need for
signals or timer checks.
'''
finished = gpi.Signal()
terminated = gpi.Signal()
def __init__(self, func, title, label, proxy):
super(ATask, self).__init__()
self._func = func
self._title = title
self._label = label
self._proxy = proxy
self._cnt = 0
def run(self):
# This try/except is only good for catching compute() exceptions,
# not run() terminations.
try:
self._retcode = self._func()
except:
log.error('APPLOOP: \''+str(self._title)+'\':\''+str(self._label)+'\' compute() failed.\n'+str(traceback.format_exc()))
self._retcode = Return.ComputeError
def terminate(self):
pass # can't happen b/c blocking mainloop
def wait(self):
pass # can't happen b/c blocking mainloop
def isRunning(self):
pass # can't happen b/c blocking mainloop
def quit(self):
pass
def start(self):
self.run()
self.finished.emit()