Procházet zdrojové kódy

PubSub: Send out NetworkMessages with only one DataSetMessage right away

Julius Pfrommer před 6 roky
rodič
revize
0c19d77f42
1 změnil soubory, kde provedl 42 přidání a 73 odebrání
  1. 42 73
      src/pubsub/ua_pubsub.c

+ 42 - 73
src/pubsub/ua_pubsub.c

@@ -3,6 +3,7 @@
  * file, You can obtain one at http://mozilla.org/MPL/2.0/.
  *
  * Copyright (c) 2017-2018 Fraunhofer IOSB (Author: Andreas Ebner)
+ * Copyright (c) 2019 Fraunhofer IOSB (Author: Julius Pfrommer)
  */
 
 #include "server/ua_server_internal.h"
@@ -1078,7 +1079,7 @@ UA_WriterGroup_publishCallback(UA_Server *server, UA_WriterGroup *writerGroup) {
         return;
     }
 
-    /* How many dsm to encapsulate in one nm */
+    /* How many DSM can be sent in one NM? */
     UA_Byte maxDSM = (UA_Byte)writerGroup->config.maxEncapsulatedDataSetMessageCount;
     if(writerGroup->config.maxEncapsulatedDataSetMessageCount > UA_BYTE_MAX)
         maxDSM = UA_BYTE_MAX;
@@ -1086,30 +1087,13 @@ UA_WriterGroup_publishCallback(UA_Server *server, UA_WriterGroup *writerGroup) {
     if(maxDSM == 0)
         maxDSM = 1;
 
-    /* The binary DataSetMessage sizes are part of the payload. Allocate some
-     * stack-memory where to put the current messages. */
-    UA_STACKARRAY(UA_DataSetMessage, dsmStore, writerGroup->writersCount);
-    memset(dsmStore, 0, sizeof(UA_DataSetMessage) * writerGroup->writersCount);
-    UA_STACKARRAY(UA_UInt16, dsWriterIds, writerGroup->writersCount);
-    memset(dsWriterIds, 0, writerGroup->writersCount * sizeof(UA_UInt16));
-
-    /* Calculate the number of needed NetworkMessages. The previous allocated
-     * DataSetMessage array is filled from left for combined DSM messages and
-     * from the right for single DSM.
-     *
-     *     Allocated DSM Array
-     *    +----------------------------+
-     *    |DSM1||DSM2||DSM3||DSM4||DSM5|
-     *    +--+----+-----+-----+-----+--+
-     *       |    |     |     |     |
-     *       |    |     |     |     |
-     *    +--v----v-----v-----v--+--v--+
-     *    |    NM1        || NM2 | NM3 |
-     *    +----------------------+-----+
-     *    NetworkMessages
-     */
-    UA_UInt32 combinedNetworkMessageCount = 0, singleNetworkMessagesCount = 0;
+    /* It is possible to put several DataSetMessages into one NetworkMessage.
+     * But only if they do not contain promoted fields. NM with only DSM are
+     * sent out right away. The others are kept in a buffer for "batching". */
+    size_t dsmCount = 0;
     UA_DataSetWriter *dsw;
+    UA_STACKARRAY(UA_UInt16, dsWriterIds, writerGroup->writersCount);
+    UA_STACKARRAY(UA_DataSetMessage, dsmStore, writerGroup->writersCount);
     LIST_FOREACH(dsw, &writerGroup->writers, listEntry) {
         /* Find the dataset */
         UA_PublishedDataSet *pds =
@@ -1121,66 +1105,51 @@ UA_WriterGroup_publishCallback(UA_Server *server, UA_WriterGroup *writerGroup) {
         }
 
         /* If promoted fields are contained in the PublishedDataSet, then this
-         * DSM must encapsulated in one NM */
-        UA_UInt32 *count = &combinedNetworkMessageCount;
-        UA_UInt32 pos = combinedNetworkMessageCount;
-        if(pds->promotedFieldsCount > 0) {
-            count = &singleNetworkMessagesCount;
-            pos = (writerGroup->writersCount - 1) - singleNetworkMessagesCount;
+         * DSM must encapsulated in one NM. Send it right away. */
+        if(pds->promotedFieldsCount > 0 || maxDSM == 1) {
+            UA_DataSetMessage dsm;
+            UA_StatusCode res = UA_DataSetWriter_generateDataSetMessage(server, &dsm, dsw);
+            if(res != UA_STATUSCODE_GOOD) {
+                UA_LOG_ERROR(&server->config.logger, UA_LOGCATEGORY_SERVER,
+                             "PubSub Publish: DataSetMessage creation failed");
+                continue;
+            }
+
+            res = sendNetworkMessage(connection, &dsm, &dsw->config.dataSetWriterId, 1);
+            if(res != UA_STATUSCODE_GOOD)
+                UA_LOG_ERROR(&server->config.logger, UA_LOGCATEGORY_SERVER,
+                             "PubSub Publish: Could not send a NetworkMessage");
+            UA_DataSetMessage_free(&dsm);
+            continue;
         }
 
-        UA_StatusCode res =
-            UA_DataSetWriter_generateDataSetMessage(server, &dsmStore[pos], dsw);
-        if(res != UA_STATUSCODE_GOOD) {
+        UA_StatusCode res2 =
+            UA_DataSetWriter_generateDataSetMessage(server, &dsmStore[dsmCount], dsw);
+        if(res2 != UA_STATUSCODE_GOOD) {
             UA_LOG_ERROR(&server->config.logger, UA_LOGCATEGORY_SERVER,
                          "PubSub Publish: DataSetMessage creation failed");
             continue;
         }
 
-        dsWriterIds[pos] = dsw->config.dataSetWriterId;
-        *count = *count + 1;
+        dsWriterIds[dsmCount] = dsw->config.dataSetWriterId;
+        dsmCount++;
     }
 
-    UA_UInt32 networkMessageCount = singleNetworkMessagesCount;
-    if(combinedNetworkMessageCount != 0) {
-        combinedNetworkMessageCount = (UA_UInt16)
-            (combinedNetworkMessageCount / maxDSM +
-             (combinedNetworkMessageCount % maxDSM) == 0 ? 0 : 1);
-        networkMessageCount += combinedNetworkMessageCount;
-    }
-
-    /* Prepare, encode and send the NetworkMessages */
-    UA_UInt32 currentDSMPosition = 0;
-    for(UA_UInt32 i = 0; i < networkMessageCount; i++) {
-        UA_Byte dsmCount = 0;
-        if(i < (networkMessageCount-singleNetworkMessagesCount)){
-            /* Create combined NetworkMessages */
-            if(combinedNetworkMessageCount - (i * maxDSM)) {
-                if(combinedNetworkMessageCount == 1) {
-                    dsmCount = (UA_Byte) ((writerGroup->writersCount) - singleNetworkMessagesCount);
-                    currentDSMPosition = 0;
-                } else {
-                    dsmCount = maxDSM;
-                    currentDSMPosition = i * maxDSM;
-                }
-            } else {
-                currentDSMPosition = i * maxDSM;
-                dsmCount = (UA_Byte) (currentDSMPosition - ((i - 1) * maxDSM)); //attention cast from uint32 to byte
-            }
-        } else {
-            /* Create single NetworkMessages (1 DSM per NM) */
-            dsmCount = 1;
-            currentDSMPosition = (UA_UInt32)
-                combinedNetworkMessageCount + (i - combinedNetworkMessageCount / maxDSM
-                                               + (combinedNetworkMessageCount % maxDSM) == 0 ? 0 : 1);
-        }
-
-        sendNetworkMessage(connection, &dsmStore[currentDSMPosition],
-                           &dsWriterIds[currentDSMPosition], dsmCount);
+    /* Send the NetworkMessages with batched DataSetMessages */
+    size_t nmCount = (dsmCount / maxDSM) + ((dsmCount % maxDSM) == 0 ? 0 : 1);
+    for(UA_UInt32 i = 0; i < nmCount; i++) {
+        UA_Byte nmDsmCount = maxDSM;
+        if(i == nmCount - 1)
+            nmDsmCount = (UA_Byte)dsmCount % maxDSM;
+        UA_StatusCode res3 = sendNetworkMessage(connection, &dsmStore[i * maxDSM],
+                                                &dsWriterIds[i * maxDSM], nmDsmCount);
+        if(res3 != UA_STATUSCODE_GOOD)
+            UA_LOG_ERROR(&server->config.logger, UA_LOGCATEGORY_SERVER,
+                         "PubSub Publish: Sending a NetworkMessage failed");
     }
 
-    /* Clean up */
-    for(size_t i = 0; i < writerGroup->writersCount; i++)
+    /* Clean up DSM */
+    for(size_t i = 0; i < dsmCount; i++)
         UA_DataSetMessage_free(&dsmStore[i]);
 }