ua_workqueue.c 8.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261
  1. /* This Source Code Form is subject to the terms of the Mozilla Public
  2. * License, v. 2.0. If a copy of the MPL was not distributed with this
  3. * file, You can obtain one at http://mozilla.org/MPL/2.0/.
  4. *
  5. * Copyright 2014-2018 (c) Fraunhofer IOSB (Author: Julius Pfrommer)
  6. * Copyright 2014-2016 (c) Sten Grüner
  7. * Copyright 2015 (c) Chris Iatrou
  8. * Copyright 2015 (c) Nick Goossens
  9. * Copyright 2015 (c) Jörg Schüler-Maroldt
  10. * Copyright 2015-2016 (c) Oleksiy Vasylyev
  11. * Copyright 2016-2017 (c) Florian Palm
  12. * Copyright 2017 (c) Stefan Profanter, fortiss GmbH
  13. * Copyright 2016 (c) Lorenz Haas
  14. * Copyright 2017 (c) Jonas Green
  15. */
  16. #include "ua_workqueue.h"
  17. void UA_WorkQueue_init(UA_WorkQueue *wq) {
  18. /* Initialized the linked list for delayed callbacks */
  19. SIMPLEQ_INIT(&wq->delayedCallbacks);
  20. #ifdef UA_ENABLE_MULTITHREADING
  21. wq->delayedCallbacks_checkpoint = NULL;
  22. pthread_mutex_init(&wq->delayedCallbacks_accessMutex, NULL);
  23. /* Initialize the dispatch queue for worker threads */
  24. SIMPLEQ_INIT(&wq->dispatchQueue);
  25. pthread_mutex_init(&wq->dispatchQueue_accessMutex, NULL);
  26. pthread_cond_init(&wq->dispatchQueue_condition, NULL);
  27. pthread_mutex_init(&wq->dispatchQueue_conditionMutex, NULL);
  28. #endif
  29. }
  30. #ifdef UA_ENABLE_MULTITHREADING
  31. /* Forward declaration */
  32. static void UA_WorkQueue_manuallyProcessDelayed(UA_WorkQueue *wq);
  33. #endif
  34. void UA_WorkQueue_cleanup(UA_WorkQueue *wq) {
  35. #ifdef UA_ENABLE_MULTITHREADING
  36. /* Shut down workers */
  37. UA_WorkQueue_stop(wq);
  38. /* Execute remaining work in the dispatch queue */
  39. while(true) {
  40. pthread_mutex_lock(&wq->dispatchQueue_accessMutex);
  41. UA_DelayedCallback *dc = SIMPLEQ_FIRST(&wq->dispatchQueue);
  42. if(!dc) {
  43. pthread_mutex_unlock(&wq->dispatchQueue_accessMutex);
  44. break;
  45. }
  46. SIMPLEQ_REMOVE_HEAD(&wq->dispatchQueue, next);
  47. pthread_mutex_unlock(&wq->dispatchQueue_accessMutex);
  48. dc->callback(dc->application, dc->data);
  49. UA_free(dc);
  50. }
  51. #endif
  52. /* All workers are shut down. Execute remaining delayed work here. */
  53. UA_WorkQueue_manuallyProcessDelayed(wq);
  54. #ifdef UA_ENABLE_MULTITHREADING
  55. wq->delayedCallbacks_checkpoint = NULL;
  56. pthread_mutex_destroy(&wq->dispatchQueue_accessMutex);
  57. pthread_cond_destroy(&wq->dispatchQueue_condition);
  58. pthread_mutex_destroy(&wq->dispatchQueue_conditionMutex);
  59. pthread_mutex_destroy(&wq->delayedCallbacks_accessMutex);
  60. #endif
  61. }
  62. /***********/
  63. /* Workers */
  64. /***********/
  65. #ifdef UA_ENABLE_MULTITHREADING
  66. static void *
  67. workerLoop(UA_Worker *worker) {
  68. UA_WorkQueue *wq = worker->queue;
  69. UA_UInt32 *counter = &worker->counter;
  70. volatile UA_Boolean *running = &worker->running;
  71. /* Initialize the (thread local) random seed with the ram address
  72. * of the worker. Not for security-critical entropy! */
  73. UA_random_seed((uintptr_t)worker);
  74. while(*running) {
  75. UA_atomic_addUInt32(counter, 1);
  76. /* Remove a callback from the queue */
  77. pthread_mutex_lock(&wq->dispatchQueue_accessMutex);
  78. UA_DelayedCallback *dc = SIMPLEQ_FIRST(&wq->dispatchQueue);
  79. if(dc)
  80. SIMPLEQ_REMOVE_HEAD(&wq->dispatchQueue, next);
  81. pthread_mutex_unlock(&wq->dispatchQueue_accessMutex);
  82. /* Nothing to do. Sleep until a callback is dispatched */
  83. if(!dc) {
  84. pthread_mutex_lock(&wq->dispatchQueue_conditionMutex);
  85. pthread_cond_wait(&wq->dispatchQueue_condition,
  86. &wq->dispatchQueue_conditionMutex);
  87. pthread_mutex_unlock(&wq->dispatchQueue_conditionMutex);
  88. continue;
  89. }
  90. /* Execute */
  91. if(dc->callback)
  92. dc->callback(dc->application, dc->data);
  93. UA_free(dc);
  94. }
  95. return NULL;
  96. }
  97. /* Can be called repeatedly and starts additional workers */
  98. UA_StatusCode
  99. UA_WorkQueue_start(UA_WorkQueue *wq, size_t workersCount) {
  100. if(wq->workersSize > 0 || workersCount == 0)
  101. return UA_STATUSCODE_BADINTERNALERROR;
  102. /* Create the worker array */
  103. wq->workers = (UA_Worker*)UA_calloc(workersCount, sizeof(UA_Worker));
  104. if(!wq->workers)
  105. return UA_STATUSCODE_BADOUTOFMEMORY;
  106. wq->workersSize = workersCount;
  107. /* Spin up the workers */
  108. for(size_t i = 0; i < workersCount; ++i) {
  109. UA_Worker *w = &wq->workers[i];
  110. w->queue = wq;
  111. w->counter = 0;
  112. w->running = true;
  113. pthread_create(&w->thread, NULL, (void* (*)(void*))workerLoop, w);
  114. }
  115. return UA_STATUSCODE_GOOD;
  116. }
  117. void UA_WorkQueue_stop(UA_WorkQueue *wq) {
  118. if(wq->workersSize == 0)
  119. return;
  120. /* Signal the workers to stop */
  121. for(size_t i = 0; i < wq->workersSize; ++i)
  122. wq->workers[i].running = false;
  123. /* Wake up all workers */
  124. pthread_cond_broadcast(&wq->dispatchQueue_condition);
  125. /* Wait for the workers to finish, then clean up */
  126. for(size_t i = 0; i < wq->workersSize; ++i)
  127. pthread_join(wq->workers[i].thread, NULL);
  128. UA_free(wq->workers);
  129. wq->workers = NULL;
  130. wq->workersSize = 0;
  131. }
  132. void UA_WorkQueue_enqueue(UA_WorkQueue *wq, UA_ApplicationCallback cb,
  133. void *application, void *data) {
  134. UA_DelayedCallback *dc = (UA_DelayedCallback*)UA_malloc(sizeof(UA_DelayedCallback));
  135. if(!dc) {
  136. cb(application, data); /* Execute immediately if the memory could not be allocated */
  137. return;
  138. }
  139. dc->callback = cb;
  140. dc->application = application;
  141. dc->data = data;
  142. /* Enqueue for the worker threads */
  143. pthread_mutex_lock(&wq->dispatchQueue_accessMutex);
  144. SIMPLEQ_INSERT_TAIL(&wq->dispatchQueue, dc, next);
  145. pthread_mutex_unlock(&wq->dispatchQueue_accessMutex);
  146. /* Wake up sleeping workers */
  147. pthread_cond_broadcast(&wq->dispatchQueue_condition);
  148. }
  149. #endif
  150. /*********************/
  151. /* Delayed Callbacks */
  152. /*********************/
  153. #ifdef UA_ENABLE_MULTITHREADING
  154. /* Delayed Callbacks are called only when all callbacks that were dispatched
  155. * prior are finished. After every UA_MAX_DELAYED_SAMPLE delayed Callbacks that
  156. * were added to the queue, we sample the counters from the workers. The
  157. * counters are compared to the last counters that were sampled. If every worker
  158. * has proceeded the counter, then we know that all delayed callbacks prior to
  159. * the last sample-point are safe to execute. */
  160. /* Sample the worker counter for every nth delayed callback. This is used to
  161. * test that all workers have **finished** their current job before the delayed
  162. * callback is processed. */
  163. #define UA_MAX_DELAYED_SAMPLE 100
  164. /* Call only with a held mutex for the delayed callbacks */
  165. static void
  166. dispatchDelayedCallbacks(UA_WorkQueue *wq, UA_DelayedCallback *cb) {
  167. /* Are callbacks before the last checkpoint ready? */
  168. for(size_t i = 0; i < wq->workersSize; ++i) {
  169. if(wq->workers[i].counter == wq->workers[i].checkpointCounter)
  170. return;
  171. }
  172. /* Dispatch all delayed callbacks up to the checkpoint.
  173. * TODO: Move over the entire queue up to the checkpoint in one step. */
  174. if(wq->delayedCallbacks_checkpoint != NULL) {
  175. UA_DelayedCallback *iter, *tmp_iter;
  176. SIMPLEQ_FOREACH_SAFE(iter, &wq->delayedCallbacks, next, tmp_iter) {
  177. pthread_mutex_lock(&wq->dispatchQueue_accessMutex);
  178. SIMPLEQ_INSERT_TAIL(&wq->dispatchQueue, iter, next);
  179. pthread_mutex_unlock(&wq->dispatchQueue_accessMutex);
  180. if(iter == wq->delayedCallbacks_checkpoint)
  181. break;
  182. }
  183. }
  184. /* Create the new sample point */
  185. for(size_t i = 0; i < wq->workersSize; ++i)
  186. wq->workers[i].checkpointCounter = wq->workers[i].counter;
  187. wq->delayedCallbacks_checkpoint = cb;
  188. }
  189. #endif
  190. void
  191. UA_WorkQueue_enqueueDelayed(UA_WorkQueue *wq, UA_DelayedCallback *cb) {
  192. #ifdef UA_ENABLE_MULTITHREADING
  193. pthread_mutex_lock(&wq->dispatchQueue_accessMutex);
  194. #endif
  195. SIMPLEQ_INSERT_HEAD(&wq->delayedCallbacks, cb, next);
  196. #ifdef UA_ENABLE_MULTITHREADING
  197. wq->delayedCallbacks_sinceDispatch++;
  198. if(wq->delayedCallbacks_sinceDispatch > UA_MAX_DELAYED_SAMPLE) {
  199. dispatchDelayedCallbacks(wq, cb);
  200. wq->delayedCallbacks_sinceDispatch = 0;
  201. }
  202. pthread_mutex_unlock(&wq->dispatchQueue_accessMutex);
  203. #endif
  204. }
  205. /* Assumes all workers are shut down */
  206. void UA_WorkQueue_manuallyProcessDelayed(UA_WorkQueue *wq) {
  207. UA_DelayedCallback *dc, *dc_tmp;
  208. SIMPLEQ_FOREACH_SAFE(dc, &wq->delayedCallbacks, next, dc_tmp) {
  209. SIMPLEQ_REMOVE_HEAD(&wq->delayedCallbacks, next);
  210. if(dc->callback)
  211. dc->callback(dc->application, dc->data);
  212. UA_free(dc);
  213. }
  214. #ifdef UA_ENABLE_MULTITHREADING
  215. wq->delayedCallbacks_checkpoint = NULL;
  216. #endif
  217. }