Browse Source

Revert "fix an include that is only used with subscriptions"

This reverts commit d0fb6696f858c123a123a3ae989bded5b67b2486.

Revert "separate jobs for monitoreditem (sampling) and subscriptions (sending notifications)"

This reverts commit c8863cef1efb48345eeb4e1d81c802e113e62924.
Julius Pfrommer 9 years ago
parent
commit
8dd7c7e325

+ 4 - 0
src/server/ua_server_internal.h

@@ -9,6 +9,10 @@
 #include "ua_securechannel_manager.h"
 #include "ua_nodestore.h"
 
+#ifdef UA_ENABLE_SUBSCRIPTIONS
+#include "ua_subscription_manager.h"
+#endif
+
 #define ANONYMOUS_POLICY "open62541-anonymous-policy"
 #define USERNAME_POLICY "open62541-username-policy"
 

+ 8 - 12
src/server/ua_services_subscription.c

@@ -50,8 +50,8 @@ void Service_CreateSubscription(UA_Server *server, UA_Session *session,
 }
 
 static void
-createMonitoredItem(UA_Server *server, UA_Session *session, UA_Subscription *sub,
-                    const UA_MonitoredItemCreateRequest *request, UA_MonitoredItemCreateResult *result) {
+createMonitoredItems(UA_Server *server, UA_Session *session, UA_Subscription *sub,
+                     const UA_MonitoredItemCreateRequest *request, UA_MonitoredItemCreateResult *result) {
     const UA_Node *target = UA_NodeStore_get(server->nodestore, &request->itemToMonitor.nodeId);
     if(!target) {
         result->statusCode = UA_STATUSCODE_BADNODEIDINVALID;
@@ -67,7 +67,7 @@ createMonitoredItem(UA_Server *server, UA_Session *session, UA_Subscription *sub
     UA_StatusCode retval = UA_NodeId_copy(&target->nodeId, &newMon->monitoredNodeId);
     if(retval != UA_STATUSCODE_GOOD) {
         result->statusCode = UA_STATUSCODE_BADOUTOFMEMORY;
-        MonitoredItem_delete(server, newMon);
+        MonitoredItem_delete(newMon);
         return;
     }
 
@@ -94,9 +94,6 @@ createMonitoredItem(UA_Server *server, UA_Session *session, UA_Subscription *sub
     newMon->discardOldest = request->requestedParameters.discardOldest;
     LIST_INSERT_HEAD(&sub->MonitoredItems, newMon, listEntry);
 
-    // todo: handle return code
-    MonitoredItem_registerSampleJob(server, newMon);
-
     // todo: add a job that samples the value (for fixed intervals)
     // todo: add a pointer to the monitoreditem to the variable, so that events get propagated
 }
@@ -123,7 +120,7 @@ void Service_CreateMonitoredItems(UA_Server *server, UA_Session *session,
     response->resultsSize = request->itemsToCreateSize;
 
     for(size_t i = 0; i < request->itemsToCreateSize; i++)
-        createMonitoredItem(server, session, sub, &request->itemsToCreate[i], &response->results[i]);
+        createMonitoredItems(server, session, sub, &request->itemsToCreate[i], &response->results[i]);
 }
 
 void
@@ -158,10 +155,9 @@ Service_Publish(UA_Server *server, UA_Session *session, const UA_PublishRequest
         if(sub->timedUpdateIsRegistered == false) {
             // FIXME: We are forcing a value update for monitored items. This should be done by the event system.
             // NOTE:  There is a clone of this functionality in the Subscription_timedUpdateNotificationsJob
-            // done by the sampling job
-            /* UA_MonitoredItem *mon; */
-            /* LIST_FOREACH(mon, &sub->MonitoredItems, listEntry) */
-            /*     MonitoredItem_QueuePushDataValue(server, mon); */
+            UA_MonitoredItem *mon;
+            LIST_FOREACH(mon, &sub->MonitoredItems, listEntry)
+                MonitoredItem_QueuePushDataValue(server, mon);
             
             // FIXME: We are forcing notification updates for the subscription. This
             // should be done by a timed work item.
@@ -289,7 +285,7 @@ void Service_DeleteMonitoredItems(UA_Server *server, UA_Session *session,
 
     for(size_t i = 0; i < request->monitoredItemIdsSize; i++)
         response->results[i] =
-            UA_Session_deleteMonitoredItem(server, session, sub->subscriptionID,
+            UA_Session_deleteMonitoredItem(session, sub->subscriptionID,
                                            request->monitoredItemIds[i]);
 }
 

+ 61 - 66
src/server/ua_subscription.c

@@ -30,7 +30,7 @@ void UA_Subscription_deleteMembers(UA_Subscription *subscription, UA_Server *ser
     UA_MonitoredItem *mon, *tmp_mon;
     LIST_FOREACH_SAFE(mon, &subscription->MonitoredItems, listEntry, tmp_mon) {
         LIST_REMOVE(mon, listEntry);
-        MonitoredItem_delete(server, mon);
+        MonitoredItem_delete(mon);
     }
     
     // Delete unpublished Notifications
@@ -242,29 +242,33 @@ UA_StatusCode Subscription_unregisterUpdateJob(UA_Server *server, UA_Subscriptio
 /*****************/
 
 UA_MonitoredItem * UA_MonitoredItem_new() {
-    UA_MonitoredItem *new = UA_malloc(sizeof(UA_MonitoredItem));
-    new->queueSize = (UA_BoundedUInt32) { .min = 0, .max = 0, .current = 0};
+    UA_MonitoredItem *new = (UA_MonitoredItem *) UA_malloc(sizeof(UA_MonitoredItem));
+    new->queueSize   = (UA_BoundedUInt32) { .min = 0, .max = 0, .current = 0};
+    new->lastSampled = 0;
     // FIXME: This is currently hardcoded;
     new->monitoredItemType = MONITOREDITEM_TYPE_CHANGENOTIFY;
     TAILQ_INIT(&new->queue);
     UA_NodeId_init(&new->monitoredNodeId);
-    new->lastSampledValue = UA_BYTESTRING_NULL;
-    memset(&new->sampleJobGuid, 0, sizeof(UA_Guid));
-    new->sampleJobIsRegistered = false;
+    new->lastSampledValue.data = 0;
     return new;
 }
 
-void MonitoredItem_delete(UA_Server *server, UA_MonitoredItem *monitoredItem) {
-    MonitoredItem_unregisterUpdateJob(server, monitoredItem);
+void MonitoredItem_delete(UA_MonitoredItem *monitoredItem) {
+    // Delete Queued Data
     MonitoredItem_ClearQueue(monitoredItem);
+    // Remove from subscription list
     LIST_REMOVE(monitoredItem, listEntry);
-    UA_ByteString_deleteMembers(&monitoredItem->lastSampledValue);
-    UA_NodeId_deleteMembers(&monitoredItem->monitoredNodeId);
+    // Release comparison sample
+    if(monitoredItem->lastSampledValue.data != NULL) { 
+      UA_free(monitoredItem->lastSampledValue.data);
+    }
+    
+    UA_NodeId_deleteMembers(&(monitoredItem->monitoredNodeId));
     UA_free(monitoredItem);
 }
 
 UA_UInt32 MonitoredItem_QueueToDataChangeNotifications(UA_MonitoredItemNotification *dst,
-                                                       UA_MonitoredItem *monitoredItem) {
+                                                 UA_MonitoredItem *monitoredItem) {
     UA_UInt32 queueSize = 0;
     MonitoredItem_queuedValue *queueItem;
   
@@ -395,44 +399,65 @@ UA_Boolean MonitoredItem_CopyMonitoredValueToVariant(UA_UInt32 attributeID, cons
 }
 
 void MonitoredItem_QueuePushDataValue(UA_Server *server, UA_MonitoredItem *monitoredItem) {
+    UA_ByteString newValueAsByteString = { .length=0, .data=NULL };
+    size_t encodingOffset = 0;
   
-    /* if(monitoredItem->lastSampled + (UA_MSEC_TO_DATETIME * monitoredItem->samplingInterval) > UA_DateTime_now()) */
-    /*     return; */
+    if(!monitoredItem || monitoredItem->lastSampled + monitoredItem->samplingInterval > UA_DateTime_now())
+        return;
   
     // FIXME: Actively suppress non change value based monitoring. There should be
     // another function to handle status and events.
     if(monitoredItem->monitoredItemType != MONITOREDITEM_TYPE_CHANGENOTIFY)
         return;
 
-    /* Verify that the node being monitored is still valid */
-    const UA_Node *target = UA_NodeStore_get(server->nodestore, &monitoredItem->monitoredNodeId);
-    if(!target)
-        return;
-
     MonitoredItem_queuedValue *newvalue = UA_malloc(sizeof(MonitoredItem_queuedValue));
     if(!newvalue)
         return;
+
+    newvalue->listEntry.tqe_next = NULL;
+    newvalue->listEntry.tqe_prev = NULL;
     UA_DataValue_init(&newvalue->value);
+
+    // Verify that the *Node being monitored is still valid
+    // Looking up the in the nodestore is only necessary if we suspect that it is changed during writes
+    // e.g. in multithreaded applications
+    const UA_Node *target = UA_NodeStore_get(server->nodestore, &monitoredItem->monitoredNodeId);
+    if(!target) {
+        UA_free(newvalue);
+        return;
+    }
   
     UA_Boolean samplingError = MonitoredItem_CopyMonitoredValueToVariant(monitoredItem->attributeID, target,
                                                                          &newvalue->value);
 
-    if(samplingError || !newvalue->value.value.type) {
+    if(samplingError != false || !newvalue->value.value.type) {
         UA_DataValue_deleteMembers(&newvalue->value);
         UA_free(newvalue);
         return;
     }
-
-    /* encode the data */
+  
+    if(monitoredItem->queueSize.current >= monitoredItem->queueSize.max) {
+        if(monitoredItem->discardOldest != true) {
+            // We cannot remove the oldest value and theres no queue space left. We're done here.
+            UA_DataValue_deleteMembers(&newvalue->value);
+            UA_free(newvalue);
+            return;
+        }
+        MonitoredItem_queuedValue *queueItem = TAILQ_LAST(&monitoredItem->queue, QueueOfQueueDataValues);
+        TAILQ_REMOVE(&monitoredItem->queue, queueItem, listEntry);
+        UA_free(queueItem);
+        monitoredItem->queueSize.current--;
+    }
+  
+    // encode the data to find if its different to the previous
     size_t binsize = UA_calcSizeBinary(&newvalue->value, &UA_TYPES[UA_TYPES_DATAVALUE]);
-    UA_ByteString newValueAsByteString;
     UA_StatusCode retval = UA_ByteString_allocBuffer(&newValueAsByteString, binsize);
     if(retval != UA_STATUSCODE_GOOD) {
         UA_DataValue_deleteMembers(&newvalue->value);
         UA_free(newvalue);
         return;
     }
-    size_t encodingOffset = 0;
+    
     retval = UA_encodeBinary(&newvalue->value, &UA_TYPES[UA_TYPES_DATAVALUE], &newValueAsByteString, &encodingOffset);
     if(retval != UA_STATUSCODE_GOOD) {
         UA_ByteString_deleteMembers(&newValueAsByteString);
@@ -440,54 +465,24 @@ void MonitoredItem_QueuePushDataValue(UA_Server *server, UA_MonitoredItem *monit
         UA_free(newvalue);
         return;
     }
-
-    /* did the content change? */
-    if(monitoredItem->lastSampledValue.data &&
-       UA_String_equal(&newValueAsByteString, &monitoredItem->lastSampledValue)) {
-        UA_DataValue_deleteMembers(&newvalue->value);
-        UA_free(newvalue);
-        UA_String_deleteMembers(&newValueAsByteString);
-        return;
-    }
   
-    /* do we have space? */
-    if(monitoredItem->queueSize.current >= monitoredItem->queueSize.max) {
-        if(!monitoredItem->discardOldest) {
-            // We cannot remove the oldest value and theres no queue space left. We're done here.
+    if(!monitoredItem->lastSampledValue.data) { 
+        UA_ByteString_copy(&newValueAsByteString, &monitoredItem->lastSampledValue);
+        TAILQ_INSERT_HEAD(&monitoredItem->queue, newvalue, listEntry);
+        monitoredItem->queueSize.current++;
+        monitoredItem->lastSampled = UA_DateTime_now();
+        UA_free(newValueAsByteString.data);
+    } else {
+        if(UA_String_equal(&newValueAsByteString, &monitoredItem->lastSampledValue) == true) {
             UA_DataValue_deleteMembers(&newvalue->value);
             UA_free(newvalue);
             UA_String_deleteMembers(&newValueAsByteString);
             return;
         }
-        MonitoredItem_queuedValue *queueItem = TAILQ_LAST(&monitoredItem->queue, QueueOfQueueDataValues);
-        TAILQ_REMOVE(&monitoredItem->queue, queueItem, listEntry);
-        UA_free(queueItem);
-        monitoredItem->queueSize.current--;
+        UA_ByteString_deleteMembers(&monitoredItem->lastSampledValue);
+        monitoredItem->lastSampledValue = newValueAsByteString;
+        TAILQ_INSERT_HEAD(&monitoredItem->queue, newvalue, listEntry);
+        monitoredItem->queueSize.current++;
+        monitoredItem->lastSampled = UA_DateTime_now();
     }
-  
-    /* add the sample */
-    UA_ByteString_deleteMembers(&monitoredItem->lastSampledValue);
-    monitoredItem->lastSampledValue = newValueAsByteString;
-    TAILQ_INSERT_HEAD(&monitoredItem->queue, newvalue, listEntry);
-    monitoredItem->queueSize.current++;
-}
-
-UA_StatusCode MonitoredItem_registerSampleJob(UA_Server *server, UA_MonitoredItem *mon) {
-    if(mon->samplingInterval <= 5 ) 
-        return UA_STATUSCODE_BADNOTSUPPORTED;
-
-    UA_Job job = (UA_Job) {.type = UA_JOBTYPE_METHODCALL,
-                           .job.methodCall = {.method = (UA_ServerCallback)MonitoredItem_QueuePushDataValue,
-                                              .data = mon} };
-    
-    UA_StatusCode retval = UA_Server_addRepeatedJob(server, job, (UA_UInt32)mon->samplingInterval,
-                                                    &mon->sampleJobGuid);
-    if(retval == UA_STATUSCODE_GOOD)
-        mon->sampleJobIsRegistered = true;
-    return retval;
-}
-
-UA_StatusCode MonitoredItem_unregisterUpdateJob(UA_Server *server, UA_MonitoredItem *mon) {
-    mon->sampleJobIsRegistered = false;
-    return UA_Server_removeRepeatedJob(server, mon->sampleJobGuid);
 }

+ 4 - 8
src/server/ua_subscription.h

@@ -18,14 +18,12 @@ typedef enum {
 } UA_MONITOREDITEM_TYPE;
 
 typedef struct MonitoredItem_queuedValue {
-    TAILQ_ENTRY(MonitoredItem_queuedValue) listEntry;
     UA_DataValue value;
+    TAILQ_ENTRY(MonitoredItem_queuedValue) listEntry;
 } MonitoredItem_queuedValue;
 
 typedef struct UA_MonitoredItem {
     LIST_ENTRY(UA_MonitoredItem) listEntry;
-    UA_Guid sampleJobGuid;
-    UA_Boolean sampleJobIsRegistered;
     UA_UInt32 itemId;
     UA_MONITOREDITEM_TYPE monitoredItemType;
     UA_UInt32 timestampsToReturn;
@@ -34,16 +32,17 @@ typedef struct UA_MonitoredItem {
     UA_UInt32 attributeID;
     UA_UInt32 clientHandle;
     UA_UInt32 samplingInterval; // [ms]
+    UA_BoundedUInt32 queueSize;
     UA_Boolean discardOldest;
+    UA_DateTime lastSampled;
     UA_ByteString lastSampledValue;
     // FIXME: indexRange is ignored; array values default to element 0
     // FIXME: dataEncoding is hardcoded to UA binary
     TAILQ_HEAD(QueueOfQueueDataValues, MonitoredItem_queuedValue) queue;
-    UA_BoundedUInt32 queueSize;
 } UA_MonitoredItem;
 
 UA_MonitoredItem *UA_MonitoredItem_new(void);
-void MonitoredItem_delete(UA_Server *server, UA_MonitoredItem *monitoredItem);
+void MonitoredItem_delete(UA_MonitoredItem *monitoredItem);
 void MonitoredItem_QueuePushDataValue(UA_Server *server, UA_MonitoredItem *monitoredItem);
 void MonitoredItem_ClearQueue(UA_MonitoredItem *monitoredItem);
 UA_Boolean MonitoredItem_CopyMonitoredValueToVariant(UA_UInt32 attributeID, const UA_Node *src,
@@ -51,9 +50,6 @@ UA_Boolean MonitoredItem_CopyMonitoredValueToVariant(UA_UInt32 attributeID, cons
 UA_UInt32 MonitoredItem_QueueToDataChangeNotifications(UA_MonitoredItemNotification *dst,
                                                        UA_MonitoredItem *monitoredItem);
 
-UA_StatusCode MonitoredItem_unregisterUpdateJob(UA_Server *server, UA_MonitoredItem *mon);
-UA_StatusCode MonitoredItem_registerSampleJob(UA_Server *server, UA_MonitoredItem *mon);
-
 /****************/
 /* Subscription */
 /****************/

+ 3 - 5
src/ua_session.c

@@ -1,8 +1,6 @@
 #include "ua_session.h"
-#include "ua_util.h"
-#ifdef UA_ENABLE_SUBSCRIPTIONS
 #include "server/ua_subscription.h"
-#endif
+#include "ua_util.h"
 
 UA_Session adminSession = {
     .clientDescription =  {.applicationUri = {0, NULL}, .productUri = {0, NULL},
@@ -94,7 +92,7 @@ UA_Session_getSubscriptionByID(UA_Session *session, UA_UInt32 subscriptionID) {
 
 
 UA_StatusCode
-UA_Session_deleteMonitoredItem(UA_Server *server, UA_Session *session, UA_UInt32 subscriptionID,
+UA_Session_deleteMonitoredItem(UA_Session *session, UA_UInt32 subscriptionID,
                                UA_UInt32 monitoredItemID) {
     UA_Subscription *sub = UA_Session_getSubscriptionByID(session, subscriptionID);
     if(!sub)
@@ -104,7 +102,7 @@ UA_Session_deleteMonitoredItem(UA_Server *server, UA_Session *session, UA_UInt32
     LIST_FOREACH_SAFE(mon, &sub->MonitoredItems, listEntry, tmp_mon) {
         if(mon->itemId == monitoredItemID) {
             LIST_REMOVE(mon, listEntry);
-            MonitoredItem_delete(server, mon);
+            MonitoredItem_delete(mon);
             return UA_STATUSCODE_GOOD;
         }
     }

+ 2 - 2
src/ua_session.h

@@ -55,8 +55,8 @@ UA_Subscription *
 UA_Session_getSubscriptionByID(UA_Session *session, UA_UInt32 subscriptionID);
 
 UA_StatusCode
-UA_Session_deleteMonitoredItem(UA_Server *server, UA_Session *session,
-                               UA_UInt32 subscriptionID, UA_UInt32 monitoredItemID);
+UA_Session_deleteMonitoredItem(UA_Session *session, UA_UInt32 subscriptionID,
+                               UA_UInt32 monitoredItemID);
 
 UA_StatusCode
 UA_Session_deleteSubscription(UA_Server *server, UA_Session *session,