# Copyright (C) 2014 Dignity Health
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
# NO CLINICAL USE. THE SOFTWARE IS NOT INTENDED FOR COMMERCIAL PURPOSES
# AND SHOULD BE USED ONLY FOR NON-COMMERCIAL RESEARCH PURPOSES. THE
# SOFTWARE MAY NOT IN ANY EVENT BE USED FOR ANY CLINICAL OR DIAGNOSTIC
# PURPOSES. YOU ACKNOWLEDGE AND AGREE THAT THE SOFTWARE IS NOT INTENDED FOR
# USE IN ANY HIGH RISK OR STRICT LIABILITY ACTIVITY, INCLUDING BUT NOT
# LIMITED TO LIFE SUPPORT OR EMERGENCY MEDICAL OPERATIONS OR USES. LICENSOR
# MAKES NO WARRANTY AND HAS NO LIABILITY ARISING FROM ANY USE OF THE
# SOFTWARE IN ANY HIGH RISK OR STRICT LIABILITY ACTIVITIES.
import math
from gpi import QtCore, QtGui
# gpi
from .defaultTypes import GPITYPE_PASS
from .defines import MacroNodeEdgeType, EdgeNodeType, PortEdgeType, GPI_APPLOOP
from .defines import printMouseEvent, getKeyboardModifiers, OPTIONAL
from .defines import isMacroChildNode
from .layoutWindow import LayoutMaster
from .logger import manager
from .nodeAPI import NodeAPI
from .node import Node
# start logger for this module
log = manager.getLogger(__name__)
[docs]class EdgeNode(QtGui.QGraphicsItem):
'''The EdgeNode simply forwards port connections. It can take one OutPort
connection, and multiple InPort connections. InPort tooltips and
enforcement are concatenated and forwarded.
'''
Type = EdgeNodeType
def __init__(self, portNum, parentItem):
super(EdgeNode, self).__init__()
self.setParentItem(parentItem)
# position
self.portNum = portNum
# attached ports
self.portList = []
def type(self):
return self.Type
[docs]class MacroAPI(NodeAPI):
'''A class that has default operations that stub-out the NodeAPI.
'''
def initUI(self):
pass
# self._layoutWindow = canvasGraph.LayoutMaster(self.node.graph, 0)
# self.layout.addWidget(self._layoutWindow, 0, 0)
def execType(self):
return GPI_APPLOOP
def validate(self):
'''Copy all inputs to outputs.
'''
return 0
def compute(self):
'''Copy all inputs to outputs.
'''
for i in range(self.node._numPorts):
self.setData('out' + str(i + 1), self.getData('in' + str(i + 1)))
return 0
[docs]class PortEdge(Node):
'''The PortEdge holds EdgeNodes for the MacroNode (they are the two node
like pieces that bound the macro and the single collapsed macro
representation).
'''
Type = PortEdgeType
def __init__(self, macroParent, graph, menu=None, nodeIF=None, nodeIFscroll=None, role=None):
super(PortEdge, self).__init__(graph, nodeMenuClass=menu, nodeCatItem=None, nodeIF=nodeIF, nodeIFscroll=nodeIFscroll)
self._roles = ['Input', 'Output', 'Macro']
self._role = self._roles[role]
self.name = self._role
self._label = ''
self._title_delimiter = ': '
self._macro_prefix = '*'
self._macroParent = macroParent
self._isMacroNode = False
if self._macroParent:
self._isMacroNode = True
# code name for network loader to ignore
self._moduleName = '__GPIMacroNode__'
self._macroEdges = []
# Ports
if self._role == 'Macro':
self._numPorts = 0
else:
self._numPorts = 16
for i in range(self._numPorts):
self.addInPort('in' + str(i + 1), GPITYPE_PASS, obligation=OPTIONAL)
self.addOutPort('out' + str(i + 1), GPITYPE_PASS)
self._nodeIF.setTitle(self.name)
def menu(self):
self._macroParent._scrollArea_layoutWindow.show()
self._macroParent._scrollArea_layoutWindow.raise_()
# self._macroParent._scrollArea_layoutWindow.activateWindow()
# self._macroParent._layoutWindow.show()
# self._macroParent._layoutWindow.raise_()
self._macroParent._layoutWindow.activateWindow()
def closeMacroLayout(self):
if hasattr(self._nodeIF, '_layoutWindow'): # sibling safe
log.debug("Macro Node issuing a force close on layout")
self._nodeIF._layoutWindow.forceClose()
# return self._nodeIF._layoutWindow.close()
# return True
def getSiblingNodes(self):
return self._macroParent.getNodes()
def macroParent(self):
return self._macroParent
def itemChange(self, change, value):
if change == QtGui.QGraphicsItem.ItemPositionHasChanged:
for edge in self._macroEdges:
edge.adjust()
return super(PortEdge, self).itemChange(change, value)
def type(self):
return self.Type
def addMacroEdge(self, edge):
self._macroEdges.append(edge)
edge.adjust()
def mousePressEvent(self, event):
printMouseEvent(self, event)
modifiers = getKeyboardModifiers()
self.update() # update node color
modmidbutton_event = (event.button() == QtCore.Qt.LeftButton
and modifiers == QtCore.Qt.AltModifier)
if event.button() == QtCore.Qt.MidButton or modmidbutton_event:
event.accept()
elif event.button() == QtCore.Qt.LeftButton:
event.accept() # this has to accept to be moved
elif event.button() == QtCore.Qt.RightButton:
event.accept()
else:
event.ignore()
super(PortEdge, self).mousePressEvent(event)
def mouseReleaseEvent(self, event):
printMouseEvent(self, event)
modifiers = getKeyboardModifiers()
self.update()
modmidbutton_event = (event.button() == QtCore.Qt.LeftButton
and modifiers == QtCore.Qt.AltModifier)
if event.button() == QtCore.Qt.MidButton or modmidbutton_event:
event.accept()
elif event.button() == QtCore.Qt.LeftButton:
event.accept() # this has to accept to be moved
elif event.button() == QtCore.Qt.RightButton:
event.accept()
# self.menu()
# self.graph.scene().makeOnlyTheseNodesSelected([self])
else:
event.ignore()
super(PortEdge, self).mouseReleaseEvent(event)
def mouseDoubleClickEvent(self, event):
event.accept()
log.debug("double-clicked PortEdge")
self._macroParent.toggleCollapse()
def getTitleWidth(self):
'''Determine how long the module box is.'''
buf = self.getMacroNodeName()
fm = QtGui.QFontMetricsF(self.title_font)
bw = fm.width(buf) + 11.0
bh = fm.height()
return (bw, bh)
def getRoleTitleSize(self):
buf = self._role
fm = QtGui.QFontMetricsF(self.title_font)
bw = fm.width(buf)
bh = fm.height()
return (bw, bh)
def getTitleDelimiterSize(self):
buf = self._title_delimiter
fm = QtGui.QFontMetricsF(self.title_font)
bw = fm.width(buf)
bh = fm.height()
return (bw, bh)
def getNodeWidth(self):
return max(self.getMaxPortWidth(), self.getTitleWidth()[0])
def getMacroNodeName(self):
'''The name is based on the Macro Node's role within the macro
framework.
'''
buf = self._role
if self._role == 'Input':
if self._macroParent._label != '':
buf += self._title_delimiter + self._macroParent._label
elif self._role == 'Output':
if self._macroParent._label != '':
buf += self._title_delimiter + self._macroParent._label
elif self._role == 'Macro':
if self._macroParent._label != '':
buf = self._macro_prefix + self._macroParent._label
return buf
def paint(self, painter, option, widget): # NODE
w = self.getNodeWidth()
# h = self.getTitleWidth()[1]
# draw shadow
painter.setPen(QtCore.Qt.NoPen)
painter.setBrush(QtCore.Qt.darkGray)
painter.drawRoundedRect(-8, -8, w, 20, 3, 3)
# choose module color
gradient = QtGui.QRadialGradient(-10, -10, 40)
# update the face node based on the state of internal nodes.
if self._role == 'Macro':
if self._macroParent.isProcessing():
gradient.setColorAt(0, QtGui.QColor(QtCore.Qt.gray).lighter(70))
gradient.setColorAt(1, QtGui.QColor(QtCore.Qt.darkGray).lighter(70))
elif (option.state & QtGui.QStyle.State_Sunken) or (self._macroParent.inComputeErrorState()):
gradient.setColorAt(0, QtGui.QColor(QtCore.Qt.red).lighter(150))
gradient.setColorAt(1, QtGui.QColor(QtCore.Qt.red).lighter(170))
elif self._macroParent.inValidateErrorState():
gradient.setColorAt(0, QtGui.QColor(QtCore.Qt.yellow).lighter(190))
gradient.setColorAt(1, QtGui.QColor(QtCore.Qt.yellow).lighter(170))
elif self._macroParent.inInitUIErrorState():
gradient.setColorAt(0, QtGui.QColor(QtCore.Qt.red).lighter(150))
gradient.setColorAt(1, QtGui.QColor(QtCore.Qt.yellow).lighter(170))
else:
gradient.setColorAt(0, QtGui.QColor(QtCore.Qt.gray).lighter(150))
gradient.setColorAt(
1, QtGui.QColor(QtCore.Qt.darkGray).lighter(150))
# let the src and sink nodes update themselves normally
else:
conf = self.getCurState()
if self._computeState is conf:
gradient.setColorAt(0, QtGui.QColor(QtCore.Qt.gray).lighter(70))
gradient.setColorAt(1, QtGui.QColor(QtCore.Qt.darkGray).lighter(70))
elif (option.state & QtGui.QStyle.State_Sunken) or (self._computeErrorState is conf):
gradient.setColorAt(0, QtGui.QColor(QtCore.Qt.red).lighter(150))
gradient.setColorAt(1, QtGui.QColor(QtCore.Qt.red).lighter(170))
elif self._validateError is conf:
gradient.setColorAt(0, QtGui.QColor(QtCore.Qt.yellow).lighter(190))
gradient.setColorAt(1, QtGui.QColor(QtCore.Qt.yellow).lighter(170))
else:
gradient.setColorAt(0, QtGui.QColor(QtCore.Qt.gray).lighter(150))
gradient.setColorAt(1, QtGui.QColor(QtCore.Qt.darkGray).lighter(150))
# draw module box (apply color)
painter.setBrush(QtGui.QBrush(gradient))
if self.beingHovered or self.isSelected():
#painter.setPen(QtGui.QPen(QtCore.Qt.red, 1))
fade = QtGui.QColor(QtCore.Qt.red)
fade.setAlpha(100)
painter.setPen(QtGui.QPen(fade, 2))
else:
#painter.setPen(QtGui.QPen(QtCore.Qt.black, 0))
fade = QtGui.QColor(QtCore.Qt.black)
fade.setAlpha(50)
painter.setPen(QtGui.QPen(fade,0))
painter.drawRoundedRect(-10, -10, w, 20, 3, 3)
# title
painter.setPen(QtGui.QPen(QtCore.Qt.black, 0))
painter.setFont(self.title_font)
buf = self.getMacroNodeName()
# paint the node title
painter.drawText(-5, -9, w, 20, (QtCore.Qt.AlignLeft |
QtCore.Qt.AlignVCenter), str(buf))
[docs]class MacroNodeEdge(QtGui.QGraphicsItem):
"""Provides the connection graphic and logic for nodes.
-No enforcement, just methods to retrieve connected nodes.
"""
Type = MacroNodeEdgeType
Pi = math.pi
TwoPi = 2.0 * Pi
def __init__(self, source, dest):
super(MacroNodeEdge, self).__init__()
self.setZValue(0)
self.source = source # PortEdge(pos, self)
self.dest = dest # PortEdge(QtCore.QPointF(0, 30)+pos, self)
self.arrowSize = 20.0
self.penWidth = 10.0
self.sourcePoint = QtCore.QPointF()
self.destPoint = QtCore.QPointF()
self.setFlag(QtGui.QGraphicsItem.ItemSendsGeometryChanges)
self.setCacheMode(QtGui.QGraphicsItem.DeviceCoordinateCache)
self.setAcceptedMouseButtons(QtCore.Qt.NoButton)
self.source.addMacroEdge(self)
self.dest.addMacroEdge(self)
self.adjust()
self.fontHeight = 20
def itemChange(self, change, value):
if change == QtGui.QGraphicsItem.ItemPositionHasChanged:
# call adjustments or updates to position here
pass
return super(MacroNodeEdge, self).itemChange(change, value)
def type(self):
return self.Type
def sourceNode(self):
return self.source
def setSourceNode(self, node):
self.source = node
self.adjust()
def destNode(self):
return self.dest
def setDestNode(self, node):
self.dest = node
self.adjust()
def adjust(self):
if not self.source or not self.dest:
return
# line = QtCore.QLineF(self.mapFromItem(self.source, 0, 0)+self.source.line().p1(),
# self.mapFromItem(self.dest, 0, 0)+self.dest.line().p1())
line = QtCore.QLineF(self.mapFromItem(self.source, 0, 0),
self.mapFromItem(self.dest, 0, 0))
self.prepareGeometryChange()
self.sourcePoint = line.p1()
self.destPoint = line.p2()
def boundingRect(self):
if not self.source or not self.dest:
return QtCore.QRectF()
extra = (self.penWidth + self.arrowSize) / 2.0
extra = max(extra, self.fontHeight)
return QtCore.QRectF(self.sourcePoint,
QtCore.QSizeF(
self.destPoint.x() - self.sourcePoint.x(),
self.destPoint.y() - self.sourcePoint.y())).normalized().adjusted(-extra, -extra, extra, extra)
def paint(self, painter, option, widget):
if not self.source or not self.dest:
return
# Draw the line itself.
line = QtCore.QLineF(self.sourcePoint, self.destPoint)
if line.length() == 0.0:
return
painter.setPen(
QtGui.QPen(QtCore.Qt.gray, self.penWidth, QtCore.Qt.SolidLine,
QtCore.Qt.RoundCap, QtCore.Qt.RoundJoin))
painter.drawLine(line)
# drawing text
x = (line.x1() + line.x2()) / 2.0
y = (line.y1() + line.y2()) / 2.0
xa = (line.x1() - line.x2())
ya = (line.y1() - line.y2())
m = math.sqrt(xa * xa + ya * ya)
a = math.atan2(ya, xa) * 180.0 / math.pi
buf = "Macro"
f = QtGui.QFont("times", 20)
fm = QtGui.QFontMetricsF(f)
bw = fm.width(buf)
bw2 = -bw * 0.5
# bh = fm.height()
# Draw the arrows if there's enough room.
angle = math.acos(line.dx() / line.length())
if line.dy() >= 0:
angle = self.TwoPi - angle
sourceArrowP1 = self.sourcePoint + QtCore.QPointF(
math.sin(angle + self.Pi / 3) * self.arrowSize,
math.cos(angle + self.Pi / 3) * self.arrowSize)
sourceArrowP2 = self.sourcePoint + QtCore.QPointF(
math.sin(angle + self.Pi - self.Pi / 3) * self.arrowSize,
math.cos(angle + self.Pi - self.Pi / 3) * self.arrowSize)
destArrowP1 = self.destPoint + QtCore.QPointF(
math.sin(angle - self.Pi / 3) * self.arrowSize,
math.cos(angle - self.Pi / 3) * self.arrowSize)
destArrowP2 = self.destPoint + QtCore.QPointF(
math.sin(angle - self.Pi + self.Pi / 3) * self.arrowSize,
math.cos(angle - self.Pi + self.Pi / 3) * self.arrowSize)
painter.setBrush(QtCore.Qt.gray)
painter.drawPolygon(
QtGui.QPolygonF([line.p1(), sourceArrowP1, sourceArrowP2]))
painter.drawPolygon(
QtGui.QPolygonF([line.p2(), destArrowP1, destArrowP2]))
# drawing text
painter.setFont(f)
painter.setPen(QtGui.QPen(QtCore.Qt.darkGray, 1))
painter.save()
painter.translate(QtCore.QPointF(x, y))
if m > bw * 1.1:
if a > 90 or a < -90:
painter.rotate(a + 180.0)
painter.drawText(QtCore.QPointF(bw2, -5.0), buf)
else:
painter.rotate(a)
painter.drawText(QtCore.QPointF(bw2, -5.0), buf)
else:
painter.drawText(QtCore.QPointF(bw2, -5.0), '')
painter.restore()
[docs]class MacroNode(object):
'''The MacroNode menu is a stripped down version of the Node class with
the capability of using a "Layout Window" instead of a NodeMenu.
'''
def __init__(self, graph, pos):
self._graph = graph
self._id = id(self)
self._label = ''
self._scrollArea_layoutWindow = None
self._layoutWindow = None
self.newLayoutWindow(config=0)
self._collapsed = False
# for load/paste
self._destined_collapse = False
# connected node groups
self._src_cn = []
self._sink_cn = []
self._encap_nodes = []
self._encap_nodepos = []
self._src = PortEdge(self, graph, MacroAPI, role=0)
self._src.setPos(pos)
# self._sink = PortEdge(self, graph, nodeIF=self._src._nodeIF,
# nodeIFscroll=self._src._nodeIF_scrollArea)
self._sink = PortEdge(self, graph, MacroAPI, role=1)
self._sink.setPos(QtCore.QPointF(0, 100) + pos)
self._sink_pos = QtCore.QPointF(0, 100) + pos
self._face = PortEdge(self, graph, MacroAPI, role=2)
self._face.setPos(pos)
self._face.inportList = []
self._face.outportList = []
# save expanded positions
self._src_exppos = self._src.getPos()
self._sink_exppos = self._sink.getPos()
self._face_colpos = self._face.getPos()
self._macroedge = MacroNodeEdge(self._src, self._sink)
self._graph.scene().addItem(self._src)
self._graph.scene().addItem(self._sink)
self._graph.scene().addItem(self._macroedge)
self._graph.scene().addItem(self._face)
self._face.hide()
self._anim_timeline = None
self._anim = None
self._scrollArea_layoutWindow.setWindowTitle('Macro')
def getSettings(self):
'''Keep all the settings required to instantiate the macro.
-position, connectivity are all kept by normal serialization.
-need collapse, label, layoutWindow.
'''
s = {}
s['id'] = self.getID()
s['label'] = self._label
s['collapse'] = self.isCollapsed()
s['layoutWindow'] = self._layoutWindow.getSettings()
s['src_settings'] = self._src.getSettings()
#s['src_pos'] = self._src_exppos
s['sink_settings'] = self._sink.getSettings()
#s['sink_pos'] = self._sink_exppos
s['face_settings'] = self._face.getSettings()
#s['face_pos'] = self._face_colpos
x = self._sink_exppos[0] - self._src_exppos[0]
y = self._sink_exppos[1] - self._src_exppos[1]
s['rel_jaw_pos'] = [x, y]
if self.isCollapsed():
# save relative node positions in dict with id as the key
# ids can be used at deserializing by lookup.
npos = {}
for node, rel in zip(self._encap_nodes, self._encap_nodepos):
npos[str(node.getID())] = [rel.x(), rel.y()]
s['nodes_rel_pos'] = npos
return s
def getNodeByID(self, buf, nid):
for item in buf:
if isinstance(item, Node):
if item.getID() == nid:
return item
def loadSettings(self, s, nodeList, pos):
'''load all settings from a description generated by getSettings()
'''
self.setID(s['id'])
self.setLabelWdg(s['label'])
self.setLabel(s['label'])
self.newLayoutWindowFromSettings(s['layoutWindow'], nodeList)
if s['collapse']:
x = s['face_settings']['pos'][0] + pos[0]
y = s['face_settings']['pos'][1] + pos[1]
#x = s['src_pos'][0] + pos[0]
#y = s['src_pos'][1] + pos[1]
self._src.setPos(QtCore.QPointF(x, y))
# need relative jaw pos
#x = s['sink_settings']['pos'][0] + pos[0]
#y = s['sink_settings']['pos'][1] + pos[1]
#self._sink.setPos(QtCore.QPoint(x, y))
x += s['rel_jaw_pos'][0]
y += s['rel_jaw_pos'][1]
self._sink.setPos(QtCore.QPointF(x, y))
x = s['face_settings']['pos'][0] + pos[0]
y = s['face_settings']['pos'][1] + pos[1]
self._face.setPos(QtCore.QPointF(x, y))
rel = QtCore.QPointF(x, y)
for nid, epos in list(s['nodes_rel_pos'].items()):
enode = self.getNodeByID(nodeList, int(nid))
if enode:
enode.setPos(rel + QtCore.QPointF(*epos))
log.debug("node found by nid: "+enode.name)
log.debug("pos: ")
log.debug(str(epos))
log.debug(str(rel))
log.debug(str(QtCore.QPointF(*epos)))
log.debug(str(rel + QtCore.QPointF(*epos)))
else:
log.warn("nid not found: "+str(nid))
else:
x = s['src_settings']['pos'][0] + pos[0]
y = s['src_settings']['pos'][1] + pos[1]
self._src.setPos(QtCore.QPointF(x, y))
x = s['sink_settings']['pos'][0] + pos[0]
y = s['sink_settings']['pos'][1] + pos[1]
self._sink.setPos(QtCore.QPointF(x, y))
x = s['src_settings']['pos'][0] + pos[0]
y = s['src_settings']['pos'][1] + pos[1]
self._face.setPos(QtCore.QPointF(x, y))
self._src.setID(s['src_settings']['id'])
self._sink.setID(s['sink_settings']['id'])
self._face.setID(s['face_settings']['id'])
# save expanded positions
self._src_exppos = self._src.getPos()
self._sink_exppos = self._sink.getPos()
self._face_colpos = self._face.getPos()
# self.setCollapse(s['collapse'])
self._destined_collapse = s['collapse']
def setID(self, val=None):
if val is None:
self._id = id(self)
else:
self._id = val
def getID(self):
return self._id
def resetPortParents(self):
'''Return the parent status of each port to their respective _src or
_sink origin nodes.
'''
for port in self._src.inportList:
port.setParentItem(self._src)
port.resetPos()
port.show()
for port in self._sink.outportList:
port.setParentItem(self._sink)
port.resetPos()
port.show()
def showFacePorts(self):
'''Set the parent of each borrowed port to be the face node.
'''
self._face.inportList = []
self._face.outportList = []
for ip, op in zip(self._src.inportList, self._src.outportList):
if ip.edgeCount() or op.edgeCount():
ip.setParentItem(self._face)
ip.setPosByPortNum(len(self._face.inportList))
self._face.inportList.append(ip)
ip.show()
else:
ip.hide()
for op, ip in zip(self._sink.outportList, self._sink.inportList):
if ip.edgeCount() or op.edgeCount():
op.setParentItem(self._face)
op.setPosByPortNum(len(self._face.outportList))
self._face.outportList.append(op)
op.show()
else:
op.hide()
self._face.update()
def toggleCollapse(self):
if self.isCollapsed():
self.setCollapse(False)
else:
self.setCollapse(True)
def isCollapsed(self):
return self._collapsed
def setCollapse(self, val):
self._collapsed = val
if self.isCollapsed():
# only set it if its truly collapsible.
if not self.collapseMacro():
self._collapsed = False
else:
self.expandMacro()
def setCollapse_NoAction(self, val):
'''Just set the flag
'''
self._collapsed = val
def shouldCollapse(self):
'''for load/save, should this be collapsed?
'''
return self._destined_collapse
def getNodes(self):
return [self._src, self._sink, self._face]
def resetIDs(self):
'''After the node has been loaded and reconnected to its neighbors,
reset the ID for future copy/paste or load/save uniqueness.
'''
self.setID()
self._src.setID()
self._sink.setID()
self._face.setID()
def deleteMacroEdge(self):
'''The Macro-edge connecting the Input and Output nodes.
'''
if self._macroedge.scene():
self._graph.scene().removeItem(self._macroedge)
def closeLayoutWidget(self):
# self._layoutWindow.close()
self._scrollArea_layoutWindow.close()
def readyForDeletion(self):
self.closeLayoutWidget()
self.deleteMacroEdge()
# delete all encapsulated nodes if its collapsed
if self.isCollapsed():
for node in self._encap_nodes:
if node.scene():
self._graph.deleteNode(node)
def newLayoutWindowFromSettings(self, s, nodeList):
'''Copied from canvasGraph.
'''
layoutwindow = LayoutMaster(self._graph, config=s['config'], macro=True, labelWin=True)
layoutwindow.loadSettings(s, nodeList)
layoutwindow.setWindowTitle('Macro')
layoutwindow.wdglabel.textChanged.connect(self.setLabel)
self._scrollArea_layoutWindow = QtGui.QScrollArea()
self._scrollArea_layoutWindow.setWidget(layoutwindow)
self._scrollArea_layoutWindow.setWidgetResizable(True)
self._scrollArea_layoutWindow.setGeometry(50, 50, 300, 1000)
layoutwindow.setGeometry(50, 50, 400, 300)
self._layoutWindow = layoutwindow
def newLayoutWindow(self, config):
'''Copied from canvasGraph.
'''
layoutwindow = LayoutMaster(self._graph, config=config, macro=True, labelWin=True)
layoutwindow.setWindowTitle('Macro')
layoutwindow.wdglabel.textChanged.connect(self.setLabel)
self._scrollArea_layoutWindow = QtGui.QScrollArea()
self._scrollArea_layoutWindow.setWidget(layoutwindow)
self._scrollArea_layoutWindow.setWidgetResizable(True)
self._scrollArea_layoutWindow.setGeometry(50, 50, 300, 1000)
layoutwindow.setGeometry(50, 50, 400, 300)
self._layoutWindow = layoutwindow
def setLabel(self, lab):
'''Set the internal label buffer and update the display names for each
node and node-menu-window.
'''
self._label = str(lab)
# update the node-menu window title
if self._label != '':
self._scrollArea_layoutWindow.setWindowTitle('Macro: ' + self._label)
else:
self._scrollArea_layoutWindow.setWindowTitle('Macro')
self._src.update()
self._sink.update()
self._face.update()
def setLabelWdg(self, lab):
'''Sets the QLineEdit label directly for re-instantiating purposes.
'''
self._layoutWindow.wdglabel.setText(lab)
def getEncapsulatedNodes(self):
return self._encap_nodes
def listEncapsulatedNodes(self):
'''Searches for all nodes connected to the output of the _src and the
input of the _sink.
'''
all_cn = self._sink_cn + self._src_cn
enc_nodes = []
for cn in all_cn:
if cn[0] != self._src and cn[0] != self._sink:
enc_nodes.append(cn[0])
if cn[1] != self._src and cn[1] != self._sink:
enc_nodes.append(cn[1])
self._encap_nodes = list(set(enc_nodes))
def isProcessing(self):
'''Look at the run status of all nodes in the macro to determine if
macro is running.
'''
for node in self._encap_nodes:
if node.isProcessingEvent():
return True
if self._src.isProcessingEvent():
return True
if self._src.isProcessingEvent():
return True
return False
def inComputeErrorState(self):
'''Look at the run status of all nodes in the macro to determine if
macro is in error.
'''
for node in self._encap_nodes:
if node.inComputeErrorState():
return True
if self._src.inComputeErrorState():
return True
if self._src.inComputeErrorState():
return True
return False
def inInitUIErrorState(self):
'''Look at the run status of all nodes in the macro to determine if
macro is in error.
'''
for node in self._encap_nodes:
if node.inInitUIErrorState():
return True
if self._src.inInitUIErrorState():
return True
if self._src.inInitUIErrorState():
return True
return False
def inValidateErrorState(self):
'''Look at the run status of all nodes in the macro to determine if
macro is in warning state.
'''
for node in self._encap_nodes:
if node.inValidateErrorState():
return True
if self._src.inValidateErrorState():
return True
if self._src.inValidateErrorState():
return True
return False
def validateSinkNodes(self):
'''Get the list of connected nodes for _sink macro nodes.
Determine if the connections are valid, then list the valid nodes.
'''
# get _sink connections
# throw out input connections (where THIS node is the source).
src_tmp = self._sink.getConnectionTuples()
src_cn = [cn for cn in src_tmp if cn[0] != self._sink]
# store output connections for comparison
src_out = [cn for cn in src_tmp if cn[0] == self._sink]
src_ill = src_out[:]
log.debug("downstream search")
while True:
# build a local connection list for all nodes in the next level
lc = []
for cn in src_out:
# sources
if cn[0] != self._sink: # don't backtrack
for outport in cn[0].outportList:
lc += outport.getConnectionTuples()
# sinks
for inport in cn[1].inportList:
lc += inport.getConnectionTuples()
# cyclic
if cn[1] == self._src:
log.debug("illegal cn (sink validate):")
log.debug("\t"+cn[0].name +"->"+cn[1].name)
return 1
# consolidate connections
pre_len = len(src_out)
src_out += lc
src_out = list(set(src_out))
# if no new connections were added, then quit
if len(src_out) == pre_len:
break
if manager.isDebug():
log.debug("before upstream search")
for cn in src_cn:
log.debug("\t"+cn[0].name +"->"+cn[1].name)
while True:
# build a local connection list for all nodes in the next level
lc = []
for cn in src_cn:
# sources
if cn[0] != self._src:
for outport in cn[0].outportList:
lc += outport.getConnectionTuples()
for inport in cn[0].inportList:
lc += inport.getConnectionTuples()
# sinks
if cn[1] != self._sink:
for inport in cn[1].inportList:
lc += inport.getConnectionTuples()
for outport in cn[1].outportList:
lc += outport.getConnectionTuples()
# cyclic
if cn[1] == self._src:
log.debug("illegal cn (sink validate):")
log.debug("\t"+cn[0].name +"->"+cn[1].name)
return 1
# consolidate connections
pre_len = len(src_cn)
src_cn += lc
src_cn = list(set(src_cn))
# if no new connections were added, then quit
if len(src_cn) == pre_len:
break
if manager.isDebug():
log.debug("after search:")
for cn in src_cn:
log.debug("\t"+cn[0].name +"->"+cn[1].name)
# determine if the macro is valid
log.debug("illegal macro connections:")
ill_cnt = 0
for cn in src_cn:
if cn in src_ill:
ill_cnt += 1
log.debug("\t"+cn[0].name +"->"+cn[1].name)
# check for any other macro nodes
for cn in src_cn:
if isMacroChildNode(cn[0]):
if cn[0] not in self.getNodes():
ill_cnt += 1
if isMacroChildNode(cn[1]):
if cn[1] not in self.getNodes():
ill_cnt += 1
self._sink_cn = src_cn
return ill_cnt
def validateSrcNodes(self):
'''Get the list of connected nodes for _src macro nodes.
Determine if the connections are valid, then list the valid nodes.
'''
# get _src connections
# throw out input connections (where THIS node is the sink).
src_tmp = self._src.getConnectionTuples()
src_cn = [cn for cn in src_tmp if cn[1] != self._src]
# store input connections for comparison
src_in = [cn for cn in src_tmp if cn[1] == self._src]
if manager.isDebug():
log.debug("before search")
for cn in src_cn:
log.debug("\t"+cn[0].name +"->"+cn[1].name)
while True:
# build a local connection list for all nodes in the next level
lc = []
for cn in src_cn:
# sources
if len(cn[0].getConnectionTuples()):
if cn[0] != self._src: # don't backtrack
lc += cn[0].getConnectionTuples()
# cyclic
if cn[0] == self._sink:
log.debug("illegal cn (src validate):")
log.debug("\t"+cn[0].name +"->"+cn[1].name)
return 1
# sinks
if cn[1] != self._sink:
if len(cn[1].getConnectionTuples()):
lc += cn[1].getConnectionTuples()
# consolidate connections
pre_len = len(src_cn)
src_cn += lc
src_cn = list(set(src_cn))
# if no new connections were added, then quit
if len(src_cn) == pre_len:
break
if manager.isDebug():
log.debug("after search:")
for cn in src_cn:
log.debug("\t"+cn[0].name +"->"+cn[1].name)
# determine if the macro is valid
log.debug("illegal macro connections:")
ill_cnt = 0
for cn in src_cn:
if cn in src_in:
ill_cnt += 1
log.debug("\t"+cn[0].name +"->"+cn[1].name)
# check for any other macro nodes
for cn in src_cn:
if isMacroChildNode(cn[0]):
if cn[0] not in self.getNodes():
ill_cnt += 1
if isMacroChildNode(cn[1]):
if cn[1] not in self.getNodes():
ill_cnt += 1
self._src_cn = src_cn
return ill_cnt
def saveNodePos(self):
'''Run thru the list of encapsulated nodes and save their original
relative position to the _src node.
'''
self._sink_pos = self._sink.pos() - self._src.pos()
# save node-pos with parallel list to _encap_nodes
self._encap_nodepos = []
for node in self._encap_nodes:
self._encap_nodepos.append(node.pos() - self._src.pos())
# save the expanded sibling node positions
self._src_exppos = self._src.getPos()
self._sink_exppos = self._sink.getPos()
return self._encap_nodepos
def collapseMacro(self):
if self.validateSrcNodes():
return False
if self.validateSinkNodes():
return False
self.listEncapsulatedNodes()
if manager.isDebug():
log.debug("nodes:")
for node in self._encap_nodes:
log.debug(node.name)
self.saveNodePos()
self._face.setPos(self._src.pos())
# initialize animation stuff
self._anim_timeline = QtCore.QTimeLine(300)
self._anim_timeline.finished.connect(self.jawClosed)
self._anim_timeline.setFrameRange(1, 100)
self._anim = []
# close jaw
self._anim.append(QtGui.QGraphicsItemAnimation())
self._anim[-1].setItem(self._sink)
self._anim[-1].setTimeLine(self._anim_timeline)
self._anim[-1].setPosAt(1, self._src.pos())
for node in self._encap_nodes:
self._anim.append(QtGui.QGraphicsItemAnimation())
self._anim[-1].setItem(node)
self._anim[-1].setTimeLine(self._anim_timeline)
self._anim[-1].setPosAt(1, self._src.pos())
self._anim_timeline.start()
return True
def jawClosed(self):
'''finish the collapse Macro animation by hiding all encapsulated nodes
-connect all nodes
'''
# hide all the nodes and connect their update-signals to the macro
for node in self._encap_nodes:
node.hide()
node._forceUpdate.connect(self.updateNodePainter)
for edge in node.edges():
edge.hide()
self._sink.hide()
self._src.hide()
self._macroedge.hide()
self._face.show()
self.showFacePorts()
for port in self._face.getPorts():
port.updateEdges()
# make sure the face is selected
self._face.setSelected(True)
self._src.setSelected(False)
self._sink.setSelected(False)
def updateNodePainter(self):
'''Call the node updates for each of the bound nodes for re-coloring.
'''
if self.isCollapsed():
self._face.forceUpdate_NodeUI()
def jawOpen(self):
'''Once the open animation is finished, disconnect the re-painting
updater signal.
'''
for node in self._encap_nodes:
node._forceUpdate.disconnect()
node.setSelected(True)
# make sure the end nodes are also selected
self._face.setSelected(False)
self._src.setSelected(True)
self._sink.setSelected(True)
def expandMacro(self):
# before expansion save the face position for load/paste purposes
self._face_colpos = self._face.getPos()
self._src.setPos(self._face.pos())
self._sink.setPos(self._src.pos())
self._face.hide()
self._macroedge.show()
self._sink.show()
self._src.show()
self.resetPortParents()
for port in self._src.getPorts():
port.updateEdges()
for port in self._sink.getPorts():
port.updateEdges()
for node in self._encap_nodes:
node.setPos(self._src.pos())
node.show()
for edge in node.edges():
edge.show()
# initialize animation stuff
self._anim_timeline = QtCore.QTimeLine(300)
self._anim_timeline.finished.connect(self.jawOpen)
self._anim_timeline.setFrameRange(1, 100)
self._anim = []
# close jaw
self._anim.append(QtGui.QGraphicsItemAnimation())
self._anim[-1].setItem(self._sink)
self._anim[-1].setTimeLine(self._anim_timeline)
self._anim[-1].setPosAt(1, self._sink_pos + self._src.pos())
for node, pos in zip(self._encap_nodes, self._encap_nodepos):
self._anim.append(QtGui.QGraphicsItemAnimation())
self._anim[-1].setItem(node)
self._anim[-1].setTimeLine(self._anim_timeline)
self._anim[-1].setPosAt(1, pos + self._src.pos())
self._anim_timeline.start()