浏览代码

refactor: decompose MonitoredItem_SampleCallback into smaller chunks

Julius Pfrommer 8 年之前
父节点
当前提交
bdf0cf6658
共有 1 个文件被更改,包括 96 次插入91 次删除
  1. 96 91
      src/server/ua_subscription.c

+ 96 - 91
src/server/ua_subscription.c

@@ -6,7 +6,7 @@
 
 #ifdef UA_ENABLE_SUBSCRIPTIONS /* conditional compilation */
 
-#define UA_VALUENCODING_MAXSTACK 2048
+#define UA_VALUENCODING_MAXSTACK 512
 
 /*****************/
 /* MonitoredItem */
@@ -46,13 +46,84 @@ void MonitoredItem_delete(UA_Server *server, UA_MonitoredItem *monitoredItem) {
     UA_free(monitoredItem);
 }
 
+static void
+ensureSpaceInMonitoredItemQueue(UA_MonitoredItem *mon) {
+    if(mon->currentQueueSize < mon->maxQueueSize)
+        return;
+    MonitoredItem_queuedValue *queueItem;
+    if(mon->discardOldest)
+        queueItem = TAILQ_FIRST(&mon->queue);
+    else
+        queueItem = TAILQ_LAST(&mon->queue, QueueOfQueueDataValues);
+    UA_assert(queueItem); /* When the currentQueueSize > 0, then there is an item */
+    TAILQ_REMOVE(&mon->queue, queueItem, listEntry);
+    UA_DataValue_deleteMembers(&queueItem->value);
+    UA_free(queueItem);
+    mon->currentQueueSize--;
+}
+
+/* Has this sample changed from the last one? The method may allocate additional
+ * space for the encoding buffer. Detect the change in encoding->data. */
+static UA_StatusCode
+detectValueChange(UA_MonitoredItem *mon, UA_DataValue *value,
+                  UA_ByteString *encoding, UA_Boolean *changed) {
+    /* Apply Filter */
+    UA_Boolean hasValue = value->hasValue;
+    if(mon->trigger == UA_DATACHANGETRIGGER_STATUS)
+        value->hasValue = false;
+    UA_Boolean hasServerTimestamp = value->hasServerTimestamp;
+    UA_Boolean hasServerPicoseconds = value->hasServerPicoseconds;
+    value->hasServerTimestamp = false;
+    value->hasServerPicoseconds = false;
+    UA_Boolean hasSourceTimestamp = value->hasSourceTimestamp;
+    UA_Boolean hasSourcePicoseconds = value->hasSourcePicoseconds;
+    if(mon->trigger < UA_DATACHANGETRIGGER_STATUSVALUETIMESTAMP) {
+        value->hasSourceTimestamp = false;
+        value->hasSourcePicoseconds = false;
+    }
+
+    /* Encode the data for comparison */
+    UA_StatusCode retval = UA_STATUSCODE_GOOD;
+    size_t binsize = UA_calcSizeBinary(value, &UA_TYPES[UA_TYPES_DATAVALUE]);
+    if(binsize == 0) {
+        retval = UA_STATUSCODE_BADINTERNALERROR;
+        goto cleanup;
+    }
+
+    /* Allocate buffer on the heap if necessary */
+    if(binsize > UA_VALUENCODING_MAXSTACK &&
+       UA_ByteString_allocBuffer(encoding, binsize) != UA_STATUSCODE_GOOD) {
+        retval = UA_STATUSCODE_BADOUTOFMEMORY;
+        goto cleanup;
+    }
+
+    /* Encode the value */
+    size_t encodingOffset = 0;
+    retval = UA_encodeBinary(value, &UA_TYPES[UA_TYPES_DATAVALUE],
+                             NULL, NULL, encoding, &encodingOffset);
+    if(retval != UA_STATUSCODE_GOOD)
+        goto cleanup;
+
+    /* The value has changed */
+    if(!mon->lastSampledValue.data || !UA_String_equal(encoding, &mon->lastSampledValue))
+        *changed = true;
+
+ cleanup:
+    /* Reset the filter */
+    value->hasValue = hasValue;
+    value->hasServerTimestamp = hasServerTimestamp;
+    value->hasServerPicoseconds = hasServerPicoseconds;
+    value->hasSourceTimestamp = hasSourceTimestamp;
+    value->hasSourcePicoseconds = hasSourcePicoseconds;
+    return retval;
+}
+
 void UA_MoniteredItem_SampleCallback(UA_Server *server, UA_MonitoredItem *monitoredItem) {
     UA_Subscription *sub = monitoredItem->subscription;
     if(monitoredItem->monitoredItemType != UA_MONITOREDITEMTYPE_CHANGENOTIFY) {
         UA_LOG_DEBUG_SESSION(server->config.logger, sub->session,
                              "Subscription %u | MonitoredItem %i | "
-                             "Cannot process a monitoreditem that is not "
-                             "a data change notification",
+                             "Not a data change notification",
                              sub->subscriptionID, monitoredItem->itemId);
         return;
     }
@@ -74,53 +145,18 @@ void UA_MoniteredItem_SampleCallback(UA_Server *server, UA_MonitoredItem *monito
     UA_DataValue_init(&value);
     Service_Read_single(server, sub->session, ts, &rvid, &value);
 
-    /* Apply Filter. This needs to be reset later on, as the filter may be
-     * different from the encoded value. */
-    UA_Boolean hasValue = value.hasValue;
-    UA_Boolean hasServerTimestamp = value.hasServerTimestamp;
-    UA_Boolean hasServerPicoseconds = value.hasServerPicoseconds;
-    UA_Boolean hasSourceTimestamp = value.hasSourceTimestamp;
-    UA_Boolean hasSourcePicoseconds = value.hasSourcePicoseconds;
-    value.hasServerTimestamp = false;
-    value.hasServerPicoseconds = false;
-    if(monitoredItem->trigger == UA_DATACHANGETRIGGER_STATUS)
-        value.hasValue = false;
-    if(monitoredItem->trigger < UA_DATACHANGETRIGGER_STATUSVALUETIMESTAMP) {
-        value.hasSourceTimestamp = false;
-        value.hasSourcePicoseconds = false;
-    }
-
-    /* Encode the data for comparison */
-    size_t binsize = UA_calcSizeBinary(&value, &UA_TYPES[UA_TYPES_DATAVALUE]);
-    if(binsize == 0) {
-        UA_DataValue_deleteMembers(&value);
-        return;
-    }
-    UA_ByteString comparisonByteString;
-    UA_Boolean comparisonByteStringOnStack = (binsize <= UA_VALUENCODING_MAXSTACK);
-    if(comparisonByteStringOnStack) {
-        comparisonByteString.data = (UA_Byte*)UA_alloca(binsize);
-        comparisonByteString.length = binsize;
-    } else {
-        if(UA_ByteString_allocBuffer(&comparisonByteString, binsize) !=
-           UA_STATUSCODE_GOOD) {
-            UA_DataValue_deleteMembers(&value);
-            return;
-        }
-    }
-
-    /* Error or the value has not changed */
-    size_t encodingOffset = 0;
-    if((UA_encodeBinary(&value, &UA_TYPES[UA_TYPES_DATAVALUE],
-                       NULL, NULL, &comparisonByteString,
-                       &encodingOffset) != UA_STATUSCODE_GOOD) ||
-       (monitoredItem->lastSampledValue.data &&
-        UA_String_equal(&comparisonByteString, &monitoredItem->lastSampledValue))) {
-        UA_LOG_TRACE_SESSION(server->config.logger, sub->session, "Subscription %u | "
-                             "MonitoredItem %u | Do not sample an unchanged value",
-                             sub->subscriptionID, monitoredItem->itemId);
+    /* Stack-allocate some memory for the value encoding */
+    UA_Byte *stackValueEncoding = UA_alloca(UA_VALUENCODING_MAXSTACK);
+    UA_ByteString valueEncoding;
+    valueEncoding.data = stackValueEncoding;
+    valueEncoding.length = UA_VALUENCODING_MAXSTACK;
+
+    /* Has the value changed? */
+    UA_Boolean changed = false;
+    UA_StatusCode retval = detectValueChange(monitoredItem, &value,
+                                             &valueEncoding, &changed);
+    if(!changed || retval != UA_STATUSCODE_GOOD)
         goto cleanup;
-    }
 
     /* Allocate the entry for the publish queue */
     MonitoredItem_queuedValue *newQueueItem = UA_malloc(sizeof(MonitoredItem_queuedValue));
@@ -132,10 +168,10 @@ void UA_MoniteredItem_SampleCallback(UA_Server *server, UA_MonitoredItem *monito
         goto cleanup;
     }
 
-    /* Copy comparisonByteString on the heap */
-    if(comparisonByteStringOnStack) {
+    /* Copy valueEncoding on the heap for the next comparison (if not already done) */
+    if(valueEncoding.data == stackValueEncoding) {
         UA_ByteString cbs;
-        if(UA_ByteString_copy(&comparisonByteString, &cbs) != UA_STATUSCODE_GOOD) {
+        if(UA_ByteString_copy(&valueEncoding, &cbs) != UA_STATUSCODE_GOOD) {
             UA_LOG_WARNING_SESSION(server->config.logger, sub->session,
                                    "Subscription %u | MonitoredItem %i | "
                                    "ByteString to compare values could not be created",
@@ -143,28 +179,11 @@ void UA_MoniteredItem_SampleCallback(UA_Server *server, UA_MonitoredItem *monito
             UA_free(newQueueItem);
             goto cleanup;
         }
-        comparisonByteString = cbs;
-        comparisonByteStringOnStack = false;
-    }
-
-    /* Restore the settings changed for the filter */
-    value.hasValue = hasValue;
-    value.hasServerTimestamp = hasServerTimestamp;
-    value.hasServerPicoseconds = hasServerPicoseconds;
-    if(monitoredItem->timestampsToReturn == UA_TIMESTAMPSTORETURN_SERVER ||
-       monitoredItem->timestampsToReturn == UA_TIMESTAMPSTORETURN_NEITHER) {
-        value.hasSourceTimestamp = false;
-        value.hasSourcePicoseconds = false;
-    } else {
-        value.hasSourceTimestamp = hasSourceTimestamp;
-        value.hasSourcePicoseconds = hasSourcePicoseconds;
+        valueEncoding = cbs;
     }
 
     /* Prepare the newQueueItem */
     if(value.hasValue && value.value.storageType == UA_VARIANT_DATA_NODELETE) {
-        /* If the read request returned a datavalue pointing into the nodestore,
-         * we must make a deep copy to keep the datavalue across mainloop
-         * iterations */
         if(UA_DataValue_copy(&value, &newQueueItem->value) != UA_STATUSCODE_GOOD) {
             UA_LOG_WARNING_SESSION(server->config.logger, sub->session,
                                    "Subscription %u | MonitoredItem %i | "
@@ -174,7 +193,6 @@ void UA_MoniteredItem_SampleCallback(UA_Server *server, UA_MonitoredItem *monito
             goto cleanup;
         }
     } else {
-        /* Copy the content with pointers (do not clean up value) */
         newQueueItem->value = value;
     }
     newQueueItem->clientHandle = monitoredItem->clientHandle;
@@ -182,35 +200,22 @@ void UA_MoniteredItem_SampleCallback(UA_Server *server, UA_MonitoredItem *monito
     /* <-- Point of no return --> */
 
     UA_LOG_DEBUG_SESSION(server->config.logger, sub->session,
-                         "Subscription %u | MonitoredItem %u | "
-                         "Sampled a new value", sub->subscriptionID, monitoredItem->itemId);
-
-    /* Is enough space in the queue? */
-    if(monitoredItem->currentQueueSize >= monitoredItem->maxQueueSize) {
-        MonitoredItem_queuedValue *queueItem;
-        if(monitoredItem->discardOldest)
-            queueItem = TAILQ_FIRST(&monitoredItem->queue);
-        else
-            queueItem = TAILQ_LAST(&monitoredItem->queue, QueueOfQueueDataValues);
-        UA_assert(queueItem); /* When the currentQueueSize > 0, then there is an item */
-        TAILQ_REMOVE(&monitoredItem->queue, queueItem, listEntry);
-        UA_DataValue_deleteMembers(&queueItem->value);
-        UA_free(queueItem);
-        monitoredItem->currentQueueSize--;
-    }
+                         "Subscription %u | MonitoredItem %u | Sampled a new value",
+                         sub->subscriptionID, monitoredItem->itemId);
 
-    /* Replace the comparison bytestring with the current sample */
+    /* Replace the encoding for comparison */
     UA_ByteString_deleteMembers(&monitoredItem->lastSampledValue);
-    monitoredItem->lastSampledValue = comparisonByteString;
+    monitoredItem->lastSampledValue = valueEncoding;
 
     /* Add the sample to the queue for publication */
+    ensureSpaceInMonitoredItemQueue(monitoredItem);
     TAILQ_INSERT_TAIL(&monitoredItem->queue, newQueueItem, listEntry);
     monitoredItem->currentQueueSize++;
     return;
 
  cleanup:
-    if(!comparisonByteStringOnStack)
-        UA_ByteString_deleteMembers(&comparisonByteString);
+    if(valueEncoding.data != stackValueEncoding)
+        UA_ByteString_deleteMembers(&valueEncoding);
     UA_DataValue_deleteMembers(&value);
 }