ua_server_worker.c 14 KB


  1. /* This Source Code Form is subject to the terms of the Mozilla Public
  2. * License, v. 2.0. If a copy of the MPL was not distributed with this
  3. * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
  #include "ua_util.h"
  #include "ua_server_internal.h"
  /* Included after the project headers so that any feature-test macros they
   * define are in effect. Needed for struct timespec / clock_gettime. */
  #include <time.h>
  6. #define UA_MAXTIMEOUT 50 /* Max timeout in ms between main-loop iterations */
  7. /**
  8. * Worker Threads and Dispatch Queue
  9. * ---------------------------------
  10. * The worker threads dequeue callbacks from a central Multi-Producer
  11. * Multi-Consumer Queue (MPMC). When there are no callbacks, workers go idle.
  12. * The condition to wake them up is triggered whenever a callback is
  13. * dispatched.
  14. *
  15. * Future Plans: Use work-stealing to load-balance between cores.
  16. * Le, Nhat Minh, et al. "Correct and efficient work-stealing for weak memory
  17. * models." ACM SIGPLAN Notices. Vol. 48. No. 8. ACM, 2013. */
  18. #ifdef UA_ENABLE_MULTITHREADING
  19. struct UA_Worker {
  20. UA_Server *server;
  21. pthread_t thr;
  22. UA_UInt32 counter;
  23. volatile UA_Boolean running;
  24. /* separate cache lines */
  25. char padding[64 - sizeof(void*) - sizeof(pthread_t) -
  26. sizeof(UA_UInt32) - sizeof(UA_Boolean)];
  27. };
  28. typedef struct {
  29. struct cds_wfcq_node node;
  30. UA_ServerCallback callback;
  31. void *data;
  32. UA_Boolean delayed; /* Is it a delayed callback? */
  33. UA_Boolean countersSampled; /* Have the worker counters been sampled? */
  34. UA_UInt32 workerCounters[]; /* Counter value for each worker */
  35. } WorkerCallback;
  36. /* Forward Declaration */
  37. static void
  38. processDelayedCallback(UA_Server *server, WorkerCallback *dc);
  39. static void *
  40. workerLoop(UA_Worker *worker) {
  41. UA_Server *server = worker->server;
  42. UA_UInt32 *counter = &worker->counter;
  43. volatile UA_Boolean *running = &worker->running;
  44. /* Initialize the (thread local) random seed with the ram address
  45. * of the worker. Not for security-critical entropy! */
  46. UA_random_seed((uintptr_t)worker);
  47. while(*running) {
  48. UA_atomic_add(counter, 1);
  49. WorkerCallback *dc = (WorkerCallback*)
  50. cds_wfcq_dequeue_blocking(&server->dispatchQueue_head,
  51. &server->dispatchQueue_tail);
  52. if(!dc) {
  53. /* Nothing to do. Sleep until a callback is dispatched */
  54. pthread_mutex_lock(&server->dispatchQueue_mutex);
  55. pthread_cond_wait(&server->dispatchQueue_condition,
  56. &server->dispatchQueue_mutex);
  57. pthread_mutex_unlock(&server->dispatchQueue_mutex);
  58. continue;
  59. }
  60. if(dc->delayed) {
  61. processDelayedCallback(server, dc);
  62. continue;
  63. }
  64. dc->callback(server, dc->data);
  65. UA_free(dc);
  66. }
  67. UA_LOG_DEBUG(server->config.logger, UA_LOGCATEGORY_SERVER,
  68. "Worker shut down");
  69. return NULL;
  70. }
  71. static void
  72. emptyDispatchQueue(UA_Server *server) {
  73. while(!cds_wfcq_empty(&server->dispatchQueue_head,
  74. &server->dispatchQueue_tail)) {
  75. WorkerCallback *dc = (WorkerCallback*)
  76. cds_wfcq_dequeue_blocking(&server->dispatchQueue_head,
  77. &server->dispatchQueue_tail);
  78. dc->callback(server, dc->data);
  79. UA_free(dc);
  80. }
  81. }
  82. #endif
  83. /**
  84. * Repeated Callbacks
  85. * ------------------
  86. * Repeated Callbacks are handled by UA_Timer (used in both client and server).
  87. * In the multi-threaded case, callbacks are dispatched to workers. Otherwise,
  88. * they are executed immediately. */
  89. void
  90. UA_Server_workerCallback(UA_Server *server, UA_ServerCallback callback,
  91. void *data) {
  92. #ifndef UA_ENABLE_MULTITHREADING
  93. /* Execute immediately */
  94. callback(server, data);
  95. #else
  96. /* Execute immediately if memory could not be allocated */
  97. WorkerCallback *dc = (WorkerCallback*)UA_malloc(sizeof(WorkerCallback));
  98. if(!dc) {
  99. callback(server, data);
  100. return;
  101. }
  102. /* Enqueue for the worker threads */
  103. dc->callback = callback;
  104. dc->data = data;
  105. dc->delayed = false;
  106. cds_wfcq_node_init(&dc->node);
  107. cds_wfcq_enqueue(&server->dispatchQueue_head,
  108. &server->dispatchQueue_tail, &dc->node);
  109. /* Wake up sleeping workers */
  110. pthread_cond_broadcast(&server->dispatchQueue_condition);
  111. #endif
  112. }
  113. /**
  114. * Delayed Callbacks
  115. * -----------------
  116. *
  117. * Delayed Callbacks are called only when all callbacks that were dispatched
  118. * prior are finished. In the single-threaded case, the callback is added to a
  119. * singly-linked list that is processed at the end of the server's main-loop. In
  120. * the multi-threaded case, the delay is ensured by a three-step procedure:
  121. *
  122. * 1. The delayed callback is dispatched to the worker queue. So it is only
  123. * dequeued when all prior callbacks have been dequeued.
  124. *
  125. * 2. When the callback is first dequeued by a worker, sample the counter of all
  126. * workers. Once all counters have advanced, the callback is ready.
  127. *
  128. * 3. Check regularly if the callback is ready by adding it back to the dispatch
  129. * queue. */
  130. #ifndef UA_ENABLE_MULTITHREADING
  131. typedef struct UA_DelayedCallback {
  132. SLIST_ENTRY(UA_DelayedCallback) next;
  133. UA_ServerCallback callback;
  134. void *data;
  135. } UA_DelayedCallback;
  136. UA_StatusCode
  137. UA_Server_delayedCallback(UA_Server *server, UA_ServerCallback callback,
  138. void *data) {
  139. UA_DelayedCallback *dc =
  140. (UA_DelayedCallback*)UA_malloc(sizeof(UA_DelayedCallback));
  141. if(!dc)
  142. return UA_STATUSCODE_BADOUTOFMEMORY;
  143. dc->callback = callback;
  144. dc->data = data;
  145. SLIST_INSERT_HEAD(&server->delayedCallbacks, dc, next);
  146. return UA_STATUSCODE_GOOD;
  147. }
  148. static void
  149. processDelayedCallbacks(UA_Server *server) {
  150. UA_DelayedCallback *dc, *dc_tmp;
  151. SLIST_FOREACH_SAFE(dc, &server->delayedCallbacks, next, dc_tmp) {
  152. SLIST_REMOVE(&server->delayedCallbacks, dc, UA_DelayedCallback, next);
  153. dc->callback(server, dc->data);
  154. UA_free(dc);
  155. }
  156. }
  157. #else /* UA_ENABLE_MULTITHREADING */
  158. UA_StatusCode
  159. UA_Server_delayedCallback(UA_Server *server, UA_ServerCallback callback,
  160. void *data) {
  161. size_t dcsize = sizeof(WorkerCallback) +
  162. (sizeof(UA_UInt32) * server->config.nThreads);
  163. WorkerCallback *dc = (WorkerCallback*)UA_malloc(dcsize);
  164. if(!dc)
  165. return UA_STATUSCODE_BADOUTOFMEMORY;
  166. /* Enqueue for the worker threads */
  167. dc->callback = callback;
  168. dc->data = data;
  169. dc->delayed = true;
  170. dc->countersSampled = false;
  171. cds_wfcq_node_init(&dc->node);
  172. cds_wfcq_enqueue(&server->dispatchQueue_head,
  173. &server->dispatchQueue_tail, &dc->node);
  174. /* Wake up sleeping workers */
  175. pthread_cond_broadcast(&server->dispatchQueue_condition);
  176. return UA_STATUSCODE_GOOD;
  177. }
  178. /* Called from the worker loop */
  179. static void
  180. processDelayedCallback(UA_Server *server, WorkerCallback *dc) {
  181. /* Set the worker counters */
  182. if(!dc->countersSampled) {
  183. for(size_t i = 0; i < server->config.nThreads; ++i)
  184. dc->workerCounters[i] = server->workers[i].counter;
  185. dc->countersSampled = true;
  186. /* Re-add to the dispatch queue */
  187. cds_wfcq_node_init(&dc->node);
  188. cds_wfcq_enqueue(&server->dispatchQueue_head,
  189. &server->dispatchQueue_tail, &dc->node);
  190. /* Wake up sleeping workers */
  191. pthread_cond_broadcast(&server->dispatchQueue_condition);
  192. return;
  193. }
  194. /* Have all other jobs finished? */
  195. UA_Boolean ready = true;
  196. for(size_t i = 0; i < server->config.nThreads; ++i) {
  197. if(dc->workerCounters[i] == server->workers[i].counter) {
  198. ready = false;
  199. break;
  200. }
  201. }
  202. /* Re-add to the dispatch queue.
  203. * TODO: What is the impact of this loop?
  204. * Can we add a small delay here? */
  205. if(!ready) {
  206. cds_wfcq_node_init(&dc->node);
  207. cds_wfcq_enqueue(&server->dispatchQueue_head,
  208. &server->dispatchQueue_tail, &dc->node);
  209. /* Wake up sleeping workers */
  210. pthread_cond_broadcast(&server->dispatchQueue_condition);
  211. return;
  212. }
  213. /* Execute the callback */
  214. dc->callback(server, dc->data);
  215. UA_free(dc);
  216. }
  217. #endif
  218. /**
  219. * Main Server Loop
  220. * ----------------
  221. * Start: Spin up the workers and the network layer and sample the server's
  222. * start time.
  223. * Iterate: Process repeated callbacks and events in the network layer.
  224. * This part can be driven from an external main-loop in an
  225. * event-driven single-threaded architecture.
  226. * Stop: Stop workers, finish all callbacks, stop the network layer,
  227. * clean up */
  228. UA_StatusCode
  229. UA_Server_run_startup(UA_Server *server) {
  230. UA_Variant var;
  231. UA_StatusCode result = UA_STATUSCODE_GOOD;
  232. /* Sample the start time and set it to the Server object */
  233. server->startTime = UA_DateTime_now();
  234. UA_Variant_init(&var);
  235. UA_Variant_setScalar(&var, &server->startTime, &UA_TYPES[UA_TYPES_DATETIME]);
  236. UA_Server_writeValue(server,
  237. UA_NODEID_NUMERIC(0, UA_NS0ID_SERVER_SERVERSTATUS_STARTTIME),
  238. var);
  239. /* Start the networklayers */
  240. for(size_t i = 0; i < server->config.networkLayersSize; ++i) {
  241. UA_ServerNetworkLayer *nl = &server->config.networkLayers[i];
  242. result |= nl->start(nl);
  243. }
  244. /* Spin up the worker threads */
  245. #ifdef UA_ENABLE_MULTITHREADING
  246. UA_LOG_INFO(server->config.logger, UA_LOGCATEGORY_SERVER,
  247. "Spinning up %u worker thread(s)", server->config.nThreads);
  248. pthread_cond_init(&server->dispatchQueue_condition, 0);
  249. pthread_mutex_init(&server->dispatchQueue_mutex, 0);
  250. server->workers = (UA_Worker*)UA_malloc(server->config.nThreads * sizeof(UA_Worker));
  251. if(!server->workers)
  252. return UA_STATUSCODE_BADOUTOFMEMORY;
  253. for(size_t i = 0; i < server->config.nThreads; ++i) {
  254. UA_Worker *worker = &server->workers[i];
  255. worker->server = server;
  256. worker->counter = 0;
  257. worker->running = true;
  258. pthread_create(&worker->thr, NULL, (void* (*)(void*))workerLoop, worker);
  259. }
  260. #endif
  261. /* Start the multicast discovery server */
  262. #ifdef UA_ENABLE_DISCOVERY_MULTICAST
  263. if(server->config.applicationDescription.applicationType ==
  264. UA_APPLICATIONTYPE_DISCOVERYSERVER)
  265. startMulticastDiscoveryServer(server);
  266. #endif
  267. return result;
  268. }
  269. UA_UInt16
  270. UA_Server_run_iterate(UA_Server *server, UA_Boolean waitInternal) {
  271. /* Process repeated work */
  272. UA_DateTime now = UA_DateTime_nowMonotonic();
  273. UA_DateTime nextRepeated =
  274. UA_Timer_process(&server->timer, now,
  275. (UA_TimerDispatchCallback)UA_Server_workerCallback,
  276. server);
  277. UA_DateTime latest = now + (UA_MAXTIMEOUT * UA_MSEC_TO_DATETIME);
  278. if(nextRepeated > latest)
  279. nextRepeated = latest;
  280. UA_UInt16 timeout = 0;
  281. if(waitInternal)
  282. timeout = (UA_UInt16)((nextRepeated - now) / UA_MSEC_TO_DATETIME);
  283. /* Listen on the networklayer */
  284. for(size_t i = 0; i < server->config.networkLayersSize; ++i) {
  285. UA_ServerNetworkLayer *nl = &server->config.networkLayers[i];
  286. nl->listen(nl, server, timeout);
  287. }
  288. #ifndef UA_ENABLE_MULTITHREADING
  289. /* Process delayed callbacks when all callbacks and
  290. * network events are done */
  291. processDelayedCallbacks(server);
  292. #endif
  293. #if defined(UA_ENABLE_DISCOVERY_MULTICAST) && !defined(UA_ENABLE_MULTITHREADING)
  294. if(server->config.applicationDescription.applicationType ==
  295. UA_APPLICATIONTYPE_DISCOVERYSERVER) {
  296. // TODO multicastNextRepeat does not consider new input data (requests)
  297. // on the socket. It will be handled on the next call. if needed, we
  298. // need to use select with timeout on the multicast socket
  299. // server->mdnsSocket (see example in mdnsd library) on higher level.
  300. UA_DateTime multicastNextRepeat = 0;
  301. UA_StatusCode hasNext =
  302. iterateMulticastDiscoveryServer(server, &multicastNextRepeat,
  303. UA_TRUE);
  304. if(hasNext == UA_STATUSCODE_GOOD && multicastNextRepeat < nextRepeated)
  305. nextRepeated = multicastNextRepeat;
  306. }
  307. #endif
  308. now = UA_DateTime_nowMonotonic();
  309. timeout = 0;
  310. if(nextRepeated > now)
  311. timeout = (UA_UInt16)((nextRepeated - now) / UA_MSEC_TO_DATETIME);
  312. return timeout;
  313. }
  314. UA_StatusCode
  315. UA_Server_run_shutdown(UA_Server *server) {
  316. /* Stop the netowrk layer */
  317. for(size_t i = 0; i < server->config.networkLayersSize; ++i) {
  318. UA_ServerNetworkLayer *nl = &server->config.networkLayers[i];
  319. nl->stop(nl, server);
  320. }
  321. #ifndef UA_ENABLE_MULTITHREADING
  322. /* Process remaining delayed callbacks */
  323. processDelayedCallbacks(server);
  324. #else
  325. /* Shut down the workers */
  326. if(server->workers) {
  327. UA_LOG_INFO(server->config.logger, UA_LOGCATEGORY_SERVER,
  328. "Shutting down %u worker thread(s)",
  329. server->config.nThreads);
  330. for(size_t i = 0; i < server->config.nThreads; ++i)
  331. server->workers[i].running = false;
  332. pthread_cond_broadcast(&server->dispatchQueue_condition);
  333. for(size_t i = 0; i < server->config.nThreads; ++i)
  334. pthread_join(server->workers[i].thr, NULL);
  335. UA_free(server->workers);
  336. server->workers = NULL;
  337. }
  338. /* Execute the remaining callbacks in the dispatch queue.
  339. * This also executes the delayed callbacks. */
  340. emptyDispatchQueue(server);
  341. #endif
  342. /* Stop multicast discovery */
  343. #ifdef UA_ENABLE_DISCOVERY_MULTICAST
  344. if(server->config.applicationDescription.applicationType ==
  345. UA_APPLICATIONTYPE_DISCOVERYSERVER)
  346. stopMulticastDiscoveryServer(server);
  347. #endif
  348. return UA_STATUSCODE_GOOD;
  349. }
  350. UA_StatusCode
  351. UA_Server_run(UA_Server *server, volatile UA_Boolean *running) {
  352. UA_StatusCode retval = UA_Server_run_startup(server);
  353. if(retval != UA_STATUSCODE_GOOD)
  354. return retval;
  355. while(*running)
  356. UA_Server_run_iterate(server, true);
  357. return UA_Server_run_shutdown(server);
  358. }