Sfoglia il codice sorgente

Subscriptions: Track number of different Notification types via UA_Notification_enqueue

Julius Pfrommer 6 anni fa
parent
commit
682b61a9f8

+ 1 - 5
src/server/ua_services_subscription.c

@@ -510,12 +510,8 @@ Operation_SetMonitoringMode(UA_Server *server, UA_Session *session,
         /*  Setting the mode to DISABLED or SAMPLING causes all queued Notifications to be deleted */
         UA_Notification *notification, *notification_tmp;
         TAILQ_FOREACH_SAFE(notification, &mon->queue, listEntry, notification_tmp) {
-            TAILQ_REMOVE(&mon->queue, notification, listEntry);
-            TAILQ_REMOVE(&smc->sub->notificationQueue, notification, globalEntry);
-            --smc->sub->notificationQueueSize;
-            UA_Notification_delete(notification);
+            UA_Notification_delete(smc->sub, mon, notification);
         }
-        mon->queueSize = 0;
 
         /* Initialize lastSampledValue */
         UA_ByteString_deleteMembers(&mon->lastSampledValue);

+ 54 - 23
src/server/ua_subscription.c

@@ -20,16 +20,50 @@
 
 #ifdef UA_ENABLE_SUBSCRIPTIONS /* conditional compilation */
 
-void UA_Notification_delete(UA_Notification *n) {
-    if(n->mon->monitoredItemType == UA_MONITOREDITEMTYPE_CHANGENOTIFY) {
+void
+UA_Notification_enqueue(UA_Server *server, UA_Subscription *sub,
+                        UA_MonitoredItem *mon, UA_Notification *n) {
+    /* Add to the MonitoredItem */
+    TAILQ_INSERT_TAIL(&mon->queue, n, listEntry);
+    ++mon->queueSize;
+
+    /* Add to the subscription */
+    TAILQ_INSERT_TAIL(&sub->notificationQueue, n, globalEntry);
+    ++mon->subscription->notificationQueueSize;
+    if(mon->monitoredItemType == UA_MONITOREDITEMTYPE_CHANGENOTIFY) {
+        ++sub->dataChangeNotifications;
+    } else if(mon->monitoredItemType == UA_MONITOREDITEMTYPE_EVENTNOTIFY) {
+        ++sub->eventNotifications;
+    } else if(mon->monitoredItemType == UA_MONITOREDITEMTYPE_STATUSNOTIFY) {
+        ++sub->statusChangeNotifications;
+    }
+
+    /* Ensure enough space is available in the MonitoredItem. Do this only after
+     * adding the new Notification. */
+    MonitoredItem_ensureQueueSpace(server, mon);
+}
+
+void
+UA_Notification_delete(UA_Subscription *sub, UA_MonitoredItem *mon,
+                       UA_Notification *n) {
+    if(mon->monitoredItemType == UA_MONITOREDITEMTYPE_CHANGENOTIFY) {
         UA_DataValue_deleteMembers(&n->data.value);
-    } else if (n->mon->monitoredItemType == UA_MONITOREDITEMTYPE_EVENTNOTIFY) {
-        UA_Array_delete(n->data.event.fields.eventFields, n->data.event.fields.eventFieldsSize,
-                        &UA_TYPES[UA_TYPES_VARIANT]);
+        --sub->dataChangeNotifications;
+    } else if(mon->monitoredItemType == UA_MONITOREDITEMTYPE_EVENTNOTIFY) {
+        UA_EventFieldList_deleteMembers(&n->data.event.fields);
         /* EventFilterResult currently isn't being used
-         * UA_EventFilterResult_delete(notification->data.event->result);
-         */
+         * UA_EventFilterResult_delete(notification->data.event->result); */
+        --sub->eventNotifications;
+    } else if(mon->monitoredItemType == UA_MONITOREDITEMTYPE_STATUSNOTIFY) {
+        --sub->statusChangeNotifications;
     }
+
+    TAILQ_REMOVE(&mon->queue, n, listEntry);
+    --mon->queueSize;
+
+    TAILQ_REMOVE(&sub->notificationQueue, n, globalEntry);
+    --sub->notificationQueueSize;
+
     UA_free(n);
 }
 
@@ -155,7 +189,7 @@ UA_Subscription_removeRetransmissionMessage(UA_Subscription *sub, UA_UInt32 sequ
 /* EventChange: Iterate over the monitoredItems of the subscription, starting at mon, and
  *              move notifications into the response. */
 static void
-Events_moveNotificationsFromMonitoredItems(UA_Server *server, UA_Subscription *sub, UA_EventFieldList *efls,
+Events_moveNotificationsFromMonitoredItems(UA_Subscription *sub, UA_EventFieldList *efls,
                                            size_t eflsSize) {
     size_t pos = 0;
     UA_Notification *notification, *notification_tmp;
@@ -165,10 +199,6 @@ Events_moveNotificationsFromMonitoredItems(UA_Server *server, UA_Subscription *s
 
         UA_MonitoredItem *mon = notification->mon;
 
-        /* Remove the notification from the queues */
-        TAILQ_REMOVE(&sub->notificationQueue, notification, globalEntry);
-        TAILQ_REMOVE(&mon->queue, notification, listEntry);
-
         /* removing an overflowEvent should not reduce the queueSize */
         UA_NodeId overflowId = UA_NODEID_NUMERIC(0, UA_NS0ID_SIMPLEOVERFLOWEVENTTYPE);
         if (!(notification->data.event.fields.eventFieldsSize == 1
@@ -180,11 +210,14 @@ Events_moveNotificationsFromMonitoredItems(UA_Server *server, UA_Subscription *s
 
         /* Move the content to the response */
         efls[pos] = notification->data.event.fields;
+        UA_EventFieldList_init(&notification->data.event.fields);
         efls[pos].clientHandle = mon->clientHandle;
 
         /* EventFilterResult currently isn't being used
         UA_EventFilterResult_deleteMembers(&notification->data.event.result); */
-        UA_free(notification);
+
+        /* Remove the notification from the queues */
+        UA_Notification_delete(sub, mon, notification);
 
         pos++; /* Increase the index */
     }
@@ -204,18 +237,16 @@ DataChange_moveNotificationsFromMonitoredItems(UA_Subscription *sub, UA_Monitore
 
         UA_MonitoredItem *mon = notification->mon;
 
-        /* Remove the notification from the queues */
-        TAILQ_REMOVE(&sub->notificationQueue, notification, globalEntry);
-        TAILQ_REMOVE(&mon->queue, notification, listEntry);
-        --mon->queueSize;
-        --sub->notificationQueueSize;
-
         /* Move the content to the response */
         UA_MonitoredItemNotification *min = &mins[pos];
         min->clientHandle = mon->clientHandle;
         min->value = notification->data.value;
-        UA_free(notification);
-        ++pos;
+        UA_DataValue_init(&notification->data.value); /* Reset after the value has been moved */
+
+        /* Remove the notification from the queues */
+        UA_Notification_delete(sub, mon, notification);
+
+        ++pos; /* Increase the index */
     }
 }
 
@@ -277,7 +308,7 @@ prepareNotificationMessage(UA_Server *server, UA_Subscription *sub, UA_Notificat
         enl->eventsSize = notifications;
 
         /* Move the list into the response .. the point of no return */
-        Events_moveNotificationsFromMonitoredItems(server, sub, enl->events, notifications);
+        Events_moveNotificationsFromMonitoredItems(sub, enl->events, notifications);
     }
 #endif
     else {
@@ -326,7 +357,7 @@ UA_Subscription_publish(UA_Server *server, UA_Subscription *sub) {
         }
     }
 
-    if (sub->readyNotifications > sub->notificationQueueSize)
+    if(sub->readyNotifications > sub->notificationQueueSize)
         sub->readyNotifications = sub->notificationQueueSize;
 
     /* Count the available notifications */

+ 12 - 4
src/server/ua_subscription.h

@@ -41,7 +41,6 @@ typedef enum {
     UA_MONITOREDITEMTYPE_EVENTNOTIFY = 4
 } UA_MonitoredItemType;
 
-
 struct UA_MonitoredItem;
 typedef struct UA_MonitoredItem UA_MonitoredItem;
 
@@ -64,8 +63,14 @@ typedef struct UA_Notification {
     } data;
 } UA_Notification;
 
-/* Clean up the notification. Must be removed from the lists first. */
-void UA_Notification_delete(UA_Notification *n);
+/* Ensure enough space is available; Add notification to the linked lists;
+ * Increase the counters */
+void UA_Notification_enqueue(UA_Server *server, UA_Subscription *sub,
+                             UA_MonitoredItem *mon, UA_Notification *n);
+
+/* Delete the notification. Also removes it from the linked lists. */
+void UA_Notification_delete(UA_Subscription *sub, UA_MonitoredItem *mon,
+                            UA_Notification *n);
 
 typedef TAILQ_HEAD(NotificationQueue, UA_Notification) NotificationQueue;
 
@@ -165,7 +170,10 @@ struct UA_Subscription {
 
     /* Global list of notifications from the MonitoredItems */
     NotificationQueue notificationQueue;
-    UA_UInt32 notificationQueueSize;
+    UA_UInt32 notificationQueueSize; /* Total queue size */
+    UA_UInt32 dataChangeNotifications;
+    UA_UInt32 eventNotifications;
+    UA_UInt32 statusChangeNotifications;
     UA_UInt32 readyNotifications; /* Notifications to be sent out now (already late) */
 
     /* Retransmission Queue */

+ 12 - 30
src/server/ua_subscription_datachange.c

@@ -65,13 +65,9 @@ void UA_MonitoredItem_delete(UA_Server *server, UA_MonitoredItem *monitoredItem)
         UA_Notification *notification, *notification_tmp;
         TAILQ_FOREACH_SAFE(notification, &monitoredItem->queue,
                            listEntry, notification_tmp) {
-            /* Remove the item from the queues */
-            TAILQ_REMOVE(&monitoredItem->queue, notification, listEntry);
-            TAILQ_REMOVE(&sub->notificationQueue, notification, globalEntry);
-            --sub->notificationQueueSize;
-            UA_Notification_delete(notification);
+            /* Remove the item from the queues and free the memory */
+            UA_Notification_delete(sub, monitoredItem, notification);
         }
-        monitoredItem->queueSize = 0;
     }
 
 #ifdef UA_ENABLE_SUBSCRIPTIONS_EVENTS
@@ -128,27 +124,18 @@ MonitoredItem_ensureQueueSpace(UA_Server *server, UA_MonitoredItem *mon) {
         TAILQ_REMOVE(&sub->notificationQueue, after_del, globalEntry);
         TAILQ_INSERT_AFTER(&sub->notificationQueue, del, after_del, globalEntry);
 
-        /* Remove the notification from the queues */
-        TAILQ_REMOVE(&mon->queue, del, listEntry);
-        TAILQ_REMOVE(&sub->notificationQueue, del, globalEntry);
-
 #ifdef UA_ENABLE_SUBSCRIPTIONS_EVENTS
         /* TODO: provide additional protection for overflowEvents according to specification */
         /* removing an overflowEvent should not reduce the queueSize */
         UA_NodeId overflowId = UA_NODEID_NUMERIC(0, UA_NS0ID_SIMPLEOVERFLOWEVENTTYPE);
-        if(!(del->data.event.fields.eventFieldsSize == 1 &&
-             del->data.event.fields.eventFields->type == &UA_TYPES[UA_TYPES_NODEID] &&
-             UA_NodeId_equal((UA_NodeId *)del->data.event.fields.eventFields->data, &overflowId))) {
-            --mon->queueSize;
-            --sub->notificationQueueSize;
+        if(del->data.event.fields.eventFieldsSize == 1 &&
+           del->data.event.fields.eventFields->type == &UA_TYPES[UA_TYPES_NODEID] &&
+           UA_NodeId_equal((UA_NodeId *)del->data.event.fields.eventFields->data, &overflowId)) {
+            ++mon->queueSize;
+            ++sub->notificationQueueSize;
         }
-#else
-        --mon->queueSize;
-        --sub->notificationQueueSize;
-#endif /* UA_ENABLE_SUBSCRIPTIONS_EVENTS */
 
         /* Create an overflow notification */
-#ifdef UA_ENABLE_SUBSCRIPTIONS_EVENTS
         if(mon->monitoredItemType == UA_MONITOREDITEMTYPE_EVENTNOTIFY) {
             /* EventFilterResult currently isn't being used
             UA_EventFilterResult_deleteMembers(&del->data.event->result); */
@@ -187,8 +174,9 @@ MonitoredItem_ensureQueueSpace(UA_Server *server, UA_MonitoredItem *mon) {
         }
 #endif /* UA_ENABLE_SUBSCRIPTIONS_EVENTS */
 
-        /* Free the notification */
-        UA_Notification_delete(del);
+        /* Delete the notification. This also removes the notification from the
+         * linked lists. */
+        UA_Notification_delete(sub, mon, del);
     }
 
     if(mon->monitoredItemType == UA_MONITOREDITEMTYPE_CHANGENOTIFY) {
@@ -432,14 +420,8 @@ sampleCallbackWithValue(UA_Server *server, UA_MonitoredItem *monitoredItem,
         newNotification->data.value = *value; /* Move the value to the notification */
         storedValue = true;
 
-        /* Add the notification to the end of local and global queue */
-        TAILQ_INSERT_TAIL(&monitoredItem->queue, newNotification, listEntry);
-        TAILQ_INSERT_TAIL(&sub->notificationQueue, newNotification, globalEntry);
-        ++monitoredItem->queueSize;
-        ++sub->notificationQueueSize;
-
-        /* Remove some notifications if the queue is beyond maximum capacity */
-        MonitoredItem_ensureQueueSpace(server, monitoredItem);
+        /* Enqueue the new notification */
+        UA_Notification_enqueue(server, sub, monitoredItem, newNotification);
     } else {
         /* Call the local callback if not attached to a subscription */
         UA_LocalMonitoredItem *localMon = (UA_LocalMonitoredItem*) monitoredItem;

+ 1 - 7
src/server/ua_subscription_events.c

@@ -399,13 +399,7 @@ UA_Event_addEventToMonitoredItem(UA_Server *server, const UA_NodeId *event,
     }
     notification->mon = mon;
 
-    /* add to the monitored item queue */
-    MonitoredItem_ensureQueueSpace(server, mon);
-    TAILQ_INSERT_TAIL(&mon->queue, notification, listEntry);
-    ++mon->queueSize;
-    /* add to the subscription queue */
-    TAILQ_INSERT_TAIL(&mon->subscription->notificationQueue, notification, globalEntry);
-    ++mon->subscription->notificationQueueSize;
+    UA_Notification_enqueue(server, mon->subscription, mon, notification);
     return UA_STATUSCODE_GOOD;
 }