ua_server_worker.c 19 KB


  1. #include <stdio.h>
  2. #define __USE_POSIX
  3. #define _XOPEN_SOURCE 500
  4. #define __USE_POSIX199309
  5. #ifndef WIN32
  6. #include <sys/time.h>
  7. #endif
  8. #include <time.h>
  9. #include "ua_server_internal.h"
  10. /**
  11. * There are three types of work:
  12. *
  13. * 1. Ordinary WorkItems (that are dispatched to worker threads if
  14. * multithreading is activated)
  15. *
  16. * 2. Timed work that is executed at a precise date (with an optional repetition
  17. * interval)
  18. *
  19. * 3. Delayed work that is executed at a later time when it is guaranteed that
  20. * all previous work has actually finished (only for multithreading)
  21. */
  /* Upper bound, in microseconds, for the timeout that passes before the main
   * loop starts its next iteration. */
  #define MAXTIMEOUT 50000
  /* Largest number of work items in a worklist that is dispatched to the workers. */
  #define BATCHSIZE 20
  24. static void processWork(UA_Server *server, const UA_WorkItem *work, UA_Int32 workSize) {
  25. for(UA_Int32 i = 0;i<workSize;i++) {
  26. const UA_WorkItem *item = &work[i];
  27. switch(item->type) {
  28. case UA_WORKITEMTYPE_BINARYNETWORKMESSAGE:
  29. UA_Server_processBinaryMessage(server, item->work.binaryNetworkMessage.connection,
  30. &item->work.binaryNetworkMessage.message);
  31. UA_free(item->work.binaryNetworkMessage.message.data);
  32. break;
  33. case UA_WORKITEMTYPE_METHODCALL:
  34. case UA_WORKITEMTYPE_DELAYEDMETHODCALL:
  35. item->work.methodCall.method(server, item->work.methodCall.data);
  36. break;
  37. default:
  38. break;
  39. }
  40. }
  41. }
  42. /*******************************/
  43. /* Worker Threads and Dispatch */
  44. /*******************************/
  45. #ifdef UA_MULTITHREADING
  46. /** Entry in the dipatch queue */
  47. struct workListNode {
  48. struct cds_wfcq_node node; // node for the queue
  49. UA_UInt32 workSize;
  50. UA_WorkItem *work;
  51. };
  52. /** Dispatch work to workers. Slices the work up if it contains more than
  53. BATCHSIZE items. The work array is freed by the worker threads. */
  54. static void dispatchWork(UA_Server *server, UA_Int32 workSize, UA_WorkItem *work) {
  55. UA_Int32 startIndex = workSize; // start at the end
  56. while(workSize > 0) {
  57. UA_Int32 size = BATCHSIZE;
  58. if(size > workSize)
  59. size = workSize;
  60. startIndex = startIndex - size;
  61. struct workListNode *wln = UA_malloc(sizeof(struct workListNode));
  62. if(startIndex > 0) {
  63. UA_WorkItem *workSlice = UA_malloc(size * sizeof(UA_WorkItem));
  64. UA_memcpy(workSlice, &work[startIndex], size * sizeof(UA_WorkItem));
  65. *wln = (struct workListNode){.workSize = size, .work = workSlice};
  66. }
  67. else {
  68. // do not alloc, but forward the original array
  69. *wln = (struct workListNode){.workSize = size, .work = work};
  70. }
  71. cds_wfcq_node_init(&wln->node);
  72. cds_wfcq_enqueue(&server->dispatchQueue_head, &server->dispatchQueue_tail, &wln->node);
  73. workSize -= size;
  74. }
  75. }
  76. // throwaway struct to bring data into the worker threads
  77. struct workerStartData {
  78. UA_Server *server;
  79. UA_UInt32 **workerCounter;
  80. };
  81. /** Waits until work arrives in the dispatch queue (restart after 10ms) and
  82. processes it. */
  83. static void * workerLoop(struct workerStartData *startInfo) {
  84. rcu_register_thread();
  85. UA_UInt32 *c = UA_malloc(sizeof(UA_UInt32));
  86. uatomic_set(c, 0);
  87. *startInfo->workerCounter = c;
  88. UA_Server *server = startInfo->server;
  89. UA_free(startInfo);
  90. pthread_mutex_t mutex; // required for the condition variable
  91. pthread_mutex_init(&mutex,0);
  92. pthread_mutex_lock(&mutex);
  93. struct timespec to;
  94. while(*server->running) {
  95. struct workListNode *wln = (struct workListNode*)
  96. cds_wfcq_dequeue_blocking(&server->dispatchQueue_head, &server->dispatchQueue_tail);
  97. if(wln) {
  98. processWork(server, wln->work, wln->workSize);
  99. UA_free(wln->work);
  100. UA_free(wln);
  101. } else {
  102. clock_gettime(CLOCK_REALTIME, &to);
  103. to.tv_sec += 2;
  104. pthread_cond_timedwait(&server->dispatchQueue_condition, &mutex, &to);
  105. }
  106. uatomic_inc(c); // increase the workerCounter;
  107. }
  108. pthread_mutex_unlock(&mutex);
  109. pthread_mutex_destroy(&mutex);
  110. rcu_unregister_thread();
  111. return UA_NULL;
  112. }
  113. static void emptyDispatchQueue(UA_Server *server) {
  114. while(!cds_wfcq_empty(&server->dispatchQueue_head, &server->dispatchQueue_tail)) {
  115. struct workListNode *wln = (struct workListNode*)
  116. cds_wfcq_dequeue_blocking(&server->dispatchQueue_head, &server->dispatchQueue_tail);
  117. processWork(server, wln->work, wln->workSize);
  118. UA_free(wln->work);
  119. UA_free(wln);
  120. }
  121. }
  122. #endif
  123. /**************/
  124. /* Timed Work */
  125. /**************/
  126. struct UA_TimedWork {
  127. LIST_ENTRY(UA_TimedWork) pointers;
  128. UA_UInt16 workSize;
  129. UA_WorkItem *work;
  130. UA_Guid *workIds;
  131. UA_DateTime time;
  132. UA_UInt32 repetitionInterval; // in 100ns resolution, 0 means no repetition
  133. };
  134. /* The item is copied and not freed by this function. */
  135. static UA_StatusCode addTimedWork(UA_Server *server, const UA_WorkItem *item, UA_DateTime firstTime,
  136. UA_UInt32 repetitionInterval, UA_Guid *resultWorkGuid) {
  137. UA_TimedWork *tw, *lastTw = UA_NULL;
  138. // search for matching entry
  139. LIST_FOREACH(tw, &server->timedWork, pointers) {
  140. if(tw->repetitionInterval == repetitionInterval &&
  141. (repetitionInterval > 0 || tw->time == firstTime))
  142. break; // found a matching entry
  143. if(tw->time > firstTime) {
  144. tw = UA_NULL; // not matchin entry exists
  145. lastTw = tw;
  146. break;
  147. }
  148. }
  149. if(tw) {
  150. // append to matching entry
  151. tw->workSize++;
  152. tw->work = UA_realloc(tw->work, sizeof(UA_WorkItem)*tw->workSize);
  153. tw->workIds = UA_realloc(tw->workIds, sizeof(UA_Guid)*tw->workSize);
  154. tw->work[tw->workSize-1] = *item;
  155. tw->workIds[tw->workSize-1] = UA_Guid_random(&server->random_seed);
  156. if(resultWorkGuid)
  157. *resultWorkGuid = tw->workIds[tw->workSize-1];
  158. return UA_STATUSCODE_GOOD;
  159. }
  160. // create a new entry
  161. if(!(tw = UA_malloc(sizeof(UA_TimedWork))))
  162. return UA_STATUSCODE_BADOUTOFMEMORY;
  163. tw->workSize = 1;
  164. tw->time = firstTime;
  165. tw->repetitionInterval = repetitionInterval;
  166. tw->work = UA_malloc(sizeof(UA_WorkItem));
  167. tw->work[0] = *item;
  168. tw->workIds = UA_malloc(sizeof(UA_Guid));
  169. tw->workIds[0] = UA_Guid_random(&server->random_seed);
  170. if(lastTw)
  171. LIST_INSERT_AFTER(lastTw, tw, pointers);
  172. else
  173. LIST_INSERT_HEAD(&server->timedWork, tw, pointers);
  174. if(resultWorkGuid)
  175. *resultWorkGuid = tw->workIds[0];
  176. return UA_STATUSCODE_GOOD;
  177. }
  178. // Currently, these functions need to get the server mutex, but should be sufficiently fast
  179. UA_StatusCode UA_Server_addTimedWorkItem(UA_Server *server, const UA_WorkItem *work, UA_DateTime executionTime,
  180. UA_Guid *resultWorkGuid) {
  181. return addTimedWork(server, work, executionTime, 0, resultWorkGuid);
  182. }
  183. UA_StatusCode UA_Server_addRepeatedWorkItem(UA_Server *server, const UA_WorkItem *work, UA_UInt32 interval,
  184. UA_Guid *resultWorkGuid) {
  185. return addTimedWork(server, work, UA_DateTime_now() + interval, interval, resultWorkGuid);
  186. }
  187. /** Dispatches timed work, returns the timeout until the next timed work in ms */
  188. static UA_UInt16 processTimedWork(UA_Server *server) {
  189. UA_DateTime current = UA_DateTime_now();
  190. UA_TimedWork *next = LIST_FIRST(&server->timedWork);
  191. UA_TimedWork *tw = UA_NULL;
  192. while(next) {
  193. tw = next;
  194. if(tw->time > current)
  195. break;
  196. next = LIST_NEXT(tw, pointers);
  197. #ifdef UA_MULTITHREADING
  198. if(tw->repetitionInterval > 0) {
  199. // copy the entry and insert at the new location
  200. UA_WorkItem *workCopy = UA_malloc(sizeof(UA_WorkItem) * tw->workSize);
  201. UA_memcpy(workCopy, tw->work, sizeof(UA_WorkItem) * tw->workSize);
  202. dispatchWork(server, tw->workSize, workCopy); // frees the work pointer
  203. tw->time += tw->repetitionInterval;
  204. UA_TimedWork *prevTw = tw; // after which tw do we insert?
  205. while(UA_TRUE) {
  206. UA_TimedWork *n = LIST_NEXT(prevTw, pointers);
  207. if(!n || n->time > tw->time)
  208. break;
  209. prevTw = n;
  210. }
  211. if(prevTw != tw) {
  212. LIST_REMOVE(tw, pointers);
  213. LIST_INSERT_AFTER(prevTw, tw, pointers);
  214. }
  215. } else {
  216. dispatchWork(server, tw->workSize, tw->work); // frees the work pointer
  217. LIST_REMOVE(tw, pointers);
  218. UA_free(tw->workIds);
  219. UA_free(tw);
  220. }
  221. #else
  222. processWork(server,tw->work, tw->workSize); // does not free the work
  223. if(tw->repetitionInterval > 0) {
  224. tw->time += tw->repetitionInterval;
  225. UA_TimedWork *prevTw = tw;
  226. while(UA_TRUE) {
  227. UA_TimedWork *n = LIST_NEXT(prevTw, pointers);
  228. if(!n || n->time > tw->time)
  229. break;
  230. prevTw = n;
  231. }
  232. if(prevTw != tw) {
  233. LIST_REMOVE(tw, pointers);
  234. LIST_INSERT_AFTER(prevTw, tw, pointers);
  235. }
  236. } else {
  237. LIST_REMOVE(tw, pointers);
  238. UA_free(tw->work);
  239. UA_free(tw->workIds);
  240. UA_free(tw);
  241. }
  242. #endif
  243. }
  244. tw = LIST_FIRST(&server->timedWork);
  245. UA_UInt16 timeout = MAXTIMEOUT;
  246. if(tw){
  247. timeout = (tw->time - current)/10;
  248. if(timeout>MAXTIMEOUT)timeout = MAXTIMEOUT;
  249. }
  250. return timeout;
  251. }
  252. void UA_Server_deleteTimedWork(UA_Server *server) {
  253. UA_TimedWork *tw;
  254. while((tw = LIST_FIRST(&server->timedWork))) {
  255. LIST_REMOVE(tw, pointers);
  256. UA_free(tw->work);
  257. UA_free(tw->workIds);
  258. UA_free(tw);
  259. }
  260. }
  261. /****************/
  262. /* Delayed Work */
  263. /****************/
  264. #ifdef UA_MULTITHREADING
  265. #define DELAYEDWORKSIZE 100 // Collect delayed work until we have DELAYEDWORKSIZE items
  266. struct UA_DelayedWork {
  267. UA_DelayedWork *next;
  268. UA_UInt32 *workerCounters; // initially UA_NULL until a workitem gets the counters
  269. UA_UInt32 workItemsCount; // the size of the array is DELAYEDWORKSIZE, the count may be less
  270. UA_WorkItem *workItems; // when it runs full, a new delayedWork entry is created
  271. };
  272. // Dispatched as a methodcall-WorkItem when the delayedwork is added
  273. static void getCounters(UA_Server *server, UA_DelayedWork *delayed) {
  274. UA_UInt32 *counters = UA_malloc(server->nThreads * sizeof(UA_UInt32));
  275. for(UA_UInt16 i = 0;i<server->nThreads;i++)
  276. counters[i] = *server->workerCounters[i];
  277. delayed->workerCounters = counters;
  278. }
  279. // Call from the main thread only. This is the only function that modifies
  280. // server->delayedWork. processDelayedWorkQueue modifies the "next" (after the
  281. // head).
  282. static void addDelayedWork(UA_Server *server, UA_WorkItem work) {
  283. UA_DelayedWork *dw = server->delayedWork;
  284. if(!dw || dw->workItemsCount >= DELAYEDWORKSIZE) {
  285. UA_DelayedWork *newwork = UA_malloc(sizeof(UA_DelayedWork));
  286. newwork->workItems = UA_malloc(sizeof(UA_WorkItem)*DELAYEDWORKSIZE);
  287. newwork->workItemsCount = 0;
  288. newwork->workerCounters = UA_NULL;
  289. newwork->next = server->delayedWork;
  290. // dispatch a method that sets the counter
  291. if(dw && dw->workItemsCount >= DELAYEDWORKSIZE) {
  292. UA_WorkItem *setCounter = UA_malloc(sizeof(UA_WorkItem));
  293. *setCounter = (UA_WorkItem)
  294. {.type = UA_WORKITEMTYPE_METHODCALL,
  295. .work.methodCall = {.method = (void (*)(UA_Server*, void*))getCounters, .data = dw}};
  296. dispatchWork(server, 1, setCounter);
  297. }
  298. server->delayedWork = newwork;
  299. dw = newwork;
  300. }
  301. dw->workItems[dw->workItemsCount] = work;
  302. dw->workItemsCount++;
  303. }
  304. static void processDelayedWork(UA_Server *server) {
  305. UA_DelayedWork *dw = server->delayedWork;
  306. while(dw) {
  307. processWork(server, dw->workItems, dw->workItemsCount);
  308. UA_DelayedWork *next = dw->next;
  309. UA_free(dw->workerCounters);
  310. UA_free(dw->workItems);
  311. UA_free(dw);
  312. dw = next;
  313. }
  314. }
  315. // Execute this every N seconds (repeated work) to execute delayed work that is ready
  316. static void dispatchDelayedWork(UA_Server *server, void *data /* not used, but needed for the signature*/) {
  317. UA_DelayedWork *dw = UA_NULL;
  318. UA_DelayedWork *readydw = UA_NULL;
  319. UA_DelayedWork *beforedw = server->delayedWork;
  320. // start at the second...
  321. if(beforedw)
  322. dw = beforedw->next;
  323. // find the first delayedwork where the counters are set and have been moved
  324. while(dw) {
  325. if(!dw->workerCounters) {
  326. beforedw = dw;
  327. dw = dw->next;
  328. continue;
  329. }
  330. UA_Boolean countersMoved = UA_TRUE;
  331. for(UA_UInt16 i=0;i<server->nThreads;i++) {
  332. if(*server->workerCounters[i] == dw->workerCounters[i])
  333. countersMoved = UA_FALSE;
  334. break;
  335. }
  336. if(countersMoved) {
  337. readydw = uatomic_xchg(&beforedw->next, UA_NULL);
  338. break;
  339. } else {
  340. beforedw = dw;
  341. dw = dw->next;
  342. }
  343. }
  344. // we have a ready entry. all afterwards are also ready
  345. while(readydw) {
  346. dispatchWork(server, readydw->workItemsCount, readydw->workItems);
  347. beforedw = readydw;
  348. readydw = readydw->next;
  349. UA_free(beforedw->workerCounters);
  350. UA_free(beforedw);
  351. }
  352. }
  353. #endif
  354. /********************/
  355. /* Main Server Loop */
  356. /********************/
  357. UA_StatusCode UA_Server_run(UA_Server *server, UA_UInt16 nThreads, UA_Boolean *running) {
  358. #ifdef UA_MULTITHREADING
  359. // 1) Prepare the threads
  360. server->running = running; // the threads need to access the variable
  361. server->nThreads = nThreads;
  362. pthread_cond_init(&server->dispatchQueue_condition, 0);
  363. pthread_t *thr = UA_malloc(nThreads * sizeof(pthread_t));
  364. server->workerCounters = UA_malloc(nThreads * sizeof(UA_UInt32 *));
  365. for(UA_UInt32 i=0;i<nThreads;i++) {
  366. struct workerStartData *startData = UA_malloc(sizeof(struct workerStartData));
  367. startData->server = server;
  368. startData->workerCounter = &server->workerCounters[i];
  369. pthread_create(&thr[i], UA_NULL, (void* (*)(void*))workerLoop, startData);
  370. }
  371. UA_WorkItem processDelayed = {.type = UA_WORKITEMTYPE_METHODCALL,
  372. .work.methodCall = {.method = dispatchDelayedWork,
  373. .data = UA_NULL} };
  374. UA_Server_addRepeatedWorkItem(server, &processDelayed, 10000000, UA_NULL);
  375. #endif
  376. // 2a) Start the networklayers
  377. for(UA_Int32 i=0;i<server->nlsSize;i++)
  378. server->nls[i].start(server->nls[i].nlHandle, &server->logger);
  379. // 2b) Init server's meta-information
  380. //fill startTime
  381. server->startTime = UA_DateTime_now();
  382. //fill build date
  383. {
  384. static struct tm ct;
  385. ct.tm_year = (__DATE__[7] - '0') * 1000 + (__DATE__[8] - '0') * 100 + (__DATE__[9] - '0') * 10 + (__DATE__[10] - '0')- 1900;
  386. if (0) ;
  387. else if ((__DATE__[0]=='J') && (__DATE__[1]=='a') && (__DATE__[2]=='n')) ct.tm_mon = 1-1;
  388. else if ((__DATE__[0]=='F') && (__DATE__[1]=='e') && (__DATE__[2]=='b')) ct.tm_mon = 2-1;
  389. else if ((__DATE__[0]=='M') && (__DATE__[1]=='a') && (__DATE__[2]=='r')) ct.tm_mon = 3-1;
  390. else if ((__DATE__[0]=='A') && (__DATE__[1]=='p') && (__DATE__[2]=='r')) ct.tm_mon = 4-1;
  391. else if ((__DATE__[0]=='M') && (__DATE__[1]=='a') && (__DATE__[2]=='y')) ct.tm_mon = 5-1;
  392. else if ((__DATE__[0]=='J') && (__DATE__[1]=='u') && (__DATE__[2]=='n')) ct.tm_mon = 6-1;
  393. else if ((__DATE__[0]=='J') && (__DATE__[1]=='u') && (__DATE__[2]=='l')) ct.tm_mon = 7-1;
  394. else if ((__DATE__[0]=='A') && (__DATE__[1]=='u') && (__DATE__[2]=='g')) ct.tm_mon = 8-1;
  395. else if ((__DATE__[0]=='S') && (__DATE__[1]=='e') && (__DATE__[2]=='p')) ct.tm_mon = 9-1;
  396. else if ((__DATE__[0]=='O') && (__DATE__[1]=='c') && (__DATE__[2]=='t')) ct.tm_mon = 10-1;
  397. else if ((__DATE__[0]=='N') && (__DATE__[1]=='o') && (__DATE__[2]=='v')) ct.tm_mon = 11-1;
  398. else if ((__DATE__[0]=='D') && (__DATE__[1]=='e') && (__DATE__[2]=='c')) ct.tm_mon = 12-1;
  399. // special case to handle __DATE__ not inserting leading zero on day of month
  400. // if Day of month is less than 10 - it inserts a blank character
  401. // this results in a negative number for tm_mday
  402. if(__DATE__[4] == ' ')
  403. {
  404. ct.tm_mday = __DATE__[5]-'0';
  405. }
  406. else
  407. {
  408. ct.tm_mday = (__DATE__[4]-'0')*10 + (__DATE__[5]-'0');
  409. }
  410. ct.tm_hour = ((__TIME__[0] - '0') * 10 + __TIME__[1] - '0');
  411. ct.tm_min = ((__TIME__[3] - '0') * 10 + __TIME__[4] - '0');
  412. ct.tm_sec = ((__TIME__[6] - '0') * 10 + __TIME__[7] - '0');
  413. ct.tm_isdst = -1; // information is not available.
  414. //FIXME: next 3 lines are copy-pasted from ua_types.c
  415. #define UNIX_EPOCH_BIAS_SEC 11644473600LL // Number of seconds from 1 Jan. 1601 00:00 to 1 Jan 1970 00:00 UTC
  416. #define HUNDRED_NANOSEC_PER_USEC 10LL
  417. #define HUNDRED_NANOSEC_PER_SEC (HUNDRED_NANOSEC_PER_USEC * 1000000LL)
  418. server->buildDate = (mktime(&ct) + UNIX_EPOCH_BIAS_SEC) * HUNDRED_NANOSEC_PER_SEC;
  419. }
  420. //3) The loop
  421. while(1) {
  422. // 3.1) Process timed work
  423. UA_UInt16 timeout = processTimedWork(server);
  424. // 3.2) Get work from the networklayer and dispatch it
  425. for(UA_Int32 i=0;i<server->nlsSize;i++) {
  426. UA_ServerNetworkLayer *nl = &server->nls[i];
  427. UA_WorkItem *work;
  428. UA_Int32 workSize;
  429. if(*running) {
  430. if(i == server->nlsSize-1)
  431. workSize = nl->getWork(nl->nlHandle, &work, timeout);
  432. else
  433. workSize = nl->getWork(nl->nlHandle, &work, 0);
  434. } else {
  435. workSize = server->nls[i].stop(nl->nlHandle, &work);
  436. }
  437. #ifdef UA_MULTITHREADING
  438. // Filter out delayed work
  439. for(UA_Int32 k=0;k<workSize;k++) {
  440. if(work[k].type != UA_WORKITEMTYPE_DELAYEDMETHODCALL)
  441. continue;
  442. addDelayedWork(server, work[k]);
  443. work[k].type = UA_WORKITEMTYPE_NOTHING;
  444. }
  445. dispatchWork(server, workSize, work);
  446. if(workSize > 0)
  447. pthread_cond_broadcast(&server->dispatchQueue_condition);
  448. #else
  449. processWork(server, work, workSize);
  450. UA_free(work);
  451. #endif
  452. }
  453. // 3.3) Exit?
  454. if(!*running)
  455. break;
  456. }
  457. #ifdef UA_MULTITHREADING
  458. // 4) Clean up: Wait until all worker threads finish, then empty the
  459. // dispatch queue, then process the remaining delayed work
  460. for(UA_UInt32 i=0;i<nThreads;i++) {
  461. pthread_join(thr[i], UA_NULL);
  462. UA_free(server->workerCounters[i]);
  463. }
  464. UA_free(server->workerCounters);
  465. UA_free(thr);
  466. emptyDispatchQueue(server);
  467. processDelayedWork(server);
  468. #endif
  469. return UA_STATUSCODE_GOOD;
  470. }