瀏覽代碼

Subscriptions: Fix sending EventNotifications and DataChangeNotifications in one message

Julius Pfrommer 6 年之前
父節點
當前提交
68c60fc22f
共有 2 個文件被更改,包括 178 次插入155 次删除
  1. 138 109
      src/server/ua_subscription.c
  2. 40 46
      src/server/ua_subscription_datachange.c

+ 138 - 109
src/server/ua_subscription.c

@@ -29,7 +29,7 @@ UA_Notification_enqueue(UA_Server *server, UA_Subscription *sub,
 
     /* Add to the subscription */
     TAILQ_INSERT_TAIL(&sub->notificationQueue, n, globalEntry);
-    ++mon->subscription->notificationQueueSize;
+    ++sub->notificationQueueSize;
     if(mon->monitoredItemType == UA_MONITOREDITEMTYPE_CHANGENOTIFY) {
         ++sub->dataChangeNotifications;
     } else if(mon->monitoredItemType == UA_MONITOREDITEMTYPE_EVENTNOTIFY) {
@@ -185,135 +185,164 @@ UA_Subscription_removeRetransmissionMessage(UA_Subscription *sub, UA_UInt32 sequ
     return UA_STATUSCODE_GOOD;
 }
 
-#ifdef UA_ENABLE_SUBSCRIPTIONS_EVENTS
-/* EventChange: Iterate over the monitoredItems of the subscription, starting at mon, and
- *              move notifications into the response. */
-static void
-Events_moveNotificationsFromMonitoredItems(UA_Subscription *sub, UA_EventFieldList *efls,
-                                           size_t eflsSize) {
-    size_t pos = 0;
-    UA_Notification *notification, *notification_tmp;
-    TAILQ_FOREACH_SAFE(notification, &sub->notificationQueue, globalEntry, notification_tmp) {
-        if(pos >= eflsSize)
-            break;
-
-        UA_MonitoredItem *mon = notification->mon;
+static UA_StatusCode
+prepareNotificationMessage(UA_Server *server, UA_Subscription *sub,
+                           UA_NotificationMessage *message, size_t notifications) {
+    UA_assert(notifications > 0);
 
-        /* removing an overflowEvent should not reduce the queueSize */
-        UA_NodeId overflowId = UA_NODEID_NUMERIC(0, UA_NS0ID_SIMPLEOVERFLOWEVENTTYPE);
-        if (!(notification->data.event.fields.eventFieldsSize == 1
-                && notification->data.event.fields.eventFields->type == &UA_TYPES[UA_TYPES_NODEID]
-                && UA_NodeId_equal((UA_NodeId *)notification->data.event.fields.eventFields->data, &overflowId))) {
-            --mon->queueSize;
-            --sub->notificationQueueSize;
+    /* Allocate an ExtensionObject for events and data */
+    message->notificationData = (UA_ExtensionObject*)
+        UA_Array_new(2, &UA_TYPES[UA_TYPES_EXTENSIONOBJECT]);
+    if(!message->notificationData)
+        return UA_STATUSCODE_BADOUTOFMEMORY;
+    message->notificationDataSize = 2;
+
+    /* Pre-allocate DataChangeNotifications */
+    size_t notificationDataIdx = 0;
+    UA_DataChangeNotification *dcn = NULL;
+    if(sub->dataChangeNotifications > 0) {
+        dcn = UA_DataChangeNotification_new();
+        if(!dcn) {
+            UA_NotificationMessage_deleteMembers(message);
+            return UA_STATUSCODE_BADOUTOFMEMORY;
         }
+        message->notificationData->encoding = UA_EXTENSIONOBJECT_DECODED;
+        message->notificationData->content.decoded.data = dcn;
+        message->notificationData->content.decoded.type = &UA_TYPES[UA_TYPES_DATACHANGENOTIFICATION];
+
+        size_t dcnSize = sub->dataChangeNotifications;
+        if(dcnSize > notifications)
+            dcnSize = notifications;
+        dcn->monitoredItems = (UA_MonitoredItemNotification*)
+            UA_Array_new(dcnSize, &UA_TYPES[UA_TYPES_MONITOREDITEMNOTIFICATION]);
+        if(!dcn->monitoredItems) {
+            UA_NotificationMessage_deleteMembers(message);
+            return UA_STATUSCODE_BADOUTOFMEMORY;
+        }
+        dcn->monitoredItemsSize = dcnSize;
+        notificationDataIdx++;
+    }
 
-        /* Move the content to the response */
-        efls[pos] = notification->data.event.fields;
-        UA_EventFieldList_init(&notification->data.event.fields);
-        efls[pos].clientHandle = mon->clientHandle;
+#ifdef UA_ENABLE_SUBSCRIPTIONS_EVENTS
+    UA_EventNotificationList *enl = NULL;
+    UA_StatusChangeNotification *scn = NULL;
+    /* Pre-allocate either StatusChange or EventNotifications. Sending a
+     * (single) StatusChangeNotification has priority. */
+    if(sub->statusChangeNotifications > 0) {
+        scn = UA_StatusChangeNotification_new();
+        if(!scn) {
+            UA_NotificationMessage_deleteMembers(message);
+            return UA_STATUSCODE_BADOUTOFMEMORY;
+        }
+        message->notificationData[notificationDataIdx].encoding = UA_EXTENSIONOBJECT_DECODED;
+        message->notificationData[notificationDataIdx].content.decoded.data = scn;
+        message->notificationData[notificationDataIdx].content.decoded.type = &UA_TYPES[UA_TYPES_STATUSCHANGENOTIFICATION];
+        notificationDataIdx++;
+    } else if(sub->eventNotifications > 0) {
+        enl = UA_EventNotificationList_new();
+        if(!enl) {
+            UA_NotificationMessage_deleteMembers(message);
+            return UA_STATUSCODE_BADOUTOFMEMORY;
+        }
+        message->notificationData[notificationDataIdx].encoding = UA_EXTENSIONOBJECT_DECODED;
+        message->notificationData[notificationDataIdx].content.decoded.data = enl;
+        message->notificationData[notificationDataIdx].content.decoded.type = &UA_TYPES[UA_TYPES_EVENTNOTIFICATIONLIST];
+
+        size_t enlSize = sub->eventNotifications;
+        if(enlSize > notifications)
+            enlSize = notifications;
+        enl->events = (UA_EventFieldList*) UA_Array_new(enlSize, &UA_TYPES[UA_TYPES_EVENTFIELDLIST]);
+        if(!enl->events) {
+            UA_NotificationMessage_deleteMembers(message);
+            return UA_STATUSCODE_BADOUTOFMEMORY;
+        }
+        enl->eventsSize = enlSize;
+        notificationDataIdx++;
+    }
+#endif
 
-        /* EventFilterResult currently isn't being used
-        UA_EventFilterResult_deleteMembers(&notification->data.event.result); */
+    UA_assert(notificationDataIdx > 0);
+    message->notificationDataSize = notificationDataIdx;
 
-        /* Remove the notification from the queues */
-        UA_Notification_delete(sub, mon, notification);
+    /* <-- The point of no return --> */
 
-        pos++; /* Increase the index */
-    }
-}
+    size_t totalNotifications = 0; /* How many notifications were moved to the response overall? */
+    size_t dcnPos = 0; /* How many DataChangeNotifications were put into the list? */
+#ifdef UA_ENABLE_SUBSCRIPTIONS_EVENTS
+    size_t enlPos = 0; /* How many EventNotifications were moved into the list */
 #endif
-
-/* DataChange: Iterate over the monitoreditems of the subscription, starting at mon, and
- *             move notifications into the response. */
-static void
-DataChange_moveNotificationsFromMonitoredItems(UA_Subscription *sub, UA_MonitoredItemNotification *mins,
-                                               size_t minsSize) {
-    size_t pos = 0;
     UA_Notification *notification, *notification_tmp;
     TAILQ_FOREACH_SAFE(notification, &sub->notificationQueue, globalEntry, notification_tmp) {
-        if(pos >= minsSize)
-            return;
-
+        if(totalNotifications >= notifications)
+            break;
+        
         UA_MonitoredItem *mon = notification->mon;
 
         /* Move the content to the response */
-        UA_MonitoredItemNotification *min = &mins[pos];
-        min->clientHandle = mon->clientHandle;
-        min->value = notification->data.value;
-        UA_DataValue_init(&notification->data.value); /* Reset after the value has been moved */
+        if(mon->monitoredItemType == UA_MONITOREDITEMTYPE_CHANGENOTIFY) {
+            UA_assert(dcn != NULL); /* Have at least one change notification */
+            /* Move the content to the response */
+            UA_MonitoredItemNotification *min = &dcn->monitoredItems[dcnPos];
+            min->clientHandle = mon->clientHandle;
+            min->value = notification->data.value;
+            UA_DataValue_init(&notification->data.value); /* Reset after the value has been moved */
+            dcnPos++;
+        }
+#ifdef UA_ENABLE_SUBSCRIPTIONS_EVENTS
+        else if(mon->monitoredItemType == UA_MONITOREDITEMTYPE_STATUSNOTIFY && scn) {
+            // TODO: Handling of StatusChangeNotifications
+            scn = NULL; /* At most one per PublishReponse */
+        } else if(mon->monitoredItemType == UA_MONITOREDITEMTYPE_EVENTNOTIFY && enl) {
+            UA_assert(enl != NULL); /* Have at least one event notification */
+
+            /* TODO: The following lead to crashes when we assumed notifications to be ready... */
+            /* /\* removing an overflowEvent should not reduce the queueSize *\/ */
+            /* UA_NodeId overflowId = UA_NODEID_NUMERIC(0, UA_NS0ID_SIMPLEOVERFLOWEVENTTYPE); */
+            /* if (!(notification->data.event.fields.eventFieldsSize == 1 */
+            /*       && notification->data.event.fields.eventFields->type == &UA_TYPES[UA_TYPES_NODEID] */
+            /*       && UA_NodeId_equal((UA_NodeId *)notification->data.event.fields.eventFields->data, &overflowId))) { */
+            /*     --mon->queueSize; */
+            /*     --sub->notificationQueueSize; */
+            /* } */
+
+            /* Move the content to the response */
+            UA_EventFieldList *efl = &enl->events[enlPos];
+            *efl = notification->data.event.fields;
+            UA_EventFieldList_init(&notification->data.event.fields);
+            efl->clientHandle = mon->clientHandle;
+            /* EventFilterResult currently isn't being used
+               UA_EventFilterResult_deleteMembers(&notification->data.event.result); */
+            enlPos++;
+        }
+#endif
+        else {
+            continue; /* Nothing to do */
+        }
 
         /* Remove the notification from the queues */
         UA_Notification_delete(sub, mon, notification);
-
-        ++pos; /* Increase the index */
+        totalNotifications++;
     }
-}
-
-static UA_StatusCode
-prepareNotificationMessage(UA_Server *server, UA_Subscription *sub, UA_NotificationMessage *message,
-                           size_t notifications) {
-    /* Array of ExtensionObject to hold different kinds of notifications
-     * (currently only DataChangeNotifications) */
-    message->notificationData = UA_ExtensionObject_new();
-    if(!message->notificationData)
-        return UA_STATUSCODE_BADOUTOFMEMORY;
-    message->notificationDataSize = 1;
-
-    UA_ExtensionObject *data = message->notificationData;
-    data->encoding = UA_EXTENSIONOBJECT_DECODED;
-    /* TODO: basing type of notificationtype off of first monitoredItem in subscription which isnt very good */
-    /* Allocate Notification */
-    if (LIST_FIRST(&sub->monitoredItems)->monitoredItemType == UA_MONITOREDITEMTYPE_CHANGENOTIFY) {
-        UA_DataChangeNotification *dcn = UA_DataChangeNotification_new();
-        if (!dcn) {
-            UA_NotificationMessage_deleteMembers(message);
-            return UA_STATUSCODE_BADOUTOFMEMORY;
-        }
-        data->content.decoded.data = dcn;
-        data->content.decoded.type = &UA_TYPES[UA_TYPES_DATACHANGENOTIFICATION];
 
-        /* Allocate array of notifications */
-        dcn->monitoredItems = (UA_MonitoredItemNotification *)
-                UA_Array_new(notifications,
-                             &UA_TYPES[UA_TYPES_MONITOREDITEMNOTIFICATION]);
-        if(!dcn->monitoredItems) {
-            UA_NotificationMessage_deleteMembers(message);
-            return UA_STATUSCODE_BADOUTOFMEMORY;
+    /* Set sizes */
+    if(dcn) {
+        dcn->monitoredItemsSize = dcnPos;
+        if(dcnPos == 0) {
+            UA_free(dcn->monitoredItems);
+            dcn->monitoredItems = NULL;
         }
-        dcn->monitoredItemsSize = notifications;
-
-        /* Move notifications into the response .. the point of no return */
-        DataChange_moveNotificationsFromMonitoredItems(sub, dcn->monitoredItems, notifications);
     }
-#ifdef UA_ENABLE_SUBSCRIPTIONS_EVENTS
-    else if (LIST_FIRST(&sub->monitoredItems)->monitoredItemType == UA_MONITOREDITEMTYPE_EVENTNOTIFY) {
-
-        UA_EventNotificationList *enl = UA_EventNotificationList_new();
-        if (!enl) {
-            UA_NotificationMessage_deleteMembers(message);
-            return UA_STATUSCODE_BADOUTOFMEMORY;
-        }
-        UA_EventNotificationList_init(enl);
-
-        data->content.decoded.data = enl;
-        data->content.decoded.type = &UA_TYPES[UA_TYPES_EVENTNOTIFICATIONLIST];
 
-        /* Allocate array of notifications */
-        enl->events = (UA_EventFieldList *) UA_Array_new(notifications, &UA_TYPES[UA_TYPES_EVENTFIELDLIST]);
-        if (!enl->events) {
-            UA_NotificationMessage_deleteMembers(message);
-            return UA_STATUSCODE_BADOUTOFMEMORY;
+#ifdef UA_ENABLE_SUBSCRIPTIONS_EVENTS
+    if(enl) {
+        enl->eventsSize = enlPos;
+        if(enlPos == 0) {
+            UA_free(enl->events);
+            enl->events = NULL;
         }
-        enl->eventsSize = notifications;
-
-        /* Move the list into the response .. the point of no return */
-        Events_moveNotificationsFromMonitoredItems(sub, enl->events, notifications);
     }
 #endif
-    else {
-        return UA_STATUSCODE_BADNOTIMPLEMENTED;
-    }
+
     return UA_STATUSCODE_GOOD;
 }
 
@@ -385,7 +414,7 @@ UA_Subscription_publish(UA_Server *server, UA_Subscription *sub) {
                              sub->subscriptionId);
     }
 
-    /* We want to send a response. Is it possible? */
+    /* We want to send a response. Is the channel open? */
     UA_SecureChannel *channel = sub->session->header.channel;
     if(!channel || !pre) {
         UA_LOG_DEBUG_SESSION(server->config.logger, sub->session,
@@ -442,7 +471,7 @@ UA_Subscription_publish(UA_Server *server, UA_Subscription *sub) {
      * no notifications (and this is a keepalive message). */
     message->sequenceNumber = UA_Subscription_nextSequenceNumber(sub->sequenceNumber);
 
-    if(notifications != 0) {
+    if(notifications > 0) {
         /* There are notifications. So we can't reuse the sequence number. */
         sub->sequenceNumber = message->sequenceNumber;
 

+ 40 - 46
src/server/ua_subscription_datachange.c

@@ -125,53 +125,47 @@ MonitoredItem_ensureQueueSpace(UA_Server *server, UA_MonitoredItem *mon) {
         TAILQ_INSERT_AFTER(&sub->notificationQueue, del, after_del, globalEntry);
 
 #ifdef UA_ENABLE_SUBSCRIPTIONS_EVENTS
-        /* TODO: provide additional protection for overflowEvents according to specification */
-        /* removing an overflowEvent should not reduce the queueSize */
-        UA_NodeId overflowId = UA_NODEID_NUMERIC(0, UA_NS0ID_SIMPLEOVERFLOWEVENTTYPE);
-        if(del->data.event.fields.eventFieldsSize == 1 &&
-           del->data.event.fields.eventFields->type == &UA_TYPES[UA_TYPES_NODEID] &&
-           UA_NodeId_equal((UA_NodeId *)del->data.event.fields.eventFields->data, &overflowId)) {
-            ++mon->queueSize;
-            ++sub->notificationQueueSize;
-        }
-
         /* Create an overflow notification */
-        if(mon->monitoredItemType == UA_MONITOREDITEMTYPE_EVENTNOTIFY) {
-            /* EventFilterResult currently isn't being used
-            UA_EventFilterResult_deleteMembers(&del->data.event->result); */
-            UA_EventFieldList_deleteMembers(&del->data.event.fields);
-
-            /* cause an overflowEvent */
-            /* an overflowEvent does not care about event filters and as such
-             * will not be "triggered" correctly. Instead, a notification will
-             * be inserted into the queue which includes only the nodeId of the
-             * overflowEventType. It is up to the client to check for possible
-             * overflows. */
-            UA_Notification *overflowNotification = (UA_Notification *) UA_malloc(sizeof(UA_Notification));
-            if(!overflowNotification)
-                return UA_STATUSCODE_BADOUTOFMEMORY;
-
-            UA_EventFieldList_init(&overflowNotification->data.event.fields);
-            overflowNotification->data.event.fields.eventFields = UA_Variant_new();
-            if(!overflowNotification->data.event.fields.eventFields) {
-                UA_EventFieldList_deleteMembers(&overflowNotification->data.event.fields);
-                UA_free(overflowNotification);
-                return UA_STATUSCODE_BADOUTOFMEMORY;
-            }
-
-            UA_Variant_init(overflowNotification->data.event.fields.eventFields);
-            overflowNotification->data.event.fields.eventFieldsSize = 1;
-            UA_Variant_setScalarCopy(overflowNotification->data.event.fields.eventFields,
-                                     &overflowId, &UA_TYPES[UA_TYPES_NODEID]);
-            overflowNotification->mon = mon;
-            if(mon->discardOldest) {
-                TAILQ_INSERT_HEAD(&mon->queue, overflowNotification, listEntry);
-                TAILQ_INSERT_HEAD(&mon->subscription->notificationQueue, overflowNotification, globalEntry);
-            } else {
-                TAILQ_INSERT_TAIL(&mon->queue, overflowNotification, listEntry);
-                TAILQ_INSERT_TAIL(&mon->subscription->notificationQueue, overflowNotification, globalEntry);
-            }
-        }
+        /* if(mon->monitoredItemType == UA_MONITOREDITEMTYPE_EVENTNOTIFY) { */
+        /*     /\* EventFilterResult currently isn't being used */
+        /*     UA_EventFilterResult_deleteMembers(&del->data.event->result); *\/ */
+        /*     UA_EventFieldList_deleteMembers(&del->data.event.fields); */
+
+        /*     /\* cause an overflowEvent *\/ */
+        /*     /\* an overflowEvent does not care about event filters and as such */
+        /*      * will not be "triggered" correctly. Instead, a notification will */
+        /*      * be inserted into the queue which includes only the nodeId of the */
+        /*      * overflowEventType. It is up to the client to check for possible */
+        /*      * overflows. *\/ */
+        /*     UA_Notification *overflowNotification = (UA_Notification *) UA_malloc(sizeof(UA_Notification)); */
+        /*     if(!overflowNotification) */
+        /*         return UA_STATUSCODE_BADOUTOFMEMORY; */
+
+        /*     UA_EventFieldList_init(&overflowNotification->data.event.fields); */
+        /*     overflowNotification->data.event.fields.eventFields = UA_Variant_new(); */
+        /*     if(!overflowNotification->data.event.fields.eventFields) { */
+        /*         UA_EventFieldList_deleteMembers(&overflowNotification->data.event.fields); */
+        /*         UA_free(overflowNotification); */
+        /*         return UA_STATUSCODE_BADOUTOFMEMORY; */
+        /*     } */
+
+        /*     UA_Variant_init(overflowNotification->data.event.fields.eventFields); */
+        /*     overflowNotification->data.event.fields.eventFieldsSize = 1; */
+        /*     UA_NodeId overflowId = UA_NODEID_NUMERIC(0, UA_NS0ID_SIMPLEOVERFLOWEVENTTYPE); */
+        /*     UA_Variant_setScalarCopy(overflowNotification->data.event.fields.eventFields, */
+        /*                              &overflowId, &UA_TYPES[UA_TYPES_NODEID]); */
+        /*     overflowNotification->mon = mon; */
+        /*     if(mon->discardOldest) { */
+        /*         TAILQ_INSERT_HEAD(&mon->queue, overflowNotification, listEntry); */
+        /*         TAILQ_INSERT_HEAD(&mon->subscription->notificationQueue, overflowNotification, globalEntry); */
+        /*     } else { */
+        /*         TAILQ_INSERT_TAIL(&mon->queue, overflowNotification, listEntry); */
+        /*         TAILQ_INSERT_TAIL(&mon->subscription->notificationQueue, overflowNotification, globalEntry); */
+        /*     } */
+        /*     ++mon->queueSize; */
+        /*     ++sub->notificationQueueSize; */
+        /*     ++sub->eventNotifications; */
+        /* } */
 #endif /* UA_ENABLE_SUBSCRIPTIONS_EVENTS */
 
         /* Delete the notification. This also removes the notification from the