Browse Source

Merge branch 'master' of https://github.com/acplt/open62541

FlorianPalm 9 years ago
parent
commit
880ebc5b8d
4 changed files with 255 additions and 4 deletions
  1. 4 4
      CMakeLists.txt
  2. 83 0
      cmake/FindLibUV.cmake
  3. 166 0
      examples/networklayer_tcp_concurrent.c
  4. 2 0
      src/ua_transport.c

+ 4 - 4
CMakeLists.txt

@@ -163,11 +163,11 @@ endif()
 # build example server
 set(server_sources examples/opcuaServer.c
                    examples/logger_stdout.c)
-# if(NOT ENABLE_MULTITHREADING)
+if(NOT ENABLE_MULTITHREADING)
     list(APPEND server_sources examples/networklayer_tcp.c)
-# else()
-#    list(APPEND server_sources examples/networklayer_tcp_concurrent.c)
-# endif()
+else()
+    list(APPEND server_sources examples/networklayer_tcp_concurrent.c)
+endif()
 add_executable(exampleServer ${server_sources})
 target_link_libraries(exampleServer open62541)
 if(WIN32)

+ 83 - 0
cmake/FindLibUV.cmake

@@ -0,0 +1,83 @@
+# - Try to find libuv
+# Once done, this will define
+#
+#  LIBUV_FOUND - system has libuv
+#  LIBUV_INCLUDE_DIRS - the libuv include directories
+#  LIBUV_LIBRARIES - link these to use libuv
+#
+# Set the LIBUV_USE_STATIC variable to specify if static libraries should
+# be preferred to shared ones.
+
+find_package(PkgConfig)
+if (PKG_CONFIG_FOUND)
+    pkg_check_modules(PC_LIBUV QUIET libuv)
+endif()
+
+find_path(LIBUV_INCLUDE_DIR uv.h
+  HINTS ${PC_LIBUV_INCLUDEDIR} ${PC_LIBUV_INCLUDE_DIRS}
+  ${LIMIT_SEARCH})
+
+# If we're asked to use static linkage, add libuv.a as a preferred library name.
+if(LIBUV_USE_STATIC)
+  list(APPEND LIBUV_NAMES
+    "${CMAKE_STATIC_LIBRARY_PREFIX}uv${CMAKE_STATIC_LIBRARY_SUFFIX}")
+endif(LIBUV_USE_STATIC)
+
+list(APPEND LIBUV_NAMES uv)
+
+find_library(LIBUV_LIBRARY NAMES ${LIBUV_NAMES}
+  HINTS ${PC_LIBUV_LIBDIR} ${PC_LIBUV_LIBRARY_DIRS}
+  ${LIMIT_SEARCH})
+
+mark_as_advanced(LIBUV_INCLUDE_DIR LIBUV_LIBRARY)
+
+set(LIBUV_LIBRARIES ${LIBUV_LIBRARY})
+set(LIBUV_INCLUDE_DIRS ${LIBUV_INCLUDE_DIR})
+
+# Deal with the fact that libuv.pc is missing important dependency information.
+
+include(CheckLibraryExists)
+
+check_library_exists(dl dlopen "dlfcn.h" HAVE_LIBDL)
+if(HAVE_LIBDL)
+  list(APPEND LIBUV_LIBRARIES dl)
+endif()
+
+check_library_exists(kstat kstat_lookup "kstat.h" HAVE_LIBKSTAT)
+if(HAVE_LIBKSTAT)
+  list(APPEND LIBUV_LIBRARIES kstat)
+endif()
+
+check_library_exists(kvm kvm_open "kvm.h" HAVE_LIBKVM)
+if(HAVE_LIBKVM)
+  list(APPEND LIBUV_LIBRARIES kvm)
+endif()
+
+check_library_exists(nsl gethostbyname "nsl.h" HAVE_LIBNSL)
+if(HAVE_LIBNSL)
+  list(APPEND LIBUV_LIBRARIES nsl)
+endif()
+
+check_library_exists(perfstat perfstat_cpu "libperfstat.h" HAVE_LIBPERFSTAT)
+if(HAVE_LIBPERFSTAT)
+  list(APPEND LIBUV_LIBRARIES perfstat)
+endif()
+
+check_library_exists(rt clock_gettime "time.h" HAVE_LIBRT)
+if(HAVE_LIBRT)
+  list(APPEND LIBUV_LIBRARIES rt)
+endif()
+
+check_library_exists(sendfile sendfile "" HAVE_LIBSENDFILE)
+if(HAVE_LIBSENDFILE)
+  list(APPEND LIBUV_LIBRARIES sendfile)
+endif()
+
+include(FindPackageHandleStandardArgs)
+
+# handle the QUIETLY and REQUIRED arguments and set LIBUV_FOUND to TRUE
+# if all listed variables are TRUE
+find_package_handle_standard_args(LibUV DEFAULT_MSG
+                                  LIBUV_LIBRARY LIBUV_INCLUDE_DIR)
+
+mark_as_advanced(LIBUV_INCLUDE_DIR LIBUV_LIBRARY)

+ 166 - 0
examples/networklayer_tcp_concurrent.c

@@ -0,0 +1,166 @@
+/*
+ * This work is licensed under a Creative Commons CCZero 1.0 Universal License.
+ * See http://creativecommons.org/publicdomain/zero/1.0/ for more information.
+ */
+
+#define _GNU_SOURCE
+
+#include <uv.h>
+#include <assert.h>
+#include <malloc.h>
+#include "networklayer_tcp.h"
+#include "ua_transport.h"
+
+struct NetworklayerTCP {
+    UA_Server *server;
+    uv_loop_t *uvloop;
+    uv_tcp_t uvserver;
+	UA_ConnectionConfig localConf;
+	UA_UInt32 port;
+	UA_UInt32 connectionsSize;
+};
+
+UA_Int32 NetworklayerTCP_new(NetworklayerTCP **newlayer, UA_ConnectionConfig localConf, UA_UInt32 port) {
+    *newlayer = malloc(sizeof(NetworklayerTCP));
+    if(newlayer == UA_NULL)
+        return UA_ERROR;
+	(*newlayer)->localConf = localConf;
+	(*newlayer)->port = port;
+	return UA_SUCCESS;
+}
+
+void NetworklayerTCP_delete(NetworklayerTCP *layer) {
+    free(layer);
+}
+
+// callback structure to delete the buffer after the asynchronous write finished
+typedef struct {
+	uv_write_t req;
+    unsigned int bufsize;
+    uv_buf_t *bufs;
+} write_req_t;
+
+static void on_close(uv_handle_t * handle) {
+	if (handle->data) {
+        //UA_Connection_deleteMembers((UA_Connection*) handle->data);
+        free(handle->data);
+    }
+    free(handle);
+}
+
+void close(void *handle) {
+    uv_close((uv_handle_t *)handle, on_close);
+}
+
+
+static void after_shutdown(uv_shutdown_t * req, int status) {
+    uv_close((uv_handle_t *) req->handle, on_close);
+    free(req);
+}
+
+static void after_write(uv_write_t * req, int status) {
+    write_req_t *wr = (write_req_t*)req; // todo: use container_of
+    for(UA_UInt32 i=0;i<wr->bufsize;i++)
+        free(wr->bufs[i].base);
+    free(wr->bufs);
+
+    if (status) {
+		printf("uv_write error");
+		uv_close((uv_handle_t *) req->handle, on_close);
+	}
+
+    free(wr);
+}
+
+static void write(void *handle, const UA_ByteStringArray buf) {
+    uv_buf_t *uv_bufs = malloc(buf.stringsSize * sizeof(uv_buf_t));
+    for(UA_UInt32 i=0; i<buf.stringsSize; i++) {
+        uv_bufs[i].len = buf.strings[i].length;
+        uv_bufs[i].base = (char*)buf.strings[i].data;
+    }
+
+	write_req_t *wr = malloc(sizeof(write_req_t));
+    wr->bufsize = buf.stringsSize;
+    wr->bufs = uv_bufs;
+
+	if(uv_write(&wr->req, (uv_stream_t*)handle, uv_bufs, buf.stringsSize, after_write))
+        printf("uv_write failed");
+}
+
+static void handle_message(uv_stream_t* handle, ssize_t nread, uv_buf_t buf) {
+    if (nread < 0) {
+		printf("connection ended");
+		if (buf.base)
+            free(buf.base);
+		uv_shutdown_t *req = malloc(sizeof(uv_shutdown_t));
+		uv_shutdown(req, handle, after_shutdown);
+		return;
+    }
+    if (nread == 0) {
+		free(buf.base);
+		return;
+    }
+
+    NetworklayerTCP *layer = (NetworklayerTCP*)handle->loop->data;
+    UA_Server *server = (UA_Server*) layer->server;
+    UA_Connection *connection = (UA_Connection*) handle->data;
+    UA_ByteString readBuffer;
+    readBuffer.length = nread; // the buffer might be longer
+    readBuffer.data = (UA_Byte*)buf.base;
+    UA_Server_processBinaryMessage(server, connection, &readBuffer);
+
+    free(buf.base);
+	return;
+}
+
+static uv_buf_t read_alloc(uv_handle_t * handle, size_t suggested_size) {
+    //UA_Server *server = (UA_Server*)handle->loop->data;
+    UA_UInt32 receive_bufsize = 2048; // todo: server->layer.localConf.recvBufferSize;
+	char* buf = malloc(sizeof(char)*receive_bufsize);
+    return uv_buf_init(buf, receive_bufsize);
+}
+
+static void on_connection(uv_stream_t *server, int status) {
+    if (status) {
+        printf("Connect error");
+        return;
+    }
+    uv_loop_t *loop = server->loop;
+    NetworklayerTCP *layer= (NetworklayerTCP*)loop->data;
+
+    uv_tcp_t *stream = malloc(sizeof(uv_tcp_t));
+    if(uv_tcp_init(loop, stream))
+        return;
+
+    UA_Connection *connection = malloc(sizeof(UA_Connection));
+    UA_Connection_init(connection, layer->localConf, stream, close, write);
+    stream->data = connection;
+
+    assert(uv_accept(server, (uv_stream_t*)stream) == 0);
+    assert(uv_read_start((uv_stream_t*)stream, read_alloc, handle_message) == 0);
+}
+
+UA_Int32 NetworkLayerTCP_run(NetworklayerTCP *layer, UA_Server *server, struct timeval tv, void(*worker)(UA_Server*), UA_Boolean *running) {
+    layer->uvloop = uv_default_loop();
+    layer->server = server;
+    struct sockaddr_in addr = uv_ip4_addr("0.0.0.0", layer->port);
+    if(uv_tcp_init(layer->uvloop, &layer->uvserver)) {
+		printf("Socket creation error\n");
+        return UA_ERROR;
+    }
+    
+    if(uv_tcp_bind(&layer->uvserver, addr)) {
+        printf("Bind error\n");
+        return UA_ERROR;
+    }
+
+#define MAXBACKLOG 10
+    if(uv_listen((uv_stream_t*)&layer->uvserver, MAXBACKLOG, on_connection)) {
+        printf("Listen error");
+        return UA_ERROR;
+    }
+    layer->uvloop->data = (void*)layer; // so we can get the pointer to the server
+    uv_run(layer->uvloop, UV_RUN_DEFAULT);
+    uv_loop_delete(layer->uvloop);
+    return UA_SUCCESS;
+}

+ 2 - 0
src/ua_transport.c

@@ -24,6 +24,8 @@ UA_Int32 UA_MessageType_encodeBinary(UA_MessageType const *src, UA_ByteString *d
 }
 
 UA_Int32 UA_MessageType_decodeBinary(UA_ByteString const *src, UA_UInt32 *offset, UA_MessageType *dst) {
+    if(*offset+3 > (UA_UInt32)src->length)
+        return UA_ERROR;
     UA_Int32 retval = UA_SUCCESS;
     UA_Byte  tmpBuf[3];
     retval |= UA_Byte_decodeBinary(src, offset, &(tmpBuf[0])); //messageType to Byte representation