浏览代码

Subscriptions: Refactor handling of overflow events

Julius Pfrommer 6 年之前
父节点
当前提交
a364554e32
共有 1 个文件被更改,包括 95 次插入103 次删除
  1. 95 103
      src/server/ua_subscription_monitoreditem.c

+ 95 - 103
src/server/ua_subscription_monitoreditem.c

@@ -22,6 +22,8 @@
 
 static const UA_NodeId overflowEventType =
     {0, UA_NODEIDTYPE_NUMERIC, {UA_NS0ID_EVENTQUEUEOVERFLOWEVENTTYPE}};
+static const UA_NodeId simpleOverflowEventType =
+    {0, UA_NODEIDTYPE_NUMERIC, {UA_NS0ID_SIMPLEOVERFLOWEVENTTYPE}};
 
 static UA_Boolean
 UA_Notification_isOverflowEvent(UA_Server *server, UA_Notification *n) {
@@ -41,6 +43,34 @@ UA_Notification_isOverflowEvent(UA_Server *server, UA_Notification *n) {
     return false;
 }
 
+static UA_Notification *
+createEventOverflowNotification(UA_MonitoredItem *mon) {
+    UA_Notification *overflowNotification = (UA_Notification *) UA_malloc(sizeof(UA_Notification));
+    if(!overflowNotification)
+        return NULL;
+
+    overflowNotification->mon = mon;
+    UA_EventFieldList_init(&overflowNotification->data.event.fields);
+    overflowNotification->data.event.fields.eventFields = UA_Variant_new();
+    if(!overflowNotification->data.event.fields.eventFields) {
+        UA_EventFieldList_deleteMembers(&overflowNotification->data.event.fields);
+        UA_free(overflowNotification);
+        return NULL;
+    }
+
+    overflowNotification->data.event.fields.eventFieldsSize = 1;
+    UA_StatusCode retval =
+        UA_Variant_setScalarCopy(overflowNotification->data.event.fields.eventFields,
+                                 &simpleOverflowEventType, &UA_TYPES[UA_TYPES_NODEID]);
+    if(retval != UA_STATUSCODE_GOOD) {
+        UA_EventFieldList_deleteMembers(&overflowNotification->data.event.fields);
+        UA_free(overflowNotification);
+        return NULL;
+    }
+
+    return overflowNotification;
+}
+
 #endif
 
 void
@@ -204,112 +234,81 @@ UA_MonitoredItem_ensureQueueSpace(UA_Server *server, UA_MonitoredItem *mon) {
     /* Remove notifications until the queue size is reached */
     UA_Subscription *sub = mon->subscription;
     while(mon->queueSize - mon->eventOverflows > mon->maxQueueSize) {
-        UA_assert(mon->queueSize >= 2); /* At least two Notifications in the queue */
-
-        /* Make sure that the MonitoredItem does not lose its place in the
-         * global queue when notifications are removed. Otherwise the
-         * MonitoredItem can "starve" itself by putting new notifications always
-         * at the end of the global queue and removing the old ones.
-         *
-         * - If the oldest notification is removed, put the second oldest
-         *   notification right behind it.
-         * - If the newest notification is removed, put the new notification
-         *   right behind it. */
-
-        UA_Notification *del; /* The notification that will be deleted */
-        UA_Notification *after_del; /* The notification to keep and move after del */
+        /* At least two notifications that are not eventOverflows in the queue */
+        UA_assert(mon->queueSize - mon->eventOverflows >= 2);
+
+        /* Select the next notification to delete. Skip over overflow events. */
+        UA_Notification *del;
         if(mon->discardOldest) {
             /* Remove the oldest */
             del = TAILQ_FIRST(&mon->queue);
-            after_del = TAILQ_NEXT(del, listEntry);
+#ifdef UA_ENABLE_SUBSCRIPTIONS_EVENTS
+            while(UA_Notification_isOverflowEvent(server, del))
+                del = TAILQ_NEXT(del, listEntry); /* skip overflow events */
+#endif
         } else {
             /* Remove the second newest (to keep the up-to-date notification) */
-            after_del = TAILQ_LAST(&mon->queue, NotificationQueue);
-            del = TAILQ_PREV(after_del, NotificationQueue, listEntry);
-        }
-
-        /* Move after_del right after del in the global queue */
-        TAILQ_REMOVE(&sub->notificationQueue, after_del, globalEntry);
-        TAILQ_INSERT_AFTER(&sub->notificationQueue, del, after_del, globalEntry);
-
+            del = TAILQ_LAST(&mon->queue, NotificationQueue);
+            del = TAILQ_PREV(del, NotificationQueue, listEntry);
 #ifdef UA_ENABLE_SUBSCRIPTIONS_EVENTS
-        /* Check if an OverflowEvent is being deleted */
-
-        /* TODO: EventOverflows should never be deleted */
-        UA_NodeId overflowBaseId = UA_NODEID_NUMERIC(0, UA_NS0ID_EVENTQUEUEOVERFLOWEVENTTYPE);
-        if(del->data.event.fields.eventFieldsSize == 1 &&
-           del->data.event.fields.eventFields[0].type == &UA_TYPES[UA_TYPES_NODEID] &&
-           isNodeInTree(&server->config.nodestore,
-                        (UA_NodeId*)del->data.event.fields.eventFields[0].data,
-                        &overflowBaseId, &subtypeId, 1)) {
-            --mon->eventOverflows;
-        }
+            while(UA_Notification_isOverflowEvent(server, del))
+                del = TAILQ_PREV(del, NotificationQueue, listEntry); /* skip overflow events */
 #endif
+        }
+        UA_assert(del);
+
+        /* Move after_del right after del in the global queue. (It is already
+         * right after del in the per-MonitoredItem queue.) This is required so
+         * we don't starve MonitoredItems with a high sampling interval by
+         * always removing their first appearance in the gloal queue for the
+         * Subscription. */
+        UA_Notification *after_del = TAILQ_NEXT(del, listEntry);
+        if(after_del) {
+            TAILQ_REMOVE(&sub->notificationQueue, after_del, globalEntry);
+            TAILQ_INSERT_AFTER(&sub->notificationQueue, del, after_del, globalEntry);
+        }
 
-        /* Delete the notification. This also removes the notification from the
-         * linked lists. */
-        UA_Notification_delete(sub, mon, del);
+        /* Delete the notification */
+        UA_Notification_dequeue(server, del);
+        UA_Notification_delete(del);
     }
 
-#ifdef UA_ENABLE_SUBSCRIPTIONS_EVENTS
+    /* Get the element where the overflow shall be announced (infobits or
+     * overflowevent) */
+    UA_Notification *indicator;
+    if(mon->discardOldest)
+        indicator = TAILQ_FIRST(&mon->queue);
+    else
+        indicator = TAILQ_LAST(&mon->queue, NotificationQueue);
+    UA_assert(indicator);
+
     /* Create an overflow notification */
+#ifdef UA_ENABLE_SUBSCRIPTIONS_EVENTS
     /* The specification states in Part 4 5.12.1.5 that an EventQueueOverflowEvent
      * "is generated when the first Event has to be discarded [...] without discarding
      * any other event". So only generate one for all deleted events. */
-    if (mon->monitoredItemType == UA_MONITOREDITEMTYPE_EVENTNOTIFY) {
-        /* check if an overflowEvent is being deleted
-         * TODO: make sure overflowEvents are never deleted */
-        UA_NodeId overflowId = UA_NODEID_NUMERIC(0, UA_NS0ID_SIMPLEOVERFLOWEVENTTYPE);
-
-        /* an overflowEvent does not care about event filters and as such
-         * will not be "triggered" correctly. Instead, a notification will
-         * be inserted into the queue which includes only the nodeId of the
-         * overflowEventType. It is up to the client to check for possible
-         * overflows. */
-        UA_Notification *overflowNotification = (UA_Notification *) UA_malloc(sizeof(UA_Notification));
-        if (!overflowNotification)
-            return UA_STATUSCODE_BADOUTOFMEMORY;
-
-        UA_EventFieldList_init(&overflowNotification->data.event.fields);
-        overflowNotification->data.event.fields.eventFields = UA_Variant_new();
-        if (!overflowNotification->data.event.fields.eventFields) {
-            UA_EventFieldList_deleteMembers(&overflowNotification->data.event.fields);
-            UA_free(overflowNotification);
-            return UA_STATUSCODE_BADOUTOFMEMORY;
+    if(mon->monitoredItemType == UA_MONITOREDITEMTYPE_EVENTNOTIFY) {
+        /* Avoid two redundant overflow events in a row */
+        if(UA_Notification_isOverflowEvent(server, indicator)) {
+            if(mon->discardOldest)
+                return UA_STATUSCODE_GOOD;
+            UA_Notification *prev = TAILQ_PREV(indicator, NotificationQueue, listEntry);
+            if(prev && UA_Notification_isOverflowEvent(server, prev))
+                return UA_STATUSCODE_GOOD;
         }
 
-        overflowNotification->data.event.fields.eventFieldsSize = 1;
-        UA_StatusCode retval =
-                UA_Variant_setScalarCopy(overflowNotification->data.event.fields.eventFields,
-                                         &overflowId, &UA_TYPES[UA_TYPES_NODEID]);
-        if (retval != UA_STATUSCODE_GOOD) {
-            UA_EventFieldList_deleteMembers(&overflowNotification->data.event.fields);
-            UA_free(overflowNotification);
-            return retval;
-        }
+        /* A notification is inserted into the queue which includes only the
+         * NodeId of the overflowEventType. It is up to the client to check for
+         * possible overflows. */
+        UA_Notification *overflowNotification = createEventOverflowNotification(mon);
+        if(!overflowNotification)
+            return UA_STATUSCODE_BADOUTOFMEMORY;
 
-        overflowNotification->mon = mon;
-
-        /* The amount of notifications in the subscription don't change. The specification
-         * only states that the queue size in each MonitoredItem isn't affected by OverflowEvents.
-         * (In this case the queue in the MonitoredItemQueue IS affected internally because externally
-         * the queueSize will always appear with eventOverflows subtracted from it)
-         *
-         * Since they are reduced in Notification_delete the queues are increased here, so they
-         * will remain the same in the end.
-         *
-         * Do not use Notification_enqueue to insert the notification into the queues, since this would
-         * cause a bad recursive call of this function.
-         */
-        if (mon->discardOldest) {
-            TAILQ_INSERT_HEAD(&mon->queue, overflowNotification, listEntry);
-            TAILQ_INSERT_HEAD(&mon->subscription->notificationQueue,
-                              overflowNotification, globalEntry);
-        } else {
-            TAILQ_INSERT_TAIL(&mon->queue, overflowNotification, listEntry);
-            TAILQ_INSERT_TAIL(&mon->subscription->notificationQueue,
-                              overflowNotification, globalEntry);
-        }
+        /* Insert before the "indicator notification". This is either first in
+         * the queue (if the oldest notification was removed) or before the new
+         * event that remains the last element of the queue. */
+        TAILQ_INSERT_BEFORE(indicator, overflowNotification, listEntry);
+        TAILQ_INSERT_BEFORE(indicator, overflowNotification, globalEntry);
         ++mon->eventOverflows;
         ++mon->queueSize;
         ++sub->notificationQueueSize;
@@ -317,28 +316,21 @@ UA_MonitoredItem_ensureQueueSpace(UA_Server *server, UA_MonitoredItem *mon) {
     }
 #endif /* UA_ENABLE_SUBSCRIPTIONS_EVENTS */
 
+    /* Set the infobits of a datachange notification */
     if(mon->monitoredItemType == UA_MONITOREDITEMTYPE_CHANGENOTIFY) {
-        /* Get the element that carries the infobits */
-        UA_Notification *notification = NULL;
-        if(mon->discardOldest)
-            notification = TAILQ_FIRST(&mon->queue);
-        else
-            notification = TAILQ_LAST(&mon->queue, NotificationQueue);
-        UA_assert(notification);
-
+        /* Set the infobits */
         if(mon->maxQueueSize > 1) {
             /* Add the infobits either to the newest or the new last entry */
-            notification->data.value.hasStatus = true;
-            notification->data.value.status |= (UA_STATUSCODE_INFOTYPE_DATAVALUE |
-                                                UA_STATUSCODE_INFOBITS_OVERFLOW);
+            indicator->data.value.hasStatus = true;
+            indicator->data.value.status |= (UA_STATUSCODE_INFOTYPE_DATAVALUE |
+                                             UA_STATUSCODE_INFOBITS_OVERFLOW);
         } else {
             /* If the queue size is reduced to one, remove the infobits */
-            notification->data.value.status &= ~(UA_StatusCode)(UA_STATUSCODE_INFOTYPE_DATAVALUE |
-                                                                UA_STATUSCODE_INFOBITS_OVERFLOW);
+            indicator->data.value.status &= ~(UA_StatusCode)(UA_STATUSCODE_INFOTYPE_DATAVALUE |
+                                                             UA_STATUSCODE_INFOBITS_OVERFLOW);
         }
     }
 
-    /* TODO: Infobits for Events? */
     return UA_STATUSCODE_GOOD;
 }