Преглед изворни кода

feat(pubsub) Add MQTT-C dependency

Lukas M пре 6 година
родитељ
комит
1a309820f8
4 измењених фајлова са 3398 додато и 0 уклоњено
  1. 1701 0
      deps/mqtt-c/mqtt.c
  2. 1542 0
      deps/mqtt-c/mqtt.h
  3. 112 0
      deps/mqtt-c/mqtt_pal.h
  4. 43 0
      plugins/mqtt/ua_mqtt_pal.c

Разлика између датотеке није приказан због своје велике величине
+ 1701 - 0
deps/mqtt-c/mqtt.c


Разлика између датотеке није приказан због своје велике величине
+ 1542 - 0
deps/mqtt-c/mqtt.h


+ 112 - 0
deps/mqtt-c/mqtt_pal.h

@@ -0,0 +1,112 @@
+#ifndef __MQTT_PAL_H__
+#define __MQTT_PAL_H__
+
+/**
+ * Copyright (c) 2019 Kalycito Infotech Private Limited
+ * @file
+ * @brief Includes/supports the types/calls required by the MQTT-C client.
+ * 
+ * @note This is the \em only file included in mqtt.h, and mqtt.c. It is therefore 
+ *       responsible for including/supporting all the required types and calls. 
+ * 
+ * @defgroup pal Platform abstraction layer
+ * @brief Documentation of the types and calls required to port MQTT-C to a new platform.
+ * 
+ * mqtt_pal.h is the \em only header file included in mqtt.c. Therefore, to port MQTT-C to a 
+ * new platform the following types, functions, constants, and macros must be defined in 
+ * mqtt_pal.h:
+ *  - Types:
+ *      - \c size_t, \c ssize_t
+ *      - \c uint8_t, \c uint16_t, \c uint32_t
+ *      - \c va_list
+ *      - \c mqtt_pal_time_t : return type of \c MQTT_PAL_TIME() 
+ *      - \c mqtt_pal_mutex_t : type of the argument that is passed to \c MQTT_PAL_MUTEX_LOCK and 
+ *        \c MQTT_PAL_MUTEX_RELEASE
+ *  - Functions:
+ *      - \c memcpy, \c strlen
+ *      - \c va_start, \c va_arg, \c va_end
+ *  - Constants:
+ *      - \c INT_MIN
+ * 
+ * Additionally, three macro's are required:
+ *  - \c MQTT_PAL_HTONS(s) : host-to-network endian conversion for uint16_t.
+ *  - \c MQTT_PAL_NTOHS(s) : network-to-host endian conversion for uint16_t.
+ *  - \c MQTT_PAL_TIME()   : returns [type: \c mqtt_pal_time_t] current time in seconds. 
+ *  - \c MQTT_PAL_MUTEX_LOCK(mtx_pointer) : macro that locks the mutex pointed to by \c mtx_pointer.
+ *  - \c MQTT_PAL_MUTEX_RELEASE(mtx_pointer) : macro that unlocks the mutex pointed to by 
+ *    \c mtx_pointer.
+ * 
+ * Lastly, \ref mqtt_pal_sendall and \ref mqtt_pal_recvall, must be implemented in mqtt_pal.c 
+ * for sending and receiving data using the platforms socket calls.
+ */
+
+#include <open62541/types.h>
+
+/* UNIX-like platform support */
+#ifdef __unix__
+    #include <limits.h>
+    #include <string.h>
+    #include <stdarg.h>
+    #include <time.h>
+    #include <arpa/inet.h>
+    #include <pthread.h>
+
+    /*#ifdef MQTT_USE_BIO
+        #include <openssl/bio.h>
+        typedef BIO* mqtt_pal_socket_handle;
+    #else
+        typedef int mqtt_pal_socket_handle;
+    #endif*/
+#endif
+
+#ifdef __MINGW32__
+    #include <pthread.h>
+#endif
+
+
+    #define MQTT_PAL_HTONS(s) htons(s)
+    #define MQTT_PAL_NTOHS(s) ntohs(s)
+
+    #define MQTT_PAL_TIME() time(NULL)
+
+    typedef time_t mqtt_pal_time_t;
+    typedef pthread_mutex_t mqtt_pal_mutex_t;
+
+    #define MQTT_PAL_MUTEX_INIT(mtx_ptr) pthread_mutex_init(mtx_ptr, NULL)
+    #define MQTT_PAL_MUTEX_LOCK(mtx_ptr) pthread_mutex_lock(mtx_ptr)
+    #define MQTT_PAL_MUTEX_UNLOCK(mtx_ptr) pthread_mutex_unlock(mtx_ptr)
+
+    struct my_custom_socket_handle {
+        void* client;
+        void* connection;
+        uint16_t timeout;
+    };
+    
+    typedef struct my_custom_socket_handle* mqtt_pal_socket_handle;
+/**
+ * @brief Sends all the bytes in a buffer.
+ * @ingroup pal
+ * 
+ * @param[in] fd The file-descriptor (or handle) of the socket.
+ * @param[in] buf A pointer to the first byte in the buffer to send.
+ * @param[in] len The number of bytes to send (starting at \p buf).
+ * @param[in] flags Flags which are passed to the underlying socket.
+ * 
+ * @returns The number of bytes sent if successful, an \ref MQTTErrors otherwise.
+ */
+ssize_t mqtt_pal_sendall(mqtt_pal_socket_handle fd, const void* buf, size_t len, int flags);
+
+/**
+ * @brief Non-blocking receive all the byte available.
+ * @ingroup pal
+ * 
+ * @param[in] fd The file-descriptor (or handle) of the socket.
+ * @param[in] buf A pointer to the receive buffer.
+ * @param[in] bufsz The max number of bytes that can be put into \p buf.
+ * @param[in] flags Flags which are passed to the underlying socket.
+ * 
+ * @returns The number of bytes received if successful, an \ref MQTTErrors otherwise.
+ */
+ssize_t mqtt_pal_recvall(mqtt_pal_socket_handle fd, void* buf, size_t bufsz, int flags);
+
+#endif /* __MQTT_PAL_H__ */

+ 43 - 0
plugins/mqtt/ua_mqtt_pal.c

@@ -0,0 +1,43 @@
+/* This Source Code Form is subject to the terms of the Mozilla Public
+ * License, v. 2.0. If a copy of the MPL was not distributed with this
+ * file, You can obtain one at http://mozilla.org/MPL/2.0/.
+ *
+ * Copyright (c) 2018 Fraunhofer IOSB (Author: Lukas Meling)
+ * Copyright (c) 2019 Kalycito Infotech Private Limited
+ */
+
+#include "../../deps/mqtt-c/mqtt.h"
+#include <open62541/network_tcp.h>
+
+ssize_t
+mqtt_pal_sendall(mqtt_pal_socket_handle fd, const void* buf, size_t len, int flags) {
+    UA_Connection *connection = (UA_Connection*) fd->connection;
+    UA_ByteString sendBuffer;
+    sendBuffer.data = (UA_Byte*)UA_malloc(len);
+    sendBuffer.length = len;
+    memcpy(sendBuffer.data, buf, len);
+    UA_StatusCode ret = connection->send(connection, &sendBuffer);
+    if(ret != UA_STATUSCODE_GOOD)
+        return -1;
+    return (ssize_t)len;
+}
+
+ssize_t
+mqtt_pal_recvall(mqtt_pal_socket_handle fd, void* buf, size_t bufsz, int flags) {
+    UA_Connection *connection = (UA_Connection*) fd->connection;
+    connection->config.recvBufferSize = (UA_UInt32) bufsz;
+    UA_ByteString inBuffer;
+    UA_StatusCode ret = connection->recv(connection, &inBuffer, fd->timeout);
+    if(ret == UA_STATUSCODE_GOOD ){
+        // Buffer received, copy to recv buffer
+        memcpy(buf, inBuffer.data, inBuffer.length);
+        ssize_t bytesReceived = (ssize_t)inBuffer.length;
+        /* free recv buffer */
+        connection->releaseRecvBuffer(connection, &inBuffer);
+        return bytesReceived;
+    }else if(ret == UA_STATUSCODE_GOODNONCRITICALTIMEOUT){
+        return 0;
+    }else{
+        return -1; //error case, no free necessary
+    }
+}