ソースを参照

pubsub improvements

pubsub improvements

pubsub improvements
Andreas Ebner 5 年 前
コミット
af8c26f7d1

+ 1 - 1
examples/pubsub/pubsub_subscribe_standalone.c

@@ -119,7 +119,7 @@ int main(int argc, char **argv) {
 
     UA_PubSubChannel *psc =
         udpLayer.createPubSubChannel(&connectionConfig);
-    psc->regist(psc, NULL);
+    psc->regist(psc, NULL, NULL);
 
     UA_StatusCode retval = UA_STATUSCODE_GOOD;
     while(running && retval == UA_STATUSCODE_GOOD)

+ 1 - 1
examples/pubsub/tutorial_pubsub_subscribe.c

@@ -140,7 +140,7 @@ run(UA_String *transportProfile, UA_NetworkAddressUrlDataType *networkAddressUrl
     UA_PubSubConnection *connection =
         UA_PubSubConnection_findConnectionbyId(server, connectionIdent);
     if(connection != NULL) {
-        UA_StatusCode rv = connection->channel->regist(connection->channel, NULL);
+        UA_StatusCode rv = connection->channel->regist(connection->channel, NULL, NULL);
         if (rv == UA_STATUSCODE_GOOD) {
             UA_UInt64 subscriptionCallbackId;
             UA_Server_addRepeatedCallback(server, (UA_ServerCallback)subscriptionPollingCallback,

+ 6 - 2
include/ua_plugin_pubsub.h

@@ -57,8 +57,9 @@ struct UA_PubSubChannel {
     UA_StatusCode (*send)(UA_PubSubChannel *channel, UA_ExtensionObject *transportSettings,
                           const UA_ByteString *buf);
 
-    /* Register to an specified message source, e.g. multicast group or topic */
-    UA_StatusCode (*regist)(UA_PubSubChannel * channel, UA_ExtensionObject *transportSettings);
+    /* Register to an specified message source, e.g. multicast group or topic. Callback is used for mqtt. */
+    UA_StatusCode (*regist)(UA_PubSubChannel * channel, UA_ExtensionObject *transportSettings,
+        void (*callback)(UA_ByteString *encodedBuffer, UA_ByteString *topic));
 
     /* Remove subscription to an specified message source, e.g. multicast group or topic */
     UA_StatusCode (*unregist)(UA_PubSubChannel * channel, UA_ExtensionObject *transportSettings);
@@ -69,6 +70,9 @@ struct UA_PubSubChannel {
 
     /* Closing the connection and implicit free of the channel structures. */
     UA_StatusCode (*close)(UA_PubSubChannel *channel);
+
+    /* Giving the connection protocoll time to process inbound and outbound traffic. */
+    UA_StatusCode (*yield)(UA_PubSubChannel *channel, UA_UInt16 timeout);
 };
 
 /**

+ 2 - 1
plugins/networking/ua_network_pubsub_udp.c

@@ -262,7 +262,8 @@ UA_PubSubChannelUDPMC_open(const UA_PubSubConnectionConfig *connectionConfig) {
  * @return UA_STATUSCODE_GOOD on success
  */
 static UA_StatusCode
-UA_PubSubChannelUDPMC_regist(UA_PubSubChannel *channel, UA_ExtensionObject *transportSettings) {
+UA_PubSubChannelUDPMC_regist(UA_PubSubChannel *channel, UA_ExtensionObject *transportSettings,
+        void (*notUsedHere)(UA_ByteString *encodedBuffer, UA_ByteString *topic)) {
     if(!(channel->state == UA_PUBSUB_CHANNEL_PUB || channel->state == UA_PUBSUB_CHANNEL_RDY)){
         UA_LOG_ERROR(UA_Log_Stdout, UA_LOGCATEGORY_SERVER, "PubSub Connection regist failed.");
         return UA_STATUSCODE_BADINTERNALERROR;

+ 74 - 18
src/pubsub/ua_pubsub.c

@@ -137,9 +137,19 @@ UA_Server_addWriterGroup(UA_Server *server, const UA_NodeId connection,
     if(writerGroupIdentifier){
         UA_NodeId_copy(&newWriterGroup->identifier, writerGroupIdentifier);
     }
-    UA_WriterGroupConfig tmpWriterGroupConfig;
+
     //deep copy of the config
+    UA_WriterGroupConfig tmpWriterGroupConfig;
     retVal |= UA_WriterGroupConfig_copy(writerGroupConfig, &tmpWriterGroupConfig);
+
+    if(!tmpWriterGroupConfig.messageSettings.content.decoded.type) {
+        UA_UadpWriterGroupMessageDataType *wgm = UA_UadpWriterGroupMessageDataType_new();
+        tmpWriterGroupConfig.messageSettings.content.decoded.data = wgm;
+        tmpWriterGroupConfig.messageSettings.content.decoded.type =
+            &UA_TYPES[UA_TYPES_UADPWRITERGROUPMESSAGEDATATYPE];
+        tmpWriterGroupConfig.messageSettings.encoding = UA_EXTENSIONOBJECT_DECODED;
+    }
+
     newWriterGroup->config = tmpWriterGroupConfig;
     retVal |= UA_WriterGroup_addPublishCallback(server, newWriterGroup);
     LIST_INSERT_HEAD(&currentConnectionContext->writerGroups, newWriterGroup, listEntry);
@@ -929,9 +939,10 @@ UA_DataSetWriter_generateDataSetMessage(UA_Server *server, UA_DataSetMessage *da
     }
 
     /* Sanity-test the configuration */
-    if(dataSetWriterMessageDataType->networkMessageNumber != 0 ||
-       dataSetWriterMessageDataType->dataSetOffset != 0 ||
-       dataSetWriterMessageDataType->configuredSize !=0 ) {
+    if(dataSetWriterMessageDataType &&
+       (dataSetWriterMessageDataType->networkMessageNumber != 0 ||
+        dataSetWriterMessageDataType->dataSetOffset != 0 ||
+        dataSetWriterMessageDataType->configuredSize !=0 )) {
         UA_LOG_WARNING(&server->config.logger, UA_LOGCATEGORY_SERVER,
                        "Static DSM configuration not supported. Using defaults");
         dataSetWriterMessageDataType->networkMessageNumber = 0;
@@ -1060,7 +1071,7 @@ UA_DataSetWriter_generateDataSetMessage(UA_Server *server, UA_DataSetMessage *da
 
 static UA_StatusCode
 sendNetworkMessageJson(UA_PubSubConnection *connection, UA_DataSetMessage *dsm,
-                   UA_UInt16 *writerIds, UA_Byte dsmCount) {
+                   UA_UInt16 *writerIds, UA_Byte dsmCount, UA_ExtensionObject *transportSettings) {
    UA_StatusCode retval = UA_STATUSCODE_BADNOTSUPPORTED;
 #ifdef UA_ENABLE_JSON_ENCODING
     UA_NetworkMessage nm;
@@ -1100,7 +1111,7 @@ sendNetworkMessageJson(UA_PubSubConnection *connection, UA_DataSetMessage *dsm,
     }
 
     /* Send the prepared messages */
-    retval = connection->channel->send(connection->channel, NULL, &buf);
+    retval = connection->channel->send(connection->channel, transportSettings, &buf);
     if(msgSize > UA_MAX_STACKBUF)
         UA_ByteString_deleteMembers(&buf);
 #endif
@@ -1108,13 +1119,52 @@ sendNetworkMessageJson(UA_PubSubConnection *connection, UA_DataSetMessage *dsm,
 }
 
 static UA_StatusCode
-sendNetworkMessage(UA_PubSubConnection *connection, UA_DataSetMessage *dsm,
-                   UA_UInt16 *writerIds, UA_Byte dsmCount) {
+sendNetworkMessage(UA_PubSubConnection *connection, UA_WriterGroup *wg,
+                   UA_DataSetMessage *dsm, UA_UInt16 *writerIds, UA_Byte dsmCount,
+                   UA_ExtensionObject *messageSettings,
+                   UA_ExtensionObject *transportSettings) {
+
+    if(messageSettings->content.decoded.type !=
+       &UA_TYPES[UA_TYPES_UADPWRITERGROUPMESSAGEDATATYPE])
+        return UA_STATUSCODE_BADINTERNALERROR;
+    UA_UadpWriterGroupMessageDataType *wgm = (UA_UadpWriterGroupMessageDataType*)
+        messageSettings->content.decoded.data;
+
     UA_NetworkMessage nm;
     memset(&nm, 0, sizeof(UA_NetworkMessage));
+
+    nm.publisherIdEnabled =
+        !!(wgm->networkMessageContentMask & UA_UADPNETWORKMESSAGECONTENTMASK_PUBLISHERID);
+    nm.groupHeaderEnabled =
+        !!(wgm->networkMessageContentMask & UA_UADPNETWORKMESSAGECONTENTMASK_GROUPHEADER);
+    nm.groupHeader.writerGroupIdEnabled =
+        !!(wgm->networkMessageContentMask & UA_UADPNETWORKMESSAGECONTENTMASK_WRITERGROUPID);
+    nm.groupHeader.groupVersionEnabled =
+        !!(wgm->networkMessageContentMask & UA_UADPNETWORKMESSAGECONTENTMASK_GROUPVERSION);
+    nm.groupHeader.networkMessageNumberEnabled =
+        !!(wgm->networkMessageContentMask & UA_UADPNETWORKMESSAGECONTENTMASK_NETWORKMESSAGENUMBER);
+    nm.groupHeader.sequenceNumberEnabled =
+        !!(wgm->networkMessageContentMask & UA_UADPNETWORKMESSAGECONTENTMASK_SEQUENCENUMBER);
+    nm.payloadHeaderEnabled =
+        !!(wgm->networkMessageContentMask & UA_UADPNETWORKMESSAGECONTENTMASK_PAYLOADHEADER);
+    nm.timestampEnabled =
+        !!(wgm->networkMessageContentMask & UA_UADPNETWORKMESSAGECONTENTMASK_TIMESTAMP);
+    nm.picosecondsEnabled =
+        !!(wgm->networkMessageContentMask & UA_UADPNETWORKMESSAGECONTENTMASK_PICOSECONDS);
+    nm.dataSetClassIdEnabled =
+        !!(wgm->networkMessageContentMask & UA_UADPNETWORKMESSAGECONTENTMASK_DATASETCLASSID);
+    nm.promotedFieldsEnabled =
+        !!(wgm->networkMessageContentMask & UA_UADPNETWORKMESSAGECONTENTMASK_PROMOTEDFIELDS);
+
     nm.version = 1;
     nm.networkMessageType = UA_NETWORKMESSAGE_DATASET;
-    nm.payloadHeaderEnabled = true;
+    if(connection->config->publisherIdType == UA_PUBSUB_PUBLISHERID_NUMERIC) {
+        nm.publisherIdType = UA_PUBLISHERDATATYPE_UINT16;
+        nm.publisherId.publisherIdUInt32 = connection->config->publisherId.numeric;
+    } else if (connection->config->publisherIdType == UA_PUBSUB_PUBLISHERID_STRING){
+        nm.publisherIdType = UA_PUBLISHERDATATYPE_STRING;
+        nm.publisherId.publisherIdString = connection->config->publisherId.string;
+    }
 
     /* Compute the length of the dsm separately for the header */
     UA_STACKARRAY(UA_UInt16, dsmLengths, dsmCount);
@@ -1123,6 +1173,8 @@ sendNetworkMessage(UA_PubSubConnection *connection, UA_DataSetMessage *dsm,
 
     nm.payloadHeader.dataSetPayloadHeader.count = dsmCount;
     nm.payloadHeader.dataSetPayloadHeader.dataSetWriterIds = writerIds;
+    nm.groupHeader.writerGroupId = wg->config.writerGroupId;
+    nm.groupHeader.networkMessageNumber = 1;
     nm.payload.dataSetPayload.sizes = dsmLengths;
     nm.payload.dataSetPayload.dataSetMessages = dsm;
 
@@ -1154,7 +1206,7 @@ sendNetworkMessage(UA_PubSubConnection *connection, UA_DataSetMessage *dsm,
     }
 
     /* Send the prepared messages */
-    retval = connection->channel->send(connection->channel, NULL, &buf);
+    retval = connection->channel->send(connection->channel, transportSettings, &buf);
     if(msgSize > UA_MAX_STACKBUF)
         UA_ByteString_deleteMembers(&buf);
     return retval;
@@ -1177,8 +1229,8 @@ UA_WriterGroup_publishCallback(UA_Server *server, UA_WriterGroup *writerGroup) {
         return;
 
     /* Binary or Json encoding?  */
-    if(writerGroup->config.encodingMimeType != UA_PUBSUB_ENCODING_UADP
-            && writerGroup->config.encodingMimeType != UA_PUBSUB_ENCODING_JSON) {
+    if(writerGroup->config.encodingMimeType != UA_PUBSUB_ENCODING_UADP &&
+       writerGroup->config.encodingMimeType != UA_PUBSUB_ENCODING_JSON) {
         UA_LOG_WARNING(&server->config.logger, UA_LOGCATEGORY_SERVER,
                        "Publish failed: Unknown encoding type.");
         return;
@@ -1232,11 +1284,13 @@ UA_WriterGroup_publishCallback(UA_Server *server, UA_WriterGroup *writerGroup) {
          * dedicated NM as well. */
         if(pds->promotedFieldsCount > 0 || maxDSM == 1) {
             if(writerGroup->config.encodingMimeType == UA_PUBSUB_ENCODING_UADP){
-                res = sendNetworkMessage(connection, &dsmStore[dsmCount],
-                                         &dsw->config.dataSetWriterId, 1);
+                res = sendNetworkMessage(connection, writerGroup, &dsmStore[dsmCount],
+                                         &dsw->config.dataSetWriterId, 1,
+                                         &writerGroup->config.messageSettings,
+                                         &writerGroup->config.transportSettings);
             }else if(writerGroup->config.encodingMimeType == UA_PUBSUB_ENCODING_JSON){
                 res = sendNetworkMessageJson(connection, &dsmStore[dsmCount],
-                                         &dsw->config.dataSetWriterId, 1);
+                        &dsw->config.dataSetWriterId, 1, &writerGroup->config.transportSettings);
             }
 
             if(res != UA_STATUSCODE_GOOD)
@@ -1259,11 +1313,13 @@ UA_WriterGroup_publishCallback(UA_Server *server, UA_WriterGroup *writerGroup) {
 
         UA_StatusCode res3 = UA_STATUSCODE_GOOD;
         if(writerGroup->config.encodingMimeType == UA_PUBSUB_ENCODING_UADP){
-            res3 = sendNetworkMessage(connection, &dsmStore[i * maxDSM],
-                                                            &dsWriterIds[i * maxDSM], nmDsmCount);
+            res3 = sendNetworkMessage(connection, writerGroup, &dsmStore[i * maxDSM],
+                                      &dsWriterIds[i * maxDSM], nmDsmCount,
+                                      &writerGroup->config.messageSettings,
+                                      &writerGroup->config.transportSettings);
         }else if(writerGroup->config.encodingMimeType == UA_PUBSUB_ENCODING_JSON){
             res3 = sendNetworkMessageJson(connection, &dsmStore[i * maxDSM],
-                                                            &dsWriterIds[i * maxDSM], nmDsmCount);
+                    &dsWriterIds[i * maxDSM], nmDsmCount, &writerGroup->config.transportSettings);
         }
 
         if(res3 != UA_STATUSCODE_GOOD)

+ 1 - 1
src/pubsub/ua_pubsub_manager.c

@@ -233,7 +233,7 @@ UA_Server_removePublishedDataSet(UA_Server *server, const UA_NodeId pds) {
         LIST_FOREACH(writerGroup, &server->pubSubManager.connections[i].writerGroups, listEntry){
             UA_DataSetWriter *currentWriter, *tmpWriterGroup;
             LIST_FOREACH_SAFE(currentWriter, &writerGroup->writers, listEntry, tmpWriterGroup){
-                if(UA_NodeId_equal(&currentWriter->identifier, &publishedDataSet->identifier)){
+                if(UA_NodeId_equal(&currentWriter->connectedDataSet, &publishedDataSet->identifier)){
                     UA_Server_removeDataSetWriter(server, currentWriter->identifier);
                 }
             }

+ 268 - 20
src/pubsub/ua_pubsub_ns0.c

@@ -36,6 +36,12 @@ static UA_StatusCode removeGroupAction(UA_Server *server,
                                           const UA_NodeId *objectId, void *objectContext,
                                           size_t inputSize, const UA_Variant *input,
                                           size_t outputSize, UA_Variant *output);
+static UA_StatusCode addDataSetWriterAction(UA_Server *server,
+                                          const UA_NodeId *sessionId, void *sessionHandle,
+                                          const UA_NodeId *methodId, void *methodContext,
+                                          const UA_NodeId *objectId, void *objectContext,
+                                          size_t inputSize, const UA_Variant *input,
+                                          size_t outputSize, UA_Variant *output);
 
 #endif
 
@@ -98,6 +104,10 @@ onRead(UA_Server *server, const UA_NodeId *sessionId, void *sessionContext,
     UA_NodeId myNodeId;
 	UA_WriterGroup *writerGroup = NULL;
     UA_PubSubConnection *pubSubConnection = NULL;
+    UA_PublishedDataSet *publishedDataSet = NULL;
+    UA_PublishedVariableDataType *pvd = NULL;
+    size_t counter = 0;
+
     switch(((UA_NodePropertyContext *) nodeContext)->parentCalssifier){
         case UA_NS0ID_PUBSUBCONNECTIONTYPE:
             myNodeId = ((UA_NodePropertyContext *) nodeContext)->parentNodeId;
@@ -129,6 +139,30 @@ onRead(UA_Server *server, const UA_NodeId *sessionId, void *sessionContext,
                                    "Read error! Unknown property.");
             }
             break;
+        case UA_NS0ID_PUBLISHEDDATAITEMSTYPE:
+            myNodeId = ((UA_NodePropertyContext *) nodeContext)->parentNodeId;
+            publishedDataSet = UA_PublishedDataSet_findPDSbyId(server, myNodeId);
+            if(!publishedDataSet)
+                return;
+            switch(((UA_NodePropertyContext *) nodeContext)->elementClassiefier) {
+                case UA_NS0ID_PUBLISHEDDATAITEMSTYPE_PUBLISHEDDATA:;
+                    pvd = (UA_PublishedVariableDataType *)
+                            UA_calloc(publishedDataSet->fieldSize, sizeof(UA_PublishedVariableDataType));
+                    UA_DataSetField *field;
+                    LIST_FOREACH(field, &publishedDataSet->fields, listEntry) {
+                        pvd[counter].attributeId = UA_ATTRIBUTEID_VALUE;
+                        pvd[counter].publishedVariable = field->config.field.variable.publishParameters.publishedVariable;
+                        //UA_NodeId_copy(&field->config.field.variable.publishParameters.publishedVariable, &pvd[counter].publishedVariable);
+                        counter++;
+                    }
+                    UA_Variant_setArray(&value, pvd, publishedDataSet->fieldSize,
+                                        &UA_TYPES[UA_TYPES_PUBLISHEDVARIABLEDATATYPE]);
+                    break;
+                default:
+                    UA_LOG_WARNING(&server->config.logger, UA_LOGCATEGORY_SERVER,
+                                   "Read error! Unknown property.");
+            }
+            break;
         default:
             UA_LOG_WARNING(&server->config.logger, UA_LOGCATEGORY_SERVER,
                            "Read error! Unknown parent element.");
@@ -266,37 +300,45 @@ addPubSubConnectionAction(UA_Server *server,
                           const UA_NodeId *objectId, void *objectContext,
                           size_t inputSize, const UA_Variant *input,
                           size_t outputSize, UA_Variant *output){
+    UA_StatusCode retVal = UA_STATUSCODE_GOOD;
     UA_PubSubConnectionDataType pubSubConnectionDataType = *((UA_PubSubConnectionDataType *) input[0].data);
     UA_NetworkAddressUrlDataType networkAddressUrlDataType;
     memset(&networkAddressUrlDataType, 0, sizeof(networkAddressUrlDataType));
     UA_ExtensionObject eo = pubSubConnectionDataType.address;
-    if(eo.encoding == UA_EXTENSIONOBJECT_ENCODED_BYTESTRING){
-        size_t offset = 0;
-        UA_NetworkAddressUrlDataType_decodeBinary(&eo.content.encoded.body, &offset, &networkAddressUrlDataType);
-        if(networkAddressUrlDataType.url.length > 512)
-            return UA_STATUSCODE_BADOUTOFMEMORY;
-        UA_STACKARRAY(char, buffer, sizeof(char) * networkAddressUrlDataType.url.length +1);
-        memcpy(buffer, networkAddressUrlDataType.url.data, networkAddressUrlDataType.url.length);
-        buffer[networkAddressUrlDataType.url.length] = '\0';
-        printf("%s\n", buffer);
+    if(eo.encoding == UA_EXTENSIONOBJECT_DECODED){
+        if(eo.content.decoded.type == &UA_TYPES[UA_TYPES_NETWORKADDRESSURLDATATYPE]){
+            if(UA_NetworkAddressUrlDataType_copy((UA_NetworkAddressUrlDataType *) eo.content.decoded.data,
+                                                 &networkAddressUrlDataType) != UA_STATUSCODE_GOOD){
+                return UA_STATUSCODE_BADOUTOFMEMORY;
+            };
+        }
     }
-
     UA_PubSubConnectionConfig connectionConfig;
     memset(&connectionConfig, 0, sizeof(UA_PubSubConnectionConfig));
-    connectionConfig.transportProfileUri = UA_STRING("http://opcfoundation.org/UA-Profile/Transport/pubsub-udp-uadp");
+    connectionConfig.transportProfileUri = pubSubConnectionDataType.transportProfileUri;
     connectionConfig.name = pubSubConnectionDataType.name;
+    //TODO set real connection state
+    connectionConfig.enabled = pubSubConnectionDataType.enabled;
+    //connectionConfig.enabled = pubSubConnectionDataType.enabled;
     UA_Variant_setScalar(&connectionConfig.address, &networkAddressUrlDataType,
                          &UA_TYPES[UA_TYPES_NETWORKADDRESSURLDATATYPE]);
     if(pubSubConnectionDataType.publisherId.type == &UA_TYPES[UA_TYPES_UINT32]){
         connectionConfig.publisherId.numeric = * ((UA_UInt32 *) pubSubConnectionDataType.publisherId.data);
-    } else {
+    } else if (pubSubConnectionDataType.publisherId.type == &UA_TYPES[UA_TYPES_STRING]){
         connectionConfig.publisherIdType = UA_PUBSUB_PUBLISHERID_STRING;
-        connectionConfig.publisherId.string = * ((UA_String *) pubSubConnectionDataType.publisherId.data);
+        UA_String_copy((UA_String *) pubSubConnectionDataType.publisherId.data, &connectionConfig.publisherId.string);
+    } else {
+        UA_LOG_WARNING(&server->config.logger, UA_LOGCATEGORY_SERVER, "Unsupported PublisherId Type used.");
+        //TODO what's the best default behaviour here?
+        connectionConfig.publisherId.numeric = 0;
     }
     //call API function and create the connection
     UA_NodeId connectionId;
-    if(UA_Server_addPubSubConnection(server, &connectionConfig, &connectionId) != UA_STATUSCODE_GOOD){
-        //error handling
+
+    retVal |= UA_Server_addPubSubConnection(server, &connectionConfig, &connectionId);
+
+    if(retVal != UA_STATUSCODE_GOOD){
+        return retVal;
     };
     for(size_t i = 0; i < pubSubConnectionDataType.writerGroupsSize; i++){
         //UA_PubSubConnection_addWriterGroup(server, UA_NODEID_NULL, NULL, NULL);
@@ -428,7 +470,7 @@ addPublishedDataItemsRepresentation(UA_Server *server, UA_PublishedDataSet *publ
     retVal |= addPubSubObjectNode(server, pdsName, publishedDataSet->identifier.identifier.numeric, UA_NS0ID_PUBLISHSUBSCRIBE_PUBLISHEDDATASETS,
                             UA_NS0ID_HASPROPERTY, UA_NS0ID_PUBLISHEDDATAITEMSTYPE);
     //End lock zone
-    UA_NodeId configurationVersionNode;
+    UA_NodeId configurationVersionNode, publishedDataNode;
     configurationVersionNode = findSingleChildNode(server, UA_QUALIFIEDNAME(0, "ConfigurationVersion"),
                                                    UA_NODEID_NUMERIC(0, UA_NS0ID_HASCOMPONENT),
                                                    UA_NODEID_NUMERIC(0, publishedDataSet->identifier.identifier.numeric));
@@ -436,6 +478,28 @@ addPublishedDataItemsRepresentation(UA_Server *server, UA_PublishedDataSet *publ
     UA_Variant_init(&value);
     UA_Variant_setScalar(&value, &publishedDataSet->dataSetMetaData.configurationVersion, &UA_TYPES[UA_TYPES_CONFIGURATIONVERSIONDATATYPE]);
     UA_Server_writeValue(server, configurationVersionNode, value);
+
+    publishedDataNode = findSingleChildNode(server, UA_QUALIFIEDNAME(0, "PublishedData"),
+                                            UA_NODEID_NUMERIC(0, UA_NS0ID_HASPROPERTY),
+                                            UA_NODEID_NUMERIC(0, publishedDataSet->identifier.identifier.numeric));
+
+    UA_NodePropertyContext * publishingIntervalContext = (UA_NodePropertyContext *) UA_malloc(sizeof(UA_NodePropertyContext));
+    publishingIntervalContext->parentNodeId = publishedDataSet->identifier;
+    publishingIntervalContext->parentCalssifier = UA_NS0ID_PUBLISHEDDATAITEMSTYPE;
+    publishingIntervalContext->elementClassiefier = UA_NS0ID_PUBLISHEDDATAITEMSTYPE_PUBLISHEDDATA;
+    UA_ValueCallback valueCallback;
+    valueCallback.onRead = onRead;
+    valueCallback.onWrite = NULL;
+    retVal |= addVariableValueSource(server, valueCallback, publishedDataNode, publishingIntervalContext);
+
+#ifdef UA_ENABLE_PUBSUB_INFORMATIONMODEL_METHODS
+    retVal |= UA_Server_addReference(server, publishedDataSet->identifier,
+                                     UA_NODEID_NUMERIC(0, UA_NS0ID_HASCOMPONENT),
+                                     UA_EXPANDEDNODEID_NUMERIC(0, UA_NS0ID_PUBLISHEDDATAITEMSTYPE_ADDVARIABLES), true);
+    retVal |= UA_Server_addReference(server, publishedDataSet->identifier,
+                                     UA_NODEID_NUMERIC(0, UA_NS0ID_HASCOMPONENT),
+                                     UA_EXPANDEDNODEID_NUMERIC(0, UA_NS0ID_PUBLISHEDDATAITEMSTYPE_REMOVEVARIABLES), true);
+#endif
     return retVal;
 }
 
@@ -453,7 +517,7 @@ addPublishedDataItemsAction(UA_Server *server,
     size_t fieldFlagsSize = input[2].arrayLength;
     UA_DataSetFieldFlags * fieldFlags = (UA_DataSetFieldFlags *) input[2].data;
     size_t variablesToAddSize = input[3].arrayLength;
-    UA_PublishedVariableDataType *variablesToAdd = (UA_PublishedVariableDataType *) input[3].data;
+    UA_PublishedVariableDataType *variablesToAddField = (UA_PublishedVariableDataType *) input[3].data;
 
     if(!(fieldNameAliasesSize == fieldFlagsSize || fieldFlagsSize == variablesToAddSize))
         return UA_STATUSCODE_BADINVALIDARGUMENT;
@@ -474,17 +538,46 @@ addPublishedDataItemsAction(UA_Server *server,
         if(fieldFlags[j] == UA_DATASETFIELDFLAGS_PROMOTEDFIELD){
             dataSetFieldConfig.field.variable.promotedField = UA_TRUE;
         }
-        dataSetFieldConfig.field.variable.publishParameters = variablesToAdd[j];
+        dataSetFieldConfig.field.variable.publishParameters = variablesToAddField[j];
         UA_Server_addDataSetField(server, dataSetItemsNodeId, &dataSetFieldConfig, NULL);
     }
+    UA_PublishedVariableDataType_clear(variablesToAddField);
     return retVal;
 }
 #endif
 
+#ifdef UA_ENABLE_PUBSUB_INFORMATIONMODEL_METHODS
+static UA_StatusCode
+addVariablesAction(UA_Server *server,
+                   const UA_NodeId *sessionId, void *sessionHandle,
+                   const UA_NodeId *methodId, void *methodContext,
+                   const UA_NodeId *objectId, void *objectContext,
+                   size_t inputSize, const UA_Variant *input,
+                   size_t outputSize, UA_Variant *output){
+    UA_StatusCode retVal = UA_STATUSCODE_GOOD;
+
+    return retVal;
+}
+
+static UA_StatusCode
+removeVariablesAction(UA_Server *server,
+                      const UA_NodeId *sessionId, void *sessionHandle,
+                      const UA_NodeId *methodId, void *methodContext,
+                      const UA_NodeId *objectId, void *objectContext,
+                      size_t inputSize, const UA_Variant *input,
+                      size_t outputSize, UA_Variant *output){
+    UA_StatusCode retVal = UA_STATUSCODE_GOOD;
+
+    return retVal;
+}
+#endif
+
+
 UA_StatusCode
 removePublishedDataSetRepresentation(UA_Server *server, UA_PublishedDataSet *publishedDataSet){
     UA_StatusCode retVal = UA_STATUSCODE_GOOD;
     retVal |= UA_Server_deleteNode(server, publishedDataSet->identifier, false);
+
     return retVal;
 }
 
@@ -506,6 +599,51 @@ removePublishedDataSetAction(UA_Server *server,
 /**********************************************/
 /*               WriterGroup                  */
 /**********************************************/
+
+static UA_StatusCode
+readContentMask(UA_Server *server, const UA_NodeId *sessionId,
+                void *sessionContext, const UA_NodeId *nodeId,
+                void *nodeContext, UA_Boolean includeSourceTimeStamp,
+                const UA_NumericRange *range, UA_DataValue *value) {
+    UA_WriterGroup *writerGroup = (UA_WriterGroup*)nodeContext;
+    if((writerGroup->config.messageSettings.encoding != UA_EXTENSIONOBJECT_DECODED &&
+        writerGroup->config.messageSettings.encoding != UA_EXTENSIONOBJECT_DECODED_NODELETE) ||
+       writerGroup->config.messageSettings.content.decoded.type !=
+       &UA_TYPES[UA_TYPES_UADPWRITERGROUPMESSAGEDATATYPE])
+        return UA_STATUSCODE_BADINTERNALERROR;
+    UA_UadpWriterGroupMessageDataType *wgm = (UA_UadpWriterGroupMessageDataType*)
+        writerGroup->config.messageSettings.content.decoded.data;
+
+    UA_Variant_setScalarCopy(&value->value, &wgm->networkMessageContentMask,
+                             &UA_TYPES[UA_TYPES_UADPNETWORKMESSAGECONTENTMASK]);
+    value->hasValue = true;
+    return UA_STATUSCODE_GOOD;
+}
+
+static UA_StatusCode
+writeContentMask(UA_Server *server, const UA_NodeId *sessionId,
+                 void *sessionContext, const UA_NodeId *nodeId,
+                 void *nodeContext, const UA_NumericRange *range,
+                 const UA_DataValue *value) {
+    UA_WriterGroup *writerGroup = (UA_WriterGroup*)nodeContext;
+    if((writerGroup->config.messageSettings.encoding != UA_EXTENSIONOBJECT_DECODED &&
+        writerGroup->config.messageSettings.encoding != UA_EXTENSIONOBJECT_DECODED_NODELETE) ||
+       writerGroup->config.messageSettings.content.decoded.type !=
+       &UA_TYPES[UA_TYPES_UADPWRITERGROUPMESSAGEDATATYPE])
+        return UA_STATUSCODE_BADINTERNALERROR;
+    UA_UadpWriterGroupMessageDataType *wgm = (UA_UadpWriterGroupMessageDataType*)
+        writerGroup->config.messageSettings.content.decoded.data;
+
+    if(!value->value.type)
+        return UA_STATUSCODE_BADTYPEMISMATCH;
+    if(value->value.type->typeKind != UA_DATATYPEKIND_ENUM &&
+       value->value.type->typeKind != UA_DATATYPEKIND_INT32)
+        return UA_STATUSCODE_BADTYPEMISMATCH;
+
+    wgm->networkMessageContentMask = *(UA_UadpNetworkMessageContentMask*)value->value.data;
+    return UA_STATUSCODE_GOOD;
+}
+
 UA_StatusCode
 addWriterGroupRepresentation(UA_Server *server, UA_WriterGroup *writerGroup){
     UA_StatusCode retVal = UA_STATUSCODE_GOOD;
@@ -552,6 +690,32 @@ addWriterGroupRepresentation(UA_Server *server, UA_WriterGroup *writerGroup){
     UA_Server_writeValue(server, priorityNode, value);
     UA_Variant_setScalar(&value, &writerGroup->config.writerGroupId, &UA_TYPES[UA_TYPES_UINT16]);
     UA_Server_writeValue(server, writerGroupIdNode, value);
+
+    retVal |= addPubSubObjectNode(server, "MessageSettings", 0,
+                                  writerGroup->identifier.identifier.numeric,
+                                  UA_NS0ID_HASCOMPONENT, UA_NS0ID_UADPWRITERGROUPMESSAGETYPE);
+
+    /* Find the variable with the content mask */
+
+    UA_NodeId messageSettingsId = findSingleChildNode(server, UA_QUALIFIEDNAME(0, "MessageSettings"),
+                                                       UA_NODEID_NUMERIC(0, UA_NS0ID_HASCOMPONENT),
+                                                       UA_NODEID_NUMERIC(0, writerGroup->identifier.identifier.numeric));
+    UA_NodeId contentMaskId = findSingleChildNode(server,
+                                                  UA_QUALIFIEDNAME(0, "NetworkMessageContentMask"),
+                                                  UA_NODEID_NUMERIC(0, UA_NS0ID_HASPROPERTY),
+                                                  messageSettingsId);
+
+    /* Set the callback */
+    UA_DataSource ds;
+    ds.read = readContentMask;
+    ds.write = writeContentMask;
+    UA_Server_setVariableNode_dataSource(server, contentMaskId, ds);
+    UA_Server_setNodeContext(server, contentMaskId, writerGroup);
+
+    /* Make writable */
+    UA_Server_writeAccessLevel(server, contentMaskId, UA_ACCESSLEVELMASK_WRITE | UA_ACCESSLEVELMASK_READ);
+    //                           UA_ACCESSLEVELTYPE_CURRENTREAD | UA_ACCESSLEVELTYPE_CURRENTWRITE);
+
     return retVal;
 }
 
@@ -573,6 +737,8 @@ addWriterGroupAction(UA_Server *server,
     writerGroupConfig.writerGroupId = writerGroupDataType->writerGroupId;
     writerGroupConfig.enabled = writerGroupDataType->enabled;
     writerGroupConfig.priority = writerGroupDataType->priority;
+    //TODO remove hard coded UADP
+    writerGroupConfig.encodingMimeType = UA_PUBSUB_ENCODING_UADP;
     //ToDo transfer all arguments to internal WGConfiguration
     retVal |= UA_Server_addWriterGroup(server, *objectId, &writerGroupConfig, &generatedId);
     UA_Variant_setScalarCopy(output, &generatedId, &UA_TYPES[UA_TYPES_NODEID]);
@@ -618,15 +784,57 @@ addDataSetWriterRepresentation(UA_Server *server, UA_DataSetWriter *dataSetWrite
     dswName[dataSetWriter->config.name.length] = '\0';
     //This code block must use a lock
     UA_Nodestore_remove(server, &dataSetWriter->identifier);
-    retVal |= addPubSubObjectNode(server, dswName, dataSetWriter->identifier.identifier.numeric, dataSetWriter->linkedWriterGroup.identifier.numeric,
-                            UA_NS0ID_HASDATASETWRITER, UA_NS0ID_DATASETWRITERTYPE);
+    retVal |= addPubSubObjectNode(server, dswName, dataSetWriter->identifier.identifier.numeric,
+                                  dataSetWriter->linkedWriterGroup.identifier.numeric,
+                                  UA_NS0ID_HASDATASETWRITER, UA_NS0ID_DATASETWRITERTYPE);
     //End lock zone
     retVal |= UA_Server_addReference(server, dataSetWriter->connectedDataSet,
                                      UA_NODEID_NUMERIC(0, UA_NS0ID_DATASETTOWRITER),
                                      UA_EXPANDEDNODEID_NUMERIC(0, dataSetWriter->identifier.identifier.numeric), true);
+
+
+    retVal |= addPubSubObjectNode(server, "MessageSettings", 0,
+                                  dataSetWriter->identifier.identifier.numeric,
+                                  UA_NS0ID_HASCOMPONENT, UA_NS0ID_UADPDATASETWRITERMESSAGETYPE);
     return retVal;
 }
 
+#ifdef UA_ENABLE_PUBSUB_INFORMATIONMODEL_METHODS
+static UA_StatusCode
+addDataSetWriterAction(UA_Server *server,
+                       const UA_NodeId *sessionId, void *sessionHandle,
+                       const UA_NodeId *methodId, void *methodContext,
+                       const UA_NodeId *objectId, void *objectContext,
+                       size_t inputSize, const UA_Variant *input,
+                       size_t outputSize, UA_Variant *output){
+    UA_StatusCode retVal = UA_STATUSCODE_GOOD;
+    UA_DataSetWriterDataType *dataSetWriterDataType = (UA_DataSetWriterDataType *) input[0].data;
+
+    UA_NodeId targetPDS = UA_NODEID_NULL;
+    for (size_t i = 0; i < server->pubSubManager.publishedDataSetsSize; ++i) {
+        if(UA_String_equal(&dataSetWriterDataType->dataSetName, &server->pubSubManager.publishedDataSets[i].config.name)){
+            targetPDS = server->pubSubManager.publishedDataSets[i].identifier;
+        }
+    }
+    if(UA_NodeId_isNull(&targetPDS)){
+        return UA_STATUSCODE_BADPARENTNODEIDINVALID;
+    }
+
+    UA_NodeId generatedId;
+    UA_DataSetWriterConfig dataSetWriterConfig;
+    memset(&dataSetWriterConfig, 0, sizeof(UA_DataSetWriterConfig));
+    dataSetWriterConfig.name = dataSetWriterDataType->name;
+    dataSetWriterConfig.dataSetName = dataSetWriterDataType->dataSetName;
+    dataSetWriterConfig.keyFrameCount =  dataSetWriterDataType->keyFrameCount;
+    dataSetWriterConfig.dataSetWriterId = dataSetWriterDataType->dataSetWriterId;
+
+    UA_Server_addDataSetWriter(server, *objectId, targetPDS, &dataSetWriterConfig, &generatedId);
+    UA_Variant_setScalarCopy(output, &generatedId, &UA_TYPES[UA_TYPES_NODEID]);
+    return retVal;
+}
+#endif
+
+
 UA_StatusCode
 removeDataSetWriterRepresentation(UA_Server *server, UA_DataSetWriter *dataSetWriter) {
     UA_StatusCode retVal = UA_STATUSCODE_GOOD;
@@ -634,6 +842,21 @@ removeDataSetWriterRepresentation(UA_Server *server, UA_DataSetWriter *dataSetWr
     return retVal;
 }
 
+#ifdef UA_ENABLE_PUBSUB_INFORMATIONMODEL_METHODS
+static UA_StatusCode
+removeDataSetWriterAction(UA_Server *server,
+                          const UA_NodeId *sessionId, void *sessionHandle,
+                          const UA_NodeId *methodId, void *methodContext,
+                          const UA_NodeId *objectId, void *objectContext,
+                          size_t inputSize, const UA_Variant *input,
+                          size_t outputSize, UA_Variant *output){
+    UA_StatusCode retVal = UA_STATUSCODE_GOOD;
+    UA_NodeId nodeToRemove = *((UA_NodeId *) input[0].data);
+    retVal |= UA_Server_removeDataSetWriter(server, nodeToRemove);
+    return retVal;
+}
+#endif
+
 /**********************************************/
 /*                Destructors                 */
 /**********************************************/
@@ -679,6 +902,22 @@ dataSetWriterTypeDestructor(UA_Server *server,
     UA_LOG_INFO(&server->config.logger, UA_LOGCATEGORY_USERLAND, "DataSetWriter destructor called!");
 }
 
+static void
+publishedDataItemsTypeDestructor(UA_Server *server,
+                            const UA_NodeId *sessionId, void *sessionContext,
+                            const UA_NodeId *typeId, void *typeContext,
+                            const UA_NodeId *nodeId, void **nodeContext) {
+    UA_LOG_INFO(&server->config.logger, UA_LOGCATEGORY_USERLAND, "PublishedDataItems destructor called!");
+    UA_NodeId intervalNode;
+    UA_NodePropertyContext *internalConnectionContext;
+    intervalNode = findSingleChildNode(server, UA_QUALIFIEDNAME(0, "PublishedData"),
+                                       UA_NODEID_NUMERIC(0, UA_NS0ID_HASPROPERTY), *nodeId);
+    UA_Server_getNodeContext(server, intervalNode, (void **) &internalConnectionContext);
+    if(!UA_NodeId_equal(&UA_NODEID_NULL , &intervalNode)){
+        UA_free(internalConnectionContext);
+    }
+}
+
 UA_StatusCode
 UA_Server_initPubSubNS0(UA_Server *server) {
     UA_StatusCode retVal = UA_STATUSCODE_GOOD;
@@ -714,8 +953,15 @@ UA_Server_initPubSubNS0(UA_Server *server) {
                                                UA_NODEID_NUMERIC(0, UA_NS0ID_DATASETFOLDERTYPE_ADDPUBLISHEDDATAITEMS), addPublishedDataItemsAction);
     retVal |= UA_Server_setMethodNode_callback(server,
                                                UA_NODEID_NUMERIC(0, UA_NS0ID_DATASETFOLDERTYPE_REMOVEPUBLISHEDDATASET), removePublishedDataSetAction);
+    retVal |= UA_Server_setMethodNode_callback(server,
+                                               UA_NODEID_NUMERIC(0, UA_NS0ID_PUBLISHEDDATAITEMSTYPE_ADDVARIABLES), addVariablesAction);
+    retVal |= UA_Server_setMethodNode_callback(server,
+                                               UA_NODEID_NUMERIC(0, UA_NS0ID_PUBLISHEDDATAITEMSTYPE_REMOVEVARIABLES), removeVariablesAction);
     retVal |= UA_Server_setMethodNode_callback(server, UA_NODEID_NUMERIC(0, UA_NS0ID_PUBSUBCONNECTIONTYPE_ADDWRITERGROUP), addWriterGroupAction);
     retVal |= UA_Server_setMethodNode_callback(server, UA_NODEID_NUMERIC(0, UA_NS0ID_PUBSUBCONNECTIONTYPE_REMOVEGROUP), removeGroupAction);
+    retVal |= UA_Server_setMethodNode_callback(server, UA_NODEID_NUMERIC(0, UA_NS0ID_WRITERGROUPTYPE_ADDDATASETWRITER), addDataSetWriterAction);
+    retVal |= UA_Server_setMethodNode_callback(server, UA_NODEID_NUMERIC(0, UA_NS0ID_WRITERGROUPTYPE_REMOVEDATASETWRITER), removeDataSetWriterAction);
+
 #else
     retVal |= UA_Server_deleteReference(server, UA_NODEID_NUMERIC(0, UA_NS0ID_PUBLISHSUBSCRIBE), UA_NODEID_NUMERIC(0, UA_NS0ID_HASCOMPONENT), true,
                                         UA_EXPANDEDNODEID_NUMERIC(0, UA_NS0ID_PUBLISHSUBSCRIBE_ADDCONNECTION),
@@ -732,6 +978,8 @@ UA_Server_initPubSubNS0(UA_Server *server) {
     UA_Server_setNodeTypeLifecycle(server, UA_NODEID_NUMERIC(0, UA_NS0ID_WRITERGROUPTYPE), liveCycle);
     liveCycle.destructor = dataSetWriterTypeDestructor;
     UA_Server_setNodeTypeLifecycle(server, UA_NODEID_NUMERIC(0, UA_NS0ID_DATASETWRITERDATATYPE), liveCycle);
+    liveCycle.destructor = publishedDataItemsTypeDestructor;
+    UA_Server_setNodeTypeLifecycle(server, UA_NODEID_NUMERIC(0, UA_NS0ID_PUBLISHEDDATAITEMSTYPE), liveCycle);
 
     return retVal;
 }

+ 2 - 1
src/ua_util.c

@@ -49,7 +49,8 @@ UA_parseEndpointUrl(const UA_String *endpointUrl, UA_String *outHostname,
         return UA_STATUSCODE_BADTCPENDPOINTURLINVALID;
     } else if (strncmp((char*)endpointUrl->data, "opc.tcp://", 10) != 0) {
 #ifdef UA_ENABLE_PUBSUB
-        if (strncmp((char*)endpointUrl->data, "opc.udp://", 10) != 0) {
+        if (strncmp((char*)endpointUrl->data, "opc.udp://", 10) != 0 &&
+                strncmp((char*)endpointUrl->data, "opc.mqtt://", 11) != 0) {
             return UA_STATUSCODE_BADTCPENDPOINTURLINVALID;
         }
 #else

+ 7 - 5
tests/pubsub/check_pubsub_informationmodel_methods.c

@@ -94,13 +94,15 @@ static UA_NodeId addPubSubConnection(void){
     pubSubConnection.publisherId = publisherId;
     pubSubConnection.transportProfileUri = UA_STRING("http://opcfoundation.org/UA-Profile/Transport/pubsub-udp-uadp");
 
+
     UA_ExtensionObject eo;
-    eo.encoding = UA_EXTENSIONOBJECT_ENCODED_BYTESTRING;
     UA_NetworkAddressUrlDataType networkAddressDataType = {UA_STRING("eth0"), UA_STRING("opc.udp://224.0.0.22:4840/")};
-    UA_ByteString_allocBuffer(&eo.content.encoded.body, UA_NetworkAddressUrlDataType_calcSizeBinary(&networkAddressDataType));
-    UA_Byte *bufPos = eo.content.encoded.body.data;
-    UA_NetworkAddressUrlDataType_encodeBinary(&networkAddressDataType, &bufPos, &(eo.content.encoded.body.data[eo.content.encoded.body.length]));
-    eo.content.encoded.typeId = UA_NODEID_NUMERIC(0, UA_TYPES_NETWORKADDRESSURLDATATYPE);
+    UA_NetworkAddressUrlDataType* identityToken = UA_NetworkAddressUrlDataType_new();
+    UA_NetworkAddressUrlDataType_init(identityToken);
+    UA_NetworkAddressUrlDataType_copy(&networkAddressDataType, identityToken);
+    eo.encoding = UA_EXTENSIONOBJECT_DECODED;
+    eo.content.decoded.type = &UA_TYPES[UA_TYPES_NETWORKADDRESSURLDATATYPE];
+    eo.content.decoded.data = identityToken;
     pubSubConnection.address = eo;
     pubSubConnection.connectionPropertiesSize = 2;
     UA_KeyValuePair connectionOptions[2];

+ 66 - 1
tools/schema/Opc.Ua.NodeSet2.PubSubMinimal.xml

@@ -1239,17 +1239,82 @@
       <Reference ReferenceType="HasProperty">i=17739</Reference> <!--Priority-->
       <Reference ReferenceType="HasProperty">i=17740</Reference> <!--LocaleIds-->
       <Reference ReferenceType="i=15296">i=17743</Reference> <!--DataSetWriterName-->
+      <Reference ReferenceType="HasComponent">i=17969</Reference> <!--AddDataSetWriter-->
 
 
       <!--
             <Reference ReferenceType="HasComponent">i=17741</Reference> TransportSettings
             <Reference ReferenceType="HasComponent">i=17742</Reference> MessageSettings
             <Reference ReferenceType="HasComponent">i=17812</Reference> Diagnostics
-            <Reference ReferenceType="HasComponent">i=17969</Reference> AddDataSetWriter
             <Reference ReferenceType="HasComponent">i=17992</Reference> RemoveDataSetWriter-->
       <Reference ReferenceType="HasSubtype" IsForward="false">i=14232</Reference>  <!--PubSubGroupType-->
     </References>
   </UAObjectType>
+  <UAMethod NodeId="i=17969" BrowseName="AddDataSetWriter" ParentNodeId="i=17725">
+    <DisplayName>AddDataSetWriter</DisplayName>
+    <References>
+      <Reference ReferenceType="HasProperty">i=17976</Reference>
+      <Reference ReferenceType="HasProperty">i=17987</Reference>
+      <Reference ReferenceType="HasModellingRule">i=80</Reference>
+      <Reference ReferenceType="HasComponent" IsForward="false">i=17725</Reference>
+    </References>
+  </UAMethod>
+  <UAVariable NodeId="i=17976" BrowseName="InputArguments" ParentNodeId="i=17969" DataType="i=296" ValueRank="1">
+    <DisplayName>InputArguments</DisplayName>
+    <References>
+      <Reference ReferenceType="HasTypeDefinition">i=68</Reference>
+      <Reference ReferenceType="HasModellingRule">i=78</Reference>
+      <Reference ReferenceType="HasProperty" IsForward="false">i=17969</Reference>
+    </References>
+    <Value>
+      <ListOfExtensionObject xmlns="http://opcfoundation.org/UA/2008/02/Types.xsd">
+        <ExtensionObject>
+          <TypeId>
+            <Identifier>i=297</Identifier>
+          </TypeId>
+          <Body>
+            <Argument>
+              <Name>Configuration</Name>
+              <DataType>
+                <Identifier>i=15597</Identifier>
+              </DataType>
+              <ValueRank>-1</ValueRank>
+              <ArrayDimensions />
+              <Description p5:nil="true" xmlns:p5="http://www.w3.org/2001/XMLSchema-instance" />
+            </Argument>
+          </Body>
+        </ExtensionObject>
+      </ListOfExtensionObject>
+    </Value>
+  </UAVariable>
+  <UAVariable NodeId="i=17987" BrowseName="OutputArguments" ParentNodeId="i=17969" DataType="i=296" ValueRank="1">
+    <DisplayName>OutputArguments</DisplayName>
+    <References>
+      <Reference ReferenceType="HasTypeDefinition">i=68</Reference>
+      <Reference ReferenceType="HasModellingRule">i=78</Reference>
+      <Reference ReferenceType="HasProperty" IsForward="false">i=17969</Reference>
+    </References>
+    <Value>
+      <ListOfExtensionObject xmlns="http://opcfoundation.org/UA/2008/02/Types.xsd">
+        <ExtensionObject>
+          <TypeId>
+            <Identifier>i=297</Identifier>
+          </TypeId>
+          <Body>
+            <Argument>
+              <Name>DataSetWriterNodeId</Name>
+              <DataType>
+                <Identifier>i=17</Identifier>
+              </DataType>
+              <ValueRank>-1</ValueRank>
+              <ArrayDimensions />
+              <Description p5:nil="true" xmlns:p5="http://www.w3.org/2001/XMLSchema-instance" />
+            </Argument>
+          </Body>
+        </ExtensionObject>
+      </ListOfExtensionObject>
+    </Value>
+  </UAVariable>
   <UAVariable NodeId="i=17736" BrowseName="WriterGroupId" ParentNodeId="i=17725" DataType="UInt16">
     <DisplayName>WriterGroupId</DisplayName>
     <References>

+ 5 - 0
tools/schema/datatypes_pubsub.txt

@@ -28,3 +28,8 @@ PubSubState
 JsonDataSetWriterMessageDataType
 JsonDataSetMessageContentMask
 JsonNetworkMessageContentMask
+BrokerTransportQualityOfService
+BrokerConnectionTransportDataType
+BrokerWriterGroupTransportDataType
+BrokerDataSetWriterTransportDataType
+BrokerWriterGroupTransportType