소스 검색

feature: align nextTime of repeated jobs with the same interval

Otherwise, there is quadratic runtime due to linear search for
reinserting every repeated job
Julius Pfrommer 8 년 전
부모
커밋
277aa21c52
1개의 변경된 파일67개의 추가작업 그리고 30개의 파일을 삭제
  1. 67 30
      src/server/ua_server_worker.c

+ 67 - 30
src/server/ua_server_worker.c

@@ -30,6 +30,10 @@
  * [1] Fraser, K. 2003. Practical lock freedom. Ph.D. thesis. Computer Laboratory, University of Cambridge.
  * [2] Hart, T. E., McKenney, P. E., Brown, A. D., & Walpole, J. (2007). Performance of memory reclamation
  *     for lockless synchronization. Journal of Parallel and Distributed Computing, 67(12), 1270-1285.
+ *
+ * Future Plans: Use work-stealing to load-balance between cores.
+ * [3] Lê, Nhat Minh, et al. "Correct and efficient work-stealing for weak
+ *     memory models." ACM SIGPLAN Notices. Vol. 48. No. 8. ACM, 2013.
  */
 
 #define MAXTIMEOUT 50 // max timeout in millisec until the next main loop iteration
@@ -153,21 +157,31 @@ struct RepeatedJob {
 
 /* internal. call only from the main loop. */
 static void
-addRepeatedJob(UA_Server *server, struct RepeatedJob * UA_RESTRICT rj)
-{
-    /* search for matching entry */
-    struct RepeatedJob *lastRj = NULL; /* Add after this entry or at LIST_HEAD if NULL */
-    struct RepeatedJob *tempRj = LIST_FIRST(&server->repeatedJobs);
-    while(tempRj) {
-        if(tempRj->nextTime > rj->nextTime)
+addRepeatedJob(UA_Server *server, struct RepeatedJob * UA_RESTRICT rj) {
+    /* Search for the best position on the repeatedJobs sorted list. The goal is
+     * to have many repeated jobs with the same repetition interval in a
+     * "block". This helps to reduce the (linear) search to find the next entry
+     * in the repeatedJobs list when dispatching the repeated jobs.
+     * For this, we search between "nexttime_max - 1s" and "nexttime_max" for
+     * entries with the same repetition interval and adjust the "nexttime".
+     * Otherwise, add entry after the first element before "nexttime_max". */
+    UA_DateTime nextTime_max = UA_DateTime_nowMonotonic() + (UA_Int64) rj->interval;
+
+    struct RepeatedJob *afterRj = NULL;
+    struct RepeatedJob *tmpRj;
+    LIST_FOREACH(tmpRj, &server->repeatedJobs, next) {
+        if(tmpRj->nextTime >= nextTime_max)
             break;
-        lastRj = tempRj;
-        tempRj = LIST_NEXT(lastRj, next);
+        if(tmpRj->interval == rj->interval &&
+           tmpRj->nextTime > (nextTime_max - UA_SEC_TO_DATETIME))
+            nextTime_max = tmpRj->nextTime; /* break in the next iteration */
+        afterRj = tmpRj;
     }
 
     /* add the repeated job */
-    if(lastRj)
-        LIST_INSERT_AFTER(lastRj, rj, next);
+    rj->nextTime = nextTime_max;
+    if(afterRj)
+        LIST_INSERT_AFTER(afterRj, rj, next);
     else
         LIST_INSERT_HEAD(&server->repeatedJobs, rj, next);
 }
@@ -184,7 +198,8 @@ UA_Server_addRepeatedJob(UA_Server *server, UA_Job job,
     struct RepeatedJob *rj = UA_malloc(sizeof(struct RepeatedJob));
     if(!rj)
         return UA_STATUSCODE_BADOUTOFMEMORY;
-    rj->nextTime = UA_DateTime_nowMonotonic() + (UA_Int64) interval;
+    /* done inside addRepeatedJob:
+     * rj->nextTime = UA_DateTime_nowMonotonic() + interval; */
     rj->interval = interval;
     rj->id = UA_Guid_random();
     rj->job = job;
@@ -209,10 +224,12 @@ UA_Server_addRepeatedJob(UA_Server *server, UA_Job job,
     return UA_STATUSCODE_GOOD;
 }
 
-/* Returns the next datetime when a repeated job is scheduled */
+/* - Dispatches all repeated jobs that have timed out
+ * - Reinserts dispatched job at their new position in the sorted list
+ * - Returns the next datetime when a repeated job is scheduled */
 static UA_DateTime
 processRepeatedJobs(UA_Server *server, UA_DateTime current) {
-    /* Find the last job that is executed after this iteration */
+    /* Find the last job that is executed in this iteration */
     struct RepeatedJob *lastNow = NULL, *tmp;
     LIST_FOREACH(tmp, &server->repeatedJobs, next) {
         if(tmp->nextTime > current)
@@ -220,14 +237,18 @@ processRepeatedJobs(UA_Server *server, UA_DateTime current) {
         lastNow = tmp;
     }
 
-    /* Iterate over the list of elements (sorted according to the next execution timestamp) */
+    /* Keep pointer to the previously dispatched job to avoid linear search for
+     * "batched" jobs with the same nexttime and interval */
+    struct RepeatedJob tmp_last;
+    tmp_last.nextTime = current-1; /* never matches. just to avoid if(last_added && ...) */
+    struct RepeatedJob *last_dispatched = &tmp_last;
+
+    /* Iterate over the list of elements (sorted according to the nextTime timestamp) */
     struct RepeatedJob *rj, *tmp_rj;
     LIST_FOREACH_SAFE(rj, &server->repeatedJobs, next, tmp_rj) {
         if(rj->nextTime > current)
             break;
 
-        UA_assert(lastNow); /* at least one element at the current time */
-
         /* Dispatch/process job */
 #ifdef UA_ENABLE_MULTITHREADING
         dispatchJob(server, &rj->job);
@@ -235,8 +256,8 @@ processRepeatedJobs(UA_Server *server, UA_DateTime current) {
         struct RepeatedJob **previousNext = rj->next.le_prev;
         processJob(server, &rj->job);
         /* See if the current job was deleted during processJob. That means the
-           le_next field of the previous repeated job (could also be the list
-           head) does no longer point to the current repeated job */
+         * le_next field of the previous repeated job (could also be the list
+         * head) does no longer point to the current repeated job */
         if((void*)*previousNext != (void*)rj) {
             UA_LOG_DEBUG(server->config.logger, UA_LOGCATEGORY_SERVER,
                          "The current repeated job removed itself");
@@ -246,23 +267,39 @@ processRepeatedJobs(UA_Server *server, UA_DateTime current) {
 
         /* Set the time for the next execution */
         rj->nextTime += (UA_Int64)rj->interval;
+
+        /* Prevent an infinite loop when the repeated jobs took more time than
+         * rj->interval */
         if(rj->nextTime < current)
-            rj->nextTime = current + 1; /* prevent to rerun the job right now
-                                           when the repeated jobs took more time
-                                           than rj->interval */
-
-        /* Keep the list sorted */
-        struct RepeatedJob *prev_rj = lastNow;
-        while(true) {
-            struct RepeatedJob *n = LIST_NEXT(prev_rj, next);
-            if(!n || n->nextTime >= rj->nextTime)
-                break;
-            prev_rj = n;
+            rj->nextTime = current + 1;
+
+        /* Find new position for rj to keep the list sorted */
+        struct RepeatedJob *prev_rj;
+        if(last_dispatched->nextTime == rj->nextTime) {
+            /* We "batch" repeatedJobs with the same interval in
+             * addRepeatedJobs. So this might occur quite often. */
+            prev_rj = last_dispatched;
+        } else {
+            /* Find the position by a linear search starting at the first
+             * possible job */
+            UA_assert(lastNow); /* Not NULL. Otherwise, we never reach this point. */
+            prev_rj = lastNow;
+            while(true) {
+                struct RepeatedJob *n = LIST_NEXT(prev_rj, next);
+                if(!n || n->nextTime >= rj->nextTime)
+                    break;
+                prev_rj = n;
+            }
         }
+
+        /* Add entry */
         if(prev_rj != rj) {
             LIST_REMOVE(rj, next);
             LIST_INSERT_AFTER(prev_rj, rj, next);
         }
+
+        /* Update last_dispatched and loop */
+        last_dispatched = rj;
     }
 
     /* Check if the next repeated job is sooner than the usual timeout */