Bladeren bron

remove iterations of unpublished notifications just for counting; remove redundant checks

Julius Pfrommer 9 jaren geleden
bovenliggende
commit
3ba9364fdd
3 gewijzigde bestanden met toevoegingen van 24 en 34 verwijderingen
  1. 6 2
      src/server/ua_services_subscription.c
  2. 16 28
      src/server/ua_subscription.c
  3. 2 4
      src/server/ua_subscription.h

+ 6 - 2
src/server/ua_services_subscription.c

@@ -17,6 +17,10 @@ void Service_CreateSubscription(UA_Server *server, UA_Session *session,
                                 UA_CreateSubscriptionResponse *response) {
     response->subscriptionId = SubscriptionManager_getUniqueUIntID(&session->subscriptionManager);
     UA_Subscription *newSubscription = UA_Subscription_new(response->subscriptionId);
+    if(!newSubscription) {
+        response->responseHeader.serviceResult = UA_STATUSCODE_BADOUTOFMEMORY;
+        return;
+    }
     
     /* set the publishing interval */
     UA_BOUNDEDVALUE_SETWBOUNDS(session->subscriptionManager.globalPublishingInterval,
@@ -163,7 +167,7 @@ void Service_Publish(UA_Server *server, UA_Session *session, const UA_PublishReq
             Subscription_updateNotifications(sub);
         }
         
-        if(Subscription_queuedNotifications(sub) <= 0)
+        if(sub->unpublishedNotificationsSize == 0)
             continue;
         
         response->subscriptionId = sub->subscriptionID;
@@ -175,7 +179,7 @@ void Service_Publish(UA_Server *server, UA_Session *session, const UA_PublishReq
             // .. and must be deleted
             Subscription_deleteUnpublishedNotification(sub->sequenceNumber + 1, false, sub);
         } else {
-            response->availableSequenceNumbersSize = Subscription_queuedNotifications(sub);
+            response->availableSequenceNumbersSize = sub->unpublishedNotificationsSize;
             response->availableSequenceNumbers = Subscription_getAvailableSequenceNumbers(sub);
         }	  
         // FIXME: This should be in processMSG();

+ 16 - 28
src/server/ua_subscription.c

@@ -18,6 +18,7 @@ UA_Subscription *UA_Subscription_new(UA_Int32 subscriptionID) {
     new->timedUpdateIsRegistered = UA_FALSE;
     LIST_INIT(&new->MonitoredItems);
     LIST_INIT(&new->unpublishedNotifications);
+    new->unpublishedNotificationsSize = 0;
     return new;
 }
 
@@ -43,17 +44,6 @@ void UA_Subscription_deleteMembers(UA_Subscription *subscription, UA_Server *ser
     }
 }
 
-UA_UInt32 Subscription_queuedNotifications(UA_Subscription *subscription) {
-    if(!subscription)
-        return 0;
-
-    UA_UInt32 j = 0;
-    UA_unpublishedNotification *i;
-    LIST_FOREACH(i, &subscription->unpublishedNotifications, listEntry)
-        j++;
-    return j;
-}
-
 void Subscription_generateKeepAlive(UA_Subscription *subscription) {
     if(subscription->keepAliveCount.currentValue > subscription->keepAliveCount.minValue &&
        subscription->keepAliveCount.currentValue <= subscription->keepAliveCount.maxValue)
@@ -68,6 +58,7 @@ void Subscription_generateKeepAlive(UA_Subscription *subscription) {
     msg->notification.publishTime    = UA_DateTime_now();
     msg->notification.notificationDataSize = 0;
     LIST_INSERT_HEAD(&subscription->unpublishedNotifications, msg, listEntry);
+    subscription->unpublishedNotificationsSize += 1;
     subscription->keepAliveCount.currentValue = subscription->keepAliveCount.maxValue;
 }
 
@@ -95,7 +86,7 @@ void Subscription_updateNotifications(UA_Subscription *subscription) {
     }
     
     // FIXME: This is hardcoded to 100 because it is not covered by the spec but we need to protect the server!
-    if(Subscription_queuedNotifications(subscription) >= 10) {
+    if(subscription->unpublishedNotificationsSize >= 10) {
         // Remove last entry
         Subscription_deleteUnpublishedNotification(0, true, subscription);
     }
@@ -112,7 +103,7 @@ void Subscription_updateNotifications(UA_Subscription *subscription) {
     
     msg = UA_calloc(1, sizeof(UA_unpublishedNotification));
     msg->notification.sequenceNumber = subscription->sequenceNumber++;
-    msg->notification.publishTime    = UA_DateTime_now();
+    msg->notification.publishTime = UA_DateTime_now();
     
     // NotificationData is an array of Change, Status and Event messages, each containing the appropriate
     // list of Queued values from all monitoredItems of that type
@@ -152,10 +143,11 @@ void Subscription_updateNotifications(UA_Subscription *subscription) {
         }
     }
     LIST_INSERT_HEAD(&subscription->unpublishedNotifications, msg, listEntry);
+    subscription->unpublishedNotificationsSize += 1;
 }
 
 UA_UInt32 *Subscription_getAvailableSequenceNumbers(UA_Subscription *sub) {
-    UA_UInt32 *seqArray = UA_malloc(sizeof(UA_UInt32) * Subscription_queuedNotifications(sub));
+    UA_UInt32 *seqArray = UA_malloc(sizeof(UA_UInt32) * sub->unpublishedNotificationsSize);
     if(!seqArray)
         return NULL;
   
@@ -172,11 +164,11 @@ void Subscription_copyTopNotificationMessage(UA_NotificationMessage *dst, UA_Sub
     if(!dst)
         return;
     
-    if(Subscription_queuedNotifications(sub) == 0) {
-      dst->notificationDataSize = 0;
-      dst->publishTime = UA_DateTime_now();
-      dst->sequenceNumber = 0;
-      return;
+    if(sub->unpublishedNotificationsSize == 0) {
+        dst->notificationDataSize = 0;
+        dst->publishTime = UA_DateTime_now();
+        dst->sequenceNumber = 0;
+        return;
     }
     
     UA_NotificationMessage *latest = &LIST_FIRST(&sub->unpublishedNotifications)->notification;
@@ -197,6 +189,7 @@ UA_UInt32 Subscription_deleteUnpublishedNotification(UA_UInt32 seqNo, UA_Boolean
         if(!bDeleteAll && not->notification.sequenceNumber != seqNo)
             continue;
         LIST_REMOVE(not, listEntry);
+        sub->unpublishedNotificationsSize -= 1;
         UA_NotificationMessage_deleteMembers(&not->notification);
         UA_free(not);
         deletedItems++;
@@ -245,13 +238,11 @@ UA_StatusCode Subscription_createdUpdateJob(UA_Server *server, UA_Guid jobId, UA
 }
 
 UA_StatusCode Subscription_registerUpdateJob(UA_Server *server, UA_Subscription *sub) {
-    if(server == NULL || sub == NULL)
-        return UA_STATUSCODE_BADSERVERINDEXINVALID;
-    
     if(sub->publishingInterval <= 5 ) 
         return UA_STATUSCODE_BADNOTSUPPORTED;
     
-    // Practically enough, the client sends a uint32 in ms, which we store as datetime, which here is required in as uint32 in ms as the interval
+    /* Practically enough, the client sends a uint32 in ms, which we store as
+       datetime, which here is required in as uint32 in ms as the interval */
     UA_StatusCode retval = UA_Server_addRepeatedJob(server, *sub->timedUpdateJob, sub->publishingInterval,
                                                     &sub->timedUpdateJobGuid);
     if(retval == UA_STATUSCODE_GOOD)
@@ -260,18 +251,15 @@ UA_StatusCode Subscription_registerUpdateJob(UA_Server *server, UA_Subscription
 }
 
 UA_StatusCode Subscription_unregisterUpdateJob(UA_Server *server, UA_Subscription *sub) {
-    if(server == NULL || sub == NULL)
-        return UA_STATUSCODE_BADSERVERINDEXINVALID;
-    UA_Int32 retval = UA_Server_removeRepeatedJob(server, sub->timedUpdateJobGuid);
     sub->timedUpdateIsRegistered = UA_FALSE;
-    return retval;
+    return UA_Server_removeRepeatedJob(server, sub->timedUpdateJobGuid);
 }
 
 /*****************/
 /* MonitoredItem */
 /*****************/
 
-UA_MonitoredItem *UA_MonitoredItem_new() {
+UA_MonitoredItem * UA_MonitoredItem_new() {
     UA_MonitoredItem *new = (UA_MonitoredItem *) UA_malloc(sizeof(UA_MonitoredItem));
     new->queueSize   = (UA_UInt32_BoundedValue) { .minValue = 0, .maxValue = 0, .currentValue = 0};
     new->lastSampled = 0;

+ 2 - 4
src/server/ua_subscription.h

@@ -85,21 +85,19 @@ typedef struct UA_Subscription {
     UA_Job *timedUpdateJob;
     UA_Boolean timedUpdateIsRegistered;
     LIST_HEAD(UA_ListOfUnpublishedNotifications, UA_unpublishedNotification) unpublishedNotifications;
+    size_t unpublishedNotificationsSize;
     LIST_HEAD(UA_ListOfUAMonitoredItems, UA_MonitoredItem) MonitoredItems;
 } UA_Subscription;
 
 UA_Subscription *UA_Subscription_new(UA_Int32 subscriptionID);
 void UA_Subscription_deleteMembers(UA_Subscription *subscription, UA_Server *server);
 void Subscription_updateNotifications(UA_Subscription *subscription);
-UA_UInt32 Subscription_queuedNotifications(UA_Subscription *subscription);
 UA_UInt32 *Subscription_getAvailableSequenceNumbers(UA_Subscription *sub);
+void Subscription_generateKeepAlive(UA_Subscription *subscription);
 void Subscription_copyTopNotificationMessage(UA_NotificationMessage *dst, UA_Subscription *sub);
 UA_UInt32 Subscription_deleteUnpublishedNotification(UA_UInt32 seqNo, UA_Boolean bDeleteAll, UA_Subscription *sub);
-void Subscription_generateKeepAlive(UA_Subscription *subscription);
 UA_StatusCode Subscription_createdUpdateJob(UA_Server *server, UA_Guid jobId, UA_Subscription *sub);
 UA_StatusCode Subscription_registerUpdateJob(UA_Server *server, UA_Subscription *sub);
 UA_StatusCode Subscription_unregisterUpdateJob(UA_Server *server, UA_Subscription *sub);
 
-static void Subscription_timedUpdateNotificationsJob(UA_Server *server, void *data);
-
 #endif /* UA_SUBSCRIPTION_H_ */