/* This Source Code Form is subject to the terms of the Mozilla Public * License, v. 2.0. If a copy of the MPL was not distributed with this * file, You can obtain one at http://mozilla.org/MPL/2.0/. * * Copyright (c) 2018 Fraunhofer IOSB (Author: Lukas Meling) */ /** * This uses the mqtt/ua_mqtt_adapter.h to call mqtt functions. * Currently only ua_mqtt_adapter.c implements this * interface and maps them to the specific "MQTT-C" library functions. * Another mqtt lib could be used. * "ua_mqtt_pal.c" forwards the network calls (send/recv) to UA_Connection (TCP). */ #include "mqtt/ua_mqtt_adapter.h" #include "open62541/plugin/log_stdout.h" static UA_StatusCode UA_uaQos_toMqttQos(UA_BrokerTransportQualityOfService uaQos, UA_Byte *qos){ switch (uaQos){ case UA_BROKERTRANSPORTQUALITYOFSERVICE_BESTEFFORT: *qos = 0; break; case UA_BROKERTRANSPORTQUALITYOFSERVICE_ATLEASTONCE: *qos = 1; break; case UA_BROKERTRANSPORTQUALITYOFSERVICE_ATMOSTONCE: *qos = 2; break; default: break; } return UA_STATUSCODE_GOOD; } /** * Open mqtt connection based on the connectionConfig. * * * @return ref to created channel, NULL on error */ static UA_PubSubChannel * UA_PubSubChannelMQTT_open(const UA_PubSubConnectionConfig *connectionConfig) { UA_NetworkAddressUrlDataType address; memset(&address, 0, sizeof(UA_NetworkAddressUrlDataType)); if(UA_Variant_hasScalarType(&connectionConfig->address, &UA_TYPES[UA_TYPES_NETWORKADDRESSURLDATATYPE])){ address = *(UA_NetworkAddressUrlDataType *)connectionConfig->address.data; } else { UA_LOG_ERROR(UA_Log_Stdout, UA_LOGCATEGORY_SERVER, "PubSub MQTT Connection creation failed. Invalid Address."); return NULL; } /* allocate and init memory for the Mqtt specific internal data */ UA_PubSubChannelDataMQTT * channelDataMQTT = (UA_PubSubChannelDataMQTT *) UA_calloc(1, (sizeof(UA_PubSubChannelDataMQTT))); if(!channelDataMQTT){ UA_LOG_ERROR(UA_Log_Stdout, UA_LOGCATEGORY_SERVER, "PubSub MQTT Connection creation failed. Out of memory."); return NULL; } /* set default values */ UA_String mqttClientId = UA_STRING("open62541_pub"); memcpy(channelDataMQTT, &(UA_PubSubChannelDataMQTT){address, 2000,2000, NULL, NULL,&mqttClientId, NULL, NULL, NULL}, sizeof(UA_PubSubChannelDataMQTT)); /* iterate over the given KeyValuePair paramters */ UA_String sendBuffer = UA_STRING("sendBufferSize"), recvBuffer = UA_STRING("recvBufferSize"), clientId = UA_STRING("mqttClientId"); for(size_t i = 0; i < connectionConfig->connectionPropertiesSize; i++){ if(UA_String_equal(&connectionConfig->connectionProperties[i].key.name, &sendBuffer)){ if(UA_Variant_hasScalarType(&connectionConfig->connectionProperties[i].value, &UA_TYPES[UA_TYPES_UINT32])){ channelDataMQTT->mqttSendBufferSize = *(UA_UInt32 *) connectionConfig->connectionProperties[i].value.data; } } else if(UA_String_equal(&connectionConfig->connectionProperties[i].key.name, &recvBuffer)){ if(UA_Variant_hasScalarType(&connectionConfig->connectionProperties[i].value, &UA_TYPES[UA_TYPES_UINT32])){ channelDataMQTT->mqttRecvBufferSize = *(UA_UInt32 *) connectionConfig->connectionProperties[i].value.data; } } else if(UA_String_equal(&connectionConfig->connectionProperties[i].key.name, &clientId)){ if(UA_Variant_hasScalarType(&connectionConfig->connectionProperties[i].value, &UA_TYPES[UA_TYPES_STRING])){ channelDataMQTT->mqttClientId = (UA_String *) connectionConfig->connectionProperties[i].value.data; } } else { UA_LOG_WARNING(UA_Log_Stdout, UA_LOGCATEGORY_SERVER, "PubSub MQTT Connection creation. Unknown connection parameter."); } } /* Create a new channel */ UA_PubSubChannel *newChannel = (UA_PubSubChannel *) UA_calloc(1, sizeof(UA_PubSubChannel)); if(!newChannel){ UA_LOG_ERROR(UA_Log_Stdout, UA_LOGCATEGORY_SERVER, "PubSub MQTT Connection creation failed. Out of memory."); UA_free(channelDataMQTT); return NULL; } /* Allocate memory for mqtt receive buffer */ if(channelDataMQTT->mqttRecvBufferSize > 0){ channelDataMQTT->mqttRecvBuffer = (uint8_t*)UA_calloc(channelDataMQTT->mqttRecvBufferSize, sizeof(uint8_t)); if(channelDataMQTT->mqttRecvBuffer == NULL){ UA_free(channelDataMQTT); UA_free(newChannel); return NULL; } } /* Allocate memory for mqtt send buffer */ if(channelDataMQTT->mqttSendBufferSize > 0){ channelDataMQTT->mqttSendBuffer = (uint8_t*)UA_calloc(channelDataMQTT->mqttSendBufferSize, sizeof(uint8_t)); if(channelDataMQTT->mqttSendBuffer == NULL){ if(channelDataMQTT->mqttRecvBufferSize > 0){ UA_free(channelDataMQTT->mqttRecvBuffer); } UA_free(channelDataMQTT); UA_free(newChannel); return NULL; } } /*link channel and internal channel data*/ newChannel->handle = channelDataMQTT; /* MQTT Client connect call. */ UA_StatusCode ret = connectMqtt(channelDataMQTT); if(ret != UA_STATUSCODE_GOOD){ /* try to disconnect tcp */ disconnectMqtt(channelDataMQTT); UA_free(channelDataMQTT->mqttSendBuffer); UA_free(channelDataMQTT->mqttRecvBuffer); UA_free(channelDataMQTT); UA_free(newChannel); return NULL; } newChannel->state = UA_PUBSUB_CHANNEL_RDY; UA_LOG_INFO(UA_Log_Stdout, UA_LOGCATEGORY_SERVER, "PubSub MQTT Connection established."); return newChannel; } /** * Subscribe to topic specified in brokerTransportSettings->queueName. * * @return UA_STATUSCODE_GOOD on success */ static UA_StatusCode UA_PubSubChannelMQTT_regist(UA_PubSubChannel *channel, UA_ExtensionObject *transportSettings, void (*callback)(UA_ByteString *encodedBuffer, UA_ByteString *topic)) { if(channel->state != UA_PUBSUB_CHANNEL_RDY){ UA_LOG_WARNING(UA_Log_Stdout, UA_LOGCATEGORY_SERVER, "PubSub MQTT regist failed. Channel closed."); return UA_STATUSCODE_BADCONNECTIONCLOSED; } UA_PubSubChannelDataMQTT *channelDataMQTT = (UA_PubSubChannelDataMQTT *) channel->handle; channelDataMQTT->callback = callback; if(transportSettings != NULL && transportSettings->encoding == UA_EXTENSIONOBJECT_DECODED && transportSettings->content.decoded.type->typeIndex == UA_TYPES_BROKERWRITERGROUPTRANSPORTDATATYPE){ UA_BrokerWriterGroupTransportDataType *brokerTransportSettings = (UA_BrokerWriterGroupTransportDataType*)transportSettings->content.decoded.data; UA_Byte qos = 0; UA_uaQos_toMqttQos(brokerTransportSettings->requestedDeliveryGuarantee, &qos); UA_LOG_INFO(UA_Log_Stdout, UA_LOGCATEGORY_SERVER, "PubSub MQTT: register"); return subscribeMqtt(channelDataMQTT, brokerTransportSettings->queueName, qos); }else{ return UA_STATUSCODE_BADARGUMENTSMISSING; } } /** * Remove subscription specified in brokerTransportSettings->queueName. * * @return UA_STATUSCODE_GOOD on success */ static UA_StatusCode UA_PubSubChannelMQTT_unregist(UA_PubSubChannel *channel, UA_ExtensionObject *transportSettings) { if(channel->state != UA_PUBSUB_CHANNEL_RDY){ UA_LOG_WARNING(UA_Log_Stdout, UA_LOGCATEGORY_SERVER, "PubSub MQTT: unregister failed. Channel closed."); return UA_STATUSCODE_BADCONNECTIONCLOSED; } UA_PubSubChannelDataMQTT * channelDataMQTT = (UA_PubSubChannelDataMQTT *) channel->handle; UA_LOG_INFO(UA_Log_Stdout, UA_LOGCATEGORY_SERVER, "PubSub MQTT: unregister"); if(transportSettings != NULL && transportSettings->encoding == UA_EXTENSIONOBJECT_DECODED && transportSettings->content.decoded.type->typeIndex == UA_TYPES_BROKERWRITERGROUPTRANSPORTDATATYPE){ UA_BrokerWriterGroupTransportDataType *brokerTransportSettings = (UA_BrokerWriterGroupTransportDataType*)transportSettings->content.decoded.data; UA_LOG_INFO(UA_Log_Stdout, UA_LOGCATEGORY_SERVER, "PubSub MQTT: unregister"); return unSubscribeMqtt(channelDataMQTT, brokerTransportSettings->queueName); }else{ return UA_STATUSCODE_BADARGUMENTSMISSING; } } /** * Send a message. * * @return UA_STATUSCODE_GOOD if success */ static UA_StatusCode UA_PubSubChannelMQTT_send(UA_PubSubChannel *channel, UA_ExtensionObject *transportSettings, const UA_ByteString *buf) { if(channel->state != UA_PUBSUB_CHANNEL_RDY){ UA_LOG_WARNING(UA_Log_Stdout, UA_LOGCATEGORY_SERVER, "PubSub MQTT: sending failed. Invalid state."); return UA_STATUSCODE_BADCONNECTIONCLOSED; } UA_PubSubChannelDataMQTT *channelDataMQTT = (UA_PubSubChannelDataMQTT *) channel->handle; UA_StatusCode ret = UA_STATUSCODE_GOOD; UA_Byte qos = 0; if(transportSettings != NULL && transportSettings->encoding == UA_EXTENSIONOBJECT_DECODED && transportSettings->content.decoded.type->typeIndex == UA_TYPES_BROKERWRITERGROUPTRANSPORTDATATYPE){ UA_BrokerWriterGroupTransportDataType *brokerTransportSettings = (UA_BrokerWriterGroupTransportDataType*)transportSettings->content.decoded.data; UA_uaQos_toMqttQos(brokerTransportSettings->requestedDeliveryGuarantee, &qos); UA_String topic; topic = brokerTransportSettings->queueName; ret = publishMqtt(channelDataMQTT, topic, buf, qos); if(ret){ channel->state = UA_PUBSUB_CHANNEL_ERROR; UA_LOG_ERROR(UA_Log_Stdout, UA_LOGCATEGORY_SERVER, "PubSub MQTT: Publish failed"); }else{ UA_LOG_INFO(UA_Log_Stdout, UA_LOGCATEGORY_SERVER, "PubSub MQTT: Publish"); } }else{ UA_LOG_INFO(UA_Log_Stdout, UA_LOGCATEGORY_SERVER, "PubSub MQTT: Transport settings not found."); } return ret; } /** * Close channel and free the channel data. * * @return UA_STATUSCODE_GOOD if success */ static UA_StatusCode UA_PubSubChannelMQTT_close(UA_PubSubChannel *channel) { /* already closed */ if(channel->state == UA_PUBSUB_CHANNEL_CLOSED) return UA_STATUSCODE_GOOD; UA_PubSubChannelDataMQTT *channelDataMQTT = (UA_PubSubChannelDataMQTT *) channel->handle; UA_LOG_INFO(UA_Log_Stdout, UA_LOGCATEGORY_SERVER, "PubSub MQTT: Closing PubSubChannel."); disconnectMqtt(channelDataMQTT); UA_free(channelDataMQTT); UA_free(channel); return UA_STATUSCODE_GOOD; } /** * Calls the send and receive functions of the mqtt network stack. * * @return UA_STATUSCODE_GOOD if success */ static UA_StatusCode UA_PubSubChannelMQTT_yield(UA_PubSubChannel *channel, UA_UInt16 timeout){ UA_StatusCode ret = UA_STATUSCODE_BADINVALIDARGUMENT; if(channel == NULL){ return ret; } if(channel->state == UA_PUBSUB_CHANNEL_ERROR){ return UA_STATUSCODE_BADINTERNALERROR; } UA_PubSubChannelDataMQTT *channelDataMQTT = (UA_PubSubChannelDataMQTT *) channel->handle; ret = yieldMqtt(channelDataMQTT, timeout); if(ret != UA_STATUSCODE_GOOD){ channel->state = UA_PUBSUB_CHANNEL_ERROR; return ret; } return ret; } /** * Generate a new MQTT channel. Based on the given configuration. Uses yield and no recv call. * * @param connectionConfig connection configuration * @return ref to created channel, NULL on error */ static UA_PubSubChannel * TransportLayerMQTT_addChannel(UA_PubSubConnectionConfig *connectionConfig) { UA_LOG_INFO(UA_Log_Stdout, UA_LOGCATEGORY_USERLAND, "PubSub MQTT: channel requested."); UA_PubSubChannel * pubSubChannel = UA_PubSubChannelMQTT_open(connectionConfig); if(pubSubChannel){ pubSubChannel->regist = UA_PubSubChannelMQTT_regist; pubSubChannel->unregist = UA_PubSubChannelMQTT_unregist; pubSubChannel->send = UA_PubSubChannelMQTT_send; pubSubChannel->close = UA_PubSubChannelMQTT_close; pubSubChannel->yield = UA_PubSubChannelMQTT_yield; pubSubChannel->connectionConfig = connectionConfig; } return pubSubChannel; } //MQTT channel factory UA_PubSubTransportLayer UA_PubSubTransportLayerMQTT(){ UA_PubSubTransportLayer pubSubTransportLayer; pubSubTransportLayer.transportProfileUri = UA_STRING("http://opcfoundation.org/UA-Profile/Transport/pubsub-mqtt"); pubSubTransportLayer.createPubSubChannel = &TransportLayerMQTT_addChannel; return pubSubTransportLayer; } #undef _POSIX_C_SOURCE