ua_server_worker.c

#define __USE_POSIX
#define _XOPEN_SOURCE 500
#define __USE_POSIX199309
#ifndef WIN32
#include <sys/time.h>
#endif
#include <time.h>

#include "ua_server_internal.h"

/**
 * There are three types of work:
 *
 * 1. Ordinary work items (that are dispatched to worker threads if
 *    multithreading is activated)
 *
 * 2. Timed work that is executed at a precise date (with an optional
 *    repetition interval)
 *
 * 3. Delayed work that is executed at a later time, when it is guaranteed that
 *    all previous work has actually finished (multithreading only)
 */
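
/* A work item is a tagged union. As an illustrative sketch (the callback name
 * serverTick is a placeholder and not part of this file), an ordinary method
 * call that can be dispatched to the workers or registered as timed work could
 * be built like this:
 *
 *     static void serverTick(UA_Server *server, void *data) {
 *         // periodic housekeeping
 *     }
 *
 *     UA_WorkItem item = {.type = UA_WORKITEMTYPE_METHODCALL,
 *                         .work.methodCall = {.method = serverTick, .data = UA_NULL}};
 */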

#define MAXTIMEOUT 50000 // max timeout in usec until the next main loop iteration
#define BATCHSIZE 20     // max size of worklists that are dispatched to workers

/** Processes the given work items in the calling thread. */
static void processWork(UA_Server *server, const UA_WorkItem *work, UA_Int32 workSize) {
    for(UA_Int32 i = 0; i < workSize; i++) {
        const UA_WorkItem *item = &work[i];
        switch(item->type) {
        case UA_WORKITEMTYPE_BINARYNETWORKMESSAGE:
            UA_Server_processBinaryMessage(server, item->work.binaryNetworkMessage.connection,
                                           &item->work.binaryNetworkMessage.message);
            UA_free(item->work.binaryNetworkMessage.message.data);
            break;
        case UA_WORKITEMTYPE_METHODCALL:
        case UA_WORKITEMTYPE_DELAYEDMETHODCALL:
            item->work.methodCall.method(server, item->work.methodCall.data);
            break;
        default:
            break;
        }
    }
}

/*******************************/
/* Worker Threads and Dispatch */
/*******************************/

#ifdef UA_MULTITHREADING

/** Entry in the dispatch queue */
struct workListNode {
    struct cds_wfcq_node node; // node for the queue
    UA_UInt32 workSize;
    UA_WorkItem *work;
};

/** Dispatch work to workers. Slices the work up if it contains more than
    BATCHSIZE items. The work array is freed by the worker threads. */
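/* For example (illustrative numbers only): with BATCHSIZE 20, a work array of
   45 items is enqueued as three slices of 20, 20 and 5 items. The slices are
   taken from the end of the array; only the final slice (indices 0..4 here)
   reuses the original allocation, the other slices are copied. */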
static void dispatchWork(UA_Server *server, UA_Int32 workSize, UA_WorkItem *work) {
    UA_Int32 startIndex = workSize; // start at the end
    while(workSize > 0) {
        UA_Int32 size = BATCHSIZE;
        if(size > workSize)
            size = workSize;
        startIndex = startIndex - size;
        struct workListNode *wln = UA_malloc(sizeof(struct workListNode));
        if(startIndex > 0) {
            UA_WorkItem *workSlice = UA_malloc(size * sizeof(UA_WorkItem));
            UA_memcpy(workSlice, &work[startIndex], size * sizeof(UA_WorkItem));
            *wln = (struct workListNode){.workSize = size, .work = workSlice};
        } else {
            // do not alloc, but forward the original array
            *wln = (struct workListNode){.workSize = size, .work = work};
        }
        cds_wfcq_node_init(&wln->node);
        cds_wfcq_enqueue(&server->dispatchQueue_head, &server->dispatchQueue_tail, &wln->node);
        workSize -= size;
    }
}

// throwaway struct to bring data into the worker threads
struct workerStartData {
    UA_Server *server;
    UA_UInt32 **workerCounter;
};

/** Waits until work arrives in the dispatch queue and processes it. The wait
    times out regularly, so the running flag is checked at least every two
    seconds. */
static void * workerLoop(struct workerStartData *startInfo) {
    rcu_register_thread();
    UA_UInt32 *c = UA_malloc(sizeof(UA_UInt32));
    uatomic_set(c, 0);
    *startInfo->workerCounter = c;
    UA_Server *server = startInfo->server;
    UA_free(startInfo);

    pthread_mutex_t mutex; // required for the condition variable
    pthread_mutex_init(&mutex, 0);
    pthread_mutex_lock(&mutex);
    struct timespec to;

    while(*server->running) {
        struct workListNode *wln = (struct workListNode*)
            cds_wfcq_dequeue_blocking(&server->dispatchQueue_head, &server->dispatchQueue_tail);
        if(wln) {
            processWork(server, wln->work, wln->workSize);
            UA_free(wln->work);
            UA_free(wln);
        } else {
            clock_gettime(CLOCK_REALTIME, &to);
            to.tv_sec += 2;
            pthread_cond_timedwait(&server->dispatchQueue_condition, &mutex, &to);
        }
        uatomic_inc(c); // increase the workerCounter
    }
    pthread_mutex_unlock(&mutex);
    pthread_mutex_destroy(&mutex);
    rcu_unregister_thread();
    return UA_NULL;
}

static void emptyDispatchQueue(UA_Server *server) {
    while(!cds_wfcq_empty(&server->dispatchQueue_head, &server->dispatchQueue_tail)) {
        struct workListNode *wln = (struct workListNode*)
            cds_wfcq_dequeue_blocking(&server->dispatchQueue_head, &server->dispatchQueue_tail);
        processWork(server, wln->work, wln->workSize);
        UA_free(wln->work);
        UA_free(wln);
    }
}
#endif

/**************/
/* Timed Work */
/**************/

struct UA_TimedWork {
    LIST_ENTRY(UA_TimedWork) pointers;
    UA_UInt16 workSize;
    UA_WorkItem *work;
    UA_Guid *workIds;
    UA_DateTime time;
    UA_UInt32 repetitionInterval; // in 100ns resolution, 0 means no repetition
};

/* The item is copied and not freed by this function. */
static UA_StatusCode addTimedWork(UA_Server *server, const UA_WorkItem *item, UA_DateTime firstTime,
                                  UA_UInt32 repetitionInterval, UA_Guid *resultWorkGuid) {
    UA_TimedWork *tw, *lastTw = UA_NULL;

    // search for a matching entry; the list is sorted by the next execution time
    LIST_FOREACH(tw, &server->timedWork, pointers) {
        if(tw->repetitionInterval == repetitionInterval &&
           (repetitionInterval > 0 || tw->time == firstTime))
            break; // found a matching entry
        if(tw->time > firstTime) {
            tw = UA_NULL; // no matching entry exists; insert after lastTw to keep the list sorted
            break;
        }
        lastTw = tw; // the last entry that is due before the new work
    }

    if(tw) {
        // append to the matching entry
        tw->workSize++;
        tw->work = UA_realloc(tw->work, sizeof(UA_WorkItem)*tw->workSize);
        tw->workIds = UA_realloc(tw->workIds, sizeof(UA_Guid)*tw->workSize);
        tw->work[tw->workSize-1] = *item;
        tw->workIds[tw->workSize-1] = UA_Guid_random(&server->random_seed);
        if(resultWorkGuid)
            *resultWorkGuid = tw->workIds[tw->workSize-1];
        return UA_STATUSCODE_GOOD;
    }

    // create a new entry
    if(!(tw = UA_malloc(sizeof(UA_TimedWork))))
        return UA_STATUSCODE_BADOUTOFMEMORY;
    tw->workSize = 1;
    tw->time = firstTime;
    tw->repetitionInterval = repetitionInterval;
    tw->work = UA_malloc(sizeof(UA_WorkItem));
    tw->work[0] = *item;
    tw->workIds = UA_malloc(sizeof(UA_Guid));
    tw->workIds[0] = UA_Guid_random(&server->random_seed);
    if(lastTw)
        LIST_INSERT_AFTER(lastTw, tw, pointers);
    else
        LIST_INSERT_HEAD(&server->timedWork, tw, pointers);
    if(resultWorkGuid)
        *resultWorkGuid = tw->workIds[0];
    return UA_STATUSCODE_GOOD;
}

// Currently, these functions need to get the server mutex, but should be sufficiently fast
UA_StatusCode UA_Server_addTimedWorkItem(UA_Server *server, const UA_WorkItem *work, UA_DateTime executionTime,
                                         UA_Guid *resultWorkGuid) {
    return addTimedWork(server, work, executionTime, 0, resultWorkGuid);
}

UA_StatusCode UA_Server_addRepeatedWorkItem(UA_Server *server, const UA_WorkItem *work, UA_UInt32 interval,
                                            UA_Guid *resultWorkGuid) {
    return addTimedWork(server, work, UA_DateTime_now() + interval, interval, resultWorkGuid);
}
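
/* Usage sketch (logHeartbeat is only an illustrative callback): intervals and
   execution times are given in 100ns ticks like UA_DateTime, so a 5 second
   interval is 5 * 10000000. If resultWorkGuid is not null, it receives an
   identifier for the added work.

       UA_Guid heartbeatId;
       UA_WorkItem heartbeat = {.type = UA_WORKITEMTYPE_METHODCALL,
                                .work.methodCall = {.method = logHeartbeat, .data = UA_NULL}};
       UA_Server_addRepeatedWorkItem(server, &heartbeat, 5 * 10000000, &heartbeatId);
*/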

/** Dispatches timed work, returns the timeout until the next timed work in usec */
static UA_UInt16 processTimedWork(UA_Server *server) {
    UA_DateTime current = UA_DateTime_now();
    UA_TimedWork *next = LIST_FIRST(&server->timedWork);
    UA_TimedWork *tw = UA_NULL;

    while(next) {
        tw = next;
        if(tw->time > current)
            break;
        next = LIST_NEXT(tw, pointers);

#ifdef UA_MULTITHREADING
        if(tw->repetitionInterval > 0) {
            // copy the entry and insert at the new location
            UA_WorkItem *workCopy = UA_malloc(sizeof(UA_WorkItem) * tw->workSize);
            UA_memcpy(workCopy, tw->work, sizeof(UA_WorkItem) * tw->workSize);
            dispatchWork(server, tw->workSize, workCopy); // frees the work pointer
            tw->time += tw->repetitionInterval;

            UA_TimedWork *prevTw = tw; // after which tw do we insert?
            while(UA_TRUE) {
                UA_TimedWork *n = LIST_NEXT(prevTw, pointers);
                if(!n || n->time > tw->time)
                    break;
                prevTw = n;
            }
            if(prevTw != tw) {
                LIST_REMOVE(tw, pointers);
                LIST_INSERT_AFTER(prevTw, tw, pointers);
            }
        } else {
            dispatchWork(server, tw->workSize, tw->work); // frees the work pointer
            LIST_REMOVE(tw, pointers);
            UA_free(tw->workIds);
            UA_free(tw);
        }
#else
        // 1) Process the work since it is past its due date
        processWork(server, tw->work, tw->workSize); // does not free the work

        // 2) If the work is repeated, move it to its new position in the sorted list.
        //    Otherwise remove it.
        if(tw->repetitionInterval > 0) {
            tw->time += tw->repetitionInterval;
            UA_TimedWork *prevTw = tw;
            while(UA_TRUE) {
                UA_TimedWork *n = LIST_NEXT(prevTw, pointers);
                if(!n || n->time > tw->time)
                    break;
                prevTw = n;
            }
            if(prevTw != tw) {
                LIST_REMOVE(tw, pointers);
                LIST_INSERT_AFTER(prevTw, tw, pointers);
            }
        } else {
            LIST_REMOVE(tw, pointers);
            UA_free(tw->work);
            UA_free(tw->workIds);
            UA_free(tw);
        }
#endif
    }

    // check if the next timed work is due sooner than the usual timeout
    UA_TimedWork *first = LIST_FIRST(&server->timedWork);
    UA_UInt16 timeout = MAXTIMEOUT;
    if(first) {
        UA_DateTime diff = (first->time - current) / 10; // 100ns -> usec
        if(diff < 0)
            diff = 0;
        if(diff < MAXTIMEOUT)
            timeout = (UA_UInt16)diff;
    }
    return timeout;
}

void UA_Server_deleteTimedWork(UA_Server *server) {
    UA_TimedWork *current;
    UA_TimedWork *next = LIST_FIRST(&server->timedWork);
    while(next) {
        current = next;
        next = LIST_NEXT(current, pointers);
        LIST_REMOVE(current, pointers);
        UA_free(current->work);
        UA_free(current->workIds);
        UA_free(current);
    }
}

/****************/
/* Delayed Work */
/****************/

#ifdef UA_MULTITHREADING

#define DELAYEDWORKSIZE 100 // Collect delayed work until we have DELAYEDWORKSIZE items

struct UA_DelayedWork {
    UA_DelayedWork *next;
    UA_UInt32 *workerCounters; // initially UA_NULL until a workitem gets the counters
    UA_UInt32 workItemsCount;  // the size of the array is DELAYEDWORKSIZE, the count may be less
    UA_WorkItem *workItems;    // when it runs full, a new delayedWork entry is created
};
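
/* Usage sketch (the names are only illustrative): delayed method calls are the
   right tool when memory must not be freed while a worker thread may still be
   reading it, e.g. a node that was just replaced in an RCU-protected data
   structure.

       static void freeOldNode(UA_Server *server, void *data) {
           UA_free(data); // safe: every worker has completed the work it held at removal time
       }

       UA_WorkItem cleanup = {.type = UA_WORKITEMTYPE_DELAYEDMETHODCALL,
                              .work.methodCall = {.method = freeOldNode, .data = oldNode}};

   Such items are filtered out of the network-layer work in UA_Server_run and
   collected via addDelayedWork below. */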

// Dispatched as a methodcall-WorkItem when the delayedwork is added
static void getCounters(UA_Server *server, UA_DelayedWork *delayed) {
    UA_UInt32 *counters = UA_malloc(server->nThreads * sizeof(UA_UInt32));
    for(UA_UInt16 i = 0; i < server->nThreads; i++)
        counters[i] = *server->workerCounters[i];
    delayed->workerCounters = counters;
}

// Call from the main thread only. This is the only function that modifies
// server->delayedWork. dispatchDelayedWork modifies the "next" pointers (after
// the head).
static void addDelayedWork(UA_Server *server, UA_WorkItem work) {
    UA_DelayedWork *dw = server->delayedWork;
    if(!dw || dw->workItemsCount >= DELAYEDWORKSIZE) {
        UA_DelayedWork *newwork = UA_malloc(sizeof(UA_DelayedWork));
        newwork->workItems = UA_malloc(sizeof(UA_WorkItem)*DELAYEDWORKSIZE);
        newwork->workItemsCount = 0;
        newwork->workerCounters = UA_NULL;
        newwork->next = server->delayedWork;

        // dispatch a method that sets the counters of the entry that just ran full
        if(dw && dw->workItemsCount >= DELAYEDWORKSIZE) {
            UA_WorkItem *setCounter = UA_malloc(sizeof(UA_WorkItem));
            *setCounter = (UA_WorkItem)
                {.type = UA_WORKITEMTYPE_METHODCALL,
                 .work.methodCall = {.method = (void (*)(UA_Server*, void*))getCounters, .data = dw}};
            dispatchWork(server, 1, setCounter);
        }
        server->delayedWork = newwork;
        dw = newwork;
    }
    dw->workItems[dw->workItemsCount] = work;
    dw->workItemsCount++;
}

static void processDelayedWork(UA_Server *server) {
    UA_DelayedWork *dw = server->delayedWork;
    while(dw) {
        processWork(server, dw->workItems, dw->workItemsCount);
        UA_DelayedWork *next = dw->next;
        UA_free(dw->workerCounters);
        UA_free(dw->workItems);
        UA_free(dw);
        dw = next;
    }
}
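
/* How readiness is detected (illustrative walk-through with two worker
   threads): when a delayed-work batch runs full, getCounters stores a snapshot
   of every worker's loop counter, say {17, 42}. dispatchDelayedWork later
   compares the live counters against that snapshot. Once every counter differs
   (e.g. {19, 43}), each worker has finished the work item it was processing
   when the snapshot was taken, so the delayed batch can be dispatched. */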

// Execute this every N seconds (repeated work) to execute delayed work that is ready
static void dispatchDelayedWork(UA_Server *server, void *data /* not used, but needed for the signature */) {
    UA_DelayedWork *dw = UA_NULL;
    UA_DelayedWork *readydw = UA_NULL;
    UA_DelayedWork *beforedw = server->delayedWork;

    // start at the second entry...
    if(beforedw)
        dw = beforedw->next;

    // find the first delayedwork where the counters are set and have been moved
    while(dw) {
        if(!dw->workerCounters) {
            beforedw = dw;
            dw = dw->next;
            continue;
        }
        UA_Boolean countersMoved = UA_TRUE;
        for(UA_UInt16 i = 0; i < server->nThreads; i++) {
            if(*server->workerCounters[i] == dw->workerCounters[i]) {
                countersMoved = UA_FALSE;
                break;
            }
        }
        if(countersMoved) {
            readydw = uatomic_xchg(&beforedw->next, UA_NULL);
            break;
        } else {
            beforedw = dw;
            dw = dw->next;
        }
    }

    // we have a ready entry. all entries after it are also ready
    while(readydw) {
        dispatchWork(server, readydw->workItemsCount, readydw->workItems);
        beforedw = readydw;
        readydw = readydw->next;
        UA_free(beforedw->workerCounters);
        UA_free(beforedw);
    }
}
#endif

/********************/
/* Main Server Loop */
/********************/
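
/* Usage sketch (the shutdown trigger is only illustrative): the running flag
   is checked on every loop iteration, so setting it to UA_FALSE from another
   thread or a signal handler makes UA_Server_run return.

       UA_Boolean running = UA_TRUE;
       UA_Server_run(server, 1, &running); // blocks until running becomes UA_FALSE
*/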

UA_StatusCode UA_Server_run(UA_Server *server, UA_UInt16 nThreads, UA_Boolean *running) {
#ifdef UA_MULTITHREADING
    // 1) Prepare the worker threads
    server->running = running; // the threads need to access the variable
    server->nThreads = nThreads;
    pthread_cond_init(&server->dispatchQueue_condition, 0);
    pthread_t *thr = UA_malloc(nThreads * sizeof(pthread_t));
    server->workerCounters = UA_malloc(nThreads * sizeof(UA_UInt32 *));
    for(UA_UInt32 i = 0; i < nThreads; i++) {
        struct workerStartData *startData = UA_malloc(sizeof(struct workerStartData));
        startData->server = server;
        startData->workerCounter = &server->workerCounters[i];
        pthread_create(&thr[i], UA_NULL, (void* (*)(void*))workerLoop, startData);
    }

    UA_WorkItem processDelayed = {.type = UA_WORKITEMTYPE_METHODCALL,
                                  .work.methodCall = {.method = dispatchDelayedWork,
                                                      .data = UA_NULL} };
    UA_Server_addRepeatedWorkItem(server, &processDelayed, 10000000, UA_NULL); // every second
#endif

    // 2a) Start the networklayers
    for(UA_Int32 i = 0; i < server->nlsSize; i++)
        server->nls[i].start(server->nls[i].nlHandle, &server->logger);

    // 2b) Init the server's meta-information
    // fill startTime
    server->startTime = UA_DateTime_now();

    // fill the build date (parsed from the __DATE__ and __TIME__ macros)
    {
        static struct tm ct;
        ct.tm_year = (__DATE__[7] - '0') * 1000 + (__DATE__[8] - '0') * 100 +
                     (__DATE__[9] - '0') * 10 + (__DATE__[10] - '0') - 1900;

        if(0) ;
        else if(__DATE__[0]=='J' && __DATE__[1]=='a' && __DATE__[2]=='n') ct.tm_mon = 1-1;
        else if(__DATE__[0]=='F' && __DATE__[1]=='e' && __DATE__[2]=='b') ct.tm_mon = 2-1;
        else if(__DATE__[0]=='M' && __DATE__[1]=='a' && __DATE__[2]=='r') ct.tm_mon = 3-1;
        else if(__DATE__[0]=='A' && __DATE__[1]=='p' && __DATE__[2]=='r') ct.tm_mon = 4-1;
        else if(__DATE__[0]=='M' && __DATE__[1]=='a' && __DATE__[2]=='y') ct.tm_mon = 5-1;
        else if(__DATE__[0]=='J' && __DATE__[1]=='u' && __DATE__[2]=='n') ct.tm_mon = 6-1;
        else if(__DATE__[0]=='J' && __DATE__[1]=='u' && __DATE__[2]=='l') ct.tm_mon = 7-1;
        else if(__DATE__[0]=='A' && __DATE__[1]=='u' && __DATE__[2]=='g') ct.tm_mon = 8-1;
        else if(__DATE__[0]=='S' && __DATE__[1]=='e' && __DATE__[2]=='p') ct.tm_mon = 9-1;
        else if(__DATE__[0]=='O' && __DATE__[1]=='c' && __DATE__[2]=='t') ct.tm_mon = 10-1;
        else if(__DATE__[0]=='N' && __DATE__[1]=='o' && __DATE__[2]=='v') ct.tm_mon = 11-1;
        else if(__DATE__[0]=='D' && __DATE__[1]=='e' && __DATE__[2]=='c') ct.tm_mon = 12-1;

        // __DATE__ does not insert a leading zero for days of month below 10;
        // it inserts a blank character instead, so that case is handled separately
        if(__DATE__[4] == ' ')
            ct.tm_mday = __DATE__[5] - '0';
        else
            ct.tm_mday = (__DATE__[4] - '0') * 10 + (__DATE__[5] - '0');

        ct.tm_hour = (__TIME__[0] - '0') * 10 + (__TIME__[1] - '0');
        ct.tm_min  = (__TIME__[3] - '0') * 10 + (__TIME__[4] - '0');
        ct.tm_sec  = (__TIME__[6] - '0') * 10 + (__TIME__[7] - '0');
        ct.tm_isdst = -1; // information is not available

        // FIXME: the next 3 lines are copy-pasted from ua_types.c
#define UNIX_EPOCH_BIAS_SEC 11644473600LL // seconds from 1 Jan 1601 00:00 to 1 Jan 1970 00:00 UTC
#define HUNDRED_NANOSEC_PER_USEC 10LL
#define HUNDRED_NANOSEC_PER_SEC (HUNDRED_NANOSEC_PER_USEC * 1000000LL)
        server->buildDate = (mktime(&ct) + UNIX_EPOCH_BIAS_SEC) * HUNDRED_NANOSEC_PER_SEC;
    }

    // 3) The loop
    while(1) {
        // 3.1) Process timed work
        UA_UInt16 timeout = processTimedWork(server);

        // 3.2) Get work from the networklayer and dispatch it
        for(UA_Int32 i = 0; i < server->nlsSize; i++) {
            UA_ServerNetworkLayer *nl = &server->nls[i];
            UA_WorkItem *work;
            UA_Int32 workSize;
            if(*running) {
                if(i == server->nlsSize-1)
                    workSize = nl->getWork(nl->nlHandle, &work, timeout); // only the last layer waits up to the timeout
                else
                    workSize = nl->getWork(nl->nlHandle, &work, 0);
            } else {
                workSize = server->nls[i].stop(nl->nlHandle, &work);
            }

#ifdef UA_MULTITHREADING
            // Filter out delayed work
            for(UA_Int32 k = 0; k < workSize; k++) {
                if(work[k].type != UA_WORKITEMTYPE_DELAYEDMETHODCALL)
                    continue;
                addDelayedWork(server, work[k]);
                work[k].type = UA_WORKITEMTYPE_NOTHING;
            }
            dispatchWork(server, workSize, work);
            if(workSize > 0)
                pthread_cond_broadcast(&server->dispatchQueue_condition);
#else
            processWork(server, work, workSize);
            UA_free(work);
#endif
        }

        // 3.3) Exit?
        if(!*running)
            break;
    }

#ifdef UA_MULTITHREADING
    // 4) Clean up: Wait until all worker threads finish, then empty the
    //    dispatch queue, then process the remaining delayed work
    for(UA_UInt32 i = 0; i < nThreads; i++) {
        pthread_join(thr[i], UA_NULL);
        UA_free(server->workerCounters[i]);
    }
    UA_free(server->workerCounters);
    UA_free(thr);
    emptyDispatchQueue(server);
    processDelayedWork(server);
#endif
    return UA_STATUSCODE_GOOD;
}