Bladeren bron

Ensure that unit tests run with multithreading enabled (fix #912)

Julius Pfrommer 8 jaren geleden
bovenliggende
commit
9925681e23

+ 5 - 2
src/server/ua_nodestore_concurrent.c

@@ -82,8 +82,9 @@ void UA_NodeStore_delete(UA_NodeStore *ns) {
         }
         cds_lfht_next(ht, &iter);
     }
+    UA_RCU_UNLOCK();
     cds_lfht_destroy(ht, NULL);
-    UA_free(ns);
+    UA_RCU_LOCK();
 }
 
 UA_Node * UA_NodeStore_newNode(UA_NodeClass class) {
@@ -153,8 +154,10 @@ UA_StatusCode UA_NodeStore_replace(UA_NodeStore *ns, UA_Node *node) {
 
     /* We try to replace an obsolete version of the node */
     struct nodeEntry *oldEntry = (struct nodeEntry*)iter.node;
-    if(oldEntry != entry->orig)
+    if(oldEntry != entry->orig) {
+        deleteEntry(&entry->rcu_head);
         return UA_STATUSCODE_BADINTERNALERROR;
+    }
     
     cds_lfht_node_init(&entry->htn);
     if(cds_lfht_replace(ht, &iter, h, compare, &node->nodeId, &entry->htn) != 0) {

+ 22 - 19
src/server/ua_server_worker.c

@@ -115,6 +115,7 @@ workerLoop(UA_Worker *worker) {
     UA_ASSERT_RCU_UNLOCKED();
     rcu_barrier(); // wait for all scheduled call_rcu work to complete
     rcu_unregister_thread();
+    UA_LOG_DEBUG(server->config.logger, UA_LOGCATEGORY_SERVER, "Worker shut down");
     return NULL;
 }
 
@@ -635,18 +636,16 @@ UA_UInt16 UA_Server_run_iterate(UA_Server *server, UA_Boolean waitInternal) {
 #endif
         }
 
-#ifdef UA_ENABLE_MULTITHREADING
-        /* Wake up worker threads */
-        if(dispatched)
-            pthread_cond_broadcast(&server->dispatchQueue_condition);
-#endif
-
         /* Clean up jobs list */
         if(jobsSize > 0)
             UA_free(jobs);
     }
 
-#ifndef UA_ENABLE_MULTITHREADING
+#ifdef UA_ENABLE_MULTITHREADING
+    /* Wake up worker threads */
+    if(dispatched)
+        pthread_cond_broadcast(&server->dispatchQueue_condition);
+#else
     processDelayedCallbacks(server);
 #endif
 
@@ -668,18 +667,22 @@ UA_StatusCode UA_Server_run_shutdown(UA_Server *server) {
     }
 
 #ifdef UA_ENABLE_MULTITHREADING
-    UA_LOG_INFO(server->config.logger, UA_LOGCATEGORY_SERVER,
-                "Shutting down %u worker thread(s)", server->config.nThreads);
-    /* Wait for all worker threads to finish */
-    for(size_t i = 0; i < server->config.nThreads; ++i)
-        server->workers[i].running = false;
-    pthread_cond_broadcast(&server->dispatchQueue_condition);
-    for(size_t i = 0; i < server->config.nThreads; ++i)
-        pthread_join(server->workers[i].thr, NULL);
-    UA_free(server->workers);
-
-    /* Manually finish the work still enqueued.
-       This especially contains delayed frees */
+    /* Ensure that run_shutdown can be called multiple times */
+    if(server->workers) {
+        UA_LOG_INFO(server->config.logger, UA_LOGCATEGORY_SERVER,
+                    "Shutting down %u worker thread(s)", server->config.nThreads);
+        /* Wait for all worker threads to finish */
+        for(size_t i = 0; i < server->config.nThreads; ++i)
+            server->workers[i].running = false;
+        pthread_cond_broadcast(&server->dispatchQueue_condition);
+        for(size_t i = 0; i < server->config.nThreads; ++i)
+            pthread_join(server->workers[i].thr, NULL);
+        /* Free the worker structures */
+        UA_free(server->workers);
+        server->workers = NULL;
+    }
+
+    /* Manually finish the work still enqueued */
     emptyDispatchQueue(server);
     UA_ASSERT_RCU_UNLOCKED();
     rcu_barrier(); // wait for all scheduled call_rcu work to complete

+ 8 - 2
src/server/ua_subscription.c

@@ -427,7 +427,7 @@ void UA_Subscription_publishCallback(UA_Server *server, UA_Subscription *sub) {
         if(sub->currentKeepAliveCount < sub->maxKeepAliveCount)
             return;
         UA_LOG_DEBUG_SESSION(server->config.logger, sub->session,
-                             "Sending a keepalive on subscription %u",
+                             "Subscription %u | Sending a KeepAlive",
                              sub->subscriptionID)
     }
 
@@ -442,7 +442,7 @@ void UA_Subscription_publishCallback(UA_Server *server, UA_Subscription *sub) {
     /* Cannot publish without a response */
     if(!pre) {
         UA_LOG_DEBUG_SESSION(server->config.logger, sub->session,
-                             "Cannot send a publish response on subscription %u, "
+                             "Subscription %u | Cannot send a publish response "
                              "since the publish queue is empty", sub->subscriptionID)
         if(sub->state != UA_SUBSCRIPTIONSTATE_LATE) {
             sub->state = UA_SUBSCRIPTIONSTATE_LATE;
@@ -545,6 +545,9 @@ Subscription_registerPublishJob(UA_Server *server, UA_Subscription *sub) {
     if(sub->publishJobIsRegistered)
         return UA_STATUSCODE_GOOD;
 
+    UA_LOG_DEBUG_SESSION(server->config.logger, sub->session,
+                         "Subscription %u | Register subscription publishing callback",
+                         sub->subscriptionID);
     UA_Job job;
     job.type = UA_JOBTYPE_METHODCALL;
     job.job.methodCall.method = (UA_ServerCallback)UA_Subscription_publishCallback;
@@ -561,6 +564,9 @@ UA_StatusCode
 Subscription_unregisterPublishJob(UA_Server *server, UA_Subscription *sub) {
     if(!sub->publishJobIsRegistered)
         return UA_STATUSCODE_GOOD;
+    UA_LOG_DEBUG_SESSION(server->config.logger, sub->session,
+                         "Subscription %u | Unregister subscription publishing callback",
+                         sub->subscriptionID);
     sub->publishJobIsRegistered = false;
     return UA_Server_removeRepeatedJob(server, sub->publishJobGuid);
 }

+ 34 - 116
tests/check_nodestore.c

@@ -4,6 +4,7 @@
 
 #include "ua_types.h"
 #include "server/ua_nodestore.h"
+#include "server/ua_server_internal.h"
 #include "ua_util.h"
 #include "check.h"
 
@@ -12,6 +13,18 @@
 #include <urcu.h>
 #endif
 
+UA_NodeStore *ns;
+
+static void setup(void) {
+    ns = UA_NodeStore_new();
+    UA_RCU_LOCK();
+}
+
+static void teardown(void) {
+    UA_NodeStore_delete(ns);
+    UA_RCU_UNLOCK();
+}
+
 int zeroCnt = 0;
 int visitCnt = 0;
 static void checkZeroVisitor(const UA_Node* node) {
@@ -33,21 +46,16 @@ static UA_Node* createNode(UA_Int16 nsid, UA_Int32 id) {
 }
 
 START_TEST(replaceExistingNode) {
-    UA_NodeStore *ns = UA_NodeStore_new();
     UA_Node* n1 = createNode(0,2253);
     UA_NodeStore_insert(ns, n1);
     UA_NodeId in1 = UA_NODEID_NUMERIC(0, 2253);
     UA_Node* n2 = UA_NodeStore_getCopy(ns, &in1);
     UA_StatusCode retval = UA_NodeStore_replace(ns, n2);
-    
     ck_assert_int_eq(retval, UA_STATUSCODE_GOOD);
-    
-    UA_NodeStore_delete(ns);
 }
 END_TEST
 
 START_TEST(replaceOldNode) {
-    UA_NodeStore *ns = UA_NodeStore_new();
     UA_Node* n1 = createNode(0,2253);
     UA_NodeStore_insert(ns, n1);
     UA_NodeId in1 = UA_NODEID_NUMERIC(0,2253);
@@ -61,60 +69,28 @@ START_TEST(replaceOldNode) {
     /* shall fail */
     retval = UA_NodeStore_replace(ns, n3);
     ck_assert_int_ne(retval, UA_STATUSCODE_GOOD);
-    
-    UA_NodeStore_delete(ns);
 }
 END_TEST
 
 START_TEST(findNodeInUA_NodeStoreWithSingleEntry) {
-#ifdef UA_ENABLE_MULTITHREADING
-    rcu_register_thread();
-#endif
-    // given
-    UA_NodeStore *ns = UA_NodeStore_new();
     UA_Node* n1 = createNode(0,2253);
     UA_NodeStore_insert(ns, n1);
     UA_NodeId in1 = UA_NODEID_NUMERIC(0,2253);
     const UA_Node* nr = UA_NodeStore_get(ns, &in1);
-    // then
     ck_assert_int_eq((uintptr_t)n1, (uintptr_t)nr);
-    // finally
-    UA_NodeStore_delete(ns);
-#ifdef UA_ENABLE_MULTITHREADING
-    rcu_unregister_thread();
-#endif
 }
 END_TEST
 
 START_TEST(failToFindNodeInOtherUA_NodeStore) {
-#ifdef UA_ENABLE_MULTITHREADING
-    rcu_register_thread();
-#endif
-    // given
-    UA_NodeStore *ns = UA_NodeStore_new();
-
     UA_Node* n1 = createNode(0,2255);
     UA_NodeStore_insert(ns, n1);
-
-    // when
     UA_NodeId in1 = UA_NODEID_NUMERIC(1, 2255);
     const UA_Node* nr = UA_NodeStore_get(ns, &in1);
-    // then
     ck_assert_int_eq((uintptr_t)nr, 0);
-    // finally
-    UA_NodeStore_delete(ns);
-#ifdef UA_ENABLE_MULTITHREADING
-    rcu_unregister_thread();
-#endif
 }
 END_TEST
 
 START_TEST(findNodeInUA_NodeStoreWithSeveralEntries) {
-#ifdef UA_ENABLE_MULTITHREADING
-    rcu_register_thread();
-#endif
-    // given
-    UA_NodeStore *ns = UA_NodeStore_new();
     UA_Node* n1 = createNode(0,2253);
     UA_NodeStore_insert(ns, n1);
     UA_Node* n2 = createNode(0,2255);
@@ -128,25 +104,13 @@ START_TEST(findNodeInUA_NodeStoreWithSeveralEntries) {
     UA_Node* n6 = createNode(0,12);
     UA_NodeStore_insert(ns, n6);
 
-    // when
     UA_NodeId in3 = UA_NODEID_NUMERIC(0, 2257);
     const UA_Node* nr = UA_NodeStore_get(ns, &in3);
-    // then
     ck_assert_int_eq((uintptr_t)nr, (uintptr_t)n3);
-    // finally
-    UA_NodeStore_delete(ns);
-#ifdef UA_ENABLE_MULTITHREADING
-    rcu_unregister_thread();
-#endif
 }
 END_TEST
 
 START_TEST(iterateOverUA_NodeStoreShallNotVisitEmptyNodes) {
-#ifdef UA_ENABLE_MULTITHREADING
-    rcu_register_thread();
-#endif
-    // given
-    UA_NodeStore *ns = UA_NodeStore_new();
     UA_Node* n1 = createNode(0,2253);
     UA_NodeStore_insert(ns, n1);
     UA_Node* n2 = createNode(0,2255);
@@ -160,57 +124,30 @@ START_TEST(iterateOverUA_NodeStoreShallNotVisitEmptyNodes) {
     UA_Node* n6 = createNode(0,12);
     UA_NodeStore_insert(ns, n6);
 
-    // when
     zeroCnt = 0;
     visitCnt = 0;
     UA_NodeStore_iterate(ns,checkZeroVisitor);
-    // then
     ck_assert_int_eq(zeroCnt, 0);
     ck_assert_int_eq(visitCnt, 6);
-    // finally
-    UA_NodeStore_delete(ns);
-#ifdef UA_ENABLE_MULTITHREADING
-    rcu_unregister_thread();
-#endif
 }
 END_TEST
 
 START_TEST(findNodeInExpandedNamespace) {
-#ifdef UA_ENABLE_MULTITHREADING
-    rcu_register_thread();
-#endif
-    // given
-    UA_NodeStore *ns = UA_NodeStore_new();
-    UA_Node* n;
-    UA_Int32 i=0;
-    for (; i<200; i++) {
-        n = createNode(0,i);
+    for(UA_UInt32 i = 0; i < 200; i++) {
+        UA_Node* n = createNode(0,i);
         UA_NodeStore_insert(ns, n);
     }
     // when
     UA_Node *n2 = createNode(0,25);
     const UA_Node* nr = UA_NodeStore_get(ns,&n2->nodeId);
-    // then
     ck_assert_int_eq(nr->nodeId.identifier.numeric,n2->nodeId.identifier.numeric);
-    // finally
     UA_NodeStore_deleteNode(n2);
-    UA_NodeStore_delete(ns);
-#ifdef UA_ENABLE_MULTITHREADING
-    rcu_unregister_thread();
-#endif
 }
 END_TEST
 
 START_TEST(iterateOverExpandedNamespaceShallNotVisitEmptyNodes) {
-#ifdef UA_ENABLE_MULTITHREADING
-    rcu_register_thread();
-#endif
-    // given
-    UA_NodeStore *ns = UA_NodeStore_new();
-    UA_Node* n;
-    UA_Int32 i=0;
-    for (; i<200; i++) {
-        n = createNode(0,i);
+    for(UA_UInt32 i = 0; i < 200; i++) {
+        UA_Node* n = createNode(0,i);
         UA_NodeStore_insert(ns, n);
     }
     // when
@@ -220,20 +157,10 @@ START_TEST(iterateOverExpandedNamespaceShallNotVisitEmptyNodes) {
     // then
     ck_assert_int_eq(zeroCnt, 0);
     ck_assert_int_eq(visitCnt, 200);
-    // finally
-    UA_NodeStore_delete(ns);
-#ifdef UA_ENABLE_MULTITHREADING
-    rcu_unregister_thread();
-#endif
 }
 END_TEST
 
 START_TEST(failToFindNonExistantNodeInUA_NodeStoreWithSeveralEntries) {
-#ifdef UA_ENABLE_MULTITHREADING
-    rcu_register_thread();
-#endif
-    // given
-    UA_NodeStore *ns = UA_NodeStore_new();
     UA_Node* n1 = createNode(0,2253);
     UA_NodeStore_insert(ns, n1);
     UA_Node* n2 = createNode(0,2255);
@@ -246,16 +173,8 @@ START_TEST(failToFindNonExistantNodeInUA_NodeStoreWithSeveralEntries) {
     UA_NodeStore_insert(ns, n5);
 
     UA_NodeId id = UA_NODEID_NUMERIC(0, 12);
-
-    // when
     const UA_Node* nr = UA_NodeStore_get(ns, &id);
-    // then
     ck_assert_int_eq((uintptr_t)nr, 0);
-    // finally
-    UA_NodeStore_delete(ns);
-#ifdef UA_ENABLE_MULTITHREADING
-    rcu_unregister_thread();
-#endif
 }
 END_TEST
 
@@ -265,7 +184,6 @@ END_TEST
 
 #ifdef UA_ENABLE_MULTITHREADING
 struct UA_NodeStoreProfileTest {
-    UA_NodeStore *ns;
     UA_Int32 min_val;
     UA_Int32 max_val;
     UA_Int32 rounds;
@@ -273,33 +191,27 @@ struct UA_NodeStoreProfileTest {
 
 static void *profileGetThread(void *arg) {
     rcu_register_thread();
+    UA_RCU_LOCK();
     struct UA_NodeStoreProfileTest *test = (struct UA_NodeStoreProfileTest*) arg;
     UA_NodeId id;
     UA_NodeId_init(&id);
     UA_Int32 max_val = test->max_val;
-    UA_NodeStore *ns = test->ns;
     for(UA_Int32 x = 0; x<test->rounds; x++) {
         for(UA_Int32 i=test->min_val; i<max_val; i++) {
             id.identifier.numeric = i;
             UA_NodeStore_get(ns,&id);
         }
     }
+    UA_RCU_UNLOCK();
     rcu_unregister_thread();
-    
     return NULL;
 }
 #endif
 
 START_TEST(profileGetDelete) {
-#ifdef UA_ENABLE_MULTITHREADING
-    rcu_register_thread();
-#endif
-
 #define N 1000000
-    UA_NodeStore *ns = UA_NodeStore_new();
-    UA_Node *n;
-    for (int i=0; i<N; i++) {
-        n = createNode(0,i);
+    for(UA_UInt32 i = 0; i < N; i++) {
+        UA_Node *n = createNode(0,i);
         UA_NodeStore_insert(ns, n);
     }
     clock_t begin, end;
@@ -309,7 +221,7 @@ START_TEST(profileGetDelete) {
     pthread_t t[THREADS];
     struct UA_NodeStoreProfileTest p[THREADS];
     for (int i = 0; i < THREADS; i++) {
-        p[i] = (struct UA_NodeStoreProfileTest){ns, i*(N/THREADS), (i+1)*(N/THREADS), 50};
+        p[i] = (struct UA_NodeStoreProfileTest){i*(N/THREADS), (i+1)*(N/THREADS), 50};
         pthread_create(&t[i], NULL, profileGetThread, &p[i]);
     }
     for (int i = 0; i < THREADS; i++)
@@ -328,12 +240,6 @@ START_TEST(profileGetDelete) {
     end = clock();
     printf("Time for single-threaded %d create/get/delete in a namespace: %fs.\n", N, (double)(end - begin) / CLOCKS_PER_SEC);
 #endif
-
-    UA_NodeStore_delete(ns);
-
-#ifdef UA_ENABLE_MULTITHREADING
-    rcu_unregister_thread();
-#endif
 }
 END_TEST
 
@@ -341,6 +247,7 @@ static Suite * namespace_suite (void) {
     Suite *s = suite_create ("UA_NodeStore");
 
     TCase* tc_find = tcase_create ("Find");
+    tcase_add_checked_fixture(tc_find, setup, teardown);
     tcase_add_test (tc_find, findNodeInUA_NodeStoreWithSingleEntry);
     tcase_add_test (tc_find, findNodeInUA_NodeStoreWithSeveralEntries);
     tcase_add_test (tc_find, findNodeInExpandedNamespace);
@@ -349,16 +256,19 @@ static Suite * namespace_suite (void) {
     suite_add_tcase (s, tc_find);
 
     TCase *tc_replace = tcase_create("Replace");
+    tcase_add_checked_fixture(tc_replace, setup, teardown);
     tcase_add_test (tc_replace, replaceExistingNode);
     tcase_add_test (tc_replace, replaceOldNode);
     suite_add_tcase (s, tc_replace);
 
     TCase* tc_iterate = tcase_create ("Iterate");
+    tcase_add_checked_fixture(tc_iterate, setup, teardown);
     tcase_add_test (tc_iterate, iterateOverUA_NodeStoreShallNotVisitEmptyNodes);
     tcase_add_test (tc_iterate, iterateOverExpandedNamespaceShallNotVisitEmptyNodes);
     suite_add_tcase (s, tc_iterate);
     
     /* TCase* tc_profile = tcase_create ("Profile"); */
+    /* tcase_add_checked_fixture(tc_profile, setup, teardown); */
     /* tcase_add_test (tc_profile, profileGetDelete); */
     /* suite_add_tcase (s, tc_profile); */
 
@@ -367,6 +277,10 @@ static Suite * namespace_suite (void) {
 
 
 int main (void) {
+#ifdef UA_ENABLE_MULTITHREADING
+    rcu_init();
+    rcu_register_thread();
+#endif
     int number_failed = 0;
     Suite *s = namespace_suite();
     SRunner *sr = srunner_create(s);
@@ -374,5 +288,9 @@ int main (void) {
     srunner_run_all(sr, CK_NORMAL);
     number_failed += srunner_ntests_failed (sr);
     srunner_free(sr);
+#ifdef UA_ENABLE_MULTITHREADING
+    rcu_barrier();
+    rcu_unregister_thread();
+#endif
     return (number_failed == 0) ? EXIT_SUCCESS : EXIT_FAILURE;
 }

+ 5 - 0
tests/check_server_jobs.c

@@ -31,11 +31,16 @@ START_TEST(Server_addRemoveRepeatedJob) {
         .type = UA_JOBTYPE_METHODCALL,
         .job.methodCall = {.data = NULL, .method = dummyJob}
     };
+    /* The job is added to the main queue only upon the next run_iterate */
     UA_Server_addRepeatedJob(server, rj, 10, &id);
+    UA_Server_run_iterate(server, false);
 
+    /* Wait until the job has surely timed out */
     usleep(15*1000);
     UA_Server_run_iterate(server, false);
 
+    /* Wait a bit longer until the workers have picked up the dispatched job */
+    usleep(15*1000);
     ck_assert_uint_eq(*executed, true);
 
     UA_Server_removeRepeatedJob(server, id);

+ 0 - 5
tests/check_services_attributes.c

@@ -11,11 +11,6 @@
 #include "ua_config_standard.h"
 #include "server/ua_server_internal.h"
 
-#ifdef UA_ENABLE_MULTITHREADING
-#include <pthread.h>
-#include <urcu.h>
-#endif
-
 static UA_StatusCode
 readCPUTemperature(void *handle, const UA_NodeId nodeid, UA_Boolean sourceTimeStamp,
                    const UA_NumericRange *range, UA_DataValue *dataValue) {

+ 5 - 0
tests/check_services_subscriptions.c

@@ -84,6 +84,11 @@ START_TEST(Server_publishCallback) {
         ck_assert_uint_eq(sub->currentKeepAliveCount, sub->maxKeepAliveCount);
 
     UA_Server_run_iterate(server, false);
+#ifdef UA_ENABLE_MULTITHREADING
+    usleep((useconds_t)(publishingInterval * 1000) + 1000);
+    UA_Server_run_iterate(server, false);
+    usleep((useconds_t)(publishingInterval * 1000) + 1000);
+#endif
 
     LIST_FOREACH(sub, &adminSession.serverSubscriptions, listEntry)
         ck_assert_uint_eq(sub->currentKeepAliveCount, sub->maxKeepAliveCount+1);