|
@@ -6,6 +6,8 @@
|
|
|
|
|
|
#ifdef UA_ENABLE_SUBSCRIPTIONS /* conditional compilation */
|
|
|
|
|
|
+#define UA_VALUENCODING_MAXSTACK 2048
|
|
|
+
|
|
|
/*****************/
|
|
|
/* MonitoredItem */
|
|
|
/*****************/
|
|
@@ -55,17 +57,6 @@ void UA_MoniteredItem_SampleCallback(UA_Server *server, UA_MonitoredItem *monito
|
|
|
return;
|
|
|
}
|
|
|
|
|
|
- MonitoredItem_queuedValue *newvalue = UA_malloc(sizeof(MonitoredItem_queuedValue));
|
|
|
- if(!newvalue) {
|
|
|
- UA_LOG_WARNING_SESSION(server->config.logger, sub->session,
|
|
|
- "Subscription %u | MonitoredItem %i | "
|
|
|
- "Skipped a sample due to lack of memory",
|
|
|
- sub->subscriptionID, monitoredItem->itemId);
|
|
|
- return;
|
|
|
- }
|
|
|
- UA_DataValue_init(&newvalue->value);
|
|
|
- newvalue->clientHandle = monitoredItem->clientHandle;
|
|
|
-
|
|
|
/* Adjust timestampstoreturn to get source timestamp for triggering */
|
|
|
UA_TimestampsToReturn ts = monitoredItem->timestampsToReturn;
|
|
|
if(ts == UA_TIMESTAMPSTORETURN_SERVER)
|
|
@@ -79,62 +70,120 @@ void UA_MoniteredItem_SampleCallback(UA_Server *server, UA_MonitoredItem *monito
|
|
|
rvid.nodeId = monitoredItem->monitoredNodeId;
|
|
|
rvid.attributeId = monitoredItem->attributeID;
|
|
|
rvid.indexRange = monitoredItem->indexRange;
|
|
|
- Service_Read_single(server, sub->session, ts, &rvid, &newvalue->value);
|
|
|
-
|
|
|
- /* Apply Filter */
|
|
|
- UA_Boolean hasValue = newvalue->value.hasValue;
|
|
|
- UA_Boolean hasServerTimestamp = newvalue->value.hasServerTimestamp;
|
|
|
- UA_Boolean hasServerPicoseconds = newvalue->value.hasServerPicoseconds;
|
|
|
- UA_Boolean hasSourceTimestamp = newvalue->value.hasSourceTimestamp;
|
|
|
- UA_Boolean hasSourcePicoseconds = newvalue->value.hasSourcePicoseconds;
|
|
|
- newvalue->value.hasServerTimestamp = false;
|
|
|
- newvalue->value.hasServerPicoseconds = false;
|
|
|
+ UA_DataValue value;
|
|
|
+ UA_DataValue_init(&value);
|
|
|
+ Service_Read_single(server, sub->session, ts, &rvid, &value);
|
|
|
+
|
|
|
+ /* Apply Filter. This needs to be reset later on, as the filter may be
|
|
|
+ * different from the encoded value. */
|
|
|
+ UA_Boolean hasValue = value.hasValue;
|
|
|
+ UA_Boolean hasServerTimestamp = value.hasServerTimestamp;
|
|
|
+ UA_Boolean hasServerPicoseconds = value.hasServerPicoseconds;
|
|
|
+ UA_Boolean hasSourceTimestamp = value.hasSourceTimestamp;
|
|
|
+ UA_Boolean hasSourcePicoseconds = value.hasSourcePicoseconds;
|
|
|
+ value.hasServerTimestamp = false;
|
|
|
+ value.hasServerPicoseconds = false;
|
|
|
if(monitoredItem->trigger == UA_DATACHANGETRIGGER_STATUS)
|
|
|
- newvalue->value.hasValue = false;
|
|
|
+ value.hasValue = false;
|
|
|
if(monitoredItem->trigger < UA_DATACHANGETRIGGER_STATUSVALUETIMESTAMP) {
|
|
|
- newvalue->value.hasSourceTimestamp = false;
|
|
|
- newvalue->value.hasSourcePicoseconds = false;
|
|
|
+ value.hasSourceTimestamp = false;
|
|
|
+ value.hasSourcePicoseconds = false;
|
|
|
}
|
|
|
|
|
|
/* Encode the data for comparison */
|
|
|
- size_t binsize = UA_calcSizeBinary(&newvalue->value, &UA_TYPES[UA_TYPES_DATAVALUE]);
|
|
|
- UA_ByteString newValueAsByteString;
|
|
|
- UA_StatusCode retval = UA_ByteString_allocBuffer(&newValueAsByteString, binsize);
|
|
|
- if(retval != UA_STATUSCODE_GOOD) {
|
|
|
- UA_DataValue_deleteMembers(&newvalue->value);
|
|
|
- UA_free(newvalue);
|
|
|
+ size_t binsize = UA_calcSizeBinary(&value, &UA_TYPES[UA_TYPES_DATAVALUE]);
|
|
|
+ if(binsize == 0) {
|
|
|
+ UA_DataValue_deleteMembers(&value);
|
|
|
return;
|
|
|
}
|
|
|
- size_t encodingOffset = 0;
|
|
|
- retval = UA_encodeBinary(&newvalue->value, &UA_TYPES[UA_TYPES_DATAVALUE],
|
|
|
- NULL, NULL, &newValueAsByteString, &encodingOffset);
|
|
|
-
|
|
|
- /* Restore the settings changed for the filter */
|
|
|
- newvalue->value.hasValue = hasValue;
|
|
|
- newvalue->value.hasServerTimestamp = hasServerTimestamp;
|
|
|
- newvalue->value.hasServerPicoseconds = hasServerPicoseconds;
|
|
|
- if(monitoredItem->timestampsToReturn == UA_TIMESTAMPSTORETURN_SERVER ||
|
|
|
- monitoredItem->timestampsToReturn == UA_TIMESTAMPSTORETURN_NEITHER) {
|
|
|
- newvalue->value.hasSourceTimestamp = false;
|
|
|
- newvalue->value.hasSourcePicoseconds = false;
|
|
|
+ UA_ByteString comparisonByteString;
|
|
|
+ UA_Boolean comparisonByteStringOnStack = (binsize <= UA_VALUENCODING_MAXSTACK);
|
|
|
+ if(comparisonByteStringOnStack) {
|
|
|
+ comparisonByteString.data = (UA_Byte*)UA_alloca(binsize);
|
|
|
+ comparisonByteString.length = binsize;
|
|
|
} else {
|
|
|
- newvalue->value.hasSourceTimestamp = hasSourceTimestamp;
|
|
|
- newvalue->value.hasSourcePicoseconds = hasSourcePicoseconds;
|
|
|
+ if(UA_ByteString_allocBuffer(&comparisonByteString, binsize) !=
|
|
|
+ UA_STATUSCODE_GOOD) {
|
|
|
+ UA_DataValue_deleteMembers(&value);
|
|
|
+ return;
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
/* Error or the value has not changed */
|
|
|
- if(retval != UA_STATUSCODE_GOOD ||
|
|
|
+ size_t encodingOffset = 0;
|
|
|
+ if((UA_encodeBinary(&value, &UA_TYPES[UA_TYPES_DATAVALUE],
|
|
|
+ NULL, NULL, &comparisonByteString,
|
|
|
+ &encodingOffset) != UA_STATUSCODE_GOOD) ||
|
|
|
(monitoredItem->lastSampledValue.data &&
|
|
|
- UA_String_equal(&newValueAsByteString, &monitoredItem->lastSampledValue))) {
|
|
|
+ UA_String_equal(&comparisonByteString, &monitoredItem->lastSampledValue))) {
|
|
|
UA_LOG_TRACE_SESSION(server->config.logger, sub->session, "Subscription %u | "
|
|
|
"MonitoredItem %u | Do not sample an unchanged value",
|
|
|
sub->subscriptionID, monitoredItem->itemId);
|
|
|
goto cleanup;
|
|
|
}
|
|
|
|
|
|
+ /* Allocate the entry for the publish queue */
|
|
|
+ MonitoredItem_queuedValue *newQueueItem = UA_malloc(sizeof(MonitoredItem_queuedValue));
|
|
|
+ if(!newQueueItem) {
|
|
|
+ UA_LOG_WARNING_SESSION(server->config.logger, sub->session,
|
|
|
+ "Subscription %u | MonitoredItem %i | "
|
|
|
+ "Item for the publishing queue could not be allocated",
|
|
|
+ sub->subscriptionID, monitoredItem->itemId);
|
|
|
+ goto cleanup;
|
|
|
+ }
|
|
|
+
|
|
|
+ /* Copy comparisonByteString on the heap */
|
|
|
+ if(comparisonByteStringOnStack) {
|
|
|
+ UA_ByteString cbs;
|
|
|
+ if(UA_ByteString_copy(&comparisonByteString, &cbs) != UA_STATUSCODE_GOOD) {
|
|
|
+ UA_LOG_WARNING_SESSION(server->config.logger, sub->session,
|
|
|
+ "Subscription %u | MonitoredItem %i | "
|
|
|
+ "ByteString to compare values could not be created",
|
|
|
+ sub->subscriptionID, monitoredItem->itemId);
|
|
|
+ UA_free(newQueueItem);
|
|
|
+ goto cleanup;
|
|
|
+ }
|
|
|
+ comparisonByteString = cbs;
|
|
|
+ comparisonByteStringOnStack = false;
|
|
|
+ }
|
|
|
+
|
|
|
+ /* Restore the settings changed for the filter */
|
|
|
+ value.hasValue = hasValue;
|
|
|
+ value.hasServerTimestamp = hasServerTimestamp;
|
|
|
+ value.hasServerPicoseconds = hasServerPicoseconds;
|
|
|
+ if(monitoredItem->timestampsToReturn == UA_TIMESTAMPSTORETURN_SERVER ||
|
|
|
+ monitoredItem->timestampsToReturn == UA_TIMESTAMPSTORETURN_NEITHER) {
|
|
|
+ value.hasSourceTimestamp = false;
|
|
|
+ value.hasSourcePicoseconds = false;
|
|
|
+ } else {
|
|
|
+ value.hasSourceTimestamp = hasSourceTimestamp;
|
|
|
+ value.hasSourcePicoseconds = hasSourcePicoseconds;
|
|
|
+ }
|
|
|
+
|
|
|
+ /* Prepare the newQueueItem */
|
|
|
+ if(value.hasValue && value.value.storageType == UA_VARIANT_DATA_NODELETE) {
|
|
|
+ /* If the read request returned a datavalue pointing into the nodestore,
|
|
|
+ * we must make a deep copy to keep the datavalue across mainloop
|
|
|
+ * iterations */
|
|
|
+ if(UA_DataValue_copy(&value, &newQueueItem->value) != UA_STATUSCODE_GOOD) {
|
|
|
+ UA_LOG_WARNING_SESSION(server->config.logger, sub->session,
|
|
|
+ "Subscription %u | MonitoredItem %i | "
|
|
|
+ "Item for the publishing queue could not be prepared",
|
|
|
+ sub->subscriptionID, monitoredItem->itemId);
|
|
|
+ UA_free(newQueueItem);
|
|
|
+ goto cleanup;
|
|
|
+ }
|
|
|
+ } else {
|
|
|
+ /* Copy the content with pointers (do not clean up value) */
|
|
|
+ newQueueItem->value = value;
|
|
|
+ }
|
|
|
+ newQueueItem->clientHandle = monitoredItem->clientHandle;
|
|
|
+
|
|
|
+ /* <-- Point of no return --> */
|
|
|
+
|
|
|
UA_LOG_DEBUG_SESSION(server->config.logger, sub->session,
|
|
|
"Subscription %u | MonitoredItem %u | "
|
|
|
- "Sampling the value", sub->subscriptionID, monitoredItem->itemId);
|
|
|
+ "Sampled a new value", sub->subscriptionID, monitoredItem->itemId);
|
|
|
|
|
|
/* Is enough space in the queue? */
|
|
|
if(monitoredItem->currentQueueSize >= monitoredItem->maxQueueSize) {
|
|
@@ -143,42 +192,26 @@ void UA_MoniteredItem_SampleCallback(UA_Server *server, UA_MonitoredItem *monito
|
|
|
queueItem = TAILQ_FIRST(&monitoredItem->queue);
|
|
|
else
|
|
|
queueItem = TAILQ_LAST(&monitoredItem->queue, QueueOfQueueDataValues);
|
|
|
-
|
|
|
- if(!queueItem) {
|
|
|
- UA_LOG_WARNING_SESSION(server->config.logger, sub->session, "Subscription %u | "
|
|
|
- "MonitoredItem %u | Cannot remove an element from the full "
|
|
|
- "queue. Internal error!", sub->subscriptionID,
|
|
|
- monitoredItem->itemId);
|
|
|
- goto cleanup;
|
|
|
- }
|
|
|
-
|
|
|
+ UA_assert(queueItem); /* When the currentQueueSize > 0, then there is an item */
|
|
|
TAILQ_REMOVE(&monitoredItem->queue, queueItem, listEntry);
|
|
|
UA_DataValue_deleteMembers(&queueItem->value);
|
|
|
UA_free(queueItem);
|
|
|
monitoredItem->currentQueueSize--;
|
|
|
}
|
|
|
|
|
|
- /* If the read request returned a datavalue pointing into the nodestore, we
|
|
|
- * must make a deep copy to keep the datavalue across mainloop iterations */
|
|
|
- if(newvalue->value.hasValue &&
|
|
|
- newvalue->value.value.storageType == UA_VARIANT_DATA_NODELETE) {
|
|
|
- UA_Variant tempv = newvalue->value.value;
|
|
|
- UA_Variant_copy(&tempv, &newvalue->value.value);
|
|
|
- }
|
|
|
-
|
|
|
/* Replace the comparison bytestring with the current sample */
|
|
|
UA_ByteString_deleteMembers(&monitoredItem->lastSampledValue);
|
|
|
- monitoredItem->lastSampledValue = newValueAsByteString;
|
|
|
+ monitoredItem->lastSampledValue = comparisonByteString;
|
|
|
|
|
|
/* Add the sample to the queue for publication */
|
|
|
- TAILQ_INSERT_TAIL(&monitoredItem->queue, newvalue, listEntry);
|
|
|
+ TAILQ_INSERT_TAIL(&monitoredItem->queue, newQueueItem, listEntry);
|
|
|
monitoredItem->currentQueueSize++;
|
|
|
return;
|
|
|
|
|
|
cleanup:
|
|
|
- UA_ByteString_deleteMembers(&newValueAsByteString);
|
|
|
- UA_DataValue_deleteMembers(&newvalue->value);
|
|
|
- UA_free(newvalue);
|
|
|
+ if(!comparisonByteStringOnStack)
|
|
|
+ UA_ByteString_deleteMembers(&comparisonByteString);
|
|
|
+ UA_DataValue_deleteMembers(&value);
|
|
|
}
|
|
|
|
|
|
UA_StatusCode
|