Browse Source

Subscriptions: Separate file for MonitoredItem code

Julius Pfrommer 6 years ago
parent
commit
a0d60337b3

+ 8 - 3
CMakeLists.txt

@@ -499,9 +499,6 @@ set(lib_sources ${PROJECT_SOURCE_DIR}/src/ua_types.c
                 ${PROJECT_SOURCE_DIR}/src/server/ua_server_discovery.c
                 ${PROJECT_SOURCE_DIR}/src/server/ua_securechannel_manager.c
                 ${PROJECT_SOURCE_DIR}/src/server/ua_session_manager.c
-                ${PROJECT_SOURCE_DIR}/src/server/ua_subscription.c
-                ${PROJECT_SOURCE_DIR}/src/server/ua_subscription_datachange.c
-                ${PROJECT_SOURCE_DIR}/src/server/ua_subscription_events.c
                 ${PROJECT_SOURCE_DIR}/src/pubsub/ua_pubsub_networkmessage.c
                 ${PROJECT_SOURCE_DIR}/src/pubsub/ua_pubsub.c
                 ${PROJECT_SOURCE_DIR}/src/pubsub/ua_pubsub_manager.c
@@ -558,6 +555,14 @@ if(UA_ENABLE_PUBSUB)
     list(APPEND default_plugin_sources ${PROJECT_SOURCE_DIR}/plugins/ua_network_pubsub_udp.c)
 endif()
 
+if(UA_ENABLE_SUBSCRIPTIONS)
+    list(APPEND lib_sources ${PROJECT_SOURCE_DIR}/src/server/ua_subscription.c
+                            ${PROJECT_SOURCE_DIR}/src/server/ua_subscription_monitoreditem.c
+                            ${PROJECT_SOURCE_DIR}/src/server/ua_subscription_datachange.c)
+    if(UA_ENABLE_SUBSCRIPTIONS_EVENTS)
+        list(APPEND lib_sources ${PROJECT_SOURCE_DIR}/src/server/ua_subscription_events.c)
+    endif()
+endif()
 
 if(UA_DEBUG_DUMP_PKGS)
     list(APPEND lib_sources ${PROJECT_SOURCE_DIR}/plugins/ua_debug_dump_pkgs.c)

+ 1 - 1
src/server/ua_services_subscription.c

@@ -449,7 +449,7 @@ Operation_ModifyMonitoredItem(UA_Server *server, UA_Session *session, UA_Subscri
     result->revisedQueueSize = mon->maxQueueSize;
 
     /* Remove some notifications if the queue is now too small */
-    MonitoredItem_ensureQueueSpace(server, mon);
+    UA_MonitoredItem_ensureQueueSpace(server, mon);
 }
 
 void

+ 1 - 1
src/server/ua_subscription.c

@@ -40,7 +40,7 @@ UA_Notification_enqueue(UA_Server *server, UA_Subscription *sub,
 
     /* Ensure enough space is available in the MonitoredItem. Do this only after
      * adding the new Notification. */
-    MonitoredItem_ensureQueueSpace(server, mon);
+    UA_MonitoredItem_ensureQueueSpace(server, mon);
 }
 
 void

+ 6 - 2
src/server/ua_subscription.h

@@ -130,7 +130,10 @@ UA_StatusCode UA_MonitoredItem_unregisterSampleCallback(UA_Server *server, UA_Mo
 
 /* Remove entries until mon->maxQueueSize is reached. Sets infobits for lost
  * data if required. */
-UA_StatusCode MonitoredItem_ensureQueueSpace(UA_Server *server, UA_MonitoredItem *mon);
+UA_StatusCode UA_MonitoredItem_ensureQueueSpace(UA_Server *server, UA_MonitoredItem *mon);
+
+UA_StatusCode UA_MonitoredItem_removeNodeEventCallback(UA_Server *server, UA_Session *session,
+                                                       UA_Node *node, void *data);
 
 /****************/
 /* Subscription */
@@ -210,7 +213,8 @@ UA_Subscription_deleteMonitoredItem(UA_Server *server, UA_Subscription *sub,
                                     UA_UInt32 monitoredItemId);
 
 void UA_Subscription_publish(UA_Server *server, UA_Subscription *sub);
-UA_StatusCode UA_Subscription_removeRetransmissionMessage(UA_Subscription *sub, UA_UInt32 sequenceNumber);
+UA_StatusCode UA_Subscription_removeRetransmissionMessage(UA_Subscription *sub,
+                                                          UA_UInt32 sequenceNumber);
 void UA_Subscription_answerPublishRequestsNoSubscription(UA_Server *server, UA_Session *session);
 UA_Boolean UA_Subscription_reachedPublishReqLimit(UA_Server *server,  UA_Session *session);
 

+ 2 - 251
src/server/ua_subscription_datachange.c

@@ -17,230 +17,6 @@
 
 #define UA_VALUENCODING_MAXSTACK 512
 
-void UA_MonitoredItem_init(UA_MonitoredItem *mon, UA_Subscription *sub) {
-    memset(mon, 0, sizeof(UA_MonitoredItem));
-    mon->subscription = sub;
-    TAILQ_INIT(&mon->queue);
-}
-
-#ifdef UA_ENABLE_SUBSCRIPTIONS_EVENTS
-static UA_StatusCode
-removeMonitoredItemFromNodeCallback(UA_Server *server, UA_Session *session,
-                                    UA_Node *node, void *data) {
-    /* data is the monitoredItemID */
-    /* catch edge case that it's the first element */
-    if (data == ((UA_ObjectNode *) node)->monitoredItemQueue) {
-        ((UA_ObjectNode *)node)->monitoredItemQueue = ((UA_MonitoredItem *)data)->next;
-        return UA_STATUSCODE_GOOD;
-    }
-
-    /* SLIST_FOREACH */
-    for (UA_MonitoredItem *entry = ((UA_ObjectNode *) node)->monitoredItemQueue->next;
-         entry != NULL; entry=entry->next) {
-        if (entry == (UA_MonitoredItem *)data) {
-            /* SLIST_REMOVE */
-            UA_MonitoredItem *iter = ((UA_ObjectNode *) node)->monitoredItemQueue;
-            for (; iter->next != entry; iter=iter->next) {}
-            iter->next = entry->next;
-            UA_free(entry);
-            break;
-        }
-    }
-    return UA_STATUSCODE_GOOD;
-}
-#endif /* UA_ENABLE_SUBSCRIPTIONS_EVENTS */
-
-void UA_MonitoredItem_delete(UA_Server *server, UA_MonitoredItem *monitoredItem) {
-    if(monitoredItem->monitoredItemType == UA_MONITOREDITEMTYPE_CHANGENOTIFY) {
-        /* Remove the sampling callback */
-        UA_MonitoredItem_unregisterSampleCallback(server, monitoredItem);
-    } else if (monitoredItem->monitoredItemType != UA_MONITOREDITEMTYPE_EVENTNOTIFY) {
-        /* TODO: Access val data.event */
-        UA_LOG_ERROR(server->config.logger, UA_LOGCATEGORY_SERVER,
-                     "MonitoredItemTypes other than ChangeNotify or EventNotify are not supported yet");
-    }
-
-    /* Remove the queued notifications if attached to a subscription */
-    if(monitoredItem->subscription) {
-        UA_Subscription *sub = monitoredItem->subscription;
-        UA_Notification *notification, *notification_tmp;
-        TAILQ_FOREACH_SAFE(notification, &monitoredItem->queue,
-                           listEntry, notification_tmp) {
-            /* Remove the item from the queues and free the memory */
-            UA_Notification_delete(sub, monitoredItem, notification);
-        }
-    }
-
-#ifdef UA_ENABLE_SUBSCRIPTIONS_EVENTS
-    if(monitoredItem->monitoredItemType == UA_MONITOREDITEMTYPE_EVENTNOTIFY) {
-        /* Remove the monitored item from the node queue */
-        UA_Server_editNode(server, NULL, &monitoredItem->monitoredNodeId,
-                           removeMonitoredItemFromNodeCallback, monitoredItem);
-        /* Delete the event filter */
-        UA_EventFilter_deleteMembers(&monitoredItem->filter.eventFilter);
-    }
-#endif /* UA_ENABLE_SUBSCRIPTIONS_EVENTS */
-
-    /* Deregister MonitoredItem in userland */
-    if(server->config.monitoredItemRegisterCallback && monitoredItem->registered) {
-        /* Get the session context. Local MonitoredItems don't have a subscription. */
-        UA_Session *session = NULL;
-        if(monitoredItem->subscription)
-            session = monitoredItem->subscription->session;
-        if(!session)
-            session = &server->adminSession;
-
-        /* Get the node context */
-        void *targetContext = NULL;
-        UA_Server_getNodeContext(server, monitoredItem->monitoredNodeId, &targetContext);
-
-        /* Deregister */
-        server->config.monitoredItemRegisterCallback(server,
-                                                     &session->sessionId, session->sessionHandle,
-                                                     &monitoredItem->monitoredNodeId, targetContext,
-                                                     monitoredItem->attributeId, true);
-    }
-
-    /* Remove the monitored item */
-    if(monitoredItem->listEntry.le_prev != NULL)
-        LIST_REMOVE(monitoredItem, listEntry);
-    UA_String_deleteMembers(&monitoredItem->indexRange);
-    UA_ByteString_deleteMembers(&monitoredItem->lastSampledValue);
-    UA_Variant_deleteMembers(&monitoredItem->lastValue);
-    UA_NodeId_deleteMembers(&monitoredItem->monitoredNodeId);
-    UA_Server_delayedFree(server, monitoredItem);
-}
-
-UA_StatusCode
-MonitoredItem_ensureQueueSpace(UA_Server *server, UA_MonitoredItem *mon) {
-    if(mon->queueSize - mon->eventOverflows <= mon->maxQueueSize)
-        return UA_STATUSCODE_GOOD;
-
-    /* Remove notifications until the queue size is reached */
-    UA_Subscription *sub = mon->subscription;
-    while(mon->queueSize - mon->eventOverflows > mon->maxQueueSize) {
-        UA_assert(mon->queueSize >= 2); /* At least two Notifications in the queue */
-
-        /* Make sure that the MonitoredItem does not lose its place in the
-         * global queue when notifications are removed. Otherwise the
-         * MonitoredItem can "starve" itself by putting new notifications always
-         * at the end of the global queue and removing the old ones.
-         *
-         * - If the oldest notification is removed, put the second oldest
-         *   notification right behind it.
-         * - If the newest notification is removed, put the new notification
-         *   right behind it. */
-
-        UA_Notification *del; /* The notification that will be deleted */
-        UA_Notification *after_del; /* The notification to keep and move after del */
-        if(mon->discardOldest) {
-            /* Remove the oldest */
-            del = TAILQ_FIRST(&mon->queue);
-            after_del = TAILQ_NEXT(del, listEntry);
-        } else {
-            /* Remove the second newest (to keep the up-to-date notification) */
-            after_del = TAILQ_LAST(&mon->queue, NotificationQueue);
-            del = TAILQ_PREV(after_del, NotificationQueue, listEntry);
-        }
-
-        /* Move after_del right after del in the global queue */
-        TAILQ_REMOVE(&sub->notificationQueue, after_del, globalEntry);
-        TAILQ_INSERT_AFTER(&sub->notificationQueue, del, after_del, globalEntry);
-
-#ifdef UA_ENABLE_SUBSCRIPTIONS_EVENTS
-        /* Create an overflow notification */
-         if(mon->monitoredItemType == UA_MONITOREDITEMTYPE_EVENTNOTIFY) {
-             /* check if an overflowEvent is being deleted
-              * TODO: make sure overflowEvents are never deleted */
-             UA_NodeId overflowBaseId = UA_NODEID_NUMERIC(0, UA_NS0ID_EVENTQUEUEOVERFLOWEVENTTYPE);
-             UA_NodeId overflowId = UA_NODEID_NUMERIC(0, UA_NS0ID_SIMPLEOVERFLOWEVENTTYPE);
-
-             /* Check if an OverflowEvent is being deleted */
-             if (del->data.event.fields.eventFieldsSize == 1
-                 && del->data.event.fields.eventFields[0].type == &UA_TYPES[UA_TYPES_NODEID]
-                 && isNodeInTree(&server->config.nodestore, (UA_NodeId*)del->data.event.fields.eventFields[0].data,
-                                 &overflowBaseId, &subtypeId, 1)) {
-                 /* Don't do anything, since adding and removing an overflow will not change anything */
-                 return UA_STATUSCODE_GOOD;
-             }
-
-             /* cause an overflowEvent */
-             /* an overflowEvent does not care about event filters and as such
-              * will not be "triggered" correctly. Instead, a notification will
-              * be inserted into the queue which includes only the nodeId of the
-              * overflowEventType. It is up to the client to check for possible
-              * overflows. */
-             UA_Notification *overflowNotification = (UA_Notification *) UA_malloc(sizeof(UA_Notification));
-             if(!overflowNotification)
-                 return UA_STATUSCODE_BADOUTOFMEMORY;
-
-             UA_EventFieldList_init(&overflowNotification->data.event.fields);
-             overflowNotification->data.event.fields.eventFields = UA_Variant_new();
-             if(!overflowNotification->data.event.fields.eventFields) {
-                 UA_EventFieldList_deleteMembers(&overflowNotification->data.event.fields);
-                 UA_free(overflowNotification);
-                 return UA_STATUSCODE_BADOUTOFMEMORY;
-             }
-
-             overflowNotification->data.event.fields.eventFieldsSize = 1;
-             UA_StatusCode retval = UA_Variant_setScalarCopy(overflowNotification->data.event.fields.eventFields,
-                                      &overflowId, &UA_TYPES[UA_TYPES_NODEID]);
-             if (retval != UA_STATUSCODE_GOOD) {
-                 UA_EventFieldList_deleteMembers(&overflowNotification->data.event.fields);
-                 UA_free(overflowNotification);
-                 return retval;
-             }
-
-             overflowNotification->mon = mon;
-             if(mon->discardOldest) {
-                 TAILQ_INSERT_HEAD(&mon->queue, overflowNotification, listEntry);
-                 TAILQ_INSERT_HEAD(&mon->subscription->notificationQueue, overflowNotification, globalEntry);
-             } else {
-                 TAILQ_INSERT_TAIL(&mon->queue, overflowNotification, listEntry);
-                 TAILQ_INSERT_TAIL(&mon->subscription->notificationQueue, overflowNotification, globalEntry);
-             }
-
-
-             /* The amount of notifications in the subscription don't change. The specification
-              * only states that the queue size in each MonitoredItem isn't affected by OverflowEvents.
-              * Since they are reduced in Notification_delete the queues are increased here, so they
-              * will remain the same in the end.
-              */
-             ++sub->notificationQueueSize;
-             ++sub->eventNotifications;
-         }
-#endif /* UA_ENABLE_SUBSCRIPTIONS_EVENTS */
-
-        /* Delete the notification. This also removes the notification from the
-         * linked lists. */
-        UA_Notification_delete(sub, mon, del);
-    }
-
-    if(mon->monitoredItemType == UA_MONITOREDITEMTYPE_CHANGENOTIFY) {
-        /* Get the element that carries the infobits */
-        UA_Notification *notification = NULL;
-        if(mon->discardOldest)
-            notification = TAILQ_FIRST(&mon->queue);
-        else
-            notification = TAILQ_LAST(&mon->queue, NotificationQueue);
-        UA_assert(notification);
-
-        if(mon->maxQueueSize > 1) {
-            /* Add the infobits either to the newest or the new last entry */
-            notification->data.value.hasStatus = true;
-            notification->data.value.status |= (UA_STATUSCODE_INFOTYPE_DATAVALUE |
-                                                UA_STATUSCODE_INFOBITS_OVERFLOW);
-        } else {
-            /* If the queue size is reduced to one, remove the infobits */
-            notification->data.value.status &= ~(UA_StatusCode)(UA_STATUSCODE_INFOTYPE_DATAVALUE |
-                                                                UA_STATUSCODE_INFOBITS_OVERFLOW);
-        }
-    }
-
-    /* TODO: Infobits for Events? */
-    return UA_STATUSCODE_GOOD;
-}
-
 #define ABS_SUBTRACT_TYPE_INDEPENDENT(a,b) ((a)>(b)?(a)-(b):(b)-(a))
 
 static UA_INLINE UA_Boolean
@@ -310,7 +86,6 @@ updateNeededForFilteredValue(const UA_Variant *value, const UA_Variant *oldValue
     return false;
 }
 
-
 /* When a change is detected, encoding contains the heap-allocated binary encoded value */
 static UA_Boolean
 detectValueChangeWithFilter(UA_Server *server, UA_MonitoredItem *mon, UA_DataValue *value,
@@ -526,7 +301,8 @@ UA_MonitoredItem_sampleCallback(UA_Server *server, UA_MonitoredItem *monitoredIt
     rvid.nodeId = monitoredItem->monitoredNodeId;
     rvid.attributeId = monitoredItem->attributeId;
     rvid.indexRange = monitoredItem->indexRange;
-    UA_DataValue value = UA_Server_readWithSession(server, session, &rvid, monitoredItem->timestampsToReturn);
+    UA_DataValue value = UA_Server_readWithSession(server, session, &rvid,
+                                                   monitoredItem->timestampsToReturn);
 
     /* Operate on the sample */
     UA_Boolean storedValue = sampleCallbackWithValue(server, monitoredItem, &value);
@@ -536,29 +312,4 @@ UA_MonitoredItem_sampleCallback(UA_Server *server, UA_MonitoredItem *monitoredIt
         UA_DataValue_deleteMembers(&value);
 }
 
-UA_StatusCode
-UA_MonitoredItem_registerSampleCallback(UA_Server *server, UA_MonitoredItem *mon) {
-    if(mon->sampleCallbackIsRegistered)
-        return UA_STATUSCODE_GOOD;
-
-    /* Only DataChange MonitoredItems have a callback with a sampling interval */
-    if(mon->monitoredItemType != UA_MONITOREDITEMTYPE_CHANGENOTIFY)
-        return UA_STATUSCODE_GOOD;
-
-    UA_StatusCode retval =
-        UA_Server_addRepeatedCallback(server, (UA_ServerCallback)UA_MonitoredItem_sampleCallback,
-                                      mon, (UA_UInt32)mon->samplingInterval, &mon->sampleCallbackId);
-    if(retval == UA_STATUSCODE_GOOD)
-        mon->sampleCallbackIsRegistered = true;
-    return retval;
-}
-
-UA_StatusCode
-UA_MonitoredItem_unregisterSampleCallback(UA_Server *server, UA_MonitoredItem *mon) {
-    if(!mon->sampleCallbackIsRegistered)
-        return UA_STATUSCODE_GOOD;
-    mon->sampleCallbackIsRegistered = false;
-    return UA_Server_removeRepeatedCallback(server, mon->sampleCallbackId);
-}
-
 #endif /* UA_ENABLE_SUBSCRIPTIONS */

+ 25 - 0
src/server/ua_subscription_events.c

@@ -10,6 +10,31 @@
 
 #ifdef UA_ENABLE_SUBSCRIPTIONS_EVENTS
 
+UA_StatusCode
+UA_MonitoredItem_removeNodeEventCallback(UA_Server *server, UA_Session *session,
+                                         UA_Node *node, void *data) {
+    /* data is the monitoredItemID */
+    /* catch edge case that it's the first element */
+    if (data == ((UA_ObjectNode *) node)->monitoredItemQueue) {
+        ((UA_ObjectNode *)node)->monitoredItemQueue = ((UA_MonitoredItem *)data)->next;
+        return UA_STATUSCODE_GOOD;
+    }
+
+    /* SLIST_FOREACH */
+    for (UA_MonitoredItem *entry = ((UA_ObjectNode *) node)->monitoredItemQueue->next;
+         entry != NULL; entry=entry->next) {
+        if (entry == (UA_MonitoredItem *)data) {
+            /* SLIST_REMOVE */
+            UA_MonitoredItem *iter = ((UA_ObjectNode *) node)->monitoredItemQueue;
+            for (; iter->next != entry; iter=iter->next) {}
+            iter->next = entry->next;
+            UA_free(entry);
+            break;
+        }
+    }
+    return UA_STATUSCODE_GOOD;
+}
+
 typedef struct Events_nodeListElement {
     LIST_ENTRY(Events_nodeListElement) listEntry;
     UA_NodeId nodeId;

+ 245 - 0
src/server/ua_subscription_monitoreditem.c

@@ -0,0 +1,245 @@
+/* This Source Code Form is subject to the terms of the Mozilla Public
+ * License, v. 2.0. If a copy of the MPL was not distributed with this
+ * file, You can obtain one at http://mozilla.org/MPL/2.0/. 
+ *
+ *    Copyright 2017 (c) Fraunhofer IOSB (Author: Julius Pfrommer)
+ *    Copyright 2017 (c) Stefan Profanter, fortiss GmbH
+ *    Copyright 2018 (c) Ari Breitkreuz, fortiss GmbH
+ *    Copyright 2018 (c) Thomas Stalder, Blue Time Concept SA
+ *    Copyright 2018 (c) Fabian Arndt, Root-Core
+ */
+
+#include "ua_server_internal.h"
+#include "ua_subscription.h"
+
+#ifdef UA_ENABLE_SUBSCRIPTIONS /* conditional compilation */
+
+void
+UA_MonitoredItem_init(UA_MonitoredItem *mon, UA_Subscription *sub) {
+    memset(mon, 0, sizeof(UA_MonitoredItem));
+    mon->subscription = sub;
+    TAILQ_INIT(&mon->queue);
+}
+
+void
+UA_MonitoredItem_delete(UA_Server *server, UA_MonitoredItem *monitoredItem) {
+    if(monitoredItem->monitoredItemType == UA_MONITOREDITEMTYPE_CHANGENOTIFY) {
+        /* Remove the sampling callback */
+        UA_MonitoredItem_unregisterSampleCallback(server, monitoredItem);
+    } else if (monitoredItem->monitoredItemType != UA_MONITOREDITEMTYPE_EVENTNOTIFY) {
+        /* TODO: Access val data.event */
+        UA_LOG_ERROR(server->config.logger, UA_LOGCATEGORY_SERVER,
+                     "MonitoredItemTypes other than ChangeNotify or EventNotify "
+                     "are not supported yet");
+    }
+
+    /* Remove the queued notifications if attached to a subscription */
+    if(monitoredItem->subscription) {
+        UA_Subscription *sub = monitoredItem->subscription;
+        UA_Notification *notification, *notification_tmp;
+        TAILQ_FOREACH_SAFE(notification, &monitoredItem->queue,
+                           listEntry, notification_tmp) {
+            /* Remove the item from the queues and free the memory */
+            UA_Notification_delete(sub, monitoredItem, notification);
+        }
+    }
+
+#ifdef UA_ENABLE_SUBSCRIPTIONS_EVENTS
+    if(monitoredItem->monitoredItemType == UA_MONITOREDITEMTYPE_EVENTNOTIFY) {
+        /* Remove the monitored item from the node queue */
+        UA_Server_editNode(server, NULL, &monitoredItem->monitoredNodeId,
+                           UA_MonitoredItem_removeNodeEventCallback, monitoredItem);
+        /* Delete the event filter */
+        UA_EventFilter_deleteMembers(&monitoredItem->filter.eventFilter);
+    }
+#endif /* UA_ENABLE_SUBSCRIPTIONS_EVENTS */
+
+    /* Deregister MonitoredItem in userland */
+    if(server->config.monitoredItemRegisterCallback && monitoredItem->registered) {
+        /* Get the session context. Local MonitoredItems don't have a subscription. */
+        UA_Session *session = NULL;
+        if(monitoredItem->subscription)
+            session = monitoredItem->subscription->session;
+        if(!session)
+            session = &server->adminSession;
+
+        /* Get the node context */
+        void *targetContext = NULL;
+        UA_Server_getNodeContext(server, monitoredItem->monitoredNodeId, &targetContext);
+
+        /* Deregister */
+        server->config.monitoredItemRegisterCallback(server, &session->sessionId,
+                                                     session->sessionHandle, &monitoredItem->monitoredNodeId,
+                                                     targetContext, monitoredItem->attributeId, true);
+    }
+
+    /* Remove the monitored item */
+    if(monitoredItem->listEntry.le_prev != NULL)
+        LIST_REMOVE(monitoredItem, listEntry);
+    UA_String_deleteMembers(&monitoredItem->indexRange);
+    UA_ByteString_deleteMembers(&monitoredItem->lastSampledValue);
+    UA_Variant_deleteMembers(&monitoredItem->lastValue);
+    UA_NodeId_deleteMembers(&monitoredItem->monitoredNodeId);
+    UA_Server_delayedFree(server, monitoredItem);
+}
+
+UA_StatusCode
+UA_MonitoredItem_ensureQueueSpace(UA_Server *server, UA_MonitoredItem *mon) {
+    if(mon->queueSize - mon->eventOverflows <= mon->maxQueueSize)
+        return UA_STATUSCODE_GOOD;
+
+    /* Remove notifications until the queue size is reached */
+    UA_Subscription *sub = mon->subscription;
+    while(mon->queueSize - mon->eventOverflows > mon->maxQueueSize) {
+        UA_assert(mon->queueSize >= 2); /* At least two Notifications in the queue */
+
+        /* Make sure that the MonitoredItem does not lose its place in the
+         * global queue when notifications are removed. Otherwise the
+         * MonitoredItem can "starve" itself by putting new notifications always
+         * at the end of the global queue and removing the old ones.
+         *
+         * - If the oldest notification is removed, put the second oldest
+         *   notification right behind it.
+         * - If the newest notification is removed, put the new notification
+         *   right behind it. */
+
+        UA_Notification *del; /* The notification that will be deleted */
+        UA_Notification *after_del; /* The notification to keep and move after del */
+        if(mon->discardOldest) {
+            /* Remove the oldest */
+            del = TAILQ_FIRST(&mon->queue);
+            after_del = TAILQ_NEXT(del, listEntry);
+        } else {
+            /* Remove the second newest (to keep the up-to-date notification) */
+            after_del = TAILQ_LAST(&mon->queue, NotificationQueue);
+            del = TAILQ_PREV(after_del, NotificationQueue, listEntry);
+        }
+
+        /* Move after_del right after del in the global queue */
+        TAILQ_REMOVE(&sub->notificationQueue, after_del, globalEntry);
+        TAILQ_INSERT_AFTER(&sub->notificationQueue, del, after_del, globalEntry);
+
+#ifdef UA_ENABLE_SUBSCRIPTIONS_EVENTS
+        /* Create an overflow notification */
+         if(mon->monitoredItemType == UA_MONITOREDITEMTYPE_EVENTNOTIFY) {
+             /* check if an overflowEvent is being deleted
+              * TODO: make sure overflowEvents are never deleted */
+             UA_NodeId overflowBaseId = UA_NODEID_NUMERIC(0, UA_NS0ID_EVENTQUEUEOVERFLOWEVENTTYPE);
+             UA_NodeId overflowId = UA_NODEID_NUMERIC(0, UA_NS0ID_SIMPLEOVERFLOWEVENTTYPE);
+
+             /* Check if an OverflowEvent is being deleted */
+             if(del->data.event.fields.eventFieldsSize == 1 &&
+                 del->data.event.fields.eventFields[0].type == &UA_TYPES[UA_TYPES_NODEID] &&
+                 isNodeInTree(&server->config.nodestore,
+                              (UA_NodeId*)del->data.event.fields.eventFields[0].data,
+                              &overflowBaseId, &subtypeId, 1)) {
+                 /* Don't do anything, since adding and removing an overflow will not change anything */
+                 return UA_STATUSCODE_GOOD;
+             }
+
+             /* cause an overflowEvent */
+             /* an overflowEvent does not care about event filters and as such
+              * will not be "triggered" correctly. Instead, a notification will
+              * be inserted into the queue which includes only the nodeId of the
+              * overflowEventType. It is up to the client to check for possible
+              * overflows. */
+             UA_Notification *overflowNotification = (UA_Notification*)UA_malloc(sizeof(UA_Notification));
+             if(!overflowNotification)
+                 return UA_STATUSCODE_BADOUTOFMEMORY;
+
+             UA_EventFieldList_init(&overflowNotification->data.event.fields);
+             overflowNotification->data.event.fields.eventFields = UA_Variant_new();
+             if(!overflowNotification->data.event.fields.eventFields) {
+                 UA_EventFieldList_deleteMembers(&overflowNotification->data.event.fields);
+                 UA_free(overflowNotification);
+                 return UA_STATUSCODE_BADOUTOFMEMORY;
+             }
+
+             overflowNotification->data.event.fields.eventFieldsSize = 1;
+             UA_StatusCode retval =
+                 UA_Variant_setScalarCopy(overflowNotification->data.event.fields.eventFields,
+                                          &overflowId, &UA_TYPES[UA_TYPES_NODEID]);
+             if (retval != UA_STATUSCODE_GOOD) {
+                 UA_EventFieldList_deleteMembers(&overflowNotification->data.event.fields);
+                 UA_free(overflowNotification);
+                 return retval;
+             }
+
+             overflowNotification->mon = mon;
+             if(mon->discardOldest) {
+                 TAILQ_INSERT_HEAD(&mon->queue, overflowNotification, listEntry);
+                 TAILQ_INSERT_HEAD(&mon->subscription->notificationQueue,
+                                   overflowNotification, globalEntry);
+             } else {
+                 TAILQ_INSERT_TAIL(&mon->queue, overflowNotification, listEntry);
+                 TAILQ_INSERT_TAIL(&mon->subscription->notificationQueue,
+                                   overflowNotification, globalEntry);
+             }
+
+
+             /* The amount of notifications in the subscription don't change. The specification
+              * only states that the queue size in each MonitoredItem isn't affected by OverflowEvents.
+              * Since they are reduced in Notification_delete the queues are increased here, so they
+              * will remain the same in the end.
+              */
+             ++sub->notificationQueueSize;
+             ++sub->eventNotifications;
+         }
+#endif /* UA_ENABLE_SUBSCRIPTIONS_EVENTS */
+
+        /* Delete the notification. This also removes the notification from the
+         * linked lists. */
+        UA_Notification_delete(sub, mon, del);
+    }
+
+    if(mon->monitoredItemType == UA_MONITOREDITEMTYPE_CHANGENOTIFY) {
+        /* Get the element that carries the infobits */
+        UA_Notification *notification = NULL;
+        if(mon->discardOldest)
+            notification = TAILQ_FIRST(&mon->queue);
+        else
+            notification = TAILQ_LAST(&mon->queue, NotificationQueue);
+        UA_assert(notification);
+
+        if(mon->maxQueueSize > 1) {
+            /* Add the infobits either to the newest or the new last entry */
+            notification->data.value.hasStatus = true;
+            notification->data.value.status |= (UA_STATUSCODE_INFOTYPE_DATAVALUE |
+                                                UA_STATUSCODE_INFOBITS_OVERFLOW);
+        } else {
+            /* If the queue size is reduced to one, remove the infobits */
+            notification->data.value.status &= ~(UA_StatusCode)(UA_STATUSCODE_INFOTYPE_DATAVALUE |
+                                                                UA_STATUSCODE_INFOBITS_OVERFLOW);
+        }
+    }
+
+    /* TODO: Infobits for Events? */
+    return UA_STATUSCODE_GOOD;
+}
+
+UA_StatusCode
+UA_MonitoredItem_registerSampleCallback(UA_Server *server, UA_MonitoredItem *mon) {
+    if(mon->sampleCallbackIsRegistered)
+        return UA_STATUSCODE_GOOD;
+
+    /* Only DataChange MonitoredItems have a callback with a sampling interval */
+    if(mon->monitoredItemType != UA_MONITOREDITEMTYPE_CHANGENOTIFY)
+        return UA_STATUSCODE_GOOD;
+
+    UA_StatusCode retval =
+        UA_Server_addRepeatedCallback(server, (UA_ServerCallback)UA_MonitoredItem_sampleCallback,
+                                      mon, (UA_UInt32)mon->samplingInterval, &mon->sampleCallbackId);
+    if(retval == UA_STATUSCODE_GOOD)
+        mon->sampleCallbackIsRegistered = true;
+    return retval;
+}
+
+UA_StatusCode
+UA_MonitoredItem_unregisterSampleCallback(UA_Server *server, UA_MonitoredItem *mon) {
+    if(!mon->sampleCallbackIsRegistered)
+        return UA_STATUSCODE_GOOD;
+    mon->sampleCallbackIsRegistered = false;
+    return UA_Server_removeRepeatedCallback(server, mon->sampleCallbackId);
+}
+
+#endif /* UA_ENABLE_SUBSCRIPTIONS */