Parcourir la source

improve handling of timestamps in subscriptions

Julius Pfrommer il y a 8 ans
Parent
commit
bd7858bf12

+ 8 - 6
CMakeLists.txt

@@ -205,26 +205,28 @@ set(lib_sources ${PROJECT_SOURCE_DIR}/src/ua_types.c
                 ${PROJECT_SOURCE_DIR}/src/server/ua_server.c
                 ${PROJECT_SOURCE_DIR}/src/server/ua_server_binary.c
                 ${PROJECT_SOURCE_DIR}/src/server/ua_server_utils.c
-                ${PROJECT_SOURCE_DIR}/src/server/ua_nodes.c
                 ${PROJECT_SOURCE_DIR}/src/server/ua_server_worker.c
                 ${PROJECT_SOURCE_DIR}/src/server/ua_securechannel_manager.c
                 ${PROJECT_SOURCE_DIR}/src/server/ua_session_manager.c
+                ${PROJECT_SOURCE_DIR}/src/server/ua_nodes.c
+                # nodestores
+                ${PROJECT_SOURCE_DIR}/src/server/ua_nodestore.c
+                ${PROJECT_SOURCE_DIR}/src/server/ua_nodestore_concurrent.c
+                # services
                 ${PROJECT_SOURCE_DIR}/src/server/ua_services_discovery.c
                 ${PROJECT_SOURCE_DIR}/src/server/ua_services_securechannel.c
                 ${PROJECT_SOURCE_DIR}/src/server/ua_services_session.c
                 ${PROJECT_SOURCE_DIR}/src/server/ua_services_attribute.c
                 ${PROJECT_SOURCE_DIR}/src/server/ua_services_nodemanagement.c
                 ${PROJECT_SOURCE_DIR}/src/server/ua_services_view.c
-                ${PROJECT_SOURCE_DIR}/src/client/ua_client.c
-                ${PROJECT_SOURCE_DIR}/src/client/ua_client_highlevel.c
-                # nodestores
-                ${PROJECT_SOURCE_DIR}/src/server/ua_nodestore.c
-                ${PROJECT_SOURCE_DIR}/src/server/ua_nodestore_concurrent.c
                 # method call
                 ${PROJECT_SOURCE_DIR}/src/server/ua_services_call.c
                 # subscriptions
                 ${PROJECT_SOURCE_DIR}/src/server/ua_subscription.c
                 ${PROJECT_SOURCE_DIR}/src/server/ua_services_subscription.c
+                # client
+                ${PROJECT_SOURCE_DIR}/src/client/ua_client.c
+                ${PROJECT_SOURCE_DIR}/src/client/ua_client_highlevel.c
                 ${PROJECT_SOURCE_DIR}/src/client/ua_client_highlevel_subscriptions.c
                 # plugins and dependencies
                 ${PROJECT_SOURCE_DIR}/plugins/ua_network_tcp.c

+ 4 - 2
src/server/ua_nodestore.c

@@ -209,12 +209,14 @@ UA_NodeStore_newNode(UA_NodeClass class) {
     UA_NodeStoreEntry *entry = instantiateEntry(class);
     if(!entry)
         return NULL;
-    return (UA_Node*)&entry->node;
+    return &entry->node;
 }
 
 void
 UA_NodeStore_deleteNode(UA_Node *node) {
-    deleteEntry(container_of(node, UA_NodeStoreEntry, node));
+    UA_NodeStoreEntry *entry = container_of(node, UA_NodeStoreEntry, node);
+    UA_assert(&entry->node == node);
+    deleteEntry(entry);
 }
 
 UA_StatusCode

+ 1 - 0
src/server/ua_nodestore_concurrent.c

@@ -1,5 +1,6 @@
 #include "ua_util.h"
 #include "ua_nodestore.h"
+#include "ua_server_internal.h"
 
 #ifdef UA_ENABLE_MULTITHREADING /* conditional compilation */
 

+ 5 - 2
src/server/ua_services_nodemanagement.c

@@ -762,10 +762,13 @@ UA_Server_addDataSourceVariableNode(UA_Server *server, const UA_NodeId requested
                                  false, NULL, &value);
     else
         retval = UA_STATUSCODE_BADTYPEMISMATCH;
-    if(retval != UA_STATUSCODE_GOOD)
-        return retval;
     editAttr.value = value.value;
 
+    if(retval != UA_STATUSCODE_GOOD) {
+        UA_NodeStore_deleteNode((UA_Node*)node);
+        return retval;
+    }
+
     /* Copy attributes into node */
     UA_AddNodesItem item;
     UA_AddNodesItem_init(&item);

+ 55 - 32
src/server/ua_subscription.c

@@ -66,14 +66,20 @@ void UA_MoniteredItem_SampleCallback(UA_Server *server, UA_MonitoredItem *monito
     UA_DataValue_init(&newvalue->value);
     newvalue->clientHandle = monitoredItem->clientHandle;
 
+    /* Adjust timestampstoreturn to get source timestamp for triggering */
+    UA_TimestampsToReturn ts = monitoredItem->timestampsToReturn;
+    if(ts == UA_TIMESTAMPSTORETURN_SERVER)
+        ts = UA_TIMESTAMPSTORETURN_BOTH;
+    else if(ts == UA_TIMESTAMPSTORETURN_NEITHER)
+        ts = UA_TIMESTAMPSTORETURN_SOURCE;
+
     /* Read the value */
     UA_ReadValueId rvid;
     UA_ReadValueId_init(&rvid);
     rvid.nodeId = monitoredItem->monitoredNodeId;
     rvid.attributeId = monitoredItem->attributeID;
     rvid.indexRange = monitoredItem->indexRange;
-    Service_Read_single(server, sub->session, monitoredItem->timestampsToReturn,
-                        &rvid, &newvalue->value);
+    Service_Read_single(server, sub->session, ts, &rvid, &newvalue->value);
 
     /* Apply Filter */
     UA_Boolean hasValue = newvalue->value.hasValue;
@@ -90,7 +96,7 @@ void UA_MoniteredItem_SampleCallback(UA_Server *server, UA_MonitoredItem *monito
         newvalue->value.hasSourcePicoseconds = false;
     }
 
-    /* Encode to see if the data has changed */
+    /* Encode the data for comparison */
     size_t binsize = UA_calcSizeBinary(&newvalue->value, &UA_TYPES[UA_TYPES_DATAVALUE]);
     UA_ByteString newValueAsByteString;
     UA_StatusCode retval = UA_ByteString_allocBuffer(&newValueAsByteString, binsize);
@@ -103,31 +109,34 @@ void UA_MoniteredItem_SampleCallback(UA_Server *server, UA_MonitoredItem *monito
     retval = UA_encodeBinary(&newvalue->value, &UA_TYPES[UA_TYPES_DATAVALUE],
                              NULL, NULL, &newValueAsByteString, &encodingOffset);
 
-    /* Restore the booleans changed for the filter */
+    /* Restore the settings changed for the filter */
     newvalue->value.hasValue = hasValue;
     newvalue->value.hasServerTimestamp = hasServerTimestamp;
     newvalue->value.hasServerPicoseconds = hasServerPicoseconds;
-    newvalue->value.hasSourceTimestamp = hasSourceTimestamp;
-    newvalue->value.hasSourcePicoseconds = hasSourcePicoseconds;
+    if(monitoredItem->timestampsToReturn == UA_TIMESTAMPSTORETURN_SERVER ||
+       monitoredItem->timestampsToReturn == UA_TIMESTAMPSTORETURN_NEITHER) {
+        newvalue->value.hasSourceTimestamp = false;
+        newvalue->value.hasSourcePicoseconds = false;
+    } else {
+        newvalue->value.hasSourceTimestamp = hasSourceTimestamp;
+        newvalue->value.hasSourcePicoseconds = hasSourcePicoseconds;
+    }
 
     /* Error or the value has not changed */
     if(retval != UA_STATUSCODE_GOOD ||
        (monitoredItem->lastSampledValue.data &&
         UA_String_equal(&newValueAsByteString, &monitoredItem->lastSampledValue))) {
-        UA_ByteString_deleteMembers(&newValueAsByteString);
-        UA_DataValue_deleteMembers(&newvalue->value);
-        UA_free(newvalue);
         UA_LOG_TRACE_SESSION(server->config.logger, sub->session, "Subscription %u | "
                              "MonitoredItem %u | Do not sample an unchanged value",
                              sub->subscriptionID, monitoredItem->itemId);
-        return;
+        goto cleanup;
     }
 
     UA_LOG_DEBUG_SESSION(server->config.logger, sub->session,
                          "Subscription %u | MonitoredItem %u | "
                          "Sampling the value", sub->subscriptionID, monitoredItem->itemId);
 
-    /* Do we have space in the queue? */
+    /* Is enough space in the queue? */
     if(monitoredItem->currentQueueSize >= monitoredItem->maxQueueSize) {
         MonitoredItem_queuedValue *queueItem;
         if(monitoredItem->discardOldest)
@@ -140,10 +149,7 @@ void UA_MoniteredItem_SampleCallback(UA_Server *server, UA_MonitoredItem *monito
                                    "MonitoredItem %u | Cannot remove an element from the full "
                                    "queue. Internal error!", sub->subscriptionID,
                                    monitoredItem->itemId);
-            UA_ByteString_deleteMembers(&newValueAsByteString);
-            UA_DataValue_deleteMembers(&newvalue->value);
-            UA_free(newvalue);
-            return;
+            goto cleanup;
         }
 
         TAILQ_REMOVE(&monitoredItem->queue, queueItem, listEntry);
@@ -167,11 +173,20 @@ void UA_MoniteredItem_SampleCallback(UA_Server *server, UA_MonitoredItem *monito
     /* Add the sample to the queue for publication */
     TAILQ_INSERT_TAIL(&monitoredItem->queue, newvalue, listEntry);
     monitoredItem->currentQueueSize++;
+    return;
+
+ cleanup:
+    UA_ByteString_deleteMembers(&newValueAsByteString);
+    UA_DataValue_deleteMembers(&newvalue->value);
+    UA_free(newvalue);
 }
 
-UA_StatusCode MonitoredItem_registerSampleJob(UA_Server *server, UA_MonitoredItem *mon) {
-    UA_Job job = {.type = UA_JOBTYPE_METHODCALL, .job.methodCall = {
-            .method = (UA_ServerCallback)UA_MoniteredItem_SampleCallback, .data = mon} };
+UA_StatusCode
+MonitoredItem_registerSampleJob(UA_Server *server, UA_MonitoredItem *mon) {
+    UA_Job job;
+    job.type = UA_JOBTYPE_METHODCALL;
+    job.job.methodCall.method = (UA_ServerCallback)UA_MoniteredItem_SampleCallback;
+    job.job.methodCall.data = mon;
     UA_StatusCode retval = UA_Server_addRepeatedJob(server, job,
                                                     (UA_UInt32)mon->samplingInterval,
                                                     &mon->sampleJobGuid);
@@ -333,7 +348,8 @@ void UA_Subscription_publishCallback(UA_Server *server, UA_Subscription *sub) {
         message->notificationDataSize = 1;
         UA_ExtensionObject *data = message->notificationData;
         UA_DataChangeNotification *dcn = UA_DataChangeNotification_new();
-        dcn->monitoredItems = UA_Array_new(notifications, &UA_TYPES[UA_TYPES_MONITOREDITEMNOTIFICATION]);
+        dcn->monitoredItems =
+            UA_Array_new(notifications, &UA_TYPES[UA_TYPES_MONITOREDITEMNOTIFICATION]);
         dcn->monitoredItemsSize = notifications;
         size_t l = 0;
         UA_MonitoredItem *mon;
@@ -351,10 +367,12 @@ void UA_Subscription_publishCallback(UA_Server *server, UA_Subscription *sub) {
                 mon->currentQueueSize--;
                 mon_l++;
             }
-            UA_LOG_DEBUG_SESSION(server->config.logger, sub->session, "Subscription %u | MonitoredItem %u | " \
-                                 "Adding %u notifications to the publish response. " \
+            UA_LOG_DEBUG_SESSION(server->config.logger, sub->session,
+                                 "Subscription %u | MonitoredItem %u | "
+                                 "Adding %u notifications to the publish response. "
                                  "%u notifications remain in the queue",
-                                 sub->subscriptionID, mon->itemId, mon_l, mon->currentQueueSize);
+                                 sub->subscriptionID, mon->itemId, mon_l,
+                                 mon->currentQueueSize);
             l += mon_l;
         }
         data->encoding = UA_EXTENSIONOBJECT_DECODED;
@@ -362,13 +380,16 @@ void UA_Subscription_publishCallback(UA_Server *server, UA_Subscription *sub) {
         data->content.decoded.type = &UA_TYPES[UA_TYPES_DATACHANGENOTIFICATION];
 
         /* Put the notification message into the retransmission queue */
-        UA_NotificationMessageEntry *retransmission = malloc(sizeof(UA_NotificationMessageEntry));
+        UA_NotificationMessageEntry *retransmission =
+            malloc(sizeof(UA_NotificationMessageEntry));
         if(retransmission) {
-            UA_NotificationMessage_copy(&response->notificationMessage, &retransmission->message);
+            UA_NotificationMessage_copy(&response->notificationMessage,
+                                        &retransmission->message);
             LIST_INSERT_HEAD(&sub->retransmissionQueue, retransmission, listEntry);
         } else {
-            UA_LOG_WARNING_SESSION(server->config.logger, sub->session, "Subscription %u | "
-                                   "Could not allocate memory for retransmission", sub->subscriptionID);
+            UA_LOG_WARNING_SESSION(server->config.logger, sub->session,
+                                   "Subscription %u | Could not allocate memory "
+                                   "for retransmission", sub->subscriptionID);
         }
     }
 
@@ -389,8 +410,8 @@ void UA_Subscription_publishCallback(UA_Server *server, UA_Subscription *sub) {
 
     /* Send the response */
     UA_LOG_DEBUG_SESSION(server->config.logger, sub->session,
-                         "Subscription %u | Sending out a publish response with %u notifications",
-                         sub->subscriptionID, (UA_UInt32)notifications);
+                         "Subscription %u | Sending out a publish response with %u "
+                         "notifications", sub->subscriptionID, (UA_UInt32)notifications);
     UA_SecureChannel_sendBinaryMessage(sub->session->channel, requestId, response,
                                        &UA_TYPES[UA_TYPES_PUBLISHRESPONSE]);
 
@@ -411,10 +432,12 @@ UA_StatusCode Subscription_registerPublishJob(UA_Server *server, UA_Subscription
     if(!sub->publishingEnabled)
         return UA_STATUSCODE_GOOD;
 
-    UA_Job job = (UA_Job) {.type = UA_JOBTYPE_METHODCALL,
-                           .job.methodCall = {.method = (UA_ServerCallback)UA_Subscription_publishCallback,
-                                              .data = sub} };
-    UA_StatusCode retval = UA_Server_addRepeatedJob(server, job, (UA_UInt32)sub->publishingInterval,
+    UA_Job job;
+    job.type = UA_JOBTYPE_METHODCALL;
+    job.job.methodCall.method = (UA_ServerCallback)UA_Subscription_publishCallback;
+    job.job.methodCall.data = sub;
+    UA_StatusCode retval = UA_Server_addRepeatedJob(server, job,
+                                                    (UA_UInt32)sub->publishingInterval,
                                                     &sub->publishJobGuid);
     if(retval == UA_STATUSCODE_GOOD)
         sub->publishJobIsRegistered = true;

+ 1 - 37
tests/check_services_attributes.c

@@ -16,13 +16,6 @@
 #include <urcu.h>
 #endif
 
-static UA_StatusCode
-readCPUTemperature_broken(void *handle, const UA_NodeId nodeid, UA_Boolean sourceTimeStamp,
-                          const UA_NumericRange *range, UA_DataValue *dataValue) {
-  dataValue->hasValue = true;
-  return UA_STATUSCODE_GOOD;
-}
-
 static UA_StatusCode
 readCPUTemperature(void *handle, const UA_NodeId nodeid, UA_Boolean sourceTimeStamp,
                    const UA_NumericRange *range, UA_DataValue *dataValue) {
@@ -67,22 +60,11 @@ makeTestSequence(void) {
                                                  UA_NODEID_NULL, vattr, temperatureDataSource, NULL);
     ck_assert_int_eq(retval, UA_STATUSCODE_GOOD);
 
-    /* DataSource Variable returning no value */
-    UA_DataSource temperatureDataSource1 =
-        (UA_DataSource) {.handle = NULL, .read = readCPUTemperature_broken, .write = NULL};
-    vattr.description = UA_LOCALIZEDTEXT("en_US","temperature1");
-    vattr.displayName = UA_LOCALIZEDTEXT("en_US","temperature1");
-    retval = UA_Server_addDataSourceVariableNode(server, UA_NODEID_STRING(1, "cpu.temperature1"),
-                                                 UA_NODEID_NUMERIC(0, UA_NS0ID_OBJECTSFOLDER),
-                                                 UA_NODEID_NUMERIC(0, UA_NS0ID_ORGANIZES),
-                                                 UA_QUALIFIEDNAME(1, "cpu temperature bogus"),
-                                                 UA_NODEID_NULL, vattr, temperatureDataSource1, NULL);
-    ck_assert_int_eq(retval, UA_STATUSCODE_GOOD);
-
     /* VariableNode with array */
     UA_VariableAttributes_init(&vattr);
     UA_Int32 myIntegerArray[9] = {1,2,3,4,5,6,7,8,9};
     UA_Variant_setArray(&vattr.value, &myIntegerArray, 9, &UA_TYPES[UA_TYPES_INT32]);
+    vattr.valueRank = -2;
     UA_UInt32 myIntegerDimensions[2] = {3,3};
     vattr.value.arrayDimensions = myIntegerDimensions;
     vattr.value.arrayDimensionsSize = 2;
@@ -629,23 +611,6 @@ START_TEST(ReadSingleAttributeUserExecutableWithoutTimestamp) {
 #endif
 } END_TEST
 
-START_TEST(ReadSingleDataSourceAttributeDataTypeWithoutTimestampFromBrokenSource) {
-    UA_Server *server = makeTestSequence();
-    UA_DataValue resp;
-    UA_DataValue_init(&resp);
-    UA_ReadRequest rReq;
-    UA_ReadRequest_init(&rReq);
-    rReq.nodesToRead = UA_ReadValueId_new();
-    rReq.nodesToReadSize = 1;
-    rReq.nodesToRead[0].nodeId = UA_NODEID_STRING_ALLOC(1, "cpu.temperature1");
-    rReq.nodesToRead[0].attributeId = UA_ATTRIBUTEID_DATATYPE;
-    Service_Read_single(server, &adminSession, UA_TIMESTAMPSTORETURN_NEITHER, &rReq.nodesToRead[0], &resp);
-    ck_assert_int_eq(UA_STATUSCODE_GOOD, resp.status);
-    UA_Server_delete(server);
-    UA_ReadRequest_deleteMembers(&rReq);
-    UA_DataValue_deleteMembers(&resp);
-} END_TEST
-
 START_TEST(ReadSingleDataSourceAttributeValueWithoutTimestamp) {
     UA_Server *server = makeTestSequence();
     UA_DataValue resp;
@@ -1069,7 +1034,6 @@ static Suite * testSuite_services_attributes(void) {
     tcase_add_test(tc_readSingleAttributes, ReadSingleAttributeHistorizingWithoutTimestamp);
     tcase_add_test(tc_readSingleAttributes, ReadSingleAttributeExecutableWithoutTimestamp);
     tcase_add_test(tc_readSingleAttributes, ReadSingleAttributeUserExecutableWithoutTimestamp);
-    tcase_add_test(tc_readSingleAttributes, ReadSingleDataSourceAttributeDataTypeWithoutTimestampFromBrokenSource);
     tcase_add_test(tc_readSingleAttributes, ReadSingleDataSourceAttributeValueWithoutTimestamp);
     tcase_add_test(tc_readSingleAttributes, ReadSingleDataSourceAttributeDataTypeWithoutTimestamp);
     tcase_add_test(tc_readSingleAttributes, ReadSingleDataSourceAttributeArrayDimensionsWithoutTimestamp);

+ 1 - 1
tests/check_types_range.c

@@ -23,7 +23,7 @@ START_TEST(parseRange) {
 
 START_TEST(parseRangeMinEqualMax) {
     UA_NumericRange range;
-    UA_String str = UA_STRING("1:2,1:1");
+    UA_String str = UA_STRING("1:2,1");
     UA_StatusCode retval = parse_numericrange(&str, &range);
     ck_assert_int_eq(retval, UA_STATUSCODE_GOOD);
     ck_assert_int_eq(range.dimensionsSize,2);