Pārlūkot izejas kodu

better statuscode checking in publish callback

Julius Pfrommer 8 gadi atpakaļ
vecāks
revīzija
fb6ed18703
1 mainītis faili ar 116 papildinājumiem un 78 dzēšanām
  1. 116 78
      src/server/ua_subscription.c

+ 116 - 78
src/server/ua_subscription.c

@@ -269,26 +269,82 @@ UA_Subscription_deleteMonitoredItem(UA_Server *server, UA_Subscription *sub,
     return UA_STATUSCODE_BADMONITOREDITEMIDINVALID;
 }
 
-void UA_Subscription_publishCallback(UA_Server *server, UA_Subscription *sub) {
-    UA_LOG_DEBUG_SESSION(server->config.logger, sub->session, "Subscription %u | "
-                         "Publish Callback", sub->subscriptionID);
-
-    /* Count the available notifications */
+static size_t
+countQueuedNotifications(UA_Subscription *sub, UA_Boolean *moreNotifications) {
     size_t notifications = 0;
-    UA_Boolean moreNotifications = false;
     if(sub->publishingEnabled) {
         UA_MonitoredItem *mon;
         LIST_FOREACH(mon, &sub->MonitoredItems, listEntry) {
             MonitoredItem_queuedValue *qv;
             TAILQ_FOREACH(qv, &mon->queue, listEntry) {
                 if(notifications >= sub->notificationsPerPublish) {
-                    moreNotifications = true;
+                    *moreNotifications = true;
                     break;
                 }
                 notifications++;
             }
         }
     }
+    return notifications;
+}
+
+static UA_StatusCode
+prepareNotificationMessage(UA_Subscription *sub, UA_NotificationMessage *message,
+                           size_t notifications) {
+    /* Array of ExtensionObject to hold different kinds of notifications
+       (currently only DataChangeNotifications) */
+    message->notificationData = UA_Array_new(1, &UA_TYPES[UA_TYPES_EXTENSIONOBJECT]);
+    if(!message->notificationData)
+        return UA_STATUSCODE_BADOUTOFMEMORY;
+    message->notificationDataSize = 1;
+
+    /* Allocate Notification */
+    UA_DataChangeNotification *dcn = UA_DataChangeNotification_new();
+    if(!dcn)
+        goto cleanup;
+    UA_ExtensionObject *data = message->notificationData;
+    data->encoding = UA_EXTENSIONOBJECT_DECODED;
+    data->content.decoded.data = dcn;
+    data->content.decoded.type = &UA_TYPES[UA_TYPES_DATACHANGENOTIFICATION];
+
+    /* Allocate array of notifications */
+    dcn->monitoredItems =
+        UA_Array_new(notifications, &UA_TYPES[UA_TYPES_MONITOREDITEMNOTIFICATION]);
+    if(!dcn->monitoredItems)
+        goto cleanup;
+    dcn->monitoredItemsSize = notifications;
+
+    /* Move notifications into the response .. the point of no return */
+    size_t l = 0;
+    UA_MonitoredItem *mon;
+    LIST_FOREACH(mon, &sub->MonitoredItems, listEntry) {
+        MonitoredItem_queuedValue *qv, *qv_tmp;
+        TAILQ_FOREACH_SAFE(qv, &mon->queue, listEntry, qv_tmp) {
+            if(l >= notifications)
+                return UA_STATUSCODE_GOOD;
+            UA_MonitoredItemNotification *min = &dcn->monitoredItems[l];
+            min->clientHandle = qv->clientHandle;
+            min->value = qv->value;
+            TAILQ_REMOVE(&mon->queue, qv, listEntry);
+            UA_free(qv);
+            mon->currentQueueSize--;
+            l++;
+        }
+    }
+    return UA_STATUSCODE_GOOD;
+    
+ cleanup:
+    UA_NotificationMessage_deleteMembers(message);
+    return UA_STATUSCODE_BADOUTOFMEMORY;
+}
+
+void UA_Subscription_publishCallback(UA_Server *server, UA_Subscription *sub) {
+    UA_LOG_DEBUG_SESSION(server->config.logger, sub->session, "Subscription %u | "
+                         "Publish Callback", sub->subscriptionID);
+
+    /* Count the available notifications */
+    UA_Boolean moreNotifications = false;
+    size_t notifications = countQueuedNotifications(sub, &moreNotifications);
 
     /* Return if nothing to do */
     if(notifications == 0) {
@@ -307,6 +363,8 @@ void UA_Subscription_publishCallback(UA_Server *server, UA_Subscription *sub) {
 
     /* Dequeue a response */
     UA_PublishResponseEntry *pre = SIMPLEQ_FIRST(&sub->session->responseQueue);
+
+    /* Cannot publish without a response */
     if(!pre) {
         UA_LOG_DEBUG_SESSION(server->config.logger, sub->session,
                              "Cannot send a publish response on subscription %u, "
@@ -324,20 +382,39 @@ void UA_Subscription_publishCallback(UA_Server *server, UA_Subscription *sub) {
         return;
     }
 
-    SIMPLEQ_REMOVE_HEAD(&sub->session->responseQueue, listEntry);
     UA_PublishResponse *response = &pre->response;
-    UA_UInt32 requestId = pre->requestId;
+    UA_NotificationMessage *message = &response->notificationMessage;
+    UA_NotificationMessageEntry *retransmission;
+    if(notifications > 0) {
+        /* Allocate the retransmission entry */
+        retransmission = malloc(sizeof(UA_NotificationMessageEntry));
+        if(!retransmission) {
+            UA_LOG_WARNING_SESSION(server->config.logger, sub->session,
+                                   "Subscription %u | Could not allocate memory "
+                                   "for retransmission", sub->subscriptionID);
+            return;
+        }
+        /* Prepare the response */
+        UA_StatusCode retval =
+            prepareNotificationMessage(sub, message, notifications);
+        if(retval != UA_STATUSCODE_GOOD) {
+            UA_LOG_WARNING_SESSION(server->config.logger, sub->session,
+                                   "Subscription %u | Could not prepare the "
+                                   "notification message", sub->subscriptionID);
+            UA_free(retransmission);
+            return;
+        }
+    }
 
-    /* We have a request. Reset state to normal. */
-    sub->state = UA_SUBSCRIPTIONSTATE_NORMAL;
-    sub->currentKeepAliveCount = 0;
-    sub->currentLifetimeCount = 0;
+    /* <-- The point of no return --> */
+
+    /* Remove the response from the response queue */
+    SIMPLEQ_REMOVE_HEAD(&sub->session->responseQueue, listEntry);
 
-    /* Prepare the response */
+    /* Set up the response */
     response->responseHeader.timestamp = UA_DateTime_now();
     response->subscriptionId = sub->subscriptionID;
     response->moreNotifications = moreNotifications;
-    UA_NotificationMessage *message = &response->notificationMessage;
     message->publishTime = response->responseHeader.timestamp;
     if(notifications == 0) {
         /* Send sequence number for the next notification */
@@ -346,66 +423,23 @@ void UA_Subscription_publishCallback(UA_Server *server, UA_Subscription *sub) {
         /* Increase the sequence number */
         message->sequenceNumber = ++sub->sequenceNumber;
 
-        /* Collect the notification messages */
-        message->notificationData = UA_ExtensionObject_new();
-        message->notificationDataSize = 1;
-        UA_ExtensionObject *data = message->notificationData;
-        UA_DataChangeNotification *dcn = UA_DataChangeNotification_new();
-        dcn->monitoredItems =
-            UA_Array_new(notifications, &UA_TYPES[UA_TYPES_MONITOREDITEMNOTIFICATION]);
-        dcn->monitoredItemsSize = notifications;
-        size_t l = 0;
-        UA_MonitoredItem *mon;
-        LIST_FOREACH(mon, &sub->MonitoredItems, listEntry) {
-            MonitoredItem_queuedValue *qv, *qv_tmp;
-            size_t mon_l = 0;
-            TAILQ_FOREACH_SAFE(qv, &mon->queue, listEntry, qv_tmp) {
-                if(notifications <= l)
-                    break;
-                UA_MonitoredItemNotification *min = &dcn->monitoredItems[l];
-                min->clientHandle = qv->clientHandle;
-                min->value = qv->value;
-                TAILQ_REMOVE(&mon->queue, qv, listEntry);
-                UA_free(qv);
-                mon->currentQueueSize--;
-                mon_l++;
-            }
-            UA_LOG_DEBUG_SESSION(server->config.logger, sub->session,
-                                 "Subscription %u | MonitoredItem %u | "
-                                 "Adding %u notifications to the publish response. "
-                                 "%u notifications remain in the queue",
-                                 sub->subscriptionID, mon->itemId, mon_l,
-                                 mon->currentQueueSize);
-            l += mon_l;
-        }
-        data->encoding = UA_EXTENSIONOBJECT_DECODED;
-        data->content.decoded.data = dcn;
-        data->content.decoded.type = &UA_TYPES[UA_TYPES_DATACHANGENOTIFICATION];
-
-        /* Put the notification message into the retransmission queue */
-        UA_NotificationMessageEntry *retransmission =
-            malloc(sizeof(UA_NotificationMessageEntry));
-        if(retransmission) {
-            UA_NotificationMessage_copy(&response->notificationMessage,
-                                        &retransmission->message);
-            LIST_INSERT_HEAD(&sub->retransmissionQueue, retransmission, listEntry);
-        } else {
-            UA_LOG_WARNING_SESSION(server->config.logger, sub->session,
-                                   "Subscription %u | Could not allocate memory "
-                                   "for retransmission", sub->subscriptionID);
-        }
+        /* Put the notification message into the retransmission queue. This needs to
+         * be done here, so that the message itself is included in the available
+         * sequence numbers for acknowledgement. */
+        retransmission->message = response->notificationMessage;
+        LIST_INSERT_HEAD(&sub->retransmissionQueue, retransmission, listEntry);
     }
 
     /* Get the available sequence numbers from the retransmission queue */
-    size_t available = 0, i = 0;
+    size_t available = 0;
     UA_NotificationMessageEntry *nme;
     LIST_FOREACH(nme, &sub->retransmissionQueue, listEntry)
         available++;
-    //cppcheck-suppress knownConditionTrueFalse
     if(available > 0) {
         response->availableSequenceNumbers = UA_alloca(available * sizeof(UA_UInt32));
         response->availableSequenceNumbersSize = available;
     }
+    size_t i = 0;
     LIST_FOREACH(nme, &sub->retransmissionQueue, listEntry) {
         response->availableSequenceNumbers[i] = nme->message.sequenceNumber;
         i++;
@@ -415,22 +449,26 @@ void UA_Subscription_publishCallback(UA_Server *server, UA_Subscription *sub) {
     UA_LOG_DEBUG_SESSION(server->config.logger, sub->session,
                          "Subscription %u | Sending out a publish response with %u "
                          "notifications", sub->subscriptionID, (UA_UInt32)notifications);
-    UA_assert(response->responseHeader.requestHandle != 0);
-    UA_SecureChannel_sendBinaryMessage(sub->session->channel, requestId, response,
+    UA_SecureChannel_sendBinaryMessage(sub->session->channel, pre->requestId, response,
                                        &UA_TYPES[UA_TYPES_PUBLISHRESPONSE]);
 
-    /* Remove the queued request */
-    response->availableSequenceNumbers = NULL; /* stack-allocated */
-    response->availableSequenceNumbersSize = 0;
-    UA_PublishResponse_deleteMembers(&pre->response);
-    UA_free(pre);
+    /* Reset subscription state to normal. */
+    sub->state = UA_SUBSCRIPTIONSTATE_NORMAL;
+    sub->currentKeepAliveCount = 0;
+    sub->currentLifetimeCount = 0;
+    
+    /* Free the response */
+    UA_Array_delete(response->results, response->resultsSize,
+                    &UA_TYPES[UA_TYPES_UINT32]);
+    UA_free(pre); /* no need for UA_PublishResponse_deleteMembers */
 
     /* Repeat if there are more notifications to send */
     if(moreNotifications)
         UA_Subscription_publishCallback(server, sub);
 }
 
-UA_StatusCode Subscription_registerPublishJob(UA_Server *server, UA_Subscription *sub) {
+UA_StatusCode
+Subscription_registerPublishJob(UA_Server *server, UA_Subscription *sub) {
     if(sub->publishJobIsRegistered)
         return UA_STATUSCODE_GOOD;
 
@@ -438,15 +476,16 @@ UA_StatusCode Subscription_registerPublishJob(UA_Server *server, UA_Subscription
     job.type = UA_JOBTYPE_METHODCALL;
     job.job.methodCall.method = (UA_ServerCallback)UA_Subscription_publishCallback;
     job.job.methodCall.data = sub;
-    UA_StatusCode retval = UA_Server_addRepeatedJob(server, job,
-                                                    (UA_UInt32)sub->publishingInterval,
-                                                    &sub->publishJobGuid);
+    UA_StatusCode retval =
+        UA_Server_addRepeatedJob(server, job, (UA_UInt32)sub->publishingInterval,
+                                 &sub->publishJobGuid);
     if(retval == UA_STATUSCODE_GOOD)
         sub->publishJobIsRegistered = true;
     return retval;
 }
 
-UA_StatusCode Subscription_unregisterPublishJob(UA_Server *server, UA_Subscription *sub) {
+UA_StatusCode
+Subscription_unregisterPublishJob(UA_Server *server, UA_Subscription *sub) {
     if(!sub->publishJobIsRegistered)
         return UA_STATUSCODE_GOOD;
     sub->publishJobIsRegistered = false;
@@ -472,7 +511,6 @@ UA_Subscription_answerPublishRequestsNoSubscription(UA_Server *server, UA_NodeId
         UA_PublishResponse *response = &pre->response;
         response->responseHeader.serviceResult = UA_STATUSCODE_BADNOSUBSCRIPTION;
         response->responseHeader.timestamp = UA_DateTime_now();
-        UA_assert(response->responseHeader.requestHandle != 0);
         UA_SecureChannel_sendBinaryMessage(session->channel, pre->requestId, response,
                                            &UA_TYPES[UA_TYPES_PUBLISHRESPONSE]);
         UA_PublishResponse_deleteMembers(response);