Explorar el Código

remove closed sockets before select

Julius Pfrommer hace 8 años
padre
commit
e861392b1c
Se han modificado 1 ficheros con 49 adiciones y 26 borrados
  1. 49 26
      plugins/ua_network_tcp.c

+ 49 - 26
plugins/ua_network_tcp.c

@@ -440,20 +440,52 @@ ServerNetworkLayerTCP_start(UA_ServerNetworkLayer *nl, UA_Logger logger) {
     return UA_STATUSCODE_GOOD;
 }
 
+static size_t
+removeClosedConnections(ServerNetworkLayerTCP *layer, UA_Job *js) {
+    size_t c = 0;
+    for(size_t i = 0; i < layer->mappingsSize; ++i) {
+        if(layer->mappings[i].connection &&
+           layer->mappings[i].connection->state != UA_CONNECTION_CLOSED)
+            continue;
+        /* the socket was closed from remote */
+        UA_Connection *conn = layer->mappings[i].connection;
+        js[c].type = UA_JOBTYPE_DETACHCONNECTION;
+        js[c].job.closeConnection = conn;
+        layer->mappings[i] = layer->mappings[layer->mappingsSize-1];
+        --layer->mappingsSize;
+        ++c;
+        js[c].type = UA_JOBTYPE_METHODCALL_DELAYED;
+        js[c].job.methodCall.method = FreeConnectionCallback;
+        js[c].job.methodCall.data = conn;
+        ++c;
+    }
+    return c;
+}
+
 static size_t
 ServerNetworkLayerTCP_getJobs(UA_ServerNetworkLayer *nl, UA_Job **jobs, UA_UInt16 timeout) {
+    /* Every open socket can generate two jobs */
     ServerNetworkLayerTCP *layer = nl->handle;
+    UA_Job *js = malloc(sizeof(UA_Job) * (size_t)((layer->mappingsSize * 2)));
+    if(!js)
+        return UA_STATUSCODE_BADOUTOFMEMORY;
+
+    /* Remove closed sockets */
+    size_t totalJobs = removeClosedConnections(layer, js);
+
+    /* Listen on open sockets (including the server) */
     fd_set fdset, errset;
     UA_Int32 highestfd = setFDSet(layer, &fdset);
     setFDSet(layer, &errset);
     struct timeval tmptv = {0, timeout * 1000};
     UA_Int32 resultsize = select(highestfd+1, &fdset, NULL, &errset, &tmptv);
-    if(resultsize <= 0) {
+    if(totalJobs == 0 && resultsize <= 0) {
+        free(js);
         *jobs = NULL;
         return 0;
     }
 
-    /* accept new connections (can only be a single one) */
+    /* Accept new connection via the server socket (can only be a single one) */
     if(UA_fd_isset(layer->serversockfd, &fdset)) {
         --resultsize;
         SOCKET newsockfd = accept((SOCKET)layer->serversockfd, NULL, NULL);
@@ -464,25 +496,16 @@ ServerNetworkLayerTCP_getJobs(UA_ServerNetworkLayer *nl, UA_Job **jobs, UA_UInt1
 #endif
         {
             socket_set_nonblocking(newsockfd);
-            /* Send messages directly and do wait to merge packets (disable
-               Nagle's algorithm) */
+            /* Do not merge packets on the socket (disable Nagle's algorithm) */
             int i = 1;
             setsockopt(newsockfd, IPPROTO_TCP, TCP_NODELAY, (void *)&i, sizeof(i));
             ServerNetworkLayerTCP_add(layer, (UA_Int32)newsockfd);
         }
     }
 
-    /* alloc enough space for a cleanup-connection and free-connection job per
-       resulted socket */
-    if(resultsize == 0)
-        return 0;
-    UA_Job *js = malloc(sizeof(UA_Job) * (size_t)resultsize * 2);
-    if(!js)
-        return 0;
-
-    /* read from established sockets */
-    size_t j = 0;
+    /* Read from established sockets */
     UA_ByteString buf = UA_BYTESTRING_NULL;
+    size_t j = 0;
     for(size_t i = 0; i < layer->mappingsSize && j < (size_t)resultsize; ++i) {
         if(!UA_fd_isset(layer->mappings[i].sockfd, &errset) &&
            !UA_fd_isset(layer->mappings[i].sockfd, &fdset))
@@ -490,34 +513,34 @@ ServerNetworkLayerTCP_getJobs(UA_ServerNetworkLayer *nl, UA_Job **jobs, UA_UInt1
 
         UA_StatusCode retval = socket_recv(layer->mappings[i].connection, &buf, 0);
         if(retval == UA_STATUSCODE_GOOD) {
-            js[j].job.binaryMessage.connection = layer->mappings[i].connection;
-            js[j].job.binaryMessage.message = buf;
-            js[j].type = UA_JOBTYPE_BINARYMESSAGE_NETWORKLAYER;
+            js[totalJobs + j].job.binaryMessage.connection = layer->mappings[i].connection;
+            js[totalJobs + j].job.binaryMessage.message = buf;
+            js[totalJobs + j].type = UA_JOBTYPE_BINARYMESSAGE_NETWORKLAYER;
             ++j;
         } else if (retval == UA_STATUSCODE_BADCONNECTIONCLOSED) {
             UA_Connection *c = layer->mappings[i].connection;
             UA_LOG_INFO(layer->logger, UA_LOGCATEGORY_NETWORK,
                         "Connection %i | Connection closed from remote", c->sockfd);
             /* the socket was closed from remote */
-            js[j].type = UA_JOBTYPE_DETACHCONNECTION;
-            js[j].job.closeConnection = layer->mappings[i].connection;
+            js[totalJobs + j].type = UA_JOBTYPE_DETACHCONNECTION;
+            js[totalJobs + j].job.closeConnection = c;
             layer->mappings[i] = layer->mappings[layer->mappingsSize-1];
             --layer->mappingsSize;
-            ++j;
-            js[j].type = UA_JOBTYPE_METHODCALL_DELAYED;
-            js[j].job.methodCall.method = FreeConnectionCallback;
-            js[j].job.methodCall.data = c;
+            ++totalJobs; /* increase j only once */
+            js[totalJobs + j].type = UA_JOBTYPE_METHODCALL_DELAYED;
+            js[totalJobs + j].job.methodCall.method = FreeConnectionCallback;
+            js[totalJobs + j].job.methodCall.data = c;
             ++j;
         }
     }
+    totalJobs += j;
 
-    if(j == 0) {
+    if(totalJobs == 0) {
         free(js);
         js = NULL;
     }
-
     *jobs = js;
-    return j;
+    return totalJobs;
 }
 
 static size_t