|
@@ -2,7 +2,7 @@
|
|
|
* License, v. 2.0. If a copy of the MPL was not distributed with this
|
|
|
* file, You can obtain one at http://mozilla.org/MPL/2.0/.
|
|
|
*
|
|
|
- * Copyright (c) 2017-2018 Fraunhofer IOSB (Author: Andreas Ebner)
|
|
|
+ * Copyright (c) 2017-2019 Fraunhofer IOSB (Author: Andreas Ebner)
|
|
|
* Copyright (c) 2019 Fraunhofer IOSB (Author: Julius Pfrommer)
|
|
|
* Copyright (c) 2019 Kalycito Infotech Private Limited
|
|
|
*/
|
|
@@ -13,6 +13,7 @@
|
|
|
#ifdef UA_ENABLE_PUBSUB /* conditional compilation */
|
|
|
|
|
|
#include "ua_pubsub.h"
|
|
|
+#include "ua_pubsub_networkmessage.h"
|
|
|
|
|
|
#ifdef UA_ENABLE_PUBSUB_INFORMATIONMODEL
|
|
|
#include "ua_pubsub_ns0.h"
|
|
@@ -29,6 +30,15 @@ static void
|
|
|
UA_WriterGroup_clear(UA_Server *server, UA_WriterGroup *writerGroup);
|
|
|
static void
|
|
|
UA_DataSetField_clear(UA_DataSetField *field);
|
|
|
+static UA_StatusCode
|
|
|
+generateNetworkMessage(UA_PubSubConnection *connection, UA_WriterGroup *wg,
|
|
|
+ UA_DataSetMessage *dsm, UA_UInt16 *writerIds, UA_Byte dsmCount,
|
|
|
+ UA_ExtensionObject *messageSettings,
|
|
|
+ UA_ExtensionObject *transportSettings,
|
|
|
+ UA_NetworkMessage *networkMessage);
|
|
|
+static UA_StatusCode
|
|
|
+UA_DataSetWriter_generateDataSetMessage(UA_Server *server, UA_DataSetMessage *dataSetMessage,
|
|
|
+ UA_DataSetWriter *dataSetWriter);
|
|
|
|
|
|
/**********************************************/
|
|
|
/* Connection */
|
|
@@ -79,9 +89,10 @@ UA_Server_getPubSubConnectionConfig(UA_Server *server, const UA_NodeId connectio
|
|
|
|
|
|
UA_PubSubConnection *
|
|
|
UA_PubSubConnection_findConnectionbyId(UA_Server *server, UA_NodeId connectionIdentifier) {
|
|
|
- for(size_t i = 0; i < server->pubSubManager.connectionsSize; i++){
|
|
|
- if(UA_NodeId_equal(&connectionIdentifier, &server->pubSubManager.connections[i].identifier)){
|
|
|
- return &server->pubSubManager.connections[i];
|
|
|
+ UA_PubSubConnection *pubSubConnection;
|
|
|
+ TAILQ_FOREACH(pubSubConnection, &server->pubSubManager.connections, listEntry){
|
|
|
+ if(UA_NodeId_equal(&connectionIdentifier, &pubSubConnection->identifier)){
|
|
|
+ return pubSubConnection;
|
|
|
}
|
|
|
}
|
|
|
return NULL;
|
|
@@ -162,7 +173,7 @@ UA_Server_addWriterGroup(UA_Server *server, const UA_NodeId connection,
|
|
|
if(!newWriterGroup)
|
|
|
return UA_STATUSCODE_BADOUTOFMEMORY;
|
|
|
|
|
|
- newWriterGroup->linkedConnection = currentConnectionContext->identifier;
|
|
|
+ newWriterGroup->linkedConnection = currentConnectionContext;
|
|
|
UA_PubSubManager_generateUniqueNodeId(server, &newWriterGroup->identifier);
|
|
|
if(writerGroupIdentifier){
|
|
|
UA_NodeId_copy(&newWriterGroup->identifier, writerGroupIdentifier);
|
|
@@ -198,8 +209,7 @@ UA_Server_removeWriterGroup(UA_Server *server, const UA_NodeId writerGroup){
|
|
|
return UA_STATUSCODE_BADCONFIGURATIONERROR;
|
|
|
}
|
|
|
|
|
|
- UA_PubSubConnection *connection =
|
|
|
- UA_PubSubConnection_findConnectionbyId(server, wg->linkedConnection);
|
|
|
+ UA_PubSubConnection *connection = wg->linkedConnection;
|
|
|
if(!connection)
|
|
|
return UA_STATUSCODE_BADNOTFOUND;
|
|
|
|
|
@@ -228,7 +238,7 @@ UA_Server_freezeWriterGroupConfiguration(UA_Server *server, const UA_NodeId writ
|
|
|
if(!wg)
|
|
|
return UA_STATUSCODE_BADNOTFOUND;
|
|
|
//PubSubConnection freezeCounter++
|
|
|
- UA_PubSubConnection *pubSubConnection = UA_PubSubConnection_findConnectionbyId(server, wg->linkedConnection);
|
|
|
+ UA_PubSubConnection *pubSubConnection = wg->linkedConnection;;
|
|
|
pubSubConnection->configurationFreezeCounter++;
|
|
|
pubSubConnection->config->configurationFrozen = UA_TRUE;
|
|
|
//WriterGroup freeze
|
|
@@ -247,9 +257,90 @@ UA_Server_freezeWriterGroupConfiguration(UA_Server *server, const UA_NodeId writ
|
|
|
dataSetField->config.configurationFrozen = UA_TRUE;
|
|
|
}
|
|
|
}
|
|
|
- //if(wg->config.rtLevel == UA_PUBSUB_RT_FIXED_SIZE){
|
|
|
- //UA_NetworkMessage_calculateBufferAndOffets(server, w)
|
|
|
- //}
|
|
|
+ if(wg->config.rtLevel == UA_PUBSUB_RT_FIXED_SIZE){
|
|
|
+ size_t dsmCount = 0;
|
|
|
+ if(wg->config.encodingMimeType != UA_PUBSUB_ENCODING_UADP) {
|
|
|
+ UA_LOG_WARNING(&server->config.logger, UA_LOGCATEGORY_SERVER,
|
|
|
+ "PubSub-RT configuration fail: Non-RT capable encoding.");
|
|
|
+ return UA_STATUSCODE_BADNOTSUPPORTED;
|
|
|
+ }
|
|
|
+ //TODO Clarify: should we only allow = maxEncapsulatedDataSetMessageCount == 1 with RT?
|
|
|
+ //TODO Clarify: Behaviour if the finale size is more than MTU
|
|
|
+ /* Generate data set messages */
|
|
|
+ UA_STACKARRAY(UA_UInt16, dsWriterIds, wg->writersCount);
|
|
|
+ UA_STACKARRAY(UA_DataSetMessage, dsmStore, wg->writersCount);
|
|
|
+ UA_DataSetWriter *dsw;
|
|
|
+ LIST_FOREACH(dsw, &wg->writers, listEntry) {
|
|
|
+ /* Find the dataset */
|
|
|
+ UA_PublishedDataSet *pds =
|
|
|
+ UA_PublishedDataSet_findPDSbyId(server, dsw->connectedDataSet);
|
|
|
+ if(!pds) {
|
|
|
+ UA_LOG_WARNING(&server->config.logger, UA_LOGCATEGORY_SERVER,
|
|
|
+ "PubSub Publish: PublishedDataSet not found");
|
|
|
+ continue;
|
|
|
+ }
|
|
|
+ if(pds->promotedFieldsCount > 0) {
|
|
|
+ UA_LOG_WARNING(&server->config.logger, UA_LOGCATEGORY_SERVER,
|
|
|
+ "PubSub-RT configuration fail: PDS contains promoted fields.");
|
|
|
+ return UA_STATUSCODE_BADNOTSUPPORTED;
|
|
|
+ }
|
|
|
+ UA_DataSetField *dsf;
|
|
|
+ TAILQ_FOREACH(dsf, &pds->fields, listEntry){
|
|
|
+ if(!dsf->config.field.variable.staticValueSourceEnabled){
|
|
|
+ UA_LOG_WARNING(&server->config.logger, UA_LOGCATEGORY_SERVER,
|
|
|
+ "PubSub-RT configuration fail: PDS contains variables with dynamic length types.");
|
|
|
+ return UA_STATUSCODE_BADNOTSUPPORTED;
|
|
|
+ }
|
|
|
+ if((UA_NodeId_equal(&dsf->fieldMetaData.dataType, &UA_TYPES[UA_TYPES_STRING].typeId) ||
|
|
|
+ UA_NodeId_equal(&dsf->fieldMetaData.dataType, &UA_TYPES[UA_TYPES_BYTESTRING].typeId)) &&
|
|
|
+ dsf->fieldMetaData.maxStringLength == 0){
|
|
|
+ UA_LOG_WARNING(&server->config.logger, UA_LOGCATEGORY_SERVER,
|
|
|
+ "PubSub-RT configuration fail: PDS contains String/ByteString with dynamic length.");
|
|
|
+ return UA_STATUSCODE_BADNOTSUPPORTED;
|
|
|
+ } else if(!UA_DataType_isNumeric(UA_findDataType(&dsf->fieldMetaData.dataType))){
|
|
|
+ UA_LOG_WARNING(&server->config.logger, UA_LOGCATEGORY_SERVER,
|
|
|
+ "PubSub-RT configuration fail: PDS contains variable with dynamic size.");
|
|
|
+ return UA_STATUSCODE_BADNOTSUPPORTED;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ /* Generate the DSM */
|
|
|
+ UA_StatusCode res =
|
|
|
+ UA_DataSetWriter_generateDataSetMessage(server, &dsmStore[dsmCount], dsw);
|
|
|
+ if(res != UA_STATUSCODE_GOOD) {
|
|
|
+ UA_LOG_WARNING(&server->config.logger, UA_LOGCATEGORY_SERVER,
|
|
|
+ "PubSub RT Offset calculation: DataSetMessage buffering failed");
|
|
|
+ continue;
|
|
|
+ }
|
|
|
+ dsWriterIds[dsmCount] = dsw->config.dataSetWriterId;
|
|
|
+ dsmCount++;
|
|
|
+ }
|
|
|
+ UA_NetworkMessage networkMessage;
|
|
|
+ memset(&networkMessage, 0, sizeof(networkMessage));
|
|
|
+ UA_StatusCode res = generateNetworkMessage(pubSubConnection, wg, dsmStore, dsWriterIds, (UA_Byte) dsmCount,
|
|
|
+ &wg->config.messageSettings, &wg->config.transportSettings, &networkMessage);
|
|
|
+ if(res != UA_STATUSCODE_GOOD)
|
|
|
+ return UA_STATUSCODE_BADINTERNALERROR;
|
|
|
+ UA_NetworkMessage_calcSizeBinary(&networkMessage, &wg->bufferedMessage);
|
|
|
+ /* Allocate the buffer. Allocate on the stack if the buffer is small. */
|
|
|
+ UA_ByteString buf;
|
|
|
+ size_t msgSize = UA_NetworkMessage_calcSizeBinary(&networkMessage, NULL);
|
|
|
+ res = UA_ByteString_allocBuffer(&buf, msgSize);
|
|
|
+ if(res != UA_STATUSCODE_GOOD)
|
|
|
+ return UA_STATUSCODE_BADOUTOFMEMORY;
|
|
|
+ wg->bufferedMessage.buffer = buf;
|
|
|
+ const UA_Byte *bufEnd = &wg->bufferedMessage.buffer.data[wg->bufferedMessage.buffer.length];
|
|
|
+ UA_Byte *bufPos = wg->bufferedMessage.buffer.data;
|
|
|
+ UA_NetworkMessage_encodeBinary(&networkMessage, &bufPos, bufEnd);
|
|
|
+ /* Clean up DSM */
|
|
|
+ for(size_t i = 0; i < dsmCount; i++){
|
|
|
+ UA_free(dsmStore[i].data.keyFrameData.dataSetFields);
|
|
|
+#ifdef UA_ENABLE_JSON_ENCODING
|
|
|
+ UA_free(dsmStore[i].data.keyFrameData.fieldNames);
|
|
|
+#endif
|
|
|
+ }
|
|
|
+
|
|
|
+ //UA_DataSetMessage_free(&dsmStore[i]);
|
|
|
+ }
|
|
|
return UA_STATUSCODE_GOOD;
|
|
|
}
|
|
|
|
|
@@ -264,7 +355,7 @@ UA_Server_unfreezeWriterGroupConfiguration(UA_Server *server, const UA_NodeId wr
|
|
|
// return UA_STATUSCODE_BADCONFIGURATIONERROR;
|
|
|
//}
|
|
|
//PubSubConnection freezeCounter--
|
|
|
- UA_PubSubConnection *pubSubConnection = UA_PubSubConnection_findConnectionbyId(server, wg->linkedConnection);
|
|
|
+ UA_PubSubConnection *pubSubConnection = wg->linkedConnection;;
|
|
|
pubSubConnection->configurationFreezeCounter--;
|
|
|
if(pubSubConnection->configurationFreezeCounter == 0){
|
|
|
pubSubConnection->config->configurationFrozen = UA_FALSE;
|
|
@@ -373,9 +464,10 @@ UA_Server_getPublishedDataSetMetaData(UA_Server *server, const UA_NodeId pds, UA
|
|
|
|
|
|
UA_PublishedDataSet *
|
|
|
UA_PublishedDataSet_findPDSbyId(UA_Server *server, UA_NodeId identifier){
|
|
|
- for(size_t i = 0; i < server->pubSubManager.publishedDataSetsSize; i++){
|
|
|
- if(UA_NodeId_equal(&server->pubSubManager.publishedDataSets[i].identifier, &identifier)){
|
|
|
- return &server->pubSubManager.publishedDataSets[i];
|
|
|
+ UA_PublishedDataSet *tmpPDS;
|
|
|
+ TAILQ_FOREACH(tmpPDS, &server->pubSubManager.publishedDataSets, listEntry){
|
|
|
+ if(UA_NodeId_equal(&tmpPDS->identifier, &identifier)){
|
|
|
+ return tmpPDS;
|
|
|
}
|
|
|
}
|
|
|
return NULL;
|
|
@@ -563,6 +655,11 @@ UA_Server_addDataSetField(UA_Server *server, const UA_NodeId publishedDataSet,
|
|
|
result.result = UA_STATUSCODE_BADINTERNALERROR;
|
|
|
return result;
|
|
|
}
|
|
|
+ UA_DataSetField *dsf;
|
|
|
+ size_t counter = 0;
|
|
|
+ TAILQ_FOREACH(dsf, ¤tDataSet->fields, listEntry){
|
|
|
+ dsf->fieldMetaData = fieldMetaData[counter++];
|
|
|
+ }
|
|
|
result.result = retVal;
|
|
|
result.configurationVersion.majorVersion = currentDataSet->dataSetMetaData.configurationVersion.majorVersion;
|
|
|
result.configurationVersion.minorVersion = currentDataSet->dataSetMetaData.configurationVersion.minorVersion;
|
|
@@ -621,7 +718,6 @@ UA_Server_removeDataSetField(UA_Server *server, const UA_NodeId dsf) {
|
|
|
UA_free(parentPublishedDataSet->dataSetMetaData.fields);
|
|
|
UA_FieldMetaData *fieldMetaData = (UA_FieldMetaData *) UA_calloc(parentPublishedDataSet->dataSetMetaData.fieldsSize,
|
|
|
sizeof(UA_FieldMetaData));
|
|
|
-
|
|
|
if(!fieldMetaData){
|
|
|
result.result = UA_STATUSCODE_BADOUTOFMEMORY;
|
|
|
return result;
|
|
@@ -694,9 +790,10 @@ UA_Server_getDataSetWriterConfig(UA_Server *server, const UA_NodeId dsw,
|
|
|
|
|
|
UA_DataSetWriter *
|
|
|
UA_DataSetWriter_findDSWbyId(UA_Server *server, UA_NodeId identifier) {
|
|
|
- for(size_t i = 0; i < server->pubSubManager.connectionsSize; i++){
|
|
|
+ UA_PubSubConnection *pubSubConnection;
|
|
|
+ TAILQ_FOREACH(pubSubConnection, &server->pubSubManager.connections, listEntry){
|
|
|
UA_WriterGroup *tmpWriterGroup;
|
|
|
- LIST_FOREACH(tmpWriterGroup, &server->pubSubManager.connections[i].writerGroups, listEntry){
|
|
|
+ LIST_FOREACH(tmpWriterGroup, &pubSubConnection->writerGroups, listEntry){
|
|
|
UA_DataSetWriter *tmpWriter;
|
|
|
LIST_FOREACH(tmpWriter, &tmpWriterGroup->writers, listEntry){
|
|
|
if(UA_NodeId_equal(&tmpWriter->identifier, &identifier)){
|
|
@@ -897,9 +994,10 @@ UA_Server_updateWriterGroupConfig(UA_Server *server, UA_NodeId writerGroupIdenti
|
|
|
|
|
|
UA_WriterGroup *
|
|
|
UA_WriterGroup_findWGbyId(UA_Server *server, UA_NodeId identifier){
|
|
|
- for(size_t i = 0; i < server->pubSubManager.connectionsSize; i++){
|
|
|
+ UA_PubSubConnection *tmpConnection;
|
|
|
+ TAILQ_FOREACH(tmpConnection, &server->pubSubManager.connections, listEntry){
|
|
|
UA_WriterGroup *tmpWriterGroup;
|
|
|
- LIST_FOREACH(tmpWriterGroup, &server->pubSubManager.connections[i].writerGroups, listEntry) {
|
|
|
+ LIST_FOREACH(tmpWriterGroup, &tmpConnection->writerGroups, listEntry) {
|
|
|
if(UA_NodeId_equal(&identifier, &tmpWriterGroup->identifier)){
|
|
|
return tmpWriterGroup;
|
|
|
}
|
|
@@ -929,7 +1027,15 @@ UA_WriterGroup_clear(UA_Server *server, UA_WriterGroup *writerGroup) {
|
|
|
LIST_FOREACH_SAFE(dataSetWriter, &writerGroup->writers, listEntry, tmpDataSetWriter){
|
|
|
UA_Server_removeDataSetWriter(server, dataSetWriter->identifier);
|
|
|
}
|
|
|
- UA_NodeId_clear(&writerGroup->linkedConnection);
|
|
|
+ if(writerGroup->bufferedMessage.offsetsSize > 0){
|
|
|
+ for (size_t i = 0; i < writerGroup->bufferedMessage.offsetsSize; i++) {
|
|
|
+ if(writerGroup->bufferedMessage.offsets[i].contentType == UA_PUBSUB_OFFSETTYPE_PAYLOAD_VARIANT){
|
|
|
+ UA_DataValue_delete(writerGroup->bufferedMessage.offsets[i].offsetData.value.value);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ UA_ByteString_deleteMembers(&writerGroup->bufferedMessage.buffer);
|
|
|
+ UA_free(writerGroup->bufferedMessage.offsets);
|
|
|
+ }
|
|
|
UA_NodeId_clear(&writerGroup->identifier);
|
|
|
}
|
|
|
|
|
@@ -1045,6 +1151,17 @@ UA_Server_addDataSetWriter(UA_Server *server,
|
|
|
return UA_STATUSCODE_BADCONFIGURATIONERROR;
|
|
|
}
|
|
|
|
|
|
+ if(wg->config.rtLevel != UA_PUBSUB_RT_NONE){
|
|
|
+ UA_DataSetField *tmpDSF;
|
|
|
+ TAILQ_FOREACH(tmpDSF, ¤tDataSetContext->fields, listEntry){
|
|
|
+ if(tmpDSF->config.field.variable.staticValueSourceEnabled != UA_TRUE){
|
|
|
+ UA_LOG_WARNING(&server->config.logger, UA_LOGCATEGORY_SERVER,
|
|
|
+ "Adding DataSetWriter failed. Fields in PDS are not RT capable.");
|
|
|
+ return UA_STATUSCODE_BADCONFIGURATIONERROR;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
UA_DataSetWriter *newDataSetWriter = (UA_DataSetWriter *) UA_calloc(1, sizeof(UA_DataSetWriter));
|
|
|
if(!newDataSetWriter)
|
|
|
return UA_STATUSCODE_BADOUTOFMEMORY;
|
|
@@ -1159,9 +1276,10 @@ UA_Server_getDataSetFieldConfig(UA_Server *server, const UA_NodeId dsf,
|
|
|
|
|
|
UA_DataSetField *
|
|
|
UA_DataSetField_findDSFbyId(UA_Server *server, UA_NodeId identifier) {
|
|
|
- for(size_t i = 0; i < server->pubSubManager.publishedDataSetsSize; i++){
|
|
|
+ UA_PublishedDataSet *tmpPDS;
|
|
|
+ TAILQ_FOREACH(tmpPDS, &server->pubSubManager.publishedDataSets, listEntry){
|
|
|
UA_DataSetField *tmpField;
|
|
|
- TAILQ_FOREACH(tmpField, &server->pubSubManager.publishedDataSets[i].fields, listEntry){
|
|
|
+ TAILQ_FOREACH(tmpField, &tmpPDS->fields, listEntry){
|
|
|
if(UA_NodeId_equal(&tmpField->identifier, &identifier)){
|
|
|
return tmpField;
|
|
|
}
|
|
@@ -1258,7 +1376,6 @@ UA_PubSubDataSetField_sampleValue(UA_Server *server, UA_DataSetField *field,
|
|
|
value->value.storageType = UA_VARIANT_DATA_NODELETE;
|
|
|
*value = field->config.field.variable.staticValueSource;
|
|
|
}
|
|
|
-
|
|
|
}
|
|
|
|
|
|
static UA_StatusCode
|
|
@@ -1658,68 +1775,89 @@ sendNetworkMessageJson(UA_PubSubConnection *connection, UA_DataSetMessage *dsm,
|
|
|
}
|
|
|
|
|
|
static UA_StatusCode
|
|
|
-sendNetworkMessage(UA_PubSubConnection *connection, UA_WriterGroup *wg,
|
|
|
- UA_DataSetMessage *dsm, UA_UInt16 *writerIds, UA_Byte dsmCount,
|
|
|
- UA_ExtensionObject *messageSettings,
|
|
|
- UA_ExtensionObject *transportSettings) {
|
|
|
-
|
|
|
+generateNetworkMessage(UA_PubSubConnection *connection, UA_WriterGroup *wg,
|
|
|
+ UA_DataSetMessage *dsm, UA_UInt16 *writerIds, UA_Byte dsmCount,
|
|
|
+ UA_ExtensionObject *messageSettings,
|
|
|
+ UA_ExtensionObject *transportSettings,
|
|
|
+ UA_NetworkMessage *networkMessage) {
|
|
|
if(messageSettings->content.decoded.type !=
|
|
|
&UA_TYPES[UA_TYPES_UADPWRITERGROUPMESSAGEDATATYPE])
|
|
|
return UA_STATUSCODE_BADINTERNALERROR;
|
|
|
UA_UadpWriterGroupMessageDataType *wgm = (UA_UadpWriterGroupMessageDataType*)
|
|
|
- messageSettings->content.decoded.data;
|
|
|
-
|
|
|
- UA_NetworkMessage nm;
|
|
|
- memset(&nm, 0, sizeof(UA_NetworkMessage));
|
|
|
-
|
|
|
- nm.publisherIdEnabled =
|
|
|
- ((u64)wgm->networkMessageContentMask & (u64)UA_UADPNETWORKMESSAGECONTENTMASK_PUBLISHERID) != 0;
|
|
|
- nm.groupHeaderEnabled =
|
|
|
- ((u64)wgm->networkMessageContentMask & (u64)UA_UADPNETWORKMESSAGECONTENTMASK_GROUPHEADER) != 0;
|
|
|
- nm.groupHeader.writerGroupIdEnabled =
|
|
|
- ((u64)wgm->networkMessageContentMask & (u64)UA_UADPNETWORKMESSAGECONTENTMASK_WRITERGROUPID) != 0;
|
|
|
- nm.groupHeader.groupVersionEnabled =
|
|
|
- ((u64)wgm->networkMessageContentMask & (u64)UA_UADPNETWORKMESSAGECONTENTMASK_GROUPVERSION) != 0;
|
|
|
- nm.groupHeader.networkMessageNumberEnabled =
|
|
|
- ((u64)wgm->networkMessageContentMask & (u64)UA_UADPNETWORKMESSAGECONTENTMASK_NETWORKMESSAGENUMBER) != 0;
|
|
|
- nm.groupHeader.sequenceNumberEnabled =
|
|
|
- ((u64)wgm->networkMessageContentMask & (u64)UA_UADPNETWORKMESSAGECONTENTMASK_SEQUENCENUMBER) != 0;
|
|
|
- nm.payloadHeaderEnabled =
|
|
|
- ((u64)wgm->networkMessageContentMask & (u64)UA_UADPNETWORKMESSAGECONTENTMASK_PAYLOADHEADER) != 0;
|
|
|
- nm.timestampEnabled =
|
|
|
- ((u64)wgm->networkMessageContentMask & (u64)UA_UADPNETWORKMESSAGECONTENTMASK_TIMESTAMP) != 0;
|
|
|
- nm.picosecondsEnabled =
|
|
|
- ((u64)wgm->networkMessageContentMask & (u64)UA_UADPNETWORKMESSAGECONTENTMASK_PICOSECONDS) != 0;
|
|
|
- nm.dataSetClassIdEnabled =
|
|
|
- ((u64)wgm->networkMessageContentMask & (u64)UA_UADPNETWORKMESSAGECONTENTMASK_DATASETCLASSID) != 0;
|
|
|
- nm.promotedFieldsEnabled =
|
|
|
- ((u64)wgm->networkMessageContentMask & (u64)UA_UADPNETWORKMESSAGECONTENTMASK_PROMOTEDFIELDS) != 0;
|
|
|
-
|
|
|
- nm.version = 1;
|
|
|
- nm.networkMessageType = UA_NETWORKMESSAGE_DATASET;
|
|
|
+ messageSettings->content.decoded.data;
|
|
|
+
|
|
|
+ networkMessage->publisherIdEnabled =
|
|
|
+ ((u64)wgm->networkMessageContentMask & (u64)UA_UADPNETWORKMESSAGECONTENTMASK_PUBLISHERID) != 0;
|
|
|
+ networkMessage->groupHeaderEnabled =
|
|
|
+ ((u64)wgm->networkMessageContentMask & (u64)UA_UADPNETWORKMESSAGECONTENTMASK_GROUPHEADER) != 0;
|
|
|
+ networkMessage->groupHeader.writerGroupIdEnabled =
|
|
|
+ ((u64)wgm->networkMessageContentMask & (u64)UA_UADPNETWORKMESSAGECONTENTMASK_WRITERGROUPID) != 0;
|
|
|
+ networkMessage->groupHeader.groupVersionEnabled =
|
|
|
+ ((u64)wgm->networkMessageContentMask & (u64)UA_UADPNETWORKMESSAGECONTENTMASK_GROUPVERSION) != 0;
|
|
|
+ networkMessage->groupHeader.networkMessageNumberEnabled =
|
|
|
+ ((u64)wgm->networkMessageContentMask & (u64)UA_UADPNETWORKMESSAGECONTENTMASK_NETWORKMESSAGENUMBER) != 0;
|
|
|
+ networkMessage->groupHeader.sequenceNumberEnabled =
|
|
|
+ ((u64)wgm->networkMessageContentMask & (u64)UA_UADPNETWORKMESSAGECONTENTMASK_SEQUENCENUMBER) != 0;
|
|
|
+ networkMessage->payloadHeaderEnabled =
|
|
|
+ ((u64)wgm->networkMessageContentMask & (u64)UA_UADPNETWORKMESSAGECONTENTMASK_PAYLOADHEADER) != 0;
|
|
|
+ networkMessage->timestampEnabled =
|
|
|
+ ((u64)wgm->networkMessageContentMask & (u64)UA_UADPNETWORKMESSAGECONTENTMASK_TIMESTAMP) != 0;
|
|
|
+ networkMessage->picosecondsEnabled =
|
|
|
+ ((u64)wgm->networkMessageContentMask & (u64)UA_UADPNETWORKMESSAGECONTENTMASK_PICOSECONDS) != 0;
|
|
|
+ networkMessage->dataSetClassIdEnabled =
|
|
|
+ ((u64)wgm->networkMessageContentMask & (u64)UA_UADPNETWORKMESSAGECONTENTMASK_DATASETCLASSID) != 0;
|
|
|
+ networkMessage->promotedFieldsEnabled =
|
|
|
+ ((u64)wgm->networkMessageContentMask & (u64)UA_UADPNETWORKMESSAGECONTENTMASK_PROMOTEDFIELDS) != 0;
|
|
|
+ networkMessage->version = 1;
|
|
|
+ networkMessage->networkMessageType = UA_NETWORKMESSAGE_DATASET;
|
|
|
if(connection->config->publisherIdType == UA_PUBSUB_PUBLISHERID_NUMERIC) {
|
|
|
- nm.publisherIdType = UA_PUBLISHERDATATYPE_UINT16;
|
|
|
- nm.publisherId.publisherIdUInt32 = connection->config->publisherId.numeric;
|
|
|
+ networkMessage->publisherIdType = UA_PUBLISHERDATATYPE_UINT16;
|
|
|
+ networkMessage->publisherId.publisherIdUInt32 = connection->config->publisherId.numeric;
|
|
|
} else if(connection->config->publisherIdType == UA_PUBSUB_PUBLISHERID_STRING){
|
|
|
- nm.publisherIdType = UA_PUBLISHERDATATYPE_STRING;
|
|
|
- nm.publisherId.publisherIdString = connection->config->publisherId.string;
|
|
|
+ networkMessage->publisherIdType = UA_PUBLISHERDATATYPE_STRING;
|
|
|
+ networkMessage->publisherId.publisherIdString = connection->config->publisherId.string;
|
|
|
}
|
|
|
-
|
|
|
+ if(networkMessage->groupHeader.sequenceNumberEnabled)
|
|
|
+ networkMessage->groupHeader.sequenceNumber = wg->sequenceNumber;
|
|
|
/* Compute the length of the dsm separately for the header */
|
|
|
UA_STACKARRAY(UA_UInt16, dsmLengths, dsmCount);
|
|
|
for(UA_Byte i = 0; i < dsmCount; i++)
|
|
|
- dsmLengths[i] = (UA_UInt16)UA_DataSetMessage_calcSizeBinary(&dsm[i]);
|
|
|
+ dsmLengths[i] = (UA_UInt16) UA_DataSetMessage_calcSizeBinary(&dsm[i], NULL, 0);
|
|
|
+
|
|
|
+ networkMessage->payloadHeader.dataSetPayloadHeader.count = dsmCount;
|
|
|
+ networkMessage->payloadHeader.dataSetPayloadHeader.dataSetWriterIds = writerIds;
|
|
|
+ networkMessage->groupHeader.writerGroupId = wg->config.writerGroupId;
|
|
|
+ /* number of the NetworkMessage inside a PublishingInterval */
|
|
|
+ networkMessage->groupHeader.networkMessageNumber = 1;
|
|
|
+ networkMessage->payload.dataSetPayload.sizes = dsmLengths;
|
|
|
+ networkMessage->payload.dataSetPayload.dataSetMessages = dsm;
|
|
|
+ return UA_STATUSCODE_GOOD;
|
|
|
+}
|
|
|
|
|
|
- nm.payloadHeader.dataSetPayloadHeader.count = dsmCount;
|
|
|
- nm.payloadHeader.dataSetPayloadHeader.dataSetWriterIds = writerIds;
|
|
|
- nm.groupHeader.writerGroupId = wg->config.writerGroupId;
|
|
|
- nm.groupHeader.networkMessageNumber = 1;
|
|
|
- nm.payload.dataSetPayload.sizes = dsmLengths;
|
|
|
- nm.payload.dataSetPayload.dataSetMessages = dsm;
|
|
|
+static UA_StatusCode
|
|
|
+sendBufferedNetworkMessage(UA_Server *server, UA_PubSubConnection *connection,
|
|
|
+ UA_NetworkMessageOffsetBuffer *buffer,
|
|
|
+ UA_ExtensionObject *transportSettings) {
|
|
|
+ if(UA_NetworkMessage_updateBufferedMessage(buffer) != UA_STATUSCODE_GOOD)
|
|
|
+ UA_LOG_DEBUG(&server->config.logger, UA_LOGCATEGORY_SERVER,
|
|
|
+ "PubSub sending. Unknown field type.");
|
|
|
+ return connection->channel->send(connection->channel,
|
|
|
+ transportSettings, &buffer->buffer);
|
|
|
+}
|
|
|
+
|
|
|
+static UA_StatusCode
|
|
|
+sendNetworkMessage(UA_PubSubConnection *connection, UA_WriterGroup *wg,
|
|
|
+ UA_DataSetMessage *dsm, UA_UInt16 *writerIds, UA_Byte dsmCount,
|
|
|
+ UA_ExtensionObject *messageSettings,
|
|
|
+ UA_ExtensionObject *transportSettings) {
|
|
|
+
|
|
|
+ UA_NetworkMessage nm;
|
|
|
+ memset(&nm, 0, sizeof(UA_NetworkMessage));
|
|
|
+ generateNetworkMessage(connection, wg, dsm, writerIds, dsmCount, messageSettings, transportSettings, &nm);
|
|
|
|
|
|
/* Allocate the buffer. Allocate on the stack if the buffer is small. */
|
|
|
UA_ByteString buf;
|
|
|
- size_t msgSize = UA_NetworkMessage_calcSizeBinary(&nm);
|
|
|
+ size_t msgSize = UA_NetworkMessage_calcSizeBinary(&nm, NULL);
|
|
|
size_t stackSize = 1;
|
|
|
if(msgSize <= UA_MAX_STACKBUF)
|
|
|
stackSize = msgSize;
|
|
@@ -1776,14 +1914,22 @@ UA_WriterGroup_publishCallback(UA_Server *server, UA_WriterGroup *writerGroup) {
|
|
|
}
|
|
|
|
|
|
/* Find the connection associated with the writer */
|
|
|
- UA_PubSubConnection *connection =
|
|
|
- UA_PubSubConnection_findConnectionbyId(server, writerGroup->linkedConnection);
|
|
|
+ UA_PubSubConnection *connection = writerGroup->linkedConnection;
|
|
|
if(!connection) {
|
|
|
UA_LOG_WARNING(&server->config.logger, UA_LOGCATEGORY_SERVER,
|
|
|
"Publish failed. PubSubConnection invalid.");
|
|
|
return;
|
|
|
}
|
|
|
|
|
|
+ if(writerGroup->config.rtLevel == UA_PUBSUB_RT_FIXED_SIZE) {
|
|
|
+ UA_StatusCode res =
|
|
|
+ sendBufferedNetworkMessage(server, connection, &writerGroup->bufferedMessage,
|
|
|
+ &writerGroup->config.transportSettings);
|
|
|
+ if(res == UA_STATUSCODE_GOOD)
|
|
|
+ writerGroup->sequenceNumber++;
|
|
|
+ return;
|
|
|
+ }
|
|
|
+
|
|
|
/* How many DSM can be sent in one NM? */
|
|
|
UA_Byte maxDSM = (UA_Byte)writerGroup->config.maxEncapsulatedDataSetMessageCount;
|
|
|
if(writerGroup->config.maxEncapsulatedDataSetMessageCount > UA_BYTE_MAX)
|
|
@@ -1827,13 +1973,22 @@ UA_WriterGroup_publishCallback(UA_Server *server, UA_WriterGroup *writerGroup) {
|
|
|
&dsw->config.dataSetWriterId, 1,
|
|
|
&writerGroup->config.messageSettings,
|
|
|
&writerGroup->config.transportSettings);
|
|
|
- }else if(writerGroup->config.encodingMimeType == UA_PUBSUB_ENCODING_JSON){
|
|
|
+ } else if(writerGroup->config.encodingMimeType == UA_PUBSUB_ENCODING_JSON) {
|
|
|
res = sendNetworkMessageJson(connection, &dsmStore[dsmCount],
|
|
|
- &dsw->config.dataSetWriterId, 1, &writerGroup->config.transportSettings);
|
|
|
+ &dsw->config.dataSetWriterId, 1,
|
|
|
+ &writerGroup->config.transportSettings);
|
|
|
}
|
|
|
- if(res != UA_STATUSCODE_GOOD)
|
|
|
+
|
|
|
+ if(res != UA_STATUSCODE_GOOD)
|
|
|
UA_LOG_WARNING(&server->config.logger, UA_LOGCATEGORY_SERVER,
|
|
|
"PubSub Publish: Could not send a NetworkMessage");
|
|
|
+
|
|
|
+ if(writerGroup->config.rtLevel == UA_PUBSUB_RT_DIRECT_VALUE_ACCESS) {
|
|
|
+ for(size_t i = 0; i < dsmStore[dsmCount].data.keyFrameData.fieldCount; ++i) {
|
|
|
+ dsmStore[dsmCount].data.keyFrameData.dataSetFields[i].value.data = NULL;
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
UA_DataSetMessage_free(&dsmStore[dsmCount]);
|
|
|
continue;
|
|
|
}
|
|
@@ -1848,20 +2003,25 @@ UA_WriterGroup_publishCallback(UA_Server *server, UA_WriterGroup *writerGroup) {
|
|
|
UA_Byte nmDsmCount = maxDSM;
|
|
|
if(i == nmCount - 1 && (dsmCount % maxDSM))
|
|
|
nmDsmCount = (UA_Byte)dsmCount % maxDSM;
|
|
|
-
|
|
|
UA_StatusCode res3 = UA_STATUSCODE_GOOD;
|
|
|
if(writerGroup->config.encodingMimeType == UA_PUBSUB_ENCODING_UADP){
|
|
|
res3 = sendNetworkMessage(connection, writerGroup, &dsmStore[i * maxDSM],
|
|
|
&dsWriterIds[i * maxDSM], nmDsmCount,
|
|
|
&writerGroup->config.messageSettings,
|
|
|
&writerGroup->config.transportSettings);
|
|
|
- }else if(writerGroup->config.encodingMimeType == UA_PUBSUB_ENCODING_JSON){
|
|
|
+ } else if(writerGroup->config.encodingMimeType == UA_PUBSUB_ENCODING_JSON){
|
|
|
res3 = sendNetworkMessageJson(connection, &dsmStore[i * maxDSM],
|
|
|
- &dsWriterIds[i * maxDSM], nmDsmCount, &writerGroup->config.transportSettings);
|
|
|
+ &dsWriterIds[i * maxDSM], nmDsmCount,
|
|
|
+ &writerGroup->config.transportSettings);
|
|
|
}
|
|
|
- if(res3 != UA_STATUSCODE_GOOD)
|
|
|
+
|
|
|
+ if(res3 != UA_STATUSCODE_GOOD) {
|
|
|
UA_LOG_WARNING(&server->config.logger, UA_LOGCATEGORY_SERVER,
|
|
|
"PubSub Publish: Sending a NetworkMessage failed");
|
|
|
+ continue;
|
|
|
+ }
|
|
|
+
|
|
|
+ writerGroup->sequenceNumber++;
|
|
|
}
|
|
|
|
|
|
/* Clean up DSM */
|