Browse Source

use sent subscription acks to find entries in the pending ack list, relates to #634

Julius Pfrommer 8 years ago
parent
commit
8016b42a35
2 changed files with 40 additions and 39 deletions
  1. 39 38
      src/client/ua_client_highlevel_subscriptions.c
  2. 1 1
      src/client/ua_client_internal.h

+ 39 - 38
src/client/ua_client_highlevel_subscriptions.c

@@ -199,7 +199,7 @@ UA_Client_Subscriptions_removeMonitoredItem(UA_Client *client, UA_UInt32 subscri
 }
 
 static void
-UA_Client_processPublishResponse(UA_Client *client, UA_PublishResponse *response) {
+UA_Client_processPublishResponse(UA_Client *client, UA_PublishRequest *request, UA_PublishResponse *response) {
     if(response->responseHeader.serviceResult != UA_STATUSCODE_GOOD)
         return;
 
@@ -216,53 +216,54 @@ UA_Client_processPublishResponse(UA_Client *client, UA_PublishResponse *response
                  "Processing a publish response on subscription %u with %u notifications",
                  sub->SubscriptionID, response->notificationMessage.notificationDataSize);
 
-    /* Check if the server has acknowledged any of our ACKS */
-    UA_Client_NotificationsAckNumber *ack=NULL, *tmpAck=NULL;
-    if(response->resultsSize>0){
-        // TODO: The acks should be attached to the subscription
-        size_t i = 0;
-        LIST_FOREACH_SAFE(ack, &client->pendingNotificationsAcks, listEntry, tmpAck) {
-            if(response->results[i] == UA_STATUSCODE_GOOD ||
-               response->results[i] == UA_STATUSCODE_BADSEQUENCENUMBERUNKNOWN) {
-                LIST_REMOVE(ack, listEntry);
-                UA_free(ack);
-            }
-            i++;
+    /* Check if the server has acknowledged any of the sent ACKs */
+    for(size_t i = 0; i < response->resultsSize && i < request->subscriptionAcknowledgementsSize; i++) {
+        /* remove also acks that are unknown to the server */
+        if(response->results[i] != UA_STATUSCODE_GOOD &&
+           response->results[i] != UA_STATUSCODE_BADSEQUENCENUMBERUNKNOWN)
+            continue;
+
+        /* Remove the ack from the list */
+        UA_SubscriptionAcknowledgement *orig_ack = &request->subscriptionAcknowledgements[i];
+        UA_Client_NotificationsAckNumber *ack;
+        LIST_FOREACH(ack, &client->pendingNotificationsAcks, listEntry) {
+            if(ack->subAck.subscriptionId != orig_ack->subscriptionId ||
+               ack->subAck.sequenceNumber != orig_ack->sequenceNumber)
+                continue;
+            LIST_REMOVE(ack, listEntry);
+            UA_free(ack);
         }
     }
     
     /* Process the notification messages */
     UA_NotificationMessage *msg = &response->notificationMessage;
+    for(size_t k = 0; k < msg->notificationDataSize; k++) {
+        if(msg->notificationData[k].encoding != UA_EXTENSIONOBJECT_DECODED)
+            continue;
 
-    if(response->notificationMessage.notificationDataSize>0){
-        for(size_t k = 0; k < msg->notificationDataSize; k++) {
-            if(msg->notificationData[k].encoding != UA_EXTENSIONOBJECT_DECODED)
-                continue;
-
-            /* Currently only dataChangeNotifications are supported */
-            if(msg->notificationData[k].content.decoded.type != &UA_TYPES[UA_TYPES_DATACHANGENOTIFICATION])
-                continue;
+        /* Currently only dataChangeNotifications are supported */
+        if(msg->notificationData[k].content.decoded.type != &UA_TYPES[UA_TYPES_DATACHANGENOTIFICATION])
+            continue;
 
-            UA_DataChangeNotification *dataChangeNotification = msg->notificationData[k].content.decoded.data;
-            for(size_t j = 0; j < dataChangeNotification->monitoredItemsSize; j++) {
-                UA_MonitoredItemNotification *mitemNot = &dataChangeNotification->monitoredItems[j];
-                UA_Client_MonitoredItem *mon;
-                LIST_FOREACH(mon, &sub->MonitoredItems, listEntry) {
-                    if(mon->ClientHandle == mitemNot->clientHandle) {
-                        mon->handler(mon->MonitoredItemId, &mitemNot->value, mon->handlerContext);
-                        break;
-                    }
+        UA_DataChangeNotification *dataChangeNotification = msg->notificationData[k].content.decoded.data;
+        for(size_t j = 0; j < dataChangeNotification->monitoredItemsSize; j++) {
+            UA_MonitoredItemNotification *mitemNot = &dataChangeNotification->monitoredItems[j];
+            UA_Client_MonitoredItem *mon;
+            LIST_FOREACH(mon, &sub->MonitoredItems, listEntry) {
+                if(mon->ClientHandle == mitemNot->clientHandle) {
+                    mon->handler(mon->MonitoredItemId, &mitemNot->value, mon->handlerContext);
+                    break;
                 }
-                if(!mon)
-                    UA_LOG_DEBUG(client->config.logger, UA_LOGCATEGORY_CLIENT,
-                                 "Could not process a notification with clienthandle %u on subscription %u",
-                                 mitemNot->clientHandle, sub->SubscriptionID);
             }
+            if(!mon)
+                UA_LOG_DEBUG(client->config.logger, UA_LOGCATEGORY_CLIENT,
+                             "Could not process a notification with clienthandle %u on subscription %u",
+                             mitemNot->clientHandle, sub->SubscriptionID);
         }
     }
 
     /* Add to the list of pending acks */
-    tmpAck = UA_malloc(sizeof(UA_Client_NotificationsAckNumber));
+    UA_Client_NotificationsAckNumber *tmpAck = UA_malloc(sizeof(UA_Client_NotificationsAckNumber));
     tmpAck->subAck.sequenceNumber = msg->sequenceNumber;
     tmpAck->subAck.subscriptionId = sub->SubscriptionID;
     LIST_INSERT_HEAD(&client->pendingNotificationsAcks, tmpAck, listEntry);
@@ -273,7 +274,7 @@ UA_StatusCode UA_Client_Subscriptions_manuallySendPublishRequest(UA_Client *clie
         return UA_STATUSCODE_BADSERVERNOTCONNECTED;
 
     UA_Boolean moreNotifications = true;
-    while(moreNotifications == true) {
+    while(moreNotifications) {
         UA_PublishRequest request;
         UA_PublishRequest_init(&request);
         request.subscriptionAcknowledgementsSize = 0;
@@ -288,7 +289,7 @@ UA_StatusCode UA_Client_Subscriptions_manuallySendPublishRequest(UA_Client *clie
                 return UA_STATUSCODE_GOOD;
         }
         
-        int index = 0 ;
+        int index = 0;
         LIST_FOREACH(ack, &client->pendingNotificationsAcks, listEntry) {
             request.subscriptionAcknowledgements[index].sequenceNumber = ack->subAck.sequenceNumber;
             request.subscriptionAcknowledgements[index].subscriptionId = ack->subAck.subscriptionId;
@@ -296,7 +297,7 @@ UA_StatusCode UA_Client_Subscriptions_manuallySendPublishRequest(UA_Client *clie
         }
         
         UA_PublishResponse response = UA_Client_Service_publish(client, request);
-        UA_Client_processPublishResponse(client, &response);
+        UA_Client_processPublishResponse(client, &request, &response);
         moreNotifications = response.moreNotifications;
         
         UA_PublishResponse_deleteMembers(&response);

+ 1 - 1
src/client/ua_client_internal.h

@@ -11,8 +11,8 @@
 #ifdef UA_ENABLE_SUBSCRIPTIONS
 
 typedef struct UA_Client_NotificationsAckNumber_s {
-    UA_SubscriptionAcknowledgement subAck;
     LIST_ENTRY(UA_Client_NotificationsAckNumber_s) listEntry;
+    UA_SubscriptionAcknowledgement subAck;
 } UA_Client_NotificationsAckNumber;
 
 typedef struct UA_Client_MonitoredItem_s {