Source code for gpi.stateMachine

#    Copyright (C) 2014  Dignity Health
#
#    This program is free software: you can redistribute it and/or modify
#    it under the terms of the GNU Lesser General Public License as published by
#    the Free Software Foundation, either version 3 of the License, or
#    (at your option) any later version.
#
#    This program is distributed in the hope that it will be useful,
#    but WITHOUT ANY WARRANTY; without even the implied warranty of
#    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
#    GNU Lesser General Public License for more details.
#
#    You should have received a copy of the GNU Lesser General Public License
#    along with this program.  If not, see <http://www.gnu.org/licenses/>.
#
#    NO CLINICAL USE.  THE SOFTWARE IS NOT INTENDED FOR COMMERCIAL PURPOSES
#    AND SHOULD BE USED ONLY FOR NON-COMMERCIAL RESEARCH PURPOSES.  THE
#    SOFTWARE MAY NOT IN ANY EVENT BE USED FOR ANY CLINICAL OR DIAGNOSTIC
#    PURPOSES.  YOU ACKNOWLEDGE AND AGREE THAT THE SOFTWARE IS NOT INTENDED FOR
#    USE IN ANY HIGH RISK OR STRICT LIABILITY ACTIVITY, INCLUDING BUT NOT
#    LIMITED TO LIFE SUPPORT OR EMERGENCY MEDICAL OPERATIONS OR USES.  LICENSOR
#    MAKES NO WARRANTY AND HAS NO LIABILITY ARISING FROM ANY USE OF THE
#    SOFTWARE IN ANY HIGH RISK OR STRICT LIABILITY ACTIVITIES.


import gpi
from gpi import QtCore
from .logger import manager

# start logger for this module
log = manager.getLogger(__name__)


[docs]class GPIState(QtCore.QObject): """A single FSM state to be used with the GPI_FSM class. It manages its transitions, entry and exit functions. """ entered = gpi.Signal() exited = gpi.Signal() def __init__(self, name, func, machine=None, efunc=None): super(GPIState, self).__init__() if not isinstance(name, str): msg = "expecting str arg: GPIState.__init__(>str<,func,GPI_FSM)" log.critical(msg) if machine: if not isinstance(machine, GPI_FSM): msg = "expecting GPI_FSM arg: GPIState.__init__(str,func,>GPI_FSM<)" log.critical(msg) self._name = str(name) self._transitions = {} self._func = func # on entry self._efunc = efunc # on exit if machine: machine.addState(self) def addTransition(self, sig, state): if not isinstance(sig, str): msg = "expecting str arg: GPIState.addTransition(>str<,GPIState)" log.critical(msg) self._transitions[sig] = state @property def name(self): return self._name def transitions(self): return self._transitions def onEntry(self, sig): self.entered.emit() if self._func: self._func(sig) def onExit(self, sig): self.exited.emit() if self._efunc: self._efunc(sig)
[docs]class GPI_FSM(QtCore.QObject): """The GPI Canvase and Nodes operate within different states. The finite state machine allows for fast and easy checking to determine whether the user has performed an invalid operation or not. """ switched = gpi.Signal() def __init__(self, name=''): super(GPI_FSM, self).__init__() self._name = name self._states = [] self._cur_state = None @property def curState(self): return self._cur_state @property def curStateName(self): return self._cur_state.name @property def name(self): return self._name def next(self, dsig): '''Validate input signal as part of _cur_state, then switch to indicated state.''' if isinstance(dsig, str): sig = str(dsig) dsig = sig elif isinstance(dsig, str): sig = str(dsig) dsig = sig elif isinstance(dsig, dict): if 'sig' in dsig: if isinstance(dsig['sig'], str) or isinstance(dsig['sig'], str): sig = str(dsig['sig']) dsig['sig'] = sig else: msg = "expecting str in dict[\'sig\'] in arg: GPI_FSM(" + self._name + ").next(>str<)" log.critical(msg) else: msg = "expecting str key \'sig\' in arg: GPI_FSM(" + self._name + ").next(>str<)" log.critical(msg) return else: msg = "expecting str (or str in dict[\'sig\']) arg: GPI_FSM(" + \ self._name + ").next(>str<) type:" + str(type(dsig)) log.critical(msg) return if sig in self._cur_state.transitions(): self._cur_state.onExit(dsig) self._cur_state = self._cur_state.transitions()[sig] msg = "GPI_FSM(" + self._name + "):next(): Switched to state(" + \ self._cur_state.name + ")" log.debug(msg) self._cur_state.onEntry(dsig) self.switched.emit() else: msg = "GPI_FSM(" + self._name + "):next(): state(" + self._cur_state.name + \ ") has no transition: \'" + str( sig) + "\'\n" + str(self._cur_state.transitions()) log.debug(msg) def addState(self, state): if state in self._states: msg = "EMPHATIC WARNING!!!: GPI_FSM(" + self._name + "):addState(): Warning, state(" + state.name \ + ") already exists, skipping..." log.critical(msg) else: self._states.append(state) def start(self, state): if state in self._states: self._cur_state = state msg = "GPI_FSM(" + self._name + \ "):start(): in state(" + state.name + ")" log.debug(msg) self._cur_state.onEntry('init') else: msg = "GPI_FSM(" + self._name + "):start(): ERROR, state(" + state.name \ + ") state not in list, can't start GPI_FSM!" log.critical(msg)