/* ua_server_worker.c — worker threads, timed work, and delayed work for the UA server */
  1. #include "ua_util.h"
  2. #include "ua_server_internal.h"
  3. /**
  4. * There are three types of work:
  5. *
  6. * 1. Ordinary WorkItems (that are dispatched to worker threads if
  7. * multithreading is activated)
  8. *
  9. * 2. Timed work that is executed at a precise date (with an optional repetition
  10. * interval)
  11. *
  12. * 3. Delayed work that is executed at a later time when it is guaranteed that
  13. * all previous work has actually finished (only for multithreading)
  14. */
  15. #define MAXTIMEOUT 50000 // max timeout in usec until the next main loop iteration
  16. #define BATCHSIZE 20 // max size of worklists that are dispatched to workers
  17. static void processWork(UA_Server *server, UA_WorkItem *work, UA_Int32 workSize) {
  18. for(UA_Int32 i = 0; i < workSize; i++) {
  19. UA_WorkItem *item = &work[i];
  20. switch(item->type) {
  21. case UA_WORKITEMTYPE_BINARYNETWORKMESSAGE:
  22. UA_Server_processBinaryMessage(server, item->work.binaryNetworkMessage.connection,
  23. &item->work.binaryNetworkMessage.message);
  24. item->work.binaryNetworkMessage.connection->releaseBuffer(item->work.binaryNetworkMessage.connection,
  25. &item->work.binaryNetworkMessage.message);
  26. break;
  27. case UA_WORKITEMTYPE_METHODCALL:
  28. case UA_WORKITEMTYPE_DELAYEDMETHODCALL:
  29. item->work.methodCall.method(server, item->work.methodCall.data);
  30. break;
  31. default:
  32. break;
  33. }
  34. }
  35. }
  36. /*******************************/
  37. /* Worker Threads and Dispatch */
  38. /*******************************/
  39. #ifdef UA_MULTITHREADING
/** Entry in the dispatch queue (lock-free wfcqueue from userspace-rcu). */
struct workListNode {
    struct cds_wfcq_node node; // embedded queue node; must be initialized before enqueue
    UA_UInt32 workSize;        // number of items in the work array
    UA_WorkItem *work;         // owned by the consumer; freed after processing
};
  46. /** Dispatch work to workers. Slices the work up if it contains more than
  47. BATCHSIZE items. The work array is freed by the worker threads. */
  48. static void dispatchWork(UA_Server *server, UA_Int32 workSize, UA_WorkItem *work) {
  49. UA_Int32 startIndex = workSize; // start at the end
  50. while(workSize > 0) {
  51. UA_Int32 size = BATCHSIZE;
  52. if(size > workSize)
  53. size = workSize;
  54. startIndex = startIndex - size;
  55. struct workListNode *wln = UA_malloc(sizeof(struct workListNode));
  56. if(startIndex > 0) {
  57. UA_WorkItem *workSlice = UA_malloc(size * sizeof(UA_WorkItem));
  58. UA_memcpy(workSlice, &work[startIndex], size * sizeof(UA_WorkItem));
  59. *wln = (struct workListNode){.workSize = size, .work = workSlice};
  60. }
  61. else {
  62. // do not alloc, but forward the original array
  63. *wln = (struct workListNode){.workSize = size, .work = work};
  64. }
  65. cds_wfcq_node_init(&wln->node);
  66. cds_wfcq_enqueue(&server->dispatchQueue_head, &server->dispatchQueue_tail, &wln->node);
  67. workSize -= size;
  68. }
  69. }
// Throwaway struct to bring data into a worker thread at startup; the worker
// copies the fields and frees the struct.
struct workerStartData {
    UA_Server *server;
    UA_UInt32 **workerCounter; // out-slot where the worker publishes its progress counter
};
/** Worker thread main loop: dequeues and processes work from the dispatch
    queue. When the queue is empty, sleeps on the dispatch condition variable
    with a 2 second timeout so that *server->running is re-checked regularly. */
static void * workerLoop(struct workerStartData *startInfo) {
    rcu_register_thread();
    // allocate this worker's progress counter and publish it to the server;
    // dispatchDelayedWork compares snapshots of it to detect progress
    // NOTE(review): unchecked UA_malloc — an OOM here dereferences NULL below
    UA_UInt32 *c = UA_malloc(sizeof(UA_UInt32));
    uatomic_set(c, 0);
    *startInfo->workerCounter = c;
    UA_Server *server = startInfo->server;
    UA_free(startInfo); // throwaway start data; ownership ends here
    pthread_mutex_t mutex; // required for the condition variable
    pthread_mutex_init(&mutex,0);
    pthread_mutex_lock(&mutex);
    struct timespec to;
    while(*server->running) {
        struct workListNode *wln = (struct workListNode*)
            cds_wfcq_dequeue_blocking(&server->dispatchQueue_head, &server->dispatchQueue_tail);
        if(wln) {
            processWork(server, wln->work, wln->workSize);
            // the consumer owns and frees the work array and the node
            UA_free(wln->work);
            UA_free(wln);
        } else {
            // queue empty: wait for a broadcast or time out after 2 seconds
            clock_gettime(CLOCK_REALTIME, &to);
            to.tv_sec += 2;
            pthread_cond_timedwait(&server->dispatchQueue_condition, &mutex, &to);
        }
        uatomic_inc(c); // increase the workerCounter; signals progress to the main thread
    }
    pthread_mutex_unlock(&mutex);
    pthread_mutex_destroy(&mutex);
    rcu_unregister_thread();
    return UA_NULL;
}
  107. static void emptyDispatchQueue(UA_Server *server) {
  108. while(!cds_wfcq_empty(&server->dispatchQueue_head, &server->dispatchQueue_tail)) {
  109. struct workListNode *wln = (struct workListNode*)
  110. cds_wfcq_dequeue_blocking(&server->dispatchQueue_head, &server->dispatchQueue_tail);
  111. processWork(server, wln->work, wln->workSize);
  112. UA_free(wln->work);
  113. UA_free(wln);
  114. }
  115. }
  116. #endif
  117. /**************/
  118. /* Timed Work */
  119. /**************/
/* A batch of work items that is due at the same point in time. The
 * server->timedWork list appears to be kept sorted by `time` — confirmed by
 * the insertion logic in addTimedWork and the head-first processing in
 * processTimedWork. */
struct UA_TimedWork {
    LIST_ENTRY(UA_TimedWork) pointers; // list linkage (BSD queue.h)
    UA_UInt16 workSize;  // number of entries in work/workIds
    UA_WorkItem *work;   // the items executed together at `time`
    UA_Guid *workIds;    // one id per item, returned to callers for identification
    UA_DateTime time;    // next execution time
    UA_UInt32 repetitionInterval; // in 100ns resolution, 0 means no repetition
};
  128. /* The item is copied and not freed by this function. */
  129. static UA_StatusCode addTimedWork(UA_Server *server, const UA_WorkItem *item, UA_DateTime firstTime,
  130. UA_UInt32 repetitionInterval, UA_Guid *resultWorkGuid) {
  131. UA_TimedWork *tw, *lastTw = UA_NULL;
  132. // search for matching entry
  133. LIST_FOREACH(tw, &server->timedWork, pointers) {
  134. if(tw->repetitionInterval == repetitionInterval &&
  135. (repetitionInterval > 0 || tw->time == firstTime))
  136. break; // found a matching entry
  137. if(tw->time > firstTime) {
  138. tw = UA_NULL; // not matchin entry exists
  139. lastTw = tw;
  140. break;
  141. }
  142. }
  143. if(tw) {
  144. // append to matching entry
  145. tw->workSize++;
  146. UA_WorkItem *biggerWorkArray = UA_realloc(tw->work, sizeof(UA_WorkItem)*tw->workSize);
  147. if(!biggerWorkArray)
  148. return UA_STATUSCODE_BADOUTOFMEMORY;
  149. tw->work = biggerWorkArray;
  150. UA_Guid *biggerWorkIds = UA_realloc(tw->workIds, sizeof(UA_Guid)*tw->workSize);
  151. if(!biggerWorkIds)
  152. return UA_STATUSCODE_BADOUTOFMEMORY;
  153. tw->workIds = biggerWorkIds;
  154. tw->work[tw->workSize-1] = *item;
  155. tw->workIds[tw->workSize-1] = UA_Guid_random(&server->random_seed);
  156. if(resultWorkGuid)
  157. *resultWorkGuid = tw->workIds[tw->workSize-1];
  158. return UA_STATUSCODE_GOOD;
  159. }
  160. // create a new entry
  161. if(!(tw = UA_malloc(sizeof(UA_TimedWork))))
  162. return UA_STATUSCODE_BADOUTOFMEMORY;
  163. if(!(tw->work = UA_malloc(sizeof(UA_WorkItem)))) {
  164. UA_free(tw);
  165. return UA_STATUSCODE_BADOUTOFMEMORY;
  166. }
  167. if(!(tw->workIds = UA_malloc(sizeof(UA_Guid)))) {
  168. UA_free(tw->work);
  169. UA_free(tw);
  170. return UA_STATUSCODE_BADOUTOFMEMORY;
  171. }
  172. tw->workSize = 1;
  173. tw->time = firstTime;
  174. tw->repetitionInterval = repetitionInterval;
  175. tw->work[0] = *item;
  176. tw->workIds[0] = UA_Guid_random(&server->random_seed);
  177. if(lastTw)
  178. LIST_INSERT_AFTER(lastTw, tw, pointers);
  179. else
  180. LIST_INSERT_HEAD(&server->timedWork, tw, pointers);
  181. if(resultWorkGuid)
  182. *resultWorkGuid = tw->workIds[0];
  183. return UA_STATUSCODE_GOOD;
  184. }
// Currently, these functions need to get the server mutex, but should be sufficiently fast
/** Schedule a one-shot work item to run at executionTime. The optional
    resultWorkGuid receives the id assigned to the item. */
UA_StatusCode UA_Server_addTimedWorkItem(UA_Server *server, const UA_WorkItem *work, UA_DateTime executionTime,
                                         UA_Guid *resultWorkGuid) {
    return addTimedWork(server, work, executionTime, 0, resultWorkGuid);
}
/** Schedule a work item to run repeatedly with the given interval (100ns
    resolution, per UA_TimedWork.repetitionInterval). The first execution is
    one interval from now. The optional resultWorkGuid receives the item id. */
UA_StatusCode UA_Server_addRepeatedWorkItem(UA_Server *server, const UA_WorkItem *work, UA_UInt32 interval,
                                            UA_Guid *resultWorkGuid) {
    return addTimedWork(server, work, UA_DateTime_now() + interval, interval, resultWorkGuid);
}
  194. /** Dispatches timed work, returns the timeout until the next timed work in ms */
  195. static UA_UInt16 processTimedWork(UA_Server *server) {
  196. UA_DateTime current = UA_DateTime_now();
  197. UA_TimedWork *next = LIST_FIRST(&server->timedWork);
  198. UA_TimedWork *tw = UA_NULL;
  199. while(next) {
  200. tw = next;
  201. if(tw->time > current)
  202. break;
  203. next = LIST_NEXT(tw, pointers);
  204. #ifdef UA_MULTITHREADING
  205. if(tw->repetitionInterval > 0) {
  206. // copy the entry and insert at the new location
  207. UA_WorkItem *workCopy = UA_malloc(sizeof(UA_WorkItem) * tw->workSize);
  208. UA_memcpy(workCopy, tw->work, sizeof(UA_WorkItem) * tw->workSize);
  209. dispatchWork(server, tw->workSize, workCopy); // frees the work pointer
  210. tw->time += tw->repetitionInterval;
  211. UA_TimedWork *prevTw = tw; // after which tw do we insert?
  212. while(UA_TRUE) {
  213. UA_TimedWork *n = LIST_NEXT(prevTw, pointers);
  214. if(!n || n->time > tw->time)
  215. break;
  216. prevTw = n;
  217. }
  218. if(prevTw != tw) {
  219. LIST_REMOVE(tw, pointers);
  220. LIST_INSERT_AFTER(prevTw, tw, pointers);
  221. }
  222. } else {
  223. dispatchWork(server, tw->workSize, tw->work); // frees the work pointer
  224. LIST_REMOVE(tw, pointers);
  225. UA_free(tw->workIds);
  226. UA_free(tw);
  227. }
  228. #else
  229. // 1) Process the work since it is past its due date
  230. processWork(server, tw->work, tw->workSize); // does not free the work
  231. // 2) If the work is repeated, add it back into the list. Otherwise remove it.
  232. if(tw->repetitionInterval > 0) {
  233. tw->time += tw->repetitionInterval;
  234. UA_TimedWork *prevTw = tw;
  235. while(UA_TRUE) {
  236. UA_TimedWork *n = LIST_NEXT(prevTw, pointers);
  237. if(!n || n->time > tw->time)
  238. break;
  239. prevTw = n;
  240. }
  241. if(prevTw != tw) {
  242. LIST_REMOVE(tw, pointers);
  243. LIST_INSERT_AFTER(prevTw, tw, pointers);
  244. }
  245. } else {
  246. LIST_REMOVE(tw, pointers);
  247. UA_free(tw->work);
  248. UA_free(tw->workIds);
  249. UA_free(tw);
  250. }
  251. #endif
  252. }
  253. // check if the next timed work is sooner than the usual timeout
  254. UA_TimedWork *first = LIST_FIRST(&server->timedWork);
  255. UA_UInt16 timeout = MAXTIMEOUT;
  256. if(first) {
  257. timeout = (first->time - current)/10;
  258. if(timeout > MAXTIMEOUT)
  259. timeout = MAXTIMEOUT;
  260. }
  261. return timeout;
  262. }
  263. void UA_Server_deleteTimedWork(UA_Server *server) {
  264. UA_TimedWork *current;
  265. UA_TimedWork *next = LIST_FIRST(&server->timedWork);
  266. while(next) {
  267. current = next;
  268. next = LIST_NEXT(current, pointers);
  269. LIST_REMOVE(current, pointers);
  270. UA_free(current->work);
  271. UA_free(current->workIds);
  272. UA_free(current);
  273. }
  274. }
  275. /****************/
  276. /* Delayed Work */
  277. /****************/
  278. #ifdef UA_MULTITHREADING
  279. #define DELAYEDWORKSIZE 100 // Collect delayed work until we have DELAYEDWORKSIZE items
/* A batch of up to DELAYEDWORKSIZE delayed work items. Entries form a
 * singly-linked list headed by server->delayedWork; new entries are
 * prepended, so the head is the one still being filled. */
struct UA_DelayedWork {
    UA_DelayedWork *next;
    UA_UInt32 *workerCounters; // initially UA_NULL until a workitem gets the counters
    UA_UInt32 workItemsCount;  // the size of the array is DELAYEDWORKSIZE, the count may be less
    UA_WorkItem *workItems;    // when it runs full, a new delayedWork entry is created
};
  286. // Dispatched as a methodcall-WorkItem when the delayedwork is added
  287. static void getCounters(UA_Server *server, UA_DelayedWork *delayed) {
  288. UA_UInt32 *counters = UA_malloc(server->nThreads * sizeof(UA_UInt32));
  289. for(UA_UInt16 i = 0;i<server->nThreads;i++)
  290. counters[i] = *server->workerCounters[i];
  291. delayed->workerCounters = counters;
  292. }
  293. // Call from the main thread only. This is the only function that modifies
  294. // server->delayedWork. processDelayedWorkQueue modifies the "next" (after the
  295. // head).
  296. static void addDelayedWork(UA_Server *server, UA_WorkItem work) {
  297. UA_DelayedWork *dw = server->delayedWork;
  298. if(!dw || dw->workItemsCount >= DELAYEDWORKSIZE) {
  299. UA_DelayedWork *newwork = UA_malloc(sizeof(UA_DelayedWork));
  300. newwork->workItems = UA_malloc(sizeof(UA_WorkItem)*DELAYEDWORKSIZE);
  301. newwork->workItemsCount = 0;
  302. newwork->workerCounters = UA_NULL;
  303. newwork->next = server->delayedWork;
  304. // dispatch a method that sets the counter
  305. if(dw && dw->workItemsCount >= DELAYEDWORKSIZE) {
  306. UA_WorkItem *setCounter = UA_malloc(sizeof(UA_WorkItem));
  307. *setCounter = (UA_WorkItem)
  308. {.type = UA_WORKITEMTYPE_METHODCALL,
  309. .work.methodCall = {.method = (void (*)(UA_Server*, void*))getCounters, .data = dw}};
  310. dispatchWork(server, 1, setCounter);
  311. }
  312. server->delayedWork = newwork;
  313. dw = newwork;
  314. }
  315. dw->workItems[dw->workItemsCount] = work;
  316. dw->workItemsCount++;
  317. }
  318. static void processDelayedWork(UA_Server *server) {
  319. UA_DelayedWork *dw = server->delayedWork;
  320. while(dw) {
  321. processWork(server, dw->workItems, dw->workItemsCount);
  322. UA_DelayedWork *next = dw->next;
  323. UA_free(dw->workerCounters);
  324. UA_free(dw->workItems);
  325. UA_free(dw);
  326. dw = next;
  327. }
  328. }
  329. // Execute this every N seconds (repeated work) to execute delayed work that is ready
  330. static void dispatchDelayedWork(UA_Server *server, void *data /* not used, but needed for the signature*/) {
  331. UA_DelayedWork *dw = UA_NULL;
  332. UA_DelayedWork *readydw = UA_NULL;
  333. UA_DelayedWork *beforedw = server->delayedWork;
  334. // start at the second...
  335. if(beforedw)
  336. dw = beforedw->next;
  337. // find the first delayedwork where the counters are set and have been moved
  338. while(dw) {
  339. if(!dw->workerCounters) {
  340. beforedw = dw;
  341. dw = dw->next;
  342. continue;
  343. }
  344. UA_Boolean countersMoved = UA_TRUE;
  345. for(UA_UInt16 i=0;i<server->nThreads;i++) {
  346. if(*server->workerCounters[i] == dw->workerCounters[i])
  347. countersMoved = UA_FALSE;
  348. break;
  349. }
  350. if(countersMoved) {
  351. readydw = uatomic_xchg(&beforedw->next, UA_NULL);
  352. break;
  353. } else {
  354. beforedw = dw;
  355. dw = dw->next;
  356. }
  357. }
  358. // we have a ready entry. all afterwards are also ready
  359. while(readydw) {
  360. dispatchWork(server, readydw->workItemsCount, readydw->workItems);
  361. beforedw = readydw;
  362. readydw = readydw->next;
  363. UA_free(beforedw->workerCounters);
  364. UA_free(beforedw);
  365. }
  366. }
  367. #endif
  368. /********************/
  369. /* Main Server Loop */
  370. /********************/
/** Run the server: in multithreaded builds start nThreads worker threads and
    a repeated job that releases delayed work; start the network layers; then
    loop — processing timed work and network work — until *running becomes
    false; finally join the workers and drain the remaining work.
    NOTE(review): the mallocs and pthread_create below are unchecked — an OOM
    or thread-creation failure here crashes or silently loses workers. */
UA_StatusCode UA_Server_run(UA_Server *server, UA_UInt16 nThreads, UA_Boolean *running) {
#ifdef UA_MULTITHREADING
    // 1) Prepare the threads
    server->running = running; // the threads need to access the variable
    server->nThreads = nThreads;
    pthread_cond_init(&server->dispatchQueue_condition, 0);
    pthread_t *thr = UA_malloc(nThreads * sizeof(pthread_t));
    server->workerCounters = UA_malloc(nThreads * sizeof(UA_UInt32 *));
    for(UA_UInt32 i=0;i<nThreads;i++) {
        struct workerStartData *startData = UA_malloc(sizeof(struct workerStartData));
        startData->server = server;
        // the worker publishes its progress counter into this slot
        startData->workerCounter = &server->workerCounters[i];
        pthread_create(&thr[i], UA_NULL, (void* (*)(void*))workerLoop, startData);
    }
    // periodically release delayed work that is safe to execute
    // (interval 10000000 * 100ns = 1 second)
    UA_WorkItem processDelayed = {.type = UA_WORKITEMTYPE_METHODCALL,
                                  .work.methodCall = {.method = dispatchDelayedWork,
                                                      .data = UA_NULL} };
    UA_Server_addRepeatedWorkItem(server, &processDelayed, 10000000, UA_NULL);
#endif
    // 2) Start the networklayers
    for(UA_Int32 i=0;i<server->nlsSize;i++)
        server->nls[i].start(server->nls[i].nlHandle, &server->logger);
    // 3) The loop
    while(1) {
        // 3.1) Process timed work
        UA_UInt16 timeout = processTimedWork(server);
        // 3.2) Get work from the networklayer and dispatch it
        for(UA_Int32 i=0;i<server->nlsSize;i++) {
            UA_ServerNetworkLayer *nl = &server->nls[i];
            UA_WorkItem *work;
            UA_Int32 workSize;
            if(*running) {
                // only the last layer may block for the timeout; the others poll
                if(i == server->nlsSize-1)
                    workSize = nl->getWork(nl->nlHandle, &work, timeout);
                else
                    workSize = nl->getWork(nl->nlHandle, &work, 0);
            } else {
                // shutting down: collect the layer's final work
                workSize = server->nls[i].stop(nl->nlHandle, &work);
            }
#ifdef UA_MULTITHREADING
            // Filter out delayed work: queue it separately and neutralize the item
            for(UA_Int32 k=0;k<workSize;k++) {
                if(work[k].type != UA_WORKITEMTYPE_DELAYEDMETHODCALL)
                    continue;
                addDelayedWork(server, work[k]);
                work[k].type = UA_WORKITEMTYPE_NOTHING;
            }
            dispatchWork(server, workSize, work); // hands off ownership of the array
            if(workSize > 0)
                pthread_cond_broadcast(&server->dispatchQueue_condition); // wake sleeping workers
#else
            processWork(server, work, workSize);
            if(workSize > 0)
                UA_free(work);
#endif
        }
        // 3.3) Exit?
        if(!*running)
            break;
    }
#ifdef UA_MULTITHREADING
    // 4) Clean up: Wait until all worker threads finish, then empty the
    // dispatch queue, then process the remaining delayed work
    for(UA_UInt32 i=0;i<nThreads;i++) {
        pthread_join(thr[i], UA_NULL);
        UA_free(server->workerCounters[i]);
    }
    UA_free(server->workerCounters);
    UA_free(thr);
    emptyDispatchQueue(server);
    processDelayedWork(server);
#endif
    return UA_STATUSCODE_GOOD;
}