ua_timer.c 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373
  1. /* This Source Code Form is subject to the terms of the Mozilla Public
  2. * License, v. 2.0. If a copy of the MPL was not distributed with this
  3. * file, You can obtain one at http://mozilla.org/MPL/2.0/.
  4. *
  5. * Copyright 2017 (c) Fraunhofer IOSB (Author: Julius Pfrommer)
  6. * Copyright 2017 (c) Stefan Profanter, fortiss GmbH
  7. */
  8. #include "ua_util_internal.h"
  9. #include "ua_timer.h"
  10. /* Only one thread operates on the repeated jobs. This is usually the "main"
  11. * thread with the event loop. All other threads introduce changes via a
  12. * multi-producer single-consumer (MPSC) queue. The queue is based on a design
  13. * by Dmitry Vyukov.
  14. * http://www.1024cores.net/home/lock-free-algorithms/queues/intrusive-mpsc-node-based-queue
  15. *
  16. * The RepeatedCallback structure is used both in the sorted list of callbacks
  17. * and in the MPSC changes queue. For the changes queue, we differentiate
  18. * between three cases encoded in the callback pointer.
  19. *
  20. * callback > 0x01: add the new repeated callback to the sorted list
  21. * callback == 0x00: remove the callback with the same id
  22. * callback == 0x01: change the interval of the existing callback */
  23. #define REMOVE_SENTINEL 0x00
  24. #define CHANGE_SENTINEL 0x01
  25. struct UA_TimerCallbackEntry {
  26. SLIST_ENTRY(UA_TimerCallbackEntry) next; /* Next element in the list */
  27. UA_DateTime nextTime; /* The next time when the callbacks
  28. * are to be executed */
  29. UA_UInt64 interval; /* Interval in 100ns resolution */
  30. UA_UInt64 id; /* Id of the repeated callback */
  31. UA_TimerCallback callback;
  32. void *data;
  33. };
  34. void
  35. UA_Timer_init(UA_Timer *t) {
  36. SLIST_INIT(&t->repeatedCallbacks);
  37. t->changes_head = (UA_TimerCallbackEntry*)&t->changes_stub;
  38. t->changes_tail = (UA_TimerCallbackEntry*)&t->changes_stub;
  39. t->changes_stub = NULL;
  40. t->idCounter = 0;
  41. }
  42. static void
  43. enqueueChange(UA_Timer *t, UA_TimerCallbackEntry *tc) {
  44. tc->next.sle_next = NULL;
  45. UA_TimerCallbackEntry *prev = (UA_TimerCallbackEntry*)
  46. UA_atomic_xchg((void * volatile *)&t->changes_head, tc);
  47. /* Nothing can be dequeued while the producer is blocked here */
  48. prev->next.sle_next = tc; /* Once this change is visible in the consumer,
  49. * the node is dequeued in the following
  50. * iteration */
  51. }
  52. static UA_TimerCallbackEntry *
  53. dequeueChange(UA_Timer *t) {
  54. UA_TimerCallbackEntry *tail = t->changes_tail;
  55. UA_TimerCallbackEntry *next = tail->next.sle_next;
  56. if(tail == (UA_TimerCallbackEntry*)&t->changes_stub) {
  57. if(!next)
  58. return NULL;
  59. t->changes_tail = next;
  60. tail = next;
  61. next = next->next.sle_next;
  62. }
  63. if(next) {
  64. t->changes_tail = next;
  65. return tail;
  66. }
  67. UA_TimerCallbackEntry* head = t->changes_head;
  68. if(tail != head)
  69. return NULL;
  70. enqueueChange(t, (UA_TimerCallbackEntry*)&t->changes_stub);
  71. next = tail->next.sle_next;
  72. if(next) {
  73. t->changes_tail = next;
  74. return tail;
  75. }
  76. return NULL;
  77. }
  78. /* Adding repeated callbacks: Add an entry with the "nextTime" timestamp in the
  79. * future. This will be picked up in the next iteration and inserted at the
  80. * correct place. So that the next execution takes place ät "nextTime". */
  81. UA_StatusCode
  82. UA_Timer_addRepeatedCallback(UA_Timer *t, UA_TimerCallback callback,
  83. void *data, UA_UInt32 interval,
  84. UA_UInt64 *callbackId) {
  85. /* A callback method needs to be present */
  86. if(!callback)
  87. return UA_STATUSCODE_BADINTERNALERROR;
  88. /* The interval needs to be at least 5ms */
  89. if(interval < 5)
  90. return UA_STATUSCODE_BADINTERNALERROR;
  91. /* Allocate the repeated callback structure */
  92. UA_TimerCallbackEntry *tc =
  93. (UA_TimerCallbackEntry*)UA_malloc(sizeof(UA_TimerCallbackEntry));
  94. if(!tc)
  95. return UA_STATUSCODE_BADOUTOFMEMORY;
  96. /* Set the repeated callback */
  97. tc->interval = (UA_UInt64)interval * UA_DATETIME_MSEC;
  98. tc->id = ++t->idCounter;
  99. tc->callback = callback;
  100. tc->data = data;
  101. tc->nextTime = UA_DateTime_nowMonotonic() + (UA_DateTime)tc->interval;
  102. /* Set the output identifier */
  103. if(callbackId)
  104. *callbackId = tc->id;
  105. /* Enqueue the changes in the MPSC queue */
  106. enqueueChange(t, tc);
  107. return UA_STATUSCODE_GOOD;
  108. }
  109. static void
  110. addTimerCallbackEntry(UA_Timer *t, UA_TimerCallbackEntry * UA_RESTRICT tc) {
  111. /* Find the last entry before this callback */
  112. UA_TimerCallbackEntry *tmpTc, *afterTc = NULL;
  113. SLIST_FOREACH(tmpTc, &t->repeatedCallbacks, next) {
  114. if(tmpTc->nextTime >= tc->nextTime)
  115. break;
  116. /* The goal is to have many repeated callbacks with the same repetition
  117. * interval in a "block" in order to reduce linear search for re-entry
  118. * to the sorted list after processing. Allow the first execution to lie
  119. * between "nextTime - 1s" and "nextTime" if this adjustment groups
  120. * callbacks with the same repetition interval.
  121. * Callbacks of a block are added in reversed order. This design allows
  122. * the monitored items of a subscription (if created in a sequence with the
  123. * same publish/sample interval) to be executed before the subscription
  124. * publish the notifications */
  125. if(tmpTc->interval == tc->interval &&
  126. tmpTc->nextTime > (tc->nextTime - UA_DATETIME_SEC)) {
  127. tc->nextTime = tmpTc->nextTime;
  128. break;
  129. }
  130. /* tc is neither in the same interval nor supposed to be executed sooner
  131. * than tmpTc. Update afterTc to push tc further back in the timer list. */
  132. afterTc = tmpTc;
  133. }
  134. /* Add the repeated callback */
  135. if(afterTc)
  136. SLIST_INSERT_AFTER(afterTc, tc, next);
  137. else
  138. SLIST_INSERT_HEAD(&t->repeatedCallbacks, tc, next);
  139. }
  140. UA_StatusCode
  141. UA_Timer_changeRepeatedCallbackInterval(UA_Timer *t, UA_UInt64 callbackId,
  142. UA_UInt32 interval) {
  143. /* The interval needs to be at least 5ms */
  144. if(interval < 5)
  145. return UA_STATUSCODE_BADINTERNALERROR;
  146. /* Allocate the repeated callback structure */
  147. UA_TimerCallbackEntry *tc =
  148. (UA_TimerCallbackEntry*)UA_malloc(sizeof(UA_TimerCallbackEntry));
  149. if(!tc)
  150. return UA_STATUSCODE_BADOUTOFMEMORY;
  151. /* Set the repeated callback */
  152. tc->interval = (UA_UInt64)interval * UA_DATETIME_MSEC;
  153. tc->id = callbackId;
  154. tc->nextTime = UA_DateTime_nowMonotonic() + (UA_DateTime)tc->interval;
  155. tc->callback = (UA_TimerCallback)CHANGE_SENTINEL;
  156. /* Enqueue the changes in the MPSC queue */
  157. enqueueChange(t, tc);
  158. return UA_STATUSCODE_GOOD;
  159. }
  160. static void
  161. changeTimerCallbackEntryInterval(UA_Timer *t, UA_UInt64 callbackId,
  162. UA_UInt64 interval, UA_DateTime nextTime) {
  163. /* Remove from the sorted list */
  164. UA_TimerCallbackEntry *tc, *prev = NULL;
  165. SLIST_FOREACH(tc, &t->repeatedCallbacks, next) {
  166. if(callbackId == tc->id) {
  167. if(prev)
  168. SLIST_REMOVE_AFTER(prev, next);
  169. else
  170. SLIST_REMOVE_HEAD(&t->repeatedCallbacks, next);
  171. break;
  172. }
  173. prev = tc;
  174. }
  175. if(!tc)
  176. return;
  177. /* Adjust settings */
  178. tc->interval = interval;
  179. tc->nextTime = nextTime;
  180. /* Reinsert at the new position */
  181. addTimerCallbackEntry(t, tc);
  182. }
  183. /* Removing a repeated callback: Add an entry with the "nextTime" timestamp set
  184. * to UA_INT64_MAX. The next iteration picks this up and removes the repated
  185. * callback from the linked list. */
  186. UA_StatusCode
  187. UA_Timer_removeRepeatedCallback(UA_Timer *t, UA_UInt64 callbackId) {
  188. /* Allocate the repeated callback structure */
  189. UA_TimerCallbackEntry *tc =
  190. (UA_TimerCallbackEntry*)UA_malloc(sizeof(UA_TimerCallbackEntry));
  191. if(!tc)
  192. return UA_STATUSCODE_BADOUTOFMEMORY;
  193. /* Set the repeated callback with the sentinel nextTime */
  194. tc->id = callbackId;
  195. tc->callback = (UA_TimerCallback)REMOVE_SENTINEL;
  196. /* Enqueue the changes in the MPSC queue */
  197. enqueueChange(t, tc);
  198. return UA_STATUSCODE_GOOD;
  199. }
  200. static void
  201. removeRepeatedCallback(UA_Timer *t, UA_UInt64 callbackId) {
  202. UA_TimerCallbackEntry *tc, *prev = NULL;
  203. SLIST_FOREACH(tc, &t->repeatedCallbacks, next) {
  204. if(callbackId == tc->id) {
  205. if(prev)
  206. SLIST_REMOVE_AFTER(prev, next);
  207. else
  208. SLIST_REMOVE_HEAD(&t->repeatedCallbacks, next);
  209. UA_free(tc);
  210. break;
  211. }
  212. prev = tc;
  213. }
  214. }
  215. /* Process the changes that were added to the MPSC queue (by other threads) */
  216. static void
  217. processChanges(UA_Timer *t) {
  218. UA_TimerCallbackEntry *change;
  219. while((change = dequeueChange(t))) {
  220. switch((uintptr_t)change->callback) {
  221. case REMOVE_SENTINEL:
  222. removeRepeatedCallback(t, change->id);
  223. UA_free(change);
  224. break;
  225. case CHANGE_SENTINEL:
  226. changeTimerCallbackEntryInterval(t, change->id, change->interval,
  227. change->nextTime);
  228. UA_free(change);
  229. break;
  230. default:
  231. addTimerCallbackEntry(t, change);
  232. }
  233. }
  234. }
  235. UA_DateTime
  236. UA_Timer_process(UA_Timer *t, UA_DateTime nowMonotonic,
  237. UA_TimerDispatchCallback dispatchCallback,
  238. void *application) {
  239. /* Insert and remove callbacks */
  240. processChanges(t);
  241. /* Find the last callback to be executed now */
  242. UA_TimerCallbackEntry *firstAfter, *lastNow = NULL;
  243. SLIST_FOREACH(firstAfter, &t->repeatedCallbacks, next) {
  244. if(firstAfter->nextTime > nowMonotonic)
  245. break;
  246. lastNow = firstAfter;
  247. }
  248. /* Nothing to do */
  249. if(!lastNow) {
  250. if(firstAfter)
  251. return firstAfter->nextTime;
  252. return UA_INT64_MAX;
  253. }
  254. /* Put the callbacks that are executed now in a separate list */
  255. UA_TimerCallbackList executedNowList;
  256. executedNowList.slh_first = SLIST_FIRST(&t->repeatedCallbacks);
  257. lastNow->next.sle_next = NULL;
  258. /* Fake entry to represent the first element in the newly-sorted list */
  259. UA_TimerCallbackEntry tmp_first;
  260. tmp_first.nextTime = nowMonotonic - 1; /* never matches for last_dispatched */
  261. tmp_first.next.sle_next = firstAfter;
  262. UA_TimerCallbackEntry *last_dispatched = &tmp_first;
  263. /* Iterate over the list of callbacks to process now */
  264. UA_TimerCallbackEntry *tc;
  265. while((tc = SLIST_FIRST(&executedNowList))) {
  266. /* Remove from the list */
  267. SLIST_REMOVE_HEAD(&executedNowList, next);
  268. /* Dispatch/process callback */
  269. dispatchCallback(application, tc->callback, tc->data);
  270. /* Set the time for the next execution. Prevent an infinite loop by
  271. * forcing the next processing into the next iteration. */
  272. tc->nextTime += (UA_Int64)tc->interval;
  273. if(tc->nextTime < nowMonotonic)
  274. tc->nextTime = nowMonotonic + 1;
  275. /* Find the new position for tc to keep the list sorted */
  276. UA_TimerCallbackEntry *prev_tc;
  277. if(last_dispatched->nextTime == tc->nextTime) {
  278. /* We try to "batch" repeatedCallbacks with the same interval. This
  279. * saves a linear search when the last dispatched entry has the same
  280. * nextTime timestamp as this entry. */
  281. UA_assert(last_dispatched != &tmp_first);
  282. prev_tc = last_dispatched;
  283. } else {
  284. /* Find the position for the next execution by a linear search
  285. * starting at last_dispatched or the first element */
  286. if(last_dispatched->nextTime < tc->nextTime)
  287. prev_tc = last_dispatched;
  288. else
  289. prev_tc = &tmp_first;
  290. while(true) {
  291. UA_TimerCallbackEntry *n = SLIST_NEXT(prev_tc, next);
  292. if(!n || n->nextTime >= tc->nextTime)
  293. break;
  294. prev_tc = n;
  295. }
  296. }
  297. /* Update last_dispatched to make sure batched callbacks are added in the
  298. * same sequence as before they were executed and to save some iterations
  299. * of the linear search for callbacks to be added further back in the list. */
  300. last_dispatched = tc;
  301. /* Add entry to the new position in the sorted list */
  302. SLIST_INSERT_AFTER(prev_tc, tc, next);
  303. }
  304. /* Set the entry-point for the newly sorted list */
  305. t->repeatedCallbacks.slh_first = tmp_first.next.sle_next;
  306. /* Re-repeat processAddRemoved since one of the callbacks might have removed
  307. * or added a callback. So we return a correct timeout. */
  308. processChanges(t);
  309. /* Return timestamp of next repetition */
  310. tc = SLIST_FIRST(&t->repeatedCallbacks);
  311. if(!tc)
  312. return UA_INT64_MAX; /* Main-loop has a max timeout / will continue earlier */
  313. return tc->nextTime;
  314. }
  315. void
  316. UA_Timer_deleteMembers(UA_Timer *t) {
  317. /* Process changes to empty the MPSC queue */
  318. processChanges(t);
  319. /* Remove repeated callbacks */
  320. UA_TimerCallbackEntry *current;
  321. while((current = SLIST_FIRST(&t->repeatedCallbacks))) {
  322. SLIST_REMOVE_HEAD(&t->repeatedCallbacks, next);
  323. UA_free(current);
  324. }
  325. }