|
- #include "ua_mqtt_adapter.h"
- #include "../../deps/mqtt-c/mqtt.h"
- #include "open62541/plugin/log_stdout.h"
- #include "open62541/util.h"
/* Forward declaration of the handler for incoming PUBLISH messages. It is
 * passed to mqtt_init() in connectMqtt() and defined further down. */
void publish_callback(void **channelDataPtr, struct mqtt_response_publish *published);
- UA_StatusCode
- connectMqtt(UA_PubSubChannelDataMQTT* channelData){
- if(channelData == NULL){
- return UA_STATUSCODE_BADINVALIDARGUMENT;
- }
-
- UA_NetworkAddressUrlDataType address = channelData->address;
- UA_String hostname, path;
- UA_UInt16 networkPort;
- if(UA_parseEndpointUrl(&address.url, &hostname, &networkPort, &path) != UA_STATUSCODE_GOOD){
- UA_LOG_ERROR(UA_Log_Stdout, UA_LOGCATEGORY_SERVER,
- "MQTT PubSub Connection creation failed. Invalid URL.");
- return UA_STATUSCODE_BADINVALIDARGUMENT;
- }
-
- UA_STACKARRAY(UA_Byte, addressAsChar, 10 + (sizeof(char) * path.length));
- memcpy((char*)addressAsChar, "opc.tcp://", 10);
- memcpy((char*)&addressAsChar[10],(char*)path.data, path.length);
- address.url.data = addressAsChar;
- address.url.length = 10 + (sizeof(char) * path.length);
-
- if(!(channelData->mqttRecvBufferSize > 0 && channelData->mqttRecvBuffer != NULL
- && channelData->mqttSendBufferSize > 0 && channelData->mqttSendBuffer != NULL)){
- UA_LOG_ERROR(UA_Log_Stdout, UA_LOGCATEGORY_SERVER, "MQTT PubSub Connection creation failed. No Mqtt buffer allocated.");
- return UA_STATUSCODE_BADARGUMENTSMISSING;
- }
-
- UA_ConnectionConfig conf;
- memset(&conf, 0, sizeof(UA_ConnectionConfig));
- conf.protocolVersion = 0;
- conf.sendBufferSize = 1000;
- conf.recvBufferSize = 2000;
- conf.maxMessageSize = 1000;
- conf.maxChunkCount = 1;
-
- UA_Connection connection = UA_ClientConnectionTCP( conf, address.url, 1000, NULL);
- if(connection.state != UA_CONNECTION_ESTABLISHED && connection.state != UA_CONNECTION_OPENING){
- UA_LOG_ERROR(UA_Log_Stdout, UA_LOGCATEGORY_NETWORK, "PubSub MQTT: Connection creation failed. Tcp connection failed!");
- return UA_STATUSCODE_BADCOMMUNICATIONERROR;
- }
-
- UA_socket_set_nonblocking(connection.sockfd);
-
- channelData->connection = (UA_Connection*)UA_calloc(1, sizeof(UA_Connection));
- if(!channelData->connection){
- UA_LOG_ERROR(UA_Log_Stdout, UA_LOGCATEGORY_SERVER, "PubSub MQTT: Connection creation failed. Out of memory.");
- return UA_STATUSCODE_BADOUTOFMEMORY;
- }
- memcpy(channelData->connection, &connection, sizeof(UA_Connection));
-
- struct mqtt_client* client = (struct mqtt_client*)UA_calloc(1, sizeof(struct mqtt_client));
- if(!client){
- UA_free(channelData->connection);
- UA_LOG_ERROR(UA_Log_Stdout, UA_LOGCATEGORY_SERVER, "PubSub MQTT: Connection creation failed. Out of memory.");
- return UA_STATUSCODE_BADOUTOFMEMORY;
- }
-
- channelData->mqttClient = client;
-
- struct my_custom_socket_handle* handle =
- (struct my_custom_socket_handle*)UA_calloc(1, sizeof(struct my_custom_socket_handle));
- if(!handle){
- UA_free(channelData->connection);
- UA_free(client);
- UA_LOG_ERROR(UA_Log_Stdout, UA_LOGCATEGORY_SERVER, "PubSub MQTT: Connection creation failed. Out of memory.");
- return UA_STATUSCODE_BADOUTOFMEMORY;
- }
- handle->client = client;
- handle->connection = channelData->connection;
-
- enum MQTTErrors mqttErr = mqtt_init(client, handle, channelData->mqttSendBuffer, channelData->mqttSendBufferSize,
- channelData->mqttRecvBuffer, channelData->mqttRecvBufferSize, publish_callback);
- if(mqttErr != MQTT_OK){
- UA_free(channelData->connection);
- UA_free(client);
- const char* errorStr = mqtt_error_str(mqttErr);
- UA_LOG_ERROR(UA_Log_Stdout, UA_LOGCATEGORY_SERVER, "PubSub MQTT: Connection creation failed. MQTT error: %s", errorStr);
- return UA_STATUSCODE_BADCOMMUNICATIONERROR;
- }
-
- client->publish_response_callback_state = channelData;
-
- char* clientId = (char*)calloc(1,channelData->mqttClientId->length + 1);
- if(!clientId){
- UA_LOG_ERROR(UA_Log_Stdout, UA_LOGCATEGORY_SERVER, "PubSub MQTT: Connection creation failed. Out of memory.");
- UA_free(channelData->connection);
- UA_free(client);
- return UA_STATUSCODE_BADOUTOFMEMORY;
- }
- memcpy(clientId, channelData->mqttClientId->data, channelData->mqttClientId->length);
-
- mqttErr = mqtt_connect(client, clientId, NULL, NULL, 0, NULL, NULL, 0, 400);
- UA_free(clientId);
- if(mqttErr != MQTT_OK){
- UA_free(channelData->connection);
- UA_free(client);
- const char* errorStr = mqtt_error_str(mqttErr);
- UA_LOG_ERROR(UA_Log_Stdout, UA_LOGCATEGORY_SERVER, "%s", errorStr);
- return UA_STATUSCODE_BADCOMMUNICATIONERROR;
- }
-
- UA_StatusCode ret = yieldMqtt(channelData, 100);
- if(ret != UA_STATUSCODE_GOOD){
- UA_free(channelData->connection);
- UA_free(client);
- return ret;
- }
- return UA_STATUSCODE_GOOD;
- }
- UA_StatusCode
- disconnectMqtt(UA_PubSubChannelDataMQTT* channelData){
- if(channelData == NULL){
- return UA_STATUSCODE_BADINVALIDARGUMENT;
- }
- channelData->callback = NULL;
- struct mqtt_client* client = (struct mqtt_client*)channelData->mqttClient;
- if(client){
- mqtt_disconnect(client);
- yieldMqtt(channelData, 10);
- UA_free(client->socketfd);
- }
- if(channelData->connection != NULL){
- channelData->connection->close(channelData->connection);
- channelData->connection->free(channelData->connection);
- UA_free(channelData->connection);
- channelData->connection = NULL;
- }
- UA_free(channelData->mqttRecvBuffer);
- channelData->mqttRecvBuffer = NULL;
- UA_free(channelData->mqttSendBuffer);
- channelData->mqttSendBuffer = NULL;
- UA_free(channelData->mqttClient);
- channelData->mqttClient = NULL;
- return UA_STATUSCODE_GOOD;
- }
- void
- publish_callback(void** channelDataPtr, struct mqtt_response_publish *published)
- {
- if(channelDataPtr != NULL){
- UA_PubSubChannelDataMQTT *channelData = (UA_PubSubChannelDataMQTT*)*channelDataPtr;
- if(channelData != NULL){
- if(channelData->callback != NULL){
-
- UA_ByteString *topic = UA_ByteString_new();
- if(!topic) return;
- UA_ByteString *msg = UA_ByteString_new();
- if(!msg) return;
-
-
- UA_StatusCode ret = UA_ByteString_allocBuffer(topic, published->topic_name_size);
- if(ret){
- UA_free(topic);
- UA_free(msg);
- return;
- }
-
- ret = UA_ByteString_allocBuffer(msg, published->application_message_size);
- if(ret){
- UA_ByteString_delete(topic);
- UA_free(msg);
- return;
- }
-
- memcpy(topic->data, published->topic_name, published->topic_name_size);
- memcpy(msg->data, published->application_message, published->application_message_size);
- channelData->callback(msg, topic);
- }
- }
- }
- }
- UA_StatusCode
- subscribeMqtt(UA_PubSubChannelDataMQTT* channelData, UA_String topic, UA_Byte qos){
- if(channelData == NULL || topic.length == 0){
- return UA_STATUSCODE_BADINVALIDARGUMENT;
- }
- struct mqtt_client* client = (struct mqtt_client*)channelData->mqttClient;
-
- UA_STACKARRAY(char, topicStr, sizeof(char) * topic.length +1);
- memcpy(topicStr, topic.data, topic.length);
- topicStr[topic.length] = '\0';
- enum MQTTErrors mqttErr = mqtt_subscribe(client, topicStr, (UA_Byte) qos);
- if(mqttErr != MQTT_OK){
- const char* errorStr = mqtt_error_str(mqttErr);
- UA_LOG_ERROR(UA_Log_Stdout, UA_LOGCATEGORY_SERVER, "PubSub MQTT: subscribe: %s", errorStr);
- return UA_STATUSCODE_BADCOMMUNICATIONERROR;
- }
- return UA_STATUSCODE_GOOD;
- }
- UA_StatusCode
- unSubscribeMqtt(UA_PubSubChannelDataMQTT* channelData, UA_String topic){
- return UA_STATUSCODE_BADNOTIMPLEMENTED;
- }
- UA_StatusCode
- yieldMqtt(UA_PubSubChannelDataMQTT* channelData, UA_UInt16 timeout){
- if(channelData == NULL || timeout == 0){
- return UA_STATUSCODE_BADINVALIDARGUMENT;
- }
- UA_Connection *connection = channelData->connection;
- if(connection == NULL){
- return UA_STATUSCODE_BADCOMMUNICATIONERROR;
- }
-
- if(connection->state != UA_CONNECTION_ESTABLISHED && connection->state != UA_CONNECTION_OPENING){
- UA_LOG_ERROR(UA_Log_Stdout, UA_LOGCATEGORY_NETWORK, "PubSub MQTT: yield: Tcp Connection not established!");
- return UA_STATUSCODE_BADCOMMUNICATIONERROR;
- }
-
- struct mqtt_client* client = (struct mqtt_client*)channelData->mqttClient;
- client->socketfd->timeout = timeout;
- enum MQTTErrors error = mqtt_sync(client);
- if(error == MQTT_OK){
- return UA_STATUSCODE_GOOD;
- }else if(error == -1){
- UA_LOG_ERROR(UA_Log_Stdout, UA_LOGCATEGORY_NETWORK, "PubSub MQTT: yield: Communication Error.");
- return UA_STATUSCODE_BADCOMMUNICATIONERROR;
- }
-
-
- const char* errorStr = mqtt_error_str(error);
- UA_LOG_ERROR(UA_Log_Stdout, UA_LOGCATEGORY_SERVER, "PubSub MQTT: yield: error: %s", errorStr);
-
- switch(error){
- case MQTT_ERROR_CONNECTION_CLOSED:
- return UA_STATUSCODE_BADNOTCONNECTED;
- case MQTT_ERROR_SOCKET_ERROR:
- return UA_STATUSCODE_BADCOMMUNICATIONERROR;
- case MQTT_ERROR_CONNECTION_REFUSED:
- return UA_STATUSCODE_BADCONNECTIONREJECTED;
-
- default:
- return UA_STATUSCODE_BADCOMMUNICATIONERROR;
- }
- }
- UA_StatusCode
- publishMqtt(UA_PubSubChannelDataMQTT* channelData, UA_String topic, const UA_ByteString *buf, UA_Byte qos){
- if(channelData == NULL || buf == NULL ){
- return UA_STATUSCODE_BADINVALIDARGUMENT;
- }
- UA_STACKARRAY(char, topicChar, sizeof(char) * topic.length +1);
- memcpy(topicChar, topic.data, topic.length);
- topicChar[topic.length] = '\0';
-
- struct mqtt_client* client = (struct mqtt_client*)channelData->mqttClient;
- if(client == NULL)
- return UA_STATUSCODE_BADNOTCONNECTED;
-
- enum MQTTPublishFlags flags;
- if(qos == 0){
- flags = MQTT_PUBLISH_QOS_0;
- }else if( qos == 1){
- flags = MQTT_PUBLISH_QOS_1;
- }else if( qos == 2){
- flags = MQTT_PUBLISH_QOS_2;
- }else{
- UA_LOG_ERROR(UA_Log_Stdout, UA_LOGCATEGORY_NETWORK, "PubSub MQTT: publish: Bad Qos Level.");
- return UA_STATUSCODE_BADINVALIDARGUMENT;
- }
- mqtt_publish(client, topicChar, buf->data, buf->length, (uint8_t)flags);
- if (client->error != MQTT_OK) {
- if(client->error == MQTT_ERROR_SEND_BUFFER_IS_FULL){
- UA_LOG_ERROR(UA_Log_Stdout, UA_LOGCATEGORY_SERVER, "PubSub MQTT: publish: Send buffer is full. "
- "Possible reasons: send buffer is to small, "
- "sending to fast, broker not responding.");
- }else{
- UA_LOG_ERROR(UA_Log_Stdout, UA_LOGCATEGORY_SERVER, "PubSub MQTT: publish: %s", mqtt_error_str(client->error));
- }
- return UA_STATUSCODE_BADCONNECTIONREJECTED;
- }
- return UA_STATUSCODE_GOOD;
- }
|