|
- /* This Source Code Form is subject to the terms of the Mozilla Public
- * License, v. 2.0. If a copy of the MPL was not distributed with this
- * file, You can obtain one at http://mozilla.org/MPL/2.0/.
- *
- * Copyright (c) 2017-2018 Fraunhofer IOSB (Author: Andreas Ebner)
- * Copyright (c) 2019 Fraunhofer IOSB (Author: Julius Pfrommer)
- * Copyright (c) 2019 Kalycito Infotech Private Limited
- */
- #include "server/ua_server_internal.h"
- #ifdef UA_ENABLE_PUBSUB /* conditional compilation */
- #include "ua_pubsub.h"
- #ifdef UA_ENABLE_PUBSUB_INFORMATIONMODEL
- #include "ua_pubsub_ns0.h"
- #endif
- #ifdef UA_ENABLE_PUBSUB_DELTAFRAMES
- #include "ua_types_encoding_binary.h"
- #endif
- #define UA_MAX_STACKBUF 512 /* Max size of network messages on the stack */
- #define UA_MAX_SIZENAME 64 /* Max size of Qualified Name of Subscribed Variable */
- /* Forward declaration */
- static void
- UA_WriterGroup_deleteMembers(UA_Server *server, UA_WriterGroup *writerGroup);
- static void
- UA_DataSetField_deleteMembers(UA_DataSetField *field);
- /**********************************************/
- /* Connection */
- /**********************************************/
- UA_StatusCode
- UA_PubSubConnectionConfig_copy(const UA_PubSubConnectionConfig *src,
- UA_PubSubConnectionConfig *dst) {
- UA_StatusCode retVal = UA_STATUSCODE_GOOD;
- memcpy(dst, src, sizeof(UA_PubSubConnectionConfig));
- retVal |= UA_String_copy(&src->name, &dst->name);
- retVal |= UA_Variant_copy(&src->address, &dst->address);
- retVal |= UA_String_copy(&src->transportProfileUri, &dst->transportProfileUri);
- retVal |= UA_Variant_copy(&src->connectionTransportSettings, &dst->connectionTransportSettings);
- if(src->connectionPropertiesSize > 0){
- dst->connectionProperties = (UA_KeyValuePair *)
- UA_calloc(src->connectionPropertiesSize, sizeof(UA_KeyValuePair));
- if(!dst->connectionProperties){
- return UA_STATUSCODE_BADOUTOFMEMORY;
- }
- for(size_t i = 0; i < src->connectionPropertiesSize; i++){
- retVal |= UA_QualifiedName_copy(&src->connectionProperties[i].key,
- &dst->connectionProperties[i].key);
- retVal |= UA_Variant_copy(&src->connectionProperties[i].value,
- &dst->connectionProperties[i].value);
- }
- }
- return retVal;
- }
- UA_StatusCode
- UA_Server_getPubSubConnectionConfig(UA_Server *server, const UA_NodeId connection,
- UA_PubSubConnectionConfig *config) {
- if(!config)
- return UA_STATUSCODE_BADINVALIDARGUMENT;
- UA_PubSubConnection *currentPubSubConnection =
- UA_PubSubConnection_findConnectionbyId(server, connection);
- if(!currentPubSubConnection)
- return UA_STATUSCODE_BADNOTFOUND;
- UA_PubSubConnectionConfig tmpPubSubConnectionConfig;
- //deep copy of the actual config
- UA_PubSubConnectionConfig_copy(currentPubSubConnection->config, &tmpPubSubConnectionConfig);
- *config = tmpPubSubConnectionConfig;
- return UA_STATUSCODE_GOOD;
- }
- UA_PubSubConnection *
- UA_PubSubConnection_findConnectionbyId(UA_Server *server, UA_NodeId connectionIdentifier) {
- for(size_t i = 0; i < server->pubSubManager.connectionsSize; i++){
- if(UA_NodeId_equal(&connectionIdentifier, &server->pubSubManager.connections[i].identifier)){
- return &server->pubSubManager.connections[i];
- }
- }
- return NULL;
- }
- void
- UA_PubSubConnectionConfig_deleteMembers(UA_PubSubConnectionConfig *connectionConfig) {
- UA_String_deleteMembers(&connectionConfig->name);
- UA_String_deleteMembers(&connectionConfig->transportProfileUri);
- UA_Variant_deleteMembers(&connectionConfig->connectionTransportSettings);
- UA_Variant_deleteMembers(&connectionConfig->address);
- for(size_t i = 0; i < connectionConfig->connectionPropertiesSize; i++){
- UA_QualifiedName_deleteMembers(&connectionConfig->connectionProperties[i].key);
- UA_Variant_deleteMembers(&connectionConfig->connectionProperties[i].value);
- }
- UA_free(connectionConfig->connectionProperties);
- }
- void
- UA_PubSubConnection_deleteMembers(UA_Server *server, UA_PubSubConnection *connection) {
- //delete connection config
- UA_PubSubConnectionConfig_deleteMembers(connection->config);
- //remove contained WriterGroups
- UA_WriterGroup *writerGroup, *tmpWriterGroup;
- LIST_FOREACH_SAFE(writerGroup, &connection->writerGroups, listEntry, tmpWriterGroup){
- UA_Server_removeWriterGroup(server, writerGroup->identifier);
- }
- /* remove contained ReaderGroups */
- UA_ReaderGroup *readerGroups, *tmpReaderGroup;
- LIST_FOREACH_SAFE(readerGroups, &connection->readerGroups, listEntry, tmpReaderGroup){
- UA_Server_removeReaderGroup(server, readerGroups->identifier);
- }
- UA_NodeId_deleteMembers(&connection->identifier);
- if(connection->channel){
- connection->channel->close(connection->channel);
- }
- UA_free(connection->config);
- }
- UA_StatusCode
- UA_Server_addWriterGroup(UA_Server *server, const UA_NodeId connection,
- const UA_WriterGroupConfig *writerGroupConfig,
- UA_NodeId *writerGroupIdentifier) {
- UA_StatusCode retVal = UA_STATUSCODE_GOOD;
- if(!writerGroupConfig)
- return UA_STATUSCODE_BADINVALIDARGUMENT;
- //search the connection by the given connectionIdentifier
- UA_PubSubConnection *currentConnectionContext =
- UA_PubSubConnection_findConnectionbyId(server, connection);
- if(!currentConnectionContext)
- return UA_STATUSCODE_BADNOTFOUND;
- //allocate memory for new WriterGroup
- UA_WriterGroup *newWriterGroup = (UA_WriterGroup *) UA_calloc(1, sizeof(UA_WriterGroup));
- if(!newWriterGroup)
- return UA_STATUSCODE_BADOUTOFMEMORY;
- newWriterGroup->linkedConnection = currentConnectionContext->identifier;
- UA_PubSubManager_generateUniqueNodeId(server, &newWriterGroup->identifier);
- if(writerGroupIdentifier){
- UA_NodeId_copy(&newWriterGroup->identifier, writerGroupIdentifier);
- }
- //deep copy of the config
- UA_WriterGroupConfig tmpWriterGroupConfig;
- retVal |= UA_WriterGroupConfig_copy(writerGroupConfig, &tmpWriterGroupConfig);
- if(!tmpWriterGroupConfig.messageSettings.content.decoded.type) {
- UA_UadpWriterGroupMessageDataType *wgm = UA_UadpWriterGroupMessageDataType_new();
- tmpWriterGroupConfig.messageSettings.content.decoded.data = wgm;
- tmpWriterGroupConfig.messageSettings.content.decoded.type =
- &UA_TYPES[UA_TYPES_UADPWRITERGROUPMESSAGEDATATYPE];
- tmpWriterGroupConfig.messageSettings.encoding = UA_EXTENSIONOBJECT_DECODED;
- }
- newWriterGroup->config = tmpWriterGroupConfig;
- retVal |= UA_WriterGroup_addPublishCallback(server, newWriterGroup);
- LIST_INSERT_HEAD(¤tConnectionContext->writerGroups, newWriterGroup, listEntry);
- #ifdef UA_ENABLE_PUBSUB_INFORMATIONMODEL
- addWriterGroupRepresentation(server, newWriterGroup);
- #endif
- return retVal;
- }
- UA_StatusCode
- UA_Server_removeWriterGroup(UA_Server *server, const UA_NodeId writerGroup){
- UA_WriterGroup *wg = UA_WriterGroup_findWGbyId(server, writerGroup);
- if(!wg)
- return UA_STATUSCODE_BADNOTFOUND;
- UA_PubSubConnection *connection =
- UA_PubSubConnection_findConnectionbyId(server, wg->linkedConnection);
- if(!connection)
- return UA_STATUSCODE_BADNOTFOUND;
- //unregister the publish callback
- UA_PubSubManager_removeRepeatedPubSubCallback(server, wg->publishCallbackId);
- #ifdef UA_ENABLE_PUBSUB_INFORMATIONMODEL
- removeGroupRepresentation(server, wg);
- #endif
- UA_WriterGroup_deleteMembers(server, wg);
- LIST_REMOVE(wg, listEntry);
- UA_free(wg);
- return UA_STATUSCODE_GOOD;
- }
- /**********************************************/
- /* ReaderGroup */
- /**********************************************/
- /**
- * Add ReaderGroup to connection.
- *
- * @param server
- * @param connectionIdentifier
- * @param readerGroupConfiguration
- * @param readerGroupIdentifier
- * @return UA_STATUSCODE_GOOD on success
- */
- UA_StatusCode
- UA_Server_addReaderGroup(UA_Server *server, UA_NodeId connectionIdentifier,
- const UA_ReaderGroupConfig *readerGroupConfig,
- UA_NodeId *readerGroupIdentifier) {
- UA_StatusCode retval = UA_STATUSCODE_GOOD;
- UA_ReaderGroupConfig tmpReaderGroupConfig;
- /* Search the connection by the given connectionIdentifier */
- if(!readerGroupConfig) {
- return UA_STATUSCODE_BADINVALIDARGUMENT;
- }
- /* Search the connection by the given connectionIdentifier */
- UA_PubSubConnection *currentConnectionContext = UA_PubSubConnection_findConnectionbyId(server, connectionIdentifier);
- if(!currentConnectionContext) {
- return UA_STATUSCODE_BADNOTFOUND;
- }
- /* Allocate memory for new reader group */
- UA_ReaderGroup *newGroup = (UA_ReaderGroup *)UA_calloc(1, sizeof(UA_ReaderGroup));
- if(!newGroup) {
- return UA_STATUSCODE_BADOUTOFMEMORY;
- }
- /* Generate nodeid for the readergroup identifier */
- newGroup->linkedConnection = currentConnectionContext->identifier;
- UA_PubSubManager_generateUniqueNodeId(server, &newGroup->identifier);
- if(readerGroupIdentifier) {
- UA_NodeId_copy(&newGroup->identifier, readerGroupIdentifier);
- }
- /* Deep copy of the config */
- retval |= UA_ReaderGroupConfig_copy(readerGroupConfig, &tmpReaderGroupConfig);
- newGroup->config = tmpReaderGroupConfig;
- retval |= UA_ReaderGroup_addSubscribeCallback(server, newGroup);
- LIST_INSERT_HEAD(¤tConnectionContext->readerGroups, newGroup, listEntry);
- currentConnectionContext->readerGroupsSize++;
- #ifdef UA_ENABLE_PUBSUB_INFORMATIONMODEL
- addReaderGroupRepresentation(server, newGroup);
- #endif
- return retval;
- }
- /**
- * Remove ReaderGroup from connection and delete contained readers.
- *
- * @param server
- * @param groupIdentifier
- * @return UA_STATUSCODE_GOOD on success
- */
- UA_StatusCode
- UA_Server_removeReaderGroup(UA_Server *server, UA_NodeId groupIdentifier) {
- UA_ReaderGroup* readerGroup = UA_ReaderGroup_findRGbyId(server, groupIdentifier);
- if(readerGroup == NULL) {
- return UA_STATUSCODE_BADNOTFOUND;
- }
- /* Search the connection to which the given readergroup is connected to */
- UA_PubSubConnection *connection = UA_PubSubConnection_findConnectionbyId(server, readerGroup->linkedConnection);
- if(connection == NULL) {
- return UA_STATUSCODE_BADNOTFOUND;
- }
- /* Unregister subscribe callback */
- UA_PubSubManager_removeRepeatedPubSubCallback(server, readerGroup->subscribeCallbackId);
- #ifdef UA_ENABLE_PUBSUB_INFORMATIONMODEL
- /* To Do:RemoveGroupRepresentation(server, &readerGroup->identifier) */
- #endif
- /* UA_Server_ReaderGroup_delete also removes itself from the list */
- UA_Server_ReaderGroup_delete(server, readerGroup);
- /* Remove readerGroup from Connection */
- LIST_REMOVE(readerGroup, listEntry);
- UA_free(readerGroup);
- return UA_STATUSCODE_GOOD;
- }
- /**
- * To Do:
- * Update ReaderGroup configuration.
- *
- * @param server
- * @param readerGroupIdentifier
- * @param readerGroupConfiguration
- * @return UA_STATUSCODE_GOOD on success
- */
- UA_StatusCode
- UA_Server_ReaderGroup_updateConfig(UA_Server *server, UA_NodeId readerGroupIdentifier,
- const UA_ReaderGroupConfig *config) {
- return UA_STATUSCODE_BADNOTIMPLEMENTED;
- }
- /**
- * Get ReaderGroup configuration.
- *
- * @param server
- * @param groupIdentifier
- * @param readerGroupConfiguration
- * @return UA_STATUSCODE_GOOD on success
- */
- UA_StatusCode
- UA_Server_ReaderGroup_getConfig(UA_Server *server, UA_NodeId readerGroupIdentifier,
- UA_ReaderGroupConfig *config) {
- if(!config) {
- return UA_STATUSCODE_BADINVALIDARGUMENT;
- }
- /* Identify the readergroup through the readerGroupIdentifier */
- UA_ReaderGroup *currentReaderGroup = UA_ReaderGroup_findRGbyId(server, readerGroupIdentifier);
- if(!currentReaderGroup) {
- return UA_STATUSCODE_BADNOTFOUND;
- }
- UA_ReaderGroupConfig tmpReaderGroupConfig;
- /* deep copy of the actual config */
- UA_ReaderGroupConfig_copy(¤tReaderGroup->config, &tmpReaderGroupConfig);
- *config = tmpReaderGroupConfig;
- return UA_STATUSCODE_GOOD;
- }
- /* To Do UA_ReaderGroupConfig delete */
- /**
- * Delete ReaderGroup.
- *
- * @param server
- * @param groupIdentifier
- */
- void UA_Server_ReaderGroup_delete(UA_Server* server, UA_ReaderGroup *readerGroup) {
- /* To Do Call UA_ReaderGroupConfig_delete */
- UA_DataSetReader *dataSetReader, *tmpDataSetReader;
- LIST_FOREACH_SAFE(dataSetReader, &readerGroup->readers, listEntry, tmpDataSetReader) {
- UA_DataSetReader_delete(server, dataSetReader);
- }
- UA_PubSubConnection* pConn = UA_PubSubConnection_findConnectionbyId(server, readerGroup->linkedConnection);
- if(pConn != NULL) {
- pConn->readerGroupsSize--;
- }
- /* Delete ReaderGroup and its members */
- UA_String_deleteMembers(&readerGroup->config.name);
- UA_NodeId_deleteMembers(&readerGroup->linkedConnection);
- UA_NodeId_deleteMembers(&readerGroup->identifier);
- }
- /**
- * Copy ReaderGroup configuration.
- *
- * @param source
- * @param destination
- * @return UA_STATUSCODE_GOOD on success
- */
- UA_StatusCode
- UA_ReaderGroupConfig_copy(const UA_ReaderGroupConfig *src,
- UA_ReaderGroupConfig *dst) {
- UA_String_copy(&src->name, &dst->name);
- /* Currently simple memcpy only */
- memcpy(&dst->securityParameters, &src->securityParameters, sizeof(UA_PubSubSecurityParameters));
- return UA_STATUSCODE_GOOD;
- }
- static UA_DataSetReader *
- getReaderFromIdentifier(UA_Server *server, UA_NetworkMessage *pMsg, UA_PubSubConnection *pConnection) {
- if(pConnection->readerGroupsSize == 1) {
- if(LIST_FIRST(&pConnection->readerGroups)->readersCount == 1) {
- UA_LOG_INFO(&server->config.logger, UA_LOGCATEGORY_SERVER, "only 1 DataSetReader available. This one will be used.");
- return LIST_FIRST(&LIST_FIRST(&pConnection->readerGroups)->readers);
- }
- }
- if(!pMsg->publisherIdEnabled)
- return NULL;
- UA_ReaderGroup* readerGroup;
- LIST_FOREACH(readerGroup, &pConnection->readerGroups, listEntry) {
- UA_DataSetReader *tmpReader;
- LIST_FOREACH(tmpReader, &readerGroup->readers, listEntry) {
- switch (pMsg->publisherIdType) {
- case UA_PUBLISHERDATATYPE_BYTE:
- if(tmpReader->config.publisherId.type == &UA_TYPES[UA_TYPES_BYTE] &&
- pMsg->publisherIdType == UA_PUBLISHERDATATYPE_BYTE &&
- pMsg->publisherId.publisherIdByte == *(UA_Byte*)tmpReader->config.publisherId.data) {
- return tmpReader;
- }
- break;
- case UA_PUBLISHERDATATYPE_UINT16:
- if(tmpReader->config.publisherId.type == &UA_TYPES[UA_TYPES_UINT16] &&
- pMsg->publisherIdType == UA_PUBLISHERDATATYPE_UINT16 &&
- pMsg->publisherId.publisherIdUInt16 == *(UA_UInt16*)tmpReader->config.publisherId.data) {
- return tmpReader;
- }
- break;
- case UA_PUBLISHERDATATYPE_UINT32:
- if(tmpReader->config.publisherId.type == &UA_TYPES[UA_TYPES_UINT32] &&
- pMsg->publisherIdType == UA_PUBLISHERDATATYPE_UINT32 &&
- pMsg->publisherId.publisherIdUInt32 == *(UA_UInt32*)tmpReader->config.publisherId.data) {
- return tmpReader;
- }
- break;
- case UA_PUBLISHERDATATYPE_UINT64:
- if(tmpReader->config.publisherId.type == &UA_TYPES[UA_TYPES_UINT64] &&
- pMsg->publisherIdType == UA_PUBLISHERDATATYPE_UINT64 &&
- pMsg->publisherId.publisherIdUInt64 == *(UA_UInt64*)tmpReader->config.publisherId.data) {
- return tmpReader;
- }
- break;
- case UA_PUBLISHERDATATYPE_STRING:
- if(tmpReader->config.publisherId.type == &UA_TYPES[UA_TYPES_STRING] &&
- pMsg->publisherIdType == UA_PUBLISHERDATATYPE_STRING &&
- UA_String_equal(&pMsg->publisherId.publisherIdString, (UA_String*)tmpReader->config.publisherId.data)) {
- return tmpReader;
- }
- break;
- default:
- return NULL;
- }
- }
- }
- return NULL;
- }
- /**
- * Process NetworkMessage.
- *
- * @param server
- * @param networkmessage
- * @return UA_STATUSCODE_GOOD on success
- */
- UA_StatusCode
- UA_Server_processNetworkMessage(UA_Server *server, UA_NetworkMessage *pMsg,
- UA_PubSubConnection *pConnection) {
- if(!pMsg || !pConnection)
- return UA_STATUSCODE_BADINVALIDARGUMENT;
- /* To Do The condition with dataSetWriterIdAvailable and WriterGroupIdAvailable to be handled
- * when pMsg->groupHeaderEnabled, pMsg->dataSetClassIdEnabled, pMsg->payloadHeaderEnabled
- * Here some filtering is possible */
- UA_DataSetReader* dataSetReaderErg = getReaderFromIdentifier(server, pMsg, pConnection);
- /* No Reader with the specified id found */
- if(!dataSetReaderErg) {
- UA_LOG_INFO(&server->config.logger, UA_LOGCATEGORY_SERVER, "No DataSetReader found with PublisherId");
- return UA_STATUSCODE_BADNOTFOUND; /* TODO: Check the return code */
- }
- UA_LOG_DEBUG(&server->config.logger, UA_LOGCATEGORY_SERVER, "DataSetReader found with PublisherId");
- UA_Byte anzDataSets = 1;
- if(pMsg->payloadHeaderEnabled)
- anzDataSets = pMsg->payloadHeader.dataSetPayloadHeader.count;
- for(UA_Byte iterator = 0; iterator < anzDataSets; iterator++) {
- UA_LOG_DEBUG(&server->config.logger, UA_LOGCATEGORY_SERVER, "Process Msg with DataSetReader!");
- UA_Server_DataSetReader_process(server, dataSetReaderErg, &pMsg->payload.dataSetPayload.dataSetMessages[iterator]);
- }
- /* To Do the condition with dataSetWriterId and WriterGroupId
- * else condition for dataSetWriterIdAvailable and writerGroupIdAvailable) */
- return UA_STATUSCODE_GOOD;
- }
- /**
- * Find ReaderGroup with its identifier.
- *
- * @param server
- * @param groupIdentifier
- * @return the ReaderGroup or NULL if not found
- */
- UA_ReaderGroup * UA_ReaderGroup_findRGbyId(UA_Server *server, UA_NodeId identifier) {
- for (size_t iteratorConn = 0; iteratorConn < server->pubSubManager.connectionsSize; iteratorConn++) {
- UA_ReaderGroup* readerGroup = NULL;
- LIST_FOREACH(readerGroup, &server->pubSubManager.connections[iteratorConn].readerGroups, listEntry) {
- if(UA_NodeId_equal(&identifier, &readerGroup->identifier)) {
- return readerGroup;
- }
- }
- }
- return NULL;
- }
- /**
- * Find a DataSetReader with its identifier
- *
- * @param server
- * @param identifier
- * @return the DataSetReader or NULL if not found
- */
- UA_DataSetReader *UA_ReaderGroup_findDSRbyId(UA_Server *server, UA_NodeId identifier) {
- for (size_t iteratorConn = 0; iteratorConn < server->pubSubManager.connectionsSize; iteratorConn++) {
- UA_ReaderGroup* readerGroup = NULL;
- LIST_FOREACH(readerGroup, &server->pubSubManager.connections[iteratorConn].readerGroups, listEntry) {
- UA_DataSetReader *tmpReader;
- LIST_FOREACH(tmpReader, &readerGroup->readers, listEntry) {
- if(UA_NodeId_equal(&tmpReader->identifier, &identifier)) {
- return tmpReader;
- }
- }
- }
- }
- return NULL;
- }
- /**********************************************/
- /* DataSetReader */
- /**********************************************/
- /**
- * Add a DataSetReader to ReaderGroup
- *
- * @param server
- * @param readerGroupIdentifier
- * @param dataSetReaderConfig
- * @param readerIdentifier
- * @return UA_STATUSCODE_GOOD on success
- */
- UA_StatusCode
- UA_Server_addDataSetReader(UA_Server *server, UA_NodeId readerGroupIdentifier,
- const UA_DataSetReaderConfig *dataSetReaderConfig,
- UA_NodeId *readerIdentifier) {
- /* Search the reader group by the given readerGroupIdentifier */
- UA_ReaderGroup *readerGroup = UA_ReaderGroup_findRGbyId(server, readerGroupIdentifier);
- if(!dataSetReaderConfig) {
- return UA_STATUSCODE_BADNOTFOUND;
- }
- if(readerGroup == NULL) {
- return UA_STATUSCODE_BADNOTFOUND;
- }
- /* Allocate memory for new DataSetReader */
- UA_DataSetReader *newDataSetReader = (UA_DataSetReader *)UA_calloc(1, sizeof(UA_DataSetReader));
- /* Copy the config into the new dataSetReader */
- UA_DataSetReaderConfig_copy(dataSetReaderConfig, &newDataSetReader->config);
- newDataSetReader->linkedReaderGroup = readerGroup->identifier;
- UA_PubSubManager_generateUniqueNodeId(server, &newDataSetReader->identifier);
- if(readerIdentifier != NULL) {
- UA_NodeId_copy(&newDataSetReader->identifier, readerIdentifier);
- }
- /* Add the new reader to the group */
- LIST_INSERT_HEAD(&readerGroup->readers, newDataSetReader, listEntry);
- readerGroup->readersCount++;
- #ifdef UA_ENABLE_PUBSUB_INFORMATIONMODEL
- addDataSetReaderRepresentation(server, newDataSetReader);
- #endif
- return UA_STATUSCODE_GOOD;
- }
- /**
- * Remove a DataSetReader from ReaderGroup
- *
- * @param server
- * @param readerGroupIdentifier
- * @return UA_STATUSCODE_GOOD on success
- */
- UA_StatusCode
- UA_Server_removeDataSetReader(UA_Server *server, UA_NodeId readerIdentifier) {
- /* Remove datasetreader given by the identifier */
- UA_DataSetReader *dataSetReader = UA_ReaderGroup_findDSRbyId(server, readerIdentifier);
- if(!dataSetReader) {
- return UA_STATUSCODE_BADNOTFOUND;
- }
- #ifdef UA_ENABLE_PUBSUB_INFORMATIONMODEL
- removeDataSetReaderRepresentation(server, dataSetReader);
- #endif
- UA_DataSetReader_delete(server, dataSetReader);
- return UA_STATUSCODE_GOOD;
- }
- /**
- * Update the config of the DataSetReader.
- *
- * @param server
- * @param dataSetReaderIdentifier
- * @param readerGroupIdentifier
- * @param config
- * @return UA_STATUSCODE_GOOD on success
- */
- UA_StatusCode
- UA_Server_DataSetReader_updateConfig(UA_Server *server, UA_NodeId dataSetReaderIdentifier, UA_NodeId readerGroupIdentifier,
- const UA_DataSetReaderConfig *config) {
- if(config == NULL) {
- return UA_STATUSCODE_BADINVALIDARGUMENT;
- }
- UA_DataSetReader *currentDataSetReader = UA_ReaderGroup_findDSRbyId(server, dataSetReaderIdentifier);
- UA_ReaderGroup *currentReaderGroup = UA_ReaderGroup_findRGbyId(server, readerGroupIdentifier);
- if(!currentDataSetReader) {
- return UA_STATUSCODE_BADNOTFOUND;
- }
- /* The update functionality will be extended during the next PubSub batches.
- * Currently is only a change of the publishing interval possible. */
- if(currentDataSetReader->config.writerGroupId != config->writerGroupId) {
- UA_PubSubManager_removeRepeatedPubSubCallback(server, currentReaderGroup->subscribeCallbackId);
- currentDataSetReader->config.writerGroupId = config->writerGroupId;
- UA_ReaderGroup_subscribeCallback(server, currentReaderGroup);
- }
- else {
- UA_LOG_WARNING(&server->config.logger, UA_LOGCATEGORY_SERVER,
- "No or unsupported ReaderGroup update.");
- }
- return UA_STATUSCODE_GOOD;
- }
- /**
- * Get the current config of the UA_DataSetReader.
- *
- * @param server
- * @param dataSetReaderIdentifier
- * @param config
- * @return UA_STATUSCODE_GOOD on success
- */
- UA_StatusCode
- UA_Server_DataSetReader_getConfig(UA_Server *server, UA_NodeId dataSetReaderIdentifier,
- UA_DataSetReaderConfig *config) {
- if(!config) {
- return UA_STATUSCODE_BADINVALIDARGUMENT;
- }
- UA_DataSetReader *currentDataSetReader = UA_ReaderGroup_findDSRbyId(server, dataSetReaderIdentifier);
- if(!currentDataSetReader) {
- return UA_STATUSCODE_BADNOTFOUND;
- }
- UA_DataSetReaderConfig tmpReaderConfig;
- /* Deep copy of the actual config */
- UA_DataSetReaderConfig_copy(¤tDataSetReader->config, &tmpReaderConfig);
- *config = tmpReaderConfig;
- return UA_STATUSCODE_GOOD;
- }
- /**
- * This Method is used to initially set the SubscribedDataSet to TargetVariablesType and to create the list of target Variables of a SubscribedDataSetType.
- *
- * @param server
- * @param dataSetReaderIdentifier
- * @param targetVariables
- * @return UA_STATUSCODE_GOOD on success
- */
- UA_StatusCode
- UA_Server_DataSetReader_createTargetVariables(UA_Server *server, UA_NodeId dataSetReaderIdentifier, UA_TargetVariablesDataType *targetVariables) {
- UA_StatusCode retval = UA_STATUSCODE_BADUNEXPECTEDERROR;
- UA_DataSetReader* pDS = UA_ReaderGroup_findDSRbyId(server, dataSetReaderIdentifier);
- if(pDS == NULL) {
- return UA_STATUSCODE_BADINVALIDARGUMENT;
- }
- if(pDS->subscribedDataSetTarget.targetVariablesSize > 0) {
- UA_TargetVariablesDataType_deleteMembers(&pDS->subscribedDataSetTarget);
- pDS->subscribedDataSetTarget.targetVariablesSize = 0;
- pDS->subscribedDataSetTarget.targetVariables = NULL;
- }
- /* Set subscribed dataset to TargetVariableType */
- pDS->subscribedDataSetType = UA_PUBSUB_SDS_TARGET;
- retval = UA_TargetVariablesDataType_copy(targetVariables, &pDS->subscribedDataSetTarget);
- return retval;
- }
- /**
- * Adds Subscribed Variables from the DataSetMetaData for the given DataSet into the given parent node
- * and creates the corresponding data in the targetVariables of the DataSetReader
- *
- * @param server
- * @param parentNode
- * @param dataSetReaderIdentifier
- * @return UA_STATUSCODE_GOOD on success
- */
- UA_StatusCode UA_Server_DataSetReader_addTargetVariables(UA_Server *server, UA_NodeId *parentNode, UA_NodeId dataSetReaderIdentifier, UA_SubscribedDataSetEnumType sdsType) {
- if((server == NULL) || (parentNode == NULL)) {
- return UA_STATUSCODE_BADINVALIDARGUMENT;
- }
- UA_StatusCode retval = UA_STATUSCODE_GOOD;
- UA_DataSetReader* pDataSetReader = UA_ReaderGroup_findDSRbyId(server, dataSetReaderIdentifier);
- if(pDataSetReader == NULL) {
- return UA_STATUSCODE_BADINVALIDARGUMENT;
- }
- UA_TargetVariablesDataType targetVars;
- targetVars.targetVariablesSize = pDataSetReader->config.dataSetMetaData.fieldsSize;
- targetVars.targetVariables = (UA_FieldTargetDataType *)UA_calloc(targetVars.targetVariablesSize, sizeof(UA_FieldTargetDataType));
- for (size_t iteratorField = 0; iteratorField < pDataSetReader->config.dataSetMetaData.fieldsSize; iteratorField++) {
- UA_VariableAttributes vAttr = UA_VariableAttributes_default;
- vAttr.valueRank = pDataSetReader->config.dataSetMetaData.fields[iteratorField].valueRank;
- if(pDataSetReader->config.dataSetMetaData.fields[iteratorField].arrayDimensionsSize > 0) {
- retval = UA_Array_copy(pDataSetReader->config.dataSetMetaData.fields[iteratorField].arrayDimensions, pDataSetReader->config.dataSetMetaData.fields[iteratorField].arrayDimensionsSize, (void**)&vAttr.arrayDimensions, &UA_TYPES[UA_TYPES_UINT32]);
- if(retval == UA_STATUSCODE_GOOD) {
- vAttr.arrayDimensionsSize = pDataSetReader->config.dataSetMetaData.fields[iteratorField].arrayDimensionsSize;
- }
- }
- vAttr.dataType = pDataSetReader->config.dataSetMetaData.fields[iteratorField].dataType;
- vAttr.accessLevel = UA_ACCESSLEVELMASK_READ;
- UA_LocalizedText_copy(&pDataSetReader->config.dataSetMetaData.fields[iteratorField].description, &vAttr.description);
- UA_QualifiedName qn;
- UA_QualifiedName_init(&qn);
- char szTmpName[UA_MAX_SIZENAME];
- if(pDataSetReader->config.dataSetMetaData.fields[iteratorField].name.length > 0) {
- UA_UInt16 slen = UA_MAX_SIZENAME -1;
- vAttr.displayName.locale = UA_STRING("en-US");
- vAttr.displayName.text = pDataSetReader->config.dataSetMetaData.fields[iteratorField].name;
- if(pDataSetReader->config.dataSetMetaData.fields[iteratorField].name.length < slen) {
- slen = (UA_UInt16)pDataSetReader->config.dataSetMetaData.fields[iteratorField].name.length;
- UA_snprintf(szTmpName, sizeof(szTmpName), "%.*s", (int)slen, (const char*)pDataSetReader->config.dataSetMetaData.fields[iteratorField].name.data);
- }
- szTmpName[slen] = '\0';
- qn = UA_QUALIFIEDNAME(1, szTmpName);
- }
- else {
- strcpy(szTmpName, "SubscribedVariable");
- vAttr.displayName = UA_LOCALIZEDTEXT("en-US", szTmpName);
- qn = UA_QUALIFIEDNAME(1, "SubscribedVariable");
- }
- /* Add variable to the given parent node */
- UA_NodeId newNode;
- retval = UA_Server_addVariableNode(server, UA_NODEID_NULL, *parentNode,
- UA_NODEID_NUMERIC(0, UA_NS0ID_HASCOMPONENT), qn,
- UA_NODEID_NUMERIC(0, UA_NS0ID_BASEDATAVARIABLETYPE), vAttr, NULL, &newNode);
- if(retval == UA_STATUSCODE_GOOD) {
- UA_LOG_INFO(&server->config.logger, UA_LOGCATEGORY_USERLAND, "addVariableNode %s succeeded", szTmpName);
- }
- else {
- UA_LOG_ERROR(&server->config.logger, UA_LOGCATEGORY_USERLAND, "addVariableNode: error 0x%x", retval);
- }
- UA_FieldTargetDataType_init(&targetVars.targetVariables[iteratorField]);
- targetVars.targetVariables[iteratorField].attributeId = UA_ATTRIBUTEID_VALUE;
- UA_NodeId_copy(&newNode, &targetVars.targetVariables[iteratorField].targetNodeId);
- UA_NodeId_deleteMembers(&newNode);
- if(vAttr.arrayDimensionsSize > 0) {
- UA_Array_delete(vAttr.arrayDimensions, vAttr.arrayDimensionsSize, &UA_TYPES[UA_TYPES_UINT32]);
- }
- }
- if(sdsType == UA_PUBSUB_SDS_TARGET) {
- retval = UA_Server_DataSetReader_createTargetVariables(server, pDataSetReader->identifier, &targetVars);
- }
- UA_TargetVariablesDataType_deleteMembers(&targetVars);
- return retval;
- }
- /**
- * Process a NetworkMessage with a DataSetReader.
- *
- * @param server
- * @param dataSetReader
- * @param dataSetMsg
- */
- void UA_Server_DataSetReader_process(UA_Server *server, UA_DataSetReader *dataSetReader, UA_DataSetMessage* dataSetMsg) {
- if((dataSetReader == NULL) || (dataSetMsg == NULL) || (server == NULL)) {
- return;
- }
- if(!dataSetMsg->header.dataSetMessageValid) {
- UA_LOG_INFO(&server->config.logger, UA_LOGCATEGORY_SERVER, "DataSetMessage is discarded: message is not valid");
- /* To Do check ConfigurationVersion*/
- /*if(dataSetMsg->header.configVersionMajorVersionEnabled)
- * {
- * if(dataSetMsg->header.configVersionMajorVersion != dataSetReader->config.dataSetMetaData.configurationVersion.majorVersion)
- * {
- * UA_LOG_WARNING(server->config.logger, UA_LOGCATEGORY_SERVER, "DataSetMessage is discarded: ConfigurationVersion MajorVersion does not match");
- * return;
- * }
- } */
- }
- else {
- if(dataSetMsg->header.dataSetMessageType == UA_DATASETMESSAGE_DATAKEYFRAME) {
- if(dataSetMsg->header.fieldEncoding != UA_FIELDENCODING_RAWDATA) {
- size_t anzFields = dataSetMsg->data.keyFrameData.fieldCount;
- if(dataSetReader->config.dataSetMetaData.fieldsSize < anzFields) {
- anzFields = dataSetReader->config.dataSetMetaData.fieldsSize;
- }
- if(dataSetReader->subscribedDataSetTarget.targetVariablesSize < anzFields) {
- anzFields = dataSetReader->subscribedDataSetTarget.targetVariablesSize;
- }
- UA_StatusCode retVal = UA_STATUSCODE_GOOD;
- for (UA_UInt16 iteratorField = 0; iteratorField < anzFields; iteratorField++) {
- if(dataSetMsg->data.keyFrameData.dataSetFields[iteratorField].hasValue) {
- if(dataSetReader->subscribedDataSetTarget.targetVariables[iteratorField].attributeId == UA_ATTRIBUTEID_VALUE) {
- retVal = UA_Server_writeValue(server, dataSetReader->subscribedDataSetTarget.targetVariables[iteratorField].targetNodeId, dataSetMsg->data.keyFrameData.dataSetFields[iteratorField].value);
- if(retVal != UA_STATUSCODE_GOOD) {
- UA_LOG_INFO(&server->config.logger, UA_LOGCATEGORY_SERVER, "Error Write Value KF %u: 0x%x", iteratorField, retVal);
- }
- }
- else {
- UA_WriteValue writeVal;
- UA_WriteValue_init(&writeVal);
- writeVal.attributeId = dataSetReader->subscribedDataSetTarget.targetVariables[iteratorField].attributeId;
- writeVal.indexRange = dataSetReader->subscribedDataSetTarget.targetVariables[iteratorField].receiverIndexRange;
- writeVal.nodeId = dataSetReader->subscribedDataSetTarget.targetVariables[iteratorField].targetNodeId;
- UA_DataValue_copy(&dataSetMsg->data.keyFrameData.dataSetFields[iteratorField], &writeVal.value);
- retVal = UA_Server_write(server, &writeVal);
- if(retVal != UA_STATUSCODE_GOOD) {
- UA_LOG_INFO(&server->config.logger, UA_LOGCATEGORY_SERVER, "Error Write KF %u: 0x%x", iteratorField, retVal);
- }
- }
- }
- }
- }
- }
- }
- }
- /**
- * Copy the config of the DataSetReader.
- *
- * @param src
- * @param dst
- * @return UA_STATUSCODE_GOOD on success
- */
- UA_StatusCode
- UA_DataSetReaderConfig_copy(const UA_DataSetReaderConfig *src,
- UA_DataSetReaderConfig *dst) {
- memset(dst, 0, sizeof(UA_DataSetReaderConfig));
- UA_StatusCode retVal = UA_String_copy(&src->name, &dst->name);
- if(retVal != UA_STATUSCODE_GOOD) {
- return retVal;
- }
- retVal = UA_Variant_copy(&src->publisherId, &dst->publisherId);
- if(retVal != UA_STATUSCODE_GOOD) {
- return retVal;
- }
- dst->writerGroupId = src->writerGroupId;
- dst->dataSetWriterId = src->dataSetWriterId;
- retVal = UA_DataSetMetaDataType_copy(&src->dataSetMetaData, &dst->dataSetMetaData);
- if(retVal != UA_STATUSCODE_GOOD) {
- return retVal;
- }
- dst->dataSetFieldContentMask = src->dataSetFieldContentMask;
- dst->messageReceiveTimeout = src->messageReceiveTimeout;
- /* Currently memcpy is used to copy the securityParameters */
- memcpy(&dst->securityParameters, &src->securityParameters, sizeof(UA_PubSubSecurityParameters));
- retVal = UA_UadpDataSetReaderMessageDataType_copy(&src->messageSettings, &dst->messageSettings);
- if(retVal != UA_STATUSCODE_GOOD) {
- return retVal;
- }
- return UA_STATUSCODE_GOOD;
- }
- /**
- * Delete the DataSetReader.
- *
- * @param server
- * @param dataSetReader
- */
- void UA_DataSetReader_delete(UA_Server *server, UA_DataSetReader *dataSetReader) {
- /* Delete DataSetReader config */
- UA_String_deleteMembers(&dataSetReader->config.name);
- UA_Variant_deleteMembers(&dataSetReader->config.publisherId);
- UA_DataSetMetaDataType_deleteMembers(&dataSetReader->config.dataSetMetaData);
- UA_UadpDataSetReaderMessageDataType_deleteMembers(&dataSetReader->config.messageSettings);
- UA_TargetVariablesDataType_deleteMembers(&dataSetReader->subscribedDataSetTarget);
- /* Delete DataSetReader */
- UA_ReaderGroup* pGroup = UA_ReaderGroup_findRGbyId(server, dataSetReader->linkedReaderGroup);
- if(pGroup != NULL) {
- pGroup->readersCount--;
- }
- UA_NodeId_deleteMembers(&dataSetReader->identifier);
- UA_NodeId_deleteMembers(&dataSetReader->linkedReaderGroup);
- /* Remove DataSetReader from group */
- LIST_REMOVE(dataSetReader, listEntry);
- /* Free memory allocated for DataSetReader */
- UA_free(dataSetReader);
- }
- /**********************************************/
- /* PublishedDataSet */
- /**********************************************/
- UA_StatusCode
- UA_PublishedDataSetConfig_copy(const UA_PublishedDataSetConfig *src,
- UA_PublishedDataSetConfig *dst) {
- UA_StatusCode retVal = UA_STATUSCODE_GOOD;
- memcpy(dst, src, sizeof(UA_PublishedDataSetConfig));
- retVal |= UA_String_copy(&src->name, &dst->name);
- switch(src->publishedDataSetType){
- case UA_PUBSUB_DATASET_PUBLISHEDITEMS:
- //no additional items
- break;
- case UA_PUBSUB_DATASET_PUBLISHEDITEMS_TEMPLATE:
- if(src->config.itemsTemplate.variablesToAddSize > 0){
- dst->config.itemsTemplate.variablesToAdd = (UA_PublishedVariableDataType *) UA_calloc(
- src->config.itemsTemplate.variablesToAddSize, sizeof(UA_PublishedVariableDataType));
- }
- for(size_t i = 0; i < src->config.itemsTemplate.variablesToAddSize; i++){
- retVal |= UA_PublishedVariableDataType_copy(&src->config.itemsTemplate.variablesToAdd[i],
- &dst->config.itemsTemplate.variablesToAdd[i]);
- }
- retVal |= UA_DataSetMetaDataType_copy(&src->config.itemsTemplate.metaData,
- &dst->config.itemsTemplate.metaData);
- break;
- default:
- return UA_STATUSCODE_BADINVALIDARGUMENT;
- }
- return retVal;
- }
- UA_StatusCode
- UA_Server_getPublishedDataSetConfig(UA_Server *server, const UA_NodeId pds,
- UA_PublishedDataSetConfig *config){
- if(!config)
- return UA_STATUSCODE_BADINVALIDARGUMENT;
- UA_PublishedDataSet *currentPublishedDataSet = UA_PublishedDataSet_findPDSbyId(server, pds);
- if(!currentPublishedDataSet)
- return UA_STATUSCODE_BADNOTFOUND;
- UA_PublishedDataSetConfig tmpPublishedDataSetConfig;
- //deep copy of the actual config
- UA_PublishedDataSetConfig_copy(¤tPublishedDataSet->config, &tmpPublishedDataSetConfig);
- *config = tmpPublishedDataSetConfig;
- return UA_STATUSCODE_GOOD;
- }
- UA_PublishedDataSet *
- UA_PublishedDataSet_findPDSbyId(UA_Server *server, UA_NodeId identifier){
- for(size_t i = 0; i < server->pubSubManager.publishedDataSetsSize; i++){
- if(UA_NodeId_equal(&server->pubSubManager.publishedDataSets[i].identifier, &identifier)){
- return &server->pubSubManager.publishedDataSets[i];
- }
- }
- return NULL;
- }
- void
- UA_PublishedDataSetConfig_deleteMembers(UA_PublishedDataSetConfig *pdsConfig){
- //delete pds config
- UA_String_deleteMembers(&pdsConfig->name);
- switch (pdsConfig->publishedDataSetType){
- case UA_PUBSUB_DATASET_PUBLISHEDITEMS:
- //no additional items
- break;
- case UA_PUBSUB_DATASET_PUBLISHEDITEMS_TEMPLATE:
- if(pdsConfig->config.itemsTemplate.variablesToAddSize > 0){
- for(size_t i = 0; i < pdsConfig->config.itemsTemplate.variablesToAddSize; i++){
- UA_PublishedVariableDataType_deleteMembers(&pdsConfig->config.itemsTemplate.variablesToAdd[i]);
- }
- UA_free(pdsConfig->config.itemsTemplate.variablesToAdd);
- }
- UA_DataSetMetaDataType_deleteMembers(&pdsConfig->config.itemsTemplate.metaData);
- break;
- default:
- break;
- }
- }
- void
- UA_PublishedDataSet_deleteMembers(UA_Server *server, UA_PublishedDataSet *publishedDataSet){
- UA_PublishedDataSetConfig_deleteMembers(&publishedDataSet->config);
- //delete PDS
- UA_DataSetMetaDataType_deleteMembers(&publishedDataSet->dataSetMetaData);
- UA_DataSetField *field, *tmpField;
- LIST_FOREACH_SAFE(field, &publishedDataSet->fields, listEntry, tmpField) {
- UA_Server_removeDataSetField(server, field->identifier);
- }
- UA_NodeId_deleteMembers(&publishedDataSet->identifier);
- }
- UA_DataSetFieldResult
- UA_Server_addDataSetField(UA_Server *server, const UA_NodeId publishedDataSet,
- const UA_DataSetFieldConfig *fieldConfig,
- UA_NodeId *fieldIdentifier) {
- UA_StatusCode retVal = UA_STATUSCODE_GOOD;
- UA_DataSetFieldResult result = {UA_STATUSCODE_BADINVALIDARGUMENT, {0, 0}};
- if(!fieldConfig)
- return result;
- UA_PublishedDataSet *currentDataSet = UA_PublishedDataSet_findPDSbyId(server, publishedDataSet);
- if(currentDataSet == NULL){
- result.result = UA_STATUSCODE_BADNOTFOUND;
- return result;
- }
- if(currentDataSet->config.publishedDataSetType != UA_PUBSUB_DATASET_PUBLISHEDITEMS){
- result.result = UA_STATUSCODE_BADNOTIMPLEMENTED;
- return result;
- }
- UA_DataSetField *newField = (UA_DataSetField *) UA_calloc(1, sizeof(UA_DataSetField));
- if(!newField){
- result.result = UA_STATUSCODE_BADINTERNALERROR;
- return result;
- }
- UA_DataSetFieldConfig tmpFieldConfig;
- retVal |= UA_DataSetFieldConfig_copy(fieldConfig, &tmpFieldConfig);
- newField->config = tmpFieldConfig;
- UA_PubSubManager_generateUniqueNodeId(server, &newField->identifier);
- if(fieldIdentifier != NULL){
- UA_NodeId_copy(&newField->identifier, fieldIdentifier);
- }
- newField->publishedDataSet = currentDataSet->identifier;
- //update major version of parent published data set
- currentDataSet->dataSetMetaData.configurationVersion.majorVersion = UA_PubSubConfigurationVersionTimeDifference();
- LIST_INSERT_HEAD(¤tDataSet->fields, newField, listEntry);
- if(newField->config.field.variable.promotedField)
- currentDataSet->promotedFieldsCount++;
- currentDataSet->fieldSize++;
- result.result = retVal;
- result.configurationVersion.majorVersion = currentDataSet->dataSetMetaData.configurationVersion.majorVersion;
- result.configurationVersion.minorVersion = currentDataSet->dataSetMetaData.configurationVersion.minorVersion;
- return result;
- }
- UA_DataSetFieldResult
- UA_Server_removeDataSetField(UA_Server *server, const UA_NodeId dsf) {
- UA_DataSetField *currentField = UA_DataSetField_findDSFbyId(server, dsf);
- UA_DataSetFieldResult result = {UA_STATUSCODE_BADNOTFOUND, {0, 0}};
- if(!currentField)
- return result;
- UA_PublishedDataSet *parentPublishedDataSet =
- UA_PublishedDataSet_findPDSbyId(server, currentField->publishedDataSet);
- if(!parentPublishedDataSet)
- return result;
- parentPublishedDataSet->fieldSize--;
- if(currentField->config.field.variable.promotedField)
- parentPublishedDataSet->promotedFieldsCount--;
- /* update major version of PublishedDataSet */
- parentPublishedDataSet->dataSetMetaData.configurationVersion.majorVersion =
- UA_PubSubConfigurationVersionTimeDifference();
- UA_DataSetField_deleteMembers(currentField);
- LIST_REMOVE(currentField, listEntry);
- UA_free(currentField);
- result.result = UA_STATUSCODE_GOOD;
- result.configurationVersion.majorVersion = parentPublishedDataSet->dataSetMetaData.configurationVersion.majorVersion;
- result.configurationVersion.minorVersion = parentPublishedDataSet->dataSetMetaData.configurationVersion.minorVersion;
- return result;
- }
- /**********************************************/
- /* DataSetWriter */
- /**********************************************/
- UA_StatusCode
- UA_DataSetWriterConfig_copy(const UA_DataSetWriterConfig *src,
- UA_DataSetWriterConfig *dst){
- UA_StatusCode retVal = UA_STATUSCODE_GOOD;
- memcpy(dst, src, sizeof(UA_DataSetWriterConfig));
- retVal |= UA_String_copy(&src->name, &dst->name);
- retVal |= UA_String_copy(&src->dataSetName, &dst->dataSetName);
- retVal |= UA_ExtensionObject_copy(&src->messageSettings, &dst->messageSettings);
- dst->dataSetWriterProperties = (UA_KeyValuePair *)
- UA_calloc(src->dataSetWriterPropertiesSize, sizeof(UA_KeyValuePair));
- if(!dst->dataSetWriterProperties)
- return UA_STATUSCODE_BADOUTOFMEMORY;
- for(size_t i = 0; i < src->dataSetWriterPropertiesSize; i++){
- retVal |= UA_KeyValuePair_copy(&src->dataSetWriterProperties[i], &dst->dataSetWriterProperties[i]);
- }
- return retVal;
- }
- UA_StatusCode
- UA_Server_getDataSetWriterConfig(UA_Server *server, const UA_NodeId dsw,
- UA_DataSetWriterConfig *config){
- UA_StatusCode retVal = UA_STATUSCODE_GOOD;
- if(!config)
- return UA_STATUSCODE_BADINVALIDARGUMENT;
- UA_DataSetWriter *currentDataSetWriter = UA_DataSetWriter_findDSWbyId(server, dsw);
- if(!currentDataSetWriter)
- return UA_STATUSCODE_BADNOTFOUND;
- UA_DataSetWriterConfig tmpWriterConfig;
- //deep copy of the actual config
- retVal |= UA_DataSetWriterConfig_copy(¤tDataSetWriter->config, &tmpWriterConfig);
- *config = tmpWriterConfig;
- return retVal;
- }
- UA_DataSetWriter *
- UA_DataSetWriter_findDSWbyId(UA_Server *server, UA_NodeId identifier) {
- for(size_t i = 0; i < server->pubSubManager.connectionsSize; i++){
- UA_WriterGroup *tmpWriterGroup;
- LIST_FOREACH(tmpWriterGroup, &server->pubSubManager.connections[i].writerGroups, listEntry){
- UA_DataSetWriter *tmpWriter;
- LIST_FOREACH(tmpWriter, &tmpWriterGroup->writers, listEntry){
- if(UA_NodeId_equal(&tmpWriter->identifier, &identifier)){
- return tmpWriter;
- }
- }
- }
- }
- return NULL;
- }
- void
- UA_DataSetWriterConfig_deleteMembers(UA_DataSetWriterConfig *pdsConfig) {
- UA_String_deleteMembers(&pdsConfig->name);
- UA_String_deleteMembers(&pdsConfig->dataSetName);
- for(size_t i = 0; i < pdsConfig->dataSetWriterPropertiesSize; i++){
- UA_KeyValuePair_deleteMembers(&pdsConfig->dataSetWriterProperties[i]);
- }
- UA_free(pdsConfig->dataSetWriterProperties);
- UA_ExtensionObject_deleteMembers(&pdsConfig->messageSettings);
- }
- static void
- UA_DataSetWriter_deleteMembers(UA_Server *server, UA_DataSetWriter *dataSetWriter) {
- UA_DataSetWriterConfig_deleteMembers(&dataSetWriter->config);
- //delete DataSetWriter
- UA_NodeId_deleteMembers(&dataSetWriter->identifier);
- UA_NodeId_deleteMembers(&dataSetWriter->linkedWriterGroup);
- UA_NodeId_deleteMembers(&dataSetWriter->connectedDataSet);
- #ifdef UA_ENABLE_PUBSUB_DELTAFRAMES
- //delete lastSamples store
- for(size_t i = 0; i < dataSetWriter->lastSamplesCount; i++) {
- UA_DataValue_deleteMembers(&dataSetWriter->lastSamples[i].value);
- }
- UA_free(dataSetWriter->lastSamples);
- dataSetWriter->lastSamples = NULL;
- dataSetWriter->lastSamplesCount = 0;
- #endif
- }
- /**********************************************/
- /* WriterGroup */
- /**********************************************/
- UA_StatusCode
- UA_WriterGroupConfig_copy(const UA_WriterGroupConfig *src,
- UA_WriterGroupConfig *dst){
- UA_StatusCode retVal = UA_STATUSCODE_GOOD;
- memcpy(dst, src, sizeof(UA_WriterGroupConfig));
- retVal |= UA_String_copy(&src->name, &dst->name);
- retVal |= UA_ExtensionObject_copy(&src->transportSettings, &dst->transportSettings);
- retVal |= UA_ExtensionObject_copy(&src->messageSettings, &dst->messageSettings);
- dst->groupProperties = (UA_KeyValuePair *) UA_calloc(src->groupPropertiesSize, sizeof(UA_KeyValuePair));
- if(!dst->groupProperties)
- return UA_STATUSCODE_BADOUTOFMEMORY;
- for(size_t i = 0; i < src->groupPropertiesSize; i++){
- retVal |= UA_KeyValuePair_copy(&src->groupProperties[i], &dst->groupProperties[i]);
- }
- return retVal;
- }
- UA_StatusCode
- UA_Server_getWriterGroupConfig(UA_Server *server, const UA_NodeId writerGroup,
- UA_WriterGroupConfig *config){
- UA_StatusCode retVal = UA_STATUSCODE_GOOD;
- if(!config)
- return UA_STATUSCODE_BADINVALIDARGUMENT;
- UA_WriterGroup *currentWriterGroup = UA_WriterGroup_findWGbyId(server, writerGroup);
- if(!currentWriterGroup){
- return UA_STATUSCODE_BADNOTFOUND;
- }
- UA_WriterGroupConfig tmpWriterGroupConfig;
- //deep copy of the actual config
- retVal |= UA_WriterGroupConfig_copy(¤tWriterGroup->config, &tmpWriterGroupConfig);
- *config = tmpWriterGroupConfig;
- return retVal;
- }
- UA_StatusCode
- UA_Server_updateWriterGroupConfig(UA_Server *server, UA_NodeId writerGroupIdentifier,
- const UA_WriterGroupConfig *config){
- if(!config)
- return UA_STATUSCODE_BADINVALIDARGUMENT;
- UA_WriterGroup *currentWriterGroup = UA_WriterGroup_findWGbyId(server, writerGroupIdentifier);
- if(!currentWriterGroup)
- return UA_STATUSCODE_BADNOTFOUND;
- //The update functionality will be extended during the next PubSub batches.
- //Currently is only a change of the publishing interval possible.
- if(currentWriterGroup->config.maxEncapsulatedDataSetMessageCount != config->maxEncapsulatedDataSetMessageCount){
- currentWriterGroup->config.maxEncapsulatedDataSetMessageCount = config->maxEncapsulatedDataSetMessageCount;
- if(currentWriterGroup->config.messageSettings.encoding == UA_EXTENSIONOBJECT_ENCODED_NOBODY) {
- UA_LOG_WARNING(&server->config.logger, UA_LOGCATEGORY_SERVER,
- "MaxEncapsulatedDataSetMessag need enabled 'PayloadHeader' within the message settings.");
- }
- }
- if(currentWriterGroup->config.publishingInterval != config->publishingInterval) {
- UA_PubSubManager_removeRepeatedPubSubCallback(server, currentWriterGroup->publishCallbackId);
- currentWriterGroup->config.publishingInterval = config->publishingInterval;
- UA_WriterGroup_addPublishCallback(server, currentWriterGroup);
- }
- if(currentWriterGroup->config.priority != config->priority) {
- UA_LOG_WARNING(&server->config.logger, UA_LOGCATEGORY_SERVER,
- "No or unsupported WriterGroup update.");
- }
- return UA_STATUSCODE_GOOD;
- }
- UA_WriterGroup *
- UA_WriterGroup_findWGbyId(UA_Server *server, UA_NodeId identifier){
- for(size_t i = 0; i < server->pubSubManager.connectionsSize; i++){
- UA_WriterGroup *tmpWriterGroup;
- LIST_FOREACH(tmpWriterGroup, &server->pubSubManager.connections[i].writerGroups, listEntry) {
- if(UA_NodeId_equal(&identifier, &tmpWriterGroup->identifier)){
- return tmpWriterGroup;
- }
- }
- }
- return NULL;
- }
- void
- UA_WriterGroupConfig_deleteMembers(UA_WriterGroupConfig *writerGroupConfig){
- //delete writerGroup config
- UA_String_deleteMembers(&writerGroupConfig->name);
- UA_ExtensionObject_deleteMembers(&writerGroupConfig->transportSettings);
- UA_ExtensionObject_deleteMembers(&writerGroupConfig->messageSettings);
- for(size_t i = 0; i < writerGroupConfig->groupPropertiesSize; i++){
- UA_KeyValuePair_deleteMembers(&writerGroupConfig->groupProperties[i]);
- }
- UA_free(writerGroupConfig->groupProperties);
- }
- static void
- UA_WriterGroup_deleteMembers(UA_Server *server, UA_WriterGroup *writerGroup) {
- UA_WriterGroupConfig_deleteMembers(&writerGroup->config);
- //delete WriterGroup
- //delete all writers. Therefore removeDataSetWriter is called from PublishedDataSet
- UA_DataSetWriter *dataSetWriter, *tmpDataSetWriter;
- LIST_FOREACH_SAFE(dataSetWriter, &writerGroup->writers, listEntry, tmpDataSetWriter){
- UA_Server_removeDataSetWriter(server, dataSetWriter->identifier);
- }
- UA_NodeId_deleteMembers(&writerGroup->linkedConnection);
- UA_NodeId_deleteMembers(&writerGroup->identifier);
- }
- UA_StatusCode
- UA_Server_addDataSetWriter(UA_Server *server,
- const UA_NodeId writerGroup, const UA_NodeId dataSet,
- const UA_DataSetWriterConfig *dataSetWriterConfig,
- UA_NodeId *writerIdentifier) {
- UA_StatusCode retVal = UA_STATUSCODE_GOOD;
- if(!dataSetWriterConfig)
- return UA_STATUSCODE_BADINVALIDARGUMENT;
- UA_PublishedDataSet *currentDataSetContext = UA_PublishedDataSet_findPDSbyId(server, dataSet);
- if(!currentDataSetContext)
- return UA_STATUSCODE_BADNOTFOUND;
- UA_WriterGroup *wg = UA_WriterGroup_findWGbyId(server, writerGroup);
- if(!wg)
- return UA_STATUSCODE_BADNOTFOUND;
- UA_DataSetWriter *newDataSetWriter = (UA_DataSetWriter *) UA_calloc(1, sizeof(UA_DataSetWriter));
- if(!newDataSetWriter)
- return UA_STATUSCODE_BADOUTOFMEMORY;
- //copy the config into the new dataSetWriter
- UA_DataSetWriterConfig tmpDataSetWriterConfig;
- retVal |= UA_DataSetWriterConfig_copy(dataSetWriterConfig, &tmpDataSetWriterConfig);
- newDataSetWriter->config = tmpDataSetWriterConfig;
- //save the current version of the connected PublishedDataSet
- newDataSetWriter->connectedDataSetVersion = currentDataSetContext->dataSetMetaData.configurationVersion;
- #ifdef UA_ENABLE_PUBSUB_DELTAFRAMES
- //initialize the queue for the last values
- newDataSetWriter->lastSamples = (UA_DataSetWriterSample * )
- UA_calloc(currentDataSetContext->fieldSize, sizeof(UA_DataSetWriterSample));
- if(!newDataSetWriter->lastSamples) {
- UA_DataSetWriterConfig_deleteMembers(&newDataSetWriter->config);
- UA_free(newDataSetWriter);
- return UA_STATUSCODE_BADOUTOFMEMORY;
- }
- newDataSetWriter->lastSamplesCount = currentDataSetContext->fieldSize;
- #endif
- //connect PublishedDataSet with DataSetWriter
- newDataSetWriter->connectedDataSet = currentDataSetContext->identifier;
- newDataSetWriter->linkedWriterGroup = wg->identifier;
- UA_PubSubManager_generateUniqueNodeId(server, &newDataSetWriter->identifier);
- if(writerIdentifier != NULL)
- UA_NodeId_copy(&newDataSetWriter->identifier, writerIdentifier);
- //add the new writer to the group
- LIST_INSERT_HEAD(&wg->writers, newDataSetWriter, listEntry);
- wg->writersCount++;
- #ifdef UA_ENABLE_PUBSUB_INFORMATIONMODEL
- addDataSetWriterRepresentation(server, newDataSetWriter);
- #endif
- return retVal;
- }
- UA_StatusCode
- UA_Server_removeDataSetWriter(UA_Server *server, const UA_NodeId dsw){
- UA_DataSetWriter *dataSetWriter = UA_DataSetWriter_findDSWbyId(server, dsw);
- if(!dataSetWriter)
- return UA_STATUSCODE_BADNOTFOUND;
- UA_WriterGroup *linkedWriterGroup = UA_WriterGroup_findWGbyId(server, dataSetWriter->linkedWriterGroup);
- if(!linkedWriterGroup)
- return UA_STATUSCODE_BADNOTFOUND;
- linkedWriterGroup->writersCount--;
- #ifdef UA_ENABLE_PUBSUB_INFORMATIONMODEL
- removeDataSetWriterRepresentation(server, dataSetWriter);
- #endif
- //remove DataSetWriter from group
- UA_DataSetWriter_deleteMembers(server, dataSetWriter);
- LIST_REMOVE(dataSetWriter, listEntry);
- UA_free(dataSetWriter);
- return UA_STATUSCODE_GOOD;
- }
- /**********************************************/
- /* DataSetField */
- /**********************************************/
- UA_StatusCode
- UA_DataSetFieldConfig_copy(const UA_DataSetFieldConfig *src, UA_DataSetFieldConfig *dst){
- memcpy(dst, src, sizeof(UA_DataSetFieldConfig));
- if(src->dataSetFieldType == UA_PUBSUB_DATASETFIELD_VARIABLE) {
- UA_String_copy(&src->field.variable.fieldNameAlias, &dst->field.variable.fieldNameAlias);
- UA_PublishedVariableDataType_copy(&src->field.variable.publishParameters,
- &dst->field.variable.publishParameters);
- } else {
- return UA_STATUSCODE_BADNOTSUPPORTED;
- }
- return UA_STATUSCODE_GOOD;
- }
- UA_StatusCode
- UA_Server_getDataSetFieldConfig(UA_Server *server, const UA_NodeId dsf,
- UA_DataSetFieldConfig *config) {
- UA_StatusCode retVal = UA_STATUSCODE_GOOD;
- if(!config)
- return UA_STATUSCODE_BADINVALIDARGUMENT;
- UA_DataSetField *currentDataSetField = UA_DataSetField_findDSFbyId(server, dsf);
- if(!currentDataSetField)
- return UA_STATUSCODE_BADNOTFOUND;
- UA_DataSetFieldConfig tmpFieldConfig;
- //deep copy of the actual config
- retVal |= UA_DataSetFieldConfig_copy(¤tDataSetField->config, &tmpFieldConfig);
- *config = tmpFieldConfig;
- return retVal;
- }
- UA_DataSetField *
- UA_DataSetField_findDSFbyId(UA_Server *server, UA_NodeId identifier) {
- for(size_t i = 0; i < server->pubSubManager.publishedDataSetsSize; i++){
- UA_DataSetField *tmpField;
- LIST_FOREACH(tmpField, &server->pubSubManager.publishedDataSets[i].fields, listEntry){
- if(UA_NodeId_equal(&tmpField->identifier, &identifier)){
- return tmpField;
- }
- }
- }
- return NULL;
- }
- void
- UA_DataSetFieldConfig_deleteMembers(UA_DataSetFieldConfig *dataSetFieldConfig){
- if(dataSetFieldConfig->dataSetFieldType == UA_PUBSUB_DATASETFIELD_VARIABLE){
- UA_String_deleteMembers(&dataSetFieldConfig->field.variable.fieldNameAlias);
- UA_PublishedVariableDataType_deleteMembers(&dataSetFieldConfig->field.variable.publishParameters);
- }
- }
- static void
- UA_DataSetField_deleteMembers(UA_DataSetField *field) {
- UA_DataSetFieldConfig_deleteMembers(&field->config);
- //delete DataSetField
- UA_NodeId_deleteMembers(&field->identifier);
- UA_NodeId_deleteMembers(&field->publishedDataSet);
- UA_FieldMetaData_deleteMembers(&field->fieldMetaData);
- }
- /*********************************************************/
- /* PublishValues handling */
- /*********************************************************/
- /**
- * Compare two variants. Internally used for value change detection.
- *
- * @return true if the value has changed
- */
- #ifdef UA_ENABLE_PUBSUB_DELTAFRAMES
- static UA_Boolean
- valueChangedVariant(UA_Variant *oldValue, UA_Variant *newValue){
- if(! (oldValue && newValue))
- return false;
- UA_ByteString *oldValueEncoding = UA_ByteString_new(), *newValueEncoding = UA_ByteString_new();
- size_t oldValueEncodingSize, newValueEncodingSize;
- oldValueEncodingSize = UA_calcSizeBinary(oldValue, &UA_TYPES[UA_TYPES_VARIANT]);
- newValueEncodingSize = UA_calcSizeBinary(newValue, &UA_TYPES[UA_TYPES_VARIANT]);
- if((oldValueEncodingSize == 0) || (newValueEncodingSize == 0))
- return false;
- if(oldValueEncodingSize != newValueEncodingSize)
- return true;
- if(UA_ByteString_allocBuffer(oldValueEncoding, oldValueEncodingSize) != UA_STATUSCODE_GOOD)
- return false;
- if(UA_ByteString_allocBuffer(newValueEncoding, newValueEncodingSize) != UA_STATUSCODE_GOOD)
- return false;
- UA_Byte *bufPosOldValue = oldValueEncoding->data;
- const UA_Byte *bufEndOldValue = &oldValueEncoding->data[oldValueEncoding->length];
- UA_Byte *bufPosNewValue = newValueEncoding->data;
- const UA_Byte *bufEndNewValue = &newValueEncoding->data[newValueEncoding->length];
- if(UA_encodeBinary(oldValue, &UA_TYPES[UA_TYPES_VARIANT],
- &bufPosOldValue, &bufEndOldValue, NULL, NULL) != UA_STATUSCODE_GOOD){
- return false;
- }
- if(UA_encodeBinary(newValue, &UA_TYPES[UA_TYPES_VARIANT],
- &bufPosNewValue, &bufEndNewValue, NULL, NULL) != UA_STATUSCODE_GOOD){
- return false;
- }
- oldValueEncoding->length = (uintptr_t)bufPosOldValue - (uintptr_t)oldValueEncoding->data;
- newValueEncoding->length = (uintptr_t)bufPosNewValue - (uintptr_t)newValueEncoding->data;
- UA_Boolean compareResult = !UA_ByteString_equal(oldValueEncoding, newValueEncoding);
- UA_ByteString_delete(oldValueEncoding);
- UA_ByteString_delete(newValueEncoding);
- return compareResult;
- }
- #endif
- /**
- * Obtain the latest value for a specific DataSetField. This method is currently
- * called inside the DataSetMessage generation process.
- */
- static void
- UA_PubSubDataSetField_sampleValue(UA_Server *server, UA_DataSetField *field,
- UA_DataValue *value) {
- /* Read the value */
- UA_ReadValueId rvid;
- UA_ReadValueId_init(&rvid);
- rvid.nodeId = field->config.field.variable.publishParameters.publishedVariable;
- rvid.attributeId = field->config.field.variable.publishParameters.attributeId;
- rvid.indexRange = field->config.field.variable.publishParameters.indexRange;
- *value = UA_Server_read(server, &rvid, UA_TIMESTAMPSTORETURN_BOTH);
- }
- static UA_StatusCode
- UA_PubSubDataSetWriter_generateKeyFrameMessage(UA_Server *server, UA_DataSetMessage *dataSetMessage,
- UA_DataSetWriter *dataSetWriter) {
- UA_PublishedDataSet *currentDataSet =
- UA_PublishedDataSet_findPDSbyId(server, dataSetWriter->connectedDataSet);
- if(!currentDataSet)
- return UA_STATUSCODE_BADNOTFOUND;
- /* Prepare DataSetMessageContent */
- dataSetMessage->header.dataSetMessageValid = true;
- dataSetMessage->header.dataSetMessageType = UA_DATASETMESSAGE_DATAKEYFRAME;
- dataSetMessage->data.keyFrameData.fieldCount = currentDataSet->fieldSize;
- dataSetMessage->data.keyFrameData.dataSetFields = (UA_DataValue *)
- UA_Array_new(currentDataSet->fieldSize, &UA_TYPES[UA_TYPES_DATAVALUE]);
- if(!dataSetMessage->data.keyFrameData.dataSetFields)
- return UA_STATUSCODE_BADOUTOFMEMORY;
- #ifdef UA_ENABLE_JSON_ENCODING
- /* json: insert fieldnames used as json keys */
- dataSetMessage->data.keyFrameData.fieldNames =
- (UA_String *)UA_Array_new(currentDataSet->fieldSize, &UA_TYPES[UA_TYPES_STRING]);
- if(!dataSetMessage->data.keyFrameData.fieldNames)
- return UA_STATUSCODE_BADOUTOFMEMORY;
- #endif
- /* Loop over the fields */
- size_t counter = 0;
- UA_DataSetField *dsf;
- LIST_FOREACH(dsf, ¤tDataSet->fields, listEntry) {
- #ifdef UA_ENABLE_JSON_ENCODING
- /* json: store the fieldNameAlias*/
- UA_String_copy(&dsf->config.field.variable.fieldNameAlias,
- &dataSetMessage->data.keyFrameData.fieldNames[counter]);
- #endif
- /* Sample the value */
- UA_DataValue *dfv = &dataSetMessage->data.keyFrameData.dataSetFields[counter];
- UA_PubSubDataSetField_sampleValue(server, dsf, dfv);
- /* Deactivate statuscode? */
- if(((u64)dataSetWriter->config.dataSetFieldContentMask & (u64)UA_DATASETFIELDCONTENTMASK_STATUSCODE) == 0)
- dfv->hasStatus = false;
- /* Deactivate timestamps */
- if(((u64)dataSetWriter->config.dataSetFieldContentMask &
- (u64)UA_DATASETFIELDCONTENTMASK_SOURCETIMESTAMP) == 0)
- dfv->hasSourceTimestamp = false;
- if(((u64)dataSetWriter->config.dataSetFieldContentMask &
- (u64)UA_DATASETFIELDCONTENTMASK_SOURCEPICOSECONDS) == 0)
- dfv->hasSourcePicoseconds = false;
- if(((u64)dataSetWriter->config.dataSetFieldContentMask &
- (u64)UA_DATASETFIELDCONTENTMASK_SERVERTIMESTAMP) == 0)
- dfv->hasServerTimestamp = false;
- if(((u64)dataSetWriter->config.dataSetFieldContentMask &
- (u64)UA_DATASETFIELDCONTENTMASK_SERVERPICOSECONDS) == 0)
- dfv->hasServerPicoseconds = false;
- #ifdef UA_ENABLE_PUBSUB_DELTAFRAMES
- /* Update lastValue store */
- UA_DataValue_deleteMembers(&dataSetWriter->lastSamples[counter].value);
- UA_DataValue_copy(dfv, &dataSetWriter->lastSamples[counter].value);
- #endif
- counter++;
- }
- return UA_STATUSCODE_GOOD;
- }
- #ifdef UA_ENABLE_PUBSUB_DELTAFRAMES
- static UA_StatusCode
- UA_PubSubDataSetWriter_generateDeltaFrameMessage(UA_Server *server,
- UA_DataSetMessage *dataSetMessage,
- UA_DataSetWriter *dataSetWriter) {
- UA_PublishedDataSet *currentDataSet =
- UA_PublishedDataSet_findPDSbyId(server, dataSetWriter->connectedDataSet);
- if(!currentDataSet)
- return UA_STATUSCODE_BADNOTFOUND;
- /* Prepare DataSetMessageContent */
- memset(dataSetMessage, 0, sizeof(UA_DataSetMessage));
- dataSetMessage->header.dataSetMessageValid = true;
- dataSetMessage->header.dataSetMessageType = UA_DATASETMESSAGE_DATADELTAFRAME;
- UA_DataSetField *dsf;
- size_t counter = 0;
- LIST_FOREACH(dsf, ¤tDataSet->fields, listEntry) {
- /* Sample the value */
- UA_DataValue value;
- UA_DataValue_init(&value);
- UA_PubSubDataSetField_sampleValue(server, dsf, &value);
- /* Check if the value has changed */
- if(valueChangedVariant(&dataSetWriter->lastSamples[counter].value.value, &value.value)) {
- /* increase fieldCount for current delta message */
- dataSetMessage->data.deltaFrameData.fieldCount++;
- dataSetWriter->lastSamples[counter].valueChanged = true;
- /* Update last stored sample */
- UA_DataValue_deleteMembers(&dataSetWriter->lastSamples[counter].value);
- dataSetWriter->lastSamples[counter].value = value;
- } else {
- UA_DataValue_deleteMembers(&value);
- dataSetWriter->lastSamples[counter].valueChanged = false;
- }
- counter++;
- }
- /* Allocate DeltaFrameFields */
- UA_DataSetMessage_DeltaFrameField *deltaFields = (UA_DataSetMessage_DeltaFrameField *)
- UA_calloc(dataSetMessage->data.deltaFrameData.fieldCount, sizeof(UA_DataSetMessage_DeltaFrameField));
- if(!deltaFields)
- return UA_STATUSCODE_BADOUTOFMEMORY;
- dataSetMessage->data.deltaFrameData.deltaFrameFields = deltaFields;
- size_t currentDeltaField = 0;
- for(size_t i = 0; i < currentDataSet->fieldSize; i++) {
- if(!dataSetWriter->lastSamples[i].valueChanged)
- continue;
- UA_DataSetMessage_DeltaFrameField *dff = &deltaFields[currentDeltaField];
- dff->fieldIndex = (UA_UInt16) i;
- UA_DataValue_copy(&dataSetWriter->lastSamples[i].value, &dff->fieldValue);
- dataSetWriter->lastSamples[i].valueChanged = false;
- /* Deactivate statuscode? */
- if(((u64)dataSetWriter->config.dataSetFieldContentMask & (u64)UA_DATASETFIELDCONTENTMASK_STATUSCODE) == 0)
- dff->fieldValue.hasStatus = false;
- /* Deactivate timestamps? */
- if(((u64)dataSetWriter->config.dataSetFieldContentMask & (u64)UA_DATASETFIELDCONTENTMASK_SOURCETIMESTAMP) == 0)
- dff->fieldValue.hasSourceTimestamp = false;
- if(((u64)dataSetWriter->config.dataSetFieldContentMask & (u64)UA_DATASETFIELDCONTENTMASK_SOURCEPICOSECONDS) == 0)
- dff->fieldValue.hasServerPicoseconds = false;
- if(((u64)dataSetWriter->config.dataSetFieldContentMask & (u64)UA_DATASETFIELDCONTENTMASK_SERVERTIMESTAMP) == 0)
- dff->fieldValue.hasServerTimestamp = false;
- if(((u64)dataSetWriter->config.dataSetFieldContentMask & (u64)UA_DATASETFIELDCONTENTMASK_SERVERPICOSECONDS) == 0)
- dff->fieldValue.hasServerPicoseconds = false;
- currentDeltaField++;
- }
- return UA_STATUSCODE_GOOD;
- }
- #endif
- /**
- * Generate a DataSetMessage for the given writer.
- *
- * @param dataSetWriter ptr to corresponding writer
- * @return ptr to generated DataSetMessage
- */
- static UA_StatusCode
- UA_DataSetWriter_generateDataSetMessage(UA_Server *server, UA_DataSetMessage *dataSetMessage,
- UA_DataSetWriter *dataSetWriter) {
- UA_PublishedDataSet *currentDataSet =
- UA_PublishedDataSet_findPDSbyId(server, dataSetWriter->connectedDataSet);
- if(!currentDataSet)
- return UA_STATUSCODE_BADNOTFOUND;
- /* Reset the message */
- memset(dataSetMessage, 0, sizeof(UA_DataSetMessage));
- /* store messageType to switch between json or uadp (default) */
- UA_UInt16 messageType = UA_TYPES_UADPDATASETWRITERMESSAGEDATATYPE;
- UA_JsonDataSetWriterMessageDataType *jsonDataSetWriterMessageDataType = NULL;
- /* The configuration Flags are included
- * inside the std. defined UA_UadpDataSetWriterMessageDataType */
- UA_UadpDataSetWriterMessageDataType defaultUadpConfiguration;
- UA_UadpDataSetWriterMessageDataType *dataSetWriterMessageDataType = NULL;
- if((dataSetWriter->config.messageSettings.encoding == UA_EXTENSIONOBJECT_DECODED ||
- dataSetWriter->config.messageSettings.encoding == UA_EXTENSIONOBJECT_DECODED_NODELETE) &&
- (dataSetWriter->config.messageSettings.content.decoded.type ==
- &UA_TYPES[UA_TYPES_UADPDATASETWRITERMESSAGEDATATYPE])) {
- dataSetWriterMessageDataType = (UA_UadpDataSetWriterMessageDataType *)
- dataSetWriter->config.messageSettings.content.decoded.data;
- /* type is UADP */
- messageType = UA_TYPES_UADPDATASETWRITERMESSAGEDATATYPE;
- } else if((dataSetWriter->config.messageSettings.encoding == UA_EXTENSIONOBJECT_DECODED ||
- dataSetWriter->config.messageSettings.encoding == UA_EXTENSIONOBJECT_DECODED_NODELETE) &&
- (dataSetWriter->config.messageSettings.content.decoded.type ==
- &UA_TYPES[UA_TYPES_JSONDATASETWRITERMESSAGEDATATYPE])) {
- jsonDataSetWriterMessageDataType = (UA_JsonDataSetWriterMessageDataType *)
- dataSetWriter->config.messageSettings.content.decoded.data;
- /* type is JSON */
- messageType = UA_TYPES_JSONDATASETWRITERMESSAGEDATATYPE;
- } else {
- /* create default flag configuration if no
- * UadpDataSetWriterMessageDataType was passed in */
- memset(&defaultUadpConfiguration, 0, sizeof(UA_UadpDataSetWriterMessageDataType));
- defaultUadpConfiguration.dataSetMessageContentMask = (UA_UadpDataSetMessageContentMask)
- ((u64)UA_UADPDATASETMESSAGECONTENTMASK_TIMESTAMP | (u64)UA_UADPDATASETMESSAGECONTENTMASK_MAJORVERSION |
- (u64)UA_UADPDATASETMESSAGECONTENTMASK_MINORVERSION);
- dataSetWriterMessageDataType = &defaultUadpConfiguration;
- }
- /* Sanity-test the configuration */
- if(dataSetWriterMessageDataType &&
- (dataSetWriterMessageDataType->networkMessageNumber != 0 ||
- dataSetWriterMessageDataType->dataSetOffset != 0 ||
- dataSetWriterMessageDataType->configuredSize != 0)) {
- UA_LOG_WARNING(&server->config.logger, UA_LOGCATEGORY_SERVER,
- "Static DSM configuration not supported. Using defaults");
- dataSetWriterMessageDataType->networkMessageNumber = 0;
- dataSetWriterMessageDataType->dataSetOffset = 0;
- dataSetWriterMessageDataType->configuredSize = 0;
- }
- /* The field encoding depends on the flags inside the writer config.
- * TODO: This can be moved to the encoding layer. */
- if(dataSetWriter->config.dataSetFieldContentMask & (u64)UA_DATASETFIELDCONTENTMASK_RAWDATA
- ) {
- dataSetMessage->header.fieldEncoding = UA_FIELDENCODING_RAWDATA;
- } else if((u64)dataSetWriter->config.dataSetFieldContentMask &
- ((u64)UA_DATASETFIELDCONTENTMASK_SOURCETIMESTAMP | (u64)UA_DATASETFIELDCONTENTMASK_SERVERPICOSECONDS |
- (u64)UA_DATASETFIELDCONTENTMASK_SOURCEPICOSECONDS | (u64)UA_DATASETFIELDCONTENTMASK_STATUSCODE)) {
- dataSetMessage->header.fieldEncoding = UA_FIELDENCODING_DATAVALUE;
- } else {
- dataSetMessage->header.fieldEncoding = UA_FIELDENCODING_VARIANT;
- }
- if(messageType == UA_TYPES_UADPDATASETWRITERMESSAGEDATATYPE) {
- /* Std: 'The DataSetMessageContentMask defines the flags for the content of the DataSetMessage header.' */
- if((u64)dataSetWriterMessageDataType->dataSetMessageContentMask &
- (u64)UA_UADPDATASETMESSAGECONTENTMASK_MAJORVERSION) {
- dataSetMessage->header.configVersionMajorVersionEnabled = true;
- dataSetMessage->header.configVersionMajorVersion =
- currentDataSet->dataSetMetaData.configurationVersion.majorVersion;
- }
- if((u64)dataSetWriterMessageDataType->dataSetMessageContentMask &
- (u64)UA_UADPDATASETMESSAGECONTENTMASK_MINORVERSION) {
- dataSetMessage->header.configVersionMinorVersionEnabled = true;
- dataSetMessage->header.configVersionMinorVersion =
- currentDataSet->dataSetMetaData.configurationVersion.minorVersion;
- }
- if((u64)dataSetWriterMessageDataType->dataSetMessageContentMask &
- (u64)UA_UADPDATASETMESSAGECONTENTMASK_SEQUENCENUMBER) {
- dataSetMessage->header.dataSetMessageSequenceNrEnabled = true;
- dataSetMessage->header.dataSetMessageSequenceNr =
- dataSetWriter->actualDataSetMessageSequenceCount;
- }
- if((u64)dataSetWriterMessageDataType->dataSetMessageContentMask &
- (u64)UA_UADPDATASETMESSAGECONTENTMASK_TIMESTAMP) {
- dataSetMessage->header.timestampEnabled = true;
- dataSetMessage->header.timestamp = UA_DateTime_now();
- }
- /* TODO: Picoseconds resolution not supported atm */
- if((u64)dataSetWriterMessageDataType->dataSetMessageContentMask &
- (u64)UA_UADPDATASETMESSAGECONTENTMASK_PICOSECONDS) {
- dataSetMessage->header.picoSecondsIncluded = false;
- }
- /* TODO: Statuscode not supported yet */
- if((u64)dataSetWriterMessageDataType->dataSetMessageContentMask &
- (u64)UA_UADPDATASETMESSAGECONTENTMASK_STATUS) {
- dataSetMessage->header.statusEnabled = false;
- }
- } else if(messageType == UA_TYPES_JSONDATASETWRITERMESSAGEDATATYPE) {
- if((u64)jsonDataSetWriterMessageDataType->dataSetMessageContentMask &
- (u64)UA_JSONDATASETMESSAGECONTENTMASK_METADATAVERSION) {
- dataSetMessage->header.configVersionMajorVersionEnabled = true;
- dataSetMessage->header.configVersionMajorVersion =
- currentDataSet->dataSetMetaData.configurationVersion.majorVersion;
- }
- if((u64)jsonDataSetWriterMessageDataType->dataSetMessageContentMask &
- (u64)UA_JSONDATASETMESSAGECONTENTMASK_METADATAVERSION) {
- dataSetMessage->header.configVersionMinorVersionEnabled = true;
- dataSetMessage->header.configVersionMinorVersion =
- currentDataSet->dataSetMetaData.configurationVersion.minorVersion;
- }
- if((u64)jsonDataSetWriterMessageDataType->dataSetMessageContentMask &
- (u64)UA_JSONDATASETMESSAGECONTENTMASK_SEQUENCENUMBER) {
- dataSetMessage->header.dataSetMessageSequenceNrEnabled = true;
- dataSetMessage->header.dataSetMessageSequenceNr =
- dataSetWriter->actualDataSetMessageSequenceCount;
- }
- if((u64)jsonDataSetWriterMessageDataType->dataSetMessageContentMask &
- (u64)UA_JSONDATASETMESSAGECONTENTMASK_TIMESTAMP) {
- dataSetMessage->header.timestampEnabled = true;
- dataSetMessage->header.timestamp = UA_DateTime_now();
- }
- /* TODO: Statuscode not supported yet */
- if((u64)jsonDataSetWriterMessageDataType->dataSetMessageContentMask &
- (u64)UA_JSONDATASETMESSAGECONTENTMASK_STATUS) {
- dataSetMessage->header.statusEnabled = false;
- }
- }
- /* Set the sequence count. Automatically rolls over to zero */
- dataSetWriter->actualDataSetMessageSequenceCount++;
- /* JSON does not differ between deltaframes and keyframes, only keyframes are currently used. */
- if(messageType != UA_TYPES_JSONDATASETWRITERMESSAGEDATATYPE){
- #ifdef UA_ENABLE_PUBSUB_DELTAFRAMES
- /* Check if the PublishedDataSet version has changed -> if yes flush the lastValue store and send a KeyFrame */
- if(dataSetWriter->connectedDataSetVersion.majorVersion != currentDataSet->dataSetMetaData.configurationVersion.majorVersion ||
- dataSetWriter->connectedDataSetVersion.minorVersion != currentDataSet->dataSetMetaData.configurationVersion.minorVersion) {
- /* Remove old samples */
- for(size_t i = 0; i < dataSetWriter->lastSamplesCount; i++)
- UA_DataValue_deleteMembers(&dataSetWriter->lastSamples[i].value);
- /* Realloc pds dependent memory */
- dataSetWriter->lastSamplesCount = currentDataSet->fieldSize;
- UA_DataSetWriterSample *newSamplesArray = (UA_DataSetWriterSample * )
- UA_realloc(dataSetWriter->lastSamples, sizeof(UA_DataSetWriterSample) * dataSetWriter->lastSamplesCount);
- if(!newSamplesArray)
- return UA_STATUSCODE_BADOUTOFMEMORY;
- dataSetWriter->lastSamples = newSamplesArray;
- memset(dataSetWriter->lastSamples, 0, sizeof(UA_DataSetWriterSample) * dataSetWriter->lastSamplesCount);
- dataSetWriter->connectedDataSetVersion = currentDataSet->dataSetMetaData.configurationVersion;
- UA_PubSubDataSetWriter_generateKeyFrameMessage(server, dataSetMessage, dataSetWriter);
- dataSetWriter->deltaFrameCounter = 0;
- return UA_STATUSCODE_GOOD;
- }
- /* The standard defines: if a PDS contains only one fields no delta messages
- * should be generated because they need more memory than a keyframe with 1
- * field. */
- if(currentDataSet->fieldSize > 1 && dataSetWriter->deltaFrameCounter > 0 &&
- dataSetWriter->deltaFrameCounter <= dataSetWriter->config.keyFrameCount) {
- UA_PubSubDataSetWriter_generateDeltaFrameMessage(server, dataSetMessage, dataSetWriter);
- dataSetWriter->deltaFrameCounter++;
- return UA_STATUSCODE_GOOD;
- }
- dataSetWriter->deltaFrameCounter = 1;
- #endif
- }
- UA_PubSubDataSetWriter_generateKeyFrameMessage(server, dataSetMessage, dataSetWriter);
- return UA_STATUSCODE_GOOD;
- }
- static UA_StatusCode
- sendNetworkMessageJson(UA_PubSubConnection *connection, UA_DataSetMessage *dsm,
- UA_UInt16 *writerIds, UA_Byte dsmCount, UA_ExtensionObject *transportSettings) {
- UA_StatusCode retval = UA_STATUSCODE_BADNOTSUPPORTED;
- #ifdef UA_ENABLE_JSON_ENCODING
- UA_NetworkMessage nm;
- memset(&nm, 0, sizeof(UA_NetworkMessage));
- nm.version = 1;
- nm.networkMessageType = UA_NETWORKMESSAGE_DATASET;
- nm.payloadHeaderEnabled = true;
- nm.payloadHeader.dataSetPayloadHeader.count = dsmCount;
- nm.payloadHeader.dataSetPayloadHeader.dataSetWriterIds = writerIds;
- nm.payload.dataSetPayload.dataSetMessages = dsm;
- /* Allocate the buffer. Allocate on the stack if the buffer is small. */
- UA_ByteString buf;
- size_t msgSize = UA_NetworkMessage_calcSizeJson(&nm, NULL, 0, NULL, 0, true);
- size_t stackSize = 1;
- if(msgSize <= UA_MAX_STACKBUF)
- stackSize = msgSize;
- UA_STACKARRAY(UA_Byte, stackBuf, stackSize);
- buf.data = stackBuf;
- buf.length = msgSize;
- if(msgSize > UA_MAX_STACKBUF) {
- retval = UA_ByteString_allocBuffer(&buf, msgSize);
- if(retval != UA_STATUSCODE_GOOD)
- return retval;
- }
- /* Encode the message */
- UA_Byte *bufPos = buf.data;
- memset(bufPos, 0, msgSize);
- const UA_Byte *bufEnd = &buf.data[buf.length];
- retval = UA_NetworkMessage_encodeJson(&nm, &bufPos, &bufEnd, NULL, 0, NULL, 0, true);
- if(retval != UA_STATUSCODE_GOOD) {
- if(msgSize > UA_MAX_STACKBUF)
- UA_ByteString_deleteMembers(&buf);
- return retval;
- }
- /* Send the prepared messages */
- retval = connection->channel->send(connection->channel, transportSettings, &buf);
- if(msgSize > UA_MAX_STACKBUF)
- UA_ByteString_deleteMembers(&buf);
- #endif
- return retval;
- }
- static UA_StatusCode
- sendNetworkMessage(UA_PubSubConnection *connection, UA_WriterGroup *wg,
- UA_DataSetMessage *dsm, UA_UInt16 *writerIds, UA_Byte dsmCount,
- UA_ExtensionObject *messageSettings,
- UA_ExtensionObject *transportSettings) {
- if(messageSettings->content.decoded.type !=
- &UA_TYPES[UA_TYPES_UADPWRITERGROUPMESSAGEDATATYPE])
- return UA_STATUSCODE_BADINTERNALERROR;
- UA_UadpWriterGroupMessageDataType *wgm = (UA_UadpWriterGroupMessageDataType*)
- messageSettings->content.decoded.data;
- UA_NetworkMessage nm;
- memset(&nm, 0, sizeof(UA_NetworkMessage));
- nm.publisherIdEnabled =
- ((u64)wgm->networkMessageContentMask & (u64)UA_UADPNETWORKMESSAGECONTENTMASK_PUBLISHERID) != 0;
- nm.groupHeaderEnabled =
- ((u64)wgm->networkMessageContentMask & (u64)UA_UADPNETWORKMESSAGECONTENTMASK_GROUPHEADER) != 0;
- nm.groupHeader.writerGroupIdEnabled =
- ((u64)wgm->networkMessageContentMask & (u64)UA_UADPNETWORKMESSAGECONTENTMASK_WRITERGROUPID) != 0;
- nm.groupHeader.groupVersionEnabled =
- ((u64)wgm->networkMessageContentMask & (u64)UA_UADPNETWORKMESSAGECONTENTMASK_GROUPVERSION) != 0;
- nm.groupHeader.networkMessageNumberEnabled =
- ((u64)wgm->networkMessageContentMask & (u64)UA_UADPNETWORKMESSAGECONTENTMASK_NETWORKMESSAGENUMBER) != 0;
- nm.groupHeader.sequenceNumberEnabled =
- ((u64)wgm->networkMessageContentMask & (u64)UA_UADPNETWORKMESSAGECONTENTMASK_SEQUENCENUMBER) != 0;
- nm.payloadHeaderEnabled =
- ((u64)wgm->networkMessageContentMask & (u64)UA_UADPNETWORKMESSAGECONTENTMASK_PAYLOADHEADER) != 0;
- nm.timestampEnabled =
- ((u64)wgm->networkMessageContentMask & (u64)UA_UADPNETWORKMESSAGECONTENTMASK_TIMESTAMP) != 0;
- nm.picosecondsEnabled =
- ((u64)wgm->networkMessageContentMask & (u64)UA_UADPNETWORKMESSAGECONTENTMASK_PICOSECONDS) != 0;
- nm.dataSetClassIdEnabled =
- ((u64)wgm->networkMessageContentMask & (u64)UA_UADPNETWORKMESSAGECONTENTMASK_DATASETCLASSID) != 0;
- nm.promotedFieldsEnabled =
- ((u64)wgm->networkMessageContentMask & (u64)UA_UADPNETWORKMESSAGECONTENTMASK_PROMOTEDFIELDS) != 0;
- nm.version = 1;
- nm.networkMessageType = UA_NETWORKMESSAGE_DATASET;
- if(connection->config->publisherIdType == UA_PUBSUB_PUBLISHERID_NUMERIC) {
- nm.publisherIdType = UA_PUBLISHERDATATYPE_UINT16;
- nm.publisherId.publisherIdUInt32 = connection->config->publisherId.numeric;
- } else if(connection->config->publisherIdType == UA_PUBSUB_PUBLISHERID_STRING){
- nm.publisherIdType = UA_PUBLISHERDATATYPE_STRING;
- nm.publisherId.publisherIdString = connection->config->publisherId.string;
- }
- /* Compute the length of the dsm separately for the header */
- UA_STACKARRAY(UA_UInt16, dsmLengths, dsmCount);
- for(UA_Byte i = 0; i < dsmCount; i++)
- dsmLengths[i] = (UA_UInt16)UA_DataSetMessage_calcSizeBinary(&dsm[i]);
- nm.payloadHeader.dataSetPayloadHeader.count = dsmCount;
- nm.payloadHeader.dataSetPayloadHeader.dataSetWriterIds = writerIds;
- nm.groupHeader.writerGroupId = wg->config.writerGroupId;
- nm.groupHeader.networkMessageNumber = 1;
- nm.payload.dataSetPayload.sizes = dsmLengths;
- nm.payload.dataSetPayload.dataSetMessages = dsm;
- /* Allocate the buffer. Allocate on the stack if the buffer is small. */
- UA_ByteString buf;
- size_t msgSize = UA_NetworkMessage_calcSizeBinary(&nm);
- size_t stackSize = 1;
- if(msgSize <= UA_MAX_STACKBUF)
- stackSize = msgSize;
- UA_STACKARRAY(UA_Byte, stackBuf, stackSize);
- buf.data = stackBuf;
- buf.length = msgSize;
- UA_StatusCode retval;
- if(msgSize > UA_MAX_STACKBUF) {
- retval = UA_ByteString_allocBuffer(&buf, msgSize);
- if(retval != UA_STATUSCODE_GOOD)
- return retval;
- }
- /* Encode the message */
- UA_Byte *bufPos = buf.data;
- memset(bufPos, 0, msgSize);
- const UA_Byte *bufEnd = &buf.data[buf.length];
- retval = UA_NetworkMessage_encodeBinary(&nm, &bufPos, bufEnd);
- if(retval != UA_STATUSCODE_GOOD) {
- if(msgSize > UA_MAX_STACKBUF)
- UA_ByteString_deleteMembers(&buf);
- return retval;
- }
- /* Send the prepared messages */
- retval = connection->channel->send(connection->channel, transportSettings, &buf);
- if(msgSize > UA_MAX_STACKBUF)
- UA_ByteString_deleteMembers(&buf);
- return retval;
- }
- /* This callback triggers the collection and publish of NetworkMessages and the
- * contained DataSetMessages. */
- void
- UA_WriterGroup_publishCallback(UA_Server *server, UA_WriterGroup *writerGroup) {
- UA_LOG_DEBUG(&server->config.logger, UA_LOGCATEGORY_SERVER, "Publish Callback");
- if(!writerGroup) {
- UA_LOG_WARNING(&server->config.logger, UA_LOGCATEGORY_SERVER,
- "Publish failed. WriterGroup not found");
- return;
- }
- /* Nothing to do? */
- if(writerGroup->writersCount <= 0)
- return;
- /* Binary or Json encoding? */
- if(writerGroup->config.encodingMimeType != UA_PUBSUB_ENCODING_UADP &&
- writerGroup->config.encodingMimeType != UA_PUBSUB_ENCODING_JSON) {
- UA_LOG_WARNING(&server->config.logger, UA_LOGCATEGORY_SERVER,
- "Publish failed: Unknown encoding type.");
- return;
- }
- /* Find the connection associated with the writer */
- UA_PubSubConnection *connection =
- UA_PubSubConnection_findConnectionbyId(server, writerGroup->linkedConnection);
- if(!connection) {
- UA_LOG_WARNING(&server->config.logger, UA_LOGCATEGORY_SERVER,
- "Publish failed. PubSubConnection invalid.");
- return;
- }
- /* How many DSM can be sent in one NM? */
- UA_Byte maxDSM = (UA_Byte)writerGroup->config.maxEncapsulatedDataSetMessageCount;
- if(writerGroup->config.maxEncapsulatedDataSetMessageCount > UA_BYTE_MAX)
- maxDSM = UA_BYTE_MAX;
- /* If the maxEncapsulatedDataSetMessageCount is set to 0->1 */
- if(maxDSM == 0)
- maxDSM = 1;
- /* It is possible to put several DataSetMessages into one NetworkMessage.
- * But only if they do not contain promoted fields. NM with only DSM are
- * sent out right away. The others are kept in a buffer for "batching". */
- size_t dsmCount = 0;
- UA_DataSetWriter *dsw;
- UA_STACKARRAY(UA_UInt16, dsWriterIds, writerGroup->writersCount);
- UA_STACKARRAY(UA_DataSetMessage, dsmStore, writerGroup->writersCount);
- LIST_FOREACH(dsw, &writerGroup->writers, listEntry) {
- /* Find the dataset */
- UA_PublishedDataSet *pds =
- UA_PublishedDataSet_findPDSbyId(server, dsw->connectedDataSet);
- if(!pds) {
- UA_LOG_WARNING(&server->config.logger, UA_LOGCATEGORY_SERVER,
- "PubSub Publish: PublishedDataSet not found");
- continue;
- }
- /* Generate the DSM */
- UA_StatusCode res =
- UA_DataSetWriter_generateDataSetMessage(server, &dsmStore[dsmCount], dsw);
- if(res != UA_STATUSCODE_GOOD) {
- UA_LOG_WARNING(&server->config.logger, UA_LOGCATEGORY_SERVER,
- "PubSub Publish: DataSetMessage creation failed");
- continue;
- }
- /* Send right away if there is only this DSM in a NM. If promoted fields
- * are contained in the PublishedDataSet, then this DSM must go into a
- * dedicated NM as well. */
- if(pds->promotedFieldsCount > 0 || maxDSM == 1) {
- if(writerGroup->config.encodingMimeType == UA_PUBSUB_ENCODING_UADP){
- res = sendNetworkMessage(connection, writerGroup, &dsmStore[dsmCount],
- &dsw->config.dataSetWriterId, 1,
- &writerGroup->config.messageSettings,
- &writerGroup->config.transportSettings);
- }else if(writerGroup->config.encodingMimeType == UA_PUBSUB_ENCODING_JSON){
- res = sendNetworkMessageJson(connection, &dsmStore[dsmCount],
- &dsw->config.dataSetWriterId, 1, &writerGroup->config.transportSettings);
- }
- if(res != UA_STATUSCODE_GOOD)
- UA_LOG_WARNING(&server->config.logger, UA_LOGCATEGORY_SERVER,
- "PubSub Publish: Could not send a NetworkMessage");
- UA_DataSetMessage_free(&dsmStore[dsmCount]);
- continue;
- }
- dsWriterIds[dsmCount] = dsw->config.dataSetWriterId;
- dsmCount++;
- }
- /* Send the NetworkMessages with batched DataSetMessages */
- size_t nmCount = (dsmCount / maxDSM) + ((dsmCount % maxDSM) == 0 ? 0 : 1);
- for(UA_UInt32 i = 0; i < nmCount; i++) {
- UA_Byte nmDsmCount = maxDSM;
- if(i == nmCount - 1 && (dsmCount % maxDSM))
- nmDsmCount = (UA_Byte)dsmCount % maxDSM;
- UA_StatusCode res3 = UA_STATUSCODE_GOOD;
- if(writerGroup->config.encodingMimeType == UA_PUBSUB_ENCODING_UADP){
- res3 = sendNetworkMessage(connection, writerGroup, &dsmStore[i * maxDSM],
- &dsWriterIds[i * maxDSM], nmDsmCount,
- &writerGroup->config.messageSettings,
- &writerGroup->config.transportSettings);
- }else if(writerGroup->config.encodingMimeType == UA_PUBSUB_ENCODING_JSON){
- res3 = sendNetworkMessageJson(connection, &dsmStore[i * maxDSM],
- &dsWriterIds[i * maxDSM], nmDsmCount, &writerGroup->config.transportSettings);
- }
- if(res3 != UA_STATUSCODE_GOOD)
- UA_LOG_WARNING(&server->config.logger, UA_LOGCATEGORY_SERVER,
- "PubSub Publish: Sending a NetworkMessage failed");
- }
- /* Clean up DSM */
- for(size_t i = 0; i < dsmCount; i++)
- UA_DataSetMessage_free(&dsmStore[i]);
- }
- /* Add new publishCallback. The first execution is triggered directly after
- * creation. */
- UA_StatusCode
- UA_WriterGroup_addPublishCallback(UA_Server *server, UA_WriterGroup *writerGroup) {
- UA_StatusCode retval =
- UA_PubSubManager_addRepeatedCallback(server,
- (UA_ServerCallback) UA_WriterGroup_publishCallback,
- writerGroup, writerGroup->config.publishingInterval,
- &writerGroup->publishCallbackId);
- if(retval == UA_STATUSCODE_GOOD)
- writerGroup->publishCallbackIsRegistered = true;
- /* Run once after creation */
- UA_WriterGroup_publishCallback(server, writerGroup);
- return retval;
- }
- /* This callback triggers the collection and reception of NetworkMessages and the
- * contained DataSetMessages. */
- void UA_ReaderGroup_subscribeCallback(UA_Server *server, UA_ReaderGroup *readerGroup) {
- UA_PubSubConnection *connection = UA_PubSubConnection_findConnectionbyId(server, readerGroup->linkedConnection);
- UA_ByteString buffer;
- if(UA_ByteString_allocBuffer(&buffer, 512) != UA_STATUSCODE_GOOD) {
- UA_LOG_INFO(&server->config.logger, UA_LOGCATEGORY_SERVER, "Message buffer alloc failed!");
- return;
- }
- connection->channel->receive(connection->channel, &buffer, NULL, 300000);
- if(buffer.length > 0) {
- UA_LOG_INFO(&server->config.logger, UA_LOGCATEGORY_USERLAND, "Message received:");
- UA_NetworkMessage currentNetworkMessage;
- memset(¤tNetworkMessage, 0, sizeof(UA_NetworkMessage));
- size_t currentPosition = 0;
- UA_NetworkMessage_decodeBinary(&buffer, ¤tPosition, ¤tNetworkMessage);
- UA_Server_processNetworkMessage(server, ¤tNetworkMessage, connection);
- UA_NetworkMessage_deleteMembers(¤tNetworkMessage);
- }
- UA_ByteString_deleteMembers(&buffer);
- }
- /* Add new subscribeCallback. The first execution is triggered directly after
- * creation. */
- UA_StatusCode
- UA_ReaderGroup_addSubscribeCallback(UA_Server *server, UA_ReaderGroup *readerGroup) {
- UA_StatusCode retval = UA_STATUSCODE_GOOD;
- UA_PubSubConnection *connection = UA_PubSubConnection_findConnectionbyId(server, readerGroup->linkedConnection);
- if(connection != NULL) {
- retval = connection->channel->regist(connection->channel, NULL, NULL);
- if(retval == UA_STATUSCODE_GOOD) {
- retval = UA_PubSubManager_addRepeatedCallback(server,
- (UA_ServerCallback) UA_ReaderGroup_subscribeCallback,
- readerGroup, 5,
- &readerGroup->subscribeCallbackId);
- }
- else {
- UA_LOG_WARNING(&server->config.logger, UA_LOGCATEGORY_SERVER, "register channel failed: 0x%x!", retval);
- }
- }
- if(retval == UA_STATUSCODE_GOOD) {
- readerGroup->subscribeCallbackIsRegistered = true;
- }
- /* Run once after creation */
- UA_ReaderGroup_subscribeCallback(server, readerGroup);
- return retval;
- }
- #endif /* UA_ENABLE_PUBSUB */
|