Browse Source

Enabled Notification Queues for subscriptions and KeepAlive message generation on Publish.

ichrispa 9 years ago
parent
commit
fd936aee3e

+ 68 - 6
src/server/ua_services_subscription.c

@@ -6,7 +6,6 @@
 #include "ua_util.h"
 #include "ua_nodestore.h"
 
-
 #include "ua_log.h" // Remove later, debugging only
 
 #define UA_BOUNDEDVALUE_SETWBOUNDS(BOUNDS, SRC, DST) { \
@@ -37,7 +36,7 @@ UA_Int32 Service_CreateSubscription(UA_Server *server, UA_Session *session,
     newSubscription->LifeTime = (UA_UInt32_BoundedValue)  { .minValue=session->subscriptionManager.GlobalLifeTimeCount.minValue, .maxValue=session->subscriptionManager.GlobalLifeTimeCount.maxValue, .currentValue=response->revisedLifetimeCount};
     
     UA_BOUNDEDVALUE_SETWBOUNDS(session->subscriptionManager.GlobalKeepAliveCount, request->requestedMaxKeepAliveCount, response->revisedMaxKeepAliveCount);
-    newSubscription->KeepAliveCount = (UA_UInt32_BoundedValue)  { .minValue=session->subscriptionManager.GlobalKeepAliveCount.minValue, .maxValue=session->subscriptionManager.GlobalKeepAliveCount.maxValue, .currentValue=response->revisedMaxKeepAliveCount};
+    newSubscription->KeepAliveCount = (UA_Int32_BoundedValue)  { .minValue=session->subscriptionManager.GlobalKeepAliveCount.minValue, .maxValue=session->subscriptionManager.GlobalKeepAliveCount.maxValue, .currentValue=response->revisedMaxKeepAliveCount};
     
     newSubscription->NotificationsPerPublish = request->maxNotificationsPerPublish;
     newSubscription->PublishingMode          = request->publishingEnabled;
@@ -96,7 +95,7 @@ UA_Int32 Service_CreateMonitoredItems(UA_Server *server, UA_Session *session,
             thisItemsResult->statusCode = UA_STATUSCODE_GOOD;
             
             newMon = UA_MonitoredItem_new();
-            memcpy(&(newMon->ItemNodeId), &(thisItemsRequest->itemToMonitor.nodeId), sizeof(UA_NodeId));
+            newMon->monitoredNode = UA_NodeStore_get(server->nodestore, (const UA_NodeId *) &(thisItemsRequest->itemToMonitor.nodeId));
             newMon->ItemId = ++(session->subscriptionManager.LastSessionID);
             thisItemsResult->monitoredItemId = newMon->ItemId;
             
@@ -106,7 +105,9 @@ UA_Int32 Service_CreateMonitoredItems(UA_Server *server, UA_Session *session,
             newMon->SamplingInterval = thisItemsResult->revisedSamplingInterval;
             
             UA_BOUNDEDVALUE_SETWBOUNDS(session->subscriptionManager.GlobalQueueSize, thisItemsRequest->requestedParameters.queueSize, thisItemsResult->revisedQueueSize);
-            newMon->QueueSize = thisItemsResult->revisedQueueSize;
+            newMon->QueueSize = (UA_UInt32_BoundedValue) { .maxValue=(thisItemsResult->revisedQueueSize) + 1, .minValue=0, .currentValue=0 };
+            
+            newMon->DiscardOldest = thisItemsRequest->requestedParameters.discardOldest;
             
             LIST_INSERT_HEAD(sub->MonitoredItems, newMon, listEntry);
         }
@@ -119,13 +120,35 @@ UA_Int32 Service_Publish(UA_Server *server, UA_Session *session,
                          const UA_PublishRequest *request,
                          UA_PublishResponse *response) {
     
-    // Verify Session
+    UA_Subscription  *sub;
+    //UA_MonitoredItem *mon;
+    UA_SubscriptionManager *manager;
+    
+    // Verify Session and Subscription
     response->responseHeader.serviceResult = UA_STATUSCODE_GOOD;
     if (session == NULL ) response->responseHeader.serviceResult = UA_STATUSCODE_BADSESSIONIDINVALID;           
     else if ( session->channel == NULL || session->activated == UA_FALSE) response->responseHeader.serviceResult = UA_STATUSCODE_BADSESSIONNOTACTIVATED;
     if ( response->responseHeader.serviceResult != UA_STATUSCODE_GOOD) return 0;
+
+    manager = &(session->subscriptionManager);    
+    for (sub=(manager->ServerSubscriptions)->lh_first; sub != NULL; sub = sub->listEntry.le_next) {
+        Subscription_updateNotifications(sub);
+        /*
+        printf("Publish: Session Subscription %i\n", sub->SubscriptionID);
+        for(mon=sub->MonitoredItems->lh_first; mon != NULL; mon=mon->listEntry.le_next) {
+            printf("Publish: Session Subscription %i, Monitored Item %i\n", sub->SubscriptionID, mon->ItemId);
+            //MonitoredItem_QueuePushDataValue(mon);
+            printf("QueueSize: %i, %i\n", mon->QueueSize.currentValue, mon->QueueSize.maxValue);
+        }
+        */
+    }
     
-    // FIXME
+    response->subscriptionId = 0;
+    response->availableSequenceNumbersSize = 0;
+    response->availableSequenceNumbers = 0;
+    response->moreNotifications = UA_FALSE;
+    response->resultsSize = 0;
+
     response->responseHeader.serviceResult = UA_STATUSCODE_BADSERVICEUNSUPPORTED;
     return 0;
 }
@@ -155,4 +178,43 @@ UA_Int32 Service_DeleteSubscriptions(UA_Server *server, UA_Session *session,
     return 0;
 } 
 
+UA_Int32 Service_DeleteMonitoredItems(UA_Server *server, UA_Session *session,
+                                      const UA_DeleteMonitoredItemsRequest *request,
+                                      UA_DeleteMonitoredItemsResponse *response) {
+    UA_SubscriptionManager *manager;
+    UA_Subscription *sub;
+    UA_Int32 *resultCodes;
+    
+    // Verify Session
+    response->responseHeader.serviceResult = UA_STATUSCODE_GOOD;
+    if (session == NULL ) response->responseHeader.serviceResult = UA_STATUSCODE_BADSESSIONIDINVALID;           
+    else if ( session->channel == NULL || session->activated == UA_FALSE) response->responseHeader.serviceResult = UA_STATUSCODE_BADSESSIONNOTACTIVATED;
+    if ( response->responseHeader.serviceResult != UA_STATUSCODE_GOOD) return 0;
+    
+    response->diagnosticInfosSize=0;
+    response->resultsSize=0;
+    
+    manager = &(session->subscriptionManager);
+    
+    sub = SubscriptionManager_getSubscriptionByID(manager, request->subscriptionId);
+    
+    if (sub == NULL) {
+        response->responseHeader.serviceResult = UA_STATUSCODE_BADSUBSCRIPTIONIDINVALID;
+        return 0;
+    }
+    
+    resultCodes = (UA_Int32 *) malloc(sizeof(UA_UInt32) * request->monitoredItemIdsSize);
+    if (resultCodes == NULL) {
+        response->responseHeader.serviceResult = UA_STATUSCODE_BADOUTOFMEMORY;
+        return 0;
+    }
+
+    response->results = (UA_StatusCode *) resultCodes;
+    response->resultsSize = request->monitoredItemIdsSize;
+    for(int i=0; i < request->monitoredItemIdsSize; i++) {
+        resultCodes[i] = SubscriptionManager_deleteMonitoredItem(manager, sub->SubscriptionID, (request->monitoredItemIds)[i]);
+    }
+    
+    return 0;
+}
 #endif //#ifdef ENABLESUBSCRIPTIONS

+ 218 - 7
src/server/ua_subscription_manager.c

@@ -28,14 +28,22 @@ UA_Subscription *UA_Subscription_new(UA_Int32 SubscriptionID) {
     UA_Subscription *new = (UA_Subscription *) malloc(sizeof(UA_Subscription));
     
     new->SubscriptionID = SubscriptionID;
+    new->LastPublished  = 0;
+    new->SequenceNumber = 0;
     new->MonitoredItems = (UA_ListOfUAMonitoredItems *) malloc (sizeof(UA_ListOfUAMonitoredItems));
     LIST_INIT(new->MonitoredItems);
-    
+    new->unpublishedNotifications = (UA_ListOfUnpublishedNotifications *) malloc(sizeof(UA_ListOfUnpublishedNotifications));
+    LIST_INIT(new->unpublishedNotifications);
     return new;
 }
 
 UA_MonitoredItem *UA_MonitoredItem_new() {
     UA_MonitoredItem *new = (UA_MonitoredItem *) malloc(sizeof(UA_MonitoredItem));
+    new->queue       = (UA_ListOfQueuedDataValues *) malloc (sizeof(UA_ListOfQueuedDataValues));
+    new->QueueSize   = (UA_UInt32_BoundedValue) { .minValue = 0, .maxValue = 0, .currentValue = 0};
+    new->LastSampled = 0;
+    
+    LIST_INIT(new->queue);
     
     return new;
 }
@@ -59,29 +67,232 @@ UA_Subscription *SubscriptionManager_getSubscriptionByID(UA_SubscriptionManager
     return retsub;
 }
 
+UA_Int32 SubscriptionManager_deleteMonitoredItem(UA_SubscriptionManager *manager, UA_Int32 SubscriptionID, UA_UInt32 MonitoredItemID) {
+    UA_Subscription *sub;
+    UA_MonitoredItem *mon;
+    
+    if (manager == NULL) return UA_STATUSCODE_BADINTERNALERROR;
+    
+    sub = SubscriptionManager_getSubscriptionByID(manager, SubscriptionID);
+    if (sub == NULL) return UA_STATUSCODE_BADSUBSCRIPTIONIDINVALID;
+    
+    for(mon=(sub->MonitoredItems)->lh_first; mon != NULL; mon=(mon->listEntry).le_next) {
+        if (mon->ItemId == MonitoredItemID) {
+            MonitoredItem_delete(mon);
+            return UA_STATUSCODE_GOOD;
+        }
+    }
+    
+    return UA_STATUSCODE_BADMONITOREDITEMIDINVALID;
+}
+
+void MonitoredItem_delete(UA_MonitoredItem *monitoredItem) {
+    if (monitoredItem == NULL) return;
+    
+    // Delete Queued Data
+    MonitoredItem_ClearQueue(monitoredItem);
+    // Remove from subscription list
+    LIST_REMOVE(monitoredItem, listEntry);
+    UA_free(monitoredItem);
+    
+    return;
+}
+
 UA_Int32 SubscriptionManager_deleteSubscription(UA_SubscriptionManager *manager, UA_Int32 SubscriptionID) {
     UA_Subscription  *sub;
     UA_MonitoredItem *mon;
+    UA_unpublishedNotification *notify;
     
     sub = SubscriptionManager_getSubscriptionByID(manager, SubscriptionID);    
     if (sub != NULL) {
+        // Delete registered subscriptions
         while (sub->MonitoredItems->lh_first != NULL)  {
            mon = sub->MonitoredItems->lh_first;
-           LIST_REMOVE(sub->MonitoredItems->lh_first, listEntry);
-           free(mon);
+           // Delete Sampled data
+           MonitoredItem_delete(mon);
         }
     }
     else {
         return UA_STATUSCODE_BADSUBSCRIPTIONIDINVALID;
     }
+    
+    // Delete queued notification messages
+    notify = sub->unpublishedNotifications->lh_first;
+    while (sub->unpublishedNotifications->lh_first != NULL)  {
+       notify = sub->unpublishedNotifications->lh_first;
+       LIST_REMOVE(notify, listEntry);
+       UA_free(notify);
+    }
+    
     LIST_REMOVE(sub, listEntry);
-    free(sub);
+    UA_free(sub);
     
     return UA_STATUSCODE_GOOD;
 } 
 
-// void MonitoredItem_UpdateQueue(UA_MonitoredItem *monitoredItem) {
-    //
-//}
+int Subscription_queuedNotifications(UA_Subscription *subscription) {
+    int j = 0;
+    if (subscription == NULL) return 0;
+    
+    for(UA_unpublishedNotification *i = subscription->unpublishedNotifications->lh_first; i != NULL; i=(i->listEntry).le_next) j++;
+    
+    return j;
+}
+    
+void Subscription_updateNotifications(UA_Subscription *subscription) {
+    UA_MonitoredItem *mon;
+    //MonitoredItem_queuedValue *queuedValue;
+    UA_unpublishedNotification *msg = NULL;
+    UA_Int32 monItemsWithData = 0;
+    
+    if (subscription == NULL) return;
+    if ((subscription->LastPublished + subscription->PublishingInterval) > UA_DateTime_now()) return;
+       
+    // Check if any MonitoredItem Queues hold data
+    for(mon=subscription->MonitoredItems->lh_first; mon!= NULL; mon=mon->listEntry.le_next) {
+        if (mon->queue->lh_first != NULL) {
+            monItemsWithData++;
+        }
+    }
+    
+    // FIXME: This is hardcoded to 100 because it is not covered by the spec but we need to protect the server!
+    if (Subscription_queuedNotifications(subscription) >= 10) {
+        // Remove last entry
+        for(msg = subscription->unpublishedNotifications->lh_first; (msg->listEntry).le_next != NULL; msg=(msg->listEntry).le_next);
+        LIST_REMOVE(msg, listEntry);
+        UA_free(msg);
+    }
+    
+    if (monItemsWithData == 0) {
+        // Decrement KeepAlive
+        subscription->KeepAliveCount.currentValue--;
+        // +- Generate KeepAlive msg if counter overruns
+        if (subscription->KeepAliveCount.currentValue < subscription->KeepAliveCount.minValue) {
+            msg = (UA_unpublishedNotification *) malloc(sizeof(UA_unpublishedNotification));
+            msg->notification.sequenceNumber = (subscription->SequenceNumber)++;
+            msg->notification.publishTime    = UA_DateTime_now();
+            msg->notification.notificationDataSize = monItemsWithData;
+            msg->notification.sequenceNumber = subscription->SequenceNumber++;
+            msg->notification.notificationDataSize = 0;
+            
+            LIST_INSERT_HEAD(subscription->unpublishedNotifications, msg, listEntry);
+            subscription->KeepAliveCount.currentValue = subscription->KeepAliveCount.maxValue;
+        }
+        
+        return;
+    }
+    
+    // One or more MonitoredItems hold data -> create a new NotificationMessage
+    printf("UpdateNotification: creating NotificationMessage for %i items holding data\n", msg->notification.notificationDataSize);
+    
+    // +- Create Array of NotificationData
+    // +- Clear Queue
+    
+    // Fill the NotificationMessage with NotificationData
+    for(mon=subscription->MonitoredItems->lh_first; mon!= NULL; mon=mon->listEntry.le_next) {
+        if (mon->queue->lh_first != NULL) {
+            printf("UpdateNotification: monitored %i\n", mon->ItemId);
+            if (msg == NULL) {
+                
+            }
+        }
+    }
+    return;
+}
+
+void MonitoredItem_ClearQueue(UA_MonitoredItem *monitoredItem) {
+    if (monitoredItem == NULL) return;
+    while(monitoredItem->queue->lh_first != NULL) {
+        LIST_REMOVE(monitoredItem->queue->lh_first, listEntry);
+    }
+    
+    return;
+}
+
+void MonitoredItem_QueuePushDataValue(UA_MonitoredItem *monitoredItem) {
+    MonitoredItem_queuedValue *newvalue, *queueItem;
+    
+    if (monitoredItem == NULL) return;
+    if( (monitoredItem->LastSampled + monitoredItem->SamplingInterval) > UA_DateTime_now()) return;
+    
+    newvalue = (MonitoredItem_queuedValue *) malloc(sizeof(MonitoredItem_queuedValue));
+    
+    // Create new Value
+    switch(monitoredItem->AttributeID) {
+        UA_ATTRIBUTEID_NODEID:
+            UA_Variant_setScalarCopy(&(newvalue->value), (const UA_NodeId *) &((const UA_Node *) monitoredItem->monitoredNode)->nodeId, &UA_TYPES[UA_TYPES_NODEID]);
+        break;
+        UA_ATTRIBUTEID_NODECLASS:
+            UA_Variant_setScalarCopy(&(newvalue->value), (const UA_Int32 *) &((const UA_Node *) monitoredItem->monitoredNode)->nodeClass, &UA_TYPES[UA_TYPES_INT32]);
+        break;
+        UA_ATTRIBUTEID_BROWSENAME:
+            UA_Variant_setScalarCopy(&(newvalue->value), (const UA_String *) &((const UA_Node *) monitoredItem->monitoredNode)->browseName, &UA_TYPES[UA_TYPES_STRING]);
+        break;
+        UA_ATTRIBUTEID_DISPLAYNAME:
+            UA_Variant_setScalarCopy(&(newvalue->value), (const UA_String *) &((const UA_Node *) monitoredItem->monitoredNode)->displayName, &UA_TYPES[UA_TYPES_STRING]);
+        break;
+        UA_ATTRIBUTEID_DESCRIPTION:
+            UA_Variant_setScalarCopy(&(newvalue->value), (const UA_String *) &((const UA_Node *) monitoredItem->monitoredNode)->description, &UA_TYPES[UA_TYPES_STRING]);
+        break;
+        UA_ATTRIBUTEID_WRITEMASK:
+        break;
+        UA_ATTRIBUTEID_USERWRITEMASK:
+        break;
+        UA_ATTRIBUTEID_ISABSTRACT:
+        break;
+        UA_ATTRIBUTEID_SYMMETRIC:
+        break;
+        UA_ATTRIBUTEID_INVERSENAME:
+        break;
+        UA_ATTRIBUTEID_CONTAINSNOLOOPS:
+        break;
+        UA_ATTRIBUTEID_EVENTNOTIFIER:
+        break;
+        UA_ATTRIBUTEID_VALUE: 
+            if (((const UA_Node *) monitoredItem->monitoredNode)->nodeClass == UA_NODECLASS_VARIABLE) {
+                UA_Variant_copy( (const UA_Variant *) &((const UA_VariableNode *) monitoredItem->monitoredNode)->value, &(newvalue->value));
+            }
+        break;
+        UA_ATTRIBUTEID_DATATYPE:
+        break;
+        UA_ATTRIBUTEID_VALUERANK:
+        break;
+        UA_ATTRIBUTEID_ARRAYDIMENSIONS:
+        break;
+        UA_ATTRIBUTEID_ACCESSLEVEL:
+        break;
+        UA_ATTRIBUTEID_USERACCESSLEVEL:
+        break;
+        UA_ATTRIBUTEID_MINIMUMSAMPLINGINTERVAL:
+        break;
+        UA_ATTRIBUTEID_HISTORIZING:
+        break;
+        UA_ATTRIBUTEID_EXECUTABLE:
+        break;
+        UA_ATTRIBUTEID_USEREXECUTABLE:
+        break;
+        default:
+        break;
+    }
+    
+    printf("MonitoredItem AddValue: %i,%i\n", (monitoredItem->QueueSize).currentValue, (monitoredItem->QueueSize).maxValue);
+    if ((monitoredItem->QueueSize).currentValue >= (monitoredItem->QueueSize).maxValue) {
+        if (monitoredItem->DiscardOldest == UA_TRUE && monitoredItem->queue->lh_first != NULL ) {
+            for(queueItem = monitoredItem->queue->lh_first; queueItem->listEntry.le_next != NULL; queueItem = queueItem->listEntry.le_next) {}
+
+            LIST_REMOVE(queueItem, listEntry);
+            UA_free(queueItem);
+            (monitoredItem->QueueSize).currentValue--;
+        }
+        else {
+            UA_free(newvalue);
+        }
+    }
+    if ((monitoredItem->QueueSize).currentValue < (monitoredItem->QueueSize).maxValue) {
+        LIST_INSERT_HEAD(monitoredItem->queue, newvalue, listEntry);
+        (monitoredItem->QueueSize).currentValue++;
+    }
+    return;
+}
 
 #endif //#ifdef ENABLESUBSCRIPTIONS

+ 56 - 30
src/server/ua_subscription_manager.h

@@ -5,6 +5,7 @@
 #include "ua_server.h"
 #include "ua_types.h"
 #include "queue.h"
+#include "ua_nodestore.h"
 
 typedef struct {
     UA_Int32 currentValue;
@@ -18,52 +19,77 @@ typedef struct {
     UA_UInt32 maxValue;
 } UA_UInt32_BoundedValue;
 
+typedef struct MonitoredItem_queuedValue_s {
+    UA_Variant value;
+    LIST_ENTRY(MonitoredItem_queuedValue_s) listEntry;
+} MonitoredItem_queuedValue;
+
+typedef LIST_HEAD(UA_ListOfQueuedDataValues_s, MonitoredItem_queuedValue_s) UA_ListOfQueuedDataValues;
 typedef struct UA_MonitoredItem_s {
-    UA_UInt32  ItemId;
-    UA_UInt32  TimestampsToReturn;
-    UA_UInt32  MonitoringMode;
-    UA_NodeId  ItemNodeId;
-    UA_UInt32  AttributeID;
-    UA_UInt32  ClientHandle;
-    UA_UInt32  SamplingInterval;
-    UA_UInt32  QueueSize;
-    UA_Boolean DiscardOldest;
+    UA_UInt32                       ItemId;
+    UA_UInt32                       TimestampsToReturn;
+    UA_UInt32                       MonitoringMode;
+    const void                      *monitoredNode; // Pointer to a node of any type
+    UA_UInt32                       AttributeID;
+    UA_UInt32                       ClientHandle;
+    UA_UInt32                       SamplingInterval;
+    UA_DateTime                     LastSampled;
+    UA_UInt32_BoundedValue          QueueSize;
+    UA_Boolean                      DiscardOldest;
     // FIXME: indexRange is ignored; array values default to element 0
     // FIXME: dataEncoding is hardcoded to UA binary
-    LIST_ENTRY(UA_MonitoredItem_s) listEntry;
+    LIST_ENTRY(UA_MonitoredItem_s)  listEntry;
+    UA_ListOfQueuedDataValues       *queue;
 } UA_MonitoredItem;
 
+typedef struct UA_unpublishedNotification_s {
+    UA_NotificationMessage notification;
+    LIST_ENTRY(UA_unpublishedNotification_s) listEntry;
+} UA_unpublishedNotification;
+
 typedef LIST_HEAD(UA_ListOfUAMonitoredItems_s, UA_MonitoredItem_s) UA_ListOfUAMonitoredItems;
+typedef LIST_HEAD(UA_ListOfUnpublishedNotifications_s, UA_unpublishedNotification_s) UA_ListOfUnpublishedNotifications;
 typedef struct UA_Subscription_s {
-    UA_UInt32_BoundedValue LifeTime;
-    UA_UInt32_BoundedValue KeepAliveCount;
-    UA_Int32   PublishingInterval;
-    UA_Int32   SubscriptionID;
-    UA_Int32   NotificationsPerPublish;
-    UA_Boolean PublishingMode;
-    UA_UInt32  Priority;
-    LIST_ENTRY(UA_Subscription_s) listEntry;
-    UA_ListOfUAMonitoredItems *MonitoredItems;
+    UA_UInt32_BoundedValue              LifeTime;
+    UA_Int32_BoundedValue               KeepAliveCount;
+    UA_DateTime                         PublishingInterval;
+    UA_DateTime                         LastPublished;
+    UA_Int32                            SubscriptionID;
+    UA_Int32                            NotificationsPerPublish;
+    UA_Boolean                          PublishingMode;
+    UA_UInt32                           Priority;
+    UA_UInt32                           SequenceNumber;
+    LIST_ENTRY(UA_Subscription_s)       listEntry;
+    UA_ListOfUnpublishedNotifications   *unpublishedNotifications;
+    UA_ListOfUAMonitoredItems           *MonitoredItems;
 } UA_Subscription;
 
 typedef LIST_HEAD(UA_ListOfUASubscriptions_s, UA_Subscription_s) UA_ListOfUASubscriptions;
 typedef struct UA_SubscriptionManager_s {
-    UA_Int32_BoundedValue  GlobalPublishingInterval;
-    UA_UInt32_BoundedValue GlobalLifeTimeCount;
-    UA_UInt32_BoundedValue GlobalKeepAliveCount;
-    UA_Int32_BoundedValue  GlobalNotificationsPerPublish;
-    UA_UInt32_BoundedValue GlobalSamplingInterval;
-    UA_UInt32_BoundedValue GlobalQueueSize;
-    UA_Int32 LastSessionID;
+    UA_Int32_BoundedValue    GlobalPublishingInterval;
+    UA_UInt32_BoundedValue   GlobalLifeTimeCount;
+    UA_UInt32_BoundedValue   GlobalKeepAliveCount;
+    UA_Int32_BoundedValue    GlobalNotificationsPerPublish;
+    UA_UInt32_BoundedValue   GlobalSamplingInterval;
+    UA_UInt32_BoundedValue   GlobalQueueSize;
+    UA_Int32                 LastSessionID;
     UA_ListOfUASubscriptions *ServerSubscriptions;
 } UA_SubscriptionManager;
 
-void SubscriptionManager_init(UA_Session *session);
-UA_Subscription *UA_Subscription_new(UA_Int32 SubscriptionID);
-void SubscriptionManager_addSubscription(UA_SubscriptionManager *manager, UA_Subscription *subscription);
+void            SubscriptionManager_init(UA_Session *session);
+void            SubscriptionManager_addSubscription(UA_SubscriptionManager *manager, UA_Subscription *subscription);
 UA_Subscription *SubscriptionManager_getSubscriptionByID(UA_SubscriptionManager *manager, UA_Int32 SubscriptionID);
-UA_Int32  SubscriptionManager_deleteSubscription(UA_SubscriptionManager *manager, UA_Int32 SubscriptionID);
+UA_Int32        SubscriptionManager_deleteSubscription(UA_SubscriptionManager *manager, UA_Int32 SubscriptionID);
+UA_Int32        SubscriptionManager_deleteMonitoredItem(UA_SubscriptionManager *manager, UA_Int32 SubscriptionID, UA_UInt32 MonitoredItemID);
+
+UA_Subscription *UA_Subscription_new(UA_Int32 SubscriptionID);
+void            Subscription_updateNotifications(UA_Subscription *subscription);
+int             Subscription_queuedNotifications(UA_Subscription *subscription);
+
 UA_MonitoredItem *UA_MonitoredItem_new(void);
+void             MonitoredItem_delete(UA_MonitoredItem *monitoredItem);
+void             MonitoredItem_QueuePushDataValue(UA_MonitoredItem *monitoredItem);
+void             MonitoredItem_ClearQueue(UA_MonitoredItem *monitoredItem);
 
 #endif  // ifndef... define UA_SUBSCRIPTION_MANAGER_H_
 #endif  // ifdef EnableSubscriptions ...