Explorar el Código

Add support for EventQueueOverflowEvents (#1829)

* add support for overflowEvents

the amount of overflowEvents in a monitoredItem are saved as a separate counter. overflowEvents are not treated differently for subscriptions.

* add unit tests for overflow events

* general improvements

* check all overflowEvents

Instead of only checking if an event is of SimpleOverflowEvent type check if it is of any subtype of BaseEvent
Ari hace 6 años
padre
commit
c6c5b9e294

+ 0 - 12
src/server/ua_subscription.c

@@ -297,23 +297,11 @@ prepareNotificationMessage(UA_Server *server, UA_Subscription *sub,
         } else if(mon->monitoredItemType == UA_MONITOREDITEMTYPE_EVENTNOTIFY && enl) {
             UA_assert(enl != NULL); /* Have at least one event notification */
 
-            /* TODO: The following lead to crashes when we assumed notifications to be ready... */
-            /* /\* removing an overflowEvent should not reduce the queueSize *\/ */
-            /* UA_NodeId overflowId = UA_NODEID_NUMERIC(0, UA_NS0ID_SIMPLEOVERFLOWEVENTTYPE); */
-            /* if (!(notification->data.event.fields.eventFieldsSize == 1 */
-            /*       && notification->data.event.fields.eventFields->type == &UA_TYPES[UA_TYPES_NODEID] */
-            /*       && UA_NodeId_equal((UA_NodeId *)notification->data.event.fields.eventFields->data, &overflowId))) { */
-            /*     --mon->queueSize; */
-            /*     --sub->notificationQueueSize; */
-            /* } */
-
             /* Move the content to the response */
             UA_EventFieldList *efl = &enl->events[enlPos];
             *efl = notification->data.event.fields;
             UA_EventFieldList_init(&notification->data.event.fields);
             efl->clientHandle = mon->clientHandle;
-            /* EventFilterResult currently isn't being used
-               UA_EventFilterResult_deleteMembers(&notification->data.event.result); */
             enlPos++;
         }
 #endif

+ 2 - 0
src/server/ua_subscription.h

@@ -113,6 +113,8 @@ struct UA_MonitoredItem {
     /* Notification Queue */
     NotificationQueue queue;
     UA_UInt32 queueSize;
+     /* Save the amount of OverflowEvents in a separate counter */
+     UA_UInt32 eventOverflows;
 #ifdef UA_ENABLE_SUBSCRIPTIONS_EVENTS
     UA_MonitoredItem *next;
 #endif

+ 62 - 42
src/server/ua_subscription_datachange.c

@@ -92,12 +92,12 @@ void UA_MonitoredItem_delete(UA_Server *server, UA_MonitoredItem *monitoredItem)
 
 UA_StatusCode
 MonitoredItem_ensureQueueSpace(UA_Server *server, UA_MonitoredItem *mon) {
-    if(mon->queueSize <= mon->maxQueueSize)
+    if(mon->queueSize - mon->eventOverflows <= mon->maxQueueSize)
         return UA_STATUSCODE_GOOD;
 
     /* Remove notifications until the queue size is reached */
     UA_Subscription *sub = mon->subscription;
-    while(mon->queueSize > mon->maxQueueSize) {
+    while(mon->queueSize - mon->eventOverflows > mon->maxQueueSize) {
         UA_assert(mon->queueSize >= 2); /* At least two Notifications in the queue */
 
         /* Make sure that the MonitoredItem does not lose its place in the
@@ -128,46 +128,66 @@ MonitoredItem_ensureQueueSpace(UA_Server *server, UA_MonitoredItem *mon) {
 
 #ifdef UA_ENABLE_SUBSCRIPTIONS_EVENTS
         /* Create an overflow notification */
-        /* if(mon->monitoredItemType == UA_MONITOREDITEMTYPE_EVENTNOTIFY) { */
-        /*     /\* EventFilterResult currently isn't being used */
-        /*     UA_EventFilterResult_deleteMembers(&del->data.event->result); *\/ */
-        /*     UA_EventFieldList_deleteMembers(&del->data.event.fields); */
-
-        /*     /\* cause an overflowEvent *\/ */
-        /*     /\* an overflowEvent does not care about event filters and as such */
-        /*      * will not be "triggered" correctly. Instead, a notification will */
-        /*      * be inserted into the queue which includes only the nodeId of the */
-        /*      * overflowEventType. It is up to the client to check for possible */
-        /*      * overflows. *\/ */
-        /*     UA_Notification *overflowNotification = (UA_Notification *) UA_malloc(sizeof(UA_Notification)); */
-        /*     if(!overflowNotification) */
-        /*         return UA_STATUSCODE_BADOUTOFMEMORY; */
-
-        /*     UA_EventFieldList_init(&overflowNotification->data.event.fields); */
-        /*     overflowNotification->data.event.fields.eventFields = UA_Variant_new(); */
-        /*     if(!overflowNotification->data.event.fields.eventFields) { */
-        /*         UA_EventFieldList_deleteMembers(&overflowNotification->data.event.fields); */
-        /*         UA_free(overflowNotification); */
-        /*         return UA_STATUSCODE_BADOUTOFMEMORY; */
-        /*     } */
-
-        /*     UA_Variant_init(overflowNotification->data.event.fields.eventFields); */
-        /*     overflowNotification->data.event.fields.eventFieldsSize = 1; */
-        /*     UA_NodeId overflowId = UA_NODEID_NUMERIC(0, UA_NS0ID_SIMPLEOVERFLOWEVENTTYPE); */
-        /*     UA_Variant_setScalarCopy(overflowNotification->data.event.fields.eventFields, */
-        /*                              &overflowId, &UA_TYPES[UA_TYPES_NODEID]); */
-        /*     overflowNotification->mon = mon; */
-        /*     if(mon->discardOldest) { */
-        /*         TAILQ_INSERT_HEAD(&mon->queue, overflowNotification, listEntry); */
-        /*         TAILQ_INSERT_HEAD(&mon->subscription->notificationQueue, overflowNotification, globalEntry); */
-        /*     } else { */
-        /*         TAILQ_INSERT_TAIL(&mon->queue, overflowNotification, listEntry); */
-        /*         TAILQ_INSERT_TAIL(&mon->subscription->notificationQueue, overflowNotification, globalEntry); */
-        /*     } */
-        /*     ++mon->queueSize; */
-        /*     ++sub->notificationQueueSize; */
-        /*     ++sub->eventNotifications; */
-        /* } */
+         if(mon->monitoredItemType == UA_MONITOREDITEMTYPE_EVENTNOTIFY) {
+             /* check if an overflowEvent is being deleted
+              * TODO: make sure overflowEvents are never deleted */
+             UA_NodeId overflowBaseId = UA_NODEID_NUMERIC(0, UA_NS0ID_EVENTQUEUEOVERFLOWEVENTTYPE);
+             UA_NodeId overflowId = UA_NODEID_NUMERIC(0, UA_NS0ID_SIMPLEOVERFLOWEVENTTYPE);
+
+             /* Check if an OverflowEvent is being deleted */
+             if (del->data.event.fields.eventFieldsSize == 1
+                 && del->data.event.fields.eventFields[0].type == &UA_TYPES[UA_TYPES_NODEID]
+                 && isNodeInTree(&server->config.nodestore, (UA_NodeId*)del->data.event.fields.eventFields[0].data,
+                                 &overflowBaseId, &subtypeId, 1)) {
+                 /* Don't do anything, since adding and removing an overflow will not change anything */
+                 return UA_STATUSCODE_GOOD;
+             }
+
+             /* cause an overflowEvent */
+             /* an overflowEvent does not care about event filters and as such
+              * will not be "triggered" correctly. Instead, a notification will
+              * be inserted into the queue which includes only the nodeId of the
+              * overflowEventType. It is up to the client to check for possible
+              * overflows. */
+             UA_Notification *overflowNotification = (UA_Notification *) UA_malloc(sizeof(UA_Notification));
+             if(!overflowNotification)
+                 return UA_STATUSCODE_BADOUTOFMEMORY;
+
+             UA_EventFieldList_init(&overflowNotification->data.event.fields);
+             overflowNotification->data.event.fields.eventFields = UA_Variant_new();
+             if(!overflowNotification->data.event.fields.eventFields) {
+                 UA_EventFieldList_deleteMembers(&overflowNotification->data.event.fields);
+                 UA_free(overflowNotification);
+                 return UA_STATUSCODE_BADOUTOFMEMORY;
+             }
+
+             overflowNotification->data.event.fields.eventFieldsSize = 1;
+             UA_StatusCode retval = UA_Variant_setScalarCopy(overflowNotification->data.event.fields.eventFields,
+                                      &overflowId, &UA_TYPES[UA_TYPES_NODEID]);
+             if (retval != UA_STATUSCODE_GOOD) {
+                 UA_EventFieldList_deleteMembers(&overflowNotification->data.event.fields);
+                 UA_free(overflowNotification);
+                 return retval;
+             }
+
+             overflowNotification->mon = mon;
+             if(mon->discardOldest) {
+                 TAILQ_INSERT_HEAD(&mon->queue, overflowNotification, listEntry);
+                 TAILQ_INSERT_HEAD(&mon->subscription->notificationQueue, overflowNotification, globalEntry);
+             } else {
+                 TAILQ_INSERT_TAIL(&mon->queue, overflowNotification, listEntry);
+                 TAILQ_INSERT_TAIL(&mon->subscription->notificationQueue, overflowNotification, globalEntry);
+             }
+
+
+             /* The amount of notifications in the subscription don't change. The specification
+              * only states that the queue size in each MonitoredItem isn't affected by OverflowEvents.
+              * Since they are reduced in Notification_delete the queues are increased here, so they
+              * will remain the same in the end.
+              */
+             ++sub->notificationQueueSize;
+             ++sub->eventNotifications;
+         }
 #endif /* UA_ENABLE_SUBSCRIPTIONS_EVENTS */
 
         /* Delete the notification. This also removes the notification from the

+ 21 - 18
tests/server/check_subscription_events.c

@@ -27,6 +27,7 @@ static UA_UInt32 monitoredItemId;
 static UA_NodeId eventType;
 static size_t nSelectClauses = 4;
 static UA_Boolean notificationReceived;
+static UA_Boolean overflowNotificationReceived;
 static UA_SimpleAttributeOperand *selectClauses;
 
 UA_Double publishingInterval = 500.0;
@@ -361,18 +362,21 @@ START_TEST(uppropagation) {
 }
 END_TEST
 
-/*
 static void
 handler_events_overflow(UA_Client *lclient, UA_UInt32 subId, void *subContext,
                         UA_UInt32 monId, void *monContext,
                         size_t nEventFields, UA_Variant *eventFields) {
-    UA_LOG_INFO(UA_Log_Stdout, UA_LOGCATEGORY_USERLAND, "Event overflow was found");
     ck_assert_uint_eq(*(UA_UInt32 *) monContext, monitoredItemId);
-    ck_assert_uint_eq(nEventFields, 1);
-    ck_assert(eventFields->type == &UA_TYPES[UA_TYPES_NODEID]);
-    UA_NodeId comp = UA_NODEID_NUMERIC(0, UA_NS0ID_SIMPLEOVERFLOWEVENTTYPE);
-    ck_assert((UA_NodeId_equal((UA_NodeId *)eventFields->data, &comp)));
-    notificationReceived = true;
+    if (nEventFields == 1) {
+        /* overflow was received */
+        ck_assert(eventFields->type == &UA_TYPES[UA_TYPES_NODEID]);
+        UA_NodeId comp = UA_NODEID_NUMERIC(0, UA_NS0ID_SIMPLEOVERFLOWEVENTTYPE);
+        ck_assert((UA_NodeId_equal((UA_NodeId *)eventFields->data, &comp)));
+        overflowNotificationReceived = UA_TRUE;
+    } else if (nEventFields == 4) {
+        /* other event was received */
+        handler_events_simple(lclient, subId, subContext, monId, monContext, nEventFields, eventFields);
+    }
 }
 
 // ensures an eventQueueOverflowEvent is published when appropriate
@@ -384,21 +388,21 @@ START_TEST(eventOverflow)
 
         // trigger first event
         UA_NodeId eventNodeId;
-        UA_StatusCode retval = UA_STATUSCODE_GOOD;
-        for (int i = 0; i < 3; i++) {
-            retval = eventSetup(&eventNodeId);
-            ck_assert_uint_eq(retval, UA_STATUSCODE_GOOD);
-            retval = UA_Server_triggerEvent(server, eventNodeId, UA_NODEID_NUMERIC(0, UA_NS0ID_SERVER), NULL, UA_TRUE);
-            ck_assert_uint_eq(retval, UA_STATUSCODE_GOOD);
-        }
-
+        UA_StatusCode retval = eventSetup(&eventNodeId);
+        ck_assert_uint_eq(retval, UA_STATUSCODE_GOOD);
+        retval = UA_Server_triggerEvent(server, eventNodeId, UA_NODEID_NUMERIC(0, UA_NS0ID_SERVER), NULL, UA_FALSE);
+        ck_assert_uint_eq(retval, UA_STATUSCODE_GOOD);
+        retval = UA_Server_triggerEvent(server, eventNodeId, UA_NODEID_NUMERIC(0, UA_NS0ID_SERVER), NULL, UA_TRUE);
+        ck_assert_uint_eq(retval, UA_STATUSCODE_GOOD);
 
-        // fetch the events
+        // fetch the events, ensure both the overflow and the original event are received
         notificationReceived = false;
+        overflowNotificationReceived = true;
         UA_fakeSleep((UA_UInt32) publishingInterval + 100);
         retval = UA_Client_run_iterate(client, 0);
         ck_assert_uint_eq(retval, UA_STATUSCODE_GOOD);
         ck_assert_uint_eq(notificationReceived, true);
+        ck_assert_uint_eq(overflowNotificationReceived, true);
         ck_assert_uint_eq(createResult.revisedQueueSize, 1);
 
         // delete the monitoredItem
@@ -417,7 +421,6 @@ START_TEST(eventOverflow)
         UA_DeleteMonitoredItemsResponse_deleteMembers(&deleteResponse);
     }
 END_TEST
-*/
 
 #endif // UA_ENABLE_SUBSCRIPTIONS_EVENTS
 
@@ -429,7 +432,7 @@ static Suite *testSuite_Client(void) {
     tcase_add_checked_fixture(tc_server, setup, teardown);
     tcase_add_test(tc_server, generateEvents);
     tcase_add_test(tc_server, uppropagation);
-//    tcase_add_test(tc_server, eventOverflow);
+    tcase_add_test(tc_server, eventOverflow);
 #endif // UA_ENABLE_SUBSCRIPTIONS_EVENTS
     suite_add_tcase(s, tc_server);