ua_server_worker.c 18 KB


  1. #include "ua_util.h"
  2. #include "ua_server_internal.h"
  3. /**
  4. * There are three types of work:
  5. *
  6. * 1. Ordinary WorkItems (that are dispatched to worker threads if
  7. * multithreading is activated)
  8. * 2. Timed work that is executed at a precise date (with an optional repetition
  9. * interval)
  10. * 3. Delayed work that is executed at a later time when it is guaranteed that
  11. * all previous work has actually finished (only for multithreading)
  12. */
  13. #define MAXTIMEOUT 5000 // max timeout in microsec until the next main loop iteration
  14. #define BATCHSIZE 20 // max size of worklists that are dispatched to workers
  15. static void processWork(UA_Server *server, UA_WorkItem *work, size_t workSize) {
  16. for(size_t i = 0; i < workSize; i++) {
  17. UA_WorkItem *item = &work[i];
  18. switch(item->type) {
  19. case UA_WORKITEMTYPE_BINARYMESSAGE:
  20. UA_Server_processBinaryMessage(server, item->work.binaryMessage.connection,
  21. &item->work.binaryMessage.message);
  22. item->work.binaryMessage.connection->releaseBuffer(item->work.binaryMessage.connection,
  23. &item->work.binaryMessage.message);
  24. break;
  25. case UA_WORKITEMTYPE_CLOSECONNECTION:
  26. UA_Connection_detachSecureChannel(item->work.closeConnection);
  27. item->work.closeConnection->close(item->work.closeConnection);
  28. break;
  29. case UA_WORKITEMTYPE_METHODCALL:
  30. case UA_WORKITEMTYPE_DELAYEDMETHODCALL:
  31. item->work.methodCall.method(server, item->work.methodCall.data);
  32. break;
  33. default:
  34. break;
  35. }
  36. }
  37. }
  38. /*******************************/
  39. /* Worker Threads and Dispatch */
  40. /*******************************/
  41. #ifdef UA_MULTITHREADING
  42. /** Entry in the dipatch queue */
  43. struct workListNode {
  44. struct cds_wfcq_node node; // node for the queue
  45. UA_UInt32 workSize;
  46. UA_WorkItem *work;
  47. };
  48. /** Dispatch work to workers. Slices the work up if it contains more than
  49. BATCHSIZE items. The work array is freed by the worker threads. */
  50. static void dispatchWork(UA_Server *server, UA_Int32 workSize, UA_WorkItem *work) {
  51. UA_Int32 startIndex = workSize; // start at the end
  52. while(workSize > 0) {
  53. UA_Int32 size = BATCHSIZE;
  54. if(size > workSize)
  55. size = workSize;
  56. startIndex = startIndex - size;
  57. struct workListNode *wln = UA_malloc(sizeof(struct workListNode));
  58. if(startIndex > 0) {
  59. UA_WorkItem *workSlice = UA_malloc(size * sizeof(UA_WorkItem));
  60. UA_memcpy(workSlice, &work[startIndex], size * sizeof(UA_WorkItem));
  61. *wln = (struct workListNode){.workSize = size, .work = workSlice};
  62. }
  63. else {
  64. // do not alloc, but forward the original array
  65. *wln = (struct workListNode){.workSize = size, .work = work};
  66. }
  67. cds_wfcq_node_init(&wln->node);
  68. cds_wfcq_enqueue(&server->dispatchQueue_head, &server->dispatchQueue_tail, &wln->node);
  69. workSize -= size;
  70. }
  71. }
  72. // throwaway struct to bring data into the worker threads
  73. struct workerStartData {
  74. UA_Server *server;
  75. UA_UInt32 **workerCounter;
  76. };
  77. /** Waits until work arrives in the dispatch queue (restart after 10ms) and
  78. processes it. */
  79. static void * workerLoop(struct workerStartData *startInfo) {
  80. rcu_register_thread();
  81. UA_UInt32 *c = UA_malloc(sizeof(UA_UInt32));
  82. uatomic_set(c, 0);
  83. *startInfo->workerCounter = c;
  84. UA_Server *server = startInfo->server;
  85. UA_free(startInfo);
  86. pthread_mutex_t mutex; // required for the condition variable
  87. pthread_mutex_init(&mutex,0);
  88. pthread_mutex_lock(&mutex);
  89. struct timespec to;
  90. while(*server->running) {
  91. struct workListNode *wln = (struct workListNode*)
  92. cds_wfcq_dequeue_blocking(&server->dispatchQueue_head, &server->dispatchQueue_tail);
  93. if(wln) {
  94. processWork(server, wln->work, wln->workSize);
  95. UA_free(wln->work);
  96. UA_free(wln);
  97. } else {
  98. clock_gettime(CLOCK_REALTIME, &to);
  99. to.tv_sec += 2;
  100. pthread_cond_timedwait(&server->dispatchQueue_condition, &mutex, &to);
  101. }
  102. uatomic_inc(c); // increase the workerCounter;
  103. }
  104. pthread_mutex_unlock(&mutex);
  105. pthread_mutex_destroy(&mutex);
  106. rcu_unregister_thread();
  107. return UA_NULL;
  108. }
  109. static void emptyDispatchQueue(UA_Server *server) {
  110. while(!cds_wfcq_empty(&server->dispatchQueue_head, &server->dispatchQueue_tail)) {
  111. struct workListNode *wln = (struct workListNode*)
  112. cds_wfcq_dequeue_blocking(&server->dispatchQueue_head, &server->dispatchQueue_tail);
  113. processWork(server, wln->work, wln->workSize);
  114. UA_free(wln->work);
  115. UA_free(wln);
  116. }
  117. }
  118. #endif
  119. /**************/
  120. /* Timed Work */
  121. /**************/
  122. struct TimedWork {
  123. LIST_ENTRY(TimedWork) pointers;
  124. UA_DateTime nextTime;
  125. UA_UInt32 interval; ///> in ms resolution, 0 means no repetition
  126. size_t workSize;
  127. UA_WorkItem *work;
  128. UA_Guid workIds[];
  129. };
  130. /* The item is copied and not freed by this function. The interval is in 100ns (as UA_DateTime) */
  131. static UA_StatusCode addTimedWork(UA_Server *server, const UA_WorkItem *item, UA_DateTime firstTime,
  132. UA_UInt32 interval, UA_Guid *resultWorkGuid) {
  133. struct TimedWork *lastTw = UA_NULL, *matchingTw = UA_NULL;
  134. /* search for matching entry */
  135. if(interval == 0) {
  136. LIST_FOREACH(lastTw, &server->timedWork, pointers) {
  137. if(lastTw->nextTime == firstTime) {
  138. if(lastTw->nextTime == firstTime)
  139. matchingTw = lastTw;
  140. break;
  141. }
  142. }
  143. } else {
  144. LIST_FOREACH(matchingTw, &server->timedWork, pointers) {
  145. if(interval == matchingTw->interval)
  146. break;
  147. }
  148. }
  149. struct TimedWork *newWork;
  150. if(matchingTw) {
  151. /* append to matching entry */
  152. newWork = UA_realloc(matchingTw, sizeof(struct TimedWork) + sizeof(UA_Guid)*(matchingTw->workSize + 1));
  153. if(!newWork)
  154. return UA_STATUSCODE_BADOUTOFMEMORY;
  155. if(newWork->pointers.le_next)
  156. newWork->pointers.le_next->pointers.le_prev = &newWork->pointers.le_next;
  157. if(newWork->pointers.le_prev)
  158. *newWork->pointers.le_prev = newWork;
  159. UA_WorkItem *newItems = UA_realloc(newWork->work, sizeof(UA_WorkItem)*(matchingTw->workSize + 1));
  160. if(!newItems)
  161. return UA_STATUSCODE_BADOUTOFMEMORY;
  162. newWork->work = newItems;
  163. } else {
  164. /* create a new entry */
  165. newWork = UA_malloc(sizeof(struct TimedWork) + sizeof(UA_Guid));
  166. if(!newWork)
  167. return UA_STATUSCODE_BADOUTOFMEMORY;
  168. newWork->work = UA_malloc(sizeof(UA_WorkItem));
  169. if(!newWork->work) {
  170. UA_free(newWork);
  171. return UA_STATUSCODE_BADOUTOFMEMORY;
  172. }
  173. newWork->workSize = 0;
  174. newWork->nextTime = firstTime;
  175. newWork->interval = interval;
  176. if(lastTw)
  177. LIST_INSERT_AFTER(lastTw, newWork, pointers);
  178. else
  179. LIST_INSERT_HEAD(&server->timedWork, newWork, pointers);
  180. }
  181. if(resultWorkGuid) {
  182. newWork->workIds[newWork->workSize] = UA_Guid_random(&server->random_seed);
  183. *resultWorkGuid = newWork->workIds[matchingTw->workSize];
  184. }
  185. newWork->work[newWork->workSize] = *item;
  186. newWork->workSize++;
  187. return UA_STATUSCODE_GOOD;
  188. }
  189. // Currently, these functions need to get the server mutex, but should be sufficiently fast
  190. UA_StatusCode UA_Server_addTimedWorkItem(UA_Server *server, const UA_WorkItem *work, UA_DateTime executionTime,
  191. UA_Guid *resultWorkGuid) {
  192. return addTimedWork(server, work, executionTime, 0, resultWorkGuid);
  193. }
  194. UA_StatusCode UA_Server_addRepeatedWorkItem(UA_Server *server, const UA_WorkItem *work, UA_UInt32 interval,
  195. UA_Guid *resultWorkGuid) {
  196. return addTimedWork(server, work, UA_DateTime_now() + interval * 10000, interval * 10000, resultWorkGuid);
  197. }
  198. /** Dispatches timed work, returns the timeout until the next timed work in ms */
  199. static UA_UInt16 processTimedWork(UA_Server *server) {
  200. UA_DateTime current = UA_DateTime_now();
  201. struct TimedWork *next = LIST_FIRST(&server->timedWork);
  202. struct TimedWork *tw = UA_NULL;
  203. while(next) {
  204. tw = next;
  205. if(tw->nextTime > current)
  206. break;
  207. next = LIST_NEXT(tw, pointers);
  208. #ifdef UA_MULTITHREADING
  209. if(tw->interval > 0) {
  210. // copy the entry and insert at the new location
  211. UA_WorkItem *workCopy = UA_malloc(sizeof(UA_WorkItem) * tw->workSize);
  212. UA_memcpy(workCopy, tw->work, sizeof(UA_WorkItem) * tw->workSize);
  213. dispatchWork(server, tw->workSize, workCopy); // frees the work pointer
  214. tw->nextTime += tw->interval;
  215. struct TimedWork *prevTw = tw; // after which tw do we insert?
  216. while(UA_TRUE) {
  217. struct TimedWork *n = LIST_NEXT(prevTw, pointers);
  218. if(!n || n->nextTime > tw->nextTime)
  219. break;
  220. prevTw = n;
  221. }
  222. if(prevTw != tw) {
  223. LIST_REMOVE(tw, pointers);
  224. LIST_INSERT_AFTER(prevTw, tw, pointers);
  225. }
  226. } else {
  227. dispatchWork(server, tw->workSize, tw->work); // frees the work pointer
  228. LIST_REMOVE(tw, pointers);
  229. UA_free(tw);
  230. }
  231. #else
  232. // 1) Process the work since it is past its due date
  233. processWork(server, tw->work, tw->workSize); // does not free the work ptr
  234. // 2) If the work is repeated, add it back into the list. Otherwise remove it.
  235. if(tw->interval > 0) {
  236. tw->nextTime += tw->interval;
  237. if(tw->nextTime < current)
  238. tw->nextTime = current;
  239. struct TimedWork *prevTw = tw;
  240. while(UA_TRUE) {
  241. struct TimedWork *n = LIST_NEXT(prevTw, pointers);
  242. if(!n || n->nextTime > tw->nextTime)
  243. break;
  244. prevTw = n;
  245. }
  246. if(prevTw != tw) {
  247. LIST_REMOVE(tw, pointers);
  248. LIST_INSERT_AFTER(prevTw, tw, pointers);
  249. }
  250. } else {
  251. LIST_REMOVE(tw, pointers);
  252. UA_free(tw->work);
  253. UA_free(tw);
  254. }
  255. #endif
  256. }
  257. // check if the next timed work is sooner than the usual timeout
  258. struct TimedWork *first = LIST_FIRST(&server->timedWork);
  259. UA_UInt16 timeout = MAXTIMEOUT;
  260. if(first) {
  261. timeout = (first->nextTime - current)/10;
  262. if(timeout > MAXTIMEOUT)
  263. return MAXTIMEOUT;
  264. }
  265. return timeout;
  266. }
  267. void UA_Server_deleteTimedWork(UA_Server *server) {
  268. struct TimedWork *current;
  269. struct TimedWork *next = LIST_FIRST(&server->timedWork);
  270. while(next) {
  271. current = next;
  272. next = LIST_NEXT(current, pointers);
  273. LIST_REMOVE(current, pointers);
  274. UA_free(current->work);
  275. UA_free(current);
  276. }
  277. }
  278. /****************/
  279. /* Delayed Work */
  280. /****************/
  281. #ifdef UA_MULTITHREADING
  282. #define DELAYEDWORKSIZE 100 // Collect delayed work until we have DELAYEDWORKSIZE items
  283. struct DelayedWork {
  284. struct DelayedWork *next;
  285. UA_UInt32 *workerCounters; // initially UA_NULL until a workitem gets the counters
  286. UA_UInt32 workItemsCount; // the size of the array is DELAYEDWORKSIZE, the count may be less
  287. UA_WorkItem *workItems; // when it runs full, a new delayedWork entry is created
  288. };
  289. // Dispatched as a methodcall-WorkItem when the delayedwork is added
  290. static void getCounters(UA_Server *server, struct DelayedWork *delayed) {
  291. UA_UInt32 *counters = UA_malloc(server->nThreads * sizeof(UA_UInt32));
  292. for(UA_UInt16 i = 0;i<server->nThreads;i++)
  293. counters[i] = *server->workerCounters[i];
  294. delayed->workerCounters = counters;
  295. }
  296. // Call from the main thread only. This is the only function that modifies
  297. // server->delayedWork. processDelayedWorkQueue modifies the "next" (after the
  298. // head).
  299. static void addDelayedWork(UA_Server *server, UA_WorkItem work) {
  300. struct DelayedWork *dw = server->delayedWork;
  301. if(!dw || dw->workItemsCount >= DELAYEDWORKSIZE) {
  302. struct DelayedWork *newwork = UA_malloc(sizeof(struct DelayedWork));
  303. newwork->workItems = UA_malloc(sizeof(UA_WorkItem)*DELAYEDWORKSIZE);
  304. newwork->workItemsCount = 0;
  305. newwork->workerCounters = UA_NULL;
  306. newwork->next = server->delayedWork;
  307. // dispatch a method that sets the counter
  308. if(dw && dw->workItemsCount >= DELAYEDWORKSIZE) {
  309. UA_WorkItem *setCounter = UA_malloc(sizeof(UA_WorkItem));
  310. *setCounter = (UA_WorkItem)
  311. {.type = UA_WORKITEMTYPE_METHODCALL,
  312. .work.methodCall = {.method = (void (*)(UA_Server*, void*))getCounters, .data = dw}};
  313. dispatchWork(server, 1, setCounter);
  314. }
  315. server->delayedWork = newwork;
  316. dw = newwork;
  317. }
  318. dw->workItems[dw->workItemsCount] = work;
  319. dw->workItemsCount++;
  320. }
  321. static void processDelayedWork(UA_Server *server) {
  322. struct DelayedWork *dw = server->delayedWork;
  323. while(dw) {
  324. processWork(server, dw->workItems, dw->workItemsCount);
  325. struct DelayedWork *next = dw->next;
  326. UA_free(dw->workerCounters);
  327. UA_free(dw->workItems);
  328. UA_free(dw);
  329. dw = next;
  330. }
  331. }
  332. // Execute this every N seconds (repeated work) to execute delayed work that is ready
  333. static void dispatchDelayedWork(UA_Server *server, void *data /* not used, but needed for the signature*/) {
  334. struct DelayedWork *dw = UA_NULL;
  335. struct DelayedWork *readydw = UA_NULL;
  336. struct DelayedWork *beforedw = server->delayedWork;
  337. // start at the second...
  338. if(beforedw)
  339. dw = beforedw->next;
  340. // find the first delayedwork where the counters are set and have been moved
  341. while(dw) {
  342. if(!dw->workerCounters) {
  343. beforedw = dw;
  344. dw = dw->next;
  345. continue;
  346. }
  347. UA_Boolean countersMoved = UA_TRUE;
  348. for(UA_UInt16 i=0;i<server->nThreads;i++) {
  349. if(*server->workerCounters[i] == dw->workerCounters[i])
  350. countersMoved = UA_FALSE;
  351. break;
  352. }
  353. if(countersMoved) {
  354. readydw = uatomic_xchg(&beforedw->next, UA_NULL);
  355. break;
  356. } else {
  357. beforedw = dw;
  358. dw = dw->next;
  359. }
  360. }
  361. // we have a ready entry. all afterwards are also ready
  362. while(readydw) {
  363. dispatchWork(server, readydw->workItemsCount, readydw->workItems);
  364. beforedw = readydw;
  365. readydw = readydw->next;
  366. UA_free(beforedw->workerCounters);
  367. UA_free(beforedw);
  368. }
  369. }
  370. #endif
  371. /********************/
  372. /* Main Server Loop */
  373. /********************/
  374. UA_StatusCode UA_Server_run(UA_Server *server, UA_UInt16 nThreads, UA_Boolean *running) {
  375. #ifdef UA_MULTITHREADING
  376. // 1) Prepare the threads
  377. server->running = running; // the threads need to access the variable
  378. server->nThreads = nThreads;
  379. pthread_cond_init(&server->dispatchQueue_condition, 0);
  380. pthread_t *thr = UA_malloc(nThreads * sizeof(pthread_t));
  381. server->workerCounters = UA_malloc(nThreads * sizeof(UA_UInt32 *));
  382. for(UA_UInt32 i=0;i<nThreads;i++) {
  383. struct workerStartData *startData = UA_malloc(sizeof(struct workerStartData));
  384. startData->server = server;
  385. startData->workerCounter = &server->workerCounters[i];
  386. pthread_create(&thr[i], UA_NULL, (void* (*)(void*))workerLoop, startData);
  387. }
  388. UA_WorkItem processDelayed = {.type = UA_WORKITEMTYPE_METHODCALL,
  389. .work.methodCall = {.method = dispatchDelayedWork,
  390. .data = UA_NULL} };
  391. UA_Server_addRepeatedWorkItem(server, &processDelayed, 10000000, UA_NULL);
  392. #endif
  393. // 2) Start the networklayers
  394. for(size_t i = 0; i <server->networkLayersSize; i++)
  395. server->networkLayers[i].start(server->networkLayers[i].nlHandle, &server->logger);
  396. // 3) The loop
  397. while(1) {
  398. // 3.1) Process timed work
  399. UA_UInt16 timeout = processTimedWork(server);
  400. // 3.2) Get work from the networklayer and dispatch it
  401. for(size_t i = 0; i < server->networkLayersSize; i++) {
  402. UA_ServerNetworkLayer *nl = &server->networkLayers[i];
  403. UA_WorkItem *work;
  404. UA_Int32 workSize;
  405. if(*running) {
  406. if(i == server->networkLayersSize-1)
  407. workSize = nl->getWork(nl->nlHandle, &work, timeout);
  408. else
  409. workSize = nl->getWork(nl->nlHandle, &work, 0);
  410. } else {
  411. workSize = server->networkLayers[i].stop(nl->nlHandle, &work);
  412. }
  413. #ifdef UA_MULTITHREADING
  414. // Filter out delayed work
  415. for(UA_Int32 k=0;k<workSize;k++) {
  416. if(work[k].type != UA_WORKITEMTYPE_DELAYEDMETHODCALL)
  417. continue;
  418. addDelayedWork(server, work[k]);
  419. work[k].type = UA_WORKITEMTYPE_NOTHING;
  420. }
  421. dispatchWork(server, workSize, work);
  422. if(workSize > 0)
  423. pthread_cond_broadcast(&server->dispatchQueue_condition);
  424. #else
  425. processWork(server, work, workSize);
  426. if(workSize > 0)
  427. UA_free(work);
  428. #endif
  429. }
  430. // 3.3) Exit?
  431. if(!*running)
  432. break;
  433. }
  434. #ifdef UA_MULTITHREADING
  435. // 4) Clean up: Wait until all worker threads finish, then empty the
  436. // dispatch queue, then process the remaining delayed work
  437. for(UA_UInt32 i=0;i<nThreads;i++) {
  438. pthread_join(thr[i], UA_NULL);
  439. UA_free(server->workerCounters[i]);
  440. }
  441. UA_free(server->workerCounters);
  442. UA_free(thr);
  443. emptyDispatchQueue(server);
  444. processDelayedWork(server);
  445. #endif
  446. return UA_STATUSCODE_GOOD;
  447. }