Переглянути джерело

Feature/event preparation (#1560)

Prepare some internal structures for implementation of events

Includes the commits:
* event preparation

* whitespace_fix

* unit test fix

* moved functions from ua_subscription.c back to ua_subscription_datachange.c

* moved functions from ua_subscription.c back to ua_subscription_datachange.c

* Revert "Merge remote-tracking branch 'origin/feature/event_preparation' into feature/event_preparation"

This reverts commit 8689d1f79171ac96c0e6ab305fbf4c3fb4e7cd80, reversing
changes made to 671d23e899698c9108881c614f449df4c8275268.

* fixed missing #ifdef

* fixed missing #ifdef
Ari 7 роки тому
батько
коміт
e8fb312512

+ 1 - 1
src/server/ua_services_subscription.c

@@ -259,7 +259,7 @@ Operation_CreateMonitoredItem(UA_Server *server, UA_Session *session, struct cre
     }
 
     /* Create the monitoreditem */
-    UA_MonitoredItem *newMon = UA_MonitoredItem_new();
+    UA_MonitoredItem *newMon = UA_MonitoredItem_new(UA_MONITOREDITEMTYPE_CHANGENOTIFY);
     if(!newMon) {
         result->statusCode = UA_STATUSCODE_BADOUTOFMEMORY;
         return;

+ 5 - 1
src/server/ua_subscription.c

@@ -189,7 +189,11 @@ moveNotificationsFromMonitoredItems(UA_Subscription *sub, UA_MonitoredItem *mon,
                 return;
             UA_MonitoredItemNotification *min = &mins[*pos];
             min->clientHandle = qv->clientHandle;
-            min->value = qv->value;
+            if(mon->monitoredItemType == UA_MONITOREDITEMTYPE_CHANGENOTIFY) {
+                min->value = qv->data.value;
+            } else {
+                /* TODO implementation for events */
+            }
             TAILQ_REMOVE(&mon->queue, qv, listEntry);
             UA_free(qv);
             --mon->currentQueueSize;

+ 11 - 2
src/server/ua_subscription.h

@@ -29,10 +29,19 @@ typedef enum {
     UA_MONITOREDITEMTYPE_EVENTNOTIFY = 4
 } UA_MonitoredItemType;
 
+
+/* not used yet, placeholder for event implementation */
+typedef struct UA_Event {
+   UA_Int32 eventId;
+} UA_Event;
+
 typedef struct MonitoredItem_queuedValue {
     TAILQ_ENTRY(MonitoredItem_queuedValue) listEntry;
     UA_UInt32 clientHandle;
-    UA_DataValue value;
+    union {
+        UA_Event event;
+        UA_DataValue value;
+    } data;
 } MonitoredItem_queuedValue;
 
 typedef TAILQ_HEAD(QueuedValueQueue, MonitoredItem_queuedValue) QueuedValueQueue;
@@ -66,7 +75,7 @@ typedef struct UA_MonitoredItem {
     QueuedValueQueue queue;
 } UA_MonitoredItem;
 
-UA_MonitoredItem * UA_MonitoredItem_new(void);
+UA_MonitoredItem * UA_MonitoredItem_new(UA_MonitoredItemType);
 void MonitoredItem_delete(UA_Server *server, UA_MonitoredItem *monitoredItem);
 void UA_MonitoredItem_SampleCallback(UA_Server *server, UA_MonitoredItem *monitoredItem);
 UA_StatusCode MonitoredItem_registerSampleCallback(UA_Server *server, UA_MonitoredItem *mon);

+ 46 - 33
src/server/ua_subscription_datachange.c

@@ -16,15 +16,15 @@
 #define UA_VALUENCODING_MAXSTACK 512
 
 UA_MonitoredItem *
-UA_MonitoredItem_new(void) {
+UA_MonitoredItem_new(UA_MonitoredItemType monType) {
     /* Allocate the memory */
     UA_MonitoredItem *newItem =
-        (UA_MonitoredItem*)UA_calloc(1, sizeof(UA_MonitoredItem));
+            (UA_MonitoredItem *) UA_calloc(1, sizeof(UA_MonitoredItem));
     if(!newItem)
         return NULL;
 
     /* Remaining members are covered by calloc zeroing out the memory */
-    newItem->monitoredItemType = UA_MONITOREDITEMTYPE_CHANGENOTIFY; /* currently hardcoded */
+    newItem->monitoredItemType = monType; /* currently hardcoded */
     newItem->timestampsToReturn = UA_TIMESTAMPSTORETURN_SOURCE;
     TAILQ_INIT(&newItem->queue);
     return newItem;
@@ -32,18 +32,24 @@ UA_MonitoredItem_new(void) {
 
 void
 MonitoredItem_delete(UA_Server *server, UA_MonitoredItem *monitoredItem) {
-    /* Remove the sampling callback */
-    MonitoredItem_unregisterSampleCallback(server, monitoredItem);
-
-    /* Clear the queued samples */
-    MonitoredItem_queuedValue *val, *val_tmp;
-    TAILQ_FOREACH_SAFE(val, &monitoredItem->queue, listEntry, val_tmp) {
-        TAILQ_REMOVE(&monitoredItem->queue, val, listEntry);
-        UA_DataValue_deleteMembers(&val->value);
-        UA_free(val);
+    if(monitoredItem->monitoredItemType == UA_MONITOREDITEMTYPE_CHANGENOTIFY) {
+        /* Remove the sampling callback */
+        MonitoredItem_unregisterSampleCallback(server, monitoredItem);
+
+        /* Clear the queued samples */
+        MonitoredItem_queuedValue *val, *val_tmp;
+        TAILQ_FOREACH_SAFE(val, &monitoredItem->queue, listEntry, val_tmp) {
+            TAILQ_REMOVE(&monitoredItem->queue, val, listEntry);
+            UA_DataValue_deleteMembers(&val->data.value);
+            UA_free(val);
+        }
+        monitoredItem->currentQueueSize = 0;
+    } else {
+        /* TODO: Access val data.event */
+        UA_LOG_ERROR(server->config.logger, UA_LOGCATEGORY_SERVER,
+                     "MonitoredItemTypes other than ChangeNotify are not supported yet");
+        return;
     }
-    monitoredItem->currentQueueSize = 0;
-
     /* Remove the monitored item */
     LIST_REMOVE(monitoredItem, listEntry);
     UA_String_deleteMembers(&monitoredItem->indexRange);
@@ -74,7 +80,11 @@ MonitoredItem_ensureQueueSpace(UA_MonitoredItem *mon) {
 
         /* Remove the item */
         TAILQ_REMOVE(&mon->queue, queueItem, listEntry);
-        UA_DataValue_deleteMembers(&queueItem->value);
+        if(mon->monitoredItemType == UA_MONITOREDITEMTYPE_CHANGENOTIFY) {
+            UA_DataValue_deleteMembers(&queueItem->data.value);
+        } else {
+            //TODO: event implemantation
+        }
         UA_free(queueItem);
         --mon->currentQueueSize;
         valueDiscarded = true;
@@ -84,23 +94,25 @@ MonitoredItem_ensureQueueSpace(UA_MonitoredItem *mon) {
     if(!valueDiscarded)
         return;
 
-    /* Get the element that carries the infobits */
-    if(mon->discardOldest)
-        queueItem = TAILQ_FIRST(&mon->queue);
-    else
-        queueItem = TAILQ_LAST(&mon->queue, QueuedValueQueue);
-    UA_assert(queueItem);
-
-    /* If the queue size is reduced to one, remove the infobits */
-    if(mon->maxQueueSize == 1) {
-        queueItem->value.status &= ~(UA_StatusCode)(UA_STATUSCODE_INFOTYPE_DATAVALUE |
-                                                    UA_STATUSCODE_INFOBITS_OVERFLOW);
-        return;
-    }
+    if(mon->monitoredItemType == UA_MONITOREDITEMTYPE_CHANGENOTIFY) {
+        /* Get the element that carries the infobits */
+        if(mon->discardOldest)
+            queueItem = TAILQ_FIRST(&mon->queue);
+        else
+            queueItem = TAILQ_LAST(&mon->queue, QueuedValueQueue);
+        UA_assert(queueItem);
 
-    /* Add the infobits either to the newest or the new last entry */
-    queueItem->value.hasStatus = true;
-    queueItem->value.status |= (UA_STATUSCODE_INFOTYPE_DATAVALUE | UA_STATUSCODE_INFOBITS_OVERFLOW);
+        /* If the queue size is reduced to one, remove the infobits */
+        if(mon->maxQueueSize == 1) {
+            queueItem->data.value.status &= ~(UA_StatusCode) (UA_STATUSCODE_INFOTYPE_DATAVALUE |
+                                                              UA_STATUSCODE_INFOBITS_OVERFLOW);
+            return;
+        }
+
+        /* Add the infobits either to the newest or the new last entry */
+        queueItem->data.value.hasStatus = true;
+        queueItem->data.value.status |= (UA_STATUSCODE_INFOTYPE_DATAVALUE | UA_STATUSCODE_INFOBITS_OVERFLOW);
+    }
 }
 
 /* Errors are returned as no change detected */
@@ -169,6 +181,7 @@ sampleCallbackWithValue(UA_Server *server, UA_Subscription *sub,
                         UA_MonitoredItem *monitoredItem,
                         UA_DataValue *value,
                         UA_ByteString *valueEncoding) {
+    UA_assert(monitoredItem->monitoredItemType == UA_MONITOREDITEMTYPE_CHANGENOTIFY);
     /* Store the pointer to the stack-allocated bytestring to see if a heap-allocation
      * was necessary */
     UA_Byte *stackValueEncoding = valueEncoding->data;
@@ -206,7 +219,7 @@ sampleCallbackWithValue(UA_Server *server, UA_Subscription *sub,
     /* Prepare the newQueueItem */
     if(value->hasValue && value->value.storageType == UA_VARIANT_DATA_NODELETE) {
         /* Make a deep copy of the value */
-        UA_StatusCode retval = UA_DataValue_copy(value, &newQueueItem->value);
+        UA_StatusCode retval = UA_DataValue_copy(value, &newQueueItem->data.value);
         if(retval != UA_STATUSCODE_GOOD) {
             UA_LOG_WARNING_SESSION(server->config.logger, sub->session,
                                    "Subscription %u | MonitoredItem %i | "
@@ -216,7 +229,7 @@ sampleCallbackWithValue(UA_Server *server, UA_Subscription *sub,
             return false;
         }
     } else {
-        newQueueItem->value = *value; /* Just copy the value and do not release it */
+        newQueueItem->data.value = *value; /* Just copy the value and do not release it */
     }
     newQueueItem->clientHandle = monitoredItem->clientHandle;
 

+ 13 - 11
tests/server/check_services_subscriptions.c

@@ -2,6 +2,8 @@
  * License, v. 2.0. If a copy of the MPL was not distributed with this
  * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
 
+/* TODO add event based testing */
+
 #include "ua_server.h"
 #include "server/ua_services.h"
 #include "server/ua_server_internal.h"
@@ -329,33 +331,33 @@ START_TEST(Server_overflow) {
     ck_assert_uint_eq(mon->maxQueueSize, 3); 
     MonitoredItem_queuedValue *queueItem;
     queueItem = TAILQ_LAST(&mon->queue, QueuedValueQueue);
-    ck_assert_uint_eq(queueItem->value.hasStatus, false);
+    ck_assert_uint_eq(queueItem->data.value.hasStatus, false);
 
     UA_ByteString_deleteMembers(&mon->lastSampledValue);
     UA_MonitoredItem_SampleCallback(server, mon);
     ck_assert_uint_eq(mon->currentQueueSize, 2); 
     ck_assert_uint_eq(mon->maxQueueSize, 3); 
     queueItem = TAILQ_LAST(&mon->queue, QueuedValueQueue);
-    ck_assert_uint_eq(queueItem->value.hasStatus, false);
+    ck_assert_uint_eq(queueItem->data.value.hasStatus, false);
 
     UA_ByteString_deleteMembers(&mon->lastSampledValue);
     UA_MonitoredItem_SampleCallback(server, mon);
     ck_assert_uint_eq(mon->currentQueueSize, 3); 
     ck_assert_uint_eq(mon->maxQueueSize, 3); 
     queueItem = TAILQ_LAST(&mon->queue, QueuedValueQueue);
-    ck_assert_uint_eq(queueItem->value.hasStatus, false);
+    ck_assert_uint_eq(queueItem->data.value.hasStatus, false);
 
     UA_ByteString_deleteMembers(&mon->lastSampledValue);
     UA_MonitoredItem_SampleCallback(server, mon);
     ck_assert_uint_eq(mon->currentQueueSize, 3); 
     ck_assert_uint_eq(mon->maxQueueSize, 3); 
     queueItem = TAILQ_FIRST(&mon->queue);
-    ck_assert_uint_eq(queueItem->value.hasStatus, true);
-    ck_assert_uint_eq(queueItem->value.status, UA_STATUSCODE_INFOTYPE_DATAVALUE | UA_STATUSCODE_INFOBITS_OVERFLOW);
+    ck_assert_uint_eq(queueItem->data.value.hasStatus, true);
+    ck_assert_uint_eq(queueItem->data.value.status, UA_STATUSCODE_INFOTYPE_DATAVALUE | UA_STATUSCODE_INFOBITS_OVERFLOW);
 
     /* Remove status for next test */
-    queueItem->value.hasStatus = false;
-    queueItem->value.status = 0;
+    queueItem->data.value.hasStatus = false;
+    queueItem->data.value.status = 0;
 
     /* Modify the MonitoredItem */
     UA_ModifyMonitoredItemsRequest modifyMonitoredItemsRequest;
@@ -386,8 +388,8 @@ START_TEST(Server_overflow) {
     ck_assert_uint_eq(mon->currentQueueSize, 2); 
     ck_assert_uint_eq(mon->maxQueueSize, 2); 
     queueItem = TAILQ_FIRST(&mon->queue);
-    ck_assert_uint_eq(queueItem->value.hasStatus, true);
-    ck_assert_uint_eq(queueItem->value.status, UA_STATUSCODE_INFOTYPE_DATAVALUE | UA_STATUSCODE_INFOBITS_OVERFLOW);
+    ck_assert_uint_eq(queueItem->data.value.hasStatus, true);
+    ck_assert_uint_eq(queueItem->data.value.status, UA_STATUSCODE_INFOTYPE_DATAVALUE | UA_STATUSCODE_INFOBITS_OVERFLOW);
 
     /* Modify the MonitoredItem */
     UA_ModifyMonitoredItemsRequest_init(&modifyMonitoredItemsRequest);
@@ -414,7 +416,7 @@ START_TEST(Server_overflow) {
     ck_assert_uint_eq(mon->currentQueueSize, 1); 
     ck_assert_uint_eq(mon->maxQueueSize, 1); 
     queueItem = TAILQ_LAST(&mon->queue, QueuedValueQueue);
-    ck_assert_uint_eq(queueItem->value.hasStatus, false);
+    ck_assert_uint_eq(queueItem->data.value.hasStatus, false);
 
     /* Modify the MonitoredItem */
     UA_ModifyMonitoredItemsRequest_init(&modifyMonitoredItemsRequest);
@@ -444,7 +446,7 @@ START_TEST(Server_overflow) {
     ck_assert_uint_eq(mon->currentQueueSize, 1); 
     ck_assert_uint_eq(mon->maxQueueSize, 1); 
     queueItem = TAILQ_FIRST(&mon->queue);
-    ck_assert_uint_eq(queueItem->value.hasStatus, false); /* the infobit is only set if the queue is larger than one */
+    ck_assert_uint_eq(queueItem->data.value.hasStatus, false); /* the infobit is only set if the queue is larger than one */
 
     /* Remove the subscriptions */
     UA_DeleteSubscriptionsRequest deleteSubscriptionsRequest;