ua_timer.c 9.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278
  1. /* This Source Code Form is subject to the terms of the Mozilla Public
  2. * License, v. 2.0. If a copy of the MPL was not distributed with this
  3. * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
  4. #include "ua_util.h"
  5. #include "ua_timer.h"
  6. /* Only one thread operates on the repeated jobs. This is usually the "main"
  7. * thread with the event loop. All other threads may add changes to the repeated
  8. * jobs to a multi-producer single-consumer queue. The queue is based on a
  9. * design by Dmitry Vyukov.
  10. * http://www.1024cores.net/home/lock-free-algorithms/queues/intrusive-mpsc-node-based-queue */
  11. struct UA_RepeatedJob {
  12. SLIST_ENTRY(UA_RepeatedJob) next; /* Next element in the list */
  13. UA_DateTime nextTime; /* The next time when the jobs are to be executed */
  14. UA_UInt64 interval; /* Interval in 100ns resolution */
  15. UA_Guid id; /* Id of the repeated job */
  16. UA_Job job; /* The job description itself */
  17. };
  18. void
  19. UA_RepeatedJobsList_init(UA_RepeatedJobsList *rjl,
  20. UA_RepeatedJobsListProcessCallback processCallback,
  21. void *processContext) {
  22. SLIST_INIT(&rjl->repeatedJobs);
  23. rjl->changes_head = (UA_RepeatedJob*)&rjl->changes_stub;
  24. rjl->changes_tail = (UA_RepeatedJob*)&rjl->changes_stub;
  25. rjl->changes_stub = NULL;
  26. rjl->processCallback = processCallback;
  27. rjl->processContext = processContext;
  28. }
  29. static void
  30. enqueueChange(UA_RepeatedJobsList *rjl, UA_RepeatedJob *rj) {
  31. rj->next.sle_next = NULL;
  32. UA_RepeatedJob *prev = UA_atomic_xchg((void* volatile *)&rjl->changes_head, rj);
  33. /* Nothing can be dequeued while the producer is blocked here */
  34. prev->next.sle_next = rj; /* Once this change is visible in the consumer,
  35. * the node is dequeued in the following
  36. * iteration */
  37. }
  38. static UA_RepeatedJob *
  39. dequeueChange(UA_RepeatedJobsList *rjl) {
  40. UA_RepeatedJob *tail = rjl->changes_tail;
  41. UA_RepeatedJob *next = tail->next.sle_next;
  42. if(tail == (UA_RepeatedJob*)&rjl->changes_stub) {
  43. if(!next)
  44. return NULL;
  45. rjl->changes_tail = next;
  46. tail = next;
  47. next = next->next.sle_next;
  48. }
  49. if(next) {
  50. rjl->changes_tail = next;
  51. return tail;
  52. }
  53. UA_RepeatedJob* head = rjl->changes_head;
  54. if(tail != head)
  55. return NULL;
  56. enqueueChange(rjl, (UA_RepeatedJob*)&rjl->changes_stub);
  57. next = tail->next.sle_next;
  58. if(next) {
  59. rjl->changes_tail = next;
  60. return tail;
  61. }
  62. return NULL;
  63. }
  64. /* Adding repeated jobs: Add an entry with the "nextTime" timestamp in the
  65. * future. This will be picked up in the next iteration and inserted at the
  66. * correct place. So that the next execution takes place ät "nextTime". */
  67. UA_StatusCode
  68. UA_RepeatedJobsList_addRepeatedJob(UA_RepeatedJobsList *rjl, const UA_Job job,
  69. const UA_UInt32 interval, UA_Guid *jobId) {
  70. /* The interval needs to be at least 5ms */
  71. if(interval < 5)
  72. return UA_STATUSCODE_BADINTERNALERROR;
  73. /* Allocate the repeated job structure */
  74. UA_RepeatedJob *rj = (UA_RepeatedJob*)UA_malloc(sizeof(UA_RepeatedJob));
  75. if(!rj)
  76. return UA_STATUSCODE_BADOUTOFMEMORY;
  77. /* Set the repeated job */
  78. rj->interval = (UA_UInt64)interval * (UA_UInt64)UA_MSEC_TO_DATETIME;
  79. rj->id = UA_Guid_random();
  80. rj->job = job;
  81. rj->nextTime = UA_DateTime_nowMonotonic() + (UA_DateTime)rj->interval;
  82. /* Set the output guid */
  83. if(jobId)
  84. *jobId = rj->id;
  85. /* Enqueue the changes in the MPSC queue */
  86. enqueueChange(rjl, rj);
  87. return UA_STATUSCODE_GOOD;
  88. }
  89. static void
  90. addRepeatedJob(UA_RepeatedJobsList *rjl,
  91. UA_RepeatedJob * UA_RESTRICT rj,
  92. UA_DateTime nowMonotonic) {
  93. /* The latest time for the first execution */
  94. rj->nextTime = nowMonotonic + (UA_Int64)rj->interval;
  95. /* Find the last entry before this job */
  96. UA_RepeatedJob *tmpRj, *afterRj = NULL;
  97. SLIST_FOREACH(tmpRj, &rjl->repeatedJobs, next) {
  98. if(tmpRj->nextTime >= rj->nextTime)
  99. break;
  100. afterRj = tmpRj;
  101. /* The goal is to have many repeated jobs with the same repetition
  102. * interval in a "block" in order to reduce linear search for re-entry
  103. * to the sorted list after processing. Allow the first execution to lie
  104. * between "nextTime - 1s" and "nextTime" if this adjustment groups jobs
  105. * with the same repetition interval. */
  106. if(tmpRj->interval == rj->interval &&
  107. tmpRj->nextTime > (rj->nextTime - UA_SEC_TO_DATETIME))
  108. rj->nextTime = tmpRj->nextTime;
  109. }
  110. /* Add the repeated job */
  111. if(afterRj)
  112. SLIST_INSERT_AFTER(afterRj, rj, next);
  113. else
  114. SLIST_INSERT_HEAD(&rjl->repeatedJobs, rj, next);
  115. }
  116. /* Removing a repeated job: Add an entry with the "nextTime" timestamp set to
  117. * UA_INT64_MAX. The next iteration picks this up and removes the repated job
  118. * from the linked list. */
  119. UA_StatusCode
  120. UA_RepeatedJobsList_removeRepeatedJob(UA_RepeatedJobsList *rjl, const UA_Guid jobId) {
  121. /* Allocate the repeated job structure */
  122. UA_RepeatedJob *rj = (UA_RepeatedJob*)UA_malloc(sizeof(UA_RepeatedJob));
  123. if(!rj)
  124. return UA_STATUSCODE_BADOUTOFMEMORY;
  125. /* Set the repeated job with the sentinel nextTime */
  126. rj->id = jobId;
  127. rj->nextTime = UA_INT64_MAX;
  128. /* Enqueue the changes in the MPSC queue */
  129. enqueueChange(rjl, rj);
  130. return UA_STATUSCODE_GOOD;
  131. }
  132. static void
  133. removeRepeatedJob(UA_RepeatedJobsList *rjl, const UA_Guid *jobId) {
  134. UA_RepeatedJob *rj, *prev = NULL;
  135. SLIST_FOREACH(rj, &rjl->repeatedJobs, next) {
  136. if(UA_Guid_equal(jobId, &rj->id)) {
  137. if(prev)
  138. SLIST_REMOVE_AFTER(prev, next);
  139. else
  140. SLIST_REMOVE_HEAD(&rjl->repeatedJobs, next);
  141. UA_free(rj);
  142. break;
  143. }
  144. prev = rj;
  145. }
  146. }
  147. static void
  148. processChanges(UA_RepeatedJobsList *rjl, UA_DateTime nowMonotonic) {
  149. UA_RepeatedJob *change;
  150. while((change = dequeueChange(rjl))) {
  151. if(change->nextTime < UA_INT64_MAX) {
  152. addRepeatedJob(rjl, change, nowMonotonic);
  153. } else {
  154. removeRepeatedJob(rjl, &change->id);
  155. UA_free(change);
  156. }
  157. }
  158. }
  159. UA_DateTime
  160. UA_RepeatedJobsList_process(UA_RepeatedJobsList *rjl,
  161. UA_DateTime nowMonotonic,
  162. UA_Boolean *dispatched) {
  163. /* Insert and remove jobs */
  164. processChanges(rjl, nowMonotonic);
  165. /* Find the last job to be executed now */
  166. UA_RepeatedJob *firstAfter, *lastNow = NULL;
  167. SLIST_FOREACH(firstAfter, &rjl->repeatedJobs, next) {
  168. if(firstAfter->nextTime > nowMonotonic)
  169. break;
  170. lastNow = firstAfter;
  171. }
  172. /* Nothing to do */
  173. if(!lastNow) {
  174. if(firstAfter)
  175. return firstAfter->nextTime;
  176. return UA_INT64_MAX;
  177. }
  178. /* Put the jobs that are executed now in a separate list */
  179. struct memberstruct(UA_RepeatedJobsList,RepeatedJobsSList) executedNowList;
  180. executedNowList.slh_first = SLIST_FIRST(&rjl->repeatedJobs);
  181. lastNow->next.sle_next = NULL;
  182. /* Fake entry to represent the first element in the newly-sorted list */
  183. UA_RepeatedJob tmp_first;
  184. tmp_first.nextTime = nowMonotonic - 1; /* never matches for last_dispatched */
  185. tmp_first.next.sle_next = firstAfter;
  186. UA_RepeatedJob *last_dispatched = &tmp_first;
  187. /* Iterate over the list of jobs to process now */
  188. UA_RepeatedJob *rj;
  189. while((rj = SLIST_FIRST(&executedNowList))) {
  190. /* Remove from the list */
  191. SLIST_REMOVE_HEAD(&executedNowList, next);
  192. /* Dispatch/process job */
  193. rjl->processCallback(rjl->processContext, &rj->job);
  194. *dispatched = true;
  195. /* Set the time for the next execution. Prevent an infinite loop by
  196. * forcing the next processing into the next iteration. */
  197. rj->nextTime += (UA_Int64)rj->interval;
  198. if(rj->nextTime < nowMonotonic)
  199. rj->nextTime = nowMonotonic + 1;
  200. /* Find the new position for rj to keep the list sorted */
  201. UA_RepeatedJob *prev_rj;
  202. if(last_dispatched->nextTime == rj->nextTime) {
  203. /* We "batch" repeatedJobs with the same interval in
  204. * addRepeatedJobs. So this might occur quite often. */
  205. UA_assert(last_dispatched != &tmp_first);
  206. prev_rj = last_dispatched;
  207. } else {
  208. /* Find the position for the next execution by a linear search
  209. * starting at the first possible job */
  210. prev_rj = &tmp_first;
  211. while(true) {
  212. UA_RepeatedJob *n = SLIST_NEXT(prev_rj, next);
  213. if(!n || n->nextTime >= rj->nextTime)
  214. break;
  215. prev_rj = n;
  216. }
  217. /* Update last_dispatched */
  218. last_dispatched = rj;
  219. }
  220. /* Add entry to the new position in the sorted list */
  221. SLIST_INSERT_AFTER(prev_rj, rj, next);
  222. }
  223. /* Set the entry-point for the newly sorted list */
  224. rjl->repeatedJobs.slh_first = tmp_first.next.sle_next;
  225. /* Re-repeat processAddRemoved since one of the jobs might have removed or
  226. * added a job. So we get the returned timeout right. */
  227. processChanges(rjl, nowMonotonic);
  228. /* Return timestamp of next repetition */
  229. return SLIST_FIRST(&rjl->repeatedJobs)->nextTime;
  230. }
  231. void
  232. UA_RepeatedJobsList_deleteMembers(UA_RepeatedJobsList *rjl) {
  233. /* Process changes to empty the queue */
  234. processChanges(rjl, 0);
  235. /* Remove repeated jobs */
  236. UA_RepeatedJob *current;
  237. while((current = SLIST_FIRST(&rjl->repeatedJobs))) {
  238. SLIST_REMOVE_HEAD(&rjl->repeatedJobs, next);
  239. UA_free(current);
  240. }
  241. }