Browse Source

add delayed callbacks also for single-threaded execution

Julius Pfrommer 8 years ago
parent
commit
fe7e2f1508
3 changed files with 41 additions and 2 deletions
  1. 2 0
      src/server/ua_server.c
  2. 3 1
      src/server/ua_server_internal.h
  3. 36 1
      src/server/ua_server_worker.c

+ 2 - 0
src/server/ua_server.c

@@ -451,6 +451,8 @@ UA_Server * UA_Server_new(const UA_ServerConfig config) {
     rcu_init();
     cds_wfcq_init(&server->dispatchQueue_head, &server->dispatchQueue_tail);
     cds_lfs_init(&server->mainLoopJobs);
+#else
+    SLIST_INIT(&server->delayedCallbacks);
 #endif
 
     /* uncomment for non-reproducible server runs */

+ 3 - 1
src/server/ua_server_internal.h

@@ -62,7 +62,9 @@ struct UA_Server {
     /* Jobs with a repetition interval */
     LIST_HEAD(RepeatedJobsList, RepeatedJob) repeatedJobs;
 
-#ifdef UA_ENABLE_MULTITHREADING
+#ifndef UA_ENABLE_MULTITHREADING
+    SLIST_HEAD(DelayedJobsList, UA_DelayedJob) delayedCallbacks;
+#else
     /* Dispatch queue head for the worker threads (the tail should not be in the same cache line) */
     struct cds_wfcq_head dispatchQueue_head;
     UA_Worker *workers; /* there are nThread workers in a running server */

+ 36 - 1
src/server/ua_server_worker.c

@@ -307,7 +307,36 @@ void UA_Server_deleteAllRepeatedJobs(UA_Server *server) {
 /* Delayed Jobs */
 /****************/
 
-#ifdef UA_ENABLE_MULTITHREADING
+#ifndef UA_ENABLE_MULTITHREADING
+
+typedef struct UA_DelayedJob {
+    SLIST_ENTRY(UA_DelayedJob) next;
+    UA_Job job;
+} UA_DelayedJob;
+
+UA_StatusCode
+UA_Server_delayedCallback(UA_Server *server, UA_ServerCallback callback, void *data) {
+    UA_DelayedJob *dj = UA_malloc(sizeof(UA_DelayedJob));
+    if(!dj)
+        return UA_STATUSCODE_BADOUTOFMEMORY;
+    dj->job.type = UA_JOBTYPE_METHODCALL;
+    dj->job.job.methodCall.data = data;
+    dj->job.job.methodCall.method = callback;
+    SLIST_INSERT_HEAD(&server->delayedCallbacks, dj, next);
+    return UA_STATUSCODE_GOOD;
+}
+
+static void
+processDelayedCallbacks(UA_Server *server) {
+    UA_DelayedJob *dj, *dj_tmp;
+    SLIST_FOREACH_SAFE(dj, &server->delayedCallbacks, next, dj_tmp) {
+        SLIST_REMOVE(&server->delayedCallbacks, dj, UA_DelayedJob, next);
+        processJob(server, &dj->job);
+        UA_free(dj);
+    }
+}
+
+#else
 
 #define DELAYEDJOBSSIZE 100 // Collect delayed jobs until we have DELAYEDWORKSIZE items
 
@@ -574,6 +603,10 @@ UA_UInt16 UA_Server_run_iterate(UA_Server *server, UA_Boolean waitInternal) {
         }
     }
 
+#ifndef UA_ENABLE_MULTITHREADING
+    processDelayedCallbacks(server);
+#endif
+
     now = UA_DateTime_nowMonotonic();
     timeout = 0;
     if(nextRepeated > now)
@@ -607,6 +640,8 @@ UA_StatusCode UA_Server_run_shutdown(UA_Server *server) {
     emptyDispatchQueue(server);
     UA_ASSERT_RCU_UNLOCKED();
     rcu_barrier(); // wait for all scheduled call_rcu work to complete
+#else
+    processDelayedCallbacks(server);
 #endif
     return UA_STATUSCODE_GOOD;
 }