/* ua_server_worker.c — work dispatch, timed work, and the main server loop */
  1. #include <stdio.h>
  2. #include "ua_server_internal.h"
  3. /**
  4. * There are three types of work:
  5. *
  6. * 1. Ordinary WorkItems (that are dispatched to worker threads if
  7. * multithreading is activated)
  8. *
  9. * 2. Timed work that is executed at a precise date (with an optional repetition
  10. * interval)
  11. *
  12. * 3. Delayed work that is executed at a later time when it is guaranteed that
  13. * all previous work has actually finished (only for multithreading)
  14. */
  15. #define MAXTIMEOUT 5000 // max timeout in usec until the next main loop iteration
  16. #define BATCHSIZE 20 // max size of worklists that are dispatched to workers
  17. static void processWork(UA_Server *server, const UA_WorkItem *work, UA_Int32 workSize) {
  18. for(UA_Int32 i = 0;i<workSize;i++) {
  19. const UA_WorkItem *item = &work[i];
  20. switch(item->type) {
  21. case UA_WORKITEMTYPE_BINARYNETWORKMESSAGE:
  22. UA_Server_processBinaryMessage(server, item->work.binaryNetworkMessage.connection,
  23. &item->work.binaryNetworkMessage.message);
  24. UA_free(item->work.binaryNetworkMessage.message.data);
  25. break;
  26. case UA_WORKITEMTYPE_METHODCALL:
  27. case UA_WORKITEMTYPE_DELAYEDMETHODCALL:
  28. item->work.methodCall.method(server, item->work.methodCall.data);
  29. break;
  30. default:
  31. break;
  32. }
  33. }
  34. }
  35. /*******************************/
  36. /* Worker Threads and Dispatch */
  37. /*******************************/
  38. #ifdef UA_MULTITHREADING
  39. /** Entry in the dipatch queue */
  40. struct workListNode {
  41. struct cds_wfcq_node node; // node for the queue
  42. UA_UInt32 workSize;
  43. UA_WorkItem *work;
  44. };
  45. /** Dispatch work to workers. Slices the work up if it contains more than
  46. BATCHSIZE items. The work array is freed by the worker threads. */
  47. static void dispatchWork(UA_Server *server, UA_Int32 workSize, UA_WorkItem *work) {
  48. UA_Int32 startIndex = workSize; // start at the end
  49. while(workSize > 0) {
  50. UA_Int32 size = BATCHSIZE;
  51. if(size > workSize)
  52. size = workSize;
  53. startIndex = startIndex - size;
  54. struct workListNode *wln = UA_malloc(sizeof(struct workListNode));
  55. if(startIndex > 0) {
  56. UA_WorkItem *workSlice = UA_malloc(size * sizeof(UA_WorkItem));
  57. UA_memcpy(workSlice, &work[startIndex], size * sizeof(UA_WorkItem));
  58. *wln = (struct workListNode){.workSize = size, .work = workSlice};
  59. }
  60. else {
  61. // do not alloc, but forward the original array
  62. *wln = (struct workListNode){.workSize = size, .work = work};
  63. }
  64. cds_wfcq_node_init(&wln->node);
  65. cds_wfcq_enqueue(&server->dispatchQueue_head, &server->dispatchQueue_tail, &wln->node);
  66. workSize -= size;
  67. }
  68. }
  69. // throwaway struct to bring data into the worker threads
  70. struct workerStartData {
  71. UA_Server *server;
  72. UA_UInt32 **workerCounter;
  73. };
  74. /** Waits until work arrives in the dispatch queue (restart after 10ms) and
  75. processes it. */
  76. static void * workerLoop(struct workerStartData *startInfo) {
  77. rcu_register_thread();
  78. UA_UInt32 *c = UA_malloc(sizeof(UA_UInt32));
  79. uatomic_set(c, 0);
  80. *startInfo->workerCounter = c;
  81. UA_Server *server = startInfo->server;
  82. UA_free(startInfo);
  83. while(*server->running) {
  84. struct workListNode *wln = (struct workListNode*)
  85. cds_wfcq_dequeue_blocking(&server->dispatchQueue_head, &server->dispatchQueue_tail);
  86. if(wln) {
  87. processWork(server, wln->work, wln->workSize);
  88. UA_free(wln->work);
  89. UA_free(wln);
  90. }
  91. uatomic_inc(c); // increase the workerCounter;
  92. }
  93. rcu_unregister_thread();
  94. return UA_NULL;
  95. }
  96. static void emptyDispatchQueue(UA_Server *server) {
  97. while(!cds_wfcq_empty(&server->dispatchQueue_head, &server->dispatchQueue_tail)) {
  98. struct workListNode *wln = (struct workListNode*)
  99. cds_wfcq_dequeue_blocking(&server->dispatchQueue_head, &server->dispatchQueue_tail);
  100. processWork(server, wln->work, wln->workSize);
  101. UA_free(wln->work);
  102. UA_free(wln);
  103. }
  104. }
  105. #endif
  106. /**************/
  107. /* Timed Work */
  108. /**************/
  109. struct UA_TimedWork {
  110. LIST_ENTRY(UA_TimedWork) pointers;
  111. UA_UInt16 workSize;
  112. UA_WorkItem *work;
  113. UA_Guid *workIds;
  114. UA_DateTime time;
  115. UA_UInt32 repetitionInterval; // in 100ns resolution, 0 means no repetition
  116. };
  117. /* The item is copied and not freed by this function. */
  118. static UA_Guid addTimedWork(UA_Server *server, UA_WorkItem *item, UA_DateTime firstTime,
  119. UA_UInt32 repetitionInterval) {
  120. UA_TimedWork *tw, *lastTw = UA_NULL;
  121. // search for matching entry
  122. LIST_FOREACH(tw, &server->timedWork, pointers) {
  123. if(tw->repetitionInterval == repetitionInterval &&
  124. (repetitionInterval > 0 || tw->time == firstTime))
  125. break; // found a matching entry
  126. if(tw->time > firstTime) {
  127. tw = UA_NULL; // not matchin entry exists
  128. lastTw = tw;
  129. break;
  130. }
  131. }
  132. if(tw) {
  133. // append to matching entry
  134. tw->workSize++;
  135. tw->work = UA_realloc(tw->work, sizeof(UA_WorkItem)*tw->workSize);
  136. tw->workIds = UA_realloc(tw->workIds, sizeof(UA_Guid)*tw->workSize);
  137. tw->work[tw->workSize-1] = *item;
  138. tw->workIds[tw->workSize-1] = UA_Guid_random(&server->random_seed);
  139. return tw->workIds[tw->workSize-1];
  140. }
  141. // create a new entry
  142. tw = UA_malloc(sizeof(UA_TimedWork));
  143. tw->workSize = 1;
  144. tw->time = firstTime;
  145. tw->repetitionInterval = repetitionInterval;
  146. tw->work = UA_malloc(sizeof(UA_WorkItem));
  147. tw->work[0] = *item;
  148. tw->workIds = UA_malloc(sizeof(UA_Guid));
  149. tw->workIds[0] = UA_Guid_random(&server->random_seed);
  150. if(lastTw)
  151. LIST_INSERT_AFTER(lastTw, tw, pointers);
  152. else
  153. LIST_INSERT_HEAD(&server->timedWork, tw, pointers);
  154. return tw->workIds[0];
  155. }
  156. // Currently, these functions need to get the server mutex, but should be sufficiently fast
  157. UA_Guid UA_Server_addTimedWorkItem(UA_Server *server, UA_WorkItem *work, UA_DateTime time) {
  158. return addTimedWork(server, work, time, 0);
  159. }
  160. UA_Guid UA_Server_addRepeatedWorkItem(UA_Server *server, UA_WorkItem *work, UA_UInt32 interval) {
  161. return addTimedWork(server, work, UA_DateTime_now() + interval, interval);
  162. }
  163. /** Dispatches timed work, returns the timeout until the next timed work in ms */
  164. static UA_UInt16 processTimedWork(UA_Server *server) {
  165. UA_DateTime current = UA_DateTime_now();
  166. UA_TimedWork *next = LIST_FIRST(&server->timedWork);
  167. UA_TimedWork *tw = UA_NULL;
  168. while(next) {
  169. tw = next;
  170. if(tw->time > current)
  171. break;
  172. next = LIST_NEXT(tw, pointers);
  173. #ifdef UA_MULTITHREADING
  174. if(tw->repetitionInterval > 0) {
  175. // copy the entry and insert at the new location
  176. UA_WorkItem *workCopy = UA_malloc(sizeof(UA_WorkItem) * tw->workSize);
  177. UA_memcpy(workCopy, tw->work, sizeof(UA_WorkItem) * tw->workSize);
  178. dispatchWork(server, tw->workSize, workCopy); // frees the work pointer
  179. tw->time += tw->repetitionInterval;
  180. UA_TimedWork *prevTw = tw; // after which tw do we insert?
  181. while(UA_TRUE) {
  182. UA_TimedWork *next = LIST_NEXT(prevTw, pointers);
  183. if(!next || next->time > tw->time)
  184. break;
  185. prevTw = next;
  186. }
  187. if(prevTw != tw) {
  188. LIST_REMOVE(tw, pointers);
  189. LIST_INSERT_AFTER(prevTw, tw, pointers);
  190. }
  191. } else {
  192. dispatchWork(server, tw->workSize, tw->work); // frees the work pointer
  193. LIST_REMOVE(tw, pointers);
  194. UA_free(tw->workIds);
  195. UA_free(tw);
  196. }
  197. #else
  198. processWork(server,tw->work, tw->workSize); // does not free the work
  199. if(tw->repetitionInterval > 0) {
  200. tw->time += tw->repetitionInterval;
  201. UA_TimedWork *prevTw = tw;
  202. while(UA_TRUE) {
  203. UA_TimedWork *next = LIST_NEXT(prevTw, pointers);
  204. if(!next || next->time > tw->time)
  205. break;
  206. prevTw = next;
  207. }
  208. if(prevTw != tw) {
  209. LIST_REMOVE(tw, pointers);
  210. LIST_INSERT_AFTER(prevTw, tw, pointers);
  211. }
  212. } else {
  213. LIST_REMOVE(tw, pointers);
  214. UA_free(tw->work);
  215. UA_free(tw->workIds);
  216. UA_free(tw);
  217. }
  218. #endif
  219. }
  220. tw = LIST_FIRST(&server->timedWork);
  221. UA_UInt16 timeout = MAXTIMEOUT;
  222. if(tw)
  223. timeout = (tw->time - current)/10;
  224. return timeout;
  225. }
  226. void UA_Server_deleteTimedWork(UA_Server *server) {
  227. UA_TimedWork *tw;
  228. while((tw = LIST_FIRST(&server->timedWork))) {
  229. LIST_REMOVE(tw, pointers);
  230. UA_free(tw->work);
  231. UA_free(tw->workIds);
  232. UA_free(tw);
  233. }
  234. }
  235. /****************/
  236. /* Delayed Work */
  237. /****************/
  238. #ifdef UA_MULTITHREADING
  239. #define DELAYEDWORKSIZE 100 // Collect delayed work until we have DELAYEDWORKSIZE items
  240. struct UA_DelayedWork {
  241. UA_DelayedWork *next;
  242. UA_UInt32 *workerCounters; // initially UA_NULL until a workitem gets the counters
  243. UA_UInt32 workItemsCount; // the size of the array is DELAYEDWORKSIZE, the count may be less
  244. UA_WorkItem *workItems; // when it runs full, a new delayedWork entry is created
  245. };
  246. // Dispatched as a methodcall-WorkItem when the delayedwork is added
  247. static void getCounters(UA_Server *server, UA_DelayedWork *delayed) {
  248. UA_UInt32 *counters = UA_malloc(server->nThreads * sizeof(UA_UInt32));
  249. for(UA_UInt16 i = 0;i<server->nThreads;i++)
  250. counters[i] = *server->workerCounters[i];
  251. delayed->workerCounters = counters;
  252. }
  253. // Call from the main thread only. This is the only function that modifies
  254. // server->delayedWork. processDelayedWorkQueue modifies the "next" (after the
  255. // head).
  256. static void addDelayedWork(UA_Server *server, UA_WorkItem work) {
  257. UA_DelayedWork *dw = server->delayedWork;
  258. if(!dw || dw->workItemsCount >= DELAYEDWORKSIZE) {
  259. UA_DelayedWork *newwork = UA_malloc(sizeof(UA_DelayedWork));
  260. newwork->workItems = UA_malloc(sizeof(UA_WorkItem)*DELAYEDWORKSIZE);
  261. newwork->workItemsCount = 0;
  262. newwork->workerCounters = UA_NULL;
  263. newwork->next = server->delayedWork;
  264. // dispatch a method that sets the counter
  265. if(dw && dw->workItemsCount >= DELAYEDWORKSIZE) {
  266. UA_WorkItem *setCounter = UA_malloc(sizeof(UA_WorkItem));
  267. *setCounter = (UA_WorkItem)
  268. {.type = UA_WORKITEMTYPE_METHODCALL,
  269. .work.methodCall = {.method = (void (*)(UA_Server*, void*))getCounters, .data = dw}};
  270. dispatchWork(server, 1, setCounter);
  271. }
  272. server->delayedWork = newwork;
  273. dw = newwork;
  274. }
  275. dw->workItems[dw->workItemsCount] = work;
  276. dw->workItemsCount++;
  277. }
  278. static void processDelayedWork(UA_Server *server) {
  279. UA_DelayedWork *dw = server->delayedWork;
  280. while(dw) {
  281. processWork(server, dw->workItems, dw->workItemsCount);
  282. UA_DelayedWork *next = dw->next;
  283. UA_free(dw->workerCounters);
  284. UA_free(dw->workItems);
  285. UA_free(dw);
  286. dw = next;
  287. }
  288. }
  289. // Execute this every N seconds (repeated work) to execute delayed work that is ready
  290. static void dispatchDelayedWork(UA_Server *server, void *data /* not used, but needed for the signature*/) {
  291. UA_DelayedWork *dw = UA_NULL;
  292. UA_DelayedWork *readydw = UA_NULL;
  293. UA_DelayedWork *beforedw = server->delayedWork;
  294. // start at the second...
  295. if(beforedw)
  296. dw = beforedw->next;
  297. // find the first delayedwork where the counters are set and have been moved
  298. while(dw) {
  299. if(!dw->workerCounters) {
  300. beforedw = dw;
  301. dw = dw->next;
  302. continue;
  303. }
  304. UA_Boolean countersMoved = UA_TRUE;
  305. for(UA_UInt16 i=0;i<server->nThreads;i++) {
  306. if(*server->workerCounters[i] == dw->workerCounters[i])
  307. countersMoved = UA_FALSE;
  308. break;
  309. }
  310. if(countersMoved) {
  311. readydw = uatomic_xchg(&beforedw->next, UA_NULL);
  312. break;
  313. } else {
  314. beforedw = dw;
  315. dw = dw->next;
  316. }
  317. }
  318. // we have a ready entry. all afterwards are also ready
  319. while(readydw) {
  320. dispatchWork(server, readydw->workItemsCount, readydw->workItems);
  321. beforedw = readydw;
  322. readydw = readydw->next;
  323. UA_free(beforedw->workerCounters);
  324. UA_free(beforedw);
  325. }
  326. }
  327. #endif
  328. /********************/
  329. /* Main Server Loop */
  330. /********************/
  331. UA_StatusCode UA_Server_run(UA_Server *server, UA_UInt16 nThreads, UA_Boolean *running) {
  332. #ifdef UA_MULTITHREADING
  333. // 1) Prepare the threads
  334. server->running = running; // the threads need to access the variable
  335. server->nThreads = nThreads;
  336. pthread_t *thr = UA_malloc(nThreads * sizeof(pthread_t));
  337. server->workerCounters = UA_malloc(nThreads * sizeof(UA_UInt32 *));
  338. for(UA_UInt32 i=0;i<nThreads;i++) {
  339. struct workerStartData *startData = UA_malloc(sizeof(struct workerStartData));
  340. startData->server = server;
  341. startData->workerCounter = &server->workerCounters[i];
  342. pthread_create(&thr[i], UA_NULL, (void* (*)(void*))workerLoop, startData);
  343. }
  344. UA_WorkItem processDelayed = {.type = UA_WORKITEMTYPE_METHODCALL,
  345. .work.methodCall = {.method = dispatchDelayedWork,
  346. .data = UA_NULL} };
  347. UA_Server_addRepeatedWorkItem(server, &processDelayed, 10000000);
  348. #endif
  349. // 2) Start the networklayers
  350. for(UA_Int32 i=0;i<server->nlsSize;i++)
  351. server->nls[i].start(server->nls[i].nlHandle);
  352. // 3) The loop
  353. while(1) {
  354. // 3.1) Process timed work
  355. UA_UInt16 timeout = processTimedWork(server);
  356. // 3.2) Get work from the networklayer and dispatch it
  357. for(UA_Int32 i=0;i<server->nlsSize;i++) {
  358. UA_NetworkLayer *nl = &server->nls[i];
  359. UA_WorkItem *work;
  360. UA_Int32 workSize;
  361. if(*running) {
  362. if(i == server->nlsSize-1)
  363. workSize = nl->getWork(nl->nlHandle, &work, timeout);
  364. else
  365. workSize = nl->getWork(nl->nlHandle, &work, 0);
  366. } else {
  367. workSize = server->nls[i].stop(nl->nlHandle, &work);
  368. }
  369. #ifdef UA_MULTITHREADING
  370. // Filter out delayed work
  371. for(UA_Int32 k=0;k<workSize;k++) {
  372. if(work[k].type != UA_WORKITEMTYPE_DELAYEDMETHODCALL)
  373. continue;
  374. addDelayedWork(server, work[k]);
  375. work[k].type = UA_WORKITEMTYPE_NOTHING;
  376. }
  377. dispatchWork(server, workSize, work);
  378. #else
  379. processWork(server, work, workSize);
  380. UA_free(work);
  381. #endif
  382. }
  383. // 3.3) Exit?
  384. if(!*running)
  385. break;
  386. }
  387. #ifdef UA_MULTITHREADING
  388. // 4) Clean up: Wait until all worker threads finish, then empty the
  389. // dispatch queue, then process the remaining delayed work
  390. for(UA_UInt32 i=0;i<nThreads;i++) {
  391. pthread_join(thr[i], UA_NULL);
  392. UA_free(server->workerCounters[i]);
  393. }
  394. UA_free(server->workerCounters);
  395. UA_free(thr);
  396. emptyDispatchQueue(server);
  397. processDelayedWork(server);
  398. #endif
  399. return UA_STATUSCODE_GOOD;
  400. }