Explorar o código

refactored the networklayer interface

We make a difference between messages whose buffer was allocated by the networklayer and those who got copied on the heap.
This makes network layers with ringbuffers possible.

The send function of UA_Connection now deletes the message buffer internally, also when sending fails.
We always deleted the buffer anyway.

getSendBuffer takes a length argument. So we can save some RAM when the exact length is known
Julius Pfrommer %!s(int64=9) %!d(string=hai) anos
pai
achega
67cf0af100

+ 1 - 0
CMakeLists.txt

@@ -82,6 +82,7 @@ set(exported_headers ${PROJECT_BINARY_DIR}/src_generated/ua_config.h
                      ${PROJECT_SOURCE_DIR}/include/ua_types.h
                      ${PROJECT_BINARY_DIR}/src_generated/ua_nodeids.h
                      ${PROJECT_BINARY_DIR}/src_generated/ua_types_generated.h
+                     ${PROJECT_SOURCE_DIR}/include/ua_job.h
                      ${PROJECT_SOURCE_DIR}/include/ua_connection.h
                      ${PROJECT_SOURCE_DIR}/include/ua_log.h
                      ${PROJECT_SOURCE_DIR}/include/ua_server.h

+ 82 - 83
examples/networklayer_tcp.c

@@ -49,23 +49,30 @@ static void socket_close(UA_Connection *connection) {
     CLOSESOCKET(connection->sockfd);
 }
 
-static UA_StatusCode socket_write(UA_Connection *connection, UA_ByteString *buf, size_t buflen) {
+static UA_StatusCode socket_write(UA_Connection *connection, UA_ByteString *buf) {
     size_t nWritten = 0;
-    while (nWritten < buflen) {
+    while(buf->length > 0 && nWritten < (size_t)buf->length) {
         UA_Int32 n = 0;
         do {
 #ifdef _WIN32
-            n = send((SOCKET)connection->sockfd, (const char*)buf->data, buflen, 0);
+            n = send((SOCKET)connection->sockfd, (const char*)buf->data, (size_t)buf->length, 0);
             const int last_error = WSAGetLastError();
-            if(n < 0 && last_error != WSAEINTR && last_error != WSAEWOULDBLOCK){
+            if(n < 0 && last_error != WSAEINTR && last_error != WSAEWOULDBLOCK) {
                 connection->close(connection);
                 socket_close(connection);
+#ifdef UA_MULTITHREADING
+                UA_ByteString_deleteMembers(buf);
+#endif
                 return UA_STATUSCODE_BADCONNECTIONCLOSED;
             }
 #else
-            n = send(connection->sockfd, (const char*)buf->data, buflen, MSG_NOSIGNAL);
-            if(n == -1L && errno != EINTR && errno != EAGAIN){
+            n = send(connection->sockfd, (const char*)buf->data, (size_t)buf->length, MSG_NOSIGNAL);
+            if(n == -1L && errno != EINTR && errno != EAGAIN) {
+                connection->close(connection);
                 socket_close(connection);
+#ifdef UA_MULTITHREADING
+                UA_ByteString_deleteMembers(buf);
+#endif
                 return UA_STATUSCODE_BADCONNECTIONCLOSED;
             }
 #endif
@@ -81,16 +88,17 @@ static UA_StatusCode socket_write(UA_Connection *connection, UA_ByteString *buf,
 static UA_StatusCode socket_recv(UA_Connection *connection, UA_ByteString *response, UA_UInt32 timeout) {
     response->data = malloc(connection->localConf.recvBufferSize);
     if(!response->data) {
-        UA_ByteString_init(response);
+        response->length = -1;
         return UA_STATUSCODE_GOOD; /* not enough memory retry */
     }
+
     struct timeval tmptv = {0, timeout * 1000};
-    if(0 != setsockopt(connection->sockfd, SOL_SOCKET, SO_RCVTIMEO, (char *)&tmptv, sizeof(struct timeval))){
-		free(response->data);
-        UA_ByteString_init(response);
+    if(0 != setsockopt(connection->sockfd, SOL_SOCKET, SO_RCVTIMEO, (char *)&tmptv, sizeof(struct timeval))) {
+        UA_ByteString_deleteMembers(response);
         socket_close(connection);
         return UA_STATUSCODE_BADINTERNALERROR;
     }
+
     int ret = recv(connection->sockfd, (char*)response->data, connection->localConf.recvBufferSize, 0);
 	if(ret == 0) {
 		free(response->data);
@@ -102,18 +110,17 @@ static UA_StatusCode socket_recv(UA_Connection *connection, UA_ByteString *respo
         UA_ByteString_init(response);
 #ifdef _WIN32
         const int last_error = WSAGetLastError();
-        if(last_error == WSAEINTR || last_error == WSAEWOULDBLOCK) {
+        if(last_error == WSAEINTR || last_error == WSAEWOULDBLOCK)
 #else
-		if(errno == EINTR || errno == EAGAIN || errno == EWOULDBLOCK) {
+		if(errno == EINTR || errno == EAGAIN || errno == EWOULDBLOCK)
 #endif
             return UA_STATUSCODE_GOOD; /* retry */
-        } else {
+        else {
             socket_close(connection);
             return UA_STATUSCODE_BADCONNECTIONCLOSED;
         }
     }
     response->length = ret;
-    *response = UA_Connection_completeMessages(connection, *response);
     return UA_STATUSCODE_GOOD;
 }
 
@@ -172,8 +179,9 @@ static void FreeConnectionCallback(UA_Server *server, void *ptr) {
 #define MAXBACKLOG 100
 
 typedef struct {
+    UA_ServerNetworkLayer layer;
+    
     /* config */
-    UA_Logger *logger;
     UA_UInt32 port;
     UA_ConnectionConfig conf; /* todo: rename to localconf. */
 
@@ -192,9 +200,11 @@ typedef struct {
     } *mappings;
 } ServerNetworkLayerTCP;
 
-static UA_StatusCode ServerNetworkLayerGetBuffer(UA_Connection *connection, UA_ByteString *buf) {
+static UA_StatusCode ServerNetworkLayerGetSendBuffer(UA_Connection *connection, UA_Int32 length, UA_ByteString *buf) {
+    if((UA_UInt32)length > connection->remoteConf.recvBufferSize)
+        return UA_STATUSCODE_BADCOMMUNICATIONERROR;
 #ifdef UA_MULTITHREADING
-    return UA_ByteString_newMembers(buf, connection->remoteConf.recvBufferSize);
+    return UA_ByteString_newMembers(buf, length);
 #else
     ServerNetworkLayerTCP *layer = connection->handle;
     *buf = layer->buffer;
@@ -202,27 +212,23 @@ static UA_StatusCode ServerNetworkLayerGetBuffer(UA_Connection *connection, UA_B
 #endif
 }
 
-static void ServerNetworkLayerReleaseBuffer(UA_Connection *connection, UA_ByteString *buf) {
+static void ServerNetworkLayerReleaseSendBuffer(UA_Connection *connection, UA_ByteString *buf) {
 #ifdef UA_MULTITHREADING
     UA_ByteString_deleteMembers(buf);
 #endif
 }
 
+static void ServerNetworkLayerReleaseRecvBuffer(UA_Connection *connection, UA_ByteString *buf) {
+    UA_ByteString_deleteMembers(buf);
+}
+
 /* after every select, we need to reset the sockets we want to listen on */
 static void setFDSet(ServerNetworkLayerTCP *layer) {
     FD_ZERO(&layer->fdset);
-#ifdef _WIN32
     FD_SET((UA_UInt32)layer->serversockfd, &layer->fdset);
-#else
-    FD_SET(layer->serversockfd, &layer->fdset);
-#endif
     layer->highestfd = layer->serversockfd;
     for(size_t i = 0; i < layer->mappingsSize; i++) {
-#ifdef _WIN32
         FD_SET((UA_UInt32)layer->mappings[i].sockfd, &layer->fdset);
-#else
-        FD_SET(layer->mappings[i].sockfd, &layer->fdset);
-#endif
         if(layer->mappings[i].sockfd > layer->highestfd)
             layer->highestfd = layer->mappings[i].sockfd;
     }
@@ -251,13 +257,14 @@ static UA_StatusCode ServerNetworkLayerTCP_add(ServerNetworkLayerTCP *layer, UA_
     c->sockfd = newsockfd;
     c->handle = layer;
     c->localConf = layer->conf;
-    c->write = socket_write;
+    c->send = socket_write;
     c->close = ServerNetworkLayerTCP_closeConnection;
-    c->getBuffer = ServerNetworkLayerGetBuffer;
-    c->releaseBuffer = ServerNetworkLayerReleaseBuffer;
+    c->getSendBuffer = ServerNetworkLayerGetSendBuffer;
+    c->releaseSendBuffer = ServerNetworkLayerReleaseSendBuffer;
+    c->releaseRecvBuffer = ServerNetworkLayerReleaseRecvBuffer;
     c->state = UA_CONNECTION_OPENING;
-    struct ConnectionMapping *nm =
-        realloc(layer->mappings, sizeof(struct ConnectionMapping)*(layer->mappingsSize+1));
+    struct ConnectionMapping *nm;
+    nm = realloc(layer->mappings, sizeof(struct ConnectionMapping)*(layer->mappingsSize+1));
     if(!nm) {
         free(c);
         return UA_STATUSCODE_BADINTERNALERROR;
@@ -268,18 +275,17 @@ static UA_StatusCode ServerNetworkLayerTCP_add(ServerNetworkLayerTCP *layer, UA_
     return UA_STATUSCODE_GOOD;
 }
 
-static UA_StatusCode ServerNetworkLayerTCP_start(UA_ServerNetworkLayer *nl, UA_Logger *logger) {
-    ServerNetworkLayerTCP *layer = nl->handle;
-    layer->logger = logger;
+static UA_StatusCode ServerNetworkLayerTCP_start(ServerNetworkLayerTCP *layer, UA_Logger logger) {
+    layer->layer.logger = logger;
 #ifdef _WIN32
     if((layer->serversockfd = socket(PF_INET, SOCK_STREAM,0)) == (UA_Int32)INVALID_SOCKET) {
-        UA_LOG_WARNING((*layer->logger), UA_LOGCATEGORY_COMMUNICATION, "Error opening socket, code: %d",
+        UA_LOG_WARNING(layer->layer.logger, UA_LOGCATEGORY_COMMUNICATION, "Error opening socket, code: %d",
                        WSAGetLastError());
         return UA_STATUSCODE_BADINTERNALERROR;
     }
 #else
     if((layer->serversockfd = socket(PF_INET, SOCK_STREAM, 0)) < 0) {
-        UA_LOG_WARNING((*layer->logger), UA_LOGCATEGORY_COMMUNICATION, "Error opening socket");
+        UA_LOG_WARNING(layer->layer.logger, UA_LOGCATEGORY_COMMUNICATION, "Error opening socket");
         return UA_STATUSCODE_BADINTERNALERROR;
     }
 #endif
@@ -290,25 +296,25 @@ static UA_StatusCode ServerNetworkLayerTCP_start(UA_ServerNetworkLayer *nl, UA_L
     if(setsockopt(layer->serversockfd, SOL_SOCKET,
                   SO_REUSEADDR, (const char *)&optval,
                   sizeof(optval)) == -1) {
-        UA_LOG_WARNING((*layer->logger), UA_LOGCATEGORY_COMMUNICATION, "Error during setting of socket options");
+        UA_LOG_WARNING(layer->layer.logger, UA_LOGCATEGORY_COMMUNICATION,
+                       "Error during setting of socket options");
         CLOSESOCKET(layer->serversockfd);
         return UA_STATUSCODE_BADINTERNALERROR;
     }
     if(bind(layer->serversockfd, (const struct sockaddr *)&serv_addr,
             sizeof(serv_addr)) < 0) {
-        UA_LOG_WARNING((*layer->logger), UA_LOGCATEGORY_COMMUNICATION, "Error during socket binding");
+        UA_LOG_WARNING(layer->layer.logger, UA_LOGCATEGORY_COMMUNICATION, "Error during socket binding");
         CLOSESOCKET(layer->serversockfd);
         return UA_STATUSCODE_BADINTERNALERROR;
     }
     socket_set_nonblocking(layer->serversockfd);
     listen(layer->serversockfd, MAXBACKLOG);
-    UA_LOG_INFO((*layer->logger), UA_LOGCATEGORY_COMMUNICATION, "Listening on %.*s",
-                nl->discoveryUrl.length, nl->discoveryUrl.data);
+    UA_LOG_INFO(layer->layer.logger, UA_LOGCATEGORY_COMMUNICATION, "Listening on %.*s",
+                layer->layer.discoveryUrl.length, layer->layer.discoveryUrl.data);
     return UA_STATUSCODE_GOOD;
 }
 
-static UA_Int32 ServerNetworkLayerTCP_getJobs(UA_ServerNetworkLayer *nl, UA_Job **jobs, UA_UInt16 timeout) {
-    ServerNetworkLayerTCP *layer = nl->handle;
+static size_t ServerNetworkLayerTCP_getJobs(ServerNetworkLayerTCP *layer, UA_Job **jobs, UA_UInt16 timeout) {
     setFDSet(layer);
     struct timeval tmptv = {0, timeout};
     UA_Int32 resultsize;
@@ -318,7 +324,7 @@ static UA_Int32 ServerNetworkLayerTCP_getJobs(UA_ServerNetworkLayer *nl, UA_Job
         if(errno == EINTR)
             goto repeat_select;
         *jobs = NULL;
-        return resultsize;
+        return 0;
     }
 
     /* accept new connections (can only be a single one) */
@@ -343,17 +349,15 @@ static UA_Int32 ServerNetworkLayerTCP_getJobs(UA_ServerNetworkLayer *nl, UA_Job
         return 0;
 
     /* read from established sockets */
-    UA_Int32 j = 0;
+    size_t j = 0;
     UA_ByteString buf = UA_BYTESTRING_NULL;
-    for(size_t i = 0; i < layer->mappingsSize && j < resultsize; i++) {
+    for(size_t i = 0; i < layer->mappingsSize && j < (size_t)resultsize; i++) {
         if(!(FD_ISSET(layer->mappings[i].sockfd, &layer->fdset)))
             continue;
         if(socket_recv(layer->mappings[i].connection, &buf, 0) == UA_STATUSCODE_GOOD) {
             if(!buf.data)
                 continue;
-            js[j].type = UA_JOBTYPE_BINARYMESSAGE;
-            js[j].job.binaryMessage.message = buf;
-            js[j].job.binaryMessage.connection = layer->mappings[i].connection;
+            js[j] = UA_Connection_completeMessages(layer->mappings[i].connection, buf);
         } else {
             UA_Connection *c = layer->mappings[i].connection;
             /* the socket is already closed */
@@ -363,15 +367,14 @@ static UA_Int32 ServerNetworkLayerTCP_getJobs(UA_ServerNetworkLayer *nl, UA_Job
             layer->mappingsSize--;
             j++;
             i--; // iterate over the same index again
-            js[j].type = UA_JOBTYPE_DELAYEDMETHODCALL;
+            js[j].type = UA_JOBTYPE_METHODCALL_DELAYED;
             js[j].job.methodCall.method = FreeConnectionCallback;
             js[j].job.methodCall.data = c;
         }
         j++;
     }
 
-    if (j == 0)
-    {
+    if(j == 0) {
     	free(js);
     	js = NULL;
     }
@@ -380,8 +383,7 @@ static UA_Int32 ServerNetworkLayerTCP_getJobs(UA_ServerNetworkLayer *nl, UA_Job
     return j;
 }
 
-static UA_Int32 ServerNetworkLayerTCP_stop(UA_ServerNetworkLayer *nl, UA_Job **jobs) {
-    ServerNetworkLayerTCP *layer = nl->handle;
+static size_t ServerNetworkLayerTCP_stop(ServerNetworkLayerTCP *layer, UA_Job **jobs) {
     UA_Job *items = malloc(sizeof(UA_Job) * layer->mappingsSize * 2);
     if(!items)
         return 0;
@@ -389,7 +391,7 @@ static UA_Int32 ServerNetworkLayerTCP_stop(UA_ServerNetworkLayer *nl, UA_Job **j
         socket_close(layer->mappings[i].connection);
         items[i*2].type = UA_JOBTYPE_DETACHCONNECTION;
         items[i*2].job.closeConnection = layer->mappings[i].connection;
-        items[(i*2)+1].type = UA_JOBTYPE_DELAYEDMETHODCALL;
+        items[(i*2)+1].type = UA_JOBTYPE_METHODCALL_DELAYED;
         items[(i*2)+1].job.methodCall.method = FreeConnectionCallback;
         items[(i*2)+1].job.methodCall.data = layer->mappings[i].connection;
     }
@@ -401,56 +403,52 @@ static UA_Int32 ServerNetworkLayerTCP_stop(UA_ServerNetworkLayer *nl, UA_Job **j
 }
 
 /* run only when the server is stopped */
-static void ServerNetworkLayerTCP_deleteMembers(UA_ServerNetworkLayer *nl) {
-    ServerNetworkLayerTCP *layer = nl->handle;
+static void ServerNetworkLayerTCP_deleteMembers(ServerNetworkLayerTCP *layer) {
 #ifndef UA_MULTITHREADING
     UA_ByteString_deleteMembers(&layer->buffer);
 #endif
     for(size_t i = 0; i < layer->mappingsSize; i++)
         free(layer->mappings[i].connection);
     free(layer->mappings);
-    free(layer);
 }
 
-UA_ServerNetworkLayer ServerNetworkLayerTCP_new(UA_ConnectionConfig conf, UA_UInt32 port) {
+UA_ServerNetworkLayer * ServerNetworkLayerTCP_new(UA_ConnectionConfig conf, UA_UInt32 port) {
 #ifdef _WIN32
     WORD wVersionRequested;
     WSADATA wsaData;
     wVersionRequested = MAKEWORD(2, 2);
     WSAStartup(wVersionRequested, &wsaData);
 #endif
-    UA_ServerNetworkLayer nl;
-    memset(&nl, 0, sizeof(UA_ServerNetworkLayer));
-
     ServerNetworkLayerTCP *layer = malloc(sizeof(ServerNetworkLayerTCP));
-    if(!layer){
-        return nl;
-    }
+    if(!layer)
+        return NULL;
+    memset(layer, 0, sizeof(ServerNetworkLayerTCP));
     layer->conf = conf;
     layer->mappingsSize = 0;
     layer->mappings = NULL;
     layer->port = port;
     char hostname[256];
     gethostname(hostname, 255);
-    UA_String_copyprintf("opc.tcp://%s:%d", &nl.discoveryUrl, hostname, port);
+    UA_String_copyprintf("opc.tcp://%s:%d", &layer->layer.discoveryUrl, hostname, port);
 
 #ifndef UA_MULTITHREADING
     layer->buffer = (UA_ByteString){.length = conf.maxMessageSize, .data = malloc(conf.maxMessageSize)};
 #endif
 
-    nl.handle = layer;
-    nl.start = ServerNetworkLayerTCP_start;
-    nl.getJobs = ServerNetworkLayerTCP_getJobs;
-    nl.stop = ServerNetworkLayerTCP_stop;
-    nl.deleteMembers = ServerNetworkLayerTCP_deleteMembers;
-    return nl;
+    layer->layer.start = (UA_StatusCode(*)(UA_ServerNetworkLayer*,UA_Logger))ServerNetworkLayerTCP_start;
+    layer->layer.getJobs = (size_t(*)(UA_ServerNetworkLayer*,UA_Job**,UA_UInt16))ServerNetworkLayerTCP_getJobs;
+    layer->layer.stop = (size_t(*)(UA_ServerNetworkLayer*, UA_Job**))ServerNetworkLayerTCP_stop;
+    layer->layer.deleteMembers = (void(*)(UA_ServerNetworkLayer*))ServerNetworkLayerTCP_deleteMembers;
+    return &layer->layer;
 }
 
 /***************************/
 /* Client NetworkLayer TCP */
 /***************************/
 
-static UA_StatusCode ClientNetworkLayerGetBuffer(UA_Connection *connection, UA_ByteString *buf) {
+static UA_StatusCode ClientNetworkLayerGetBuffer(UA_Connection *connection, UA_Int32 length, UA_ByteString *buf) {
+    if((UA_UInt32)length > connection->remoteConf.recvBufferSize)
+        return UA_STATUSCODE_BADCOMMUNICATIONERROR;
 #ifndef UA_MULTITHREADING
     if(connection->state == UA_CONNECTION_CLOSED)
         return UA_STATUSCODE_BADCONNECTIONCLOSED;
@@ -480,12 +478,11 @@ static void ClientNetworkLayerClose(UA_Connection *connection) {
 }
 
 /* we have no networklayer. instead, attach the reusable buffer to the handle */
-UA_Connection ClientNetworkLayerTCP_connect(UA_ConnectionConfig localConf, char *endpointUrl,
-                                            UA_Logger *logger) {
+UA_Connection
+ClientNetworkLayerTCP_connect(UA_ConnectionConfig localConf, char *endpointUrl, UA_Logger logger) {
     UA_Connection connection;
     UA_Connection_init(&connection);
     connection.localConf = localConf;
-
 #ifndef UA_MULTITHREADING
     connection.handle = UA_ByteString_new();
     UA_ByteString_newMembers(connection.handle, localConf.maxMessageSize);
@@ -502,8 +499,8 @@ UA_Connection ClientNetworkLayerTCP_connect(UA_ConnectionConfig localConf, char
     }
 
     UA_UInt16 portpos = 9;
-    UA_UInt16 port = 0;
-    for(;portpos < urlLength-1; portpos++) {
+    UA_UInt16 port;
+    for(port = 0; portpos < urlLength-1; portpos++) {
         if(endpointUrl[portpos] == ':') {
             port = atoi(&endpointUrl[portpos+1]);
             break;
@@ -531,7 +528,7 @@ UA_Connection ClientNetworkLayerTCP_connect(UA_ConnectionConfig localConf, char
         return connection;
     }
     struct hostent *server = gethostbyname(hostname);
-    if(server == NULL) {
+    if(!server) {
         UA_LOG_WARNING((*logger), UA_LOGCATEGORY_COMMUNICATION, "DNS lookup of %s failed", hostname);
         return connection;
     }
@@ -549,16 +546,18 @@ UA_Connection ClientNetworkLayerTCP_connect(UA_ConnectionConfig localConf, char
 
 #ifdef SO_NOSIGPIPE
     int val = 1;
-    if (setsockopt(connection.sockfd, SOL_SOCKET, SO_NOSIGPIPE, (void*)&val, sizeof(val)) < 0) {
-    UA_LOG_WARNING((*logger), UA_LOGCATEGORY_COMMUNICATION, "Couldn't set SO_NOSIGPIPE");
+    if(setsockopt(connection.sockfd, SOL_SOCKET, SO_NOSIGPIPE, (void*)&val, sizeof(val)) < 0) {
+        UA_LOG_WARNING((*logger), UA_LOGCATEGORY_COMMUNICATION, "Couldn't set SO_NOSIGPIPE");
+        return connection;
     }
 #endif
 
     //socket_set_nonblocking(connection.sockfd);
-    connection.write = socket_write;
+    connection.send = socket_write;
     connection.recv = socket_recv;
     connection.close = ClientNetworkLayerClose;
-    connection.getBuffer = ClientNetworkLayerGetBuffer;
-    connection.releaseBuffer = ClientNetworkLayerReleaseBuffer;
+    connection.getSendBuffer = ClientNetworkLayerGetBuffer;
+    connection.releaseSendBuffer = ClientNetworkLayerReleaseBuffer;
+    connection.releaseRecvBuffer = ClientNetworkLayerReleaseBuffer;
     return connection;
 }

+ 4 - 2
examples/networklayer_tcp.h

@@ -14,8 +14,10 @@ extern "C" {
 #include "ua_client.h"
 
 /** @brief Create the TCP networklayer and listen to the specified port */
-UA_EXPORT UA_ServerNetworkLayer ServerNetworkLayerTCP_new(UA_ConnectionConfig conf, UA_UInt32 port);
-UA_EXPORT UA_Connection ClientNetworkLayerTCP_connect(UA_ConnectionConfig conf, char *endpointUrl, UA_Logger *logger);
+UA_ServerNetworkLayer UA_EXPORT * ServerNetworkLayerTCP_new(UA_ConnectionConfig conf, UA_UInt32 port);
+
+UA_Connection UA_EXPORT
+ClientNetworkLayerTCP_connect(UA_ConnectionConfig conf, char *endpointUrl, UA_Logger logger);
 
 #ifdef __cplusplus
 } // extern "C"

+ 92 - 95
examples/networklayer_udp.c

@@ -32,17 +32,17 @@
 #define MAXBACKLOG 100
 
 #ifdef _WIN32
-# error fixme: udp not yet implemented for windows
+# error udp not yet implemented for windows
 #endif
 
 /*****************************/
 /* Generic Buffer Management */
 /*****************************/
 
-static UA_StatusCode GetMallocedBuffer(UA_Connection *connection, UA_ByteString *buf, size_t minSize) {
-    if(minSize > connection->remoteConf.recvBufferSize)
-        return UA_STATUSCODE_BADINTERNALERROR;
-    return UA_ByteString_newMembers(buf, minSize);
+static UA_StatusCode GetMallocedBuffer(UA_Connection *connection, UA_Int32 length, UA_ByteString *buf) {
+    if((UA_UInt32)length > connection->remoteConf.recvBufferSize)
+        return UA_STATUSCODE_BADCOMMUNICATIONERROR;
+    return UA_ByteString_newMembers(buf, connection->remoteConf.recvBufferSize);
 }
 
 static void ReleaseMallocedBuffer(UA_Connection *connection, UA_ByteString *buf) {
@@ -62,50 +62,46 @@ typedef struct {
 } UDPConnection;
 
 typedef struct {
-    UA_Server *server;
+    UA_ServerNetworkLayer layer;
 	UA_ConnectionConfig conf;
 	fd_set fdset;
 	UA_Int32 serversockfd;
     UA_UInt32 port;
-    UA_Logger *logger;
 } ServerNetworkLayerUDP;
 
 /** Accesses only the sockfd in the handle. Can be run from parallel threads. */
-static void writeCallbackUDP(UDPConnection *handle, UA_ByteStringArray gather_buf) {
-	UA_UInt32 total_len = 0, nWritten = 0;
-	struct iovec iov[gather_buf.stringsSize];
-	for(UA_UInt32 i=0;i<gather_buf.stringsSize;i++) {
-		iov[i] = (struct iovec) {.iov_base = gather_buf.strings[i].data,
-                                 .iov_len = gather_buf.strings[i].length};
-		total_len += gather_buf.strings[i].length;
-	}
+static UA_StatusCode sendUDP(UA_Connection *connection, UA_ByteString *buf) {
+    UDPConnection *udpc = (UDPConnection*)connection;
+    ServerNetworkLayerUDP *layer = (ServerNetworkLayerUDP*)connection->handle;
+	size_t nWritten = 0;
 	struct sockaddr_in *sin = NULL;
-	if (handle->from.sa_family == AF_INET) {
+
+	if (udpc->from.sa_family == AF_INET) {
 #if ((__GNUC__ == 4 && __GNUC_MINOR__ >= 6) || __GNUC__ > 4 || defined(__clang__))
 #pragma GCC diagnostic push
 #pragma GCC diagnostic ignored "-Wcast-align"
 #endif
-	    sin = (struct sockaddr_in *) &(handle->from);
+	    sin = (struct sockaddr_in *) &udpc->from;
 #if ((__GNUC__ == 4 && __GNUC_MINOR__ >= 6) || __GNUC__ > 4 || defined(__clang__))
 #pragma GCC diagnostic pop
 #endif
 	} else {
-		//FIXME:
-		return;
-	}
-	struct msghdr message = {.msg_name = sin, .msg_namelen = handle->fromlen, .msg_iov = iov,
-							 .msg_iovlen = gather_buf.stringsSize, .msg_control = NULL,
-							 .msg_controllen = 0, .msg_flags = 0};
-	while (nWritten < total_len) {
-		UA_Int32 n = 0;
-		do {
-            n = sendmsg(((ServerNetworkLayerUDP*)handle->connection.handle)->serversockfd, &message, 0);
-            if(n == -1L) {
-            	printf("ERROR:%i\n", errno);
-            }
-        } while (n == -1L && errno == EINTR);
+        UA_ByteString_deleteMembers(buf);
+		return UA_STATUSCODE_BADINTERNALERROR;
+    }
+
+	while (nWritten < (size_t)buf->length) {
+		UA_Int32 n = sendto(layer->serversockfd, buf->data, buf->length, 0,
+                            (struct sockaddr*)sin, sizeof(struct sockaddr_in));
+        if(n == -1L) {
+            UA_LOG_WARNING(layer->layer.logger, UA_LOGCATEGORY_COMMUNICATION, "UDP send error %i", errno);
+            UA_ByteString_deleteMembers(buf);
+            return UA_STATUSCODE_BADINTERNALERROR;
+        }
         nWritten += n;
 	}
+    UA_ByteString_deleteMembers(buf);
+    return UA_STATUSCODE_GOOD;
 }
 
 static UA_StatusCode socket_set_nonblocking(UA_Int32 sockfd) {
@@ -120,14 +116,15 @@ static void setFDSet(ServerNetworkLayerUDP *layer) {
 	FD_SET(layer->serversockfd, &layer->fdset);
 }
 
-static void closeConnectionUDP(UDPConnection *handle) {
+static void closeConnectionUDP(UA_Connection *handle) {
 	free(handle);
 }
 
-static UA_StatusCode ServerNetworkLayerUDP_start(ServerNetworkLayerUDP *layer, UA_Logger *logger) {
-    layer->logger = logger;
-    if((layer->serversockfd = socket(PF_INET, SOCK_DGRAM, 0)) < 0) {
-		perror("ERROR opening socket");
+static UA_StatusCode ServerNetworkLayerUDP_start(ServerNetworkLayerUDP *layer, UA_Logger logger) {
+    layer->layer.logger = logger;
+    layer->serversockfd = socket(PF_INET, SOCK_DGRAM, 0);
+    if(layer->serversockfd < 0) {
+		UA_LOG_WARNING(layer->layer.logger, UA_LOGCATEGORY_COMMUNICATION, "Error opening socket");
 		return UA_STATUSCODE_BADINTERNALERROR;
 	} 
 	const struct sockaddr_in serv_addr =
@@ -135,98 +132,98 @@ static UA_StatusCode ServerNetworkLayerUDP_start(ServerNetworkLayerUDP *layer, U
          .sin_port = htons(layer->port), .sin_zero = {0}};
 	int optval = 1;
 	if(setsockopt(layer->serversockfd, SOL_SOCKET,
-                  SO_REUSEADDR, (const char *)&optval,
-                  sizeof(optval)) == -1) {
-		perror("setsockopt");
+                  SO_REUSEADDR, (const char *)&optval, sizeof(optval)) == -1) {
+        UA_LOG_WARNING(layer->layer.logger, UA_LOGCATEGORY_COMMUNICATION, "Could not setsockopt");
 		CLOSESOCKET(layer->serversockfd);
 		return UA_STATUSCODE_BADINTERNALERROR;
 	}
 	if(bind(layer->serversockfd, (const struct sockaddr *)&serv_addr,
             sizeof(serv_addr)) < 0) {
-		perror("binding");
+        UA_LOG_WARNING(layer->layer.logger, UA_LOGCATEGORY_COMMUNICATION, "Could not bind the socket");
 		CLOSESOCKET(layer->serversockfd);
 		return UA_STATUSCODE_BADINTERNALERROR;
 	}
 	socket_set_nonblocking(layer->serversockfd);
-    printf("Listening for UDP connections on %s:%d", inet_ntoa(serv_addr.sin_addr),
-           ntohs(serv_addr.sin_port));
+    UA_LOG_WARNING(layer->layer.logger, UA_LOGCATEGORY_COMMUNICATION, "Listening for UDP connections on %s:%d",
+                   inet_ntoa(serv_addr.sin_addr), ntohs(serv_addr.sin_port));
     return UA_STATUSCODE_GOOD;
 }
 
-static UA_Int32 ServerNetworkLayerUDP_getWork(ServerNetworkLayerUDP *layer, UA_WorkItem **workItems,
-                                              UA_UInt16 timeout) {
-    UA_WorkItem *items = NULL;
+static size_t ServerNetworkLayerUDP_getJobs(ServerNetworkLayerUDP *layer, UA_Job **jobs, UA_UInt16 timeout) {
+    UA_Job *items = NULL;
     setFDSet(layer);
     struct timeval tmptv = {0, timeout};
     UA_Int32 resultsize = select(layer->serversockfd+1, &layer->fdset, NULL, NULL, &tmptv);
     if(resultsize <= 0 || !FD_ISSET(layer->serversockfd, &layer->fdset)) {
-        *workItems = items;
+        *jobs = items;
         return 0;
     }
-    items = malloc(sizeof(UA_WorkItem)*(resultsize));
+    items = malloc(sizeof(UA_Job)*resultsize);
 	// read from established sockets
     UA_Int32 j = 0;
 	UA_ByteString buf = {-1, NULL};
-		if(!buf.data) {
-			buf.data = malloc(sizeof(UA_Byte) * layer->conf.recvBufferSize);
-			if(!buf.data)
-				printf("malloc failed");
-		}
-		struct sockaddr sender;
-		socklen_t sendsize = sizeof(sender);
-		bzero(&sender, sizeof(sender));
-        buf.length = recvfrom(layer->serversockfd, buf.data, layer->conf.recvBufferSize, 0, &sender, &sendsize);
-        if (buf.length <= 0) {
-        } else {
-            UDPConnection *c = malloc(sizeof(UDPConnection));
-        	if(!c)
-        		return UA_STATUSCODE_BADINTERNALERROR;
-            c->from = sender;
-            c->fromlen = sendsize;
-            c->connection.state = UA_CONNECTION_OPENING;
-            c->connection.localConf = layer->conf;
-            c->connection.channel = NULL;
-            c->connection.close = (void (*)(UA_Connection*))closeConnectionUDP;
-            c->connection.write = (UA_StatusCode (*)(UA_Connection*, UA_ByteStringArray))writeCallbackUDP;
-            c->connection.getBuffer = GetMallocedBuffer;
-            c->connection.releaseBuffer = ReleaseMallocedBuffer;
-            c->connection.handle = layer;
-            items[j].type = UA_WORKITEMTYPE_BINARYMESSAGE;
-            items[j].work.binaryMessage.message = buf;
-            items[j].work.binaryMessage.connection = (UA_Connection*)c;
-            buf.data = NULL;
-            j++;
-        }
+    if(!buf.data) {
+        buf.data = malloc(sizeof(UA_Byte) * layer->conf.recvBufferSize);
+        if(!buf.data)
+            UA_LOG_WARNING(layer->layer.logger, UA_LOGCATEGORY_COMMUNICATION, "malloc failed");
+    }
+    struct sockaddr sender;
+    socklen_t sendsize = sizeof(sender);
+    bzero(&sender, sizeof(sender));
+    buf.length = recvfrom(layer->serversockfd, buf.data, layer->conf.recvBufferSize, 0, &sender, &sendsize);
+    if (buf.length <= 0) {
+    } else {
+        UDPConnection *c = malloc(sizeof(UDPConnection));
+        if(!c)
+            return UA_STATUSCODE_BADINTERNALERROR;
+        UA_Connection_init(&c->connection);
+        c->from = sender;
+        c->fromlen = sendsize;
+        // c->sockfd = newsockfd;
+        c->connection.getSendBuffer = GetMallocedBuffer;
+        c->connection.releaseSendBuffer = ReleaseMallocedBuffer;
+        c->connection.releaseRecvBuffer = ReleaseMallocedBuffer;
+        c->connection.handle = layer;
+        c->connection.send = sendUDP;
+        c->connection.close = closeConnectionUDP;
+        c->connection.localConf = layer->conf;
+        c->connection.state = UA_CONNECTION_OPENING;
+
+        items[j].type = UA_JOBTYPE_BINARYMESSAGE_NETWORKLAYER;
+        items[j].job.binaryMessage.message = buf;
+        items[j].job.binaryMessage.connection = (UA_Connection*)c;
+        buf.data = NULL;
+        j++;
+    }
     if(buf.data)
         free(buf.data);
     if(j == 0) {
         free(items);
-        *workItems = NULL;
+        *jobs = NULL;
     } else
-        *workItems = items;
+        *jobs = items;
     return j;
 }
 
-static UA_Int32 ServerNetworkLayerUDP_stop(ServerNetworkLayerUDP * layer, UA_WorkItem **workItems) {
+static UA_Int32 ServerNetworkLayerUDP_stop(ServerNetworkLayerUDP * layer, UA_Job **jobs) {
 	CLOSESOCKET(layer->serversockfd);
 	return 0;
 }
 
-static void ServerNetworkLayerUDP_delete(ServerNetworkLayerUDP *layer) {
-	free(layer);
+static void ServerNetworkLayerUDP_deleteMembers(ServerNetworkLayerUDP *layer) {
 }
 
-UA_ServerNetworkLayer ServerNetworkLayerUDP_new(UA_ConnectionConfig conf, UA_UInt32 port) {
-    ServerNetworkLayerUDP *udplayer = malloc(sizeof(ServerNetworkLayerUDP));
-	udplayer->conf = conf;
-    udplayer->port = port;
-
-    UA_ServerNetworkLayer nl;
-    nl.nlHandle = udplayer;
-    nl.start = (UA_StatusCode (*)(void*, UA_Logger *logger))ServerNetworkLayerUDP_start;
-    nl.getWork = (UA_Int32 (*)(void*, UA_WorkItem**, UA_UInt16)) ServerNetworkLayerUDP_getWork;
-    nl.stop = (UA_Int32 (*)(void*, UA_WorkItem**)) ServerNetworkLayerUDP_stop;
-    nl.free = (void (*)(void*))ServerNetworkLayerUDP_delete;
-    nl.discoveryUrl = NULL;
-    return nl;
+UA_ServerNetworkLayer * ServerNetworkLayerUDP_new(UA_ConnectionConfig conf, UA_UInt32 port) {
+    ServerNetworkLayerUDP *layer = malloc(sizeof(ServerNetworkLayerUDP));
+    if(!layer)
+        return NULL;
+    memset(layer, 0, sizeof(ServerNetworkLayerUDP));
+
+	layer->conf = conf;
+    layer->port = port;
+    layer->layer.start = (UA_StatusCode(*)(UA_ServerNetworkLayer*,UA_Logger))ServerNetworkLayerUDP_start;
+    layer->layer.getJobs = (size_t(*)(UA_ServerNetworkLayer*,UA_Job**,UA_UInt16))ServerNetworkLayerUDP_getJobs;
+    layer->layer.stop = (size_t(*)(UA_ServerNetworkLayer*, UA_Job**))ServerNetworkLayerUDP_stop;
+    layer->layer.deleteMembers = (void(*)(UA_ServerNetworkLayer*))ServerNetworkLayerUDP_deleteMembers;
+    return &layer->layer;
 }

+ 6 - 6
examples/networklayer_udp.h

@@ -10,14 +10,14 @@
 extern "C" {
 #endif
 
-#ifdef NOT_AMALGATED
-# include "ua_server.h"
-#else
-# include "open62541.h"
-#endif
+#include "ua_server.h"
+#include "ua_client.h"
 
 /** @brief Create the UDP networklayer and listen to the specified port */
-UA_ServerNetworkLayer ServerNetworkLayerUDP_new(UA_ConnectionConfig conf, UA_UInt32 port);
+UA_ServerNetworkLayer UA_EXPORT * ServerNetworkLayerUDP_new(UA_ConnectionConfig conf, UA_UInt32 port);
+
+UA_Connection UA_EXPORT
+ClientNetworkLayerUDP_connect(UA_ConnectionConfig conf, char endpointUrl[], UA_Logger logger);
 
 #ifdef __cplusplus
 } // extern "C"

+ 9 - 8
examples/server_udp.c

@@ -3,14 +3,13 @@
  * See http://creativecommons.org/publicdomain/zero/1.0/ for more information.
  */
 #include <time.h>
-#include "ua_types.h"
-
 #include <stdio.h>
 #include <stdlib.h> 
 #include <signal.h>
 
 // provided by the open62541 lib
-#include "ua_server.h"
+# include "ua_types.h"
+# include "ua_server.h"
 
 // provided by the user, implementations available in the /examples folder
 #include "logger_stdout.h"
@@ -26,8 +25,9 @@ static void stopHandler(int sign) {
 int main(int argc, char** argv) {
 	signal(SIGINT, stopHandler); /* catches ctrl-c */
 
-	UA_Server *server = UA_Server_new();
-    UA_Server_addNetworkLayer(server, ServerNetworkLayerUDP_new(UA_ConnectionConfig_standard, 16664));
+	UA_Server *server = UA_Server_new(UA_ServerConfig_standard);
+    UA_ServerNetworkLayer *nl = ServerNetworkLayerUDP_new(UA_ConnectionConfig_standard, 16664);
+    UA_Server_addNetworkLayer(server, nl);
 
 	// add a variable node to the adresspace
     UA_Variant *myIntegerVariant = UA_Variant_new();
@@ -35,11 +35,12 @@ int main(int argc, char** argv) {
     UA_Variant_setScalarCopy(myIntegerVariant, &myInteger, &UA_TYPES[UA_TYPES_INT32]);
     const UA_QualifiedName myIntegerName = UA_QUALIFIEDNAME(1, "the answer");
     const UA_NodeId myIntegerNodeId = UA_NODEID_STRING(1, "the.answer");
+    UA_LocalizedText myIntegerBrowseName = UA_LOCALIZEDTEXT("en_US","the answer");
     UA_NodeId parentNodeId = UA_NODEID_NUMERIC(0, UA_NS0ID_OBJECTSFOLDER);
     UA_NodeId parentReferenceNodeId = UA_NODEID_NUMERIC(0, UA_NS0ID_ORGANIZES);
-    UA_Server_addVariableNode(server, myIntegerVariant, myIntegerName,
-                              myIntegerNodeId, parentNodeId, parentReferenceNodeId);
-
+    UA_Server_addVariableNode(server, myIntegerNodeId, myIntegerName, myIntegerBrowseName,
+                              myIntegerBrowseName, 0,0,
+                              parentNodeId, parentReferenceNodeId, myIntegerVariant, NULL);
 
     UA_StatusCode retval = UA_Server_run(server, 1, &running);
 	UA_Server_delete(server);

+ 1 - 1
include/ua_client.h

@@ -19,7 +19,7 @@ typedef struct UA_Client UA_Client;
  * successfully connecting.
  */
 typedef UA_Connection (*UA_ConnectClientConnection)(UA_ConnectionConfig localConf, char *endpointUrl,
-                                                    UA_Logger *logger);
+                                                    UA_Logger logger);
 
 typedef struct UA_ClientConfig {
     UA_Int32 timeout; //sync response timeout

+ 31 - 26
include/ua_connection.h

@@ -21,12 +21,7 @@ extern "C" {
 #endif
 
 #include "ua_types.h"
-
-/**
- * @defgroup communication Communication
- *
- * @{
- */
+#include "ua_job.h"
 
 typedef enum UA_ConnectionState {
     UA_CONNECTION_OPENING, ///< The socket is open, but the HEL/ACK handshake is not done
@@ -48,9 +43,6 @@ extern const UA_EXPORT UA_ConnectionConfig UA_ConnectionConfig_standard;
 struct UA_SecureChannel;
 typedef struct UA_SecureChannel UA_SecureChannel;
 
-struct UA_Connection;
-typedef struct UA_Connection UA_Connection;
-
 /**
  * The connection to a single client (or server). The connection is defined independent of the
  * underlying network layer implementation. This allows a plugging-in custom implementations (e.g.
@@ -60,29 +52,42 @@ struct UA_Connection {
     UA_ConnectionState state;
     UA_ConnectionConfig localConf;
     UA_ConnectionConfig remoteConf;
-    UA_SecureChannel *channel; ///> The securechannel that is attached to this connection (or null)
-    UA_Int32 sockfd; ///> Most connectivity solutions run on sockets. Having the socket id here simplifies the design.
-    void *handle; ///> A pointer to the networklayer
-    UA_ByteString incompleteMessage; ///> Half-received messages (tcp is a streaming protocol) get stored here
-    UA_StatusCode (*getBuffer)(UA_Connection *connection, UA_ByteString *buf); ///> Get a buffer of the maximum remote recv size
-    void (*releaseBuffer)(UA_Connection *connection, UA_ByteString *buf); ///> Release the buffer manually
+    UA_SecureChannel *channel; ///< The securechannel that is attached to this connection (or null)
+    UA_Int32 sockfd; ///< Most connectivity solutions run on sockets. Having the socket id here
+                     ///  simplifies the design.
+    void *handle; ///< A pointer to the networklayer
+    UA_ByteString incompleteMessage; ///< A half-received message (TCP is a streaming protocol) is stored here
+
+    /** Get a buffer for sending */
+    UA_StatusCode (*getSendBuffer)(UA_Connection *connection, UA_Int32 length, UA_ByteString *buf);
+
+    /** Release the send buffer manually */
+    void (*releaseSendBuffer)(UA_Connection *connection, UA_ByteString *buf);
+
     /**
      * Sends a message over the connection.
      * @param connection The connection
-     * @param buf The message buffer is potentially reused (or freed) internally if sending succeeds.
-     * @param buflen Since the buffer is potentially reused, we provide a separate content length.
+     * @param buf The message buffer is always released (freed) internally
      * @return Returns an error code or UA_STATUSCODE_GOOD.
      */
-    UA_StatusCode (*write)(UA_Connection *connection, UA_ByteString *buf, size_t buflen);
+    UA_StatusCode (*send)(UA_Connection *connection, UA_ByteString *buf);
+
    /**
      * Receive a message from the remote connection
 	 * @param connection The connection
-	 * @param response The response string. It is allocated by the connection and needs to be freed with connection->releaseBuffer
+	 * @param response The response string. It is allocated by the connection and needs to be freed
+              with connection->releaseBuffer
      * @param timeout Timeout of the recv operation in milliseconds
-     * @return Returns UA_STATUSCODE_BADCOMMUNICATIONERROR if the recv operation can be repeated, UA_STATUSCODE_GOOD if it succeeded and
-     * UA_STATUSCODE_BADCONNECTIONCLOSED if the connection was closed.
+     * @return Returns UA_STATUSCODE_BADCOMMUNICATIONERROR if the recv operation can be repeated,
+     *         UA_STATUSCODE_GOOD if it succeeded and UA_STATUSCODE_BADCONNECTIONCLOSED if the
+     *         connection was closed.
 	 */
     UA_StatusCode (*recv)(UA_Connection *connection, UA_ByteString *response, UA_UInt32 timeout);
+
+    /** Release the buffer of a received message */
+    void (*releaseRecvBuffer)(UA_Connection *connection, UA_ByteString *buf);
+
+    /** Close the connection */
     void (*close)(UA_Connection *connection);
 };
 
@@ -92,11 +97,11 @@ void UA_EXPORT UA_Connection_deleteMembers(UA_Connection *connection);
 void UA_EXPORT UA_Connection_detachSecureChannel(UA_Connection *connection);
 void UA_EXPORT UA_Connection_attachSecureChannel(UA_Connection *connection, UA_SecureChannel *channel);
 
-/** Returns a string of complete message (the length entry is decoded for that).
-    If the received message is incomplete, it is retained in the connection. */
-UA_ByteString UA_EXPORT UA_Connection_completeMessages(UA_Connection *connection, UA_ByteString received);
-
-/** @} */
+/** Returns a job that contains either a message-bytestring managed by the network layer or a
+    message-bytestring that was newly allocated (or a nothing-job). Half-received messages are
+    attached to the connection. The next completion tries to create a complete message with the next
+    buffer the connection receives. */
+UA_Job UA_EXPORT UA_Connection_completeMessages(UA_Connection *connection, UA_ByteString received);
 
 #ifdef __cplusplus
 } // extern "C"

+ 56 - 0
include/ua_job.h

@@ -0,0 +1,56 @@
+ /*
+ * Copyright (C) 2014 the contributors as stated in the AUTHORS file
+ *
+ * This file is part of open62541. open62541 is free software: you can
+ * redistribute it and/or modify it under the terms of the GNU Lesser General
+ * Public License, version 3 (as published by the Free Software Foundation) with
+ * a static linking exception as stated in the LICENSE file provided with
+ * open62541.
+ *
+ * open62541 is distributed in the hope that it will be useful, but WITHOUT ANY
+ * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
+ * A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
+ * details.
+ */
+
+#ifndef UA_JOB_H_
+#define UA_JOB_H_
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+struct UA_Connection;
+typedef struct UA_Connection UA_Connection;
+
+struct UA_Server;
+typedef struct UA_Server UA_Server;
+
+/** Jobs describe work that is executed once or repeatedly in the server */
+typedef struct {
+    enum {
+        UA_JOBTYPE_NOTHING, ///< Guess what?
+        UA_JOBTYPE_DETACHCONNECTION, ///< Detach the connection from the secure channel (but don't delete it)
+        UA_JOBTYPE_BINARYMESSAGE_NETWORKLAYER, ///< The binary message is memory managed by the networklayer
+        UA_JOBTYPE_BINARYMESSAGE_ALLOCATED, ///< The binary message was relocated away from the networklayer
+        UA_JOBTYPE_METHODCALL, ///< Call the method as soon as possible
+        UA_JOBTYPE_METHODCALL_DELAYED, ///< Call the method as soon as all previous jobs have finished
+    } type;
+    union {
+        UA_Connection *closeConnection;
+        struct {
+            UA_Connection *connection;
+            UA_ByteString message;
+        } binaryMessage;
+        struct {
+            void *data;
+            void (*method)(UA_Server *server, void *data);
+        } methodCall;
+    } job;
+} UA_Job;
+
+#ifdef __cplusplus
+} // extern "C"
+#endif
+
+#endif /* UA_JOB_H_ */

+ 14 - 38
include/ua_server.h

@@ -24,9 +24,10 @@ extern "C" {
 #include "ua_types.h"
 #include "ua_types_generated.h"
 #include "ua_nodeids.h"
-#include "ua_connection.h"
 #include "ua_log.h"
-  
+#include "ua_job.h"
+#include "ua_connection.h"
+
 /**
  * @defgroup server Server
  *
@@ -47,9 +48,6 @@ typedef struct UA_ServerConfig {
 
 extern UA_EXPORT const UA_ServerConfig UA_ServerConfig_standard;
 
-struct UA_Server;
-typedef struct UA_Server UA_Server;
-
 UA_Server UA_EXPORT * UA_Server_new(UA_ServerConfig config);
 void UA_EXPORT UA_Server_setServerCertificate(UA_Server *server, UA_ByteString certificate);
 void UA_EXPORT UA_Server_delete(UA_Server *server);
@@ -426,28 +424,6 @@ UA_Server_getAttribute_method(UA_Server *server, UA_NodeId methodNodeId, UA_Meth
 UA_StatusCode UA_EXPORT
 UA_Server_getAttribute_DataSource(UA_Server *server, UA_NodeId nodeId, UA_DataSource **value);
 
-/** Jobs describe work that is executed once or repeatedly. */
-typedef struct {
-    enum {
-        UA_JOBTYPE_NOTHING,
-        UA_JOBTYPE_DETACHCONNECTION,
-        UA_JOBTYPE_BINARYMESSAGE,
-        UA_JOBTYPE_METHODCALL,
-        UA_JOBTYPE_DELAYEDMETHODCALL,
-    } type;
-    union {
-        UA_Connection *closeConnection;
-        struct {
-            UA_Connection *connection;
-            UA_ByteString message;
-        } binaryMessage;
-        struct {
-            void *data;
-            void (*method)(UA_Server *server, void *data);
-        } methodCall;
-    } job;
-} UA_Job;
-
 /**
  * @param server The server object.
  *
@@ -467,8 +443,8 @@ UA_StatusCode UA_EXPORT UA_Server_addRepeatedJob(UA_Server *server, UA_Job job,
                                                  UA_Guid *jobId);
 
 /**
- * Remove repeated job. The entry will be removed asynchronously during the
- * next iteration of the server main loop.
+ * Remove repeated job. The entry will be removed asynchronously during the next iteration of the
+ * server main loop.
  *
  * @param server The server object.
  *
@@ -486,8 +462,8 @@ UA_StatusCode UA_EXPORT UA_Server_removeRepeatedJob(UA_Server *server, UA_Guid j
  * layer does not need to be thread-safe.
  */
 typedef struct UA_ServerNetworkLayer {
-    void *handle;
     UA_String discoveryUrl;
+    UA_Logger logger; ///< Set during _start
 
     /**
      * Starts listening on the the networklayer.
@@ -496,30 +472,30 @@ typedef struct UA_ServerNetworkLayer {
      * @param logger The logger
      * @return Returns UA_STATUSCODE_GOOD or an error code.
      */
-    UA_StatusCode (*start)(struct UA_ServerNetworkLayer *nl, UA_Logger *logger);
+    UA_StatusCode (*start)(struct UA_ServerNetworkLayer *nl, UA_Logger logger);
     
     /**
      * Gets called from the main server loop and returns the jobs (accumulated messages and close
      * events) for dispatch.
      *
      * @param nl The network layer
-     * @param jobs When the returned integer is positive, *jobs points to an array of UA_Job of the
+     * @param jobs When the returned integer is >0, *jobs points to an array of UA_Job of the
      * returned size.
      * @param timeout The timeout during which an event must arrive in microseconds
      * @return The size of the jobs array. If the result is negative, an error has occurred.
      */
-    UA_Int32 (*getJobs)(struct UA_ServerNetworkLayer *nl, UA_Job **jobs, UA_UInt16 timeout);
+    size_t (*getJobs)(struct UA_ServerNetworkLayer *nl, UA_Job **jobs, UA_UInt16 timeout);
 
     /**
      * Closes the network connection and returns all the jobs that need to be finished before the
      * network layer can be safely deleted.
      *
      * @param nl The network layer
-     * @param jobs When the returned integer is positive, jobs points to an array of UA_Job of the
+     * @param jobs When the returned integer is >0, jobs points to an array of UA_Job of the
      * returned size.
      * @return The size of the jobs array. If the result is negative, an error has occurred.
      */
-    UA_Int32 (*stop)(struct UA_ServerNetworkLayer *nl, UA_Job **jobs);
+    size_t (*stop)(struct UA_ServerNetworkLayer *nl, UA_Job **jobs);
 
     /** Deletes the network layer. Call only after a successful shutdown. */
     void (*deleteMembers)(struct UA_ServerNetworkLayer *nl);
@@ -530,7 +506,7 @@ typedef struct UA_ServerNetworkLayer {
  * with the server. Do not use it after adding it as it might be moved around on
  * the heap.
  */
-void UA_EXPORT UA_Server_addNetworkLayer(UA_Server *server, UA_ServerNetworkLayer networkLayer);
+void UA_EXPORT UA_Server_addNetworkLayer(UA_Server *server, UA_ServerNetworkLayer *networkLayer);
 
 /** @} */
 
@@ -588,8 +564,8 @@ typedef UA_Int32 (*UA_ExternalNodeStore_browseNodes)
  UA_BrowseResult *browseResults, UA_DiagnosticInfo *diagnosticInfos);
 
 typedef UA_Int32 (*UA_ExternalNodeStore_translateBrowsePathsToNodeIds)
-(void *ensHandle, const UA_RequestHeader *requestHeader, UA_BrowsePath *browsePath,
- UA_UInt32 *indices, UA_UInt32 indicesSize, UA_BrowsePathResult *browsePathResults, UA_DiagnosticInfo *diagnosticInfos);
+(void *ensHandle, const UA_RequestHeader *requestHeader, UA_BrowsePath *browsePath, UA_UInt32 *indices,
+ UA_UInt32 indicesSize, UA_BrowsePathResult *browsePathResults, UA_DiagnosticInfo *diagnosticInfos);
 
 typedef UA_Int32 (*UA_ExternalNodeStore_delete)(void *ensHandle);
 

+ 17 - 18
src/client/ua_client.c

@@ -113,7 +113,7 @@ static UA_StatusCode HelAckHandshake(UA_Client *c) {
     hello.sendBufferSize = conn->localConf.sendBufferSize;
 
     UA_ByteString message;
-    UA_StatusCode retval = c->connection.getBuffer(&c->connection, &message);
+    UA_StatusCode retval = c->connection.getSendBuffer(&c->connection, c->connection.remoteConf.recvBufferSize, &message);
     if(retval != UA_STATUSCODE_GOOD)
         return retval;
 
@@ -124,15 +124,14 @@ static UA_StatusCode HelAckHandshake(UA_Client *c) {
     retval |= UA_TcpMessageHeader_encodeBinary(&messageHeader, &message, &offset);
     UA_TcpHelloMessage_deleteMembers(&hello);
     if(retval != UA_STATUSCODE_GOOD) {
-        c->connection.releaseBuffer(&c->connection, &message);
+        c->connection.releaseSendBuffer(&c->connection, &message);
         return retval;
     }
 
-    retval = c->connection.write(&c->connection, &message, messageHeader.messageSize);
-    if(retval != UA_STATUSCODE_GOOD) {
-        c->connection.releaseBuffer(&c->connection, &message);
+    message.length = messageHeader.messageSize;
+    retval = c->connection.send(&c->connection, &message);
+    if(retval != UA_STATUSCODE_GOOD)
         return retval;
-    }
 
     UA_ByteString reply;
     UA_ByteString_init(&reply);
@@ -194,7 +193,8 @@ static UA_StatusCode SecureChannelHandshake(UA_Client *client, UA_Boolean renew)
     }
 
     UA_ByteString message;
-    UA_StatusCode retval = client->connection.getBuffer(&client->connection, &message);
+    UA_Connection *c = &client->connection;
+    UA_StatusCode retval = c->getSendBuffer(c, c->remoteConf.recvBufferSize, &message);
     if(retval != UA_STATUSCODE_GOOD) {
         UA_AsymmetricAlgorithmSecurityHeader_deleteMembers(&asymHeader);
         UA_OpenSecureChannelRequest_deleteMembers(&opnSecRq);
@@ -213,15 +213,14 @@ static UA_StatusCode SecureChannelHandshake(UA_Client *client, UA_Boolean renew)
     UA_AsymmetricAlgorithmSecurityHeader_deleteMembers(&asymHeader);
     UA_OpenSecureChannelRequest_deleteMembers(&opnSecRq);
     if(retval != UA_STATUSCODE_GOOD) {
-        client->connection.releaseBuffer(&client->connection, &message);
+        client->connection.releaseSendBuffer(&client->connection, &message);
         return retval;
     }
 
-    retval = client->connection.write(&client->connection, &message, messageHeader.messageHeader.messageSize);
-    if(retval != UA_STATUSCODE_GOOD) {
-        client->connection.releaseBuffer(&client->connection, &message);
+    message.length = messageHeader.messageHeader.messageSize;
+    retval = client->connection.send(&client->connection, &message);
+    if(retval != UA_STATUSCODE_GOOD)
         return retval;
-    }
 
     UA_ByteString reply;
     UA_ByteString_init(&reply);
@@ -508,7 +507,8 @@ static UA_StatusCode CloseSecureChannel(UA_Client *client) {
     UA_NodeId typeId = UA_NODEID_NUMERIC(0, UA_NS0ID_CLOSESECURECHANNELREQUEST + UA_ENCODINGOFFSET_BINARY);
 
     UA_ByteString message;
-    UA_StatusCode retval = client->connection.getBuffer(&client->connection, &message);
+    UA_Connection *c = &client->connection;
+    UA_StatusCode retval = c->getSendBuffer(c, c->remoteConf.recvBufferSize, &message);
     if(retval != UA_STATUSCODE_GOOD)
         return retval;
 
@@ -523,13 +523,12 @@ static UA_StatusCode CloseSecureChannel(UA_Client *client) {
     retval |= UA_SecureConversationMessageHeader_encodeBinary(&msgHeader, &message, &offset);
 
     if(retval != UA_STATUSCODE_GOOD) {
-        client->connection.releaseBuffer(&client->connection, &message);
+        client->connection.releaseSendBuffer(&client->connection, &message);
         return retval;
     }
         
-    retval = client->connection.write(&client->connection, &message, msgHeader.messageHeader.messageSize);
-    if(retval != UA_STATUSCODE_GOOD)
-        client->connection.releaseBuffer(&client->connection, &message);
+    message.length = msgHeader.messageHeader.messageSize;
+    retval = client->connection.send(&client->connection, &message);
     return retval;
 }
 
@@ -548,7 +547,7 @@ UA_StatusCode UA_Client_connect(UA_Client *client, UA_ConnectClientConnection co
         UA_Client_reset(client);
     }
 
-    client->connection = connectFunc(UA_ConnectionConfig_standard, endpointUrl, &client->logger);
+    client->connection = connectFunc(UA_ConnectionConfig_standard, endpointUrl, client->logger);
     if(client->connection.state != UA_CONNECTION_OPENING){
         retval = UA_STATUSCODE_BADCONNECTIONCLOSED;
         goto cleanup;

+ 8 - 7
src/server/ua_server.c

@@ -107,9 +107,9 @@ UA_Logger UA_Server_getLogger(UA_Server *server) {
     return server->logger;
 }
 
-void UA_Server_addNetworkLayer(UA_Server *server, UA_ServerNetworkLayer networkLayer) {
-    UA_ServerNetworkLayer *newlayers = UA_realloc(server->networkLayers,
-                                                  sizeof(UA_ServerNetworkLayer)*(server->networkLayersSize+1));
+void UA_Server_addNetworkLayer(UA_Server *server, UA_ServerNetworkLayer *networkLayer) {
+    UA_ServerNetworkLayer **newlayers =
+        UA_realloc(server->networkLayers, sizeof(void*)*(server->networkLayersSize+1));
     if(!newlayers) {
         UA_LOG_ERROR(server->logger, UA_LOGCATEGORY_SERVER, "Networklayer added");
         return;
@@ -127,12 +127,12 @@ void UA_Server_addNetworkLayer(UA_Server *server, UA_ServerNetworkLayer networkL
         return;
     }
     server->description.discoveryUrls = newUrls;
-    UA_String_copy(&networkLayer.discoveryUrl,
+    UA_String_copy(&networkLayer->discoveryUrl,
                    &server->description.discoveryUrls[server->description.discoveryUrlsSize]);
     server->description.discoveryUrlsSize++;
     for(UA_Int32 i = 0; i < server->endpointDescriptionsSize; i++)
         if(!server->endpointDescriptions[i].endpointUrl.data)
-            UA_String_copy(&networkLayer.discoveryUrl, &server->endpointDescriptions[i].endpointUrl);
+            UA_String_copy(&networkLayer->discoveryUrl, &server->endpointDescriptions[i].endpointUrl);
 }
 
 void UA_Server_setServerCertificate(UA_Server *server, UA_ByteString certificate) {
@@ -169,8 +169,9 @@ void UA_Server_delete(UA_Server *server) {
 
     // Delete the network layers
     for(size_t i = 0; i < server->networkLayersSize; i++) {
-        UA_String_deleteMembers(&server->networkLayers[i].discoveryUrl);
-        server->networkLayers[i].deleteMembers(&server->networkLayers[i]);
+        UA_String_deleteMembers(&server->networkLayers[i]->discoveryUrl);
+        server->networkLayers[i]->deleteMembers(server->networkLayers[i]);
+        UA_free(server->networkLayers[i]);
     }
     UA_free(server->networkLayers);
 

+ 20 - 32
src/server/ua_server_binary.c

@@ -42,18 +42,18 @@ static void processHEL(UA_Connection *connection, const UA_ByteString *msg, size
     ackHeader.messageSize =  8 + 20; /* ackHeader + ackMessage */
 
     UA_ByteString ack_msg;
-    if(connection->getBuffer(connection, &ack_msg) != UA_STATUSCODE_GOOD)
+    if(connection->getSendBuffer(connection, connection->remoteConf.recvBufferSize,
+                                 &ack_msg) != UA_STATUSCODE_GOOD)
         return;
 
     size_t tmpPos = 0;
     UA_TcpMessageHeader_encodeBinary(&ackHeader, &ack_msg, &tmpPos);
     UA_TcpAcknowledgeMessage_encodeBinary(&ackMessage, &ack_msg, &tmpPos);
-    if(connection->write(connection, &ack_msg, ackHeader.messageSize) != UA_STATUSCODE_GOOD)
-        connection->releaseBuffer(connection, &ack_msg);
+    ack_msg.length = ackHeader.messageSize;
+    connection->send(connection, &ack_msg);
 }
 
-static void processOPN(UA_Connection *connection, UA_Server *server, const UA_ByteString *msg,
-                       size_t *pos) {
+static void processOPN(UA_Connection *connection, UA_Server *server, const UA_ByteString *msg, size_t *pos) {
     if(connection->state != UA_CONNECTION_ESTABLISHED) {
         connection->close(connection);
         return;
@@ -112,7 +112,7 @@ static void processOPN(UA_Connection *connection, UA_Server *server, const UA_By
                                                UA_ENCODINGOFFSET_BINARY);
 
     UA_ByteString resp_msg;
-    retval = connection->getBuffer(connection, &resp_msg);
+    retval = connection->getSendBuffer(connection, connection->remoteConf.recvBufferSize, &resp_msg);
     if(retval != UA_STATUSCODE_GOOD) {
         UA_OpenSecureChannelResponse_deleteMembers(&p);
         UA_AsymmetricAlgorithmSecurityHeader_deleteMembers(&asymHeader);
@@ -126,16 +126,14 @@ static void processOPN(UA_Connection *connection, UA_Server *server, const UA_By
     retval |= UA_OpenSecureChannelResponse_encodeBinary(&p, &resp_msg, &tmpPos);
 
     if(retval != UA_STATUSCODE_GOOD) {
-        connection->releaseBuffer(connection, &resp_msg);
+        connection->releaseSendBuffer(connection, &resp_msg);
         connection->close(connection);
     } else {
         respHeader.messageHeader.messageSize = tmpPos;
         tmpPos = 0;
         UA_SecureConversationMessageHeader_encodeBinary(&respHeader, &resp_msg, &tmpPos);
-
-        if(connection->write(connection, &resp_msg,
-                             respHeader.messageHeader.messageSize) != UA_STATUSCODE_GOOD)
-            connection->releaseBuffer(connection, &resp_msg);
+        resp_msg.length = respHeader.messageHeader.messageSize;
+        connection->send(connection, &resp_msg);
     }
 
     UA_OpenSecureChannelResponse_deleteMembers(&p);
@@ -173,8 +171,8 @@ static void invoke_service(UA_Server *server, UA_SecureChannel *channel, UA_UInt
         response->serviceResult = UA_STATUSCODE_BADSESSIONIDINVALID;
     } else if(session->activated == UA_FALSE) {
         response->serviceResult = UA_STATUSCODE_BADSESSIONNOTACTIVATED;
-        /* the session is invalidated FIXME: do this delayed*/
-        UA_SessionManager_removeSession(&server->sessionManager, server, &request->authenticationToken);
+        /* /\* the session is invalidated FIXME: do this delayed*\/ */
+        /* UA_SessionManager_removeSession(&server->sessionManager, server, &request->authenticationToken); */
     } else {
         UA_Session_updateLifetime(session);
         service(server, session, request, response);
@@ -224,22 +222,16 @@ static void processMSG(UA_Connection *connection, UA_Server *server, const UA_By
     UA_SequenceHeader sequenceHeader;
     retval = UA_UInt32_decodeBinary(msg, pos, &tokenId);
     retval |= UA_SequenceHeader_decodeBinary(msg, pos, &sequenceHeader);
-#ifndef EXTENSION_STATELESS
-    if(retval != UA_STATUSCODE_GOOD || tokenId==0) //0 is invalid
-#else
-    if(retval != UA_STATUSCODE_GOOD)
-#endif
+    if(retval != UA_STATUSCODE_GOOD || tokenId == 0) // 0 is invalid
         return;
 
-    if(clientChannel != &anonymousChannel){
-        if(tokenId!=clientChannel->securityToken.tokenId){
-            //is client using a newly issued token?
-            if(tokenId==clientChannel->nextSecurityToken.tokenId){ //tokenId is not 0
-                UA_SecureChannel_revolveTokens(clientChannel);
-            }else{
-                //FIXME: how to react to this, what do we have to return? Or just kill the channel
-            }
+    if(clientChannel != &anonymousChannel && tokenId != clientChannel->securityToken.tokenId) {
+        if(tokenId != clientChannel->nextSecurityToken.tokenId) {
+            /* close the securechannel but keep the connection open */
+            Service_CloseSecureChannel(server, clientChannel->securityToken.channelId);
+            return;
         }
+        UA_SecureChannel_revolveTokens(clientChannel);
     }
 
     /* Read the request type */
@@ -415,7 +407,6 @@ void UA_Server_processBinaryMessage(UA_Server *server, UA_Connection *connection
         }
 
         size_t targetpos = pos - 8 + tcpMessageHeader.messageSize;
-
         switch(tcpMessageHeader.messageTypeAndFinal & 0xffffff) {
         case UA_MESSAGETYPEANDFINAL_HELF & 0xffffff:
             processHEL(connection, msg, &pos);
@@ -425,18 +416,16 @@ void UA_Server_processBinaryMessage(UA_Server *server, UA_Connection *connection
             break;
         case UA_MESSAGETYPEANDFINAL_MSGF & 0xffffff:
 #ifndef EXTENSION_STATELESS
-            if(connection->state != UA_CONNECTION_ESTABLISHED){
+            if(connection->state != UA_CONNECTION_ESTABLISHED) {
                 connection->close(connection);
-                UA_ByteString_deleteMembers(msg);
                 return;
-            }else
+            } else
 #endif
                 processMSG(connection, server, msg, &pos);
             break;
         case UA_MESSAGETYPEANDFINAL_CLOF & 0xffffff:
             processCLO(connection, server, msg, &pos);
             connection->close(connection);
-            UA_ByteString_deleteMembers(msg);
             return;
         }
 
@@ -447,5 +436,4 @@ void UA_Server_processBinaryMessage(UA_Server *server, UA_Connection *connection
             pos = targetpos;
         }
     } while(msg->length > (UA_Int32)pos);
-    UA_ByteString_deleteMembers(msg);
 }

+ 1 - 1
src/server/ua_server_internal.h

@@ -38,7 +38,7 @@ struct UA_Server {
 
     /* Communication */
     size_t networkLayersSize;
-    UA_ServerNetworkLayer *networkLayers;
+    UA_ServerNetworkLayer **networkLayers;
 
     /* Security */
     UA_ByteString serverCertificate;

+ 17 - 8
src/server/ua_server_worker.c

@@ -51,15 +51,24 @@ static void processJobs(UA_Server *server, UA_Job *jobs, UA_Int32 jobsSize) {
     for (UA_Int32 i = 0; i < jobsSize; i++) {
         UA_Job *job = &jobs[i];
         switch(job->type) {
-        case UA_JOBTYPE_BINARYMESSAGE:
-            UA_Server_processBinaryMessage(server, job->job.binaryMessage.connection,
-                                           &job->job.binaryMessage.message);
+        case UA_JOBTYPE_NOTHING:
             break;
         case UA_JOBTYPE_DETACHCONNECTION:
             UA_Connection_detachSecureChannel(job->job.closeConnection);
             break;
+        case UA_JOBTYPE_BINARYMESSAGE_NETWORKLAYER:
+            UA_Server_processBinaryMessage(server, job->job.binaryMessage.connection,
+                                           &job->job.binaryMessage.message);
+            UA_Connection *connection = job->job.binaryMessage.connection;
+            connection->releaseRecvBuffer(connection, &job->job.binaryMessage.message);
+            break;
+        case UA_JOBTYPE_BINARYMESSAGE_ALLOCATED:
+            UA_Server_processBinaryMessage(server, job->job.binaryMessage.connection,
+                                           &job->job.binaryMessage.message);
+            UA_ByteString_deleteMembers(&job->job.binaryMessage.message);
+            break;
         case UA_JOBTYPE_METHODCALL:
-        case UA_JOBTYPE_DELAYEDMETHODCALL:
+        case UA_JOBTYPE_METHODCALL_DELAYED:
             job->job.methodCall.method(server, job->job.methodCall.data);
             break;
         default:
@@ -569,7 +578,7 @@ UA_StatusCode result = UA_STATUSCODE_GOOD;
 
     /* Start the networklayers */
     for(size_t i = 0; i < server->networkLayersSize; i++)
-        result |= server->networkLayers[i].start(&server->networkLayers[i], &server->logger);
+        result |= server->networkLayers[i]->start(server->networkLayers[i], server->logger);
 
     return result;
 }
@@ -584,7 +593,7 @@ UA_StatusCode UA_Server_run_mainloop(UA_Server *server, UA_Boolean *running) {
 
     /* Get work from the networklayer */
     for(size_t i = 0; i < server->networkLayersSize; i++) {
-        UA_ServerNetworkLayer *nl = &server->networkLayers[i];
+        UA_ServerNetworkLayer *nl = server->networkLayers[i];
         UA_Job *jobs;
         UA_Int32 jobsSize;
         if(*running) {
@@ -593,12 +602,12 @@ UA_StatusCode UA_Server_run_mainloop(UA_Server *server, UA_Boolean *running) {
             else
                 jobsSize = nl->getJobs(nl, &jobs, 0);
         } else
-            jobsSize = server->networkLayers[i].stop(nl, &jobs);
+            jobsSize = server->networkLayers[i]->stop(nl, &jobs);
 
 #ifdef UA_MULTITHREADING
         /* Filter out delayed work */
         for(UA_Int32 k=0;k<jobsSize;k++) {
-            if(jobs[k].type != UA_JOBTYPE_DELAYEDMETHODCALL)
+            if(jobs[k].type != UA_JOBTYPE_METHODCALL_DELAYED)
                 continue;
             addDelayedJob(server, &jobs[k]);
             jobs[k].type = UA_JOBTYPE_NOTHING;

+ 61 - 40
src/ua_connection.c

@@ -16,88 +16,109 @@ void UA_Connection_init(UA_Connection *connection) {
     connection->sockfd = 0;
     connection->handle = UA_NULL;
     UA_ByteString_init(&connection->incompleteMessage);
-    connection->write = UA_NULL;
+    connection->send = UA_NULL;
     connection->close = UA_NULL;
     connection->recv = UA_NULL;
-    connection->getBuffer = UA_NULL;
-    connection->releaseBuffer = UA_NULL;
+    connection->getSendBuffer = UA_NULL;
+    connection->releaseSendBuffer = UA_NULL;
+    connection->releaseRecvBuffer = UA_NULL;
 }
 
 void UA_Connection_deleteMembers(UA_Connection *connection) {
     UA_ByteString_deleteMembers(&connection->incompleteMessage);
 }
 
-UA_ByteString UA_Connection_completeMessages(UA_Connection *connection, UA_ByteString received)
-{
+UA_Job UA_Connection_completeMessages(UA_Connection *connection, UA_ByteString received) {
+    UA_Job job = (UA_Job){.type = UA_JOBTYPE_NOTHING};
     if(received.length == -1)
-        return received;
+        return job;
 
-    /* concat the existing incomplete message with the new message */
     UA_ByteString current;
-    if(connection->incompleteMessage.length < 0)
+    if(connection->incompleteMessage.length <= 0)
         current = received;
     else {
+        /* concat the existing incomplete message with the new message */
         current.data = UA_realloc(connection->incompleteMessage.data,
                                   connection->incompleteMessage.length + received.length);
         if(!current.data) {
             /* not enough memory */
             UA_ByteString_deleteMembers(&connection->incompleteMessage);
-            connection->incompleteMessage.length = -1;
-            UA_ByteString_deleteMembers(&received);
-            received.length = -1;
-            return received;
+            connection->releaseRecvBuffer(connection, &received);
+            return job;
         }
         UA_memcpy(current.data + connection->incompleteMessage.length, received.data, received.length);
         current.length = connection->incompleteMessage.length + received.length;
-        UA_ByteString_deleteMembers(&received);
+        connection->releaseRecvBuffer(connection, &received);
         UA_ByteString_init(&connection->incompleteMessage);
     }
 
-    /* find the first non-complete message */
-    size_t end_pos = 0; // the end of the last complete message
-    while(current.length - end_pos >= 16) {
-        if(!(current.data[0] == 'M' && current.data[1] == 'S' && current.data[2] == 'G') &&
-           !(current.data[0] == 'O' && current.data[1] == 'P' && current.data[2] == 'N') &&
-           !(current.data[0] == 'H' && current.data[1] == 'E' && current.data[2] == 'L') &&
-           !(current.data[0] == 'A' && current.data[1] == 'C' && current.data[2] == 'K') &&
-           !(current.data[0] == 'C' && current.data[1] == 'L' && current.data[2] == 'O')) {
-            current.length = end_pos; // throw the remaining bytestring away
+    /* the while loop sets pos to the first element after the last complete message. if a message
+       contains garbage, the buffer length is set to contain only the "good" messages before. */
+    size_t pos = 0;
+    while(current.length - pos >= 16) {
+        UA_UInt32 msgtype = current.data[pos] + (current.data[pos+1] << 8) + (current.data[pos+2] << 16);
+        if(msgtype != ('M' + ('S' << 8) + ('G' << 16)) &&
+           msgtype != ('O' + ('P' << 8) + ('N' << 16)) &&
+           msgtype != ('H' + ('E' << 8) + ('L' << 16)) &&
+           msgtype != ('A' + ('C' << 8) + ('K' << 16)) &&
+           msgtype != ('C' + ('L' << 8) + ('O' << 16))) {
+            /* the message type is not recognized. throw the remaining bytestring away */
+            current.length = pos;
             break;
         }
         UA_Int32 length = 0;
-        size_t pos = end_pos + 4;
-        UA_StatusCode retval = UA_Int32_decodeBinary(&current, &pos, &length);
+        size_t length_pos = pos + 4;
+        UA_StatusCode retval = UA_Int32_decodeBinary(&current, &length_pos, &length);
         if(retval != UA_STATUSCODE_GOOD || length < 16 ||
            length > (UA_Int32)connection->localConf.maxMessageSize) {
-            current.length = end_pos; // throw the remaining bytestring away
+            /* the message size is not allowed. throw the remaining bytestring away */
+            current.length = pos;
             break;
         }
-        if(length + (UA_Int32)end_pos > current.length)
-            break; // the message is incomplete
-        end_pos += length;
+        if(length + (UA_Int32)pos > current.length)
+            break; /* the message is incomplete. keep the beginning */
+        pos += length;
     }
 
     if(current.length == 0) {
         /* throw everything away */
-        UA_ByteString_deleteMembers(&current);
-        current.length = -1;
-        return current;
+        if(current.data == received.data)
+            connection->releaseRecvBuffer(connection, &received);
+        else
+            UA_ByteString_deleteMembers(&current);
+        return job;
     }
 
-    if(end_pos == 0) {
+    if(pos == 0) {
         /* no complete message in current */
-        connection->incompleteMessage = current;
-        UA_ByteString_init(&current);
-    } else if(current.length != (UA_Int32)end_pos) {
+        if(current.data == received.data) {
+            /* copy the data into the connection */
+            UA_ByteString_copy(&current, &connection->incompleteMessage);
+            connection->releaseRecvBuffer(connection, &received);
+        } else {
+            /* the data is already copied off the network stack */
+            connection->incompleteMessage = current;
+        }
+        return job;
+    }
+
+    if(current.length != (UA_Int32)pos) {
         /* there is an incomplete message at the end of current */
-        connection->incompleteMessage.data = UA_malloc(current.length - end_pos);
+        connection->incompleteMessage.data = UA_malloc(current.length - pos);
         if(connection->incompleteMessage.data) {
-            UA_memcpy(connection->incompleteMessage.data, &current.data[end_pos], current.length - end_pos);
-            connection->incompleteMessage.length = current.length - end_pos;
+            UA_memcpy(connection->incompleteMessage.data, &current.data[pos], current.length - pos);
+            connection->incompleteMessage.length = current.length - pos;
         }
-        current.length = end_pos;
+        current.length = pos;
     }
-    return current;
+
+    job.job.binaryMessage.message = current;
+    job.job.binaryMessage.connection = connection;
+    if(current.data == received.data)
+        job.type = UA_JOBTYPE_BINARYMESSAGE_NETWORKLAYER;
+    else
+        job.type = UA_JOBTYPE_BINARYMESSAGE_ALLOCATED;
+    return job;
 }
 
 void UA_Connection_detachSecureChannel(UA_Connection *connection) {

+ 6 - 6
src/ua_securechannel.c

@@ -124,7 +124,8 @@ UA_StatusCode UA_SecureChannel_sendBinaryMessage(UA_SecureChannel *channel, UA_U
     seqHeader.requestId = requestId;
 
     UA_ByteString message;
-    UA_StatusCode retval = connection->getBuffer(connection, &message);
+    UA_StatusCode retval = connection->getSendBuffer(connection, connection->remoteConf.recvBufferSize,
+                                                     &message);
     if(retval != UA_STATUSCODE_GOOD)
         return retval;
 
@@ -133,7 +134,7 @@ UA_StatusCode UA_SecureChannel_sendBinaryMessage(UA_SecureChannel *channel, UA_U
     retval |= UA_encodeBinary(content, contentType, &message, &messagePos);
 
     if(retval != UA_STATUSCODE_GOOD) {
-        connection->releaseBuffer(connection, &message);
+        connection->releaseSendBuffer(connection, &message);
         return retval;
     }
 
@@ -149,9 +150,8 @@ UA_StatusCode UA_SecureChannel_sendBinaryMessage(UA_SecureChannel *channel, UA_U
     UA_SecureConversationMessageHeader_encodeBinary(&respHeader, &message, &messagePos);
     UA_SymmetricAlgorithmSecurityHeader_encodeBinary(&symSecHeader, &message, &messagePos);
     UA_SequenceHeader_encodeBinary(&seqHeader, &message, &messagePos);
-    
-    retval = connection->write(connection, &message, respHeader.messageHeader.messageSize);
-    if(retval != UA_STATUSCODE_GOOD)
-        connection->releaseBuffer(connection, &message);
+    message.length = respHeader.messageHeader.messageSize;
+
+    retval = connection->send(connection, &message);
     return retval;
 }

+ 1 - 0
src/ua_types.c

@@ -97,6 +97,7 @@ void UA_String_init(UA_String *p) {
 void UA_String_deleteMembers(UA_String *p) {
     UA_free(p->data);
     p->data = UA_NULL;
+    p->length = -1;
 }
 
 UA_StatusCode UA_String_copy(UA_String const *src, UA_String *dst) {