/* This Source Code Form is subject to the terms of the Mozilla Public
 * License, v. 2.0. If a copy of the MPL was not distributed with this
 * file, You can obtain one at http://mozilla.org/MPL/2.0/.
 *
 *    Copyright 2017-2018 (c) Fraunhofer IOSB (Author: Julius Pfrommer)
 *    Copyright 2017 (c) Stefan Profanter, fortiss GmbH
 *    Copyright 2018 (c) Ari Breitkreuz, fortiss GmbH
 *    Copyright 2018 (c) Thomas Stalder, Blue Time Concept SA
 *    Copyright 2018 (c) Fabian Arndt, Root-Core
 */

#include "ua_server_internal.h"
#include "ua_subscription.h"

#ifdef UA_ENABLE_SUBSCRIPTIONS /* conditional compilation */

/****************/
/* Notification */
/****************/

#ifdef UA_ENABLE_SUBSCRIPTIONS_EVENTS

/* Event type that UA_Notification_isOverflowEvent tests against */
static const UA_NodeId overflowEventType =
    {0, UA_NODEIDTYPE_NUMERIC, {UA_NS0ID_EVENTQUEUEOVERFLOWEVENTTYPE}};

/* Event type that is written into generated overflow notifications */
static const UA_NodeId simpleOverflowEventType =
    {0, UA_NODEIDTYPE_NUMERIC, {UA_NS0ID_SIMPLEOVERFLOWEVENTTYPE}};

/* Decide whether a notification is an event-queue overflow marker.
 *
 * Returns true only if the owning MonitoredItem monitors the EventNotifier
 * attribute, the notification carries at least one event field, that first
 * field is a NodeId, and isNodeInTree() reports the NodeId as related to
 * overflowEventType via subtypeId references. Returns false otherwise.
 *
 * n must not be NULL (n->mon is dereferenced unconditionally). */
static UA_Boolean
UA_Notification_isOverflowEvent(UA_Server *server, UA_Notification *n) {
    UA_MonitoredItem *mon = n->mon;
    if(mon->attributeId != UA_ATTRIBUTEID_EVENTNOTIFIER)
        return false;

    UA_EventFieldList *efl = &n->data.event.fields;
    if(efl->eventFieldsSize >= 1 &&
       efl->eventFields[0].type == &UA_TYPES[UA_TYPES_NODEID] &&
       isNodeInTree(server->nsCtx,
                    (const UA_NodeId *)efl->eventFields[0].data,
                    &overflowEventType, &subtypeId, 1)) {
        return true;
    }

    return false;
}

/* The specification states in Part 4 5.12.1.5 that an EventQueueOverflowEvent
 * "is generated when the first Event has to be discarded [...] without
 * discarding any other event". So only generate one for all deleted events.
 *
 * Inserts a freshly allocated overflow notification directly before
 * `indicator` in the MonitoredItem queue and, if the MonitoredItem is in
 * reporting mode, also directly before `indicator` in the Subscription's
 * global queue. The counters of `mon` and `sub` are updated accordingly.
 *
 * Returns UA_STATUSCODE_GOOD on success (also when nothing is inserted because
 * `indicator` already is an overflow event), UA_STATUSCODE_BADOUTOFMEMORY if
 * an allocation fails, or the status of the failed NodeId copy. On failure
 * nothing is inserted and all intermediate allocations are released. */
static UA_StatusCode
createEventOverflowNotification(UA_Server *server, UA_Subscription *sub,
                                UA_MonitoredItem *mon, UA_Notification *indicator) {
    /* Avoid two redundant overflow events in a row */
    if(UA_Notification_isOverflowEvent(server, indicator))
        return UA_STATUSCODE_GOOD;

    /* A notification is inserted into the queue which includes only the
     * NodeId of the overflowEventType. It is up to the client to check for
     * possible overflows. */

    /* Allocate the notification. The cast on the allocator result is kept
     * from the original code. */
    UA_Notification *overflowNotification =
        (UA_Notification *)UA_malloc(sizeof(UA_Notification));
    if(!overflowNotification)
        return UA_STATUSCODE_BADOUTOFMEMORY;

    /* Set the notification fields */
    overflowNotification->mon = mon;
    UA_EventFieldList_init(&overflowNotification->data.event.fields);
    overflowNotification->data.event.fields.eventFields = UA_Variant_new();
    if(!overflowNotification->data.event.fields.eventFields) {
        UA_free(overflowNotification);
        return UA_STATUSCODE_BADOUTOFMEMORY;
    }
    overflowNotification->data.event.fields.eventFieldsSize = 1;
    UA_StatusCode retval =
        UA_Variant_setScalarCopy(overflowNotification->data.event.fields.eventFields,
                                 &simpleOverflowEventType,
                                 &UA_TYPES[UA_TYPES_NODEID]);
    if(retval != UA_STATUSCODE_GOOD) {
        UA_EventFieldList_deleteMembers(&overflowNotification->data.event.fields);
        UA_free(overflowNotification);
        return retval;
    }

    /* Insert before the "indicator notification". This is either first in the
     * queue (if the oldest notification was removed) or before the new event
     * that remains the last element of the queue. */
    TAILQ_INSERT_BEFORE(indicator, overflowNotification, listEntry);
    ++mon->eventOverflows;
    ++mon->queueSize;

    /* Mark as "not in the global queue" until it is actually inserted there */
    TAILQ_NEXT(overflowNotification, globalEntry) = UA_SUBSCRIPTION_QUEUE_SENTINEL;
    if(mon->monitoringMode == UA_MONITORINGMODE_REPORTING) {
        TAILQ_INSERT_BEFORE(indicator, overflowNotification, globalEntry);
        ++sub->notificationQueueSize;
        ++sub->eventNotifications;
    }
    return UA_STATUSCODE_GOOD;
}

#endif

/* !!! The enqueue and dequeue operations need to match the reporting
 * disable/enable logic in Operation_SetMonitoringMode !!! */

/* Append notification `n` to the queue of `mon` and, if `mon` is in reporting
 * mode, also to the global notification queue of `sub`. The size and
 * per-kind counters are updated. Afterwards the MonitoredItem queue is
 * trimmed via UA_MonitoredItem_ensureQueueSpace (its status is not
 * propagated). */
void
UA_Notification_enqueue(UA_Server *server, UA_Subscription *sub,
                        UA_MonitoredItem *mon, UA_Notification *n) {
    /* Add to the MonitoredItem */
    TAILQ_INSERT_TAIL(&mon->queue, n, listEntry);
    ++mon->queueSize;

#ifdef UA_ENABLE_SUBSCRIPTIONS_EVENTS
    if(mon->attributeId == UA_ATTRIBUTEID_EVENTNOTIFIER &&
       UA_Notification_isOverflowEvent(server, n))
        ++mon->eventOverflows;
#endif

    /* Add to the subscription if reporting is enabled. The sentinel marks a
     * notification that is not part of the global queue. */
    TAILQ_NEXT(n, globalEntry) = UA_SUBSCRIPTION_QUEUE_SENTINEL;
    if(mon->monitoringMode == UA_MONITORINGMODE_REPORTING) {
        TAILQ_INSERT_TAIL(&sub->notificationQueue, n, globalEntry);
        ++sub->notificationQueueSize;

#ifdef UA_ENABLE_SUBSCRIPTIONS_EVENTS
        if(mon->attributeId == UA_ATTRIBUTEID_EVENTNOTIFIER) {
            ++sub->eventNotifications;
        } else
#endif
        {
            ++sub->dataChangeNotifications;
        }
    }

    /* Ensure enough space is available in the MonitoredItem. Do this only after
     * adding the new Notification. */
    UA_MonitoredItem_ensureQueueSpace(server, mon);
}

/* Unlink notification `n` from the queue of its MonitoredItem and, if it is
 * linked there, from the global queue of the Subscription. Counters are
 * decremented to mirror UA_Notification_enqueue. The notification itself is
 * not freed. */
void
UA_Notification_dequeue(UA_Server *server, UA_Notification *n) {
    UA_MonitoredItem *mon = n->mon;
    UA_Subscription *sub = mon->subscription;

    /* Remove from the MonitoredItem queue */
#ifdef UA_ENABLE_SUBSCRIPTIONS_EVENTS
    if(mon->attributeId == UA_ATTRIBUTEID_EVENTNOTIFIER &&
       UA_Notification_isOverflowEvent(server, n))
        --mon->eventOverflows;
#endif

    TAILQ_REMOVE(&mon->queue, n, listEntry);
    --mon->queueSize;

    /* Remove from the subscription's queue. `sub` is only dereferenced when
     * the notification is actually linked into the global queue. */
    if(TAILQ_NEXT(n, globalEntry) != UA_SUBSCRIPTION_QUEUE_SENTINEL) {
#ifdef UA_ENABLE_SUBSCRIPTIONS_EVENTS
        if(mon->attributeId == UA_ATTRIBUTEID_EVENTNOTIFIER) {
            --sub->eventNotifications;
        } else
#endif
        {
            --sub->dataChangeNotifications;
        }

        TAILQ_REMOVE(&sub->notificationQueue, n, globalEntry);
        --sub->notificationQueueSize;
    }
}

/* Release the payload of notification `n` (event fields or data value,
 * depending on the attribute monitored by its MonitoredItem) and free `n`.
 * The notification is not unlinked from any queue here. */
void
UA_Notification_delete(UA_Notification *n) {
#ifdef UA_ENABLE_SUBSCRIPTIONS_EVENTS
    UA_MonitoredItem *mon = n->mon;
    if(mon->attributeId == UA_ATTRIBUTEID_EVENTNOTIFIER) {
        UA_EventFieldList_deleteMembers(&n->data.event.fields);
        /* EventFilterResult currently isn't being used
         * UA_EventFilterResult_delete(notification->data.event->result); */
    } else
#endif
    {
        UA_DataValue_deleteMembers(&n->data.value);
    }
    UA_free(n);
}

/*****************/
/* MonitoredItem */
/*****************/

/* Zero-initialize `mon`, attach it to `sub` and set up its empty
 * notification queue. */
void
UA_MonitoredItem_init(UA_MonitoredItem *mon, UA_Subscription *sub) {
    memset(mon, 0, sizeof(UA_MonitoredItem));
    mon->subscription = sub;
    TAILQ_INIT(&mon->queue);
}

/* Tear down a MonitoredItem: stop sampling, drop queued notifications,
 * detach from the monitored node (events), notify the userland
 * register-callback, unlink it from its list, release owned members and hand
 * the structure to the server's delayed work queue. */
void
UA_MonitoredItem_delete(UA_Server *server, UA_MonitoredItem *monitoredItem) {
    /* Remove the sampling callback */
    UA_MonitoredItem_unregisterSampleCallback(server, monitoredItem);

    /* Remove the queued notifications if attached to a subscription (not a
     * local MonitoredItem) */
    if(monitoredItem->subscription) {
        UA_Notification *notification, *notification_tmp;
        TAILQ_FOREACH_SAFE(notification, &monitoredItem->queue,
                           listEntry, notification_tmp) {
            /* Remove the item from the queues and free the memory */
            UA_Notification_dequeue(server, notification);
            UA_Notification_delete(notification);
        }
    }

#ifdef UA_ENABLE_SUBSCRIPTIONS_EVENTS
    if(monitoredItem->attributeId == UA_ATTRIBUTEID_EVENTNOTIFIER) {
        /* Remove the monitored item from the node queue */
        UA_Server_editNode(server, NULL, &monitoredItem->monitoredNodeId,
                           UA_MonitoredItem_removeNodeEventCallback, monitoredItem);
        UA_EventFilter_clear(&monitoredItem->filter.eventFilter);
    } else
#endif
    {
        /* UA_DataChangeFilter does not hold dynamic content we need to free */
        /* UA_DataChangeFilter_clear(&monitoredItem->filter.dataChangeFilter); */
    }

    /* Deregister MonitoredItem in userland */
    if(server->config.monitoredItemRegisterCallback && monitoredItem->registered) {
        /* Get the session context. Local MonitoredItems don't have a subscription. */
        UA_Session *session = NULL;
        if(monitoredItem->subscription)
            session = monitoredItem->subscription->session;
        if(!session)
            session = &server->adminSession;

        /* Get the node context. The status is not checked; targetContext
         * was initialized to NULL beforehand. */
        void *targetContext = NULL;
        UA_Server_getNodeContext(server, monitoredItem->monitoredNodeId, &targetContext);

        /* Deregister */
        server->config.monitoredItemRegisterCallback(server, &session->sessionId,
                                                     session->sessionHandle,
                                                     &monitoredItem->monitoredNodeId,
                                                     targetContext,
                                                     monitoredItem->attributeId, true);
    }

    /* Remove the monitored item (only if it is linked into a list) */
    if(monitoredItem->listEntry.le_prev != NULL)
        LIST_REMOVE(monitoredItem, listEntry);
    UA_String_deleteMembers(&monitoredItem->indexRange);
    UA_ByteString_deleteMembers(&monitoredItem->lastSampledValue);
    UA_Variant_deleteMembers(&monitoredItem->lastValue);
    UA_NodeId_deleteMembers(&monitoredItem->monitoredNodeId);

    /* No actual callback, just remove the structure */
    monitoredItem->delayedFreePointers.callback = NULL;
    UA_WorkQueue_enqueueDelayed(&server->workQueue, &monitoredItem->delayedFreePointers);
}

/* Trim the notification queue of `mon` until the number of non-overflow
 * entries does not exceed mon->maxQueueSize, then announce the overflow:
 * for event MonitoredItems by inserting an overflow notification, for
 * data-change MonitoredItems by setting the overflow info bits on the
 * remaining indicator notification (only if maxQueueSize > 1).
 *
 * Returns UA_STATUSCODE_GOOD if nothing had to be done or the overflow was
 * announced, the status of createEventOverflowNotification for events, or
 * UA_STATUSCODE_BADINTERNALERROR if the queue counters claim a removable
 * entry that cannot be found in the queue. */
UA_StatusCode
UA_MonitoredItem_ensureQueueSpace(UA_Server *server, UA_MonitoredItem *mon) {
    /* Assert: The eventoverflow are counted in the queue size; There can be
     * only one eventoverflow more than normal entries */
    UA_assert(mon->queueSize >= mon->eventOverflows);
    UA_assert(mon->eventOverflows <= mon->queueSize - mon->eventOverflows + 1);

    /* Nothing to do */
    if(mon->queueSize - mon->eventOverflows <= mon->maxQueueSize)
        return UA_STATUSCODE_GOOD;

#ifdef __clang_analyzer__
    return UA_STATUSCODE_GOOD;
#endif

    /* Remove notifications until the queue size is reached */
    UA_Subscription *sub = mon->subscription;
    while(mon->queueSize - mon->eventOverflows > mon->maxQueueSize) {
        /* At least two notifications that are not eventOverflows in the queue */
        UA_assert(mon->queueSize - mon->eventOverflows >= 2);

        /* Select the next notification to delete. Skip over overflow events.
         * The walks are NULL-guarded so that running off the queue is caught
         * by the check below instead of dereferencing NULL. */
        UA_Notification *del = NULL;
        if(mon->discardOldest) {
            /* Remove the oldest */
            del = TAILQ_FIRST(&mon->queue);
#ifdef UA_ENABLE_SUBSCRIPTIONS_EVENTS
            while(del && UA_Notification_isOverflowEvent(server, del))
                del = TAILQ_NEXT(del, listEntry); /* skip overflow events */
#endif
        } else {
            /* Remove the second newest (to keep the up-to-date notification).
             * The last entry is not an OverflowEvent -- we just added it. */
            del = TAILQ_LAST(&mon->queue, NotificationQueue);
            if(del)
                del = TAILQ_PREV(del, NotificationQueue, listEntry);
#ifdef UA_ENABLE_SUBSCRIPTIONS_EVENTS
            while(del && UA_Notification_isOverflowEvent(server, del))
                del = TAILQ_PREV(del, NotificationQueue, listEntry); /* skip overflow events */
#endif
        }

        /* There must have been one entry that can be deleted. If not, the
         * counters and the queue disagree; bail out instead of crashing when
         * assertions are compiled out. */
        UA_assert(del);
        if(!del)
            return UA_STATUSCODE_BADINTERNALERROR;

        /* If reporting is activated (entries are also in the subscriptions
         * global queue): Move the entry after del in the per-MonitoredItem
         * queue right after del in the global queue. (It is already right after
         * del in the per-MonitoredItem queue.) This is required so we don't
         * starve MonitoredItems with a high sampling interval by always
         * removing their first appearance in the global queue for the
         * Subscription. */
        if(TAILQ_NEXT(del, globalEntry) != UA_SUBSCRIPTION_QUEUE_SENTINEL) {
            UA_Notification *after_del = TAILQ_NEXT(del, listEntry);
            UA_assert(after_del); /* There must be one remaining element after del */
            TAILQ_REMOVE(&sub->notificationQueue, after_del, globalEntry);
            TAILQ_INSERT_AFTER(&sub->notificationQueue, del, after_del, globalEntry);
        }

        /* Delete the notification */
        UA_Notification_dequeue(server, del);
        UA_Notification_delete(del);
    }

    /* Get the element where the overflow shall be announced (infobits or
     * overflowevent) */
    UA_Notification *indicator;
    if(mon->discardOldest)
        indicator = TAILQ_FIRST(&mon->queue);
    else
        indicator = TAILQ_LAST(&mon->queue, NotificationQueue);
    UA_assert(indicator);

    /* Create an overflow notification */
#ifdef UA_ENABLE_SUBSCRIPTIONS_EVENTS
    if(mon->attributeId == UA_ATTRIBUTEID_EVENTNOTIFIER) {
        return createEventOverflowNotification(server, sub, mon, indicator);
    } else
#endif
    {
        /* Set the infobits of a datachange notification */
        if(mon->maxQueueSize > 1) {
            /* Add the infobits either to the newest or the new last entry */
            indicator->data.value.hasStatus = true;
            indicator->data.value.status |=
                (UA_STATUSCODE_INFOTYPE_DATAVALUE | UA_STATUSCODE_INFOBITS_OVERFLOW);
        }
    }
    return UA_STATUSCODE_GOOD;
}

/* Register the repeated sampling callback for `mon` using its sampling
 * interval. Does nothing (and returns UA_STATUSCODE_GOOD) if the callback is
 * already registered or if `mon` monitors the EventNotifier attribute.
 * Otherwise returns the status of UA_Server_addRepeatedCallback. */
UA_StatusCode
UA_MonitoredItem_registerSampleCallback(UA_Server *server, UA_MonitoredItem *mon) {
    if(mon->sampleCallbackIsRegistered)
        return UA_STATUSCODE_GOOD;

    /* Only DataChange MonitoredItems have a callback with a sampling interval */
    if(mon->attributeId == UA_ATTRIBUTEID_EVENTNOTIFIER)
        return UA_STATUSCODE_GOOD;

    UA_StatusCode retval =
        UA_Server_addRepeatedCallback(server,
                                      (UA_ServerCallback)UA_MonitoredItem_sampleCallback,
                                      mon, mon->samplingInterval,
                                      &mon->sampleCallbackId);
    if(retval == UA_STATUSCODE_GOOD)
        mon->sampleCallbackIsRegistered = true;
    return retval;
}

/* Remove the repeated sampling callback of `mon`, if one is registered. */
void
UA_MonitoredItem_unregisterSampleCallback(UA_Server *server, UA_MonitoredItem *mon) {
    if(!mon->sampleCallbackIsRegistered)
        return;
    UA_Server_removeRepeatedCallback(server, mon->sampleCallbackId);
    mon->sampleCallbackIsRegistered = false;
}

#endif /* UA_ENABLE_SUBSCRIPTIONS */