ソースを参照

feature: allow limiting the retransmission queue in the server (fix

Julius Pfrommer 8 年 前
コミット
243237e882
共有5 個のファイルを変更した57 個の追加25 個の削除を含む
  1. 2 1
      include/ua_server.h
  2. 1 0
      plugins/ua_config_standard.c
  3. 1 11
      src/server/ua_services_subscription.c
  4. 45 12
      src/server/ua_subscription.c
  5. 8 1
      src/server/ua_subscription.h

+ 2 - 1
include/ua_server.h

@@ -131,10 +131,11 @@ typedef struct {
     UA_UInt32Range lifeTimeCountLimits;
     UA_UInt32Range keepAliveCountLimits;
     UA_UInt32 maxNotificationsPerPublish;
+    UA_UInt32 maxRetransmissionQueueSize; /* 0 -> unlimited size */
 
     /* Limits for MonitoredItems */
     UA_DoubleRange samplingIntervalLimits;
-    UA_UInt32Range queueSizeLimits;
+    UA_UInt32Range queueSizeLimits; /* Negotiated with the client */
 } UA_ServerConfig;
 
 /* Add a new namespace to the server. Returns the index of the new namespace */

+ 1 - 0
plugins/ua_config_standard.c

@@ -81,6 +81,7 @@ const UA_EXPORT UA_ServerConfig UA_ServerConfig_standard = {
     .lifeTimeCountLimits = { .max = 15000, .min = 3 },
     .keepAliveCountLimits = { .max = 100, .min = 1 },
     .maxNotificationsPerPublish = 1000,
+    .maxRetransmissionQueueSize = 0, /* unlimited */
 
     /* Limits for MonitoredItems */
     .samplingIntervalLimits = { .min = 50.0, .max = 24.0 * 3600.0 * 1000.0 },

+ 1 - 11
src/server/ua_services_subscription.c

@@ -439,17 +439,7 @@ Service_Publish(UA_Server *server, UA_Session *session,
             continue;
         }
         /* Remove the acked transmission from the retransmission queue */
-        response->results[i] = UA_STATUSCODE_BADSEQUENCENUMBERUNKNOWN;
-        UA_NotificationMessageEntry *pre, *pre_tmp;
-        TAILQ_FOREACH_SAFE(pre, &sub->retransmissionQueue, listEntry, pre_tmp) {
-            if(pre->message.sequenceNumber == ack->sequenceNumber) {
-                TAILQ_REMOVE(&sub->retransmissionQueue, pre, listEntry);
-                response->results[i] = UA_STATUSCODE_GOOD;
-                UA_NotificationMessage_deleteMembers(&pre->message);
-                UA_free(pre);
-                break;
-            }
-        }
+        response->results[i] = UA_Subscription_removeRetransmissionMessage(sub, ack->sequenceNumber);
     }
 
     /* Queue the publish response */

+ 45 - 12
src/server/ua_subscription.c

@@ -260,8 +260,9 @@ UA_Subscription * UA_Subscription_new(UA_Session *session, UA_UInt32 subscriptio
     new->currentLifetimeCount = 0;
     new->lastMonitoredItemId = 0;
     new->state = UA_SUBSCRIPTIONSTATE_NORMAL; /* The first publish response is sent immediately */
-    TAILQ_INIT(&new->retransmissionQueue);
     LIST_INIT(&new->monitoredItems);
+    TAILQ_INIT(&new->retransmissionQueue);
+    new->retransmissionQueueSize = 0;
     return new;
 }
 
@@ -282,6 +283,7 @@ void UA_Subscription_deleteMembers(UA_Subscription *subscription, UA_Server *ser
         UA_NotificationMessage_deleteMembers(&nme->message);
         UA_free(nme);
     }
+    subscription->retransmissionQueueSize = 0;
 }
 
 UA_MonitoredItem *
@@ -327,6 +329,40 @@ countQueuedNotifications(UA_Subscription *sub, UA_Boolean *moreNotifications) {
     return notifications;
 }
 
+static void
+UA_Subscription_addRetransmissionMessage(UA_Server *server, UA_Subscription *sub,
+                                         UA_NotificationMessageEntry *entry) {
+    /* Release the oldest entry if there is not enough space */
+    if(server->config.maxRetransmissionQueueSize > 0 &&
+       sub->retransmissionQueueSize >= server->config.maxRetransmissionQueueSize) {
+        UA_NotificationMessageEntry *lastentry =
+            TAILQ_LAST(&sub->retransmissionQueue, UA_ListOfNotificationMessages);
+        TAILQ_REMOVE(&sub->retransmissionQueue, lastentry, listEntry);
+        --sub->retransmissionQueueSize;
+        UA_NotificationMessage_deleteMembers(&lastentry->message);
+        UA_free(lastentry);
+    }
+
+    /* Add entry */
+    TAILQ_INSERT_HEAD(&sub->retransmissionQueue, entry, listEntry);
+    ++sub->retransmissionQueueSize;
+}
+
+UA_StatusCode
+UA_Subscription_removeRetransmissionMessage(UA_Subscription *sub, UA_UInt32 sequenceNumber) {
+    UA_NotificationMessageEntry *entry, *entry_tmp;
+    TAILQ_FOREACH_SAFE(entry, &sub->retransmissionQueue, listEntry, entry_tmp) {
+        if(entry->message.sequenceNumber != sequenceNumber)
+            continue;
+        TAILQ_REMOVE(&sub->retransmissionQueue, entry, listEntry);
+        --sub->retransmissionQueueSize;
+        UA_NotificationMessage_deleteMembers(&entry->message);
+        UA_free(entry);
+        return UA_STATUSCODE_GOOD;
+    }
+    return UA_STATUSCODE_BADSEQUENCENUMBERUNKNOWN;
+}
+
 static UA_StatusCode
 prepareNotificationMessage(UA_Subscription *sub, UA_NotificationMessage *message,
                            size_t notifications) {
@@ -466,23 +502,20 @@ void UA_Subscription_publishCallback(UA_Server *server, UA_Subscription *sub) {
          * be done here, so that the message itself is included in the available
          * sequence numbers for acknowledgement. */
         retransmission->message = response->notificationMessage;
-        TAILQ_INSERT_HEAD(&sub->retransmissionQueue, retransmission, listEntry);
+        UA_Subscription_addRetransmissionMessage(server, sub, retransmission);
     }
 
     /* Get the available sequence numbers from the retransmission queue */
-    size_t available = 0;
-    UA_NotificationMessageEntry *nme;
-    TAILQ_FOREACH(nme, &sub->retransmissionQueue, listEntry)
-        ++available;
-    // cppcheck-suppress knownConditionTrueFalse
+    size_t available = sub->retransmissionQueueSize;
     if(available > 0) {
         response->availableSequenceNumbers = UA_alloca(available * sizeof(UA_UInt32));
         response->availableSequenceNumbersSize = available;
-    }
-    size_t i = 0;
-    TAILQ_FOREACH(nme, &sub->retransmissionQueue, listEntry) {
-        response->availableSequenceNumbers[i] = nme->message.sequenceNumber;
-        ++i;
+        size_t i = 0;
+        UA_NotificationMessageEntry *nme;
+        TAILQ_FOREACH(nme, &sub->retransmissionQueue, listEntry) {
+            response->availableSequenceNumbers[i] = nme->message.sequenceNumber;
+            ++i;
+        }
     }
 
     /* Send the response */

+ 8 - 1
src/server/ua_subscription.h

@@ -83,7 +83,7 @@ struct UA_Subscription {
     UA_Session *session;
     UA_UInt32 lifeTimeCount;
     UA_UInt32 maxKeepAliveCount;
-    UA_Double publishingInterval;     // [ms]
+    UA_Double publishingInterval; /* in ms */
     UA_UInt32 subscriptionID;
     UA_UInt32 notificationsPerPublish;
     UA_Boolean publishingEnabled;
@@ -100,8 +100,12 @@ struct UA_Subscription {
     UA_Guid publishJobGuid;
     UA_Boolean publishJobIsRegistered;
 
+    /* MonitoredItems */
     LIST_HEAD(UA_ListOfUAMonitoredItems, UA_MonitoredItem) monitoredItems;
+
+    /* Retransmission Queue */
     TAILQ_HEAD(UA_ListOfNotificationMessages, UA_NotificationMessageEntry) retransmissionQueue;
+    UA_UInt32 retransmissionQueueSize;
 };
 
 UA_Subscription *UA_Subscription_new(UA_Session *session, UA_UInt32 subscriptionID);
@@ -118,6 +122,9 @@ UA_Subscription_getMonitoredItem(UA_Subscription *sub, UA_UInt32 monitoredItemID
 
 void UA_Subscription_publishCallback(UA_Server *server, UA_Subscription *sub);
 
+UA_StatusCode
+UA_Subscription_removeRetransmissionMessage(UA_Subscription *sub, UA_UInt32 sequenceNumber);
+
 void
 UA_Subscription_answerPublishRequestsNoSubscription(UA_Server *server,
                                                     UA_NodeId *sessionToken);