/* This Source Code Form is subject to the terms of the Mozilla Public * License, v. 2.0. If a copy of the MPL was not distributed with this * file, You can obtain one at http://mozilla.org/MPL/2.0/. * * Copyright (c) 2018 Fraunhofer IOSB (Author: Lukas Meling) */ #include "ua_mqtt_adapter.h" #include "../../deps/mqtt-c/mqtt.h" #include "open62541/plugin/log_stdout.h" #include "open62541/util.h" /* forward decl for callback */ void publish_callback(void**, struct mqtt_response_publish*); UA_StatusCode connectMqtt(UA_PubSubChannelDataMQTT* channelData){ if(channelData == NULL){ return UA_STATUSCODE_BADINVALIDARGUMENT; } /* Get address and replace mqtt with tcp * because we use a tcp UA_ClientConnectionTCP for mqtt */ UA_NetworkAddressUrlDataType address = channelData->address; UA_String hostname, path; UA_UInt16 networkPort; if(UA_parseEndpointUrl(&address.url, &hostname, &networkPort, &path) != UA_STATUSCODE_GOOD){ UA_LOG_ERROR(UA_Log_Stdout, UA_LOGCATEGORY_SERVER, "MQTT PubSub Connection creation failed. Invalid URL."); return UA_STATUSCODE_BADINVALIDARGUMENT; } /* Build the url, replace mqtt with tcp */ UA_STACKARRAY(UA_Byte, addressAsChar, 10 + (sizeof(char) * path.length)); memcpy((char*)addressAsChar, "opc.tcp://", 10); memcpy((char*)&addressAsChar[10],(char*)path.data, path.length); address.url.data = addressAsChar; address.url.length = 10 + (sizeof(char) * path.length); /* check if buffers are correct */ if(!(channelData->mqttRecvBufferSize > 0 && channelData->mqttRecvBuffer != NULL && channelData->mqttSendBufferSize > 0 && channelData->mqttSendBuffer != NULL)){ UA_LOG_ERROR(UA_Log_Stdout, UA_LOGCATEGORY_SERVER, "MQTT PubSub Connection creation failed. No Mqtt buffer allocated."); return UA_STATUSCODE_BADARGUMENTSMISSING; } /* Config with default parameters */ UA_ConnectionConfig conf; memset(&conf, 0, sizeof(UA_ConnectionConfig)); conf.protocolVersion = 0; conf.sendBufferSize = 1000; conf.recvBufferSize = 2000; conf.maxMessageSize = 1000; conf.maxChunkCount = 1; /* Create TCP connection: open the blocking TCP socket (connecting to the broker) */ UA_Connection connection = UA_ClientConnectionTCP( conf, address.url, 1000, NULL); if(connection.state != UA_CONNECTION_ESTABLISHED && connection.state != UA_CONNECTION_OPENING){ UA_LOG_ERROR(UA_Log_Stdout, UA_LOGCATEGORY_NETWORK, "PubSub MQTT: Connection creation failed. Tcp connection failed!"); return UA_STATUSCODE_BADCOMMUNICATIONERROR; } /* Set socket to nonblocking!*/ UA_socket_set_nonblocking(connection.sockfd); /* save connection */ channelData->connection = (UA_Connection*)UA_calloc(1, sizeof(UA_Connection)); if(!channelData->connection){ UA_LOG_ERROR(UA_Log_Stdout, UA_LOGCATEGORY_SERVER, "PubSub MQTT: Connection creation failed. Out of memory."); return UA_STATUSCODE_BADOUTOFMEMORY; } memcpy(channelData->connection, &connection, sizeof(UA_Connection)); /* calloc mqtt_client */ struct mqtt_client* client = (struct mqtt_client*)UA_calloc(1, sizeof(struct mqtt_client)); if(!client){ UA_free(channelData->connection); UA_LOG_ERROR(UA_Log_Stdout, UA_LOGCATEGORY_SERVER, "PubSub MQTT: Connection creation failed. Out of memory."); return UA_STATUSCODE_BADOUTOFMEMORY; } /* save reference */ channelData->mqttClient = client; /* create custom sockethandle */ struct my_custom_socket_handle* handle = (struct my_custom_socket_handle*)UA_calloc(1, sizeof(struct my_custom_socket_handle)); if(!handle){ UA_free(channelData->connection); UA_free(client); UA_LOG_ERROR(UA_Log_Stdout, UA_LOGCATEGORY_SERVER, "PubSub MQTT: Connection creation failed. Out of memory."); return UA_STATUSCODE_BADOUTOFMEMORY; } handle->client = client; handle->connection = channelData->connection; /* init mqtt client struct with buffers and callback */ enum MQTTErrors mqttErr = mqtt_init(client, handle, channelData->mqttSendBuffer, channelData->mqttSendBufferSize, channelData->mqttRecvBuffer, channelData->mqttRecvBufferSize, publish_callback); if(mqttErr != MQTT_OK){ UA_free(channelData->connection); UA_free(client); const char* errorStr = mqtt_error_str(mqttErr); UA_LOG_ERROR(UA_Log_Stdout, UA_LOGCATEGORY_SERVER, "PubSub MQTT: Connection creation failed. MQTT error: %s", errorStr); return UA_STATUSCODE_BADCOMMUNICATIONERROR; } /* Init custom data for subscribe callback function: * A reference to the channeldata will be available in the callback. * This is used to call the user callback channelData.callback */ client->publish_response_callback_state = channelData; /* Convert clientId UA_String to char* null terminated */ char* clientId = (char*)calloc(1,channelData->mqttClientId->length + 1); if(!clientId){ UA_LOG_ERROR(UA_Log_Stdout, UA_LOGCATEGORY_SERVER, "PubSub MQTT: Connection creation failed. Out of memory."); UA_free(channelData->connection); UA_free(client); return UA_STATUSCODE_BADOUTOFMEMORY; } memcpy(clientId, channelData->mqttClientId->data, channelData->mqttClientId->length); /* Connect mqtt with socket fd of networktcp */ mqttErr = mqtt_connect(client, clientId, NULL, NULL, 0, NULL, NULL, 0, 400); UA_free(clientId); if(mqttErr != MQTT_OK){ UA_free(channelData->connection); UA_free(client); const char* errorStr = mqtt_error_str(mqttErr); UA_LOG_ERROR(UA_Log_Stdout, UA_LOGCATEGORY_SERVER, "%s", errorStr); return UA_STATUSCODE_BADCOMMUNICATIONERROR; } /* sync the first mqtt packets in the buffer to send connection request. After that yield must be called frequently to exchange mqtt messages. */ UA_StatusCode ret = yieldMqtt(channelData, 100); if(ret != UA_STATUSCODE_GOOD){ UA_free(channelData->connection); UA_free(client); return ret; } return UA_STATUSCODE_GOOD; } UA_StatusCode disconnectMqtt(UA_PubSubChannelDataMQTT* channelData){ if(channelData == NULL){ return UA_STATUSCODE_BADINVALIDARGUMENT; } channelData->callback = NULL; struct mqtt_client* client = (struct mqtt_client*)channelData->mqttClient; if(client){ mqtt_disconnect(client); yieldMqtt(channelData, 10); UA_free(client->socketfd); } if(channelData->connection != NULL){ channelData->connection->close(channelData->connection); channelData->connection->free(channelData->connection); UA_free(channelData->connection); channelData->connection = NULL; } UA_free(channelData->mqttRecvBuffer); channelData->mqttRecvBuffer = NULL; UA_free(channelData->mqttSendBuffer); channelData->mqttSendBuffer = NULL; UA_free(channelData->mqttClient); channelData->mqttClient = NULL; return UA_STATUSCODE_GOOD; } void publish_callback(void** channelDataPtr, struct mqtt_response_publish *published) { if(channelDataPtr != NULL){ UA_PubSubChannelDataMQTT *channelData = (UA_PubSubChannelDataMQTT*)*channelDataPtr; if(channelData != NULL){ if(channelData->callback != NULL){ //Setup topic UA_ByteString *topic = UA_ByteString_new(); if(!topic) return; UA_ByteString *msg = UA_ByteString_new(); if(!msg) return; /* memory for topic */ UA_StatusCode ret = UA_ByteString_allocBuffer(topic, published->topic_name_size); if(ret){ UA_free(topic); UA_free(msg); return; } /* memory for message */ ret = UA_ByteString_allocBuffer(msg, published->application_message_size); if(ret){ UA_ByteString_delete(topic); UA_free(msg); return; } /* copy topic and msg, call the cb */ memcpy(topic->data, published->topic_name, published->topic_name_size); memcpy(msg->data, published->application_message, published->application_message_size); channelData->callback(msg, topic); } } } } UA_StatusCode subscribeMqtt(UA_PubSubChannelDataMQTT* channelData, UA_String topic, UA_Byte qos){ if(channelData == NULL || topic.length == 0){ return UA_STATUSCODE_BADINVALIDARGUMENT; } struct mqtt_client* client = (struct mqtt_client*)channelData->mqttClient; UA_STACKARRAY(char, topicStr, sizeof(char) * topic.length +1); memcpy(topicStr, topic.data, topic.length); topicStr[topic.length] = '\0'; enum MQTTErrors mqttErr = mqtt_subscribe(client, topicStr, (UA_Byte) qos); if(mqttErr != MQTT_OK){ const char* errorStr = mqtt_error_str(mqttErr); UA_LOG_ERROR(UA_Log_Stdout, UA_LOGCATEGORY_SERVER, "PubSub MQTT: subscribe: %s", errorStr); return UA_STATUSCODE_BADCOMMUNICATIONERROR; } return UA_STATUSCODE_GOOD; } UA_StatusCode unSubscribeMqtt(UA_PubSubChannelDataMQTT* channelData, UA_String topic){ return UA_STATUSCODE_BADNOTIMPLEMENTED; } UA_StatusCode yieldMqtt(UA_PubSubChannelDataMQTT* channelData, UA_UInt16 timeout){ if(channelData == NULL || timeout == 0){ return UA_STATUSCODE_BADINVALIDARGUMENT; } UA_Connection *connection = channelData->connection; if(connection == NULL){ return UA_STATUSCODE_BADCOMMUNICATIONERROR; } if(connection->state != UA_CONNECTION_ESTABLISHED && connection->state != UA_CONNECTION_OPENING){ UA_LOG_ERROR(UA_Log_Stdout, UA_LOGCATEGORY_NETWORK, "PubSub MQTT: yield: Tcp Connection not established!"); return UA_STATUSCODE_BADCOMMUNICATIONERROR; } struct mqtt_client* client = (struct mqtt_client*)channelData->mqttClient; client->socketfd->timeout = timeout; enum MQTTErrors error = mqtt_sync(client); if(error == MQTT_OK){ return UA_STATUSCODE_GOOD; }else if(error == -1){ UA_LOG_ERROR(UA_Log_Stdout, UA_LOGCATEGORY_NETWORK, "PubSub MQTT: yield: Communication Error."); return UA_STATUSCODE_BADCOMMUNICATIONERROR; } /* map mqtt errors to ua errors */ const char* errorStr = mqtt_error_str(error); UA_LOG_ERROR(UA_Log_Stdout, UA_LOGCATEGORY_SERVER, "PubSub MQTT: yield: error: %s", errorStr); switch(error){ case MQTT_ERROR_CONNECTION_CLOSED: return UA_STATUSCODE_BADNOTCONNECTED; case MQTT_ERROR_SOCKET_ERROR: return UA_STATUSCODE_BADCOMMUNICATIONERROR; case MQTT_ERROR_CONNECTION_REFUSED: return UA_STATUSCODE_BADCONNECTIONREJECTED; default: return UA_STATUSCODE_BADCOMMUNICATIONERROR; } } UA_StatusCode publishMqtt(UA_PubSubChannelDataMQTT* channelData, UA_String topic, const UA_ByteString *buf, UA_Byte qos){ if(channelData == NULL || buf == NULL ){ return UA_STATUSCODE_BADINVALIDARGUMENT; } UA_STACKARRAY(char, topicChar, sizeof(char) * topic.length +1); memcpy(topicChar, topic.data, topic.length); topicChar[topic.length] = '\0'; struct mqtt_client* client = (struct mqtt_client*)channelData->mqttClient; if(client == NULL) return UA_STATUSCODE_BADNOTCONNECTED; /* publish */ enum MQTTPublishFlags flags; if(qos == 0){ flags = MQTT_PUBLISH_QOS_0; }else if( qos == 1){ flags = MQTT_PUBLISH_QOS_1; }else if( qos == 2){ flags = MQTT_PUBLISH_QOS_2; }else{ UA_LOG_ERROR(UA_Log_Stdout, UA_LOGCATEGORY_NETWORK, "PubSub MQTT: publish: Bad Qos Level."); return UA_STATUSCODE_BADINVALIDARGUMENT; } mqtt_publish(client, topicChar, buf->data, buf->length, (uint8_t)flags); if (client->error != MQTT_OK) { if(client->error == MQTT_ERROR_SEND_BUFFER_IS_FULL){ UA_LOG_ERROR(UA_Log_Stdout, UA_LOGCATEGORY_SERVER, "PubSub MQTT: publish: Send buffer is full. " "Possible reasons: send buffer is to small, " "sending to fast, broker not responding."); }else{ UA_LOG_ERROR(UA_Log_Stdout, UA_LOGCATEGORY_SERVER, "PubSub MQTT: publish: %s", mqtt_error_str(client->error)); } return UA_STATUSCODE_BADCONNECTIONREJECTED; } return UA_STATUSCODE_GOOD; }