Переглянути джерело

Subscriptions: Add sampling mode for MonitoredItems

Julius Pfrommer 4 роки тому
батько
коміт
383b23f639

+ 37 - 1
src/server/ua_services_monitoreditem.c

@@ -407,6 +407,7 @@ Operation_SetMonitoringMode(UA_Server *server, UA_Session *session,
         *result = UA_STATUSCODE_BADMONITOREDITEMIDINVALID;
         return;
     }
+    UA_Subscription *sub = mon->subscription;
 
     /* Check if the MonitoringMode is valid or not */
     if(smc->monitoringMode > UA_MONITORINGMODE_REPORTING) {
@@ -420,12 +421,47 @@ Operation_SetMonitoringMode(UA_Server *server, UA_Session *session,
 
     mon->monitoringMode = smc->monitoringMode;
 
+    /* When reporting is enabled, put all notifications that were already
+     * sampled into the global queue of the subscription. When sampling is
+     * enabled, remove all notifications from the global queue. !!! This needs
+     * to be the same operation as in UA_Notification_enqueue !!! */
     if(mon->monitoringMode == UA_MONITORINGMODE_REPORTING) {
+        UA_Notification *notification;
+        TAILQ_FOREACH(notification, &mon->queue, listEntry) {
+            TAILQ_INSERT_TAIL(&sub->notificationQueue, notification, globalEntry);
+            ++sub->notificationQueueSize;
+#ifdef UA_ENABLE_SUBSCRIPTIONS_EVENTS
+            if(mon->attributeId == UA_ATTRIBUTEID_EVENTNOTIFIER) {
+                ++sub->eventNotifications;
+            } else
+#endif
+            {
+                ++sub->dataChangeNotifications;
+            }
+        }
+        /* Register the sampling callback with an interval */
+        *result = UA_MonitoredItem_registerSampleCallback(server, mon);
+    } else if(mon->monitoringMode == UA_MONITORINGMODE_SAMPLING) {
+        UA_Notification *notification;
+        TAILQ_FOREACH(notification, &mon->queue, listEntry) {
+            TAILQ_REMOVE(&sub->notificationQueue, notification, globalEntry);
+            TAILQ_NEXT(notification, globalEntry) = UA_SUBSCRIPTION_QUEUE_SENTINEL;
+            --sub->notificationQueueSize;
+#ifdef UA_ENABLE_SUBSCRIPTIONS_EVENTS
+            if(mon->attributeId == UA_ATTRIBUTEID_EVENTNOTIFIER) {
+                --sub->eventNotifications;
+            } else
+#endif
+            {
+                --sub->dataChangeNotifications;
+            }
+        }
+        /* Register the sampling callback with an interval */
         *result = UA_MonitoredItem_registerSampleCallback(server, mon);
     } else {
+        /* UA_MONITORINGMODE_DISABLED */
         UA_MonitoredItem_unregisterSampleCallback(server, mon);
 
-        // TODO correctly implement SAMPLING
         /* Setting the mode to DISABLED or SAMPLING causes all queued Notifications to be deleted */
         UA_Notification *notification, *notification_tmp;
         TAILQ_FOREACH_SAFE(notification, &mon->queue, listEntry, notification_tmp) {

+ 4 - 0
src/server/ua_subscription.h

@@ -32,6 +32,10 @@ _UA_BEGIN_DECLS
         else DST = SRC;                                \
     }
 
+/* Set to the TAILQ_NEXT pointer of a notification, the sentinel that the
+ * notification was not added to the global queue */
+#define UA_SUBSCRIPTION_QUEUE_SENTINEL ((UA_Notification*)0x01)
+
 /**
  * MonitoredItems create Notifications. Subscriptions collect Notifications from
  * (several) MonitoredItems and publish them to the client.

+ 58 - 35
src/server/ua_subscription_monitoreditem.c

@@ -80,20 +80,27 @@ createEventOverflowNotification(UA_Server *server, UA_Subscription *sub,
         return retval;
     }
 
-    /* Insert before the "indicator notification". This is either first in
-     * the queue (if the oldest notification was removed) or before the new
-     * event that remains the last element of the queue. */
+    /* Insert before the "indicator notification". This is either first in the
+     * queue (if the oldest notification was removed) or before the new event
+     * that remains the last element of the queue. */
     TAILQ_INSERT_BEFORE(indicator, overflowNotification, listEntry);
-    TAILQ_INSERT_BEFORE(indicator, overflowNotification, globalEntry);
     ++mon->eventOverflows;
     ++mon->queueSize;
-    ++sub->notificationQueueSize;
-    ++sub->eventNotifications;
+
+    TAILQ_NEXT(overflowNotification, globalEntry) = UA_SUBSCRIPTION_QUEUE_SENTINEL;
+    if(mon->monitoringMode == UA_MONITORINGMODE_REPORTING) {
+        TAILQ_INSERT_BEFORE(indicator, overflowNotification, globalEntry);
+        ++sub->notificationQueueSize;
+        ++sub->eventNotifications;
+    }
     return UA_STATUSCODE_GOOD;
 }
 
 #endif
 
+/* !!! The enqueue and dequeue operations need to match the reporting
+ * disable/enable logic in Operation_SetMonitoringMode !!! */
+
 void
 UA_Notification_enqueue(UA_Server *server, UA_Subscription *sub,
                         UA_MonitoredItem *mon, UA_Notification *n) {
@@ -101,19 +108,25 @@ UA_Notification_enqueue(UA_Server *server, UA_Subscription *sub,
     TAILQ_INSERT_TAIL(&mon->queue, n, listEntry);
     ++mon->queueSize;
 
-    /* Add to the subscription */
-    TAILQ_INSERT_TAIL(&sub->notificationQueue, n, globalEntry);
-    ++sub->notificationQueueSize;
+#ifdef UA_ENABLE_SUBSCRIPTIONS_EVENTS
+    if(mon->attributeId == UA_ATTRIBUTEID_EVENTNOTIFIER &&
+       UA_Notification_isOverflowEvent(server, n))
+        ++mon->eventOverflows;
+#endif
 
+    /* Add to the subscription if reporting is enabled */
+    TAILQ_NEXT(n, globalEntry) = UA_SUBSCRIPTION_QUEUE_SENTINEL;
+    if(mon->monitoringMode == UA_MONITORINGMODE_REPORTING) {
+        TAILQ_INSERT_TAIL(&sub->notificationQueue, n, globalEntry);
+        ++sub->notificationQueueSize;
 #ifdef UA_ENABLE_SUBSCRIPTIONS_EVENTS
-    if(mon->attributeId == UA_ATTRIBUTEID_EVENTNOTIFIER) {
-        ++sub->eventNotifications;
-        if(UA_Notification_isOverflowEvent(server, n))
-            ++mon->eventOverflows;
-    } else
+        if(mon->attributeId == UA_ATTRIBUTEID_EVENTNOTIFIER) {
+            ++sub->eventNotifications;
+        } else
 #endif
-    {
-        ++sub->dataChangeNotifications;
+        {
+            ++sub->dataChangeNotifications;
+        }
     }
 
     /* Ensure enough space is available in the MonitoredItem. Do this only after
@@ -126,22 +139,28 @@ UA_Notification_dequeue(UA_Server *server, UA_Notification *n) {
     UA_MonitoredItem *mon = n->mon;
     UA_Subscription *sub = mon->subscription;
 
+    /* Remove from the MonitoredItem queue */
 #ifdef UA_ENABLE_SUBSCRIPTIONS_EVENTS
-    if(mon->attributeId == UA_ATTRIBUTEID_EVENTNOTIFIER) {
-        --sub->eventNotifications;
-        if(UA_Notification_isOverflowEvent(server, n))
+    if(mon->attributeId == UA_ATTRIBUTEID_EVENTNOTIFIER &&
+       UA_Notification_isOverflowEvent(server, n))
             --mon->eventOverflows;
-    } else
 #endif
-    {
-        --sub->dataChangeNotifications;
-    }
-
     TAILQ_REMOVE(&mon->queue, n, listEntry);
     --mon->queueSize;
 
-    TAILQ_REMOVE(&sub->notificationQueue, n, globalEntry);
-    --sub->notificationQueueSize;
+    /* Remove from the subscription's queue */
+    if(TAILQ_NEXT(n, globalEntry) != UA_SUBSCRIPTION_QUEUE_SENTINEL) {
+#ifdef UA_ENABLE_SUBSCRIPTIONS_EVENTS
+        if(mon->attributeId == UA_ATTRIBUTEID_EVENTNOTIFIER) {
+            --sub->eventNotifications;
+        } else
+#endif
+        {
+            --sub->dataChangeNotifications;
+        }
+        TAILQ_REMOVE(&sub->notificationQueue, n, globalEntry);
+        --sub->notificationQueueSize;
+    }
 }
 
 void
@@ -277,15 +296,19 @@ UA_MonitoredItem_ensureQueueSpace(UA_Server *server, UA_MonitoredItem *mon) {
 
         UA_assert(del); /* There must have been one entry that can be deleted */
 
-        /* Move the entry after del in the per-MonitoredItem queue right after
-         * del in the global queue. (It is already right after del in the
-         * per-MonitoredItem queue.) This is required so we don't starve
-         * MonitoredItems with a high sampling interval by always removing their
-         * first appearance in the gloal queue for the Subscription. */
-        UA_Notification *after_del = TAILQ_NEXT(del, listEntry);
-        UA_assert(after_del); /* There must be one remaining element after del */
-        TAILQ_REMOVE(&sub->notificationQueue, after_del, globalEntry);
-        TAILQ_INSERT_AFTER(&sub->notificationQueue, del, after_del, globalEntry);
+        /* If reporting is activated (entries are also in the subscriptions
+         * global queue): Move the entry after del in the per-MonitoredItem
+         * queue right after del in the global queue. (It is already right after
+         * del in the per-MonitoredItem queue.) This is required so we don't
+         * starve MonitoredItems with a high sampling interval by always
+         * removing their first appearance in the gloal queue for the
+         * Subscription. */
+        if(TAILQ_NEXT(del, globalEntry) != UA_SUBSCRIPTION_QUEUE_SENTINEL) {
+            UA_Notification *after_del = TAILQ_NEXT(del, listEntry);
+            UA_assert(after_del); /* There must be one remaining element after del */
+            TAILQ_REMOVE(&sub->notificationQueue, after_del, globalEntry);
+            TAILQ_INSERT_AFTER(&sub->notificationQueue, del, after_del, globalEntry);
+        }
 
         /* Delete the notification */
         UA_Notification_dequeue(server, del);