Browse Source

fix a memory issue in the cleanup of subscriptions

Julius Pfrommer 7 years ago
parent
commit
35ba42be09

+ 5 - 0
src/server/ua_server.c

@@ -145,9 +145,14 @@ void UA_Server_delete(UA_Server *server) {
 #endif
 #endif
 
 
 #ifdef UA_ENABLE_MULTITHREADING
 #ifdef UA_ENABLE_MULTITHREADING
+    /* Process new delayed callbacks from the cleanup */
+    UA_Server_cleanupDispatchQueue(server);
     pthread_mutex_destroy(&server->dispatchQueue_accessMutex);
     pthread_mutex_destroy(&server->dispatchQueue_accessMutex);
     pthread_cond_destroy(&server->dispatchQueue_condition);
     pthread_cond_destroy(&server->dispatchQueue_condition);
     pthread_mutex_destroy(&server->dispatchQueue_conditionMutex);
     pthread_mutex_destroy(&server->dispatchQueue_conditionMutex);
+#else
+    /* Process new delayed callbacks from the cleanup */
+    UA_Server_cleanupDelayedCallbacks(server);
 #endif
 #endif
 
 
     /* Delete the timed work */
     /* Delete the timed work */

+ 11 - 0
src/server/ua_server_internal.h

@@ -189,6 +189,17 @@ UA_StatusCode UA_Server_editNode(UA_Server *server, UA_Session *session,
 UA_StatusCode
 UA_StatusCode
 UA_Server_delayedCallback(UA_Server *server, UA_ServerCallback callback, void *data);
 UA_Server_delayedCallback(UA_Server *server, UA_ServerCallback callback, void *data);
 
 
+UA_StatusCode
+UA_Server_delayedFree(UA_Server *server, void *data);
+
+#ifndef UA_ENABLE_MULTITHREADING
+/* Execute all delayed callbacks regardless of whether the worker threads have
+ * finished previous work */
+void UA_Server_cleanupDelayedCallbacks(UA_Server *server);
+#else
+void UA_Server_cleanupDispatchQueue(UA_Server *server);
+#endif
+
 /* Callback is executed in the same thread or, if possible, dispatched to one of
 /* Callback is executed in the same thread or, if possible, dispatched to one of
  * the worker threads. */
  * the worker threads. */
 void
 void

+ 35 - 19
src/server/ua_server_worker.c

@@ -103,16 +103,19 @@ workerLoop(UA_Worker *worker) {
     return NULL;
     return NULL;
 }
 }
 
 
-static void
-emptyDispatchQueue(UA_Server *server) {
-    pthread_mutex_lock(&server->dispatchQueue_accessMutex);
-    WorkerCallback *dc;
-    while((dc = SIMPLEQ_FIRST(&server->dispatchQueue)) != NULL) {
+void UA_Server_cleanupDispatchQueue(UA_Server *server) {
+    while(true) {
+        pthread_mutex_lock(&server->dispatchQueue_accessMutex);
+        WorkerCallback *dc = SIMPLEQ_FIRST(&server->dispatchQueue);
+        if(!dc) {
+            pthread_mutex_unlock(&server->dispatchQueue_accessMutex);
+            break;
+        }
         SIMPLEQ_REMOVE_HEAD(&server->dispatchQueue, next);
         SIMPLEQ_REMOVE_HEAD(&server->dispatchQueue, next);
+        pthread_mutex_unlock(&server->dispatchQueue_accessMutex);
         dc->callback(server, dc->data);
         dc->callback(server, dc->data);
         UA_free(dc);
         UA_free(dc);
     }
     }
-    pthread_mutex_unlock(&server->dispatchQueue_accessMutex);
 }
 }
 
 
 #endif
 #endif
@@ -169,6 +172,19 @@ UA_Server_workerCallback(UA_Server *server, UA_ServerCallback callback,
  * 3. Check regularly if the callback is ready by adding it back to the dispatch
  * 3. Check regularly if the callback is ready by adding it back to the dispatch
  *    queue. */
  *    queue. */
 
 
+/* Delayed callback to free the subscription memory */
+static void
+freeCallback(UA_Server *server, void *data) {
+    UA_free(data);
+}
+
+/* TODO: Delayed free should never fail. This can be achieved by adding a prefix
+ * with the list pointers */
+UA_StatusCode
+UA_Server_delayedFree(UA_Server *server, void *data) {
+    return UA_Server_delayedCallback(server, freeCallback, data);
+}
+
 #ifndef UA_ENABLE_MULTITHREADING
 #ifndef UA_ENABLE_MULTITHREADING
 
 
 typedef struct UA_DelayedCallback {
 typedef struct UA_DelayedCallback {
@@ -191,8 +207,7 @@ UA_Server_delayedCallback(UA_Server *server, UA_ServerCallback callback,
     return UA_STATUSCODE_GOOD;
     return UA_STATUSCODE_GOOD;
 }
 }
 
 
-static void
-processDelayedCallbacks(UA_Server *server) {
+void UA_Server_cleanupDelayedCallbacks(UA_Server *server) {
     UA_DelayedCallback *dc, *dc_tmp;
     UA_DelayedCallback *dc, *dc_tmp;
     SLIST_FOREACH_SAFE(dc, &server->delayedCallbacks, next, dc_tmp) {
     SLIST_FOREACH_SAFE(dc, &server->delayedCallbacks, next, dc_tmp) {
         SLIST_REMOVE(&server->delayedCallbacks, dc, UA_DelayedCallback, next);
         SLIST_REMOVE(&server->delayedCallbacks, dc, UA_DelayedCallback, next);
@@ -359,9 +374,10 @@ UA_Server_run_iterate(UA_Server *server, UA_Boolean waitInternal) {
     }
     }
 
 
 #ifndef UA_ENABLE_MULTITHREADING
 #ifndef UA_ENABLE_MULTITHREADING
-    /* Process delayed callbacks when all callbacks and
-     * network events are done */
-    processDelayedCallbacks(server);
+    /* Process delayed callbacks when all callbacks and network events are done.
+     * If multithreading is enabled, the cleanup of delayed values is attempted
+     * by a callback in the job queue. */
+    UA_Server_cleanupDelayedCallbacks(server);
 #endif
 #endif
 
 
 #if defined(UA_ENABLE_DISCOVERY_MULTICAST) && !defined(UA_ENABLE_MULTITHREADING)
 #if defined(UA_ENABLE_DISCOVERY_MULTICAST) && !defined(UA_ENABLE_MULTITHREADING)
@@ -394,10 +410,7 @@ UA_Server_run_shutdown(UA_Server *server) {
         nl->stop(nl, server);
         nl->stop(nl, server);
     }
     }
 
 
-#ifndef UA_ENABLE_MULTITHREADING
-    /* Process remaining delayed callbacks */
-    processDelayedCallbacks(server);
-#else
+#ifdef UA_ENABLE_MULTITHREADING
     /* Shut down the workers */
     /* Shut down the workers */
     if(server->workers) {
     if(server->workers) {
         UA_LOG_INFO(server->config.logger, UA_LOGCATEGORY_SERVER,
         UA_LOG_INFO(server->config.logger, UA_LOGCATEGORY_SERVER,
@@ -412,13 +425,16 @@ UA_Server_run_shutdown(UA_Server *server) {
         server->workers = NULL;
         server->workers = NULL;
     }
     }
 
 
-    /* Execute the remaining callbacks in the dispatch queue.
-     * This also executes the delayed callbacks. */
-    emptyDispatchQueue(server);
+    /* Execute the remaining callbacks in the dispatch queue. Also executes
+     * delayed callbacks. */
+    UA_Server_cleanupDispatchQueue(server);
+#else
+    /* Process remaining delayed callbacks */
+    UA_Server_cleanupDelayedCallbacks(server);
 #endif
 #endif
 
 
-    /* Stop multicast discovery */
 #ifdef UA_ENABLE_DISCOVERY_MULTICAST
 #ifdef UA_ENABLE_DISCOVERY_MULTICAST
+    /* Stop multicast discovery */
     if(server->config.applicationDescription.applicationType ==
     if(server->config.applicationDescription.applicationType ==
        UA_APPLICATIONTYPE_DISCOVERYSERVER)
        UA_APPLICATIONTYPE_DISCOVERYSERVER)
         stopMulticastDiscoveryServer(server);
         stopMulticastDiscoveryServer(server);

+ 11 - 22
src/server/ua_session.c

@@ -66,13 +66,13 @@ void UA_Session_deleteMembersCleanup(UA_Session *session, UA_Server* server) {
         UA_BrowseDescription_deleteMembers(&cp->browseDescription);
         UA_BrowseDescription_deleteMembers(&cp->browseDescription);
         UA_free(cp);
         UA_free(cp);
     }
     }
+
 #ifdef UA_ENABLE_SUBSCRIPTIONS
 #ifdef UA_ENABLE_SUBSCRIPTIONS
-    UA_Subscription *currents, *temps;
-    LIST_FOREACH_SAFE(currents, &session->serverSubscriptions, listEntry, temps) {
-        LIST_REMOVE(currents, listEntry);
-        UA_Subscription_deleteMembers(server, currents);
-        UA_free(currents);
+    UA_Subscription *sub, *tempsub;
+    LIST_FOREACH_SAFE(sub, &session->serverSubscriptions, listEntry, tempsub) {
+        UA_Session_deleteSubscription(server, session, sub->subscriptionId);
     }
     }
+
     UA_PublishResponseEntry *entry;
     UA_PublishResponseEntry *entry;
     while((entry = UA_Session_getPublishReq(session))) {
     while((entry = UA_Session_getPublishReq(session))) {
         UA_Session_removePublishReq(session,entry);
         UA_Session_removePublishReq(session,entry);
@@ -125,14 +125,6 @@ void UA_Session_addSubscription(UA_Session *session, UA_Subscription *newSubscri
     LIST_INSERT_HEAD(&session->serverSubscriptions, newSubscription, listEntry);
     LIST_INSERT_HEAD(&session->serverSubscriptions, newSubscription, listEntry);
 }
 }
 
 
-/* Delayed callback to free the subscription memory */
-static void
-removeSubscriptionCallback(UA_Server *server, void *data) {
-    UA_Subscription *sub = (UA_Subscription*)data;
-    UA_Subscription_deleteMembers(server, sub);
-    UA_free(sub);
-}
-
 UA_StatusCode
 UA_StatusCode
 UA_Session_deleteSubscription(UA_Server *server, UA_Session *session,
 UA_Session_deleteSubscription(UA_Server *server, UA_Session *session,
                               UA_UInt32 subscriptionId) {
                               UA_UInt32 subscriptionId) {
@@ -140,9 +132,11 @@ UA_Session_deleteSubscription(UA_Server *server, UA_Session *session,
     if(!sub)
     if(!sub)
         return UA_STATUSCODE_BADSUBSCRIPTIONIDINVALID;
         return UA_STATUSCODE_BADSUBSCRIPTIONIDINVALID;
 
 
+    UA_Subscription_deleteMembers(server, sub);
+
     /* Add a delayed callback to remove the subscription when the currently
     /* Add a delayed callback to remove the subscription when the currently
      * scheduled jobs have completed */
      * scheduled jobs have completed */
-    UA_StatusCode retval = UA_Server_delayedCallback(server, removeSubscriptionCallback, sub);
+    UA_StatusCode retval = UA_Server_delayedFree(server, sub);
     if(retval != UA_STATUSCODE_GOOD) {
     if(retval != UA_STATUSCODE_GOOD) {
         UA_LOG_WARNING_SESSION(server->config.logger, session,
         UA_LOG_WARNING_SESSION(server->config.logger, session,
                        "Could not remove subscription with error code %s",
                        "Could not remove subscription with error code %s",
@@ -150,15 +144,10 @@ UA_Session_deleteSubscription(UA_Server *server, UA_Session *session,
         return retval; /* Try again next time */
         return retval; /* Try again next time */
     }
     }
 
 
+    /* Remove from the session */
     LIST_REMOVE(sub, listEntry);
     LIST_REMOVE(sub, listEntry);
-
-    if(session->numSubscriptions > 0) {
-        session->numSubscriptions--;
-    }
-    else {
-        return UA_STATUSCODE_BADINTERNALERROR;
-    }
-
+    UA_assert(session->numSubscriptions > 0);
+    session->numSubscriptions--;
     return UA_STATUSCODE_GOOD;
     return UA_STATUSCODE_GOOD;
 }
 }
 
 

+ 1 - 2
src/server/ua_subscription.c

@@ -53,8 +53,7 @@ UA_Subscription_deleteMembers(UA_Server *server, UA_Subscription *sub) {
 
 
     /* Delete Retransmission Queue */
     /* Delete Retransmission Queue */
     UA_NotificationMessageEntry *nme, *nme_tmp;
     UA_NotificationMessageEntry *nme, *nme_tmp;
-    TAILQ_FOREACH_SAFE(nme, &sub->retransmissionQueue,
-                       listEntry, nme_tmp) {
+    TAILQ_FOREACH_SAFE(nme, &sub->retransmissionQueue, listEntry, nme_tmp) {
         TAILQ_REMOVE(&sub->retransmissionQueue, nme, listEntry);
         TAILQ_REMOVE(&sub->retransmissionQueue, nme, listEntry);
         UA_NotificationMessage_deleteMembers(&nme->message);
         UA_NotificationMessage_deleteMembers(&nme->message);
         UA_free(nme);
         UA_free(nme);

+ 1 - 1
src/server/ua_subscription.h

@@ -142,7 +142,7 @@ struct UA_Subscription {
 };
 };
 
 
 UA_Subscription * UA_Subscription_new(UA_Session *session, UA_UInt32 subscriptionId);
 UA_Subscription * UA_Subscription_new(UA_Session *session, UA_UInt32 subscriptionId);
-void UA_Subscription_deleteMembers(UA_Server *server, UA_Subscription *subscription);
+void UA_Subscription_deleteMembers(UA_Server *server, UA_Subscription *sub);
 UA_StatusCode Subscription_registerPublishCallback(UA_Server *server, UA_Subscription *sub);
 UA_StatusCode Subscription_registerPublishCallback(UA_Server *server, UA_Subscription *sub);
 UA_StatusCode Subscription_unregisterPublishCallback(UA_Server *server, UA_Subscription *sub);
 UA_StatusCode Subscription_unregisterPublishCallback(UA_Server *server, UA_Subscription *sub);
 
 

+ 1 - 1
src/server/ua_subscription_datachange.c

@@ -61,7 +61,7 @@ MonitoredItem_delete(UA_Server *server, UA_MonitoredItem *monitoredItem) {
     UA_String_deleteMembers(&monitoredItem->indexRange);
     UA_String_deleteMembers(&monitoredItem->indexRange);
     UA_ByteString_deleteMembers(&monitoredItem->lastSampledValue);
     UA_ByteString_deleteMembers(&monitoredItem->lastSampledValue);
     UA_NodeId_deleteMembers(&monitoredItem->monitoredNodeId);
     UA_NodeId_deleteMembers(&monitoredItem->monitoredNodeId);
-    UA_free(monitoredItem); // TODO: Use a delayed free
+    UA_Server_delayedFree(server, monitoredItem);
 }
 }
 
 
 void
 void