Browse Source

fix a rare memory issue for subcriptions; add more logging to debug remaining CTT cases

Julius Pfrommer 8 years ago
parent
commit
6b4c6f5f19
2 changed files with 19 additions and 7 deletions
  1. 18 6
      src/server/ua_subscription.c
  2. 1 1
      src/server/ua_subscription.h

+ 18 - 6
src/server/ua_subscription.c

@@ -46,16 +46,16 @@ void MonitoredItem_delete(UA_Server *server, UA_MonitoredItem *monitoredItem) {
 void UA_MoniteredItem_SampleCallback(UA_Server *server, UA_MonitoredItem *monitoredItem) {
     UA_Subscription *sub = monitoredItem->subscription;
     if(monitoredItem->monitoredItemType != UA_MONITOREDITEMTYPE_CHANGENOTIFY) {
-        UA_LOG_DEBUG_SESSION(server->config.logger, sub->session, "MonitoredItem %i | "
-                     "Cannot process a monitoreditem that is not a data change notification",
-                     monitoredItem->itemId);
+        UA_LOG_DEBUG_SESSION(server->config.logger, sub->session, "Subscription %u | MonitoredItem %i | "
+                             "Cannot process a monitoreditem that is not a data change notification",
+                             sub->subscriptionID, monitoredItem->itemId);
         return;
     }
 
     MonitoredItem_queuedValue *newvalue = UA_malloc(sizeof(MonitoredItem_queuedValue));
     if(!newvalue) {
-        UA_LOG_WARNING_SESSION(server->config.logger, sub->session, "MonitoredItem %i | "
-                            "Skipped a sample due to lack of memory", monitoredItem->itemId);
+        UA_LOG_WARNING_SESSION(server->config.logger, sub->session, "Subscription %u | MonitoredItem %i | "
+                               "Skipped a sample due to lack of memory", sub->subscriptionID, monitoredItem->itemId);
         return;
     }
     UA_DataValue_init(&newvalue->value);
@@ -116,6 +116,13 @@ void UA_MoniteredItem_SampleCallback(UA_Server *server, UA_MonitoredItem *monito
         }
     }
 
+    /* if the read request returned a datavalue pointing into the nodestore, we
+       must make a copy to keep the datavalue across mainloop iterations */
+    if(newvalue->value.hasValue && newvalue->value.value.storageType == UA_VARIANT_DATA_NODELETE) {
+        UA_Variant tempv = newvalue->value.value;
+        UA_Variant_copy(&tempv, &newvalue->value.value);
+    }
+
     /* add the sample */
     UA_ByteString_deleteMembers(&monitoredItem->lastSampledValue);
     monitoredItem->lastSampledValue = newValueAsByteString;
@@ -289,6 +296,7 @@ void UA_Subscription_publishCallback(UA_Server *server, UA_Subscription *sub) {
         UA_MonitoredItem *mon;
         LIST_FOREACH(mon, &sub->MonitoredItems, listEntry) {
             MonitoredItem_queuedValue *qv, *qv_tmp;
+            size_t mon_l = 0;
             TAILQ_FOREACH_SAFE(qv, &mon->queue, listEntry, qv_tmp) {
                 if(notifications <= l)
                     break;
@@ -298,8 +306,12 @@ void UA_Subscription_publishCallback(UA_Server *server, UA_Subscription *sub) {
                 TAILQ_REMOVE(&mon->queue, qv, listEntry);
                 UA_free(qv);
                 mon->currentQueueSize--;
-                l++;
+                mon_l++;
             }
+            UA_LOG_DEBUG_SESSION(server->config.logger, sub->session, "Subscription %u | MonitoredItem %u | " \
+                                 "Adding %u notifications to the publish response. %u notifications remain in the queue",
+                                 sub->subscriptionID, mon->itemId, mon_l, mon->currentQueueSize);
+            l += mon_l;
         }
         data->encoding = UA_EXTENSIONOBJECT_DECODED;
         data->content.decoded.data = dcn;

+ 1 - 1
src/server/ua_subscription.h

@@ -32,7 +32,7 @@ typedef struct UA_MonitoredItem {
     UA_MonitoredItemType monitoredItemType;
     UA_TimestampsToReturn timestampsToReturn;
     UA_MonitoringMode monitoringMode;
-    UA_NodeId monitoredNodeId; 
+    UA_NodeId monitoredNodeId;
     UA_UInt32 attributeID;
     UA_UInt32 clientHandle;
     UA_Double samplingInterval; // [ms]