Parcourir la source

simplify notifications handling

Julius Pfrommer il y a 6 ans
Parent
commit
721843e9fe

+ 21 - 20
src/server/ua_services_subscription.c

@@ -343,7 +343,7 @@ Operation_ModifyMonitoredItem(UA_Server *server, UA_Session *session, UA_Subscri
     result->revisedQueueSize = mon->maxQueueSize;
 
     /* Remove some notifications if the queue is now too small */
-    MonitoredItem_ensureQueueSpace(sub, mon, NULL);
+    MonitoredItem_ensureQueueSpace(mon);
 }
 
 void
@@ -375,9 +375,9 @@ Service_ModifyMonitoredItems(UA_Server *server, UA_Session *session,
 
     response->responseHeader.serviceResult =
         UA_Server_processServiceOperations(server, session,
-                                           (UA_ServiceOperation)Operation_ModifyMonitoredItem, sub,
-                                           &request->itemsToModifySize, &UA_TYPES[UA_TYPES_MONITOREDITEMMODIFYREQUEST],
-                                           &response->resultsSize, &UA_TYPES[UA_TYPES_MONITOREDITEMMODIFYRESULT]);
+                  (UA_ServiceOperation)Operation_ModifyMonitoredItem, sub,
+                  &request->itemsToModifySize, &UA_TYPES[UA_TYPES_MONITOREDITEMMODIFYREQUEST],
+                  &response->resultsSize, &UA_TYPES[UA_TYPES_MONITOREDITEMMODIFYRESULT]);
 }
 
 struct setMonitoringContext {
@@ -414,19 +414,23 @@ Operation_SetMonitoringMode(UA_Server *server, UA_Session *session,
     if(mon->monitoringMode == UA_MONITORINGMODE_REPORTING) {
         MonitoredItem_registerSampleCallback(server, mon);
     } else {
+        MonitoredItem_unregisterSampleCallback(server, mon);
+
         // TODO correctly implement SAMPLING
-        /*  Setting the mode to DISABLED or SAMPLING causes all queued Notifications to be delete */
+        /*  Setting the mode to DISABLED or SAMPLING causes all queued Notifications to be deleted */
         UA_Notification *notification, *notification_tmp;
         TAILQ_FOREACH_SAFE(notification, &mon->queue, listEntry, notification_tmp) {
             TAILQ_REMOVE(&mon->queue, notification, listEntry);
+            TAILQ_REMOVE(&smc->sub->notificationQueue, notification, globalEntry);
+            --smc->sub->notificationQueueSize;
+
             UA_DataValue_deleteMembers(&notification->data.value);
             UA_free(notification);
         }
-        mon->currentQueueSize = 0;
+        mon->queueSize = 0;
 
-        /* initialize lastSampledValue */
+        /* Initialize lastSampledValue */
         UA_ByteString_deleteMembers(&mon->lastSampledValue);
-        MonitoredItem_unregisterSampleCallback(server, mon);
     }
 }
 
@@ -434,8 +438,7 @@ void
 Service_SetMonitoringMode(UA_Server *server, UA_Session *session,
                           const UA_SetMonitoringModeRequest *request,
                           UA_SetMonitoringModeResponse *response) {
-    UA_LOG_DEBUG_SESSION(server->config.logger, session,
-                         "Processing SetMonitoringMode");
+    UA_LOG_DEBUG_SESSION(server->config.logger, session, "Processing SetMonitoringMode");
 
     if(server->config.maxMonitoredItemsPerCall != 0 &&
        request->monitoredItemIdsSize > server->config.maxMonitoredItemsPerCall) {
@@ -555,8 +558,7 @@ Service_Publish(UA_Server *server, UA_Session *session,
     if(session->lastSeenSubscriptionId > 0) {
         /* If we found anything one the first loop or if there are LATE 
          * in the list before lastSeenSubscriptionId and not LATE after 
-         * lastSeenSubscriptionId we need a second loop.
-         */
+         * lastSeenSubscriptionId we need a second loop. */
         loopCount = 2;
         /* We must find the last seen subscription id  */
         found = false;
@@ -604,11 +606,11 @@ void
 Service_DeleteSubscriptions(UA_Server *server, UA_Session *session,
                             const UA_DeleteSubscriptionsRequest *request,
                             UA_DeleteSubscriptionsResponse *response) {
-    UA_LOG_DEBUG_SESSION(server->config.logger, session,
-                         "Processing DeleteSubscriptionsRequest");
+    UA_LOG_DEBUG_SESSION(server->config.logger, session, "Processing DeleteSubscriptionsRequest");
 
     response->responseHeader.serviceResult =
-        UA_Server_processServiceOperations(server, session, (UA_ServiceOperation)Operation_DeleteSubscription, NULL,
+        UA_Server_processServiceOperations(server, session,
+                                           (UA_ServiceOperation)Operation_DeleteSubscription, NULL,
                                            &request->subscriptionIdsSize, &UA_TYPES[UA_TYPES_UINT32],
                                            &response->resultsSize, &UA_TYPES[UA_TYPES_STATUSCODE]);
 
@@ -630,8 +632,7 @@ void
 Service_DeleteMonitoredItems(UA_Server *server, UA_Session *session,
                              const UA_DeleteMonitoredItemsRequest *request,
                              UA_DeleteMonitoredItemsResponse *response) {
-    UA_LOG_DEBUG_SESSION(server->config.logger, session,
-                         "Processing DeleteMonitoredItemsRequest");
+    UA_LOG_DEBUG_SESSION(server->config.logger, session, "Processing DeleteMonitoredItemsRequest");
 
     if(server->config.maxMonitoredItemsPerCall != 0 &&
        request->monitoredItemIdsSize > server->config.maxMonitoredItemsPerCall) {
@@ -650,7 +651,8 @@ Service_DeleteMonitoredItems(UA_Server *server, UA_Session *session,
     sub->currentLifetimeCount = 0;
 
     response->responseHeader.serviceResult =
-        UA_Server_processServiceOperations(server, session, (UA_ServiceOperation)Operation_DeleteMonitoredItem, sub,
+        UA_Server_processServiceOperations(server, session,
+                                           (UA_ServiceOperation)Operation_DeleteMonitoredItem, sub,
                                            &request->monitoredItemIdsSize, &UA_TYPES[UA_TYPES_UINT32],
                                            &response->resultsSize, &UA_TYPES[UA_TYPES_STATUSCODE]);
 }
@@ -659,8 +661,7 @@ void
 Service_Republish(UA_Server *server, UA_Session *session,
                   const UA_RepublishRequest *request,
                   UA_RepublishResponse *response) {
-    UA_LOG_DEBUG_SESSION(server->config.logger, session,
-                         "Processing RepublishRequest");
+    UA_LOG_DEBUG_SESSION(server->config.logger, session, "Processing RepublishRequest");
 
     /* Get the subscription */
     UA_Subscription *sub = UA_Session_getSubscriptionById(session, request->subscriptionId);

+ 18 - 24
src/server/ua_subscription.c

@@ -31,8 +31,7 @@ UA_Subscription_new(UA_Session *session, UA_UInt32 subscriptionId) {
     newSub->session = session;
     newSub->subscriptionId = subscriptionId;
     newSub->numMonitoredItems = 0;
-    newSub->readyNotifications = 0;
-    newSub->pendingNotifications = 0;
+    newSub->notificationQueueSize = 0;
     newSub->state = UA_SUBSCRIPTIONSTATE_NORMAL; /* The first publish response is sent immediately */
     TAILQ_INIT(&newSub->retransmissionQueue);
     TAILQ_INIT(&newSub->notificationQueue);
@@ -61,8 +60,7 @@ UA_Subscription_deleteMembers(UA_Server *server, UA_Subscription *sub) {
         UA_free(nme);
     }
     sub->retransmissionQueueSize = 0;
-    sub->readyNotifications = 0;
-    sub->pendingNotifications = 0;
+    sub->notificationQueueSize = 0;
     sub->numMonitoredItems = 0;
 }
 
@@ -155,18 +153,24 @@ moveNotificationsFromMonitoredItems(UA_Subscription *sub, UA_MonitoredItemNotifi
     TAILQ_FOREACH_SAFE(notification, &sub->notificationQueue, globalEntry, notification_tmp) {
         if(pos >= minsSize)
             return;
+
+        UA_MonitoredItem *mon = notification->mon;
+
+        /* Remove the notification from the queues */
+        TAILQ_REMOVE(&sub->notificationQueue, notification, globalEntry);
+        TAILQ_REMOVE(&mon->queue, notification, listEntry);
+        --mon->queueSize;
+        --sub->notificationQueueSize;
+
+        /* Move the content to the response */
         UA_MonitoredItemNotification *min = &mins[pos];
-        min->clientHandle = notification->mon->clientHandle;
-        if(notification->mon->monitoredItemType == UA_MONITOREDITEMTYPE_CHANGENOTIFY) {
+        min->clientHandle = mon->clientHandle;
+        if(mon->monitoredItemType == UA_MONITOREDITEMTYPE_CHANGENOTIFY) {
             min->value = notification->data.value;
         } else {
             /* TODO implementation for events */
         }
-        TAILQ_REMOVE(&sub->notificationQueue, notification, globalEntry);
-        TAILQ_REMOVE(&notification->mon->queue, notification, listEntry);
-        --(notification->mon->currentQueueSize);
         UA_free(notification);
-        --sub->readyNotifications;
         ++pos;
     }
 }
@@ -247,11 +251,11 @@ UA_Subscription_publishCallback(UA_Server *server, UA_Subscription *sub) {
 
     /* Count the available notifications */
     UA_Boolean moreNotifications = false;
-    size_t notifications = sub->readyNotifications;
+    size_t notifications = sub->notificationQueueSize;
     if(!sub->publishingEnabled)
         notifications = 0;
 
-    if (notifications > sub->notificationsPerPublish) {
+    if(notifications > sub->notificationsPerPublish) {
         notifications = sub->notificationsPerPublish;
         moreNotifications = true;
     }
@@ -369,19 +373,11 @@ UA_Subscription_publishCallback(UA_Server *server, UA_Subscription *sub) {
     }
 }
 
-static void
-UA_Subscription_publishTriggeredCallback(UA_Server *server, UA_Subscription *sub) {
-    sub->readyNotifications += sub->pendingNotifications;
-    sub->pendingNotifications = 0;
-    UA_Subscription_publishCallback(server, sub);
-}
-
 UA_Boolean
 UA_Subscription_reachedPublishReqLimit(UA_Server *server,  UA_Session *session) {
     UA_LOG_DEBUG_SESSION(server->config.logger, session,
                          "Reached number of publish request limit");
 
-
     /* Dequeue a response */
     UA_PublishResponseEntry *pre = UA_Session_getPublishReq(session);
 
@@ -433,10 +429,8 @@ Subscription_registerPublishCallback(UA_Server *server, UA_Subscription *sub) {
         return UA_STATUSCODE_GOOD;
 
     UA_StatusCode retval =
-        UA_Server_addRepeatedCallback(server,
-                  (UA_ServerCallback)UA_Subscription_publishTriggeredCallback,
-                  sub, (UA_UInt32)sub->publishingInterval,
-                  &sub->publishCallbackId);
+        UA_Server_addRepeatedCallback(server, (UA_ServerCallback)UA_Subscription_publishCallback,
+                                      sub, (UA_UInt32)sub->publishingInterval, &sub->publishCallbackId);
     if(retval != UA_STATUSCODE_GOOD)
         return retval;
 

+ 13 - 16
src/server/ua_subscription.h

@@ -43,6 +43,7 @@ typedef struct UA_Notification {
 
     UA_MonitoredItem *mon;
 
+    /* See the monitoredItemType of the MonitoredItem */
     union {
         UA_Event event;
         UA_DataValue value;
@@ -53,31 +54,33 @@ typedef TAILQ_HEAD(NotificationQueue, UA_Notification) NotificationQueue;
 
 struct UA_MonitoredItem {
     LIST_ENTRY(UA_MonitoredItem) listEntry;
-
-    /* Settings */
     UA_Subscription *subscription;
+
+    /* Identifier */
     UA_UInt32 monitoredItemId;
+    UA_UInt32 clientHandle;
+
+    /* Settings */
     UA_MonitoredItemType monitoredItemType;
     UA_TimestampsToReturn timestampsToReturn;
     UA_MonitoringMode monitoringMode;
     UA_NodeId monitoredNodeId;
     UA_UInt32 attributeId;
-    UA_UInt32 clientHandle;
+    UA_String indexRange;
     UA_Double samplingInterval; // [ms]
-    UA_UInt32 currentQueueSize;
     UA_UInt32 maxQueueSize;
     UA_Boolean discardOldest;
-    UA_String indexRange;
     // TODO: dataEncoding is hardcoded to UA binary
     UA_DataChangeTrigger trigger;
 
     /* Sample Callback */
     UA_UInt64 sampleCallbackId;
+    UA_ByteString lastSampledValue;
     UA_Boolean sampleCallbackIsRegistered;
 
-    /* Sample Queue */
-    UA_ByteString lastSampledValue;
+    /* Notification Queue */
     NotificationQueue queue;
+    UA_UInt32 queueSize;
 };
 
 UA_MonitoredItem * UA_MonitoredItem_new(UA_MonitoredItemType);
@@ -87,10 +90,8 @@ UA_StatusCode MonitoredItem_registerSampleCallback(UA_Server *server, UA_Monitor
 UA_StatusCode MonitoredItem_unregisterSampleCallback(UA_Server *server, UA_MonitoredItem *mon);
 
 /* Remove entries until mon->maxQueueSize is reached. Sets infobits for lost
- * data if required. Insert newQueuedItem in global list if non NULL */
-void
-MonitoredItem_ensureQueueSpace(UA_Subscription *sub, UA_MonitoredItem *mon,
-                               UA_Notification *newNotification);
+ * data if required. */
+void MonitoredItem_ensureQueueSpace(UA_MonitoredItem *mon);
 
 /****************/
 /* Subscription */
@@ -141,11 +142,7 @@ struct UA_Subscription {
     LIST_HEAD(UA_ListOfUAMonitoredItems, UA_MonitoredItem) monitoredItems;
 
     NotificationQueue notificationQueue;
-
-    /* count number of notifications present before last repeated publish callback */
-    UA_UInt32 readyNotifications;
-    /* count number of notifications present after last repeated publish callback */
-    UA_UInt32 pendingNotifications;
+    UA_UInt32 notificationQueueSize;
 
     /* Retransmission Queue */
     ListOfNotificationMessages retransmissionQueue;

+ 38 - 52
src/server/ua_subscription_datachange.c

@@ -45,19 +45,15 @@ MonitoredItem_delete(UA_Server *server, UA_MonitoredItem *monitoredItem) {
         /* Clear the queued samples */
         UA_Notification *notification, *notification_tmp;
         TAILQ_FOREACH_SAFE(notification, &monitoredItem->queue, listEntry, notification_tmp) {
+            /* Remove the item from the queues */
             TAILQ_REMOVE(&monitoredItem->queue, notification, listEntry);
-
-            /* Remove the item in the global queue */
             TAILQ_REMOVE(&sub->notificationQueue, notification, globalEntry);
+            --sub->notificationQueueSize;
+
             UA_DataValue_deleteMembers(&notification->data.value);
             UA_free(notification);
-
-            if (sub->pendingNotifications)
-                sub->pendingNotifications--;
-            else
-                sub->readyNotifications--;
         }
-        monitoredItem->currentQueueSize = 0;
+        monitoredItem->queueSize = 0;
     } else {
         /* TODO: Access val data.event */
         UA_LOG_ERROR(server->config.logger, UA_LOGCATEGORY_SERVER,
@@ -73,16 +69,17 @@ MonitoredItem_delete(UA_Server *server, UA_MonitoredItem *monitoredItem) {
 }
 
 void
-MonitoredItem_ensureQueueSpace(UA_Subscription *sub, UA_MonitoredItem *mon,
-                               UA_Notification *newNotification) {
+MonitoredItem_ensureQueueSpace(UA_MonitoredItem *mon) {
     UA_Boolean valueDiscarded = false;
-    UA_Notification *notification;
-#ifndef __clang_analyzer__
-    while(mon->currentQueueSize > mon->maxQueueSize) {
+    UA_Subscription *sub = mon->subscription;
+
+    /* Remove notifications until the queue size is reached */
+    while(mon->queueSize > mon->maxQueueSize) {
         /* maxQueuesize is at least 1 */
-        UA_assert(mon->currentQueueSize >= 2);
+        UA_assert(mon->queueSize >= 2);
 
         /* Get the item to remove. New items are added to the end */
+        UA_Notification *notification = NULL;
         if(mon->discardOldest) {
             /* Remove the oldest */
             notification = TAILQ_FIRST(&mon->queue);
@@ -95,62 +92,49 @@ MonitoredItem_ensureQueueSpace(UA_Subscription *sub, UA_MonitoredItem *mon,
 
         /* Remove the item */
         TAILQ_REMOVE(&mon->queue, notification, listEntry);
+        TAILQ_REMOVE(&sub->notificationQueue, notification, globalEntry);
+        --mon->queueSize;
+        --sub->notificationQueueSize;
+
+        /* Free the notification */
         if(mon->monitoredItemType == UA_MONITOREDITEMTYPE_CHANGENOTIFY) {
             UA_DataValue_deleteMembers(&notification->data.value);
         } else {
-            //TODO: event implemantation
-        }
-
-        UA_Notification *nextGlobalNotification = TAILQ_NEXT(notification, globalEntry);
-        TAILQ_REMOVE(&sub->notificationQueue, notification, globalEntry);
-
-        if(newNotification) {
-            if(nextGlobalNotification)
-                TAILQ_INSERT_BEFORE(nextGlobalNotification, newNotification, globalEntry);
-            else
-                TAILQ_INSERT_TAIL(&sub->notificationQueue, newNotification, globalEntry);
-            newNotification = NULL;
-        } else { 
-            if (sub->pendingNotifications)
-                --sub->pendingNotifications;
-            else
-                --sub->readyNotifications;
+            /* TODO: event implemantation */
         }
 
+        /* Work around a false positive in clang analyzer */
+#ifndef __clang_analyzer__
         UA_free(notification);
-        --mon->currentQueueSize;
+#endif
         valueDiscarded = true;
     }
-#endif
 
     if(!valueDiscarded)
-        goto end;
-
+        return;
+            
     if(mon->monitoredItemType == UA_MONITOREDITEMTYPE_CHANGENOTIFY) {
         /* Get the element that carries the infobits */
+        UA_Notification *notification = NULL;
         if(mon->discardOldest)
             notification = TAILQ_FIRST(&mon->queue);
         else
             notification = TAILQ_LAST(&mon->queue, NotificationQueue);
         UA_assert(notification);
 
-        /* If the queue size is reduced to one, remove the infobits */
-        if(mon->maxQueueSize == 1) {
-            notification->data.value.status &= ~(UA_StatusCode) (UA_STATUSCODE_INFOTYPE_DATAVALUE |
-                                                              UA_STATUSCODE_INFOBITS_OVERFLOW);
-
-            goto end;
+        if(mon->maxQueueSize > 1) {
+            /* Add the infobits either to the newest or the new last entry */
+            notification->data.value.hasStatus = true;
+            notification->data.value.status |= (UA_STATUSCODE_INFOTYPE_DATAVALUE |
+                                                UA_STATUSCODE_INFOBITS_OVERFLOW);
+        } else {
+            /* If the queue size is reduced to one, remove the infobits */
+            notification->data.value.status &= ~(UA_StatusCode)(UA_STATUSCODE_INFOTYPE_DATAVALUE |
+                                                                UA_STATUSCODE_INFOBITS_OVERFLOW);
         }
-
-        /* Add the infobits either to the newest or the new last entry */
-        notification->data.value.hasStatus = true;
-        notification->data.value.status |= (UA_STATUSCODE_INFOTYPE_DATAVALUE | UA_STATUSCODE_INFOBITS_OVERFLOW);
-    }
-end:
-    if(newNotification) {
-        TAILQ_INSERT_TAIL(&sub->notificationQueue, newNotification, globalEntry);
-        ++sub->pendingNotifications;
     }
+
+    /* TODO: Infobits for Events? */
 }
 
 /* Errors are returned as no change detected */
@@ -284,10 +268,12 @@ sampleCallbackWithValue(UA_Server *server, UA_Subscription *sub,
 
     /* Add the sample to the queue for publication */
     TAILQ_INSERT_TAIL(&monitoredItem->queue, newNotification, listEntry);
-    ++monitoredItem->currentQueueSize;
+    TAILQ_INSERT_TAIL(&sub->notificationQueue, newNotification, globalEntry);
+    ++monitoredItem->queueSize;
+    ++sub->notificationQueueSize;
 
     /* Remove entries from the queue if required and add the sample to the global queue */
-    MonitoredItem_ensureQueueSpace(sub, monitoredItem, newNotification);
+    MonitoredItem_ensureQueueSpace(monitoredItem);
 
     return true;
 }

+ 7 - 7
tests/server/check_services_subscriptions.c

@@ -327,7 +327,7 @@ START_TEST(Server_overflow) {
     }
     ck_assert_ptr_ne(mon, NULL);
     UA_assert(mon);
-    ck_assert_uint_eq(mon->currentQueueSize, 1); 
+    ck_assert_uint_eq(mon->queueSize, 1); 
     ck_assert_uint_eq(mon->maxQueueSize, 3); 
     UA_Notification *notification;
     notification = TAILQ_LAST(&mon->queue, NotificationQueue);
@@ -335,21 +335,21 @@ START_TEST(Server_overflow) {
 
     UA_ByteString_deleteMembers(&mon->lastSampledValue);
     UA_MonitoredItem_SampleCallback(server, mon);
-    ck_assert_uint_eq(mon->currentQueueSize, 2); 
+    ck_assert_uint_eq(mon->queueSize, 2); 
     ck_assert_uint_eq(mon->maxQueueSize, 3); 
     notification = TAILQ_LAST(&mon->queue, NotificationQueue);
     ck_assert_uint_eq(notification->data.value.hasStatus, false);
 
     UA_ByteString_deleteMembers(&mon->lastSampledValue);
     UA_MonitoredItem_SampleCallback(server, mon);
-    ck_assert_uint_eq(mon->currentQueueSize, 3); 
+    ck_assert_uint_eq(mon->queueSize, 3); 
     ck_assert_uint_eq(mon->maxQueueSize, 3); 
     notification = TAILQ_LAST(&mon->queue, NotificationQueue);
     ck_assert_uint_eq(notification->data.value.hasStatus, false);
 
     UA_ByteString_deleteMembers(&mon->lastSampledValue);
     UA_MonitoredItem_SampleCallback(server, mon);
-    ck_assert_uint_eq(mon->currentQueueSize, 3); 
+    ck_assert_uint_eq(mon->queueSize, 3); 
     ck_assert_uint_eq(mon->maxQueueSize, 3); 
     notification = TAILQ_FIRST(&mon->queue);
     ck_assert_uint_eq(notification->data.value.hasStatus, true);
@@ -387,7 +387,7 @@ START_TEST(Server_overflow) {
     UA_MonitoredItemModifyRequest_deleteMembers(&itemToModify);
     UA_ModifyMonitoredItemsResponse_deleteMembers(&modifyMonitoredItemsResponse);
 
-    ck_assert_uint_eq(mon->currentQueueSize, 2); 
+    ck_assert_uint_eq(mon->queueSize, 2); 
     ck_assert_uint_eq(mon->maxQueueSize, 2); 
     notification = TAILQ_FIRST(&mon->queue);
     ck_assert_uint_eq(notification->data.value.hasStatus, true);
@@ -417,7 +417,7 @@ START_TEST(Server_overflow) {
     UA_MonitoredItemModifyRequest_deleteMembers(&itemToModify);
     UA_ModifyMonitoredItemsResponse_deleteMembers(&modifyMonitoredItemsResponse);
 
-    ck_assert_uint_eq(mon->currentQueueSize, 1); 
+    ck_assert_uint_eq(mon->queueSize, 1); 
     ck_assert_uint_eq(mon->maxQueueSize, 1); 
     notification = TAILQ_LAST(&mon->queue, NotificationQueue);
     ck_assert_uint_eq(notification->data.value.hasStatus, false);
@@ -447,7 +447,7 @@ START_TEST(Server_overflow) {
     UA_ModifyMonitoredItemsResponse_deleteMembers(&modifyMonitoredItemsResponse);
 
     UA_MonitoredItem_SampleCallback(server, mon);
-    ck_assert_uint_eq(mon->currentQueueSize, 1); 
+    ck_assert_uint_eq(mon->queueSize, 1); 
     ck_assert_uint_eq(mon->maxQueueSize, 1); 
     notification = TAILQ_FIRST(&mon->queue);
     ck_assert_uint_eq(notification->data.value.hasStatus, false); /* the infobit is only set if the queue is larger than one */