Sfoglia il codice sorgente

PubSub: Pull out encoding / sending of NetworkMessages into a separate function

Julius Pfrommer 6 anni fa
parent
commit
e50616da7b
1 ha cambiato i file con 53 aggiunte e 50 eliminazioni
  1. 53 50
      src/pubsub/ua_pubsub.c

+ 53 - 50
src/pubsub/ua_pubsub.c

@@ -993,6 +993,48 @@ UA_DataSetWriter_generateDataSetMessage(UA_Server *server, UA_DataSetMessage *da
     return UA_STATUSCODE_GOOD;
 }
 
+static UA_StatusCode
+sendNetworkMessage(UA_PubSubConnection *connection, UA_DataSetMessage *dsm,
+                   UA_UInt16 *writerIds, UA_Byte dsmCount) {
+    UA_NetworkMessage nm;
+    memset(&nm, 0, sizeof(UA_NetworkMessage));
+    nm.version = 1;
+    nm.networkMessageType = UA_NETWORKMESSAGE_DATASET;
+    nm.payloadHeaderEnabled = true;
+
+    /* Compute the length of the dsm separately for the header */
+    UA_STACKARRAY(UA_UInt16, dsmLengths, dsmCount);
+    for(UA_Byte i = 0; i < dsmCount; i++)
+        dsmLengths[i] = (UA_UInt16)UA_DataSetMessage_calcSizeBinary(&dsm[i]);
+
+    nm.payloadHeader.dataSetPayloadHeader.count = dsmCount;
+    nm.payloadHeader.dataSetPayloadHeader.dataSetWriterIds = writerIds;
+    nm.payload.dataSetPayload.sizes = dsmLengths;
+    nm.payload.dataSetPayload.dataSetMessages = dsm;
+
+    /* Allocate the buffer */
+    UA_ByteString buf;
+    size_t msgSize = UA_NetworkMessage_calcSizeBinary(&nm);
+    UA_StatusCode retval = UA_ByteString_allocBuffer(&buf, msgSize);
+    if(retval != UA_STATUSCODE_GOOD)
+        return retval;
+        
+    /* Encode the message */
+    UA_Byte *bufPos = buf.data;
+    memset(bufPos, 0, msgSize);
+    const UA_Byte *bufEnd = &buf.data[buf.length];
+    retval = UA_NetworkMessage_encodeBinary(&nm, &bufPos, bufEnd);
+    if(retval != UA_STATUSCODE_GOOD) {
+        UA_ByteString_deleteMembers(&buf);
+        return retval;
+    }
+
+    /* Send the prepared messages */
+    retval = connection->channel->send(connection->channel, NULL, &buf);
+    UA_ByteString_deleteMembers(&buf);
+    return retval;
+}
+
 /* This callback triggers the collection and publish of NetworkMessages and the
  * contained DataSetMessages. */
 void
@@ -1035,8 +1077,6 @@ UA_WriterGroup_publishCallback(UA_Server *server, UA_WriterGroup *writerGroup) {
      * stack-memory where to put the current messages. */
     UA_STACKARRAY(UA_DataSetMessage, dsmStore, writerGroup->writersCount);
     memset(dsmStore, 0, sizeof(UA_DataSetMessage) * writerGroup->writersCount);
-    UA_STACKARRAY(UA_UInt16, dsmSizes, writerGroup->writersCount);
-    memset(dsmSizes, 0, writerGroup->writersCount * sizeof(UA_UInt16));
     UA_STACKARRAY(UA_UInt16, dsWriterIds, writerGroup->writersCount);
     memset(dsWriterIds, 0, writerGroup->writersCount * sizeof(UA_UInt16));
 
@@ -1085,7 +1125,6 @@ UA_WriterGroup_publishCallback(UA_Server *server, UA_WriterGroup *writerGroup) {
         }
 
         dsWriterIds[pos] = dsw->config.dataSetWriterId;
-        dsmSizes[pos] = (UA_UInt16) UA_DataSetMessage_calcSizeBinary(&dsmStore[pos]);
         *count = *count + 1;
     }
 
@@ -1097,75 +1136,39 @@ UA_WriterGroup_publishCallback(UA_Server *server, UA_WriterGroup *writerGroup) {
         networkMessageCount += combinedNetworkMessageCount;
     }
 
-    /* Nothing to do */
-    if(networkMessageCount == 0) {
-        for(size_t i = 0; i < writerGroup->writersCount; i++)
-            UA_DataSetMessage_free(&dsmStore[i]);
-        return;
-    }
-
     /* Prepare, encode and send the NetworkMessages */
     UA_UInt32 currentDSMPosition = 0;
     for(UA_UInt32 i = 0; i < networkMessageCount; i++) {
-        UA_NetworkMessage nm;
-        memset(&nm, 0, sizeof(UA_NetworkMessage));
-        nm.version = 1;
-        nm.networkMessageType = UA_NETWORKMESSAGE_DATASET;
-        nm.payloadHeaderEnabled = true;
-
+        UA_Byte dsmCount = 0;
         if(i < (networkMessageCount-singleNetworkMessagesCount)){
             /* Create combined NetworkMessages */
             if(combinedNetworkMessageCount - (i * maxDSM)) {
                 if(combinedNetworkMessageCount == 1) {
-                    nm.payloadHeader.dataSetPayloadHeader.count = (UA_Byte) ((writerGroup->writersCount) - singleNetworkMessagesCount);
+                    dsmCount = (UA_Byte) ((writerGroup->writersCount) - singleNetworkMessagesCount);
                     currentDSMPosition = 0;
                 } else {
-                    nm.payloadHeader.dataSetPayloadHeader.count = maxDSM;
+                    dsmCount = maxDSM;
                     currentDSMPosition = i * maxDSM;
                 }
-                nm.payload.dataSetPayload.dataSetMessages = &dsmStore[currentDSMPosition];
-                nm.payload.dataSetPayload.sizes = &dsmSizes[currentDSMPosition];
-                nm.payloadHeader.dataSetPayloadHeader.dataSetWriterIds = &dsWriterIds[currentDSMPosition];
             } else {
                 currentDSMPosition = i * maxDSM;
-                nm.payloadHeader.dataSetPayloadHeader.count = (UA_Byte) (currentDSMPosition - ((i - 1) * maxDSM)); //attention cast from uint32 to byte
-                nm.payload.dataSetPayload.dataSetMessages = &dsmStore[currentDSMPosition];
-                nm.payload.dataSetPayload.sizes = &dsmSizes[currentDSMPosition];
-                nm.payloadHeader.dataSetPayloadHeader.dataSetWriterIds = &dsWriterIds[currentDSMPosition];
+                dsmCount = (UA_Byte) (currentDSMPosition - ((i - 1) * maxDSM)); //attention cast from uint32 to byte
             }
         } else {
             /* Create single NetworkMessages (1 DSM per NM) */
-            nm.payloadHeader.dataSetPayloadHeader.count = 1;
+            dsmCount = 1;
             currentDSMPosition = (UA_UInt32)
                 combinedNetworkMessageCount + (i - combinedNetworkMessageCount / maxDSM
                                                + (combinedNetworkMessageCount % maxDSM) == 0 ? 0 : 1);
-            nm.payload.dataSetPayload.dataSetMessages = &dsmStore[currentDSMPosition];
-            nm.payload.dataSetPayload.sizes = &dsmSizes[currentDSMPosition];
-            nm.payloadHeader.dataSetPayloadHeader.dataSetWriterIds = &dsWriterIds[currentDSMPosition];
         }
 
-        /* Send the prepared messages */
-        UA_ByteString buf;
-        size_t msgSize = UA_NetworkMessage_calcSizeBinary(&nm);
-        if(UA_ByteString_allocBuffer(&buf, msgSize) == UA_STATUSCODE_GOOD) {
-            UA_Byte *bufPos = buf.data;
-            memset(bufPos, 0, msgSize);
-            const UA_Byte *bufEnd = &buf.data[buf.length];
-            if(UA_NetworkMessage_encodeBinary(&nm, &bufPos, bufEnd) == UA_STATUSCODE_GOOD)
-                connection->channel->send(connection->channel, NULL, &buf);
-            else
-                UA_ByteString_deleteMembers(&buf);
-        }
-
-        /* The stack allocated sizes and dataSetWriterIds field must be set to
-         * NULL to prevent invalid free */
-        UA_ByteString_deleteMembers(&buf);
-        nm.payload.dataSetPayload.sizes = NULL;
-        nm.payloadHeader.dataSetPayloadHeader.dataSetWriterIds = NULL;
-        nm.payload.dataSetPayload.dataSetMessages = NULL;
-        nm.payloadHeader.dataSetPayloadHeader.count = 0;
-        UA_NetworkMessage_deleteMembers(&nm);
+        sendNetworkMessage(connection, &dsmStore[currentDSMPosition],
+                           &dsWriterIds[currentDSMPosition], dsmCount);
     }
+
+    /* Clean up */
+    for(size_t i = 0; i < writerGroup->writersCount; i++)
+        UA_DataSetMessage_free(&dsmStore[i]);
 }
 
 /* Add new publishCallback. The first execution is triggered directly after