Browse Source

add a condition variable so the worker threads don't spin

Julius Pfrommer 10 years ago
parent
commit
15faebb8ed
3 changed files with 23 additions and 3 deletions
  1. 2 1
      src/server/ua_server.c
  2. 1 0
      src/server/ua_server_internal.h
  3. 20 2
      src/server/ua_server_worker.c

+ 2 - 1
src/server/ua_server.c

@@ -62,10 +62,11 @@ void UA_Server_delete(UA_Server *server) {
     UA_NodeStore_delete(server->nodestore);
     UA_ByteString_deleteMembers(&server->serverCertificate);
     UA_Array_delete(server->endpointDescriptions, server->endpointDescriptionsSize, &UA_TYPES[UA_ENDPOINTDESCRIPTION]);
-    UA_free(server);
 #ifdef UA_MULTITHREADING
+    pthread_cond_destroy(&server->dispatchQueue_condition); // so the workers don't spin if the queue is empty
     rcu_barrier(); // wait for all scheduled call_rcu work to complete
 #endif
+    UA_free(server);
 }
 
 UA_Server * UA_Server_new(void) {

+ 1 - 0
src/server/ua_server_internal.h

@@ -57,6 +57,7 @@ struct UA_Server {
     // worker threads wait on the queue
 	struct cds_wfcq_head dispatchQueue_head;
 	struct cds_wfcq_tail dispatchQueue_tail;
+    pthread_cond_t dispatchQueue_condition; // so the workers don't spin if the queue is empty
 #endif
 
     LIST_HEAD(UA_TimedWorkList, UA_TimedWork) timedWork;

+ 20 - 2
src/server/ua_server_worker.c

@@ -1,4 +1,9 @@
 #include <stdio.h>
+#define __USE_POSIX
+#define _XOPEN_SOURCE 500
+#define __USE_POSIX199309
+#include <sys/time.h>
+#include <time.h>
 #include "ua_server_internal.h"
 
 /**
@@ -14,7 +19,7 @@
  *    all previous work has actually finished (only for multithreading)
  */
 
-#define MAXTIMEOUT 5000 // max timeout in usec until the next main loop iteration
+#define MAXTIMEOUT 50000 // max timeout in usec until the next main loop iteration
 #define BATCHSIZE 20 // max size of worklists that are dispatched to workers
 
 static void processWork(UA_Server *server, const UA_WorkItem *work, UA_Int32 workSize) {
@@ -93,6 +98,11 @@ static void * workerLoop(struct workerStartData *startInfo) {
     UA_Server *server = startInfo->server;
     UA_free(startInfo);
     
+    pthread_mutex_t mutex; // required for the condition variable
+    pthread_mutex_init(&mutex,0);
+    pthread_mutex_lock(&mutex);
+    struct timespec to;
+
     while(*server->running) {
         struct workListNode *wln = (struct workListNode*)
             cds_wfcq_dequeue_blocking(&server->dispatchQueue_head, &server->dispatchQueue_tail);
@@ -100,9 +110,15 @@ static void * workerLoop(struct workerStartData *startInfo) {
             processWork(server, wln->work, wln->workSize);
             UA_free(wln->work);
             UA_free(wln);
+        } else {
+            clock_gettime(CLOCK_REALTIME, &to);
+            to.tv_sec += 2;
+            pthread_cond_timedwait(&server->dispatchQueue_condition, &mutex, &to);
         }
         uatomic_inc(c); // increase the workerCounter;
     }
+    pthread_mutex_unlock(&mutex);
+    pthread_mutex_destroy(&mutex);
    	rcu_unregister_thread();
     return UA_NULL;
 }
@@ -392,6 +408,7 @@ UA_StatusCode UA_Server_run(UA_Server *server, UA_UInt16 nThreads, UA_Boolean *r
     // 1) Prepare the threads
     server->running = running; // the threads need to access the variable
     server->nThreads = nThreads;
+    pthread_cond_init(&server->dispatchQueue_condition, 0);
     pthread_t *thr = UA_malloc(nThreads * sizeof(pthread_t));
     server->workerCounters = UA_malloc(nThreads * sizeof(UA_UInt32 *));
     for(UA_UInt32 i=0;i<nThreads;i++) {
@@ -439,13 +456,14 @@ UA_StatusCode UA_Server_run(UA_Server *server, UA_UInt16 nThreads, UA_Boolean *r
                 work[k].type = UA_WORKITEMTYPE_NOTHING;
             }
             dispatchWork(server, workSize, work);
+            if(workSize > 0)
+                pthread_cond_broadcast(&server->dispatchQueue_condition); 
 #else
             processWork(server, work, workSize);
             UA_free(work);
 #endif
         }
 
-
         // 3.3) Exit?
         if(!*running)
             break;