ua_timer.c 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362
  1. /* This Source Code Form is subject to the terms of the Mozilla Public
  2. * License, v. 2.0. If a copy of the MPL was not distributed with this
  3. * file, You can obtain one at http://mozilla.org/MPL/2.0/.
  4. *
  5. * Copyright 2017 (c) Julius Pfrommer, Fraunhofer IOSB
  6. * Copyright 2017 (c) Stefan Profanter, fortiss GmbH
  7. */
  8. #include "ua_util.h"
  9. #include "ua_timer.h"
  10. /* Only one thread operates on the repeated jobs. This is usually the "main"
  11. * thread with the event loop. All other threads introduce changes via a
  12. * multi-producer single-consumer (MPSC) queue. The queue is based on a design
  13. * by Dmitry Vyukov.
  14. * http://www.1024cores.net/home/lock-free-algorithms/queues/intrusive-mpsc-node-based-queue
  15. *
  16. * The RepeatedCallback structure is used both in the sorted list of callbacks
  17. * and in the MPSC changes queue. For the changes queue, we differentiate
  18. * between three cases encoded in the callback pointer.
  19. *
  20. * callback > 0x01: add the new repeated callback to the sorted list
  21. * callback == 0x00: remove the callback with the same id
  22. * callback == 0x01: change the interval of the existing callback */
  23. #define REMOVE_SENTINEL 0x00
  24. #define CHANGE_SENTINEL 0x01
  25. struct UA_TimerCallbackEntry {
  26. SLIST_ENTRY(UA_TimerCallbackEntry) next; /* Next element in the list */
  27. UA_DateTime nextTime; /* The next time when the callbacks
  28. * are to be executed */
  29. UA_UInt64 interval; /* Interval in 100ns resolution */
  30. UA_UInt64 id; /* Id of the repeated callback */
  31. UA_TimerCallback callback;
  32. void *data;
  33. };
  34. void
  35. UA_Timer_init(UA_Timer *t) {
  36. SLIST_INIT(&t->repeatedCallbacks);
  37. t->changes_head = (UA_TimerCallbackEntry*)&t->changes_stub;
  38. t->changes_tail = (UA_TimerCallbackEntry*)&t->changes_stub;
  39. t->changes_stub = NULL;
  40. t->idCounter = 0;
  41. }
  42. static void
  43. enqueueChange(UA_Timer *t, UA_TimerCallbackEntry *tc) {
  44. tc->next.sle_next = NULL;
  45. UA_TimerCallbackEntry *prev = (UA_TimerCallbackEntry*)
  46. UA_atomic_xchg((void * volatile *)&t->changes_head, tc);
  47. /* Nothing can be dequeued while the producer is blocked here */
  48. prev->next.sle_next = tc; /* Once this change is visible in the consumer,
  49. * the node is dequeued in the following
  50. * iteration */
  51. }
  52. static UA_TimerCallbackEntry *
  53. dequeueChange(UA_Timer *t) {
  54. UA_TimerCallbackEntry *tail = t->changes_tail;
  55. UA_TimerCallbackEntry *next = tail->next.sle_next;
  56. if(tail == (UA_TimerCallbackEntry*)&t->changes_stub) {
  57. if(!next)
  58. return NULL;
  59. t->changes_tail = next;
  60. tail = next;
  61. next = next->next.sle_next;
  62. }
  63. if(next) {
  64. t->changes_tail = next;
  65. return tail;
  66. }
  67. UA_TimerCallbackEntry* head = t->changes_head;
  68. if(tail != head)
  69. return NULL;
  70. enqueueChange(t, (UA_TimerCallbackEntry*)&t->changes_stub);
  71. next = tail->next.sle_next;
  72. if(next) {
  73. t->changes_tail = next;
  74. return tail;
  75. }
  76. return NULL;
  77. }
  78. /* Adding repeated callbacks: Add an entry with the "nextTime" timestamp in the
  79. * future. This will be picked up in the next iteration and inserted at the
  80. * correct place. So that the next execution takes place ät "nextTime". */
  81. UA_StatusCode
  82. UA_Timer_addRepeatedCallback(UA_Timer *t, UA_TimerCallback callback,
  83. void *data, UA_UInt32 interval,
  84. UA_UInt64 *callbackId) {
  85. /* A callback method needs to be present */
  86. if(!callback)
  87. return UA_STATUSCODE_BADINTERNALERROR;
  88. /* The interval needs to be at least 5ms */
  89. if(interval < 5)
  90. return UA_STATUSCODE_BADINTERNALERROR;
  91. /* Allocate the repeated callback structure */
  92. UA_TimerCallbackEntry *tc =
  93. (UA_TimerCallbackEntry*)UA_malloc(sizeof(UA_TimerCallbackEntry));
  94. if(!tc)
  95. return UA_STATUSCODE_BADOUTOFMEMORY;
  96. /* Set the repeated callback */
  97. tc->interval = (UA_UInt64)interval * UA_DATETIME_MSEC;
  98. tc->id = ++t->idCounter;
  99. tc->callback = callback;
  100. tc->data = data;
  101. tc->nextTime = UA_DateTime_nowMonotonic() + (UA_DateTime)tc->interval;
  102. /* Set the output identifier */
  103. if(callbackId)
  104. *callbackId = tc->id;
  105. /* Enqueue the changes in the MPSC queue */
  106. enqueueChange(t, tc);
  107. return UA_STATUSCODE_GOOD;
  108. }
  109. static void
  110. addTimerCallbackEntry(UA_Timer *t, UA_TimerCallbackEntry * UA_RESTRICT tc) {
  111. /* Find the last entry before this callback */
  112. UA_TimerCallbackEntry *tmpTc, *afterTc = NULL;
  113. SLIST_FOREACH(tmpTc, &t->repeatedCallbacks, next) {
  114. if(tmpTc->nextTime >= tc->nextTime)
  115. break;
  116. afterTc = tmpTc;
  117. /* The goal is to have many repeated callbacks with the same repetition
  118. * interval in a "block" in order to reduce linear search for re-entry
  119. * to the sorted list after processing. Allow the first execution to lie
  120. * between "nextTime - 1s" and "nextTime" if this adjustment groups
  121. * callbacks with the same repetition interval. */
  122. if(tmpTc->interval == tc->interval &&
  123. tmpTc->nextTime > (tc->nextTime - UA_DATETIME_SEC))
  124. tc->nextTime = tmpTc->nextTime;
  125. }
  126. /* Add the repeated callback */
  127. if(afterTc)
  128. SLIST_INSERT_AFTER(afterTc, tc, next);
  129. else
  130. SLIST_INSERT_HEAD(&t->repeatedCallbacks, tc, next);
  131. }
  132. UA_StatusCode
  133. UA_Timer_changeRepeatedCallbackInterval(UA_Timer *t, UA_UInt64 callbackId,
  134. UA_UInt32 interval) {
  135. /* The interval needs to be at least 5ms */
  136. if(interval < 5)
  137. return UA_STATUSCODE_BADINTERNALERROR;
  138. /* Allocate the repeated callback structure */
  139. UA_TimerCallbackEntry *tc =
  140. (UA_TimerCallbackEntry*)UA_malloc(sizeof(UA_TimerCallbackEntry));
  141. if(!tc)
  142. return UA_STATUSCODE_BADOUTOFMEMORY;
  143. /* Set the repeated callback */
  144. tc->interval = (UA_UInt64)interval * UA_DATETIME_MSEC;
  145. tc->id = callbackId;
  146. tc->nextTime = UA_DateTime_nowMonotonic() + (UA_DateTime)tc->interval;
  147. tc->callback = (UA_TimerCallback)CHANGE_SENTINEL;
  148. /* Enqueue the changes in the MPSC queue */
  149. enqueueChange(t, tc);
  150. return UA_STATUSCODE_GOOD;
  151. }
  152. static void
  153. changeTimerCallbackEntryInterval(UA_Timer *t, UA_UInt64 callbackId,
  154. UA_UInt64 interval, UA_DateTime nextTime) {
  155. /* Remove from the sorted list */
  156. UA_TimerCallbackEntry *tc, *prev = NULL;
  157. SLIST_FOREACH(tc, &t->repeatedCallbacks, next) {
  158. if(callbackId == tc->id) {
  159. if(prev)
  160. SLIST_REMOVE_AFTER(prev, next);
  161. else
  162. SLIST_REMOVE_HEAD(&t->repeatedCallbacks, next);
  163. break;
  164. }
  165. prev = tc;
  166. }
  167. if(!tc)
  168. return;
  169. /* Adjust settings */
  170. tc->interval = interval;
  171. tc->nextTime = nextTime;
  172. /* Reinsert at the new position */
  173. addTimerCallbackEntry(t, tc);
  174. }
  175. /* Removing a repeated callback: Add an entry with the "nextTime" timestamp set
  176. * to UA_INT64_MAX. The next iteration picks this up and removes the repated
  177. * callback from the linked list. */
  178. UA_StatusCode
  179. UA_Timer_removeRepeatedCallback(UA_Timer *t, UA_UInt64 callbackId) {
  180. /* Allocate the repeated callback structure */
  181. UA_TimerCallbackEntry *tc =
  182. (UA_TimerCallbackEntry*)UA_malloc(sizeof(UA_TimerCallbackEntry));
  183. if(!tc)
  184. return UA_STATUSCODE_BADOUTOFMEMORY;
  185. /* Set the repeated callback with the sentinel nextTime */
  186. tc->id = callbackId;
  187. tc->callback = (UA_TimerCallback)REMOVE_SENTINEL;
  188. /* Enqueue the changes in the MPSC queue */
  189. enqueueChange(t, tc);
  190. return UA_STATUSCODE_GOOD;
  191. }
  192. static void
  193. removeRepeatedCallback(UA_Timer *t, UA_UInt64 callbackId) {
  194. UA_TimerCallbackEntry *tc, *prev = NULL;
  195. SLIST_FOREACH(tc, &t->repeatedCallbacks, next) {
  196. if(callbackId == tc->id) {
  197. if(prev)
  198. SLIST_REMOVE_AFTER(prev, next);
  199. else
  200. SLIST_REMOVE_HEAD(&t->repeatedCallbacks, next);
  201. UA_free(tc);
  202. break;
  203. }
  204. prev = tc;
  205. }
  206. }
  207. /* Process the changes that were added to the MPSC queue (by other threads) */
  208. static void
  209. processChanges(UA_Timer *t) {
  210. UA_TimerCallbackEntry *change;
  211. while((change = dequeueChange(t))) {
  212. switch((uintptr_t)change->callback) {
  213. case REMOVE_SENTINEL:
  214. removeRepeatedCallback(t, change->id);
  215. UA_free(change);
  216. break;
  217. case CHANGE_SENTINEL:
  218. changeTimerCallbackEntryInterval(t, change->id, change->interval,
  219. change->nextTime);
  220. UA_free(change);
  221. break;
  222. default:
  223. addTimerCallbackEntry(t, change);
  224. }
  225. }
  226. }
  227. UA_DateTime
  228. UA_Timer_process(UA_Timer *t, UA_DateTime nowMonotonic,
  229. UA_TimerDispatchCallback dispatchCallback,
  230. void *application) {
  231. /* Insert and remove callbacks */
  232. processChanges(t);
  233. /* Find the last callback to be executed now */
  234. UA_TimerCallbackEntry *firstAfter, *lastNow = NULL;
  235. SLIST_FOREACH(firstAfter, &t->repeatedCallbacks, next) {
  236. if(firstAfter->nextTime > nowMonotonic)
  237. break;
  238. lastNow = firstAfter;
  239. }
  240. /* Nothing to do */
  241. if(!lastNow) {
  242. if(firstAfter)
  243. return firstAfter->nextTime;
  244. return UA_INT64_MAX;
  245. }
  246. /* Put the callbacks that are executed now in a separate list */
  247. UA_TimerCallbackList executedNowList;
  248. executedNowList.slh_first = SLIST_FIRST(&t->repeatedCallbacks);
  249. lastNow->next.sle_next = NULL;
  250. /* Fake entry to represent the first element in the newly-sorted list */
  251. UA_TimerCallbackEntry tmp_first;
  252. tmp_first.nextTime = nowMonotonic - 1; /* never matches for last_dispatched */
  253. tmp_first.next.sle_next = firstAfter;
  254. UA_TimerCallbackEntry *last_dispatched = &tmp_first;
  255. /* Iterate over the list of callbacks to process now */
  256. UA_TimerCallbackEntry *tc;
  257. while((tc = SLIST_FIRST(&executedNowList))) {
  258. /* Remove from the list */
  259. SLIST_REMOVE_HEAD(&executedNowList, next);
  260. /* Dispatch/process callback */
  261. dispatchCallback(application, tc->callback, tc->data);
  262. /* Set the time for the next execution. Prevent an infinite loop by
  263. * forcing the next processing into the next iteration. */
  264. tc->nextTime += (UA_Int64)tc->interval;
  265. if(tc->nextTime < nowMonotonic)
  266. tc->nextTime = nowMonotonic + 1;
  267. /* Find the new position for tc to keep the list sorted */
  268. UA_TimerCallbackEntry *prev_tc;
  269. if(last_dispatched->nextTime == tc->nextTime) {
  270. /* We try to "batch" repeatedCallbacks with the same interval. This
  271. * saves a linear search when the last dispatched entry has the same
  272. * nextTime timestamp as this entry. */
  273. UA_assert(last_dispatched != &tmp_first);
  274. prev_tc = last_dispatched;
  275. } else {
  276. /* Find the position for the next execution by a linear search
  277. * starting at last_dispatched or the first element */
  278. if(last_dispatched->nextTime < tc->nextTime)
  279. prev_tc = last_dispatched;
  280. else
  281. prev_tc = &tmp_first;
  282. while(true) {
  283. UA_TimerCallbackEntry *n = SLIST_NEXT(prev_tc, next);
  284. if(!n || n->nextTime >= tc->nextTime)
  285. break;
  286. prev_tc = n;
  287. }
  288. /* Update last_dispatched */
  289. last_dispatched = tc;
  290. }
  291. /* Add entry to the new position in the sorted list */
  292. SLIST_INSERT_AFTER(prev_tc, tc, next);
  293. }
  294. /* Set the entry-point for the newly sorted list */
  295. t->repeatedCallbacks.slh_first = tmp_first.next.sle_next;
  296. /* Re-repeat processAddRemoved since one of the callbacks might have removed
  297. * or added a callback. So we return a correct timeout. */
  298. processChanges(t);
  299. /* Return timestamp of next repetition */
  300. tc = SLIST_FIRST(&t->repeatedCallbacks);
  301. if(!tc)
  302. return UA_INT64_MAX; /* Main-loop has a max timeout / will continue earlier */
  303. return tc->nextTime;
  304. }
  305. void
  306. UA_Timer_deleteMembers(UA_Timer *t) {
  307. /* Process changes to empty the MPSC queue */
  308. processChanges(t);
  309. /* Remove repeated callbacks */
  310. UA_TimerCallbackEntry *current;
  311. while((current = SLIST_FIRST(&t->repeatedCallbacks))) {
  312. SLIST_REMOVE_HEAD(&t->repeatedCallbacks, next);
  313. UA_free(current);
  314. }
  315. }