Przeglądaj źródła

separate jobs for monitoreditem (sampling) and subscriptions (sending notifications)

Julius Pfrommer 9 lat temu
rodzic
commit
c8863cef1e

+ 0 - 4
src/server/ua_server_internal.h

@@ -9,10 +9,6 @@
 #include "ua_securechannel_manager.h"
 #include "ua_nodestore.h"
 
-#ifdef UA_ENABLE_SUBSCRIPTIONS
-#include "ua_subscription_manager.h"
-#endif
-
 #define ANONYMOUS_POLICY "open62541-anonymous-policy"
 #define USERNAME_POLICY "open62541-username-policy"
 

+ 12 - 8
src/server/ua_services_subscription.c

@@ -50,8 +50,8 @@ void Service_CreateSubscription(UA_Server *server, UA_Session *session,
 }
 
 static void
-createMonitoredItems(UA_Server *server, UA_Session *session, UA_Subscription *sub,
-                     const UA_MonitoredItemCreateRequest *request, UA_MonitoredItemCreateResult *result) {
+createMonitoredItem(UA_Server *server, UA_Session *session, UA_Subscription *sub,
+                    const UA_MonitoredItemCreateRequest *request, UA_MonitoredItemCreateResult *result) {
     const UA_Node *target = UA_NodeStore_get(server->nodestore, &request->itemToMonitor.nodeId);
     if(!target) {
         result->statusCode = UA_STATUSCODE_BADNODEIDINVALID;
@@ -67,7 +67,7 @@ createMonitoredItems(UA_Server *server, UA_Session *session, UA_Subscription *su
     UA_StatusCode retval = UA_NodeId_copy(&target->nodeId, &newMon->monitoredNodeId);
     if(retval != UA_STATUSCODE_GOOD) {
         result->statusCode = UA_STATUSCODE_BADOUTOFMEMORY;
-        MonitoredItem_delete(newMon);
+        MonitoredItem_delete(server, newMon);
         return;
     }
 
@@ -94,6 +94,9 @@ createMonitoredItems(UA_Server *server, UA_Session *session, UA_Subscription *su
     newMon->discardOldest = request->requestedParameters.discardOldest;
     LIST_INSERT_HEAD(&sub->MonitoredItems, newMon, listEntry);
 
+    // todo: handle return code
+    MonitoredItem_registerSampleJob(server, newMon);
+
     // todo: add a job that samples the value (for fixed intervals)
     // todo: add a pointer to the monitoreditem to the variable, so that events get propagated
 }
@@ -120,7 +123,7 @@ void Service_CreateMonitoredItems(UA_Server *server, UA_Session *session,
     response->resultsSize = request->itemsToCreateSize;
 
     for(size_t i = 0; i < request->itemsToCreateSize; i++)
-        createMonitoredItems(server, session, sub, &request->itemsToCreate[i], &response->results[i]);
+        createMonitoredItem(server, session, sub, &request->itemsToCreate[i], &response->results[i]);
 }
 
 void
@@ -155,9 +158,10 @@ Service_Publish(UA_Server *server, UA_Session *session, const UA_PublishRequest
         if(sub->timedUpdateIsRegistered == false) {
             // FIXME: We are forcing a value update for monitored items. This should be done by the event system.
             // NOTE:  There is a clone of this functionality in the Subscription_timedUpdateNotificationsJob
-            UA_MonitoredItem *mon;
-            LIST_FOREACH(mon, &sub->MonitoredItems, listEntry)
-                MonitoredItem_QueuePushDataValue(server, mon);
+            // done by the sampling job
+            /* UA_MonitoredItem *mon; */
+            /* LIST_FOREACH(mon, &sub->MonitoredItems, listEntry) */
+            /*     MonitoredItem_QueuePushDataValue(server, mon); */
             
             // FIXME: We are forcing notification updates for the subscription. This
             // should be done by a timed work item.
@@ -285,7 +289,7 @@ void Service_DeleteMonitoredItems(UA_Server *server, UA_Session *session,
 
     for(size_t i = 0; i < request->monitoredItemIdsSize; i++)
         response->results[i] =
-            UA_Session_deleteMonitoredItem(session, sub->subscriptionID,
+            UA_Session_deleteMonitoredItem(server, session, sub->subscriptionID,
                                            request->monitoredItemIds[i]);
 }
 

+ 66 - 61
src/server/ua_subscription.c

@@ -30,7 +30,7 @@ void UA_Subscription_deleteMembers(UA_Subscription *subscription, UA_Server *ser
     UA_MonitoredItem *mon, *tmp_mon;
     LIST_FOREACH_SAFE(mon, &subscription->MonitoredItems, listEntry, tmp_mon) {
         LIST_REMOVE(mon, listEntry);
-        MonitoredItem_delete(mon);
+        MonitoredItem_delete(server, mon);
     }
     
     // Delete unpublished Notifications
@@ -242,33 +242,29 @@ UA_StatusCode Subscription_unregisterUpdateJob(UA_Server *server, UA_Subscriptio
 /*****************/
 
 UA_MonitoredItem * UA_MonitoredItem_new() {
-    UA_MonitoredItem *new = (UA_MonitoredItem *) UA_malloc(sizeof(UA_MonitoredItem));
-    new->queueSize   = (UA_BoundedUInt32) { .min = 0, .max = 0, .current = 0};
-    new->lastSampled = 0;
+    UA_MonitoredItem *new = UA_malloc(sizeof(UA_MonitoredItem));
+    new->queueSize = (UA_BoundedUInt32) { .min = 0, .max = 0, .current = 0};
     // FIXME: This is currently hardcoded;
     new->monitoredItemType = MONITOREDITEM_TYPE_CHANGENOTIFY;
     TAILQ_INIT(&new->queue);
     UA_NodeId_init(&new->monitoredNodeId);
-    new->lastSampledValue.data = 0;
+    new->lastSampledValue = UA_BYTESTRING_NULL;
+    memset(&new->sampleJobGuid, 0, sizeof(UA_Guid));
+    new->sampleJobIsRegistered = false;
     return new;
 }
 
-void MonitoredItem_delete(UA_MonitoredItem *monitoredItem) {
-    // Delete Queued Data
+void MonitoredItem_delete(UA_Server *server, UA_MonitoredItem *monitoredItem) {
+    MonitoredItem_unregisterUpdateJob(server, monitoredItem);
     MonitoredItem_ClearQueue(monitoredItem);
-    // Remove from subscription list
     LIST_REMOVE(monitoredItem, listEntry);
-    // Release comparison sample
-    if(monitoredItem->lastSampledValue.data != NULL) { 
-      UA_free(monitoredItem->lastSampledValue.data);
-    }
-    
-    UA_NodeId_deleteMembers(&(monitoredItem->monitoredNodeId));
+    UA_ByteString_deleteMembers(&monitoredItem->lastSampledValue);
+    UA_NodeId_deleteMembers(&monitoredItem->monitoredNodeId);
     UA_free(monitoredItem);
 }
 
 UA_UInt32 MonitoredItem_QueueToDataChangeNotifications(UA_MonitoredItemNotification *dst,
-                                                 UA_MonitoredItem *monitoredItem) {
+                                                       UA_MonitoredItem *monitoredItem) {
     UA_UInt32 queueSize = 0;
     MonitoredItem_queuedValue *queueItem;
   
@@ -399,65 +395,44 @@ UA_Boolean MonitoredItem_CopyMonitoredValueToVariant(UA_UInt32 attributeID, cons
 }
 
 void MonitoredItem_QueuePushDataValue(UA_Server *server, UA_MonitoredItem *monitoredItem) {
-    UA_ByteString newValueAsByteString = { .length=0, .data=NULL };
-    size_t encodingOffset = 0;
   
-    if(!monitoredItem || monitoredItem->lastSampled + monitoredItem->samplingInterval > UA_DateTime_now())
-        return;
+    /* if(monitoredItem->lastSampled + (UA_MSEC_TO_DATETIME * monitoredItem->samplingInterval) > UA_DateTime_now()) */
+    /*     return; */
   
     // FIXME: Actively suppress non change value based monitoring. There should be
     // another function to handle status and events.
     if(monitoredItem->monitoredItemType != MONITOREDITEM_TYPE_CHANGENOTIFY)
         return;
 
+    /* Verify that the node being monitored is still valid */
+    const UA_Node *target = UA_NodeStore_get(server->nodestore, &monitoredItem->monitoredNodeId);
+    if(!target)
+        return;
+
     MonitoredItem_queuedValue *newvalue = UA_malloc(sizeof(MonitoredItem_queuedValue));
     if(!newvalue)
         return;
-
-    newvalue->listEntry.tqe_next = NULL;
-    newvalue->listEntry.tqe_prev = NULL;
     UA_DataValue_init(&newvalue->value);
-
-    // Verify that the *Node being monitored is still valid
-    // Looking up the in the nodestore is only necessary if we suspect that it is changed during writes
-    // e.g. in multithreaded applications
-    const UA_Node *target = UA_NodeStore_get(server->nodestore, &monitoredItem->monitoredNodeId);
-    if(!target) {
-        UA_free(newvalue);
-        return;
-    }
   
     UA_Boolean samplingError = MonitoredItem_CopyMonitoredValueToVariant(monitoredItem->attributeID, target,
                                                                          &newvalue->value);
 
-    if(samplingError != false || !newvalue->value.value.type) {
+    if(samplingError || !newvalue->value.value.type) {
         UA_DataValue_deleteMembers(&newvalue->value);
         UA_free(newvalue);
         return;
     }
-  
-    if(monitoredItem->queueSize.current >= monitoredItem->queueSize.max) {
-        if(monitoredItem->discardOldest != true) {
-            // We cannot remove the oldest value and theres no queue space left. We're done here.
-            UA_DataValue_deleteMembers(&newvalue->value);
-            UA_free(newvalue);
-            return;
-        }
-        MonitoredItem_queuedValue *queueItem = TAILQ_LAST(&monitoredItem->queue, QueueOfQueueDataValues);
-        TAILQ_REMOVE(&monitoredItem->queue, queueItem, listEntry);
-        UA_free(queueItem);
-        monitoredItem->queueSize.current--;
-    }
-  
-    // encode the data to find if its different to the previous
+
+    /* encode the data */
     size_t binsize = UA_calcSizeBinary(&newvalue->value, &UA_TYPES[UA_TYPES_DATAVALUE]);
+    UA_ByteString newValueAsByteString;
     UA_StatusCode retval = UA_ByteString_allocBuffer(&newValueAsByteString, binsize);
     if(retval != UA_STATUSCODE_GOOD) {
         UA_DataValue_deleteMembers(&newvalue->value);
         UA_free(newvalue);
         return;
     }
-    
+    size_t encodingOffset = 0;
     retval = UA_encodeBinary(&newvalue->value, &UA_TYPES[UA_TYPES_DATAVALUE], &newValueAsByteString, &encodingOffset);
     if(retval != UA_STATUSCODE_GOOD) {
         UA_ByteString_deleteMembers(&newValueAsByteString);
@@ -465,24 +440,54 @@ void MonitoredItem_QueuePushDataValue(UA_Server *server, UA_MonitoredItem *monit
         UA_free(newvalue);
         return;
     }
+
+    /* did the content change? */
+    if(monitoredItem->lastSampledValue.data &&
+       UA_String_equal(&newValueAsByteString, &monitoredItem->lastSampledValue)) {
+        UA_DataValue_deleteMembers(&newvalue->value);
+        UA_free(newvalue);
+        UA_String_deleteMembers(&newValueAsByteString);
+        return;
+    }
   
-    if(!monitoredItem->lastSampledValue.data) { 
-        UA_ByteString_copy(&newValueAsByteString, &monitoredItem->lastSampledValue);
-        TAILQ_INSERT_HEAD(&monitoredItem->queue, newvalue, listEntry);
-        monitoredItem->queueSize.current++;
-        monitoredItem->lastSampled = UA_DateTime_now();
-        UA_free(newValueAsByteString.data);
-    } else {
-        if(UA_String_equal(&newValueAsByteString, &monitoredItem->lastSampledValue) == true) {
+    /* do we have space? */
+    if(monitoredItem->queueSize.current >= monitoredItem->queueSize.max) {
+        if(!monitoredItem->discardOldest) {
+            // We cannot remove the oldest value and theres no queue space left. We're done here.
             UA_DataValue_deleteMembers(&newvalue->value);
             UA_free(newvalue);
             UA_String_deleteMembers(&newValueAsByteString);
             return;
         }
-        UA_ByteString_deleteMembers(&monitoredItem->lastSampledValue);
-        monitoredItem->lastSampledValue = newValueAsByteString;
-        TAILQ_INSERT_HEAD(&monitoredItem->queue, newvalue, listEntry);
-        monitoredItem->queueSize.current++;
-        monitoredItem->lastSampled = UA_DateTime_now();
+        MonitoredItem_queuedValue *queueItem = TAILQ_LAST(&monitoredItem->queue, QueueOfQueueDataValues);
+        TAILQ_REMOVE(&monitoredItem->queue, queueItem, listEntry);
+        UA_free(queueItem);
+        monitoredItem->queueSize.current--;
     }
+  
+    /* add the sample */
+    UA_ByteString_deleteMembers(&monitoredItem->lastSampledValue);
+    monitoredItem->lastSampledValue = newValueAsByteString;
+    TAILQ_INSERT_HEAD(&monitoredItem->queue, newvalue, listEntry);
+    monitoredItem->queueSize.current++;
+}
+
+UA_StatusCode MonitoredItem_registerSampleJob(UA_Server *server, UA_MonitoredItem *mon) {
+    if(mon->samplingInterval <= 5 ) 
+        return UA_STATUSCODE_BADNOTSUPPORTED;
+
+    UA_Job job = (UA_Job) {.type = UA_JOBTYPE_METHODCALL,
+                           .job.methodCall = {.method = (UA_ServerCallback)MonitoredItem_QueuePushDataValue,
+                                              .data = mon} };
+    
+    UA_StatusCode retval = UA_Server_addRepeatedJob(server, job, (UA_UInt32)mon->samplingInterval,
+                                                    &mon->sampleJobGuid);
+    if(retval == UA_STATUSCODE_GOOD)
+        mon->sampleJobIsRegistered = true;
+    return retval;
+}
+
+UA_StatusCode MonitoredItem_unregisterUpdateJob(UA_Server *server, UA_MonitoredItem *mon) {
+    mon->sampleJobIsRegistered = false;
+    return UA_Server_removeRepeatedJob(server, mon->sampleJobGuid);
 }

+ 8 - 4
src/server/ua_subscription.h

@@ -18,12 +18,14 @@ typedef enum {
 } UA_MONITOREDITEM_TYPE;
 
 typedef struct MonitoredItem_queuedValue {
-    UA_DataValue value;
     TAILQ_ENTRY(MonitoredItem_queuedValue) listEntry;
+    UA_DataValue value;
 } MonitoredItem_queuedValue;
 
 typedef struct UA_MonitoredItem {
     LIST_ENTRY(UA_MonitoredItem) listEntry;
+    UA_Guid sampleJobGuid;
+    UA_Boolean sampleJobIsRegistered;
     UA_UInt32 itemId;
     UA_MONITOREDITEM_TYPE monitoredItemType;
     UA_UInt32 timestampsToReturn;
@@ -32,17 +34,16 @@ typedef struct UA_MonitoredItem {
     UA_UInt32 attributeID;
     UA_UInt32 clientHandle;
     UA_UInt32 samplingInterval; // [ms]
-    UA_BoundedUInt32 queueSize;
     UA_Boolean discardOldest;
-    UA_DateTime lastSampled;
     UA_ByteString lastSampledValue;
     // FIXME: indexRange is ignored; array values default to element 0
     // FIXME: dataEncoding is hardcoded to UA binary
     TAILQ_HEAD(QueueOfQueueDataValues, MonitoredItem_queuedValue) queue;
+    UA_BoundedUInt32 queueSize;
 } UA_MonitoredItem;
 
 UA_MonitoredItem *UA_MonitoredItem_new(void);
-void MonitoredItem_delete(UA_MonitoredItem *monitoredItem);
+void MonitoredItem_delete(UA_Server *server, UA_MonitoredItem *monitoredItem);
 void MonitoredItem_QueuePushDataValue(UA_Server *server, UA_MonitoredItem *monitoredItem);
 void MonitoredItem_ClearQueue(UA_MonitoredItem *monitoredItem);
 UA_Boolean MonitoredItem_CopyMonitoredValueToVariant(UA_UInt32 attributeID, const UA_Node *src,
@@ -50,6 +51,9 @@ UA_Boolean MonitoredItem_CopyMonitoredValueToVariant(UA_UInt32 attributeID, cons
 UA_UInt32 MonitoredItem_QueueToDataChangeNotifications(UA_MonitoredItemNotification *dst,
                                                        UA_MonitoredItem *monitoredItem);
 
+UA_StatusCode MonitoredItem_unregisterUpdateJob(UA_Server *server, UA_MonitoredItem *mon);
+UA_StatusCode MonitoredItem_registerSampleJob(UA_Server *server, UA_MonitoredItem *mon);
+
 /****************/
 /* Subscription */
 /****************/

+ 2 - 2
src/ua_session.c

@@ -92,7 +92,7 @@ UA_Session_getSubscriptionByID(UA_Session *session, UA_UInt32 subscriptionID) {
 
 
 UA_StatusCode
-UA_Session_deleteMonitoredItem(UA_Session *session, UA_UInt32 subscriptionID,
+UA_Session_deleteMonitoredItem(UA_Server *server, UA_Session *session, UA_UInt32 subscriptionID,
                                UA_UInt32 monitoredItemID) {
     UA_Subscription *sub = UA_Session_getSubscriptionByID(session, subscriptionID);
     if(!sub)
@@ -102,7 +102,7 @@ UA_Session_deleteMonitoredItem(UA_Session *session, UA_UInt32 subscriptionID,
     LIST_FOREACH_SAFE(mon, &sub->MonitoredItems, listEntry, tmp_mon) {
         if(mon->itemId == monitoredItemID) {
             LIST_REMOVE(mon, listEntry);
-            MonitoredItem_delete(mon);
+            MonitoredItem_delete(server, mon);
             return UA_STATUSCODE_GOOD;
         }
     }

+ 2 - 2
src/ua_session.h

@@ -55,8 +55,8 @@ UA_Subscription *
 UA_Session_getSubscriptionByID(UA_Session *session, UA_UInt32 subscriptionID);
 
 UA_StatusCode
-UA_Session_deleteMonitoredItem(UA_Session *session, UA_UInt32 subscriptionID,
-                               UA_UInt32 monitoredItemID);
+UA_Session_deleteMonitoredItem(UA_Server *server, UA_Session *session,
+                               UA_UInt32 subscriptionID, UA_UInt32 monitoredItemID);
 
 UA_StatusCode
 UA_Session_deleteSubscription(UA_Server *server, UA_Session *session,