Browse Source

refactor(core): Use lock macros for the workqueue

Julius Pfrommer 4 years ago
parent
commit
62d283e817
2 changed files with 23 additions and 22 deletions
  1. 20 19
      src/ua_workqueue.c
  2. 3 3
      src/ua_workqueue.h

+ 20 - 19
src/ua_workqueue.c

@@ -22,13 +22,13 @@ void UA_WorkQueue_init(UA_WorkQueue *wq) {
 
 #if UA_MULTITHREADING >= 200
     wq->delayedCallbacks_checkpoint = NULL;
-    pthread_mutex_init(&wq->delayedCallbacks_accessMutex,  NULL);
+    UA_LOCK_INIT(wq->delayedCallbacks_accessMutex)
 
     /* Initialize the dispatch queue for worker threads */
     SIMPLEQ_INIT(&wq->dispatchQueue);
-    pthread_mutex_init(&wq->dispatchQueue_accessMutex, NULL);
+    UA_LOCK_INIT(wq->dispatchQueue_accessMutex)
     pthread_cond_init(&wq->dispatchQueue_condition, NULL);
-    pthread_mutex_init(&wq->dispatchQueue_conditionMutex, NULL);
+    UA_LOCK_INIT(wq->dispatchQueue_conditionMutex)
 #endif
 }
 
@@ -44,14 +44,14 @@ void UA_WorkQueue_cleanup(UA_WorkQueue *wq) {
 
     /* Execute remaining work in the dispatch queue */
     while(true) {
-        pthread_mutex_lock(&wq->dispatchQueue_accessMutex);
+        UA_LOCK(wq->dispatchQueue_accessMutex);
         UA_DelayedCallback *dc = SIMPLEQ_FIRST(&wq->dispatchQueue);
         if(!dc) {
-            pthread_mutex_unlock(&wq->dispatchQueue_accessMutex);
+            UA_UNLOCK(wq->dispatchQueue_accessMutex);
             break;
         }
         SIMPLEQ_REMOVE_HEAD(&wq->dispatchQueue, next);
-        pthread_mutex_unlock(&wq->dispatchQueue_accessMutex);
+        UA_UNLOCK(wq->dispatchQueue_accessMutex);
         dc->callback(dc->application, dc->data);
         UA_free(dc);
     }
@@ -62,10 +62,10 @@ void UA_WorkQueue_cleanup(UA_WorkQueue *wq) {
 
 #if UA_MULTITHREADING >= 200
     wq->delayedCallbacks_checkpoint = NULL;
-    pthread_mutex_destroy(&wq->dispatchQueue_accessMutex);
+    UA_LOCK_DESTROY(wq->dispatchQueue_accessMutex);
     pthread_cond_destroy(&wq->dispatchQueue_condition);
-    pthread_mutex_destroy(&wq->dispatchQueue_conditionMutex);
-    pthread_mutex_destroy(&wq->delayedCallbacks_accessMutex);
+    UA_LOCK_DESTROY(wq->dispatchQueue_conditionMutex);
+    UA_LOCK_DESTROY(wq->delayedCallbacks_accessMutex);
 #endif
 }
 
@@ -89,18 +89,18 @@ workerLoop(UA_Worker *worker) {
         UA_atomic_addUInt32(counter, 1);
 
         /* Remove a callback from the queue */
-        pthread_mutex_lock(&wq->dispatchQueue_accessMutex);
+        UA_LOCK(wq->dispatchQueue_accessMutex);
         UA_DelayedCallback *dc = SIMPLEQ_FIRST(&wq->dispatchQueue);
         if(dc)
             SIMPLEQ_REMOVE_HEAD(&wq->dispatchQueue, next);
-        pthread_mutex_unlock(&wq->dispatchQueue_accessMutex);
+        UA_UNLOCK(wq->dispatchQueue_accessMutex);
 
         /* Nothing to do. Sleep until a callback is dispatched */
         if(!dc) {
-            pthread_mutex_lock(&wq->dispatchQueue_conditionMutex);
+            UA_LOCK(wq->dispatchQueue_conditionMutex);
             pthread_cond_wait(&wq->dispatchQueue_condition,
                               &wq->dispatchQueue_conditionMutex);
-            pthread_mutex_unlock(&wq->dispatchQueue_conditionMutex);
+            UA_UNLOCK(wq->dispatchQueue_conditionMutex);
             continue;
         }
 
@@ -169,9 +169,9 @@ void UA_WorkQueue_enqueue(UA_WorkQueue *wq, UA_ApplicationCallback cb,
     dc->data = data;
 
     /* Enqueue for the worker threads */
-    pthread_mutex_lock(&wq->dispatchQueue_accessMutex);
+    UA_LOCK(wq->dispatchQueue_accessMutex);
     SIMPLEQ_INSERT_TAIL(&wq->dispatchQueue, dc, next);
-    pthread_mutex_unlock(&wq->dispatchQueue_accessMutex);
+    UA_UNLOCK(wq->dispatchQueue_accessMutex);
 
     /* Wake up sleeping workers */
     pthread_cond_broadcast(&wq->dispatchQueue_condition);
@@ -211,9 +211,9 @@ dispatchDelayedCallbacks(UA_WorkQueue *wq, UA_DelayedCallback *cb) {
     if(wq->delayedCallbacks_checkpoint != NULL) {
         UA_DelayedCallback *iter, *tmp_iter;
         SIMPLEQ_FOREACH_SAFE(iter, &wq->delayedCallbacks, next, tmp_iter) {
-            pthread_mutex_lock(&wq->dispatchQueue_accessMutex);
+            UA_LOCK(wq->dispatchQueue_accessMutex);
             SIMPLEQ_INSERT_TAIL(&wq->dispatchQueue, iter, next);
-            pthread_mutex_unlock(&wq->dispatchQueue_accessMutex);
+            UA_UNLOCK(wq->dispatchQueue_accessMutex);
             if(iter == wq->delayedCallbacks_checkpoint)
                 break;
         }
@@ -230,7 +230,7 @@ dispatchDelayedCallbacks(UA_WorkQueue *wq, UA_DelayedCallback *cb) {
 void
 UA_WorkQueue_enqueueDelayed(UA_WorkQueue *wq, UA_DelayedCallback *cb) {
 #if UA_MULTITHREADING >= 200
-    pthread_mutex_lock(&wq->dispatchQueue_accessMutex);
+    UA_LOCK(wq->dispatchQueue_accessMutex);
 #endif
 
     SIMPLEQ_INSERT_HEAD(&wq->delayedCallbacks, cb, next);
@@ -241,7 +241,8 @@ UA_WorkQueue_enqueueDelayed(UA_WorkQueue *wq, UA_DelayedCallback *cb) {
         dispatchDelayedCallbacks(wq, cb);
         wq->delayedCallbacks_sinceDispatch = 0;
     }
-    pthread_mutex_unlock(&wq->dispatchQueue_accessMutex);
+
+    UA_UNLOCK(wq->dispatchQueue_accessMutex);
 #endif
 }
 

+ 3 - 3
src/ua_workqueue.h

@@ -74,16 +74,16 @@ struct UA_WorkQueue {
 
     /* Work queue */
     SIMPLEQ_HEAD(, UA_DelayedCallback) dispatchQueue; /* Dispatch queue for the worker threads */
-    pthread_mutex_t dispatchQueue_accessMutex; /* mutex for access to queue */
+    UA_LOCK_TYPE(dispatchQueue_accessMutex) /* mutex for access to queue */
     pthread_cond_t dispatchQueue_condition; /* so the workers don't spin if the queue is empty */
-    pthread_mutex_t dispatchQueue_conditionMutex; /* mutex for access to condition variable */
+    UA_LOCK_TYPE(dispatchQueue_conditionMutex) /* mutex for access to condition variable */
 #endif
 
     /* Delayed callbacks
      * To be executed after all curretly dispatched works has finished */
     SIMPLEQ_HEAD(, UA_DelayedCallback) delayedCallbacks;
 #if UA_MULTITHREADING >= 200
-    pthread_mutex_t delayedCallbacks_accessMutex;
+    UA_LOCK_TYPE(delayedCallbacks_accessMutex)
     UA_DelayedCallback *delayedCallbacks_checkpoint;
     size_t delayedCallbacks_sinceDispatch; /* How many have been added since we
                                             * tried to dispatch callbacks? */