Browse Source

Add handling of local callbacks in the MonitoredItem callback

Julius Pfrommer 6 years ago
parent
commit
c2bae47473
1 changed files with 154 additions and 151 deletions
  1. 154 151
      src/server/ua_subscription_datachange.c

+ 154 - 151
src/server/ua_subscription_datachange.c

@@ -22,34 +22,29 @@ void UA_MonitoredItem_init(UA_MonitoredItem *mon, UA_Subscription *sub) {
 }
 
 void UA_MonitoredItem_delete(UA_Server *server, UA_MonitoredItem *monitoredItem) {
-    UA_Subscription *sub = monitoredItem->subscription;
-    UA_LOG_DEBUG_SESSION(server->config.logger, sub->session,
-                        "Subscription %u | MonitoredItem %i | "
-                        "Delete the MonitoredItem", sub->subscriptionId,
-                        monitoredItem->monitoredItemId);
-
     if(monitoredItem->monitoredItemType == UA_MONITOREDITEMTYPE_CHANGENOTIFY) {
         /* Remove the sampling callback */
         UA_MonitoredItem_unregisterSampleCallback(server, monitoredItem);
-
     } else {
         /* TODO: Access val data.event */
         UA_LOG_ERROR(server->config.logger, UA_LOGCATEGORY_SERVER,
                      "MonitoredItemTypes other than ChangeNotify are not supported yet");
     }
 
-    /* Remove the queued notifications */
-    UA_Notification *notification, *notification_tmp;
-    TAILQ_FOREACH_SAFE(notification, &monitoredItem->queue,
-                       listEntry, notification_tmp) {
-        /* Remove the item from the queues */
-        TAILQ_REMOVE(&monitoredItem->queue, notification, listEntry);
-        TAILQ_REMOVE(&sub->notificationQueue, notification, globalEntry);
-        --sub->notificationQueueSize;
-
-        UA_Notification_delete(notification);
+    /* Remove the queued notifications if attached to a subscription */
+    if(monitoredItem->subscription) {
+        UA_Subscription *sub = monitoredItem->subscription;
+        UA_Notification *notification, *notification_tmp;
+        TAILQ_FOREACH_SAFE(notification, &monitoredItem->queue,
+                           listEntry, notification_tmp) {
+            /* Remove the item from the queues */
+            TAILQ_REMOVE(&monitoredItem->queue, notification, listEntry);
+            TAILQ_REMOVE(&sub->notificationQueue, notification, globalEntry);
+            --sub->notificationQueueSize;
+            UA_Notification_delete(notification);
+        }
+        monitoredItem->queueSize = 0;
     }
-    monitoredItem->queueSize = 0;
 
     /* Remove the monitored item */
     UA_String_deleteMembers(&monitoredItem->indexRange);
@@ -194,17 +189,26 @@ updateNeededForFilteredValue(const UA_Variant *value, const UA_Variant *oldValue
     return false;
 }
 
-/* Errors are returned as no change detected */
+/* When a change is detected, encoding contains the heap-allocated binary encoded value */
 static UA_Boolean
-detectValueChangeWithFilter(UA_MonitoredItem *mon, UA_DataValue *value,
+detectValueChangeWithFilter(UA_Server *server, UA_MonitoredItem *mon, UA_DataValue *value,
                             UA_ByteString *encoding) {
-    if (isDataTypeNumeric(value->value.type)
-            && (mon->filter.trigger == UA_DATACHANGETRIGGER_STATUSVALUE
-                || mon->filter.trigger == UA_DATACHANGETRIGGER_STATUSVALUETIMESTAMP)) {
-        if (mon->filter.deadbandType == UA_DEADBANDTYPE_ABSOLUTE) {
-            if (!updateNeededForFilteredValue(&value->value, &mon->lastValue, mon->filter.deadbandValue))
+    UA_Session *session = &adminSession;
+    UA_UInt32 subscriptionId = 0;
+    UA_Subscription *sub = mon->subscription;
+    if(sub) {
+        session = sub->session;
+        subscriptionId = sub->subscriptionId;
+    }
+
+    if(isDataTypeNumeric(value->value.type) &&
+       (mon->filter.trigger == UA_DATACHANGETRIGGER_STATUSVALUE ||
+        mon->filter.trigger == UA_DATACHANGETRIGGER_STATUSVALUETIMESTAMP)) {
+        if(mon->filter.deadbandType == UA_DEADBANDTYPE_ABSOLUTE) {
+            if(!updateNeededForFilteredValue(&value->value, &mon->lastValue, mon->filter.deadbandValue))
                 return false;
-        } /*else if (mon->filter.deadbandType == UA_DEADBANDTYPE_PERCENT) {
+        }
+        /* else if (mon->filter.deadbandType == UA_DEADBANDTYPE_PERCENT) {
             // TODO where do this EURange come from ?
             UA_Double deadbandValue = fabs(mon->filter.deadbandValue * (EURange.high-EURange.low));
             if (!updateNeededForFilteredValue(value->value, mon->lastValue, deadbandValue))
@@ -212,185 +216,184 @@ detectValueChangeWithFilter(UA_MonitoredItem *mon, UA_DataValue *value,
         }*/
     }
 
-    /* Encode the data for comparison */
-    size_t binsize = UA_calcSizeBinary(value, &UA_TYPES[UA_TYPES_DATAVALUE]);
-    if(binsize == 0)
-        return false;
-
-    /* Allocate buffer on the heap if necessary */
-    if(binsize > UA_VALUENCODING_MAXSTACK &&
-       UA_ByteString_allocBuffer(encoding, binsize) != UA_STATUSCODE_GOOD)
-        return false;
+    /* Stack-allocate some memory for the value encoding. We might heap-allocate
+     * more memory if needed. This is just enough for scalars and small
+     * structures. */
+    UA_STACKARRAY(UA_Byte, stackValueEncoding, UA_VALUENCODING_MAXSTACK);
+    UA_ByteString valueEncoding;
+    valueEncoding.data = stackValueEncoding;
+    valueEncoding.length = UA_VALUENCODING_MAXSTACK;
 
     /* Encode the value */
-    UA_Byte *bufPos = encoding->data;
-    const UA_Byte *bufEnd = &encoding->data[encoding->length];
+    UA_Byte *bufPos = valueEncoding.data;
+    const UA_Byte *bufEnd = &valueEncoding.data[valueEncoding.length];
     UA_StatusCode retval = UA_encodeBinary(value, &UA_TYPES[UA_TYPES_DATAVALUE],
                                            &bufPos, &bufEnd, NULL, NULL);
-    if(retval != UA_STATUSCODE_GOOD)
+    if(retval == UA_STATUSCODE_BADENCODINGLIMITSEXCEEDED) {
+        size_t binsize = UA_calcSizeBinary(value, &UA_TYPES[UA_TYPES_DATAVALUE]);
+        if(binsize == 0)
+            return false;
+        retval = UA_ByteString_allocBuffer(&valueEncoding, binsize);
+        if(retval == UA_STATUSCODE_GOOD) {
+            bufPos = valueEncoding.data;
+            bufEnd = &valueEncoding.data[valueEncoding.length];
+            retval = UA_encodeBinary(value, &UA_TYPES[UA_TYPES_DATAVALUE],
+                                     &bufPos, &bufEnd, NULL, NULL);
+        }
+    }
+    if(retval != UA_STATUSCODE_GOOD) {
+        UA_LOG_WARNING_SESSION(server->config.logger, session,
+                               "Subscription %u | MonitoredItem %i | "
+                               "Could not encode the value the MonitoredItem with status %s",
+                               subscriptionId, mon->monitoredItemId, UA_StatusCode_name(retval));
         return false;
+    }
+
+    /* Has the value changed? */
+    valueEncoding.length = (uintptr_t)bufPos - (uintptr_t)valueEncoding.data;
+    UA_Boolean changed = (!mon->lastSampledValue.data ||
+                          !UA_String_equal(&valueEncoding, &mon->lastSampledValue));
+
+    /* No change */
+    if(!changed) {
+        if(valueEncoding.data != stackValueEncoding)
+            UA_ByteString_deleteMembers(&valueEncoding);
+        return false;
+    }
+
+    /* Change detected. Copy encoding on the heap if necessary. */
+    if(valueEncoding.data == stackValueEncoding) {
+        retval = UA_ByteString_copy(&valueEncoding, encoding);
+        if(retval != UA_STATUSCODE_GOOD) {
+            UA_LOG_WARNING_SESSION(server->config.logger, session,
+                                   "Subscription %u | MonitoredItem %i | "
+                                   "Detected change, but could not allocate memory for the notification"
+                                   "with status %s", subscriptionId, mon->monitoredItemId,
+                                   UA_StatusCode_name(retval));
+            return false;
+        }
+        return true;
+    }
 
-    /* The value has changed */
-    encoding->length = (uintptr_t)bufPos - (uintptr_t)encoding->data;
-    return !mon->lastSampledValue.data || !UA_String_equal(encoding, &mon->lastSampledValue);
+    *encoding = valueEncoding;
+    return true;
 }
 
 /* Has this sample changed from the last one? The method may allocate additional
  * space for the encoding buffer. Detect the change in encoding->data. */
 static UA_Boolean
-detectValueChange(UA_MonitoredItem *mon, UA_DataValue *value, UA_ByteString *encoding) {
+detectValueChange(UA_Server *server, UA_MonitoredItem *mon,
+                  UA_DataValue value, UA_ByteString *encoding) {
     /* Apply Filter */
-    UA_Boolean hasValue = value->hasValue;
     if(mon->filter.trigger == UA_DATACHANGETRIGGER_STATUS)
-        value->hasValue = false;
-
-    UA_Boolean hasServerTimestamp = value->hasServerTimestamp;
-    UA_Boolean hasServerPicoseconds = value->hasServerPicoseconds;
-    value->hasServerTimestamp = false;
-    value->hasServerPicoseconds = false;
+        value.hasValue = false;
 
-    UA_Boolean hasSourceTimestamp = value->hasSourceTimestamp;
-    UA_Boolean hasSourcePicoseconds = value->hasSourcePicoseconds;
+    value.hasServerTimestamp = false;
+    value.hasServerPicoseconds = false;
     if(mon->filter.trigger < UA_DATACHANGETRIGGER_STATUSVALUETIMESTAMP) {
-        value->hasSourceTimestamp = false;
-        value->hasSourcePicoseconds = false;
+        value.hasSourceTimestamp = false;
+        value.hasSourcePicoseconds = false;
     }
 
-    /* Detect the Value Change */
-    UA_Boolean res = detectValueChangeWithFilter(mon, value, encoding);
-
-    /* Reset the filter */
-    value->hasValue = hasValue;
-    value->hasServerTimestamp = hasServerTimestamp;
-    value->hasServerPicoseconds = hasServerPicoseconds;
-    value->hasSourceTimestamp = hasSourceTimestamp;
-    value->hasSourcePicoseconds = hasSourcePicoseconds;
-    return res;
+    /* Detect the value change */
+    return detectValueChangeWithFilter(server, mon, &value, encoding);
 }
 
-/* Returns whether a new sample was created */
+/* Returns whether the sample was stored in the MonitoredItem */
 static UA_Boolean
-sampleCallbackWithValue(UA_Server *server, UA_Subscription *sub,
-                        UA_MonitoredItem *monitoredItem,
-                        UA_DataValue *value,
-                        UA_ByteString *valueEncoding) {
+sampleCallbackWithValue(UA_Server *server, UA_MonitoredItem *monitoredItem,
+                        UA_DataValue *value) {
     UA_assert(monitoredItem->monitoredItemType == UA_MONITOREDITEMTYPE_CHANGENOTIFY);
-    /* Store the pointer to the stack-allocated bytestring to see if a heap-allocation
-     * was necessary */
-    UA_Byte *stackValueEncoding = valueEncoding->data;
+    UA_Subscription *sub = monitoredItem->subscription;
 
-    /* Has the value changed? */
-    UA_Boolean changed = detectValueChange(monitoredItem, value, valueEncoding);
-    if(!changed)
-        return false;
+    /* Contains heap-allocated binary encoding of the value if a change was detected */
+    UA_ByteString binaryEncoding = UA_BYTESTRING_NULL;
 
-    /* Allocate the entry for the publish queue */
-    UA_Notification *newNotification =
-        (UA_Notification *)UA_malloc(sizeof(UA_Notification));
-    if(!newNotification) {
-        UA_LOG_WARNING_SESSION(server->config.logger, sub->session,
-                               "Subscription %u | MonitoredItem %i | "
-                               "Item for the publishing queue could not be allocated",
-                               sub->subscriptionId, monitoredItem->monitoredItemId);
+    /* Has the value changed? Allocates memory in binaryEncoding if necessary.
+     * value is edited internally so we make a shallow copy. */
+    UA_Boolean changed = detectValueChange(server, monitoredItem, *value, &binaryEncoding);
+    if(!changed)
         return false;
-    }
 
-    /* Copy valueEncoding on the heap for the next comparison (if not already done) */
-    if(valueEncoding->data == stackValueEncoding) {
-        UA_ByteString cbs;
-        if(UA_ByteString_copy(valueEncoding, &cbs) != UA_STATUSCODE_GOOD) {
+    UA_Boolean storedValue = false;
+    if(sub) {
+        /* Allocate a new notification */
+        UA_Notification *newNotification = (UA_Notification *)UA_malloc(sizeof(UA_Notification));
+        if(!newNotification) {
             UA_LOG_WARNING_SESSION(server->config.logger, sub->session,
                                    "Subscription %u | MonitoredItem %i | "
-                                   "ByteString to compare values could not be created",
+                                   "Item for the publishing queue could not be allocated",
                                    sub->subscriptionId, monitoredItem->monitoredItemId);
-            UA_free(newNotification);
+            UA_ByteString_deleteMembers(&binaryEncoding);
             return false;
         }
-        *valueEncoding = cbs;
-    }
 
-    /* Prepare the newQueueItem */
-    if(value->hasValue && value->value.storageType == UA_VARIANT_DATA_NODELETE) {
-        /* Make a deep copy of the value */
-        UA_StatusCode retval = UA_DataValue_copy(value, &newNotification->data.value);
-        if(retval != UA_STATUSCODE_GOOD) {
-            UA_LOG_WARNING_SESSION(server->config.logger, sub->session,
-                                   "Subscription %u | MonitoredItem %i | "
-                                   "Item for the publishing queue could not be prepared",
-                                   sub->subscriptionId, monitoredItem->monitoredItemId);
-            UA_free(newNotification);
-            return false;
-        }
-    } else {
-        newNotification->data.value = *value; /* Just copy the value and do not release it */
-    }
+        /* <-- Point of no return --> */
 
-    /* <-- Point of no return --> */
+        newNotification->mon = monitoredItem;
+        newNotification->data.value = *value; /* Move the value to the notification */
+        storedValue = true;
 
-    UA_LOG_DEBUG_SESSION(server->config.logger, sub->session,
-                         "Subscription %u | MonitoredItem %u | Sampled a new value",
-                         sub->subscriptionId, monitoredItem->monitoredItemId);
+        /* Add the notification to the end of local and global queue */
+        TAILQ_INSERT_TAIL(&monitoredItem->queue, newNotification, listEntry);
+        TAILQ_INSERT_TAIL(&sub->notificationQueue, newNotification, globalEntry);
+        ++monitoredItem->queueSize;
+        ++sub->notificationQueueSize;
 
-    newNotification->mon = monitoredItem;
+        /* Remove some notifications if the queue is beyond maximum capacity */
+        MonitoredItem_ensureQueueSpace(monitoredItem);
+    } else {
+        /* Call the local callback if not attached to a subscription */
+        UA_LocalMonitoredItem *localMon = (UA_LocalMonitoredItem*) monitoredItem;
+        void *nodeContext = NULL;
+        UA_Server_getNodeContext(server, monitoredItem->monitoredNodeId, &nodeContext);
+        localMon->callback.dataChangeCallback(server, monitoredItem->monitoredItemId,
+                                              localMon->context,
+                                              &monitoredItem->monitoredNodeId,
+                                              nodeContext, monitoredItem->attributeId,
+                                              value);
+    }
 
-    /* Replace the encoding for comparison */
+    /* Store the encoding for comparison */
+    UA_ByteString_deleteMembers(&monitoredItem->lastSampledValue);
+    monitoredItem->lastSampledValue = binaryEncoding;
     UA_Variant_deleteMembers(&monitoredItem->lastValue);
     UA_Variant_copy(&value->value, &monitoredItem->lastValue);
-    UA_ByteString_deleteMembers(&monitoredItem->lastSampledValue);
-    monitoredItem->lastSampledValue = *valueEncoding;
-
-    /* Add the notification to the end of local and global queue */
-    TAILQ_INSERT_TAIL(&monitoredItem->queue, newNotification, listEntry);
-    TAILQ_INSERT_TAIL(&sub->notificationQueue, newNotification, globalEntry);
-    ++monitoredItem->queueSize;
-    ++sub->notificationQueueSize;
-
-    /* Remove some notifications if the queue is beyond maximum capacity */
-    MonitoredItem_ensureQueueSpace(monitoredItem);
 
-    return true;
+    return storedValue;
 }
 
 void
-UA_MonitoredItem_sampleCallback(UA_Server *server,
-                                UA_MonitoredItem *monitoredItem) {
+UA_MonitoredItem_sampleCallback(UA_Server *server, UA_MonitoredItem *monitoredItem) {
+    UA_Session *session = &adminSession;
+    UA_UInt32 subscriptionId = 0;
     UA_Subscription *sub = monitoredItem->subscription;
+    if(sub) {
+        session = sub->session;
+        subscriptionId = sub->subscriptionId;
+    }
+
     if(monitoredItem->monitoredItemType != UA_MONITOREDITEMTYPE_CHANGENOTIFY) {
-        UA_LOG_DEBUG_SESSION(server->config.logger, sub->session,
-                             "Subscription %u | MonitoredItem %i | "
-                             "Not a data change notification",
-                             sub->subscriptionId, monitoredItem->monitoredItemId);
+        UA_LOG_DEBUG_SESSION(server->config.logger, session, "Subscription %u | "
+                             "MonitoredItem %i | Not a data change notification",
+                             subscriptionId, monitoredItem->monitoredItemId);
         return;
     }
 
-    /* Read the value */
+    /* Sample the value */
     UA_ReadValueId rvid;
     UA_ReadValueId_init(&rvid);
     rvid.nodeId = monitoredItem->monitoredNodeId;
     rvid.attributeId = monitoredItem->attributeId;
     rvid.indexRange = monitoredItem->indexRange;
-    UA_DataValue value =
-        UA_Server_readWithSession(server, sub->session,
-                                  &rvid, monitoredItem->timestampsToReturn);
+    UA_DataValue value = UA_Server_readWithSession(server, session, &rvid, monitoredItem->timestampsToReturn);
 
-    /* Stack-allocate some memory for the value encoding. We might heap-allocate
-     * more memory if needed. This is just enough for scalars and small
-     * structures. */
-    UA_STACKARRAY(UA_Byte, stackValueEncoding, UA_VALUENCODING_MAXSTACK);
-    UA_ByteString valueEncoding;
-    valueEncoding.data = stackValueEncoding;
-    valueEncoding.length = UA_VALUENCODING_MAXSTACK;
+    /* Operate on the sample */
+    UA_Boolean storedValue = sampleCallbackWithValue(server, monitoredItem, &value);
 
-    /* Create a sample and compare with the last value */
-    UA_Boolean newNotification = sampleCallbackWithValue(server, sub, monitoredItem,
-                                                         &value, &valueEncoding);
-
-    /* Clean up */
-    if(!newNotification) {
-        if(valueEncoding.data != stackValueEncoding)
-            UA_ByteString_deleteMembers(&valueEncoding);
+    /* Delete the sample if it was not stored in the MonitoredItem  */
+    if(!storedValue)
         UA_DataValue_deleteMembers(&value);
-    }
 }
 
 UA_StatusCode