123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268 |
- /* This Source Code Form is subject to the terms of the Mozilla Public
- * License, v. 2.0. If a copy of the MPL was not distributed with this
- * file, You can obtain one at http://mozilla.org/MPL/2.0/.
- *
- * Copyright 2017 (c) Fraunhofer IOSB (Author: Julius Pfrommer)
- * Copyright 2017 (c) Stefan Profanter, fortiss GmbH
- * Copyright 2018 (c) Ari Breitkreuz, fortiss GmbH
- * Copyright 2018 (c) Thomas Stalder, Blue Time Concept SA
- * Copyright 2018 (c) Fabian Arndt, Root-Core
- */
- #include "ua_server_internal.h"
- #include "ua_subscription.h"
- #ifdef UA_ENABLE_SUBSCRIPTIONS /* conditional compilation */
- void
- UA_MonitoredItem_init(UA_MonitoredItem *mon, UA_Subscription *sub) {
- memset(mon, 0, sizeof(UA_MonitoredItem));
- mon->subscription = sub;
- TAILQ_INIT(&mon->queue);
- }
- void
- UA_MonitoredItem_delete(UA_Server *server, UA_MonitoredItem *monitoredItem) {
- UA_Subscription *sub = monitoredItem->subscription;
- UA_LOG_INFO_SESSION(server->config.logger, sub->session,
- "Subscription %u | MonitoredItem %i | "
- "Delete the MonitoredItem", sub->subscriptionId,
- monitoredItem->monitoredItemId);
- if(monitoredItem->monitoredItemType == UA_MONITOREDITEMTYPE_CHANGENOTIFY) {
- /* Remove the sampling callback */
- UA_MonitoredItem_unregisterSampleCallback(server, monitoredItem);
- } else if (monitoredItem->monitoredItemType != UA_MONITOREDITEMTYPE_EVENTNOTIFY) {
- /* TODO: Access val data.event */
- UA_LOG_ERROR(server->config.logger, UA_LOGCATEGORY_SERVER,
- "MonitoredItemTypes other than ChangeNotify or EventNotify "
- "are not supported yet");
- }
- /* Remove the queued notifications if attached to a subscription */
- if(monitoredItem->subscription) {
- UA_Notification *notification, *notification_tmp;
- TAILQ_FOREACH_SAFE(notification, &monitoredItem->queue,
- listEntry, notification_tmp) {
- /* Remove the item from the queues and free the memory */
- UA_Notification_delete(sub, monitoredItem, notification);
- }
- }
- /* if(monitoredItem->monitoredItemType == UA_MONITOREDITEMTYPE_CHANGENOTIFY)
- * -> UA_DataChangeFilter does not hold dynamic content we need to free */
- #ifdef UA_ENABLE_SUBSCRIPTIONS_EVENTS
- if(monitoredItem->monitoredItemType == UA_MONITOREDITEMTYPE_EVENTNOTIFY) {
- /* Remove the monitored item from the node queue */
- UA_Server_editNode(server, NULL, &monitoredItem->monitoredNodeId,
- UA_MonitoredItem_removeNodeEventCallback, monitoredItem);
- /* Delete the event filter */
- UA_EventFilter_deleteMembers(&monitoredItem->filter.eventFilter);
- }
- #endif /* UA_ENABLE_SUBSCRIPTIONS_EVENTS */
- /* Deregister MonitoredItem in userland */
- if(server->config.monitoredItemRegisterCallback && monitoredItem->registered) {
- /* Get the session context. Local MonitoredItems don't have a subscription. */
- UA_Session *session = NULL;
- if(monitoredItem->subscription)
- session = monitoredItem->subscription->session;
- if(!session)
- session = &server->adminSession;
- /* Get the node context */
- void *targetContext = NULL;
- UA_Server_getNodeContext(server, monitoredItem->monitoredNodeId, &targetContext);
- /* Deregister */
- server->config.monitoredItemRegisterCallback(server, &session->sessionId,
- session->sessionHandle, &monitoredItem->monitoredNodeId,
- targetContext, monitoredItem->attributeId, true);
- }
- /* Remove the monitored item */
- if(monitoredItem->listEntry.le_prev != NULL)
- LIST_REMOVE(monitoredItem, listEntry);
- UA_String_deleteMembers(&monitoredItem->indexRange);
- UA_ByteString_deleteMembers(&monitoredItem->lastSampledValue);
- UA_Variant_deleteMembers(&monitoredItem->lastValue);
- UA_NodeId_deleteMembers(&monitoredItem->monitoredNodeId);
- /* No actual callback, just remove the structure */
- monitoredItem->delayedFreePointers.callback = NULL;
- UA_WorkQueue_enqueueDelayed(&server->workQueue, &monitoredItem->delayedFreePointers);
- }
- UA_StatusCode
- UA_MonitoredItem_ensureQueueSpace(UA_Server *server, UA_MonitoredItem *mon) {
- if(mon->queueSize - mon->eventOverflows <= mon->maxQueueSize)
- return UA_STATUSCODE_GOOD;
- /* Remove notifications until the queue size is reached */
- UA_Subscription *sub = mon->subscription;
- while(mon->queueSize - mon->eventOverflows > mon->maxQueueSize) {
- UA_assert(mon->queueSize >= 2); /* At least two Notifications in the queue */
- /* Make sure that the MonitoredItem does not lose its place in the
- * global queue when notifications are removed. Otherwise the
- * MonitoredItem can "starve" itself by putting new notifications always
- * at the end of the global queue and removing the old ones.
- *
- * - If the oldest notification is removed, put the second oldest
- * notification right behind it.
- * - If the newest notification is removed, put the new notification
- * right behind it. */
- UA_Notification *del; /* The notification that will be deleted */
- UA_Notification *after_del; /* The notification to keep and move after del */
- if(mon->discardOldest) {
- /* Remove the oldest */
- del = TAILQ_FIRST(&mon->queue);
- after_del = TAILQ_NEXT(del, listEntry);
- } else {
- /* Remove the second newest (to keep the up-to-date notification) */
- after_del = TAILQ_LAST(&mon->queue, NotificationQueue);
- del = TAILQ_PREV(after_del, NotificationQueue, listEntry);
- }
- /* Move after_del right after del in the global queue */
- TAILQ_REMOVE(&sub->notificationQueue, after_del, globalEntry);
- TAILQ_INSERT_AFTER(&sub->notificationQueue, del, after_del, globalEntry);
- #ifdef UA_ENABLE_SUBSCRIPTIONS_EVENTS
- /* Check if an OverflowEvent is being deleted */
- /* TODO: EventOverflows should never be deleted */
- UA_NodeId overflowBaseId = UA_NODEID_NUMERIC(0, UA_NS0ID_EVENTQUEUEOVERFLOWEVENTTYPE);
- if(del->data.event.fields.eventFieldsSize == 1 &&
- del->data.event.fields.eventFields[0].type == &UA_TYPES[UA_TYPES_NODEID] &&
- isNodeInTree(&server->config.nodestore,
- (UA_NodeId*)del->data.event.fields.eventFields[0].data,
- &overflowBaseId, &subtypeId, 1)) {
- --mon->eventOverflows;
- }
- #endif
- /* Delete the notification. This also removes the notification from the
- * linked lists. */
- UA_Notification_delete(sub, mon, del);
- }
- #ifdef UA_ENABLE_SUBSCRIPTIONS_EVENTS
- /* Create an overflow notification */
- /* The specification states in Part 4 5.12.1.5 that an EventQueueOverflowEvent
- * "is generated when the first Event has to be discarded [...] without discarding
- * any other event". So only generate one for all deleted events. */
- if (mon->monitoredItemType == UA_MONITOREDITEMTYPE_EVENTNOTIFY) {
- /* check if an overflowEvent is being deleted
- * TODO: make sure overflowEvents are never deleted */
- UA_NodeId overflowId = UA_NODEID_NUMERIC(0, UA_NS0ID_SIMPLEOVERFLOWEVENTTYPE);
- /* an overflowEvent does not care about event filters and as such
- * will not be "triggered" correctly. Instead, a notification will
- * be inserted into the queue which includes only the nodeId of the
- * overflowEventType. It is up to the client to check for possible
- * overflows. */
- UA_Notification *overflowNotification = (UA_Notification *) UA_malloc(sizeof(UA_Notification));
- if (!overflowNotification)
- return UA_STATUSCODE_BADOUTOFMEMORY;
- UA_EventFieldList_init(&overflowNotification->data.event.fields);
- overflowNotification->data.event.fields.eventFields = UA_Variant_new();
- if (!overflowNotification->data.event.fields.eventFields) {
- UA_EventFieldList_deleteMembers(&overflowNotification->data.event.fields);
- UA_free(overflowNotification);
- return UA_STATUSCODE_BADOUTOFMEMORY;
- }
- overflowNotification->data.event.fields.eventFieldsSize = 1;
- UA_StatusCode retval =
- UA_Variant_setScalarCopy(overflowNotification->data.event.fields.eventFields,
- &overflowId, &UA_TYPES[UA_TYPES_NODEID]);
- if (retval != UA_STATUSCODE_GOOD) {
- UA_EventFieldList_deleteMembers(&overflowNotification->data.event.fields);
- UA_free(overflowNotification);
- return retval;
- }
- overflowNotification->mon = mon;
- /* The amount of notifications in the subscription don't change. The specification
- * only states that the queue size in each MonitoredItem isn't affected by OverflowEvents.
- * (In this case the queue in the MonitoredItemQueue IS affected internally because externally
- * the queueSize will always appear with eventOverflows subtracted from it)
- *
- * Since they are reduced in Notification_delete the queues are increased here, so they
- * will remain the same in the end.
- *
- * Do not use Notification_enqueue to insert the notification into the queues, since this would
- * cause a bad recursive call of this function.
- */
- if (mon->discardOldest) {
- TAILQ_INSERT_HEAD(&mon->queue, overflowNotification, listEntry);
- TAILQ_INSERT_HEAD(&mon->subscription->notificationQueue,
- overflowNotification, globalEntry);
- } else {
- TAILQ_INSERT_TAIL(&mon->queue, overflowNotification, listEntry);
- TAILQ_INSERT_TAIL(&mon->subscription->notificationQueue,
- overflowNotification, globalEntry);
- }
- ++mon->eventOverflows;
- ++mon->queueSize;
- ++sub->notificationQueueSize;
- ++sub->eventNotifications;
- }
- #endif /* UA_ENABLE_SUBSCRIPTIONS_EVENTS */
- if(mon->monitoredItemType == UA_MONITOREDITEMTYPE_CHANGENOTIFY) {
- /* Get the element that carries the infobits */
- UA_Notification *notification = NULL;
- if(mon->discardOldest)
- notification = TAILQ_FIRST(&mon->queue);
- else
- notification = TAILQ_LAST(&mon->queue, NotificationQueue);
- UA_assert(notification);
- if(mon->maxQueueSize > 1) {
- /* Add the infobits either to the newest or the new last entry */
- notification->data.value.hasStatus = true;
- notification->data.value.status |= (UA_STATUSCODE_INFOTYPE_DATAVALUE |
- UA_STATUSCODE_INFOBITS_OVERFLOW);
- } else {
- /* If the queue size is reduced to one, remove the infobits */
- notification->data.value.status &= ~(UA_StatusCode)(UA_STATUSCODE_INFOTYPE_DATAVALUE |
- UA_STATUSCODE_INFOBITS_OVERFLOW);
- }
- }
- /* TODO: Infobits for Events? */
- return UA_STATUSCODE_GOOD;
- }
- UA_StatusCode
- UA_MonitoredItem_registerSampleCallback(UA_Server *server, UA_MonitoredItem *mon) {
- if(mon->sampleCallbackIsRegistered)
- return UA_STATUSCODE_GOOD;
- /* Only DataChange MonitoredItems have a callback with a sampling interval */
- if(mon->monitoredItemType != UA_MONITOREDITEMTYPE_CHANGENOTIFY)
- return UA_STATUSCODE_GOOD;
- UA_StatusCode retval =
- UA_Server_addRepeatedCallback(server, (UA_ServerCallback)UA_MonitoredItem_sampleCallback,
- mon, (UA_UInt32)mon->samplingInterval, &mon->sampleCallbackId);
- if(retval == UA_STATUSCODE_GOOD)
- mon->sampleCallbackIsRegistered = true;
- return retval;
- }
- UA_StatusCode
- UA_MonitoredItem_unregisterSampleCallback(UA_Server *server, UA_MonitoredItem *mon) {
- if(!mon->sampleCallbackIsRegistered)
- return UA_STATUSCODE_GOOD;
- mon->sampleCallbackIsRegistered = false;
- return UA_Server_removeRepeatedCallback(server, mon->sampleCallbackId);
- }
- #endif /* UA_ENABLE_SUBSCRIPTIONS */
|