ua_server_worker.c 19 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538
  1. #include "ua_util.h"
  2. #include "ua_server_internal.h"
  3. /**
  4. * There are three types of work:
  5. *
  6. * 1. Ordinary WorkItems (that are dispatched to worker threads if
  7. * multithreading is activated)
  8. * 2. Timed work that is executed at a precise date (with an optional repetition
  9. * interval)
  10. * 3. Delayed work that is executed at a later time when it is guaranteed that
  11. * all previous work has actually finished (only for multithreading)
  12. */
  13. #define MAXTIMEOUT 50000 // max timeout in microsec until the next main loop iteration
  14. #define BATCHSIZE 20 // max size of worklists that are dispatched to workers
  15. static void processWork(UA_Server *server, UA_WorkItem *work, size_t workSize) {
  16. for(size_t i = 0; i < workSize; i++) {
  17. UA_WorkItem *item = &work[i];
  18. switch(item->type) {
  19. case UA_WORKITEMTYPE_BINARYMESSAGE:
  20. UA_Server_processBinaryMessage(server, item->work.binaryMessage.connection,
  21. &item->work.binaryMessage.message);
  22. item->work.binaryMessage.connection->releaseBuffer(item->work.binaryMessage.connection,
  23. &item->work.binaryMessage.message);
  24. break;
  25. case UA_WORKITEMTYPE_CLOSECONNECTION:
  26. UA_Connection_detachSecureChannel(item->work.closeConnection);
  27. item->work.closeConnection->close(item->work.closeConnection);
  28. break;
  29. case UA_WORKITEMTYPE_METHODCALL:
  30. case UA_WORKITEMTYPE_DELAYEDMETHODCALL:
  31. item->work.methodCall.method(server, item->work.methodCall.data);
  32. break;
  33. default:
  34. break;
  35. }
  36. }
  37. }
  38. /*******************************/
  39. /* Worker Threads and Dispatch */
  40. /*******************************/
  41. #ifdef UA_MULTITHREADING
  42. /** Entry in the dipatch queue */
  43. struct workListNode {
  44. struct cds_wfcq_node node; // node for the queue
  45. UA_UInt32 workSize;
  46. UA_WorkItem *work;
  47. };
  48. /** Dispatch work to workers. Slices the work up if it contains more than
  49. BATCHSIZE items. The work array is freed by the worker threads. */
  50. static void dispatchWork(UA_Server *server, UA_Int32 workSize, UA_WorkItem *work) {
  51. UA_Int32 startIndex = workSize; // start at the end
  52. while(workSize > 0) {
  53. UA_Int32 size = BATCHSIZE;
  54. if(size > workSize)
  55. size = workSize;
  56. startIndex = startIndex - size;
  57. struct workListNode *wln = UA_malloc(sizeof(struct workListNode));
  58. if(startIndex > 0) {
  59. UA_WorkItem *workSlice = UA_malloc(size * sizeof(UA_WorkItem));
  60. UA_memcpy(workSlice, &work[startIndex], size * sizeof(UA_WorkItem));
  61. *wln = (struct workListNode){.workSize = size, .work = workSlice};
  62. }
  63. else {
  64. // do not alloc, but forward the original array
  65. *wln = (struct workListNode){.workSize = size, .work = work};
  66. }
  67. cds_wfcq_node_init(&wln->node);
  68. cds_wfcq_enqueue(&server->dispatchQueue_head, &server->dispatchQueue_tail, &wln->node);
  69. workSize -= size;
  70. }
  71. }
  72. // throwaway struct to bring data into the worker threads
  73. struct workerStartData {
  74. UA_Server *server;
  75. UA_UInt32 **workerCounter;
  76. };
  77. /** Waits until work arrives in the dispatch queue (restart after 10ms) and
  78. processes it. */
  79. static void * workerLoop(struct workerStartData *startInfo) {
  80. rcu_register_thread();
  81. UA_UInt32 *c = UA_malloc(sizeof(UA_UInt32));
  82. uatomic_set(c, 0);
  83. *startInfo->workerCounter = c;
  84. UA_Server *server = startInfo->server;
  85. UA_free(startInfo);
  86. pthread_mutex_t mutex; // required for the condition variable
  87. pthread_mutex_init(&mutex,0);
  88. pthread_mutex_lock(&mutex);
  89. struct timespec to;
  90. while(*server->running) {
  91. struct workListNode *wln = (struct workListNode*)
  92. cds_wfcq_dequeue_blocking(&server->dispatchQueue_head, &server->dispatchQueue_tail);
  93. if(wln) {
  94. processWork(server, wln->work, wln->workSize);
  95. UA_free(wln->work);
  96. UA_free(wln);
  97. } else {
  98. clock_gettime(CLOCK_REALTIME, &to);
  99. to.tv_sec += 2;
  100. pthread_cond_timedwait(&server->dispatchQueue_condition, &mutex, &to);
  101. }
  102. uatomic_inc(c); // increase the workerCounter;
  103. }
  104. pthread_mutex_unlock(&mutex);
  105. pthread_mutex_destroy(&mutex);
  106. rcu_unregister_thread();
  107. return UA_NULL;
  108. }
  109. static void emptyDispatchQueue(UA_Server *server) {
  110. while(!cds_wfcq_empty(&server->dispatchQueue_head, &server->dispatchQueue_tail)) {
  111. struct workListNode *wln = (struct workListNode*)
  112. cds_wfcq_dequeue_blocking(&server->dispatchQueue_head, &server->dispatchQueue_tail);
  113. processWork(server, wln->work, wln->workSize);
  114. UA_free(wln->work);
  115. UA_free(wln);
  116. }
  117. }
  118. #endif
  119. /**************/
  120. /* Timed Work */
  121. /**************/
  122. /**
  123. * The TimedWork structure contains an array of workitems that are either executed at the same time
  124. * or in the same repetition inverval. The linked list is sorted, so we can stop traversing when the
  125. * first element has nextTime > now.
  126. */
  127. struct TimedWork {
  128. LIST_ENTRY(TimedWork) pointers;
  129. UA_DateTime nextTime;
  130. UA_UInt32 interval; ///> in 100ns resolution, 0 means no repetition
  131. size_t workSize;
  132. UA_WorkItem *work;
  133. UA_Guid workIds[];
  134. };
  135. /* Traverse the list until there is a TimedWork to which the item can be added or we reached the
  136. end. The item is copied into the TimedWork and not freed by this function. The interval is in
  137. 100ns resolution */
  138. static UA_StatusCode addTimedWork(UA_Server *server, const UA_WorkItem *item, UA_DateTime firstTime,
  139. UA_UInt32 interval, UA_Guid *resultWorkGuid) {
  140. struct TimedWork *matchingTw = UA_NULL; // add the item here
  141. struct TimedWork *lastTw = UA_NULL; // if there is no matchingTw, add a new TimedWork after this entry
  142. struct TimedWork *tempTw;
  143. /* search for matching entry */
  144. tempTw = LIST_FIRST(&server->timedWork);
  145. if(interval == 0) {
  146. /* single execution. the time needs to match */
  147. while(tempTw) {
  148. if(tempTw->nextTime >= firstTime) {
  149. if(tempTw->nextTime == firstTime)
  150. matchingTw = tempTw;
  151. break;
  152. }
  153. lastTw = tempTw;
  154. tempTw = LIST_NEXT(lastTw, pointers);
  155. }
  156. } else {
  157. /* repeated execution. the interval needs to match */
  158. while(tempTw) {
  159. if(interval == tempTw->interval) {
  160. matchingTw = tempTw;
  161. break;
  162. }
  163. if(tempTw->nextTime > firstTime)
  164. break;
  165. lastTw = tempTw;
  166. tempTw = LIST_NEXT(lastTw, pointers);
  167. }
  168. }
  169. if(matchingTw) {
  170. /* append to matching entry */
  171. matchingTw = UA_realloc(matchingTw, sizeof(struct TimedWork) + sizeof(UA_Guid)*(matchingTw->workSize + 1));
  172. if(!matchingTw)
  173. return UA_STATUSCODE_BADOUTOFMEMORY;
  174. if(matchingTw->pointers.le_next)
  175. matchingTw->pointers.le_next->pointers.le_prev = &matchingTw->pointers.le_next;
  176. if(matchingTw->pointers.le_prev)
  177. *matchingTw->pointers.le_prev = matchingTw;
  178. UA_WorkItem *newItems = UA_realloc(matchingTw->work, sizeof(UA_WorkItem)*(matchingTw->workSize + 1));
  179. if(!newItems)
  180. return UA_STATUSCODE_BADOUTOFMEMORY;
  181. matchingTw->work = newItems;
  182. } else {
  183. /* create a new entry */
  184. matchingTw = UA_malloc(sizeof(struct TimedWork) + sizeof(UA_Guid));
  185. if(!matchingTw)
  186. return UA_STATUSCODE_BADOUTOFMEMORY;
  187. matchingTw->work = UA_malloc(sizeof(UA_WorkItem));
  188. if(!matchingTw->work) {
  189. UA_free(matchingTw);
  190. return UA_STATUSCODE_BADOUTOFMEMORY;
  191. }
  192. matchingTw->workSize = 0;
  193. matchingTw->nextTime = firstTime;
  194. matchingTw->interval = interval;
  195. if(lastTw)
  196. LIST_INSERT_AFTER(lastTw, matchingTw, pointers);
  197. else
  198. LIST_INSERT_HEAD(&server->timedWork, matchingTw, pointers);
  199. }
  200. matchingTw->work[matchingTw->workSize] = *item;
  201. matchingTw->workSize++;
  202. /* create a guid for finding and deleting the timed work later on */
  203. if(resultWorkGuid) {
  204. matchingTw->workIds[matchingTw->workSize] = UA_Guid_random(&server->random_seed);
  205. *resultWorkGuid = matchingTw->workIds[matchingTw->workSize];
  206. }
  207. return UA_STATUSCODE_GOOD;
  208. }
  209. UA_StatusCode UA_Server_addTimedWorkItem(UA_Server *server, const UA_WorkItem *work, UA_DateTime executionTime,
  210. UA_Guid *resultWorkGuid) {
  211. return addTimedWork(server, work, executionTime, 0, resultWorkGuid);
  212. }
  213. UA_StatusCode UA_Server_addRepeatedWorkItem(UA_Server *server, const UA_WorkItem *work, UA_UInt32 interval,
  214. UA_Guid *resultWorkGuid) {
  215. return addTimedWork(server, work, UA_DateTime_now() + interval * 10000, interval * 10000, resultWorkGuid);
  216. }
  217. /** Dispatches timed work, returns the timeout until the next timed work in ms */
  218. static UA_UInt16 processTimedWork(UA_Server *server) {
  219. UA_DateTime current = UA_DateTime_now();
  220. struct TimedWork *next = LIST_FIRST(&server->timedWork);
  221. struct TimedWork *tw = UA_NULL;
  222. while(next) {
  223. tw = next;
  224. if(tw->nextTime > current)
  225. break;
  226. next = LIST_NEXT(tw, pointers);
  227. #ifdef UA_MULTITHREADING
  228. if(tw->interval > 0) {
  229. // copy the entry and insert at the new location
  230. UA_WorkItem *workCopy = (UA_WorkItem *) UA_malloc(sizeof(UA_WorkItem) * tw->workSize);
  231. UA_memcpy(workCopy, tw->work, sizeof(UA_WorkItem) * tw->workSize);
  232. dispatchWork(server, tw->workSize, workCopy); // frees the work pointer
  233. tw->nextTime += tw->interval;
  234. struct TimedWork *prevTw = tw; // after which tw do we insert?
  235. while(UA_TRUE) {
  236. struct TimedWork *n = LIST_NEXT(prevTw, pointers);
  237. if(!n || n->nextTime > tw->nextTime)
  238. break;
  239. prevTw = n;
  240. }
  241. if(prevTw != tw) {
  242. LIST_REMOVE(tw, pointers);
  243. LIST_INSERT_AFTER(prevTw, tw, pointers);
  244. }
  245. } else {
  246. dispatchWork(server, tw->workSize, tw->work); // frees the work pointer
  247. LIST_REMOVE(tw, pointers);
  248. UA_free(tw);
  249. }
  250. #else
  251. // 1) Process the work since it is past its due date
  252. processWork(server, tw->work, tw->workSize); // does not free the work ptr
  253. // 2) If the work is repeated, add it back into the list. Otherwise remove it.
  254. if(tw->interval > 0) {
  255. tw->nextTime += tw->interval;
  256. if(tw->nextTime < current)
  257. tw->nextTime = current;
  258. struct TimedWork *prevTw = tw;
  259. while(UA_TRUE) {
  260. struct TimedWork *n = LIST_NEXT(prevTw, pointers);
  261. if(!n || n->nextTime > tw->nextTime)
  262. break;
  263. prevTw = n;
  264. }
  265. if(prevTw != tw) {
  266. LIST_REMOVE(tw, pointers);
  267. LIST_INSERT_AFTER(prevTw, tw, pointers);
  268. }
  269. } else {
  270. LIST_REMOVE(tw, pointers);
  271. UA_free(tw->work);
  272. UA_free(tw);
  273. }
  274. #endif
  275. }
  276. // check if the next timed work is sooner than the usual timeout
  277. struct TimedWork *first = LIST_FIRST(&server->timedWork);
  278. UA_UInt16 timeout = MAXTIMEOUT;
  279. if(first) {
  280. timeout = (first->nextTime - current)/10;
  281. if(timeout > MAXTIMEOUT)
  282. return MAXTIMEOUT;
  283. }
  284. return timeout;
  285. }
  286. void UA_Server_deleteTimedWork(UA_Server *server) {
  287. struct TimedWork *current;
  288. struct TimedWork *next = LIST_FIRST(&server->timedWork);
  289. while(next) {
  290. current = next;
  291. next = LIST_NEXT(current, pointers);
  292. LIST_REMOVE(current, pointers);
  293. UA_free(current->work);
  294. UA_free(current);
  295. }
  296. }
  297. /****************/
  298. /* Delayed Work */
  299. /****************/
  300. #ifdef UA_MULTITHREADING
  301. #define DELAYEDWORKSIZE 100 // Collect delayed work until we have DELAYEDWORKSIZE items
  302. struct DelayedWork {
  303. struct DelayedWork *next;
  304. UA_UInt32 *workerCounters; // initially UA_NULL until a workitem gets the counters
  305. UA_UInt32 workItemsCount; // the size of the array is DELAYEDWORKSIZE, the count may be less
  306. UA_WorkItem *workItems; // when it runs full, a new delayedWork entry is created
  307. };
  308. // Dispatched as a methodcall-WorkItem when the delayedwork is added
  309. static void getCounters(UA_Server *server, struct DelayedWork *delayed) {
  310. UA_UInt32 *counters = UA_malloc(server->nThreads * sizeof(UA_UInt32));
  311. for(UA_UInt16 i = 0;i<server->nThreads;i++)
  312. counters[i] = *server->workerCounters[i];
  313. delayed->workerCounters = counters;
  314. }
  315. // Call from the main thread only. This is the only function that modifies
  316. // server->delayedWork. processDelayedWorkQueue modifies the "next" (after the
  317. // head).
  318. static void addDelayedWork(UA_Server *server, UA_WorkItem work) {
  319. struct DelayedWork *dw = server->delayedWork;
  320. if(!dw || dw->workItemsCount >= DELAYEDWORKSIZE) {
  321. struct DelayedWork *newwork = UA_malloc(sizeof(struct DelayedWork));
  322. newwork->workItems = UA_malloc(sizeof(UA_WorkItem)*DELAYEDWORKSIZE);
  323. newwork->workItemsCount = 0;
  324. newwork->workerCounters = UA_NULL;
  325. newwork->next = server->delayedWork;
  326. // dispatch a method that sets the counter
  327. if(dw && dw->workItemsCount >= DELAYEDWORKSIZE) {
  328. UA_WorkItem *setCounter = UA_malloc(sizeof(UA_WorkItem));
  329. *setCounter = (UA_WorkItem)
  330. {.type = UA_WORKITEMTYPE_METHODCALL,
  331. .work.methodCall = {.method = (void (*)(UA_Server*, void*))getCounters, .data = dw}};
  332. dispatchWork(server, 1, setCounter);
  333. }
  334. server->delayedWork = newwork;
  335. dw = newwork;
  336. }
  337. dw->workItems[dw->workItemsCount] = work;
  338. dw->workItemsCount++;
  339. }
  340. static void processDelayedWork(UA_Server *server) {
  341. struct DelayedWork *dw = server->delayedWork;
  342. while(dw) {
  343. processWork(server, dw->workItems, dw->workItemsCount);
  344. struct DelayedWork *next = dw->next;
  345. UA_free(dw->workerCounters);
  346. UA_free(dw->workItems);
  347. UA_free(dw);
  348. dw = next;
  349. }
  350. }
  351. // Execute this every N seconds (repeated work) to execute delayed work that is ready
  352. static void dispatchDelayedWork(UA_Server *server, void *data /* not used, but needed for the signature*/) {
  353. struct DelayedWork *dw = UA_NULL;
  354. struct DelayedWork *readydw = UA_NULL;
  355. struct DelayedWork *beforedw = server->delayedWork;
  356. // start at the second...
  357. if(beforedw)
  358. dw = beforedw->next;
  359. // find the first delayedwork where the counters are set and have been moved
  360. while(dw) {
  361. if(!dw->workerCounters) {
  362. beforedw = dw;
  363. dw = dw->next;
  364. continue;
  365. }
  366. UA_Boolean countersMoved = UA_TRUE;
  367. for(UA_UInt16 i=0;i<server->nThreads;i++) {
  368. if(*server->workerCounters[i] == dw->workerCounters[i])
  369. countersMoved = UA_FALSE;
  370. break;
  371. }
  372. if(countersMoved) {
  373. readydw = uatomic_xchg(&beforedw->next, UA_NULL);
  374. break;
  375. } else {
  376. beforedw = dw;
  377. dw = dw->next;
  378. }
  379. }
  380. // we have a ready entry. all afterwards are also ready
  381. while(readydw) {
  382. dispatchWork(server, readydw->workItemsCount, readydw->workItems);
  383. beforedw = readydw;
  384. readydw = readydw->next;
  385. UA_free(beforedw->workerCounters);
  386. UA_free(beforedw);
  387. }
  388. }
  389. #endif
  390. /********************/
  391. /* Main Server Loop */
  392. /********************/
  393. UA_StatusCode UA_Server_run_startup(UA_Server *server, UA_UInt16 nThreads, UA_Boolean *running){
  394. #ifdef UA_MULTITHREADING
  395. // 1) Prepare the threads
  396. server->running = running; // the threads need to access the variable
  397. server->nThreads = nThreads;
  398. pthread_cond_init(&server->dispatchQueue_condition, 0);
  399. server->thr = UA_malloc(nThreads * sizeof(pthread_t));
  400. server->workerCounters = UA_malloc(nThreads * sizeof(UA_UInt32 *));
  401. for(UA_UInt32 i=0;i<nThreads;i++) {
  402. struct workerStartData *startData = UA_malloc(sizeof(struct workerStartData));
  403. startData->server = server;
  404. startData->workerCounter = &server->workerCounters[i];
  405. pthread_create(&server->thr[i], UA_NULL, (void* (*)(void*))workerLoop, startData);
  406. }
  407. UA_WorkItem processDelayed = {.type = UA_WORKITEMTYPE_METHODCALL,
  408. .work.methodCall = {.method = dispatchDelayedWork,
  409. .data = UA_NULL} };
  410. UA_Server_addRepeatedWorkItem(server, &processDelayed, 10000000, UA_NULL);
  411. #endif
  412. // 2) Start the networklayers
  413. for(size_t i = 0; i <server->networkLayersSize; i++)
  414. server->networkLayers[i].start(server->networkLayers[i].nlHandle, &server->logger);
  415. return UA_STATUSCODE_GOOD;
  416. }
  417. UA_StatusCode UA_Server_run_getAndProcessWork(UA_Server *server, UA_Boolean *running){
  418. // 3.1) Process timed work
  419. UA_UInt16 timeout = processTimedWork(server);
  420. // 3.2) Get work from the networklayer and dispatch it
  421. for(size_t i = 0; i < server->networkLayersSize; i++) {
  422. UA_ServerNetworkLayer *nl = &server->networkLayers[i];
  423. UA_WorkItem *work;
  424. UA_Int32 workSize;
  425. if(*running) {
  426. if(i == server->networkLayersSize-1)
  427. workSize = nl->getWork(nl->nlHandle, &work, timeout);
  428. else
  429. workSize = nl->getWork(nl->nlHandle, &work, 0);
  430. } else {
  431. workSize = server->networkLayers[i].stop(nl->nlHandle, &work);
  432. }
  433. #ifdef UA_MULTITHREADING
  434. // Filter out delayed work
  435. for(UA_Int32 k=0;k<workSize;k++) {
  436. if(work[k].type != UA_WORKITEMTYPE_DELAYEDMETHODCALL)
  437. continue;
  438. addDelayedWork(server, work[k]);
  439. work[k].type = UA_WORKITEMTYPE_NOTHING;
  440. }
  441. dispatchWork(server, workSize, work);
  442. if(workSize > 0)
  443. pthread_cond_broadcast(&server->dispatchQueue_condition);
  444. #else
  445. processWork(server, work, workSize);
  446. if(workSize > 0)
  447. UA_free(work);
  448. #endif
  449. }
  450. return UA_STATUSCODE_GOOD;
  451. }
  452. UA_StatusCode UA_Server_run_shutdown(UA_Server *server, UA_UInt16 nThreads){
  453. #ifdef UA_MULTITHREADING
  454. // 4) Clean up: Wait until all worker threads finish, then empty the
  455. // dispatch queue, then process the remaining delayed work
  456. for(UA_UInt32 i=0;i<nThreads;i++) {
  457. pthread_join(server->thr[i], UA_NULL);
  458. UA_free(server->workerCounters[i]);
  459. }
  460. UA_free(server->workerCounters);
  461. UA_free(server->thr);
  462. emptyDispatchQueue(server);
  463. processDelayedWork(server);
  464. #endif
  465. return UA_STATUSCODE_GOOD;
  466. }
  467. UA_StatusCode UA_Server_run(UA_Server *server, UA_UInt16 nThreads, UA_Boolean *running) {
  468. UA_Server_run_startup(server, nThreads, running);
  469. // 3) The loop
  470. while(1) {
  471. UA_Server_run_getAndProcessWork(server, running);
  472. // 3.3) Exit?
  473. if(!*running)
  474. break;
  475. }
  476. UA_Server_run_shutdown(server, nThreads);
  477. return UA_STATUSCODE_GOOD;
  478. }