Browse Source

feat(pubsub) added code review improvements and fixes

Andreas Ebner 4 years ago
parent
commit
9842cf1a6b

+ 1 - 5
src/pubsub/ua_pubsub.h

@@ -114,10 +114,6 @@ UA_DataSetWriter_findDSWbyId(UA_Server *server, UA_NodeId identifier);
 UA_StatusCode
 UA_DataSetWriter_setPubSubState(UA_Server *server, UA_PubSubState state, UA_DataSetWriter *dataSetWriter);
 
-/**********************************************/
-/*          Network Message Offsets           */
-/**********************************************/
-
 /**********************************************/
 /*               WriterGroup                  */
 /**********************************************/
@@ -135,7 +131,7 @@ struct UA_WriterGroup{
     UA_Boolean publishCallbackIsRegistered;
     UA_PubSubState state;
     UA_NetworkMessageOffsetBuffer bufferedMessage;
-    UA_UInt16 sequenceNumber;
+    UA_UInt16 sequenceNumber; /* Increased after every succressuly sent message */
 };
 
 UA_StatusCode

+ 54 - 40
src/pubsub/ua_pubsub_networkmessage.c

@@ -684,6 +684,17 @@ UA_NetworkMessage_decodeBinary(const UA_ByteString *src, size_t *offset, UA_Netw
     return retval;
 }
 
+static UA_Boolean
+increaseOffsetArray(UA_NetworkMessageOffsetBuffer *offsetBuffer) {
+    UA_NetworkMessageOffset *tmpOffsets = (UA_NetworkMessageOffset *)
+        UA_realloc(offsetBuffer->offsets, sizeof(UA_NetworkMessageOffset) * (offsetBuffer->offsetsSize + (size_t)1));
+    if(!tmpOffsets)
+        return false;
+    offsetBuffer->offsets = tmpOffsets;
+    offsetBuffer->offsetsSize++;
+    return true;
+}
+
 size_t
 UA_NetworkMessage_calcSizeBinary(UA_NetworkMessage *p, UA_NetworkMessageOffsetBuffer *offsetBuffer) {
     size_t retval = 0;
@@ -734,13 +745,15 @@ UA_NetworkMessage_calcSizeBinary(UA_NetworkMessage *p, UA_NetworkMessageOffsetBu
 
         if(p->groupHeader.networkMessageNumberEnabled) {
             if(offsetBuffer){
-                offsetBuffer->offsets = (UA_NetworkMessageOffset *) UA_realloc(offsetBuffer->offsets, sizeof(UA_NetworkMessageOffset) * (offsetBuffer->offsetsSize + (size_t)1));
-                offsetBuffer->offsets[offsetBuffer->offsetsSize].offset = size;
-                offsetBuffer->offsets[offsetBuffer->offsetsSize].offsetData.value.value = UA_DataValue_new();
-                UA_DataValue_init(offsetBuffer->offsets[offsetBuffer->offsetsSize].offsetData.value.value);
-                UA_Variant_setScalar(&offsetBuffer->offsets[offsetBuffer->offsetsSize].offsetData.value.value->value,
+                size_t pos = offsetBuffer->offsetsSize;
+                if(!increaseOffsetArray(offsetBuffer))
+                    return 0;
+                offsetBuffer->offsets[pos].offset = size;
+                offsetBuffer->offsets[pos].offsetData.value.value = UA_DataValue_new();
+                UA_DataValue_init(offsetBuffer->offsets[pos].offsetData.value.value);
+                UA_Variant_setScalar(&offsetBuffer->offsets[pos].offsetData.value.value->value,
                                      &p->groupHeader.sequenceNumber, &UA_TYPES[UA_TYPES_UINT16]);
-                offsetBuffer->offsets[offsetBuffer->offsetsSize++].contentType = UA_PUBSUB_OFFSETTYPE_NETWORKMESSAGE_SEQUENCENUMBER;
+                offsetBuffer->offsets[pos].contentType = UA_PUBSUB_OFFSETTYPE_NETWORKMESSAGE_SEQUENCENUMBER;
             }
             size += UA_UInt16_calcSizeBinary(&p->groupHeader.networkMessageNumber);
         }
@@ -767,20 +780,22 @@ UA_NetworkMessage_calcSizeBinary(UA_NetworkMessage *p, UA_NetworkMessageOffsetBu
 
     if(p->timestampEnabled) {
         if(offsetBuffer){
-            offsetBuffer->offsets = (UA_NetworkMessageOffset *) UA_realloc(offsetBuffer->offsets, sizeof(UA_NetworkMessageOffset) * (offsetBuffer->offsetsSize + 1));
-            offsetBuffer->offsets[offsetBuffer->offsetsSize].offset = size;
-            offsetBuffer->offsets[offsetBuffer->offsetsSize++].contentType = UA_PUBSUB_OFFSETTYPE_TIMESTAMP;
+            size_t pos = offsetBuffer->offsetsSize;
+            if(!increaseOffsetArray(offsetBuffer))
+                return 0;
+            offsetBuffer->offsets[pos].offset = size;
+            offsetBuffer->offsets[pos].contentType = UA_PUBSUB_OFFSETTYPE_TIMESTAMP;
         }
         size += UA_DateTime_calcSizeBinary(&p->timestamp);
     }
 
     if(p->picosecondsEnabled){
         if (offsetBuffer) {
-            offsetBuffer->offsets = (UA_NetworkMessageOffset *) UA_realloc(offsetBuffer->offsets,
-                                                                           sizeof(UA_NetworkMessageOffset) *
-                                                                           (offsetBuffer->offsetsSize + 1));
-            offsetBuffer->offsets[offsetBuffer->offsetsSize].offset = size;
-            offsetBuffer->offsets[offsetBuffer->offsetsSize++].contentType = UA_PUBSUB_OFFSETTYPE_TIMESTAMP_PICOSECONDS;
+            size_t pos = offsetBuffer->offsetsSize;
+            if(!increaseOffsetArray(offsetBuffer))
+                return 0;
+            offsetBuffer->offsets[pos].offset = size;
+            offsetBuffer->offsets[pos].contentType = UA_PUBSUB_OFFSETTYPE_TIMESTAMP_PICOSECONDS;
         }
         size += UA_UInt16_calcSizeBinary(&p->picoseconds);
     }
@@ -1286,15 +1301,15 @@ UA_DataSetMessage_calcSizeBinary(UA_DataSetMessage* p, UA_NetworkMessageOffsetBu
 
     if(p->header.dataSetMessageSequenceNrEnabled) {
         if (offsetBuffer) {
-            offsetBuffer->offsets = (UA_NetworkMessageOffset *) UA_realloc(offsetBuffer->offsets,
-                                                                           sizeof(UA_NetworkMessageOffset) *
-                                                                           (offsetBuffer->offsetsSize + 1));
-            offsetBuffer->offsets[offsetBuffer->offsetsSize].offset = size;
-            offsetBuffer->offsets[offsetBuffer->offsetsSize].offsetData.value.value = UA_DataValue_new();
-            UA_DataValue_init(offsetBuffer->offsets[offsetBuffer->offsetsSize].offsetData.value.value);
-            UA_Variant_setScalar(&offsetBuffer->offsets[offsetBuffer->offsetsSize].offsetData.value.value->value,
+            size_t pos = offsetBuffer->offsetsSize;
+            if(!increaseOffsetArray(offsetBuffer))
+                return 0;
+            offsetBuffer->offsets[pos].offset = size;
+            offsetBuffer->offsets[pos].offsetData.value.value = UA_DataValue_new();
+            UA_DataValue_init(offsetBuffer->offsets[pos].offsetData.value.value);
+            UA_Variant_setScalar(&offsetBuffer->offsets[pos].offsetData.value.value->value,
                                  &p->header.dataSetMessageSequenceNr, &UA_TYPES[UA_TYPES_UINT16]);
-            offsetBuffer->offsets[offsetBuffer->offsetsSize++].contentType = UA_PUBSUB_OFFSETTYPE_DATASETMESSAGE_SEQUENCENUMBER;
+            offsetBuffer->offsets[pos].contentType = UA_PUBSUB_OFFSETTYPE_DATASETMESSAGE_SEQUENCENUMBER;
         }
         size += UA_UInt16_calcSizeBinary(&p->header.dataSetMessageSequenceNr);
     }
@@ -1323,18 +1338,17 @@ UA_DataSetMessage_calcSizeBinary(UA_DataSetMessage* p, UA_NetworkMessageOffsetBu
         if(p->header.fieldEncoding == UA_FIELDENCODING_VARIANT) {
             for (UA_UInt16 i = 0; i < p->data.keyFrameData.fieldCount; i++){
                 if (offsetBuffer) {
-                    offsetBuffer->offsets = (UA_NetworkMessageOffset *) UA_realloc(offsetBuffer->offsets,
-                                                                                   sizeof(UA_NetworkMessageOffset) *
-                                                                                   (offsetBuffer->offsetsSize + 1));
-                    offsetBuffer->offsets[offsetBuffer->offsetsSize].offset = size;
-                    offsetBuffer->offsets[offsetBuffer->offsetsSize].contentType = UA_PUBSUB_OFFSETTYPE_PAYLOAD_VARIANT;
+                    size_t pos = offsetBuffer->offsetsSize;
+                    if(!increaseOffsetArray(offsetBuffer))
+                        return 0;
+                    offsetBuffer->offsets[pos].offset = size;
+                    offsetBuffer->offsets[pos].contentType = UA_PUBSUB_OFFSETTYPE_PAYLOAD_VARIANT;
                     //TODO check value source and alloc!
-                    offsetBuffer->offsets[offsetBuffer->offsetsSize].offsetData.value.value = UA_DataValue_new();
-                    UA_Variant_setScalar(
-                            &offsetBuffer->offsets[offsetBuffer->offsetsSize++].offsetData.value.value->value,
-                            p->data.keyFrameData.dataSetFields[i].value.data,
-                            p->data.keyFrameData.dataSetFields[i].value.type);
-                    //offsetBuffer->offsets[offsetBuffer->offsetsSize++].offsetData.value.value->value = p->data.keyFrameData.dataSetFields->value;
+                    offsetBuffer->offsets[pos].offsetData.value.value = UA_DataValue_new();
+                    UA_Variant_setScalar(&offsetBuffer->offsets[pos].offsetData.value.value->value,
+                                         p->data.keyFrameData.dataSetFields[i].value.data,
+                                         p->data.keyFrameData.dataSetFields[i].value.type);
+                    //offsetBuffer->offsets[pos].offsetData.value.value->value = p->data.keyFrameData.dataSetFields->value;
                 }
                 size += UA_calcSizeBinary(&p->data.keyFrameData.dataSetFields[i].value, &UA_TYPES[UA_TYPES_VARIANT]);
             }
@@ -1343,13 +1357,13 @@ UA_DataSetMessage_calcSizeBinary(UA_DataSetMessage* p, UA_NetworkMessageOffsetBu
         } else if(p->header.fieldEncoding == UA_FIELDENCODING_DATAVALUE) {
             for (UA_UInt16 i = 0; i < p->data.keyFrameData.fieldCount; i++) {
                 if (offsetBuffer) {
-                    offsetBuffer->offsets = (UA_NetworkMessageOffset *) UA_realloc(offsetBuffer->offsets,
-                                                                                   sizeof(UA_NetworkMessageOffset) *
-                                                                                   (offsetBuffer->offsetsSize + 1));
-                    offsetBuffer->offsets[offsetBuffer->offsetsSize].offset = size;
-                    offsetBuffer->offsets[offsetBuffer->offsetsSize].contentType = UA_PUBSUB_OFFSETTYPE_PAYLOAD_DATAVALUE;
+                    size_t pos = offsetBuffer->offsetsSize;
+                    if(!increaseOffsetArray(offsetBuffer))
+                        return 0;
+                    offsetBuffer->offsets[pos].offset = size;
+                    offsetBuffer->offsets[pos].contentType = UA_PUBSUB_OFFSETTYPE_PAYLOAD_DATAVALUE;
                     //TODO check value source, change implementation to 'variant'
-                    offsetBuffer->offsets[offsetBuffer->offsetsSize++].offsetData.value.value = p->data.keyFrameData.dataSetFields;
+                    offsetBuffer->offsets[pos].offsetData.value.value = p->data.keyFrameData.dataSetFields;
                 }
                 size += UA_calcSizeBinary(&p->data.keyFrameData.dataSetFields[i], &UA_TYPES[UA_TYPES_DATAVALUE]);
             }
@@ -1400,4 +1414,4 @@ void UA_DataSetMessage_free(const UA_DataSetMessage* p) {
         }
     }
 }
-#endif /* UA_ENABLE_PUBSUB */
+#endif /* UA_ENABLE_PUBSUB */

+ 4 - 0
src/pubsub/ua_pubsub_networkmessage.h

@@ -65,6 +65,10 @@ UA_DataSetMessageHeader_decodeBinary(const UA_ByteString *src, size_t *offset,
 size_t
 UA_DataSetMessageHeader_calcSizeBinary(const UA_DataSetMessageHeader* p);
 
+/**********************************************/
+/*          Network Message Offsets           */
+/**********************************************/
+
 /* Offsets for buffered messages in the PubSub fast path. */
 typedef enum {
     UA_PUBSUB_OFFSETTYPE_DATASETMESSAGE_SEQUENCENUMBER,

+ 35 - 29
src/pubsub/ua_pubsub_writer.c

@@ -1839,11 +1839,13 @@ generateNetworkMessage(UA_PubSubConnection *connection, UA_WriterGroup *wg,
 
 static UA_StatusCode
 sendBufferedNetworkMessage(UA_Server *server, UA_PubSubConnection *connection,
-        UA_NetworkMessageOffsetBuffer *buffer, UA_ExtensionObject *transportSettings) {
+                           UA_NetworkMessageOffsetBuffer *buffer,
+                           UA_ExtensionObject *transportSettings) {
     if(UA_NetworkMessage_updateBufferedMessage(buffer) != UA_STATUSCODE_GOOD)
-        UA_LOG_DEBUG(&server->config.logger, UA_LOGCATEGORY_SERVER, "PubSub sending. Unknown field type.");
-    UA_StatusCode retval = connection->channel->send(connection->channel, transportSettings, &buffer->buffer);
-    return retval;
+        UA_LOG_DEBUG(&server->config.logger, UA_LOGCATEGORY_SERVER,
+                     "PubSub sending. Unknown field type.");
+    return connection->channel->send(connection->channel,
+                                     transportSettings, &buffer->buffer);
 }
 
 static UA_StatusCode
@@ -1906,12 +1908,6 @@ UA_WriterGroup_publishCallback(UA_Server *server, UA_WriterGroup *writerGroup) {
     if(writerGroup->writersCount <= 0)
         return;
 
-    if(!writerGroup->linkedConnectionPtr){
-        UA_LOG_WARNING(&server->config.logger, UA_LOGCATEGORY_SERVER,
-                       "Publish failed: Invalid reference to PubSubConnection");
-        return;
-    }
-
     /* Binary or Json encoding?  */
     if(writerGroup->config.encodingMimeType != UA_PUBSUB_ENCODING_UADP &&
        writerGroup->config.encodingMimeType != UA_PUBSUB_ENCODING_JSON) {
@@ -1929,8 +1925,11 @@ UA_WriterGroup_publishCallback(UA_Server *server, UA_WriterGroup *writerGroup) {
     }
 
     if(writerGroup->config.rtLevel == UA_PUBSUB_RT_FIXED_SIZE) {
-        sendBufferedNetworkMessage(server, connection, &writerGroup->bufferedMessage, &writerGroup->config.transportSettings);
-        writerGroup->sequenceNumber++;
+        UA_StatusCode res =
+            sendBufferedNetworkMessage(server, connection, &writerGroup->bufferedMessage,
+                                       &writerGroup->config.transportSettings);
+        if(res == UA_STATUSCODE_GOOD)
+            writerGroup->sequenceNumber++;
         return;
     }
 
@@ -1977,20 +1976,24 @@ UA_WriterGroup_publishCallback(UA_Server *server, UA_WriterGroup *writerGroup) {
                                          &dsw->config.dataSetWriterId, 1,
                                          &writerGroup->config.messageSettings,
                                          &writerGroup->config.transportSettings);
-            }else if(writerGroup->config.encodingMimeType == UA_PUBSUB_ENCODING_JSON){
+            } else if(writerGroup->config.encodingMimeType == UA_PUBSUB_ENCODING_JSON) {
                 res = sendNetworkMessageJson(connection, &dsmStore[dsmCount],
-                        &dsw->config.dataSetWriterId, 1, &writerGroup->config.transportSettings);
+                                             &dsw->config.dataSetWriterId, 1,
+                                             &writerGroup->config.transportSettings);
             }
-           if(res != UA_STATUSCODE_GOOD)
+
+            if(res != UA_STATUSCODE_GOOD)
                 UA_LOG_WARNING(&server->config.logger, UA_LOGCATEGORY_SERVER,
                                "PubSub Publish: Could not send a NetworkMessage");
-           if(writerGroup->config.rtLevel == UA_PUBSUB_RT_DIRECT_VALUE_ACCESS){
-               for (size_t i = 0; i < dsmStore[dsmCount].data.keyFrameData.fieldCount; ++i) {
-                   dsmStore[dsmCount].data.keyFrameData.dataSetFields[i].value.data = NULL;
-               }
-           }
-           UA_DataSetMessage_free(&dsmStore[dsmCount]);
-           continue;
+
+            if(writerGroup->config.rtLevel == UA_PUBSUB_RT_DIRECT_VALUE_ACCESS) {
+                for(size_t i = 0; i < dsmStore[dsmCount].data.keyFrameData.fieldCount; ++i) {
+                    dsmStore[dsmCount].data.keyFrameData.dataSetFields[i].value.data = NULL;
+                }
+            }
+
+            UA_DataSetMessage_free(&dsmStore[dsmCount]);
+            continue;
         }
 
         dsWriterIds[dsmCount] = dsw->config.dataSetWriterId;
@@ -2003,22 +2006,25 @@ UA_WriterGroup_publishCallback(UA_Server *server, UA_WriterGroup *writerGroup) {
         UA_Byte nmDsmCount = maxDSM;
         if(i == nmCount - 1  && (dsmCount % maxDSM))
             nmDsmCount = (UA_Byte)dsmCount % maxDSM;
-
         UA_StatusCode res3 = UA_STATUSCODE_GOOD;
         if(writerGroup->config.encodingMimeType == UA_PUBSUB_ENCODING_UADP){
             res3 = sendNetworkMessage(connection, writerGroup, &dsmStore[i * maxDSM],
                                       &dsWriterIds[i * maxDSM], nmDsmCount,
                                       &writerGroup->config.messageSettings,
                                       &writerGroup->config.transportSettings);
-            writerGroup->sequenceNumber++;
-        }else if(writerGroup->config.encodingMimeType == UA_PUBSUB_ENCODING_JSON){
+        } else if(writerGroup->config.encodingMimeType == UA_PUBSUB_ENCODING_JSON){
             res3 = sendNetworkMessageJson(connection, &dsmStore[i * maxDSM],
-                    &dsWriterIds[i * maxDSM], nmDsmCount, &writerGroup->config.transportSettings);
-            writerGroup->sequenceNumber++;
+                                          &dsWriterIds[i * maxDSM], nmDsmCount,
+                                          &writerGroup->config.transportSettings);
         }
-        if(res3 != UA_STATUSCODE_GOOD)
+
+        if(res3 != UA_STATUSCODE_GOOD) {
             UA_LOG_WARNING(&server->config.logger, UA_LOGCATEGORY_SERVER,
                            "PubSub Publish: Sending a NetworkMessage failed");
+            continue;
+        }
+
+        writerGroup->sequenceNumber++;
     }
 
     /* Clean up DSM */
@@ -2043,4 +2049,4 @@ UA_WriterGroup_addPublishCallback(UA_Server *server, UA_WriterGroup *writerGroup
     return retval;
 }
 
-#endif /* UA_ENABLE_PUBSUB */
+#endif /* UA_ENABLE_PUBSUB */