Browse Source

Add pubsub publish callback mechanism

Andreas Ebner 6 years ago
parent
commit
79cb8c68b8

+ 3 - 0
doc/CMakeLists.txt

@@ -44,6 +44,7 @@ generate_rst(${PROJECT_SOURCE_DIR}/examples/tutorial_server_datasource.c ${DOC_S
 generate_rst(${PROJECT_SOURCE_DIR}/examples/tutorial_server_object.c ${DOC_SRC_DIR}/tutorial_server_object.rst)
 generate_rst(${PROJECT_SOURCE_DIR}/examples/tutorial_server_method.c ${DOC_SRC_DIR}/tutorial_server_method.rst)
 generate_rst(${PROJECT_SOURCE_DIR}/examples/tutorial_client_firststeps.c ${DOC_SRC_DIR}/tutorial_client_firststeps.rst)
+generate_rst(${PROJECT_SOURCE_DIR}/examples/pubsub/tutorial_pubsub_publish.c ${DOC_SRC_DIR}/tutorial_pubsub_publish.rst)
 
 add_custom_target(doc_latex ${SPHINX_EXECUTABLE}
   -b latex "${DOC_SRC_DIR}" "${DOC_LATEX_DIR}"
@@ -64,6 +65,7 @@ add_custom_target(doc_latex ${SPHINX_EXECUTABLE}
           ${DOC_SRC_DIR}/tutorial_server_datasource.rst
           ${DOC_SRC_DIR}/tutorial_server_object.rst
           ${DOC_SRC_DIR}/tutorial_server_method.rst
+          ${DOC_SRC_DIR}/tutorial_pubsub_publish.rst
           ${DOC_SRC_DIR}/plugin_pubsub_connection.rst
           ${DOC_SRC_DIR}/pubsub.rst
   COMMENT "Building LaTeX sources for documentation with Sphinx")
@@ -96,6 +98,7 @@ add_custom_target(doc ${SPHINX_EXECUTABLE}
           ${DOC_SRC_DIR}/tutorial_server_datasource.rst
           ${DOC_SRC_DIR}/tutorial_server_object.rst
           ${DOC_SRC_DIR}/tutorial_server_method.rst
+          ${DOC_SRC_DIR}/tutorial_pubsub_publish.rst
           ${DOC_SRC_DIR}/plugin_pubsub_connection.rst
           ${DOC_SRC_DIR}/pubsub.rst
   COMMENT "Building HTML documentation with Sphinx")

+ 1 - 0
doc/tutorials.rst

@@ -13,3 +13,4 @@ Tutorials
    tutorial_server_object.rst
    tutorial_server_method.rst
    tutorial_client_firststeps.rst
+   tutorial_pubsub_publish.rst

BIN
doc/ua-wireshark-pubsub.png


+ 109 - 47
examples/pubsub/tutorial_pubsub_publish.c

@@ -1,21 +1,34 @@
 /* This work is licensed under a Creative Commons CCZero 1.0 Universal License.
  * See http://creativecommons.org/publicdomain/zero/1.0/ for more information. */
 
+/**
+ * .. _pubsub-tutorial:
+ *
+ * Working with Publish/Subscribe
+ * ------------------------------
+ *
+ * Work in progress:
+ * This Tutorial will be continuously extended during the next PubSub batches. More details about
+ * the PubSub extension and corresponding open62541 API are located here: :ref:`pubsub`.
+ *
+ * Publishing Fields
+ * ^^^^^^^^^^^^^^^^^
+ * The PubSub publish example demonstrate the simplest way to publish
+ * informations from the information model over UDP multicast using
+ * the UADP encoding.
+ *
+ * **Connection handling**
+ * PubSubConnections can be created and deleted on runtime. More details about the system preconfiguration and
+ * connection can be found in ``tutorial_pubsub_connection.c``.
+ */
 #include <signal.h>
 #include "open62541.h"
+UA_NodeId connectionIdent, publishedDataSetIdent, writerGroupIdent;
 
-/* Work in progress: This Tutorial/Example will be continuously extended during the next PubSub batches */
-UA_NodeId connectionIdentifier;
-
-UA_Boolean running = true;
-static void stopHandler(int sign) {
-    UA_LOG_INFO(UA_Log_Stdout, UA_LOGCATEGORY_SERVER, "received ctrl-c");
-    running = false;
-}
-
-static UA_StatusCode
+static void
 addPubSubConnection(UA_Server *server){
-    /* Details about the connection configuration and handling are located in the pubsub connection tutorial */
+    /* Details about the connection configuration and handling are located
+     * in the pubsub connection tutorial */
     UA_PubSubConnectionConfig connectionConfig;
     memset(&connectionConfig, 0, sizeof(connectionConfig));
     connectionConfig.name = UA_STRING("UDP-UADP Connection 1");
@@ -24,44 +37,33 @@ addPubSubConnection(UA_Server *server){
     UA_NetworkAddressUrlDataType networkAddressUrl = {UA_STRING_NULL , UA_STRING("opc.udp://224.0.0.22:4840/")};
     UA_Variant_setScalar(&connectionConfig.address, &networkAddressUrl, &UA_TYPES[UA_TYPES_NETWORKADDRESSURLDATATYPE]);
     connectionConfig.publisherId.numeric = UA_UInt32_random();
-    UA_StatusCode retval = UA_Server_addPubSubConnection(server, &connectionConfig, &connectionIdentifier);
-    return retval;
+    UA_Server_addPubSubConnection(server, &connectionConfig, &connectionIdent);
 }
 
 /**
- * The PubSub publish example demonstrate the simplest way to publish
- * informations from the information model over UDP Multicast.
+ * **PublishedDataSet handling**
+ * The PublishedDataSet (PDS) and PubSubConnection are the toplevel entities and can exist alone. The PDS contains
+ * the collection of the published fields.
+ * All other PubSub elements are directly or indirectly linked with the PDS or connection.
  */
-int main(void) {
-    signal(SIGINT, stopHandler);
-    signal(SIGTERM, stopHandler);
-
-    UA_StatusCode retval = UA_STATUSCODE_GOOD;
-    UA_ServerConfig *config = UA_ServerConfig_new_default();
-    /* Details about the connection configuration and handling are located in the pubsub connection tutorial */
-    config->pubsubTransportLayers = (UA_PubSubTransportLayer *) UA_malloc(sizeof(UA_PubSubTransportLayer));
-    if(!config->pubsubTransportLayers) {
-        UA_ServerConfig_delete(config);
-        return -1;
-    }
-    config->pubsubTransportLayers[0] = UA_PubSubTransportLayerUDPMP();
-    config->pubsubTransportLayersSize++;
-    UA_Server *server = UA_Server_new(config);
-
-    if(addPubSubConnection(server) != UA_STATUSCODE_GOOD)
-        UA_LOG_ERROR(UA_Log_Stdout, UA_LOGCATEGORY_SERVER, "PubSub Connection creation failed!");
-
-    /* The PublishedDataSetConfig contains all necessary public informations for the creation of a new PublishedDataSet */
+static void
+addPublishedDataSet(UA_Server *server) {
+    /* The PublishedDataSetConfig contains all necessary public
+    * informations for the creation of a new PublishedDataSet */
     UA_PublishedDataSetConfig publishedDataSetConfig;
     memset(&publishedDataSetConfig, 0, sizeof(UA_PublishedDataSetConfig));
     publishedDataSetConfig.publishedDataSetType = UA_PUBSUB_DATASET_PUBLISHEDITEMS;
-    publishedDataSetConfig.name = UA_STRING("Robot Axis");
-
+    publishedDataSetConfig.name = UA_STRING("Demo PDS");
     /* Create new PublishedDataSet based on the PublishedDataSetConfig. */
-    UA_NodeId publishedDataSetIdent;
-    if(UA_Server_addPublishedDataSet(server, &publishedDataSetConfig, &publishedDataSetIdent).addResult == UA_STATUSCODE_GOOD)
-        UA_LOG_INFO(UA_Log_Stdout, UA_LOGCATEGORY_SERVER, "PubSub PublishedDataSet creation successful!");
+    UA_Server_addPublishedDataSet(server, &publishedDataSetConfig, &publishedDataSetIdent);
+}
 
+/**
+ * **DataSetField handling**
+ * The DataSetField (DSF) is part of the PDS and describes exactly one published field.
+ */
+static void
+addDataSetField(UA_Server *server) {
     /* Add a field to the previous created PublishedDataSet */
     UA_NodeId dataSetFieldIdent;
     UA_DataSetFieldConfig dataSetFieldConfig;
@@ -69,24 +71,41 @@ int main(void) {
     dataSetFieldConfig.dataSetFieldType = UA_PUBSUB_DATASETFIELD_VARIABLE;
     dataSetFieldConfig.field.variable.fieldNameAlias = UA_STRING("Server localtime");
     dataSetFieldConfig.field.variable.promotedField = UA_FALSE;
-    dataSetFieldConfig.field.variable.publishParameters.publishedVariable = UA_NODEID_NUMERIC(0, UA_NS0ID_SERVER_LOCALTIME);
+    dataSetFieldConfig.field.variable.publishParameters.publishedVariable =
+            UA_NODEID_NUMERIC(0, UA_NS0ID_SERVER_LOCALTIME);
     dataSetFieldConfig.field.variable.publishParameters.attributeId = UA_ATTRIBUTEID_VALUE;
     UA_Server_addDataSetField(server, publishedDataSetIdent, &dataSetFieldConfig, &dataSetFieldIdent);
+}
 
+/**
+ * **WriterGroup handling**
+ * The WriterGroup (WG) is part of the connection and contains the primary configuration
+ * parameters for the message creation.
+ */
+static void
+addWriterGroup(UA_Server *server) {
     /* Now we create a new WriterGroupConfig and add the group to the existing PubSubConnection. */
-    UA_NodeId writerGroupIdent;
     UA_WriterGroupConfig writerGroupConfig;
     memset(&writerGroupConfig, 0, sizeof(UA_WriterGroupConfig));
     writerGroupConfig.name = UA_STRING("Demo WriterGroup");
     writerGroupConfig.publishingInterval = 100;
     writerGroupConfig.enabled = UA_FALSE;
     writerGroupConfig.encodingMimeType = UA_PUBSUB_ENCODING_UADP;
-    /* The configuration flags for the messages are encapsulated inside the message- and transport settings
-     * extension objects. These extension objects are defined by the standard. e.g. UadpWriterGroupMessageDataType */
-    UA_Server_addWriterGroup(server, connectionIdentifier, &writerGroupConfig, &writerGroupIdent);
+    /* The configuration flags for the messages are encapsulated inside the
+     * message- and transport settings extension objects. These extension objects
+     * are defined by the standard. e.g. UadpWriterGroupMessageDataType */
+    UA_Server_addWriterGroup(server, connectionIdent, &writerGroupConfig, &writerGroupIdent);
+}
 
-    /* We need now a DataSetWriter within the WriterGroup. This means we must create a new DataSetWriterConfig and add
-     * call the addWriterGroup function. */
+/**
+ * **DataSetWriter handling**
+ * A DataSetWriter (DSW) is the glue between the WG and the PDS. The DSW is linked to exactly one
+ * PDS and contains additional informations for the message generation.
+ */
+static void
+addDataSetWriter(UA_Server *server) {
+    /* We need now a DataSetWriter within the WriterGroup. This means we must
+     * create a new DataSetWriterConfig and add call the addWriterGroup function. */
     UA_NodeId dataSetWriterIdent;
     UA_DataSetWriterConfig dataSetWriterConfig;
     memset(&dataSetWriterConfig, 0, sizeof(UA_DataSetWriterConfig));
@@ -95,9 +114,52 @@ int main(void) {
     dataSetWriterConfig.keyFrameCount = 10;
     UA_Server_addDataSetWriter(server, writerGroupIdent, publishedDataSetIdent,
                                &dataSetWriterConfig, &dataSetWriterIdent);
+}
+
+/**
+ * That's it! You're now publishing the selected fields.
+ * Open a packet inspection tool of trust e.g. wireshark and take a look on the outgoing packages.
+ * The following graphic figures out the packages created by this tutorial.
+ *
+ * .. figure:: ua-wireshark-pubsub.png
+ *     :figwidth: 100 %
+ *     :alt: OPC UA PubSub communication in wireshark
+ *
+ * The open62541 subscriber API will be released later. If you want to process the the datagrams,
+ * take a look on the ua_network_pubsub_networkmessage.c which already contains the decoding code for UADP messages.
+ *
+ * It follows the main server code, making use of the above definitions. */
+UA_Boolean running = true;
+static void stopHandler(int sign) {
+    UA_LOG_INFO(UA_Log_Stdout, UA_LOGCATEGORY_SERVER, "received ctrl-c");
+    running = false;
+}
+
+int main(void) {
+    signal(SIGINT, stopHandler);
+    signal(SIGTERM, stopHandler);
+
+    UA_StatusCode retval = UA_STATUSCODE_GOOD;
+    UA_ServerConfig *config = UA_ServerConfig_new_default();
+    /* Details about the connection configuration and handling are located in the pubsub connection tutorial */
+    config->pubsubTransportLayers = (UA_PubSubTransportLayer *) UA_malloc(sizeof(UA_PubSubTransportLayer));
+    if(!config->pubsubTransportLayers) {
+        UA_ServerConfig_delete(config);
+        return -1;
+    }
+    config->pubsubTransportLayers[0] = UA_PubSubTransportLayerUDPMP();
+    config->pubsubTransportLayersSize++;
+    UA_Server *server = UA_Server_new(config);
+
+    addPubSubConnection(server);
+    addPublishedDataSet(server);
+    addDataSetField(server);
+    addWriterGroup(server);
+    addDataSetWriter(server);
 
     retval |= UA_Server_run(server, &running);
     UA_Server_delete(server);
     UA_ServerConfig_delete(config);
     return (int)retval;
 }
+

+ 421 - 0
src/pubsub/ua_pubsub.c

@@ -5,10 +5,12 @@
  * Copyright (c) 2017-2018 Fraunhofer IOSB (Author: Andreas Ebner)
  */
 
+#include "ua_types_encoding_binary.h"
 #include "ua_server_pubsub.h"
 #include "server/ua_server_internal.h"
 #include "ua_pubsub.h"
 #include "ua_pubsub_manager.h"
+#include "ua_pubsub_networkmessage.h"
 
 /**********************************************/
 /*               Connection                   */
@@ -123,6 +125,7 @@ UA_Server_addWriterGroup(UA_Server *server, const UA_NodeId connection,
     //deep copy of the config
     retVal |= UA_WriterGroupConfig_copy(writerGroupConfig, &tmpWriterGroupConfig);
     newWriterGroup->config = tmpWriterGroupConfig;
+    retVal |= UA_WriterGroup_addPublishCallback(server, newWriterGroup);
     LIST_INSERT_HEAD(&currentConnectionContext->writerGroups, newWriterGroup, listEntry);
     return retVal;
 }
@@ -138,6 +141,10 @@ UA_Server_removeWriterGroup(UA_Server *server, const UA_NodeId writerGroup){
     if(!connection)
         return UA_STATUSCODE_BADNOTFOUND;
 
+    //unregister the publish callback
+    if(UA_PubSubManager_removeRepeatedPubSubCallback(server, wg->publishCallbackId) != UA_STATUSCODE_GOOD)
+        return UA_STATUSCODE_BADINTERNALERROR;
+
     UA_WriterGroup_deleteMembers(server, wg);
     UA_free(wg);
     return UA_STATUSCODE_GOOD;
@@ -601,3 +608,417 @@ void UA_DataSetField_deleteMembers(UA_DataSetField *field) {
     UA_DataValue_deleteMembers(&field->lastValue);
     LIST_REMOVE(field, listEntry);
 }
+
+/*********************************************************/
+/*               PublishValues handling                  */
+/*********************************************************/
+
+/**
+ * Compare two variants. Internally used for value change detection.
+ *
+ * @return UA_TRUE if the value has changed
+ */
+static UA_Boolean
+valueChangedVariant(UA_Variant *oldValue, UA_Variant *newValue){
+    if(! (oldValue && newValue))
+        return UA_FALSE;
+
+    UA_ByteString *oldValueEncoding = UA_ByteString_new(), *newValueEncoding = UA_ByteString_new();
+    size_t oldValueEncodingSize, newValueEncodingSize;
+    oldValueEncodingSize = UA_calcSizeBinary(oldValue, &UA_TYPES[UA_TYPES_VARIANT]);
+    newValueEncodingSize = UA_calcSizeBinary(newValue, &UA_TYPES[UA_TYPES_VARIANT]);
+    if((oldValueEncodingSize == 0) || (newValueEncodingSize == 0))
+        return UA_FALSE;
+
+    if(oldValueEncodingSize != newValueEncodingSize)
+        return UA_TRUE;
+
+    if(UA_ByteString_allocBuffer(oldValueEncoding, oldValueEncodingSize) != UA_STATUSCODE_GOOD)
+        return UA_FALSE;
+
+    if(UA_ByteString_allocBuffer(newValueEncoding, newValueEncodingSize) != UA_STATUSCODE_GOOD)
+        return UA_FALSE;
+
+    UA_Byte *bufPosOldValue = oldValueEncoding->data;
+    const UA_Byte *bufEndOldValue = &oldValueEncoding->data[oldValueEncoding->length];
+    UA_Byte *bufPosNewValue = newValueEncoding->data;
+    const UA_Byte *bufEndNewValue = &newValueEncoding->data[newValueEncoding->length];
+    if(UA_encodeBinary(oldValue, &UA_TYPES[UA_TYPES_VARIANT],
+                       &bufPosOldValue, &bufEndOldValue, NULL, NULL) != UA_STATUSCODE_GOOD){
+        return UA_FALSE;
+    }
+    if(UA_encodeBinary(newValue, &UA_TYPES[UA_TYPES_VARIANT],
+                       &bufPosNewValue, &bufEndNewValue, NULL, NULL) != UA_STATUSCODE_GOOD){
+        return UA_FALSE;
+    }
+    oldValueEncoding->length = (uintptr_t)bufPosOldValue - (uintptr_t)oldValueEncoding->data;
+    newValueEncoding->length = (uintptr_t)bufPosNewValue - (uintptr_t)newValueEncoding->data;
+    UA_Boolean compareResult = !UA_ByteString_equal(oldValueEncoding, newValueEncoding);
+    UA_ByteString_delete(oldValueEncoding);
+    UA_ByteString_delete(newValueEncoding);
+    return compareResult;
+}
+
+/**
+ * Obtain the latest value for a specific DataSetField. This method is currently
+ * called inside the DataSetMessage generation process.
+ */
+static UA_StatusCode
+UA_PubSubDataSetField_sampleValue(UA_Server *server, UA_DataSetField *field) {
+    /* Read the value */
+    UA_ReadValueId rvid;
+    UA_ReadValueId_init(&rvid);
+    rvid.nodeId = field->config.field.variable.publishParameters.publishedVariable;
+    rvid.attributeId = field->config.field.variable.publishParameters.attributeId;
+    rvid.indexRange = field->config.field.variable.publishParameters.indexRange;
+    UA_DataValue value = UA_Server_read(server, &rvid, UA_TIMESTAMPSTORETURN_BOTH);
+    UA_DataValue_deleteMembers(&field->lastValue);
+    field->lastValue = value;
+    return UA_STATUSCODE_GOOD;
+}
+
+static UA_StatusCode
+UA_PubSubDataSetWriter_generateKeyFrameMessage(UA_Server *server, UA_DataSetMessage *dataSetMessage,
+                                               UA_DataSetWriter *dataSetWriter) {
+    UA_PublishedDataSet *currentDataSet = UA_PublishedDataSet_findPDSbyId(server, dataSetWriter->connectedDataSet);
+    if(!currentDataSet)
+        return UA_STATUSCODE_BADNOTFOUND;
+    //prepare DataSetMessageContent
+    dataSetMessage->header.dataSetMessageType = UA_DATASETMESSAGE_DATAKEYFRAME;
+    dataSetMessage->data.keyFrameData.fieldCount = currentDataSet->fieldSize;
+    dataSetMessage->data.keyFrameData.dataSetFields = (UA_DataValue *)
+            UA_Array_new(currentDataSet->fieldSize, &UA_TYPES[UA_TYPES_DATAVALUE]);
+    if(!dataSetMessage->data.keyFrameData.dataSetFields)
+        return UA_STATUSCODE_BADOUTOFMEMORY;
+
+    UA_DataSetField *tmpDataSetField;
+    size_t counter = 0;
+    LIST_FOREACH(tmpDataSetField, &currentDataSet->fields, listEntry){
+        if(UA_PubSubDataSetField_sampleValue(server, tmpDataSetField) == UA_STATUSCODE_GOOD){
+            //include field into DSM
+            UA_DataValue_init(&dataSetMessage->data.keyFrameData.dataSetFields[counter]);
+            UA_DataValue_copy(&tmpDataSetField->lastValue, &dataSetMessage->data.keyFrameData.dataSetFields[counter]);
+            if((dataSetWriter->config.dataSetFieldContentMask & (unsigned int) UA_DATASETFIELDCONTENTMASK_STATUSCODE) == 0){
+                dataSetMessage->data.keyFrameData.dataSetFields[counter].hasStatus = UA_FALSE;
+            }
+            if((dataSetWriter->config.dataSetFieldContentMask & (unsigned int) UA_DATASETFIELDCONTENTMASK_SOURCETIMESTAMP) == 0){
+                dataSetMessage->data.keyFrameData.dataSetFields[counter].hasSourceTimestamp = UA_FALSE;
+                if((dataSetWriter->config.dataSetFieldContentMask & (unsigned int) UA_DATASETFIELDCONTENTMASK_SOURCEPICOSECONDS) == 0){
+                    dataSetMessage->data.keyFrameData.dataSetFields[counter].hasServerPicoseconds = UA_FALSE;
+                }
+            }
+            if((dataSetWriter->config.dataSetFieldContentMask & (unsigned int) UA_DATASETFIELDCONTENTMASK_SERVERTIMESTAMP) == 0){
+                dataSetMessage->data.keyFrameData.dataSetFields[counter].hasServerTimestamp = UA_FALSE;
+            }
+            //Update lastValue store
+            UA_DataValue_deleteMembers(dataSetWriter->lastSamples[counter].value);
+            UA_DataValue_copy(&tmpDataSetField->lastValue, dataSetWriter->lastSamples[counter++].value);
+        }
+    }
+    return UA_STATUSCODE_GOOD;
+}
+
+static UA_StatusCode
+UA_PubSubDataSetWriter_generateDeltaFrameMessage(UA_Server *server, UA_DataSetMessage *dataSetMessage,
+                                                 UA_DataSetWriter *dataSetWriter) {
+    UA_PublishedDataSet *currentDataSet = UA_PublishedDataSet_findPDSbyId(server, dataSetWriter->connectedDataSet);
+    if(!currentDataSet)
+        return UA_STATUSCODE_BADNOTFOUND;
+    //prepare DataSetMessageContent
+    memset(dataSetMessage, 0, sizeof(UA_DataSetMessage));
+    dataSetMessage->header.dataSetMessageType = UA_DATASETMESSAGE_DATADELTAFRAME;
+    UA_DataSetField *tmpDataSetField;
+    size_t counter = 0;
+    LIST_FOREACH(tmpDataSetField, &currentDataSet->fields, listEntry) {
+        if(UA_PubSubDataSetField_sampleValue(server, tmpDataSetField) == UA_STATUSCODE_GOOD) {
+            //check if the value has changed
+            if(valueChangedVariant(&dataSetWriter->lastSamples[counter].value->value, &tmpDataSetField->lastValue.value)){
+                //increase fieldCount for current delta message
+                dataSetMessage->data.deltaFrameData.fieldCount++;
+                dataSetWriter->lastSamples[counter].valeChanged = UA_TRUE;
+            }
+            //update last stored sample
+            UA_DataValue_init(dataSetWriter->lastSamples[counter].value);
+            UA_DataValue_copy(&tmpDataSetField->lastValue, dataSetWriter->lastSamples[counter++].value);
+        }
+    }
+    //allocate DeltaFrameFields
+    UA_DataSetMessage_DeltaFrameField * deltaFields = (UA_DataSetMessage_DeltaFrameField *)
+            UA_calloc(dataSetMessage->data.deltaFrameData.fieldCount, sizeof(UA_DataSetMessage_DeltaFrameField));
+    if(!deltaFields)
+        return UA_STATUSCODE_BADOUTOFMEMORY;
+
+    dataSetMessage->data.deltaFrameData.deltaFrameFields = deltaFields;
+    size_t currentDeltaField = 0;
+    for(size_t i = 0; i < currentDataSet->fieldSize; i++){
+        if(dataSetWriter->lastSamples[i].valeChanged){
+            deltaFields[currentDeltaField].fieldIndex = (UA_UInt16) i;
+            UA_DataValue_copy(dataSetWriter->lastSamples[i].value, &deltaFields[currentDeltaField].fieldValue);
+            dataSetWriter->lastSamples[i].valeChanged = false;
+            if((dataSetWriter->config.dataSetFieldContentMask & (unsigned int) UA_DATASETFIELDCONTENTMASK_STATUSCODE) == 0){
+                dataSetMessage->data.deltaFrameData.deltaFrameFields[currentDeltaField].fieldValue.hasStatus = UA_FALSE;
+            }
+            if((dataSetWriter->config.dataSetFieldContentMask & (unsigned int) UA_DATASETFIELDCONTENTMASK_SOURCETIMESTAMP) == 0){
+                dataSetMessage->data.deltaFrameData.deltaFrameFields[currentDeltaField].fieldValue.hasSourceTimestamp = UA_FALSE;
+                if((dataSetWriter->config.dataSetFieldContentMask & (unsigned int) UA_DATASETFIELDCONTENTMASK_SOURCEPICOSECONDS) == 0){
+                    dataSetMessage->data.deltaFrameData.deltaFrameFields[currentDeltaField].fieldValue.hasServerPicoseconds = UA_FALSE;
+                }
+            }
+            if((dataSetWriter->config.dataSetFieldContentMask & (unsigned int) UA_DATASETFIELDCONTENTMASK_SERVERTIMESTAMP) == 0){
+                dataSetMessage->data.deltaFrameData.deltaFrameFields[currentDeltaField].fieldValue.hasServerTimestamp = UA_FALSE;
+            }
+            currentDeltaField++;
+        }
+    }
+    return UA_STATUSCODE_GOOD;
+}
+
+/**
+ * Generate a DataSetMessage for the given writer.
+ *
+ * @param dataSetWriter ptr to corresponding writer
+ * @return ptr to generated DataSetMessage
+ */
+static UA_StatusCode
+UA_DataSetWriter_generateDataSetMessage(UA_Server *server, UA_DataSetMessage *dataSetMessage,
+                                        UA_DataSetWriter *dataSetWriter) {
+    UA_PublishedDataSet *currentDataSet = UA_PublishedDataSet_findPDSbyId(server, dataSetWriter->connectedDataSet);
+    if(!currentDataSet)
+        return UA_STATUSCODE_BADNOTFOUND;
+    memset(dataSetMessage, 0, sizeof(UA_DataSetMessage));
+    //currently is only UADP supported. The configuration Flags are included inside the std. defined UA_UadpDataSetWriterMessageDataType
+    UA_UadpDataSetWriterMessageDataType *dataSetWriterMessageDataType = NULL;
+    if((dataSetWriter->config.messageSettings.encoding == UA_EXTENSIONOBJECT_DECODED ||
+        dataSetWriter->config.messageSettings.encoding == UA_EXTENSIONOBJECT_DECODED_NODELETE) &&
+       (dataSetWriter->config.messageSettings.content.decoded.type == &UA_TYPES[UA_TYPES_UADPDATASETWRITERMESSAGEDATATYPE])) {
+        dataSetWriterMessageDataType = (UA_UadpDataSetWriterMessageDataType *) dataSetWriter->config.messageSettings.content.decoded.data;
+    } else {
+        //create default flag configuration if no UadpDataSetWriterMessageDataType was passed in
+        UA_UadpDataSetWriterMessageDataType defaultUadpConfiguration;
+        memset(&defaultUadpConfiguration, 0, sizeof(UA_UadpDataSetWriterMessageDataType));
+        defaultUadpConfiguration.dataSetMessageContentMask = (UA_UadpDataSetMessageContentMask) ((unsigned int) UA_UADPDATASETMESSAGECONTENTMASK_TIMESTAMP |
+                                                                                                 (unsigned int) UA_UADPDATASETMESSAGECONTENTMASK_MAJORVERSION |
+                                                                                                 (unsigned int) UA_UADPDATASETMESSAGECONTENTMASK_MINORVERSION);
+        dataSetWriterMessageDataType = &defaultUadpConfiguration;
+    }
+    if(dataSetWriterMessageDataType->networkMessageNumber != 0 || dataSetWriterMessageDataType->dataSetOffset != 0 ||
+       dataSetWriterMessageDataType->configuredSize !=0 ){
+        UA_LOG_WARNING(server->config.logger, UA_LOGCATEGORY_SERVER, "Static DSM configuration not supported. Using defaults");
+        dataSetWriterMessageDataType->networkMessageNumber = 0;
+        dataSetWriterMessageDataType->dataSetOffset = 0;
+        dataSetWriterMessageDataType->configuredSize = 0;
+    }
+    //The encoding depends on the flags inside the writer config.
+    if(dataSetWriter->config.dataSetFieldContentMask & (unsigned int) UA_DATASETFIELDCONTENTMASK_RAWDATAENCODING) {
+        dataSetMessage->header.fieldEncoding = UA_FIELDENCODING_RAWDATA;
+    } else if (dataSetWriter->config.dataSetFieldContentMask &
+               ((unsigned  int) UA_DATASETFIELDCONTENTMASK_SOURCETIMESTAMP |
+                (unsigned  int) UA_DATASETFIELDCONTENTMASK_SERVERPICOSECONDS |
+                (unsigned  int) UA_DATASETFIELDCONTENTMASK_SOURCEPICOSECONDS |
+                (unsigned  int) UA_DATASETFIELDCONTENTMASK_STATUSCODE)) {
+        dataSetMessage->header.fieldEncoding = UA_FIELDENCODING_DATAVALUE;
+    } else {
+        dataSetMessage->header.fieldEncoding = UA_FIELDENCODING_VARIANT;
+    }
+    //Std: 'The DataSetMessageContentMask defines the flags for the content of the DataSetMessage header.'
+    if(dataSetWriterMessageDataType->dataSetMessageContentMask & (unsigned int) UA_UADPDATASETMESSAGECONTENTMASK_MAJORVERSION){
+        dataSetMessage->header.configVersionMajorVersionEnabled = UA_TRUE;
+        dataSetMessage->header.configVersionMajorVersion = currentDataSet->dataSetMetaData.configurationVersion.majorVersion;
+    }
+    if(dataSetWriterMessageDataType->dataSetMessageContentMask & (unsigned int) UA_UADPDATASETMESSAGECONTENTMASK_MINORVERSION){
+        dataSetMessage->header.configVersionMinorVersionEnabled = UA_TRUE;
+        dataSetMessage->header.configVersionMinorVersion = currentDataSet->dataSetMetaData.configurationVersion.minorVersion;
+    }
+    if(dataSetWriterMessageDataType->dataSetMessageContentMask & (unsigned int) UA_UADPDATASETMESSAGECONTENTMASK_SEQUENCENUMBER) {
+        dataSetMessage->header.dataSetMessageSequenceNrEnabled = UA_TRUE;
+        dataSetMessage->header.dataSetMessageSequenceNr = dataSetWriter->actualDataSetMessageSequenceCount;
+    }
+    if(dataSetWriterMessageDataType->dataSetMessageContentMask & (unsigned int) UA_UADPDATASETMESSAGECONTENTMASK_TIMESTAMP) {
+        dataSetMessage->header.timestampEnabled = UA_TRUE;
+        dataSetMessage->header.timestamp = UA_DateTime_now();
+        if(dataSetWriterMessageDataType->dataSetMessageContentMask & (unsigned int) UA_UADPDATASETMESSAGECONTENTMASK_PICOSECONDS) {
+            dataSetMessage->header.picoSecondsIncluded = UA_FALSE;
+            UA_LOG_WARNING(server->config.logger, UA_LOGCATEGORY_SERVER, "DSM picosecond field is currently not supported. Using defaults");
+        }
+    }
+    if(dataSetWriterMessageDataType->dataSetMessageContentMask & (unsigned int) UA_UADPDATASETMESSAGECONTENTMASK_STATUS){
+        dataSetMessage->header.statusEnabled = UA_FALSE;
+        UA_LOG_WARNING(server->config.logger, UA_LOGCATEGORY_SERVER, "DSM status field is currently not supported. Using defaults");
+    }
+    if(dataSetWriter->actualDataSetMessageSequenceCount < UA_UINT16_MAX){
+        dataSetWriter->actualDataSetMessageSequenceCount++;
+    } else {
+        dataSetWriter->actualDataSetMessageSequenceCount = 0;
+    }
+    //check if the PublishedDataSet version has changed -> if yes flush the lastValue store and send a KeyFrame.
+    if(dataSetWriter->connectedDataSetVersion.majorVersion != currentDataSet->dataSetMetaData.configurationVersion.majorVersion ||
+       dataSetWriter->connectedDataSetVersion.minorVersion != currentDataSet->dataSetMetaData.configurationVersion.minorVersion) {
+
+        //realloc pds dependent memory
+        dataSetWriter->lastSamplesCount = currentDataSet->fieldSize;
+        dataSetWriter->lastSamples = (UA_DataSetWriterSample * ) UA_realloc(dataSetWriter->lastSamples,
+                                                                            sizeof(UA_DataSetWriterSample) * dataSetWriter->lastSamplesCount);
+        if(!dataSetWriter->lastSamples)
+            return UA_STATUSCODE_BADOUTOFMEMORY;
+
+        for (size_t i = 0; i < dataSetWriter->lastSamplesCount; i++) {
+            dataSetWriter->lastSamples[i].value = (UA_DataValue *) UA_calloc(1, sizeof(UA_DataValue));
+            if(!dataSetWriter->lastSamples[i].value)
+                return UA_STATUSCODE_BADOUTOFMEMORY;
+        }
+        dataSetWriter->connectedDataSetVersion = currentDataSet->dataSetMetaData.configurationVersion;
+        UA_PubSubDataSetWriter_generateKeyFrameMessage(server, dataSetMessage, dataSetWriter);
+        dataSetWriter->deltaFrameCounter = 0;
+    } else if (currentDataSet->fieldSize == 1 || dataSetWriter->deltaFrameCounter == 0 || dataSetWriter->deltaFrameCounter > dataSetWriter->config.keyFrameCount){
+        //@info the standard defines: if a PDS contains only one fields no delta messages should be generated
+        //because they need more memory than a keyframe with 1 field.
+        UA_PubSubDataSetWriter_generateKeyFrameMessage(server, dataSetMessage, dataSetWriter);
+        dataSetWriter->deltaFrameCounter = 1;
+    } else {
+        UA_PubSubDataSetWriter_generateDeltaFrameMessage(server, dataSetMessage, dataSetWriter);
+        dataSetWriter->deltaFrameCounter++;
+    }
+    return UA_STATUSCODE_GOOD;
+}
+
+/*
+ * This callback triggers the collection and publish of NetworkMessages and the contained DataSetMessages.
+ */
+void
+UA_WriterGroup_publishCallback(UA_Server *server, UA_WriterGroup *writerGroup) {
+    if(!writerGroup){
+        UA_LOG_ERROR(server->config.logger, UA_LOGCATEGORY_SERVER, "Publish failed. WriterGroup not found");
+        return;
+    }
+    if(writerGroup->writersCount <= 0)
+        return;
+
+    if(writerGroup->config.encodingMimeType != UA_PUBSUB_ENCODING_UADP) {
+        UA_LOG_ERROR(server->config.logger, UA_LOGCATEGORY_SERVER, "Unknown encoding type.");
+        return;
+    }
+    //prevent error if the maxEncapsulatedDataSetMessageCount is set to 0->1
+    writerGroup->config.maxEncapsulatedDataSetMessageCount = (UA_UInt16) (writerGroup->config.maxEncapsulatedDataSetMessageCount == 0 ||
+                                                                          writerGroup->config.maxEncapsulatedDataSetMessageCount > UA_BYTE_MAX
+                                                                          ? 1 : writerGroup->config.maxEncapsulatedDataSetMessageCount);
+
+    UA_DataSetMessage *dsmStore = (UA_DataSetMessage *) UA_calloc(writerGroup->writersCount, sizeof(UA_DataSetMessage));
+    if(!dsmStore) {
+        UA_LOG_ERROR(server->config.logger, UA_LOGCATEGORY_SERVER, "DataSetMessage allocation failed");
+        return;
+    }
+    //The binary DataSetMessage sizes are part of the payload. Memory is allocated on the stack.
+    UA_STACKARRAY(UA_UInt16, dsmSizes, writerGroup->writersCount);
+    memset(dsmSizes, 0, writerGroup->writersCount * sizeof(UA_UInt16));
+    /*
+     * Calculate the number of needed NetworkMessages. The previous allocated DataSetMessage array is
+     * filled from left for combined DSM messages and from the right for single DSM.
+     *     Allocated DSM Array
+     *    +----------------------------+
+     *    |DSM1||DSM2||DSM3||DSM4||DSM5|
+     *    +--+----+-----+-----+-----+--+
+     *       |    |     |     |     |
+     *       |    |     |     |     |
+     *    +--v----v-----v-----v--+--v--+
+     *    |    NM1        || NM2 | NM3 |
+     *    +----------------------+-----+
+     *    NetworkMessages
+     */
+    UA_UInt16 combinedNetworkMessageCount = 0, singleNetworkMessagesCount = 0;
+    UA_DataSetWriter *tmpDataSetWriter;
+    LIST_FOREACH(tmpDataSetWriter, &writerGroup->writers, listEntry){
+        //if promoted fields are contained in the PublishedDataSet, then this DSM must encapsulated in one NM
+        UA_PublishedDataSet *tmpPublishedDataSet = UA_PublishedDataSet_findPDSbyId(server, tmpDataSetWriter->connectedDataSet);
+        if(!tmpPublishedDataSet) {
+            UA_LOG_ERROR(server->config.logger, UA_LOGCATEGORY_SERVER, "Publish failed. PublishedDataSet not found");
+            return;
+        }
+        if(tmpPublishedDataSet->promotedFieldsCount > 0) {
+            if(UA_DataSetWriter_generateDataSetMessage(server, &dsmStore[(writerGroup->writersCount - 1) - singleNetworkMessagesCount],
+                                                       tmpDataSetWriter) != UA_STATUSCODE_GOOD){
+                UA_LOG_ERROR(server->config.logger, UA_LOGCATEGORY_SERVER, "Publish failed. DataSetMessage creation failed");
+                return;
+            };
+            dsmSizes[(writerGroup->writersCount-1) - singleNetworkMessagesCount] = (UA_UInt16) UA_DataSetMessage_calcSizeBinary(&dsmStore[(writerGroup->writersCount-1)
+                                                                                                                                          - singleNetworkMessagesCount]);
+            singleNetworkMessagesCount++;
+        } else {
+            if(UA_DataSetWriter_generateDataSetMessage(server, &dsmStore[combinedNetworkMessageCount], tmpDataSetWriter) != UA_STATUSCODE_GOOD){
+                UA_LOG_ERROR(server->config.logger, UA_LOGCATEGORY_SERVER, "Publish failed. DataSetMessage creation failed");
+                return;
+            };
+            dsmSizes[combinedNetworkMessageCount] = (UA_UInt16) UA_DataSetMessage_calcSizeBinary(&dsmStore[combinedNetworkMessageCount]);
+            combinedNetworkMessageCount++;
+        }
+    }
+    UA_UInt32 networkMessageCount = singleNetworkMessagesCount;
+    if(combinedNetworkMessageCount != 0){
+        networkMessageCount += combinedNetworkMessageCount / writerGroup->config.maxEncapsulatedDataSetMessageCount +
+                               (combinedNetworkMessageCount % writerGroup->config.maxEncapsulatedDataSetMessageCount) == 0 ? 0 : 1;
+    }
+    //Alloc memory for the NetworkMessages on the stack
+    UA_STACKARRAY(UA_NetworkMessage, nmStore, networkMessageCount);
+    memset(nmStore, 0, networkMessageCount * sizeof(UA_NetworkMessage));
+    UA_UInt32 currentDSMPosition = 0;
+    for(UA_UInt32 i = 0; i < networkMessageCount; i++) {
+        nmStore[i].version = 1;
+        nmStore[i].networkMessageType = UA_NETWORKMESSAGE_DATASET;
+        //create combined NetworkMessages
+        if(i < (networkMessageCount-singleNetworkMessagesCount)){
+            if(combinedNetworkMessageCount - (i * writerGroup->config.maxEncapsulatedDataSetMessageCount) > 0){
+                currentDSMPosition = i * writerGroup->config.maxEncapsulatedDataSetMessageCount;
+                nmStore[i].payloadHeader.dataSetPayloadHeader.count = (UA_Byte) writerGroup->config.maxEncapsulatedDataSetMessageCount;
+                nmStore[i].payload.dataSetPayload.dataSetMessages = &dsmStore[currentDSMPosition];
+                nmStore->payload.dataSetPayload.sizes = &dsmSizes[currentDSMPosition];
+            } else {
+                currentDSMPosition = i * writerGroup->config.maxEncapsulatedDataSetMessageCount;
+                nmStore[i].payloadHeader.dataSetPayloadHeader.count = (UA_Byte) (currentDSMPosition - ((i - 1) * writerGroup->config.maxEncapsulatedDataSetMessageCount)); //attention cast from uint32 to byte
+                nmStore[i].payload.dataSetPayload.dataSetMessages = &dsmStore[currentDSMPosition];
+                nmStore->payload.dataSetPayload.sizes = &dsmSizes[currentDSMPosition];
+            }
+        } else {///create single NetworkMessages (1 DSM per NM)
+            nmStore[i].payloadHeader.dataSetPayloadHeader.count = 1;
+            currentDSMPosition = (UA_UInt32) combinedNetworkMessageCount + (i - combinedNetworkMessageCount/writerGroup->config.maxEncapsulatedDataSetMessageCount
+                                                                            + (combinedNetworkMessageCount % writerGroup->config.maxEncapsulatedDataSetMessageCount) == 0 ? 0 : 1);
+            nmStore[i].payload.dataSetPayload.dataSetMessages = &dsmStore[currentDSMPosition];
+            nmStore->payload.dataSetPayload.sizes = &dsmSizes[currentDSMPosition];
+        }
+        UA_PubSubConnection *connection = UA_PubSubConnection_findConnectionbyId(server, writerGroup->linkedConnection);
+        if(!connection){
+            UA_LOG_ERROR(server->config.logger, UA_LOGCATEGORY_SERVER, "Publish failed. PubSubConnection invalid.");
+            return;
+        }
+        //send the prepared messages
+        UA_ByteString buf;
+        size_t msgSize = UA_NetworkMessage_calcSizeBinary(&nmStore[i]);
+        if(UA_ByteString_allocBuffer(&buf, msgSize) == UA_STATUSCODE_GOOD) {
+            UA_Byte *bufPos = buf.data;
+            memset(bufPos, 0, msgSize);
+            const UA_Byte *bufEnd = &(buf.data[buf.length]);
+            if(UA_NetworkMessage_encodeBinary(&nmStore[i], &bufPos, bufEnd) != UA_STATUSCODE_GOOD){
+                UA_ByteString_deleteMembers(&buf);
+                return;
+            };
+            connection->channel->send(connection->channel, NULL, &buf);
+        }
+        UA_ByteString_deleteMembers(&buf);
+        UA_NetworkMessage_deleteMembers(&nmStore[i]);
+    }
+}
+
+/*
+ * Add new publishCallback. The first execution is triggered directly after creation.
+ * @Warning - The duration (double) is currently casted to int. -> intervals smaller 1ms are not possible.
+ */
+UA_StatusCode
+UA_WriterGroup_addPublishCallback(UA_Server *server, UA_WriterGroup *writerGroup) {
+    UA_StatusCode retval =
+            UA_PubSubManager_addRepeatedCallback(server, (UA_ServerCallback) UA_WriterGroup_publishCallback,
+                                                 writerGroup, (UA_UInt32) writerGroup->config.publishingInterval,
+                                                 &writerGroup->publishCallbackId);
+    if(retval == UA_STATUSCODE_GOOD)
+        writerGroup->publishCallbackIsRegistered = true;
+    //run once after creation
+    UA_WriterGroup_publishCallback(server, writerGroup);
+    return retval;
+}

+ 9 - 0
src/pubsub/ua_pubsub.h

@@ -140,6 +140,15 @@ UA_DataSetField_findDSFbyId(UA_Server *server, UA_NodeId identifier);
 void
 UA_DataSetField_deleteMembers(UA_DataSetField *field);
 
+/*********************************************************/
+/*               PublishValues handling                  */
+/*********************************************************/
+
+UA_StatusCode
+UA_WriterGroup_addPublishCallback(UA_Server *server, UA_WriterGroup *writerGroup);
+void
+UA_WriterGroup_publishCallback(UA_Server *server, UA_WriterGroup *writerGroup);
+
 #ifdef __cplusplus
 } // extern "C"
 #endif

+ 21 - 0
src/pubsub/ua_pubsub_manager.c

@@ -273,3 +273,24 @@ UA_PubSubManager_delete(UA_Server *server, UA_PubSubManager *pubSubManager) {
         UA_free(&server->config.pubsubTransportLayers[i]);
     }
 }
+
+/***********************************/
+/*      PubSub Jobs abstraction    */
+/***********************************/
+UA_StatusCode
+UA_PubSubManager_addRepeatedCallback(UA_Server *server, UA_ServerCallback callback,
+                                     void *data, UA_UInt32 interval, UA_UInt64 *callbackId) {
+    return UA_Timer_addRepeatedCallback(&server->timer, (UA_TimerCallback)callback,
+                                        data, interval, callbackId);
+}
+
+UA_StatusCode
+UA_PubSubManager_changeRepeatedCallbackInterval(UA_Server *server, UA_UInt64 callbackId,
+                                                UA_UInt32 interval) {
+    return UA_Timer_changeRepeatedCallbackInterval(&server->timer, callbackId, interval);
+}
+
+UA_StatusCode
+UA_PubSubManager_removeRepeatedPubSubCallback(UA_Server *server, UA_UInt64 callbackId) {
+    return UA_Timer_removeRepeatedCallback(&server->timer, callbackId);
+}

+ 12 - 0
src/pubsub/ua_pubsub_manager.h

@@ -32,6 +32,18 @@ UA_PubSubManager_generateUniqueNodeId(UA_Server *server, UA_NodeId *nodeId);
 UA_UInt32
 UA_PubSubConfigurationVersionTimeDifference(void);
 
+/***********************************/
+/*      PubSub Jobs abstraction    */
+/***********************************/
+UA_StatusCode
+UA_PubSubManager_addRepeatedCallback(UA_Server *server, UA_ServerCallback callback,
+                                     void *data, UA_UInt32 interval, UA_UInt64 *callbackId);
+UA_StatusCode
+UA_PubSubManager_changeRepeatedCallbackInterval(UA_Server *server, UA_UInt64 callbackId,
+                                                UA_UInt32 interval);
+UA_StatusCode
+UA_PubSubManager_removeRepeatedPubSubCallback(UA_Server *server, UA_UInt64 callbackId);
+
 #ifdef __cplusplus
 } // extern "C"
 #endif

+ 53 - 3
tests/pubsub/check_pubsub_publish.c

@@ -5,14 +5,17 @@
  * Copyright (c) 2017 - 2018 Fraunhofer IOSB (Author: Andreas Ebner)
  */
 
-#include <ua_server_pubsub.h>
-#include <src_generated/ua_types_generated_encoding_binary.h>
 #include <ua_types.h>
-#include <ua_pubsub.h>
+#include <ua_server_pubsub.h>
+#include "ua_server_pubsub.h"
+#include "src_generated/ua_types_generated_encoding_binary.h"
+#include "ua_types.h"
+#include "ua_pubsub.h"
 #include "ua_config_default.h"
 #include "ua_network_pubsub_udp.h"
 #include "ua_server_internal.h"
 #include "check.h"
+#include "stdio.h"
 
 UA_Server *server = NULL;
 UA_ServerConfig *config = NULL;
@@ -146,12 +149,15 @@ static void setupDataSetWriterTestEnvironment(void){
     memset(&writerGroupConfig, 0, sizeof(writerGroupConfig));
     writerGroupConfig.name = UA_STRING("WriterGroup 1");
     writerGroupConfig.publishingInterval = 10;
+    writerGroupConfig.encodingMimeType = UA_PUBSUB_ENCODING_UADP;
     UA_Server_addWriterGroup(server, connection1, &writerGroupConfig, &writerGroup1);
     writerGroupConfig.name = UA_STRING("WriterGroup 2");
     writerGroupConfig.publishingInterval = 50;
+    writerGroupConfig.encodingMimeType = UA_PUBSUB_ENCODING_UADP;
     UA_Server_addWriterGroup(server, connection2, &writerGroupConfig, &writerGroup2);
     writerGroupConfig.name = UA_STRING("WriterGroup 3");
     writerGroupConfig.publishingInterval = 100;
+    writerGroupConfig.encodingMimeType = UA_PUBSUB_ENCODING_UADP;
     UA_Server_addWriterGroup(server, connection2, &writerGroupConfig, &writerGroup3);
     UA_PublishedDataSetConfig pdsConfig;
     memset(&pdsConfig, 0, sizeof(UA_PublishedDataSetConfig));
@@ -346,6 +352,45 @@ START_TEST(GetDataSetFieldConfigurationAndCompareValues){
         UA_DataSetFieldConfig_deleteMembers(&fieldConfigCopy);
     } END_TEST
 
+
+START_TEST(SinglePublishDataSetField){
+        UA_WriterGroupConfig writerGroupConfig;
+        memset(&writerGroupConfig, 0, sizeof(writerGroupConfig));
+        writerGroupConfig.name = UA_STRING("WriterGroup 1");
+        writerGroupConfig.publishingInterval = 10;
+        writerGroupConfig.encodingMimeType = UA_PUBSUB_ENCODING_UADP;
+        UA_Server_addWriterGroup(server, connection1, &writerGroupConfig, &writerGroup1);
+        writerGroupConfig.name = UA_STRING("WriterGroup 2");
+        writerGroupConfig.publishingInterval = 50;
+        writerGroupConfig.encodingMimeType = UA_PUBSUB_ENCODING_UADP;
+        UA_Server_addWriterGroup(server, connection2, &writerGroupConfig, &writerGroup2);
+        writerGroupConfig.name = UA_STRING("WriterGroup 3");
+        writerGroupConfig.publishingInterval = 100;
+        writerGroupConfig.encodingMimeType = UA_PUBSUB_ENCODING_UADP;
+        UA_Server_addWriterGroup(server, connection2, &writerGroupConfig, &writerGroup3);
+        UA_PublishedDataSetConfig pdsConfig;
+        memset(&pdsConfig, 0, sizeof(UA_PublishedDataSetConfig));
+        pdsConfig.publishedDataSetType = UA_PUBSUB_DATASET_PUBLISHEDITEMS;
+        pdsConfig.name = UA_STRING("PublishedDataSet 1");
+        UA_Server_addPublishedDataSet(server, &pdsConfig, &publishedDataSet1);
+        UA_Server_addPublishedDataSet(server, &pdsConfig, &publishedDataSet2);
+        UA_DataSetWriterConfig dataSetWriterConfig;
+        memset(&dataSetWriterConfig, 0, sizeof(dataSetWriterConfig));
+        dataSetWriterConfig.name = UA_STRING("DataSetWriter 1");
+        UA_Server_addDataSetWriter(server, writerGroup1, publishedDataSet1, &dataSetWriterConfig, &dataSetWriter1);
+        UA_DataSetFieldConfig dataSetFieldConfig;
+        memset(&dataSetFieldConfig, 0, sizeof(UA_DataSetFieldConfig));
+        dataSetFieldConfig.dataSetFieldType = UA_PUBSUB_DATASETFIELD_VARIABLE;
+        dataSetFieldConfig.field.variable.fieldNameAlias = UA_STRING("Server localtime");
+        dataSetFieldConfig.field.variable.promotedField = UA_FALSE;
+        dataSetFieldConfig.field.variable.publishParameters.publishedVariable = UA_NODEID_NUMERIC(0, UA_NS0ID_SERVER_LOCALTIME);
+        dataSetFieldConfig.field.variable.publishParameters.attributeId = UA_ATTRIBUTEID_VALUE;
+        UA_Server_addDataSetField(server, publishedDataSet1, &dataSetFieldConfig, NULL);
+
+        UA_WriterGroup *wg = UA_WriterGroup_findWGbyId(server, writerGroup1);
+        UA_WriterGroup_publishCallback(server, wg);
+    } END_TEST
+
 int main(void) {
     TCase *tc_add_pubsub_writergroup = tcase_create("PubSub WriterGroup items handling");
     tcase_add_checked_fixture(tc_add_pubsub_writergroup, setup, teardown);
@@ -371,10 +416,15 @@ int main(void) {
     tcase_add_test(tc_add_pubsub_datasetfields, AddDataSetFieldWithInvalidPDSId);
     tcase_add_test(tc_add_pubsub_datasetfields, GetDataSetFieldConfigurationAndCompareValues);
 
+    TCase *tc_pubsub_publish = tcase_create("PubSub publish DataSetFields");
+    tcase_add_checked_fixture(tc_pubsub_publish, setup, teardown);
+    tcase_add_test(tc_pubsub_publish, SinglePublishDataSetField);
+
     Suite *s = suite_create("PubSub WriterGroups/Writer/Fields handling and publishing");
     suite_add_tcase(s, tc_add_pubsub_writergroup);
     suite_add_tcase(s, tc_add_pubsub_datasetwriter);
     suite_add_tcase(s, tc_add_pubsub_datasetfields);
+    suite_add_tcase(s, tc_pubsub_publish);
 
     SRunner *sr = srunner_create(s);
     srunner_set_fork_status(sr, CK_NOFORK);