ua_server_worker.c 16 KB


  1. /* This Source Code Form is subject to the terms of the Mozilla Public
  2. * License, v. 2.0. If a copy of the MPL was not distributed with this
  3. * file, You can obtain one at http://mozilla.org/MPL/2.0/.
  4. *
  5. * Copyright 2014-2017 (c) Fraunhofer IOSB (Author: Julius Pfrommer)
  6. * Copyright 2014-2016 (c) Sten Grüner
  7. * Copyright 2015 (c) Chris Iatrou
  8. * Copyright 2015 (c) Nick Goossens
  9. * Copyright 2015 (c) Jörg Schüler-Maroldt
  10. * Copyright 2015-2016 (c) Oleksiy Vasylyev
  11. * Copyright 2016-2017 (c) Florian Palm
  12. * Copyright 2017 (c) Stefan Profanter, fortiss GmbH
  13. * Copyright 2016 (c) Lorenz Haas
  14. * Copyright 2017 (c) Jonas Green
  15. */
  16. #include "ua_util.h"
  17. #include "ua_server_internal.h"
  18. #ifdef UA_ENABLE_VALGRIND_INTERACTIVE
  19. #include <valgrind/memcheck.h>
  20. #endif
  21. #define UA_MAXTIMEOUT 50 /* Max timeout in ms between main-loop iterations */
  22. /**
  23. * Worker Threads and Dispatch Queue
  24. * ---------------------------------
  25. * The worker threads dequeue callbacks from a central Multi-Producer
  26. * Multi-Consumer Queue (MPMC). When there are no callbacks, workers go idle.
  27. * The condition to wake them up is triggered whenever a callback is
  28. * dispatched.
  29. *
  30. * Future Plans: Use work-stealing to load-balance between cores.
  31. * Le, Nhat Minh, et al. "Correct and efficient work-stealing for weak memory
  32. * models." ACM SIGPLAN Notices. Vol. 48. No. 8. ACM, 2013. */
  33. #ifdef UA_ENABLE_MULTITHREADING
  34. struct UA_Worker {
  35. UA_Server *server;
  36. pthread_t thr;
  37. UA_UInt32 counter;
  38. volatile UA_Boolean running;
  39. /* separate cache lines */
  40. char padding[64 - sizeof(void*) - sizeof(pthread_t) -
  41. sizeof(UA_UInt32) - sizeof(UA_Boolean)];
  42. };
  43. struct UA_WorkerCallback {
  44. SIMPLEQ_ENTRY(UA_WorkerCallback) next;
  45. UA_ServerCallback callback;
  46. void *data;
  47. UA_Boolean delayed; /* Is it a delayed callback? */
  48. UA_Boolean countersSampled; /* Have the worker counters been sampled? */
  49. UA_UInt32 workerCounters[]; /* Counter value for each worker */
  50. };
  51. typedef struct UA_WorkerCallback WorkerCallback;
  52. /* Forward Declaration */
  53. static void
  54. processDelayedCallback(UA_Server *server, WorkerCallback *dc);
  55. static void *
  56. workerLoop(UA_Worker *worker) {
  57. UA_Server *server = worker->server;
  58. UA_UInt32 *counter = &worker->counter;
  59. volatile UA_Boolean *running = &worker->running;
  60. /* Initialize the (thread local) random seed with the ram address
  61. * of the worker. Not for security-critical entropy! */
  62. UA_random_seed((uintptr_t)worker);
  63. while(*running) {
  64. UA_atomic_addUInt32(counter, 1);
  65. pthread_mutex_lock(&server->dispatchQueue_accessMutex);
  66. WorkerCallback *dc = SIMPLEQ_FIRST(&server->dispatchQueue);
  67. if(dc) {
  68. SIMPLEQ_REMOVE_HEAD(&server->dispatchQueue, next);
  69. }
  70. pthread_mutex_unlock(&server->dispatchQueue_accessMutex);
  71. if(!dc) {
  72. /* Nothing to do. Sleep until a callback is dispatched */
  73. pthread_mutex_lock(&server->dispatchQueue_conditionMutex);
  74. pthread_cond_wait(&server->dispatchQueue_condition,
  75. &server->dispatchQueue_conditionMutex);
  76. pthread_mutex_unlock(&server->dispatchQueue_conditionMutex);
  77. continue;
  78. }
  79. if(dc->delayed) {
  80. processDelayedCallback(server, dc);
  81. continue;
  82. }
  83. dc->callback(server, dc->data);
  84. UA_free(dc);
  85. }
  86. UA_LOG_DEBUG(server->config.logger, UA_LOGCATEGORY_SERVER,
  87. "Worker shut down");
  88. return NULL;
  89. }
  90. void UA_Server_cleanupDispatchQueue(UA_Server *server) {
  91. while(true) {
  92. pthread_mutex_lock(&server->dispatchQueue_accessMutex);
  93. WorkerCallback *dc = SIMPLEQ_FIRST(&server->dispatchQueue);
  94. if(!dc) {
  95. pthread_mutex_unlock(&server->dispatchQueue_accessMutex);
  96. break;
  97. }
  98. SIMPLEQ_REMOVE_HEAD(&server->dispatchQueue, next);
  99. pthread_mutex_unlock(&server->dispatchQueue_accessMutex);
  100. dc->callback(server, dc->data);
  101. UA_free(dc);
  102. }
  103. }
  104. #endif
  105. /**
  106. * Repeated Callbacks
  107. * ------------------
  108. * Repeated Callbacks are handled by UA_Timer (used in both client and server).
  109. * In the multi-threaded case, callbacks are dispatched to workers. Otherwise,
  110. * they are executed immediately. */
  111. void
  112. UA_Server_workerCallback(UA_Server *server, UA_ServerCallback callback,
  113. void *data) {
  114. #ifndef UA_ENABLE_MULTITHREADING
  115. /* Execute immediately */
  116. callback(server, data);
  117. #else
  118. /* Execute immediately if memory could not be allocated */
  119. WorkerCallback *dc = (WorkerCallback*)UA_malloc(sizeof(WorkerCallback));
  120. if(!dc) {
  121. callback(server, data);
  122. return;
  123. }
  124. /* Enqueue for the worker threads */
  125. dc->callback = callback;
  126. dc->data = data;
  127. dc->delayed = false;
  128. pthread_mutex_lock(&server->dispatchQueue_accessMutex);
  129. SIMPLEQ_INSERT_TAIL(&server->dispatchQueue, dc, next);
  130. pthread_mutex_unlock(&server->dispatchQueue_accessMutex);
  131. /* Wake up sleeping workers */
  132. pthread_cond_broadcast(&server->dispatchQueue_condition);
  133. #endif
  134. }
  135. /**
  136. * Delayed Callbacks
  137. * -----------------
  138. *
  139. * Delayed Callbacks are called only when all callbacks that were dispatched
  140. * prior are finished. In the single-threaded case, the callback is added to a
  141. * singly-linked list that is processed at the end of the server's main-loop. In
  142. * the multi-threaded case, the delay is ensure by a three-step procedure:
  143. *
  144. * 1. The delayed callback is dispatched to the worker queue. So it is only
  145. * dequeued when all prior callbacks have been dequeued.
  146. *
  147. * 2. When the callback is first dequeued by a worker, sample the counter of all
  148. * workers. Once all counters have advanced, the callback is ready.
  149. *
  150. * 3. Check regularly if the callback is ready by adding it back to the dispatch
  151. * queue. */
  152. /* Delayed callback to free the subscription memory */
  153. static void
  154. freeCallback(UA_Server *server, void *data) {
  155. UA_free(data);
  156. }
  157. /* TODO: Delayed free should never fail. This can be achieved by adding a prefix
  158. * with the list pointers */
  159. UA_StatusCode
  160. UA_Server_delayedFree(UA_Server *server, void *data) {
  161. return UA_Server_delayedCallback(server, freeCallback, data);
  162. }
  163. #ifndef UA_ENABLE_MULTITHREADING
  164. typedef struct UA_DelayedCallback {
  165. SLIST_ENTRY(UA_DelayedCallback) next;
  166. UA_ServerCallback callback;
  167. void *data;
  168. } UA_DelayedCallback;
  169. UA_StatusCode
  170. UA_Server_delayedCallback(UA_Server *server, UA_ServerCallback callback,
  171. void *data) {
  172. UA_DelayedCallback *dc =
  173. (UA_DelayedCallback*)UA_malloc(sizeof(UA_DelayedCallback));
  174. if(!dc)
  175. return UA_STATUSCODE_BADOUTOFMEMORY;
  176. dc->callback = callback;
  177. dc->data = data;
  178. SLIST_INSERT_HEAD(&server->delayedCallbacks, dc, next);
  179. return UA_STATUSCODE_GOOD;
  180. }
  181. void UA_Server_cleanupDelayedCallbacks(UA_Server *server) {
  182. UA_DelayedCallback *dc, *dc_tmp;
  183. SLIST_FOREACH_SAFE(dc, &server->delayedCallbacks, next, dc_tmp) {
  184. SLIST_REMOVE(&server->delayedCallbacks, dc, UA_DelayedCallback, next);
  185. dc->callback(server, dc->data);
  186. UA_free(dc);
  187. }
  188. }
  189. #else /* UA_ENABLE_MULTITHREADING */
  190. UA_StatusCode
  191. UA_Server_delayedCallback(UA_Server *server, UA_ServerCallback callback,
  192. void *data) {
  193. size_t dcsize = sizeof(WorkerCallback) +
  194. (sizeof(UA_UInt32) * server->config.nThreads);
  195. WorkerCallback *dc = (WorkerCallback*)UA_malloc(dcsize);
  196. if(!dc)
  197. return UA_STATUSCODE_BADOUTOFMEMORY;
  198. /* Enqueue for the worker threads */
  199. dc->callback = callback;
  200. dc->data = data;
  201. dc->delayed = true;
  202. dc->countersSampled = false;
  203. pthread_mutex_lock(&server->dispatchQueue_accessMutex);
  204. SIMPLEQ_INSERT_TAIL(&server->dispatchQueue, dc, next);
  205. pthread_mutex_unlock(&server->dispatchQueue_accessMutex);
  206. /* Wake up sleeping workers */
  207. pthread_cond_broadcast(&server->dispatchQueue_condition);
  208. return UA_STATUSCODE_GOOD;
  209. }
  210. /* Called from the worker loop */
  211. static void
  212. processDelayedCallback(UA_Server *server, WorkerCallback *dc) {
  213. /* Set the worker counters */
  214. if(!dc->countersSampled) {
  215. for(size_t i = 0; i < server->config.nThreads; ++i)
  216. dc->workerCounters[i] = server->workers[i].counter;
  217. dc->countersSampled = true;
  218. /* Re-add to the dispatch queue */
  219. pthread_mutex_lock(&server->dispatchQueue_accessMutex);
  220. SIMPLEQ_INSERT_TAIL(&server->dispatchQueue, dc, next);
  221. pthread_mutex_unlock(&server->dispatchQueue_accessMutex);
  222. /* Wake up sleeping workers */
  223. pthread_cond_broadcast(&server->dispatchQueue_condition);
  224. return;
  225. }
  226. /* Have all other jobs finished? */
  227. UA_Boolean ready = true;
  228. for(size_t i = 0; i < server->config.nThreads; ++i) {
  229. if(dc->workerCounters[i] == server->workers[i].counter) {
  230. ready = false;
  231. break;
  232. }
  233. }
  234. /* Re-add to the dispatch queue.
  235. * TODO: What is the impact of this loop?
  236. * Can we add a small delay here? */
  237. if(!ready) {
  238. pthread_mutex_lock(&server->dispatchQueue_accessMutex);
  239. SIMPLEQ_INSERT_TAIL(&server->dispatchQueue, dc, next);
  240. pthread_mutex_unlock(&server->dispatchQueue_accessMutex);
  241. /* Wake up sleeping workers */
  242. pthread_cond_broadcast(&server->dispatchQueue_condition);
  243. return;
  244. }
  245. /* Execute the callback */
  246. dc->callback(server, dc->data);
  247. UA_free(dc);
  248. }
  249. #endif
  250. /**
  251. * Main Server Loop
  252. * ----------------
  253. * Start: Spin up the workers and the network layer and sample the server's
  254. * start time.
  255. * Iterate: Process repeated callbacks and events in the network layer.
  256. * This part can be driven from an external main-loop in an
  257. * event-driven single-threaded architecture.
  258. * Stop: Stop workers, finish all callbacks, stop the network layer,
  259. * clean up */
  260. UA_StatusCode
  261. UA_Server_run_startup(UA_Server *server) {
  262. UA_Variant var;
  263. UA_StatusCode result = UA_STATUSCODE_GOOD;
  264. /* Sample the start time and set it to the Server object */
  265. server->startTime = UA_DateTime_now();
  266. UA_Variant_init(&var);
  267. UA_Variant_setScalar(&var, &server->startTime, &UA_TYPES[UA_TYPES_DATETIME]);
  268. UA_Server_writeValue(server,
  269. UA_NODEID_NUMERIC(0, UA_NS0ID_SERVER_SERVERSTATUS_STARTTIME),
  270. var);
  271. /* Start the networklayers */
  272. for(size_t i = 0; i < server->config.networkLayersSize; ++i) {
  273. UA_ServerNetworkLayer *nl = &server->config.networkLayers[i];
  274. result |= nl->start(nl, &server->config.customHostname);
  275. }
  276. /* Spin up the worker threads */
  277. #ifdef UA_ENABLE_MULTITHREADING
  278. UA_LOG_INFO(server->config.logger, UA_LOGCATEGORY_SERVER,
  279. "Spinning up %u worker thread(s)", server->config.nThreads);
  280. pthread_mutex_init(&server->dispatchQueue_accessMutex, NULL);
  281. pthread_cond_init(&server->dispatchQueue_condition, NULL);
  282. pthread_mutex_init(&server->dispatchQueue_conditionMutex, NULL);
  283. server->workers = (UA_Worker*)UA_malloc(server->config.nThreads * sizeof(UA_Worker));
  284. if(!server->workers)
  285. return UA_STATUSCODE_BADOUTOFMEMORY;
  286. for(size_t i = 0; i < server->config.nThreads; ++i) {
  287. UA_Worker *worker = &server->workers[i];
  288. worker->server = server;
  289. worker->counter = 0;
  290. worker->running = true;
  291. pthread_create(&worker->thr, NULL, (void* (*)(void*))workerLoop, worker);
  292. }
  293. #endif
  294. /* Start the multicast discovery server */
  295. #ifdef UA_ENABLE_DISCOVERY_MULTICAST
  296. if(server->config.applicationDescription.applicationType ==
  297. UA_APPLICATIONTYPE_DISCOVERYSERVER)
  298. startMulticastDiscoveryServer(server);
  299. #endif
  300. return result;
  301. }
  302. UA_UInt16
  303. UA_Server_run_iterate(UA_Server *server, UA_Boolean waitInternal) {
  304. /* Process repeated work */
  305. UA_DateTime now = UA_DateTime_nowMonotonic();
  306. UA_DateTime nextRepeated =
  307. UA_Timer_process(&server->timer, now,
  308. (UA_TimerDispatchCallback)UA_Server_workerCallback,
  309. server);
  310. UA_DateTime latest = now + (UA_MAXTIMEOUT * UA_DATETIME_MSEC);
  311. if(nextRepeated > latest)
  312. nextRepeated = latest;
  313. UA_UInt16 timeout = 0;
  314. /* round always to upper value to avoid timeout to be set to 0
  315. * if(nextRepeated - now) < (UA_DATETIME_MSEC/2) */
  316. if(waitInternal)
  317. timeout = (UA_UInt16)(((nextRepeated - now) + (UA_DATETIME_MSEC - 1)) / UA_DATETIME_MSEC);
  318. /* Listen on the networklayer */
  319. for(size_t i = 0; i < server->config.networkLayersSize; ++i) {
  320. UA_ServerNetworkLayer *nl = &server->config.networkLayers[i];
  321. nl->listen(nl, server, timeout);
  322. }
  323. #ifndef UA_ENABLE_MULTITHREADING
  324. /* Process delayed callbacks when all callbacks and network events are done.
  325. * If multithreading is enabled, the cleanup of delayed values is attempted
  326. * by a callback in the job queue. */
  327. UA_Server_cleanupDelayedCallbacks(server);
  328. #endif
  329. #if defined(UA_ENABLE_DISCOVERY_MULTICAST) && !defined(UA_ENABLE_MULTITHREADING)
  330. if(server->config.applicationDescription.applicationType ==
  331. UA_APPLICATIONTYPE_DISCOVERYSERVER) {
  332. // TODO multicastNextRepeat does not consider new input data (requests)
  333. // on the socket. It will be handled on the next call. if needed, we
  334. // need to use select with timeout on the multicast socket
  335. // server->mdnsSocket (see example in mdnsd library) on higher level.
  336. UA_DateTime multicastNextRepeat = 0;
  337. UA_StatusCode hasNext =
  338. iterateMulticastDiscoveryServer(server, &multicastNextRepeat, UA_TRUE);
  339. if(hasNext == UA_STATUSCODE_GOOD && multicastNextRepeat < nextRepeated)
  340. nextRepeated = multicastNextRepeat;
  341. }
  342. #endif
  343. now = UA_DateTime_nowMonotonic();
  344. timeout = 0;
  345. if(nextRepeated > now)
  346. timeout = (UA_UInt16)((nextRepeated - now) / UA_DATETIME_MSEC);
  347. return timeout;
  348. }
  349. UA_StatusCode
  350. UA_Server_run_shutdown(UA_Server *server) {
  351. /* Stop the netowrk layer */
  352. for(size_t i = 0; i < server->config.networkLayersSize; ++i) {
  353. UA_ServerNetworkLayer *nl = &server->config.networkLayers[i];
  354. nl->stop(nl, server);
  355. }
  356. #ifdef UA_ENABLE_MULTITHREADING
  357. /* Shut down the workers */
  358. if(server->workers) {
  359. UA_LOG_INFO(server->config.logger, UA_LOGCATEGORY_SERVER,
  360. "Shutting down %u worker thread(s)",
  361. server->config.nThreads);
  362. for(size_t i = 0; i < server->config.nThreads; ++i)
  363. server->workers[i].running = false;
  364. pthread_cond_broadcast(&server->dispatchQueue_condition);
  365. for(size_t i = 0; i < server->config.nThreads; ++i)
  366. pthread_join(server->workers[i].thr, NULL);
  367. UA_free(server->workers);
  368. server->workers = NULL;
  369. }
  370. /* Execute the remaining callbacks in the dispatch queue. Also executes
  371. * delayed callbacks. */
  372. UA_Server_cleanupDispatchQueue(server);
  373. #else
  374. /* Process remaining delayed callbacks */
  375. UA_Server_cleanupDelayedCallbacks(server);
  376. #endif
  377. #ifdef UA_ENABLE_DISCOVERY_MULTICAST
  378. /* Stop multicast discovery */
  379. if(server->config.applicationDescription.applicationType ==
  380. UA_APPLICATIONTYPE_DISCOVERYSERVER)
  381. stopMulticastDiscoveryServer(server);
  382. #endif
  383. return UA_STATUSCODE_GOOD;
  384. }
  385. UA_StatusCode
  386. UA_Server_run(UA_Server *server, volatile UA_Boolean *running) {
  387. UA_StatusCode retval = UA_Server_run_startup(server);
  388. if(retval != UA_STATUSCODE_GOOD)
  389. return retval;
  390. #ifdef UA_ENABLE_VALGRIND_INTERACTIVE
  391. size_t loopCount = 0;
  392. #endif
  393. while(*running) {
  394. #ifdef UA_ENABLE_VALGRIND_INTERACTIVE
  395. if(loopCount == 0) {
  396. VALGRIND_DO_LEAK_CHECK;
  397. }
  398. ++loopCount;
  399. loopCount %= UA_VALGRIND_INTERACTIVE_INTERVAL;
  400. #endif
  401. UA_Server_run_iterate(server, true);
  402. }
  403. return UA_Server_run_shutdown(server);
  404. }