Browse Source

split out ua_subscription.c/.h

Julius Pfrommer 9 years ago
parent
commit
47269ea3b8

+ 1 - 0
CMakeLists.txt

@@ -100,6 +100,7 @@ set(lib_sources ${PROJECT_SOURCE_DIR}/src/ua_types.c
                 ${PROJECT_SOURCE_DIR}/src/server/ua_services_nodemanagement.c
                 ${PROJECT_SOURCE_DIR}/src/server/ua_services_view.c
                 ${PROJECT_SOURCE_DIR}/src/server/ua_services_subscription.c
+                ${PROJECT_SOURCE_DIR}/src/server/ua_subscription.c
                 ${PROJECT_SOURCE_DIR}/src/server/ua_subscription_manager.c
                 ${PROJECT_SOURCE_DIR}/src/client/ua_client.c
                 ${PROJECT_SOURCE_DIR}/examples/networklayer_tcp.c 

+ 8 - 8
src/server/ua_services_subscription.c

@@ -97,11 +97,11 @@ void Service_CreateMonitoredItems(UA_Server *server, UA_Session *session,
             UA_BOUNDEDVALUE_SETWBOUNDS(session->subscriptionManager.GlobalQueueSize, thisItemsRequest->requestedParameters.queueSize, thisItemsResult->revisedQueueSize);
             newMon->QueueSize = (UA_UInt32_BoundedValue) { .maxValue=(thisItemsResult->revisedQueueSize) + 1, .minValue=0, .currentValue=0 };
             newMon->AttributeID = thisItemsRequest->itemToMonitor.attributeId;
-            newMon->MonitoredItemType = MONITOREDITEM_CHANGENOTIFY_T;
+            newMon->MonitoredItemType = MONITOREDITEM_TYPE_CHANGENOTIFY;
 
             newMon->DiscardOldest = thisItemsRequest->requestedParameters.discardOldest;
             
-            LIST_INSERT_HEAD(sub->MonitoredItems, newMon, listEntry);
+            LIST_INSERT_HEAD(&sub->MonitoredItems, newMon, listEntry);
         }
     }
 }
@@ -146,11 +146,11 @@ void Service_Publish(UA_Server *server, UA_Session *session,
     for (sub=(manager->ServerSubscriptions)->lh_first; sub != NULL; sub = sub->listEntry.le_next) {
 	
         // FIXME: We are forcing a value update for monitored items. This should be done by the event system.
-        if (sub->MonitoredItems->lh_first != NULL) {  
-	  for(mon=sub->MonitoredItems->lh_first; mon != NULL; mon=mon->listEntry.le_next) {
-            MonitoredItem_QueuePushDataValue(mon);
-	  }
-	}
+        if (sub->MonitoredItems.lh_first != NULL) {  
+            for(mon=sub->MonitoredItems.lh_first; mon != NULL; mon=mon->listEntry.le_next) {
+                MonitoredItem_QueuePushDataValue(mon);
+            }
+        }
 	
 	// FIXME: We are forcing notification updates for the subscription. This should be done by a timed work item.
 	Subscription_updateNotifications(sub);
@@ -160,7 +160,7 @@ void Service_Publish(UA_Server *server, UA_Session *session,
 	  
 	  Subscription_copyTopNotificationMessage(&(response->notificationMessage), sub);
 	  
-	  if (sub->unpublishedNotifications->lh_first->notification->sequenceNumber > sub->SequenceNumber) {
+	  if (sub->unpublishedNotifications.lh_first->notification->sequenceNumber > sub->SequenceNumber) {
 	    // If this is a keepalive message, its seqNo is the next seqNo to be used for an actual msg.
 	    response->availableSequenceNumbersSize = 0;
 	    // .. and must be deleted

+ 448 - 0
src/server/ua_subscription.c

@@ -0,0 +1,448 @@
+#ifdef ENABLE_SUBSCRIPTIONS
+
+#include "ua_subscription.h"
+
+/****************/
+/* Subscription */
+/****************/
+
+UA_Subscription *UA_Subscription_new(UA_Int32 SubscriptionID) {
+    UA_Subscription *new = (UA_Subscription *) UA_malloc(sizeof(UA_Subscription));
+    
+    new->SubscriptionID = SubscriptionID;
+    new->LastPublished  = 0;
+    new->SequenceNumber = 0;
+    LIST_INIT(&new->MonitoredItems);
+    LIST_INITENTRY(new, listEntry);
+    LIST_INIT(&new->unpublishedNotifications);
+    return new;
+}
+
+UA_UInt32 Subscription_queuedNotifications(UA_Subscription *subscription) {
+    if (!subscription)
+        return 0;
+
+    UA_UInt32 j = 0;
+    UA_unpublishedNotification *i;
+    LIST_FOREACH(i, &subscription->unpublishedNotifications, listEntry)
+        j++;
+    return j;
+}
+
+void Subscription_generateKeepAlive(UA_Subscription *subscription) {
+  UA_unpublishedNotification *msg = NULL;
+  
+  if (subscription->KeepAliveCount.currentValue <= subscription->KeepAliveCount.minValue || subscription->KeepAliveCount.currentValue > subscription->KeepAliveCount.maxValue) {
+    msg = (UA_unpublishedNotification *) UA_malloc(sizeof(UA_unpublishedNotification));
+    LIST_INITENTRY(msg, listEntry);
+    INITPOINTER(msg->notification);
+    
+    msg->notification = (UA_NotificationMessage *) UA_malloc(sizeof(UA_NotificationMessage));
+    INITPOINTER(msg->notification->notificationData);
+    msg->notification->sequenceNumber = (subscription->SequenceNumber)+1; // KeepAlive uses next message, but does not increment counter
+    msg->notification->publishTime    = UA_DateTime_now();
+    msg->notification->notificationDataSize = 0;
+    
+    LIST_INSERT_HEAD(&subscription->unpublishedNotifications, msg, listEntry);
+    subscription->KeepAliveCount.currentValue = subscription->KeepAliveCount.maxValue;
+  }
+  
+  return;
+}
+
+void Subscription_updateNotifications(UA_Subscription *subscription) {
+    UA_MonitoredItem *mon;
+    //MonitoredItem_queuedValue *queuedValue;
+    UA_unpublishedNotification *msg = NULL;
+    UA_UInt32 monItemsChangeT = 0, monItemsStatusT = 0, monItemsEventT = 0;
+    UA_DataChangeNotification *changeNotification;
+    size_t notificationOffset;
+    
+    if(!subscription)
+        return;
+    if((subscription->LastPublished + subscription->PublishingInterval) > UA_DateTime_now())
+        return;
+    
+    // Make sure there is data to be published and establish which message types
+    // will need to be generated
+    LIST_FOREACH(mon, &subscription->MonitoredItems, listEntry) {
+        // Check if this MonitoredItems Queue holds data and how much data is held in total
+        if (!mon->queue.lh_first)
+            continue;
+        if((mon->MonitoredItemType & MONITOREDITEM_TYPE_CHANGENOTIFY) != 0)
+            monItemsChangeT+=mon->QueueSize.currentValue;
+	    else if((mon->MonitoredItemType & MONITOREDITEM_TYPE_STATUSNOTIFY) != 0)
+            monItemsStatusT+=mon->QueueSize.currentValue;
+	    else if ((mon->MonitoredItemType & MONITOREDITEM_TYPE_EVENTNOTIFY)  != 0)
+            monItemsEventT+=mon->QueueSize.currentValue;
+    }
+    
+    // FIXME: This is hardcoded to 100 because it is not covered by the spec but we need to protect the server!
+    if (Subscription_queuedNotifications(subscription) >= 10) {
+        // Remove last entry
+        LIST_FOREACH(msg, &subscription->unpublishedNotifications, listEntry)
+            LIST_REMOVE(msg, listEntry);
+        UA_free(msg);
+    }
+    
+    if (monItemsChangeT == 0 && monItemsEventT == 0 && monItemsStatusT == 0) {
+        // Decrement KeepAlive
+        subscription->KeepAliveCount.currentValue--;
+        // +- Generate KeepAlive msg if counter overruns
+        Subscription_generateKeepAlive(subscription);
+        return;
+    }
+    
+    msg = (UA_unpublishedNotification *) UA_malloc(sizeof(UA_unpublishedNotification));
+    LIST_INITENTRY(msg, listEntry);
+    msg->notification = UA_malloc(sizeof(UA_NotificationMessage));
+    INITPOINTER(msg->notification->notificationData);
+    msg->notification->sequenceNumber = subscription->SequenceNumber++;
+    msg->notification->publishTime    = UA_DateTime_now();
+    
+    // NotificationData is an array of Change, Status and Event messages, each containing the appropriate
+    // list of Queued values from all monitoredItems of that type
+    msg->notification->notificationDataSize = ISNOTZERO(monItemsChangeT);// + ISNOTZERO(monItemsEventT) + ISNOTZERO(monItemsStatusT);
+    msg->notification->notificationData = (UA_ExtensionObject *) UA_malloc(sizeof(UA_ExtensionObject) * msg->notification->notificationDataSize);
+    
+    for(int notmsgn=0; notmsgn < msg->notification->notificationDataSize; notmsgn++) {
+      // Set the notification message type and encoding for each of 
+      //   the three possible NotificationData Types
+      (msg->notification->notificationData)[notmsgn].encoding = 1; // Encoding is always binary
+      (msg->notification->notificationData)[notmsgn].typeId = UA_NODEID_NUMERIC(0, 811);
+      
+      if(notmsgn == 0) {
+	// Construct a DataChangeNotification
+	changeNotification = (UA_DataChangeNotification *) UA_malloc(sizeof(UA_DataChangeNotification));
+	
+	// Create one DataChangeNotification for each queue item held in each monitoredItems queue:
+	changeNotification->monitoredItems      = (UA_MonitoredItemNotification *) UA_malloc(sizeof(UA_MonitoredItemNotification) * monItemsChangeT);
+	
+        // Scan all monitoredItems in this subscription and have their queue transformed into an Array of
+        // the propper NotificationMessageType (Status, Change, Event)
+	monItemsChangeT = 0;
+	for(mon=subscription->MonitoredItems.lh_first; mon != NULL; mon=mon->listEntry.le_next) {
+	  if (mon->MonitoredItemType != MONITOREDITEM_TYPE_CHANGENOTIFY || mon->queue.lh_first == NULL ) continue;
+	  // Note: Monitored Items might not return a queuedValue if there is a problem encoding it.
+          monItemsChangeT += MonitoredItem_QueueToDataChangeNotifications( &((changeNotification->monitoredItems)[monItemsChangeT]), mon);
+          MonitoredItem_ClearQueue(mon);
+	}
+	changeNotification->monitoredItemsSize  = monItemsChangeT;
+        changeNotification->diagnosticInfosSize = 0;
+        changeNotification->diagnosticInfos     = NULL;
+        
+	(msg->notification->notificationData[notmsgn]).body.length = UA_calcSizeBinary(changeNotification, &UA_TYPES[UA_TYPES_DATACHANGENOTIFICATION]);
+        (msg->notification->notificationData[notmsgn]).body.data   =  UA_malloc((msg->notification->notificationData[notmsgn]).body.length);
+        
+        notificationOffset = 0;
+        UA_encodeBinary((const void *) changeNotification, &UA_TYPES[UA_TYPES_DATACHANGENOTIFICATION], &(msg->notification->notificationData[notmsgn].body), &notificationOffset);
+
+        UA_free(changeNotification->monitoredItems);
+        UA_free(changeNotification);
+      }
+      else if (notmsgn == 1) {
+	// FIXME: Constructing a StatusChangeNotification is not implemented
+      }
+      else if (notmsgn == 2) {
+	// FIXME: Constructing a EventListNotification is not implemented
+      }
+    }
+    LIST_INSERT_HEAD(&subscription->unpublishedNotifications, msg, listEntry);
+}
+
+UA_UInt32 *Subscription_getAvailableSequenceNumbers(UA_Subscription *sub) {
+  UA_UInt32 *seqArray;
+  int i;
+  UA_unpublishedNotification *not;
+  
+  if(!sub)
+      return NULL;
+  
+  seqArray = (UA_UInt32 *) UA_malloc(sizeof(UA_UInt32) * Subscription_queuedNotifications(sub));
+  if (seqArray == NULL ) return NULL;
+  
+  i = 0;
+  for(not = sub->unpublishedNotifications.lh_first; not != NULL; not=(not->listEntry).le_next) {
+    seqArray[i] = not->notification->sequenceNumber;
+    i++;
+  }
+  return seqArray;
+}
+
+void Subscription_copyTopNotificationMessage(UA_NotificationMessage *dst, UA_Subscription *sub) {
+    UA_NotificationMessage *latest;
+    
+    if (dst == NULL) return;
+    
+    if (Subscription_queuedNotifications(sub) == 0) {
+      dst->notificationDataSize = 0;
+      dst->publishTime = UA_DateTime_now();
+      dst->sequenceNumber = 0;
+      return;
+    }
+    
+    latest = sub->unpublishedNotifications.lh_first->notification;
+    dst->notificationDataSize = latest->notificationDataSize;
+    dst->publishTime = latest->publishTime;
+    dst->sequenceNumber = latest->sequenceNumber;
+    
+    if (latest->notificationDataSize == 0) return;
+    
+    dst->notificationData = (UA_ExtensionObject *) UA_malloc(sizeof(UA_ExtensionObject));
+    dst->notificationData->encoding = latest->notificationData->encoding;
+    dst->notificationData->typeId   = latest->notificationData->typeId;
+    dst->notificationData->body.length = latest->notificationData->body.length;
+    dst->notificationData->body.data   = UA_malloc(latest->notificationData->body.length);
+    UA_ByteString_copy((UA_String *) &(latest->notificationData->body),
+                       (UA_String *) &(dst->notificationData->body));
+}
+
+UA_UInt32 Subscription_deleteUnpublishedNotification(UA_UInt32 seqNo, UA_Subscription *sub) {
+    UA_unpublishedNotification *not;
+    UA_UInt32 deletedItems = 0;
+  
+    for(not=sub->unpublishedNotifications.lh_first; not != NULL; not=not->listEntry.le_next) {
+        if(not->notification->sequenceNumber != seqNo)
+            continue;
+        LIST_REMOVE(not, listEntry);
+        if(not->notification != NULL) {
+            if (not->notification->notificationData != NULL) {
+                if (not->notification->notificationData->body.data != NULL)
+                    UA_free(not->notification->notificationData->body.data);
+                UA_free(not->notification->notificationData);
+            }
+            UA_free(not->notification);
+        }
+        UA_free(not);
+        deletedItems++;
+    }
+    return deletedItems;
+}
+
+
+/*****************/
+/* MonitoredItem */
+/*****************/
+
+UA_MonitoredItem *UA_MonitoredItem_new() {
+    UA_MonitoredItem *new = (UA_MonitoredItem *) UA_malloc(sizeof(UA_MonitoredItem));
+    new->QueueSize   = (UA_UInt32_BoundedValue) { .minValue = 0, .maxValue = 0, .currentValue = 0};
+    new->LastSampled = 0;
+    
+    // FIXME: This is currently hardcoded;
+    new->MonitoredItemType = MONITOREDITEM_TYPE_CHANGENOTIFY;
+    
+    LIST_INIT(&new->queue);
+    LIST_INITENTRY(new, listEntry);
+    INITPOINTER(new->monitoredNode);
+    INITPOINTER(new->LastSampledValue.data );
+    return new;
+}
+
+void MonitoredItem_delete(UA_MonitoredItem *monitoredItem) {
+    if (monitoredItem == NULL) return;
+    
+    // Delete Queued Data
+    MonitoredItem_ClearQueue(monitoredItem);
+    // Remove from subscription list
+    LIST_REMOVE(monitoredItem, listEntry);
+    // Release comparison sample
+    if(monitoredItem->LastSampledValue.data != NULL) { 
+      UA_free(monitoredItem->LastSampledValue.data);
+    }
+    
+    UA_free(monitoredItem);
+}
+
+int MonitoredItem_QueueToDataChangeNotifications(UA_MonitoredItemNotification *dst,
+                                                 UA_MonitoredItem *monitoredItem) {
+  int queueSize = 0;
+  MonitoredItem_queuedValue *queueItem;
+  
+  // Count instead of relying on the items currentValue
+  LIST_FOREACH(queueItem, &monitoredItem->queue, listEntry) {
+    dst[queueSize].clientHandle = monitoredItem->ClientHandle;
+    dst[queueSize].value.hasServerPicoseconds = UA_FALSE;
+    dst[queueSize].value.hasServerTimestamp   = UA_FALSE;
+    dst[queueSize].value.serverTimestamp      = UA_FALSE;
+    dst[queueSize].value.hasSourcePicoseconds = UA_FALSE;
+    dst[queueSize].value.hasSourceTimestamp   = UA_FALSE;
+    dst[queueSize].value.hasValue             = UA_TRUE;
+    dst[queueSize].value.status = UA_STATUSCODE_GOOD;
+    
+    UA_Variant_copy(&(queueItem->value), &(dst[queueSize].value.value));
+    
+    // Do not create variants with no type -> will make calcSizeBinary() segfault.
+    if(dst[queueSize].value.value.type)
+        queueSize++;
+  }
+  return queueSize;
+}
+
+void MonitoredItem_ClearQueue(UA_MonitoredItem *monitoredItem) {
+    MonitoredItem_queuedValue *val;
+    if (monitoredItem == NULL)
+        return;
+    while(monitoredItem->queue.lh_first) {
+        val = monitoredItem->queue.lh_first;
+        LIST_REMOVE(val, listEntry);
+        UA_free(val);
+    }
+    monitoredItem->QueueSize.currentValue = 0;
+}
+
+UA_Boolean MonitoredItem_CopyMonitoredValueToVariant(UA_UInt32 AttributeID, const UA_Node *src,
+                                                     UA_Variant *dst) {
+  UA_Boolean samplingError = UA_TRUE; 
+  UA_DataValue sourceDataValue;
+  const UA_VariableNode *srcAsVariableNode = (const UA_VariableNode *) src;
+  
+  // FIXME: Not all AttributeIDs can be monitored yet
+  switch(AttributeID) {
+    case UA_ATTRIBUTEID_NODEID:
+      UA_Variant_setScalarCopy(dst, (const UA_NodeId *) &(src->nodeId), &UA_TYPES[UA_TYPES_NODEID]);
+      samplingError = UA_FALSE;
+      break;
+    case UA_ATTRIBUTEID_NODECLASS:
+      UA_Variant_setScalarCopy(dst, (const UA_Int32 *) &(src->nodeClass), &UA_TYPES[UA_TYPES_INT32]);
+      samplingError = UA_FALSE;
+      break;
+    case UA_ATTRIBUTEID_BROWSENAME:
+      UA_Variant_setScalarCopy(dst, (const UA_String *) &(src->browseName), &UA_TYPES[UA_TYPES_QUALIFIEDNAME]);
+      samplingError = UA_FALSE;
+      break;
+    case UA_ATTRIBUTEID_DISPLAYNAME:
+      UA_Variant_setScalarCopy(dst, (const UA_String *) &(src->displayName), &UA_TYPES[UA_TYPES_LOCALIZEDTEXT]);
+      samplingError = UA_FALSE;
+      break;
+    case UA_ATTRIBUTEID_DESCRIPTION:
+      UA_Variant_setScalarCopy(dst, (const UA_String *) &(src->displayName), &UA_TYPES[UA_TYPES_LOCALIZEDTEXT]);
+      samplingError = UA_FALSE;
+      break;
+    case UA_ATTRIBUTEID_WRITEMASK:
+      UA_Variant_setScalarCopy(dst, (const UA_String *) &(src->writeMask), &UA_TYPES[UA_TYPES_UINT32]);
+      samplingError = UA_FALSE;
+      break;
+    case UA_ATTRIBUTEID_USERWRITEMASK:
+      UA_Variant_setScalarCopy(dst, (const UA_String *) &(src->writeMask), &UA_TYPES[UA_TYPES_UINT32]);
+      samplingError = UA_FALSE;
+      break;
+    case UA_ATTRIBUTEID_ISABSTRACT:
+      break;
+    case UA_ATTRIBUTEID_SYMMETRIC:
+      break;
+    case UA_ATTRIBUTEID_INVERSENAME:
+      break;
+    case UA_ATTRIBUTEID_CONTAINSNOLOOPS:
+      break;
+    case UA_ATTRIBUTEID_EVENTNOTIFIER:
+      break;
+    case UA_ATTRIBUTEID_VALUE: 
+      if (src->nodeClass == UA_NODECLASS_VARIABLE) {
+        if ( srcAsVariableNode->valueSource == UA_VALUESOURCE_VARIANT) {
+          UA_Variant_copy( (const UA_Variant *) &((const UA_VariableNode *) src)->value, dst);
+          samplingError = UA_FALSE;
+        }
+        else if (srcAsVariableNode->valueSource == UA_VALUESOURCE_DATASOURCE) {
+            // todo: handle numeric ranges
+            if (srcAsVariableNode->value.dataSource.read(((const UA_VariableNode *) src)->value.dataSource.handle, (UA_Boolean) UA_TRUE, UA_NULL, &sourceDataValue) == UA_STATUSCODE_GOOD) {
+            UA_Variant_copy( (const UA_Variant *) &(sourceDataValue.value), dst);
+            samplingError = UA_FALSE;
+          }
+        }
+      }
+      break;
+    case UA_ATTRIBUTEID_DATATYPE:
+      break;
+    case UA_ATTRIBUTEID_VALUERANK:
+      break;
+    case UA_ATTRIBUTEID_ARRAYDIMENSIONS:
+      break;
+    case UA_ATTRIBUTEID_ACCESSLEVEL:
+      break;
+    case UA_ATTRIBUTEID_USERACCESSLEVEL:
+      break;
+    case UA_ATTRIBUTEID_MINIMUMSAMPLINGINTERVAL:
+      break;
+    case UA_ATTRIBUTEID_HISTORIZING:
+      break;
+    case UA_ATTRIBUTEID_EXECUTABLE:
+      break;
+    case UA_ATTRIBUTEID_USEREXECUTABLE:
+      break;
+    default:
+      break;
+  }
+  
+  return samplingError;
+}
+
+void MonitoredItem_QueuePushDataValue(UA_MonitoredItem *monitoredItem) {
+  MonitoredItem_queuedValue *newvalue = NULL, *queueItem = NULL;
+  UA_Boolean samplingError = UA_TRUE; 
+  UA_ByteString newValueAsByteString = { .length=0, .data=NULL };
+  size_t encodingOffset = 0;
+  
+  if(!monitoredItem || monitoredItem->LastSampled + monitoredItem->SamplingInterval > UA_DateTime_now())
+      return;
+  
+  // FIXME: Actively suppress non change value based monitoring. There should be
+  // another function to handle status and events.
+  if (monitoredItem->MonitoredItemType != MONITOREDITEM_TYPE_CHANGENOTIFY)
+      return;
+  
+  newvalue = (MonitoredItem_queuedValue *) UA_malloc(sizeof(MonitoredItem_queuedValue));
+  LIST_INITENTRY(newvalue,listEntry);
+  newvalue->value.arrayLength         = 0;
+  newvalue->value.arrayDimensionsSize = 0;
+  newvalue->value.arrayDimensions     = NULL;
+  newvalue->value.data                = NULL;
+  newvalue->value.type                = NULL;
+  
+  samplingError = MonitoredItem_CopyMonitoredValueToVariant(monitoredItem->AttributeID, monitoredItem->monitoredNode, &(newvalue->value));
+  
+  if ((monitoredItem->QueueSize).currentValue >= (monitoredItem->QueueSize).maxValue) {
+    if (newvalue->value.type != NULL && monitoredItem->DiscardOldest == UA_TRUE && monitoredItem->queue.lh_first != NULL ) {
+          for(queueItem = monitoredItem->queue.lh_first; queueItem->listEntry.le_next != NULL; queueItem = queueItem->listEntry.le_next) {}
+
+          LIST_REMOVE(queueItem, listEntry);
+          UA_free(queueItem);
+          (monitoredItem->QueueSize).currentValue--;
+      }
+      else {
+          // We cannot remove the oldest value and theres no queue space left. We're done here.
+          UA_free(newvalue);
+          return;
+      }
+  }
+  
+  // Only add a value if we have sampled it correctly and it fits into the queue;
+  if ( samplingError != UA_FALSE || newvalue->value.type == NULL || (monitoredItem->QueueSize).currentValue >= (monitoredItem->QueueSize).maxValue) {
+    UA_free(newvalue);
+    return;
+  }
+  
+  newValueAsByteString.length = UA_calcSizeBinary((const void *) &(newvalue->value), &UA_TYPES[UA_TYPES_VARIANT]);
+  newValueAsByteString.data   = UA_malloc(newValueAsByteString.length);
+  UA_encodeBinary((const void *) &(newvalue->value), &UA_TYPES[UA_TYPES_VARIANT], &(newValueAsByteString), &encodingOffset );
+  
+  if(monitoredItem->LastSampledValue.data == NULL) { 
+    UA_ByteString_copy((UA_String *) &newValueAsByteString, (UA_String *) &(monitoredItem->LastSampledValue));
+    LIST_INSERT_HEAD(&monitoredItem->queue, newvalue, listEntry);
+    (monitoredItem->QueueSize).currentValue++;
+    monitoredItem->LastSampled = UA_DateTime_now();
+  }
+  else {
+    if (UA_String_equal((UA_String *) &newValueAsByteString, (UA_String *) &(monitoredItem->LastSampledValue)) == UA_TRUE) {
+      UA_free(newValueAsByteString.data);
+      return;
+    }
+    
+    UA_ByteString_copy((UA_String *) &newValueAsByteString, (UA_String *) &(monitoredItem->LastSampledValue));
+    LIST_INSERT_HEAD(&monitoredItem->queue, newvalue, listEntry);
+    (monitoredItem->QueueSize).currentValue++;
+    monitoredItem->LastSampled = UA_DateTime_now();
+  }
+}
+
+#endif

+ 105 - 0
src/server/ua_subscription.h

@@ -0,0 +1,105 @@
+#ifdef  ENABLE_SUBSCRIPTIONS
+#ifndef UA_SUBSCRIPTION_H_
+#define UA_SUBSCRIPTION_H_
+
+#include "ua_util.h"
+#include "ua_types.h"
+#include "ua_types_generated.h"
+#include "ua_nodes.h"
+
+#define LIST_INITENTRY(item,entry) \
+  (item)->entry.le_next = NULL; \
+  (item)->entry.le_prev = NULL;
+
+#define INITPOINTER(src) (src) = NULL;
+#define ISNOTZERO(value) ((value == 0) ? 0 : 1)
+
+/*****************/
+/* MonitoredItem */
+/*****************/
+
+typedef struct {
+    UA_Int32 currentValue;
+    UA_Int32 minValue;
+    UA_Int32 maxValue;
+} UA_Int32_BoundedValue;
+
+typedef struct {
+    UA_UInt32 currentValue;
+    UA_UInt32 minValue;
+    UA_UInt32 maxValue;
+} UA_UInt32_BoundedValue;
+
+typedef enum {
+    MONITOREDITEM_TYPE_CHANGENOTIFY = 1,
+    MONITOREDITEM_TYPE_STATUSNOTIFY = 2,
+    MONITOREDITEM_TYPE_EVENTNOTIFY = 4
+} MONITOREDITEM_TYPE;
+
+typedef struct MonitoredItem_queuedValue_s {
+    UA_Variant value;
+    LIST_ENTRY(MonitoredItem_queuedValue_s) listEntry;
+} MonitoredItem_queuedValue;
+
+typedef struct UA_MonitoredItem_s {
+    UA_UInt32                       ItemId;
+    MONITOREDITEM_TYPE		    MonitoredItemType;
+    UA_UInt32                       TimestampsToReturn;
+    UA_UInt32                       MonitoringMode;
+    const UA_Node                   *monitoredNode; // Pointer to a node of any type
+    UA_UInt32                       AttributeID;
+    UA_UInt32                       ClientHandle;
+    UA_UInt32                       SamplingInterval;
+    UA_UInt32_BoundedValue          QueueSize;
+    UA_Boolean                      DiscardOldest;
+    UA_DateTime                     LastSampled;
+    UA_ByteString                   LastSampledValue;
+    // FIXME: indexRange is ignored; array values default to element 0
+    // FIXME: dataEncoding is hardcoded to UA binary
+    LIST_ENTRY(UA_MonitoredItem_s)  listEntry;
+    LIST_HEAD(UA_ListOfQueuedDataValues, MonitoredItem_queuedValue_s) queue;
+} UA_MonitoredItem;
+
+UA_MonitoredItem *UA_MonitoredItem_new(void);
+void MonitoredItem_delete(UA_MonitoredItem *monitoredItem);
+void MonitoredItem_QueuePushDataValue(UA_MonitoredItem *monitoredItem);
+void MonitoredItem_ClearQueue(UA_MonitoredItem *monitoredItem);
+UA_Boolean MonitoredItem_CopyMonitoredValueToVariant(UA_UInt32 AttributeID, const UA_Node *src,
+                                                     UA_Variant *dst);
+int MonitoredItem_QueueToDataChangeNotifications(UA_MonitoredItemNotification *dst,
+                                                 UA_MonitoredItem *monitoredItem);
+
+/****************/
+/* Subscription */
+/****************/
+
+typedef struct UA_unpublishedNotification_s {
+    UA_NotificationMessage 		     *notification;
+    LIST_ENTRY(UA_unpublishedNotification_s) listEntry;
+} UA_unpublishedNotification;
+
+typedef struct UA_Subscription_s {
+    UA_UInt32_BoundedValue              LifeTime;
+    UA_Int32_BoundedValue               KeepAliveCount;
+    UA_DateTime                         PublishingInterval;
+    UA_DateTime                         LastPublished;
+    UA_Int32                            SubscriptionID;
+    UA_Int32                            NotificationsPerPublish;
+    UA_Boolean                          PublishingMode;
+    UA_UInt32                           Priority;
+    UA_UInt32                           SequenceNumber;
+    LIST_ENTRY(UA_Subscription_s)       listEntry;
+    LIST_HEAD(UA_ListOfUnpublishedNotifications, UA_unpublishedNotification_s) unpublishedNotifications;
+    LIST_HEAD(UA_ListOfUAMonitoredItems, UA_MonitoredItem_s) MonitoredItems;
+} UA_Subscription;
+
+UA_Subscription *UA_Subscription_new(UA_Int32 SubscriptionID);
+void Subscription_updateNotifications(UA_Subscription *subscription);
+UA_UInt32 Subscription_queuedNotifications(UA_Subscription *subscription);
+UA_UInt32 *Subscription_getAvailableSequenceNumbers(UA_Subscription *sub);
+void Subscription_copyTopNotificationMessage(UA_NotificationMessage *dst, UA_Subscription *sub);
+UA_UInt32 Subscription_deleteUnpublishedNotification(UA_UInt32 seqNo, UA_Subscription *sub);
+void Subscription_generateKeepAlive(UA_Subscription *subscription);
+
+#endif //UA_SUBSCRIPTION_H_
+#endif //ENABLE_SUBSCRIPTIONS

+ 13 - 469
src/server/ua_subscription_manager.c

@@ -9,7 +9,7 @@ void SubscriptionManager_init(UA_Session *session) {
 
     /* FIXME: These init values are empirical. Maybe they should be part
      *        of the server config? */
-    manager->GlobalPublishingInterval      = (UA_Int32_BoundedValue)  { .maxValue = 100,   .minValue = 0, .currentValue=0 };
+    manager->GlobalPublishingInterval = (UA_Int32_BoundedValue) { .maxValue = 100,   .minValue = 0, .currentValue=0 };
     manager->GlobalLifeTimeCount           = (UA_UInt32_BoundedValue) { .maxValue = 15000, .minValue = 0, .currentValue=0 };
     manager->GlobalKeepAliveCount          = (UA_UInt32_BoundedValue) { .maxValue = 100,   .minValue = 0, .currentValue=0 };
     manager->GlobalNotificationsPerPublish = (UA_Int32_BoundedValue)  { .maxValue = 1000,  .minValue = 1, .currentValue=0 };
@@ -18,59 +18,23 @@ void SubscriptionManager_init(UA_Session *session) {
     
     manager->ServerSubscriptions = (UA_ListOfUASubscriptions *) UA_malloc (sizeof(UA_ListOfUASubscriptions));
     LIST_INIT(manager->ServerSubscriptions);
-    
     manager->LastSessionID = (UA_UInt32) UA_DateTime_now();
-    return;
 }
 
 void SubscriptionManager_deleteMembers(UA_Session *session) {
     UA_SubscriptionManager *manager = &(session->subscriptionManager);
-
     UA_free(manager->ServerSubscriptions);
-
-    return;
-}
-
-UA_Subscription *UA_Subscription_new(UA_Int32 SubscriptionID) {
-    UA_Subscription *new = (UA_Subscription *) UA_malloc(sizeof(UA_Subscription));
-    
-    new->SubscriptionID = SubscriptionID;
-    new->LastPublished  = 0;
-    new->SequenceNumber = 0;
-    new->MonitoredItems = (UA_ListOfUAMonitoredItems *) UA_malloc (sizeof(UA_ListOfUAMonitoredItems));
-    LIST_INIT(new->MonitoredItems);
-    LIST_INITENTRY(new, listEntry);
-    new->unpublishedNotifications = (UA_ListOfUnpublishedNotifications *) UA_malloc(sizeof(UA_ListOfUnpublishedNotifications));
-    LIST_INIT(new->unpublishedNotifications);
-    return new;
 }
 
-UA_MonitoredItem *UA_MonitoredItem_new() {
-    UA_MonitoredItem *new = (UA_MonitoredItem *) UA_malloc(sizeof(UA_MonitoredItem));
-    new->queue       = (UA_ListOfQueuedDataValues *) UA_malloc (sizeof(UA_ListOfQueuedDataValues));
-    new->QueueSize   = (UA_UInt32_BoundedValue) { .minValue = 0, .maxValue = 0, .currentValue = 0};
-    new->LastSampled = 0;
-    
-    // FIXME: This is currently hardcoded;
-    new->MonitoredItemType = MONITOREDITEM_CHANGENOTIFY_T;
-    
-    LIST_INIT(new->queue);
-    LIST_INITENTRY(new, listEntry);
-    INITPOINTER(new->monitoredNode);
-    INITPOINTER(new->LastSampledValue.data );
-    return new;
-}
 
-void SubscriptionManager_addSubscription(UA_SubscriptionManager *manager, UA_Subscription *newSubscription) {    
+void SubscriptionManager_addSubscription(UA_SubscriptionManager *manager, UA_Subscription *newSubscription) {
     LIST_INSERT_HEAD(manager->ServerSubscriptions, newSubscription, listEntry);
-
-    return;
 }
 
-UA_Subscription *SubscriptionManager_getSubscriptionByID(UA_SubscriptionManager *manager, UA_Int32 SubscriptionID) {
+UA_Subscription *SubscriptionManager_getSubscriptionByID(UA_SubscriptionManager *manager,
+                                                         UA_Int32 SubscriptionID) {
     UA_Subscription *retsub, *sub;
-    retsub = (UA_Subscription *) NULL;
-    
+    retsub = UA_NULL;
     for (sub = (manager->ServerSubscriptions)->lh_first; sub != NULL; sub = sub->listEntry.le_next) {
         if (sub->SubscriptionID == SubscriptionID) {
             retsub = sub;
@@ -80,7 +44,8 @@ UA_Subscription *SubscriptionManager_getSubscriptionByID(UA_SubscriptionManager
     return retsub;
 }
 
-UA_Int32 SubscriptionManager_deleteMonitoredItem(UA_SubscriptionManager *manager, UA_Int32 SubscriptionID, UA_UInt32 MonitoredItemID) {
+UA_Int32 SubscriptionManager_deleteMonitoredItem(UA_SubscriptionManager *manager, UA_Int32 SubscriptionID,
+                                                 UA_UInt32 MonitoredItemID) {
     UA_Subscription *sub;
     UA_MonitoredItem *mon;
     
@@ -89,7 +54,7 @@ UA_Int32 SubscriptionManager_deleteMonitoredItem(UA_SubscriptionManager *manager
     sub = SubscriptionManager_getSubscriptionByID(manager, SubscriptionID);
     if (sub == NULL) return UA_STATUSCODE_BADSUBSCRIPTIONIDINVALID;
     
-    for(mon=(sub->MonitoredItems)->lh_first; mon != NULL; mon=(mon->listEntry).le_next) {
+    for(mon=sub->MonitoredItems.lh_first; mon != NULL; mon=mon->listEntry.le_next) {
         if (mon->ItemId == MonitoredItemID) {
             MonitoredItem_delete(mon);
             return UA_STATUSCODE_GOOD;
@@ -99,23 +64,6 @@ UA_Int32 SubscriptionManager_deleteMonitoredItem(UA_SubscriptionManager *manager
     return UA_STATUSCODE_BADMONITOREDITEMIDINVALID;
 }
 
-void MonitoredItem_delete(UA_MonitoredItem *monitoredItem) {
-    if (monitoredItem == NULL) return;
-    
-    // Delete Queued Data
-    MonitoredItem_ClearQueue(monitoredItem);
-    // Remove from subscription list
-    LIST_REMOVE(monitoredItem, listEntry);
-    // Release comparison sample
-    if(monitoredItem->LastSampledValue.data != NULL) { 
-      UA_free(monitoredItem->LastSampledValue.data);
-    }
-    
-    UA_free(monitoredItem);
-    
-    return;
-}
-
 UA_Int32 SubscriptionManager_deleteSubscription(UA_SubscriptionManager *manager, UA_Int32 SubscriptionID) {
     UA_Subscription  *sub;
     UA_MonitoredItem *mon;
@@ -124,8 +72,8 @@ UA_Int32 SubscriptionManager_deleteSubscription(UA_SubscriptionManager *manager,
     sub = SubscriptionManager_getSubscriptionByID(manager, SubscriptionID);    
     if (sub != NULL) {
         // Delete registered subscriptions
-        while (sub->MonitoredItems->lh_first != NULL)  {
-           mon = sub->MonitoredItems->lh_first;
+        while (sub->MonitoredItems.lh_first != NULL)  {
+           mon = sub->MonitoredItems.lh_first;
            // Delete Sampled data
            MonitoredItem_delete(mon);
         }
@@ -135,420 +83,16 @@ UA_Int32 SubscriptionManager_deleteSubscription(UA_SubscriptionManager *manager,
     }
     
     // Delete queued notification messages
-    notify = sub->unpublishedNotifications->lh_first;
-    while (sub->unpublishedNotifications->lh_first != NULL)  {
-       notify = sub->unpublishedNotifications->lh_first;
+    notify = LIST_FIRST(&sub->unpublishedNotifications);
+    while(sub->unpublishedNotifications.lh_first != NULL)  {
+       notify = sub->unpublishedNotifications.lh_first;
        LIST_REMOVE(notify, listEntry);
        UA_free(notify);
     }
     
     LIST_REMOVE(sub, listEntry);
     UA_free(sub);
-    
     return UA_STATUSCODE_GOOD;
 } 
 
-UA_UInt32 Subscription_queuedNotifications(UA_Subscription *subscription) {
-    UA_UInt32 j = 0;
-    if (subscription == NULL) return 0;
-    
-    for(UA_unpublishedNotification *i = subscription->unpublishedNotifications->lh_first; i != NULL; i=(i->listEntry).le_next) j++;
-    
-    return j;
-}
-
-void Subscription_generateKeepAlive(UA_Subscription *subscription) {
-  UA_unpublishedNotification *msg = NULL;
-  
-  if (subscription->KeepAliveCount.currentValue <= subscription->KeepAliveCount.minValue || subscription->KeepAliveCount.currentValue > subscription->KeepAliveCount.maxValue) {
-    msg = (UA_unpublishedNotification *) UA_malloc(sizeof(UA_unpublishedNotification));
-    LIST_INITENTRY(msg, listEntry);
-    INITPOINTER(msg->notification);
-    
-    msg->notification = (UA_NotificationMessage *) UA_malloc(sizeof(UA_NotificationMessage));
-    INITPOINTER(msg->notification->notificationData);
-    msg->notification->sequenceNumber = (subscription->SequenceNumber)+1; // KeepAlive uses next message, but does not increment counter
-    msg->notification->publishTime    = UA_DateTime_now();
-    msg->notification->notificationDataSize = 0;
-    
-    LIST_INSERT_HEAD(subscription->unpublishedNotifications, msg, listEntry);
-    subscription->KeepAliveCount.currentValue = subscription->KeepAliveCount.maxValue;
-  }
-  
-  return;
-}
-
-void Subscription_updateNotifications(UA_Subscription *subscription) {
-    UA_MonitoredItem *mon;
-    //MonitoredItem_queuedValue *queuedValue;
-    UA_unpublishedNotification *msg = NULL;
-    UA_UInt32 monItemsChangeT = 0, monItemsStatusT = 0, monItemsEventT = 0;
-    UA_DataChangeNotification *changeNotification;
-    size_t notificationOffset;
-    
-    if (subscription == NULL) return;
-    if ((subscription->LastPublished + subscription->PublishingInterval) > UA_DateTime_now()) return;
-         
-    
-    // Make sure there is data to be published and establish which message types
-    // will need to be generated
-    for(mon=subscription->MonitoredItems->lh_first; mon!= NULL; mon=mon->listEntry.le_next) {
-        // Check if this MonitoredItems Queue holds data and how much data is held in total
-	if (mon->queue->lh_first != NULL) {
-            if      ((mon->MonitoredItemType & MONITOREDITEM_CHANGENOTIFY_T) != 0) monItemsChangeT+=mon->QueueSize.currentValue;
-	    else if ((mon->MonitoredItemType & MONITOREDITEM_STATUSNOTIFY_T) != 0) monItemsStatusT+=mon->QueueSize.currentValue;
-	    else if ((mon->MonitoredItemType & MONITOREDITEM_EVENTNOTIFY_T)  != 0) monItemsEventT+=mon->QueueSize.currentValue;
-        }
-    }
-    
-    // FIXME: This is hardcoded to 100 because it is not covered by the spec but we need to protect the server!
-    if (Subscription_queuedNotifications(subscription) >= 10) {
-        // Remove last entry
-        for(msg = subscription->unpublishedNotifications->lh_first; (msg->listEntry).le_next != NULL; msg=(msg->listEntry).le_next);
-        LIST_REMOVE(msg, listEntry);
-        UA_free(msg);
-    }
-    
-    if (monItemsChangeT == 0 && monItemsEventT == 0 && monItemsStatusT == 0) {
-        // Decrement KeepAlive
-        subscription->KeepAliveCount.currentValue--;
-        // +- Generate KeepAlive msg if counter overruns
-        Subscription_generateKeepAlive(subscription);
-        
-        return;
-    }
-    
-    msg = (UA_unpublishedNotification *) UA_malloc(sizeof(UA_unpublishedNotification));
-    LIST_INITENTRY(msg, listEntry);
-    INITPOINTER(msg->notification);
-    
-    msg->notification = (UA_NotificationMessage *) UA_malloc(sizeof(UA_NotificationMessage));
-    INITPOINTER(msg->notification->notificationData);
-    msg->notification->sequenceNumber = subscription->SequenceNumber++;
-    msg->notification->publishTime    = UA_DateTime_now();
-    
-    // NotificationData is an array of Change, Status and Event messages, each containing the appropriate
-    // list of Queued values from all monitoredItems of that type
-    msg->notification->notificationDataSize = ISNOTZERO(monItemsChangeT);// + ISNOTZERO(monItemsEventT) + ISNOTZERO(monItemsStatusT);
-    msg->notification->notificationData = (UA_ExtensionObject *) UA_malloc(sizeof(UA_ExtensionObject) * msg->notification->notificationDataSize);
-    
-    for(int notmsgn=0; notmsgn < msg->notification->notificationDataSize; notmsgn++) {
-      // Set the notification message type and encoding for each of 
-      //   the three possible NotificationData Types
-      (msg->notification->notificationData)[notmsgn].encoding = 1; // Encoding is always binary
-      (msg->notification->notificationData)[notmsgn].typeId = UA_NODEID_NUMERIC(0, 811);
-      
-      if(notmsgn == 0) {
-	// Construct a DataChangeNotification
-	changeNotification = (UA_DataChangeNotification *) UA_malloc(sizeof(UA_DataChangeNotification));
-	
-	// Create one DataChangeNotification for each queue item held in each monitoredItems queue:
-	changeNotification->monitoredItems      = (UA_MonitoredItemNotification *) UA_malloc(sizeof(UA_MonitoredItemNotification) * monItemsChangeT);
-	
-        // Scan all monitoredItems in this subscription and have their queue transformed into an Array of
-        // the propper NotificationMessageType (Status, Change, Event)
-	monItemsChangeT = 0;
-	for(mon=subscription->MonitoredItems->lh_first; mon != NULL; mon=mon->listEntry.le_next) {
-	  if (mon->MonitoredItemType != MONITOREDITEM_CHANGENOTIFY_T || mon->queue->lh_first == NULL ) continue;
-	  // Note: Monitored Items might not return a queuedValue if there is a problem encoding it.
-          monItemsChangeT += MonitoredItem_QueueToDataChangeNotifications( &((changeNotification->monitoredItems)[monItemsChangeT]), mon);
-          MonitoredItem_ClearQueue(mon);
-	}
-	changeNotification->monitoredItemsSize  = monItemsChangeT;
-        changeNotification->diagnosticInfosSize = 0;
-        changeNotification->diagnosticInfos     = NULL;
-        
-	(msg->notification->notificationData[notmsgn]).body.length = UA_calcSizeBinary(changeNotification, &UA_TYPES[UA_TYPES_DATACHANGENOTIFICATION]);
-        (msg->notification->notificationData[notmsgn]).body.data   =  UA_malloc((msg->notification->notificationData[notmsgn]).body.length);
-        
-        notificationOffset = 0;
-        UA_encodeBinary((const void *) changeNotification, &UA_TYPES[UA_TYPES_DATACHANGENOTIFICATION], &(msg->notification->notificationData[notmsgn].body), &notificationOffset);
-
-        UA_free(changeNotification->monitoredItems);
-        UA_free(changeNotification);
-      }
-      else if (notmsgn == 1) {
-	// FIXME: Constructing a StatusChangeNotification is not implemented
-      }
-      else if (notmsgn == 2) {
-	// FIXME: Constructing a EventListNotification is not implemented
-      }
-    }
-    LIST_INSERT_HEAD(subscription->unpublishedNotifications, msg, listEntry);
-    
-    return;
-}
-
-int MonitoredItem_QueueToDataChangeNotifications(UA_MonitoredItemNotification *dst, UA_MonitoredItem *monitoredItem) {
-  int queueSize = 0;
-  MonitoredItem_queuedValue *queueItem;
-  
-  // Count instead of relying on the items currentValue
-  for (queueItem = monitoredItem->queue->lh_first; queueItem != NULL; queueItem=queueItem->listEntry.le_next) {
-    dst[queueSize].clientHandle = monitoredItem->ClientHandle;
-    dst[queueSize].value.hasServerPicoseconds = UA_FALSE;
-    dst[queueSize].value.hasServerTimestamp   = UA_FALSE;
-    dst[queueSize].value.serverTimestamp      = UA_FALSE;
-    dst[queueSize].value.hasSourcePicoseconds = UA_FALSE;
-    dst[queueSize].value.hasSourceTimestamp   = UA_FALSE;
-    dst[queueSize].value.hasValue             = UA_TRUE;
-    dst[queueSize].value.status = UA_STATUSCODE_GOOD;
-    
-    UA_Variant_copy(&(queueItem->value), &(dst[queueSize].value.value));
-    
-    // Do not create variants with no type -> will make calcSizeBinary() segfault.
-    if(dst[queueSize].value.value.type == NULL) {
-      queueSize--;
-    };
-    queueSize++;
-  }
-  if (queueSize == 0) return 0;
-  
-  return queueSize;
-}
-
-void MonitoredItem_ClearQueue(UA_MonitoredItem *monitoredItem) {
-  MonitoredItem_queuedValue *val;
-  
-  if (monitoredItem == NULL) return;
-  while(monitoredItem->queue->lh_first != NULL) {
-    val = monitoredItem->queue->lh_first;
-    LIST_REMOVE(monitoredItem->queue->lh_first, listEntry);
-    UA_free(val);
-  }
-  
-  (monitoredItem->QueueSize).currentValue = 0;
-  
-  return;
-}
-
-UA_Boolean MonitoredItem_CopyMonitoredValueToVariant(UA_UInt32 AttributeID, const UA_Node *src, UA_Variant *dst) {
-  UA_Boolean samplingError = UA_TRUE; 
-  UA_DataValue sourceDataValue;
-  const UA_VariableNode *srcAsVariableNode = (const UA_VariableNode *) src;
-  
-  // FIXME: Not all AttributeIDs can be monitored yet
-  switch(AttributeID) {
-    case UA_ATTRIBUTEID_NODEID:
-      UA_Variant_setScalarCopy(dst, (const UA_NodeId *) &(src->nodeId), &UA_TYPES[UA_TYPES_NODEID]);
-      samplingError = UA_FALSE;
-      break;
-    case UA_ATTRIBUTEID_NODECLASS:
-      UA_Variant_setScalarCopy(dst, (const UA_Int32 *) &(src->nodeClass), &UA_TYPES[UA_TYPES_INT32]);
-      samplingError = UA_FALSE;
-      break;
-    case UA_ATTRIBUTEID_BROWSENAME:
-      UA_Variant_setScalarCopy(dst, (const UA_String *) &(src->browseName), &UA_TYPES[UA_TYPES_QUALIFIEDNAME]);
-      samplingError = UA_FALSE;
-      break;
-    case UA_ATTRIBUTEID_DISPLAYNAME:
-      UA_Variant_setScalarCopy(dst, (const UA_String *) &(src->displayName), &UA_TYPES[UA_TYPES_LOCALIZEDTEXT]);
-      samplingError = UA_FALSE;
-      break;
-    case UA_ATTRIBUTEID_DESCRIPTION:
-      UA_Variant_setScalarCopy(dst, (const UA_String *) &(src->displayName), &UA_TYPES[UA_TYPES_LOCALIZEDTEXT]);
-      samplingError = UA_FALSE;
-      break;
-    case UA_ATTRIBUTEID_WRITEMASK:
-      UA_Variant_setScalarCopy(dst, (const UA_String *) &(src->writeMask), &UA_TYPES[UA_TYPES_UINT32]);
-      samplingError = UA_FALSE;
-      break;
-    case UA_ATTRIBUTEID_USERWRITEMASK:
-      UA_Variant_setScalarCopy(dst, (const UA_String *) &(src->writeMask), &UA_TYPES[UA_TYPES_UINT32]);
-      samplingError = UA_FALSE;
-      break;
-    case UA_ATTRIBUTEID_ISABSTRACT:
-      break;
-    case UA_ATTRIBUTEID_SYMMETRIC:
-      break;
-    case UA_ATTRIBUTEID_INVERSENAME:
-      break;
-    case UA_ATTRIBUTEID_CONTAINSNOLOOPS:
-      break;
-    case UA_ATTRIBUTEID_EVENTNOTIFIER:
-      break;
-    case UA_ATTRIBUTEID_VALUE: 
-      if (src->nodeClass == UA_NODECLASS_VARIABLE) {
-        if ( srcAsVariableNode->valueSource == UA_VALUESOURCE_VARIANT) {
-          UA_Variant_copy( (const UA_Variant *) &((const UA_VariableNode *) src)->value, dst);
-          samplingError = UA_FALSE;
-        }
-        else if (srcAsVariableNode->valueSource == UA_VALUESOURCE_DATASOURCE) {
-            // todo: handle numeric ranges
-            if (srcAsVariableNode->value.dataSource.read(((const UA_VariableNode *) src)->value.dataSource.handle, (UA_Boolean) UA_TRUE, UA_NULL, &sourceDataValue) == UA_STATUSCODE_GOOD) {
-            UA_Variant_copy( (const UA_Variant *) &(sourceDataValue.value), dst);
-            samplingError = UA_FALSE;
-          }
-        }
-      }
-      break;
-    case UA_ATTRIBUTEID_DATATYPE:
-      break;
-    case UA_ATTRIBUTEID_VALUERANK:
-      break;
-    case UA_ATTRIBUTEID_ARRAYDIMENSIONS:
-      break;
-    case UA_ATTRIBUTEID_ACCESSLEVEL:
-      break;
-    case UA_ATTRIBUTEID_USERACCESSLEVEL:
-      break;
-    case UA_ATTRIBUTEID_MINIMUMSAMPLINGINTERVAL:
-      break;
-    case UA_ATTRIBUTEID_HISTORIZING:
-      break;
-    case UA_ATTRIBUTEID_EXECUTABLE:
-      break;
-    case UA_ATTRIBUTEID_USEREXECUTABLE:
-      break;
-    default:
-      break;
-  }
-  
-  return samplingError;
-}
-
-void MonitoredItem_QueuePushDataValue(UA_MonitoredItem *monitoredItem) {
-  MonitoredItem_queuedValue *newvalue = NULL, *queueItem = NULL;
-  UA_Boolean samplingError = UA_TRUE; 
-  UA_ByteString newValueAsByteString = { .length=0, .data=NULL };
-  size_t encodingOffset = 0;
-  
-  if (monitoredItem == NULL) return;
-  if( (monitoredItem->LastSampled + monitoredItem->SamplingInterval) > UA_DateTime_now()) {
-    return;
-  };
-  
-  // FIXME: Actively suppress non change value based monitoring. There should be another function to handle status and events.
-  if (monitoredItem->MonitoredItemType != MONITOREDITEM_CHANGENOTIFY_T) {
-    return;
-  }
-  
-  newvalue = (MonitoredItem_queuedValue *) UA_malloc(sizeof(MonitoredItem_queuedValue));
-  LIST_INITENTRY(newvalue,listEntry);
-  newvalue->value.arrayLength         = 0;
-  newvalue->value.arrayDimensionsSize = 0;
-  newvalue->value.arrayDimensions     = NULL;
-  newvalue->value.data                = NULL;
-  newvalue->value.type                = NULL;
-  
-  samplingError = MonitoredItem_CopyMonitoredValueToVariant(monitoredItem->AttributeID, monitoredItem->monitoredNode, &(newvalue->value));
-  
-  if ((monitoredItem->QueueSize).currentValue >= (monitoredItem->QueueSize).maxValue) {
-    if (newvalue->value.type != NULL && monitoredItem->DiscardOldest == UA_TRUE && monitoredItem->queue->lh_first != NULL ) {
-          for(queueItem = monitoredItem->queue->lh_first; queueItem->listEntry.le_next != NULL; queueItem = queueItem->listEntry.le_next) {}
-
-          LIST_REMOVE(queueItem, listEntry);
-          UA_free(queueItem);
-          (monitoredItem->QueueSize).currentValue--;
-      }
-      else {
-          // We cannot remove the oldest value and theres no queue space left. We're done here.
-          UA_free(newvalue);
-          return;
-      }
-  }
-  
-  // Only add a value if we have sampled it correctly and it fits into the queue;
-  if ( samplingError != UA_FALSE || newvalue->value.type == NULL || (monitoredItem->QueueSize).currentValue >= (monitoredItem->QueueSize).maxValue) {
-    UA_free(newvalue);
-    return;
-  }
-  
-  newValueAsByteString.length = UA_calcSizeBinary((const void *) &(newvalue->value), &UA_TYPES[UA_TYPES_VARIANT]);
-  newValueAsByteString.data   = UA_malloc(newValueAsByteString.length);
-  UA_encodeBinary((const void *) &(newvalue->value), &UA_TYPES[UA_TYPES_VARIANT], &(newValueAsByteString), &encodingOffset );
-  
-  if(monitoredItem->LastSampledValue.data == NULL) { 
-    UA_ByteString_copy((UA_String *) &newValueAsByteString, (UA_String *) &(monitoredItem->LastSampledValue));
-    LIST_INSERT_HEAD(monitoredItem->queue, newvalue, listEntry);
-    (monitoredItem->QueueSize).currentValue++;
-    monitoredItem->LastSampled = UA_DateTime_now();
-  }
-  else {
-    if (UA_String_equal((UA_String *) &newValueAsByteString, (UA_String *) &(monitoredItem->LastSampledValue)) == UA_TRUE) {
-      UA_free(newValueAsByteString.data);
-      return;
-    }
-    
-    UA_ByteString_copy((UA_String *) &newValueAsByteString, (UA_String *) &(monitoredItem->LastSampledValue));
-    LIST_INSERT_HEAD(monitoredItem->queue, newvalue, listEntry);
-    (monitoredItem->QueueSize).currentValue++;
-    monitoredItem->LastSampled = UA_DateTime_now();
-  }
-  return;
-}
-
-UA_UInt32 *Subscription_getAvailableSequenceNumbers(UA_Subscription *sub) {
-  UA_UInt32 *seqArray;
-  int i;
-  UA_unpublishedNotification *not;
-  
-  if (sub == NULL) return NULL;
-  
-  seqArray = (UA_UInt32 *) UA_malloc(sizeof(UA_UInt32) * Subscription_queuedNotifications(sub));
-  if (seqArray == NULL ) return NULL;
-  
-  i = 0;
-  for(not = sub->unpublishedNotifications->lh_first; not != NULL; not=(not->listEntry).le_next) {
-    seqArray[i] = not->notification->sequenceNumber;
-    i++;
-  }
-  
-  return seqArray;
-  
-}
-
-void Subscription_copyTopNotificationMessage(UA_NotificationMessage *dst, UA_Subscription *sub) {
-    UA_NotificationMessage *latest;
-    
-    if (dst == NULL) return;
-    
-    if (Subscription_queuedNotifications(sub) == 0) {
-      dst->notificationDataSize = 0;
-      dst->publishTime = UA_DateTime_now();
-      dst->sequenceNumber = 0;
-      return;
-    }
-    
-    latest = sub->unpublishedNotifications->lh_first->notification;
-    dst->notificationDataSize = latest->notificationDataSize;
-    dst->publishTime = latest->publishTime;
-    dst->sequenceNumber = latest->sequenceNumber;
-    
-    if (latest->notificationDataSize == 0) return;
-    
-    dst->notificationData = (UA_ExtensionObject *) UA_malloc(sizeof(UA_ExtensionObject));
-    dst->notificationData->encoding = latest->notificationData->encoding;
-    dst->notificationData->typeId   = latest->notificationData->typeId;
-    dst->notificationData->body.length = latest->notificationData->body.length;
-    dst->notificationData->body.data   = UA_malloc(latest->notificationData->body.length);
-    UA_ByteString_copy((UA_String *) &(latest->notificationData->body), (UA_String *) &(dst->notificationData->body));
-    
-    return;
-}
-
-UA_UInt32 Subscription_deleteUnpublishedNotification(UA_UInt32 seqNo, UA_Subscription *sub) {
-  UA_unpublishedNotification *not;
-  UA_UInt32 deletedItems = 0;
-  
-  for(not=sub->unpublishedNotifications->lh_first; not != NULL; not=not->listEntry.le_next) {
-    if (not->notification->sequenceNumber == seqNo) { 
-      LIST_REMOVE(not, listEntry);
-      if (not->notification != NULL) {
-	if (not->notification->notificationData != NULL) {
-	  if (not->notification->notificationData->body.data != NULL) {
-	    UA_free(not->notification->notificationData->body.data);
-	  }
-	  UA_free(not->notification->notificationData);
-	}
-	UA_free(not->notification);
-      }
-
-      UA_free(not);
-      deletedItems++;
-    }
-  }
-
-  return deletedItems;
-}
 #endif //#ifdef ENABLE_SUBSCRIPTIONS

+ 9 - 92
src/server/ua_subscription_manager.h

@@ -6,78 +6,7 @@
 #include "ua_types.h"
 #include "queue.h"
 #include "ua_nodestore.h"
-
-#define LIST_INITENTRY(item,entry) \
-  (item)->entry.le_next = NULL; \
-  (item)->entry.le_prev = NULL;
-
-#define INITPOINTER(src) (src) = NULL;
-
-#define ISNOTZERO(value) ((value == 0) ? 0 : 1)
-
-typedef struct {
-    UA_Int32 currentValue;
-    UA_Int32 minValue;
-    UA_Int32 maxValue;
-} UA_Int32_BoundedValue;
-
-typedef struct {
-    UA_UInt32 currentValue;
-    UA_UInt32 minValue;
-    UA_UInt32 maxValue;
-} UA_UInt32_BoundedValue;
-
-typedef enum MONITOREDITEM_TYPE_ENUM {
-    MONITOREDITEM_CHANGENOTIFY_T = 1,
-    MONITOREDITEM_STATUSNOTIFY_T = 2,
-    MONITOREDITEM_EVENTNOTIFY_T = 4 } MONITOREDITEM_TYPE;
-
-typedef struct MonitoredItem_queuedValue_s {
-    UA_Variant value;
-    LIST_ENTRY(MonitoredItem_queuedValue_s) listEntry;
-} MonitoredItem_queuedValue;
-
-typedef LIST_HEAD(UA_ListOfQueuedDataValues_s, MonitoredItem_queuedValue_s) UA_ListOfQueuedDataValues;
-typedef struct UA_MonitoredItem_s {
-    UA_UInt32                       ItemId;
-    MONITOREDITEM_TYPE		    MonitoredItemType;
-    UA_UInt32                       TimestampsToReturn;
-    UA_UInt32                       MonitoringMode;
-    const UA_Node                   *monitoredNode; // Pointer to a node of any type
-    UA_UInt32                       AttributeID;
-    UA_UInt32                       ClientHandle;
-    UA_UInt32                       SamplingInterval;
-    UA_UInt32_BoundedValue          QueueSize;
-    UA_Boolean                      DiscardOldest;
-    UA_DateTime                     LastSampled;
-    UA_ByteString                   LastSampledValue;
-    // FIXME: indexRange is ignored; array values default to element 0
-    // FIXME: dataEncoding is hardcoded to UA binary
-    LIST_ENTRY(UA_MonitoredItem_s)  listEntry;
-    UA_ListOfQueuedDataValues       *queue;
-} UA_MonitoredItem;
-
-typedef struct UA_unpublishedNotification_s {
-    UA_NotificationMessage 		     *notification;
-    LIST_ENTRY(UA_unpublishedNotification_s) listEntry;
-} UA_unpublishedNotification;
-
-typedef LIST_HEAD(UA_ListOfUAMonitoredItems_s, UA_MonitoredItem_s) UA_ListOfUAMonitoredItems;
-typedef LIST_HEAD(UA_ListOfUnpublishedNotifications_s, UA_unpublishedNotification_s) UA_ListOfUnpublishedNotifications;
-typedef struct UA_Subscription_s {
-    UA_UInt32_BoundedValue              LifeTime;
-    UA_Int32_BoundedValue               KeepAliveCount;
-    UA_DateTime                         PublishingInterval;
-    UA_DateTime                         LastPublished;
-    UA_Int32                            SubscriptionID;
-    UA_Int32                            NotificationsPerPublish;
-    UA_Boolean                          PublishingMode;
-    UA_UInt32                           Priority;
-    UA_UInt32                           SequenceNumber;
-    LIST_ENTRY(UA_Subscription_s)       listEntry;
-    UA_ListOfUnpublishedNotifications   *unpublishedNotifications;
-    UA_ListOfUAMonitoredItems           *MonitoredItems;
-} UA_Subscription;
+#include "ua_subscription.h"
 
 typedef LIST_HEAD(UA_ListOfUASubscriptions_s, UA_Subscription_s) UA_ListOfUASubscriptions;
 typedef struct UA_SubscriptionManager_s {
@@ -91,26 +20,14 @@ typedef struct UA_SubscriptionManager_s {
     UA_ListOfUASubscriptions *ServerSubscriptions;
 } UA_SubscriptionManager;
 
-void            SubscriptionManager_init(UA_Session *session);
-void            SubscriptionManager_deleteMembers(UA_Session *session);
-void            SubscriptionManager_addSubscription(UA_SubscriptionManager *manager, UA_Subscription *subscription);
-UA_Subscription *SubscriptionManager_getSubscriptionByID(UA_SubscriptionManager *manager, UA_Int32 SubscriptionID);
-UA_Int32        SubscriptionManager_deleteSubscription(UA_SubscriptionManager *manager, UA_Int32 SubscriptionID);
-UA_Int32        SubscriptionManager_deleteMonitoredItem(UA_SubscriptionManager *manager, UA_Int32 SubscriptionID, UA_UInt32 MonitoredItemID);
-
-UA_Subscription 	*UA_Subscription_new(UA_Int32 SubscriptionID);
-void            	Subscription_updateNotifications(UA_Subscription *subscription);
-UA_UInt32       	Subscription_queuedNotifications(UA_Subscription *subscription);
-UA_UInt32 		*Subscription_getAvailableSequenceNumbers(UA_Subscription *sub);
-void 			Subscription_copyTopNotificationMessage(UA_NotificationMessage *dst, UA_Subscription *sub);
-UA_UInt32		Subscription_deleteUnpublishedNotification(UA_UInt32 seqNo, UA_Subscription *sub);
-void                    Subscription_generateKeepAlive(UA_Subscription *subscription);
+void SubscriptionManager_init(UA_Session *session);
+void SubscriptionManager_deleteMembers(UA_Session *session);
+void SubscriptionManager_addSubscription(UA_SubscriptionManager *manager, UA_Subscription *subscription);
+UA_Subscription *SubscriptionManager_getSubscriptionByID(UA_SubscriptionManager *manager,
+                                                         UA_Int32 SubscriptionID);
+UA_Int32 SubscriptionManager_deleteSubscription(UA_SubscriptionManager *manager, UA_Int32 SubscriptionID);
+UA_Int32 SubscriptionManager_deleteMonitoredItem(UA_SubscriptionManager *manager, UA_Int32 SubscriptionID,
+                                                 UA_UInt32 MonitoredItemID);
 
-UA_MonitoredItem *UA_MonitoredItem_new(void);
-void             MonitoredItem_delete(UA_MonitoredItem *monitoredItem);
-void             MonitoredItem_QueuePushDataValue(UA_MonitoredItem *monitoredItem);
-void             MonitoredItem_ClearQueue(UA_MonitoredItem *monitoredItem);
-UA_Boolean       MonitoredItem_CopyMonitoredValueToVariant(UA_UInt32 AttributeID, const UA_Node *src, UA_Variant *dst);
-int              MonitoredItem_QueueToDataChangeNotifications(UA_MonitoredItemNotification *dst, UA_MonitoredItem *monitoredItem);
 #endif  // ifndef... define UA_SUBSCRIPTION_MANAGER_H_
 #endif  // ifdef EnableSubscriptions ...