ua_timer.c 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355
  1. /* This Source Code Form is subject to the terms of the Mozilla Public
  2. * License, v. 2.0. If a copy of the MPL was not distributed with this
  3. * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
  4. #include "ua_util.h"
  5. #include "ua_timer.h"
  6. /* Only one thread operates on the repeated jobs. This is usually the "main"
  7. * thread with the event loop. All other threads introduce changes via a
  8. * multi-producer single-consumer (MPSC) queue. The queue is based on a design
  9. * by Dmitry Vyukov.
  10. * http://www.1024cores.net/home/lock-free-algorithms/queues/intrusive-mpsc-node-based-queue
  11. *
  12. * The RepeatedCallback structure is used both in the sorted list of callbacks
  13. * and in the MPSC changes queue. For the changes queue, we differentiate
  14. * between three cases encoded in the callback pointer.
  15. *
  16. * callback > 0x01: add the new repeated callback to the sorted list
  17. * callback == 0x00: remove the callback with the same id
  18. * callback == 0x01: change the interval of the existing callback */
  19. #define REMOVE_SENTINEL 0x00
  20. #define CHANGE_SENTINEL 0x01
  21. struct UA_TimerCallbackEntry {
  22. SLIST_ENTRY(UA_TimerCallbackEntry) next; /* Next element in the list */
  23. UA_DateTime nextTime; /* The next time when the callbacks
  24. * are to be executed */
  25. UA_UInt64 interval; /* Interval in 100ns resolution */
  26. UA_UInt64 id; /* Id of the repeated callback */
  27. UA_TimerCallback callback;
  28. void *data;
  29. };
  30. void
  31. UA_Timer_init(UA_Timer *t) {
  32. SLIST_INIT(&t->repeatedCallbacks);
  33. t->changes_head = (UA_TimerCallbackEntry*)&t->changes_stub;
  34. t->changes_tail = (UA_TimerCallbackEntry*)&t->changes_stub;
  35. t->changes_stub = NULL;
  36. t->idCounter = 0;
  37. }
  38. static void
  39. enqueueChange(UA_Timer *t, UA_TimerCallbackEntry *tc) {
  40. tc->next.sle_next = NULL;
  41. UA_TimerCallbackEntry *prev = (UA_TimerCallbackEntry*)
  42. UA_atomic_xchg((void * volatile *)&t->changes_head, tc);
  43. /* Nothing can be dequeued while the producer is blocked here */
  44. prev->next.sle_next = tc; /* Once this change is visible in the consumer,
  45. * the node is dequeued in the following
  46. * iteration */
  47. }
  48. static UA_TimerCallbackEntry *
  49. dequeueChange(UA_Timer *t) {
  50. UA_TimerCallbackEntry *tail = t->changes_tail;
  51. UA_TimerCallbackEntry *next = tail->next.sle_next;
  52. if(tail == (UA_TimerCallbackEntry*)&t->changes_stub) {
  53. if(!next)
  54. return NULL;
  55. t->changes_tail = next;
  56. tail = next;
  57. next = next->next.sle_next;
  58. }
  59. if(next) {
  60. t->changes_tail = next;
  61. return tail;
  62. }
  63. UA_TimerCallbackEntry* head = t->changes_head;
  64. if(tail != head)
  65. return NULL;
  66. enqueueChange(t, (UA_TimerCallbackEntry*)&t->changes_stub);
  67. next = tail->next.sle_next;
  68. if(next) {
  69. t->changes_tail = next;
  70. return tail;
  71. }
  72. return NULL;
  73. }
  74. /* Adding repeated callbacks: Add an entry with the "nextTime" timestamp in the
  75. * future. This will be picked up in the next iteration and inserted at the
  76. * correct place. So that the next execution takes place ät "nextTime". */
  77. UA_StatusCode
  78. UA_Timer_addRepeatedCallback(UA_Timer *t, UA_TimerCallback callback,
  79. void *data, UA_UInt32 interval,
  80. UA_UInt64 *callbackId) {
  81. /* A callback method needs to be present */
  82. if(!callback)
  83. return UA_STATUSCODE_BADINTERNALERROR;
  84. /* The interval needs to be at least 5ms */
  85. if(interval < 5)
  86. return UA_STATUSCODE_BADINTERNALERROR;
  87. /* Allocate the repeated callback structure */
  88. UA_TimerCallbackEntry *tc =
  89. (UA_TimerCallbackEntry*)UA_malloc(sizeof(UA_TimerCallbackEntry));
  90. if(!tc)
  91. return UA_STATUSCODE_BADOUTOFMEMORY;
  92. /* Set the repeated callback */
  93. tc->interval = (UA_UInt64)interval * (UA_UInt64)UA_MSEC_TO_DATETIME;
  94. tc->id = ++t->idCounter;
  95. tc->callback = callback;
  96. tc->data = data;
  97. tc->nextTime = UA_DateTime_nowMonotonic() + (UA_DateTime)tc->interval;
  98. /* Set the output identifier */
  99. if(callbackId)
  100. *callbackId = tc->id;
  101. /* Enqueue the changes in the MPSC queue */
  102. enqueueChange(t, tc);
  103. return UA_STATUSCODE_GOOD;
  104. }
  105. static void
  106. addTimerCallbackEntry(UA_Timer *t, UA_TimerCallbackEntry * UA_RESTRICT tc) {
  107. /* Find the last entry before this callback */
  108. UA_TimerCallbackEntry *tmpTc, *afterTc = NULL;
  109. SLIST_FOREACH(tmpTc, &t->repeatedCallbacks, next) {
  110. if(tmpTc->nextTime >= tc->nextTime)
  111. break;
  112. afterTc = tmpTc;
  113. /* The goal is to have many repeated callbacks with the same repetition
  114. * interval in a "block" in order to reduce linear search for re-entry
  115. * to the sorted list after processing. Allow the first execution to lie
  116. * between "nextTime - 1s" and "nextTime" if this adjustment groups
  117. * callbacks with the same repetition interval. */
  118. if(tmpTc->interval == tc->interval &&
  119. tmpTc->nextTime > (tc->nextTime - UA_SEC_TO_DATETIME))
  120. tc->nextTime = tmpTc->nextTime;
  121. }
  122. /* Add the repeated callback */
  123. if(afterTc)
  124. SLIST_INSERT_AFTER(afterTc, tc, next);
  125. else
  126. SLIST_INSERT_HEAD(&t->repeatedCallbacks, tc, next);
  127. }
  128. UA_StatusCode
  129. UA_Timer_changeRepeatedCallbackInterval(UA_Timer *t, UA_UInt64 callbackId,
  130. UA_UInt32 interval) {
  131. /* The interval needs to be at least 5ms */
  132. if(interval < 5)
  133. return UA_STATUSCODE_BADINTERNALERROR;
  134. /* Allocate the repeated callback structure */
  135. UA_TimerCallbackEntry *tc =
  136. (UA_TimerCallbackEntry*)UA_malloc(sizeof(UA_TimerCallbackEntry));
  137. if(!tc)
  138. return UA_STATUSCODE_BADOUTOFMEMORY;
  139. /* Set the repeated callback */
  140. tc->interval = (UA_UInt64)interval * (UA_UInt64)UA_MSEC_TO_DATETIME;
  141. tc->id = callbackId;
  142. tc->nextTime = UA_DateTime_nowMonotonic() + (UA_DateTime)tc->interval;
  143. tc->callback = (UA_TimerCallback)CHANGE_SENTINEL;
  144. /* Enqueue the changes in the MPSC queue */
  145. enqueueChange(t, tc);
  146. return UA_STATUSCODE_GOOD;
  147. }
  148. static void
  149. changeTimerCallbackEntryInterval(UA_Timer *t, UA_UInt64 callbackId,
  150. UA_UInt64 interval, UA_DateTime nextTime) {
  151. /* Remove from the sorted list */
  152. UA_TimerCallbackEntry *tc, *prev = NULL;
  153. SLIST_FOREACH(tc, &t->repeatedCallbacks, next) {
  154. if(callbackId == tc->id) {
  155. if(prev)
  156. SLIST_REMOVE_AFTER(prev, next);
  157. else
  158. SLIST_REMOVE_HEAD(&t->repeatedCallbacks, next);
  159. break;
  160. }
  161. prev = tc;
  162. }
  163. if(!tc)
  164. return;
  165. /* Adjust settings */
  166. tc->interval = interval;
  167. tc->nextTime = nextTime;
  168. /* Reinsert at the new position */
  169. addTimerCallbackEntry(t, tc);
  170. }
  171. /* Removing a repeated callback: Add an entry with the "nextTime" timestamp set
  172. * to UA_INT64_MAX. The next iteration picks this up and removes the repated
  173. * callback from the linked list. */
  174. UA_StatusCode
  175. UA_Timer_removeRepeatedCallback(UA_Timer *t, UA_UInt64 callbackId) {
  176. /* Allocate the repeated callback structure */
  177. UA_TimerCallbackEntry *tc =
  178. (UA_TimerCallbackEntry*)UA_malloc(sizeof(UA_TimerCallbackEntry));
  179. if(!tc)
  180. return UA_STATUSCODE_BADOUTOFMEMORY;
  181. /* Set the repeated callback with the sentinel nextTime */
  182. tc->id = callbackId;
  183. tc->callback = (UA_TimerCallback)REMOVE_SENTINEL;
  184. /* Enqueue the changes in the MPSC queue */
  185. enqueueChange(t, tc);
  186. return UA_STATUSCODE_GOOD;
  187. }
  188. static void
  189. removeRepeatedCallback(UA_Timer *t, UA_UInt64 callbackId) {
  190. UA_TimerCallbackEntry *tc, *prev = NULL;
  191. SLIST_FOREACH(tc, &t->repeatedCallbacks, next) {
  192. if(callbackId == tc->id) {
  193. if(prev)
  194. SLIST_REMOVE_AFTER(prev, next);
  195. else
  196. SLIST_REMOVE_HEAD(&t->repeatedCallbacks, next);
  197. UA_free(tc);
  198. break;
  199. }
  200. prev = tc;
  201. }
  202. }
  203. /* Process the changes that were added to the MPSC queue (by other threads) */
  204. static void
  205. processChanges(UA_Timer *t) {
  206. UA_TimerCallbackEntry *change;
  207. while((change = dequeueChange(t))) {
  208. switch((uintptr_t)change->callback) {
  209. case REMOVE_SENTINEL:
  210. removeRepeatedCallback(t, change->id);
  211. UA_free(change);
  212. break;
  213. case CHANGE_SENTINEL:
  214. changeTimerCallbackEntryInterval(t, change->id, change->interval,
  215. change->nextTime);
  216. UA_free(change);
  217. break;
  218. default:
  219. addTimerCallbackEntry(t, change);
  220. }
  221. }
  222. }
  223. UA_DateTime
  224. UA_Timer_process(UA_Timer *t, UA_DateTime nowMonotonic,
  225. UA_TimerDispatchCallback dispatchCallback,
  226. void *application) {
  227. /* Insert and remove callbacks */
  228. processChanges(t);
  229. /* Find the last callback to be executed now */
  230. UA_TimerCallbackEntry *firstAfter, *lastNow = NULL;
  231. SLIST_FOREACH(firstAfter, &t->repeatedCallbacks, next) {
  232. if(firstAfter->nextTime > nowMonotonic)
  233. break;
  234. lastNow = firstAfter;
  235. }
  236. /* Nothing to do */
  237. if(!lastNow) {
  238. if(firstAfter)
  239. return firstAfter->nextTime;
  240. return UA_INT64_MAX;
  241. }
  242. /* Put the callbacks that are executed now in a separate list */
  243. UA_TimerCallbackList executedNowList;
  244. executedNowList.slh_first = SLIST_FIRST(&t->repeatedCallbacks);
  245. lastNow->next.sle_next = NULL;
  246. /* Fake entry to represent the first element in the newly-sorted list */
  247. UA_TimerCallbackEntry tmp_first;
  248. tmp_first.nextTime = nowMonotonic - 1; /* never matches for last_dispatched */
  249. tmp_first.next.sle_next = firstAfter;
  250. UA_TimerCallbackEntry *last_dispatched = &tmp_first;
  251. /* Iterate over the list of callbacks to process now */
  252. UA_TimerCallbackEntry *tc;
  253. while((tc = SLIST_FIRST(&executedNowList))) {
  254. /* Remove from the list */
  255. SLIST_REMOVE_HEAD(&executedNowList, next);
  256. /* Dispatch/process callback */
  257. dispatchCallback(application, tc->callback, tc->data);
  258. /* Set the time for the next execution. Prevent an infinite loop by
  259. * forcing the next processing into the next iteration. */
  260. tc->nextTime += (UA_Int64)tc->interval;
  261. if(tc->nextTime < nowMonotonic)
  262. tc->nextTime = nowMonotonic + 1;
  263. /* Find the new position for tc to keep the list sorted */
  264. UA_TimerCallbackEntry *prev_tc;
  265. if(last_dispatched->nextTime == tc->nextTime) {
  266. /* We try to "batch" repeatedCallbacks with the same interval. This
  267. * saves a linear search when the last dispatched entry has the same
  268. * nextTime timestamp as this entry. */
  269. UA_assert(last_dispatched != &tmp_first);
  270. prev_tc = last_dispatched;
  271. } else {
  272. /* Find the position for the next execution by a linear search
  273. * starting at last_dispatched or the first element */
  274. if(last_dispatched->nextTime < tc->nextTime)
  275. prev_tc = last_dispatched;
  276. else
  277. prev_tc = &tmp_first;
  278. while(true) {
  279. UA_TimerCallbackEntry *n = SLIST_NEXT(prev_tc, next);
  280. if(!n || n->nextTime >= tc->nextTime)
  281. break;
  282. prev_tc = n;
  283. }
  284. /* Update last_dispatched */
  285. last_dispatched = tc;
  286. }
  287. /* Add entry to the new position in the sorted list */
  288. SLIST_INSERT_AFTER(prev_tc, tc, next);
  289. }
  290. /* Set the entry-point for the newly sorted list */
  291. t->repeatedCallbacks.slh_first = tmp_first.next.sle_next;
  292. /* Re-repeat processAddRemoved since one of the callbacks might have removed
  293. * or added a callback. So we return a correct timeout. */
  294. processChanges(t);
  295. /* Return timestamp of next repetition */
  296. return SLIST_FIRST(&t->repeatedCallbacks)->nextTime;
  297. }
  298. void
  299. UA_Timer_deleteMembers(UA_Timer *t) {
  300. /* Process changes to empty the MPSC queue */
  301. processChanges(t);
  302. /* Remove repeated callbacks */
  303. UA_TimerCallbackEntry *current;
  304. while((current = SLIST_FIRST(&t->repeatedCallbacks))) {
  305. SLIST_REMOVE_HEAD(&t->repeatedCallbacks, next);
  306. UA_free(current);
  307. }
  308. }