Browse Source

Subscriptions now return data values and manage queues (including acknowledged items removal). VariableNodes may have their UA_DataSource as a subscription target.

ichrispa 9 years ago
parent
commit
72ca74b1b0
1 changed files with 39 additions and 8 deletions
  1. 39 8
      src/server/ua_services_subscription.c

+ 39 - 8
src/server/ua_services_subscription.c

@@ -1,7 +1,3 @@
-#ifndef ENABLESUBSCRIPTIONS
-#define ENABLESUBSCRIPTIONS
-#endif
-
 #ifdef ENABLESUBSCRIPTIONS
 #include "ua_services.h"
 #include "ua_server_internal.h"
@@ -10,8 +6,6 @@
 #include "ua_util.h"
 #include "ua_nodestore.h"
 
-#include "ua_log.h" // Remove later, debugging only
-
 #define UA_BOUNDEDVALUE_SETWBOUNDS(BOUNDS, SRC, DST) { \
     if (SRC > BOUNDS.maxValue) DST = BOUNDS.maxValue; \
     else if (SRC < BOUNDS.minValue) DST = BOUNDS.minValue; \
@@ -145,13 +139,36 @@ UA_Int32 Service_Publish(UA_Server *server, UA_Session *session,
     if ( response->responseHeader.serviceResult != UA_STATUSCODE_GOOD) return 0;
 
     manager = &(session->subscriptionManager);    
+    if (manager == NULL) return 0;
+    
+    // Delete Acknowledged Subscription Messages
+    response->resultsSize = request->subscriptionAcknowledgementsSize;
+    response->results     = (UA_StatusCode *) malloc(sizeof(UA_StatusCode)*(response->resultsSize));
+    for(int i=0; i<request->subscriptionAcknowledgementsSize;i++ ) {
+      response->results[i] = UA_STATUSCODE_GOOD;
+      sub = SubscriptionManager_getSubscriptionByID(&(session->subscriptionManager), request->subscriptionAcknowledgements[i].subscriptionId);
+      if (sub == NULL) {
+        response->results[i] = UA_STATUSCODE_BADSUBSCRIPTIONIDINVALID;
+        continue;
+      }
+      if(Subscription_deleteUnpublishedNotification(request->subscriptionAcknowledgements[i].sequenceNumber, sub) == 0) {
+        response->results[i] = UA_STATUSCODE_BADSEQUENCENUMBERINVALID;
+      }
+    }
+    
+    // See if any new data is available
     for (sub=(manager->ServerSubscriptions)->lh_first; sub != NULL; sub = sub->listEntry.le_next) {
-	if (sub->MonitoredItems->lh_first != NULL) {  
+	
+        // FIXME: We are forcing a value update for monitored items. This should be done by the event system.
+        if (sub->MonitoredItems->lh_first != NULL) {  
 	  for(mon=sub->MonitoredItems->lh_first; mon != NULL; mon=mon->listEntry.le_next) {
-	    MonitoredItem_QueuePushDataValue(mon);
+            MonitoredItem_QueuePushDataValue(mon);
 	  }
 	}
+	
+	// FIXME: We are forcing notification updates for the subscription. This should be done by a timed work item.
 	Subscription_updateNotifications(sub);
+        
         if (Subscription_queuedNotifications(sub) > 0) {
 	  response->subscriptionId = sub->SubscriptionID;
 	  
@@ -172,6 +189,20 @@ UA_Int32 Service_Publish(UA_Server *server, UA_Session *session,
 	}
     }
     
+    // FIXME: At this point, we would return nothing and "queue" the publish request, but currently we need to 
+    //        return something to the client.
+    //        If no subscriptions have notifications, force one to generate a keepalive so we don't return an 
+    //        empty message
+    sub = manager->ServerSubscriptions->lh_first;
+    if ( sub != NULL) {
+      response->subscriptionId = sub->SubscriptionID;
+      sub->KeepAliveCount.currentValue=sub->KeepAliveCount.minValue;
+      Subscription_generateKeepAlive(sub);
+      Subscription_copyTopNotificationMessage(&(response->notificationMessage), sub);
+      Subscription_deleteUnpublishedNotification(sub->SequenceNumber + 1, sub);
+      response->availableSequenceNumbersSize = 0;
+    }
+    
     response->diagnosticInfosSize = 0;
     response->diagnosticInfos     = 0;
     return 0;