Browse Source

Subscriptions: Remove complexity and log more status codes

Julius Pfrommer 5 years ago
parent
commit
a82e151e55
1 changed files with 101 additions and 127 deletions
  1. 101 127
      src/server/ua_subscription_datachange.c

+ 101 - 127
src/server/ua_subscription_datachange.c

@@ -185,17 +185,18 @@ updateNeededForStatusCode(const UA_DataValue *value, const UA_MonitoredItem *mon
 #endif
 
 
-/* When a change is detected, encoding contains the heap-allocated binary encoded value */
-static UA_Boolean
-detectValueChangeWithFilter(UA_Server *server, UA_Subscription *sub, UA_MonitoredItem *mon,
-                            UA_DataValue *value, UA_ByteString *encoding) {
+/* When a change is detected, encoding contains the heap-allocated binary
+ * encoded value. The default for changed is false. */
+static UA_StatusCode
+detectValueChangeWithFilter(UA_Server *server, UA_MonitoredItem *mon, UA_DataValue *value,
+                            UA_ByteString *encoding, UA_Boolean *changed) {
     if(UA_DataType_isNumeric(value->value.type) &&
        (mon->filter.dataChangeFilter.trigger == UA_DATACHANGETRIGGER_STATUSVALUE ||
         mon->filter.dataChangeFilter.trigger == UA_DATACHANGETRIGGER_STATUSVALUETIMESTAMP)) {
         if(mon->filter.dataChangeFilter.deadbandType == UA_DEADBANDTYPE_ABSOLUTE) {
             if(!updateNeededForFilteredValue(&value->value, &mon->lastValue,
                                              mon->filter.dataChangeFilter.deadbandValue))
-                return false;
+                return UA_STATUSCODE_GOOD;
         }
 #ifdef UA_ENABLE_DA
 		else if(mon->filter.dataChangeFilter.deadbandType == UA_DEADBANDTYPE_PERCENT) {
@@ -203,14 +204,14 @@ detectValueChangeWithFilter(UA_Server *server, UA_Subscription *sub, UA_Monitore
 			UA_BrowsePathResult bpr = UA_Server_browseSimplifiedBrowsePath(server, mon->monitoredNodeId, 1, &qn);
 			if(bpr.statusCode != UA_STATUSCODE_GOOD || bpr.targetsSize < 1) { //if branch is not entried, property has been found
 				  UA_BrowsePathResult_deleteMembers(&bpr);
-				  return false;
+				  return UA_STATUSCODE_GOOD;
 			}
 			const UA_VariableNode* node = (const UA_VariableNode*) UA_Nodestore_get(server, &bpr.targets->targetId.nodeId);
 			UA_Range* euRange = (UA_Range*) node->value.data.value.value.data;
 			if(!updateNeededForFilteredPercentValue(&value->value, &mon->lastValue,
 			                                        mon->filter.dataChangeFilter.deadbandValue, euRange)) {
 				if(!updateNeededForStatusCode(value, mon)) //when same value, but different status code is written
-				  return false;
+				  return UA_STATUSCODE_GOOD;
 			}
 		}
 #endif
@@ -232,7 +233,7 @@ detectValueChangeWithFilter(UA_Server *server, UA_Subscription *sub, UA_Monitore
     if(retval == UA_STATUSCODE_BADENCODINGERROR) {
         size_t binsize = UA_calcSizeBinary(value, &UA_TYPES[UA_TYPES_DATAVALUE]);
         if(binsize == 0)
-            return false;
+            return UA_STATUSCODE_BADENCODINGERROR;
 
         if(binsize > UA_VALUENCODING_MAXSTACK) {
             retval = UA_ByteString_allocBuffer(&valueEncoding, binsize);
@@ -244,51 +245,37 @@ detectValueChangeWithFilter(UA_Server *server, UA_Subscription *sub, UA_Monitore
             }
         }
     }
-
     if(retval != UA_STATUSCODE_GOOD) {
-        UA_LOG_WARNING_SESSION(&server->config.logger, sub ? sub->session : &server->adminSession,
-                               "Subscription %u | MonitoredItem %i | "
-                               "Could not encode the value the MonitoredItem with status %s",
-                               sub ? sub->subscriptionId : 0, mon->monitoredItemId,
-                               UA_StatusCode_name(retval));
-        return false;
+        if(valueEncoding.data != stackValueEncoding)
+            UA_ByteString_deleteMembers(&valueEncoding);
+        return retval;
     }
 
     /* Has the value changed? */
     valueEncoding.length = (uintptr_t)bufPos - (uintptr_t)valueEncoding.data;
-    UA_Boolean changed = (!mon->lastSampledValue.data ||
-                          !UA_String_equal(&valueEncoding, &mon->lastSampledValue));
+    *changed = (!mon->lastSampledValue.data ||
+                !UA_String_equal(&valueEncoding, &mon->lastSampledValue));
 
     /* No change */
-    if(!changed) {
+    if(!(*changed)) {
         if(valueEncoding.data != stackValueEncoding)
             UA_ByteString_deleteMembers(&valueEncoding);
-        return false;
+        return UA_STATUSCODE_GOOD;
     }
 
     /* Change detected. Copy encoding on the heap if necessary. */
-    if(valueEncoding.data == stackValueEncoding) {
-        retval = UA_ByteString_copy(&valueEncoding, encoding);
-        if(retval != UA_STATUSCODE_GOOD) {
-            UA_LOG_WARNING_SESSION(&server->config.logger, sub ? sub->session : &server->adminSession,
-                                   "Subscription %u | MonitoredItem %i | "
-                                   "Detected change, but could not allocate memory for the notification"
-                                   "with status %s", sub ? sub->subscriptionId : 0,
-                                   mon->monitoredItemId, UA_StatusCode_name(retval));
-            return false;
-        }
-        return true;
-    }
+    if(valueEncoding.data == stackValueEncoding)
+        return UA_ByteString_copy(&valueEncoding, encoding);
 
     *encoding = valueEncoding;
-    return true;
+    return UA_STATUSCODE_GOOD;
 }
 
 /* Has this sample changed from the last one? The method may allocate additional
  * space for the encoding buffer. Detect the change in encoding->data. */
-static UA_Boolean
+static UA_StatusCode
 detectValueChange(UA_Server *server, UA_MonitoredItem *mon,
-                  UA_DataValue value, UA_ByteString *encoding) {
+                  UA_DataValue value, UA_ByteString *encoding, UA_Boolean *changed) {
     /* Apply Filter */
     if(mon->filter.dataChangeFilter.trigger == UA_DATACHANGETRIGGER_STATUS)
         value.hasValue = false;
@@ -301,127 +288,107 @@ detectValueChange(UA_Server *server, UA_MonitoredItem *mon,
     }
 
     /* Detect the value change */
-    return detectValueChangeWithFilter(server, mon->subscription, mon, &value, encoding);
+    return detectValueChangeWithFilter(server, mon, &value, encoding, changed);
 }
 
-/* Returns whether the sample was stored in the MonitoredItem */
-static UA_Boolean
-sampleCallbackWithValue(UA_Server *server, UA_MonitoredItem *monitoredItem,
-                        UA_DataValue *value) {
-    UA_assert(monitoredItem->monitoredItemType == UA_MONITOREDITEMTYPE_CHANGENOTIFY);
-    UA_Subscription *sub = monitoredItem->subscription;
+/* movedValue returns whether the sample was moved to the notification. The
+ * default is false. */
+static UA_StatusCode
+sampleCallbackWithValue(UA_Server *server, UA_Session *session,
+                        UA_Subscription *sub, UA_MonitoredItem *mon,
+                        UA_DataValue *value, UA_Boolean *movedValue) {
+    UA_assert(mon->monitoredItemType == UA_MONITOREDITEMTYPE_CHANGENOTIFY);
 
     /* Contains heap-allocated binary encoding of the value if a change was detected */
     UA_ByteString binValueEncoding = UA_BYTESTRING_NULL;
 
     /* Has the value changed? Allocates memory in binValueEncoding if necessary.
      * value is edited internally so we make a shallow copy. */
-    UA_Boolean changed = detectValueChange(server, monitoredItem, *value, &binValueEncoding);
-    if(!changed)
-        return false;
+    UA_Boolean changed = false;
+    UA_StatusCode retval = detectValueChange(server, mon, *value, &binValueEncoding, &changed);
+    if(retval != UA_STATUSCODE_GOOD) {
+        UA_LOG_WARNING_SESSION(&server->config.logger, session, "Subscription %u | "
+                               "MonitoredItem %i | Value change detection failed with StatusCode %s",
+                               sub ? sub->subscriptionId : 0, mon->monitoredItemId,
+                               UA_StatusCode_name(retval));
+        return retval;
+    }
+    if(!changed) {
+        UA_LOG_DEBUG_SESSION(&server->config.logger, session, "Subscription %u | "
+                             "MonitoredItem %i | The value has not changed",
+                             sub ? sub->subscriptionId : 0, mon->monitoredItemId);
+        return UA_STATUSCODE_GOOD;
+    }
 
-    UA_Boolean storedValue = false;
+    /* The MonitoredItem is attached to a subscription (not server-local).
+     * Prepare a notification and enqueue it. */
     if(sub) {
         /* Allocate a new notification */
         UA_Notification *newNotification = (UA_Notification *)UA_malloc(sizeof(UA_Notification));
         if(!newNotification) {
-            UA_LOG_WARNING_SESSION(&server->config.logger, sub ? sub->session : &server->adminSession,
-                                   "Subscription %u | MonitoredItem %i | "
-                                   "Item for the publishing queue could not be allocated",
-                                   sub->subscriptionId, monitoredItem->monitoredItemId);
             UA_ByteString_deleteMembers(&binValueEncoding);
-            return false;
+            return UA_STATUSCODE_BADOUTOFMEMORY;
         }
 
-        if(monitoredItem->filter.dataChangeFilter.trigger == UA_DATACHANGETRIGGER_STATUS){
-            if(value->value.storageType == UA_VARIANT_DATA) {
-                newNotification->data.value.hasStatus = true;
-                UA_StatusCode_copy(&value->status, &newNotification->data.value.status);
-                storedValue = true;
-            }
-            else { /* => (value->value.storageType == UA_VARIANT_DATA_NODELETE) */
-                UA_StatusCode retval = UA_DataValue_copy(value, &newNotification->data.value);
-                if(retval != UA_STATUSCODE_GOOD) {
-                    UA_ByteString_deleteMembers(&binValueEncoding);
-                    UA_free(newNotification);
-                    return false;
-                }
-            }
-        }
-        else if(monitoredItem->filter.dataChangeFilter.trigger == UA_DATACHANGETRIGGER_STATUSVALUE ||
-                monitoredItem->filter.dataChangeFilter.trigger == UA_DATACHANGETRIGGER_STATUSVALUETIMESTAMP) {
-            if(value->value.storageType == UA_VARIANT_DATA) {
-                newNotification->data.value = *value; /* Move the value to the notification */
-                storedValue = true;
-            } else { /* => (value->value.storageType == UA_VARIANT_DATA_NODELETE) */
-                UA_StatusCode retval = UA_DataValue_copy(value, &newNotification->data.value);
-                if(retval != UA_STATUSCODE_GOOD) {
-                    UA_ByteString_deleteMembers(&binValueEncoding);
-                    UA_free(newNotification);
-                    return false;
-                }
+        if(value->value.storageType == UA_VARIANT_DATA) {
+            newNotification->data.value = *value; /* Move the value to the notification */
+            *movedValue = true;
+        } else { /* => (value->value.storageType == UA_VARIANT_DATA_NODELETE) */
+            retval = UA_DataValue_copy(value, &newNotification->data.value);
+            if(retval != UA_STATUSCODE_GOOD) {
+                UA_ByteString_deleteMembers(&binValueEncoding);
+                UA_free(newNotification);
+                return retval;
             }
         }
 
         /* <-- Point of no return --> */
 
-        /* Enqueue the new notification */
-        newNotification->mon = monitoredItem;
-        UA_Notification_enqueue(server, sub, monitoredItem, newNotification);
-    } else {
-        /* Call the local callback if not attached to a subscription */
-        UA_LocalMonitoredItem *localMon = (UA_LocalMonitoredItem*) monitoredItem;
-        void *nodeContext = NULL;
-        UA_Server_getNodeContext(server, monitoredItem->monitoredNodeId, &nodeContext);
-        localMon->callback.dataChangeCallback(server, monitoredItem->monitoredItemId,
-                                              localMon->context,
-                                              &monitoredItem->monitoredNodeId,
-                                              nodeContext, monitoredItem->attributeId,
-                                              value);
-    }
+        UA_LOG_DEBUG_SESSION(&server->config.logger, session, "Subscription %u | "
+                             "MonitoredItem %i | Enqueue a new notification",
+                             sub ? sub->subscriptionId : 0, mon->monitoredItemId);
 
-    // If someone called UA_Server_deleteMonitoredItem in the user callback,
-    // then the monitored item will be deleted soon. So, there is no need to
-    // add the lastValue or lastSampledValue to it.
-    //
-    // If we do so, we will leak
-    // the memory of that values, because UA_Server_deleteMonitoredItem
-    // already deleted all members and scheduled the monitored item pointer
-    // for later delete. In the later delete the monitored item will be deleted
-    // and not the members.
-    //
-    // Also in the later delete, all type information is lost and a deleteMember
-    // is not possible.
-    //
-    // We do detect if the monitored item is already defunct.
-    if (!monitoredItem->sampleCallbackIsRegistered) {
-        UA_ByteString_deleteMembers(&binValueEncoding);
-        return storedValue;
+        newNotification->mon = mon;
+        UA_Notification_enqueue(server, sub, mon, newNotification);
     }
 
     /* Store the encoding for comparison */
-    UA_ByteString_deleteMembers(&monitoredItem->lastSampledValue);
-    monitoredItem->lastSampledValue = binValueEncoding;
+    UA_ByteString_deleteMembers(&mon->lastSampledValue);
+    mon->lastSampledValue = binValueEncoding;
 
     /* Store the value for filter comparison (we don't want to decode
-     * lastSampledValue in every iteration) */
-    if((monitoredItem->filter.dataChangeFilter.deadbandType == UA_DEADBANDTYPE_NONE ||
-        monitoredItem->filter.dataChangeFilter.deadbandType == UA_DEADBANDTYPE_ABSOLUTE ||
-        monitoredItem->filter.dataChangeFilter.deadbandType == UA_DEADBANDTYPE_PERCENT) &&
-       (monitoredItem->filter.dataChangeFilter.trigger == UA_DATACHANGETRIGGER_STATUS ||
-        monitoredItem->filter.dataChangeFilter.trigger == UA_DATACHANGETRIGGER_STATUSVALUE ||
-        monitoredItem->filter.dataChangeFilter.trigger == UA_DATACHANGETRIGGER_STATUSVALUETIMESTAMP)) {
-        UA_Variant_deleteMembers(&monitoredItem->lastValue);
-        UA_Variant_copy(&value->value, &monitoredItem->lastValue);
+     * lastSampledValue in every iteration). Don't test the return code here. If
+     * this fails, lastValue is empty and a notification will be forced for the
+     * next deadband comparison. */
+    if((mon->filter.dataChangeFilter.deadbandType == UA_DEADBANDTYPE_NONE ||
+        mon->filter.dataChangeFilter.deadbandType == UA_DEADBANDTYPE_ABSOLUTE ||
+        mon->filter.dataChangeFilter.deadbandType == UA_DEADBANDTYPE_PERCENT) &&
+       (mon->filter.dataChangeFilter.trigger == UA_DATACHANGETRIGGER_STATUS ||
+        mon->filter.dataChangeFilter.trigger == UA_DATACHANGETRIGGER_STATUSVALUE ||
+        mon->filter.dataChangeFilter.trigger == UA_DATACHANGETRIGGER_STATUSVALUETIMESTAMP)) {
+        UA_Variant_deleteMembers(&mon->lastValue);
+        UA_Variant_copy(&value->value, &mon->lastValue);
 #ifdef UA_ENABLE_DA
-        UA_StatusCode_deleteMembers(&monitoredItem->lastStatus);
-        UA_StatusCode_copy(&value->status, &monitoredItem->lastStatus);
+        UA_StatusCode_deleteMembers(&mon->lastStatus);
+        UA_StatusCode_copy(&value->status, &mon->lastStatus);
 #endif
-        /* Don't test the return code here. If this fails, lastValue is empty
-         * and a notification will be forced for the next deadband comparison. */
     }
 
-    return storedValue;
+    /* Call the local callback if the MonitoredItem is not attached to a
+     * subscription. Do this at the very end. Because the callback might delete
+     * the subscription. */
+    if(!sub) {
+        UA_LocalMonitoredItem *localMon = (UA_LocalMonitoredItem*) mon;
+        void *nodeContext = NULL;
+        UA_Server_getNodeContext(server, mon->monitoredNodeId, &nodeContext);
+        localMon->callback.dataChangeCallback(server, mon->monitoredItemId,
+                                              localMon->context,
+                                              &mon->monitoredNodeId,
+                                              nodeContext, mon->attributeId,
+                                              value);
+    }
+
+    return UA_STATUSCODE_GOOD;
 }
 
 void
@@ -461,10 +428,17 @@ UA_MonitoredItem_sampleCallback(UA_Server *server, UA_MonitoredItem *monitoredIt
     }
 
     /* Operate on the sample */
-    UA_Boolean storedValue = sampleCallbackWithValue(server, monitoredItem, &value);
+    UA_Boolean movedValue = false;
+    UA_StatusCode retval = sampleCallbackWithValue(server, session, sub, monitoredItem, &value, &movedValue);
+    if(retval != UA_STATUSCODE_GOOD) {
+        UA_LOG_WARNING_SESSION(&server->config.logger, session, "Subscription %u | "
+                               "MonitoredItem %i | Sampling returned the statuscode %s",
+                               sub ? sub->subscriptionId : 0, monitoredItem->monitoredItemId,
+                               UA_StatusCode_name(retval));
+    }
 
-    /* Delete the sample if it was not stored in the MonitoredItem  */
-    if(!storedValue)
+    /* Delete the sample if it was not moved to the notification. */
+    if(!movedValue)
         UA_DataValue_deleteMembers(&value); /* Does nothing for UA_VARIANT_DATA_NODELETE */
     if(node)
         UA_Nodestore_release(server, node);