Kaynağa Gözat

feat(multithreading): Locks and assertions for Session, Subscription and MonitoredItem handling

Ubuntu 5 yıl önce
ebeveyn
işleme
dfdc6ae296

+ 15 - 5
src/server/ua_server.c

@@ -170,14 +170,18 @@ cleanup:
 void UA_Server_delete(UA_Server *server) {
     /* Delete all internal data */
     UA_SecureChannelManager_deleteMembers(&server->secureChannelManager);
+    UA_LOCK(server->serviceMutex);
     UA_SessionManager_deleteMembers(&server->sessionManager);
+    UA_UNLOCK(server->serviceMutex);
     UA_Array_delete(server->namespaces, server->namespacesSize, &UA_TYPES[UA_TYPES_STRING]);
 
 #ifdef UA_ENABLE_SUBSCRIPTIONS
     UA_MonitoredItem *mon, *mon_tmp;
     LIST_FOREACH_SAFE(mon, &server->localMonitoredItems, listEntry, mon_tmp) {
         LIST_REMOVE(mon, listEntry);
+        UA_LOCK(server->serviceMutex);
         UA_MonitoredItem_delete(server, mon);
+        UA_UNLOCK(server->serviceMutex);
     }
 #endif
 
@@ -189,13 +193,10 @@ void UA_Server_delete(UA_Server *server) {
     UA_DiscoveryManager_deleteMembers(&server->discoveryManager, server);
 #endif
 
-#if UA_MULTITHREADING >= 100
-    UA_LOCK_DESTROY(server->networkMutex)
-    UA_LOCK_DESTROY(server->serviceMutex)
-#endif
-
     /* Clean up the Admin Session */
+    UA_LOCK(server->serviceMutex);
     UA_Session_deleteMembersCleanup(&server->adminSession, server);
+    UA_UNLOCK(server->serviceMutex);
 
     /* Clean up the work queue */
     UA_WorkQueue_cleanup(&server->workQueue);
@@ -209,6 +210,11 @@ void UA_Server_delete(UA_Server *server) {
     /* Clean up the config */
     UA_ServerConfig_clean(&server->config);
 
+#if UA_MULTITHREADING >= 100
+    UA_LOCK_DESTROY(server->networkMutex)
+    UA_LOCK_DESTROY(server->serviceMutex)
+#endif
+
     /* Delete the server itself */
     UA_free(server);
 }
@@ -216,12 +222,14 @@ void UA_Server_delete(UA_Server *server) {
 /* Recurring cleanup. Removing unused and timed-out channels and sessions */
 static void
 UA_Server_cleanup(UA_Server *server, void *_) {
+    UA_LOCK(server->serviceMutex);
     UA_DateTime nowMonotonic = UA_DateTime_nowMonotonic();
     UA_SessionManager_cleanupTimedOut(&server->sessionManager, nowMonotonic);
     UA_SecureChannelManager_cleanupTimedOut(&server->secureChannelManager, nowMonotonic);
 #ifdef UA_ENABLE_DISCOVERY
     UA_Discovery_cleanupTimedOut(server, nowMonotonic);
 #endif
+    UA_UNLOCK(server->serviceMutex);
 }
 
 /********************/
@@ -409,7 +417,9 @@ UA_Server_updateCertificate(UA_Server *server,
         LIST_FOREACH(current, &sm->sessions, pointers) {
             if (UA_ByteString_equal(oldCertificate,
                                     &current->session.header.channel->securityPolicy->localCertificate)) {
+                UA_LOCK(server->serviceMutex);
                 UA_SessionManager_removeSession(sm, &current->session.header.authenticationToken);
+                UA_UNLOCK(server->serviceMutex);
             }
         }
 

+ 12 - 1
src/server/ua_server_binary.c

@@ -441,9 +441,11 @@ processMSGDecoded(UA_Server *server, UA_SecureChannel *channel, UA_UInt32 reques
                   const UA_DataType *responseType, UA_Boolean sessionRequired) {
     /* CreateSession doesn't need a session */
     if(requestType == &UA_TYPES[UA_TYPES_CREATESESSIONREQUEST]) {
+        UA_LOCK(server->serviceMutex);
         Service_CreateSession(server, channel,
                               (const UA_CreateSessionRequest *)requestHeader,
                               (UA_CreateSessionResponse *)responseHeader);
+        UA_UNLOCK(server->serviceMutex);
 #ifdef FUZZING_BUILD_MODE_UNSAFE_FOR_PRODUCTION
         /* Store the authentication token and session ID so we can help fuzzing
          * by setting these values in the next request automatically */
@@ -457,9 +459,12 @@ processMSGDecoded(UA_Server *server, UA_SecureChannel *channel, UA_UInt32 reques
     /* Find the matching session */
     UA_Session *session = (UA_Session*)
         UA_SecureChannel_getSession(channel, &requestHeader->authenticationToken);
-    if(!session && !UA_NodeId_isNull(&requestHeader->authenticationToken))
+    if(!session && !UA_NodeId_isNull(&requestHeader->authenticationToken)) {
+        UA_LOCK(server->serviceMutex);
         session = UA_SessionManager_getSessionByToken(&server->sessionManager,
                                                       &requestHeader->authenticationToken);
+        UA_UNLOCK(server->serviceMutex)
+    }
 
     if(requestType == &UA_TYPES[UA_TYPES_ACTIVATESESSIONREQUEST]) {
         if(!session) {
@@ -469,9 +474,11 @@ processMSGDecoded(UA_Server *server, UA_SecureChannel *channel, UA_UInt32 reques
             return sendServiceFaultWithRequest(channel, requestHeader, responseType,
                                     requestId, UA_STATUSCODE_BADSESSIONIDINVALID);
         }
+        UA_LOCK(server->serviceMutex);
         Service_ActivateSession(server, channel, session,
                                 (const UA_ActivateSessionRequest*)requestHeader,
                                 (UA_ActivateSessionResponse*)responseHeader);
+        UA_UNLOCK(server->serviceMutex);
         return sendResponse(channel, requestId, requestHeader->requestHandle,
                             responseHeader, responseType);
     }
@@ -512,8 +519,10 @@ processMSGDecoded(UA_Server *server, UA_SecureChannel *channel, UA_UInt32 reques
                                "Service %i refused on a non-activated session",
                                requestType->binaryEncodingId);
 #endif
+        UA_LOCK(server->serviceMutex);
         UA_SessionManager_removeSession(&server->sessionManager,
                                         &session->header.authenticationToken);
+        UA_UNLOCK(server->serviceMutex);
         return sendServiceFaultWithRequest(channel, requestHeader, responseType,
                                            requestId, UA_STATUSCODE_BADSESSIONNOTACTIVATED);
     }
@@ -533,7 +542,9 @@ processMSGDecoded(UA_Server *server, UA_SecureChannel *channel, UA_UInt32 reques
 #ifdef UA_ENABLE_SUBSCRIPTIONS
     /* The publish request is not answered immediately */
     if(requestType == &UA_TYPES[UA_TYPES_PUBLISHREQUEST]) {
+        UA_LOCK(server->serviceMutex);
         Service_Publish(server, session, (const UA_PublishRequest*)requestHeader, requestId);
+        UA_UNLOCK(server->serviceMutex);
         return UA_STATUSCODE_GOOD;
     }
 #endif

+ 4 - 0
src/server/ua_server_ns0.c

@@ -547,13 +547,17 @@ readMonitoredItems(UA_Server *server, const UA_NodeId *sessionId, void *sessionC
                    void *objectContext, size_t inputSize,
                    const UA_Variant *input, size_t outputSize,
                    UA_Variant *output) {
+    UA_LOCK(server->serviceMutex);
     UA_Session *session = UA_SessionManager_getSessionById(&server->sessionManager, sessionId);
+    UA_UNLOCK(server->serviceMutex);
     if(!session)
         return UA_STATUSCODE_BADINTERNALERROR;
     if (inputSize == 0 || !input[0].data)
         return UA_STATUSCODE_BADSUBSCRIPTIONIDINVALID;
     UA_UInt32 subscriptionId = *((UA_UInt32*)(input[0].data));
+    UA_LOCK(server->serviceMutex);
     UA_Subscription* subscription = UA_Session_getSubscriptionById(session, subscriptionId);
+    UA_UNLOCK(server->serviceMutex);
     if(!subscription)
     {
         if(LIST_EMPTY(&session->serverSubscriptions))

+ 3 - 0
src/server/ua_services_session.c

@@ -59,6 +59,8 @@ void
 Service_CreateSession(UA_Server *server, UA_SecureChannel *channel,
                       const UA_CreateSessionRequest *request,
                       UA_CreateSessionResponse *response) {
+    UA_LOCK_ASSERT(server->serviceMutex, 1);
+
     if(!channel) {
         response->responseHeader.serviceResult = UA_STATUSCODE_BADINTERNALERROR;
         return;
@@ -322,6 +324,7 @@ Service_ActivateSession(UA_Server *server, UA_SecureChannel *channel,
                         UA_Session *session, const UA_ActivateSessionRequest *request,
                         UA_ActivateSessionResponse *response) {
     UA_LOG_DEBUG_SESSION(&server->config.logger, session, "Execute ActivateSession");
+    UA_LOCK_ASSERT(server->serviceMutex, 1);
 
     if(session->validTill < UA_DateTime_nowMonotonic()) {
         UA_LOG_INFO_SESSION(&server->config.logger, session,

+ 1 - 0
src/server/ua_services_subscription.c

@@ -181,6 +181,7 @@ void
 Service_Publish(UA_Server *server, UA_Session *session,
                 const UA_PublishRequest *request, UA_UInt32 requestId) {
     UA_LOG_DEBUG_SESSION(&server->config.logger, session, "Processing PublishRequest");
+    UA_LOCK_ASSERT(server->serviceMutex, 1);
 
     /* Return an error if the session has no subscription */
     if(LIST_EMPTY(&session->serverSubscriptions)) {

+ 2 - 0
src/server/ua_session.c

@@ -87,6 +87,8 @@ void UA_Session_addSubscription(UA_Server *server, UA_Session *session, UA_Subsc
 UA_StatusCode
 UA_Session_deleteSubscription(UA_Server *server, UA_Session *session,
                               UA_UInt32 subscriptionId) {
+    UA_LOCK_ASSERT(server->serviceMutex, 1);
+
     UA_Subscription *sub = UA_Session_getSubscriptionById(session, subscriptionId);
     if(!sub)
         return UA_STATUSCODE_BADSUBSCRIPTIONIDINVALID;

+ 18 - 1
src/server/ua_session_manager.c

@@ -24,7 +24,9 @@ UA_SessionManager_init(UA_SessionManager *sm, UA_Server *server) {
 /* Delayed callback to free the session memory */
 static void
 removeSessionCallback(UA_Server *server, session_list_entry *entry) {
+    UA_LOCK(server->serviceMutex);
     UA_Session_deleteMembersCleanup(&entry->session, server);
+    UA_UNLOCK(server->serviceMutex);
 }
 
 static void
@@ -32,6 +34,8 @@ removeSession(UA_SessionManager *sm, session_list_entry *sentry) {
     UA_Server *server = sm->server;
     UA_Session *session = &sentry->session;
 
+    UA_LOCK_ASSERT(server->serviceMutex, 1);
+
     /* Remove the Subscriptions */
 #ifdef UA_ENABLE_SUBSCRIPTIONS
     UA_Subscription *sub, *tempsub;
@@ -47,9 +51,12 @@ removeSession(UA_SessionManager *sm, session_list_entry *sentry) {
 #endif
 
     /* Callback into userland access control */
-    if(server->config.accessControl.closeSession)
+    if(server->config.accessControl.closeSession) {
+        UA_UNLOCK(server->serviceMutex);
         server->config.accessControl.closeSession(server, &server->config.accessControl,
                                                   &session->sessionId, session->sessionHandle);
+        UA_LOCK(server->serviceMutex);
+    }
 
     /* Detach the Session from the SecureChannel */
     UA_Session_detachFromSecureChannel(session);
@@ -71,6 +78,7 @@ removeSession(UA_SessionManager *sm, session_list_entry *sentry) {
 }
 
 void UA_SessionManager_deleteMembers(UA_SessionManager *sm) {
+    UA_LOCK_ASSERT(sm->server->serviceMutex, 1);
     session_list_entry *current, *temp;
     LIST_FOREACH_SAFE(current, &sm->sessions, pointers, temp) {
         removeSession(sm, current);
@@ -80,6 +88,7 @@ void UA_SessionManager_deleteMembers(UA_SessionManager *sm) {
 void
 UA_SessionManager_cleanupTimedOut(UA_SessionManager *sm,
                                   UA_DateTime nowMonotonic) {
+    UA_LOCK_ASSERT(sm->server->serviceMutex, 1);
     session_list_entry *sentry, *temp;
     LIST_FOREACH_SAFE(sentry, &sm->sessions, pointers, temp) {
         /* Session has timed out? */
@@ -93,6 +102,8 @@ UA_SessionManager_cleanupTimedOut(UA_SessionManager *sm,
 
 UA_Session *
 UA_SessionManager_getSessionByToken(UA_SessionManager *sm, const UA_NodeId *token) {
+    UA_LOCK_ASSERT(sm->server->serviceMutex, 1);
+
     session_list_entry *current = NULL;
     LIST_FOREACH(current, &sm->sessions, pointers) {
         /* Token does not match */
@@ -124,6 +135,8 @@ UA_SessionManager_getSessionByToken(UA_SessionManager *sm, const UA_NodeId *toke
 
 UA_Session *
 UA_SessionManager_getSessionById(UA_SessionManager *sm, const UA_NodeId *sessionId) {
+    UA_LOCK_ASSERT(sm->server->serviceMutex, 1);
+
     session_list_entry *current = NULL;
     LIST_FOREACH(current, &sm->sessions, pointers) {
         /* Token does not match */
@@ -155,6 +168,8 @@ UA_SessionManager_getSessionById(UA_SessionManager *sm, const UA_NodeId *session
 UA_StatusCode
 UA_SessionManager_createSession(UA_SessionManager *sm, UA_SecureChannel *channel,
                                 const UA_CreateSessionRequest *request, UA_Session **session) {
+    UA_LOCK_ASSERT(sm->server->serviceMutex, 1);
+
     if(sm->currentSessionCount >= sm->server->config.maxSessions)
         return UA_STATUSCODE_BADTOOMANYSESSIONS;
 
@@ -181,6 +196,8 @@ UA_SessionManager_createSession(UA_SessionManager *sm, UA_SecureChannel *channel
 
 UA_StatusCode
 UA_SessionManager_removeSession(UA_SessionManager *sm, const UA_NodeId *token) {
+    UA_LOCK_ASSERT(sm->server->serviceMutex, 1);
+
     session_list_entry *current;
     LIST_FOREACH(current, &sm->sessions, pointers) {
         if(UA_NodeId_equal(&current->session.header.authenticationToken, token))

+ 8 - 0
src/server/ua_subscription.c

@@ -43,6 +43,8 @@ UA_Subscription_new(UA_Session *session, UA_UInt32 subscriptionId) {
 
 void
 UA_Subscription_deleteMembers(UA_Server *server, UA_Subscription *sub) {
+    UA_LOCK_ASSERT(server->serviceMutex, 1);
+
     Subscription_unregisterPublishCallback(server, sub);
 
     /* Delete monitored Items */
@@ -88,6 +90,8 @@ UA_Subscription_getMonitoredItem(UA_Subscription *sub, UA_UInt32 monitoredItemId
 UA_StatusCode
 UA_Subscription_deleteMonitoredItem(UA_Server *server, UA_Subscription *sub,
                                     UA_UInt32 monitoredItemId) {
+    UA_LOCK_ASSERT(server->serviceMutex, 1);
+
     /* Find the MonitoredItem */
     UA_MonitoredItem *mon;
     LIST_FOREACH(mon, &sub->monitoredItems, listEntry) {
@@ -345,11 +349,15 @@ UA_Subscription_nextSequenceNumber(UA_UInt32 sequenceNumber) {
 static void
 publishCallback(UA_Server *server, UA_Subscription *sub) {
     sub->readyNotifications = sub->notificationQueueSize;
+    UA_LOCK(server->serviceMutex);
     UA_Subscription_publish(server, sub);
+    UA_UNLOCK(server->serviceMutex);
 }
 
 void
 UA_Subscription_publish(UA_Server *server, UA_Subscription *sub) {
+    UA_LOCK_ASSERT(server->serviceMutex, 1);
+
     UA_LOG_DEBUG_SESSION(&server->config.logger, sub->session, "Subscription %u | "
                          "Publish Callback", sub->subscriptionId);
     /* Dequeue a response */

+ 2 - 0
src/server/ua_subscription_monitoreditem.c

@@ -192,6 +192,8 @@ UA_MonitoredItem_init(UA_MonitoredItem *mon, UA_Subscription *sub) {
 
 void
 UA_MonitoredItem_delete(UA_Server *server, UA_MonitoredItem *monitoredItem) {
+    UA_LOCK_ASSERT(server->serviceMutex, 1);
+
     /* Remove the sampling callback */
     UA_MonitoredItem_unregisterSampleCallback(server, monitoredItem);
 

+ 2 - 0
tests/server/check_services_subscriptions.c

@@ -33,8 +33,10 @@ createSession(void) {
     UA_CreateSessionRequest request;
     UA_CreateSessionRequest_init(&request);
     request.requestedSessionTimeout = UA_UINT32_MAX;
+    UA_LOCK(server->serviceMutex);
     UA_StatusCode retval = UA_SessionManager_createSession(&server->sessionManager, NULL,
                                                            &request, &session);
+    UA_UNLOCK(server->serviceMutex);
     ck_assert_uint_eq(retval, 0);
 }