|
@@ -1,15 +1,9 @@
|
|
|
-#ifndef ENABLESUBSCRIPTIONS
|
|
|
-#define ENABLESUBSCRIPTIONS
|
|
|
-#endif
|
|
|
-
|
|
|
#ifdef ENABLESUBSCRIPTIONS
|
|
|
#include "ua_types.h"
|
|
|
#include "ua_server_internal.h"
|
|
|
#include "ua_nodestore.h"
|
|
|
#include "ua_subscription_manager.h"
|
|
|
|
|
|
-#include <stdio.h> // Remove later, debugging only
|
|
|
-
|
|
|
void SubscriptionManager_init(UA_Session *session) {
|
|
|
UA_SubscriptionManager *manager = &(session->subscriptionManager);
|
|
|
|
|
@@ -55,6 +49,7 @@ UA_MonitoredItem *UA_MonitoredItem_new() {
|
|
|
LIST_INIT(new->queue);
|
|
|
LIST_INITENTRY(new, listEntry);
|
|
|
INITPOINTER(new->monitoredNode);
|
|
|
+ INITPOINTER(new->LastSampledValue.data );
|
|
|
return new;
|
|
|
}
|
|
|
|
|
@@ -103,6 +98,11 @@ void MonitoredItem_delete(UA_MonitoredItem *monitoredItem) {
|
|
|
MonitoredItem_ClearQueue(monitoredItem);
|
|
|
// Remove from subscription list
|
|
|
LIST_REMOVE(monitoredItem, listEntry);
|
|
|
+ // Release comparison sample
|
|
|
+ if(monitoredItem->LastSampledValue.data != NULL) {
|
|
|
+ UA_free(monitoredItem->LastSampledValue.data);
|
|
|
+ }
|
|
|
+
|
|
|
UA_free(monitoredItem);
|
|
|
|
|
|
return;
|
|
@@ -149,6 +149,27 @@ UA_UInt32 Subscription_queuedNotifications(UA_Subscription *subscription) {
|
|
|
return j;
|
|
|
}
|
|
|
|
|
|
+void Subscription_generateKeepAlive(UA_Subscription *subscription) {
|
|
|
+ UA_unpublishedNotification *msg = NULL;
|
|
|
+
|
|
|
+ if (subscription->KeepAliveCount.currentValue <= subscription->KeepAliveCount.minValue || subscription->KeepAliveCount.currentValue > subscription->KeepAliveCount.maxValue) {
|
|
|
+ msg = (UA_unpublishedNotification *) malloc(sizeof(UA_unpublishedNotification));
|
|
|
+ LIST_INITENTRY(msg, listEntry);
|
|
|
+ INITPOINTER(msg->notification);
|
|
|
+
|
|
|
+ msg->notification = (UA_NotificationMessage *) malloc(sizeof(UA_NotificationMessage));
|
|
|
+ INITPOINTER(msg->notification->notificationData);
|
|
|
+ msg->notification->sequenceNumber = (subscription->SequenceNumber)+1; // KeepAlive uses next message, but does not increment counter
|
|
|
+ msg->notification->publishTime = UA_DateTime_now();
|
|
|
+ msg->notification->notificationDataSize = 0;
|
|
|
+
|
|
|
+ LIST_INSERT_HEAD(subscription->unpublishedNotifications, msg, listEntry);
|
|
|
+ subscription->KeepAliveCount.currentValue = subscription->KeepAliveCount.maxValue;
|
|
|
+ }
|
|
|
+
|
|
|
+ return;
|
|
|
+}
|
|
|
+
|
|
|
void Subscription_updateNotifications(UA_Subscription *subscription) {
|
|
|
UA_MonitoredItem *mon;
|
|
|
//MonitoredItem_queuedValue *queuedValue;
|
|
@@ -184,20 +205,7 @@ void Subscription_updateNotifications(UA_Subscription *subscription) {
|
|
|
// Decrement KeepAlive
|
|
|
subscription->KeepAliveCount.currentValue--;
|
|
|
// +- Generate KeepAlive msg if counter overruns
|
|
|
- if (subscription->KeepAliveCount.currentValue <= subscription->KeepAliveCount.minValue || subscription->KeepAliveCount.currentValue > subscription->KeepAliveCount.maxValue) {
|
|
|
- msg = (UA_unpublishedNotification *) malloc(sizeof(UA_unpublishedNotification));
|
|
|
- LIST_INITENTRY(msg, listEntry);
|
|
|
- INITPOINTER(msg->notification);
|
|
|
-
|
|
|
- msg->notification = (UA_NotificationMessage *) malloc(sizeof(UA_NotificationMessage));
|
|
|
- INITPOINTER(msg->notification->notificationData);
|
|
|
- msg->notification->sequenceNumber = (subscription->SequenceNumber)+1; // KeepAlive uses next message, but does not increment counter
|
|
|
- msg->notification->publishTime = UA_DateTime_now();
|
|
|
- msg->notification->notificationDataSize = 0;
|
|
|
-
|
|
|
- LIST_INSERT_HEAD(subscription->unpublishedNotifications, msg, listEntry);
|
|
|
- subscription->KeepAliveCount.currentValue = subscription->KeepAliveCount.maxValue;
|
|
|
- }
|
|
|
+ Subscription_generateKeepAlive(subscription);
|
|
|
|
|
|
return;
|
|
|
}
|
|
@@ -208,12 +216,12 @@ void Subscription_updateNotifications(UA_Subscription *subscription) {
|
|
|
|
|
|
msg->notification = (UA_NotificationMessage *) malloc(sizeof(UA_NotificationMessage));
|
|
|
INITPOINTER(msg->notification->notificationData);
|
|
|
- msg->notification->sequenceNumber = (subscription->SequenceNumber)++;
|
|
|
+ msg->notification->sequenceNumber = subscription->SequenceNumber++;
|
|
|
msg->notification->publishTime = UA_DateTime_now();
|
|
|
|
|
|
// NotificationData is an array of Change, Status and Event messages, each containing the appropriate
|
|
|
// list of Queued values from all monitoredItems of that type
|
|
|
- msg->notification->notificationDataSize = ISNOTZERO(monItemsChangeT) + ISNOTZERO(monItemsEventT) + ISNOTZERO(monItemsStatusT);
|
|
|
+ msg->notification->notificationDataSize = ISNOTZERO(monItemsChangeT);// + ISNOTZERO(monItemsEventT) + ISNOTZERO(monItemsStatusT);
|
|
|
msg->notification->notificationData = (UA_ExtensionObject *) malloc(sizeof(UA_ExtensionObject) * msg->notification->notificationDataSize);
|
|
|
|
|
|
for(int notmsgn=0; notmsgn < msg->notification->notificationDataSize; notmsgn++) {
|
|
@@ -225,9 +233,6 @@ void Subscription_updateNotifications(UA_Subscription *subscription) {
|
|
|
if(notmsgn == 0) {
|
|
|
// Construct a DataChangeNotification
|
|
|
changeNotification = (UA_DataChangeNotification *) malloc(sizeof(UA_DataChangeNotification));
|
|
|
- changeNotification->diagnosticInfosSize = 0;
|
|
|
- changeNotification->diagnosticInfos = NULL;
|
|
|
- changeNotification->monitoredItemsSize = monItemsChangeT;
|
|
|
|
|
|
// Create one DataChangeNotification for each queue item held in each monitoredItems queue:
|
|
|
changeNotification->monitoredItems = (UA_MonitoredItemNotification *) malloc(sizeof(UA_MonitoredItemNotification) * monItemsChangeT);
|
|
@@ -237,18 +242,22 @@ void Subscription_updateNotifications(UA_Subscription *subscription) {
|
|
|
monItemsChangeT = 0;
|
|
|
for(mon=subscription->MonitoredItems->lh_first; mon != NULL; mon=mon->listEntry.le_next) {
|
|
|
if (mon->MonitoredItemType != MONITOREDITEM_CHANGENOTIFY_T || mon->queue->lh_first == NULL ) continue;
|
|
|
- monItemsChangeT += MonitoredItem_QueueToDataChangeNotifications( &((changeNotification->monitoredItems)[monItemsChangeT]), mon);
|
|
|
+ // Note: Monitored Items might not return a queuedValue if there is a problem encoding it.
|
|
|
+ monItemsChangeT += MonitoredItem_QueueToDataChangeNotifications( &((changeNotification->monitoredItems)[monItemsChangeT]), mon);
|
|
|
MonitoredItem_ClearQueue(mon);
|
|
|
}
|
|
|
-
|
|
|
- (msg->notification->notificationData[notmsgn]).body.length = UA_Array_calcSizeBinary(changeNotification, monItemsChangeT, &UA_TYPES[UA_TYPES_DATACHANGENOTIFICATION]);
|
|
|
- (msg->notification->notificationData[notmsgn]).body.data = malloc((msg->notification->notificationData[notmsgn]).body.length);
|
|
|
+ changeNotification->monitoredItemsSize = monItemsChangeT;
|
|
|
+ changeNotification->diagnosticInfosSize = 0;
|
|
|
+ changeNotification->diagnosticInfos = NULL;
|
|
|
+
|
|
|
+ (msg->notification->notificationData[notmsgn]).body.length = UA_calcSizeBinary(changeNotification, &UA_TYPES[UA_TYPES_DATACHANGENOTIFICATION]);
|
|
|
+ (msg->notification->notificationData[notmsgn]).body.data = malloc((msg->notification->notificationData[notmsgn]).body.length);
|
|
|
|
|
|
notificationOffset = 0;
|
|
|
UA_encodeBinary((const void *) changeNotification, &UA_TYPES[UA_TYPES_DATACHANGENOTIFICATION], &(msg->notification->notificationData[notmsgn].body), ¬ificationOffset);
|
|
|
-
|
|
|
- //FIXME: The constructed changeNotification needs to be deallocated! That includes any and all MonitoredItemNotifications and their included Variants.
|
|
|
- printf("Size of constructed ChangeNotifications: %i Notifications, %i Byte\n", monItemsChangeT, msg->notification->notificationData[notmsgn].body.length );
|
|
|
+
|
|
|
+ UA_free(changeNotification->monitoredItems);
|
|
|
+ UA_free(changeNotification);
|
|
|
}
|
|
|
else if (notmsgn == 1) {
|
|
|
// FIXME: Constructing a StatusChangeNotification is not implemented
|
|
@@ -258,8 +267,6 @@ void Subscription_updateNotifications(UA_Subscription *subscription) {
|
|
|
}
|
|
|
}
|
|
|
LIST_INSERT_HEAD(subscription->unpublishedNotifications, msg, listEntry);
|
|
|
- printf("Constructed a notification message with %i changeNotifications\n", monItemsChangeT);
|
|
|
-
|
|
|
|
|
|
return;
|
|
|
}
|
|
@@ -271,140 +278,195 @@ int MonitoredItem_QueueToDataChangeNotifications(UA_MonitoredItemNotification *d
|
|
|
// Count instead of relying on the items currentValue
|
|
|
for (queueItem = monitoredItem->queue->lh_first; queueItem != NULL; queueItem=queueItem->listEntry.le_next) {
|
|
|
dst[queueSize].clientHandle = monitoredItem->ClientHandle;
|
|
|
- dst[queueSize].value.hasServerPicoseconds = 0;
|
|
|
- dst[queueSize].value.hasServerTimestamp = 0;
|
|
|
- dst[queueSize].value.serverTimestamp = 0;
|
|
|
- dst[queueSize].value.hasSourcePicoseconds = 0;
|
|
|
- dst[queueSize].value.hasSourceTimestamp = 0;
|
|
|
- dst[queueSize].value.hasValue = 1;
|
|
|
+ dst[queueSize].value.hasServerPicoseconds = UA_FALSE;
|
|
|
+ dst[queueSize].value.hasServerTimestamp = UA_FALSE;
|
|
|
+ dst[queueSize].value.serverTimestamp = UA_FALSE;
|
|
|
+ dst[queueSize].value.hasSourcePicoseconds = UA_FALSE;
|
|
|
+ dst[queueSize].value.hasSourceTimestamp = UA_FALSE;
|
|
|
+ dst[queueSize].value.hasValue = UA_TRUE;
|
|
|
dst[queueSize].value.status = UA_STATUSCODE_GOOD;
|
|
|
|
|
|
UA_Variant_copy(&(queueItem->value), &(dst[queueSize].value.value));
|
|
|
+
|
|
|
+ // Do not create variants with no type -> will make calcSizeBinary() segfault.
|
|
|
+ if(dst[queueSize].value.value.type == NULL) {
|
|
|
+ queueSize--;
|
|
|
+ };
|
|
|
queueSize++;
|
|
|
}
|
|
|
if (queueSize == 0) return 0;
|
|
|
|
|
|
- // Fill the Destination DataValue
|
|
|
-
|
|
|
return queueSize;
|
|
|
}
|
|
|
|
|
|
void MonitoredItem_ClearQueue(UA_MonitoredItem *monitoredItem) {
|
|
|
+ MonitoredItem_queuedValue *val;
|
|
|
|
|
|
if (monitoredItem == NULL) return;
|
|
|
- while(monitoredItem->queue->lh_first != NULL) {
|
|
|
- LIST_REMOVE(monitoredItem->queue->lh_first, listEntry);
|
|
|
+ while(monitoredItem->queue->lh_first != NULL) {
|
|
|
+ val = monitoredItem->queue->lh_first;
|
|
|
+ LIST_REMOVE(monitoredItem->queue->lh_first, listEntry);
|
|
|
+ UA_free(val);
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
+ (monitoredItem->QueueSize).currentValue = 0;
|
|
|
+
|
|
|
return;
|
|
|
}
|
|
|
|
|
|
-void MonitoredItem_QueuePushDataValue(UA_MonitoredItem *monitoredItem) {
|
|
|
- MonitoredItem_queuedValue *newvalue, *queueItem;
|
|
|
- UA_Boolean samplingError = UA_TRUE;
|
|
|
-
|
|
|
- if (monitoredItem == NULL) return;
|
|
|
- if( (monitoredItem->LastSampled + monitoredItem->SamplingInterval) > UA_DateTime_now()) return;
|
|
|
-
|
|
|
- // FIXME: Actively suppress non change value based monitoring. There should be another function to handle status and events.
|
|
|
- if (monitoredItem->MonitoredItemType != MONITOREDITEM_CHANGENOTIFY_T) return;
|
|
|
-
|
|
|
- newvalue = (MonitoredItem_queuedValue *) malloc(sizeof(MonitoredItem_queuedValue));
|
|
|
- LIST_INITENTRY(newvalue,listEntry);
|
|
|
- newvalue->value.arrayLength = 0;
|
|
|
- newvalue->value.arrayDimensionsSize = 0;
|
|
|
- newvalue->value.arrayDimensions = NULL;
|
|
|
- newvalue->value.data = NULL;
|
|
|
-
|
|
|
- // Create new Value
|
|
|
- switch(monitoredItem->AttributeID) {
|
|
|
- case UA_ATTRIBUTEID_NODEID:
|
|
|
- UA_Variant_setScalarCopy(&(newvalue->value), (const UA_NodeId *) &((const UA_Node *) monitoredItem->monitoredNode)->nodeId, &UA_TYPES[UA_TYPES_NODEID]);
|
|
|
- samplingError = UA_FALSE;
|
|
|
- break;
|
|
|
- case UA_ATTRIBUTEID_NODECLASS:
|
|
|
- UA_Variant_setScalarCopy(&(newvalue->value), (const UA_Int32 *) &((const UA_Node *) monitoredItem->monitoredNode)->nodeClass, &UA_TYPES[UA_TYPES_INT32]);
|
|
|
- samplingError = UA_FALSE;
|
|
|
- break;
|
|
|
- case UA_ATTRIBUTEID_BROWSENAME:
|
|
|
- UA_Variant_setScalarCopy(&(newvalue->value), (const UA_String *) &((const UA_Node *) monitoredItem->monitoredNode)->browseName, &UA_TYPES[UA_TYPES_STRING]);
|
|
|
- samplingError = UA_FALSE;
|
|
|
- break;
|
|
|
- case UA_ATTRIBUTEID_DISPLAYNAME:
|
|
|
- UA_Variant_setScalarCopy(&(newvalue->value), (const UA_String *) &((const UA_Node *) monitoredItem->monitoredNode)->displayName, &UA_TYPES[UA_TYPES_STRING]);
|
|
|
- samplingError = UA_FALSE;
|
|
|
- break;
|
|
|
- case UA_ATTRIBUTEID_DESCRIPTION:
|
|
|
- UA_Variant_setScalarCopy(&(newvalue->value), (const UA_String *) &((const UA_Node *) monitoredItem->monitoredNode)->description, &UA_TYPES[UA_TYPES_STRING]);
|
|
|
- samplingError = UA_FALSE;
|
|
|
- break;
|
|
|
- case UA_ATTRIBUTEID_WRITEMASK:
|
|
|
- break;
|
|
|
- case UA_ATTRIBUTEID_USERWRITEMASK:
|
|
|
- break;
|
|
|
- case UA_ATTRIBUTEID_ISABSTRACT:
|
|
|
- break;
|
|
|
- case UA_ATTRIBUTEID_SYMMETRIC:
|
|
|
- break;
|
|
|
- case UA_ATTRIBUTEID_INVERSENAME:
|
|
|
- break;
|
|
|
- case UA_ATTRIBUTEID_CONTAINSNOLOOPS:
|
|
|
- break;
|
|
|
- case UA_ATTRIBUTEID_EVENTNOTIFIER:
|
|
|
- break;
|
|
|
- case UA_ATTRIBUTEID_VALUE:
|
|
|
- if (((const UA_Node *) monitoredItem->monitoredNode)->nodeClass == UA_NODECLASS_VARIABLE) {
|
|
|
- UA_Variant_copy( (const UA_Variant *) &((const UA_VariableNode *) monitoredItem->monitoredNode)->value, &(newvalue->value));
|
|
|
- samplingError = UA_FALSE;
|
|
|
- }
|
|
|
- break;
|
|
|
- case UA_ATTRIBUTEID_DATATYPE:
|
|
|
- break;
|
|
|
- case UA_ATTRIBUTEID_VALUERANK:
|
|
|
- break;
|
|
|
- case UA_ATTRIBUTEID_ARRAYDIMENSIONS:
|
|
|
- break;
|
|
|
- case UA_ATTRIBUTEID_ACCESSLEVEL:
|
|
|
- break;
|
|
|
- case UA_ATTRIBUTEID_USERACCESSLEVEL:
|
|
|
- break;
|
|
|
- case UA_ATTRIBUTEID_MINIMUMSAMPLINGINTERVAL:
|
|
|
- break;
|
|
|
- case UA_ATTRIBUTEID_HISTORIZING:
|
|
|
- break;
|
|
|
- case UA_ATTRIBUTEID_EXECUTABLE:
|
|
|
- break;
|
|
|
- case UA_ATTRIBUTEID_USEREXECUTABLE:
|
|
|
- break;
|
|
|
- default:
|
|
|
- break;
|
|
|
- }
|
|
|
-
|
|
|
- if ((monitoredItem->QueueSize).currentValue >= (monitoredItem->QueueSize).maxValue) {
|
|
|
- if (monitoredItem->DiscardOldest == UA_TRUE && monitoredItem->queue->lh_first != NULL ) {
|
|
|
- for(queueItem = monitoredItem->queue->lh_first; queueItem->listEntry.le_next != NULL; queueItem = queueItem->listEntry.le_next) {}
|
|
|
-
|
|
|
- LIST_REMOVE(queueItem, listEntry);
|
|
|
- UA_free(queueItem);
|
|
|
- (monitoredItem->QueueSize).currentValue--;
|
|
|
+UA_Boolean MonitoredItem_CopyMonitoredValueToVariant(UA_UInt32 AttributeID, const UA_Node *src, UA_Variant *dst) {
|
|
|
+ UA_Boolean samplingError = UA_TRUE;
|
|
|
+ UA_DataValue sourceDataValue;
|
|
|
+ const UA_VariableNode *srcAsVariableNode = (const UA_VariableNode *) src;
|
|
|
+
|
|
|
+ // FIXME: Not all AttributeIDs can be monitored yet
|
|
|
+ switch(AttributeID) {
|
|
|
+ case UA_ATTRIBUTEID_NODEID:
|
|
|
+ UA_Variant_setScalarCopy(dst, (const UA_NodeId *) &(src->nodeId), &UA_TYPES[UA_TYPES_NODEID]);
|
|
|
+ samplingError = UA_FALSE;
|
|
|
+ break;
|
|
|
+ case UA_ATTRIBUTEID_NODECLASS:
|
|
|
+ UA_Variant_setScalarCopy(dst, (const UA_Int32 *) &(src->nodeClass), &UA_TYPES[UA_TYPES_INT32]);
|
|
|
+ samplingError = UA_FALSE;
|
|
|
+ break;
|
|
|
+ case UA_ATTRIBUTEID_BROWSENAME:
|
|
|
+ UA_Variant_setScalarCopy(dst, (const UA_String *) &(src->browseName), &UA_TYPES[UA_TYPES_QUALIFIEDNAME]);
|
|
|
+ samplingError = UA_FALSE;
|
|
|
+ break;
|
|
|
+ case UA_ATTRIBUTEID_DISPLAYNAME:
|
|
|
+ UA_Variant_setScalarCopy(dst, (const UA_String *) &(src->displayName), &UA_TYPES[UA_TYPES_LOCALIZEDTEXT]);
|
|
|
+ samplingError = UA_FALSE;
|
|
|
+ break;
|
|
|
+ case UA_ATTRIBUTEID_DESCRIPTION:
|
|
|
+ UA_Variant_setScalarCopy(dst, (const UA_String *) &(src->displayName), &UA_TYPES[UA_TYPES_LOCALIZEDTEXT]);
|
|
|
+ samplingError = UA_FALSE;
|
|
|
+ break;
|
|
|
+ case UA_ATTRIBUTEID_WRITEMASK:
|
|
|
+ UA_Variant_setScalarCopy(dst, (const UA_String *) &(src->writeMask), &UA_TYPES[UA_TYPES_UINT32]);
|
|
|
+ samplingError = UA_FALSE;
|
|
|
+ break;
|
|
|
+ case UA_ATTRIBUTEID_USERWRITEMASK:
|
|
|
+ UA_Variant_setScalarCopy(dst, (const UA_String *) &(src->writeMask), &UA_TYPES[UA_TYPES_UINT32]);
|
|
|
+ samplingError = UA_FALSE;
|
|
|
+ break;
|
|
|
+ case UA_ATTRIBUTEID_ISABSTRACT:
|
|
|
+ break;
|
|
|
+ case UA_ATTRIBUTEID_SYMMETRIC:
|
|
|
+ break;
|
|
|
+ case UA_ATTRIBUTEID_INVERSENAME:
|
|
|
+ break;
|
|
|
+ case UA_ATTRIBUTEID_CONTAINSNOLOOPS:
|
|
|
+ break;
|
|
|
+ case UA_ATTRIBUTEID_EVENTNOTIFIER:
|
|
|
+ break;
|
|
|
+ case UA_ATTRIBUTEID_VALUE:
|
|
|
+ if (src->nodeClass == UA_NODECLASS_VARIABLE) {
|
|
|
+ if ( srcAsVariableNode->valueSource == UA_VALUESOURCE_VARIANT) {
|
|
|
+ UA_Variant_copy( (const UA_Variant *) &((const UA_VariableNode *) src)->value, dst);
|
|
|
+ samplingError = UA_FALSE;
|
|
|
}
|
|
|
- else {
|
|
|
- // We cannot remove the oldest value and theres no queue space left. We're done here.
|
|
|
- UA_free(newvalue);
|
|
|
- return;
|
|
|
+ else if (srcAsVariableNode->valueSource == UA_VALUESOURCE_DATASOURCE) {
|
|
|
+ if (srcAsVariableNode->value.dataSource.read(((const UA_VariableNode *) src)->value.dataSource.handle, (UA_Boolean) UA_TRUE, &sourceDataValue) == UA_STATUSCODE_GOOD) {
|
|
|
+ UA_Variant_copy( (const UA_Variant *) &(sourceDataValue.value), dst);
|
|
|
+ samplingError = UA_FALSE;
|
|
|
+ }
|
|
|
}
|
|
|
+ }
|
|
|
+ break;
|
|
|
+ case UA_ATTRIBUTEID_DATATYPE:
|
|
|
+ break;
|
|
|
+ case UA_ATTRIBUTEID_VALUERANK:
|
|
|
+ break;
|
|
|
+ case UA_ATTRIBUTEID_ARRAYDIMENSIONS:
|
|
|
+ break;
|
|
|
+ case UA_ATTRIBUTEID_ACCESSLEVEL:
|
|
|
+ break;
|
|
|
+ case UA_ATTRIBUTEID_USERACCESSLEVEL:
|
|
|
+ break;
|
|
|
+ case UA_ATTRIBUTEID_MINIMUMSAMPLINGINTERVAL:
|
|
|
+ break;
|
|
|
+ case UA_ATTRIBUTEID_HISTORIZING:
|
|
|
+ break;
|
|
|
+ case UA_ATTRIBUTEID_EXECUTABLE:
|
|
|
+ break;
|
|
|
+ case UA_ATTRIBUTEID_USEREXECUTABLE:
|
|
|
+ break;
|
|
|
+ default:
|
|
|
+ break;
|
|
|
+ }
|
|
|
+
|
|
|
+ return samplingError;
|
|
|
+}
|
|
|
+
|
|
|
+void MonitoredItem_QueuePushDataValue(UA_MonitoredItem *monitoredItem) {
|
|
|
+ MonitoredItem_queuedValue *newvalue = NULL, *queueItem = NULL;
|
|
|
+ UA_Boolean samplingError = UA_TRUE;
|
|
|
+ UA_ByteString newValueAsByteString = { .length=0, .data=NULL };
|
|
|
+ size_t encodingOffset = 0;
|
|
|
+
|
|
|
+ if (monitoredItem == NULL) return;
|
|
|
+ if( (monitoredItem->LastSampled + monitoredItem->SamplingInterval) > UA_DateTime_now()) {
|
|
|
+ return;
|
|
|
+ };
|
|
|
+
|
|
|
+ // FIXME: Actively suppress non change value based monitoring. There should be another function to handle status and events.
|
|
|
+ if (monitoredItem->MonitoredItemType != MONITOREDITEM_CHANGENOTIFY_T) {
|
|
|
+ return;
|
|
|
+ }
|
|
|
+
|
|
|
+ newvalue = (MonitoredItem_queuedValue *) malloc(sizeof(MonitoredItem_queuedValue));
|
|
|
+ LIST_INITENTRY(newvalue,listEntry);
|
|
|
+ newvalue->value.arrayLength = 0;
|
|
|
+ newvalue->value.arrayDimensionsSize = 0;
|
|
|
+ newvalue->value.arrayDimensions = NULL;
|
|
|
+ newvalue->value.data = NULL;
|
|
|
+ newvalue->value.type = NULL;
|
|
|
+
|
|
|
+ samplingError = MonitoredItem_CopyMonitoredValueToVariant(monitoredItem->AttributeID, monitoredItem->monitoredNode, &(newvalue->value));
|
|
|
+
|
|
|
+ if ((monitoredItem->QueueSize).currentValue >= (monitoredItem->QueueSize).maxValue) {
|
|
|
+ if (newvalue->value.type != NULL && monitoredItem->DiscardOldest == UA_TRUE && monitoredItem->queue->lh_first != NULL ) {
|
|
|
+ for(queueItem = monitoredItem->queue->lh_first; queueItem->listEntry.le_next != NULL; queueItem = queueItem->listEntry.le_next) {}
|
|
|
+
|
|
|
+ LIST_REMOVE(queueItem, listEntry);
|
|
|
+ UA_free(queueItem);
|
|
|
+ (monitoredItem->QueueSize).currentValue--;
|
|
|
+ }
|
|
|
+ else {
|
|
|
+ // We cannot remove the oldest value and theres no queue space left. We're done here.
|
|
|
+ UA_free(newvalue);
|
|
|
+ return;
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ // Only add a value if we have sampled it correctly and it fits into the queue;
|
|
|
+ if ( samplingError != UA_FALSE || newvalue->value.type == NULL || (monitoredItem->QueueSize).currentValue >= (monitoredItem->QueueSize).maxValue) {
|
|
|
+ UA_free(newvalue);
|
|
|
+ return;
|
|
|
+ }
|
|
|
+
|
|
|
+ newValueAsByteString.length = UA_calcSizeBinary((const void *) &(newvalue->value), &UA_TYPES[UA_TYPES_VARIANT]);
|
|
|
+ newValueAsByteString.data = malloc(newValueAsByteString.length);
|
|
|
+ UA_encodeBinary((const void *) &(newvalue->value), &UA_TYPES[UA_TYPES_VARIANT], &(newValueAsByteString), &encodingOffset );
|
|
|
+
|
|
|
+ if(monitoredItem->LastSampledValue.data == NULL) {
|
|
|
+ UA_ByteString_copy((UA_String *) &newValueAsByteString, (UA_String *) &(monitoredItem->LastSampledValue));
|
|
|
+ LIST_INSERT_HEAD(monitoredItem->queue, newvalue, listEntry);
|
|
|
+ (monitoredItem->QueueSize).currentValue++;
|
|
|
+ monitoredItem->LastSampled = UA_DateTime_now();
|
|
|
+ }
|
|
|
+ else {
|
|
|
+ if (UA_String_equal((UA_String *) &newValueAsByteString, (UA_String *) &(monitoredItem->LastSampledValue)) == UA_TRUE) {
|
|
|
+ UA_free(newValueAsByteString.data);
|
|
|
+ return;
|
|
|
}
|
|
|
|
|
|
- // Only add a value if we have sampled it correctly and it fits into the queue;
|
|
|
- if ( samplingError == UA_FALSE &&(monitoredItem->QueueSize).currentValue < (monitoredItem->QueueSize).maxValue) {
|
|
|
- LIST_INSERT_HEAD(monitoredItem->queue, newvalue, listEntry);
|
|
|
- (monitoredItem->QueueSize).currentValue++;
|
|
|
- }
|
|
|
- else {
|
|
|
- UA_free(newvalue);
|
|
|
- }
|
|
|
-
|
|
|
- return;
|
|
|
+ UA_ByteString_copy((UA_String *) &newValueAsByteString, (UA_String *) &(monitoredItem->LastSampledValue));
|
|
|
+ LIST_INSERT_HEAD(monitoredItem->queue, newvalue, listEntry);
|
|
|
+ (monitoredItem->QueueSize).currentValue++;
|
|
|
+ monitoredItem->LastSampled = UA_DateTime_now();
|
|
|
+ }
|
|
|
+ return;
|
|
|
}
|
|
|
|
|
|
UA_UInt32 *Subscription_getAvailableSequenceNumbers(UA_Subscription *sub) {
|
|
@@ -453,14 +515,13 @@ void Subscription_copyTopNotificationMessage(UA_NotificationMessage *dst, UA_Sub
|
|
|
dst->notificationData->body.data = malloc(latest->notificationData->body.length);
|
|
|
UA_ByteString_copy((UA_String *) &(latest->notificationData->body), (UA_String *) &(dst->notificationData->body));
|
|
|
|
|
|
- //UA_memcpy((void *) (dst->notificationData->body).data, (void *) (latest->notificationData->body).data, latest->notificationData->body.length);
|
|
|
- printf("Copied Bytestring length %i into Bytestring length %i\n", latest->notificationData->body.length, dst->notificationData->body.length);
|
|
|
return;
|
|
|
}
|
|
|
|
|
|
-void Subscription_deleteUnpublishedNotification(UA_UInt32 seqNo, UA_Subscription *sub) {
|
|
|
+UA_UInt32 Subscription_deleteUnpublishedNotification(UA_UInt32 seqNo, UA_Subscription *sub) {
|
|
|
UA_unpublishedNotification *not;
|
|
|
-
|
|
|
+ UA_UInt32 deletedItems = 0;
|
|
|
+
|
|
|
for(not=sub->unpublishedNotifications->lh_first; not != NULL; not=not->listEntry.le_next) {
|
|
|
if (not->notification->sequenceNumber == seqNo) {
|
|
|
LIST_REMOVE(not, listEntry);
|
|
@@ -473,10 +534,12 @@ void Subscription_deleteUnpublishedNotification(UA_UInt32 seqNo, UA_Subscription
|
|
|
}
|
|
|
UA_free(not->notification);
|
|
|
}
|
|
|
+
|
|
|
UA_free(not);
|
|
|
+ deletedItems++;
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- return;
|
|
|
+ return deletedItems;
|
|
|
}
|
|
|
#endif //#ifdef ENABLESUBSCRIPTIONS
|