Explorar el Código

process changes to the repeated jobs in the right order

Julius Pfrommer hace 7 años
padre
commit
5eecf41e25
Se han modificado 2 ficheros con 60 adiciones y 5 borrados
  1. 52 3
      src/ua_timer.c
  2. 8 2
      src/ua_timer.h

+ 52 - 3
src/ua_timer.c

@@ -139,11 +139,36 @@ removeRepeatedJob(UA_RepeatedJobsList *rjl, const UA_Guid *jobId) {
     }
 }
 
+#define UA_REVERSE_ARRAY_SIZE 50
+
+/* Changes to the jobs are stored in reverse order in a singly-linked list. That
+ * is necessary, so that atomic operations can be used to add changes to the
+ * list. But the changes need to be processed in the original order to avoid
+ * artefacts. For example, adding and removing a job cannot be executed in
+ * reverse order without changing the outcome. */
 static void
-processAddRemoveJobs(UA_RepeatedJobsList *rjl, UA_DateTime nowMonotonic) {
+processAddRemoveJobsReverse(UA_RepeatedJobsList *rjl,
+                            struct memberstruct(UA_RepeatedJobsList,RepeatedJobsSList) *addRemoveJobs,
+                            size_t skip, size_t count,UA_DateTime nowMonotonic) {
+    /* Skip jobs to process only the "count" last entries */
     UA_RepeatedJob *current;
-    while((current = SLIST_FIRST(&rjl->addRemoveJobs))) {
-        SLIST_REMOVE_HEAD(&rjl->addRemoveJobs, next);
+    size_t skipped = 0;
+    SLIST_FOREACH(current, addRemoveJobs, next) {
+        if(skipped >= skip)
+            break;
+        skipped++;
+    }
+
+    /* Add job pointers to a stack-allocated list */
+    UA_RepeatedJob **list = (UA_RepeatedJob**)UA_alloca(sizeof(void*) * count);
+    for(size_t i = 0; i < count; i++) {
+        list[i] = current;
+        current = SLIST_NEXT(current, next);
+    }
+
+    /* Process list in reverse */
+    for(size_t i = 1; i <= count; i++) {
+        current = list[count-i];
         if(current->nextTime < UA_INT64_MAX) {
             insertRepeatedJob(rjl, current, nowMonotonic);
         } else {
@@ -153,6 +178,30 @@ processAddRemoveJobs(UA_RepeatedJobsList *rjl, UA_DateTime nowMonotonic) {
     }
 }
 
+static void
+processAddRemoveJobs(UA_RepeatedJobsList *rjl, UA_DateTime nowMonotonic) {
+    /* Unlink the list of changes */
+    struct memberstruct(UA_RepeatedJobsList,RepeatedJobsSList) addRemoveJobs;
+    addRemoveJobs.slh_first = UA_atomic_xchg((void**)&rjl->addRemoveJobs.slh_first, NULL);
+    if(!addRemoveJobs.slh_first)
+        return;
+
+    /* Count the number of changes */
+    size_t count = 0;
+    UA_RepeatedJob *current;
+    SLIST_FOREACH(current, &addRemoveJobs, next)
+        count++;
+
+    /* Process changes in the right order in batches of UA_REVERSE_ARRAY_SIZE. We
+     * cannot put all entries into a single array on the stack, since there may
+     * be 1000s of changes and the stack has limited size. */
+    while(count > UA_REVERSE_ARRAY_SIZE) {
+        count -= UA_REVERSE_ARRAY_SIZE; /* skip this much */
+        processAddRemoveJobsReverse(rjl, &addRemoveJobs, count, UA_REVERSE_ARRAY_SIZE, nowMonotonic);
+    }
+    processAddRemoveJobsReverse(rjl, &addRemoveJobs, 0, count, nowMonotonic);
+}
+
 UA_DateTime
 UA_RepeatedJobsList_process(UA_RepeatedJobsList *rjl,
                             UA_DateTime nowMonotonic,

+ 8 - 2
src/ua_timer.h

@@ -28,24 +28,30 @@ typedef struct {
     void *processContext;
 } UA_RepeatedJobsList;
 
+/* Initialize the RepeatedJobsSList. Not thread-safe. */
 void
 UA_RepeatedJobsList_init(UA_RepeatedJobsList *rjl,
                          UA_RepeatedJobsListProcessCallback processCallback,
                          void *processContext);
 
+/* Add a repated job. Thread-safe, can be used in parallel and in parallel with
+ * UA_RepeatedJobsList_process. */
 UA_StatusCode
 UA_RepeatedJobsList_addRepeatedJob(UA_RepeatedJobsList *rjl, const UA_Job job,
                                    const UA_UInt32 interval, UA_Guid *jobId);
 
+/* Remove a repated job. Thread-safe, can be used in parallel and in parallel
+ * with UA_RepeatedJobsList_process. */
 UA_StatusCode
 UA_RepeatedJobsList_removeRepeatedJob(UA_RepeatedJobsList *rjl, const UA_Guid jobId);
 
-/* Process the repeated jobs that have timed out.
- * Returns the timestamp of the next scheduled repeated job. */
+/* Process the repeated jobs that have timed out. Returns the timestamp of the
+ * next scheduled repeated job. Not thread-safe. */
 UA_DateTime
 UA_RepeatedJobsList_process(UA_RepeatedJobsList *rjl, UA_DateTime nowMonotonic,
                             UA_Boolean *dispatched);
 
+/* Remove all repeated jobs. Not thread-safe. */
 void
 UA_RepeatedJobsList_deleteMembers(UA_RepeatedJobsList *rjl);