Procházet zdrojové kódy

fix use of worker mutexes with pthread_cond

The patch was provided by @janitza-luwe in #949
janitza-luwe před 7 roky
rodič
revize
51d04c438b

+ 1 - 0
src/server/ua_server.c

@@ -266,6 +266,7 @@ void UA_Server_delete(UA_Server *server) {
 
 #ifdef UA_ENABLE_MULTITHREADING
     pthread_cond_destroy(&server->dispatchQueue_condition);
+    pthread_mutex_destroy(&server->dispatchQueue_mutex);
 #endif
     UA_free(server);
 }

+ 1 - 0
src/server/ua_server_internal.h

@@ -120,6 +120,7 @@ struct UA_Server {
                                           by worker threads */
     struct DelayedJobs *delayedJobs;
     pthread_cond_t dispatchQueue_condition; /* so the workers don't spin if the queue is empty */
+    pthread_mutex_t dispatchQueue_mutex; /* mutex for access to condition variable */
     struct cds_wfcq_tail dispatchQueue_tail; /* Dispatch queue tail for the worker threads */
 #endif
 

+ 4 - 7
src/server/ua_server_worker.c

@@ -99,10 +99,6 @@ workerLoop(UA_Worker *worker) {
     UA_random_seed((uintptr_t)worker);
     rcu_register_thread();
 
-    pthread_mutex_t mutex; // required for the condition variable
-    pthread_mutex_init(&mutex, 0);
-    pthread_mutex_lock(&mutex);
-
     while(*running) {
         struct DispatchJob *dj = (struct DispatchJob*)
             cds_wfcq_dequeue_blocking(&server->dispatchQueue_head, &server->dispatchQueue_tail);
@@ -111,13 +107,13 @@ workerLoop(UA_Worker *worker) {
             UA_free(dj);
         } else {
             /* nothing to do. sleep until a job is dispatched (and wakes up all worker threads) */
-            pthread_cond_wait(&server->dispatchQueue_condition, &mutex);
+            pthread_mutex_lock(&server->dispatchQueue_mutex);
+            pthread_cond_wait(&server->dispatchQueue_condition, &server->dispatchQueue_mutex);
+            pthread_mutex_unlock(&server->dispatchQueue_mutex);
         }
         UA_atomic_add(counter, 1);
     }
 
-    pthread_mutex_unlock(&mutex);
-    pthread_mutex_destroy(&mutex);
     UA_ASSERT_RCU_UNLOCKED();
     rcu_barrier(); // wait for all scheduled call_rcu work to complete
     rcu_unregister_thread();
@@ -537,6 +533,7 @@ UA_StatusCode UA_Server_run_startup(UA_Server *server) {
     UA_LOG_INFO(server->config.logger, UA_LOGCATEGORY_SERVER,
                 "Spinning up %u worker thread(s)", server->config.nThreads);
     pthread_cond_init(&server->dispatchQueue_condition, 0);
+    pthread_mutex_init(&server->dispatchQueue_mutex, 0);
     server->workers = UA_malloc(server->config.nThreads * sizeof(UA_Worker));
     if(!server->workers)
         return UA_STATUSCODE_BADOUTOFMEMORY;