Browse Source

simplify publish request queueing

Julius Pfrommer 7 years ago
parent
commit
5975e602b6

+ 65 - 53
src/server/ua_services_subscription.c

@@ -70,7 +70,7 @@ Service_CreateSubscription(UA_Server *server, UA_Session *session,
                            UA_CreateSubscriptionResponse *response) {
     /* Check limits for the number of subscriptions */
     if((server->config.maxSubscriptionsPerSession != 0) &&
-       (UA_Session_getNumSubscriptions(session) >= server->config.maxSubscriptionsPerSession)) {
+       (session->numPublishReq >= server->config.maxSubscriptionsPerSession)) {
         response->responseHeader.serviceResult = UA_STATUSCODE_BADTOOMANYSUBSCRIPTIONS;
         return;
     }
@@ -83,8 +83,8 @@ Service_CreateSubscription(UA_Server *server, UA_Session *session,
         response->responseHeader.serviceResult = UA_STATUSCODE_BADOUTOFMEMORY;
         return;
     }
-    newSubscription->subscriptionId = UA_Session_getUniqueSubscriptionId(session);
-    UA_Session_addSubscription(session, newSubscription);
+
+    UA_Session_addSubscription(session, newSubscription); /* Also assigns the subscription id */
 
     /* Set the subscription parameters */
     newSubscription->publishingEnabled = request->publishingEnabled;
@@ -389,8 +389,7 @@ static void
 Operation_SetMonitoringMode(UA_Server *server, UA_Session *session,
                             struct setMonitoringContext *smc,
                             UA_UInt32 *monitoredItemId, UA_StatusCode *result) {
-    UA_MonitoredItem *mon =
-        UA_Subscription_getMonitoredItem(smc->sub, *monitoredItemId);
+    UA_MonitoredItem *mon = UA_Subscription_getMonitoredItem(smc->sub, *monitoredItemId);
     if(!mon) {
         *result = UA_STATUSCODE_BADMONITOREDITEMIDINVALID;
         return;
@@ -401,7 +400,7 @@ Operation_SetMonitoringMode(UA_Server *server, UA_Session *session,
         return;
     }
 
-    /* check monitoringMode is valid or not */
+    /* Check if the MonitoringMode is valid or not */
     if(smc->monitoringMode > UA_MONITORINGMODE_REPORTING) {
         *result = UA_STATUSCODE_BADMONITORINGMODEINVALID;
         return;
@@ -458,9 +457,10 @@ Service_SetMonitoringMode(UA_Server *server, UA_Session *session,
 
     smc.monitoringMode = request->monitoringMode;
     response->responseHeader.serviceResult =
-        UA_Server_processServiceOperations(server, session, (UA_ServiceOperation)Operation_SetMonitoringMode, &smc,
-                                           &request->monitoredItemIdsSize, &UA_TYPES[UA_TYPES_UINT32],
-                                           &response->resultsSize, &UA_TYPES[UA_TYPES_STATUSCODE]);
+        UA_Server_processServiceOperations(server, session,
+                  (UA_ServiceOperation)Operation_SetMonitoringMode, &smc,
+                  &request->monitoredItemIdsSize, &UA_TYPES[UA_TYPES_UINT32],
+                  &response->resultsSize, &UA_TYPES[UA_TYPES_STATUSCODE]);
 }
 
 /* TODO: Unify with senderror in ua_server_binary.c */
@@ -479,8 +479,7 @@ subscriptionSendError(UA_SecureChannel *channel, UA_UInt32 requestHandle,
 void
 Service_Publish(UA_Server *server, UA_Session *session,
                 const UA_PublishRequest *request, UA_UInt32 requestId) {
-    UA_LOG_DEBUG_SESSION(server->config.logger, session,
-                         "Processing PublishRequest");
+    UA_LOG_DEBUG_SESSION(server->config.logger, session, "Processing PublishRequest");
 
     /* Return an error if the session has no subscription */
     if(LIST_EMPTY(&session->serverSubscriptions)) {
@@ -493,7 +492,7 @@ Service_Publish(UA_Server *server, UA_Session *session,
      * resources for the new publish request. If the limit has been reached the
      * oldest publish request shall be responded */
     if((server->config.maxPublishReqPerSession != 0) &&
-       (UA_Session_getNumPublishReq(session) >= server->config.maxPublishReqPerSession)) {
+       (session->numPublishReq >= server->config.maxPublishReqPerSession)) {
         if(!UA_Subscription_reachedPublishReqLimit(server, session)) {
             subscriptionSendError(session->header.channel, requestId,
                                   request->requestHeader.requestHandle,
@@ -502,6 +501,7 @@ Service_Publish(UA_Server *server, UA_Session *session,
         }
     }
 
+    /* Allocate the response to store it in the retransmission queue */
     UA_PublishResponseEntry *entry = (UA_PublishResponseEntry *)
         UA_malloc(sizeof(UA_PublishResponseEntry));
     if(!entry) {
@@ -510,12 +510,14 @@ Service_Publish(UA_Server *server, UA_Session *session,
                               UA_STATUSCODE_BADOUTOFMEMORY);
         return;
     }
-    entry->requestId = requestId;
 
-    /* Build the response */
+    /* Prepare the response */
+    entry->requestId = requestId;
     UA_PublishResponse *response = &entry->response;
     UA_PublishResponse_init(response);
     response->responseHeader.requestHandle = request->requestHeader.requestHandle;
+
+    /* Allocate the results array to acknowledge the acknowledge */
     if(request->subscriptionAcknowledgementsSize > 0) {
         response->results = (UA_StatusCode *)
             UA_Array_new(request->subscriptionAcknowledgementsSize,
@@ -542,48 +544,59 @@ Service_Publish(UA_Server *server, UA_Session *session,
             continue;
         }
         /* Remove the acked transmission from the retransmission queue */
-        response->results[i] =
-            UA_Subscription_removeRetransmissionMessage(sub, ack->sequenceNumber);
+        response->results[i] = UA_Subscription_removeRetransmissionMessage(sub, ack->sequenceNumber);
     }
 
-    /* Queue the publish response */
-    UA_Session_addPublishReq(session, entry);
+    /* Queue the publish response. It will be dequeued in a repeated publish
+     * callback. This can also be triggered right now for a late
+     * subscription. */
+    UA_Session_queuePublishReq(session, entry, false);
     UA_LOG_DEBUG_SESSION(server->config.logger, session, "Queued a publication message");
 
-    /* Answer immediately to a late subscription */
-    UA_Subscription *immediate;
-    UA_Boolean found = true;
-    int loopCount = 1;
+    /* If there are late subscriptions, the new publish request is used to
+     * answer them immediately. However, a single subscription that generates
+     * many notifications must not "starve" other late subscriptions. Therefore
+     * we keep track of the last subscription that got preferential treatment.
+     * We start searching for late subscriptions **after** the last one. */
 
+    UA_Subscription *immediate = NULL;
     if(session->lastSeenSubscriptionId > 0) {
-        /* If we found anything one the first loop or if there are LATE 
-         * in the list before lastSeenSubscriptionId and not LATE after 
-         * lastSeenSubscriptionId we need a second loop. */
-        loopCount = 2;
-        /* We must find the last seen subscription id  */
-        found = false;
-    }
-
-    for(int i = 0; i < loopCount; i++) {
         LIST_FOREACH(immediate, &session->serverSubscriptions, listEntry) {
-            if(!found) {
-                if(session->lastSeenSubscriptionId == immediate->subscriptionId) {
-                    found = true;
-                }
-            } else {
-                if(immediate->state == UA_SUBSCRIPTIONSTATE_LATE) {
-                    session->lastSeenSubscriptionId = immediate->subscriptionId;
-                    UA_LOG_DEBUG_SESSION(server->config.logger, session,
-                                         "Subscription %u | Response on a late subscription",
-                                         immediate->subscriptionId);
-                    UA_Subscription_publishCallback(server, immediate);
-                    return;
-                }
+            if(immediate->subscriptionId == session->lastSeenSubscriptionId) {
+                immediate = LIST_NEXT(immediate, listEntry);
+                break;
             }
         }
-        /* after the first loop, we can publish the first subscription with UA_SUBSCRIPTIONSTATE_LATE */
+    }
+
+    /* If no entry was found, start at the beginning and don't restart  */
+    UA_Boolean found = false;
+    if(!immediate)
+        immediate = LIST_FIRST(&session->serverSubscriptions);
+    else
         found = true;
+
+ repeat:
+    while(immediate) {
+        if(immediate->state == UA_SUBSCRIPTIONSTATE_LATE) {
+            session->lastSeenSubscriptionId = immediate->subscriptionId;
+            UA_LOG_DEBUG_SESSION(server->config.logger, session,
+                                 "Subscription %u | Response on a late subscription",
+                                 immediate->subscriptionId);
+            UA_Subscription_publish(server, immediate);
+            return;
+        }
+        immediate = LIST_NEXT(immediate, listEntry);
+    }
+
+    /* Restart at the beginning of the list */
+    if(found) {
+        immediate = LIST_FIRST(&session->serverSubscriptions);
+        found = false;
+        goto repeat;
     }
+
+    /* No late subscription this time */
     session->lastSeenSubscriptionId = 0;
 }
 
@@ -610,9 +623,9 @@ Service_DeleteSubscriptions(UA_Server *server, UA_Session *session,
 
     response->responseHeader.serviceResult =
         UA_Server_processServiceOperations(server, session,
-                                           (UA_ServiceOperation)Operation_DeleteSubscription, NULL,
-                                           &request->subscriptionIdsSize, &UA_TYPES[UA_TYPES_UINT32],
-                                           &response->resultsSize, &UA_TYPES[UA_TYPES_STATUSCODE]);
+                  (UA_ServiceOperation)Operation_DeleteSubscription, NULL,
+                  &request->subscriptionIdsSize, &UA_TYPES[UA_TYPES_UINT32],
+                  &response->resultsSize, &UA_TYPES[UA_TYPES_STATUSCODE]);
 
     /* The session has at least one subscription */
     if(LIST_FIRST(&session->serverSubscriptions))
@@ -652,14 +665,13 @@ Service_DeleteMonitoredItems(UA_Server *server, UA_Session *session,
 
     response->responseHeader.serviceResult =
         UA_Server_processServiceOperations(server, session,
-                                           (UA_ServiceOperation)Operation_DeleteMonitoredItem, sub,
-                                           &request->monitoredItemIdsSize, &UA_TYPES[UA_TYPES_UINT32],
-                                           &response->resultsSize, &UA_TYPES[UA_TYPES_STATUSCODE]);
+                  (UA_ServiceOperation)Operation_DeleteMonitoredItem, sub,
+                  &request->monitoredItemIdsSize, &UA_TYPES[UA_TYPES_UINT32],
+                  &response->resultsSize, &UA_TYPES[UA_TYPES_STATUSCODE]);
 }
 
 void
-Service_Republish(UA_Server *server, UA_Session *session,
-                  const UA_RepublishRequest *request,
+Service_Republish(UA_Server *server, UA_Session *session, const UA_RepublishRequest *request,
                   UA_RepublishResponse *response) {
     UA_LOG_DEBUG_SESSION(server->config.logger, session, "Processing RepublishRequest");
 

+ 14 - 30
src/server/ua_session.c

@@ -74,8 +74,7 @@ void UA_Session_deleteMembersCleanup(UA_Session *session, UA_Server* server) {
     }
 
     UA_PublishResponseEntry *entry;
-    while((entry = UA_Session_getPublishReq(session))) {
-        UA_Session_removePublishReq(session,entry);
+    while((entry = UA_Session_dequeuePublishReq(session))) {
         UA_PublishResponse_deleteMembers(&entry->response);
         UA_free(entry);
     }
@@ -121,8 +120,10 @@ void UA_Session_updateLifetime(UA_Session *session) {
 #ifdef UA_ENABLE_SUBSCRIPTIONS
 
 void UA_Session_addSubscription(UA_Session *session, UA_Subscription *newSubscription) {
-    session->numSubscriptions++;
+    newSubscription->subscriptionId = ++session->lastSubscriptionId;
+
     LIST_INSERT_HEAD(&session->serverSubscriptions, newSubscription, listEntry);
+    session->numSubscriptions++;
 }
 
 UA_StatusCode
@@ -151,11 +152,6 @@ UA_Session_deleteSubscription(UA_Server *server, UA_Session *session,
     return UA_STATUSCODE_GOOD;
 }
 
-UA_UInt32
-UA_Session_getNumSubscriptions( UA_Session *session ) {
-   return session->numSubscriptions;
-}
-
 UA_Subscription *
 UA_Session_getSubscriptionById(UA_Session *session, UA_UInt32 subscriptionId) {
     UA_Subscription *sub;
@@ -166,34 +162,22 @@ UA_Session_getSubscriptionById(UA_Session *session, UA_UInt32 subscriptionId) {
     return sub;
 }
 
-UA_UInt32 UA_Session_getUniqueSubscriptionId(UA_Session *session) {
-    return ++(session->lastSubscriptionId);
-}
-
-UA_UInt32
-UA_Session_getNumPublishReq(UA_Session *session) {
-    return session->numPublishReq;
-}
-
 UA_PublishResponseEntry*
-UA_Session_getPublishReq(UA_Session *session) {
-    return SIMPLEQ_FIRST(&session->responseQueue);
-}
-
-void
-UA_Session_removePublishReq( UA_Session *session, UA_PublishResponseEntry* entry) {
-    UA_PublishResponseEntry* firstEntry;
-    firstEntry = SIMPLEQ_FIRST(&session->responseQueue);
-
-    /* Remove the response from the response queue */
-    if((firstEntry != 0) && (firstEntry == entry)) {
+UA_Session_dequeuePublishReq(UA_Session *session) {
+    UA_PublishResponseEntry* entry = SIMPLEQ_FIRST(&session->responseQueue);
+    if(entry) {
         SIMPLEQ_REMOVE_HEAD(&session->responseQueue, listEntry);
         session->numPublishReq--;
     }
+    return entry;
 }
 
-void UA_Session_addPublishReq( UA_Session *session, UA_PublishResponseEntry* entry) {
-    SIMPLEQ_INSERT_TAIL(&session->responseQueue, entry, listEntry);
+void
+UA_Session_queuePublishReq(UA_Session *session, UA_PublishResponseEntry* entry, UA_Boolean head) {
+    if(!head)
+        SIMPLEQ_INSERT_TAIL(&session->responseQueue, entry, listEntry);
+    else
+        SIMPLEQ_INSERT_HEAD(&session->responseQueue, entry, listEntry);
     session->numPublishReq++;
 }
 

+ 5 - 25
src/server/ua_session.h

@@ -85,32 +85,12 @@ void UA_Session_updateLifetime(UA_Session *session);
  * --------------------- */
 
 #ifdef UA_ENABLE_SUBSCRIPTIONS
-void UA_Session_addSubscription(UA_Session *session, UA_Subscription *newSubscription);
-
-UA_UInt32
-UA_Session_getNumSubscriptions(UA_Session *session );
-
-UA_Subscription *
-UA_Session_getSubscriptionById(UA_Session *session, UA_UInt32 subscriptionId);
-
-UA_StatusCode
-UA_Session_deleteSubscription(UA_Server *server, UA_Session *session,
-                              UA_UInt32 subscriptionId);
-
-UA_UInt32
-UA_Session_getUniqueSubscriptionId(UA_Session *session);
 
-UA_UInt32
-UA_Session_getNumPublishReq(UA_Session *session);
-
-UA_PublishResponseEntry*
-UA_Session_getPublishReq(UA_Session *session);
-
-void
-UA_Session_removePublishReq(UA_Session *session, UA_PublishResponseEntry* entry);
-
-void
-UA_Session_addPublishReq(UA_Session *session, UA_PublishResponseEntry* entry);
+void UA_Session_addSubscription(UA_Session *session, UA_Subscription *newSubscription);
+UA_Subscription * UA_Session_getSubscriptionById(UA_Session *session, UA_UInt32 subscriptionId);
+UA_StatusCode UA_Session_deleteSubscription(UA_Server *server, UA_Session *session, UA_UInt32 subscriptionId);
+void UA_Session_queuePublishReq(UA_Session *session, UA_PublishResponseEntry* entry, UA_Boolean head);
+UA_PublishResponseEntry* UA_Session_dequeuePublishReq(UA_Session *session);
 
 #endif
 

+ 54 - 54
src/server/ua_subscription.c

@@ -61,8 +61,7 @@ UA_Subscription_deleteMembers(UA_Server *server, UA_Subscription *sub) {
 }
 
 UA_MonitoredItem *
-UA_Subscription_getMonitoredItem(UA_Subscription *sub,
-                                 UA_UInt32 monitoredItemId) {
+UA_Subscription_getMonitoredItem(UA_Subscription *sub, UA_UInt32 monitoredItemId) {
     UA_MonitoredItem *mon;
     LIST_FOREACH(mon, &sub->monitoredItems, listEntry) {
         if(mon->monitoredItemId == monitoredItemId)
@@ -115,8 +114,7 @@ UA_Subscription_addRetransmissionMessage(UA_Server *server, UA_Subscription *sub
 }
 
 UA_StatusCode
-UA_Subscription_removeRetransmissionMessage(UA_Subscription *sub,
-                                            UA_UInt32 sequenceNumber) {
+UA_Subscription_removeRetransmissionMessage(UA_Subscription *sub, UA_UInt32 sequenceNumber) {
     /* Find the retransmission message */
     UA_NotificationMessageEntry *entry;
     TAILQ_FOREACH(entry, &sub->retransmissionQueue, listEntry) {
@@ -167,8 +165,7 @@ moveNotificationsFromMonitoredItems(UA_Subscription *sub, UA_MonitoredItemNotifi
 }
 
 static UA_StatusCode
-prepareNotificationMessage(UA_Subscription *sub,
-                           UA_NotificationMessage *message,
+prepareNotificationMessage(UA_Subscription *sub, UA_NotificationMessage *message,
                            size_t notifications) {
     /* Array of ExtensionObject to hold different kinds of notifications
      * (currently only DataChangeNotifications) */
@@ -215,15 +212,20 @@ UA_Subscription_nextSequenceNumber(UA_UInt32 sequenceNumber) {
     return nextSequenceNumber;
 }
 
+static void
+publishCallback(UA_Server *server, UA_Subscription *sub) {
+    sub->readyNotifications = sub->notificationQueueSize;
+    UA_Subscription_publish(server, sub);
+}
+
 void
-UA_Subscription_publishCallback(UA_Server *server, UA_Subscription *sub) {
-    UA_LOG_DEBUG_SESSION(server->config.logger, sub->session,
-                         "Subscription %u | Publish Callback",
-                         sub->subscriptionId);
+UA_Subscription_publish(UA_Server *server, UA_Subscription *sub) {
+    UA_LOG_DEBUG_SESSION(server->config.logger, sub->session, "Subscription %u | "
+                         "Publish Callback", sub->subscriptionId);
     /* Dequeue a response */
-    UA_PublishResponseEntry *pre = UA_Session_getPublishReq(sub->session);
+    UA_PublishResponseEntry *pre = UA_Session_dequeuePublishReq(sub->session);
     if(pre) {
-        sub->currentLifetimeCount = 0; /* Reset the lifetimecounter */
+        sub->currentLifetimeCount = 0; /* Reset the LifetimeCounter */
     } else {
         UA_LOG_DEBUG_SESSION(server->config.logger, sub->session,
                              "Subscription %u | The publish queue is empty",
@@ -240,12 +242,15 @@ UA_Subscription_publishCallback(UA_Server *server, UA_Subscription *sub) {
         }
     }
 
+    if (sub->readyNotifications > sub->notificationQueueSize)
+        sub->readyNotifications = sub->notificationQueueSize;
+
     /* Count the available notifications */
-    UA_Boolean moreNotifications = false;
-    size_t notifications = sub->notificationQueueSize;
+    UA_UInt32 notifications = sub->readyNotifications;
     if(!sub->publishingEnabled)
         notifications = 0;
 
+    UA_Boolean moreNotifications = false;
     if(notifications > sub->notificationsPerPublish) {
         notifications = sub->notificationsPerPublish;
         moreNotifications = true;
@@ -254,20 +259,25 @@ UA_Subscription_publishCallback(UA_Server *server, UA_Subscription *sub) {
     /* Return if no notifications and no keepalive */
     if(notifications == 0) {
         ++sub->currentKeepAliveCount;
-        if(sub->currentKeepAliveCount < sub->maxKeepAliveCount)
+        if(sub->currentKeepAliveCount < sub->maxKeepAliveCount) {
+            if(pre)
+                UA_Session_queuePublishReq(sub->session, pre, true); /* Re-enqueue */
             return;
+        }
         UA_LOG_DEBUG_SESSION(server->config.logger, sub->session,
                              "Subscription %u | Sending a KeepAlive",
                              sub->subscriptionId);
     }
 
-    /* Can we send a response? */
+    /* We want to send a response. Is it possible? */
     UA_SecureChannel *channel = sub->session->header.channel;
     if(!channel || !pre) {
         UA_LOG_DEBUG_SESSION(server->config.logger, sub->session,
-                             "Subscription %u | Want to send a publish response;"
-                             "but not possible", sub->subscriptionId);
+                             "Subscription %u | Want to send a publish response but can't. "
+                             "The subscription is late.", sub->subscriptionId);
         sub->state = UA_SUBSCRIPTIONSTATE_LATE;
+        if(pre)
+            UA_Session_queuePublishReq(sub->session, pre, true); /* Re-enqueue */
         return;
     }
 
@@ -277,13 +287,13 @@ UA_Subscription_publishCallback(UA_Server *server, UA_Subscription *sub) {
     UA_NotificationMessageEntry *retransmission = NULL;
     if(notifications > 0) {
         /* Allocate the retransmission entry */
-        retransmission = (UA_NotificationMessageEntry*)
-            UA_malloc(sizeof(UA_NotificationMessageEntry));
+        retransmission = (UA_NotificationMessageEntry*)UA_malloc(sizeof(UA_NotificationMessageEntry));
         if(!retransmission) {
             UA_LOG_WARNING_SESSION(server->config.logger, sub->session,
-                                   "Subscription %u | Could not allocate memory "
-                                   "for retransmission", sub->subscriptionId);
+                                   "Subscription %u | Could not allocate memory for retransmission. "
+                                   "The subscription is late.", sub->subscriptionId);
             sub->state = UA_SUBSCRIPTIONSTATE_LATE;
+            UA_Session_queuePublishReq(sub->session, pre, true); /* Re-enqueue */
             return;
         }
 
@@ -291,18 +301,20 @@ UA_Subscription_publishCallback(UA_Server *server, UA_Subscription *sub) {
         UA_StatusCode retval = prepareNotificationMessage(sub, message, notifications);
         if(retval != UA_STATUSCODE_GOOD) {
             UA_LOG_WARNING_SESSION(server->config.logger, sub->session,
-                                   "Subscription %u | Could not prepare the "
-                                   "notification message", sub->subscriptionId);
+                                   "Subscription %u | Could not prepare the notification message. "
+                                   "The subscription is late.", sub->subscriptionId);
             UA_free(retransmission);
             sub->state = UA_SUBSCRIPTIONSTATE_LATE;
+            UA_Session_queuePublishReq(sub->session, pre, true); /* Re-enqueue */
             return;
         }
     }
 
     /* <-- The point of no return --> */
 
-    /* Remove the response from the response queue */
-    UA_Session_removePublishReq(sub->session, pre);
+    /* Adjust the number of ready notifications */
+    UA_assert(sub->readyNotifications >= notifications);
+    sub->readyNotifications -= notifications;
 
     /* Set up the response */
     response->responseHeader.timestamp = UA_DateTime_now();
@@ -353,22 +365,20 @@ UA_Subscription_publishCallback(UA_Server *server, UA_Subscription *sub) {
     sub->currentKeepAliveCount = 0;
 
     /* Free the response */
-    UA_Array_delete(response->results, response->resultsSize,
-                    &UA_TYPES[UA_TYPES_UINT32]);
+    UA_Array_delete(response->results, response->resultsSize, &UA_TYPES[UA_TYPES_UINT32]);
     UA_free(pre); /* No need for UA_PublishResponse_deleteMembers */
 
     /* Repeat sending responses if there are more notifications to send */
     if(moreNotifications)
-        UA_Subscription_publishCallback(server, sub);
+        UA_Subscription_publish(server, sub);
 }
 
 UA_Boolean
 UA_Subscription_reachedPublishReqLimit(UA_Server *server,  UA_Session *session) {
-    UA_LOG_DEBUG_SESSION(server->config.logger, session,
-                         "Reached number of publish request limit");
+    UA_LOG_DEBUG_SESSION(server->config.logger, session, "Reached number of publish request limit");
 
     /* Dequeue a response */
-    UA_PublishResponseEntry *pre = UA_Session_getPublishReq(session);
+    UA_PublishResponseEntry *pre = UA_Session_dequeuePublishReq(session);
 
     /* Cannot publish without a response */
     if(!pre) {
@@ -376,13 +386,10 @@ UA_Subscription_reachedPublishReqLimit(UA_Server *server,  UA_Session *session)
         return false;
     }
 
-    UA_PublishResponse *response = &pre->response;
-    UA_NotificationMessage *message = &response->notificationMessage;
-
     /* <-- The point of no return --> */
 
-    /* Remove the response from the response queue */
-    UA_Session_removePublishReq(session, pre);
+    UA_PublishResponse *response = &pre->response;
+    UA_NotificationMessage *message = &response->notificationMessage;
 
     /* Set up the response. Note that this response has no related subscription id */
     response->responseHeader.timestamp = UA_DateTime_now();
@@ -397,12 +404,10 @@ UA_Subscription_reachedPublishReqLimit(UA_Server *server,  UA_Session *session)
     UA_LOG_DEBUG_SESSION(server->config.logger, session,
                          "Sending out a publish response triggered by too many publish requests");
     UA_SecureChannel_sendSymmetricMessage(session->header.channel, pre->requestId,
-                                          UA_MESSAGETYPE_MSG, response,
-                                          &UA_TYPES[UA_TYPES_PUBLISHRESPONSE]);
+                     UA_MESSAGETYPE_MSG, response, &UA_TYPES[UA_TYPES_PUBLISHRESPONSE]);
 
     /* Free the response */
-    UA_Array_delete(response->results, response->resultsSize,
-                    &UA_TYPES[UA_TYPES_UINT32]);
+    UA_Array_delete(response->results, response->resultsSize, &UA_TYPES[UA_TYPES_UINT32]);
     UA_free(pre); /* no need for UA_PublishResponse_deleteMembers */
 
     return true;
@@ -418,7 +423,7 @@ Subscription_registerPublishCallback(UA_Server *server, UA_Subscription *sub) {
         return UA_STATUSCODE_GOOD;
 
     UA_StatusCode retval =
-        UA_Server_addRepeatedCallback(server, (UA_ServerCallback)UA_Subscription_publishCallback,
+        UA_Server_addRepeatedCallback(server, (UA_ServerCallback)publishCallback,
                                       sub, (UA_UInt32)sub->publishingInterval, &sub->publishCallbackId);
     if(retval != UA_STATUSCODE_GOOD)
         return retval;
@@ -429,15 +434,13 @@ Subscription_registerPublishCallback(UA_Server *server, UA_Subscription *sub) {
 
 UA_StatusCode
 Subscription_unregisterPublishCallback(UA_Server *server, UA_Subscription *sub) {
-    UA_LOG_DEBUG_SESSION(server->config.logger, sub->session,
-                         "Subscription %u | Unregister subscription "
-                         "publishing callback", sub->subscriptionId);
+    UA_LOG_DEBUG_SESSION(server->config.logger, sub->session, "Subscription %u | "
+                         "Unregister subscription publishing callback", sub->subscriptionId);
 
     if(!sub->publishCallbackIsRegistered)
         return UA_STATUSCODE_GOOD;
 
-    UA_StatusCode retval =
-        UA_Server_removeRepeatedCallback(server, sub->publishCallbackId);
+    UA_StatusCode retval = UA_Server_removeRepeatedCallback(server, sub->publishCallbackId);
     if(retval != UA_STATUSCODE_GOOD)
         return retval;
 
@@ -448,22 +451,19 @@ Subscription_unregisterPublishCallback(UA_Server *server, UA_Subscription *sub)
 /* When the session has publish requests stored but the last subscription is
  * deleted... Send out empty responses */
 void
-UA_Subscription_answerPublishRequestsNoSubscription(UA_Server *server,
-                                                    UA_Session *session) {
+UA_Subscription_answerPublishRequestsNoSubscription(UA_Server *server, UA_Session *session) {
     /* No session or there are remaining subscriptions */
     if(!session || LIST_FIRST(&session->serverSubscriptions))
         return;
 
     /* Send a response for every queued request */
     UA_PublishResponseEntry *pre;
-    while((pre = UA_Session_getPublishReq(session))) {
-        UA_Session_removePublishReq(session, pre);
+    while((pre = UA_Session_dequeuePublishReq(session))) {
         UA_PublishResponse *response = &pre->response;
         response->responseHeader.serviceResult = UA_STATUSCODE_BADNOSUBSCRIPTION;
         response->responseHeader.timestamp = UA_DateTime_now();
-        UA_SecureChannel_sendSymmetricMessage(session->header.channel, pre->requestId,
-                                              UA_MESSAGETYPE_MSG, response,
-                                              &UA_TYPES[UA_TYPES_PUBLISHRESPONSE]);
+        UA_SecureChannel_sendSymmetricMessage(session->header.channel, pre->requestId, UA_MESSAGETYPE_MSG,
+                                              response, &UA_TYPES[UA_TYPES_PUBLISHRESPONSE]);
         UA_PublishResponse_deleteMembers(response);
         UA_free(pre);
     }

+ 2 - 1
src/server/ua_subscription.h

@@ -154,6 +154,7 @@ struct UA_Subscription {
     /* Global list of notifications from the MonitoredItems */
     NotificationQueue notificationQueue;
     UA_UInt32 notificationQueueSize;
+    UA_UInt32 readyNotifications; /* Notifications to be sent out now (already late) */
 
     /* Retransmission Queue */
     ListOfNotificationMessages retransmissionQueue;
@@ -171,7 +172,7 @@ UA_StatusCode
 UA_Subscription_deleteMonitoredItem(UA_Server *server, UA_Subscription *sub,
                                     UA_UInt32 monitoredItemId);
 
-void UA_Subscription_publishCallback(UA_Server *server, UA_Subscription *sub);
+void UA_Subscription_publish(UA_Server *server, UA_Subscription *sub);
 UA_StatusCode UA_Subscription_removeRetransmissionMessage(UA_Subscription *sub, UA_UInt32 sequenceNumber);
 void UA_Subscription_answerPublishRequestsNoSubscription(UA_Server *server, UA_Session *session);
 UA_Boolean UA_Subscription_reachedPublishReqLimit(UA_Server *server,  UA_Session *session);

+ 34 - 25
src/server/ua_subscription_datachange.c

@@ -68,51 +68,60 @@ MonitoredItem_delete(UA_Server *server, UA_MonitoredItem *monitoredItem) {
     UA_Server_delayedFree(server, monitoredItem);
 }
 
-void
-MonitoredItem_ensureQueueSpace(UA_MonitoredItem *mon) {
-    UA_Boolean valueDiscarded = false;
-    UA_Subscription *sub = mon->subscription;
+void MonitoredItem_ensureQueueSpace(UA_MonitoredItem *mon) {
+    if(mon->queueSize <= mon->maxQueueSize)
+        return;
 
     /* Remove notifications until the queue size is reached */
+    UA_Subscription *sub = mon->subscription;
     while(mon->queueSize > mon->maxQueueSize) {
-        /* maxQueuesize is at least 1 */
-        UA_assert(mon->queueSize >= 2);
-
-        /* Get the item to remove. New items are added to the end */
-        UA_Notification *notification = NULL;
+        UA_assert(mon->queueSize >= 2); /* At least two Notifications in the queue */
+
+        /* Make sure that the MonitoredItem does not lose its place in the
+         * global queue when notifications are removed. Otherwise the
+         * MonitoredItem can "starve" itself by putting new notifications always
+         * at the end of the global queue and removing the old ones.
+         *
+         * - If the oldest notification is removed, put the second oldest
+         *   notification right behind it.
+         * - If the newest notification is removed, put the new notification
+         *   right behind it. */
+
+        UA_Notification *del; /* The notification that will be deleted */
+        UA_Notification *after_del; /* The notification to keep and move after del */
         if(mon->discardOldest) {
             /* Remove the oldest */
-            notification = TAILQ_FIRST(&mon->queue);
+            del = TAILQ_FIRST(&mon->queue);
+            after_del = TAILQ_NEXT(del, listEntry);
         } else {
-            /* Keep the newest, remove the second-newest */
-            notification = TAILQ_LAST(&mon->queue, NotificationQueue);
-            notification = TAILQ_PREV(notification, NotificationQueue, listEntry);
+            /* Remove the second newest (to keep the up-to-date notification) */
+            after_del = TAILQ_LAST(&mon->queue, NotificationQueue);
+            del = TAILQ_PREV(after_del, NotificationQueue, listEntry);
         }
-        UA_assert(notification);
 
-        /* Remove the item */
-        TAILQ_REMOVE(&mon->queue, notification, listEntry);
-        TAILQ_REMOVE(&sub->notificationQueue, notification, globalEntry);
+        /* Move after_del right after del in the global queue */
+        TAILQ_REMOVE(&sub->notificationQueue, after_del, globalEntry);
+        TAILQ_INSERT_AFTER(&sub->notificationQueue, del, after_del, globalEntry);
+
+        /* Remove the notification from the queues */
+        TAILQ_REMOVE(&mon->queue, del, listEntry);
+        TAILQ_REMOVE(&sub->notificationQueue, del, globalEntry);
         --mon->queueSize;
         --sub->notificationQueueSize;
 
         /* Free the notification */
         if(mon->monitoredItemType == UA_MONITOREDITEMTYPE_CHANGENOTIFY) {
-            UA_DataValue_deleteMembers(&notification->data.value);
+            UA_DataValue_deleteMembers(&del->data.value);
         } else {
             /* TODO: event implemantation */
         }
 
         /* Work around a false positive in clang analyzer */
 #ifndef __clang_analyzer__
-        UA_free(notification);
+        UA_free(del);
 #endif
-        valueDiscarded = true;
     }
 
-    if(!valueDiscarded)
-        return;
-            
     if(mon->monitoredItemType == UA_MONITOREDITEMTYPE_CHANGENOTIFY) {
         /* Get the element that carries the infobits */
         UA_Notification *notification = NULL;
@@ -266,13 +275,13 @@ sampleCallbackWithValue(UA_Server *server, UA_Subscription *sub,
     UA_ByteString_deleteMembers(&monitoredItem->lastSampledValue);
     monitoredItem->lastSampledValue = *valueEncoding;
 
-    /* Add the sample to the queue for publication */
+    /* Add the notification to the end of local and global queue */
     TAILQ_INSERT_TAIL(&monitoredItem->queue, newNotification, listEntry);
     TAILQ_INSERT_TAIL(&sub->notificationQueue, newNotification, globalEntry);
     ++monitoredItem->queueSize;
     ++sub->notificationQueueSize;
 
-    /* Remove entries from the queue if required and add the sample to the global queue */
+    /* Remove some notifications if the queue is beyond maximum capacity */
     MonitoredItem_ensureQueueSpace(monitoredItem);
 
     return true;

+ 1 - 0
tests/client/check_client_subscriptions.c

@@ -228,6 +228,7 @@ START_TEST(Client_subscription_keepAlive) {
     ck_assert_uint_eq(monResponse.statusCode, UA_STATUSCODE_GOOD);
     UA_UInt32 monId = monResponse.monitoredItemId;
 
+    /* Ensure that the subscription is late */
     UA_fakeSleep((UA_UInt32)(publishingInterval + 1));
 
     /* Manually send a publish request */