|
@@ -67,11 +67,36 @@ ensureSpaceInMonitoredItemQueue(UA_MonitoredItem *mon) {
|
|
|
--mon->currentQueueSize;
|
|
|
}
|
|
|
|
|
|
+/* Errors are returned as no change detected */
|
|
|
+static UA_Boolean
|
|
|
+detectValueChangeWithFilter(UA_MonitoredItem *mon, UA_DataValue *value,
|
|
|
+ UA_ByteString *encoding) {
|
|
|
+ /* Encode the data for comparison */
|
|
|
+ size_t binsize = UA_calcSizeBinary(value, &UA_TYPES[UA_TYPES_DATAVALUE]);
|
|
|
+ if(binsize == 0)
|
|
|
+ return false;
|
|
|
+
|
|
|
+ /* Allocate buffer on the heap if necessary */
|
|
|
+ if(binsize > UA_VALUENCODING_MAXSTACK &&
|
|
|
+ UA_ByteString_allocBuffer(encoding, binsize) != UA_STATUSCODE_GOOD)
|
|
|
+ return false;
|
|
|
+
|
|
|
+ /* Encode the value */
|
|
|
+ size_t encodingOffset = 0;
|
|
|
+ UA_StatusCode retval = UA_encodeBinary(value, &UA_TYPES[UA_TYPES_DATAVALUE],
|
|
|
+ NULL, NULL, encoding, &encodingOffset);
|
|
|
+ if(retval != UA_STATUSCODE_GOOD)
|
|
|
+ return false;
|
|
|
+
|
|
|
+ /* The value has changed */
|
|
|
+ encoding->length = encodingOffset;
|
|
|
+ return !mon->lastSampledValue.data || !UA_String_equal(encoding, &mon->lastSampledValue);
|
|
|
+}
|
|
|
+
|
|
|
/* Has this sample changed from the last one? The method may allocate additional
|
|
|
* space for the encoding buffer. Detect the change in encoding->data. */
|
|
|
-static UA_StatusCode
|
|
|
-detectValueChange(UA_MonitoredItem *mon, UA_DataValue *value,
|
|
|
- UA_ByteString *encoding, UA_Boolean *changed) {
|
|
|
+static UA_Boolean
|
|
|
+detectValueChange(UA_MonitoredItem *mon, UA_DataValue *value, UA_ByteString *encoding) {
|
|
|
/* Apply Filter */
|
|
|
UA_Boolean hasValue = value->hasValue;
|
|
|
if(mon->trigger == UA_DATACHANGETRIGGER_STATUS)
|
|
@@ -87,125 +112,70 @@ detectValueChange(UA_MonitoredItem *mon, UA_DataValue *value,
|
|
|
value->hasSourcePicoseconds = false;
|
|
|
}
|
|
|
|
|
|
- /* Forward declare before goto */
|
|
|
- UA_StatusCode retval = UA_STATUSCODE_GOOD;
|
|
|
- size_t encodingOffset = 0;
|
|
|
- size_t binsize;
|
|
|
-
|
|
|
- /* Encode the data for comparison */
|
|
|
- binsize = UA_calcSizeBinary(value, &UA_TYPES[UA_TYPES_DATAVALUE]);
|
|
|
- if(binsize == 0) {
|
|
|
- retval = UA_STATUSCODE_BADINTERNALERROR;
|
|
|
- goto cleanup;
|
|
|
- }
|
|
|
-
|
|
|
- /* Allocate buffer on the heap if necessary */
|
|
|
- if(binsize > UA_VALUENCODING_MAXSTACK &&
|
|
|
- UA_ByteString_allocBuffer(encoding, binsize) != UA_STATUSCODE_GOOD) {
|
|
|
- retval = UA_STATUSCODE_BADOUTOFMEMORY;
|
|
|
- goto cleanup;
|
|
|
- }
|
|
|
-
|
|
|
- /* Encode the value */
|
|
|
- retval = UA_encodeBinary(value, &UA_TYPES[UA_TYPES_DATAVALUE],
|
|
|
- NULL, NULL, encoding, &encodingOffset);
|
|
|
- if(retval != UA_STATUSCODE_GOOD)
|
|
|
- goto cleanup;
|
|
|
-
|
|
|
- /* The value has changed */
|
|
|
- encoding->length = encodingOffset;
|
|
|
- if(!mon->lastSampledValue.data || !UA_String_equal(encoding, &mon->lastSampledValue))
|
|
|
- *changed = true;
|
|
|
+ /* Detect the Value Change */
|
|
|
+ UA_Boolean res = detectValueChangeWithFilter(mon, value, encoding);
|
|
|
|
|
|
- cleanup:
|
|
|
/* Reset the filter */
|
|
|
value->hasValue = hasValue;
|
|
|
value->hasServerTimestamp = hasServerTimestamp;
|
|
|
value->hasServerPicoseconds = hasServerPicoseconds;
|
|
|
value->hasSourceTimestamp = hasSourceTimestamp;
|
|
|
value->hasSourcePicoseconds = hasSourcePicoseconds;
|
|
|
- return retval;
|
|
|
+ return res;
|
|
|
}
|
|
|
|
|
|
-void UA_MoniteredItem_SampleCallback(UA_Server *server, UA_MonitoredItem *monitoredItem) {
|
|
|
- UA_Subscription *sub = monitoredItem->subscription;
|
|
|
- if(monitoredItem->monitoredItemType != UA_MONITOREDITEMTYPE_CHANGENOTIFY) {
|
|
|
- UA_LOG_DEBUG_SESSION(server->config.logger, sub->session,
|
|
|
- "Subscription %u | MonitoredItem %i | "
|
|
|
- "Not a data change notification",
|
|
|
- sub->subscriptionID, monitoredItem->itemId);
|
|
|
- return;
|
|
|
- }
|
|
|
-
|
|
|
- /* Adjust timestampstoreturn to get source timestamp for triggering */
|
|
|
- UA_TimestampsToReturn ts = monitoredItem->timestampsToReturn;
|
|
|
- if(ts == UA_TIMESTAMPSTORETURN_SERVER)
|
|
|
- ts = UA_TIMESTAMPSTORETURN_BOTH;
|
|
|
- else if(ts == UA_TIMESTAMPSTORETURN_NEITHER)
|
|
|
- ts = UA_TIMESTAMPSTORETURN_SOURCE;
|
|
|
-
|
|
|
- /* Read the value */
|
|
|
- UA_ReadValueId rvid;
|
|
|
- UA_ReadValueId_init(&rvid);
|
|
|
- rvid.nodeId = monitoredItem->monitoredNodeId;
|
|
|
- rvid.attributeId = monitoredItem->attributeID;
|
|
|
- rvid.indexRange = monitoredItem->indexRange;
|
|
|
- UA_DataValue value;
|
|
|
- UA_DataValue_init(&value);
|
|
|
- Service_Read_single(server, sub->session, ts, &rvid, &value);
|
|
|
-
|
|
|
- /* Stack-allocate some memory for the value encoding */
|
|
|
- UA_Byte *stackValueEncoding = (UA_Byte *)UA_alloca(UA_VALUENCODING_MAXSTACK);
|
|
|
- UA_ByteString valueEncoding;
|
|
|
- valueEncoding.data = stackValueEncoding;
|
|
|
- valueEncoding.length = UA_VALUENCODING_MAXSTACK;
|
|
|
-
|
|
|
- /* Forward declaration before goto */
|
|
|
- MonitoredItem_queuedValue *newQueueItem;
|
|
|
+/* Returns whether a new sample was created */
|
|
|
+static UA_Boolean
|
|
|
+sampleCallbackWithValue(UA_Server *server, UA_Subscription *sub,
|
|
|
+ UA_MonitoredItem *monitoredItem,
|
|
|
+ UA_DataValue *value, UA_ByteString *valueEncoding) {
|
|
|
+ /* Store the pointer to the stack-allocated bytestring to see if a heap-allocation
|
|
|
+ * was necessary */
|
|
|
+ UA_Byte *stackValueEncoding = valueEncoding->data;
|
|
|
|
|
|
/* Has the value changed? */
|
|
|
- UA_Boolean changed = false;
|
|
|
- UA_StatusCode retval = detectValueChange(monitoredItem, &value,
|
|
|
- &valueEncoding, &changed);
|
|
|
- if(!changed || retval != UA_STATUSCODE_GOOD)
|
|
|
- goto cleanup;
|
|
|
+ UA_Boolean changed = detectValueChange(monitoredItem, value, valueEncoding);
|
|
|
+ if(!changed)
|
|
|
+ return false;
|
|
|
|
|
|
/* Allocate the entry for the publish queue */
|
|
|
- newQueueItem = (MonitoredItem_queuedValue *)UA_malloc(sizeof(MonitoredItem_queuedValue));
|
|
|
+ MonitoredItem_queuedValue *newQueueItem =
|
|
|
+ (MonitoredItem_queuedValue *)UA_malloc(sizeof(MonitoredItem_queuedValue));
|
|
|
if(!newQueueItem) {
|
|
|
UA_LOG_WARNING_SESSION(server->config.logger, sub->session,
|
|
|
"Subscription %u | MonitoredItem %i | "
|
|
|
"Item for the publishing queue could not be allocated",
|
|
|
sub->subscriptionID, monitoredItem->itemId);
|
|
|
- goto cleanup;
|
|
|
+ return false;
|
|
|
}
|
|
|
|
|
|
/* Copy valueEncoding on the heap for the next comparison (if not already done) */
|
|
|
- if(valueEncoding.data == stackValueEncoding) {
|
|
|
+ if(valueEncoding->data == stackValueEncoding) {
|
|
|
UA_ByteString cbs;
|
|
|
- if(UA_ByteString_copy(&valueEncoding, &cbs) != UA_STATUSCODE_GOOD) {
|
|
|
+ if(UA_ByteString_copy(valueEncoding, &cbs) != UA_STATUSCODE_GOOD) {
|
|
|
UA_LOG_WARNING_SESSION(server->config.logger, sub->session,
|
|
|
"Subscription %u | MonitoredItem %i | "
|
|
|
"ByteString to compare values could not be created",
|
|
|
sub->subscriptionID, monitoredItem->itemId);
|
|
|
UA_free(newQueueItem);
|
|
|
- goto cleanup;
|
|
|
+ return false;
|
|
|
}
|
|
|
- valueEncoding = cbs;
|
|
|
+ *valueEncoding = cbs;
|
|
|
}
|
|
|
|
|
|
/* Prepare the newQueueItem */
|
|
|
- if(value.hasValue && value.value.storageType == UA_VARIANT_DATA_NODELETE) {
|
|
|
- if(UA_DataValue_copy(&value, &newQueueItem->value) != UA_STATUSCODE_GOOD) {
|
|
|
+ if(value->hasValue && value->value.storageType == UA_VARIANT_DATA_NODELETE) {
|
|
|
+ /* Make a deep copy of the value */
|
|
|
+ if(UA_DataValue_copy(value, &newQueueItem->value) != UA_STATUSCODE_GOOD) {
|
|
|
UA_LOG_WARNING_SESSION(server->config.logger, sub->session,
|
|
|
"Subscription %u | MonitoredItem %i | "
|
|
|
"Item for the publishing queue could not be prepared",
|
|
|
sub->subscriptionID, monitoredItem->itemId);
|
|
|
UA_free(newQueueItem);
|
|
|
- goto cleanup;
|
|
|
+ return false;
|
|
|
}
|
|
|
} else {
|
|
|
- newQueueItem->value = value;
|
|
|
+ newQueueItem->value = *value; /* Just copy the value and do not release it */
|
|
|
}
|
|
|
newQueueItem->clientHandle = monitoredItem->clientHandle;
|
|
|
|
|
@@ -217,18 +187,60 @@ void UA_MoniteredItem_SampleCallback(UA_Server *server, UA_MonitoredItem *monito
|
|
|
|
|
|
/* Replace the encoding for comparison */
|
|
|
UA_ByteString_deleteMembers(&monitoredItem->lastSampledValue);
|
|
|
- monitoredItem->lastSampledValue = valueEncoding;
|
|
|
+ monitoredItem->lastSampledValue = *valueEncoding;
|
|
|
|
|
|
/* Add the sample to the queue for publication */
|
|
|
ensureSpaceInMonitoredItemQueue(monitoredItem);
|
|
|
TAILQ_INSERT_TAIL(&monitoredItem->queue, newQueueItem, listEntry);
|
|
|
++monitoredItem->currentQueueSize;
|
|
|
- return;
|
|
|
+ return true;;
|
|
|
+}
|
|
|
+
|
|
|
+void UA_MoniteredItem_SampleCallback(UA_Server *server, UA_MonitoredItem *monitoredItem) {
|
|
|
+ UA_Subscription *sub = monitoredItem->subscription;
|
|
|
+ if(monitoredItem->monitoredItemType != UA_MONITOREDITEMTYPE_CHANGENOTIFY) {
|
|
|
+ UA_LOG_DEBUG_SESSION(server->config.logger, sub->session,
|
|
|
+ "Subscription %u | MonitoredItem %i | "
|
|
|
+ "Not a data change notification",
|
|
|
+ sub->subscriptionID, monitoredItem->itemId);
|
|
|
+ return;
|
|
|
+ }
|
|
|
|
|
|
- cleanup:
|
|
|
- if(valueEncoding.data != stackValueEncoding)
|
|
|
- UA_ByteString_deleteMembers(&valueEncoding);
|
|
|
- UA_DataValue_deleteMembers(&value);
|
|
|
+ /* Adjust timestampstoreturn to get source timestamp for triggering */
|
|
|
+ UA_TimestampsToReturn ts = monitoredItem->timestampsToReturn;
|
|
|
+ if(ts == UA_TIMESTAMPSTORETURN_SERVER)
|
|
|
+ ts = UA_TIMESTAMPSTORETURN_BOTH;
|
|
|
+ else if(ts == UA_TIMESTAMPSTORETURN_NEITHER)
|
|
|
+ ts = UA_TIMESTAMPSTORETURN_SOURCE;
|
|
|
+
|
|
|
+ /* Read the value */
|
|
|
+ UA_ReadValueId rvid;
|
|
|
+ UA_ReadValueId_init(&rvid);
|
|
|
+ rvid.nodeId = monitoredItem->monitoredNodeId;
|
|
|
+ rvid.attributeId = monitoredItem->attributeID;
|
|
|
+ rvid.indexRange = monitoredItem->indexRange;
|
|
|
+ UA_DataValue value;
|
|
|
+ UA_DataValue_init(&value);
|
|
|
+ Service_Read_single(server, sub->session, ts, &rvid, &value);
|
|
|
+
|
|
|
+ /* Stack-allocate some memory for the value encoding. We might heap-allocate
|
|
|
+ * more memory if needed. This is just enough for scalars and small
|
|
|
+ * structures. */
|
|
|
+ UA_Byte *stackValueEncoding = (UA_Byte *)UA_alloca(UA_VALUENCODING_MAXSTACK);
|
|
|
+ UA_ByteString valueEncoding;
|
|
|
+ valueEncoding.data = stackValueEncoding;
|
|
|
+ valueEncoding.length = UA_VALUENCODING_MAXSTACK;
|
|
|
+
|
|
|
+ /* Create a sample and compare with the last value */
|
|
|
+ UA_Boolean newNotification = sampleCallbackWithValue(server, sub, monitoredItem,
|
|
|
+ &value, &valueEncoding);
|
|
|
+
|
|
|
+ /* Clean up */
|
|
|
+ if(!newNotification) {
|
|
|
+ if(valueEncoding.data != stackValueEncoding)
|
|
|
+ UA_ByteString_deleteMembers(&valueEncoding);
|
|
|
+ UA_DataValue_deleteMembers(&value);
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
UA_StatusCode
|
|
@@ -380,7 +392,7 @@ prepareNotificationMessage(UA_Subscription *sub, UA_NotificationMessage *message
|
|
|
size_t notifications) {
|
|
|
/* Array of ExtensionObject to hold different kinds of notifications
|
|
|
(currently only DataChangeNotifications) */
|
|
|
- message->notificationData = (UA_ExtensionObject *)UA_Array_new(1, &UA_TYPES[UA_TYPES_EXTENSIONOBJECT]);
|
|
|
+ message->notificationData = UA_ExtensionObject_new();
|
|
|
if(!message->notificationData)
|
|
|
return UA_STATUSCODE_BADOUTOFMEMORY;
|
|
|
message->notificationDataSize = 1;
|