123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468 |
- from __future__ import print_function
- import sys
- import time
- import platform
- import getpass
- from collections import OrderedDict
- import re
- from lxml import etree
- import itertools
- import argparse
- from pprint import pprint
- types = OrderedDict() # contains types that were already parsed
- typedescriptions = {} # contains type nodeids
- excluded_types = ["NodeIdType", "InstanceNode", "TypeNode", "Node", "ObjectNode",
- "ObjectTypeNode", "VariableNode", "VariableTypeNode", "ReferenceTypeNode",
- "MethodNode", "ViewNode", "DataTypeNode",
- "UA_ServerDiagnosticsSummaryDataType", "UA_SamplingIntervalDiagnosticsDataType",
- "UA_SessionSecurityDiagnosticsDataType", "UA_SubscriptionDiagnosticsDataType",
- "UA_SessionDiagnosticsDataType"]
- builtin_types = ["Boolean", "SByte", "Byte", "Int16", "UInt16", "Int32", "UInt32",
- "Int64", "UInt64", "Float", "Double", "String", "DateTime", "Guid",
- "ByteString", "XmlElement", "NodeId", "ExpandedNodeId", "StatusCode",
- "QualifiedName", "LocalizedText", "ExtensionObject", "DataValue",
- "Variant", "DiagnosticInfo"]
- # If the type does not contain pointers, it can be copied with memcpy
- # (internally, not into the protocol message). This dict contains the sizes of
- # fixed-size types. Parsed types are added if they apply.
- builtin_fixed_size = ["Boolean", "SByte", "Byte", "Int16", "UInt16", "Int32", "UInt32",
- "Int64", "UInt64", "Float", "Double", "DateTime", "Guid", "StatusCode"]
- # Some types can be memcpy'd off the binary stream. That's especially important
- # for arrays. But we need to check if they contain padding and whether the
- # endianness is correct. This dict gives the C-statement that must be true for the
- # type to be overlayable. Parsed types are added if they apply.
- builtin_overlayable = {"Boolean": "true", "SByte": "true", "Byte": "true",
- "Int16": "UA_BINARY_OVERLAYABLE_INTEGER", "UInt16": "UA_BINARY_OVERLAYABLE_INTEGER",
- "Int32": "UA_BINARY_OVERLAYABLE_INTEGER", "UInt32": "UA_BINARY_OVERLAYABLE_INTEGER",
- "Int64": "UA_BINARY_OVERLAYABLE_INTEGER", "UInt64": "UA_BINARY_OVERLAYABLE_INTEGER",
- "Float": "UA_BINARY_OVERLAYABLE_FLOAT", "Double": "UA_BINARY_OVERLAYABLE_FLOAT",
- "DateTime": "UA_BINARY_OVERLAYABLE_INTEGER", "StatusCode": "UA_BINARY_OVERLAYABLE_INTEGER",
- "Guid": "(UA_BINARY_OVERLAYABLE_INTEGER && offsetof(UA_Guid, data2) == sizeof(UA_UInt32) && " + \
- "offsetof(UA_Guid, data3) == (sizeof(UA_UInt16) + sizeof(UA_UInt32)) && " + \
- "offsetof(UA_Guid, data4) == (2*sizeof(UA_UInt32)))"}
- ################
- # Type Classes #
- ################
- class StructMember(object):
- def __init__(self, name, memberType, isArray):
- self.name = name
- self.memberType = memberType
- self.isArray = isArray
- class Type(object):
- def __init__(self, outname, xml):
- self.name = xml.get("Name")
- self.ns0 = ("true" if outname == "ua_types" else "false")
- self.typeIndex = outname.upper() + "_" + self.name.upper()
- self.outname = outname
- self.description = ""
- self.fixed_size = "false"
- self.overlayable = "false"
- if self.name in builtin_types:
- self.builtin = "true"
- else:
- self.builtin = "false"
- self.members = [StructMember("", self, False)] # Returns one member: itself. Overwritten by some types.
- for child in xml:
- if child.tag == "{http://opcfoundation.org/BinarySchema/}Documentation":
- self.description = child.text
- break
- def datatype_c(self):
- if self.name in typedescriptions:
- description = typedescriptions[self.name]
- typeid = "{.namespaceIndex = %s, .identifierType = UA_NODEIDTYPE_NUMERIC, .identifier.numeric = %s}" % (description.namespaceid, description.nodeid)
- else:
- typeid = "{.namespaceIndex = 0, .identifierType = UA_NODEIDTYPE_NUMERIC, .identifier.numeric = 0}"
- return "{ .typeId = " + typeid + \
- ",\n .typeIndex = " + self.typeIndex + \
- ",\n#ifdef UA_ENABLE_TYPENAMES\n .typeName = \"%s\",\n#endif\n" % self.name + \
- " .memSize = sizeof(UA_" + self.name + ")" + \
- ",\n .builtin = " + self.builtin + \
- ",\n .fixedSize = " + self.fixed_size + \
- ",\n .overlayable = " + self.overlayable + \
- ",\n .membersSize = " + str(len(self.members)) + \
- ",\n .members = %s_members" % self.name + " }"
- def members_c(self):
- members = "static UA_DataTypeMember %s_members[%s] = {" % (self.name, len(self.members))
- before = None
- for index, member in enumerate(self.members):
- m = "\n { .memberTypeIndex = %s_%s,\n" % (member.memberType.outname.upper(), member.memberType.name.upper())
- m += "#ifdef UA_ENABLE_TYPENAMES\n .memberName = \"%s\",\n#endif\n" % member.name
- m += " .namespaceZero = %s,\n" % member.memberType.ns0
- m += " .padding = "
- if not before:
- m += "0,\n"
- else:
- if member.isArray:
- m += "offsetof(UA_%s, %sSize)" % (self.name, member.name)
- else:
- m += "offsetof(UA_%s, %s)" % (self.name, member.name)
- m += " - offsetof(UA_%s, %s)" % (self.name, before.name)
- if before.isArray:
- m += " - sizeof(void*),\n"
- else:
- m += " - sizeof(UA_%s),\n" % before.memberType.name
- m += " .isArray = " + ("true" if member.isArray else "false")
- members += m + "\n },"
- before = member
- return members + "};"
- def datatype_ptr(self):
- return "&" + self.outname.upper() + "[" + self.outname.upper() + "_" + self.name.upper() + "]"
-
- def functions_c(self):
- funcs = "static UA_INLINE void UA_%s_init(UA_%s *p) { memset(p, 0, sizeof(UA_%s)); }\n" % (self.name, self.name, self.name)
- funcs += "static UA_INLINE UA_%s * UA_%s_new(void) { return (UA_%s*) UA_new(%s); }\n" % (self.name, self.name, self.name, self.datatype_ptr())
- if self.fixed_size == "true":
- funcs += "static UA_INLINE UA_StatusCode UA_%s_copy(const UA_%s *src, UA_%s *dst) { *dst = *src; return UA_STATUSCODE_GOOD; }\n" % (self.name, self.name, self.name)
- funcs += "static UA_INLINE void UA_%s_deleteMembers(UA_%s *p) { }\n" % (self.name, self.name)
- else:
- funcs += "static UA_INLINE UA_StatusCode UA_%s_copy(const UA_%s *src, UA_%s *dst) { return UA_copy(src, dst, %s); }\n" % (self.name, self.name, self.name, self.datatype_ptr())
- funcs += "static UA_INLINE void UA_%s_deleteMembers(UA_%s *p) { UA_deleteMembers(p, %s); }\n" % (self.name, self.name, self.datatype_ptr())
- funcs += "static UA_INLINE void UA_%s_delete(UA_%s *p) { UA_delete(p, %s); }" % (self.name, self.name, self.datatype_ptr())
- return funcs
- def encoding_h(self):
- enc = "static UA_INLINE UA_StatusCode UA_%s_encodeBinary(const UA_%s *src, UA_ByteString *dst, size_t *offset) { return UA_encodeBinary(src, %s, dst, offset); }\n"
- enc += "static UA_INLINE UA_StatusCode UA_%s_decodeBinary(const UA_ByteString *src, size_t *offset, UA_%s *dst) { return UA_decodeBinary(src, offset, dst, %s); }"
- return enc % tuple(list(itertools.chain(*itertools.repeat([self.name, self.name, self.datatype_ptr()], 2))))
- class BuiltinType(Type):
- def __init__(self, name):
- self.name = name
- self.ns0 = "true"
- self.typeIndex = "UA_TYPES_" + self.name.upper()
- self.outname = "ua_types"
- self.description = ""
- self.fixed_size = "false"
- if self.name in builtin_fixed_size:
- self.fixed_size = "true"
- self.overlayable = "false"
- if name in builtin_overlayable:
- self.overlayable = builtin_overlayable[name]
- self.builtin = "true"
- if self.name == "QualifiedName":
- self.members = [StructMember("namespaceIndex", types["Int16"], False), StructMember("name", types["String"], False)]
- elif self.name in ["String", "ByteString", "XmlElement"]:
- self.members = [StructMember("", types["Byte"], True)]
- else:
- self.members = [StructMember("", self, False)]
- class EnumerationType(Type):
- def __init__(self, outname, xml):
- Type.__init__(self, outname, xml)
- self.fixed_size = "true"
- self.overlayable = "UA_BINARY_OVERLAYABLE_INTEGER"
- self.members = [StructMember("", types["Int32"], False)] # encoded as uint32
- self.builtin = "true"
- self.typeIndex = "UA_TYPES_INT32"
- self.elements = OrderedDict()
- for child in xml:
- if child.tag == "{http://opcfoundation.org/BinarySchema/}EnumeratedValue":
- self.elements[child.get("Name")] = child.get("Value")
-
- def typedef_h(self):
- if sys.version_info[0] < 3:
- values = self.elements.iteritems()
- else:
- values = self.elements.items()
- return "typedef enum { \n " + ",\n ".join(map(lambda kv : "UA_" + self.name.upper() + "_" + kv[0].upper() + \
- " = " + kv[1], values)) + "\n} UA_%s;" % self.name
- class OpaqueType(Type):
- def __init__(self, outname, xml):
- Type.__init__(self, outname, xml)
- self.members = [StructMember("", types["ByteString"], False)] # encoded as string
- def typedef_h(self):
- return "typedef UA_ByteString UA_%s;" % self.name
- class StructType(Type):
- def __init__(self, outname, xml):
- Type.__init__(self, outname, xml)
- self.members = []
- lengthfields = [] # lengthfields of arrays are not included as members
- for child in xml:
- if child.get("LengthField"):
- lengthfields.append(child.get("LengthField"))
- for child in xml:
- if not child.tag == "{http://opcfoundation.org/BinarySchema/}Field":
- continue
- if child.get("Name") in lengthfields:
- continue
- memberName = child.get("Name")
- memberName = memberName[:1].lower() + memberName[1:]
- memberTypeName = child.get("TypeName")
- memberType = types[memberTypeName[memberTypeName.find(":")+1:]]
- isArray = True if child.get("LengthField") else False
- self.members.append(StructMember(memberName, memberType, isArray))
- self.fixed_size = "true"
- self.overlayable = "true"
- before = None
- for m in self.members:
- if m.isArray or m.memberType.fixed_size != "true":
- self.fixed_size = "false"
- self.overlayable = "false"
- else:
- self.overlayable += " && " + m.memberType.overlayable
- if before:
- self.overlayable += " && offsetof(UA_%s, %s) == (offsetof(UA_%s, %s) + sizeof(UA_%s))" % \
- (self.name, m.name, self.name, before.name, before.memberType.name)
- if "false" in self.overlayable:
- self.overlayable = "false"
- before = m
- def typedef_h(self):
- if len(self.members) == 0:
- return "typedef void * UA_%s;" % self.name
- returnstr = "typedef struct {\n"
- for member in self.members:
- if member.isArray:
- returnstr += " size_t %sSize;\n" % member.name
- returnstr += " UA_%s *%s;\n" % (member.memberType.name, member.name)
- else:
- returnstr += " UA_%s %s;\n" % (member.memberType.name, member.name)
- return returnstr + "} UA_%s;" % self.name
- #########################
- # Parse Typedefinitions #
- #########################
- def parseTypeDefinitions(outname, xmlDescription):
- ns = {"opc": "http://opcfoundation.org/BinarySchema/"}
- tree = etree.parse(xmlDescription)
- typeSnippets = tree.xpath("/opc:TypeDictionary/*[not(self::opc:Import)]", namespaces=ns)
- def typeReady(element):
- "Are all member types defined?"
- for child in element:
- if child.tag == "{http://opcfoundation.org/BinarySchema/}Field":
- childname = child.get("TypeName")
- if childname[childname.find(":")+1:] not in types:
- return False
- return True
- def skipType(name):
- if name in excluded_types:
- return True
- if "Test" in name: # skip all test types
- return True
- if re.search("NodeId$", name) != None:
- return True
- return False
- snippets = {}
- for typeXml in typeSnippets:
- name = typeXml.get("Name")
- snippets[name] = typeXml
- while(len(snippets) > 0):
- for name, typeXml in snippets.items():
- if name in types or skipType(name):
- del snippets[name]
- continue
- if not typeReady(typeXml):
- continue
- if name in builtin_types:
- types[name] = BuiltinType(name)
- elif typeXml.tag == "{http://opcfoundation.org/BinarySchema/}EnumeratedType":
- types[name] = EnumerationType(outname, typeXml)
- elif typeXml.tag == "{http://opcfoundation.org/BinarySchema/}OpaqueType":
- types[name] = OpaqueType(outname, typeXml)
- elif typeXml.tag == "{http://opcfoundation.org/BinarySchema/}StructuredType":
- types[name] = StructType(outname, typeXml)
- else:
- raise Exception("Type not known")
- del snippets[name]
- ##########################
- # Parse TypeDescriptions #
- ##########################
- class TypeDescription(object):
- def __init__(self, name, nodeid, namespaceid):
- self.name = name
- self.nodeid = nodeid
- self.namespaceid = namespaceid
- def parseTypeDescriptions(filename, namespaceid):
- definitions = {}
- with open(filename) as f:
- input_str = f.read()
- input_str = input_str.replace('\r','')
- rows = map(lambda x:tuple(x.split(',')), input_str.split('\n'))
- for index, row in enumerate(rows):
- if len(row) < 3:
- continue
- if row[2] != "DataType":
- continue
- if row[0] == "BaseDataType":
- definitions["Variant"] = TypeDescription(row[0], row[1], namespaceid)
- elif row[0] == "Structure":
- definitions["ExtensionObject"] = TypeDescription(row[0], row[1], namespaceid)
- elif row[0] not in types:
- continue
- elif type(types[row[0]]) == EnumerationType:
- definitions[row[0]] = TypeDescription(row[0], "6", namespaceid) # enumerations look like int32 on the wire
- else:
- definitions[row[0]] = TypeDescription(row[0], row[1], namespaceid)
- return definitions
- ###############################
- # Parse the Command Line Input#
- ###############################
- parser = argparse.ArgumentParser()
- parser.add_argument('--typedescriptions', help='csv file with type descriptions')
- parser.add_argument('--namespace', type=int, default=0, help='namespace id of the generated type nodeids (defaults to 0)')
- parser.add_argument('--selected_types', help='file with list of types (among those parsed) to be generated')
- parser.add_argument('typexml_ns0', help='path/to/Opc.Ua.Types.bsd ...')
- parser.add_argument('typexml_additional', nargs='*', help='path/to/Opc.Ua.Types.bsd ...')
- parser.add_argument('outfile', help='output file w/o extension')
- args = parser.parse_args()
- outname = args.outfile.split("/")[-1]
- inname = ', '.join([args.typexml_ns0.split("/")[-1]] + map(lambda x:x.split("/")[-1], args.typexml_additional))
- ################
- # Create Types #
- ################
- for builtin in builtin_types:
- types[builtin] = BuiltinType(builtin)
- with open(args.typexml_ns0) as f:
- parseTypeDefinitions("ua_types", f)
- for typexml in args.typexml_additional:
- with open(typexml) as f:
- parseTypeDefinitions(outname, f)
- typedescriptions = {}
- if args.typedescriptions:
- typedescriptions = parseTypeDescriptions(args.typedescriptions, args.namespace)
- selected_types = types.keys()
- if args.selected_types:
- with open(args.selected_types) as f:
- selected_types = filter(len, [line.strip() for line in f])
- #############################
- # Write out the Definitions #
- #############################
- fh = open(args.outfile + "_generated.h",'w')
- fe = open(args.outfile + "_generated_encoding_binary.h",'w')
- fc = open(args.outfile + "_generated.c",'w')
- def printh(string):
- print(string, end='\n', file=fh)
- def printe(string):
- print(string, end='\n', file=fe)
- def printc(string):
- print(string, end='\n', file=fc)
- printh('''/* Generated from ''' + inname + ''' with script ''' + sys.argv[0] + '''
- * on host ''' + platform.uname()[1] + ''' by user ''' + getpass.getuser() + \
- ''' at ''' + time.strftime("%Y-%m-%d %I:%M:%S") + ''' */
- #ifndef ''' + outname.upper() + '''_GENERATED_H_
- #define ''' + outname.upper() + '''_GENERATED_H_
- #ifdef __cplusplus
- extern "C" {
- #endif
- #include "ua_types.h"
- #ifdef UA_INTERNAL
- #include "ua_types_encoding_binary.h"
- #endif''' + ('\n#include "ua_types_generated.h"\n' if outname != "ua_types" else '') + '''
- /**
- * Additional Data Type Definitions
- * ================================
- */
- ''')
- printh("#define " + outname.upper() + "_COUNT %s" % (str(len(selected_types))))
- printh("extern UA_EXPORT const UA_DataType " + outname.upper() + "[" + outname.upper() + "_COUNT];")
- printc('''/* Generated from ''' + inname + ''' with script ''' + sys.argv[0] + '''
- * on host ''' + platform.uname()[1] + ''' by user ''' + getpass.getuser() + \
- ''' at ''' + time.strftime("%Y-%m-%d %I:%M:%S") + ''' */
-
- #include "stddef.h"
- #include "ua_types.h"
- #include "''' + outname + '''_generated.h"''')
- printe('''/* Generated from ''' + inname + ''' with script ''' + sys.argv[0] + '''
- * on host ''' + platform.uname()[1] + ''' by user ''' + getpass.getuser() + \
- ''' at ''' + time.strftime("%Y-%m-%d %I:%M:%S") + ''' */
-
- #include "ua_types_encoding_binary.h"
- #include "''' + outname + '''_generated.h"''')
- if sys.version_info[0] < 3:
- values = types.itervalues()
- else:
- values = types.values()
- # Datatype members
- for t in values:
- if not t.name in selected_types:
- continue
- printc("")
- printc("/* " + t.name + " */")
- printc(t.members_c())
- printc("const UA_DataType %s[%s_COUNT] = {" % (outname.upper(), outname.upper()))
- if sys.version_info[0] < 3:
- values = types.itervalues()
- else:
- values = types.values()
- i = 0
- for t in values:
- if not t.name in selected_types:
- continue
- # Header
- printh("\n/**\n * " + t.name)
- printh(" * " + "-" * len(t.name))
- if t.description == "":
- printh(" */")
- else:
- printh(" * " + t.description + " */")
- if type(t) != BuiltinType:
- printh(t.typedef_h() + "\n")
- printh("#define " + outname.upper() + "_" + t.name.upper() + " " + str(i))
- printh(t.functions_c())
- i += 1
- # Datatype
- printc("")
- printc("/* " + t.name + " */")
- printc(t.datatype_c() + ",")
- # Encoding
- printe("")
- printe("/* " + t.name + " */")
- printe(t.encoding_h())
- printh('''
- #ifdef __cplusplus
- } // extern "C"
- #endif\n
- #endif /* %s_GENERATED_H_ */''' % outname.upper())
- printc("};\n")
- fh.close()
- fc.close()
- fe.close()
|