Browse Source

feat(pubsub) extended encoding layer with buffered message handling

Andreas Ebner 4 years ago
parent
commit
e5fbae7609

+ 2 - 2
CMakeLists.txt

@@ -205,7 +205,7 @@ endif()
 option(UA_ENABLE_EXPERIMENTAL_HISTORIZING "Enable client experimental historical access features" OFF)
 mark_as_advanced(UA_ENABLE_EXPERIMENTAL_HISTORIZING)
 
-option(UA_ENABLE_PUBSUB "Enable publish/subscribe" OFF)
+option(UA_ENABLE_PUBSUB "Enable publish/subscribe" ON)
 mark_as_advanced(UA_ENABLE_PUBSUB)
 
 option(UA_ENABLE_PUBSUB_ETH_UADP "Enable publish/subscribe UADP over Ethernet" OFF)
@@ -333,7 +333,7 @@ if(CMAKE_VERSION VERSION_GREATER 3.6)
 endif()
 
 # Build Targets
-option(UA_BUILD_EXAMPLES "Build example servers and clients" OFF)
+option(UA_BUILD_EXAMPLES "Build example servers and clients" ON)
 option(UA_BUILD_TOOLS "Build OPC UA shell tools" OFF)
 option(UA_BUILD_UNIT_TESTS "Build the unit tests" OFF)
 option(UA_BUILD_FUZZING "Build the fuzzing executables" OFF)

+ 4 - 0
examples/pubsub/pubsub_subscribe_standalone.c

@@ -85,6 +85,10 @@ subscriberListen(UA_PubSubChannel *psc) {
                 UA_Byte value = *(UA_Byte *)dsm->data.keyFrameData.dataSetFields[i].value.data;
                 UA_LOG_INFO(UA_Log_Stdout, UA_LOGCATEGORY_USERLAND,
                             "Message content: [Byte] \tReceived data: %i", value);
+            } else if (currentType == &UA_TYPES[UA_TYPES_UINT32]) {
+                UA_UInt32 value = *(UA_UInt32 *)dsm->data.keyFrameData.dataSetFields[i].value.data;
+                UA_LOG_INFO(UA_Log_Stdout, UA_LOGCATEGORY_USERLAND,
+                            "Message content: [UInt32] \tReceived data: %u", value);
             } else if (currentType == &UA_TYPES[UA_TYPES_DATETIME]) {
                 UA_DateTime value = *(UA_DateTime *)dsm->data.keyFrameData.dataSetFields[i].value.data;
                 UA_DateTimeStruct receivedTime = UA_DateTime_toStruct(value);

+ 24 - 2
examples/pubsub/server_pubsub_publisher_rt_level.c

@@ -48,6 +48,14 @@ addMinimalPubSubConfiguration(UA_Server * server){
     UA_Server_addPublishedDataSet(server, &publishedDataSetConfig, &publishedDataSetIdent);
 }
 
+static void
+valueUpdateCallback(UA_Server *server, void *data) {
+    UA_DataValue dataValue = *((UA_DataValue *) data);
+    UA_UInt32 *integerValue = (UA_UInt32 *) dataValue.value.data;
+    *integerValue += 1;
+    UA_Variant_setScalar(&dataValue.value, integerValue, &UA_TYPES[UA_TYPES_UINT32]);
+}
+
 int main(void) {
     signal(SIGINT, stopHandler);
     signal(SIGTERM, stopHandler);
@@ -76,7 +84,18 @@ int main(void) {
     writerGroupConfig.enabled = UA_FALSE;
     writerGroupConfig.writerGroupId = 100;
     writerGroupConfig.encodingMimeType = UA_PUBSUB_ENCODING_UADP;
+    writerGroupConfig.messageSettings.encoding             = UA_EXTENSIONOBJECT_DECODED;
+    writerGroupConfig.messageSettings.content.decoded.type = &UA_TYPES[UA_TYPES_UADPWRITERGROUPMESSAGEDATATYPE];
     /* RT Level 0 setup */
+    UA_UadpWriterGroupMessageDataType *writerGroupMessage  = UA_UadpWriterGroupMessageDataType_new();
+    /* Change message settings of writerGroup to send PublisherId,
+     * WriterGroupId in GroupHeader and DataSetWriterId in PayloadHeader
+     * of NetworkMessage */
+    writerGroupMessage->networkMessageContentMask = (UA_UadpNetworkMessageContentMask) ((UA_UadpNetworkMessageContentMask) UA_UADPNETWORKMESSAGECONTENTMASK_PUBLISHERID |
+                                                    (UA_UadpNetworkMessageContentMask) UA_UADPNETWORKMESSAGECONTENTMASK_GROUPHEADER |
+                                                    (UA_UadpNetworkMessageContentMask) UA_UADPNETWORKMESSAGECONTENTMASK_WRITERGROUPID |
+                                                    (UA_UadpNetworkMessageContentMask) UA_UADPNETWORKMESSAGECONTENTMASK_PAYLOADHEADER);
+    writerGroupConfig.messageSettings.content.decoded.data = writerGroupMessage;
     writerGroupConfig.rtLevel = UA_PUBSUB_RT_FIXED_SIZE;
     UA_Server_addWriterGroup(server, connectionIdentifier, &writerGroupConfig, &writerGroupIdent);
     /* Add one DataSetWriter */
@@ -116,8 +135,11 @@ int main(void) {
         return EXIT_FAILURE;
 
     /* Unfreeze the PubSub configuration (and stop implicitly the publish callback) */
-    UA_Server_setWriterGroupDisabled(server, writerGroupIdent);
-    UA_Server_unfreezeWriterGroupConfiguration(server, writerGroupIdent);
+    //UA_Server_setWriterGroupDisabled(server, writerGroupIdent);
+    //UA_Server_unfreezeWriterGroupConfiguration(server, writerGroupIdent);
+
+    UA_UInt64 callbackId;
+    UA_Server_addRepeatedCallback(server, valueUpdateCallback, &staticValueSource, 1000, &callbackId);
 
     UA_StatusCode retval = UA_STATUSCODE_GOOD;
     retval |= UA_Server_run(server, &running);

+ 112 - 274
src/pubsub/ua_pubsub_networkmessage.c

@@ -51,10 +51,87 @@ static UA_Boolean UA_NetworkMessage_ExtendedFlags1Enabled(const UA_NetworkMessag
 static UA_Boolean UA_NetworkMessage_ExtendedFlags2Enabled(const UA_NetworkMessage* src);
 static UA_Boolean UA_DataSetMessageHeader_DataSetFlags2Enabled(const UA_DataSetMessageHeader* src);
 
-UA_StatusCode
+size_t
+UA_DataSetMessage_generateOffsetBuffer(UA_NetworkMessageOffsetBuffer *offsetBuffer,
+                                       const UA_DataSetMessage* p, size_t currentOffset) {
+    size_t size = currentOffset;
+    UA_Byte byte;
+    size += UA_Byte_calcSizeBinary(&byte); // DataSetMessage Type + Flags
+    if(UA_DataSetMessageHeader_DataSetFlags2Enabled(&p->header))
+        size += UA_Byte_calcSizeBinary(&byte);
+
+    if(p->header.dataSetMessageSequenceNrEnabled)
+        size += UA_UInt16_calcSizeBinary(&p->header.dataSetMessageSequenceNr);
+
+    if(p->header.timestampEnabled)
+        size += UA_DateTime_calcSizeBinary(&p->header.timestamp); /* UtcTime */
+
+    if(p->header.picoSecondsIncluded)
+        size += UA_UInt16_calcSizeBinary(&p->header.picoSeconds);
+
+    if(p->header.statusEnabled)
+        size += UA_UInt16_calcSizeBinary(&p->header.status);
+
+    if(p->header.configVersionMajorVersionEnabled)
+        size += UA_UInt32_calcSizeBinary(&p->header.configVersionMajorVersion);
+
+    if(p->header.configVersionMinorVersionEnabled)
+        size += UA_UInt32_calcSizeBinary(&p->header.configVersionMinorVersion);
+
+    if(p->header.dataSetMessageType == UA_DATASETMESSAGE_DATAKEYFRAME) {
+        if(p->header.fieldEncoding != UA_FIELDENCODING_RAWDATA){
+            //TODO calirfy RT and Rawdata behavior
+            size += UA_calcSizeBinary(&p->data.keyFrameData.fieldCount, &UA_TYPES[UA_TYPES_UINT16]);
+        }
+
+        if(p->header.fieldEncoding == UA_FIELDENCODING_VARIANT) {
+            for (UA_UInt16 i = 0; i < p->data.keyFrameData.fieldCount; i++){
+                offsetBuffer->offsets = (UA_NetworkMessageOffset *) UA_realloc(offsetBuffer->offsets, sizeof(UA_NetworkMessageOffsetBuffer) * offsetBuffer->offsetsSize + 1);
+                offsetBuffer->offsets[offsetBuffer->offsetsSize].offset = size;
+                offsetBuffer->offsets[offsetBuffer->offsetsSize].contentType = UA_PUBSUB_OFFSETTYPE_PAYLOAD_VARIANT;
+                //TODO check value source and alloc!
+                offsetBuffer->offsets[offsetBuffer->offsetsSize].offsetData.value.value = UA_DataValue_new();
+                offsetBuffer->offsets[offsetBuffer->offsetsSize++].offsetData.value.value->value = p->data.keyFrameData.dataSetFields->value;
+                size += UA_calcSizeBinary(&p->data.keyFrameData.dataSetFields[i].value, &UA_TYPES[UA_TYPES_VARIANT]);
+            }
+        } else if(p->header.fieldEncoding == UA_FIELDENCODING_RAWDATA) {
+            // not implemented
+        } else if(p->header.fieldEncoding == UA_FIELDENCODING_DATAVALUE) {
+            for (UA_UInt16 i = 0; i < p->data.keyFrameData.fieldCount; i++) {
+                offsetBuffer->offsets = (UA_NetworkMessageOffset *) UA_realloc(offsetBuffer->offsets, sizeof(UA_NetworkMessageOffsetBuffer) * offsetBuffer->offsetsSize + 1);
+                offsetBuffer->offsets[offsetBuffer->offsetsSize].offset = size;
+                offsetBuffer->offsets[offsetBuffer->offsetsSize].contentType = UA_PUBSUB_OFFSETTYPE_PAYLOAD_DATAVALUE;
+                //TODO check value source
+                offsetBuffer->offsets[offsetBuffer->offsetsSize++].offsetData.value.value = p->data.keyFrameData.dataSetFields;
+                size += UA_calcSizeBinary(&p->data.keyFrameData.dataSetFields[i], &UA_TYPES[UA_TYPES_DATAVALUE]);
+            }
+        }
+    } else if(p->header.dataSetMessageType == UA_DATASETMESSAGE_DATADELTAFRAME) {
+        //TODO clarify how to handle DATADELTAFRAME messages with RT
+        if(p->header.fieldEncoding != UA_FIELDENCODING_RAWDATA)
+            size += UA_calcSizeBinary(&p->data.deltaFrameData.fieldCount, &UA_TYPES[UA_TYPES_UINT16]);
+
+        if(p->header.fieldEncoding == UA_FIELDENCODING_VARIANT) {
+            for (UA_UInt16 i = 0; i < p->data.deltaFrameData.fieldCount; i++) {
+                size += UA_calcSizeBinary(&p->data.deltaFrameData.deltaFrameFields[i].fieldIndex, &UA_TYPES[UA_TYPES_UINT16]);
+                size += UA_calcSizeBinary(&p->data.deltaFrameData.deltaFrameFields[i].fieldValue.value, &UA_TYPES[UA_TYPES_VARIANT]);
+            }
+        } else if(p->header.fieldEncoding == UA_FIELDENCODING_RAWDATA) {
+            // not implemented
+        } else if(p->header.fieldEncoding == UA_FIELDENCODING_DATAVALUE) {
+            for (UA_UInt16 i = 0; i < p->data.deltaFrameData.fieldCount; i++) {
+                size += UA_calcSizeBinary(&p->data.deltaFrameData.deltaFrameFields[i].fieldIndex, &UA_TYPES[UA_TYPES_UINT16]);
+                size += UA_calcSizeBinary(&p->data.deltaFrameData.deltaFrameFields[i].fieldValue, &UA_TYPES[UA_TYPES_DATAVALUE]);
+            }
+        }
+    }
+    /* KeepAlive-Message contains no Payload Data */
+    return size;
+}
+
+size_t
 UA_NetworkMessage_generateOffsetBuffer(UA_NetworkMessageOffsetBuffer *offsetBuffer,
         const UA_NetworkMessage* p) {
-    size_t offsetArrayIndex = 0;
     size_t retval = 0;
     UA_Byte byte;
     size_t size = UA_Byte_calcSizeBinary(&byte); // UADPVersion + UADPFlags
@@ -102,14 +179,16 @@ UA_NetworkMessage_generateOffsetBuffer(UA_NetworkMessageOffsetBuffer *offsetBuff
             size += UA_UInt32_calcSizeBinary(&p->groupHeader.groupVersion);
 
         if(p->groupHeader.networkMessageNumberEnabled) {
-            offsetBuffer->offsets[offsetArrayIndex++].offset = size;
-            offsetBuffer->offsets[offsetArrayIndex++].contentType = UA_PUBSUB_OFFSETTYPE_NETWORKMESSAGENUMBER;
+            offsetBuffer->offsets = (UA_NetworkMessageOffset *) UA_realloc(offsetBuffer->offsets, sizeof(UA_NetworkMessageOffsetBuffer) * offsetBuffer->offsetsSize+1);
+            offsetBuffer->offsets[offsetBuffer->offsetsSize].offset = size;
+            offsetBuffer->offsets[offsetBuffer->offsetsSize++].contentType = UA_PUBSUB_OFFSETTYPE_NETWORKMESSAGENUMBER;
             size += UA_UInt16_calcSizeBinary(&p->groupHeader.networkMessageNumber);
         }
 
         if(p->groupHeader.sequenceNumberEnabled){
-            offsetBuffer->offsets[offsetArrayIndex++].offset = size;
-            offsetBuffer->offsets[offsetArrayIndex++].contentType = UA_PUBSUB_OFFSETTYPE_SEQUENCENUMBER;
+            offsetBuffer->offsets = (UA_NetworkMessageOffset *) UA_realloc(offsetBuffer->offsets, sizeof(UA_NetworkMessageOffsetBuffer) * offsetBuffer->offsetsSize+1);
+            offsetBuffer->offsets[offsetBuffer->offsetsSize].offset = size;
+            offsetBuffer->offsets[offsetBuffer->offsetsSize++].contentType = UA_PUBSUB_OFFSETTYPE_SEQUENCENUMBER;
             size += UA_UInt16_calcSizeBinary(&p->groupHeader.sequenceNumber);
         }
     }
@@ -130,14 +209,16 @@ UA_NetworkMessage_generateOffsetBuffer(UA_NetworkMessageOffsetBuffer *offsetBuff
     }
 
     if(p->timestampEnabled) {
-        offsetBuffer->offsets[offsetArrayIndex++].offset = size;
-        offsetBuffer->offsets[offsetArrayIndex++].contentType = UA_PUBSUB_OFFSETTYPE_TIMESTAMP;
+        offsetBuffer->offsets = (UA_NetworkMessageOffset *) UA_realloc(offsetBuffer->offsets, sizeof(UA_NetworkMessageOffsetBuffer) * offsetBuffer->offsetsSize+1);
+        offsetBuffer->offsets[offsetBuffer->offsetsSize].offset = size;
+        offsetBuffer->offsets[offsetBuffer->offsetsSize++].contentType = UA_PUBSUB_OFFSETTYPE_TIMESTAMP;
         size += UA_DateTime_calcSizeBinary(&p->timestamp);
     }
 
     if(p->picosecondsEnabled){
-        offsetBuffer->offsets[offsetArrayIndex++].offset = size;
-        offsetBuffer->offsets[offsetArrayIndex++].contentType = UA_PUBSUB_OFFSETTYPE_TIMESTAMP_PICOSECONDS;
+        offsetBuffer->offsets = (UA_NetworkMessageOffset *) UA_realloc(offsetBuffer->offsets, sizeof(UA_NetworkMessageOffsetBuffer) * offsetBuffer->offsetsSize+1);
+        offsetBuffer->offsets[offsetBuffer->offsetsSize].offset = size;
+        offsetBuffer->offsets[offsetBuffer->offsetsSize++].contentType = UA_PUBSUB_OFFSETTYPE_TIMESTAMP_PICOSECONDS;
         size += UA_UInt16_calcSizeBinary(&p->picoseconds);
     }
 
@@ -165,8 +246,10 @@ UA_NetworkMessage_generateOffsetBuffer(UA_NetworkMessageOffsetBuffer *offsetBuff
                 size += UA_UInt16_calcSizeBinary(&(p->payload.dataSetPayload.sizes[0])) * count;
         }
 
-        for (size_t i = 0; i < count; i++)
+        for (size_t i = 0; i < count; i++) {
+            UA_DataSetMessage_generateOffsetBuffer(offsetBuffer, &(p->payload.dataSetPayload.dataSetMessages[i]), size);
             size += UA_DataSetMessage_calcSizeBinary(&(p->payload.dataSetPayload.dataSetMessages[i]));
+        }
     }
 
     if (p->securityEnabled) {
@@ -178,283 +261,38 @@ UA_NetworkMessage_generateOffsetBuffer(UA_NetworkMessageOffsetBuffer *offsetBuff
     }
 
     retval = size;
-    return UA_STATUSCODE_GOOD;
+    return retval;
 }
 
 UA_StatusCode
-UA_NetworkMessage_generateOffsetBuffer2(UA_NetworkMessageOffsetBuffer *offsetBuffer, const UA_NetworkMessage* src, UA_Byte **bufPos,
-                                       const UA_Byte *bufEnd){
-    size_t currentOffsetCounter = 0;
-    size_t offsetArrayIndex = 0;
-    /* UADPVersion + UADP Flags */
-    UA_Byte v = src->version;
-    if(src->publisherIdEnabled)
-        v |= NM_PUBLISHER_ID_ENABLED_MASK;
-
-    if(src->groupHeaderEnabled)
-        v |= NM_GROUP_HEADER_ENABLED_MASK;
+UA_NetworkMessage_updateBufferedMessage(UA_NetworkMessageOffsetBuffer *buffer){
+    UA_StatusCode rv = UA_STATUSCODE_GOOD;
+    for (size_t i = 0; i < buffer->offsetsSize; ++i) {
+        const UA_Byte *bufEnd = &buffer->buffer.data[buffer->buffer.length];
+        UA_Byte *bufPos = &buffer->buffer.data[buffer->offsets[i].offset];
+        switch (buffer->offsets[i].contentType) {
+            case UA_PUBSUB_OFFSETTYPE_SEQUENCENUMBER:
 
-    if(src->payloadHeaderEnabled)
-        v |= NM_PAYLOAD_HEADER_ENABLED_MASK;
-
-    if(UA_NetworkMessage_ExtendedFlags1Enabled(src))
-        v |= NM_EXTENDEDFLAGS1_ENABLED_MASK;
-
-    UA_StatusCode rv = UA_Byte_encodeBinary(&v, bufPos, bufEnd);
-    currentOffsetCounter += UA_Byte_calcSizeBinary(&v);
-    if(rv != UA_STATUSCODE_GOOD)
-        return rv;
-
-    // ExtendedFlags1
-    if(UA_NetworkMessage_ExtendedFlags1Enabled(src)) {
-        v = (UA_Byte)src->publisherIdType;
-
-        if(src->dataSetClassIdEnabled)
-            v |= NM_DATASET_CLASSID_ENABLED_MASK;
-
-        if(src->securityEnabled)
-            v |= NM_SECURITY_ENABLED_MASK;
-
-        if(src->timestampEnabled)
-            v |= NM_TIMESTAMP_ENABLED_MASK;
-
-        if(src->picosecondsEnabled)
-            v |= NM_PICOSECONDS_ENABLED_MASK;
-
-        if(UA_NetworkMessage_ExtendedFlags2Enabled(src))
-            v |= NM_EXTENDEDFLAGS2_ENABLED_MASK;
-
-        rv = UA_Byte_encodeBinary(&v, bufPos, bufEnd);
-        if(UA_NetworkMessage_ExtendedFlags1Enabled(src))
-            currentOffsetCounter += UA_Byte_calcSizeBinary(&v);
-        if(rv != UA_STATUSCODE_GOOD)
-            return rv;
-
-        // ExtendedFlags2
-        if(UA_NetworkMessage_ExtendedFlags2Enabled(src)) {
-            v = (UA_Byte)src->networkMessageType;
-            // shift left 2 bit
-            v = (UA_Byte) (v << NM_SHIFT_LEN);
-
-            if(src->chunkMessage)
-                v |= NM_CHUNK_MESSAGE_MASK;
-
-            if(src->promotedFieldsEnabled)
-                v |= NM_PROMOTEDFIELDS_ENABLED_MASK;
-
-            rv = UA_Byte_encodeBinary(&v, bufPos, bufEnd);
-            if(UA_NetworkMessage_ExtendedFlags2Enabled(src))
-                currentOffsetCounter += UA_Byte_calcSizeBinary(&v);
-            if(rv != UA_STATUSCODE_GOOD)
-                return rv;
-        }
-    }
-
-    // PublisherId
-    if(src->publisherIdEnabled) {
-        switch (src->publisherIdType) {
-            case UA_PUBLISHERDATATYPE_BYTE:
-                currentOffsetCounter += UA_Byte_calcSizeBinary(&(src->publisherId.publisherIdByte));
-                rv = UA_Byte_encodeBinary(&(src->publisherId.publisherIdByte), bufPos, bufEnd);
-                break;
-
-            case UA_PUBLISHERDATATYPE_UINT16:
-                currentOffsetCounter += UA_UInt16_calcSizeBinary(&(src->publisherId.publisherIdUInt16));
-                rv = UA_UInt16_encodeBinary(&(src->publisherId.publisherIdUInt16), bufPos, bufEnd);
                 break;
+            case UA_PUBSUB_OFFSETTYPE_NETWORKMESSAGENUMBER:
 
-            case UA_PUBLISHERDATATYPE_UINT32:
-                currentOffsetCounter += UA_UInt32_calcSizeBinary(&(src->publisherId.publisherIdUInt32));
-                rv = UA_UInt32_encodeBinary(&(src->publisherId.publisherIdUInt32), bufPos, bufEnd);
                 break;
-
-            case UA_PUBLISHERDATATYPE_UINT64:
-                currentOffsetCounter += UA_UInt64_calcSizeBinary(&(src->publisherId.publisherIdUInt64));
-                rv = UA_UInt64_encodeBinary(&(src->publisherId.publisherIdUInt64), bufPos, bufEnd);
+            case UA_PUBSUB_OFFSETTYPE_PAYLOAD_DATAVALUE:
+                //memcpy(&buffer->buffer.data[buffer->offsets[i].offset],
+                //       buffer->offsets[i].offsetData.value.value->value.data,
+                //       buffer->offsets[i].offsetData.value.valueBinarySize);
+                rv = UA_DataValue_encodeBinary(buffer->offsets[i].offsetData.value.value, &bufPos, bufEnd);
                 break;
-
-            case UA_PUBLISHERDATATYPE_STRING:
-                currentOffsetCounter += UA_String_calcSizeBinary(&(src->publisherId.publisherIdString));
-                rv = UA_String_encodeBinary(&(src->publisherId.publisherIdString), bufPos, bufEnd);
+            case UA_PUBSUB_OFFSETTYPE_PAYLOAD_VARIANT:
+                rv = UA_Variant_encodeBinary(&buffer->offsets[i].offsetData.value.value->value, &bufPos, bufEnd);
                 break;
-
             default:
-                rv = UA_STATUSCODE_BADINTERNALERROR;
-                break;
+                return UA_STATUSCODE_BADNOTSUPPORTED;
         }
-
-        if(rv != UA_STATUSCODE_GOOD)
-            return rv;
     }
-
-    // DataSetClassId
-    if(src->dataSetClassIdEnabled) {
-        currentOffsetCounter += UA_Guid_calcSizeBinary(&(src->dataSetClassId));
-        rv = UA_Guid_encodeBinary(&(src->dataSetClassId), bufPos, bufEnd);
-        if(rv != UA_STATUSCODE_GOOD)
-            return rv;
-    }
-
-    // Group Header
-    if(src->groupHeaderEnabled) {
-        v = 0;
-
-        if(src->groupHeader.writerGroupIdEnabled)
-            v |= GROUP_HEADER_WRITER_GROUPID_ENABLED;
-
-        if(src->groupHeader.groupVersionEnabled)
-            v |= GROUP_HEADER_GROUP_VERSION_ENABLED;
-
-        if(src->groupHeader.networkMessageNumberEnabled)
-            v |= GROUP_HEADER_NM_NUMBER_ENABLED;
-
-        if(src->groupHeader.sequenceNumberEnabled)
-            v |= GROUP_HEADER_SEQUENCE_NUMBER_ENABLED;
-
-        currentOffsetCounter += UA_Byte_calcSizeBinary(&v);
-        rv = UA_Byte_encodeBinary(&v, bufPos, bufEnd);
-        if(rv != UA_STATUSCODE_GOOD)
-            return rv;
-
-        if(src->groupHeader.writerGroupIdEnabled) {
-            currentOffsetCounter += UA_UInt16_calcSizeBinary(&(src->groupHeader.writerGroupId));
-            rv = UA_UInt16_encodeBinary(&(src->groupHeader.writerGroupId), bufPos, bufEnd);
-            if(rv != UA_STATUSCODE_GOOD)
-                return rv;
-        }
-
-        if(src->groupHeader.groupVersionEnabled) {
-            currentOffsetCounter += UA_UInt32_calcSizeBinary(&(src->groupHeader.groupVersion));
-            rv = UA_UInt32_encodeBinary(&(src->groupHeader.groupVersion), bufPos, bufEnd);
-            if(rv != UA_STATUSCODE_GOOD)
-                return rv;
-        }
-
-        if(src->groupHeader.networkMessageNumberEnabled) {
-            currentOffsetCounter += UA_UInt16_calcSizeBinary(&(src->groupHeader.networkMessageNumber));
-            offsetBuffer->offsets[offsetArrayIndex++].offset = currentOffsetCounter;
-            offsetBuffer->offsets[offsetArrayIndex++].contentType = UA_PUBSUB_OFFSETTYPE_NETWORKMESSAGENUMBER;
-            rv = UA_UInt16_encodeBinary(&(src->groupHeader.networkMessageNumber), bufPos, bufEnd);
-            if(rv != UA_STATUSCODE_GOOD)
-                return rv;
-        }
-
-        if(src->groupHeader.sequenceNumberEnabled) {
-            currentOffsetCounter += UA_UInt16_calcSizeBinary(&(src->groupHeader.sequenceNumber));
-            offsetBuffer->offsets[offsetArrayIndex++].offset = currentOffsetCounter;
-            offsetBuffer->offsets[offsetArrayIndex++].contentType = UA_PUBSUB_OFFSETTYPE_SEQUENCENUMBER;
-            rv = UA_UInt16_encodeBinary(&(src->groupHeader.sequenceNumber), bufPos, bufEnd);
-            if(rv != UA_STATUSCODE_GOOD)
-                return rv;
-        }
-    }
-
-    // Payload-Header
-    if(src->payloadHeaderEnabled) {
-        if(src->networkMessageType != UA_NETWORKMESSAGE_DATASET)
-            return UA_STATUSCODE_BADNOTIMPLEMENTED;
-
-        currentOffsetCounter += UA_Byte_calcSizeBinary(&(src->payloadHeader.dataSetPayloadHeader.count));
-        rv = UA_Byte_encodeBinary(&(src->payloadHeader.dataSetPayloadHeader.count), bufPos, bufEnd);
-
-        if(src->payloadHeader.dataSetPayloadHeader.dataSetWriterIds == NULL)
-            return UA_STATUSCODE_BADENCODINGERROR;
-
-        for(UA_Byte i = 0; i < src->payloadHeader.dataSetPayloadHeader.count; i++) {
-            currentOffsetCounter += UA_UInt16_calcSizeBinary(&(src->payloadHeader.dataSetPayloadHeader.dataSetWriterIds[i]));
-            rv = UA_UInt16_encodeBinary(&(src->payloadHeader.dataSetPayloadHeader.dataSetWriterIds[i]),
-                                        bufPos, bufEnd);
-            if(rv != UA_STATUSCODE_GOOD)
-                return rv;
-        }
-    }
-
-    // Timestamp
-    if(src->timestampEnabled){
-        currentOffsetCounter += UA_DateTime_calcSizeBinary(&(src->timestamp));
-        offsetBuffer->offsets[offsetArrayIndex++].offset = currentOffsetCounter;
-        offsetBuffer->offsets[offsetArrayIndex++].contentType = UA_PUBSUB_OFFSETTYPE_TIMESTAMP;
-        rv = UA_DateTime_encodeBinary(&(src->timestamp), bufPos, bufEnd);
-    }
-
-    // Picoseconds
-    if(src->picosecondsEnabled){
-        currentOffsetCounter += UA_UInt16_calcSizeBinary(&(src->picoseconds));
-        offsetBuffer->offsets[offsetArrayIndex++].offset = currentOffsetCounter;
-        offsetBuffer->offsets[offsetArrayIndex++].contentType = UA_PUBSUB_OFFSETTYPE_TIMESTAMP_PICOSECONDS;
-        rv = UA_UInt16_encodeBinary(&(src->picoseconds), bufPos, bufEnd);
-    }
-
-    // PromotedFields
-    if(src->promotedFieldsEnabled) {
-        /* Size (calculate & encode) */
-        UA_UInt16 pfSize = 0;
-        for(UA_UInt16 i = 0; i < src->promotedFieldsSize; i++)
-            pfSize = (UA_UInt16) (pfSize + UA_Variant_calcSizeBinary(&src->promotedFields[i]));
-        currentOffsetCounter += UA_UInt16_calcSizeBinary(&pfSize);
-        rv |= UA_UInt16_encodeBinary(&pfSize, bufPos, bufEnd);
-
-        for (UA_UInt16 i = 0; i < src->promotedFieldsSize; i++){
-            currentOffsetCounter += UA_Variant_calcSizeBinary(&(src->promotedFields[i]));
-            //TODO Clarify promoted fields handling with RT / allow this with rt?
-            rv |= UA_Variant_encodeBinary(&(src->promotedFields[i]), bufPos, bufEnd);
-        }
-    }
-
-    // SecurityHeader
-    if(src->securityEnabled) {
-        // SecurityFlags
-        v = 0;
-        if(src->securityHeader.networkMessageSigned)
-            v |= SECURITY_HEADER_NM_SIGNED;
-
-        if(src->securityHeader.networkMessageEncrypted)
-            v |= SECURITY_HEADER_NM_ENCRYPTED;
-
-        if(src->securityHeader.securityFooterEnabled)
-            v |= SECURITY_HEADER_SEC_FOOTER_ENABLED;
-
-        if(src->securityHeader.forceKeyReset)
-            v |= SECURITY_HEADER_FORCE_KEY_RESET;
-
-        currentOffsetCounter += UA_Byte_calcSizeBinary(&v);
-        rv = UA_Byte_encodeBinary(&v, bufPos, bufEnd);
-        if(rv != UA_STATUSCODE_GOOD)
-            return rv;
-
-        // SecurityTokenId
-        currentOffsetCounter += UA_UInt32_calcSizeBinary(&src->securityHeader.securityTokenId);
-        rv = UA_UInt32_encodeBinary(&src->securityHeader.securityTokenId, bufPos, bufEnd);
-        if(rv != UA_STATUSCODE_GOOD)
-            return rv;
-
-        // NonceLength
-        currentOffsetCounter += UA_Byte_calcSizeBinary(&src->securityHeader.nonceLength);
-        rv = UA_Byte_encodeBinary(&src->securityHeader.nonceLength, bufPos, bufEnd);
-        if(rv != UA_STATUSCODE_GOOD)
-            return rv;
-
-        // MessageNonce
-        for (UA_Byte i = 0; i < src->securityHeader.nonceLength; i++) {
-            currentOffsetCounter += UA_Byte_calcSizeBinary(&(src->securityHeader.messageNonce.data[i]));
-            rv = UA_Byte_encodeBinary(&(src->securityHeader.messageNonce.data[i]), bufPos, bufEnd);
-            if(rv != UA_STATUSCODE_GOOD)
-                return rv;
-        }
-
-        // SecurityFooterSize
-        if(src->securityHeader.securityFooterEnabled) {
-            currentOffsetCounter += UA_UInt16_calcSizeBinary(&src->securityHeader.securityFooterSize);
-            rv = UA_UInt16_encodeBinary(&src->securityHeader.securityFooterSize, bufPos, bufEnd);
-            if(rv != UA_STATUSCODE_GOOD)
-                return rv;
-        }
-    }
-
-    return UA_STATUSCODE_BADNOTIMPLEMENTED;
+    return rv;
 }
 
-
 UA_StatusCode
 UA_NetworkMessage_encodeBinary(const UA_NetworkMessage* src, UA_Byte **bufPos,
                                const UA_Byte *bufEnd) {

+ 9 - 5
src/pubsub/ua_pubsub_networkmessage.h

@@ -81,8 +81,11 @@ typedef enum {
 typedef struct {
     UA_NetworkMessageOffsetType contentType;
     union {
+        union {
+            UA_DataValue *value;
+            size_t valueBinarySize;
+        } value;
         UA_DateTime *timestamp;
-        UA_DataValue *value;
     } offsetData;
     size_t offset;
 } UA_NetworkMessageOffset;
@@ -231,14 +234,15 @@ typedef struct {
     UA_ByteString signature;
 } UA_NetworkMessage;
 
-UA_StatusCode
+size_t
 UA_NetworkMessage_generateOffsetBuffer(UA_NetworkMessageOffsetBuffer *offsetBuffer,
                                        const UA_NetworkMessage* p);
 
+size_t
+UA_DataSetMessage_generateOffsetBuffer(UA_NetworkMessageOffsetBuffer *offsetBuffer,
+                                       const UA_DataSetMessage* p, size_t currentOffset);
 UA_StatusCode
-UA_NetworkMessage_generateOffsetBuffer2(UA_NetworkMessageOffsetBuffer *offsetBuffer,
-                                       const UA_NetworkMessage* src, UA_Byte **bufPos,
-                                       const UA_Byte *bufEnd);
+UA_NetworkMessage_updateBufferedMessage(UA_NetworkMessageOffsetBuffer *buffer);
 
 UA_StatusCode
 UA_NetworkMessage_encodeBinary(const UA_NetworkMessage* src,

+ 36 - 5
src/pubsub/ua_pubsub_writer.c

@@ -8,11 +8,13 @@
  */
 
 #include <open62541/server_pubsub.h>
+#include <open62541/plugin/log_stdout.h>
 #include "server/ua_server_internal.h"
 
 #ifdef UA_ENABLE_PUBSUB /* conditional compilation */
 
 #include "ua_pubsub.h"
+#include "ua_pubsub_networkmessage.h"
 
 #ifdef UA_ENABLE_PUBSUB_INFORMATIONMODEL
 #include "ua_pubsub_ns0.h"
@@ -258,17 +260,14 @@ UA_Server_freezeWriterGroupConfiguration(UA_Server *server, const UA_NodeId writ
         }
     }
     if(wg->config.rtLevel == UA_PUBSUB_RT_FIXED_SIZE){
-        //UA_NetworkMessage_decodeBinary(, )
-        //UA_NetworkMessage_calculateBufferAndOffets(server, w)
-
         if(wg->config.encodingMimeType != UA_PUBSUB_ENCODING_UADP) {
             UA_LOG_WARNING(&server->config.logger, UA_LOGCATEGORY_SERVER,
-                           "PubSub RT Fail: Non RT capable encoding.");
+                           "PubSub-RT configuration fail: Non-RT capable encoding.");
             return UA_STATUSCODE_BADNOTSUPPORTED;
         }
         //TODO Clarify: should we only allow = maxEncapsulatedDataSetMessageCount == 1 with RT?
         //TODO Clarify: Behaviour if the finale size is more than MTU
-        //TODO Check if there are not allowed promoted fields contained
+
         //generate data set messages
         UA_STACKARRAY(UA_UInt16, dsWriterIds, wg->writersCount);
         UA_STACKARRAY(UA_DataSetMessage, dsmStore, wg->writersCount);
@@ -282,7 +281,14 @@ UA_Server_freezeWriterGroupConfiguration(UA_Server *server, const UA_NodeId writ
                                "PubSub Publish: PublishedDataSet not found");
                 continue;
             }
+            if(pds->promotedFieldsCount > 0) {
+                UA_LOG_WARNING(&server->config.logger, UA_LOGCATEGORY_SERVER,
+                               "PubSub-RT configuration fail: PDS contains promoted fields.");
+                return UA_STATUSCODE_BADNOTSUPPORTED;
+            }
             //TODO Check if all fields have an fixed size and static value source
+            //loop over the fields and check value source + type
+
             /* Generate the DSM */
             UA_StatusCode res =
                     UA_DataSetWriter_generateDataSetMessage(server, &dsmStore[dsmCount], dsw);
@@ -298,6 +304,16 @@ UA_Server_freezeWriterGroupConfiguration(UA_Server *server, const UA_NodeId writ
         generateNetworkMessage(pubSubConnection, wg, dsmStore, dsWriterIds, (UA_Byte) dsmCount,
                                &wg->config.messageSettings, &wg->config.transportSettings, &networkMessage);
         UA_NetworkMessage_generateOffsetBuffer(&wg->bufferedMessage, &networkMessage);
+        /* Allocate the buffer. Allocate on the stack if the buffer is small. */
+        UA_ByteString buf;
+        size_t msgSize = UA_NetworkMessage_calcSizeBinary(&networkMessage);
+        UA_StatusCode  retval = UA_ByteString_allocBuffer(&buf, msgSize);
+        if(retval != UA_STATUSCODE_GOOD)
+            return UA_STATUSCODE_BADOUTOFMEMORY;
+        wg->bufferedMessage.buffer = buf;
+        const UA_Byte *bufEnd = &wg->bufferedMessage.buffer.data[wg->bufferedMessage.buffer.length];
+        UA_Byte *bufPos = wg->bufferedMessage.buffer.data;
+        UA_NetworkMessage_encodeBinary(&networkMessage, &bufPos, bufEnd);
     }
     return UA_STATUSCODE_GOOD;
 }
@@ -1764,6 +1780,15 @@ generateNetworkMessage(UA_PubSubConnection *connection, UA_WriterGroup *wg,
     return UA_STATUSCODE_GOOD;
 }
 
+static UA_StatusCode
+sendBufferedNetworkMessage(UA_PubSubConnection *connection,
+        UA_NetworkMessageOffsetBuffer *buffer, UA_ExtensionObject *transportSettings) {
+    if(UA_NetworkMessage_updateBufferedMessage(buffer) != UA_STATUSCODE_GOOD)
+        UA_LOG_DEBUG(UA_Log_Stdout, UA_LOGCATEGORY_SERVER, "PubSub sending. Unknown field type.");
+    UA_StatusCode retval = connection->channel->send(connection->channel, transportSettings, &buffer->buffer);
+    return retval;
+}
+
 static UA_StatusCode
 sendNetworkMessage(UA_PubSubConnection *connection, UA_WriterGroup *wg,
                    UA_DataSetMessage *dsm, UA_UInt16 *writerIds, UA_Byte dsmCount,
@@ -1840,6 +1865,12 @@ UA_WriterGroup_publishCallback(UA_Server *server, UA_WriterGroup *writerGroup) {
         return;
     }
 
+    //TODO Avoid connection lookup while publish! Attention simple pointer vs. realloc in pubsub manager
+    if(writerGroup->config.rtLevel == UA_PUBSUB_RT_FIXED_SIZE) {
+        sendBufferedNetworkMessage(connection, &writerGroup->bufferedMessage, &writerGroup->config.transportSettings);
+        return;
+    }
+
     /* How many DSM can be sent in one NM? */
     UA_Byte maxDSM = (UA_Byte)writerGroup->config.maxEncapsulatedDataSetMessageCount;
     if(writerGroup->config.maxEncapsulatedDataSetMessageCount > UA_BYTE_MAX)