Forráskód Böngészése

Enabled KeepAlive message generation for publish requests.

ichrispa 9 éve
szülő
commit
67b22dfe42

+ 28 - 15
src/server/ua_services_subscription.c

@@ -126,6 +126,13 @@ UA_Int32 Service_Publish(UA_Server *server, UA_Session *session,
     
     // Verify Session and Subscription
     response->responseHeader.serviceResult = UA_STATUSCODE_GOOD;
+    response->diagnosticInfosSize = 0;
+    response->availableSequenceNumbersSize = 0;
+    response->resultsSize = 0;
+    response->subscriptionId = 0;
+    response->moreNotifications = UA_FALSE;
+    response->notificationMessage.notificationDataSize = 0;
+    
     if (session == NULL ) response->responseHeader.serviceResult = UA_STATUSCODE_BADSESSIONIDINVALID;           
     else if ( session->channel == NULL || session->activated == UA_FALSE) response->responseHeader.serviceResult = UA_STATUSCODE_BADSESSIONNOTACTIVATED;
     if ( response->responseHeader.serviceResult != UA_STATUSCODE_GOOD) return 0;
@@ -133,22 +140,28 @@ UA_Int32 Service_Publish(UA_Server *server, UA_Session *session,
     manager = &(session->subscriptionManager);    
     for (sub=(manager->ServerSubscriptions)->lh_first; sub != NULL; sub = sub->listEntry.le_next) {
         Subscription_updateNotifications(sub);
-        /*
-        printf("Publish: Session Subscription %i\n", sub->SubscriptionID);
-        for(mon=sub->MonitoredItems->lh_first; mon != NULL; mon=mon->listEntry.le_next) {
-            printf("Publish: Session Subscription %i, Monitored Item %i\n", sub->SubscriptionID, mon->ItemId);
-            //MonitoredItem_QueuePushDataValue(mon);
-            printf("QueueSize: %i, %i\n", mon->QueueSize.currentValue, mon->QueueSize.maxValue);
-        }
-        */
+        if (Subscription_queuedNotifications(sub) > 0) {
+	  response->subscriptionId = sub->SubscriptionID;
+	  
+	  Subscription_copyTopNotificationMessage(&(response->notificationMessage), sub);
+	  
+	  if (sub->unpublishedNotifications->lh_first->notification->sequenceNumber > sub->SequenceNumber) {
+	    // If this is a keepalive message, its seqNo is the next seqNo to be used for an actual msg.
+	    response->availableSequenceNumbersSize = 0;
+	    // .. and must be deleted
+	    Subscription_deleteUnpublishedNotification(sub->SequenceNumber + 1, sub);
+	  }
+	  else {
+	    response->availableSequenceNumbersSize = Subscription_queuedNotifications(sub);
+	    response->availableSequenceNumbers = Subscription_getAvailableSequenceNumbers(sub);
+	  }	  
+	  //printf("Publish: Session Subscription %i; Queued %i, #SeqNo %i\n", sub->SubscriptionID, Subscription_queuedNotifications(sub), sub->SequenceNumber);
+	  
+	  return 0;
+	}
     }
     
-    response->subscriptionId = 0;
-    response->availableSequenceNumbersSize = 0;
-    response->availableSequenceNumbers = 0;
-    response->moreNotifications = UA_FALSE;
-    response->resultsSize = 0;
-
+    
     response->responseHeader.serviceResult = UA_STATUSCODE_BADSERVICEUNSUPPORTED;
     return 0;
 }
@@ -164,7 +177,7 @@ UA_Int32 Service_DeleteSubscriptions(UA_Server *server, UA_Session *session,
     else if ( session->channel == NULL || session->activated == UA_FALSE) response->responseHeader.serviceResult = UA_STATUSCODE_BADSESSIONNOTACTIVATED;
     if ( response->responseHeader.serviceResult != UA_STATUSCODE_GOOD) return 0;
     
-    retStat = (UA_StatusCode *) malloc(sizeof(UA_StatusCode) * request->subscriptionIdsSize);
+    retStat = (UA_UInt32 *) malloc(sizeof(UA_UInt32) * request->subscriptionIdsSize);
     if (retStat==NULL) {
         response->responseHeader.serviceResult = UA_STATUSCODE_BADOUTOFMEMORY;
         return -1;

+ 91 - 24
src/server/ua_subscription_manager.c

@@ -1,6 +1,7 @@
 #ifdef ENABLESUBSCRIPTIONS
 #include "ua_types.h"
 #include "ua_server_internal.h"
+#include "ua_nodestore.h"
 #include "ua_subscription_manager.h"
 
 #include <stdio.h> // Remove later, debugging only
@@ -32,6 +33,7 @@ UA_Subscription *UA_Subscription_new(UA_Int32 SubscriptionID) {
     new->SequenceNumber = 0;
     new->MonitoredItems = (UA_ListOfUAMonitoredItems *) malloc (sizeof(UA_ListOfUAMonitoredItems));
     LIST_INIT(new->MonitoredItems);
+    LIST_INITENTRY(new, listEntry);
     new->unpublishedNotifications = (UA_ListOfUnpublishedNotifications *) malloc(sizeof(UA_ListOfUnpublishedNotifications));
     LIST_INIT(new->unpublishedNotifications);
     return new;
@@ -44,7 +46,8 @@ UA_MonitoredItem *UA_MonitoredItem_new() {
     new->LastSampled = 0;
     
     LIST_INIT(new->queue);
-    
+    LIST_INITENTRY(new, listEntry);
+    INITPOINTER(new->monitoredNode);
     return new;
 }
 
@@ -130,15 +133,15 @@ UA_Int32 SubscriptionManager_deleteSubscription(UA_SubscriptionManager *manager,
     return UA_STATUSCODE_GOOD;
 } 
 
-int Subscription_queuedNotifications(UA_Subscription *subscription) {
-    int j = 0;
+UA_UInt32 Subscription_queuedNotifications(UA_Subscription *subscription) {
+    UA_UInt32 j = 0;
     if (subscription == NULL) return 0;
     
     for(UA_unpublishedNotification *i = subscription->unpublishedNotifications->lh_first; i != NULL; i=(i->listEntry).le_next) j++;
     
     return j;
 }
-    
+
 void Subscription_updateNotifications(UA_Subscription *subscription) {
     UA_MonitoredItem *mon;
     //MonitoredItem_queuedValue *queuedValue;
@@ -147,7 +150,7 @@ void Subscription_updateNotifications(UA_Subscription *subscription) {
     
     if (subscription == NULL) return;
     if ((subscription->LastPublished + subscription->PublishingInterval) > UA_DateTime_now()) return;
-       
+         
     // Check if any MonitoredItem Queues hold data
     for(mon=subscription->MonitoredItems->lh_first; mon!= NULL; mon=mon->listEntry.le_next) {
         if (mon->queue->lh_first != NULL) {
@@ -167,15 +170,18 @@ void Subscription_updateNotifications(UA_Subscription *subscription) {
         // Decrement KeepAlive
         subscription->KeepAliveCount.currentValue--;
         // +- Generate KeepAlive msg if counter overruns
-        if (subscription->KeepAliveCount.currentValue < subscription->KeepAliveCount.minValue) {
+        if (subscription->KeepAliveCount.currentValue <= subscription->KeepAliveCount.minValue || subscription->KeepAliveCount.currentValue > subscription->KeepAliveCount.maxValue) {
             msg = (UA_unpublishedNotification *) malloc(sizeof(UA_unpublishedNotification));
-            msg->notification.sequenceNumber = (subscription->SequenceNumber)++;
-            msg->notification.publishTime    = UA_DateTime_now();
-            msg->notification.notificationDataSize = monItemsWithData;
-            msg->notification.sequenceNumber = subscription->SequenceNumber++;
-            msg->notification.notificationDataSize = 0;
-            
-            LIST_INSERT_HEAD(subscription->unpublishedNotifications, msg, listEntry);
+	    LIST_INITENTRY(msg, listEntry);
+	    INITPOINTER(msg->notification);
+	    
+	    msg->notification = (UA_NotificationMessage *) malloc(sizeof(UA_NotificationMessage));
+	    INITPOINTER(msg->notification->notificationData);
+	    msg->notification->sequenceNumber = (subscription->SequenceNumber)+1; // KeepAlive uses next message, but does not increment counter
+            msg->notification->publishTime    = UA_DateTime_now();
+            msg->notification->notificationDataSize = 0;
+	    
+	    LIST_INSERT_HEAD(subscription->unpublishedNotifications, msg, listEntry);
             subscription->KeepAliveCount.currentValue = subscription->KeepAliveCount.maxValue;
         }
         
@@ -183,25 +189,18 @@ void Subscription_updateNotifications(UA_Subscription *subscription) {
     }
     
     // One or more MonitoredItems hold data -> create a new NotificationMessage
-    printf("UpdateNotification: creating NotificationMessage for %i items holding data\n", msg->notification.notificationDataSize);
     
     // +- Create Array of NotificationData
     // +- Clear Queue
     
     // Fill the NotificationMessage with NotificationData
-    for(mon=subscription->MonitoredItems->lh_first; mon!= NULL; mon=mon->listEntry.le_next) {
-        if (mon->queue->lh_first != NULL) {
-            printf("UpdateNotification: monitored %i\n", mon->ItemId);
-            if (msg == NULL) {
-                
-            }
-        }
-    }
+    
     return;
 }
 
 void MonitoredItem_ClearQueue(UA_MonitoredItem *monitoredItem) {
-    if (monitoredItem == NULL) return;
+  
+  if (monitoredItem == NULL) return;
     while(monitoredItem->queue->lh_first != NULL) {
         LIST_REMOVE(monitoredItem->queue->lh_first, listEntry);
     }
@@ -216,6 +215,7 @@ void MonitoredItem_QueuePushDataValue(UA_MonitoredItem *monitoredItem) {
     if( (monitoredItem->LastSampled + monitoredItem->SamplingInterval) > UA_DateTime_now()) return;
     
     newvalue = (MonitoredItem_queuedValue *) malloc(sizeof(MonitoredItem_queuedValue));
+    LIST_INITENTRY(newvalue,listEntry);
     
     // Create new Value
     switch(monitoredItem->AttributeID) {
@@ -275,7 +275,6 @@ void MonitoredItem_QueuePushDataValue(UA_MonitoredItem *monitoredItem) {
         break;
     }
     
-    printf("MonitoredItem AddValue: %i,%i\n", (monitoredItem->QueueSize).currentValue, (monitoredItem->QueueSize).maxValue);
     if ((monitoredItem->QueueSize).currentValue >= (monitoredItem->QueueSize).maxValue) {
         if (monitoredItem->DiscardOldest == UA_TRUE && monitoredItem->queue->lh_first != NULL ) {
             for(queueItem = monitoredItem->queue->lh_first; queueItem->listEntry.le_next != NULL; queueItem = queueItem->listEntry.le_next) {}
@@ -295,4 +294,72 @@ void MonitoredItem_QueuePushDataValue(UA_MonitoredItem *monitoredItem) {
     return;
 }
 
+UA_UInt32 *Subscription_getAvailableSequenceNumbers(UA_Subscription *sub) {
+  UA_UInt32 *seqArray;
+  int i;
+  UA_unpublishedNotification *not;
+  
+  if (sub == NULL) return NULL;
+  
+  seqArray = (UA_UInt32 *) malloc(sizeof(UA_UInt32) * Subscription_queuedNotifications(sub));
+  if (seqArray == NULL ) return NULL;
+  
+  i = 0;
+  for(not = sub->unpublishedNotifications->lh_first; not != NULL; not=(not->listEntry).le_next) {
+    seqArray[i] = not->notification->sequenceNumber;
+    i++;
+  }
+  
+  return seqArray;
+  
+}
+
+void Subscription_copyTopNotificationMessage(UA_NotificationMessage *dst, UA_Subscription *sub) {
+    UA_NotificationMessage *latest;
+    
+    if (dst == NULL) return;
+    
+    if (Subscription_queuedNotifications(sub) == 0) {
+      dst->notificationDataSize = 0;
+      dst->publishTime = UA_DateTime_now();
+      dst->sequenceNumber = 0;
+      return;
+    }
+    
+    latest = sub->unpublishedNotifications->lh_first->notification;
+    dst->notificationDataSize = latest->notificationDataSize;
+    dst->publishTime = latest->publishTime;
+    dst->sequenceNumber = latest->sequenceNumber;
+    
+    if (latest->notificationDataSize == 0) return;
+    
+    dst->notificationData = (UA_ExtensionObject *) malloc(sizeof(UA_ExtensionObject));
+    dst->notificationData->encoding = latest->notificationData->encoding;
+    dst->notificationData->typeId   = latest->notificationData->typeId;
+    dst->notificationData->body.length = 0;
+    
+    return;
+}
+
+void Subscription_deleteUnpublishedNotification(UA_UInt32 seqNo, UA_Subscription *sub) {
+  UA_unpublishedNotification *not;
+
+  for(not=sub->unpublishedNotifications->lh_first; not != NULL; not=not->listEntry.le_next) {
+    if (not->notification->sequenceNumber == seqNo) { 
+      LIST_REMOVE(not, listEntry);
+      if (not->notification != NULL) {
+	if (not->notification->notificationData != NULL) {
+	  if (not->notification->notificationData->body.data != NULL) {
+	    UA_free(not->notification->notificationData->body.data);
+	  }
+	  UA_free(not->notification->notificationData);
+	}
+	UA_free(not->notification);
+      }
+      UA_free(not);
+    }
+  }
+
+  return;
+}
 #endif //#ifdef ENABLESUBSCRIPTIONS

+ 17 - 5
src/server/ua_subscription_manager.h

@@ -1,4 +1,4 @@
-#ifdef ENABLESUBSCRIPTIONS
+#ifdef  ENABLESUBSCRIPTIONS
 #ifndef UA_SUBSCRIPTION_MANAGER_H_
 #define UA_SUBSCRIPTION_MANAGER_H_
 
@@ -7,6 +7,15 @@
 #include "queue.h"
 #include "ua_nodestore.h"
 
+#define LIST_INITENTRY(item,entry) { \
+  (item)->entry.le_next = NULL; \
+  (item)->entry.le_prev = NULL; \
+}; 
+
+#define INITPOINTER(src) { \
+  (src)=NULL; \
+}
+
 typedef struct {
     UA_Int32 currentValue;
     UA_Int32 minValue;
@@ -43,7 +52,7 @@ typedef struct UA_MonitoredItem_s {
 } UA_MonitoredItem;
 
 typedef struct UA_unpublishedNotification_s {
-    UA_NotificationMessage notification;
+    UA_NotificationMessage 		     *notification;
     LIST_ENTRY(UA_unpublishedNotification_s) listEntry;
 } UA_unpublishedNotification;
 
@@ -82,9 +91,12 @@ UA_Subscription *SubscriptionManager_getSubscriptionByID(UA_SubscriptionManager
 UA_Int32        SubscriptionManager_deleteSubscription(UA_SubscriptionManager *manager, UA_Int32 SubscriptionID);
 UA_Int32        SubscriptionManager_deleteMonitoredItem(UA_SubscriptionManager *manager, UA_Int32 SubscriptionID, UA_UInt32 MonitoredItemID);
 
-UA_Subscription *UA_Subscription_new(UA_Int32 SubscriptionID);
-void            Subscription_updateNotifications(UA_Subscription *subscription);
-int             Subscription_queuedNotifications(UA_Subscription *subscription);
+UA_Subscription 	*UA_Subscription_new(UA_Int32 SubscriptionID);
+void            	Subscription_updateNotifications(UA_Subscription *subscription);
+UA_UInt32       	Subscription_queuedNotifications(UA_Subscription *subscription);
+UA_UInt32 		*Subscription_getAvailableSequenceNumbers(UA_Subscription *sub);
+void 			Subscription_copyTopNotificationMessage(UA_NotificationMessage *dst, UA_Subscription *sub);
+void   Subscription_deleteUnpublishedNotification(UA_UInt32 seqNo, UA_Subscription *sub);
 
 UA_MonitoredItem *UA_MonitoredItem_new(void);
 void             MonitoredItem_delete(UA_MonitoredItem *monitoredItem);