123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466 |
- /* This Source Code Form is subject to the terms of the Mozilla Public
- * License, v. 2.0. If a copy of the MPL was not distributed with this
- * file, You can obtain one at http://mozilla.org/MPL/2.0/.
- *
- * Copyright 2014-2017 (c) Julius Pfrommer, Fraunhofer IOSB
- * Copyright 2014-2016 (c) Sten Grüner
- * Copyright 2015 (c) Chris Iatrou
- * Copyright 2015 (c) Nick Goossens
- * Copyright 2015 (c) Jörg Schüler-Maroldt
- * Copyright 2015-2016 (c) Oleksiy Vasylyev
- * Copyright 2016-2017 (c) Florian Palm
- * Copyright 2017 (c) Stefan Profanter, fortiss GmbH
- * Copyright 2016 (c) Lorenz Haas
- * Copyright 2017 (c) Jonas Green
- */
- #include "ua_util.h"
- #include "ua_server_internal.h"
- #ifdef UA_ENABLE_VALGRIND_INTERACTIVE
- #include <valgrind/memcheck.h>
- #endif
- #define UA_MAXTIMEOUT 50 /* Max timeout in ms between main-loop iterations */
- /**
- * Worker Threads and Dispatch Queue
- * ---------------------------------
- * The worker threads dequeue callbacks from a central Multi-Producer
- * Multi-Consumer Queue (MPMC). When there are no callbacks, workers go idle.
- * The condition to wake them up is triggered whenever a callback is
- * dispatched.
- *
- * Future Plans: Use work-stealing to load-balance between cores.
- * Le, Nhat Minh, et al. "Correct and efficient work-stealing for weak memory
- * models." ACM SIGPLAN Notices. Vol. 48. No. 8. ACM, 2013. */
- #ifdef UA_ENABLE_MULTITHREADING
- struct UA_Worker {
- UA_Server *server;
- pthread_t thr;
- UA_UInt32 counter;
- volatile UA_Boolean running;
- /* separate cache lines */
- char padding[64 - sizeof(void*) - sizeof(pthread_t) -
- sizeof(UA_UInt32) - sizeof(UA_Boolean)];
- };
- struct UA_WorkerCallback {
- SIMPLEQ_ENTRY(UA_WorkerCallback) next;
- UA_ServerCallback callback;
- void *data;
- UA_Boolean delayed; /* Is it a delayed callback? */
- UA_Boolean countersSampled; /* Have the worker counters been sampled? */
- UA_UInt32 workerCounters[]; /* Counter value for each worker */
- };
- typedef struct UA_WorkerCallback WorkerCallback;
- /* Forward Declaration */
- static void
- processDelayedCallback(UA_Server *server, WorkerCallback *dc);
- static void *
- workerLoop(UA_Worker *worker) {
- UA_Server *server = worker->server;
- UA_UInt32 *counter = &worker->counter;
- volatile UA_Boolean *running = &worker->running;
- /* Initialize the (thread local) random seed with the ram address
- * of the worker. Not for security-critical entropy! */
- UA_random_seed((uintptr_t)worker);
- while(*running) {
- UA_atomic_addUInt32(counter, 1);
- pthread_mutex_lock(&server->dispatchQueue_accessMutex);
- WorkerCallback *dc = SIMPLEQ_FIRST(&server->dispatchQueue);
- if(dc) {
- SIMPLEQ_REMOVE_HEAD(&server->dispatchQueue, next);
- }
- pthread_mutex_unlock(&server->dispatchQueue_accessMutex);
- if(!dc) {
- /* Nothing to do. Sleep until a callback is dispatched */
- pthread_mutex_lock(&server->dispatchQueue_conditionMutex);
- pthread_cond_wait(&server->dispatchQueue_condition,
- &server->dispatchQueue_conditionMutex);
- pthread_mutex_unlock(&server->dispatchQueue_conditionMutex);
- continue;
- }
- if(dc->delayed) {
- processDelayedCallback(server, dc);
- continue;
- }
- dc->callback(server, dc->data);
- UA_free(dc);
- }
- UA_LOG_DEBUG(server->config.logger, UA_LOGCATEGORY_SERVER,
- "Worker shut down");
- return NULL;
- }
- void UA_Server_cleanupDispatchQueue(UA_Server *server) {
- while(true) {
- pthread_mutex_lock(&server->dispatchQueue_accessMutex);
- WorkerCallback *dc = SIMPLEQ_FIRST(&server->dispatchQueue);
- if(!dc) {
- pthread_mutex_unlock(&server->dispatchQueue_accessMutex);
- break;
- }
- SIMPLEQ_REMOVE_HEAD(&server->dispatchQueue, next);
- pthread_mutex_unlock(&server->dispatchQueue_accessMutex);
- dc->callback(server, dc->data);
- UA_free(dc);
- }
- }
- #endif
- /**
- * Repeated Callbacks
- * ------------------
- * Repeated Callbacks are handled by UA_Timer (used in both client and server).
- * In the multi-threaded case, callbacks are dispatched to workers. Otherwise,
- * they are executed immediately. */
- void
- UA_Server_workerCallback(UA_Server *server, UA_ServerCallback callback,
- void *data) {
- #ifndef UA_ENABLE_MULTITHREADING
- /* Execute immediately */
- callback(server, data);
- #else
- /* Execute immediately if memory could not be allocated */
- WorkerCallback *dc = (WorkerCallback*)UA_malloc(sizeof(WorkerCallback));
- if(!dc) {
- callback(server, data);
- return;
- }
- /* Enqueue for the worker threads */
- dc->callback = callback;
- dc->data = data;
- dc->delayed = false;
- pthread_mutex_lock(&server->dispatchQueue_accessMutex);
- SIMPLEQ_INSERT_TAIL(&server->dispatchQueue, dc, next);
- pthread_mutex_unlock(&server->dispatchQueue_accessMutex);
- /* Wake up sleeping workers */
- pthread_cond_broadcast(&server->dispatchQueue_condition);
- #endif
- }
- /**
- * Delayed Callbacks
- * -----------------
- *
- * Delayed Callbacks are called only when all callbacks that were dispatched
- * prior are finished. In the single-threaded case, the callback is added to a
- * singly-linked list that is processed at the end of the server's main-loop. In
- * the multi-threaded case, the delay is ensure by a three-step procedure:
- *
- * 1. The delayed callback is dispatched to the worker queue. So it is only
- * dequeued when all prior callbacks have been dequeued.
- *
- * 2. When the callback is first dequeued by a worker, sample the counter of all
- * workers. Once all counters have advanced, the callback is ready.
- *
- * 3. Check regularly if the callback is ready by adding it back to the dispatch
- * queue. */
- /* Delayed callback to free the subscription memory */
- static void
- freeCallback(UA_Server *server, void *data) {
- UA_free(data);
- }
- /* TODO: Delayed free should never fail. This can be achieved by adding a prefix
- * with the list pointers */
- UA_StatusCode
- UA_Server_delayedFree(UA_Server *server, void *data) {
- return UA_Server_delayedCallback(server, freeCallback, data);
- }
- #ifndef UA_ENABLE_MULTITHREADING
- typedef struct UA_DelayedCallback {
- SLIST_ENTRY(UA_DelayedCallback) next;
- UA_ServerCallback callback;
- void *data;
- } UA_DelayedCallback;
- UA_StatusCode
- UA_Server_delayedCallback(UA_Server *server, UA_ServerCallback callback,
- void *data) {
- UA_DelayedCallback *dc =
- (UA_DelayedCallback*)UA_malloc(sizeof(UA_DelayedCallback));
- if(!dc)
- return UA_STATUSCODE_BADOUTOFMEMORY;
- dc->callback = callback;
- dc->data = data;
- SLIST_INSERT_HEAD(&server->delayedCallbacks, dc, next);
- return UA_STATUSCODE_GOOD;
- }
- void UA_Server_cleanupDelayedCallbacks(UA_Server *server) {
- UA_DelayedCallback *dc, *dc_tmp;
- SLIST_FOREACH_SAFE(dc, &server->delayedCallbacks, next, dc_tmp) {
- SLIST_REMOVE(&server->delayedCallbacks, dc, UA_DelayedCallback, next);
- dc->callback(server, dc->data);
- UA_free(dc);
- }
- }
- #else /* UA_ENABLE_MULTITHREADING */
- UA_StatusCode
- UA_Server_delayedCallback(UA_Server *server, UA_ServerCallback callback,
- void *data) {
- size_t dcsize = sizeof(WorkerCallback) +
- (sizeof(UA_UInt32) * server->config.nThreads);
- WorkerCallback *dc = (WorkerCallback*)UA_malloc(dcsize);
- if(!dc)
- return UA_STATUSCODE_BADOUTOFMEMORY;
- /* Enqueue for the worker threads */
- dc->callback = callback;
- dc->data = data;
- dc->delayed = true;
- dc->countersSampled = false;
- pthread_mutex_lock(&server->dispatchQueue_accessMutex);
- SIMPLEQ_INSERT_TAIL(&server->dispatchQueue, dc, next);
- pthread_mutex_unlock(&server->dispatchQueue_accessMutex);
- /* Wake up sleeping workers */
- pthread_cond_broadcast(&server->dispatchQueue_condition);
- return UA_STATUSCODE_GOOD;
- }
- /* Called from the worker loop */
- static void
- processDelayedCallback(UA_Server *server, WorkerCallback *dc) {
- /* Set the worker counters */
- if(!dc->countersSampled) {
- for(size_t i = 0; i < server->config.nThreads; ++i)
- dc->workerCounters[i] = server->workers[i].counter;
- dc->countersSampled = true;
- /* Re-add to the dispatch queue */
- pthread_mutex_lock(&server->dispatchQueue_accessMutex);
- SIMPLEQ_INSERT_TAIL(&server->dispatchQueue, dc, next);
- pthread_mutex_unlock(&server->dispatchQueue_accessMutex);
- /* Wake up sleeping workers */
- pthread_cond_broadcast(&server->dispatchQueue_condition);
- return;
- }
- /* Have all other jobs finished? */
- UA_Boolean ready = true;
- for(size_t i = 0; i < server->config.nThreads; ++i) {
- if(dc->workerCounters[i] == server->workers[i].counter) {
- ready = false;
- break;
- }
- }
- /* Re-add to the dispatch queue.
- * TODO: What is the impact of this loop?
- * Can we add a small delay here? */
- if(!ready) {
- pthread_mutex_lock(&server->dispatchQueue_accessMutex);
- SIMPLEQ_INSERT_TAIL(&server->dispatchQueue, dc, next);
- pthread_mutex_unlock(&server->dispatchQueue_accessMutex);
- /* Wake up sleeping workers */
- pthread_cond_broadcast(&server->dispatchQueue_condition);
- return;
- }
- /* Execute the callback */
- dc->callback(server, dc->data);
- UA_free(dc);
- }
- #endif
- /**
- * Main Server Loop
- * ----------------
- * Start: Spin up the workers and the network layer and sample the server's
- * start time.
- * Iterate: Process repeated callbacks and events in the network layer.
- * This part can be driven from an external main-loop in an
- * event-driven single-threaded architecture.
- * Stop: Stop workers, finish all callbacks, stop the network layer,
- * clean up */
- UA_StatusCode
- UA_Server_run_startup(UA_Server *server) {
- UA_Variant var;
- UA_StatusCode result = UA_STATUSCODE_GOOD;
- /* Sample the start time and set it to the Server object */
- server->startTime = UA_DateTime_now();
- UA_Variant_init(&var);
- UA_Variant_setScalar(&var, &server->startTime, &UA_TYPES[UA_TYPES_DATETIME]);
- UA_Server_writeValue(server,
- UA_NODEID_NUMERIC(0, UA_NS0ID_SERVER_SERVERSTATUS_STARTTIME),
- var);
- /* Start the networklayers */
- for(size_t i = 0; i < server->config.networkLayersSize; ++i) {
- UA_ServerNetworkLayer *nl = &server->config.networkLayers[i];
- result |= nl->start(nl, &server->config.customHostname);
- }
- /* Spin up the worker threads */
- #ifdef UA_ENABLE_MULTITHREADING
- UA_LOG_INFO(server->config.logger, UA_LOGCATEGORY_SERVER,
- "Spinning up %u worker thread(s)", server->config.nThreads);
- pthread_mutex_init(&server->dispatchQueue_accessMutex, NULL);
- pthread_cond_init(&server->dispatchQueue_condition, NULL);
- pthread_mutex_init(&server->dispatchQueue_conditionMutex, NULL);
- server->workers = (UA_Worker*)UA_malloc(server->config.nThreads * sizeof(UA_Worker));
- if(!server->workers)
- return UA_STATUSCODE_BADOUTOFMEMORY;
- for(size_t i = 0; i < server->config.nThreads; ++i) {
- UA_Worker *worker = &server->workers[i];
- worker->server = server;
- worker->counter = 0;
- worker->running = true;
- pthread_create(&worker->thr, NULL, (void* (*)(void*))workerLoop, worker);
- }
- #endif
- /* Start the multicast discovery server */
- #ifdef UA_ENABLE_DISCOVERY_MULTICAST
- if(server->config.applicationDescription.applicationType ==
- UA_APPLICATIONTYPE_DISCOVERYSERVER)
- startMulticastDiscoveryServer(server);
- #endif
- return result;
- }
- UA_UInt16
- UA_Server_run_iterate(UA_Server *server, UA_Boolean waitInternal) {
- /* Process repeated work */
- UA_DateTime now = UA_DateTime_nowMonotonic();
- UA_DateTime nextRepeated =
- UA_Timer_process(&server->timer, now,
- (UA_TimerDispatchCallback)UA_Server_workerCallback,
- server);
- UA_DateTime latest = now + (UA_MAXTIMEOUT * UA_DATETIME_MSEC);
- if(nextRepeated > latest)
- nextRepeated = latest;
- UA_UInt16 timeout = 0;
- /* round always to upper value to avoid timeout to be set to 0
- * if(nextRepeated - now) < (UA_DATETIME_MSEC/2) */
- if(waitInternal)
- timeout = (UA_UInt16)(((nextRepeated - now) + (UA_DATETIME_MSEC - 1)) / UA_DATETIME_MSEC);
- /* Listen on the networklayer */
- for(size_t i = 0; i < server->config.networkLayersSize; ++i) {
- UA_ServerNetworkLayer *nl = &server->config.networkLayers[i];
- nl->listen(nl, server, timeout);
- }
- #ifndef UA_ENABLE_MULTITHREADING
- /* Process delayed callbacks when all callbacks and network events are done.
- * If multithreading is enabled, the cleanup of delayed values is attempted
- * by a callback in the job queue. */
- UA_Server_cleanupDelayedCallbacks(server);
- #endif
- #if defined(UA_ENABLE_DISCOVERY_MULTICAST) && !defined(UA_ENABLE_MULTITHREADING)
- if(server->config.applicationDescription.applicationType ==
- UA_APPLICATIONTYPE_DISCOVERYSERVER) {
- // TODO multicastNextRepeat does not consider new input data (requests)
- // on the socket. It will be handled on the next call. if needed, we
- // need to use select with timeout on the multicast socket
- // server->mdnsSocket (see example in mdnsd library) on higher level.
- UA_DateTime multicastNextRepeat = 0;
- UA_StatusCode hasNext =
- iterateMulticastDiscoveryServer(server, &multicastNextRepeat, UA_TRUE);
- if(hasNext == UA_STATUSCODE_GOOD && multicastNextRepeat < nextRepeated)
- nextRepeated = multicastNextRepeat;
- }
- #endif
- now = UA_DateTime_nowMonotonic();
- timeout = 0;
- if(nextRepeated > now)
- timeout = (UA_UInt16)((nextRepeated - now) / UA_DATETIME_MSEC);
- return timeout;
- }
- UA_StatusCode
- UA_Server_run_shutdown(UA_Server *server) {
- /* Stop the netowrk layer */
- for(size_t i = 0; i < server->config.networkLayersSize; ++i) {
- UA_ServerNetworkLayer *nl = &server->config.networkLayers[i];
- nl->stop(nl, server);
- }
- #ifdef UA_ENABLE_MULTITHREADING
- /* Shut down the workers */
- if(server->workers) {
- UA_LOG_INFO(server->config.logger, UA_LOGCATEGORY_SERVER,
- "Shutting down %u worker thread(s)",
- server->config.nThreads);
- for(size_t i = 0; i < server->config.nThreads; ++i)
- server->workers[i].running = false;
- pthread_cond_broadcast(&server->dispatchQueue_condition);
- for(size_t i = 0; i < server->config.nThreads; ++i)
- pthread_join(server->workers[i].thr, NULL);
- UA_free(server->workers);
- server->workers = NULL;
- }
- /* Execute the remaining callbacks in the dispatch queue. Also executes
- * delayed callbacks. */
- UA_Server_cleanupDispatchQueue(server);
- #else
- /* Process remaining delayed callbacks */
- UA_Server_cleanupDelayedCallbacks(server);
- #endif
- #ifdef UA_ENABLE_DISCOVERY_MULTICAST
- /* Stop multicast discovery */
- if(server->config.applicationDescription.applicationType ==
- UA_APPLICATIONTYPE_DISCOVERYSERVER)
- stopMulticastDiscoveryServer(server);
- #endif
- return UA_STATUSCODE_GOOD;
- }
- UA_StatusCode
- UA_Server_run(UA_Server *server, volatile UA_Boolean *running) {
- UA_StatusCode retval = UA_Server_run_startup(server);
- if(retval != UA_STATUSCODE_GOOD)
- return retval;
- #ifdef UA_ENABLE_VALGRIND_INTERACTIVE
- size_t loopCount = 0;
- #endif
- while(*running) {
- #ifdef UA_ENABLE_VALGRIND_INTERACTIVE
- if(loopCount == 0) {
- VALGRIND_DO_LEAK_CHECK;
- }
- ++loopCount;
- loopCount %= UA_VALGRIND_INTERACTIVE_INTERVAL;
- #endif
- UA_Server_run_iterate(server, true);
- }
- return UA_Server_run_shutdown(server);
- }
|