ソースを参照

use proper multi-producer single-consumer queue for repeated job changes

Julius Pfrommer 7 年 前
コミット
de0c70759d
共有2 個のファイルを変更した92 個の追加109 個の削除を含む
  1. 85 106
      src/ua_timer.c
  2. 7 3
      src/ua_timer.h

+ 85 - 106
src/ua_timer.c

@@ -5,21 +5,11 @@
 #include "ua_util.h"
 #include "ua_timer.h"
 
-/* Only one thread may traverse the lists. This is usually the "main" thread
- * with the event loop. All other threads may add and remove repeated jobs by
- * adding entries to the beginning of the addRemoveJobs list (with atomic
- * operations).
- *
- * Adding repeated jobs: Add an entry with the "nextTime" timestamp in the
- * future. This will be picked up in the next traversal and inserted at the
- * correct place. So that the next execution takes place ät "nextTime".
- *
- * Removing a repeated job: Add an entry with the "nextTime" timestamp set to
- * UA_INT64_MAX. The next iteration picks this up and removes the repated job
- * from the linked list. */
-
-struct UA_RepeatedJob;
-typedef struct UA_RepeatedJob UA_RepeatedJob;
+/* Only one thread operates on the repeated jobs. This is usually the "main"
+ * thread with the event loop. All other threads may add changes to the repeated
+ * jobs to a multi-producer single-consumer queue. The queue is based on a
+ * design by Dmitry Vyukov.
+ * http://www.1024cores.net/home/lock-free-algorithms/queues/intrusive-mpsc-node-based-queue */
 
 struct UA_RepeatedJob {
     SLIST_ENTRY(UA_RepeatedJob) next; /* Next element in the list */
@@ -34,11 +24,53 @@ UA_RepeatedJobsList_init(UA_RepeatedJobsList *rjl,
                          UA_RepeatedJobsListProcessCallback processCallback,
                          void *processContext) {
     SLIST_INIT(&rjl->repeatedJobs);
-    SLIST_INIT(&rjl->addRemoveJobs);
+    rjl->changes_head = (UA_RepeatedJob*)&rjl->changes_stub;
+    rjl->changes_tail = (UA_RepeatedJob*)&rjl->changes_stub;
+    rjl->changes_stub = NULL;
     rjl->processCallback = processCallback;
     rjl->processContext = processContext;
 }
 
+static void
+enqueueChange(UA_RepeatedJobsList *rjl, UA_RepeatedJob *rj) {
+    rj->next.sle_next = NULL;
+    UA_RepeatedJob *prev = UA_atomic_xchg((void* volatile *)&rjl->changes_head, rj);
+    /* Nothing can be dequeued while the producer is blocked here */
+    prev->next.sle_next = rj; /* Once this change is visible in the consumer,
+                               * the node is dequeued in the following
+                               * iteration */
+}
+
+static UA_RepeatedJob *
+dequeueChange(UA_RepeatedJobsList *rjl) {
+    UA_RepeatedJob *tail = rjl->changes_tail;
+    UA_RepeatedJob *next = tail->next.sle_next;
+    if(tail == (UA_RepeatedJob*)&rjl->changes_stub) {
+        if(!next)
+            return NULL;
+        rjl->changes_tail = next;
+        tail = next;
+        next = next->next.sle_next;
+    }
+    if(next) {
+        rjl->changes_tail = next;
+        return tail;
+    }
+    UA_RepeatedJob* head = rjl->changes_head;
+    if(tail != head)
+        return NULL;
+    enqueueChange(rjl, (UA_RepeatedJob*)&rjl->changes_stub);
+    next = tail->next.sle_next;
+    if(next) {
+        rjl->changes_tail = next;
+        return tail;
+    }
+    return NULL;
+}
+
+/* Adding repeated jobs: Add an entry with the "nextTime" timestamp in the
+ * future. This will be picked up in the next iteration and inserted at the
+ * correct place. So that the next execution takes place ät "nextTime". */
 UA_StatusCode
 UA_RepeatedJobsList_addRepeatedJob(UA_RepeatedJobsList *rjl, const UA_Job job,
                                    const UA_UInt32 interval, UA_Guid *jobId) {
@@ -61,41 +93,15 @@ UA_RepeatedJobsList_addRepeatedJob(UA_RepeatedJobsList *rjl, const UA_Job job,
     if(jobId)
         *jobId = rj->id;
 
-    /* Insert the element to the linked list */
-    UA_RepeatedJob *currentFirst;
-    do {
-        currentFirst = SLIST_FIRST(&rjl->addRemoveJobs);
-        SLIST_NEXT(rj, next) = currentFirst;
-    } while(UA_atomic_cmpxchg((void**)&SLIST_FIRST(&rjl->addRemoveJobs), currentFirst, rj) != currentFirst);
-
-    return UA_STATUSCODE_GOOD;
-}
-
-UA_StatusCode
-UA_RepeatedJobsList_removeRepeatedJob(UA_RepeatedJobsList *rjl, const UA_Guid jobId) {
-    /* Allocate the repeated job structure */
-    UA_RepeatedJob *rj = (UA_RepeatedJob*)UA_malloc(sizeof(UA_RepeatedJob));
-    if(!rj)
-        return UA_STATUSCODE_BADOUTOFMEMORY;
-
-    /* Set the repeated job with the sentinel nextTime */
-    rj->id = jobId;
-    rj->nextTime = UA_INT64_MAX;
-
-    /* Insert the element to the linked list */
-    UA_RepeatedJob *currentFirst;
-    do {
-        currentFirst = SLIST_FIRST(&rjl->addRemoveJobs);
-        SLIST_NEXT(rj, next) = currentFirst;
-    } while(UA_atomic_cmpxchg((void**)&SLIST_FIRST(&rjl->addRemoveJobs), currentFirst, rj) != currentFirst);
-
+    /* Enqueue the changes in the MPSC queue */
+    enqueueChange(rjl, rj);
     return UA_STATUSCODE_GOOD;
 }
 
 static void
-insertRepeatedJob(UA_RepeatedJobsList *rjl,
-                  UA_RepeatedJob * UA_RESTRICT rj,
-                  UA_DateTime nowMonotonic) {
+addRepeatedJob(UA_RepeatedJobsList *rjl,
+               UA_RepeatedJob * UA_RESTRICT rj,
+               UA_DateTime nowMonotonic) {
     /* The latest time for the first execution */
     rj->nextTime = nowMonotonic + (UA_Int64)rj->interval;
 
@@ -123,6 +129,25 @@ insertRepeatedJob(UA_RepeatedJobsList *rjl,
         SLIST_INSERT_HEAD(&rjl->repeatedJobs, rj, next);
 }
 
+/* Removing a repeated job: Add an entry with the "nextTime" timestamp set to
+ * UA_INT64_MAX. The next iteration picks this up and removes the repated job
+ * from the linked list. */
+UA_StatusCode
+UA_RepeatedJobsList_removeRepeatedJob(UA_RepeatedJobsList *rjl, const UA_Guid jobId) {
+    /* Allocate the repeated job structure */
+    UA_RepeatedJob *rj = (UA_RepeatedJob*)UA_malloc(sizeof(UA_RepeatedJob));
+    if(!rj)
+        return UA_STATUSCODE_BADOUTOFMEMORY;
+
+    /* Set the repeated job with the sentinel nextTime */
+    rj->id = jobId;
+    rj->nextTime = UA_INT64_MAX;
+
+    /* Enqueue the changes in the MPSC queue */
+    enqueueChange(rjl, rj);
+    return UA_STATUSCODE_GOOD;
+}
+
 static void
 removeRepeatedJob(UA_RepeatedJobsList *rjl, const UA_Guid *jobId) {
     UA_RepeatedJob *rj, *prev = NULL;
@@ -139,75 +164,25 @@ removeRepeatedJob(UA_RepeatedJobsList *rjl, const UA_Guid *jobId) {
     }
 }
 
-#define UA_REVERSE_ARRAY_SIZE 50
-
-/* Changes to the jobs are stored in reverse order in a singly-linked list. That
- * is necessary, so that atomic operations can be used to add changes to the
- * list. But the changes need to be processed in the original order to avoid
- * artefacts. For example, adding and removing a job cannot be executed in
- * reverse order without changing the outcome. */
 static void
-processAddRemoveJobsReverse(UA_RepeatedJobsList *rjl,
-                            struct memberstruct(UA_RepeatedJobsList,RepeatedJobsSList) *addRemoveJobs,
-                            size_t skip, size_t count,UA_DateTime nowMonotonic) {
-    /* Skip jobs to process only the "count" last entries */
-    UA_RepeatedJob *current;
-    size_t skipped = 0;
-    SLIST_FOREACH(current, addRemoveJobs, next) {
-        if(skipped >= skip)
-            break;
-        skipped++;
-    }
-
-    /* Add job pointers to a stack-allocated list */
-    UA_RepeatedJob **list = (UA_RepeatedJob**)UA_alloca(sizeof(void*) * count);
-    for(size_t i = 0; i < count; i++) {
-        list[i] = current;
-        current = SLIST_NEXT(current, next);
-    }
-
-    /* Process list in reverse */
-    for(size_t i = 1; i <= count; i++) {
-        current = list[count-i];
-        if(current->nextTime < UA_INT64_MAX) {
-            insertRepeatedJob(rjl, current, nowMonotonic);
+processChanges(UA_RepeatedJobsList *rjl, UA_DateTime nowMonotonic) {
+    UA_RepeatedJob *change;
+    while((change = dequeueChange(rjl))) {
+        if(change->nextTime < UA_INT64_MAX) {
+            addRepeatedJob(rjl, change, nowMonotonic);
         } else {
-            removeRepeatedJob(rjl, &current->id);
-            UA_free(current);
+            removeRepeatedJob(rjl, &change->id);
+            UA_free(change);
         }
     }
 }
 
-static void
-processAddRemoveJobs(UA_RepeatedJobsList *rjl, UA_DateTime nowMonotonic) {
-    /* Unlink the list of changes */
-    struct memberstruct(UA_RepeatedJobsList,RepeatedJobsSList) addRemoveJobs;
-    addRemoveJobs.slh_first = UA_atomic_xchg((void**)&rjl->addRemoveJobs.slh_first, NULL);
-    if(!addRemoveJobs.slh_first)
-        return;
-
-    /* Count the number of changes */
-    size_t count = 0;
-    UA_RepeatedJob *current;
-    SLIST_FOREACH(current, &addRemoveJobs, next)
-        count++;
-
-    /* Process changes in the right order in batches of UA_REVERSE_ARRAY_SIZE. We
-     * cannot put all entries into a single array on the stack, since there may
-     * be 1000s of changes and the stack has limited size. */
-    while(count > UA_REVERSE_ARRAY_SIZE) {
-        count -= UA_REVERSE_ARRAY_SIZE; /* skip this much */
-        processAddRemoveJobsReverse(rjl, &addRemoveJobs, count, UA_REVERSE_ARRAY_SIZE, nowMonotonic);
-    }
-    processAddRemoveJobsReverse(rjl, &addRemoveJobs, 0, count, nowMonotonic);
-}
-
 UA_DateTime
 UA_RepeatedJobsList_process(UA_RepeatedJobsList *rjl,
                             UA_DateTime nowMonotonic,
                             UA_Boolean *dispatched) {
     /* Insert and remove jobs */
-    processAddRemoveJobs(rjl, nowMonotonic);
+    processChanges(rjl, nowMonotonic);
 
     /* Find the last job to be executed now */
     UA_RepeatedJob *firstAfter, *lastNow = NULL;
@@ -282,7 +257,7 @@ UA_RepeatedJobsList_process(UA_RepeatedJobsList *rjl,
 
     /* Re-repeat processAddRemoved since one of the jobs might have removed or
      * added a job. So we get the returned timeout right. */
-    processAddRemoveJobs(rjl, nowMonotonic);
+    processChanges(rjl, nowMonotonic);
 
     /* Return timestamp of next repetition */
     return SLIST_FIRST(&rjl->repeatedJobs)->nextTime;
@@ -290,6 +265,10 @@ UA_RepeatedJobsList_process(UA_RepeatedJobsList *rjl,
 
 void
 UA_RepeatedJobsList_deleteMembers(UA_RepeatedJobsList *rjl) {
+    /* Process changes to empty the queue */
+    processChanges(rjl, 0);
+
+    /* Remove repeated jobs */
     UA_RepeatedJob *current;
     while((current = SLIST_FIRST(&rjl->repeatedJobs))) {
         SLIST_REMOVE_HEAD(&rjl->repeatedJobs, next);

+ 7 - 3
src/ua_timer.h

@@ -15,13 +15,17 @@ extern "C" {
 typedef void
 (*UA_RepeatedJobsListProcessCallback)(void *processContext, UA_Job *job);
 
+struct UA_RepeatedJob;
+typedef struct UA_RepeatedJob UA_RepeatedJob;
+
 typedef struct {
     /* The linked list of jobs is sorted according to the execution timestamp. */
     SLIST_HEAD(RepeatedJobsSList, UA_RepeatedJob) repeatedJobs;
 
-    /* Repeated jobs that shall be added or removed from the sorted list (with
-     * atomic operations) */
-    SLIST_HEAD(RepeatedJobsSList2, UA_RepeatedJob) addRemoveJobs;
+    /* Changes to the repeated jobs in a multi-producer single-consumer queue */
+    UA_RepeatedJob * volatile changes_head;
+    UA_RepeatedJob *changes_tail;
+    UA_RepeatedJob *changes_stub;
 
     /* The callback to process jobs that have timed out */
     UA_RepeatedJobsListProcessCallback processCallback;