ua_mqtt_adapter.c 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323
  1. /* This Source Code Form is subject to the terms of the Mozilla Public
  2. * License, v. 2.0. If a copy of the MPL was not distributed with this
  3. * file, You can obtain one at http://mozilla.org/MPL/2.0/.
  4. *
  5. * Copyright (c) 2018 Fraunhofer IOSB (Author: Lukas Meling)
  6. */
  7. #include "ua_mqtt_adapter.h"
  8. #include "../../deps/mqtt-c/mqtt.h"
  9. #include "open62541/plugin/log_stdout.h"
  10. #include "open62541/util.h"
  11. /* forward decl for callback */
  12. void
  13. publish_callback(void**, struct mqtt_response_publish*);
  14. UA_StatusCode
  15. connectMqtt(UA_PubSubChannelDataMQTT* channelData){
  16. if(channelData == NULL){
  17. return UA_STATUSCODE_BADINVALIDARGUMENT;
  18. }
  19. /* Get address and replace mqtt with tcp
  20. * because we use a tcp UA_ClientConnectionTCP for mqtt */
  21. UA_NetworkAddressUrlDataType address = channelData->address;
  22. UA_String hostname, path;
  23. UA_UInt16 networkPort;
  24. if(UA_parseEndpointUrl(&address.url, &hostname, &networkPort, &path) != UA_STATUSCODE_GOOD){
  25. UA_LOG_ERROR(UA_Log_Stdout, UA_LOGCATEGORY_SERVER,
  26. "MQTT PubSub Connection creation failed. Invalid URL.");
  27. return UA_STATUSCODE_BADINVALIDARGUMENT;
  28. }
  29. /* Build the url, replace mqtt with tcp */
  30. UA_STACKARRAY(UA_Byte, addressAsChar, 10 + (sizeof(char) * path.length));
  31. memcpy((char*)addressAsChar, "opc.tcp://", 10);
  32. memcpy((char*)&addressAsChar[10],(char*)path.data, path.length);
  33. address.url.data = addressAsChar;
  34. address.url.length = 10 + (sizeof(char) * path.length);
  35. /* check if buffers are correct */
  36. if(!(channelData->mqttRecvBufferSize > 0 && channelData->mqttRecvBuffer != NULL
  37. && channelData->mqttSendBufferSize > 0 && channelData->mqttSendBuffer != NULL)){
  38. UA_LOG_ERROR(UA_Log_Stdout, UA_LOGCATEGORY_SERVER, "MQTT PubSub Connection creation failed. No Mqtt buffer allocated.");
  39. return UA_STATUSCODE_BADARGUMENTSMISSING;
  40. }
  41. /* Config with default parameters */
  42. UA_ConnectionConfig conf;
  43. memset(&conf, 0, sizeof(UA_ConnectionConfig));
  44. conf.protocolVersion = 0;
  45. conf.sendBufferSize = 1000;
  46. conf.recvBufferSize = 2000;
  47. conf.maxMessageSize = 1000;
  48. conf.maxChunkCount = 1;
  49. /* Create TCP connection: open the blocking TCP socket (connecting to the broker) */
  50. UA_Connection connection = UA_ClientConnectionTCP( conf, address.url, 1000, NULL);
  51. if(connection.state != UA_CONNECTION_ESTABLISHED && connection.state != UA_CONNECTION_OPENING){
  52. UA_LOG_ERROR(UA_Log_Stdout, UA_LOGCATEGORY_NETWORK, "PubSub MQTT: Connection creation failed. Tcp connection failed!");
  53. return UA_STATUSCODE_BADCOMMUNICATIONERROR;
  54. }
  55. /* Set socket to nonblocking!*/
  56. UA_socket_set_nonblocking(connection.sockfd);
  57. /* save connection */
  58. channelData->connection = (UA_Connection*)UA_calloc(1, sizeof(UA_Connection));
  59. if(!channelData->connection){
  60. UA_LOG_ERROR(UA_Log_Stdout, UA_LOGCATEGORY_SERVER, "PubSub MQTT: Connection creation failed. Out of memory.");
  61. return UA_STATUSCODE_BADOUTOFMEMORY;
  62. }
  63. memcpy(channelData->connection, &connection, sizeof(UA_Connection));
  64. /* calloc mqtt_client */
  65. struct mqtt_client* client = (struct mqtt_client*)UA_calloc(1, sizeof(struct mqtt_client));
  66. if(!client){
  67. UA_free(channelData->connection);
  68. UA_LOG_ERROR(UA_Log_Stdout, UA_LOGCATEGORY_SERVER, "PubSub MQTT: Connection creation failed. Out of memory.");
  69. return UA_STATUSCODE_BADOUTOFMEMORY;
  70. }
  71. /* save reference */
  72. channelData->mqttClient = client;
  73. /* create custom sockethandle */
  74. struct my_custom_socket_handle* handle =
  75. (struct my_custom_socket_handle*)UA_calloc(1, sizeof(struct my_custom_socket_handle));
  76. if(!handle){
  77. UA_free(channelData->connection);
  78. UA_free(client);
  79. UA_LOG_ERROR(UA_Log_Stdout, UA_LOGCATEGORY_SERVER, "PubSub MQTT: Connection creation failed. Out of memory.");
  80. return UA_STATUSCODE_BADOUTOFMEMORY;
  81. }
  82. handle->client = client;
  83. handle->connection = channelData->connection;
  84. /* init mqtt client struct with buffers and callback */
  85. enum MQTTErrors mqttErr = mqtt_init(client, handle, channelData->mqttSendBuffer, channelData->mqttSendBufferSize,
  86. channelData->mqttRecvBuffer, channelData->mqttRecvBufferSize, publish_callback);
  87. if(mqttErr != MQTT_OK){
  88. UA_free(channelData->connection);
  89. UA_free(client);
  90. const char* errorStr = mqtt_error_str(mqttErr);
  91. UA_LOG_ERROR(UA_Log_Stdout, UA_LOGCATEGORY_SERVER, "PubSub MQTT: Connection creation failed. MQTT error: %s", errorStr);
  92. return UA_STATUSCODE_BADCOMMUNICATIONERROR;
  93. }
  94. /* Init custom data for subscribe callback function:
  95. * A reference to the channeldata will be available in the callback.
  96. * This is used to call the user callback channelData.callback */
  97. client->publish_response_callback_state = channelData;
  98. /* Convert clientId UA_String to char* null terminated */
  99. char* clientId = (char*)calloc(1,channelData->mqttClientId->length + 1);
  100. if(!clientId){
  101. UA_LOG_ERROR(UA_Log_Stdout, UA_LOGCATEGORY_SERVER, "PubSub MQTT: Connection creation failed. Out of memory.");
  102. UA_free(channelData->connection);
  103. UA_free(client);
  104. return UA_STATUSCODE_BADOUTOFMEMORY;
  105. }
  106. memcpy(clientId, channelData->mqttClientId->data, channelData->mqttClientId->length);
  107. /* Connect mqtt with socket fd of networktcp */
  108. mqttErr = mqtt_connect(client, clientId, NULL, NULL, 0, NULL, NULL, 0, 400);
  109. UA_free(clientId);
  110. if(mqttErr != MQTT_OK){
  111. UA_free(channelData->connection);
  112. UA_free(client);
  113. const char* errorStr = mqtt_error_str(mqttErr);
  114. UA_LOG_ERROR(UA_Log_Stdout, UA_LOGCATEGORY_SERVER, "%s", errorStr);
  115. return UA_STATUSCODE_BADCOMMUNICATIONERROR;
  116. }
  117. /* sync the first mqtt packets in the buffer to send connection request.
  118. After that yield must be called frequently to exchange mqtt messages. */
  119. UA_StatusCode ret = yieldMqtt(channelData, 100);
  120. if(ret != UA_STATUSCODE_GOOD){
  121. UA_free(channelData->connection);
  122. UA_free(client);
  123. return ret;
  124. }
  125. return UA_STATUSCODE_GOOD;
  126. }
  127. UA_StatusCode
  128. disconnectMqtt(UA_PubSubChannelDataMQTT* channelData){
  129. if(channelData == NULL){
  130. return UA_STATUSCODE_BADINVALIDARGUMENT;
  131. }
  132. channelData->callback = NULL;
  133. struct mqtt_client* client = (struct mqtt_client*)channelData->mqttClient;
  134. if(client){
  135. mqtt_disconnect(client);
  136. yieldMqtt(channelData, 10);
  137. UA_free(client->socketfd);
  138. }
  139. if(channelData->connection != NULL){
  140. channelData->connection->close(channelData->connection);
  141. channelData->connection->free(channelData->connection);
  142. UA_free(channelData->connection);
  143. channelData->connection = NULL;
  144. }
  145. UA_free(channelData->mqttRecvBuffer);
  146. channelData->mqttRecvBuffer = NULL;
  147. UA_free(channelData->mqttSendBuffer);
  148. channelData->mqttSendBuffer = NULL;
  149. UA_free(channelData->mqttClient);
  150. channelData->mqttClient = NULL;
  151. return UA_STATUSCODE_GOOD;
  152. }
  153. void
  154. publish_callback(void** channelDataPtr, struct mqtt_response_publish *published)
  155. {
  156. if(channelDataPtr != NULL){
  157. UA_PubSubChannelDataMQTT *channelData = (UA_PubSubChannelDataMQTT*)*channelDataPtr;
  158. if(channelData != NULL){
  159. if(channelData->callback != NULL){
  160. //Setup topic
  161. UA_ByteString *topic = UA_ByteString_new();
  162. if(!topic) return;
  163. UA_ByteString *msg = UA_ByteString_new();
  164. if(!msg) return;
  165. /* memory for topic */
  166. UA_StatusCode ret = UA_ByteString_allocBuffer(topic, published->topic_name_size);
  167. if(ret){
  168. UA_free(topic);
  169. UA_free(msg);
  170. return;
  171. }
  172. /* memory for message */
  173. ret = UA_ByteString_allocBuffer(msg, published->application_message_size);
  174. if(ret){
  175. UA_ByteString_delete(topic);
  176. UA_free(msg);
  177. return;
  178. }
  179. /* copy topic and msg, call the cb */
  180. memcpy(topic->data, published->topic_name, published->topic_name_size);
  181. memcpy(msg->data, published->application_message, published->application_message_size);
  182. channelData->callback(msg, topic);
  183. }
  184. }
  185. }
  186. }
  187. UA_StatusCode
  188. subscribeMqtt(UA_PubSubChannelDataMQTT* channelData, UA_String topic, UA_Byte qos){
  189. if(channelData == NULL || topic.length == 0){
  190. return UA_STATUSCODE_BADINVALIDARGUMENT;
  191. }
  192. struct mqtt_client* client = (struct mqtt_client*)channelData->mqttClient;
  193. UA_STACKARRAY(char, topicStr, sizeof(char) * topic.length +1);
  194. memcpy(topicStr, topic.data, topic.length);
  195. topicStr[topic.length] = '\0';
  196. enum MQTTErrors mqttErr = mqtt_subscribe(client, topicStr, (UA_Byte) qos);
  197. if(mqttErr != MQTT_OK){
  198. const char* errorStr = mqtt_error_str(mqttErr);
  199. UA_LOG_ERROR(UA_Log_Stdout, UA_LOGCATEGORY_SERVER, "PubSub MQTT: subscribe: %s", errorStr);
  200. return UA_STATUSCODE_BADCOMMUNICATIONERROR;
  201. }
  202. return UA_STATUSCODE_GOOD;
  203. }
  204. UA_StatusCode
  205. unSubscribeMqtt(UA_PubSubChannelDataMQTT* channelData, UA_String topic){
  206. return UA_STATUSCODE_BADNOTIMPLEMENTED;
  207. }
  208. UA_StatusCode
  209. yieldMqtt(UA_PubSubChannelDataMQTT* channelData, UA_UInt16 timeout){
  210. if(channelData == NULL || timeout == 0){
  211. return UA_STATUSCODE_BADINVALIDARGUMENT;
  212. }
  213. UA_Connection *connection = channelData->connection;
  214. if(connection == NULL){
  215. return UA_STATUSCODE_BADCOMMUNICATIONERROR;
  216. }
  217. if(connection->state != UA_CONNECTION_ESTABLISHED && connection->state != UA_CONNECTION_OPENING){
  218. UA_LOG_ERROR(UA_Log_Stdout, UA_LOGCATEGORY_NETWORK, "PubSub MQTT: yield: Tcp Connection not established!");
  219. return UA_STATUSCODE_BADCOMMUNICATIONERROR;
  220. }
  221. struct mqtt_client* client = (struct mqtt_client*)channelData->mqttClient;
  222. client->socketfd->timeout = timeout;
  223. enum MQTTErrors error = mqtt_sync(client);
  224. if(error == MQTT_OK){
  225. return UA_STATUSCODE_GOOD;
  226. }else if(error == -1){
  227. UA_LOG_ERROR(UA_Log_Stdout, UA_LOGCATEGORY_NETWORK, "PubSub MQTT: yield: Communication Error.");
  228. return UA_STATUSCODE_BADCOMMUNICATIONERROR;
  229. }
  230. /* map mqtt errors to ua errors */
  231. const char* errorStr = mqtt_error_str(error);
  232. UA_LOG_ERROR(UA_Log_Stdout, UA_LOGCATEGORY_SERVER, "PubSub MQTT: yield: error: %s", errorStr);
  233. switch(error){
  234. case MQTT_ERROR_CONNECTION_CLOSED:
  235. return UA_STATUSCODE_BADNOTCONNECTED;
  236. case MQTT_ERROR_SOCKET_ERROR:
  237. return UA_STATUSCODE_BADCOMMUNICATIONERROR;
  238. case MQTT_ERROR_CONNECTION_REFUSED:
  239. return UA_STATUSCODE_BADCONNECTIONREJECTED;
  240. default:
  241. return UA_STATUSCODE_BADCOMMUNICATIONERROR;
  242. }
  243. }
  244. UA_StatusCode
  245. publishMqtt(UA_PubSubChannelDataMQTT* channelData, UA_String topic, const UA_ByteString *buf, UA_Byte qos){
  246. if(channelData == NULL || buf == NULL ){
  247. return UA_STATUSCODE_BADINVALIDARGUMENT;
  248. }
  249. UA_STACKARRAY(char, topicChar, sizeof(char) * topic.length +1);
  250. memcpy(topicChar, topic.data, topic.length);
  251. topicChar[topic.length] = '\0';
  252. struct mqtt_client* client = (struct mqtt_client*)channelData->mqttClient;
  253. if(client == NULL)
  254. return UA_STATUSCODE_BADNOTCONNECTED;
  255. /* publish */
  256. enum MQTTPublishFlags flags;
  257. if(qos == 0){
  258. flags = MQTT_PUBLISH_QOS_0;
  259. }else if( qos == 1){
  260. flags = MQTT_PUBLISH_QOS_1;
  261. }else if( qos == 2){
  262. flags = MQTT_PUBLISH_QOS_2;
  263. }else{
  264. UA_LOG_ERROR(UA_Log_Stdout, UA_LOGCATEGORY_NETWORK, "PubSub MQTT: publish: Bad Qos Level.");
  265. return UA_STATUSCODE_BADINVALIDARGUMENT;
  266. }
  267. mqtt_publish(client, topicChar, buf->data, buf->length, (uint8_t)flags);
  268. if (client->error != MQTT_OK) {
  269. if(client->error == MQTT_ERROR_SEND_BUFFER_IS_FULL){
  270. UA_LOG_ERROR(UA_Log_Stdout, UA_LOGCATEGORY_SERVER, "PubSub MQTT: publish: Send buffer is full. "
  271. "Possible reasons: send buffer is to small, "
  272. "sending to fast, broker not responding.");
  273. }else{
  274. UA_LOG_ERROR(UA_Log_Stdout, UA_LOGCATEGORY_SERVER, "PubSub MQTT: publish: %s", mqtt_error_str(client->error));
  275. }
  276. return UA_STATUSCODE_BADCONNECTIONREJECTED;
  277. }
  278. return UA_STATUSCODE_GOOD;
  279. }