#!/usr/bin/env/python
# -*- coding: utf-8 -*-
###
### Author: Chris Iatrou (ichrispa@core-vector.net)
### Version: rev 13
###
### This program was created for educational purposes and has been
### contributed to the open62541 project by the author. All licensing
### terms for this source is inherited by the terms and conditions
### specified for by the open62541 project (see the projects readme
### file for more information on the LGPL terms and restrictions).
###
### This program is not meant to be used in a production environment. The
### author is not liable for any complications arising due to the use of
### this program.
###
from __future__ import print_function
import sys
import xml.dom.minidom as dom
from struct import pack as structpack
from time import struct_time, strftime, strptime, mktime
import logging; logger = logging.getLogger(__name__)
from datatypes import *
from nodes import *
from constants import *
####################
# Helper Functions #
####################
hassubtype = NodeId("ns=0;i=45")
def getSubTypesOf(nodeset, node):
re = [node]
for ref in node.references:
if ref.referenceType == hassubtype and ref.isForward:
re = re + getSubTypesOf(nodeset, nodeset.nodes[ref.target])
return re
def extractNamespaces(xmlfile):
# Extract a list of namespaces used. The first namespace is always
# "http://opcfoundation.org/UA/". minidom gobbles up
# elements, without a decent way to reliably
# access this dom2 elements (only attribute xmlns= are accessible
# using minidom). We need them for dereferencing though... This function
# attempts to do just that.
namespaces = ["http://opcfoundation.org/UA/"]
infile = open(xmlfile.name)
foundURIs = False
nsline = ""
line = infile.readline()
for line in infile:
if "" in line.lower():
foundURIs = True
elif "" in line.lower():
foundURIs = False
nsline = nsline + line
break
if foundURIs:
nsline = nsline + line
if len(nsline) > 0:
ns = dom.parseString(nsline).getElementsByTagName("NamespaceUris")
for uri in ns[0].childNodes:
if uri.nodeType != uri.ELEMENT_NODE:
continue
if uri.firstChild.data in namespaces:
continue
namespaces.append(uri.firstChild.data)
infile.close()
return namespaces
def buildAliasList(xmlelement):
"""Parses the XML Element present in must XML NodeSet definitions.
Contents the Alias element are stored in a dictionary for further
dereferencing during pointer linkage (see linkOpenPointer())."""
aliases = {}
for al in xmlelement.childNodes:
if al.nodeType == al.ELEMENT_NODE:
if al.hasAttribute("Alias"):
aliasst = al.getAttribute("Alias")
aliasnd = unicode(al.firstChild.data)
aliases[aliasst] = aliasnd
return aliases
class NodeSet(object):
""" This class handles parsing XML description of namespaces, instantiating
nodes, linking references, graphing the namespace and compiling a binary
representation.
Note that nodes assigned to this class are not restricted to having a
single namespace ID. This class represents the entire physical address
space of the binary representation and all nodes that are to be included
in that segment of memory.
"""
def __init__(self):
self.nodes = {}
self.namespaces = ["http://opcfoundation.org/UA/"]
def sanitize(self):
for n in self.nodes.values():
if n.sanitize() == False:
raise Exception("Failed to sanitize node " + str(n))
# Sanitize reference consistency
for n in self.nodes.values():
for ref in n.references:
if not ref.source == n.id:
raise Exception("Reference " + str(ref) + " has an invalid source")
if not ref.referenceType in self.nodes:
raise Exception("Reference " + str(ref) + " has an unknown reference type")
if not ref.target in self.nodes:
raise Exception("Reference " + str(ref) + " has an unknown target")
def addNamespace(self, nsURL):
if not nsURL in self.namespaces:
self.namespaces.append(nsURL)
def createNamespaceMapping(self, orig_namespaces):
"""Creates a dict that maps from the nsindex in the original nodeset to the
nsindex in the combined nodeset"""
m = {}
for index,name in enumerate(orig_namespaces):
m[index] = self.namespaces.index(name)
return m
def getNodeByBrowseName(self, idstring):
return next((n for n in self.nodes.values() if idstring==n.browseName.name), None)
def getRoot(self):
return self.getNodeByBrowseName("Root")
def createNode(self, xmlelement, nsMapping, hidden=False):
ndtype = xmlelement.tagName.lower()
if ndtype[:2] == "ua":
ndtype = ndtype[2:]
node = None
if ndtype == 'variable':
node = VariableNode(xmlelement)
if ndtype == 'object':
node = ObjectNode(xmlelement)
if ndtype == 'method':
node = MethodNode(xmlelement)
if ndtype == 'objecttype':
node = ObjectTypeNode(xmlelement)
if ndtype == 'variabletype':
node = VariableTypeNode(xmlelement)
if ndtype == 'methodtype':
node = MethodNode(xmlelement)
if ndtype == 'datatype':
node = DataTypeNode(xmlelement)
if ndtype == 'referencetype':
node = ReferenceTypeNode(xmlelement)
if node and hidden:
node.hidden = True
# References from an existing nodeset are all suppressed
for ref in node.references:
ref.hidden = True
for ref in node.inverseReferences:
ref.hidden = True
return node
def addNodeSet(self, xmlfile, hidden = False):
# Extract NodeSet DOM
nodesets = dom.parse(xmlfile).getElementsByTagName("UANodeSet")
if len(nodesets) == 0 or len(nodesets) > 1:
raise Exception(self, self.originXML + " contains no or more then 1 nodeset")
nodeset = nodesets[0]
# Create the namespace mapping
orig_namespaces = extractNamespaces(xmlfile) # List of namespaces used in the xml file
for ns in orig_namespaces:
self.addNamespace(ns)
nsMapping = self.createNamespaceMapping(orig_namespaces)
# Extract the aliases
aliases = None
for nd in nodeset.childNodes:
if nd.nodeType != nd.ELEMENT_NODE:
continue
ndtype = nd.tagName.lower()
if 'aliases' in ndtype:
aliases = buildAliasList(nd)
# Instantiate nodes
newnodes = []
for nd in nodeset.childNodes:
if nd.nodeType != nd.ELEMENT_NODE:
continue
node = self.createNode(nd, nsMapping, hidden)
if not node:
continue
node.replaceAliases(aliases)
node.replaceNamespaces(nsMapping)
# Add the node the the global dict
if node.id in self.nodes:
raise Exception("XMLElement with duplicate ID " + str(node.id))
self.nodes[node.id] = node
newnodes.append(node)
# add inverse references
for node in newnodes:
for ref in node.references:
newsource = self.nodes[ref.target]
hide = ref.hidden or (node.hidden and newsource.hidden)
newref = Reference(newsource.id, ref.referenceType, ref.source, False, hide)
newsource.inverseReferences.add(newref)
for ref in node.inverseReferences:
newsource = self.nodes[ref.target]
hide = ref.hidden or (node.hidden and newsource.hidden)
newref = Reference(newsource.id, ref.referenceType, ref.source, True, hide)
newsource.references.add(newref)