ua_pubsub.c 59 KB


  1. /* This Source Code Form is subject to the terms of the Mozilla Public
  2. * License, v. 2.0. If a copy of the MPL was not distributed with this
  3. * file, You can obtain one at http://mozilla.org/MPL/2.0/.
  4. *
  5. * Copyright (c) 2017-2018 Fraunhofer IOSB (Author: Andreas Ebner)
  6. * Copyright (c) 2019 Fraunhofer IOSB (Author: Julius Pfrommer)
  7. */
  8. #include "server/ua_server_internal.h"
  9. #ifdef UA_ENABLE_PUBSUB /* conditional compilation */
  10. #include "ua_pubsub.h"
  11. #ifdef UA_ENABLE_PUBSUB_INFORMATIONMODEL
  12. #include "ua_pubsub_ns0.h"
  13. #endif
  14. #ifdef UA_ENABLE_PUBSUB_DELTAFRAMES
  15. #include "ua_types_encoding_binary.h"
  16. #endif
  17. #define UA_MAX_STACKBUF 512 /* Max size of network messages on the stack */
  18. /* Forward declaration */
  19. static void
  20. UA_WriterGroup_deleteMembers(UA_Server *server, UA_WriterGroup *writerGroup);
  21. static void
  22. UA_DataSetField_deleteMembers(UA_DataSetField *field);
  23. /**********************************************/
  24. /* Connection */
  25. /**********************************************/
  26. UA_StatusCode
  27. UA_PubSubConnectionConfig_copy(const UA_PubSubConnectionConfig *src,
  28. UA_PubSubConnectionConfig *dst) {
  29. UA_StatusCode retVal = UA_STATUSCODE_GOOD;
  30. memcpy(dst, src, sizeof(UA_PubSubConnectionConfig));
  31. retVal |= UA_String_copy(&src->name, &dst->name);
  32. retVal |= UA_Variant_copy(&src->address, &dst->address);
  33. retVal |= UA_String_copy(&src->transportProfileUri, &dst->transportProfileUri);
  34. retVal |= UA_Variant_copy(&src->connectionTransportSettings, &dst->connectionTransportSettings);
  35. if(src->connectionPropertiesSize > 0){
  36. dst->connectionProperties = (UA_KeyValuePair *)
  37. UA_calloc(src->connectionPropertiesSize, sizeof(UA_KeyValuePair));
  38. if(!dst->connectionProperties){
  39. return UA_STATUSCODE_BADOUTOFMEMORY;
  40. }
  41. for(size_t i = 0; i < src->connectionPropertiesSize; i++){
  42. retVal |= UA_QualifiedName_copy(&src->connectionProperties[i].key,
  43. &dst->connectionProperties[i].key);
  44. retVal |= UA_Variant_copy(&src->connectionProperties[i].value,
  45. &dst->connectionProperties[i].value);
  46. }
  47. }
  48. return retVal;
  49. }
  50. UA_StatusCode
  51. UA_Server_getPubSubConnectionConfig(UA_Server *server, const UA_NodeId connection,
  52. UA_PubSubConnectionConfig *config) {
  53. if(!config)
  54. return UA_STATUSCODE_BADINVALIDARGUMENT;
  55. UA_PubSubConnection *currentPubSubConnection =
  56. UA_PubSubConnection_findConnectionbyId(server, connection);
  57. if(!currentPubSubConnection)
  58. return UA_STATUSCODE_BADNOTFOUND;
  59. UA_PubSubConnectionConfig tmpPubSubConnectionConfig;
  60. //deep copy of the actual config
  61. UA_PubSubConnectionConfig_copy(currentPubSubConnection->config, &tmpPubSubConnectionConfig);
  62. *config = tmpPubSubConnectionConfig;
  63. return UA_STATUSCODE_GOOD;
  64. }
  65. UA_PubSubConnection *
  66. UA_PubSubConnection_findConnectionbyId(UA_Server *server, UA_NodeId connectionIdentifier) {
  67. for(size_t i = 0; i < server->pubSubManager.connectionsSize; i++){
  68. if(UA_NodeId_equal(&connectionIdentifier, &server->pubSubManager.connections[i].identifier)){
  69. return &server->pubSubManager.connections[i];
  70. }
  71. }
  72. return NULL;
  73. }
  74. void
  75. UA_PubSubConnectionConfig_deleteMembers(UA_PubSubConnectionConfig *connectionConfig) {
  76. UA_String_deleteMembers(&connectionConfig->name);
  77. UA_String_deleteMembers(&connectionConfig->transportProfileUri);
  78. UA_Variant_deleteMembers(&connectionConfig->connectionTransportSettings);
  79. UA_Variant_deleteMembers(&connectionConfig->address);
  80. for(size_t i = 0; i < connectionConfig->connectionPropertiesSize; i++){
  81. UA_QualifiedName_deleteMembers(&connectionConfig->connectionProperties[i].key);
  82. UA_Variant_deleteMembers(&connectionConfig->connectionProperties[i].value);
  83. }
  84. UA_free(connectionConfig->connectionProperties);
  85. }
  86. void
  87. UA_PubSubConnection_deleteMembers(UA_Server *server, UA_PubSubConnection *connection) {
  88. //delete connection config
  89. UA_PubSubConnectionConfig_deleteMembers(connection->config);
  90. //remove contained WriterGroups
  91. UA_WriterGroup *writerGroup, *tmpWriterGroup;
  92. LIST_FOREACH_SAFE(writerGroup, &connection->writerGroups, listEntry, tmpWriterGroup){
  93. UA_Server_removeWriterGroup(server, writerGroup->identifier);
  94. }
  95. UA_NodeId_deleteMembers(&connection->identifier);
  96. if(connection->channel){
  97. connection->channel->close(connection->channel);
  98. }
  99. UA_free(connection->config);
  100. }
  101. UA_StatusCode
  102. UA_Server_addWriterGroup(UA_Server *server, const UA_NodeId connection,
  103. const UA_WriterGroupConfig *writerGroupConfig,
  104. UA_NodeId *writerGroupIdentifier) {
  105. UA_StatusCode retVal = UA_STATUSCODE_GOOD;
  106. if(!writerGroupConfig)
  107. return UA_STATUSCODE_BADINVALIDARGUMENT;
  108. //search the connection by the given connectionIdentifier
  109. UA_PubSubConnection *currentConnectionContext =
  110. UA_PubSubConnection_findConnectionbyId(server, connection);
  111. if(!currentConnectionContext)
  112. return UA_STATUSCODE_BADNOTFOUND;
  113. //allocate memory for new WriterGroup
  114. UA_WriterGroup *newWriterGroup = (UA_WriterGroup *) UA_calloc(1, sizeof(UA_WriterGroup));
  115. if (!newWriterGroup)
  116. return UA_STATUSCODE_BADOUTOFMEMORY;
  117. newWriterGroup->linkedConnection = currentConnectionContext->identifier;
  118. UA_PubSubManager_generateUniqueNodeId(server, &newWriterGroup->identifier);
  119. if(writerGroupIdentifier){
  120. UA_NodeId_copy(&newWriterGroup->identifier, writerGroupIdentifier);
  121. }
  122. //deep copy of the config
  123. UA_WriterGroupConfig tmpWriterGroupConfig;
  124. retVal |= UA_WriterGroupConfig_copy(writerGroupConfig, &tmpWriterGroupConfig);
  125. if(!tmpWriterGroupConfig.messageSettings.content.decoded.type) {
  126. UA_UadpWriterGroupMessageDataType *wgm = UA_UadpWriterGroupMessageDataType_new();
  127. tmpWriterGroupConfig.messageSettings.content.decoded.data = wgm;
  128. tmpWriterGroupConfig.messageSettings.content.decoded.type =
  129. &UA_TYPES[UA_TYPES_UADPWRITERGROUPMESSAGEDATATYPE];
  130. tmpWriterGroupConfig.messageSettings.encoding = UA_EXTENSIONOBJECT_DECODED;
  131. }
  132. newWriterGroup->config = tmpWriterGroupConfig;
  133. retVal |= UA_WriterGroup_addPublishCallback(server, newWriterGroup);
  134. LIST_INSERT_HEAD(&currentConnectionContext->writerGroups, newWriterGroup, listEntry);
  135. #ifdef UA_ENABLE_PUBSUB_INFORMATIONMODEL
  136. addWriterGroupRepresentation(server, newWriterGroup);
  137. #endif
  138. return retVal;
  139. }
  140. UA_StatusCode
  141. UA_Server_removeWriterGroup(UA_Server *server, const UA_NodeId writerGroup){
  142. UA_WriterGroup *wg = UA_WriterGroup_findWGbyId(server, writerGroup);
  143. if(!wg)
  144. return UA_STATUSCODE_BADNOTFOUND;
  145. UA_PubSubConnection *connection =
  146. UA_PubSubConnection_findConnectionbyId(server, wg->linkedConnection);
  147. if(!connection)
  148. return UA_STATUSCODE_BADNOTFOUND;
  149. //unregister the publish callback
  150. UA_PubSubManager_removeRepeatedPubSubCallback(server, wg->publishCallbackId);
  151. #ifdef UA_ENABLE_PUBSUB_INFORMATIONMODEL
  152. removeGroupRepresentation(server, wg);
  153. #endif
  154. UA_WriterGroup_deleteMembers(server, wg);
  155. LIST_REMOVE(wg, listEntry);
  156. UA_free(wg);
  157. return UA_STATUSCODE_GOOD;
  158. }
  159. /**********************************************/
  160. /* PublishedDataSet */
  161. /**********************************************/
  162. UA_StatusCode
  163. UA_PublishedDataSetConfig_copy(const UA_PublishedDataSetConfig *src,
  164. UA_PublishedDataSetConfig *dst) {
  165. UA_StatusCode retVal = UA_STATUSCODE_GOOD;
  166. memcpy(dst, src, sizeof(UA_PublishedDataSetConfig));
  167. retVal |= UA_String_copy(&src->name, &dst->name);
  168. switch(src->publishedDataSetType){
  169. case UA_PUBSUB_DATASET_PUBLISHEDITEMS:
  170. //no additional items
  171. break;
  172. case UA_PUBSUB_DATASET_PUBLISHEDITEMS_TEMPLATE:
  173. if (src->config.itemsTemplate.variablesToAddSize > 0){
  174. dst->config.itemsTemplate.variablesToAdd = (UA_PublishedVariableDataType *) UA_calloc(
  175. src->config.itemsTemplate.variablesToAddSize, sizeof(UA_PublishedVariableDataType));
  176. }
  177. for(size_t i = 0; i < src->config.itemsTemplate.variablesToAddSize; i++){
  178. retVal |= UA_PublishedVariableDataType_copy(&src->config.itemsTemplate.variablesToAdd[i],
  179. &dst->config.itemsTemplate.variablesToAdd[i]);
  180. }
  181. retVal |= UA_DataSetMetaDataType_copy(&src->config.itemsTemplate.metaData,
  182. &dst->config.itemsTemplate.metaData);
  183. break;
  184. default:
  185. return UA_STATUSCODE_BADINVALIDARGUMENT;
  186. }
  187. return retVal;
  188. }
  189. UA_StatusCode
  190. UA_Server_getPublishedDataSetConfig(UA_Server *server, const UA_NodeId pds,
  191. UA_PublishedDataSetConfig *config){
  192. if(!config)
  193. return UA_STATUSCODE_BADINVALIDARGUMENT;
  194. UA_PublishedDataSet *currentPublishedDataSet = UA_PublishedDataSet_findPDSbyId(server, pds);
  195. if(!currentPublishedDataSet)
  196. return UA_STATUSCODE_BADNOTFOUND;
  197. UA_PublishedDataSetConfig tmpPublishedDataSetConfig;
  198. //deep copy of the actual config
  199. UA_PublishedDataSetConfig_copy(&currentPublishedDataSet->config, &tmpPublishedDataSetConfig);
  200. *config = tmpPublishedDataSetConfig;
  201. return UA_STATUSCODE_GOOD;
  202. }
  203. UA_PublishedDataSet *
  204. UA_PublishedDataSet_findPDSbyId(UA_Server *server, UA_NodeId identifier){
  205. for(size_t i = 0; i < server->pubSubManager.publishedDataSetsSize; i++){
  206. if(UA_NodeId_equal(&server->pubSubManager.publishedDataSets[i].identifier, &identifier)){
  207. return &server->pubSubManager.publishedDataSets[i];
  208. }
  209. }
  210. return NULL;
  211. }
  212. void
  213. UA_PublishedDataSetConfig_deleteMembers(UA_PublishedDataSetConfig *pdsConfig){
  214. //delete pds config
  215. UA_String_deleteMembers(&pdsConfig->name);
  216. switch (pdsConfig->publishedDataSetType){
  217. case UA_PUBSUB_DATASET_PUBLISHEDITEMS:
  218. //no additional items
  219. break;
  220. case UA_PUBSUB_DATASET_PUBLISHEDITEMS_TEMPLATE:
  221. if (pdsConfig->config.itemsTemplate.variablesToAddSize > 0){
  222. for(size_t i = 0; i < pdsConfig->config.itemsTemplate.variablesToAddSize; i++){
  223. UA_PublishedVariableDataType_deleteMembers(&pdsConfig->config.itemsTemplate.variablesToAdd[i]);
  224. }
  225. UA_free(pdsConfig->config.itemsTemplate.variablesToAdd);
  226. }
  227. UA_DataSetMetaDataType_deleteMembers(&pdsConfig->config.itemsTemplate.metaData);
  228. break;
  229. default:
  230. break;
  231. }
  232. }
  233. void
  234. UA_PublishedDataSet_deleteMembers(UA_Server *server, UA_PublishedDataSet *publishedDataSet){
  235. UA_PublishedDataSetConfig_deleteMembers(&publishedDataSet->config);
  236. //delete PDS
  237. UA_DataSetMetaDataType_deleteMembers(&publishedDataSet->dataSetMetaData);
  238. UA_DataSetField *field, *tmpField;
  239. LIST_FOREACH_SAFE(field, &publishedDataSet->fields, listEntry, tmpField) {
  240. UA_Server_removeDataSetField(server, field->identifier);
  241. }
  242. UA_NodeId_deleteMembers(&publishedDataSet->identifier);
  243. }
  244. UA_DataSetFieldResult
  245. UA_Server_addDataSetField(UA_Server *server, const UA_NodeId publishedDataSet,
  246. const UA_DataSetFieldConfig *fieldConfig,
  247. UA_NodeId *fieldIdentifier) {
  248. UA_StatusCode retVal = UA_STATUSCODE_GOOD;
  249. UA_DataSetFieldResult result = {UA_STATUSCODE_BADINVALIDARGUMENT, {0, 0}};
  250. if(!fieldConfig)
  251. return result;
  252. UA_PublishedDataSet *currentDataSet = UA_PublishedDataSet_findPDSbyId(server, publishedDataSet);
  253. if(currentDataSet == NULL){
  254. result.result = UA_STATUSCODE_BADNOTFOUND;
  255. return result;
  256. }
  257. if(currentDataSet->config.publishedDataSetType != UA_PUBSUB_DATASET_PUBLISHEDITEMS){
  258. result.result = UA_STATUSCODE_BADNOTIMPLEMENTED;
  259. return result;
  260. }
  261. UA_DataSetField *newField = (UA_DataSetField *) UA_calloc(1, sizeof(UA_DataSetField));
  262. if(!newField){
  263. result.result = UA_STATUSCODE_BADINTERNALERROR;
  264. return result;
  265. }
  266. UA_DataSetFieldConfig tmpFieldConfig;
  267. retVal |= UA_DataSetFieldConfig_copy(fieldConfig, &tmpFieldConfig);
  268. newField->config = tmpFieldConfig;
  269. UA_PubSubManager_generateUniqueNodeId(server, &newField->identifier);
  270. if(fieldIdentifier != NULL){
  271. UA_NodeId_copy(&newField->identifier, fieldIdentifier);
  272. }
  273. newField->publishedDataSet = currentDataSet->identifier;
  274. //update major version of parent published data set
  275. currentDataSet->dataSetMetaData.configurationVersion.majorVersion = UA_PubSubConfigurationVersionTimeDifference();
  276. LIST_INSERT_HEAD(&currentDataSet->fields, newField, listEntry);
  277. if(newField->config.field.variable.promotedField)
  278. currentDataSet->promotedFieldsCount++;
  279. currentDataSet->fieldSize++;
  280. result.result = retVal;
  281. result.configurationVersion.majorVersion = currentDataSet->dataSetMetaData.configurationVersion.majorVersion;
  282. result.configurationVersion.minorVersion = currentDataSet->dataSetMetaData.configurationVersion.minorVersion;
  283. return result;
  284. }
  285. UA_DataSetFieldResult
  286. UA_Server_removeDataSetField(UA_Server *server, const UA_NodeId dsf) {
  287. UA_DataSetField *currentField = UA_DataSetField_findDSFbyId(server, dsf);
  288. UA_DataSetFieldResult result = {UA_STATUSCODE_BADNOTFOUND, {0, 0}};
  289. if(!currentField)
  290. return result;
  291. UA_PublishedDataSet *parentPublishedDataSet =
  292. UA_PublishedDataSet_findPDSbyId(server, currentField->publishedDataSet);
  293. if(!parentPublishedDataSet)
  294. return result;
  295. parentPublishedDataSet->fieldSize--;
  296. if(currentField->config.field.variable.promotedField)
  297. parentPublishedDataSet->promotedFieldsCount--;
  298. /* update major version of PublishedDataSet */
  299. parentPublishedDataSet->dataSetMetaData.configurationVersion.majorVersion =
  300. UA_PubSubConfigurationVersionTimeDifference();
  301. UA_DataSetField_deleteMembers(currentField);
  302. LIST_REMOVE(currentField, listEntry);
  303. UA_free(currentField);
  304. result.result = UA_STATUSCODE_GOOD;
  305. result.configurationVersion.majorVersion = parentPublishedDataSet->dataSetMetaData.configurationVersion.majorVersion;
  306. result.configurationVersion.minorVersion = parentPublishedDataSet->dataSetMetaData.configurationVersion.minorVersion;
  307. return result;
  308. }
  309. /**********************************************/
  310. /* DataSetWriter */
  311. /**********************************************/
  312. UA_StatusCode
  313. UA_DataSetWriterConfig_copy(const UA_DataSetWriterConfig *src,
  314. UA_DataSetWriterConfig *dst){
  315. UA_StatusCode retVal = UA_STATUSCODE_GOOD;
  316. memcpy(dst, src, sizeof(UA_DataSetWriterConfig));
  317. retVal |= UA_String_copy(&src->name, &dst->name);
  318. retVal |= UA_String_copy(&src->dataSetName, &dst->dataSetName);
  319. retVal |= UA_ExtensionObject_copy(&src->messageSettings, &dst->messageSettings);
  320. dst->dataSetWriterProperties = (UA_KeyValuePair *)
  321. UA_calloc(src->dataSetWriterPropertiesSize, sizeof(UA_KeyValuePair));
  322. if(!dst->dataSetWriterProperties)
  323. return UA_STATUSCODE_BADOUTOFMEMORY;
  324. for(size_t i = 0; i < src->dataSetWriterPropertiesSize; i++){
  325. retVal |= UA_KeyValuePair_copy(&src->dataSetWriterProperties[i], &dst->dataSetWriterProperties[i]);
  326. }
  327. return retVal;
  328. }
  329. UA_StatusCode
  330. UA_Server_getDataSetWriterConfig(UA_Server *server, const UA_NodeId dsw,
  331. UA_DataSetWriterConfig *config){
  332. UA_StatusCode retVal = UA_STATUSCODE_GOOD;
  333. if(!config)
  334. return UA_STATUSCODE_BADINVALIDARGUMENT;
  335. UA_DataSetWriter *currentDataSetWriter = UA_DataSetWriter_findDSWbyId(server, dsw);
  336. if(!currentDataSetWriter)
  337. return UA_STATUSCODE_BADNOTFOUND;
  338. UA_DataSetWriterConfig tmpWriterConfig;
  339. //deep copy of the actual config
  340. retVal |= UA_DataSetWriterConfig_copy(&currentDataSetWriter->config, &tmpWriterConfig);
  341. *config = tmpWriterConfig;
  342. return retVal;
  343. }
  344. UA_DataSetWriter *
  345. UA_DataSetWriter_findDSWbyId(UA_Server *server, UA_NodeId identifier) {
  346. for(size_t i = 0; i < server->pubSubManager.connectionsSize; i++){
  347. UA_WriterGroup *tmpWriterGroup;
  348. LIST_FOREACH(tmpWriterGroup, &server->pubSubManager.connections[i].writerGroups, listEntry){
  349. UA_DataSetWriter *tmpWriter;
  350. LIST_FOREACH(tmpWriter, &tmpWriterGroup->writers, listEntry){
  351. if(UA_NodeId_equal(&tmpWriter->identifier, &identifier)){
  352. return tmpWriter;
  353. }
  354. }
  355. }
  356. }
  357. return NULL;
  358. }
  359. void
  360. UA_DataSetWriterConfig_deleteMembers(UA_DataSetWriterConfig *pdsConfig) {
  361. UA_String_deleteMembers(&pdsConfig->name);
  362. UA_String_deleteMembers(&pdsConfig->dataSetName);
  363. for(size_t i = 0; i < pdsConfig->dataSetWriterPropertiesSize; i++){
  364. UA_KeyValuePair_deleteMembers(&pdsConfig->dataSetWriterProperties[i]);
  365. }
  366. UA_free(pdsConfig->dataSetWriterProperties);
  367. UA_ExtensionObject_deleteMembers(&pdsConfig->messageSettings);
  368. }
  369. static void
  370. UA_DataSetWriter_deleteMembers(UA_Server *server, UA_DataSetWriter *dataSetWriter) {
  371. UA_DataSetWriterConfig_deleteMembers(&dataSetWriter->config);
  372. //delete DataSetWriter
  373. UA_NodeId_deleteMembers(&dataSetWriter->identifier);
  374. UA_NodeId_deleteMembers(&dataSetWriter->linkedWriterGroup);
  375. UA_NodeId_deleteMembers(&dataSetWriter->connectedDataSet);
  376. #ifdef UA_ENABLE_PUBSUB_DELTAFRAMES
  377. //delete lastSamples store
  378. for(size_t i = 0; i < dataSetWriter->lastSamplesCount; i++) {
  379. UA_DataValue_deleteMembers(&dataSetWriter->lastSamples[i].value);
  380. }
  381. UA_free(dataSetWriter->lastSamples);
  382. dataSetWriter->lastSamples = NULL;
  383. dataSetWriter->lastSamplesCount = 0;
  384. #endif
  385. }
  386. /**********************************************/
  387. /* WriterGroup */
  388. /**********************************************/
  389. UA_StatusCode
  390. UA_WriterGroupConfig_copy(const UA_WriterGroupConfig *src,
  391. UA_WriterGroupConfig *dst){
  392. UA_StatusCode retVal = UA_STATUSCODE_GOOD;
  393. memcpy(dst, src, sizeof(UA_WriterGroupConfig));
  394. retVal |= UA_String_copy(&src->name, &dst->name);
  395. retVal |= UA_ExtensionObject_copy(&src->transportSettings, &dst->transportSettings);
  396. retVal |= UA_ExtensionObject_copy(&src->messageSettings, &dst->messageSettings);
  397. dst->groupProperties = (UA_KeyValuePair *) UA_calloc(src->groupPropertiesSize, sizeof(UA_KeyValuePair));
  398. if(!dst->groupProperties)
  399. return UA_STATUSCODE_BADOUTOFMEMORY;
  400. for(size_t i = 0; i < src->groupPropertiesSize; i++){
  401. retVal |= UA_KeyValuePair_copy(&src->groupProperties[i], &dst->groupProperties[i]);
  402. }
  403. return retVal;
  404. }
  405. UA_StatusCode
  406. UA_Server_getWriterGroupConfig(UA_Server *server, const UA_NodeId writerGroup,
  407. UA_WriterGroupConfig *config){
  408. UA_StatusCode retVal = UA_STATUSCODE_GOOD;
  409. if(!config)
  410. return UA_STATUSCODE_BADINVALIDARGUMENT;
  411. UA_WriterGroup *currentWriterGroup = UA_WriterGroup_findWGbyId(server, writerGroup);
  412. if(!currentWriterGroup){
  413. return UA_STATUSCODE_BADNOTFOUND;
  414. }
  415. UA_WriterGroupConfig tmpWriterGroupConfig;
  416. //deep copy of the actual config
  417. retVal |= UA_WriterGroupConfig_copy(&currentWriterGroup->config, &tmpWriterGroupConfig);
  418. *config = tmpWriterGroupConfig;
  419. return retVal;
  420. }
  421. UA_StatusCode
  422. UA_Server_updateWriterGroupConfig(UA_Server *server, UA_NodeId writerGroupIdentifier,
  423. const UA_WriterGroupConfig *config){
  424. if(!config)
  425. return UA_STATUSCODE_BADINVALIDARGUMENT;
  426. UA_WriterGroup *currentWriterGroup = UA_WriterGroup_findWGbyId(server, writerGroupIdentifier);
  427. if(!currentWriterGroup)
  428. return UA_STATUSCODE_BADNOTFOUND;
  429. //The update functionality will be extended during the next PubSub batches.
  430. //Currently is only a change of the publishing interval possible.
  431. if(currentWriterGroup->config.publishingInterval != config->publishingInterval) {
  432. UA_PubSubManager_removeRepeatedPubSubCallback(server, currentWriterGroup->publishCallbackId);
  433. currentWriterGroup->config.publishingInterval = config->publishingInterval;
  434. UA_WriterGroup_addPublishCallback(server, currentWriterGroup);
  435. } else if(currentWriterGroup->config.priority != config->priority) {
  436. UA_LOG_WARNING(&server->config.logger, UA_LOGCATEGORY_SERVER,
  437. "No or unsupported WriterGroup update.");
  438. }
  439. return UA_STATUSCODE_GOOD;
  440. }
  441. UA_WriterGroup *
  442. UA_WriterGroup_findWGbyId(UA_Server *server, UA_NodeId identifier){
  443. for(size_t i = 0; i < server->pubSubManager.connectionsSize; i++){
  444. UA_WriterGroup *tmpWriterGroup;
  445. LIST_FOREACH(tmpWriterGroup, &server->pubSubManager.connections[i].writerGroups, listEntry) {
  446. if(UA_NodeId_equal(&identifier, &tmpWriterGroup->identifier)){
  447. return tmpWriterGroup;
  448. }
  449. }
  450. }
  451. return NULL;
  452. }
  453. void
  454. UA_WriterGroupConfig_deleteMembers(UA_WriterGroupConfig *writerGroupConfig){
  455. //delete writerGroup config
  456. UA_String_deleteMembers(&writerGroupConfig->name);
  457. UA_ExtensionObject_deleteMembers(&writerGroupConfig->transportSettings);
  458. UA_ExtensionObject_deleteMembers(&writerGroupConfig->messageSettings);
  459. for(size_t i = 0; i < writerGroupConfig->groupPropertiesSize; i++){
  460. UA_KeyValuePair_deleteMembers(&writerGroupConfig->groupProperties[i]);
  461. }
  462. UA_free(writerGroupConfig->groupProperties);
  463. }
  464. static void
  465. UA_WriterGroup_deleteMembers(UA_Server *server, UA_WriterGroup *writerGroup) {
  466. UA_WriterGroupConfig_deleteMembers(&writerGroup->config);
  467. //delete WriterGroup
  468. //delete all writers. Therefore removeDataSetWriter is called from PublishedDataSet
  469. UA_DataSetWriter *dataSetWriter, *tmpDataSetWriter;
  470. LIST_FOREACH_SAFE(dataSetWriter, &writerGroup->writers, listEntry, tmpDataSetWriter){
  471. UA_Server_removeDataSetWriter(server, dataSetWriter->identifier);
  472. }
  473. UA_NodeId_deleteMembers(&writerGroup->linkedConnection);
  474. UA_NodeId_deleteMembers(&writerGroup->identifier);
  475. }
  476. UA_StatusCode
  477. UA_Server_addDataSetWriter(UA_Server *server,
  478. const UA_NodeId writerGroup, const UA_NodeId dataSet,
  479. const UA_DataSetWriterConfig *dataSetWriterConfig,
  480. UA_NodeId *writerIdentifier) {
  481. UA_StatusCode retVal = UA_STATUSCODE_GOOD;
  482. if(!dataSetWriterConfig)
  483. return UA_STATUSCODE_BADINVALIDARGUMENT;
  484. UA_PublishedDataSet *currentDataSetContext = UA_PublishedDataSet_findPDSbyId(server, dataSet);
  485. if(!currentDataSetContext)
  486. return UA_STATUSCODE_BADNOTFOUND;
  487. UA_WriterGroup *wg = UA_WriterGroup_findWGbyId(server, writerGroup);
  488. if(!wg)
  489. return UA_STATUSCODE_BADNOTFOUND;
  490. UA_DataSetWriter *newDataSetWriter = (UA_DataSetWriter *) UA_calloc(1, sizeof(UA_DataSetWriter));
  491. if(!newDataSetWriter)
  492. return UA_STATUSCODE_BADOUTOFMEMORY;
  493. //copy the config into the new dataSetWriter
  494. UA_DataSetWriterConfig tmpDataSetWriterConfig;
  495. retVal |= UA_DataSetWriterConfig_copy(dataSetWriterConfig, &tmpDataSetWriterConfig);
  496. newDataSetWriter->config = tmpDataSetWriterConfig;
  497. //save the current version of the connected PublishedDataSet
  498. newDataSetWriter->connectedDataSetVersion = currentDataSetContext->dataSetMetaData.configurationVersion;
  499. #ifdef UA_ENABLE_PUBSUB_DELTAFRAMES
  500. //initialize the queue for the last values
  501. newDataSetWriter->lastSamples = (UA_DataSetWriterSample * )
  502. UA_calloc(currentDataSetContext->fieldSize, sizeof(UA_DataSetWriterSample));
  503. if(!newDataSetWriter->lastSamples) {
  504. UA_DataSetWriterConfig_deleteMembers(&newDataSetWriter->config);
  505. UA_free(newDataSetWriter);
  506. return UA_STATUSCODE_BADOUTOFMEMORY;
  507. }
  508. newDataSetWriter->lastSamplesCount = currentDataSetContext->fieldSize;
  509. #endif
  510. //connect PublishedDataSet with DataSetWriter
  511. newDataSetWriter->connectedDataSet = currentDataSetContext->identifier;
  512. newDataSetWriter->linkedWriterGroup = wg->identifier;
  513. UA_PubSubManager_generateUniqueNodeId(server, &newDataSetWriter->identifier);
  514. if(writerIdentifier != NULL)
  515. UA_NodeId_copy(&newDataSetWriter->identifier, writerIdentifier);
  516. //add the new writer to the group
  517. LIST_INSERT_HEAD(&wg->writers, newDataSetWriter, listEntry);
  518. wg->writersCount++;
  519. #ifdef UA_ENABLE_PUBSUB_INFORMATIONMODEL
  520. addDataSetWriterRepresentation(server, newDataSetWriter);
  521. #endif
  522. return retVal;
  523. }
  524. UA_StatusCode
  525. UA_Server_removeDataSetWriter(UA_Server *server, const UA_NodeId dsw){
  526. UA_DataSetWriter *dataSetWriter = UA_DataSetWriter_findDSWbyId(server, dsw);
  527. if(!dataSetWriter)
  528. return UA_STATUSCODE_BADNOTFOUND;
  529. UA_WriterGroup *linkedWriterGroup = UA_WriterGroup_findWGbyId(server, dataSetWriter->linkedWriterGroup);
  530. if(!linkedWriterGroup)
  531. return UA_STATUSCODE_BADNOTFOUND;
  532. linkedWriterGroup->writersCount--;
  533. #ifdef UA_ENABLE_PUBSUB_INFORMATIONMODEL
  534. removeDataSetWriterRepresentation(server, dataSetWriter);
  535. #endif
  536. //remove DataSetWriter from group
  537. UA_DataSetWriter_deleteMembers(server, dataSetWriter);
  538. LIST_REMOVE(dataSetWriter, listEntry);
  539. UA_free(dataSetWriter);
  540. return UA_STATUSCODE_GOOD;
  541. }
  542. /**********************************************/
  543. /* DataSetField */
  544. /**********************************************/
  545. UA_StatusCode
  546. UA_DataSetFieldConfig_copy(const UA_DataSetFieldConfig *src, UA_DataSetFieldConfig *dst){
  547. memcpy(dst, src, sizeof(UA_DataSetFieldConfig));
  548. if(src->dataSetFieldType == UA_PUBSUB_DATASETFIELD_VARIABLE) {
  549. UA_String_copy(&src->field.variable.fieldNameAlias, &dst->field.variable.fieldNameAlias);
  550. UA_PublishedVariableDataType_copy(&src->field.variable.publishParameters,
  551. &dst->field.variable.publishParameters);
  552. } else {
  553. return UA_STATUSCODE_BADNOTSUPPORTED;
  554. }
  555. return UA_STATUSCODE_GOOD;
  556. }
  557. UA_StatusCode
  558. UA_Server_getDataSetFieldConfig(UA_Server *server, const UA_NodeId dsf,
  559. UA_DataSetFieldConfig *config) {
  560. UA_StatusCode retVal = UA_STATUSCODE_GOOD;
  561. if(!config)
  562. return UA_STATUSCODE_BADINVALIDARGUMENT;
  563. UA_DataSetField *currentDataSetField = UA_DataSetField_findDSFbyId(server, dsf);
  564. if(!currentDataSetField)
  565. return UA_STATUSCODE_BADNOTFOUND;
  566. UA_DataSetFieldConfig tmpFieldConfig;
  567. //deep copy of the actual config
  568. retVal |= UA_DataSetFieldConfig_copy(&currentDataSetField->config, &tmpFieldConfig);
  569. *config = tmpFieldConfig;
  570. return retVal;
  571. }
  572. UA_DataSetField *
  573. UA_DataSetField_findDSFbyId(UA_Server *server, UA_NodeId identifier) {
  574. for(size_t i = 0; i < server->pubSubManager.publishedDataSetsSize; i++){
  575. UA_DataSetField *tmpField;
  576. LIST_FOREACH(tmpField, &server->pubSubManager.publishedDataSets[i].fields, listEntry){
  577. if(UA_NodeId_equal(&tmpField->identifier, &identifier)){
  578. return tmpField;
  579. }
  580. }
  581. }
  582. return NULL;
  583. }
  584. void
  585. UA_DataSetFieldConfig_deleteMembers(UA_DataSetFieldConfig *dataSetFieldConfig){
  586. if(dataSetFieldConfig->dataSetFieldType == UA_PUBSUB_DATASETFIELD_VARIABLE){
  587. UA_String_deleteMembers(&dataSetFieldConfig->field.variable.fieldNameAlias);
  588. UA_PublishedVariableDataType_deleteMembers(&dataSetFieldConfig->field.variable.publishParameters);
  589. }
  590. }
  591. static void
  592. UA_DataSetField_deleteMembers(UA_DataSetField *field) {
  593. UA_DataSetFieldConfig_deleteMembers(&field->config);
  594. //delete DataSetField
  595. UA_NodeId_deleteMembers(&field->identifier);
  596. UA_NodeId_deleteMembers(&field->publishedDataSet);
  597. UA_FieldMetaData_deleteMembers(&field->fieldMetaData);
  598. }
  599. /*********************************************************/
  600. /* PublishValues handling */
  601. /*********************************************************/
  602. /**
  603. * Compare two variants. Internally used for value change detection.
  604. *
  605. * @return true if the value has changed
  606. */
  607. #ifdef UA_ENABLE_PUBSUB_DELTAFRAMES
  608. static UA_Boolean
  609. valueChangedVariant(UA_Variant *oldValue, UA_Variant *newValue){
  610. if(! (oldValue && newValue))
  611. return false;
  612. UA_ByteString *oldValueEncoding = UA_ByteString_new(), *newValueEncoding = UA_ByteString_new();
  613. size_t oldValueEncodingSize, newValueEncodingSize;
  614. oldValueEncodingSize = UA_calcSizeBinary(oldValue, &UA_TYPES[UA_TYPES_VARIANT]);
  615. newValueEncodingSize = UA_calcSizeBinary(newValue, &UA_TYPES[UA_TYPES_VARIANT]);
  616. if((oldValueEncodingSize == 0) || (newValueEncodingSize == 0))
  617. return false;
  618. if(oldValueEncodingSize != newValueEncodingSize)
  619. return true;
  620. if(UA_ByteString_allocBuffer(oldValueEncoding, oldValueEncodingSize) != UA_STATUSCODE_GOOD)
  621. return false;
  622. if(UA_ByteString_allocBuffer(newValueEncoding, newValueEncodingSize) != UA_STATUSCODE_GOOD)
  623. return false;
  624. UA_Byte *bufPosOldValue = oldValueEncoding->data;
  625. const UA_Byte *bufEndOldValue = &oldValueEncoding->data[oldValueEncoding->length];
  626. UA_Byte *bufPosNewValue = newValueEncoding->data;
  627. const UA_Byte *bufEndNewValue = &newValueEncoding->data[newValueEncoding->length];
  628. if(UA_encodeBinary(oldValue, &UA_TYPES[UA_TYPES_VARIANT],
  629. &bufPosOldValue, &bufEndOldValue, NULL, NULL) != UA_STATUSCODE_GOOD){
  630. return false;
  631. }
  632. if(UA_encodeBinary(newValue, &UA_TYPES[UA_TYPES_VARIANT],
  633. &bufPosNewValue, &bufEndNewValue, NULL, NULL) != UA_STATUSCODE_GOOD){
  634. return false;
  635. }
  636. oldValueEncoding->length = (uintptr_t)bufPosOldValue - (uintptr_t)oldValueEncoding->data;
  637. newValueEncoding->length = (uintptr_t)bufPosNewValue - (uintptr_t)newValueEncoding->data;
  638. UA_Boolean compareResult = !UA_ByteString_equal(oldValueEncoding, newValueEncoding);
  639. UA_ByteString_delete(oldValueEncoding);
  640. UA_ByteString_delete(newValueEncoding);
  641. return compareResult;
  642. }
  643. #endif
  644. /**
  645. * Obtain the latest value for a specific DataSetField. This method is currently
  646. * called inside the DataSetMessage generation process.
  647. */
  648. static void
  649. UA_PubSubDataSetField_sampleValue(UA_Server *server, UA_DataSetField *field,
  650. UA_DataValue *value) {
  651. /* Read the value */
  652. UA_ReadValueId rvid;
  653. UA_ReadValueId_init(&rvid);
  654. rvid.nodeId = field->config.field.variable.publishParameters.publishedVariable;
  655. rvid.attributeId = field->config.field.variable.publishParameters.attributeId;
  656. rvid.indexRange = field->config.field.variable.publishParameters.indexRange;
  657. *value = UA_Server_read(server, &rvid, UA_TIMESTAMPSTORETURN_BOTH);
  658. }
  659. static UA_StatusCode
  660. UA_PubSubDataSetWriter_generateKeyFrameMessage(UA_Server *server, UA_DataSetMessage *dataSetMessage,
  661. UA_DataSetWriter *dataSetWriter) {
  662. UA_PublishedDataSet *currentDataSet =
  663. UA_PublishedDataSet_findPDSbyId(server, dataSetWriter->connectedDataSet);
  664. if(!currentDataSet)
  665. return UA_STATUSCODE_BADNOTFOUND;
  666. /* Prepare DataSetMessageContent */
  667. dataSetMessage->header.dataSetMessageValid = true;
  668. dataSetMessage->header.dataSetMessageType = UA_DATASETMESSAGE_DATAKEYFRAME;
  669. dataSetMessage->data.keyFrameData.fieldCount = currentDataSet->fieldSize;
  670. dataSetMessage->data.keyFrameData.dataSetFields = (UA_DataValue *)
  671. UA_Array_new(currentDataSet->fieldSize, &UA_TYPES[UA_TYPES_DATAVALUE]);
  672. if(!dataSetMessage->data.keyFrameData.dataSetFields)
  673. return UA_STATUSCODE_BADOUTOFMEMORY;
  674. #ifdef UA_ENABLE_JSON_ENCODING
  675. /* json: insert fieldnames used as json keys */
  676. dataSetMessage->data.keyFrameData.fieldNames =
  677. (UA_String *)UA_Array_new(currentDataSet->fieldSize, &UA_TYPES[UA_TYPES_STRING]);
  678. if(!dataSetMessage->data.keyFrameData.fieldNames)
  679. return UA_STATUSCODE_BADOUTOFMEMORY;
  680. #endif
  681. /* Loop over the fields */
  682. size_t counter = 0;
  683. UA_DataSetField *dsf;
  684. LIST_FOREACH(dsf, &currentDataSet->fields, listEntry) {
  685. #ifdef UA_ENABLE_JSON_ENCODING
  686. /* json: store the fieldNameAlias*/
  687. UA_String_copy(&dsf->config.field.variable.fieldNameAlias,
  688. &dataSetMessage->data.keyFrameData.fieldNames[counter]);
  689. #endif
  690. /* Sample the value */
  691. UA_DataValue *dfv = &dataSetMessage->data.keyFrameData.dataSetFields[counter];
  692. UA_PubSubDataSetField_sampleValue(server, dsf, dfv);
  693. /* Deactivate statuscode? */
  694. if(((u64)dataSetWriter->config.dataSetFieldContentMask & (u64)UA_DATASETFIELDCONTENTMASK_STATUSCODE) == 0)
  695. dfv->hasStatus = false;
  696. /* Deactivate timestamps */
  697. if(((u64)dataSetWriter->config.dataSetFieldContentMask &
  698. (u64)UA_DATASETFIELDCONTENTMASK_SOURCETIMESTAMP) == 0)
  699. dfv->hasSourceTimestamp = false;
  700. if(((u64)dataSetWriter->config.dataSetFieldContentMask &
  701. (u64)UA_DATASETFIELDCONTENTMASK_SOURCEPICOSECONDS) == 0)
  702. dfv->hasSourcePicoseconds = false;
  703. if(((u64)dataSetWriter->config.dataSetFieldContentMask &
  704. (u64)UA_DATASETFIELDCONTENTMASK_SERVERTIMESTAMP) == 0)
  705. dfv->hasServerTimestamp = false;
  706. if(((u64)dataSetWriter->config.dataSetFieldContentMask &
  707. (u64)UA_DATASETFIELDCONTENTMASK_SERVERPICOSECONDS) == 0)
  708. dfv->hasServerPicoseconds = false;
  709. #ifdef UA_ENABLE_PUBSUB_DELTAFRAMES
  710. /* Update lastValue store */
  711. UA_DataValue_deleteMembers(&dataSetWriter->lastSamples[counter].value);
  712. UA_DataValue_copy(dfv, &dataSetWriter->lastSamples[counter].value);
  713. #endif
  714. counter++;
  715. }
  716. return UA_STATUSCODE_GOOD;
  717. }
  718. #ifdef UA_ENABLE_PUBSUB_DELTAFRAMES
  719. static UA_StatusCode
  720. UA_PubSubDataSetWriter_generateDeltaFrameMessage(UA_Server *server,
  721. UA_DataSetMessage *dataSetMessage,
  722. UA_DataSetWriter *dataSetWriter) {
  723. UA_PublishedDataSet *currentDataSet =
  724. UA_PublishedDataSet_findPDSbyId(server, dataSetWriter->connectedDataSet);
  725. if(!currentDataSet)
  726. return UA_STATUSCODE_BADNOTFOUND;
  727. /* Prepare DataSetMessageContent */
  728. memset(dataSetMessage, 0, sizeof(UA_DataSetMessage));
  729. dataSetMessage->header.dataSetMessageValid = true;
  730. dataSetMessage->header.dataSetMessageType = UA_DATASETMESSAGE_DATADELTAFRAME;
  731. UA_DataSetField *dsf;
  732. size_t counter = 0;
  733. LIST_FOREACH(dsf, &currentDataSet->fields, listEntry) {
  734. /* Sample the value */
  735. UA_DataValue value;
  736. UA_DataValue_init(&value);
  737. UA_PubSubDataSetField_sampleValue(server, dsf, &value);
  738. /* Check if the value has changed */
  739. if(valueChangedVariant(&dataSetWriter->lastSamples[counter].value.value, &value.value)) {
  740. /* increase fieldCount for current delta message */
  741. dataSetMessage->data.deltaFrameData.fieldCount++;
  742. dataSetWriter->lastSamples[counter].valueChanged = true;
  743. /* Update last stored sample */
  744. UA_DataValue_deleteMembers(&dataSetWriter->lastSamples[counter].value);
  745. dataSetWriter->lastSamples[counter].value = value;
  746. } else {
  747. UA_DataValue_deleteMembers(&value);
  748. dataSetWriter->lastSamples[counter].valueChanged = false;
  749. }
  750. counter++;
  751. }
  752. /* Allocate DeltaFrameFields */
  753. UA_DataSetMessage_DeltaFrameField *deltaFields = (UA_DataSetMessage_DeltaFrameField *)
  754. UA_calloc(dataSetMessage->data.deltaFrameData.fieldCount, sizeof(UA_DataSetMessage_DeltaFrameField));
  755. if(!deltaFields)
  756. return UA_STATUSCODE_BADOUTOFMEMORY;
  757. dataSetMessage->data.deltaFrameData.deltaFrameFields = deltaFields;
  758. size_t currentDeltaField = 0;
  759. for(size_t i = 0; i < currentDataSet->fieldSize; i++) {
  760. if(!dataSetWriter->lastSamples[i].valueChanged)
  761. continue;
  762. UA_DataSetMessage_DeltaFrameField *dff = &deltaFields[currentDeltaField];
  763. dff->fieldIndex = (UA_UInt16) i;
  764. UA_DataValue_copy(&dataSetWriter->lastSamples[i].value, &dff->fieldValue);
  765. dataSetWriter->lastSamples[i].valueChanged = false;
  766. /* Deactivate statuscode? */
  767. if(((u64)dataSetWriter->config.dataSetFieldContentMask & (u64)UA_DATASETFIELDCONTENTMASK_STATUSCODE) == 0)
  768. dff->fieldValue.hasStatus = false;
  769. /* Deactivate timestamps? */
  770. if(((u64)dataSetWriter->config.dataSetFieldContentMask & (u64)UA_DATASETFIELDCONTENTMASK_SOURCETIMESTAMP) == 0)
  771. dff->fieldValue.hasSourceTimestamp = false;
  772. if(((u64)dataSetWriter->config.dataSetFieldContentMask & (u64)UA_DATASETFIELDCONTENTMASK_SOURCEPICOSECONDS) ==
  773. 0)
  774. dff->fieldValue.hasServerPicoseconds = false;
  775. if(((u64)dataSetWriter->config.dataSetFieldContentMask & (u64)UA_DATASETFIELDCONTENTMASK_SERVERTIMESTAMP) == 0)
  776. dff->fieldValue.hasServerTimestamp = false;
  777. if(((u64)dataSetWriter->config.dataSetFieldContentMask & (u64)UA_DATASETFIELDCONTENTMASK_SERVERPICOSECONDS) ==
  778. 0)
  779. dff->fieldValue.hasServerPicoseconds = false;
  780. currentDeltaField++;
  781. }
  782. return UA_STATUSCODE_GOOD;
  783. }
  784. #endif
  785. /**
  786. * Generate a DataSetMessage for the given writer.
  787. *
  788. * @param dataSetWriter ptr to corresponding writer
  789. * @return ptr to generated DataSetMessage
  790. */
  791. static UA_StatusCode
  792. UA_DataSetWriter_generateDataSetMessage(UA_Server *server, UA_DataSetMessage *dataSetMessage,
  793. UA_DataSetWriter *dataSetWriter) {
  794. UA_PublishedDataSet *currentDataSet =
  795. UA_PublishedDataSet_findPDSbyId(server, dataSetWriter->connectedDataSet);
  796. if(!currentDataSet)
  797. return UA_STATUSCODE_BADNOTFOUND;
  798. /* Reset the message */
  799. memset(dataSetMessage, 0, sizeof(UA_DataSetMessage));
  800. /* store messageType to switch between json or uadp (default) */
  801. UA_UInt16 messageType = UA_TYPES_UADPDATASETWRITERMESSAGEDATATYPE;
  802. UA_JsonDataSetWriterMessageDataType *jsonDataSetWriterMessageDataType = NULL;
  803. /* The configuration Flags are included
  804. * inside the std. defined UA_UadpDataSetWriterMessageDataType */
  805. UA_UadpDataSetWriterMessageDataType defaultUadpConfiguration;
  806. UA_UadpDataSetWriterMessageDataType *dataSetWriterMessageDataType = NULL;
  807. if((dataSetWriter->config.messageSettings.encoding == UA_EXTENSIONOBJECT_DECODED ||
  808. dataSetWriter->config.messageSettings.encoding == UA_EXTENSIONOBJECT_DECODED_NODELETE) &&
  809. (dataSetWriter->config.messageSettings.content.decoded.type ==
  810. &UA_TYPES[UA_TYPES_UADPDATASETWRITERMESSAGEDATATYPE])) {
  811. dataSetWriterMessageDataType = (UA_UadpDataSetWriterMessageDataType *)
  812. dataSetWriter->config.messageSettings.content.decoded.data;
  813. /* type is UADP */
  814. messageType = UA_TYPES_UADPDATASETWRITERMESSAGEDATATYPE;
  815. } else if((dataSetWriter->config.messageSettings.encoding == UA_EXTENSIONOBJECT_DECODED ||
  816. dataSetWriter->config.messageSettings.encoding == UA_EXTENSIONOBJECT_DECODED_NODELETE) &&
  817. (dataSetWriter->config.messageSettings.content.decoded.type ==
  818. &UA_TYPES[UA_TYPES_JSONDATASETWRITERMESSAGEDATATYPE])) {
  819. jsonDataSetWriterMessageDataType = (UA_JsonDataSetWriterMessageDataType *)
  820. dataSetWriter->config.messageSettings.content.decoded.data;
  821. /* type is JSON */
  822. messageType = UA_TYPES_JSONDATASETWRITERMESSAGEDATATYPE;
  823. } else {
  824. /* create default flag configuration if no
  825. * UadpDataSetWriterMessageDataType was passed in */
  826. memset(&defaultUadpConfiguration, 0, sizeof(UA_UadpDataSetWriterMessageDataType));
  827. defaultUadpConfiguration.dataSetMessageContentMask = (UA_UadpDataSetMessageContentMask)
  828. ((u64)UA_UADPDATASETMESSAGECONTENTMASK_TIMESTAMP | (u64)UA_UADPDATASETMESSAGECONTENTMASK_MAJORVERSION |
  829. (u64)UA_UADPDATASETMESSAGECONTENTMASK_MINORVERSION);
  830. dataSetWriterMessageDataType = &defaultUadpConfiguration;
  831. }
  832. /* Sanity-test the configuration */
  833. if(dataSetWriterMessageDataType &&
  834. (dataSetWriterMessageDataType->networkMessageNumber != 0 ||
  835. dataSetWriterMessageDataType->dataSetOffset != 0 ||
  836. dataSetWriterMessageDataType->configuredSize != 0)) {
  837. UA_LOG_WARNING(&server->config.logger, UA_LOGCATEGORY_SERVER,
  838. "Static DSM configuration not supported. Using defaults");
  839. dataSetWriterMessageDataType->networkMessageNumber = 0;
  840. dataSetWriterMessageDataType->dataSetOffset = 0;
  841. dataSetWriterMessageDataType->configuredSize = 0;
  842. }
  843. /* The field encoding depends on the flags inside the writer config.
  844. * TODO: This can be moved to the encoding layer. */
  845. if((u64)dataSetWriter->config.dataSetFieldContentMask & (u64)UA_DATASETFIELDCONTENTMASK_RAWDATAENCODING) {
  846. dataSetMessage->header.fieldEncoding = UA_FIELDENCODING_RAWDATA;
  847. } else if((u64)dataSetWriter->config.dataSetFieldContentMask &
  848. ((u64)UA_DATASETFIELDCONTENTMASK_SOURCETIMESTAMP | (u64)UA_DATASETFIELDCONTENTMASK_SERVERPICOSECONDS |
  849. (u64)UA_DATASETFIELDCONTENTMASK_SOURCEPICOSECONDS | (u64)UA_DATASETFIELDCONTENTMASK_STATUSCODE)) {
  850. dataSetMessage->header.fieldEncoding = UA_FIELDENCODING_DATAVALUE;
  851. } else {
  852. dataSetMessage->header.fieldEncoding = UA_FIELDENCODING_VARIANT;
  853. }
  854. if(messageType == UA_TYPES_UADPDATASETWRITERMESSAGEDATATYPE) {
  855. /* Std: 'The DataSetMessageContentMask defines the flags for the content of the DataSetMessage header.' */
  856. if((u64)dataSetWriterMessageDataType->dataSetMessageContentMask &
  857. (u64)UA_UADPDATASETMESSAGECONTENTMASK_MAJORVERSION) {
  858. dataSetMessage->header.configVersionMajorVersionEnabled = true;
  859. dataSetMessage->header.configVersionMajorVersion =
  860. currentDataSet->dataSetMetaData.configurationVersion.majorVersion;
  861. }
  862. if((u64)dataSetWriterMessageDataType->dataSetMessageContentMask &
  863. (u64)UA_UADPDATASETMESSAGECONTENTMASK_MINORVERSION) {
  864. dataSetMessage->header.configVersionMinorVersionEnabled = true;
  865. dataSetMessage->header.configVersionMinorVersion =
  866. currentDataSet->dataSetMetaData.configurationVersion.minorVersion;
  867. }
  868. if((u64)dataSetWriterMessageDataType->dataSetMessageContentMask &
  869. (u64)UA_UADPDATASETMESSAGECONTENTMASK_SEQUENCENUMBER) {
  870. dataSetMessage->header.dataSetMessageSequenceNrEnabled = true;
  871. dataSetMessage->header.dataSetMessageSequenceNr =
  872. dataSetWriter->actualDataSetMessageSequenceCount;
  873. }
  874. if((u64)dataSetWriterMessageDataType->dataSetMessageContentMask &
  875. (u64)UA_UADPDATASETMESSAGECONTENTMASK_TIMESTAMP) {
  876. dataSetMessage->header.timestampEnabled = true;
  877. dataSetMessage->header.timestamp = UA_DateTime_now();
  878. }
  879. /* TODO: Picoseconds resolution not supported atm */
  880. if((u64)dataSetWriterMessageDataType->dataSetMessageContentMask &
  881. (u64)UA_UADPDATASETMESSAGECONTENTMASK_PICOSECONDS) {
  882. dataSetMessage->header.picoSecondsIncluded = false;
  883. }
  884. /* TODO: Statuscode not supported yet */
  885. if((u64)dataSetWriterMessageDataType->dataSetMessageContentMask &
  886. (u64)UA_UADPDATASETMESSAGECONTENTMASK_STATUS) {
  887. dataSetMessage->header.statusEnabled = false;
  888. }
  889. } else if(messageType == UA_TYPES_JSONDATASETWRITERMESSAGEDATATYPE) {
  890. if((u64)jsonDataSetWriterMessageDataType->dataSetMessageContentMask &
  891. (u64)UA_JSONDATASETMESSAGECONTENTMASK_METADATAVERSION) {
  892. dataSetMessage->header.configVersionMajorVersionEnabled = true;
  893. dataSetMessage->header.configVersionMajorVersion =
  894. currentDataSet->dataSetMetaData.configurationVersion.majorVersion;
  895. }
  896. if((u64)jsonDataSetWriterMessageDataType->dataSetMessageContentMask &
  897. (u64)UA_JSONDATASETMESSAGECONTENTMASK_METADATAVERSION) {
  898. dataSetMessage->header.configVersionMinorVersionEnabled = true;
  899. dataSetMessage->header.configVersionMinorVersion =
  900. currentDataSet->dataSetMetaData.configurationVersion.minorVersion;
  901. }
  902. if((u64)jsonDataSetWriterMessageDataType->dataSetMessageContentMask &
  903. (u64)UA_JSONDATASETMESSAGECONTENTMASK_SEQUENCENUMBER) {
  904. dataSetMessage->header.dataSetMessageSequenceNrEnabled = true;
  905. dataSetMessage->header.dataSetMessageSequenceNr =
  906. dataSetWriter->actualDataSetMessageSequenceCount;
  907. }
  908. if((u64)jsonDataSetWriterMessageDataType->dataSetMessageContentMask &
  909. (u64)UA_JSONDATASETMESSAGECONTENTMASK_TIMESTAMP) {
  910. dataSetMessage->header.timestampEnabled = true;
  911. dataSetMessage->header.timestamp = UA_DateTime_now();
  912. }
  913. /* TODO: Statuscode not supported yet */
  914. if((u64)jsonDataSetWriterMessageDataType->dataSetMessageContentMask &
  915. (u64)UA_JSONDATASETMESSAGECONTENTMASK_STATUS) {
  916. dataSetMessage->header.statusEnabled = false;
  917. }
  918. }
  919. /* Set the sequence count. Automatically rolls over to zero */
  920. dataSetWriter->actualDataSetMessageSequenceCount++;
  921. /* JSON does not differ between deltaframes and keyframes, only keyframes are currently used. */
  922. if(messageType != UA_TYPES_JSONDATASETWRITERMESSAGEDATATYPE){
  923. #ifdef UA_ENABLE_PUBSUB_DELTAFRAMES
  924. /* Check if the PublishedDataSet version has changed -> if yes flush the lastValue store and send a KeyFrame */
  925. if(dataSetWriter->connectedDataSetVersion.majorVersion != currentDataSet->dataSetMetaData.configurationVersion.majorVersion ||
  926. dataSetWriter->connectedDataSetVersion.minorVersion != currentDataSet->dataSetMetaData.configurationVersion.minorVersion) {
  927. /* Remove old samples */
  928. for(size_t i = 0; i < dataSetWriter->lastSamplesCount; i++)
  929. UA_DataValue_deleteMembers(&dataSetWriter->lastSamples[i].value);
  930. /* Realloc pds dependent memory */
  931. dataSetWriter->lastSamplesCount = currentDataSet->fieldSize;
  932. UA_DataSetWriterSample *newSamplesArray = (UA_DataSetWriterSample * )
  933. UA_realloc(dataSetWriter->lastSamples, sizeof(UA_DataSetWriterSample) * dataSetWriter->lastSamplesCount);
  934. if(!newSamplesArray)
  935. return UA_STATUSCODE_BADOUTOFMEMORY;
  936. dataSetWriter->lastSamples = newSamplesArray;
  937. memset(dataSetWriter->lastSamples, 0, sizeof(UA_DataSetWriterSample) * dataSetWriter->lastSamplesCount);
  938. dataSetWriter->connectedDataSetVersion = currentDataSet->dataSetMetaData.configurationVersion;
  939. UA_PubSubDataSetWriter_generateKeyFrameMessage(server, dataSetMessage, dataSetWriter);
  940. dataSetWriter->deltaFrameCounter = 0;
  941. return UA_STATUSCODE_GOOD;
  942. }
  943. /* The standard defines: if a PDS contains only one fields no delta messages
  944. * should be generated because they need more memory than a keyframe with 1
  945. * field. */
  946. if(currentDataSet->fieldSize > 1 && dataSetWriter->deltaFrameCounter > 0 &&
  947. dataSetWriter->deltaFrameCounter <= dataSetWriter->config.keyFrameCount) {
  948. UA_PubSubDataSetWriter_generateDeltaFrameMessage(server, dataSetMessage, dataSetWriter);
  949. dataSetWriter->deltaFrameCounter++;
  950. return UA_STATUSCODE_GOOD;
  951. }
  952. dataSetWriter->deltaFrameCounter = 1;
  953. #endif
  954. }
  955. UA_PubSubDataSetWriter_generateKeyFrameMessage(server, dataSetMessage, dataSetWriter);
  956. return UA_STATUSCODE_GOOD;
  957. }
  958. static UA_StatusCode
  959. sendNetworkMessageJson(UA_PubSubConnection *connection, UA_DataSetMessage *dsm,
  960. UA_UInt16 *writerIds, UA_Byte dsmCount, UA_ExtensionObject *transportSettings) {
  961. UA_StatusCode retval = UA_STATUSCODE_BADNOTSUPPORTED;
  962. #ifdef UA_ENABLE_JSON_ENCODING
  963. UA_NetworkMessage nm;
  964. memset(&nm, 0, sizeof(UA_NetworkMessage));
  965. nm.version = 1;
  966. nm.networkMessageType = UA_NETWORKMESSAGE_DATASET;
  967. nm.payloadHeaderEnabled = true;
  968. nm.payloadHeader.dataSetPayloadHeader.count = dsmCount;
  969. nm.payloadHeader.dataSetPayloadHeader.dataSetWriterIds = writerIds;
  970. nm.payload.dataSetPayload.dataSetMessages = dsm;
  971. /* Allocate the buffer. Allocate on the stack if the buffer is small. */
  972. UA_ByteString buf;
  973. size_t msgSize = UA_NetworkMessage_calcSizeJson(&nm, NULL, 0, NULL, 0, true);
  974. size_t stackSize = 1;
  975. if(msgSize <= UA_MAX_STACKBUF)
  976. stackSize = msgSize;
  977. UA_STACKARRAY(UA_Byte, stackBuf, stackSize);
  978. buf.data = stackBuf;
  979. buf.length = msgSize;
  980. if(msgSize > UA_MAX_STACKBUF) {
  981. retval = UA_ByteString_allocBuffer(&buf, msgSize);
  982. if(retval != UA_STATUSCODE_GOOD)
  983. return retval;
  984. }
  985. /* Encode the message */
  986. UA_Byte *bufPos = buf.data;
  987. memset(bufPos, 0, msgSize);
  988. const UA_Byte *bufEnd = &buf.data[buf.length];
  989. retval = UA_NetworkMessage_encodeJson(&nm, &bufPos, &bufEnd, NULL, 0, NULL, 0, true);
  990. if(retval != UA_STATUSCODE_GOOD) {
  991. if(msgSize > UA_MAX_STACKBUF)
  992. UA_ByteString_deleteMembers(&buf);
  993. return retval;
  994. }
  995. /* Send the prepared messages */
  996. retval = connection->channel->send(connection->channel, transportSettings, &buf);
  997. if(msgSize > UA_MAX_STACKBUF)
  998. UA_ByteString_deleteMembers(&buf);
  999. #endif
  1000. return retval;
  1001. }
  1002. static UA_StatusCode
  1003. sendNetworkMessage(UA_PubSubConnection *connection, UA_WriterGroup *wg,
  1004. UA_DataSetMessage *dsm, UA_UInt16 *writerIds, UA_Byte dsmCount,
  1005. UA_ExtensionObject *messageSettings,
  1006. UA_ExtensionObject *transportSettings) {
  1007. if(messageSettings->content.decoded.type !=
  1008. &UA_TYPES[UA_TYPES_UADPWRITERGROUPMESSAGEDATATYPE])
  1009. return UA_STATUSCODE_BADINTERNALERROR;
  1010. UA_UadpWriterGroupMessageDataType *wgm = (UA_UadpWriterGroupMessageDataType*)
  1011. messageSettings->content.decoded.data;
  1012. UA_NetworkMessage nm;
  1013. memset(&nm, 0, sizeof(UA_NetworkMessage));
  1014. nm.publisherIdEnabled =
  1015. ((u64)wgm->networkMessageContentMask & (u64)UA_UADPNETWORKMESSAGECONTENTMASK_PUBLISHERID) != 0;
  1016. nm.groupHeaderEnabled =
  1017. ((u64)wgm->networkMessageContentMask & (u64)UA_UADPNETWORKMESSAGECONTENTMASK_GROUPHEADER) != 0;
  1018. nm.groupHeader.writerGroupIdEnabled =
  1019. ((u64)wgm->networkMessageContentMask & (u64)UA_UADPNETWORKMESSAGECONTENTMASK_WRITERGROUPID) != 0;
  1020. nm.groupHeader.groupVersionEnabled =
  1021. ((u64)wgm->networkMessageContentMask & (u64)UA_UADPNETWORKMESSAGECONTENTMASK_GROUPVERSION) != 0;
  1022. nm.groupHeader.networkMessageNumberEnabled =
  1023. ((u64)wgm->networkMessageContentMask & (u64)UA_UADPNETWORKMESSAGECONTENTMASK_NETWORKMESSAGENUMBER) != 0;
  1024. nm.groupHeader.sequenceNumberEnabled =
  1025. ((u64)wgm->networkMessageContentMask & (u64)UA_UADPNETWORKMESSAGECONTENTMASK_SEQUENCENUMBER) != 0;
  1026. nm.payloadHeaderEnabled =
  1027. ((u64)wgm->networkMessageContentMask & (u64)UA_UADPNETWORKMESSAGECONTENTMASK_PAYLOADHEADER) != 0;
  1028. nm.timestampEnabled =
  1029. ((u64)wgm->networkMessageContentMask & (u64)UA_UADPNETWORKMESSAGECONTENTMASK_TIMESTAMP) != 0;
  1030. nm.picosecondsEnabled =
  1031. ((u64)wgm->networkMessageContentMask & (u64)UA_UADPNETWORKMESSAGECONTENTMASK_PICOSECONDS) != 0;
  1032. nm.dataSetClassIdEnabled =
  1033. ((u64)wgm->networkMessageContentMask & (u64)UA_UADPNETWORKMESSAGECONTENTMASK_DATASETCLASSID) != 0;
  1034. nm.promotedFieldsEnabled =
  1035. ((u64)wgm->networkMessageContentMask & (u64)UA_UADPNETWORKMESSAGECONTENTMASK_PROMOTEDFIELDS) != 0;
  1036. nm.version = 1;
  1037. nm.networkMessageType = UA_NETWORKMESSAGE_DATASET;
  1038. if(connection->config->publisherIdType == UA_PUBSUB_PUBLISHERID_NUMERIC) {
  1039. nm.publisherIdType = UA_PUBLISHERDATATYPE_UINT16;
  1040. nm.publisherId.publisherIdUInt32 = connection->config->publisherId.numeric;
  1041. } else if (connection->config->publisherIdType == UA_PUBSUB_PUBLISHERID_STRING){
  1042. nm.publisherIdType = UA_PUBLISHERDATATYPE_STRING;
  1043. nm.publisherId.publisherIdString = connection->config->publisherId.string;
  1044. }
  1045. /* Compute the length of the dsm separately for the header */
  1046. UA_STACKARRAY(UA_UInt16, dsmLengths, dsmCount);
  1047. for(UA_Byte i = 0; i < dsmCount; i++)
  1048. dsmLengths[i] = (UA_UInt16)UA_DataSetMessage_calcSizeBinary(&dsm[i]);
  1049. nm.payloadHeader.dataSetPayloadHeader.count = dsmCount;
  1050. nm.payloadHeader.dataSetPayloadHeader.dataSetWriterIds = writerIds;
  1051. nm.groupHeader.writerGroupId = wg->config.writerGroupId;
  1052. nm.groupHeader.networkMessageNumber = 1;
  1053. nm.payload.dataSetPayload.sizes = dsmLengths;
  1054. nm.payload.dataSetPayload.dataSetMessages = dsm;
  1055. /* Allocate the buffer. Allocate on the stack if the buffer is small. */
  1056. UA_ByteString buf;
  1057. size_t msgSize = UA_NetworkMessage_calcSizeBinary(&nm);
  1058. size_t stackSize = 1;
  1059. if(msgSize <= UA_MAX_STACKBUF)
  1060. stackSize = msgSize;
  1061. UA_STACKARRAY(UA_Byte, stackBuf, stackSize);
  1062. buf.data = stackBuf;
  1063. buf.length = msgSize;
  1064. UA_StatusCode retval;
  1065. if(msgSize > UA_MAX_STACKBUF) {
  1066. retval = UA_ByteString_allocBuffer(&buf, msgSize);
  1067. if(retval != UA_STATUSCODE_GOOD)
  1068. return retval;
  1069. }
  1070. /* Encode the message */
  1071. UA_Byte *bufPos = buf.data;
  1072. memset(bufPos, 0, msgSize);
  1073. const UA_Byte *bufEnd = &buf.data[buf.length];
  1074. retval = UA_NetworkMessage_encodeBinary(&nm, &bufPos, bufEnd);
  1075. if(retval != UA_STATUSCODE_GOOD) {
  1076. if(msgSize > UA_MAX_STACKBUF)
  1077. UA_ByteString_deleteMembers(&buf);
  1078. return retval;
  1079. }
  1080. /* Send the prepared messages */
  1081. retval = connection->channel->send(connection->channel, transportSettings, &buf);
  1082. if(msgSize > UA_MAX_STACKBUF)
  1083. UA_ByteString_deleteMembers(&buf);
  1084. return retval;
  1085. }
  1086. /* This callback triggers the collection and publish of NetworkMessages and the
  1087. * contained DataSetMessages. */
  1088. void
  1089. UA_WriterGroup_publishCallback(UA_Server *server, UA_WriterGroup *writerGroup) {
  1090. UA_LOG_DEBUG(&server->config.logger, UA_LOGCATEGORY_SERVER, "Publish Callback");
  1091. if(!writerGroup) {
  1092. UA_LOG_WARNING(&server->config.logger, UA_LOGCATEGORY_SERVER,
  1093. "Publish failed. WriterGroup not found");
  1094. return;
  1095. }
  1096. /* Nothing to do? */
  1097. if(writerGroup->writersCount <= 0)
  1098. return;
  1099. /* Binary or Json encoding? */
  1100. if(writerGroup->config.encodingMimeType != UA_PUBSUB_ENCODING_UADP &&
  1101. writerGroup->config.encodingMimeType != UA_PUBSUB_ENCODING_JSON) {
  1102. UA_LOG_WARNING(&server->config.logger, UA_LOGCATEGORY_SERVER,
  1103. "Publish failed: Unknown encoding type.");
  1104. return;
  1105. }
  1106. /* Find the connection associated with the writer */
  1107. UA_PubSubConnection *connection =
  1108. UA_PubSubConnection_findConnectionbyId(server, writerGroup->linkedConnection);
  1109. if(!connection) {
  1110. UA_LOG_WARNING(&server->config.logger, UA_LOGCATEGORY_SERVER,
  1111. "Publish failed. PubSubConnection invalid.");
  1112. return;
  1113. }
  1114. /* How many DSM can be sent in one NM? */
  1115. UA_Byte maxDSM = (UA_Byte)writerGroup->config.maxEncapsulatedDataSetMessageCount;
  1116. if(writerGroup->config.maxEncapsulatedDataSetMessageCount > UA_BYTE_MAX)
  1117. maxDSM = UA_BYTE_MAX;
  1118. /* If the maxEncapsulatedDataSetMessageCount is set to 0->1 */
  1119. if(maxDSM == 0)
  1120. maxDSM = 1;
  1121. /* It is possible to put several DataSetMessages into one NetworkMessage.
  1122. * But only if they do not contain promoted fields. NM with only DSM are
  1123. * sent out right away. The others are kept in a buffer for "batching". */
  1124. size_t dsmCount = 0;
  1125. UA_DataSetWriter *dsw;
  1126. UA_STACKARRAY(UA_UInt16, dsWriterIds, writerGroup->writersCount);
  1127. UA_STACKARRAY(UA_DataSetMessage, dsmStore, writerGroup->writersCount);
  1128. LIST_FOREACH(dsw, &writerGroup->writers, listEntry) {
  1129. /* Find the dataset */
  1130. UA_PublishedDataSet *pds =
  1131. UA_PublishedDataSet_findPDSbyId(server, dsw->connectedDataSet);
  1132. if(!pds) {
  1133. UA_LOG_WARNING(&server->config.logger, UA_LOGCATEGORY_SERVER,
  1134. "PubSub Publish: PublishedDataSet not found");
  1135. continue;
  1136. }
  1137. /* Generate the DSM */
  1138. UA_StatusCode res =
  1139. UA_DataSetWriter_generateDataSetMessage(server, &dsmStore[dsmCount], dsw);
  1140. if(res != UA_STATUSCODE_GOOD) {
  1141. UA_LOG_WARNING(&server->config.logger, UA_LOGCATEGORY_SERVER,
  1142. "PubSub Publish: DataSetMessage creation failed");
  1143. continue;
  1144. }
  1145. /* Send right away if there is only this DSM in a NM. If promoted fields
  1146. * are contained in the PublishedDataSet, then this DSM must go into a
  1147. * dedicated NM as well. */
  1148. if(pds->promotedFieldsCount > 0 || maxDSM == 1) {
  1149. if(writerGroup->config.encodingMimeType == UA_PUBSUB_ENCODING_UADP){
  1150. res = sendNetworkMessage(connection, writerGroup, &dsmStore[dsmCount],
  1151. &dsw->config.dataSetWriterId, 1,
  1152. &writerGroup->config.messageSettings,
  1153. &writerGroup->config.transportSettings);
  1154. }else if(writerGroup->config.encodingMimeType == UA_PUBSUB_ENCODING_JSON){
  1155. res = sendNetworkMessageJson(connection, &dsmStore[dsmCount],
  1156. &dsw->config.dataSetWriterId, 1, &writerGroup->config.transportSettings);
  1157. }
  1158. if(res != UA_STATUSCODE_GOOD)
  1159. UA_LOG_WARNING(&server->config.logger, UA_LOGCATEGORY_SERVER,
  1160. "PubSub Publish: Could not send a NetworkMessage");
  1161. UA_DataSetMessage_free(&dsmStore[dsmCount]);
  1162. continue;
  1163. }
  1164. dsWriterIds[dsmCount] = dsw->config.dataSetWriterId;
  1165. dsmCount++;
  1166. }
  1167. /* Send the NetworkMessages with batched DataSetMessages */
  1168. size_t nmCount = (dsmCount / maxDSM) + ((dsmCount % maxDSM) == 0 ? 0 : 1);
  1169. for(UA_UInt32 i = 0; i < nmCount; i++) {
  1170. UA_Byte nmDsmCount = maxDSM;
  1171. if(i == nmCount - 1)
  1172. nmDsmCount = (UA_Byte)dsmCount % maxDSM;
  1173. UA_StatusCode res3 = UA_STATUSCODE_GOOD;
  1174. if(writerGroup->config.encodingMimeType == UA_PUBSUB_ENCODING_UADP){
  1175. res3 = sendNetworkMessage(connection, writerGroup, &dsmStore[i * maxDSM],
  1176. &dsWriterIds[i * maxDSM], nmDsmCount,
  1177. &writerGroup->config.messageSettings,
  1178. &writerGroup->config.transportSettings);
  1179. }else if(writerGroup->config.encodingMimeType == UA_PUBSUB_ENCODING_JSON){
  1180. res3 = sendNetworkMessageJson(connection, &dsmStore[i * maxDSM],
  1181. &dsWriterIds[i * maxDSM], nmDsmCount, &writerGroup->config.transportSettings);
  1182. }
  1183. if(res3 != UA_STATUSCODE_GOOD)
  1184. UA_LOG_WARNING(&server->config.logger, UA_LOGCATEGORY_SERVER,
  1185. "PubSub Publish: Sending a NetworkMessage failed");
  1186. }
  1187. /* Clean up DSM */
  1188. for(size_t i = 0; i < dsmCount; i++)
  1189. UA_DataSetMessage_free(&dsmStore[i]);
  1190. }
  1191. /* Add new publishCallback. The first execution is triggered directly after
  1192. * creation. */
  1193. UA_StatusCode
  1194. UA_WriterGroup_addPublishCallback(UA_Server *server, UA_WriterGroup *writerGroup) {
  1195. UA_StatusCode retval =
  1196. UA_PubSubManager_addRepeatedCallback(server,
  1197. (UA_ServerCallback) UA_WriterGroup_publishCallback,
  1198. writerGroup, writerGroup->config.publishingInterval,
  1199. &writerGroup->publishCallbackId);
  1200. if(retval == UA_STATUSCODE_GOOD)
  1201. writerGroup->publishCallbackIsRegistered = true;
  1202. /* Run once after creation */
  1203. UA_WriterGroup_publishCallback(server, writerGroup);
  1204. return retval;
  1205. }
  1206. #endif /* UA_ENABLE_PUBSUB */