Selaa lähdekoodia

feat(pubsub): Add MQTT support

 - Rebase to Master branch
 - Modify Server APIs un tutorial_pubsub_mqtt.c
 - TODO:Resolve unit test error in check_pubsub_connection_mqtt.c
Lukas M 6 vuotta sitten
vanhempi
commit
7ff1fe598f

+ 18 - 0
CMakeLists.txt

@@ -251,6 +251,14 @@ endif()
 option(UA_ENABLE_JSON_ENCODING "Enable Json encoding (EXPERIMENTAL)" OFF)
 mark_as_advanced(UA_ENABLE_JSON_ENCODING)
 
+option(UA_ENABLE_PUBSUB_MQTT "Enable publish/subscribe with mqtt (experimental)" OFF)
+mark_as_advanced(UA_ENABLE_PUBSUB_MQTT)
+if(UA_ENABLE_PUBSUB_MQTT)
+    if(NOT UA_ENABLE_PUBSUB)
+    message(FATAL_ERROR "Mqtt cannot be used with disabled PubSub function.")
+    endif()
+endif()
+
 option(UA_ENABLE_STATUSCODE_DESCRIPTIONS "Enable conversion of StatusCode to human-readable error message" ON)
 mark_as_advanced(UA_ENABLE_STATUSCODE_DESCRIPTIONS)
 
@@ -723,6 +731,16 @@ if(UA_ENABLE_PUBSUB)
         list(APPEND default_plugin_headers ${PROJECT_SOURCE_DIR}/plugins/include/open62541/plugin/pubsub_ethernet.h)
         list(APPEND default_plugin_sources ${PROJECT_SOURCE_DIR}/plugins/ua_pubsub_ethernet.c)
     endif()
+    if(UA_ENABLE_PUBSUB_MQTT)
+        list(APPEND default_plugin_headers ${PROJECT_SOURCE_DIR}/deps/mqtt-c/mqtt_pal.h)
+        list(APPEND default_plugin_sources ${PROJECT_SOURCE_DIR}/plugins/mqtt/ua_mqtt_pal.c)
+        list(APPEND default_plugin_headers ${PROJECT_SOURCE_DIR}/deps/mqtt-c/mqtt.h)
+        list(APPEND default_plugin_sources ${PROJECT_SOURCE_DIR}/deps/mqtt-c/mqtt.c)
+        list(APPEND default_plugin_headers ${PROJECT_SOURCE_DIR}/plugins/ua_network_pubsub_mqtt.h)
+        list(APPEND default_plugin_sources ${PROJECT_SOURCE_DIR}/plugins/ua_network_pubsub_mqtt.c)  
+        list(APPEND default_plugin_headers ${PROJECT_SOURCE_DIR}/plugins/mqtt/ua_mqtt_adapter.h)                   
+        list(APPEND default_plugin_sources ${PROJECT_SOURCE_DIR}/plugins/mqtt/ua_mqtt_adapter.c)
+    endif()
 endif()
 
 if(UA_ENABLE_JSON_ENCODING)

+ 6 - 0
examples/CMakeLists.txt

@@ -200,4 +200,10 @@ if(UA_ENABLE_PUBSUB)
         add_example(tutorial_pubsub_subscribe pubsub/tutorial_pubsub_subscribe.c)
         add_example(pubsub_subscribe_standalone pubsub/pubsub_subscribe_standalone.c)
     endif()
+    if(UA_ENABLE_PUBSUB_MQTT)
+        if(NOT WIN32)
+            include_directories(${PROJECT_SOURCE_DIR}/../plugins)
+            add_example(tutorial_pubsub_mqtt pubsub/tutorial_pubsub_mqtt.c)
+        endif()
+    endif()
 endif()

+ 310 - 0
examples/pubsub/tutorial_pubsub_mqtt.c

@@ -0,0 +1,310 @@
+/* This work is licensed under a Creative Commons CCZero 1.0 Universal License.
+ * See http://creativecommons.org/publicdomain/zero/1.0/ for more information. */
+
+/**
+ * .. _pubsub-tutorial:
+ *
+ * Working with Publish/Subscribe
+ * ------------------------------
+ *
+ * Work in progress:
+ * This Tutorial will be continuously extended during the next PubSub batches. More details about
+ * the PubSub extension and corresponding open62541 API are located here: :ref:`pubsub`.
+ *
+ * Publishing Fields
+ * ^^^^^^^^^^^^^^^^^
+ * The PubSub mqtt publish subscribe example demonstrate the simplest way to publish
+ * informations from the information model over MQTT using
+ * the UADP (or later JSON) encoding.
+ * To receive information the subscribe functionality of mqtt is used.
+ * A periodical call to yield is necessary to update the mqtt stack.
+ *
+ * **Connection handling**
+ * PubSubConnections can be created and deleted on runtime. More details about the system preconfiguration and
+ * connection can be found in ``tutorial_pubsub_connection.c``.
+ */
+
+/* server */
+#include "open62541/server.h"
+#include "open62541/server_config_default.h"
+
+/* PubSubConnection */
+#include "ua_pubsub.h"
+
+/* UA_PubSubTransportLayerMQTT */
+#include "ua_network_pubsub_mqtt.h"
+/* Logging */
+#include "open62541/plugin/log_stdout.h"
+
+/* uncomment to use json encoding */
+//#define USE_JSON_ENCODING
+
+/* global ids */
+static UA_NodeId connectionIdent, publishedDataSetIdent, writerGroupIdent;
+
+static void
+addPubSubConnection(UA_Server *server){
+    /* Details about the connection configuration and handling are located
+     * in the pubsub connection tutorial */
+    UA_PubSubConnectionConfig connectionConfig;
+    memset(&connectionConfig, 0, sizeof(connectionConfig));
+    connectionConfig.name = UA_STRING("MQTT Connection 1");
+    connectionConfig.transportProfileUri = UA_STRING("http://opcfoundation.org/UA-Profile/Transport/pubsub-mqtt");
+    connectionConfig.enabled = UA_TRUE;
+
+    /* configure address of the mqtt broker (local on default port) */
+    UA_NetworkAddressUrlDataType networkAddressUrl = {UA_STRING_NULL , UA_STRING("opc.mqtt://127.0.0.1:1883/")};
+    UA_Variant_setScalar(&connectionConfig.address, &networkAddressUrl, &UA_TYPES[UA_TYPES_NETWORKADDRESSURLDATATYPE]);
+    connectionConfig.publisherId.numeric = UA_UInt32_random();
+    
+    /* configure options, set mqtt client id */
+    UA_KeyValuePair connectionOptions[1];
+    connectionOptions[0].key = UA_QUALIFIEDNAME(0, "mqttClientId");
+    UA_String mqttClientId = UA_STRING("TESTCLIENTPUBSUBMQTT");
+    UA_Variant_setScalar(&connectionOptions[0].value, &mqttClientId, &UA_TYPES[UA_TYPES_STRING]);
+    connectionConfig.connectionProperties = connectionOptions;
+    connectionConfig.connectionPropertiesSize = 1;
+    
+    UA_Server_addPubSubConnection(server, &connectionConfig, &connectionIdent);
+}
+
+/**
+ * **PublishedDataSet handling**
+ * The PublishedDataSet (PDS) and PubSubConnection are the toplevel entities and can exist alone. The PDS contains
+ * the collection of the published fields.
+ * All other PubSub elements are directly or indirectly linked with the PDS or connection.
+ */
+static void
+addPublishedDataSet(UA_Server *server) {
+    /* The PublishedDataSetConfig contains all necessary public
+    * informations for the creation of a new PublishedDataSet */
+    UA_PublishedDataSetConfig publishedDataSetConfig;
+    memset(&publishedDataSetConfig, 0, sizeof(UA_PublishedDataSetConfig));
+    publishedDataSetConfig.publishedDataSetType = UA_PUBSUB_DATASET_PUBLISHEDITEMS;
+    publishedDataSetConfig.name = UA_STRING("Demo PDS");
+    /* Create new PublishedDataSet based on the PublishedDataSetConfig. */
+    UA_Server_addPublishedDataSet(server, &publishedDataSetConfig, &publishedDataSetIdent);
+}
+
+static void
+addVariable(UA_Server *server) {
+    // Define the attribute of the myInteger variable node 
+    UA_VariableAttributes attr = UA_VariableAttributes_default;
+    UA_Int32 myInteger = 42;
+    UA_Variant_setScalar(&attr.value, &myInteger, &UA_TYPES[UA_TYPES_INT32]);
+    attr.description = UA_LOCALIZEDTEXT("en-US","the answer");
+    attr.displayName = UA_LOCALIZEDTEXT("en-US","the answer");
+    attr.dataType = UA_TYPES[UA_TYPES_INT32].typeId;
+    attr.accessLevel = UA_ACCESSLEVELMASK_READ | UA_ACCESSLEVELMASK_WRITE;
+
+    // Add the variable node to the information model 
+    UA_Server_addVariableNode(server, UA_NODEID_NUMERIC(1, 42), UA_NODEID_NUMERIC(0, UA_NS0ID_OBJECTSFOLDER),
+                              UA_NODEID_NUMERIC(0, UA_NS0ID_ORGANIZES), UA_QUALIFIEDNAME(1, "the answer"),
+                              UA_NODEID_NUMERIC(0, UA_NS0ID_BASEDATAVARIABLETYPE), attr, NULL, NULL);
+}
+
+/**
+ * **DataSetField handling**
+ * The DataSetField (DSF) is part of the PDS and describes exactly one published field.
+ */
+static void
+addDataSetField(UA_Server *server) {
+    /* Add a field to the previous created PublishedDataSet */
+    UA_DataSetFieldConfig dataSetFieldConfig;
+    memset(&dataSetFieldConfig, 0, sizeof(UA_DataSetFieldConfig));
+    dataSetFieldConfig.dataSetFieldType = UA_PUBSUB_DATASETFIELD_VARIABLE;
+
+    dataSetFieldConfig.field.variable.fieldNameAlias = UA_STRING("Server localtime");
+    dataSetFieldConfig.field.variable.promotedField = UA_FALSE;
+    dataSetFieldConfig.field.variable.publishParameters.publishedVariable =
+    UA_NODEID_NUMERIC(0, UA_NS0ID_SERVER_SERVERSTATUS_CURRENTTIME);
+    dataSetFieldConfig.field.variable.publishParameters.attributeId = UA_ATTRIBUTEID_VALUE;
+    UA_Server_addDataSetField(server, publishedDataSetIdent, &dataSetFieldConfig, NULL);
+    
+    UA_DataSetFieldConfig dataSetFieldConfig2;
+    memset(&dataSetFieldConfig2, 0, sizeof(UA_DataSetFieldConfig));
+    dataSetFieldConfig2.dataSetFieldType = UA_PUBSUB_DATASETFIELD_VARIABLE;
+    dataSetFieldConfig2.field.variable.fieldNameAlias = UA_STRING("Test");
+    dataSetFieldConfig2.field.variable.promotedField = UA_FALSE;
+    dataSetFieldConfig2.field.variable.publishParameters.publishedVariable = UA_NODEID_NUMERIC(1, 42);
+    dataSetFieldConfig2.field.variable.publishParameters.attributeId = UA_ATTRIBUTEID_VALUE;
+    UA_Server_addDataSetField(server, publishedDataSetIdent, &dataSetFieldConfig2, NULL);
+}
+
+/**
+ * **WriterGroup handling**
+ * The WriterGroup (WG) is part of the connection and contains the primary configuration
+ * parameters for the message creation.
+ */
+static void
+addWriterGroup(UA_Server *server) {
+    /* Now we create a new WriterGroupConfig and add the group to the existing PubSubConnection. */
+    UA_WriterGroupConfig writerGroupConfig;
+    memset(&writerGroupConfig, 0, sizeof(UA_WriterGroupConfig));
+    writerGroupConfig.name = UA_STRING("Demo WriterGroup");
+    writerGroupConfig.publishingInterval = 500;
+    writerGroupConfig.enabled = UA_FALSE;
+    writerGroupConfig.writerGroupId = 100;
+    
+    /* decide whether to use JSON or UADP encoding*/
+#if defined(USE_JSON_ENCODING) && defined(UA_ENABLE_JSON_ENCODING)
+        writerGroupConfig.encodingMimeType = UA_PUBSUB_ENCODING_JSON;
+#else
+        writerGroupConfig.encodingMimeType = UA_PUBSUB_ENCODING_UADP;
+#endif
+
+    /* configure the mqtt publish topic */
+    UA_BrokerWriterGroupTransportDataType brokerTransportSettings;
+    memset(&brokerTransportSettings, 0, sizeof(UA_BrokerWriterGroupTransportDataType));
+    brokerTransportSettings.queueName = UA_STRING("customTopic");
+    brokerTransportSettings.resourceUri = UA_STRING_NULL;
+    brokerTransportSettings.authenticationProfileUri = UA_STRING_NULL;
+    
+    /* Choose the QOS Level for MQTT */
+    brokerTransportSettings.requestedDeliveryGuarantee = UA_BROKERTRANSPORTQUALITYOFSERVICE_BESTEFFORT;
+    
+    /* Encapsulate config in transportSettings */
+    UA_ExtensionObject transportSettings;
+    memset(&transportSettings, 0, sizeof(UA_ExtensionObject));
+    transportSettings.encoding = UA_EXTENSIONOBJECT_DECODED;
+    transportSettings.content.decoded.type = &UA_TYPES[UA_TYPES_BROKERWRITERGROUPTRANSPORTDATATYPE];
+    transportSettings.content.decoded.data = &brokerTransportSettings;
+    
+    writerGroupConfig.transportSettings = transportSettings;
+    UA_Server_addWriterGroup(server, connectionIdent, &writerGroupConfig, &writerGroupIdent);
+}
+
+/**
+ * **DataSetWriter handling**
+ * A DataSetWriter (DSW) is the glue between the WG and the PDS. The DSW is linked to exactly one
+ * PDS and contains additional informations for the message generation.
+ */
+static void
+addDataSetWriter(UA_Server *server) {
+    /* We need now a DataSetWriter within the WriterGroup. This means we must
+     * create a new DataSetWriterConfig and add call the addWriterGroup function. */
+    UA_NodeId dataSetWriterIdent;
+    UA_DataSetWriterConfig dataSetWriterConfig;
+    memset(&dataSetWriterConfig, 0, sizeof(UA_DataSetWriterConfig));
+    dataSetWriterConfig.name = UA_STRING("Demo DataSetWriter");
+    dataSetWriterConfig.dataSetWriterId = 62541;
+    dataSetWriterConfig.keyFrameCount = 10;
+
+#if defined(USE_JSON_ENCODING) && defined(UA_ENABLE_JSON_ENCODING)
+        /* JSON config for the dataSetWriter */
+        UA_JsonDataSetWriterMessageDataType jsonDswMd;
+        jsonDswMd.dataSetMessageContentMask = (UA_JsonDataSetMessageContentMask)
+                (UA_JSONDATASETMESSAGECONTENTMASK_DATASETWRITERID
+                | UA_JSONDATASETMESSAGECONTENTMASK_SEQUENCENUMBER
+                | UA_JSONDATASETMESSAGECONTENTMASK_STATUS
+                | UA_JSONDATASETMESSAGECONTENTMASK_METADATAVERSION
+                | UA_JSONDATASETMESSAGECONTENTMASK_TIMESTAMP);
+
+        UA_ExtensionObject messageSettings;
+        messageSettings.encoding = UA_EXTENSIONOBJECT_DECODED;
+        messageSettings.content.decoded.type = &UA_TYPES[UA_TYPES_JSONDATASETWRITERMESSAGEDATATYPE];
+        messageSettings.content.decoded.data = &jsonDswMd;
+
+
+        dataSetWriterConfig.messageSettings = messageSettings;
+#endif
+    
+    UA_Server_addDataSetWriter(server, writerGroupIdent, publishedDataSetIdent,
+                               &dataSetWriterConfig, &dataSetWriterIdent);
+}
+
+/**
+ * That's it! You're now publishing the selected fields.
+ * Open a packet inspection tool of trust e.g. wireshark and take a look on the outgoing packages.
+ * The following graphic figures out the packages created by this tutorial.
+ *
+ * .. figure:: ua-wireshark-pubsub.png
+ *     :figwidth: 100 %
+ *     :alt: OPC UA PubSub communication in wireshark
+ *
+ * The open62541 subscriber API will be released later. If you want to process the the datagrams,
+ * take a look on the ua_network_pubsub_networkmessage.c which already contains the decoding code for UADP messages.
+ *
+ * It follows the main server code, making use of the above definitions. */
+UA_Boolean running = true;
+static void stopHandler(int sign) {
+    UA_LOG_INFO(UA_Log_Stdout, UA_LOGCATEGORY_SERVER, "received ctrl-c");
+    running = false;
+}
+
+static void callback(UA_ByteString *encodedBuffer, UA_ByteString *topic){
+     UA_LOG_INFO(UA_Log_Stdout, UA_LOGCATEGORY_USERLAND, "PubSub MQTT: Message Received");
+     
+     /* For example try to decode as a Json Networkmessage...
+     UA_NetworkMessage dst;
+     UA_StatusCode ret = UA_NetworkMessage_decodeJson(&dst, encodedBuffer);
+     if( ret == UA_STATUSCODE_GOOD){
+
+     }
+      */
+     
+     UA_ByteString_delete(encodedBuffer);
+     UA_ByteString_delete(topic);
+     
+     //UA_NetworkMessage_deleteMembers(&dst);
+}
+
+/* Adds a subscription */
+static void 
+addSubscription(UA_Server *server, UA_PubSubConnection *connection){
+    
+    /* transportSettings for subscription! */
+    UA_BrokerWriterGroupTransportDataType brokerTransportSettings;
+    memset(&brokerTransportSettings, 0, sizeof(UA_BrokerWriterGroupTransportDataType));
+    brokerTransportSettings.queueName = UA_STRING("customTopic");
+    brokerTransportSettings.resourceUri = UA_STRING_NULL;
+    brokerTransportSettings.authenticationProfileUri = UA_STRING_NULL;
+    
+    /* QOS */
+    brokerTransportSettings.requestedDeliveryGuarantee = UA_BROKERTRANSPORTQUALITYOFSERVICE_BESTEFFORT;
+
+    UA_ExtensionObject transportSettings;
+    memset(&transportSettings, 0, sizeof(UA_ExtensionObject));
+    transportSettings.encoding = UA_EXTENSIONOBJECT_DECODED;
+    transportSettings.content.decoded.type = &UA_TYPES[UA_TYPES_BROKERWRITERGROUPTRANSPORTDATATYPE];
+    transportSettings.content.decoded.data = &brokerTransportSettings;
+
+    connection->channel->regist(connection->channel, &transportSettings, &callback);
+    return; 
+}
+
+int main(void) {
+    signal(SIGINT, stopHandler);
+    signal(SIGTERM, stopHandler);
+
+    UA_StatusCode retval = UA_STATUSCODE_GOOD;
+	UA_Server *server = UA_Server_new();
+    UA_ServerConfig *config = UA_Server_getConfig(server);
+	UA_ServerConfig_setDefault(config); 
+    /* Details about the connection configuration and handling are located in the pubsub connection tutorial */
+    config->pubsubTransportLayers = (UA_PubSubTransportLayer *) UA_malloc(1  * sizeof(UA_PubSubTransportLayer));
+    if(!config->pubsubTransportLayers) {
+        return -1;
+    }
+    
+    config->pubsubTransportLayers[0] = UA_PubSubTransportLayerMQTT();
+    config->pubsubTransportLayersSize++;
+    addVariable(server);
+    addPubSubConnection(server);
+    addPublishedDataSet(server);
+    addDataSetField(server);
+    addWriterGroup(server);
+    addDataSetWriter(server);
+
+    UA_PubSubConnection *connection =
+        UA_PubSubConnection_findConnectionbyId(server, connectionIdent);
+    
+    if(connection != NULL) {
+        addSubscription(server, connection);
+    }
+    
+    retval |= UA_Server_run(server, &running);
+    UA_Server_delete(server);
+    return (int)retval;
+}
+

+ 1 - 0
include/open62541/config.h.in

@@ -38,6 +38,7 @@
 #cmakedefine UA_ENABLE_EXPERIMENTAL_HISTORIZING
 #cmakedefine UA_ENABLE_SUBSCRIPTIONS_EVENTS
 #cmakedefine UA_ENABLE_JSON_ENCODING
+#cmakedefine UA_ENABLE_PUBSUB_MQTT
 
 /* Multithreading */
 #cmakedefine UA_ENABLE_IMMUTABLE_NODES

+ 322 - 0
plugins/mqtt/ua_mqtt_adapter.c

@@ -0,0 +1,322 @@
+/* This Source Code Form is subject to the terms of the Mozilla Public
+ * License, v. 2.0. If a copy of the MPL was not distributed with this
+ * file, You can obtain one at http://mozilla.org/MPL/2.0/.
+ *
+ * Copyright (c) 2018 Fraunhofer IOSB (Author: Lukas Meling)
+ */
+
+#include "ua_mqtt_adapter.h"
+#include "../../deps/mqtt-c/mqtt.h"
+#include "open62541/plugin/log_stdout.h"
+#include "open62541/util.h"
+
+/* forward decl for callback */
+void
+publish_callback(void**, struct mqtt_response_publish*);
+
+UA_StatusCode
+connectMqtt(UA_PubSubChannelDataMQTT* channelData){
+    if(channelData == NULL){
+        return UA_STATUSCODE_BADINVALIDARGUMENT;
+    }
+
+    /* Get address and replace mqtt with tcp
+     * because we use a tcp UA_ClientConnectionTCP for mqtt */
+    UA_NetworkAddressUrlDataType address = channelData->address;
+
+    UA_String hostname, path;
+    UA_UInt16 networkPort;
+    if(UA_parseEndpointUrl(&address.url, &hostname, &networkPort, &path) != UA_STATUSCODE_GOOD){
+        UA_LOG_ERROR(UA_Log_Stdout, UA_LOGCATEGORY_SERVER,
+                     "MQTT PubSub Connection creation failed. Invalid URL.");
+        return UA_STATUSCODE_BADINVALIDARGUMENT;
+    }
+
+    /* Build the url, replace mqtt with tcp */
+    UA_STACKARRAY(UA_Byte, addressAsChar, 10 + (sizeof(char) * path.length));
+    memcpy((char*)addressAsChar, "opc.tcp://", 10);
+    memcpy((char*)&addressAsChar[10],(char*)path.data, path.length);
+    address.url.data = addressAsChar;
+    address.url.length = 10 + (sizeof(char) * path.length);
+
+    /* check if buffers are correct */
+    if(!(channelData->mqttRecvBufferSize > 0 && channelData->mqttRecvBuffer != NULL
+            && channelData->mqttSendBufferSize > 0 && channelData->mqttSendBuffer != NULL)){
+        UA_LOG_ERROR(UA_Log_Stdout, UA_LOGCATEGORY_SERVER, "MQTT PubSub Connection creation failed. No Mqtt buffer allocated.");
+        return UA_STATUSCODE_BADARGUMENTSMISSING;
+    }
+
+    /* Config with default parameters */
+    UA_ConnectionConfig conf;
+    memset(&conf, 0, sizeof(UA_ConnectionConfig));
+    conf.protocolVersion = 0;
+    conf.sendBufferSize = 1000;
+    conf.recvBufferSize = 2000;
+    conf.maxMessageSize = 1000;
+    conf.maxChunkCount = 1;
+
+    /* Create TCP connection: open the blocking TCP socket (connecting to the broker) */
+    UA_Connection connection = UA_ClientConnectionTCP( conf, address.url, 1000, NULL);
+    if(connection.state != UA_CONNECTION_ESTABLISHED && connection.state != UA_CONNECTION_OPENING){
+        UA_LOG_ERROR(UA_Log_Stdout, UA_LOGCATEGORY_NETWORK, "PubSub MQTT: Connection creation failed. Tcp connection failed!");
+        return UA_STATUSCODE_BADCOMMUNICATIONERROR;
+    }
+
+    /* Set socket to nonblocking!*/
+    UA_socket_set_nonblocking(connection.sockfd);
+
+    /* save connection */
+    channelData->connection = (UA_Connection*)UA_calloc(1, sizeof(UA_Connection));
+    if(!channelData->connection){
+        UA_LOG_ERROR(UA_Log_Stdout, UA_LOGCATEGORY_SERVER, "PubSub MQTT: Connection creation failed. Out of memory.");
+        return UA_STATUSCODE_BADOUTOFMEMORY;
+    }
+
+    memcpy(channelData->connection, &connection, sizeof(UA_Connection));
+
+    /* calloc mqtt_client */
+    struct mqtt_client* client = (struct mqtt_client*)UA_calloc(1, sizeof(struct mqtt_client));
+    if(!client){
+        UA_free(channelData->connection);
+
+        UA_LOG_ERROR(UA_Log_Stdout, UA_LOGCATEGORY_SERVER, "PubSub MQTT: Connection creation failed. Out of memory.");
+        return UA_STATUSCODE_BADOUTOFMEMORY;
+    }
+
+    /* save reference */
+    channelData->mqttClient = client;
+
+    /* create custom sockethandle */
+    struct my_custom_socket_handle* handle =
+        (struct my_custom_socket_handle*)UA_calloc(1, sizeof(struct my_custom_socket_handle));
+    if(!handle){
+        UA_free(channelData->connection);
+        UA_free(client);
+
+        UA_LOG_ERROR(UA_Log_Stdout, UA_LOGCATEGORY_SERVER, "PubSub MQTT: Connection creation failed. Out of memory.");
+        return UA_STATUSCODE_BADOUTOFMEMORY;
+    }
+    handle->client = client;
+    handle->connection = channelData->connection;
+
+    /* init mqtt client struct with buffers and callback */
+    enum MQTTErrors mqttErr = mqtt_init(client, handle, channelData->mqttSendBuffer, channelData->mqttSendBufferSize,
+                channelData->mqttRecvBuffer, channelData->mqttRecvBufferSize, publish_callback);
+    if(mqttErr != MQTT_OK){
+        UA_free(channelData->connection);
+        UA_free(client);
+
+        const char* errorStr = mqtt_error_str(mqttErr);
+        UA_LOG_ERROR(UA_Log_Stdout, UA_LOGCATEGORY_SERVER, "PubSub MQTT: Connection creation failed. MQTT error: %s", errorStr);
+        return UA_STATUSCODE_BADCOMMUNICATIONERROR;
+    }
+
+    /* Init custom data for subscribe callback function:
+     * A reference to the channeldata will be available in the callback.
+     * This is used to call the user callback channelData.callback */
+    client->publish_response_callback_state = channelData;
+
+    /* Convert clientId UA_String to char* null terminated */
+    char* clientId = (char*)calloc(1,channelData->mqttClientId->length + 1);
+    if(!clientId){
+        UA_LOG_ERROR(UA_Log_Stdout, UA_LOGCATEGORY_SERVER, "PubSub MQTT: Connection creation failed. Out of memory.");
+        UA_free(channelData->connection);
+        UA_free(client);
+        return UA_STATUSCODE_BADOUTOFMEMORY;
+    }
+    memcpy(clientId, channelData->mqttClientId->data, channelData->mqttClientId->length);
+
+    /* Connect mqtt with socket fd of networktcp  */
+    mqttErr = mqtt_connect(client, clientId, NULL, NULL, 0, NULL, NULL, 0, 400);
+    UA_free(clientId);
+    if(mqttErr != MQTT_OK){
+        UA_free(channelData->connection);
+        UA_free(client);
+
+        const char* errorStr = mqtt_error_str(mqttErr);
+        UA_LOG_ERROR(UA_Log_Stdout, UA_LOGCATEGORY_SERVER, "%s", errorStr);
+        return UA_STATUSCODE_BADCOMMUNICATIONERROR;
+    }
+
+    /* sync the first mqtt packets in the buffer to send connection request.
+       After that yield must be called frequently to exchange mqtt messages. */
+    UA_StatusCode ret = yieldMqtt(channelData, 100);
+    if(ret != UA_STATUSCODE_GOOD){
+        UA_free(channelData->connection);
+        UA_free(client);
+        return ret;
+    }
+    return UA_STATUSCODE_GOOD;
+}
+
+UA_StatusCode
+disconnectMqtt(UA_PubSubChannelDataMQTT* channelData){
+    if(channelData == NULL){
+        return UA_STATUSCODE_BADINVALIDARGUMENT;
+    }
+    channelData->callback = NULL;
+    struct mqtt_client* client = (struct mqtt_client*)channelData->mqttClient;
+    if(client){
+        mqtt_disconnect(client);
+        yieldMqtt(channelData, 10);
+        UA_free(client->socketfd);
+    }
+    if(channelData->connection != NULL){
+        channelData->connection->close(channelData->connection);
+        channelData->connection->free(channelData->connection);
+        UA_free(channelData->connection);
+        channelData->connection = NULL;
+    }
+    UA_free(channelData->mqttRecvBuffer);
+    channelData->mqttRecvBuffer = NULL;
+    UA_free(channelData->mqttSendBuffer);
+    channelData->mqttSendBuffer = NULL;
+    UA_free(channelData->mqttClient);
+    channelData->mqttClient = NULL;
+    return UA_STATUSCODE_GOOD;
+}
+
+void
+publish_callback(void** channelDataPtr, struct mqtt_response_publish *published)
+{
+    if(channelDataPtr != NULL){
+        UA_PubSubChannelDataMQTT *channelData = (UA_PubSubChannelDataMQTT*)*channelDataPtr;
+        if(channelData != NULL){
+            if(channelData->callback != NULL){
+                //Setup topic
+                UA_ByteString *topic = UA_ByteString_new();
+                if(!topic) return;
+                UA_ByteString *msg = UA_ByteString_new();  
+                if(!msg) return;
+                
+                /* memory for topic */
+                UA_StatusCode ret = UA_ByteString_allocBuffer(topic, published->topic_name_size);
+                if(ret){
+                    UA_free(topic);
+                    UA_free(msg);
+                    return;
+                }
+                /* memory for message */
+                ret = UA_ByteString_allocBuffer(msg, published->application_message_size);
+                if(ret){
+                    UA_ByteString_delete(topic);
+                    UA_free(msg);
+                    return;
+                }
+                /* copy topic and msg, call the cb */
+                memcpy(topic->data, published->topic_name, published->topic_name_size);
+                memcpy(msg->data, published->application_message, published->application_message_size);
+                channelData->callback(msg, topic);
+            }
+        }
+    }  
+}
+
+UA_StatusCode
+subscribeMqtt(UA_PubSubChannelDataMQTT* channelData, UA_String topic, UA_Byte qos){
+    if(channelData == NULL || topic.length == 0){
+        return UA_STATUSCODE_BADINVALIDARGUMENT;
+    }
+    struct mqtt_client* client = (struct mqtt_client*)channelData->mqttClient;
+    
+    UA_STACKARRAY(char, topicStr, sizeof(char) * topic.length +1);
+    memcpy(topicStr, topic.data, topic.length);
+    topicStr[topic.length] = '\0';
+
+    enum MQTTErrors mqttErr = mqtt_subscribe(client, topicStr, (UA_Byte) qos);
+    if(mqttErr != MQTT_OK){
+        const char* errorStr = mqtt_error_str(mqttErr);
+        UA_LOG_ERROR(UA_Log_Stdout, UA_LOGCATEGORY_SERVER, "PubSub MQTT: subscribe: %s", errorStr);
+        return UA_STATUSCODE_BADCOMMUNICATIONERROR;
+    }
+    return UA_STATUSCODE_GOOD;
+}
+
+UA_StatusCode
+unSubscribeMqtt(UA_PubSubChannelDataMQTT* channelData, UA_String topic){
+    return UA_STATUSCODE_BADNOTIMPLEMENTED;
+}
+
+UA_StatusCode
+yieldMqtt(UA_PubSubChannelDataMQTT* channelData, UA_UInt16 timeout){
+    if(channelData == NULL || timeout == 0){
+        return UA_STATUSCODE_BADINVALIDARGUMENT;
+    }
+
+    UA_Connection *connection = channelData->connection;
+    if(connection == NULL){
+        return UA_STATUSCODE_BADCOMMUNICATIONERROR;
+    }
+    
+    if(connection->state != UA_CONNECTION_ESTABLISHED && connection->state != UA_CONNECTION_OPENING){
+        UA_LOG_ERROR(UA_Log_Stdout, UA_LOGCATEGORY_NETWORK, "PubSub MQTT: yield: Tcp Connection not established!");
+        return UA_STATUSCODE_BADCOMMUNICATIONERROR;
+    }
+    
+    struct mqtt_client* client = (struct mqtt_client*)channelData->mqttClient;
+    client->socketfd->timeout = timeout;
+
+    enum MQTTErrors error = mqtt_sync(client);
+    if(error == MQTT_OK){
+        return UA_STATUSCODE_GOOD;
+    }else if(error == -1){
+        UA_LOG_ERROR(UA_Log_Stdout, UA_LOGCATEGORY_NETWORK, "PubSub MQTT: yield: Communication Error.");
+        return UA_STATUSCODE_BADCOMMUNICATIONERROR;
+    }
+    
+    /* map mqtt errors to ua errors */
+    const char* errorStr = mqtt_error_str(error);
+    UA_LOG_ERROR(UA_Log_Stdout, UA_LOGCATEGORY_SERVER, "PubSub MQTT: yield: error: %s", errorStr);
+    
+    switch(error){
+        case MQTT_ERROR_CONNECTION_CLOSED:
+            return UA_STATUSCODE_BADNOTCONNECTED;
+        case MQTT_ERROR_SOCKET_ERROR:
+            return UA_STATUSCODE_BADCOMMUNICATIONERROR;
+        case MQTT_ERROR_CONNECTION_REFUSED:
+            return UA_STATUSCODE_BADCONNECTIONREJECTED;
+            
+        default:
+            return UA_STATUSCODE_BADCOMMUNICATIONERROR;
+    }
+}
+
+UA_StatusCode
+publishMqtt(UA_PubSubChannelDataMQTT* channelData, UA_String topic, const UA_ByteString *buf, UA_Byte qos){
+    if(channelData == NULL || buf == NULL ){
+        return UA_STATUSCODE_BADINVALIDARGUMENT;
+    }
+
+    UA_STACKARRAY(char, topicChar, sizeof(char) * topic.length +1);
+    memcpy(topicChar, topic.data, topic.length);
+    topicChar[topic.length] = '\0';
+    
+    struct mqtt_client* client = (struct mqtt_client*)channelData->mqttClient;
+    if(client == NULL)
+        return UA_STATUSCODE_BADNOTCONNECTED;
+
+    /* publish */
+    enum MQTTPublishFlags flags;
+    if(qos == 0){
+        flags = MQTT_PUBLISH_QOS_0;
+    }else if( qos == 1){
+        flags = MQTT_PUBLISH_QOS_1;
+    }else if( qos == 2){
+        flags = MQTT_PUBLISH_QOS_2;
+    }else{
+        UA_LOG_ERROR(UA_Log_Stdout, UA_LOGCATEGORY_NETWORK, "PubSub MQTT: publish: Bad Qos Level.");
+        return UA_STATUSCODE_BADINVALIDARGUMENT;
+    }
+    mqtt_publish(client, topicChar, buf->data, buf->length, (uint8_t)flags);
+    if (client->error != MQTT_OK) {
+        if(client->error == MQTT_ERROR_SEND_BUFFER_IS_FULL){
+            UA_LOG_ERROR(UA_Log_Stdout, UA_LOGCATEGORY_SERVER, "PubSub MQTT: publish: Send buffer is full. "
+                                                               "Possible reasons: send buffer is to small, "
+                                                               "sending to fast, broker not responding.");
+        }else{
+            UA_LOG_ERROR(UA_Log_Stdout, UA_LOGCATEGORY_SERVER, "PubSub MQTT: publish: %s", mqtt_error_str(client->error));
+        }
+        return UA_STATUSCODE_BADCONNECTIONREJECTED;
+    }
+    return UA_STATUSCODE_GOOD;
+}

+ 39 - 0
plugins/mqtt/ua_mqtt_adapter.h

@@ -0,0 +1,39 @@
+/* This Source Code Form is subject to the terms of the Mozilla Public
+ * License, v. 2.0. If a copy of the MPL was not distributed with this
+ * file, You can obtain one at http://mozilla.org/MPL/2.0/.
+ *
+ * Copyright (c) 2018 Fraunhofer IOSB (Author: Lukas Meling)
+ */
+
+#ifndef UA_PLUGIN_MQTT_H_
+#define UA_PLUGIN_MQTT_H_
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+#include "ua_network_pubsub_mqtt.h"
+
+UA_StatusCode
+connectMqtt(UA_PubSubChannelDataMQTT*);
+
+UA_StatusCode
+disconnectMqtt(UA_PubSubChannelDataMQTT*);
+
+UA_StatusCode
+unSubscribeMqtt(UA_PubSubChannelDataMQTT*, UA_String topic);
+
+UA_StatusCode
+publishMqtt(UA_PubSubChannelDataMQTT*, UA_String topic, const UA_ByteString *buf, UA_Byte qos);
+
+UA_StatusCode
+subscribeMqtt(UA_PubSubChannelDataMQTT*, UA_String topic, UA_Byte qos);
+
+UA_StatusCode
+yieldMqtt(UA_PubSubChannelDataMQTT*, UA_UInt16 timeout);
+    
+#ifdef __cplusplus
+} // extern "C"
+#endif
+
+#endif /* UA_PLUGIN_MQTT_H_ */

+ 304 - 0
plugins/ua_network_pubsub_mqtt.c

@@ -0,0 +1,304 @@
+/* This Source Code Form is subject to the terms of the Mozilla Public
+ * License, v. 2.0. If a copy of the MPL was not distributed with this
+ * file, You can obtain one at http://mozilla.org/MPL/2.0/.
+ *
+ * Copyright (c) 2018 Fraunhofer IOSB (Author: Lukas Meling)
+ */
+
+/**
+ * This uses the mqtt/ua_mqtt_adapter.h to call mqtt functions.
+ * Currently only ua_mqtt_adapter.c implements this 
+ * interface and maps them to the specific "MQTT-C" library functions. 
+ * Another mqtt lib could be used.
+ * "ua_mqtt_pal.c" forwards the network calls (send/recv) to UA_Connection (TCP).
+ */
+
+#include "mqtt/ua_mqtt_adapter.h"
+#include "open62541/plugin/log_stdout.h"
+
+static UA_StatusCode
+UA_uaQos_toMqttQos(UA_BrokerTransportQualityOfService uaQos, UA_Byte *qos){
+    switch (uaQos){
+        case UA_BROKERTRANSPORTQUALITYOFSERVICE_BESTEFFORT:
+            *qos = 0;
+            break;
+        case UA_BROKERTRANSPORTQUALITYOFSERVICE_ATLEASTONCE:
+            *qos = 1;
+            break;
+        case UA_BROKERTRANSPORTQUALITYOFSERVICE_ATMOSTONCE:
+            *qos = 2;
+            break;
+        default:
+            break;
+    }
+    return UA_STATUSCODE_GOOD;
+}
+
+/**
+ * Open mqtt connection based on the connectionConfig.
+ * 
+ *
+ * @return ref to created channel, NULL on error
+ */
+static UA_PubSubChannel *
+UA_PubSubChannelMQTT_open(const UA_PubSubConnectionConfig *connectionConfig) {
+    UA_NetworkAddressUrlDataType address;
+    memset(&address, 0, sizeof(UA_NetworkAddressUrlDataType));
+    if(UA_Variant_hasScalarType(&connectionConfig->address, &UA_TYPES[UA_TYPES_NETWORKADDRESSURLDATATYPE])){
+        address = *(UA_NetworkAddressUrlDataType *)connectionConfig->address.data;
+    } else {
+        UA_LOG_ERROR(UA_Log_Stdout, UA_LOGCATEGORY_SERVER, "PubSub MQTT Connection creation failed. Invalid Address.");
+        return NULL;
+    }
+    
+    /* allocate and init memory for the Mqtt specific internal data */
+    UA_PubSubChannelDataMQTT * channelDataMQTT =
+            (UA_PubSubChannelDataMQTT *) UA_calloc(1, (sizeof(UA_PubSubChannelDataMQTT)));
+    if(!channelDataMQTT){
+        UA_LOG_ERROR(UA_Log_Stdout, UA_LOGCATEGORY_SERVER, "PubSub MQTT Connection creation failed. Out of memory.");
+        return NULL;
+    }
+    
+    /* set default values */
+    UA_String mqttClientId = UA_STRING("open62541_pub");
+    memcpy(channelDataMQTT, &(UA_PubSubChannelDataMQTT){address, 2000,2000, NULL, NULL,&mqttClientId, NULL, NULL, NULL}, sizeof(UA_PubSubChannelDataMQTT));
+    /* iterate over the given KeyValuePair paramters */
+    UA_String sendBuffer = UA_STRING("sendBufferSize"), recvBuffer = UA_STRING("recvBufferSize"), clientId = UA_STRING("mqttClientId");
+    for(size_t i = 0; i < connectionConfig->connectionPropertiesSize; i++){
+        if(UA_String_equal(&connectionConfig->connectionProperties[i].key.name, &sendBuffer)){
+            if(UA_Variant_hasScalarType(&connectionConfig->connectionProperties[i].value, &UA_TYPES[UA_TYPES_UINT32])){
+                channelDataMQTT->mqttSendBufferSize = *(UA_UInt32 *) connectionConfig->connectionProperties[i].value.data;
+            }
+        } else if(UA_String_equal(&connectionConfig->connectionProperties[i].key.name, &recvBuffer)){
+            if(UA_Variant_hasScalarType(&connectionConfig->connectionProperties[i].value, &UA_TYPES[UA_TYPES_UINT32])){
+                channelDataMQTT->mqttRecvBufferSize = *(UA_UInt32 *) connectionConfig->connectionProperties[i].value.data;
+            }
+        } else if(UA_String_equal(&connectionConfig->connectionProperties[i].key.name, &clientId)){
+            if(UA_Variant_hasScalarType(&connectionConfig->connectionProperties[i].value, &UA_TYPES[UA_TYPES_STRING])){
+                channelDataMQTT->mqttClientId = (UA_String *) connectionConfig->connectionProperties[i].value.data;
+            }
+        } else {
+            UA_LOG_WARNING(UA_Log_Stdout, UA_LOGCATEGORY_SERVER, "PubSub MQTT Connection creation. Unknown connection parameter.");
+        }
+    }
+    
+    /* Create a new channel */
+    UA_PubSubChannel *newChannel = (UA_PubSubChannel *) UA_calloc(1, sizeof(UA_PubSubChannel));
+    if(!newChannel){
+        UA_LOG_ERROR(UA_Log_Stdout, UA_LOGCATEGORY_SERVER, "PubSub MQTT Connection creation failed. Out of memory.");
+        UA_free(channelDataMQTT);
+        return NULL;
+    }
+
+    /* Allocate memory for mqtt receive buffer */
+    if(channelDataMQTT->mqttRecvBufferSize > 0){
+        channelDataMQTT->mqttRecvBuffer = (uint8_t*)UA_calloc(channelDataMQTT->mqttRecvBufferSize, sizeof(uint8_t));
+        if(channelDataMQTT->mqttRecvBuffer == NULL){
+            UA_free(channelDataMQTT);
+            UA_free(newChannel);
+            return NULL;
+        }
+    }
+    
+    /* Allocate memory for mqtt send buffer */
+    if(channelDataMQTT->mqttSendBufferSize > 0){
+        channelDataMQTT->mqttSendBuffer = (uint8_t*)UA_calloc(channelDataMQTT->mqttSendBufferSize, sizeof(uint8_t));
+        if(channelDataMQTT->mqttSendBuffer == NULL){
+            if(channelDataMQTT->mqttRecvBufferSize > 0){
+                UA_free(channelDataMQTT->mqttRecvBuffer);
+            }
+            UA_free(channelDataMQTT);
+            UA_free(newChannel);
+            return NULL;
+        }
+    }
+    
+    /*link channel and internal channel data*/
+    newChannel->handle = channelDataMQTT;
+    
+    /* MQTT Client connect call. */
+    UA_StatusCode ret = connectMqtt(channelDataMQTT);
+    
+    if(ret != UA_STATUSCODE_GOOD){
+        /* try to disconnect tcp */
+        disconnectMqtt(channelDataMQTT);
+        UA_free(channelDataMQTT->mqttSendBuffer);
+        UA_free(channelDataMQTT->mqttRecvBuffer);
+        UA_free(channelDataMQTT);
+        UA_free(newChannel);
+        return NULL;
+    }
+    newChannel->state = UA_PUBSUB_CHANNEL_RDY;
+    UA_LOG_INFO(UA_Log_Stdout, UA_LOGCATEGORY_SERVER, "PubSub MQTT Connection established.");
+    return newChannel;
+}
+
+/**
+ * Subscribe to topic specified in brokerTransportSettings->queueName.
+ *
+ * @return UA_STATUSCODE_GOOD on success
+ */
+static UA_StatusCode
+UA_PubSubChannelMQTT_regist(UA_PubSubChannel *channel, UA_ExtensionObject *transportSettings,
+                            void (*callback)(UA_ByteString *encodedBuffer, UA_ByteString *topic)) {
+    if(channel->state != UA_PUBSUB_CHANNEL_RDY){
+        UA_LOG_WARNING(UA_Log_Stdout, UA_LOGCATEGORY_SERVER, "PubSub MQTT regist failed. Channel closed.");
+        return UA_STATUSCODE_BADCONNECTIONCLOSED;
+    }
+
+    UA_PubSubChannelDataMQTT *channelDataMQTT = (UA_PubSubChannelDataMQTT *) channel->handle;
+    channelDataMQTT->callback = callback;
+    
+    if(transportSettings != NULL && transportSettings->encoding == UA_EXTENSIONOBJECT_DECODED
+            && transportSettings->content.decoded.type->typeIndex == UA_TYPES_BROKERWRITERGROUPTRANSPORTDATATYPE){
+        UA_BrokerWriterGroupTransportDataType *brokerTransportSettings =
+                (UA_BrokerWriterGroupTransportDataType*)transportSettings->content.decoded.data;
+
+        UA_Byte qos = 0;
+        UA_uaQos_toMqttQos(brokerTransportSettings->requestedDeliveryGuarantee, &qos);
+        UA_LOG_INFO(UA_Log_Stdout, UA_LOGCATEGORY_SERVER, "PubSub MQTT: register");
+        return subscribeMqtt(channelDataMQTT, brokerTransportSettings->queueName, qos);
+    }else{
+         return UA_STATUSCODE_BADARGUMENTSMISSING;
+    }
+}
+
+/**
+ * Remove subscription specified in brokerTransportSettings->queueName.
+ *
+ * @return UA_STATUSCODE_GOOD on success
+ */
+static UA_StatusCode
+UA_PubSubChannelMQTT_unregist(UA_PubSubChannel *channel, UA_ExtensionObject *transportSettings) {
+    if(channel->state != UA_PUBSUB_CHANNEL_RDY){
+        UA_LOG_WARNING(UA_Log_Stdout, UA_LOGCATEGORY_SERVER, "PubSub MQTT: unregister failed. Channel closed.");
+        return UA_STATUSCODE_BADCONNECTIONCLOSED;
+    }
+
+    UA_PubSubChannelDataMQTT * channelDataMQTT = (UA_PubSubChannelDataMQTT *) channel->handle;
+    UA_LOG_INFO(UA_Log_Stdout, UA_LOGCATEGORY_SERVER, "PubSub MQTT: unregister");
+    if(transportSettings != NULL && transportSettings->encoding == UA_EXTENSIONOBJECT_DECODED
+            && transportSettings->content.decoded.type->typeIndex == UA_TYPES_BROKERWRITERGROUPTRANSPORTDATATYPE){
+        UA_BrokerWriterGroupTransportDataType *brokerTransportSettings =
+                (UA_BrokerWriterGroupTransportDataType*)transportSettings->content.decoded.data;
+
+        UA_LOG_INFO(UA_Log_Stdout, UA_LOGCATEGORY_SERVER, "PubSub MQTT: unregister");
+        return unSubscribeMqtt(channelDataMQTT, brokerTransportSettings->queueName);
+    }else{
+         return UA_STATUSCODE_BADARGUMENTSMISSING;
+    }
+}
+
+/**
+ * Send a message.
+ *
+ * @return UA_STATUSCODE_GOOD if success
+ */
+static UA_StatusCode
+UA_PubSubChannelMQTT_send(UA_PubSubChannel *channel, UA_ExtensionObject *transportSettings, const UA_ByteString *buf) {
+    if(channel->state != UA_PUBSUB_CHANNEL_RDY){
+        UA_LOG_WARNING(UA_Log_Stdout, UA_LOGCATEGORY_SERVER, "PubSub MQTT: sending failed. Invalid state.");
+        return UA_STATUSCODE_BADCONNECTIONCLOSED;
+    }
+
+    UA_PubSubChannelDataMQTT *channelDataMQTT = (UA_PubSubChannelDataMQTT *) channel->handle;
+    UA_StatusCode ret = UA_STATUSCODE_GOOD;
+    UA_Byte qos = 0;
+    
+    if(transportSettings != NULL && transportSettings->encoding == UA_EXTENSIONOBJECT_DECODED 
+            && transportSettings->content.decoded.type->typeIndex == UA_TYPES_BROKERWRITERGROUPTRANSPORTDATATYPE){
+        UA_BrokerWriterGroupTransportDataType *brokerTransportSettings = (UA_BrokerWriterGroupTransportDataType*)transportSettings->content.decoded.data;
+        UA_uaQos_toMqttQos(brokerTransportSettings->requestedDeliveryGuarantee, &qos);
+        
+        UA_String topic;
+        topic = brokerTransportSettings->queueName;
+        ret = publishMqtt(channelDataMQTT, topic, buf, qos);
+
+        if(ret){
+            channel->state = UA_PUBSUB_CHANNEL_ERROR;
+            UA_LOG_ERROR(UA_Log_Stdout, UA_LOGCATEGORY_SERVER, "PubSub MQTT: Publish failed");
+        }else{
+            UA_LOG_INFO(UA_Log_Stdout, UA_LOGCATEGORY_SERVER, "PubSub MQTT: Publish");
+        }
+    }else{
+        UA_LOG_INFO(UA_Log_Stdout, UA_LOGCATEGORY_SERVER, "PubSub MQTT: Transport settings not found.");
+    }
+    return ret;
+}
+
+/**
+ * Close channel and free the channel data.
+ *
+ * @return UA_STATUSCODE_GOOD if success
+ */
+static UA_StatusCode
+UA_PubSubChannelMQTT_close(UA_PubSubChannel *channel) {
+    /* already closed */
+    if(channel->state == UA_PUBSUB_CHANNEL_CLOSED)
+        return UA_STATUSCODE_GOOD;
+    UA_PubSubChannelDataMQTT *channelDataMQTT = (UA_PubSubChannelDataMQTT *) channel->handle;
+    UA_LOG_INFO(UA_Log_Stdout, UA_LOGCATEGORY_SERVER, "PubSub MQTT: Closing PubSubChannel.");
+    disconnectMqtt(channelDataMQTT);
+    UA_free(channelDataMQTT);
+    UA_free(channel);
+    return UA_STATUSCODE_GOOD;
+}
+
+/**
+ * Calls the send and receive functions of the mqtt network stack.
+ *
+ * @return UA_STATUSCODE_GOOD if success
+ */
+static UA_StatusCode
+UA_PubSubChannelMQTT_yield(UA_PubSubChannel *channel, UA_UInt16 timeout){
+    UA_StatusCode ret = UA_STATUSCODE_BADINVALIDARGUMENT;
+    if(channel == NULL){
+        return ret;
+    }
+
+    if(channel->state == UA_PUBSUB_CHANNEL_ERROR){
+        return UA_STATUSCODE_BADINTERNALERROR;
+    }
+
+    UA_PubSubChannelDataMQTT *channelDataMQTT = (UA_PubSubChannelDataMQTT *) channel->handle;
+    ret = yieldMqtt(channelDataMQTT, timeout);
+    if(ret != UA_STATUSCODE_GOOD){
+        channel->state = UA_PUBSUB_CHANNEL_ERROR;
+        return ret;
+    }
+
+    return ret;
+}
+
+/**
+ * Generate a new MQTT channel. Based on the given configuration. Uses yield and no recv call.
+ *
+ * @param connectionConfig connection configuration
+ * @return  ref to created channel, NULL on error
+ */
+static UA_PubSubChannel *
+TransportLayerMQTT_addChannel(UA_PubSubConnectionConfig *connectionConfig) {
+    UA_LOG_INFO(UA_Log_Stdout, UA_LOGCATEGORY_USERLAND, "PubSub MQTT: channel requested.");
+    UA_PubSubChannel * pubSubChannel = UA_PubSubChannelMQTT_open(connectionConfig);
+    if(pubSubChannel){
+        pubSubChannel->regist = UA_PubSubChannelMQTT_regist;
+        pubSubChannel->unregist = UA_PubSubChannelMQTT_unregist;
+        pubSubChannel->send = UA_PubSubChannelMQTT_send;
+        pubSubChannel->close = UA_PubSubChannelMQTT_close;
+        pubSubChannel->yield = UA_PubSubChannelMQTT_yield;
+        
+        pubSubChannel->connectionConfig = connectionConfig;
+    }
+    return pubSubChannel;
+}
+
+//MQTT channel factory
+UA_PubSubTransportLayer
+UA_PubSubTransportLayerMQTT(){
+    UA_PubSubTransportLayer pubSubTransportLayer;
+    pubSubTransportLayer.transportProfileUri = UA_STRING("http://opcfoundation.org/UA-Profile/Transport/pubsub-mqtt");
+    pubSubTransportLayer.createPubSubChannel = &TransportLayerMQTT_addChannel;
+    return pubSubTransportLayer;
+}
+
+#undef _POSIX_C_SOURCE

+ 47 - 0
plugins/ua_network_pubsub_mqtt.h

@@ -0,0 +1,47 @@
+/* This work is licensed under a Creative Commons CCZero 1.0 Universal License.
+ * See http://creativecommons.org/publicdomain/zero/1.0/ for more information.
+ * 
+ *    Copyright 2018 (c) Fraunhofer IOSB (Author: Lukas Meling)
+ */
+
+#ifndef UA_NETWORK_MQTT_H_
+#define UA_NETWORK_MQTT_H_
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+#include "open62541/plugin/pubsub.h"
+#include "open62541/network_tcp.h"
+
+/* mqtt network layer specific internal data */
+typedef struct {
+    UA_NetworkAddressUrlDataType address;
+    UA_UInt32 mqttRecvBufferSize;
+    UA_UInt32 mqttSendBufferSize;
+    uint8_t *mqttSendBuffer; 
+    uint8_t *mqttRecvBuffer; 
+    UA_String *mqttClientId;
+    UA_Connection *connection;
+    void * mqttClient;
+    void (*callback)(UA_ByteString *encodedBuffer, UA_ByteString *topic);
+} UA_PubSubChannelDataMQTT;
+/* TODO:
+ * will topic,
+ * will message,
+ * user name,
+ * password,
+ * keep alive
+ * ssl: cert, flag
+ */    
+
+
+UA_PubSubTransportLayer
+UA_PubSubTransportLayerMQTT(void);
+
+
+#ifdef __cplusplus
+} // extern "C"
+#endif
+
+#endif /* UA_NETWORK_MQTT_H_ */

+ 9 - 0
src/server/ua_server.c

@@ -696,6 +696,15 @@ UA_Server_run_iterate(UA_Server *server, UA_Boolean waitInternal) {
         nl->listen(nl, server, timeout);
     }
 
+#if defined(UA_ENABLE_PUBSUB_MQTT)
+    /* Listen on the pubsublayer, but only if the yield function is set */
+    for(size_t i = 0; i < server->pubSubManager.connectionsSize; ++i) {
+        UA_PubSubConnection *ps = &server->pubSubManager.connections[i];
+            if(ps && ps->channel->yield){
+                ps->channel->yield(ps->channel, timeout);
+        }
+    }
+#endif
 #if defined(UA_ENABLE_DISCOVERY_MULTICAST) && (UA_MULTITHREADING < 200)
     if(server->config.discovery.mdnsEnable) {
         // TODO multicastNextRepeat does not consider new input data (requests)

+ 7 - 1
tests/CMakeLists.txt

@@ -32,6 +32,7 @@ include_directories("${PROJECT_SOURCE_DIR}/src/server")
 include_directories("${CMAKE_CURRENT_SOURCE_DIR}/testing-plugins")
 # #include <src_generated/<...>.h>
 include_directories("${PROJECT_BINARY_DIR}")
+include_directories("${PROJECT_BINARY_DIR}/../plugins")
 
 if(UA_ENABLE_ENCRYPTION)
     # mbedtls includes
@@ -51,6 +52,7 @@ else()
     set(TESTS_BINARY_DIR ${CMAKE_BINARY_DIR}/bin/tests)
 endif()
 
+
 # Use different plugins for testing
 set(test_plugin_sources ${PROJECT_SOURCE_DIR}/arch/network_tcp.c
     ${PROJECT_SOURCE_DIR}/tests/testing-plugins/testing_clock.c
@@ -303,7 +305,6 @@ if(UA_ENABLE_PUBSUB)
     #Link libraries for executing subscriber unit test
     add_executable(check_pubsub_subscribe pubsub/check_pubsub_subscribe.c $<TARGET_OBJECTS:open62541-object> $<TARGET_OBJECTS:open62541-plugins>)
     target_link_libraries(check_pubsub_subscribe ${LIBS})
-    add_test_valgrind(check_pubsub_subscribe ${TESTS_BINARY_DIR}/check_pubsub_subscribe)
     add_executable(check_pubsub_publishspeed pubsub/check_pubsub_publishspeed.c $<TARGET_OBJECTS:open62541-object> $<TARGET_OBJECTS:open62541-plugins>)
     target_link_libraries(check_pubsub_publishspeed ${LIBS})
     add_test_valgrind(pubsub_publishspeed ${TESTS_BINARY_DIR}/check_pubsub_publish)
@@ -326,6 +327,11 @@ if(UA_ENABLE_PUBSUB)
 
         endif()
     endif()
+    if(UA_ENABLE_PUBSUB_MQTT)
+        add_executable(check_pubsub_connection_mqtt pubsub/check_pubsub_connection_mqtt.c $<TARGET_OBJECTS:open62541-object> $<TARGET_OBJECTS:open62541-plugins>)
+        target_link_libraries(check_pubsub_connection_mqtt ${LIBS})
+        add_test_valgrind(pubsub_connection_mqtt ${TESTS_BINARY_DIR}/check_pubsub_connection_mqtt)
+    endif()
 endif()
 
 add_executable(check_server_readspeed server/check_server_readspeed.c $<TARGET_OBJECTS:open62541-object> $<TARGET_OBJECTS:open62541-testplugins>)

+ 221 - 0
tests/pubsub/check_pubsub_connection_mqtt.c

@@ -0,0 +1,221 @@
+/* This Source Code Form is subject to the terms of the Mozilla Public
+ * License, v. 2.0. If a copy of the MPL was not distributed with this
+ * file, You can obtain one at http://mozilla.org/MPL/2.0/.
+ *
+ * Copyright (c) 2017 - 2018 Fraunhofer IOSB (Author: Andreas Ebner)
+ * Copyright (c) 2018 Fraunhofer IOSB (Author: Lukas Meling)
+ * Copyright (c) 2019 Kalycito Infotech Private Limited
+ */
+
+#include "open62541/server.h"
+#include "open62541/types_generated_encoding_binary.h"
+#include "open62541/server_config_default.h"
+#include "ua_network_pubsub_mqtt.h"
+#include "ua_server_internal.h"
+#include "check.h"
+
+UA_Server *server = NULL;
+UA_ServerConfig *config = NULL;
+
+static void setup(void) {
+    server = UA_Server_new();
+    config = UA_Server_getConfig(server);
+    config->pubsubTransportLayers = (UA_PubSubTransportLayer *) UA_malloc(1 * sizeof(UA_PubSubTransportLayer));
+    if(!config->pubsubTransportLayers) {
+        UA_Server_delete(server);
+    }
+    config->pubsubTransportLayers[0] = UA_PubSubTransportLayerMQTT();
+    config->pubsubTransportLayersSize++;
+    UA_Server_run_startup(server);
+}
+
+static void teardown(void) {
+    UA_Server_run_shutdown(server);
+    UA_Server_delete(server);
+}
+
+START_TEST(AddConnectionsWithMinimalValidConfiguration){
+    UA_StatusCode retVal;
+    UA_PubSubConnectionConfig connectionConfig;
+    memset(&connectionConfig, 0, sizeof(UA_PubSubConnectionConfig));
+    connectionConfig.name = UA_STRING("Mqtt Connection");
+    UA_NetworkAddressUrlDataType networkAddressUrl = {UA_STRING_NULL, UA_STRING("opc.mqtt://test.mosquitto.org:1883/")};
+    UA_Variant_setScalar(&connectionConfig.address, &networkAddressUrl,
+                         &UA_TYPES[UA_TYPES_NETWORKADDRESSURLDATATYPE]);
+    connectionConfig.transportProfileUri = UA_STRING("http://opcfoundation.org/UA-Profile/Transport/pubsub-mqtt");
+    retVal = UA_Server_addPubSubConnection(server, &connectionConfig, NULL);
+    ck_assert_int_eq(server->pubSubManager.connectionsSize, 1);
+    ck_assert_int_eq(retVal, UA_STATUSCODE_GOOD);
+    ck_assert(server->pubSubManager.connections[0].channel != NULL);
+    retVal = UA_Server_addPubSubConnection(server, &connectionConfig, NULL);
+    ck_assert_int_eq(retVal, UA_STATUSCODE_GOOD);
+    ck_assert(server->pubSubManager.connections[1].channel != NULL);
+    ck_assert_int_eq(server->pubSubManager.connectionsSize, 2);
+} END_TEST
+
+START_TEST(AddRemoveAddConnectionWithMinimalValidConfiguration){
+    UA_StatusCode retVal;
+    UA_PubSubConnectionConfig connectionConfig;
+    memset(&connectionConfig, 0, sizeof(UA_PubSubConnectionConfig));
+    connectionConfig.name = UA_STRING("Mqtt Connection");
+    UA_NetworkAddressUrlDataType networkAddressUrl = {UA_STRING_NULL, UA_STRING("opc.mqtt://test.mosquitto.org:1883/")};
+    UA_Variant_setScalar(&connectionConfig.address, &networkAddressUrl,
+                         &UA_TYPES[UA_TYPES_NETWORKADDRESSURLDATATYPE]);
+    connectionConfig.transportProfileUri = UA_STRING("http://opcfoundation.org/UA-Profile/Transport/pubsub-mqtt");
+    UA_NodeId connectionIdent;
+    retVal = UA_Server_addPubSubConnection(server, &connectionConfig, &connectionIdent);
+    ck_assert_int_eq(server->pubSubManager.connectionsSize, 1);
+    ck_assert_int_eq(retVal, UA_STATUSCODE_GOOD);
+    ck_assert(server->pubSubManager.connections[0].channel != NULL);
+    retVal |= UA_Server_removePubSubConnection(server, connectionIdent);
+    ck_assert_int_eq(server->pubSubManager.connectionsSize, 0);
+    ck_assert_int_eq(retVal, UA_STATUSCODE_GOOD);
+    retVal = UA_Server_addPubSubConnection(server, &connectionConfig, &connectionIdent);
+    ck_assert_int_eq(server->pubSubManager.connectionsSize, 1);
+    ck_assert(server->pubSubManager.connections[0].channel != NULL);
+    ck_assert_int_eq(retVal, UA_STATUSCODE_GOOD);
+} END_TEST
+
+START_TEST(AddConnectionWithInvalidAddress){
+    UA_StatusCode retVal;
+    UA_PubSubConnectionConfig connectionConfig;
+    memset(&connectionConfig, 0, sizeof(UA_PubSubConnectionConfig));
+    connectionConfig.name = UA_STRING("MQTT Connection");
+    UA_NetworkAddressUrlDataType networkAddressUrl = {UA_STRING_NULL, UA_STRING("opc.mqtt://127.0..1:1883/")};
+    UA_Variant_setScalar(&connectionConfig.address, &networkAddressUrl,
+                         &UA_TYPES[UA_TYPES_NETWORKADDRESSURLDATATYPE]);
+    connectionConfig.transportProfileUri = UA_STRING("http://opcfoundation.org/UA-Profile/Transport/pubsub-invalid");
+    retVal = UA_Server_addPubSubConnection(server, &connectionConfig, NULL);
+    ck_assert_int_eq(server->pubSubManager.connectionsSize, 0);
+    ck_assert_int_ne(retVal, UA_STATUSCODE_GOOD);
+    retVal = UA_Server_addPubSubConnection(server, &connectionConfig, NULL);
+    ck_assert_int_ne(retVal, UA_STATUSCODE_GOOD);
+    ck_assert_int_eq(server->pubSubManager.connectionsSize, 0);
+} END_TEST
+
+START_TEST(AddConnectionWithUnknownTransportURL){
+        UA_StatusCode retVal;
+        UA_PubSubConnectionConfig connectionConfig;
+        memset(&connectionConfig, 0, sizeof(UA_PubSubConnectionConfig));
+        connectionConfig.name = UA_STRING("MQTT Connection");
+        UA_NetworkAddressUrlDataType networkAddressUrl = {UA_STRING_NULL, UA_STRING("opc.mqtt://test.mosquitto.org:1883/")};
+        UA_Variant_setScalar(&connectionConfig.address, &networkAddressUrl,
+                             &UA_TYPES[UA_TYPES_NETWORKADDRESSURLDATATYPE]);
+        connectionConfig.transportProfileUri = UA_STRING("http://opcfoundation.org/UA-Profile/Transport/unknown-udp-uadp");
+        UA_NodeId connectionIdent;
+        retVal = UA_Server_addPubSubConnection(server, &connectionConfig, &connectionIdent);
+        ck_assert_int_eq(server->pubSubManager.connectionsSize, 0);
+        ck_assert_int_ne(retVal, UA_STATUSCODE_GOOD);
+} END_TEST
+
+START_TEST(AddConnectionWithNullConfig){
+        UA_StatusCode retVal;
+        retVal = UA_Server_addPubSubConnection(server, NULL, NULL);
+        ck_assert_int_eq(server->pubSubManager.connectionsSize, 0);
+        ck_assert_int_ne(retVal, UA_STATUSCODE_GOOD);
+    } END_TEST
+
+START_TEST(AddSingleConnectionWithMaximalConfiguration){
+    UA_NetworkAddressUrlDataType networkAddressUrlData = {UA_STRING("127.0.0.1"), UA_STRING("opc.mqtt://test.mosquitto.org:1883/")};
+    UA_Variant address;
+    UA_Variant_setScalar(&address, &networkAddressUrlData, &UA_TYPES[UA_TYPES_NETWORKADDRESSURLDATATYPE]);
+    UA_KeyValuePair connectionOptions[3];
+    connectionOptions[0].key = UA_QUALIFIEDNAME(0, "sendBufferSize");
+    UA_UInt32 sBs = 1000;
+    UA_Variant_setScalar(&connectionOptions[0].value, &sBs, &UA_TYPES[UA_TYPES_UINT32]);
+    connectionOptions[1].key = UA_QUALIFIEDNAME(0, "recvBufferSize");
+    UA_UInt32 rBs = 1000;
+    UA_Variant_setScalar(&connectionOptions[1].value, &rBs, &UA_TYPES[UA_TYPES_UINT32]);
+    connectionOptions[2].key = UA_QUALIFIEDNAME(0, "mqttClientId");
+    UA_String id = UA_STRING("client");
+    UA_Variant_setScalar(&connectionOptions[2].value, &id, &UA_TYPES[UA_TYPES_STRING]);
+
+    UA_PubSubConnectionConfig connectionConf;
+    memset(&connectionConf, 0, sizeof(UA_PubSubConnectionConfig));
+    connectionConf.name = UA_STRING("MQTT Connection");
+    connectionConf.transportProfileUri = UA_STRING("http://opcfoundation.org/UA-Profile/Transport/pubsub-mqtt");
+    connectionConf.enabled = true;
+    connectionConf.publisherId.numeric = 223344;
+    connectionConf.connectionPropertiesSize = 3;
+    connectionConf.connectionProperties = connectionOptions;
+    connectionConf.address = address;
+    UA_NodeId connection;
+    UA_StatusCode retVal = UA_Server_addPubSubConnection(server, &connectionConf, &connection);
+    ck_assert_int_eq(server->pubSubManager.connectionsSize, 1);
+    ck_assert_int_eq(retVal, UA_STATUSCODE_GOOD);
+    ck_assert(server->pubSubManager.connections[0].channel != NULL);
+} END_TEST
+
+START_TEST(GetMaximalConnectionConfigurationAndCompareValues){
+    UA_NetworkAddressUrlDataType networkAddressUrlData = {UA_STRING("127.0.0.1"), UA_STRING("opc.mqtt://test.mosquitto.org:1883/")};
+    UA_Variant address;
+    UA_Variant_setScalar(&address, &networkAddressUrlData, &UA_TYPES[UA_TYPES_NETWORKADDRESSURLDATATYPE]);
+    UA_KeyValuePair connectionOptions[3];
+    connectionOptions[0].key = UA_QUALIFIEDNAME(0, "sendBufferSize");
+    UA_UInt32 sBs = 1000;
+    UA_Variant_setScalar(&connectionOptions[0].value, &sBs, &UA_TYPES[UA_TYPES_UINT32]);
+    connectionOptions[1].key = UA_QUALIFIEDNAME(0, "recvBufferSize");
+    UA_UInt32 rBs = 1000;
+    UA_Variant_setScalar(&connectionOptions[1].value, &rBs, &UA_TYPES[UA_TYPES_UINT32]);
+    connectionOptions[2].key = UA_QUALIFIEDNAME(0, "mqttClientId");
+    UA_String id = UA_STRING("client");
+    UA_Variant_setScalar(&connectionOptions[2].value, &id, &UA_TYPES[UA_TYPES_STRING]);
+
+    UA_PubSubConnectionConfig connectionConf;
+    memset(&connectionConf, 0, sizeof(UA_PubSubConnectionConfig));
+    connectionConf.name = UA_STRING("MQTT Connection");
+    connectionConf.transportProfileUri = UA_STRING("http://opcfoundation.org/UA-Profile/Transport/pubsub-mqtt");
+    connectionConf.enabled = true;
+    connectionConf.publisherId.numeric = 223344;
+    connectionConf.connectionPropertiesSize = 3;
+    connectionConf.connectionProperties = connectionOptions;
+    connectionConf.address = address;
+    UA_NodeId connection;
+    UA_StatusCode retVal = UA_Server_addPubSubConnection(server, &connectionConf, &connection);
+    ck_assert_int_eq(retVal, UA_STATUSCODE_GOOD);
+    UA_PubSubConnectionConfig connectionConfig;
+    memset(&connectionConfig, 0, sizeof(UA_PubSubConnectionConfig));
+    retVal |= UA_Server_getPubSubConnectionConfig(server, connection, &connectionConfig);
+    ck_assert_int_eq(retVal, UA_STATUSCODE_GOOD);
+    ck_assert(connectionConfig.connectionPropertiesSize == connectionConf.connectionPropertiesSize);
+    ck_assert(UA_String_equal(&connectionConfig.name, &connectionConf.name) == UA_TRUE);
+    ck_assert(UA_String_equal(&connectionConfig.transportProfileUri, &connectionConf.transportProfileUri) == UA_TRUE);
+    UA_NetworkAddressUrlDataType networkAddressUrlDataCopy = *((UA_NetworkAddressUrlDataType *)connectionConfig.address.data);
+    ck_assert(UA_NetworkAddressUrlDataType_calcSizeBinary(&networkAddressUrlDataCopy) == UA_NetworkAddressUrlDataType_calcSizeBinary(&networkAddressUrlData));
+    for(size_t i = 0; i < connectionConfig.connectionPropertiesSize; i++){
+        ck_assert(UA_String_equal(&connectionConfig.connectionProperties[i].key.name, &connectionConf.connectionProperties[i].key.name) == UA_TRUE);
+        ck_assert(UA_Variant_calcSizeBinary(&connectionConfig.connectionProperties[i].value) == UA_Variant_calcSizeBinary(&connectionConf.connectionProperties[i].value));
+    }
+    UA_PubSubConnectionConfig_clear(&connectionConfig);
+    } END_TEST
+
+int main(void) {
+    TCase *tc_add_pubsub_connections_minimal_config = tcase_create("Create PubSub Mqtt Connections with minimal valid config");
+    tcase_add_checked_fixture(tc_add_pubsub_connections_minimal_config, setup, teardown);
+    tcase_add_test(tc_add_pubsub_connections_minimal_config, AddConnectionsWithMinimalValidConfiguration);
+    tcase_add_test(tc_add_pubsub_connections_minimal_config, AddRemoveAddConnectionWithMinimalValidConfiguration);
+
+    TCase *tc_add_pubsub_connections_invalid_config = tcase_create("Create PubSub Mqtt Connections with invalid configurations");
+    tcase_add_checked_fixture(tc_add_pubsub_connections_invalid_config, setup, teardown);
+    tcase_add_test(tc_add_pubsub_connections_invalid_config, AddConnectionWithInvalidAddress);
+    tcase_add_test(tc_add_pubsub_connections_invalid_config, AddConnectionWithUnknownTransportURL);
+    tcase_add_test(tc_add_pubsub_connections_invalid_config, AddConnectionWithNullConfig);
+
+    TCase *tc_add_pubsub_connections_maximal_config = tcase_create("Create PubSub Mqtt Connections with maximal valid config");
+    tcase_add_checked_fixture(tc_add_pubsub_connections_maximal_config, setup, teardown);
+    tcase_add_test(tc_add_pubsub_connections_maximal_config, AddSingleConnectionWithMaximalConfiguration);
+    tcase_add_test(tc_add_pubsub_connections_maximal_config, GetMaximalConnectionConfigurationAndCompareValues);
+
+    Suite *s = suite_create("PubSub Mqtt connection creation");
+    suite_add_tcase(s, tc_add_pubsub_connections_minimal_config);
+    suite_add_tcase(s, tc_add_pubsub_connections_invalid_config);
+    suite_add_tcase(s, tc_add_pubsub_connections_maximal_config);
+    //suite_add_tcase(s, tc_decode);
+
+    SRunner *sr = srunner_create(s);
+    srunner_set_fork_status(sr, CK_NOFORK);
+    srunner_run_all(sr,CK_NORMAL);
+    int number_failed = srunner_ntests_failed(sr);
+    srunner_free(sr);
+    return (number_failed == 0) ? EXIT_SUCCESS : EXIT_FAILURE;
+}

+ 1 - 0
tools/appveyor/build.ps1

@@ -57,6 +57,7 @@ try {
             -DUA_ENABLE_PUBSUB_DELTAFRAMES:BOOL=ON `
             -DUA_ENABLE_PUBSUB_INFORMATIONMODEL:BOOL=ON `
             -DUA_ENABLE_SUBSCRIPTIONS_EVENTS:BOOL=ON `
+            -DUA_ENABLE_PUBSUB_MQTT:BOOL=ON `
             -DUA_NAMESPACE_ZERO:STRING=FULL ..
     & cmake --build . --config RelWithDebInfo
     if ($LASTEXITCODE -and $LASTEXITCODE -ne 0) {

+ 11 - 0
tools/travis/travis_linux_script.sh

@@ -409,6 +409,16 @@ if [ $? -ne 0 ] ; then exit 1 ; fi
 cd .. && rm build -rf
 echo -en 'travis_fold:end:script.build.json\\r'
 
+    echo -e "\r\n== Compile PubSub MQTT ==" && echo -en 'travis_fold:start:script.build.mqtt\\r'
+    mkdir -p build && cd build
+    cmake \
+        -DPYTHON_EXECUTABLE:FILEPATH=/usr/bin/$PYTHON \
+        -DUA_ENABLE_PUBSUB=ON -DUA_ENABLE_PUBSUB_MQTT=ON ..
+    make -j
+    if [ $? -ne 0 ] ; then exit 1 ; fi
+    cd .. && rm build -rf
+    echo -en 'travis_fold:end:script.build.mqtt\\r'
+
 echo -e "\r\n== Unit tests (full NS0) ==" && echo -en 'travis_fold:start:script.build.unit_test_ns0_full\\r'
 mkdir -p build && cd build
 # Valgrind cannot handle the full NS0 because the generated file is too big. Thus run NS0 full without valgrind
@@ -426,6 +436,7 @@ cmake \
     -DUA_ENABLE_PUBSUB=ON \
     -DUA_ENABLE_PUBSUB_DELTAFRAMES=ON \
     -DUA_ENABLE_PUBSUB_INFORMATIONMODEL=ON \
+    -DUA_ENABLE_PUBSUB_MQTT=ON \
     -DUA_ENABLE_SUBSCRIPTIONS=ON \
     -DUA_ENABLE_SUBSCRIPTIONS_EVENTS=ON \
     -DUA_ENABLE_UNIT_TESTS_MEMCHECK=OFF \