Kaynağa Gözat

Add pubsub WriterGroup/Writer/DataSetField handling

Andreas Ebner 6 yıl önce
ebeveyn
işleme
4b2dfd0241

+ 36 - 1
examples/pubsub/tutorial_pubsub_publish.c

@@ -5,6 +5,7 @@
 #include "open62541.h"
 
 /* Work in progress: This Tutorial/Example will be continuously extended during the next PubSub batches */
+UA_NodeId connectionIdentifier;
 
 UA_Boolean running = true;
 static void stopHandler(int sign) {
@@ -23,7 +24,6 @@ addPubSubConnection(UA_Server *server){
     UA_NetworkAddressUrlDataType networkAddressUrl = {UA_STRING_NULL , UA_STRING("opc.udp://224.0.0.22:4840/")};
     UA_Variant_setScalar(&connectionConfig.address, &networkAddressUrl, &UA_TYPES[UA_TYPES_NETWORKADDRESSURLDATATYPE]);
     connectionConfig.publisherId.numeric = UA_UInt32_random();
-    UA_NodeId connectionIdentifier;
     UA_StatusCode retval = UA_Server_addPubSubConnection(server, &connectionConfig, &connectionIdentifier);
     return retval;
 }
@@ -53,6 +53,7 @@ int main(void) {
 
     /* The PublishedDataSetConfig contains all necessary public informations for the creation of a new PublishedDataSet */
     UA_PublishedDataSetConfig publishedDataSetConfig;
+    memset(&publishedDataSetConfig, 0, sizeof(UA_PublishedDataSetConfig));
     publishedDataSetConfig.publishedDataSetType = UA_PUBSUB_DATASET_PUBLISHEDITEMS;
     publishedDataSetConfig.name = UA_STRING("Robot Axis");
 
@@ -61,6 +62,40 @@ int main(void) {
     if(UA_Server_addPublishedDataSet(server, &publishedDataSetConfig, &publishedDataSetIdent).addResult == UA_STATUSCODE_GOOD)
         UA_LOG_INFO(UA_Log_Stdout, UA_LOGCATEGORY_SERVER, "PubSub PublishedDataSet creation successful!");
 
+    /* Add a field to the previous created PublishedDataSet */
+    UA_NodeId dataSetFieldIdent;
+    UA_DataSetFieldConfig dataSetFieldConfig;
+    memset(&dataSetFieldConfig, 0, sizeof(UA_DataSetFieldConfig));
+    dataSetFieldConfig.dataSetFieldType = UA_PUBSUB_DATASETFIELD_VARIABLE;
+    dataSetFieldConfig.field.variable.fieldNameAlias = UA_STRING("Server localtime");
+    dataSetFieldConfig.field.variable.promotedField = UA_FALSE;
+    dataSetFieldConfig.field.variable.publishParameters.publishedVariable = UA_NODEID_NUMERIC(0, UA_NS0ID_SERVER_LOCALTIME);
+    dataSetFieldConfig.field.variable.publishParameters.attributeId = UA_ATTRIBUTEID_VALUE;
+    UA_Server_addDataSetField(server, publishedDataSetIdent, &dataSetFieldConfig, &dataSetFieldIdent);
+
+    /* Now we create a new WriterGroupConfig and add the group to the existing PubSubConnection. */
+    UA_NodeId writerGroupIdent;
+    UA_WriterGroupConfig writerGroupConfig;
+    memset(&writerGroupConfig, 0, sizeof(UA_WriterGroupConfig));
+    writerGroupConfig.name = UA_STRING("Demo WriterGroup");
+    writerGroupConfig.publishingInterval = 100;
+    writerGroupConfig.enabled = UA_FALSE;
+    writerGroupConfig.encodingMimeType = UA_PUBSUB_ENCODING_UADP;
+    /* The configuration flags for the messages are encapsulated inside the message- and transport settings
+     * extension objects. These extension objects are defined by the standard. e.g. UadpWriterGroupMessageDataType */
+    UA_Server_addWriterGroup(server, connectionIdentifier, &writerGroupConfig, &writerGroupIdent);
+
+    /* We need now a DataSetWriter within the WriterGroup. This means we must create a new DataSetWriterConfig and add
+     * call the addWriterGroup function. */
+    UA_NodeId dataSetWriterIdent;
+    UA_DataSetWriterConfig dataSetWriterConfig;
+    memset(&dataSetWriterConfig, 0, sizeof(UA_DataSetWriterConfig));
+    dataSetWriterConfig.name = UA_STRING("Demo DataSetWriter");
+    dataSetWriterConfig.dataSetWriterId = 62541;
+    dataSetWriterConfig.keyFrameCount = 10;
+    UA_Server_addDataSetWriter(server, writerGroupIdent, publishedDataSetIdent,
+                               &dataSetWriterConfig, &dataSetWriterIdent);
+
     retval |= UA_Server_run(server, &running);
     UA_Server_delete(server);
     UA_ServerConfig_delete(config);

+ 205 - 53
include/ua_server_pubsub.h

@@ -34,7 +34,7 @@ extern "C" {
  * 1. Create a configuration for the needed PubSub element.
  *
  * 2. Call the add[element] function and pass in the configuration.
- * 
+ *
  * 3. The add[element] function returns the unique nodeId of the internally created element.
  *
  * Take a look on the PubSub Tutorials for mor details about the API usage.::
@@ -46,20 +46,28 @@ extern "C" {
  *   |    |
  *   |    |
  *   |    |  +----------------------+
- *   |    +->| UA_PubSubConnections |  UA_Server_addPubSubConnection
+ *   |    +--> UA_PubSubConnection  |  UA_Server_addPubSubConnection
  *   |       +----------------------+
  *   |        |    |
- *   |        |    |    +-----------------------+
- *   |        |    +--->| UA_PubSubWriterGroups |
- *   |        |         +-----------------------+
- *   |        |
- *   |        |         +-----------------------+
- *   |        +-------->| UA_PubSubReaderGroups |
- *   |                  +-----------------------+
- *   |
- *   |       +---------------------------+
- *   +------>| UA_PubSubPublishedDataSet |  UA_Server_addPublishedDataSet
+ *   |        |    |    +----------------+
+ *   |        |    +----> UA_WriterGroup |  UA_PubSubConnection_addWriterGroup
+ *   |        |         +----------------+
+ *   |        |              |
+ *   |        |              |    +------------------+
+ *   |        |              +----> UA_DataSetWriter |  UA_WriterGroup_addDataSetWriter  +-+
+ *   |        |                   +------------------+                                     |
+ *   |        |                                                                            |
+ *   |        |         +----------------+                                                 | r
+ *   |        +---------> UA_ReaderGroup |                                                 | e
+ *   |                  +----------------+                                                 | f
+ *   |                                                                                     |
+ *   |       +---------------------------+                                                 |
+ *   +-------> UA_PubSubPublishedDataSet |  UA_Server_addPublishedDataSet                <-+
  *           +---------------------------+
+ *                 |
+ *                 |    +-----------------+
+ *                 +----> UA_DataSetField |  UA_PublishedDataSet_addDataSetField
+ *                      +-----------------+
  *
  * Connections
  * -----------
@@ -68,17 +76,15 @@ extern "C" {
  * different transport protocols at runtime.
  *
  * Take a look on the PubSub Tutorials for mor details about the API usage.
- * Connections
- * -----------
  */
 
-typedef struct{
+typedef struct {
     UA_String name;
     UA_Boolean enabled;
-    union{ //std: valid types UInt or String
+    union { /* std: valid types UInt or String */
         UA_UInt32 numeric;
         UA_String string;
-    }publisherId;
+    } publisherId;
     UA_String transportProfileUri;
     UA_Variant address;
     size_t connectionPropertiesSize;
@@ -86,38 +92,34 @@ typedef struct{
     UA_Variant connectionTransportSettings;
 } UA_PubSubConnectionConfig;
 
-/**
- * Connection handling
- * ^^^^^^^^^^^^^^^^^^^
- */
-
 UA_StatusCode
-UA_Server_addPubSubConnection(UA_Server *server, const UA_PubSubConnectionConfig *connectionConfig,
+UA_Server_addPubSubConnection(UA_Server *server,
+                              const UA_PubSubConnectionConfig *connectionConfig,
                               UA_NodeId *connectionIdentifier);
 
+/* Returns a deep copy of the config */
 UA_StatusCode
-UA_Server_removePubSubConnection(UA_Server *server, UA_NodeId connectionIdentifier);
+UA_Server_getPubSubConnectionConfig(UA_Server *server,
+                                    const UA_NodeId connection,
+                                    UA_PubSubConnectionConfig *config);
 
+/* Remove Connection, identified by the NodeId. Deletion of Connection
+ * removes all contained WriterGroups and Writers. */
 UA_StatusCode
-UA_PubSubConnection_getConfig(UA_Server *server, UA_NodeId connectionIdentifier,
-                              UA_PubSubConnectionConfig *config);
+UA_Server_removePubSubConnection(UA_Server *server, const UA_NodeId connection);
 
 /**
  * PublishedDataSets
  * -----------------
- * The PublishedDataSets (PDS) are containers for the published information. The PDS contain
- * the published variables and meta informations. The metainformations are commonly autogenerated
- * or given as constant argument as part of the template functions. The template functions are standard
- * defined and should only be used for configuration tools. You should normally
- * create a empty PDS and call the functions to add new fields.
- */
+ * The PublishedDataSets (PDS) are containers for the published information. The
+ * PDS contain the published variables and meta informations. The metadata is
+ * commonly autogenerated or given as constant argument as part of the template
+ * functions. The template functions are standard defined and intended for
+ * configuration tools. You should normally create a empty PDS and call the
+ * functions to add new fields. */
 
-typedef struct {
-    UA_StatusCode addResult;
-    size_t fieldAddResultsSize;
-    UA_StatusCode *fieldAddResults;
-    UA_ConfigurationVersionDataType configurationVersion;
-} UA_AddPublishedDataSetResult;
+/* The UA_PUBSUB_DATASET_PUBLISHEDITEMS has currently no additional members and
+ * thus no dedicated config structure. */
 
 typedef enum {
     UA_PUBSUB_DATASET_PUBLISHEDITEMS,
@@ -126,10 +128,6 @@ typedef enum {
     UA_PUBSUB_DATASET_PUBLISHEDEVENTS_TEMPLATE,
 } UA_PublishedDataSetType;
 
-/* The UA_PUBSUB_DATASET_PUBLISHEDITEMS has currently no additional members
- * and thus no dedicated config structure.
- */
-
 typedef struct {
     UA_DataSetMetaDataType metaData;
     size_t variablesToAddSize;
@@ -149,11 +147,11 @@ typedef struct {
     UA_ContentFilter filter;
 } UA_PublishedEventTemplateConfig;
 
-/* Configuration structure for PubSubDataSet */
+/* Configuration structure for PublishedDataSet */
 typedef struct {
     UA_String name;
     UA_PublishedDataSetType publishedDataSetType;
-    union{
+    union {
         /* The UA_PUBSUB_DATASET_PUBLISHEDITEMS has currently no additional members
          * and thus no dedicated config structure.*/
         UA_PublishedDataItemsTemplateConfig itemsTemplate;
@@ -162,25 +160,179 @@ typedef struct {
     } config;
 } UA_PublishedDataSetConfig;
 
-/**
- * PublishedDataSet handling
- * ^^^^^^^^^^^^^^^^^^^^^^^^^
- */
+void
+UA_PublishedDataSetConfig_deleteMembers(UA_PublishedDataSetConfig *pdsConfig);
+
+typedef struct {
+    UA_StatusCode addResult;
+    size_t fieldAddResultsSize;
+    UA_StatusCode *fieldAddResults;
+    UA_ConfigurationVersionDataType configurationVersion;
+} UA_AddPublishedDataSetResult;
 
 UA_AddPublishedDataSetResult
-UA_Server_addPublishedDataSet(UA_Server *server, const UA_PublishedDataSetConfig *publishedDataSetConfig,
+UA_Server_addPublishedDataSet(UA_Server *server,
+                              const UA_PublishedDataSetConfig *publishedDataSetConfig,
                               UA_NodeId *pdsIdentifier);
 
+/* Returns a deep copy of the config */
 UA_StatusCode
-UA_Server_removePublishedDataSet(UA_Server *server, UA_NodeId pdsIdentifier);
+UA_Server_getPublishedDataSetConfig(UA_Server *server, const UA_NodeId pds,
+                                    UA_PublishedDataSetConfig *config);
 
+/* Remove PublishedDataSet, identified by the NodeId. Deletion of PDS removes
+ * all contained and linked PDS Fields. Connected WriterGroups will be also
+ * removed. */
 UA_StatusCode
-UA_PublishedDataSet_getConfig(UA_Server *server, UA_NodeId publishedDataSetIdentifier,
-                              UA_PublishedDataSetConfig *config);
+UA_Server_removePublishedDataSet(UA_Server *server, const UA_NodeId pds);
+
+/**
+ * DataSetFields
+ * -------------
+ * The description of published variables is named DataSetField. Each
+ * DataSetField contains the selection of one information model node. The
+ * DataSetField has additional parameters for the publishing, sampling and error
+ * handling process. */
+
+typedef struct{
+    UA_ConfigurationVersionDataType configurationVersion;
+    UA_String fieldNameAlias;
+    UA_Boolean promotedField;
+    UA_PublishedVariableDataType publishParameters;
+} UA_DataSetVariableConfig;
+
+typedef enum {
+    UA_PUBSUB_DATASETFIELD_VARIABLE,
+    UA_PUBSUB_DATASETFIELD_EVENT
+} UA_DataSetFieldType;
+
+typedef struct {
+    UA_DataSetFieldType dataSetFieldType;
+    union {
+        UA_DataSetVariableConfig variable;
+        //events need other config later
+    } field;
+} UA_DataSetFieldConfig;
+    
 void
-UA_PublishedDataSetConfig_deleteMembers(UA_PublishedDataSetConfig *pdsConfig);
+UA_DataSetFieldConfig_deleteMembers(UA_DataSetFieldConfig *dataSetFieldConfig);
 
+typedef struct {
+    UA_StatusCode result;
+    UA_ConfigurationVersionDataType configurationVersion;
+} UA_DataSetFieldResult;
+
+UA_DataSetFieldResult
+UA_Server_addDataSetField(UA_Server *server,
+                          const UA_NodeId publishedDataSet,
+                          const UA_DataSetFieldConfig *fieldConfig,
+                          UA_NodeId *fieldIdentifier);
+
+/* Returns a deep copy of the config */
+UA_StatusCode
+UA_Server_getDataSetFieldConfig(UA_Server *server, const UA_NodeId dsf,
+                                UA_DataSetFieldConfig *config);
+
+UA_DataSetFieldResult
+UA_Server_removeDataSetField(UA_Server *server, const UA_NodeId dsf);
+
+/**
+ * WriterGroup
+ * -----------
+ * All WriterGroups are created within a PubSubConnection and automatically
+ * deleted if the connection is removed. The WriterGroup is primary used as
+ * container for :ref:`dsw` and network message settings. The WriterGroup can be
+ * imagined as producer of the network messages. The creation of network
+ * messages is controlled by parameters like the publish interval, which is e.g.
+ * contained in the WriterGroup. */
+
+typedef enum {
+    UA_PUBSUB_ENCODING_BINARY,
+    UA_PUBSUB_ENCODING_JSON,
+    UA_PUBSUB_ENCODING_UADP
+} UA_PubSubEncodingType;
+
+typedef struct {
+    UA_String name;
+    UA_Boolean enabled;
+    UA_UInt16 writerGroupId;
+    UA_Double publishingInterval;
+    UA_Double keepAliveTime;
+    UA_Byte priority;
+    UA_MessageSecurityMode securityMode;
+    UA_ExtensionObject transportSettings;
+    UA_ExtensionObject messageSettings;
+    size_t groupPropertiesSize;
+    UA_KeyValuePair *groupProperties;
+    UA_PubSubEncodingType encodingMimeType;
+
+    /* non std. config parameter. maximum count of embedded DataSetMessage in
+     * one NetworkMessage */
+    UA_UInt16 maxEncapsulatedDataSetMessageCount;
+} UA_WriterGroupConfig;
+
+void
+UA_WriterGroupConfig_deleteMembers(UA_WriterGroupConfig *writerGroupConfig);
+
+/* Add a new WriterGroup to an existing Connection */
+UA_StatusCode
+UA_Server_addWriterGroup(UA_Server *server, const UA_NodeId connection,
+                         const UA_WriterGroupConfig *writerGroupConfig,
+                         UA_NodeId *writerGroupIdentifier);
+
+/* Returns a deep copy of the config */
+UA_StatusCode
+UA_Server_getWriterGroupConfig(UA_Server *server, const UA_NodeId writerGroup,
+                               UA_WriterGroupConfig *config);
+
+UA_StatusCode
+UA_Server_removeWriterGroup(UA_Server *server, const UA_NodeId writerGroup);
 
+/**
+ * .. _dsw:
+ *
+ * DataSetWriter
+ * -------------
+ * The DataSetWriters are the glue between the WriterGroups and the
+ * PublishedDataSets. The DataSetWriter contain configuration parameters and
+ * flags which influence the creation of DataSet messages. These messages are
+ * encapsulated inside the network message. The DataSetWriter must be linked
+ * with an existing PublishedDataSet and be contained within a WriterGroup. */
+
+typedef struct {
+    UA_String name;
+    UA_UInt16 dataSetWriterId;
+    UA_DataSetFieldContentMask dataSetFieldContentMask;
+    UA_UInt32 keyFrameCount;
+    UA_ExtensionObject messageSettings;
+    UA_String dataSetName;
+    size_t dataSetWriterPropertiesSize;
+    UA_KeyValuePair *dataSetWriterProperties;
+} UA_DataSetWriterConfig;
+
+void
+UA_DataSetWriterConfig_deleteMembers(UA_DataSetWriterConfig *pdsConfig);
+
+/* Add a new DataSetWriter to a existing WriterGroup. The DataSetWriter must be
+ * coupled with a PublishedDataSet on creation.
+ *
+ * Part 14, 7.1.5.2.1 defines: The link between the PublishedDataSet and
+ * DataSetWriter shall be created when an instance of the DataSetWriterType is
+ * created. */
+UA_StatusCode
+UA_Server_addDataSetWriter(UA_Server *server,
+                           const UA_NodeId writerGroup, const UA_NodeId dataSet,
+                           const UA_DataSetWriterConfig *dataSetWriterConfig,
+                           UA_NodeId *writerIdentifier);
+
+/* Returns a deep copy of the config */
+UA_StatusCode
+UA_Server_getDataSetWriterConfig(UA_Server *server, const UA_NodeId dsw,
+                                 UA_DataSetWriterConfig *config);
+
+UA_StatusCode
+UA_Server_removeDataSetWriter(UA_Server *server, const UA_NodeId dsw);
+    
 #ifdef __cplusplus
 } // extern "C"
 #endif

+ 448 - 36
src/pubsub/ua_pubsub.c

@@ -13,8 +13,10 @@
 /**********************************************/
 /*               Connection                   */
 /**********************************************/
+
 UA_StatusCode
-UA_PubSubConnectionConfig_copy(const UA_PubSubConnectionConfig *src, UA_PubSubConnectionConfig *dst) {
+UA_PubSubConnectionConfig_copy(const UA_PubSubConnectionConfig *src,
+                               UA_PubSubConnectionConfig *dst) {
     UA_StatusCode retVal = UA_STATUSCODE_GOOD;
     memcpy(dst, src, sizeof(UA_PubSubConnectionConfig));
     retVal |= UA_String_copy(&src->name, &dst->name);
@@ -22,30 +24,29 @@ UA_PubSubConnectionConfig_copy(const UA_PubSubConnectionConfig *src, UA_PubSubCo
     retVal |= UA_String_copy(&src->transportProfileUri, &dst->transportProfileUri);
     retVal |= UA_Variant_copy(&src->connectionTransportSettings, &dst->connectionTransportSettings);
     if(src->connectionPropertiesSize > 0){
-        dst->connectionProperties = (UA_KeyValuePair *) UA_calloc(src->connectionPropertiesSize, sizeof(UA_KeyValuePair));
+        dst->connectionProperties = (UA_KeyValuePair *)
+            UA_calloc(src->connectionPropertiesSize, sizeof(UA_KeyValuePair));
         if(!dst->connectionProperties){
             return UA_STATUSCODE_BADOUTOFMEMORY;
         }
         for(size_t i = 0; i < src->connectionPropertiesSize; i++){
-            retVal |= UA_QualifiedName_copy(&src->connectionProperties[i].key, &dst->connectionProperties[i].key);
-            retVal |= UA_Variant_copy(&src->connectionProperties[i].value, &dst->connectionProperties[i].value);
+            retVal |= UA_QualifiedName_copy(&src->connectionProperties[i].key,
+                                            &dst->connectionProperties[i].key);
+            retVal |= UA_Variant_copy(&src->connectionProperties[i].value,
+                                      &dst->connectionProperties[i].value);
         }
     }
     return retVal;
 }
 
-/**
- * Get the current config of the Connection.
- *
- * @return UA_STATUSCODE_GOOD on success
- */
 UA_StatusCode
-UA_PubSubConnection_getConfig(UA_Server *server, UA_NodeId connectionIdentifier,
-                              UA_PubSubConnectionConfig *config) {
+UA_Server_getPubSubConnectionConfig(UA_Server *server, const UA_NodeId connection,
+                                    UA_PubSubConnectionConfig *config) {
     if(!config)
         return UA_STATUSCODE_BADINVALIDARGUMENT;
 
-    UA_PubSubConnection *currentPubSubConnection = UA_PubSubConnection_findConnectionbyId(server, connectionIdentifier);
+    UA_PubSubConnection *currentPubSubConnection =
+        UA_PubSubConnection_findConnectionbyId(server, connection);
     if(!currentPubSubConnection)
         return UA_STATUSCODE_BADNOTFOUND;
 
@@ -56,11 +57,6 @@ UA_PubSubConnection_getConfig(UA_Server *server, UA_NodeId connectionIdentifier,
     return UA_STATUSCODE_GOOD;
 }
 
-/**
- * Find a Connection by the connectionIdentifier.
- *
- * @return ptr to Connection or NULL if not found
- */
 UA_PubSubConnection *
 UA_PubSubConnection_findConnectionbyId(UA_Server *server, UA_NodeId connectionIdentifier) {
     for(size_t i = 0; i < server->pubSubManager.connectionsSize; i++){
@@ -85,9 +81,14 @@ UA_PubSubConnectionConfig_deleteMembers(UA_PubSubConnectionConfig *connectionCon
 }
 
 void
-UA_PubSubConnection_delete(UA_PubSubConnection *connection) {
+UA_PubSubConnection_deleteMembers(UA_Server *server, UA_PubSubConnection *connection) {
     //delete connection config
     UA_PubSubConnectionConfig_deleteMembers(connection->config);
+    //remove contained WriterGroups
+    UA_WriterGroup *writerGroup, *tmpWriterGroup;
+    LIST_FOREACH_SAFE(writerGroup, &connection->writerGroups, listEntry, tmpWriterGroup){
+        UA_Server_removeWriterGroup(server, writerGroup->identifier);
+    }
     UA_NodeId_deleteMembers(&connection->identifier);
     if(connection->channel){
         connection->channel->close(connection->channel);
@@ -95,6 +96,53 @@ UA_PubSubConnection_delete(UA_PubSubConnection *connection) {
     UA_free(connection->config);
 }
 
+UA_StatusCode
+UA_Server_addWriterGroup(UA_Server *server, const UA_NodeId connection,
+                         const UA_WriterGroupConfig *writerGroupConfig,
+                         UA_NodeId *writerGroupIdentifier) {
+    UA_StatusCode retVal = UA_STATUSCODE_GOOD;
+    if(!writerGroupConfig)
+        return UA_STATUSCODE_BADINVALIDARGUMENT;
+    //search the connection by the given connectionIdentifier
+    UA_PubSubConnection *currentConnectionContext =
+        UA_PubSubConnection_findConnectionbyId(server, connection);
+    if(!currentConnectionContext)
+        return UA_STATUSCODE_BADNOTFOUND;
+
+    //allocate memory for new WriterGroup
+    UA_WriterGroup *newWriterGroup = (UA_WriterGroup *) UA_calloc(1, sizeof(UA_WriterGroup));
+    if (!newWriterGroup)
+        return UA_STATUSCODE_BADOUTOFMEMORY;
+
+    newWriterGroup->linkedConnection = currentConnectionContext->identifier;
+    UA_PubSubManager_generateUniqueNodeId(server, &newWriterGroup->identifier);
+    if(writerGroupIdentifier){
+        UA_NodeId_copy(&newWriterGroup->identifier, writerGroupIdentifier);
+    }
+    UA_WriterGroupConfig tmpWriterGroupConfig;
+    //deep copy of the config
+    retVal |= UA_WriterGroupConfig_copy(writerGroupConfig, &tmpWriterGroupConfig);
+    newWriterGroup->config = tmpWriterGroupConfig;
+    LIST_INSERT_HEAD(&currentConnectionContext->writerGroups, newWriterGroup, listEntry);
+    return retVal;
+}
+
+UA_StatusCode
+UA_Server_removeWriterGroup(UA_Server *server, const UA_NodeId writerGroup){
+    UA_WriterGroup *wg = UA_WriterGroup_findWGbyId(server, writerGroup);
+    if(!wg)
+        return UA_STATUSCODE_BADNOTFOUND;
+
+    UA_PubSubConnection *connection =
+        UA_PubSubConnection_findConnectionbyId(server, wg->linkedConnection);
+    if(!connection)
+        return UA_STATUSCODE_BADNOTFOUND;
+
+    UA_WriterGroup_deleteMembers(server, wg);
+    UA_free(wg);
+    return UA_STATUSCODE_GOOD;
+}
+
 /**********************************************/
 /*               PublishedDataSet             */
 /**********************************************/
@@ -115,9 +163,11 @@ UA_PublishedDataSetConfig_copy(const UA_PublishedDataSetConfig *src,
                         src->config.itemsTemplate.variablesToAddSize, sizeof(UA_PublishedVariableDataType));
             }
             for(size_t i = 0; i < src->config.itemsTemplate.variablesToAddSize; i++){
-                retVal |= UA_PublishedVariableDataType_copy(&src->config.itemsTemplate.variablesToAdd[i], &dst->config.itemsTemplate.variablesToAdd[i]);
+                retVal |= UA_PublishedVariableDataType_copy(&src->config.itemsTemplate.variablesToAdd[i],
+                                                            &dst->config.itemsTemplate.variablesToAdd[i]);
             }
-            retVal |= UA_DataSetMetaDataType_copy(&src->config.itemsTemplate.metaData, &dst->config.itemsTemplate.metaData);
+            retVal |= UA_DataSetMetaDataType_copy(&src->config.itemsTemplate.metaData,
+                                                  &dst->config.itemsTemplate.metaData);
             break;
         default:
             return UA_STATUSCODE_BADINVALIDARGUMENT;
@@ -125,19 +175,13 @@ UA_PublishedDataSetConfig_copy(const UA_PublishedDataSetConfig *src,
     return retVal;
 }
 
-/**
- * Get the current config of the PublishedDataSetField.
- *
- * @return UA_STATUSCODE_GOOD on success
- */
 UA_StatusCode
-UA_PublishedDataSet_getConfig(UA_Server *server, UA_NodeId publishedDataSetIdentifier,
-                              UA_PublishedDataSetConfig *config){
+UA_Server_getPublishedDataSetConfig(UA_Server *server, const UA_NodeId pds,
+                                    UA_PublishedDataSetConfig *config){
     if(!config)
         return UA_STATUSCODE_BADINVALIDARGUMENT;
 
-    UA_PublishedDataSet *currentPublishedDataSet = UA_PublishedDataSet_findPDSbyId(server,
-                                                                                         publishedDataSetIdentifier);
+    UA_PublishedDataSet *currentPublishedDataSet = UA_PublishedDataSet_findPDSbyId(server, pds);
     if(!currentPublishedDataSet)
         return UA_STATUSCODE_BADNOTFOUND;
 
@@ -148,11 +192,6 @@ UA_PublishedDataSet_getConfig(UA_Server *server, UA_NodeId publishedDataSetIdent
     return UA_STATUSCODE_GOOD;
 }
 
-/**
- * Get PDS by the dataSetIdentifier.
- *
- * @return ptr to PDS or NULL if not found
- */
 UA_PublishedDataSet *
 UA_PublishedDataSet_findPDSbyId(UA_Server *server, UA_NodeId identifier){
     for(size_t i = 0; i < server->pubSubManager.publishedDataSetsSize; i++){
@@ -163,7 +202,8 @@ UA_PublishedDataSet_findPDSbyId(UA_Server *server, UA_NodeId identifier){
     return NULL;
 }
 
-void UA_PublishedDataSetConfig_deleteMembers(UA_PublishedDataSetConfig *pdsConfig){
+void
+UA_PublishedDataSetConfig_deleteMembers(UA_PublishedDataSetConfig *pdsConfig){
     //delete pds config
     UA_String_deleteMembers(&pdsConfig->name);
     switch (pdsConfig->publishedDataSetType){
@@ -184,8 +224,380 @@ void UA_PublishedDataSetConfig_deleteMembers(UA_PublishedDataSetConfig *pdsConfi
     }
 }
 
-void UA_PublishedDataSet_delete(UA_PublishedDataSet *publishedDataSet){
+void
+UA_PublishedDataSet_deleteMembers(UA_Server *server, UA_PublishedDataSet *publishedDataSet){
     UA_PublishedDataSetConfig_deleteMembers(&publishedDataSet->config);
     //delete PDS
     UA_DataSetMetaDataType_deleteMembers(&publishedDataSet->dataSetMetaData);
+    UA_DataSetField *field, *tmpField;
+    LIST_FOREACH_SAFE(field, &publishedDataSet->fields, listEntry, tmpField) {
+        UA_Server_removeDataSetField(server, field->identifier);
+    }
+    UA_NodeId_deleteMembers(&publishedDataSet->identifier);
+}
+
+UA_DataSetFieldResult
+UA_Server_addDataSetField(UA_Server *server, const UA_NodeId publishedDataSet,
+                          const UA_DataSetFieldConfig *fieldConfig,
+                          UA_NodeId *fieldIdentifier) {
+    UA_StatusCode retVal = UA_STATUSCODE_GOOD;
+    if(!fieldConfig)
+        return (UA_DataSetFieldResult) {UA_STATUSCODE_BADINVALIDARGUMENT, {0, 0}};
+
+    UA_PublishedDataSet *currentDataSet = UA_PublishedDataSet_findPDSbyId(server, publishedDataSet);
+    if(currentDataSet == NULL)
+        return (UA_DataSetFieldResult) {UA_STATUSCODE_BADNOTFOUND, {0, 0}};
+
+    if(currentDataSet->config.publishedDataSetType != UA_PUBSUB_DATASET_PUBLISHEDITEMS)
+        return (UA_DataSetFieldResult) {UA_STATUSCODE_BADNOTIMPLEMENTED, {0, 0}};
+
+    UA_DataSetField *newField = (UA_DataSetField *) UA_calloc(1, sizeof(UA_DataSetField));
+    if(!newField)
+        return (UA_DataSetFieldResult) {UA_STATUSCODE_BADINTERNALERROR, {0, 0}};
+
+    UA_DataSetFieldConfig tmpFieldConfig;
+    retVal |= UA_DataSetFieldConfig_copy(fieldConfig, &tmpFieldConfig);
+    newField->config = tmpFieldConfig;
+    UA_PubSubManager_generateUniqueNodeId(server, &newField->identifier);
+    if(fieldIdentifier != NULL){
+        UA_NodeId_copy(&newField->identifier, fieldIdentifier);
+    }
+    newField->publishedDataSet = currentDataSet->identifier;
+    //update major version of parent published data set
+    currentDataSet->dataSetMetaData.configurationVersion.majorVersion = UA_PubSubConfigurationVersionTimeDifference();
+    LIST_INSERT_HEAD(&currentDataSet->fields, newField, listEntry);
+    if(newField->config.field.variable.promotedField)
+        currentDataSet->promotedFieldsCount++;
+    currentDataSet->fieldSize++;
+    UA_DataSetFieldResult result =
+        {retVal, {currentDataSet->dataSetMetaData.configurationVersion.majorVersion,
+                  currentDataSet->dataSetMetaData.configurationVersion.minorVersion}};
+    return result;
+}
+
+UA_DataSetFieldResult
+UA_Server_removeDataSetField(UA_Server *server, const UA_NodeId dsf) {
+    UA_DataSetField *currentField = UA_DataSetField_findDSFbyId(server, dsf);
+    if(!currentField)
+        return (UA_DataSetFieldResult) {UA_STATUSCODE_BADNOTFOUND, {0, 0}};
+
+    UA_PublishedDataSet *parentPublishedDataSet =
+        UA_PublishedDataSet_findPDSbyId(server, currentField->publishedDataSet);
+    if(!parentPublishedDataSet)
+        return (UA_DataSetFieldResult) {UA_STATUSCODE_BADNOTFOUND, {0, 0}};
+
+    parentPublishedDataSet->fieldSize--;
+    if(currentField->config.field.variable.promotedField)
+        parentPublishedDataSet->promotedFieldsCount--;
+    
+    /* update major version of PublishedDataSet */
+    parentPublishedDataSet->dataSetMetaData.configurationVersion.majorVersion =
+        UA_PubSubConfigurationVersionTimeDifference();
+    UA_DataSetField_deleteMembers(currentField);
+    UA_free(currentField);
+    UA_DataSetFieldResult result =
+        {UA_STATUSCODE_GOOD, {parentPublishedDataSet->dataSetMetaData.configurationVersion.majorVersion,
+                              parentPublishedDataSet->dataSetMetaData.configurationVersion.minorVersion}};
+    return result;
+}
+
+/**********************************************/
+/*               DataSetWriter                */
+/**********************************************/
+
+UA_StatusCode
+UA_DataSetWriterConfig_copy(const UA_DataSetWriterConfig *src,
+                            UA_DataSetWriterConfig *dst){
+    UA_StatusCode retVal = UA_STATUSCODE_GOOD;
+    memcpy(dst, src, sizeof(UA_DataSetWriterConfig));
+    retVal |= UA_String_copy(&src->name, &dst->name);
+    retVal |= UA_String_copy(&src->dataSetName, &dst->dataSetName);
+    retVal |= UA_ExtensionObject_copy(&src->messageSettings, &dst->messageSettings);
+    dst->dataSetWriterProperties = (UA_KeyValuePair *)
+        UA_calloc(src->dataSetWriterPropertiesSize, sizeof(UA_KeyValuePair));
+    if(!dst->dataSetWriterProperties)
+        return UA_STATUSCODE_BADOUTOFMEMORY;
+    for(size_t i = 0; i < src->dataSetWriterPropertiesSize; i++){
+        retVal |= UA_KeyValuePair_copy(&src->dataSetWriterProperties[i], &dst->dataSetWriterProperties[i]);
+    }
+    return retVal;
+}
+
+UA_StatusCode
+UA_Server_getDataSetWriterConfig(UA_Server *server, const UA_NodeId dsw,
+                                 UA_DataSetWriterConfig *config){
+    UA_StatusCode retVal = UA_STATUSCODE_GOOD;
+    if(!config)
+        return UA_STATUSCODE_BADINVALIDARGUMENT;
+
+    UA_DataSetWriter *currentDataSetWriter = UA_DataSetWriter_findDSWbyId(server, dsw);
+    if(!currentDataSetWriter)
+        return UA_STATUSCODE_BADNOTFOUND;
+
+    UA_DataSetWriterConfig tmpWriterConfig;
+    //deep copy of the actual config
+    retVal |= UA_DataSetWriterConfig_copy(&currentDataSetWriter->config, &tmpWriterConfig);
+    *config = tmpWriterConfig;
+    return retVal;
+}
+
+UA_DataSetWriter *
+UA_DataSetWriter_findDSWbyId(UA_Server *server, UA_NodeId identifier) {
+    for(size_t i = 0; i < server->pubSubManager.connectionsSize; i++){
+        UA_WriterGroup *tmpWriterGroup;
+        LIST_FOREACH(tmpWriterGroup, &server->pubSubManager.connections[i].writerGroups, listEntry){
+            UA_DataSetWriter *tmpWriter;
+            LIST_FOREACH(tmpWriter, &tmpWriterGroup->writers, listEntry){
+                if(UA_NodeId_equal(&tmpWriter->identifier, &identifier)){
+                    return tmpWriter;
+                }
+            }
+        }
+    }
+    return NULL;
+}
+
+void
+UA_DataSetWriterConfig_deleteMembers(UA_DataSetWriterConfig *pdsConfig) {
+    UA_String_deleteMembers(&pdsConfig->name);
+    UA_String_deleteMembers(&pdsConfig->dataSetName);
+    for(size_t i = 0; i < pdsConfig->dataSetWriterPropertiesSize; i++){
+        UA_KeyValuePair_deleteMembers(&pdsConfig->dataSetWriterProperties[i]);
+    }
+    UA_free(pdsConfig->dataSetWriterProperties);
+    UA_ExtensionObject_deleteMembers(&pdsConfig->messageSettings);
+}
+
+void
+UA_DataSetWriter_deleteMembers(UA_Server *server, UA_DataSetWriter *dataSetWriter){
+    UA_DataSetWriterConfig_deleteMembers(&dataSetWriter->config);
+    //delete DataSetWriter
+    UA_NodeId_deleteMembers(&dataSetWriter->identifier);
+    UA_NodeId_deleteMembers(&dataSetWriter->linkedWriterGroup);
+    UA_NodeId_deleteMembers(&dataSetWriter->connectedDataSet);
+    LIST_REMOVE(dataSetWriter, listEntry);
+    //delete lastSamples store
+    for(size_t i = 0; i < dataSetWriter->lastSamplesCount; i++){
+        UA_DataValue_delete(dataSetWriter->lastSamples[i].value);
+    }
+    LIST_REMOVE(dataSetWriter, listEntry);
+    UA_free(dataSetWriter->lastSamples);
+}
+
+/**********************************************/
+/*               WriterGroup                  */
+/**********************************************/
+
+UA_StatusCode
+UA_WriterGroupConfig_copy(const UA_WriterGroupConfig *src,
+                          UA_WriterGroupConfig *dst){
+    UA_StatusCode retVal = UA_STATUSCODE_GOOD;
+    memcpy(dst, src, sizeof(UA_WriterGroupConfig));
+    retVal |= UA_String_copy(&src->name, &dst->name);
+    retVal |= UA_ExtensionObject_copy(&src->transportSettings, &dst->transportSettings);
+    retVal |= UA_ExtensionObject_copy(&src->messageSettings, &dst->messageSettings);
+    dst->groupProperties = (UA_KeyValuePair *) UA_calloc(src->groupPropertiesSize, sizeof(UA_KeyValuePair));
+    if(!dst->groupProperties)
+        return UA_STATUSCODE_BADOUTOFMEMORY;
+    for(size_t i = 0; i < src->groupPropertiesSize; i++){
+        retVal |= UA_KeyValuePair_copy(&src->groupProperties[i], &dst->groupProperties[i]);
+    }
+    return retVal;
+}
+
+UA_StatusCode
+UA_Server_getWriterGroupConfig(UA_Server *server, const UA_NodeId writerGroup,
+                               UA_WriterGroupConfig *config){
+    UA_StatusCode retVal = UA_STATUSCODE_GOOD;
+    if(!config)
+        return UA_STATUSCODE_BADINVALIDARGUMENT;
+
+    UA_WriterGroup *currentWriterGroup = UA_WriterGroup_findWGbyId(server, writerGroup);
+    if(!currentWriterGroup){
+        return UA_STATUSCODE_BADNOTFOUND;
+    }
+    UA_WriterGroupConfig tmpWriterGroupConfig;
+    //deep copy of the actual config
+    retVal |= UA_WriterGroupConfig_copy(&currentWriterGroup->config, &tmpWriterGroupConfig);
+    *config = tmpWriterGroupConfig;
+    return retVal;
+}
+
+UA_WriterGroup *
+UA_WriterGroup_findWGbyId(UA_Server *server, UA_NodeId identifier){
+    for(size_t i = 0; i < server->pubSubManager.connectionsSize; i++){
+        UA_WriterGroup *tmpWriterGroup;
+        LIST_FOREACH(tmpWriterGroup, &server->pubSubManager.connections[i].writerGroups, listEntry) {
+            if(UA_NodeId_equal(&identifier, &tmpWriterGroup->identifier)){
+                return tmpWriterGroup;
+            }
+        }
+    }
+    return NULL;
+}
+
+void
+UA_WriterGroupConfig_deleteMembers(UA_WriterGroupConfig *writerGroupConfig){
+    //delete writerGroup config
+    UA_String_deleteMembers(&writerGroupConfig->name);
+    UA_ExtensionObject_deleteMembers(&writerGroupConfig->transportSettings);
+    UA_ExtensionObject_deleteMembers(&writerGroupConfig->messageSettings);
+    for(size_t i = 0; i < writerGroupConfig->groupPropertiesSize; i++){
+        UA_KeyValuePair_deleteMembers(&writerGroupConfig->groupProperties[i]);
+    }
+    UA_free(writerGroupConfig->groupProperties);
+}
+
+void
+UA_WriterGroup_deleteMembers(UA_Server *server, UA_WriterGroup *writerGroup) {
+    UA_WriterGroupConfig_deleteMembers(&writerGroup->config);
+    //delete WriterGroup
+    //delete all writers. Therefore removeDataSetWriter is called from PublishedDataSet
+    UA_DataSetWriter *dataSetWriter, *tmpDataSetWriter;
+    LIST_FOREACH_SAFE(dataSetWriter, &writerGroup->writers, listEntry, tmpDataSetWriter){
+        UA_Server_removeDataSetWriter(server, dataSetWriter->identifier);
+    }
+    LIST_REMOVE(writerGroup, listEntry);
+    UA_NodeId_deleteMembers(&writerGroup->linkedConnection);
+    UA_NodeId_deleteMembers(&writerGroup->identifier);
+}
+
+UA_StatusCode
+UA_Server_addDataSetWriter(UA_Server *server,
+                           const UA_NodeId writerGroup, const UA_NodeId dataSet,
+                           const UA_DataSetWriterConfig *dataSetWriterConfig,
+                           UA_NodeId *writerIdentifier) {
+    UA_StatusCode retVal = UA_STATUSCODE_GOOD;
+    if(!dataSetWriterConfig)
+        return UA_STATUSCODE_BADINVALIDARGUMENT;
+
+    UA_PublishedDataSet *currentDataSetContext = UA_PublishedDataSet_findPDSbyId(server, dataSet);
+    if(!currentDataSetContext)
+        return UA_STATUSCODE_BADNOTFOUND;
+
+    UA_WriterGroup *wg = UA_WriterGroup_findWGbyId(server, writerGroup);
+    if(!wg)
+        return UA_STATUSCODE_BADNOTFOUND;
+
+    UA_DataSetWriter *newDataSetWriter = (UA_DataSetWriter *) UA_calloc(1, sizeof(UA_DataSetWriter));
+    if(!newDataSetWriter)
+        return UA_STATUSCODE_BADOUTOFMEMORY;
+
+    //copy the config into the new dataSetWriter
+    UA_DataSetWriterConfig tmpDataSetWriterConfig;
+    retVal |= UA_DataSetWriterConfig_copy(dataSetWriterConfig, &tmpDataSetWriterConfig);
+    newDataSetWriter->config = tmpDataSetWriterConfig;
+    //save the current version of the connected PublishedDataSet
+    newDataSetWriter->connectedDataSetVersion = currentDataSetContext->dataSetMetaData.configurationVersion;
+    //initialize the queue for the last values
+    newDataSetWriter->lastSamplesCount = currentDataSetContext->fieldSize;
+    newDataSetWriter->lastSamples = (UA_DataSetWriterSample * )
+        UA_calloc(newDataSetWriter->lastSamplesCount, sizeof(UA_DataSetWriterSample));
+    if(!newDataSetWriter->lastSamples) {
+        UA_DataSetWriterConfig_deleteMembers(&newDataSetWriter->config);
+        UA_free(newDataSetWriter);
+        return UA_STATUSCODE_BADOUTOFMEMORY;
+    }
+    for(size_t i = 0; i < newDataSetWriter->lastSamplesCount; i++) {
+        newDataSetWriter->lastSamples[i].value = (UA_DataValue *) UA_calloc(1, sizeof(UA_DataValue));
+        if(!newDataSetWriter->lastSamples[i].value) {
+            for(size_t j = 0; j < i; j++)
+                UA_free(newDataSetWriter->lastSamples[j].value);
+            UA_DataSetWriterConfig_deleteMembers(&newDataSetWriter->config);
+            UA_free(newDataSetWriter);
+            return UA_STATUSCODE_BADOUTOFMEMORY;
+        }
+    }
+    //connect PublishedDataSet with DataSetWriter
+    newDataSetWriter->connectedDataSet = currentDataSetContext->identifier;
+    newDataSetWriter->linkedWriterGroup = wg->identifier;
+    UA_PubSubManager_generateUniqueNodeId(server, &newDataSetWriter->identifier);
+    if(writerIdentifier != NULL)
+        UA_NodeId_copy(&newDataSetWriter->identifier, writerIdentifier);
+    //add the new writer to the group
+    LIST_INSERT_HEAD(&wg->writers, newDataSetWriter, listEntry);
+    wg->writersCount++;
+    return retVal;
+}
+
+UA_StatusCode
+UA_Server_removeDataSetWriter(UA_Server *server, const UA_NodeId dsw){
+    UA_DataSetWriter *dataSetWriter = UA_DataSetWriter_findDSWbyId(server, dsw);
+    if(!dataSetWriter)
+        return UA_STATUSCODE_BADNOTFOUND;
+
+    UA_WriterGroup *linkedWriterGroup = UA_WriterGroup_findWGbyId(server, dataSetWriter->linkedWriterGroup);
+    if(!linkedWriterGroup)
+        return UA_STATUSCODE_BADNOTFOUND;
+
+    linkedWriterGroup->writersCount--;
+    //remove DataSetWriter from group
+    UA_DataSetWriter_deleteMembers(server, dataSetWriter);
+    UA_free(dataSetWriter);
+    return UA_STATUSCODE_GOOD;
+}
+
+/**********************************************/
+/*                DataSetField                */
+/**********************************************/
+
+UA_StatusCode
+UA_DataSetFieldConfig_copy(const UA_DataSetFieldConfig *src, UA_DataSetFieldConfig *dst){
+    memcpy(dst, src, sizeof(UA_DataSetFieldConfig));
+    if(src->dataSetFieldType == UA_PUBSUB_DATASETFIELD_VARIABLE) {
+        UA_String_copy(&src->field.variable.fieldNameAlias, &dst->field.variable.fieldNameAlias);
+        UA_PublishedVariableDataType_copy(&src->field.variable.publishParameters,
+                                          &dst->field.variable.publishParameters);
+    } else {
+        return UA_STATUSCODE_BADNOTSUPPORTED;
+    }
+
+    return UA_STATUSCODE_GOOD;
+}
+
+UA_StatusCode
+UA_Server_getDataSetFieldConfig(UA_Server *server, const UA_NodeId dsf,
+                                UA_DataSetFieldConfig *config) {
+    UA_StatusCode retVal = UA_STATUSCODE_GOOD;
+    if(!config)
+        return UA_STATUSCODE_BADINVALIDARGUMENT;
+    UA_DataSetField *currentDataSetField = UA_DataSetField_findDSFbyId(server, dsf);
+    if(!currentDataSetField)
+        return UA_STATUSCODE_BADNOTFOUND;
+    UA_DataSetFieldConfig tmpFieldConfig;
+    //deep copy of the actual config
+    retVal |= UA_DataSetFieldConfig_copy(&currentDataSetField->config, &tmpFieldConfig);
+    *config = tmpFieldConfig;
+    return retVal;
+}
+
+UA_DataSetField *
+UA_DataSetField_findDSFbyId(UA_Server *server, UA_NodeId identifier) {
+    for(size_t i = 0; i < server->pubSubManager.publishedDataSetsSize; i++){
+        UA_DataSetField *tmpField;
+        LIST_FOREACH(tmpField, &server->pubSubManager.publishedDataSets[i].fields, listEntry){
+            if(UA_NodeId_equal(&tmpField->identifier, &identifier)){
+                return tmpField;
+            }
+        }
+    }
+    return NULL;
+}
+
+void
+UA_DataSetFieldConfig_deleteMembers(UA_DataSetFieldConfig *dataSetFieldConfig){
+    if(dataSetFieldConfig->dataSetFieldType == UA_PUBSUB_DATASETFIELD_VARIABLE){
+        UA_String_deleteMembers(&dataSetFieldConfig->field.variable.fieldNameAlias);
+        UA_PublishedVariableDataType_deleteMembers(&dataSetFieldConfig->field.variable.publishParameters);
+    }
+}
+
+void UA_DataSetField_deleteMembers(UA_DataSetField *field) {
+    UA_DataSetFieldConfig_deleteMembers(&field->config);
+    //delete DataSetField
+    UA_NodeId_deleteMembers(&field->identifier);
+    UA_NodeId_deleteMembers(&field->publishedDataSet);
+    UA_FieldMetaData_deleteMembers(&field->fieldMetaData);
+    UA_DataValue_deleteMembers(&field->lastValue);
+    LIST_REMOVE(field, listEntry);
 }

+ 84 - 3
src/pubsub/ua_pubsub.h

@@ -18,6 +18,10 @@ extern "C" {
 #include "ua_server.h"
 #include "ua_server_pubsub.h"
 
+//forward declarations
+struct UA_WriterGroup;
+typedef struct UA_WriterGroup UA_WriterGroup;
+
 /* The configuration structs (public part of PubSub entities) are defined in include/ua_plugin_pubsub.h */
 
 /**********************************************/
@@ -26,7 +30,7 @@ extern "C" {
 typedef struct{
     UA_PublishedDataSetConfig config;
     UA_DataSetMetaDataType dataSetMetaData;
-    LIST_HEAD(UA_ListOfPubSubDataSetField, UA_PubSubDataSetField) fields;
+    LIST_HEAD(UA_ListOfDataSetField, UA_DataSetField) fields;
     UA_NodeId identifier;
     UA_UInt16 fieldSize;
     UA_UInt16 promotedFieldsCount;
@@ -37,7 +41,7 @@ UA_PublishedDataSetConfig_copy(const UA_PublishedDataSetConfig *src, UA_Publishe
 UA_PublishedDataSet *
 UA_PublishedDataSet_findPDSbyId(UA_Server *server, UA_NodeId identifier);
 void
-UA_PublishedDataSet_delete(UA_PublishedDataSet *publishedDataSet);
+UA_PublishedDataSet_deleteMembers(UA_Server *server, UA_PublishedDataSet *publishedDataSet);
 
 /**********************************************/
 /*               Connection                   */
@@ -48,6 +52,7 @@ typedef struct{
     //internal fields
     UA_PubSubChannel *channel;
     UA_NodeId identifier;
+    LIST_HEAD(UA_ListOfWriterGroup, UA_WriterGroup) writerGroups;
 } UA_PubSubConnection;
 
 UA_StatusCode
@@ -57,7 +62,83 @@ UA_PubSubConnection_findConnectionbyId(UA_Server *server, UA_NodeId connectionId
 void
 UA_PubSubConnectionConfig_deleteMembers(UA_PubSubConnectionConfig *connectionConfig);
 void
-UA_PubSubConnection_delete(UA_PubSubConnection *connection);
+UA_PubSubConnection_deleteMembers(UA_Server *server, UA_PubSubConnection *connection);
+
+/**********************************************/
+/*              DataSetWriter                 */
+/**********************************************/
+
+typedef struct UA_DataSetWriterSample{
+    UA_Boolean valeChanged;
+    UA_DataValue *value;
+} UA_DataSetWriterSample;
+
+typedef struct UA_DataSetWriter{
+    UA_DataSetWriterConfig config;
+    //internal fields
+    LIST_ENTRY(UA_DataSetWriter) listEntry;
+    UA_NodeId identifier;
+    UA_NodeId linkedWriterGroup;
+    UA_NodeId connectedDataSet;
+    UA_ConfigurationVersionDataType connectedDataSetVersion;
+    UA_UInt16 deltaFrameCounter;            //actual count of sent deltaFrames
+    size_t lastSamplesCount;
+    UA_DataSetWriterSample *lastSamples;
+    UA_UInt16 actualDataSetMessageSequenceCount;
+} UA_DataSetWriter;
+
+UA_StatusCode
+UA_DataSetWriterConfig_copy(const UA_DataSetWriterConfig *src, UA_DataSetWriterConfig *dst);
+UA_DataSetWriter *
+UA_DataSetWriter_findDSWbyId(UA_Server *server, UA_NodeId identifier);
+void
+UA_DataSetWriter_deleteMembers(UA_Server *server, UA_DataSetWriter *dataSetWriter);
+
+/**********************************************/
+/*               WriterGroup                  */
+/**********************************************/
+
+struct UA_WriterGroup{
+    UA_WriterGroupConfig config;
+    //internal fields
+    LIST_ENTRY(UA_WriterGroup) listEntry;
+    UA_NodeId identifier;
+    UA_NodeId linkedConnection;
+    LIST_HEAD(UA_ListOfDataSetWriter, UA_DataSetWriter) writers;
+    UA_UInt32 writersCount;
+    UA_UInt64 publishCallbackId;
+    UA_Boolean publishCallbackIsRegistered;
+};
+
+UA_StatusCode
+UA_WriterGroupConfig_copy(const UA_WriterGroupConfig *src, UA_WriterGroupConfig *dst);
+UA_WriterGroup *
+UA_WriterGroup_findWGbyId(UA_Server *server, UA_NodeId identifier);
+void
+UA_WriterGroup_deleteMembers(UA_Server *server, UA_WriterGroup *writerGroup);
+
+/**********************************************/
+/*               DataSetField                 */
+/**********************************************/
+
+typedef struct UA_DataSetField{
+    UA_DataSetFieldConfig config;
+    //internal fields
+    LIST_ENTRY(UA_DataSetField) listEntry;
+    UA_NodeId identifier;
+    UA_NodeId publishedDataSet;             //ref to parent pds
+    UA_FieldMetaData fieldMetaData;
+    UA_UInt64 sampleCallbackId;
+    UA_Boolean sampleCallbackIsRegistered;
+    UA_DataValue lastValue;
+} UA_DataSetField;
+
+UA_StatusCode
+UA_DataSetFieldConfig_copy(const UA_DataSetFieldConfig *src, UA_DataSetFieldConfig *dst);
+UA_DataSetField *
+UA_DataSetField_findDSFbyId(UA_Server *server, UA_NodeId identifier);
+void
+UA_DataSetField_deleteMembers(UA_DataSetField *field);
 
 #ifdef __cplusplus
 } // extern "C"

+ 61 - 60
src/pubsub/ua_pubsub_manager.c

@@ -10,14 +10,6 @@
 
 #define UA_DATETIMESTAMP_2000 125911584000000000
 
-/**
- * Add new Connection to the current PubSub configuration.
- *
- * @param server
- * @param connectionConfig config of the new Connection
- * @param connectionIdentifier nodeId of the generated Connection (NULL if not needed)
- * @return UA_STATUSCODE_GOOD if success
- */
 UA_StatusCode
 UA_Server_addPubSubConnection(UA_Server *server, const UA_PubSubConnectionConfig *connectionConfig,
                               UA_NodeId *connectionIdentifier) {
@@ -53,10 +45,17 @@ UA_Server_addPubSubConnection(UA_Server *server, const UA_PubSubConnectionConfig
             server->pubSubManager.connections = newConnectionsField;
             UA_PubSubConnection *newConnection = &server->pubSubManager.connections[server->pubSubManager.connectionsSize];
             memset(newConnection, 0, sizeof(UA_PubSubConnection));
+            LIST_INIT(&newConnection->writerGroups);
+            //workaround - fixing issue with queue.h and realloc.
+            for(size_t n = 0; n < server->pubSubManager.connectionsSize; n++){
+                if(server->pubSubManager.connections[n].writerGroups.lh_first){
+                    server->pubSubManager.connections[n].writerGroups.lh_first->listEntry.le_prev = &server->pubSubManager.connections[n].writerGroups.lh_first;
+                }
+            }
             newConnection->config = tmpConnectionConfig;
             newConnection->channel = server->config.pubsubTransportLayers[i].createPubSubChannel(newConnection->config);
             if(!newConnection->channel){
-                UA_PubSubConnection_delete(newConnection);
+                UA_PubSubConnection_deleteMembers(server, newConnection);
                 if(server->pubSubManager.connectionsSize > 0){
                     newConnectionsField = (UA_PubSubConnection *)
                             UA_realloc(server->pubSubManager.connections,
@@ -86,55 +85,49 @@ UA_Server_addPubSubConnection(UA_Server *server, const UA_PubSubConnectionConfig
     return UA_STATUSCODE_BADNOTFOUND;
 }
 
-/**
- * Remove Connection, identified by the NodeId. Deletion of Connection
- * removes all contained WriterGroups and Writers.
- *
- * @param server
- * @param connectionIdentifier
- * @return UA_STATUSCODE_GOOD on success
- */
-UA_StatusCode UA_Server_removePubSubConnection(UA_Server *server, UA_NodeId connectionIdentifier) {
+UA_StatusCode
+UA_Server_removePubSubConnection(UA_Server *server, const UA_NodeId connection) {
     //search the identified Connection and store the Connection index
     size_t connectionIndex;
     UA_PubSubConnection *currentConnection = NULL;
     for(connectionIndex = 0; connectionIndex < server->pubSubManager.connectionsSize; connectionIndex++){
-        if(UA_NodeId_equal(&connectionIdentifier, &server->pubSubManager.connections[connectionIndex].identifier)){
+        if(UA_NodeId_equal(&connection, &server->pubSubManager.connections[connectionIndex].identifier)){
             currentConnection = &server->pubSubManager.connections[connectionIndex];
             break;
         }
     }
-    if(!currentConnection){
+    if(!currentConnection)
         return UA_STATUSCODE_BADNOTFOUND;
-    }
-    UA_PubSubConnection_delete(currentConnection);
+
+    UA_PubSubConnection_deleteMembers(server, currentConnection);
     server->pubSubManager.connectionsSize--;
-    //remove the connection from the pubSubManager, move the last connection into the allocated memory of the deleted connection
+    //remove the connection from the pubSubManager, move the last connection
+    //into the allocated memory of the deleted connection
     if(server->pubSubManager.connectionsSize != connectionIndex){
         memcpy(&server->pubSubManager.connections[connectionIndex],
-               &server->pubSubManager.connections[server->pubSubManager.connectionsSize], sizeof(UA_PubSubConnection));
+               &server->pubSubManager.connections[server->pubSubManager.connectionsSize],
+               sizeof(UA_PubSubConnection));
     }
+
     if(server->pubSubManager.connectionsSize <= 0){
         UA_free(server->pubSubManager.connections);
         server->pubSubManager.connections = NULL;
-    }  else {
+    } else {
         server->pubSubManager.connections = (UA_PubSubConnection *)
                 UA_realloc(server->pubSubManager.connections, sizeof(UA_PubSubConnection) * server->pubSubManager.connectionsSize);
         if(!server->pubSubManager.connections){
             return UA_STATUSCODE_BADINTERNALERROR;
         }
+        //workaround - fixing issue with queue.h and realloc.
+        for(size_t n = 0; n < server->pubSubManager.connectionsSize; n++){
+            if(server->pubSubManager.connections[n].writerGroups.lh_first){
+                server->pubSubManager.connections[n].writerGroups.lh_first->listEntry.le_prev = &server->pubSubManager.connections[n].writerGroups.lh_first;
+            }
+        }
     }
     return UA_STATUSCODE_GOOD;
 }
 
-/**
- * Add PublishedDataSet to the current PubSub configuration.
- *
- * @param server
- * @param publishedDataSetConfig config of the new PDS
- * @param pdsIdentifier nodeId of the generated PDS (NULL if not needed)
- * @return UA_STATUSCODE_GOOD on success
- */
 UA_AddPublishedDataSetResult
 UA_Server_addPublishedDataSet(UA_Server *server, const UA_PublishedDataSetConfig *publishedDataSetConfig,
                               UA_NodeId *pdsIdentifier) {
@@ -169,6 +162,13 @@ UA_Server_addPublishedDataSet(UA_Server *server, const UA_PublishedDataSetConfig
     server->pubSubManager.publishedDataSets = newPubSubDataSetField;
     UA_PublishedDataSet *newPubSubDataSet = &server->pubSubManager.publishedDataSets[server->pubSubManager.publishedDataSetsSize];
     memset(newPubSubDataSet, 0, sizeof(UA_PublishedDataSet));
+    LIST_INIT(&newPubSubDataSet->fields);
+    //workaround - fixing issue with queue.h and realloc.
+    for(size_t n = 0; n < server->pubSubManager.publishedDataSetsSize; n++){
+        if(server->pubSubManager.publishedDataSets[n].fields.lh_first){
+            server->pubSubManager.publishedDataSets[n].fields.lh_first->listEntry.le_prev = &server->pubSubManager.publishedDataSets[n].fields.lh_first;
+        }
+    }
     newPubSubDataSet->config = tmpPublishedDataSetConfig;
     if(tmpPublishedDataSetConfig.publishedDataSetType == UA_PUBSUB_DATASET_PUBLISHEDITEMS_TEMPLATE){
         //parse template config and add fields (later PubSub batch)
@@ -185,22 +185,13 @@ UA_Server_addPublishedDataSet(UA_Server *server, const UA_PublishedDataSetConfig
     return result;
 }
 
-/**
- * Remove PublishedDataSet, identified by the NodeId. Deletion of PDS
- * removes all contained and linked PDS Fields. Connected WriterGroups
- * will be also removed.
- *
- * @param server
- * @param pdsIdentifier
- * @return UA_STATUSCODE_GOOD on success
- */
 UA_StatusCode
-UA_Server_removePublishedDataSet(UA_Server *server, UA_NodeId pdsIdentifier) {
+UA_Server_removePublishedDataSet(UA_Server *server, UA_NodeId pds) {
     //search the identified PublishedDataSet and store the PDS index
     UA_PublishedDataSet *publishedDataSet = NULL;
     size_t publishedDataSetIndex;
     for(publishedDataSetIndex = 0; publishedDataSetIndex < server->pubSubManager.publishedDataSetsSize; publishedDataSetIndex++){
-        if(UA_NodeId_equal(&server->pubSubManager.publishedDataSets[publishedDataSetIndex].identifier, &pdsIdentifier)){
+        if(UA_NodeId_equal(&server->pubSubManager.publishedDataSets[publishedDataSetIndex].identifier, &pds)){
             publishedDataSet = &server->pubSubManager.publishedDataSets[publishedDataSetIndex];
             break;
         };
@@ -208,7 +199,19 @@ UA_Server_removePublishedDataSet(UA_Server *server, UA_NodeId pdsIdentifier) {
     if(!publishedDataSet){
         return UA_STATUSCODE_BADNOTFOUND;
     }
-    UA_PublishedDataSet_delete(publishedDataSet);
+    //search for referenced writers -> delete this writers. (Standard: writer must be connected with PDS)
+    for(size_t i = 0; i < server->pubSubManager.connectionsSize; i++){
+        UA_WriterGroup *writerGroup;
+        LIST_FOREACH(writerGroup, &server->pubSubManager.connections[i].writerGroups, listEntry){
+            UA_DataSetWriter *currentWriter, *tmpWriterGroup;
+            LIST_FOREACH_SAFE(currentWriter, &writerGroup->writers, listEntry, tmpWriterGroup){
+                if(UA_NodeId_equal(&currentWriter->identifier, &publishedDataSet->identifier)){
+                    UA_Server_removeDataSetWriter(server, currentWriter->identifier);
+                }
+            }
+        }
+    }
+    UA_PublishedDataSet_deleteMembers(server, publishedDataSet);
     server->pubSubManager.publishedDataSetsSize--;
     //copy the last PDS to the removed PDS inside the allocated memory block
     if(server->pubSubManager.publishedDataSetsSize != publishedDataSetIndex){
@@ -222,26 +225,29 @@ UA_Server_removePublishedDataSet(UA_Server *server, UA_NodeId pdsIdentifier) {
     } else {
         server->pubSubManager.publishedDataSets = (UA_PublishedDataSet *)
                 UA_realloc(server->pubSubManager.publishedDataSets, sizeof(UA_PublishedDataSet) * server->pubSubManager.publishedDataSetsSize);
-        if (!server->pubSubManager.publishedDataSets) {
+        if(!server->pubSubManager.publishedDataSets){
             return UA_STATUSCODE_BADINTERNALERROR;
         }
+        //workaround - fixing issue with queue.h and realloc.
+        for(size_t n = 0; n < server->pubSubManager.publishedDataSetsSize; n++){
+            if(server->pubSubManager.publishedDataSets[n].fields.lh_first){
+                server->pubSubManager.publishedDataSets[n].fields.lh_first->listEntry.le_prev = &server->pubSubManager.publishedDataSets[n].fields.lh_first;
+            }
+        }
     }
     return UA_STATUSCODE_GOOD;
 }
 
-/**
- * Calculate the time difference between current time and UTC (00:00) on January 1, 2000.
- */
+/* Calculate the time difference between current time and UTC (00:00) on January
+ * 1, 2000. */
 UA_UInt32
 UA_PubSubConfigurationVersionTimeDifference() {
     UA_UInt32 timeDiffSince2000 = (UA_UInt32) (UA_DateTime_now() - UA_DATETIMESTAMP_2000);
     return timeDiffSince2000;
 }
 
-/**
- * Generate a new unique NodeId. This NodeId will be used for the
- * information model representation of PubSub entities.
- */
+/* Generate a new unique NodeId. This NodeId will be used for the information
+ * model representation of PubSub entities. */
 void
 UA_PubSubManager_generateUniqueNodeId(UA_Server *server, UA_NodeId *nodeId) {
     UA_NodeId newNodeId = UA_NODEID_NUMERIC(0, 0);
@@ -250,13 +256,8 @@ UA_PubSubManager_generateUniqueNodeId(UA_Server *server, UA_NodeId *nodeId) {
     UA_NodeId_copy(&newNodeId, nodeId);
 }
 
-/**
- * Delete the current PubSub configuration including all nested members. This action also delete
- * the configured PubSub transport Layers.
- *
- * @param server
- * @param pubSubManager
- */
+/* Delete the current PubSub configuration including all nested members. This
+ * action also delete the configured PubSub transport Layers. */
 void
 UA_PubSubManager_delete(UA_Server *server, UA_PubSubManager *pubSubManager) {
     UA_LOG_INFO(server->config.logger, UA_LOGCATEGORY_SERVER, "PubSub cleanup was called.");

+ 4 - 2
src/pubsub/ua_pubsub_manager.h

@@ -23,9 +23,11 @@ typedef struct UA_PubSubManager{
     UA_PublishedDataSet *publishedDataSets;
 } UA_PubSubManager;
 
-void UA_PubSubManager_delete(UA_Server *server, UA_PubSubManager *pubSubManager);
+void
+UA_PubSubManager_delete(UA_Server *server, UA_PubSubManager *pubSubManager);
 
-void UA_PubSubManager_generateUniqueNodeId(UA_Server *server, UA_NodeId *nodeId);
+void
+UA_PubSubManager_generateUniqueNodeId(UA_Server *server, UA_NodeId *nodeId);
 
 UA_UInt32
 UA_PubSubConfigurationVersionTimeDifference(void);

+ 3 - 0
tests/CMakeLists.txt

@@ -177,6 +177,9 @@ if(UA_ENABLE_PUBSUB)
     add_executable(check_pubsub_pds pubsub/check_pubsub_pds.c $<TARGET_OBJECTS:open62541-object> $<TARGET_OBJECTS:open62541-plugins>)
     target_link_libraries(check_pubsub_pds ${LIBS})
     add_test(check_pubsub_pds ${TESTS_BINARY_DIR}/check_pubsub_pds)
+    add_executable(check_pubsub_publish pubsub/check_pubsub_publish.c $<TARGET_OBJECTS:open62541-object> $<TARGET_OBJECTS:open62541-plugins>)
+    target_link_libraries(check_pubsub_publish ${LIBS})
+    add_test(check_pubsub_publish ${TESTS_BINARY_DIR}/check_pubsub_publish)
 endif()
 
 add_executable(check_server_readspeed server/check_server_readspeed.c $<TARGET_OBJECTS:open62541-object> $<TARGET_OBJECTS:open62541-testplugins>)

+ 1 - 1
tests/pubsub/check_pubsub_connection_udp.c

@@ -174,7 +174,7 @@ START_TEST(GetMaximalConnectionConfigurationAndCompareValues){
     ck_assert_int_eq(retVal, UA_STATUSCODE_GOOD);
     UA_PubSubConnectionConfig connectionConfig;
     memset(&connectionConfig, 0, sizeof(UA_PubSubConnectionConfig));
-    retVal |= UA_PubSubConnection_getConfig(server, connection, &connectionConfig);
+    retVal |= UA_Server_getPubSubConnectionConfig(server, connection, &connectionConfig);
     ck_assert_int_eq(retVal, UA_STATUSCODE_GOOD);
     ck_assert(connectionConfig.connectionPropertiesSize == connectionConf.connectionPropertiesSize);
     ck_assert(UA_String_equal(&connectionConfig.name, &connectionConf.name) == UA_TRUE);

+ 1 - 1
tests/pubsub/check_pubsub_pds.c

@@ -105,7 +105,7 @@ START_TEST(GetPDSConfigurationAndCompareValues){
     ck_assert_int_eq(retVal, UA_STATUSCODE_GOOD);
     UA_PublishedDataSetConfig pdsConfigCopy;
     memset(&pdsConfigCopy, 0, sizeof(UA_PublishedDataSetConfig));
-        UA_PublishedDataSet_getConfig(server, pdsIdentifier, &pdsConfigCopy);
+        UA_Server_getPublishedDataSetConfig(server, pdsIdentifier, &pdsConfigCopy);
     ck_assert_int_eq(UA_String_equal(&pdsConfig.name, &pdsConfigCopy.name), UA_TRUE);
     UA_PublishedDataSetConfig_deleteMembers(&pdsConfigCopy);
 } END_TEST

+ 385 - 0
tests/pubsub/check_pubsub_publish.c

@@ -0,0 +1,385 @@
+/* This Source Code Form is subject to the terms of the Mozilla Public
+ * License, v. 2.0. If a copy of the MPL was not distributed with this
+ * file, You can obtain one at http://mozilla.org/MPL/2.0/.
+ *
+ * Copyright (c) 2017 - 2018 Fraunhofer IOSB (Author: Andreas Ebner)
+ */
+
+#include <ua_server_pubsub.h>
+#include <src_generated/ua_types_generated_encoding_binary.h>
+#include <ua_types.h>
+#include <ua_pubsub.h>
+#include "ua_config_default.h"
+#include "ua_network_pubsub_udp.h"
+#include "ua_server_internal.h"
+#include "check.h"
+
+UA_Server *server = NULL;
+UA_ServerConfig *config = NULL;
+UA_NodeId connection1, connection2, writerGroup1, writerGroup2, writerGroup3,
+        publishedDataSet1, publishedDataSet2, dataSetWriter1, dataSetWriter2, dataSetWriter3;
+
+static void setup(void) {
+    config = UA_ServerConfig_new_default();
+    config->pubsubTransportLayers = (UA_PubSubTransportLayer *) UA_malloc(sizeof(UA_PubSubTransportLayer));
+    if(!config->pubsubTransportLayers) {
+        UA_ServerConfig_delete(config);
+    }
+    config->pubsubTransportLayers[0] = UA_PubSubTransportLayerUDPMP();
+    config->pubsubTransportLayersSize++;
+    server = UA_Server_new(config);
+    UA_Server_run_startup(server);
+    //add 2 connections
+    UA_PubSubConnectionConfig connectionConfig;
+    memset(&connectionConfig, 0, sizeof(UA_PubSubConnectionConfig));
+    connectionConfig.name = UA_STRING("UADP Connection");
+    UA_NetworkAddressUrlDataType networkAddressUrl = {UA_STRING_NULL, UA_STRING("opc.udp://224.0.0.22:4840/")};
+    UA_Variant_setScalar(&connectionConfig.address, &networkAddressUrl,
+                         &UA_TYPES[UA_TYPES_NETWORKADDRESSURLDATATYPE]);
+    connectionConfig.transportProfileUri = UA_STRING("http://opcfoundation.org/UA-Profile/Transport/pubsub-udp-uadp");
+    UA_Server_addPubSubConnection(server, &connectionConfig, &connection1);
+    UA_Server_addPubSubConnection(server, &connectionConfig, &connection2);
+}
+
+static void teardown(void) {
+    UA_Server_run_shutdown(server);
+    UA_Server_delete(server);
+    UA_ServerConfig_delete(config);
+}
+
+START_TEST(AddWriterGroupWithValidConfiguration){
+        UA_StatusCode retVal;
+        UA_WriterGroupConfig writerGroupConfig;
+        memset(&writerGroupConfig, 0, sizeof(writerGroupConfig));
+        writerGroupConfig.name = UA_STRING("WriterGroup 1");
+        writerGroupConfig.publishingInterval = 10;
+        UA_NodeId localWriterGroup;
+        retVal = UA_Server_addWriterGroup(server, connection1, &writerGroupConfig, &localWriterGroup);
+        ck_assert_int_eq(retVal, UA_STATUSCODE_GOOD);
+        size_t writerGroupCount = 0;
+        UA_WriterGroup *writerGroup;
+        LIST_FOREACH(writerGroup, &UA_PubSubConnection_findConnectionbyId(server, connection1)->writerGroups, listEntry){
+            writerGroupCount++;
+        }
+        ck_assert_int_eq(writerGroupCount, 1);
+    } END_TEST
+
+START_TEST(AddRemoveAddWriterGroupWithMinimalValidConfiguration){
+        UA_StatusCode retVal = UA_STATUSCODE_GOOD;
+        UA_WriterGroupConfig writerGroupConfig;
+        memset(&writerGroupConfig, 0, sizeof(writerGroupConfig));
+        writerGroupConfig.name = UA_STRING("WriterGroup 1");
+        writerGroupConfig.publishingInterval = 10;
+        UA_NodeId localWriterGroup;
+        retVal |= UA_Server_addWriterGroup(server, connection1, &writerGroupConfig, &localWriterGroup);
+        ck_assert_int_eq(retVal, UA_STATUSCODE_GOOD);
+        retVal |= UA_Server_removeWriterGroup(server, localWriterGroup);
+        ck_assert_int_eq(retVal, UA_STATUSCODE_GOOD);
+        size_t writerGroupCount = 0;
+        UA_WriterGroup *writerGroup;
+        LIST_FOREACH(writerGroup, &UA_PubSubConnection_findConnectionbyId(server, connection1)->writerGroups, listEntry){
+            writerGroupCount++;
+        }
+        ck_assert_int_eq(writerGroupCount, 0);
+        retVal |= UA_Server_addWriterGroup(server, connection1, &writerGroupConfig, &localWriterGroup);
+        ck_assert_int_eq(retVal, UA_STATUSCODE_GOOD);
+        writerGroupCount = 0;
+        LIST_FOREACH(writerGroup, &UA_PubSubConnection_findConnectionbyId(server, connection1)->writerGroups, listEntry){
+            writerGroupCount++;
+        }
+        ck_assert_int_eq(writerGroupCount, 1);
+        retVal |= UA_Server_addWriterGroup(server, connection1, &writerGroupConfig, &localWriterGroup);
+        writerGroupCount = 0;
+        LIST_FOREACH(writerGroup, &UA_PubSubConnection_findConnectionbyId(server, connection1)->writerGroups, listEntry){
+            writerGroupCount++;
+        }
+        ck_assert_int_eq(writerGroupCount, 2);
+    } END_TEST
+
+START_TEST(AddWriterGroupWithNullConfig){
+        UA_StatusCode retVal = UA_STATUSCODE_GOOD;
+        retVal |= UA_Server_addWriterGroup(server, connection1, NULL, NULL);
+        size_t writerGroupCount = 0;
+        UA_WriterGroup *writerGroup;
+        LIST_FOREACH(writerGroup, &UA_PubSubConnection_findConnectionbyId(server, connection1)->writerGroups, listEntry){
+            writerGroupCount++;
+        }
+        ck_assert_int_eq(writerGroupCount, 0);
+        ck_assert_int_ne(retVal, UA_STATUSCODE_GOOD);
+    } END_TEST
+
+START_TEST(AddWriterGroupWithInvalidConnectionId){
+        UA_StatusCode retVal = UA_STATUSCODE_GOOD;
+        UA_WriterGroupConfig writerGroupConfig;
+        memset(&writerGroupConfig, 0, sizeof(writerGroupConfig));
+        writerGroupConfig.name = UA_STRING("WriterGroup 1");
+        writerGroupConfig.publishingInterval = 10;
+        retVal |= UA_Server_addWriterGroup(server, UA_NODEID_NUMERIC(0, UA_UINT32_MAX), &writerGroupConfig, NULL);
+        size_t writerGroupCount = 0;
+        UA_WriterGroup *writerGroup;
+        LIST_FOREACH(writerGroup, &UA_PubSubConnection_findConnectionbyId(server, connection1)->writerGroups, listEntry){
+            writerGroupCount++;
+        }
+        ck_assert_int_eq(writerGroupCount, 0);
+        ck_assert_int_ne(retVal, UA_STATUSCODE_GOOD);
+    } END_TEST
+
+START_TEST(GetWriterGroupConfigurationAndCompareValues){
+        UA_StatusCode retVal = UA_STATUSCODE_GOOD;
+        UA_WriterGroupConfig writerGroupConfig;
+        memset(&writerGroupConfig, 0, sizeof(writerGroupConfig));
+        writerGroupConfig.name = UA_STRING("WriterGroup 1");
+        writerGroupConfig.publishingInterval = 10;
+        UA_NodeId localWriterGroup;
+        retVal |= UA_Server_addWriterGroup(server, connection1, &writerGroupConfig, &localWriterGroup);
+        UA_WriterGroupConfig writerGroupConfigCopy;
+        retVal |= UA_Server_getWriterGroupConfig(server, localWriterGroup, &writerGroupConfigCopy);
+        ck_assert_int_eq(retVal, UA_STATUSCODE_GOOD);
+        ck_assert_int_eq(UA_String_equal(&writerGroupConfig.name, &writerGroupConfigCopy.name), UA_TRUE);
+        //todo remove == for floating point compare
+        ck_assert(writerGroupConfig.publishingInterval == writerGroupConfig.publishingInterval);
+        UA_WriterGroupConfig_deleteMembers(&writerGroupConfigCopy);
+    } END_TEST
+
+static void setupDataSetWriterTestEnvironment(void){
+    UA_WriterGroupConfig writerGroupConfig;
+    memset(&writerGroupConfig, 0, sizeof(writerGroupConfig));
+    writerGroupConfig.name = UA_STRING("WriterGroup 1");
+    writerGroupConfig.publishingInterval = 10;
+    UA_Server_addWriterGroup(server, connection1, &writerGroupConfig, &writerGroup1);
+    writerGroupConfig.name = UA_STRING("WriterGroup 2");
+    writerGroupConfig.publishingInterval = 50;
+    UA_Server_addWriterGroup(server, connection2, &writerGroupConfig, &writerGroup2);
+    writerGroupConfig.name = UA_STRING("WriterGroup 3");
+    writerGroupConfig.publishingInterval = 100;
+    UA_Server_addWriterGroup(server, connection2, &writerGroupConfig, &writerGroup3);
+    UA_PublishedDataSetConfig pdsConfig;
+    memset(&pdsConfig, 0, sizeof(UA_PublishedDataSetConfig));
+    pdsConfig.publishedDataSetType = UA_PUBSUB_DATASET_PUBLISHEDITEMS;
+    pdsConfig.name = UA_STRING("PublishedDataSet 1");
+    UA_Server_addPublishedDataSet(server, &pdsConfig, &publishedDataSet1);
+    UA_Server_addPublishedDataSet(server, &pdsConfig, &publishedDataSet2);
+}
+
+START_TEST(AddDataSetWriterWithValidConfiguration){
+        setupDataSetWriterTestEnvironment();
+        UA_StatusCode retVal;
+        UA_DataSetWriterConfig dataSetWriterConfig;
+        memset(&dataSetWriterConfig, 0, sizeof(dataSetWriterConfig));
+        dataSetWriterConfig.name = UA_STRING("DataSetWriter 1 ");
+        UA_NodeId localDataSetWriter;
+        retVal = UA_Server_addDataSetWriter(server, writerGroup1, publishedDataSet1, &dataSetWriterConfig, &localDataSetWriter);
+        ck_assert_int_eq(retVal, UA_STATUSCODE_GOOD);
+        UA_DataSetWriter *dsw1 = UA_DataSetWriter_findDSWbyId(server, localDataSetWriter);
+        ck_assert_ptr_ne(dsw1, NULL);
+        UA_WriterGroup *wg1 = UA_WriterGroup_findWGbyId(server, writerGroup1);
+        ck_assert_ptr_ne(wg1, NULL);
+        ck_assert_int_eq(wg1->writersCount, 1);
+    } END_TEST
+
+START_TEST(AddRemoveAddDataSetWriterWithValidConfiguration){
+        setupDataSetWriterTestEnvironment();
+        UA_StatusCode retVal;
+        UA_WriterGroup *wg1 = UA_WriterGroup_findWGbyId(server, writerGroup1);
+        ck_assert_ptr_ne(wg1, NULL);
+        UA_DataSetWriterConfig dataSetWriterConfig;
+        memset(&dataSetWriterConfig, 0, sizeof(dataSetWriterConfig));
+        dataSetWriterConfig.name = UA_STRING("DataSetWriter 1 ");
+        UA_NodeId dataSetWriter;
+        ck_assert_int_eq(wg1->writersCount, 0);
+        retVal = UA_Server_addDataSetWriter(server, writerGroup1, publishedDataSet1, &dataSetWriterConfig, &dataSetWriter);
+        ck_assert_int_eq(retVal, UA_STATUSCODE_GOOD);
+        ck_assert_int_eq(wg1->writersCount, 1);
+        retVal = UA_Server_removeDataSetWriter(server, dataSetWriter);
+        ck_assert_int_eq(retVal, UA_STATUSCODE_GOOD);
+        ck_assert_int_eq(wg1->writersCount, 0);
+        retVal = UA_Server_addDataSetWriter(server, writerGroup1, publishedDataSet1, &dataSetWriterConfig, &dataSetWriter);
+        ck_assert_int_eq(retVal, UA_STATUSCODE_GOOD);
+        ck_assert_int_eq(wg1->writersCount, 1);
+        retVal = UA_Server_addDataSetWriter(server, writerGroup1, publishedDataSet1, &dataSetWriterConfig, &dataSetWriter);
+        ck_assert_int_eq(retVal, UA_STATUSCODE_GOOD);
+        ck_assert_int_eq(wg1->writersCount, 2);
+
+        UA_WriterGroup *wg2 = UA_WriterGroup_findWGbyId(server, writerGroup2);
+        ck_assert_ptr_ne(wg2, NULL);
+        retVal = UA_Server_addDataSetWriter(server, writerGroup2, publishedDataSet1, &dataSetWriterConfig, &dataSetWriter);
+        ck_assert_int_eq(retVal, UA_STATUSCODE_GOOD);
+        ck_assert_int_eq(wg2->writersCount, 1);
+    } END_TEST
+
+START_TEST(AddDataSetWriterWithNullConfig){
+        setupDataSetWriterTestEnvironment();
+        UA_StatusCode retVal;
+        retVal = UA_Server_addDataSetWriter(server, writerGroup1, publishedDataSet1, NULL, NULL);
+        UA_WriterGroup *wg1 = UA_WriterGroup_findWGbyId(server, writerGroup1);
+        ck_assert_ptr_ne(wg1, NULL);
+        ck_assert_int_eq(wg1->writersCount, 0);
+        ck_assert_int_ne(retVal, UA_STATUSCODE_GOOD);
+    } END_TEST
+
+START_TEST(AddDataSetWriterWithInvalidPDSId){
+        setupDataSetWriterTestEnvironment();
+        UA_StatusCode retVal;
+        UA_DataSetWriterConfig dataSetWriterConfig;
+        memset(&dataSetWriterConfig, 0, sizeof(dataSetWriterConfig));
+        dataSetWriterConfig.name = UA_STRING("DataSetWriter 1 ");
+        retVal = UA_Server_addDataSetWriter(server, writerGroup1, UA_NODEID_NUMERIC(0, UA_UINT32_MAX), &dataSetWriterConfig, NULL);
+        UA_WriterGroup *wg1 = UA_WriterGroup_findWGbyId(server, writerGroup1);
+        ck_assert_ptr_ne(wg1, NULL);
+        ck_assert_int_eq(wg1->writersCount, 0);
+        ck_assert_int_ne(retVal, UA_STATUSCODE_GOOD);
+    } END_TEST
+
+START_TEST(GetDataSetWriterConfigurationAndCompareValues){
+        setupDataSetWriterTestEnvironment();
+        UA_StatusCode retVal;
+        UA_DataSetWriterConfig dataSetWriterConfig;
+        memset(&dataSetWriterConfig, 0, sizeof(dataSetWriterConfig));
+        dataSetWriterConfig.name = UA_STRING("DataSetWriter 1 ");
+        UA_NodeId localDataSetWriter;
+        retVal = UA_Server_addDataSetWriter(server, writerGroup1, publishedDataSet1, &dataSetWriterConfig, &localDataSetWriter);
+        ck_assert_int_eq(retVal, UA_STATUSCODE_GOOD);
+        UA_DataSetWriterConfig dataSetWiterConfigCopy;
+        retVal |= UA_Server_getDataSetWriterConfig(server, localDataSetWriter, &dataSetWiterConfigCopy);
+        ck_assert_int_eq(retVal, UA_STATUSCODE_GOOD);
+        ck_assert_int_eq(UA_String_equal(&dataSetWiterConfigCopy.name, &dataSetWiterConfigCopy.name), UA_TRUE);
+        UA_DataSetWriterConfig_deleteMembers(&dataSetWiterConfigCopy);
+    } END_TEST
+
+static void setupDataSetFieldTestEnvironment(void){
+    setupDataSetWriterTestEnvironment();
+    UA_DataSetWriterConfig dataSetWriterConfig;
+    memset(&dataSetWriterConfig, 0, sizeof(dataSetWriterConfig));
+    dataSetWriterConfig.name = UA_STRING("DataSetWriter 1");
+    UA_Server_addDataSetWriter(server, writerGroup1, publishedDataSet1, &dataSetWriterConfig, &dataSetWriter1);
+    dataSetWriterConfig.name = UA_STRING("DataSetWriter 2");
+    UA_Server_addDataSetWriter(server, writerGroup1, publishedDataSet1, &dataSetWriterConfig, &dataSetWriter2);
+    dataSetWriterConfig.name = UA_STRING("DataSetWriter 3");
+    UA_Server_addDataSetWriter(server, writerGroup2, publishedDataSet2, &dataSetWriterConfig, &dataSetWriter3);
+}
+
+START_TEST(AddDataSetFieldWithValidConfiguration){
+        setupDataSetFieldTestEnvironment();
+        UA_StatusCode retVal;
+        UA_DataSetFieldConfig fieldConfig;
+        memset(&fieldConfig, 0, sizeof(UA_DataSetFieldConfig));
+        fieldConfig.dataSetFieldType = UA_PUBSUB_DATASETFIELD_VARIABLE;
+        fieldConfig.field.variable.fieldNameAlias = UA_STRING("field 1");
+        UA_NodeId localDataSetField;
+        UA_PublishedDataSet *pds = UA_PublishedDataSet_findPDSbyId(server, publishedDataSet1);
+        ck_assert_ptr_ne(pds, NULL);
+        ck_assert_int_eq(pds->fieldSize, 0);
+        retVal = UA_Server_addDataSetField(server, publishedDataSet1, &fieldConfig, &localDataSetField).result;
+        ck_assert_int_eq(retVal, UA_STATUSCODE_GOOD);
+        ck_assert_int_eq(pds->fieldSize, 1);
+    } END_TEST
+
+START_TEST(AddRemoveAddDataSetFieldWithValidConfiguration){
+        setupDataSetFieldTestEnvironment();
+        UA_StatusCode retVal;
+        UA_DataSetFieldConfig fieldConfig;
+        memset(&fieldConfig, 0, sizeof(UA_DataSetFieldConfig));
+        fieldConfig.dataSetFieldType = UA_PUBSUB_DATASETFIELD_VARIABLE;
+        fieldConfig.field.variable.fieldNameAlias = UA_STRING("field 1");
+        UA_NodeId localDataSetField;
+        UA_PublishedDataSet *pds1 = UA_PublishedDataSet_findPDSbyId(server, publishedDataSet1);
+        ck_assert_ptr_ne(pds1, NULL);
+        ck_assert_int_eq(pds1->fieldSize, 0);
+        retVal = UA_Server_addDataSetField(server, publishedDataSet1, &fieldConfig, &localDataSetField).result;
+        ck_assert_int_eq(retVal, UA_STATUSCODE_GOOD);
+        ck_assert_int_eq(pds1->fieldSize, 1);
+        retVal = UA_Server_removeDataSetField(server, localDataSetField).result;
+        ck_assert_int_eq(retVal, UA_STATUSCODE_GOOD);
+        ck_assert_int_eq(pds1->fieldSize, 0);
+        retVal = UA_Server_addDataSetField(server, publishedDataSet1, &fieldConfig, &localDataSetField).result;
+        ck_assert_int_eq(retVal, UA_STATUSCODE_GOOD);
+        ck_assert_int_eq(pds1->fieldSize, 1);
+        retVal = UA_Server_addDataSetField(server, publishedDataSet1, &fieldConfig, &localDataSetField).result;
+        ck_assert_int_eq(retVal, UA_STATUSCODE_GOOD);
+        ck_assert_int_eq(pds1->fieldSize, 2);
+        UA_PublishedDataSet *pds2 = UA_PublishedDataSet_findPDSbyId(server, publishedDataSet2);
+        ck_assert_ptr_ne(pds2, NULL);
+        retVal = UA_Server_addDataSetField(server, publishedDataSet2, &fieldConfig, &localDataSetField).result;
+        ck_assert_int_eq(retVal, UA_STATUSCODE_GOOD);
+        ck_assert_int_eq(pds2->fieldSize, 1);
+    } END_TEST
+
+START_TEST(AddDataSetFieldWithNullConfig){
+        setupDataSetFieldTestEnvironment();
+        UA_StatusCode retVal;
+        retVal = UA_Server_addDataSetField(server, publishedDataSet1, NULL, NULL).result;
+        ck_assert_int_ne(retVal, UA_STATUSCODE_GOOD);
+        UA_PublishedDataSet *pds1 = UA_PublishedDataSet_findPDSbyId(server, publishedDataSet1);
+        ck_assert_ptr_ne(pds1, NULL);
+        ck_assert_int_eq(pds1->fieldSize, 0);
+    } END_TEST
+
+START_TEST(AddDataSetFieldWithInvalidPDSId){
+        setupDataSetFieldTestEnvironment();
+        UA_StatusCode retVal;
+        UA_DataSetFieldConfig fieldConfig;
+        memset(&fieldConfig, 0, sizeof(UA_DataSetFieldConfig));
+        fieldConfig.dataSetFieldType = UA_PUBSUB_DATASETFIELD_VARIABLE;
+        fieldConfig.field.variable.fieldNameAlias = UA_STRING("field 1");
+        retVal = UA_Server_addDataSetField(server, UA_NODEID_NUMERIC(0, UA_UINT32_MAX), &fieldConfig, NULL).result;
+        ck_assert_int_ne(retVal, UA_STATUSCODE_GOOD);
+        UA_PublishedDataSet *pds1 = UA_PublishedDataSet_findPDSbyId(server, publishedDataSet1);
+        ck_assert_ptr_ne(pds1, NULL);
+        ck_assert_int_eq(pds1->fieldSize, 0);
+    } END_TEST
+
+START_TEST(GetDataSetFieldConfigurationAndCompareValues){
+        setupDataSetFieldTestEnvironment();
+        UA_StatusCode retVal;
+        UA_DataSetFieldConfig fieldConfig;
+        memset(&fieldConfig, 0, sizeof(UA_DataSetFieldConfig));
+        fieldConfig.dataSetFieldType = UA_PUBSUB_DATASETFIELD_VARIABLE;
+        fieldConfig.field.variable.fieldNameAlias = UA_STRING("field 1");
+        UA_NodeId dataSetFieldId;
+        retVal = UA_Server_addDataSetField(server, publishedDataSet1, &fieldConfig, &dataSetFieldId).result;
+        ck_assert_int_eq(retVal, UA_STATUSCODE_GOOD);
+        UA_DataSetFieldConfig fieldConfigCopy;
+        retVal |= UA_Server_getDataSetFieldConfig(server, dataSetFieldId, &fieldConfigCopy);
+        ck_assert_int_eq(retVal, UA_STATUSCODE_GOOD);
+        ck_assert_int_eq(fieldConfig.dataSetFieldType, fieldConfigCopy.dataSetFieldType);
+        ck_assert_int_eq(UA_String_equal(&fieldConfig.field.variable.fieldNameAlias, &fieldConfigCopy.field.variable.fieldNameAlias), UA_TRUE);
+        UA_DataSetFieldConfig_deleteMembers(&fieldConfigCopy);
+    } END_TEST
+
+int main(void) {
+    TCase *tc_add_pubsub_writergroup = tcase_create("PubSub WriterGroup items handling");
+    tcase_add_checked_fixture(tc_add_pubsub_writergroup, setup, teardown);
+    tcase_add_test(tc_add_pubsub_writergroup, AddWriterGroupWithValidConfiguration);
+    tcase_add_test(tc_add_pubsub_writergroup, AddRemoveAddWriterGroupWithMinimalValidConfiguration);
+    tcase_add_test(tc_add_pubsub_writergroup, AddWriterGroupWithNullConfig);
+    tcase_add_test(tc_add_pubsub_writergroup, AddWriterGroupWithInvalidConnectionId);
+    tcase_add_test(tc_add_pubsub_writergroup, GetWriterGroupConfigurationAndCompareValues);
+
+    TCase *tc_add_pubsub_datasetwriter = tcase_create("PubSub DataSetWriter items handling");
+    tcase_add_checked_fixture(tc_add_pubsub_datasetwriter, setup, teardown);
+    tcase_add_test(tc_add_pubsub_datasetwriter, AddDataSetWriterWithValidConfiguration);
+    tcase_add_test(tc_add_pubsub_datasetwriter, AddRemoveAddDataSetWriterWithValidConfiguration);
+    tcase_add_test(tc_add_pubsub_datasetwriter, AddDataSetWriterWithNullConfig);
+    tcase_add_test(tc_add_pubsub_datasetwriter, AddDataSetWriterWithInvalidPDSId);
+    tcase_add_test(tc_add_pubsub_datasetwriter, GetDataSetWriterConfigurationAndCompareValues);
+
+    TCase *tc_add_pubsub_datasetfields = tcase_create("PubSub DataSetField items handling");
+    tcase_add_checked_fixture(tc_add_pubsub_datasetfields, setup, teardown);
+    tcase_add_test(tc_add_pubsub_datasetfields, AddDataSetFieldWithValidConfiguration);
+    tcase_add_test(tc_add_pubsub_datasetfields, AddRemoveAddDataSetFieldWithValidConfiguration);
+    tcase_add_test(tc_add_pubsub_datasetfields, AddDataSetFieldWithNullConfig);
+    tcase_add_test(tc_add_pubsub_datasetfields, AddDataSetFieldWithInvalidPDSId);
+    tcase_add_test(tc_add_pubsub_datasetfields, GetDataSetFieldConfigurationAndCompareValues);
+
+    Suite *s = suite_create("PubSub WriterGroups/Writer/Fields handling and publishing");
+    suite_add_tcase(s, tc_add_pubsub_writergroup);
+    suite_add_tcase(s, tc_add_pubsub_datasetwriter);
+    suite_add_tcase(s, tc_add_pubsub_datasetfields);
+
+    SRunner *sr = srunner_create(s);
+    srunner_set_fork_status(sr, CK_NOFORK);
+    srunner_run_all(sr,CK_NORMAL);
+    int number_failed = srunner_ntests_failed(sr);
+    srunner_free(sr);
+    return (number_failed == 0) ? EXIT_SUCCESS : EXIT_FAILURE;
+}