ua_server_worker.c 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416
  1. /* This Source Code Form is subject to the terms of the Mozilla Public
  2. * License, v. 2.0. If a copy of the MPL was not distributed with this
  3. * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
#include "ua_util.h"
#include "ua_server_internal.h"
#include <time.h>
  6. #define UA_MAXTIMEOUT 50 /* Max timeout in ms between main-loop iterations */
  7. /**
  8. * Worker Threads and Dispatch Queue
  9. * ---------------------------------
  10. * The worker threads dequeue callbacks from a central Multi-Producer
  11. * Multi-Consumer Queue (MPMC). When there are no callbacks, workers go idle.
  12. * The condition to wake them up is triggered whenever a callback is
  13. * dispatched.
  14. *
  15. * Future Plans: Use work-stealing to load-balance between cores.
  16. * Le, Nhat Minh, et al. "Correct and efficient work-stealing for weak memory
  17. * models." ACM SIGPLAN Notices. Vol. 48. No. 8. ACM, 2013. */
  18. #ifdef UA_ENABLE_MULTITHREADING
  19. struct UA_Worker {
  20. UA_Server *server;
  21. pthread_t thr;
  22. UA_UInt32 counter;
  23. volatile UA_Boolean running;
  24. /* separate cache lines */
  25. char padding[64 - sizeof(void*) - sizeof(pthread_t) -
  26. sizeof(UA_UInt32) - sizeof(UA_Boolean)];
  27. };
  28. typedef struct {
  29. struct cds_wfcq_node node;
  30. UA_ServerCallback callback;
  31. void *data;
  32. UA_Boolean delayed; /* Is it a delayed callback? */
  33. UA_Boolean countersSampled; /* Have the worker counters been sampled? */
  34. UA_UInt32 workerCounters[]; /* Counter value for each worker */
  35. } WorkerCallback;
  36. /* Forward Declaration */
  37. static void
  38. processDelayedCallback(UA_Server *server, WorkerCallback *dc);
  39. static void *
  40. workerLoop(UA_Worker *worker) {
  41. UA_Server *server = worker->server;
  42. UA_UInt32 *counter = &worker->counter;
  43. volatile UA_Boolean *running = &worker->running;
  44. /* Initialize the (thread local) random seed with the ram address
  45. * of the worker. Not for security-critical entropy! */
  46. UA_random_seed((uintptr_t)worker);
  47. rcu_register_thread();
  48. while(*running) {
  49. UA_atomic_add(counter, 1);
  50. WorkerCallback *dc = (WorkerCallback*)
  51. cds_wfcq_dequeue_blocking(&server->dispatchQueue_head,
  52. &server->dispatchQueue_tail);
  53. if(!dc) {
  54. /* Nothing to do. Sleep until a callback is dispatched */
  55. pthread_mutex_lock(&server->dispatchQueue_mutex);
  56. pthread_cond_wait(&server->dispatchQueue_condition,
  57. &server->dispatchQueue_mutex);
  58. pthread_mutex_unlock(&server->dispatchQueue_mutex);
  59. continue;
  60. }
  61. if(dc->delayed) {
  62. processDelayedCallback(server, dc);
  63. continue;
  64. }
  65. UA_RCU_LOCK();
  66. dc->callback(server, dc->data);
  67. UA_free(dc);
  68. UA_RCU_UNLOCK();
  69. }
  70. UA_ASSERT_RCU_UNLOCKED();
  71. rcu_barrier();
  72. rcu_unregister_thread();
  73. UA_LOG_DEBUG(server->config.logger, UA_LOGCATEGORY_SERVER,
  74. "Worker shut down");
  75. return NULL;
  76. }
  77. static void
  78. emptyDispatchQueue(UA_Server *server) {
  79. while(!cds_wfcq_empty(&server->dispatchQueue_head,
  80. &server->dispatchQueue_tail)) {
  81. WorkerCallback *dc = (WorkerCallback*)
  82. cds_wfcq_dequeue_blocking(&server->dispatchQueue_head,
  83. &server->dispatchQueue_tail);
  84. dc->callback(server, dc->data);
  85. UA_free(dc);
  86. }
  87. }
  88. #endif
  89. /**
  90. * Repeated Callbacks
  91. * ------------------
  92. * Repeated Callbacks are handled by UA_Timer (used in both client and server).
  93. * In the multi-threaded case, callbacks are dispatched to workers. Otherwise,
  94. * they are executed immediately. */
  95. void
  96. UA_Server_workerCallback(UA_Server *server, UA_ServerCallback callback,
  97. void *data) {
  98. #ifndef UA_ENABLE_MULTITHREADING
  99. /* Execute immediately */
  100. callback(server, data);
  101. #else
  102. /* Execute immediately if memory could not be allocated */
  103. WorkerCallback *dc = UA_malloc(sizeof(WorkerCallback));
  104. if(!dc) {
  105. callback(server, data);
  106. return;
  107. }
  108. /* Enqueue for the worker threads */
  109. dc->callback = callback;
  110. dc->data = data;
  111. dc->delayed = false;
  112. cds_wfcq_node_init(&dc->node);
  113. cds_wfcq_enqueue(&server->dispatchQueue_head,
  114. &server->dispatchQueue_tail, &dc->node);
  115. /* Wake up sleeping workers */
  116. pthread_cond_broadcast(&server->dispatchQueue_condition);
  117. #endif
  118. }
  119. /**
  120. * Delayed Callbacks
  121. * -----------------
  122. *
  123. * Delayed Callbacks are called only when all callbacks that were dispatched
  124. * prior are finished. In the single-threaded case, the callback is added to a
  125. * singly-linked list that is processed at the end of the server's main-loop. In
  126. * the multi-threaded case, the delay is ensure by a three-step procedure:
  127. *
  128. * 1. The delayed callback is dispatched to the worker queue. So it is only
  129. * dequeued when all prior callbacks have been dequeued.
  130. *
  131. * 2. When the callback is first dequeued by a worker, sample the counter of all
  132. * workers. Once all counters have advanced, the callback is ready.
  133. *
  134. * 3. Check regularly if the callback is ready by adding it back to the dispatch
  135. * queue. */
  136. #ifndef UA_ENABLE_MULTITHREADING
  137. typedef struct UA_DelayedCallback {
  138. SLIST_ENTRY(UA_DelayedCallback) next;
  139. UA_ServerCallback callback;
  140. void *data;
  141. } UA_DelayedCallback;
  142. UA_StatusCode
  143. UA_Server_delayedCallback(UA_Server *server, UA_ServerCallback callback,
  144. void *data) {
  145. UA_DelayedCallback *dc =
  146. (UA_DelayedCallback*)UA_malloc(sizeof(UA_DelayedCallback));
  147. if(!dc)
  148. return UA_STATUSCODE_BADOUTOFMEMORY;
  149. dc->callback = callback;
  150. dc->data = data;
  151. SLIST_INSERT_HEAD(&server->delayedCallbacks, dc, next);
  152. return UA_STATUSCODE_GOOD;
  153. }
  154. static void
  155. processDelayedCallbacks(UA_Server *server) {
  156. UA_DelayedCallback *dc, *dc_tmp;
  157. SLIST_FOREACH_SAFE(dc, &server->delayedCallbacks, next, dc_tmp) {
  158. SLIST_REMOVE(&server->delayedCallbacks, dc, UA_DelayedCallback, next);
  159. dc->callback(server, dc->data);
  160. UA_free(dc);
  161. }
  162. }
  163. #else /* UA_ENABLE_MULTITHREADING */
  164. UA_StatusCode
  165. UA_Server_delayedCallback(UA_Server *server, UA_ServerCallback callback,
  166. void *data) {
  167. size_t dcsize = sizeof(WorkerCallback) +
  168. (sizeof(UA_UInt32) * server->config.nThreads);
  169. WorkerCallback *dc = UA_malloc(dcsize);
  170. if(!dc)
  171. return UA_STATUSCODE_BADOUTOFMEMORY;
  172. /* Enqueue for the worker threads */
  173. dc->callback = callback;
  174. dc->data = data;
  175. dc->delayed = true;
  176. dc->countersSampled = false;
  177. cds_wfcq_node_init(&dc->node);
  178. cds_wfcq_enqueue(&server->dispatchQueue_head,
  179. &server->dispatchQueue_tail, &dc->node);
  180. /* Wake up sleeping workers */
  181. pthread_cond_broadcast(&server->dispatchQueue_condition);
  182. return UA_STATUSCODE_GOOD;
  183. }
  184. /* Called from the worker loop */
  185. static void
  186. processDelayedCallback(UA_Server *server, WorkerCallback *dc) {
  187. /* Set the worker counters */
  188. if(!dc->countersSampled) {
  189. for(size_t i = 0; i < server->config.nThreads; ++i)
  190. dc->workerCounters[i] = server->workers[i].counter;
  191. dc->countersSampled = true;
  192. /* Re-add to the dispatch queue */
  193. cds_wfcq_node_init(&dc->node);
  194. cds_wfcq_enqueue(&server->dispatchQueue_head,
  195. &server->dispatchQueue_tail, &dc->node);
  196. /* Wake up sleeping workers */
  197. pthread_cond_broadcast(&server->dispatchQueue_condition);
  198. return;
  199. }
  200. /* Have all other jobs finished? */
  201. UA_Boolean ready = true;
  202. for(size_t i = 0; i < server->config.nThreads; ++i) {
  203. if(dc->workerCounters[i] == server->workers[i].counter) {
  204. ready = false;
  205. break;
  206. }
  207. }
  208. /* Re-add to the dispatch queue.
  209. * TODO: What is the impact of this loop?
  210. * Can we add a small delay here? */
  211. if(!ready) {
  212. cds_wfcq_node_init(&dc->node);
  213. cds_wfcq_enqueue(&server->dispatchQueue_head,
  214. &server->dispatchQueue_tail, &dc->node);
  215. /* Wake up sleeping workers */
  216. pthread_cond_broadcast(&server->dispatchQueue_condition);
  217. return;
  218. }
  219. /* Execute the callback */
  220. dc->callback(server, dc->data);
  221. UA_free(dc);
  222. }
  223. #endif
  224. /**
  225. * Main Server Loop
  226. * ----------------
  227. * Start: Spin up the workers and the network layer
  228. * Iterate: Process repeated callbacks and events in the network layer.
  229. * This part can be driven from an external main-loop in an
  230. * event-driven single-threaded architecture.
 * Stop: Stop the network layer, stop the workers, run the remaining
 * callbacks, clean up */
  233. UA_StatusCode
  234. UA_Server_run_startup(UA_Server *server) {
  235. /* Start the networklayers */
  236. UA_StatusCode result = UA_STATUSCODE_GOOD;
  237. for(size_t i = 0; i < server->config.networkLayersSize; ++i) {
  238. UA_ServerNetworkLayer *nl = &server->config.networkLayers[i];
  239. result |= nl->start(nl);
  240. }
  241. /* Spin up the worker threads */
  242. #ifdef UA_ENABLE_MULTITHREADING
  243. UA_LOG_INFO(server->config.logger, UA_LOGCATEGORY_SERVER,
  244. "Spinning up %u worker thread(s)", server->config.nThreads);
  245. pthread_cond_init(&server->dispatchQueue_condition, 0);
  246. pthread_mutex_init(&server->dispatchQueue_mutex, 0);
  247. server->workers = UA_malloc(server->config.nThreads * sizeof(UA_Worker));
  248. if(!server->workers)
  249. return UA_STATUSCODE_BADOUTOFMEMORY;
  250. for(size_t i = 0; i < server->config.nThreads; ++i) {
  251. UA_Worker *worker = &server->workers[i];
  252. worker->server = server;
  253. worker->counter = 0;
  254. worker->running = true;
  255. pthread_create(&worker->thr, NULL, (void* (*)(void*))workerLoop, worker);
  256. }
  257. #endif
  258. /* Start the multicast discovery server */
  259. #ifdef UA_ENABLE_DISCOVERY_MULTICAST
  260. if(server->config.applicationDescription.applicationType ==
  261. UA_APPLICATIONTYPE_DISCOVERYSERVER)
  262. startMulticastDiscoveryServer(server);
  263. #endif
  264. return result;
  265. }
  266. UA_UInt16
  267. UA_Server_run_iterate(UA_Server *server, UA_Boolean waitInternal) {
  268. /* Process repeated work */
  269. UA_DateTime now = UA_DateTime_nowMonotonic();
  270. UA_DateTime nextRepeated =
  271. UA_Timer_process(&server->timer, now,
  272. (UA_TimerDispatchCallback)UA_Server_workerCallback,
  273. server);
  274. UA_DateTime latest = now + (UA_MAXTIMEOUT * UA_MSEC_TO_DATETIME);
  275. if(nextRepeated > latest)
  276. nextRepeated = latest;
  277. UA_UInt16 timeout = 0;
  278. if(waitInternal)
  279. timeout = (UA_UInt16)((nextRepeated - now) / UA_MSEC_TO_DATETIME);
  280. /* Listen on the networklayer */
  281. for(size_t i = 0; i < server->config.networkLayersSize; ++i) {
  282. UA_ServerNetworkLayer *nl = &server->config.networkLayers[i];
  283. nl->listen(nl, server, timeout);
  284. }
  285. #ifndef UA_ENABLE_MULTITHREADING
  286. /* Process delayed callbacks when all callbacks and
  287. * network events are done */
  288. processDelayedCallbacks(server);
  289. #endif
  290. #if defined(UA_ENABLE_DISCOVERY_MULTICAST) && !defined(UA_ENABLE_MULTITHREADING)
  291. if(server->config.applicationDescription.applicationType ==
  292. UA_APPLICATIONTYPE_DISCOVERYSERVER) {
  293. // TODO multicastNextRepeat does not consider new input data (requests)
  294. // on the socket. It will be handled on the next call. if needed, we
  295. // need to use select with timeout on the multicast socket
  296. // server->mdnsSocket (see example in mdnsd library) on higher level.
  297. UA_DateTime multicastNextRepeat = 0;
  298. UA_StatusCode hasNext =
  299. iterateMulticastDiscoveryServer(server, &multicastNextRepeat,
  300. UA_TRUE);
  301. if(hasNext == UA_STATUSCODE_GOOD && multicastNextRepeat < nextRepeated)
  302. nextRepeated = multicastNextRepeat;
  303. }
  304. #endif
  305. now = UA_DateTime_nowMonotonic();
  306. timeout = 0;
  307. if(nextRepeated > now)
  308. timeout = (UA_UInt16)((nextRepeated - now) / UA_MSEC_TO_DATETIME);
  309. return timeout;
  310. }
  311. UA_StatusCode
  312. UA_Server_run_shutdown(UA_Server *server) {
  313. /* Stop the netowrk layer */
  314. for(size_t i = 0; i < server->config.networkLayersSize; ++i) {
  315. UA_ServerNetworkLayer *nl = &server->config.networkLayers[i];
  316. nl->stop(nl, server);
  317. }
  318. #ifndef UA_ENABLE_MULTITHREADING
  319. /* Process remaining delayed callbacks */
  320. processDelayedCallbacks(server);
  321. #else
  322. /* Shut down the workers */
  323. if(server->workers) {
  324. UA_LOG_INFO(server->config.logger, UA_LOGCATEGORY_SERVER,
  325. "Shutting down %u worker thread(s)",
  326. server->config.nThreads);
  327. for(size_t i = 0; i < server->config.nThreads; ++i)
  328. server->workers[i].running = false;
  329. pthread_cond_broadcast(&server->dispatchQueue_condition);
  330. for(size_t i = 0; i < server->config.nThreads; ++i)
  331. pthread_join(server->workers[i].thr, NULL);
  332. UA_free(server->workers);
  333. server->workers = NULL;
  334. }
  335. /* Execute the remaining callbacks in the dispatch queue.
  336. * This also executes the delayed callbacks. */
  337. emptyDispatchQueue(server);
  338. /* Wait for all scheduled call_rcu work to complete */
  339. UA_ASSERT_RCU_UNLOCKED();
  340. rcu_barrier();
  341. #endif
  342. /* Stop multicast discovery */
  343. #ifdef UA_ENABLE_DISCOVERY_MULTICAST
  344. if(server->config.applicationDescription.applicationType ==
  345. UA_APPLICATIONTYPE_DISCOVERYSERVER)
  346. stopMulticastDiscoveryServer(server);
  347. #endif
  348. return UA_STATUSCODE_GOOD;
  349. }
  350. UA_StatusCode
  351. UA_Server_run(UA_Server *server, volatile UA_Boolean *running) {
  352. UA_StatusCode retval = UA_Server_run_startup(server);
  353. if(retval != UA_STATUSCODE_GOOD)
  354. return retval;
  355. while(*running)
  356. UA_Server_run_iterate(server, true);
  357. return UA_Server_run_shutdown(server);
  358. }