123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278 |
- /* This Source Code Form is subject to the terms of the Mozilla Public
- * License, v. 2.0. If a copy of the MPL was not distributed with this
- * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
- #include "ua_util.h"
- #include "ua_timer.h"
- /* Only one thread operates on the repeated jobs. This is usually the "main"
- * thread with the event loop. All other threads may add changes to the repeated
- * jobs to a multi-producer single-consumer queue. The queue is based on a
- * design by Dmitry Vyukov.
- * http://www.1024cores.net/home/lock-free-algorithms/queues/intrusive-mpsc-node-based-queue */
- struct UA_RepeatedJob {
- SLIST_ENTRY(UA_RepeatedJob) next; /* Next element in the list */
- UA_DateTime nextTime; /* The next time when the jobs are to be executed */
- UA_UInt64 interval; /* Interval in 100ns resolution */
- UA_Guid id; /* Id of the repeated job */
- UA_Job job; /* The job description itself */
- };
- void
- UA_RepeatedJobsList_init(UA_RepeatedJobsList *rjl,
- UA_RepeatedJobsListProcessCallback processCallback,
- void *processContext) {
- SLIST_INIT(&rjl->repeatedJobs);
- rjl->changes_head = (UA_RepeatedJob*)&rjl->changes_stub;
- rjl->changes_tail = (UA_RepeatedJob*)&rjl->changes_stub;
- rjl->changes_stub = NULL;
- rjl->processCallback = processCallback;
- rjl->processContext = processContext;
- }
- static void
- enqueueChange(UA_RepeatedJobsList *rjl, UA_RepeatedJob *rj) {
- rj->next.sle_next = NULL;
- UA_RepeatedJob *prev = UA_atomic_xchg((void* volatile *)&rjl->changes_head, rj);
- /* Nothing can be dequeued while the producer is blocked here */
- prev->next.sle_next = rj; /* Once this change is visible in the consumer,
- * the node is dequeued in the following
- * iteration */
- }
- static UA_RepeatedJob *
- dequeueChange(UA_RepeatedJobsList *rjl) {
- UA_RepeatedJob *tail = rjl->changes_tail;
- UA_RepeatedJob *next = tail->next.sle_next;
- if(tail == (UA_RepeatedJob*)&rjl->changes_stub) {
- if(!next)
- return NULL;
- rjl->changes_tail = next;
- tail = next;
- next = next->next.sle_next;
- }
- if(next) {
- rjl->changes_tail = next;
- return tail;
- }
- UA_RepeatedJob* head = rjl->changes_head;
- if(tail != head)
- return NULL;
- enqueueChange(rjl, (UA_RepeatedJob*)&rjl->changes_stub);
- next = tail->next.sle_next;
- if(next) {
- rjl->changes_tail = next;
- return tail;
- }
- return NULL;
- }
- /* Adding repeated jobs: Add an entry with the "nextTime" timestamp in the
- * future. This will be picked up in the next iteration and inserted at the
- * correct place, so that the next execution takes place at "nextTime". */
- UA_StatusCode
- UA_RepeatedJobsList_addRepeatedJob(UA_RepeatedJobsList *rjl, const UA_Job job,
- const UA_UInt32 interval, UA_Guid *jobId) {
- /* The interval needs to be at least 5ms */
- if(interval < 5)
- return UA_STATUSCODE_BADINTERNALERROR;
- /* Allocate the repeated job structure */
- UA_RepeatedJob *rj = (UA_RepeatedJob*)UA_malloc(sizeof(UA_RepeatedJob));
- if(!rj)
- return UA_STATUSCODE_BADOUTOFMEMORY;
- /* Set the repeated job */
- rj->interval = (UA_UInt64)interval * (UA_UInt64)UA_MSEC_TO_DATETIME;
- rj->id = UA_Guid_random();
- rj->job = job;
- rj->nextTime = UA_DateTime_nowMonotonic() + (UA_DateTime)rj->interval;
- /* Set the output guid */
- if(jobId)
- *jobId = rj->id;
- /* Enqueue the changes in the MPSC queue */
- enqueueChange(rjl, rj);
- return UA_STATUSCODE_GOOD;
- }
- static void
- addRepeatedJob(UA_RepeatedJobsList *rjl,
- UA_RepeatedJob * UA_RESTRICT rj,
- UA_DateTime nowMonotonic) {
- /* The latest time for the first execution */
- rj->nextTime = nowMonotonic + (UA_Int64)rj->interval;
- /* Find the last entry before this job */
- UA_RepeatedJob *tmpRj, *afterRj = NULL;
- SLIST_FOREACH(tmpRj, &rjl->repeatedJobs, next) {
- if(tmpRj->nextTime >= rj->nextTime)
- break;
- afterRj = tmpRj;
- /* The goal is to have many repeated jobs with the same repetition
- * interval in a "block" in order to reduce linear search for re-entry
- * to the sorted list after processing. Allow the first execution to lie
- * between "nextTime - 1s" and "nextTime" if this adjustment groups jobs
- * with the same repetition interval. */
- if(tmpRj->interval == rj->interval &&
- tmpRj->nextTime > (rj->nextTime - UA_SEC_TO_DATETIME))
- rj->nextTime = tmpRj->nextTime;
- }
- /* Add the repeated job */
- if(afterRj)
- SLIST_INSERT_AFTER(afterRj, rj, next);
- else
- SLIST_INSERT_HEAD(&rjl->repeatedJobs, rj, next);
- }
- /* Removing a repeated job: Add an entry with the "nextTime" timestamp set to
- * UA_INT64_MAX. The next iteration picks this up and removes the repeated job
- * from the linked list. */
- UA_StatusCode
- UA_RepeatedJobsList_removeRepeatedJob(UA_RepeatedJobsList *rjl, const UA_Guid jobId) {
- /* Allocate the repeated job structure */
- UA_RepeatedJob *rj = (UA_RepeatedJob*)UA_malloc(sizeof(UA_RepeatedJob));
- if(!rj)
- return UA_STATUSCODE_BADOUTOFMEMORY;
- /* Set the repeated job with the sentinel nextTime */
- rj->id = jobId;
- rj->nextTime = UA_INT64_MAX;
- /* Enqueue the changes in the MPSC queue */
- enqueueChange(rjl, rj);
- return UA_STATUSCODE_GOOD;
- }
- static void
- removeRepeatedJob(UA_RepeatedJobsList *rjl, const UA_Guid *jobId) {
- UA_RepeatedJob *rj, *prev = NULL;
- SLIST_FOREACH(rj, &rjl->repeatedJobs, next) {
- if(UA_Guid_equal(jobId, &rj->id)) {
- if(prev)
- SLIST_REMOVE_AFTER(prev, next);
- else
- SLIST_REMOVE_HEAD(&rjl->repeatedJobs, next);
- UA_free(rj);
- break;
- }
- prev = rj;
- }
- }
- static void
- processChanges(UA_RepeatedJobsList *rjl, UA_DateTime nowMonotonic) {
- UA_RepeatedJob *change;
- while((change = dequeueChange(rjl))) {
- if(change->nextTime < UA_INT64_MAX) {
- addRepeatedJob(rjl, change, nowMonotonic);
- } else {
- removeRepeatedJob(rjl, &change->id);
- UA_free(change);
- }
- }
- }
- UA_DateTime
- UA_RepeatedJobsList_process(UA_RepeatedJobsList *rjl,
- UA_DateTime nowMonotonic,
- UA_Boolean *dispatched) {
- /* Insert and remove jobs */
- processChanges(rjl, nowMonotonic);
- /* Find the last job to be executed now */
- UA_RepeatedJob *firstAfter, *lastNow = NULL;
- SLIST_FOREACH(firstAfter, &rjl->repeatedJobs, next) {
- if(firstAfter->nextTime > nowMonotonic)
- break;
- lastNow = firstAfter;
- }
- /* Nothing to do */
- if(!lastNow) {
- if(firstAfter)
- return firstAfter->nextTime;
- return UA_INT64_MAX;
- }
- /* Put the jobs that are executed now in a separate list */
- struct memberstruct(UA_RepeatedJobsList,RepeatedJobsSList) executedNowList;
- executedNowList.slh_first = SLIST_FIRST(&rjl->repeatedJobs);
- lastNow->next.sle_next = NULL;
- /* Fake entry to represent the first element in the newly-sorted list */
- UA_RepeatedJob tmp_first;
- tmp_first.nextTime = nowMonotonic - 1; /* never matches for last_dispatched */
- tmp_first.next.sle_next = firstAfter;
- UA_RepeatedJob *last_dispatched = &tmp_first;
- /* Iterate over the list of jobs to process now */
- UA_RepeatedJob *rj;
- while((rj = SLIST_FIRST(&executedNowList))) {
- /* Remove from the list */
- SLIST_REMOVE_HEAD(&executedNowList, next);
- /* Dispatch/process job */
- rjl->processCallback(rjl->processContext, &rj->job);
- *dispatched = true;
- /* Set the time for the next execution. Prevent an infinite loop by
- * forcing the next processing into the next iteration. */
- rj->nextTime += (UA_Int64)rj->interval;
- if(rj->nextTime < nowMonotonic)
- rj->nextTime = nowMonotonic + 1;
- /* Find the new position for rj to keep the list sorted */
- UA_RepeatedJob *prev_rj;
- if(last_dispatched->nextTime == rj->nextTime) {
- /* We "batch" repeatedJobs with the same interval in
- * addRepeatedJobs. So this might occur quite often. */
- UA_assert(last_dispatched != &tmp_first);
- prev_rj = last_dispatched;
- } else {
- /* Find the position for the next execution by a linear search
- * starting at the first possible job */
- prev_rj = &tmp_first;
- while(true) {
- UA_RepeatedJob *n = SLIST_NEXT(prev_rj, next);
- if(!n || n->nextTime >= rj->nextTime)
- break;
- prev_rj = n;
- }
- /* Update last_dispatched */
- last_dispatched = rj;
- }
- /* Add entry to the new position in the sorted list */
- SLIST_INSERT_AFTER(prev_rj, rj, next);
- }
- /* Set the entry-point for the newly sorted list */
- rjl->repeatedJobs.slh_first = tmp_first.next.sle_next;
- /* Re-repeat processAddRemoved since one of the jobs might have removed or
- * added a job. So we get the returned timeout right. */
- processChanges(rjl, nowMonotonic);
- /* Return timestamp of next repetition */
- return SLIST_FIRST(&rjl->repeatedJobs)->nextTime;
- }
- void
- UA_RepeatedJobsList_deleteMembers(UA_RepeatedJobsList *rjl) {
- /* Process changes to empty the queue */
- processChanges(rjl, 0);
- /* Remove repeated jobs */
- UA_RepeatedJob *current;
- while((current = SLIST_FIRST(&rjl->repeatedJobs))) {
- SLIST_REMOVE_HEAD(&rjl->repeatedJobs, next);
- UA_free(current);
- }
- }
|