ua_pubsub_writer.c 90 KB


  1. /* This Source Code Form is subject to the terms of the Mozilla Public
  2. * License, v. 2.0. If a copy of the MPL was not distributed with this
  3. * file, You can obtain one at http://mozilla.org/MPL/2.0/.
  4. *
  5. * Copyright (c) 2017-2018 Fraunhofer IOSB (Author: Andreas Ebner)
  6. * Copyright (c) 2019 Fraunhofer IOSB (Author: Julius Pfrommer)
  7. * Copyright (c) 2019 Kalycito Infotech Private Limited
  8. */
  9. #include <open62541/server_pubsub.h>
  10. #include "server/ua_server_internal.h"
  11. #ifdef UA_ENABLE_PUBSUB /* conditional compilation */
  12. #include "ua_pubsub.h"
  13. #ifdef UA_ENABLE_PUBSUB_INFORMATIONMODEL
  14. #include "ua_pubsub_ns0.h"
  15. #endif
  16. #ifdef UA_ENABLE_PUBSUB_DELTAFRAMES
  17. #include "ua_types_encoding_binary.h"
  18. #endif
  19. #define UA_MAX_STACKBUF 512 /* Max size of network messages on the stack */
  20. /* Forward declaration */
  21. static void
  22. UA_WriterGroup_clear(UA_Server *server, UA_WriterGroup *writerGroup);
  23. static void
  24. UA_DataSetField_clear(UA_DataSetField *field);
  25. static UA_StatusCode
  26. generateNetworkMessage(UA_PubSubConnection *connection, UA_WriterGroup *wg,
  27. UA_DataSetMessage *dsm, UA_UInt16 *writerIds, UA_Byte dsmCount,
  28. UA_ExtensionObject *messageSettings,
  29. UA_ExtensionObject *transportSettings,
  30. UA_NetworkMessage *networkMessage);
  31. static UA_StatusCode
  32. UA_DataSetWriter_generateDataSetMessage(UA_Server *server, UA_DataSetMessage *dataSetMessage,
  33. UA_DataSetWriter *dataSetWriter);
  34. /**********************************************/
  35. /* Connection */
  36. /**********************************************/
  37. UA_StatusCode
  38. UA_PubSubConnectionConfig_copy(const UA_PubSubConnectionConfig *src,
  39. UA_PubSubConnectionConfig *dst) {
  40. UA_StatusCode retVal = UA_STATUSCODE_GOOD;
  41. memcpy(dst, src, sizeof(UA_PubSubConnectionConfig));
  42. retVal |= UA_String_copy(&src->name, &dst->name);
  43. retVal |= UA_Variant_copy(&src->address, &dst->address);
  44. retVal |= UA_String_copy(&src->transportProfileUri, &dst->transportProfileUri);
  45. retVal |= UA_Variant_copy(&src->connectionTransportSettings, &dst->connectionTransportSettings);
  46. if(src->connectionPropertiesSize > 0){
  47. dst->connectionProperties = (UA_KeyValuePair *)
  48. UA_calloc(src->connectionPropertiesSize, sizeof(UA_KeyValuePair));
  49. if(!dst->connectionProperties){
  50. return UA_STATUSCODE_BADOUTOFMEMORY;
  51. }
  52. for(size_t i = 0; i < src->connectionPropertiesSize; i++){
  53. retVal |= UA_QualifiedName_copy(&src->connectionProperties[i].key,
  54. &dst->connectionProperties[i].key);
  55. retVal |= UA_Variant_copy(&src->connectionProperties[i].value,
  56. &dst->connectionProperties[i].value);
  57. }
  58. }
  59. return retVal;
  60. }
  61. UA_StatusCode
  62. UA_Server_getPubSubConnectionConfig(UA_Server *server, const UA_NodeId connection,
  63. UA_PubSubConnectionConfig *config) {
  64. if(!config)
  65. return UA_STATUSCODE_BADINVALIDARGUMENT;
  66. UA_PubSubConnection *currentPubSubConnection =
  67. UA_PubSubConnection_findConnectionbyId(server, connection);
  68. if(!currentPubSubConnection)
  69. return UA_STATUSCODE_BADNOTFOUND;
  70. UA_PubSubConnectionConfig tmpPubSubConnectionConfig;
  71. //deep copy of the actual config
  72. UA_PubSubConnectionConfig_copy(currentPubSubConnection->config, &tmpPubSubConnectionConfig);
  73. *config = tmpPubSubConnectionConfig;
  74. return UA_STATUSCODE_GOOD;
  75. }
  76. UA_PubSubConnection *
  77. UA_PubSubConnection_findConnectionbyId(UA_Server *server, UA_NodeId connectionIdentifier) {
  78. for(size_t i = 0; i < server->pubSubManager.connectionsSize; i++){
  79. if(UA_NodeId_equal(&connectionIdentifier, &server->pubSubManager.connections[i].identifier)){
  80. return &server->pubSubManager.connections[i];
  81. }
  82. }
  83. return NULL;
  84. }
  85. void
  86. UA_PubSubConnectionConfig_clear(UA_PubSubConnectionConfig *connectionConfig) {
  87. UA_String_clear(&connectionConfig->name);
  88. UA_String_clear(&connectionConfig->transportProfileUri);
  89. UA_Variant_clear(&connectionConfig->connectionTransportSettings);
  90. UA_Variant_clear(&connectionConfig->address);
  91. for(size_t i = 0; i < connectionConfig->connectionPropertiesSize; i++){
  92. UA_QualifiedName_clear(&connectionConfig->connectionProperties[i].key);
  93. UA_Variant_clear(&connectionConfig->connectionProperties[i].value);
  94. }
  95. UA_free(connectionConfig->connectionProperties);
  96. }
  97. void
  98. UA_PubSubConnection_clear(UA_Server *server, UA_PubSubConnection *connection) {
  99. //delete connection config
  100. UA_PubSubConnectionConfig_clear(connection->config);
  101. //remove contained WriterGroups
  102. UA_WriterGroup *writerGroup, *tmpWriterGroup;
  103. LIST_FOREACH_SAFE(writerGroup, &connection->writerGroups, listEntry, tmpWriterGroup){
  104. UA_Server_removeWriterGroup(server, writerGroup->identifier);
  105. }
  106. /* remove contained ReaderGroups */
  107. UA_ReaderGroup *readerGroups, *tmpReaderGroup;
  108. LIST_FOREACH_SAFE(readerGroups, &connection->readerGroups, listEntry, tmpReaderGroup){
  109. UA_Server_removeReaderGroup(server, readerGroups->identifier);
  110. }
  111. UA_NodeId_clear(&connection->identifier);
  112. if(connection->channel){
  113. connection->channel->close(connection->channel);
  114. }
  115. UA_free(connection->config);
  116. }
  117. UA_StatusCode
  118. UA_PubSubConnection_regist(UA_Server *server, UA_NodeId *connectionIdentifier) {
  119. UA_PubSubConnection *connection =
  120. UA_PubSubConnection_findConnectionbyId(server, *connectionIdentifier);
  121. UA_StatusCode retval = UA_STATUSCODE_GOOD;
  122. if(connection == NULL) {
  123. return UA_STATUSCODE_BADNOTFOUND;
  124. }
  125. retval = connection->channel->regist(connection->channel, NULL, NULL);
  126. if(retval != UA_STATUSCODE_GOOD) {
  127. UA_LOG_WARNING(&server->config.logger, UA_LOGCATEGORY_SERVER,
  128. "register channel failed: 0x%x!", retval);
  129. }
  130. return retval;
  131. }
  132. UA_StatusCode
  133. UA_Server_addWriterGroup(UA_Server *server, const UA_NodeId connection,
  134. const UA_WriterGroupConfig *writerGroupConfig,
  135. UA_NodeId *writerGroupIdentifier) {
  136. UA_StatusCode retVal = UA_STATUSCODE_GOOD;
  137. if(!writerGroupConfig)
  138. return UA_STATUSCODE_BADINVALIDARGUMENT;
  139. //search the connection by the given connectionIdentifier
  140. UA_PubSubConnection *currentConnectionContext =
  141. UA_PubSubConnection_findConnectionbyId(server, connection);
  142. if(!currentConnectionContext)
  143. return UA_STATUSCODE_BADNOTFOUND;
  144. if(currentConnectionContext->config->configurationFrozen){
  145. UA_LOG_WARNING(&server->config.logger, UA_LOGCATEGORY_SERVER,
  146. "Adding WriterGroup failed. PubSubConnection is frozen.");
  147. return UA_STATUSCODE_BADCONFIGURATIONERROR;
  148. }
  149. //allocate memory for new WriterGroup
  150. UA_WriterGroup *newWriterGroup = (UA_WriterGroup *) UA_calloc(1, sizeof(UA_WriterGroup));
  151. if(!newWriterGroup)
  152. return UA_STATUSCODE_BADOUTOFMEMORY;
  153. newWriterGroup->linkedConnection = currentConnectionContext->identifier;
  154. newWriterGroup->linkedConnectionPtr = currentConnectionContext;
  155. UA_PubSubManager_generateUniqueNodeId(server, &newWriterGroup->identifier);
  156. if(writerGroupIdentifier){
  157. UA_NodeId_copy(&newWriterGroup->identifier, writerGroupIdentifier);
  158. }
  159. //deep copy of the config
  160. UA_WriterGroupConfig tmpWriterGroupConfig;
  161. retVal |= UA_WriterGroupConfig_copy(writerGroupConfig, &tmpWriterGroupConfig);
  162. if(!tmpWriterGroupConfig.messageSettings.content.decoded.type) {
  163. UA_UadpWriterGroupMessageDataType *wgm = UA_UadpWriterGroupMessageDataType_new();
  164. tmpWriterGroupConfig.messageSettings.content.decoded.data = wgm;
  165. tmpWriterGroupConfig.messageSettings.content.decoded.type =
  166. &UA_TYPES[UA_TYPES_UADPWRITERGROUPMESSAGEDATATYPE];
  167. tmpWriterGroupConfig.messageSettings.encoding = UA_EXTENSIONOBJECT_DECODED;
  168. }
  169. newWriterGroup->config = tmpWriterGroupConfig;
  170. LIST_INSERT_HEAD(&currentConnectionContext->writerGroups, newWriterGroup, listEntry);
  171. #ifdef UA_ENABLE_PUBSUB_INFORMATIONMODEL
  172. addWriterGroupRepresentation(server, newWriterGroup);
  173. #endif
  174. return retVal;
  175. }
  176. UA_StatusCode
  177. UA_Server_removeWriterGroup(UA_Server *server, const UA_NodeId writerGroup){
  178. UA_WriterGroup *wg = UA_WriterGroup_findWGbyId(server, writerGroup);
  179. if(!wg)
  180. return UA_STATUSCODE_BADNOTFOUND;
  181. if(wg->config.configurationFrozen){
  182. UA_LOG_WARNING(&server->config.logger, UA_LOGCATEGORY_SERVER,
  183. "Delete WriterGroup failed. WriterGroup is frozen.");
  184. return UA_STATUSCODE_BADCONFIGURATIONERROR;
  185. }
  186. UA_PubSubConnection *connection =
  187. UA_PubSubConnection_findConnectionbyId(server, wg->linkedConnection);
  188. if(!connection)
  189. return UA_STATUSCODE_BADNOTFOUND;
  190. if(connection->config->configurationFrozen){
  191. UA_LOG_WARNING(&server->config.logger, UA_LOGCATEGORY_SERVER,
  192. "Delete WriterGroup failed. PubSubConnection is frozen.");
  193. return UA_STATUSCODE_BADCONFIGURATIONERROR;
  194. }
  195. if(wg->state == UA_PUBSUBSTATE_OPERATIONAL){
  196. //unregister the publish callback
  197. UA_PubSubManager_removeRepeatedPubSubCallback(server, wg->publishCallbackId);
  198. }
  199. #ifdef UA_ENABLE_PUBSUB_INFORMATIONMODEL
  200. removeGroupRepresentation(server, wg);
  201. #endif
  202. UA_WriterGroup_clear(server, wg);
  203. LIST_REMOVE(wg, listEntry);
  204. UA_free(wg);
  205. return UA_STATUSCODE_GOOD;
  206. }
  207. UA_StatusCode
  208. UA_Server_freezeWriterGroupConfiguration(UA_Server *server, const UA_NodeId writerGroup){
  209. UA_WriterGroup *wg = UA_WriterGroup_findWGbyId(server, writerGroup);
  210. if(!wg)
  211. return UA_STATUSCODE_BADNOTFOUND;
  212. //PubSubConnection freezeCounter++
  213. UA_PubSubConnection *pubSubConnection = UA_PubSubConnection_findConnectionbyId(server, wg->linkedConnection);
  214. pubSubConnection->configurationFreezeCounter++;
  215. pubSubConnection->config->configurationFrozen = UA_TRUE;
  216. //WriterGroup freeze
  217. wg->config.configurationFrozen = UA_TRUE;
  218. //DataSetWriter freeze
  219. size_t dsmCount = 0;
  220. UA_DataSetWriter *dataSetWriter;
  221. LIST_FOREACH(dataSetWriter, &wg->writers, listEntry){
  222. dataSetWriter->config.configurationFrozen = UA_TRUE;
  223. //PublishedDataSet freezeCounter++
  224. UA_PublishedDataSet *publishedDataSet = UA_PublishedDataSet_findPDSbyId(server, dataSetWriter->connectedDataSet);
  225. publishedDataSet->configurationFreezeCounter++;
  226. publishedDataSet->config.configurationFrozen = UA_TRUE;
  227. //DataSetFields freeze
  228. UA_DataSetField *dataSetField;
  229. TAILQ_FOREACH(dataSetField, &publishedDataSet->fields, listEntry){
  230. dataSetField->config.configurationFrozen = UA_TRUE;
  231. }
  232. }
  233. if(wg->config.rtLevel == UA_PUBSUB_RT_FIXED_SIZE){
  234. if(wg->config.encodingMimeType != UA_PUBSUB_ENCODING_UADP) {
  235. UA_LOG_WARNING(&server->config.logger, UA_LOGCATEGORY_SERVER,
  236. "PubSub-RT configuration fail: Non-RT capable encoding.");
  237. return UA_STATUSCODE_BADNOTSUPPORTED;
  238. }
  239. //TODO Clarify: should we only allow = maxEncapsulatedDataSetMessageCount == 1 with RT?
  240. //TODO Clarify: Behaviour if the finale size is more than MTU
  241. /* Generate data set messages */
  242. UA_STACKARRAY(UA_UInt16, dsWriterIds, wg->writersCount);
  243. UA_STACKARRAY(UA_DataSetMessage, dsmStore, wg->writersCount);
  244. UA_DataSetWriter *dsw;
  245. LIST_FOREACH(dsw, &wg->writers, listEntry) {
  246. /* Find the dataset */
  247. UA_PublishedDataSet *pds =
  248. UA_PublishedDataSet_findPDSbyId(server, dsw->connectedDataSet);
  249. if(!pds) {
  250. UA_LOG_WARNING(&server->config.logger, UA_LOGCATEGORY_SERVER,
  251. "PubSub Publish: PublishedDataSet not found");
  252. continue;
  253. }
  254. if(pds->promotedFieldsCount > 0) {
  255. UA_LOG_WARNING(&server->config.logger, UA_LOGCATEGORY_SERVER,
  256. "PubSub-RT configuration fail: PDS contains promoted fields.");
  257. return UA_STATUSCODE_BADNOTSUPPORTED;
  258. }
  259. UA_DataSetField *dsf;
  260. TAILQ_FOREACH(dsf, &pds->fields, listEntry){
  261. if(!dsf->config.field.variable.staticValueSourceEnabled){
  262. UA_LOG_WARNING(&server->config.logger, UA_LOGCATEGORY_SERVER,
  263. "PubSub-RT configuration fail: PDS contains variables with dynamic length types.");
  264. return UA_STATUSCODE_BADNOTSUPPORTED;
  265. }
  266. if((UA_NodeId_equal(&dsf->fieldMetaData.dataType, &UA_TYPES[UA_TYPES_STRING].typeId) ||
  267. UA_NodeId_equal(&dsf->fieldMetaData.dataType, &UA_TYPES[UA_TYPES_BYTESTRING].typeId)) &&
  268. dsf->fieldMetaData.maxStringLength == 0){
  269. UA_LOG_WARNING(&server->config.logger, UA_LOGCATEGORY_SERVER,
  270. "PubSub-RT configuration fail: PDS contains String/ByteString with dynamic length.");
  271. return UA_STATUSCODE_BADNOTSUPPORTED;
  272. } else if(!UA_DataType_isNumeric(UA_findDataType(&dsf->fieldMetaData.dataType))){
  273. UA_LOG_WARNING(&server->config.logger, UA_LOGCATEGORY_SERVER,
  274. "PubSub-RT configuration fail: PDS contains variable with dynamic size.");
  275. return UA_STATUSCODE_BADNOTSUPPORTED;
  276. }
  277. }
  278. /* Generate the DSM */
  279. UA_StatusCode res =
  280. UA_DataSetWriter_generateDataSetMessage(server, &dsmStore[dsmCount], dsw);
  281. if(res != UA_STATUSCODE_GOOD) {
  282. UA_LOG_WARNING(&server->config.logger, UA_LOGCATEGORY_SERVER,
  283. "PubSub RT Offset calculation: DataSetMessage buffering failed");
  284. continue;
  285. }
  286. dsWriterIds[dsmCount] = dsw->config.dataSetWriterId;
  287. dsmCount++;
  288. }
  289. UA_NetworkMessage networkMessage;
  290. UA_StatusCode res = generateNetworkMessage(pubSubConnection, wg, dsmStore, dsWriterIds, (UA_Byte) dsmCount,
  291. &wg->config.messageSettings, &wg->config.transportSettings, &networkMessage);
  292. if(res != UA_STATUSCODE_GOOD)
  293. return UA_STATUSCODE_BADINTERNALERROR;
  294. UA_NetworkMessage_generateOffsetBuffer(&wg->bufferedMessage, &networkMessage);
  295. /* Allocate the buffer. Allocate on the stack if the buffer is small. */
  296. UA_ByteString buf;
  297. size_t msgSize = UA_NetworkMessage_calcSizeBinary(&networkMessage);
  298. res = UA_ByteString_allocBuffer(&buf, msgSize);
  299. if(res != UA_STATUSCODE_GOOD)
  300. return UA_STATUSCODE_BADOUTOFMEMORY;
  301. wg->bufferedMessage.buffer = buf;
  302. const UA_Byte *bufEnd = &wg->bufferedMessage.buffer.data[wg->bufferedMessage.buffer.length];
  303. UA_Byte *bufPos = wg->bufferedMessage.buffer.data;
  304. UA_NetworkMessage_encodeBinary(&networkMessage, &bufPos, bufEnd);
  305. /* Clean up DSM */
  306. for(size_t i = 0; i < dsmCount; i++)
  307. UA_free(dsmStore[i].data.keyFrameData.dataSetFields);
  308. //UA_DataSetMessage_free(&dsmStore[i]);
  309. }
  310. return UA_STATUSCODE_GOOD;
  311. }
  312. UA_StatusCode
  313. UA_Server_unfreezeWriterGroupConfiguration(UA_Server *server, const UA_NodeId writerGroup){
  314. UA_WriterGroup *wg = UA_WriterGroup_findWGbyId(server, writerGroup);
  315. if(!wg)
  316. return UA_STATUSCODE_BADNOTFOUND;
  317. //if(wg->config.rtLevel == UA_PUBSUB_RT_NONE){
  318. // UA_LOG_WARNING(&server->config.logger, UA_LOGCATEGORY_SERVER,
  319. // "PubSub configuration freeze without RT configuration has no effect.");
  320. // return UA_STATUSCODE_BADCONFIGURATIONERROR;
  321. //}
  322. //PubSubConnection freezeCounter--
  323. UA_PubSubConnection *pubSubConnection = UA_PubSubConnection_findConnectionbyId(server, wg->linkedConnection);
  324. pubSubConnection->configurationFreezeCounter--;
  325. if(pubSubConnection->configurationFreezeCounter == 0){
  326. pubSubConnection->config->configurationFrozen = UA_FALSE;
  327. }
  328. //WriterGroup unfreeze
  329. wg->config.configurationFrozen = UA_FALSE;
  330. //DataSetWriter unfreeze
  331. UA_DataSetWriter *dataSetWriter;
  332. LIST_FOREACH(dataSetWriter, &wg->writers, listEntry){
  333. UA_PublishedDataSet *publishedDataSet = UA_PublishedDataSet_findPDSbyId(server, dataSetWriter->connectedDataSet);
  334. //PublishedDataSet freezeCounter--
  335. publishedDataSet->configurationFreezeCounter--;
  336. if(publishedDataSet->configurationFreezeCounter == 0){
  337. publishedDataSet->config.configurationFrozen = UA_FALSE;
  338. UA_DataSetField *dataSetField;
  339. TAILQ_FOREACH(dataSetField, &publishedDataSet->fields, listEntry){
  340. dataSetField->config.configurationFrozen = UA_FALSE;
  341. }
  342. }
  343. dataSetWriter->config.configurationFrozen = UA_FALSE;
  344. }
  345. return UA_STATUSCODE_GOOD;
  346. }
  347. UA_StatusCode UA_EXPORT
  348. UA_Server_setWriterGroupOperational(UA_Server *server, const UA_NodeId writerGroup){
  349. UA_WriterGroup *wg = UA_WriterGroup_findWGbyId(server, writerGroup);
  350. if(!wg)
  351. return UA_STATUSCODE_BADNOTFOUND;
  352. return UA_WriterGroup_setPubSubState(server, UA_PUBSUBSTATE_OPERATIONAL, wg);
  353. }
  354. UA_StatusCode UA_EXPORT
  355. UA_Server_setWriterGroupDisabled(UA_Server *server, const UA_NodeId writerGroup){
  356. UA_WriterGroup *wg = UA_WriterGroup_findWGbyId(server, writerGroup);
  357. if(!wg)
  358. return UA_STATUSCODE_BADNOTFOUND;
  359. return UA_WriterGroup_setPubSubState(server, UA_PUBSUBSTATE_DISABLED, wg);
  360. }
  361. /**********************************************/
  362. /* PublishedDataSet */
  363. /**********************************************/
  364. UA_StatusCode
  365. UA_PublishedDataSetConfig_copy(const UA_PublishedDataSetConfig *src,
  366. UA_PublishedDataSetConfig *dst) {
  367. UA_StatusCode retVal = UA_STATUSCODE_GOOD;
  368. memcpy(dst, src, sizeof(UA_PublishedDataSetConfig));
  369. retVal |= UA_String_copy(&src->name, &dst->name);
  370. switch(src->publishedDataSetType){
  371. case UA_PUBSUB_DATASET_PUBLISHEDITEMS:
  372. //no additional items
  373. break;
  374. case UA_PUBSUB_DATASET_PUBLISHEDITEMS_TEMPLATE:
  375. if(src->config.itemsTemplate.variablesToAddSize > 0){
  376. dst->config.itemsTemplate.variablesToAdd = (UA_PublishedVariableDataType *) UA_calloc(
  377. src->config.itemsTemplate.variablesToAddSize, sizeof(UA_PublishedVariableDataType));
  378. }
  379. for(size_t i = 0; i < src->config.itemsTemplate.variablesToAddSize; i++){
  380. retVal |= UA_PublishedVariableDataType_copy(&src->config.itemsTemplate.variablesToAdd[i],
  381. &dst->config.itemsTemplate.variablesToAdd[i]);
  382. }
  383. retVal |= UA_DataSetMetaDataType_copy(&src->config.itemsTemplate.metaData,
  384. &dst->config.itemsTemplate.metaData);
  385. break;
  386. default:
  387. return UA_STATUSCODE_BADINVALIDARGUMENT;
  388. }
  389. return retVal;
  390. }
  391. UA_StatusCode
  392. UA_Server_getPublishedDataSetConfig(UA_Server *server, const UA_NodeId pds,
  393. UA_PublishedDataSetConfig *config){
  394. if(!config)
  395. return UA_STATUSCODE_BADINVALIDARGUMENT;
  396. UA_PublishedDataSet *currentPublishedDataSet = UA_PublishedDataSet_findPDSbyId(server, pds);
  397. if(!currentPublishedDataSet)
  398. return UA_STATUSCODE_BADNOTFOUND;
  399. UA_PublishedDataSetConfig tmpPublishedDataSetConfig;
  400. //deep copy of the actual config
  401. UA_PublishedDataSetConfig_copy(&currentPublishedDataSet->config, &tmpPublishedDataSetConfig);
  402. *config = tmpPublishedDataSetConfig;
  403. return UA_STATUSCODE_GOOD;
  404. }
  405. UA_StatusCode
  406. UA_Server_getPublishedDataSetMetaData(UA_Server *server, const UA_NodeId pds, UA_DataSetMetaDataType *metaData){
  407. if(!metaData)
  408. return UA_STATUSCODE_BADINVALIDARGUMENT;
  409. UA_PublishedDataSet *currentPublishedDataSet = UA_PublishedDataSet_findPDSbyId(server, pds);
  410. if(!currentPublishedDataSet)
  411. return UA_STATUSCODE_BADNOTFOUND;
  412. UA_DataSetMetaDataType tmpDataSetMetaData;
  413. if(UA_DataSetMetaDataType_copy(&currentPublishedDataSet->dataSetMetaData, &tmpDataSetMetaData) != UA_STATUSCODE_GOOD)
  414. return UA_STATUSCODE_BADINTERNALERROR;
  415. *metaData = tmpDataSetMetaData;
  416. return UA_STATUSCODE_GOOD;
  417. }
  418. UA_PublishedDataSet *
  419. UA_PublishedDataSet_findPDSbyId(UA_Server *server, UA_NodeId identifier){
  420. for(size_t i = 0; i < server->pubSubManager.publishedDataSetsSize; i++){
  421. if(UA_NodeId_equal(&server->pubSubManager.publishedDataSets[i].identifier, &identifier)){
  422. return &server->pubSubManager.publishedDataSets[i];
  423. }
  424. }
  425. return NULL;
  426. }
  427. void
  428. UA_PublishedDataSetConfig_clear(UA_PublishedDataSetConfig *pdsConfig){
  429. //delete pds config
  430. UA_String_clear(&pdsConfig->name);
  431. switch (pdsConfig->publishedDataSetType){
  432. case UA_PUBSUB_DATASET_PUBLISHEDITEMS:
  433. //no additional items
  434. break;
  435. case UA_PUBSUB_DATASET_PUBLISHEDITEMS_TEMPLATE:
  436. if(pdsConfig->config.itemsTemplate.variablesToAddSize > 0){
  437. for(size_t i = 0; i < pdsConfig->config.itemsTemplate.variablesToAddSize; i++){
  438. UA_PublishedVariableDataType_clear(&pdsConfig->config.itemsTemplate.variablesToAdd[i]);
  439. }
  440. UA_free(pdsConfig->config.itemsTemplate.variablesToAdd);
  441. }
  442. UA_DataSetMetaDataType_clear(&pdsConfig->config.itemsTemplate.metaData);
  443. break;
  444. default:
  445. break;
  446. }
  447. }
  448. void
  449. UA_PublishedDataSet_clear(UA_Server *server, UA_PublishedDataSet *publishedDataSet){
  450. UA_PublishedDataSetConfig_clear(&publishedDataSet->config);
  451. //delete PDS
  452. UA_DataSetField *field, *tmpField;
  453. TAILQ_FOREACH_SAFE(field, &publishedDataSet->fields, listEntry, tmpField) {
  454. UA_Server_removeDataSetField(server, field->identifier);
  455. }
  456. UA_DataSetMetaDataType_clear(&publishedDataSet->dataSetMetaData);
  457. UA_NodeId_clear(&publishedDataSet->identifier);
  458. }
  459. static UA_StatusCode
  460. generateFieldMetaData(UA_Server *server, UA_DataSetField *field, UA_FieldMetaData *fieldMetaData){
  461. switch (field->config.dataSetFieldType){
  462. case UA_PUBSUB_DATASETFIELD_VARIABLE:
  463. if(UA_String_copy(&field->config.field.variable.fieldNameAlias, &fieldMetaData->name) != UA_STATUSCODE_GOOD)
  464. return UA_STATUSCODE_BADINTERNALERROR;
  465. fieldMetaData->description = UA_LOCALIZEDTEXT_ALLOC("", "");
  466. fieldMetaData->dataSetFieldId = UA_GUID_NULL;
  467. //ToDo after freeze PR, the value source must be checked (other behavior for static value source)
  468. if(field->config.field.variable.staticValueSourceEnabled){
  469. fieldMetaData->arrayDimensions = (UA_UInt32 *) UA_calloc(
  470. field->config.field.variable.staticValueSource.value.arrayDimensionsSize, sizeof(UA_UInt32));
  471. if(fieldMetaData->arrayDimensions == NULL)
  472. return UA_STATUSCODE_BADOUTOFMEMORY;
  473. memcpy(fieldMetaData->arrayDimensions, field->config.field.variable.staticValueSource.value.arrayDimensions,
  474. sizeof(UA_UInt32) *field->config.field.variable.staticValueSource.value.arrayDimensionsSize);
  475. fieldMetaData->arrayDimensionsSize = field->config.field.variable.staticValueSource.value.arrayDimensionsSize;
  476. if(UA_NodeId_copy(&field->config.field.variable.staticValueSource.value.type->typeId,
  477. &fieldMetaData->dataType) != UA_STATUSCODE_GOOD){
  478. if(fieldMetaData->arrayDimensions){
  479. UA_free(fieldMetaData->arrayDimensions);
  480. return UA_STATUSCODE_BADINTERNALERROR;
  481. }
  482. }
  483. fieldMetaData->properties = NULL;
  484. fieldMetaData->propertiesSize = 0;
  485. //TODO collect value rank for the static field source
  486. fieldMetaData->fieldFlags = UA_DATASETFIELDFLAGS_NONE;
  487. return UA_STATUSCODE_GOOD;
  488. }
  489. UA_Variant value;
  490. UA_Variant_init(&value);
  491. if(UA_Server_readArrayDimensions(server, field->config.field.variable.publishParameters.publishedVariable,
  492. &value) != UA_STATUSCODE_GOOD){
  493. UA_LOG_WARNING(&server->config.logger, UA_LOGCATEGORY_SERVER,
  494. "PubSub meta data generation. Reading ArrayDimension failed.");
  495. } else {
  496. fieldMetaData->arrayDimensions = (UA_UInt32 *) UA_calloc(value.arrayDimensionsSize, sizeof(UA_UInt32));
  497. if(fieldMetaData->arrayDimensions == NULL)
  498. return UA_STATUSCODE_BADOUTOFMEMORY;
  499. memcpy(fieldMetaData->arrayDimensions, value.arrayDimensions, sizeof(UA_UInt32)*value.arrayDimensionsSize);
  500. fieldMetaData->arrayDimensionsSize = value.arrayDimensionsSize;
  501. }
  502. if(UA_Server_readDataType(server, field->config.field.variable.publishParameters.publishedVariable,
  503. &fieldMetaData->dataType) != UA_STATUSCODE_GOOD){
  504. if(fieldMetaData->arrayDimensions){
  505. UA_free(fieldMetaData->arrayDimensions);
  506. return UA_STATUSCODE_BADINTERNALERROR;
  507. }
  508. }
  509. fieldMetaData->properties = NULL;
  510. fieldMetaData->propertiesSize = 0;
  511. UA_Int32 valueRank;
  512. if(UA_Server_readValueRank(server, field->config.field.variable.publishParameters.publishedVariable,
  513. &valueRank) != UA_STATUSCODE_GOOD){
  514. if(fieldMetaData->arrayDimensions){
  515. UA_free(fieldMetaData->arrayDimensions);
  516. return UA_STATUSCODE_BADINTERNALERROR;
  517. }
  518. }
  519. fieldMetaData->valueRank = valueRank;
  520. if(field->config.field.variable.promotedField){
  521. fieldMetaData->fieldFlags = UA_DATASETFIELDFLAGS_PROMOTEDFIELD;
  522. } else {
  523. fieldMetaData->fieldFlags = UA_DATASETFIELDFLAGS_NONE;
  524. }
  525. //TODO collect the following fields
  526. //fieldMetaData.builtInType
  527. //fieldMetaData.maxStringLength
  528. return UA_STATUSCODE_GOOD;
  529. case UA_PUBSUB_DATASETFIELD_EVENT:
  530. return UA_STATUSCODE_BADNOTSUPPORTED;
  531. default:
  532. return UA_STATUSCODE_BADNOTSUPPORTED;
  533. }
  534. }
  535. UA_DataSetFieldResult
  536. UA_Server_addDataSetField(UA_Server *server, const UA_NodeId publishedDataSet,
  537. const UA_DataSetFieldConfig *fieldConfig,
  538. UA_NodeId *fieldIdentifier) {
  539. UA_StatusCode retVal = UA_STATUSCODE_GOOD;
  540. UA_DataSetFieldResult result = {UA_STATUSCODE_BADINVALIDARGUMENT, {0, 0}};
  541. if(!fieldConfig)
  542. return result;
  543. UA_PublishedDataSet *currentDataSet = UA_PublishedDataSet_findPDSbyId(server, publishedDataSet);
  544. if(currentDataSet == NULL){
  545. result.result = UA_STATUSCODE_BADNOTFOUND;
  546. return result;
  547. }
  548. if(currentDataSet->config.configurationFrozen){
  549. UA_LOG_WARNING(&server->config.logger, UA_LOGCATEGORY_SERVER,
  550. "Adding DataSetField failed. PublishedDataSet is frozen.");
  551. result.result = UA_STATUSCODE_BADCONFIGURATIONERROR;
  552. return result;
  553. }
  554. if(currentDataSet->config.publishedDataSetType != UA_PUBSUB_DATASET_PUBLISHEDITEMS){
  555. result.result = UA_STATUSCODE_BADNOTIMPLEMENTED;
  556. return result;
  557. }
  558. UA_DataSetField *newField = (UA_DataSetField *) UA_calloc(1, sizeof(UA_DataSetField));
  559. if(!newField){
  560. result.result = UA_STATUSCODE_BADINTERNALERROR;
  561. return result;
  562. }
  563. UA_DataSetFieldConfig tmpFieldConfig;
  564. retVal |= UA_DataSetFieldConfig_copy(fieldConfig, &tmpFieldConfig);
  565. newField->config = tmpFieldConfig;
  566. UA_PubSubManager_generateUniqueNodeId(server, &newField->identifier);
  567. if(fieldIdentifier != NULL){
  568. UA_NodeId_copy(&newField->identifier, fieldIdentifier);
  569. }
  570. newField->publishedDataSet = currentDataSet->identifier;
  571. //update major version of parent published data set
  572. currentDataSet->dataSetMetaData.configurationVersion.majorVersion = UA_PubSubConfigurationVersionTimeDifference();
  573. /* The order of DataSetFields should be the same in both creating and publishing.
  574. * So adding DataSetFields at the the end of the DataSets using TAILQ structure */
  575. if (currentDataSet->fieldSize != 0)
  576. TAILQ_INSERT_TAIL(&currentDataSet->fields, newField, listEntry);
  577. else
  578. TAILQ_INSERT_HEAD(&currentDataSet->fields, newField, listEntry);
  579. if(newField->config.field.variable.promotedField)
  580. currentDataSet->promotedFieldsCount++;
  581. currentDataSet->fieldSize++;
  582. //generate fieldMetadata within the DataSetMetaData
  583. currentDataSet->dataSetMetaData.fieldsSize++;
  584. UA_FieldMetaData *fieldMetaData = (UA_FieldMetaData *) UA_realloc(currentDataSet->dataSetMetaData.fields, currentDataSet->dataSetMetaData.fieldsSize *
  585. sizeof(UA_FieldMetaData));
  586. if(!fieldMetaData){
  587. result.result = UA_STATUSCODE_BADOUTOFMEMORY;
  588. return result;
  589. }
  590. currentDataSet->dataSetMetaData.fields = fieldMetaData;
  591. UA_FieldMetaData_init(&fieldMetaData[currentDataSet->fieldSize-1]);
  592. if(generateFieldMetaData(server, newField, &fieldMetaData[currentDataSet->fieldSize-1]) != UA_STATUSCODE_GOOD){
  593. UA_Server_removeDataSetField(server, newField->identifier);
  594. result.result = UA_STATUSCODE_BADINTERNALERROR;
  595. return result;
  596. }
  597. UA_DataSetField *dsf;
  598. size_t counter = 0;
  599. TAILQ_FOREACH(dsf, &currentDataSet->fields, listEntry){
  600. dsf->fieldMetaData = fieldMetaData[counter++];
  601. }
  602. result.result = retVal;
  603. result.configurationVersion.majorVersion = currentDataSet->dataSetMetaData.configurationVersion.majorVersion;
  604. result.configurationVersion.minorVersion = currentDataSet->dataSetMetaData.configurationVersion.minorVersion;
  605. return result;
  606. }
  607. UA_DataSetFieldResult
  608. UA_Server_removeDataSetField(UA_Server *server, const UA_NodeId dsf) {
  609. UA_DataSetField *currentField = UA_DataSetField_findDSFbyId(server, dsf);
  610. UA_DataSetFieldResult result = {UA_STATUSCODE_BADNOTFOUND, {0, 0}};
  611. if(!currentField)
  612. return result;
  613. if(currentField->config.configurationFrozen){
  614. UA_LOG_WARNING(&server->config.logger, UA_LOGCATEGORY_SERVER,
  615. "Remove DataSetField failed. DataSetField is frozen.");
  616. result.result = UA_STATUSCODE_BADCONFIGURATIONERROR;
  617. return result;
  618. }
  619. UA_PublishedDataSet *parentPublishedDataSet =
  620. UA_PublishedDataSet_findPDSbyId(server, currentField->publishedDataSet);
  621. if(!parentPublishedDataSet)
  622. return result;
  623. if(parentPublishedDataSet->config.configurationFrozen){
  624. UA_LOG_WARNING(&server->config.logger, UA_LOGCATEGORY_SERVER,
  625. "Remove DataSetField failed. PublishedDataSet is frozen.");
  626. result.result = UA_STATUSCODE_BADCONFIGURATIONERROR;
  627. return result;
  628. }
  629. parentPublishedDataSet->fieldSize--;
  630. if(currentField->config.field.variable.promotedField)
  631. parentPublishedDataSet->promotedFieldsCount--;
  632. /* update major version of PublishedDataSet */
  633. parentPublishedDataSet->dataSetMetaData.configurationVersion.majorVersion =
  634. UA_PubSubConfigurationVersionTimeDifference();
  635. currentField->fieldMetaData.arrayDimensions = NULL;
  636. currentField->fieldMetaData.properties = NULL;
  637. currentField->fieldMetaData.name = UA_STRING_NULL;
  638. currentField->fieldMetaData.description.locale = UA_STRING_NULL;
  639. currentField->fieldMetaData.description.text = UA_STRING_NULL;
  640. UA_DataSetField_clear(currentField);
  641. TAILQ_REMOVE(&parentPublishedDataSet->fields, currentField, listEntry);
  642. UA_free(currentField);
  643. result.result = UA_STATUSCODE_GOOD;
  644. //regenerate DataSetMetaData
  645. parentPublishedDataSet->dataSetMetaData.fieldsSize--;
  646. if(parentPublishedDataSet->dataSetMetaData.fieldsSize > 0){
  647. for(size_t i = 0; i < parentPublishedDataSet->dataSetMetaData.fieldsSize+1; i++) {
  648. UA_FieldMetaData_clear(&parentPublishedDataSet->dataSetMetaData.fields[i]);
  649. }
  650. UA_free(parentPublishedDataSet->dataSetMetaData.fields);
  651. UA_FieldMetaData *fieldMetaData = (UA_FieldMetaData *) UA_calloc(parentPublishedDataSet->dataSetMetaData.fieldsSize,
  652. sizeof(UA_FieldMetaData));
  653. if(!fieldMetaData){
  654. result.result = UA_STATUSCODE_BADOUTOFMEMORY;
  655. return result;
  656. }
  657. UA_DataSetField *tmpDSF;
  658. size_t counter = 0;
  659. TAILQ_FOREACH(tmpDSF, &parentPublishedDataSet->fields, listEntry){
  660. UA_FieldMetaData tmpFieldMetaData;
  661. UA_FieldMetaData_init(&tmpFieldMetaData);
  662. if(generateFieldMetaData(server, tmpDSF, &tmpFieldMetaData) != UA_STATUSCODE_GOOD){
  663. UA_LOG_WARNING(&server->config.logger, UA_LOGCATEGORY_SERVER,
  664. "PubSub MetaData generation failed!");
  665. //TODO how to ensure consistency if the metadata regeneration fails
  666. result.result = UA_STATUSCODE_BADINTERNALERROR;
  667. }
  668. fieldMetaData[counter++] = tmpFieldMetaData;
  669. }
  670. parentPublishedDataSet->dataSetMetaData.fields = fieldMetaData;
  671. } else {
  672. UA_FieldMetaData_delete(parentPublishedDataSet->dataSetMetaData.fields);
  673. parentPublishedDataSet->dataSetMetaData.fields = NULL;
  674. }
  675. result.configurationVersion.majorVersion = parentPublishedDataSet->dataSetMetaData.configurationVersion.majorVersion;
  676. result.configurationVersion.minorVersion = parentPublishedDataSet->dataSetMetaData.configurationVersion.minorVersion;
  677. return result;
  678. }
  679. /**********************************************/
  680. /* DataSetWriter */
  681. /**********************************************/
  682. UA_StatusCode
  683. UA_DataSetWriterConfig_copy(const UA_DataSetWriterConfig *src,
  684. UA_DataSetWriterConfig *dst){
  685. UA_StatusCode retVal = UA_STATUSCODE_GOOD;
  686. memcpy(dst, src, sizeof(UA_DataSetWriterConfig));
  687. retVal |= UA_String_copy(&src->name, &dst->name);
  688. retVal |= UA_String_copy(&src->dataSetName, &dst->dataSetName);
  689. retVal |= UA_ExtensionObject_copy(&src->messageSettings, &dst->messageSettings);
  690. if (src->dataSetWriterPropertiesSize > 0) {
  691. dst->dataSetWriterProperties = (UA_KeyValuePair *)
  692. UA_calloc(src->dataSetWriterPropertiesSize, sizeof(UA_KeyValuePair));
  693. if(!dst->dataSetWriterProperties)
  694. return UA_STATUSCODE_BADOUTOFMEMORY;
  695. for(size_t i = 0; i < src->dataSetWriterPropertiesSize; i++){
  696. retVal |= UA_KeyValuePair_copy(&src->dataSetWriterProperties[i], &dst->dataSetWriterProperties[i]);
  697. }
  698. }
  699. return retVal;
  700. }
  701. UA_StatusCode
  702. UA_Server_getDataSetWriterConfig(UA_Server *server, const UA_NodeId dsw,
  703. UA_DataSetWriterConfig *config){
  704. UA_StatusCode retVal = UA_STATUSCODE_GOOD;
  705. if(!config)
  706. return UA_STATUSCODE_BADINVALIDARGUMENT;
  707. UA_DataSetWriter *currentDataSetWriter = UA_DataSetWriter_findDSWbyId(server, dsw);
  708. if(!currentDataSetWriter)
  709. return UA_STATUSCODE_BADNOTFOUND;
  710. UA_DataSetWriterConfig tmpWriterConfig;
  711. //deep copy of the actual config
  712. retVal |= UA_DataSetWriterConfig_copy(&currentDataSetWriter->config, &tmpWriterConfig);
  713. *config = tmpWriterConfig;
  714. return retVal;
  715. }
  716. UA_DataSetWriter *
  717. UA_DataSetWriter_findDSWbyId(UA_Server *server, UA_NodeId identifier) {
  718. for(size_t i = 0; i < server->pubSubManager.connectionsSize; i++){
  719. UA_WriterGroup *tmpWriterGroup;
  720. LIST_FOREACH(tmpWriterGroup, &server->pubSubManager.connections[i].writerGroups, listEntry){
  721. UA_DataSetWriter *tmpWriter;
  722. LIST_FOREACH(tmpWriter, &tmpWriterGroup->writers, listEntry){
  723. if(UA_NodeId_equal(&tmpWriter->identifier, &identifier)){
  724. return tmpWriter;
  725. }
  726. }
  727. }
  728. }
  729. return NULL;
  730. }
  731. void
  732. UA_DataSetWriterConfig_clear(UA_DataSetWriterConfig *pdsConfig) {
  733. UA_String_clear(&pdsConfig->name);
  734. UA_String_clear(&pdsConfig->dataSetName);
  735. for(size_t i = 0; i < pdsConfig->dataSetWriterPropertiesSize; i++){
  736. UA_KeyValuePair_clear(&pdsConfig->dataSetWriterProperties[i]);
  737. }
  738. UA_free(pdsConfig->dataSetWriterProperties);
  739. UA_ExtensionObject_clear(&pdsConfig->messageSettings);
  740. }
  741. static void
  742. UA_DataSetWriter_clear(UA_Server *server, UA_DataSetWriter *dataSetWriter) {
  743. UA_DataSetWriterConfig_clear(&dataSetWriter->config);
  744. //delete DataSetWriter
  745. UA_NodeId_clear(&dataSetWriter->identifier);
  746. UA_NodeId_clear(&dataSetWriter->linkedWriterGroup);
  747. UA_NodeId_clear(&dataSetWriter->connectedDataSet);
  748. #ifdef UA_ENABLE_PUBSUB_DELTAFRAMES
  749. //delete lastSamples store
  750. for(size_t i = 0; i < dataSetWriter->lastSamplesCount; i++) {
  751. UA_DataValue_clear(&dataSetWriter->lastSamples[i].value);
  752. }
  753. UA_free(dataSetWriter->lastSamples);
  754. dataSetWriter->lastSamples = NULL;
  755. dataSetWriter->lastSamplesCount = 0;
  756. #endif
  757. }
  758. //state machine methods not part of the open62541 state machine API
  759. UA_StatusCode
  760. UA_DataSetWriter_setPubSubState(UA_Server *server, UA_PubSubState state, UA_DataSetWriter *dataSetWriter){
  761. switch(state){
  762. case UA_PUBSUBSTATE_DISABLED:
  763. switch (dataSetWriter->state){
  764. case UA_PUBSUBSTATE_DISABLED:
  765. return UA_STATUSCODE_GOOD;
  766. case UA_PUBSUBSTATE_PAUSED:
  767. dataSetWriter->state = UA_PUBSUBSTATE_DISABLED;
  768. //no further action is required
  769. break;
  770. case UA_PUBSUBSTATE_OPERATIONAL:
  771. dataSetWriter->state = UA_PUBSUBSTATE_DISABLED;
  772. break;
  773. case UA_PUBSUBSTATE_ERROR:
  774. break;
  775. default:
  776. UA_LOG_WARNING(&server->config.logger, UA_LOGCATEGORY_SERVER,
  777. "Received unknown PubSub state!");
  778. }
  779. break;
  780. case UA_PUBSUBSTATE_PAUSED:
  781. switch (dataSetWriter->state){
  782. case UA_PUBSUBSTATE_DISABLED:
  783. break;
  784. case UA_PUBSUBSTATE_PAUSED:
  785. return UA_STATUSCODE_GOOD;
  786. case UA_PUBSUBSTATE_OPERATIONAL:
  787. break;
  788. case UA_PUBSUBSTATE_ERROR:
  789. break;
  790. default:
  791. UA_LOG_WARNING(&server->config.logger, UA_LOGCATEGORY_SERVER,
  792. "Received unknown PubSub state!");
  793. }
  794. break;
  795. case UA_PUBSUBSTATE_OPERATIONAL:
  796. switch (dataSetWriter->state){
  797. case UA_PUBSUBSTATE_DISABLED:
  798. dataSetWriter->state = UA_PUBSUBSTATE_OPERATIONAL;
  799. break;
  800. case UA_PUBSUBSTATE_PAUSED:
  801. break;
  802. case UA_PUBSUBSTATE_OPERATIONAL:
  803. return UA_STATUSCODE_GOOD;
  804. case UA_PUBSUBSTATE_ERROR:
  805. break;
  806. default:
  807. UA_LOG_WARNING(&server->config.logger, UA_LOGCATEGORY_SERVER,
  808. "Received unknown PubSub state!");
  809. }
  810. break;
  811. case UA_PUBSUBSTATE_ERROR:
  812. switch (dataSetWriter->state){
  813. case UA_PUBSUBSTATE_DISABLED:
  814. break;
  815. case UA_PUBSUBSTATE_PAUSED:
  816. break;
  817. case UA_PUBSUBSTATE_OPERATIONAL:
  818. break;
  819. case UA_PUBSUBSTATE_ERROR:
  820. return UA_STATUSCODE_GOOD;
  821. default:
  822. UA_LOG_WARNING(&server->config.logger, UA_LOGCATEGORY_SERVER,
  823. "Received unknown PubSub state!");
  824. }
  825. break;
  826. default:
  827. UA_LOG_WARNING(&server->config.logger, UA_LOGCATEGORY_SERVER,
  828. "Received unknown PubSub state!");
  829. }
  830. return UA_STATUSCODE_GOOD;
  831. }
  832. /**********************************************/
  833. /* WriterGroup */
  834. /**********************************************/
  835. UA_StatusCode
  836. UA_WriterGroupConfig_copy(const UA_WriterGroupConfig *src,
  837. UA_WriterGroupConfig *dst){
  838. UA_StatusCode retVal = UA_STATUSCODE_GOOD;
  839. memcpy(dst, src, sizeof(UA_WriterGroupConfig));
  840. retVal |= UA_String_copy(&src->name, &dst->name);
  841. retVal |= UA_ExtensionObject_copy(&src->transportSettings, &dst->transportSettings);
  842. retVal |= UA_ExtensionObject_copy(&src->messageSettings, &dst->messageSettings);
  843. if (src->groupPropertiesSize > 0) {
  844. dst->groupProperties = (UA_KeyValuePair *) UA_calloc(src->groupPropertiesSize, sizeof(UA_KeyValuePair));
  845. if(!dst->groupProperties)
  846. return UA_STATUSCODE_BADOUTOFMEMORY;
  847. for(size_t i = 0; i < src->groupPropertiesSize; i++){
  848. retVal |= UA_KeyValuePair_copy(&src->groupProperties[i], &dst->groupProperties[i]);
  849. }
  850. }
  851. return retVal;
  852. }
  853. UA_StatusCode
  854. UA_Server_getWriterGroupConfig(UA_Server *server, const UA_NodeId writerGroup,
  855. UA_WriterGroupConfig *config){
  856. UA_StatusCode retVal = UA_STATUSCODE_GOOD;
  857. if(!config)
  858. return UA_STATUSCODE_BADINVALIDARGUMENT;
  859. UA_WriterGroup *currentWriterGroup = UA_WriterGroup_findWGbyId(server, writerGroup);
  860. if(!currentWriterGroup){
  861. return UA_STATUSCODE_BADNOTFOUND;
  862. }
  863. UA_WriterGroupConfig tmpWriterGroupConfig;
  864. //deep copy of the actual config
  865. retVal |= UA_WriterGroupConfig_copy(&currentWriterGroup->config, &tmpWriterGroupConfig);
  866. *config = tmpWriterGroupConfig;
  867. return retVal;
  868. }
  869. UA_StatusCode
  870. UA_Server_updateWriterGroupConfig(UA_Server *server, UA_NodeId writerGroupIdentifier,
  871. const UA_WriterGroupConfig *config){
  872. if(!config)
  873. return UA_STATUSCODE_BADINVALIDARGUMENT;
  874. UA_WriterGroup *currentWriterGroup = UA_WriterGroup_findWGbyId(server, writerGroupIdentifier);
  875. if(!currentWriterGroup)
  876. return UA_STATUSCODE_BADNOTFOUND;
  877. if(currentWriterGroup->config.configurationFrozen){
  878. UA_LOG_WARNING(&server->config.logger, UA_LOGCATEGORY_SERVER,
  879. "Modify WriterGroup failed. WriterGroup is frozen.");
  880. return UA_STATUSCODE_BADCONFIGURATIONERROR;
  881. }
  882. //The update functionality will be extended during the next PubSub batches.
  883. //Currently is only a change of the publishing interval possible.
  884. if(currentWriterGroup->config.maxEncapsulatedDataSetMessageCount != config->maxEncapsulatedDataSetMessageCount){
  885. currentWriterGroup->config.maxEncapsulatedDataSetMessageCount = config->maxEncapsulatedDataSetMessageCount;
  886. if(currentWriterGroup->config.messageSettings.encoding == UA_EXTENSIONOBJECT_ENCODED_NOBODY) {
  887. UA_LOG_WARNING(&server->config.logger, UA_LOGCATEGORY_SERVER,
  888. "MaxEncapsulatedDataSetMessag need enabled 'PayloadHeader' within the message settings.");
  889. }
  890. }
  891. if(currentWriterGroup->config.publishingInterval != config->publishingInterval) {
  892. if(currentWriterGroup->config.rtLevel == UA_PUBSUB_RT_NONE && currentWriterGroup->state == UA_PUBSUBSTATE_OPERATIONAL){
  893. UA_PubSubManager_removeRepeatedPubSubCallback(server, currentWriterGroup->publishCallbackId);
  894. currentWriterGroup->config.publishingInterval = config->publishingInterval;
  895. UA_WriterGroup_addPublishCallback(server, currentWriterGroup);
  896. } else {
  897. currentWriterGroup->config.publishingInterval = config->publishingInterval;
  898. }
  899. }
  900. if(currentWriterGroup->config.priority != config->priority) {
  901. UA_LOG_WARNING(&server->config.logger, UA_LOGCATEGORY_SERVER,
  902. "No or unsupported WriterGroup update.");
  903. }
  904. return UA_STATUSCODE_GOOD;
  905. }
  906. UA_WriterGroup *
  907. UA_WriterGroup_findWGbyId(UA_Server *server, UA_NodeId identifier){
  908. for(size_t i = 0; i < server->pubSubManager.connectionsSize; i++){
  909. UA_WriterGroup *tmpWriterGroup;
  910. LIST_FOREACH(tmpWriterGroup, &server->pubSubManager.connections[i].writerGroups, listEntry) {
  911. if(UA_NodeId_equal(&identifier, &tmpWriterGroup->identifier)){
  912. return tmpWriterGroup;
  913. }
  914. }
  915. }
  916. return NULL;
  917. }
  918. void
  919. UA_WriterGroupConfig_clear(UA_WriterGroupConfig *writerGroupConfig){
  920. //delete writerGroup config
  921. UA_String_clear(&writerGroupConfig->name);
  922. UA_ExtensionObject_clear(&writerGroupConfig->transportSettings);
  923. UA_ExtensionObject_clear(&writerGroupConfig->messageSettings);
  924. for(size_t i = 0; i < writerGroupConfig->groupPropertiesSize; i++){
  925. UA_KeyValuePair_clear(&writerGroupConfig->groupProperties[i]);
  926. }
  927. UA_free(writerGroupConfig->groupProperties);
  928. }
  929. static void
  930. UA_WriterGroup_clear(UA_Server *server, UA_WriterGroup *writerGroup) {
  931. UA_WriterGroupConfig_clear(&writerGroup->config);
  932. //delete WriterGroup
  933. //delete all writers. Therefore removeDataSetWriter is called from PublishedDataSet
  934. UA_DataSetWriter *dataSetWriter, *tmpDataSetWriter;
  935. LIST_FOREACH_SAFE(dataSetWriter, &writerGroup->writers, listEntry, tmpDataSetWriter){
  936. UA_Server_removeDataSetWriter(server, dataSetWriter->identifier);
  937. }
  938. if(writerGroup->bufferedMessage.offsetsSize > 0){
  939. for (size_t i = 0; i < writerGroup->bufferedMessage.offsetsSize; i++) {
  940. if(writerGroup->bufferedMessage.offsets[i].contentType == UA_PUBSUB_OFFSETTYPE_PAYLOAD_VARIANT){
  941. UA_DataValue_delete(writerGroup->bufferedMessage.offsets[i].offsetData.value.value);
  942. }
  943. }
  944. UA_ByteString_deleteMembers(&writerGroup->bufferedMessage.buffer);
  945. UA_free(writerGroup->bufferedMessage.offsets);
  946. }
  947. UA_NodeId_clear(&writerGroup->linkedConnection);
  948. UA_NodeId_clear(&writerGroup->identifier);
  949. }
  950. UA_StatusCode
  951. UA_WriterGroup_setPubSubState(UA_Server *server, UA_PubSubState state, UA_WriterGroup *writerGroup){
  952. UA_DataSetWriter *dataSetWriter;
  953. switch(state){
  954. case UA_PUBSUBSTATE_DISABLED:
  955. switch (writerGroup->state){
  956. case UA_PUBSUBSTATE_DISABLED:
  957. return UA_STATUSCODE_GOOD;
  958. case UA_PUBSUBSTATE_PAUSED:
  959. break;
  960. case UA_PUBSUBSTATE_OPERATIONAL:
  961. UA_PubSubManager_removeRepeatedPubSubCallback(server, writerGroup->publishCallbackId);
  962. LIST_FOREACH(dataSetWriter, &writerGroup->writers, listEntry){
  963. UA_DataSetWriter_setPubSubState(server, UA_PUBSUBSTATE_DISABLED, dataSetWriter);
  964. }
  965. writerGroup->state = UA_PUBSUBSTATE_DISABLED;
  966. break;
  967. case UA_PUBSUBSTATE_ERROR:
  968. break;
  969. default:
  970. UA_LOG_WARNING(&server->config.logger, UA_LOGCATEGORY_SERVER,
  971. "Received unknown PubSub state!");
  972. }
  973. break;
  974. case UA_PUBSUBSTATE_PAUSED:
  975. switch (writerGroup->state){
  976. case UA_PUBSUBSTATE_DISABLED:
  977. break;
  978. case UA_PUBSUBSTATE_PAUSED:
  979. return UA_STATUSCODE_GOOD;
  980. case UA_PUBSUBSTATE_OPERATIONAL:
  981. break;
  982. case UA_PUBSUBSTATE_ERROR:
  983. break;
  984. default:
  985. UA_LOG_WARNING(&server->config.logger, UA_LOGCATEGORY_SERVER,
  986. "Received unknown PubSub state!");
  987. }
  988. break;
  989. case UA_PUBSUBSTATE_OPERATIONAL:
  990. switch (writerGroup->state){
  991. case UA_PUBSUBSTATE_DISABLED:
  992. writerGroup->state = UA_PUBSUBSTATE_OPERATIONAL;
  993. UA_PubSubManager_removeRepeatedPubSubCallback(server, writerGroup->publishCallbackId);
  994. LIST_FOREACH(dataSetWriter, &writerGroup->writers, listEntry){
  995. UA_DataSetWriter_setPubSubState(server, UA_PUBSUBSTATE_OPERATIONAL, dataSetWriter);
  996. }
  997. UA_WriterGroup_addPublishCallback(server, writerGroup);
  998. break;
  999. case UA_PUBSUBSTATE_PAUSED:
  1000. break;
  1001. case UA_PUBSUBSTATE_OPERATIONAL:
  1002. return UA_STATUSCODE_GOOD;
  1003. case UA_PUBSUBSTATE_ERROR:
  1004. break;
  1005. default:
  1006. UA_LOG_WARNING(&server->config.logger, UA_LOGCATEGORY_SERVER,
  1007. "Received unknown PubSub state!");
  1008. }
  1009. break;
  1010. case UA_PUBSUBSTATE_ERROR:
  1011. switch (writerGroup->state){
  1012. case UA_PUBSUBSTATE_DISABLED:
  1013. break;
  1014. case UA_PUBSUBSTATE_PAUSED:
  1015. break;
  1016. case UA_PUBSUBSTATE_OPERATIONAL:
  1017. break;
  1018. case UA_PUBSUBSTATE_ERROR:
  1019. return UA_STATUSCODE_GOOD;
  1020. default:
  1021. UA_LOG_WARNING(&server->config.logger, UA_LOGCATEGORY_SERVER,
  1022. "Received unknown PubSub state!");
  1023. }
  1024. break;
  1025. default:
  1026. UA_LOG_WARNING(&server->config.logger, UA_LOGCATEGORY_SERVER,
  1027. "Received unknown PubSub state!");
  1028. }
  1029. return UA_STATUSCODE_GOOD;
  1030. }
  1031. UA_StatusCode
  1032. UA_Server_addDataSetWriter(UA_Server *server,
  1033. const UA_NodeId writerGroup, const UA_NodeId dataSet,
  1034. const UA_DataSetWriterConfig *dataSetWriterConfig,
  1035. UA_NodeId *writerIdentifier) {
  1036. UA_StatusCode retVal = UA_STATUSCODE_GOOD;
  1037. if(!dataSetWriterConfig)
  1038. return UA_STATUSCODE_BADINVALIDARGUMENT;
  1039. UA_PublishedDataSet *currentDataSetContext = UA_PublishedDataSet_findPDSbyId(server, dataSet);
  1040. if(!currentDataSetContext)
  1041. return UA_STATUSCODE_BADNOTFOUND;
  1042. if(currentDataSetContext->config.configurationFrozen){
  1043. UA_LOG_WARNING(&server->config.logger, UA_LOGCATEGORY_SERVER,
  1044. "Adding DataSetWriter failed. PublishedDataSet is frozen.");
  1045. return UA_STATUSCODE_BADCONFIGURATIONERROR;
  1046. }
  1047. UA_WriterGroup *wg = UA_WriterGroup_findWGbyId(server, writerGroup);
  1048. if(!wg)
  1049. return UA_STATUSCODE_BADNOTFOUND;
  1050. if(wg->config.configurationFrozen){
  1051. UA_LOG_WARNING(&server->config.logger, UA_LOGCATEGORY_SERVER,
  1052. "Adding DataSetWriter failed. WriterGroup is frozen.");
  1053. return UA_STATUSCODE_BADCONFIGURATIONERROR;
  1054. }
  1055. UA_DataSetWriter *newDataSetWriter = (UA_DataSetWriter *) UA_calloc(1, sizeof(UA_DataSetWriter));
  1056. if(!newDataSetWriter)
  1057. return UA_STATUSCODE_BADOUTOFMEMORY;
  1058. //copy the config into the new dataSetWriter
  1059. UA_DataSetWriterConfig tmpDataSetWriterConfig;
  1060. retVal |= UA_DataSetWriterConfig_copy(dataSetWriterConfig, &tmpDataSetWriterConfig);
  1061. newDataSetWriter->config = tmpDataSetWriterConfig;
  1062. //save the current version of the connected PublishedDataSet
  1063. newDataSetWriter->connectedDataSetVersion = currentDataSetContext->dataSetMetaData.configurationVersion;
  1064. #ifdef UA_ENABLE_PUBSUB_DELTAFRAMES
  1065. //initialize the queue for the last values
  1066. if (currentDataSetContext->fieldSize > 0) {
  1067. newDataSetWriter->lastSamples = (UA_DataSetWriterSample * )
  1068. UA_calloc(currentDataSetContext->fieldSize, sizeof(UA_DataSetWriterSample));
  1069. if(!newDataSetWriter->lastSamples) {
  1070. UA_DataSetWriterConfig_clear(&newDataSetWriter->config);
  1071. UA_free(newDataSetWriter);
  1072. return UA_STATUSCODE_BADOUTOFMEMORY;
  1073. }
  1074. newDataSetWriter->lastSamplesCount = currentDataSetContext->fieldSize;
  1075. }
  1076. #endif
  1077. //connect PublishedDataSet with DataSetWriter
  1078. newDataSetWriter->connectedDataSet = currentDataSetContext->identifier;
  1079. newDataSetWriter->linkedWriterGroup = wg->identifier;
  1080. UA_PubSubManager_generateUniqueNodeId(server, &newDataSetWriter->identifier);
  1081. if(writerIdentifier != NULL)
  1082. UA_NodeId_copy(&newDataSetWriter->identifier, writerIdentifier);
  1083. //add the new writer to the group
  1084. LIST_INSERT_HEAD(&wg->writers, newDataSetWriter, listEntry);
  1085. wg->writersCount++;
  1086. #ifdef UA_ENABLE_PUBSUB_INFORMATIONMODEL
  1087. addDataSetWriterRepresentation(server, newDataSetWriter);
  1088. #endif
  1089. return retVal;
  1090. }
  1091. UA_StatusCode
  1092. UA_Server_removeDataSetWriter(UA_Server *server, const UA_NodeId dsw){
  1093. UA_DataSetWriter *dataSetWriter = UA_DataSetWriter_findDSWbyId(server, dsw);
  1094. if(!dataSetWriter)
  1095. return UA_STATUSCODE_BADNOTFOUND;
  1096. if(dataSetWriter->config.configurationFrozen){
  1097. UA_LOG_WARNING(&server->config.logger, UA_LOGCATEGORY_SERVER,
  1098. "Remove DataSetWriter failed. DataSetWriter is frozen.");
  1099. return UA_STATUSCODE_BADCONFIGURATIONERROR;
  1100. }
  1101. UA_WriterGroup *linkedWriterGroup = UA_WriterGroup_findWGbyId(server, dataSetWriter->linkedWriterGroup);
  1102. if(!linkedWriterGroup)
  1103. return UA_STATUSCODE_BADNOTFOUND;
  1104. if(linkedWriterGroup->config.configurationFrozen){
  1105. UA_LOG_WARNING(&server->config.logger, UA_LOGCATEGORY_SERVER,
  1106. "Remove DataSetWriter failed. WriterGroup is frozen.");
  1107. return UA_STATUSCODE_BADCONFIGURATIONERROR;
  1108. }
  1109. UA_PublishedDataSet *publishedDataSet = UA_PublishedDataSet_findPDSbyId(server, dataSetWriter->connectedDataSet);
  1110. if(!publishedDataSet)
  1111. return UA_STATUSCODE_BADNOTFOUND;
  1112. linkedWriterGroup->writersCount--;
  1113. #ifdef UA_ENABLE_PUBSUB_INFORMATIONMODEL
  1114. removeDataSetWriterRepresentation(server, dataSetWriter);
  1115. #endif
  1116. //remove DataSetWriter from group
  1117. UA_DataSetWriter_clear(server, dataSetWriter);
  1118. LIST_REMOVE(dataSetWriter, listEntry);
  1119. UA_free(dataSetWriter);
  1120. return UA_STATUSCODE_GOOD;
  1121. }
  1122. /**********************************************/
  1123. /* DataSetField */
  1124. /**********************************************/
  1125. UA_StatusCode
  1126. UA_DataSetFieldConfig_copy(const UA_DataSetFieldConfig *src, UA_DataSetFieldConfig *dst){
  1127. memcpy(dst, src, sizeof(UA_DataSetFieldConfig));
  1128. if(src->dataSetFieldType == UA_PUBSUB_DATASETFIELD_VARIABLE) {
  1129. UA_String_copy(&src->field.variable.fieldNameAlias, &dst->field.variable.fieldNameAlias);
  1130. UA_PublishedVariableDataType_copy(&src->field.variable.publishParameters,
  1131. &dst->field.variable.publishParameters);
  1132. } else {
  1133. return UA_STATUSCODE_BADNOTSUPPORTED;
  1134. }
  1135. return UA_STATUSCODE_GOOD;
  1136. }
  1137. UA_StatusCode
  1138. UA_Server_getDataSetFieldConfig(UA_Server *server, const UA_NodeId dsf,
  1139. UA_DataSetFieldConfig *config) {
  1140. UA_StatusCode retVal = UA_STATUSCODE_GOOD;
  1141. if(!config)
  1142. return UA_STATUSCODE_BADINVALIDARGUMENT;
  1143. UA_DataSetField *currentDataSetField = UA_DataSetField_findDSFbyId(server, dsf);
  1144. if(!currentDataSetField)
  1145. return UA_STATUSCODE_BADNOTFOUND;
  1146. UA_DataSetFieldConfig tmpFieldConfig;
  1147. //deep copy of the actual config
  1148. retVal |= UA_DataSetFieldConfig_copy(&currentDataSetField->config, &tmpFieldConfig);
  1149. *config = tmpFieldConfig;
  1150. return retVal;
  1151. }
  1152. UA_DataSetField *
  1153. UA_DataSetField_findDSFbyId(UA_Server *server, UA_NodeId identifier) {
  1154. for(size_t i = 0; i < server->pubSubManager.publishedDataSetsSize; i++){
  1155. UA_DataSetField *tmpField;
  1156. TAILQ_FOREACH(tmpField, &server->pubSubManager.publishedDataSets[i].fields, listEntry){
  1157. if(UA_NodeId_equal(&tmpField->identifier, &identifier)){
  1158. return tmpField;
  1159. }
  1160. }
  1161. }
  1162. return NULL;
  1163. }
  1164. void
  1165. UA_DataSetFieldConfig_clear(UA_DataSetFieldConfig *dataSetFieldConfig){
  1166. if(dataSetFieldConfig->dataSetFieldType == UA_PUBSUB_DATASETFIELD_VARIABLE){
  1167. UA_String_clear(&dataSetFieldConfig->field.variable.fieldNameAlias);
  1168. UA_PublishedVariableDataType_clear(&dataSetFieldConfig->field.variable.publishParameters);
  1169. }
  1170. }
  1171. static void
  1172. UA_DataSetField_clear(UA_DataSetField *field) {
  1173. UA_DataSetFieldConfig_clear(&field->config);
  1174. //delete DataSetField
  1175. UA_NodeId_clear(&field->identifier);
  1176. UA_NodeId_clear(&field->publishedDataSet);
  1177. UA_FieldMetaData_clear(&field->fieldMetaData);
  1178. }
  1179. /*********************************************************/
  1180. /* PublishValues handling */
  1181. /*********************************************************/
  1182. /**
  1183. * Compare two variants. Internally used for value change detection.
  1184. *
  1185. * @return true if the value has changed
  1186. */
  1187. #ifdef UA_ENABLE_PUBSUB_DELTAFRAMES
  1188. static UA_Boolean
  1189. valueChangedVariant(UA_Variant *oldValue, UA_Variant *newValue){
  1190. if(! (oldValue && newValue))
  1191. return false;
  1192. UA_ByteString *oldValueEncoding = UA_ByteString_new(), *newValueEncoding = UA_ByteString_new();
  1193. size_t oldValueEncodingSize, newValueEncodingSize;
  1194. oldValueEncodingSize = UA_calcSizeBinary(oldValue, &UA_TYPES[UA_TYPES_VARIANT]);
  1195. newValueEncodingSize = UA_calcSizeBinary(newValue, &UA_TYPES[UA_TYPES_VARIANT]);
  1196. if((oldValueEncodingSize == 0) || (newValueEncodingSize == 0))
  1197. return false;
  1198. if(oldValueEncodingSize != newValueEncodingSize)
  1199. return true;
  1200. if(UA_ByteString_allocBuffer(oldValueEncoding, oldValueEncodingSize) != UA_STATUSCODE_GOOD)
  1201. return false;
  1202. if(UA_ByteString_allocBuffer(newValueEncoding, newValueEncodingSize) != UA_STATUSCODE_GOOD)
  1203. return false;
  1204. UA_Byte *bufPosOldValue = oldValueEncoding->data;
  1205. const UA_Byte *bufEndOldValue = &oldValueEncoding->data[oldValueEncoding->length];
  1206. UA_Byte *bufPosNewValue = newValueEncoding->data;
  1207. const UA_Byte *bufEndNewValue = &newValueEncoding->data[newValueEncoding->length];
  1208. if(UA_encodeBinary(oldValue, &UA_TYPES[UA_TYPES_VARIANT],
  1209. &bufPosOldValue, &bufEndOldValue, NULL, NULL) != UA_STATUSCODE_GOOD){
  1210. return false;
  1211. }
  1212. if(UA_encodeBinary(newValue, &UA_TYPES[UA_TYPES_VARIANT],
  1213. &bufPosNewValue, &bufEndNewValue, NULL, NULL) != UA_STATUSCODE_GOOD){
  1214. return false;
  1215. }
  1216. oldValueEncoding->length = (uintptr_t)bufPosOldValue - (uintptr_t)oldValueEncoding->data;
  1217. newValueEncoding->length = (uintptr_t)bufPosNewValue - (uintptr_t)newValueEncoding->data;
  1218. UA_Boolean compareResult = !UA_ByteString_equal(oldValueEncoding, newValueEncoding);
  1219. UA_ByteString_delete(oldValueEncoding);
  1220. UA_ByteString_delete(newValueEncoding);
  1221. return compareResult;
  1222. }
  1223. #endif
  1224. /**
  1225. * Obtain the latest value for a specific DataSetField. This method is currently
  1226. * called inside the DataSetMessage generation process.
  1227. */
  1228. static void
  1229. UA_PubSubDataSetField_sampleValue(UA_Server *server, UA_DataSetField *field,
  1230. UA_DataValue *value) {
  1231. /* Read the value */
  1232. if(field->config.field.variable.staticValueSourceEnabled == UA_FALSE){
  1233. UA_ReadValueId rvid;
  1234. UA_ReadValueId_init(&rvid);
  1235. rvid.nodeId = field->config.field.variable.publishParameters.publishedVariable;
  1236. rvid.attributeId = field->config.field.variable.publishParameters.attributeId;
  1237. rvid.indexRange = field->config.field.variable.publishParameters.indexRange;
  1238. *value = UA_Server_read(server, &rvid, UA_TIMESTAMPSTORETURN_BOTH);
  1239. } else {
  1240. value->value.storageType = UA_VARIANT_DATA_NODELETE;
  1241. *value = field->config.field.variable.staticValueSource;
  1242. }
  1243. }
  1244. static UA_StatusCode
  1245. UA_PubSubDataSetWriter_generateKeyFrameMessage(UA_Server *server, UA_DataSetMessage *dataSetMessage,
  1246. UA_DataSetWriter *dataSetWriter) {
  1247. UA_PublishedDataSet *currentDataSet =
  1248. UA_PublishedDataSet_findPDSbyId(server, dataSetWriter->connectedDataSet);
  1249. if(!currentDataSet)
  1250. return UA_STATUSCODE_BADNOTFOUND;
  1251. /* Prepare DataSetMessageContent */
  1252. dataSetMessage->header.dataSetMessageValid = true;
  1253. dataSetMessage->header.dataSetMessageType = UA_DATASETMESSAGE_DATAKEYFRAME;
  1254. dataSetMessage->data.keyFrameData.fieldCount = currentDataSet->fieldSize;
  1255. dataSetMessage->data.keyFrameData.dataSetFields = (UA_DataValue *)
  1256. UA_Array_new(currentDataSet->fieldSize, &UA_TYPES[UA_TYPES_DATAVALUE]);
  1257. if(!dataSetMessage->data.keyFrameData.dataSetFields)
  1258. return UA_STATUSCODE_BADOUTOFMEMORY;
  1259. #ifdef UA_ENABLE_JSON_ENCODING
  1260. /* json: insert fieldnames used as json keys */
  1261. dataSetMessage->data.keyFrameData.fieldNames =
  1262. (UA_String *)UA_Array_new(currentDataSet->fieldSize, &UA_TYPES[UA_TYPES_STRING]);
  1263. if(!dataSetMessage->data.keyFrameData.fieldNames)
  1264. return UA_STATUSCODE_BADOUTOFMEMORY;
  1265. #endif
  1266. /* Loop over the fields */
  1267. size_t counter = 0;
  1268. UA_DataSetField *dsf;
  1269. TAILQ_FOREACH(dsf, &currentDataSet->fields, listEntry) {
  1270. #ifdef UA_ENABLE_JSON_ENCODING
  1271. /* json: store the fieldNameAlias*/
  1272. UA_String_copy(&dsf->config.field.variable.fieldNameAlias,
  1273. &dataSetMessage->data.keyFrameData.fieldNames[counter]);
  1274. #endif
  1275. /* Sample the value */
  1276. UA_DataValue *dfv = &dataSetMessage->data.keyFrameData.dataSetFields[counter];
  1277. UA_PubSubDataSetField_sampleValue(server, dsf, dfv);
  1278. /* Deactivate statuscode? */
  1279. if(((u64)dataSetWriter->config.dataSetFieldContentMask & (u64)UA_DATASETFIELDCONTENTMASK_STATUSCODE) == 0)
  1280. dfv->hasStatus = false;
  1281. /* Deactivate timestamps */
  1282. if(((u64)dataSetWriter->config.dataSetFieldContentMask &
  1283. (u64)UA_DATASETFIELDCONTENTMASK_SOURCETIMESTAMP) == 0)
  1284. dfv->hasSourceTimestamp = false;
  1285. if(((u64)dataSetWriter->config.dataSetFieldContentMask &
  1286. (u64)UA_DATASETFIELDCONTENTMASK_SOURCEPICOSECONDS) == 0)
  1287. dfv->hasSourcePicoseconds = false;
  1288. if(((u64)dataSetWriter->config.dataSetFieldContentMask &
  1289. (u64)UA_DATASETFIELDCONTENTMASK_SERVERTIMESTAMP) == 0)
  1290. dfv->hasServerTimestamp = false;
  1291. if(((u64)dataSetWriter->config.dataSetFieldContentMask &
  1292. (u64)UA_DATASETFIELDCONTENTMASK_SERVERPICOSECONDS) == 0)
  1293. dfv->hasServerPicoseconds = false;
  1294. #ifdef UA_ENABLE_PUBSUB_DELTAFRAMES
  1295. /* Update lastValue store */
  1296. UA_DataValue_clear(&dataSetWriter->lastSamples[counter].value);
  1297. UA_DataValue_copy(dfv, &dataSetWriter->lastSamples[counter].value);
  1298. #endif
  1299. counter++;
  1300. }
  1301. return UA_STATUSCODE_GOOD;
  1302. }
  1303. #ifdef UA_ENABLE_PUBSUB_DELTAFRAMES
  1304. static UA_StatusCode
  1305. UA_PubSubDataSetWriter_generateDeltaFrameMessage(UA_Server *server,
  1306. UA_DataSetMessage *dataSetMessage,
  1307. UA_DataSetWriter *dataSetWriter) {
  1308. UA_PublishedDataSet *currentDataSet =
  1309. UA_PublishedDataSet_findPDSbyId(server, dataSetWriter->connectedDataSet);
  1310. if(!currentDataSet)
  1311. return UA_STATUSCODE_BADNOTFOUND;
  1312. /* Prepare DataSetMessageContent */
  1313. memset(dataSetMessage, 0, sizeof(UA_DataSetMessage));
  1314. dataSetMessage->header.dataSetMessageValid = true;
  1315. dataSetMessage->header.dataSetMessageType = UA_DATASETMESSAGE_DATADELTAFRAME;
  1316. if (currentDataSet->fieldSize == 0) {
  1317. return UA_STATUSCODE_GOOD;
  1318. }
  1319. UA_DataSetField *dsf;
  1320. size_t counter = 0;
  1321. TAILQ_FOREACH(dsf, &currentDataSet->fields, listEntry) {
  1322. /* Sample the value */
  1323. UA_DataValue value;
  1324. UA_DataValue_init(&value);
  1325. UA_PubSubDataSetField_sampleValue(server, dsf, &value);
  1326. /* Check if the value has changed */
  1327. if(valueChangedVariant(&dataSetWriter->lastSamples[counter].value.value, &value.value)) {
  1328. /* increase fieldCount for current delta message */
  1329. dataSetMessage->data.deltaFrameData.fieldCount++;
  1330. dataSetWriter->lastSamples[counter].valueChanged = true;
  1331. /* Update last stored sample */
  1332. UA_DataValue_clear(&dataSetWriter->lastSamples[counter].value);
  1333. dataSetWriter->lastSamples[counter].value = value;
  1334. } else {
  1335. UA_DataValue_clear(&value);
  1336. dataSetWriter->lastSamples[counter].valueChanged = false;
  1337. }
  1338. counter++;
  1339. }
  1340. /* Allocate DeltaFrameFields */
  1341. UA_DataSetMessage_DeltaFrameField *deltaFields = (UA_DataSetMessage_DeltaFrameField *)
  1342. UA_calloc(dataSetMessage->data.deltaFrameData.fieldCount, sizeof(UA_DataSetMessage_DeltaFrameField));
  1343. if(!deltaFields)
  1344. return UA_STATUSCODE_BADOUTOFMEMORY;
  1345. dataSetMessage->data.deltaFrameData.deltaFrameFields = deltaFields;
  1346. size_t currentDeltaField = 0;
  1347. for(size_t i = 0; i < currentDataSet->fieldSize; i++) {
  1348. if(!dataSetWriter->lastSamples[i].valueChanged)
  1349. continue;
  1350. UA_DataSetMessage_DeltaFrameField *dff = &deltaFields[currentDeltaField];
  1351. dff->fieldIndex = (UA_UInt16) i;
  1352. UA_DataValue_copy(&dataSetWriter->lastSamples[i].value, &dff->fieldValue);
  1353. dataSetWriter->lastSamples[i].valueChanged = false;
  1354. /* Deactivate statuscode? */
  1355. if(((u64)dataSetWriter->config.dataSetFieldContentMask & (u64)UA_DATASETFIELDCONTENTMASK_STATUSCODE) == 0)
  1356. dff->fieldValue.hasStatus = false;
  1357. /* Deactivate timestamps? */
  1358. if(((u64)dataSetWriter->config.dataSetFieldContentMask & (u64)UA_DATASETFIELDCONTENTMASK_SOURCETIMESTAMP) == 0)
  1359. dff->fieldValue.hasSourceTimestamp = false;
  1360. if(((u64)dataSetWriter->config.dataSetFieldContentMask & (u64)UA_DATASETFIELDCONTENTMASK_SOURCEPICOSECONDS) == 0)
  1361. dff->fieldValue.hasServerPicoseconds = false;
  1362. if(((u64)dataSetWriter->config.dataSetFieldContentMask & (u64)UA_DATASETFIELDCONTENTMASK_SERVERTIMESTAMP) == 0)
  1363. dff->fieldValue.hasServerTimestamp = false;
  1364. if(((u64)dataSetWriter->config.dataSetFieldContentMask & (u64)UA_DATASETFIELDCONTENTMASK_SERVERPICOSECONDS) == 0)
  1365. dff->fieldValue.hasServerPicoseconds = false;
  1366. currentDeltaField++;
  1367. }
  1368. return UA_STATUSCODE_GOOD;
  1369. }
  1370. #endif
  1371. /**
  1372. * Generate a DataSetMessage for the given writer.
  1373. *
  1374. * @param dataSetWriter ptr to corresponding writer
  1375. * @return ptr to generated DataSetMessage
  1376. */
  1377. static UA_StatusCode
  1378. UA_DataSetWriter_generateDataSetMessage(UA_Server *server, UA_DataSetMessage *dataSetMessage,
  1379. UA_DataSetWriter *dataSetWriter) {
  1380. UA_PublishedDataSet *currentDataSet =
  1381. UA_PublishedDataSet_findPDSbyId(server, dataSetWriter->connectedDataSet);
  1382. if(!currentDataSet)
  1383. return UA_STATUSCODE_BADNOTFOUND;
  1384. /* Reset the message */
  1385. memset(dataSetMessage, 0, sizeof(UA_DataSetMessage));
  1386. /* store messageType to switch between json or uadp (default) */
  1387. UA_UInt16 messageType = UA_TYPES_UADPDATASETWRITERMESSAGEDATATYPE;
  1388. UA_JsonDataSetWriterMessageDataType *jsonDataSetWriterMessageDataType = NULL;
  1389. /* The configuration Flags are included
  1390. * inside the std. defined UA_UadpDataSetWriterMessageDataType */
  1391. UA_UadpDataSetWriterMessageDataType defaultUadpConfiguration;
  1392. UA_UadpDataSetWriterMessageDataType *dataSetWriterMessageDataType = NULL;
  1393. if((dataSetWriter->config.messageSettings.encoding == UA_EXTENSIONOBJECT_DECODED ||
  1394. dataSetWriter->config.messageSettings.encoding == UA_EXTENSIONOBJECT_DECODED_NODELETE) &&
  1395. (dataSetWriter->config.messageSettings.content.decoded.type ==
  1396. &UA_TYPES[UA_TYPES_UADPDATASETWRITERMESSAGEDATATYPE])) {
  1397. dataSetWriterMessageDataType = (UA_UadpDataSetWriterMessageDataType *)
  1398. dataSetWriter->config.messageSettings.content.decoded.data;
  1399. /* type is UADP */
  1400. messageType = UA_TYPES_UADPDATASETWRITERMESSAGEDATATYPE;
  1401. } else if((dataSetWriter->config.messageSettings.encoding == UA_EXTENSIONOBJECT_DECODED ||
  1402. dataSetWriter->config.messageSettings.encoding == UA_EXTENSIONOBJECT_DECODED_NODELETE) &&
  1403. (dataSetWriter->config.messageSettings.content.decoded.type ==
  1404. &UA_TYPES[UA_TYPES_JSONDATASETWRITERMESSAGEDATATYPE])) {
  1405. jsonDataSetWriterMessageDataType = (UA_JsonDataSetWriterMessageDataType *)
  1406. dataSetWriter->config.messageSettings.content.decoded.data;
  1407. /* type is JSON */
  1408. messageType = UA_TYPES_JSONDATASETWRITERMESSAGEDATATYPE;
  1409. } else {
  1410. /* create default flag configuration if no
  1411. * UadpDataSetWriterMessageDataType was passed in */
  1412. memset(&defaultUadpConfiguration, 0, sizeof(UA_UadpDataSetWriterMessageDataType));
  1413. defaultUadpConfiguration.dataSetMessageContentMask = (UA_UadpDataSetMessageContentMask)
  1414. ((u64)UA_UADPDATASETMESSAGECONTENTMASK_TIMESTAMP | (u64)UA_UADPDATASETMESSAGECONTENTMASK_MAJORVERSION |
  1415. (u64)UA_UADPDATASETMESSAGECONTENTMASK_MINORVERSION);
  1416. dataSetWriterMessageDataType = &defaultUadpConfiguration;
  1417. }
  1418. /* Sanity-test the configuration */
  1419. if(dataSetWriterMessageDataType &&
  1420. (dataSetWriterMessageDataType->networkMessageNumber != 0 ||
  1421. dataSetWriterMessageDataType->dataSetOffset != 0 ||
  1422. dataSetWriterMessageDataType->configuredSize != 0)) {
  1423. UA_LOG_WARNING(&server->config.logger, UA_LOGCATEGORY_SERVER,
  1424. "Static DSM configuration not supported. Using defaults");
  1425. dataSetWriterMessageDataType->networkMessageNumber = 0;
  1426. dataSetWriterMessageDataType->dataSetOffset = 0;
  1427. dataSetWriterMessageDataType->configuredSize = 0;
  1428. }
  1429. /* The field encoding depends on the flags inside the writer config.
  1430. * TODO: This can be moved to the encoding layer. */
  1431. if(dataSetWriter->config.dataSetFieldContentMask & (u64)UA_DATASETFIELDCONTENTMASK_RAWDATA
  1432. ) {
  1433. dataSetMessage->header.fieldEncoding = UA_FIELDENCODING_RAWDATA;
  1434. } else if((u64)dataSetWriter->config.dataSetFieldContentMask &
  1435. ((u64)UA_DATASETFIELDCONTENTMASK_SOURCETIMESTAMP | (u64)UA_DATASETFIELDCONTENTMASK_SERVERPICOSECONDS |
  1436. (u64)UA_DATASETFIELDCONTENTMASK_SOURCEPICOSECONDS | (u64)UA_DATASETFIELDCONTENTMASK_STATUSCODE)) {
  1437. dataSetMessage->header.fieldEncoding = UA_FIELDENCODING_DATAVALUE;
  1438. } else {
  1439. dataSetMessage->header.fieldEncoding = UA_FIELDENCODING_VARIANT;
  1440. }
  1441. if(messageType == UA_TYPES_UADPDATASETWRITERMESSAGEDATATYPE) {
  1442. /* Std: 'The DataSetMessageContentMask defines the flags for the content of the DataSetMessage header.' */
  1443. if((u64)dataSetWriterMessageDataType->dataSetMessageContentMask &
  1444. (u64)UA_UADPDATASETMESSAGECONTENTMASK_MAJORVERSION) {
  1445. dataSetMessage->header.configVersionMajorVersionEnabled = true;
  1446. dataSetMessage->header.configVersionMajorVersion =
  1447. currentDataSet->dataSetMetaData.configurationVersion.majorVersion;
  1448. }
  1449. if((u64)dataSetWriterMessageDataType->dataSetMessageContentMask &
  1450. (u64)UA_UADPDATASETMESSAGECONTENTMASK_MINORVERSION) {
  1451. dataSetMessage->header.configVersionMinorVersionEnabled = true;
  1452. dataSetMessage->header.configVersionMinorVersion =
  1453. currentDataSet->dataSetMetaData.configurationVersion.minorVersion;
  1454. }
  1455. if((u64)dataSetWriterMessageDataType->dataSetMessageContentMask &
  1456. (u64)UA_UADPDATASETMESSAGECONTENTMASK_SEQUENCENUMBER) {
  1457. dataSetMessage->header.dataSetMessageSequenceNrEnabled = true;
  1458. dataSetMessage->header.dataSetMessageSequenceNr =
  1459. dataSetWriter->actualDataSetMessageSequenceCount;
  1460. }
  1461. if((u64)dataSetWriterMessageDataType->dataSetMessageContentMask &
  1462. (u64)UA_UADPDATASETMESSAGECONTENTMASK_TIMESTAMP) {
  1463. dataSetMessage->header.timestampEnabled = true;
  1464. dataSetMessage->header.timestamp = UA_DateTime_now();
  1465. }
  1466. /* TODO: Picoseconds resolution not supported atm */
  1467. if((u64)dataSetWriterMessageDataType->dataSetMessageContentMask &
  1468. (u64)UA_UADPDATASETMESSAGECONTENTMASK_PICOSECONDS) {
  1469. dataSetMessage->header.picoSecondsIncluded = false;
  1470. }
  1471. /* TODO: Statuscode not supported yet */
  1472. if((u64)dataSetWriterMessageDataType->dataSetMessageContentMask &
  1473. (u64)UA_UADPDATASETMESSAGECONTENTMASK_STATUS) {
  1474. dataSetMessage->header.statusEnabled = false;
  1475. }
  1476. } else if(messageType == UA_TYPES_JSONDATASETWRITERMESSAGEDATATYPE) {
  1477. if((u64)jsonDataSetWriterMessageDataType->dataSetMessageContentMask &
  1478. (u64)UA_JSONDATASETMESSAGECONTENTMASK_METADATAVERSION) {
  1479. dataSetMessage->header.configVersionMajorVersionEnabled = true;
  1480. dataSetMessage->header.configVersionMajorVersion =
  1481. currentDataSet->dataSetMetaData.configurationVersion.majorVersion;
  1482. }
  1483. if((u64)jsonDataSetWriterMessageDataType->dataSetMessageContentMask &
  1484. (u64)UA_JSONDATASETMESSAGECONTENTMASK_METADATAVERSION) {
  1485. dataSetMessage->header.configVersionMinorVersionEnabled = true;
  1486. dataSetMessage->header.configVersionMinorVersion =
  1487. currentDataSet->dataSetMetaData.configurationVersion.minorVersion;
  1488. }
  1489. if((u64)jsonDataSetWriterMessageDataType->dataSetMessageContentMask &
  1490. (u64)UA_JSONDATASETMESSAGECONTENTMASK_SEQUENCENUMBER) {
  1491. dataSetMessage->header.dataSetMessageSequenceNrEnabled = true;
  1492. dataSetMessage->header.dataSetMessageSequenceNr =
  1493. dataSetWriter->actualDataSetMessageSequenceCount;
  1494. }
  1495. if((u64)jsonDataSetWriterMessageDataType->dataSetMessageContentMask &
  1496. (u64)UA_JSONDATASETMESSAGECONTENTMASK_TIMESTAMP) {
  1497. dataSetMessage->header.timestampEnabled = true;
  1498. dataSetMessage->header.timestamp = UA_DateTime_now();
  1499. }
  1500. /* TODO: Statuscode not supported yet */
  1501. if((u64)jsonDataSetWriterMessageDataType->dataSetMessageContentMask &
  1502. (u64)UA_JSONDATASETMESSAGECONTENTMASK_STATUS) {
  1503. dataSetMessage->header.statusEnabled = false;
  1504. }
  1505. }
  1506. /* Set the sequence count. Automatically rolls over to zero */
  1507. dataSetWriter->actualDataSetMessageSequenceCount++;
  1508. /* JSON does not differ between deltaframes and keyframes, only keyframes are currently used. */
  1509. if(messageType != UA_TYPES_JSONDATASETWRITERMESSAGEDATATYPE){
  1510. #ifdef UA_ENABLE_PUBSUB_DELTAFRAMES
  1511. /* Check if the PublishedDataSet version has changed -> if yes flush the lastValue store and send a KeyFrame */
  1512. if(dataSetWriter->connectedDataSetVersion.majorVersion != currentDataSet->dataSetMetaData.configurationVersion.majorVersion ||
  1513. dataSetWriter->connectedDataSetVersion.minorVersion != currentDataSet->dataSetMetaData.configurationVersion.minorVersion) {
  1514. /* Remove old samples */
  1515. for(size_t i = 0; i < dataSetWriter->lastSamplesCount; i++)
  1516. UA_DataValue_clear(&dataSetWriter->lastSamples[i].value);
  1517. /* Realloc pds dependent memory */
  1518. dataSetWriter->lastSamplesCount = currentDataSet->fieldSize;
  1519. UA_DataSetWriterSample *newSamplesArray = (UA_DataSetWriterSample * )
  1520. UA_realloc(dataSetWriter->lastSamples, sizeof(UA_DataSetWriterSample) * dataSetWriter->lastSamplesCount);
  1521. if(!newSamplesArray)
  1522. return UA_STATUSCODE_BADOUTOFMEMORY;
  1523. dataSetWriter->lastSamples = newSamplesArray;
  1524. memset(dataSetWriter->lastSamples, 0, sizeof(UA_DataSetWriterSample) * dataSetWriter->lastSamplesCount);
  1525. dataSetWriter->connectedDataSetVersion = currentDataSet->dataSetMetaData.configurationVersion;
  1526. UA_PubSubDataSetWriter_generateKeyFrameMessage(server, dataSetMessage, dataSetWriter);
  1527. dataSetWriter->deltaFrameCounter = 0;
  1528. return UA_STATUSCODE_GOOD;
  1529. }
  1530. /* The standard defines: if a PDS contains only one fields no delta messages
  1531. * should be generated because they need more memory than a keyframe with 1
  1532. * field. */
  1533. if(currentDataSet->fieldSize > 1 && dataSetWriter->deltaFrameCounter > 0 &&
  1534. dataSetWriter->deltaFrameCounter <= dataSetWriter->config.keyFrameCount) {
  1535. UA_PubSubDataSetWriter_generateDeltaFrameMessage(server, dataSetMessage, dataSetWriter);
  1536. dataSetWriter->deltaFrameCounter++;
  1537. return UA_STATUSCODE_GOOD;
  1538. }
  1539. dataSetWriter->deltaFrameCounter = 1;
  1540. #endif
  1541. }
  1542. UA_PubSubDataSetWriter_generateKeyFrameMessage(server, dataSetMessage, dataSetWriter);
  1543. return UA_STATUSCODE_GOOD;
  1544. }
  1545. static UA_StatusCode
  1546. sendNetworkMessageJson(UA_PubSubConnection *connection, UA_DataSetMessage *dsm,
  1547. UA_UInt16 *writerIds, UA_Byte dsmCount, UA_ExtensionObject *transportSettings) {
  1548. UA_StatusCode retval = UA_STATUSCODE_BADNOTSUPPORTED;
  1549. #ifdef UA_ENABLE_JSON_ENCODING
  1550. UA_NetworkMessage nm;
  1551. memset(&nm, 0, sizeof(UA_NetworkMessage));
  1552. nm.version = 1;
  1553. nm.networkMessageType = UA_NETWORKMESSAGE_DATASET;
  1554. nm.payloadHeaderEnabled = true;
  1555. nm.payloadHeader.dataSetPayloadHeader.count = dsmCount;
  1556. nm.payloadHeader.dataSetPayloadHeader.dataSetWriterIds = writerIds;
  1557. nm.payload.dataSetPayload.dataSetMessages = dsm;
  1558. /* Allocate the buffer. Allocate on the stack if the buffer is small. */
  1559. UA_ByteString buf;
  1560. size_t msgSize = UA_NetworkMessage_calcSizeJson(&nm, NULL, 0, NULL, 0, true);
  1561. size_t stackSize = 1;
  1562. if(msgSize <= UA_MAX_STACKBUF)
  1563. stackSize = msgSize;
  1564. UA_STACKARRAY(UA_Byte, stackBuf, stackSize);
  1565. buf.data = stackBuf;
  1566. buf.length = msgSize;
  1567. if(msgSize > UA_MAX_STACKBUF) {
  1568. retval = UA_ByteString_allocBuffer(&buf, msgSize);
  1569. if(retval != UA_STATUSCODE_GOOD)
  1570. return retval;
  1571. }
  1572. /* Encode the message */
  1573. UA_Byte *bufPos = buf.data;
  1574. memset(bufPos, 0, msgSize);
  1575. const UA_Byte *bufEnd = &buf.data[buf.length];
  1576. retval = UA_NetworkMessage_encodeJson(&nm, &bufPos, &bufEnd, NULL, 0, NULL, 0, true);
  1577. if(retval != UA_STATUSCODE_GOOD) {
  1578. if(msgSize > UA_MAX_STACKBUF)
  1579. UA_ByteString_clear(&buf);
  1580. return retval;
  1581. }
  1582. /* Send the prepared messages */
  1583. retval = connection->channel->send(connection->channel, transportSettings, &buf);
  1584. if(msgSize > UA_MAX_STACKBUF)
  1585. UA_ByteString_clear(&buf);
  1586. #endif
  1587. return retval;
  1588. }
  1589. static UA_StatusCode
  1590. generateNetworkMessage(UA_PubSubConnection *connection, UA_WriterGroup *wg,
  1591. UA_DataSetMessage *dsm, UA_UInt16 *writerIds, UA_Byte dsmCount,
  1592. UA_ExtensionObject *messageSettings,
  1593. UA_ExtensionObject *transportSettings,
  1594. UA_NetworkMessage *networkMessage) {
  1595. if(messageSettings->content.decoded.type !=
  1596. &UA_TYPES[UA_TYPES_UADPWRITERGROUPMESSAGEDATATYPE])
  1597. return UA_STATUSCODE_BADINTERNALERROR;
  1598. UA_UadpWriterGroupMessageDataType *wgm = (UA_UadpWriterGroupMessageDataType*)
  1599. messageSettings->content.decoded.data;
  1600. memset(networkMessage, 0, sizeof(UA_NetworkMessage));
  1601. networkMessage->publisherIdEnabled =
  1602. ((u64)wgm->networkMessageContentMask & (u64)UA_UADPNETWORKMESSAGECONTENTMASK_PUBLISHERID) != 0;
  1603. networkMessage->groupHeaderEnabled =
  1604. ((u64)wgm->networkMessageContentMask & (u64)UA_UADPNETWORKMESSAGECONTENTMASK_GROUPHEADER) != 0;
  1605. networkMessage->groupHeader.writerGroupIdEnabled =
  1606. ((u64)wgm->networkMessageContentMask & (u64)UA_UADPNETWORKMESSAGECONTENTMASK_WRITERGROUPID) != 0;
  1607. networkMessage->groupHeader.groupVersionEnabled =
  1608. ((u64)wgm->networkMessageContentMask & (u64)UA_UADPNETWORKMESSAGECONTENTMASK_GROUPVERSION) != 0;
  1609. networkMessage->groupHeader.networkMessageNumberEnabled =
  1610. ((u64)wgm->networkMessageContentMask & (u64)UA_UADPNETWORKMESSAGECONTENTMASK_NETWORKMESSAGENUMBER) != 0;
  1611. networkMessage->groupHeader.sequenceNumberEnabled =
  1612. ((u64)wgm->networkMessageContentMask & (u64)UA_UADPNETWORKMESSAGECONTENTMASK_SEQUENCENUMBER) != 0;
  1613. networkMessage->payloadHeaderEnabled =
  1614. ((u64)wgm->networkMessageContentMask & (u64)UA_UADPNETWORKMESSAGECONTENTMASK_PAYLOADHEADER) != 0;
  1615. networkMessage->timestampEnabled =
  1616. ((u64)wgm->networkMessageContentMask & (u64)UA_UADPNETWORKMESSAGECONTENTMASK_TIMESTAMP) != 0;
  1617. networkMessage->picosecondsEnabled =
  1618. ((u64)wgm->networkMessageContentMask & (u64)UA_UADPNETWORKMESSAGECONTENTMASK_PICOSECONDS) != 0;
  1619. networkMessage->dataSetClassIdEnabled =
  1620. ((u64)wgm->networkMessageContentMask & (u64)UA_UADPNETWORKMESSAGECONTENTMASK_DATASETCLASSID) != 0;
  1621. networkMessage->promotedFieldsEnabled =
  1622. ((u64)wgm->networkMessageContentMask & (u64)UA_UADPNETWORKMESSAGECONTENTMASK_PROMOTEDFIELDS) != 0;
  1623. networkMessage->version = 1;
  1624. networkMessage->networkMessageType = UA_NETWORKMESSAGE_DATASET;
  1625. if(connection->config->publisherIdType == UA_PUBSUB_PUBLISHERID_NUMERIC) {
  1626. networkMessage->publisherIdType = UA_PUBLISHERDATATYPE_UINT16;
  1627. networkMessage->publisherId.publisherIdUInt32 = connection->config->publisherId.numeric;
  1628. } else if(connection->config->publisherIdType == UA_PUBSUB_PUBLISHERID_STRING){
  1629. networkMessage->publisherIdType = UA_PUBLISHERDATATYPE_STRING;
  1630. networkMessage->publisherId.publisherIdString = connection->config->publisherId.string;
  1631. }
  1632. if(networkMessage->groupHeader.sequenceNumberEnabled)
  1633. networkMessage->groupHeader.sequenceNumber = wg->sequenceNumber;
  1634. /* Compute the length of the dsm separately for the header */
  1635. UA_STACKARRAY(UA_UInt16, dsmLengths, dsmCount);
  1636. for(UA_Byte i = 0; i < dsmCount; i++)
  1637. dsmLengths[i] = (UA_UInt16)UA_DataSetMessage_calcSizeBinary(&dsm[i]);
  1638. networkMessage->payloadHeader.dataSetPayloadHeader.count = dsmCount;
  1639. networkMessage->payloadHeader.dataSetPayloadHeader.dataSetWriterIds = writerIds;
  1640. networkMessage->groupHeader.writerGroupId = wg->config.writerGroupId;
  1641. /* number of the NetworkMessage inside a PublishingInterval */
  1642. networkMessage->groupHeader.networkMessageNumber = 1;
  1643. networkMessage->payload.dataSetPayload.sizes = dsmLengths;
  1644. networkMessage->payload.dataSetPayload.dataSetMessages = dsm;
  1645. return UA_STATUSCODE_GOOD;
  1646. }
  1647. static UA_StatusCode
  1648. sendBufferedNetworkMessage(UA_Server *server, UA_PubSubConnection *connection,
  1649. UA_NetworkMessageOffsetBuffer *buffer, UA_ExtensionObject *transportSettings) {
  1650. if(UA_NetworkMessage_updateBufferedMessage(buffer) != UA_STATUSCODE_GOOD)
  1651. UA_LOG_DEBUG(&server->config.logger, UA_LOGCATEGORY_SERVER, "PubSub sending. Unknown field type.");
  1652. UA_StatusCode retval = connection->channel->send(connection->channel, transportSettings, &buffer->buffer);
  1653. return retval;
  1654. }
  1655. static UA_StatusCode
  1656. sendNetworkMessage(UA_PubSubConnection *connection, UA_WriterGroup *wg,
  1657. UA_DataSetMessage *dsm, UA_UInt16 *writerIds, UA_Byte dsmCount,
  1658. UA_ExtensionObject *messageSettings,
  1659. UA_ExtensionObject *transportSettings) {
  1660. UA_NetworkMessage nm;
  1661. generateNetworkMessage(connection, wg, dsm, writerIds, dsmCount, messageSettings, transportSettings, &nm);
  1662. /* Allocate the buffer. Allocate on the stack if the buffer is small. */
  1663. UA_ByteString buf;
  1664. size_t msgSize = UA_NetworkMessage_calcSizeBinary(&nm);
  1665. size_t stackSize = 1;
  1666. if(msgSize <= UA_MAX_STACKBUF)
  1667. stackSize = msgSize;
  1668. UA_STACKARRAY(UA_Byte, stackBuf, stackSize);
  1669. buf.data = stackBuf;
  1670. buf.length = msgSize;
  1671. UA_StatusCode retval;
  1672. if(msgSize > UA_MAX_STACKBUF) {
  1673. retval = UA_ByteString_allocBuffer(&buf, msgSize);
  1674. if(retval != UA_STATUSCODE_GOOD)
  1675. return retval;
  1676. }
  1677. /* Encode the message */
  1678. UA_Byte *bufPos = buf.data;
  1679. memset(bufPos, 0, msgSize);
  1680. const UA_Byte *bufEnd = &buf.data[buf.length];
  1681. retval = UA_NetworkMessage_encodeBinary(&nm, &bufPos, bufEnd);
  1682. if(retval != UA_STATUSCODE_GOOD) {
  1683. if(msgSize > UA_MAX_STACKBUF)
  1684. UA_ByteString_clear(&buf);
  1685. return retval;
  1686. }
  1687. /* Send the prepared messages */
  1688. retval = connection->channel->send(connection->channel, transportSettings, &buf);
  1689. if(msgSize > UA_MAX_STACKBUF)
  1690. UA_ByteString_clear(&buf);
  1691. return retval;
  1692. }
  1693. /* This callback triggers the collection and publish of NetworkMessages and the
  1694. * contained DataSetMessages. */
  1695. void
  1696. UA_WriterGroup_publishCallback(UA_Server *server, UA_WriterGroup *writerGroup) {
  1697. UA_LOG_DEBUG(&server->config.logger, UA_LOGCATEGORY_SERVER, "Publish Callback");
  1698. if(!writerGroup) {
  1699. UA_LOG_WARNING(&server->config.logger, UA_LOGCATEGORY_SERVER,
  1700. "Publish failed. WriterGroup not found");
  1701. return;
  1702. }
  1703. /* Nothing to do? */
  1704. if(writerGroup->writersCount <= 0)
  1705. return;
  1706. /* Binary or Json encoding? */
  1707. if(writerGroup->config.encodingMimeType != UA_PUBSUB_ENCODING_UADP &&
  1708. writerGroup->config.encodingMimeType != UA_PUBSUB_ENCODING_JSON) {
  1709. UA_LOG_WARNING(&server->config.logger, UA_LOGCATEGORY_SERVER,
  1710. "Publish failed: Unknown encoding type.");
  1711. return;
  1712. }
  1713. /* Find the connection associated with the writer */
  1714. UA_PubSubConnection *connection = writerGroup->linkedConnectionPtr;
  1715. if(!connection) {
  1716. UA_LOG_WARNING(&server->config.logger, UA_LOGCATEGORY_SERVER,
  1717. "Publish failed. PubSubConnection invalid.");
  1718. return;
  1719. }
  1720. if(writerGroup->config.rtLevel == UA_PUBSUB_RT_FIXED_SIZE) {
  1721. sendBufferedNetworkMessage(server, connection, &writerGroup->bufferedMessage, &writerGroup->config.transportSettings);
  1722. writerGroup->sequenceNumber++;
  1723. return;
  1724. }
  1725. /* How many DSM can be sent in one NM? */
  1726. UA_Byte maxDSM = (UA_Byte)writerGroup->config.maxEncapsulatedDataSetMessageCount;
  1727. if(writerGroup->config.maxEncapsulatedDataSetMessageCount > UA_BYTE_MAX)
  1728. maxDSM = UA_BYTE_MAX;
  1729. /* If the maxEncapsulatedDataSetMessageCount is set to 0->1 */
  1730. if(maxDSM == 0)
  1731. maxDSM = 1;
  1732. /* It is possible to put several DataSetMessages into one NetworkMessage.
  1733. * But only if they do not contain promoted fields. NM with only DSM are
  1734. * sent out right away. The others are kept in a buffer for "batching". */
  1735. size_t dsmCount = 0;
  1736. UA_DataSetWriter *dsw;
  1737. UA_STACKARRAY(UA_UInt16, dsWriterIds, writerGroup->writersCount);
  1738. UA_STACKARRAY(UA_DataSetMessage, dsmStore, writerGroup->writersCount);
  1739. LIST_FOREACH(dsw, &writerGroup->writers, listEntry) {
  1740. /* Find the dataset */
  1741. UA_PublishedDataSet *pds =
  1742. UA_PublishedDataSet_findPDSbyId(server, dsw->connectedDataSet);
  1743. if(!pds) {
  1744. UA_LOG_WARNING(&server->config.logger, UA_LOGCATEGORY_SERVER,
  1745. "PubSub Publish: PublishedDataSet not found");
  1746. continue;
  1747. }
  1748. /* Generate the DSM */
  1749. UA_StatusCode res =
  1750. UA_DataSetWriter_generateDataSetMessage(server, &dsmStore[dsmCount], dsw);
  1751. if(res != UA_STATUSCODE_GOOD) {
  1752. UA_LOG_WARNING(&server->config.logger, UA_LOGCATEGORY_SERVER,
  1753. "PubSub Publish: DataSetMessage creation failed");
  1754. continue;
  1755. }
  1756. /* Send right away if there is only this DSM in a NM. If promoted fields
  1757. * are contained in the PublishedDataSet, then this DSM must go into a
  1758. * dedicated NM as well. */
  1759. if(pds->promotedFieldsCount > 0 || maxDSM == 1) {
  1760. if(writerGroup->config.encodingMimeType == UA_PUBSUB_ENCODING_UADP){
  1761. res = sendNetworkMessage(connection, writerGroup, &dsmStore[dsmCount],
  1762. &dsw->config.dataSetWriterId, 1,
  1763. &writerGroup->config.messageSettings,
  1764. &writerGroup->config.transportSettings);
  1765. }else if(writerGroup->config.encodingMimeType == UA_PUBSUB_ENCODING_JSON){
  1766. res = sendNetworkMessageJson(connection, &dsmStore[dsmCount],
  1767. &dsw->config.dataSetWriterId, 1, &writerGroup->config.transportSettings);
  1768. }
  1769. if(res != UA_STATUSCODE_GOOD)
  1770. UA_LOG_WARNING(&server->config.logger, UA_LOGCATEGORY_SERVER,
  1771. "PubSub Publish: Could not send a NetworkMessage");
  1772. UA_DataSetMessage_free(&dsmStore[dsmCount]);
  1773. continue;
  1774. }
  1775. dsWriterIds[dsmCount] = dsw->config.dataSetWriterId;
  1776. dsmCount++;
  1777. }
  1778. /* Send the NetworkMessages with batched DataSetMessages */
  1779. size_t nmCount = (dsmCount / maxDSM) + ((dsmCount % maxDSM) == 0 ? 0 : 1);
  1780. for(UA_UInt32 i = 0; i < nmCount; i++) {
  1781. UA_Byte nmDsmCount = maxDSM;
  1782. if(i == nmCount - 1 && (dsmCount % maxDSM))
  1783. nmDsmCount = (UA_Byte)dsmCount % maxDSM;
  1784. UA_StatusCode res3 = UA_STATUSCODE_GOOD;
  1785. if(writerGroup->config.encodingMimeType == UA_PUBSUB_ENCODING_UADP){
  1786. res3 = sendNetworkMessage(connection, writerGroup, &dsmStore[i * maxDSM],
  1787. &dsWriterIds[i * maxDSM], nmDsmCount,
  1788. &writerGroup->config.messageSettings,
  1789. &writerGroup->config.transportSettings);
  1790. writerGroup->sequenceNumber++;
  1791. }else if(writerGroup->config.encodingMimeType == UA_PUBSUB_ENCODING_JSON){
  1792. res3 = sendNetworkMessageJson(connection, &dsmStore[i * maxDSM],
  1793. &dsWriterIds[i * maxDSM], nmDsmCount, &writerGroup->config.transportSettings);
  1794. writerGroup->sequenceNumber++;
  1795. }
  1796. if(res3 != UA_STATUSCODE_GOOD)
  1797. UA_LOG_WARNING(&server->config.logger, UA_LOGCATEGORY_SERVER,
  1798. "PubSub Publish: Sending a NetworkMessage failed");
  1799. }
  1800. /* Clean up DSM */
  1801. for(size_t i = 0; i < dsmCount; i++)
  1802. UA_DataSetMessage_free(&dsmStore[i]);
  1803. }
  1804. /* Add new publishCallback. The first execution is triggered directly after
  1805. * creation. */
  1806. UA_StatusCode
  1807. UA_WriterGroup_addPublishCallback(UA_Server *server, UA_WriterGroup *writerGroup) {
  1808. UA_StatusCode retval =
  1809. UA_PubSubManager_addRepeatedCallback(server,
  1810. (UA_ServerCallback) UA_WriterGroup_publishCallback,
  1811. writerGroup, writerGroup->config.publishingInterval,
  1812. &writerGroup->publishCallbackId);
  1813. if(retval == UA_STATUSCODE_GOOD)
  1814. writerGroup->publishCallbackIsRegistered = true;
  1815. /* Run once after creation */
  1816. UA_WriterGroup_publishCallback(server, writerGroup);
  1817. return retval;
  1818. }
  1819. #endif /* UA_ENABLE_PUBSUB */