Browse Source

only send monitored items sampled before last triggered publish

StalderT 7 years ago
parent
commit
5bf7414b12
3 changed files with 39 additions and 17 deletions
  1. 28 16
      src/server/ua_subscription.c
  2. 4 1
      src/server/ua_subscription.h
  3. 7 0
      src/server/ua_subscription_datachange.c

+ 28 - 16
src/server/ua_subscription.c

@@ -33,6 +33,7 @@ UA_Subscription_new(UA_Session *session, UA_UInt32 subscriptionId) {
     newItem->numMonitoredItems = 0;
     newItem->state = UA_SUBSCRIPTIONSTATE_NORMAL; /* The first publish response is sent immediately */
     TAILQ_INIT(&newItem->retransmissionQueue);
+    newItem->lastTriggeredPublishCallback = UA_DateTime_nowMonotonic();
     return newItem;
 }
 
@@ -117,7 +118,9 @@ countQueuedNotifications(UA_Subscription *sub,
                 *moreNotifications = true;
                 break;
             }
-            ++notifications;
+            /* Only count if the item was sampled before lastTriggeredPublishCallback or events */
+            if((sub->lastTriggeredPublishCallback >= qv->sampledDateTime) || (mon->monitoredItemType != UA_MONITOREDITEMTYPE_CHANGENOTIFY))
+                ++notifications;
         }
     }
     return notifications;
@@ -165,9 +168,9 @@ UA_Subscription_removeRetransmissionMessage(UA_Subscription *sub,
 static UA_MonitoredItem *
 selectFirstMonToIterate(UA_Subscription *sub) {
     UA_MonitoredItem *mon = LIST_FIRST(&sub->monitoredItems);
-    if(sub->lastSendMonitoredItemId > 0) {
+    if(sub->nextMonitoredItemIdToBrowse > 0) {
         while(mon) {
-            if(mon->itemId == sub->lastSendMonitoredItemId)
+            if(mon->itemId == sub->nextMonitoredItemIdToBrowse)
                 break;
             mon = LIST_NEXT(mon, listEntry);
         }
@@ -185,21 +188,24 @@ moveNotificationsFromMonitoredItems(UA_Subscription *sub, UA_MonitoredItem *mon,
                                     size_t *pos) {
     MonitoredItem_queuedValue *qv, *qv_tmp;
     while(mon) {
-        sub->lastSendMonitoredItemId = mon->itemId;
+        sub->nextMonitoredItemIdToBrowse = mon->itemId;
         TAILQ_FOREACH_SAFE(qv, &mon->queue, listEntry, qv_tmp) {
             if(*pos >= minsSize)
                 return;
-            UA_MonitoredItemNotification *min = &mins[*pos];
-            min->clientHandle = qv->clientHandle;
-            if(mon->monitoredItemType == UA_MONITOREDITEMTYPE_CHANGENOTIFY) {
-                min->value = qv->data.value;
-            } else {
-                /* TODO implementation for events */
+            /* Only move if the item was sampled before lastTriggeredPublishCallback or events */
+            if((sub->lastTriggeredPublishCallback >= qv->sampledDateTime) || (mon->monitoredItemType != UA_MONITOREDITEMTYPE_CHANGENOTIFY)) {
+                UA_MonitoredItemNotification *min = &mins[*pos];
+                min->clientHandle = qv->clientHandle;
+                if(mon->monitoredItemType == UA_MONITOREDITEMTYPE_CHANGENOTIFY) {
+                    min->value = qv->data.value;
+                } else {
+                    /* TODO implementation for events */
+                }
+                TAILQ_REMOVE(&mon->queue, qv, listEntry);
+                UA_free(qv);
+                --mon->currentQueueSize;
+                ++(*pos);
             }
-            TAILQ_REMOVE(&mon->queue, qv, listEntry);
-            UA_free(qv);
-            --mon->currentQueueSize;
-            ++(*pos);
         }
         mon = LIST_NEXT(mon, listEntry);
     }
@@ -403,7 +409,7 @@ UA_Subscription_publishCallback(UA_Server *server, UA_Subscription *sub) {
     if(!moreNotifications) {
         /* All notifications were sent. The next time, just start at the first
          * monitoreditem. */
-        sub->lastSendMonitoredItemId = 0;
+        sub->nextMonitoredItemIdToBrowse = 0;
     } else {
         /* Repeat sending responses right away if there are more notifications
          * to send */
@@ -411,6 +417,12 @@ UA_Subscription_publishCallback(UA_Server *server, UA_Subscription *sub) {
     }
 }
 
+static void
+UA_Subscription_publishTriggeredCallback(UA_Server *server, UA_Subscription *sub) {
+    sub->lastTriggeredPublishCallback = UA_DateTime_nowMonotonic();
+    UA_Subscription_publishCallback(server, sub);
+}
+
 UA_Boolean
 UA_Subscription_reachedPublishReqLimit(UA_Server *server,  UA_Session *session) {
     UA_LOG_DEBUG_SESSION(server->config.logger, session,
@@ -469,7 +481,7 @@ Subscription_registerPublishCallback(UA_Server *server, UA_Subscription *sub) {
 
     UA_StatusCode retval =
         UA_Server_addRepeatedCallback(server,
-                  (UA_ServerCallback)UA_Subscription_publishCallback,
+                  (UA_ServerCallback)UA_Subscription_publishTriggeredCallback,
                   sub, (UA_UInt32)sub->publishingInterval,
                   &sub->publishCallbackId);
     if(retval != UA_STATUSCODE_GOOD)

+ 4 - 1
src/server/ua_subscription.h

@@ -38,6 +38,7 @@ typedef struct UA_Event {
 typedef struct MonitoredItem_queuedValue {
     TAILQ_ENTRY(MonitoredItem_queuedValue) listEntry;
     UA_UInt32 clientHandle;
+    UA_DateTime sampledDateTime;
     union {
         UA_Event event;
         UA_DataValue value;
@@ -125,16 +126,18 @@ struct UA_Subscription {
     UA_UInt32 currentLifetimeCount;
     UA_UInt32 lastMonitoredItemId;
     UA_UInt32 numMonitoredItems;
+
     /* Publish Callback */
     UA_UInt64 publishCallbackId;
     UA_Boolean publishCallbackIsRegistered;
+    UA_DateTime lastTriggeredPublishCallback;
 
     /* MonitoredItems */
     LIST_HEAD(UA_ListOfUAMonitoredItems, UA_MonitoredItem) monitoredItems;
     /* When the last publish response could not hold all available
      * notifications, in the next iteration, start at the monitoreditem with
      * this id. If zero, start at the first monitoreditem. */
-    UA_UInt32 lastSendMonitoredItemId;
+    UA_UInt32 nextMonitoredItemIdToBrowse;
 
     /* Retransmission Queue */
     ListOfNotificationMessages retransmissionQueue;

+ 7 - 0
src/server/ua_subscription_datachange.c

@@ -84,6 +84,10 @@ MonitoredItem_ensureQueueSpace(UA_MonitoredItem *mon) {
         }
         UA_assert(queueItem);
 
+        /* Copy the sampled date time of the removed item to the last item */
+        MonitoredItem_queuedValue *lastQueueItem = TAILQ_LAST(&mon->queue, QueuedValueQueue);
+        lastQueueItem->sampledDateTime = queueItem->sampledDateTime;
+
         /* Remove the item */
         TAILQ_REMOVE(&mon->queue, queueItem, listEntry);
         if(mon->monitoredItemType == UA_MONITOREDITEMTYPE_CHANGENOTIFY) {
@@ -253,6 +257,9 @@ sampleCallbackWithValue(UA_Server *server, UA_Subscription *sub,
     TAILQ_INSERT_TAIL(&monitoredItem->queue, newQueueItem, listEntry);
     ++monitoredItem->currentQueueSize;
 
+    /* Save the sampled date time */
+    newQueueItem->sampledDateTime = UA_DateTime_nowMonotonic();
+
     /* Remove entries from the queue if required */
     MonitoredItem_ensureQueueSpace(monitoredItem);
     return true;