Browse Source

feat(pubsub) extended pubsub fastpath with fixed offsets and static value source, memory fixes, added support for sequence number, added an direct pointer from DSW to the PDS to avoid lookup -> PTR will be updated during each PDS add/remove

Andreas Ebner 5 years ago
parent
commit
4b52b9e49b

+ 2 - 2
CMakeLists.txt

@@ -205,7 +205,7 @@ endif()
 option(UA_ENABLE_EXPERIMENTAL_HISTORIZING "Enable client experimental historical access features" OFF)
 option(UA_ENABLE_EXPERIMENTAL_HISTORIZING "Enable client experimental historical access features" OFF)
 mark_as_advanced(UA_ENABLE_EXPERIMENTAL_HISTORIZING)
 mark_as_advanced(UA_ENABLE_EXPERIMENTAL_HISTORIZING)
 
 
-option(UA_ENABLE_PUBSUB "Enable publish/subscribe" ON)
+option(UA_ENABLE_PUBSUB "Enable publish/subscribe" OFF)
 mark_as_advanced(UA_ENABLE_PUBSUB)
 mark_as_advanced(UA_ENABLE_PUBSUB)
 
 
 option(UA_ENABLE_PUBSUB_ETH_UADP "Enable publish/subscribe UADP over Ethernet" OFF)
 option(UA_ENABLE_PUBSUB_ETH_UADP "Enable publish/subscribe UADP over Ethernet" OFF)
@@ -333,7 +333,7 @@ if(CMAKE_VERSION VERSION_GREATER 3.6)
 endif()
 endif()
 
 
 # Build Targets
 # Build Targets
-option(UA_BUILD_EXAMPLES "Build example servers and clients" ON)
+option(UA_BUILD_EXAMPLES "Build example servers and clients" OFF)
 option(UA_BUILD_TOOLS "Build OPC UA shell tools" OFF)
 option(UA_BUILD_TOOLS "Build OPC UA shell tools" OFF)
 option(UA_BUILD_UNIT_TESTS "Build the unit tests" OFF)
 option(UA_BUILD_UNIT_TESTS "Build the unit tests" OFF)
 option(UA_BUILD_FUZZING "Build the fuzzing executables" OFF)
 option(UA_BUILD_FUZZING "Build the fuzzing executables" OFF)

+ 49 - 32
examples/pubsub/server_pubsub_publisher_rt_level.c

@@ -10,7 +10,24 @@
 #include <stdlib.h>
 #include <stdlib.h>
 #include <open62541/server_pubsub.h>
 #include <open62541/server_pubsub.h>
 
 
+#define PUBSUB_CONFIG_PUBLISH_CYCLE_MS 100
+#define PUBSUB_CONFIG_PUBLISH_CYCLES 100
+/* possible values: PUBSUB_CONFIG_FASTPATH_NONE (WIP not implemented), PUBSUB_CONFIG_FASTPATH_FIXED_OFFSETS, PUBSUB_CONFIG_FASTPATH_STATIC_VALUES (WIP  not implemented)*/
+#define PUBSUB_CONFIG_FASTPATH_FIXED_OFFSETS
+#define PUBSUB_CONFIG_FIELD_COUNT 10
+
+/**
+ * The PubSub RT level example points out the configuration of different PubSub RT levels. These levels will be later
+ * used for deterministic message generation. The main target is to reduce the time spread and effort during the publish cycle.
+ * Most of the RT levels are based on a pre-generated and buffered DataSetMesseges and
+ * NetworkMessages. Since changes in the PubSub-configuration will invalidate the buffered frames, the PubSub
+ * configuration can be frozen after the configuration phase.
+ *
+ * This example can be configured to compare and measure the different PubSub options.
+ */
+
 UA_NodeId publishedDataSetIdent, dataSetFieldIdent, writerGroupIdent, connectionIdentifier;
 UA_NodeId publishedDataSetIdent, dataSetFieldIdent, writerGroupIdent, connectionIdentifier;
+UA_UInt32 *valueStore[PUBSUB_CONFIG_FIELD_COUNT];
 
 
 UA_Boolean running = true;
 UA_Boolean running = true;
 static void stopHandler(int sign) {
 static void stopHandler(int sign) {
@@ -18,14 +35,6 @@ static void stopHandler(int sign) {
     running = false;
     running = false;
 }
 }
 
 
-/**
- * The PubSub RT level example points out the configuration of different PubSub RT levels. These levels will be later
- * used for deterministic message generation. The underlying base concept is the target to reduce the time spread and
- * affort during the publish cycle. Most of the RT levels are based on a pregenerated and buffered DataSetMesseges and
- * NetworkMessages. Since changes in the PubSub configuration will invalidate the buffered frames, the pubsub
- * configuration can be frozen after the configuration phase
- */
-
 /* The following PubSub configuration does not differ from the 'normal' configuration */
 /* The following PubSub configuration does not differ from the 'normal' configuration */
 static void
 static void
 addMinimalPubSubConfiguration(UA_Server * server){
 addMinimalPubSubConfiguration(UA_Server * server){
@@ -50,10 +59,10 @@ addMinimalPubSubConfiguration(UA_Server * server){
 
 
 static void
 static void
 valueUpdateCallback(UA_Server *server, void *data) {
 valueUpdateCallback(UA_Server *server, void *data) {
-    UA_DataValue dataValue = *((UA_DataValue *) data);
-    UA_UInt32 *integerValue = (UA_UInt32 *) dataValue.value.data;
-    *integerValue += 1;
-    UA_Variant_setScalar(&dataValue.value, integerValue, &UA_TYPES[UA_TYPES_UINT32]);
+    for (int i = 0; i < PUBSUB_CONFIG_FIELD_COUNT; ++i)
+        *valueStore[i] = *valueStore[i]+1;
+    if(*valueStore[0] > PUBSUB_CONFIG_PUBLISH_CYCLES)
+        running = false;
 }
 }
 
 
 int main(void) {
 int main(void) {
@@ -80,23 +89,26 @@ int main(void) {
     UA_WriterGroupConfig writerGroupConfig;
     UA_WriterGroupConfig writerGroupConfig;
     memset(&writerGroupConfig, 0, sizeof(UA_WriterGroupConfig));
     memset(&writerGroupConfig, 0, sizeof(UA_WriterGroupConfig));
     writerGroupConfig.name = UA_STRING("Demo WriterGroup");
     writerGroupConfig.name = UA_STRING("Demo WriterGroup");
-    writerGroupConfig.publishingInterval = 100;
+    writerGroupConfig.publishingInterval = PUBSUB_CONFIG_PUBLISH_CYCLE_MS;
     writerGroupConfig.enabled = UA_FALSE;
     writerGroupConfig.enabled = UA_FALSE;
     writerGroupConfig.writerGroupId = 100;
     writerGroupConfig.writerGroupId = 100;
     writerGroupConfig.encodingMimeType = UA_PUBSUB_ENCODING_UADP;
     writerGroupConfig.encodingMimeType = UA_PUBSUB_ENCODING_UADP;
     writerGroupConfig.messageSettings.encoding             = UA_EXTENSIONOBJECT_DECODED;
     writerGroupConfig.messageSettings.encoding             = UA_EXTENSIONOBJECT_DECODED;
     writerGroupConfig.messageSettings.content.decoded.type = &UA_TYPES[UA_TYPES_UADPWRITERGROUPMESSAGEDATATYPE];
     writerGroupConfig.messageSettings.content.decoded.type = &UA_TYPES[UA_TYPES_UADPWRITERGROUPMESSAGEDATATYPE];
     /* RT Level 0 setup */
     /* RT Level 0 setup */
-    UA_UadpWriterGroupMessageDataType *writerGroupMessage  = UA_UadpWriterGroupMessageDataType_new();
+    UA_UadpWriterGroupMessageDataType writerGroupMessage;
+    UA_UadpWriterGroupMessageDataType_init(&writerGroupMessage);
     /* Change message settings of writerGroup to send PublisherId,
     /* Change message settings of writerGroup to send PublisherId,
      * WriterGroupId in GroupHeader and DataSetWriterId in PayloadHeader
      * WriterGroupId in GroupHeader and DataSetWriterId in PayloadHeader
      * of NetworkMessage */
      * of NetworkMessage */
-    writerGroupMessage->networkMessageContentMask = (UA_UadpNetworkMessageContentMask) ((UA_UadpNetworkMessageContentMask) UA_UADPNETWORKMESSAGECONTENTMASK_PUBLISHERID |
+    writerGroupMessage.networkMessageContentMask = (UA_UadpNetworkMessageContentMask) ((UA_UadpNetworkMessageContentMask) UA_UADPNETWORKMESSAGECONTENTMASK_PUBLISHERID |
                                                     (UA_UadpNetworkMessageContentMask) UA_UADPNETWORKMESSAGECONTENTMASK_GROUPHEADER |
                                                     (UA_UadpNetworkMessageContentMask) UA_UADPNETWORKMESSAGECONTENTMASK_GROUPHEADER |
                                                     (UA_UadpNetworkMessageContentMask) UA_UADPNETWORKMESSAGECONTENTMASK_WRITERGROUPID |
                                                     (UA_UadpNetworkMessageContentMask) UA_UADPNETWORKMESSAGECONTENTMASK_WRITERGROUPID |
                                                     (UA_UadpNetworkMessageContentMask) UA_UADPNETWORKMESSAGECONTENTMASK_PAYLOADHEADER);
                                                     (UA_UadpNetworkMessageContentMask) UA_UADPNETWORKMESSAGECONTENTMASK_PAYLOADHEADER);
-    writerGroupConfig.messageSettings.content.decoded.data = writerGroupMessage;
+    writerGroupConfig.messageSettings.content.decoded.data = &writerGroupMessage;
+#if defined PUBSUB_CONFIG_FASTPATH_FIXED_OFFSETS
     writerGroupConfig.rtLevel = UA_PUBSUB_RT_FIXED_SIZE;
     writerGroupConfig.rtLevel = UA_PUBSUB_RT_FIXED_SIZE;
+#endif
     UA_Server_addWriterGroup(server, connectionIdentifier, &writerGroupConfig, &writerGroupIdent);
     UA_Server_addWriterGroup(server, connectionIdentifier, &writerGroupConfig, &writerGroupIdent);
     /* Add one DataSetWriter */
     /* Add one DataSetWriter */
     UA_NodeId dataSetWriterIdent;
     UA_NodeId dataSetWriterIdent;
@@ -106,24 +118,29 @@ int main(void) {
     dataSetWriterConfig.dataSetWriterId = 62541;
     dataSetWriterConfig.dataSetWriterId = 62541;
     dataSetWriterConfig.keyFrameCount = 10;
     dataSetWriterConfig.keyFrameCount = 10;
     UA_Server_addDataSetWriter(server, writerGroupIdent, publishedDataSetIdent, &dataSetWriterConfig, &dataSetWriterIdent);
     UA_Server_addDataSetWriter(server, writerGroupIdent, publishedDataSetIdent, &dataSetWriterConfig, &dataSetWriterIdent);
+
+#if defined PUBSUB_CONFIG_FASTPATH_FIXED_OFFSETS || defined PUBSUB_CONFIG_FASTPATH_STATIC_VALUES
     /* Add one DataSetField with static value source to PDS */
     /* Add one DataSetField with static value source to PDS */
     UA_DataSetFieldConfig dsfConfig;
     UA_DataSetFieldConfig dsfConfig;
-    memset(&dsfConfig, 0, sizeof(UA_DataSetFieldConfig));
-    UA_Server_getDataSetFieldConfig(server, dataSetFieldIdent, &dsfConfig);
-    /* Create Variant and configure as DataSetField source */
-    UA_UInt32 *intValue = UA_UInt32_new();
-    UA_Variant variant;
-    memset(&variant, 0, sizeof(UA_Variant));
-    UA_Variant_setScalar(&variant, intValue, &UA_TYPES[UA_TYPES_UINT32]);
-    UA_DataValue staticValueSource;
-    memset(&staticValueSource, 0, sizeof(staticValueSource));
-    staticValueSource.value = variant;
-    dsfConfig.field.variable.staticValueSourceEnabled = UA_TRUE;
-    dsfConfig.field.variable.staticValueSource.value = variant;
-    UA_Server_addDataSetField(server, publishedDataSetIdent, &dsfConfig, &dataSetFieldIdent);
-
+    for(size_t i = 0; i < PUBSUB_CONFIG_FIELD_COUNT; i++){
+        memset(&dsfConfig, 0, sizeof(UA_DataSetFieldConfig));
+        /* Create Variant and configure as DataSetField source */
+        UA_UInt32 *intValue = UA_UInt32_new();
+        *intValue = (UA_UInt32) i * 1000;
+        valueStore[i] = intValue;
+        UA_Variant variant;
+        memset(&variant, 0, sizeof(UA_Variant));
+        UA_Variant_setScalar(&variant, intValue, &UA_TYPES[UA_TYPES_UINT32]);
+        UA_DataValue staticValueSource;
+        memset(&staticValueSource, 0, sizeof(staticValueSource));
+        staticValueSource.value = variant;
+        dsfConfig.field.variable.staticValueSourceEnabled = UA_TRUE;
+        dsfConfig.field.variable.staticValueSource.value = variant;
+        UA_Server_addDataSetField(server, publishedDataSetIdent, &dsfConfig, &dataSetFieldIdent);
+    }
+#endif
     /* The PubSub configuration is currently editable and the publish callback is not running */
     /* The PubSub configuration is currently editable and the publish callback is not running */
-    writerGroupConfig.publishingInterval = 1000;
+    writerGroupConfig.publishingInterval = PUBSUB_CONFIG_PUBLISH_CYCLE_MS;
     UA_Server_updateWriterGroupConfig(server, writerGroupIdent, &writerGroupConfig);
     UA_Server_updateWriterGroupConfig(server, writerGroupIdent, &writerGroupConfig);
 
 
     /* Freeze the PubSub configuration (and start implicitly the publish callback) */
     /* Freeze the PubSub configuration (and start implicitly the publish callback) */
@@ -140,7 +157,7 @@ int main(void) {
     //UA_Server_unfreezeWriterGroupConfiguration(server, writerGroupIdent);
     //UA_Server_unfreezeWriterGroupConfiguration(server, writerGroupIdent);
 
 
     UA_UInt64 callbackId;
     UA_UInt64 callbackId;
-    UA_Server_addRepeatedCallback(server, valueUpdateCallback, &staticValueSource, 1000, &callbackId);
+    UA_Server_addRepeatedCallback(server, valueUpdateCallback, NULL, PUBSUB_CONFIG_PUBLISH_CYCLE_MS, &callbackId);
 
 
     UA_StatusCode retval = UA_STATUSCODE_GOOD;
     UA_StatusCode retval = UA_STATUSCODE_GOOD;
     retval |= UA_Server_run(server, &running);
     retval |= UA_Server_run(server, &running);

+ 2 - 0
src/pubsub/ua_pubsub.h

@@ -127,12 +127,14 @@ struct UA_WriterGroup{
     LIST_ENTRY(UA_WriterGroup) listEntry;
     LIST_ENTRY(UA_WriterGroup) listEntry;
     UA_NodeId identifier;
     UA_NodeId identifier;
     UA_NodeId linkedConnection;
     UA_NodeId linkedConnection;
+    UA_PubSubConnection *linkedConnectionPtr;
     LIST_HEAD(UA_ListOfDataSetWriter, UA_DataSetWriter) writers;
     LIST_HEAD(UA_ListOfDataSetWriter, UA_DataSetWriter) writers;
     UA_UInt32 writersCount;
     UA_UInt32 writersCount;
     UA_UInt64 publishCallbackId;
     UA_UInt64 publishCallbackId;
     UA_Boolean publishCallbackIsRegistered;
     UA_Boolean publishCallbackIsRegistered;
     UA_PubSubState state;
     UA_PubSubState state;
     UA_NetworkMessageOffsetBuffer bufferedMessage;
     UA_NetworkMessageOffsetBuffer bufferedMessage;
+    UA_UInt16 sequenceNumber;
 };
 };
 
 
 UA_StatusCode
 UA_StatusCode

+ 20 - 0
src/pubsub/ua_pubsub_manager.c

@@ -96,6 +96,16 @@ UA_Server_addPubSubConnection(UA_Server *server,
     if(connectionIdentifier)
     if(connectionIdentifier)
         UA_NodeId_copy(&newConnection->identifier, connectionIdentifier);
         UA_NodeId_copy(&newConnection->identifier, connectionIdentifier);
 
 
+    /* update all writerGroups and set new PTR */
+    for(size_t i = 0; i < server->pubSubManager.connectionsSize; i++){
+        UA_WriterGroup *wg;
+        LIST_FOREACH(wg, &server->pubSubManager.connections[i].writerGroups, listEntry){
+            UA_PubSubConnection *connection = UA_PubSubConnection_findConnectionbyId(server, wg->linkedConnection);
+            /* TODO Check if the value is null -> how can we ensure consistency in this case? */
+            if(connection)
+                wg->linkedConnectionPtr = connection;
+        }
+    }
 #ifdef UA_ENABLE_PUBSUB_INFORMATIONMODEL
 #ifdef UA_ENABLE_PUBSUB_INFORMATIONMODEL
     addPubSubConnectionRepresentation(server, newConnection);
     addPubSubConnectionRepresentation(server, newConnection);
 #endif
 #endif
@@ -144,6 +154,16 @@ UA_Server_removePubSubConnection(UA_Server *server, const UA_NodeId connection)
                 server->pubSubManager.connections[n].writerGroups.lh_first->listEntry.le_prev = &server->pubSubManager.connections[n].writerGroups.lh_first;
                 server->pubSubManager.connections[n].writerGroups.lh_first->listEntry.le_prev = &server->pubSubManager.connections[n].writerGroups.lh_first;
             }
             }
         }
         }
+        /* update all writerGroups and set new PTR */
+        for(size_t i = 0; i < server->pubSubManager.connectionsSize; i++){
+            UA_WriterGroup *wg;
+            LIST_FOREACH(wg, &server->pubSubManager.connections[i].writerGroups, listEntry){
+                UA_PubSubConnection *tmp_connection = UA_PubSubConnection_findConnectionbyId(server, wg->linkedConnection);
+                /* TODO Check if the value is null -> how can we ensure consistency in this case? */
+                if(tmp_connection)
+                    wg->linkedConnectionPtr = tmp_connection;
+            }
+        }
     }
     }
     return UA_STATUSCODE_GOOD;
     return UA_STATUSCODE_GOOD;
 }
 }

+ 1 - 0
src/pubsub/ua_pubsub_manager.h

@@ -17,6 +17,7 @@ _UA_BEGIN_DECLS
 #ifdef UA_ENABLE_PUBSUB /* conditional compilation */
 #ifdef UA_ENABLE_PUBSUB /* conditional compilation */
 
 
 typedef struct UA_PubSubManager{
 typedef struct UA_PubSubManager{
+    //TODO connection and pds store to linked list
     //Connections and PublishedDataSets can exist alone (own lifecycle) -> top level components
     //Connections and PublishedDataSets can exist alone (own lifecycle) -> top level components
     size_t connectionsSize;
     size_t connectionsSize;
     UA_PubSubConnection *connections;
     UA_PubSubConnection *connections;

+ 31 - 22
src/pubsub/ua_pubsub_networkmessage.c

@@ -53,15 +53,24 @@ static UA_Boolean UA_DataSetMessageHeader_DataSetFlags2Enabled(const UA_DataSetM
 
 
 size_t
 size_t
 UA_DataSetMessage_generateOffsetBuffer(UA_NetworkMessageOffsetBuffer *offsetBuffer,
 UA_DataSetMessage_generateOffsetBuffer(UA_NetworkMessageOffsetBuffer *offsetBuffer,
-                                       const UA_DataSetMessage* p, size_t currentOffset) {
+                                       UA_DataSetMessage* p, size_t currentOffset) {
     size_t size = currentOffset;
     size_t size = currentOffset;
     UA_Byte byte;
     UA_Byte byte;
     size += UA_Byte_calcSizeBinary(&byte); // DataSetMessage Type + Flags
     size += UA_Byte_calcSizeBinary(&byte); // DataSetMessage Type + Flags
     if(UA_DataSetMessageHeader_DataSetFlags2Enabled(&p->header))
     if(UA_DataSetMessageHeader_DataSetFlags2Enabled(&p->header))
         size += UA_Byte_calcSizeBinary(&byte);
         size += UA_Byte_calcSizeBinary(&byte);
 
 
-    if(p->header.dataSetMessageSequenceNrEnabled)
+    if(p->header.dataSetMessageSequenceNrEnabled) {
+        offsetBuffer->offsets = (UA_NetworkMessageOffset *) UA_realloc(offsetBuffer->offsets,
+                                                                       sizeof(UA_NetworkMessageOffsetBuffer) * (offsetBuffer->offsetsSize + 1));
+        offsetBuffer->offsets[offsetBuffer->offsetsSize].offset = size;
+        offsetBuffer->offsets[offsetBuffer->offsetsSize].offsetData.value.value = UA_DataValue_new();
+        UA_DataValue_init(offsetBuffer->offsets[offsetBuffer->offsetsSize].offsetData.value.value);
+        UA_Variant_setScalar(&offsetBuffer->offsets[offsetBuffer->offsetsSize].offsetData.value.value->value,
+                             &p->header.dataSetMessageSequenceNr, &UA_TYPES[UA_TYPES_UINT16]);
+        offsetBuffer->offsets[offsetBuffer->offsetsSize++].contentType = UA_PUBSUB_OFFSETTYPE_DATASETMESSAGE_SEQUENCENUMBER;
         size += UA_UInt16_calcSizeBinary(&p->header.dataSetMessageSequenceNr);
         size += UA_UInt16_calcSizeBinary(&p->header.dataSetMessageSequenceNr);
+    }
 
 
     if(p->header.timestampEnabled)
     if(p->header.timestampEnabled)
         size += UA_DateTime_calcSizeBinary(&p->header.timestamp); /* UtcTime */
         size += UA_DateTime_calcSizeBinary(&p->header.timestamp); /* UtcTime */
@@ -80,28 +89,30 @@ UA_DataSetMessage_generateOffsetBuffer(UA_NetworkMessageOffsetBuffer *offsetBuff
 
 
     if(p->header.dataSetMessageType == UA_DATASETMESSAGE_DATAKEYFRAME) {
     if(p->header.dataSetMessageType == UA_DATASETMESSAGE_DATAKEYFRAME) {
         if(p->header.fieldEncoding != UA_FIELDENCODING_RAWDATA){
         if(p->header.fieldEncoding != UA_FIELDENCODING_RAWDATA){
-            //TODO calirfy RT and Rawdata behavior
+            //TODO clarify RT and Rawdata behavior
             size += UA_calcSizeBinary(&p->data.keyFrameData.fieldCount, &UA_TYPES[UA_TYPES_UINT16]);
             size += UA_calcSizeBinary(&p->data.keyFrameData.fieldCount, &UA_TYPES[UA_TYPES_UINT16]);
         }
         }
 
 
         if(p->header.fieldEncoding == UA_FIELDENCODING_VARIANT) {
         if(p->header.fieldEncoding == UA_FIELDENCODING_VARIANT) {
             for (UA_UInt16 i = 0; i < p->data.keyFrameData.fieldCount; i++){
             for (UA_UInt16 i = 0; i < p->data.keyFrameData.fieldCount; i++){
-                offsetBuffer->offsets = (UA_NetworkMessageOffset *) UA_realloc(offsetBuffer->offsets, sizeof(UA_NetworkMessageOffsetBuffer) * offsetBuffer->offsetsSize + 1);
+                offsetBuffer->offsets = (UA_NetworkMessageOffset *) UA_realloc(offsetBuffer->offsets, sizeof(UA_NetworkMessageOffsetBuffer) * (offsetBuffer->offsetsSize + 1));
                 offsetBuffer->offsets[offsetBuffer->offsetsSize].offset = size;
                 offsetBuffer->offsets[offsetBuffer->offsetsSize].offset = size;
                 offsetBuffer->offsets[offsetBuffer->offsetsSize].contentType = UA_PUBSUB_OFFSETTYPE_PAYLOAD_VARIANT;
                 offsetBuffer->offsets[offsetBuffer->offsetsSize].contentType = UA_PUBSUB_OFFSETTYPE_PAYLOAD_VARIANT;
                 //TODO check value source and alloc!
                 //TODO check value source and alloc!
                 offsetBuffer->offsets[offsetBuffer->offsetsSize].offsetData.value.value = UA_DataValue_new();
                 offsetBuffer->offsets[offsetBuffer->offsetsSize].offsetData.value.value = UA_DataValue_new();
-                offsetBuffer->offsets[offsetBuffer->offsetsSize++].offsetData.value.value->value = p->data.keyFrameData.dataSetFields->value;
+                UA_Variant_setScalar(&offsetBuffer->offsets[offsetBuffer->offsetsSize++].offsetData.value.value->value,
+                        p->data.keyFrameData.dataSetFields[i].value.data, p->data.keyFrameData.dataSetFields[i].value.type);
+                //offsetBuffer->offsets[offsetBuffer->offsetsSize++].offsetData.value.value->value = p->data.keyFrameData.dataSetFields->value;
                 size += UA_calcSizeBinary(&p->data.keyFrameData.dataSetFields[i].value, &UA_TYPES[UA_TYPES_VARIANT]);
                 size += UA_calcSizeBinary(&p->data.keyFrameData.dataSetFields[i].value, &UA_TYPES[UA_TYPES_VARIANT]);
             }
             }
         } else if(p->header.fieldEncoding == UA_FIELDENCODING_RAWDATA) {
         } else if(p->header.fieldEncoding == UA_FIELDENCODING_RAWDATA) {
             // not implemented
             // not implemented
         } else if(p->header.fieldEncoding == UA_FIELDENCODING_DATAVALUE) {
         } else if(p->header.fieldEncoding == UA_FIELDENCODING_DATAVALUE) {
             for (UA_UInt16 i = 0; i < p->data.keyFrameData.fieldCount; i++) {
             for (UA_UInt16 i = 0; i < p->data.keyFrameData.fieldCount; i++) {
-                offsetBuffer->offsets = (UA_NetworkMessageOffset *) UA_realloc(offsetBuffer->offsets, sizeof(UA_NetworkMessageOffsetBuffer) * offsetBuffer->offsetsSize + 1);
+                offsetBuffer->offsets = (UA_NetworkMessageOffset *) UA_realloc(offsetBuffer->offsets, sizeof(UA_NetworkMessageOffsetBuffer) * (offsetBuffer->offsetsSize + 1));
                 offsetBuffer->offsets[offsetBuffer->offsetsSize].offset = size;
                 offsetBuffer->offsets[offsetBuffer->offsetsSize].offset = size;
                 offsetBuffer->offsets[offsetBuffer->offsetsSize].contentType = UA_PUBSUB_OFFSETTYPE_PAYLOAD_DATAVALUE;
                 offsetBuffer->offsets[offsetBuffer->offsetsSize].contentType = UA_PUBSUB_OFFSETTYPE_PAYLOAD_DATAVALUE;
-                //TODO check value source
+                //TODO check value source, change implementation to 'variant'
                 offsetBuffer->offsets[offsetBuffer->offsetsSize++].offsetData.value.value = p->data.keyFrameData.dataSetFields;
                 offsetBuffer->offsets[offsetBuffer->offsetsSize++].offsetData.value.value = p->data.keyFrameData.dataSetFields;
                 size += UA_calcSizeBinary(&p->data.keyFrameData.dataSetFields[i], &UA_TYPES[UA_TYPES_DATAVALUE]);
                 size += UA_calcSizeBinary(&p->data.keyFrameData.dataSetFields[i], &UA_TYPES[UA_TYPES_DATAVALUE]);
             }
             }
@@ -131,7 +142,7 @@ UA_DataSetMessage_generateOffsetBuffer(UA_NetworkMessageOffsetBuffer *offsetBuff
 
 
 size_t
 size_t
 UA_NetworkMessage_generateOffsetBuffer(UA_NetworkMessageOffsetBuffer *offsetBuffer,
 UA_NetworkMessage_generateOffsetBuffer(UA_NetworkMessageOffsetBuffer *offsetBuffer,
-        const UA_NetworkMessage* p) {
+        UA_NetworkMessage* p) {
     size_t retval = 0;
     size_t retval = 0;
     UA_Byte byte;
     UA_Byte byte;
     size_t size = UA_Byte_calcSizeBinary(&byte); // UADPVersion + UADPFlags
     size_t size = UA_Byte_calcSizeBinary(&byte); // UADPVersion + UADPFlags
@@ -179,16 +190,17 @@ UA_NetworkMessage_generateOffsetBuffer(UA_NetworkMessageOffsetBuffer *offsetBuff
             size += UA_UInt32_calcSizeBinary(&p->groupHeader.groupVersion);
             size += UA_UInt32_calcSizeBinary(&p->groupHeader.groupVersion);
 
 
         if(p->groupHeader.networkMessageNumberEnabled) {
         if(p->groupHeader.networkMessageNumberEnabled) {
-            offsetBuffer->offsets = (UA_NetworkMessageOffset *) UA_realloc(offsetBuffer->offsets, sizeof(UA_NetworkMessageOffsetBuffer) * offsetBuffer->offsetsSize+1);
+            offsetBuffer->offsets = (UA_NetworkMessageOffset *) UA_realloc(offsetBuffer->offsets, sizeof(UA_NetworkMessageOffsetBuffer) * (offsetBuffer->offsetsSize + 1));
             offsetBuffer->offsets[offsetBuffer->offsetsSize].offset = size;
             offsetBuffer->offsets[offsetBuffer->offsetsSize].offset = size;
-            offsetBuffer->offsets[offsetBuffer->offsetsSize++].contentType = UA_PUBSUB_OFFSETTYPE_NETWORKMESSAGENUMBER;
+            offsetBuffer->offsets[offsetBuffer->offsetsSize].offsetData.value.value = UA_DataValue_new();
+            UA_DataValue_init(offsetBuffer->offsets[offsetBuffer->offsetsSize].offsetData.value.value);
+            UA_Variant_setScalar(&offsetBuffer->offsets[offsetBuffer->offsetsSize].offsetData.value.value->value,
+                                 &p->groupHeader.sequenceNumber, &UA_TYPES[UA_TYPES_UINT16]);
+            offsetBuffer->offsets[offsetBuffer->offsetsSize++].contentType = UA_PUBSUB_OFFSETTYPE_NETWORKMESSAGE_SEQUENCENUMBER;
             size += UA_UInt16_calcSizeBinary(&p->groupHeader.networkMessageNumber);
             size += UA_UInt16_calcSizeBinary(&p->groupHeader.networkMessageNumber);
         }
         }
 
 
         if(p->groupHeader.sequenceNumberEnabled){
         if(p->groupHeader.sequenceNumberEnabled){
-            offsetBuffer->offsets = (UA_NetworkMessageOffset *) UA_realloc(offsetBuffer->offsets, sizeof(UA_NetworkMessageOffsetBuffer) * offsetBuffer->offsetsSize+1);
-            offsetBuffer->offsets[offsetBuffer->offsetsSize].offset = size;
-            offsetBuffer->offsets[offsetBuffer->offsetsSize++].contentType = UA_PUBSUB_OFFSETTYPE_SEQUENCENUMBER;
             size += UA_UInt16_calcSizeBinary(&p->groupHeader.sequenceNumber);
             size += UA_UInt16_calcSizeBinary(&p->groupHeader.sequenceNumber);
         }
         }
     }
     }
@@ -209,14 +221,14 @@ UA_NetworkMessage_generateOffsetBuffer(UA_NetworkMessageOffsetBuffer *offsetBuff
     }
     }
 
 
     if(p->timestampEnabled) {
     if(p->timestampEnabled) {
-        offsetBuffer->offsets = (UA_NetworkMessageOffset *) UA_realloc(offsetBuffer->offsets, sizeof(UA_NetworkMessageOffsetBuffer) * offsetBuffer->offsetsSize+1);
+        offsetBuffer->offsets = (UA_NetworkMessageOffset *) UA_realloc(offsetBuffer->offsets, sizeof(UA_NetworkMessageOffsetBuffer) * (offsetBuffer->offsetsSize + 1));
         offsetBuffer->offsets[offsetBuffer->offsetsSize].offset = size;
         offsetBuffer->offsets[offsetBuffer->offsetsSize].offset = size;
         offsetBuffer->offsets[offsetBuffer->offsetsSize++].contentType = UA_PUBSUB_OFFSETTYPE_TIMESTAMP;
         offsetBuffer->offsets[offsetBuffer->offsetsSize++].contentType = UA_PUBSUB_OFFSETTYPE_TIMESTAMP;
         size += UA_DateTime_calcSizeBinary(&p->timestamp);
         size += UA_DateTime_calcSizeBinary(&p->timestamp);
     }
     }
 
 
     if(p->picosecondsEnabled){
     if(p->picosecondsEnabled){
-        offsetBuffer->offsets = (UA_NetworkMessageOffset *) UA_realloc(offsetBuffer->offsets, sizeof(UA_NetworkMessageOffsetBuffer) * offsetBuffer->offsetsSize+1);
+        offsetBuffer->offsets = (UA_NetworkMessageOffset *) UA_realloc(offsetBuffer->offsets, sizeof(UA_NetworkMessageOffsetBuffer) * (offsetBuffer->offsetsSize + 1));
         offsetBuffer->offsets[offsetBuffer->offsetsSize].offset = size;
         offsetBuffer->offsets[offsetBuffer->offsetsSize].offset = size;
         offsetBuffer->offsets[offsetBuffer->offsetsSize++].contentType = UA_PUBSUB_OFFSETTYPE_TIMESTAMP_PICOSECONDS;
         offsetBuffer->offsets[offsetBuffer->offsetsSize++].contentType = UA_PUBSUB_OFFSETTYPE_TIMESTAMP_PICOSECONDS;
         size += UA_UInt16_calcSizeBinary(&p->picoseconds);
         size += UA_UInt16_calcSizeBinary(&p->picoseconds);
@@ -271,16 +283,13 @@ UA_NetworkMessage_updateBufferedMessage(UA_NetworkMessageOffsetBuffer *buffer){
         const UA_Byte *bufEnd = &buffer->buffer.data[buffer->buffer.length];
         const UA_Byte *bufEnd = &buffer->buffer.data[buffer->buffer.length];
         UA_Byte *bufPos = &buffer->buffer.data[buffer->offsets[i].offset];
         UA_Byte *bufPos = &buffer->buffer.data[buffer->offsets[i].offset];
         switch (buffer->offsets[i].contentType) {
         switch (buffer->offsets[i].contentType) {
-            case UA_PUBSUB_OFFSETTYPE_SEQUENCENUMBER:
-
+            case UA_PUBSUB_OFFSETTYPE_DATASETMESSAGE_SEQUENCENUMBER:
+                rv = UA_UInt16_encodeBinary((UA_UInt16 *) buffer->offsets[i].offsetData.value.value->value.data, &bufPos, bufEnd);
                 break;
                 break;
-            case UA_PUBSUB_OFFSETTYPE_NETWORKMESSAGENUMBER:
-
+            case UA_PUBSUB_OFFSETTYPE_NETWORKMESSAGE_SEQUENCENUMBER:
+                rv = UA_UInt16_encodeBinary((UA_UInt16 *) buffer->offsets[i].offsetData.value.value->value.data, &bufPos, bufEnd);
                 break;
                 break;
             case UA_PUBSUB_OFFSETTYPE_PAYLOAD_DATAVALUE:
             case UA_PUBSUB_OFFSETTYPE_PAYLOAD_DATAVALUE:
-                //memcpy(&buffer->buffer.data[buffer->offsets[i].offset],
-                //       buffer->offsets[i].offsetData.value.value->value.data,
-                //       buffer->offsets[i].offsetData.value.valueBinarySize);
                 rv = UA_DataValue_encodeBinary(buffer->offsets[i].offsetData.value.value, &bufPos, bufEnd);
                 rv = UA_DataValue_encodeBinary(buffer->offsets[i].offsetData.value.value, &bufPos, bufEnd);
                 break;
                 break;
             case UA_PUBSUB_OFFSETTYPE_PAYLOAD_VARIANT:
             case UA_PUBSUB_OFFSETTYPE_PAYLOAD_VARIANT:

+ 4 - 4
src/pubsub/ua_pubsub_networkmessage.h

@@ -67,8 +67,8 @@ UA_DataSetMessageHeader_calcSizeBinary(const UA_DataSetMessageHeader* p);
 /* Offsets for buffered messages in the PubSub fast path.
 /* Offsets for buffered messages in the PubSub fast path.
  * TODO: Implement the generation of the offsets */
  * TODO: Implement the generation of the offsets */
 typedef enum {
 typedef enum {
-    UA_PUBSUB_OFFSETTYPE_SEQUENCENUMBER,
-    UA_PUBSUB_OFFSETTYPE_NETWORKMESSAGENUMBER,
+    UA_PUBSUB_OFFSETTYPE_DATASETMESSAGE_SEQUENCENUMBER,
+    UA_PUBSUB_OFFSETTYPE_NETWORKMESSAGE_SEQUENCENUMBER,
     UA_PUBSUB_OFFSETTYPE_TIMESTAMP_PICOSECONDS,
     UA_PUBSUB_OFFSETTYPE_TIMESTAMP_PICOSECONDS,
     UA_PUBSUB_OFFSETTYPE_TIMESTAMP,     /* source pointer */
     UA_PUBSUB_OFFSETTYPE_TIMESTAMP,     /* source pointer */
     UA_PUBSUB_OFFSETTYPE_TIMESTAMP_NOW, /* no source */
     UA_PUBSUB_OFFSETTYPE_TIMESTAMP_NOW, /* no source */
@@ -236,11 +236,11 @@ typedef struct {
 
 
 size_t
 size_t
 UA_NetworkMessage_generateOffsetBuffer(UA_NetworkMessageOffsetBuffer *offsetBuffer,
 UA_NetworkMessage_generateOffsetBuffer(UA_NetworkMessageOffsetBuffer *offsetBuffer,
-                                       const UA_NetworkMessage* p);
+                                       UA_NetworkMessage* p);
 
 
 size_t
 size_t
 UA_DataSetMessage_generateOffsetBuffer(UA_NetworkMessageOffsetBuffer *offsetBuffer,
 UA_DataSetMessage_generateOffsetBuffer(UA_NetworkMessageOffsetBuffer *offsetBuffer,
-                                       const UA_DataSetMessage* p, size_t currentOffset);
+                                       UA_DataSetMessage* p, size_t currentOffset);
 UA_StatusCode
 UA_StatusCode
 UA_NetworkMessage_updateBufferedMessage(UA_NetworkMessageOffsetBuffer *buffer);
 UA_NetworkMessage_updateBufferedMessage(UA_NetworkMessageOffsetBuffer *buffer);
 
 

+ 24 - 7
src/pubsub/ua_pubsub_writer.c

@@ -13,7 +13,6 @@
 #ifdef UA_ENABLE_PUBSUB /* conditional compilation */
 #ifdef UA_ENABLE_PUBSUB /* conditional compilation */
 
 
 #include "ua_pubsub.h"
 #include "ua_pubsub.h"
-#include "ua_pubsub_networkmessage.h"
 
 
 #ifdef UA_ENABLE_PUBSUB_INFORMATIONMODEL
 #ifdef UA_ENABLE_PUBSUB_INFORMATIONMODEL
 #include "ua_pubsub_ns0.h"
 #include "ua_pubsub_ns0.h"
@@ -173,6 +172,7 @@ UA_Server_addWriterGroup(UA_Server *server, const UA_NodeId connection,
         return UA_STATUSCODE_BADOUTOFMEMORY;
         return UA_STATUSCODE_BADOUTOFMEMORY;
 
 
     newWriterGroup->linkedConnection = currentConnectionContext->identifier;
     newWriterGroup->linkedConnection = currentConnectionContext->identifier;
+    newWriterGroup->linkedConnectionPtr = currentConnectionContext;
     UA_PubSubManager_generateUniqueNodeId(server, &newWriterGroup->identifier);
     UA_PubSubManager_generateUniqueNodeId(server, &newWriterGroup->identifier);
     if(writerGroupIdentifier){
     if(writerGroupIdentifier){
         UA_NodeId_copy(&newWriterGroup->identifier, writerGroupIdentifier);
         UA_NodeId_copy(&newWriterGroup->identifier, writerGroupIdentifier);
@@ -330,6 +330,10 @@ UA_Server_freezeWriterGroupConfiguration(UA_Server *server, const UA_NodeId writ
         const UA_Byte *bufEnd = &wg->bufferedMessage.buffer.data[wg->bufferedMessage.buffer.length];
         const UA_Byte *bufEnd = &wg->bufferedMessage.buffer.data[wg->bufferedMessage.buffer.length];
         UA_Byte *bufPos = wg->bufferedMessage.buffer.data;
         UA_Byte *bufPos = wg->bufferedMessage.buffer.data;
         UA_NetworkMessage_encodeBinary(&networkMessage, &bufPos, bufEnd);
         UA_NetworkMessage_encodeBinary(&networkMessage, &bufPos, bufEnd);
+        /* Clean up DSM */
+        for(size_t i = 0; i < dsmCount; i++)
+            UA_free(dsmStore[i].data.keyFrameData.dataSetFields);
+            //UA_DataSetMessage_free(&dsmStore[i]);
     }
     }
     return UA_STATUSCODE_GOOD;
     return UA_STATUSCODE_GOOD;
 }
 }
@@ -1015,6 +1019,15 @@ UA_WriterGroup_clear(UA_Server *server, UA_WriterGroup *writerGroup) {
     LIST_FOREACH_SAFE(dataSetWriter, &writerGroup->writers, listEntry, tmpDataSetWriter){
     LIST_FOREACH_SAFE(dataSetWriter, &writerGroup->writers, listEntry, tmpDataSetWriter){
         UA_Server_removeDataSetWriter(server, dataSetWriter->identifier);
         UA_Server_removeDataSetWriter(server, dataSetWriter->identifier);
     }
     }
+    if(writerGroup->bufferedMessage.offsetsSize > 0){
+        for (size_t i = 0; i < writerGroup->bufferedMessage.offsetsSize; i++) {
+            if(writerGroup->bufferedMessage.offsets[i].contentType == UA_PUBSUB_OFFSETTYPE_PAYLOAD_VARIANT){
+                UA_DataValue_delete(writerGroup->bufferedMessage.offsets[i].offsetData.value.value);
+            }
+        }
+        UA_ByteString_deleteMembers(&writerGroup->bufferedMessage.buffer);
+        UA_free(writerGroup->bufferedMessage.offsets);
+    }
     UA_NodeId_clear(&writerGroup->linkedConnection);
     UA_NodeId_clear(&writerGroup->linkedConnection);
     UA_NodeId_clear(&writerGroup->identifier);
     UA_NodeId_clear(&writerGroup->identifier);
 }
 }
@@ -1787,6 +1800,8 @@ generateNetworkMessage(UA_PubSubConnection *connection, UA_WriterGroup *wg,
         networkMessage->publisherIdType = UA_PUBLISHERDATATYPE_STRING;
         networkMessage->publisherIdType = UA_PUBLISHERDATATYPE_STRING;
         networkMessage->publisherId.publisherIdString = connection->config->publisherId.string;
         networkMessage->publisherId.publisherIdString = connection->config->publisherId.string;
     }
     }
+    if(networkMessage->groupHeader.sequenceNumberEnabled)
+        networkMessage->groupHeader.sequenceNumber = wg->sequenceNumber;
     /* Compute the length of the dsm separately for the header */
     /* Compute the length of the dsm separately for the header */
     UA_STACKARRAY(UA_UInt16, dsmLengths, dsmCount);
     UA_STACKARRAY(UA_UInt16, dsmLengths, dsmCount);
     for(UA_Byte i = 0; i < dsmCount; i++)
     for(UA_Byte i = 0; i < dsmCount; i++)
@@ -1795,6 +1810,7 @@ generateNetworkMessage(UA_PubSubConnection *connection, UA_WriterGroup *wg,
     networkMessage->payloadHeader.dataSetPayloadHeader.count = dsmCount;
     networkMessage->payloadHeader.dataSetPayloadHeader.count = dsmCount;
     networkMessage->payloadHeader.dataSetPayloadHeader.dataSetWriterIds = writerIds;
     networkMessage->payloadHeader.dataSetPayloadHeader.dataSetWriterIds = writerIds;
     networkMessage->groupHeader.writerGroupId = wg->config.writerGroupId;
     networkMessage->groupHeader.writerGroupId = wg->config.writerGroupId;
+    /* number of the NetworkMessage inside a PublishingInterval */
     networkMessage->groupHeader.networkMessageNumber = 1;
     networkMessage->groupHeader.networkMessageNumber = 1;
     networkMessage->payload.dataSetPayload.sizes = dsmLengths;
     networkMessage->payload.dataSetPayload.sizes = dsmLengths;
     networkMessage->payload.dataSetPayload.dataSetMessages = dsm;
     networkMessage->payload.dataSetPayload.dataSetMessages = dsm;
@@ -1802,10 +1818,10 @@ generateNetworkMessage(UA_PubSubConnection *connection, UA_WriterGroup *wg,
 }
 }
 
 
 static UA_StatusCode
 static UA_StatusCode
-sendBufferedNetworkMessage(UA_PubSubConnection *connection,
+sendBufferedNetworkMessage(UA_Server *server, UA_PubSubConnection *connection,
         UA_NetworkMessageOffsetBuffer *buffer, UA_ExtensionObject *transportSettings) {
         UA_NetworkMessageOffsetBuffer *buffer, UA_ExtensionObject *transportSettings) {
     if(UA_NetworkMessage_updateBufferedMessage(buffer) != UA_STATUSCODE_GOOD)
     if(UA_NetworkMessage_updateBufferedMessage(buffer) != UA_STATUSCODE_GOOD)
-        UA_LOG_DEBUG(UA_Log_Stdout, UA_LOGCATEGORY_SERVER, "PubSub sending. Unknown field type.");
+        UA_LOG_DEBUG(&server->config.logger, UA_LOGCATEGORY_SERVER, "PubSub sending. Unknown field type.");
     UA_StatusCode retval = connection->channel->send(connection->channel, transportSettings, &buffer->buffer);
     UA_StatusCode retval = connection->channel->send(connection->channel, transportSettings, &buffer->buffer);
     return retval;
     return retval;
 }
 }
@@ -1878,17 +1894,16 @@ UA_WriterGroup_publishCallback(UA_Server *server, UA_WriterGroup *writerGroup) {
     }
     }
 
 
     /* Find the connection associated with the writer */
     /* Find the connection associated with the writer */
-    UA_PubSubConnection *connection =
-        UA_PubSubConnection_findConnectionbyId(server, writerGroup->linkedConnection);
+    UA_PubSubConnection *connection = writerGroup->linkedConnectionPtr;
     if(!connection) {
     if(!connection) {
         UA_LOG_WARNING(&server->config.logger, UA_LOGCATEGORY_SERVER,
         UA_LOG_WARNING(&server->config.logger, UA_LOGCATEGORY_SERVER,
                        "Publish failed. PubSubConnection invalid.");
                        "Publish failed. PubSubConnection invalid.");
         return;
         return;
     }
     }
 
 
-    //TODO Avoid connection lookup while publish! Attention simple pointer vs. realloc in pubsub manager
     if(writerGroup->config.rtLevel == UA_PUBSUB_RT_FIXED_SIZE) {
     if(writerGroup->config.rtLevel == UA_PUBSUB_RT_FIXED_SIZE) {
-        sendBufferedNetworkMessage(connection, &writerGroup->bufferedMessage, &writerGroup->config.transportSettings);
+        sendBufferedNetworkMessage(server, connection, &writerGroup->bufferedMessage, &writerGroup->config.transportSettings);
+        writerGroup->sequenceNumber++;
         return;
         return;
     }
     }
 
 
@@ -1963,9 +1978,11 @@ UA_WriterGroup_publishCallback(UA_Server *server, UA_WriterGroup *writerGroup) {
                                       &dsWriterIds[i * maxDSM], nmDsmCount,
                                       &dsWriterIds[i * maxDSM], nmDsmCount,
                                       &writerGroup->config.messageSettings,
                                       &writerGroup->config.messageSettings,
                                       &writerGroup->config.transportSettings);
                                       &writerGroup->config.transportSettings);
+            writerGroup->sequenceNumber++;
         }else if(writerGroup->config.encodingMimeType == UA_PUBSUB_ENCODING_JSON){
         }else if(writerGroup->config.encodingMimeType == UA_PUBSUB_ENCODING_JSON){
             res3 = sendNetworkMessageJson(connection, &dsmStore[i * maxDSM],
             res3 = sendNetworkMessageJson(connection, &dsmStore[i * maxDSM],
                     &dsWriterIds[i * maxDSM], nmDsmCount, &writerGroup->config.transportSettings);
                     &dsWriterIds[i * maxDSM], nmDsmCount, &writerGroup->config.transportSettings);
+            writerGroup->sequenceNumber++;
         }
         }
         if(res3 != UA_STATUSCODE_GOOD)
         if(res3 != UA_STATUSCODE_GOOD)
             UA_LOG_WARNING(&server->config.logger, UA_LOGCATEGORY_SERVER,
             UA_LOG_WARNING(&server->config.logger, UA_LOGCATEGORY_SERVER,