Browse Source

Remove repeated job when a subscription has publishing disabled; improve logging

Julius Pfrommer 8 years ago
parent
commit
2ac78bb448
2 changed files with 78 additions and 25 deletions
  1. 62 20
      src/server/ua_services_subscription.c
  2. 16 5
      src/server/ua_subscription.c

+ 62 - 20
src/server/ua_services_subscription.c

@@ -15,8 +15,16 @@ setSubscriptionSettings(UA_Server *server, UA_Subscription *subscription,
                         UA_Double requestedPublishingInterval,
                         UA_UInt32 requestedLifetimeCount,
                         UA_UInt32 requestedMaxKeepAliveCount,
-                        UA_UInt32 maxNotificationsPerPublish, UA_Byte priority) {
-    Subscription_unregisterPublishJob(server, subscription);
+                        UA_UInt32 maxNotificationsPerPublish, UA_Byte priority)
+{
+    /* deregister the job if required */
+    UA_StatusCode retval = Subscription_unregisterPublishJob(server, subscription);
+    if(retval != UA_STATUSCODE_GOOD)
+        UA_LOG_DEBUG_SESSION(server->config.logger, subscription->session, "Subscription %u | "
+                             "Could not unregister publish job with error code 0x%08x",
+                             subscription->subscriptionID, retval);
+
+    /* re-parameterize the subscription */
     subscription->publishingInterval = requestedPublishingInterval;
     UA_BOUNDEDVALUE_SETWBOUNDS(server->config.publishingIntervalLimits,
                                requestedPublishingInterval, subscription->publishingInterval);
@@ -34,30 +42,45 @@ setSubscriptionSettings(UA_Server *server, UA_Subscription *subscription,
        maxNotificationsPerPublish > server->config.maxNotificationsPerPublish)
         subscription->notificationsPerPublish = server->config.maxNotificationsPerPublish;
     subscription->priority = priority;
-    Subscription_registerPublishJob(server, subscription);
+
+    retval = Subscription_registerPublishJob(server, subscription);
+    if(retval != UA_STATUSCODE_GOOD)
+        UA_LOG_DEBUG_SESSION(server->config.logger, subscription->session, "Subscription %u | "
+                             "Could not register publish job with error code 0x%08x",
+                             subscription->subscriptionID, retval);
 }
 
-void Service_CreateSubscription(UA_Server *server, UA_Session *session,
-                                const UA_CreateSubscriptionRequest *request,
-                                UA_CreateSubscriptionResponse *response) {
-    UA_LOG_DEBUG_SESSION(server->config.logger, session, "Processing CreateSubscriptionRequest");
-    response->subscriptionId = UA_Session_getUniqueSubscriptionID(session);
+void
+Service_CreateSubscription(UA_Server *server, UA_Session *session,
+                           const UA_CreateSubscriptionRequest *request,
+                           UA_CreateSubscriptionResponse *response)
+{
+    /* Create the subscription */
     UA_Subscription *newSubscription = UA_Subscription_new(session, response->subscriptionId);
     if(!newSubscription) {
+        UA_LOG_DEBUG_SESSION(server->config.logger, session, "Processing CreateSubscriptionRequest failed");
         response->responseHeader.serviceResult = UA_STATUSCODE_BADOUTOFMEMORY;
         return;
     }
-
+    newSubscription->subscriptionID = UA_Session_getUniqueSubscriptionID(session);
     UA_Session_addSubscription(session, newSubscription);
+
+    /* Set the subscription parameters */
     newSubscription->publishingEnabled = request->publishingEnabled;
+    newSubscription->currentKeepAliveCount = newSubscription->maxKeepAliveCount;
     setSubscriptionSettings(server, newSubscription, request->requestedPublishingInterval,
                             request->requestedLifetimeCount, request->requestedMaxKeepAliveCount,
                             request->maxNotificationsPerPublish, request->priority);
-    /* immediately send the first response */
-    newSubscription->currentKeepAliveCount = newSubscription->maxKeepAliveCount;
+
+    /* Prepare the response */
+    response->subscriptionId = newSubscription->subscriptionID;
     response->revisedPublishingInterval = newSubscription->publishingInterval;
     response->revisedLifetimeCount = newSubscription->lifeTimeCount;
     response->revisedMaxKeepAliveCount = newSubscription->maxKeepAliveCount;
+
+    UA_LOG_DEBUG_SESSION(server->config.logger, session, "CreateSubscriptionRequest: Created Subscription %u "
+                         "with a publishing interval of %f ms", response->subscriptionId,
+                         newSubscription->publishingInterval);
 }
 
 void Service_ModifySubscription(UA_Server *server, UA_Session *session,
@@ -103,8 +126,14 @@ void Service_SetPublishingMode(UA_Server *server, UA_Session *session,
             response->results[i] = UA_STATUSCODE_BADSUBSCRIPTIONIDINVALID;
             continue;
         }
-        sub->publishingEnabled = request->publishingEnabled;
-        sub->currentLifetimeCount = 0; /* Reset the subscription lifetime */
+        if(sub->publishingEnabled != request->publishingEnabled) {
+            sub->publishingEnabled = request->publishingEnabled;
+            sub->currentLifetimeCount = 0; /* Reset the subscription lifetime */
+            if(sub->publishingEnabled)
+                Subscription_registerPublishJob(server, sub);
+            else
+                Subscription_unregisterPublishJob(server, sub);
+        }
     }
 }
 
@@ -290,7 +319,8 @@ void Service_ModifyMonitoredItems(UA_Server *server, UA_Session *session,
     response->resultsSize = request->itemsToModifySize;
 
     for(size_t i = 0; i < request->itemsToModifySize; i++)
-        Service_ModifyMonitoredItems_single(server, session, sub, &request->itemsToModify[i], &response->results[i]);
+        Service_ModifyMonitoredItems_single(server, session, sub, &request->itemsToModify[i],
+                                            &response->results[i]);
 
 }
 
@@ -407,7 +437,8 @@ Service_Publish(UA_Server *server, UA_Session *session,
     UA_Subscription *immediate;
     LIST_FOREACH(immediate, &session->serverSubscriptions, listEntry) {
         if(immediate->state == UA_SUBSCRIPTIONSTATE_LATE) {
-            UA_LOG_DEBUG_SESSION(server->config.logger, session, "Response on a late subscription",
+            UA_LOG_DEBUG_SESSION(server->config.logger, session, "Subscription %u | "
+                                 "Response on a late subscription", immediate->subscriptionID,
                                  session->authenticationToken.identifier.numeric);
             UA_Subscription_publishCallback(server, immediate);
             return;
@@ -415,9 +446,10 @@ Service_Publish(UA_Server *server, UA_Session *session,
     }
 }
 
-void Service_DeleteSubscriptions(UA_Server *server, UA_Session *session,
-                                 const UA_DeleteSubscriptionsRequest *request,
-                                 UA_DeleteSubscriptionsResponse *response) {
+void
+Service_DeleteSubscriptions(UA_Server *server, UA_Session *session,
+                            const UA_DeleteSubscriptionsRequest *request,
+                            UA_DeleteSubscriptionsResponse *response) {
     UA_LOG_DEBUG_SESSION(server->config.logger, session, "Processing DeleteSubscriptionsRequest");
 
     if(request->subscriptionIdsSize == 0){
@@ -432,14 +464,24 @@ void Service_DeleteSubscriptions(UA_Server *server, UA_Session *session,
     }
     response->resultsSize = request->subscriptionIdsSize;
 
-    for(size_t i = 0; i < request->subscriptionIdsSize; i++)
+    for(size_t i = 0; i < request->subscriptionIdsSize; i++) {
         response->results[i] = UA_Session_deleteSubscription(server, session, request->subscriptionIds[i]);
+        if(response->results[i] == UA_STATUSCODE_GOOD) {
+            UA_LOG_DEBUG_SESSION(server->config.logger, session, "Subscription %u | "
+                                "Subscription deleted", request->subscriptionIds[i]);
+        } else {
+            UA_LOG_DEBUG_SESSION(server->config.logger, session, "Deleting Subscription with Id "
+                                 "%u failed with error code 0x%08x", request->subscriptionIds[i],
+                                 response->results[i]);
+        }
+    }
 }
 
 void Service_DeleteMonitoredItems(UA_Server *server, UA_Session *session,
                                   const UA_DeleteMonitoredItemsRequest *request,
                                   UA_DeleteMonitoredItemsResponse *response) {
-    UA_LOG_DEBUG_SESSION(server->config.logger, session, "Processing DeleteMonitoredItemsRequest");
+    UA_LOG_DEBUG_SESSION(server->config.logger, session,
+                         "Processing DeleteMonitoredItemsRequest");
 
     if(request->monitoredItemIdsSize == 0) {
         response->responseHeader.serviceResult = UA_STATUSCODE_BADNOTHINGTODO;

+ 16 - 5
src/server/ua_subscription.c

@@ -55,7 +55,8 @@ void UA_MoniteredItem_SampleCallback(UA_Server *server, UA_MonitoredItem *monito
     MonitoredItem_queuedValue *newvalue = UA_malloc(sizeof(MonitoredItem_queuedValue));
     if(!newvalue) {
         UA_LOG_WARNING_SESSION(server->config.logger, sub->session, "Subscription %u | MonitoredItem %i | "
-                               "Skipped a sample due to lack of memory", sub->subscriptionID, monitoredItem->itemId);
+                               "Skipped a sample due to lack of memory", sub->subscriptionID,
+                               monitoredItem->itemId);
         return;
     }
     UA_DataValue_init(&newvalue->value);
@@ -222,6 +223,9 @@ UA_Subscription_deleteMonitoredItem(UA_Server *server, UA_Subscription *sub,
 }
 
 void UA_Subscription_publishCallback(UA_Server *server, UA_Subscription *sub) {
+    UA_LOG_DEBUG_SESSION(server->config.logger, sub->session, "Subscription %u | "
+                         "Publish Callback", sub->subscriptionID);
+
     /* Count the available notifications */
     size_t notifications = 0;
     UA_Boolean moreNotifications = false;
@@ -262,8 +266,8 @@ void UA_Subscription_publishCallback(UA_Server *server, UA_Subscription *sub) {
         } else {
             sub->currentLifetimeCount++;
             if(sub->currentLifetimeCount > sub->lifeTimeCount) {
-                UA_LOG_INFO_SESSION(server->config.logger, sub->session, "Subscription %u | "
-                                    "End of lifetime for subscription", sub->subscriptionID);
+                UA_LOG_DEBUG_SESSION(server->config.logger, sub->session, "Subscription %u | "
+                                     "End of lifetime for subscription", sub->subscriptionID);
                 UA_Session_deleteSubscription(server, sub->session, sub->subscriptionID);
             }
         }
@@ -316,7 +320,8 @@ void UA_Subscription_publishCallback(UA_Server *server, UA_Subscription *sub) {
                 mon_l++;
             }
             UA_LOG_DEBUG_SESSION(server->config.logger, sub->session, "Subscription %u | MonitoredItem %u | " \
-                                 "Adding %u notifications to the publish response. %u notifications remain in the queue",
+                                 "Adding %u notifications to the publish response. " \
+                                 "%u notifications remain in the queue",
                                  sub->subscriptionID, mon->itemId, mon_l, mon->currentQueueSize);
             l += mon_l;
         }
@@ -352,7 +357,8 @@ void UA_Subscription_publishCallback(UA_Server *server, UA_Subscription *sub) {
 
     /* Send the response */
     UA_LOG_DEBUG_SESSION(server->config.logger, sub->session,
-                         "Sending out a publish response with %u notifications", (UA_UInt32)notifications);
+                         "Subscription %u | Sending out a publish response with %u notifications",
+                         sub->subscriptionID, (UA_UInt32)notifications);
     UA_SecureChannel_sendBinaryMessage(sub->session->channel, requestId, response,
                                        &UA_TYPES[UA_TYPES_PUBLISHRESPONSE]);
 
@@ -368,6 +374,11 @@ void UA_Subscription_publishCallback(UA_Server *server, UA_Subscription *sub) {
 }
 
 UA_StatusCode Subscription_registerPublishJob(UA_Server *server, UA_Subscription *sub) {
+    if(sub->publishJobIsRegistered)
+        return UA_STATUSCODE_GOOD;
+    if(!sub->publishingEnabled)
+        return UA_STATUSCODE_GOOD;
+
     UA_Job job = (UA_Job) {.type = UA_JOBTYPE_METHODCALL,
                            .job.methodCall = {.method = (UA_ServerCallback)UA_Subscription_publishCallback,
                                               .data = sub} };