Browse Source

use delayed callback to send dangling publish responses; fix setting the
initial currentKeepAliveCount; simplify error handling

Julius Pfrommer 8 years ago
parent
commit
4859fd5219

+ 0 - 6
src/server/ua_server_binary.c

@@ -591,12 +591,6 @@ processRequest(UA_SecureChannel *channel, UA_Server *server,
         UA_LOG_INFO_CHANNEL(server->config.logger, channel, "Could not send the message over "
                              "the SecureChannel with error code 0x%08x", retval);
 
-#ifdef UA_ENABLE_SUBSCRIPTIONS
-    /* See if we need to return publish requests without a subscription */
-    if(session && requestType == &UA_TYPES[UA_TYPES_DELETESUBSCRIPTIONSREQUEST])
-        UA_Session_answerPublishRequestsWithoutSubscription(session);
-#endif
-
     /* Clean up */
     UA_deleteMembers(request, requestType);
     UA_deleteMembers(response, responseType);

+ 34 - 32
src/server/ua_services_subscription.c

@@ -66,10 +66,10 @@ Service_CreateSubscription(UA_Server *server, UA_Session *session,
 
     /* Set the subscription parameters */
     newSubscription->publishingEnabled = request->publishingEnabled;
-    newSubscription->currentKeepAliveCount = newSubscription->maxKeepAliveCount;
     setSubscriptionSettings(server, newSubscription, request->requestedPublishingInterval,
                             request->requestedLifetimeCount, request->requestedMaxKeepAliveCount,
                             request->maxNotificationsPerPublish, request->priority);
+    newSubscription->currentKeepAliveCount = newSubscription->maxKeepAliveCount; /* set settings first */
 
     /* Prepare the response */
     response->subscriptionId = newSubscription->subscriptionID;
@@ -134,10 +134,6 @@ Service_SetPublishingMode(UA_Server *server, UA_Session *session,
         if(sub->publishingEnabled != request->publishingEnabled) {
             sub->publishingEnabled = request->publishingEnabled;
             sub->currentLifetimeCount = 0; /* Reset the subscription lifetime */
-            if(sub->publishingEnabled)
-                Subscription_registerPublishJob(server, sub);
-            else
-                Subscription_unregisterPublishJob(server, sub);
         }
     }
 }
@@ -402,28 +398,17 @@ void
 Service_Publish(UA_Server *server, UA_Session *session,
                 const UA_PublishRequest *request, UA_UInt32 requestId) {
     UA_LOG_DEBUG_SESSION(server->config.logger, session, "Processing PublishRequest");
+    UA_StatusCode retval = UA_STATUSCODE_GOOD;
     /* Return an error if the session has no subscription */
     if(LIST_EMPTY(&session->serverSubscriptions)) {
-        UA_PublishResponse response;
-        UA_PublishResponse_init(&response);
-        response.responseHeader.requestHandle = request->requestHeader.requestHandle;
-        response.responseHeader.timestamp = UA_DateTime_now();
-        response.responseHeader.serviceResult = UA_STATUSCODE_BADNOSUBSCRIPTION;
-        UA_SecureChannel_sendBinaryMessage(session->channel, requestId, &response,
-                                           &UA_TYPES[UA_TYPES_PUBLISHRESPONSE]);
-        return;
+        retval = UA_STATUSCODE_BADNOSUBSCRIPTION;
+        goto send_error;
     }
 
     UA_PublishResponseEntry *entry = UA_malloc(sizeof(UA_PublishResponseEntry));
     if(!entry) {
-        UA_PublishResponse response;
-        UA_PublishResponse_init(&response);
-        response.responseHeader.requestHandle = request->requestHeader.requestHandle;
-        response.responseHeader.timestamp = UA_DateTime_now();
-        response.responseHeader.serviceResult = UA_STATUSCODE_BADOUTOFMEMORY;
-        UA_SecureChannel_sendBinaryMessage(session->channel, requestId, &response,
-                                           &UA_TYPES[UA_TYPES_PUBLISHRESPONSE]);
-        return;
+        retval = UA_STATUSCODE_BADOUTOFMEMORY;
+        goto send_error;
     }
     entry->requestId = requestId;
 
@@ -435,14 +420,9 @@ Service_Publish(UA_Server *server, UA_Session *session,
         response->results = UA_Array_new(request->subscriptionAcknowledgementsSize,
                                          &UA_TYPES[UA_TYPES_STATUSCODE]);
         if(!response->results) {
-            /* Respond immediately with the error code */
-            response->responseHeader.timestamp = UA_DateTime_now();
-            response->responseHeader.serviceResult = UA_STATUSCODE_BADOUTOFMEMORY;
-            UA_SecureChannel_sendBinaryMessage(session->channel, requestId, response,
-                                               &UA_TYPES[UA_TYPES_PUBLISHRESPONSE]);
-            UA_PublishResponse_deleteMembers(response);
             UA_free(entry);
-            return;
+            retval = UA_STATUSCODE_BADOUTOFMEMORY;
+            goto send_error;
         }
         response->resultsSize = request->subscriptionAcknowledgementsSize;
     }
@@ -450,15 +430,15 @@ Service_Publish(UA_Server *server, UA_Session *session,
     /* Delete Acknowledged Subscription Messages */
     for(size_t i = 0; i < request->subscriptionAcknowledgementsSize; i++) {
         UA_SubscriptionAcknowledgement *ack = &request->subscriptionAcknowledgements[i];
-        /* Get the subscription */
         UA_Subscription *sub = UA_Session_getSubscriptionByID(session, ack->subscriptionId);
         if(!sub) {
             response->results[i] = UA_STATUSCODE_BADSUBSCRIPTIONIDINVALID;
             UA_LOG_DEBUG_SESSION(server->config.logger, session,
-                         "Cannot process acknowledgements subscription %u", ack->subscriptionId);
+                                 "Cannot process acknowledgements subscription %u",
+                                 ack->subscriptionId);
             continue;
         }
-        /* Remove the acked transmission for the retransmission queue */
+        /* Remove the acked transmission from the retransmission queue */
         response->results[i] = UA_STATUSCODE_BADSEQUENCENUMBERUNKNOWN;
         UA_NotificationMessageEntry *pre, *pre_tmp;
         LIST_FOREACH_SAFE(pre, &sub->retransmissionQueue, listEntry, pre_tmp) {
@@ -485,9 +465,20 @@ Service_Publish(UA_Server *server, UA_Session *session,
                                  "Response on a late subscription", immediate->subscriptionID,
                                  session->authenticationToken.identifier.numeric);
             UA_Subscription_publishCallback(server, immediate);
-            return;
+            break;
         }
     }
+    return;
+
+    UA_PublishResponse err_response;
+ send_error:
+    UA_PublishResponse_init(&err_response);
+    err_response.responseHeader.requestHandle = request->requestHeader.requestHandle;
+    err_response.responseHeader.timestamp = UA_DateTime_now();
+    err_response.responseHeader.serviceResult = retval;
+    UA_assert(err_response.responseHeader.requestHandle != 0);
+    UA_SecureChannel_sendBinaryMessage(session->channel, requestId, &err_response,
+                                       &UA_TYPES[UA_TYPES_PUBLISHRESPONSE]);
 }
 
 void
@@ -520,6 +511,17 @@ Service_DeleteSubscriptions(UA_Server *server, UA_Session *session,
                                  response->results[i]);
         }
     }
+
+    /* Send dangling publish responses in a delayed job if the last subscription
+       was removed */
+    if(LIST_FIRST(&session->serverSubscriptions))
+        return;
+    UA_NodeId *sessionToken = UA_NodeId_new();
+    if(!sessionToken)
+        return;
+    UA_NodeId_copy(&session->authenticationToken, sessionToken);
+    UA_Server_delayedCallback(server, (UA_ServerCallback)UA_Subscription_answerPublishRequestsNoSubscription,
+                              sessionToken);
 }
 
 void Service_DeleteMonitoredItems(UA_Server *server, UA_Session *session,

+ 31 - 2
src/server/ua_subscription.c

@@ -295,6 +295,9 @@ void UA_Subscription_publishCallback(UA_Server *server, UA_Subscription *sub) {
         sub->currentKeepAliveCount++;
         if(sub->currentKeepAliveCount < sub->maxKeepAliveCount)
             return;
+        UA_LOG_DEBUG_SESSION(server->config.logger, sub->session,
+                             "Sending a keepalive on subscription %u",
+                             sub->subscriptionID)
     }
 
     /* Check if the securechannel is valid */
@@ -412,6 +415,7 @@ void UA_Subscription_publishCallback(UA_Server *server, UA_Subscription *sub) {
     UA_LOG_DEBUG_SESSION(server->config.logger, sub->session,
                          "Subscription %u | Sending out a publish response with %u "
                          "notifications", sub->subscriptionID, (UA_UInt32)notifications);
+    UA_assert(response->responseHeader.requestHandle != 0);
     UA_SecureChannel_sendBinaryMessage(sub->session->channel, requestId, response,
                                        &UA_TYPES[UA_TYPES_PUBLISHRESPONSE]);
 
@@ -429,8 +433,6 @@ void UA_Subscription_publishCallback(UA_Server *server, UA_Subscription *sub) {
 UA_StatusCode Subscription_registerPublishJob(UA_Server *server, UA_Subscription *sub) {
     if(sub->publishJobIsRegistered)
         return UA_STATUSCODE_GOOD;
-    if(!sub->publishingEnabled)
-        return UA_STATUSCODE_GOOD;
 
     UA_Job job;
     job.type = UA_JOBTYPE_METHODCALL;
@@ -451,4 +453,31 @@ UA_StatusCode Subscription_unregisterPublishJob(UA_Server *server, UA_Subscripti
     return UA_Server_removeRepeatedJob(server, sub->publishJobGuid);
 }
 
+/* When the session has publish requests stored but the last subscription is
+   deleted... Send out empty responses */
+void
+UA_Subscription_answerPublishRequestsNoSubscription(UA_Server *server, UA_NodeId *sessionToken) {
+    /* Get session */
+    UA_Session *session = UA_SessionManager_getSession(&server->sessionManager, sessionToken);
+    UA_NodeId_delete(sessionToken);
+
+    /* No session or there are remaining subscriptions */
+    if(!session || LIST_FIRST(&session->serverSubscriptions))
+        return;
+
+    /* Send a response for every queued request */
+    UA_PublishResponseEntry *pre;
+    while((pre = SIMPLEQ_FIRST(&session->responseQueue))) {
+        SIMPLEQ_REMOVE_HEAD(&session->responseQueue, listEntry);
+        UA_PublishResponse *response = &pre->response;
+        response->responseHeader.serviceResult = UA_STATUSCODE_BADNOSUBSCRIPTION;
+        response->responseHeader.timestamp = UA_DateTime_now();
+        UA_assert(response->responseHeader.requestHandle != 0);
+        UA_SecureChannel_sendBinaryMessage(session->channel, pre->requestId, response,
+                                           &UA_TYPES[UA_TYPES_PUBLISHRESPONSE]);
+        UA_PublishResponse_deleteMembers(response);
+        UA_free(pre);
+    }
+}
+
 #endif /* UA_ENABLE_SUBSCRIPTIONS */

+ 4 - 0
src/server/ua_subscription.h

@@ -118,4 +118,8 @@ UA_Subscription_getMonitoredItem(UA_Subscription *sub, UA_UInt32 monitoredItemID
 
 void UA_Subscription_publishCallback(UA_Server *server, UA_Subscription *sub);
 
+void
+UA_Subscription_answerPublishRequestsNoSubscription(UA_Server *server,
+                                                    UA_NodeId *sessionToken);
+
 #endif /* UA_SUBSCRIPTION_H_ */

+ 0 - 20
src/ua_session.c

@@ -106,24 +106,4 @@ UA_UInt32 UA_Session_getUniqueSubscriptionID(UA_Session *session) {
     return ++(session->lastSubscriptionID);
 }
 
-void UA_Session_answerPublishRequestsWithoutSubscription(UA_Session *session) {
-    /* Are there remaining subscriptions? */
-    if(LIST_FIRST(&session->serverSubscriptions))
-        return;
-
-    /* Send a response for every queued request */
-    UA_PublishResponseEntry *pre;
-    while((pre = SIMPLEQ_FIRST(&session->responseQueue))) {
-        SIMPLEQ_REMOVE_HEAD(&session->responseQueue, listEntry);
-        UA_PublishResponse *response = &pre->response;
-        UA_UInt32 requestId = pre->requestId;
-        response->responseHeader.serviceResult = UA_STATUSCODE_BADNOSUBSCRIPTION;
-        response->responseHeader.timestamp = UA_DateTime_now();
-        UA_SecureChannel_sendBinaryMessage(session->channel, requestId, response,
-                                           &UA_TYPES[UA_TYPES_PUBLISHRESPONSE]);
-        UA_PublishResponse_deleteMembers(response);
-        UA_free(pre);
-    }
-}
-
 #endif

+ 0 - 2
src/ua_session.h

@@ -69,8 +69,6 @@ UA_Session_deleteSubscription(UA_Server *server, UA_Session *session,
 
 UA_UInt32
 UA_Session_getUniqueSubscriptionID(UA_Session *session);
-
-void UA_Session_answerPublishRequestsWithoutSubscription(UA_Session *session);
 #endif
 
 /**