瀏覽代碼

Removal of liburcu (userspace-rcu) (#1564)

* completely removed liburcu
(replaced rcu-queues by usage of native queues from queue.h that are secured by mutexes)

* atomic add/sub operations:
- fixed usage in discovery and mdns: was first atomicly changed then unsafe reassigned to itself afterwards
- differentiated between type UA_UInt32 and size_t (other types in discovery and mdns)
- explicit subtraction introduced UA_atomic_sub* (more readable)

* reverted accidentially introduced "const" parameter declarations

* unit tests (on travis) require also "libpthread"
Thomas Bender 6 年之前
父節點
當前提交
487bc8674d

+ 4 - 4
CMakeLists.txt

@@ -188,7 +188,10 @@ if(NOT WIN32)
     list(APPEND open62541_LIBRARIES c)
     list(APPEND open62541_LIBRARIES stdc++)
   else()
-    list(APPEND open62541_LIBRARIES pthread m)
+    list(APPEND open62541_LIBRARIES m)
+    if(UA_ENABLE_MULTITHREADING OR UA_BUILD_UNIT_TESTS)
+      list(APPEND open62541_LIBRARIES pthread)
+    endif()
     if(NOT APPLE AND (NOT ${CMAKE_SYSTEM_NAME} MATCHES "OpenBSD"))
       list(APPEND open62541_LIBRARIES rt)
     endif()
@@ -199,9 +202,6 @@ else()
         list(APPEND open62541_LIBRARIES iphlpapi)
     endif()
 endif()
-if(UA_ENABLE_MULTITHREADING)
-  list(APPEND open62541_LIBRARIES urcu-cds urcu-bp urcu-common)
-endif(UA_ENABLE_MULTITHREADING)
 
 #####################
 # Compiler Settings #

+ 0 - 1
doc/building.rst

@@ -27,7 +27,6 @@ Building with CMake on Ubuntu or Debian
    # enable additional features
    sudo apt-get install cmake-curses-gui # for the ccmake graphical interface
    sudo apt-get install libmbedtls-dev # for encryption support
-   sudo apt-get install liburcu-dev # for multithreading
    sudo apt-get install check # for unit tests
    sudo apt-get install python-sphinx graphviz # for documentation generation
    sudo apt-get install python-sphinx-rtd-theme # documentation style

+ 0 - 4
plugins/ua_network_udp.c

@@ -9,10 +9,6 @@
 #include <stdio.h>
 #include <string.h> // memset
 
-#ifdef UA_ENABLE_MULTITHREADING
-# include <urcu/uatomic.h>
-#endif
-
 /* with a space so amalgamation does not remove the includes */
 # include <errno.h> // errno, EINTR
 # include <fcntl.h> // fcntl

+ 2 - 2
src/server/ua_mdns.c

@@ -111,7 +111,7 @@ mdns_record_add_or_get(UA_Server *server, const char *record, const char *server
     // todo: malloc may fail: return a statuscode
     listEntry->serverOnNetwork.serverName.data = (UA_Byte*)UA_malloc(serverNameLen);
     memcpy(listEntry->serverOnNetwork.serverName.data, serverName, serverNameLen);
-    server->serverOnNetworkRecordIdCounter = UA_atomic_add(&server->serverOnNetworkRecordIdCounter, 1);
+    UA_atomic_addUInt32(&server->serverOnNetworkRecordIdCounter, 1);
     if(server->serverOnNetworkRecordIdCounter == 0)
         server->serverOnNetworkRecordIdLastReset = UA_DateTime_now();
 
@@ -169,7 +169,7 @@ mdns_record_remove(UA_Server *server, const char *record,
     server->serverOnNetworkSize--;
     UA_free(entry);
 #else
-    server->serverOnNetworkSize = uatomic_add_return(&server->serverOnNetworkSize, -1);
+    UA_atomic_subSize(&server->serverOnNetworkSize, 1);
     UA_Server_delayedCallback(server, delayedFree, entry);
 #endif
 }

+ 2 - 2
src/server/ua_securechannel_manager.c

@@ -59,7 +59,7 @@ removeSecureChannel(UA_SecureChannelManager *cm, channel_list_entry *entry) {
 
     /* Detach the channel and make the capacity available */
     LIST_REMOVE(entry, pointers);
-    UA_atomic_add(&cm->currentChannelCount, (UA_UInt32)-1);
+    UA_atomic_subUInt32(&cm->currentChannelCount, 1);
     return UA_STATUSCODE_GOOD;
 }
 
@@ -134,7 +134,7 @@ UA_SecureChannelManager_create(UA_SecureChannelManager *const cm, UA_Connection
     entry->channel.securityToken.revisedLifetime = cm->server->config.maxSecurityTokenLifetime;
 
     LIST_INSERT_HEAD(&cm->channels, entry, pointers);
-    UA_atomic_add(&cm->currentChannelCount, 1);
+    UA_atomic_addUInt32(&cm->currentChannelCount, 1);
     UA_Connection_attachSecureChannel(connection, &entry->channel);
     return UA_STATUSCODE_GOOD;
 }

+ 3 - 2
src/server/ua_server.c

@@ -145,8 +145,9 @@ void UA_Server_delete(UA_Server *server) {
 #endif
 
 #ifdef UA_ENABLE_MULTITHREADING
+    pthread_mutex_destroy(&server->dispatchQueue_accessMutex);
     pthread_cond_destroy(&server->dispatchQueue_condition);
-    pthread_mutex_destroy(&server->dispatchQueue_mutex);
+    pthread_mutex_destroy(&server->dispatchQueue_conditionMutex);
 #endif
 
     /* Delete the timed work */
@@ -211,7 +212,7 @@ UA_Server_new(const UA_ServerConfig *config) {
 
     /* Initialized the dispatch queue for worker threads */
 #ifdef UA_ENABLE_MULTITHREADING
-    cds_wfcq_init(&server->dispatchQueue_head, &server->dispatchQueue_tail);
+    SIMPLEQ_INIT(&server->dispatchQueue);
 #endif
 
     /* Create Namespaces 0 and 1 */

+ 10 - 7
src/server/ua_server_internal.h

@@ -28,13 +28,17 @@ extern "C" {
 
 #ifdef UA_ENABLE_MULTITHREADING
 
-/* TODO: Don't depend on liburcu */
-#include <urcu.h>
-#include <urcu/lfstack.h>
+#include <pthread.h>
 
 struct UA_Worker;
 typedef struct UA_Worker UA_Worker;
 
+struct UA_WorkerCallback;
+typedef struct UA_WorkerCallback UA_WorkerCallback;
+
+SIMPLEQ_HEAD(UA_DispatchQueue, UA_WorkerCallback);
+typedef struct UA_DispatchQueue UA_DispatchQueue;
+
 #endif /* UA_ENABLE_MULTITHREADING */
 
 #ifdef UA_ENABLE_DISCOVERY
@@ -126,12 +130,11 @@ struct UA_Server {
 
     /* Worker threads */
 #ifdef UA_ENABLE_MULTITHREADING
-    /* Dispatch queue head for the worker threads (the tail should not be in the same cache line) */
-    struct cds_wfcq_head dispatchQueue_head;
     UA_Worker *workers; /* there are nThread workers in a running server */
+    UA_DispatchQueue dispatchQueue; /* Dispatch queue for the worker threads */
+    pthread_mutex_t dispatchQueue_accessMutex; /* mutex for access to queue */
     pthread_cond_t dispatchQueue_condition; /* so the workers don't spin if the queue is empty */
-    pthread_mutex_t dispatchQueue_mutex; /* mutex for access to condition variable */
-    struct cds_wfcq_tail dispatchQueue_tail; /* Dispatch queue tail for the worker threads */
+    pthread_mutex_t dispatchQueue_conditionMutex; /* mutex for access to condition variable */
 #endif
 
     /* For bootstrapping, omit some consistency checks, creating a reference to

+ 34 - 29
src/server/ua_server_worker.c

@@ -44,15 +44,16 @@ struct UA_Worker {
                  sizeof(UA_UInt32) - sizeof(UA_Boolean)];
 };
 
-typedef struct {
-    struct cds_wfcq_node node;
+struct UA_WorkerCallback {
+    SIMPLEQ_ENTRY(UA_WorkerCallback) next;
     UA_ServerCallback callback;
     void *data;
 
     UA_Boolean delayed;         /* Is it a delayed callback? */
     UA_Boolean countersSampled; /* Have the worker counters been sampled? */
     UA_UInt32 workerCounters[]; /* Counter value for each worker */
-} WorkerCallback;
+};
+typedef struct UA_WorkerCallback WorkerCallback;
 
 /* Forward Declaration */
 static void
@@ -69,16 +70,19 @@ workerLoop(UA_Worker *worker) {
     UA_random_seed((uintptr_t)worker);
 
     while(*running) {
-        UA_atomic_add(counter, 1);
-        WorkerCallback *dc = (WorkerCallback*)
-            cds_wfcq_dequeue_blocking(&server->dispatchQueue_head,
-                                      &server->dispatchQueue_tail);
+        UA_atomic_addUInt32(counter, 1);
+        pthread_mutex_lock(&server->dispatchQueue_accessMutex);
+        WorkerCallback *dc = SIMPLEQ_FIRST(&server->dispatchQueue);
+        if(dc) {
+            SIMPLEQ_REMOVE_HEAD(&server->dispatchQueue, next);
+        }
+        pthread_mutex_unlock(&server->dispatchQueue_accessMutex);
         if(!dc) {
             /* Nothing to do. Sleep until a callback is dispatched */
-            pthread_mutex_lock(&server->dispatchQueue_mutex);
+            pthread_mutex_lock(&server->dispatchQueue_conditionMutex);
             pthread_cond_wait(&server->dispatchQueue_condition,
-                              &server->dispatchQueue_mutex);
-            pthread_mutex_unlock(&server->dispatchQueue_mutex);
+                              &server->dispatchQueue_conditionMutex);
+            pthread_mutex_unlock(&server->dispatchQueue_conditionMutex);
             continue;
         }
 
@@ -98,14 +102,14 @@ workerLoop(UA_Worker *worker) {
 
 static void
 emptyDispatchQueue(UA_Server *server) {
-    while(!cds_wfcq_empty(&server->dispatchQueue_head,
-                          &server->dispatchQueue_tail)) {
-        WorkerCallback *dc = (WorkerCallback*)
-            cds_wfcq_dequeue_blocking(&server->dispatchQueue_head,
-                                      &server->dispatchQueue_tail);
+    pthread_mutex_lock(&server->dispatchQueue_accessMutex);
+    WorkerCallback *dc;
+    while((dc = SIMPLEQ_FIRST(&server->dispatchQueue)) != NULL) {
+        SIMPLEQ_REMOVE_HEAD(&server->dispatchQueue, next);
         dc->callback(server, dc->data);
         UA_free(dc);
     }
+    pthread_mutex_unlock(&server->dispatchQueue_accessMutex);
 }
 
 #endif
@@ -135,9 +139,9 @@ UA_Server_workerCallback(UA_Server *server, UA_ServerCallback callback,
     dc->callback = callback;
     dc->data = data;
     dc->delayed = false;
-    cds_wfcq_node_init(&dc->node);
-    cds_wfcq_enqueue(&server->dispatchQueue_head,
-                     &server->dispatchQueue_tail, &dc->node);
+    pthread_mutex_lock(&server->dispatchQueue_accessMutex);
+    SIMPLEQ_INSERT_TAIL(&server->dispatchQueue, dc, next);
+    pthread_mutex_unlock(&server->dispatchQueue_accessMutex);
 
     /* Wake up sleeping workers */
     pthread_cond_broadcast(&server->dispatchQueue_condition);
@@ -210,9 +214,9 @@ UA_Server_delayedCallback(UA_Server *server, UA_ServerCallback callback,
     dc->data = data;
     dc->delayed = true;
     dc->countersSampled = false;
-    cds_wfcq_node_init(&dc->node);
-    cds_wfcq_enqueue(&server->dispatchQueue_head,
-                     &server->dispatchQueue_tail, &dc->node);
+    pthread_mutex_lock(&server->dispatchQueue_accessMutex);
+    SIMPLEQ_INSERT_TAIL(&server->dispatchQueue, dc, next);
+    pthread_mutex_unlock(&server->dispatchQueue_accessMutex);
 
     /* Wake up sleeping workers */
     pthread_cond_broadcast(&server->dispatchQueue_condition);
@@ -229,9 +233,9 @@ processDelayedCallback(UA_Server *server, WorkerCallback *dc) {
         dc->countersSampled = true;
 
         /* Re-add to the dispatch queue */
-        cds_wfcq_node_init(&dc->node);
-        cds_wfcq_enqueue(&server->dispatchQueue_head,
-                         &server->dispatchQueue_tail, &dc->node);
+        pthread_mutex_lock(&server->dispatchQueue_accessMutex);
+        SIMPLEQ_INSERT_TAIL(&server->dispatchQueue, dc, next);
+        pthread_mutex_unlock(&server->dispatchQueue_accessMutex);
 
         /* Wake up sleeping workers */
         pthread_cond_broadcast(&server->dispatchQueue_condition);
@@ -251,9 +255,9 @@ processDelayedCallback(UA_Server *server, WorkerCallback *dc) {
      * TODO: What is the impact of this loop?
      * Can we add a small delay here? */
     if(!ready) {
-        cds_wfcq_node_init(&dc->node);
-        cds_wfcq_enqueue(&server->dispatchQueue_head,
-                         &server->dispatchQueue_tail, &dc->node);
+        pthread_mutex_lock(&server->dispatchQueue_accessMutex);
+        SIMPLEQ_INSERT_TAIL(&server->dispatchQueue, dc, next);
+        pthread_mutex_unlock(&server->dispatchQueue_accessMutex);
 
         /* Wake up sleeping workers */
         pthread_cond_broadcast(&server->dispatchQueue_condition);
@@ -301,8 +305,9 @@ UA_Server_run_startup(UA_Server *server) {
 #ifdef UA_ENABLE_MULTITHREADING
     UA_LOG_INFO(server->config.logger, UA_LOGCATEGORY_SERVER,
                 "Spinning up %u worker thread(s)", server->config.nThreads);
-    pthread_cond_init(&server->dispatchQueue_condition, 0);
-    pthread_mutex_init(&server->dispatchQueue_mutex, 0);
+    pthread_mutex_init(&server->dispatchQueue_accessMutex, NULL);
+    pthread_cond_init(&server->dispatchQueue_condition, NULL);
+    pthread_mutex_init(&server->dispatchQueue_conditionMutex, NULL);
     server->workers = (UA_Worker*)UA_malloc(server->config.nThreads * sizeof(UA_Worker));
     if(!server->workers)
         return UA_STATUSCODE_BADOUTOFMEMORY;

+ 3 - 3
src/server/ua_services_discovery.c

@@ -461,7 +461,7 @@ process_RegisterServer(UA_Server *server, UA_Session *session,
         UA_free(registeredServer_entry);
         server->registeredServersSize--;
 #else
-        server->registeredServersSize = uatomic_add_return(&server->registeredServersSize, -1);
+        UA_atomic_subSize(&server->registeredServersSize, 1);
         UA_Server_delayedCallback(server, freeEntry, registeredServer_entry);
 #endif
         responseHeader->serviceResult = UA_STATUSCODE_GOOD;
@@ -485,7 +485,7 @@ process_RegisterServer(UA_Server *server, UA_Session *session,
 #ifndef UA_ENABLE_MULTITHREADING
         server->registeredServersSize++;
 #else
-        server->registeredServersSize = uatomic_add_return(&server->registeredServersSize, 1);
+        UA_atomic_addSize(&server->registeredServersSize, 1);
 #endif
 
         if(server->registerServerCallback)
@@ -583,7 +583,7 @@ void UA_Discovery_cleanupTimedOut(UA_Server *server, UA_DateTime nowMonotonic) {
             UA_free(current);
             server->registeredServersSize--;
 #else
-            server->registeredServersSize = uatomic_add_return(&server->registeredServersSize, -1);
+            UA_atomic_subSize(&server->registeredServersSize, 1);
             UA_Server_delayedCallback(server, freeEntry, current);
 #endif
         }

+ 2 - 2
src/server/ua_session_manager.c

@@ -58,7 +58,7 @@ removeSession(UA_SessionManager *sm, session_list_entry *sentry) {
     /* Detach the session from the session manager and make the capacity
      * available */
     LIST_REMOVE(sentry, pointers);
-    UA_atomic_add(&sm->currentSessionCount, (UA_UInt32)-1);
+    UA_atomic_subUInt32(&sm->currentSessionCount, 1);
     return UA_STATUSCODE_GOOD;
 }
 
@@ -141,7 +141,7 @@ UA_SessionManager_createSession(UA_SessionManager *sm, UA_SecureChannel *channel
     if(!newentry)
         return UA_STATUSCODE_BADOUTOFMEMORY;
 
-    UA_atomic_add(&sm->currentSessionCount, 1);
+    UA_atomic_addUInt32(&sm->currentSessionCount, 1);
     UA_Session_init(&newentry->session);
     newentry->session.sessionId = UA_NODEID_GUID(1, UA_Guid_random());
     newentry->session.header.authenticationToken = UA_NODEID_GUID(1, UA_Guid_random());

+ 2 - 2
src/ua_securechannel.c

@@ -344,7 +344,7 @@ UA_SecureChannel_sendAsymmetricOPNMessage(UA_SecureChannel *channel, UA_UInt32 r
 
     UA_SequenceHeader seqHeader;
     seqHeader.requestId = requestId;
-    seqHeader.sequenceNumber = UA_atomic_add(&channel->sendSequenceNumber, 1);
+    seqHeader.sequenceNumber = UA_atomic_addUInt32(&channel->sendSequenceNumber, 1);
     retval |= UA_encodeBinary(&seqHeader, &UA_TRANSPORT[UA_TRANSPORT_SEQUENCEHEADER],
                               &header_pos, &buf_end, NULL, NULL);
 
@@ -506,7 +506,7 @@ sendSymmetricChunk(UA_MessageContext *mc) {
 
     UA_SequenceHeader seqHeader;
     seqHeader.requestId = mc->requestId;
-    seqHeader.sequenceNumber = UA_atomic_add(&channel->sendSequenceNumber, 1);
+    seqHeader.sequenceNumber = UA_atomic_addUInt32(&channel->sendSequenceNumber, 1);
     res |= UA_encodeBinary(&seqHeader, &UA_TRANSPORT[UA_TRANSPORT_SEQUENCEHEADER],
                            &header_pos, &mc->buf_end, NULL, NULL);
 

+ 43 - 1
src/ua_util.h

@@ -107,7 +107,7 @@ UA_atomic_cmpxchg(void * volatile * addr, void *expected, void *newptr) {
 }
 
 static UA_INLINE uint32_t
-UA_atomic_add(volatile uint32_t *addr, uint32_t increase) {
+UA_atomic_addUInt32(volatile uint32_t *addr, uint32_t increase) {
 #ifndef UA_ENABLE_MULTITHREADING
     *addr += increase;
     return *addr;
@@ -120,6 +120,48 @@ UA_atomic_add(volatile uint32_t *addr, uint32_t increase) {
 #endif
 }
 
+static UA_INLINE size_t
+UA_atomic_addSize(volatile size_t *addr, size_t increase) {
+#ifndef UA_ENABLE_MULTITHREADING
+    *addr += increase;
+    return *addr;
+#else
+# ifdef _MSC_VER /* Visual Studio */
+    return _InterlockedExchangeAdd(addr, increase) + increase;
+# else /* GCC/Clang */
+    return __sync_add_and_fetch(addr, increase);
+# endif
+#endif
+}
+
+static UA_INLINE uint32_t
+UA_atomic_subUInt32(volatile uint32_t *addr, uint32_t decrease) {
+#ifndef UA_ENABLE_MULTITHREADING
+    *addr -= decrease;
+    return *addr;
+#else
+# ifdef _MSC_VER /* Visual Studio */
+    return _InterlockedExchangeSub(addr, decrease) - decrease;
+# else /* GCC/Clang */
+    return __sync_sub_and_fetch(addr, decrease);
+# endif
+#endif
+}
+
+static UA_INLINE size_t
+UA_atomic_subSize(volatile size_t *addr, size_t decrease) {
+#ifndef UA_ENABLE_MULTITHREADING
+    *addr -= decrease;
+    return *addr;
+#else
+# ifdef _MSC_VER /* Visual Studio */
+    return _InterlockedExchangeSub(addr, decrease) - decrease;
+# else /* GCC/Clang */
+    return __sync_sub_and_fetch(addr, decrease);
+# endif
+#endif
+}
+
 /* Utility Functions
  * ----------------- */
 

+ 0 - 1
tools/travis/travis_linux_before_install.sh

@@ -32,7 +32,6 @@ if [ -z ${DOCKER+x} ]; then
 
 	sudo add-apt-repository -y ppa:lttng/ppa
 	sudo apt-get update -qq
-	sudo apt-get install -y liburcu4 liburcu-dev
 	echo -en 'travis_fold:end:script.before_install.external\\r'
 
 	echo "=== Installing python packages ===" && echo -en 'travis_fold:start:before_install.python\\r'

+ 0 - 1
tools/travis/travis_osx_before_install.sh

@@ -2,7 +2,6 @@
 set -ev
 
 brew install check
-brew install userspace-rcu
 brew install valgrind
 brew install graphviz
 brew install python