Forráskód Böngészése

feat(ex): Standalone MQTT publisher application

 - Modify examples/CMakelists.txt
 - Add Broker transport settings
Shriya Chaurasia 5 éve
szülő
commit
69dc0fe59b

+ 1 - 1
examples/CMakeLists.txt

@@ -203,7 +203,7 @@ if(UA_ENABLE_PUBSUB)
     if(UA_ENABLE_PUBSUB_MQTT)
         if(NOT WIN32)
             include_directories(${PROJECT_SOURCE_DIR}/../plugins)
-            add_example(tutorial_pubsub_mqtt pubsub/tutorial_pubsub_mqtt.c)
+            add_example(tutorial_pubsub_mqtt_publish pubsub/tutorial_pubsub_mqtt_publish.c)
         endif()
     endif()
 endif()

+ 115 - 116
examples/pubsub/tutorial_pubsub_mqtt.c

@@ -7,21 +7,21 @@
  * Working with Publish/Subscribe
  * ------------------------------
  *
- * Work in progress:
- * This Tutorial will be continuously extended during the next PubSub batches. More details about
- * the PubSub extension and corresponding open62541 API are located here: :ref:`pubsub`.
+ * Work in progress: This Tutorial will be continuously extended during the next
+ * PubSub batches. More details about the PubSub extension and corresponding
+ * open62541 API are located here: :ref:`pubsub`.
  *
  * Publishing Fields
  * ^^^^^^^^^^^^^^^^^
- * The PubSub mqtt publish subscribe example demonstrate the simplest way to publish
- * informations from the information model over MQTT using
- * the UADP (or later JSON) encoding.
- * To receive information the subscribe functionality of mqtt is used.
- * A periodical call to yield is necessary to update the mqtt stack.
+ * The PubSub MQTT publish example demonstrate the simplest way to publish
+ * informations from the information model over MQTT using the UADP (or later
+ * JSON) encoding. To receive information the subscribe functionality of mqtt is
+ * used. A periodical call to yield is necessary to update the mqtt stack.
  *
  * **Connection handling**
- * PubSubConnections can be created and deleted on runtime. More details about the system preconfiguration and
- * connection can be found in ``tutorial_pubsub_connection.c``.
+ * PubSubConnections can be created and deleted on runtime. More details about
+ * the system preconfiguration and connection can be found in
+ * ``tutorial_pubsub_connection.c``.
  */
 
 #include "open62541/server.h"
@@ -29,9 +29,22 @@
 #include "ua_pubsub.h"
 #include "ua_network_pubsub_mqtt.h"
 #include "open62541/plugin/log_stdout.h"
+#include <signal.h>
+
+#define CONNECTION_NAME              "MQTT Publisher Connection"
+#define TRANSPORT_PROFILE_URI        "http://opcfoundation.org/UA-Profile/Transport/pubsub-mqtt"
+#define MQTT_CLIENT_ID               "TESTCLIENTPUBSUBMQTT"
+#define CONNECTIONOPTION_NAME        "mqttClientId"
+#define PUBLISHER_TOPIC              "customTopic"
+#define PUBLISHER_METADATAQUEUENAME  "MetaDataTopic"
+#define PUBLISHER_METADATAUPDATETIME 0
+#define BROKER_ADDRESS_URL           "opc.mqtt://127.0.0.1:1883"
+#define PUBLISH_INTERVAL             500
 
 static UA_Boolean useJson = false;
-static UA_NodeId connectionIdent, publishedDataSetIdent, writerGroupIdent;
+static UA_NodeId connectionIdent;
+static UA_NodeId publishedDataSetIdent;
+static UA_NodeId writerGroupIdent;
 
 static void
 addPubSubConnection(UA_Server *server, char *addressUrl) {
@@ -39,31 +52,34 @@ addPubSubConnection(UA_Server *server, char *addressUrl) {
      * in the pubsub connection tutorial */
     UA_PubSubConnectionConfig connectionConfig;
     memset(&connectionConfig, 0, sizeof(connectionConfig));
-    connectionConfig.name = UA_STRING("MQTT Connection 1");
-    connectionConfig.transportProfileUri = UA_STRING("http://opcfoundation.org/UA-Profile/Transport/pubsub-mqtt");
+    connectionConfig.name = UA_STRING(CONNECTION_NAME);
+    connectionConfig.transportProfileUri = UA_STRING(TRANSPORT_PROFILE_URI);
     connectionConfig.enabled = UA_TRUE;
 
     /* configure address of the mqtt broker (local on default port) */
     UA_NetworkAddressUrlDataType networkAddressUrl = {UA_STRING_NULL , UA_STRING(addressUrl)};
-    UA_Variant_setScalar(&connectionConfig.address, &networkAddressUrl, &UA_TYPES[UA_TYPES_NETWORKADDRESSURLDATATYPE]);
-    connectionConfig.publisherId.numeric = UA_UInt32_random();
-    
+    UA_Variant_setScalar(&connectionConfig.address, &networkAddressUrl,
+                         &UA_TYPES[UA_TYPES_NETWORKADDRESSURLDATATYPE]);
+    /* Changed to static publisherId from random generation to identify
+     * the publisher on Subscriber side */
+    connectionConfig.publisherId.numeric = 2234;
+
     /* configure options, set mqtt client id */
     UA_KeyValuePair connectionOptions[1];
-    connectionOptions[0].key = UA_QUALIFIEDNAME(0, "mqttClientId");
-    UA_String mqttClientId = UA_STRING("TESTCLIENTPUBSUBMQTT");
+    connectionOptions[0].key = UA_QUALIFIEDNAME(0, CONNECTIONOPTION_NAME);
+    UA_String mqttClientId = UA_STRING(MQTT_CLIENT_ID);
     UA_Variant_setScalar(&connectionOptions[0].value, &mqttClientId, &UA_TYPES[UA_TYPES_STRING]);
     connectionConfig.connectionProperties = connectionOptions;
     connectionConfig.connectionPropertiesSize = 1;
-    
     UA_Server_addPubSubConnection(server, &connectionConfig, &connectionIdent);
 }
 
 /**
  * **PublishedDataSet handling**
- * The PublishedDataSet (PDS) and PubSubConnection are the toplevel entities and can exist alone. The PDS contains
- * the collection of the published fields.
- * All other PubSub elements are directly or indirectly linked with the PDS or connection.
+ * The PublishedDataSet (PDS) and PubSubConnection are the toplevel entities and
+ * can exist alone. The PDS contains the collection of the published fields. All
+ * other PubSub elements are directly or indirectly linked with the PDS or
+ * connection.
  */
 static void
 addPublishedDataSet(UA_Server *server) {
@@ -77,23 +93,6 @@ addPublishedDataSet(UA_Server *server) {
     UA_Server_addPublishedDataSet(server, &publishedDataSetConfig, &publishedDataSetIdent);
 }
 
-static void
-addVariable(UA_Server *server) {
-    // Define the attribute of the myInteger variable node 
-    UA_VariableAttributes attr = UA_VariableAttributes_default;
-    UA_Int32 myInteger = 42;
-    UA_Variant_setScalar(&attr.value, &myInteger, &UA_TYPES[UA_TYPES_INT32]);
-    attr.description = UA_LOCALIZEDTEXT("en-US","the answer");
-    attr.displayName = UA_LOCALIZEDTEXT("en-US","the answer");
-    attr.dataType = UA_TYPES[UA_TYPES_INT32].typeId;
-    attr.accessLevel = UA_ACCESSLEVELMASK_READ | UA_ACCESSLEVELMASK_WRITE;
-
-    // Add the variable node to the information model 
-    UA_Server_addVariableNode(server, UA_NODEID_NUMERIC(1, 42), UA_NODEID_NUMERIC(0, UA_NS0ID_OBJECTSFOLDER),
-                              UA_NODEID_NUMERIC(0, UA_NS0ID_ORGANIZES), UA_QUALIFIEDNAME(1, "the answer"),
-                              UA_NODEID_NUMERIC(0, UA_NS0ID_BASEDATAVARIABLETYPE), attr, NULL, NULL);
-}
-
 /**
  * **DataSetField handling**
  * The DataSetField (DSF) is part of the PDS and describes exactly one published field.
@@ -111,15 +110,7 @@ addDataSetField(UA_Server *server) {
     UA_NODEID_NUMERIC(0, UA_NS0ID_SERVER_SERVERSTATUS_CURRENTTIME);
     dataSetFieldConfig.field.variable.publishParameters.attributeId = UA_ATTRIBUTEID_VALUE;
     UA_Server_addDataSetField(server, publishedDataSetIdent, &dataSetFieldConfig, NULL);
-    
-    UA_DataSetFieldConfig dataSetFieldConfig2;
-    memset(&dataSetFieldConfig2, 0, sizeof(UA_DataSetFieldConfig));
-    dataSetFieldConfig2.dataSetFieldType = UA_PUBSUB_DATASETFIELD_VARIABLE;
-    dataSetFieldConfig2.field.variable.fieldNameAlias = UA_STRING("Test");
-    dataSetFieldConfig2.field.variable.promotedField = UA_FALSE;
-    dataSetFieldConfig2.field.variable.publishParameters.publishedVariable = UA_NODEID_NUMERIC(1, 42);
-    dataSetFieldConfig2.field.variable.publishParameters.attributeId = UA_ATTRIBUTEID_VALUE;
-    UA_Server_addDataSetField(server, publishedDataSetIdent, &dataSetFieldConfig2, NULL);
+
 }
 
 /**
@@ -128,7 +119,7 @@ addDataSetField(UA_Server *server) {
  * parameters for the message creation.
  */
 static void
-addWriterGroup(UA_Server *server, int interval) {
+addWriterGroup(UA_Server *server, char *topic, int interval) {
     /* Now we create a new WriterGroupConfig and add the group to the existing PubSubConnection. */
     UA_WriterGroupConfig writerGroupConfig;
     memset(&writerGroupConfig, 0, sizeof(UA_WriterGroupConfig));
@@ -136,7 +127,7 @@ addWriterGroup(UA_Server *server, int interval) {
     writerGroupConfig.publishingInterval = interval;
     writerGroupConfig.enabled = UA_FALSE;
     writerGroupConfig.writerGroupId = 100;
-    
+
     /* decide whether to use JSON or UADP encoding*/
 #ifdef UA_ENABLE_JSON_ENCODING
     if(useJson)
@@ -144,35 +135,56 @@ addWriterGroup(UA_Server *server, int interval) {
     else
 #endif
         writerGroupConfig.encodingMimeType = UA_PUBSUB_ENCODING_UADP;
+    writerGroupConfig.messageSettings.encoding             = UA_EXTENSIONOBJECT_DECODED;
+    writerGroupConfig.messageSettings.content.decoded.type = &UA_TYPES[UA_TYPES_UADPWRITERGROUPMESSAGEDATATYPE];
+    /* The configuration flags for the messages are encapsulated inside the
+     * message- and transport settings extension objects. These extension
+     * objects are defined by the standard. e.g.
+     * UadpWriterGroupMessageDataType */
+    UA_UadpWriterGroupMessageDataType *writerGroupMessage  = UA_UadpWriterGroupMessageDataType_new();
+    /* Change message settings of writerGroup to send PublisherId,
+     * WriterGroupId in GroupHeader and DataSetWriterId in PayloadHeader
+     * of NetworkMessage */
+    writerGroupMessage->networkMessageContentMask =
+        (UA_UadpNetworkMessageContentMask)(UA_UADPNETWORKMESSAGECONTENTMASK_PUBLISHERID |
+        (UA_UadpNetworkMessageContentMask)UA_UADPNETWORKMESSAGECONTENTMASK_GROUPHEADER |
+        (UA_UadpNetworkMessageContentMask)UA_UADPNETWORKMESSAGECONTENTMASK_WRITERGROUPID |
+        (UA_UadpNetworkMessageContentMask)UA_UADPNETWORKMESSAGECONTENTMASK_PAYLOADHEADER);
+    writerGroupConfig.messageSettings.content.decoded.data = writerGroupMessage;
 
     /* configure the mqtt publish topic */
     UA_BrokerWriterGroupTransportDataType brokerTransportSettings;
     memset(&brokerTransportSettings, 0, sizeof(UA_BrokerWriterGroupTransportDataType));
-    brokerTransportSettings.queueName = UA_STRING("customTopic");
+    /* Assign the Topic at which MQTT publish should happen */
+    /*ToDo: Pass the topic as argument from the writer group */
+    brokerTransportSettings.queueName = UA_STRING(topic);
     brokerTransportSettings.resourceUri = UA_STRING_NULL;
     brokerTransportSettings.authenticationProfileUri = UA_STRING_NULL;
-    
+
     /* Choose the QOS Level for MQTT */
     brokerTransportSettings.requestedDeliveryGuarantee = UA_BROKERTRANSPORTQUALITYOFSERVICE_BESTEFFORT;
-    
+
     /* Encapsulate config in transportSettings */
     UA_ExtensionObject transportSettings;
     memset(&transportSettings, 0, sizeof(UA_ExtensionObject));
     transportSettings.encoding = UA_EXTENSIONOBJECT_DECODED;
     transportSettings.content.decoded.type = &UA_TYPES[UA_TYPES_BROKERWRITERGROUPTRANSPORTDATATYPE];
     transportSettings.content.decoded.data = &brokerTransportSettings;
-    
+
     writerGroupConfig.transportSettings = transportSettings;
     UA_Server_addWriterGroup(server, connectionIdent, &writerGroupConfig, &writerGroupIdent);
+    UA_Server_setWriterGroupOperational(server, writerGroupIdent);
+    UA_UadpWriterGroupMessageDataType_delete(writerGroupMessage);
 }
 
 /**
  * **DataSetWriter handling**
- * A DataSetWriter (DSW) is the glue between the WG and the PDS. The DSW is linked to exactly one
- * PDS and contains additional informations for the message generation.
+ * A DataSetWriter (DSW) is the glue between the WG and the PDS. The DSW is
+ * linked to exactly one PDS and contains additional informations for the
+ * message generation.
  */
 static void
-addDataSetWriter(UA_Server *server) {
+addDataSetWriter(UA_Server *server, char *topic) {
     /* We need now a DataSetWriter within the WriterGroup. This means we must
      * create a new DataSetWriterConfig and add call the addWriterGroup function. */
     UA_NodeId dataSetWriterIdent;
@@ -201,22 +213,48 @@ addDataSetWriter(UA_Server *server) {
         dataSetWriterConfig.messageSettings = messageSettings;
     }
 #endif
-    
+    /*TODO: Modify MQTT send to add DataSetWriters broker transport settings */
+    /*TODO: Pass the topic as argument from the writer group */
+    /*TODO: Publish Metadata to metaDataQueueName */
+    /* configure the mqtt publish topic */
+    UA_BrokerDataSetWriterTransportDataType brokerTransportSettings;
+    memset(&brokerTransportSettings, 0, sizeof(UA_BrokerDataSetWriterTransportDataType));
+
+    /* Assign the Topic at which MQTT publish should happen */
+    brokerTransportSettings.queueName = UA_STRING(topic);
+    brokerTransportSettings.resourceUri = UA_STRING_NULL;
+    brokerTransportSettings.authenticationProfileUri = UA_STRING_NULL;
+    brokerTransportSettings.metaDataQueueName = UA_STRING(PUBLISHER_METADATAQUEUENAME);
+    brokerTransportSettings.metaDataUpdateTime = PUBLISHER_METADATAUPDATETIME;
+
+    /* Choose the QOS Level for MQTT */
+    brokerTransportSettings.requestedDeliveryGuarantee = UA_BROKERTRANSPORTQUALITYOFSERVICE_BESTEFFORT;
+
+    /* Encapsulate config in transportSettings */
+    UA_ExtensionObject transportSettings;
+    memset(&transportSettings, 0, sizeof(UA_ExtensionObject));
+    transportSettings.encoding = UA_EXTENSIONOBJECT_DECODED;
+    transportSettings.content.decoded.type = &UA_TYPES[UA_TYPES_BROKERDATASETWRITERTRANSPORTDATATYPE];
+    transportSettings.content.decoded.data = &brokerTransportSettings;
+
+    dataSetWriterConfig.transportSettings = transportSettings;
     UA_Server_addDataSetWriter(server, writerGroupIdent, publishedDataSetIdent,
                                &dataSetWriterConfig, &dataSetWriterIdent);
 }
 
 /**
- * That's it! You're now publishing the selected fields.
- * Open a packet inspection tool of trust e.g. wireshark and take a look on the outgoing packages.
- * The following graphic figures out the packages created by this tutorial.
+ * That's it! You're now publishing the selected fields. Open a packet
+ * inspection tool of trust e.g. wireshark and take a look on the outgoing
+ * packages. The following graphic figures out the packages created by this
+ * tutorial.
  *
  * .. figure:: ua-wireshark-pubsub.png
  *     :figwidth: 100 %
  *     :alt: OPC UA PubSub communication in wireshark
  *
- * The open62541 subscriber API will be released later. If you want to process the the datagrams,
- * take a look on the ua_network_pubsub_networkmessage.c which already contains the decoding code for UADP messages.
+ * The open62541 subscriber API will be released later. If you want to process
+ * the the datagrams, take a look on the ua_network_pubsub_networkmessage.c
+ * which already contains the decoding code for UADP messages.
  *
  * It follows the main server code, making use of the above definitions. */
 UA_Boolean running = true;
@@ -225,46 +263,6 @@ static void stopHandler(int sign) {
     running = false;
 }
 
-static void callback(UA_ByteString *encodedBuffer, UA_ByteString *topic){
-     UA_LOG_INFO(UA_Log_Stdout, UA_LOGCATEGORY_USERLAND, "PubSub MQTT: Message Received");
-     
-     /* For example try to decode as a Json Networkmessage...
-     UA_NetworkMessage dst;
-     UA_StatusCode ret = UA_NetworkMessage_decodeJson(&dst, encodedBuffer);
-     if( ret == UA_STATUSCODE_GOOD){
-
-     }
-      */
-     
-     UA_ByteString_delete(encodedBuffer);
-     UA_ByteString_delete(topic);
-     
-     //UA_NetworkMessage_deleteMembers(&dst);
-}
-
-/* Adds a subscription */
-static void 
-addSubscription(UA_Server *server, UA_PubSubConnection *connection, char *topic) {
-    /* transportSettings for subscription */
-    UA_BrokerWriterGroupTransportDataType brokerTransportSettings;
-    memset(&brokerTransportSettings, 0, sizeof(UA_BrokerWriterGroupTransportDataType));
-    brokerTransportSettings.queueName = UA_STRING(topic);
-    brokerTransportSettings.resourceUri = UA_STRING_NULL;
-    brokerTransportSettings.authenticationProfileUri = UA_STRING_NULL;
-    
-    /* QOS */
-    brokerTransportSettings.requestedDeliveryGuarantee = UA_BROKERTRANSPORTQUALITYOFSERVICE_BESTEFFORT;
-
-    UA_ExtensionObject transportSettings;
-    memset(&transportSettings, 0, sizeof(UA_ExtensionObject));
-    transportSettings.encoding = UA_EXTENSIONOBJECT_DECODED;
-    transportSettings.content.decoded.type = &UA_TYPES[UA_TYPES_BROKERWRITERGROUPTRANSPORTDATATYPE];
-    transportSettings.content.decoded.data = &brokerTransportSettings;
-
-    connection->channel->regist(connection->channel, &transportSettings, &callback);
-    return; 
-}
-
 static void usage(void) {
     printf("Usage: tutorial_pubsub_mqtt [--url <opc.mqtt://hostname:port>] "
            "[--topic <mqttTopic>] "
@@ -281,9 +279,10 @@ int main(int argc, char **argv) {
     signal(SIGINT, stopHandler);
     signal(SIGTERM, stopHandler);
 
-    char *addressUrl = "opc.mqtt://127.0.0.1:1883";
-    char *topic = "customTopic";
-    int interval = 500;
+    /* TODO: Change to secure mqtt port:8883 */
+    char *addressUrl = BROKER_ADDRESS_URL;
+    char *topic = PUBLISHER_TOPIC;
+    int interval = PUBLISH_INTERVAL;
 
     /* Parse arguments */
     for(int argpos = 1; argpos < argc; argpos++) {
@@ -322,8 +321,6 @@ int main(int argc, char **argv) {
                 usage();
                 return -1;
             }
-            argpos++;
-            topic = argv[argpos];
             if(sscanf(argv[argpos], "%d", &interval) != 1) {
                 usage();
                 return -1;
@@ -342,30 +339,32 @@ int main(int argc, char **argv) {
 
     /* Set up the server config */
     UA_Server *server = UA_Server_new();
-    UA_ServerConfig *config = UA_Server_getConfig(server); 
-	/* Details about the connection configuration and handling are located in the pubsub connection tutorial */
+    UA_ServerConfig *config = UA_Server_getConfig(server);
+    /* Details about the connection configuration and handling are located in
+     * the pubsub connection tutorial */
     UA_ServerConfig_setDefault(config);
-     config->pubsubTransportLayers = (UA_PubSubTransportLayer *) UA_malloc(1 * sizeof(UA_PubSubTransportLayer));
+     config->pubsubTransportLayers = (UA_PubSubTransportLayer *)
+         UA_malloc(1 * sizeof(UA_PubSubTransportLayer));
     if(!config->pubsubTransportLayers) {
         return -1;
     }
     config->pubsubTransportLayers[0] = UA_PubSubTransportLayerMQTT();
     config->pubsubTransportLayersSize++;
-    
-    addVariable(server);
+
     addPubSubConnection(server, addressUrl);
     addPublishedDataSet(server);
     addDataSetField(server);
-    addWriterGroup(server, interval);
-    addDataSetWriter(server);
+    addWriterGroup(server, topic, interval);
+    addDataSetWriter(server, topic);
     UA_PubSubConnection *connection = UA_PubSubConnection_findConnectionbyId(server, connectionIdent);
+
     if(!connection) {
         UA_LOG_WARNING(UA_Log_Stdout, UA_LOGCATEGORY_USERLAND,
                        "Could not create a PubSubConnection");
         UA_Server_delete(server);
         return -1;
     }
-    addSubscription(server, connection, topic);
+
     UA_Server_run(server, &running);
     UA_Server_delete(server);
     return 0;

+ 1 - 0
include/open62541/server_pubsub.h

@@ -401,6 +401,7 @@ typedef struct {
     UA_DataSetFieldContentMask dataSetFieldContentMask;
     UA_UInt32 keyFrameCount;
     UA_ExtensionObject messageSettings;
+    UA_ExtensionObject transportSettings;
     UA_String dataSetName;
     size_t dataSetWriterPropertiesSize;
     UA_KeyValuePair *dataSetWriterProperties;