ua_workqueue.h 4.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130
  1. /* This Source Code Form is subject to the terms of the Mozilla Public
  2. * License, v. 2.0. If a copy of the MPL was not distributed with this
  3. * file, You can obtain one at http://mozilla.org/MPL/2.0/.
  4. *
  5. * Copyright 2014-2018 (c) Fraunhofer IOSB (Author: Julius Pfrommer)
  6. * Copyright 2014-2016 (c) Sten Grüner
  7. * Copyright 2015 (c) Chris Iatrou
  8. * Copyright 2015 (c) Nick Goossens
  9. * Copyright 2015 (c) Jörg Schüler-Maroldt
  10. * Copyright 2015-2016 (c) Oleksiy Vasylyev
  11. * Copyright 2016-2017 (c) Florian Palm
  12. * Copyright 2017 (c) Stefan Profanter, fortiss GmbH
  13. * Copyright 2016 (c) Lorenz Haas
  14. * Copyright 2017 (c) Jonas Green
  15. */
  16. #ifndef UA_WORKQUEUE_H_
  17. #define UA_WORKQUEUE_H_
  18. #include "ua_util_internal.h"
  19. #include "open62541_queue.h"
  20. #ifdef UA_ENABLE_MULTITHREADING
  21. #include <pthread.h>
  22. #endif
  23. _UA_BEGIN_DECLS
  /* Signature of a callback run on behalf of an application. The
   * ``application`` argument is either a client or a server instance. */
  typedef void (*UA_ApplicationCallback)(void *application, void *data);
  26. /* Delayed callbacks are executed when all previously enqueue work is finished.
  27. * This is used to free memory that might used by a parallel worker or where the
  28. * current threat has remaining pointers to until the current operation
  29. * finishes. */
  30. typedef struct UA_DelayedCallback {
  31. SIMPLEQ_ENTRY(UA_DelayedCallback) next;
  32. UA_ApplicationCallback callback;
  33. void *application;
  34. void *data;
  35. } UA_DelayedCallback;
  /* Forward declaration: the full definition of struct UA_WorkQueue follows
   * further down; a single typedef both declares the tag and names the type. */
  typedef struct UA_WorkQueue UA_WorkQueue;
  38. #ifdef UA_ENABLE_MULTITHREADING
  39. /* Workers take out callbacks from the work queue and execute them.
  40. *
  41. * Future Plans: Use work-stealing to load-balance between cores.
  42. * Le, Nhat Minh, et al. "Correct and efficient work-stealing for weak memory
  43. * models." ACM SIGPLAN Notices. Vol. 48. No. 8. ACM, 2013. */
  44. typedef struct {
  45. pthread_t thread;
  46. volatile UA_Boolean running;
  47. UA_WorkQueue *queue;
  48. UA_UInt32 counter;
  49. UA_UInt32 checkpointCounter; /* Counter when the last checkpoint was made
  50. * for the delayed callbacks */
  51. /* separate cache lines */
  52. char padding[64 - sizeof(void*) - sizeof(pthread_t) -
  53. sizeof(UA_UInt32) - sizeof(UA_Boolean)];
  54. } UA_Worker;
  55. #endif
  56. struct UA_WorkQueue {
  57. /* Worker threads and work queue. Without multithreading, work is executed
  58. immediately. */
  59. #ifdef UA_ENABLE_MULTITHREADING
  60. UA_Worker *workers;
  61. size_t workersSize;
  62. /* Work queue */
  63. SIMPLEQ_HEAD(, UA_DelayedCallback) dispatchQueue; /* Dispatch queue for the worker threads */
  64. pthread_mutex_t dispatchQueue_accessMutex; /* mutex for access to queue */
  65. pthread_cond_t dispatchQueue_condition; /* so the workers don't spin if the queue is empty */
  66. pthread_mutex_t dispatchQueue_conditionMutex; /* mutex for access to condition variable */
  67. #endif
  68. /* Delayed callbacks
  69. * To be executed after all curretly dispatched works has finished */
  70. SIMPLEQ_HEAD(, UA_DelayedCallback) delayedCallbacks;
  71. #ifdef UA_ENABLE_MULTITHREADING
  72. pthread_mutex_t delayedCallbacks_accessMutex;
  73. UA_DelayedCallback *delayedCallbacks_checkpoint;
  74. size_t delayedCallbacks_sinceDispatch; /* How many have been added since we
  75. * tried to dispatch callbacks? */
  76. #endif
  77. };
  78. void UA_WorkQueue_init(UA_WorkQueue *wq);
  79. /* Enqueue a delayed callback. It is executed when all previous work in the
  80. * queue has been finished. The ``cb`` pointer is freed afterwards. ``cb`` can
  81. * have a NULL callback that is not executed.
  82. *
  83. * This method checks internally if existing delayed work can be moved from the
  84. * delayed queue to the worker dispatch queue. */
  85. void UA_WorkQueue_enqueueDelayed(UA_WorkQueue *wq, UA_DelayedCallback *cb);
  86. /* Stop the workers, process all enqueued work in the calling thread, clean up
  87. * mutexes etc. */
  88. void UA_WorkQueue_cleanup(UA_WorkQueue *wq);
  89. #ifndef UA_ENABLE_MULTITHREADING
  90. /* Process all enqueued delayed work. This is not needed when workers are
  91. * running for the multithreading case. (UA_WorkQueue_cleanup still calls this
  92. * method during cleanup when the workers are shut down.) */
  93. void UA_WorkQueue_manuallyProcessDelayed(UA_WorkQueue *wq);
  94. #else
  95. /* Spin up a number of worker threads that listen on the work queue */
  96. UA_StatusCode UA_WorkQueue_start(UA_WorkQueue *wq, size_t workersCount);
  97. void UA_WorkQueue_stop(UA_WorkQueue *wq);
  98. /* Enqueue work for the worker threads */
  99. void UA_WorkQueue_enqueue(UA_WorkQueue *wq, UA_ApplicationCallback cb,
  100. void *application, void *data);
  101. #endif
  102. _UA_END_DECLS
  103. #endif /* UA_WORKQUEUE_H_ */