浏览代码

add shrinkMonitoredItemQueue and a test (#1517)

* add shrinkMonitoredItemQueue and test

* correct tailq order of notifications
StalderT 6 年之前
父节点
当前提交
3c099f6d57

+ 3 - 0
src/server/ua_services_subscription.c

@@ -330,6 +330,9 @@ Operation_ModifyMonitoredItem(UA_Server *server, UA_Session *session, UA_Subscri
     setMonitoredItemSettings(server, mon, mon->monitoringMode, &request->requestedParameters);
     result->revisedSamplingInterval = mon->samplingInterval;
     result->revisedQueueSize = mon->maxQueueSize;
+
+    /* Remove some notifications if the queue is now too small */
+    MonitoredItem_ensureQueueSpace(mon);
 }
 
 void Service_ModifyMonitoredItems(UA_Server *server, UA_Session *session,

+ 4 - 0
src/server/ua_subscription.h

@@ -63,6 +63,10 @@ void UA_MoniteredItem_SampleCallback(UA_Server *server, UA_MonitoredItem *monito
 UA_StatusCode MonitoredItem_registerSampleCallback(UA_Server *server, UA_MonitoredItem *mon);
 UA_StatusCode MonitoredItem_unregisterSampleCallback(UA_Server *server, UA_MonitoredItem *mon);
 
+/* Remove entries until mon->maxQueueSize is reached. Sets infobits for lost
+ * data if required. */
+void MonitoredItem_ensureQueueSpace(UA_MonitoredItem *mon);
+
 /****************/
 /* Subscription */
 /****************/

+ 44 - 17
src/server/ua_subscription_datachange.c

@@ -47,30 +47,55 @@ MonitoredItem_delete(UA_Server *server, UA_MonitoredItem *monitoredItem) {
     UA_free(monitoredItem); // TODO: Use a delayed free
 }
 
-static void
-ensureSpaceInMonitoredItemQueue(UA_MonitoredItem *mon, MonitoredItem_queuedValue *newQueueItem) {
-    /* Enough space, nothing to do here */
-    if(mon->currentQueueSize < mon->maxQueueSize)
+void
+MonitoredItem_ensureQueueSpace(UA_MonitoredItem *mon) {
+    UA_Boolean valueDiscarded = false;
+    MonitoredItem_queuedValue *queueItem;
+#ifndef __clang_analyzer__
+    while(mon->currentQueueSize > mon->maxQueueSize) {
+        /* maxQueuesize is at least 1 */
+        UA_assert(mon->currentQueueSize >= 2);
+
+        /* Get the item to remove. New items are added to the end */
+        if(mon->discardOldest) {
+            /* Remove the oldest */
+            queueItem = TAILQ_FIRST(&mon->queue);
+        } else {
+            /* Keep the newest, remove the second-newest */
+            queueItem = TAILQ_LAST(&mon->queue, QueuedValueQueue);
+            queueItem = TAILQ_PREV(queueItem, QueuedValueQueue, listEntry);
+        }
+        UA_assert(queueItem);
+
+        /* Remove the item */
+        TAILQ_REMOVE(&mon->queue, queueItem, listEntry);
+        UA_DataValue_deleteMembers(&queueItem->value);
+        UA_free(queueItem);
+        --mon->currentQueueSize;
+        valueDiscarded = true;
+    }
+#endif
+
+    if(!valueDiscarded)
         return;
 
-    /* Get the item to remove */
-    MonitoredItem_queuedValue *queueItem;
+    /* Get the element that carries the infobits */
     if(mon->discardOldest)
         queueItem = TAILQ_FIRST(&mon->queue);
     else
         queueItem = TAILQ_LAST(&mon->queue, QueuedValueQueue);
     UA_assert(queueItem);
 
-    /* Remove the item */
-    TAILQ_REMOVE(&mon->queue, queueItem, listEntry);
-    UA_DataValue_deleteMembers(&queueItem->value);
-    UA_free(queueItem);
-    --mon->currentQueueSize;
-
-    if(mon->maxQueueSize > 1) {
-        newQueueItem->value.hasStatus = true;
-        newQueueItem->value.status = UA_STATUSCODE_INFOTYPE_DATAVALUE | UA_STATUSCODE_INFOBITS_OVERFLOW;
+    /* If the queue size is reduced to one, remove the infobits */
+    if(mon->maxQueueSize == 1) {
+        queueItem->value.status &= ~(UA_StatusCode)(UA_STATUSCODE_INFOTYPE_DATAVALUE |
+                                                    UA_STATUSCODE_INFOBITS_OVERFLOW);
+        return;
     }
+
+    /* Add the infobits either to the newest or the new last entry */
+    queueItem->value.hasStatus = true;
+    queueItem->value.status |= (UA_STATUSCODE_INFOTYPE_DATAVALUE | UA_STATUSCODE_INFOBITS_OVERFLOW);
 }
 
 /* Errors are returned as no change detected */
@@ -201,10 +226,12 @@ sampleCallbackWithValue(UA_Server *server, UA_Subscription *sub,
     monitoredItem->lastSampledValue = *valueEncoding;
 
     /* Add the sample to the queue for publication */
-    ensureSpaceInMonitoredItemQueue(monitoredItem, newQueueItem);
     TAILQ_INSERT_TAIL(&monitoredItem->queue, newQueueItem, listEntry);
     ++monitoredItem->currentQueueSize;
-    return true;;
+
+    /* Remove entries from the queue if required */
+    MonitoredItem_ensureQueueSpace(monitoredItem);
+    return true;
 }
 
 void

+ 206 - 2
tests/server/check_services_subscriptions.c

@@ -173,8 +173,10 @@ START_TEST(Server_publishCallback) {
     UA_Server_run_iterate(server, false);
     UA_realSleep(100);
 
-    LIST_FOREACH(sub, &adminSession.serverSubscriptions, listEntry)
-        ck_assert_uint_eq(sub->currentKeepAliveCount, sub->maxKeepAliveCount+1);
+    LIST_FOREACH(sub, &adminSession.serverSubscriptions, listEntry) {
+        if ((sub->subscriptionId == subscriptionId1) || (sub->subscriptionId == subscriptionId2))
+            ck_assert_uint_eq(sub->currentKeepAliveCount, sub->maxKeepAliveCount+1);
+    }
 
     /* Remove the subscriptions */
     UA_DeleteSubscriptionsRequest del_request;
@@ -225,6 +227,7 @@ START_TEST(Server_createMonitoredItems) {
     ck_assert_uint_eq(response.results[0].statusCode, UA_STATUSCODE_GOOD);
 
     monitoredItemId = response.results[0].monitoredItemId;
+    ck_assert_uint_gt(monitoredItemId, 0);
 
     UA_MonitoredItemCreateRequest_deleteMembers(&item);
     UA_CreateMonitoredItemsResponse_deleteMembers(&response);
@@ -263,6 +266,206 @@ START_TEST(Server_modifyMonitoredItems) {
 }
 END_TEST
 
+START_TEST(Server_overflow) {
+    /* Create a subscription */
+    UA_CreateSubscriptionRequest createSubscriptionRequest;
+    UA_CreateSubscriptionResponse createSubscriptionResponse;
+
+    UA_CreateSubscriptionRequest_init(&createSubscriptionRequest);
+    createSubscriptionRequest.publishingEnabled = true;
+    UA_CreateSubscriptionResponse_init(&createSubscriptionResponse);
+    Service_CreateSubscription(server, &adminSession, &createSubscriptionRequest, &createSubscriptionResponse);
+    ck_assert_uint_eq(createSubscriptionResponse.responseHeader.serviceResult, UA_STATUSCODE_GOOD);
+    UA_UInt32 localSubscriptionId = createSubscriptionResponse.subscriptionId;
+    UA_Double publishingInterval = createSubscriptionResponse.revisedPublishingInterval;
+    ck_assert(publishingInterval > 0.0f);
+    UA_CreateSubscriptionResponse_deleteMembers(&createSubscriptionResponse);
+
+    /* Create a monitoredItem */
+    UA_CreateMonitoredItemsRequest createMonitoredItemsRequest;
+    UA_CreateMonitoredItemsRequest_init(&createMonitoredItemsRequest);
+    createMonitoredItemsRequest.subscriptionId = localSubscriptionId;
+    createMonitoredItemsRequest.timestampsToReturn = UA_TIMESTAMPSTORETURN_SERVER;
+    UA_MonitoredItemCreateRequest item;
+    UA_MonitoredItemCreateRequest_init(&item);
+    UA_ReadValueId rvi;
+    UA_ReadValueId_init(&rvi);
+    rvi.nodeId = UA_NODEID_NUMERIC(0, UA_NS0ID_SERVER_SERVERSTATUS_CURRENTTIME);
+    rvi.attributeId = UA_ATTRIBUTEID_BROWSENAME;
+    rvi.indexRange = UA_STRING_NULL;
+    item.itemToMonitor = rvi;
+    item.monitoringMode = UA_MONITORINGMODE_REPORTING;
+    UA_MonitoringParameters params;
+    UA_MonitoringParameters_init(&params);
+    item.requestedParameters = params;
+    item.requestedParameters.queueSize = 3;
+    item.requestedParameters.discardOldest = true;
+    createMonitoredItemsRequest.itemsToCreateSize = 1;
+    createMonitoredItemsRequest.itemsToCreate = &item;
+
+    UA_CreateMonitoredItemsResponse createMonitoredItemsResponse;
+    UA_CreateMonitoredItemsResponse_init(&createMonitoredItemsResponse);
+
+    Service_CreateMonitoredItems(server, &adminSession, &createMonitoredItemsRequest, &createMonitoredItemsResponse);
+    ck_assert_uint_eq(createMonitoredItemsResponse.responseHeader.serviceResult, UA_STATUSCODE_GOOD);
+    ck_assert_uint_eq(createMonitoredItemsResponse.resultsSize, 1);
+    ck_assert_uint_eq(createMonitoredItemsResponse.results[0].statusCode, UA_STATUSCODE_GOOD);
+
+    UA_UInt32 localMonitoredItemId = createMonitoredItemsResponse.results[0].monitoredItemId;
+    ck_assert_uint_gt(localMonitoredItemId, 0);
+
+    UA_MonitoredItemCreateRequest_deleteMembers(&item);
+    UA_CreateMonitoredItemsResponse_deleteMembers(&createMonitoredItemsResponse);
+
+    UA_MonitoredItem *mon = NULL;
+    UA_Subscription *sub;
+    LIST_FOREACH(sub, &adminSession.serverSubscriptions, listEntry) {
+        if(sub->subscriptionId == localSubscriptionId)
+            mon = UA_Subscription_getMonitoredItem(sub, localMonitoredItemId);
+    }
+    ck_assert_ptr_ne(mon, NULL);
+    UA_assert(mon);
+    ck_assert_uint_eq(mon->currentQueueSize, 1); 
+    ck_assert_uint_eq(mon->maxQueueSize, 3); 
+    MonitoredItem_queuedValue *queueItem;
+    queueItem = TAILQ_LAST(&mon->queue, QueuedValueQueue);
+    ck_assert_uint_eq(queueItem->value.hasStatus, false);
+
+    UA_ByteString_deleteMembers(&mon->lastSampledValue);
+    UA_MoniteredItem_SampleCallback(server, mon);
+    ck_assert_uint_eq(mon->currentQueueSize, 2); 
+    ck_assert_uint_eq(mon->maxQueueSize, 3); 
+    queueItem = TAILQ_LAST(&mon->queue, QueuedValueQueue);
+    ck_assert_uint_eq(queueItem->value.hasStatus, false);
+
+    UA_ByteString_deleteMembers(&mon->lastSampledValue);
+    UA_MoniteredItem_SampleCallback(server, mon);
+    ck_assert_uint_eq(mon->currentQueueSize, 3); 
+    ck_assert_uint_eq(mon->maxQueueSize, 3); 
+    queueItem = TAILQ_LAST(&mon->queue, QueuedValueQueue);
+    ck_assert_uint_eq(queueItem->value.hasStatus, false);
+
+    UA_ByteString_deleteMembers(&mon->lastSampledValue);
+    UA_MoniteredItem_SampleCallback(server, mon);
+    ck_assert_uint_eq(mon->currentQueueSize, 3); 
+    ck_assert_uint_eq(mon->maxQueueSize, 3); 
+    queueItem = TAILQ_FIRST(&mon->queue);
+    ck_assert_uint_eq(queueItem->value.hasStatus, true);
+    ck_assert_uint_eq(queueItem->value.status, UA_STATUSCODE_INFOTYPE_DATAVALUE | UA_STATUSCODE_INFOBITS_OVERFLOW);
+
+    /* Remove status for next test */
+    queueItem->value.hasStatus = false;
+    queueItem->value.status = 0;
+
+    /* Modify the MonitoredItem */
+    UA_ModifyMonitoredItemsRequest modifyMonitoredItemsRequest;
+    UA_ModifyMonitoredItemsRequest_init(&modifyMonitoredItemsRequest);
+    modifyMonitoredItemsRequest.subscriptionId = localSubscriptionId;
+    modifyMonitoredItemsRequest.timestampsToReturn = UA_TIMESTAMPSTORETURN_SERVER;
+    UA_MonitoredItemModifyRequest itemToModify;
+    UA_MonitoredItemModifyRequest_init(&itemToModify);
+    itemToModify.monitoredItemId = localMonitoredItemId;
+    UA_MonitoringParameters_init(&params);
+    itemToModify.requestedParameters = params;
+    itemToModify.requestedParameters.queueSize = 2;
+    itemToModify.requestedParameters.discardOldest = true;
+    modifyMonitoredItemsRequest.itemsToModifySize = 1;
+    modifyMonitoredItemsRequest.itemsToModify = &itemToModify;
+
+    UA_ModifyMonitoredItemsResponse modifyMonitoredItemsResponse;
+    UA_ModifyMonitoredItemsResponse_init(&modifyMonitoredItemsResponse);
+
+    Service_ModifyMonitoredItems(server, &adminSession, &modifyMonitoredItemsRequest, &modifyMonitoredItemsResponse);
+    ck_assert_uint_eq(modifyMonitoredItemsResponse.responseHeader.serviceResult, UA_STATUSCODE_GOOD);
+    ck_assert_uint_eq(modifyMonitoredItemsResponse.resultsSize, 1);
+    ck_assert_uint_eq(modifyMonitoredItemsResponse.results[0].statusCode, UA_STATUSCODE_GOOD);
+
+    UA_MonitoredItemModifyRequest_deleteMembers(&itemToModify);
+    UA_ModifyMonitoredItemsResponse_deleteMembers(&modifyMonitoredItemsResponse);
+
+    ck_assert_uint_eq(mon->currentQueueSize, 2); 
+    ck_assert_uint_eq(mon->maxQueueSize, 2); 
+    queueItem = TAILQ_FIRST(&mon->queue);
+    ck_assert_uint_eq(queueItem->value.hasStatus, true);
+    ck_assert_uint_eq(queueItem->value.status, UA_STATUSCODE_INFOTYPE_DATAVALUE | UA_STATUSCODE_INFOBITS_OVERFLOW);
+
+    /* Modify the MonitoredItem */
+    UA_ModifyMonitoredItemsRequest_init(&modifyMonitoredItemsRequest);
+    modifyMonitoredItemsRequest.subscriptionId = localSubscriptionId;
+    modifyMonitoredItemsRequest.timestampsToReturn = UA_TIMESTAMPSTORETURN_SERVER;
+    UA_MonitoredItemModifyRequest_init(&itemToModify);
+    itemToModify.monitoredItemId = localMonitoredItemId;
+    UA_MonitoringParameters_init(&params);
+    itemToModify.requestedParameters = params;
+    itemToModify.requestedParameters.queueSize = 1;
+    modifyMonitoredItemsRequest.itemsToModifySize = 1;
+    modifyMonitoredItemsRequest.itemsToModify = &itemToModify;
+
+    UA_ModifyMonitoredItemsResponse_init(&modifyMonitoredItemsResponse);
+
+    Service_ModifyMonitoredItems(server, &adminSession, &modifyMonitoredItemsRequest, &modifyMonitoredItemsResponse);
+    ck_assert_uint_eq(modifyMonitoredItemsResponse.responseHeader.serviceResult, UA_STATUSCODE_GOOD);
+    ck_assert_uint_eq(modifyMonitoredItemsResponse.resultsSize, 1);
+    ck_assert_uint_eq(modifyMonitoredItemsResponse.results[0].statusCode, UA_STATUSCODE_GOOD);
+
+    UA_MonitoredItemModifyRequest_deleteMembers(&itemToModify);
+    UA_ModifyMonitoredItemsResponse_deleteMembers(&modifyMonitoredItemsResponse);
+
+    ck_assert_uint_eq(mon->currentQueueSize, 1); 
+    ck_assert_uint_eq(mon->maxQueueSize, 1); 
+    queueItem = TAILQ_LAST(&mon->queue, QueuedValueQueue);
+    ck_assert_uint_eq(queueItem->value.hasStatus, false);
+
+    /* Modify the MonitoredItem */
+    UA_ModifyMonitoredItemsRequest_init(&modifyMonitoredItemsRequest);
+    modifyMonitoredItemsRequest.subscriptionId = localSubscriptionId;
+    modifyMonitoredItemsRequest.timestampsToReturn = UA_TIMESTAMPSTORETURN_SERVER;
+    UA_MonitoredItemModifyRequest_init(&itemToModify);
+    itemToModify.monitoredItemId = localMonitoredItemId;
+    UA_MonitoringParameters_init(&params);
+    itemToModify.requestedParameters = params;
+    itemToModify.requestedParameters.discardOldest = false;
+    itemToModify.requestedParameters.queueSize = 1;
+    modifyMonitoredItemsRequest.itemsToModifySize = 1;
+    modifyMonitoredItemsRequest.itemsToModify = &itemToModify;
+
+    UA_ModifyMonitoredItemsResponse_init(&modifyMonitoredItemsResponse);
+
+    Service_ModifyMonitoredItems(server, &adminSession, &modifyMonitoredItemsRequest,
+                                 &modifyMonitoredItemsResponse);
+    ck_assert_uint_eq(modifyMonitoredItemsResponse.responseHeader.serviceResult, UA_STATUSCODE_GOOD);
+    ck_assert_uint_eq(modifyMonitoredItemsResponse.resultsSize, 1);
+    ck_assert_uint_eq(modifyMonitoredItemsResponse.results[0].statusCode, UA_STATUSCODE_GOOD);
+
+    UA_MonitoredItemModifyRequest_deleteMembers(&itemToModify);
+    UA_ModifyMonitoredItemsResponse_deleteMembers(&modifyMonitoredItemsResponse);
+
+    UA_MoniteredItem_SampleCallback(server, mon);
+    ck_assert_uint_eq(mon->currentQueueSize, 1); 
+    ck_assert_uint_eq(mon->maxQueueSize, 1); 
+    queueItem = TAILQ_FIRST(&mon->queue);
+    ck_assert_uint_eq(queueItem->value.hasStatus, false); /* the infobit is only set if the queue is larger than one */
+
+    /* Remove the subscriptions */
+    UA_DeleteSubscriptionsRequest deleteSubscriptionsRequest;
+    UA_DeleteSubscriptionsRequest_init(&deleteSubscriptionsRequest);
+    UA_UInt32 removeId = localSubscriptionId;
+    deleteSubscriptionsRequest.subscriptionIdsSize = 1;
+    deleteSubscriptionsRequest.subscriptionIds = &removeId;
+
+    UA_DeleteSubscriptionsResponse deleteSubscriptionsResponse;
+    UA_DeleteSubscriptionsResponse_init(&deleteSubscriptionsResponse);
+
+    Service_DeleteSubscriptions(server, &adminSession, &deleteSubscriptionsRequest, &deleteSubscriptionsResponse);
+    ck_assert_uint_eq(deleteSubscriptionsResponse.responseHeader.serviceResult, UA_STATUSCODE_GOOD);
+    ck_assert_uint_eq(deleteSubscriptionsResponse.resultsSize, 1);
+    ck_assert_uint_eq(deleteSubscriptionsResponse.results[0], UA_STATUSCODE_GOOD);
+
+    UA_DeleteSubscriptionsResponse_deleteMembers(&deleteSubscriptionsResponse);
+
+}
+END_TEST
+
 START_TEST(Server_setMonitoringMode) {
     UA_SetMonitoringModeRequest request;
     UA_SetMonitoringModeRequest_init(&request);
@@ -315,6 +518,7 @@ static Suite* testSuite_Client(void) {
     tcase_add_test(tc_server, Server_setPublishingMode);
     tcase_add_test(tc_server, Server_createMonitoredItems);
     tcase_add_test(tc_server, Server_modifyMonitoredItems);
+    tcase_add_test(tc_server, Server_overflow);
     tcase_add_test(tc_server, Server_setMonitoringMode);
     tcase_add_test(tc_server, Server_deleteMonitoredItems);
     tcase_add_test(tc_server, Server_republish);