Browse Source

Move WorkQueue to a separate struct for client and server

Julius Pfrommer 5 years ago
parent
commit
f492dbbfa0

+ 2 - 1
CMakeLists.txt

@@ -472,6 +472,7 @@ set(internal_headers ${PROJECT_SOURCE_DIR}/deps/queue.h
                      ${PROJECT_BINARY_DIR}/src_generated/ua_transport_generated_encoding_binary.h
                      ${PROJECT_SOURCE_DIR}/src/ua_connection_internal.h
                      ${PROJECT_SOURCE_DIR}/src/ua_securechannel.h
+                     ${PROJECT_SOURCE_DIR}/src/ua_workqueue.h
                      ${PROJECT_SOURCE_DIR}/src/ua_timer.h
                      ${PROJECT_SOURCE_DIR}/src/server/ua_session.h
                      ${PROJECT_SOURCE_DIR}/src/server/ua_subscription.h
@@ -493,6 +494,7 @@ set(lib_sources ${PROJECT_SOURCE_DIR}/src/ua_types.c
                 ${PROJECT_BINARY_DIR}/src_generated/ua_transport_generated.c
                 ${PROJECT_BINARY_DIR}/src_generated/ua_statuscodes.c
                 ${PROJECT_SOURCE_DIR}/src/ua_util.c
+                ${PROJECT_SOURCE_DIR}/src/ua_workqueue.c
                 ${PROJECT_SOURCE_DIR}/src/ua_timer.c
                 ${PROJECT_SOURCE_DIR}/src/ua_connection.c
                 ${PROJECT_SOURCE_DIR}/src/ua_securechannel.c
@@ -503,7 +505,6 @@ set(lib_sources ${PROJECT_SOURCE_DIR}/src/ua_types.c
                 ${PROJECT_BINARY_DIR}/src_generated/ua_namespace0.c
                 ${PROJECT_SOURCE_DIR}/src/server/ua_server_binary.c
                 ${PROJECT_SOURCE_DIR}/src/server/ua_server_utils.c
-                ${PROJECT_SOURCE_DIR}/src/server/ua_server_worker.c
                 ${PROJECT_SOURCE_DIR}/src/server/ua_server_discovery.c
                 ${PROJECT_SOURCE_DIR}/src/server/ua_securechannel_manager.c
                 ${PROJECT_SOURCE_DIR}/src/server/ua_session_manager.c

+ 10 - 15
src/client/ua_client.c

@@ -44,12 +44,8 @@ UA_Client_init(UA_Client* client, UA_ClientConfig config) {
     /* Catch error during async connection */
     client->connectStatus = UA_STATUSCODE_GOOD;
 
-    /* Needed by async client */
     UA_Timer_init(&client->timer);
-
-#ifndef UA_ENABLE_MULTITHREADING
-    SLIST_INIT(&client->delayedClientCallbacks);
-#endif
+    UA_WorkQueue_init(&client->workQueue);
 }
 
 UA_Client *
@@ -122,12 +118,8 @@ UA_Client_secure_init(UA_Client* client, UA_ClientConfig config,
     /* Catch error during async connection */
     client->connectStatus = UA_STATUSCODE_GOOD;
 
-    /* Needed by async client */
     UA_Timer_init(&client->timer);
-
-#ifndef UA_ENABLE_MULTITHREADING
-    SLIST_INIT(&client->delayedClientCallbacks);
-#endif
+    UA_WorkQueue_init(&client->workQueue);
 
     /* Initialize the SecureChannel */
     UA_SecureChannel_init(&client->channel);
@@ -204,6 +196,9 @@ UA_Client_deleteMembers(UA_Client* client) {
 
     /* Delete the timed work */
     UA_Timer_deleteMembers(&client->timer);
+
+    /* Clean up the work queue */
+    UA_WorkQueue_cleanup(&client->workQueue);
 }
 
 void
@@ -640,16 +635,16 @@ UA_Client_sendAsyncRequest(UA_Client *client, const void *request,
 }
 
 UA_StatusCode
-UA_Client_addRepeatedCallback(UA_Client *Client, UA_ClientCallback callback,
+UA_Client_addRepeatedCallback(UA_Client *client, UA_ClientCallback callback,
                               void *data, UA_UInt32 interval,
                               UA_UInt64 *callbackId) {
-    return UA_Timer_addRepeatedCallback(&Client->timer,
-                                        (UA_TimerCallback) callback, data,
+    return UA_Timer_addRepeatedCallback(&client->timer,
+                                        (UA_ApplicationCallback) callback, client, data,
                                         interval, callbackId);
 }
 
 
 UA_StatusCode
-UA_Client_removeRepeatedCallback(UA_Client *Client, UA_UInt64 callbackId) {
-    return UA_Timer_removeRepeatedCallback(&Client->timer, callbackId);
+UA_Client_removeRepeatedCallback(UA_Client *client, UA_UInt64 callbackId) {
+    return UA_Timer_removeRepeatedCallback(&client->timer, callbackId);
 }

+ 5 - 8
src/client/ua_client_internal.h

@@ -14,6 +14,7 @@
 #define UA_CLIENT_INTERNAL_H_
 
 #include "ua_securechannel.h"
+#include "ua_workqueue.h"
 #include "ua_client_highlevel.h"
 #include "ua_client_subscriptions.h"
 #include "ua_timer.h"
@@ -171,8 +172,9 @@ struct UA_Client {
     /*When using highlevel functions these are the callbacks that can be accessed by the user*/
     LIST_HEAD(ListOfCustomCallback, CustomCallback) customCallbacks;
 
-    /* Delayed callbacks */
-    SLIST_HEAD(DelayedClientCallbacksList, UA_DelayedClientCallback) delayedClientCallbacks;
+    /* Work queue */
+    UA_WorkQueue workQueue;
+
     /* Subscriptions */
 #ifdef UA_ENABLE_SUBSCRIPTIONS
     UA_UInt32 monitoredItemHandles;
@@ -220,12 +222,7 @@ receiveServiceResponse(UA_Client *client, void *response,
 UA_StatusCode
 receiveServiceResponseAsync(UA_Client *client, void *response,
                              const UA_DataType *responseType);
-void
-UA_Client_workerCallback(UA_Client *client, UA_ClientCallback callback,
-                         void *data);
-UA_StatusCode
-UA_Client_delayedCallback(UA_Client *client, UA_ClientCallback callback,
-                          void *data);
+
 UA_StatusCode
 UA_Client_connect_iterate (UA_Client *client);
 

+ 12 - 81
src/client/ua_client_worker.c

@@ -6,83 +6,6 @@
 #include "ua_client.h"
 #include "ua_client_internal.h"
 
-/**
- * Worker Threads and Dispatch Queue
- * ---------------------------------
- * The worker threads dequeue callbacks from a central Multi-Producer
- * Multi-Consumer Queue (MPMC). When there are no callbacks, workers go idle.
- * The condition to wake them up is triggered whenever a callback is
- * dispatched.
- *
- * Future Plans: Use work-stealing to load-balance between cores.
- * Le, Nhat Minh, et al. "Correct and efficient work-stealing for weak memory
- * models." ACM SIGPLAN Notices. Vol. 48. No. 8. ACM, 2013. */
-
-/**
- * Repeated Callbacks
- * ------------------
- * Repeated Callbacks are handled by UA_Timer (used in both client and client).
- * In the multi-threaded case, callbacks are dispatched to workers. Otherwise,
- * they are executed immediately. */
-
-void UA_Client_workerCallback(UA_Client *client, UA_ClientCallback callback,
-        void *data) {
-    /* Execute immediately */
-    callback(client, data);
-}
-
-/**
- * Delayed Callbacks
- * -----------------
- *
- * Delayed Callbacks are called only when all callbacks that were dispatched
- * prior are finished. In the single-threaded case, the callback is added to a
- * singly-linked list that is processed at the end of the client's main-loop. In
- * the multi-threaded case, the delay is ensure by a three-step procedure:
- *
- * 1. The delayed callback is dispatched to the worker queue. So it is only
- *    dequeued when all prior callbacks have been dequeued.
- *
- * 2. When the callback is first dequeued by a worker, sample the counter of all
- *    workers. Once all counters have advanced, the callback is ready.
- *
- * 3. Check regularly if the callback is ready by adding it back to the dispatch
- *    queue. */
-
-typedef struct UA_DelayedClientCallback {
-    SLIST_ENTRY(UA_DelayedClientCallback)
-    next;
-    UA_ClientCallback callback;
-    void *data;
-} UA_DelayedClientCallback;
-
-UA_StatusCode UA_Client_delayedCallback(UA_Client *client,
-        UA_ClientCallback callback, void *data) {
-    UA_DelayedClientCallback *dc = (UA_DelayedClientCallback*) UA_malloc(
-            sizeof(UA_DelayedClientCallback));
-    if (!dc)
-        return UA_STATUSCODE_BADOUTOFMEMORY;
-
-    dc->callback = callback;
-    dc->data = data;
-    SLIST_INSERT_HEAD(&client->delayedClientCallbacks, dc, next);
-    return UA_STATUSCODE_GOOD;
-}
-
-void
-processDelayedClientCallbacks(UA_Client *client);
-
-void processDelayedClientCallbacks(UA_Client *client) {
-    UA_DelayedClientCallback *dc, *dc_tmp;
-    SLIST_FOREACH_SAFE(dc, &client->delayedClientCallbacks, next, dc_tmp)
-    {
-        SLIST_REMOVE(&client->delayedClientCallbacks, dc,
-                UA_DelayedClientCallback, next);
-        dc->callback(client, dc->data);
-        UA_free(dc);
-    }
-}
-
 static void
 asyncServiceTimeoutCheck(UA_Client *client) {
     UA_DateTime now = UA_DateTime_nowMonotonic();
@@ -156,6 +79,14 @@ UA_Client_backgroundConnectivity(UA_Client *client) {
  * Stop: Stop workers, finish all callbacks, stop the network layer,
  *       clean up */
 
+static void
+clientExecuteRepeatedCallback(UA_Client *client, UA_ApplicationCallback cb,
+                              void *callbackApplication, void *data) {
+    cb(callbackApplication, data);
+    /* TODO: Use workers in the client
+     * UA_WorkQueue_enqueue(&client->workQueue, cb, callbackApplication, data); */
+}
+
 UA_StatusCode UA_Client_run_iterate(UA_Client *client, UA_UInt16 timeout) {
 // TODO connectivity check & timeout features for the async implementation (timeout == 0)
     UA_StatusCode retval = UA_STATUSCODE_GOOD;
@@ -190,7 +121,7 @@ UA_StatusCode UA_Client_run_iterate(UA_Client *client, UA_UInt16 timeout) {
     else{
         UA_DateTime now = UA_DateTime_nowMonotonic();
         UA_Timer_process(&client->timer, now,
-                         (UA_TimerDispatchCallback) UA_Client_workerCallback, client);
+                         (UA_TimerExecutionCallback)clientExecuteRepeatedCallback, client);
 
         UA_ClientState cs = UA_Client_getState(client);
         retval = UA_Client_connect_iterate(client);
@@ -212,9 +143,9 @@ UA_StatusCode UA_Client_run_iterate(UA_Client *client, UA_UInt16 timeout) {
         asyncServiceTimeoutCheck(client);
 
 #ifndef UA_ENABLE_MULTITHREADING
-/* Process delayed callbacks when all callbacks and
- * network events are done */
-    processDelayedClientCallbacks(client);
+        /* Process delayed callbacks when all callbacks and network events are
+         * done */
+        UA_WorkQueue_manuallyProcessDelayed(&client->workQueue);
 #endif
     return retval;
 }

+ 2 - 2
src/pubsub/ua_pubsub_manager.c

@@ -299,8 +299,8 @@ UA_PubSubManager_delete(UA_Server *server, UA_PubSubManager *pubSubManager) {
 UA_StatusCode
 UA_PubSubManager_addRepeatedCallback(UA_Server *server, UA_ServerCallback callback,
                                      void *data, UA_UInt32 interval, UA_UInt64 *callbackId) {
-    return UA_Timer_addRepeatedCallback(&server->timer, (UA_TimerCallback)callback,
-                                        data, interval, callbackId);
+    return UA_Timer_addRepeatedCallback(&server->timer, (UA_ApplicationCallback)callback,
+                                        server, data, interval, callbackId);
 }
 
 UA_StatusCode

+ 2 - 8
src/server/ua_mdns.c

@@ -116,13 +116,6 @@ mdns_record_add_or_get(UA_Server *server, const char *record, const char *server
 }
 
 #ifdef UA_ENABLE_MULTITHREADING
-static void
-delayedFree(UA_Server *server, void *data) {
-    UA_free(data);
-}
-#endif
-
-
 
 static struct mdnsHostnameToIp_list_entry *
 mdns_hostname_add_or_get(UA_Server *server, const char *hostname, struct in_addr addr, UA_Boolean createNew) {
@@ -416,7 +409,8 @@ mdns_record_remove(UA_Server *server, const char *record,
     UA_free(entry);
 #else
     UA_atomic_subSize(&server->serverOnNetworkSize, 1);
-    UA_Server_delayedCallback(server, delayedFree, entry);
+    entry->delayedCleanup.callback = NULL; /* Only free the structure */
+    UA_WorkQueue_enqueueDelayed(&server->workQueue, &entry->delayedCleanup);
 #endif
 }
 

+ 13 - 17
src/server/ua_securechannel_manager.c

@@ -40,28 +40,22 @@ UA_SecureChannelManager_deleteMembers(UA_SecureChannelManager *cm) {
 }
 
 static void
-removeSecureChannelCallback(UA_Server *server, void *entry) {
-    channel_entry *centry = (channel_entry *)entry;
-    UA_SecureChannel_deleteMembersCleanup(&centry->channel);
-    UA_free(entry);
+removeSecureChannelCallback(void *_, channel_entry *entry) {
+    UA_SecureChannel_deleteMembersCleanup(&entry->channel);
 }
 
-static UA_StatusCode
+static void
 removeSecureChannel(UA_SecureChannelManager *cm, channel_entry *entry) {
-    /* Add a delayed callback to remove the channel when the currently
-     * scheduled jobs have completed */
-    UA_StatusCode retval = UA_Server_delayedCallback(cm->server, removeSecureChannelCallback, entry);
-    if(retval != UA_STATUSCODE_GOOD) {
-        UA_LOG_WARNING(cm->server->config.logger, UA_LOGCATEGORY_SESSION,
-                       "Could not remove the secure channel with error code %s",
-                       UA_StatusCode_name(retval));
-        return retval; /* Try again next time */
-    }
-
     /* Detach the channel and make the capacity available */
     TAILQ_REMOVE(&cm->channels, entry, pointers);
     UA_atomic_subUInt32(&cm->currentChannelCount, 1);
-    return UA_STATUSCODE_GOOD;
+
+    /* Add a delayed callback to remove the channel when the currently
+     * scheduled jobs have completed */
+    entry->cleanupCallback.callback = (UA_ApplicationCallback)removeSecureChannelCallback;
+    entry->cleanupCallback.application = NULL;
+    entry->cleanupCallback.data = entry;
+    UA_WorkQueue_enqueueDelayed(&cm->server->workQueue, &entry->cleanupCallback);
 }
 
 /* remove channels that were not renewed or who have no connection attached */
@@ -266,5 +260,7 @@ UA_SecureChannelManager_close(UA_SecureChannelManager *cm, UA_UInt32 channelId)
     }
     if(!entry)
         return UA_STATUSCODE_BADINTERNALERROR;
-    return removeSecureChannel(cm, entry);
+
+    removeSecureChannel(cm, entry);
+    return UA_STATUSCODE_GOOD;
 }

+ 3 - 1
src/server/ua_securechannel_manager.h

@@ -13,14 +13,16 @@
 
 #include "ua_util_internal.h"
 #include "ua_server.h"
+#include "ua_workqueue.h"
 #include "ua_securechannel.h"
 #include "../../deps/queue.h"
 
 _UA_BEGIN_DECLS
 
 typedef struct channel_entry {
-    UA_SecureChannel channel;
+    UA_DelayedCallback cleanupCallback;
     TAILQ_ENTRY(channel_entry) pointers;
+    UA_SecureChannel channel;
 } channel_entry;
 
 typedef struct {

+ 174 - 22
src/server/ua_server.c

@@ -28,6 +28,10 @@
 #include "ua_subscription.h"
 #endif
 
+#ifdef UA_ENABLE_VALGRIND_INTERACTIVE
+#include <valgrind/memcheck.h>
+#endif
+
 /**********************/
 /* Namespace Handling */
 /**********************/
@@ -210,16 +214,8 @@ void UA_Server_delete(UA_Server *server) {
     /* Clean up the Admin Session */
     UA_Session_deleteMembersCleanup(&server->adminSession, server);
 
-#ifdef UA_ENABLE_MULTITHREADING
-    /* Process new delayed callbacks from the cleanup */
-    UA_Server_cleanupDispatchQueue(server);
-    pthread_mutex_destroy(&server->dispatchQueue_accessMutex);
-    pthread_cond_destroy(&server->dispatchQueue_condition);
-    pthread_mutex_destroy(&server->dispatchQueue_conditionMutex);
-#else
-    /* Process new delayed callbacks from the cleanup */
-    UA_Server_cleanupDelayedCallbacks(server);
-#endif
+    /* Clean up the work queue */
+    UA_WorkQueue_cleanup(&server->workQueue);
 
     /* Delete the timed work */
     UA_Timer_deleteMembers(&server->timer);
@@ -269,15 +265,7 @@ UA_Server_new(const UA_ServerConfig *config) {
     /* Initialize the handling of repeated callbacks */
     UA_Timer_init(&server->timer);
 
-    /* Initialized the linked list for delayed callbacks */
-#ifndef UA_ENABLE_MULTITHREADING
-    SLIST_INIT(&server->delayedCallbacks);
-#endif
-
-    /* Initialized the dispatch queue for worker threads */
-#ifdef UA_ENABLE_MULTITHREADING
-    SIMPLEQ_INIT(&server->dispatchQueue);
-#endif
+    UA_WorkQueue_init(&server->workQueue);
 
     /* Initialize the adminSession */
     UA_Session_init(&server->adminSession);
@@ -358,8 +346,9 @@ UA_StatusCode
 UA_Server_addRepeatedCallback(UA_Server *server, UA_ServerCallback callback,
                               void *data, UA_UInt32 interval,
                               UA_UInt64 *callbackId) {
-    return UA_Timer_addRepeatedCallback(&server->timer, (UA_TimerCallback)callback,
-                                        data, interval, callbackId);
+    return UA_Timer_addRepeatedCallback(&server->timer,
+                                        (UA_ApplicationCallback)callback,
+                                        server, data, interval, callbackId);
 }
 
 UA_StatusCode
@@ -373,7 +362,6 @@ UA_Server_removeRepeatedCallback(UA_Server *server, UA_UInt64 callbackId) {
     return UA_Timer_removeRepeatedCallback(&server->timer, callbackId);
 }
 
-
 UA_StatusCode UA_EXPORT
 UA_Server_updateCertificate(UA_Server *server,
                             const UA_ByteString *oldCertificate,
@@ -423,3 +411,167 @@ UA_Server_updateCertificate(UA_Server *server,
 
     return UA_STATUSCODE_GOOD;
 }
+
+/********************/
+/* Main Server Loop */
+/********************/
+
+#define UA_MAXTIMEOUT 50 /* Max timeout in ms between main-loop iterations */
+
+/* Start: Spin up the workers and the network layer and sample the server's
+ *        start time.
+ * Iterate: Process repeated callbacks and events in the network layer. This
+ *          part can be driven from an external main-loop in an event-driven
+ *          single-threaded architecture.
+ * Stop: Stop workers, finish all callbacks, stop the network layer, clean up */
+
+UA_StatusCode
+UA_Server_run_startup(UA_Server *server) {
+    UA_Variant var;
+    UA_StatusCode result = UA_STATUSCODE_GOOD;
+	
+	/* At least one endpoint has to be configured */
+    if(server->config.endpointsSize == 0) {
+        UA_LOG_WARNING(server->config.logger, UA_LOGCATEGORY_SERVER,
+                       "There has to be at least one endpoint.");
+    }
+
+    /* Sample the start time and set it to the Server object */
+    server->startTime = UA_DateTime_now();
+    UA_Variant_init(&var);
+    UA_Variant_setScalar(&var, &server->startTime, &UA_TYPES[UA_TYPES_DATETIME]);
+    UA_Server_writeValue(server,
+                         UA_NODEID_NUMERIC(0, UA_NS0ID_SERVER_SERVERSTATUS_STARTTIME),
+                         var);
+
+    /* Start the networklayers */
+    for(size_t i = 0; i < server->config.networkLayersSize; ++i) {
+        UA_ServerNetworkLayer *nl = &server->config.networkLayers[i];
+        result |= nl->start(nl, &server->config.customHostname);
+    }
+
+    /* Spin up the worker threads */
+#ifdef UA_ENABLE_MULTITHREADING
+    UA_LOG_INFO(server->config.logger, UA_LOGCATEGORY_SERVER,
+                "Spinning up %u worker thread(s)", server->config.nThreads);
+    UA_WorkQueue_start(&server->workQueue, server->config.nThreads);
+#endif
+
+    /* Start the multicast discovery server */
+#ifdef UA_ENABLE_DISCOVERY_MULTICAST
+    if(server->config.applicationDescription.applicationType ==
+       UA_APPLICATIONTYPE_DISCOVERYSERVER)
+        startMulticastDiscoveryServer(server);
+#endif
+
+    return result;
+}
+
+static void
+serverExecuteRepeatedCallback(UA_Server *server, UA_ApplicationCallback cb,
+                        void *callbackApplication, void *data) {
+#ifndef UA_ENABLE_MULTITHREADING
+    cb(callbackApplication, data);
+#else
+    UA_WorkQueue_enqueue(&server->workQueue, cb, callbackApplication, data);
+#endif
+}
+
+UA_UInt16
+UA_Server_run_iterate(UA_Server *server, UA_Boolean waitInternal) {
+    /* Process repeated work */
+    UA_DateTime now = UA_DateTime_nowMonotonic();
+    UA_DateTime nextRepeated = UA_Timer_process(&server->timer, now,
+                     (UA_TimerExecutionCallback)serverExecuteRepeatedCallback, server);
+    UA_DateTime latest = now + (UA_MAXTIMEOUT * UA_DATETIME_MSEC);
+    if(nextRepeated > latest)
+        nextRepeated = latest;
+
+    UA_UInt16 timeout = 0;
+
+    /* round always to upper value to avoid timeout to be set to 0
+    * if(nextRepeated - now) < (UA_DATETIME_MSEC/2) */
+    if(waitInternal)
+        timeout = (UA_UInt16)(((nextRepeated - now) + (UA_DATETIME_MSEC - 1)) / UA_DATETIME_MSEC);
+
+    /* Listen on the networklayer */
+    for(size_t i = 0; i < server->config.networkLayersSize; ++i) {
+        UA_ServerNetworkLayer *nl = &server->config.networkLayers[i];
+        nl->listen(nl, server, timeout);
+    }
+
+#if defined(UA_ENABLE_DISCOVERY_MULTICAST) && !defined(UA_ENABLE_MULTITHREADING)
+    if(server->config.applicationDescription.applicationType ==
+       UA_APPLICATIONTYPE_DISCOVERYSERVER) {
+        // TODO multicastNextRepeat does not consider new input data (requests)
+        // on the socket. It will be handled on the next call. if needed, we
+        // need to use select with timeout on the multicast socket
+        // server->mdnsSocket (see example in mdnsd library) on higher level.
+        UA_DateTime multicastNextRepeat = 0;
+        UA_StatusCode hasNext =
+            iterateMulticastDiscoveryServer(server, &multicastNextRepeat, UA_TRUE);
+        if(hasNext == UA_STATUSCODE_GOOD && multicastNextRepeat < nextRepeated)
+            nextRepeated = multicastNextRepeat;
+    }
+#endif
+
+#ifndef UA_ENABLE_MULTITHREADING
+    UA_WorkQueue_manuallyProcessDelayed(&server->workQueue);
+#endif
+
+    now = UA_DateTime_nowMonotonic();
+    timeout = 0;
+    if(nextRepeated > now)
+        timeout = (UA_UInt16)((nextRepeated - now) / UA_DATETIME_MSEC);
+    return timeout;
+}
+
+UA_StatusCode
+UA_Server_run_shutdown(UA_Server *server) {
+    /* Stop the netowrk layer */
+    for(size_t i = 0; i < server->config.networkLayersSize; ++i) {
+        UA_ServerNetworkLayer *nl = &server->config.networkLayers[i];
+        nl->stop(nl, server);
+    }
+
+#ifdef UA_ENABLE_MULTITHREADING
+    /* Shut down the workers */
+    UA_LOG_INFO(server->config.logger, UA_LOGCATEGORY_SERVER,
+                "Shutting down %u worker thread(s)",
+                (UA_UInt32)server->workQueue.workersSize);
+    UA_WorkQueue_stop(&server->workQueue);
+#endif
+
+#ifdef UA_ENABLE_DISCOVERY_MULTICAST
+    /* Stop multicast discovery */
+    if(server->config.applicationDescription.applicationType ==
+       UA_APPLICATIONTYPE_DISCOVERYSERVER)
+        stopMulticastDiscoveryServer(server);
+#endif
+
+    /* Execute all delayed callbacks */
+    UA_WorkQueue_cleanup(&server->workQueue);
+
+    return UA_STATUSCODE_GOOD;
+}
+
+UA_StatusCode
+UA_Server_run(UA_Server *server, volatile UA_Boolean *running) {
+    UA_StatusCode retval = UA_Server_run_startup(server);
+    if(retval != UA_STATUSCODE_GOOD)
+        return retval;
+#ifdef UA_ENABLE_VALGRIND_INTERACTIVE
+    size_t loopCount = 0;
+#endif
+    while(*running) {
+#ifdef UA_ENABLE_VALGRIND_INTERACTIVE
+        if(loopCount == 0) {
+            VALGRIND_DO_LEAK_CHECK;
+        }
+        ++loopCount;
+        loopCount %= UA_VALGRIND_INTERACTIVE_INTERVAL;
+#endif
+        UA_Server_run_iterate(server, true);
+    }
+    return UA_Server_run_shutdown(server);
+}

+ 9 - 3
src/server/ua_server_binary.c

@@ -791,8 +791,7 @@ UA_Server_processBinaryMessage(UA_Server *server, UA_Connection *connection,
 
 #ifdef UA_ENABLE_MULTITHREADING
 static void
-deleteConnectionTrampoline(UA_Server *server, void *data) {
-    UA_Connection *connection = (UA_Connection*)data;
+deleteConnection(UA_Server *server, UA_Connection *connection) {
     connection->free(connection);
 }
 #endif
@@ -803,6 +802,13 @@ UA_Server_removeConnection(UA_Server *server, UA_Connection *connection) {
 #ifndef UA_ENABLE_MULTITHREADING
     connection->free(connection);
 #else
-    UA_Server_delayedCallback(server, deleteConnectionTrampoline, connection);
+    UA_DelayedCallback *dc = (UA_DelayedCallback*)UA_malloc(sizeof(UA_DelayedCallback));
+    if(!dc)
+        return; /* Malloc cannot fail on OS's that support multithreading. They
+                 * rather kill the process. */
+    dc->callback = (UA_ApplicationCallback)deleteConnection;
+    dc->application = server;
+    dc->data = connection;
+    UA_WorkQueue_enqueueDelayed(&server->workQueue, dc);
 #endif
 }

+ 12 - 52
src/server/ua_server_internal.h

@@ -2,7 +2,7 @@
  * License, v. 2.0. If a copy of the MPL was not distributed with this
  * file, You can obtain one at http://mozilla.org/MPL/2.0/.
  *
- *    Copyright 2014-2017 (c) Fraunhofer IOSB (Author: Julius Pfrommer)
+ *    Copyright 2014-2018 (c) Fraunhofer IOSB (Author: Julius Pfrommer)
  *    Copyright 2014, 2017 (c) Florian Palm
  *    Copyright 2015-2016 (c) Sten Grüner
  *    Copyright 2015 (c) Chris Iatrou
@@ -21,6 +21,7 @@
 #include "ua_connection_internal.h"
 #include "ua_session_manager.h"
 #include "ua_securechannel_manager.h"
+#include "ua_workqueue.h"
 
 _UA_BEGIN_DECLS
 
@@ -42,30 +43,21 @@ typedef struct {
 
 #endif
 
-#ifdef UA_ENABLE_MULTITHREADING
-
-#include <pthread.h>
-
-struct UA_Worker;
-typedef struct UA_Worker UA_Worker;
-
-struct UA_WorkerCallback;
-typedef struct UA_WorkerCallback UA_WorkerCallback;
-
-SIMPLEQ_HEAD(UA_DispatchQueue, UA_WorkerCallback);
-typedef struct UA_DispatchQueue UA_DispatchQueue;
-
-#endif /* UA_ENABLE_MULTITHREADING */
-
 #ifdef UA_ENABLE_DISCOVERY
 
 typedef struct registeredServer_list_entry {
+#ifdef UA_ENABLE_MULTITHREADING
+    UA_DelayedCallback delayedCleanup;
+#endif
     LIST_ENTRY(registeredServer_list_entry) pointers;
     UA_RegisteredServer registeredServer;
     UA_DateTime lastSeen;
 } registeredServer_list_entry;
 
 typedef struct periodicServerRegisterCallback_entry {
+#ifdef UA_ENABLE_MULTITHREADING
+    UA_DelayedCallback delayedCleanup;
+#endif
     LIST_ENTRY(periodicServerRegisterCallback_entry) pointers;
     struct PeriodicServerRegisterCallback *callback;
 } periodicServerRegisterCallback_entry;
@@ -75,6 +67,9 @@ typedef struct periodicServerRegisterCallback_entry {
 #include "mdnsd/libmdnsd/mdnsd.h"
 
 typedef struct serverOnNetwork_list_entry {
+#ifdef UA_ENABLE_MULTITHREADING
+    UA_DelayedCallback delayedCleanup;
+#endif
     LIST_ENTRY(serverOnNetwork_list_entry) pointers;
     UA_ServerOnNetwork serverOnNetwork;
     UA_DateTime created;
@@ -157,17 +152,7 @@ struct UA_Server {
     /* Callbacks with a repetition interval */
     UA_Timer timer;
 
-    /* Delayed callbacks */
-    SLIST_HEAD(DelayedCallbacksList, UA_DelayedCallback) delayedCallbacks;
-
-    /* Worker threads */
-#ifdef UA_ENABLE_MULTITHREADING
-    UA_Worker *workers; /* there are nThread workers in a running server */
-    UA_DispatchQueue dispatchQueue; /* Dispatch queue for the worker threads */
-    pthread_mutex_t dispatchQueue_accessMutex; /* mutex for access to queue */
-    pthread_cond_t dispatchQueue_condition; /* so the workers don't spin if the queue is empty */
-    pthread_mutex_t dispatchQueue_conditionMutex; /* mutex for access to condition variable */
-#endif
+    UA_WorkQueue workQueue;
 
     /* For bootstrapping, omit some consistency checks, creating a reference to
      * the parent and member instantiation */
@@ -229,31 +214,6 @@ UA_StatusCode UA_Server_editNode(UA_Server *server, UA_Session *session,
                                  UA_EditNodeCallback callback,
                                  void *data);
 
-/*************/
-/* Callbacks */
-/*************/
-
-/* Delayed callbacks are executed when all previously dispatched callbacks are
- * finished */
-UA_StatusCode
-UA_Server_delayedCallback(UA_Server *server, UA_ServerCallback callback, void *data);
-
-UA_StatusCode
-UA_Server_delayedFree(UA_Server *server, void *data);
-
-#ifndef UA_ENABLE_MULTITHREADING
-/* Execute all delayed callbacks regardless of whether the worker threads have
- * finished previous work */
-void UA_Server_cleanupDelayedCallbacks(UA_Server *server);
-#else
-void UA_Server_cleanupDispatchQueue(UA_Server *server);
-#endif
-
-/* Callback is executed in the same thread or, if possible, dispatched to one of
- * the worker threads. */
-void
-UA_Server_workerCallback(UA_Server *server, UA_ServerCallback callback, void *data);
-
 /*********************/
 /* Utility Functions */
 /*********************/

+ 0 - 472
src/server/ua_server_worker.c

@@ -1,472 +0,0 @@
-/* This Source Code Form is subject to the terms of the Mozilla Public
- * License, v. 2.0. If a copy of the MPL was not distributed with this
- * file, You can obtain one at http://mozilla.org/MPL/2.0/. 
- *
- *    Copyright 2014-2017 (c) Fraunhofer IOSB (Author: Julius Pfrommer)
- *    Copyright 2014-2016 (c) Sten Grüner
- *    Copyright 2015 (c) Chris Iatrou
- *    Copyright 2015 (c) Nick Goossens
- *    Copyright 2015 (c) Jörg Schüler-Maroldt
- *    Copyright 2015-2016 (c) Oleksiy Vasylyev
- *    Copyright 2016-2017 (c) Florian Palm
- *    Copyright 2017 (c) Stefan Profanter, fortiss GmbH
- *    Copyright 2016 (c) Lorenz Haas
- *    Copyright 2017 (c) Jonas Green
- */
-
-#include "ua_util.h"
-#include "ua_server_internal.h"
-
-#ifdef UA_ENABLE_VALGRIND_INTERACTIVE
-#include <valgrind/memcheck.h>
-#endif
-
-#define UA_MAXTIMEOUT 50 /* Max timeout in ms between main-loop iterations */
-
-/**
- * Worker Threads and Dispatch Queue
- * ---------------------------------
- * The worker threads dequeue callbacks from a central Multi-Producer
- * Multi-Consumer Queue (MPMC). When there are no callbacks, workers go idle.
- * The condition to wake them up is triggered whenever a callback is
- * dispatched.
- *
- * Future Plans: Use work-stealing to load-balance between cores.
- * Le, Nhat Minh, et al. "Correct and efficient work-stealing for weak memory
- * models." ACM SIGPLAN Notices. Vol. 48. No. 8. ACM, 2013. */
-
-#ifdef UA_ENABLE_MULTITHREADING
-
-struct UA_Worker {
-    UA_Server *server;
-    pthread_t thr;
-    UA_UInt32 counter;
-    volatile UA_Boolean running;
-
-    /* separate cache lines */
-    char padding[64 - sizeof(void*) - sizeof(pthread_t) -
-                 sizeof(UA_UInt32) - sizeof(UA_Boolean)];
-};
-
-struct UA_WorkerCallback {
-    SIMPLEQ_ENTRY(UA_WorkerCallback) next;
-    UA_ServerCallback callback;
-    void *data;
-
-    UA_Boolean delayed;         /* Is it a delayed callback? */
-    UA_Boolean countersSampled; /* Have the worker counters been sampled? */
-    UA_UInt32 workerCounters[]; /* Counter value for each worker */
-};
-typedef struct UA_WorkerCallback WorkerCallback;
-
-/* Forward Declaration */
-static void
-processDelayedCallback(UA_Server *server, WorkerCallback *dc);
-
-static void *
-workerLoop(UA_Worker *worker) {
-    UA_Server *server = worker->server;
-    UA_UInt32 *counter = &worker->counter;
-    volatile UA_Boolean *running = &worker->running;
-
-    /* Initialize the (thread local) random seed with the ram address
-     * of the worker. Not for security-critical entropy! */
-    UA_random_seed((uintptr_t)worker);
-
-    while(*running) {
-        UA_atomic_addUInt32(counter, 1);
-        pthread_mutex_lock(&server->dispatchQueue_accessMutex);
-        WorkerCallback *dc = SIMPLEQ_FIRST(&server->dispatchQueue);
-        if(dc) {
-            SIMPLEQ_REMOVE_HEAD(&server->dispatchQueue, next);
-        }
-        pthread_mutex_unlock(&server->dispatchQueue_accessMutex);
-        if(!dc) {
-            /* Nothing to do. Sleep until a callback is dispatched */
-            pthread_mutex_lock(&server->dispatchQueue_conditionMutex);
-            pthread_cond_wait(&server->dispatchQueue_condition,
-                              &server->dispatchQueue_conditionMutex);
-            pthread_mutex_unlock(&server->dispatchQueue_conditionMutex);
-            continue;
-        }
-
-        if(dc->delayed) {
-            processDelayedCallback(server, dc);
-            continue;
-        }
-
-        dc->callback(server, dc->data);
-        UA_free(dc);
-    }
-
-    UA_LOG_DEBUG(server->config.logger, UA_LOGCATEGORY_SERVER,
-                 "Worker shut down");
-    return NULL;
-}
-
-void UA_Server_cleanupDispatchQueue(UA_Server *server) {
-    while(true) {
-        pthread_mutex_lock(&server->dispatchQueue_accessMutex);
-        WorkerCallback *dc = SIMPLEQ_FIRST(&server->dispatchQueue);
-        if(!dc) {
-            pthread_mutex_unlock(&server->dispatchQueue_accessMutex);
-            break;
-        }
-        SIMPLEQ_REMOVE_HEAD(&server->dispatchQueue, next);
-        pthread_mutex_unlock(&server->dispatchQueue_accessMutex);
-        dc->callback(server, dc->data);
-        UA_free(dc);
-    }
-}
-
-#endif
-
-/**
- * Repeated Callbacks
- * ------------------
- * Repeated Callbacks are handled by UA_Timer (used in both client and server).
- * In the multi-threaded case, callbacks are dispatched to workers. Otherwise,
- * they are executed immediately. */
-
-void
-UA_Server_workerCallback(UA_Server *server, UA_ServerCallback callback,
-                         void *data) {
-#ifndef UA_ENABLE_MULTITHREADING
-    /* Execute immediately */
-    callback(server, data);
-#else
-    /* Execute immediately if memory could not be allocated */
-    WorkerCallback *dc = (WorkerCallback*)UA_malloc(sizeof(WorkerCallback));
-    if(!dc) {
-        callback(server, data);
-        return;
-    }
-
-    /* Enqueue for the worker threads */
-    dc->callback = callback;
-    dc->data = data;
-    dc->delayed = false;
-    pthread_mutex_lock(&server->dispatchQueue_accessMutex);
-    SIMPLEQ_INSERT_TAIL(&server->dispatchQueue, dc, next);
-    pthread_mutex_unlock(&server->dispatchQueue_accessMutex);
-
-    /* Wake up sleeping workers */
-    pthread_cond_broadcast(&server->dispatchQueue_condition);
-#endif
-}
-
-/**
- * Delayed Callbacks
- * -----------------
- *
- * Delayed Callbacks are called only when all callbacks that were dispatched
- * prior are finished. In the single-threaded case, the callback is added to a
- * singly-linked list that is processed at the end of the server's main-loop. In
- * the multi-threaded case, the delay is ensure by a three-step procedure:
- *
- * 1. The delayed callback is dispatched to the worker queue. So it is only
- *    dequeued when all prior callbacks have been dequeued.
- *
- * 2. When the callback is first dequeued by a worker, sample the counter of all
- *    workers. Once all counters have advanced, the callback is ready.
- *
- * 3. Check regularly if the callback is ready by adding it back to the dispatch
- *    queue. */
-
-/* Delayed callback to free the subscription memory */
-static void
-freeCallback(UA_Server *server, void *data) {
-    UA_free(data);
-}
-
-/* TODO: Delayed free should never fail. This can be achieved by adding a prefix
- * with the list pointers */
-UA_StatusCode
-UA_Server_delayedFree(UA_Server *server, void *data) {
-    return UA_Server_delayedCallback(server, freeCallback, data);
-}
-
-#ifndef UA_ENABLE_MULTITHREADING
-
-typedef struct UA_DelayedCallback {
-    SLIST_ENTRY(UA_DelayedCallback) next;
-    UA_ServerCallback callback;
-    void *data;
-} UA_DelayedCallback;
-
-UA_StatusCode
-UA_Server_delayedCallback(UA_Server *server, UA_ServerCallback callback,
-                          void *data) {
-    UA_DelayedCallback *dc =
-        (UA_DelayedCallback*)UA_malloc(sizeof(UA_DelayedCallback));
-    if(!dc)
-        return UA_STATUSCODE_BADOUTOFMEMORY;
-
-    dc->callback = callback;
-    dc->data = data;
-    SLIST_INSERT_HEAD(&server->delayedCallbacks, dc, next);
-    return UA_STATUSCODE_GOOD;
-}
-
-void UA_Server_cleanupDelayedCallbacks(UA_Server *server) {
-    UA_DelayedCallback *dc, *dc_tmp;
-    SLIST_FOREACH_SAFE(dc, &server->delayedCallbacks, next, dc_tmp) {
-        SLIST_REMOVE(&server->delayedCallbacks, dc, UA_DelayedCallback, next);
-        dc->callback(server, dc->data);
-        UA_free(dc);
-    }
-}
-
-#else /* UA_ENABLE_MULTITHREADING */
-
-UA_StatusCode
-UA_Server_delayedCallback(UA_Server *server, UA_ServerCallback callback,
-                          void *data) {
-    size_t dcsize = sizeof(WorkerCallback) +
-        (sizeof(UA_UInt32) * server->config.nThreads);
-    WorkerCallback *dc = (WorkerCallback*)UA_malloc(dcsize);
-    if(!dc)
-        return UA_STATUSCODE_BADOUTOFMEMORY;
-
-    /* Enqueue for the worker threads */
-    dc->callback = callback;
-    dc->data = data;
-    dc->delayed = true;
-    dc->countersSampled = false;
-    pthread_mutex_lock(&server->dispatchQueue_accessMutex);
-    SIMPLEQ_INSERT_TAIL(&server->dispatchQueue, dc, next);
-    pthread_mutex_unlock(&server->dispatchQueue_accessMutex);
-
-    /* Wake up sleeping workers */
-    pthread_cond_broadcast(&server->dispatchQueue_condition);
-    return UA_STATUSCODE_GOOD;
-}
-
-/* Called from the worker loop */
-static void
-processDelayedCallback(UA_Server *server, WorkerCallback *dc) {
-    /* Set the worker counters */
-    if(!dc->countersSampled) {
-        for(size_t i = 0; i < server->config.nThreads; ++i)
-            dc->workerCounters[i] = server->workers[i].counter;
-        dc->countersSampled = true;
-
-        /* Re-add to the dispatch queue */
-        pthread_mutex_lock(&server->dispatchQueue_accessMutex);
-        SIMPLEQ_INSERT_TAIL(&server->dispatchQueue, dc, next);
-        pthread_mutex_unlock(&server->dispatchQueue_accessMutex);
-
-        /* Wake up sleeping workers */
-        pthread_cond_broadcast(&server->dispatchQueue_condition);
-        return;
-    }
-
-    /* Have all other jobs finished? */
-    UA_Boolean ready = true;
-    for(size_t i = 0; i < server->config.nThreads; ++i) {
-        if(dc->workerCounters[i] == server->workers[i].counter) {
-            ready = false;
-            break;
-        }
-    }
-
-    /* Re-add to the dispatch queue.
-     * TODO: What is the impact of this loop?
-     * Can we add a small delay here? */
-    if(!ready) {
-        pthread_mutex_lock(&server->dispatchQueue_accessMutex);
-        SIMPLEQ_INSERT_TAIL(&server->dispatchQueue, dc, next);
-        pthread_mutex_unlock(&server->dispatchQueue_accessMutex);
-
-        /* Wake up sleeping workers */
-        pthread_cond_broadcast(&server->dispatchQueue_condition);
-        return;
-    }
-
-    /* Execute the callback */
-    dc->callback(server, dc->data);
-    UA_free(dc);
-}
-
-#endif
-
-/**
- * Main Server Loop
- * ----------------
- * Start: Spin up the workers and the network layer and sample the server's
- *        start time.
- * Iterate: Process repeated callbacks and events in the network layer.
- *          This part can be driven from an external main-loop in an
- *          event-driven single-threaded architecture.
- * Stop: Stop workers, finish all callbacks, stop the network layer,
- *       clean up */
-
-UA_StatusCode
-UA_Server_run_startup(UA_Server *server) {
-    UA_Variant var;
-    UA_StatusCode result = UA_STATUSCODE_GOOD;
-	
-	/* At least one endpoint has to be configured */
-    if(server->config.endpointsSize == 0) {
-        UA_LOG_WARNING(server->config.logger, UA_LOGCATEGORY_SERVER,
-                       "There has to be at least one endpoint.");
-    }
-
-    /* Sample the start time and set it to the Server object */
-    server->startTime = UA_DateTime_now();
-    UA_Variant_init(&var);
-    UA_Variant_setScalar(&var, &server->startTime, &UA_TYPES[UA_TYPES_DATETIME]);
-    UA_Server_writeValue(server,
-                         UA_NODEID_NUMERIC(0, UA_NS0ID_SERVER_SERVERSTATUS_STARTTIME),
-                         var);
-
-    /* Start the networklayers */
-    for(size_t i = 0; i < server->config.networkLayersSize; ++i) {
-        UA_ServerNetworkLayer *nl = &server->config.networkLayers[i];
-        result |= nl->start(nl, &server->config.customHostname);
-    }
-
-    /* Spin up the worker threads */
-#ifdef UA_ENABLE_MULTITHREADING
-    UA_LOG_INFO(server->config.logger, UA_LOGCATEGORY_SERVER,
-                "Spinning up %u worker thread(s)", server->config.nThreads);
-    pthread_mutex_init(&server->dispatchQueue_accessMutex, NULL);
-    pthread_cond_init(&server->dispatchQueue_condition, NULL);
-    pthread_mutex_init(&server->dispatchQueue_conditionMutex, NULL);
-    server->workers = (UA_Worker*)UA_malloc(server->config.nThreads * sizeof(UA_Worker));
-    if(!server->workers)
-        return UA_STATUSCODE_BADOUTOFMEMORY;
-    for(size_t i = 0; i < server->config.nThreads; ++i) {
-        UA_Worker *worker = &server->workers[i];
-        worker->server = server;
-        worker->counter = 0;
-        worker->running = true;
-        pthread_create(&worker->thr, NULL, (void* (*)(void*))workerLoop, worker);
-    }
-#endif
-
-    /* Start the multicast discovery server */
-#ifdef UA_ENABLE_DISCOVERY_MULTICAST
-    if(server->config.applicationDescription.applicationType ==
-       UA_APPLICATIONTYPE_DISCOVERYSERVER)
-        startMulticastDiscoveryServer(server);
-#endif
-
-    return result;
-}
-
-UA_UInt16
-UA_Server_run_iterate(UA_Server *server, UA_Boolean waitInternal) {
-    /* Process repeated work */
-    UA_DateTime now = UA_DateTime_nowMonotonic();
-    UA_DateTime nextRepeated =
-        UA_Timer_process(&server->timer, now,
-                         (UA_TimerDispatchCallback)UA_Server_workerCallback,
-                         server);
-    UA_DateTime latest = now + (UA_MAXTIMEOUT * UA_DATETIME_MSEC);
-    if(nextRepeated > latest)
-        nextRepeated = latest;
-
-    UA_UInt16 timeout = 0;
-
-    /* round always to upper value to avoid timeout to be set to 0
-    * if(nextRepeated - now) < (UA_DATETIME_MSEC/2) */
-    if(waitInternal)
-        timeout = (UA_UInt16)(((nextRepeated - now) + (UA_DATETIME_MSEC - 1)) / UA_DATETIME_MSEC);
-
-    /* Listen on the networklayer */
-    for(size_t i = 0; i < server->config.networkLayersSize; ++i) {
-        UA_ServerNetworkLayer *nl = &server->config.networkLayers[i];
-        nl->listen(nl, server, timeout);
-    }
-
-#ifndef UA_ENABLE_MULTITHREADING
-    /* Process delayed callbacks when all callbacks and network events are done.
-     * If multithreading is enabled, the cleanup of delayed values is attempted
-     * by a callback in the job queue. */
-    UA_Server_cleanupDelayedCallbacks(server);
-#endif
-
-#if defined(UA_ENABLE_DISCOVERY_MULTICAST) && !defined(UA_ENABLE_MULTITHREADING)
-    if(server->config.applicationDescription.applicationType ==
-       UA_APPLICATIONTYPE_DISCOVERYSERVER) {
-        // TODO multicastNextRepeat does not consider new input data (requests)
-        // on the socket. It will be handled on the next call. if needed, we
-        // need to use select with timeout on the multicast socket
-        // server->mdnsSocket (see example in mdnsd library) on higher level.
-        UA_DateTime multicastNextRepeat = 0;
-        UA_StatusCode hasNext =
-            iterateMulticastDiscoveryServer(server, &multicastNextRepeat, UA_TRUE);
-        if(hasNext == UA_STATUSCODE_GOOD && multicastNextRepeat < nextRepeated)
-            nextRepeated = multicastNextRepeat;
-    }
-#endif
-
-    now = UA_DateTime_nowMonotonic();
-    timeout = 0;
-    if(nextRepeated > now)
-        timeout = (UA_UInt16)((nextRepeated - now) / UA_DATETIME_MSEC);
-    return timeout;
-}
-
-UA_StatusCode
-UA_Server_run_shutdown(UA_Server *server) {
-    /* Stop the netowrk layer */
-    for(size_t i = 0; i < server->config.networkLayersSize; ++i) {
-        UA_ServerNetworkLayer *nl = &server->config.networkLayers[i];
-        nl->stop(nl, server);
-    }
-
-#ifdef UA_ENABLE_MULTITHREADING
-    /* Shut down the workers */
-    if(server->workers) {
-        UA_LOG_INFO(server->config.logger, UA_LOGCATEGORY_SERVER,
-                    "Shutting down %u worker thread(s)",
-                    server->config.nThreads);
-        for(size_t i = 0; i < server->config.nThreads; ++i)
-            server->workers[i].running = false;
-        pthread_cond_broadcast(&server->dispatchQueue_condition);
-        for(size_t i = 0; i < server->config.nThreads; ++i)
-            pthread_join(server->workers[i].thr, NULL);
-        UA_free(server->workers);
-        server->workers = NULL;
-    }
-
-    /* Execute the remaining callbacks in the dispatch queue. Also executes
-     * delayed callbacks. */
-    UA_Server_cleanupDispatchQueue(server);
-#else
-    /* Process remaining delayed callbacks */
-    UA_Server_cleanupDelayedCallbacks(server);
-#endif
-
-#ifdef UA_ENABLE_DISCOVERY_MULTICAST
-    /* Stop multicast discovery */
-    if(server->config.applicationDescription.applicationType ==
-       UA_APPLICATIONTYPE_DISCOVERYSERVER)
-        stopMulticastDiscoveryServer(server);
-#endif
-
-    return UA_STATUSCODE_GOOD;
-}
-
-UA_StatusCode
-UA_Server_run(UA_Server *server, volatile UA_Boolean *running) {
-    UA_StatusCode retval = UA_Server_run_startup(server);
-    if(retval != UA_STATUSCODE_GOOD)
-        return retval;
-#ifdef UA_ENABLE_VALGRIND_INTERACTIVE
-    size_t loopCount = 0;
-#endif
-    while(*running) {
-#ifdef UA_ENABLE_VALGRIND_INTERACTIVE
-        if(loopCount == 0) {
-            VALGRIND_DO_LEAK_CHECK;
-        }
-        ++loopCount;
-        loopCount %= UA_VALGRIND_INTERACTIVE_INTERVAL;
-#endif
-        UA_Server_run_iterate(server, true);
-    }
-    return UA_Server_run_shutdown(server);
-}

+ 4 - 9
src/server/ua_services_discovery.c

@@ -300,13 +300,6 @@ error:
 
 #ifdef UA_ENABLE_DISCOVERY
 
-#ifdef UA_ENABLE_MULTITHREADING
-static void
-freeEntry(UA_Server *server, void *entry) {
-    UA_free(entry);
-}
-#endif
-
 static void
 process_RegisterServer(UA_Server *server, UA_Session *session,
                        const UA_RequestHeader* requestHeader,
@@ -428,7 +421,8 @@ process_RegisterServer(UA_Server *server, UA_Session *session,
         server->registeredServersSize--;
 #else
         UA_atomic_subSize(&server->registeredServersSize, 1);
-        UA_Server_delayedCallback(server, freeEntry, registeredServer_entry);
+        registeredServer_entry->delayedCleanup.callback = NULL; /* only free the structure */
+        UA_WorkQueue_enqueueDelayed(&server->workQueue, &registeredServer_entry->delayedCleanup);
 #endif
         responseHeader->serviceResult = UA_STATUSCODE_GOOD;
         return;
@@ -543,7 +537,8 @@ void UA_Discovery_cleanupTimedOut(UA_Server *server, UA_DateTime nowMonotonic) {
             server->registeredServersSize--;
 #else
             UA_atomic_subSize(&server->registeredServersSize, 1);
-            UA_Server_delayedCallback(server, freeEntry, current);
+            current->delayedCleanup.callback = NULL; /* Only free the structure */
+            UA_WorkQueue_enqueueDelayed(&server->workQueue, &current->delayedCleanup);
 #endif
         }
     }

+ 4 - 8
src/server/ua_session.c

@@ -106,14 +106,10 @@ UA_Session_deleteSubscription(UA_Server *server, UA_Session *session,
     UA_Subscription_deleteMembers(server, sub);
 
     /* Add a delayed callback to remove the subscription when the currently
-     * scheduled jobs have completed */
-    UA_StatusCode retval = UA_Server_delayedFree(server, sub);
-    if(retval != UA_STATUSCODE_GOOD) {
-        UA_LOG_WARNING_SESSION(server->config.logger, session,
-                       "Could not remove subscription with error code %s",
-                       UA_StatusCode_name(retval));
-        return retval; /* Try again next time */
-    }
+     * scheduled jobs have completed. There is no actual delayed callback. Just
+     * free the structure. */
+    sub->delayedFreePointers.callback = NULL;
+    UA_WorkQueue_enqueueDelayed(&server->workQueue, &sub->delayedFreePointers);
 
     /* Remove from the session */
     LIST_REMOVE(sub, listEntry);

+ 13 - 17
src/server/ua_session_manager.c

@@ -31,13 +31,11 @@ void UA_SessionManager_deleteMembers(UA_SessionManager *sm) {
 
 /* Delayed callback to free the session memory */
 static void
-removeSessionCallback(UA_Server *server, void *entry) {
-    session_list_entry *sentry = (session_list_entry*)entry;
-    UA_Session_deleteMembersCleanup(&sentry->session, server);
-    UA_free(sentry);
+removeSessionCallback(UA_Server *server, session_list_entry *entry) {
+    UA_Session_deleteMembersCleanup(&entry->session, server);
 }
 
-static UA_StatusCode
+static void
 removeSession(UA_SessionManager *sm, session_list_entry *sentry) {
     /* Detach the Session from the SecureChannel */
     UA_Session_detachFromSecureChannel(&sentry->session);
@@ -45,21 +43,17 @@ removeSession(UA_SessionManager *sm, session_list_entry *sentry) {
     /* Deactivate the session */
     sentry->session.activated = false;
 
-    /* Add a delayed callback to remove the session when the currently
-     * scheduled jobs have completed */
-    UA_StatusCode retval = UA_Server_delayedCallback(sm->server, removeSessionCallback, sentry);
-    if(retval != UA_STATUSCODE_GOOD) {
-        UA_LOG_WARNING_SESSION(sm->server->config.logger, &sentry->session,
-                       "Could not remove session with error code %s",
-                       UA_StatusCode_name(retval));
-        return retval; /* Try again next time */
-    }
-
     /* Detach the session from the session manager and make the capacity
      * available */
     LIST_REMOVE(sentry, pointers);
     UA_atomic_subUInt32(&sm->currentSessionCount, 1);
-    return UA_STATUSCODE_GOOD;
+
+    /* Add a delayed callback to remove the session when the currently
+     * scheduled jobs have completed */
+    sentry->cleanupCallback.callback = (UA_ApplicationCallback)removeSessionCallback;
+    sentry->cleanupCallback.application = sm->server;
+    sentry->cleanupCallback.data = sentry;
+    UA_WorkQueue_enqueueDelayed(&sm->server->workQueue, &sentry->cleanupCallback);
 }
 
 void
@@ -175,5 +169,7 @@ UA_SessionManager_removeSession(UA_SessionManager *sm, const UA_NodeId *token) {
     }
     if(!current)
         return UA_STATUSCODE_BADSESSIONIDINVALID;
-    return removeSession(sm, current);
+
+    removeSession(sm, current);
+    return UA_STATUSCODE_GOOD;
 }

+ 2 - 0
src/server/ua_session_manager.h

@@ -13,6 +13,7 @@
 #define UA_SESSION_MANAGER_H_
 
 #include "ua_server.h"
+#include "ua_workqueue.h"
 #include "ua_util_internal.h"
 #include "ua_session.h"
 #include "../../deps/queue.h"
@@ -20,6 +21,7 @@
 _UA_BEGIN_DECLS
 
 typedef struct session_list_entry {
+    UA_DelayedCallback cleanupCallback;
     LIST_ENTRY(session_list_entry) pointers;
     UA_Session session;
 } session_list_entry;

+ 2 - 0
src/server/ua_subscription.h

@@ -83,6 +83,7 @@ void UA_Notification_delete(UA_Subscription *sub, UA_MonitoredItem *mon,
 typedef TAILQ_HEAD(NotificationQueue, UA_Notification) NotificationQueue;
 
 struct UA_MonitoredItem {
+    UA_DelayedCallback delayedFreePointers;
     LIST_ENTRY(UA_MonitoredItem) listEntry;
     UA_Subscription *subscription;
     UA_UInt32 monitoredItemId;
@@ -158,6 +159,7 @@ typedef enum {
 typedef TAILQ_HEAD(ListOfNotificationMessages, UA_NotificationMessageEntry) ListOfNotificationMessages;
 
 struct UA_Subscription {
+    UA_DelayedCallback delayedFreePointers;
     LIST_ENTRY(UA_Subscription) listEntry;
     UA_Session *session;
     UA_UInt32 subscriptionId;

+ 4 - 1
src/server/ua_subscription_monitoreditem.c

@@ -82,7 +82,10 @@ UA_MonitoredItem_delete(UA_Server *server, UA_MonitoredItem *monitoredItem) {
     UA_ByteString_deleteMembers(&monitoredItem->lastSampledValue);
     UA_Variant_deleteMembers(&monitoredItem->lastValue);
     UA_NodeId_deleteMembers(&monitoredItem->monitoredNodeId);
-    UA_Server_delayedFree(server, monitoredItem);
+
+    /* No actual callback, just remove the structure */
+    monitoredItem->delayedFreePointers.callback = NULL;
+    UA_WorkQueue_enqueueDelayed(&server->workQueue, &monitoredItem->delayedFreePointers);
 }
 
 UA_StatusCode

+ 11 - 8
src/ua_timer.c

@@ -33,7 +33,8 @@ struct UA_TimerCallbackEntry {
     UA_UInt64 interval;                      /* Interval in 100ns resolution */
     UA_UInt64 id;                            /* Id of the repeated callback */
 
-    UA_TimerCallback callback;
+    UA_ApplicationCallback callback;
+    void *application;
     void *data;
 };
 
@@ -88,8 +89,8 @@ dequeueChange(UA_Timer *t) {
  * future. This will be picked up in the next iteration and inserted at the
  * correct place. So that the next execution takes place ät "nextTime". */
 UA_StatusCode
-UA_Timer_addRepeatedCallback(UA_Timer *t, UA_TimerCallback callback,
-                             void *data, UA_UInt32 interval,
+UA_Timer_addRepeatedCallback(UA_Timer *t, UA_ApplicationCallback callback,
+                             void *application, void *data, UA_UInt32 interval,
                              UA_UInt64 *callbackId) {
     /* A callback method needs to be present */
     if(!callback)
@@ -109,6 +110,7 @@ UA_Timer_addRepeatedCallback(UA_Timer *t, UA_TimerCallback callback,
     tc->interval = (UA_UInt64)interval * UA_DATETIME_MSEC;
     tc->id = ++t->idCounter;
     tc->callback = callback;
+    tc->application = application;
     tc->data = data;
     tc->nextTime = UA_DateTime_nowMonotonic() + (UA_DateTime)tc->interval;
 
@@ -173,7 +175,7 @@ UA_Timer_changeRepeatedCallbackInterval(UA_Timer *t, UA_UInt64 callbackId,
     tc->interval = (UA_UInt64)interval * UA_DATETIME_MSEC;
     tc->id = callbackId;
     tc->nextTime = UA_DateTime_nowMonotonic() + (UA_DateTime)tc->interval;
-    tc->callback = (UA_TimerCallback)CHANGE_SENTINEL;
+    tc->callback = (UA_ApplicationCallback)CHANGE_SENTINEL;
 
     /* Enqueue the changes in the MPSC queue */
     enqueueChange(t, tc);
@@ -219,7 +221,7 @@ UA_Timer_removeRepeatedCallback(UA_Timer *t, UA_UInt64 callbackId) {
 
     /* Set the repeated callback with the sentinel nextTime */
     tc->id = callbackId;
-    tc->callback = (UA_TimerCallback)REMOVE_SENTINEL;
+    tc->callback = (UA_ApplicationCallback)REMOVE_SENTINEL;
 
     /* Enqueue the changes in the MPSC queue */
     enqueueChange(t, tc);
@@ -265,8 +267,8 @@ processChanges(UA_Timer *t) {
 
 UA_DateTime
 UA_Timer_process(UA_Timer *t, UA_DateTime nowMonotonic,
-                 UA_TimerDispatchCallback dispatchCallback,
-                 void *application) {
+                 UA_TimerExecutionCallback executionCallback,
+                 void *executionApplication) {
     /* Insert and remove callbacks */
     processChanges(t);
 
@@ -303,7 +305,8 @@ UA_Timer_process(UA_Timer *t, UA_DateTime nowMonotonic,
         SLIST_REMOVE_HEAD(&executedNowList, next);
 
         /* Dispatch/process callback */
-        dispatchCallback(application, tc->callback, tc->data);
+        executionCallback(executionApplication, tc->callback,
+                          tc->application, tc->data);
 
         /* Set the time for the next execution. Prevent an infinite loop by
          * forcing the next processing into the next iteration. */

+ 8 - 8
src/ua_timer.h

@@ -10,6 +10,7 @@
 #define UA_TIMER_H_
 
 #include "ua_util_internal.h"
+#include "ua_workqueue.h"
 
 _UA_BEGIN_DECLS
 
@@ -49,11 +50,9 @@ void UA_Timer_init(UA_Timer *t);
 
 /* Add a repated callback. Thread-safe, can be used in parallel and in parallel
  * with UA_Timer_process. */
-typedef void (*UA_TimerCallback)(void *application, void *data);
-
 UA_StatusCode
-UA_Timer_addRepeatedCallback(UA_Timer *t, UA_TimerCallback callback, void *data,
-                             UA_UInt32 interval, UA_UInt64 *callbackId);
+UA_Timer_addRepeatedCallback(UA_Timer *t, UA_ApplicationCallback callback, void *application,
+                             void *data, UA_UInt32 interval, UA_UInt64 *callbackId);
 
 /* Change the callback interval. If this is called from within the callback. The
  * adjustment is made during the next _process call. */
@@ -70,13 +69,14 @@ UA_Timer_removeRepeatedCallback(UA_Timer *t, UA_UInt64 callbackId);
  * timestamp of the next scheduled repeated callback. Not thread-safe.
  * Application is a pointer to the client / server environment for the callback.
  * Dispatched is set to true when at least one callback was run / dispatched. */
-typedef void (*UA_TimerDispatchCallback)(void *application, UA_TimerCallback callback,
-                                         void *data);
+typedef void
+(*UA_TimerExecutionCallback)(void *executionApplication, UA_ApplicationCallback cb,
+                             void *callbackApplication, void *data);
 
 UA_DateTime
 UA_Timer_process(UA_Timer *t, UA_DateTime nowMonotonic,
-                 UA_TimerDispatchCallback dispatchCallback,
-                 void *application);
+                 UA_TimerExecutionCallback executionCallback,
+                 void *executionApplication);
 
 /* Remove all repeated callbacks. Not thread-safe. */
 void UA_Timer_deleteMembers(UA_Timer *t);

+ 260 - 0
src/ua_workqueue.c

@@ -0,0 +1,260 @@
+/* This Source Code Form is subject to the terms of the Mozilla Public
+ * License, v. 2.0. If a copy of the MPL was not distributed with this
+ * file, You can obtain one at http://mozilla.org/MPL/2.0/. 
+ *
+ *    Copyright 2014-2018 (c) Fraunhofer IOSB (Author: Julius Pfrommer)
+ *    Copyright 2014-2016 (c) Sten Grüner
+ *    Copyright 2015 (c) Chris Iatrou
+ *    Copyright 2015 (c) Nick Goossens
+ *    Copyright 2015 (c) Jörg Schüler-Maroldt
+ *    Copyright 2015-2016 (c) Oleksiy Vasylyev
+ *    Copyright 2016-2017 (c) Florian Palm
+ *    Copyright 2017 (c) Stefan Profanter, fortiss GmbH
+ *    Copyright 2016 (c) Lorenz Haas
+ *    Copyright 2017 (c) Jonas Green
+ */
+
+#include "ua_workqueue.h"
+
+void UA_WorkQueue_init(UA_WorkQueue *wq) {
+    /* Initialized the linked list for delayed callbacks */
+    SIMPLEQ_INIT(&wq->delayedCallbacks);
+
+#ifdef UA_ENABLE_MULTITHREADING
+    wq->delayedCallbacks_checkpoint = NULL;
+    pthread_mutex_init(&wq->delayedCallbacks_accessMutex,  NULL);
+
+    /* Initialize the dispatch queue for worker threads */
+    SIMPLEQ_INIT(&wq->dispatchQueue);
+    pthread_mutex_init(&wq->dispatchQueue_accessMutex, NULL);
+    pthread_cond_init(&wq->dispatchQueue_condition, NULL);
+    pthread_mutex_init(&wq->dispatchQueue_conditionMutex, NULL);
+#endif
+}
+
+#ifdef UA_ENABLE_MULTITHREADING
+/* Forward declaration */
+static void UA_WorkQueue_manuallyProcessDelayed(UA_WorkQueue *wq);
+#endif
+
+void UA_WorkQueue_cleanup(UA_WorkQueue *wq) {
+#ifdef UA_ENABLE_MULTITHREADING
+    /* Shut down workers */
+    UA_WorkQueue_stop(wq);
+
+    /* Execute remaining work in the dispatch queue */
+    while(true) {
+        pthread_mutex_lock(&wq->dispatchQueue_accessMutex);
+        UA_DelayedCallback *dc = SIMPLEQ_FIRST(&wq->dispatchQueue);
+        if(!dc) {
+            pthread_mutex_unlock(&wq->dispatchQueue_accessMutex);
+            break;
+        }
+        SIMPLEQ_REMOVE_HEAD(&wq->dispatchQueue, next);
+        pthread_mutex_unlock(&wq->dispatchQueue_accessMutex);
+        dc->callback(dc->application, dc->data);
+        UA_free(dc);
+    }
+#endif
+
+    /* All workers are shut down. Execute remaining delayed work here. */
+    UA_WorkQueue_manuallyProcessDelayed(wq);
+
+#ifdef UA_ENABLE_MULTITHREADING
+    wq->delayedCallbacks_checkpoint = NULL;
+    pthread_mutex_destroy(&wq->dispatchQueue_accessMutex);
+    pthread_cond_destroy(&wq->dispatchQueue_condition);
+    pthread_mutex_destroy(&wq->dispatchQueue_conditionMutex);
+    pthread_mutex_destroy(&wq->delayedCallbacks_accessMutex);
+#endif
+}
+
+/***********/
+/* Workers */
+/***********/
+
+#ifdef UA_ENABLE_MULTITHREADING
+
+static void *
+workerLoop(UA_Worker *worker) {
+    UA_WorkQueue *wq = worker->queue;
+    UA_UInt32 *counter = &worker->counter;
+    volatile UA_Boolean *running = &worker->running;
+
+    /* Initialize the (thread local) random seed with the ram address
+     * of the worker. Not for security-critical entropy! */
+    UA_random_seed((uintptr_t)worker);
+
+    while(*running) {
+        UA_atomic_addUInt32(counter, 1);
+
+        /* Remove a callback from the queue */
+        pthread_mutex_lock(&wq->dispatchQueue_accessMutex);
+        UA_DelayedCallback *dc = SIMPLEQ_FIRST(&wq->dispatchQueue);
+        if(dc)
+            SIMPLEQ_REMOVE_HEAD(&wq->dispatchQueue, next);
+        pthread_mutex_unlock(&wq->dispatchQueue_accessMutex);
+
+        /* Nothing to do. Sleep until a callback is dispatched */
+        if(!dc) {
+            pthread_mutex_lock(&wq->dispatchQueue_conditionMutex);
+            pthread_cond_wait(&wq->dispatchQueue_condition,
+                              &wq->dispatchQueue_conditionMutex);
+            pthread_mutex_unlock(&wq->dispatchQueue_conditionMutex);
+            continue;
+        }
+
+        /* Execute */
+        if(dc->callback)
+            dc->callback(dc->application, dc->data);
+        UA_free(dc);
+    }
+
+    return NULL;
+}
+
+/* Can be called repeatedly and starts additional workers */
+UA_StatusCode
+UA_WorkQueue_start(UA_WorkQueue *wq, size_t workersCount) {
+    if(wq->workersSize > 0 || workersCount == 0)
+        return UA_STATUSCODE_BADINTERNALERROR;
+    
+    /* Create the worker array */
+    wq->workers = (UA_Worker*)UA_calloc(workersCount, sizeof(UA_Worker));
+    if(!wq->workers)
+        return UA_STATUSCODE_BADOUTOFMEMORY;
+    wq->workersSize = workersCount;
+
+    /* Spin up the workers */
+    for(size_t i = 0; i < workersCount; ++i) {
+        UA_Worker *w = &wq->workers[i];
+        w->queue = wq;
+        w->counter = 0;
+        w->running = true;
+        pthread_create(&w->thread, NULL, (void* (*)(void*))workerLoop, w);
+    }
+    return UA_STATUSCODE_GOOD;
+}
+
+void UA_WorkQueue_stop(UA_WorkQueue *wq) {
+    if(wq->workersSize == 0)
+        return;
+
+    /* Signal the workers to stop */
+    for(size_t i = 0; i < wq->workersSize; ++i)
+        wq->workers[i].running = false;
+
+    /* Wake up all workers */
+    pthread_cond_broadcast(&wq->dispatchQueue_condition);
+
+    /* Wait for the workers to finish, then clean up */
+    for(size_t i = 0; i < wq->workersSize; ++i)
+        pthread_join(wq->workers[i].thread, NULL);
+
+    UA_free(wq->workers);
+    wq->workers = NULL;
+    wq->workersSize = 0;
+}
+
+void UA_WorkQueue_enqueue(UA_WorkQueue *wq, UA_ApplicationCallback cb,
+                          void *application, void *data) {
+    UA_DelayedCallback *dc = (UA_DelayedCallback*)UA_malloc(sizeof(UA_DelayedCallback));
+    if(!dc) {
+        cb(application, data); /* Execute immediately if the memory could not be allocated */
+        return;
+    }
+
+    dc->callback = cb;
+    dc->application = application;
+    dc->data = data;
+
+    /* Enqueue for the worker threads */
+    pthread_mutex_lock(&wq->dispatchQueue_accessMutex);
+    SIMPLEQ_INSERT_TAIL(&wq->dispatchQueue, dc, next);
+    pthread_mutex_unlock(&wq->dispatchQueue_accessMutex);
+
+    /* Wake up sleeping workers */
+    pthread_cond_broadcast(&wq->dispatchQueue_condition);
+}
+
+#endif
+
+/*********************/
+/* Delayed Callbacks */
+/*********************/
+
+#ifdef UA_ENABLE_MULTITHREADING
+
+/* Delayed Callbacks are called only when all callbacks that were dispatched
+ * prior are finished. After every UA_MAX_DELAYED_SAMPLE delayed Callbacks that
+ * were added to the queue, we sample the counters from the workers. The
+ * counters are compared to the last counters that were sampled. If every worker
+ * has proceeded the counter, then we know that all delayed callbacks prior to
+ * the last sample-point are safe to execute. */
+
+/* Sample the worker counter for every nth delayed callback. This is used to
+ * test that all workers have **finished** their current job before the delayed
+ * callback is processed. */
+#define UA_MAX_DELAYED_SAMPLE 100
+
+/* Call only with a held mutex for the delayed callbacks */
+static void
+dispatchDelayedCallbacks(UA_WorkQueue *wq, UA_DelayedCallback *cb) {
+    /* Are callbacks before the last checkpoint ready? */
+    for(size_t i = 0; i < wq->workersSize; ++i) {
+        if(wq->workers[i].counter == wq->workers[i].checkpointCounter)
+            return;
+    }
+
+    /* Dispatch all delayed callbacks up to the checkpoint.
+     * TODO: Move over the entire queue up to the checkpoint in one step. */
+    if(wq->delayedCallbacks_checkpoint != NULL) {
+        UA_DelayedCallback *iter, *tmp_iter;
+        SIMPLEQ_FOREACH_SAFE(iter, &wq->delayedCallbacks, next, tmp_iter) {
+            pthread_mutex_lock(&wq->dispatchQueue_accessMutex);
+            SIMPLEQ_INSERT_TAIL(&wq->dispatchQueue, iter, next);
+            pthread_mutex_unlock(&wq->dispatchQueue_accessMutex);
+            if(iter == wq->delayedCallbacks_checkpoint)
+                break;
+        }
+    }
+
+    /* Create the new sample point */
+    for(size_t i = 0; i < wq->workersSize; ++i)
+        wq->workers[i].checkpointCounter = wq->workers[i].counter;
+    wq->delayedCallbacks_checkpoint = cb;
+}
+
+#endif
+
+void
+UA_WorkQueue_enqueueDelayed(UA_WorkQueue *wq, UA_DelayedCallback *cb) {
+#ifdef UA_ENABLE_MULTITHREADING
+    pthread_mutex_lock(&wq->dispatchQueue_accessMutex);
+#endif
+
+    SIMPLEQ_INSERT_HEAD(&wq->delayedCallbacks, cb, next);
+
+#ifdef UA_ENABLE_MULTITHREADING
+    wq->delayedCallbacks_sinceDispatch++;
+    if(wq->delayedCallbacks_sinceDispatch > UA_MAX_DELAYED_SAMPLE) {
+        dispatchDelayedCallbacks(wq, cb);
+        wq->delayedCallbacks_sinceDispatch = 0;
+    }
+    pthread_mutex_unlock(&wq->dispatchQueue_accessMutex);
+#endif
+}
+
+/* Assumes all workers are shut down */
+void UA_WorkQueue_manuallyProcessDelayed(UA_WorkQueue *wq) {
+    UA_DelayedCallback *dc, *dc_tmp;
+    SIMPLEQ_FOREACH_SAFE(dc, &wq->delayedCallbacks, next, dc_tmp) {
+        SIMPLEQ_REMOVE_HEAD(&wq->delayedCallbacks, next);
+        if(dc->callback)
+            dc->callback(dc->application, dc->data);
+        UA_free(dc);
+    }
+#ifdef UA_ENABLE_MULTITHREADING
+    wq->delayedCallbacks_checkpoint = NULL;
+#endif
+}

+ 129 - 0
src/ua_workqueue.h

@@ -0,0 +1,129 @@
+/* This Source Code Form is subject to the terms of the Mozilla Public
+ * License, v. 2.0. If a copy of the MPL was not distributed with this
+ * file, You can obtain one at http://mozilla.org/MPL/2.0/.
+ *
+ *    Copyright 2014-2018 (c) Fraunhofer IOSB (Author: Julius Pfrommer)
+ *    Copyright 2014-2016 (c) Sten Grüner
+ *    Copyright 2015 (c) Chris Iatrou
+ *    Copyright 2015 (c) Nick Goossens
+ *    Copyright 2015 (c) Jörg Schüler-Maroldt
+ *    Copyright 2015-2016 (c) Oleksiy Vasylyev
+ *    Copyright 2016-2017 (c) Florian Palm
+ *    Copyright 2017 (c) Stefan Profanter, fortiss GmbH
+ *    Copyright 2016 (c) Lorenz Haas
+ *    Copyright 2017 (c) Jonas Green
+ */
+
+#ifndef UA_WORKQUEUE_H_
+#define UA_WORKQUEUE_H_
+
+#include "ua_util_internal.h"
+#include "../../deps/queue.h"
+
+#ifdef UA_ENABLE_MULTITHREADING
+#include <pthread.h>
+#endif
+
+_UA_BEGIN_DECLS
+
+/* Callback where the application is either a client or a server */
+typedef void (*UA_ApplicationCallback)(void *application, void *data);
+
+/* Delayed callbacks are executed when all previously enqueue work is finished.
+ * This is used to free memory that might used by a parallel worker or where the
+ * current threat has remaining pointers to until the current operation
+ * finishes. */
+typedef struct UA_DelayedCallback {
+    SIMPLEQ_ENTRY(UA_DelayedCallback) next;
+    UA_ApplicationCallback callback;
+    void *application;
+    void *data;
+} UA_DelayedCallback;
+
+struct UA_WorkQueue;
+typedef struct UA_WorkQueue UA_WorkQueue;
+
+#ifdef UA_ENABLE_MULTITHREADING
+
+/* Workers take out callbacks from the work queue and execute them.
+ *
+ * Future Plans: Use work-stealing to load-balance between cores.
+ * Le, Nhat Minh, et al. "Correct and efficient work-stealing for weak memory
+ * models." ACM SIGPLAN Notices. Vol. 48. No. 8. ACM, 2013. */
+typedef struct {
+    pthread_t thread;
+    volatile UA_Boolean running;
+    UA_WorkQueue *queue;
+    UA_UInt32 counter;
+    UA_UInt32 checkpointCounter; /* Counter when the last checkpoint was made
+                                  * for the delayed callbacks */
+
+    /* separate cache lines */
+    char padding[64 - sizeof(void*) - sizeof(pthread_t) -
+                 sizeof(UA_UInt32) - sizeof(UA_Boolean)];
+} UA_Worker;
+
+#endif
+
+struct UA_WorkQueue {
+    /* Worker threads and work queue. Without multithreading, work is executed
+       immediately. */
+#ifdef UA_ENABLE_MULTITHREADING
+    UA_Worker *workers;
+    size_t workersSize;
+
+    /* Work queue */
+    SIMPLEQ_HEAD(, UA_DelayedCallback) dispatchQueue; /* Dispatch queue for the worker threads */
+    pthread_mutex_t dispatchQueue_accessMutex; /* mutex for access to queue */
+    pthread_cond_t dispatchQueue_condition; /* so the workers don't spin if the queue is empty */
+    pthread_mutex_t dispatchQueue_conditionMutex; /* mutex for access to condition variable */
+#endif
+
+    /* Delayed callbacks
+     * To be executed after all curretly dispatched works has finished */
+    SIMPLEQ_HEAD(, UA_DelayedCallback) delayedCallbacks;
+#ifdef UA_ENABLE_MULTITHREADING
+    pthread_mutex_t delayedCallbacks_accessMutex;
+    UA_DelayedCallback *delayedCallbacks_checkpoint;
+    size_t delayedCallbacks_sinceDispatch; /* How many have been added since we
+                                            * tried to dispatch callbacks? */
+#endif
+};
+
+void UA_WorkQueue_init(UA_WorkQueue *wq);
+
+/* Enqueue a delayed callback. It is executed when all previous work in the
+ * queue has been finished. The ``cb`` pointer is freed afterwards. ``cb`` can
+ * have a NULL callback that is not executed.
+ *
+ * This method checks internally if existing delayed work can be moved from the
+ * delayed queue to the worker dispatch queue. */
+void UA_WorkQueue_enqueueDelayed(UA_WorkQueue *wq, UA_DelayedCallback *cb);
+
+/* Stop the workers, process all enqueued work in the calling thread, clean up
+ * mutexes etc. */
+void UA_WorkQueue_cleanup(UA_WorkQueue *wq);
+
+#ifndef UA_ENABLE_MULTITHREADING
+
+/* Process all enqueued delayed work. This is not needed when workers are
+ * running for the multithreading case. (UA_WorkQueue_cleanup still calls this
+ * method during cleanup when the workers are shut down.) */
+void UA_WorkQueue_manuallyProcessDelayed(UA_WorkQueue *wq);
+
+#else
+
+/* Spin up a number of worker threads that listen on the work queue */
+UA_StatusCode UA_WorkQueue_start(UA_WorkQueue *wq, size_t workersCount);
+
+void UA_WorkQueue_stop(UA_WorkQueue *wq);
+
+/* Enqueue work for the worker threads */
+void UA_WorkQueue_enqueue(UA_WorkQueue *wq, UA_ApplicationCallback cb,
+                          void *application, void *data);
+
+#endif
+
+_UA_END_DECLS
+
+#endif /* UA_SERVER_WORKQUEUE_H_ */