123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362 |
- /* This Source Code Form is subject to the terms of the Mozilla Public
- * License, v. 2.0. If a copy of the MPL was not distributed with this
- * file, You can obtain one at http://mozilla.org/MPL/2.0/.
- *
- * Copyright 2017 (c) Julius Pfrommer, Fraunhofer IOSB
- * Copyright 2017 (c) Stefan Profanter, fortiss GmbH
- */
- #include "ua_util.h"
- #include "ua_timer.h"
- /* Only one thread operates on the repeated jobs. This is usually the "main"
- * thread with the event loop. All other threads introduce changes via a
- * multi-producer single-consumer (MPSC) queue. The queue is based on a design
- * by Dmitry Vyukov.
- * http://www.1024cores.net/home/lock-free-algorithms/queues/intrusive-mpsc-node-based-queue
- *
- * The RepeatedCallback structure is used both in the sorted list of callbacks
- * and in the MPSC changes queue. For the changes queue, we differentiate
- * between three cases encoded in the callback pointer.
- *
- * callback > 0x01: add the new repeated callback to the sorted list
- * callback == 0x00: remove the callback with the same id
- * callback == 0x01: change the interval of the existing callback */
- #define REMOVE_SENTINEL 0x00
- #define CHANGE_SENTINEL 0x01
- struct UA_TimerCallbackEntry {
- SLIST_ENTRY(UA_TimerCallbackEntry) next; /* Next element in the list */
- UA_DateTime nextTime; /* The next time when the callbacks
- * are to be executed */
- UA_UInt64 interval; /* Interval in 100ns resolution */
- UA_UInt64 id; /* Id of the repeated callback */
- UA_TimerCallback callback;
- void *data;
- };
- void
- UA_Timer_init(UA_Timer *t) {
- SLIST_INIT(&t->repeatedCallbacks);
- t->changes_head = (UA_TimerCallbackEntry*)&t->changes_stub;
- t->changes_tail = (UA_TimerCallbackEntry*)&t->changes_stub;
- t->changes_stub = NULL;
- t->idCounter = 0;
- }
- static void
- enqueueChange(UA_Timer *t, UA_TimerCallbackEntry *tc) {
- tc->next.sle_next = NULL;
- UA_TimerCallbackEntry *prev = (UA_TimerCallbackEntry*)
- UA_atomic_xchg((void * volatile *)&t->changes_head, tc);
- /* Nothing can be dequeued while the producer is blocked here */
- prev->next.sle_next = tc; /* Once this change is visible in the consumer,
- * the node is dequeued in the following
- * iteration */
- }
- static UA_TimerCallbackEntry *
- dequeueChange(UA_Timer *t) {
- UA_TimerCallbackEntry *tail = t->changes_tail;
- UA_TimerCallbackEntry *next = tail->next.sle_next;
- if(tail == (UA_TimerCallbackEntry*)&t->changes_stub) {
- if(!next)
- return NULL;
- t->changes_tail = next;
- tail = next;
- next = next->next.sle_next;
- }
- if(next) {
- t->changes_tail = next;
- return tail;
- }
- UA_TimerCallbackEntry* head = t->changes_head;
- if(tail != head)
- return NULL;
- enqueueChange(t, (UA_TimerCallbackEntry*)&t->changes_stub);
- next = tail->next.sle_next;
- if(next) {
- t->changes_tail = next;
- return tail;
- }
- return NULL;
- }
- /* Adding repeated callbacks: Add an entry with the "nextTime" timestamp in the
- * future. This will be picked up in the next iteration and inserted at the
- * correct place. So that the next execution takes place ät "nextTime". */
- UA_StatusCode
- UA_Timer_addRepeatedCallback(UA_Timer *t, UA_TimerCallback callback,
- void *data, UA_UInt32 interval,
- UA_UInt64 *callbackId) {
- /* A callback method needs to be present */
- if(!callback)
- return UA_STATUSCODE_BADINTERNALERROR;
- /* The interval needs to be at least 5ms */
- if(interval < 5)
- return UA_STATUSCODE_BADINTERNALERROR;
- /* Allocate the repeated callback structure */
- UA_TimerCallbackEntry *tc =
- (UA_TimerCallbackEntry*)UA_malloc(sizeof(UA_TimerCallbackEntry));
- if(!tc)
- return UA_STATUSCODE_BADOUTOFMEMORY;
- /* Set the repeated callback */
- tc->interval = (UA_UInt64)interval * UA_DATETIME_MSEC;
- tc->id = ++t->idCounter;
- tc->callback = callback;
- tc->data = data;
- tc->nextTime = UA_DateTime_nowMonotonic() + (UA_DateTime)tc->interval;
- /* Set the output identifier */
- if(callbackId)
- *callbackId = tc->id;
- /* Enqueue the changes in the MPSC queue */
- enqueueChange(t, tc);
- return UA_STATUSCODE_GOOD;
- }
- static void
- addTimerCallbackEntry(UA_Timer *t, UA_TimerCallbackEntry * UA_RESTRICT tc) {
- /* Find the last entry before this callback */
- UA_TimerCallbackEntry *tmpTc, *afterTc = NULL;
- SLIST_FOREACH(tmpTc, &t->repeatedCallbacks, next) {
- if(tmpTc->nextTime >= tc->nextTime)
- break;
- afterTc = tmpTc;
- /* The goal is to have many repeated callbacks with the same repetition
- * interval in a "block" in order to reduce linear search for re-entry
- * to the sorted list after processing. Allow the first execution to lie
- * between "nextTime - 1s" and "nextTime" if this adjustment groups
- * callbacks with the same repetition interval. */
- if(tmpTc->interval == tc->interval &&
- tmpTc->nextTime > (tc->nextTime - UA_DATETIME_SEC))
- tc->nextTime = tmpTc->nextTime;
- }
- /* Add the repeated callback */
- if(afterTc)
- SLIST_INSERT_AFTER(afterTc, tc, next);
- else
- SLIST_INSERT_HEAD(&t->repeatedCallbacks, tc, next);
- }
- UA_StatusCode
- UA_Timer_changeRepeatedCallbackInterval(UA_Timer *t, UA_UInt64 callbackId,
- UA_UInt32 interval) {
- /* The interval needs to be at least 5ms */
- if(interval < 5)
- return UA_STATUSCODE_BADINTERNALERROR;
- /* Allocate the repeated callback structure */
- UA_TimerCallbackEntry *tc =
- (UA_TimerCallbackEntry*)UA_malloc(sizeof(UA_TimerCallbackEntry));
- if(!tc)
- return UA_STATUSCODE_BADOUTOFMEMORY;
- /* Set the repeated callback */
- tc->interval = (UA_UInt64)interval * UA_DATETIME_MSEC;
- tc->id = callbackId;
- tc->nextTime = UA_DateTime_nowMonotonic() + (UA_DateTime)tc->interval;
- tc->callback = (UA_TimerCallback)CHANGE_SENTINEL;
- /* Enqueue the changes in the MPSC queue */
- enqueueChange(t, tc);
- return UA_STATUSCODE_GOOD;
- }
- static void
- changeTimerCallbackEntryInterval(UA_Timer *t, UA_UInt64 callbackId,
- UA_UInt64 interval, UA_DateTime nextTime) {
- /* Remove from the sorted list */
- UA_TimerCallbackEntry *tc, *prev = NULL;
- SLIST_FOREACH(tc, &t->repeatedCallbacks, next) {
- if(callbackId == tc->id) {
- if(prev)
- SLIST_REMOVE_AFTER(prev, next);
- else
- SLIST_REMOVE_HEAD(&t->repeatedCallbacks, next);
- break;
- }
- prev = tc;
- }
- if(!tc)
- return;
- /* Adjust settings */
- tc->interval = interval;
- tc->nextTime = nextTime;
- /* Reinsert at the new position */
- addTimerCallbackEntry(t, tc);
- }
- /* Removing a repeated callback: Add an entry with the "nextTime" timestamp set
- * to UA_INT64_MAX. The next iteration picks this up and removes the repated
- * callback from the linked list. */
- UA_StatusCode
- UA_Timer_removeRepeatedCallback(UA_Timer *t, UA_UInt64 callbackId) {
- /* Allocate the repeated callback structure */
- UA_TimerCallbackEntry *tc =
- (UA_TimerCallbackEntry*)UA_malloc(sizeof(UA_TimerCallbackEntry));
- if(!tc)
- return UA_STATUSCODE_BADOUTOFMEMORY;
- /* Set the repeated callback with the sentinel nextTime */
- tc->id = callbackId;
- tc->callback = (UA_TimerCallback)REMOVE_SENTINEL;
- /* Enqueue the changes in the MPSC queue */
- enqueueChange(t, tc);
- return UA_STATUSCODE_GOOD;
- }
- static void
- removeRepeatedCallback(UA_Timer *t, UA_UInt64 callbackId) {
- UA_TimerCallbackEntry *tc, *prev = NULL;
- SLIST_FOREACH(tc, &t->repeatedCallbacks, next) {
- if(callbackId == tc->id) {
- if(prev)
- SLIST_REMOVE_AFTER(prev, next);
- else
- SLIST_REMOVE_HEAD(&t->repeatedCallbacks, next);
- UA_free(tc);
- break;
- }
- prev = tc;
- }
- }
- /* Process the changes that were added to the MPSC queue (by other threads) */
- static void
- processChanges(UA_Timer *t) {
- UA_TimerCallbackEntry *change;
- while((change = dequeueChange(t))) {
- switch((uintptr_t)change->callback) {
- case REMOVE_SENTINEL:
- removeRepeatedCallback(t, change->id);
- UA_free(change);
- break;
- case CHANGE_SENTINEL:
- changeTimerCallbackEntryInterval(t, change->id, change->interval,
- change->nextTime);
- UA_free(change);
- break;
- default:
- addTimerCallbackEntry(t, change);
- }
- }
- }
- UA_DateTime
- UA_Timer_process(UA_Timer *t, UA_DateTime nowMonotonic,
- UA_TimerDispatchCallback dispatchCallback,
- void *application) {
- /* Insert and remove callbacks */
- processChanges(t);
- /* Find the last callback to be executed now */
- UA_TimerCallbackEntry *firstAfter, *lastNow = NULL;
- SLIST_FOREACH(firstAfter, &t->repeatedCallbacks, next) {
- if(firstAfter->nextTime > nowMonotonic)
- break;
- lastNow = firstAfter;
- }
- /* Nothing to do */
- if(!lastNow) {
- if(firstAfter)
- return firstAfter->nextTime;
- return UA_INT64_MAX;
- }
- /* Put the callbacks that are executed now in a separate list */
- UA_TimerCallbackList executedNowList;
- executedNowList.slh_first = SLIST_FIRST(&t->repeatedCallbacks);
- lastNow->next.sle_next = NULL;
- /* Fake entry to represent the first element in the newly-sorted list */
- UA_TimerCallbackEntry tmp_first;
- tmp_first.nextTime = nowMonotonic - 1; /* never matches for last_dispatched */
- tmp_first.next.sle_next = firstAfter;
- UA_TimerCallbackEntry *last_dispatched = &tmp_first;
- /* Iterate over the list of callbacks to process now */
- UA_TimerCallbackEntry *tc;
- while((tc = SLIST_FIRST(&executedNowList))) {
- /* Remove from the list */
- SLIST_REMOVE_HEAD(&executedNowList, next);
- /* Dispatch/process callback */
- dispatchCallback(application, tc->callback, tc->data);
- /* Set the time for the next execution. Prevent an infinite loop by
- * forcing the next processing into the next iteration. */
- tc->nextTime += (UA_Int64)tc->interval;
- if(tc->nextTime < nowMonotonic)
- tc->nextTime = nowMonotonic + 1;
- /* Find the new position for tc to keep the list sorted */
- UA_TimerCallbackEntry *prev_tc;
- if(last_dispatched->nextTime == tc->nextTime) {
- /* We try to "batch" repeatedCallbacks with the same interval. This
- * saves a linear search when the last dispatched entry has the same
- * nextTime timestamp as this entry. */
- UA_assert(last_dispatched != &tmp_first);
- prev_tc = last_dispatched;
- } else {
- /* Find the position for the next execution by a linear search
- * starting at last_dispatched or the first element */
- if(last_dispatched->nextTime < tc->nextTime)
- prev_tc = last_dispatched;
- else
- prev_tc = &tmp_first;
- while(true) {
- UA_TimerCallbackEntry *n = SLIST_NEXT(prev_tc, next);
- if(!n || n->nextTime >= tc->nextTime)
- break;
- prev_tc = n;
- }
- /* Update last_dispatched */
- last_dispatched = tc;
- }
- /* Add entry to the new position in the sorted list */
- SLIST_INSERT_AFTER(prev_tc, tc, next);
- }
- /* Set the entry-point for the newly sorted list */
- t->repeatedCallbacks.slh_first = tmp_first.next.sle_next;
- /* Re-repeat processAddRemoved since one of the callbacks might have removed
- * or added a callback. So we return a correct timeout. */
- processChanges(t);
- /* Return timestamp of next repetition */
- tc = SLIST_FIRST(&t->repeatedCallbacks);
- if(!tc)
- return UA_INT64_MAX; /* Main-loop has a max timeout / will continue earlier */
- return tc->nextTime;
- }
- void
- UA_Timer_deleteMembers(UA_Timer *t) {
- /* Process changes to empty the MPSC queue */
- processChanges(t);
- /* Remove repeated callbacks */
- UA_TimerCallbackEntry *current;
- while((current = SLIST_FIRST(&t->repeatedCallbacks))) {
- SLIST_REMOVE_HEAD(&t->repeatedCallbacks, next);
- UA_free(current);
- }
- }
|