Quellcode durchsuchen

Merge pull request #571 from open62541/refactor_subscriptions

Separate Publish Job for Subscriptions
Julius Pfrommer vor 8 Jahren
Ursprung
Commit
e4ff940e51

+ 6 - 4
CMakeLists.txt

@@ -63,11 +63,11 @@ if(CMAKE_COMPILER_IS_GNUCC OR "x${CMAKE_C_COMPILER_ID}" STREQUAL "xClang")
 
   # Debug
   if(CMAKE_BUILD_TYPE STREQUAL "Debug")
-	#add_definitions(-fsanitize=address)
-    #list(APPEND open62541_LIBRARIES asan)
+	# add_definitions(-fsanitize=address)
+    # list(APPEND open62541_LIBRARIES asan)
 
-	#add_definitions(-fsanitize=undefined)
-    #list(APPEND open62541_LIBRARIES ubsan)
+	# add_definitions(-fsanitize=undefined)
+    # list(APPEND open62541_LIBRARIES ubsan)
 
   elseif(CMAKE_BUILD_TYPE STREQUAL "MinSizeRel" OR
          CMAKE_BUILD_TYPE STREQUAL "Release")
@@ -265,6 +265,7 @@ add_custom_command(OUTPUT ${PROJECT_BINARY_DIR}/src_generated/ua_types_generated
                                                 ${PROJECT_SOURCE_DIR}/tools/schema/Opc.Ua.Types.bsd
                                                 ${PROJECT_BINARY_DIR}/src_generated/ua_types
                    DEPENDS ${CMAKE_CURRENT_SOURCE_DIR}/tools/generate_datatypes.py
+                           ${PROJECT_SOURCE_DIR}/tools/schema/datatypes_minimal.txt
                            ${CMAKE_CURRENT_SOURCE_DIR}/tools/schema/Opc.Ua.Types.bsd
                            ${CMAKE_CURRENT_SOURCE_DIR}/tools/schema/NodeIds.csv)
 
@@ -280,6 +281,7 @@ add_custom_command(OUTPUT ${PROJECT_BINARY_DIR}/src_generated/ua_transport_gener
                                                 ${PROJECT_SOURCE_DIR}/tools/schema/Custom.Opc.Ua.Transport.bsd
                                                 ${PROJECT_BINARY_DIR}/src_generated/ua_transport
                    DEPENDS ${CMAKE_CURRENT_SOURCE_DIR}/tools/generate_datatypes.py
+                           ${PROJECT_SOURCE_DIR}/tools/schema/datatypes_transport.txt
                            ${CMAKE_CURRENT_SOURCE_DIR}/tools/schema/Custom.Opc.Ua.Transport.bsd)
 
 # nodeids

+ 2 - 2
examples/client.c

@@ -107,14 +107,14 @@ int main(int argc, char *argv[]) {
     
 #ifdef UA_ENABLE_SUBSCRIPTIONS
     // Create a subscription with interval 0 (immediate)...
-    UA_UInt32 subId=0;
+    UA_UInt32 subId = 0;
     UA_Client_Subscriptions_new(client, UA_SubscriptionSettings_standard, &subId);
     if(subId)
         printf("Create subscription succeeded, id %u\n", subId);
     
     // .. and monitor TheAnswer
     UA_NodeId monitorThis = UA_NODEID_STRING(1, "the.answer");
-    UA_UInt32 monId=0;
+    UA_UInt32 monId = 0;
     UA_Client_Subscriptions_addMonitoredItem(client, subId, monitorThis,
                                              UA_ATTRIBUTEID_VALUE, &handler_TheAnswerChanged, NULL, &monId);
     if (monId)

+ 6 - 2
examples/server.c

@@ -161,7 +161,6 @@ getMonitoredItems(void *methodHandle, const UA_NodeId objectId,
                   size_t outputSize, UA_Variant *output) {
     UA_String tmp = UA_STRING("Hello World");
     UA_Variant_setScalarCopy(output, &tmp, &UA_TYPES[UA_TYPES_STRING]);
-    printf("getMonitoredItems was called\n");
     return UA_STATUSCODE_GOOD;
 }
 #endif
@@ -225,6 +224,7 @@ int main(int argc, char** argv) {
     UA_VariableAttributes_init(&v_attr);
     v_attr.description = UA_LOCALIZEDTEXT("en_US","current time");
     v_attr.displayName = UA_LOCALIZEDTEXT("en_US","current time");
+	v_attr.accessLevel = UA_ACCESSLEVELMASK_READ | UA_ACCESSLEVELMASK_WRITE;
     const UA_QualifiedName dateName = UA_QUALIFIEDNAME(1, "current time");
     UA_NodeId dataSourceId;
     UA_Server_addDataSourceVariableNode(server, UA_NODEID_NULL,
@@ -246,6 +246,7 @@ int main(int argc, char** argv) {
         UA_VariableAttributes_init(&v_attr);
         v_attr.description = UA_LOCALIZEDTEXT("en_US","temperature");
         v_attr.displayName = UA_LOCALIZEDTEXT("en_US","temperature");
+		v_attr.accessLevel = UA_ACCESSLEVELMASK_READ | UA_ACCESSLEVELMASK_WRITE;
         UA_Server_addDataSourceVariableNode(server, UA_NODEID_NULL,
                                             UA_NODEID_NUMERIC(0, UA_NS0ID_OBJECTSFOLDER),
                                             UA_NODEID_NUMERIC(0, UA_NS0ID_ORGANIZES), tempName,
@@ -271,6 +272,7 @@ int main(int argc, char** argv) {
             UA_VariableAttributes_init(&v_attr);
             v_attr.description = UA_LOCALIZEDTEXT("en_US","status LED");
             v_attr.displayName = UA_LOCALIZEDTEXT("en_US","status LED");
+			v_attr.accessLevel = UA_ACCESSLEVELMASK_READ | UA_ACCESSLEVELMASK_WRITE;
             const UA_QualifiedName statusName = UA_QUALIFIEDNAME(0, "status LED");
             UA_Server_addDataSourceVariableNode(server, UA_NODEID_NULL,
                                                 UA_NODEID_NUMERIC(0, UA_NS0ID_OBJECTSFOLDER),
@@ -287,6 +289,7 @@ int main(int argc, char** argv) {
     UA_VariableAttributes_init(&myVar);
     myVar.description = UA_LOCALIZEDTEXT("en_US", "the answer");
     myVar.displayName = UA_LOCALIZEDTEXT("en_US", "the answer");
+	myVar.accessLevel = UA_ACCESSLEVELMASK_READ | UA_ACCESSLEVELMASK_WRITE;
     UA_Int32 myInteger = 42;
     UA_Variant_setScalarCopy(&myVar.value, &myInteger, &UA_TYPES[UA_TYPES_INT32]);
     const UA_QualifiedName myIntegerName = UA_QUALIFIEDNAME(1, "the answer");
@@ -348,6 +351,7 @@ int main(int argc, char** argv) {
         char name[15];
         sprintf(name, "%02d", type);
         attr.displayName = UA_LOCALIZEDTEXT("en_US",name);
+		attr.accessLevel = UA_ACCESSLEVELMASK_READ | UA_ACCESSLEVELMASK_WRITE;
         UA_QualifiedName qualifiedName = UA_QUALIFIEDNAME(1, name);
 
         /* add a scalar node for every built-in type */
@@ -423,7 +427,7 @@ int main(int argc, char** argv) {
     UA_Server_forEachChildNodeCall(server, UA_NODEID_NUMERIC(0, UA_NS0ID_OBJECTSFOLDER), nodeIter, NULL);
   
     // Some easy localization
-    UA_LocalizedText objectsName = UA_LOCALIZEDTEXT("de_DE", "Objekte");
+    UA_LocalizedText objectsName = UA_LOCALIZEDTEXT("en_US", "Objects");
     UA_Server_writeDisplayName(server, UA_NODEID_NUMERIC(0, UA_NS0ID_OBJECTSFOLDER), objectsName);
   
     //start server

+ 14 - 8
include/ua_server.h

@@ -84,10 +84,14 @@ typedef struct {
 } UA_UsernamePasswordLogin;
 
 typedef struct {
-    UA_UInt32 current;
     UA_UInt32 min;
     UA_UInt32 max;
-} UA_BoundedUInt32;
+} UA_UInt32Range;
+
+typedef struct {
+	UA_Double min;
+	UA_Double max;
+} UA_DoubleRange;
 
 typedef struct {
     UA_UInt16 nThreads; // only if multithreading is enabled
@@ -108,12 +112,14 @@ typedef struct {
     UA_UsernamePasswordLogin* usernamePasswordLogins;
 
     /* Limits for subscription settings */
-    UA_BoundedUInt32 publishingIntervalLimits;
-    UA_BoundedUInt32 lifeTimeCountLimits;
-    UA_BoundedUInt32 keepAliveCountLimits;
-    UA_BoundedUInt32 notificationsPerPublishLimits;
-    UA_BoundedUInt32 samplingIntervalLimits;
-    UA_BoundedUInt32 queueSizeLimits;
+	UA_DoubleRange publishingIntervalLimits;
+	UA_UInt32Range lifeTimeCountLimits;
+	UA_UInt32Range keepAliveCountLimits;
+	UA_UInt32 maxNotificationsPerPublish;
+
+	/* Limits for monitoreditem settings */
+    UA_DoubleRange samplingIntervalLimits;
+	UA_UInt32Range queueSizeLimits;
 } UA_ServerConfig;
 
 /**

+ 8 - 7
plugins/ua_config_standard.c

@@ -44,12 +44,12 @@ const UA_ServerConfig UA_ServerConfig_standard = {
     .usernamePasswordLogins = usernamePasswords,
     .usernamePasswordLoginsSize = 2,
     
-    .publishingIntervalLimits = { .max = 10000, .min = 0, .current = 0 },
-    .lifeTimeCountLimits = { .max = 15000, .min = 0, .current = 0 },
-    .keepAliveCountLimits = { .max = 100, .min = 0, .current = 0 },
-    .notificationsPerPublishLimits = { .max = 1000, .min = 1, .current = 0 },
-    .samplingIntervalLimits = { .max = 1000, .min = 5, .current = 0 },
-    .queueSizeLimits = { .max = 100, .min = 0, .current = 0 }
+	.publishingIntervalLimits = { .min = 100.0, .max = 3600.0 * 1000.0 },
+    .lifeTimeCountLimits = { .max = 15000, .min = 3 },
+    .keepAliveCountLimits = { .max = 100, .min = 1 },
+    .maxNotificationsPerPublish = 1000,
+	.samplingIntervalLimits = { .min = 50.0, .max = 24.0 * 3600.0 * 1000.0 },
+    .queueSizeLimits = { .max = 100, .min = 1 }
 };
 
 const UA_EXPORT UA_ClientConfig UA_ClientConfig_standard = {
@@ -62,4 +62,5 @@ const UA_EXPORT UA_ClientConfig UA_ClientConfig_standard = {
         .recvBufferSize  = 65536,
         .maxMessageSize = 65536,
         .maxChunkCount = 1 },
-    .connectionFunc = UA_ClientConnectionTCP };
+    .connectionFunc = UA_ClientConnectionTCP
+};

+ 81 - 96
src/client/ua_client_highlevel_subscriptions.c

@@ -4,9 +4,9 @@
 #include "ua_types_generated_encoding_binary.h"
 
 const UA_SubscriptionSettings UA_SubscriptionSettings_standard = {
-    .requestedPublishingInterval = 0.0,
-    .requestedLifetimeCount = 100,
-    .requestedMaxKeepAliveCount = 10,
+    .requestedPublishingInterval = 500.0,
+    .requestedLifetimeCount = 10000,
+    .requestedMaxKeepAliveCount = 1,
     .maxNotificationsPerPublish = 10,
     .publishingEnabled = true,
     .priority = 0
@@ -101,6 +101,7 @@ UA_Client_Subscriptions_addMonitoredItem(UA_Client *client, UA_UInt32 subscripti
     if(!sub)
         return UA_STATUSCODE_BADSUBSCRIPTIONIDINVALID;
     
+    /* Send the request */
     UA_CreateMonitoredItemsRequest request;
     UA_CreateMonitoredItemsRequest_init(&request);
     request.subscriptionId = subscriptionId;
@@ -115,35 +116,39 @@ UA_Client_Subscriptions_addMonitoredItem(UA_Client *client, UA_UInt32 subscripti
     item.requestedParameters.queueSize = 1;
     request.itemsToCreate = &item;
     request.itemsToCreateSize = 1;
-    // Filter can be left void for now, only changes are supported (UA_Expert does the same with changeItems)
-    
     UA_CreateMonitoredItemsResponse response = UA_Client_Service_createMonitoredItems(client, request);
     
-    UA_StatusCode retval;
     // slight misuse of retval here to check if the deletion was successfull.
+    UA_StatusCode retval;
     if(response.resultsSize == 0)
         retval = response.responseHeader.serviceResult;
     else
         retval = response.results[0].statusCode;
-    
-    if(retval == UA_STATUSCODE_GOOD) {
-        UA_Client_MonitoredItem *newMon = UA_malloc(sizeof(UA_Client_MonitoredItem));
-        newMon->MonitoringMode = UA_MONITORINGMODE_REPORTING;
-        UA_NodeId_copy(&nodeId, &newMon->monitoredNodeId); 
-        newMon->AttributeID = attributeID;
-        newMon->ClientHandle = client->monitoredItemHandles;
-        newMon->SamplingInterval = sub->PublishingInterval;
-        newMon->QueueSize = 1;
-        newMon->DiscardOldest = true;
-        newMon->handler = handlingFunction;
-        newMon->handlerContext = handlingContext;
-        newMon->MonitoredItemId = response.results[0].monitoredItemId;
-        LIST_INSERT_HEAD(&sub->MonitoredItems, newMon, listEntry);
-        *newMonitoredItemId = newMon->MonitoredItemId;
+    if(retval != UA_STATUSCODE_GOOD) {
+        UA_CreateMonitoredItemsResponse_deleteMembers(&response);
+        return retval;
     }
+
+    /* Create the handler */
+    UA_Client_MonitoredItem *newMon = UA_malloc(sizeof(UA_Client_MonitoredItem));
+    newMon->MonitoringMode = UA_MONITORINGMODE_REPORTING;
+    UA_NodeId_copy(&nodeId, &newMon->monitoredNodeId); 
+    newMon->AttributeID = attributeID;
+    newMon->ClientHandle = client->monitoredItemHandles;
+    newMon->SamplingInterval = sub->PublishingInterval;
+    newMon->QueueSize = 1;
+    newMon->DiscardOldest = true;
+    newMon->handler = handlingFunction;
+    newMon->handlerContext = handlingContext;
+    newMon->MonitoredItemId = response.results[0].monitoredItemId;
+    LIST_INSERT_HEAD(&sub->MonitoredItems, newMon, listEntry);
+    *newMonitoredItemId = newMon->MonitoredItemId;
+
+    UA_LOG_DEBUG(client->config.logger, UA_LOGCATEGORY_CLIENT,
+                 "Created a monitored item with client handle %u", client->monitoredItemHandles);
     
     UA_CreateMonitoredItemsResponse_deleteMembers(&response);
-    return retval;
+    return UA_STATUSCODE_GOOD;
 }
 
 UA_StatusCode
@@ -191,95 +196,77 @@ UA_Client_Subscriptions_removeMonitoredItem(UA_Client *client, UA_UInt32 subscri
     return retval;
 }
 
-static UA_Boolean
-UA_Client_processPublishRx(UA_Client *client, UA_PublishResponse response) {
-    if(response.responseHeader.serviceResult != UA_STATUSCODE_GOOD)
-        return false;
-    
-    // Check if the server has acknowledged any of our ACKS
-    // Note that a list of serverside status codes may be send without valid publish data, i.e. 
-    // during keepalives or no data availability
+static void
+UA_Client_processPublishResponse(UA_Client *client, UA_PublishResponse *response) {
+    if(response->responseHeader.serviceResult != UA_STATUSCODE_GOOD)
+        return;
+
+    /* Find the subscription */
+    UA_Client_Subscription *sub;
+    LIST_FOREACH(sub, &client->subscriptions, listEntry) {
+        if(sub->SubscriptionID == response->subscriptionId)
+            break;
+    }
+    if(!sub)
+        return;
+
+    UA_LOG_DEBUG(client->config.logger, UA_LOGCATEGORY_CLIENT,
+                 "Processing a publish response on subscription %u with %u notifications",
+                 sub->SubscriptionID, response->notificationMessage.notificationDataSize);
+
+    /* Check if the server has acknowledged any of our ACKS */
+    // TODO: The acks should be attached to the subscription
     UA_Client_NotificationsAckNumber *ack, *tmpAck;
     size_t i = 0;
     LIST_FOREACH_SAFE(ack, &client->pendingNotificationsAcks, listEntry, tmpAck) {
-        if(response.results[i] == UA_STATUSCODE_GOOD ||
-           response.results[i] == UA_STATUSCODE_BADSEQUENCENUMBERINVALID) {
+        if(response->results[i] == UA_STATUSCODE_GOOD ||
+           response->results[i] == UA_STATUSCODE_BADSEQUENCENUMBERINVALID) {
             LIST_REMOVE(ack, listEntry);
             UA_free(ack);
         }
         i++;
     }
     
-    if(response.subscriptionId == 0)
-        return false;
-    
-    UA_Client_Subscription *sub;
-    LIST_FOREACH(sub, &client->subscriptions, listEntry) {
-        if(sub->SubscriptionID == response.subscriptionId)
-            break;
-    }
-    if(!sub)
-        return false;
-    
-    UA_NotificationMessage msg = response.notificationMessage;
-    UA_Client_MonitoredItem *mon;
-    for(size_t k = 0; k < msg.notificationDataSize; k++) {
-        if(msg.notificationData[k].encoding != UA_EXTENSIONOBJECT_DECODED)
+    /* Process the notification messages */
+    UA_NotificationMessage *msg = &response->notificationMessage;
+    for(size_t k = 0; k < msg->notificationDataSize; k++) {
+        if(msg->notificationData[k].encoding != UA_EXTENSIONOBJECT_DECODED)
             continue;
         
-        if(msg.notificationData[k].content.decoded.type == &UA_TYPES[UA_TYPES_DATACHANGENOTIFICATION]) {
-            // This is a dataChangeNotification
-            UA_DataChangeNotification *dataChangeNotification = msg.notificationData[k].content.decoded.data;
-            for(size_t j = 0; j < dataChangeNotification->monitoredItemsSize; j++) {
+        /* Currently only dataChangeNotifications are supported */
+        if(msg->notificationData[k].content.decoded.type != &UA_TYPES[UA_TYPES_DATACHANGENOTIFICATION])
+            continue;
+        
+        UA_DataChangeNotification *dataChangeNotification = msg->notificationData[k].content.decoded.data;
+        for(size_t j = 0; j < dataChangeNotification->monitoredItemsSize; j++) {
             UA_MonitoredItemNotification *mitemNot = &dataChangeNotification->monitoredItems[j];
-                // find this client handle
-                LIST_FOREACH(mon, &sub->MonitoredItems, listEntry) {
-                    if(mon->ClientHandle == mitemNot->clientHandle) {
-                        mon->handler(mon->MonitoredItemId, &mitemNot->value, mon->handlerContext);
-                        break;
-                    }
+            UA_Client_MonitoredItem *mon;
+            LIST_FOREACH(mon, &sub->MonitoredItems, listEntry) {
+                if(mon->ClientHandle == mitemNot->clientHandle) {
+                    mon->handler(mon->MonitoredItemId, &mitemNot->value, mon->handlerContext);
+                    break;
                 }
             }
-            continue;
+            if(!mon)
+                UA_LOG_DEBUG(client->config.logger, UA_LOGCATEGORY_CLIENT,
+                             "Could not process a notification with clienthandle %u on subscription %u",
+                             mitemNot->clientHandle, sub->SubscriptionID);
         }
-
-        /* if(msg.notificationData[k].typeId.namespaceIndex == 0 && */
-        /*    msg.notificationData[k].typeId.identifier.numeric == 820 ) { */
-        /*     //FIXME: This is a statusChangeNotification (not supported yet) */
-        /*     continue; */
-        /* } */
-
-        /* if(msg.notificationData[k].typeId.namespaceIndex == 0 && */
-        /*    msg.notificationData[k].typeId.identifier.numeric == 916 ) { */
-        /*     //FIXME: This is an EventNotification */
-        /*     continue; */
-        /* } */
-    }
-    
-    /* We processed this message, add it to the list of pending acks (but make
-       sure it's not in the list first) */
-    LIST_FOREACH(tmpAck, &client->pendingNotificationsAcks, listEntry) {
-        if(tmpAck->subAck.sequenceNumber == msg.sequenceNumber &&
-            tmpAck->subAck.subscriptionId == response.subscriptionId)
-            break;
-    }
-
-    if(!tmpAck) {
-        tmpAck = UA_malloc(sizeof(UA_Client_NotificationsAckNumber));
-        tmpAck->subAck.sequenceNumber = msg.sequenceNumber;
-        tmpAck->subAck.subscriptionId = sub->SubscriptionID;
-        LIST_INSERT_HEAD(&client->pendingNotificationsAcks, tmpAck, listEntry);
     }
     
-    return response.moreNotifications;
+    /* Add to the list of pending acks */
+    tmpAck = UA_malloc(sizeof(UA_Client_NotificationsAckNumber));
+    tmpAck->subAck.sequenceNumber = msg->sequenceNumber;
+    tmpAck->subAck.subscriptionId = sub->SubscriptionID;
+    LIST_INSERT_HEAD(&client->pendingNotificationsAcks, tmpAck, listEntry);
 }
 
 UA_StatusCode UA_Client_Subscriptions_manuallySendPublishRequest(UA_Client *client) {
-    if (client->state == UA_CLIENTSTATE_ERRORED){
+    if (client->state == UA_CLIENTSTATE_ERRORED)
         return UA_STATUSCODE_BADSERVERNOTCONNECTED;
-    }
+
     UA_Boolean moreNotifications = true;
-    do {
+    while(moreNotifications == true) {
         UA_PublishRequest request;
         UA_PublishRequest_init(&request);
         request.subscriptionAcknowledgementsSize = 0;
@@ -288,8 +275,8 @@ UA_StatusCode UA_Client_Subscriptions_manuallySendPublishRequest(UA_Client *clie
         LIST_FOREACH(ack, &client->pendingNotificationsAcks, listEntry)
             request.subscriptionAcknowledgementsSize++;
         if(request.subscriptionAcknowledgementsSize > 0) {
-            request.subscriptionAcknowledgements = UA_malloc(sizeof(UA_SubscriptionAcknowledgement) *
-                                                             request.subscriptionAcknowledgementsSize);
+            request.subscriptionAcknowledgements =
+                UA_malloc(sizeof(UA_SubscriptionAcknowledgement) * request.subscriptionAcknowledgementsSize);
             if(!request.subscriptionAcknowledgements)
                 return UA_STATUSCODE_GOOD;
         }
@@ -302,13 +289,11 @@ UA_StatusCode UA_Client_Subscriptions_manuallySendPublishRequest(UA_Client *clie
         }
         
         UA_PublishResponse response = UA_Client_Service_publish(client, request);
-        if(response.responseHeader.serviceResult == UA_STATUSCODE_GOOD)
-            moreNotifications = UA_Client_processPublishRx(client, response);
-        else
-            moreNotifications = false;
+        UA_Client_processPublishResponse(client, &response);
+        moreNotifications = response.moreNotifications;
         
         UA_PublishResponse_deleteMembers(&response);
         UA_PublishRequest_deleteMembers(&request);
-    } while(moreNotifications == true);
+    }
     return UA_STATUSCODE_GOOD;
 }

+ 17 - 2
src/server/ua_server_binary.c

@@ -241,6 +241,11 @@ getServicePointers(UA_UInt32 requestTypeId, const UA_DataType **requestType,
         *requestType = &UA_TYPES[UA_TYPES_MODIFYSUBSCRIPTIONREQUEST];
         *responseType = &UA_TYPES[UA_TYPES_MODIFYSUBSCRIPTIONRESPONSE];
         break;
+	case UA_NS0ID_SETPUBLISHINGMODEREQUEST:
+		*service = (UA_Service)Service_SetPublishingMode;
+		*requestType = &UA_TYPES[UA_TYPES_SETPUBLISHINGMODEREQUEST];
+		*responseType = &UA_TYPES[UA_TYPES_SETPUBLISHINGMODERESPONSE];
+		break;
     case UA_NS0ID_DELETESUBSCRIPTIONSREQUEST:
         *service = (UA_Service)Service_DeleteSubscriptions;
         *requestType = &UA_TYPES[UA_TYPES_DELETESUBSCRIPTIONSREQUEST];
@@ -256,6 +261,11 @@ getServicePointers(UA_UInt32 requestTypeId, const UA_DataType **requestType,
         *requestType = &UA_TYPES[UA_TYPES_DELETEMONITOREDITEMSREQUEST];
         *responseType = &UA_TYPES[UA_TYPES_DELETEMONITOREDITEMSRESPONSE];
         break;
+    case UA_NS0ID_MODIFYMONITOREDITEMSREQUEST:
+        *service = (UA_Service)Service_ModifyMonitoredItems;
+        *requestType = &UA_TYPES[UA_TYPES_MODIFYMONITOREDITEMSREQUEST];
+        *responseType = &UA_TYPES[UA_TYPES_MODIFYMONITOREDITEMSRESPONSE];
+        break;
 #endif
 
 #ifdef UA_ENABLE_METHODCALLS
@@ -348,7 +358,6 @@ chunkEntryFromRequestId(UA_SecureChannel *channel, UA_UInt32 requestId) {
             return ch;
         }
     }
-
     return NULL;
 }
 
@@ -543,7 +552,7 @@ processMSG(UA_Connection *connection, UA_Server *server, const UA_ByteString *ms
     UA_Session_updateLifetime(session);
 
 #ifdef UA_ENABLE_SUBSCRIPTIONS
-    /* The publish request is answered with a delay */
+    /* The publish request is answered asynchronously */
     if(requestTypeId.identifier.numeric - UA_ENCODINGOFFSET_BINARY == UA_NS0ID_PUBLISHREQUEST) {
         Service_Publish(server, session, request, sequenceHeader.requestId);
         UA_deleteMembers(request, requestType);
@@ -552,6 +561,12 @@ processMSG(UA_Connection *connection, UA_Server *server, const UA_ByteString *ms
 #endif
         
     /* Call the service */
+    UA_LOG_DEBUG(server->config.logger, UA_LOGCATEGORY_SERVER,
+                 "Processing a service with type id %u on Session %u",
+                 requestType->typeId.identifier.numeric, session->authenticationToken.identifier.numeric);
+    UA_assert(service);
+    UA_assert(requestType);
+    UA_assert(responseType);
     void *response = UA_alloca(responseType->memSize);
     UA_init(response, responseType);
     init_response_header(request, response);

+ 20 - 15
src/server/ua_server_worker.c

@@ -200,7 +200,7 @@ static UA_StatusCode addRepeatedJob(UA_Server *server, struct AddRepeatedJob * U
     UA_StatusCode retval = UA_STATUSCODE_GOOD;
 
     /* search for matching entry */
-    UA_DateTime firstTime = UA_DateTime_nowMonotonic() + arw->interval;
+    UA_DateTime firstTime = UA_DateTime_nowMonotonic();
     tempTw = LIST_FIRST(&server->repeatedJobs);
     while(tempTw) {
         if(arw->interval == tempTw->interval) {
@@ -292,8 +292,9 @@ UA_StatusCode UA_Server_addRepeatedJob(UA_Server *server, UA_Job job, UA_UInt32
 
 /* Returns the next datetime when a repeated job is scheduled */
 static UA_DateTime processRepeatedJobs(UA_Server *server, UA_DateTime current) {
-    struct RepeatedJobs *tw = NULL;
-    while((tw = LIST_FIRST(&server->repeatedJobs)) != NULL) {
+    struct RepeatedJobs *tw, *tmp_tw;
+    /* Iterate over the list of elements (sorted according to the next execution timestamp) */
+    LIST_FOREACH_SAFE(tw, &server->repeatedJobs, pointers, tmp_tw) {
         if(tw->nextTime > current)
             break;
 
@@ -309,18 +310,26 @@ static UA_DateTime processRepeatedJobs(UA_Server *server, UA_DateTime current) {
             jobsCopy[i] = tw->jobs[i].job;
         dispatchJobs(server, jobsCopy, tw->jobsSize); // frees the job pointer
 #else
-        for(size_t i=0;i<tw->jobsSize;i++)
-            //processJobs may sort the list but dont delete entries
+		size_t size = tw->jobsSize;
+        for(size_t i = 0; i < size; i++)
             processJobs(server, &tw->jobs[i].job, 1); // does not free the job ptr
 #endif
 
-        /* set the time for the next execution */
+		/* Elements are removed only here. Check if empty. */
+		if(tw->jobsSize == 0) {
+			LIST_REMOVE(tw, pointers);
+			UA_free(tw);
+            UA_assert(LIST_FIRST(&server->repeatedJobs) != tw); /* Assert for static code checkers */
+			continue;
+		}
+
+        /* Set the time for the next execution */
         tw->nextTime += tw->interval;
         if(tw->nextTime < current)
             tw->nextTime = current;
 
-        //start iterating the list from the beginning
-        struct RepeatedJobs *prevTw = LIST_FIRST(&server->repeatedJobs); // after which tw do we insert?
+        /* Reinsert to keep the list sorted */
+        struct RepeatedJobs *prevTw = LIST_FIRST(&server->repeatedJobs);
         while(true) {
             struct RepeatedJobs *n = LIST_NEXT(prevTw, pointers);
             if(!n || n->nextTime > tw->nextTime)
@@ -349,14 +358,10 @@ static void removeRepeatedJob(UA_Server *server, UA_Guid *jobId) {
         for(size_t i = 0; i < tw->jobsSize; i++) {
             if(!UA_Guid_equal(jobId, &tw->jobs[i].id))
                 continue;
-            if(tw->jobsSize == 1) {
-                LIST_REMOVE(tw, pointers);
-                UA_free(tw);
-            } else {
-                tw->jobsSize--;
+			tw->jobsSize--; /* if size == 0, tw is freed during the next processing */
+            if(tw->jobsSize > 0)
                 tw->jobs[i] = tw->jobs[tw->jobsSize]; // move the last entry to overwrite
-            }
-            goto finish; // ugly break
+            goto finish;
         }
     }
  finish:

+ 8 - 2
src/server/ua_services.h

@@ -274,7 +274,9 @@ void Service_DeleteMonitoredItems(UA_Server *server, UA_Session *session,
                                   const UA_DeleteMonitoredItemsRequest *request,
                                   UA_DeleteMonitoredItemsResponse *response);
 
-/* Not Implemented: Service_ModifyMonitoredItems */
+void Service_ModifyMonitoredItems(UA_Server *server, UA_Session *session,
+                                  const UA_ModifyMonitoredItemsRequest *request,
+                                  UA_ModifyMonitoredItemsResponse *response);
 /* Not Implemented: Service_SetMonitoringMode */
 /* Not Implemented: Service_SetTriggering */
 
@@ -298,6 +300,11 @@ void Service_ModifySubscription(UA_Server *server, UA_Session *session,
                                 const UA_ModifySubscriptionRequest *request,
                                 UA_ModifySubscriptionResponse *response);
 
+/* Used to enable sending of Notifications on one or more Subscriptions. */
+void Service_SetPublishingMode(UA_Server *server, UA_Session *session,
+	                           const UA_SetPublishingModeRequest *request,
+	                           UA_SetPublishingModeResponse *response);
+
 /* Used for two purposes. First, it is used to acknowledge the receipt of
  * NotificationMessages for one or more Subscriptions. Second, it is used to
  * request the Server to return a NotificationMessage or a keep-alive
@@ -321,7 +328,6 @@ void Service_DeleteSubscriptions(UA_Server *server, UA_Session *session,
                                  const UA_DeleteSubscriptionsRequest *request,
                                  UA_DeleteSubscriptionsResponse *response);
 
-/* Not Implemented: Service_SetPublishingMode */
 /* Not Implemented: Service_TransferSubscription */
 
 #endif

+ 1 - 1
src/server/ua_services_attribute.c

@@ -203,7 +203,7 @@ void Service_Read_single(UA_Server *server, UA_Session *session, const UA_Timest
                          const UA_ReadValueId *id, UA_DataValue *v) {
 	if(id->dataEncoding.name.length > 0 && !UA_String_equal(&binEncoding, &id->dataEncoding.name)) {
            v->hasStatus = true;
-           v->status = UA_STATUSCODE_BADDATAENCODINGINVALID;
+           v->status = UA_STATUSCODE_BADDATAENCODINGUNSUPPORTED;
            return;
 	}
 

+ 3 - 3
src/server/ua_services_nodemanagement.c

@@ -765,8 +765,7 @@ Service_AddReferences_single(UA_Server *server, UA_Session *session, const UA_Ad
 
     /* cast away the const to loop the call through UA_Server_editNode */
     UA_StatusCode retval = UA_Server_editNode(server, session, &item->sourceNodeId,
-                                              (UA_EditNodeCallback)addOneWayReference,
-                                              item);
+                                              (UA_EditNodeCallback)addOneWayReference, item);
     if(retval != UA_STATUSCODE_GOOD)
         return retval;
 
@@ -782,7 +781,8 @@ Service_AddReferences_single(UA_Server *server, UA_Session *session, const UA_Ad
     return retval;
 } 
 
-void Service_AddReferences(UA_Server *server, UA_Session *session, const UA_AddReferencesRequest *request,
+void Service_AddReferences(UA_Server *server, UA_Session *session,
+                           const UA_AddReferencesRequest *request,
                            UA_AddReferencesResponse *response) {
 	if(request->referencesToAddSize <= 0) {
 		response->responseHeader.serviceResult = UA_STATUSCODE_BADNOTHINGTODO;

+ 272 - 212
src/server/ua_services_subscription.c

@@ -3,116 +3,195 @@
 #include "ua_subscription.h"
 
 #define UA_BOUNDEDVALUE_SETWBOUNDS(BOUNDS, SRC, DST) { \
-    if(SRC > BOUNDS.max) DST = BOUNDS.max; \
-    else if(SRC < BOUNDS.min) DST = BOUNDS.min; \
-    else DST = SRC; \
+        if(SRC > BOUNDS.max) DST = BOUNDS.max;         \
+        else if(SRC < BOUNDS.min) DST = BOUNDS.min;    \
+        else DST = SRC;                                \
     }
 
+static void
+setSubscriptionSettings(UA_Server *server, UA_Subscription *subscription,
+                        UA_Double requestedPublishingInterval,
+                        UA_UInt32 requestedLifetimeCount,
+                        UA_UInt32 requestedMaxKeepAliveCount,
+                        UA_UInt32 maxNotificationsPerPublish, UA_Byte priority) {
+    Subscription_unregisterPublishJob(server, subscription);
+	subscription->publishingInterval = requestedPublishingInterval;
+	UA_BOUNDEDVALUE_SETWBOUNDS(server->config.publishingIntervalLimits,
+		requestedPublishingInterval, subscription->publishingInterval);
+	/* check for nan*/
+	if(requestedPublishingInterval != requestedPublishingInterval)
+		subscription->publishingInterval = server->config.publishingIntervalLimits.min;
+    UA_BOUNDEDVALUE_SETWBOUNDS(server->config.keepAliveCountLimits,
+                               requestedMaxKeepAliveCount, subscription->maxKeepAliveCount);
+    UA_BOUNDEDVALUE_SETWBOUNDS(server->config.lifeTimeCountLimits,
+                               requestedLifetimeCount, subscription->lifeTimeCount);
+    if(subscription->lifeTimeCount < 3 * subscription->maxKeepAliveCount)
+        subscription->lifeTimeCount = 3 * subscription->maxKeepAliveCount;
+	subscription->notificationsPerPublish = maxNotificationsPerPublish;
+	if(maxNotificationsPerPublish == 0 || maxNotificationsPerPublish > server->config.maxNotificationsPerPublish)
+		subscription->notificationsPerPublish = server->config.maxNotificationsPerPublish;
+    subscription->priority = priority;
+    Subscription_registerPublishJob(server, subscription);
+}
+
 void Service_CreateSubscription(UA_Server *server, UA_Session *session,
                                 const UA_CreateSubscriptionRequest *request,
                                 UA_CreateSubscriptionResponse *response) {
     response->subscriptionId = UA_Session_getUniqueSubscriptionID(session);
-    UA_Subscription *newSubscription = UA_Subscription_new(response->subscriptionId);
+    UA_Subscription *newSubscription = UA_Subscription_new(session, response->subscriptionId);
     if(!newSubscription) {
         response->responseHeader.serviceResult = UA_STATUSCODE_BADOUTOFMEMORY;
         return;
     }
-    
-    /* set the publishing interval */
-    UA_BOUNDEDVALUE_SETWBOUNDS(server->config.publishingIntervalLimits,
-                               request->requestedPublishingInterval, response->revisedPublishingInterval);
-    newSubscription->publishingInterval = response->revisedPublishingInterval;
-    
-    /* set the subscription lifetime (deleted when no publish requests arrive within this time) */
-    UA_BOUNDEDVALUE_SETWBOUNDS(server->config.lifeTimeCountLimits,
-                               request->requestedLifetimeCount, response->revisedLifetimeCount);
-    newSubscription->lifeTime = (UA_BoundedUInt32)  {
-        .min = server->config.lifeTimeCountLimits.min,
-        .max = server->config.lifeTimeCountLimits.max,
-        .current=response->revisedLifetimeCount};
-    
-    /* set the keepalive count. the server sends an empty notification when
-       nothin has happened for n publishing intervals */
-    UA_BOUNDEDVALUE_SETWBOUNDS(server->config.keepAliveCountLimits,
-                               request->requestedMaxKeepAliveCount, response->revisedMaxKeepAliveCount);
-    newSubscription->keepAliveCount = (UA_BoundedUInt32)  {
-        .min = server->config.keepAliveCountLimits.min,
-        .max = server->config.keepAliveCountLimits.max,
-        .current = response->revisedMaxKeepAliveCount};
-    
-    newSubscription->notificationsPerPublish = request->maxNotificationsPerPublish;
-    newSubscription->publishingMode          = request->publishingEnabled;
-    newSubscription->priority                = request->priority;
-    
-    /* add the update job */
-    Subscription_registerUpdateJob(server, newSubscription);
+
     UA_Session_addSubscription(session, newSubscription);    
+    newSubscription->publishingEnabled = request->publishingEnabled;
+    setSubscriptionSettings(server, newSubscription, request->requestedPublishingInterval,
+                            request->requestedLifetimeCount, request->requestedMaxKeepAliveCount,
+                            request->maxNotificationsPerPublish, request->priority);
+    newSubscription->currentKeepAliveCount = newSubscription->maxKeepAliveCount; /* immediately send the first response */
+    response->revisedPublishingInterval = newSubscription->publishingInterval;
+    response->revisedLifetimeCount = newSubscription->lifeTimeCount;
+    response->revisedMaxKeepAliveCount = newSubscription->maxKeepAliveCount;
+}
+
+void Service_ModifySubscription(UA_Server *server, UA_Session *session,
+                                const UA_ModifySubscriptionRequest *request,
+                                UA_ModifySubscriptionResponse *response) {
+    UA_Subscription *sub = UA_Session_getSubscriptionByID(session, request->subscriptionId);
+    if(!sub) {
+        response->responseHeader.serviceResult = UA_STATUSCODE_BADSUBSCRIPTIONIDINVALID;
+        return;
+    }
+
+    setSubscriptionSettings(server, sub, request->requestedPublishingInterval,
+                            request->requestedLifetimeCount, request->requestedMaxKeepAliveCount,
+                            request->maxNotificationsPerPublish, request->priority);
+    sub->currentLifetimeCount = 0; /* Reset the subscription lifetime */
+    response->revisedPublishingInterval = sub->publishingInterval;
+    response->revisedLifetimeCount = sub->lifeTimeCount;
+    response->revisedMaxKeepAliveCount = sub->maxKeepAliveCount;
+    return;
+}
+
+void Service_SetPublishingMode(UA_Server *server, UA_Session *session,
+	const UA_SetPublishingModeRequest *request,	UA_SetPublishingModeResponse *response) {
+
+	if(request->subscriptionIdsSize <= 0) {
+		response->responseHeader.serviceResult = UA_STATUSCODE_BADNOTHINGTODO;
+		return;
+	}
+
+	size_t size = request->subscriptionIdsSize;
+	response->results = UA_Array_new(size, &UA_TYPES[UA_TYPES_STATUSCODE]);
+	if(!response->results) {
+		response->responseHeader.serviceResult = UA_STATUSCODE_BADOUTOFMEMORY;
+		return;
+	}
+
+	response->resultsSize = size;
+	for(size_t i = 0; i < size; i++) {
+		UA_Subscription *sub = UA_Session_getSubscriptionByID(session, request->subscriptionIds[i]);
+		if(!sub) {
+			response->results[i] = UA_STATUSCODE_BADSUBSCRIPTIONIDINVALID;
+			continue;
+		}
+		sub->publishingEnabled = request->publishingEnabled;
+        sub->currentLifetimeCount = 0; /* Reset the subscription lifetime */
+	}
+}
+
+static void
+setMonitoredItemSettings(UA_Server *server, UA_MonitoredItem *mon,
+                         UA_MonitoringMode monitoringMode, UA_UInt32 clientHandle,
+                         UA_Double samplingInterval, UA_UInt32 queueSize,
+                         UA_Boolean discardOldest) {
+    MonitoredItem_unregisterSampleJob(server, mon);
+    mon->monitoringMode = monitoringMode;
+    mon->clientHandle = clientHandle;
+	mon->samplingInterval = samplingInterval;
+	UA_BOUNDEDVALUE_SETWBOUNDS(server->config.samplingIntervalLimits,
+		samplingInterval, mon->samplingInterval);
+	/* Check for nan */
+	if(samplingInterval != samplingInterval)
+		mon->samplingInterval = server->config.samplingIntervalLimits.min;
+    UA_BOUNDEDVALUE_SETWBOUNDS(server->config.queueSizeLimits,
+                               queueSize, mon->maxQueueSize);
+    mon->discardOldest = discardOldest;
+    MonitoredItem_registerSampleJob(server, mon);
 }
 
+static const UA_String binaryEncoding = {sizeof("DefaultBinary")-1, (UA_Byte*)"DefaultBinary"};
 static void
-createMonitoredItems(UA_Server *server, UA_Session *session, UA_Subscription *sub,
-                     const UA_MonitoredItemCreateRequest *request, UA_MonitoredItemCreateResult *result) {
+Service_CreateMonitoredItems_single(UA_Server *server, UA_Session *session, UA_Subscription *sub,
+                                    const UA_TimestampsToReturn timestampsToReturn,
+                                    const UA_MonitoredItemCreateRequest *request,
+                                    UA_MonitoredItemCreateResult *result) {
+    /* Check if the target exists */
     const UA_Node *target = UA_NodeStore_get(server->nodestore, &request->itemToMonitor.nodeId);
     if(!target) {
         result->statusCode = UA_STATUSCODE_BADNODEIDINVALID;
         return;
     }
+    // TODO: Check if the target node type has the requested attribute
 
+    /* Check if the encoding is supported */
+	if(request->itemToMonitor.dataEncoding.name.length > 0 &&
+       !UA_String_equal(&binaryEncoding, &request->itemToMonitor.dataEncoding.name)) {
+        result->statusCode = UA_STATUSCODE_BADDATAENCODINGUNSUPPORTED;
+        return;
+    }
+
+    /* Create the monitoreditem */
     UA_MonitoredItem *newMon = UA_MonitoredItem_new();
     if(!newMon) {
         result->statusCode = UA_STATUSCODE_BADOUTOFMEMORY;
         return;
     }
-
     UA_StatusCode retval = UA_NodeId_copy(&target->nodeId, &newMon->monitoredNodeId);
     if(retval != UA_STATUSCODE_GOOD) {
-        result->statusCode = UA_STATUSCODE_BADOUTOFMEMORY;
-        MonitoredItem_delete(newMon);
+        result->statusCode = retval;
+        MonitoredItem_delete(server, newMon);
         return;
     }
-
-    newMon->itemId = UA_Session_getUniqueSubscriptionID(session);
-    result->monitoredItemId = newMon->itemId;
-    newMon->clientHandle = request->requestedParameters.clientHandle;
-
-    /* set the sampling interval */
-    UA_BOUNDEDVALUE_SETWBOUNDS(server->config.samplingIntervalLimits,
-                               request->requestedParameters.samplingInterval,
-                               result->revisedSamplingInterval);
-    newMon->samplingInterval = (UA_UInt32)result->revisedSamplingInterval;
-
-    /* set the queue size */
-    UA_BOUNDEDVALUE_SETWBOUNDS(server->config.queueSizeLimits,
-                               request->requestedParameters.queueSize,
-                               result->revisedQueueSize);
-    newMon->queueSize = (UA_BoundedUInt32) {
-        .max=(result->revisedQueueSize) + 1,
-        .min=0, .current=0 };
-
+    newMon->subscription = sub;
     newMon->attributeID = request->itemToMonitor.attributeId;
-    newMon->monitoredItemType = MONITOREDITEM_TYPE_CHANGENOTIFY;
-    newMon->discardOldest = request->requestedParameters.discardOldest;
+    newMon->itemId = UA_Session_getUniqueSubscriptionID(session);
+    newMon->timestampsToReturn = timestampsToReturn;
+    setMonitoredItemSettings(server, newMon, request->monitoringMode,
+                             request->requestedParameters.clientHandle,
+                             request->requestedParameters.samplingInterval,
+                             request->requestedParameters.queueSize,
+                             request->requestedParameters.discardOldest);
     LIST_INSERT_HEAD(&sub->MonitoredItems, newMon, listEntry);
 
-    // todo: add a job that samples the value (for fixed intervals)
-    // todo: add a pointer to the monitoreditem to the variable, so that events get propagated
+    /* Prepare the response */
+    UA_String_copy(&request->itemToMonitor.indexRange, &newMon->indexRange);
+    result->revisedSamplingInterval = newMon->samplingInterval;
+    result->revisedQueueSize = newMon->maxQueueSize;
+    result->monitoredItemId = newMon->itemId;
 }
 
-void Service_CreateMonitoredItems(UA_Server *server, UA_Session *session,
-                                  const UA_CreateMonitoredItemsRequest *request,
-                                  UA_CreateMonitoredItemsResponse *response) {
+void
+Service_CreateMonitoredItems(UA_Server *server, UA_Session *session,
+                             const UA_CreateMonitoredItemsRequest *request,
+                             UA_CreateMonitoredItemsResponse *response) {
     UA_Subscription *sub = UA_Session_getSubscriptionByID(session, request->subscriptionId);
     if(!sub) {
         response->responseHeader.serviceResult = UA_STATUSCODE_BADSUBSCRIPTIONIDINVALID;
         return;
     }
+
+    /* Reset the subscription lifetime */
+    sub->currentLifetimeCount = 0;
     
     if(request->itemsToCreateSize <= 0) {
         response->responseHeader.serviceResult = UA_STATUSCODE_BADNOTHINGTODO;
         return;
     }
 
-    response->results = UA_Array_new(request->itemsToCreateSize, &UA_TYPES[UA_TYPES_MONITOREDITEMCREATERESULT]);
+    response->results = UA_Array_new(request->itemsToCreateSize,
+                                     &UA_TYPES[UA_TYPES_MONITOREDITEMCREATERESULT]);
     if(!response->results) {
         response->responseHeader.serviceResult = UA_STATUSCODE_BADOUTOFMEMORY;
         return;
@@ -120,136 +199,128 @@ void Service_CreateMonitoredItems(UA_Server *server, UA_Session *session,
     response->resultsSize = request->itemsToCreateSize;
 
     for(size_t i = 0; i < request->itemsToCreateSize; i++)
-        createMonitoredItems(server, session, sub, &request->itemsToCreate[i], &response->results[i]);
+        Service_CreateMonitoredItems_single(server, session, sub, request->timestampsToReturn,
+                                            &request->itemsToCreate[i], &response->results[i]);
 }
 
-void
-Service_Publish(UA_Server *server, UA_Session *session, const UA_PublishRequest *request,
-                UA_UInt32 requestId) {
-    UA_PublishResponse response;
-    UA_PublishResponse_init(&response);
-    response.responseHeader.requestHandle = request->requestHeader.requestHandle;
-    response.responseHeader.timestamp = UA_DateTime_now();
+static void
+Service_ModifyMonitoredItems_single(UA_Server *server, UA_Session *session, UA_Subscription *sub,
+                                    const UA_MonitoredItemModifyRequest *request,
+                                    UA_MonitoredItemModifyResult *result) {
+    UA_MonitoredItem *mon = UA_Subscription_getMonitoredItem(sub, request->monitoredItemId);
+    if(!mon) {
+        result->statusCode = UA_STATUSCODE_BADMONITOREDITEMIDINVALID;
+        return;
+    }
+
+    setMonitoredItemSettings(server, mon, mon->monitoringMode,
+                             request->requestedParameters.clientHandle,
+                             request->requestedParameters.samplingInterval,
+                             request->requestedParameters.queueSize,
+                             request->requestedParameters.discardOldest);
+    result->revisedSamplingInterval = mon->samplingInterval;
+    result->revisedQueueSize = mon->maxQueueSize;
+}
+
+void Service_ModifyMonitoredItems(UA_Server *server, UA_Session *session,
+                                  const UA_ModifyMonitoredItemsRequest *request,
+                                  UA_ModifyMonitoredItemsResponse *response) {
+    UA_Subscription *sub = UA_Session_getSubscriptionByID(session, request->subscriptionId);
+    if(!sub) {
+        response->responseHeader.serviceResult = UA_STATUSCODE_BADSUBSCRIPTIONIDINVALID;
+        return;
+    }
+
+    /* Reset the subscription lifetime */
+    sub->currentLifetimeCount = 0;
     
-    // Delete Acknowledged Subscription Messages
-    response.resultsSize = request->subscriptionAcknowledgementsSize;
-    response.results = UA_calloc(response.resultsSize, sizeof(UA_StatusCode));
+    if(request->itemsToModifySize <= 0) {
+        response->responseHeader.serviceResult = UA_STATUSCODE_BADNOTHINGTODO;
+        return;
+    }
+
+    response->results = UA_Array_new(request->itemsToModifySize,
+                                     &UA_TYPES[UA_TYPES_MONITOREDITEMMODIFYRESULT]);
+    if(!response->results) {
+        response->responseHeader.serviceResult = UA_STATUSCODE_BADOUTOFMEMORY;
+        return;
+    }
+    response->resultsSize = request->itemsToModifySize;
+
+    for(size_t i = 0; i < request->itemsToModifySize; i++)
+        Service_ModifyMonitoredItems_single(server, session, sub, &request->itemsToModify[i],
+                                            &response->results[i]);
+
+}
+
+void
+Service_Publish(UA_Server *server, UA_Session *session,
+	            const UA_PublishRequest *request, UA_UInt32 requestId) {
+	/* Return an error if the session has no subscription */
+	if(LIST_EMPTY(&session->serverSubscriptions)) {
+		UA_PublishResponse response;
+		UA_PublishResponse_init(&response);
+		response.responseHeader.requestHandle = request->requestHeader.requestHandle;
+		response.responseHeader.serviceResult = UA_STATUSCODE_BADNOSUBSCRIPTION;
+		UA_SecureChannel_sendBinaryMessage(session->channel, requestId, &response,
+			                               &UA_TYPES[UA_TYPES_PUBLISHRESPONSE]);
+		return;
+	}
+
+	// todo error handling for malloc
+    UA_PublishResponseEntry *entry = UA_malloc(sizeof(UA_PublishResponseEntry));
+	entry->requestId = requestId;
+    UA_PublishResponse *response = &entry->response;
+	UA_PublishResponse_init(response);
+	response->responseHeader.requestHandle = request->requestHeader.requestHandle;
+
+    /* Delete Acknowledged Subscription Messages */
+    response->results = UA_malloc(request->subscriptionAcknowledgementsSize * sizeof(UA_StatusCode));
+    if(!response->results) {
+        response->responseHeader.serviceResult = UA_STATUSCODE_BADOUTOFMEMORY;
+        return;
+    }
+    response->resultsSize = request->subscriptionAcknowledgementsSize;
     for(size_t i = 0; i < request->subscriptionAcknowledgementsSize; i++) {
-        response.results[i] = UA_STATUSCODE_GOOD;
-        UA_UInt32 sid = request->subscriptionAcknowledgements[i].subscriptionId;
-        UA_Subscription *sub = UA_Session_getSubscriptionByID(session, sid);
+        UA_SubscriptionAcknowledgement *ack = &request->subscriptionAcknowledgements[i];
+        UA_Subscription *sub = UA_Session_getSubscriptionByID(session, ack->subscriptionId);
         if(!sub) {
-            response.results[i] = UA_STATUSCODE_BADSUBSCRIPTIONIDINVALID;
+            response->results[i] = UA_STATUSCODE_BADSUBSCRIPTIONIDINVALID;
+            UA_LOG_DEBUG(server->config.logger, UA_LOGCATEGORY_SERVER,
+                         "Cannot process acknowledgements subscription %u", ack->subscriptionId);
             continue;
         }
-        UA_UInt32 sn = request->subscriptionAcknowledgements[i].sequenceNumber;
-        if(Subscription_deleteUnpublishedNotification(sn, false, sub) == 0)
-            response.results[i] = UA_STATUSCODE_BADSEQUENCENUMBERINVALID;
-    }
-    
-    UA_Boolean have_response = false;
-
-    // See if any new data is available
-    UA_Subscription *sub;
-    LIST_FOREACH(sub, &session->serverSubscriptions, listEntry) {
-        if(sub->timedUpdateIsRegistered == false) {
-            // FIXME: We are forcing a value update for monitored items. This should be done by the event system.
-            // NOTE:  There is a clone of this functionality in the Subscription_timedUpdateNotificationsJob
-            UA_MonitoredItem *mon;
-            LIST_FOREACH(mon, &sub->MonitoredItems, listEntry)
-                MonitoredItem_QueuePushDataValue(server, mon);
-            
-            // FIXME: We are forcing notification updates for the subscription. This
-            // should be done by a timed work item.
-            Subscription_updateNotifications(sub);
-        }
-        
-        if(sub->unpublishedNotificationsSize == 0)
-            continue;
-        
-        // This subscription has notifications in its queue (top NotificationMessage exists in the queue). 
-        // Due to republish, we need to check if there are any unplublished notifications first ()
-        UA_unpublishedNotification *notification = NULL;
-        LIST_FOREACH(notification, &sub->unpublishedNotifications, listEntry) {
-            if (notification->publishedOnce == false)
+
+        response->results[i] = UA_STATUSCODE_BADSEQUENCENUMBERUNKNOWN;
+        UA_NotificationMessageEntry *pre, *pre_tmp;
+        LIST_FOREACH_SAFE(pre, &sub->retransmissionQueue, listEntry, pre_tmp) {
+            if(pre->message.sequenceNumber == ack->sequenceNumber) {
+                LIST_REMOVE(pre, listEntry);
+                response->results[i] = UA_STATUSCODE_GOOD;
+                UA_NotificationMessage_deleteMembers(&pre->message);
+                UA_free(pre);
                 break;
+            }
         }
-        if (notification == NULL)
-            continue;
-    
-        // We found an unpublished notification message in this subscription, which we will now publish.
-        response.subscriptionId = sub->subscriptionID;
-        Subscription_copyNotificationMessage(&response.notificationMessage, notification);
-        // Mark this notification as published
-        notification->publishedOnce = true;
-        if(notification->notification.sequenceNumber > sub->sequenceNumber) {
-            // If this is a keepalive message, its seqNo is the next seqNo to be used for an actual msg.
-            response.availableSequenceNumbersSize = 0;
-            // .. and must be deleted
-            Subscription_deleteUnpublishedNotification(sub->sequenceNumber + 1, false, sub);
-        } else {
-            response.availableSequenceNumbersSize = sub->unpublishedNotificationsSize;
-            response.availableSequenceNumbers = Subscription_getAvailableSequenceNumbers(sub);
-        }	  
-        have_response = true;
     }
-    
-    if(!have_response) {
-        // FIXME: At this point, we would return nothing and "queue" the publish
-        // request, but currently we need to return something to the client. If no
-        // subscriptions have notifications, force one to generate a keepalive so we
-        // don't return an empty message
-        sub = LIST_FIRST(&session->serverSubscriptions);
-        if(sub) {
-            response.subscriptionId = sub->subscriptionID;
-            sub->keepAliveCount.current=sub->keepAliveCount.min;
-            Subscription_generateKeepAlive(sub);
-            Subscription_copyNotificationMessage(&response.notificationMessage,
-                                                 LIST_FIRST(&sub->unpublishedNotifications));
-            Subscription_deleteUnpublishedNotification(sub->sequenceNumber + 1, false, sub);
+
+    /* Queue the publish response */
+    SIMPLEQ_INSERT_TAIL(&session->responseQueue, entry, listEntry);
+    UA_LOG_DEBUG(server->config.logger, UA_LOGCATEGORY_SERVER,
+                 "Queued a publication message on session %u",
+                 session->authenticationToken.identifier.numeric);
+
+    /* Answer immediately to a late subscription */
+    UA_Subscription *immediate;
+    LIST_FOREACH(immediate, &session->serverSubscriptions, listEntry) {
+        if(immediate->state == UA_SUBSCRIPTIONSTATE_LATE) {
+            UA_LOG_DEBUG(server->config.logger, UA_LOGCATEGORY_SERVER,
+                         "Response on a late subscription on session %u",
+                         session->authenticationToken.identifier.numeric);
+            UA_Subscription_publishCallback(server, immediate);
+            return;
         }
     }
-    
-    UA_SecureChannel *channel = session->channel;
-    if(channel)
-        UA_SecureChannel_sendBinaryMessage(channel, requestId, &response,
-                                           &UA_TYPES[UA_TYPES_PUBLISHRESPONSE]);
-    UA_PublishResponse_deleteMembers(&response);
-}
-
-void
-Service_ModifySubscription(UA_Server *server, UA_Session *session, const UA_ModifySubscriptionRequest *request,
-                           UA_ModifySubscriptionResponse *response) {
-    UA_Subscription *sub = UA_Session_getSubscriptionByID(session, request->subscriptionId);
-    if(!sub) {
-        response->responseHeader.serviceResult = UA_STATUSCODE_BADSUBSCRIPTIONIDINVALID;
-        return;
-    }
-    
-    UA_BOUNDEDVALUE_SETWBOUNDS(server->config.publishingIntervalLimits,
-                               request->requestedPublishingInterval, response->revisedPublishingInterval);
-    sub->publishingInterval = response->revisedPublishingInterval;
-    
-    UA_BOUNDEDVALUE_SETWBOUNDS(server->config.lifeTimeCountLimits,
-                               request->requestedLifetimeCount, response->revisedLifetimeCount);
-    sub->lifeTime = (UA_BoundedUInt32)  {
-        .min = server->config.lifeTimeCountLimits.min,
-        .max = server->config.lifeTimeCountLimits.max,
-        .current=response->revisedLifetimeCount};
-        
-    UA_BOUNDEDVALUE_SETWBOUNDS(server->config.keepAliveCountLimits,
-                               request->requestedMaxKeepAliveCount, response->revisedMaxKeepAliveCount);
-    sub->keepAliveCount = (UA_BoundedUInt32)  {
-        .min = server->config.keepAliveCountLimits.min,
-        .max = server->config.keepAliveCountLimits.max,
-        .current=response->revisedMaxKeepAliveCount};
-        
-    sub->notificationsPerPublish = request->maxNotificationsPerPublish;
-    sub->priority                = request->priority;
-    
-    Subscription_unregisterUpdateJob(server, sub);
-    Subscription_registerUpdateJob(server, sub);
-    return;
 }
 
 void Service_DeleteSubscriptions(UA_Server *server, UA_Session *session,
@@ -263,8 +334,7 @@ void Service_DeleteSubscriptions(UA_Server *server, UA_Session *session,
     response->resultsSize = request->subscriptionIdsSize;
 
     for(size_t i = 0; i < request->subscriptionIdsSize; i++)
-        response->results[i] =
-            UA_Session_deleteSubscription(server, session, request->subscriptionIds[i]);
+        response->results[i] = UA_Session_deleteSubscription(server, session, request->subscriptionIds[i]);
 } 
 
 void Service_DeleteMonitoredItems(UA_Server *server, UA_Session *session,
@@ -275,6 +345,9 @@ void Service_DeleteMonitoredItems(UA_Server *server, UA_Session *session,
         response->responseHeader.serviceResult = UA_STATUSCODE_BADSUBSCRIPTIONIDINVALID;
         return;
     }
+
+    /* Reset the subscription lifetime */
+    sub->currentLifetimeCount = 0;
     
     response->results = UA_malloc(sizeof(UA_StatusCode) * request->monitoredItemIdsSize);
     if(!response->results) {
@@ -284,43 +357,30 @@ void Service_DeleteMonitoredItems(UA_Server *server, UA_Session *session,
     response->resultsSize = request->monitoredItemIdsSize;
 
     for(size_t i = 0; i < request->monitoredItemIdsSize; i++)
-        response->results[i] =
-            UA_Session_deleteMonitoredItem(session, sub->subscriptionID,
-                                           request->monitoredItemIds[i]);
+        response->results[i] = UA_Subscription_deleteMonitoredItem(server, sub, request->monitoredItemIds[i]);
 }
 
 void Service_Republish(UA_Server *server, UA_Session *session, const UA_RepublishRequest *request,
                        UA_RepublishResponse *response) {
+    /* get the subscription */
     UA_Subscription *sub = UA_Session_getSubscriptionByID(session, request->subscriptionId);
     if (!sub) {
         response->responseHeader.serviceResult = UA_STATUSCODE_BADSUBSCRIPTIONIDINVALID;
         return;
     }
+
+    /* Reset the subscription lifetime */
+    sub->currentLifetimeCount = 0;
     
-    // Find the notification in question
-    UA_unpublishedNotification *notification;
-    LIST_FOREACH(notification, &sub->unpublishedNotifications, listEntry) {
-        if(notification->notification.sequenceNumber == request->retransmitSequenceNumber)
+    /* Find the notification in the retransmission queue  */
+    UA_NotificationMessageEntry *entry;
+    LIST_FOREACH(entry, &sub->retransmissionQueue, listEntry) {
+        if(entry->message.sequenceNumber == request->retransmitSequenceNumber)
             break;
     }
-    if(!notification) {
-      response->responseHeader.serviceResult = UA_STATUSCODE_BADSEQUENCENUMBERINVALID;
-      return;
-    }
-    
-    // FIXME: By spec, this notification has to be in the "retransmit queue", i.e. publishedOnce must be
-    //        true. If this is not tested, the client just gets what he asks for... hence this part is
-    //        commented:
-    /* Check if the notification is in the published queue
-    if (notification->publishedOnce == false) {
-      response->responseHeader.serviceResult = UA_STATUSCODE_BADSUBSCRIPTIONIDINVALID;
-      return;
-    }
-    */
-    // Retransmit 
-    Subscription_copyNotificationMessage(&response->notificationMessage, notification);
-    // Mark this notification as published
-    notification->publishedOnce = true;
-    
-    return;
+    if(entry)
+        response->responseHeader.serviceResult =
+            UA_NotificationMessage_copy(&entry->message, &response->notificationMessage);
+    else
+      response->responseHeader.serviceResult = UA_STATUSCODE_BADMESSAGENOTAVAILABLE;
 }

+ 311 - 430
src/server/ua_subscription.c

@@ -1,488 +1,369 @@
 #include "ua_subscription.h"
 #include "ua_server_internal.h"
+#include "ua_services.h"
 #include "ua_nodestore.h"
 
-/****************/
-/* Subscription */
-/****************/
+/*****************/
+/* MonitoredItem */
+/*****************/
 
-UA_Subscription *UA_Subscription_new(UA_UInt32 subscriptionID) {
-    UA_Subscription *new = UA_malloc(sizeof(UA_Subscription));
-    if(!new)
-        return NULL;
-    new->subscriptionID = subscriptionID;
-    new->lastPublished  = 0;
-    new->sequenceNumber = 1;
-    memset(&new->timedUpdateJobGuid, 0, sizeof(UA_Guid));
-    new->timedUpdateIsRegistered = false;
-    LIST_INIT(&new->MonitoredItems);
-    LIST_INIT(&new->unpublishedNotifications);
-    new->unpublishedNotificationsSize = 0;
+UA_MonitoredItem * UA_MonitoredItem_new() {
+    UA_MonitoredItem *new = UA_malloc(sizeof(UA_MonitoredItem));
+    new->subscription = NULL;
+    new->currentQueueSize = 0;
+    new->maxQueueSize = 0;
+    new->monitoredItemType = UA_MONITOREDITEMTYPE_CHANGENOTIFY; /* currently hardcoded */
+    new->timestampsToReturn = UA_TIMESTAMPSTORETURN_SOURCE;
+    UA_String_init(&new->indexRange);
+    TAILQ_INIT(&new->queue);
+    UA_NodeId_init(&new->monitoredNodeId);
+    new->lastSampledValue = UA_BYTESTRING_NULL;
+    memset(&new->sampleJobGuid, 0, sizeof(UA_Guid));
+    new->sampleJobIsRegistered = false;
     return new;
 }
 
-void UA_Subscription_deleteMembers(UA_Subscription *subscription, UA_Server *server) {
-    // Just in case any parallel process attempts to access this subscription
-    // while we are deleting it... make it vanish.
-    subscription->subscriptionID = 0;
-    
-    // Delete monitored Items
-    UA_MonitoredItem *mon, *tmp_mon;
-    LIST_FOREACH_SAFE(mon, &subscription->MonitoredItems, listEntry, tmp_mon) {
-        LIST_REMOVE(mon, listEntry);
-        MonitoredItem_delete(mon);
-    }
-    
-    // Delete unpublished Notifications
-    Subscription_deleteUnpublishedNotification(0, true, subscription);
-    
-    // Unhook/Unregister any timed work assiociated with this subscription
-    if(subscription->timedUpdateIsRegistered) {
-        Subscription_unregisterUpdateJob(server, subscription);
-        subscription->timedUpdateIsRegistered = false;
+void MonitoredItem_delete(UA_Server *server, UA_MonitoredItem *monitoredItem) {
+    MonitoredItem_unregisterSampleJob(server, monitoredItem);
+    /* clear the queued samples */
+    MonitoredItem_queuedValue *val, *val_tmp;
+    TAILQ_FOREACH_SAFE(val, &monitoredItem->queue, listEntry, val_tmp) {
+        TAILQ_REMOVE(&monitoredItem->queue, val, listEntry);
+        UA_DataValue_deleteMembers(&val->value);
+        UA_free(val);
     }
+    monitoredItem->currentQueueSize = 0;
+    LIST_REMOVE(monitoredItem, listEntry);
+    UA_String_deleteMembers(&monitoredItem->indexRange);
+    UA_ByteString_deleteMembers(&monitoredItem->lastSampledValue);
+    UA_NodeId_deleteMembers(&monitoredItem->monitoredNodeId);
+    UA_free(monitoredItem);
 }
 
-void Subscription_generateKeepAlive(UA_Subscription *subscription) {
-    if(subscription->keepAliveCount.current > subscription->keepAliveCount.min &&
-       subscription->keepAliveCount.current <= subscription->keepAliveCount.max)
+static void SampleCallback(UA_Server *server, UA_MonitoredItem *monitoredItem) {
+    if(monitoredItem->monitoredItemType != UA_MONITOREDITEMTYPE_CHANGENOTIFY) {
+        UA_LOG_DEBUG(server->config.logger, UA_LOGCATEGORY_SERVER,
+                     "Cannot process a monitoreditem that is not a data change notification");
         return;
+    }
 
-    UA_unpublishedNotification *msg = UA_calloc(1,sizeof(UA_unpublishedNotification));
-    if(!msg)
+    MonitoredItem_queuedValue *newvalue = UA_malloc(sizeof(MonitoredItem_queuedValue));
+    if(!newvalue) {
+        UA_LOG_INFO(server->config.logger, UA_LOGCATEGORY_SERVER,
+                    "Skipped a sample due to lack of memory on monitoreditem %u", monitoredItem->itemId);
         return;
-    msg->notification.notificationData = NULL;
-    // KeepAlive uses next message, but does not increment counter
-    msg->notification.sequenceNumber = subscription->sequenceNumber + 1;
-    msg->notification.publishTime    = UA_DateTime_now();
-    msg->notification.notificationDataSize = 0;
-    LIST_INSERT_HEAD(&subscription->unpublishedNotifications, msg, listEntry);
-    subscription->unpublishedNotificationsSize += 1;
-    subscription->keepAliveCount.current = subscription->keepAliveCount.max;
-}
+    }
+    UA_DataValue_init(&newvalue->value);
+    newvalue->clientHandle = monitoredItem->clientHandle;
+    UA_LOG_DEBUG(server->config.logger, UA_LOGCATEGORY_SERVER,
+                 "Sampling the value on monitoreditem %u", monitoredItem->itemId);
+  
+    /* Read the value */
+    UA_ReadValueId rvid;
+    UA_ReadValueId_init(&rvid);
+    rvid.nodeId = monitoredItem->monitoredNodeId;
+    rvid.attributeId = monitoredItem->attributeID;
+    rvid.indexRange = monitoredItem->indexRange;
+    UA_Subscription *sub = monitoredItem->subscription;
+    Service_Read_single(server, sub->session, monitoredItem->timestampsToReturn, &rvid, &newvalue->value);
 
-void Subscription_updateNotifications(UA_Subscription *subscription) {
-    UA_MonitoredItem *mon;
-    //MonitoredItem_queuedValue *queuedValue;
-    UA_unpublishedNotification *msg;
-    UA_UInt32 monItemsChangeT = 0, monItemsStatusT = 0, monItemsEventT = 0;
-    
-    if(!subscription || subscription->lastPublished +
-       (UA_UInt32)(subscription->publishingInterval * UA_MSEC_TO_DATETIME) > UA_DateTime_now())
+    /* encode to see if the data has changed */
+    size_t binsize = UA_calcSizeBinary(&newvalue->value.value, &UA_TYPES[UA_TYPES_VARIANT]);
+    UA_ByteString newValueAsByteString;
+    UA_StatusCode retval = UA_ByteString_allocBuffer(&newValueAsByteString, binsize);
+    if(retval != UA_STATUSCODE_GOOD) {
+        UA_DataValue_deleteMembers(&newvalue->value);
+        UA_free(newvalue);
         return;
-    
-    // Make sure there is data to be published and establish which message types
-    // will need to be generated
-    LIST_FOREACH(mon, &subscription->MonitoredItems, listEntry) {
-        // Check if this MonitoredItems Queue holds data and how much data is held in total
-        if(!TAILQ_FIRST(&mon->queue))
-            continue;
-        if((mon->monitoredItemType & MONITOREDITEM_TYPE_CHANGENOTIFY) != 0)
-            monItemsChangeT+=mon->queueSize.current;
-	    else if((mon->monitoredItemType & MONITOREDITEM_TYPE_STATUSNOTIFY) != 0)
-            monItemsStatusT+=mon->queueSize.current;
-	    else if((mon->monitoredItemType & MONITOREDITEM_TYPE_EVENTNOTIFY)  != 0)
-            monItemsEventT+=mon->queueSize.current;
     }
-    
-    // FIXME: This is hardcoded to 100 because it is not covered by the spec but we need to protect the server!
-    if(subscription->unpublishedNotificationsSize >= 10) {
-        // Remove last entry
-        Subscription_deleteUnpublishedNotification(0, true, subscription);
-    }
-    
-    if(monItemsChangeT == 0 && monItemsEventT == 0 && monItemsStatusT == 0) {
-        // Decrement KeepAlive
-        subscription->keepAliveCount.current--;
-        // +- Generate KeepAlive msg if counter overruns
-        if (subscription->keepAliveCount.current < subscription->keepAliveCount.min)
-          Subscription_generateKeepAlive(subscription);
-        
+    size_t encodingOffset = 0;
+    retval = UA_encodeBinary(&newvalue->value.value, &UA_TYPES[UA_TYPES_VARIANT],
+                             &newValueAsByteString, &encodingOffset);
+
+    /* error or the content has not changed */
+    if(retval != UA_STATUSCODE_GOOD ||
+       (monitoredItem->lastSampledValue.data &&
+        UA_String_equal(&newValueAsByteString, &monitoredItem->lastSampledValue))) {
+        UA_ByteString_deleteMembers(&newValueAsByteString);
+        UA_DataValue_deleteMembers(&newvalue->value);
+        UA_free(newvalue);
+        UA_LOG_DEBUG(server->config.logger, UA_LOGCATEGORY_SERVER,
+                     "Do not sample since the data value has not changed");
         return;
     }
-    
-    msg = UA_calloc(1, sizeof(UA_unpublishedNotification));
-    msg->notification.sequenceNumber = subscription->sequenceNumber++;
-    msg->notification.publishTime = UA_DateTime_now();
-    
-    // NotificationData is an array of Change, Status and Event messages, each containing the appropriate
-    // list of Queued values from all monitoredItems of that type
-    msg->notification.notificationDataSize = !!monItemsChangeT; // 1 if the pointer is not null, else 0
-    // + ISNOTZERO(monItemsEventT) + ISNOTZERO(monItemsStatusT);
-    msg->notification.notificationData =
-        UA_Array_new(msg->notification.notificationDataSize, &UA_TYPES[UA_TYPES_EXTENSIONOBJECT]);
-    
-    for(size_t notmsgn = 0; notmsgn < msg->notification.notificationDataSize; notmsgn++) {
-        // Set the notification message type and encoding for each of 
-        //   the three possible NotificationData Types
-
-        /* msg->notification->notificationData[notmsgn].encoding = 1; // Encoding is always binary */
-        /* msg->notification->notificationData[notmsgn].typeId = UA_NODEID_NUMERIC(0, 811); */
-      
-        if(notmsgn == 0) {
-            UA_DataChangeNotification *changeNotification = UA_DataChangeNotification_new();
-            changeNotification->monitoredItems = UA_Array_new(monItemsChangeT, &UA_TYPES[UA_TYPES_MONITOREDITEMNOTIFICATION]);
-	
-            // Scan all monitoredItems in this subscription and have their queue transformed into an Array of
-            // the propper NotificationMessageType (Status, Change, Event)
-            monItemsChangeT = 0;
-            LIST_FOREACH(mon, &subscription->MonitoredItems, listEntry) {
-                if(mon->monitoredItemType != MONITOREDITEM_TYPE_CHANGENOTIFY || !TAILQ_FIRST(&mon->queue))
-                    continue;
-                // Note: Monitored Items might not return a queuedValue if there is a problem encoding it.
-                monItemsChangeT += MonitoredItem_QueueToDataChangeNotifications(&changeNotification->monitoredItems[monItemsChangeT], mon);
-                MonitoredItem_ClearQueue(mon);
-            }
-            changeNotification->monitoredItemsSize = monItemsChangeT;
-            msg->notification.notificationData[notmsgn].encoding = UA_EXTENSIONOBJECT_DECODED;
-            msg->notification.notificationData[notmsgn].content.decoded.type = &UA_TYPES[UA_TYPES_DATACHANGENOTIFICATION];
-            msg->notification.notificationData[notmsgn].content.decoded.data = changeNotification;
-        } else if(notmsgn == 1) {
-            // FIXME: Constructing a StatusChangeNotification is not implemented
-        } else if(notmsgn == 2) {
-            // FIXME: Constructing a EventListNotification is not implemented
+  
+    /* do we have space in the queue? */
+    if(monitoredItem->currentQueueSize >= monitoredItem->maxQueueSize) {
+        if(!monitoredItem->discardOldest) {
+            // We cannot remove the oldest value and theres no queue space left. We're done here.
+            UA_ByteString_deleteMembers(&newValueAsByteString);
+            UA_DataValue_deleteMembers(&newvalue->value);
+            UA_free(newvalue);
+            return;
         }
+        MonitoredItem_queuedValue *queueItem = TAILQ_LAST(&monitoredItem->queue, QueueOfQueueDataValues);
+        TAILQ_REMOVE(&monitoredItem->queue, queueItem, listEntry);
+        UA_DataValue_deleteMembers(&queueItem->value);
+        UA_free(queueItem);
+        monitoredItem->currentQueueSize--;
     }
-    LIST_INSERT_HEAD(&subscription->unpublishedNotifications, msg, listEntry);
-    subscription->unpublishedNotificationsSize += 1;
-}
-
-UA_UInt32 *Subscription_getAvailableSequenceNumbers(UA_Subscription *sub) {
-    UA_UInt32 *seqArray = UA_malloc(sizeof(UA_UInt32) * sub->unpublishedNotificationsSize);
-    if(!seqArray)
-        return NULL;
   
-    int i = 0;
-    UA_unpublishedNotification *not;
-    LIST_FOREACH(not, &sub->unpublishedNotifications, listEntry) {
-        seqArray[i] = not->notification.sequenceNumber;
-        i++;
-    }
-    return seqArray;
-}
-
-void Subscription_copyNotificationMessage(UA_NotificationMessage *dst, UA_unpublishedNotification *src) {
-    if(!dst)
-        return;
-    
-    UA_NotificationMessage *latest = &src->notification;
-    dst->notificationDataSize = latest->notificationDataSize;
-    dst->publishTime = latest->publishTime;
-    dst->sequenceNumber = latest->sequenceNumber;
-    
-    if(latest->notificationDataSize == 0)
-        return;
-
-    dst->notificationData = UA_ExtensionObject_new();
-    UA_ExtensionObject_copy(latest->notificationData, dst->notificationData);
-}
-
-UA_UInt32 Subscription_deleteUnpublishedNotification(UA_UInt32 seqNo, UA_Boolean bDeleteAll, UA_Subscription *sub) {
-    UA_UInt32 deletedItems = 0;
-    UA_unpublishedNotification *not, *tmp;
-    LIST_FOREACH_SAFE(not, &sub->unpublishedNotifications, listEntry, tmp) {
-        if(!bDeleteAll && not->notification.sequenceNumber != seqNo)
-            continue;
-        LIST_REMOVE(not, listEntry);
-        sub->unpublishedNotificationsSize -= 1;
-        UA_NotificationMessage_deleteMembers(&not->notification);
-        UA_free(not);
-        deletedItems++;
-    }
-    return deletedItems;
-}
-
-
-static void Subscription_timedUpdateNotificationsJob(UA_Server *server, void *data) {
-    // Timed-Worker/Job Version of updateNotifications
-    UA_Subscription *sub = (UA_Subscription *) data;
-    UA_MonitoredItem *mon;
-    
-    if(!data || !server)
-        return;
-    
-    // This is set by the Subscription_delete function to detere us from fiddling with
-    // this subscription if it is being deleted (not technically thread save, but better
-    // then nothing at all)
-    if(sub->subscriptionID == 0)
-        return;
-    
-    // FIXME: This should be done by the event system
-    LIST_FOREACH(mon, &sub->MonitoredItems, listEntry)
-        MonitoredItem_QueuePushDataValue(server, mon);
-    
-    Subscription_updateNotifications(sub);
+    /* add the sample */
+    UA_ByteString_deleteMembers(&monitoredItem->lastSampledValue);
+    monitoredItem->lastSampledValue = newValueAsByteString;
+    TAILQ_INSERT_TAIL(&monitoredItem->queue, newvalue, listEntry);
+    monitoredItem->currentQueueSize++;
 }
 
-UA_StatusCode Subscription_registerUpdateJob(UA_Server *server, UA_Subscription *sub) {
-    if(sub->publishingInterval <= 5 ) 
-        return UA_STATUSCODE_BADNOTSUPPORTED;
-
-    UA_Job job = (UA_Job) {.type = UA_JOBTYPE_METHODCALL,
-                           .job.methodCall = {.method = Subscription_timedUpdateNotificationsJob,
-                                              .data = sub} };
-    
-    /* Practically enough, the client sends a uint32 in ms, which we store as
-       datetime, which here is required in as uint32 in ms as the interval */
-    UA_StatusCode retval = UA_Server_addRepeatedJob(server, job,
-                                                    (UA_UInt32)sub->publishingInterval,
-                                                    &sub->timedUpdateJobGuid);
+UA_StatusCode MonitoredItem_registerSampleJob(UA_Server *server, UA_MonitoredItem *mon) {
+    //SampleCallback(server, mon);
+    UA_Job job = {.type = UA_JOBTYPE_METHODCALL,
+                  .job.methodCall = {.method = (UA_ServerCallback)SampleCallback, .data = mon} };
+    UA_StatusCode retval = UA_Server_addRepeatedJob(server, job, (UA_UInt32)mon->samplingInterval,
+                                                    &mon->sampleJobGuid);
     if(retval == UA_STATUSCODE_GOOD)
-        sub->timedUpdateIsRegistered = true;
+        mon->sampleJobIsRegistered = true;
     return retval;
 }
 
-UA_StatusCode Subscription_unregisterUpdateJob(UA_Server *server, UA_Subscription *sub) {
-    sub->timedUpdateIsRegistered = false;
-    return UA_Server_removeRepeatedJob(server, sub->timedUpdateJobGuid);
+UA_StatusCode MonitoredItem_unregisterSampleJob(UA_Server *server, UA_MonitoredItem *mon) {
+    if(!mon->sampleJobIsRegistered)
+        return UA_STATUSCODE_GOOD;
+    mon->sampleJobIsRegistered = false;
+    return UA_Server_removeRepeatedJob(server, mon->sampleJobGuid);
 }
 
-/*****************/
-/* MonitoredItem */
-/*****************/
+/****************/
+/* Subscription */
+/****************/
 
-UA_MonitoredItem * UA_MonitoredItem_new() {
-    UA_MonitoredItem *new = (UA_MonitoredItem *) UA_malloc(sizeof(UA_MonitoredItem));
-    new->queueSize   = (UA_BoundedUInt32) { .min = 0, .max = 0, .current = 0};
-    new->lastSampled = 0;
-    // FIXME: This is currently hardcoded;
-    new->monitoredItemType = MONITOREDITEM_TYPE_CHANGENOTIFY;
-    TAILQ_INIT(&new->queue);
-    UA_NodeId_init(&new->monitoredNodeId);
-    new->lastSampledValue.data = 0;
+UA_Subscription * UA_Subscription_new(UA_Session *session, UA_UInt32 subscriptionID) {
+    UA_Subscription *new = UA_malloc(sizeof(UA_Subscription));
+    if(!new)
+        return NULL;
+    new->session = session;
+    new->subscriptionID = subscriptionID;
+    new->sequenceNumber = 0;
+    new->maxKeepAliveCount = 0;
+	new->publishingEnabled = false;
+    memset(&new->publishJobGuid, 0, sizeof(UA_Guid));
+    new->publishJobIsRegistered = false;
+	new->currentKeepAliveCount = 0;
+	new->currentLifetimeCount = 0;
+	new->state = UA_SUBSCRIPTIONSTATE_LATE; /* The first publish response is sent immediately */
+    LIST_INIT(&new->retransmissionQueue);
+    LIST_INIT(&new->MonitoredItems);
     return new;
 }
 
-void MonitoredItem_delete(UA_MonitoredItem *monitoredItem) {
-    // Delete Queued Data
-    MonitoredItem_ClearQueue(monitoredItem);
-    // Remove from subscription list
-    LIST_REMOVE(monitoredItem, listEntry);
-    // Release comparison sample
-    if(monitoredItem->lastSampledValue.data != NULL) { 
-      UA_free(monitoredItem->lastSampledValue.data);
+void UA_Subscription_deleteMembers(UA_Subscription *subscription, UA_Server *server) {
+    Subscription_unregisterPublishJob(server, subscription);
+
+    /* Delete monitored Items */
+    UA_MonitoredItem *mon, *tmp_mon;
+    LIST_FOREACH_SAFE(mon, &subscription->MonitoredItems, listEntry, tmp_mon) {
+        LIST_REMOVE(mon, listEntry);
+        MonitoredItem_delete(server, mon);
     }
-    
-    UA_NodeId_deleteMembers(&(monitoredItem->monitoredNodeId));
-    UA_free(monitoredItem);
-}
 
-UA_UInt32 MonitoredItem_QueueToDataChangeNotifications(UA_MonitoredItemNotification *dst,
-                                                 UA_MonitoredItem *monitoredItem) {
-    UA_UInt32 queueSize = 0;
-    MonitoredItem_queuedValue *queueItem;
-  
-    // Count instead of relying on the items current
-    TAILQ_FOREACH(queueItem, &monitoredItem->queue, listEntry) {
-        dst[queueSize].clientHandle = monitoredItem->clientHandle;
-        UA_DataValue_copy(&queueItem->value, &dst[queueSize].value);
+    /* Delete Retransmission Queue */
+    UA_NotificationMessageEntry *nme, *nme_tmp;
+    LIST_FOREACH_SAFE(nme, &subscription->retransmissionQueue, listEntry, nme_tmp) {
+        LIST_REMOVE(nme, listEntry);
+        UA_NotificationMessage_deleteMembers(&nme->message);
+        UA_free(nme);
+    }
+}
 
-        dst[queueSize].value.hasServerPicoseconds = false;
-        dst[queueSize].value.hasServerTimestamp   = true;
-        dst[queueSize].value.serverTimestamp      = UA_DateTime_now();
-    
-        // Do not create variants with no type -> will make calcSizeBinary() segfault.
-        if(dst[queueSize].value.value.type)
-            queueSize++;
+UA_MonitoredItem *
+UA_Subscription_getMonitoredItem(UA_Subscription *sub, UA_UInt32 monitoredItemID) {
+    UA_MonitoredItem *mon;
+    LIST_FOREACH(mon, &sub->MonitoredItems, listEntry) {
+        if(mon->itemId == monitoredItemID)
+            break;
     }
-    return queueSize;
+    return mon;
 }
 
-void MonitoredItem_ClearQueue(UA_MonitoredItem *monitoredItem) {
-    MonitoredItem_queuedValue *val, *val_tmp;
-    TAILQ_FOREACH_SAFE(val, &monitoredItem->queue, listEntry, val_tmp) {
-        TAILQ_REMOVE(&monitoredItem->queue, val, listEntry);
-        UA_DataValue_deleteMembers(&val->value);
-        UA_free(val);
+UA_StatusCode
+UA_Subscription_deleteMonitoredItem(UA_Server *server, UA_Subscription *sub,
+                                    UA_UInt32 monitoredItemID) {
+    UA_MonitoredItem *mon;
+    LIST_FOREACH(mon, &sub->MonitoredItems, listEntry) {
+        if(mon->itemId == monitoredItemID) {
+            LIST_REMOVE(mon, listEntry);
+            MonitoredItem_delete(server, mon);
+            return UA_STATUSCODE_GOOD;
+        }
     }
-    monitoredItem->queueSize.current = 0;
+    return UA_STATUSCODE_BADMONITOREDITEMIDINVALID;
 }
 
-UA_Boolean MonitoredItem_CopyMonitoredValueToVariant(UA_UInt32 attributeID, const UA_Node *src,
-                                                     UA_DataValue *dst) {
-    UA_Boolean samplingError = true; 
-    UA_DataValue sourceDataValue;
-    UA_DataValue_init(&sourceDataValue);
-  
-    // FIXME: Not all attributeIDs can be monitored yet
-    switch(attributeID) {
-    case UA_ATTRIBUTEID_NODEID:
-        UA_Variant_setScalarCopy(&dst->value, (const UA_NodeId*)&src->nodeId, &UA_TYPES[UA_TYPES_NODEID]);
-        dst->hasValue = true;
-        samplingError = false;
-        break;
-    case UA_ATTRIBUTEID_NODECLASS:
-        UA_Variant_setScalarCopy(&dst->value, (const UA_Int32*)&src->nodeClass, &UA_TYPES[UA_TYPES_INT32]);
-        dst->hasValue = true;
-        samplingError = false;
-        break;
-    case UA_ATTRIBUTEID_BROWSENAME:
-        UA_Variant_setScalarCopy(&dst->value, (const UA_String*)&src->browseName, &UA_TYPES[UA_TYPES_QUALIFIEDNAME]);
-        dst->hasValue = true;
-        samplingError = false;
-        break;
-    case UA_ATTRIBUTEID_DISPLAYNAME:
-        UA_Variant_setScalarCopy(&dst->value, (const UA_String*)&src->displayName, &UA_TYPES[UA_TYPES_LOCALIZEDTEXT]);
-        dst->hasValue = true;
-        samplingError = false;
-        break;
-    case UA_ATTRIBUTEID_DESCRIPTION:
-        UA_Variant_setScalarCopy(&dst->value, (const UA_String*)&src->displayName, &UA_TYPES[UA_TYPES_LOCALIZEDTEXT]);
-        dst->hasValue = true;
-        samplingError = false;
-        break;
-    case UA_ATTRIBUTEID_WRITEMASK:
-        UA_Variant_setScalarCopy(&dst->value, (const UA_String*)&src->writeMask, &UA_TYPES[UA_TYPES_UINT32]);
-        dst->hasValue = true;
-        samplingError = false;
-        break;
-    case UA_ATTRIBUTEID_USERWRITEMASK:
-        UA_Variant_setScalarCopy(&dst->value, (const UA_String*)&src->writeMask, &UA_TYPES[UA_TYPES_UINT32]);
-        dst->hasValue = true;
-        samplingError = false;
-        break;
-    case UA_ATTRIBUTEID_ISABSTRACT:
-        break;
-    case UA_ATTRIBUTEID_SYMMETRIC:
-        break;
-    case UA_ATTRIBUTEID_INVERSENAME:
-        break;
-    case UA_ATTRIBUTEID_CONTAINSNOLOOPS:
-        break;
-    case UA_ATTRIBUTEID_EVENTNOTIFIER:
-        break;
-    case UA_ATTRIBUTEID_VALUE: 
-        if(src->nodeClass == UA_NODECLASS_VARIABLE) {
-            const UA_VariableNode *vsrc = (const UA_VariableNode*)src;
-            if(vsrc->valueSource == UA_VALUESOURCE_VARIANT) {
-                if(vsrc->value.variant.callback.onRead)
-                    vsrc->value.variant.callback.onRead(vsrc->value.variant.callback.handle, vsrc->nodeId,
-                                                        &dst->value, NULL);
-                UA_Variant_copy(&vsrc->value.variant.value, &dst->value);
-                dst->hasValue = true;
-                samplingError = false;
-            } else {
-                if(vsrc->valueSource != UA_VALUESOURCE_DATASOURCE || vsrc->value.dataSource.read == NULL)
-                    break;
-                if(vsrc->value.dataSource.read(vsrc->value.dataSource.handle, vsrc->nodeId, true,
-                                               NULL, &sourceDataValue) != UA_STATUSCODE_GOOD)
+void UA_Subscription_publishCallback(UA_Server *server, UA_Subscription *sub) {
+    /* Count the available notifications */
+    size_t notifications = 0;
+    UA_Boolean moreNotifications = false;
+    if(sub->publishingEnabled) {
+        UA_MonitoredItem *mon;
+        LIST_FOREACH(mon, &sub->MonitoredItems, listEntry) {
+            MonitoredItem_queuedValue *qv;
+            TAILQ_FOREACH(qv, &mon->queue, listEntry) {
+                if(notifications >= sub->notificationsPerPublish) {
+                    moreNotifications = true;
                     break;
-                UA_DataValue_copy(&sourceDataValue, dst);
-                UA_DataValue_deleteMembers(&sourceDataValue);
-                samplingError = false;
+                }
+                notifications++;
             }
         }
-        break;
-    case UA_ATTRIBUTEID_DATATYPE:
-        break;
-    case UA_ATTRIBUTEID_VALUERANK:
-        break;
-    case UA_ATTRIBUTEID_ARRAYDIMENSIONS:
-        break;
-    case UA_ATTRIBUTEID_ACCESSLEVEL:
-        break;
-    case UA_ATTRIBUTEID_USERACCESSLEVEL:
-        break;
-    case UA_ATTRIBUTEID_MINIMUMSAMPLINGINTERVAL:
-        break;
-    case UA_ATTRIBUTEID_HISTORIZING:
-        break;
-    case UA_ATTRIBUTEID_EXECUTABLE:
-        break;
-    case UA_ATTRIBUTEID_USEREXECUTABLE:
-        break;
-    default:
-        break;
     }
-  
-    return samplingError;
-}
 
-void MonitoredItem_QueuePushDataValue(UA_Server *server, UA_MonitoredItem *monitoredItem) {
-    UA_ByteString newValueAsByteString = { .length=0, .data=NULL };
-    size_t encodingOffset = 0;
-  
-    if(!monitoredItem || monitoredItem->lastSampled + monitoredItem->samplingInterval > UA_DateTime_now())
-        return;
-  
-    // FIXME: Actively suppress non change value based monitoring. There should be
-    // another function to handle status and events.
-    if(monitoredItem->monitoredItemType != MONITOREDITEM_TYPE_CHANGENOTIFY)
-        return;
+    /* Return if nothing to do */
+    if(notifications == 0) {
+        sub->currentKeepAliveCount++;
+        if(sub->currentKeepAliveCount < sub->maxKeepAliveCount)
+            return;
+    }
 
-    MonitoredItem_queuedValue *newvalue = UA_malloc(sizeof(MonitoredItem_queuedValue));
-    if(!newvalue)
+    /* Check if the securechannel is valid */
+    UA_SecureChannel *channel = sub->session->channel;
+    if(!channel)
         return;
 
-    newvalue->listEntry.tqe_next = NULL;
-    newvalue->listEntry.tqe_prev = NULL;
-    UA_DataValue_init(&newvalue->value);
+	/* Dequeue a response */
+	UA_PublishResponseEntry *pre = SIMPLEQ_FIRST(&sub->session->responseQueue);
+	if(!pre) {
+		UA_LOG_DEBUG(server->config.logger, UA_LOGCATEGORY_SERVER,
+			"Cannot send a publish response on subscription %u " \
+			"since the publish queue is empty on session %u",
+			sub->subscriptionID, sub->session->authenticationToken.identifier.numeric);
+		if(sub->state != UA_SUBSCRIPTIONSTATE_LATE) {
+			sub->state = UA_SUBSCRIPTIONSTATE_LATE;
+		} else {
+			sub->currentLifetimeCount++;
+			if(sub->currentLifetimeCount >= sub->lifeTimeCount) {
+				UA_LOG_INFO(server->config.logger, UA_LOGCATEGORY_SERVER,
+					"End of lifetime for subscription %u on session %u",
+					sub->subscriptionID, sub->session->authenticationToken.identifier.numeric);
+				UA_Session_deleteSubscription(server, sub->session, sub->subscriptionID);
+			}
+		}
+		return;
+	}
+	SIMPLEQ_REMOVE_HEAD(&sub->session->responseQueue, listEntry);
+    UA_PublishResponse *response = &pre->response;
+    UA_UInt32 requestId = pre->requestId;
 
-    // Verify that the *Node being monitored is still valid
-    // Looking up the in the nodestore is only necessary if we suspect that it is changed during writes
-    // e.g. in multithreaded applications
-    const UA_Node *target = UA_NodeStore_get(server->nodestore, &monitoredItem->monitoredNodeId);
-    if(!target) {
-        UA_free(newvalue);
-        return;
-    }
-  
-    UA_Boolean samplingError = MonitoredItem_CopyMonitoredValueToVariant(monitoredItem->attributeID, target,
-                                                                         &newvalue->value);
+	/* We have a request. Reset state to normal. */
+	sub->state = UA_SUBSCRIPTIONSTATE_NORMAL;
+    sub->currentKeepAliveCount = 0;
+    sub->currentLifetimeCount = 0;
 
-    if(samplingError != false || !newvalue->value.value.type) {
-        UA_DataValue_deleteMembers(&newvalue->value);
-        UA_free(newvalue);
-        return;
-    }
-  
-    if(monitoredItem->queueSize.current >= monitoredItem->queueSize.max) {
-        if(monitoredItem->discardOldest != true) {
-            // We cannot remove the oldest value and theres no queue space left. We're done here.
-            UA_DataValue_deleteMembers(&newvalue->value);
-            UA_free(newvalue);
-            return;
+	/* Prepare the response */
+    response->responseHeader.timestamp = UA_DateTime_now();
+    response->subscriptionId = sub->subscriptionID;
+	response->moreNotifications = moreNotifications;
+    UA_NotificationMessage *message = &response->notificationMessage;
+    message->publishTime = response->responseHeader.timestamp;
+    if(notifications == 0) {
+        /* Send sequence number for the next notification */
+		message->sequenceNumber = sub->sequenceNumber + 1;
+    } else {
+        /* Increase the sequence number */
+        message->sequenceNumber = ++sub->sequenceNumber;
+
+        /* Collect the notification messages */
+        message->notificationData = UA_ExtensionObject_new();
+        message->notificationDataSize = 1;
+        UA_ExtensionObject *data = message->notificationData;
+        UA_DataChangeNotification *dcn = UA_DataChangeNotification_new();
+        dcn->monitoredItems = UA_Array_new(notifications, &UA_TYPES[UA_TYPES_MONITOREDITEMNOTIFICATION]);
+        dcn->monitoredItemsSize = notifications;
+        size_t l = 0;
+        UA_MonitoredItem *mon;
+        LIST_FOREACH(mon, &sub->MonitoredItems, listEntry) {
+            MonitoredItem_queuedValue *qv, *qv_tmp;
+            TAILQ_FOREACH_SAFE(qv, &mon->queue, listEntry, qv_tmp) {
+                if(notifications <= l)
+                    break;
+                UA_MonitoredItemNotification *min = &dcn->monitoredItems[l];
+                min->clientHandle = qv->clientHandle;
+                min->value = qv->value;
+                TAILQ_REMOVE(&mon->queue, qv, listEntry);
+                UA_free(qv);
+                mon->currentQueueSize--;
+                l++;
+            }
         }
-        MonitoredItem_queuedValue *queueItem = TAILQ_LAST(&monitoredItem->queue, QueueOfQueueDataValues);
-        TAILQ_REMOVE(&monitoredItem->queue, queueItem, listEntry);
-        UA_free(queueItem);
-        monitoredItem->queueSize.current--;
-    }
-  
-    // encode the data to find if its different to the previous
-    size_t binsize = UA_calcSizeBinary(&newvalue->value, &UA_TYPES[UA_TYPES_DATAVALUE]);
-    UA_StatusCode retval = UA_ByteString_allocBuffer(&newValueAsByteString, binsize);
-    if(retval != UA_STATUSCODE_GOOD) {
-        UA_DataValue_deleteMembers(&newvalue->value);
-        UA_free(newvalue);
-        return;
+        data->encoding = UA_EXTENSIONOBJECT_DECODED;
+        data->content.decoded.data = dcn;
+        data->content.decoded.type = &UA_TYPES[UA_TYPES_DATACHANGENOTIFICATION];
+
+        /* Put the notification message into the retransmission queue */
+        UA_NotificationMessageEntry *retransmission = malloc(sizeof(UA_NotificationMessageEntry));
+        retransmission->message = response->notificationMessage;
+        LIST_INSERT_HEAD(&sub->retransmissionQueue, retransmission, listEntry);
     }
-    
-    retval = UA_encodeBinary(&newvalue->value, &UA_TYPES[UA_TYPES_DATAVALUE], &newValueAsByteString, &encodingOffset);
-    if(retval != UA_STATUSCODE_GOOD) {
-        UA_ByteString_deleteMembers(&newValueAsByteString);
-        UA_DataValue_deleteMembers(&newvalue->value);
-        UA_free(newvalue);
-        return;
+
+    /* Get the available sequence numbers from the retransmission queue */
+    size_t available = 0, i = 0;
+    UA_NotificationMessageEntry *nme;
+    LIST_FOREACH(nme, &sub->retransmissionQueue, listEntry)
+        available++;
+    if(available > 0) {
+        response->availableSequenceNumbers = UA_alloca(available * sizeof(UA_UInt32));
+        response->availableSequenceNumbersSize = available;
     }
-  
-    if(!monitoredItem->lastSampledValue.data) { 
-        UA_ByteString_copy(&newValueAsByteString, &monitoredItem->lastSampledValue);
-        TAILQ_INSERT_HEAD(&monitoredItem->queue, newvalue, listEntry);
-        monitoredItem->queueSize.current++;
-        monitoredItem->lastSampled = UA_DateTime_now();
-        UA_free(newValueAsByteString.data);
-    } else {
-        if(UA_String_equal(&newValueAsByteString, &monitoredItem->lastSampledValue) == true) {
-            UA_DataValue_deleteMembers(&newvalue->value);
-            UA_free(newvalue);
-            UA_String_deleteMembers(&newValueAsByteString);
-            return;
-        }
-        UA_ByteString_deleteMembers(&monitoredItem->lastSampledValue);
-        monitoredItem->lastSampledValue = newValueAsByteString;
-        TAILQ_INSERT_HEAD(&monitoredItem->queue, newvalue, listEntry);
-        monitoredItem->queueSize.current++;
-        monitoredItem->lastSampled = UA_DateTime_now();
+    LIST_FOREACH(nme, &sub->retransmissionQueue, listEntry) {
+        response->availableSequenceNumbers[i] = nme->message.sequenceNumber;
+        i++;
     }
+
+    /* Send the response */
+    UA_LOG_DEBUG(server->config.logger, UA_LOGCATEGORY_SERVER,
+                 "Sending out a publish response on subscription %u on securechannel %u " \
+                 "with %u notifications", sub->subscriptionID,
+                 sub->session->authenticationToken.identifier.numeric, (UA_UInt32)notifications);
+    UA_SecureChannel_sendBinaryMessage(sub->session->channel, requestId, response,
+                                       &UA_TYPES[UA_TYPES_PUBLISHRESPONSE]);
+
+    /* Remove the queued request */
+    UA_NotificationMessage_init(&response->notificationMessage); /* message was copied to the queue */
+    response->availableSequenceNumbers = NULL; /* stack-allocated */
+    response->availableSequenceNumbersSize = 0;
+    UA_PublishResponse_deleteMembers(&pre->response);
+    UA_free(pre);
+
+    /* Repeat if there are more notifications to send */
+    if(moreNotifications)
+        UA_Subscription_publishCallback(server, sub);
+}
+
+UA_StatusCode Subscription_registerPublishJob(UA_Server *server, UA_Subscription *sub) {
+    UA_Job job = (UA_Job) {.type = UA_JOBTYPE_METHODCALL,
+                           .job.methodCall = {.method = (UA_ServerCallback)UA_Subscription_publishCallback,
+                                              .data = sub} };
+    UA_LOG_DEBUG(server->config.logger, UA_LOGCATEGORY_SERVER,
+                 "Adding a subscription with %i millisec interval", (int)sub->publishingInterval);
+    UA_StatusCode retval = UA_Server_addRepeatedJob(server, job,
+                                                    (UA_UInt32)sub->publishingInterval,
+                                                    &sub->publishJobGuid);
+    if(retval == UA_STATUSCODE_GOOD)
+        sub->publishJobIsRegistered = true;
+    else
+        UA_LOG_DEBUG(server->config.logger, UA_LOGCATEGORY_SERVER,
+                     "Could not register a subscription publication job " \
+                     "with status code 0x%08x\n", retval);
+    return retval;
+}
+
+UA_StatusCode Subscription_unregisterPublishJob(UA_Server *server, UA_Subscription *sub) {
+    if(!sub->publishJobIsRegistered)
+        return UA_STATUSCODE_GOOD;
+    sub->publishJobIsRegistered = false;
+    UA_StatusCode retval = UA_Server_removeRepeatedJob(server, sub->publishJobGuid);
+    if(retval != UA_STATUSCODE_GOOD)
+        UA_LOG_DEBUG(server->config.logger, UA_LOGCATEGORY_SERVER,
+                     "Could not remove a subscription publication job " \
+                     "with status code 0x%08x\n", retval);
+    return retval;
 }

+ 68 - 43
src/server/ua_subscription.h

@@ -12,82 +12,107 @@
 /*****************/
 
 typedef enum {
-    MONITOREDITEM_TYPE_CHANGENOTIFY = 1,
-    MONITOREDITEM_TYPE_STATUSNOTIFY = 2,
-    MONITOREDITEM_TYPE_EVENTNOTIFY = 4
-} UA_MONITOREDITEM_TYPE;
+    UA_MONITOREDITEMTYPE_CHANGENOTIFY = 1,
+    UA_MONITOREDITEMTYPE_STATUSNOTIFY = 2,
+    UA_MONITOREDITEMTYPE_EVENTNOTIFY = 4
+} UA_MonitoredItemType;
 
 typedef struct MonitoredItem_queuedValue {
-    UA_DataValue value;
     TAILQ_ENTRY(MonitoredItem_queuedValue) listEntry;
+    UA_UInt32 clientHandle;
+    UA_DataValue value;
 } MonitoredItem_queuedValue;
 
 typedef struct UA_MonitoredItem {
     LIST_ENTRY(UA_MonitoredItem) listEntry;
+
+    /* Settings */
+    UA_Subscription *subscription;
     UA_UInt32 itemId;
-    UA_MONITOREDITEM_TYPE monitoredItemType;
-    UA_UInt32 timestampsToReturn;
-    UA_UInt32 monitoringMode;
+    UA_MonitoredItemType monitoredItemType;
+    UA_TimestampsToReturn timestampsToReturn;
+    UA_MonitoringMode monitoringMode;
     UA_NodeId monitoredNodeId; 
     UA_UInt32 attributeID;
     UA_UInt32 clientHandle;
-    UA_UInt32 samplingInterval; // [ms]
-    UA_BoundedUInt32 queueSize;
+    UA_Double samplingInterval; // [ms]
+    UA_UInt32 currentQueueSize;
+    UA_UInt32 maxQueueSize;
     UA_Boolean discardOldest;
-    UA_DateTime lastSampled;
+    UA_String indexRange;
+    // TODO: dataEncoding is hardcoded to UA binary
+
+    /* Sample Job */
+    UA_Guid sampleJobGuid;
+    UA_Boolean sampleJobIsRegistered;
+
+    /* Sample Queue */
     UA_ByteString lastSampledValue;
-    // FIXME: indexRange is ignored; array values default to element 0
-    // FIXME: dataEncoding is hardcoded to UA binary
     TAILQ_HEAD(QueueOfQueueDataValues, MonitoredItem_queuedValue) queue;
 } UA_MonitoredItem;
 
 UA_MonitoredItem *UA_MonitoredItem_new(void);
-void MonitoredItem_delete(UA_MonitoredItem *monitoredItem);
-void MonitoredItem_QueuePushDataValue(UA_Server *server, UA_MonitoredItem *monitoredItem);
-void MonitoredItem_ClearQueue(UA_MonitoredItem *monitoredItem);
-UA_Boolean MonitoredItem_CopyMonitoredValueToVariant(UA_UInt32 attributeID, const UA_Node *src,
-                                                     UA_DataValue *dst);
-UA_UInt32 MonitoredItem_QueueToDataChangeNotifications(UA_MonitoredItemNotification *dst,
-                                                       UA_MonitoredItem *monitoredItem);
+void MonitoredItem_delete(UA_Server *server, UA_MonitoredItem *monitoredItem);
+UA_StatusCode MonitoredItem_registerSampleJob(UA_Server *server, UA_MonitoredItem *mon);
+UA_StatusCode MonitoredItem_unregisterSampleJob(UA_Server *server, UA_MonitoredItem *mon);
 
 /****************/
 /* Subscription */
 /****************/
 
-typedef struct UA_unpublishedNotification {
-    UA_Boolean publishedOnce;
-    LIST_ENTRY(UA_unpublishedNotification) listEntry;
-    UA_NotificationMessage notification;
-} UA_unpublishedNotification;
+typedef struct UA_NotificationMessageEntry {
+    LIST_ENTRY(UA_NotificationMessageEntry) listEntry;
+    UA_NotificationMessage message;
+} UA_NotificationMessageEntry;
+
+/* We use only a subset of the states defined in the standard */
+typedef enum {
+	/* UA_SUBSCRIPTIONSTATE_CLOSED */
+	/* UA_SUBSCRIPTIONSTATE_CREATING */
+	UA_SUBSCRIPTIONSTATE_NORMAL,
+	UA_SUBSCRIPTIONSTATE_LATE,
+    UA_SUBSCRIPTIONSTATE_KEEPALIVE
+} UA_SubscriptionState;
 
 struct UA_Subscription {
     LIST_ENTRY(UA_Subscription) listEntry;
-    UA_BoundedUInt32 lifeTime;
-    UA_BoundedUInt32 keepAliveCount;
+
+    /* Settings */
+    UA_Session *session;
+    UA_UInt32 lifeTimeCount;
+    UA_UInt32 maxKeepAliveCount;
     UA_Double publishingInterval;     // [ms] 
-    UA_DateTime lastPublished;
     UA_UInt32 subscriptionID;
     UA_UInt32 notificationsPerPublish;
-    UA_Boolean publishingMode;
+    UA_Boolean publishingEnabled;
     UA_UInt32 priority;
     UA_UInt32 sequenceNumber;
-    UA_Guid timedUpdateJobGuid;
-    UA_Boolean timedUpdateIsRegistered;
-    LIST_HEAD(UA_ListOfUnpublishedNotifications, UA_unpublishedNotification) unpublishedNotifications;
-    size_t unpublishedNotificationsSize;
+
+    /* Runtime information */
+	UA_SubscriptionState state;
+    UA_UInt32 currentKeepAliveCount;
+	UA_UInt32 currentLifetimeCount;
+
+    /* Publish Job */
+    UA_Guid publishJobGuid;
+    UA_Boolean publishJobIsRegistered;
+
     LIST_HEAD(UA_ListOfUAMonitoredItems, UA_MonitoredItem) MonitoredItems;
+    LIST_HEAD(UA_ListOfNotificationMessages, UA_NotificationMessageEntry) retransmissionQueue;
 };
 
-UA_Subscription *UA_Subscription_new(UA_UInt32 subscriptionID);
+UA_Subscription *UA_Subscription_new(UA_Session *session, UA_UInt32 subscriptionID);
 void UA_Subscription_deleteMembers(UA_Subscription *subscription, UA_Server *server);
-void Subscription_updateNotifications(UA_Subscription *subscription);
-UA_UInt32 *Subscription_getAvailableSequenceNumbers(UA_Subscription *sub);
-void Subscription_generateKeepAlive(UA_Subscription *subscription);
-void Subscription_copyTopNotificationMessage(UA_NotificationMessage *dst, UA_Subscription *sub);
-UA_UInt32 Subscription_deleteUnpublishedNotification(UA_UInt32 seqNo, UA_Boolean bDeleteAll, UA_Subscription *sub);
-void Subscription_copyNotificationMessage(UA_NotificationMessage *dst, UA_unpublishedNotification *src);
-UA_StatusCode Subscription_createdUpdateJob(UA_Server *server, UA_Guid jobId, UA_Subscription *sub);
-UA_StatusCode Subscription_registerUpdateJob(UA_Server *server, UA_Subscription *sub);
-UA_StatusCode Subscription_unregisterUpdateJob(UA_Server *server, UA_Subscription *sub);
+UA_StatusCode Subscription_registerPublishJob(UA_Server *server, UA_Subscription *sub);
+UA_StatusCode Subscription_unregisterPublishJob(UA_Server *server, UA_Subscription *sub);
+
+UA_StatusCode
+UA_Subscription_deleteMonitoredItem(UA_Server *server, UA_Subscription *sub,
+                                    UA_UInt32 monitoredItemID);
+
+UA_MonitoredItem *
+UA_Subscription_getMonitoredItem(UA_Subscription *sub, UA_UInt32 monitoredItemID);
+
+void UA_Subscription_publishCallback(UA_Server *server, UA_Subscription *sub);
 
 #endif /* UA_SUBSCRIPTION_H_ */

+ 9 - 21
src/ua_session.c

@@ -29,12 +29,13 @@ void UA_Session_init(UA_Session *session) {
     session->timeout = 0;
     UA_DateTime_init(&session->validTill);
     session->channel = NULL;
+    session->availableContinuationPoints = MAXCONTINUATIONPOINTS;
+    LIST_INIT(&session->continuationPoints);
 #ifdef UA_ENABLE_SUBSCRIPTIONS
     LIST_INIT(&session->serverSubscriptions);
     session->lastSubscriptionID = UA_UInt32_random();
+    SIMPLEQ_INIT(&session->responseQueue);
 #endif
-    session->availableContinuationPoints = MAXCONTINUATIONPOINTS;
-    LIST_INIT(&session->continuationPoints);
 }
 
 void UA_Session_deleteMembersCleanup(UA_Session *session, UA_Server* server) {
@@ -58,6 +59,12 @@ void UA_Session_deleteMembersCleanup(UA_Session *session, UA_Server* server) {
         UA_Subscription_deleteMembers(currents, server);
         UA_free(currents);
     }
+    UA_PublishResponseEntry *entry;
+    while((entry = SIMPLEQ_FIRST(&session->responseQueue))) {
+        SIMPLEQ_REMOVE_HEAD(&session->responseQueue, listEntry);
+        UA_PublishResponse_deleteMembers(&entry->response);
+        UA_free(entry);
+    }
 #endif
 }
 
@@ -92,25 +99,6 @@ UA_Session_getSubscriptionByID(UA_Session *session, UA_UInt32 subscriptionID) {
     return sub;
 }
 
-
-UA_StatusCode
-UA_Session_deleteMonitoredItem(UA_Session *session, UA_UInt32 subscriptionID,
-                               UA_UInt32 monitoredItemID) {
-    UA_Subscription *sub = UA_Session_getSubscriptionByID(session, subscriptionID);
-    if(!sub)
-        return UA_STATUSCODE_BADSUBSCRIPTIONIDINVALID;
-    
-    UA_MonitoredItem *mon, *tmp_mon;
-    LIST_FOREACH_SAFE(mon, &sub->MonitoredItems, listEntry, tmp_mon) {
-        if(mon->itemId == monitoredItemID) {
-            LIST_REMOVE(mon, listEntry);
-            MonitoredItem_delete(mon);
-            return UA_STATUSCODE_GOOD;
-        }
-    }
-    return UA_STATUSCODE_BADMONITOREDITEMIDINVALID;
-}
-
 UA_UInt32 UA_Session_getUniqueSubscriptionID(UA_Session *session) {
     return ++(session->lastSubscriptionID);
 }

+ 7 - 4
src/ua_session.h

@@ -19,6 +19,12 @@ struct ContinuationPointEntry {
 struct UA_Subscription;
 typedef struct UA_Subscription UA_Subscription;
 
+typedef struct UA_PublishResponseEntry {
+    SIMPLEQ_ENTRY(UA_PublishResponseEntry) listEntry;
+    UA_UInt32 requestId;
+    UA_PublishResponse response;
+} UA_PublishResponseEntry;
+
 struct UA_Session {
     UA_ApplicationDescription clientDescription;
     UA_Boolean        activated;
@@ -35,6 +41,7 @@ struct UA_Session {
 #ifdef UA_ENABLE_SUBSCRIPTIONS
     UA_UInt32 lastSubscriptionID;
     LIST_HEAD(UA_ListOfUASubscriptions, UA_Subscription) serverSubscriptions;
+    SIMPLEQ_HEAD(UA_ListOfQueuedPublishResponses, UA_PublishResponseEntry) responseQueue;
 #endif
 };
 
@@ -54,10 +61,6 @@ void UA_Session_addSubscription(UA_Session *session, UA_Subscription *newSubscri
 UA_Subscription *
 UA_Session_getSubscriptionByID(UA_Session *session, UA_UInt32 subscriptionID);
 
-UA_StatusCode
-UA_Session_deleteMonitoredItem(UA_Session *session, UA_UInt32 subscriptionID,
-                               UA_UInt32 monitoredItemID);
-
 UA_StatusCode
 UA_Session_deleteSubscription(UA_Server *server, UA_Session *session,
                               UA_UInt32 subscriptionID);

+ 6 - 2
tools/schema/datatypes_minimal.txt

@@ -53,7 +53,6 @@ PublishRequest
 PublishResponse
 FindServersRequest
 FindServersResponse
-SetPublishingModeResponse
 SubscriptionAcknowledgement
 ReadRequest
 ReadResponse
@@ -62,7 +61,6 @@ TimestampsToReturn
 WriteRequest
 WriteResponse
 WriteValue
-SetPublishingModeRequest
 CreateMonitoredItemsResponse
 MonitoredItemCreateResult
 CreateMonitoredItemsRequest
@@ -143,6 +141,8 @@ QueryNextRequest
 QueryNextResponse
 CreateSubscriptionRequest
 CreateSubscriptionResponse
+SetPublishingModeRequest
+SetPublishingModeResponse
 DeleteMonitoredItemsRequest
 DeleteMonitoredItemsResponse
 NotificationMessage
@@ -152,3 +152,7 @@ ModifySubscriptionRequest
 ModifySubscriptionResponse
 RepublishRequest
 RepublishResponse
+MonitoredItemModifyRequest
+ModifyMonitoredItemsRequest
+MonitoredItemModifyResult
+ModifyMonitoredItemsResponse