Browse Source

Merge pull request #287 from acplt/simple_tcp

simplfy closing connections in the network layer
Sten Grüner 9 years ago
parent
commit
4d5ddc81f8
3 changed files with 62 additions and 140 deletions
  1. 60 137
      examples/networklayer_tcp.c
  2. 1 1
      include/ua_server.h
  3. 1 2
      src/server/ua_server_worker.c

+ 60 - 137
examples/networklayer_tcp.c

@@ -57,21 +57,30 @@ static UA_StatusCode socket_write(UA_Connection *connection, UA_ByteString *buf,
     return UA_STATUSCODE_GOOD;
 }
 
+static void socket_close(UA_Connection *connection) {
+    connection->state = UA_CONNECTION_CLOSED;
+    shutdown(connection->sockfd,2);
+    CLOSESOCKET(connection->sockfd);
+}
+
 static UA_StatusCode socket_recv(UA_Connection *connection, UA_ByteString *response, UA_UInt32 timeout) {
     response->data = malloc(connection->localConf.recvBufferSize);
-    if(!response->data)
-        return UA_STATUSCODE_BADOUTOFMEMORY;
+    if(!response->data) {
+        UA_ByteString_init(response);
+        return UA_STATUSCODE_GOOD; /* not enough memory retry */
+    }
     struct timeval tmptv = {0, timeout * 1000};
     if(0 != setsockopt(connection->sockfd, SOL_SOCKET, SO_RCVTIMEO, (char *)&tmptv, sizeof(struct timeval))){
 		free(response->data);
+        UA_ByteString_init(response);
+        socket_close(connection);
         return UA_STATUSCODE_BADINTERNALERROR;
     }
     int ret = recv(connection->sockfd, (char*)response->data, connection->localConf.recvBufferSize, 0);
 	if(ret == 0) {
 		free(response->data);
         UA_ByteString_init(response);
-        connection->close(connection);
-        connection->state = UA_CONNECTION_CLOSED;
+        socket_close(connection);
         return UA_CONNECTION_CLOSED; /* ret == 0 -> server has closed the connection */
 	} else if(ret < 0) {
         free(response->data);
@@ -79,12 +88,11 @@ static UA_StatusCode socket_recv(UA_Connection *connection, UA_ByteString *respo
 #ifdef _WIN32
 		if(WSAGetLastError() == WSAEINTR || WSAGetLastError() == WSAEWOULDBLOCK) {
 #else
-		if (errno == EAGAIN || errno == EWOULDBLOCK) {
+		if(errno == EINTR || errno == EAGAIN || errno == EWOULDBLOCK) {
 #endif
             return UA_STATUSCODE_GOOD; /* retry */
         } else {
-            connection->close(connection);
-            connection->state = UA_CONNECTION_CLOSED;
+            socket_close(connection);
             return UA_STATUSCODE_BADCONNECTIONCLOSED;
         }
     }
@@ -93,12 +101,6 @@ static UA_StatusCode socket_recv(UA_Connection *connection, UA_ByteString *respo
     return UA_STATUSCODE_GOOD;
 }
 
-static void socket_close(UA_Connection *connection) {
-    connection->state = UA_CONNECTION_CLOSED;
-    shutdown(connection->sockfd,2);
-    CLOSESOCKET(connection->sockfd);
-}
-
 static UA_StatusCode socket_set_nonblocking(UA_Int32 sockfd) {
 #ifdef _WIN32
     u_long iMode = 1;
@@ -112,6 +114,11 @@ static UA_StatusCode socket_set_nonblocking(UA_Int32 sockfd) {
     return UA_STATUSCODE_GOOD;
 }
 
+static void FreeConnectionCallback(UA_Server *server, void *ptr) {
+    UA_Connection_deleteMembers((UA_Connection*)ptr);
+     free(ptr);
+ }
+
 /***************************/
 /* Server NetworkLayer TCP */
 /***************************/
@@ -167,12 +174,6 @@ typedef struct {
         UA_Connection *connection;
         UA_Int32 sockfd;
     } *mappings;
-
-    /* to-be-deleted connections */
-    struct DeleteList {
-        struct DeleteList *next;
-        UA_Connection *connection;
-    } *deletes;
 } ServerNetworkLayerTCP;
 
 static UA_StatusCode ServerNetworkLayerGetBuffer(UA_Connection *connection, UA_ByteString *buf) {
@@ -221,23 +222,8 @@ static void ServerNetworkLayerTCP_closeConnection(UA_Connection *connection) {
         return;
     connection->state = UA_CONNECTION_CLOSED;
 #endif
-    socket_close(connection);
-    ServerNetworkLayerTCP *layer = (ServerNetworkLayerTCP*)connection->handle;
-    struct DeleteList *d = malloc(sizeof(struct DeleteList));
-    if(!d){
-        return;
-    }
-    d->connection = connection;
-#ifdef UA_MULTITHREADING
-    while(1) {
-        d->next = layer->deletes;
-        if(uatomic_cmpxchg(&layer->deletes, d->next, d) == d->next)
-            break;
-    }
-#else
-    d->next = layer->deletes;
-    layer->deletes = d;
-#endif
+    shutdown(connection->sockfd, 2); /* only shut down here. this triggers the select, where the socket
+                                        is closed in the main thread */
 }
 
 /* call only from the single networking thread */
@@ -305,48 +291,19 @@ static UA_StatusCode ServerNetworkLayerTCP_start(UA_ServerNetworkLayer *nl, UA_L
     return UA_STATUSCODE_GOOD;
 }
 
-/* delayed callback that frees old connections */
-static void freeConnections(UA_Server *server, struct DeleteList *d) {
-    while(d) {
-        UA_Connection_deleteMembers(d->connection);
-        free(d->connection);
-        struct DeleteList *old = d;
-        d = d->next;
-        free(old);
-    }
-}
-
-/* remove the closed sockets from the mappings array */
-static void removeMappings(ServerNetworkLayerTCP *layer, struct DeleteList *d) {
-    while(d) {
-        size_t i = 0;
-        for(; i < layer->mappingsSize; i++) {
-            if(layer->mappings[i].sockfd == d->connection->sockfd)
-                break;
-        }
-        if(i >= layer->mappingsSize)
-            continue;
-        layer->mappingsSize--;
-        layer->mappings[i] = layer->mappings[layer->mappingsSize];
-        d = d->next;
-    }
-}
-
 static UA_Int32 ServerNetworkLayerTCP_getJobs(UA_ServerNetworkLayer *nl, UA_Job **jobs, UA_UInt16 timeout) {
     ServerNetworkLayerTCP *layer = nl->handle;
-    /* remove the deleted sockets from the array */
-    struct DeleteList *deletes;
-#ifdef UA_MULTITHREADING
-        deletes = uatomic_xchg(&layer->deletes, NULL);
-#else
-        deletes = layer->deletes;
-        layer->deletes = NULL;
-#endif
-    removeMappings(layer, deletes);
-
     setFDSet(layer);
     struct timeval tmptv = {0, timeout};
-    UA_Int32 resultsize = select(layer->highestfd+1, &layer->fdset, NULL, NULL, &tmptv);
+    UA_Int32 resultsize;
+ repeat_select:
+    resultsize = select(layer->highestfd+1, &layer->fdset, NULL, NULL, &tmptv);
+    if(resultsize < 0) {
+        if(errno == EINTR)
+            goto repeat_select;
+        *jobs = NULL;
+        return resultsize;
+    }
 
     /* accept new connections (can only be a single one) */
     if(FD_ISSET(layer->serversockfd, &layer->fdset)) {
@@ -356,40 +313,18 @@ static UA_Int32 ServerNetworkLayerTCP_getJobs(UA_ServerNetworkLayer *nl, UA_Job
         int newsockfd = accept(layer->serversockfd, (struct sockaddr *) &cli_addr, &cli_len);
         int i = 1;
         setsockopt(newsockfd, IPPROTO_TCP, TCP_NODELAY, (void *)&i, sizeof(i));
-        if (newsockfd >= 0) {
+        if(newsockfd >= 0) {
             socket_set_nonblocking(newsockfd);
             ServerNetworkLayerTCP_add(layer, newsockfd);
         }
     }
 
-    if(!deletes && resultsize <= 0) {
-        *jobs = NULL;
+    /* alloc enough space for a cleanup-connection and free-connection job per resulted socket */
+    if(resultsize == 0)
+        return 0;
+    UA_Job *js = malloc(sizeof(UA_Job) * resultsize * 2);
+    if(!js)
         return 0;
-    }
-    if(resultsize < 0)
-        resultsize = 0;
-    UA_Int32 deletesJob = 0;
-    if(deletes)
-        deletesJob = 1;
-        
-    UA_Job *items = malloc(sizeof(UA_Job) * (resultsize + deletesJob));
-    if(deletes && !items) {
-        /* abort. reattach the deletes so that they get deleted eventually. */
-#ifdef UA_MULTITHREADING
-        struct DeleteList *last_delete;
-        while(deletes) {
-            last_delete = deletes;
-            deletes = deletes->next;
-        }
-        while(1) {
-            last_delete->next = layer->deletes;
-            if(uatomic_cmpxchg(&layer->deletes, last_delete->next, deletes) == last_delete->next)
-                break;
-        }
-#else
-        layer->deletes = deletes;
-#endif
-    }
 
     /* read from established sockets */
     UA_Int32 j = 0;
@@ -400,63 +335,52 @@ static UA_Int32 ServerNetworkLayerTCP_getJobs(UA_ServerNetworkLayer *nl, UA_Job
         if(socket_recv(layer->mappings[i].connection, &buf, 0) == UA_STATUSCODE_GOOD) {
             if(!buf.data)
                 continue;
-            items[j].type = UA_JOBTYPE_BINARYMESSAGE;
-            items[j].job.binaryMessage.message = buf;
-            items[j].job.binaryMessage.connection = layer->mappings[i].connection;
-            buf.data = NULL;
+            js[j].type = UA_JOBTYPE_BINARYMESSAGE;
+            js[j].job.binaryMessage.message = buf;
+            js[j].job.binaryMessage.connection = layer->mappings[i].connection;
         } else {
-            items[j].type = UA_JOBTYPE_CLOSECONNECTION;
-            items[j].job.closeConnection = layer->mappings[i].connection;
+            UA_Connection *c = layer->mappings[i].connection;
+            /* the socket is already closed */
+            js[j].type = UA_JOBTYPE_DETACHCONNECTION;
+            js[j].job.closeConnection = layer->mappings[i].connection;
+            layer->mappings[i] = layer->mappings[layer->mappingsSize-1];
+            layer->mappingsSize--;
+            j++;
+            i--; // iterate over the same index again
+            js[j].type = UA_JOBTYPE_DELAYEDMETHODCALL;
+            js[j].job.methodCall.method = FreeConnectionCallback;
+            js[j].job.methodCall.data = c;
         }
         j++;
     }
 
-    /* add the delayed job that frees the connections */
-    if(deletes) {
-        items[j].type = UA_JOBTYPE_DELAYEDMETHODCALL;
-        items[j].job.methodCall.data = deletes;
-        items[j].job.methodCall.method = (void (*)(UA_Server *server, void *data))freeConnections;
-        j++;
-    }
-
-    /* free the array if there is no job */
-    if(j == 0) {
-        free(items);
-        *jobs = NULL;
-    } else
-        *jobs = items;
+    *jobs = js;
     return j;
 }
 
 static UA_Int32 ServerNetworkLayerTCP_stop(UA_ServerNetworkLayer *nl, UA_Job **jobs) {
     ServerNetworkLayerTCP *layer = nl->handle;
-    struct DeleteList *deletes;
-#ifdef UA_MULTITHREADING
-        deletes = uatomic_xchg(&layer->deletes, NULL);
-#else
-        deletes = layer->deletes;
-        layer->deletes = NULL;
-#endif
-    removeMappings(layer, deletes);
-    UA_Job *items = malloc(sizeof(UA_Job) * layer->mappingsSize);
+    UA_Job *items = malloc(sizeof(UA_Job) * layer->mappingsSize * 2);
     if(!items)
         return 0;
     for(size_t i = 0; i < layer->mappingsSize; i++) {
-        items[i].type = UA_JOBTYPE_CLOSECONNECTION;
-        items[i].job.closeConnection = layer->mappings[i].connection;
+        socket_close(layer->mappings[i].connection);
+        items[i*2].type = UA_JOBTYPE_DETACHCONNECTION;
+        items[i*2].job.closeConnection = layer->mappings[i].connection;
+        items[(i*2)+1].type = UA_JOBTYPE_DELAYEDMETHODCALL;
+        items[(i*2)+1].job.methodCall.method = FreeConnectionCallback;
+        items[(i*2)+1].job.methodCall.data = layer->mappings[i].connection;
     }
 #ifdef _WIN32
     WSACleanup();
 #endif
     *jobs = items;
-    return layer->mappingsSize;
+    return layer->mappingsSize*2;
 }
 
 /* run only when the server is stopped */
 static void ServerNetworkLayerTCP_deleteMembers(UA_ServerNetworkLayer *nl) {
     ServerNetworkLayerTCP *layer = nl->handle;
-    removeMappings(layer, layer->deletes);
-    freeConnections(NULL, layer->deletes);
 #ifndef UA_MULTITHREADING
     UA_ByteString_deleteMembers(&layer->buffer);
 #endif
@@ -484,7 +408,6 @@ UA_ServerNetworkLayer ServerNetworkLayerTCP_new(UA_ConnectionConfig conf, UA_UIn
     layer->mappingsSize = 0;
     layer->mappings = NULL;
     layer->port = port;
-    layer->deletes = NULL;
     char hostname[256];
     gethostname(hostname, 255);
     UA_String_copyprintf("opc.tcp://%s:%d", &nl.discoveryUrl, hostname, port);

+ 1 - 1
include/ua_server.h

@@ -146,7 +146,7 @@ UA_Server_AddMonodirectionalReference(UA_Server *server, UA_NodeId sourceNodeId,
 typedef struct {
     enum {
         UA_JOBTYPE_NOTHING,
-        UA_JOBTYPE_CLOSECONNECTION,
+        UA_JOBTYPE_DETACHCONNECTION,
         UA_JOBTYPE_BINARYMESSAGE,
         UA_JOBTYPE_METHODCALL,
         UA_JOBTYPE_DELAYEDMETHODCALL,

+ 1 - 2
src/server/ua_server_worker.c

@@ -35,9 +35,8 @@ static void processJobs(UA_Server *server, UA_Job *jobs, size_t jobsSize) {
             UA_Server_processBinaryMessage(server, job->job.binaryMessage.connection,
                                            &job->job.binaryMessage.message);
             break;
-        case UA_JOBTYPE_CLOSECONNECTION:
+        case UA_JOBTYPE_DETACHCONNECTION:
             UA_Connection_detachSecureChannel(job->job.closeConnection);
-            job->job.closeConnection->close(job->job.closeConnection);
             break;
         case UA_JOBTYPE_METHODCALL:
         case UA_JOBTYPE_DELAYEDMETHODCALL: