Преглед на файлове

extended freeze logic with the creation of DSM (if RT is enabled), moved structures to UADP Layer and created offset calculation skeleton

Andreas Ebner преди 5 години
родител
ревизия
4bb65bd5e1
променени са 4 файла, в които са добавени 75 реда и са изтрити 29 реда
  1. 0 27
      src/pubsub/ua_pubsub.h
  2. 6 0
      src/pubsub/ua_pubsub_networkmessage.c
  3. 31 0
      src/pubsub/ua_pubsub_networkmessage.h
  4. 38 2
      src/pubsub/ua_pubsub_writer.c

+ 0 - 27
src/pubsub/ua_pubsub.h

@@ -117,33 +117,6 @@ UA_DataSetWriter_setPubSubState(UA_Server *server, UA_PubSubState state, UA_Data
 /*          Network Message Offsets           */
 /**********************************************/
 
-/* Offsets for buffered messages in the PubSub fast path.
- * TODO: Implement the generation of the offsets */
-typedef enum {
-	UA_PUBSUB_OFFSETTYPE_SEQUENCENUMBER,
-    UA_PUBSUB_OFFSETTYPE_TIMESTAMP,     /* source pointer */
-	UA_PUBSUB_OFFSETTYPE_TIMESTAMP_NOW, /* no source */
-    UA_PUBSUB_OFFSETTYPE_PAYLOAD_DATAVALUE,
-	UA_PUBSUB_OFFSETTYPE_PAYLOAD_VARIANT,
-	UA_PUBSUB_OFFSETTYPE_PAYLOAD_RAW
-    /* Add more offset types as needed */
-} UA_NetworkMessageOffsetType;
-
-typedef struct {
-    UA_NetworkMessageOffsetType contentType;
-	union {
-		UA_DateTime *timestamp;
-		UA_DataValue *value;
-	} offsetData;
-	size_t offset;
-} UA_NetworkMessageOffset;
-
-typedef struct {
-    UA_ByteString buffer; /* The precomputed message buffer */
-    UA_NetworkMessageOffset *offsets; /* Offsets for changes in the message buffer */
-    size_t offsetsSize;
-} UA_NetworkMessageOffsetBuffer;
-
 /**********************************************/
 /*               WriterGroup                  */
 /**********************************************/

+ 6 - 0
src/pubsub/ua_pubsub_networkmessage.c

@@ -51,6 +51,12 @@ static UA_Boolean UA_NetworkMessage_ExtendedFlags1Enabled(const UA_NetworkMessag
 static UA_Boolean UA_NetworkMessage_ExtendedFlags2Enabled(const UA_NetworkMessage* src);
 static UA_Boolean UA_DataSetMessageHeader_DataSetFlags2Enabled(const UA_DataSetMessageHeader* src);
 
+UA_StatusCode
+UA_NetworkMessage_generateOffsetBuffer(UA_NetworkMessageOffsetBuffer *offsetBuffer, const UA_NetworkMessage* src){
+    return UA_STATUSCODE_BADNOTIMPLEMENTED;
+}
+
+
 UA_StatusCode
 UA_NetworkMessage_encodeBinary(const UA_NetworkMessage* src, UA_Byte **bufPos,
                                const UA_Byte *bufEnd) {

+ 31 - 0
src/pubsub/ua_pubsub_networkmessage.h

@@ -64,6 +64,33 @@ UA_DataSetMessageHeader_decodeBinary(const UA_ByteString *src, size_t *offset,
 size_t
 UA_DataSetMessageHeader_calcSizeBinary(const UA_DataSetMessageHeader* p);
 
+/* Offsets for buffered messages in the PubSub fast path.
+ * TODO: Implement the generation of the offsets */
+typedef enum {
+    UA_PUBSUB_OFFSETTYPE_SEQUENCENUMBER,
+    UA_PUBSUB_OFFSETTYPE_TIMESTAMP,     /* source pointer */
+    UA_PUBSUB_OFFSETTYPE_TIMESTAMP_NOW, /* no source */
+    UA_PUBSUB_OFFSETTYPE_PAYLOAD_DATAVALUE,
+    UA_PUBSUB_OFFSETTYPE_PAYLOAD_VARIANT,
+    UA_PUBSUB_OFFSETTYPE_PAYLOAD_RAW
+    /* Add more offset types as needed */
+} UA_NetworkMessageOffsetType;
+
+typedef struct {
+    UA_NetworkMessageOffsetType contentType;
+    union {
+        UA_DateTime *timestamp;
+        UA_DataValue *value;
+    } offsetData;
+    size_t offset;
+} UA_NetworkMessageOffset;
+
+typedef struct {
+    UA_ByteString buffer; /* The precomputed message buffer */
+    UA_NetworkMessageOffset *offsets; /* Offsets for changes in the message buffer */
+    size_t offsetsSize;
+} UA_NetworkMessageOffsetBuffer;
+
 /**
  * DataSetMessage
  * ^^^^^^^^^^^^^^ */
@@ -202,6 +229,10 @@ typedef struct {
     UA_ByteString signature;
 } UA_NetworkMessage;
 
+UA_StatusCode
+UA_NetworkMessage_generateOffsetBuffer(UA_NetworkMessageOffsetBuffer *offsetBuffer,
+                                       const UA_NetworkMessage* src);
+
 UA_StatusCode
 UA_NetworkMessage_encodeBinary(const UA_NetworkMessage* src,
                                UA_Byte **bufPos, const UA_Byte *bufEnd);

+ 38 - 2
src/pubsub/ua_pubsub_writer.c

@@ -240,6 +240,7 @@ UA_Server_freezeWriterGroupConfiguration(UA_Server *server, const UA_NodeId writ
     //WriterGroup freeze
     wg->config.configurationFrozen = UA_TRUE;
     //DataSetWriter freeze
+    size_t dsmCount = 0;
     UA_DataSetWriter *dataSetWriter;
     LIST_FOREACH(dataSetWriter, &wg->writers, listEntry){
         dataSetWriter->config.configurationFrozen = UA_TRUE;
@@ -256,9 +257,44 @@ UA_Server_freezeWriterGroupConfiguration(UA_Server *server, const UA_NodeId writ
     if(wg->config.rtLevel == UA_PUBSUB_RT_FIXED_SIZE){
         //UA_NetworkMessage_decodeBinary(, )
         //UA_NetworkMessage_calculateBufferAndOffets(server, w)
+
+        if(wg->config.encodingMimeType != UA_PUBSUB_ENCODING_UADP) {
+            UA_LOG_WARNING(&server->config.logger, UA_LOGCATEGORY_SERVER,
+                           "PubSub RT Fail: Non RT capable encoding.");
+            return UA_STATUSCODE_BADNOTSUPPORTED;
+        }
+        //TODO Clarify: should we only allow = maxEncapsulatedDataSetMessageCount == 1 with RT?
+        //TODO Clarify: Behaviour if the finale size is more than MTU
+        //TODO Check if there are not allowed promoted fields contained
+        //generate data set messages
+        UA_STACKARRAY(UA_UInt16, dsWriterIds, wg->writersCount);
+        UA_STACKARRAY(UA_DataSetMessage, dsmStore, wg->writersCount);
+        UA_DataSetWriter *dsw;
+        LIST_FOREACH(dsw, &wg->writers, listEntry) {
+            /* Find the dataset */
+            UA_PublishedDataSet *pds =
+                    UA_PublishedDataSet_findPDSbyId(server, dsw->connectedDataSet);
+            if(!pds) {
+                UA_LOG_WARNING(&server->config.logger, UA_LOGCATEGORY_SERVER,
+                               "PubSub Publish: PublishedDataSet not found");
+                continue;
+            }
+            //TODO Check if all fields have an fixed size and static value source
+            /* Generate the DSM */
+            UA_StatusCode res =
+                    UA_DataSetWriter_generateDataSetMessage(server, &dsmStore[dsmCount], dsw);
+            if(res != UA_STATUSCODE_GOOD) {
+                UA_LOG_WARNING(&server->config.logger, UA_LOGCATEGORY_SERVER,
+                               "PubSub RT Offset calculation: DataSetMessage buffering failed");
+                continue;
+            }
+            dsWriterIds[dsmCount] = dsw->config.dataSetWriterId;
+            dsmCount++;
+        }
         UA_NetworkMessage networkMessage;
-        generateNetworkMessage(pubSubConnection, wg, NULL, NULL, 0,
-                &wg->config.messageSettings, &wg->config.transportSettings, &networkMessage);
+        generateNetworkMessage(pubSubConnection, wg, dsmStore, dsWriterIds, (UA_Byte) dsmCount,
+                               &wg->config.messageSettings, &wg->config.transportSettings, &networkMessage);
+        UA_NetworkMessage_generateOffsetBuffer(&wg->bufferedMessage, &networkMessage);
     }
     return UA_STATUSCODE_GOOD;
 }