Browse Source

Refactor subscriptions to use different sampling and response intervals for subscriptions and monitoreditems

Julius Pfrommer 9 years ago
parent
commit
2af51b18da

+ 4 - 4
CMakeLists.txt

@@ -63,11 +63,11 @@ if(CMAKE_COMPILER_IS_GNUCC OR "x${CMAKE_C_COMPILER_ID}" STREQUAL "xClang")
 
   # Debug
   if(CMAKE_BUILD_TYPE STREQUAL "Debug")
-	#add_definitions(-fsanitize=address)
-    #list(APPEND open62541_LIBRARIES asan)
+	# add_definitions(-fsanitize=address)
+    # list(APPEND open62541_LIBRARIES asan)
 
-	#add_definitions(-fsanitize=undefined)
-    #list(APPEND open62541_LIBRARIES ubsan)
+	# add_definitions(-fsanitize=undefined)
+    # list(APPEND open62541_LIBRARIES ubsan)
 
   elseif(CMAKE_BUILD_TYPE STREQUAL "MinSizeRel" OR
          CMAKE_BUILD_TYPE STREQUAL "Release")

+ 2 - 2
examples/client.c

@@ -107,14 +107,14 @@ int main(int argc, char *argv[]) {
     
 #ifdef UA_ENABLE_SUBSCRIPTIONS
     // Create a subscription with interval 0 (immediate)...
-    UA_UInt32 subId=0;
+    UA_UInt32 subId = 0;
     UA_Client_Subscriptions_new(client, UA_SubscriptionSettings_standard, &subId);
     if(subId)
         printf("Create subscription succeeded, id %u\n", subId);
     
     // .. and monitor TheAnswer
     UA_NodeId monitorThis = UA_NODEID_STRING(1, "the.answer");
-    UA_UInt32 monId=0;
+    UA_UInt32 monId = 0;
     UA_Client_Subscriptions_addMonitoredItem(client, subId, monitorThis,
                                              UA_ATTRIBUTEID_VALUE, &handler_TheAnswerChanged, NULL, &monId);
     if (monId)

+ 1 - 1
plugins/ua_config_standard.c

@@ -44,7 +44,7 @@ const UA_ServerConfig UA_ServerConfig_standard = {
     .usernamePasswordLogins = usernamePasswords,
     .usernamePasswordLoginsSize = 2,
     
-    .publishingIntervalLimits = { .max = 10000, .min = 0, .current = 0 },
+    .publishingIntervalLimits = { .max = 10000, .min = 100, .current = 0 },
     .lifeTimeCountLimits = { .max = 15000, .min = 0, .current = 0 },
     .keepAliveCountLimits = { .max = 100, .min = 0, .current = 0 },
     .notificationsPerPublishLimits = { .max = 1000, .min = 1, .current = 0 },

+ 79 - 94
src/client/ua_client_highlevel_subscriptions.c

@@ -4,7 +4,7 @@
 #include "ua_types_generated_encoding_binary.h"
 
 const UA_SubscriptionSettings UA_SubscriptionSettings_standard = {
-    .requestedPublishingInterval = 0.0,
+    .requestedPublishingInterval = 1000.0,
     .requestedLifetimeCount = 100,
     .requestedMaxKeepAliveCount = 10,
     .maxNotificationsPerPublish = 10,
@@ -101,6 +101,7 @@ UA_Client_Subscriptions_addMonitoredItem(UA_Client *client, UA_UInt32 subscripti
     if(!sub)
         return UA_STATUSCODE_BADSUBSCRIPTIONIDINVALID;
     
+    /* Send the request */
     UA_CreateMonitoredItemsRequest request;
     UA_CreateMonitoredItemsRequest_init(&request);
     request.subscriptionId = subscriptionId;
@@ -115,35 +116,39 @@ UA_Client_Subscriptions_addMonitoredItem(UA_Client *client, UA_UInt32 subscripti
     item.requestedParameters.queueSize = 1;
     request.itemsToCreate = &item;
     request.itemsToCreateSize = 1;
-    // Filter can be left void for now, only changes are supported (UA_Expert does the same with changeItems)
-    
     UA_CreateMonitoredItemsResponse response = UA_Client_Service_createMonitoredItems(client, request);
     
-    UA_StatusCode retval;
     // slight misuse of retval here to check if the deletion was successfull.
+    UA_StatusCode retval;
     if(response.resultsSize == 0)
         retval = response.responseHeader.serviceResult;
     else
         retval = response.results[0].statusCode;
-    
-    if(retval == UA_STATUSCODE_GOOD) {
-        UA_Client_MonitoredItem *newMon = UA_malloc(sizeof(UA_Client_MonitoredItem));
-        newMon->MonitoringMode = UA_MONITORINGMODE_REPORTING;
-        UA_NodeId_copy(&nodeId, &newMon->monitoredNodeId); 
-        newMon->AttributeID = attributeID;
-        newMon->ClientHandle = client->monitoredItemHandles;
-        newMon->SamplingInterval = sub->PublishingInterval;
-        newMon->QueueSize = 1;
-        newMon->DiscardOldest = true;
-        newMon->handler = handlingFunction;
-        newMon->handlerContext = handlingContext;
-        newMon->MonitoredItemId = response.results[0].monitoredItemId;
-        LIST_INSERT_HEAD(&sub->MonitoredItems, newMon, listEntry);
-        *newMonitoredItemId = newMon->MonitoredItemId;
+    if(retval != UA_STATUSCODE_GOOD) {
+        UA_CreateMonitoredItemsResponse_deleteMembers(&response);
+        return retval;
     }
+
+    /* Create the handler */
+    UA_Client_MonitoredItem *newMon = UA_malloc(sizeof(UA_Client_MonitoredItem));
+    newMon->MonitoringMode = UA_MONITORINGMODE_REPORTING;
+    UA_NodeId_copy(&nodeId, &newMon->monitoredNodeId); 
+    newMon->AttributeID = attributeID;
+    newMon->ClientHandle = client->monitoredItemHandles;
+    newMon->SamplingInterval = sub->PublishingInterval;
+    newMon->QueueSize = 1;
+    newMon->DiscardOldest = true;
+    newMon->handler = handlingFunction;
+    newMon->handlerContext = handlingContext;
+    newMon->MonitoredItemId = response.results[0].monitoredItemId;
+    LIST_INSERT_HEAD(&sub->MonitoredItems, newMon, listEntry);
+    *newMonitoredItemId = newMon->MonitoredItemId;
+
+    UA_LOG_DEBUG(client->config.logger, UA_LOGCATEGORY_CLIENT,
+                 "Created a monitored item with client handle %u", client->monitoredItemHandles);
     
     UA_CreateMonitoredItemsResponse_deleteMembers(&response);
-    return retval;
+    return UA_STATUSCODE_GOOD;
 }
 
 UA_StatusCode
@@ -191,95 +196,77 @@ UA_Client_Subscriptions_removeMonitoredItem(UA_Client *client, UA_UInt32 subscri
     return retval;
 }
 
-static UA_Boolean
-UA_Client_processPublishRx(UA_Client *client, UA_PublishResponse response) {
-    if(response.responseHeader.serviceResult != UA_STATUSCODE_GOOD)
-        return false;
-    
-    // Check if the server has acknowledged any of our ACKS
-    // Note that a list of serverside status codes may be send without valid publish data, i.e. 
-    // during keepalives or no data availability
+static void
+UA_Client_processPublishResponse(UA_Client *client, UA_PublishResponse *response) {
+    if(response->responseHeader.serviceResult != UA_STATUSCODE_GOOD)
+        return;
+
+    /* Find the subscription */
+    UA_Client_Subscription *sub;
+    LIST_FOREACH(sub, &client->subscriptions, listEntry) {
+        if(sub->SubscriptionID == response->subscriptionId)
+            break;
+    }
+    if(!sub)
+        return;
+
+    UA_LOG_DEBUG(client->config.logger, UA_LOGCATEGORY_CLIENT,
+                 "Processing a publish response on subscription %u with %u notifications",
+                 sub->SubscriptionID, response->notificationMessage.notificationDataSize);
+
+    /* Check if the server has acknowledged any of our ACKS */
+    // TODO: The acks should be attached to the subscription
     UA_Client_NotificationsAckNumber *ack, *tmpAck;
     size_t i = 0;
     LIST_FOREACH_SAFE(ack, &client->pendingNotificationsAcks, listEntry, tmpAck) {
-        if(response.results[i] == UA_STATUSCODE_GOOD ||
-           response.results[i] == UA_STATUSCODE_BADSEQUENCENUMBERINVALID) {
+        if(response->results[i] == UA_STATUSCODE_GOOD ||
+           response->results[i] == UA_STATUSCODE_BADSEQUENCENUMBERINVALID) {
             LIST_REMOVE(ack, listEntry);
             UA_free(ack);
         }
         i++;
     }
     
-    if(response.subscriptionId == 0)
-        return false;
-    
-    UA_Client_Subscription *sub;
-    LIST_FOREACH(sub, &client->subscriptions, listEntry) {
-        if(sub->SubscriptionID == response.subscriptionId)
-            break;
-    }
-    if(!sub)
-        return false;
-    
-    UA_NotificationMessage msg = response.notificationMessage;
-    UA_Client_MonitoredItem *mon;
-    for(size_t k = 0; k < msg.notificationDataSize; k++) {
-        if(msg.notificationData[k].encoding != UA_EXTENSIONOBJECT_DECODED)
+    /* Process the notification messages */
+    UA_NotificationMessage *msg = &response->notificationMessage;
+    for(size_t k = 0; k < msg->notificationDataSize; k++) {
+        if(msg->notificationData[k].encoding != UA_EXTENSIONOBJECT_DECODED)
             continue;
         
-        if(msg.notificationData[k].content.decoded.type == &UA_TYPES[UA_TYPES_DATACHANGENOTIFICATION]) {
-            // This is a dataChangeNotification
-            UA_DataChangeNotification *dataChangeNotification = msg.notificationData[k].content.decoded.data;
-            for(size_t j = 0; j < dataChangeNotification->monitoredItemsSize; j++) {
+        /* Currently only dataChangeNotifications are supported */
+        if(msg->notificationData[k].content.decoded.type != &UA_TYPES[UA_TYPES_DATACHANGENOTIFICATION])
+            continue;
+        
+        UA_DataChangeNotification *dataChangeNotification = msg->notificationData[k].content.decoded.data;
+        for(size_t j = 0; j < dataChangeNotification->monitoredItemsSize; j++) {
             UA_MonitoredItemNotification *mitemNot = &dataChangeNotification->monitoredItems[j];
-                // find this client handle
-                LIST_FOREACH(mon, &sub->MonitoredItems, listEntry) {
-                    if(mon->ClientHandle == mitemNot->clientHandle) {
-                        mon->handler(mon->MonitoredItemId, &mitemNot->value, mon->handlerContext);
-                        break;
-                    }
+            UA_Client_MonitoredItem *mon;
+            LIST_FOREACH(mon, &sub->MonitoredItems, listEntry) {
+                if(mon->ClientHandle == mitemNot->clientHandle) {
+                    mon->handler(mon->MonitoredItemId, &mitemNot->value, mon->handlerContext);
+                    break;
                 }
             }
-            continue;
+            if(!mon)
+                UA_LOG_DEBUG(client->config.logger, UA_LOGCATEGORY_CLIENT,
+                             "Could not process a notification with clienthandle %u on subscription %u",
+                             mitemNot->clientHandle, sub->SubscriptionID);
         }
-
-        /* if(msg.notificationData[k].typeId.namespaceIndex == 0 && */
-        /*    msg.notificationData[k].typeId.identifier.numeric == 820 ) { */
-        /*     //FIXME: This is a statusChangeNotification (not supported yet) */
-        /*     continue; */
-        /* } */
-
-        /* if(msg.notificationData[k].typeId.namespaceIndex == 0 && */
-        /*    msg.notificationData[k].typeId.identifier.numeric == 916 ) { */
-        /*     //FIXME: This is an EventNotification */
-        /*     continue; */
-        /* } */
-    }
-    
-    /* We processed this message, add it to the list of pending acks (but make
-       sure it's not in the list first) */
-    LIST_FOREACH(tmpAck, &client->pendingNotificationsAcks, listEntry) {
-        if(tmpAck->subAck.sequenceNumber == msg.sequenceNumber &&
-            tmpAck->subAck.subscriptionId == response.subscriptionId)
-            break;
-    }
-
-    if(!tmpAck) {
-        tmpAck = UA_malloc(sizeof(UA_Client_NotificationsAckNumber));
-        tmpAck->subAck.sequenceNumber = msg.sequenceNumber;
-        tmpAck->subAck.subscriptionId = sub->SubscriptionID;
-        LIST_INSERT_HEAD(&client->pendingNotificationsAcks, tmpAck, listEntry);
     }
     
-    return response.moreNotifications;
+    /* Add to the list of pending acks */
+    tmpAck = UA_malloc(sizeof(UA_Client_NotificationsAckNumber));
+    tmpAck->subAck.sequenceNumber = msg->sequenceNumber;
+    tmpAck->subAck.subscriptionId = sub->SubscriptionID;
+    LIST_INSERT_HEAD(&client->pendingNotificationsAcks, tmpAck, listEntry);
 }
 
 UA_StatusCode UA_Client_Subscriptions_manuallySendPublishRequest(UA_Client *client) {
-    if (client->state == UA_CLIENTSTATE_ERRORED){
+    if (client->state == UA_CLIENTSTATE_ERRORED)
         return UA_STATUSCODE_BADSERVERNOTCONNECTED;
-    }
+
     UA_Boolean moreNotifications = true;
-    do {
+    while(moreNotifications == true) {
         UA_PublishRequest request;
         UA_PublishRequest_init(&request);
         request.subscriptionAcknowledgementsSize = 0;
@@ -288,8 +275,8 @@ UA_StatusCode UA_Client_Subscriptions_manuallySendPublishRequest(UA_Client *clie
         LIST_FOREACH(ack, &client->pendingNotificationsAcks, listEntry)
             request.subscriptionAcknowledgementsSize++;
         if(request.subscriptionAcknowledgementsSize > 0) {
-            request.subscriptionAcknowledgements = UA_malloc(sizeof(UA_SubscriptionAcknowledgement) *
-                                                             request.subscriptionAcknowledgementsSize);
+            request.subscriptionAcknowledgements =
+                UA_malloc(sizeof(UA_SubscriptionAcknowledgement) * request.subscriptionAcknowledgementsSize);
             if(!request.subscriptionAcknowledgements)
                 return UA_STATUSCODE_GOOD;
         }
@@ -302,13 +289,11 @@ UA_StatusCode UA_Client_Subscriptions_manuallySendPublishRequest(UA_Client *clie
         }
         
         UA_PublishResponse response = UA_Client_Service_publish(client, request);
-        if(response.responseHeader.serviceResult == UA_STATUSCODE_GOOD)
-            moreNotifications = UA_Client_processPublishRx(client, response);
-        else
-            moreNotifications = false;
+        UA_Client_processPublishResponse(client, &response);
+        moreNotifications = response.moreNotifications;
         
         UA_PublishResponse_deleteMembers(&response);
         UA_PublishRequest_deleteMembers(&request);
-    } while(moreNotifications == true);
+    }
     return UA_STATUSCODE_GOOD;
 }

+ 3 - 1
src/server/ua_services.h

@@ -274,7 +274,9 @@ void Service_DeleteMonitoredItems(UA_Server *server, UA_Session *session,
                                   const UA_DeleteMonitoredItemsRequest *request,
                                   UA_DeleteMonitoredItemsResponse *response);
 
-/* Not Implemented: Service_ModifyMonitoredItems */
+void Service_ModifyMonitoredItems(UA_Server *server, UA_Session *session,
+                                  const UA_ModifyMonitoredItemsRequest *request,
+                                  UA_ModifyMonitoredItemsResponse *response);
 /* Not Implemented: Service_SetMonitoringMode */
 /* Not Implemented: Service_SetTriggering */
 

+ 174 - 204
src/server/ua_services_subscription.c

@@ -8,50 +8,81 @@
     else DST = SRC; \
     }
 
+static void
+setSubscriptionSettings(UA_Server *server, UA_Subscription *subscription,
+                        UA_Double requestedPublishingInterval,
+                        UA_UInt32 requestedLifetimeCount,
+                        UA_UInt32 requestedMaxKeepAliveCount,
+                        UA_UInt32 maxNotificationsPerPublish, UA_Byte priority) {
+    Subscription_unregisterPublishJob(server, subscription);
+    UA_BOUNDEDVALUE_SETWBOUNDS(server->config.publishingIntervalLimits,
+                               requestedPublishingInterval, subscription->publishingInterval);
+    UA_BOUNDEDVALUE_SETWBOUNDS(server->config.lifeTimeCountLimits,
+                               requestedLifetimeCount, subscription->lifeTime);
+    UA_BOUNDEDVALUE_SETWBOUNDS(server->config.keepAliveCountLimits,
+                               requestedMaxKeepAliveCount, subscription->maxKeepAliveCount);
+    subscription->notificationsPerPublish = maxNotificationsPerPublish;
+    subscription->priority = priority;
+    Subscription_registerPublishJob(server, subscription);
+}
+
 void Service_CreateSubscription(UA_Server *server, UA_Session *session,
                                 const UA_CreateSubscriptionRequest *request,
                                 UA_CreateSubscriptionResponse *response) {
     response->subscriptionId = UA_Session_getUniqueSubscriptionID(session);
-    UA_Subscription *newSubscription = UA_Subscription_new(response->subscriptionId);
+    UA_Subscription *newSubscription = UA_Subscription_new(session, response->subscriptionId);
     if(!newSubscription) {
         response->responseHeader.serviceResult = UA_STATUSCODE_BADOUTOFMEMORY;
         return;
     }
-    
-    /* set the publishing interval */
-    UA_BOUNDEDVALUE_SETWBOUNDS(server->config.publishingIntervalLimits,
-                               request->requestedPublishingInterval, response->revisedPublishingInterval);
-    newSubscription->publishingInterval = response->revisedPublishingInterval;
-    
-    /* set the subscription lifetime (deleted when no publish requests arrive within this time) */
-    UA_BOUNDEDVALUE_SETWBOUNDS(server->config.lifeTimeCountLimits,
-                               request->requestedLifetimeCount, response->revisedLifetimeCount);
-    newSubscription->lifeTime = (UA_BoundedUInt32)  {
-        .min = server->config.lifeTimeCountLimits.min,
-        .max = server->config.lifeTimeCountLimits.max,
-        .current=response->revisedLifetimeCount};
-    
-    /* set the keepalive count. the server sends an empty notification when
-       nothin has happened for n publishing intervals */
-    UA_BOUNDEDVALUE_SETWBOUNDS(server->config.keepAliveCountLimits,
-                               request->requestedMaxKeepAliveCount, response->revisedMaxKeepAliveCount);
-    newSubscription->keepAliveCount = (UA_BoundedUInt32)  {
-        .min = server->config.keepAliveCountLimits.min,
-        .max = server->config.keepAliveCountLimits.max,
-        .current = response->revisedMaxKeepAliveCount};
-    
-    newSubscription->notificationsPerPublish = request->maxNotificationsPerPublish;
-    newSubscription->publishingMode          = request->publishingEnabled;
-    newSubscription->priority                = request->priority;
-    
-    /* add the update job */
-    Subscription_registerUpdateJob(server, newSubscription);
+
     UA_Session_addSubscription(session, newSubscription);    
+    newSubscription->publishingMode = request->publishingEnabled;
+    setSubscriptionSettings(server, newSubscription, request->requestedPublishingInterval,
+                            request->requestedLifetimeCount, request->requestedMaxKeepAliveCount,
+                            request->maxNotificationsPerPublish, request->priority);
+    response->revisedPublishingInterval = newSubscription->publishingInterval;
+    response->revisedLifetimeCount = newSubscription->lifeTime;
+    response->revisedMaxKeepAliveCount = newSubscription->maxKeepAliveCount;
+}
+
+void Service_ModifySubscription(UA_Server *server, UA_Session *session,
+                                const UA_ModifySubscriptionRequest *request,
+                                UA_ModifySubscriptionResponse *response) {
+    UA_Subscription *sub = UA_Session_getSubscriptionByID(session, request->subscriptionId);
+    if(!sub) {
+        response->responseHeader.serviceResult = UA_STATUSCODE_BADSUBSCRIPTIONIDINVALID;
+        return;
+    }
+
+    setSubscriptionSettings(server, sub, request->requestedPublishingInterval,
+                            request->requestedLifetimeCount, request->requestedMaxKeepAliveCount,
+                            request->maxNotificationsPerPublish, request->priority);
+    response->revisedPublishingInterval = sub->publishingInterval;
+    response->revisedLifetimeCount = sub->lifeTime;
+    response->revisedMaxKeepAliveCount = sub->maxKeepAliveCount;
+    return;
 }
 
 static void
-createMonitoredItems(UA_Server *server, UA_Session *session, UA_Subscription *sub,
-                     const UA_MonitoredItemCreateRequest *request, UA_MonitoredItemCreateResult *result) {
+setMonitoredItemSettings(UA_Server *server, UA_MonitoredItem *mon,
+                         UA_MonitoringMode monitoringMode, UA_UInt32 clientHandle,
+                         UA_Double samplingInterval, UA_UInt32 queueSize, UA_Boolean discardOldest) {
+    MonitoredItem_unregisterSampleJob(server, mon);
+    mon->monitoringMode = monitoringMode;
+    mon->clientHandle = clientHandle;
+    UA_BOUNDEDVALUE_SETWBOUNDS(server->config.samplingIntervalLimits,
+                               samplingInterval, mon->samplingInterval);
+    UA_BOUNDEDVALUE_SETWBOUNDS(server->config.queueSizeLimits,
+                               queueSize, mon->maxQueueSize);
+    mon->discardOldest = discardOldest;
+    MonitoredItem_registerSampleJob(server, mon);
+}
+
+static void
+Service_CreateMonitoredItems_single(UA_Server *server, UA_Session *session, UA_Subscription *sub,
+                                    const UA_MonitoredItemCreateRequest *request,
+                                    UA_MonitoredItemCreateResult *result) {
     const UA_Node *target = UA_NodeStore_get(server->nodestore, &request->itemToMonitor.nodeId);
     if(!target) {
         result->statusCode = UA_STATUSCODE_BADNODEIDINVALID;
@@ -66,41 +97,30 @@ createMonitoredItems(UA_Server *server, UA_Session *session, UA_Subscription *su
 
     UA_StatusCode retval = UA_NodeId_copy(&target->nodeId, &newMon->monitoredNodeId);
     if(retval != UA_STATUSCODE_GOOD) {
-        result->statusCode = UA_STATUSCODE_BADOUTOFMEMORY;
-        MonitoredItem_delete(newMon);
+        result->statusCode = retval;
+        MonitoredItem_delete(server, newMon);
         return;
     }
 
+    newMon->subscription = sub;
+    newMon->attributeID = request->itemToMonitor.attributeId;
     newMon->itemId = UA_Session_getUniqueSubscriptionID(session);
-    result->monitoredItemId = newMon->itemId;
-    newMon->clientHandle = request->requestedParameters.clientHandle;
-
-    /* set the sampling interval */
-    UA_BOUNDEDVALUE_SETWBOUNDS(server->config.samplingIntervalLimits,
-                               request->requestedParameters.samplingInterval,
-                               result->revisedSamplingInterval);
-    newMon->samplingInterval = (UA_UInt32)result->revisedSamplingInterval;
+    setMonitoredItemSettings(server, newMon, MONITOREDITEM_TYPE_CHANGENOTIFY,
+                             request->requestedParameters.clientHandle,
+                             request->requestedParameters.samplingInterval,
+                             request->requestedParameters.queueSize,
+                             request->requestedParameters.discardOldest);
 
-    /* set the queue size */
-    UA_BOUNDEDVALUE_SETWBOUNDS(server->config.queueSizeLimits,
-                               request->requestedParameters.queueSize,
-                               result->revisedQueueSize);
-    newMon->queueSize = (UA_BoundedUInt32) {
-        .max=(result->revisedQueueSize) + 1,
-        .min=0, .current=0 };
-
-    newMon->attributeID = request->itemToMonitor.attributeId;
-    newMon->monitoredItemType = MONITOREDITEM_TYPE_CHANGENOTIFY;
-    newMon->discardOldest = request->requestedParameters.discardOldest;
+    result->revisedSamplingInterval = newMon->samplingInterval;
+    result->revisedQueueSize = newMon->maxQueueSize;
+    result->monitoredItemId = newMon->itemId;
     LIST_INSERT_HEAD(&sub->MonitoredItems, newMon, listEntry);
-
-    // todo: add a job that samples the value (for fixed intervals)
-    // todo: add a pointer to the monitoreditem to the variable, so that events get propagated
 }
 
-void Service_CreateMonitoredItems(UA_Server *server, UA_Session *session,
-                                  const UA_CreateMonitoredItemsRequest *request,
-                                  UA_CreateMonitoredItemsResponse *response) {
+void
+Service_CreateMonitoredItems(UA_Server *server, UA_Session *session,
+                             const UA_CreateMonitoredItemsRequest *request,
+                             UA_CreateMonitoredItemsResponse *response) {
     UA_Subscription *sub = UA_Session_getSubscriptionByID(session, request->subscriptionId);
     if(!sub) {
         response->responseHeader.serviceResult = UA_STATUSCODE_BADSUBSCRIPTIONIDINVALID;
@@ -112,7 +132,8 @@ void Service_CreateMonitoredItems(UA_Server *server, UA_Session *session,
         return;
     }
 
-    response->results = UA_Array_new(request->itemsToCreateSize, &UA_TYPES[UA_TYPES_MONITOREDITEMCREATERESULT]);
+    response->results = UA_Array_new(request->itemsToCreateSize,
+                                     &UA_TYPES[UA_TYPES_MONITOREDITEMCREATERESULT]);
     if(!response->results) {
         response->responseHeader.serviceResult = UA_STATUSCODE_BADOUTOFMEMORY;
         return;
@@ -120,136 +141,102 @@ void Service_CreateMonitoredItems(UA_Server *server, UA_Session *session,
     response->resultsSize = request->itemsToCreateSize;
 
     for(size_t i = 0; i < request->itemsToCreateSize; i++)
-        createMonitoredItems(server, session, sub, &request->itemsToCreate[i], &response->results[i]);
+        Service_CreateMonitoredItems_single(server, session, sub,
+                                            &request->itemsToCreate[i],
+                                            &response->results[i]);
+}
+
+static void
+Service_ModifyMonitoredItems_single(UA_Server *server, UA_Session *session, UA_Subscription *sub,
+                                    const UA_MonitoredItemModifyRequest *request,
+                                    UA_MonitoredItemModifyResult *result) {
+    UA_MonitoredItem *mon = UA_Subscription_getMonitoredItem(sub, request->monitoredItemId);
+    if(!mon) {
+        result->statusCode = UA_STATUSCODE_BADMONITOREDITEMIDINVALID;
+        return;
+    }
+    setMonitoredItemSettings(server, mon, MONITOREDITEM_TYPE_CHANGENOTIFY,
+                             request->requestedParameters.clientHandle,
+                             request->requestedParameters.samplingInterval,
+                             request->requestedParameters.queueSize,
+                             request->requestedParameters.discardOldest);
+    result->revisedSamplingInterval = mon->samplingInterval;
+    result->revisedQueueSize = mon->maxQueueSize;
+}
+
+void Service_ModifyMonitoredItems(UA_Server *server, UA_Session *session,
+                                  const UA_ModifyMonitoredItemsRequest *request,
+                                  UA_ModifyMonitoredItemsResponse *response) {
+    UA_Subscription *sub = UA_Session_getSubscriptionByID(session, request->subscriptionId);
+    if(!sub) {
+        response->responseHeader.serviceResult = UA_STATUSCODE_BADSUBSCRIPTIONIDINVALID;
+        return;
+    }
+    
+    if(request->itemsToModifySize <= 0) {
+        response->responseHeader.serviceResult = UA_STATUSCODE_BADNOTHINGTODO;
+        return;
+    }
+
+    response->results = UA_Array_new(request->itemsToModifySize,
+                                     &UA_TYPES[UA_TYPES_MONITOREDITEMMODIFYRESULT]);
+    if(!response->results) {
+        response->responseHeader.serviceResult = UA_STATUSCODE_BADOUTOFMEMORY;
+        return;
+    }
+    response->resultsSize = request->itemsToModifySize;
+
+    for(size_t i = 0; i < request->itemsToModifySize; i++)
+        Service_ModifyMonitoredItems_single(server, session, sub,
+                                            &request->itemsToModify[i],
+                                            &response->results[i]);
+
 }
 
 void
 Service_Publish(UA_Server *server, UA_Session *session, const UA_PublishRequest *request,
                 UA_UInt32 requestId) {
-    UA_PublishResponse response;
-    UA_PublishResponse_init(&response);
-    response.responseHeader.requestHandle = request->requestHeader.requestHandle;
-    response.responseHeader.timestamp = UA_DateTime_now();
-    
-    // Delete Acknowledged Subscription Messages
-    response.resultsSize = request->subscriptionAcknowledgementsSize;
-    response.results = UA_calloc(response.resultsSize, sizeof(UA_StatusCode));
+    // todo error handling for malloc
+    UA_PublishResponseEntry *entry = UA_malloc(sizeof(UA_PublishResponseEntry));
+    entry->requestId = requestId;
+    UA_PublishResponse *response = &entry->response;
+    UA_PublishResponse_init(response);
+    response->responseHeader.requestHandle = request->requestHeader.requestHandle;
+
+    /* Delete Acknowledged Subscription Messages */
+    response->results = UA_malloc(request->subscriptionAcknowledgementsSize * sizeof(UA_StatusCode));
+    if(!response->results) {
+        response->responseHeader.serviceResult = UA_STATUSCODE_BADOUTOFMEMORY;
+        return;
+    }
+    response->resultsSize = request->subscriptionAcknowledgementsSize;
     for(size_t i = 0; i < request->subscriptionAcknowledgementsSize; i++) {
-        response.results[i] = UA_STATUSCODE_GOOD;
-        UA_UInt32 sid = request->subscriptionAcknowledgements[i].subscriptionId;
-        UA_Subscription *sub = UA_Session_getSubscriptionByID(session, sid);
+        UA_SubscriptionAcknowledgement *ack = &request->subscriptionAcknowledgements[i];
+        UA_Subscription *sub = UA_Session_getSubscriptionByID(session, ack->subscriptionId);
         if(!sub) {
-            response.results[i] = UA_STATUSCODE_BADSUBSCRIPTIONIDINVALID;
+            response->results[i] = UA_STATUSCODE_BADSUBSCRIPTIONIDINVALID;
+            UA_LOG_DEBUG(server->config.logger, UA_LOGCATEGORY_SERVER,
+                         "Cannot process acknowledgements subscription %u", ack->subscriptionId);
             continue;
         }
-        UA_UInt32 sn = request->subscriptionAcknowledgements[i].sequenceNumber;
-        if(Subscription_deleteUnpublishedNotification(sn, false, sub) == 0)
-            response.results[i] = UA_STATUSCODE_BADSEQUENCENUMBERINVALID;
-    }
-    
-    UA_Boolean have_response = false;
 
-    // See if any new data is available
-    UA_Subscription *sub;
-    LIST_FOREACH(sub, &session->serverSubscriptions, listEntry) {
-        if(sub->timedUpdateIsRegistered == false) {
-            // FIXME: We are forcing a value update for monitored items. This should be done by the event system.
-            // NOTE:  There is a clone of this functionality in the Subscription_timedUpdateNotificationsJob
-            UA_MonitoredItem *mon;
-            LIST_FOREACH(mon, &sub->MonitoredItems, listEntry)
-                MonitoredItem_QueuePushDataValue(server, mon);
-            
-            // FIXME: We are forcing notification updates for the subscription. This
-            // should be done by a timed work item.
-            Subscription_updateNotifications(sub);
-        }
-        
-        if(sub->unpublishedNotificationsSize == 0)
-            continue;
-        
-        // This subscription has notifications in its queue (top NotificationMessage exists in the queue). 
-        // Due to republish, we need to check if there are any unplublished notifications first ()
-        UA_unpublishedNotification *notification = NULL;
-        LIST_FOREACH(notification, &sub->unpublishedNotifications, listEntry) {
-            if (notification->publishedOnce == false)
+        response->results[i] = UA_STATUSCODE_BADSEQUENCENUMBERUNKNOWN;
+        UA_NotificationMessageEntry *pre, *pre_tmp;
+        LIST_FOREACH_SAFE(pre, &sub->retransmissionQueue, listEntry, pre_tmp) {
+            if(pre->message.sequenceNumber == ack->sequenceNumber) {
+                LIST_REMOVE(pre, listEntry);
+                response->results[i] = UA_STATUSCODE_GOOD;
+                UA_NotificationMessage_deleteMembers(&pre->message);
+                UA_free(pre);
                 break;
-        }
-        if (notification == NULL)
-            continue;
-    
-        // We found an unpublished notification message in this subscription, which we will now publish.
-        response.subscriptionId = sub->subscriptionID;
-        Subscription_copyNotificationMessage(&response.notificationMessage, notification);
-        // Mark this notification as published
-        notification->publishedOnce = true;
-        if(notification->notification.sequenceNumber > sub->sequenceNumber) {
-            // If this is a keepalive message, its seqNo is the next seqNo to be used for an actual msg.
-            response.availableSequenceNumbersSize = 0;
-            // .. and must be deleted
-            Subscription_deleteUnpublishedNotification(sub->sequenceNumber + 1, false, sub);
-        } else {
-            response.availableSequenceNumbersSize = sub->unpublishedNotificationsSize;
-            response.availableSequenceNumbers = Subscription_getAvailableSequenceNumbers(sub);
-        }	  
-        have_response = true;
-    }
-    
-    if(!have_response) {
-        // FIXME: At this point, we would return nothing and "queue" the publish
-        // request, but currently we need to return something to the client. If no
-        // subscriptions have notifications, force one to generate a keepalive so we
-        // don't return an empty message
-        sub = LIST_FIRST(&session->serverSubscriptions);
-        if(sub) {
-            response.subscriptionId = sub->subscriptionID;
-            sub->keepAliveCount.current=sub->keepAliveCount.min;
-            Subscription_generateKeepAlive(sub);
-            Subscription_copyNotificationMessage(&response.notificationMessage,
-                                                 LIST_FIRST(&sub->unpublishedNotifications));
-            Subscription_deleteUnpublishedNotification(sub->sequenceNumber + 1, false, sub);
+            }
         }
     }
-    
-    UA_SecureChannel *channel = session->channel;
-    if(channel)
-        UA_SecureChannel_sendBinaryMessage(channel, requestId, &response,
-                                           &UA_TYPES[UA_TYPES_PUBLISHRESPONSE]);
-    UA_PublishResponse_deleteMembers(&response);
-}
 
-void
-Service_ModifySubscription(UA_Server *server, UA_Session *session, const UA_ModifySubscriptionRequest *request,
-                           UA_ModifySubscriptionResponse *response) {
-    UA_Subscription *sub = UA_Session_getSubscriptionByID(session, request->subscriptionId);
-    if(!sub) {
-        response->responseHeader.serviceResult = UA_STATUSCODE_BADSUBSCRIPTIONIDINVALID;
-        return;
-    }
-    
-    UA_BOUNDEDVALUE_SETWBOUNDS(server->config.publishingIntervalLimits,
-                               request->requestedPublishingInterval, response->revisedPublishingInterval);
-    sub->publishingInterval = response->revisedPublishingInterval;
-    
-    UA_BOUNDEDVALUE_SETWBOUNDS(server->config.lifeTimeCountLimits,
-                               request->requestedLifetimeCount, response->revisedLifetimeCount);
-    sub->lifeTime = (UA_BoundedUInt32)  {
-        .min = server->config.lifeTimeCountLimits.min,
-        .max = server->config.lifeTimeCountLimits.max,
-        .current=response->revisedLifetimeCount};
-        
-    UA_BOUNDEDVALUE_SETWBOUNDS(server->config.keepAliveCountLimits,
-                               request->requestedMaxKeepAliveCount, response->revisedMaxKeepAliveCount);
-    sub->keepAliveCount = (UA_BoundedUInt32)  {
-        .min = server->config.keepAliveCountLimits.min,
-        .max = server->config.keepAliveCountLimits.max,
-        .current=response->revisedMaxKeepAliveCount};
-        
-    sub->notificationsPerPublish = request->maxNotificationsPerPublish;
-    sub->priority                = request->priority;
-    
-    Subscription_unregisterUpdateJob(server, sub);
-    Subscription_registerUpdateJob(server, sub);
-    return;
+    /* Queue the publish response */
+    SIMPLEQ_INSERT_TAIL(&session->responseQueue, entry, listEntry);
+    UA_LOG_DEBUG(server->config.logger, UA_LOGCATEGORY_SERVER,
+                 "Queued a publication message on session %u", session->authenticationToken.identifier.numeric);
 }
 
 void Service_DeleteSubscriptions(UA_Server *server, UA_Session *session,
@@ -263,8 +250,7 @@ void Service_DeleteSubscriptions(UA_Server *server, UA_Session *session,
     response->resultsSize = request->subscriptionIdsSize;
 
     for(size_t i = 0; i < request->subscriptionIdsSize; i++)
-        response->results[i] =
-            UA_Session_deleteSubscription(server, session, request->subscriptionIds[i]);
+        response->results[i] = UA_Session_deleteSubscription(server, session, request->subscriptionIds[i]);
 } 
 
 void Service_DeleteMonitoredItems(UA_Server *server, UA_Session *session,
@@ -284,43 +270,27 @@ void Service_DeleteMonitoredItems(UA_Server *server, UA_Session *session,
     response->resultsSize = request->monitoredItemIdsSize;
 
     for(size_t i = 0; i < request->monitoredItemIdsSize; i++)
-        response->results[i] =
-            UA_Session_deleteMonitoredItem(session, sub->subscriptionID,
-                                           request->monitoredItemIds[i]);
+        response->results[i] = UA_Subscription_deleteMonitoredItem(server, sub, request->monitoredItemIds[i]);
 }
 
 void Service_Republish(UA_Server *server, UA_Session *session, const UA_RepublishRequest *request,
                        UA_RepublishResponse *response) {
+    /* get the subscription */
     UA_Subscription *sub = UA_Session_getSubscriptionByID(session, request->subscriptionId);
     if (!sub) {
         response->responseHeader.serviceResult = UA_STATUSCODE_BADSUBSCRIPTIONIDINVALID;
         return;
     }
     
-    // Find the notification in question
-    UA_unpublishedNotification *notification;
-    LIST_FOREACH(notification, &sub->unpublishedNotifications, listEntry) {
-        if(notification->notification.sequenceNumber == request->retransmitSequenceNumber)
+    /* Find the notification in the retransmission queue  */
+    UA_NotificationMessageEntry *entry;
+    LIST_FOREACH(entry, &sub->retransmissionQueue, listEntry) {
+        if(entry->message.sequenceNumber == request->retransmitSequenceNumber)
             break;
     }
-    if(!notification) {
+    if(entry)
+        response->responseHeader.serviceResult =
+            UA_NotificationMessage_copy(&entry->message, &response->notificationMessage);
+    else
       response->responseHeader.serviceResult = UA_STATUSCODE_BADSEQUENCENUMBERINVALID;
-      return;
-    }
-    
-    // FIXME: By spec, this notification has to be in the "retransmit queue", i.e. publishedOnce must be
-    //        true. If this is not tested, the client just gets what he asks for... hence this part is
-    //        commented:
-    /* Check if the notification is in the published queue
-    if (notification->publishedOnce == false) {
-      response->responseHeader.serviceResult = UA_STATUSCODE_BADSUBSCRIPTIONIDINVALID;
-      return;
-    }
-    */
-    // Retransmit 
-    Subscription_copyNotificationMessage(&response->notificationMessage, notification);
-    // Mark this notification as published
-    notification->publishedOnce = true;
-    
-    return;
 }

+ 265 - 431
src/server/ua_subscription.c

@@ -1,488 +1,322 @@
 #include "ua_subscription.h"
 #include "ua_server_internal.h"
+#include "ua_services.h"
 #include "ua_nodestore.h"
 
-/****************/
-/* Subscription */
-/****************/
+/*****************/
+/* MonitoredItem */
+/*****************/
 
-UA_Subscription *UA_Subscription_new(UA_UInt32 subscriptionID) {
-    UA_Subscription *new = UA_malloc(sizeof(UA_Subscription));
-    if(!new)
-        return NULL;
-    new->subscriptionID = subscriptionID;
-    new->lastPublished  = 0;
-    new->sequenceNumber = 1;
-    memset(&new->timedUpdateJobGuid, 0, sizeof(UA_Guid));
-    new->timedUpdateIsRegistered = false;
-    LIST_INIT(&new->MonitoredItems);
-    LIST_INIT(&new->unpublishedNotifications);
-    new->unpublishedNotificationsSize = 0;
+UA_MonitoredItem * UA_MonitoredItem_new() {
+    UA_MonitoredItem *new = UA_malloc(sizeof(UA_MonitoredItem));
+    new->subscription = NULL;
+    new->currentQueueSize = 0;
+    new->maxQueueSize = 0;
+    new->monitoredItemType = MONITOREDITEM_TYPE_CHANGENOTIFY; // TODO: This is currently hardcoded;
+    TAILQ_INIT(&new->queue);
+    UA_NodeId_init(&new->monitoredNodeId);
+    new->lastSampledValue = UA_BYTESTRING_NULL;
+    memset(&new->sampleJobGuid, 0, sizeof(UA_Guid));
+    new->sampleJobIsRegistered = false;
     return new;
 }
 
-void UA_Subscription_deleteMembers(UA_Subscription *subscription, UA_Server *server) {
-    // Just in case any parallel process attempts to access this subscription
-    // while we are deleting it... make it vanish.
-    subscription->subscriptionID = 0;
-    
-    // Delete monitored Items
-    UA_MonitoredItem *mon, *tmp_mon;
-    LIST_FOREACH_SAFE(mon, &subscription->MonitoredItems, listEntry, tmp_mon) {
-        LIST_REMOVE(mon, listEntry);
-        MonitoredItem_delete(mon);
-    }
-    
-    // Delete unpublished Notifications
-    Subscription_deleteUnpublishedNotification(0, true, subscription);
-    
-    // Unhook/Unregister any timed work assiociated with this subscription
-    if(subscription->timedUpdateIsRegistered) {
-        Subscription_unregisterUpdateJob(server, subscription);
-        subscription->timedUpdateIsRegistered = false;
+void MonitoredItem_delete(UA_Server *server, UA_MonitoredItem *monitoredItem) {
+    MonitoredItem_unregisterSampleJob(server, monitoredItem);
+    /* clear the queued samples */
+    MonitoredItem_queuedValue *val, *val_tmp;
+    TAILQ_FOREACH_SAFE(val, &monitoredItem->queue, listEntry, val_tmp) {
+        TAILQ_REMOVE(&monitoredItem->queue, val, listEntry);
+        UA_DataValue_deleteMembers(&val->value);
+        UA_free(val);
     }
+    monitoredItem->currentQueueSize = 0;
+    LIST_REMOVE(monitoredItem, listEntry);
+    UA_ByteString_deleteMembers(&monitoredItem->lastSampledValue);
+    UA_NodeId_deleteMembers(&monitoredItem->monitoredNodeId);
+    UA_free(monitoredItem);
 }
 
-void Subscription_generateKeepAlive(UA_Subscription *subscription) {
-    if(subscription->keepAliveCount.current > subscription->keepAliveCount.min &&
-       subscription->keepAliveCount.current <= subscription->keepAliveCount.max)
+static void SampleCallback(UA_Server *server, UA_MonitoredItem *monitoredItem) {
+    if(monitoredItem->monitoredItemType != MONITOREDITEM_TYPE_CHANGENOTIFY) {
+        UA_LOG_DEBUG(server->config.logger, UA_LOGCATEGORY_SERVER,
+                     "Cannot process a monitoreditem that is not a data change notification");
         return;
+    }
+
+    UA_LOG_DEBUG(server->config.logger, UA_LOGCATEGORY_SERVER,
+                 "Sampling the value on monitoreditem %u", monitoredItem->itemId);
 
-    UA_unpublishedNotification *msg = UA_calloc(1,sizeof(UA_unpublishedNotification));
-    if(!msg)
+    MonitoredItem_queuedValue *newvalue = UA_malloc(sizeof(MonitoredItem_queuedValue));
+    if(!newvalue)
         return;
-    msg->notification.notificationData = NULL;
-    // KeepAlive uses next message, but does not increment counter
-    msg->notification.sequenceNumber = subscription->sequenceNumber + 1;
-    msg->notification.publishTime    = UA_DateTime_now();
-    msg->notification.notificationDataSize = 0;
-    LIST_INSERT_HEAD(&subscription->unpublishedNotifications, msg, listEntry);
-    subscription->unpublishedNotificationsSize += 1;
-    subscription->keepAliveCount.current = subscription->keepAliveCount.max;
-}
+    UA_DataValue_init(&newvalue->value);
+    newvalue->clientHandle = monitoredItem->clientHandle;
+    UA_LOG_INFO(server->config.logger, UA_LOGCATEGORY_SERVER,
+                "Creating a sample with client handle %u", newvalue->clientHandle);
+  
+    /* Read the value */
+    UA_ReadValueId rvid;
+    UA_ReadValueId_init(&rvid);
+    rvid.nodeId = monitoredItem->monitoredNodeId;
+    rvid.attributeId = monitoredItem->attributeID;
+    UA_Subscription *sub = monitoredItem->subscription;
+    Service_Read_single(server, sub->session, monitoredItem->timestampsToReturn, &rvid, &newvalue->value);
 
-void Subscription_updateNotifications(UA_Subscription *subscription) {
-    UA_MonitoredItem *mon;
-    //MonitoredItem_queuedValue *queuedValue;
-    UA_unpublishedNotification *msg;
-    UA_UInt32 monItemsChangeT = 0, monItemsStatusT = 0, monItemsEventT = 0;
-    
-    if(!subscription || subscription->lastPublished +
-       (UA_UInt32)(subscription->publishingInterval * UA_MSEC_TO_DATETIME) > UA_DateTime_now())
+    /* encode to see if the data has changed */
+    size_t binsize = UA_calcSizeBinary(&newvalue->value.value, &UA_TYPES[UA_TYPES_VARIANT]);
+    UA_ByteString newValueAsByteString;
+    UA_StatusCode retval = UA_ByteString_allocBuffer(&newValueAsByteString, binsize);
+    if(retval != UA_STATUSCODE_GOOD) {
+        UA_DataValue_deleteMembers(&newvalue->value);
+        UA_free(newvalue);
         return;
-    
-    // Make sure there is data to be published and establish which message types
-    // will need to be generated
-    LIST_FOREACH(mon, &subscription->MonitoredItems, listEntry) {
-        // Check if this MonitoredItems Queue holds data and how much data is held in total
-        if(!TAILQ_FIRST(&mon->queue))
-            continue;
-        if((mon->monitoredItemType & MONITOREDITEM_TYPE_CHANGENOTIFY) != 0)
-            monItemsChangeT+=mon->queueSize.current;
-	    else if((mon->monitoredItemType & MONITOREDITEM_TYPE_STATUSNOTIFY) != 0)
-            monItemsStatusT+=mon->queueSize.current;
-	    else if((mon->monitoredItemType & MONITOREDITEM_TYPE_EVENTNOTIFY)  != 0)
-            monItemsEventT+=mon->queueSize.current;
-    }
-    
-    // FIXME: This is hardcoded to 100 because it is not covered by the spec but we need to protect the server!
-    if(subscription->unpublishedNotificationsSize >= 10) {
-        // Remove last entry
-        Subscription_deleteUnpublishedNotification(0, true, subscription);
     }
-    
-    if(monItemsChangeT == 0 && monItemsEventT == 0 && monItemsStatusT == 0) {
-        // Decrement KeepAlive
-        subscription->keepAliveCount.current--;
-        // +- Generate KeepAlive msg if counter overruns
-        if (subscription->keepAliveCount.current < subscription->keepAliveCount.min)
-          Subscription_generateKeepAlive(subscription);
-        
+    size_t encodingOffset = 0;
+    retval = UA_encodeBinary(&newvalue->value.value, &UA_TYPES[UA_TYPES_VARIANT],
+                             &newValueAsByteString, &encodingOffset);
+
+    /* error or the content has not changed */
+    if(retval != UA_STATUSCODE_GOOD ||
+       (monitoredItem->lastSampledValue.data &&
+        UA_String_equal(&newValueAsByteString, &monitoredItem->lastSampledValue))) {
+        UA_ByteString_deleteMembers(&newValueAsByteString);
+        UA_DataValue_deleteMembers(&newvalue->value);
+        UA_free(newvalue);
+        UA_LOG_DEBUG(server->config.logger, UA_LOGCATEGORY_SERVER,
+                     "Do not sample since the data value has not changed");
         return;
     }
-    
-    msg = UA_calloc(1, sizeof(UA_unpublishedNotification));
-    msg->notification.sequenceNumber = subscription->sequenceNumber++;
-    msg->notification.publishTime = UA_DateTime_now();
-    
-    // NotificationData is an array of Change, Status and Event messages, each containing the appropriate
-    // list of Queued values from all monitoredItems of that type
-    msg->notification.notificationDataSize = !!monItemsChangeT; // 1 if the pointer is not null, else 0
-    // + ISNOTZERO(monItemsEventT) + ISNOTZERO(monItemsStatusT);
-    msg->notification.notificationData =
-        UA_Array_new(msg->notification.notificationDataSize, &UA_TYPES[UA_TYPES_EXTENSIONOBJECT]);
-    
-    for(size_t notmsgn = 0; notmsgn < msg->notification.notificationDataSize; notmsgn++) {
-        // Set the notification message type and encoding for each of 
-        //   the three possible NotificationData Types
-
-        /* msg->notification->notificationData[notmsgn].encoding = 1; // Encoding is always binary */
-        /* msg->notification->notificationData[notmsgn].typeId = UA_NODEID_NUMERIC(0, 811); */
-      
-        if(notmsgn == 0) {
-            UA_DataChangeNotification *changeNotification = UA_DataChangeNotification_new();
-            changeNotification->monitoredItems = UA_Array_new(monItemsChangeT, &UA_TYPES[UA_TYPES_MONITOREDITEMNOTIFICATION]);
-	
-            // Scan all monitoredItems in this subscription and have their queue transformed into an Array of
-            // the propper NotificationMessageType (Status, Change, Event)
-            monItemsChangeT = 0;
-            LIST_FOREACH(mon, &subscription->MonitoredItems, listEntry) {
-                if(mon->monitoredItemType != MONITOREDITEM_TYPE_CHANGENOTIFY || !TAILQ_FIRST(&mon->queue))
-                    continue;
-                // Note: Monitored Items might not return a queuedValue if there is a problem encoding it.
-                monItemsChangeT += MonitoredItem_QueueToDataChangeNotifications(&changeNotification->monitoredItems[monItemsChangeT], mon);
-                MonitoredItem_ClearQueue(mon);
-            }
-            changeNotification->monitoredItemsSize = monItemsChangeT;
-            msg->notification.notificationData[notmsgn].encoding = UA_EXTENSIONOBJECT_DECODED;
-            msg->notification.notificationData[notmsgn].content.decoded.type = &UA_TYPES[UA_TYPES_DATACHANGENOTIFICATION];
-            msg->notification.notificationData[notmsgn].content.decoded.data = changeNotification;
-        } else if(notmsgn == 1) {
-            // FIXME: Constructing a StatusChangeNotification is not implemented
-        } else if(notmsgn == 2) {
-            // FIXME: Constructing a EventListNotification is not implemented
+  
+    /* do we have space? */
+    if(monitoredItem->currentQueueSize >= monitoredItem->maxQueueSize) {
+        if(!monitoredItem->discardOldest) {
+            // We cannot remove the oldest value and theres no queue space left. We're done here.
+            UA_ByteString_deleteMembers(&newValueAsByteString);
+            UA_DataValue_deleteMembers(&newvalue->value);
+            UA_free(newvalue);
+            return;
         }
+        MonitoredItem_queuedValue *queueItem = TAILQ_LAST(&monitoredItem->queue, QueueOfQueueDataValues);
+        TAILQ_REMOVE(&monitoredItem->queue, queueItem, listEntry);
+        UA_DataValue_deleteMembers(&queueItem->value);
+        UA_free(queueItem);
+        monitoredItem->currentQueueSize--;
     }
-    LIST_INSERT_HEAD(&subscription->unpublishedNotifications, msg, listEntry);
-    subscription->unpublishedNotificationsSize += 1;
-}
-
-UA_UInt32 *Subscription_getAvailableSequenceNumbers(UA_Subscription *sub) {
-    UA_UInt32 *seqArray = UA_malloc(sizeof(UA_UInt32) * sub->unpublishedNotificationsSize);
-    if(!seqArray)
-        return NULL;
   
-    int i = 0;
-    UA_unpublishedNotification *not;
-    LIST_FOREACH(not, &sub->unpublishedNotifications, listEntry) {
-        seqArray[i] = not->notification.sequenceNumber;
-        i++;
-    }
-    return seqArray;
-}
-
-void Subscription_copyNotificationMessage(UA_NotificationMessage *dst, UA_unpublishedNotification *src) {
-    if(!dst)
-        return;
-    
-    UA_NotificationMessage *latest = &src->notification;
-    dst->notificationDataSize = latest->notificationDataSize;
-    dst->publishTime = latest->publishTime;
-    dst->sequenceNumber = latest->sequenceNumber;
-    
-    if(latest->notificationDataSize == 0)
-        return;
-
-    dst->notificationData = UA_ExtensionObject_new();
-    UA_ExtensionObject_copy(latest->notificationData, dst->notificationData);
-}
-
-UA_UInt32 Subscription_deleteUnpublishedNotification(UA_UInt32 seqNo, UA_Boolean bDeleteAll, UA_Subscription *sub) {
-    UA_UInt32 deletedItems = 0;
-    UA_unpublishedNotification *not, *tmp;
-    LIST_FOREACH_SAFE(not, &sub->unpublishedNotifications, listEntry, tmp) {
-        if(!bDeleteAll && not->notification.sequenceNumber != seqNo)
-            continue;
-        LIST_REMOVE(not, listEntry);
-        sub->unpublishedNotificationsSize -= 1;
-        UA_NotificationMessage_deleteMembers(&not->notification);
-        UA_free(not);
-        deletedItems++;
-    }
-    return deletedItems;
-}
-
-
-static void Subscription_timedUpdateNotificationsJob(UA_Server *server, void *data) {
-    // Timed-Worker/Job Version of updateNotifications
-    UA_Subscription *sub = (UA_Subscription *) data;
-    UA_MonitoredItem *mon;
-    
-    if(!data || !server)
-        return;
-    
-    // This is set by the Subscription_delete function to detere us from fiddling with
-    // this subscription if it is being deleted (not technically thread save, but better
-    // then nothing at all)
-    if(sub->subscriptionID == 0)
-        return;
-    
-    // FIXME: This should be done by the event system
-    LIST_FOREACH(mon, &sub->MonitoredItems, listEntry)
-        MonitoredItem_QueuePushDataValue(server, mon);
-    
-    Subscription_updateNotifications(sub);
+    /* add the sample */
+    UA_ByteString_deleteMembers(&monitoredItem->lastSampledValue);
+    monitoredItem->lastSampledValue = newValueAsByteString;
+    TAILQ_INSERT_TAIL(&monitoredItem->queue, newvalue, listEntry);
+    monitoredItem->currentQueueSize++;
 }
 
-UA_StatusCode Subscription_registerUpdateJob(UA_Server *server, UA_Subscription *sub) {
-    if(sub->publishingInterval <= 5 ) 
-        return UA_STATUSCODE_BADNOTSUPPORTED;
-
-    UA_Job job = (UA_Job) {.type = UA_JOBTYPE_METHODCALL,
-                           .job.methodCall = {.method = Subscription_timedUpdateNotificationsJob,
-                                              .data = sub} };
-    
-    /* Practically enough, the client sends a uint32 in ms, which we store as
-       datetime, which here is required in as uint32 in ms as the interval */
-    UA_StatusCode retval = UA_Server_addRepeatedJob(server, job,
-                                                    (UA_UInt32)sub->publishingInterval,
-                                                    &sub->timedUpdateJobGuid);
+UA_StatusCode MonitoredItem_registerSampleJob(UA_Server *server, UA_MonitoredItem *mon) {
+    UA_Job job = {.type = UA_JOBTYPE_METHODCALL,
+                  .job.methodCall = {.method = (UA_ServerCallback)SampleCallback, .data = mon} };
+    UA_StatusCode retval = UA_Server_addRepeatedJob(server, job, (UA_UInt32)mon->samplingInterval,
+                                                    &mon->sampleJobGuid);
     if(retval == UA_STATUSCODE_GOOD)
-        sub->timedUpdateIsRegistered = true;
+        mon->sampleJobIsRegistered = true;
     return retval;
 }
 
-UA_StatusCode Subscription_unregisterUpdateJob(UA_Server *server, UA_Subscription *sub) {
-    sub->timedUpdateIsRegistered = false;
-    return UA_Server_removeRepeatedJob(server, sub->timedUpdateJobGuid);
+UA_StatusCode MonitoredItem_unregisterSampleJob(UA_Server *server, UA_MonitoredItem *mon) {
+    if(!mon->sampleJobIsRegistered)
+        return UA_STATUSCODE_GOOD;
+    mon->sampleJobIsRegistered = false;
+    return UA_Server_removeRepeatedJob(server, mon->sampleJobGuid);
 }
 
-/*****************/
-/* MonitoredItem */
-/*****************/
+/****************/
+/* Subscription */
+/****************/
 
-UA_MonitoredItem * UA_MonitoredItem_new() {
-    UA_MonitoredItem *new = (UA_MonitoredItem *) UA_malloc(sizeof(UA_MonitoredItem));
-    new->queueSize   = (UA_BoundedUInt32) { .min = 0, .max = 0, .current = 0};
-    new->lastSampled = 0;
-    // FIXME: This is currently hardcoded;
-    new->monitoredItemType = MONITOREDITEM_TYPE_CHANGENOTIFY;
-    TAILQ_INIT(&new->queue);
-    UA_NodeId_init(&new->monitoredNodeId);
-    new->lastSampledValue.data = 0;
+UA_Subscription * UA_Subscription_new(UA_Session *session, UA_UInt32 subscriptionID) {
+    UA_Subscription *new = UA_malloc(sizeof(UA_Subscription));
+    if(!new)
+        return NULL;
+    new->session = session;
+    new->subscriptionID = subscriptionID;
+    new->sequenceNumber = 1;
+    new->currentKeepAliveCount = 0;
+    new->maxKeepAliveCount = 0;
+    memset(&new->publishJobGuid, 0, sizeof(UA_Guid));
+    new->publishJobIsRegistered = false;
+    LIST_INIT(&new->retransmissionQueue);
+    LIST_INIT(&new->MonitoredItems);
     return new;
 }
 
-void MonitoredItem_delete(UA_MonitoredItem *monitoredItem) {
-    // Delete Queued Data
-    MonitoredItem_ClearQueue(monitoredItem);
-    // Remove from subscription list
-    LIST_REMOVE(monitoredItem, listEntry);
-    // Release comparison sample
-    if(monitoredItem->lastSampledValue.data != NULL) { 
-      UA_free(monitoredItem->lastSampledValue.data);
-    }
-    
-    UA_NodeId_deleteMembers(&(monitoredItem->monitoredNodeId));
-    UA_free(monitoredItem);
-}
+void UA_Subscription_deleteMembers(UA_Subscription *subscription, UA_Server *server) {
+    Subscription_unregisterPublishJob(server, subscription);
 
-UA_UInt32 MonitoredItem_QueueToDataChangeNotifications(UA_MonitoredItemNotification *dst,
-                                                 UA_MonitoredItem *monitoredItem) {
-    UA_UInt32 queueSize = 0;
-    MonitoredItem_queuedValue *queueItem;
-  
-    // Count instead of relying on the items current
-    TAILQ_FOREACH(queueItem, &monitoredItem->queue, listEntry) {
-        dst[queueSize].clientHandle = monitoredItem->clientHandle;
-        UA_DataValue_copy(&queueItem->value, &dst[queueSize].value);
+    /* Delete monitored Items */
+    UA_MonitoredItem *mon, *tmp_mon;
+    LIST_FOREACH_SAFE(mon, &subscription->MonitoredItems, listEntry, tmp_mon) {
+        LIST_REMOVE(mon, listEntry);
+        MonitoredItem_delete(server, mon);
+    }
 
-        dst[queueSize].value.hasServerPicoseconds = false;
-        dst[queueSize].value.hasServerTimestamp   = true;
-        dst[queueSize].value.serverTimestamp      = UA_DateTime_now();
-    
-        // Do not create variants with no type -> will make calcSizeBinary() segfault.
-        if(dst[queueSize].value.value.type)
-            queueSize++;
+    /* Delete Retransmission Queue */
+    UA_NotificationMessageEntry *nme, *nme_tmp;
+    LIST_FOREACH_SAFE(nme, &subscription->retransmissionQueue, listEntry, nme_tmp) {
+        LIST_REMOVE(nme, listEntry);
+        UA_NotificationMessage_deleteMembers(&nme->message);
+        UA_free(nme);
     }
-    return queueSize;
 }
 
-void MonitoredItem_ClearQueue(UA_MonitoredItem *monitoredItem) {
-    MonitoredItem_queuedValue *val, *val_tmp;
-    TAILQ_FOREACH_SAFE(val, &monitoredItem->queue, listEntry, val_tmp) {
-        TAILQ_REMOVE(&monitoredItem->queue, val, listEntry);
-        UA_DataValue_deleteMembers(&val->value);
-        UA_free(val);
+UA_MonitoredItem *
+UA_Subscription_getMonitoredItem(UA_Subscription *sub, UA_UInt32 monitoredItemID) {
+    UA_MonitoredItem *mon;
+    LIST_FOREACH(mon, &sub->MonitoredItems, listEntry) {
+        if(mon->itemId == monitoredItemID)
+            break;
     }
-    monitoredItem->queueSize.current = 0;
+    return mon;
 }
 
-UA_Boolean MonitoredItem_CopyMonitoredValueToVariant(UA_UInt32 attributeID, const UA_Node *src,
-                                                     UA_DataValue *dst) {
-    UA_Boolean samplingError = true; 
-    UA_DataValue sourceDataValue;
-    UA_DataValue_init(&sourceDataValue);
-  
-    // FIXME: Not all attributeIDs can be monitored yet
-    switch(attributeID) {
-    case UA_ATTRIBUTEID_NODEID:
-        UA_Variant_setScalarCopy(&dst->value, (const UA_NodeId*)&src->nodeId, &UA_TYPES[UA_TYPES_NODEID]);
-        dst->hasValue = true;
-        samplingError = false;
-        break;
-    case UA_ATTRIBUTEID_NODECLASS:
-        UA_Variant_setScalarCopy(&dst->value, (const UA_Int32*)&src->nodeClass, &UA_TYPES[UA_TYPES_INT32]);
-        dst->hasValue = true;
-        samplingError = false;
-        break;
-    case UA_ATTRIBUTEID_BROWSENAME:
-        UA_Variant_setScalarCopy(&dst->value, (const UA_String*)&src->browseName, &UA_TYPES[UA_TYPES_QUALIFIEDNAME]);
-        dst->hasValue = true;
-        samplingError = false;
-        break;
-    case UA_ATTRIBUTEID_DISPLAYNAME:
-        UA_Variant_setScalarCopy(&dst->value, (const UA_String*)&src->displayName, &UA_TYPES[UA_TYPES_LOCALIZEDTEXT]);
-        dst->hasValue = true;
-        samplingError = false;
-        break;
-    case UA_ATTRIBUTEID_DESCRIPTION:
-        UA_Variant_setScalarCopy(&dst->value, (const UA_String*)&src->displayName, &UA_TYPES[UA_TYPES_LOCALIZEDTEXT]);
-        dst->hasValue = true;
-        samplingError = false;
-        break;
-    case UA_ATTRIBUTEID_WRITEMASK:
-        UA_Variant_setScalarCopy(&dst->value, (const UA_String*)&src->writeMask, &UA_TYPES[UA_TYPES_UINT32]);
-        dst->hasValue = true;
-        samplingError = false;
-        break;
-    case UA_ATTRIBUTEID_USERWRITEMASK:
-        UA_Variant_setScalarCopy(&dst->value, (const UA_String*)&src->writeMask, &UA_TYPES[UA_TYPES_UINT32]);
-        dst->hasValue = true;
-        samplingError = false;
-        break;
-    case UA_ATTRIBUTEID_ISABSTRACT:
-        break;
-    case UA_ATTRIBUTEID_SYMMETRIC:
-        break;
-    case UA_ATTRIBUTEID_INVERSENAME:
-        break;
-    case UA_ATTRIBUTEID_CONTAINSNOLOOPS:
-        break;
-    case UA_ATTRIBUTEID_EVENTNOTIFIER:
-        break;
-    case UA_ATTRIBUTEID_VALUE: 
-        if(src->nodeClass == UA_NODECLASS_VARIABLE) {
-            const UA_VariableNode *vsrc = (const UA_VariableNode*)src;
-            if(vsrc->valueSource == UA_VALUESOURCE_VARIANT) {
-                if(vsrc->value.variant.callback.onRead)
-                    vsrc->value.variant.callback.onRead(vsrc->value.variant.callback.handle, vsrc->nodeId,
-                                                        &dst->value, NULL);
-                UA_Variant_copy(&vsrc->value.variant.value, &dst->value);
-                dst->hasValue = true;
-                samplingError = false;
-            } else {
-                if(vsrc->valueSource != UA_VALUESOURCE_DATASOURCE || vsrc->value.dataSource.read == NULL)
-                    break;
-                if(vsrc->value.dataSource.read(vsrc->value.dataSource.handle, vsrc->nodeId, true,
-                                               NULL, &sourceDataValue) != UA_STATUSCODE_GOOD)
-                    break;
-                UA_DataValue_copy(&sourceDataValue, dst);
-                UA_DataValue_deleteMembers(&sourceDataValue);
-                samplingError = false;
-            }
+UA_StatusCode
+UA_Subscription_deleteMonitoredItem(UA_Server *server, UA_Subscription *sub,
+                                    UA_UInt32 monitoredItemID) {
+    UA_MonitoredItem *mon;
+    LIST_FOREACH(mon, &sub->MonitoredItems, listEntry) {
+        if(mon->itemId == monitoredItemID) {
+            LIST_REMOVE(mon, listEntry);
+            MonitoredItem_delete(server, mon);
+            return UA_STATUSCODE_GOOD;
         }
-        break;
-    case UA_ATTRIBUTEID_DATATYPE:
-        break;
-    case UA_ATTRIBUTEID_VALUERANK:
-        break;
-    case UA_ATTRIBUTEID_ARRAYDIMENSIONS:
-        break;
-    case UA_ATTRIBUTEID_ACCESSLEVEL:
-        break;
-    case UA_ATTRIBUTEID_USERACCESSLEVEL:
-        break;
-    case UA_ATTRIBUTEID_MINIMUMSAMPLINGINTERVAL:
-        break;
-    case UA_ATTRIBUTEID_HISTORIZING:
-        break;
-    case UA_ATTRIBUTEID_EXECUTABLE:
-        break;
-    case UA_ATTRIBUTEID_USEREXECUTABLE:
-        break;
-    default:
-        break;
     }
-  
-    return samplingError;
+    return UA_STATUSCODE_BADMONITOREDITEMIDINVALID;
 }
 
-void MonitoredItem_QueuePushDataValue(UA_Server *server, UA_MonitoredItem *monitoredItem) {
-    UA_ByteString newValueAsByteString = { .length=0, .data=NULL };
-    size_t encodingOffset = 0;
-  
-    if(!monitoredItem || monitoredItem->lastSampled + monitoredItem->samplingInterval > UA_DateTime_now())
-        return;
-  
-    // FIXME: Actively suppress non change value based monitoring. There should be
-    // another function to handle status and events.
-    if(monitoredItem->monitoredItemType != MONITOREDITEM_TYPE_CHANGENOTIFY)
-        return;
-
-    MonitoredItem_queuedValue *newvalue = UA_malloc(sizeof(MonitoredItem_queuedValue));
-    if(!newvalue)
-        return;
+static void PublishCallback(UA_Server *server, UA_Subscription *sub) {
+    /* Count the available notifications */
+    size_t notifications = 0;
+    UA_MonitoredItem *mon;
+    LIST_FOREACH(mon, &sub->MonitoredItems, listEntry) {
+        MonitoredItem_queuedValue *qv;
+        TAILQ_FOREACH(qv, &mon->queue, listEntry) {
+            if(notifications >= sub->notificationsPerPublish)
+                break;
+            notifications++;
+        }
+    }
 
-    newvalue->listEntry.tqe_next = NULL;
-    newvalue->listEntry.tqe_prev = NULL;
-    UA_DataValue_init(&newvalue->value);
+    /* Continue only if we have data or want to send a keepalive */
+    if(notifications == 0) {
+        sub->currentKeepAliveCount++;
+        if(sub->currentKeepAliveCount < sub->maxKeepAliveCount)
+            return;
+    }
 
-    // Verify that the *Node being monitored is still valid
-    // Looking up the in the nodestore is only necessary if we suspect that it is changed during writes
-    // e.g. in multithreaded applications
-    const UA_Node *target = UA_NodeStore_get(server->nodestore, &monitoredItem->monitoredNodeId);
-    if(!target) {
-        UA_free(newvalue);
+    /* Check if the securechannel is valid */
+    UA_SecureChannel *channel = sub->session->channel;
+    if(!channel)
         return;
-    }
-  
-    UA_Boolean samplingError = MonitoredItem_CopyMonitoredValueToVariant(monitoredItem->attributeID, target,
-                                                                         &newvalue->value);
 
-    if(samplingError != false || !newvalue->value.value.type) {
-        UA_DataValue_deleteMembers(&newvalue->value);
-        UA_free(newvalue);
+    /* Dequeue a response */
+    UA_PublishResponseEntry *pre = SIMPLEQ_FIRST(&sub->session->responseQueue);
+    if(!pre) {
+        UA_LOG_INFO(server->config.logger, UA_LOGCATEGORY_SERVER,
+                    "Cannot send a publish response on subscription %u " \
+                    "since the publish queue is empty on session %u",
+                    sub->subscriptionID, sub->session->authenticationToken.identifier.numeric);
         return;
     }
-  
-    if(monitoredItem->queueSize.current >= monitoredItem->queueSize.max) {
-        if(monitoredItem->discardOldest != true) {
-            // We cannot remove the oldest value and theres no queue space left. We're done here.
-            UA_DataValue_deleteMembers(&newvalue->value);
-            UA_free(newvalue);
-            return;
+    SIMPLEQ_REMOVE_HEAD(&sub->session->responseQueue, listEntry);
+
+    /* Prepare the response */
+    UA_PublishResponse *response = &pre->response;
+    response->responseHeader.timestamp = UA_DateTime_now();
+    response->subscriptionId = sub->subscriptionID;
+    UA_NotificationMessage *message = &response->notificationMessage;
+    message->sequenceNumber = ++(sub->sequenceNumber);
+    message->publishTime = response->responseHeader.timestamp;
+    message->notificationData = UA_ExtensionObject_new();
+    message->notificationDataSize = 1;
+    UA_ExtensionObject *data = message->notificationData;
+    UA_DataChangeNotification *dcn = UA_DataChangeNotification_new();
+    dcn->monitoredItems = UA_Array_new(notifications, &UA_TYPES[UA_TYPES_MONITOREDITEMNOTIFICATION]);
+    dcn->monitoredItemsSize = notifications;
+    size_t l = 0;
+    LIST_FOREACH(mon, &sub->MonitoredItems, listEntry) {
+        MonitoredItem_queuedValue *qv, *qv_tmp;
+        TAILQ_FOREACH_SAFE(qv, &mon->queue, listEntry, qv_tmp) {
+            if(notifications <= l)
+                break;
+            UA_MonitoredItemNotification *min = &dcn->monitoredItems[l];
+            min->clientHandle = qv->clientHandle;
+            min->value = qv->value;
+            TAILQ_REMOVE(&mon->queue, qv, listEntry);
+            UA_free(qv);
+            mon->currentQueueSize--;
+            l++;
         }
-        MonitoredItem_queuedValue *queueItem = TAILQ_LAST(&monitoredItem->queue, QueueOfQueueDataValues);
-        TAILQ_REMOVE(&monitoredItem->queue, queueItem, listEntry);
-        UA_free(queueItem);
-        monitoredItem->queueSize.current--;
     }
-  
-    // encode the data to find if its different to the previous
-    size_t binsize = UA_calcSizeBinary(&newvalue->value, &UA_TYPES[UA_TYPES_DATAVALUE]);
-    UA_StatusCode retval = UA_ByteString_allocBuffer(&newValueAsByteString, binsize);
-    if(retval != UA_STATUSCODE_GOOD) {
-        UA_DataValue_deleteMembers(&newvalue->value);
-        UA_free(newvalue);
-        return;
+    data->encoding = UA_EXTENSIONOBJECT_DECODED;
+    data->content.decoded.data = dcn;
+    data->content.decoded.type = &UA_TYPES[UA_TYPES_DATACHANGENOTIFICATION];
+
+    /* Get the available sequence numbers from the retransmission queue */
+    size_t available = 0;
+    UA_NotificationMessageEntry *nme;
+    LIST_FOREACH(nme, &sub->retransmissionQueue, listEntry)
+        available++;
+    response->availableSequenceNumbers = UA_malloc(available * sizeof(UA_UInt32));
+    response->availableSequenceNumbersSize = available;
+    size_t i = 0;
+    LIST_FOREACH(nme, &sub->retransmissionQueue, listEntry) {
+        response->availableSequenceNumbers[i] = nme->message.sequenceNumber;
+        i++;
     }
     
-    retval = UA_encodeBinary(&newvalue->value, &UA_TYPES[UA_TYPES_DATAVALUE], &newValueAsByteString, &encodingOffset);
-    if(retval != UA_STATUSCODE_GOOD) {
-        UA_ByteString_deleteMembers(&newValueAsByteString);
-        UA_DataValue_deleteMembers(&newvalue->value);
-        UA_free(newvalue);
-        return;
-    }
-  
-    if(!monitoredItem->lastSampledValue.data) { 
-        UA_ByteString_copy(&newValueAsByteString, &monitoredItem->lastSampledValue);
-        TAILQ_INSERT_HEAD(&monitoredItem->queue, newvalue, listEntry);
-        monitoredItem->queueSize.current++;
-        monitoredItem->lastSampled = UA_DateTime_now();
-        UA_free(newValueAsByteString.data);
-    } else {
-        if(UA_String_equal(&newValueAsByteString, &monitoredItem->lastSampledValue) == true) {
-            UA_DataValue_deleteMembers(&newvalue->value);
-            UA_free(newvalue);
-            UA_String_deleteMembers(&newValueAsByteString);
-            return;
-        }
-        UA_ByteString_deleteMembers(&monitoredItem->lastSampledValue);
-        monitoredItem->lastSampledValue = newValueAsByteString;
-        TAILQ_INSERT_HEAD(&monitoredItem->queue, newvalue, listEntry);
-        monitoredItem->queueSize.current++;
-        monitoredItem->lastSampled = UA_DateTime_now();
-    }
+    /* send out the response */
+    UA_SecureChannel_sendBinaryMessage(channel, pre->requestId, response,
+                                       &UA_TYPES[UA_TYPES_PUBLISHRESPONSE]);
+    UA_LOG_DEBUG(server->config.logger, UA_LOGCATEGORY_SERVER,
+                 "Sending out a publish response on subscription %u on securechannel %u " \
+                 "with %u notifications", sub->subscriptionID,
+                 sub->session->authenticationToken.identifier.numeric, (UA_UInt32)notifications);
+
+    /* Reset the keepalive count */
+    sub->currentKeepAliveCount = 0;
+
+    /* Put the notification message into the retransmission queue and delete the response */
+    UA_NotificationMessageEntry *retransmission = malloc(sizeof(UA_NotificationMessageEntry));
+    retransmission->message = response->notificationMessage;
+    UA_NotificationMessage_init(&response->notificationMessage);
+    LIST_INSERT_HEAD(&sub->retransmissionQueue, retransmission, listEntry);
+    UA_PublishResponse_deleteMembers(response);
+    UA_free(pre);
+}
+
+UA_StatusCode Subscription_registerPublishJob(UA_Server *server, UA_Subscription *sub) {
+    UA_Job job = (UA_Job) {.type = UA_JOBTYPE_METHODCALL,
+                           .job.methodCall = {.method = (UA_ServerCallback)PublishCallback, .data = sub} };
+    UA_LOG_DEBUG(server->config.logger, UA_LOGCATEGORY_SERVER,
+                 "Adding a subscription with %i millisec interval", (int)sub->publishingInterval);
+    UA_StatusCode retval = UA_Server_addRepeatedJob(server, job,
+                                                    (UA_UInt32)sub->publishingInterval,
+                                                    &sub->publishJobGuid);
+    if(retval == UA_STATUSCODE_GOOD)
+        sub->publishJobIsRegistered = true;
+    else
+        UA_LOG_DEBUG(server->config.logger, UA_LOGCATEGORY_SERVER,
+                     "Could not register a subscription publication job with status code 0x%08x\n",
+                     retval);
+    return retval;
+}
+
+UA_StatusCode Subscription_unregisterPublishJob(UA_Server *server, UA_Subscription *sub) {
+    if(!sub->publishJobIsRegistered)
+        return UA_STATUSCODE_GOOD;
+    sub->publishJobIsRegistered = false;
+    UA_StatusCode retval = UA_Server_removeRepeatedJob(server, sub->publishJobGuid);
+    if(retval != UA_STATUSCODE_GOOD)
+        UA_LOG_DEBUG(server->config.logger, UA_LOGCATEGORY_SERVER,
+                     "Could not remove a subscription publication job with status code 0x%08x\n",
+                     retval);
+    return retval;
 }

+ 50 - 37
src/server/ua_subscription.h

@@ -18,76 +18,89 @@ typedef enum {
 } UA_MONITOREDITEM_TYPE;
 
 typedef struct MonitoredItem_queuedValue {
-    UA_DataValue value;
     TAILQ_ENTRY(MonitoredItem_queuedValue) listEntry;
+    UA_UInt32 clientHandle;
+    UA_DataValue value;
 } MonitoredItem_queuedValue;
 
 typedef struct UA_MonitoredItem {
     LIST_ENTRY(UA_MonitoredItem) listEntry;
+
+    /* Settings */
+    UA_Subscription *subscription;
     UA_UInt32 itemId;
     UA_MONITOREDITEM_TYPE monitoredItemType;
-    UA_UInt32 timestampsToReturn;
-    UA_UInt32 monitoringMode;
+    UA_TimestampsToReturn timestampsToReturn;
+    UA_MonitoringMode monitoringMode;
     UA_NodeId monitoredNodeId; 
     UA_UInt32 attributeID;
     UA_UInt32 clientHandle;
-    UA_UInt32 samplingInterval; // [ms]
-    UA_BoundedUInt32 queueSize;
+    UA_Double samplingInterval; // [ms]
+    UA_UInt32 currentQueueSize;
+    UA_UInt32 maxQueueSize;
     UA_Boolean discardOldest;
-    UA_DateTime lastSampled;
+    // TODO: indexRange is ignored; array values default to element 0
+    // TODO: dataEncoding is hardcoded to UA binary
+
+    /* Sample Job */
+    UA_Guid sampleJobGuid;
+    UA_Boolean sampleJobIsRegistered;
+
+    /* Sample Queue */
     UA_ByteString lastSampledValue;
-    // FIXME: indexRange is ignored; array values default to element 0
-    // FIXME: dataEncoding is hardcoded to UA binary
     TAILQ_HEAD(QueueOfQueueDataValues, MonitoredItem_queuedValue) queue;
 } UA_MonitoredItem;
 
 UA_MonitoredItem *UA_MonitoredItem_new(void);
-void MonitoredItem_delete(UA_MonitoredItem *monitoredItem);
-void MonitoredItem_QueuePushDataValue(UA_Server *server, UA_MonitoredItem *monitoredItem);
-void MonitoredItem_ClearQueue(UA_MonitoredItem *monitoredItem);
-UA_Boolean MonitoredItem_CopyMonitoredValueToVariant(UA_UInt32 attributeID, const UA_Node *src,
-                                                     UA_DataValue *dst);
-UA_UInt32 MonitoredItem_QueueToDataChangeNotifications(UA_MonitoredItemNotification *dst,
-                                                       UA_MonitoredItem *monitoredItem);
+void MonitoredItem_delete(UA_Server *server, UA_MonitoredItem *monitoredItem);
+UA_StatusCode MonitoredItem_registerSampleJob(UA_Server *server, UA_MonitoredItem *mon);
+UA_StatusCode MonitoredItem_unregisterSampleJob(UA_Server *server, UA_MonitoredItem *mon);
 
 /****************/
 /* Subscription */
 /****************/
 
-typedef struct UA_unpublishedNotification {
-    UA_Boolean publishedOnce;
-    LIST_ENTRY(UA_unpublishedNotification) listEntry;
-    UA_NotificationMessage notification;
-} UA_unpublishedNotification;
+typedef struct UA_NotificationMessageEntry {
+    LIST_ENTRY(UA_NotificationMessageEntry) listEntry;
+    UA_NotificationMessage message;
+} UA_NotificationMessageEntry;
 
 struct UA_Subscription {
     LIST_ENTRY(UA_Subscription) listEntry;
-    UA_BoundedUInt32 lifeTime;
-    UA_BoundedUInt32 keepAliveCount;
+
+    /* Settings */
+    UA_Session *session;
+    UA_UInt32 lifeTime;
+    UA_UInt32 maxKeepAliveCount;
     UA_Double publishingInterval;     // [ms] 
-    UA_DateTime lastPublished;
     UA_UInt32 subscriptionID;
     UA_UInt32 notificationsPerPublish;
     UA_Boolean publishingMode;
     UA_UInt32 priority;
     UA_UInt32 sequenceNumber;
-    UA_Guid timedUpdateJobGuid;
-    UA_Boolean timedUpdateIsRegistered;
-    LIST_HEAD(UA_ListOfUnpublishedNotifications, UA_unpublishedNotification) unpublishedNotifications;
-    size_t unpublishedNotificationsSize;
+
+    /* Runtime information */
+    UA_UInt32 currentKeepAliveCount;
+
+    /* Publish Job */
+    UA_Guid publishJobGuid;
+    UA_Boolean publishJobIsRegistered;
+
     LIST_HEAD(UA_ListOfUAMonitoredItems, UA_MonitoredItem) MonitoredItems;
+    LIST_HEAD(UA_ListOfNotificationMessages, UA_NotificationMessageEntry) retransmissionQueue;
 };
 
-UA_Subscription *UA_Subscription_new(UA_UInt32 subscriptionID);
+UA_Subscription *UA_Subscription_new(UA_Session *session, UA_UInt32 subscriptionID);
 void UA_Subscription_deleteMembers(UA_Subscription *subscription, UA_Server *server);
-void Subscription_updateNotifications(UA_Subscription *subscription);
-UA_UInt32 *Subscription_getAvailableSequenceNumbers(UA_Subscription *sub);
-void Subscription_generateKeepAlive(UA_Subscription *subscription);
-void Subscription_copyTopNotificationMessage(UA_NotificationMessage *dst, UA_Subscription *sub);
-UA_UInt32 Subscription_deleteUnpublishedNotification(UA_UInt32 seqNo, UA_Boolean bDeleteAll, UA_Subscription *sub);
-void Subscription_copyNotificationMessage(UA_NotificationMessage *dst, UA_unpublishedNotification *src);
-UA_StatusCode Subscription_createdUpdateJob(UA_Server *server, UA_Guid jobId, UA_Subscription *sub);
-UA_StatusCode Subscription_registerUpdateJob(UA_Server *server, UA_Subscription *sub);
-UA_StatusCode Subscription_unregisterUpdateJob(UA_Server *server, UA_Subscription *sub);
+UA_StatusCode Subscription_registerPublishJob(UA_Server *server, UA_Subscription *sub);
+UA_StatusCode Subscription_unregisterPublishJob(UA_Server *server, UA_Subscription *sub);
+
+UA_StatusCode
+UA_Subscription_deleteMonitoredItem(UA_Server *server, UA_Subscription *sub,
+                                    UA_UInt32 monitoredItemID);
+
+UA_MonitoredItem *
+UA_Subscription_getMonitoredItem(UA_Subscription *sub, UA_UInt32 monitoredItemID);
+
 
 #endif /* UA_SUBSCRIPTION_H_ */

+ 9 - 21
src/ua_session.c

@@ -29,12 +29,13 @@ void UA_Session_init(UA_Session *session) {
     session->timeout = 0;
     UA_DateTime_init(&session->validTill);
     session->channel = NULL;
+    session->availableContinuationPoints = MAXCONTINUATIONPOINTS;
+    LIST_INIT(&session->continuationPoints);
 #ifdef UA_ENABLE_SUBSCRIPTIONS
     LIST_INIT(&session->serverSubscriptions);
     session->lastSubscriptionID = UA_UInt32_random();
+    SIMPLEQ_INIT(&session->responseQueue);
 #endif
-    session->availableContinuationPoints = MAXCONTINUATIONPOINTS;
-    LIST_INIT(&session->continuationPoints);
 }
 
 void UA_Session_deleteMembersCleanup(UA_Session *session, UA_Server* server) {
@@ -58,6 +59,12 @@ void UA_Session_deleteMembersCleanup(UA_Session *session, UA_Server* server) {
         UA_Subscription_deleteMembers(currents, server);
         UA_free(currents);
     }
+    UA_PublishResponseEntry *entry;
+    while((entry = SIMPLEQ_FIRST(&session->responseQueue))) {
+        SIMPLEQ_REMOVE_HEAD(&session->responseQueue, listEntry);
+        UA_PublishResponse_deleteMembers(&entry->response);
+        UA_free(entry);
+    }
 #endif
 }
 
@@ -92,25 +99,6 @@ UA_Session_getSubscriptionByID(UA_Session *session, UA_UInt32 subscriptionID) {
     return sub;
 }
 
-
-UA_StatusCode
-UA_Session_deleteMonitoredItem(UA_Session *session, UA_UInt32 subscriptionID,
-                               UA_UInt32 monitoredItemID) {
-    UA_Subscription *sub = UA_Session_getSubscriptionByID(session, subscriptionID);
-    if(!sub)
-        return UA_STATUSCODE_BADSUBSCRIPTIONIDINVALID;
-    
-    UA_MonitoredItem *mon, *tmp_mon;
-    LIST_FOREACH_SAFE(mon, &sub->MonitoredItems, listEntry, tmp_mon) {
-        if(mon->itemId == monitoredItemID) {
-            LIST_REMOVE(mon, listEntry);
-            MonitoredItem_delete(mon);
-            return UA_STATUSCODE_GOOD;
-        }
-    }
-    return UA_STATUSCODE_BADMONITOREDITEMIDINVALID;
-}
-
 UA_UInt32 UA_Session_getUniqueSubscriptionID(UA_Session *session) {
     return ++(session->lastSubscriptionID);
 }

+ 7 - 4
src/ua_session.h

@@ -19,6 +19,12 @@ struct ContinuationPointEntry {
 struct UA_Subscription;
 typedef struct UA_Subscription UA_Subscription;
 
+typedef struct UA_PublishResponseEntry {
+    SIMPLEQ_ENTRY(UA_PublishResponseEntry) listEntry;
+    UA_UInt32 requestId;
+    UA_PublishResponse response;
+} UA_PublishResponseEntry;
+
 struct UA_Session {
     UA_ApplicationDescription clientDescription;
     UA_Boolean        activated;
@@ -35,6 +41,7 @@ struct UA_Session {
 #ifdef UA_ENABLE_SUBSCRIPTIONS
     UA_UInt32 lastSubscriptionID;
     LIST_HEAD(UA_ListOfUASubscriptions, UA_Subscription) serverSubscriptions;
+    SIMPLEQ_HEAD(UA_ListOfQueuedPublishResponses, UA_PublishResponseEntry) responseQueue;
 #endif
 };
 
@@ -54,10 +61,6 @@ void UA_Session_addSubscription(UA_Session *session, UA_Subscription *newSubscri
 UA_Subscription *
 UA_Session_getSubscriptionByID(UA_Session *session, UA_UInt32 subscriptionID);
 
-UA_StatusCode
-UA_Session_deleteMonitoredItem(UA_Session *session, UA_UInt32 subscriptionID,
-                               UA_UInt32 monitoredItemID);
-
 UA_StatusCode
 UA_Session_deleteSubscription(UA_Server *server, UA_Session *session,
                               UA_UInt32 subscriptionID);

+ 4 - 0
tools/schema/datatypes_minimal.txt

@@ -152,3 +152,7 @@ ModifySubscriptionRequest
 ModifySubscriptionResponse
 RepublishRequest
 RepublishResponse
+MonitoredItemModifyRequest
+ModifyMonitoredItemsRequest
+MonitoredItemModifyResult
+ModifyMonitoredItemsResponse