ua_pubsub.c 52 KB


  1. /* This Source Code Form is subject to the terms of the Mozilla Public
  2. * License, v. 2.0. If a copy of the MPL was not distributed with this
  3. * file, You can obtain one at http://mozilla.org/MPL/2.0/.
  4. *
  5. * Copyright (c) 2017-2018 Fraunhofer IOSB (Author: Andreas Ebner)
  6. */
  7. #include "ua_types_encoding_binary.h"
  8. #include "ua_server_pubsub.h"
  9. #include "server/ua_server_internal.h"
  10. #include "ua_pubsub.h"
  11. #include "ua_pubsub_manager.h"
  12. #include "ua_pubsub_networkmessage.h"
  13. #ifdef UA_ENABLE_PUBSUB /* conditional compilation */
  14. #ifdef UA_ENABLE_PUBSUB_INFORMATIONMODEL
  15. #include "ua_pubsub_ns0.h"
  16. #endif
  17. /**********************************************/
  18. /* Connection */
  19. /**********************************************/
  20. UA_StatusCode
  21. UA_PubSubConnectionConfig_copy(const UA_PubSubConnectionConfig *src,
  22. UA_PubSubConnectionConfig *dst) {
  23. UA_StatusCode retVal = UA_STATUSCODE_GOOD;
  24. memcpy(dst, src, sizeof(UA_PubSubConnectionConfig));
  25. retVal |= UA_String_copy(&src->name, &dst->name);
  26. retVal |= UA_Variant_copy(&src->address, &dst->address);
  27. retVal |= UA_String_copy(&src->transportProfileUri, &dst->transportProfileUri);
  28. retVal |= UA_Variant_copy(&src->connectionTransportSettings, &dst->connectionTransportSettings);
  29. if(src->connectionPropertiesSize > 0){
  30. dst->connectionProperties = (UA_KeyValuePair *)
  31. UA_calloc(src->connectionPropertiesSize, sizeof(UA_KeyValuePair));
  32. if(!dst->connectionProperties){
  33. return UA_STATUSCODE_BADOUTOFMEMORY;
  34. }
  35. for(size_t i = 0; i < src->connectionPropertiesSize; i++){
  36. retVal |= UA_QualifiedName_copy(&src->connectionProperties[i].key,
  37. &dst->connectionProperties[i].key);
  38. retVal |= UA_Variant_copy(&src->connectionProperties[i].value,
  39. &dst->connectionProperties[i].value);
  40. }
  41. }
  42. return retVal;
  43. }
  44. UA_StatusCode
  45. UA_Server_getPubSubConnectionConfig(UA_Server *server, const UA_NodeId connection,
  46. UA_PubSubConnectionConfig *config) {
  47. if(!config)
  48. return UA_STATUSCODE_BADINVALIDARGUMENT;
  49. UA_PubSubConnection *currentPubSubConnection =
  50. UA_PubSubConnection_findConnectionbyId(server, connection);
  51. if(!currentPubSubConnection)
  52. return UA_STATUSCODE_BADNOTFOUND;
  53. UA_PubSubConnectionConfig tmpPubSubConnectionConfig;
  54. //deep copy of the actual config
  55. UA_PubSubConnectionConfig_copy(currentPubSubConnection->config, &tmpPubSubConnectionConfig);
  56. *config = tmpPubSubConnectionConfig;
  57. return UA_STATUSCODE_GOOD;
  58. }
  59. UA_PubSubConnection *
  60. UA_PubSubConnection_findConnectionbyId(UA_Server *server, UA_NodeId connectionIdentifier) {
  61. for(size_t i = 0; i < server->pubSubManager.connectionsSize; i++){
  62. if(UA_NodeId_equal(&connectionIdentifier, &server->pubSubManager.connections[i].identifier)){
  63. return &server->pubSubManager.connections[i];
  64. }
  65. }
  66. return NULL;
  67. }
  68. void
  69. UA_PubSubConnectionConfig_deleteMembers(UA_PubSubConnectionConfig *connectionConfig) {
  70. UA_String_deleteMembers(&connectionConfig->name);
  71. UA_String_deleteMembers(&connectionConfig->transportProfileUri);
  72. UA_Variant_deleteMembers(&connectionConfig->connectionTransportSettings);
  73. UA_Variant_deleteMembers(&connectionConfig->address);
  74. for(size_t i = 0; i < connectionConfig->connectionPropertiesSize; i++){
  75. UA_QualifiedName_deleteMembers(&connectionConfig->connectionProperties[i].key);
  76. UA_Variant_deleteMembers(&connectionConfig->connectionProperties[i].value);
  77. }
  78. UA_free(connectionConfig->connectionProperties);
  79. }
  80. void
  81. UA_PubSubConnection_deleteMembers(UA_Server *server, UA_PubSubConnection *connection) {
  82. //delete connection config
  83. UA_PubSubConnectionConfig_deleteMembers(connection->config);
  84. //remove contained WriterGroups
  85. UA_WriterGroup *writerGroup, *tmpWriterGroup;
  86. LIST_FOREACH_SAFE(writerGroup, &connection->writerGroups, listEntry, tmpWriterGroup){
  87. UA_Server_removeWriterGroup(server, writerGroup->identifier);
  88. }
  89. UA_NodeId_deleteMembers(&connection->identifier);
  90. if(connection->channel){
  91. connection->channel->close(connection->channel);
  92. }
  93. UA_free(connection->config);
  94. }
  95. UA_StatusCode
  96. UA_Server_addWriterGroup(UA_Server *server, const UA_NodeId connection,
  97. const UA_WriterGroupConfig *writerGroupConfig,
  98. UA_NodeId *writerGroupIdentifier) {
  99. UA_StatusCode retVal = UA_STATUSCODE_GOOD;
  100. if(!writerGroupConfig)
  101. return UA_STATUSCODE_BADINVALIDARGUMENT;
  102. //search the connection by the given connectionIdentifier
  103. UA_PubSubConnection *currentConnectionContext =
  104. UA_PubSubConnection_findConnectionbyId(server, connection);
  105. if(!currentConnectionContext)
  106. return UA_STATUSCODE_BADNOTFOUND;
  107. //allocate memory for new WriterGroup
  108. UA_WriterGroup *newWriterGroup = (UA_WriterGroup *) UA_calloc(1, sizeof(UA_WriterGroup));
  109. if (!newWriterGroup)
  110. return UA_STATUSCODE_BADOUTOFMEMORY;
  111. newWriterGroup->linkedConnection = currentConnectionContext->identifier;
  112. UA_PubSubManager_generateUniqueNodeId(server, &newWriterGroup->identifier);
  113. if(writerGroupIdentifier){
  114. UA_NodeId_copy(&newWriterGroup->identifier, writerGroupIdentifier);
  115. }
  116. UA_WriterGroupConfig tmpWriterGroupConfig;
  117. //deep copy of the config
  118. retVal |= UA_WriterGroupConfig_copy(writerGroupConfig, &tmpWriterGroupConfig);
  119. newWriterGroup->config = tmpWriterGroupConfig;
  120. retVal |= UA_WriterGroup_addPublishCallback(server, newWriterGroup);
  121. LIST_INSERT_HEAD(&currentConnectionContext->writerGroups, newWriterGroup, listEntry);
  122. #ifdef UA_ENABLE_PUBSUB_INFORMATIONMODEL
  123. addWriterGroupRepresentation(server, newWriterGroup);
  124. #endif
  125. return retVal;
  126. }
  127. UA_StatusCode
  128. UA_Server_removeWriterGroup(UA_Server *server, const UA_NodeId writerGroup){
  129. UA_WriterGroup *wg = UA_WriterGroup_findWGbyId(server, writerGroup);
  130. if(!wg)
  131. return UA_STATUSCODE_BADNOTFOUND;
  132. UA_PubSubConnection *connection =
  133. UA_PubSubConnection_findConnectionbyId(server, wg->linkedConnection);
  134. if(!connection)
  135. return UA_STATUSCODE_BADNOTFOUND;
  136. //unregister the publish callback
  137. if(UA_PubSubManager_removeRepeatedPubSubCallback(server, wg->publishCallbackId) != UA_STATUSCODE_GOOD)
  138. return UA_STATUSCODE_BADINTERNALERROR;
  139. #ifdef UA_ENABLE_PUBSUB_INFORMATIONMODEL
  140. removeWriterGroupRepresentation(server, wg);
  141. #endif
  142. UA_WriterGroup_deleteMembers(server, wg);
  143. UA_free(wg);
  144. return UA_STATUSCODE_GOOD;
  145. }
  146. /**********************************************/
  147. /* PublishedDataSet */
  148. /**********************************************/
  149. UA_StatusCode
  150. UA_PublishedDataSetConfig_copy(const UA_PublishedDataSetConfig *src,
  151. UA_PublishedDataSetConfig *dst) {
  152. UA_StatusCode retVal = UA_STATUSCODE_GOOD;
  153. memcpy(dst, src, sizeof(UA_PublishedDataSetConfig));
  154. retVal |= UA_String_copy(&src->name, &dst->name);
  155. switch(src->publishedDataSetType){
  156. case UA_PUBSUB_DATASET_PUBLISHEDITEMS:
  157. //no additional items
  158. break;
  159. case UA_PUBSUB_DATASET_PUBLISHEDITEMS_TEMPLATE:
  160. if (src->config.itemsTemplate.variablesToAddSize > 0){
  161. dst->config.itemsTemplate.variablesToAdd = (UA_PublishedVariableDataType *) UA_calloc(
  162. src->config.itemsTemplate.variablesToAddSize, sizeof(UA_PublishedVariableDataType));
  163. }
  164. for(size_t i = 0; i < src->config.itemsTemplate.variablesToAddSize; i++){
  165. retVal |= UA_PublishedVariableDataType_copy(&src->config.itemsTemplate.variablesToAdd[i],
  166. &dst->config.itemsTemplate.variablesToAdd[i]);
  167. }
  168. retVal |= UA_DataSetMetaDataType_copy(&src->config.itemsTemplate.metaData,
  169. &dst->config.itemsTemplate.metaData);
  170. break;
  171. default:
  172. return UA_STATUSCODE_BADINVALIDARGUMENT;
  173. }
  174. return retVal;
  175. }
  176. UA_StatusCode
  177. UA_Server_getPublishedDataSetConfig(UA_Server *server, const UA_NodeId pds,
  178. UA_PublishedDataSetConfig *config){
  179. if(!config)
  180. return UA_STATUSCODE_BADINVALIDARGUMENT;
  181. UA_PublishedDataSet *currentPublishedDataSet = UA_PublishedDataSet_findPDSbyId(server, pds);
  182. if(!currentPublishedDataSet)
  183. return UA_STATUSCODE_BADNOTFOUND;
  184. UA_PublishedDataSetConfig tmpPublishedDataSetConfig;
  185. //deep copy of the actual config
  186. UA_PublishedDataSetConfig_copy(&currentPublishedDataSet->config, &tmpPublishedDataSetConfig);
  187. *config = tmpPublishedDataSetConfig;
  188. return UA_STATUSCODE_GOOD;
  189. }
  190. UA_PublishedDataSet *
  191. UA_PublishedDataSet_findPDSbyId(UA_Server *server, UA_NodeId identifier){
  192. for(size_t i = 0; i < server->pubSubManager.publishedDataSetsSize; i++){
  193. if(UA_NodeId_equal(&server->pubSubManager.publishedDataSets[i].identifier, &identifier)){
  194. return &server->pubSubManager.publishedDataSets[i];
  195. }
  196. }
  197. return NULL;
  198. }
  199. void
  200. UA_PublishedDataSetConfig_deleteMembers(UA_PublishedDataSetConfig *pdsConfig){
  201. //delete pds config
  202. UA_String_deleteMembers(&pdsConfig->name);
  203. switch (pdsConfig->publishedDataSetType){
  204. case UA_PUBSUB_DATASET_PUBLISHEDITEMS:
  205. //no additional items
  206. break;
  207. case UA_PUBSUB_DATASET_PUBLISHEDITEMS_TEMPLATE:
  208. if (pdsConfig->config.itemsTemplate.variablesToAddSize > 0){
  209. for(size_t i = 0; i < pdsConfig->config.itemsTemplate.variablesToAddSize; i++){
  210. UA_PublishedVariableDataType_deleteMembers(&pdsConfig->config.itemsTemplate.variablesToAdd[i]);
  211. }
  212. UA_free(pdsConfig->config.itemsTemplate.variablesToAdd);
  213. }
  214. UA_DataSetMetaDataType_deleteMembers(&pdsConfig->config.itemsTemplate.metaData);
  215. break;
  216. default:
  217. break;
  218. }
  219. }
  220. void
  221. UA_PublishedDataSet_deleteMembers(UA_Server *server, UA_PublishedDataSet *publishedDataSet){
  222. UA_PublishedDataSetConfig_deleteMembers(&publishedDataSet->config);
  223. //delete PDS
  224. UA_DataSetMetaDataType_deleteMembers(&publishedDataSet->dataSetMetaData);
  225. UA_DataSetField *field, *tmpField;
  226. LIST_FOREACH_SAFE(field, &publishedDataSet->fields, listEntry, tmpField) {
  227. UA_Server_removeDataSetField(server, field->identifier);
  228. }
  229. UA_NodeId_deleteMembers(&publishedDataSet->identifier);
  230. }
  231. UA_DataSetFieldResult
  232. UA_Server_addDataSetField(UA_Server *server, const UA_NodeId publishedDataSet,
  233. const UA_DataSetFieldConfig *fieldConfig,
  234. UA_NodeId *fieldIdentifier) {
  235. UA_StatusCode retVal = UA_STATUSCODE_GOOD;
  236. if(!fieldConfig)
  237. return (UA_DataSetFieldResult) {UA_STATUSCODE_BADINVALIDARGUMENT, {0, 0}};
  238. UA_PublishedDataSet *currentDataSet = UA_PublishedDataSet_findPDSbyId(server, publishedDataSet);
  239. if(currentDataSet == NULL)
  240. return (UA_DataSetFieldResult) {UA_STATUSCODE_BADNOTFOUND, {0, 0}};
  241. if(currentDataSet->config.publishedDataSetType != UA_PUBSUB_DATASET_PUBLISHEDITEMS)
  242. return (UA_DataSetFieldResult) {UA_STATUSCODE_BADNOTIMPLEMENTED, {0, 0}};
  243. UA_DataSetField *newField = (UA_DataSetField *) UA_calloc(1, sizeof(UA_DataSetField));
  244. if(!newField)
  245. return (UA_DataSetFieldResult) {UA_STATUSCODE_BADINTERNALERROR, {0, 0}};
  246. UA_DataSetFieldConfig tmpFieldConfig;
  247. retVal |= UA_DataSetFieldConfig_copy(fieldConfig, &tmpFieldConfig);
  248. newField->config = tmpFieldConfig;
  249. UA_PubSubManager_generateUniqueNodeId(server, &newField->identifier);
  250. if(fieldIdentifier != NULL){
  251. UA_NodeId_copy(&newField->identifier, fieldIdentifier);
  252. }
  253. newField->publishedDataSet = currentDataSet->identifier;
  254. //update major version of parent published data set
  255. currentDataSet->dataSetMetaData.configurationVersion.majorVersion = UA_PubSubConfigurationVersionTimeDifference();
  256. LIST_INSERT_HEAD(&currentDataSet->fields, newField, listEntry);
  257. if(newField->config.field.variable.promotedField)
  258. currentDataSet->promotedFieldsCount++;
  259. currentDataSet->fieldSize++;
  260. UA_DataSetFieldResult result =
  261. {retVal, {currentDataSet->dataSetMetaData.configurationVersion.majorVersion,
  262. currentDataSet->dataSetMetaData.configurationVersion.minorVersion}};
  263. return result;
  264. }
  265. UA_DataSetFieldResult
  266. UA_Server_removeDataSetField(UA_Server *server, const UA_NodeId dsf) {
  267. UA_DataSetField *currentField = UA_DataSetField_findDSFbyId(server, dsf);
  268. if(!currentField)
  269. return (UA_DataSetFieldResult) {UA_STATUSCODE_BADNOTFOUND, {0, 0}};
  270. UA_PublishedDataSet *parentPublishedDataSet =
  271. UA_PublishedDataSet_findPDSbyId(server, currentField->publishedDataSet);
  272. if(!parentPublishedDataSet)
  273. return (UA_DataSetFieldResult) {UA_STATUSCODE_BADNOTFOUND, {0, 0}};
  274. parentPublishedDataSet->fieldSize--;
  275. if(currentField->config.field.variable.promotedField)
  276. parentPublishedDataSet->promotedFieldsCount--;
  277. /* update major version of PublishedDataSet */
  278. parentPublishedDataSet->dataSetMetaData.configurationVersion.majorVersion =
  279. UA_PubSubConfigurationVersionTimeDifference();
  280. UA_DataSetField_deleteMembers(currentField);
  281. UA_free(currentField);
  282. UA_DataSetFieldResult result =
  283. {UA_STATUSCODE_GOOD, {parentPublishedDataSet->dataSetMetaData.configurationVersion.majorVersion,
  284. parentPublishedDataSet->dataSetMetaData.configurationVersion.minorVersion}};
  285. return result;
  286. }
  287. /**********************************************/
  288. /* DataSetWriter */
  289. /**********************************************/
  290. UA_StatusCode
  291. UA_DataSetWriterConfig_copy(const UA_DataSetWriterConfig *src,
  292. UA_DataSetWriterConfig *dst){
  293. UA_StatusCode retVal = UA_STATUSCODE_GOOD;
  294. memcpy(dst, src, sizeof(UA_DataSetWriterConfig));
  295. retVal |= UA_String_copy(&src->name, &dst->name);
  296. retVal |= UA_String_copy(&src->dataSetName, &dst->dataSetName);
  297. retVal |= UA_ExtensionObject_copy(&src->messageSettings, &dst->messageSettings);
  298. dst->dataSetWriterProperties = (UA_KeyValuePair *)
  299. UA_calloc(src->dataSetWriterPropertiesSize, sizeof(UA_KeyValuePair));
  300. if(!dst->dataSetWriterProperties)
  301. return UA_STATUSCODE_BADOUTOFMEMORY;
  302. for(size_t i = 0; i < src->dataSetWriterPropertiesSize; i++){
  303. retVal |= UA_KeyValuePair_copy(&src->dataSetWriterProperties[i], &dst->dataSetWriterProperties[i]);
  304. }
  305. return retVal;
  306. }
  307. UA_StatusCode
  308. UA_Server_getDataSetWriterConfig(UA_Server *server, const UA_NodeId dsw,
  309. UA_DataSetWriterConfig *config){
  310. UA_StatusCode retVal = UA_STATUSCODE_GOOD;
  311. if(!config)
  312. return UA_STATUSCODE_BADINVALIDARGUMENT;
  313. UA_DataSetWriter *currentDataSetWriter = UA_DataSetWriter_findDSWbyId(server, dsw);
  314. if(!currentDataSetWriter)
  315. return UA_STATUSCODE_BADNOTFOUND;
  316. UA_DataSetWriterConfig tmpWriterConfig;
  317. //deep copy of the actual config
  318. retVal |= UA_DataSetWriterConfig_copy(&currentDataSetWriter->config, &tmpWriterConfig);
  319. *config = tmpWriterConfig;
  320. return retVal;
  321. }
  322. UA_DataSetWriter *
  323. UA_DataSetWriter_findDSWbyId(UA_Server *server, UA_NodeId identifier) {
  324. for(size_t i = 0; i < server->pubSubManager.connectionsSize; i++){
  325. UA_WriterGroup *tmpWriterGroup;
  326. LIST_FOREACH(tmpWriterGroup, &server->pubSubManager.connections[i].writerGroups, listEntry){
  327. UA_DataSetWriter *tmpWriter;
  328. LIST_FOREACH(tmpWriter, &tmpWriterGroup->writers, listEntry){
  329. if(UA_NodeId_equal(&tmpWriter->identifier, &identifier)){
  330. return tmpWriter;
  331. }
  332. }
  333. }
  334. }
  335. return NULL;
  336. }
  337. void
  338. UA_DataSetWriterConfig_deleteMembers(UA_DataSetWriterConfig *pdsConfig) {
  339. UA_String_deleteMembers(&pdsConfig->name);
  340. UA_String_deleteMembers(&pdsConfig->dataSetName);
  341. for(size_t i = 0; i < pdsConfig->dataSetWriterPropertiesSize; i++){
  342. UA_KeyValuePair_deleteMembers(&pdsConfig->dataSetWriterProperties[i]);
  343. }
  344. UA_free(pdsConfig->dataSetWriterProperties);
  345. UA_ExtensionObject_deleteMembers(&pdsConfig->messageSettings);
  346. }
  347. void
  348. UA_DataSetWriter_deleteMembers(UA_Server *server, UA_DataSetWriter *dataSetWriter){
  349. UA_DataSetWriterConfig_deleteMembers(&dataSetWriter->config);
  350. //delete DataSetWriter
  351. UA_NodeId_deleteMembers(&dataSetWriter->identifier);
  352. UA_NodeId_deleteMembers(&dataSetWriter->linkedWriterGroup);
  353. UA_NodeId_deleteMembers(&dataSetWriter->connectedDataSet);
  354. LIST_REMOVE(dataSetWriter, listEntry);
  355. #ifdef UA_ENABLE_PUBSUB_DELTAFRAMES
  356. //delete lastSamples store
  357. for(size_t i = 0; i < dataSetWriter->lastSamplesCount; i++){
  358. UA_DataValue_deleteMembers(&dataSetWriter->lastSamples[i].value);
  359. }
  360. UA_free(dataSetWriter->lastSamples);
  361. dataSetWriter->lastSamples = NULL;
  362. dataSetWriter->lastSamplesCount = 0;
  363. #endif
  364. }
  365. /**********************************************/
  366. /* WriterGroup */
  367. /**********************************************/
  368. UA_StatusCode
  369. UA_WriterGroupConfig_copy(const UA_WriterGroupConfig *src,
  370. UA_WriterGroupConfig *dst){
  371. UA_StatusCode retVal = UA_STATUSCODE_GOOD;
  372. memcpy(dst, src, sizeof(UA_WriterGroupConfig));
  373. retVal |= UA_String_copy(&src->name, &dst->name);
  374. retVal |= UA_ExtensionObject_copy(&src->transportSettings, &dst->transportSettings);
  375. retVal |= UA_ExtensionObject_copy(&src->messageSettings, &dst->messageSettings);
  376. dst->groupProperties = (UA_KeyValuePair *) UA_calloc(src->groupPropertiesSize, sizeof(UA_KeyValuePair));
  377. if(!dst->groupProperties)
  378. return UA_STATUSCODE_BADOUTOFMEMORY;
  379. for(size_t i = 0; i < src->groupPropertiesSize; i++){
  380. retVal |= UA_KeyValuePair_copy(&src->groupProperties[i], &dst->groupProperties[i]);
  381. }
  382. return retVal;
  383. }
  384. UA_StatusCode
  385. UA_Server_getWriterGroupConfig(UA_Server *server, const UA_NodeId writerGroup,
  386. UA_WriterGroupConfig *config){
  387. UA_StatusCode retVal = UA_STATUSCODE_GOOD;
  388. if(!config)
  389. return UA_STATUSCODE_BADINVALIDARGUMENT;
  390. UA_WriterGroup *currentWriterGroup = UA_WriterGroup_findWGbyId(server, writerGroup);
  391. if(!currentWriterGroup){
  392. return UA_STATUSCODE_BADNOTFOUND;
  393. }
  394. UA_WriterGroupConfig tmpWriterGroupConfig;
  395. //deep copy of the actual config
  396. retVal |= UA_WriterGroupConfig_copy(&currentWriterGroup->config, &tmpWriterGroupConfig);
  397. *config = tmpWriterGroupConfig;
  398. return retVal;
  399. }
  400. UA_StatusCode
  401. UA_Server_updateWriterGroupConfig(UA_Server *server, UA_NodeId writerGroupIdentifier,
  402. const UA_WriterGroupConfig *config){
  403. if(!config)
  404. return UA_STATUSCODE_BADINVALIDARGUMENT;
  405. UA_WriterGroup *currentWriterGroup = UA_WriterGroup_findWGbyId(server, writerGroupIdentifier);
  406. if(!currentWriterGroup)
  407. return UA_STATUSCODE_BADNOTFOUND;
  408. //The update functionality will be extended during the next PubSub batches.
  409. //Currently is only a change of the publishing interval possible.
  410. if(currentWriterGroup->config.publishingInterval != config->publishingInterval) {
  411. UA_PubSubManager_removeRepeatedPubSubCallback(server, currentWriterGroup->publishCallbackId);
  412. currentWriterGroup->config.publishingInterval = config->publishingInterval;
  413. UA_WriterGroup_addPublishCallback(server, currentWriterGroup);
  414. } else if (currentWriterGroup->config.priority != config->priority) {
  415. UA_LOG_WARNING(server->config.logger, UA_LOGCATEGORY_SERVER, "No or unsupported WriterGroup update.");
  416. }
  417. return UA_STATUSCODE_GOOD;
  418. }
  419. UA_WriterGroup *
  420. UA_WriterGroup_findWGbyId(UA_Server *server, UA_NodeId identifier){
  421. for(size_t i = 0; i < server->pubSubManager.connectionsSize; i++){
  422. UA_WriterGroup *tmpWriterGroup;
  423. LIST_FOREACH(tmpWriterGroup, &server->pubSubManager.connections[i].writerGroups, listEntry) {
  424. if(UA_NodeId_equal(&identifier, &tmpWriterGroup->identifier)){
  425. return tmpWriterGroup;
  426. }
  427. }
  428. }
  429. return NULL;
  430. }
  431. void
  432. UA_WriterGroupConfig_deleteMembers(UA_WriterGroupConfig *writerGroupConfig){
  433. //delete writerGroup config
  434. UA_String_deleteMembers(&writerGroupConfig->name);
  435. UA_ExtensionObject_deleteMembers(&writerGroupConfig->transportSettings);
  436. UA_ExtensionObject_deleteMembers(&writerGroupConfig->messageSettings);
  437. for(size_t i = 0; i < writerGroupConfig->groupPropertiesSize; i++){
  438. UA_KeyValuePair_deleteMembers(&writerGroupConfig->groupProperties[i]);
  439. }
  440. UA_free(writerGroupConfig->groupProperties);
  441. }
  442. void
  443. UA_WriterGroup_deleteMembers(UA_Server *server, UA_WriterGroup *writerGroup) {
  444. UA_WriterGroupConfig_deleteMembers(&writerGroup->config);
  445. //delete WriterGroup
  446. //delete all writers. Therefore removeDataSetWriter is called from PublishedDataSet
  447. UA_DataSetWriter *dataSetWriter, *tmpDataSetWriter;
  448. LIST_FOREACH_SAFE(dataSetWriter, &writerGroup->writers, listEntry, tmpDataSetWriter){
  449. UA_Server_removeDataSetWriter(server, dataSetWriter->identifier);
  450. }
  451. LIST_REMOVE(writerGroup, listEntry);
  452. UA_NodeId_deleteMembers(&writerGroup->linkedConnection);
  453. UA_NodeId_deleteMembers(&writerGroup->identifier);
  454. }
  455. UA_StatusCode
  456. UA_Server_addDataSetWriter(UA_Server *server,
  457. const UA_NodeId writerGroup, const UA_NodeId dataSet,
  458. const UA_DataSetWriterConfig *dataSetWriterConfig,
  459. UA_NodeId *writerIdentifier) {
  460. UA_StatusCode retVal = UA_STATUSCODE_GOOD;
  461. if(!dataSetWriterConfig)
  462. return UA_STATUSCODE_BADINVALIDARGUMENT;
  463. UA_PublishedDataSet *currentDataSetContext = UA_PublishedDataSet_findPDSbyId(server, dataSet);
  464. if(!currentDataSetContext)
  465. return UA_STATUSCODE_BADNOTFOUND;
  466. UA_WriterGroup *wg = UA_WriterGroup_findWGbyId(server, writerGroup);
  467. if(!wg)
  468. return UA_STATUSCODE_BADNOTFOUND;
  469. UA_DataSetWriter *newDataSetWriter = (UA_DataSetWriter *) UA_calloc(1, sizeof(UA_DataSetWriter));
  470. if(!newDataSetWriter)
  471. return UA_STATUSCODE_BADOUTOFMEMORY;
  472. //copy the config into the new dataSetWriter
  473. UA_DataSetWriterConfig tmpDataSetWriterConfig;
  474. retVal |= UA_DataSetWriterConfig_copy(dataSetWriterConfig, &tmpDataSetWriterConfig);
  475. newDataSetWriter->config = tmpDataSetWriterConfig;
  476. //save the current version of the connected PublishedDataSet
  477. newDataSetWriter->connectedDataSetVersion = currentDataSetContext->dataSetMetaData.configurationVersion;
  478. #ifdef UA_ENABLE_PUBSUB_DELTAFRAMES
  479. //initialize the queue for the last values
  480. newDataSetWriter->lastSamples = (UA_DataSetWriterSample * )
  481. UA_calloc(currentDataSetContext->fieldSize, sizeof(UA_DataSetWriterSample));
  482. if(!newDataSetWriter->lastSamples) {
  483. UA_DataSetWriterConfig_deleteMembers(&newDataSetWriter->config);
  484. UA_free(newDataSetWriter);
  485. return UA_STATUSCODE_BADOUTOFMEMORY;
  486. }
  487. newDataSetWriter->lastSamplesCount = currentDataSetContext->fieldSize;
  488. #endif
  489. //connect PublishedDataSet with DataSetWriter
  490. newDataSetWriter->connectedDataSet = currentDataSetContext->identifier;
  491. newDataSetWriter->linkedWriterGroup = wg->identifier;
  492. UA_PubSubManager_generateUniqueNodeId(server, &newDataSetWriter->identifier);
  493. if(writerIdentifier != NULL)
  494. UA_NodeId_copy(&newDataSetWriter->identifier, writerIdentifier);
  495. //add the new writer to the group
  496. LIST_INSERT_HEAD(&wg->writers, newDataSetWriter, listEntry);
  497. wg->writersCount++;
  498. #ifdef UA_ENABLE_PUBSUB_INFORMATIONMODEL
  499. addDataSetWriterRepresentation(server, newDataSetWriter);
  500. #endif
  501. return retVal;
  502. }
  503. UA_StatusCode
  504. UA_Server_removeDataSetWriter(UA_Server *server, const UA_NodeId dsw){
  505. UA_DataSetWriter *dataSetWriter = UA_DataSetWriter_findDSWbyId(server, dsw);
  506. if(!dataSetWriter)
  507. return UA_STATUSCODE_BADNOTFOUND;
  508. UA_WriterGroup *linkedWriterGroup = UA_WriterGroup_findWGbyId(server, dataSetWriter->linkedWriterGroup);
  509. if(!linkedWriterGroup)
  510. return UA_STATUSCODE_BADNOTFOUND;
  511. linkedWriterGroup->writersCount--;
  512. #ifdef UA_ENABLE_PUBSUB_INFORMATIONMODEL
  513. removeDataSetWriterRepresentation(server, dataSetWriter);
  514. #endif
  515. //remove DataSetWriter from group
  516. UA_DataSetWriter_deleteMembers(server, dataSetWriter);
  517. UA_free(dataSetWriter);
  518. return UA_STATUSCODE_GOOD;
  519. }
  520. /**********************************************/
  521. /* DataSetField */
  522. /**********************************************/
  523. UA_StatusCode
  524. UA_DataSetFieldConfig_copy(const UA_DataSetFieldConfig *src, UA_DataSetFieldConfig *dst){
  525. memcpy(dst, src, sizeof(UA_DataSetFieldConfig));
  526. if(src->dataSetFieldType == UA_PUBSUB_DATASETFIELD_VARIABLE) {
  527. UA_String_copy(&src->field.variable.fieldNameAlias, &dst->field.variable.fieldNameAlias);
  528. UA_PublishedVariableDataType_copy(&src->field.variable.publishParameters,
  529. &dst->field.variable.publishParameters);
  530. } else {
  531. return UA_STATUSCODE_BADNOTSUPPORTED;
  532. }
  533. return UA_STATUSCODE_GOOD;
  534. }
  535. UA_StatusCode
  536. UA_Server_getDataSetFieldConfig(UA_Server *server, const UA_NodeId dsf,
  537. UA_DataSetFieldConfig *config) {
  538. UA_StatusCode retVal = UA_STATUSCODE_GOOD;
  539. if(!config)
  540. return UA_STATUSCODE_BADINVALIDARGUMENT;
  541. UA_DataSetField *currentDataSetField = UA_DataSetField_findDSFbyId(server, dsf);
  542. if(!currentDataSetField)
  543. return UA_STATUSCODE_BADNOTFOUND;
  544. UA_DataSetFieldConfig tmpFieldConfig;
  545. //deep copy of the actual config
  546. retVal |= UA_DataSetFieldConfig_copy(&currentDataSetField->config, &tmpFieldConfig);
  547. *config = tmpFieldConfig;
  548. return retVal;
  549. }
  550. UA_DataSetField *
  551. UA_DataSetField_findDSFbyId(UA_Server *server, UA_NodeId identifier) {
  552. for(size_t i = 0; i < server->pubSubManager.publishedDataSetsSize; i++){
  553. UA_DataSetField *tmpField;
  554. LIST_FOREACH(tmpField, &server->pubSubManager.publishedDataSets[i].fields, listEntry){
  555. if(UA_NodeId_equal(&tmpField->identifier, &identifier)){
  556. return tmpField;
  557. }
  558. }
  559. }
  560. return NULL;
  561. }
  562. void
  563. UA_DataSetFieldConfig_deleteMembers(UA_DataSetFieldConfig *dataSetFieldConfig){
  564. if(dataSetFieldConfig->dataSetFieldType == UA_PUBSUB_DATASETFIELD_VARIABLE){
  565. UA_String_deleteMembers(&dataSetFieldConfig->field.variable.fieldNameAlias);
  566. UA_PublishedVariableDataType_deleteMembers(&dataSetFieldConfig->field.variable.publishParameters);
  567. }
  568. }
  569. void UA_DataSetField_deleteMembers(UA_DataSetField *field) {
  570. UA_DataSetFieldConfig_deleteMembers(&field->config);
  571. //delete DataSetField
  572. UA_NodeId_deleteMembers(&field->identifier);
  573. UA_NodeId_deleteMembers(&field->publishedDataSet);
  574. UA_FieldMetaData_deleteMembers(&field->fieldMetaData);
  575. LIST_REMOVE(field, listEntry);
  576. }
  577. /*********************************************************/
  578. /* PublishValues handling */
  579. /*********************************************************/
  580. /**
  581. * Compare two variants. Internally used for value change detection.
  582. *
  583. * @return UA_TRUE if the value has changed
  584. */
  585. #ifdef UA_ENABLE_PUBSUB_DELTAFRAMES
  586. static UA_Boolean
  587. valueChangedVariant(UA_Variant *oldValue, UA_Variant *newValue){
  588. if(! (oldValue && newValue))
  589. return UA_FALSE;
  590. UA_ByteString *oldValueEncoding = UA_ByteString_new(), *newValueEncoding = UA_ByteString_new();
  591. size_t oldValueEncodingSize, newValueEncodingSize;
  592. oldValueEncodingSize = UA_calcSizeBinary(oldValue, &UA_TYPES[UA_TYPES_VARIANT]);
  593. newValueEncodingSize = UA_calcSizeBinary(newValue, &UA_TYPES[UA_TYPES_VARIANT]);
  594. if((oldValueEncodingSize == 0) || (newValueEncodingSize == 0))
  595. return UA_FALSE;
  596. if(oldValueEncodingSize != newValueEncodingSize)
  597. return UA_TRUE;
  598. if(UA_ByteString_allocBuffer(oldValueEncoding, oldValueEncodingSize) != UA_STATUSCODE_GOOD)
  599. return UA_FALSE;
  600. if(UA_ByteString_allocBuffer(newValueEncoding, newValueEncodingSize) != UA_STATUSCODE_GOOD)
  601. return UA_FALSE;
  602. UA_Byte *bufPosOldValue = oldValueEncoding->data;
  603. const UA_Byte *bufEndOldValue = &oldValueEncoding->data[oldValueEncoding->length];
  604. UA_Byte *bufPosNewValue = newValueEncoding->data;
  605. const UA_Byte *bufEndNewValue = &newValueEncoding->data[newValueEncoding->length];
  606. if(UA_encodeBinary(oldValue, &UA_TYPES[UA_TYPES_VARIANT],
  607. &bufPosOldValue, &bufEndOldValue, NULL, NULL) != UA_STATUSCODE_GOOD){
  608. return UA_FALSE;
  609. }
  610. if(UA_encodeBinary(newValue, &UA_TYPES[UA_TYPES_VARIANT],
  611. &bufPosNewValue, &bufEndNewValue, NULL, NULL) != UA_STATUSCODE_GOOD){
  612. return UA_FALSE;
  613. }
  614. oldValueEncoding->length = (uintptr_t)bufPosOldValue - (uintptr_t)oldValueEncoding->data;
  615. newValueEncoding->length = (uintptr_t)bufPosNewValue - (uintptr_t)newValueEncoding->data;
  616. UA_Boolean compareResult = !UA_ByteString_equal(oldValueEncoding, newValueEncoding);
  617. UA_ByteString_delete(oldValueEncoding);
  618. UA_ByteString_delete(newValueEncoding);
  619. return compareResult;
  620. }
  621. #endif
  622. /**
  623. * Obtain the latest value for a specific DataSetField. This method is currently
  624. * called inside the DataSetMessage generation process.
  625. */
  626. static void
  627. UA_PubSubDataSetField_sampleValue(UA_Server *server, UA_DataSetField *field,
  628. UA_DataValue *value) {
  629. /* Read the value */
  630. UA_ReadValueId rvid;
  631. UA_ReadValueId_init(&rvid);
  632. rvid.nodeId = field->config.field.variable.publishParameters.publishedVariable;
  633. rvid.attributeId = field->config.field.variable.publishParameters.attributeId;
  634. rvid.indexRange = field->config.field.variable.publishParameters.indexRange;
  635. *value = UA_Server_read(server, &rvid, UA_TIMESTAMPSTORETURN_BOTH);
  636. }
  637. static UA_StatusCode
  638. UA_PubSubDataSetWriter_generateKeyFrameMessage(UA_Server *server, UA_DataSetMessage *dataSetMessage,
  639. UA_DataSetWriter *dataSetWriter) {
  640. UA_PublishedDataSet *currentDataSet =
  641. UA_PublishedDataSet_findPDSbyId(server, dataSetWriter->connectedDataSet);
  642. if(!currentDataSet)
  643. return UA_STATUSCODE_BADNOTFOUND;
  644. /* Prepare DataSetMessageContent */
  645. dataSetMessage->header.dataSetMessageValid = true;
  646. dataSetMessage->header.dataSetMessageType = UA_DATASETMESSAGE_DATAKEYFRAME;
  647. dataSetMessage->data.keyFrameData.fieldCount = currentDataSet->fieldSize;
  648. dataSetMessage->data.keyFrameData.dataSetFields = (UA_DataValue *)
  649. UA_Array_new(currentDataSet->fieldSize, &UA_TYPES[UA_TYPES_DATAVALUE]);
  650. if(!dataSetMessage->data.keyFrameData.dataSetFields)
  651. return UA_STATUSCODE_BADOUTOFMEMORY;
  652. /* Loop over the fields */
  653. size_t counter = 0;
  654. UA_DataSetField *dsf;
  655. LIST_FOREACH(dsf, &currentDataSet->fields, listEntry) {
  656. /* Sample the value */
  657. UA_DataValue *dfv = &dataSetMessage->data.keyFrameData.dataSetFields[counter];
  658. UA_PubSubDataSetField_sampleValue(server, dsf, dfv);
  659. /* Deactivate statuscode? */
  660. if((dataSetWriter->config.dataSetFieldContentMask & UA_DATASETFIELDCONTENTMASK_STATUSCODE) == 0)
  661. dfv->hasStatus = false;
  662. /* Deactivate timestamps */
  663. if((dataSetWriter->config.dataSetFieldContentMask & UA_DATASETFIELDCONTENTMASK_SOURCETIMESTAMP) == 0)
  664. dfv->hasSourceTimestamp = false;
  665. if((dataSetWriter->config.dataSetFieldContentMask & UA_DATASETFIELDCONTENTMASK_SOURCEPICOSECONDS) == 0)
  666. dfv->hasSourcePicoseconds = false;
  667. if((dataSetWriter->config.dataSetFieldContentMask & UA_DATASETFIELDCONTENTMASK_SERVERTIMESTAMP) == 0)
  668. dfv->hasServerTimestamp = false;
  669. if((dataSetWriter->config.dataSetFieldContentMask & UA_DATASETFIELDCONTENTMASK_SERVERPICOSECONDS) == 0)
  670. dfv->hasServerPicoseconds = false;
  671. #ifdef UA_ENABLE_PUBSUB_DELTAFRAMES
  672. /* Update lastValue store */
  673. UA_DataValue_deleteMembers(&dataSetWriter->lastSamples[counter].value);
  674. UA_DataValue_copy(dfv, &dataSetWriter->lastSamples[counter].value);
  675. #endif
  676. counter++;
  677. }
  678. return UA_STATUSCODE_GOOD;
  679. }
  680. #ifdef UA_ENABLE_PUBSUB_DELTAFRAMES
  681. static UA_StatusCode
  682. UA_PubSubDataSetWriter_generateDeltaFrameMessage(UA_Server *server,
  683. UA_DataSetMessage *dataSetMessage,
  684. UA_DataSetWriter *dataSetWriter) {
  685. UA_PublishedDataSet *currentDataSet =
  686. UA_PublishedDataSet_findPDSbyId(server, dataSetWriter->connectedDataSet);
  687. if(!currentDataSet)
  688. return UA_STATUSCODE_BADNOTFOUND;
  689. /* Prepare DataSetMessageContent */
  690. memset(dataSetMessage, 0, sizeof(UA_DataSetMessage));
  691. dataSetMessage->header.dataSetMessageValid = true;
  692. dataSetMessage->header.dataSetMessageType = UA_DATASETMESSAGE_DATADELTAFRAME;
  693. UA_DataSetField *dsf;
  694. size_t counter = 0;
  695. LIST_FOREACH(dsf, &currentDataSet->fields, listEntry) {
  696. /* Sample the value */
  697. UA_DataValue value;
  698. UA_DataValue_init(&value);
  699. UA_PubSubDataSetField_sampleValue(server, dsf, &value);
  700. /* Check if the value has changed */
  701. if(valueChangedVariant(&dataSetWriter->lastSamples[counter].value.value, &value.value)) {
  702. /* increase fieldCount for current delta message */
  703. dataSetMessage->data.deltaFrameData.fieldCount++;
  704. dataSetWriter->lastSamples[counter].valueChanged = UA_TRUE;
  705. /* Update last stored sample */
  706. UA_DataValue_deleteMembers(&dataSetWriter->lastSamples[counter].value);
  707. dataSetWriter->lastSamples[counter].value = value;
  708. } else {
  709. UA_DataValue_deleteMembers(&value);
  710. dataSetWriter->lastSamples[counter].valueChanged = UA_FALSE;
  711. }
  712. counter++;
  713. }
  714. /* Allocate DeltaFrameFields */
  715. UA_DataSetMessage_DeltaFrameField *deltaFields = (UA_DataSetMessage_DeltaFrameField *)
  716. UA_calloc(dataSetMessage->data.deltaFrameData.fieldCount, sizeof(UA_DataSetMessage_DeltaFrameField));
  717. if(!deltaFields)
  718. return UA_STATUSCODE_BADOUTOFMEMORY;
  719. dataSetMessage->data.deltaFrameData.deltaFrameFields = deltaFields;
  720. size_t currentDeltaField = 0;
  721. for(size_t i = 0; i < currentDataSet->fieldSize; i++) {
  722. if(!dataSetWriter->lastSamples[i].valueChanged)
  723. continue;
  724. UA_DataSetMessage_DeltaFrameField *dff = &deltaFields[currentDeltaField];
  725. dff->fieldIndex = (UA_UInt16) i;
  726. UA_DataValue_copy(&dataSetWriter->lastSamples[i].value, &dff->fieldValue);
  727. dataSetWriter->lastSamples[i].valueChanged = false;
  728. /* Deactivate statuscode? */
  729. if((dataSetWriter->config.dataSetFieldContentMask & UA_DATASETFIELDCONTENTMASK_STATUSCODE) == 0)
  730. dff->fieldValue.hasStatus = UA_FALSE;
  731. /* Deactivate timestamps? */
  732. if((dataSetWriter->config.dataSetFieldContentMask & UA_DATASETFIELDCONTENTMASK_SOURCETIMESTAMP) == 0)
  733. dff->fieldValue.hasSourceTimestamp = UA_FALSE;
  734. if((dataSetWriter->config.dataSetFieldContentMask & UA_DATASETFIELDCONTENTMASK_SOURCEPICOSECONDS) == 0)
  735. dff->fieldValue.hasServerPicoseconds = UA_FALSE;
  736. if((dataSetWriter->config.dataSetFieldContentMask & UA_DATASETFIELDCONTENTMASK_SERVERTIMESTAMP) == 0)
  737. dff->fieldValue.hasServerTimestamp = UA_FALSE;
  738. if((dataSetWriter->config.dataSetFieldContentMask & UA_DATASETFIELDCONTENTMASK_SERVERPICOSECONDS) == 0)
  739. dff->fieldValue.hasServerPicoseconds = UA_FALSE;
  740. currentDeltaField++;
  741. }
  742. return UA_STATUSCODE_GOOD;
  743. }
  744. #endif
  745. /**
  746. * Generate a DataSetMessage for the given writer.
  747. *
  748. * @param dataSetWriter ptr to corresponding writer
  749. * @return ptr to generated DataSetMessage
  750. */
  751. static UA_StatusCode
  752. UA_DataSetWriter_generateDataSetMessage(UA_Server *server, UA_DataSetMessage *dataSetMessage,
  753. UA_DataSetWriter *dataSetWriter) {
  754. UA_PublishedDataSet *currentDataSet =
  755. UA_PublishedDataSet_findPDSbyId(server, dataSetWriter->connectedDataSet);
  756. if(!currentDataSet)
  757. return UA_STATUSCODE_BADNOTFOUND;
  758. /* Reset the message */
  759. memset(dataSetMessage, 0, sizeof(UA_DataSetMessage));
  760. /* Currently is only UADP supported. The configuration Flags are included
  761. * inside the std. defined UA_UadpDataSetWriterMessageDataType */
  762. UA_UadpDataSetWriterMessageDataType defaultUadpConfiguration;
  763. UA_UadpDataSetWriterMessageDataType *dataSetWriterMessageDataType = NULL;
  764. if((dataSetWriter->config.messageSettings.encoding == UA_EXTENSIONOBJECT_DECODED ||
  765. dataSetWriter->config.messageSettings.encoding == UA_EXTENSIONOBJECT_DECODED_NODELETE) &&
  766. (dataSetWriter->config.messageSettings.content.decoded.type == &UA_TYPES[UA_TYPES_UADPDATASETWRITERMESSAGEDATATYPE])) {
  767. dataSetWriterMessageDataType = (UA_UadpDataSetWriterMessageDataType *)
  768. dataSetWriter->config.messageSettings.content.decoded.data;
  769. } else {
  770. /* create default flag configuration if no
  771. * UadpDataSetWriterMessageDataType was passed in */
  772. memset(&defaultUadpConfiguration, 0, sizeof(UA_UadpDataSetWriterMessageDataType));
  773. defaultUadpConfiguration.dataSetMessageContentMask = (UA_UadpDataSetMessageContentMask)
  774. (UA_UADPDATASETMESSAGECONTENTMASK_TIMESTAMP | UA_UADPDATASETMESSAGECONTENTMASK_MAJORVERSION |
  775. UA_UADPDATASETMESSAGECONTENTMASK_MINORVERSION);
  776. dataSetWriterMessageDataType = &defaultUadpConfiguration;
  777. }
  778. /* Sanity-test the configuration */
  779. if(dataSetWriterMessageDataType->networkMessageNumber != 0 ||
  780. dataSetWriterMessageDataType->dataSetOffset != 0 ||
  781. dataSetWriterMessageDataType->configuredSize !=0 ) {
  782. UA_LOG_WARNING(server->config.logger, UA_LOGCATEGORY_SERVER,
  783. "Static DSM configuration not supported. Using defaults");
  784. dataSetWriterMessageDataType->networkMessageNumber = 0;
  785. dataSetWriterMessageDataType->dataSetOffset = 0;
  786. dataSetWriterMessageDataType->configuredSize = 0;
  787. }
  788. /* The field encoding depends on the flags inside the writer config.
  789. * TODO: This can be moved to the encoding layer. */
  790. if(dataSetWriter->config.dataSetFieldContentMask & UA_DATASETFIELDCONTENTMASK_RAWDATAENCODING) {
  791. dataSetMessage->header.fieldEncoding = UA_FIELDENCODING_RAWDATA;
  792. } else if (dataSetWriter->config.dataSetFieldContentMask &
  793. (UA_DATASETFIELDCONTENTMASK_SOURCETIMESTAMP | UA_DATASETFIELDCONTENTMASK_SERVERPICOSECONDS |
  794. UA_DATASETFIELDCONTENTMASK_SOURCEPICOSECONDS | UA_DATASETFIELDCONTENTMASK_STATUSCODE)) {
  795. dataSetMessage->header.fieldEncoding = UA_FIELDENCODING_DATAVALUE;
  796. } else {
  797. dataSetMessage->header.fieldEncoding = UA_FIELDENCODING_VARIANT;
  798. }
  799. /* Std: 'The DataSetMessageContentMask defines the flags for the content of the DataSetMessage header.' */
  800. if(dataSetWriterMessageDataType->dataSetMessageContentMask & UA_UADPDATASETMESSAGECONTENTMASK_MAJORVERSION){
  801. dataSetMessage->header.configVersionMajorVersionEnabled = UA_TRUE;
  802. dataSetMessage->header.configVersionMajorVersion =
  803. currentDataSet->dataSetMetaData.configurationVersion.majorVersion;
  804. }
  805. if(dataSetWriterMessageDataType->dataSetMessageContentMask & UA_UADPDATASETMESSAGECONTENTMASK_MINORVERSION){
  806. dataSetMessage->header.configVersionMinorVersionEnabled = UA_TRUE;
  807. dataSetMessage->header.configVersionMinorVersion =
  808. currentDataSet->dataSetMetaData.configurationVersion.minorVersion;
  809. }
  810. if(dataSetWriterMessageDataType->dataSetMessageContentMask & UA_UADPDATASETMESSAGECONTENTMASK_SEQUENCENUMBER) {
  811. dataSetMessage->header.dataSetMessageSequenceNrEnabled = UA_TRUE;
  812. dataSetMessage->header.dataSetMessageSequenceNr =
  813. dataSetWriter->actualDataSetMessageSequenceCount;
  814. }
  815. if(dataSetWriterMessageDataType->dataSetMessageContentMask & UA_UADPDATASETMESSAGECONTENTMASK_TIMESTAMP) {
  816. dataSetMessage->header.timestampEnabled = UA_TRUE;
  817. dataSetMessage->header.timestamp = UA_DateTime_now();
  818. }
  819. /* TODO: Picoseconds resolution not supported atm */
  820. if(dataSetWriterMessageDataType->dataSetMessageContentMask & UA_UADPDATASETMESSAGECONTENTMASK_PICOSECONDS) {
  821. dataSetMessage->header.picoSecondsIncluded = UA_FALSE;
  822. }
  823. /* TODO: Statuscode not supported yet */
  824. if(dataSetWriterMessageDataType->dataSetMessageContentMask & UA_UADPDATASETMESSAGECONTENTMASK_STATUS){
  825. dataSetMessage->header.statusEnabled = UA_FALSE;
  826. }
  827. /* Set the sequence count. Automatically rolls over to zero */
  828. dataSetWriter->actualDataSetMessageSequenceCount++;
  829. #ifdef UA_ENABLE_PUBSUB_DELTAFRAMES
  830. /* Check if the PublishedDataSet version has changed -> if yes flush the lastValue store and send a KeyFrame */
  831. if(dataSetWriter->connectedDataSetVersion.majorVersion != currentDataSet->dataSetMetaData.configurationVersion.majorVersion ||
  832. dataSetWriter->connectedDataSetVersion.minorVersion != currentDataSet->dataSetMetaData.configurationVersion.minorVersion) {
  833. /* Remove old samples */
  834. for(size_t i = 0; i < dataSetWriter->lastSamplesCount; i++)
  835. UA_DataValue_deleteMembers(&dataSetWriter->lastSamples[i].value);
  836. /* Realloc pds dependent memory */
  837. dataSetWriter->lastSamplesCount = currentDataSet->fieldSize;
  838. UA_DataSetWriterSample *newSamplesArray = (UA_DataSetWriterSample * )
  839. UA_realloc(dataSetWriter->lastSamples, sizeof(UA_DataSetWriterSample) * dataSetWriter->lastSamplesCount);
  840. if(!newSamplesArray)
  841. return UA_STATUSCODE_BADOUTOFMEMORY;
  842. dataSetWriter->lastSamples = newSamplesArray;
  843. memset(dataSetWriter->lastSamples, 0, sizeof(UA_DataSetWriterSample) * dataSetWriter->lastSamplesCount);
  844. dataSetWriter->connectedDataSetVersion = currentDataSet->dataSetMetaData.configurationVersion;
  845. UA_PubSubDataSetWriter_generateKeyFrameMessage(server, dataSetMessage, dataSetWriter);
  846. dataSetWriter->deltaFrameCounter = 0;
  847. return UA_STATUSCODE_GOOD;
  848. }
  849. /* The standard defines: if a PDS contains only one fields no delta messages
  850. * should be generated because they need more memory than a keyframe with 1
  851. * field. */
  852. if(currentDataSet->fieldSize > 1 && dataSetWriter->deltaFrameCounter > 0 &&
  853. dataSetWriter->deltaFrameCounter <= dataSetWriter->config.keyFrameCount) {
  854. UA_PubSubDataSetWriter_generateDeltaFrameMessage(server, dataSetMessage, dataSetWriter);
  855. dataSetWriter->deltaFrameCounter++;
  856. return UA_STATUSCODE_GOOD;
  857. }
  858. dataSetWriter->deltaFrameCounter = 1;
  859. #endif
  860. UA_PubSubDataSetWriter_generateKeyFrameMessage(server, dataSetMessage, dataSetWriter);
  861. return UA_STATUSCODE_GOOD;
  862. }
  863. /*
  864. * This callback triggers the collection and publish of NetworkMessages and the contained DataSetMessages.
  865. */
  866. void
  867. UA_WriterGroup_publishCallback(UA_Server *server, UA_WriterGroup *writerGroup) {
  868. if(!writerGroup){
  869. UA_LOG_ERROR(server->config.logger, UA_LOGCATEGORY_SERVER, "Publish failed. WriterGroup not found");
  870. return;
  871. }
  872. if(writerGroup->writersCount <= 0)
  873. return;
  874. if(writerGroup->config.encodingMimeType != UA_PUBSUB_ENCODING_UADP) {
  875. UA_LOG_ERROR(server->config.logger, UA_LOGCATEGORY_SERVER, "Unknown encoding type.");
  876. return;
  877. }
  878. //prevent error if the maxEncapsulatedDataSetMessageCount is set to 0->1
  879. writerGroup->config.maxEncapsulatedDataSetMessageCount = (UA_UInt16) (writerGroup->config.maxEncapsulatedDataSetMessageCount == 0 ||
  880. writerGroup->config.maxEncapsulatedDataSetMessageCount > UA_BYTE_MAX
  881. ? 1 : writerGroup->config.maxEncapsulatedDataSetMessageCount);
  882. UA_DataSetMessage *dsmStore = (UA_DataSetMessage *) UA_calloc(writerGroup->writersCount, sizeof(UA_DataSetMessage));
  883. if(!dsmStore) {
  884. UA_LOG_ERROR(server->config.logger, UA_LOGCATEGORY_SERVER, "DataSetMessage allocation failed");
  885. return;
  886. }
  887. memset(dsmStore, 0, sizeof(UA_DataSetMessage) * writerGroup->writersCount);
  888. //The binary DataSetMessage sizes are part of the payload. Memory is allocated on the stack.
  889. UA_STACKARRAY(UA_UInt16, dsmSizes, writerGroup->writersCount);
  890. memset(dsmSizes, 0, writerGroup->writersCount * sizeof(UA_UInt16));
  891. /*
  892. * Calculate the number of needed NetworkMessages. The previous allocated DataSetMessage array is
  893. * filled from left for combined DSM messages and from the right for single DSM.
  894. * Allocated DSM Array
  895. * +----------------------------+
  896. * |DSM1||DSM2||DSM3||DSM4||DSM5|
  897. * +--+----+-----+-----+-----+--+
  898. * | | | | |
  899. * | | | | |
  900. * +--v----v-----v-----v--+--v--+
  901. * | NM1 || NM2 | NM3 |
  902. * +----------------------+-----+
  903. * NetworkMessages
  904. */
  905. UA_UInt16 combinedNetworkMessageCount = 0, singleNetworkMessagesCount = 0;
  906. UA_DataSetWriter *tmpDataSetWriter;
  907. LIST_FOREACH(tmpDataSetWriter, &writerGroup->writers, listEntry){
  908. //if promoted fields are contained in the PublishedDataSet, then this DSM must encapsulated in one NM
  909. UA_PublishedDataSet *tmpPublishedDataSet = UA_PublishedDataSet_findPDSbyId(server, tmpDataSetWriter->connectedDataSet);
  910. if(!tmpPublishedDataSet) {
  911. UA_LOG_ERROR(server->config.logger, UA_LOGCATEGORY_SERVER, "Publish failed. PublishedDataSet not found");
  912. return;
  913. }
  914. if(tmpPublishedDataSet->promotedFieldsCount > 0) {
  915. if(UA_DataSetWriter_generateDataSetMessage(server, &dsmStore[(writerGroup->writersCount - 1) - singleNetworkMessagesCount],
  916. tmpDataSetWriter) != UA_STATUSCODE_GOOD){
  917. UA_LOG_ERROR(server->config.logger, UA_LOGCATEGORY_SERVER, "Publish failed. DataSetMessage creation failed");
  918. return;
  919. };
  920. dsmSizes[(writerGroup->writersCount-1) - singleNetworkMessagesCount] = (UA_UInt16) UA_DataSetMessage_calcSizeBinary(&dsmStore[(writerGroup->writersCount-1)
  921. - singleNetworkMessagesCount]);
  922. singleNetworkMessagesCount++;
  923. } else {
  924. if(UA_DataSetWriter_generateDataSetMessage(server, &dsmStore[combinedNetworkMessageCount], tmpDataSetWriter) != UA_STATUSCODE_GOOD){
  925. UA_LOG_ERROR(server->config.logger, UA_LOGCATEGORY_SERVER, "Publish failed. DataSetMessage creation failed");
  926. return;
  927. };
  928. dsmSizes[combinedNetworkMessageCount] = (UA_UInt16) UA_DataSetMessage_calcSizeBinary(&dsmStore[combinedNetworkMessageCount]);
  929. combinedNetworkMessageCount++;
  930. }
  931. }
  932. UA_UInt32 networkMessageCount = singleNetworkMessagesCount;
  933. if(combinedNetworkMessageCount != 0){
  934. combinedNetworkMessageCount = (UA_UInt16) (
  935. combinedNetworkMessageCount / writerGroup->config.maxEncapsulatedDataSetMessageCount +
  936. (combinedNetworkMessageCount % writerGroup->config.maxEncapsulatedDataSetMessageCount) == 0 ? 0 : 1);
  937. networkMessageCount += combinedNetworkMessageCount;
  938. }
  939. //Alloc memory for the NetworkMessages on the stack
  940. UA_STACKARRAY(UA_NetworkMessage, nmStore, networkMessageCount);
  941. memset(nmStore, 0, networkMessageCount * sizeof(UA_NetworkMessage));
  942. UA_UInt32 currentDSMPosition = 0;
  943. for(UA_UInt32 i = 0; i < networkMessageCount; i++) {
  944. nmStore[i].version = 1;
  945. nmStore[i].networkMessageType = UA_NETWORKMESSAGE_DATASET;
  946. //create combined NetworkMessages
  947. if(i < (networkMessageCount-singleNetworkMessagesCount)){
  948. if(combinedNetworkMessageCount - (i * writerGroup->config.maxEncapsulatedDataSetMessageCount)){
  949. if(combinedNetworkMessageCount == 1){
  950. nmStore[i].payloadHeader.dataSetPayloadHeader.count = (UA_Byte) ((writerGroup->writersCount) - singleNetworkMessagesCount);
  951. currentDSMPosition = 0;
  952. } else {
  953. nmStore[i].payloadHeader.dataSetPayloadHeader.count = (UA_Byte) writerGroup->config.maxEncapsulatedDataSetMessageCount;
  954. currentDSMPosition = i * writerGroup->config.maxEncapsulatedDataSetMessageCount;
  955. }
  956. //nmStore[i].payloadHeader.dataSetPayloadHeader.count = (UA_Byte) writerGroup->config.maxEncapsulatedDataSetMessageCount;
  957. nmStore[i].payload.dataSetPayload.dataSetMessages = &dsmStore[currentDSMPosition];
  958. nmStore->payload.dataSetPayload.sizes = &dsmSizes[currentDSMPosition];
  959. } else {
  960. currentDSMPosition = i * writerGroup->config.maxEncapsulatedDataSetMessageCount;
  961. nmStore[i].payloadHeader.dataSetPayloadHeader.count = (UA_Byte) (currentDSMPosition - ((i - 1) * writerGroup->config.maxEncapsulatedDataSetMessageCount)); //attention cast from uint32 to byte
  962. nmStore[i].payload.dataSetPayload.dataSetMessages = &dsmStore[currentDSMPosition];
  963. nmStore->payload.dataSetPayload.sizes = &dsmSizes[currentDSMPosition];
  964. }
  965. } else {///create single NetworkMessages (1 DSM per NM)
  966. nmStore[i].payloadHeader.dataSetPayloadHeader.count = 1;
  967. currentDSMPosition = (UA_UInt32) combinedNetworkMessageCount + (i - combinedNetworkMessageCount/writerGroup->config.maxEncapsulatedDataSetMessageCount
  968. + (combinedNetworkMessageCount % writerGroup->config.maxEncapsulatedDataSetMessageCount) == 0 ? 0 : 1);
  969. nmStore[i].payload.dataSetPayload.dataSetMessages = &dsmStore[currentDSMPosition];
  970. nmStore->payload.dataSetPayload.sizes = &dsmSizes[currentDSMPosition];
  971. }
  972. UA_PubSubConnection *connection = UA_PubSubConnection_findConnectionbyId(server, writerGroup->linkedConnection);
  973. if(!connection){
  974. UA_LOG_ERROR(server->config.logger, UA_LOGCATEGORY_SERVER, "Publish failed. PubSubConnection invalid.");
  975. return;
  976. }
  977. //send the prepared messages
  978. UA_ByteString buf;
  979. size_t msgSize = UA_NetworkMessage_calcSizeBinary(&nmStore[i]);
  980. if(UA_ByteString_allocBuffer(&buf, msgSize) == UA_STATUSCODE_GOOD) {
  981. UA_Byte *bufPos = buf.data;
  982. memset(bufPos, 0, msgSize);
  983. const UA_Byte *bufEnd = &(buf.data[buf.length]);
  984. if(UA_NetworkMessage_encodeBinary(&nmStore[i], &bufPos, bufEnd) != UA_STATUSCODE_GOOD){
  985. UA_ByteString_deleteMembers(&buf);
  986. return;
  987. };
  988. connection->channel->send(connection->channel, NULL, &buf);
  989. }
  990. nmStore[i].payloadHeaderEnabled = UA_TRUE;
  991. //The stack allocated sizes field must be set to NULL to prevent invalid free.
  992. nmStore[i].payload.dataSetPayload.sizes = NULL;
  993. UA_ByteString_deleteMembers(&buf);
  994. UA_NetworkMessage_deleteMembers(&nmStore[i]);
  995. }
  996. }
  997. /*
  998. * Add new publishCallback. The first execution is triggered directly after creation.
  999. * @Warning - The duration (double) is currently casted to int. -> intervals smaller 1ms are not possible.
  1000. */
  1001. UA_StatusCode
  1002. UA_WriterGroup_addPublishCallback(UA_Server *server, UA_WriterGroup *writerGroup) {
  1003. UA_StatusCode retval =
  1004. UA_PubSubManager_addRepeatedCallback(server, (UA_ServerCallback) UA_WriterGroup_publishCallback,
  1005. writerGroup, (UA_UInt32) writerGroup->config.publishingInterval,
  1006. &writerGroup->publishCallbackId);
  1007. if(retval == UA_STATUSCODE_GOOD)
  1008. writerGroup->publishCallbackIsRegistered = true;
  1009. //run once after creation
  1010. UA_WriterGroup_publishCallback(server, writerGroup);
  1011. return retval;
  1012. }
  1013. #endif /* UA_ENABLE_PUBSUB */