Source code for gpi.port

#    Copyright (C) 2014  Dignity Health
#
#    This program is free software: you can redistribute it and/or modify
#    it under the terms of the GNU Lesser General Public License as published by
#    the Free Software Foundation, either version 3 of the License, or
#    (at your option) any later version.
#
#    This program is distributed in the hope that it will be useful,
#    but WITHOUT ANY WARRANTY; without even the implied warranty of
#    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
#    GNU Lesser General Public License for more details.
#
#    You should have received a copy of the GNU Lesser General Public License
#    along with this program.  If not, see <http://www.gnu.org/licenses/>.
#
#    NO CLINICAL USE.  THE SOFTWARE IS NOT INTENDED FOR COMMERCIAL PURPOSES
#    AND SHOULD BE USED ONLY FOR NON-COMMERCIAL RESEARCH PURPOSES.  THE
#    SOFTWARE MAY NOT IN ANY EVENT BE USED FOR ANY CLINICAL OR DIAGNOSTIC
#    PURPOSES.  YOU ACKNOWLEDGE AND AGREE THAT THE SOFTWARE IS NOT INTENDED FOR
#    USE IN ANY HIGH RISK OR STRICT LIABILITY ACTIVITY, INCLUDING BUT NOT
#    LIMITED TO LIFE SUPPORT OR EMERGENCY MEDICAL OPERATIONS OR USES.  LICENSOR
#    MAKES NO WARRANTY AND HAS NO LIABILITY ARISING FROM ANY USE OF THE
#    SOFTWARE IN ANY HIGH RISK OR STRICT LIABILITY ACTIVITIES.


from gpi import QtCore, QtGui

# gpi
from .defines import PortTYPE, InPortTYPE, OutPortTYPE, REQUIRED, OPTIONAL
from .defines import GPI_PORT_EVENT, stw
from .defines import getKeyboardModifiers, printMouseEvent
from .logger import manager

# start logger for this module
log = manager.getLogger(__name__)


[docs]class Port(QtGui.QGraphicsItem): '''The base-class of the Node InPorts and OutPorts. This is responsible for Tool tips, serialization, drawing, painting, and type matching for connectivity. ''' Type = PortTYPE def __init__(self, nodeWidget, CanvasBackend, portTitle, portNum, intype=None, dtype=None, ndim=None, menuWidget=None): super(Port, self).__init__() self._id = None self.setID() self.node = nodeWidget self.menuWidget = menuWidget # a reference to a module menu widget self.graph = CanvasBackend self.edgeList = [] self.newPos = QtCore.QPointF() self.setFlag(QtGui.QGraphicsItem.ItemSendsGeometryChanges) self.setCacheMode(QtGui.QGraphicsItem.DeviceCoordinateCache) self.setZValue(3) ## TYPE info self._GPIType = intype # basic data type # for NPY arrays self._dtype = dtype # array data type self._ndim = ndim self._obligation = None # inport use only # a link to the data self._data = None # position self.portNum = portNum self.portTitle = portTitle if portTitle is None: self.portTitle = str(portNum) # port shape self.largenessFact = 3.0 self.pointsCoord = [[0.0, 0.0], [7.0, 0.0], [3.5, 5]] self.setTransformOriginPoint(3.5, 5) if isinstance(self, OutPort): self.setTransformOriginPoint(3.5, 0) # self.portShape = trianglePolygon = QtGui.QPolygonF() self.portShape = QtGui.QPolygonF() for i in self.pointsCoord: self.portShape.append(QtCore.QPointF(i[0], i[1])) # box # self.pointsCoord_canvasConnect = [[0.0, 0.0], [7.0, 0.0], [7.0, 5], [0.0, 5]] # bowtie #self.pointsCoord_canvasConnect = [[0.0, 0.0], [7.0, 0.0], [3.5, 2.5], [0.0, 5.0], [7.0, 5.0], [3.5, 2.5]] #self.portShape_canvasConnect = QtGui.QPolygonF() #for i in self.pointsCoord_canvasConnect: # self.portShape_canvasConnect.append(QtCore.QPointF(i[0], i[1])) # box # self.pointsCoord_canvasConnect = [[0.0, 0.0], [7.0, 0.0], [7.0, 5], [0.0, 5]] # bowtie self.pointsCoord_memSave = [[0.0, 0.0], [7.0, 0.0], [3.5, 2.5], [0.0, 5.0], [7.0, 5.0], [3.5, 2.5]] self.portShape_memSave = QtGui.QPolygonF() for i in self.pointsCoord_memSave: self.portShape_memSave.append(QtCore.QPointF(i[0], i[1])) self.setCursor(QtCore.Qt.CrossCursor) self.setAcceptHoverEvents(True) self._beingHovered = False self.updateToolTip() # src or sink self._canvasConnect = False # save memory self._savemem = False def triggerHoverLeaveEvent(self): e = QtCore.QEvent(QtCore.QEvent.GraphicsSceneHoverLeave) self.hoverLeaveEvent(e) def BeingHovered(self): return self._beingHovered def allowsCyclicConn(self): '''Abstract function ''' return False def resetPos(self): if isinstance(self, InPort): self.setPos(-8 + 8 * self.portNum, -12) self.updateEdges() if isinstance(self, OutPort): h = self.getNode().getOutPortVOffset() self.setPos(-8 + 8 * self.portNum, h) self.updateEdges() def setPosByPortNum(self, portNum): if isinstance(self, InPort): self.setPos(-8 + 8 * portNum, -12) self.updateEdges() if isinstance(self, OutPort): h = self.getNode().getOutPortVOffset() self.setPos(-8 + 8 * portNum, h) self.updateEdges() def setMemSaver(self, val): self._savemem = val def isMemSaver(self): return self._savemem def toggleMemSaver(self): if self._savemem: self._savemem= False else: self._savemem= True def setCanvasConnect(self, val): self._canvasConnect= val def isCanvasConnected(self): return self._canvasConnect def toggleCanvasConnect(self): if self._canvasConnect: self._canvasConnect = False else: self._canvasConnect = True def setID(self, value=None): if value is None: self._id = id(self) # this will always be unique else: self._id = value def getID(self): return self._id def isWidgetPort(self): if self.menuWidget is None: return False else: return True def GPIType(self): return self._GPIType def updateToolTip(self): # PORTAUTH oblBuf = '' if isinstance(self, InPort): if self.isREQUIRED(): oblBuf = '(REQUIRED)' else: oblBuf = '(OPTIONAL)' if self.allowsCyclicConn(): oblBuf += ' Cyclic' tip = "\'"+str(self.portTitle)+'\': '+oblBuf+'\n' tip += self._GPIType.toolTip_Port() tip += '\n' if isinstance(self, OutPort): tip += '____________________\n' tip += 'Data Info: \n' tip += self._GPIType.toolTip_Data(self.data) self.setToolTip(tip) def scaleUp(self): self.setScale(self.largenessFact) self.update() def resetScale(self): self.setScale(1) self.update() def findOppositePorts(self): ports = [] # for outports, find all appropriate inports if isinstance(self, OutPort): for item in list(self.graph.scene().items()): if isinstance(item, InPort): ports.append(item) # vica-versa for all inports if isinstance(self, InPort): for item in list(self.graph.scene().items()): if isinstance(item, OutPort): ports.append(item) return ports def getName(self): return self.portTitle def getSettings(self): # PORT SETTINGS # get the settings required to re-instantiate this port # PORTAUTH s = {} # Possibly not needed: s['id'] = self.getID() s['porttitle'] = self.portTitle s['porttype'] = self.porttype() # data type # s['intype'] = self._type # s['dtype'] = self._dtype # s['ndim'] = self._ndim s['obligation'] = self._obligation s['portnum'] = self.portNum if self.isWidgetPort(): s['widgetTitle'] = self.menuWidget.getTitle() else: s['widgetTitle'] = None # In loadNetwork() this is the only info used # -the rest is instantiated from ExternalNode() # derived class. s['connections'] = [] for edge in self.edgeList: s['connections'].append(edge.getCoords()) return s def getNodeID(self): return self.node.getID() def hoverEnterEvent(self, event): # Upstream forced hover if isinstance(self, InPort): for edge in self.edgeList: edge.sourcePort().hoverEnterEvent(event) self._beingHovered = True self.setScale(self.largenessFact) self.node.setZValue(101) self.update() for edge in self.edgeList: edge.adjust() edge.update() self.node.graph.viewAndSceneForcedUpdate() def hoverLeaveEvent(self, event): # Upstream forced hover if isinstance(self, InPort): for edge in self.edgeList: edge.sourcePort().hoverLeaveEvent(event) self._beingHovered = False self.setScale(1.0) self.node.setZValue(2) self.update() for edge in self.edgeList: edge.adjust() edge.update() self.node.graph.viewAndSceneForcedUpdate() def type(self): return Port.Type def porttype(self): return Port.Type def addEdge(self, edge): self.edgeList.append(edge) edge.adjust() def detachEdge(self, edge): '''This is a little misleading, it only pops the edge from a port\'s edgelist''' self.triggerHoverLeaveEvent() for i in range(len(self.edgeList)): if self.edgeList[i] == edge: return self.edgeList.pop(i) def edges(self): # PORT return self.edgeList def edgeCount(self): return len(self.edgeList) # all connected nodes and thier connetion directions def getConnectionTuples(self): '''Source and dest only apply to the order in which the user connected the edge. The DAG order is determined by port direction. -append( (src -> sink) ) ''' c = [] for edge in self.edgeList: if isinstance(edge.sourcePort(), OutPort): c.append((edge.sourceNode(), edge.destNode())) else: c.append((edge.destNode(), edge.sourceNode())) return c def getNonCyclicConnectionTuples(self): c = [] for edge in self.edgeList: if not edge.isCyclicConnection(): if isinstance(edge.sourcePort(), OutPort): c.append((edge.sourceNode(), edge.destNode())) else: c.append((edge.destNode(), edge.sourceNode())) return c def getDownstreamNodes(self): # PORT '''Get downstream nodes connected to this port.''' c = [] for edge in self.edgeList: edge.update() # update edge-text if edge.sourcePort() is self: # c[edge.destNode()] = edge.destPort().portTitle c.append([edge.destNode(), edge.destPort().portTitle]) # print edge.destNode().name return c def getNode(self): return self.node def boundingRect(self): adjust = 2.0 maxx = max([i for i, j in self.pointsCoord]) maxy = max([j for i, j in self.pointsCoord]) return QtCore.QRectF((0 - adjust), (0 - adjust), (maxx + adjust), (maxy + adjust)) def shape(self): path = QtGui.QPainterPath() # path.addEllipse(0, 0, 5, 5) path.addPolygon(self.portShape) self.update() return path def paint(self, painter, option, widget): # PORT # choose module color gradient = QtGui.QRadialGradient(-1, -1, 10) if option.state & QtGui.QStyle.State_Sunken: gradient.setColorAt(0, QtGui.QColor(QtCore.Qt.red).lighter(150)) gradient.setColorAt(1, QtGui.QColor(QtCore.Qt.darkRed).lighter(150)) # elif self._beingHovered: # gradient.setColorAt(0, QtGui.QColor(QtCore.Qt.darkRed).lighter(150)) # gradient.setColorAt(1, QtGui.QColor(QtCore.Qt.red).lighter(150)) elif isinstance(self, InPort): # if self.menuWidget: if self.isREQUIRED(): # gradient.setColorAt(0, QtGui.QColor(QtCore.Qt.yellow).lighter(200)) # gradient.setColorAt(1, # QtGui.QColor(QtCore.Qt.darkYellow).lighter(200)) gradient.setColorAt(0, QtGui.QColor(QtCore.Qt.blue).lighter(200)) gradient.setColorAt(1, QtGui.QColor( QtCore.Qt.darkBlue).lighter(200)) # gradient.setColorAt(1, QtGui.QColor(QtCore.Qt.gray).lighter(300)) # gradient.setColorAt(0, # QtGui.QColor(QtCore.Qt.darkGray).lighter(150)) else: gradient.setColorAt(0, QtGui.QColor( QtCore.Qt.green).lighter(200)) gradient.setColorAt(1, QtGui.QColor( QtCore.Qt.darkGreen).lighter(150)) # gradient.setColorAt(0, QtGui.QColor(QtCore.Qt.gray).lighter(150)) # gradient.setColorAt(1, # QtGui.QColor(QtCore.Qt.darkGray).lighter(100)) elif isinstance(self, OutPort): # if self.menuWidget: # orange=QtGui.QColor().fromRgbF(1.,0.5,0.) # gradient.setColorAt(0, orange.lighter(200)) # gradient.setColorAt(1, orange.lighter(100)) # gradient.setColorAt(0, QtGui.QColor(QtCore.Qt.blue).lighter(300)) # gradient.setColorAt(1, # QtGui.QColor(QtCore.Qt.darkBlue).lighter(300)) if self.dataIsNone(): gradient.setColorAt(1, QtGui.QColor( QtCore.Qt.red).lighter(300)) gradient.setColorAt(0, QtGui.QColor( QtCore.Qt.darkRed).lighter(150)) elif self.dataHasChanged(): # gradient.setColorAt(1, QtGui.QColor(QtCore.Qt.gray).lighter(200)) # gradient.setColorAt(0, # QtGui.QColor(QtCore.Qt.darkGray).lighter(100)) gradient.setColorAt(1, QtGui.QColor( QtCore.Qt.blue).lighter(200)) gradient.setColorAt(0, QtGui.QColor( QtCore.Qt.darkBlue).lighter(200)) else: gradient.setColorAt(1, QtGui.QColor( QtCore.Qt.yellow).lighter(200)) gradient.setColorAt(0, QtGui.QColor( QtCore.Qt.darkYellow).lighter(150)) # else: # gradient.setColorAt(0, QtGui.QColor(QtCore.Qt.blue).lighter(175)) # gradient.setColorAt(1, QtGui.QColor(QtCore.Qt.darkBlue).lighter(175)) # gradient.setColorAt(0, QtGui.QColor(QtCore.Qt.gray).lighter(150)) # gradient.setColorAt(1, QtGui.QColor(QtCore.Qt.darkGray).lighter(100)) # if self.dataIsNone(): # gradient.setColorAt(0, QtGui.QColor(QtCore.Qt.red).lighter(150)) # gradient.setColorAt(1, QtGui.QColor(QtCore.Qt.darkRed).lighter(100)) # elif self.dataHasChanged(): # gradient.setColorAt(0, QtGui.QColor(QtCore.Qt.gray).lighter(150)) # gradient.setColorAt(1, QtGui.QColor(QtCore.Qt.darkGray).lighter(100)) # else: # gradient.setColorAt(0, QtGui.QColor(QtCore.Qt.yellow).lighter(200)) # gradient.setColorAt(1, # QtGui.QColor(QtCore.Qt.darkYellow).lighter(150)) # draw module box (apply color) painter.setBrush(QtGui.QBrush(gradient)) #painter.setPen(QtGui.QPen(QtCore.Qt.black, 0)) fade = QtGui.QColor(QtCore.Qt.black) fade.setAlpha(50) painter.setPen(QtGui.QPen(fade, 0)) # if isinstance(self,InPort): # if self.isREQUIRED(): # painter.setPen(QtGui.QPen(QtCore.Qt.red, 0)) # else: # painter.setPen(QtGui.QPen(QtCore.Qt.black, 0)) # #painter.setPen(QtGui.QPen(QtCore.Qt.yellow, 0)) if self.isMemSaver(): painter.drawPolygon(self.portShape_memSave) else: if self.menuWidget: painter.drawEllipse(1, 0, 5, 5) else: painter.drawPolygon(self.portShape) def updateEdges(self): for edge in self.edgeList: edge.adjust() def mousePressEvent(self, event): # PORT printMouseEvent(self, event) modifiers = getKeyboardModifiers() self.update() # update port color/size/etc modmidbutton_event = (event.button() == QtCore.Qt.LeftButton and modifiers == QtCore.Qt.AltModifier) if event.button() == QtCore.Qt.MidButton or modmidbutton_event: event.accept() elif event.button() == QtCore.Qt.LeftButton: event.accept() elif event.button() == QtCore.Qt.RightButton: event.accept() else: event.ignore() super(Port, self).mousePressEvent(event) def mouseReleaseEvent(self, event): # PORT printMouseEvent(self, event) self.update() # update port color/size/etc if event.button() == QtCore.Qt.MidButton: event.accept() elif event.button() == QtCore.Qt.LeftButton: event.accept() elif event.button() == QtCore.Qt.RightButton: event.accept() else: event.ignore() super(Port, self).mouseReleaseEvent(event) def advance(self): if self.newPos == self.pos(): return False self.setPos(self.newPos) return True def checkDataType(self, indata): # PORTAUTH # This checks the inbound data type against the porttype while a # connection is being made or when new data is passed downstream. return self._GPIType.matchesData(indata) def checkTypeAgainstPort(self, uport): # PORTAUTH # This check verfies a connection between two ports # when the user is drawing a connection or when a port # is clicked and other connectable ports are highlighted. if uport: return self._GPIType.matchesType(uport._GPIType) # port doesn't exist, not sure why this is being tested. return False
[docs]class InPort(Port): '''Defines the specific behavior for connecting to inports i.e. how to check for upstream data types, and obligation. ''' PortType = InPortTYPE def __init__(self, nodeWidget, CanvasBackend, title, portNum, intype=None, dtype=None, ndim=None, obligation=REQUIRED, menuWidget=None, cyclic=False): self._cyclic = cyclic super(InPort, self).__init__(nodeWidget, CanvasBackend, title, portNum, intype, dtype, ndim, menuWidget) # REQUIRED or OPTIONAL type self._obligation = obligation def edge(self): '''InPorts only have one edge''' if len(self.edgeList): return self.edgeList[0] def allowsCyclicConn(self): return self._cyclic def findMatchingOutPorts(self): ports = self.findOppositePorts() matching_ports = [] # just ignore if its already connected if len(self.edges()): return matching_ports if len(ports): for port in ports: # skip ports on the same node if port.node == self.node: continue # check data if available if port.data is not None: if self.checkDataType(port.data): matching_ports.append(port) # if not, just check against port spec else: # call checker from inport since they are # the limiting factor if self.checkTypeAgainstPort(port): matching_ports.append(port) return matching_ports def incomingDataTypeMatches(self): indata = self.getUpstreamData() if indata is not None: return self.checkDataType(indata) else: return False def checkUpstreamPortType(self): uport = self.getUpstreamPort() if uport.data is not None: return self.checkDataType(uport.data) else: return self.checkTypeAgainstPort(uport) def type(self): # generic object type return InPort.Type def porttype(self): return InPort.PortType def getUpstreamData(self): # first check if there is any data if len(self.edgeList) > 0: return self.edgeList[0].sourcePort().data else: return None def getUpstreamPort(self): # first check if there is any data if len(self.edgeList) > 0: return self.edgeList[0].sourcePort() else: return None def getUpstreamNode(self): '''Return node ref in a list. ''' c = [] if self.getUpstreamPort(): c.append(self.getUpstreamPort().getNode()) return c def setREQUIRED(self): self._obligation = REQUIRED log.debug("set REQUIRED") return self def setOPTIONAL(self): self._obligation = OPTIONAL log.debug("set OPTIONAL") return self def isOPTIONAL(self): return self._obligation == OPTIONAL def isREQUIRED(self): return self._obligation == REQUIRED
[docs]class OutPort(Port): '''Defines the specific behavior for connecting to outports i.e. how to check for downstream data types for scaling downstream ports on the canvas to highlight potentially valid connections. ''' PortType = OutPortTYPE def __init__(self, nodeWidget, CanvasBackend, title, portNum, intype=None, dtype=None, ndim=None, obligation=None, menuWidget=None): super(OutPort, self).__init__(nodeWidget, CanvasBackend, title, portNum, intype, dtype, ndim, menuWidget) self.data_changed = True # don't start in warning state def setDataCalled(self, called=True): self.data_changed = called def dataHasChanged(self): '''Used for setting port color and signaling downstream events. ''' return self.data_changed def dataIsNone(self): return (self.data is None) def findMatchingInPorts(self): ports = self.findOppositePorts() matching_ports = [] if len(ports): # check data if available if not self.dataIsNone(): for port in ports: # skip ports on the same node if port.node == self.node: continue if not len(port.edges()): # dont consider ports with connections if port.checkDataType(self.data): matching_ports.append(port) # if not, just check against port spec else: for port in ports: # skip ports on the same node if port.node == self.node: continue if not len(port.edges()): # dont consider ports with connections # call checker from inport since they are # the limiting factor if port.checkTypeAgainstPort(self): matching_ports.append(port) return matching_ports def type(self): return OutPort.Type def porttype(self): return OutPort.PortType # return a read-only ptr @property def data(self): return self._data def freeDataQuietly(self): '''Quietly free (remove ref) the data causing no new events. All connected downstream nodes must run before this is freed. ''' self._data = None def getDataString(self): # PORTAUTH return self._GPIType.edgeTip(self._data) def setWidgetData(self): # widget-ports val = self.menuWidget.get_val() # This comparison could be an expensive operation # not sure if ids are checked first. if self._data != val: if self.setData(val): # check val against port type return True return False # data didn't change def setData(self, out): # PORTAUTH # enforce port type if self.checkDataType(out): self._data = self._GPIType.setDataAttr(out) self.setDataCalled() return True # SUCCESS elif out is None: #log.warn("setData(): OutPort unchanged.") self._data = None #self.setDataCalled() #return True # SUCCESS else: log.warn("setData(\'"+stw(self.portTitle)+"\',...): OutPort type doesn't match data, unchanged.") self.setDataCalled(False) return False def setDownstreamEvents(self): '''Look at downstream Nodes and flag pending_event regardless of obligation. -Finds inports connected to THIS outport. ''' for nodeObj in self.getDownstreamNodes(): # nodeObj[node,portTitle] # if self.isWidgetPort(): # print "setDownstreamEvents(): Send widget-event down to: "+node.name # else: # print "setDownstreamEvents(): Send event down to: "+node.name nodeObj[0].setEventStatus({GPI_PORT_EVENT: nodeObj[1]})