Kaynağa Gözat

initial work on UDP support, got it to compile, however it is completely untested

Stasik0 10 yıl önce
ebeveyn
işleme
fec4c1ed62
4 değiştirilmiş dosya ile 304 ekleme ve 1 silme
  1. 10 1
      CMakeLists.txt
  2. 264 0
      examples/networklayer_udp.c
  3. 22 0
      examples/networklayer_udp.h
  4. 8 0
      examples/opcuaServer.c

+ 10 - 1
CMakeLists.txt

@@ -83,6 +83,12 @@ if(EXTENSION_STATELESS)
     endif()
 endif()
 
+option(EXTENSION_UDP "Enable udp extension" OFF)
+if(EXTENSION_UDP)
+	message(STATUS "Extensions: enabling udp")
+	add_definitions(-DEXTENSION_UDP)
+endif()
+
 ## self-signed certificates
 option(ENABLE_SELFSIGNED "Enable self-signed certificates" OFF)
 if(ENABLE_SELFSIGNED)
@@ -215,6 +221,9 @@ if(EXAMPLESERVER)
 set(server_sources examples/opcuaServer.c
                    examples/networklayer_tcp.c
                    examples/logger_stdout.c)
+if(EXTENSION_UDP)
+	list(APPEND server_sources examples/networklayer_udp.c)
+endif()
 add_executable(exampleServer ${server_sources} ${exported_headers} ${generated_headers})
 target_link_libraries(exampleServer open62541)
 if(WIN32)
@@ -244,4 +253,4 @@ if(GENERATE_DOCUMENTATION)
                       ${DOXYGEN_EXECUTABLE} ${CMAKE_CURRENT_BINARY_DIR}/Doxyfile
                       WORKING_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}
                       COMMENT "Generating API documentation with Doxygen")
-endif()
+endif()

+ 264 - 0
examples/networklayer_udp.c

@@ -0,0 +1,264 @@
+ /*
+ * This work is licensed under a Creative Commons CCZero 1.0 Universal License.
+ * See http://creativecommons.org/publicdomain/zero/1.0/ for more information.
+ */
+
+#include <stdlib.h> // malloc, free
+#ifdef _WIN32
+#include <malloc.h>
+#include <winsock2.h>
+#include <sys/types.h>
+#include <Windows.h>
+#include <ws2tcpip.h>
+#define CLOSESOCKET(S) closesocket(S)
+#else
+#include <sys/select.h> 
+#include <netinet/in.h>
+#include <sys/socketvar.h>
+#include <sys/ioctl.h>
+#include <unistd.h> // read, write, close
+#include <arpa/inet.h>
+#define CLOSESOCKET(S) close(S)
+#endif
+
+#include <stdio.h>
+#include <errno.h> // errno, EINTR
+#include <fcntl.h> // fcntl
+
+#include "networklayer_udp.h" // UA_MULTITHREADING is defined in here
+
+#ifdef UA_MULTITHREADING
+#include <urcu/uatomic.h>
+#endif
+
+#define MAXBACKLOG 100
+
+struct Networklayer_UDP;
+
+/* Forwarded to the server as a (UA_Connection) and used for callbacks back into
+   the networklayer */
+typedef struct {
+	UA_Connection connection;
+	struct sockaddr from;
+	socklen_t fromlen;
+	struct NetworkLayerUDP *layer;
+} UDPConnection;
+
+typedef struct NetworkLayerUDP {
+	UA_ConnectionConfig conf;
+	fd_set fdset;
+	UA_Int32 serversockfd;
+    UA_UInt32 port;
+    /* We remove the connection links only in the main thread. Attach
+       to-be-deleted links with atomic operations */
+    struct deleteLink {
+        UA_Int32 sockfd;
+        struct deleteLink *next;
+    } *deleteLinkList;
+} NetworkLayerUDP;
+
+static UA_StatusCode setNonBlocking(int sockid) {
+#ifdef _WIN32
+	u_long iMode = 1;
+	if(ioctlsocket(sockid, FIONBIO, &iMode) != NO_ERROR)
+		return UA_STATUSCODE_BADINTERNALERROR;
+#else
+	int opts = fcntl(sockid,F_GETFL);
+	if(opts < 0 || fcntl(sockid,F_SETFL,opts|O_NONBLOCK) < 0)
+		return UA_STATUSCODE_BADINTERNALERROR;
+#endif
+	return UA_STATUSCODE_GOOD;
+}
+
+// after every select, reset the set of sockets we want to listen on
+static void setFDSet(NetworkLayerUDP *layer) {
+	FD_ZERO(&layer->fdset);
+	FD_SET(layer->serversockfd, &layer->fdset);
+}
+
+// the callbacks are thread-safe if UA_MULTITHREADING is defined
+void closeConnectionUDP(UDPConnection *handle){
+
+}
+void writeCallbackUDP(UDPConnection *handle, UA_ByteStringArray gather_buf);
+
+/** Accesses only the sockfd in the handle. Can be run from parallel threads. */
+void writeCallbackUDP(UDPConnection *handle, UA_ByteStringArray gather_buf) {
+	UA_UInt32 total_len = 0, nWritten = 0;
+#ifdef _WIN32
+	LPWSABUF buf = _alloca(gather_buf.stringsSize * sizeof(WSABUF));
+	int result = 0;
+	for(UA_UInt32 i = 0; i<gather_buf.stringsSize; i++) {
+		buf[i].buf = gather_buf.strings[i].data;
+		buf[i].len = gather_buf.strings[i].length;
+		total_len += gather_buf.strings[i].length;
+	}
+	while(nWritten < total_len) {
+		UA_UInt32 n = 0;
+		do {
+			result = WSASendto(handle->sockfd, buf, gather_buf.stringsSize ,
+                             (LPDWORD)&n, 0, NULL, NULL);
+			//FIXME:
+			if(result != 0)
+				printf("Error WSASend, code: %d \n", WSAGetLastError());
+		} while(errno == EINTR);
+		nWritten += n;
+	}
+#else
+	struct iovec iov[gather_buf.stringsSize];
+	for(UA_UInt32 i=0;i<gather_buf.stringsSize;i++) {
+		iov[i] = (struct iovec) {.iov_base = gather_buf.strings[i].data,
+                                 .iov_len = gather_buf.strings[i].length};
+		total_len += gather_buf.strings[i].length;
+	}
+	struct msghdr message = {.msg_name = &(handle->from), .msg_namelen = handle->fromlen, .msg_iov = iov,
+							 .msg_iovlen = gather_buf.stringsSize, .msg_control = NULL,
+							 .msg_controllen = 0, .msg_flags = 0};
+	while (nWritten < total_len) {
+		UA_Int32 n = 0;
+		do {
+            n = sendmsg(handle->layer->serversockfd, &message, 0);
+        } while (n == -1L && errno == EINTR);
+        nWritten += n;
+	}
+#endif
+}
+
+UA_StatusCode NetworkLayerUDP_start(NetworkLayerUDP *layer) {
+#ifdef _WIN32
+	WORD wVersionRequested;
+	WSADATA wsaData;
+	wVersionRequested = MAKEWORD(2, 2);
+	WSAStartup(wVersionRequested, &wsaData);
+	if((layer->serversockfd = socket(PF_INET, SOCK_DGRAM,0)) == INVALID_SOCKET) {
+		printf("ERROR opening socket, code: %d\n", WSAGetLastError());
+		return UA_STATUSCODE_BADINTERNALERROR;
+	}
+#else
+    if((layer->serversockfd = socket(PF_INET, SOCK_DGRAM, 0)) < 0) {
+		perror("ERROR opening socket");
+		return UA_STATUSCODE_BADINTERNALERROR;
+	} 
+#endif
+
+	const struct sockaddr_in serv_addr = {
+        .sin_family = AF_INET, .sin_addr.s_addr = INADDR_ANY,
+        .sin_port = htons(layer->port), .sin_zero = {0}};
+
+	int optval = 1;
+	if(setsockopt(layer->serversockfd, SOL_SOCKET,
+                  SO_REUSEADDR, (const char *)&optval,
+                  sizeof(optval)) == -1) {
+		perror("setsockopt");
+		CLOSESOCKET(layer->serversockfd);
+		return UA_STATUSCODE_BADINTERNALERROR;
+	}
+		
+	if(bind(layer->serversockfd, (const struct sockaddr *)&serv_addr,
+            sizeof(serv_addr)) < 0) {
+		perror("binding");
+		CLOSESOCKET(layer->serversockfd);
+		return UA_STATUSCODE_BADINTERNALERROR;
+	}
+
+	setNonBlocking(layer->serversockfd);
+
+    printf("Listening for UDP connections on %s:%d\n",
+           inet_ntoa(serv_addr.sin_addr),
+           ntohs(serv_addr.sin_port));
+    return UA_STATUSCODE_GOOD;
+}
+
+UA_Int32 NetworkLayerUDP_getWork(NetworkLayerUDP *layer, UA_WorkItem **workItems,
+                                 UA_UInt16 timeout) {
+    UA_WorkItem *items = UA_NULL;
+    UA_Int32 itemsCount = 0;
+    setFDSet(layer);
+    struct timeval tmptv = {0, timeout};
+    UA_Int32 resultsize = select(layer->serversockfd+1, &layer->fdset, NULL, NULL, &tmptv);
+
+    if(resultsize < 0) {
+        *workItems = items;
+        return itemsCount;
+    }
+
+    items = realloc(items, sizeof(UA_WorkItem)*(itemsCount+resultsize));
+
+	// read from established sockets
+    UA_Int32 j = itemsCount;
+	UA_ByteString buf = { -1, NULL};
+		if(!buf.data) {
+			buf.data = malloc(sizeof(UA_Byte) * layer->conf.recvBufferSize);
+			if(!buf.data){
+				//TODO:
+			}
+		}
+
+	struct sockaddr src_addr;
+	socklen_t addrlen;
+        
+#ifdef _WIN32
+        buf.length = recvfrom(layer->conLinks[i].sockfd, (char *)buf.data,
+                          layer->conf.recvBufferSize, 0);
+        //todo: fixme
+#else
+        buf.length = recvfrom(layer->serversockfd, buf.data, layer->conf.recvBufferSize, 0, &src_addr, &addrlen);
+#endif
+
+        if (errno != 0 || buf.length == 0) {
+        } else {
+            UDPConnection *c = malloc(sizeof(UDPConnection));
+        	if(!c)
+        		return UA_STATUSCODE_BADINTERNALERROR;
+            c->layer = layer;
+            c->from = src_addr;
+            c->fromlen = addrlen;
+            c->connection.state = UA_CONNECTION_OPENING;
+            c->connection.localConf = layer->conf;
+            c->connection.channel = UA_NULL;
+            c->connection.close = (void (*)(void*))closeConnectionUDP;
+            c->connection.write = (void (*)(void*, UA_ByteStringArray))writeCallbackUDP;
+
+
+            items[j].type = UA_WORKITEMTYPE_BINARYNETWORKMESSAGE;
+            items[j].work.binaryNetworkMessage.message = buf;
+            items[j].work.binaryNetworkMessage.connection = (UA_Connection*)c;
+            buf.data = NULL;
+            j++;
+        }
+
+
+    if(buf.data)
+        free(buf.data);
+
+    if(j == 0) {
+        free(items);
+        *workItems = NULL;
+    } else
+        *workItems = items;
+    return j;
+}
+
+UA_Int32 NetworkLayerUDP_stop(NetworkLayerUDP * layer, UA_WorkItem **workItems) {
+	CLOSESOCKET(layer->serversockfd);
+	return 0;
+}
+
+void NetworkLayerUDP_delete(NetworkLayerUDP *layer) {
+	free(layer);
+}
+
+UA_NetworkLayer NetworkLayerUDP_new(UA_ConnectionConfig conf, UA_UInt32 port) {
+    NetworkLayerUDP *udplayer = malloc(sizeof(NetworkLayerUDP));
+	udplayer->conf = conf;
+    udplayer->port = port;
+    udplayer->deleteLinkList = UA_NULL;
+
+    UA_NetworkLayer nl;
+    nl.nlHandle = udplayer;
+    nl.start = (UA_StatusCode (*)(void*))NetworkLayerUDP_start;
+    nl.getWork = (UA_Int32 (*)(void*, UA_WorkItem**, UA_UInt16)) NetworkLayerUDP_getWork;
+    nl.stop = (UA_Int32 (*)(void*, UA_WorkItem**)) NetworkLayerUDP_stop;
+    nl.delete = (void (*)(void*))NetworkLayerUDP_delete;
+    return nl;
+}

+ 22 - 0
examples/networklayer_udp.h

@@ -0,0 +1,22 @@
+/*
+ * This work is licensed under a Creative Commons CCZero 1.0 Universal License.
+ * See http://creativecommons.org/publicdomain/zero/1.0/ for more information.
+ */
+
+#ifndef NETWORKLAYERUDP_H_
+#define NETWORKLAYERUDP_H_
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+#include "ua_server.h"
+
+/** @brief Create the UDP networklayer and listen to the specified port */
+UA_NetworkLayer NetworkLayerUDP_new(UA_ConnectionConfig conf, UA_UInt32 port);
+
+#ifdef __cplusplus
+} // extern "C"
+#endif
+
+#endif /* NETWORKLAYERUDP_H_ */

+ 8 - 0
examples/opcuaServer.c

@@ -15,6 +15,10 @@
 // provided by the user, implementations available in the /examples folder
 #include "logger_stdout.h"
 #include "networklayer_tcp.h"
+#ifdef EXTENSION_UDP
+#include "networklayer_udp.h"
+#endif
+
 
 UA_Boolean running = 1;
 
@@ -58,6 +62,10 @@ int main(int argc, char** argv) {
 	UA_Server *server = UA_Server_new();
     UA_Server_setServerCertificate(server, loadCertificate());
     UA_Server_addNetworkLayer(server, NetworkLayerTCP_new(UA_ConnectionConfig_standard, 16664));
+#ifdef EXTENSION_UDP
+    UA_Server_addNetworkLayer(server, NetworkLayerUDP_new(UA_ConnectionConfig_standard, 16664));
+#endif
+
 
     UA_WorkItem work = {.type = UA_WORKITEMTYPE_METHODCALL, .work.methodCall = {.method = testCallback, .data = UA_NULL} };
     UA_Server_addRepeatedWorkItem(server, &work, 20000000); // call every 2 sec