Browse Source

answer publish requests with a dedicated call to UA_SecureChannel_sendBinaryMessage

Julius Pfrommer 9 years ago
parent
commit
360f03ba87
3 changed files with 57 additions and 38 deletions
  1. 12 4
      src/server/ua_server_binary.c
  2. 1 2
      src/server/ua_services.h
  3. 44 32
      src/server/ua_services_subscription.c

+ 12 - 4
src/server/ua_server_binary.c

@@ -142,7 +142,6 @@ static void processOPN(UA_Connection *connection, UA_Server *server, const UA_By
 
 static void init_response_header(const UA_RequestHeader *p, UA_ResponseHeader *r) {
     r->requestHandle = p->requestHandle;
-    r->stringTableSize = 0;
     r->timestamp = UA_DateTime_now();
 }
 
@@ -218,7 +217,6 @@ getServicePointers(UA_UInt32 requestTypeId, const UA_DataType **requestType,
         *responseType = &UA_TYPES[UA_TYPES_CREATESUBSCRIPTIONRESPONSE];
         break;
     case UA_NS0ID_PUBLISHREQUEST:
-        *service = (UA_Service)Service_Publish;
         *requestType = &UA_TYPES[UA_TYPES_PUBLISHREQUEST];
         *responseType = &UA_TYPES[UA_TYPES_PUBLISHRESPONSE];
         break;
@@ -351,7 +349,7 @@ processMSG(UA_Connection *connection, UA_Server *server, const UA_ByteString *ms
     const UA_DataType *requestType = NULL;
     const UA_DataType *responseType = NULL;
     getServicePointers(requestTypeId.identifier.numeric, &requestType, &responseType, &service);
-    if(!service) {
+    if(!requestType) {
         /* The service is not supported */
         if(requestTypeId.identifier.numeric==787)
             UA_LOG_INFO(server->logger, UA_LOGCATEGORY_SERVER,
@@ -408,8 +406,18 @@ processMSG(UA_Connection *connection, UA_Server *server, const UA_ByteString *ms
     }
 #endif
 
-    /* Call the service */
     UA_Session_updateLifetime(session);
+
+#ifdef ENABLE_SUBSCRIPTIONS
+    /* The publish request is answered with a delay */
+    if(requestTypeId.identifier.numeric - UA_ENCODINGOFFSET_BINARY == UA_NS0ID_PUBLISHREQUEST) {
+        Service_Publish(server, session, request, sequenceHeader.requestId);
+        UA_deleteMembers(request, requestType);
+        return;
+    }
+#endif
+        
+    /* Call the service */
     void *response = UA_alloca(responseType->memSize);
     UA_init(response, responseType);
     init_response_header(request, response);

+ 1 - 2
src/server/ua_services.h

@@ -327,8 +327,7 @@ Service_DeleteSubscriptions(UA_Server *server, UA_Session *session,
                                      
 void
 Service_Publish(UA_Server *server, UA_Session *session,
-                const UA_PublishRequest *request,
-                UA_PublishResponse *response);
+                const UA_PublishRequest *request, UA_UInt32 requestId);
 
 void
 Service_Republish(UA_Server *server, UA_Session *session,

+ 44 - 32
src/server/ua_services_subscription.c

@@ -130,28 +130,36 @@ void Service_CreateMonitoredItems(UA_Server *server, UA_Session *session,
         createMonitoredItems(server, session, sub, &request->itemsToCreate[i], &response->results[i]);
 }
 
-void Service_Publish(UA_Server *server, UA_Session *session, const UA_PublishRequest *request,
-                     UA_PublishResponse *response) {
+void
+Service_Publish(UA_Server *server, UA_Session *session,
+                const UA_PublishRequest *request, UA_UInt32 requestId) {
     UA_SubscriptionManager *manager= &session->subscriptionManager;
     if(!manager)
         return;
+
+    UA_PublishResponse response;
+    UA_PublishResponse_init(&response);
+    response.responseHeader.requestHandle = request->requestHeader.requestHandle;
+    response.responseHeader.timestamp = UA_DateTime_now();
     
     // Delete Acknowledged Subscription Messages
-    response->resultsSize = request->subscriptionAcknowledgementsSize;
-    response->results     = UA_malloc(sizeof(UA_StatusCode)*(response->resultsSize));
+    response.resultsSize = request->subscriptionAcknowledgementsSize;
+    response.results = UA_calloc(response.resultsSize, sizeof(UA_StatusCode));
     for(size_t i = 0; i < request->subscriptionAcknowledgementsSize; i++) {
-        response->results[i] = UA_STATUSCODE_GOOD;
-        UA_Subscription *sub =
-            SubscriptionManager_getSubscriptionByID(&session->subscriptionManager,
-                                                    request->subscriptionAcknowledgements[i].subscriptionId);
+        response.results[i] = UA_STATUSCODE_GOOD;
+        UA_UInt32 sid = request->subscriptionAcknowledgements[i].subscriptionId;
+        UA_Subscription *sub = SubscriptionManager_getSubscriptionByID(manager, sid);
         if(!sub) {
-            response->results[i] = UA_STATUSCODE_BADSUBSCRIPTIONIDINVALID;
+            response.results[i] = UA_STATUSCODE_BADSUBSCRIPTIONIDINVALID;
             continue;
         }
-        if(Subscription_deleteUnpublishedNotification(request->subscriptionAcknowledgements[i].sequenceNumber, false, sub) == 0)
-            response->results[i] = UA_STATUSCODE_BADSEQUENCENUMBERINVALID;
+        UA_UInt32 sn = request->subscriptionAcknowledgements[i].sequenceNumber;
+        if(Subscription_deleteUnpublishedNotification(sn, false, sub) == 0)
+            response.results[i] = UA_STATUSCODE_BADSEQUENCENUMBERINVALID;
     }
     
+    UA_Boolean have_response = UA_FALSE;
+
     // See if any new data is available
     UA_Subscription *sub;
     LIST_FOREACH(sub, &manager->serverSubscriptions, listEntry) {
@@ -181,39 +189,43 @@ void Service_Publish(UA_Server *server, UA_Session *session, const UA_PublishReq
             continue;
     
         // We found an unpublished notification message in this subscription, which we will now publish.
-        response->subscriptionId = sub->subscriptionID;
-        Subscription_copyNotificationMessage(&response->notificationMessage, notification);
+        response.subscriptionId = sub->subscriptionID;
+        Subscription_copyNotificationMessage(&response.notificationMessage, notification);
         // Mark this notification as published
         notification->publishedOnce = UA_TRUE;
         if(notification->notification.sequenceNumber > sub->sequenceNumber) {
             // If this is a keepalive message, its seqNo is the next seqNo to be used for an actual msg.
-            response->availableSequenceNumbersSize = 0;
+            response.availableSequenceNumbersSize = 0;
             // .. and must be deleted
             Subscription_deleteUnpublishedNotification(sub->sequenceNumber + 1, false, sub);
         } else {
-            response->availableSequenceNumbersSize = sub->unpublishedNotificationsSize;
-            response->availableSequenceNumbers = Subscription_getAvailableSequenceNumbers(sub);
+            response.availableSequenceNumbersSize = sub->unpublishedNotificationsSize;
+            response.availableSequenceNumbers = Subscription_getAvailableSequenceNumbers(sub);
         }	  
-        // FIXME: This should be in processMSG();
-        session->validTill = UA_DateTime_now() + session->timeout * 10000;
-        return;
+        have_response = UA_TRUE;
     }
     
-    // FIXME: At this point, we would return nothing and "queue" the publish
-    // request, but currently we need to return something to the client. If no
-    // subscriptions have notifications, force one to generate a keepalive so we
-    // don't return an empty message
-    sub = LIST_FIRST(&manager->serverSubscriptions);
-    if(sub) {
-        response->subscriptionId = sub->subscriptionID;
-        sub->keepAliveCount.currentValue=sub->keepAliveCount.minValue;
-        Subscription_generateKeepAlive(sub);
-        Subscription_copyNotificationMessage(&response->notificationMessage, sub->unpublishedNotifications.lh_first);
-        Subscription_deleteUnpublishedNotification(sub->sequenceNumber + 1, false, sub);
+    if(!have_response) {
+        // FIXME: At this point, we would return nothing and "queue" the publish
+        // request, but currently we need to return something to the client. If no
+        // subscriptions have notifications, force one to generate a keepalive so we
+        // don't return an empty message
+        sub = LIST_FIRST(&manager->serverSubscriptions);
+        if(sub) {
+            response.subscriptionId = sub->subscriptionID;
+            sub->keepAliveCount.currentValue=sub->keepAliveCount.minValue;
+            Subscription_generateKeepAlive(sub);
+            Subscription_copyNotificationMessage(&response.notificationMessage,
+                                                 LIST_FIRST(&sub->unpublishedNotifications));
+            Subscription_deleteUnpublishedNotification(sub->sequenceNumber + 1, false, sub);
+        }
     }
     
-    // FIXME: This should be in processMSG();
-    session->validTill = UA_DateTime_now() + session->timeout * 10000;
+    UA_SecureChannel *channel = session->channel;
+    if(channel)
+        UA_SecureChannel_sendBinaryMessage(channel, requestId, &response,
+                                           &UA_TYPES[UA_TYPES_PUBLISHRESPONSE]);
+    UA_PublishResponse_deleteMembers(&response);
 }
 
 void Service_ModifySubscription(UA_Server *server, UA_Session *session,