Просмотр исходного кода

Republish service and retransmit queuing implemented

Chris Iatrou лет назад: 9
Родитель
Сommit
efae15466b

+ 3 - 0
src/server/ua_server_binary.c

@@ -355,6 +355,9 @@ static void processMSG(UA_Connection *connection, UA_Server *server, const UA_By
     case UA_NS0ID_PUBLISHREQUEST:
         INVOKE_SERVICE(Publish, UA_TYPES_PUBLISHRESPONSE);
         break;
+    case UA_NS0ID_REPUBLISHREQUEST:
+        INVOKE_SERVICE(Republish, UA_TYPES_REPUBLISHRESPONSE);
+        break;
     case UA_NS0ID_MODIFYSUBSCRIPTIONREQUEST:
         INVOKE_SERVICE(ModifySubscription, UA_TYPES_MODIFYSUBSCRIPTIONRESPONSE);
         break;

+ 4 - 1
src/server/ua_services.h

@@ -298,7 +298,10 @@ void Service_DeleteSubscriptions(UA_Server *server, UA_Session *session,
                                      
 void Service_Publish(UA_Server *server, UA_Session *session,
                      const UA_PublishRequest *request, UA_PublishResponse *response);
-                         
+
+void Service_Republish(UA_Server *server, UA_Session *session, const UA_RepublishRequest *request,
+                                UA_RepublishResponse *response);
+
 // Service_ModifySubscription
 // Service_SetPublishingMode
 // UA_Int32 Service_SetPublishingMode(UA_Server *server, UA_Session *session,

+ 54 - 3
src/server/ua_services_subscription.c

@@ -170,9 +170,22 @@ void Service_Publish(UA_Server *server, UA_Session *session, const UA_PublishReq
         if(Subscription_queuedNotifications(sub) <= 0)
             continue;
         
+	// This subscription has notifications in its queue (top NotificationMessage exists in the queue). 
+	// Due to republish, we need to check if there are any unplublished notifications first ()
+	UA_unpublishedNotification *notification = NULL;
+	LIST_FOREACH(notification, &sub->unpublishedNotifications, listEntry) {
+	  if (notification->publishedOnce == UA_FALSE)
+	    break;
+	}
+	if (notification == NULL)
+	  continue;
+	
+	// We found an unpublished notification message in this subscription, which we will now publish.
         response->subscriptionId = sub->subscriptionID;
-        Subscription_copyTopNotificationMessage(&response->notificationMessage, sub);
-        if(sub->unpublishedNotifications.lh_first->notification->sequenceNumber > sub->sequenceNumber) {
+        Subscription_copyNotificationMessage(&response->notificationMessage, notification);
+	// Mark this notification as published
+	notification->publishedOnce = UA_TRUE;
+        if(notification->notification->sequenceNumber > sub->sequenceNumber) {
             // If this is a keepalive message, its seqNo is the next seqNo to be used for an actual msg.
             response->availableSequenceNumbersSize = 0;
             // .. and must be deleted
@@ -195,7 +208,7 @@ void Service_Publish(UA_Server *server, UA_Session *session, const UA_PublishReq
         response->subscriptionId = sub->subscriptionID;
         sub->keepAliveCount.currentValue=sub->keepAliveCount.minValue;
         Subscription_generateKeepAlive(sub);
-        Subscription_copyTopNotificationMessage(&response->notificationMessage, sub);
+        Subscription_copyNotificationMessage(&response->notificationMessage, sub->unpublishedNotifications.lh_first);
         Subscription_deleteUnpublishedNotification(sub->sequenceNumber + 1, false, sub);
     }
     
@@ -274,3 +287,41 @@ void Service_DeleteMonitoredItems(UA_Server *server, UA_Session *session,
         response->results[i] = SubscriptionManager_deleteMonitoredItem(manager, sub->subscriptionID,
                                                                        request->monitoredItemIds[i]);
 }
+
+void Service_Republish(UA_Server *server, UA_Session *session,
+                                const UA_RepublishRequest *request,
+                                UA_RepublishResponse *response) {
+    UA_SubscriptionManager *manager = &session->subscriptionManager;
+    UA_Subscription *sub = SubscriptionManager_getSubscriptionByID(manager, request->subscriptionId);
+    if (!sub) {
+        response->responseHeader.serviceResult = UA_STATUSCODE_BADSUBSCRIPTIONIDINVALID;
+        return;
+    }
+    
+    // Find the notification in question
+    UA_unpublishedNotification *notification;
+    LIST_FOREACH(notification, &sub->unpublishedNotifications, listEntry) {
+      if (notification->notification->sequenceNumber == request->retransmitSequenceNumber)
+	break;
+    }
+    if (!notification) {
+      response->responseHeader.serviceResult = UA_STATUSCODE_BADSEQUENCENUMBERINVALID;
+      return;
+    }
+    
+    // FIXME: By spec, this notification has to be in the "retransmit queue", i.e. publishedOnce must be
+    //        true. If this is not tested, the client just gets what he asks for... hence this part is
+    //        commented:
+    /* Check if the notification is in the published queue
+    if (notification->publishedOnce == UA_FALSE) {
+      response->responseHeader.serviceResult = UA_STATUSCODE_BADSUBSCRIPTIONIDINVALID;
+      return;
+    }
+    */
+    // Retransmit 
+    Subscription_copyNotificationMessage(&response->notificationMessage, notification);
+    // Mark this notification as published
+    notification->publishedOnce = UA_TRUE;
+    
+    return;
+}

+ 4 - 10
src/server/ua_subscription.c

@@ -118,6 +118,7 @@ void Subscription_updateNotifications(UA_Subscription *subscription) {
     msg = (UA_unpublishedNotification *) UA_malloc(sizeof(UA_unpublishedNotification));
     msg->notification = UA_malloc(sizeof(UA_NotificationMessage));
     INITPOINTER(msg->notification->notificationData);
+    msg->publishedOnce = UA_FALSE;
     msg->notification->sequenceNumber = subscription->sequenceNumber++;
     msg->notification->publishTime    = UA_DateTime_now();
     
@@ -133,7 +134,7 @@ void Subscription_updateNotifications(UA_Subscription *subscription) {
         //   the three possible NotificationData Types
         msg->notification->notificationData[notmsgn].encoding = 1; // Encoding is always binary
         msg->notification->notificationData[notmsgn].typeId = UA_NODEID_NUMERIC(0, 811);
-      
+        
         if(notmsgn == 0) {
             // Construct a DataChangeNotification
             changeNotification = UA_malloc(sizeof(UA_DataChangeNotification));
@@ -198,18 +199,11 @@ UA_UInt32 *Subscription_getAvailableSequenceNumbers(UA_Subscription *sub) {
     return seqArray;
 }
 
-void Subscription_copyTopNotificationMessage(UA_NotificationMessage *dst, UA_Subscription *sub) {
+void Subscription_copyNotificationMessage(UA_NotificationMessage *dst, UA_unpublishedNotification *src) {
     if(!dst)
         return;
     
-    if(Subscription_queuedNotifications(sub) == 0) {
-      dst->notificationDataSize = 0;
-      dst->publishTime = UA_DateTime_now();
-      dst->sequenceNumber = 0;
-      return;
-    }
-    
-    UA_NotificationMessage *latest = LIST_FIRST(&sub->unpublishedNotifications)->notification;
+    UA_NotificationMessage *latest = src->notification;
     dst->notificationDataSize = latest->notificationDataSize;
     dst->publishTime = latest->publishTime;
     dst->sequenceNumber = latest->sequenceNumber;

+ 2 - 1
src/server/ua_subscription.h

@@ -69,6 +69,7 @@ int MonitoredItem_QueueToDataChangeNotifications(UA_MonitoredItemNotification *d
 /****************/
 
 typedef struct UA_unpublishedNotification {
+    UA_Boolean publishedOnce;
     LIST_ENTRY(UA_unpublishedNotification) listEntry;
     UA_NotificationMessage *notification;
 } UA_unpublishedNotification;
@@ -96,7 +97,7 @@ void UA_Subscription_deleteMembers(UA_Subscription *subscription, UA_Server *ser
 void Subscription_updateNotifications(UA_Subscription *subscription);
 UA_UInt32 Subscription_queuedNotifications(UA_Subscription *subscription);
 UA_UInt32 *Subscription_getAvailableSequenceNumbers(UA_Subscription *sub);
-void Subscription_copyTopNotificationMessage(UA_NotificationMessage *dst, UA_Subscription *sub);
+void Subscription_copyNotificationMessage(UA_NotificationMessage *dst, UA_unpublishedNotification *sub);
 UA_UInt32 Subscription_deleteUnpublishedNotification(UA_UInt32 seqNo, UA_Boolean bDeleteAll, UA_Subscription *sub);
 void Subscription_generateKeepAlive(UA_Subscription *subscription);
 UA_StatusCode Subscription_createdUpdateJob(UA_Server *server, UA_Guid jobId, UA_Subscription *sub);

+ 1 - 1
tools/generate_datatypes.py

@@ -69,7 +69,7 @@ minimal_types = ["InvalidType", "Node", "NodeClass", "ReferenceNode", "Applicati
 subscription_types = ["CreateSubscriptionRequest", "CreateSubscriptionResponse",
                       "DeleteMonitoredItemsRequest", "DeleteMonitoredItemsResponse", "NotificationMessage",
                       "MonitoredItemNotification", "DataChangeNotification", "ModifySubscriptionRequest",
-                      "ModifySubscriptionResponse"]
+                      "ModifySubscriptionResponse", "RepublishRequest", "RepublishResponse"]
 
 class TypeDescription(object):
     def __init__(self, name, nodeid, namespaceid):