Browse Source

fix LifetimeCount and call UA_Subscription_publishCallback if a publi… (#1556)

* fix LifetimeCount and call UA_Subscription_publishCallback if a publishResponse is present
StalderT 6 years ago
parent
commit
5c3630eab2

+ 29 - 28
src/server/ua_subscription.c

@@ -264,12 +264,31 @@ UA_Subscription_publishCallback(UA_Server *server, UA_Subscription *sub) {
     UA_LOG_DEBUG_SESSION(server->config.logger, sub->session,
                          "Subscription %u | Publish Callback",
                          sub->subscriptionId);
+    /* Dequeue a response */
+    UA_PublishResponseEntry *pre = UA_Session_getPublishReq(sub->session);
+    if(pre) {
+        sub->currentLifetimeCount = 0; /* Reset the lifetimecounter */
+    } else {
+        UA_LOG_DEBUG_SESSION(server->config.logger, sub->session,
+                             "Subscription %u | The publish queue is empty",
+                             sub->subscriptionId);
+        ++sub->currentLifetimeCount;
+
+        if(sub->currentLifetimeCount > sub->lifeTimeCount) {
+            UA_LOG_DEBUG_SESSION(server->config.logger, sub->session,
+                                 "Subscription %u | End of lifetime "
+                                 "for subscription", sub->subscriptionId);
+            UA_Session_deleteSubscription(server, sub->session, sub->subscriptionId);
+            /* TODO: send a StatusChangeNotification with Bad_Timeout */
+            return;
+        }
+    }
 
     /* Count the available notifications */
     UA_Boolean moreNotifications = false;
     size_t notifications = countQueuedNotifications(sub, &moreNotifications);
 
-    /* Return if nothing to do */
+    /* Return if no notifications and no keepalive */
     if(notifications == 0) {
         ++sub->currentKeepAliveCount;
         if(sub->currentKeepAliveCount < sub->maxKeepAliveCount)
@@ -279,35 +298,17 @@ UA_Subscription_publishCallback(UA_Server *server, UA_Subscription *sub) {
                              sub->subscriptionId);
     }
 
-    /* Check if the securechannel is valid */
+    /* Can we send a response? */
     UA_SecureChannel *channel = sub->session->header.channel;
-    if(!channel)
-        return;
-
-    /* Dequeue a response */
-    UA_PublishResponseEntry *pre = UA_Session_getPublishReq(sub->session);
-
-    /* Cannot publish without a response */
-    if(!pre) {
+    if(!channel || !pre) {
         UA_LOG_DEBUG_SESSION(server->config.logger, sub->session,
-                             "Subscription %u | Cannot send a publish "
-                             "response since the publish queue is empty",
-                             sub->subscriptionId);
-        if(sub->state != UA_SUBSCRIPTIONSTATE_LATE) {
-            sub->state = UA_SUBSCRIPTIONSTATE_LATE;
-        } else {
-            ++sub->currentLifetimeCount;
-            if(sub->currentLifetimeCount > sub->lifeTimeCount) {
-                UA_LOG_DEBUG_SESSION(server->config.logger, sub->session,
-                                     "Subscription %u | End of lifetime "
-                                     "for subscription", sub->subscriptionId);
-                UA_Session_deleteSubscription(server, sub->session,
-                                              sub->subscriptionId);
-            }
-        }
+                             "Subscription %u | Want to send a publish response;"
+                             "but not possible", sub->subscriptionId);
+        sub->state = UA_SUBSCRIPTIONSTATE_LATE;
         return;
     }
 
+    /* Prepare the response */
     UA_PublishResponse *response = &pre->response;
     UA_NotificationMessage *message = &response->notificationMessage;
     UA_NotificationMessageEntry *retransmission = NULL;
@@ -319,17 +320,18 @@ UA_Subscription_publishCallback(UA_Server *server, UA_Subscription *sub) {
             UA_LOG_WARNING_SESSION(server->config.logger, sub->session,
                                    "Subscription %u | Could not allocate memory "
                                    "for retransmission", sub->subscriptionId);
+            sub->state = UA_SUBSCRIPTIONSTATE_LATE;
             return;
         }
 
         /* Prepare the response */
-        UA_StatusCode retval =
-            prepareNotificationMessage(sub, message, notifications);
+        UA_StatusCode retval = prepareNotificationMessage(sub, message, notifications);
         if(retval != UA_STATUSCODE_GOOD) {
             UA_LOG_WARNING_SESSION(server->config.logger, sub->session,
                                    "Subscription %u | Could not prepare the "
                                    "notification message", sub->subscriptionId);
             UA_free(retransmission);
+            sub->state = UA_SUBSCRIPTIONSTATE_LATE;
             return;
         }
     }
@@ -386,7 +388,6 @@ UA_Subscription_publishCallback(UA_Server *server, UA_Subscription *sub) {
     /* Reset subscription state to normal. */
     sub->state = UA_SUBSCRIPTIONSTATE_NORMAL;
     sub->currentKeepAliveCount = 0;
-    sub->currentLifetimeCount = 0;
 
     /* Free the response */
     UA_Array_delete(response->results, response->resultsSize,

+ 68 - 0
tests/client/check_client_subscriptions.c

@@ -179,6 +179,73 @@ START_TEST(Client_subscription_addMonitoredItems) {
 }
 END_TEST
 
+START_TEST(Client_subscription_keepAlive) {
+    UA_Client *client = UA_Client_new(UA_ClientConfig_default);
+    UA_StatusCode retval = UA_Client_connect(client, "opc.tcp://localhost:4840");
+    ck_assert_uint_eq(retval, UA_STATUSCODE_GOOD);
+
+    UA_UInt32 subId;
+    retval = UA_Client_Subscriptions_new(client, UA_SubscriptionSettings_default, &subId);
+    ck_assert_uint_eq(retval, UA_STATUSCODE_GOOD);
+
+    UA_MonitoredItemCreateRequest items[1];
+    UA_MonitoredItemHandlingFunction hfs[1];
+    void *hfContexts[1];
+    UA_StatusCode itemResults[1];
+    UA_UInt32 newMonitoredItemIds[1];
+
+    /* monitor the server state */
+    UA_MonitoredItemCreateRequest_init(&items[0]);
+    items[0].itemToMonitor.nodeId = UA_NODEID_NUMERIC(0, 2259);
+    items[0].itemToMonitor.attributeId = UA_ATTRIBUTEID_VALUE;
+    items[0].monitoringMode = UA_MONITORINGMODE_REPORTING;
+    items[0].requestedParameters.samplingInterval = 250;
+    items[0].requestedParameters.discardOldest = true;
+    items[0].requestedParameters.queueSize = 1;
+    hfs[0] = (UA_MonitoredItemHandlingFunction)(uintptr_t)monitoredItemHandler;
+    hfContexts[0] = NULL;
+
+    retval = UA_Client_Subscriptions_addMonitoredItems(client, subId, items, 1,
+                                                      hfs, hfContexts, itemResults,
+                                                      newMonitoredItemIds);
+    ck_assert_uint_eq(retval, UA_STATUSCODE_GOOD);
+    ck_assert_uint_eq(itemResults[0], UA_STATUSCODE_GOOD);
+
+    UA_fakeSleep((UA_UInt32)UA_SubscriptionSettings_default.requestedPublishingInterval + 1);
+
+    UA_PublishRequest request;
+    UA_PublishRequest_init(&request);
+    request.subscriptionAcknowledgementsSize = 0;
+
+    UA_PublishResponse response = UA_Client_Service_publish(client, request);
+    ck_assert_uint_eq(response.responseHeader.serviceResult, UA_STATUSCODE_GOOD);
+    ck_assert_uint_eq(response.notificationMessage.notificationDataSize, 1);
+    UA_PublishResponse_deleteMembers(&response);
+    UA_PublishRequest_deleteMembers(&request);
+
+    UA_fakeSleep((UA_UInt32)UA_SubscriptionSettings_default.requestedPublishingInterval + 1);
+
+    /* by default maxKeepAlive is set to 1 we must receive a response without notification message */
+    UA_PublishRequest_init(&request);
+    request.subscriptionAcknowledgementsSize = 0;
+
+    response = UA_Client_Service_publish(client, request);
+    ck_assert_uint_eq(response.responseHeader.serviceResult, UA_STATUSCODE_GOOD);
+    ck_assert_uint_eq(response.notificationMessage.notificationDataSize, 0);
+    UA_PublishResponse_deleteMembers(&response);
+    UA_PublishRequest_deleteMembers(&request);
+
+    retval = UA_Client_Subscriptions_removeMonitoredItem(client, subId, newMonitoredItemIds[0]);
+    ck_assert_uint_eq(retval, UA_STATUSCODE_GOOD);
+
+    retval = UA_Client_Subscriptions_remove(client, subId);
+    ck_assert_uint_eq(retval, UA_STATUSCODE_GOOD);
+
+    UA_Client_disconnect(client);
+    UA_Client_delete(client);
+}
+END_TEST
+
 START_TEST(Client_subscription_connectionClose) {
     UA_Client *client = UA_Client_new(UA_ClientConfig_default);
     UA_StatusCode retval = UA_Client_connect(client, "opc.tcp://localhost:4840");
@@ -273,6 +340,7 @@ static Suite* testSuite_Client(void) {
     tcase_add_test(tc_client, Client_subscription);
     tcase_add_test(tc_client, Client_subscription_connectionClose);
     tcase_add_test(tc_client, Client_subscription_addMonitoredItems);
+    tcase_add_test(tc_client, Client_subscription_keepAlive);
 #endif /* UA_ENABLE_SUBSCRIPTIONS */
 
     TCase *tc_client2 = tcase_create("Client Subscription + Method Call of GetMonitoredItmes");

+ 119 - 0
tests/server/check_services_subscriptions.c

@@ -506,6 +506,124 @@ START_TEST(Server_deleteMonitoredItems) {
 }
 END_TEST
 
+START_TEST(Server_lifeTimeCount) {
+    /* Create a subscription */
+    UA_CreateSubscriptionRequest request;
+    UA_CreateSubscriptionResponse response;
+
+    UA_CreateSubscriptionRequest_init(&request);
+    request.publishingEnabled = true;
+    request.requestedLifetimeCount = 3;
+    request.requestedMaxKeepAliveCount = 1;
+    UA_CreateSubscriptionResponse_init(&response);
+    Service_CreateSubscription(server, &adminSession, &request, &response);
+    ck_assert_uint_eq(response.responseHeader.serviceResult, UA_STATUSCODE_GOOD);
+    ck_assert_uint_eq(response.revisedMaxKeepAliveCount, 1);
+    ck_assert_uint_eq(response.revisedLifetimeCount, 3);
+    UA_CreateSubscriptionResponse_deleteMembers(&response);
+
+    /* Create a second subscription */
+    UA_CreateSubscriptionRequest_init(&request);
+    request.publishingEnabled = true;
+    request.requestedLifetimeCount = 4;
+    request.requestedMaxKeepAliveCount = 2;
+    UA_CreateSubscriptionResponse_init(&response);
+    Service_CreateSubscription(server, &adminSession, &request, &response);
+    ck_assert_uint_eq(response.responseHeader.serviceResult, UA_STATUSCODE_GOOD);
+    ck_assert_uint_eq(response.revisedMaxKeepAliveCount, 2);
+    /* revisedLifetimeCount is revised to 3*MaxKeepAliveCount */
+    ck_assert_uint_eq(response.revisedLifetimeCount, 6);
+    UA_Double publishingInterval = response.revisedPublishingInterval;
+    ck_assert(publishingInterval > 0.0f);
+    UA_CreateSubscriptionResponse_deleteMembers(&response);
+
+    UA_Server_run_iterate(server, false);
+
+    UA_UInt32 count = 0;
+    UA_Subscription *sub;
+    LIST_FOREACH(sub, &adminSession.serverSubscriptions, listEntry){
+        ck_assert_uint_eq(sub->currentLifetimeCount, 0);
+        count++;
+    }
+    ck_assert_uint_eq(count, 2);
+
+    UA_fakeSleep((UA_UInt32)publishingInterval + 1);
+    UA_Server_run_iterate(server, false);
+
+    count = 0;
+    LIST_FOREACH(sub, &adminSession.serverSubscriptions, listEntry){
+        ck_assert_uint_eq(sub->currentLifetimeCount, 1);
+        count++;
+    }
+    ck_assert_uint_eq(count, 2);
+
+    /* Sleep until the publishing interval times out */
+    UA_fakeSleep((UA_UInt32)publishingInterval + 1);
+    UA_Server_run_iterate(server, false);
+
+    count = 0;
+    LIST_FOREACH(sub, &adminSession.serverSubscriptions, listEntry){
+        ck_assert_uint_eq(sub->currentLifetimeCount, 2);
+        count++;
+    }
+    ck_assert_uint_eq(count, 2);
+
+    /* Sleep until the publishing interval times out */
+    UA_fakeSleep((UA_UInt32)publishingInterval + 1);
+    UA_Server_run_iterate(server, false);
+
+    count = 0;
+    LIST_FOREACH(sub, &adminSession.serverSubscriptions, listEntry){
+        ck_assert_uint_eq(sub->currentLifetimeCount, 3);
+        count++;
+    }
+    ck_assert_uint_eq(count, 2);
+
+    /* Sleep until the publishing interval times out */
+    UA_fakeSleep((UA_UInt32)publishingInterval + 1);
+    UA_Server_run_iterate(server, false);
+
+    count = 0;
+    LIST_FOREACH(sub, &adminSession.serverSubscriptions, listEntry){
+        ck_assert_uint_eq(sub->currentLifetimeCount, 4);
+        count++;
+    }
+    ck_assert_uint_eq(count, 1);
+
+    /* Sleep until the publishing interval times out */
+    UA_fakeSleep((UA_UInt32)publishingInterval + 1);
+    UA_Server_run_iterate(server, false);
+
+    count = 0;
+    LIST_FOREACH(sub, &adminSession.serverSubscriptions, listEntry){
+        ck_assert_uint_eq(sub->currentLifetimeCount, 5);
+        count++;
+    }
+    ck_assert_uint_eq(count, 1);
+
+    /* Sleep until the publishing interval times out */
+    UA_fakeSleep((UA_UInt32)publishingInterval + 1);
+    UA_Server_run_iterate(server, false);
+
+    count = 0;
+    LIST_FOREACH(sub, &adminSession.serverSubscriptions, listEntry){
+        ck_assert_uint_eq(sub->currentLifetimeCount, 6);
+        count++;
+    }
+    ck_assert_uint_eq(count, 1);
+
+    /* Sleep until the publishing interval times out */
+    UA_fakeSleep((UA_UInt32)publishingInterval + 1);
+    UA_Server_run_iterate(server, false);
+
+    count = 0;
+    LIST_FOREACH(sub, &adminSession.serverSubscriptions, listEntry){
+        count++;
+    }
+    ck_assert_uint_eq(count, 0);
+}
+END_TEST
+
 #endif /* UA_ENABLE_SUBSCRIPTIONS */
 
 static Suite* testSuite_Client(void) {
@@ -525,6 +643,7 @@ static Suite* testSuite_Client(void) {
     tcase_add_test(tc_server, Server_deleteSubscription);
     tcase_add_test(tc_server, Server_republish_invalid);
     tcase_add_test(tc_server, Server_publishCallback);
+    tcase_add_test(tc_server, Server_lifeTimeCount);
 #endif /* UA_ENABLE_SUBSCRIPTIONS */
     suite_add_tcase(s, tc_server);