Browse Source

Merge pull request #295 from acplt/dev

Merging dev - subscriptions are partially there
Sten Grüner 9 years ago
parent
commit
feb85f7d4f

+ 22 - 2
CMakeLists.txt

@@ -56,7 +56,6 @@ if(CMAKE_COMPILER_IS_GNUCC OR "x${CMAKE_C_COMPILER_ID}" STREQUAL "xClang")
         set(CMAKE_C_LINK_FLAGS "${CMAKE_C_LINK_FLAGS} -Wl,-z,norelro -Wl,--hash-style=gnu -Wl,--build-id=none")
 	endif()
     if(CMAKE_BUILD_TYPE STREQUAL "MinSizeRel" OR CMAKE_BUILD_TYPE STREQUAL "Release")
-#        add_definitions(-flto)
         set(CMAKE_C_LINK_FLAGS "${CMAKE_C_LINK_FLAGS} -s")
         set(CMAKE_SHARED_LINKER_FLAGS "${CMAKE_SHARED_LINKER_FLAGS} -s")
     endif()
@@ -79,8 +78,8 @@ set(internal_headers ${PROJECT_SOURCE_DIR}/src/ua_util.h
                      ${PROJECT_BINARY_DIR}/src_generated/ua_transport_generated.h
                      ${PROJECT_SOURCE_DIR}/src/ua_types_encoding_binary.h
                      ${PROJECT_SOURCE_DIR}/src/ua_securechannel.h
-                     ${PROJECT_SOURCE_DIR}/src/ua_session.h
                      ${PROJECT_SOURCE_DIR}/src/server/ua_nodes.h
+                     ${PROJECT_SOURCE_DIR}/src/ua_session.h
                      ${PROJECT_SOURCE_DIR}/src/server/ua_nodestore.h
                      ${PROJECT_SOURCE_DIR}/src/server/ua_session_manager.h
                      ${PROJECT_SOURCE_DIR}/src/server/ua_securechannel_manager.h
@@ -113,6 +112,26 @@ set(lib_sources ${PROJECT_SOURCE_DIR}/src/ua_types.c
 
 ## generate code from xml definitions
 file(MAKE_DIRECTORY "${PROJECT_BINARY_DIR}/src_generated")
+
+option(ENABLE_SUBSCRIPTIONS "Enable compilation of subscription and monitoring support." OFF)
+if(ENABLE_SUBSCRIPTIONS)
+  add_definitions(-DENABLE_SUBSCRIPTIONS)
+  list(APPEND lib_sources ${PROJECT_SOURCE_DIR}/src/server/ua_services_subscription.c
+                          ${PROJECT_SOURCE_DIR}/src/server/ua_subscription.c
+                          ${PROJECT_SOURCE_DIR}/src/server/ua_subscription_manager.c)
+  ##append subscription headers at before ua_session.
+  list(FIND internal_headers "${PROJECT_SOURCE_DIR}/src/ua_session.h" UaSessionPos)
+  list(INSERT internal_headers  ${UaSessionPos} ${PROJECT_SOURCE_DIR}/src/server/ua_subscription.h
+                          ${PROJECT_SOURCE_DIR}/src/server/ua_subscription_manager.h)
+                          
+  add_custom_command(OUTPUT ${PROJECT_BINARY_DIR}/src_generated/ua_types_generated.c
+                            ${PROJECT_BINARY_DIR}/src_generated/ua_types_generated.h
+                     PRE_BUILD
+                     COMMAND ${PYTHON_EXECUTABLE} ${PROJECT_SOURCE_DIR}/tools/generate_datatypes.py --enable-subscription-types=1 --typedescriptions ${PROJECT_SOURCE_DIR}/tools/schema/NodeIds.csv 0 ${PROJECT_SOURCE_DIR}/tools/schema/Opc.Ua.Types.bsd ${PROJECT_BINARY_DIR}/src_generated/ua_types
+                     DEPENDS ${CMAKE_CURRENT_SOURCE_DIR}/tools/generate_datatypes.py
+                             ${CMAKE_CURRENT_SOURCE_DIR}/tools/schema/Opc.Ua.Types.bsd
+                             ${CMAKE_CURRENT_SOURCE_DIR}/tools/schema/NodeIds.csv)
+else()
 add_custom_command(OUTPUT ${PROJECT_BINARY_DIR}/src_generated/ua_types_generated.c
                           ${PROJECT_BINARY_DIR}/src_generated/ua_types_generated.h
                    PRE_BUILD
@@ -120,6 +139,7 @@ add_custom_command(OUTPUT ${PROJECT_BINARY_DIR}/src_generated/ua_types_generated
                    DEPENDS ${CMAKE_CURRENT_SOURCE_DIR}/tools/generate_datatypes.py
                            ${CMAKE_CURRENT_SOURCE_DIR}/tools/schema/Opc.Ua.Types.bsd
                            ${CMAKE_CURRENT_SOURCE_DIR}/tools/schema/NodeIds.csv)
+endif()
 
 add_custom_command(OUTPUT ${PROJECT_BINARY_DIR}/src_generated/ua_transport_generated.c
                           ${PROJECT_BINARY_DIR}/src_generated/ua_transport_generated.h

+ 40 - 2
examples/client.c

@@ -13,6 +13,12 @@
 
 #include <stdio.h>
 
+void handler_TheAnswerChanged(UA_UInt32 handle, UA_DataValue *value);
+void handler_TheAnswerChanged(UA_UInt32 handle, UA_DataValue *value) {
+    printf("The Answer has changed!\n");
+    return;
+}
+
 int main(int argc, char *argv[]) {
     UA_Client *client = UA_Client_new(UA_ClientConfig_standard, Logger_Stdout_new());
     UA_StatusCode retval = UA_Client_connect(client, ClientNetworkLayerTCP_connect,
@@ -53,7 +59,28 @@ int main(int argc, char *argv[]) {
     }
     UA_BrowseRequest_deleteMembers(&bReq);
     UA_BrowseResponse_deleteMembers(&bResp);
-
+    
+#ifdef ENABLE_SUBSCRIPTIONS
+    // Create a subscription with interval 0 (immediate)...
+    UA_Int32 subId = UA_Client_newSubscription(client, 0);
+    if (subId)
+        printf("Create subscription succeeded, id %u\n", subId);
+    
+    // .. and monitor TheAnswer
+    UA_NodeId monitorThis;
+    monitorThis = UA_NODEID_STRING_ALLOC(1, "the.answer");
+    UA_UInt32 monId = UA_Client_monitorItemChanges(client, subId, monitorThis, UA_ATTRIBUTEID_VALUE, &handler_TheAnswerChanged );
+    if (monId)
+        printf("Monitoring 'the.answer', id %u\n", subId);
+    UA_NodeId_deleteMembers(&monitorThis);
+    
+    // First Publish always generates data (current value) and call out handler.
+    UA_Client_doPublish(client);
+    
+    // This should not generate anything
+    UA_Client_doPublish(client);
+#endif
+    
     UA_Int32 value = 0;
     // Read node's value
     printf("\nReading the value of node (1, \"the.answer\"):\n");
@@ -96,6 +123,15 @@ int main(int argc, char *argv[]) {
     UA_WriteRequest_deleteMembers(&wReq);
     UA_WriteResponse_deleteMembers(&wResp);
 
+#ifdef ENABLE_SUBSCRIPTIONS
+    // Take another look at the.answer... this should call the handler.
+    UA_Client_doPublish(client);
+    
+    // Delete our subscription (which also unmonitors all items)
+    if(!UA_Client_removeSubscription(client, subId))
+        printf("Subscription removed\n");
+#endif
+    
 #ifdef ENABLE_METHODCALLS
     /* Note:  This example requires Namespace 0 Node 11489 (ServerType -> GetMonitoredItems) 
        FIXME: Provide a namespace 0 independant example on the server side
@@ -109,7 +145,8 @@ int main(int argc, char *argv[]) {
     UA_Int32 outputSize;
     UA_Variant *output;
     
-    retval = UA_Client_CallServerMethod(client, UA_NODEID_NUMERIC(0, UA_NS0ID_OBJECTSFOLDER), UA_NODEID_NUMERIC(1, 62541), 1, &input, &outputSize, &output);
+    retval = UA_Client_CallServerMethod(client, UA_NODEID_NUMERIC(0, UA_NS0ID_OBJECTSFOLDER),
+                                        UA_NODEID_NUMERIC(1, 62541), 1, &input, &outputSize, &output);
     if(retval == UA_STATUSCODE_GOOD) {
         printf("Method call was successfull, and %i returned values available.\n", outputSize);
         UA_Array_delete(output, &UA_TYPES[UA_TYPES_VARIANT], outputSize);
@@ -119,6 +156,7 @@ int main(int argc, char *argv[]) {
     UA_Variant_deleteMembers(&input);
 
 #endif
+
 #ifdef ENABLE_ADDNODES 
     /* Create a new object type node */
     // New ReferenceType

+ 54 - 1
include/ua_client.h

@@ -10,7 +10,8 @@ extern "C" {
 #include "ua_connection.h"
 #include "ua_log.h"
 #include "ua_types_generated.h"
-    
+#include "queue.h"
+
 struct UA_Client;
 typedef struct UA_Client UA_Client;
 
@@ -89,4 +90,56 @@ UA_AddNodesResponse UA_EXPORT *UA_Client_createObjectTypeNode(UA_Client *client,
 } // extern "C"
 #endif
 
+
+#ifdef ENABLE_SUBSCRIPTIONS
+typedef struct UA_Client_NotificationsAckNumber_s {
+    UA_SubscriptionAcknowledgement subAck;
+    LIST_ENTRY(UA_Client_NotificationsAckNumber_s) listEntry;
+} UA_Client_NotificationsAckNumber;
+
+typedef struct UA_Client_MonitoredItem_s {
+    UA_UInt32          MonitoredItemId;
+    UA_UInt32          MonitoringMode;
+    UA_NodeId          monitoredNodeId; 
+    UA_UInt32          AttributeID;
+    UA_UInt32          ClientHandle;
+    UA_UInt32          SamplingInterval;
+    UA_UInt32          QueueSize;
+    UA_Boolean         DiscardOldest;
+    void               (*handler)(UA_UInt32 handle, UA_DataValue *value);
+    LIST_ENTRY(UA_Client_MonitoredItem_s)  listEntry;
+} UA_Client_MonitoredItem;
+
+typedef struct UA_Client_Subscription_s {
+    UA_UInt32    LifeTime;
+    UA_Int32     KeepAliveCount;
+    UA_DateTime  PublishingInterval;
+    UA_UInt32    SubscriptionID;
+    UA_Int32     NotificationsPerPublish;
+    UA_UInt32    Priority;
+    LIST_ENTRY(UA_Client_Subscription_s) listEntry; 
+    LIST_HEAD(UA_ListOfClientMonitoredItems, UA_Client_MonitoredItem_s) MonitoredItems;
+} UA_Client_Subscription;
+
+UA_CreateSubscriptionResponse   UA_EXPORT UA_Client_createSubscription(UA_Client *client, UA_CreateSubscriptionRequest *request);
+UA_ModifySubscriptionResponse   UA_EXPORT UA_Client_modifySubscription(UA_Client *client, UA_ModifySubscriptionRequest *request);
+UA_DeleteSubscriptionsResponse  UA_EXPORT UA_Client_deleteSubscriptions(UA_Client *client, UA_DeleteSubscriptionsRequest *request);
+UA_CreateMonitoredItemsResponse UA_EXPORT UA_Client_createMonitoredItems(UA_Client *client, UA_CreateMonitoredItemsRequest *request);
+UA_DeleteMonitoredItemsResponse UA_EXPORT UA_Client_deleteMonitoredItems(UA_Client *client, UA_DeleteMonitoredItemsRequest *request);
+UA_PublishResponse              UA_EXPORT UA_Client_publish(UA_Client *client, UA_PublishRequest *request);
+
+UA_Int32      UA_EXPORT UA_Client_newSubscription(UA_Client *client, UA_Int32 publishInterval);
+UA_StatusCode UA_EXPORT UA_Client_removeSubscription(UA_Client *client, UA_UInt32 subscriptionId);
+//void UA_EXPORT UA_Client_modifySubscription(UA_Client *client);
+
+UA_UInt32     UA_EXPORT UA_Client_monitorItemChanges(UA_Client *client, UA_UInt32 subscriptionId, 
+                                                     UA_NodeId nodeId, UA_UInt32 attributeID, 
+                                                     void *handlingFunction);
+UA_StatusCode UA_EXPORT UA_Client_unMonitorItemChanges(UA_Client *client, UA_UInt32 subscriptionId, 
+                                                       UA_UInt32 monitoredItemId );
+
+void UA_EXPORT UA_Client_doPublish(UA_Client *client);
+UA_Boolean UA_Client_processPublishRx(UA_Client *client, UA_PublishResponse response);
+
+#endif
 #endif /* UA_CLIENT_H_ */

+ 3 - 0
include/ua_server.h

@@ -154,6 +154,9 @@ UA_Server_addMethodNode(UA_Server *server, const UA_QualifiedName browseName, UA
                         UA_MethodCallback method, UA_Int32 inputArgumentsSize,
                         const UA_Argument *inputArguments, UA_Int32 outputArgumentsSize,
                         const UA_Argument *outputArguments);
+
+UA_StatusCode UA_EXPORT
+UA_Server_attachMethod_toNode(UA_Server *server, UA_NodeId methodNodeId, UA_MethodCallback method);
 #endif
 
 /** Jobs describe work that is executed once or repeatedly. */

+ 367 - 2
src/client/ua_client.c

@@ -16,7 +16,13 @@ struct UA_Client {
     UA_UserTokenPolicy token;
     UA_NodeId sessionId;
     UA_NodeId authenticationToken;
-
+    
+#ifdef ENABLE_SUBSCRIPTIONS
+    UA_Int32 monitoredItemHandles;
+    LIST_HEAD(UA_ListOfUnacknowledgedNotificationNumbers, UA_Client_NotificationsAckNumber_s) pendingNotificationsAcks;
+    LIST_HEAD(UA_ListOfClientSubscriptionItems, UA_Client_Subscription_s) subscriptions;
+#endif
+    
     /* Config */
     UA_Logger logger;
     UA_ClientConfig config;
@@ -44,7 +50,12 @@ UA_Client * UA_Client_new(UA_ClientConfig config, UA_Logger logger) {
     client->logger = logger;
     client->config = config;
     client->scExpiresAt = 0;
-    
+
+#ifdef ENABLE_SUBSCRIPTIONS
+    client->monitoredItemHandles = 0;
+    LIST_INIT(&client->pendingNotificationsAcks);
+    LIST_INIT(&client->subscriptions);
+#endif
     return client;
 }
 
@@ -579,6 +590,360 @@ UA_DeleteReferencesResponse UA_Client_deleteReferences(UA_Client *client, UA_Del
     return response;
 }
 
+#ifdef ENABLE_SUBSCRIPTIONS
+UA_CreateSubscriptionResponse UA_Client_createSubscription(UA_Client *client, UA_CreateSubscriptionRequest *request) {
+    UA_CreateSubscriptionResponse response;
+    synchronousRequest(client, request, &UA_TYPES[UA_TYPES_CREATESUBSCRIPTIONREQUEST],
+                       &response, &UA_TYPES[UA_TYPES_CREATESUBSCRIPTIONRESPONSE]);
+    return response;
+}
+
+UA_DeleteSubscriptionsResponse UA_Client_deleteSubscriptions(UA_Client *client, UA_DeleteSubscriptionsRequest *request) {
+    UA_DeleteSubscriptionsResponse response;
+    synchronousRequest(client, request, &UA_TYPES[UA_TYPES_DELETESUBSCRIPTIONSREQUEST],
+                       &response, &UA_TYPES[UA_TYPES_DELETESUBSCRIPTIONSRESPONSE]);
+    return response;
+}
+
+UA_ModifySubscriptionResponse UA_Client_modifySubscription(UA_Client *client, UA_ModifySubscriptionRequest *request) {
+    UA_ModifySubscriptionResponse response;
+    synchronousRequest(client, request, &UA_TYPES[UA_TYPES_MODIFYSUBSCRIPTIONREQUEST],
+                       &response, &UA_TYPES[UA_TYPES_MODIFYSUBSCRIPTIONRESPONSE]);
+    return response;
+}
+
+UA_CreateMonitoredItemsResponse UA_Client_createMonitoredItems(UA_Client *client, UA_CreateMonitoredItemsRequest *request) {
+    UA_CreateMonitoredItemsResponse response;
+    synchronousRequest(client, request, &UA_TYPES[UA_TYPES_CREATEMONITOREDITEMSREQUEST],
+                       &response, &UA_TYPES[UA_TYPES_CREATEMONITOREDITEMSRESPONSE]);
+    return response;
+}
+
+UA_DeleteMonitoredItemsResponse UA_Client_deleteMonitoredItems(UA_Client *client, UA_DeleteMonitoredItemsRequest *request) {
+    UA_DeleteMonitoredItemsResponse response;
+    synchronousRequest(client, request, &UA_TYPES[UA_TYPES_DELETEMONITOREDITEMSREQUEST],
+                       &response, &UA_TYPES[UA_TYPES_DELETEMONITOREDITEMSRESPONSE]);
+    return response;
+}
+
+UA_PublishResponse UA_Client_publish(UA_Client *client, UA_PublishRequest *request) {
+    UA_PublishResponse response;
+    synchronousRequest(client, request, &UA_TYPES[UA_TYPES_PUBLISHREQUEST],
+                       &response, &UA_TYPES[UA_TYPES_PUBLISHRESPONSE]);
+    return response;
+}
+
+UA_Int32 UA_Client_newSubscription(UA_Client *client, UA_Int32 publishInterval) {
+    UA_Int32 retval;
+    UA_CreateSubscriptionRequest aReq;
+    UA_CreateSubscriptionResponse aRes;
+    UA_CreateSubscriptionRequest_init(&aReq);
+    UA_CreateSubscriptionResponse_init(&aRes);
+    
+    aReq.maxNotificationsPerPublish = 10;
+    aReq.priority = 0;
+    aReq.publishingEnabled = UA_TRUE;
+    aReq.requestedLifetimeCount = 100;
+    aReq.requestedMaxKeepAliveCount = 10;
+    aReq.requestedPublishingInterval = publishInterval;
+    
+    aRes = UA_Client_createSubscription(client, &aReq);
+    
+    if (aRes.responseHeader.serviceResult == UA_STATUSCODE_GOOD) {
+        UA_Client_Subscription *newSub = UA_malloc(sizeof(UA_Client_Subscription));
+        LIST_INIT(&newSub->MonitoredItems);
+        
+        newSub->LifeTime = aRes.revisedLifetimeCount;
+        newSub->KeepAliveCount = aRes.revisedMaxKeepAliveCount;
+        newSub->PublishingInterval = aRes.revisedPublishingInterval;
+        newSub->SubscriptionID = aRes.subscriptionId;
+        newSub->NotificationsPerPublish = aReq.maxNotificationsPerPublish;
+        newSub->Priority = aReq.priority;
+        retval = newSub->SubscriptionID;
+        LIST_INSERT_HEAD(&(client->subscriptions), newSub, listEntry);
+    } else
+        retval = 0;
+    
+    UA_CreateSubscriptionResponse_deleteMembers(&aRes);
+    UA_CreateSubscriptionRequest_deleteMembers(&aReq);
+    return retval;
+}
+
+UA_StatusCode UA_Client_removeSubscription(UA_Client *client, UA_UInt32 subscriptionId) {
+    UA_Client_Subscription *sub;
+    UA_StatusCode retval = UA_STATUSCODE_GOOD;
+    
+    LIST_FOREACH(sub, &(client->subscriptions), listEntry) {
+        if (sub->SubscriptionID == subscriptionId)
+            break;
+    }
+    
+    // Problem? We do not have this subscription registeres. Maybe the server should
+    // be consulted at this point?
+    if (sub == NULL)
+        return UA_STATUSCODE_BADSUBSCRIPTIONIDINVALID;
+    
+    UA_DeleteSubscriptionsRequest  request;
+    UA_DeleteSubscriptionsResponse response;
+    UA_DeleteSubscriptionsRequest_init(&request);
+    UA_DeleteSubscriptionsResponse_init(&response);
+    
+    request.subscriptionIdsSize=1;
+    request.subscriptionIds = (UA_UInt32 *) UA_malloc(sizeof(UA_UInt32));
+    *(request.subscriptionIds) = sub->SubscriptionID;
+    
+    UA_Client_MonitoredItem *mon;
+    LIST_FOREACH(mon, &(sub->MonitoredItems), listEntry) {
+        retval |= UA_Client_unMonitorItemChanges(client, sub->SubscriptionID, mon->MonitoredItemId);
+    }
+    if (retval != UA_STATUSCODE_GOOD)
+        return retval;
+    
+    response = UA_Client_deleteSubscriptions(client, &request);
+    
+    if (response.resultsSize > 0)
+        retval = response.results[0];
+    else
+        retval = response.responseHeader.serviceResult;
+    
+    if (retval == UA_STATUSCODE_GOOD) {
+        LIST_REMOVE(sub, listEntry);
+        UA_free(sub);
+    }
+    UA_DeleteSubscriptionsRequest_deleteMembers(&request);
+    UA_DeleteSubscriptionsResponse_deleteMembers(&response);
+    return retval;
+}
+
+UA_UInt32 UA_Client_monitorItemChanges(UA_Client *client, UA_UInt32 subscriptionId,
+                                       UA_NodeId nodeId, UA_UInt32 attributeID, void *handlingFunction) {
+    UA_Client_Subscription *sub;
+    UA_StatusCode retval = 0;
+    
+    LIST_FOREACH(sub, &(client->subscriptions), listEntry) {
+        if (sub->SubscriptionID == subscriptionId)
+            break;
+    }
+    
+    // Maybe the same problem as in DeleteSubscription... ask the server?
+    if (sub == NULL)
+        return UA_STATUSCODE_BADSUBSCRIPTIONIDINVALID;
+    
+    UA_CreateMonitoredItemsRequest request;
+    UA_CreateMonitoredItemsResponse response;
+    UA_CreateMonitoredItemsRequest_init(&request);
+    UA_CreateMonitoredItemsResponse_init(&response);
+    request.subscriptionId = subscriptionId;
+    request.itemsToCreateSize = 1;
+    request.itemsToCreate = UA_MonitoredItemCreateRequest_new();
+    UA_NodeId_copy(&nodeId, &((request.itemsToCreate[0]).itemToMonitor.nodeId));
+    (request.itemsToCreate[0]).itemToMonitor.attributeId = attributeID;
+    (request.itemsToCreate[0]).monitoringMode = UA_MONITORINGMODE_REPORTING;
+    (request.itemsToCreate[0]).requestedParameters.clientHandle = ++(client->monitoredItemHandles);
+    (request.itemsToCreate[0]).requestedParameters.samplingInterval = sub->PublishingInterval;
+    (request.itemsToCreate[0]).requestedParameters.discardOldest = UA_TRUE;
+    (request.itemsToCreate[0]).requestedParameters.queueSize = 1;
+    // Filter can be left void for now, only changes are supported (UA_Expert does the same with changeItems)
+    
+    response = UA_Client_createMonitoredItems(client, &request);
+    
+    // slight misuse of retval here to check if the deletion was successfull.
+    if (response.resultsSize == 0)
+        retval = response.responseHeader.serviceResult;
+    else
+        retval = response.results[0].statusCode;
+    
+    if (retval == UA_STATUSCODE_GOOD) {
+        UA_Client_MonitoredItem *newMon = (UA_Client_MonitoredItem *) UA_malloc(sizeof(UA_Client_MonitoredItem));
+        newMon->MonitoringMode = UA_MONITORINGMODE_REPORTING;
+        UA_NodeId_copy(&nodeId, &(newMon->monitoredNodeId)); 
+        newMon->AttributeID = attributeID;
+        newMon->ClientHandle = client->monitoredItemHandles;
+        newMon->SamplingInterval = sub->PublishingInterval;
+        newMon->QueueSize = 1;
+        newMon->DiscardOldest = UA_TRUE;
+        newMon->handler = handlingFunction;
+        newMon->MonitoredItemId = response.results[0].monitoredItemId;
+        
+        LIST_INSERT_HEAD(&(sub->MonitoredItems), newMon, listEntry);
+        retval = newMon->MonitoredItemId ;
+    }
+    else {
+        retval = 0;
+    }
+    
+    UA_CreateMonitoredItemsRequest_deleteMembers(&request);
+    UA_CreateMonitoredItemsResponse_deleteMembers(&response);
+    
+    return retval;
+}
+
+UA_StatusCode UA_Client_unMonitorItemChanges(UA_Client *client, UA_UInt32 subscriptionId, UA_UInt32 monitoredItemId ) {
+    UA_Client_Subscription *sub;
+    UA_StatusCode retval = 0;
+    
+    LIST_FOREACH(sub, &(client->subscriptions), listEntry) {
+        if (sub->SubscriptionID == subscriptionId)
+            break;
+    }
+    // Maybe the same problem as in DeleteSubscription... ask the server?
+    if (sub == NULL)
+        return UA_STATUSCODE_BADSUBSCRIPTIONIDINVALID;
+    
+    UA_Client_MonitoredItem *mon;
+    LIST_FOREACH(mon, &(sub->MonitoredItems), listEntry) {
+        if (mon->MonitoredItemId == monitoredItemId)
+            break;
+    }
+    // Also... ask the server?
+    if(mon==NULL) {
+        return UA_STATUSCODE_BADMONITOREDITEMIDINVALID;
+    }
+    
+    UA_DeleteMonitoredItemsRequest request;
+    UA_DeleteMonitoredItemsResponse response;
+    UA_DeleteMonitoredItemsRequest_init(&request);
+    UA_DeleteMonitoredItemsResponse_init(&response);
+    
+    request.subscriptionId = sub->SubscriptionID;
+    request.monitoredItemIdsSize = 1;
+    request.monitoredItemIds = (UA_UInt32 *) UA_malloc(sizeof(UA_UInt32));
+    request.monitoredItemIds[0] = mon->MonitoredItemId;
+    
+    response = UA_Client_deleteMonitoredItems(client, &request);
+    if (response.resultsSize > 1)
+        retval = response.results[0];
+    else
+        retval = response.responseHeader.serviceResult;
+    
+    if (retval == 0) {
+        LIST_REMOVE(mon, listEntry);
+        UA_free(mon);
+    }
+    
+    UA_DeleteMonitoredItemsRequest_deleteMembers(&request);
+    UA_DeleteMonitoredItemsResponse_deleteMembers(&response);
+    
+    return retval;
+}
+
+UA_Boolean UA_Client_processPublishRx(UA_Client *client, UA_PublishResponse response) {
+    UA_Client_Subscription *sub;
+    UA_Client_MonitoredItem *mon;
+    UA_StatusCode retval = UA_STATUSCODE_GOOD;
+    
+    if(response.responseHeader.serviceResult != UA_STATUSCODE_GOOD)
+        return UA_FALSE;
+    
+    // Check if the server has acknowledged any of our ACKS
+    // Note that a list of serverside status codes may be send without valid publish data, i.e. 
+    // during keepalives or no data availability
+    UA_Client_NotificationsAckNumber *tmpAck = client->pendingNotificationsAcks.lh_first;
+    UA_Client_NotificationsAckNumber *nxtAck = tmpAck;
+    for(int i=0; i<response.resultsSize && nxtAck != NULL; i++) {
+        tmpAck = nxtAck;
+        nxtAck = tmpAck->listEntry.le_next;
+        if (response.results[i] == UA_STATUSCODE_GOOD) {
+            LIST_REMOVE(tmpAck, listEntry);
+            UA_free(tmpAck);
+        }
+    }
+    
+    if(response.subscriptionId == 0)
+        return UA_FALSE;
+    
+    LIST_FOREACH(sub, &(client->subscriptions), listEntry) {
+        if (sub->SubscriptionID == response.subscriptionId)
+            break;
+    }
+    if (sub == NULL)
+        return UA_FALSE;
+    
+    UA_NotificationMessage msg = response.notificationMessage;
+    UA_DataChangeNotification dataChangeNotification;
+    size_t decodingOffset = 0;
+    for (int k=0; k<msg.notificationDataSize; k++) {
+        if (msg.notificationData[k].encoding == UA_EXTENSIONOBJECT_ENCODINGMASK_BODYISBYTESTRING) {
+            if (msg.notificationData[k].typeId.namespaceIndex == 0 && msg.notificationData[k].typeId.identifier.numeric == 811 ) {
+                // This is a dataChangeNotification
+                retval |= UA_DataChangeNotification_decodeBinary(&(msg.notificationData[k].body), &decodingOffset, &dataChangeNotification);
+                UA_MonitoredItemNotification *mitemNot;
+                for(int i=0; i<dataChangeNotification.monitoredItemsSize; i++) {
+                    mitemNot = &dataChangeNotification.monitoredItems[i];
+                    // find this client handle
+                    LIST_FOREACH(mon, &(sub->MonitoredItems), listEntry) {
+                        if (mon->ClientHandle == mitemNot->clientHandle) {
+                            mon->handler(mitemNot->clientHandle, &(mitemNot->value));
+                            break;
+                        }
+                    }
+                }
+            }
+            else if (msg.notificationData[k].typeId.namespaceIndex == 0 && msg.notificationData[k].typeId.identifier.numeric == 820 ) {
+                //FIXME: This is a statusChangeNotification (not supported yet)
+                continue;
+            }
+            else if (msg.notificationData[k].typeId.namespaceIndex == 0 && msg.notificationData[k].typeId.identifier.numeric == 916 ) {
+                //FIXME: This is an EventNotification
+                continue;
+            }
+        }
+    }
+    
+    // We processed this message, add it to the list of pending acks (but make sure it's not in the list first)
+    LIST_FOREACH(tmpAck, &(client->pendingNotificationsAcks), listEntry) {
+        if (tmpAck->subAck.sequenceNumber == msg.sequenceNumber &&
+            tmpAck->subAck.subscriptionId == response.subscriptionId)
+            break;
+    }
+    if (tmpAck == NULL ){
+        tmpAck = (UA_Client_NotificationsAckNumber *) malloc(sizeof(UA_Client_NotificationsAckNumber));
+        tmpAck->subAck.sequenceNumber = msg.sequenceNumber;
+        tmpAck->subAck.subscriptionId = sub->SubscriptionID;
+        LIST_INSERT_HEAD(&(client->pendingNotificationsAcks), tmpAck, listEntry);
+    }
+    
+    return response.moreNotifications;
+}
+
+void UA_Client_doPublish(UA_Client *client) {
+    UA_PublishRequest request;
+    UA_PublishResponse response;
+    UA_Client_NotificationsAckNumber *ack;
+    UA_Boolean moreNotifications = UA_TRUE;
+    int index = 0 ;
+    
+    do {
+        UA_PublishRequest_init(&request);
+        UA_PublishResponse_init(&response);
+        
+        request.subscriptionAcknowledgementsSize = 0;
+        LIST_FOREACH(ack, &(client->pendingNotificationsAcks), listEntry) {
+            request.subscriptionAcknowledgementsSize++;
+        }
+        request.subscriptionAcknowledgements = (UA_SubscriptionAcknowledgement *) UA_malloc(sizeof(UA_SubscriptionAcknowledgement)*request.subscriptionAcknowledgementsSize);
+        
+        index = 0;
+        LIST_FOREACH(ack, &(client->pendingNotificationsAcks), listEntry) {
+            ack = client->pendingNotificationsAcks.lh_first;
+            request.subscriptionAcknowledgements[index].sequenceNumber = ack->subAck.sequenceNumber;
+            request.subscriptionAcknowledgements[index].subscriptionId = ack->subAck.subscriptionId;
+            index++;
+        }
+        
+        response = UA_Client_publish(client, &request);
+        if (response.responseHeader.serviceResult == UA_STATUSCODE_GOOD)
+            moreNotifications = UA_Client_processPublishRx(client, response);
+        else
+            moreNotifications = UA_FALSE;
+        
+        UA_PublishResponse_deleteMembers(&response);
+        UA_PublishRequest_deleteMembers(&request);
+    }  while(moreNotifications == UA_TRUE);
+    return;
+}
+
+#endif
 
 /**********************************/
 /* User-Facing Macros-Function    */

+ 13 - 12
src/server/ua_nodestore.h

@@ -28,15 +28,15 @@
 struct UA_NodeStore;
 typedef struct UA_NodeStore UA_NodeStore;
 
-/** Create a new namespace */
+/** Create a new nodestore */
 UA_NodeStore * UA_NodeStore_new(void);
 
-/** Delete the namespace and all nodes in it */
+/** Delete the nodestore and all nodes in it */
 void UA_NodeStore_delete(UA_NodeStore *ns);
 
 /**
- * Inserts a new node into the namespace. If the nodeid is zero, then a fresh
- * numeric nodeid from namespace 1 is assigned. The memory of the original node
+ * Inserts a new node into the nodestore. If the nodeid is zero, then a fresh
+ * numeric nodeid from nodestore 1 is assigned. The memory of the original node
  * is freed and the content is moved to a managed (immutable) node. If inserted
  * is not NULL, then a pointer to the managed node is returned (and must be
  * released).
@@ -51,31 +51,32 @@ UA_StatusCode UA_NodeStore_insert(UA_NodeStore *ns, UA_Node *node, const UA_Node
 UA_StatusCode UA_NodeStore_replace(UA_NodeStore *ns, const UA_Node *oldNode, UA_Node *node, const UA_Node **inserted);
 
 /**
- * Remove a node from the namespace. Always succeeds, even if the node was not
+ * Remove a node from the nodestore. Always succeeds, even if the node was not
  * found.
  */
 UA_StatusCode UA_NodeStore_remove(UA_NodeStore *ns, const UA_NodeId *nodeid);
 
 /**
- * Retrieve a node (read-only) from the namespace. Nodes are immutable. They
- * can only be replaced. After the Node is no longer used, the locked entry
- * needs to be released.
+ * Retrieve a managed node (read-only) from the nodestore. Nodes are reference-
+ * counted (for garbage collection) and immutable. They can only be replaced
+ * entirely. After the node is no longer used, it needs to be released to decrease
+ * the reference count.
  */
 const UA_Node * UA_NodeStore_get(const UA_NodeStore *ns, const UA_NodeId *nodeid);
 
 /**
- * Release a managed node. Do never insert a node that isn't stored in a
- * namespace.
+ * Release a managed node. Do never call this with a node that isn't managed by a
+ * nodestore.
  */
 void UA_NodeStore_release(const UA_Node *managed);
 
 /**
- * A function that can be evaluated on all entries in a namespace via
+ * A function that can be evaluated on all entries in a nodestore via
  * UA_NodeStore_iterate. Note that the visitor is read-only on the nodes.
  */
 typedef void (*UA_NodeStore_nodeVisitor)(const UA_Node *node);
 
-/** Iterate over all nodes in a namespace. */
+/** Iterate over all nodes in a nodestore. */
 void UA_NodeStore_iterate(const UA_NodeStore *ns, UA_NodeStore_nodeVisitor visitor);
 
 /** @} */

+ 0 - 1
src/server/ua_securechannel_manager.c

@@ -48,7 +48,6 @@ void UA_SecureChannelManager_cleanupTimedOut(UA_SecureChannelManager *cm, UA_Dat
     }
 }
 
-
 UA_StatusCode UA_SecureChannelManager_open(UA_SecureChannelManager *cm, UA_Connection *conn,
                                            const UA_OpenSecureChannelRequest *request,
                                            UA_OpenSecureChannelResponse *response) {

+ 2 - 3
src/server/ua_server.c

@@ -155,7 +155,7 @@ void UA_Server_delete(UA_Server *server) {
     // Delete all internal data
     UA_ApplicationDescription_deleteMembers(&server->description);
     UA_SecureChannelManager_deleteMembers(&server->secureChannelManager);
-    UA_SessionManager_deleteMembers(&server->sessionManager);
+    UA_SessionManager_deleteMembers(&server->sessionManager, server);
     UA_NodeStore_delete(server->nodestore);
 #ifdef UA_EXTERNAL_NAMESPACES
     UA_Server_deleteExternalNamespaces(server);
@@ -183,7 +183,7 @@ void UA_Server_delete(UA_Server *server) {
 /* Recurring cleanup. Removing unused and timed-out channels and sessions */
 static void UA_Server_cleanup(UA_Server *server, void *nothing) {
     UA_DateTime now = UA_DateTime_now();
-    UA_SessionManager_cleanupTimedOut(&server->sessionManager, now);
+    UA_SessionManager_cleanupTimedOut(&server->sessionManager, server, now);
     UA_SecureChannelManager_cleanupTimedOut(&server->secureChannelManager, now);
 }
 
@@ -793,7 +793,6 @@ UA_Server * UA_Server_new(UA_ServerConfig config) {
     addDataTypeNode(server, "Enumeration", UA_NS0ID_ENUMERATION, UA_NS0ID_BASEDATATYPE);
         addDataTypeNode(server, "ServerState", UA_NS0ID_SERVERSTATE, UA_NS0ID_ENUMERATION);
 
-
    UA_ObjectNode *variabletypes = UA_ObjectNode_new();
    copyNames((UA_Node*)variabletypes, "VariableTypes");
    variabletypes->nodeId.identifier.numeric = UA_NS0ID_VARIABLETYPESFOLDER;

+ 26 - 0
src/server/ua_server_addressspace.c

@@ -445,4 +445,30 @@ UA_Server_addMethodNode(UA_Server *server, const UA_QualifiedName browseName, UA
     
     return retval;
 }
+
+UA_StatusCode
+UA_Server_attachMethod_toNode(UA_Server *server, UA_NodeId methodNodeId, UA_MethodCallback method) {
+    UA_StatusCode retval = UA_STATUSCODE_GOOD;
+    const UA_Node *attachToMethod;
+    UA_MethodNode *replacementMethod = UA_MethodNode_new();
+    
+    if (!method)
+        return UA_STATUSCODE_BADMETHODINVALID;
+    if (!server)
+        return UA_STATUSCODE_BADSERVERINDEXINVALID;
+    
+    attachToMethod =  UA_NodeStore_get(server->nodestore, &methodNodeId);
+    if (!attachToMethod)
+        return UA_STATUSCODE_BADNODEIDINVALID;
+    
+    if (attachToMethod->nodeClass != UA_NODECLASS_METHOD)
+        return UA_STATUSCODE_BADNODEIDINVALID;
+    
+    UA_MethodNode_copy((const UA_MethodNode *) attachToMethod, replacementMethod);
+    UA_NodeStore_release(attachToMethod);
+    
+    replacementMethod->attachedMethod = method;
+    retval |= UA_NodeStore_replace(server->nodestore, attachToMethod, (UA_Node *) replacementMethod, UA_NULL);
+    return retval;
+}
 #endif

+ 24 - 8
src/server/ua_server_binary.c

@@ -157,16 +157,12 @@ static void invoke_service(UA_Server *server, UA_SecureChannel *channel, UA_UInt
     init_response_header(request, response);
     /* try to get the session from the securechannel first */
     UA_Session *session = UA_SecureChannel_getSession(channel, &request->authenticationToken);
-    if(!session)
-        session = UA_SessionManager_getSession(&server->sessionManager, &request->authenticationToken);
-    if(!session)
+    if(!session || session->channel != channel) {
         response->serviceResult = UA_STATUSCODE_BADSESSIONIDINVALID;
-    else if(session->activated == UA_FALSE) {
+    } else if(session->activated == UA_FALSE) {
         response->serviceResult = UA_STATUSCODE_BADSESSIONNOTACTIVATED;
         /* the session is invalidated FIXME: do this delayed*/
-        UA_SessionManager_removeSession(&server->sessionManager, &request->authenticationToken);
-    } else if(session->channel != channel) {
-        response->serviceResult = UA_STATUSCODE_BADSESSIONIDINVALID;
+        UA_SessionManager_removeSession(&server->sessionManager, server, &request->authenticationToken);
     } else {
         UA_Session_updateLifetime(session);
         service(server, session, request, response);
@@ -299,7 +295,7 @@ static void processMSG(UA_Connection *connection, UA_Server *server, const UA_By
         UA_ActivateSessionResponse_deleteMembers(&r);
         break;
     }
-
+    
     case UA_NS0ID_CLOSESESSIONREQUEST:
         INVOKE_SERVICE(CloseSession, UA_TYPES_CLOSESESSIONRESPONSE);
         break;
@@ -327,6 +323,26 @@ static void processMSG(UA_Connection *connection, UA_Server *server, const UA_By
     case UA_NS0ID_TRANSLATEBROWSEPATHSTONODEIDSREQUEST:
         INVOKE_SERVICE(TranslateBrowsePathsToNodeIds, UA_TYPES_TRANSLATEBROWSEPATHSTONODEIDSRESPONSE);
         break;
+#ifdef ENABLE_SUBSCRIPTIONS    
+    case UA_NS0ID_CREATESUBSCRIPTIONREQUEST:
+        INVOKE_SERVICE(CreateSubscription, UA_TYPES_CREATESUBSCRIPTIONRESPONSE);
+        break;
+    case UA_NS0ID_PUBLISHREQUEST:
+        INVOKE_SERVICE(Publish, UA_TYPES_PUBLISHRESPONSE);
+        break;
+    case UA_NS0ID_MODIFYSUBSCRIPTIONREQUEST:
+        INVOKE_SERVICE(ModifySubscription, UA_TYPES_MODIFYSUBSCRIPTIONRESPONSE);
+        break;
+    case UA_NS0ID_DELETESUBSCRIPTIONSREQUEST:
+        INVOKE_SERVICE(DeleteSubscriptions, UA_TYPES_DELETESUBSCRIPTIONSRESPONSE);
+        break;
+    case UA_NS0ID_CREATEMONITOREDITEMSREQUEST:
+        INVOKE_SERVICE(CreateMonitoredItems, UA_TYPES_CREATEMONITOREDITEMSRESPONSE);
+        break;
+    case UA_NS0ID_DELETEMONITOREDITEMSREQUEST:
+        INVOKE_SERVICE(DeleteMonitoredItems, UA_TYPES_DELETEMONITOREDITEMSRESPONSE);
+        break;
+#endif
 #ifdef ENABLE_METHODCALLS
     case UA_NS0ID_CALLREQUEST:
         INVOKE_SERVICE(Call, UA_TYPES_CALLRESPONSE);

+ 5 - 1
src/server/ua_server_internal.h

@@ -7,6 +7,10 @@
 #include "ua_securechannel_manager.h"
 #include "ua_nodestore.h"
 
+#ifdef ENABLE_SUBSCRIPTIONS
+#include "ua_subscription_manager.h"
+#endif
+
 #define PRODUCT_URI "http://open62541.org"
 #define ANONYMOUS_POLICY "open62541-anonymous-policy"
 #define USERNAME_POLICY "open62541-username-policy"
@@ -47,7 +51,7 @@ struct UA_Server {
     UA_String *namespaces;
     size_t externalNamespacesSize;
     UA_ExternalNamespace *externalNamespaces;
-
+     
     /* Jobs with a repetition interval */
     LIST_HEAD(RepeatedJobsList, RepeatedJobs) repeatedJobs;
     

+ 30 - 18
src/server/ua_services.h

@@ -237,6 +237,7 @@ UA_StatusCode writeValue(UA_Server *server, UA_WriteValue *wvalue);
 // Service_Call
 /** @} */
 
+#ifdef ENABLE_SUBSCRIPTIONS
 /**
  * @name MonitoredItem Service Set
  *
@@ -254,13 +255,16 @@ UA_StatusCode writeValue(UA_Server *server, UA_WriteValue *wvalue);
  * links to be deleted, but has no effect on the MonitoredItems referenced by
  * the triggered items.
  */
-/* UA_Int32 Service_CreateMonitoredItems(UA_Server *server, UA_Session *session, */
-/*                                       const UA_CreateMonitoredItemsRequest *request, */
-/*                                       UA_CreateMonitoredItemsResponse *response); */
+void Service_CreateMonitoredItems(UA_Server *server, UA_Session *session,
+                                       const UA_CreateMonitoredItemsRequest *request, 
+                                       UA_CreateMonitoredItemsResponse *response);
 // Service_ModifyMonitoredItems
 // Service_SetMonitoringMode
 // Service_SetTriggering
-// Service_DeleteMonitoredItems
+void Service_DeleteMonitoredItems(UA_Server *server, UA_Session *session,
+                                  const UA_DeleteMonitoredItemsRequest *request,
+                                  UA_DeleteMonitoredItemsResponse *response);
+                                      
 /** @} */
 
 /**
@@ -270,25 +274,33 @@ UA_StatusCode writeValue(UA_Server *server, UA_WriteValue *wvalue);
  *
  * @{
  */
-// Service_CreateSubscription
-/* UA_Int32 Service_CreateSubscription(UA_Server *server, UA_Session *session, */
-/*                                     const UA_CreateSubscriptionRequest *request, */
-/*                                     UA_CreateSubscriptionResponse *response); */
-// Service_ModifySubscription
-// Service_SetPublishingMode
-/* UA_Int32 Service_SetPublishingMode(UA_Server *server, UA_Session *session, */
-/*                                    const UA_SetPublishingModeRequest *request, */
-/*                                    UA_SetPublishingModeResponse *response); */
-
-/* UA_Int32 Service_Publish(UA_Server *server, UA_Session *session, */
-/*                          const UA_PublishRequest *request, */
-/*                          UA_PublishResponse *response); */
-
+    
+void Service_CreateSubscription(UA_Server *server, UA_Session *session,
+                                const UA_CreateSubscriptionRequest *request,
+                                UA_CreateSubscriptionResponse *response);
+
+void Service_ModifySubscription(UA_Server *server, UA_Session *session,
+                                const UA_ModifySubscriptionRequest *request,
+                                UA_ModifySubscriptionResponse *response);
+
+void Service_DeleteSubscriptions(UA_Server *server, UA_Session *session,
+                                 const UA_DeleteSubscriptionsRequest *request,
+                                 UA_DeleteSubscriptionsResponse *response);
+                                     
+void Service_Publish(UA_Server *server, UA_Session *session,
+                     const UA_PublishRequest *request, UA_PublishResponse *response);
+                         
+//~ Service_ModifySubscription
+//~ Service_SetPublishingMode
+//~ UA_Int32 Service_SetPublishingMode(UA_Server *server, UA_Session *session,
+                                    //~ const UA_SetPublishingModeRequest *request,
+                                    //~ UA_SetPublishingModeResponse *response);
 // Service_Republish
 // Service_TransferSubscription
 // Service_DeleteSubscription
 /** @} */
 /** @} */
+#endif
 
 #ifdef ENABLE_METHODCALLS
 void Service_Call(UA_Server *server, UA_Session *session,

+ 6 - 3
src/server/ua_services_call.c

@@ -181,8 +181,8 @@ void Service_Call(UA_Server *server, UA_Session *session, const UA_CallRequest *
         const UA_VariableNode *outputArguments = getArgumentsVariableNode(server, methodCalled,
                                                                           UA_STRING("OutputArguments"));
         if(!outputArguments) {
+            // A MethodNode must have an OutputArguments variable (which may be empty)
             rs->statusCode = UA_STATUSCODE_BADINTERNALERROR;
-            UA_NodeStore_release((const UA_Node*)outputArguments);
             continue;
         }
         
@@ -196,8 +196,11 @@ void Service_Call(UA_Server *server, UA_Session *session, const UA_CallRequest *
         }
         else
             rs->statusCode = UA_STATUSCODE_BADNOTWRITABLE; // There is no NOTEXECUTABLE?
-
-        UA_NodeStore_release((const UA_Node*)outputArguments);
+            
+        /* FIXME: Verify Output Argument count, types and sizes */
+        if(outputArguments) {
+            UA_NodeStore_release((const UA_Node*)outputArguments);
+        }
         UA_NodeStore_release((const UA_Node *)withObject);
         UA_NodeStore_release((const UA_Node *)methodCalled);
     }

+ 1 - 1
src/server/ua_services_nodemanagement.c

@@ -96,7 +96,7 @@ static UA_StatusCode parseObjectNode(UA_ExtensionObject *attributes, UA_Node **n
     // now copy all the attributes. This potentially removes them from the decoded attributes.
     COPY_STANDARDATTRIBUTES;
     if(attr.specifiedAttributes & UA_NODEATTRIBUTESMASK_EVENTNOTIFIER)
-        vnode->eventNotifier = attr.eventNotifier;
+      vnode->eventNotifier = attr.eventNotifier;
     UA_ObjectAttributes_deleteMembers(&attr);
     *new_node = (UA_Node*) vnode;
     return UA_STATUSCODE_GOOD;

+ 2 - 2
src/server/ua_services_session.c

@@ -30,7 +30,7 @@ void Service_CreateSession(UA_Server *server, UA_SecureChannel *channel,
         response->responseHeader.serviceResult |=
             UA_ByteString_copy(&server->endpointDescriptions->serverCertificate, &response->serverCertificate);
     if(response->responseHeader.serviceResult != UA_STATUSCODE_GOOD) {
-        UA_SessionManager_removeSession(&server->sessionManager, &newSession->authenticationToken);
+        UA_SessionManager_removeSession(&server->sessionManager, server, &newSession->authenticationToken);
          return;
     }
 }
@@ -119,5 +119,5 @@ void Service_CloseSession(UA_Server *server, UA_Session *session, const UA_Close
 		response->responseHeader.serviceResult = UA_STATUSCODE_BADSESSIONIDINVALID;
 	else 
         response->responseHeader.serviceResult =
-            UA_SessionManager_removeSession(&server->sessionManager, &session->authenticationToken);
+            UA_SessionManager_removeSession(&server->sessionManager, server, &session->authenticationToken);
 }

+ 258 - 0
src/server/ua_services_subscription.c

@@ -0,0 +1,258 @@
+#include "ua_services.h"
+#include "ua_server_internal.h"
+#include "ua_subscription_manager.h"
+#include "ua_subscription.h"
+#include "ua_statuscodes.h"
+#include "ua_util.h"
+#include "ua_nodestore.h"
+
+#define UA_BOUNDEDVALUE_SETWBOUNDS(BOUNDS, SRC, DST) { \
+    if(SRC > BOUNDS.maxValue) DST = BOUNDS.maxValue; \
+    else if(SRC < BOUNDS.minValue) DST = BOUNDS.minValue; \
+    else DST = SRC; \
+    }
+
+void Service_CreateSubscription(UA_Server *server, UA_Session *session,
+                                const UA_CreateSubscriptionRequest *request,
+                                UA_CreateSubscriptionResponse *response) {
+    response->subscriptionId = SubscriptionManager_getUniqueUIntID(&session->subscriptionManager);
+    UA_Subscription *newSubscription = UA_Subscription_new(response->subscriptionId);
+    
+    UA_BOUNDEDVALUE_SETWBOUNDS(session->subscriptionManager.GlobalPublishingInterval,
+                               request->requestedPublishingInterval, response->revisedPublishingInterval);
+    newSubscription->PublishingInterval = response->revisedPublishingInterval;
+    
+    UA_BOUNDEDVALUE_SETWBOUNDS(session->subscriptionManager.GlobalLifeTimeCount,
+                               request->requestedLifetimeCount, response->revisedLifetimeCount);
+    newSubscription->LifeTime = (UA_UInt32_BoundedValue)  {
+        .minValue=session->subscriptionManager.GlobalLifeTimeCount.minValue,
+        .maxValue=session->subscriptionManager.GlobalLifeTimeCount.maxValue,
+        .currentValue=response->revisedLifetimeCount};
+    
+    UA_BOUNDEDVALUE_SETWBOUNDS(session->subscriptionManager.GlobalKeepAliveCount,
+                               request->requestedMaxKeepAliveCount, response->revisedMaxKeepAliveCount);
+    newSubscription->KeepAliveCount = (UA_Int32_BoundedValue)  {
+        .minValue=session->subscriptionManager.GlobalKeepAliveCount.minValue,
+        .maxValue=session->subscriptionManager.GlobalKeepAliveCount.maxValue,
+        .currentValue=response->revisedMaxKeepAliveCount};
+    
+    newSubscription->NotificationsPerPublish = request->maxNotificationsPerPublish;
+    newSubscription->PublishingMode          = request->publishingEnabled;
+    newSubscription->Priority                = request->priority;
+    
+    UA_Guid jobId = SubscriptionManager_getUniqueGUID(&session->subscriptionManager);
+    Subscription_createdUpdateJob(server, jobId, newSubscription);
+    Subscription_registerUpdateJob(server, newSubscription);
+    SubscriptionManager_addSubscription(&session->subscriptionManager, newSubscription);    
+}
+
+static void createMonitoredItems(UA_Server *server, UA_Session *session,
+                                 UA_Subscription *sub, const UA_MonitoredItemCreateRequest *request,
+                                 UA_MonitoredItemCreateResult *result) {
+    const UA_Node *target = UA_NodeStore_get(server->nodestore, &request->itemToMonitor.nodeId);
+    if(!target) {
+        result->statusCode = UA_STATUSCODE_BADNODEIDINVALID;
+        result->monitoredItemId = 0;
+        result->revisedSamplingInterval = 0;
+        result->revisedQueueSize = 0;
+        return;
+    }
+
+    UA_MonitoredItem *newMon = UA_MonitoredItem_new();
+    UA_NodeId_copy(&target->nodeId, &newMon->monitoredNodeId);
+    newMon->ItemId = ++(session->subscriptionManager.LastSessionID);
+    result->monitoredItemId = newMon->ItemId;
+    newMon->ClientHandle = request->requestedParameters.clientHandle;
+    UA_BOUNDEDVALUE_SETWBOUNDS(session->subscriptionManager.GlobalSamplingInterval,
+                               request->requestedParameters.samplingInterval,
+                               result->revisedSamplingInterval);
+    newMon->SamplingInterval = result->revisedSamplingInterval;
+    UA_BOUNDEDVALUE_SETWBOUNDS(session->subscriptionManager.GlobalQueueSize,
+                               request->requestedParameters.queueSize,
+                               result->revisedQueueSize);
+    newMon->QueueSize = (UA_UInt32_BoundedValue) {
+        .maxValue=(result->revisedQueueSize) + 1,
+        .minValue=0, .currentValue=0 };
+    newMon->AttributeID = request->itemToMonitor.attributeId;
+    newMon->MonitoredItemType = MONITOREDITEM_TYPE_CHANGENOTIFY;
+    newMon->DiscardOldest = request->requestedParameters.discardOldest;
+    LIST_INSERT_HEAD(&sub->MonitoredItems, newMon, listEntry);
+    UA_NodeStore_release(target);
+}
+
+void Service_CreateMonitoredItems(UA_Server *server, UA_Session *session,
+                                  const UA_CreateMonitoredItemsRequest *request,
+                                  UA_CreateMonitoredItemsResponse *response) {
+    UA_Subscription  *sub = SubscriptionManager_getSubscriptionByID(&session->subscriptionManager,
+                                                                    request->subscriptionId);
+    if(!sub) {
+        response->responseHeader.serviceResult = UA_STATUSCODE_BADSUBSCRIPTIONIDINVALID;
+        return;
+    }
+    
+    if(request->itemsToCreateSize <= 0) {
+        response->responseHeader.serviceResult = UA_STATUSCODE_BADNOTHINGTODO;
+        return;
+    }
+
+    response->results = UA_Array_new(&UA_TYPES[UA_TYPES_MONITOREDITEMCREATERESULT], request->itemsToCreateSize);
+    if(!response->results) {
+        response->responseHeader.serviceResult = UA_STATUSCODE_BADOUTOFMEMORY;
+        return;
+    }
+    response->resultsSize = request->itemsToCreateSize;
+
+    for(UA_Int32 i = 0; i<request->itemsToCreateSize; i++)
+        createMonitoredItems(server, session, sub, &request->itemsToCreate[i], &response->results[i]);
+}
+
+static void publish(UA_Server *server, UA_Session *session, UA_Subscription *sub,
+                    const UA_PublishRequest *request, UA_PublishResponse *response) {
+
+}
+
+void Service_Publish(UA_Server *server, UA_Session *session, const UA_PublishRequest *request,
+                     UA_PublishResponse *response) {
+    UA_SubscriptionManager *manager= &session->subscriptionManager;
+    if(!manager)
+        return;
+    
+    // Delete Acknowledged Subscription Messages
+    response->resultsSize = request->subscriptionAcknowledgementsSize;
+    response->results     = UA_malloc(sizeof(UA_StatusCode)*(response->resultsSize));
+    for(UA_Int32 i = 0; i < request->subscriptionAcknowledgementsSize; i++) {
+        response->results[i] = UA_STATUSCODE_GOOD;
+        UA_Subscription *sub =
+            SubscriptionManager_getSubscriptionByID(&session->subscriptionManager,
+                                                    request->subscriptionAcknowledgements[i].subscriptionId);
+        if(!sub) {
+            response->results[i] = UA_STATUSCODE_BADSUBSCRIPTIONIDINVALID;
+            continue;
+        }
+        if(Subscription_deleteUnpublishedNotification(request->subscriptionAcknowledgements[i].sequenceNumber,
+                                                      sub) == 0)
+            response->results[i] = UA_STATUSCODE_BADSEQUENCENUMBERINVALID;
+    }
+    
+    // See if any new data is available
+    UA_Subscription *sub;
+    LIST_FOREACH(sub, &manager->ServerSubscriptions, listEntry) {
+        if(sub->timedUpdateIsRegistered == UA_FALSE) {
+            // FIXME: We are forcing a value update for monitored items. This should be done by the event system.
+            // NOTE:  There is a clone of this functionality in the Subscription_timedUpdateNotificationsJob
+            UA_MonitoredItem *mon;
+            LIST_FOREACH(mon, &sub->MonitoredItems, listEntry)
+                MonitoredItem_QueuePushDataValue(server, mon);
+            
+            // FIXME: We are forcing notification updates for the subscription. This
+            // should be done by a timed work item.
+            Subscription_updateNotifications(sub);
+        }
+        
+        if(Subscription_queuedNotifications(sub) <= 0)
+            continue;
+        
+        response->subscriptionId = sub->SubscriptionID;
+        Subscription_copyTopNotificationMessage(&response->notificationMessage, sub);
+        if(sub->unpublishedNotifications.lh_first->notification->sequenceNumber > sub->SequenceNumber) {
+            // If this is a keepalive message, its seqNo is the next seqNo to be used for an actual msg.
+            response->availableSequenceNumbersSize = 0;
+            // .. and must be deleted
+            Subscription_deleteUnpublishedNotification(sub->SequenceNumber + 1, sub);
+        } else {
+            response->availableSequenceNumbersSize = Subscription_queuedNotifications(sub);
+            response->availableSequenceNumbers = Subscription_getAvailableSequenceNumbers(sub);
+        }	  
+        // FIXME: This should be in processMSG();
+        session->validTill = UA_DateTime_now() + session->timeout * 10000;
+        return;
+    }
+    
+    // FIXME: At this point, we would return nothing and "queue" the publish
+    // request, but currently we need to return something to the client. If no
+    // subscriptions have notifications, force one to generate a keepalive so we
+    // don't return an empty message
+    sub = LIST_FIRST(&manager->ServerSubscriptions);
+    if(!sub) {
+        response->subscriptionId = sub->SubscriptionID;
+        sub->KeepAliveCount.currentValue=sub->KeepAliveCount.minValue;
+        Subscription_generateKeepAlive(sub);
+        Subscription_copyTopNotificationMessage(&response->notificationMessage, sub);
+        Subscription_deleteUnpublishedNotification(sub->SequenceNumber + 1, sub);
+    }
+    
+    // FIXME: This should be in processMSG();
+    session->validTill = UA_DateTime_now() + session->timeout * 10000;
+}
+
+void Service_ModifySubscription(UA_Server *server, UA_Session *session,
+                                 const UA_ModifySubscriptionRequest *request,
+                                 UA_ModifySubscriptionResponse *response) {
+    UA_Subscription *sub = SubscriptionManager_getSubscriptionByID(&session->subscriptionManager,
+                                                                   request->subscriptionId);
+    if(!sub) {
+        response->responseHeader.serviceResult = UA_STATUSCODE_BADSUBSCRIPTIONIDINVALID;
+        return;
+    }
+    
+    UA_BOUNDEDVALUE_SETWBOUNDS(session->subscriptionManager.GlobalPublishingInterval,
+                               request->requestedPublishingInterval, response->revisedPublishingInterval);
+    sub->PublishingInterval = response->revisedPublishingInterval;
+    
+    UA_BOUNDEDVALUE_SETWBOUNDS(session->subscriptionManager.GlobalLifeTimeCount,
+                               request->requestedLifetimeCount, response->revisedLifetimeCount);
+    sub->LifeTime = (UA_UInt32_BoundedValue)  {
+        .minValue=session->subscriptionManager.GlobalLifeTimeCount.minValue,
+        .maxValue=session->subscriptionManager.GlobalLifeTimeCount.maxValue,
+        .currentValue=response->revisedLifetimeCount};
+        
+    UA_BOUNDEDVALUE_SETWBOUNDS(session->subscriptionManager.GlobalKeepAliveCount,
+                                request->requestedMaxKeepAliveCount, response->revisedMaxKeepAliveCount);
+    sub->KeepAliveCount = (UA_Int32_BoundedValue)  {
+        .minValue=session->subscriptionManager.GlobalKeepAliveCount.minValue,
+        .maxValue=session->subscriptionManager.GlobalKeepAliveCount.maxValue,
+        .currentValue=response->revisedMaxKeepAliveCount};
+        
+    sub->NotificationsPerPublish = request->maxNotificationsPerPublish;
+    sub->Priority                = request->priority;
+    
+    Subscription_registerUpdateJob(server, sub);
+    return;
+}
+
+void Service_DeleteSubscriptions(UA_Server *server, UA_Session *session,
+                                 const UA_DeleteSubscriptionsRequest *request,
+                                 UA_DeleteSubscriptionsResponse *response) {
+    response->results = UA_malloc(sizeof(UA_StatusCode) * request->subscriptionIdsSize);
+    if(!response->results) {
+        response->responseHeader.serviceResult = UA_STATUSCODE_BADOUTOFMEMORY;
+        return;
+    }
+    response->resultsSize = request->subscriptionIdsSize;
+
+    for(UA_Int32 i = 0; i < request->subscriptionIdsSize; i++)
+        response->results[i] = SubscriptionManager_deleteSubscription(server, &session->subscriptionManager,
+                                                                      request->subscriptionIds[i]);
+} 
+
+void Service_DeleteMonitoredItems(UA_Server *server, UA_Session *session,
+                                  const UA_DeleteMonitoredItemsRequest *request,
+                                  UA_DeleteMonitoredItemsResponse *response) {
+    UA_SubscriptionManager *manager = &session->subscriptionManager;
+    UA_Subscription *sub = SubscriptionManager_getSubscriptionByID(manager, request->subscriptionId);
+    if(!sub) {
+        response->responseHeader.serviceResult = UA_STATUSCODE_BADSUBSCRIPTIONIDINVALID;
+        return;
+    }
+    
+    response->results = UA_malloc(sizeof(UA_StatusCode) * request->monitoredItemIdsSize);
+    if(!response->results) {
+        response->responseHeader.serviceResult = UA_STATUSCODE_BADOUTOFMEMORY;
+        return;
+    }
+    response->resultsSize = request->monitoredItemIdsSize;
+
+    for(UA_Int32 i = 0; i < request->monitoredItemIdsSize; i++)
+        response->results[i] = SubscriptionManager_deleteMonitoredItem(manager, sub->SubscriptionID,
+                                                                       request->monitoredItemIds[i]);
+}

+ 6 - 6
src/server/ua_session_manager.c

@@ -12,22 +12,22 @@ UA_StatusCode UA_SessionManager_init(UA_SessionManager *sessionManager, UA_UInt3
     return UA_STATUSCODE_GOOD;
 }
 
-void UA_SessionManager_deleteMembers(UA_SessionManager *sessionManager) {
+void UA_SessionManager_deleteMembers(UA_SessionManager *sessionManager, UA_Server *server) {
     session_list_entry *current;
     while((current = LIST_FIRST(&sessionManager->sessions))) {
         LIST_REMOVE(current, pointers);
-        UA_Session_deleteMembersCleanup(&current->session);
+        UA_Session_deleteMembersCleanup(&current->session, server);
         UA_free(current);
     }
 }
 
-void UA_SessionManager_cleanupTimedOut(UA_SessionManager *sessionManager, UA_DateTime now) {
+void UA_SessionManager_cleanupTimedOut(UA_SessionManager *sessionManager, UA_Server* server, UA_DateTime now) {
     session_list_entry *sentry = LIST_FIRST(&sessionManager->sessions);
     while(sentry) {
         if(sentry->session.validTill < now) {
             session_list_entry *next = LIST_NEXT(sentry, pointers);
             LIST_REMOVE(sentry, pointers);
-            UA_Session_deleteMembersCleanup(&sentry->session);
+            UA_Session_deleteMembersCleanup(&sentry->session, server);
             UA_free(sentry);
             sessionManager->currentSessionCount--;
             sentry = next;
@@ -74,7 +74,7 @@ UA_StatusCode UA_SessionManager_createSession(UA_SessionManager *sessionManager,
     return UA_STATUSCODE_GOOD;
 }
 
-UA_StatusCode UA_SessionManager_removeSession(UA_SessionManager *sessionManager, const UA_NodeId *token) {
+UA_StatusCode UA_SessionManager_removeSession(UA_SessionManager *sessionManager, UA_Server* server, const UA_NodeId *token) {
     session_list_entry *current;
     LIST_FOREACH(current, &sessionManager->sessions, pointers) {
         if(UA_NodeId_equal(&current->session.authenticationToken, token))
@@ -83,7 +83,7 @@ UA_StatusCode UA_SessionManager_removeSession(UA_SessionManager *sessionManager,
     if(!current)
         return UA_STATUSCODE_BADSESSIONIDINVALID;
     LIST_REMOVE(current, pointers);
-    UA_Session_deleteMembersCleanup(&current->session);
+    UA_Session_deleteMembersCleanup(&current->session, server);
     UA_free(current);
     sessionManager->currentSessionCount--;
     return UA_STATUSCODE_GOOD;

+ 3 - 3
src/server/ua_session_manager.h

@@ -22,15 +22,15 @@ typedef struct UA_SessionManager {
 UA_StatusCode UA_SessionManager_init(UA_SessionManager *sessionManager, UA_UInt32 maxSessionCount,
                                     UA_UInt32 maxSessionLifeTime, UA_UInt32 startSessionId);
 
-void UA_SessionManager_deleteMembers(UA_SessionManager *sessionManager);
+void UA_SessionManager_deleteMembers(UA_SessionManager *sessionManager, UA_Server *server);
 
-void UA_SessionManager_cleanupTimedOut(UA_SessionManager *sessionManager, UA_DateTime now);
+void UA_SessionManager_cleanupTimedOut(UA_SessionManager *sessionManager, UA_Server *server, UA_DateTime now);
 
 UA_StatusCode UA_SessionManager_createSession(UA_SessionManager *sessionManager,
                                               UA_SecureChannel *channel, const UA_CreateSessionRequest *request,
                                               UA_Session **session);
 
-UA_StatusCode UA_SessionManager_removeSession(UA_SessionManager *sessionManager, const UA_NodeId *token);
+UA_StatusCode UA_SessionManager_removeSession(UA_SessionManager *sessionManager, UA_Server *server, const UA_NodeId *token);
 
 /** Finds the session which is identified by the authentication token */
 UA_Session * UA_SessionManager_getSession(UA_SessionManager *sessionManager, const UA_NodeId *token);

+ 557 - 0
src/server/ua_subscription.c

@@ -0,0 +1,557 @@
+#include "ua_subscription.h"
+#include "ua_server_internal.h"
+#include "ua_nodestore.h"
+
+/****************/
+/* Subscription */
+/****************/
+
+UA_Subscription *UA_Subscription_new(UA_Int32 SubscriptionID) {
+    UA_Subscription *new = UA_malloc(sizeof(UA_Subscription));
+    if(!new)
+        return UA_NULL;
+    new->SubscriptionID = SubscriptionID;
+    new->LastPublished  = 0;
+    new->SequenceNumber = 0;
+    memset(&new->timedUpdateJobGuid, 0, sizeof(UA_Guid));
+    new->timedUpdateJob          = UA_NULL;
+    new->timedUpdateIsRegistered = UA_FALSE;
+    LIST_INIT(&new->MonitoredItems);
+    LIST_INIT(&new->unpublishedNotifications);
+    return new;
+}
+
+void UA_Subscription_deleteMembers(UA_Subscription *subscription, UA_Server *server) {
+    UA_unpublishedNotification *not;
+    UA_MonitoredItem *mon;
+    
+    // Just in case any parallel process attempts to access this subscription
+    // while we are deleting it... make it vanish.
+    subscription->SubscriptionID = 0;
+    
+    // Delete monitored Items
+    while((mon = LIST_FIRST(&subscription->MonitoredItems))) {
+        LIST_REMOVE(mon, listEntry);
+        MonitoredItem_delete(mon);
+    }
+    
+    // Delete unpublished Notifications
+    while((not = LIST_FIRST(&subscription->unpublishedNotifications))) {
+        LIST_REMOVE(not, listEntry);
+        Subscription_deleteUnpublishedNotification(not->notification->sequenceNumber, subscription);
+    }
+    
+    // Unhook/Unregister any timed work assiociated with this subscription
+    if(subscription->timedUpdateJob != UA_NULL){
+        Subscription_unregisterUpdateJob(server, subscription);
+        UA_free(subscription->timedUpdateJob);
+    }
+    
+    return;
+}
+
+UA_UInt32 Subscription_queuedNotifications(UA_Subscription *subscription) {
+    if(!subscription)
+        return 0;
+
+    UA_UInt32 j = 0;
+    UA_unpublishedNotification *i;
+    LIST_FOREACH(i, &subscription->unpublishedNotifications, listEntry)
+        j++;
+    return j;
+}
+
+void Subscription_generateKeepAlive(UA_Subscription *subscription) {
+    if(subscription->KeepAliveCount.currentValue > subscription->KeepAliveCount.minValue &&
+       subscription->KeepAliveCount.currentValue <= subscription->KeepAliveCount.maxValue)
+        return;
+
+    UA_unpublishedNotification *msg = UA_malloc(sizeof(UA_unpublishedNotification));
+    if(!msg)
+        return;
+    msg->notification = UA_NULL;
+    msg->notification = UA_malloc(sizeof(UA_NotificationMessage));
+    msg->notification->notificationData = UA_NULL;
+    // KeepAlive uses next message, but does not increment counter
+    msg->notification->sequenceNumber = subscription->SequenceNumber + 1;
+    msg->notification->publishTime    = UA_DateTime_now();
+    msg->notification->notificationDataSize = 0;
+    LIST_INSERT_HEAD(&subscription->unpublishedNotifications, msg, listEntry);
+    subscription->KeepAliveCount.currentValue = subscription->KeepAliveCount.maxValue;
+}
+
+void Subscription_updateNotifications(UA_Subscription *subscription) {
+    UA_MonitoredItem *mon;
+    //MonitoredItem_queuedValue *queuedValue;
+    UA_unpublishedNotification *msg = NULL;
+    UA_UInt32 monItemsChangeT = 0, monItemsStatusT = 0, monItemsEventT = 0;
+    UA_DataChangeNotification *changeNotification;
+    size_t notificationOffset;
+    
+    if(!subscription || subscription->LastPublished + subscription->PublishingInterval > UA_DateTime_now())
+        return;
+    
+    // Make sure there is data to be published and establish which message types
+    // will need to be generated
+    LIST_FOREACH(mon, &subscription->MonitoredItems, listEntry) {
+        // Check if this MonitoredItems Queue holds data and how much data is held in total
+        if(!TAILQ_FIRST(&mon->queue))
+            continue;
+        if((mon->MonitoredItemType & MONITOREDITEM_TYPE_CHANGENOTIFY) != 0)
+            monItemsChangeT+=mon->QueueSize.currentValue;
+	    else if((mon->MonitoredItemType & MONITOREDITEM_TYPE_STATUSNOTIFY) != 0)
+            monItemsStatusT+=mon->QueueSize.currentValue;
+	    else if((mon->MonitoredItemType & MONITOREDITEM_TYPE_EVENTNOTIFY)  != 0)
+            monItemsEventT+=mon->QueueSize.currentValue;
+    }
+    
+    // FIXME: This is hardcoded to 100 because it is not covered by the spec but we need to protect the server!
+    if(Subscription_queuedNotifications(subscription) >= 10) {
+        // Remove last entry
+        LIST_FOREACH(msg, &subscription->unpublishedNotifications, listEntry)
+            LIST_REMOVE(msg, listEntry);
+        UA_free(msg);
+    }
+    
+    if(monItemsChangeT == 0 && monItemsEventT == 0 && monItemsStatusT == 0) {
+        // Decrement KeepAlive
+        subscription->KeepAliveCount.currentValue--;
+        // +- Generate KeepAlive msg if counter overruns
+        Subscription_generateKeepAlive(subscription);
+        return;
+    }
+    
+    msg = (UA_unpublishedNotification *) UA_malloc(sizeof(UA_unpublishedNotification));
+    msg->notification = UA_malloc(sizeof(UA_NotificationMessage));
+    INITPOINTER(msg->notification->notificationData);
+    msg->notification->sequenceNumber = subscription->SequenceNumber++;
+    msg->notification->publishTime    = UA_DateTime_now();
+    
+    // NotificationData is an array of Change, Status and Event messages, each containing the appropriate
+    // list of Queued values from all monitoredItems of that type
+    msg->notification->notificationDataSize = ISNOTZERO(monItemsChangeT);
+    // + ISNOTZERO(monItemsEventT) + ISNOTZERO(monItemsStatusT);
+    msg->notification->notificationData = UA_Array_new(&UA_TYPES[UA_TYPES_EXTENSIONOBJECT], 
+                                                       msg->notification->notificationDataSize);
+    
+    for(int notmsgn=0; notmsgn < msg->notification->notificationDataSize; notmsgn++) {
+        // Set the notification message type and encoding for each of 
+        //   the three possible NotificationData Types
+        msg->notification->notificationData[notmsgn].encoding = 1; // Encoding is always binary
+        msg->notification->notificationData[notmsgn].typeId = UA_NODEID_NUMERIC(0, 811);
+      
+        if(notmsgn == 0) {
+            // Construct a DataChangeNotification
+            changeNotification = UA_malloc(sizeof(UA_DataChangeNotification));
+	
+            // Create one DataChangeNotification for each queue item held in each monitoredItems queue:
+            changeNotification->monitoredItems = UA_Array_new(&UA_TYPES[UA_TYPES_MONITOREDITEMNOTIFICATION],
+                                                              monItemsChangeT);
+	
+            // Scan all monitoredItems in this subscription and have their queue transformed into an Array of
+            // the propper NotificationMessageType (Status, Change, Event)
+            monItemsChangeT = 0;
+            LIST_FOREACH(mon, &subscription->MonitoredItems, listEntry) {
+                if(mon->MonitoredItemType != MONITOREDITEM_TYPE_CHANGENOTIFY || !TAILQ_FIRST(&mon->queue))
+                    continue;
+                // Note: Monitored Items might not return a queuedValue if there is a problem encoding it.
+                monItemsChangeT += MonitoredItem_QueueToDataChangeNotifications(&changeNotification->monitoredItems[monItemsChangeT], mon);
+                MonitoredItem_ClearQueue(mon);
+            }
+
+            changeNotification->monitoredItemsSize  = monItemsChangeT;
+            changeNotification->diagnosticInfosSize = 0;
+            changeNotification->diagnosticInfos     = UA_NULL;
+        
+            msg->notification->notificationData[notmsgn].body.length =
+                UA_calcSizeBinary(changeNotification, &UA_TYPES[UA_TYPES_DATACHANGENOTIFICATION]);
+            msg->notification->notificationData[notmsgn].body.data   =
+                UA_calloc(msg->notification->notificationData[notmsgn].body.length, sizeof(UA_Byte));
+        
+            notificationOffset = 0;
+            UA_encodeBinary(changeNotification, &UA_TYPES[UA_TYPES_DATACHANGENOTIFICATION],
+                            &msg->notification->notificationData[notmsgn].body, &notificationOffset);
+	
+            // FIXME: Not properly freed!
+            for(unsigned int i=0; i<monItemsChangeT; i++) {
+                UA_MonitoredItemNotification *thisNotification = &(changeNotification->monitoredItems[i]);
+                UA_DataValue_deleteMembers(&(thisNotification->value));
+            }
+            UA_free(changeNotification->monitoredItems);
+            UA_free(changeNotification);
+        } else if(notmsgn == 1) {
+            // FIXME: Constructing a StatusChangeNotification is not implemented
+        } else if(notmsgn == 2) {
+            // FIXME: Constructing a EventListNotification is not implemented
+        }
+    }
+    LIST_INSERT_HEAD(&subscription->unpublishedNotifications, msg, listEntry);
+}
+
+UA_UInt32 *Subscription_getAvailableSequenceNumbers(UA_Subscription *sub) {
+    UA_UInt32 *seqArray = UA_malloc(sizeof(UA_UInt32) * Subscription_queuedNotifications(sub));
+    if(!seqArray)
+        return UA_NULL;
+  
+    int i = 0;
+    UA_unpublishedNotification *not;
+    LIST_FOREACH(not, &sub->unpublishedNotifications, listEntry) {
+        seqArray[i] = not->notification->sequenceNumber;
+        i++;
+    }
+    return seqArray;
+}
+
+void Subscription_copyTopNotificationMessage(UA_NotificationMessage *dst, UA_Subscription *sub) {
+    UA_NotificationMessage *latest;
+    
+    if(!dst)
+        return;
+    
+    if(Subscription_queuedNotifications(sub) == 0) {
+      dst->notificationDataSize = 0;
+      dst->publishTime = UA_DateTime_now();
+      dst->sequenceNumber = 0;
+      return;
+    }
+    
+    latest = LIST_FIRST(&sub->unpublishedNotifications)->notification;
+    dst->notificationDataSize = latest->notificationDataSize;
+    dst->publishTime = latest->publishTime;
+    dst->sequenceNumber = latest->sequenceNumber;
+    
+    if(latest->notificationDataSize == 0) return;
+    
+    dst->notificationData = (UA_ExtensionObject *) UA_malloc(sizeof(UA_ExtensionObject));
+    dst->notificationData->encoding = latest->notificationData->encoding;
+    dst->notificationData->typeId   = latest->notificationData->typeId;
+    UA_ByteString_copy(&latest->notificationData->body,
+                       &dst->notificationData->body);
+}
+
+UA_UInt32 Subscription_deleteUnpublishedNotification(UA_UInt32 seqNo, UA_Subscription *sub) {
+    UA_UInt32 deletedItems = 0;
+    UA_unpublishedNotification *not, *tmp;
+    LIST_FOREACH_SAFE(not, &sub->unpublishedNotifications, listEntry, tmp) {
+        if(not->notification->sequenceNumber != seqNo)
+            continue;
+        LIST_REMOVE(not, listEntry);
+        if(not->notification) {
+            if(not->notification->notificationData) {
+                if(not->notification->notificationData->body.data)
+                    UA_free(not->notification->notificationData->body.data);
+                UA_free(not->notification->notificationData);
+            }
+            UA_free(not->notification);
+        }
+        UA_free(not);
+        deletedItems++;
+    }
+    return deletedItems;
+}
+
+
+static void Subscription_timedUpdateNotificationsJob(UA_Server *server, void *data) {
+    // Timed-Worker/Job Version of updateNotifications
+    UA_Subscription *sub = (UA_Subscription *) data;
+    UA_MonitoredItem *mon;
+    
+    if(!data || !server)
+        return;
+    
+    // This is set by the Subscription_delete function to detere us from fiddling with
+    // this subscription if it is being deleted (not technically thread save, but better
+    // then nothing at all)
+    if(sub->SubscriptionID == 0)
+        return;
+    
+    // FIXME: This should be done by the event system
+    LIST_FOREACH(mon, &sub->MonitoredItems, listEntry)
+        MonitoredItem_QueuePushDataValue(server, mon);
+    
+    Subscription_updateNotifications(sub);
+    return;
+}
+
+
+UA_StatusCode Subscription_createdUpdateJob(UA_Server *server, UA_Guid jobId, UA_Subscription *sub) {
+    if(server == UA_NULL || sub == UA_NULL)
+        return UA_STATUSCODE_BADSERVERINDEXINVALID;
+        
+    UA_Job *theWork;
+    theWork = (UA_Job *) malloc(sizeof(UA_Job));
+    if(!theWork)
+        return UA_STATUSCODE_BADOUTOFMEMORY;
+    
+   *theWork = (UA_Job) {.type = UA_JOBTYPE_METHODCALL,
+                        .job.methodCall = {.method = Subscription_timedUpdateNotificationsJob, .data = sub} };
+   
+   sub->timedUpdateJobGuid = jobId;
+   sub->timedUpdateJob     = theWork;
+   
+   return UA_STATUSCODE_GOOD;
+}
+
+UA_StatusCode Subscription_registerUpdateJob(UA_Server *server, UA_Subscription *sub) {
+    if(server == UA_NULL || sub == UA_NULL)
+        return UA_STATUSCODE_BADSERVERINDEXINVALID;
+    
+    if(sub->PublishingInterval <= 5 ) 
+        return UA_STATUSCODE_BADNOTSUPPORTED;
+    
+    // Practically enough, the client sends a uint32 in ms, which we store as datetime, which here is required in as uint32 in ms as the interval
+#ifdef _MSC_VER
+    UA_Int32 retval = UA_Server_addRepeatedJob(server, *(sub->timedUpdateJob), sub->PublishingInterval,
+                                               &sub->timedUpdateJobGuid);
+#else
+    UA_StatusCode retval = UA_Server_addRepeatedJob(server, *sub->timedUpdateJob, sub->PublishingInterval,
+                                                    &sub->timedUpdateJobGuid);
+#endif
+    if(!retval)
+        sub->timedUpdateIsRegistered = UA_TRUE;
+    return retval;
+}
+
+UA_StatusCode Subscription_unregisterUpdateJob(UA_Server *server, UA_Subscription *sub) {
+    if(server == UA_NULL || sub == UA_NULL)
+        return UA_STATUSCODE_BADSERVERINDEXINVALID;
+    UA_Int32 retval = UA_Server_removeRepeatedJob(server, sub->timedUpdateJobGuid);
+    sub->timedUpdateIsRegistered = UA_FALSE;
+    return retval;
+}
+
+/*****************/
+/* MonitoredItem */
+/*****************/
+
+UA_MonitoredItem *UA_MonitoredItem_new() {
+    UA_MonitoredItem *new = (UA_MonitoredItem *) UA_malloc(sizeof(UA_MonitoredItem));
+    new->QueueSize   = (UA_UInt32_BoundedValue) { .minValue = 0, .maxValue = 0, .currentValue = 0};
+    new->LastSampled = 0;
+    // FIXME: This is currently hardcoded;
+    new->MonitoredItemType = MONITOREDITEM_TYPE_CHANGENOTIFY;
+    TAILQ_INIT(&new->queue);
+    UA_NodeId_init(&new->monitoredNodeId);
+    INITPOINTER(new->LastSampledValue.data );
+    return new;
+}
+
+void MonitoredItem_delete(UA_MonitoredItem *monitoredItem) {
+    // Delete Queued Data
+    MonitoredItem_ClearQueue(monitoredItem);
+    // Remove from subscription list
+    LIST_REMOVE(monitoredItem, listEntry);
+    // Release comparison sample
+    if(monitoredItem->LastSampledValue.data != NULL) { 
+      UA_free(monitoredItem->LastSampledValue.data);
+    }
+    
+    UA_NodeId_deleteMembers(&(monitoredItem->monitoredNodeId));
+    UA_free(monitoredItem);
+}
+
+int MonitoredItem_QueueToDataChangeNotifications(UA_MonitoredItemNotification *dst,
+                                                 UA_MonitoredItem *monitoredItem) {
+    int queueSize = 0;
+    MonitoredItem_queuedValue *queueItem;
+  
+    // Count instead of relying on the items currentValue
+    TAILQ_FOREACH(queueItem, &monitoredItem->queue, listEntry) {
+        dst[queueSize].clientHandle = monitoredItem->ClientHandle;
+        dst[queueSize].value.hasServerPicoseconds = UA_FALSE;
+        dst[queueSize].value.hasServerTimestamp   = UA_FALSE;
+        dst[queueSize].value.serverTimestamp      = UA_FALSE;
+        dst[queueSize].value.hasSourcePicoseconds = UA_FALSE;
+        dst[queueSize].value.hasSourceTimestamp   = UA_FALSE;
+        dst[queueSize].value.hasValue             = UA_TRUE;
+        dst[queueSize].value.status = UA_STATUSCODE_GOOD;
+    
+        UA_Variant_copy(&(queueItem->value), &(dst[queueSize].value.value));
+    
+        // Do not create variants with no type -> will make calcSizeBinary() segfault.
+        if(dst[queueSize].value.value.type)
+            queueSize++;
+    }
+    return queueSize;
+}
+
+void MonitoredItem_ClearQueue(UA_MonitoredItem *monitoredItem) {
+    MonitoredItem_queuedValue *val;
+    while((val = TAILQ_FIRST(&monitoredItem->queue))) {
+        TAILQ_REMOVE(&monitoredItem->queue, val, listEntry);
+        UA_Variant_deleteMembers(&val->value);
+        UA_free(val);
+    }
+    monitoredItem->QueueSize.currentValue = 0;
+}
+
+UA_Boolean MonitoredItem_CopyMonitoredValueToVariant(UA_UInt32 AttributeID, const UA_Node *src,
+                                                     UA_Variant *dst) {
+    UA_Boolean samplingError = UA_TRUE; 
+    UA_DataValue sourceDataValue;
+    UA_DataValue_init(&sourceDataValue);
+    const UA_VariableNode *srcAsVariableNode = (const UA_VariableNode *) src;
+  
+    // FIXME: Not all AttributeIDs can be monitored yet
+    switch(AttributeID) {
+    case UA_ATTRIBUTEID_NODEID:
+        UA_Variant_setScalarCopy(dst, (const UA_NodeId *) &(src->nodeId), &UA_TYPES[UA_TYPES_NODEID]);
+        samplingError = UA_FALSE;
+        break;
+    case UA_ATTRIBUTEID_NODECLASS:
+        UA_Variant_setScalarCopy(dst, (const UA_Int32 *) &(src->nodeClass), &UA_TYPES[UA_TYPES_INT32]);
+        samplingError = UA_FALSE;
+        break;
+    case UA_ATTRIBUTEID_BROWSENAME:
+        UA_Variant_setScalarCopy(dst, (const UA_String *) &(src->browseName), &UA_TYPES[UA_TYPES_QUALIFIEDNAME]);
+        samplingError = UA_FALSE;
+        break;
+    case UA_ATTRIBUTEID_DISPLAYNAME:
+        UA_Variant_setScalarCopy(dst, (const UA_String *) &(src->displayName), &UA_TYPES[UA_TYPES_LOCALIZEDTEXT]);
+        samplingError = UA_FALSE;
+        break;
+    case UA_ATTRIBUTEID_DESCRIPTION:
+        UA_Variant_setScalarCopy(dst, (const UA_String *) &(src->displayName), &UA_TYPES[UA_TYPES_LOCALIZEDTEXT]);
+        samplingError = UA_FALSE;
+        break;
+    case UA_ATTRIBUTEID_WRITEMASK:
+        UA_Variant_setScalarCopy(dst, (const UA_String *) &(src->writeMask), &UA_TYPES[UA_TYPES_UINT32]);
+        samplingError = UA_FALSE;
+        break;
+    case UA_ATTRIBUTEID_USERWRITEMASK:
+        UA_Variant_setScalarCopy(dst, (const UA_String *) &(src->writeMask), &UA_TYPES[UA_TYPES_UINT32]);
+        samplingError = UA_FALSE;
+        break;
+    case UA_ATTRIBUTEID_ISABSTRACT:
+        break;
+    case UA_ATTRIBUTEID_SYMMETRIC:
+        break;
+    case UA_ATTRIBUTEID_INVERSENAME:
+        break;
+    case UA_ATTRIBUTEID_CONTAINSNOLOOPS:
+        break;
+    case UA_ATTRIBUTEID_EVENTNOTIFIER:
+        break;
+    case UA_ATTRIBUTEID_VALUE: 
+        if(src->nodeClass == UA_NODECLASS_VARIABLE) {
+            const UA_VariableNode *vsrc = (const UA_VariableNode*)src;
+            if(srcAsVariableNode->valueSource == UA_VALUESOURCE_VARIANT) {
+                UA_Variant_copy(&vsrc->value.variant, dst);
+                samplingError = UA_FALSE;
+            } else if(srcAsVariableNode->valueSource == UA_VALUESOURCE_DATASOURCE) {
+                // todo: handle numeric ranges
+                if(srcAsVariableNode->value.dataSource.read(vsrc->value.dataSource.handle, UA_TRUE, UA_NULL,
+                                                            &sourceDataValue) == UA_STATUSCODE_GOOD) {
+                    UA_Variant_copy(&sourceDataValue.value, dst);
+                    if(sourceDataValue.value.data != NULL) {
+                        UA_deleteMembers(sourceDataValue.value.data, sourceDataValue.value.type);
+                        UA_free(sourceDataValue.value.data);
+                        sourceDataValue.value.data = NULL;
+                    }
+                    UA_DataValue_deleteMembers(&sourceDataValue);
+                    samplingError = UA_FALSE;
+                }
+            }
+        }
+        break;
+    case UA_ATTRIBUTEID_DATATYPE:
+        break;
+    case UA_ATTRIBUTEID_VALUERANK:
+        break;
+    case UA_ATTRIBUTEID_ARRAYDIMENSIONS:
+        break;
+    case UA_ATTRIBUTEID_ACCESSLEVEL:
+        break;
+    case UA_ATTRIBUTEID_USERACCESSLEVEL:
+        break;
+    case UA_ATTRIBUTEID_MINIMUMSAMPLINGINTERVAL:
+        break;
+    case UA_ATTRIBUTEID_HISTORIZING:
+        break;
+    case UA_ATTRIBUTEID_EXECUTABLE:
+        break;
+    case UA_ATTRIBUTEID_USEREXECUTABLE:
+        break;
+    default:
+        break;
+    }
+  
+    return samplingError;
+}
+
+void MonitoredItem_QueuePushDataValue(UA_Server *server, UA_MonitoredItem *monitoredItem) {
+    MonitoredItem_queuedValue *newvalue = NULL, *queueItem = NULL;
+    UA_Boolean samplingError = UA_TRUE; 
+    UA_ByteString newValueAsByteString = { .length=0, .data=NULL };
+    size_t encodingOffset = 0;
+  
+    if(!monitoredItem || monitoredItem->LastSampled + monitoredItem->SamplingInterval > UA_DateTime_now())
+        return;
+  
+    // FIXME: Actively suppress non change value based monitoring. There should be
+    // another function to handle status and events.
+    if(monitoredItem->MonitoredItemType != MONITOREDITEM_TYPE_CHANGENOTIFY)
+        return;
+
+    newvalue = UA_malloc(sizeof(MonitoredItem_queuedValue));
+    if(!newvalue)
+        return;
+
+    newvalue->listEntry.tqe_next = UA_NULL;
+    newvalue->listEntry.tqe_prev = UA_NULL;
+    UA_Variant_init(&newvalue->value);
+
+    // Verify that the *Node being monitored is still valid
+    // Looking up the in the nodestore is only necessary if we suspect that it is changed during writes
+    // e.g. in multithreaded applications
+    const UA_Node *target = UA_NodeStore_get(server->nodestore, &monitoredItem->monitoredNodeId);
+    if(!target) {
+        UA_free(newvalue);
+        return;
+    }
+  
+    samplingError = MonitoredItem_CopyMonitoredValueToVariant(monitoredItem->AttributeID, target,
+                                                              &newvalue->value);
+    UA_NodeStore_release(target);
+    if(samplingError != UA_FALSE || !newvalue->value.type) {
+        UA_Variant_deleteMembers(&newvalue->value);
+        UA_free(newvalue);
+        return;
+    }
+  
+    if(monitoredItem->QueueSize.currentValue >= monitoredItem->QueueSize.maxValue) {
+        if(monitoredItem->DiscardOldest != UA_TRUE) {
+            // We cannot remove the oldest value and theres no queue space left. We're done here.
+            UA_free(newvalue);
+            return;
+        }
+        queueItem = TAILQ_LAST(&monitoredItem->queue, QueueOfQueueDataValues);
+        TAILQ_REMOVE(&monitoredItem->queue, queueItem, listEntry);
+        UA_free(queueItem);
+        monitoredItem->QueueSize.currentValue--;
+    }
+  
+    // encode the data to find if its different to the previous
+    newValueAsByteString.length = UA_calcSizeBinary(&newvalue->value, &UA_TYPES[UA_TYPES_VARIANT]);
+    newValueAsByteString.data   = UA_malloc(newValueAsByteString.length);
+    UA_encodeBinary(&newvalue->value, &UA_TYPES[UA_TYPES_VARIANT], &newValueAsByteString, &encodingOffset);
+  
+    if(!monitoredItem->LastSampledValue.data) { 
+        monitoredItem->LastSampledValue = newValueAsByteString;
+        TAILQ_INSERT_HEAD(&monitoredItem->queue, newvalue, listEntry);
+        monitoredItem->QueueSize.currentValue++;
+        monitoredItem->LastSampled = UA_DateTime_now();
+    } else {
+        if(UA_String_equal(&newValueAsByteString, &monitoredItem->LastSampledValue) == UA_TRUE) {
+            UA_Variant_deleteMembers(&newvalue->value);
+            UA_free(newvalue);
+            UA_String_deleteMembers(&newValueAsByteString);
+            return;
+        }
+        UA_ByteString_deleteMembers(&monitoredItem->LastSampledValue);
+        monitoredItem->LastSampledValue = newValueAsByteString;
+        TAILQ_INSERT_HEAD(&monitoredItem->queue, newvalue, listEntry);
+        monitoredItem->QueueSize.currentValue++;
+        monitoredItem->LastSampled = UA_DateTime_now();
+    }
+}

+ 108 - 0
src/server/ua_subscription.h

@@ -0,0 +1,108 @@
+#ifndef UA_SUBSCRIPTION_H_
+#define UA_SUBSCRIPTION_H_
+
+#include "ua_util.h"
+#include "ua_types.h"
+#include "ua_types_generated.h"
+#include "ua_nodes.h"
+
+#define INITPOINTER(src) (src) = NULL;
+#define ISNOTZERO(value) ((value == 0) ? 0 : 1)
+
+/*****************/
+/* MonitoredItem */
+/*****************/
+
+typedef struct {
+    UA_Int32 currentValue;
+    UA_Int32 minValue;
+    UA_Int32 maxValue;
+} UA_Int32_BoundedValue;
+
+typedef struct {
+    UA_UInt32 currentValue;
+    UA_UInt32 minValue;
+    UA_UInt32 maxValue;
+} UA_UInt32_BoundedValue;
+
+typedef enum {
+    MONITOREDITEM_TYPE_CHANGENOTIFY = 1,
+    MONITOREDITEM_TYPE_STATUSNOTIFY = 2,
+    MONITOREDITEM_TYPE_EVENTNOTIFY = 4
+} MONITOREDITEM_TYPE;
+
+typedef struct MonitoredItem_queuedValue {
+    UA_Variant value;
+    TAILQ_ENTRY(MonitoredItem_queuedValue) listEntry;
+} MonitoredItem_queuedValue;
+
+typedef struct UA_MonitoredItem_s {
+    UA_UInt32                       ItemId;
+    MONITOREDITEM_TYPE		    MonitoredItemType;
+    UA_UInt32                       TimestampsToReturn;
+    UA_UInt32                       MonitoringMode;
+    UA_NodeId                       monitoredNodeId; 
+    UA_UInt32                       AttributeID;
+    UA_UInt32                       ClientHandle;
+    UA_UInt32                       SamplingInterval;
+    UA_UInt32_BoundedValue          QueueSize;
+    UA_Boolean                      DiscardOldest;
+    UA_DateTime                     LastSampled;
+    UA_ByteString                   LastSampledValue;
+    // FIXME: indexRange is ignored; array values default to element 0
+    // FIXME: dataEncoding is hardcoded to UA binary
+    LIST_ENTRY(UA_MonitoredItem_s)  listEntry;
+    TAILQ_HEAD(QueueOfQueueDataValues, MonitoredItem_queuedValue) queue;
+} UA_MonitoredItem;
+
+UA_MonitoredItem *UA_MonitoredItem_new(void);
+void MonitoredItem_delete(UA_MonitoredItem *monitoredItem);
+void MonitoredItem_QueuePushDataValue(UA_Server *server, UA_MonitoredItem *monitoredItem);
+void MonitoredItem_ClearQueue(UA_MonitoredItem *monitoredItem);
+UA_Boolean MonitoredItem_CopyMonitoredValueToVariant(UA_UInt32 AttributeID, const UA_Node *src,
+                                                     UA_Variant *dst);
+int MonitoredItem_QueueToDataChangeNotifications(UA_MonitoredItemNotification *dst,
+                                                 UA_MonitoredItem *monitoredItem);
+
+/****************/
+/* Subscription */
+/****************/
+
+typedef struct UA_unpublishedNotification_s {
+    UA_NotificationMessage 		     *notification;
+    LIST_ENTRY(UA_unpublishedNotification_s) listEntry;
+} UA_unpublishedNotification;
+
+typedef struct UA_Subscription_s {
+    UA_UInt32_BoundedValue              LifeTime;
+    UA_Int32_BoundedValue               KeepAliveCount;
+    UA_DateTime                         PublishingInterval;
+    UA_DateTime                         LastPublished;
+    UA_Int32                            SubscriptionID;
+    UA_Int32                            NotificationsPerPublish;
+    UA_Boolean                          PublishingMode;
+    UA_UInt32                           Priority;
+    UA_UInt32                           SequenceNumber;
+    UA_Guid                             timedUpdateJobGuid;
+    UA_Job                              *timedUpdateJob;
+    UA_Boolean                          timedUpdateIsRegistered;
+    LIST_ENTRY(UA_Subscription_s)       listEntry;
+    LIST_HEAD(UA_ListOfUnpublishedNotifications, UA_unpublishedNotification_s) unpublishedNotifications;
+    LIST_HEAD(UA_ListOfUAMonitoredItems, UA_MonitoredItem_s) MonitoredItems;
+} UA_Subscription;
+
+UA_Subscription *UA_Subscription_new(UA_Int32 SubscriptionID);
+void UA_Subscription_deleteMembers(UA_Subscription *subscription, UA_Server *server);
+void Subscription_updateNotifications(UA_Subscription *subscription);
+UA_UInt32 Subscription_queuedNotifications(UA_Subscription *subscription);
+UA_UInt32 *Subscription_getAvailableSequenceNumbers(UA_Subscription *sub);
+void Subscription_copyTopNotificationMessage(UA_NotificationMessage *dst, UA_Subscription *sub);
+UA_UInt32 Subscription_deleteUnpublishedNotification(UA_UInt32 seqNo, UA_Subscription *sub);
+void Subscription_generateKeepAlive(UA_Subscription *subscription);
+UA_StatusCode Subscription_createdUpdateJob(UA_Server *server, UA_Guid jobId, UA_Subscription *sub);
+UA_StatusCode Subscription_registerUpdateJob(UA_Server *server, UA_Subscription *sub);
+UA_StatusCode Subscription_unregisterUpdateJob(UA_Server *server, UA_Subscription *sub);
+
+static void Subscription_timedUpdateNotificationsJob(UA_Server *server, void *data);
+
+#endif /* UA_SUBSCRIPTION_H_ */

+ 121 - 0
src/server/ua_subscription_manager.c

@@ -0,0 +1,121 @@
+#include "ua_types.h"
+#include "ua_server_internal.h"
+#include "ua_nodestore.h"
+#include "ua_subscription_manager.h"
+
+void SubscriptionManager_init(UA_Session *session) {
+    UA_SubscriptionManager *manager = &(session->subscriptionManager);
+
+    /* FIXME: These init values are empirical. Maybe they should be part
+     *        of the server config? */
+    manager->GlobalPublishingInterval = (UA_Int32_BoundedValue) { .maxValue = 10000, .minValue = 0, .currentValue=0 };
+    manager->GlobalLifeTimeCount = (UA_UInt32_BoundedValue) { .maxValue = 15000, .minValue = 0, .currentValue=0 };
+    manager->GlobalKeepAliveCount = (UA_UInt32_BoundedValue) { .maxValue = 100, .minValue = 0, .currentValue=0 };
+    manager->GlobalNotificationsPerPublish = (UA_Int32_BoundedValue)  { .maxValue = 1000, .minValue = 1, .currentValue=0 };
+    manager->GlobalSamplingInterval = (UA_UInt32_BoundedValue) { .maxValue = 1000, .minValue = 5, .currentValue=0 };
+    manager->GlobalQueueSize = (UA_UInt32_BoundedValue) { .maxValue = 100, .minValue = 0, .currentValue=0 };
+    LIST_INIT(&manager->ServerSubscriptions);
+    manager->LastSessionID = (UA_UInt32) UA_DateTime_now();
+    
+    // Initialize a GUID with a 2^64 time dependant part, then fold the time in on itself to provide a more randomish
+    // Counter
+    // NOTE: On a 32 bit plattform, assigning 64 bit (2 regs) is allowed by the compiler, but shifting though multiple
+    //       regs is usually not. To support both 32 and 64bit, the struct splits the 64Bit timestamp into two parts.
+    union {
+        struct {
+            UA_UInt32 ui32h;
+            UA_UInt32 ui32l;
+        };
+        UA_UInt64 ui64;
+    } guidInitH;
+    guidInitH.ui64 = (UA_UInt64) UA_DateTime_now();
+    manager->LastJobGuid.data1 = guidInitH.ui32h;
+    manager->LastJobGuid.data2 = (UA_UInt16) (guidInitH.ui32l >> 16);
+    manager->LastJobGuid.data3 = (UA_UInt16) (guidInitH.ui32l);
+    union {
+        struct {
+            UA_UInt32 ui32h;
+            UA_UInt32 ui32l;
+        };
+        UA_UInt64 ui64;
+    } guidInitL;
+    guidInitL.ui64 = (UA_UInt64) UA_DateTime_now();
+    manager->LastJobGuid.data4[0] = (UA_Byte) guidInitL.ui32l;
+    manager->LastJobGuid.data4[1] = (UA_Byte) (guidInitL.ui32l >> 8); 
+    manager->LastJobGuid.data4[2] = (UA_Byte) (guidInitL.ui32l >> 16);
+    manager->LastJobGuid.data4[3] = (UA_Byte) (guidInitL.ui32l >> 24);
+    manager->LastJobGuid.data4[4] = (UA_Byte) (manager->LastJobGuid.data4[0]) ^ (guidInitL.ui32h);
+    manager->LastJobGuid.data4[5] = (UA_Byte) (manager->LastJobGuid.data4[0]) ^ (guidInitL.ui32h >> 8);
+    manager->LastJobGuid.data4[6] = (UA_Byte) (manager->LastJobGuid.data4[1]) ^ (guidInitL.ui32h >> 16);
+    manager->LastJobGuid.data4[7] = (UA_Byte) (manager->LastJobGuid.data4[0]) ^ (guidInitL.ui32h >> 24);
+}
+
+void SubscriptionManager_deleteMembers(UA_Session *session, UA_Server *server) {
+    UA_SubscriptionManager *manager = &(session->subscriptionManager);
+    UA_Subscription *current;
+    while((current = LIST_FIRST(&manager->ServerSubscriptions))) {
+        LIST_REMOVE(current, listEntry);
+        UA_Subscription_deleteMembers(current, server);
+        UA_free(current);
+    }
+}
+
+void SubscriptionManager_addSubscription(UA_SubscriptionManager *manager, UA_Subscription *newSubscription) {
+    LIST_INSERT_HEAD(&manager->ServerSubscriptions, newSubscription, listEntry);
+}
+
+UA_Subscription *SubscriptionManager_getSubscriptionByID(UA_SubscriptionManager *manager,
+                                                         UA_Int32 SubscriptionID) {
+    UA_Subscription *sub;
+    LIST_FOREACH(sub, &manager->ServerSubscriptions, listEntry) {
+        if(sub->SubscriptionID == SubscriptionID)
+            break;
+    }
+    return sub;
+}
+
+UA_Int32 SubscriptionManager_deleteMonitoredItem(UA_SubscriptionManager *manager, UA_Int32 SubscriptionID,
+                                                 UA_UInt32 MonitoredItemID) {
+    UA_Subscription *sub = SubscriptionManager_getSubscriptionByID(manager, SubscriptionID);
+    if(!sub)
+        return UA_STATUSCODE_BADSUBSCRIPTIONIDINVALID;
+    
+    UA_MonitoredItem *mon;
+    LIST_FOREACH(mon, &sub->MonitoredItems, listEntry) {
+        if (mon->ItemId == MonitoredItemID) {
+            // FIXME!! don't we need to remove the list entry?
+            MonitoredItem_delete(mon);
+            return UA_STATUSCODE_GOOD;
+        }
+    }
+    return UA_STATUSCODE_BADMONITOREDITEMIDINVALID;
+}
+
+UA_Int32 SubscriptionManager_deleteSubscription(UA_Server *server, UA_SubscriptionManager *manager, UA_Int32 SubscriptionID) {
+    UA_Subscription *sub = SubscriptionManager_getSubscriptionByID(manager, SubscriptionID);    
+    if(!sub)
+        return UA_STATUSCODE_BADSUBSCRIPTIONIDINVALID;
+
+    UA_Subscription_deleteMembers(sub, server);
+    
+    LIST_REMOVE(sub, listEntry);
+    UA_free(sub);
+    return UA_STATUSCODE_GOOD;
+} 
+
+UA_UInt32 SubscriptionManager_getUniqueUIntID(UA_SubscriptionManager *manager) {
+    UA_UInt32 id = ++(manager->LastSessionID);
+    return id;
+}
+
+UA_Guid SubscriptionManager_getUniqueGUID(UA_SubscriptionManager *manager) {
+    UA_Guid id;
+    unsigned long *incremental;
+    
+    incremental = (unsigned long *) &manager->LastJobGuid.data4[0];
+    incremental++;
+    
+    UA_Guid_copy(&(manager->LastJobGuid), &id);
+    
+    return id;
+}

+ 33 - 0
src/server/ua_subscription_manager.h

@@ -0,0 +1,33 @@
+#ifndef UA_SUBSCRIPTION_MANAGER_H_
+#define UA_SUBSCRIPTION_MANAGER_H_
+
+#include "ua_server.h"
+#include "ua_types.h"
+#include "queue.h"
+#include "ua_nodestore.h"
+#include "ua_subscription.h"
+
+typedef struct UA_SubscriptionManager {
+    UA_Int32_BoundedValue    GlobalPublishingInterval;
+    UA_UInt32_BoundedValue   GlobalLifeTimeCount;
+    UA_UInt32_BoundedValue   GlobalKeepAliveCount;
+    UA_Int32_BoundedValue    GlobalNotificationsPerPublish;
+    UA_UInt32_BoundedValue   GlobalSamplingInterval;
+    UA_UInt32_BoundedValue   GlobalQueueSize;
+    UA_Int32                 LastSessionID;
+    UA_Guid                  LastJobGuid;
+    LIST_HEAD(UA_ListOfUASubscriptions, UA_Subscription_s) ServerSubscriptions;
+} UA_SubscriptionManager;
+
+void SubscriptionManager_init(UA_Session *session);
+void SubscriptionManager_deleteMembers(UA_Session *session, UA_Server *server);
+void SubscriptionManager_addSubscription(UA_SubscriptionManager *manager, UA_Subscription *subscription);
+UA_Subscription *SubscriptionManager_getSubscriptionByID(UA_SubscriptionManager *manager,
+                                                         UA_Int32 SubscriptionID);
+UA_Int32 SubscriptionManager_deleteSubscription(UA_Server *server, UA_SubscriptionManager *manager, UA_Int32 SubscriptionID);
+UA_Int32 SubscriptionManager_deleteMonitoredItem(UA_SubscriptionManager *manager, UA_Int32 SubscriptionID,
+                                                 UA_UInt32 MonitoredItemID);
+
+UA_UInt32 SubscriptionManager_getUniqueUIntID(UA_SubscriptionManager *manager);
+UA_Guid SubscriptionManager_getUniqueGUID(UA_SubscriptionManager *manager);
+#endif /* UA_SUBSCRIPTION_MANAGER_H_ */

+ 7 - 1
src/ua_session.c

@@ -41,11 +41,14 @@ void UA_Session_init(UA_Session *session) {
     session->timeout = 0;
     UA_DateTime_init(&session->validTill);
     session->channel = UA_NULL;
+#ifdef ENABLE_SUBSCRIPTIONS
+    SubscriptionManager_init(session);
+#endif
     session->availableContinuationPoints = MAXCONTINUATIONPOINTS;
     LIST_INIT(&session->continuationPoints);
 }
 
-void UA_Session_deleteMembersCleanup(UA_Session *session) {
+void UA_Session_deleteMembersCleanup(UA_Session *session, UA_Server* server) {
     UA_ApplicationDescription_deleteMembers(&session->clientDescription);
     UA_NodeId_deleteMembers(&session->authenticationToken);
     UA_NodeId_deleteMembers(&session->sessionId);
@@ -59,6 +62,9 @@ void UA_Session_deleteMembersCleanup(UA_Session *session) {
     }
     if(session->channel)
         UA_SecureChannel_detachSession(session->channel, session);
+#ifdef ENABLE_SUBSCRIPTIONS
+    SubscriptionManager_deleteMembers(session, server);
+#endif
 }
 
 void UA_Session_updateLifetime(UA_Session *session) {

+ 9 - 1
src/ua_session.h

@@ -4,9 +4,14 @@
 #include "queue.h"
 #include "ua_types.h"
 #include "ua_securechannel.h"
+#include "ua_server.h"
 
 #define MAXCONTINUATIONPOINTS 5
 
+#ifdef ENABLE_SUBSCRIPTIONS
+#include "server/ua_subscription_manager.h"
+#endif
+
 /**
  *  @ingroup communication
  *
@@ -31,6 +36,9 @@ struct UA_Session {
     UA_UInt32         maxResponseMessageSize;
     UA_Int64          timeout;
     UA_DateTime       validTill;
+    #ifdef ENABLE_SUBSCRIPTIONS
+        UA_SubscriptionManager subscriptionManager;
+    #endif
     UA_SecureChannel *channel;
     UA_UInt16 availableContinuationPoints;
     LIST_HEAD(ContinuationPointList, ContinuationPointEntry) continuationPoints;
@@ -40,7 +48,7 @@ extern UA_Session anonymousSession; ///< If anonymous access is allowed, this se
 extern UA_Session adminSession; ///< Local access to the services (for startup and maintenance) uses this Session with all possible access rights (Session ID: 1)
 
 void UA_Session_init(UA_Session *session);
-void UA_Session_deleteMembersCleanup(UA_Session *session);
+void UA_Session_deleteMembersCleanup(UA_Session *session, UA_Server *server);
 
 /** If any activity on a session happens, the timeout is extended */
 void UA_Session_updateLifetime(UA_Session *session);

+ 16 - 0
src/ua_util.h

@@ -22,6 +22,22 @@
 # define UA_RESTRICT restrict
 #endif
 
+/* Visual Studio does not know fnct/unistd file access results */
+#ifdef _MSC_VER
+    #ifndef R_OK
+        #define R_OK    4               /* Test for read permission.  */
+    #endif
+    #ifndef R_OK
+        #define W_OK    2               /* Test for write permission.  */
+    #endif
+    #ifndef X_OK
+        #define X_OK    1               /* Test for execute permission.  */
+    #endif
+    #ifndef F_OK
+        #define F_OK    0               /* Test for existence.  */
+    #endif
+#endif
+
 #define UA_NULL ((void *)0)
 
 // subtract from nodeids to get from the encoding to the content

+ 8 - 1
tools/generate_datatypes.py

@@ -48,7 +48,7 @@ minimal_types = ["InvalidType", "Node", "NodeClass", "ReferenceNode", "Applicati
                  "TranslateBrowsePathsToNodeIdsResponse", "BrowsePath", "BrowsePathResult", "RelativePath",
                  "BrowsePathTarget", "RelativePathElement", "CreateSubscriptionRequest", "CreateSubscriptionResponse",
                  "BrowseResponse", "BrowseResult", "ReferenceDescription", "BrowseRequest", "ViewDescription",
-                 "BrowseNextRequest", "BrowseNextResponse",
+                 "BrowseNextRequest", "BrowseNextResponse", "DeleteSubscriptionsRequest", "DeleteSubscriptionsResponse",
                  "BrowseDescription", "BrowseDirection", "CloseSessionRequest", "AddNodesRequest", "AddNodesResponse",
                  "AddNodesItem", "AddNodesResult", "DeleteNodesItem","AddReferencesRequest", "AddReferencesResponse",
                  "AddReferencesItem","DeleteReferencesItem", "VariableNode", "MethodNode", "VariableTypeNode",
@@ -61,6 +61,10 @@ minimal_types = ["InvalidType", "Node", "NodeClass", "ReferenceNode", "Applicati
                  "UserIdentityToken", "UserNameIdentityToken", "AnonymousIdentityToken", "ServiceFault",
                  "CallMethodRequest", "CallMethodResult", "CallResponse", "CallRequest", "Argument"]
 
+subscription_types = ["DeleteMonitoredItemsRequest", "DeleteMonitoredItemsResponse", "NotificationMessage",
+                      "MonitoredItemNotification", "DataChangeNotification", "ModifySubscriptionRequest",
+                      "ModifySubscriptionResponse"]
+
 class TypeDescription(object):
     def __init__(self, name, nodeid, namespaceid):
         self.name = name # without the UA_ prefix
@@ -437,6 +441,7 @@ def parseTypeDefinitions(xmlDescription, existing_types = OrderedDict()):
 
 parser = argparse.ArgumentParser()
 parser.add_argument('--ns0-types-xml', nargs=1, help='xml-definition of the ns0 types that are assumed to already exist')
+parser.add_argument('--enable-subscription-types', nargs=1, help='Generate datatypes necessary for Montoring and Subscriptions.')
 parser.add_argument('--typedescriptions', nargs=1, help='csv file with type descriptions')
 parser.add_argument('namespace_id', type=int, help='the id of the target namespace')
 parser.add_argument('types_xml', help='path/to/Opc.Ua.Types.bsd')
@@ -446,6 +451,8 @@ args = parser.parse_args()
 outname = args.outfile.split("/")[-1] 
 inname = args.types_xml.split("/")[-1]
 existing_types = OrderedDict()
+if args.enable_subscription_types:
+    minimal_types = minimal_types + subscription_types
 if args.namespace_id == 0 or args.ns0_types_xml:
     existing_types = OrderedDict([(t, BuiltinType(t)) for t in builtin_types])
 if args.ns0_types_xml:

+ 2 - 2
tools/pyUANamespace/generate_open62541CCode.py

@@ -60,7 +60,7 @@ if __name__ == '__main__':
   supressGenerationOfAttribute=[]
 
   GLOBAL_LOG_LEVEL = LOG_LEVEL_DEBUG
-
+  
   arg_isIgnore    = False
   arg_isBlacklist = False
   arg_isSupress   = False
@@ -174,7 +174,7 @@ if __name__ == '__main__':
       else:
         ignoreNodes.append(ns.getNodeByIDString(id))
     ig.close()
-
+  
   # Create the C Code
   log(None, "Generating Header", LOG_LEVEL_INFO)
   # Returns a tuple of (["Header","lines"],["Code","lines","generated"])

+ 5 - 1
tools/pyUANamespace/open62541_MacroHelper.py

@@ -106,7 +106,11 @@ class open62541_MacroHelper():
 
     code.append(nodetype + " *" + node.getCodePrintableID() + " = " + nodetype + "_new();")
     if not "browsename" in self.supressGenerationOfAttribute:
-      code.append(node.getCodePrintableID() + "->browseName = UA_QUALIFIEDNAME_ALLOC(" +  str(node.id().ns) + ", \"" + node.browseName() + "\");")
+      extrNs = node.browseName().split(":")
+      if len(extrNs) > 1:
+        code.append(node.getCodePrintableID() + "->browseName = UA_QUALIFIEDNAME_ALLOC(" +  str(extrNs[0]) + ", \"" + extrNs[1] + "\");")
+      else:
+        code.append(node.getCodePrintableID() + "->browseName = UA_QUALIFIEDNAME_ALLOC(0, \"" + node.browseName() + "\");")
     if not "displayname" in self.supressGenerationOfAttribute:
       code.append(node.getCodePrintableID() + "->displayName = UA_LOCALIZEDTEXT_ALLOC(\"en_US\", \"" +  node.displayName() + "\");")
     if not "description" in self.supressGenerationOfAttribute:

+ 5 - 0
tools/pyUANamespace/ua_builtin_types.py

@@ -328,6 +328,11 @@ class opcua_value_t():
     # -1: Scalar
     #  0: x-dim | x>0
     #  n: n-dim | n>0
+    if (len(self.value) == 0):
+      return code
+    if not isinstance(self.value[0], opcua_value_t):
+      return code
+  
     if self.parent.valueRank() != -1 and (self.parent.valueRank() >=0 or (len(self.value) > 1 and (self.parent.valueRank() != -2 or self.parent.valueRank() != -3))):
       # User the following strategy for all directly mappable values a la 'UA_Type MyInt = (UA_Type) 23;'
       if self.value[0].__binTypeId__ == BUILTINTYPE_TYPEID_GUID:

+ 1 - 1
tools/pyUANamespace/ua_node_types.py

@@ -712,7 +712,7 @@ class opcua_node_t:
     if self in unPrintedNodes:
       # This is necessery to make printing work at all!
       unPrintedNodes.remove(self)
-
+    
     code.append("} while(0);")
     return code
 

File diff suppressed because it is too large
+ 2 - 5953
tools/schema/namespace0/Opc.Ua.NodeSet2.xml