ua_server_worker.c 15 KB


  1. /* This Source Code Form is subject to the terms of the Mozilla Public
  2. * License, v. 2.0. If a copy of the MPL was not distributed with this
  3. * file, You can obtain one at http://mozilla.org/MPL/2.0/.
  4. *
  5. * Copyright 2014-2017 (c) Julius Pfrommer, Fraunhofer IOSB
  6. * Copyright 2014-2016 (c) Sten Grüner
  7. * Copyright 2015 (c) Chris Iatrou
  8. * Copyright 2015 (c) Nick Goossens
  9. * Copyright 2015 (c) Jörg Schüler-Maroldt
  10. * Copyright 2015-2016 (c) Oleksiy Vasylyev
  11. * Copyright 2016-2017 (c) Florian Palm
  12. * Copyright 2017 (c) Stefan Profanter, fortiss GmbH
  13. * Copyright 2016 (c) Lorenz Haas
  14. * Copyright 2017 (c) Jonas Green
  15. */
  16. #include "ua_util.h"
  17. #include "ua_server_internal.h"
  18. #define UA_MAXTIMEOUT 50 /* Max timeout in ms between main-loop iterations */
  19. /**
  20. * Worker Threads and Dispatch Queue
  21. * ---------------------------------
  22. * The worker threads dequeue callbacks from a central Multi-Producer
  23. * Multi-Consumer Queue (MPMC). When there are no callbacks, workers go idle.
  24. * The condition to wake them up is triggered whenever a callback is
  25. * dispatched.
  26. *
  27. * Future Plans: Use work-stealing to load-balance between cores.
  28. * Le, Nhat Minh, et al. "Correct and efficient work-stealing for weak memory
  29. * models." ACM SIGPLAN Notices. Vol. 48. No. 8. ACM, 2013. */
  30. #ifdef UA_ENABLE_MULTITHREADING
  31. struct UA_Worker {
  32. UA_Server *server;
  33. pthread_t thr;
  34. UA_UInt32 counter;
  35. volatile UA_Boolean running;
  36. /* separate cache lines */
  37. char padding[64 - sizeof(void*) - sizeof(pthread_t) -
  38. sizeof(UA_UInt32) - sizeof(UA_Boolean)];
  39. };
  40. struct UA_WorkerCallback {
  41. SIMPLEQ_ENTRY(UA_WorkerCallback) next;
  42. UA_ServerCallback callback;
  43. void *data;
  44. UA_Boolean delayed; /* Is it a delayed callback? */
  45. UA_Boolean countersSampled; /* Have the worker counters been sampled? */
  46. UA_UInt32 workerCounters[]; /* Counter value for each worker */
  47. };
  48. typedef struct UA_WorkerCallback WorkerCallback;
  49. /* Forward Declaration */
  50. static void
  51. processDelayedCallback(UA_Server *server, WorkerCallback *dc);
  52. static void *
  53. workerLoop(UA_Worker *worker) {
  54. UA_Server *server = worker->server;
  55. UA_UInt32 *counter = &worker->counter;
  56. volatile UA_Boolean *running = &worker->running;
  57. /* Initialize the (thread local) random seed with the ram address
  58. * of the worker. Not for security-critical entropy! */
  59. UA_random_seed((uintptr_t)worker);
  60. while(*running) {
  61. UA_atomic_addUInt32(counter, 1);
  62. pthread_mutex_lock(&server->dispatchQueue_accessMutex);
  63. WorkerCallback *dc = SIMPLEQ_FIRST(&server->dispatchQueue);
  64. if(dc) {
  65. SIMPLEQ_REMOVE_HEAD(&server->dispatchQueue, next);
  66. }
  67. pthread_mutex_unlock(&server->dispatchQueue_accessMutex);
  68. if(!dc) {
  69. /* Nothing to do. Sleep until a callback is dispatched */
  70. pthread_mutex_lock(&server->dispatchQueue_conditionMutex);
  71. pthread_cond_wait(&server->dispatchQueue_condition,
  72. &server->dispatchQueue_conditionMutex);
  73. pthread_mutex_unlock(&server->dispatchQueue_conditionMutex);
  74. continue;
  75. }
  76. if(dc->delayed) {
  77. processDelayedCallback(server, dc);
  78. continue;
  79. }
  80. dc->callback(server, dc->data);
  81. UA_free(dc);
  82. }
  83. UA_LOG_DEBUG(server->config.logger, UA_LOGCATEGORY_SERVER,
  84. "Worker shut down");
  85. return NULL;
  86. }
  87. static void
  88. emptyDispatchQueue(UA_Server *server) {
  89. pthread_mutex_lock(&server->dispatchQueue_accessMutex);
  90. WorkerCallback *dc;
  91. while((dc = SIMPLEQ_FIRST(&server->dispatchQueue)) != NULL) {
  92. SIMPLEQ_REMOVE_HEAD(&server->dispatchQueue, next);
  93. dc->callback(server, dc->data);
  94. UA_free(dc);
  95. }
  96. pthread_mutex_unlock(&server->dispatchQueue_accessMutex);
  97. }
  98. #endif
  99. /**
  100. * Repeated Callbacks
  101. * ------------------
  102. * Repeated Callbacks are handled by UA_Timer (used in both client and server).
  103. * In the multi-threaded case, callbacks are dispatched to workers. Otherwise,
  104. * they are executed immediately. */
  105. void
  106. UA_Server_workerCallback(UA_Server *server, UA_ServerCallback callback,
  107. void *data) {
  108. #ifndef UA_ENABLE_MULTITHREADING
  109. /* Execute immediately */
  110. callback(server, data);
  111. #else
  112. /* Execute immediately if memory could not be allocated */
  113. WorkerCallback *dc = (WorkerCallback*)UA_malloc(sizeof(WorkerCallback));
  114. if(!dc) {
  115. callback(server, data);
  116. return;
  117. }
  118. /* Enqueue for the worker threads */
  119. dc->callback = callback;
  120. dc->data = data;
  121. dc->delayed = false;
  122. pthread_mutex_lock(&server->dispatchQueue_accessMutex);
  123. SIMPLEQ_INSERT_TAIL(&server->dispatchQueue, dc, next);
  124. pthread_mutex_unlock(&server->dispatchQueue_accessMutex);
  125. /* Wake up sleeping workers */
  126. pthread_cond_broadcast(&server->dispatchQueue_condition);
  127. #endif
  128. }
  129. /**
  130. * Delayed Callbacks
  131. * -----------------
  132. *
  133. * Delayed Callbacks are called only when all callbacks that were dispatched
  134. * prior are finished. In the single-threaded case, the callback is added to a
  135. * singly-linked list that is processed at the end of the server's main-loop. In
  136. * the multi-threaded case, the delay is ensure by a three-step procedure:
  137. *
  138. * 1. The delayed callback is dispatched to the worker queue. So it is only
  139. * dequeued when all prior callbacks have been dequeued.
  140. *
  141. * 2. When the callback is first dequeued by a worker, sample the counter of all
  142. * workers. Once all counters have advanced, the callback is ready.
  143. *
  144. * 3. Check regularly if the callback is ready by adding it back to the dispatch
  145. * queue. */
  146. #ifndef UA_ENABLE_MULTITHREADING
  147. typedef struct UA_DelayedCallback {
  148. SLIST_ENTRY(UA_DelayedCallback) next;
  149. UA_ServerCallback callback;
  150. void *data;
  151. } UA_DelayedCallback;
  152. UA_StatusCode
  153. UA_Server_delayedCallback(UA_Server *server, UA_ServerCallback callback,
  154. void *data) {
  155. UA_DelayedCallback *dc =
  156. (UA_DelayedCallback*)UA_malloc(sizeof(UA_DelayedCallback));
  157. if(!dc)
  158. return UA_STATUSCODE_BADOUTOFMEMORY;
  159. dc->callback = callback;
  160. dc->data = data;
  161. SLIST_INSERT_HEAD(&server->delayedCallbacks, dc, next);
  162. return UA_STATUSCODE_GOOD;
  163. }
  164. static void
  165. processDelayedCallbacks(UA_Server *server) {
  166. UA_DelayedCallback *dc, *dc_tmp;
  167. SLIST_FOREACH_SAFE(dc, &server->delayedCallbacks, next, dc_tmp) {
  168. SLIST_REMOVE(&server->delayedCallbacks, dc, UA_DelayedCallback, next);
  169. dc->callback(server, dc->data);
  170. UA_free(dc);
  171. }
  172. }
  173. #else /* UA_ENABLE_MULTITHREADING */
  174. UA_StatusCode
  175. UA_Server_delayedCallback(UA_Server *server, UA_ServerCallback callback,
  176. void *data) {
  177. size_t dcsize = sizeof(WorkerCallback) +
  178. (sizeof(UA_UInt32) * server->config.nThreads);
  179. WorkerCallback *dc = (WorkerCallback*)UA_malloc(dcsize);
  180. if(!dc)
  181. return UA_STATUSCODE_BADOUTOFMEMORY;
  182. /* Enqueue for the worker threads */
  183. dc->callback = callback;
  184. dc->data = data;
  185. dc->delayed = true;
  186. dc->countersSampled = false;
  187. pthread_mutex_lock(&server->dispatchQueue_accessMutex);
  188. SIMPLEQ_INSERT_TAIL(&server->dispatchQueue, dc, next);
  189. pthread_mutex_unlock(&server->dispatchQueue_accessMutex);
  190. /* Wake up sleeping workers */
  191. pthread_cond_broadcast(&server->dispatchQueue_condition);
  192. return UA_STATUSCODE_GOOD;
  193. }
  194. /* Called from the worker loop */
  195. static void
  196. processDelayedCallback(UA_Server *server, WorkerCallback *dc) {
  197. /* Set the worker counters */
  198. if(!dc->countersSampled) {
  199. for(size_t i = 0; i < server->config.nThreads; ++i)
  200. dc->workerCounters[i] = server->workers[i].counter;
  201. dc->countersSampled = true;
  202. /* Re-add to the dispatch queue */
  203. pthread_mutex_lock(&server->dispatchQueue_accessMutex);
  204. SIMPLEQ_INSERT_TAIL(&server->dispatchQueue, dc, next);
  205. pthread_mutex_unlock(&server->dispatchQueue_accessMutex);
  206. /* Wake up sleeping workers */
  207. pthread_cond_broadcast(&server->dispatchQueue_condition);
  208. return;
  209. }
  210. /* Have all other jobs finished? */
  211. UA_Boolean ready = true;
  212. for(size_t i = 0; i < server->config.nThreads; ++i) {
  213. if(dc->workerCounters[i] == server->workers[i].counter) {
  214. ready = false;
  215. break;
  216. }
  217. }
  218. /* Re-add to the dispatch queue.
  219. * TODO: What is the impact of this loop?
  220. * Can we add a small delay here? */
  221. if(!ready) {
  222. pthread_mutex_lock(&server->dispatchQueue_accessMutex);
  223. SIMPLEQ_INSERT_TAIL(&server->dispatchQueue, dc, next);
  224. pthread_mutex_unlock(&server->dispatchQueue_accessMutex);
  225. /* Wake up sleeping workers */
  226. pthread_cond_broadcast(&server->dispatchQueue_condition);
  227. return;
  228. }
  229. /* Execute the callback */
  230. dc->callback(server, dc->data);
  231. UA_free(dc);
  232. }
  233. #endif
  234. /**
  235. * Main Server Loop
  236. * ----------------
  237. * Start: Spin up the workers and the network layer and sample the server's
  238. * start time.
  239. * Iterate: Process repeated callbacks and events in the network layer.
  240. * This part can be driven from an external main-loop in an
  241. * event-driven single-threaded architecture.
  242. * Stop: Stop workers, finish all callbacks, stop the network layer,
  243. * clean up */
  244. UA_StatusCode
  245. UA_Server_run_startup(UA_Server *server) {
  246. UA_Variant var;
  247. UA_StatusCode result = UA_STATUSCODE_GOOD;
  248. /* Sample the start time and set it to the Server object */
  249. server->startTime = UA_DateTime_now();
  250. UA_Variant_init(&var);
  251. UA_Variant_setScalar(&var, &server->startTime, &UA_TYPES[UA_TYPES_DATETIME]);
  252. UA_Server_writeValue(server,
  253. UA_NODEID_NUMERIC(0, UA_NS0ID_SERVER_SERVERSTATUS_STARTTIME),
  254. var);
  255. /* Start the networklayers */
  256. for(size_t i = 0; i < server->config.networkLayersSize; ++i) {
  257. UA_ServerNetworkLayer *nl = &server->config.networkLayers[i];
  258. result |= nl->start(nl, &server->config.customHostname);
  259. }
  260. /* Spin up the worker threads */
  261. #ifdef UA_ENABLE_MULTITHREADING
  262. UA_LOG_INFO(server->config.logger, UA_LOGCATEGORY_SERVER,
  263. "Spinning up %u worker thread(s)", server->config.nThreads);
  264. pthread_mutex_init(&server->dispatchQueue_accessMutex, NULL);
  265. pthread_cond_init(&server->dispatchQueue_condition, NULL);
  266. pthread_mutex_init(&server->dispatchQueue_conditionMutex, NULL);
  267. server->workers = (UA_Worker*)UA_malloc(server->config.nThreads * sizeof(UA_Worker));
  268. if(!server->workers)
  269. return UA_STATUSCODE_BADOUTOFMEMORY;
  270. for(size_t i = 0; i < server->config.nThreads; ++i) {
  271. UA_Worker *worker = &server->workers[i];
  272. worker->server = server;
  273. worker->counter = 0;
  274. worker->running = true;
  275. pthread_create(&worker->thr, NULL, (void* (*)(void*))workerLoop, worker);
  276. }
  277. #endif
  278. /* Start the multicast discovery server */
  279. #ifdef UA_ENABLE_DISCOVERY_MULTICAST
  280. if(server->config.applicationDescription.applicationType ==
  281. UA_APPLICATIONTYPE_DISCOVERYSERVER)
  282. startMulticastDiscoveryServer(server);
  283. #endif
  284. return result;
  285. }
  286. UA_UInt16
  287. UA_Server_run_iterate(UA_Server *server, UA_Boolean waitInternal) {
  288. /* Process repeated work */
  289. UA_DateTime now = UA_DateTime_nowMonotonic();
  290. UA_DateTime nextRepeated =
  291. UA_Timer_process(&server->timer, now,
  292. (UA_TimerDispatchCallback)UA_Server_workerCallback,
  293. server);
  294. UA_DateTime latest = now + (UA_MAXTIMEOUT * UA_DATETIME_MSEC);
  295. if(nextRepeated > latest)
  296. nextRepeated = latest;
  297. UA_UInt16 timeout = 0;
  298. /* round always to upper value to avoid timeout to be set to 0
  299. * if(nextRepeated - now) < (UA_DATETIME_MSEC/2) */
  300. if(waitInternal)
  301. timeout = (UA_UInt16)(((nextRepeated - now) + (UA_DATETIME_MSEC - 1)) / UA_DATETIME_MSEC);
  302. /* Listen on the networklayer */
  303. for(size_t i = 0; i < server->config.networkLayersSize; ++i) {
  304. UA_ServerNetworkLayer *nl = &server->config.networkLayers[i];
  305. nl->listen(nl, server, timeout);
  306. }
  307. #ifndef UA_ENABLE_MULTITHREADING
  308. /* Process delayed callbacks when all callbacks and
  309. * network events are done */
  310. processDelayedCallbacks(server);
  311. #endif
  312. #if defined(UA_ENABLE_DISCOVERY_MULTICAST) && !defined(UA_ENABLE_MULTITHREADING)
  313. if(server->config.applicationDescription.applicationType ==
  314. UA_APPLICATIONTYPE_DISCOVERYSERVER) {
  315. // TODO multicastNextRepeat does not consider new input data (requests)
  316. // on the socket. It will be handled on the next call. if needed, we
  317. // need to use select with timeout on the multicast socket
  318. // server->mdnsSocket (see example in mdnsd library) on higher level.
  319. UA_DateTime multicastNextRepeat = 0;
  320. UA_StatusCode hasNext =
  321. iterateMulticastDiscoveryServer(server, &multicastNextRepeat, UA_TRUE);
  322. if(hasNext == UA_STATUSCODE_GOOD && multicastNextRepeat < nextRepeated)
  323. nextRepeated = multicastNextRepeat;
  324. }
  325. #endif
  326. now = UA_DateTime_nowMonotonic();
  327. timeout = 0;
  328. if(nextRepeated > now)
  329. timeout = (UA_UInt16)((nextRepeated - now) / UA_DATETIME_MSEC);
  330. return timeout;
  331. }
  332. UA_StatusCode
  333. UA_Server_run_shutdown(UA_Server *server) {
  334. /* Stop the netowrk layer */
  335. for(size_t i = 0; i < server->config.networkLayersSize; ++i) {
  336. UA_ServerNetworkLayer *nl = &server->config.networkLayers[i];
  337. nl->stop(nl, server);
  338. }
  339. #ifndef UA_ENABLE_MULTITHREADING
  340. /* Process remaining delayed callbacks */
  341. processDelayedCallbacks(server);
  342. #else
  343. /* Shut down the workers */
  344. if(server->workers) {
  345. UA_LOG_INFO(server->config.logger, UA_LOGCATEGORY_SERVER,
  346. "Shutting down %u worker thread(s)",
  347. server->config.nThreads);
  348. for(size_t i = 0; i < server->config.nThreads; ++i)
  349. server->workers[i].running = false;
  350. pthread_cond_broadcast(&server->dispatchQueue_condition);
  351. for(size_t i = 0; i < server->config.nThreads; ++i)
  352. pthread_join(server->workers[i].thr, NULL);
  353. UA_free(server->workers);
  354. server->workers = NULL;
  355. }
  356. /* Execute the remaining callbacks in the dispatch queue.
  357. * This also executes the delayed callbacks. */
  358. emptyDispatchQueue(server);
  359. #endif
  360. /* Stop multicast discovery */
  361. #ifdef UA_ENABLE_DISCOVERY_MULTICAST
  362. if(server->config.applicationDescription.applicationType ==
  363. UA_APPLICATIONTYPE_DISCOVERYSERVER)
  364. stopMulticastDiscoveryServer(server);
  365. #endif
  366. return UA_STATUSCODE_GOOD;
  367. }
  368. UA_StatusCode
  369. UA_Server_run(UA_Server *server, volatile UA_Boolean *running) {
  370. UA_StatusCode retval = UA_Server_run_startup(server);
  371. if(retval != UA_STATUSCODE_GOOD)
  372. return retval;
  373. while(*running)
  374. UA_Server_run_iterate(server, true);
  375. return UA_Server_run_shutdown(server);
  376. }