12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177
71778177917801781178217831784178517861787178817891790179117921793179417951796179717981799180018011802180318041805180618071808180918101811181218131814181518161817181818191820182118221823182418251826182718281829183018311832183318341835183618371838183918401841184218431844184518461847184818491850185118521853185418551856185718581859186018611862186318641865186618671868186918701871187218731874187518761877187818791880188118821883188418851886188718881889189018911892189318941895189618971898189919001901190219031904190519061907190819091910191119121913191419151916191719181919192019211922192319241925192619271928192919301931193219331934193519361937193819391940194119421943194419451946194719481949195019511952195319541955195619571958195919601961196219631964196519661967196819691970197119721973197419751976197719781979198019811982198319841985198619871988198919901991199219931994199519961997199819992000200120022003200420052006200720082009201020112012201320142015201620172018201920202021202220232024202520262027202820292030203120322033203420352036203720382039204020412042204320442045 |
- /* This Source Code Form is subject to the terms of the Mozilla Public
- * License, v. 2.0. If a copy of the MPL was not distributed with this
- * file, You can obtain one at http://mozilla.org/MPL/2.0/.
- *
- * Copyright (c) 2017-2019 Fraunhofer IOSB (Author: Andreas Ebner)
- * Copyright (c) 2019 Fraunhofer IOSB (Author: Julius Pfrommer)
- * Copyright (c) 2019 Kalycito Infotech Private Limited
- */
- #include <open62541/server_pubsub.h>
- #include "server/ua_server_internal.h"
- #ifdef UA_ENABLE_PUBSUB /* conditional compilation */
- #include "ua_pubsub.h"
- #include "ua_pubsub_networkmessage.h"
- #ifdef UA_ENABLE_PUBSUB_INFORMATIONMODEL
- #include "ua_pubsub_ns0.h"
- #endif
- #ifdef UA_ENABLE_PUBSUB_DELTAFRAMES
- #include "ua_types_encoding_binary.h"
- #endif
- #define UA_MAX_STACKBUF 512 /* Max size of network messages on the stack */
- /* Forward declaration */
- static void
- UA_WriterGroup_clear(UA_Server *server, UA_WriterGroup *writerGroup);
- static void
- UA_DataSetField_clear(UA_DataSetField *field);
- static UA_StatusCode
- generateNetworkMessage(UA_PubSubConnection *connection, UA_WriterGroup *wg,
- UA_DataSetMessage *dsm, UA_UInt16 *writerIds, UA_Byte dsmCount,
- UA_ExtensionObject *messageSettings,
- UA_ExtensionObject *transportSettings,
- UA_NetworkMessage *networkMessage);
- static UA_StatusCode
- UA_DataSetWriter_generateDataSetMessage(UA_Server *server, UA_DataSetMessage *dataSetMessage,
- UA_DataSetWriter *dataSetWriter);
- /**********************************************/
- /* Connection */
- /**********************************************/
- UA_StatusCode
- UA_PubSubConnectionConfig_copy(const UA_PubSubConnectionConfig *src,
- UA_PubSubConnectionConfig *dst) {
- UA_StatusCode retVal = UA_STATUSCODE_GOOD;
- memcpy(dst, src, sizeof(UA_PubSubConnectionConfig));
- retVal |= UA_String_copy(&src->name, &dst->name);
- retVal |= UA_Variant_copy(&src->address, &dst->address);
- retVal |= UA_String_copy(&src->transportProfileUri, &dst->transportProfileUri);
- retVal |= UA_Variant_copy(&src->connectionTransportSettings, &dst->connectionTransportSettings);
- if(src->connectionPropertiesSize > 0){
- dst->connectionProperties = (UA_KeyValuePair *)
- UA_calloc(src->connectionPropertiesSize, sizeof(UA_KeyValuePair));
- if(!dst->connectionProperties){
- return UA_STATUSCODE_BADOUTOFMEMORY;
- }
- for(size_t i = 0; i < src->connectionPropertiesSize; i++){
- retVal |= UA_QualifiedName_copy(&src->connectionProperties[i].key,
- &dst->connectionProperties[i].key);
- retVal |= UA_Variant_copy(&src->connectionProperties[i].value,
- &dst->connectionProperties[i].value);
- }
- }
- return retVal;
- }
- UA_StatusCode
- UA_Server_getPubSubConnectionConfig(UA_Server *server, const UA_NodeId connection,
- UA_PubSubConnectionConfig *config) {
- if(!config)
- return UA_STATUSCODE_BADINVALIDARGUMENT;
- UA_PubSubConnection *currentPubSubConnection =
- UA_PubSubConnection_findConnectionbyId(server, connection);
- if(!currentPubSubConnection)
- return UA_STATUSCODE_BADNOTFOUND;
- UA_PubSubConnectionConfig tmpPubSubConnectionConfig;
- //deep copy of the actual config
- UA_PubSubConnectionConfig_copy(currentPubSubConnection->config, &tmpPubSubConnectionConfig);
- *config = tmpPubSubConnectionConfig;
- return UA_STATUSCODE_GOOD;
- }
- UA_PubSubConnection *
- UA_PubSubConnection_findConnectionbyId(UA_Server *server, UA_NodeId connectionIdentifier) {
- UA_PubSubConnection *pubSubConnection;
- TAILQ_FOREACH(pubSubConnection, &server->pubSubManager.connections, listEntry){
- if(UA_NodeId_equal(&connectionIdentifier, &pubSubConnection->identifier)){
- return pubSubConnection;
- }
- }
- return NULL;
- }
- void
- UA_PubSubConnectionConfig_clear(UA_PubSubConnectionConfig *connectionConfig) {
- UA_String_clear(&connectionConfig->name);
- UA_String_clear(&connectionConfig->transportProfileUri);
- UA_Variant_clear(&connectionConfig->connectionTransportSettings);
- UA_Variant_clear(&connectionConfig->address);
- for(size_t i = 0; i < connectionConfig->connectionPropertiesSize; i++){
- UA_QualifiedName_clear(&connectionConfig->connectionProperties[i].key);
- UA_Variant_clear(&connectionConfig->connectionProperties[i].value);
- }
- UA_free(connectionConfig->connectionProperties);
- }
- void
- UA_PubSubConnection_clear(UA_Server *server, UA_PubSubConnection *connection) {
- //delete connection config
- UA_PubSubConnectionConfig_clear(connection->config);
- //remove contained WriterGroups
- UA_WriterGroup *writerGroup, *tmpWriterGroup;
- LIST_FOREACH_SAFE(writerGroup, &connection->writerGroups, listEntry, tmpWriterGroup){
- UA_Server_removeWriterGroup(server, writerGroup->identifier);
- }
- /* remove contained ReaderGroups */
- UA_ReaderGroup *readerGroups, *tmpReaderGroup;
- LIST_FOREACH_SAFE(readerGroups, &connection->readerGroups, listEntry, tmpReaderGroup){
- UA_Server_removeReaderGroup(server, readerGroups->identifier);
- }
- UA_NodeId_clear(&connection->identifier);
- if(connection->channel){
- connection->channel->close(connection->channel);
- }
- UA_free(connection->config);
- }
- UA_StatusCode
- UA_PubSubConnection_regist(UA_Server *server, UA_NodeId *connectionIdentifier) {
- UA_PubSubConnection *connection =
- UA_PubSubConnection_findConnectionbyId(server, *connectionIdentifier);
- UA_StatusCode retval = UA_STATUSCODE_GOOD;
- if(connection == NULL) {
- return UA_STATUSCODE_BADNOTFOUND;
- }
- retval = connection->channel->regist(connection->channel, NULL, NULL);
- if(retval != UA_STATUSCODE_GOOD) {
- UA_LOG_WARNING(&server->config.logger, UA_LOGCATEGORY_SERVER,
- "register channel failed: 0x%x!", retval);
- }
- return retval;
- }
- UA_StatusCode
- UA_Server_addWriterGroup(UA_Server *server, const UA_NodeId connection,
- const UA_WriterGroupConfig *writerGroupConfig,
- UA_NodeId *writerGroupIdentifier) {
- UA_StatusCode retVal = UA_STATUSCODE_GOOD;
- if(!writerGroupConfig)
- return UA_STATUSCODE_BADINVALIDARGUMENT;
- //search the connection by the given connectionIdentifier
- UA_PubSubConnection *currentConnectionContext =
- UA_PubSubConnection_findConnectionbyId(server, connection);
- if(!currentConnectionContext)
- return UA_STATUSCODE_BADNOTFOUND;
- if(currentConnectionContext->config->configurationFrozen){
- UA_LOG_WARNING(&server->config.logger, UA_LOGCATEGORY_SERVER,
- "Adding WriterGroup failed. PubSubConnection is frozen.");
- return UA_STATUSCODE_BADCONFIGURATIONERROR;
- }
- //allocate memory for new WriterGroup
- UA_WriterGroup *newWriterGroup = (UA_WriterGroup *) UA_calloc(1, sizeof(UA_WriterGroup));
- if(!newWriterGroup)
- return UA_STATUSCODE_BADOUTOFMEMORY;
- newWriterGroup->linkedConnection = currentConnectionContext->identifier;
- newWriterGroup->linkedConnectionPtr = currentConnectionContext;
- UA_PubSubManager_generateUniqueNodeId(server, &newWriterGroup->identifier);
- if(writerGroupIdentifier){
- UA_NodeId_copy(&newWriterGroup->identifier, writerGroupIdentifier);
- }
- //deep copy of the config
- UA_WriterGroupConfig tmpWriterGroupConfig;
- retVal |= UA_WriterGroupConfig_copy(writerGroupConfig, &tmpWriterGroupConfig);
- if(!tmpWriterGroupConfig.messageSettings.content.decoded.type) {
- UA_UadpWriterGroupMessageDataType *wgm = UA_UadpWriterGroupMessageDataType_new();
- tmpWriterGroupConfig.messageSettings.content.decoded.data = wgm;
- tmpWriterGroupConfig.messageSettings.content.decoded.type =
- &UA_TYPES[UA_TYPES_UADPWRITERGROUPMESSAGEDATATYPE];
- tmpWriterGroupConfig.messageSettings.encoding = UA_EXTENSIONOBJECT_DECODED;
- }
- newWriterGroup->config = tmpWriterGroupConfig;
- LIST_INSERT_HEAD(¤tConnectionContext->writerGroups, newWriterGroup, listEntry);
- #ifdef UA_ENABLE_PUBSUB_INFORMATIONMODEL
- addWriterGroupRepresentation(server, newWriterGroup);
- #endif
- return retVal;
- }
- UA_StatusCode
- UA_Server_removeWriterGroup(UA_Server *server, const UA_NodeId writerGroup){
- UA_WriterGroup *wg = UA_WriterGroup_findWGbyId(server, writerGroup);
- if(!wg)
- return UA_STATUSCODE_BADNOTFOUND;
- if(wg->config.configurationFrozen){
- UA_LOG_WARNING(&server->config.logger, UA_LOGCATEGORY_SERVER,
- "Delete WriterGroup failed. WriterGroup is frozen.");
- return UA_STATUSCODE_BADCONFIGURATIONERROR;
- }
- UA_PubSubConnection *connection =
- UA_PubSubConnection_findConnectionbyId(server, wg->linkedConnection);
- if(!connection)
- return UA_STATUSCODE_BADNOTFOUND;
- if(connection->config->configurationFrozen){
- UA_LOG_WARNING(&server->config.logger, UA_LOGCATEGORY_SERVER,
- "Delete WriterGroup failed. PubSubConnection is frozen.");
- return UA_STATUSCODE_BADCONFIGURATIONERROR;
- }
- if(wg->state == UA_PUBSUBSTATE_OPERATIONAL){
- //unregister the publish callback
- UA_PubSubManager_removeRepeatedPubSubCallback(server, wg->publishCallbackId);
- }
- #ifdef UA_ENABLE_PUBSUB_INFORMATIONMODEL
- removeGroupRepresentation(server, wg);
- #endif
- UA_WriterGroup_clear(server, wg);
- LIST_REMOVE(wg, listEntry);
- UA_free(wg);
- return UA_STATUSCODE_GOOD;
- }
- UA_StatusCode
- UA_Server_freezeWriterGroupConfiguration(UA_Server *server, const UA_NodeId writerGroup){
- UA_WriterGroup *wg = UA_WriterGroup_findWGbyId(server, writerGroup);
- if(!wg)
- return UA_STATUSCODE_BADNOTFOUND;
- //PubSubConnection freezeCounter++
- UA_PubSubConnection *pubSubConnection = UA_PubSubConnection_findConnectionbyId(server, wg->linkedConnection);
- pubSubConnection->configurationFreezeCounter++;
- pubSubConnection->config->configurationFrozen = UA_TRUE;
- //WriterGroup freeze
- wg->config.configurationFrozen = UA_TRUE;
- //DataSetWriter freeze
- UA_DataSetWriter *dataSetWriter;
- LIST_FOREACH(dataSetWriter, &wg->writers, listEntry){
- dataSetWriter->config.configurationFrozen = UA_TRUE;
- //PublishedDataSet freezeCounter++
- UA_PublishedDataSet *publishedDataSet = UA_PublishedDataSet_findPDSbyId(server, dataSetWriter->connectedDataSet);
- publishedDataSet->configurationFreezeCounter++;
- publishedDataSet->config.configurationFrozen = UA_TRUE;
- //DataSetFields freeze
- UA_DataSetField *dataSetField;
- TAILQ_FOREACH(dataSetField, &publishedDataSet->fields, listEntry){
- dataSetField->config.configurationFrozen = UA_TRUE;
- }
- }
- if(wg->config.rtLevel == UA_PUBSUB_RT_FIXED_SIZE){
- size_t dsmCount = 0;
- if(wg->config.encodingMimeType != UA_PUBSUB_ENCODING_UADP) {
- UA_LOG_WARNING(&server->config.logger, UA_LOGCATEGORY_SERVER,
- "PubSub-RT configuration fail: Non-RT capable encoding.");
- return UA_STATUSCODE_BADNOTSUPPORTED;
- }
- //TODO Clarify: should we only allow = maxEncapsulatedDataSetMessageCount == 1 with RT?
- //TODO Clarify: Behaviour if the finale size is more than MTU
- /* Generate data set messages */
- UA_STACKARRAY(UA_UInt16, dsWriterIds, wg->writersCount);
- UA_STACKARRAY(UA_DataSetMessage, dsmStore, wg->writersCount);
- UA_DataSetWriter *dsw;
- LIST_FOREACH(dsw, &wg->writers, listEntry) {
- /* Find the dataset */
- UA_PublishedDataSet *pds =
- UA_PublishedDataSet_findPDSbyId(server, dsw->connectedDataSet);
- if(!pds) {
- UA_LOG_WARNING(&server->config.logger, UA_LOGCATEGORY_SERVER,
- "PubSub Publish: PublishedDataSet not found");
- continue;
- }
- if(pds->promotedFieldsCount > 0) {
- UA_LOG_WARNING(&server->config.logger, UA_LOGCATEGORY_SERVER,
- "PubSub-RT configuration fail: PDS contains promoted fields.");
- return UA_STATUSCODE_BADNOTSUPPORTED;
- }
- UA_DataSetField *dsf;
- TAILQ_FOREACH(dsf, &pds->fields, listEntry){
- if(!dsf->config.field.variable.staticValueSourceEnabled){
- UA_LOG_WARNING(&server->config.logger, UA_LOGCATEGORY_SERVER,
- "PubSub-RT configuration fail: PDS contains variables with dynamic length types.");
- return UA_STATUSCODE_BADNOTSUPPORTED;
- }
- if((UA_NodeId_equal(&dsf->fieldMetaData.dataType, &UA_TYPES[UA_TYPES_STRING].typeId) ||
- UA_NodeId_equal(&dsf->fieldMetaData.dataType, &UA_TYPES[UA_TYPES_BYTESTRING].typeId)) &&
- dsf->fieldMetaData.maxStringLength == 0){
- UA_LOG_WARNING(&server->config.logger, UA_LOGCATEGORY_SERVER,
- "PubSub-RT configuration fail: PDS contains String/ByteString with dynamic length.");
- return UA_STATUSCODE_BADNOTSUPPORTED;
- } else if(!UA_DataType_isNumeric(UA_findDataType(&dsf->fieldMetaData.dataType))){
- UA_LOG_WARNING(&server->config.logger, UA_LOGCATEGORY_SERVER,
- "PubSub-RT configuration fail: PDS contains variable with dynamic size.");
- return UA_STATUSCODE_BADNOTSUPPORTED;
- }
- }
- /* Generate the DSM */
- UA_StatusCode res =
- UA_DataSetWriter_generateDataSetMessage(server, &dsmStore[dsmCount], dsw);
- if(res != UA_STATUSCODE_GOOD) {
- UA_LOG_WARNING(&server->config.logger, UA_LOGCATEGORY_SERVER,
- "PubSub RT Offset calculation: DataSetMessage buffering failed");
- continue;
- }
- dsWriterIds[dsmCount] = dsw->config.dataSetWriterId;
- dsmCount++;
- }
- UA_NetworkMessage networkMessage;
- memset(&networkMessage, 0, sizeof(networkMessage));
- UA_StatusCode res = generateNetworkMessage(pubSubConnection, wg, dsmStore, dsWriterIds, (UA_Byte) dsmCount,
- &wg->config.messageSettings, &wg->config.transportSettings, &networkMessage);
- if(res != UA_STATUSCODE_GOOD)
- return UA_STATUSCODE_BADINTERNALERROR;
- UA_NetworkMessage_calcSizeBinary(&networkMessage, &wg->bufferedMessage);
- /* Allocate the buffer. Allocate on the stack if the buffer is small. */
- UA_ByteString buf;
- size_t msgSize = UA_NetworkMessage_calcSizeBinary(&networkMessage, NULL);
- res = UA_ByteString_allocBuffer(&buf, msgSize);
- if(res != UA_STATUSCODE_GOOD)
- return UA_STATUSCODE_BADOUTOFMEMORY;
- wg->bufferedMessage.buffer = buf;
- const UA_Byte *bufEnd = &wg->bufferedMessage.buffer.data[wg->bufferedMessage.buffer.length];
- UA_Byte *bufPos = wg->bufferedMessage.buffer.data;
- UA_NetworkMessage_encodeBinary(&networkMessage, &bufPos, bufEnd);
- /* Clean up DSM */
- for(size_t i = 0; i < dsmCount; i++){
- UA_free(dsmStore[i].data.keyFrameData.dataSetFields);
- #ifdef UA_ENABLE_JSON_ENCODING
- UA_free(dsmStore[i].data.keyFrameData.fieldNames);
- #endif
- }
- //UA_DataSetMessage_free(&dsmStore[i]);
- }
- return UA_STATUSCODE_GOOD;
- }
- UA_StatusCode
- UA_Server_unfreezeWriterGroupConfiguration(UA_Server *server, const UA_NodeId writerGroup){
- UA_WriterGroup *wg = UA_WriterGroup_findWGbyId(server, writerGroup);
- if(!wg)
- return UA_STATUSCODE_BADNOTFOUND;
- //if(wg->config.rtLevel == UA_PUBSUB_RT_NONE){
- // UA_LOG_WARNING(&server->config.logger, UA_LOGCATEGORY_SERVER,
- // "PubSub configuration freeze without RT configuration has no effect.");
- // return UA_STATUSCODE_BADCONFIGURATIONERROR;
- //}
- //PubSubConnection freezeCounter--
- UA_PubSubConnection *pubSubConnection = UA_PubSubConnection_findConnectionbyId(server, wg->linkedConnection);
- pubSubConnection->configurationFreezeCounter--;
- if(pubSubConnection->configurationFreezeCounter == 0){
- pubSubConnection->config->configurationFrozen = UA_FALSE;
- }
- //WriterGroup unfreeze
- wg->config.configurationFrozen = UA_FALSE;
- //DataSetWriter unfreeze
- UA_DataSetWriter *dataSetWriter;
- LIST_FOREACH(dataSetWriter, &wg->writers, listEntry){
- UA_PublishedDataSet *publishedDataSet = UA_PublishedDataSet_findPDSbyId(server, dataSetWriter->connectedDataSet);
- //PublishedDataSet freezeCounter--
- publishedDataSet->configurationFreezeCounter--;
- if(publishedDataSet->configurationFreezeCounter == 0){
- publishedDataSet->config.configurationFrozen = UA_FALSE;
- UA_DataSetField *dataSetField;
- TAILQ_FOREACH(dataSetField, &publishedDataSet->fields, listEntry){
- dataSetField->config.configurationFrozen = UA_FALSE;
- }
- }
- dataSetWriter->config.configurationFrozen = UA_FALSE;
- }
- return UA_STATUSCODE_GOOD;
- }
- UA_StatusCode UA_EXPORT
- UA_Server_setWriterGroupOperational(UA_Server *server, const UA_NodeId writerGroup){
- UA_WriterGroup *wg = UA_WriterGroup_findWGbyId(server, writerGroup);
- if(!wg)
- return UA_STATUSCODE_BADNOTFOUND;
- return UA_WriterGroup_setPubSubState(server, UA_PUBSUBSTATE_OPERATIONAL, wg);
- }
- UA_StatusCode UA_EXPORT
- UA_Server_setWriterGroupDisabled(UA_Server *server, const UA_NodeId writerGroup){
- UA_WriterGroup *wg = UA_WriterGroup_findWGbyId(server, writerGroup);
- if(!wg)
- return UA_STATUSCODE_BADNOTFOUND;
- return UA_WriterGroup_setPubSubState(server, UA_PUBSUBSTATE_DISABLED, wg);
- }
- /**********************************************/
- /* PublishedDataSet */
- /**********************************************/
- UA_StatusCode
- UA_PublishedDataSetConfig_copy(const UA_PublishedDataSetConfig *src,
- UA_PublishedDataSetConfig *dst) {
- UA_StatusCode retVal = UA_STATUSCODE_GOOD;
- memcpy(dst, src, sizeof(UA_PublishedDataSetConfig));
- retVal |= UA_String_copy(&src->name, &dst->name);
- switch(src->publishedDataSetType){
- case UA_PUBSUB_DATASET_PUBLISHEDITEMS:
- //no additional items
- break;
- case UA_PUBSUB_DATASET_PUBLISHEDITEMS_TEMPLATE:
- if(src->config.itemsTemplate.variablesToAddSize > 0){
- dst->config.itemsTemplate.variablesToAdd = (UA_PublishedVariableDataType *) UA_calloc(
- src->config.itemsTemplate.variablesToAddSize, sizeof(UA_PublishedVariableDataType));
- }
- for(size_t i = 0; i < src->config.itemsTemplate.variablesToAddSize; i++){
- retVal |= UA_PublishedVariableDataType_copy(&src->config.itemsTemplate.variablesToAdd[i],
- &dst->config.itemsTemplate.variablesToAdd[i]);
- }
- retVal |= UA_DataSetMetaDataType_copy(&src->config.itemsTemplate.metaData,
- &dst->config.itemsTemplate.metaData);
- break;
- default:
- return UA_STATUSCODE_BADINVALIDARGUMENT;
- }
- return retVal;
- }
- UA_StatusCode
- UA_Server_getPublishedDataSetConfig(UA_Server *server, const UA_NodeId pds,
- UA_PublishedDataSetConfig *config){
- if(!config)
- return UA_STATUSCODE_BADINVALIDARGUMENT;
- UA_PublishedDataSet *currentPublishedDataSet = UA_PublishedDataSet_findPDSbyId(server, pds);
- if(!currentPublishedDataSet)
- return UA_STATUSCODE_BADNOTFOUND;
- UA_PublishedDataSetConfig tmpPublishedDataSetConfig;
- //deep copy of the actual config
- UA_PublishedDataSetConfig_copy(¤tPublishedDataSet->config, &tmpPublishedDataSetConfig);
- *config = tmpPublishedDataSetConfig;
- return UA_STATUSCODE_GOOD;
- }
- UA_StatusCode
- UA_Server_getPublishedDataSetMetaData(UA_Server *server, const UA_NodeId pds, UA_DataSetMetaDataType *metaData){
- if(!metaData)
- return UA_STATUSCODE_BADINVALIDARGUMENT;
- UA_PublishedDataSet *currentPublishedDataSet = UA_PublishedDataSet_findPDSbyId(server, pds);
- if(!currentPublishedDataSet)
- return UA_STATUSCODE_BADNOTFOUND;
- UA_DataSetMetaDataType tmpDataSetMetaData;
- if(UA_DataSetMetaDataType_copy(¤tPublishedDataSet->dataSetMetaData, &tmpDataSetMetaData) != UA_STATUSCODE_GOOD)
- return UA_STATUSCODE_BADINTERNALERROR;
- *metaData = tmpDataSetMetaData;
- return UA_STATUSCODE_GOOD;
- }
- UA_PublishedDataSet *
- UA_PublishedDataSet_findPDSbyId(UA_Server *server, UA_NodeId identifier){
- for(size_t i = 0; i < server->pubSubManager.publishedDataSetsSize; i++){
- if(UA_NodeId_equal(&server->pubSubManager.publishedDataSets[i].identifier, &identifier)){
- return &server->pubSubManager.publishedDataSets[i];
- }
- }
- return NULL;
- }
- void
- UA_PublishedDataSetConfig_clear(UA_PublishedDataSetConfig *pdsConfig){
- //delete pds config
- UA_String_clear(&pdsConfig->name);
- switch (pdsConfig->publishedDataSetType){
- case UA_PUBSUB_DATASET_PUBLISHEDITEMS:
- //no additional items
- break;
- case UA_PUBSUB_DATASET_PUBLISHEDITEMS_TEMPLATE:
- if(pdsConfig->config.itemsTemplate.variablesToAddSize > 0){
- for(size_t i = 0; i < pdsConfig->config.itemsTemplate.variablesToAddSize; i++){
- UA_PublishedVariableDataType_clear(&pdsConfig->config.itemsTemplate.variablesToAdd[i]);
- }
- UA_free(pdsConfig->config.itemsTemplate.variablesToAdd);
- }
- UA_DataSetMetaDataType_clear(&pdsConfig->config.itemsTemplate.metaData);
- break;
- default:
- break;
- }
- }
- void
- UA_PublishedDataSet_clear(UA_Server *server, UA_PublishedDataSet *publishedDataSet){
- UA_PublishedDataSetConfig_clear(&publishedDataSet->config);
- //delete PDS
- UA_DataSetField *field, *tmpField;
- TAILQ_FOREACH_SAFE(field, &publishedDataSet->fields, listEntry, tmpField) {
- UA_Server_removeDataSetField(server, field->identifier);
- }
- UA_DataSetMetaDataType_clear(&publishedDataSet->dataSetMetaData);
- UA_NodeId_clear(&publishedDataSet->identifier);
- }
- static UA_StatusCode
- generateFieldMetaData(UA_Server *server, UA_DataSetField *field, UA_FieldMetaData *fieldMetaData){
- switch (field->config.dataSetFieldType){
- case UA_PUBSUB_DATASETFIELD_VARIABLE:
- if(UA_String_copy(&field->config.field.variable.fieldNameAlias, &fieldMetaData->name) != UA_STATUSCODE_GOOD)
- return UA_STATUSCODE_BADINTERNALERROR;
- fieldMetaData->description = UA_LOCALIZEDTEXT_ALLOC("", "");
- fieldMetaData->dataSetFieldId = UA_GUID_NULL;
- //ToDo after freeze PR, the value source must be checked (other behavior for static value source)
- if(field->config.field.variable.staticValueSourceEnabled){
- fieldMetaData->arrayDimensions = (UA_UInt32 *) UA_calloc(
- field->config.field.variable.staticValueSource.value.arrayDimensionsSize, sizeof(UA_UInt32));
- if(fieldMetaData->arrayDimensions == NULL)
- return UA_STATUSCODE_BADOUTOFMEMORY;
- memcpy(fieldMetaData->arrayDimensions, field->config.field.variable.staticValueSource.value.arrayDimensions,
- sizeof(UA_UInt32) *field->config.field.variable.staticValueSource.value.arrayDimensionsSize);
- fieldMetaData->arrayDimensionsSize = field->config.field.variable.staticValueSource.value.arrayDimensionsSize;
- if(UA_NodeId_copy(&field->config.field.variable.staticValueSource.value.type->typeId,
- &fieldMetaData->dataType) != UA_STATUSCODE_GOOD){
- if(fieldMetaData->arrayDimensions){
- UA_free(fieldMetaData->arrayDimensions);
- return UA_STATUSCODE_BADINTERNALERROR;
- }
- }
- fieldMetaData->properties = NULL;
- fieldMetaData->propertiesSize = 0;
- //TODO collect value rank for the static field source
- fieldMetaData->fieldFlags = UA_DATASETFIELDFLAGS_NONE;
- return UA_STATUSCODE_GOOD;
- }
- UA_Variant value;
- UA_Variant_init(&value);
- if(UA_Server_readArrayDimensions(server, field->config.field.variable.publishParameters.publishedVariable,
- &value) != UA_STATUSCODE_GOOD){
- UA_LOG_WARNING(&server->config.logger, UA_LOGCATEGORY_SERVER,
- "PubSub meta data generation. Reading ArrayDimension failed.");
- } else {
- fieldMetaData->arrayDimensions = (UA_UInt32 *) UA_calloc(value.arrayDimensionsSize, sizeof(UA_UInt32));
- if(fieldMetaData->arrayDimensions == NULL)
- return UA_STATUSCODE_BADOUTOFMEMORY;
- memcpy(fieldMetaData->arrayDimensions, value.arrayDimensions, sizeof(UA_UInt32)*value.arrayDimensionsSize);
- fieldMetaData->arrayDimensionsSize = value.arrayDimensionsSize;
- }
- if(UA_Server_readDataType(server, field->config.field.variable.publishParameters.publishedVariable,
- &fieldMetaData->dataType) != UA_STATUSCODE_GOOD){
- if(fieldMetaData->arrayDimensions){
- UA_free(fieldMetaData->arrayDimensions);
- return UA_STATUSCODE_BADINTERNALERROR;
- }
- }
- fieldMetaData->properties = NULL;
- fieldMetaData->propertiesSize = 0;
- UA_Int32 valueRank;
- if(UA_Server_readValueRank(server, field->config.field.variable.publishParameters.publishedVariable,
- &valueRank) != UA_STATUSCODE_GOOD){
- if(fieldMetaData->arrayDimensions){
- UA_free(fieldMetaData->arrayDimensions);
- return UA_STATUSCODE_BADINTERNALERROR;
- }
- }
- fieldMetaData->valueRank = valueRank;
- if(field->config.field.variable.promotedField){
- fieldMetaData->fieldFlags = UA_DATASETFIELDFLAGS_PROMOTEDFIELD;
- } else {
- fieldMetaData->fieldFlags = UA_DATASETFIELDFLAGS_NONE;
- }
- //TODO collect the following fields
- //fieldMetaData.builtInType
- //fieldMetaData.maxStringLength
- return UA_STATUSCODE_GOOD;
- case UA_PUBSUB_DATASETFIELD_EVENT:
- return UA_STATUSCODE_BADNOTSUPPORTED;
- default:
- return UA_STATUSCODE_BADNOTSUPPORTED;
- }
- }
- UA_DataSetFieldResult
- UA_Server_addDataSetField(UA_Server *server, const UA_NodeId publishedDataSet,
- const UA_DataSetFieldConfig *fieldConfig,
- UA_NodeId *fieldIdentifier) {
- UA_StatusCode retVal = UA_STATUSCODE_GOOD;
- UA_DataSetFieldResult result = {UA_STATUSCODE_BADINVALIDARGUMENT, {0, 0}};
- if(!fieldConfig)
- return result;
- UA_PublishedDataSet *currentDataSet = UA_PublishedDataSet_findPDSbyId(server, publishedDataSet);
- if(currentDataSet == NULL){
- result.result = UA_STATUSCODE_BADNOTFOUND;
- return result;
- }
- if(currentDataSet->config.configurationFrozen){
- UA_LOG_WARNING(&server->config.logger, UA_LOGCATEGORY_SERVER,
- "Adding DataSetField failed. PublishedDataSet is frozen.");
- result.result = UA_STATUSCODE_BADCONFIGURATIONERROR;
- return result;
- }
- if(currentDataSet->config.publishedDataSetType != UA_PUBSUB_DATASET_PUBLISHEDITEMS){
- result.result = UA_STATUSCODE_BADNOTIMPLEMENTED;
- return result;
- }
- UA_DataSetField *newField = (UA_DataSetField *) UA_calloc(1, sizeof(UA_DataSetField));
- if(!newField){
- result.result = UA_STATUSCODE_BADINTERNALERROR;
- return result;
- }
- UA_DataSetFieldConfig tmpFieldConfig;
- retVal |= UA_DataSetFieldConfig_copy(fieldConfig, &tmpFieldConfig);
- newField->config = tmpFieldConfig;
- UA_PubSubManager_generateUniqueNodeId(server, &newField->identifier);
- if(fieldIdentifier != NULL){
- UA_NodeId_copy(&newField->identifier, fieldIdentifier);
- }
- newField->publishedDataSet = currentDataSet->identifier;
- //update major version of parent published data set
- currentDataSet->dataSetMetaData.configurationVersion.majorVersion = UA_PubSubConfigurationVersionTimeDifference();
- /* The order of DataSetFields should be the same in both creating and publishing.
- * So adding DataSetFields at the the end of the DataSets using TAILQ structure */
- if (currentDataSet->fieldSize != 0)
- TAILQ_INSERT_TAIL(¤tDataSet->fields, newField, listEntry);
- else
- TAILQ_INSERT_HEAD(¤tDataSet->fields, newField, listEntry);
- if(newField->config.field.variable.promotedField)
- currentDataSet->promotedFieldsCount++;
- currentDataSet->fieldSize++;
- //generate fieldMetadata within the DataSetMetaData
- currentDataSet->dataSetMetaData.fieldsSize++;
- UA_FieldMetaData *fieldMetaData = (UA_FieldMetaData *) UA_realloc(currentDataSet->dataSetMetaData.fields, currentDataSet->dataSetMetaData.fieldsSize *
- sizeof(UA_FieldMetaData));
- if(!fieldMetaData){
- result.result = UA_STATUSCODE_BADOUTOFMEMORY;
- return result;
- }
- currentDataSet->dataSetMetaData.fields = fieldMetaData;
- UA_FieldMetaData_init(&fieldMetaData[currentDataSet->fieldSize-1]);
- if(generateFieldMetaData(server, newField, &fieldMetaData[currentDataSet->fieldSize-1]) != UA_STATUSCODE_GOOD){
- UA_Server_removeDataSetField(server, newField->identifier);
- result.result = UA_STATUSCODE_BADINTERNALERROR;
- return result;
- }
- UA_DataSetField *dsf;
- size_t counter = 0;
- TAILQ_FOREACH(dsf, ¤tDataSet->fields, listEntry){
- dsf->fieldMetaData = fieldMetaData[counter++];
- }
- result.result = retVal;
- result.configurationVersion.majorVersion = currentDataSet->dataSetMetaData.configurationVersion.majorVersion;
- result.configurationVersion.minorVersion = currentDataSet->dataSetMetaData.configurationVersion.minorVersion;
- return result;
- }
- UA_DataSetFieldResult
- UA_Server_removeDataSetField(UA_Server *server, const UA_NodeId dsf) {
- UA_DataSetField *currentField = UA_DataSetField_findDSFbyId(server, dsf);
- UA_DataSetFieldResult result = {UA_STATUSCODE_BADNOTFOUND, {0, 0}};
- if(!currentField)
- return result;
- if(currentField->config.configurationFrozen){
- UA_LOG_WARNING(&server->config.logger, UA_LOGCATEGORY_SERVER,
- "Remove DataSetField failed. DataSetField is frozen.");
- result.result = UA_STATUSCODE_BADCONFIGURATIONERROR;
- return result;
- }
- UA_PublishedDataSet *parentPublishedDataSet =
- UA_PublishedDataSet_findPDSbyId(server, currentField->publishedDataSet);
- if(!parentPublishedDataSet)
- return result;
- if(parentPublishedDataSet->config.configurationFrozen){
- UA_LOG_WARNING(&server->config.logger, UA_LOGCATEGORY_SERVER,
- "Remove DataSetField failed. PublishedDataSet is frozen.");
- result.result = UA_STATUSCODE_BADCONFIGURATIONERROR;
- return result;
- }
- parentPublishedDataSet->fieldSize--;
- if(currentField->config.field.variable.promotedField)
- parentPublishedDataSet->promotedFieldsCount--;
- /* update major version of PublishedDataSet */
- parentPublishedDataSet->dataSetMetaData.configurationVersion.majorVersion =
- UA_PubSubConfigurationVersionTimeDifference();
- currentField->fieldMetaData.arrayDimensions = NULL;
- currentField->fieldMetaData.properties = NULL;
- currentField->fieldMetaData.name = UA_STRING_NULL;
- currentField->fieldMetaData.description.locale = UA_STRING_NULL;
- currentField->fieldMetaData.description.text = UA_STRING_NULL;
- UA_DataSetField_clear(currentField);
- TAILQ_REMOVE(&parentPublishedDataSet->fields, currentField, listEntry);
- UA_free(currentField);
- result.result = UA_STATUSCODE_GOOD;
- //regenerate DataSetMetaData
- parentPublishedDataSet->dataSetMetaData.fieldsSize--;
- if(parentPublishedDataSet->dataSetMetaData.fieldsSize > 0){
- for(size_t i = 0; i < parentPublishedDataSet->dataSetMetaData.fieldsSize+1; i++) {
- UA_FieldMetaData_clear(&parentPublishedDataSet->dataSetMetaData.fields[i]);
- }
- UA_free(parentPublishedDataSet->dataSetMetaData.fields);
- UA_FieldMetaData *fieldMetaData = (UA_FieldMetaData *) UA_calloc(parentPublishedDataSet->dataSetMetaData.fieldsSize,
- sizeof(UA_FieldMetaData));
- if(!fieldMetaData){
- result.result = UA_STATUSCODE_BADOUTOFMEMORY;
- return result;
- }
- UA_DataSetField *tmpDSF;
- size_t counter = 0;
- TAILQ_FOREACH(tmpDSF, &parentPublishedDataSet->fields, listEntry){
- UA_FieldMetaData tmpFieldMetaData;
- UA_FieldMetaData_init(&tmpFieldMetaData);
- if(generateFieldMetaData(server, tmpDSF, &tmpFieldMetaData) != UA_STATUSCODE_GOOD){
- UA_LOG_WARNING(&server->config.logger, UA_LOGCATEGORY_SERVER,
- "PubSub MetaData generation failed!");
- //TODO how to ensure consistency if the metadata regeneration fails
- result.result = UA_STATUSCODE_BADINTERNALERROR;
- }
- fieldMetaData[counter++] = tmpFieldMetaData;
- }
- parentPublishedDataSet->dataSetMetaData.fields = fieldMetaData;
- } else {
- UA_FieldMetaData_delete(parentPublishedDataSet->dataSetMetaData.fields);
- parentPublishedDataSet->dataSetMetaData.fields = NULL;
- }
- result.configurationVersion.majorVersion = parentPublishedDataSet->dataSetMetaData.configurationVersion.majorVersion;
- result.configurationVersion.minorVersion = parentPublishedDataSet->dataSetMetaData.configurationVersion.minorVersion;
- return result;
- }
- /**********************************************/
- /* DataSetWriter */
- /**********************************************/
- UA_StatusCode
- UA_DataSetWriterConfig_copy(const UA_DataSetWriterConfig *src,
- UA_DataSetWriterConfig *dst){
- UA_StatusCode retVal = UA_STATUSCODE_GOOD;
- memcpy(dst, src, sizeof(UA_DataSetWriterConfig));
- retVal |= UA_String_copy(&src->name, &dst->name);
- retVal |= UA_String_copy(&src->dataSetName, &dst->dataSetName);
- retVal |= UA_ExtensionObject_copy(&src->messageSettings, &dst->messageSettings);
- if (src->dataSetWriterPropertiesSize > 0) {
- dst->dataSetWriterProperties = (UA_KeyValuePair *)
- UA_calloc(src->dataSetWriterPropertiesSize, sizeof(UA_KeyValuePair));
- if(!dst->dataSetWriterProperties)
- return UA_STATUSCODE_BADOUTOFMEMORY;
- for(size_t i = 0; i < src->dataSetWriterPropertiesSize; i++){
- retVal |= UA_KeyValuePair_copy(&src->dataSetWriterProperties[i], &dst->dataSetWriterProperties[i]);
- }
- }
- return retVal;
- }
- UA_StatusCode
- UA_Server_getDataSetWriterConfig(UA_Server *server, const UA_NodeId dsw,
- UA_DataSetWriterConfig *config){
- UA_StatusCode retVal = UA_STATUSCODE_GOOD;
- if(!config)
- return UA_STATUSCODE_BADINVALIDARGUMENT;
- UA_DataSetWriter *currentDataSetWriter = UA_DataSetWriter_findDSWbyId(server, dsw);
- if(!currentDataSetWriter)
- return UA_STATUSCODE_BADNOTFOUND;
- UA_DataSetWriterConfig tmpWriterConfig;
- //deep copy of the actual config
- retVal |= UA_DataSetWriterConfig_copy(¤tDataSetWriter->config, &tmpWriterConfig);
- *config = tmpWriterConfig;
- return retVal;
- }
- UA_DataSetWriter *
- UA_DataSetWriter_findDSWbyId(UA_Server *server, UA_NodeId identifier) {
- UA_PubSubConnection *pubSubConnection;
- TAILQ_FOREACH(pubSubConnection, &server->pubSubManager.connections, listEntry){
- UA_WriterGroup *tmpWriterGroup;
- LIST_FOREACH(tmpWriterGroup, &pubSubConnection->writerGroups, listEntry){
- UA_DataSetWriter *tmpWriter;
- LIST_FOREACH(tmpWriter, &tmpWriterGroup->writers, listEntry){
- if(UA_NodeId_equal(&tmpWriter->identifier, &identifier)){
- return tmpWriter;
- }
- }
- }
- }
- return NULL;
- }
- void
- UA_DataSetWriterConfig_clear(UA_DataSetWriterConfig *pdsConfig) {
- UA_String_clear(&pdsConfig->name);
- UA_String_clear(&pdsConfig->dataSetName);
- for(size_t i = 0; i < pdsConfig->dataSetWriterPropertiesSize; i++){
- UA_KeyValuePair_clear(&pdsConfig->dataSetWriterProperties[i]);
- }
- UA_free(pdsConfig->dataSetWriterProperties);
- UA_ExtensionObject_clear(&pdsConfig->messageSettings);
- }
- static void
- UA_DataSetWriter_clear(UA_Server *server, UA_DataSetWriter *dataSetWriter) {
- UA_DataSetWriterConfig_clear(&dataSetWriter->config);
- //delete DataSetWriter
- UA_NodeId_clear(&dataSetWriter->identifier);
- UA_NodeId_clear(&dataSetWriter->linkedWriterGroup);
- UA_NodeId_clear(&dataSetWriter->connectedDataSet);
- #ifdef UA_ENABLE_PUBSUB_DELTAFRAMES
- //delete lastSamples store
- for(size_t i = 0; i < dataSetWriter->lastSamplesCount; i++) {
- UA_DataValue_clear(&dataSetWriter->lastSamples[i].value);
- }
- UA_free(dataSetWriter->lastSamples);
- dataSetWriter->lastSamples = NULL;
- dataSetWriter->lastSamplesCount = 0;
- #endif
- }
- //state machine methods not part of the open62541 state machine API
- UA_StatusCode
- UA_DataSetWriter_setPubSubState(UA_Server *server, UA_PubSubState state, UA_DataSetWriter *dataSetWriter){
- switch(state){
- case UA_PUBSUBSTATE_DISABLED:
- switch (dataSetWriter->state){
- case UA_PUBSUBSTATE_DISABLED:
- return UA_STATUSCODE_GOOD;
- case UA_PUBSUBSTATE_PAUSED:
- dataSetWriter->state = UA_PUBSUBSTATE_DISABLED;
- //no further action is required
- break;
- case UA_PUBSUBSTATE_OPERATIONAL:
- dataSetWriter->state = UA_PUBSUBSTATE_DISABLED;
- break;
- case UA_PUBSUBSTATE_ERROR:
- break;
- default:
- UA_LOG_WARNING(&server->config.logger, UA_LOGCATEGORY_SERVER,
- "Received unknown PubSub state!");
- }
- break;
- case UA_PUBSUBSTATE_PAUSED:
- switch (dataSetWriter->state){
- case UA_PUBSUBSTATE_DISABLED:
- break;
- case UA_PUBSUBSTATE_PAUSED:
- return UA_STATUSCODE_GOOD;
- case UA_PUBSUBSTATE_OPERATIONAL:
- break;
- case UA_PUBSUBSTATE_ERROR:
- break;
- default:
- UA_LOG_WARNING(&server->config.logger, UA_LOGCATEGORY_SERVER,
- "Received unknown PubSub state!");
- }
- break;
- case UA_PUBSUBSTATE_OPERATIONAL:
- switch (dataSetWriter->state){
- case UA_PUBSUBSTATE_DISABLED:
- dataSetWriter->state = UA_PUBSUBSTATE_OPERATIONAL;
- break;
- case UA_PUBSUBSTATE_PAUSED:
- break;
- case UA_PUBSUBSTATE_OPERATIONAL:
- return UA_STATUSCODE_GOOD;
- case UA_PUBSUBSTATE_ERROR:
- break;
- default:
- UA_LOG_WARNING(&server->config.logger, UA_LOGCATEGORY_SERVER,
- "Received unknown PubSub state!");
- }
- break;
- case UA_PUBSUBSTATE_ERROR:
- switch (dataSetWriter->state){
- case UA_PUBSUBSTATE_DISABLED:
- break;
- case UA_PUBSUBSTATE_PAUSED:
- break;
- case UA_PUBSUBSTATE_OPERATIONAL:
- break;
- case UA_PUBSUBSTATE_ERROR:
- return UA_STATUSCODE_GOOD;
- default:
- UA_LOG_WARNING(&server->config.logger, UA_LOGCATEGORY_SERVER,
- "Received unknown PubSub state!");
- }
- break;
- default:
- UA_LOG_WARNING(&server->config.logger, UA_LOGCATEGORY_SERVER,
- "Received unknown PubSub state!");
- }
- return UA_STATUSCODE_GOOD;
- }
- /**********************************************/
- /* WriterGroup */
- /**********************************************/
- UA_StatusCode
- UA_WriterGroupConfig_copy(const UA_WriterGroupConfig *src,
- UA_WriterGroupConfig *dst){
- UA_StatusCode retVal = UA_STATUSCODE_GOOD;
- memcpy(dst, src, sizeof(UA_WriterGroupConfig));
- retVal |= UA_String_copy(&src->name, &dst->name);
- retVal |= UA_ExtensionObject_copy(&src->transportSettings, &dst->transportSettings);
- retVal |= UA_ExtensionObject_copy(&src->messageSettings, &dst->messageSettings);
- if (src->groupPropertiesSize > 0) {
- dst->groupProperties = (UA_KeyValuePair *) UA_calloc(src->groupPropertiesSize, sizeof(UA_KeyValuePair));
- if(!dst->groupProperties)
- return UA_STATUSCODE_BADOUTOFMEMORY;
- for(size_t i = 0; i < src->groupPropertiesSize; i++){
- retVal |= UA_KeyValuePair_copy(&src->groupProperties[i], &dst->groupProperties[i]);
- }
- }
- return retVal;
- }
- UA_StatusCode
- UA_Server_getWriterGroupConfig(UA_Server *server, const UA_NodeId writerGroup,
- UA_WriterGroupConfig *config){
- UA_StatusCode retVal = UA_STATUSCODE_GOOD;
- if(!config)
- return UA_STATUSCODE_BADINVALIDARGUMENT;
- UA_WriterGroup *currentWriterGroup = UA_WriterGroup_findWGbyId(server, writerGroup);
- if(!currentWriterGroup){
- return UA_STATUSCODE_BADNOTFOUND;
- }
- UA_WriterGroupConfig tmpWriterGroupConfig;
- //deep copy of the actual config
- retVal |= UA_WriterGroupConfig_copy(¤tWriterGroup->config, &tmpWriterGroupConfig);
- *config = tmpWriterGroupConfig;
- return retVal;
- }
- UA_StatusCode
- UA_Server_updateWriterGroupConfig(UA_Server *server, UA_NodeId writerGroupIdentifier,
- const UA_WriterGroupConfig *config){
- if(!config)
- return UA_STATUSCODE_BADINVALIDARGUMENT;
- UA_WriterGroup *currentWriterGroup = UA_WriterGroup_findWGbyId(server, writerGroupIdentifier);
- if(!currentWriterGroup)
- return UA_STATUSCODE_BADNOTFOUND;
- if(currentWriterGroup->config.configurationFrozen){
- UA_LOG_WARNING(&server->config.logger, UA_LOGCATEGORY_SERVER,
- "Modify WriterGroup failed. WriterGroup is frozen.");
- return UA_STATUSCODE_BADCONFIGURATIONERROR;
- }
- //The update functionality will be extended during the next PubSub batches.
- //Currently is only a change of the publishing interval possible.
- if(currentWriterGroup->config.maxEncapsulatedDataSetMessageCount != config->maxEncapsulatedDataSetMessageCount){
- currentWriterGroup->config.maxEncapsulatedDataSetMessageCount = config->maxEncapsulatedDataSetMessageCount;
- if(currentWriterGroup->config.messageSettings.encoding == UA_EXTENSIONOBJECT_ENCODED_NOBODY) {
- UA_LOG_WARNING(&server->config.logger, UA_LOGCATEGORY_SERVER,
- "MaxEncapsulatedDataSetMessag need enabled 'PayloadHeader' within the message settings.");
- }
- }
- if(currentWriterGroup->config.publishingInterval != config->publishingInterval) {
- if(currentWriterGroup->config.rtLevel == UA_PUBSUB_RT_NONE && currentWriterGroup->state == UA_PUBSUBSTATE_OPERATIONAL){
- UA_PubSubManager_removeRepeatedPubSubCallback(server, currentWriterGroup->publishCallbackId);
- currentWriterGroup->config.publishingInterval = config->publishingInterval;
- UA_WriterGroup_addPublishCallback(server, currentWriterGroup);
- } else {
- currentWriterGroup->config.publishingInterval = config->publishingInterval;
- }
- }
- if(currentWriterGroup->config.priority != config->priority) {
- UA_LOG_WARNING(&server->config.logger, UA_LOGCATEGORY_SERVER,
- "No or unsupported WriterGroup update.");
- }
- return UA_STATUSCODE_GOOD;
- }
- UA_WriterGroup *
- UA_WriterGroup_findWGbyId(UA_Server *server, UA_NodeId identifier){
- UA_PubSubConnection *tmpConnection;
- TAILQ_FOREACH(tmpConnection, &server->pubSubManager.connections, listEntry){
- UA_WriterGroup *tmpWriterGroup;
- LIST_FOREACH(tmpWriterGroup, &tmpConnection->writerGroups, listEntry) {
- if(UA_NodeId_equal(&identifier, &tmpWriterGroup->identifier)){
- return tmpWriterGroup;
- }
- }
- }
- return NULL;
- }
- void
- UA_WriterGroupConfig_clear(UA_WriterGroupConfig *writerGroupConfig){
- //delete writerGroup config
- UA_String_clear(&writerGroupConfig->name);
- UA_ExtensionObject_clear(&writerGroupConfig->transportSettings);
- UA_ExtensionObject_clear(&writerGroupConfig->messageSettings);
- for(size_t i = 0; i < writerGroupConfig->groupPropertiesSize; i++){
- UA_KeyValuePair_clear(&writerGroupConfig->groupProperties[i]);
- }
- UA_free(writerGroupConfig->groupProperties);
- }
- static void
- UA_WriterGroup_clear(UA_Server *server, UA_WriterGroup *writerGroup) {
- UA_WriterGroupConfig_clear(&writerGroup->config);
- //delete WriterGroup
- //delete all writers. Therefore removeDataSetWriter is called from PublishedDataSet
- UA_DataSetWriter *dataSetWriter, *tmpDataSetWriter;
- LIST_FOREACH_SAFE(dataSetWriter, &writerGroup->writers, listEntry, tmpDataSetWriter){
- UA_Server_removeDataSetWriter(server, dataSetWriter->identifier);
- }
- if(writerGroup->bufferedMessage.offsetsSize > 0){
- for (size_t i = 0; i < writerGroup->bufferedMessage.offsetsSize; i++) {
- if(writerGroup->bufferedMessage.offsets[i].contentType == UA_PUBSUB_OFFSETTYPE_PAYLOAD_VARIANT){
- UA_DataValue_delete(writerGroup->bufferedMessage.offsets[i].offsetData.value.value);
- }
- }
- UA_ByteString_deleteMembers(&writerGroup->bufferedMessage.buffer);
- UA_free(writerGroup->bufferedMessage.offsets);
- }
- UA_NodeId_clear(&writerGroup->linkedConnection);
- UA_NodeId_clear(&writerGroup->identifier);
- }
- UA_StatusCode
- UA_WriterGroup_setPubSubState(UA_Server *server, UA_PubSubState state, UA_WriterGroup *writerGroup){
- UA_DataSetWriter *dataSetWriter;
- switch(state){
- case UA_PUBSUBSTATE_DISABLED:
- switch (writerGroup->state){
- case UA_PUBSUBSTATE_DISABLED:
- return UA_STATUSCODE_GOOD;
- case UA_PUBSUBSTATE_PAUSED:
- break;
- case UA_PUBSUBSTATE_OPERATIONAL:
- UA_PubSubManager_removeRepeatedPubSubCallback(server, writerGroup->publishCallbackId);
- LIST_FOREACH(dataSetWriter, &writerGroup->writers, listEntry){
- UA_DataSetWriter_setPubSubState(server, UA_PUBSUBSTATE_DISABLED, dataSetWriter);
- }
- writerGroup->state = UA_PUBSUBSTATE_DISABLED;
- break;
- case UA_PUBSUBSTATE_ERROR:
- break;
- default:
- UA_LOG_WARNING(&server->config.logger, UA_LOGCATEGORY_SERVER,
- "Received unknown PubSub state!");
- }
- break;
- case UA_PUBSUBSTATE_PAUSED:
- switch (writerGroup->state){
- case UA_PUBSUBSTATE_DISABLED:
- break;
- case UA_PUBSUBSTATE_PAUSED:
- return UA_STATUSCODE_GOOD;
- case UA_PUBSUBSTATE_OPERATIONAL:
- break;
- case UA_PUBSUBSTATE_ERROR:
- break;
- default:
- UA_LOG_WARNING(&server->config.logger, UA_LOGCATEGORY_SERVER,
- "Received unknown PubSub state!");
- }
- break;
- case UA_PUBSUBSTATE_OPERATIONAL:
- switch (writerGroup->state){
- case UA_PUBSUBSTATE_DISABLED:
- writerGroup->state = UA_PUBSUBSTATE_OPERATIONAL;
- UA_PubSubManager_removeRepeatedPubSubCallback(server, writerGroup->publishCallbackId);
- LIST_FOREACH(dataSetWriter, &writerGroup->writers, listEntry){
- UA_DataSetWriter_setPubSubState(server, UA_PUBSUBSTATE_OPERATIONAL, dataSetWriter);
- }
- UA_WriterGroup_addPublishCallback(server, writerGroup);
- break;
- case UA_PUBSUBSTATE_PAUSED:
- break;
- case UA_PUBSUBSTATE_OPERATIONAL:
- return UA_STATUSCODE_GOOD;
- case UA_PUBSUBSTATE_ERROR:
- break;
- default:
- UA_LOG_WARNING(&server->config.logger, UA_LOGCATEGORY_SERVER,
- "Received unknown PubSub state!");
- }
- break;
- case UA_PUBSUBSTATE_ERROR:
- switch (writerGroup->state){
- case UA_PUBSUBSTATE_DISABLED:
- break;
- case UA_PUBSUBSTATE_PAUSED:
- break;
- case UA_PUBSUBSTATE_OPERATIONAL:
- break;
- case UA_PUBSUBSTATE_ERROR:
- return UA_STATUSCODE_GOOD;
- default:
- UA_LOG_WARNING(&server->config.logger, UA_LOGCATEGORY_SERVER,
- "Received unknown PubSub state!");
- }
- break;
- default:
- UA_LOG_WARNING(&server->config.logger, UA_LOGCATEGORY_SERVER,
- "Received unknown PubSub state!");
- }
- return UA_STATUSCODE_GOOD;
- }
- UA_StatusCode
- UA_Server_addDataSetWriter(UA_Server *server,
- const UA_NodeId writerGroup, const UA_NodeId dataSet,
- const UA_DataSetWriterConfig *dataSetWriterConfig,
- UA_NodeId *writerIdentifier) {
- UA_StatusCode retVal = UA_STATUSCODE_GOOD;
- if(!dataSetWriterConfig)
- return UA_STATUSCODE_BADINVALIDARGUMENT;
- UA_PublishedDataSet *currentDataSetContext = UA_PublishedDataSet_findPDSbyId(server, dataSet);
- if(!currentDataSetContext)
- return UA_STATUSCODE_BADNOTFOUND;
- if(currentDataSetContext->config.configurationFrozen){
- UA_LOG_WARNING(&server->config.logger, UA_LOGCATEGORY_SERVER,
- "Adding DataSetWriter failed. PublishedDataSet is frozen.");
- return UA_STATUSCODE_BADCONFIGURATIONERROR;
- }
- UA_WriterGroup *wg = UA_WriterGroup_findWGbyId(server, writerGroup);
- if(!wg)
- return UA_STATUSCODE_BADNOTFOUND;
- if(wg->config.configurationFrozen){
- UA_LOG_WARNING(&server->config.logger, UA_LOGCATEGORY_SERVER,
- "Adding DataSetWriter failed. WriterGroup is frozen.");
- return UA_STATUSCODE_BADCONFIGURATIONERROR;
- }
- if(wg->config.rtLevel != UA_PUBSUB_RT_NONE){
- UA_DataSetField *tmpDSF;
- TAILQ_FOREACH(tmpDSF, ¤tDataSetContext->fields, listEntry){
- if(tmpDSF->config.field.variable.staticValueSourceEnabled != UA_TRUE){
- UA_LOG_WARNING(&server->config.logger, UA_LOGCATEGORY_SERVER,
- "Adding DataSetWriter failed. Fields in PDS are not RT capable.");
- return UA_STATUSCODE_BADCONFIGURATIONERROR;
- }
- }
- }
- UA_DataSetWriter *newDataSetWriter = (UA_DataSetWriter *) UA_calloc(1, sizeof(UA_DataSetWriter));
- if(!newDataSetWriter)
- return UA_STATUSCODE_BADOUTOFMEMORY;
- //copy the config into the new dataSetWriter
- UA_DataSetWriterConfig tmpDataSetWriterConfig;
- retVal |= UA_DataSetWriterConfig_copy(dataSetWriterConfig, &tmpDataSetWriterConfig);
- newDataSetWriter->config = tmpDataSetWriterConfig;
- //save the current version of the connected PublishedDataSet
- newDataSetWriter->connectedDataSetVersion = currentDataSetContext->dataSetMetaData.configurationVersion;
- #ifdef UA_ENABLE_PUBSUB_DELTAFRAMES
- //initialize the queue for the last values
- if (currentDataSetContext->fieldSize > 0) {
- newDataSetWriter->lastSamples = (UA_DataSetWriterSample * )
- UA_calloc(currentDataSetContext->fieldSize, sizeof(UA_DataSetWriterSample));
- if(!newDataSetWriter->lastSamples) {
- UA_DataSetWriterConfig_clear(&newDataSetWriter->config);
- UA_free(newDataSetWriter);
- return UA_STATUSCODE_BADOUTOFMEMORY;
- }
- newDataSetWriter->lastSamplesCount = currentDataSetContext->fieldSize;
- }
- #endif
- //connect PublishedDataSet with DataSetWriter
- newDataSetWriter->connectedDataSet = currentDataSetContext->identifier;
- newDataSetWriter->linkedWriterGroup = wg->identifier;
- UA_PubSubManager_generateUniqueNodeId(server, &newDataSetWriter->identifier);
- if(writerIdentifier != NULL)
- UA_NodeId_copy(&newDataSetWriter->identifier, writerIdentifier);
- //add the new writer to the group
- LIST_INSERT_HEAD(&wg->writers, newDataSetWriter, listEntry);
- wg->writersCount++;
- #ifdef UA_ENABLE_PUBSUB_INFORMATIONMODEL
- addDataSetWriterRepresentation(server, newDataSetWriter);
- #endif
- return retVal;
- }
- UA_StatusCode
- UA_Server_removeDataSetWriter(UA_Server *server, const UA_NodeId dsw){
- UA_DataSetWriter *dataSetWriter = UA_DataSetWriter_findDSWbyId(server, dsw);
- if(!dataSetWriter)
- return UA_STATUSCODE_BADNOTFOUND;
- if(dataSetWriter->config.configurationFrozen){
- UA_LOG_WARNING(&server->config.logger, UA_LOGCATEGORY_SERVER,
- "Remove DataSetWriter failed. DataSetWriter is frozen.");
- return UA_STATUSCODE_BADCONFIGURATIONERROR;
- }
- UA_WriterGroup *linkedWriterGroup = UA_WriterGroup_findWGbyId(server, dataSetWriter->linkedWriterGroup);
- if(!linkedWriterGroup)
- return UA_STATUSCODE_BADNOTFOUND;
- if(linkedWriterGroup->config.configurationFrozen){
- UA_LOG_WARNING(&server->config.logger, UA_LOGCATEGORY_SERVER,
- "Remove DataSetWriter failed. WriterGroup is frozen.");
- return UA_STATUSCODE_BADCONFIGURATIONERROR;
- }
- UA_PublishedDataSet *publishedDataSet = UA_PublishedDataSet_findPDSbyId(server, dataSetWriter->connectedDataSet);
- if(!publishedDataSet)
- return UA_STATUSCODE_BADNOTFOUND;
- linkedWriterGroup->writersCount--;
- #ifdef UA_ENABLE_PUBSUB_INFORMATIONMODEL
- removeDataSetWriterRepresentation(server, dataSetWriter);
- #endif
- //remove DataSetWriter from group
- UA_DataSetWriter_clear(server, dataSetWriter);
- LIST_REMOVE(dataSetWriter, listEntry);
- UA_free(dataSetWriter);
- return UA_STATUSCODE_GOOD;
- }
- /**********************************************/
- /* DataSetField */
- /**********************************************/
- UA_StatusCode
- UA_DataSetFieldConfig_copy(const UA_DataSetFieldConfig *src, UA_DataSetFieldConfig *dst){
- memcpy(dst, src, sizeof(UA_DataSetFieldConfig));
- if(src->dataSetFieldType == UA_PUBSUB_DATASETFIELD_VARIABLE) {
- UA_String_copy(&src->field.variable.fieldNameAlias, &dst->field.variable.fieldNameAlias);
- UA_PublishedVariableDataType_copy(&src->field.variable.publishParameters,
- &dst->field.variable.publishParameters);
- } else {
- return UA_STATUSCODE_BADNOTSUPPORTED;
- }
- return UA_STATUSCODE_GOOD;
- }
- UA_StatusCode
- UA_Server_getDataSetFieldConfig(UA_Server *server, const UA_NodeId dsf,
- UA_DataSetFieldConfig *config) {
- UA_StatusCode retVal = UA_STATUSCODE_GOOD;
- if(!config)
- return UA_STATUSCODE_BADINVALIDARGUMENT;
- UA_DataSetField *currentDataSetField = UA_DataSetField_findDSFbyId(server, dsf);
- if(!currentDataSetField)
- return UA_STATUSCODE_BADNOTFOUND;
- UA_DataSetFieldConfig tmpFieldConfig;
- //deep copy of the actual config
- retVal |= UA_DataSetFieldConfig_copy(¤tDataSetField->config, &tmpFieldConfig);
- *config = tmpFieldConfig;
- return retVal;
- }
- UA_DataSetField *
- UA_DataSetField_findDSFbyId(UA_Server *server, UA_NodeId identifier) {
- for(size_t i = 0; i < server->pubSubManager.publishedDataSetsSize; i++){
- UA_DataSetField *tmpField;
- TAILQ_FOREACH(tmpField, &server->pubSubManager.publishedDataSets[i].fields, listEntry){
- if(UA_NodeId_equal(&tmpField->identifier, &identifier)){
- return tmpField;
- }
- }
- }
- return NULL;
- }
- void
- UA_DataSetFieldConfig_clear(UA_DataSetFieldConfig *dataSetFieldConfig){
- if(dataSetFieldConfig->dataSetFieldType == UA_PUBSUB_DATASETFIELD_VARIABLE){
- UA_String_clear(&dataSetFieldConfig->field.variable.fieldNameAlias);
- UA_PublishedVariableDataType_clear(&dataSetFieldConfig->field.variable.publishParameters);
- }
- }
- static void
- UA_DataSetField_clear(UA_DataSetField *field) {
- UA_DataSetFieldConfig_clear(&field->config);
- //delete DataSetField
- UA_NodeId_clear(&field->identifier);
- UA_NodeId_clear(&field->publishedDataSet);
- UA_FieldMetaData_clear(&field->fieldMetaData);
- }
- /*********************************************************/
- /* PublishValues handling */
- /*********************************************************/
/**
 * Compare two variants. Internally used for value change detection.
 *
 * Both variants are binary-encoded and the encodings are compared. Differing
 * encoded sizes already count as a change. If a pointer is NULL, an encoded
 * size is zero, or allocating/encoding fails, the value is reported as
 * unchanged.
 *
 * @return true if the value has changed
 */
#ifdef UA_ENABLE_PUBSUB_DELTAFRAMES
static UA_Boolean
valueChangedVariant(UA_Variant *oldValue, UA_Variant *newValue){
    if(! (oldValue && newValue))
        return false;

    size_t oldValueEncodingSize = UA_calcSizeBinary(oldValue, &UA_TYPES[UA_TYPES_VARIANT]);
    size_t newValueEncodingSize = UA_calcSizeBinary(newValue, &UA_TYPES[UA_TYPES_VARIANT]);
    if((oldValueEncodingSize == 0) || (newValueEncodingSize == 0))
        return false;
    if(oldValueEncodingSize != newValueEncodingSize)
        return true;

    /* The buffers live on the stack and are released on every path through
     * the cleanup label. The previous heap-allocated ByteStrings leaked on
     * each early return. */
    UA_Boolean compareResult = false;
    UA_ByteString oldValueEncoding;
    UA_ByteString newValueEncoding;
    UA_ByteString_init(&oldValueEncoding);
    UA_ByteString_init(&newValueEncoding);

    if(UA_ByteString_allocBuffer(&oldValueEncoding, oldValueEncodingSize) != UA_STATUSCODE_GOOD)
        goto cleanup;
    if(UA_ByteString_allocBuffer(&newValueEncoding, newValueEncodingSize) != UA_STATUSCODE_GOOD)
        goto cleanup;

    UA_Byte *bufPosOldValue = oldValueEncoding.data;
    const UA_Byte *bufEndOldValue = &oldValueEncoding.data[oldValueEncoding.length];
    UA_Byte *bufPosNewValue = newValueEncoding.data;
    const UA_Byte *bufEndNewValue = &newValueEncoding.data[newValueEncoding.length];

    if(UA_encodeBinary(oldValue, &UA_TYPES[UA_TYPES_VARIANT],
                       &bufPosOldValue, &bufEndOldValue, NULL, NULL) != UA_STATUSCODE_GOOD)
        goto cleanup;
    if(UA_encodeBinary(newValue, &UA_TYPES[UA_TYPES_VARIANT],
                       &bufPosNewValue, &bufEndNewValue, NULL, NULL) != UA_STATUSCODE_GOOD)
        goto cleanup;

    /* Compare only the bytes that were actually written */
    oldValueEncoding.length = (uintptr_t)bufPosOldValue - (uintptr_t)oldValueEncoding.data;
    newValueEncoding.length = (uintptr_t)bufPosNewValue - (uintptr_t)newValueEncoding.data;
    compareResult = !UA_ByteString_equal(&oldValueEncoding, &newValueEncoding);

cleanup:
    UA_ByteString_clear(&oldValueEncoding);
    UA_ByteString_clear(&newValueEncoding);
    return compareResult;
}
#endif
- /**
- * Obtain the latest value for a specific DataSetField. This method is currently
- * called inside the DataSetMessage generation process.
- */
- static void
- UA_PubSubDataSetField_sampleValue(UA_Server *server, UA_DataSetField *field,
- UA_DataValue *value) {
- /* Read the value */
- if(field->config.field.variable.staticValueSourceEnabled == UA_FALSE){
- UA_ReadValueId rvid;
- UA_ReadValueId_init(&rvid);
- rvid.nodeId = field->config.field.variable.publishParameters.publishedVariable;
- rvid.attributeId = field->config.field.variable.publishParameters.attributeId;
- rvid.indexRange = field->config.field.variable.publishParameters.indexRange;
- *value = UA_Server_read(server, &rvid, UA_TIMESTAMPSTORETURN_BOTH);
- } else {
- value->value.storageType = UA_VARIANT_DATA_NODELETE;
- *value = field->config.field.variable.staticValueSource;
- }
- }
- static UA_StatusCode
- UA_PubSubDataSetWriter_generateKeyFrameMessage(UA_Server *server, UA_DataSetMessage *dataSetMessage,
- UA_DataSetWriter *dataSetWriter) {
- UA_PublishedDataSet *currentDataSet =
- UA_PublishedDataSet_findPDSbyId(server, dataSetWriter->connectedDataSet);
- if(!currentDataSet)
- return UA_STATUSCODE_BADNOTFOUND;
- /* Prepare DataSetMessageContent */
- dataSetMessage->header.dataSetMessageValid = true;
- dataSetMessage->header.dataSetMessageType = UA_DATASETMESSAGE_DATAKEYFRAME;
- dataSetMessage->data.keyFrameData.fieldCount = currentDataSet->fieldSize;
- dataSetMessage->data.keyFrameData.dataSetFields = (UA_DataValue *)
- UA_Array_new(currentDataSet->fieldSize, &UA_TYPES[UA_TYPES_DATAVALUE]);
- if(!dataSetMessage->data.keyFrameData.dataSetFields)
- return UA_STATUSCODE_BADOUTOFMEMORY;
- #ifdef UA_ENABLE_JSON_ENCODING
- /* json: insert fieldnames used as json keys */
- dataSetMessage->data.keyFrameData.fieldNames =
- (UA_String *)UA_Array_new(currentDataSet->fieldSize, &UA_TYPES[UA_TYPES_STRING]);
- if(!dataSetMessage->data.keyFrameData.fieldNames)
- return UA_STATUSCODE_BADOUTOFMEMORY;
- #endif
- /* Loop over the fields */
- size_t counter = 0;
- UA_DataSetField *dsf;
- TAILQ_FOREACH(dsf, ¤tDataSet->fields, listEntry) {
- #ifdef UA_ENABLE_JSON_ENCODING
- /* json: store the fieldNameAlias*/
- UA_String_copy(&dsf->config.field.variable.fieldNameAlias,
- &dataSetMessage->data.keyFrameData.fieldNames[counter]);
- #endif
- /* Sample the value */
- UA_DataValue *dfv = &dataSetMessage->data.keyFrameData.dataSetFields[counter];
- UA_PubSubDataSetField_sampleValue(server, dsf, dfv);
- /* Deactivate statuscode? */
- if(((u64)dataSetWriter->config.dataSetFieldContentMask & (u64)UA_DATASETFIELDCONTENTMASK_STATUSCODE) == 0)
- dfv->hasStatus = false;
- /* Deactivate timestamps */
- if(((u64)dataSetWriter->config.dataSetFieldContentMask &
- (u64)UA_DATASETFIELDCONTENTMASK_SOURCETIMESTAMP) == 0)
- dfv->hasSourceTimestamp = false;
- if(((u64)dataSetWriter->config.dataSetFieldContentMask &
- (u64)UA_DATASETFIELDCONTENTMASK_SOURCEPICOSECONDS) == 0)
- dfv->hasSourcePicoseconds = false;
- if(((u64)dataSetWriter->config.dataSetFieldContentMask &
- (u64)UA_DATASETFIELDCONTENTMASK_SERVERTIMESTAMP) == 0)
- dfv->hasServerTimestamp = false;
- if(((u64)dataSetWriter->config.dataSetFieldContentMask &
- (u64)UA_DATASETFIELDCONTENTMASK_SERVERPICOSECONDS) == 0)
- dfv->hasServerPicoseconds = false;
- #ifdef UA_ENABLE_PUBSUB_DELTAFRAMES
- /* Update lastValue store */
- UA_DataValue_clear(&dataSetWriter->lastSamples[counter].value);
- UA_DataValue_copy(dfv, &dataSetWriter->lastSamples[counter].value);
- #endif
- counter++;
- }
- return UA_STATUSCODE_GOOD;
- }
#ifdef UA_ENABLE_PUBSUB_DELTAFRAMES
/**
 * Fill a DataSetMessage with a delta frame, i.e. only those fields of the
 * connected PublishedDataSet whose value differs from the writer's last
 * stored sample. The lastSamples store is updated with every changed value.
 *
 * Status code and timestamp members of each value are switched off unless
 * the corresponding bit is set in the writer's dataSetFieldContentMask.
 *
 * @return BADNOTFOUND if the connected data set does not exist,
 *         BADOUTOFMEMORY if the delta field array could not be allocated,
 *         else GOOD (also when no field changed; the message then carries
 *         zero delta fields)
 */
static UA_StatusCode
UA_PubSubDataSetWriter_generateDeltaFrameMessage(UA_Server *server,
                                                 UA_DataSetMessage *dataSetMessage,
                                                 UA_DataSetWriter *dataSetWriter) {
    UA_PublishedDataSet *currentDataSet =
        UA_PublishedDataSet_findPDSbyId(server, dataSetWriter->connectedDataSet);
    if(!currentDataSet)
        return UA_STATUSCODE_BADNOTFOUND;

    /* Prepare DataSetMessageContent */
    memset(dataSetMessage, 0, sizeof(UA_DataSetMessage));
    dataSetMessage->header.dataSetMessageValid = true;
    dataSetMessage->header.dataSetMessageType = UA_DATASETMESSAGE_DATADELTAFRAME;
    if(currentDataSet->fieldSize == 0) {
        return UA_STATUSCODE_GOOD;
    }

    /* First pass: sample every field and mark the ones that changed */
    UA_DataSetField *dsf;
    size_t counter = 0;
    TAILQ_FOREACH(dsf, &currentDataSet->fields, listEntry) {
        /* Sample the value */
        UA_DataValue value;
        UA_DataValue_init(&value);
        UA_PubSubDataSetField_sampleValue(server, dsf, &value);

        /* Check if the value has changed */
        if(valueChangedVariant(&dataSetWriter->lastSamples[counter].value.value, &value.value)) {
            /* increase fieldCount for current delta message */
            dataSetMessage->data.deltaFrameData.fieldCount++;
            dataSetWriter->lastSamples[counter].valueChanged = true;

            /* Update last stored sample; the store takes over the new value */
            UA_DataValue_clear(&dataSetWriter->lastSamples[counter].value);
            dataSetWriter->lastSamples[counter].value = value;
        } else {
            UA_DataValue_clear(&value);
            dataSetWriter->lastSamples[counter].valueChanged = false;
        }
        counter++;
    }

    /* Nothing changed: calloc(0, ...) may legitimately return NULL, which
     * must not be reported as an out-of-memory error. The zeroed message
     * already describes an empty delta frame. */
    if(dataSetMessage->data.deltaFrameData.fieldCount == 0)
        return UA_STATUSCODE_GOOD;

    /* Allocate DeltaFrameFields */
    UA_DataSetMessage_DeltaFrameField *deltaFields = (UA_DataSetMessage_DeltaFrameField *)
        UA_calloc(dataSetMessage->data.deltaFrameData.fieldCount,
                  sizeof(UA_DataSetMessage_DeltaFrameField));
    if(!deltaFields)
        return UA_STATUSCODE_BADOUTOFMEMORY;
    dataSetMessage->data.deltaFrameData.deltaFrameFields = deltaFields;

    /* Second pass: copy the changed samples into the delta fields */
    size_t currentDeltaField = 0;
    for(size_t i = 0; i < currentDataSet->fieldSize; i++) {
        if(!dataSetWriter->lastSamples[i].valueChanged)
            continue;

        UA_DataSetMessage_DeltaFrameField *dff = &deltaFields[currentDeltaField];
        dff->fieldIndex = (UA_UInt16) i;
        UA_DataValue_copy(&dataSetWriter->lastSamples[i].value, &dff->fieldValue);
        dataSetWriter->lastSamples[i].valueChanged = false;

        /* Deactivate statuscode? */
        if(((u64)dataSetWriter->config.dataSetFieldContentMask &
            (u64)UA_DATASETFIELDCONTENTMASK_STATUSCODE) == 0)
            dff->fieldValue.hasStatus = false;

        /* Deactivate timestamps? */
        if(((u64)dataSetWriter->config.dataSetFieldContentMask &
            (u64)UA_DATASETFIELDCONTENTMASK_SOURCETIMESTAMP) == 0)
            dff->fieldValue.hasSourceTimestamp = false;
        /* The SOURCE picoseconds bit controls the SOURCE picoseconds member,
         * as in the key frame path (it used to clear the server one). */
        if(((u64)dataSetWriter->config.dataSetFieldContentMask &
            (u64)UA_DATASETFIELDCONTENTMASK_SOURCEPICOSECONDS) == 0)
            dff->fieldValue.hasSourcePicoseconds = false;
        if(((u64)dataSetWriter->config.dataSetFieldContentMask &
            (u64)UA_DATASETFIELDCONTENTMASK_SERVERTIMESTAMP) == 0)
            dff->fieldValue.hasServerTimestamp = false;
        if(((u64)dataSetWriter->config.dataSetFieldContentMask &
            (u64)UA_DATASETFIELDCONTENTMASK_SERVERPICOSECONDS) == 0)
            dff->fieldValue.hasServerPicoseconds = false;

        currentDeltaField++;
    }
    return UA_STATUSCODE_GOOD;
}
#endif
- /**
- * Generate a DataSetMessage for the given writer.
- *
- * @param dataSetWriter ptr to corresponding writer
- * @return ptr to generated DataSetMessage
- */
- static UA_StatusCode
- UA_DataSetWriter_generateDataSetMessage(UA_Server *server, UA_DataSetMessage *dataSetMessage,
- UA_DataSetWriter *dataSetWriter) {
- UA_PublishedDataSet *currentDataSet =
- UA_PublishedDataSet_findPDSbyId(server, dataSetWriter->connectedDataSet);
- if(!currentDataSet)
- return UA_STATUSCODE_BADNOTFOUND;
- /* Reset the message */
- memset(dataSetMessage, 0, sizeof(UA_DataSetMessage));
- /* store messageType to switch between json or uadp (default) */
- UA_UInt16 messageType = UA_TYPES_UADPDATASETWRITERMESSAGEDATATYPE;
- UA_JsonDataSetWriterMessageDataType *jsonDataSetWriterMessageDataType = NULL;
- /* The configuration Flags are included
- * inside the std. defined UA_UadpDataSetWriterMessageDataType */
- UA_UadpDataSetWriterMessageDataType defaultUadpConfiguration;
- UA_UadpDataSetWriterMessageDataType *dataSetWriterMessageDataType = NULL;
- if((dataSetWriter->config.messageSettings.encoding == UA_EXTENSIONOBJECT_DECODED ||
- dataSetWriter->config.messageSettings.encoding == UA_EXTENSIONOBJECT_DECODED_NODELETE) &&
- (dataSetWriter->config.messageSettings.content.decoded.type ==
- &UA_TYPES[UA_TYPES_UADPDATASETWRITERMESSAGEDATATYPE])) {
- dataSetWriterMessageDataType = (UA_UadpDataSetWriterMessageDataType *)
- dataSetWriter->config.messageSettings.content.decoded.data;
- /* type is UADP */
- messageType = UA_TYPES_UADPDATASETWRITERMESSAGEDATATYPE;
- } else if((dataSetWriter->config.messageSettings.encoding == UA_EXTENSIONOBJECT_DECODED ||
- dataSetWriter->config.messageSettings.encoding == UA_EXTENSIONOBJECT_DECODED_NODELETE) &&
- (dataSetWriter->config.messageSettings.content.decoded.type ==
- &UA_TYPES[UA_TYPES_JSONDATASETWRITERMESSAGEDATATYPE])) {
- jsonDataSetWriterMessageDataType = (UA_JsonDataSetWriterMessageDataType *)
- dataSetWriter->config.messageSettings.content.decoded.data;
- /* type is JSON */
- messageType = UA_TYPES_JSONDATASETWRITERMESSAGEDATATYPE;
- } else {
- /* create default flag configuration if no
- * UadpDataSetWriterMessageDataType was passed in */
- memset(&defaultUadpConfiguration, 0, sizeof(UA_UadpDataSetWriterMessageDataType));
- defaultUadpConfiguration.dataSetMessageContentMask = (UA_UadpDataSetMessageContentMask)
- ((u64)UA_UADPDATASETMESSAGECONTENTMASK_TIMESTAMP | (u64)UA_UADPDATASETMESSAGECONTENTMASK_MAJORVERSION |
- (u64)UA_UADPDATASETMESSAGECONTENTMASK_MINORVERSION);
- dataSetWriterMessageDataType = &defaultUadpConfiguration;
- }
- /* Sanity-test the configuration */
- if(dataSetWriterMessageDataType &&
- (dataSetWriterMessageDataType->networkMessageNumber != 0 ||
- dataSetWriterMessageDataType->dataSetOffset != 0 ||
- dataSetWriterMessageDataType->configuredSize != 0)) {
- UA_LOG_WARNING(&server->config.logger, UA_LOGCATEGORY_SERVER,
- "Static DSM configuration not supported. Using defaults");
- dataSetWriterMessageDataType->networkMessageNumber = 0;
- dataSetWriterMessageDataType->dataSetOffset = 0;
- dataSetWriterMessageDataType->configuredSize = 0;
- }
- /* The field encoding depends on the flags inside the writer config.
- * TODO: This can be moved to the encoding layer. */
- if(dataSetWriter->config.dataSetFieldContentMask & (u64)UA_DATASETFIELDCONTENTMASK_RAWDATA
- ) {
- dataSetMessage->header.fieldEncoding = UA_FIELDENCODING_RAWDATA;
- } else if((u64)dataSetWriter->config.dataSetFieldContentMask &
- ((u64)UA_DATASETFIELDCONTENTMASK_SOURCETIMESTAMP | (u64)UA_DATASETFIELDCONTENTMASK_SERVERPICOSECONDS |
- (u64)UA_DATASETFIELDCONTENTMASK_SOURCEPICOSECONDS | (u64)UA_DATASETFIELDCONTENTMASK_STATUSCODE)) {
- dataSetMessage->header.fieldEncoding = UA_FIELDENCODING_DATAVALUE;
- } else {
- dataSetMessage->header.fieldEncoding = UA_FIELDENCODING_VARIANT;
- }
- if(messageType == UA_TYPES_UADPDATASETWRITERMESSAGEDATATYPE) {
- /* Std: 'The DataSetMessageContentMask defines the flags for the content of the DataSetMessage header.' */
- if((u64)dataSetWriterMessageDataType->dataSetMessageContentMask &
- (u64)UA_UADPDATASETMESSAGECONTENTMASK_MAJORVERSION) {
- dataSetMessage->header.configVersionMajorVersionEnabled = true;
- dataSetMessage->header.configVersionMajorVersion =
- currentDataSet->dataSetMetaData.configurationVersion.majorVersion;
- }
- if((u64)dataSetWriterMessageDataType->dataSetMessageContentMask &
- (u64)UA_UADPDATASETMESSAGECONTENTMASK_MINORVERSION) {
- dataSetMessage->header.configVersionMinorVersionEnabled = true;
- dataSetMessage->header.configVersionMinorVersion =
- currentDataSet->dataSetMetaData.configurationVersion.minorVersion;
- }
- if((u64)dataSetWriterMessageDataType->dataSetMessageContentMask &
- (u64)UA_UADPDATASETMESSAGECONTENTMASK_SEQUENCENUMBER) {
- dataSetMessage->header.dataSetMessageSequenceNrEnabled = true;
- dataSetMessage->header.dataSetMessageSequenceNr =
- dataSetWriter->actualDataSetMessageSequenceCount;
- }
- if((u64)dataSetWriterMessageDataType->dataSetMessageContentMask &
- (u64)UA_UADPDATASETMESSAGECONTENTMASK_TIMESTAMP) {
- dataSetMessage->header.timestampEnabled = true;
- dataSetMessage->header.timestamp = UA_DateTime_now();
- }
- /* TODO: Picoseconds resolution not supported atm */
- if((u64)dataSetWriterMessageDataType->dataSetMessageContentMask &
- (u64)UA_UADPDATASETMESSAGECONTENTMASK_PICOSECONDS) {
- dataSetMessage->header.picoSecondsIncluded = false;
- }
- /* TODO: Statuscode not supported yet */
- if((u64)dataSetWriterMessageDataType->dataSetMessageContentMask &
- (u64)UA_UADPDATASETMESSAGECONTENTMASK_STATUS) {
- dataSetMessage->header.statusEnabled = false;
- }
- } else if(messageType == UA_TYPES_JSONDATASETWRITERMESSAGEDATATYPE) {
- if((u64)jsonDataSetWriterMessageDataType->dataSetMessageContentMask &
- (u64)UA_JSONDATASETMESSAGECONTENTMASK_METADATAVERSION) {
- dataSetMessage->header.configVersionMajorVersionEnabled = true;
- dataSetMessage->header.configVersionMajorVersion =
- currentDataSet->dataSetMetaData.configurationVersion.majorVersion;
- }
- if((u64)jsonDataSetWriterMessageDataType->dataSetMessageContentMask &
- (u64)UA_JSONDATASETMESSAGECONTENTMASK_METADATAVERSION) {
- dataSetMessage->header.configVersionMinorVersionEnabled = true;
- dataSetMessage->header.configVersionMinorVersion =
- currentDataSet->dataSetMetaData.configurationVersion.minorVersion;
- }
- if((u64)jsonDataSetWriterMessageDataType->dataSetMessageContentMask &
- (u64)UA_JSONDATASETMESSAGECONTENTMASK_SEQUENCENUMBER) {
- dataSetMessage->header.dataSetMessageSequenceNrEnabled = true;
- dataSetMessage->header.dataSetMessageSequenceNr =
- dataSetWriter->actualDataSetMessageSequenceCount;
- }
- if((u64)jsonDataSetWriterMessageDataType->dataSetMessageContentMask &
- (u64)UA_JSONDATASETMESSAGECONTENTMASK_TIMESTAMP) {
- dataSetMessage->header.timestampEnabled = true;
- dataSetMessage->header.timestamp = UA_DateTime_now();
- }
- /* TODO: Statuscode not supported yet */
- if((u64)jsonDataSetWriterMessageDataType->dataSetMessageContentMask &
- (u64)UA_JSONDATASETMESSAGECONTENTMASK_STATUS) {
- dataSetMessage->header.statusEnabled = false;
- }
- }
- /* Set the sequence count. Automatically rolls over to zero */
- dataSetWriter->actualDataSetMessageSequenceCount++;
- /* JSON does not differ between deltaframes and keyframes, only keyframes are currently used. */
- if(messageType != UA_TYPES_JSONDATASETWRITERMESSAGEDATATYPE){
- #ifdef UA_ENABLE_PUBSUB_DELTAFRAMES
- /* Check if the PublishedDataSet version has changed -> if yes flush the lastValue store and send a KeyFrame */
- if(dataSetWriter->connectedDataSetVersion.majorVersion != currentDataSet->dataSetMetaData.configurationVersion.majorVersion ||
- dataSetWriter->connectedDataSetVersion.minorVersion != currentDataSet->dataSetMetaData.configurationVersion.minorVersion) {
- /* Remove old samples */
- for(size_t i = 0; i < dataSetWriter->lastSamplesCount; i++)
- UA_DataValue_clear(&dataSetWriter->lastSamples[i].value);
- /* Realloc pds dependent memory */
- dataSetWriter->lastSamplesCount = currentDataSet->fieldSize;
- UA_DataSetWriterSample *newSamplesArray = (UA_DataSetWriterSample * )
- UA_realloc(dataSetWriter->lastSamples, sizeof(UA_DataSetWriterSample) * dataSetWriter->lastSamplesCount);
- if(!newSamplesArray)
- return UA_STATUSCODE_BADOUTOFMEMORY;
- dataSetWriter->lastSamples = newSamplesArray;
- memset(dataSetWriter->lastSamples, 0, sizeof(UA_DataSetWriterSample) * dataSetWriter->lastSamplesCount);
- dataSetWriter->connectedDataSetVersion = currentDataSet->dataSetMetaData.configurationVersion;
- UA_PubSubDataSetWriter_generateKeyFrameMessage(server, dataSetMessage, dataSetWriter);
- dataSetWriter->deltaFrameCounter = 0;
- return UA_STATUSCODE_GOOD;
- }
- /* The standard defines: if a PDS contains only one fields no delta messages
- * should be generated because they need more memory than a keyframe with 1
- * field. */
- if(currentDataSet->fieldSize > 1 && dataSetWriter->deltaFrameCounter > 0 &&
- dataSetWriter->deltaFrameCounter <= dataSetWriter->config.keyFrameCount) {
- UA_PubSubDataSetWriter_generateDeltaFrameMessage(server, dataSetMessage, dataSetWriter);
- dataSetWriter->deltaFrameCounter++;
- return UA_STATUSCODE_GOOD;
- }
- dataSetWriter->deltaFrameCounter = 1;
- #endif
- }
- UA_PubSubDataSetWriter_generateKeyFrameMessage(server, dataSetMessage, dataSetWriter);
- return UA_STATUSCODE_GOOD;
- }
- static UA_StatusCode
- sendNetworkMessageJson(UA_PubSubConnection *connection, UA_DataSetMessage *dsm,
- UA_UInt16 *writerIds, UA_Byte dsmCount, UA_ExtensionObject *transportSettings) {
- UA_StatusCode retval = UA_STATUSCODE_BADNOTSUPPORTED;
- #ifdef UA_ENABLE_JSON_ENCODING
- UA_NetworkMessage nm;
- memset(&nm, 0, sizeof(UA_NetworkMessage));
- nm.version = 1;
- nm.networkMessageType = UA_NETWORKMESSAGE_DATASET;
- nm.payloadHeaderEnabled = true;
- nm.payloadHeader.dataSetPayloadHeader.count = dsmCount;
- nm.payloadHeader.dataSetPayloadHeader.dataSetWriterIds = writerIds;
- nm.payload.dataSetPayload.dataSetMessages = dsm;
- /* Allocate the buffer. Allocate on the stack if the buffer is small. */
- UA_ByteString buf;
- size_t msgSize = UA_NetworkMessage_calcSizeJson(&nm, NULL, 0, NULL, 0, true);
- size_t stackSize = 1;
- if(msgSize <= UA_MAX_STACKBUF)
- stackSize = msgSize;
- UA_STACKARRAY(UA_Byte, stackBuf, stackSize);
- buf.data = stackBuf;
- buf.length = msgSize;
- if(msgSize > UA_MAX_STACKBUF) {
- retval = UA_ByteString_allocBuffer(&buf, msgSize);
- if(retval != UA_STATUSCODE_GOOD)
- return retval;
- }
- /* Encode the message */
- UA_Byte *bufPos = buf.data;
- memset(bufPos, 0, msgSize);
- const UA_Byte *bufEnd = &buf.data[buf.length];
- retval = UA_NetworkMessage_encodeJson(&nm, &bufPos, &bufEnd, NULL, 0, NULL, 0, true);
- if(retval != UA_STATUSCODE_GOOD) {
- if(msgSize > UA_MAX_STACKBUF)
- UA_ByteString_clear(&buf);
- return retval;
- }
- /* Send the prepared messages */
- retval = connection->channel->send(connection->channel, transportSettings, &buf);
- if(msgSize > UA_MAX_STACKBUF)
- UA_ByteString_clear(&buf);
- #endif
- return retval;
- }
- static UA_StatusCode
- generateNetworkMessage(UA_PubSubConnection *connection, UA_WriterGroup *wg,
- UA_DataSetMessage *dsm, UA_UInt16 *writerIds, UA_Byte dsmCount,
- UA_ExtensionObject *messageSettings,
- UA_ExtensionObject *transportSettings,
- UA_NetworkMessage *networkMessage) {
- if(messageSettings->content.decoded.type !=
- &UA_TYPES[UA_TYPES_UADPWRITERGROUPMESSAGEDATATYPE])
- return UA_STATUSCODE_BADINTERNALERROR;
- UA_UadpWriterGroupMessageDataType *wgm = (UA_UadpWriterGroupMessageDataType*)
- messageSettings->content.decoded.data;
- networkMessage->publisherIdEnabled =
- ((u64)wgm->networkMessageContentMask & (u64)UA_UADPNETWORKMESSAGECONTENTMASK_PUBLISHERID) != 0;
- networkMessage->groupHeaderEnabled =
- ((u64)wgm->networkMessageContentMask & (u64)UA_UADPNETWORKMESSAGECONTENTMASK_GROUPHEADER) != 0;
- networkMessage->groupHeader.writerGroupIdEnabled =
- ((u64)wgm->networkMessageContentMask & (u64)UA_UADPNETWORKMESSAGECONTENTMASK_WRITERGROUPID) != 0;
- networkMessage->groupHeader.groupVersionEnabled =
- ((u64)wgm->networkMessageContentMask & (u64)UA_UADPNETWORKMESSAGECONTENTMASK_GROUPVERSION) != 0;
- networkMessage->groupHeader.networkMessageNumberEnabled =
- ((u64)wgm->networkMessageContentMask & (u64)UA_UADPNETWORKMESSAGECONTENTMASK_NETWORKMESSAGENUMBER) != 0;
- networkMessage->groupHeader.sequenceNumberEnabled =
- ((u64)wgm->networkMessageContentMask & (u64)UA_UADPNETWORKMESSAGECONTENTMASK_SEQUENCENUMBER) != 0;
- networkMessage->payloadHeaderEnabled =
- ((u64)wgm->networkMessageContentMask & (u64)UA_UADPNETWORKMESSAGECONTENTMASK_PAYLOADHEADER) != 0;
- networkMessage->timestampEnabled =
- ((u64)wgm->networkMessageContentMask & (u64)UA_UADPNETWORKMESSAGECONTENTMASK_TIMESTAMP) != 0;
- networkMessage->picosecondsEnabled =
- ((u64)wgm->networkMessageContentMask & (u64)UA_UADPNETWORKMESSAGECONTENTMASK_PICOSECONDS) != 0;
- networkMessage->dataSetClassIdEnabled =
- ((u64)wgm->networkMessageContentMask & (u64)UA_UADPNETWORKMESSAGECONTENTMASK_DATASETCLASSID) != 0;
- networkMessage->promotedFieldsEnabled =
- ((u64)wgm->networkMessageContentMask & (u64)UA_UADPNETWORKMESSAGECONTENTMASK_PROMOTEDFIELDS) != 0;
- networkMessage->version = 1;
- networkMessage->networkMessageType = UA_NETWORKMESSAGE_DATASET;
- if(connection->config->publisherIdType == UA_PUBSUB_PUBLISHERID_NUMERIC) {
- networkMessage->publisherIdType = UA_PUBLISHERDATATYPE_UINT16;
- networkMessage->publisherId.publisherIdUInt32 = connection->config->publisherId.numeric;
- } else if(connection->config->publisherIdType == UA_PUBSUB_PUBLISHERID_STRING){
- networkMessage->publisherIdType = UA_PUBLISHERDATATYPE_STRING;
- networkMessage->publisherId.publisherIdString = connection->config->publisherId.string;
- }
- if(networkMessage->groupHeader.sequenceNumberEnabled)
- networkMessage->groupHeader.sequenceNumber = wg->sequenceNumber;
- /* Compute the length of the dsm separately for the header */
- UA_STACKARRAY(UA_UInt16, dsmLengths, dsmCount);
- for(UA_Byte i = 0; i < dsmCount; i++)
- dsmLengths[i] = (UA_UInt16) UA_DataSetMessage_calcSizeBinary(&dsm[i], NULL, 0);
- networkMessage->payloadHeader.dataSetPayloadHeader.count = dsmCount;
- networkMessage->payloadHeader.dataSetPayloadHeader.dataSetWriterIds = writerIds;
- networkMessage->groupHeader.writerGroupId = wg->config.writerGroupId;
- /* number of the NetworkMessage inside a PublishingInterval */
- networkMessage->groupHeader.networkMessageNumber = 1;
- networkMessage->payload.dataSetPayload.sizes = dsmLengths;
- networkMessage->payload.dataSetPayload.dataSetMessages = dsm;
- return UA_STATUSCODE_GOOD;
- }
- static UA_StatusCode
- sendBufferedNetworkMessage(UA_Server *server, UA_PubSubConnection *connection,
- UA_NetworkMessageOffsetBuffer *buffer, UA_ExtensionObject *transportSettings) {
- if(UA_NetworkMessage_updateBufferedMessage(buffer) != UA_STATUSCODE_GOOD)
- UA_LOG_DEBUG(&server->config.logger, UA_LOGCATEGORY_SERVER, "PubSub sending. Unknown field type.");
- UA_StatusCode retval = connection->channel->send(connection->channel, transportSettings, &buffer->buffer);
- return retval;
- }
- static UA_StatusCode
- sendNetworkMessage(UA_PubSubConnection *connection, UA_WriterGroup *wg,
- UA_DataSetMessage *dsm, UA_UInt16 *writerIds, UA_Byte dsmCount,
- UA_ExtensionObject *messageSettings,
- UA_ExtensionObject *transportSettings) {
- UA_NetworkMessage nm;
- memset(&nm, 0, sizeof(UA_NetworkMessage));
- generateNetworkMessage(connection, wg, dsm, writerIds, dsmCount, messageSettings, transportSettings, &nm);
- /* Allocate the buffer. Allocate on the stack if the buffer is small. */
- UA_ByteString buf;
- size_t msgSize = UA_NetworkMessage_calcSizeBinary(&nm, NULL);
- size_t stackSize = 1;
- if(msgSize <= UA_MAX_STACKBUF)
- stackSize = msgSize;
- UA_STACKARRAY(UA_Byte, stackBuf, stackSize);
- buf.data = stackBuf;
- buf.length = msgSize;
- UA_StatusCode retval;
- if(msgSize > UA_MAX_STACKBUF) {
- retval = UA_ByteString_allocBuffer(&buf, msgSize);
- if(retval != UA_STATUSCODE_GOOD)
- return retval;
- }
- /* Encode the message */
- UA_Byte *bufPos = buf.data;
- memset(bufPos, 0, msgSize);
- const UA_Byte *bufEnd = &buf.data[buf.length];
- retval = UA_NetworkMessage_encodeBinary(&nm, &bufPos, bufEnd);
- if(retval != UA_STATUSCODE_GOOD) {
- if(msgSize > UA_MAX_STACKBUF)
- UA_ByteString_clear(&buf);
- return retval;
- }
- /* Send the prepared messages */
- retval = connection->channel->send(connection->channel, transportSettings, &buf);
- if(msgSize > UA_MAX_STACKBUF)
- UA_ByteString_clear(&buf);
- return retval;
- }
- /* This callback triggers the collection and publish of NetworkMessages and the
- * contained DataSetMessages. */
- void
- UA_WriterGroup_publishCallback(UA_Server *server, UA_WriterGroup *writerGroup) {
- UA_LOG_DEBUG(&server->config.logger, UA_LOGCATEGORY_SERVER, "Publish Callback");
- if(!writerGroup) {
- UA_LOG_WARNING(&server->config.logger, UA_LOGCATEGORY_SERVER,
- "Publish failed. WriterGroup not found");
- return;
- }
- /* Nothing to do? */
- if(writerGroup->writersCount <= 0)
- return;
- if(!writerGroup->linkedConnectionPtr){
- UA_LOG_WARNING(&server->config.logger, UA_LOGCATEGORY_SERVER,
- "Publish failed: Invalid reference to PubSubConnection");
- return;
- }
- /* Binary or Json encoding? */
- if(writerGroup->config.encodingMimeType != UA_PUBSUB_ENCODING_UADP &&
- writerGroup->config.encodingMimeType != UA_PUBSUB_ENCODING_JSON) {
- UA_LOG_WARNING(&server->config.logger, UA_LOGCATEGORY_SERVER,
- "Publish failed: Unknown encoding type.");
- return;
- }
- /* Find the connection associated with the writer */
- UA_PubSubConnection *connection = writerGroup->linkedConnectionPtr;
- if(!connection) {
- UA_LOG_WARNING(&server->config.logger, UA_LOGCATEGORY_SERVER,
- "Publish failed. PubSubConnection invalid.");
- return;
- }
- if(writerGroup->config.rtLevel == UA_PUBSUB_RT_FIXED_SIZE) {
- sendBufferedNetworkMessage(server, connection, &writerGroup->bufferedMessage, &writerGroup->config.transportSettings);
- writerGroup->sequenceNumber++;
- return;
- }
- /* How many DSM can be sent in one NM? */
- UA_Byte maxDSM = (UA_Byte)writerGroup->config.maxEncapsulatedDataSetMessageCount;
- if(writerGroup->config.maxEncapsulatedDataSetMessageCount > UA_BYTE_MAX)
- maxDSM = UA_BYTE_MAX;
- /* If the maxEncapsulatedDataSetMessageCount is set to 0->1 */
- if(maxDSM == 0)
- maxDSM = 1;
- /* It is possible to put several DataSetMessages into one NetworkMessage.
- * But only if they do not contain promoted fields. NM with only DSM are
- * sent out right away. The others are kept in a buffer for "batching". */
- size_t dsmCount = 0;
- UA_DataSetWriter *dsw;
- UA_STACKARRAY(UA_UInt16, dsWriterIds, writerGroup->writersCount);
- UA_STACKARRAY(UA_DataSetMessage, dsmStore, writerGroup->writersCount);
- LIST_FOREACH(dsw, &writerGroup->writers, listEntry) {
- /* Find the dataset */
- UA_PublishedDataSet *pds =
- UA_PublishedDataSet_findPDSbyId(server, dsw->connectedDataSet);
- if(!pds) {
- UA_LOG_WARNING(&server->config.logger, UA_LOGCATEGORY_SERVER,
- "PubSub Publish: PublishedDataSet not found");
- continue;
- }
- /* Generate the DSM */
- UA_StatusCode res =
- UA_DataSetWriter_generateDataSetMessage(server, &dsmStore[dsmCount], dsw);
- if(res != UA_STATUSCODE_GOOD) {
- UA_LOG_WARNING(&server->config.logger, UA_LOGCATEGORY_SERVER,
- "PubSub Publish: DataSetMessage creation failed");
- continue;
- }
- /* Send right away if there is only this DSM in a NM. If promoted fields
- * are contained in the PublishedDataSet, then this DSM must go into a
- * dedicated NM as well. */
- if(pds->promotedFieldsCount > 0 || maxDSM == 1) {
- if(writerGroup->config.encodingMimeType == UA_PUBSUB_ENCODING_UADP){
- res = sendNetworkMessage(connection, writerGroup, &dsmStore[dsmCount],
- &dsw->config.dataSetWriterId, 1,
- &writerGroup->config.messageSettings,
- &writerGroup->config.transportSettings);
- }else if(writerGroup->config.encodingMimeType == UA_PUBSUB_ENCODING_JSON){
- res = sendNetworkMessageJson(connection, &dsmStore[dsmCount],
- &dsw->config.dataSetWriterId, 1, &writerGroup->config.transportSettings);
- }
- if(res != UA_STATUSCODE_GOOD)
- UA_LOG_WARNING(&server->config.logger, UA_LOGCATEGORY_SERVER,
- "PubSub Publish: Could not send a NetworkMessage");
- if(writerGroup->config.rtLevel == UA_PUBSUB_RT_DIRECT_VALUE_ACCESS){
- for (size_t i = 0; i < dsmStore[dsmCount].data.keyFrameData.fieldCount; ++i) {
- dsmStore[dsmCount].data.keyFrameData.dataSetFields[i].value.data = NULL;
- }
- }
- UA_DataSetMessage_free(&dsmStore[dsmCount]);
- continue;
- }
- dsWriterIds[dsmCount] = dsw->config.dataSetWriterId;
- dsmCount++;
- }
- /* Send the NetworkMessages with batched DataSetMessages */
- size_t nmCount = (dsmCount / maxDSM) + ((dsmCount % maxDSM) == 0 ? 0 : 1);
- for(UA_UInt32 i = 0; i < nmCount; i++) {
- UA_Byte nmDsmCount = maxDSM;
- if(i == nmCount - 1 && (dsmCount % maxDSM))
- nmDsmCount = (UA_Byte)dsmCount % maxDSM;
- UA_StatusCode res3 = UA_STATUSCODE_GOOD;
- if(writerGroup->config.encodingMimeType == UA_PUBSUB_ENCODING_UADP){
- res3 = sendNetworkMessage(connection, writerGroup, &dsmStore[i * maxDSM],
- &dsWriterIds[i * maxDSM], nmDsmCount,
- &writerGroup->config.messageSettings,
- &writerGroup->config.transportSettings);
- writerGroup->sequenceNumber++;
- }else if(writerGroup->config.encodingMimeType == UA_PUBSUB_ENCODING_JSON){
- res3 = sendNetworkMessageJson(connection, &dsmStore[i * maxDSM],
- &dsWriterIds[i * maxDSM], nmDsmCount, &writerGroup->config.transportSettings);
- writerGroup->sequenceNumber++;
- }
- if(res3 != UA_STATUSCODE_GOOD)
- UA_LOG_WARNING(&server->config.logger, UA_LOGCATEGORY_SERVER,
- "PubSub Publish: Sending a NetworkMessage failed");
- }
- /* Clean up DSM */
- for(size_t i = 0; i < dsmCount; i++)
- UA_DataSetMessage_free(&dsmStore[i]);
- }
- /* Add new publishCallback. The first execution is triggered directly after
- * creation. */
- UA_StatusCode
- UA_WriterGroup_addPublishCallback(UA_Server *server, UA_WriterGroup *writerGroup) {
- UA_StatusCode retval =
- UA_PubSubManager_addRepeatedCallback(server,
- (UA_ServerCallback) UA_WriterGroup_publishCallback,
- writerGroup, writerGroup->config.publishingInterval,
- &writerGroup->publishCallbackId);
- if(retval == UA_STATUSCODE_GOOD)
- writerGroup->publishCallbackIsRegistered = true;
- /* Run once after creation */
- UA_WriterGroup_publishCallback(server, writerGroup);
- return retval;
- }
- #endif /* UA_ENABLE_PUBSUB */
|