Browse Source

modify notification queues in the server fix #1631

StalderT 6 years ago
parent
commit
98a29088c9

+ 1 - 1
src/server/ua_services_subscription.c

@@ -343,7 +343,7 @@ Operation_ModifyMonitoredItem(UA_Server *server, UA_Session *session, UA_Subscri
     result->revisedQueueSize = mon->maxQueueSize;
 
     /* Remove some notifications if the queue is now too small */
-    MonitoredItem_ensureQueueSpace(mon);
+    MonitoredItem_ensureQueueSpace(sub, mon, NULL);
 }
 
 void

+ 44 - 91
src/server/ua_subscription.c

@@ -22,19 +22,21 @@
 UA_Subscription *
 UA_Subscription_new(UA_Session *session, UA_UInt32 subscriptionId) {
     /* Allocate the memory */
-    UA_Subscription *newItem =
+    UA_Subscription *newSub =
         (UA_Subscription*)UA_calloc(1, sizeof(UA_Subscription));
-    if(!newItem)
+    if(!newSub)
         return NULL;
 
     /* Remaining members are covered by calloc zeroing out the memory */
-    newItem->session = session;
-    newItem->subscriptionId = subscriptionId;
-    newItem->numMonitoredItems = 0;
-    newItem->state = UA_SUBSCRIPTIONSTATE_NORMAL; /* The first publish response is sent immediately */
-    TAILQ_INIT(&newItem->retransmissionQueue);
-    newItem->lastTriggeredPublishCallback = UA_DateTime_nowMonotonic();
-    return newItem;
+    newSub->session = session;
+    newSub->subscriptionId = subscriptionId;
+    newSub->numMonitoredItems = 0;
+    newSub->readyNotifications = 0;
+    newSub->pendingNotifications = 0;
+    newSub->state = UA_SUBSCRIPTIONSTATE_NORMAL; /* The first publish response is sent immediately */
+    TAILQ_INIT(&newSub->retransmissionQueue);
+    TAILQ_INIT(&newSub->notificationQueue);
+    return newSub;
 }
 
 void
@@ -48,7 +50,6 @@ UA_Subscription_deleteMembers(UA_Server *server, UA_Subscription *sub) {
     /* Delete monitored Items */
     UA_MonitoredItem *mon, *tmp_mon;
     LIST_FOREACH_SAFE(mon, &sub->monitoredItems, listEntry, tmp_mon) {
-        LIST_REMOVE(mon, listEntry);
         MonitoredItem_delete(server, mon);
     }
 
@@ -60,6 +61,9 @@ UA_Subscription_deleteMembers(UA_Server *server, UA_Subscription *sub) {
         UA_free(nme);
     }
     sub->retransmissionQueueSize = 0;
+    sub->readyNotifications = 0;
+    sub->pendingNotifications = 0;
+    sub->numMonitoredItems = 0;
 }
 
 UA_MonitoredItem *
@@ -86,7 +90,6 @@ UA_Subscription_deleteMonitoredItem(UA_Server *server, UA_Subscription *sub,
         return UA_STATUSCODE_BADMONITOREDITEMIDINVALID;
 
     /* Remove the MonitoredItem */
-    LIST_REMOVE(mon, listEntry);
     MonitoredItem_delete(server, mon);
     sub->numMonitoredItems--;
     return UA_STATUSCODE_GOOD;
@@ -103,29 +106,6 @@ UA_Subscription_getNumMonitoredItems(UA_Subscription *sub) {
     return sub->numMonitoredItems;
 }
 
-static size_t
-countQueuedNotifications(UA_Subscription *sub,
-                         UA_Boolean *moreNotifications) {
-    if(!sub->publishingEnabled)
-        return 0;
-
-    size_t notifications = 0;
-    UA_MonitoredItem *mon;
-    LIST_FOREACH(mon, &sub->monitoredItems, listEntry) {
-        MonitoredItem_queuedValue *qv;
-        TAILQ_FOREACH(qv, &mon->queue, listEntry) {
-            if(notifications >= sub->notificationsPerPublish) {
-                *moreNotifications = true;
-                break;
-            }
-            /* Only count if the item was sampled before lastTriggeredPublishCallback or events */
-            if((sub->lastTriggeredPublishCallback >= qv->sampledDateTime) || (mon->monitoredItemType != UA_MONITOREDITEMTYPE_CHANGENOTIFY))
-                ++notifications;
-        }
-    }
-    return notifications;
-}
-
 static void
 UA_Subscription_addRetransmissionMessage(UA_Server *server, UA_Subscription *sub,
                                          UA_NotificationMessageEntry *entry) {
@@ -165,49 +145,29 @@ UA_Subscription_removeRetransmissionMessage(UA_Subscription *sub,
     return UA_STATUSCODE_GOOD;
 }
 
-static UA_MonitoredItem *
-selectFirstMonToIterate(UA_Subscription *sub) {
-    UA_MonitoredItem *mon = LIST_FIRST(&sub->monitoredItems);
-    if(sub->nextMonitoredItemIdToBrowse > 0) {
-        while(mon) {
-            if(mon->itemId == sub->nextMonitoredItemIdToBrowse)
-                break;
-            mon = LIST_NEXT(mon, listEntry);
-        }
-        if(!mon)
-            mon = LIST_FIRST(&sub->monitoredItems);
-    }
-    return mon;
-}
-
 /* Iterate over the monitoreditems of the subscription, starting at mon, and
  * move notifications into the response. */
 static void
-moveNotificationsFromMonitoredItems(UA_Subscription *sub, UA_MonitoredItem *mon,
-                                    UA_MonitoredItemNotification *mins, size_t minsSize,
-                                    size_t *pos) {
+moveNotificationsFromMonitoredItems(UA_Subscription *sub, UA_MonitoredItemNotification *mins,
+                                    size_t minsSize) {
+    size_t pos = 0;
     MonitoredItem_queuedValue *qv, *qv_tmp;
-    while(mon) {
-        sub->nextMonitoredItemIdToBrowse = mon->itemId;
-        TAILQ_FOREACH_SAFE(qv, &mon->queue, listEntry, qv_tmp) {
-            if(*pos >= minsSize)
-                return;
-            /* Only move if the item was sampled before lastTriggeredPublishCallback or events */
-            if((sub->lastTriggeredPublishCallback >= qv->sampledDateTime) || (mon->monitoredItemType != UA_MONITOREDITEMTYPE_CHANGENOTIFY)) {
-                UA_MonitoredItemNotification *min = &mins[*pos];
-                min->clientHandle = qv->clientHandle;
-                if(mon->monitoredItemType == UA_MONITOREDITEMTYPE_CHANGENOTIFY) {
-                    min->value = qv->data.value;
-                } else {
-                    /* TODO implementation for events */
-                }
-                TAILQ_REMOVE(&mon->queue, qv, listEntry);
-                UA_free(qv);
-                --mon->currentQueueSize;
-                ++(*pos);
-            }
+    TAILQ_FOREACH_SAFE(qv, &sub->notificationQueue, globalEntry, qv_tmp) {
+        if(pos >= minsSize)
+            return;
+        UA_MonitoredItemNotification *min = &mins[pos];
+        min->clientHandle = qv->clientHandle;
+        if(qv->mon->monitoredItemType == UA_MONITOREDITEMTYPE_CHANGENOTIFY) {
+            min->value = qv->data.value;
+        } else {
+            /* TODO implementation for events */
         }
-        mon = LIST_NEXT(mon, listEntry);
+        TAILQ_REMOVE(&sub->notificationQueue, qv, globalEntry);
+        TAILQ_REMOVE(&qv->mon->queue, qv, listEntry);
+        --(qv->mon->currentQueueSize);
+        UA_free(qv);
+        --sub->readyNotifications;
+        ++pos;
     }
 }
 
@@ -245,18 +205,7 @@ prepareNotificationMessage(UA_Subscription *sub,
 
     /* Move notifications into the response .. the point of no return */
 
-    /* Select the first monitoredItem or the first monitoreditem after the last
-     * that was processed. */
-    UA_MonitoredItem *mon = selectFirstMonToIterate(sub);
-
-    /* Move notifications into the response */
-    size_t l = 0;
-    moveNotificationsFromMonitoredItems(sub, mon, dcn->monitoredItems, notifications, &l);
-    if(l < notifications) {
-        /* Not done. We skipped MonitoredItems. Restart at the beginning. */
-        moveNotificationsFromMonitoredItems(sub, LIST_FIRST(&sub->monitoredItems),
-                                            dcn->monitoredItems, notifications, &l);
-    }
+    moveNotificationsFromMonitoredItems(sub, dcn->monitoredItems, notifications);
 
     return UA_STATUSCODE_GOOD;
 }
@@ -298,7 +247,14 @@ UA_Subscription_publishCallback(UA_Server *server, UA_Subscription *sub) {
 
     /* Count the available notifications */
     UA_Boolean moreNotifications = false;
-    size_t notifications = countQueuedNotifications(sub, &moreNotifications);
+    size_t notifications = sub->readyNotifications;
+    if(!sub->publishingEnabled)
+        notifications = 0;
+
+    if (notifications > sub->notificationsPerPublish) {
+        notifications = sub->notificationsPerPublish;
+        moreNotifications = true;
+    }
 
     /* Return if no notifications and no keepalive */
     if(notifications == 0) {
@@ -406,11 +362,7 @@ UA_Subscription_publishCallback(UA_Server *server, UA_Subscription *sub) {
                     &UA_TYPES[UA_TYPES_UINT32]);
     UA_free(pre); /* no need for UA_PublishResponse_deleteMembers */
 
-    if(!moreNotifications) {
-        /* All notifications were sent. The next time, just start at the first
-         * monitoreditem. */
-        sub->nextMonitoredItemIdToBrowse = 0;
-    } else {
+    if(moreNotifications) {
         /* Repeat sending responses right away if there are more notifications
          * to send */
         UA_Subscription_publishCallback(server, sub);
@@ -419,7 +371,8 @@ UA_Subscription_publishCallback(UA_Server *server, UA_Subscription *sub) {
 
 static void
 UA_Subscription_publishTriggeredCallback(UA_Server *server, UA_Subscription *sub) {
-    sub->lastTriggeredPublishCallback = UA_DateTime_nowMonotonic();
+    sub->readyNotifications += sub->pendingNotifications;
+    sub->pendingNotifications = 0;
     UA_Subscription_publishCallback(server, sub);
 }
 

+ 23 - 17
src/server/ua_subscription.h

@@ -35,16 +35,6 @@ typedef struct UA_Event {
    UA_Int32 eventId;
 } UA_Event;
 
-typedef struct MonitoredItem_queuedValue {
-    TAILQ_ENTRY(MonitoredItem_queuedValue) listEntry;
-    UA_UInt32 clientHandle;
-    UA_DateTime sampledDateTime;
-    union {
-        UA_Event event;
-        UA_DataValue value;
-    } data;
-} MonitoredItem_queuedValue;
-
 typedef TAILQ_HEAD(QueuedValueQueue, MonitoredItem_queuedValue) QueuedValueQueue;
 
 typedef struct UA_MonitoredItem {
@@ -76,6 +66,19 @@ typedef struct UA_MonitoredItem {
     QueuedValueQueue queue;
 } UA_MonitoredItem;
 
+typedef struct MonitoredItem_queuedValue {
+    TAILQ_ENTRY(MonitoredItem_queuedValue) listEntry;
+    TAILQ_ENTRY(MonitoredItem_queuedValue) globalEntry;
+
+    UA_MonitoredItem *mon;
+
+    UA_UInt32 clientHandle;
+    union {
+        UA_Event event;
+        UA_DataValue value;
+    } data;
+} MonitoredItem_queuedValue;
+
 UA_MonitoredItem * UA_MonitoredItem_new(UA_MonitoredItemType);
 void MonitoredItem_delete(UA_Server *server, UA_MonitoredItem *monitoredItem);
 void UA_MonitoredItem_SampleCallback(UA_Server *server, UA_MonitoredItem *monitoredItem);
@@ -83,8 +86,9 @@ UA_StatusCode MonitoredItem_registerSampleCallback(UA_Server *server, UA_Monitor
 UA_StatusCode MonitoredItem_unregisterSampleCallback(UA_Server *server, UA_MonitoredItem *mon);
 
 /* Remove entries until mon->maxQueueSize is reached. Sets infobits for lost
- * data if required. */
-void MonitoredItem_ensureQueueSpace(UA_MonitoredItem *mon);
+ * data if required. Insert newQueuedItem in global list if non NULL */
+void
+MonitoredItem_ensureQueueSpace(UA_Subscription *sub, UA_MonitoredItem *mon, MonitoredItem_queuedValue *newQueuedItem);
 
 /****************/
 /* Subscription */
@@ -130,14 +134,16 @@ struct UA_Subscription {
     /* Publish Callback */
     UA_UInt64 publishCallbackId;
     UA_Boolean publishCallbackIsRegistered;
-    UA_DateTime lastTriggeredPublishCallback;
 
     /* MonitoredItems */
     LIST_HEAD(UA_ListOfUAMonitoredItems, UA_MonitoredItem) monitoredItems;
-    /* When the last publish response could not hold all available
-     * notifications, in the next iteration, start at the monitoreditem with
-     * this id. If zero, start at the first monitoreditem. */
-    UA_UInt32 nextMonitoredItemIdToBrowse;
+
+    QueuedValueQueue notificationQueue;
+
+    /* count number of notifications present before last repeated publish callback */
+    UA_UInt32 readyNotifications;
+    /* count number of notifications present after last repeated publish callback */
+    UA_UInt32 pendingNotifications;
 
     /* Retransmission Queue */
     ListOfNotificationMessages retransmissionQueue;

+ 39 - 11
src/server/ua_subscription_datachange.c

@@ -46,8 +46,16 @@ MonitoredItem_delete(UA_Server *server, UA_MonitoredItem *monitoredItem) {
         MonitoredItem_queuedValue *val, *val_tmp;
         TAILQ_FOREACH_SAFE(val, &monitoredItem->queue, listEntry, val_tmp) {
             TAILQ_REMOVE(&monitoredItem->queue, val, listEntry);
+
+            /* Remove the item in the global queue */
+            TAILQ_REMOVE(&sub->notificationQueue, val, globalEntry);
             UA_DataValue_deleteMembers(&val->data.value);
             UA_free(val);
+
+            if (sub->pendingNotifications)
+                sub->pendingNotifications--;
+            else
+                sub->readyNotifications--;
         }
         monitoredItem->currentQueueSize = 0;
     } else {
@@ -65,7 +73,7 @@ MonitoredItem_delete(UA_Server *server, UA_MonitoredItem *monitoredItem) {
 }
 
 void
-MonitoredItem_ensureQueueSpace(UA_MonitoredItem *mon) {
+MonitoredItem_ensureQueueSpace(UA_Subscription *sub, UA_MonitoredItem *mon, MonitoredItem_queuedValue *newQueuedItem) {
     UA_Boolean valueDiscarded = false;
     MonitoredItem_queuedValue *queueItem;
 #ifndef __clang_analyzer__
@@ -84,10 +92,6 @@ MonitoredItem_ensureQueueSpace(UA_MonitoredItem *mon) {
         }
         UA_assert(queueItem);
 
-        /* Copy the sampled date time of the removed item to the last item */
-        MonitoredItem_queuedValue *lastQueueItem = TAILQ_LAST(&mon->queue, QueuedValueQueue);
-        lastQueueItem->sampledDateTime = queueItem->sampledDateTime;
-
         /* Remove the item */
         TAILQ_REMOVE(&mon->queue, queueItem, listEntry);
         if(mon->monitoredItemType == UA_MONITOREDITEMTYPE_CHANGENOTIFY) {
@@ -95,6 +99,23 @@ MonitoredItem_ensureQueueSpace(UA_MonitoredItem *mon) {
         } else {
             //TODO: event implemantation
         }
+
+        MonitoredItem_queuedValue *nextGlobalQueueItem = TAILQ_NEXT(queueItem, globalEntry);
+        TAILQ_REMOVE(&sub->notificationQueue, queueItem, globalEntry);
+
+        if(newQueuedItem) {
+            if(nextGlobalQueueItem)
+                TAILQ_INSERT_BEFORE(nextGlobalQueueItem, newQueuedItem, globalEntry);
+            else
+                TAILQ_INSERT_TAIL(&sub->notificationQueue, newQueuedItem, globalEntry);
+            newQueuedItem = NULL;
+        } else { 
+            if (sub->pendingNotifications)
+                --sub->pendingNotifications;
+            else
+                --sub->readyNotifications;
+        }
+
         UA_free(queueItem);
         --mon->currentQueueSize;
         valueDiscarded = true;
@@ -102,7 +123,7 @@ MonitoredItem_ensureQueueSpace(UA_MonitoredItem *mon) {
 #endif
 
     if(!valueDiscarded)
-        return;
+        goto end;
 
     if(mon->monitoredItemType == UA_MONITOREDITEMTYPE_CHANGENOTIFY) {
         /* Get the element that carries the infobits */
@@ -116,13 +137,19 @@ MonitoredItem_ensureQueueSpace(UA_MonitoredItem *mon) {
         if(mon->maxQueueSize == 1) {
             queueItem->data.value.status &= ~(UA_StatusCode) (UA_STATUSCODE_INFOTYPE_DATAVALUE |
                                                               UA_STATUSCODE_INFOBITS_OVERFLOW);
-            return;
+
+            goto end;
         }
 
         /* Add the infobits either to the newest or the new last entry */
         queueItem->data.value.hasStatus = true;
         queueItem->data.value.status |= (UA_STATUSCODE_INFOTYPE_DATAVALUE | UA_STATUSCODE_INFOBITS_OVERFLOW);
     }
+end:
+    if(newQueuedItem) {
+        TAILQ_INSERT_TAIL(&sub->notificationQueue, newQueuedItem, globalEntry);
+        ++sub->pendingNotifications;
+    }
 }
 
 /* Errors are returned as no change detected */
@@ -257,11 +284,12 @@ sampleCallbackWithValue(UA_Server *server, UA_Subscription *sub,
     TAILQ_INSERT_TAIL(&monitoredItem->queue, newQueueItem, listEntry);
     ++monitoredItem->currentQueueSize;
 
-    /* Save the sampled date time */
-    newQueueItem->sampledDateTime = UA_DateTime_nowMonotonic();
 
-    /* Remove entries from the queue if required */
-    MonitoredItem_ensureQueueSpace(monitoredItem);
+    newQueueItem->mon = monitoredItem;
+
+    /* Remove entries from the queue if required and add the sample to the global queue */
+    MonitoredItem_ensureQueueSpace(sub, monitoredItem, newQueueItem);
+
     return true;
 }