|
@@ -24,26 +24,17 @@
|
|
|
* connection can be found in ``tutorial_pubsub_connection.c``.
|
|
|
*/
|
|
|
|
|
|
-/* server */
|
|
|
#include "open62541/server.h"
|
|
|
#include "open62541/server_config_default.h"
|
|
|
-
|
|
|
-/* PubSubConnection */
|
|
|
#include "ua_pubsub.h"
|
|
|
-
|
|
|
-/* UA_PubSubTransportLayerMQTT */
|
|
|
#include "ua_network_pubsub_mqtt.h"
|
|
|
-/* Logging */
|
|
|
#include "open62541/plugin/log_stdout.h"
|
|
|
|
|
|
-/* uncomment to use json encoding */
|
|
|
-//#define USE_JSON_ENCODING
|
|
|
-
|
|
|
-/* global ids */
|
|
|
+static UA_Boolean useJson = false;
|
|
|
static UA_NodeId connectionIdent, publishedDataSetIdent, writerGroupIdent;
|
|
|
|
|
|
static void
|
|
|
-addPubSubConnection(UA_Server *server){
|
|
|
+addPubSubConnection(UA_Server *server, char *addressUrl) {
|
|
|
/* Details about the connection configuration and handling are located
|
|
|
* in the pubsub connection tutorial */
|
|
|
UA_PubSubConnectionConfig connectionConfig;
|
|
@@ -53,7 +44,7 @@ addPubSubConnection(UA_Server *server){
|
|
|
connectionConfig.enabled = UA_TRUE;
|
|
|
|
|
|
/* configure address of the mqtt broker (local on default port) */
|
|
|
- UA_NetworkAddressUrlDataType networkAddressUrl = {UA_STRING_NULL , UA_STRING("opc.mqtt://127.0.0.1:1883/")};
|
|
|
+ UA_NetworkAddressUrlDataType networkAddressUrl = {UA_STRING_NULL , UA_STRING(addressUrl)};
|
|
|
UA_Variant_setScalar(&connectionConfig.address, &networkAddressUrl, &UA_TYPES[UA_TYPES_NETWORKADDRESSURLDATATYPE]);
|
|
|
connectionConfig.publisherId.numeric = UA_UInt32_random();
|
|
|
|
|
@@ -137,21 +128,22 @@ addDataSetField(UA_Server *server) {
|
|
|
* parameters for the message creation.
|
|
|
*/
|
|
|
static void
|
|
|
-addWriterGroup(UA_Server *server) {
|
|
|
+addWriterGroup(UA_Server *server, int interval) {
|
|
|
/* Now we create a new WriterGroupConfig and add the group to the existing PubSubConnection. */
|
|
|
UA_WriterGroupConfig writerGroupConfig;
|
|
|
memset(&writerGroupConfig, 0, sizeof(UA_WriterGroupConfig));
|
|
|
writerGroupConfig.name = UA_STRING("Demo WriterGroup");
|
|
|
- writerGroupConfig.publishingInterval = 500;
|
|
|
+ writerGroupConfig.publishingInterval = interval;
|
|
|
writerGroupConfig.enabled = UA_FALSE;
|
|
|
writerGroupConfig.writerGroupId = 100;
|
|
|
|
|
|
/* decide whether to use JSON or UADP encoding*/
|
|
|
-#if defined(USE_JSON_ENCODING) && defined(UA_ENABLE_JSON_ENCODING)
|
|
|
+#ifdef UA_ENABLE_JSON_ENCODING
|
|
|
+ if(useJson)
|
|
|
writerGroupConfig.encodingMimeType = UA_PUBSUB_ENCODING_JSON;
|
|
|
-#else
|
|
|
- writerGroupConfig.encodingMimeType = UA_PUBSUB_ENCODING_UADP;
|
|
|
+ else
|
|
|
#endif
|
|
|
+ writerGroupConfig.encodingMimeType = UA_PUBSUB_ENCODING_UADP;
|
|
|
|
|
|
/* configure the mqtt publish topic */
|
|
|
UA_BrokerWriterGroupTransportDataType brokerTransportSettings;
|
|
@@ -190,23 +182,24 @@ addDataSetWriter(UA_Server *server) {
|
|
|
dataSetWriterConfig.dataSetWriterId = 62541;
|
|
|
dataSetWriterConfig.keyFrameCount = 10;
|
|
|
|
|
|
-#if defined(USE_JSON_ENCODING) && defined(UA_ENABLE_JSON_ENCODING)
|
|
|
+#ifdef UA_ENABLE_JSON_ENCODING
|
|
|
+ UA_JsonDataSetWriterMessageDataType jsonDswMd;
|
|
|
+ UA_ExtensionObject messageSettings;
|
|
|
+ if(useJson) {
|
|
|
/* JSON config for the dataSetWriter */
|
|
|
- UA_JsonDataSetWriterMessageDataType jsonDswMd;
|
|
|
jsonDswMd.dataSetMessageContentMask = (UA_JsonDataSetMessageContentMask)
|
|
|
- (UA_JSONDATASETMESSAGECONTENTMASK_DATASETWRITERID
|
|
|
- | UA_JSONDATASETMESSAGECONTENTMASK_SEQUENCENUMBER
|
|
|
- | UA_JSONDATASETMESSAGECONTENTMASK_STATUS
|
|
|
- | UA_JSONDATASETMESSAGECONTENTMASK_METADATAVERSION
|
|
|
- | UA_JSONDATASETMESSAGECONTENTMASK_TIMESTAMP);
|
|
|
+ (UA_JSONDATASETMESSAGECONTENTMASK_DATASETWRITERID |
|
|
|
+ UA_JSONDATASETMESSAGECONTENTMASK_SEQUENCENUMBER |
|
|
|
+ UA_JSONDATASETMESSAGECONTENTMASK_STATUS |
|
|
|
+ UA_JSONDATASETMESSAGECONTENTMASK_METADATAVERSION |
|
|
|
+ UA_JSONDATASETMESSAGECONTENTMASK_TIMESTAMP);
|
|
|
|
|
|
- UA_ExtensionObject messageSettings;
|
|
|
messageSettings.encoding = UA_EXTENSIONOBJECT_DECODED;
|
|
|
messageSettings.content.decoded.type = &UA_TYPES[UA_TYPES_JSONDATASETWRITERMESSAGEDATATYPE];
|
|
|
messageSettings.content.decoded.data = &jsonDswMd;
|
|
|
|
|
|
-
|
|
|
dataSetWriterConfig.messageSettings = messageSettings;
|
|
|
+ }
|
|
|
#endif
|
|
|
|
|
|
UA_Server_addDataSetWriter(server, writerGroupIdent, publishedDataSetIdent,
|
|
@@ -251,12 +244,11 @@ static void callback(UA_ByteString *encodedBuffer, UA_ByteString *topic){
|
|
|
|
|
|
/* Adds a subscription */
|
|
|
static void
|
|
|
-addSubscription(UA_Server *server, UA_PubSubConnection *connection){
|
|
|
-
|
|
|
- /* transportSettings for subscription! */
|
|
|
+addSubscription(UA_Server *server, UA_PubSubConnection *connection, char *topic) {
|
|
|
+ /* transportSettings for subscription */
|
|
|
UA_BrokerWriterGroupTransportDataType brokerTransportSettings;
|
|
|
memset(&brokerTransportSettings, 0, sizeof(UA_BrokerWriterGroupTransportDataType));
|
|
|
- brokerTransportSettings.queueName = UA_STRING("customTopic");
|
|
|
+ brokerTransportSettings.queueName = UA_STRING(topic);
|
|
|
brokerTransportSettings.resourceUri = UA_STRING_NULL;
|
|
|
brokerTransportSettings.authenticationProfileUri = UA_STRING_NULL;
|
|
|
|
|
@@ -273,38 +265,109 @@ addSubscription(UA_Server *server, UA_PubSubConnection *connection){
|
|
|
return;
|
|
|
}
|
|
|
|
|
|
-int main(void) {
|
|
|
+static void usage(void) {
|
|
|
+ printf("Usage: tutorial_pubsub_mqtt [--url <opc.mqtt://hostname:port>] "
|
|
|
+ "[--topic <mqttTopic>] "
|
|
|
+ "[--freq <frequency in ms> "
|
|
|
+ "[--json]\n"
|
|
|
+ " Defaults are:\n"
|
|
|
+ " - Url: opc.mqtt://127.0.0.1:1883\n"
|
|
|
+ " - Topic: customTopic\n"
|
|
|
+ " - Frequency: 500\n"
|
|
|
+ " - JSON: Off\n");
|
|
|
+}
|
|
|
+
|
|
|
+int main(int argc, char **argv) {
|
|
|
signal(SIGINT, stopHandler);
|
|
|
signal(SIGTERM, stopHandler);
|
|
|
|
|
|
- UA_StatusCode retval = UA_STATUSCODE_GOOD;
|
|
|
- UA_Server *server = UA_Server_new();
|
|
|
- UA_ServerConfig *config = UA_Server_getConfig(server);
|
|
|
- UA_ServerConfig_setDefault(config);
|
|
|
- /* Details about the connection configuration and handling are located in the pubsub connection tutorial */
|
|
|
- config->pubsubTransportLayers = (UA_PubSubTransportLayer *) UA_malloc(1 * sizeof(UA_PubSubTransportLayer));
|
|
|
+ char *addressUrl = "opc.mqtt://127.0.0.1:1883";
|
|
|
+ char *topic = "customTopic";
|
|
|
+ int interval = 500;
|
|
|
+
|
|
|
+ /* Parse arguments */
|
|
|
+ for(int argpos = 1; argpos < argc; argpos++) {
|
|
|
+ if(strcmp(argv[argpos], "--help") == 0) {
|
|
|
+ usage();
|
|
|
+ return 0;
|
|
|
+ }
|
|
|
+
|
|
|
+ if(strcmp(argv[argpos], "--json") == 0) {
|
|
|
+ useJson = true;
|
|
|
+ continue;
|
|
|
+ }
|
|
|
+
|
|
|
+ if(strcmp(argv[argpos], "--url") == 0) {
|
|
|
+ if(argpos + 1 == argc) {
|
|
|
+ usage();
|
|
|
+ return -1;
|
|
|
+ }
|
|
|
+ argpos++;
|
|
|
+ addressUrl = argv[argpos];
|
|
|
+ continue;
|
|
|
+ }
|
|
|
+
|
|
|
+ if(strcmp(argv[argpos], "--topic") == 0) {
|
|
|
+ if(argpos + 1 == argc) {
|
|
|
+ usage();
|
|
|
+ return -1;
|
|
|
+ }
|
|
|
+ argpos++;
|
|
|
+ topic = argv[argpos];
|
|
|
+ continue;
|
|
|
+ }
|
|
|
+
|
|
|
+ if(strcmp(argv[argpos], "--freq") == 0) {
|
|
|
+ if(argpos + 1 == argc) {
|
|
|
+ usage();
|
|
|
+ return -1;
|
|
|
+ }
|
|
|
+ argpos++;
|
|
|
+ topic = argv[argpos];
|
|
|
+ if(sscanf(argv[argpos], "%d", &interval) != 1) {
|
|
|
+ usage();
|
|
|
+ return -1;
|
|
|
+ }
|
|
|
+ if(interval <= 10) {
|
|
|
+ UA_LOG_WARNING(UA_Log_Stdout, UA_LOGCATEGORY_USERLAND,
|
|
|
+ "Publication interval too small");
|
|
|
+ return -1;
|
|
|
+ }
|
|
|
+ continue;
|
|
|
+ }
|
|
|
+
|
|
|
+ usage();
|
|
|
+ return -1;
|
|
|
+ }
|
|
|
+
|
|
|
+ /* Set up the server config */
|
|
|
+ UA_Server *server = UA_Server_new();
|
|
|
+ UA_ServerConfig *config = UA_Server_getConfig(server);
|
|
|
+ /* Details about the connection configuration and handling are located in the pubsub connection tutorial */
|
|
|
+ UA_ServerConfig_setDefault(config);
|
|
|
+ config->pubsubTransportLayers = (UA_PubSubTransportLayer *) UA_malloc(1 * sizeof(UA_PubSubTransportLayer));
|
|
|
if(!config->pubsubTransportLayers) {
|
|
|
return -1;
|
|
|
}
|
|
|
-
|
|
|
config->pubsubTransportLayers[0] = UA_PubSubTransportLayerMQTT();
|
|
|
config->pubsubTransportLayersSize++;
|
|
|
+
|
|
|
addVariable(server);
|
|
|
- addPubSubConnection(server);
|
|
|
+ addPubSubConnection(server, addressUrl);
|
|
|
addPublishedDataSet(server);
|
|
|
addDataSetField(server);
|
|
|
- addWriterGroup(server);
|
|
|
+ addWriterGroup(server, interval);
|
|
|
addDataSetWriter(server);
|
|
|
-
|
|
|
- UA_PubSubConnection *connection =
|
|
|
- UA_PubSubConnection_findConnectionbyId(server, connectionIdent);
|
|
|
-
|
|
|
- if(connection != NULL) {
|
|
|
- addSubscription(server, connection);
|
|
|
+ UA_PubSubConnection *connection = UA_PubSubConnection_findConnectionbyId(server, connectionIdent);
|
|
|
+ if(!connection) {
|
|
|
+ UA_LOG_WARNING(UA_Log_Stdout, UA_LOGCATEGORY_USERLAND,
|
|
|
+ "Could not create a PubSubConnection");
|
|
|
+ UA_Server_delete(server);
|
|
|
+ return -1;
|
|
|
}
|
|
|
-
|
|
|
- retval |= UA_Server_run(server, &running);
|
|
|
+ addSubscription(server, connection, topic);
|
|
|
+ UA_Server_run(server, &running);
|
|
|
UA_Server_delete(server);
|
|
|
- return (int)retval;
|
|
|
+ return 0;
|
|
|
}
|
|
|
|