Explorar o código

refactor timer code to not rely on server internals

Julius Pfrommer %!s(int64=8) %!d(string=hai) anos
pai
achega
00fd02f108

+ 2 - 1
CMakeLists.txt

@@ -238,6 +238,7 @@ set(internal_headers ${PROJECT_SOURCE_DIR}/deps/queue.h
                      ${PROJECT_SOURCE_DIR}/src/ua_securechannel.h
                      ${PROJECT_SOURCE_DIR}/src/server/ua_nodes.h
                      ${PROJECT_SOURCE_DIR}/src/ua_session.h
+                     ${PROJECT_SOURCE_DIR}/src/ua_timer.h
                      ${PROJECT_SOURCE_DIR}/src/server/ua_subscription.h
                      ${PROJECT_SOURCE_DIR}/src/server/ua_nodestore.h
                      ${PROJECT_SOURCE_DIR}/src/server/ua_session_manager.h
@@ -253,11 +254,11 @@ set(lib_sources ${PROJECT_SOURCE_DIR}/src/ua_types.c
                 ${PROJECT_SOURCE_DIR}/src/ua_connection.c
                 ${PROJECT_SOURCE_DIR}/src/ua_securechannel.c
                 ${PROJECT_SOURCE_DIR}/src/ua_session.c
+                ${PROJECT_SOURCE_DIR}/src/ua_timer.c
                 ${PROJECT_SOURCE_DIR}/src/server/ua_server.c
                 ${PROJECT_SOURCE_DIR}/src/server/ua_server_binary.c
                 ${PROJECT_SOURCE_DIR}/src/server/ua_server_utils.c
                 ${PROJECT_SOURCE_DIR}/src/server/ua_server_worker.c
-                ${PROJECT_SOURCE_DIR}/src/server/ua_server_timer.c
                 ${PROJECT_SOURCE_DIR}/src/server/ua_securechannel_manager.c
                 ${PROJECT_SOURCE_DIR}/src/server/ua_session_manager.c
                 ${PROJECT_SOURCE_DIR}/src/server/ua_nodes.c

+ 27 - 2
src/server/ua_server.c

@@ -244,7 +244,7 @@ static void deleteInstanceChildren(UA_Server *server, UA_NodeId *objectNodeId) {
 /* The server needs to be stopped before it can be deleted */
 void UA_Server_delete(UA_Server *server) {
     // Delete the timed work
-    UA_Server_deleteAllRepeatedJobs(server);
+    UA_RepeatedJobsList_deleteMembers(&server->repeatedJobs);
 
     // Delete all internal data
     UA_SecureChannelManager_deleteMembers(&server->secureChannelManager);
@@ -554,7 +554,17 @@ UA_Server * UA_Server_new(const UA_ServerConfig config) {
 
     server->config = config;
     server->nodestore = UA_NodeStore_new();
-    LIST_INIT(&server->repeatedJobs);
+
+    /* Initialize the handling of repeated jobs */
+#ifdef UA_ENABLE_MULTITHREADING
+    UA_RepeatedJobsList_init(&server->repeatedJobs,
+                             (UA_RepeatedJobsListProcessCallback)UA_Server_dispatchJob,
+                             server);
+#else
+    UA_RepeatedJobsList_init(&server->repeatedJobs,
+                             (UA_RepeatedJobsListProcessCallback)UA_Server_processJob,
+                             server);
+#endif
 
 #ifdef UA_ENABLE_MULTITHREADING
     rcu_init();
@@ -1578,3 +1588,18 @@ UA_Server_unregister_discovery(UA_Server *server, const char* discoveryServerUrl
 }
 
 #endif
+
+/*****************/
+/* Repeated Jobs */
+/*****************/
+
+UA_StatusCode
+UA_Server_addRepeatedJob(UA_Server *server, UA_Job job,
+                         UA_UInt32 interval, UA_Guid *jobId) {
+    return UA_RepeatedJobsList_addRepeatedJob(&server->repeatedJobs, job, interval, jobId);
+}
+
+UA_StatusCode
+UA_Server_removeRepeatedJob(UA_Server *server, UA_Guid jobId) {
+    return UA_RepeatedJobsList_removeRepeatedJob(&server->repeatedJobs, jobId);
+}

+ 2 - 6
src/server/ua_server_internal.h

@@ -11,6 +11,7 @@ extern "C" {
 
 #include "ua_util.h"
 #include "ua_server.h"
+#include "ua_timer.h"
 #include "ua_server_external_ns.h"
 #include "ua_connection_internal.h"
 #include "ua_session_manager.h"
@@ -91,10 +92,6 @@ UA_Server_dispatchJob(UA_Server *server, const UA_Job *job);
 void
 UA_Server_processJob(UA_Server *server, UA_Job *job);
 
-UA_DateTime
-UA_Server_processRepeatedJobs(UA_Server *server, UA_DateTime current,
-                              UA_Boolean *dispatched);
-
 #if defined(UA_ENABLE_METHODCALLS) && defined(UA_ENABLE_SUBSCRIPTIONS)
 /* Internally used context to a session 'context' of the current mehtod call */
 extern UA_THREAD_LOCAL UA_Session* methodCallSession;
@@ -180,7 +177,7 @@ struct UA_Server {
 #endif
 
     /* Jobs with a repetition interval */
-    LIST_HEAD(RepeatedJobsList, RepeatedJob) repeatedJobs;
+    UA_RepeatedJobsList repeatedJobs;
 
 #ifndef UA_ENABLE_MULTITHREADING
     SLIST_HEAD(DelayedJobsList, UA_DelayedJob) delayedCallbacks;
@@ -220,7 +217,6 @@ void UA_Server_processBinaryMessage(UA_Server *server, UA_Connection *connection
 
 UA_StatusCode UA_Server_delayedCallback(UA_Server *server, UA_ServerCallback callback, void *data);
 UA_StatusCode UA_Server_delayedFree(UA_Server *server, void *data);
-void UA_Server_deleteAllRepeatedJobs(UA_Server *server);
 
 /* Add an existing node. The node is assumed to be "finished", i.e. no
  * instantiation from inheritance is necessary. Instantiationcallback and

+ 0 - 217
src/server/ua_server_timer.c

@@ -1,217 +0,0 @@
-/* This Source Code Form is subject to the terms of the Mozilla Public
- * License, v. 2.0. If a copy of the MPL was not distributed with this
- * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
-
-#include "ua_util.h"
-#include "ua_server_internal.h"
-
-#define MAXTIMEOUT 50 // max timeout in millisec until the next main loop iteration
-
-/* The linked list of jobs is sorted according to the next execution timestamp */
-struct RepeatedJob {
-    LIST_ENTRY(RepeatedJob) next;  /* Next element in the list */
-    UA_DateTime nextTime;          /* The next time when the jobs are to be executed */
-    UA_UInt64 interval;            /* Interval in 100ns resolution */
-    UA_Guid id;                    /* Id of the repeated job */
-    UA_Job job;                    /* The job description itself */
-};
-
-/* internal. call only from the main loop. */
-static void
-addRepeatedJob(UA_Server *server, struct RepeatedJob * UA_RESTRICT rj) {
-    /* Search for the best position on the repeatedJobs sorted list. The goal is
-     * to have many repeated jobs with the same repetition interval in a
-     * "block". This helps to reduce the (linear) search to find the next entry
-     * in the repeatedJobs list when dispatching the repeated jobs.
-     * For this, we search between "nexttime_max - 1s" and "nexttime_max" for
-     * entries with the same repetition interval and adjust the "nexttime".
-     * Otherwise, add entry after the first element before "nexttime_max". */
-    UA_DateTime nextTime_max = UA_DateTime_nowMonotonic() + (UA_Int64) rj->interval;
-
-    struct RepeatedJob *afterRj = NULL;
-    struct RepeatedJob *tmpRj;
-    LIST_FOREACH(tmpRj, &server->repeatedJobs, next) {
-        if(tmpRj->nextTime >= nextTime_max)
-            break;
-        if(tmpRj->interval == rj->interval &&
-           tmpRj->nextTime > (nextTime_max - UA_SEC_TO_DATETIME))
-            nextTime_max = tmpRj->nextTime; /* break in the next iteration */
-        afterRj = tmpRj;
-    }
-
-    /* add the repeated job */
-    rj->nextTime = nextTime_max;
-    if(afterRj)
-        LIST_INSERT_AFTER(afterRj, rj, next);
-    else
-        LIST_INSERT_HEAD(&server->repeatedJobs, rj, next);
-}
-
-UA_StatusCode
-UA_Server_addRepeatedJob(UA_Server *server, UA_Job job,
-                         UA_UInt32 interval, UA_Guid *jobId) {
-    /* the interval needs to be at least 5ms */
-    if(interval < 5)
-        return UA_STATUSCODE_BADINTERNALERROR;
-    UA_UInt64 interval_dt =
-        (UA_UInt64)interval * (UA_UInt64)UA_MSEC_TO_DATETIME; // from ms to 100ns resolution
-
-    /* Create and fill the repeated job structure */
-    struct RepeatedJob *rj = (struct RepeatedJob *)UA_malloc(sizeof(struct RepeatedJob));
-    if(!rj)
-        return UA_STATUSCODE_BADOUTOFMEMORY;
-    /* done inside addRepeatedJob:
-     * rj->nextTime = UA_DateTime_nowMonotonic() + interval_dt; */
-    rj->interval = interval_dt;
-    rj->id = UA_Guid_random();
-    rj->job = job;
-
-#ifdef UA_ENABLE_MULTITHREADING
-    /* Call addRepeatedJob from the main loop */
-    struct MainLoopJob *mlw = UA_malloc(sizeof(struct MainLoopJob));
-    if(!mlw) {
-        UA_free(rj);
-        return UA_STATUSCODE_BADOUTOFMEMORY;
-    }
-    mlw->job = (UA_Job) {
-        .type = UA_JOBTYPE_METHODCALL,
-        .job.methodCall = {.data = rj, .method = (void (*)(UA_Server*, void*))addRepeatedJob}};
-    cds_lfs_push(&server->mainLoopJobs, &mlw->node);
-#else
-    /* Add directly */
-    addRepeatedJob(server, rj);
-#endif
-    if(jobId)
-        *jobId = rj->id;
-    return UA_STATUSCODE_GOOD;
-}
-
-/* - Dispatches all repeated jobs that have timed out
- * - Reinserts dispatched job at their new position in the sorted list
- * - Returns the next datetime when a repeated job is scheduled */
-UA_DateTime
-UA_Server_processRepeatedJobs(UA_Server *server, UA_DateTime current, UA_Boolean *dispatched) {
-    /* Find the last job that is executed in this iteration */
-    struct RepeatedJob *lastNow = NULL, *tmp;
-    LIST_FOREACH(tmp, &server->repeatedJobs, next) {
-        if(tmp->nextTime > current)
-            break;
-        lastNow = tmp;
-    }
-
-    /* Keep pointer to the previously dispatched job to avoid linear search for
-     * "batched" jobs with the same nexttime and interval */
-    struct RepeatedJob tmp_last;
-    tmp_last.nextTime = current-1; /* never matches. just to avoid if(last_added && ...) */
-    struct RepeatedJob *last_dispatched = &tmp_last;
-
-    /* Iterate over the list of elements (sorted according to the nextTime timestamp) */
-    struct RepeatedJob *rj, *tmp_rj;
-    LIST_FOREACH_SAFE(rj, &server->repeatedJobs, next, tmp_rj) {
-        if(rj->nextTime > current)
-            break;
-
-        /* Dispatch/process job */
-#ifdef UA_ENABLE_MULTITHREADING
-        UA_Server_dispatchJob(server, &rj->job);
-        *dispatched = true;
-#else
-        struct RepeatedJob **previousNext = rj->next.le_prev;
-        UA_Server_processJob(server, &rj->job);
-        /* See if the current job was deleted during processJob. That means the
-         * le_next field of the previous repeated job (could also be the list
-         * head) does no longer point to the current repeated job */
-        if((void*)*previousNext != (void*)rj) {
-            UA_LOG_DEBUG(server->config.logger, UA_LOGCATEGORY_SERVER,
-                         "The current repeated job removed itself");
-            continue;
-        }
-#endif
-
-        /* Set the time for the next execution */
-        rj->nextTime += (UA_Int64)rj->interval;
-
-        /* Prevent an infinite loop when the repeated jobs took more time than
-         * rj->interval */
-        if(rj->nextTime < current)
-            rj->nextTime = current + 1;
-
-        /* Find new position for rj to keep the list sorted */
-        struct RepeatedJob *prev_rj;
-        if(last_dispatched->nextTime == rj->nextTime) {
-            /* We "batch" repeatedJobs with the same interval in
-             * addRepeatedJobs. So this might occur quite often. */
-            UA_assert(last_dispatched != &tmp_last);
-            prev_rj = last_dispatched;
-        } else {
-            /* Find the position by a linear search starting at the first
-             * possible job */
-            UA_assert(lastNow); /* Not NULL. Otherwise, we never reach this point. */
-            prev_rj = lastNow;
-            while(true) {
-                struct RepeatedJob *n = LIST_NEXT(prev_rj, next);
-                if(!n || n->nextTime >= rj->nextTime)
-                    break;
-                prev_rj = n;
-            }
-        }
-
-        /* Add entry */
-        if(prev_rj != rj) {
-            LIST_REMOVE(rj, next);
-            LIST_INSERT_AFTER(prev_rj, rj, next);
-        }
-
-        /* Update last_dispatched and loop */
-        last_dispatched = rj;
-    }
-
-    /* Check if the next repeated job is sooner than the usual timeout */
-    struct RepeatedJob *first = LIST_FIRST(&server->repeatedJobs);
-    UA_DateTime next = current + (MAXTIMEOUT * UA_MSEC_TO_DATETIME);
-    if(first && first->nextTime < next)
-        next = first->nextTime;
-    return next;
-}
-
-/* Call this function only from the main loop! */
-static void
-removeRepeatedJob(UA_Server *server, UA_Guid *jobId) {
-    struct RepeatedJob *rj;
-    LIST_FOREACH(rj, &server->repeatedJobs, next) {
-        if(!UA_Guid_equal(jobId, &rj->id))
-            continue;
-        LIST_REMOVE(rj, next);
-        UA_free(rj);
-        break;
-    }
-#ifdef UA_ENABLE_MULTITHREADING
-    UA_free(jobId);
-#endif
-}
-
-UA_StatusCode UA_Server_removeRepeatedJob(UA_Server *server, UA_Guid jobId) {
-#ifdef UA_ENABLE_MULTITHREADING
-    UA_Guid *idptr = UA_malloc(sizeof(UA_Guid));
-    if(!idptr)
-        return UA_STATUSCODE_BADOUTOFMEMORY;
-    *idptr = jobId;
-    // dispatch to the mainloopjobs stack
-    struct MainLoopJob *mlw = UA_malloc(sizeof(struct MainLoopJob));
-    mlw->job = (UA_Job) {
-        .type = UA_JOBTYPE_METHODCALL,
-        .job.methodCall = {.data = idptr, .method = (void (*)(UA_Server*, void*))removeRepeatedJob}};
-    cds_lfs_push(&server->mainLoopJobs, &mlw->node);
-#else
-    removeRepeatedJob(server, &jobId);
-#endif
-    return UA_STATUSCODE_GOOD;
-}
-
-void UA_Server_deleteAllRepeatedJobs(UA_Server *server) {
-    struct RepeatedJob *current, *temp;
-    LIST_FOREACH_SAFE(current, &server->repeatedJobs, next, temp) {
-        LIST_REMOVE(current, next);
-        UA_free(current);
-    }
-}

+ 3 - 2
src/server/ua_server_worker.c

@@ -367,7 +367,7 @@ UA_StatusCode UA_Server_run_startup(UA_Server *server) {
     /* Try to execute delayed callbacks every 10 sec */
     UA_Job processDelayed = {.type = UA_JOBTYPE_METHODCALL,
                              .job.methodCall = {.method = dispatchDelayedJobs, .data = NULL} };
-    UA_Server_addRepeatedJob(server, processDelayed, 10000, NULL);
+    UA_RepeatedJobsList_addRepeatedJob(&server->repeatedJobs, processDelayed, 10000, NULL);
 #endif
 
     /* Start the networklayers */
@@ -442,7 +442,8 @@ UA_UInt16 UA_Server_run_iterate(UA_Server *server, UA_Boolean waitInternal) {
     /* Process repeated work */
     UA_DateTime now = UA_DateTime_nowMonotonic();
     UA_Boolean dispatched = false; /* to wake up worker threads */
-    UA_DateTime nextRepeated = UA_Server_processRepeatedJobs(server, now, &dispatched);
+    UA_DateTime nextRepeated =
+        UA_RepeatedJobsList_process(&server->repeatedJobs, now, &dispatched);
 
     UA_UInt16 timeout = 0;
     if(waitInternal)

+ 252 - 0
src/ua_timer.c

@@ -0,0 +1,252 @@
+/* This Source Code Form is subject to the terms of the Mozilla Public
+ * License, v. 2.0. If a copy of the MPL was not distributed with this
+ * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
+
+#include "ua_util.h"
+#include "ua_timer.h"
+
+#define MAXTIMEOUT 50 // max timeout in millisec until the next main loop iteration
+
+/* Only one thread may traverse the lists. This is usually the "main" thread
+ * with the event loop. All other threads may add and remove repeated jobs by
+ * adding entries to the beginning of the addRemoveJobs list (with atomic
+ * operations).
+ *
+ * Adding repeated jobs: Add an entry with the "nextTime" timestamp in the
+ * future. This will be picked up in the next traversal and inserted at the
+ * correct place. So that the next execution takes place ät "nextTime".
+ *
+ * Removing a repeated job: Add an entry with the "nextTime" timestamp set to
+ * UA_INT64_MAX. The next iteration picks this up and removes the repated job
+ * from the linked list. */
+
+struct UA_RepeatedJob;
+typedef struct UA_RepeatedJob UA_RepeatedJob;
+
+struct UA_RepeatedJob {
+    SLIST_ENTRY(UA_RepeatedJob) next; /* Next element in the list */
+    UA_DateTime nextTime;             /* The next time when the jobs are to be executed */
+    UA_UInt64 interval;               /* Interval in 100ns resolution */
+    UA_Guid id;                       /* Id of the repeated job */
+    UA_Job job;                       /* The job description itself */
+};
+
+UA_StatusCode
+UA_RepeatedJobsList_addRepeatedJob(UA_RepeatedJobsList *rjl, const UA_Job job,
+                                   const UA_UInt32 interval, UA_Guid *jobId) {
+    /* The interval needs to be at least 5ms */
+    if(interval < 5)
+        return UA_STATUSCODE_BADINTERNALERROR;
+
+    /* From ms to 100ns resolution */
+    UA_UInt64 interval_dt = (UA_UInt64)interval * (UA_UInt64)UA_MSEC_TO_DATETIME;
+
+    /* Allocate the repeated job structure */
+    UA_RepeatedJob *rj = (UA_RepeatedJob*)UA_malloc(sizeof(UA_RepeatedJob));
+    if(!rj)
+        return UA_STATUSCODE_BADOUTOFMEMORY;
+
+    /* Set the repeated job */
+    rj->interval = interval_dt;
+    rj->id = UA_Guid_random();
+    rj->job = job;
+    rj->nextTime = UA_DateTime_nowMonotonic() + (UA_DateTime)interval_dt;
+
+    /* Set the output guid */
+    if(jobId)
+        *jobId = rj->id;
+
+    /* Insert the element to the linked list */
+    UA_RepeatedJob *currentFirst;
+    do {
+        currentFirst = SLIST_FIRST(&rjl->addRemoveJobs);
+        SLIST_NEXT(rj, next) = currentFirst;
+    } while(UA_atomic_cmpxchg((void**)&SLIST_FIRST(&rjl->addRemoveJobs), currentFirst, rj) != currentFirst);
+
+    return UA_STATUSCODE_GOOD;
+}
+
+UA_StatusCode
+UA_RepeatedJobsList_removeRepeatedJob(UA_RepeatedJobsList *rjl, const UA_Guid jobId) {
+    /* Allocate the repeated job structure */
+    UA_RepeatedJob *rj = (UA_RepeatedJob*)UA_malloc(sizeof(UA_RepeatedJob));
+    if(!rj)
+        return UA_STATUSCODE_BADOUTOFMEMORY;
+
+    /* Set the repeated job with the sentinel nextTime */
+    rj->id = jobId;
+    rj->nextTime = UA_INT64_MAX;
+
+    /* Insert the element to the linked list */
+    UA_RepeatedJob *currentFirst;
+    do {
+        currentFirst = SLIST_FIRST(&rjl->addRemoveJobs);
+        SLIST_NEXT(rj, next) = currentFirst;
+    } while(UA_atomic_cmpxchg((void**)&SLIST_FIRST(&rjl->addRemoveJobs), currentFirst, rj) != currentFirst);
+
+    return UA_STATUSCODE_GOOD;
+}
+
+static void
+insertRepeatedJob(UA_RepeatedJobsList *rjl,
+                  UA_RepeatedJob * UA_RESTRICT rj,
+                  UA_DateTime nowMonotonic) {
+    /* The latest time for the first execution */
+    rj->nextTime = nowMonotonic + (UA_Int64)rj->interval;
+
+    /* Find the last entry before this job */
+    UA_RepeatedJob *tmpRj, *afterRj = NULL;
+    SLIST_FOREACH(tmpRj, &rjl->repeatedJobs, next) {
+        if(tmpRj->nextTime >= rj->nextTime)
+            break;
+        afterRj = tmpRj;
+
+        /* The goal is to have many repeated jobs with the same repetition
+         * interval in a "block" in order to reduce linear search for re-entry
+         * to the sorted list after processing. Allow the first execution to lie
+         * between "nextTime - 1s" and "nextTime" if this adjustment groups jobs
+         * with the same repetition interval. */
+        if(tmpRj->interval == rj->interval &&
+           tmpRj->nextTime > (rj->nextTime - UA_SEC_TO_DATETIME))
+            rj->nextTime = tmpRj->nextTime;
+    }
+
+    /* Add the repeated job */
+    if(afterRj)
+        SLIST_INSERT_AFTER(afterRj, rj, next);
+    else
+        SLIST_INSERT_HEAD(&rjl->repeatedJobs, rj, next);
+}
+
+static void
+removeRepeatedJob(UA_RepeatedJobsList *rjl, const UA_Guid *jobId) {
+    UA_RepeatedJob *rj, *prev = NULL;
+    SLIST_FOREACH(rj, &rjl->repeatedJobs, next) {
+        if(UA_Guid_equal(jobId, &rj->id)) {
+            if(prev)
+                SLIST_REMOVE_AFTER(prev, next);
+            else
+                SLIST_REMOVE_HEAD(&rjl->repeatedJobs, next);
+            UA_free(rj);
+            break;
+        }
+        prev = rj;
+    }
+}
+
+static UA_DateTime
+nextRepetition(const UA_RepeatedJobsList *rjl, UA_DateTime nowMonotonic) {
+    /* Check if the next repeated job is sooner than the usual timeout.
+     * Return the duration until the next job or the max interval. */
+    UA_RepeatedJob *first = SLIST_FIRST(&rjl->repeatedJobs);
+    UA_DateTime next = nowMonotonic + (MAXTIMEOUT * UA_MSEC_TO_DATETIME);
+    if(first && first->nextTime < next)
+        next = first->nextTime;
+    return next;
+}
+
+static void
+processAddRemoveJobs(UA_RepeatedJobsList *rjl, UA_DateTime nowMonotonic) {
+    UA_RepeatedJob *current;
+    while((current = SLIST_FIRST(&rjl->addRemoveJobs))) {
+        SLIST_REMOVE_HEAD(&rjl->addRemoveJobs, next);
+        if(current->nextTime < UA_INT64_MAX) {
+            insertRepeatedJob(rjl, current, nowMonotonic);
+        } else {
+            removeRepeatedJob(rjl, &current->id);
+            UA_free(current);
+        }
+    }
+}
+
+UA_DateTime
+UA_RepeatedJobsList_process(UA_RepeatedJobsList *rjl,
+                            UA_DateTime nowMonotonic,
+                            UA_Boolean *dispatched) {
+    /* Insert and remove jobs */
+    processAddRemoveJobs(rjl, nowMonotonic);
+
+    /* Find the last job to be executed now */
+    UA_RepeatedJob *firstAfter, *lastNow = NULL;
+    SLIST_FOREACH(firstAfter, &rjl->repeatedJobs, next) {
+        if(firstAfter->nextTime > nowMonotonic)
+            break;
+        lastNow = firstAfter;
+    }
+
+    /* Nothing to do */
+    if(!lastNow)
+        return nextRepetition(rjl, nowMonotonic);
+
+    /* Put the jobs that are executed now in a separate list */
+    struct memberstruct(UA_RepeatedJobsList,RepeatedJobsSList) executedNowList;
+    executedNowList.slh_first = SLIST_FIRST(&rjl->repeatedJobs);
+    lastNow->next.sle_next = NULL;
+
+    /* Fake entry to represent the first element in the newly-sorted list */
+    UA_RepeatedJob tmp_first;
+    tmp_first.nextTime = nowMonotonic - 1; /* never matches for last_dispatched */
+    tmp_first.next.sle_next = firstAfter;
+    UA_RepeatedJob *last_dispatched = &tmp_first;
+
+    /* Iterate over the list of jobs to process now */
+    UA_RepeatedJob *rj;
+    while((rj = SLIST_FIRST(&executedNowList))) {
+        /* Remove from the list */
+        SLIST_REMOVE_HEAD(&executedNowList, next);
+
+        /* Dispatch/process job */
+        rjl->processCallback(rjl->processContext, &rj->job);
+        *dispatched = true;
+
+        /* Set the time for the next execution. Prevent an infinite loop by
+         * forcing the next processing into the next iteration. */
+        rj->nextTime += (UA_Int64)rj->interval;
+        if(rj->nextTime < nowMonotonic)
+            rj->nextTime = nowMonotonic + 1;
+
+        /* Find the new position for rj to keep the list sorted */
+        UA_RepeatedJob *prev_rj;
+        if(last_dispatched->nextTime == rj->nextTime) {
+            /* We "batch" repeatedJobs with the same interval in
+             * addRepeatedJobs. So this might occur quite often. */
+            UA_assert(last_dispatched != &tmp_first);
+            prev_rj = last_dispatched;
+        } else {
+            /* Find the position for the next execution by a linear search
+             * starting at the first possible job */
+            prev_rj = &tmp_first;
+            while(true) {
+                UA_RepeatedJob *n = SLIST_NEXT(prev_rj, next);
+                if(!n || n->nextTime >= rj->nextTime)
+                    break;
+                prev_rj = n;
+            }
+
+            /* Update last_dispatched */
+            last_dispatched = rj;
+        }
+
+        /* Add entry to the new position in the sorted list */
+        SLIST_INSERT_AFTER(prev_rj, rj, next);
+    }
+
+    /* Set the entry-point for the newly sorted list */
+    rjl->repeatedJobs.slh_first = tmp_first.next.sle_next;
+
+    /* Re-repeat processAddRemoved since one of the jobs might have removed or
+     * added a job. So we get the returned timeout right. */
+    processAddRemoveJobs(rjl, nowMonotonic);
+
+    /* Return timestamp of next repetition */
+    return nextRepetition(rjl, nowMonotonic);
+}
+
+void
+UA_RepeatedJobsList_deleteMembers(UA_RepeatedJobsList *rjl) {
+    UA_RepeatedJob *current;
+    while((current = SLIST_FIRST(&rjl->repeatedJobs))) {
+        SLIST_REMOVE_HEAD(&rjl->repeatedJobs, next);
+        UA_free(current);
+    }
+}

+ 62 - 0
src/ua_timer.h

@@ -0,0 +1,62 @@
+/* This Source Code Form is subject to the terms of the Mozilla Public
+ * License, v. 2.0. If a copy of the MPL was not distributed with this
+ * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
+
+#ifndef UA_TIMER_H_
+#define UA_TIMER_H_
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+#include "ua_util.h"
+#include "ua_job.h"
+
+typedef void
+(*UA_RepeatedJobsListProcessCallback)(void *processContext, UA_Job *job);
+
+typedef struct {
+    /* The linked list of jobs is sorted according to the execution timestamp. */
+    SLIST_HEAD(RepeatedJobsSList, UA_RepeatedJob) repeatedJobs;
+
+    /* Repeated jobs that shall be added or removed from the sorted list (with
+     * atomic operations) */
+    SLIST_HEAD(RepeatedJobsSList2, UA_RepeatedJob) addRemoveJobs;
+
+    /* The callback to process jobs that have timed out */
+    UA_RepeatedJobsListProcessCallback processCallback;
+    void *processContext;
+} UA_RepeatedJobsList;
+
+static UA_INLINE void
+UA_RepeatedJobsList_init(UA_RepeatedJobsList *rjl,
+                         UA_RepeatedJobsListProcessCallback processCallback,
+                         void *processContext) {
+    SLIST_INIT(&rjl->repeatedJobs);
+    SLIST_INIT(&rjl->addRemoveJobs);
+    rjl->processCallback = processCallback;
+    rjl->processContext = processContext;
+}
+
+UA_StatusCode
+UA_RepeatedJobsList_addRepeatedJob(UA_RepeatedJobsList *rjl, const UA_Job job,
+                                   const UA_UInt32 interval, UA_Guid *jobId);
+
+UA_StatusCode
+UA_RepeatedJobsList_removeRepeatedJob(UA_RepeatedJobsList *rjl, const UA_Guid jobId);
+
+/* - Processed all repeated jobs that have timed out
+ * - Reinserts dispatched job at their new position in the sorted list
+ * - Returns the next datetime when a repeated job is scheduled */
+UA_DateTime
+UA_RepeatedJobsList_process(UA_RepeatedJobsList *rjl, UA_DateTime nowMonotonic,
+                            UA_Boolean *dispatched);
+
+void
+UA_RepeatedJobsList_deleteMembers(UA_RepeatedJobsList *rjl);
+
+#ifdef __cplusplus
+} // extern "C"
+#endif
+
+#endif /* UA_TIMER_H_ */

+ 1 - 5
tests/check_services_subscriptions.c

@@ -79,20 +79,16 @@ START_TEST(Server_publishCallback) {
     ck_assert(publishingInterval > 0.0f);
     UA_CreateSubscriptionResponse_deleteMembers(&response);
 
-    /* Sleep until the publishing interval times out */
-    usleep((useconds_t)(publishingInterval * 1000) + 1000);
-
     /* Keepalive is set to max initially */
     UA_Subscription *sub;
     LIST_FOREACH(sub, &adminSession.serverSubscriptions, listEntry)
         ck_assert_uint_eq(sub->currentKeepAliveCount, sub->maxKeepAliveCount);
 
+    /* Sleep until the publishing interval times out */
     UA_Server_run_iterate(server, false);
-#ifdef UA_ENABLE_MULTITHREADING
     usleep((useconds_t)(publishingInterval * 1000) + 1000);
     UA_Server_run_iterate(server, false);
     usleep((useconds_t)(publishingInterval * 1000) + 1000);
-#endif
 
     LIST_FOREACH(sub, &adminSession.serverSubscriptions, listEntry)
         ck_assert_uint_eq(sub->currentKeepAliveCount, sub->maxKeepAliveCount+1);