ua_timer.c 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376
  1. /* This Source Code Form is subject to the terms of the Mozilla Public
  2. * License, v. 2.0. If a copy of the MPL was not distributed with this
  3. * file, You can obtain one at http://mozilla.org/MPL/2.0/.
  4. *
  5. * Copyright 2017 (c) Fraunhofer IOSB (Author: Julius Pfrommer)
  6. * Copyright 2017 (c) Stefan Profanter, fortiss GmbH
  7. */
  8. #include "ua_util_internal.h"
  9. #include "ua_timer.h"
  10. /* Only one thread operates on the repeated jobs. This is usually the "main"
  11. * thread with the event loop. All other threads introduce changes via a
  12. * multi-producer single-consumer (MPSC) queue. The queue is based on a design
  13. * by Dmitry Vyukov.
  14. * http://www.1024cores.net/home/lock-free-algorithms/queues/intrusive-mpsc-node-based-queue
  15. *
  16. * The RepeatedCallback structure is used both in the sorted list of callbacks
  17. * and in the MPSC changes queue. For the changes queue, we differentiate
  18. * between three cases encoded in the callback pointer.
  19. *
  20. * callback > 0x01: add the new repeated callback to the sorted list
  21. * callback == 0x00: remove the callback with the same id
  22. * callback == 0x01: change the interval of the existing callback */
  23. #define REMOVE_SENTINEL 0x00
  24. #define CHANGE_SENTINEL 0x01
  25. struct UA_TimerCallbackEntry {
  26. SLIST_ENTRY(UA_TimerCallbackEntry) next; /* Next element in the list */
  27. UA_DateTime nextTime; /* The next time when the callbacks
  28. * are to be executed */
  29. UA_UInt64 interval; /* Interval in 100ns resolution */
  30. UA_UInt64 id; /* Id of the repeated callback */
  31. UA_ApplicationCallback callback;
  32. void *application;
  33. void *data;
  34. };
  35. void
  36. UA_Timer_init(UA_Timer *t) {
  37. SLIST_INIT(&t->repeatedCallbacks);
  38. t->changes_head = (UA_TimerCallbackEntry*)&t->changes_stub;
  39. t->changes_tail = (UA_TimerCallbackEntry*)&t->changes_stub;
  40. t->changes_stub = NULL;
  41. t->idCounter = 0;
  42. }
  43. static void
  44. enqueueChange(UA_Timer *t, UA_TimerCallbackEntry *tc) {
  45. tc->next.sle_next = NULL;
  46. UA_TimerCallbackEntry *prev = (UA_TimerCallbackEntry*)
  47. UA_atomic_xchg((void * volatile *)&t->changes_head, tc);
  48. /* Nothing can be dequeued while the producer is blocked here */
  49. prev->next.sle_next = tc; /* Once this change is visible in the consumer,
  50. * the node is dequeued in the following
  51. * iteration */
  52. }
  53. static UA_TimerCallbackEntry *
  54. dequeueChange(UA_Timer *t) {
  55. UA_TimerCallbackEntry *tail = t->changes_tail;
  56. UA_TimerCallbackEntry *next = tail->next.sle_next;
  57. if(tail == (UA_TimerCallbackEntry*)&t->changes_stub) {
  58. if(!next)
  59. return NULL;
  60. t->changes_tail = next;
  61. tail = next;
  62. next = next->next.sle_next;
  63. }
  64. if(next) {
  65. t->changes_tail = next;
  66. return tail;
  67. }
  68. UA_TimerCallbackEntry* head = t->changes_head;
  69. if(tail != head)
  70. return NULL;
  71. enqueueChange(t, (UA_TimerCallbackEntry*)&t->changes_stub);
  72. next = tail->next.sle_next;
  73. if(next) {
  74. t->changes_tail = next;
  75. return tail;
  76. }
  77. return NULL;
  78. }
  79. /* Adding repeated callbacks: Add an entry with the "nextTime" timestamp in the
  80. * future. This will be picked up in the next iteration and inserted at the
  81. * correct place. So that the next execution takes place ät "nextTime". */
  82. UA_StatusCode
  83. UA_Timer_addRepeatedCallback(UA_Timer *t, UA_ApplicationCallback callback,
  84. void *application, void *data, UA_UInt32 interval,
  85. UA_UInt64 *callbackId) {
  86. /* A callback method needs to be present */
  87. if(!callback)
  88. return UA_STATUSCODE_BADINTERNALERROR;
  89. /* The interval needs to be at least 5ms */
  90. if(interval < 5)
  91. return UA_STATUSCODE_BADINTERNALERROR;
  92. /* Allocate the repeated callback structure */
  93. UA_TimerCallbackEntry *tc =
  94. (UA_TimerCallbackEntry*)UA_malloc(sizeof(UA_TimerCallbackEntry));
  95. if(!tc)
  96. return UA_STATUSCODE_BADOUTOFMEMORY;
  97. /* Set the repeated callback */
  98. tc->interval = (UA_UInt64)interval * UA_DATETIME_MSEC;
  99. tc->id = ++t->idCounter;
  100. tc->callback = callback;
  101. tc->application = application;
  102. tc->data = data;
  103. tc->nextTime = UA_DateTime_nowMonotonic() + (UA_DateTime)tc->interval;
  104. /* Set the output identifier */
  105. if(callbackId)
  106. *callbackId = tc->id;
  107. /* Enqueue the changes in the MPSC queue */
  108. enqueueChange(t, tc);
  109. return UA_STATUSCODE_GOOD;
  110. }
  111. static void
  112. addTimerCallbackEntry(UA_Timer *t, UA_TimerCallbackEntry * UA_RESTRICT tc) {
  113. /* Find the last entry before this callback */
  114. UA_TimerCallbackEntry *tmpTc, *afterTc = NULL;
  115. SLIST_FOREACH(tmpTc, &t->repeatedCallbacks, next) {
  116. if(tmpTc->nextTime >= tc->nextTime)
  117. break;
  118. /* The goal is to have many repeated callbacks with the same repetition
  119. * interval in a "block" in order to reduce linear search for re-entry
  120. * to the sorted list after processing. Allow the first execution to lie
  121. * between "nextTime - 1s" and "nextTime" if this adjustment groups
  122. * callbacks with the same repetition interval.
  123. * Callbacks of a block are added in reversed order. This design allows
  124. * the monitored items of a subscription (if created in a sequence with the
  125. * same publish/sample interval) to be executed before the subscription
  126. * publish the notifications */
  127. if(tmpTc->interval == tc->interval &&
  128. tmpTc->nextTime > (tc->nextTime - UA_DATETIME_SEC)) {
  129. tc->nextTime = tmpTc->nextTime;
  130. break;
  131. }
  132. /* tc is neither in the same interval nor supposed to be executed sooner
  133. * than tmpTc. Update afterTc to push tc further back in the timer list. */
  134. afterTc = tmpTc;
  135. }
  136. /* Add the repeated callback */
  137. if(afterTc)
  138. SLIST_INSERT_AFTER(afterTc, tc, next);
  139. else
  140. SLIST_INSERT_HEAD(&t->repeatedCallbacks, tc, next);
  141. }
  142. UA_StatusCode
  143. UA_Timer_changeRepeatedCallbackInterval(UA_Timer *t, UA_UInt64 callbackId,
  144. UA_UInt32 interval) {
  145. /* The interval needs to be at least 5ms */
  146. if(interval < 5)
  147. return UA_STATUSCODE_BADINTERNALERROR;
  148. /* Allocate the repeated callback structure */
  149. UA_TimerCallbackEntry *tc =
  150. (UA_TimerCallbackEntry*)UA_malloc(sizeof(UA_TimerCallbackEntry));
  151. if(!tc)
  152. return UA_STATUSCODE_BADOUTOFMEMORY;
  153. /* Set the repeated callback */
  154. tc->interval = (UA_UInt64)interval * UA_DATETIME_MSEC;
  155. tc->id = callbackId;
  156. tc->nextTime = UA_DateTime_nowMonotonic() + (UA_DateTime)tc->interval;
  157. tc->callback = (UA_ApplicationCallback)CHANGE_SENTINEL;
  158. /* Enqueue the changes in the MPSC queue */
  159. enqueueChange(t, tc);
  160. return UA_STATUSCODE_GOOD;
  161. }
  162. static void
  163. changeTimerCallbackEntryInterval(UA_Timer *t, UA_UInt64 callbackId,
  164. UA_UInt64 interval, UA_DateTime nextTime) {
  165. /* Remove from the sorted list */
  166. UA_TimerCallbackEntry *tc, *prev = NULL;
  167. SLIST_FOREACH(tc, &t->repeatedCallbacks, next) {
  168. if(callbackId == tc->id) {
  169. if(prev)
  170. SLIST_REMOVE_AFTER(prev, next);
  171. else
  172. SLIST_REMOVE_HEAD(&t->repeatedCallbacks, next);
  173. break;
  174. }
  175. prev = tc;
  176. }
  177. if(!tc)
  178. return;
  179. /* Adjust settings */
  180. tc->interval = interval;
  181. tc->nextTime = nextTime;
  182. /* Reinsert at the new position */
  183. addTimerCallbackEntry(t, tc);
  184. }
  185. /* Removing a repeated callback: Add an entry with the "nextTime" timestamp set
  186. * to UA_INT64_MAX. The next iteration picks this up and removes the repated
  187. * callback from the linked list. */
  188. UA_StatusCode
  189. UA_Timer_removeRepeatedCallback(UA_Timer *t, UA_UInt64 callbackId) {
  190. /* Allocate the repeated callback structure */
  191. UA_TimerCallbackEntry *tc =
  192. (UA_TimerCallbackEntry*)UA_malloc(sizeof(UA_TimerCallbackEntry));
  193. if(!tc)
  194. return UA_STATUSCODE_BADOUTOFMEMORY;
  195. /* Set the repeated callback with the sentinel nextTime */
  196. tc->id = callbackId;
  197. tc->callback = (UA_ApplicationCallback)REMOVE_SENTINEL;
  198. /* Enqueue the changes in the MPSC queue */
  199. enqueueChange(t, tc);
  200. return UA_STATUSCODE_GOOD;
  201. }
  202. static void
  203. removeRepeatedCallback(UA_Timer *t, UA_UInt64 callbackId) {
  204. UA_TimerCallbackEntry *tc, *prev = NULL;
  205. SLIST_FOREACH(tc, &t->repeatedCallbacks, next) {
  206. if(callbackId == tc->id) {
  207. if(prev)
  208. SLIST_REMOVE_AFTER(prev, next);
  209. else
  210. SLIST_REMOVE_HEAD(&t->repeatedCallbacks, next);
  211. UA_free(tc);
  212. break;
  213. }
  214. prev = tc;
  215. }
  216. }
  217. /* Process the changes that were added to the MPSC queue (by other threads) */
  218. static void
  219. processChanges(UA_Timer *t) {
  220. UA_TimerCallbackEntry *change;
  221. while((change = dequeueChange(t))) {
  222. switch((uintptr_t)change->callback) {
  223. case REMOVE_SENTINEL:
  224. removeRepeatedCallback(t, change->id);
  225. UA_free(change);
  226. break;
  227. case CHANGE_SENTINEL:
  228. changeTimerCallbackEntryInterval(t, change->id, change->interval,
  229. change->nextTime);
  230. UA_free(change);
  231. break;
  232. default:
  233. addTimerCallbackEntry(t, change);
  234. }
  235. }
  236. }
  237. UA_DateTime
  238. UA_Timer_process(UA_Timer *t, UA_DateTime nowMonotonic,
  239. UA_TimerExecutionCallback executionCallback,
  240. void *executionApplication) {
  241. /* Insert and remove callbacks */
  242. processChanges(t);
  243. /* Find the last callback to be executed now */
  244. UA_TimerCallbackEntry *firstAfter, *lastNow = NULL;
  245. SLIST_FOREACH(firstAfter, &t->repeatedCallbacks, next) {
  246. if(firstAfter->nextTime > nowMonotonic)
  247. break;
  248. lastNow = firstAfter;
  249. }
  250. /* Nothing to do */
  251. if(!lastNow) {
  252. if(firstAfter)
  253. return firstAfter->nextTime;
  254. return UA_INT64_MAX;
  255. }
  256. /* Put the callbacks that are executed now in a separate list */
  257. UA_TimerCallbackList executedNowList;
  258. executedNowList.slh_first = SLIST_FIRST(&t->repeatedCallbacks);
  259. lastNow->next.sle_next = NULL;
  260. /* Fake entry to represent the first element in the newly-sorted list */
  261. UA_TimerCallbackEntry tmp_first;
  262. tmp_first.nextTime = nowMonotonic - 1; /* never matches for last_dispatched */
  263. tmp_first.next.sle_next = firstAfter;
  264. UA_TimerCallbackEntry *last_dispatched = &tmp_first;
  265. /* Iterate over the list of callbacks to process now */
  266. UA_TimerCallbackEntry *tc;
  267. while((tc = SLIST_FIRST(&executedNowList))) {
  268. /* Remove from the list */
  269. SLIST_REMOVE_HEAD(&executedNowList, next);
  270. /* Dispatch/process callback */
  271. executionCallback(executionApplication, tc->callback,
  272. tc->application, tc->data);
  273. /* Set the time for the next execution. Prevent an infinite loop by
  274. * forcing the next processing into the next iteration. */
  275. tc->nextTime += (UA_Int64)tc->interval;
  276. if(tc->nextTime < nowMonotonic)
  277. tc->nextTime = nowMonotonic + 1;
  278. /* Find the new position for tc to keep the list sorted */
  279. UA_TimerCallbackEntry *prev_tc;
  280. if(last_dispatched->nextTime == tc->nextTime) {
  281. /* We try to "batch" repeatedCallbacks with the same interval. This
  282. * saves a linear search when the last dispatched entry has the same
  283. * nextTime timestamp as this entry. */
  284. UA_assert(last_dispatched != &tmp_first);
  285. prev_tc = last_dispatched;
  286. } else {
  287. /* Find the position for the next execution by a linear search
  288. * starting at last_dispatched or the first element */
  289. if(last_dispatched->nextTime < tc->nextTime)
  290. prev_tc = last_dispatched;
  291. else
  292. prev_tc = &tmp_first;
  293. while(true) {
  294. UA_TimerCallbackEntry *n = SLIST_NEXT(prev_tc, next);
  295. if(!n || n->nextTime >= tc->nextTime)
  296. break;
  297. prev_tc = n;
  298. }
  299. }
  300. /* Update last_dispatched to make sure batched callbacks are added in the
  301. * same sequence as before they were executed and to save some iterations
  302. * of the linear search for callbacks to be added further back in the list. */
  303. last_dispatched = tc;
  304. /* Add entry to the new position in the sorted list */
  305. SLIST_INSERT_AFTER(prev_tc, tc, next);
  306. }
  307. /* Set the entry-point for the newly sorted list */
  308. t->repeatedCallbacks.slh_first = tmp_first.next.sle_next;
  309. /* Re-repeat processAddRemoved since one of the callbacks might have removed
  310. * or added a callback. So we return a correct timeout. */
  311. processChanges(t);
  312. /* Return timestamp of next repetition */
  313. tc = SLIST_FIRST(&t->repeatedCallbacks);
  314. if(!tc)
  315. return UA_INT64_MAX; /* Main-loop has a max timeout / will continue earlier */
  316. return tc->nextTime;
  317. }
  318. void
  319. UA_Timer_deleteMembers(UA_Timer *t) {
  320. /* Process changes to empty the MPSC queue */
  321. processChanges(t);
  322. /* Remove repeated callbacks */
  323. UA_TimerCallbackEntry *current;
  324. while((current = SLIST_FIRST(&t->repeatedCallbacks))) {
  325. SLIST_REMOVE_HEAD(&t->repeatedCallbacks, next);
  326. UA_free(current);
  327. }
  328. }