
#include "ua_util.h"
#include "ua_server_internal.h"

/**
 * There are three types of work:
 *
 * 1. Ordinary WorkItems (that are dispatched to worker threads if
 *    multithreading is activated)
 * 2. Timed work that is executed at a precise date (with an optional repetition
 *    interval)
 * 3. Delayed work that is executed at a later time when it is guaranteed that
 *    all previous work has actually finished (only for multithreading)
 */
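
/* A minimal usage sketch for (1) and (2). The callback `printNumber` and the
 * variable `n` are hypothetical and not part of this file; the interval is in
 * ms (see UA_Server_addRepeatedWorkItem below):
 *
 *   static void printNumber(UA_Server *server, void *data) {
 *       printf("%i\n", *(UA_Int32*)data);
 *   }
 *
 *   UA_WorkItem item = {.type = UA_WORKITEMTYPE_METHODCALL,
 *                       .work.methodCall = {.method = printNumber, .data = &n}};
 *   UA_Server_addRepeatedWorkItem(server, &item, 2000, UA_NULL); // every 2 seconds
 */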

#define MAXTIMEOUT 50000 // max timeout in usec until the next main loop iteration
#define BATCHSIZE 20 // max size of worklists that are dispatched to workers

static void processWork(UA_Server *server, UA_WorkItem *work, UA_Int32 workSize) {
    for(UA_Int32 i = 0; i < workSize; i++) {
        UA_WorkItem *item = &work[i];
        switch(item->type) {
        case UA_WORKITEMTYPE_BINARYMESSAGE:
            UA_Server_processBinaryMessage(server, item->work.binaryMessage.connection,
                                           &item->work.binaryMessage.message);
            item->work.binaryMessage.connection->releaseBuffer(item->work.binaryMessage.connection,
                                                               &item->work.binaryMessage.message);
            break;
        case UA_WORKITEMTYPE_CLOSECONNECTION:
            UA_Connection_detachSecureChannel(item->work.closeConnection);
            item->work.closeConnection->close(item->work.closeConnection);
            break;
        case UA_WORKITEMTYPE_METHODCALL:
        case UA_WORKITEMTYPE_DELAYEDMETHODCALL:
            item->work.methodCall.method(server, item->work.methodCall.data);
            break;
        default:
            break;
        }
    }
}

/*******************************/
/* Worker Threads and Dispatch */
/*******************************/

#ifdef UA_MULTITHREADING

/** Entry in the dispatch queue */
struct workListNode {
    struct cds_wfcq_node node; // node for the queue
    UA_UInt32 workSize;
    UA_WorkItem *work;
};

/** Dispatch work to workers. Slices the work up if it contains more than
    BATCHSIZE items. The work array is freed by the worker threads. */
static void dispatchWork(UA_Server *server, UA_Int32 workSize, UA_WorkItem *work) {
    UA_Int32 startIndex = workSize; // start at the end
    while(workSize > 0) {
        UA_Int32 size = BATCHSIZE;
        if(size > workSize)
            size = workSize;
        startIndex = startIndex - size;
        struct workListNode *wln = UA_malloc(sizeof(struct workListNode));
        if(startIndex > 0) {
            UA_WorkItem *workSlice = UA_malloc(size * sizeof(UA_WorkItem));
            UA_memcpy(workSlice, &work[startIndex], size * sizeof(UA_WorkItem));
            *wln = (struct workListNode){.workSize = size, .work = workSlice};
        } else {
            // do not alloc, but forward the original array
            *wln = (struct workListNode){.workSize = size, .work = work};
        }
        cds_wfcq_node_init(&wln->node);
        cds_wfcq_enqueue(&server->dispatchQueue_head, &server->dispatchQueue_tail, &wln->node);
        workSize -= size;
    }
}
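
/* Worked example of the slicing above: workSize = 45 with BATCHSIZE = 20
 * enqueues three nodes covering work[25..44] and work[5..24] (both copied
 * slices) and finally work[0..4], which reuses the original array so that
 * exactly one worker frees it. */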

// throwaway struct to bring data into the worker threads
struct workerStartData {
    UA_Server *server;
    UA_UInt32 **workerCounter;
};

/** Waits until work arrives in the dispatch queue and processes it. Wakes up
    at least every 2 seconds to check whether the server is still running. */
static void * workerLoop(struct workerStartData *startInfo) {
    rcu_register_thread();
    UA_UInt32 *c = UA_malloc(sizeof(UA_UInt32));
    uatomic_set(c, 0);
    *startInfo->workerCounter = c;
    UA_Server *server = startInfo->server;
    UA_free(startInfo);

    pthread_mutex_t mutex; // required for the condition variable
    pthread_mutex_init(&mutex, 0);
    pthread_mutex_lock(&mutex);
    struct timespec to;

    while(*server->running) {
        struct workListNode *wln = (struct workListNode*)
            cds_wfcq_dequeue_blocking(&server->dispatchQueue_head, &server->dispatchQueue_tail);
        if(wln) {
            processWork(server, wln->work, wln->workSize);
            UA_free(wln->work);
            UA_free(wln);
        } else {
            clock_gettime(CLOCK_REALTIME, &to);
            to.tv_sec += 2;
            pthread_cond_timedwait(&server->dispatchQueue_condition, &mutex, &to);
        }
        uatomic_inc(c); // increase the worker counter
    }

    pthread_mutex_unlock(&mutex);
    pthread_mutex_destroy(&mutex);
    rcu_unregister_thread();
    return UA_NULL;
}

static void emptyDispatchQueue(UA_Server *server) {
    while(!cds_wfcq_empty(&server->dispatchQueue_head, &server->dispatchQueue_tail)) {
        struct workListNode *wln = (struct workListNode*)
            cds_wfcq_dequeue_blocking(&server->dispatchQueue_head, &server->dispatchQueue_tail);
        processWork(server, wln->work, wln->workSize);
        UA_free(wln->work);
        UA_free(wln);
    }
}

#endif

/**************/
/* Timed Work */
/**************/

struct TimedWork {
    LIST_ENTRY(TimedWork) pointers;
    UA_DateTime nextTime;
    UA_UInt32 interval; ///< repetition interval in 100ns ticks, 0 means no repetition
    size_t workSize;
    struct {
        UA_WorkItem work;
        UA_Guid workId;
    } work[];
};
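
/* `work[]` is a flexible array member, so an entry holding n items must be
 * allocated with the element size of the anonymous pair, e.g. (sketch):
 *
 *   struct TimedWork *tw = UA_malloc(sizeof(struct TimedWork) +
 *                                    n * sizeof(tw->work[0]));
 *
 * sizeof does not evaluate its operand, so taking the element size through the
 * still uninitialized pointer is well-defined. */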

/* The item is copied and not freed by this function. */
static UA_StatusCode addTimedWork(UA_Server *server, const UA_WorkItem *item, UA_DateTime firstTime,
                                  UA_UInt32 repetitionInterval, UA_Guid *resultWorkGuid) {
    struct TimedWork *lastTw = UA_NULL, *matchingTw = UA_NULL;
    /* search for a matching entry */
    if(repetitionInterval == 0) {
        /* stop at the first entry that is not due before firstTime; it matches
           only if the times are identical */
        LIST_FOREACH(lastTw, &server->timedWork, pointers) {
            if(lastTw->nextTime >= firstTime) {
                if(lastTw->nextTime == firstTime)
                    matchingTw = lastTw;
                break;
            }
        }
    } else {
        LIST_FOREACH(matchingTw, &server->timedWork, pointers) {
            if(repetitionInterval == matchingTw->interval)
                break;
        }
    }
    struct TimedWork *newWork;
    if(matchingTw) {
        /* append to the matching entry */
        newWork = UA_realloc(matchingTw, sizeof(struct TimedWork) +
                             (matchingTw->workSize + 1) * sizeof(matchingTw->work[0]));
        if(!newWork)
            return UA_STATUSCODE_BADOUTOFMEMORY;
        /* realloc may have moved the entry; relink the neighbours */
        if(newWork->pointers.le_next)
            newWork->pointers.le_next->pointers.le_prev = &newWork->pointers.le_next;
        if(newWork->pointers.le_prev)
            *newWork->pointers.le_prev = newWork;
    } else {
        /* create a new entry */
        newWork = UA_malloc(sizeof(struct TimedWork) + sizeof(newWork->work[0]));
        if(!newWork)
            return UA_STATUSCODE_BADOUTOFMEMORY;
        newWork->workSize = 0;
        newWork->nextTime = firstTime;
        newWork->interval = repetitionInterval;
        if(lastTw)
            LIST_INSERT_AFTER(lastTw, newWork, pointers);
        else
            LIST_INSERT_HEAD(&server->timedWork, newWork, pointers);
    }
    newWork->work[newWork->workSize].work = *item;
    if(resultWorkGuid) {
        newWork->work[newWork->workSize].workId = UA_Guid_random(&server->random_seed);
        *resultWorkGuid = newWork->work[newWork->workSize].workId;
    }
    newWork->workSize++;
    return UA_STATUSCODE_GOOD;
}

// Currently, these functions need to get the server mutex, but should be sufficiently fast
UA_StatusCode UA_Server_addTimedWorkItem(UA_Server *server, const UA_WorkItem *work, UA_DateTime executionTime,
                                         UA_Guid *resultWorkGuid) {
    return addTimedWork(server, work, executionTime, 0, resultWorkGuid);
}

/* The interval is given in ms; UA_DateTime counts 100ns ticks, hence the factor 10000 */
UA_StatusCode UA_Server_addRepeatedWorkItem(UA_Server *server, const UA_WorkItem *work, UA_UInt32 interval,
                                            UA_Guid *resultWorkGuid) {
    return addTimedWork(server, work, UA_DateTime_now() + interval * 10000, interval * 10000, resultWorkGuid);
}
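
/* Worked conversion example (under the ms interpretation above): an interval
 * of 250ms is stored as 250 * 10000 = 2500000 ticks of 100ns, i.e. 0.25s.
 * Note that a UA_UInt32 tick count caps the repetition interval at roughly
 * 429 seconds. */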

/** Dispatches timed work that is due. Returns the timeout until the next timed work in usec. */
static UA_UInt16 processTimedWork(UA_Server *server) {
    UA_DateTime current = UA_DateTime_now();
    struct TimedWork *next = LIST_FIRST(&server->timedWork);
    struct TimedWork *tw = UA_NULL;
    while(next) {
        tw = next;
        if(tw->nextTime > current)
            break;
        next = LIST_NEXT(tw, pointers);
#ifdef UA_MULTITHREADING
        // copy the work items into a plain array, as that is what the workers expect
        UA_WorkItem *workCopy = UA_malloc(sizeof(UA_WorkItem) * tw->workSize);
        for(size_t i = 0; i < tw->workSize; i++)
            workCopy[i] = tw->work[i].work;
        dispatchWork(server, tw->workSize, workCopy); // frees the workCopy pointer
        if(tw->interval > 0) {
            // work is repeated; reinsert the entry at its new position
            tw->nextTime += tw->interval;
            struct TimedWork *prevTw = tw; // after which tw do we insert?
            while(UA_TRUE) {
                struct TimedWork *n = LIST_NEXT(prevTw, pointers);
                if(!n || n->nextTime > tw->nextTime)
                    break;
                prevTw = n;
            }
            if(prevTw != tw) {
                LIST_REMOVE(tw, pointers);
                LIST_INSERT_AFTER(prevTw, tw, pointers);
            }
        } else {
            LIST_REMOVE(tw, pointers);
            UA_free(tw);
        }
#else
        // 1) Process the work since it is past its due date (in-place, without freeing it)
        for(size_t i = 0; i < tw->workSize; i++)
            processWork(server, &tw->work[i].work, 1);
        // 2) If the work is repeated, add it back into the list. Otherwise remove it.
        if(tw->interval > 0) {
            tw->nextTime += tw->interval;
            struct TimedWork *prevTw = tw;
            while(UA_TRUE) {
                struct TimedWork *n = LIST_NEXT(prevTw, pointers);
                if(!n || n->nextTime > tw->nextTime)
                    break;
                prevTw = n;
            }
            if(prevTw != tw) {
                LIST_REMOVE(tw, pointers);
                LIST_INSERT_AFTER(prevTw, tw, pointers);
            }
        } else {
            LIST_REMOVE(tw, pointers);
            UA_free(tw);
        }
#endif
    }

    // check if the next timed work is due sooner than the usual timeout
    struct TimedWork *first = LIST_FIRST(&server->timedWork);
    UA_UInt16 timeout = MAXTIMEOUT;
    if(first) {
        UA_DateTime remaining = (first->nextTime - current) / 10; // 100ns ticks -> usec
        if(remaining < 0)
            timeout = 0;
        else if(remaining < MAXTIMEOUT)
            timeout = (UA_UInt16)remaining;
    }
    return timeout;
}
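
/* Example of the timeout arithmetic: if the next entry is due in 3ms
 * (= 30000 ticks), the function returns 3000 usec; if it is due in 80ms,
 * the result is capped at MAXTIMEOUT (50000 usec). */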

void UA_Server_deleteTimedWork(UA_Server *server) {
    struct TimedWork *current;
    struct TimedWork *next = LIST_FIRST(&server->timedWork);
    while(next) {
        current = next;
        next = LIST_NEXT(current, pointers);
        LIST_REMOVE(current, pointers);
        UA_free(current);
    }
}

/****************/
/* Delayed Work */
/****************/

#ifdef UA_MULTITHREADING

#define DELAYEDWORKSIZE 100 // Collect delayed work until we have DELAYEDWORKSIZE items

struct DelayedWork {
    struct DelayedWork *next;
    UA_UInt32 *workerCounters; // initially UA_NULL until a work item snapshots the counters
    UA_UInt32 workItemsCount; // the size of the array is DELAYEDWORKSIZE, the count may be less
    UA_WorkItem *workItems; // when the array runs full, a new DelayedWork entry is created
};
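
/* Rationale of the counter check: every worker increments its counter once per
 * loop iteration. A snapshot of all counters is taken when a batch becomes
 * full. Once every counter has changed relative to the snapshot, each worker
 * has finished the iteration that might still have referenced memory the
 * delayed items are about to free, so the batch is safe to dispatch. */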

// Dispatched as a methodcall-WorkItem when the delayedwork is added
static void getCounters(UA_Server *server, struct DelayedWork *delayed) {
    UA_UInt32 *counters = UA_malloc(server->nThreads * sizeof(UA_UInt32));
    for(UA_UInt16 i = 0; i < server->nThreads; i++)
        counters[i] = *server->workerCounters[i];
    delayed->workerCounters = counters;
}

// Call from the main thread only. This is the only function that modifies
// server->delayedWork. dispatchDelayedWork modifies the "next" (after the
// head).
static void addDelayedWork(UA_Server *server, UA_WorkItem work) {
    struct DelayedWork *dw = server->delayedWork;
    if(!dw || dw->workItemsCount >= DELAYEDWORKSIZE) {
        struct DelayedWork *newwork = UA_malloc(sizeof(struct DelayedWork));
        newwork->workItems = UA_malloc(sizeof(UA_WorkItem) * DELAYEDWORKSIZE);
        newwork->workItemsCount = 0;
        newwork->workerCounters = UA_NULL;
        newwork->next = server->delayedWork;
        // dispatch a method that snapshots the counters for the just-filled entry
        if(dw) {
            UA_WorkItem *setCounter = UA_malloc(sizeof(UA_WorkItem));
            *setCounter = (UA_WorkItem)
                {.type = UA_WORKITEMTYPE_METHODCALL,
                 .work.methodCall = {.method = (void (*)(UA_Server*, void*))getCounters, .data = dw}};
            dispatchWork(server, 1, setCounter);
        }
        server->delayedWork = newwork;
        dw = newwork;
    }
    dw->workItems[dw->workItemsCount] = work;
    dw->workItemsCount++;
}

static void processDelayedWork(UA_Server *server) {
    struct DelayedWork *dw = server->delayedWork;
    while(dw) {
        processWork(server, dw->workItems, dw->workItemsCount);
        struct DelayedWork *next = dw->next;
        UA_free(dw->workerCounters);
        UA_free(dw->workItems);
        UA_free(dw);
        dw = next;
    }
}

// Execute this every N seconds (repeated work) to execute delayed work that is ready
static void dispatchDelayedWork(UA_Server *server, void *data /* not used, but needed for the signature*/) {
    struct DelayedWork *dw = UA_NULL;
    struct DelayedWork *readydw = UA_NULL;
    struct DelayedWork *beforedw = server->delayedWork;
    // start at the second entry; the head is still being filled
    if(beforedw)
        dw = beforedw->next;
    // find the first delayedwork where the counters are set and have been moved
    while(dw) {
        if(!dw->workerCounters) {
            beforedw = dw;
            dw = dw->next;
            continue;
        }
        UA_Boolean countersMoved = UA_TRUE;
        for(UA_UInt16 i = 0; i < server->nThreads; i++) {
            if(*server->workerCounters[i] == dw->workerCounters[i]) {
                countersMoved = UA_FALSE;
                break;
            }
        }
        if(countersMoved) {
            readydw = uatomic_xchg(&beforedw->next, UA_NULL);
            break;
        } else {
            beforedw = dw;
            dw = dw->next;
        }
    }
    // we have a ready entry. all entries after it are also ready
    while(readydw) {
        dispatchWork(server, readydw->workItemsCount, readydw->workItems);
        beforedw = readydw;
        readydw = readydw->next;
        UA_free(beforedw->workerCounters);
        UA_free(beforedw);
    }
}

#endif

/********************/
/* Main Server Loop */
/********************/

UA_StatusCode UA_Server_run(UA_Server *server, UA_UInt16 nThreads, UA_Boolean *running) {
#ifdef UA_MULTITHREADING
    // 1) Prepare the threads
    server->running = running; // the threads need to access the variable
    server->nThreads = nThreads;
    pthread_cond_init(&server->dispatchQueue_condition, 0);
    pthread_t *thr = UA_malloc(nThreads * sizeof(pthread_t));
    server->workerCounters = UA_malloc(nThreads * sizeof(UA_UInt32 *));
    for(UA_UInt32 i = 0; i < nThreads; i++) {
        struct workerStartData *startData = UA_malloc(sizeof(struct workerStartData));
        startData->server = server;
        startData->workerCounter = &server->workerCounters[i];
        pthread_create(&thr[i], UA_NULL, (void* (*)(void*))workerLoop, startData);
    }

    UA_WorkItem processDelayed = {.type = UA_WORKITEMTYPE_METHODCALL,
                                  .work.methodCall = {.method = dispatchDelayedWork,
                                                      .data = UA_NULL} };
    UA_Server_addRepeatedWorkItem(server, &processDelayed, 10000, UA_NULL); // every 10 seconds
#endif
    // 2) Start the networklayers
    for(size_t i = 0; i < server->networkLayersSize; i++)
        server->networkLayers[i].start(server->networkLayers[i].nlHandle, &server->logger);

    // 3) The loop
    while(1) {
        // 3.1) Process timed work
        UA_UInt16 timeout = processTimedWork(server);

        // 3.2) Get work from the networklayer and dispatch it
        for(size_t i = 0; i < server->networkLayersSize; i++) {
            UA_ServerNetworkLayer *nl = &server->networkLayers[i];
            UA_WorkItem *work;
            UA_Int32 workSize;
            if(*running) {
                // only the last networklayer may block for the timeout; the others are polled
                if(i == server->networkLayersSize - 1)
                    workSize = nl->getWork(nl->nlHandle, &work, timeout);
                else
                    workSize = nl->getWork(nl->nlHandle, &work, 0);
            } else {
                workSize = nl->stop(nl->nlHandle, &work);
            }
#ifdef UA_MULTITHREADING
            // Filter out delayed work
            for(UA_Int32 k = 0; k < workSize; k++) {
                if(work[k].type != UA_WORKITEMTYPE_DELAYEDMETHODCALL)
                    continue;
                addDelayedWork(server, work[k]);
                work[k].type = UA_WORKITEMTYPE_NOTHING;
            }
            dispatchWork(server, workSize, work);
            if(workSize > 0)
                pthread_cond_broadcast(&server->dispatchQueue_condition);
#else
            processWork(server, work, workSize);
            if(workSize > 0)
                UA_free(work);
#endif
        }

        // 3.3) Exit?
        if(!*running)
            break;
    }
#ifdef UA_MULTITHREADING
    // 4) Clean up: Wait until all worker threads finish, then empty the
    // dispatch queue, then process the remaining delayed work
    for(UA_UInt32 i = 0; i < nThreads; i++) {
        pthread_join(thr[i], UA_NULL);
        UA_free(server->workerCounters[i]);
    }
    UA_free(server->workerCounters);
    UA_free(thr);
    emptyDispatchQueue(server);
    processDelayedWork(server);
#endif
    return UA_STATUSCODE_GOOD;
}