|
@@ -1,3 +1,7 @@
|
|
|
+#ifndef ENABLESUBSCRIPTIONS
|
|
|
+#define ENABLESUBSCRIPTIONS
|
|
|
+#endif
|
|
|
+
|
|
|
#ifdef ENABLESUBSCRIPTIONS
|
|
|
#include "ua_types.h"
|
|
|
#include "ua_server_internal.h"
|
|
@@ -45,6 +49,9 @@ UA_MonitoredItem *UA_MonitoredItem_new() {
|
|
|
new->QueueSize = (UA_UInt32_BoundedValue) { .minValue = 0, .maxValue = 0, .currentValue = 0};
|
|
|
new->LastSampled = 0;
|
|
|
|
|
|
+ // FIXME: This is currently hardcoded;
|
|
|
+ new->MonitoredItemType = MONITOREDITEM_CHANGENOTIFY_T;
|
|
|
+
|
|
|
LIST_INIT(new->queue);
|
|
|
LIST_INITENTRY(new, listEntry);
|
|
|
INITPOINTER(new->monitoredNode);
|
|
@@ -146,15 +153,22 @@ void Subscription_updateNotifications(UA_Subscription *subscription) {
|
|
|
UA_MonitoredItem *mon;
|
|
|
//MonitoredItem_queuedValue *queuedValue;
|
|
|
UA_unpublishedNotification *msg = NULL;
|
|
|
- UA_Int32 monItemsWithData = 0;
|
|
|
+ UA_UInt32 monItemsChangeT = 0, monItemsStatusT = 0, monItemsEventT = 0;
|
|
|
+ UA_DataChangeNotification *changeNotification;
|
|
|
+ size_t notificationOffset;
|
|
|
|
|
|
if (subscription == NULL) return;
|
|
|
if ((subscription->LastPublished + subscription->PublishingInterval) > UA_DateTime_now()) return;
|
|
|
|
|
|
- // Check if any MonitoredItem Queues hold data
|
|
|
+
|
|
|
+ // Make sure there is data to be published and establish which message types
|
|
|
+ // will need to be generated
|
|
|
for(mon=subscription->MonitoredItems->lh_first; mon!= NULL; mon=mon->listEntry.le_next) {
|
|
|
- if (mon->queue->lh_first != NULL) {
|
|
|
- monItemsWithData++;
|
|
|
+ // Check if this MonitoredItems Queue holds data and how much data is held in total
|
|
|
+ if (mon->queue->lh_first != NULL) {
|
|
|
+ if ((mon->MonitoredItemType & MONITOREDITEM_CHANGENOTIFY_T) != 0) monItemsChangeT+=mon->QueueSize.currentValue;
|
|
|
+ else if ((mon->MonitoredItemType & MONITOREDITEM_STATUSNOTIFY_T) != 0) monItemsStatusT+=mon->QueueSize.currentValue;
|
|
|
+ else if ((mon->MonitoredItemType & MONITOREDITEM_EVENTNOTIFY_T) != 0) monItemsEventT+=mon->QueueSize.currentValue;
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -166,7 +180,7 @@ void Subscription_updateNotifications(UA_Subscription *subscription) {
|
|
|
UA_free(msg);
|
|
|
}
|
|
|
|
|
|
- if (monItemsWithData == 0) {
|
|
|
+ if (monItemsChangeT == 0 && monItemsEventT == 0 && monItemsStatusT == 0) {
|
|
|
// Decrement KeepAlive
|
|
|
subscription->KeepAliveCount.currentValue--;
|
|
|
// +- Generate KeepAlive msg if counter overruns
|
|
@@ -178,7 +192,7 @@ void Subscription_updateNotifications(UA_Subscription *subscription) {
|
|
|
msg->notification = (UA_NotificationMessage *) malloc(sizeof(UA_NotificationMessage));
|
|
|
INITPOINTER(msg->notification->notificationData);
|
|
|
msg->notification->sequenceNumber = (subscription->SequenceNumber)+1; // KeepAlive uses next message, but does not increment counter
|
|
|
- msg->notification->publishTime = UA_DateTime_now();
|
|
|
+ msg->notification->publishTime = UA_DateTime_now();
|
|
|
msg->notification->notificationDataSize = 0;
|
|
|
|
|
|
LIST_INSERT_HEAD(subscription->unpublishedNotifications, msg, listEntry);
|
|
@@ -188,90 +202,181 @@ void Subscription_updateNotifications(UA_Subscription *subscription) {
|
|
|
return;
|
|
|
}
|
|
|
|
|
|
- // One or more MonitoredItems hold data -> create a new NotificationMessage
|
|
|
-
|
|
|
- // +- Create Array of NotificationData
|
|
|
- // +- Clear Queue
|
|
|
+ msg = (UA_unpublishedNotification *) malloc(sizeof(UA_unpublishedNotification));
|
|
|
+ LIST_INITENTRY(msg, listEntry);
|
|
|
+ INITPOINTER(msg->notification);
|
|
|
+
|
|
|
+ msg->notification = (UA_NotificationMessage *) malloc(sizeof(UA_NotificationMessage));
|
|
|
+ INITPOINTER(msg->notification->notificationData);
|
|
|
+ msg->notification->sequenceNumber = (subscription->SequenceNumber)++;
|
|
|
+ msg->notification->publishTime = UA_DateTime_now();
|
|
|
+
|
|
|
+ // NotificationData is an array of Change, Status and Event messages, each containing the appropriate
|
|
|
+ // list of Queued values from all monitoredItems of that type
|
|
|
+ msg->notification->notificationDataSize = ISNOTZERO(monItemsChangeT) + ISNOTZERO(monItemsEventT) + ISNOTZERO(monItemsStatusT);
|
|
|
+ msg->notification->notificationData = (UA_ExtensionObject *) malloc(sizeof(UA_ExtensionObject) * msg->notification->notificationDataSize);
|
|
|
+
|
|
|
+ for(int notmsgn=0; notmsgn < msg->notification->notificationDataSize; notmsgn++) {
|
|
|
+ // Set the notification message type and encoding for each of
|
|
|
+ // the three possible NotificationData Types
|
|
|
+ (msg->notification->notificationData)[notmsgn].encoding = 1; // Encoding is always binary
|
|
|
+ (msg->notification->notificationData)[notmsgn].typeId = UA_NODEID_NUMERIC(0, 811);
|
|
|
+
|
|
|
+ if(notmsgn == 0) {
|
|
|
+ // Construct a DataChangeNotification
|
|
|
+ changeNotification = (UA_DataChangeNotification *) malloc(sizeof(UA_DataChangeNotification));
|
|
|
+ changeNotification->diagnosticInfosSize = 0;
|
|
|
+ changeNotification->diagnosticInfos = NULL;
|
|
|
+ changeNotification->monitoredItemsSize = monItemsChangeT;
|
|
|
+
|
|
|
+ // Create one DataChangeNotification for each queue item held in each monitoredItems queue:
|
|
|
+ changeNotification->monitoredItems = (UA_MonitoredItemNotification *) malloc(sizeof(UA_MonitoredItemNotification) * monItemsChangeT);
|
|
|
+
|
|
|
+ // Scan all monitoredItems in this subscription and have their queue transformed into an Array of
|
|
|
+ // the propper NotificationMessageType (Status, Change, Event)
|
|
|
+ monItemsChangeT = 0;
|
|
|
+ for(mon=subscription->MonitoredItems->lh_first; mon != NULL; mon=mon->listEntry.le_next) {
|
|
|
+ if (mon->MonitoredItemType != MONITOREDITEM_CHANGENOTIFY_T || mon->queue->lh_first == NULL ) continue;
|
|
|
+ monItemsChangeT += MonitoredItem_QueueToDataChangeNotifications( &((changeNotification->monitoredItems)[monItemsChangeT]), mon);
|
|
|
+ MonitoredItem_ClearQueue(mon);
|
|
|
+ }
|
|
|
+
|
|
|
+ (msg->notification->notificationData[notmsgn]).body.length = UA_Array_calcSizeBinary(changeNotification, monItemsChangeT, &UA_TYPES[UA_TYPES_DATACHANGENOTIFICATION]);
|
|
|
+ (msg->notification->notificationData[notmsgn]).body.data = malloc((msg->notification->notificationData[notmsgn]).body.length);
|
|
|
+
|
|
|
+ notificationOffset = 0;
|
|
|
+ UA_encodeBinary((const void *) changeNotification, &UA_TYPES[UA_TYPES_DATACHANGENOTIFICATION], &(msg->notification->notificationData[notmsgn].body), ¬ificationOffset);
|
|
|
+
|
|
|
+ //FIXME: The constructed changeNotification needs to be deallocated! That includes any and all MonitoredItemNotifications and their included Variants.
|
|
|
+ printf("Size of constructed ChangeNotifications: %i Notifications, %i Byte\n", monItemsChangeT, msg->notification->notificationData[notmsgn].body.length );
|
|
|
+ }
|
|
|
+ else if (notmsgn == 1) {
|
|
|
+ // FIXME: Constructing a StatusChangeNotification is not implemented
|
|
|
+ }
|
|
|
+ else if (notmsgn == 2) {
|
|
|
+ // FIXME: Constructing a EventListNotification is not implemented
|
|
|
+ }
|
|
|
+ }
|
|
|
+ LIST_INSERT_HEAD(subscription->unpublishedNotifications, msg, listEntry);
|
|
|
+ printf("Constructed a notification message with %i changeNotifications\n", monItemsChangeT);
|
|
|
|
|
|
- // Fill the NotificationMessage with NotificationData
|
|
|
|
|
|
return;
|
|
|
}
|
|
|
|
|
|
+int MonitoredItem_QueueToDataChangeNotifications(UA_MonitoredItemNotification *dst, UA_MonitoredItem *monitoredItem) {
|
|
|
+ int queueSize = 0;
|
|
|
+ MonitoredItem_queuedValue *queueItem;
|
|
|
+
|
|
|
+ // Count instead of relying on the items currentValue
|
|
|
+ for (queueItem = monitoredItem->queue->lh_first; queueItem != NULL; queueItem=queueItem->listEntry.le_next) {
|
|
|
+ dst[queueSize].clientHandle = monitoredItem->ClientHandle;
|
|
|
+ dst[queueSize].value.hasServerPicoseconds = 0;
|
|
|
+ dst[queueSize].value.hasServerTimestamp = 0;
|
|
|
+ dst[queueSize].value.serverTimestamp = 0;
|
|
|
+ dst[queueSize].value.hasSourcePicoseconds = 0;
|
|
|
+ dst[queueSize].value.hasSourceTimestamp = 0;
|
|
|
+ dst[queueSize].value.hasValue = 1;
|
|
|
+ dst[queueSize].value.status = UA_STATUSCODE_GOOD;
|
|
|
+
|
|
|
+ UA_Variant_copy(&(queueItem->value), &(dst[queueSize].value.value));
|
|
|
+ queueSize++;
|
|
|
+ }
|
|
|
+ if (queueSize == 0) return 0;
|
|
|
+
|
|
|
+ // Fill the Destination DataValue
|
|
|
+
|
|
|
+ return queueSize;
|
|
|
+}
|
|
|
+
|
|
|
void MonitoredItem_ClearQueue(UA_MonitoredItem *monitoredItem) {
|
|
|
|
|
|
if (monitoredItem == NULL) return;
|
|
|
while(monitoredItem->queue->lh_first != NULL) {
|
|
|
LIST_REMOVE(monitoredItem->queue->lh_first, listEntry);
|
|
|
- }
|
|
|
+ }
|
|
|
|
|
|
- return;
|
|
|
+ return;
|
|
|
}
|
|
|
|
|
|
void MonitoredItem_QueuePushDataValue(UA_MonitoredItem *monitoredItem) {
|
|
|
MonitoredItem_queuedValue *newvalue, *queueItem;
|
|
|
-
|
|
|
+ UA_Boolean samplingError = UA_TRUE;
|
|
|
+
|
|
|
if (monitoredItem == NULL) return;
|
|
|
if( (monitoredItem->LastSampled + monitoredItem->SamplingInterval) > UA_DateTime_now()) return;
|
|
|
|
|
|
+ // FIXME: Actively suppress non change value based monitoring. There should be another function to handle status and events.
|
|
|
+ if (monitoredItem->MonitoredItemType != MONITOREDITEM_CHANGENOTIFY_T) return;
|
|
|
+
|
|
|
newvalue = (MonitoredItem_queuedValue *) malloc(sizeof(MonitoredItem_queuedValue));
|
|
|
LIST_INITENTRY(newvalue,listEntry);
|
|
|
+ newvalue->value.arrayLength = 0;
|
|
|
+ newvalue->value.arrayDimensionsSize = 0;
|
|
|
+ newvalue->value.arrayDimensions = NULL;
|
|
|
+ newvalue->value.data = NULL;
|
|
|
|
|
|
// Create new Value
|
|
|
switch(monitoredItem->AttributeID) {
|
|
|
- UA_ATTRIBUTEID_NODEID:
|
|
|
+ case UA_ATTRIBUTEID_NODEID:
|
|
|
UA_Variant_setScalarCopy(&(newvalue->value), (const UA_NodeId *) &((const UA_Node *) monitoredItem->monitoredNode)->nodeId, &UA_TYPES[UA_TYPES_NODEID]);
|
|
|
+ samplingError = UA_FALSE;
|
|
|
break;
|
|
|
- UA_ATTRIBUTEID_NODECLASS:
|
|
|
+ case UA_ATTRIBUTEID_NODECLASS:
|
|
|
UA_Variant_setScalarCopy(&(newvalue->value), (const UA_Int32 *) &((const UA_Node *) monitoredItem->monitoredNode)->nodeClass, &UA_TYPES[UA_TYPES_INT32]);
|
|
|
+ samplingError = UA_FALSE;
|
|
|
break;
|
|
|
- UA_ATTRIBUTEID_BROWSENAME:
|
|
|
+ case UA_ATTRIBUTEID_BROWSENAME:
|
|
|
UA_Variant_setScalarCopy(&(newvalue->value), (const UA_String *) &((const UA_Node *) monitoredItem->monitoredNode)->browseName, &UA_TYPES[UA_TYPES_STRING]);
|
|
|
+ samplingError = UA_FALSE;
|
|
|
break;
|
|
|
- UA_ATTRIBUTEID_DISPLAYNAME:
|
|
|
+ case UA_ATTRIBUTEID_DISPLAYNAME:
|
|
|
UA_Variant_setScalarCopy(&(newvalue->value), (const UA_String *) &((const UA_Node *) monitoredItem->monitoredNode)->displayName, &UA_TYPES[UA_TYPES_STRING]);
|
|
|
+ samplingError = UA_FALSE;
|
|
|
break;
|
|
|
- UA_ATTRIBUTEID_DESCRIPTION:
|
|
|
+ case UA_ATTRIBUTEID_DESCRIPTION:
|
|
|
UA_Variant_setScalarCopy(&(newvalue->value), (const UA_String *) &((const UA_Node *) monitoredItem->monitoredNode)->description, &UA_TYPES[UA_TYPES_STRING]);
|
|
|
+ samplingError = UA_FALSE;
|
|
|
break;
|
|
|
- UA_ATTRIBUTEID_WRITEMASK:
|
|
|
+ case UA_ATTRIBUTEID_WRITEMASK:
|
|
|
break;
|
|
|
- UA_ATTRIBUTEID_USERWRITEMASK:
|
|
|
+ case UA_ATTRIBUTEID_USERWRITEMASK:
|
|
|
break;
|
|
|
- UA_ATTRIBUTEID_ISABSTRACT:
|
|
|
+ case UA_ATTRIBUTEID_ISABSTRACT:
|
|
|
break;
|
|
|
- UA_ATTRIBUTEID_SYMMETRIC:
|
|
|
+ case UA_ATTRIBUTEID_SYMMETRIC:
|
|
|
break;
|
|
|
- UA_ATTRIBUTEID_INVERSENAME:
|
|
|
+ case UA_ATTRIBUTEID_INVERSENAME:
|
|
|
break;
|
|
|
- UA_ATTRIBUTEID_CONTAINSNOLOOPS:
|
|
|
+ case UA_ATTRIBUTEID_CONTAINSNOLOOPS:
|
|
|
break;
|
|
|
- UA_ATTRIBUTEID_EVENTNOTIFIER:
|
|
|
+ case UA_ATTRIBUTEID_EVENTNOTIFIER:
|
|
|
break;
|
|
|
- UA_ATTRIBUTEID_VALUE:
|
|
|
+ case UA_ATTRIBUTEID_VALUE:
|
|
|
if (((const UA_Node *) monitoredItem->monitoredNode)->nodeClass == UA_NODECLASS_VARIABLE) {
|
|
|
UA_Variant_copy( (const UA_Variant *) &((const UA_VariableNode *) monitoredItem->monitoredNode)->value, &(newvalue->value));
|
|
|
+ samplingError = UA_FALSE;
|
|
|
}
|
|
|
break;
|
|
|
- UA_ATTRIBUTEID_DATATYPE:
|
|
|
+ case UA_ATTRIBUTEID_DATATYPE:
|
|
|
break;
|
|
|
- UA_ATTRIBUTEID_VALUERANK:
|
|
|
+ case UA_ATTRIBUTEID_VALUERANK:
|
|
|
break;
|
|
|
- UA_ATTRIBUTEID_ARRAYDIMENSIONS:
|
|
|
+ case UA_ATTRIBUTEID_ARRAYDIMENSIONS:
|
|
|
break;
|
|
|
- UA_ATTRIBUTEID_ACCESSLEVEL:
|
|
|
+ case UA_ATTRIBUTEID_ACCESSLEVEL:
|
|
|
break;
|
|
|
- UA_ATTRIBUTEID_USERACCESSLEVEL:
|
|
|
+ case UA_ATTRIBUTEID_USERACCESSLEVEL:
|
|
|
break;
|
|
|
- UA_ATTRIBUTEID_MINIMUMSAMPLINGINTERVAL:
|
|
|
+ case UA_ATTRIBUTEID_MINIMUMSAMPLINGINTERVAL:
|
|
|
break;
|
|
|
- UA_ATTRIBUTEID_HISTORIZING:
|
|
|
+ case UA_ATTRIBUTEID_HISTORIZING:
|
|
|
break;
|
|
|
- UA_ATTRIBUTEID_EXECUTABLE:
|
|
|
+ case UA_ATTRIBUTEID_EXECUTABLE:
|
|
|
break;
|
|
|
- UA_ATTRIBUTEID_USEREXECUTABLE:
|
|
|
+ case UA_ATTRIBUTEID_USEREXECUTABLE:
|
|
|
break;
|
|
|
- default:
|
|
|
+ default:
|
|
|
break;
|
|
|
}
|
|
|
|
|
@@ -284,13 +389,21 @@ void MonitoredItem_QueuePushDataValue(UA_MonitoredItem *monitoredItem) {
|
|
|
(monitoredItem->QueueSize).currentValue--;
|
|
|
}
|
|
|
else {
|
|
|
+ // We cannot remove the oldest value and theres no queue space left. We're done here.
|
|
|
UA_free(newvalue);
|
|
|
+ return;
|
|
|
}
|
|
|
}
|
|
|
- if ((monitoredItem->QueueSize).currentValue < (monitoredItem->QueueSize).maxValue) {
|
|
|
- LIST_INSERT_HEAD(monitoredItem->queue, newvalue, listEntry);
|
|
|
- (monitoredItem->QueueSize).currentValue++;
|
|
|
+
|
|
|
+ // Only add a value if we have sampled it correctly and it fits into the queue;
|
|
|
+ if ( samplingError == UA_FALSE &&(monitoredItem->QueueSize).currentValue < (monitoredItem->QueueSize).maxValue) {
|
|
|
+ LIST_INSERT_HEAD(monitoredItem->queue, newvalue, listEntry);
|
|
|
+ (monitoredItem->QueueSize).currentValue++;
|
|
|
}
|
|
|
+ else {
|
|
|
+ UA_free(newvalue);
|
|
|
+ }
|
|
|
+
|
|
|
return;
|
|
|
}
|
|
|
|
|
@@ -336,8 +449,12 @@ void Subscription_copyTopNotificationMessage(UA_NotificationMessage *dst, UA_Sub
|
|
|
dst->notificationData = (UA_ExtensionObject *) malloc(sizeof(UA_ExtensionObject));
|
|
|
dst->notificationData->encoding = latest->notificationData->encoding;
|
|
|
dst->notificationData->typeId = latest->notificationData->typeId;
|
|
|
- dst->notificationData->body.length = 0;
|
|
|
+ dst->notificationData->body.length = latest->notificationData->body.length;
|
|
|
+ dst->notificationData->body.data = malloc(latest->notificationData->body.length);
|
|
|
+ UA_ByteString_copy((UA_String *) &(latest->notificationData->body), (UA_String *) &(dst->notificationData->body));
|
|
|
|
|
|
+ //UA_memcpy((void *) (dst->notificationData->body).data, (void *) (latest->notificationData->body).data, latest->notificationData->body.length);
|
|
|
+ printf("Copied Bytestring length %i into Bytestring length %i\n", latest->notificationData->body.length, dst->notificationData->body.length);
|
|
|
return;
|
|
|
}
|
|
|
|