Bladeren bron

Add pubsub connection handling and udp plugin

Update CMakeLists.txt

adjusted check_types_memory for nodeset-1.04 types, small code analyzer fixes

analyzer fixes, added newlines

analyzer fixes, added newlines

update licence to CCZero

fix freeaddrinfo in error case, code and comment project adjustments
Andreas Ebner 7 jaren geleden
bovenliggende
commit
008e3fe43f

+ 13 - 3
CMakeLists.txt

@@ -219,7 +219,7 @@ if(NOT WIN32)
   endif()
 else()
   list(APPEND open62541_LIBRARIES ws2_32)
-    if(UA_ENABLE_DISCOVERY_MULTICAST)
+    if(UA_ENABLE_DISCOVERY_MULTICAST OR UA_ENABLE_PUBSUB)
         list(APPEND open62541_LIBRARIES iphlpapi)
     endif()
 endif()
@@ -337,6 +337,7 @@ endif()
 include_directories(${PROJECT_SOURCE_DIR}/include
                     ${PROJECT_SOURCE_DIR}/plugins
                     ${PROJECT_SOURCE_DIR}/deps
+                    ${PROJECT_SOURCE_DIR}/src/pubsub
                     ${PROJECT_BINARY_DIR}
                     ${PROJECT_BINARY_DIR}/src_generated
                     ${MBEDTLS_INCLUDE_DIRS})
@@ -353,6 +354,8 @@ set(exported_headers ${PROJECT_BINARY_DIR}/src_generated/ua_config.h
                      ${PROJECT_SOURCE_DIR}/include/ua_plugin_access_control.h
                      ${PROJECT_SOURCE_DIR}/include/ua_plugin_pki.h
                      ${PROJECT_SOURCE_DIR}/include/ua_plugin_securitypolicy.h
+                     ${PROJECT_SOURCE_DIR}/include/ua_server_pubsub.h
+                     ${PROJECT_SOURCE_DIR}/include/ua_plugin_pubsub.h
                      ${PROJECT_SOURCE_DIR}/include/ua_plugin_nodestore.h
                      ${PROJECT_SOURCE_DIR}/include/ua_server_config.h
                      ${PROJECT_SOURCE_DIR}/include/ua_client_config.h
@@ -454,10 +457,17 @@ if(UA_ENABLE_ENCRYPTION)
 endif()
 
 if(UA_ENABLE_PUBSUB)
-    list(APPEND internal_headers ${PROJECT_SOURCE_DIR}/src/pubsub/ua_pubsub_networkmessage.h)
-    list(APPEND lib_sources ${PROJECT_SOURCE_DIR}/src/pubsub/ua_pubsub_networkmessage.c)
+    list(APPEND internal_headers ${PROJECT_SOURCE_DIR}/src/pubsub/ua_pubsub_networkmessage.h
+            ${PROJECT_SOURCE_DIR}/src/pubsub/ua_pubsub_manager.h)
+    list(APPEND lib_sources ${PROJECT_SOURCE_DIR}/src/pubsub/ua_pubsub_networkmessage.c
+            ${PROJECT_SOURCE_DIR}/src/pubsub/ua_pubsub.c
+            ${PROJECT_SOURCE_DIR}/src/pubsub/ua_pubsub_manager.c)
+
+    list(APPEND default_plugin_headers ${PROJECT_SOURCE_DIR}/plugins/ua_network_pubsub_udp.h)
+    list(APPEND default_plugin_sources ${PROJECT_SOURCE_DIR}/plugins/ua_network_pubsub_udp.c)
 endif()
 
+
 if(UA_DEBUG_DUMP_PKGS)
     list(APPEND lib_sources ${PROJECT_SOURCE_DIR}/plugins/ua_debug_dump_pkgs.c)
 endif()

+ 6 - 0
doc/CMakeLists.txt

@@ -31,6 +31,8 @@ generate_rst(${PROJECT_SOURCE_DIR}/include/ua_plugin_network.h ${DOC_SRC_DIR}/pl
 generate_rst(${PROJECT_SOURCE_DIR}/include/ua_plugin_access_control.h ${DOC_SRC_DIR}/plugin_access_control.rst)
 generate_rst(${PROJECT_SOURCE_DIR}/src/server/ua_services.h ${DOC_SRC_DIR}/services.rst)
 generate_rst(${PROJECT_SOURCE_DIR}/include/ua_plugin_nodestore.h ${DOC_SRC_DIR}/nodestore.rst)
+generate_rst(${PROJECT_SOURCE_DIR}/include/ua_plugin_pubsub.h ${DOC_SRC_DIR}/plugin_pubsub_connection.rst)
+generate_rst(${PROJECT_SOURCE_DIR}/include/ua_server_pubsub.h ${DOC_SRC_DIR}/pubsub.rst)
 
 # Doc from tutorial code
 generate_rst(${PROJECT_SOURCE_DIR}/examples/tutorial_datatypes.c ${DOC_SRC_DIR}/tutorial_datatypes.rst)
@@ -60,6 +62,8 @@ add_custom_target(doc_latex ${SPHINX_EXECUTABLE}
           ${DOC_SRC_DIR}/tutorial_server_datasource.rst
           ${DOC_SRC_DIR}/tutorial_server_object.rst
           ${DOC_SRC_DIR}/tutorial_server_method.rst
+          ${DOC_SRC_DIR}/plugin_pubsub_connection.rst
+          ${DOC_SRC_DIR}/pubsub.rst
   COMMENT "Building LaTeX sources for documentation with Sphinx")
 add_dependencies(doc_latex open62541)
 
@@ -89,6 +93,8 @@ add_custom_target(doc ${SPHINX_EXECUTABLE}
           ${DOC_SRC_DIR}/tutorial_server_datasource.rst
           ${DOC_SRC_DIR}/tutorial_server_object.rst
           ${DOC_SRC_DIR}/tutorial_server_method.rst
+          ${DOC_SRC_DIR}/plugin_pubsub_connection.rst
+          ${DOC_SRC_DIR}/pubsub.rst
   COMMENT "Building HTML documentation with Sphinx")
 add_dependencies(doc open62541)
 

+ 2 - 0
doc/internal.rst

@@ -6,3 +6,5 @@ Internals
    plugin_network
    plugin_access_control
    plugin_log
+   plugin_pubsub_connection
+   pubsub

+ 8 - 0
examples/CMakeLists.txt

@@ -113,3 +113,11 @@ if(UA_ENABLE_DISCOVERY)
 endif()
 
 add_subdirectory(nodeset)
+
+####################
+#  Example PubSub  #
+####################
+
+if(UA_ENABLE_PUBSUB)
+    add_example(tutorial_pubsub_connection pubsub/tutorial_pubsub_connection.c)
+endif()

+ 84 - 0
examples/pubsub/tutorial_pubsub_connection.c

@@ -0,0 +1,84 @@
+
+/* This work is licensed under a Creative Commons CCZero 1.0 Universal License.
+ * See http://creativecommons.org/publicdomain/zero/1.0/ for more information. */
+
+#include <signal.h>
+#include "open62541.h"
+
+UA_Boolean running = true;
+static void stopHandler(int sign) {
+    UA_LOG_INFO(UA_Log_Stdout, UA_LOGCATEGORY_SERVER, "received ctrl-c");
+    running = false;
+}
+
+/**
+ * The PubSub connection example demonstrate the PubSub TransportLayer configuration and
+ * the dynamic creation of PubSub Connections on runtime.
+ */
+int main(void) {
+    signal(SIGINT, stopHandler);
+    signal(SIGTERM, stopHandler);
+
+    UA_ServerConfig *config = UA_ServerConfig_new_default();
+
+    /* Add the PubSubTransportLayer implementation to the server config.
+     * The PubSubTransportLayer is a factory to create new connections
+     * on runtime. The UA_PubSubTransportLayer is used for all kinds of
+     * concrete connections e.g. UDP, MQTT, AMQP...
+     */
+    config->pubsubTransportLayers = (UA_PubSubTransportLayer *) UA_malloc(sizeof(UA_PubSubTransportLayer));
+    if(!config->pubsubTransportLayers) {
+        UA_ServerConfig_delete(config);
+        return -1;
+    }
+    /* It is possible to use multiple PubSubTransportLayers on runtime. The correct factory
+     * is selected on runtime by the standard defined PubSub TransportProfileUri's.
+     */
+    config->pubsubTransportLayers[0] = UA_PubSubTransportLayerUDPMP();
+    config->pubsubTransportLayersSize++;
+
+    UA_Server *server = UA_Server_new(config);
+
+    /* Create a new ConnectionConfig. The addPubSubConnection function takes the
+     * config and create a new connection. The Connection identifier is
+     * copied to the NodeId parameter.*/
+    UA_PubSubConnectionConfig connectionConfig;
+    memset(&connectionConfig, 0, sizeof(connectionConfig));
+    connectionConfig.name = UA_STRING("UDP-UADP Connection 1");
+    connectionConfig.transportProfileUri = UA_STRING("http://opcfoundation.org/UA-Profile/Transport/pubsub-udp-uadp");
+    connectionConfig.enabled = UA_TRUE;
+    /* The address and interface is part of the standard
+     * defined UA_NetworkAddressUrlDataType.
+     */
+    UA_NetworkAddressUrlDataType networkAddressUrl = {UA_STRING_NULL , UA_STRING("opc.udp://224.0.0.22:4840/")};
+    UA_Variant_setScalar(&connectionConfig.address, &networkAddressUrl, &UA_TYPES[UA_TYPES_NETWORKADDRESSURLDATATYPE]);
+    connectionConfig.publisherId.numeric = UA_UInt32_random();
+    /* Connection options are given as Key/Value Pairs. The available options are
+     * maybe standard or vendor defined.
+     */
+    UA_KeyValuePair connectionOptions[3];
+    connectionOptions[0].key = UA_QUALIFIEDNAME(0, "ttl");
+    UA_UInt32 ttl = 10;
+    UA_Variant_setScalar(&connectionOptions[0].value, &ttl, &UA_TYPES[UA_TYPES_UINT32]);
+    connectionOptions[1].key = UA_QUALIFIEDNAME(0, "loopback");
+    UA_Boolean loopback = UA_FALSE;
+    UA_Variant_setScalar(&connectionOptions[1].value, &loopback, &UA_TYPES[UA_TYPES_UINT32]);
+    connectionOptions[2].key = UA_QUALIFIEDNAME(0, "reuse");
+    UA_Boolean reuse = UA_TRUE;
+    UA_Variant_setScalar(&connectionOptions[2].value, &reuse, &UA_TYPES[UA_TYPES_UINT32]);
+    connectionConfig.connectionProperties = connectionOptions;
+    connectionConfig.connectionPropertiesSize = 3;
+    /* Create a new concrete connection and add the connection
+     * to the current PubSub configuration.
+     */
+    UA_NodeId connectionIdentifier;
+    UA_StatusCode retval = UA_Server_addPubSubConnection(server, &connectionConfig, &connectionIdentifier);
+    if(retval == UA_STATUSCODE_GOOD){
+        UA_LOG_INFO(UA_Log_Stdout, UA_LOGCATEGORY_SERVER, "The PubSub Connection was created successfully!");
+    }
+
+    retval |= UA_Server_run(server, &running);
+    UA_Server_delete(server);
+    UA_ServerConfig_delete(config);
+    return (int)retval;
+}

+ 1 - 0
include/ua_config.h.in

@@ -27,6 +27,7 @@ extern "C" {
 #cmakedefine UA_ENABLE_NODEMANAGEMENT
 #cmakedefine UA_ENABLE_SUBSCRIPTIONS
 #cmakedefine UA_ENABLE_MULTITHREADING
+#cmakedefine UA_ENABLE_PUBSUB
 #cmakedefine UA_ENABLE_ENCRYPTION
 
 /* Advanced Options */

+ 93 - 0
include/ua_plugin_pubsub.h

@@ -0,0 +1,93 @@
+/* This Source Code Form is subject to the terms of the Mozilla Public
+ * License, v. 2.0. If a copy of the MPL was not distributed with this
+ * file, You can obtain one at http://mozilla.org/MPL/2.0/.
+ *
+ * Copyright (c) 2017-2018 Fraunhofer IOSB (Author: Andreas Ebner)
+ */
+
+#ifndef UA_PLUGIN_PUBSUB_H_
+#define UA_PLUGIN_PUBSUB_H_
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+#include "ua_server_pubsub.h"
+
+/**
+ * .. _pubsub_connection:
+ *
+ * PubSub Connection Plugin API
+ * ============================
+ *
+ * The PubSub Connection API is the interface between concrete network implementations and the internal pubsub
+ * code.
+ *
+ * The PubSub specification enables the creation of new connections on runtime. Wording:
+ * 'Connection' -> OPC UA standard 'highlevel' perspective,
+ * 'Channel' -> open62541 implementation 'lowlevel' perspective. A channel can be assigned with different
+ * network implementations like UDP, MQTT, AMQP. The channel provides basis services
+ * like send, regist, unregist, receive, close.
+ */
+
+typedef enum {
+    UA_PUBSUB_CHANNEL_RDY,
+    UA_PUBSUB_CHANNEL_PUB,
+    UA_PUBSUB_CHANNEL_SUB,
+    UA_PUBSUB_CHANNEL_PUB_SUB,
+    UA_PUBSUB_CHANNEL_ERROR,
+    UA_PUBSUB_CHANNEL_CLOSED
+} UA_PubSubChannelState;
+
+struct UA_PubSubChannel;
+typedef struct UA_PubSubChannel UA_PubSubChannel;
+
+//interface structure between network plugin and internal implementation
+struct UA_PubSubChannel{
+    UA_UInt32 publisherId;                                  // unique identifier
+    UA_PubSubChannelState state;
+    UA_PubSubConnectionConfig *connectionConfig;            //link to parent connection config
+    UA_Int32 sockfd;
+    void *handle;                                           //implementation specific data
+    /*@info for handle: each network implementation should provide an structure
+    * UA_PubSubChannelData[ImplementationName] This structure can be used by the
+    * network implementation to store network implementation specific data.*/
+
+    /* Sending out the content of the buf parameter */
+    UA_StatusCode (*send)(UA_PubSubChannel *channel, UA_ExtensionObject *transportSettings,
+                          const UA_ByteString *buf);
+
+    /* Register to an specified message source, e.g. multicast group or topic */
+    UA_StatusCode (*regist)(UA_PubSubChannel * channel, UA_ExtensionObject *transportSettings);
+
+    /* Remove subscription to an specified message source, e.g. multicast group or topic */
+    UA_StatusCode (*unregist)(UA_PubSubChannel * channel, UA_ExtensionObject *transportSettings);
+
+    /* Receive messages. A regist to the message source is needed before. */
+    UA_StatusCode (*receive)(UA_PubSubChannel * channel, UA_ByteString *,
+                             UA_ExtensionObject *transportSettings, UA_UInt32 timeout);
+
+    /* Closing the connection and implicit free of the channel structures. */
+    UA_StatusCode (*close)(UA_PubSubChannel *channel);
+};
+
+/**
+ *
+ * The UA_PubSubTransportLayer is used for the creation of new connections. Whenever on runtime a new
+ * connection is request, the internal PubSub implementation call * the 'createPubSubChannel' function.
+ * The 'transportProfileUri' contains the standard defined transport profile information
+ * and is used to identify the type of connections which can be created by the
+ * TransportLayer. The server config contains a list of UA_PubSubTransportLayer.
+ * Take a look in the tutorial_pubsub_connection to get informations about the TransportLayer handling.
+ */
+
+typedef struct UA_PubSubTransportLayer{
+    UA_String transportProfileUri;
+    UA_PubSubChannel * (*createPubSubChannel)(UA_PubSubConnectionConfig *connectionConfig);
+} UA_PubSubTransportLayer;
+
+#ifdef __cplusplus
+} // extern "C"
+#endif
+
+#endif /* UA_PLUGIN_PUBSUB_H_ */

+ 10 - 0
include/ua_server_config.h

@@ -22,6 +22,10 @@ extern "C" {
 #include "ua_plugin_securitypolicy.h"
 #include "ua_plugin_nodestore.h"
 
+#ifdef UA_ENABLE_PUBSUB
+#include "ua_plugin_pubsub.h"
+#endif
+
 /**
  * .. _server-configuration:
  *
@@ -86,6 +90,12 @@ struct UA_ServerConfig {
     UA_ServerNetworkLayer *networkLayers;
     UA_String customHostname;
 
+#ifdef UA_ENABLE_PUBSUB
+    /*PubSub network layer */
+    size_t pubsubTransportLayersSize;
+    UA_PubSubTransportLayer *pubsubTransportLayers;
+#endif
+
     /* Available endpoints */
     size_t endpointsSize;
     UA_Endpoint *endpoints;

+ 78 - 0
include/ua_server_pubsub.h

@@ -0,0 +1,78 @@
+/* This Source Code Form is subject to the terms of the Mozilla Public
+ * License, v. 2.0. If a copy of the MPL was not distributed with this
+ * file, You can obtain one at http://mozilla.org/MPL/2.0/.
+ *
+ * Copyright (c) 2017-2018 Fraunhofer IOSB (Author: Andreas Ebner)
+ */
+
+#ifndef UA_SERVER_PUBSUB_H
+#define UA_SERVER_PUBSUB_H
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+#include "ua_server.h"
+
+/**
+ * .. _pubsub:
+ *
+ * Publish/Subscribe
+ * =================
+ *
+ * Work in progress!
+ * This part will be a new chapter later.
+ *
+ * TODO: write general PubSub introduction
+ *
+ * The Publish/Subscribe (PubSub) extension for OPC UA enables fast and efficient
+ * 1:m communication. The PubSub extension is protocol agnostic and can be used
+ * with broker based protocols like MQTT and AMQP or brokerless implementations like UDP-Multicasting.
+ *
+ * The PubSub API uses the following scheme:
+ *
+ * 1. Create a configuration for the needed PubSub element.
+ *
+ * 2. Call the add[element] function and pass in the configuration.
+ * 
+ * 3. The add[element] function returns the unique nodeId of the internally created element.
+ *
+ * Take a look on the PubSub Tutorials for mor details about the API usage.
+ * Connections
+ * -----------
+ */
+
+typedef struct{
+    UA_String name;
+    UA_Boolean enabled;
+    union{ //std: valid types UInt or String
+        UA_UInt32 numeric;
+        UA_String string;
+    }publisherId;
+    UA_String transportProfileUri;
+    UA_Variant address;
+    size_t connectionPropertiesSize;
+    UA_KeyValuePair *connectionProperties;
+    UA_Variant connectionTransportSettings;
+} UA_PubSubConnectionConfig;
+
+UA_StatusCode
+UA_PubSubConnection_getConfig(UA_Server *server, UA_NodeId connectionIdentifier,
+                              UA_PubSubConnectionConfig *config);
+
+/**
+ * **Create and Remove Connections**
+ */
+
+UA_StatusCode
+UA_Server_addPubSubConnection(UA_Server *server, const UA_PubSubConnectionConfig *connectionConfig,
+                              UA_NodeId *connectionIdentifier);
+
+UA_StatusCode
+UA_Server_removePubSubConnection(UA_Server *server, UA_NodeId connectionIdentifier);
+
+#ifdef __cplusplus
+} // extern "C"
+#endif
+
+#endif /* UA_SERVER_PUBSUB_H */

+ 484 - 0
plugins/ua_network_pubsub_udp.c

@@ -0,0 +1,484 @@
+/* This Source Code Form is subject to the terms of the Mozilla Public
+ * License, v. 2.0. If a copy of the MPL was not distributed with this
+ * file, You can obtain one at http://mozilla.org/MPL/2.0/.
+ *
+ * Copyright (c) 2017-2018 Fraunhofer IOSB (Author: Andreas Ebner)
+ */
+
+/* Enable POSIX features */
+#if !defined(_XOPEN_SOURCE) && !defined(_WRS_KERNEL)
+# define _XOPEN_SOURCE 600
+#endif
+#ifndef _DEFAULT_SOURCE
+# define _DEFAULT_SOURCE
+#endif
+
+/* On older systems we need to define _BSD_SOURCE.
+ * _DEFAULT_SOURCE is an alias for that. */
+#ifndef _BSD_SOURCE
+# define _BSD_SOURCE
+#endif
+
+ /* Disable some security warnings on MSVC */
+#ifdef _MSC_VER
+# define _CRT_SECURE_NO_WARNINGS
+#endif
+
+ /* Assume that Windows versions are newer than Windows XP */
+#if defined(__MINGW32__) && (!defined(WINVER) || WINVER < 0x501)
+# undef WINVER
+# undef _WIN32_WINDOWS
+# undef _WIN32_WINNT
+# define WINVER 0x0501
+# define _WIN32_WINDOWS 0x0501
+# define _WIN32_WINNT 0x0501
+#endif
+
+#ifdef _WIN32
+# include <winsock2.h>
+# include <ws2tcpip.h>
+# include <Iphlpapi.h>
+# define CLOSESOCKET(S) closesocket((SOCKET)S)
+# define ssize_t int
+# define UA_fd_set(fd, fds) FD_SET((unsigned int)fd, fds)
+# define UA_fd_isset(fd, fds) FD_ISSET((unsigned int)fd, fds)
+#else /* _WIN32 */
+#  define CLOSESOCKET(S) close(S)
+#include <sys/types.h>
+#include <sys/socket.h>
+#include <netdb.h>
+#include <unistd.h>
+#include <arpa/inet.h>
+#include <net/if.h>
+# define UA_fd_set(fd, fds) FD_SET(fd, fds)
+# define UA_fd_isset(fd, fds) FD_ISSET(fd, fds)
+# endif /* Not Windows */
+
+#include <stdio.h>
+#include "ua_plugin_network.h"
+#include "ua_network_pubsub_udp.h"
+#include "ua_log_stdout.h"
+
+//UDP multicast network layer specific internal data
+typedef struct {
+    int ai_family;                        //Protocol family for socket.  IPv4/IPv6
+    struct sockaddr_storage *ai_addr;     //https://msdn.microsoft.com/de-de/library/windows/desktop/ms740496(v=vs.85).aspx
+    UA_UInt32 messageTTL;
+    UA_Boolean enableLoopback;
+    UA_Boolean enableReuse;
+} UA_PubSubChannelDataUDPMC;
+
+/**
+ * Open communication socket based on the connectionConfig. Protocol specific parameters are
+ * provided within the connectionConfig as KeyValuePair.
+ * Currently supported options: "ttl" , "loopback", "reuse"
+ *
+ * @return ref to created channel, NULL on error
+ */
+static UA_PubSubChannel *
+UA_PubSubChannelUDPMC_open(const UA_PubSubConnectionConfig *connectionConfig) {
+    #ifdef _WIN32
+        WSADATA wsaData;
+        WSAStartup(MAKEWORD(2, 2), &wsaData);
+    #endif /* Not Windows */
+
+    UA_NetworkAddressUrlDataType address;
+    if(UA_Variant_hasScalarType(&connectionConfig->address, &UA_TYPES[UA_TYPES_NETWORKADDRESSURLDATATYPE])){
+        address = *(UA_NetworkAddressUrlDataType *)connectionConfig->address.data;
+    } else {
+        UA_LOG_ERROR(UA_Log_Stdout, UA_LOGCATEGORY_SERVER, "PubSub Connection creation failed. Invalid Address.");
+        return NULL;
+    }
+    //allocate and init memory for the UDP multicast specific internal data
+    UA_PubSubChannelDataUDPMC * channelDataUDPMC =
+            (UA_PubSubChannelDataUDPMC *) UA_calloc(1, (sizeof(UA_PubSubChannelDataUDPMC)));
+    if(!channelDataUDPMC){
+        UA_LOG_ERROR(UA_Log_Stdout, UA_LOGCATEGORY_SERVER, "PubSub Connection creation failed. Out of memory.");
+        return NULL;
+    }
+    //set default values
+    memcpy(channelDataUDPMC, &(UA_PubSubChannelDataUDPMC){0, NULL, 255, UA_TRUE, UA_TRUE}, sizeof(UA_PubSubChannelDataUDPMC));
+    //iterate over the given KeyValuePair paramters
+    UA_String ttlParam = UA_STRING("ttl"), loopbackParam = UA_STRING("loopback"), reuseParam = UA_STRING("reuse");
+    for(size_t i = 0; i < connectionConfig->connectionPropertiesSize; i++){
+        if(UA_String_equal(&connectionConfig->connectionProperties[i].key.name, &ttlParam)){
+            if(UA_Variant_hasScalarType(&connectionConfig->connectionProperties[i].value, &UA_TYPES[UA_TYPES_UINT32])){
+                channelDataUDPMC->messageTTL = *(UA_UInt32 *) connectionConfig->connectionProperties[i].value.data;
+            }
+        } else if(UA_String_equal(&connectionConfig->connectionProperties[i].key.name, &loopbackParam)){
+            if(UA_Variant_hasScalarType(&connectionConfig->connectionProperties[i].value, &UA_TYPES[UA_TYPES_BOOLEAN])){
+                channelDataUDPMC->enableLoopback = *(UA_Boolean *) connectionConfig->connectionProperties[i].value.data;
+            }
+        } else if(UA_String_equal(&connectionConfig->connectionProperties[i].key.name, &reuseParam)){
+            if(UA_Variant_hasScalarType(&connectionConfig->connectionProperties[i].value, &UA_TYPES[UA_TYPES_BOOLEAN])){
+                channelDataUDPMC->enableReuse = *(UA_Boolean *) connectionConfig->connectionProperties[i].value.data;
+            }
+        } else {
+            UA_LOG_WARNING(UA_Log_Stdout, UA_LOGCATEGORY_SERVER, "PubSub Connection creation. Unknown connection parameter.");
+        }
+    }
+
+    UA_PubSubChannel *newChannel = (UA_PubSubChannel *) UA_calloc(1, sizeof(UA_PubSubChannel));
+    if(!newChannel){
+        UA_LOG_ERROR(UA_Log_Stdout, UA_LOGCATEGORY_SERVER, "PubSub Connection creation failed. Out of memory.");
+        UA_free(channelDataUDPMC);
+        return NULL;
+    }
+    struct addrinfo hints, *rp, *requestResult = NULL;
+    memset(&hints, 0, sizeof hints);
+    hints.ai_family = AF_UNSPEC;
+    hints.ai_socktype = SOCK_DGRAM;
+    hints.ai_flags = 0;
+    hints.ai_protocol = 0;
+
+    UA_String hostname, path;
+    UA_UInt16 networkPort;
+    //TODO replace fallback to use the existing parseEndpointUrl function. Extend parseEndpointUrl for UDP or create own parseEndpointUrl function for PubSub.
+    if(strncmp((char*)&address.url.data, "opc.udp://", 10) != 0){
+        strncpy((char*)address.url.data, "opc.tcp://", 10);
+    } else {
+        UA_LOG_ERROR(UA_Log_Stdout, UA_LOGCATEGORY_SERVER,
+                     "PubSub Connection creation failed. Invalid URL.");
+        UA_free(channelDataUDPMC);
+        UA_free(newChannel);
+        return NULL;
+    }
+    if(UA_parseEndpointUrl(&address.url, &hostname, &networkPort, &path) != UA_STATUSCODE_GOOD){
+        UA_LOG_ERROR(UA_Log_Stdout, UA_LOGCATEGORY_SERVER,
+                     "PubSub Connection creation failed. Invalid URL.");
+        UA_free(channelDataUDPMC);
+        UA_free(newChannel);
+        return NULL;
+    }
+    if(hostname.length > 512) {
+        UA_LOG_ERROR(UA_Log_Stdout, UA_LOGCATEGORY_SERVER,
+                     "PubSub Connection creation failed. URL maximum length is 512.");
+        UA_free(channelDataUDPMC);
+        UA_free(newChannel);
+        return NULL;
+    }
+
+    UA_STACKARRAY(char, addressAsChar, sizeof(char) * hostname.length +1);
+    memcpy(addressAsChar, hostname.data, hostname.length);
+    addressAsChar[hostname.length] = 0;
+    char port[6];
+    sprintf(port, "%u", networkPort);
+
+    if(getaddrinfo(addressAsChar, port, &hints, &requestResult) != 0) {
+        UA_LOG_ERROR(UA_Log_Stdout, UA_LOGCATEGORY_SERVER,
+                     "PubSub Connection creation failed. Internal error.");
+        UA_free(channelDataUDPMC);
+        UA_free(newChannel);
+        return NULL;
+    }
+
+    //check if the ip address is a multicast address
+    if(requestResult->ai_family == PF_INET){
+        struct in_addr imr_interface;
+        inet_pton(AF_INET, addressAsChar, &imr_interface);
+        if((ntohl(imr_interface.s_addr) & 0xF0000000) != 0xE0000000){
+            UA_LOG_ERROR(UA_Log_Stdout, UA_LOGCATEGORY_SERVER,
+                         "PubSub Connection creation failed. No multicast address.");
+            freeaddrinfo(requestResult);
+            UA_free(channelDataUDPMC);
+            UA_free(newChannel);
+            return NULL;
+        }
+    } else {
+        //TODO check if ipv6 addrr is multicast address.
+    }
+
+    for(rp = requestResult; rp != NULL; rp = rp->ai_next){
+        newChannel->sockfd = socket(rp->ai_family, rp->ai_socktype, rp->ai_protocol);
+        if(newChannel->sockfd != -1){
+            break; /*success*/
+        }
+    }
+    if(!rp){
+        UA_LOG_ERROR(UA_Log_Stdout, UA_LOGCATEGORY_SERVER,
+                     "PubSub Connection creation failed. Internal error.");
+        freeaddrinfo(requestResult);
+        UA_free(channelDataUDPMC);
+        UA_free(newChannel);
+        return NULL;
+    }
+    channelDataUDPMC->ai_family = rp->ai_family;
+    channelDataUDPMC->ai_addr = (struct sockaddr_storage *) UA_calloc(1, sizeof(struct sockaddr_storage));
+    if(!channelDataUDPMC->ai_addr){
+        UA_LOG_ERROR(UA_Log_Stdout, UA_LOGCATEGORY_SERVER,
+                     "PubSub Connection creation failed. Out of memory.");
+        CLOSESOCKET(newChannel->sockfd);
+        freeaddrinfo(requestResult);
+        UA_free(channelDataUDPMC);
+        UA_free(newChannel);
+        return NULL;
+    }
+    memcpy(channelDataUDPMC->ai_addr, rp->ai_addr, sizeof(*rp->ai_addr));
+    //link channel and internal channel data
+    newChannel->handle = channelDataUDPMC;
+
+    //Set loop back data to your host
+    if(setsockopt(newChannel->sockfd,
+                     requestResult->ai_family == PF_INET6 ? IPPROTO_IPV6:IPPROTO_IP,
+                     requestResult->ai_family == PF_INET6 ? IPV6_MULTICAST_LOOP : IP_MULTICAST_LOOP,
+                     (const char *)&channelDataUDPMC->enableLoopback, sizeof (channelDataUDPMC->enableLoopback)) < 0) {
+        UA_LOG_ERROR(UA_Log_Stdout, UA_LOGCATEGORY_SERVER,
+                     "PubSub Connection creation failed. Loopback setup failed.");
+        CLOSESOCKET(newChannel->sockfd);
+        freeaddrinfo(requestResult);
+        UA_free(channelDataUDPMC);
+        UA_free(newChannel);
+        return NULL;
+    }
+
+    //Set Time to live (TTL). Value of 1 prevent forward beyond the local network.
+    if(setsockopt(newChannel->sockfd,
+                  requestResult->ai_family == PF_INET6 ? IPPROTO_IPV6:IPPROTO_IP,
+                  requestResult->ai_family == PF_INET6 ? IPV6_MULTICAST_HOPS : IP_MULTICAST_TTL,
+                  (const char *)&channelDataUDPMC->messageTTL, sizeof(channelDataUDPMC->messageTTL)) < 0) {
+        UA_LOG_WARNING(UA_Log_Stdout, UA_LOGCATEGORY_SERVER,
+                     "PubSub Connection creation problem. Time to live setup failed.");
+    }
+
+    //Set reuse address -> enables sharing of the same listening address on different sockets.
+    if(channelDataUDPMC->enableReuse){
+        int enableReuse = 1;
+        if(setsockopt(newChannel->sockfd,
+                      SOL_SOCKET, SO_REUSEADDR,
+                      (const char*)&enableReuse, sizeof(enableReuse)) < 0){
+            UA_LOG_WARNING(UA_Log_Stdout, UA_LOGCATEGORY_SERVER,
+                           "PubSub Connection creation problem. Reuse address setup failed.");
+        }
+    }
+
+    //Set the physical interface for outgoing traffic
+    if(address.networkInterface.length > 0){
+        UA_STACKARRAY(char, interfaceAsChar, sizeof(char) * address.networkInterface.length + 1);
+        memcpy(interfaceAsChar, address.networkInterface.data, address.networkInterface.length);
+        interfaceAsChar[address.networkInterface.length] = 0;
+        enum{
+            IPv4,
+            IPv6,
+            INVALID
+        } ipVersion;
+        union {
+            struct ip_mreq ipv4;
+            struct ipv6_mreq ipv6;
+        } group;
+        if(inet_pton(AF_INET, interfaceAsChar, &group.ipv4.imr_interface)){
+            ipVersion = IPv4;
+        } else if (inet_pton(AF_INET6, interfaceAsChar, &group.ipv6.ipv6mr_multiaddr)){
+            group.ipv6.ipv6mr_interface = if_nametoindex(interfaceAsChar);
+            ipVersion = IPv6;
+        } else {
+            ipVersion = INVALID;
+        }
+        if(ipVersion == INVALID ||
+                setsockopt(newChannel->sockfd,
+                           requestResult->ai_family == PF_INET6 ? IPPROTO_IPV6 : IPPROTO_IP,
+                           requestResult->ai_family == PF_INET6 ? IPV6_MULTICAST_IF : IP_MULTICAST_IF,
+                           ipVersion == IPv6 ? (void *) &group.ipv6.ipv6mr_interface : &group.ipv4.imr_interface,
+                           ipVersion == IPv6 ? sizeof(group.ipv6.ipv6mr_interface) : sizeof(struct in_addr)) < 0){
+            UA_LOG_WARNING(UA_Log_Stdout, UA_LOGCATEGORY_SERVER,
+                           "PubSub Connection creation problem. Interface selection failed.");
+        };
+    }
+    freeaddrinfo(requestResult);
+    newChannel->state = UA_PUBSUB_CHANNEL_PUB;
+    return newChannel;
+}
+
+/**
+ * Subscribe to a given address.
+ *
+ * @return UA_STATUSCODE_GOOD on success
+ */
+static UA_StatusCode
+UA_PubSubChannelUDPMC_regist(UA_PubSubChannel *channel, UA_ExtensionObject *transportSettings) {
+    if(!(channel->state == UA_PUBSUB_CHANNEL_PUB || channel->state == UA_PUBSUB_CHANNEL_RDY)){
+        UA_LOG_ERROR(UA_Log_Stdout, UA_LOGCATEGORY_SERVER, "PubSub Connection regist failed.");
+        return UA_STATUSCODE_BADINTERNALERROR;
+    }
+    UA_PubSubChannelDataUDPMC * connectionConfig = (UA_PubSubChannelDataUDPMC *) channel->handle;
+    if(connectionConfig->ai_family == PF_INET){//IPv4 handling
+        struct sockaddr_in addr;
+        memcpy(&addr, connectionConfig->ai_addr, sizeof(struct sockaddr_in));
+        addr.sin_addr.s_addr = INADDR_ANY;
+        if (bind(channel->sockfd, (const struct sockaddr *)&addr, sizeof(struct sockaddr_in)) != 0){
+            UA_LOG_ERROR(UA_Log_Stdout, UA_LOGCATEGORY_SERVER, "PubSub Connection regist failed.");
+            return UA_STATUSCODE_BADINTERNALERROR;
+        }
+        struct ip_mreq groupV4;
+        memcpy(&groupV4.imr_multiaddr, &((const struct sockaddr_in *)connectionConfig->ai_addr)->sin_addr, sizeof(struct ip_mreq));
+        groupV4.imr_interface.s_addr = htonl(INADDR_ANY);
+        //multihomed hosts can join several groups on different IF, INADDR_ANY -> kernel decides
+
+        if(setsockopt(channel->sockfd, IPPROTO_IP, IP_ADD_MEMBERSHIP, (char *) &groupV4, sizeof(groupV4)) != 0){
+            UA_LOG_ERROR(UA_Log_Stdout, UA_LOGCATEGORY_SERVER, "PubSub Connection regist failed.");
+            return UA_STATUSCODE_BADINTERNALERROR;
+        }
+
+    } else if (connectionConfig->ai_family == PF_INET6) {//IPv6 handling
+        //TODO implement regist for IPv6
+    } else {
+        UA_LOG_ERROR(UA_Log_Stdout, UA_LOGCATEGORY_SERVER, "PubSub Connection regist failed.");
+        return UA_STATUSCODE_BADINTERNALERROR;
+    }
+    return UA_STATUSCODE_GOOD;
+}
+
+/**
+ * Remove current subscription.
+ *
+ * @return UA_STATUSCODE_GOOD on success
+ */
+static UA_StatusCode
+UA_PubSubChannelUDPMC_unregist(UA_PubSubChannel *channel, UA_ExtensionObject *transportSettings) {
+    if(!(channel->state == UA_PUBSUB_CHANNEL_PUB_SUB || channel->state == UA_PUBSUB_CHANNEL_SUB)){
+        UA_LOG_ERROR(UA_Log_Stdout, UA_LOGCATEGORY_SERVER, "PubSub Connection unregist failed.");
+        return UA_STATUSCODE_BADINTERNALERROR;
+    }
+    UA_PubSubChannelDataUDPMC * connectionConfig = (UA_PubSubChannelDataUDPMC *) channel->handle;
+    if(connectionConfig->ai_family == PF_INET){//IPv4 handling
+        struct ip_mreq groupV4;
+        memcpy(&groupV4.imr_multiaddr, &((const struct sockaddr_in *)connectionConfig->ai_addr)->sin_addr, sizeof(struct ip_mreq));
+        groupV4.imr_interface.s_addr = htonl(INADDR_ANY);
+
+        if(setsockopt(channel->sockfd, IPPROTO_IP, IP_DROP_MEMBERSHIP, (char *) &groupV4, sizeof(groupV4)) != 0){
+            UA_LOG_ERROR(UA_Log_Stdout, UA_LOGCATEGORY_SERVER, "PubSub Connection unregist failed.");
+            return UA_STATUSCODE_BADINTERNALERROR;
+        }
+    } else if (connectionConfig->ai_family == PF_INET6) {//IPv6 handling
+        //TODO implement unregist for IPv6
+    } else {
+        UA_LOG_ERROR(UA_Log_Stdout, UA_LOGCATEGORY_SERVER, "PubSub Connection unregist failed.");
+        return UA_STATUSCODE_BADINTERNALERROR;
+    }
+    return UA_STATUSCODE_GOOD;
+}
+
+/**
+ * Send messages to the connection defined address
+ *
+ * @return UA_STATUSCODE_GOOD if success
+ */
+static UA_StatusCode
+UA_PubSubChannelUDPMC_send(UA_PubSubChannel *channel, UA_ExtensionObject *transportSettigns, const UA_ByteString *buf) {
+    UA_PubSubChannelDataUDPMC *channelConfigUDPMC = (UA_PubSubChannelDataUDPMC *) channel->handle;
+    if(!(channel->state == UA_PUBSUB_CHANNEL_PUB || channel->state == UA_PUBSUB_CHANNEL_PUB_SUB)){
+        UA_LOG_WARNING(UA_Log_Stdout, UA_LOGCATEGORY_SERVER, "PubSub Connection sending failed. Invalid state.");
+        return UA_STATUSCODE_BADINTERNALERROR;
+    }
+    //TODO evalute: chunk messages or check against MTU?
+    long nWritten = 0;
+    while (nWritten < (long)buf->length) {
+        long n = sendto(channel->sockfd, buf->data, buf->length, 0,
+                        (struct sockaddr *) channelConfigUDPMC->ai_addr, sizeof(struct sockaddr_storage));
+        if(n == -1L) {
+            UA_LOG_WARNING(UA_Log_Stdout, UA_LOGCATEGORY_SERVER, "PubSub Connection sending failed.");
+            return UA_STATUSCODE_BADINTERNALERROR;
+        }
+        nWritten += n;
+    }
+    return UA_STATUSCODE_GOOD;
+}
+
+/**
+ * Receive messages. The regist function should be called before.
+ *
+ * @param timeout in usec | on windows platforms are only multiples of 1000usec possible
+ * @return
+ */
+static UA_StatusCode
+UA_PubSubChannelUDPMC_receive(UA_PubSubChannel *channel, UA_ByteString *message, UA_ExtensionObject *transportSettigns, UA_UInt32 timeout){
+    if(!(channel->state == UA_PUBSUB_CHANNEL_PUB || channel->state == UA_PUBSUB_CHANNEL_PUB_SUB)) {
+        UA_LOG_ERROR(UA_Log_Stdout, UA_LOGCATEGORY_SERVER, "PubSub Connection receive failed. Invalid state.");
+        return UA_STATUSCODE_BADINTERNALERROR;
+    }
+    UA_PubSubChannelDataUDPMC *channelConfigUDPMC = (UA_PubSubChannelDataUDPMC *) channel->handle;
+
+    if(timeout > 0) {
+        fd_set fdset;
+        FD_ZERO(&fdset);
+        UA_fd_set(channel->sockfd, &fdset);
+        struct timeval tmptv = {(long int)(timeout / 1000000),
+                                (long int)(timeout % 1000000)};
+        int resultsize = select(channel->sockfd+1, &fdset, NULL,
+                                NULL, &tmptv);
+        if(resultsize == 0) {
+            message->length = 0;
+            return UA_STATUSCODE_GOODNONCRITICALTIMEOUT;
+        }
+        if (resultsize == -1) {
+            message->length = 0;
+            return UA_STATUSCODE_BADINTERNALERROR;
+        }
+    }
+
+    if(channelConfigUDPMC->ai_family == PF_INET){
+        ssize_t messageLength;
+        messageLength = recvfrom(channel->sockfd, message->data, message->length, 0, NULL, NULL);
+        if(messageLength > 0){
+            message->length = (size_t) messageLength;
+        } else {
+            message->length = 0;
+        }
+    } else {
+        //TODO implement recieve for IPv6
+    }
+    return UA_STATUSCODE_GOOD;
+}
+
+/**
+ * Close channel and free the channel data.
+ *
+ * @return UA_STATUSCODE_GOOD if success
+ */
+static UA_StatusCode
+UA_PubSubChannelUDPMC_close(UA_PubSubChannel *channel) {
+#ifdef _WIN32
+    WSACleanup();
+#endif /* Not Windows */
+    if(CLOSESOCKET(channel->sockfd) != 0){
+        UA_LOG_ERROR(UA_Log_Stdout, UA_LOGCATEGORY_SERVER, "PubSub Connection delete failed.");
+        return UA_STATUSCODE_BADINTERNALERROR;
+    }
+    //cleanup the internal NetworkLayer data
+    UA_PubSubChannelDataUDPMC *networkLayerData = (UA_PubSubChannelDataUDPMC *) channel->handle;
+    UA_free(networkLayerData->ai_addr);
+    UA_free(networkLayerData);
+    UA_free(channel);
+    return UA_STATUSCODE_GOOD;
+}
+
+/**
+ * Generate a new channel. based on the given configuration.
+ *
+ * @param connectionConfig connection configuration
+ * @return  ref to created channel, NULL on error
+ */
+static UA_PubSubChannel *
+TransportLayerUDPMC_addChannel(UA_PubSubConnectionConfig *connectionConfig) {
+    UA_LOG_INFO(UA_Log_Stdout, UA_LOGCATEGORY_USERLAND, "PubSub channel requested");
+    UA_PubSubChannel * pubSubChannel = UA_PubSubChannelUDPMC_open(connectionConfig);
+    if(pubSubChannel){
+        pubSubChannel->regist = UA_PubSubChannelUDPMC_regist;
+        pubSubChannel->unregist = UA_PubSubChannelUDPMC_unregist;
+        pubSubChannel->send = UA_PubSubChannelUDPMC_send;
+        pubSubChannel->receive = UA_PubSubChannelUDPMC_receive;
+        pubSubChannel->close = UA_PubSubChannelUDPMC_close;
+        pubSubChannel->connectionConfig = connectionConfig;
+    }
+    return pubSubChannel;
+}
+
+//UDPMC channel factory
+UA_PubSubTransportLayer
+UA_PubSubTransportLayerUDPMP() {
+    UA_PubSubTransportLayer pubSubTransportLayer;
+    pubSubTransportLayer.transportProfileUri = UA_STRING("http://opcfoundation.org/UA-Profile/Transport/pubsub-udp-uadp");
+    pubSubTransportLayer.createPubSubChannel = &TransportLayerUDPMC_addChannel;
+    return pubSubTransportLayer;
+}
+
+#undef _POSIX_C_SOURCE

+ 23 - 0
plugins/ua_network_pubsub_udp.h

@@ -0,0 +1,23 @@
+/* This work is licensed under a Creative Commons CCZero 1.0 Universal License.
+ * See http://creativecommons.org/publicdomain/zero/1.0/ for more information.
+ *
+ *    Copyright 2017-2018 (c) Fraunhofer IOSB (Author: Andreas Ebner)
+ */
+
+#ifndef UA_NETWORK_UDPMC_H_
+#define UA_NETWORK_UDPMC_H_
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+#include "ua_plugin_pubsub.h"
+
+UA_PubSubTransportLayer
+UA_PubSubTransportLayerUDPMP(void);
+
+#ifdef __cplusplus
+} // extern "C"
+#endif
+
+#endif /* UA_NETWORK_UDPMC_H_ */

+ 82 - 0
src/pubsub/ua_pubsub.c

@@ -0,0 +1,82 @@
+/* This Source Code Form is subject to the terms of the Mozilla Public
+ * License, v. 2.0. If a copy of the MPL was not distributed with this
+ * file, You can obtain one at http://mozilla.org/MPL/2.0/.
+ *
+ * Copyright (c) 2017-2018 Fraunhofer IOSB (Author: Andreas Ebner)
+ */
+
+#include "ua_pubsub.h"
+#include "ua_pubsub_manager.h"
+
+/**********************************************/
+/*               Connection                   */
+/**********************************************/
+UA_StatusCode
+UA_PubSubConnectionConfig_copy(const UA_PubSubConnectionConfig *src, UA_PubSubConnectionConfig *dst) {
+    UA_StatusCode retVal = UA_STATUSCODE_GOOD;
+    memcpy(dst, src, sizeof(UA_PubSubConnectionConfig));
+    retVal |= UA_String_copy(&src->name, &dst->name);
+    retVal |= UA_Variant_copy(&src->address, &dst->address);
+    retVal |= UA_String_copy(&src->transportProfileUri, &dst->transportProfileUri);
+    retVal |= UA_Variant_copy(&src->connectionTransportSettings, &dst->connectionTransportSettings);
+    if(src->connectionPropertiesSize > 0){
+        dst->connectionProperties = (UA_KeyValuePair *) UA_calloc(src->connectionPropertiesSize, sizeof(UA_KeyValuePair));
+        if(!dst->connectionProperties){
+            return UA_STATUSCODE_BADOUTOFMEMORY;
+        }
+        for(size_t i = 0; i < src->connectionPropertiesSize; i++){
+            retVal |= UA_QualifiedName_copy(&src->connectionProperties[i].key, &dst->connectionProperties[i].key);
+            retVal |= UA_Variant_copy(&src->connectionProperties[i].value, &dst->connectionProperties[i].value);
+        }
+    }
+    return retVal;
+}
+
+/**
+ * Get the current config of the Connection.
+ *
+ * @param server
+ * @param connectionIdentifier
+ * @param config
+ * @return UA_STATUSCODE_GOOD on success
+ */
+UA_StatusCode
+UA_PubSubConnection_getConfig(UA_Server *server, UA_NodeId connectionIdentifier,
+                              UA_PubSubConnectionConfig *config) {
+    if(!config)
+        return UA_STATUSCODE_BADINVALIDARGUMENT;
+
+    UA_PubSubConnection *currentPubSubConnection = UA_PubSubManager_findConnectionbyId(server, connectionIdentifier);
+    if(!currentPubSubConnection)
+        return UA_STATUSCODE_BADNOTFOUND;
+
+    UA_PubSubConnectionConfig tmpPubSubConnectionConfig;
+    //deep copy of the actual config
+    UA_PubSubConnectionConfig_copy(currentPubSubConnection->config, &tmpPubSubConnectionConfig);
+    *config = tmpPubSubConnectionConfig;
+    return UA_STATUSCODE_GOOD;
+}
+
+void
+UA_PubSubConnectionConfig_deleteMembers(UA_PubSubConnectionConfig *connectionConfig) {
+    UA_String_deleteMembers(&connectionConfig->name);
+    UA_String_deleteMembers(&connectionConfig->transportProfileUri);
+    UA_Variant_deleteMembers(&connectionConfig->connectionTransportSettings);
+    UA_Variant_deleteMembers(&connectionConfig->address);
+    for(size_t i = 0; i < connectionConfig->connectionPropertiesSize; i++){
+        UA_QualifiedName_deleteMembers(&connectionConfig->connectionProperties[i].key);
+        UA_Variant_deleteMembers(&connectionConfig->connectionProperties[i].value);
+    }
+    UA_free(connectionConfig->connectionProperties);
+}
+
+void
+UA_PubSubConnection_delete(UA_PubSubConnection *connection) {
+    //delete connection config
+    UA_PubSubConnectionConfig_deleteMembers(connection->config);
+    UA_NodeId_deleteMembers(&connection->identifier);
+    if(connection->channel){
+        connection->channel->close(connection->channel);
+    }
+    UA_free(connection->config);
+}

+ 46 - 0
src/pubsub/ua_pubsub.h

@@ -0,0 +1,46 @@
+/* This Source Code Form is subject to the terms of the Mozilla Public
+ * License, v. 2.0. If a copy of the MPL was not distributed with this
+ * file, You can obtain one at http://mozilla.org/MPL/2.0/.
+ *
+ * Copyright (c) 2017-2018 Fraunhofer IOSB (Author: Andreas Ebner)
+ */
+
+#ifndef UA_PUBSUB_H_
+#define UA_PUBSUB_H_
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+#include <queue.h>
+#include "ua_plugin_pubsub.h"
+#include "ua_pubsub_networkmessage.h"
+#include "ua_server.h"
+#include "ua_server_pubsub.h"
+
+//forward declarations
+struct UA_PubSubConnection;
+typedef struct UA_PubSubConnection UA_PubSubConnection;
+
+/**********************************************/
+/*               Connection                   */
+/**********************************************/
+//the connection config (public part of connection) object is defined in include/ua_plugin_pubsub.h
+struct UA_PubSubConnection{
+    UA_PubSubConnectionConfig *config;
+    //internal fields
+    UA_PubSubChannel *channel;
+    UA_NodeId identifier;
+};
+
+UA_StatusCode
+UA_PubSubConnectionConfig_copy(const UA_PubSubConnectionConfig *src, UA_PubSubConnectionConfig *dst);
+
+void UA_PubSubConnectionConfig_deleteMembers(UA_PubSubConnectionConfig *connectionConfig);
+void UA_PubSubConnection_delete(UA_PubSubConnection *connection);
+
+#ifdef __cplusplus
+} // extern "C"
+#endif
+
+#endif /* UA_PUBSUB_H_ */

+ 175 - 0
src/pubsub/ua_pubsub_manager.c

@@ -0,0 +1,175 @@
+/* This Source Code Form is subject to the terms of the Mozilla Public
+ * License, v. 2.0. If a copy of the MPL was not distributed with this
+ * file, You can obtain one at http://mozilla.org/MPL/2.0/.
+ *
+ * Copyright (c) 2017-2018 Fraunhofer IOSB (Author: Andreas Ebner)
+ */
+
+#include "ua_pubsub_manager.h"
+#include "ua_log_stdout.h"
+#include "server/ua_server_internal.h"
+
+/**
+ * Add new Connection to the current PubSub configuration.
+ *
+ * @param server
+ * @param connectionConfig config of the new Connection
+ * @param connectionIdentifier nodeId of the generated Connection (NULL if not needed)
+ * @return UA_STATUSCODE_GOOD if success
+ */
+UA_StatusCode
+UA_Server_addPubSubConnection(UA_Server *server, const UA_PubSubConnectionConfig *connectionConfig,
+                              UA_NodeId *connectionIdentifier) {
+    //iterate over the available UA_PubSubTransportLayers
+    for(size_t i = 0; i < server->config.pubsubTransportLayersSize; i++) {
+        if(connectionConfig && UA_String_equal(&server->config.pubsubTransportLayers[i].transportProfileUri,
+                                               &connectionConfig->transportProfileUri)){
+            //create new connection config
+            UA_PubSubConnectionConfig *tmpConnectionConfig = (UA_PubSubConnectionConfig *)
+                    UA_calloc(1, sizeof(UA_PubSubConnectionConfig));
+            if(!tmpConnectionConfig){
+                UA_LOG_ERROR(UA_Log_Stdout, UA_LOGCATEGORY_SERVER,
+                             "PubSub Connection creation failed. Out of Memory.");
+                return UA_STATUSCODE_BADOUTOFMEMORY;
+            }
+            //deep copy the given connection config
+            if(UA_PubSubConnectionConfig_copy(connectionConfig, tmpConnectionConfig) != UA_STATUSCODE_GOOD){
+                UA_LOG_ERROR(UA_Log_Stdout, UA_LOGCATEGORY_SERVER,
+                             "PubSub Connection creation failed. Config copy problem.");
+                return UA_STATUSCODE_BADOUTOFMEMORY;
+            }
+            //create new connection and add to UA_PubSubManager
+            UA_PubSubConnection *newConnectionsField = (UA_PubSubConnection *)
+                    UA_realloc(server->pubSubManager.connections,
+                               sizeof(UA_PubSubConnection) * (server->pubSubManager.connectionsSize + 1));
+            if(!newConnectionsField) {
+                UA_PubSubConnectionConfig_deleteMembers(tmpConnectionConfig);
+                UA_free(tmpConnectionConfig);
+                UA_LOG_ERROR(UA_Log_Stdout, UA_LOGCATEGORY_SERVER,
+                             "PubSub Connection creation failed. Out of Memory.");
+                return UA_STATUSCODE_BADOUTOFMEMORY;
+            }
+            server->pubSubManager.connections = newConnectionsField;
+            UA_PubSubConnection *newConnection = &server->pubSubManager.connections[server->pubSubManager.connectionsSize];
+            memset(newConnection, 0, sizeof(UA_PubSubConnection));
+            newConnection->config = tmpConnectionConfig;
+            newConnection->channel = server->config.pubsubTransportLayers[i].createPubSubChannel(newConnection->config);
+            if(!newConnection->channel){
+                UA_PubSubConnection_delete(newConnection);
+                if(server->pubSubManager.connectionsSize > 0){
+                    newConnectionsField = (UA_PubSubConnection *)
+                            UA_realloc(server->pubSubManager.connections,
+                                       sizeof(UA_PubSubConnection) * (server->pubSubManager.connectionsSize));
+                    if(!newConnectionsField) {
+                        return UA_STATUSCODE_BADINTERNALERROR;
+                    }
+                    server->pubSubManager.connections = newConnectionsField;
+                } else  {
+                    UA_free(newConnectionsField);
+                    server->pubSubManager.connections = NULL;
+                }
+                UA_LOG_ERROR(UA_Log_Stdout, UA_LOGCATEGORY_SERVER,
+                             "PubSub Connection creation failed. Transport layer creation problem.");
+                return UA_STATUSCODE_BADINTERNALERROR;
+            }
+            UA_PubSubManager_generateUniqueNodeId(server, &newConnection->identifier);
+            if(connectionIdentifier != NULL){
+                UA_NodeId_copy(&newConnection->identifier, connectionIdentifier);
+            }
+            server->pubSubManager.connectionsSize++;
+            return UA_STATUSCODE_GOOD;
+        }
+    }
+    UA_LOG_ERROR(UA_Log_Stdout, UA_LOGCATEGORY_SERVER,
+                 "PubSub Connection creation failed. Requested transport layer not found.");
+    return UA_STATUSCODE_BADNOTFOUND;
+}
+
+/**
+ * Remove Connection, identified by the NodeId. Deletion of Connection
+ * removes all contained WriterGroups and Writers.
+ *
+ * @param server
+ * @param connectionIdentifier
+ * @return UA_STATUSCODE_GOOD on success
+ */
+UA_StatusCode UA_Server_removePubSubConnection(UA_Server *server, UA_NodeId connectionIdentifier) {
+    //search the identified Connection and store the Connection index
+    size_t connectionIndex;
+    UA_PubSubConnection *currentConnection = NULL;
+    for(connectionIndex = 0; connectionIndex < server->pubSubManager.connectionsSize; connectionIndex++){
+        if(UA_NodeId_equal(&connectionIdentifier, &server->pubSubManager.connections[connectionIndex].identifier)){
+            currentConnection = &server->pubSubManager.connections[connectionIndex];
+            break;
+        }
+    }
+    if(!currentConnection){
+        return UA_STATUSCODE_BADNOTFOUND;
+    }
+    UA_PubSubConnection_delete(currentConnection);
+    server->pubSubManager.connectionsSize--;
+    //remove the connection from the pubSubManager, move the last connection into the allocated memory of the deleted connection
+    if(server->pubSubManager.connectionsSize != connectionIndex){
+        memcpy(&server->pubSubManager.connections[connectionIndex],
+               &server->pubSubManager.connections[server->pubSubManager.connectionsSize], sizeof(UA_PubSubConnection));
+    }
+    if(server->pubSubManager.connectionsSize <= 0){
+        UA_free(server->pubSubManager.connections);
+        server->pubSubManager.connections = NULL;
+    }  else {
+        server->pubSubManager.connections = (UA_PubSubConnection *)
+                UA_realloc(server->pubSubManager.connections, sizeof(UA_PubSubConnection) * server->pubSubManager.connectionsSize);
+        if(!server->pubSubManager.connections){
+            return UA_STATUSCODE_BADINTERNALERROR;
+        }
+    }
+    return UA_STATUSCODE_GOOD;
+}
+
+/**
+ * Generate a new unique NodeId. This NodeId will be used for the
+ * information model representation of PubSub entities.
+ */
+void UA_PubSubManager_generateUniqueNodeId(UA_Server *server, UA_NodeId *nodeId) {
+    UA_NodeId newNodeId = UA_NODEID_NUMERIC(0, 0);
+    UA_Node *newNode = UA_Nodestore_new(server, UA_NODECLASS_OBJECT);
+    UA_Nodestore_insert(server, newNode, &newNodeId);
+    UA_NodeId_copy(&newNodeId, nodeId);
+}
+
+/**
+ * Delete the current PubSub configuration including all nested members. This action also delete
+ * the configured PubSub transport Layers.
+ *
+ * @param server
+ * @param pubSubManager
+ */
+void
+UA_PubSubManager_delete(UA_Server *server, UA_PubSubManager *pubSubManager) {
+    UA_LOG_INFO(UA_Log_Stdout, UA_LOGCATEGORY_SERVER, "PubSub cleanup was called.");
+    //remove Connections and WriterGroups
+    while(pubSubManager->connectionsSize > 0){
+        UA_Server_removePubSubConnection(server, pubSubManager->connections[pubSubManager->connectionsSize-1].identifier);
+    }
+    //free the currently configured transport layers
+    for(size_t i = 0; i < server->config.pubsubTransportLayersSize; i++){
+        UA_free(&server->config.pubsubTransportLayers[i]);
+    }
+}
+
+/**
+ * Find a Connection by the connectionIdentifier.
+ *
+ * @param server
+ * @param connectionIdentifier
+ * @return
+ */
+UA_PubSubConnection *
+UA_PubSubManager_findConnectionbyId(UA_Server *server, UA_NodeId connectionIdentifier) {
+    for(size_t i = 0; i < server->pubSubManager.connectionsSize; i++){
+        if(UA_NodeId_equal(&connectionIdentifier, &server->pubSubManager.connections[i].identifier)){
+            return &server->pubSubManager.connections[i];
+        }
+    }
+    return NULL;
+}

+ 35 - 0
src/pubsub/ua_pubsub_manager.h

@@ -0,0 +1,35 @@
+/* This Source Code Form is subject to the terms of the Mozilla Public
+ * License, v. 2.0. If a copy of the MPL was not distributed with this
+ * file, You can obtain one at http://mozilla.org/MPL/2.0/.
+ *
+ * Copyright (c) 2017-2018 Fraunhofer IOSB (Author: Andreas Ebner)
+ */
+
+#ifndef UA_PUBSUB_MANAGER_H_
+#define UA_PUBSUB_MANAGER_H_
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+#include "ua_pubsub.h"
+#include "ua_server_pubsub.h"
+
+typedef struct UA_PubSubManager{
+    //Connections and PublishedDataSets can exist alone (own lifecycle) -> top level components
+    size_t connectionsSize;
+    UA_PubSubConnection *connections;
+} UA_PubSubManager;
+
+void UA_PubSubManager_delete(UA_Server *server, UA_PubSubManager *pubSubManager);
+
+void UA_PubSubManager_generateUniqueNodeId(UA_Server *server, UA_NodeId *nodeId);
+
+UA_PubSubConnection *
+UA_PubSubManager_findConnectionbyId(UA_Server *server, UA_NodeId connectionIdentifier);
+
+#ifdef __cplusplus
+} // extern "C"
+#endif
+
+#endif /* UA_PUBSUB_MANAGER_H_ */

+ 4 - 0
src/server/ua_server.c

@@ -105,6 +105,10 @@ void UA_Server_delete(UA_Server *server) {
     UA_SessionManager_deleteMembers(&server->sessionManager);
     UA_Array_delete(server->namespaces, server->namespacesSize, &UA_TYPES[UA_TYPES_STRING]);
 
+#ifdef UA_ENABLE_PUBSUB
+    UA_PubSubManager_delete(server, &server->pubSubManager);
+#endif
+
 #ifdef UA_ENABLE_DISCOVERY
     registeredServer_list_entry *rs, *rs_tmp;
     LIST_FOREACH_SAFE(rs, &server->registeredServers, pointers, rs_tmp) {

+ 9 - 0
src/server/ua_server_internal.h

@@ -26,6 +26,10 @@ extern "C" {
 #include "ua_session_manager.h"
 #include "ua_securechannel_manager.h"
 
+#ifdef UA_ENABLE_PUBSUB
+#include "ua_pubsub_manager.h"
+#endif
+
 #ifdef UA_ENABLE_MULTITHREADING
 
 #include <pthread.h>
@@ -141,6 +145,11 @@ struct UA_Server {
      * the parent and member instantiation */
     UA_Boolean bootstrapNS0;
 
+#ifdef UA_ENABLE_PUBSUB
+    /* Publish/Subscribe toplevel container */
+    UA_PubSubManager pubSubManager;
+#endif
+
     /* Config */
     UA_ServerConfig config;
 };

+ 3 - 0
tests/CMakeLists.txt

@@ -171,6 +171,9 @@ if(UA_ENABLE_PUBSUB)
     add_executable(check_pubsub_encoding pubsub/check_pubsub_encoding.c $<TARGET_OBJECTS:open62541-object> $<TARGET_OBJECTS:open62541-testplugins>)
     target_link_libraries(check_pubsub_encoding ${LIBS})
     add_test_valgrind(pubsub_encoding ${TESTS_BINARY_DIR}/check_pubsub_encoding)
+    add_executable(check_pubsub_connection_udp pubsub/check_pubsub_connection_udp.c $<TARGET_OBJECTS:open62541-object> $<TARGET_OBJECTS:open62541-plugins>)
+    target_link_libraries(check_pubsub_connection_udp ${LIBS})
+    add_test(NAME check_pubsub_connection_udp COMMAND check_pubsub_connection_udp)
 endif()
 
 add_executable(check_server_readspeed server/check_server_readspeed.c $<TARGET_OBJECTS:open62541-object> $<TARGET_OBJECTS:open62541-testplugins>)

+ 58 - 3
tests/check_types_memory.c

@@ -31,6 +31,39 @@
 #ifndef UA_TYPES_MONITORINGFILTERRESULT
 #define UA_TYPES_MONITORINGFILTERRESULT UA_TYPES_COUNT
 #endif
+#ifndef UA_TYPES_DATASETREADERMESSAGEDATATYPE
+#define UA_TYPES_DATASETREADERMESSAGEDATATYPE UA_TYPES_COUNT
+#endif
+#ifndef UA_TYPES_WRITERGROUPTRANSPORTDATATYPE
+#define UA_TYPES_WRITERGROUPTRANSPORTDATATYPE UA_TYPES_COUNT
+#endif
+#ifndef UA_TYPES_CONNECTIONTRANSPORTDATATYPE
+#define UA_TYPES_CONNECTIONTRANSPORTDATATYPE UA_TYPES_COUNT
+#endif
+#ifndef UA_TYPES_WRITERGROUPMESSAGEDATATYPE
+#define UA_TYPES_WRITERGROUPMESSAGEDATATYPE UA_TYPES_COUNT
+#endif
+#ifndef UA_TYPES_READERGROUPTRANSPORTDATATYPE
+#define UA_TYPES_READERGROUPTRANSPORTDATATYPE UA_TYPES_COUNT
+#endif
+#ifndef UA_TYPES_PUBLISHEDDATASETSOURCEDATATYPE
+#define UA_TYPES_PUBLISHEDDATASETSOURCEDATATYPE UA_TYPES_COUNT
+#endif
+#ifndef UA_TYPES_DATASETREADERTRANSPORTDATATYPE
+#define UA_TYPES_DATASETREADERTRANSPORTDATATYPE UA_TYPES_COUNT
+#endif
+#ifndef UA_TYPES_DATASETWRITERTRANSPORTDATATYPE
+#define UA_TYPES_DATASETWRITERTRANSPORTDATATYPE UA_TYPES_COUNT
+#endif
+#ifndef UA_TYPES_SUBSCRIBEDDATASETDATATYPE
+#define UA_TYPES_SUBSCRIBEDDATASETDATATYPE UA_TYPES_COUNT
+#endif
+#ifndef UA_TYPES_READERGROUPMESSAGEDATATYPE
+#define UA_TYPES_READERGROUPMESSAGEDATATYPE UA_TYPES_COUNT
+#endif
+#ifndef UA_TYPES_DATASETWRITERMESSAGEDATATYPE
+#define UA_TYPES_DATASETWRITERMESSAGEDATATYPE UA_TYPES_COUNT
+#endif
 
 START_TEST(newAndEmptyObjectShallBeDeleted) {
     // given
@@ -97,7 +130,7 @@ START_TEST(encodeShallYieldDecode) {
     // when
     void *obj2 = UA_new(&UA_TYPES[_i]);
     size_t offset = 0;
-    retval = UA_decodeBinary(&msg1, &offset, obj2, &UA_TYPES[_i], 0, NULL); 
+    retval = UA_decodeBinary(&msg1, &offset, obj2, &UA_TYPES[_i], 0, NULL);
     ck_assert_msg(retval == UA_STATUSCODE_GOOD, "could not decode idx=%d,nodeid=%i",
                   _i, UA_TYPES[_i].typeId.identifier.numeric);
     ck_assert(!memcmp(obj1, obj2, UA_TYPES[_i].memSize)); // bit identical decoding
@@ -131,7 +164,18 @@ START_TEST(decodeShallFailWithTruncatedBufferButSurvive) {
         _i == UA_TYPES_HISTORYREADDETAILS ||
         _i == UA_TYPES_NOTIFICATIONDATA ||
         _i == UA_TYPES_MONITORINGFILTER ||
-        _i == UA_TYPES_MONITORINGFILTERRESULT)
+        _i == UA_TYPES_MONITORINGFILTERRESULT ||
+        _i == UA_TYPES_DATASETREADERMESSAGEDATATYPE ||
+        _i == UA_TYPES_WRITERGROUPTRANSPORTDATATYPE ||
+        _i == UA_TYPES_CONNECTIONTRANSPORTDATATYPE ||
+        _i == UA_TYPES_WRITERGROUPMESSAGEDATATYPE ||
+        _i == UA_TYPES_READERGROUPTRANSPORTDATATYPE ||
+        _i == UA_TYPES_PUBLISHEDDATASETSOURCEDATATYPE ||
+        _i == UA_TYPES_DATASETREADERTRANSPORTDATATYPE ||
+        _i == UA_TYPES_DATASETWRITERTRANSPORTDATATYPE ||
+        _i == UA_TYPES_SUBSCRIBEDDATASETDATATYPE ||
+        _i == UA_TYPES_READERGROUPMESSAGEDATATYPE ||
+        _i == UA_TYPES_DATASETWRITERMESSAGEDATATYPE)
         return;
     // given
     UA_ByteString msg1;
@@ -242,7 +286,18 @@ START_TEST(calcSizeBinaryShallBeCorrect) {
        _i == UA_TYPES_HISTORYREADDETAILS ||
        _i == UA_TYPES_NOTIFICATIONDATA ||
        _i == UA_TYPES_MONITORINGFILTER ||
-       _i == UA_TYPES_MONITORINGFILTERRESULT)
+        _i == UA_TYPES_MONITORINGFILTERRESULT ||
+        _i == UA_TYPES_DATASETREADERMESSAGEDATATYPE ||
+        _i == UA_TYPES_WRITERGROUPTRANSPORTDATATYPE ||
+        _i == UA_TYPES_CONNECTIONTRANSPORTDATATYPE ||
+        _i == UA_TYPES_WRITERGROUPMESSAGEDATATYPE ||
+        _i == UA_TYPES_READERGROUPTRANSPORTDATATYPE ||
+        _i == UA_TYPES_PUBLISHEDDATASETSOURCEDATATYPE ||
+        _i == UA_TYPES_DATASETREADERTRANSPORTDATATYPE ||
+        _i == UA_TYPES_DATASETWRITERTRANSPORTDATATYPE ||
+        _i == UA_TYPES_SUBSCRIBEDDATASETDATATYPE ||
+        _i == UA_TYPES_READERGROUPMESSAGEDATATYPE ||
+        _i == UA_TYPES_DATASETWRITERMESSAGEDATATYPE)
         return;
     void *obj = UA_new(&UA_TYPES[_i]);
     size_t predicted_size = UA_calcSizeBinary(obj, &UA_TYPES[_i]);

+ 220 - 0
tests/pubsub/check_pubsub_connection_udp.c

@@ -0,0 +1,220 @@
+/* This Source Code Form is subject to the terms of the Mozilla Public
+ * License, v. 2.0. If a copy of the MPL was not distributed with this
+ * file, You can obtain one at http://mozilla.org/MPL/2.0/.
+ *
+ * Copyright (c) 2017 - 2018 Fraunhofer IOSB (Author: Andreas Ebner)
+ */
+
+#include <ua_server_pubsub.h>
+#include <src_generated/ua_types_generated_encoding_binary.h>
+#include "ua_config_default.h"
+#include "ua_network_pubsub_udp.h"
+#include "ua_server_internal.h"
+#include "check.h"
+
+UA_Server *server = NULL;
+UA_ServerConfig *config = NULL;
+
+static void setup(void) {
+    config = UA_ServerConfig_new_default();
+    config->pubsubTransportLayers = (UA_PubSubTransportLayer *) UA_malloc(sizeof(UA_PubSubTransportLayer));
+    if(!config->pubsubTransportLayers) {
+        UA_ServerConfig_delete(config);
+    }
+    config->pubsubTransportLayers[0] = UA_PubSubTransportLayerUDPMP();
+    config->pubsubTransportLayersSize++;
+    server = UA_Server_new(config);
+    UA_Server_run_startup(server);
+}
+
+static void teardown(void) {
+    UA_Server_run_shutdown(server);
+    UA_Server_delete(server);
+    UA_ServerConfig_delete(config);
+}
+
+START_TEST(AddConnectionsWithMinimalValidConfiguration){
+    UA_StatusCode retVal;
+    UA_PubSubConnectionConfig connectionConfig;
+    memset(&connectionConfig, 0, sizeof(UA_PubSubConnectionConfig));
+    connectionConfig.name = UA_STRING("UADP Connection");
+    UA_NetworkAddressUrlDataType networkAddressUrl = {UA_STRING_NULL, UA_STRING("opc.udp://224.0.0.22:4840/")};
+    UA_Variant_setScalar(&connectionConfig.address, &networkAddressUrl,
+                         &UA_TYPES[UA_TYPES_NETWORKADDRESSURLDATATYPE]);
+    connectionConfig.transportProfileUri = UA_STRING("http://opcfoundation.org/UA-Profile/Transport/pubsub-udp-uadp");
+    retVal = UA_Server_addPubSubConnection(server, &connectionConfig, NULL);
+    ck_assert_int_eq(server->pubSubManager.connectionsSize, 1);
+    ck_assert_int_eq(retVal, UA_STATUSCODE_GOOD);
+    ck_assert(server->pubSubManager.connections[0].channel != NULL);
+    retVal = UA_Server_addPubSubConnection(server, &connectionConfig, NULL);
+    ck_assert_int_eq(retVal, UA_STATUSCODE_GOOD);
+    ck_assert(server->pubSubManager.connections[1].channel != NULL);
+    ck_assert_int_eq(server->pubSubManager.connectionsSize, 2);
+} END_TEST
+
+START_TEST(AddRemoveAddConnectionWithMinimalValidConfiguration){
+        UA_StatusCode retVal;
+        UA_PubSubConnectionConfig connectionConfig;
+        memset(&connectionConfig, 0, sizeof(UA_PubSubConnectionConfig));
+        connectionConfig.name = UA_STRING("UADP Connection");
+        UA_NetworkAddressUrlDataType networkAddressUrl = {UA_STRING_NULL, UA_STRING("opc.udp://224.0.0.22:4840/")};
+        UA_Variant_setScalar(&connectionConfig.address, &networkAddressUrl,
+                             &UA_TYPES[UA_TYPES_NETWORKADDRESSURLDATATYPE]);
+        connectionConfig.transportProfileUri = UA_STRING("http://opcfoundation.org/UA-Profile/Transport/pubsub-udp-uadp");
+        UA_NodeId connectionIdent;
+        retVal = UA_Server_addPubSubConnection(server, &connectionConfig, &connectionIdent);
+        ck_assert_int_eq(server->pubSubManager.connectionsSize, 1);
+        ck_assert_int_eq(retVal, UA_STATUSCODE_GOOD);
+        ck_assert(server->pubSubManager.connections[0].channel != NULL);
+        retVal |= UA_Server_removePubSubConnection(server, connectionIdent);
+        ck_assert_int_eq(server->pubSubManager.connectionsSize, 0);
+        ck_assert_int_eq(retVal, UA_STATUSCODE_GOOD);
+        retVal = UA_Server_addPubSubConnection(server, &connectionConfig, &connectionIdent);
+        ck_assert_int_eq(server->pubSubManager.connectionsSize, 1);
+        ck_assert(server->pubSubManager.connections[0].channel != NULL);
+        ck_assert_int_eq(retVal, UA_STATUSCODE_GOOD);
+} END_TEST
+
+START_TEST(AddConnectionWithInvalidAddress){
+    UA_StatusCode retVal;
+    UA_PubSubConnectionConfig connectionConfig;
+    memset(&connectionConfig, 0, sizeof(UA_PubSubConnectionConfig));
+    connectionConfig.name = UA_STRING("UADP Connection");
+    UA_NetworkAddressUrlDataType networkAddressUrl = {UA_STRING_NULL, UA_STRING("opc.udp://256.0.0.22:4840/")};
+    UA_Variant_setScalar(&connectionConfig.address, &networkAddressUrl,
+                         &UA_TYPES[UA_TYPES_NETWORKADDRESSURLDATATYPE]);
+    connectionConfig.transportProfileUri = UA_STRING("http://opcfoundation.org/UA-Profile/Transport/pubsub-udp-uadp");
+    retVal = UA_Server_addPubSubConnection(server, &connectionConfig, NULL);
+    ck_assert_int_eq(server->pubSubManager.connectionsSize, 0);
+    ck_assert_int_ne(retVal, UA_STATUSCODE_GOOD);
+    retVal = UA_Server_addPubSubConnection(server, &connectionConfig, NULL);
+    ck_assert_int_ne(retVal, UA_STATUSCODE_GOOD);
+    ck_assert_int_eq(server->pubSubManager.connectionsSize, 0);
+} END_TEST
+
+START_TEST(AddConnectionWithUnknownTransportURL){
+        UA_StatusCode retVal;
+        UA_PubSubConnectionConfig connectionConfig;
+        memset(&connectionConfig, 0, sizeof(UA_PubSubConnectionConfig));
+        connectionConfig.name = UA_STRING("UADP Connection");
+        UA_NetworkAddressUrlDataType networkAddressUrl = {UA_STRING_NULL, UA_STRING("opc.udp://224.0.0.22:4840/")};
+        UA_Variant_setScalar(&connectionConfig.address, &networkAddressUrl,
+                             &UA_TYPES[UA_TYPES_NETWORKADDRESSURLDATATYPE]);
+        connectionConfig.transportProfileUri = UA_STRING("http://opcfoundation.org/UA-Profile/Transport/unknown-udp-uadp");
+        UA_NodeId connectionIdent;
+        retVal = UA_Server_addPubSubConnection(server, &connectionConfig, &connectionIdent);
+        ck_assert_int_eq(server->pubSubManager.connectionsSize, 0);
+        ck_assert_int_ne(retVal, UA_STATUSCODE_GOOD);
+} END_TEST
+
+START_TEST(AddConnectionWithNullConfig){
+        UA_StatusCode retVal;
+        retVal = UA_Server_addPubSubConnection(server, NULL, NULL);
+        ck_assert_int_eq(server->pubSubManager.connectionsSize, 0);
+        ck_assert_int_ne(retVal, UA_STATUSCODE_GOOD);
+    } END_TEST
+
+START_TEST(AddSingleConnectionWithMaximalConfiguration){
+    UA_NetworkAddressUrlDataType networkAddressUrlData = {UA_STRING("127.0.0.1"), UA_STRING("opc.udp://224.0.0.22:4840/")};
+    UA_Variant address;
+    UA_Variant_setScalar(&address, &networkAddressUrlData, &UA_TYPES[UA_TYPES_NETWORKADDRESSURLDATATYPE]);
+    UA_KeyValuePair connectionOptions[3];
+    connectionOptions[0].key = UA_QUALIFIEDNAME(0, "ttl");
+    UA_UInt32 ttl = 10;
+    UA_Variant_setScalar(&connectionOptions[0].value, &ttl, &UA_TYPES[UA_TYPES_UINT32]);
+    connectionOptions[1].key = UA_QUALIFIEDNAME(0, "loopback");
+    UA_Boolean loopback = UA_FALSE;
+    UA_Variant_setScalar(&connectionOptions[1].value, &loopback, &UA_TYPES[UA_TYPES_UINT32]);
+    connectionOptions[2].key = UA_QUALIFIEDNAME(0, "reuse");
+    UA_Boolean reuse = UA_TRUE;
+    UA_Variant_setScalar(&connectionOptions[2].value, &reuse, &UA_TYPES[UA_TYPES_UINT32]);
+
+    UA_PubSubConnectionConfig connectionConf;
+    memset(&connectionConf, 0, sizeof(UA_PubSubConnectionConfig));
+    connectionConf.name = UA_STRING("UADP Connection");
+    connectionConf.transportProfileUri = UA_STRING("http://opcfoundation.org/UA-Profile/Transport/pubsub-udp-uadp");
+    connectionConf.enabled = true;
+    connectionConf.publisherId.numeric = 223344;
+    connectionConf.connectionPropertiesSize = 3;
+    connectionConf.connectionProperties = connectionOptions;
+    connectionConf.address = address;
+    UA_NodeId connection;
+    UA_StatusCode retVal = UA_Server_addPubSubConnection(server, &connectionConf, &connection);
+    ck_assert_int_eq(server->pubSubManager.connectionsSize, 1);
+    ck_assert_int_eq(retVal, UA_STATUSCODE_GOOD);
+    ck_assert(server->pubSubManager.connections[0].channel != NULL);
+} END_TEST
+
+START_TEST(GetMaximalConnectionConfigurationAndCompareValues){
+    UA_NetworkAddressUrlDataType networkAddressUrlData = {UA_STRING("127.0.0.1"), UA_STRING("opc.udp://224.0.0.22:4840/")};
+    UA_Variant address;
+    UA_Variant_setScalar(&address, &networkAddressUrlData, &UA_TYPES[UA_TYPES_NETWORKADDRESSURLDATATYPE]);
+    UA_KeyValuePair connectionOptions[3];
+    connectionOptions[0].key = UA_QUALIFIEDNAME(0, "ttl");
+    UA_UInt32 ttl = 10;
+    UA_Variant_setScalar(&connectionOptions[0].value, &ttl, &UA_TYPES[UA_TYPES_UINT32]);
+    connectionOptions[1].key = UA_QUALIFIEDNAME(0, "loopback");
+    UA_Boolean loopback = UA_FALSE;
+    UA_Variant_setScalar(&connectionOptions[1].value, &loopback, &UA_TYPES[UA_TYPES_UINT32]);
+    connectionOptions[2].key = UA_QUALIFIEDNAME(0, "reuse");
+    UA_Boolean reuse = UA_TRUE;
+    UA_Variant_setScalar(&connectionOptions[2].value, &reuse, &UA_TYPES[UA_TYPES_UINT32]);
+
+    UA_PubSubConnectionConfig connectionConf;
+    memset(&connectionConf, 0, sizeof(UA_PubSubConnectionConfig));
+    connectionConf.name = UA_STRING("UADP Connection");
+    connectionConf.transportProfileUri = UA_STRING("http://opcfoundation.org/UA-Profile/Transport/pubsub-udp-uadp");
+    connectionConf.enabled = true;
+    connectionConf.publisherId.numeric = 223344;
+    connectionConf.connectionPropertiesSize = 3;
+    connectionConf.connectionProperties = connectionOptions;
+    connectionConf.address = address;
+    UA_NodeId connection;
+    UA_StatusCode retVal = UA_Server_addPubSubConnection(server, &connectionConf, &connection);
+    ck_assert_int_eq(retVal, UA_STATUSCODE_GOOD);
+    UA_PubSubConnectionConfig connectionConfig;
+    memset(&connectionConfig, 0, sizeof(UA_PubSubConnectionConfig));
+    retVal |= UA_PubSubConnection_getConfig(server, connection, &connectionConfig);
+    ck_assert_int_eq(retVal, UA_STATUSCODE_GOOD);
+    ck_assert(connectionConfig.connectionPropertiesSize == connectionConf.connectionPropertiesSize);
+    ck_assert(UA_String_equal(&connectionConfig.name, &connectionConf.name) == UA_TRUE);
+    ck_assert(UA_String_equal(&connectionConfig.transportProfileUri, &connectionConf.transportProfileUri) == UA_TRUE);
+    UA_NetworkAddressUrlDataType networkAddressUrlDataCopy = *((UA_NetworkAddressUrlDataType *)connectionConfig.address.data);
+    ck_assert(UA_NetworkAddressUrlDataType_calcSizeBinary(&networkAddressUrlDataCopy) == UA_NetworkAddressUrlDataType_calcSizeBinary(&networkAddressUrlData));
+    for(size_t i = 0; i < connectionConfig.connectionPropertiesSize; i++){
+        ck_assert(UA_String_equal(&connectionConfig.connectionProperties[i].key.name, &connectionConf.connectionProperties[i].key.name) == UA_TRUE);
+        ck_assert(UA_Variant_calcSizeBinary(&connectionConfig.connectionProperties[i].value) == UA_Variant_calcSizeBinary(&connectionConf.connectionProperties[i].value));
+    }
+    UA_PubSubConnectionConfig_deleteMembers(&connectionConfig);
+    } END_TEST
+
+int main(void) {
+    TCase *tc_add_pubsub_connections_minimal_config = tcase_create("Create PubSub UDP Connections with minimal valid config");
+    tcase_add_checked_fixture(tc_add_pubsub_connections_minimal_config, setup, teardown);
+    tcase_add_test(tc_add_pubsub_connections_minimal_config, AddConnectionsWithMinimalValidConfiguration);
+    tcase_add_test(tc_add_pubsub_connections_minimal_config, AddRemoveAddConnectionWithMinimalValidConfiguration);
+
+    TCase *tc_add_pubsub_connections_invalid_config = tcase_create("Create PubSub UDP Connections with invalid configurations");
+    tcase_add_checked_fixture(tc_add_pubsub_connections_invalid_config, setup, teardown);
+    tcase_add_test(tc_add_pubsub_connections_invalid_config, AddConnectionWithInvalidAddress);
+    tcase_add_test(tc_add_pubsub_connections_invalid_config, AddConnectionWithUnknownTransportURL);
+    tcase_add_test(tc_add_pubsub_connections_invalid_config, AddConnectionWithNullConfig);
+
+    TCase *tc_add_pubsub_connections_maximal_config = tcase_create("Create PubSub UDP Connections with maximal valid config");
+    tcase_add_checked_fixture(tc_add_pubsub_connections_maximal_config, setup, teardown);
+    tcase_add_test(tc_add_pubsub_connections_maximal_config, AddSingleConnectionWithMaximalConfiguration);
+    tcase_add_test(tc_add_pubsub_connections_maximal_config, GetMaximalConnectionConfigurationAndCompareValues);
+
+    Suite *s = suite_create("PubSub UDP connection creation");
+    suite_add_tcase(s, tc_add_pubsub_connections_minimal_config);
+    suite_add_tcase(s, tc_add_pubsub_connections_invalid_config);
+    suite_add_tcase(s, tc_add_pubsub_connections_maximal_config);
+    //suite_add_tcase(s, tc_decode);
+
+    SRunner *sr = srunner_create(s);
+    srunner_set_fork_status(sr, CK_NOFORK);
+    srunner_run_all(sr,CK_NORMAL);
+    int number_failed = srunner_ntests_failed(sr);
+    srunner_free(sr);
+    return (number_failed == 0) ? EXIT_SUCCESS : EXIT_FAILURE;
+}

+ 5 - 0
tools/schema/Opc.Ua.Types.bsd

@@ -2703,4 +2703,9 @@
     <opc:EnumeratedValue Name="Unknown" Value="4" />
   </opc:EnumeratedType>
 
+  <opc:StructuredType Name="NetworkAddressUrlDataType" BaseType="tns:NetworkAddressDataType">
+    <opc:Field Name="NetworkInterface" TypeName="opc:String" SourceType="tns:NetworkAddressDataType" />
+    <opc:Field Name="Url" TypeName="opc:String" />
+  </opc:StructuredType>
+
 </opc:TypeDictionary>

+ 2 - 0
tools/schema/datatypes_minimal.txt

@@ -201,3 +201,5 @@ AggregateConfiguration
 AggregateFilter
 SetTriggeringRequest
 SetTriggeringResponse
+KeyValuePair
+NetworkAddressUrlDataType