Browse Source

save the position of the last monitored item sent (#1311)

StalderT 7 years ago
parent
commit
b9c9fc0510
2 changed files with 62 additions and 16 deletions
  1. 58 16
      src/server/ua_subscription.c
  2. 4 0
      src/server/ua_subscription.h

+ 58 - 16
src/server/ua_subscription.c

@@ -135,6 +135,44 @@ UA_Subscription_removeRetransmissionMessage(UA_Subscription *sub,
     return UA_STATUSCODE_GOOD;
 }
 
+static UA_MonitoredItem *
+selectFirstMonToIterate(UA_Subscription *sub) {
+    UA_MonitoredItem *mon = LIST_FIRST(&sub->monitoredItems);
+    if(sub->lastSendMonitoredItemId > 0) {
+        while(mon) {
+            if(mon->itemId == sub->lastSendMonitoredItemId)
+                break;
+        }
+        if(!mon)
+            mon = LIST_FIRST(&sub->monitoredItems);
+    }
+    return mon;
+}
+
+/* Iterate over the monitoreditems of the subscription, starting at mon, and
+ * move notifications into the response. */
+static void
+moveNotificationsFromMonitoredItems(UA_Subscription *sub, UA_MonitoredItem *mon,
+                                    UA_MonitoredItemNotification *mins, size_t minsSize,
+                                    size_t *pos) {
+    MonitoredItem_queuedValue *qv, *qv_tmp;
+    while(mon) {
+        sub->lastSendMonitoredItemId = mon->itemId;
+        TAILQ_FOREACH_SAFE(qv, &mon->queue, listEntry, qv_tmp) {
+            if(*pos >= minsSize)
+                return;
+            UA_MonitoredItemNotification *min = &mins[*pos];
+            min->clientHandle = qv->clientHandle;
+            min->value = qv->value;
+            TAILQ_REMOVE(&mon->queue, qv, listEntry);
+            UA_free(qv);
+            --mon->currentQueueSize;
+            ++(*pos);
+        }
+        mon = LIST_NEXT(mon, listEntry);
+    }
+}
+
 static UA_StatusCode
 prepareNotificationMessage(UA_Subscription *sub,
                            UA_NotificationMessage *message,
@@ -168,22 +206,20 @@ prepareNotificationMessage(UA_Subscription *sub,
     dcn->monitoredItemsSize = notifications;
 
     /* Move notifications into the response .. the point of no return */
+
+    /* Select the first monitoredItem or the first monitoreditem after the last
+     * that was processed. */
+    UA_MonitoredItem *mon = selectFirstMonToIterate(sub);
+
+    /* Move notifications into the response */
     size_t l = 0;
-    UA_MonitoredItem *mon;
-    LIST_FOREACH(mon, &sub->monitoredItems, listEntry) {
-        MonitoredItem_queuedValue *qv, *qv_tmp;
-        TAILQ_FOREACH_SAFE(qv, &mon->queue, listEntry, qv_tmp) {
-            if(l >= notifications)
-                return UA_STATUSCODE_GOOD;
-            UA_MonitoredItemNotification *min = &dcn->monitoredItems[l];
-            min->clientHandle = qv->clientHandle;
-            min->value = qv->value;
-            TAILQ_REMOVE(&mon->queue, qv, listEntry);
-            UA_free(qv);
-            --mon->currentQueueSize;
-            ++l;
-        }
+    moveNotificationsFromMonitoredItems(sub, mon, dcn->monitoredItems, notifications, &l);
+    if(l < notifications) {
+        /* Not done. We skipped MonitoredItems. Restart at the beginning. */
+        moveNotificationsFromMonitoredItems(sub, LIST_FIRST(&sub->monitoredItems),
+                                            dcn->monitoredItems, notifications, &l);
     }
+
     return UA_STATUSCODE_GOOD;
 }
 
@@ -319,9 +355,15 @@ UA_Subscription_publishCallback(UA_Server *server, UA_Subscription *sub) {
                     &UA_TYPES[UA_TYPES_UINT32]);
     UA_free(pre); /* no need for UA_PublishResponse_deleteMembers */
 
-    /* Repeat if there are more notifications to send */
-    if(moreNotifications)
+    if(!moreNotifications) {
+        /* All notifications were sent. The next time, just start at the first
+         * monitoreditem. */
+        sub->lastSendMonitoredItemId = 0;
+    } else {
+        /* Repeat sending responses right away if there are more notifications
+         * to send */
         UA_Subscription_publishCallback(server, sub);
+    }
 }
 
 UA_StatusCode

+ 4 - 0
src/server/ua_subscription.h

@@ -109,6 +109,10 @@ struct UA_Subscription {
 
     /* MonitoredItems */
     LIST_HEAD(UA_ListOfUAMonitoredItems, UA_MonitoredItem) monitoredItems;
+    /* When the last publish response could not hold all available
+     * notifications, in the next iteration, start at the monitoreditem with
+     * this id. If zero, start at the first monitoreditem. */
+    UA_UInt32 lastSendMonitoredItemId;
 
     /* Retransmission Queue */
     ListOfNotificationMessages retransmissionQueue;