|
@@ -6,13 +6,13 @@
|
|
|
/* Subscription */
|
|
|
/****************/
|
|
|
|
|
|
-UA_Subscription *UA_Subscription_new(UA_Int32 SubscriptionID) {
|
|
|
+UA_Subscription *UA_Subscription_new(UA_Int32 subscriptionID) {
|
|
|
UA_Subscription *new = UA_malloc(sizeof(UA_Subscription));
|
|
|
if(!new)
|
|
|
return UA_NULL;
|
|
|
- new->SubscriptionID = SubscriptionID;
|
|
|
- new->LastPublished = 0;
|
|
|
- new->SequenceNumber = 0;
|
|
|
+ new->subscriptionID = subscriptionID;
|
|
|
+ new->lastPublished = 0;
|
|
|
+ new->sequenceNumber = 0;
|
|
|
memset(&new->timedUpdateJobGuid, 0, sizeof(UA_Guid));
|
|
|
new->timedUpdateJob = UA_NULL;
|
|
|
new->timedUpdateIsRegistered = UA_FALSE;
|
|
@@ -27,7 +27,7 @@ void UA_Subscription_deleteMembers(UA_Subscription *subscription, UA_Server *ser
|
|
|
|
|
|
// Just in case any parallel process attempts to access this subscription
|
|
|
// while we are deleting it... make it vanish.
|
|
|
- subscription->SubscriptionID = 0;
|
|
|
+ subscription->subscriptionID = 0;
|
|
|
|
|
|
// Delete monitored Items
|
|
|
LIST_FOREACH_SAFE(mon, &subscription->MonitoredItems, listEntry, tmp_mon) {
|
|
@@ -60,8 +60,8 @@ UA_UInt32 Subscription_queuedNotifications(UA_Subscription *subscription) {
|
|
|
}
|
|
|
|
|
|
void Subscription_generateKeepAlive(UA_Subscription *subscription) {
|
|
|
- if(subscription->KeepAliveCount.currentValue > subscription->KeepAliveCount.minValue &&
|
|
|
- subscription->KeepAliveCount.currentValue <= subscription->KeepAliveCount.maxValue)
|
|
|
+ if(subscription->keepAliveCount.currentValue > subscription->keepAliveCount.minValue &&
|
|
|
+ subscription->keepAliveCount.currentValue <= subscription->keepAliveCount.maxValue)
|
|
|
return;
|
|
|
|
|
|
UA_unpublishedNotification *msg = UA_malloc(sizeof(UA_unpublishedNotification));
|
|
@@ -71,11 +71,11 @@ void Subscription_generateKeepAlive(UA_Subscription *subscription) {
|
|
|
msg->notification = UA_malloc(sizeof(UA_NotificationMessage));
|
|
|
msg->notification->notificationData = UA_NULL;
|
|
|
// KeepAlive uses next message, but does not increment counter
|
|
|
- msg->notification->sequenceNumber = subscription->SequenceNumber + 1;
|
|
|
+ msg->notification->sequenceNumber = subscription->sequenceNumber + 1;
|
|
|
msg->notification->publishTime = UA_DateTime_now();
|
|
|
msg->notification->notificationDataSize = 0;
|
|
|
LIST_INSERT_HEAD(&subscription->unpublishedNotifications, msg, listEntry);
|
|
|
- subscription->KeepAliveCount.currentValue = subscription->KeepAliveCount.maxValue;
|
|
|
+ subscription->keepAliveCount.currentValue = subscription->keepAliveCount.maxValue;
|
|
|
}
|
|
|
|
|
|
void Subscription_updateNotifications(UA_Subscription *subscription) {
|
|
@@ -86,7 +86,7 @@ void Subscription_updateNotifications(UA_Subscription *subscription) {
|
|
|
UA_DataChangeNotification *changeNotification;
|
|
|
size_t notificationOffset;
|
|
|
|
|
|
- if(!subscription || subscription->LastPublished + subscription->PublishingInterval > UA_DateTime_now())
|
|
|
+ if(!subscription || subscription->lastPublished + subscription->publishingInterval > UA_DateTime_now())
|
|
|
return;
|
|
|
|
|
|
// Make sure there is data to be published and establish which message types
|
|
@@ -95,12 +95,12 @@ void Subscription_updateNotifications(UA_Subscription *subscription) {
|
|
|
// Check if this MonitoredItems Queue holds data and how much data is held in total
|
|
|
if(!TAILQ_FIRST(&mon->queue))
|
|
|
continue;
|
|
|
- if((mon->MonitoredItemType & MONITOREDITEM_TYPE_CHANGENOTIFY) != 0)
|
|
|
- monItemsChangeT+=mon->QueueSize.currentValue;
|
|
|
- else if((mon->MonitoredItemType & MONITOREDITEM_TYPE_STATUSNOTIFY) != 0)
|
|
|
- monItemsStatusT+=mon->QueueSize.currentValue;
|
|
|
- else if((mon->MonitoredItemType & MONITOREDITEM_TYPE_EVENTNOTIFY) != 0)
|
|
|
- monItemsEventT+=mon->QueueSize.currentValue;
|
|
|
+ if((mon->monitoredItemType & MONITOREDITEM_TYPE_CHANGENOTIFY) != 0)
|
|
|
+ monItemsChangeT+=mon->queueSize.currentValue;
|
|
|
+ else if((mon->monitoredItemType & MONITOREDITEM_TYPE_STATUSNOTIFY) != 0)
|
|
|
+ monItemsStatusT+=mon->queueSize.currentValue;
|
|
|
+ else if((mon->monitoredItemType & MONITOREDITEM_TYPE_EVENTNOTIFY) != 0)
|
|
|
+ monItemsEventT+=mon->queueSize.currentValue;
|
|
|
}
|
|
|
|
|
|
// FIXME: This is hardcoded to 100 because it is not covered by the spec but we need to protect the server!
|
|
@@ -113,7 +113,7 @@ void Subscription_updateNotifications(UA_Subscription *subscription) {
|
|
|
|
|
|
if(monItemsChangeT == 0 && monItemsEventT == 0 && monItemsStatusT == 0) {
|
|
|
// Decrement KeepAlive
|
|
|
- subscription->KeepAliveCount.currentValue--;
|
|
|
+ subscription->keepAliveCount.currentValue--;
|
|
|
// +- Generate KeepAlive msg if counter overruns
|
|
|
Subscription_generateKeepAlive(subscription);
|
|
|
return;
|
|
@@ -122,7 +122,7 @@ void Subscription_updateNotifications(UA_Subscription *subscription) {
|
|
|
msg = (UA_unpublishedNotification *) UA_malloc(sizeof(UA_unpublishedNotification));
|
|
|
msg->notification = UA_malloc(sizeof(UA_NotificationMessage));
|
|
|
INITPOINTER(msg->notification->notificationData);
|
|
|
- msg->notification->sequenceNumber = subscription->SequenceNumber++;
|
|
|
+ msg->notification->sequenceNumber = subscription->sequenceNumber++;
|
|
|
msg->notification->publishTime = UA_DateTime_now();
|
|
|
|
|
|
// NotificationData is an array of Change, Status and Event messages, each containing the appropriate
|
|
@@ -150,7 +150,7 @@ void Subscription_updateNotifications(UA_Subscription *subscription) {
|
|
|
// the propper NotificationMessageType (Status, Change, Event)
|
|
|
monItemsChangeT = 0;
|
|
|
LIST_FOREACH(mon, &subscription->MonitoredItems, listEntry) {
|
|
|
- if(mon->MonitoredItemType != MONITOREDITEM_TYPE_CHANGENOTIFY || !TAILQ_FIRST(&mon->queue))
|
|
|
+ if(mon->monitoredItemType != MONITOREDITEM_TYPE_CHANGENOTIFY || !TAILQ_FIRST(&mon->queue))
|
|
|
continue;
|
|
|
// Note: Monitored Items might not return a queuedValue if there is a problem encoding it.
|
|
|
monItemsChangeT += MonitoredItem_QueueToDataChangeNotifications(&changeNotification->monitoredItems[monItemsChangeT], mon);
|
|
@@ -201,8 +201,6 @@ UA_UInt32 *Subscription_getAvailableSequenceNumbers(UA_Subscription *sub) {
|
|
|
}
|
|
|
|
|
|
void Subscription_copyTopNotificationMessage(UA_NotificationMessage *dst, UA_Subscription *sub) {
|
|
|
- UA_NotificationMessage *latest;
|
|
|
-
|
|
|
if(!dst)
|
|
|
return;
|
|
|
|
|
@@ -213,7 +211,7 @@ void Subscription_copyTopNotificationMessage(UA_NotificationMessage *dst, UA_Sub
|
|
|
return;
|
|
|
}
|
|
|
|
|
|
- latest = LIST_FIRST(&sub->unpublishedNotifications)->notification;
|
|
|
+ UA_NotificationMessage *latest = LIST_FIRST(&sub->unpublishedNotifications)->notification;
|
|
|
dst->notificationDataSize = latest->notificationDataSize;
|
|
|
dst->publishTime = latest->publishTime;
|
|
|
dst->sequenceNumber = latest->sequenceNumber;
|
|
@@ -261,7 +259,7 @@ static void Subscription_timedUpdateNotificationsJob(UA_Server *server, void *da
|
|
|
// This is set by the Subscription_delete function to detere us from fiddling with
|
|
|
// this subscription if it is being deleted (not technically thread save, but better
|
|
|
// then nothing at all)
|
|
|
- if(sub->SubscriptionID == 0)
|
|
|
+ if(sub->subscriptionID == 0)
|
|
|
return;
|
|
|
|
|
|
// FIXME: This should be done by the event system
|
|
@@ -294,15 +292,15 @@ UA_StatusCode Subscription_registerUpdateJob(UA_Server *server, UA_Subscription
|
|
|
if(server == UA_NULL || sub == UA_NULL)
|
|
|
return UA_STATUSCODE_BADSERVERINDEXINVALID;
|
|
|
|
|
|
- if(sub->PublishingInterval <= 5 )
|
|
|
+ if(sub->publishingInterval <= 5 )
|
|
|
return UA_STATUSCODE_BADNOTSUPPORTED;
|
|
|
|
|
|
// Practically enough, the client sends a uint32 in ms, which we store as datetime, which here is required in as uint32 in ms as the interval
|
|
|
#ifdef _MSC_VER
|
|
|
- UA_Int32 retval = UA_Server_addRepeatedJob(server, *(sub->timedUpdateJob), sub->PublishingInterval,
|
|
|
+ UA_Int32 retval = UA_Server_addRepeatedJob(server, *(sub->timedUpdateJob), sub->publishingInterval,
|
|
|
&sub->timedUpdateJobGuid);
|
|
|
#else
|
|
|
- UA_StatusCode retval = UA_Server_addRepeatedJob(server, *sub->timedUpdateJob, sub->PublishingInterval,
|
|
|
+ UA_StatusCode retval = UA_Server_addRepeatedJob(server, *sub->timedUpdateJob, sub->publishingInterval,
|
|
|
&sub->timedUpdateJobGuid);
|
|
|
#endif
|
|
|
if(!retval)
|
|
@@ -324,13 +322,13 @@ UA_StatusCode Subscription_unregisterUpdateJob(UA_Server *server, UA_Subscriptio
|
|
|
|
|
|
UA_MonitoredItem *UA_MonitoredItem_new() {
|
|
|
UA_MonitoredItem *new = (UA_MonitoredItem *) UA_malloc(sizeof(UA_MonitoredItem));
|
|
|
- new->QueueSize = (UA_UInt32_BoundedValue) { .minValue = 0, .maxValue = 0, .currentValue = 0};
|
|
|
- new->LastSampled = 0;
|
|
|
+ new->queueSize = (UA_UInt32_BoundedValue) { .minValue = 0, .maxValue = 0, .currentValue = 0};
|
|
|
+ new->lastSampled = 0;
|
|
|
// FIXME: This is currently hardcoded;
|
|
|
- new->MonitoredItemType = MONITOREDITEM_TYPE_CHANGENOTIFY;
|
|
|
+ new->monitoredItemType = MONITOREDITEM_TYPE_CHANGENOTIFY;
|
|
|
TAILQ_INIT(&new->queue);
|
|
|
UA_NodeId_init(&new->monitoredNodeId);
|
|
|
- INITPOINTER(new->LastSampledValue.data );
|
|
|
+ INITPOINTER(new->lastSampledValue.data );
|
|
|
return new;
|
|
|
}
|
|
|
|
|
@@ -340,8 +338,8 @@ void MonitoredItem_delete(UA_MonitoredItem *monitoredItem) {
|
|
|
// Remove from subscription list
|
|
|
LIST_REMOVE(monitoredItem, listEntry);
|
|
|
// Release comparison sample
|
|
|
- if(monitoredItem->LastSampledValue.data != NULL) {
|
|
|
- UA_free(monitoredItem->LastSampledValue.data);
|
|
|
+ if(monitoredItem->lastSampledValue.data != NULL) {
|
|
|
+ UA_free(monitoredItem->lastSampledValue.data);
|
|
|
}
|
|
|
|
|
|
UA_NodeId_deleteMembers(&(monitoredItem->monitoredNodeId));
|
|
@@ -355,7 +353,7 @@ int MonitoredItem_QueueToDataChangeNotifications(UA_MonitoredItemNotification *d
|
|
|
|
|
|
// Count instead of relying on the items currentValue
|
|
|
TAILQ_FOREACH(queueItem, &monitoredItem->queue, listEntry) {
|
|
|
- dst[queueSize].clientHandle = monitoredItem->ClientHandle;
|
|
|
+ dst[queueSize].clientHandle = monitoredItem->clientHandle;
|
|
|
dst[queueSize].value.hasServerPicoseconds = UA_FALSE;
|
|
|
dst[queueSize].value.hasServerTimestamp = UA_FALSE;
|
|
|
dst[queueSize].value.serverTimestamp = UA_FALSE;
|
|
@@ -380,18 +378,18 @@ void MonitoredItem_ClearQueue(UA_MonitoredItem *monitoredItem) {
|
|
|
UA_Variant_deleteMembers(&val->value);
|
|
|
UA_free(val);
|
|
|
}
|
|
|
- monitoredItem->QueueSize.currentValue = 0;
|
|
|
+ monitoredItem->queueSize.currentValue = 0;
|
|
|
}
|
|
|
|
|
|
-UA_Boolean MonitoredItem_CopyMonitoredValueToVariant(UA_UInt32 AttributeID, const UA_Node *src,
|
|
|
+UA_Boolean MonitoredItem_CopyMonitoredValueToVariant(UA_UInt32 attributeID, const UA_Node *src,
|
|
|
UA_Variant *dst) {
|
|
|
UA_Boolean samplingError = UA_TRUE;
|
|
|
UA_DataValue sourceDataValue;
|
|
|
UA_DataValue_init(&sourceDataValue);
|
|
|
const UA_VariableNode *srcAsVariableNode = (const UA_VariableNode*)src;
|
|
|
|
|
|
- // FIXME: Not all AttributeIDs can be monitored yet
|
|
|
- switch(AttributeID) {
|
|
|
+ // FIXME: Not all attributeIDs can be monitored yet
|
|
|
+ switch(attributeID) {
|
|
|
case UA_ATTRIBUTEID_NODEID:
|
|
|
UA_Variant_setScalarCopy(dst, (const UA_NodeId*)&src->nodeId, &UA_TYPES[UA_TYPES_NODEID]);
|
|
|
samplingError = UA_FALSE;
|
|
@@ -480,20 +478,18 @@ UA_Boolean MonitoredItem_CopyMonitoredValueToVariant(UA_UInt32 AttributeID, cons
|
|
|
}
|
|
|
|
|
|
void MonitoredItem_QueuePushDataValue(UA_Server *server, UA_MonitoredItem *monitoredItem) {
|
|
|
- MonitoredItem_queuedValue *newvalue = NULL, *queueItem = NULL;
|
|
|
- UA_Boolean samplingError = UA_TRUE;
|
|
|
UA_ByteString newValueAsByteString = { .length=0, .data=NULL };
|
|
|
size_t encodingOffset = 0;
|
|
|
|
|
|
- if(!monitoredItem || monitoredItem->LastSampled + monitoredItem->SamplingInterval > UA_DateTime_now())
|
|
|
+ if(!monitoredItem || monitoredItem->lastSampled + monitoredItem->samplingInterval > UA_DateTime_now())
|
|
|
return;
|
|
|
|
|
|
// FIXME: Actively suppress non change value based monitoring. There should be
|
|
|
// another function to handle status and events.
|
|
|
- if(monitoredItem->MonitoredItemType != MONITOREDITEM_TYPE_CHANGENOTIFY)
|
|
|
+ if(monitoredItem->monitoredItemType != MONITOREDITEM_TYPE_CHANGENOTIFY)
|
|
|
return;
|
|
|
|
|
|
- newvalue = UA_malloc(sizeof(MonitoredItem_queuedValue));
|
|
|
+ MonitoredItem_queuedValue *newvalue = UA_malloc(sizeof(MonitoredItem_queuedValue));
|
|
|
if(!newvalue)
|
|
|
return;
|
|
|
|
|
@@ -510,8 +506,8 @@ void MonitoredItem_QueuePushDataValue(UA_Server *server, UA_MonitoredItem *monit
|
|
|
return;
|
|
|
}
|
|
|
|
|
|
- samplingError = MonitoredItem_CopyMonitoredValueToVariant(monitoredItem->AttributeID, target,
|
|
|
- &newvalue->value);
|
|
|
+ UA_Boolean samplingError = MonitoredItem_CopyMonitoredValueToVariant(monitoredItem->attributeID, target,
|
|
|
+ &newvalue->value);
|
|
|
UA_NodeStore_release(target);
|
|
|
if(samplingError != UA_FALSE || !newvalue->value.type) {
|
|
|
UA_Variant_deleteMembers(&newvalue->value);
|
|
@@ -519,16 +515,16 @@ void MonitoredItem_QueuePushDataValue(UA_Server *server, UA_MonitoredItem *monit
|
|
|
return;
|
|
|
}
|
|
|
|
|
|
- if(monitoredItem->QueueSize.currentValue >= monitoredItem->QueueSize.maxValue) {
|
|
|
- if(monitoredItem->DiscardOldest != UA_TRUE) {
|
|
|
+ if(monitoredItem->queueSize.currentValue >= monitoredItem->queueSize.maxValue) {
|
|
|
+ if(monitoredItem->discardOldest != UA_TRUE) {
|
|
|
// We cannot remove the oldest value and theres no queue space left. We're done here.
|
|
|
UA_free(newvalue);
|
|
|
return;
|
|
|
}
|
|
|
- queueItem = TAILQ_LAST(&monitoredItem->queue, QueueOfQueueDataValues);
|
|
|
+ MonitoredItem_queuedValue *queueItem = TAILQ_LAST(&monitoredItem->queue, QueueOfQueueDataValues);
|
|
|
TAILQ_REMOVE(&monitoredItem->queue, queueItem, listEntry);
|
|
|
UA_free(queueItem);
|
|
|
- monitoredItem->QueueSize.currentValue--;
|
|
|
+ monitoredItem->queueSize.currentValue--;
|
|
|
}
|
|
|
|
|
|
// encode the data to find if its different to the previous
|
|
@@ -536,22 +532,22 @@ void MonitoredItem_QueuePushDataValue(UA_Server *server, UA_MonitoredItem *monit
|
|
|
newValueAsByteString.data = UA_malloc(newValueAsByteString.length);
|
|
|
UA_encodeBinary(&newvalue->value, &UA_TYPES[UA_TYPES_VARIANT], &newValueAsByteString, &encodingOffset);
|
|
|
|
|
|
- if(!monitoredItem->LastSampledValue.data) {
|
|
|
- monitoredItem->LastSampledValue = newValueAsByteString;
|
|
|
+ if(!monitoredItem->lastSampledValue.data) {
|
|
|
+ monitoredItem->lastSampledValue = newValueAsByteString;
|
|
|
TAILQ_INSERT_HEAD(&monitoredItem->queue, newvalue, listEntry);
|
|
|
- monitoredItem->QueueSize.currentValue++;
|
|
|
- monitoredItem->LastSampled = UA_DateTime_now();
|
|
|
+ monitoredItem->queueSize.currentValue++;
|
|
|
+ monitoredItem->lastSampled = UA_DateTime_now();
|
|
|
} else {
|
|
|
- if(UA_String_equal(&newValueAsByteString, &monitoredItem->LastSampledValue) == UA_TRUE) {
|
|
|
+ if(UA_String_equal(&newValueAsByteString, &monitoredItem->lastSampledValue) == UA_TRUE) {
|
|
|
UA_Variant_deleteMembers(&newvalue->value);
|
|
|
UA_free(newvalue);
|
|
|
UA_String_deleteMembers(&newValueAsByteString);
|
|
|
return;
|
|
|
}
|
|
|
- UA_ByteString_deleteMembers(&monitoredItem->LastSampledValue);
|
|
|
- monitoredItem->LastSampledValue = newValueAsByteString;
|
|
|
+ UA_ByteString_deleteMembers(&monitoredItem->lastSampledValue);
|
|
|
+ monitoredItem->lastSampledValue = newValueAsByteString;
|
|
|
TAILQ_INSERT_HEAD(&monitoredItem->queue, newvalue, listEntry);
|
|
|
- monitoredItem->QueueSize.currentValue++;
|
|
|
- monitoredItem->LastSampled = UA_DateTime_now();
|
|
|
+ monitoredItem->queueSize.currentValue++;
|
|
|
+ monitoredItem->lastSampled = UA_DateTime_now();
|
|
|
}
|
|
|
}
|