|
@@ -67,7 +67,7 @@ void Subscription_updateNotifications(UA_Subscription *subscription) {
|
|
|
// will need to be generated
|
|
|
LIST_FOREACH(mon, &subscription->MonitoredItems, listEntry) {
|
|
|
// Check if this MonitoredItems Queue holds data and how much data is held in total
|
|
|
- if (!mon->queue.lh_first)
|
|
|
+ if(!TAILQ_FIRST(&mon->queue))
|
|
|
continue;
|
|
|
if((mon->MonitoredItemType & MONITOREDITEM_TYPE_CHANGENOTIFY) != 0)
|
|
|
monItemsChangeT+=mon->QueueSize.currentValue;
|
|
@@ -113,16 +113,17 @@ void Subscription_updateNotifications(UA_Subscription *subscription) {
|
|
|
|
|
|
if(notmsgn == 0) {
|
|
|
// Construct a DataChangeNotification
|
|
|
- changeNotification = (UA_DataChangeNotification *) UA_malloc(sizeof(UA_DataChangeNotification));
|
|
|
+ changeNotification = UA_malloc(sizeof(UA_DataChangeNotification));
|
|
|
|
|
|
// Create one DataChangeNotification for each queue item held in each monitoredItems queue:
|
|
|
- changeNotification->monitoredItems = (UA_MonitoredItemNotification *) UA_malloc(sizeof(UA_MonitoredItemNotification) * monItemsChangeT);
|
|
|
+ changeNotification->monitoredItems = UA_malloc(sizeof(UA_MonitoredItemNotification) * monItemsChangeT);
|
|
|
|
|
|
// Scan all monitoredItems in this subscription and have their queue transformed into an Array of
|
|
|
// the propper NotificationMessageType (Status, Change, Event)
|
|
|
monItemsChangeT = 0;
|
|
|
for(mon=subscription->MonitoredItems.lh_first; mon != NULL; mon=mon->listEntry.le_next) {
|
|
|
- if (mon->MonitoredItemType != MONITOREDITEM_TYPE_CHANGENOTIFY || mon->queue.lh_first == NULL ) continue;
|
|
|
+ if(mon->MonitoredItemType != MONITOREDITEM_TYPE_CHANGENOTIFY || !TAILQ_FIRST(&mon->queue))
|
|
|
+ continue;
|
|
|
// Note: Monitored Items might not return a queuedValue if there is a problem encoding it.
|
|
|
monItemsChangeT += MonitoredItem_QueueToDataChangeNotifications( &((changeNotification->monitoredItems)[monItemsChangeT]), mon);
|
|
|
MonitoredItem_ClearQueue(mon);
|
|
@@ -234,7 +235,7 @@ UA_MonitoredItem *UA_MonitoredItem_new() {
|
|
|
// FIXME: This is currently hardcoded;
|
|
|
new->MonitoredItemType = MONITOREDITEM_TYPE_CHANGENOTIFY;
|
|
|
|
|
|
- LIST_INIT(&new->queue);
|
|
|
+ TAILQ_INIT(&new->queue);
|
|
|
LIST_INITENTRY(new, listEntry);
|
|
|
UA_NodeId_init(&new->monitoredNodeId);
|
|
|
INITPOINTER(new->LastSampledValue.data );
|
|
@@ -263,7 +264,7 @@ int MonitoredItem_QueueToDataChangeNotifications(UA_MonitoredItemNotification *d
|
|
|
MonitoredItem_queuedValue *queueItem;
|
|
|
|
|
|
// Count instead of relying on the items currentValue
|
|
|
- LIST_FOREACH(queueItem, &monitoredItem->queue, listEntry) {
|
|
|
+ TAILQ_FOREACH(queueItem, &monitoredItem->queue, listEntry) {
|
|
|
dst[queueSize].clientHandle = monitoredItem->ClientHandle;
|
|
|
dst[queueSize].value.hasServerPicoseconds = UA_FALSE;
|
|
|
dst[queueSize].value.hasServerTimestamp = UA_FALSE;
|
|
@@ -284,16 +285,10 @@ int MonitoredItem_QueueToDataChangeNotifications(UA_MonitoredItemNotification *d
|
|
|
|
|
|
void MonitoredItem_ClearQueue(UA_MonitoredItem *monitoredItem) {
|
|
|
MonitoredItem_queuedValue *val;
|
|
|
- if (monitoredItem == NULL)
|
|
|
- return;
|
|
|
- while(monitoredItem->queue.lh_first) {
|
|
|
- val = monitoredItem->queue.lh_first;
|
|
|
- LIST_REMOVE(val, listEntry);
|
|
|
- if(val->value.data != NULL) {
|
|
|
- UA_Variant_deleteMembers(&(val->value));
|
|
|
- UA_free(val->value.data);
|
|
|
- }
|
|
|
- UA_free(val);
|
|
|
+ while((val = TAILQ_FIRST(&monitoredItem->queue))) {
|
|
|
+ TAILQ_REMOVE(&monitoredItem->queue, val, listEntry);
|
|
|
+ UA_Variant_deleteMembers(&val->value);
|
|
|
+ UA_free(val);
|
|
|
}
|
|
|
monitoredItem->QueueSize.currentValue = 0;
|
|
|
}
|
|
@@ -407,7 +402,6 @@ void MonitoredItem_QueuePushDataValue(UA_Server *server, UA_MonitoredItem *monit
|
|
|
newvalue = (MonitoredItem_queuedValue *) UA_malloc(sizeof(MonitoredItem_queuedValue));
|
|
|
if(!newvalue)
|
|
|
return;
|
|
|
- LIST_INITENTRY(newvalue,listEntry);
|
|
|
newvalue->value.arrayLength = 0;
|
|
|
newvalue->value.arrayDimensionsSize = 0;
|
|
|
newvalue->value.arrayDimensions = NULL;
|
|
@@ -426,64 +420,48 @@ void MonitoredItem_QueuePushDataValue(UA_Server *server, UA_MonitoredItem *monit
|
|
|
samplingError = MonitoredItem_CopyMonitoredValueToVariant(monitoredItem->AttributeID, target,
|
|
|
&newvalue->value);
|
|
|
UA_NodeStore_release(target);
|
|
|
+ if(samplingError != UA_FALSE || newvalue->value.type == NULL) {
|
|
|
+ UA_Variant_deleteMembers(&(newvalue->value));
|
|
|
+ UA_free(newvalue);
|
|
|
+ return;
|
|
|
+ }
|
|
|
|
|
|
if(monitoredItem->QueueSize.currentValue >= monitoredItem->QueueSize.maxValue) {
|
|
|
- if(newvalue->value.type && monitoredItem->DiscardOldest == UA_TRUE &&
|
|
|
- monitoredItem->queue.lh_first != NULL ) {
|
|
|
- for(queueItem = monitoredItem->queue.lh_first;
|
|
|
- queueItem->listEntry.le_next != NULL;
|
|
|
- queueItem = queueItem->listEntry.le_next) {}
|
|
|
- LIST_REMOVE(queueItem, listEntry);
|
|
|
+ if(monitoredItem->DiscardOldest == UA_TRUE) {
|
|
|
+ queueItem = TAILQ_LAST(&monitoredItem->queue, QueueOfQueueDataValues);
|
|
|
+ TAILQ_REMOVE(&monitoredItem->queue, queueItem, listEntry);
|
|
|
UA_free(queueItem);
|
|
|
- (monitoredItem->QueueSize).currentValue--;
|
|
|
- }
|
|
|
- else {
|
|
|
+ monitoredItem->QueueSize.currentValue--;
|
|
|
+ } else {
|
|
|
// We cannot remove the oldest value and theres no queue space left. We're done here.
|
|
|
UA_free(newvalue);
|
|
|
return;
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- // Only add a value if we have sampled it correctly and it fits into the queue;
|
|
|
- if ( samplingError != UA_FALSE || newvalue->value.type == NULL || (monitoredItem->QueueSize).currentValue >= (monitoredItem->QueueSize).maxValue) {
|
|
|
- if (newvalue->value.data != NULL ) {
|
|
|
- UA_Variant_deleteMembers(&(newvalue->value));
|
|
|
- UA_free(&(newvalue->value));
|
|
|
- }
|
|
|
- UA_free(newvalue);
|
|
|
- return;
|
|
|
- }
|
|
|
-
|
|
|
- newValueAsByteString.length = UA_calcSizeBinary((const void *) &(newvalue->value), &UA_TYPES[UA_TYPES_VARIANT]);
|
|
|
+ // encode the data to find if its different to the previous
|
|
|
+ newValueAsByteString.length = UA_calcSizeBinary(&newvalue->value, &UA_TYPES[UA_TYPES_VARIANT]);
|
|
|
newValueAsByteString.data = UA_malloc(newValueAsByteString.length);
|
|
|
- UA_encodeBinary((const void *) &(newvalue->value), &UA_TYPES[UA_TYPES_VARIANT], &(newValueAsByteString), &encodingOffset );
|
|
|
+ UA_encodeBinary(&newvalue->value, &UA_TYPES[UA_TYPES_VARIANT], &newValueAsByteString, &encodingOffset);
|
|
|
|
|
|
if(monitoredItem->LastSampledValue.data == NULL) {
|
|
|
- UA_ByteString_copy((UA_String *) &newValueAsByteString, (UA_String *) &(monitoredItem->LastSampledValue));
|
|
|
- LIST_INSERT_HEAD(&monitoredItem->queue, newvalue, listEntry);
|
|
|
- (monitoredItem->QueueSize).currentValue++;
|
|
|
- monitoredItem->LastSampled = UA_DateTime_now();
|
|
|
- UA_free(newValueAsByteString.data);
|
|
|
- }
|
|
|
- else {
|
|
|
- if (UA_String_equal((UA_String *) &newValueAsByteString, (UA_String *) &(monitoredItem->LastSampledValue)) == UA_TRUE) {
|
|
|
- if (newvalue->value.data != NULL ) {
|
|
|
- UA_Variant_deleteMembers(&(newvalue->value));
|
|
|
- UA_free(&(newvalue->value)); // Same addr as newvalue
|
|
|
- }
|
|
|
- else {
|
|
|
- UA_free(newvalue); // Same address as newvalue->value
|
|
|
+ monitoredItem->LastSampledValue = newValueAsByteString;
|
|
|
+ TAILQ_INSERT_HEAD(&monitoredItem->queue, newvalue, listEntry);
|
|
|
+ monitoredItem->QueueSize.currentValue++;
|
|
|
+ monitoredItem->LastSampled = UA_DateTime_now();
|
|
|
+ } else {
|
|
|
+ if (UA_String_equal(&newValueAsByteString, &monitoredItem->LastSampledValue) == UA_TRUE) {
|
|
|
+ UA_Variant_deleteMembers(&newvalue->value);
|
|
|
+ UA_free(newvalue);
|
|
|
+ UA_String_deleteMembers(&newValueAsByteString);
|
|
|
+ return;
|
|
|
+ } else {
|
|
|
+ UA_ByteString_deleteMembers(&monitoredItem->LastSampledValue);
|
|
|
+ monitoredItem->LastSampledValue = newValueAsByteString;
|
|
|
+ TAILQ_INSERT_HEAD(&monitoredItem->queue, newvalue, listEntry);
|
|
|
+ monitoredItem->QueueSize.currentValue++;
|
|
|
+ monitoredItem->LastSampled = UA_DateTime_now();
|
|
|
}
|
|
|
- UA_free(newValueAsByteString.data);
|
|
|
- return;
|
|
|
- }
|
|
|
-
|
|
|
- UA_ByteString_copy((UA_String *) &newValueAsByteString, (UA_String *) &(monitoredItem->LastSampledValue));
|
|
|
- LIST_INSERT_HEAD(&monitoredItem->queue, newvalue, listEntry);
|
|
|
- (monitoredItem->QueueSize).currentValue++;
|
|
|
- monitoredItem->LastSampled = UA_DateTime_now();
|
|
|
- UA_free(newValueAsByteString.data);
|
|
|
}
|
|
|
-
|
|
|
return;
|
|
|
}
|