12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
712781279128012811282128312841285128612871288128912901291129212931294129512961297129812991300130113021303130413051306130713081309131013111312131313141315131613171318131913201321132213231324132513261327132813291330133113321333133413351336133713381339134013411342134313441345134613471348134913501351135213531354135513561357135813591360136113621363136413651366136713681369137013711372137313741375137613771378137913801381138213831384138513861387138813891390139113921393139413951396139713981399140014011402140314041405140614071408140914101411141214131414141514161417141814191420142114221423142414251426142714281429143014311432143314341435143614371438143914401441144214431444144514461447144814491450145114521453145414551456145714581459146014611462146314641465146614671468146914701471147214731474147514761477147814791480148114821483148414851486148714881489149014911492149314941495149614971498149915001501150215031504150515061507150815091510151115121513151415151516151715181519152015211522152315241525152615271528152915301531153215331534153515361537153815391540154115421543154415451546154715481549155015511552155315541555155615571558155915601561156215631564156515661567156815691570157115721573157415751576157715781579158015811582158315841585158615871588158915901591159215931594159515961597159815991600160116021603160416051606160716081609161016111612161316141615161616171618161916201621162216231624162516261627162816291630163116321633163416351636163716381639164016411642164316441645164616471648164916501651165216531654165516561657165816591660166116621663166416651666166716681669167016711672167316741675167616771678167916801681168216831684168516861687168816891690169116921693169416951696169716981699170017011702170317041705170617071708170917101711171217131714171517161717171817191720172117221723172417251726172717281729173017311732173317341735173617371738173917401741174217431744174517461747174817491750175117521753175417551756175717581759176017611762176317641765176617671768176917701771177217731774 |
- #!/usr/bin/env python
- # -*- coding: utf-8 -*-
- ###
- ### Author: Chris Iatrou (ichrispa@core-vector.net)
- ### Version: rev 13
- ###
- ### This program was created for educational purposes and has been
- ### contributed to the open62541 project by the author. All licensing
- ### terms for this source is inherited by the terms and conditions
- ### specified for by the open62541 project (see the projects readme
- ### file for more information on the LGPL terms and restrictions).
- ###
- ### This program is not meant to be used in a production environment. The
- ### author is not liable for any complications arising due to the use of
- ### this program.
- ###
- import sys
- import logging
- from ua_builtin_types import *;
- from open62541_MacroHelper import open62541_MacroHelper
- from ua_constants import *
- logger = logging.getLogger(__name__)
def getNextElementNode(xmlvalue):
    """Return the next sibling of ``xmlvalue`` that is an element node.

    Follows the ``nextSibling`` links starting after ``xmlvalue`` and skips
    every node whose ``nodeType`` is not ``ELEMENT_NODE`` (text, comments...).

    Returns None if ``xmlvalue`` is None or if no element sibling follows it.
    """
    if xmlvalue is None:
        return None
    sibling = xmlvalue.nextSibling
    while sibling is not None and sibling.nodeType != sibling.ELEMENT_NODE:
        sibling = sibling.nextSibling
    return sibling
- ###
- ### References are not really described by OPC-UA. This is how we
- ### use them here.
- ###
class opcua_referencePointer_t():
    """ Representation of a pointer.

    A pointer consists of a target (which should be a node class),
    an optional reference type (which should be an instance of
    opcua_node_referenceType_t) and an optional isForward flag.

    Two pointers compare equal when target, reference type and direction
    match; the parent node, the hidden flag and the address are ignored.
    Because equality is value based, instances are not hashable.
    """
    __reference_type__ = None
    __target__ = None
    __isForward__ = True
    __isHidden__ = False
    __addr__ = 0
    __parentNode__ = None

    # Characters copied verbatim into generated identifiers; everything else
    # is replaced by an underscore.
    __printable_chars__ = "abcdefghijklmnopqrstuvwxyz0123456789"

    def __init__(self, target, hidden=False, parentNode=None):
        self.__target__ = target
        self.__reference_type__ = None
        self.__isForward__ = True
        self.__isHidden__ = hidden
        self.__parentNode__ = parentNode
        self.__addr__ = 0

    def isHidden(self, data=None):
        """ Sets the hidden flag if a bool is passed; returns the flag. """
        if isinstance(data, bool):
            self.__isHidden__ = data
        return self.__isHidden__

    def isForward(self, data=None):
        """ Sets the direction flag if a bool is passed; returns the flag. """
        if isinstance(data, bool):
            self.__isForward__ = data
        return self.__isForward__

    def referenceType(self, type=None):
        """ Sets the reference type if one is passed; returns the current one. """
        if type is not None:
            self.__reference_type__ = type
        return self.__reference_type__

    def target(self, data=None):
        """ Sets the target if one is passed; returns the current target. """
        if data is not None:
            self.__target__ = data
        return self.__target__

    def address(self, data=None):
        """ Sets the address if one is passed; returns the current address. """
        if data is not None:
            self.__addr__ = data
        return self.__addr__

    def parent(self):
        """ Returns the node this pointer was created for (may be None). """
        return self.__parentNode__

    def getCodePrintableID(self):
        """ Returns "<parent id>_<reference type id>_<target id>" in lower
            case, with every character other than a-z and 0-9 replaced by an
            underscore. Missing parts are rendered as "None" (parent, target)
            or "Unknown" (reference type) before lower-casing.
        """
        src = "None"
        tgt = "None"
        reftype = "Unknown"
        if self.parent() is not None:
            src = str(self.parent().id())
        if self.target() is not None:
            tgt = str(self.target().id())
        if self.referenceType() is not None:
            reftype = str(self.referenceType().id())
        raw = (src + "_" + reftype + "_" + tgt).lower()
        return "".join(c if c in self.__printable_chars__ else "_" for c in raw)

    def __str__(self):
        retval = ""
        if isinstance(self.parent(), opcua_node_t):
            if isinstance(self.parent().id(), opcua_node_id_t):
                retval = retval + str(self.parent().id()) + "--["
            else:
                retval = retval + "(?) --["
        else:
            retval = retval + "(?) --["

        if isinstance(self.referenceType(), opcua_node_t):
            retval = retval + str(self.referenceType().browseName()) + "]-->"
        else:
            retval = retval + "?]-->"

        if isinstance(self.target(), opcua_node_t):
            if isinstance(self.target().id(), opcua_node_id_t):
                retval = retval + str(self.target().id())
            else:
                retval = retval + "(?) "
        else:
            retval = retval + "(?) "

        if self.isForward() or self.isHidden():
            retval = retval + " <"
            if self.isForward():
                retval = retval + "F"
            if self.isHidden():
                retval = retval + "H"
            retval = retval + ">"
        return retval

    def __repr__(self):
        return self.__str__()

    def __eq__(self, other):
        # Python 3 ignores __cmp__, so list membership tests would silently
        # fall back to identity without this method.
        if not isinstance(other, opcua_referencePointer_t):
            return False
        return (other.target() == self.target()
                and other.referenceType() == self.referenceType()
                and other.isForward() == self.isForward())

    def __ne__(self, other):
        # Python 2 does not derive != from ==.
        return not self.__eq__(other)

    def __cmp__(self, other):
        # Only consulted by Python 2; kept for callers that use cmp().
        if not isinstance(other, opcua_referencePointer_t):
            return -1
        if self.__eq__(other):
            return 0
        return 1
- ###
- ### Node IDs as a builtin type are useless; this class is used instead.
- ###
class opcua_node_id_t():
    """ Implementation of a node ID.

    The ID will encode itself appropriately as a string. If multiple IDs
    (numeric, string, guid) are defined, the order of preference for the ID
    string is always numeric, guid, bytestring, string.

    The constructor parses a string of ';' separated parts: "ns=<int>",
    "i=<int>", "g=<hex groups separated by '-'>", "b=<bytestring>" (the
    legacy prefix "o=" is accepted as well) and "s=<string>".
    """
    i = -1
    o = ""   # legacy class attribute; instances store byte strings in 'b'
    b = ""
    g = ""
    s = ""
    ns = 0
    __mystrname__ = ""

    def __init__(self, idstring):
        self.i = None
        self.b = None
        self.g = None
        self.s = None
        self.ns = 0
        for part in idstring.split(";"):
            prefix = part[:2]
            if prefix == "ns":
                self.ns = int(part[3:])
            elif prefix == "i=":
                self.i = int(part[2:])
            elif prefix in ("b=", "o="):
                # toString() emits "b=", so accept it next to the old "o=".
                self.b = part[2:]
            elif prefix == "g=":
                # GUID groups are kept as integers; leading zeros are lost.
                self.g = [int(group, 16) for group in part[2:].split("-")]
            elif prefix == "s=":
                self.s = part[2:]
        self.__mystrname__ = ""
        self.toString()

    def toString(self):
        """ Rebuilds the cached string form from the current fields and
            returns it.
        """
        name = ""
        if self.ns != 0:
            name = "ns=" + str(self.ns) + ";"
        # Order of preference is numeric, guid, bytestring, string
        if self.i is not None:
            name = name + "i=" + str(self.i)
        elif self.g is not None:
            name = name + "g=" + "-".join(format(group, "x") for group in self.g)
        elif self.b is not None:
            name = name + "b=" + str(self.b)
        elif self.s is not None:
            name = name + "s=" + str(self.s)
        self.__mystrname__ = name
        return name

    def __str__(self):
        return self.__mystrname__

    def __eq__(self, nodeId2):
        # Anything that is not a node id (None, strings, ...) is unequal.
        if not isinstance(nodeId2, opcua_node_id_t):
            return False
        return self.toString() == nodeId2.toString()

    def __ne__(self, nodeId2):
        # Python 2 does not derive != from ==.
        return not self.__eq__(nodeId2)

    def __repr__(self):
        return self.__mystrname__
- ###
- ### Actually existing node types
- ###
class opcua_node_t:
    """ Base class of all node types.

    Holds the attributes shared by every node class (id, namespace, node
    class, browse name, display name, description, write masks), the list of
    references carried by the node and the list of hidden inverse references
    pointing back at it. It also implements XML parsing, sanity checking and
    open62541 C code generation for those common parts.

    Subclasses hook in through __init_subType__(), sanitizeSubType(),
    parseXMLSubType(), printOpen62541CCode_SubtypeEarly() and
    printOpen62541CCode_Subtype().
    """
    __node_id__ = None
    __node_class__ = 0
    __node_browseName__ = ""
    __node_displayName__ = ""
    __node_description__ = ""
    __node_writeMask__ = 0
    __node_userWriteMask__ = 0
    __node_namespace__ = None
    __node_references__ = []
    __node_referencedBy__ = []
    __binary__ = ""
    __address__ = 0

    def __init__(self, id, ns):
        self.__node_namespace__ = ns
        self.__node_id__ = id
        self.__node_class__ = 0
        self.__node_browseName__ = ""
        self.__node_displayName__ = ""
        self.__node_description__ = ""
        self.__node_writeMask__ = 0
        self.__node_userWriteMask__ = 0
        self.__node_references__ = []
        self.__node_referencedBy__ = []
        self.__init_subType__()
        self.FLAG_ISABSTRACT = 128
        self.FLAG_SYMMETRIC = 64
        self.FLAG_CONTAINSNOLOOPS = 32
        self.FLAG_EXECUTABLE = 16
        self.FLAG_USEREXECUTABLE = 8
        self.FLAG_HISTORIZING = 4
        self.__binary__ = ""

    def __init_subType__(self):
        self.nodeClass(0)

    @staticmethod
    def _isText(data):
        """ True if data is a text string (str or unicode on Python 2). """
        if sys.version_info[0] < 3:
            return isinstance(data, basestring)  # noqa: F821 (Python 2 builtin)
        return isinstance(data, str)

    @staticmethod
    def _toText(data):
        """ Converts data to the text type of the running interpreter. """
        if sys.version_info[0] < 3:
            return unicode(data)  # noqa: F821 (Python 2 builtin)
        return str(data)

    @staticmethod
    def _encoded(text):
        """ Returns text utf-8 encoded on Python 2 and unchanged on Python 3. """
        if sys.version_info[0] < 3:
            return text.encode('utf-8')
        return text

    def _describe(self):
        """ Shared implementation of __str__ and __repr__. """
        if isinstance(self.id(), opcua_node_id_t):
            return self.__class__.__name__ + "(" + str(self.id()) + ")"
        return self.__class__.__name__ + "( no ID )"

    def __str__(self):
        return self._describe()

    def __repr__(self):
        return self._describe()

    def getCodePrintableID(self):
        """ Returns "<class name>_<node id>" in lower case with every
            character other than a-z, 0-9 and '_' replaced by an underscore.
            Nodes without a valid id yield "<class name>_unknown_nid".
        """
        if isinstance(self.id(), opcua_node_id_t):
            raw = self.__class__.__name__ + "_" + str(self.id())
        else:
            raw = self.__class__.__name__ + "_unknown_nid"
        allowed = "abcdefghijklmnopqrstuvwxyz1234567890_"
        return "".join(c if c in allowed else "_" for c in raw.lower())

    def addReference(self, ref):
        """ Add a opcua_referencePointer_t to the list of
            references this node carries.
        """
        if ref not in self.__node_references__:
            self.__node_references__.append(ref)

    def removeReference(self, ref):
        """ Remove ref from this node's references if it is present. """
        if ref in self.__node_references__:
            self.__node_references__.remove(ref)

    def removeReferenceToNode(self, targetNode):
        """ Remove every reference of this node that targets targetNode. """
        self.__node_references__ = [ref for ref in self.__node_references__
                                    if ref.target() != targetNode]

    def addInverseReferenceTarget(self, node):
        """ Adds a reference to the inverse reference list of this node.

            Inverse references are considered as "this node is referenced by"
            and facilitate lookups between nodes that reference this node,
            but are not referenced by this node. These references would
            require the namespace to be traversed by references to be found
            if this node was not aware of them.
        """
        # Only add this target if it is not already (inversely) referenced.
        # The inverse list holds pointers, so the node has to be compared
        # against the pointer targets rather than against the list entries.
        if self.hasInverseReferenceTarget(node):
            return
        if self.hasReferenceTarget(node):
            return
        self.__node_referencedBy__.append(
            opcua_referencePointer_t(node, hidden=True, parentNode=self))

    def getReferences(self):
        return self.__node_references__

    def getInverseReferences(self):
        return self.__node_referencedBy__

    def hasInverseReferenceTarget(self, node):
        """ True if an inverse reference of this node targets node. """
        for r in self.getInverseReferences():
            if node == r.target():
                return True
        return False

    def hasReferenceTarget(self, node):
        """ True if a reference of this node targets node. """
        for r in self.getReferences():
            if node == r.target():
                return True
        return False

    def getFirstParentNode(self):
        """ getFirstParentNode

            return a tuple of (opcua_node_t, opcua_referencePointer_t) indicating
            the first node found that references this node. If no such node
            is found, one or both tuple members are None.

            This function requires a linked namespace.

            Note that there may be more than one nodes that reference this node.
            The parent returned will be determined by the first inverse
            (isForward() == False), non-hidden reference of this node whose
            target references this node back. If none exists, hidden
            references are searched the same way.
        """
        parent = None
        revref = None
        for hiddenstatus in [False, True]:
            for r in self.getReferences():
                if r.isHidden() == hiddenstatus and r.isForward() == False:
                    parent = r.target()
                    # Look for the reference of the candidate that points back at us.
                    for backref in parent.getReferences():
                        if backref.target() == self:
                            revref = backref
                            break
                    if revref is not None:
                        return (parent, revref)
        return (parent, revref)

    def updateInverseReferences(self):
        """ Updates inverse references in all nodes referenced by this node.

            The function will look up all referenced nodes and check if they
            have a reference that points back at this node. If none is found,
            that means that the target is not aware that this node references
            it. In that case an inverse reference will be registered with
            the target node to point back to this node instance.
        """
        # Update inverse references in all nodes we have referenced
        for r in self.getReferences():
            if isinstance(r.target(), opcua_node_t):
                if not r.target().hasInverseReferenceTarget(self):
                    r.target().addInverseReferenceTarget(self)

    def id(self):
        return self.__node_id__

    def getNamespace(self):
        return self.__node_namespace__

    def nodeClass(self, c=0):
        """ Sets the node class attribute if c is passed.
            Returns the current node class.
        """
        # Allow overwriting only if it is not set
        if isinstance(c, int):
            if self.__node_class__ == 0 and c < 256:
                self.__node_class__ = c
        return self.__node_class__

    def browseName(self, data=0):
        """ Sets the browse name attribute if a text string is passed.
            Returns the current browse name (utf-8 encoded on Python 2).
        """
        if self._isText(data):
            self.__node_browseName__ = data
        return self._encoded(self.__node_browseName__)

    def displayName(self, data=None):
        """ Sets the display name attribute if data is passed.
            Returns the current display name (utf-8 encoded on Python 2).
        """
        if data is not None:
            self.__node_displayName__ = data
        return self._encoded(self.__node_displayName__)

    def description(self, data=None):
        """ Sets the description attribute if data is passed.
            Returns the current description (utf-8 encoded on Python 2).
        """
        if data is not None:
            self.__node_description__ = data
        return self._encoded(self.__node_description__)

    def writeMask(self, data=None):
        """ Sets the write mask attribute if data is passed.
            Returns the current write mask.
        """
        if data is not None:
            self.__node_writeMask__ = data
        return self.__node_writeMask__

    def userWriteMask(self, data=None):
        """ Sets the user write mask attribute if data is passed.
            Returns the current user write mask.
        """
        if data is not None:
            self.__node_userWriteMask__ = data
        return self.__node_userWriteMask__

    def initiateDummyXMLReferences(self, xmlelement):
        """ Initiates references found in the XML <References> element.

            All references initiated will be registered with this node, but
            their targets will be strings extracted from the XML description
            (hence "dummy").

            References created will however be registered with the namespace
            for linkLater(), which will eventually replace the string target
            with an actual instance of an opcua_node_t.
        """
        if not xmlelement.tagName == "References":
            logger.error("XMLElement passed is not a reference list")
            return
        for ref in xmlelement.childNodes:
            if ref.nodeType != ref.ELEMENT_NODE:
                continue
            dummy = opcua_referencePointer_t(self._toText(ref.firstChild.data), parentNode=self)
            self.addReference(dummy)
            self.getNamespace().linkLater(dummy)
            for (at, av) in ref.attributes.items():
                if at == "ReferenceType":
                    dummy.referenceType(av)
                elif at == "IsForward":
                    if "false" in av.lower():
                        dummy.isForward(False)
                else:
                    logger.error("Don't know how to process attribute " + at + "(" + av + ") for references.")

    def printDot(self):
        """ Returns a graphviz dot fragment describing this node and one
            edge per reference whose target is an opcua_node_t.
        """
        cleanname = "node_" + str(self.id()).replace(";", "").replace("=", "")
        dot = cleanname + " [label = \"{" + str(self.id()) + "|" + str(self.browseName()) + "}\", shape=\"record\"]"
        for r in self.__node_references__:
            if isinstance(r.target(), opcua_node_t):
                tgtname = "node_" + str(r.target().id()).replace(";", "").replace("=", "")
                dot = dot + "\n"
                if r.isForward() == True:
                    dot = dot + cleanname + " -> " + tgtname + " [label=\"" + str(r.referenceType().browseName()) + "\"]\n"
                else:
                    if len(r.referenceType().inverseName()) == 0:
                        logger.warning("Inverse name of reference is null " + str(r.referenceType().id()))
                    dot = dot + cleanname + " -> " + tgtname + " [label=\"" + str(r.referenceType().inverseName()) + "\"]\n"
        return dot

    def sanitize(self):
        """ Check the health of this node.

            Return True if all mandatory attributes are valid and all references have been
            correctly linked to nodes. Returns False on failure, which should indicate
            that this node needs to be removed from the namespace.

            The final result is whatever sanitizeSubType() returns.
        """
        # Do we have an id?
        if not isinstance(self.id(), opcua_node_id_t):
            logger.error("HELP! I'm an id'less node!")
            return False

        # Remove unlinked references
        tmp = []
        for r in self.getReferences():
            if not isinstance(r, opcua_referencePointer_t):
                logger.error("Reference is not a reference!?.")
            elif not isinstance(r.referenceType(), opcua_node_t):
                logger.error("Reference has no valid reference type and will be removed.")
            elif not isinstance(r.target(), opcua_node_t):
                logger.warning("Reference to " + str(r.target()) + " is not a node. It has been removed.")
            else:
                tmp.append(r)
        self.__node_references__ = tmp

        # Make sure that every inverse referenced node actually does reference us
        tmp = []
        for r in self.getInverseReferences():
            if not isinstance(r.target(), opcua_node_t):
                logger.warning("Invers reference to " + str(r.target()) + " does not reference a real node. It has been removed.")
            else:
                if r.target().hasReferenceTarget(self):
                    tmp.append(r)
                else:
                    logger.warning("Node " + str(self.id()) + " was falsely under the impression that it is referenced by " + str(r.target().id()))
        self.__node_referencedBy__ = tmp

        # Remove references from inverse list if we can reach this node "the regular way"
        # over a normal reference
        tmp = []
        for r in self.getInverseReferences():
            if not self.hasReferenceTarget(r.target()):
                tmp.append(r)
            else:
                logger.debug("Removing unnecessary inverse reference to " + str(r.target().id()))
        self.__node_referencedBy__ = tmp

        return self.sanitizeSubType()

    def sanitizeSubType(self):
        pass

    def address(self, addr=None):
        """ If addr is passed, the address of this node within the binary
            representation will be set.

            Returns the address currently stored (0 until one has been set).
        """
        if addr is not None:
            self.__address__ = addr
        return self.__address__

    def parseXML(self, xmlelement):
        """ Extracts base attributes from the XML description of an element.
            Parsed basetype attributes are:
            * browseName
            * displayName
            * Description
            * writeMask
            * userWriteMask
            * eventNotifier

            ParentNodeIds are ignored.

            If recognized, attributes and elements found will be removed from
            the XML Element passed. Type-specific attributes and child elements
            are handled by the parseXMLSubType() functions of all classes deriving
            from this base type and will be called automatically.
        """
        for (at, av) in list(xmlelement.attributes.items()):
            if at == "NodeId":
                xmlelement.removeAttribute(at)
            elif at == "BrowseName":
                self.browseName(str(av))
                xmlelement.removeAttribute(at)
            elif at == "DisplayName":
                self.displayName(av)
                xmlelement.removeAttribute(at)
            elif at == "Description":
                self.description(av)
                xmlelement.removeAttribute(at)
            elif at == "WriteMask":
                self.writeMask(int(av))
                xmlelement.removeAttribute(at)
            elif at == "UserWriteMask":
                self.userWriteMask(int(av))
                xmlelement.removeAttribute(at)
            elif at == "EventNotifier":
                # eventNotifier() is not defined on this base class; it has
                # to be provided by the subclass being parsed.
                self.eventNotifier(int(av))
                xmlelement.removeAttribute(at)
            elif at == "ParentNodeId":
                # Silently ignore this one..
                xmlelement.removeAttribute(at)

        # Iterate over a snapshot: children are removed from the live
        # childNodes list inside the loop, which would otherwise skip the
        # sibling following each removed element.
        for x in list(xmlelement.childNodes):
            if x.nodeType != x.ELEMENT_NODE:
                continue
            if x.firstChild:
                if x.tagName == "BrowseName":
                    self.browseName(self._toText(x.firstChild.data))
                    xmlelement.removeChild(x)
                elif x.tagName == "DisplayName":
                    self.displayName(self._toText(x.firstChild.data))
                    xmlelement.removeChild(x)
                elif x.tagName == "Description":
                    self.description(self._toText(x.firstChild.data))
                    xmlelement.removeChild(x)
                elif x.tagName == "WriteMask":
                    self.writeMask(int(self._toText(x.firstChild.data)))
                    xmlelement.removeChild(x)
                elif x.tagName == "UserWriteMask":
                    self.userWriteMask(int(self._toText(x.firstChild.data)))
                    xmlelement.removeChild(x)
            if x.tagName == "References":
                self.initiateDummyXMLReferences(x)
                xmlelement.removeChild(x)
        self.parseXMLSubType(xmlelement)

    def parseXMLSubType(self, xmlelement):
        pass

    def printXML(self):
        pass

    def printOpen62541CCode_SubtypeEarly(self, bootstrapping=True):
        """ printOpen62541CCode_SubtypeEarly

            Initiate code segments for the nodes instantiation that precede
            the actual UA_Server_addNode or UA_NodeStore_insert calls.
        """
        return []

    def printOpen62541CCode_Subtype(self, unPrintedReferences=None, bootstrapping=True):
        """ printOpen62541CCode_Subtype

            Appends node type specific information to the nodes UA_Server_addNode
            or UA_NodeStore_insert calls.
        """
        return []

    def printOpen62541CCode(self, unPrintedNodes=None, unPrintedReferences=None, supressGenerationOfAttribute=None):
        """ printOpen62541CCode

            Returns a list of strings containing the C-code necessary to initialize
            this node for the open62541 OPC-UA Stack.

            unPrintedNodes and unPrintedReferences are modified in place:
            this node and every reference printed here are removed from them.
            An empty list is returned if this node is not in unPrintedNodes.

            Note that this function will fail if the nodeid is non-numeric, as
            there is no UA_EXPANDEDNNODEID_[STRING|GUID|BYTESTRING] macro.
        """
        if unPrintedNodes is None:
            unPrintedNodes = []
        if unPrintedReferences is None:
            unPrintedReferences = []
        if supressGenerationOfAttribute is None:
            supressGenerationOfAttribute = []

        codegen = open62541_MacroHelper(supressGenerationOfAttribute=supressGenerationOfAttribute)
        code = []
        code.append("")
        code.append("do {")

        # Just to be sure...
        if self not in unPrintedNodes:
            logger.warning(str(self) + " attempted to reprint already printed node " + str(self) + ".")
            return []

        # If we are being passed a parent node by the namespace, use that for registering ourselves in the namespace
        # Note: getFirstParentNode will return [parentNode, referenceToChild]
        (parentNode, parentRef) = self.getFirstParentNode()
        # getFirstParentNode() may return a parent without a back reference;
        # such a node is bootstrapped like one without a parent.
        hasPrintedParent = (parentNode is not None
                            and parentRef is not None
                            and parentNode not in unPrintedNodes
                            and parentRef.referenceType() is not None)
        if hasPrintedParent:
            code.append("// Referencing node found and declared as parent: " + str(parentNode.id()) + "/" +
                        str(parentNode.__node_browseName__) + " using " + str(parentRef.referenceType().id()) +
                        "/" + str(parentRef.referenceType().__node_browseName__))
            code = code + codegen.getCreateNodeNoBootstrap(self, parentNode, parentRef, unPrintedNodes)
            # Parent to child reference is added by the server, do not reprint that reference
            if parentRef in unPrintedReferences:
                unPrintedReferences.remove(parentRef)
            # the UA_Server_addNode function will use addReference which creates a bidirectional reference; remove any inverse
            # references to our parent to avoid duplicate refs
            for ref in self.getReferences():
                if ref.target() == parentNode and ref.referenceType() == parentRef.referenceType() and ref.isForward() == False:
                    while ref in unPrintedReferences:
                        unPrintedReferences.remove(ref)
        # Otherwise use the "Bootstrapping" method and we will get registered with other nodes later.
        else:
            code = code + self.printOpen62541CCode_SubtypeEarly(bootstrapping=True)
            code = code + codegen.getCreateNodeBootstrap(self)
            code = code + self.printOpen62541CCode_Subtype(unPrintedReferences=unPrintedReferences, bootstrapping=True)
            code.append("// Parent node does not exist yet. This node will be bootstrapped and linked later.")
            code.append("UA_RCU_LOCK();")
            code.append("UA_NodeStore_insert(server->nodestore, (UA_Node*) " + self.getCodePrintableID() + ");")
            code.append("UA_RCU_UNLOCK();")

        # Try to print all references to nodes that already exist
        # Note: we know the reference types exist, because the namespace class made sure they were
        #       the first ones being printed
        tmprefs = []
        for r in self.getReferences():
            if not (r.target() in unPrintedNodes):
                if r in unPrintedReferences:
                    if len(tmprefs) == 0:
                        code.append("// This node has the following references that can be created:")
                    code = code + codegen.getCreateStandaloneReference(self, r)
                    tmprefs.append(r)
        # Remove printed refs from list
        for r in tmprefs:
            unPrintedReferences.remove(r)

        # Again, but this time check if other nodes deferred their node creation because this node did
        # not exist...
        tmprefs = []
        for r in unPrintedReferences:
            if (r.target() == self) and not (r.parent() in unPrintedNodes):
                if not isinstance(r.parent(), opcua_node_t):
                    logger.debug("Reference has no parent!")
                elif not isinstance(r.parent().id(), opcua_node_id_t):
                    logger.debug("Parents nodeid is not a nodeID!")
                else:
                    if len(tmprefs) == 0:
                        code.append("// Creating this node has resolved the following open references:")
                    code = code + codegen.getCreateStandaloneReference(r.parent(), r)
                    tmprefs.append(r)
        # Remove printed refs from list
        for r in tmprefs:
            unPrintedReferences.remove(r)

        # Again, just to be sure...
        if self in unPrintedNodes:
            # This is necessary to make printing work at all!
            unPrintedNodes.remove(self)

        code.append("} while(0);")
        return code
- class opcua_node_referenceType_t(opcua_node_t):
- __isAbstract__ = False
- __symmetric__ = False
- __reference_inverseName__ = ""
- __reference_referenceType__ = None
- def __init_subType__(self):
- self.nodeClass(NODE_CLASS_REFERENCETYPE)
- self.__reference_isAbstract__ = False
- self.__reference_symmetric__ = False
- self.__reference_inverseName__ = ""
- self.__reference_referenceType__ = None
- def referenceType(self,data=None):
- if isinstance(data, opcua_node_t):
- self.__reference_referenceType__ = data
- return self.__reference_referenceType__
- def isAbstract(self,data=None):
- if isinstance(data, bool):
- self.__isAbstract__ = data
- return self.__isAbstract__
- def symmetric(self,data=None):
- if isinstance(data, bool):
- self.__symmetric__ = data
- return self.__symmetric__
- def inverseName(self,data=None):
- if isinstance(data, str):
- self.__reference_inverseName__ = data
- return self.__reference_inverseName__
- def sanitizeSubType(self):
- if not isinstance(self.referenceType(), opcua_referencePointer_t):
- logger.error("ReferenceType " + str(self.referenceType()) + " of " + str(self.id()) + " is not a pointer (ReferenceType is mandatory for references).")
- self.__reference_referenceType__ = None
- return False
- return True
- def parseXMLSubType(self, xmlelement):
- for (at, av) in xmlelement.attributes.items():
- if at == "Symmetric":
- if "false" in av.lower():
- self.symmetric(False)
- else:
- self.symmetric(True)
- xmlelement.removeAttribute(at)
- elif at == "InverseName":
- self.inverseName(str(av))
- xmlelement.removeAttribute(at)
- elif at == "IsAbstract":
- if "false" in str(av).lower():
- self.isAbstract(False)
- else:
- self.isAbstract(True)
- xmlelement.removeAttribute(at)
- else:
- logger.warn("Don't know how to process attribute " + at + " (" + av + ")")
- for x in xmlelement.childNodes:
- if x.nodeType == x.ELEMENT_NODE:
- if x.tagName == "InverseName" and x.firstChild:
- self.inverseName(str(unicode(x.firstChild.data)))
- else:
- logger.warn( "Unprocessable XML Element: " + x.tagName)
def printOpen62541CCode_Subtype(self, unPrintedReferences=[], bootstrapping = True):
    """Return the C source lines for the ReferenceType-specific attributes.

    With bootstrapping == False the lines are written as arguments of a
    node-creation call: the expanded node id of the type definition (or
    BaseDataType if none is found) followed by the inverse name. The
    reference used as type definition is removed from
    unPrintedReferences. Otherwise the lines assign isAbstract,
    symmetric and inverseName directly on the node.
    """
    lines = []
    codegen = open62541_MacroHelper()

    if bootstrapping != False:
        # Bootstrapping: write the raw attributes onto the node.
        if self.isAbstract():
            lines.append(self.getCodePrintableID() + "->isAbstract = true;")
        if self.symmetric():
            lines.append(self.getCodePrintableID() + "->symmetric = true;")
        if self.__reference_inverseName__ != "":
            lines.append(self.getCodePrintableID() + '->inverseName = UA_LOCALIZEDTEXT_ALLOC("en_US", "' + self.__reference_inverseName__ + '");')
        return lines

    # Userspace mode: locate the reference that names our type.
    typeDefs = self.getNamespace().getSubTypesOf() # defaults to TypeDefinition
    typeRef = None
    for candidate in self.getReferences():
        if candidate.referenceType() in typeDefs:
            typeRef = candidate
            break
    if typeRef is None:
        # Fall back to an inverse HasSubtype reference.
        for candidate in self.getReferences():
            if candidate.referenceType().browseName() == "HasSubtype" and candidate.isForward() == False:
                typeRef = candidate
                break

    if typeRef is None:
        logger.warn(str(self) + " failed to locate a type definition, assuming BaseDataType.")
        lines.append(" // No valid typeDefinition found; assuming BaseDataType")
        lines.append(" UA_EXPANDEDNODEID_NUMERIC(0, UA_NS0ID_BASEDATATYPE),")
    else:
        lines.append(" " + codegen.getCreateExpandedNodeIDMacro(typeRef.target()) + ",")
        while typeRef in unPrintedReferences:
            unPrintedReferences.remove(typeRef)

    lines.append(' UA_LOCALIZEDTEXT("","' + str(self.inverseName()) + '"),')
    lines.append(" // FIXME: Missing, isAbstract")
    lines.append(" // FIXME: Missing, symmetric")
    return lines
class opcua_node_object_t(opcua_node_t):
    """Node subtype describing Object nodes.

    Adds the eventNotifier attribute to what opcua_node_t provides.
    """

    # Class-level default; replaced per instance by __init_subType__().
    __object_eventNotifier__ = 0

    def __init_subType__(self):
        """Set the node class to NODE_CLASS_OBJECT and reset eventNotifier to 0."""
        self.nodeClass(NODE_CLASS_OBJECT)
        self.__object_eventNotifier__ = 0

    def eventNotifier(self, data=""):
        """Get, and optionally set, the event notifier value.

        The value is stored only if data is an int; the stored value is
        returned in every case.
        """
        if isinstance(data, int):
            # This used to be a comparison ("=="), which silently
            # discarded every value passed in.
            self.__object_eventNotifier__ = data
        return self.__object_eventNotifier__

    def parseXMLSubType(self, xmlelement):
        """Read the Object-specific XML attributes.

        "EventNotifier" is converted with int() and stored, "SymbolicName"
        is dropped silently; both are removed from xmlelement. Any other
        attribute is logged as an error and every child element is
        logged as unprocessable.
        """
        for (at, av) in xmlelement.attributes.items():
            if at == "EventNotifier":
                self.eventNotifier(int(av))
                xmlelement.removeAttribute(at)
            elif at == "SymbolicName":
                # Silently ignore this one
                xmlelement.removeAttribute(at)
            else:
                logger.error("Don't know how to process attribute " + at + " (" + av + ")")

        for x in xmlelement.childNodes:
            if x.nodeType == x.ELEMENT_NODE:
                logger.info( "Unprocessable XML Element: " + x.tagName)

    def printOpen62541CCode_Subtype(self, unPrintedReferences=[], bootstrapping = True):
        """Return the C source lines for the Object-specific attributes.

        With bootstrapping == False a single argument line naming the type
        definition (or BaseObjectType if none is found) is produced, and
        the reference used for it is removed from unPrintedReferences.
        Otherwise a line assigning eventNotifier on the node is produced.
        """
        code = []
        codegen = open62541_MacroHelper()

        # Detect if this is bootstrapping or if we are attempting to use userspace...
        if bootstrapping == False:
            typeDefs = self.getNamespace().getSubTypesOf() # defaults to TypeDefinition
            myTypeRef = None
            for ref in self.getReferences():
                if ref.referenceType() in typeDefs:
                    myTypeRef = ref
                    break
            if myTypeRef is None:
                # Fall back to an inverse HasSubtype reference.
                for ref in self.getReferences():
                    if ref.referenceType().browseName() == "HasSubtype" and ref.isForward() == False:
                        myTypeRef = ref
                        break
            if myTypeRef is None:
                logger.warn(str(self) + " failed to locate a type definition, assuming BaseObjectType.")
                code.append(" // No valid typeDefinition found; assuming BaseObjectType")
                code.append(" UA_EXPANDEDNODEID_NUMERIC(0, UA_NS0ID_BASEOBJECTTYPE),")
            else:
                code.append(" " + codegen.getCreateExpandedNodeIDMacro(myTypeRef.target()) + ",")
                while myTypeRef in unPrintedReferences:
                    unPrintedReferences.remove(myTypeRef)
            #FIXME: No event notifier in UA_Server_addNode call!
            return code

        # We are being bootstrapped! Add the raw attributes to the node.
        code.append(self.getCodePrintableID() + "->eventNotifier = (UA_Byte) " + str(self.eventNotifier()) + ";")
        return code
# Python 3 has no separate unicode type: strings are already unicode, so
# provide an identity stand-in for the Python 2 builtin used by the parsers.
if sys.version_info >= (3,):
    def unicode(s):
        """Return s unchanged."""
        return s
class opcua_node_variable_t(opcua_node_t):
    """Node subtype describing Variable nodes.

    Holds the value, data type pointer, value rank, array dimensions,
    access levels, minimum sampling interval and historizing flag. The XML
    <Value> element is stored during parsing and only turned into a value
    by allocateValue().
    """

    # Class-level defaults; __init_subType__() replaces them per instance.
    __value__ = 0
    __dataType__ = None
    __valueRank__ = 0
    __arrayDimensions__ = 0
    __accessLevel__ = 0
    __userAccessLevel__ = 0
    __minimumSamplingInterval__ = 0
    __historizing__ = False

    def __init_subType__(self):
        """Set the node class to NODE_CLASS_VARIABLE and reset all attributes."""
        self.nodeClass(NODE_CLASS_VARIABLE)
        self.__value__ = None
        self.__dataType__ = None
        self.__valueRank__ = -1
        self.__arrayDimensions__ = []
        self.__accessLevel__ = 3
        self.__userAccessLevel__ = 3
        self.__minimumSamplingInterval__ = 0.0
        self.__historizing__ = False
        self.__xmlValueDef__ = None

    def value(self, data=0):
        """Get the value; store data first if it is an opcua_value_t."""
        if isinstance(data, opcua_value_t):
            self.__value__ = data
        return self.__value__

    def dataType(self, data=None):
        """Get the data type; store data first if it is not None."""
        if data is not None:
            self.__dataType__ = data
        return self.__dataType__

    def valueRank(self, data=""):
        """Get the value rank; store data first if it is an int."""
        if isinstance(data, int):
            self.__valueRank__ = data
        return self.__valueRank__

    def arrayDimensions(self, data=None):
        """Get the array dimensions; store data first if it is not None."""
        if data is not None:
            self.__arrayDimensions__ = data
        return self.__arrayDimensions__

    def accessLevel(self, data=None):
        """Get the access level; store data first if it is not None."""
        if data is not None:
            self.__accessLevel__ = data
        return self.__accessLevel__

    def userAccessLevel(self, data=None):
        """Get the user access level; store data first if it is not None."""
        if data is not None:
            self.__userAccessLevel__ = data
        return self.__userAccessLevel__

    def minimumSamplingInterval(self, data=None):
        """Get the minimum sampling interval; store data first if it is not None."""
        if data is not None:
            self.__minimumSamplingInterval__ = data
        return self.__minimumSamplingInterval__

    def historizing(self, data=None):
        """Get the historizing flag; store data first if it is not None."""
        if data is not None:
            self.__historizing__ = data
        return self.__historizing__

    def sanitizeSubType(self):
        """Check that the data type is a pointer whose target is a node.

        On failure an error is logged, the data type is reset to None and
        False is returned; otherwise True is returned.
        """
        if not isinstance(self.dataType(), opcua_referencePointer_t):
            logger.error("DataType " + str(self.dataType()) + " of " + str(self.id()) + " is not a pointer (DataType is mandatory for variables).")
            self.__dataType__ = None
            return False

        if not isinstance(self.dataType().target(), opcua_node_t):
            logger.error("DataType " + str(self.dataType().target()) + " of " + str(self.id()) + " does not point to a node (DataType is mandatory for variables).")
            self.__dataType__ = None
            return False
        return True

    def allocateValue(self):
        """Build the value of this variable from the stored XML <Value>.

        Returns False (after logging an error) if the data type is not a
        pointer, does not target a DataType node, or is not encodable.
        Returns False silently if no <Value> element was stored. Otherwise
        the value is parsed, arrayDimensions is set to match the parsed
        value and True is returned.
        """
        if not isinstance(self.dataType(), opcua_referencePointer_t):
            logger.error("Variable " + self.browseName() + "/" + str(self.id()) + " does not reference a valid dataType.")
            return False

        if not isinstance(self.dataType().target(), opcua_node_dataType_t):
            logger.error("Variable " + self.browseName() + "/" + str(self.id()) + " does not have a valid dataType reference.")
            return False

        if not self.dataType().target().isEncodable():
            logger.error("DataType for Variable " + self.browseName() + "/" + str(self.id()) + " is not encodable.")
            return False

        # FIXME: Don't build at all or allocate "defaults"? I'm for not building at all.
        if self.__xmlValueDef__ is None:
            return False

        self.value(opcua_value_t(self))
        self.value().parseXML(self.__xmlValueDef__)

        # Array dimensions must accurately represent the value and are
        # patched to reflect the exact dimensions of the parsed value.
        if not isinstance(self.value(), opcua_value_t) or len(self.value().value) == 0:
            self.arrayDimensions([])
        else:
            # Parser only permits 1-d arrays, which means we do not have to check further dimensions
            self.arrayDimensions([len(self.value().value)])
        return True

    def parseXMLSubType(self, xmlelement):
        """Read the Variable-specific XML attributes and child elements.

        Recognised attributes are applied and removed from xmlelement;
        unknown attributes are logged as errors. The <Value> child element
        is stored for allocateValue(); other recognised child elements are
        applied from their text content; unknown ones are logged.
        """
        for (at, av) in xmlelement.attributes.items():
            if at == "ValueRank":
                self.valueRank(int(av))
                xmlelement.removeAttribute(at)
            elif at == "AccessLevel":
                self.accessLevel(int(av))
                xmlelement.removeAttribute(at)
            elif at == "UserAccessLevel":
                self.userAccessLevel(int(av))
                xmlelement.removeAttribute(at)
            elif at == "MinimumSamplingInterval":
                self.minimumSamplingInterval(float(av))
                xmlelement.removeAttribute(at)
            elif at == "DataType":
                self.dataType(opcua_referencePointer_t(str(av), parentNode=self))
                # dataType needs to be linked to a node once the namespace is read
                self.getNamespace().linkLater(self.dataType())
                xmlelement.removeAttribute(at)
            elif at == "SymbolicName":
                # Silently ignore this one
                xmlelement.removeAttribute(at)
            else:
                logger.error("Don't know how to process attribute " + at + " (" + av + ")")

        for x in xmlelement.childNodes:
            if x.nodeType != x.ELEMENT_NODE:
                continue
            if x.tagName == "Value":
                # We need to be able to parse the DataType to build the variable value,
                # which can only be done if the namespace is linked.
                # Store the Value for later parsing
                self.__xmlValueDef__ = x
            elif x.tagName == "DataType":
                # Use the element's own text. The previous code reused the
                # leftover attribute-loop variable, which held an unrelated
                # value (or was undefined when the node had no attributes).
                self.dataType(opcua_referencePointer_t(str(unicode(x.firstChild.data)), parentNode=self))
                # dataType needs to be linked to a node once the namespace is read
                self.getNamespace().linkLater(self.dataType())
            elif x.tagName == "ValueRank":
                self.valueRank(int(unicode(x.firstChild.data)))
            elif x.tagName == "ArrayDimensions":
                self.arrayDimensions(int(unicode(x.firstChild.data)))
            elif x.tagName == "AccessLevel":
                self.accessLevel(int(unicode(x.firstChild.data)))
            elif x.tagName == "UserAccessLevel":
                self.userAccessLevel(int(unicode(x.firstChild.data)))
            elif x.tagName == "MinimumSamplingInterval":
                self.minimumSamplingInterval(float(unicode(x.firstChild.data)))
            elif x.tagName == "Historizing":
                if "true" in x.firstChild.data.lower():
                    self.historizing(True)
            else:
                logger.info( "Unprocessable XML Element: " + x.tagName)

    def printOpen62541CCode_SubtypeEarly(self, bootstrapping = True):
        """Return the C source lines that must precede the node creation.

        If the data type targets an encodable DataType node and a value is
        present, the value's own code is returned. Otherwise, when
        bootstrapping, lines allocating and initialising an empty
        UA_Variant are returned.
        """
        code = []
        # If we have an encodable value, try to encode that
        if self.dataType() is not None and isinstance(self.dataType().target(), opcua_node_dataType_t):
            # Delegate the encoding of the datavalue to the helper if we have
            # determined a valid encoding
            if self.dataType().target().isEncodable():
                if self.value() is not None:
                    code = code + self.value().printOpen62541CCode(bootstrapping)
                    return code
        if bootstrapping:
            code.append("UA_Variant *" + self.getCodePrintableID() + "_variant = UA_alloca(sizeof(UA_Variant));")
            code.append("UA_Variant_init(" + self.getCodePrintableID() + "_variant);")
        return code

    def printOpen62541CCode_Subtype(self, unPrintedReferences=[], bootstrapping = True):
        """Return the C source lines for the Variable-specific attributes.

        With bootstrapping == False the variant is emitted as a call
        argument, followed by FIXME markers for the attributes that are
        not passed. Otherwise the attributes are assigned on the node and
        the variant is copied into it.
        """
        code = []
        codegen = open62541_MacroHelper()

        # Detect if this is bootstrapping or if we are attempting to use userspace...
        if bootstrapping == False:
            code.append(" " + self.getCodePrintableID() + "_variant, ")
            code.append(" // FIXME: missing minimumSamplingInterval")
            code.append(" // FIXME: missing accessLevel")
            code.append(" // FIXME: missing userAccessLevel")
            code.append(" // FIXME: missing valueRank")
            return code

        if self.historizing():
            code.append(self.getCodePrintableID() + "->historizing = true;")

        code.append(self.getCodePrintableID() + "->minimumSamplingInterval = (UA_Double) " + str(self.minimumSamplingInterval()) + ";")
        code.append(self.getCodePrintableID() + "->accessLevel = (UA_Int32) " + str(self.accessLevel()) + ";")
        code.append(self.getCodePrintableID() + "->valueRank = (UA_Int32) " + str(self.valueRank()) + ";")
        # The variant is guaranteed to exist by SubtypeEarly()
        code.append("UA_Variant_copy(" + self.getCodePrintableID() + "_variant, &" + self.getCodePrintableID() + "->value.data.value.value );")
        code.append(self.getCodePrintableID() + "->valueSource = UA_VALUESOURCE_DATA;")
        return code
class opcua_node_method_t(opcua_node_t):
    """Node subtype describing Method nodes.

    Holds the executable and userExecutable flags and an optional pointer
    to the method declaration.
    """

    # Class-level defaults; __init_subType__() replaces them per instance.
    __executable__ = True
    __userExecutable__ = True
    __methodDecalaration__ = None

    def __init_subType__(self):
        """Set the node class to NODE_CLASS_METHOD and reset all attributes."""
        self.nodeClass(NODE_CLASS_METHOD)
        self.__executable__ = True
        self.__userExecutable__ = True
        self.__methodDecalaration__ = None

    def methodDeclaration(self, data=None):
        """Get the method declaration; store data first if it is not None."""
        if data is not None:
            self.__methodDecalaration__ = data
        return self.__methodDecalaration__

    def executable(self, data=None):
        """Get the executable flag; store data first if it is a bool."""
        if isinstance(data, bool):
            # This used to be a comparison ("=="), so the flag could
            # never be changed.
            self.__executable__ = data
        return self.__executable__

    def userExecutable(self, data=None):
        """Get the userExecutable flag; store data first if it is a bool."""
        if isinstance(data, bool):
            # This used to be a comparison ("=="), so the flag could
            # never be changed.
            self.__userExecutable__ = data
        return self.__userExecutable__

    def sanitizeSubType(self):
        """Check the method declaration, if one is set.

        Returns False if a declaration is set but its target is not an
        opcua_node_t. Returns True otherwise, matching the sanitizeSubType()
        implementations of the sibling node classes; the previous version
        fell off the end and returned None on success.
        """
        if self.methodDeclaration() is not None:
            if not isinstance(self.methodDeclaration().target(), opcua_node_t):
                return False
        # FIXME: Is a method without a declaration even permitted!?
        return True

    def parseXMLSubType(self, xmlelement):
        """Read the Method-specific XML attributes.

        "MethodDeclarationId" is stored as a reference pointer and queued
        for linking. Any other attribute is logged as an error and every
        child element is logged as unprocessable.
        """
        for (at, av) in xmlelement.attributes.items():
            if at == "MethodDeclarationId":
                self.methodDeclaration(opcua_referencePointer_t(str(av), parentNode=self))
                # The declaration needs to be linked to a node once the namespace is read
                self.getNamespace().linkLater(self.methodDeclaration())
            else:
                logger.error("Don't know how to process attribute " + at + " (" + av + ")")

        for x in xmlelement.childNodes:
            if x.nodeType == x.ELEMENT_NODE:
                logger.info( "Unprocessable XML Element: " + x.tagName)

    def printOpen62541CCode_Subtype(self, unPrintedReferences=[], bootstrapping = True):
        """Return the C source lines for the Method-specific attributes.

        With bootstrapping == False placeholder call arguments are
        produced. Otherwise a line setting executable on the node is
        produced if the flag is set.
        """
        code = []

        # Detect if this is bootstrapping or if we are attempting to use userspace...
        if bootstrapping == False:
            code.append(" // Note: in/outputArguments are added by attaching the variable nodes,")
            code.append(" // not by including the in the addMethodNode() call.")
            code.append(" NULL,")
            code.append(" NULL,")
            code.append(" 0, NULL,")
            code.append(" 0, NULL,")
            code.append(" // FIXME: Missing executable")
            code.append(" // FIXME: Missing userExecutable")
            return code

        # UA_False is default for booleans on _init()
        if self.executable():
            code.append(self.getCodePrintableID() + "->executable = true;")

        return code
class opcua_node_objectType_t(opcua_node_t):
    """Node subtype describing ObjectType nodes; adds the isAbstract flag."""

    # Class-level default; replaced per instance by __init_subType__().
    __isAbstract__ = False

    def __init_subType__(self):
        """Set the node class to NODE_CLASS_OBJECTTYPE and reset isAbstract."""
        self.nodeClass(NODE_CLASS_OBJECTTYPE)
        # This used to be a comparison ("==") with no effect.
        self.__isAbstract__ = False

    def isAbstract(self, data=None):
        """Get the isAbstract flag; store data first if it is a bool."""
        if isinstance(data, bool):
            self.__isAbstract__ = data
        return self.__isAbstract__

    def parseXMLSubType(self, xmlelement):
        """Read the ObjectType-specific XML attributes.

        "IsAbstract" is applied (False if the value contains "false",
        case-insensitive, True otherwise) and removed from xmlelement. Any
        other attribute is logged as an error and every child element is
        logged as unprocessable.
        """
        for (at, av) in xmlelement.attributes.items():
            if at == "IsAbstract":
                # The previous code only handled "false", so an abstract
                # type was never marked as such.
                self.isAbstract("false" not in av.lower())
                xmlelement.removeAttribute(at)
            else:
                logger.error("Don't know how to process attribute " + at + " (" + av + ")")

        for x in xmlelement.childNodes:
            if x.nodeType == x.ELEMENT_NODE:
                logger.info( "Unprocessable XML Element: " + x.tagName)

    def printOpen62541CCode_Subtype(self, unPrintedReferences=[], bootstrapping = True):
        """Return the C source lines for the ObjectType-specific attributes.

        With bootstrapping == False the type definition (or
        BaseObjectType if none is found) and the abstract flag are
        produced as call arguments, and the reference used is removed
        from unPrintedReferences. Otherwise a line setting isAbstract on
        the node is produced if the flag is set.
        """
        code = []
        codegen = open62541_MacroHelper()

        # Detect if this is bootstrapping or if we are attempting to use userspace...
        if bootstrapping == False:
            typeDefs = self.getNamespace().getSubTypesOf() # defaults to TypeDefinition
            myTypeRef = None
            for ref in self.getReferences():
                if ref.referenceType() in typeDefs:
                    myTypeRef = ref
                    break
            if myTypeRef is None:
                # Fall back to an inverse HasSubtype reference.
                for ref in self.getReferences():
                    if ref.referenceType().browseName() == "HasSubtype" and ref.isForward() == False:
                        myTypeRef = ref
                        break
            if myTypeRef is None:
                logger.warn(str(self) + " failed to locate a type definition, assuming BaseObjectType.")
                code.append(" // No valid typeDefinition found; assuming BaseObjectType")
                code.append(" UA_EXPANDEDNODEID_NUMERIC(0, UA_NS0ID_BASEOBJECTTYPE),")
            else:
                code.append(" " + codegen.getCreateExpandedNodeIDMacro(myTypeRef.target()) + ",")
                while myTypeRef in unPrintedReferences:
                    code.append(" // removed " + str(myTypeRef))
                    unPrintedReferences.remove(myTypeRef)

            if self.isAbstract():
                code.append(" true,")
            else:
                code.append(" false,")
            # Return here like the sibling node classes do; without it the
            # bootstrap assignment below ended up inside the argument list.
            return code

        # Fallback mode for bootstrapping
        if self.isAbstract():
            code.append(self.getCodePrintableID() + "->isAbstract = true;")

        return code
class opcua_node_variableType_t(opcua_node_t):
    """Node subtype describing VariableType nodes.

    Holds the data type pointer, value rank, array dimensions, abstract
    flag and the stored XML <Definition> element. Values are not
    implemented for this node class.
    """

    # Class-level defaults; __init_subType__() replaces them per instance.
    __value__ = 0
    __dataType__ = None
    __valueRank__ = 0
    __arrayDimensions__ = 0
    __isAbstract__ = False
    __xmlDefinition__ = None

    def __init_subType__(self):
        """Set the node class to NODE_CLASS_VARIABLETYPE and reset all attributes."""
        self.nodeClass(NODE_CLASS_VARIABLETYPE)
        self.__value__ = 0
        self.__dataType__ = None
        self.__valueRank__ = -1
        self.__arrayDimensions__ = 0
        self.__isAbstract__ = False
        self.__xmlDefinition__ = None

    def value(self, data=None):
        """Log that values are not implemented; always returns None."""
        logger.error("Setting data not implemented!")

    def dataType(self, data=None):
        """Get the data type; store data first if it is not None."""
        if data is not None:
            self.__dataType__ = data
        return self.__dataType__

    def valueRank(self,data=None):
        """Get the value rank; store data first if it is an int."""
        if isinstance(data, int):
            self.__valueRank__ = data
        return self.__valueRank__

    def arrayDimensions(self,data=None):
        """Get the array dimensions; store data first if it is an int."""
        if isinstance(data, int):
            self.__arrayDimensions__ = data
        return self.__arrayDimensions__

    def isAbstract(self,data=None):
        """Get the isAbstract flag; store data first if it is a bool."""
        if isinstance(data, bool):
            self.__isAbstract__ = data
        return self.__isAbstract__

    def sanitizeSubType(self):
        """Check the data type of this VariableType.

        Returns True if a data type is set, is a reference pointer and
        targets a node. Returns False, after logging, if it is missing or
        invalid. The previous check was inverted ("not ... != None"): it
        validated only a missing data type and rejected every defined one.
        """
        # DataType fields appear to be optional for VariableTypes
        # but if it does have a node set, it must obviously be a valid node
        if self.dataType() is None:
            # FIXME: It's unclear whether this is ok or not.
            logger.warn("DataType attribute of variableType " + str(self.id()) + " is not defined.")
            return False
        if not isinstance(self.dataType(), opcua_referencePointer_t):
            logger.error("DataType attribute of " + str(self.id()) + " is not a pointer")
            return False
        if not isinstance(self.dataType().target(), opcua_node_t):
            logger.error("DataType attribute of " + str(self.id()) + " does not point to a node")
            return False
        return True

    def parseXMLSubType(self, xmlelement):
        """Read the VariableType-specific XML attributes and child elements.

        "IsAbstract" and "ValueRank" are applied and removed from
        xmlelement; "DataType" is stored as a reference pointer and queued
        for linking. Unknown attributes are logged as errors. A
        <Definition> child element is stored; other child elements are
        logged as unprocessable.
        """
        for (at, av) in xmlelement.attributes.items():
            if at == "IsAbstract":
                self.isAbstract("false" not in av.lower())
                xmlelement.removeAttribute(at)
            elif at == "ValueRank":
                self.valueRank(int(av))
                if self.valueRank() != -1:
                    logger.warn("Array's or matrices are only permitted in variables and not supported for variableTypes. This attribute (" + at + "=" + av + ") will effectively be ignored.")
                xmlelement.removeAttribute(at)
            elif at == "DataType":
                self.dataType(opcua_referencePointer_t(str(av), parentNode=self))
                # dataType needs to be linked to a node once the namespace is read
                self.getNamespace().linkLater(self.dataType())
            else:
                logger.error("Don't know how to process attribute " + at + " (" + av + ")")

        for x in xmlelement.childNodes:
            if x.nodeType == x.ELEMENT_NODE:
                if x.tagName == "Definition":
                    self.__xmlDefinition__ = x
                    logger.debug( "Definition stored for future processing")
                else:
                    logger.info( "Unprocessable XML Element: " + x.tagName)

    def printOpen62541CCode_SubtypeEarly(self, bootstrapping = True):
        """Return the C source lines that must precede the node creation.

        If the data type targets an encodable DataType node and value()
        returns something, that value's code is returned. Otherwise, when
        bootstrapping, lines allocating and initialising an empty
        UA_Variant are returned.
        """
        code = []
        # If we have an encodable value, try to encode that
        if self.dataType() is not None and isinstance(self.dataType().target(), opcua_node_dataType_t):
            # Delegate the encoding of the datavalue to the helper if we have
            # determined a valid encoding
            if self.dataType().target().isEncodable():
                # Call value() once: it logs an error on every call.
                val = self.value()
                if val is not None:
                    code = code + val.printOpen62541CCode(bootstrapping)
                    return code
        if bootstrapping:
            code.append("UA_Variant *" + self.getCodePrintableID() + "_variant = UA_alloca(sizeof(UA_Variant));")
            code.append("UA_Variant_init(" + self.getCodePrintableID() + "_variant);")
        return code

    def printOpen62541CCode_Subtype(self, unPrintedReferences=[], bootstrapping = True):
        """Return the C source lines for the VariableType-specific attributes.

        With bootstrapping == False the variant, the value rank and the
        abstract flag are produced as call arguments. Otherwise isAbstract
        is assigned on the node and the variant is copied into it.
        """
        code = []
        codegen = open62541_MacroHelper()

        if bootstrapping == False:
            code.append(" " + self.getCodePrintableID() + "_variant, ")
            code.append(" " + str(self.valueRank()) + ",")
            if self.isAbstract():
                code.append(" true,")
            else:
                code.append(" false,")
            return code

        if self.isAbstract():
            code.append(self.getCodePrintableID() + "->isAbstract = true;")
        else:
            code.append(self.getCodePrintableID() + "->isAbstract = false;")

        # The variant is guaranteed to exist by SubtypeEarly()
        code.append("UA_Variant_copy(" + self.getCodePrintableID() + "_variant, &" + self.getCodePrintableID() + "->value.data.value.value );")
        code.append(self.getCodePrintableID() + "->valueSource = UA_VALUESOURCE_DATA;")
        return code
class opcua_node_dataType_t(opcua_node_t):
    """ opcua_node_dataType_t is a subtype of opcua_node_t describing DataType nodes.

        DataTypes contain definitions and structure information usable for
        Variables. The format of this structure is determined by
        buildEncoding().

        Two definition styles are distinguished in XML:
        1) A DataType can be a structure of fields, each field having a name
           and a type. The type must be either an encodable builtin node
           (ex. UInt32) or point to another DataType node that inherits its
           encoding from a builtin type using an inverse "hasSubtype"
           (hasSuperType) reference.
        2) A DataType may be an enumeration, in which each field has a name
           and a numeric value.

        The definition is stored as an ordered list. Depending on which
        definition style was used, __definition__ will hold
        1) A list of ["Fieldname", opcua_node_t] pairs.
        2) A list of ("Fieldname", int) tuples.

        A DataType (and in consequence all Variables using it) shall be
        deemed not encodable if any of its fields cannot be traced to an
        encodable builtin type. A DataType shall be further deemed not
        encodable if it contains mixed structure/enumeration definitions.

        If encodable, the encoding can be retrieved using getEncoding().
    """

    # Class-level defaults; __init_subType__() replaces them per instance.
    __isAbstract__ = False
    __isEnum__ = False
    __xmlDefinition__ = None
    __baseTypeEncoding__ = []
    __encodable__ = False
    __encodingBuilt__ = False
    __definition__ = []

    def __init_subType__(self):
        """Set the node class to NODE_CLASS_DATATYPE and reset all attributes."""
        self.nodeClass(NODE_CLASS_DATATYPE)
        # This used to be a comparison ("==") with no effect.
        self.__isAbstract__ = False
        self.__xmlDefinition__ = None
        self.__baseTypeEncoding__ = []
        # None means "not determined yet"; buildEncoding() sets True/False.
        self.__encodable__ = None
        self.__encodingBuilt__ = False
        self.__definition__ = []
        self.__isEnum__ = False

    def isAbstract(self,data=None):
        """ Returns the isAbstract flag.

            Calling this function with a bool parameter will first set
            isAbstract = data; other argument types are ignored.
        """
        if isinstance(data, bool):
            self.__isAbstract__ = data
        return self.__isAbstract__

    def isEncodable(self):
        """ Returns the encodable state recorded by buildEncoding().

            This does not trigger buildEncoding(). Until an encoding has
            been built the value is the one set by __init_subType__()
            (None).
        """
        return self.__encodable__

    def getEncoding(self):
        """ Returns the encoding recorded for this DataType.

            If the type has been marked not encodable (False) and no build
            was attempted yet, buildEncoding() is called and its result
            returned; if a build was already attempted, an empty list is
            returned. In every other state the stored encoding is returned
            as is, which is an empty list until buildEncoding() has run.
        """
        if self.__encodable__ == False:
            if self.__encodingBuilt__ == False:
                return self.buildEncoding()
            return []
        else:
            return self.__baseTypeEncoding__

    def buildEncoding(self, indent=0, force=False):
        """ buildEncoding() determines the structure and aliases used for
            variables of this DataType.

            The function will parse the XML <Definition> of the dataType and
            extract "Name"-"Type" tuples. If successful, buildEncoding will
            return a nested list of the following format:

            [['Alias1', ['Alias2', ['BuiltinType']]], [Alias2, ['BuiltinType']], ...]

            Aliases are fieldnames defined by this DataType or DataTypes
            referenced. A list such as ['DataPoint', ['Int32']] indicates that
            a value will encode an Int32 with the alias 'DataPoint' such as
            <DataPoint>12827</DataPoint>. Only the first alias of a nested
            list is considered valid for the BuiltinType.

            Single-element lists are always BuiltinTypes. Every nested list
            must converge in a builtin type to be encodable. Without an XML
            definition, buildEncoding follows the inverse hasSubtype
            (hasSupertype) references of the dataType.

            If instead of "DataType" a numeric "Value" attribute is
            encountered, the DataType will be considered an enumeration and
            all Variables using it will be encoded as Int32.

            DataTypes can be either structures or enumerations - mixed
            definitions will be unencodable.

            buildEncoding() does its work only once per dataType; later
            calls return the stored result. Use of the 'force=True'
            parameter will force the Definition to be reparsed.

            After parsing, __definition__ holds the field definition as a
            list. Note that this might deviate from the encoding, especially
            if inheritance was used.
        """
        proxy = opcua_value_t(None)
        prefix = " " + "|"*indent+ "+"

        if force==True:
            self.__encodingBuilt__ = False

        if self.__encodingBuilt__ == True:
            if self.isEncodable():
                logger.debug(prefix + str(self.__baseTypeEncoding__) + " (already analyzed)")
            else:
                logger.debug( prefix + str(self.__baseTypeEncoding__) + "(already analyzed, not encodable!)")
            return self.__baseTypeEncoding__
        self.__encodingBuilt__ = True # signify that we have attempted to build this type
        self.__encodable__ = True

        if indent==0:
            logger.debug("Parsing DataType " + self.browseName() + " (" + str(self.id()) + ")")

        # Builtin types are their own encoding.
        if proxy.isBuiltinByString(self.browseName()):
            self.__baseTypeEncoding__ = [self.browseName()]
            self.__encodable__ = True
            logger.debug( prefix + self.browseName() + "*")
            logger.debug("Encodable as: " + str(self.__baseTypeEncoding__))
            logger.debug("")
            return self.__baseTypeEncoding__

        if self.__xmlDefinition__ is None:
            # Check if there is a supertype available
            for ref in self.getReferences():
                if "hassubtype" in ref.referenceType().browseName().lower() and ref.isForward() == False:
                    if isinstance(ref.target(), opcua_node_dataType_t):
                        logger.debug( prefix + "Attempting definition using supertype " + ref.target().browseName() + " for DataType " + " " + self.browseName())
                        subenc = ref.target().buildEncoding(indent=indent+1)
                        if not ref.target().isEncodable():
                            self.__encodable__ = False
                            break
                        else:
                            self.__baseTypeEncoding__ = self.__baseTypeEncoding__ + [self.browseName(), subenc, 0]
            if len(self.__baseTypeEncoding__) == 0:
                logger.debug(prefix + "No viable definition for " + self.browseName() + " " + str(self.id()) + " found.")
                self.__encodable__ = False

            if indent==0:
                if not self.__encodable__:
                    logger.debug("Not encodable (partial): " + str(self.__baseTypeEncoding__))
                else:
                    logger.debug("Encodable as: " + str(self.__baseTypeEncoding__))
                logger.debug( "")

            return self.__baseTypeEncoding__

        isEnum = True
        isSubType = True
        hasValueRank = 0

        # We need to store the definition as ordered data, but can't use orderedDict
        # for backward compatibility with Python 2.6 and 3.4
        enumDict = []
        typeDict = []

        # An XML Definition is provided and will be parsed... now
        for x in self.__xmlDefinition__.childNodes:
            if x.nodeType != x.ELEMENT_NODE:
                continue
            fname = ""
            fdtype = ""
            enumVal = ""
            hasValueRank = 0
            for at,av in x.attributes.items():
                if at == "DataType":
                    fdtype = str(av)
                    isEnum = False
                elif at == "Name":
                    fname = str(av)
                elif at == "Value":
                    enumVal = int(av)
                    isSubType = False
                elif at == "ValueRank":
                    hasValueRank = int(av)
                    logger.warn("Arrays or matrices (ValueRank) are not supported for datatypes. This DT will become scalar.")
                else:
                    logger.warn("Unknown Field Attribute " + str(at))
            # This can either be an enumeration OR a structure, not both.
            # Figure out which of the dictionaries gets the newly read value pair
            if isEnum == isSubType:
                # This is an error
                logger.warn("DataType contains both enumeration and subtype (or neither)")
                self.__encodable__ = False
                break
            elif isEnum:
                # This is an enumeration
                enumDict.append((fname, enumVal))
                continue
            else:
                # This might be a subtype... follow the node defined as datatype to find out
                # what encoding to use
                dtnode = self.getNamespace().getNodeByIDString(fdtype)
                if dtnode is None:
                    # Node found in datatype element is invalid. Report the
                    # id that was looked up; the previous message printed
                    # whichever attribute value happened to be read last.
                    logger.debug( prefix + fname + " ?? " + fdtype + " ??")
                    self.__encodable__ = False
                else:
                    # The node in the datatype element was found. We inherit its encoding,
                    # but must still ensure that the dtnode is itself validly encodable
                    typeDict.append([fname, dtnode])
                    if hasValueRank < 0:
                        hasValueRank = 0
                    fdtype = str(dtnode.browseName()) + "+"*hasValueRank
                    logger.debug( prefix + fname + " : " + fdtype + " -> " + str(dtnode.id()))
                    subenc = dtnode.buildEncoding(indent=indent+1)
                    self.__baseTypeEncoding__ = self.__baseTypeEncoding__ + [[fname, subenc, hasValueRank]]
                    if not dtnode.isEncodable():
                        # If we inherit an encoding from an unencodable node, this node is
                        # also not encodable
                        self.__encodable__ = False
                        break

        # If we used inheritance to determine an encoding without alias, there is
        # the possibility that lists got double-nested despite only one element
        # being encoded, such as [['Int32']] or [['alias',['int32']]]. Remove that
        # enclosing list.
        while len(self.__baseTypeEncoding__) == 1 and isinstance(self.__baseTypeEncoding__[0], list):
            self.__baseTypeEncoding__ = self.__baseTypeEncoding__[0]

        if isEnum == True:
            self.__baseTypeEncoding__ = self.__baseTypeEncoding__ + ['Int32']
            self.__definition__ = enumDict
            self.__isEnum__ = True
            logger.debug( prefix+"Int32* -> enumeration with dictionary " + str(enumDict) + " encodable " + str(self.__encodable__))
            return self.__baseTypeEncoding__

        # Structure definition: keep the parsed fields, as documented above.
        self.__isEnum__ = False
        self.__definition__ = typeDict

        if indent==0:
            if not self.__encodable__:
                logger.debug( "Not encodable (partial): " + str(self.__baseTypeEncoding__))
            else:
                logger.debug( "Encodable as: " + str(self.__baseTypeEncoding__))
            logger.debug( "")
        return self.__baseTypeEncoding__

    def parseXMLSubType(self, xmlelement):
        """ Parses all XML data that is not considered part of the base node attributes.

            XML attributes processed are "IsAbstract".
            XML elements processed are "Definition".
            Everything else is reported with a warning.
        """
        for (at, av) in xmlelement.attributes.items():
            if at == "IsAbstract":
                self.isAbstract("true" in str(av).lower())
                xmlelement.removeAttribute(at)
            else:
                logger.warn("Don't know how to process attribute " + at + " (" + av + ")")

        for x in xmlelement.childNodes:
            if x.nodeType == x.ELEMENT_NODE:
                if x.tagName == "Definition":
                    # Stored for later processing by buildEncoding().
                    self.__xmlDefinition__ = x
                else:
                    logger.warn( "Unprocessable XML Element: " + x.tagName)

    def encodedTypeId(self):
        """ Returns a number of the builtin Type that should be used
            to represent this datatype.

            Returns 0 if the type is not encodable or has an empty encoding.
        """
        if self.isEncodable() != True or len(self.getEncoding()) == 0:
            # Encoding is []
            return 0

        enc = self.getEncoding()
        if len(enc) > 1 and isinstance(enc[0], list):
            # [ [?], [?], [?] ]
            # Encoding is a list representing an extensionobject
            return opcua_BuiltinType_extensionObject_t(None).getNumericRepresentation()

        if len(enc)==1 and isinstance(enc[0], str):
            # [ 'BuiltinType' ]
            return opcua_value_t(None).getTypeByString(enc[0]).getNumericRepresentation()

        # [ ['Alias', [?]] ]
        # Determine if [?] is reducible to a builtin type or if [?] is an aliased
        # extensionobject
        while len(enc) > 1 and isinstance(enc[0], str):
            enc = enc[1]
        if len(enc) > 1:
            return opcua_BuiltinType_extensionObject_t(None).getNumericRepresentation()
        return opcua_value_t(None).getTypeByString(enc[0]).getNumericRepresentation()

    def printOpen62541CCode_Subtype(self, unPrintedReferences=[], bootstrapping = True):
        """ Returns the C source lines for the DataType-specific attributes.

            With bootstrapping == False the type definition (or
            BaseDataType if none is found) and the abstract flag are
            produced as call arguments, and the reference used is removed
            from unPrintedReferences. Otherwise isAbstract is assigned on
            the node.
        """
        code = []
        codegen = open62541_MacroHelper()

        # Detect if this is bootstrapping or if we are attempting to use userspace...
        if bootstrapping == False:
            typeDefs = self.getNamespace().getSubTypesOf() # defaults to TypeDefinition
            myTypeRef = None
            for ref in self.getReferences():
                if ref.referenceType() in typeDefs:
                    myTypeRef = ref
                    break
            if myTypeRef is None:
                # Fall back to an inverse HasSubtype reference.
                for ref in self.getReferences():
                    if ref.referenceType().browseName() == "HasSubtype" and ref.isForward() == False:
                        myTypeRef = ref
                        break
            if myTypeRef is None:
                logger.warn(str(self) + " failed to locate a type definition, assuming BaseDataType.")
                code.append(" // No valid typeDefinition found; assuming BaseDataType")
                code.append(" UA_EXPANDEDNODEID_NUMERIC(0, UA_NS0ID_BASEDATATYPE),")
            else:
                code.append(" " + codegen.getCreateExpandedNodeIDMacro(myTypeRef.target()) + ",")
                while myTypeRef in unPrintedReferences:
                    unPrintedReferences.remove(myTypeRef)

            if self.isAbstract():
                code.append(" true,")
            else:
                code.append(" false,")
            return code

        if self.isAbstract():
            code.append(self.getCodePrintableID() + "->isAbstract = true;")
        else:
            code.append(self.getCodePrintableID() + "->isAbstract = false;")
        return code
class opcua_node_view_t(opcua_node_t):
    """Node of class View, carrying the containsNoLoops and eventNotifier
    attributes and the C code generation specific to views.
    """
    # Class-level fallbacks; __init_subType__ sets the per-instance values.
    __containsNoLoops__ = True
    __eventNotifier__ = 0

    def __init_subType__(self):
        """Mark this node as a View and initialise its view attributes."""
        self.nodeClass(NODE_CLASS_VIEW)
        # These two statements used to be `==` comparisons whose result was
        # thrown away, so nothing was ever initialised here.
        self.__containsNoLoops__ = False
        # Kept as the integer 0 (not False) because the value is printed
        # verbatim into a "(UA_Byte) <n>" cast by the code generator below.
        self.__eventNotifier__ = 0

    def containsNoLoops(self, data=None):
        """Get the containsNoLoops flag; set it first when `data` is a bool."""
        if isinstance(data, bool):
            self.__containsNoLoops__ = data
        return self.__containsNoLoops__

    def eventNotifier(self, data=None):
        """Get the eventNotifier value; set it first when `data` is an int."""
        if isinstance(data, int):
            # bool is a subclass of int; coerce so that True/False can never
            # end up as the text "True"/"False" in the generated C code.
            self.__eventNotifier__ = int(data)
        return self.__eventNotifier__

    def parseXMLSubtype(self, xmlelement):
        """Read the view-specific XML attributes of `xmlelement`.

        ContainsNoLoops and EventNotifier are stored on the node. Any other
        attribute is reported as an error and every child element is reported
        as unprocessable, exactly as before.
        """
        for (at, av) in xmlelement.attributes.items():
            if at == "ContainsNoLoops":
                # xs:boolean allows "true"/"false" as well as "1"/"0".
                self.containsNoLoops(av.strip().lower() in ("true", "1"))
                continue
            if at == "EventNotifier":
                try:
                    self.eventNotifier(int(av))
                    continue
                except ValueError:
                    # Not a number: fall through and report it like any
                    # other attribute that cannot be processed.
                    pass
            logger.error("Don't know how to process attribute " + at + " (" + av + ")")

        for x in xmlelement.childNodes:
            if x.nodeType == x.ELEMENT_NODE:
                logger.info( "Unprocessable XML Element: " + x.tagName)

    def printOpen62541CCode_Subtype(self, unPrintedReferences=None, bootstrapping = True):
        """Return the view-specific lines of the generated open62541 C code.

        Args:
            unPrintedReferences: list of references that still have to be
                printed. When a type reference is emitted by this method,
                every occurrence of it is removed from this list. Defaults
                to a fresh empty list.
            bootstrapping: when equal to False, the lines are emitted in
                "userspace" form (a type node id followed by two FIXME
                placeholder comments). Otherwise assignments to
                ``<id>->containsNoLoops`` and ``<id>->eventNotifier`` are
                emitted.

        Returns:
            A list of strings, one per line of C code.
        """
        # A literal [] default is created once and shared by every call;
        # use None as the sentinel and build a fresh list per call instead.
        if unPrintedReferences is None:
            unPrintedReferences = []

        code = []
        codegen = open62541_MacroHelper()

        # Detect if this is bootstrapping or if we are attempting to use userspace...
        if bootstrapping == False:
            typeDefs = self.getNamespace().getSubTypesOf() # defaults to TypeDefinition

            # First choice: a reference whose type is one of the type definitions.
            myTypeRef = None
            for ref in self.getReferences():
                if ref.referenceType() in typeDefs:
                    myTypeRef = ref
                    break

            # Fallback: an inverse HasSubtype reference.
            if myTypeRef is None:
                for ref in self.getReferences():
                    if ref.referenceType().browseName() == "HasSubtype" and ref.isForward() == False:
                        myTypeRef = ref
                        break

            if myTypeRef is None:
                # NOTE(review): if `logger` is a stdlib logging.Logger, warn() is
                # a deprecated alias of warning() -- confirm before switching.
                logger.warn(str(self) + " failed to locate a type definition, assuming BaseViewType.")
                code.append(" // No valid typeDefinition found; assuming BaseViewType")
                # NOTE(review): the mixed-case constant below looks like a
                # search-and-replace leftover; it is kept unchanged because the
                # intended identifier cannot be determined here -- TODO confirm.
                code.append(" UA_EXPANDEDNODEID_NUMERIC(0, UA_NS0ID_BASEViewTYPE),")
            else:
                code.append(" " + codegen.getCreateExpandedNodeIDMacro(myTypeRef.target()) + ",")
                # The reference has been consumed here; drop all copies of it.
                while myTypeRef in unPrintedReferences:
                    unPrintedReferences.remove(myTypeRef)

            code.append(" // FIXME: Missing eventNotifier")
            code.append(" // FIXME: Missing containsNoLoops")
            return code

        noLoopsLiteral = "true" if self.containsNoLoops() else "false"
        code.append(self.getCodePrintableID() + "->containsNoLoops = " + noLoopsLiteral + ";")
        code.append(self.getCodePrintableID() + "->eventNotifier = (UA_Byte) " + str(self.eventNotifier()) + ";")
        return code
|