Kaynağa Gözat

Merge pull request #155 from acplt/udp

Merging nonstandard udp transport
Julius Pfrommer 10 yıl önce
ebeveyn
işleme
734016e9f2

+ 12 - 1
CMakeLists.txt

@@ -74,6 +74,14 @@ endif()
 
 ## extensions
 option(EXTENSION_STATELESS "Enable stateless extension" OFF)
+option(EXTENSION_UDP "Enable udp extension" OFF)
+
+if(EXTENSION_UDP)
+	set(EXTENSION_STATELESS ON)
+	message(STATUS "Extensions: enabling udp")
+	add_definitions(-DEXTENSION_UDP)
+endif()
+
 if(EXTENSION_STATELESS)
 	message(STATUS "Extensions: enabling stateless")
 	add_definitions(-DEXTENSION_STATELESS)
@@ -215,6 +223,9 @@ if(EXAMPLESERVER)
 set(server_sources examples/opcuaServer.c
                    examples/networklayer_tcp.c
                    examples/logger_stdout.c)
+if(EXTENSION_UDP)
+	list(APPEND server_sources examples/networklayer_udp.c)
+endif()
 add_executable(exampleServer ${server_sources} ${exported_headers} ${generated_headers})
 target_link_libraries(exampleServer open62541)
 if(WIN32)
@@ -244,4 +255,4 @@ if(GENERATE_DOCUMENTATION)
                       ${DOXYGEN_EXECUTABLE} ${CMAKE_CURRENT_BINARY_DIR}/Doxyfile
                       WORKING_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}
                       COMMENT "Generating API documentation with Doxygen")
-endif()
+endif()

+ 1 - 2
examples/networklayer_tcp.c

@@ -329,8 +329,7 @@ UA_Int32 NetworkLayerTCP_getWork(NetworkLayerTCP *layer, UA_WorkItem **workItems
 #else
         buf.length = read(layer->conLinks[i].sockfd, buf.data, layer->conf.recvBufferSize);
 #endif
-
-        if (errno != 0 || buf.length == 0) {
+        if (buf.length <= 0) {
             closeConnection(layer->conLinks[i].connection); // work is returned in the next iteration
         } else {
             items[j].type = UA_WORKITEMTYPE_BINARYNETWORKMESSAGE;

+ 273 - 0
examples/networklayer_udp.c

@@ -0,0 +1,273 @@
+ /*
+ * This work is licensed under a Creative Commons CCZero 1.0 Universal License.
+ * See http://creativecommons.org/publicdomain/zero/1.0/ for more information.
+ */
+
+#include <stdlib.h> // malloc, free
+#ifdef _WIN32
+#include <malloc.h>
+#include <winsock2.h>
+#include <sys/types.h>
+#include <Windows.h>
+#include <ws2tcpip.h>
+#define CLOSESOCKET(S) closesocket(S)
+#else
+#include <strings.h> //bzero
+#include <sys/select.h> 
+#include <netinet/in.h>
+#include <sys/socketvar.h>
+#include <sys/ioctl.h>
+#include <unistd.h> // read, write, close
+#include <arpa/inet.h>
+#define CLOSESOCKET(S) close(S)
+#endif
+
+#include <stdio.h>
+#include <errno.h> // errno, EINTR
+#include <fcntl.h> // fcntl
+
+#include "networklayer_udp.h" // UA_MULTITHREADING is defined in here
+
+#ifdef UA_MULTITHREADING
+#include <urcu/uatomic.h>
+#endif
+
+#define MAXBACKLOG 100
+
+struct Networklayer_UDP;
+
+/* Forwarded to the server as a (UA_Connection) and used for callbacks back into
+   the networklayer */
+typedef struct {
+	UA_Connection connection;
+	struct sockaddr from;
+	socklen_t fromlen;
+	struct NetworkLayerUDP *layer;
+} UDPConnection;
+
+typedef struct NetworkLayerUDP {
+	UA_ConnectionConfig conf;
+	fd_set fdset;
+	UA_Int32 serversockfd;
+    UA_UInt32 port;
+} NetworkLayerUDP;
+
+static UA_StatusCode setNonBlocking(int sockid) {
+#ifdef _WIN32
+	u_long iMode = 1;
+	if(ioctlsocket(sockid, FIONBIO, &iMode) != NO_ERROR)
+		return UA_STATUSCODE_BADINTERNALERROR;
+#else
+	int opts = fcntl(sockid,F_GETFL);
+	if(opts < 0 || fcntl(sockid,F_SETFL,opts|O_NONBLOCK) < 0)
+		return UA_STATUSCODE_BADINTERNALERROR;
+#endif
+	return UA_STATUSCODE_GOOD;
+}
+
+static void setFDSet(NetworkLayerUDP *layer) {
+	FD_ZERO(&layer->fdset);
+	FD_SET(layer->serversockfd, &layer->fdset);
+}
+
+// the callbacks are thread-safe if UA_MULTITHREADING is defined
+void closeConnectionUDP(UDPConnection *handle){
+	free(handle);
+}
+void writeCallbackUDP(UDPConnection *handle, UA_ByteStringArray gather_buf);
+
+/** Accesses only the sockfd in the handle. Can be run from parallel threads. */
+void writeCallbackUDP(UDPConnection *handle, UA_ByteStringArray gather_buf) {
+	UA_UInt32 total_len = 0, nWritten = 0;
+#ifdef _WIN32
+	LPWSABUF buf = _alloca(gather_buf.stringsSize * sizeof(WSABUF));
+	int result = 0;
+	for(UA_UInt32 i = 0; i<gather_buf.stringsSize; i++) {
+		buf[i].buf = gather_buf.strings[i].data;
+		buf[i].len = gather_buf.strings[i].length;
+		total_len += gather_buf.strings[i].length;
+	}
+	while(nWritten < total_len) {
+		UA_UInt32 n = 0;
+		do {
+			result = WSASendto(handle->sockfd, buf, gather_buf.stringsSize ,
+                             (LPDWORD)&n, 0, NULL, NULL);
+			//FIXME:
+			if(result != 0)
+				printf("Error WSASend, code: %d \n", WSAGetLastError());
+		} while(errno == EINTR);
+		nWritten += n;
+	}
+#else
+	struct iovec iov[gather_buf.stringsSize];
+	for(UA_UInt32 i=0;i<gather_buf.stringsSize;i++) {
+		iov[i] = (struct iovec) {.iov_base = gather_buf.strings[i].data,
+                                 .iov_len = gather_buf.strings[i].length};
+		total_len += gather_buf.strings[i].length;
+	}
+
+
+	struct sockaddr_in *sin = UA_NULL;
+	if (handle->from.sa_family == AF_INET)
+	{
+	    sin = (struct sockaddr_in *) &(handle->from);
+	}else{
+		//FIXME:
+		return;
+	}
+
+	struct msghdr message = {.msg_name = sin, .msg_namelen = handle->fromlen, .msg_iov = iov,
+							 .msg_iovlen = gather_buf.stringsSize, .msg_control = NULL,
+							 .msg_controllen = 0, .msg_flags = 0};
+	while (nWritten < total_len) {
+		UA_Int32 n = 0;
+		do {
+            n = sendmsg(handle->layer->serversockfd, &message, 0);
+            if(n==-1L){
+            	printf("ERROR:%i\n", errno);
+            }
+        } while (n == -1L && errno == EINTR);
+        nWritten += n;
+	}
+#endif
+}
+
+UA_StatusCode NetworkLayerUDP_start(NetworkLayerUDP *layer) {
+#ifdef _WIN32
+	WORD wVersionRequested;
+	WSADATA wsaData;
+	wVersionRequested = MAKEWORD(2, 2);
+	WSAStartup(wVersionRequested, &wsaData);
+	if((layer->serversockfd = socket(PF_INET, SOCK_DGRAM,0)) == INVALID_SOCKET) {
+		printf("ERROR opening socket, code: %d\n", WSAGetLastError());
+		return UA_STATUSCODE_BADINTERNALERROR;
+	}
+#else
+    if((layer->serversockfd = socket(PF_INET, SOCK_DGRAM, 0)) < 0) {
+		perror("ERROR opening socket");
+		return UA_STATUSCODE_BADINTERNALERROR;
+	} 
+#endif
+
+	const struct sockaddr_in serv_addr = {
+        .sin_family = AF_INET, .sin_addr.s_addr = INADDR_ANY,
+        .sin_port = htons(layer->port), .sin_zero = {0}};
+
+	int optval = 1;
+	if(setsockopt(layer->serversockfd, SOL_SOCKET,
+                  SO_REUSEADDR, (const char *)&optval,
+                  sizeof(optval)) == -1) {
+		perror("setsockopt");
+		CLOSESOCKET(layer->serversockfd);
+		return UA_STATUSCODE_BADINTERNALERROR;
+	}
+
+	if(bind(layer->serversockfd, (const struct sockaddr *)&serv_addr,
+            sizeof(serv_addr)) < 0) {
+		perror("binding");
+		CLOSESOCKET(layer->serversockfd);
+		return UA_STATUSCODE_BADINTERNALERROR;
+	}
+
+	setNonBlocking(layer->serversockfd);
+
+    printf("Listening for UDP connections on %s:%d\n",
+           inet_ntoa(serv_addr.sin_addr),
+           ntohs(serv_addr.sin_port));
+    return UA_STATUSCODE_GOOD;
+}
+
+UA_Int32 NetworkLayerUDP_getWork(NetworkLayerUDP *layer, UA_WorkItem **workItems,
+                                 UA_UInt16 timeout) {
+    UA_WorkItem *items = UA_NULL;
+    setFDSet(layer);
+    struct timeval tmptv = {0, timeout};
+    UA_Int32 resultsize = select(layer->serversockfd+1, &layer->fdset, NULL, NULL, &tmptv);
+
+    if(resultsize <= 0 || !FD_ISSET(layer->serversockfd, &layer->fdset)) {
+        *workItems = items;
+        return 0;
+    }
+
+    items = malloc(sizeof(UA_WorkItem)*(resultsize));
+
+	// read from established sockets
+    UA_Int32 j = 0;
+	UA_ByteString buf = { -1, NULL};
+		if(!buf.data) {
+			buf.data = malloc(sizeof(UA_Byte) * layer->conf.recvBufferSize);
+			if(!buf.data){
+				//TODO:
+				printf("malloc failed");
+			}
+		}
+
+		struct sockaddr sender;
+		socklen_t sendsize = sizeof(sender);
+		bzero(&sender, sizeof(sender));
+
+#ifdef _WIN32
+        buf.length = recvfrom(layer->conLinks[i].sockfd, (char *)buf.data,
+                          layer->conf.recvBufferSize, 0);
+        //todo: fixme
+#else
+        buf.length = recvfrom(layer->serversockfd, buf.data, layer->conf.recvBufferSize, 0, &sender, &sendsize);
+#endif
+
+        if (buf.length <= 0) {
+        } else {
+            UDPConnection *c = malloc(sizeof(UDPConnection));
+
+        	if(!c)
+        		return UA_STATUSCODE_BADINTERNALERROR;
+            c->layer = layer;
+            c->from = sender;
+            c->fromlen = sendsize;
+            c->connection.state = UA_CONNECTION_OPENING;
+            c->connection.localConf = layer->conf;
+            c->connection.channel = UA_NULL;
+            c->connection.close = (void (*)(void*))closeConnectionUDP;
+            c->connection.write = (void (*)(void*, UA_ByteStringArray))writeCallbackUDP;
+
+
+            items[j].type = UA_WORKITEMTYPE_BINARYNETWORKMESSAGE;
+            items[j].work.binaryNetworkMessage.message = buf;
+            items[j].work.binaryNetworkMessage.connection = (UA_Connection*)c;
+            buf.data = NULL;
+            j++;
+        }
+
+
+    if(buf.data)
+        free(buf.data);
+
+    if(j == 0) {
+        free(items);
+        *workItems = NULL;
+    } else
+        *workItems = items;
+    return j;
+}
+
+UA_Int32 NetworkLayerUDP_stop(NetworkLayerUDP * layer, UA_WorkItem **workItems) {
+	CLOSESOCKET(layer->serversockfd);
+	return 0;
+}
+
+void NetworkLayerUDP_delete(NetworkLayerUDP *layer) {
+	free(layer);
+}
+
+UA_NetworkLayer NetworkLayerUDP_new(UA_ConnectionConfig conf, UA_UInt32 port) {
+    NetworkLayerUDP *udplayer = malloc(sizeof(NetworkLayerUDP));
+	udplayer->conf = conf;
+    udplayer->port = port;
+
+    UA_NetworkLayer nl;
+    nl.nlHandle = udplayer;
+    nl.start = (UA_StatusCode (*)(void*))NetworkLayerUDP_start;
+    nl.getWork = (UA_Int32 (*)(void*, UA_WorkItem**, UA_UInt16)) NetworkLayerUDP_getWork;
+    nl.stop = (UA_Int32 (*)(void*, UA_WorkItem**)) NetworkLayerUDP_stop;
+    nl.delete = (void (*)(void*))NetworkLayerUDP_delete;
+    return nl;
+}

+ 22 - 0
examples/networklayer_udp.h

@@ -0,0 +1,22 @@
+/*
+ * This work is licensed under a Creative Commons CCZero 1.0 Universal License.
+ * See http://creativecommons.org/publicdomain/zero/1.0/ for more information.
+ */
+
+#ifndef NETWORKLAYERUDP_H_
+#define NETWORKLAYERUDP_H_
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+#include "ua_server.h"
+
+/** @brief Create the UDP networklayer and listen to the specified port */
+UA_NetworkLayer NetworkLayerUDP_new(UA_ConnectionConfig conf, UA_UInt32 port);
+
+#ifdef __cplusplus
+} // extern "C"
+#endif
+
+#endif /* NETWORKLAYERUDP_H_ */

+ 19 - 4
examples/opcuaClient.c

@@ -277,14 +277,18 @@ UA_Int64 sendReadRequest(ConnectionInfo *connectionInfo, UA_Int32 nodeIds_size,U
 	return tic;
 }
 
-int ua_client_connectUA(char* ipaddress,int port, UA_String *endpointUrl, ConnectionInfo *connectionInfo, UA_Boolean stateless)
+int ua_client_connectUA(char* ipaddress,int port, UA_String *endpointUrl, ConnectionInfo *connectionInfo, UA_Boolean stateless, UA_Boolean udp)
 {
 	UA_ByteString reply;
 	UA_ByteString_newMembers(&reply, 65536);
 	int sock;
 	struct sockaddr_in server;
 	//Create socket
-	sock = socket(AF_INET, SOCK_STREAM, 0);
+	if(udp==UA_TRUE){
+		sock = socket(AF_INET, SOCK_DGRAM, 0);
+	}else{
+		sock = socket(AF_INET, SOCK_STREAM, 0);
+	}
 	if(sock == -1) {
 		printf("Could not create socket");
         return 1;
@@ -359,6 +363,7 @@ int main(int argc, char *argv[]) {
 		printf("5th parameter: ip adress \n");
 		printf("6th parameter: port \n");
 		printf("7th parameter: 0=stateful, 1=stateless\n");
+		printf("8th parameter: 0=tcp, 1=udp (only with stateless calls)\n");
 		printf("\nUsing default parameters. \n");
 	}
 
@@ -368,6 +373,7 @@ int main(int argc, char *argv[]) {
 	UA_ByteString reply;
 	UA_ByteString_newMembers(&reply, 65536);
 	UA_Boolean stateless;
+	UA_Boolean udp;
 
 	if(defaultParams)
 		nodesToReadSize = 1;
@@ -397,6 +403,15 @@ int main(int argc, char *argv[]) {
 			stateless = UA_FALSE;
 	}
 
+	if(defaultParams){
+		udp = UA_FALSE;
+	}else{
+		if(atoi(argv[8]) != 0)
+			udp = UA_TRUE;
+		else
+			udp = UA_FALSE;
+	}
+
 
 
     //Connect to remote server
@@ -428,11 +443,11 @@ int main(int argc, char *argv[]) {
 		//if(stateless || (!stateless && i==0)){
 		tic = UA_DateTime_now();
 			if(defaultParams){
-				if(ua_client_connectUA("127.0.0.1",atoi("16664"),&endpoint,&connectionInfo,stateless) != 0){
+				if(ua_client_connectUA("127.0.0.1",atoi("16664"),&endpoint,&connectionInfo,stateless,udp) != 0){
 					return 0;
 				}
 			}else{
-				if(ua_client_connectUA(argv[5],atoi(argv[6]),&endpoint,&connectionInfo,stateless) != 0){
+				if(ua_client_connectUA(argv[5],atoi(argv[6]),&endpoint,&connectionInfo,stateless,udp) != 0){
 					return 0;
 				}
 			}

+ 9 - 0
examples/opcuaServer.c

@@ -15,6 +15,10 @@
 // provided by the user, implementations available in the /examples folder
 #include "logger_stdout.h"
 #include "networklayer_tcp.h"
+#ifdef EXTENSION_UDP
+#include "networklayer_udp.h"
+#endif
+
 
 UA_Boolean running = 1;
 
@@ -57,7 +61,12 @@ int main(int argc, char** argv) {
 
 	UA_Server *server = UA_Server_new();
     UA_Server_setServerCertificate(server, loadCertificate());
+#ifdef EXTENSION_UDP
+    UA_Server_addNetworkLayer(server, NetworkLayerUDP_new(UA_ConnectionConfig_standard, 16664));
+#else
     UA_Server_addNetworkLayer(server, NetworkLayerTCP_new(UA_ConnectionConfig_standard, 16664));
+#endif
+
 
     UA_WorkItem work = {.type = UA_WORKITEMTYPE_METHODCALL, .work.methodCall = {.method = testCallback, .data = UA_NULL} };
     UA_Server_addRepeatedWorkItem(server, &work, 20000000); // call every 2 sec

+ 4 - 0
examples/statelessClient.c

@@ -25,7 +25,11 @@ int main(int argc , char *argv[])
 	unsigned int messagepos = 0;
 
 	//Create socket
+#ifdef EXTENSION_UDP
+	sock = socket(AF_INET , SOCK_DGRAM , 0);
+#else
 	sock = socket(AF_INET , SOCK_STREAM , 0);
+#endif
 	if (sock == -1)
 	{
 		printf("Could not create socket");

+ 1 - 0
src/server/ua_server_binary.c

@@ -54,6 +54,7 @@ static void processHEL(UA_Connection *connection, const UA_ByteString *msg, UA_U
     UA_TcpMessageHeader_encodeBinary(&ackHeader, &ack_msg, &tmpPos);
     UA_TcpAcknowledgeMessage_encodeBinary(&ackMessage, &ack_msg, &tmpPos);
     UA_ByteStringArray answer_buf = { .stringsSize = 1, .strings = &ack_msg };
+    // the string is freed internall in the (asynchronous) write
     connection->write(connection, answer_buf);
     UA_TcpHelloMessage_deleteMembers(&helloMessage);
 }

+ 8 - 5
src/server/ua_server_worker.c

@@ -248,8 +248,10 @@ static UA_UInt16 processTimedWork(UA_Server *server) {
 
     tw = LIST_FIRST(&server->timedWork);
     UA_UInt16 timeout = MAXTIMEOUT;
-    if(tw)
+    if(tw){
         timeout = (tw->time - current)/10;
+        if(timeout>MAXTIMEOUT)timeout = MAXTIMEOUT;
+    }
     return timeout;
 }
 
@@ -411,10 +413,10 @@ UA_StatusCode UA_Server_run(UA_Server *server, UA_UInt16 nThreads, UA_Boolean *r
             UA_WorkItem *work;
             UA_Int32 workSize;
             if(*running) {
-                if(i == server->nlsSize-1)
-                    workSize = nl->getWork(nl->nlHandle, &work, timeout);
-                else
-                    workSize = nl->getWork(nl->nlHandle, &work, 0);
+            	if(i == server->nlsSize-1)
+            		workSize = nl->getWork(nl->nlHandle, &work, timeout);
+            	else
+            		workSize = nl->getWork(nl->nlHandle, &work, 0);
             } else {
                 workSize = server->nls[i].stop(nl->nlHandle, &work);
             }
@@ -434,6 +436,7 @@ UA_StatusCode UA_Server_run(UA_Server *server, UA_UInt16 nThreads, UA_Boolean *r
 #endif
         }
 
+
         // 3.3) Exit?
         if(!*running)
             break;