Bladeren bron

Subscriptions: Split notifications dequeue and delete; Reusable UA_Notification_isOverflowEvent

Julius Pfrommer 6 jaren geleden
bovenliggende
commit
b975c7a3af

+ 3 - 2
src/server/ua_services_subscription.c

@@ -2,7 +2,7 @@
  * License, v. 2.0. If a copy of the MPL was not distributed with this
  * file, You can obtain one at http://mozilla.org/MPL/2.0/. 
  *
- *    Copyright 2014-2017 (c) Fraunhofer IOSB (Author: Julius Pfrommer)
+ *    Copyright 2014-2018 (c) Fraunhofer IOSB (Author: Julius Pfrommer)
  *    Copyright 2016-2017 (c) Florian Palm
  *    Copyright 2015 (c) Chris Iatrou
  *    Copyright 2015-2016 (c) Sten Grüner
@@ -550,7 +550,8 @@ Operation_SetMonitoringMode(UA_Server *server, UA_Session *session,
         /*  Setting the mode to DISABLED or SAMPLING causes all queued Notifications to be deleted */
         UA_Notification *notification, *notification_tmp;
         TAILQ_FOREACH_SAFE(notification, &mon->queue, listEntry, notification_tmp) {
-            UA_Notification_delete(smc->sub, mon, notification);
+            UA_Notification_dequeue(server, notification);
+            UA_Notification_delete(notification);
         }
 
         /* Initialize lastSampledValue */

+ 6 - 61
src/server/ua_subscription.c

@@ -21,55 +21,6 @@
 
 #ifdef UA_ENABLE_SUBSCRIPTIONS /* conditional compilation */
 
-void
-UA_Notification_enqueue(UA_Server *server, UA_Subscription *sub,
-                        UA_MonitoredItem *mon, UA_Notification *n) {
-    /* Add to the MonitoredItem */
-    TAILQ_INSERT_TAIL(&mon->queue, n, listEntry);
-    ++mon->queueSize;
-
-    /* Add to the subscription */
-    TAILQ_INSERT_TAIL(&sub->notificationQueue, n, globalEntry);
-    ++sub->notificationQueueSize;
-    if(mon->monitoredItemType == UA_MONITOREDITEMTYPE_CHANGENOTIFY) {
-        ++sub->dataChangeNotifications;
-    } else if(mon->monitoredItemType == UA_MONITOREDITEMTYPE_EVENTNOTIFY) {
-        ++sub->eventNotifications;
-    } else if(mon->monitoredItemType == UA_MONITOREDITEMTYPE_STATUSNOTIFY) {
-        ++sub->statusChangeNotifications;
-    }
-
-    /* Ensure enough space is available in the MonitoredItem. Do this only after
-     * adding the new Notification. */
-    UA_MonitoredItem_ensureQueueSpace(server, mon);
-}
-
-void
-UA_Notification_delete(UA_Subscription *sub, UA_MonitoredItem *mon,
-                       UA_Notification *n) {
-    if(mon->monitoredItemType == UA_MONITOREDITEMTYPE_CHANGENOTIFY) {
-        UA_DataValue_deleteMembers(&n->data.value);
-        --sub->dataChangeNotifications;
-#ifdef UA_ENABLE_SUBSCRIPTIONS_EVENTS
-    } else if(mon->monitoredItemType == UA_MONITOREDITEMTYPE_EVENTNOTIFY) {
-        UA_EventFieldList_deleteMembers(&n->data.event.fields);
-        /* EventFilterResult currently isn't being used
-         * UA_EventFilterResult_delete(notification->data.event->result); */
-        --sub->eventNotifications;
-#endif
-    } else if(mon->monitoredItemType == UA_MONITOREDITEMTYPE_STATUSNOTIFY) {
-        --sub->statusChangeNotifications;
-    }
-
-    TAILQ_REMOVE(&mon->queue, n, listEntry);
-    --mon->queueSize;
-
-    TAILQ_REMOVE(&sub->notificationQueue, n, globalEntry);
-    --sub->notificationQueueSize;
-
-    UA_free(n);
-}
-
 UA_Subscription *
 UA_Subscription_new(UA_Session *session, UA_UInt32 subscriptionId) {
     /* Allocate the memory */
@@ -294,6 +245,9 @@ prepareNotificationMessage(UA_Server *server, UA_Subscription *sub,
         
         UA_MonitoredItem *mon = notification->mon;
 
+        /* Remove from the queues and decrease the counters */
+        UA_Notification_dequeue(server, notification);
+
         /* Move the content to the response */
         if(mon->monitoredItemType == UA_MONITOREDITEMTYPE_CHANGENOTIFY) {
             UA_assert(dcn != NULL); /* Have at least one change notification */
@@ -317,24 +271,15 @@ prepareNotificationMessage(UA_Server *server, UA_Subscription *sub,
             UA_EventFieldList_init(&notification->data.event.fields);
             efl->clientHandle = mon->clientHandle;
 
-            /* Check if this is an overflowEvent */
-            UA_NodeId overflowBaseId = UA_NODEID_NUMERIC(0, UA_NS0ID_EVENTQUEUEOVERFLOWEVENTTYPE);
-            if(efl->eventFieldsSize == 1 &&
-               efl->eventFields[0].type == &UA_TYPES[UA_TYPES_NODEID] &&
-               isNodeInTree(&server->config.nodestore,
-                            (UA_NodeId *)efl->eventFields[0].data,
-                            &overflowBaseId, &subtypeId, 1)) {
-                --mon->eventOverflows;
-            }
             enlPos++;
         }
 #endif
         else {
-            continue; /* Nothing to do */
+            UA_Notification_delete(notification);
+            continue; /* Unknown type. Nothing to do */
         }
 
-        /* Remove the notification from the queues */
-        UA_Notification_delete(sub, mon, notification);
+        UA_Notification_delete(notification);
         totalNotifications++;
     }
 

+ 6 - 3
src/server/ua_subscription.h

@@ -76,9 +76,12 @@ typedef struct UA_Notification {
 void UA_Notification_enqueue(UA_Server *server, UA_Subscription *sub,
                              UA_MonitoredItem *mon, UA_Notification *n);
 
-/* Delete the notification. Also removes it from the linked lists. */
-void UA_Notification_delete(UA_Subscription *sub, UA_MonitoredItem *mon,
-                            UA_Notification *n);
+/* Remove the notification from the MonitoredItem's queue and the Subscriptions
+ * global queue. Reduce the respective counters. */
+void UA_Notification_dequeue(UA_Server *server, UA_Notification *n);
+
+/* Delete the notification. Must be dequeued first. */
+void UA_Notification_delete(UA_Notification *n);
 
 typedef TAILQ_HEAD(NotificationQueue, UA_Notification) NotificationQueue;
 

+ 110 - 3
src/server/ua_subscription_monitoreditem.c

@@ -2,7 +2,7 @@
  * License, v. 2.0. If a copy of the MPL was not distributed with this
  * file, You can obtain one at http://mozilla.org/MPL/2.0/. 
  *
- *    Copyright 2017 (c) Fraunhofer IOSB (Author: Julius Pfrommer)
+ *    Copyright 2017-2018 (c) Fraunhofer IOSB (Author: Julius Pfrommer)
  *    Copyright 2017 (c) Stefan Profanter, fortiss GmbH
  *    Copyright 2018 (c) Ari Breitkreuz, fortiss GmbH
  *    Copyright 2018 (c) Thomas Stalder, Blue Time Concept SA
@@ -14,6 +14,110 @@
 
 #ifdef UA_ENABLE_SUBSCRIPTIONS /* conditional compilation */
 
+/****************/
+/* Notification */
+/****************/
+
+#ifdef UA_ENABLE_SUBSCRIPTIONS_EVENTS
+
+static const UA_NodeId overflowEventType =
+    {0, UA_NODEIDTYPE_NUMERIC, {UA_NS0ID_EVENTQUEUEOVERFLOWEVENTTYPE}};
+
+static UA_Boolean
+UA_Notification_isOverflowEvent(UA_Server *server, UA_Notification *n) {
+    UA_MonitoredItem *mon = n->mon;
+    if(mon->monitoredItemType != UA_MONITOREDITEMTYPE_EVENTNOTIFY)
+        return false;
+
+    UA_EventFieldList *efl = &n->data.event.fields;
+    if(efl->eventFieldsSize == 1 &&
+       efl->eventFields[0].type == &UA_TYPES[UA_TYPES_NODEID] &&
+       isNodeInTree(&server->config.nodestore,
+                    (const UA_NodeId *)efl->eventFields[0].data,
+                    &overflowEventType, &subtypeId, 1)) {
+        return true;
+    }
+
+    return false;
+}
+
+#endif
+
+void
+UA_Notification_enqueue(UA_Server *server, UA_Subscription *sub,
+                        UA_MonitoredItem *mon, UA_Notification *n) {
+    /* Add to the MonitoredItem */
+    TAILQ_INSERT_TAIL(&mon->queue, n, listEntry);
+    ++mon->queueSize;
+
+    /* Add to the subscription */
+    TAILQ_INSERT_TAIL(&sub->notificationQueue, n, globalEntry);
+    ++sub->notificationQueueSize;
+
+    if(mon->monitoredItemType == UA_MONITOREDITEMTYPE_CHANGENOTIFY) {
+        ++sub->dataChangeNotifications;
+#ifdef UA_ENABLE_SUBSCRIPTIONS_EVENTS
+    } else if(mon->monitoredItemType == UA_MONITOREDITEMTYPE_EVENTNOTIFY) {
+        ++sub->eventNotifications;
+        if(UA_Notification_isOverflowEvent(server, n))
+            ++mon->eventOverflows;
+#endif
+    } else if(mon->monitoredItemType == UA_MONITOREDITEMTYPE_STATUSNOTIFY) {
+        ++sub->statusChangeNotifications;
+    }
+
+    /* Ensure enough space is available in the MonitoredItem. Do this only after
+     * adding the new Notification. */
+    UA_MonitoredItem_ensureQueueSpace(server, mon);
+}
+
+void
+UA_Notification_dequeue(UA_Server *server, UA_Notification *n) {
+    UA_MonitoredItem *mon = n->mon;
+    UA_Subscription *sub = mon->subscription;
+
+    if(mon->monitoredItemType == UA_MONITOREDITEMTYPE_CHANGENOTIFY) {
+        --sub->dataChangeNotifications;
+#ifdef UA_ENABLE_SUBSCRIPTIONS_EVENTS
+    } else if(mon->monitoredItemType == UA_MONITOREDITEMTYPE_EVENTNOTIFY) {
+        --sub->eventNotifications;
+        if(UA_Notification_isOverflowEvent(server, n))
+            --mon->eventOverflows;
+#endif
+    } else if(mon->monitoredItemType == UA_MONITOREDITEMTYPE_STATUSNOTIFY) {
+        --sub->statusChangeNotifications;
+    }
+
+    TAILQ_REMOVE(&mon->queue, n, listEntry);
+    --mon->queueSize;
+
+    TAILQ_REMOVE(&sub->notificationQueue, n, globalEntry);
+    --sub->notificationQueueSize;
+}
+
+void
+UA_Notification_delete(UA_Notification *n) {
+    UA_MonitoredItem *mon = n->mon;
+
+    if(mon->monitoredItemType == UA_MONITOREDITEMTYPE_CHANGENOTIFY) {
+        UA_DataValue_deleteMembers(&n->data.value);
+#ifdef UA_ENABLE_SUBSCRIPTIONS_EVENTS
+    } else if(mon->monitoredItemType == UA_MONITOREDITEMTYPE_EVENTNOTIFY) {
+        UA_EventFieldList_deleteMembers(&n->data.event.fields);
+        /* EventFilterResult currently isn't being used
+         * UA_EventFilterResult_delete(notification->data.event->result); */
+#endif
+    } else if(mon->monitoredItemType == UA_MONITOREDITEMTYPE_STATUSNOTIFY) {
+        /* Nothing to do */
+    }
+
+    UA_free(n);
+}
+
+/*****************/
+/* MonitoredItem */
+/*****************/
+
 void
 UA_MonitoredItem_init(UA_MonitoredItem *mon, UA_Subscription *sub) {
     memset(mon, 0, sizeof(UA_MonitoredItem));
@@ -35,18 +139,21 @@ UA_MonitoredItem_delete(UA_Server *server, UA_MonitoredItem *monitoredItem) {
                      "are not supported yet");
     }
 
-    /* Remove the queued notifications if attached to a subscription */
+    /* Remove the queued notifications if attached to a subscription (not a
+     * local MonitoredItem) */
     if(monitoredItem->subscription) {
         UA_Notification *notification, *notification_tmp;
         TAILQ_FOREACH_SAFE(notification, &monitoredItem->queue,
                            listEntry, notification_tmp) {
             /* Remove the item from the queues and free the memory */
-            UA_Notification_delete(sub, monitoredItem, notification);
+            UA_Notification_dequeue(server, notification);
+            UA_Notification_delete(notification);
         }
     }
 
     /* if(monitoredItem->monitoredItemType == UA_MONITOREDITEMTYPE_CHANGENOTIFY)
      * -> UA_DataChangeFilter does not hold dynamic content we need to free */
+
 #ifdef UA_ENABLE_SUBSCRIPTIONS_EVENTS
     if(monitoredItem->monitoredItemType == UA_MONITOREDITEMTYPE_EVENTNOTIFY) {
         /* Remove the monitored item from the node queue */