Browse Source

Subscriptions: Track number of retransmission messages for session

Julius Pfrommer 6 years ago
parent
commit
2c97d5914a
3 changed files with 37 additions and 9 deletions
  1. 1 0
      src/server/ua_session.h
  2. 35 8
      src/server/ua_subscription.c
  3. 1 1
      src/server/ua_subscription.h

+ 1 - 0
src/server/ua_session.h

@@ -58,6 +58,7 @@ typedef struct {
     SIMPLEQ_HEAD(UA_ListOfQueuedPublishResponses, UA_PublishResponseEntry) responseQueue;
     UA_UInt32        numSubscriptions;
     UA_UInt32        numPublishReq;
+    size_t           totalRetransmissionQueueSize; /* Retransmissions of all subscriptions */
 #endif
 } UA_Session;
 

+ 35 - 8
src/server/ua_subscription.c

@@ -63,8 +63,10 @@ UA_Subscription_deleteMembers(UA_Server *server, UA_Subscription *sub) {
         TAILQ_REMOVE(&sub->retransmissionQueue, nme, listEntry);
         UA_NotificationMessage_deleteMembers(&nme->message);
         UA_free(nme);
+        --sub->session->totalRetransmissionQueueSize;
+        --sub->retransmissionQueueSize;
     }
-    sub->retransmissionQueueSize = 0;
+    UA_assert(sub->retransmissionQueueSize == 0);
 
     UA_LOG_INFO_SESSION(server->config.logger, sub->session,
                         "Subscription %u | Deleted the Subscription",
@@ -114,22 +116,46 @@ UA_Subscription_addMonitoredItem(UA_Subscription *sub, UA_MonitoredItem *newMon)
     LIST_INSERT_HEAD(&sub->monitoredItems, newMon, listEntry);
 }
 
+static void
+removeOldestRetransmissionMessage(UA_Session *session) {
+    UA_NotificationMessageEntry *oldestEntry = NULL;
+    UA_Subscription *oldestSub = NULL;
+
+    UA_Subscription *sub;
+    LIST_FOREACH(sub, &session->serverSubscriptions, listEntry) {
+        UA_NotificationMessageEntry *first =
+            TAILQ_LAST(&sub->retransmissionQueue, ListOfNotificationMessages);
+        if(!first)
+            continue;
+        if(!oldestEntry || oldestEntry->message.publishTime > first->message.publishTime) {
+            oldestEntry = first;
+            oldestSub = sub;
+        }
+    }
+    UA_assert(oldestEntry);
+    UA_assert(oldestSub);
+
+    TAILQ_REMOVE(&oldestSub->retransmissionQueue, oldestEntry, listEntry);
+    UA_NotificationMessage_deleteMembers(&oldestEntry->message);
+    UA_free(oldestEntry);
+    --session->totalRetransmissionQueueSize;
+    --oldestSub->retransmissionQueueSize;
+}
+
 static void
 UA_Subscription_addRetransmissionMessage(UA_Server *server, UA_Subscription *sub,
                                          UA_NotificationMessageEntry *entry) {
     /* Release the oldest entry if there is not enough space */
     if(server->config.maxRetransmissionQueueSize > 0 &&
-       sub->retransmissionQueueSize >= server->config.maxRetransmissionQueueSize) {
-        UA_NotificationMessageEntry *lastentry =
-            TAILQ_LAST(&sub->retransmissionQueue, ListOfNotificationMessages);
-        TAILQ_REMOVE(&sub->retransmissionQueue, lastentry, listEntry);
-        --sub->retransmissionQueueSize;
-        UA_NotificationMessage_deleteMembers(&lastentry->message);
-        UA_free(lastentry);
+       sub->session->totalRetransmissionQueueSize >= server->config.maxRetransmissionQueueSize) {
+        UA_LOG_WARNING_SESSION(server->config.logger, sub->session, "Subscription %u | "
+                               "Retransmission queue overflow", sub->subscriptionId);
+        removeOldestRetransmissionMessage(sub->session);
     }
 
     /* Add entry */
     TAILQ_INSERT_TAIL(&sub->retransmissionQueue, entry, listEntry);
+    ++sub->session->totalRetransmissionQueueSize;
     ++sub->retransmissionQueueSize;
 }
 
@@ -146,6 +172,7 @@ UA_Subscription_removeRetransmissionMessage(UA_Subscription *sub, UA_UInt32 sequ
 
     /* Remove the retransmission message */
     TAILQ_REMOVE(&sub->retransmissionQueue, entry, listEntry);
+    --sub->session->totalRetransmissionQueueSize;
     --sub->retransmissionQueueSize;
     UA_NotificationMessage_deleteMembers(&entry->message);
     UA_free(entry);

+ 1 - 1
src/server/ua_subscription.h

@@ -206,7 +206,7 @@ struct UA_Subscription {
 
     /* Retransmission Queue */
     ListOfNotificationMessages retransmissionQueue;
-    UA_UInt32 retransmissionQueueSize;
+    size_t retransmissionQueueSize;
 };
 
 UA_Subscription * UA_Subscription_new(UA_Session *session, UA_UInt32 subscriptionId);