ua_pubsub.c 53 KB


  1. /* This Source Code Form is subject to the terms of the Mozilla Public
  2. * License, v. 2.0. If a copy of the MPL was not distributed with this
  3. * file, You can obtain one at http://mozilla.org/MPL/2.0/.
  4. *
  5. * Copyright (c) 2017-2018 Fraunhofer IOSB (Author: Andreas Ebner)
  6. */
  7. #include "ua_types_encoding_binary.h"
  8. #include "ua_server_pubsub.h"
  9. #include "server/ua_server_internal.h"
  10. #include "ua_pubsub.h"
  11. #include "ua_pubsub_manager.h"
  12. #include "ua_pubsub_networkmessage.h"
  13. #ifdef UA_ENABLE_PUBSUB /* conditional compilation */
  14. #ifdef UA_ENABLE_PUBSUB_INFORMATIONMODEL
  15. #include "ua_pubsub_ns0.h"
  16. #endif
  17. /**********************************************/
  18. /* Connection */
  19. /**********************************************/
  20. UA_StatusCode
  21. UA_PubSubConnectionConfig_copy(const UA_PubSubConnectionConfig *src,
  22. UA_PubSubConnectionConfig *dst) {
  23. UA_StatusCode retVal = UA_STATUSCODE_GOOD;
  24. memcpy(dst, src, sizeof(UA_PubSubConnectionConfig));
  25. retVal |= UA_String_copy(&src->name, &dst->name);
  26. retVal |= UA_Variant_copy(&src->address, &dst->address);
  27. retVal |= UA_String_copy(&src->transportProfileUri, &dst->transportProfileUri);
  28. retVal |= UA_Variant_copy(&src->connectionTransportSettings, &dst->connectionTransportSettings);
  29. if(src->connectionPropertiesSize > 0){
  30. dst->connectionProperties = (UA_KeyValuePair *)
  31. UA_calloc(src->connectionPropertiesSize, sizeof(UA_KeyValuePair));
  32. if(!dst->connectionProperties){
  33. return UA_STATUSCODE_BADOUTOFMEMORY;
  34. }
  35. for(size_t i = 0; i < src->connectionPropertiesSize; i++){
  36. retVal |= UA_QualifiedName_copy(&src->connectionProperties[i].key,
  37. &dst->connectionProperties[i].key);
  38. retVal |= UA_Variant_copy(&src->connectionProperties[i].value,
  39. &dst->connectionProperties[i].value);
  40. }
  41. }
  42. return retVal;
  43. }
  44. UA_StatusCode
  45. UA_Server_getPubSubConnectionConfig(UA_Server *server, const UA_NodeId connection,
  46. UA_PubSubConnectionConfig *config) {
  47. if(!config)
  48. return UA_STATUSCODE_BADINVALIDARGUMENT;
  49. UA_PubSubConnection *currentPubSubConnection =
  50. UA_PubSubConnection_findConnectionbyId(server, connection);
  51. if(!currentPubSubConnection)
  52. return UA_STATUSCODE_BADNOTFOUND;
  53. UA_PubSubConnectionConfig tmpPubSubConnectionConfig;
  54. //deep copy of the actual config
  55. UA_PubSubConnectionConfig_copy(currentPubSubConnection->config, &tmpPubSubConnectionConfig);
  56. *config = tmpPubSubConnectionConfig;
  57. return UA_STATUSCODE_GOOD;
  58. }
  59. UA_PubSubConnection *
  60. UA_PubSubConnection_findConnectionbyId(UA_Server *server, UA_NodeId connectionIdentifier) {
  61. for(size_t i = 0; i < server->pubSubManager.connectionsSize; i++){
  62. if(UA_NodeId_equal(&connectionIdentifier, &server->pubSubManager.connections[i].identifier)){
  63. return &server->pubSubManager.connections[i];
  64. }
  65. }
  66. return NULL;
  67. }
  68. void
  69. UA_PubSubConnectionConfig_deleteMembers(UA_PubSubConnectionConfig *connectionConfig) {
  70. UA_String_deleteMembers(&connectionConfig->name);
  71. UA_String_deleteMembers(&connectionConfig->transportProfileUri);
  72. UA_Variant_deleteMembers(&connectionConfig->connectionTransportSettings);
  73. UA_Variant_deleteMembers(&connectionConfig->address);
  74. for(size_t i = 0; i < connectionConfig->connectionPropertiesSize; i++){
  75. UA_QualifiedName_deleteMembers(&connectionConfig->connectionProperties[i].key);
  76. UA_Variant_deleteMembers(&connectionConfig->connectionProperties[i].value);
  77. }
  78. UA_free(connectionConfig->connectionProperties);
  79. }
  80. void
  81. UA_PubSubConnection_deleteMembers(UA_Server *server, UA_PubSubConnection *connection) {
  82. //delete connection config
  83. UA_PubSubConnectionConfig_deleteMembers(connection->config);
  84. //remove contained WriterGroups
  85. UA_WriterGroup *writerGroup, *tmpWriterGroup;
  86. LIST_FOREACH_SAFE(writerGroup, &connection->writerGroups, listEntry, tmpWriterGroup){
  87. UA_Server_removeWriterGroup(server, writerGroup->identifier);
  88. }
  89. UA_NodeId_deleteMembers(&connection->identifier);
  90. if(connection->channel){
  91. connection->channel->close(connection->channel);
  92. }
  93. UA_free(connection->config);
  94. }
  95. UA_StatusCode
  96. UA_Server_addWriterGroup(UA_Server *server, const UA_NodeId connection,
  97. const UA_WriterGroupConfig *writerGroupConfig,
  98. UA_NodeId *writerGroupIdentifier) {
  99. UA_StatusCode retVal = UA_STATUSCODE_GOOD;
  100. if(!writerGroupConfig)
  101. return UA_STATUSCODE_BADINVALIDARGUMENT;
  102. //search the connection by the given connectionIdentifier
  103. UA_PubSubConnection *currentConnectionContext =
  104. UA_PubSubConnection_findConnectionbyId(server, connection);
  105. if(!currentConnectionContext)
  106. return UA_STATUSCODE_BADNOTFOUND;
  107. //allocate memory for new WriterGroup
  108. UA_WriterGroup *newWriterGroup = (UA_WriterGroup *) UA_calloc(1, sizeof(UA_WriterGroup));
  109. if (!newWriterGroup)
  110. return UA_STATUSCODE_BADOUTOFMEMORY;
  111. newWriterGroup->linkedConnection = currentConnectionContext->identifier;
  112. UA_PubSubManager_generateUniqueNodeId(server, &newWriterGroup->identifier);
  113. if(writerGroupIdentifier){
  114. UA_NodeId_copy(&newWriterGroup->identifier, writerGroupIdentifier);
  115. }
  116. UA_WriterGroupConfig tmpWriterGroupConfig;
  117. //deep copy of the config
  118. retVal |= UA_WriterGroupConfig_copy(writerGroupConfig, &tmpWriterGroupConfig);
  119. newWriterGroup->config = tmpWriterGroupConfig;
  120. retVal |= UA_WriterGroup_addPublishCallback(server, newWriterGroup);
  121. LIST_INSERT_HEAD(&currentConnectionContext->writerGroups, newWriterGroup, listEntry);
  122. #ifdef UA_ENABLE_PUBSUB_INFORMATIONMODEL
  123. addWriterGroupRepresentation(server, newWriterGroup);
  124. #endif
  125. return retVal;
  126. }
  127. UA_StatusCode
  128. UA_Server_removeWriterGroup(UA_Server *server, const UA_NodeId writerGroup){
  129. UA_WriterGroup *wg = UA_WriterGroup_findWGbyId(server, writerGroup);
  130. if(!wg)
  131. return UA_STATUSCODE_BADNOTFOUND;
  132. UA_PubSubConnection *connection =
  133. UA_PubSubConnection_findConnectionbyId(server, wg->linkedConnection);
  134. if(!connection)
  135. return UA_STATUSCODE_BADNOTFOUND;
  136. //unregister the publish callback
  137. if(UA_PubSubManager_removeRepeatedPubSubCallback(server, wg->publishCallbackId) != UA_STATUSCODE_GOOD)
  138. return UA_STATUSCODE_BADINTERNALERROR;
  139. #ifdef UA_ENABLE_PUBSUB_INFORMATIONMODEL
  140. removeWriterGroupRepresentation(server, wg);
  141. #endif
  142. UA_WriterGroup_deleteMembers(server, wg);
  143. UA_free(wg);
  144. return UA_STATUSCODE_GOOD;
  145. }
  146. /**********************************************/
  147. /* PublishedDataSet */
  148. /**********************************************/
  149. UA_StatusCode
  150. UA_PublishedDataSetConfig_copy(const UA_PublishedDataSetConfig *src,
  151. UA_PublishedDataSetConfig *dst) {
  152. UA_StatusCode retVal = UA_STATUSCODE_GOOD;
  153. memcpy(dst, src, sizeof(UA_PublishedDataSetConfig));
  154. retVal |= UA_String_copy(&src->name, &dst->name);
  155. switch(src->publishedDataSetType){
  156. case UA_PUBSUB_DATASET_PUBLISHEDITEMS:
  157. //no additional items
  158. break;
  159. case UA_PUBSUB_DATASET_PUBLISHEDITEMS_TEMPLATE:
  160. if (src->config.itemsTemplate.variablesToAddSize > 0){
  161. dst->config.itemsTemplate.variablesToAdd = (UA_PublishedVariableDataType *) UA_calloc(
  162. src->config.itemsTemplate.variablesToAddSize, sizeof(UA_PublishedVariableDataType));
  163. }
  164. for(size_t i = 0; i < src->config.itemsTemplate.variablesToAddSize; i++){
  165. retVal |= UA_PublishedVariableDataType_copy(&src->config.itemsTemplate.variablesToAdd[i],
  166. &dst->config.itemsTemplate.variablesToAdd[i]);
  167. }
  168. retVal |= UA_DataSetMetaDataType_copy(&src->config.itemsTemplate.metaData,
  169. &dst->config.itemsTemplate.metaData);
  170. break;
  171. default:
  172. return UA_STATUSCODE_BADINVALIDARGUMENT;
  173. }
  174. return retVal;
  175. }
  176. UA_StatusCode
  177. UA_Server_getPublishedDataSetConfig(UA_Server *server, const UA_NodeId pds,
  178. UA_PublishedDataSetConfig *config){
  179. if(!config)
  180. return UA_STATUSCODE_BADINVALIDARGUMENT;
  181. UA_PublishedDataSet *currentPublishedDataSet = UA_PublishedDataSet_findPDSbyId(server, pds);
  182. if(!currentPublishedDataSet)
  183. return UA_STATUSCODE_BADNOTFOUND;
  184. UA_PublishedDataSetConfig tmpPublishedDataSetConfig;
  185. //deep copy of the actual config
  186. UA_PublishedDataSetConfig_copy(&currentPublishedDataSet->config, &tmpPublishedDataSetConfig);
  187. *config = tmpPublishedDataSetConfig;
  188. return UA_STATUSCODE_GOOD;
  189. }
  190. UA_PublishedDataSet *
  191. UA_PublishedDataSet_findPDSbyId(UA_Server *server, UA_NodeId identifier){
  192. for(size_t i = 0; i < server->pubSubManager.publishedDataSetsSize; i++){
  193. if(UA_NodeId_equal(&server->pubSubManager.publishedDataSets[i].identifier, &identifier)){
  194. return &server->pubSubManager.publishedDataSets[i];
  195. }
  196. }
  197. return NULL;
  198. }
  199. void
  200. UA_PublishedDataSetConfig_deleteMembers(UA_PublishedDataSetConfig *pdsConfig){
  201. //delete pds config
  202. UA_String_deleteMembers(&pdsConfig->name);
  203. switch (pdsConfig->publishedDataSetType){
  204. case UA_PUBSUB_DATASET_PUBLISHEDITEMS:
  205. //no additional items
  206. break;
  207. case UA_PUBSUB_DATASET_PUBLISHEDITEMS_TEMPLATE:
  208. if (pdsConfig->config.itemsTemplate.variablesToAddSize > 0){
  209. for(size_t i = 0; i < pdsConfig->config.itemsTemplate.variablesToAddSize; i++){
  210. UA_PublishedVariableDataType_deleteMembers(&pdsConfig->config.itemsTemplate.variablesToAdd[i]);
  211. }
  212. UA_free(pdsConfig->config.itemsTemplate.variablesToAdd);
  213. }
  214. UA_DataSetMetaDataType_deleteMembers(&pdsConfig->config.itemsTemplate.metaData);
  215. break;
  216. default:
  217. break;
  218. }
  219. }
  220. void
  221. UA_PublishedDataSet_deleteMembers(UA_Server *server, UA_PublishedDataSet *publishedDataSet){
  222. UA_PublishedDataSetConfig_deleteMembers(&publishedDataSet->config);
  223. //delete PDS
  224. UA_DataSetMetaDataType_deleteMembers(&publishedDataSet->dataSetMetaData);
  225. UA_DataSetField *field, *tmpField;
  226. LIST_FOREACH_SAFE(field, &publishedDataSet->fields, listEntry, tmpField) {
  227. UA_Server_removeDataSetField(server, field->identifier);
  228. }
  229. UA_NodeId_deleteMembers(&publishedDataSet->identifier);
  230. }
  231. UA_DataSetFieldResult
  232. UA_Server_addDataSetField(UA_Server *server, const UA_NodeId publishedDataSet,
  233. const UA_DataSetFieldConfig *fieldConfig,
  234. UA_NodeId *fieldIdentifier) {
  235. UA_StatusCode retVal = UA_STATUSCODE_GOOD;
  236. if(!fieldConfig)
  237. return (UA_DataSetFieldResult) {UA_STATUSCODE_BADINVALIDARGUMENT, {0, 0}};
  238. UA_PublishedDataSet *currentDataSet = UA_PublishedDataSet_findPDSbyId(server, publishedDataSet);
  239. if(currentDataSet == NULL)
  240. return (UA_DataSetFieldResult) {UA_STATUSCODE_BADNOTFOUND, {0, 0}};
  241. if(currentDataSet->config.publishedDataSetType != UA_PUBSUB_DATASET_PUBLISHEDITEMS)
  242. return (UA_DataSetFieldResult) {UA_STATUSCODE_BADNOTIMPLEMENTED, {0, 0}};
  243. UA_DataSetField *newField = (UA_DataSetField *) UA_calloc(1, sizeof(UA_DataSetField));
  244. if(!newField)
  245. return (UA_DataSetFieldResult) {UA_STATUSCODE_BADINTERNALERROR, {0, 0}};
  246. UA_DataSetFieldConfig tmpFieldConfig;
  247. retVal |= UA_DataSetFieldConfig_copy(fieldConfig, &tmpFieldConfig);
  248. newField->config = tmpFieldConfig;
  249. UA_PubSubManager_generateUniqueNodeId(server, &newField->identifier);
  250. if(fieldIdentifier != NULL){
  251. UA_NodeId_copy(&newField->identifier, fieldIdentifier);
  252. }
  253. newField->publishedDataSet = currentDataSet->identifier;
  254. //update major version of parent published data set
  255. currentDataSet->dataSetMetaData.configurationVersion.majorVersion = UA_PubSubConfigurationVersionTimeDifference();
  256. LIST_INSERT_HEAD(&currentDataSet->fields, newField, listEntry);
  257. if(newField->config.field.variable.promotedField)
  258. currentDataSet->promotedFieldsCount++;
  259. currentDataSet->fieldSize++;
  260. UA_DataSetFieldResult result =
  261. {retVal, {currentDataSet->dataSetMetaData.configurationVersion.majorVersion,
  262. currentDataSet->dataSetMetaData.configurationVersion.minorVersion}};
  263. return result;
  264. }
  265. UA_DataSetFieldResult
  266. UA_Server_removeDataSetField(UA_Server *server, const UA_NodeId dsf) {
  267. UA_DataSetField *currentField = UA_DataSetField_findDSFbyId(server, dsf);
  268. if(!currentField)
  269. return (UA_DataSetFieldResult) {UA_STATUSCODE_BADNOTFOUND, {0, 0}};
  270. UA_PublishedDataSet *parentPublishedDataSet =
  271. UA_PublishedDataSet_findPDSbyId(server, currentField->publishedDataSet);
  272. if(!parentPublishedDataSet)
  273. return (UA_DataSetFieldResult) {UA_STATUSCODE_BADNOTFOUND, {0, 0}};
  274. parentPublishedDataSet->fieldSize--;
  275. if(currentField->config.field.variable.promotedField)
  276. parentPublishedDataSet->promotedFieldsCount--;
  277. /* update major version of PublishedDataSet */
  278. parentPublishedDataSet->dataSetMetaData.configurationVersion.majorVersion =
  279. UA_PubSubConfigurationVersionTimeDifference();
  280. UA_DataSetField_deleteMembers(currentField);
  281. UA_free(currentField);
  282. UA_DataSetFieldResult result =
  283. {UA_STATUSCODE_GOOD, {parentPublishedDataSet->dataSetMetaData.configurationVersion.majorVersion,
  284. parentPublishedDataSet->dataSetMetaData.configurationVersion.minorVersion}};
  285. return result;
  286. }
  287. /**********************************************/
  288. /* DataSetWriter */
  289. /**********************************************/
  290. UA_StatusCode
  291. UA_DataSetWriterConfig_copy(const UA_DataSetWriterConfig *src,
  292. UA_DataSetWriterConfig *dst){
  293. UA_StatusCode retVal = UA_STATUSCODE_GOOD;
  294. memcpy(dst, src, sizeof(UA_DataSetWriterConfig));
  295. retVal |= UA_String_copy(&src->name, &dst->name);
  296. retVal |= UA_String_copy(&src->dataSetName, &dst->dataSetName);
  297. retVal |= UA_ExtensionObject_copy(&src->messageSettings, &dst->messageSettings);
  298. dst->dataSetWriterProperties = (UA_KeyValuePair *)
  299. UA_calloc(src->dataSetWriterPropertiesSize, sizeof(UA_KeyValuePair));
  300. if(!dst->dataSetWriterProperties)
  301. return UA_STATUSCODE_BADOUTOFMEMORY;
  302. for(size_t i = 0; i < src->dataSetWriterPropertiesSize; i++){
  303. retVal |= UA_KeyValuePair_copy(&src->dataSetWriterProperties[i], &dst->dataSetWriterProperties[i]);
  304. }
  305. return retVal;
  306. }
  307. UA_StatusCode
  308. UA_Server_getDataSetWriterConfig(UA_Server *server, const UA_NodeId dsw,
  309. UA_DataSetWriterConfig *config){
  310. UA_StatusCode retVal = UA_STATUSCODE_GOOD;
  311. if(!config)
  312. return UA_STATUSCODE_BADINVALIDARGUMENT;
  313. UA_DataSetWriter *currentDataSetWriter = UA_DataSetWriter_findDSWbyId(server, dsw);
  314. if(!currentDataSetWriter)
  315. return UA_STATUSCODE_BADNOTFOUND;
  316. UA_DataSetWriterConfig tmpWriterConfig;
  317. //deep copy of the actual config
  318. retVal |= UA_DataSetWriterConfig_copy(&currentDataSetWriter->config, &tmpWriterConfig);
  319. *config = tmpWriterConfig;
  320. return retVal;
  321. }
  322. UA_DataSetWriter *
  323. UA_DataSetWriter_findDSWbyId(UA_Server *server, UA_NodeId identifier) {
  324. for(size_t i = 0; i < server->pubSubManager.connectionsSize; i++){
  325. UA_WriterGroup *tmpWriterGroup;
  326. LIST_FOREACH(tmpWriterGroup, &server->pubSubManager.connections[i].writerGroups, listEntry){
  327. UA_DataSetWriter *tmpWriter;
  328. LIST_FOREACH(tmpWriter, &tmpWriterGroup->writers, listEntry){
  329. if(UA_NodeId_equal(&tmpWriter->identifier, &identifier)){
  330. return tmpWriter;
  331. }
  332. }
  333. }
  334. }
  335. return NULL;
  336. }
  337. void
  338. UA_DataSetWriterConfig_deleteMembers(UA_DataSetWriterConfig *pdsConfig) {
  339. UA_String_deleteMembers(&pdsConfig->name);
  340. UA_String_deleteMembers(&pdsConfig->dataSetName);
  341. for(size_t i = 0; i < pdsConfig->dataSetWriterPropertiesSize; i++){
  342. UA_KeyValuePair_deleteMembers(&pdsConfig->dataSetWriterProperties[i]);
  343. }
  344. UA_free(pdsConfig->dataSetWriterProperties);
  345. UA_ExtensionObject_deleteMembers(&pdsConfig->messageSettings);
  346. }
  347. void
  348. UA_DataSetWriter_deleteMembers(UA_Server *server, UA_DataSetWriter *dataSetWriter){
  349. UA_DataSetWriterConfig_deleteMembers(&dataSetWriter->config);
  350. //delete DataSetWriter
  351. UA_NodeId_deleteMembers(&dataSetWriter->identifier);
  352. UA_NodeId_deleteMembers(&dataSetWriter->linkedWriterGroup);
  353. UA_NodeId_deleteMembers(&dataSetWriter->connectedDataSet);
  354. LIST_REMOVE(dataSetWriter, listEntry);
  355. //delete lastSamples store
  356. for(size_t i = 0; i < dataSetWriter->lastSamplesCount; i++){
  357. UA_DataValue_delete(dataSetWriter->lastSamples[i].value);
  358. }
  359. LIST_REMOVE(dataSetWriter, listEntry);
  360. UA_free(dataSetWriter->lastSamples);
  361. }
  362. /**********************************************/
  363. /* WriterGroup */
  364. /**********************************************/
  365. UA_StatusCode
  366. UA_WriterGroupConfig_copy(const UA_WriterGroupConfig *src,
  367. UA_WriterGroupConfig *dst){
  368. UA_StatusCode retVal = UA_STATUSCODE_GOOD;
  369. memcpy(dst, src, sizeof(UA_WriterGroupConfig));
  370. retVal |= UA_String_copy(&src->name, &dst->name);
  371. retVal |= UA_ExtensionObject_copy(&src->transportSettings, &dst->transportSettings);
  372. retVal |= UA_ExtensionObject_copy(&src->messageSettings, &dst->messageSettings);
  373. dst->groupProperties = (UA_KeyValuePair *) UA_calloc(src->groupPropertiesSize, sizeof(UA_KeyValuePair));
  374. if(!dst->groupProperties)
  375. return UA_STATUSCODE_BADOUTOFMEMORY;
  376. for(size_t i = 0; i < src->groupPropertiesSize; i++){
  377. retVal |= UA_KeyValuePair_copy(&src->groupProperties[i], &dst->groupProperties[i]);
  378. }
  379. return retVal;
  380. }
  381. UA_StatusCode
  382. UA_Server_getWriterGroupConfig(UA_Server *server, const UA_NodeId writerGroup,
  383. UA_WriterGroupConfig *config){
  384. UA_StatusCode retVal = UA_STATUSCODE_GOOD;
  385. if(!config)
  386. return UA_STATUSCODE_BADINVALIDARGUMENT;
  387. UA_WriterGroup *currentWriterGroup = UA_WriterGroup_findWGbyId(server, writerGroup);
  388. if(!currentWriterGroup){
  389. return UA_STATUSCODE_BADNOTFOUND;
  390. }
  391. UA_WriterGroupConfig tmpWriterGroupConfig;
  392. //deep copy of the actual config
  393. retVal |= UA_WriterGroupConfig_copy(&currentWriterGroup->config, &tmpWriterGroupConfig);
  394. *config = tmpWriterGroupConfig;
  395. return retVal;
  396. }
  397. UA_StatusCode
  398. UA_Server_updateWriterGroupConfig(UA_Server *server, UA_NodeId writerGroupIdentifier,
  399. const UA_WriterGroupConfig *config){
  400. if(!config)
  401. return UA_STATUSCODE_BADINVALIDARGUMENT;
  402. UA_WriterGroup *currentWriterGroup = UA_WriterGroup_findWGbyId(server, writerGroupIdentifier);
  403. if(!currentWriterGroup)
  404. return UA_STATUSCODE_BADNOTFOUND;
  405. //The update functionality will be extended during the next PubSub batches.
  406. //Currently is only a change of the publishing interval possible.
  407. if(currentWriterGroup->config.publishingInterval != config->publishingInterval) {
  408. UA_PubSubManager_removeRepeatedPubSubCallback(server, currentWriterGroup->publishCallbackId);
  409. currentWriterGroup->config.publishingInterval = config->publishingInterval;
  410. UA_WriterGroup_addPublishCallback(server, currentWriterGroup);
  411. } else if (currentWriterGroup->config.priority != config->priority) {
  412. UA_LOG_WARNING(server->config.logger, UA_LOGCATEGORY_SERVER, "No or unsupported WriterGroup update.");
  413. }
  414. return UA_STATUSCODE_GOOD;
  415. }
  416. UA_WriterGroup *
  417. UA_WriterGroup_findWGbyId(UA_Server *server, UA_NodeId identifier){
  418. for(size_t i = 0; i < server->pubSubManager.connectionsSize; i++){
  419. UA_WriterGroup *tmpWriterGroup;
  420. LIST_FOREACH(tmpWriterGroup, &server->pubSubManager.connections[i].writerGroups, listEntry) {
  421. if(UA_NodeId_equal(&identifier, &tmpWriterGroup->identifier)){
  422. return tmpWriterGroup;
  423. }
  424. }
  425. }
  426. return NULL;
  427. }
  428. void
  429. UA_WriterGroupConfig_deleteMembers(UA_WriterGroupConfig *writerGroupConfig){
  430. //delete writerGroup config
  431. UA_String_deleteMembers(&writerGroupConfig->name);
  432. UA_ExtensionObject_deleteMembers(&writerGroupConfig->transportSettings);
  433. UA_ExtensionObject_deleteMembers(&writerGroupConfig->messageSettings);
  434. for(size_t i = 0; i < writerGroupConfig->groupPropertiesSize; i++){
  435. UA_KeyValuePair_deleteMembers(&writerGroupConfig->groupProperties[i]);
  436. }
  437. UA_free(writerGroupConfig->groupProperties);
  438. }
  439. void
  440. UA_WriterGroup_deleteMembers(UA_Server *server, UA_WriterGroup *writerGroup) {
  441. UA_WriterGroupConfig_deleteMembers(&writerGroup->config);
  442. //delete WriterGroup
  443. //delete all writers. Therefore removeDataSetWriter is called from PublishedDataSet
  444. UA_DataSetWriter *dataSetWriter, *tmpDataSetWriter;
  445. LIST_FOREACH_SAFE(dataSetWriter, &writerGroup->writers, listEntry, tmpDataSetWriter){
  446. UA_Server_removeDataSetWriter(server, dataSetWriter->identifier);
  447. }
  448. LIST_REMOVE(writerGroup, listEntry);
  449. UA_NodeId_deleteMembers(&writerGroup->linkedConnection);
  450. UA_NodeId_deleteMembers(&writerGroup->identifier);
  451. }
  452. UA_StatusCode
  453. UA_Server_addDataSetWriter(UA_Server *server,
  454. const UA_NodeId writerGroup, const UA_NodeId dataSet,
  455. const UA_DataSetWriterConfig *dataSetWriterConfig,
  456. UA_NodeId *writerIdentifier) {
  457. UA_StatusCode retVal = UA_STATUSCODE_GOOD;
  458. if(!dataSetWriterConfig)
  459. return UA_STATUSCODE_BADINVALIDARGUMENT;
  460. UA_PublishedDataSet *currentDataSetContext = UA_PublishedDataSet_findPDSbyId(server, dataSet);
  461. if(!currentDataSetContext)
  462. return UA_STATUSCODE_BADNOTFOUND;
  463. UA_WriterGroup *wg = UA_WriterGroup_findWGbyId(server, writerGroup);
  464. if(!wg)
  465. return UA_STATUSCODE_BADNOTFOUND;
  466. UA_DataSetWriter *newDataSetWriter = (UA_DataSetWriter *) UA_calloc(1, sizeof(UA_DataSetWriter));
  467. if(!newDataSetWriter)
  468. return UA_STATUSCODE_BADOUTOFMEMORY;
  469. //copy the config into the new dataSetWriter
  470. UA_DataSetWriterConfig tmpDataSetWriterConfig;
  471. retVal |= UA_DataSetWriterConfig_copy(dataSetWriterConfig, &tmpDataSetWriterConfig);
  472. newDataSetWriter->config = tmpDataSetWriterConfig;
  473. //save the current version of the connected PublishedDataSet
  474. newDataSetWriter->connectedDataSetVersion = currentDataSetContext->dataSetMetaData.configurationVersion;
  475. //initialize the queue for the last values
  476. newDataSetWriter->lastSamplesCount = currentDataSetContext->fieldSize;
  477. newDataSetWriter->lastSamples = (UA_DataSetWriterSample * )
  478. UA_calloc(newDataSetWriter->lastSamplesCount, sizeof(UA_DataSetWriterSample));
  479. if(!newDataSetWriter->lastSamples) {
  480. UA_DataSetWriterConfig_deleteMembers(&newDataSetWriter->config);
  481. UA_free(newDataSetWriter);
  482. return UA_STATUSCODE_BADOUTOFMEMORY;
  483. }
  484. for(size_t i = 0; i < newDataSetWriter->lastSamplesCount; i++) {
  485. newDataSetWriter->lastSamples[i].value = (UA_DataValue *) UA_calloc(1, sizeof(UA_DataValue));
  486. if(!newDataSetWriter->lastSamples[i].value) {
  487. for(size_t j = 0; j < i; j++)
  488. UA_free(newDataSetWriter->lastSamples[j].value);
  489. UA_DataSetWriterConfig_deleteMembers(&newDataSetWriter->config);
  490. UA_free(newDataSetWriter);
  491. return UA_STATUSCODE_BADOUTOFMEMORY;
  492. }
  493. }
  494. //connect PublishedDataSet with DataSetWriter
  495. newDataSetWriter->connectedDataSet = currentDataSetContext->identifier;
  496. newDataSetWriter->linkedWriterGroup = wg->identifier;
  497. UA_PubSubManager_generateUniqueNodeId(server, &newDataSetWriter->identifier);
  498. if(writerIdentifier != NULL)
  499. UA_NodeId_copy(&newDataSetWriter->identifier, writerIdentifier);
  500. //add the new writer to the group
  501. LIST_INSERT_HEAD(&wg->writers, newDataSetWriter, listEntry);
  502. wg->writersCount++;
  503. #ifdef UA_ENABLE_PUBSUB_INFORMATIONMODEL
  504. addDataSetWriterRepresentation(server, newDataSetWriter);
  505. #endif
  506. return retVal;
  507. }
  508. UA_StatusCode
  509. UA_Server_removeDataSetWriter(UA_Server *server, const UA_NodeId dsw){
  510. UA_DataSetWriter *dataSetWriter = UA_DataSetWriter_findDSWbyId(server, dsw);
  511. if(!dataSetWriter)
  512. return UA_STATUSCODE_BADNOTFOUND;
  513. UA_WriterGroup *linkedWriterGroup = UA_WriterGroup_findWGbyId(server, dataSetWriter->linkedWriterGroup);
  514. if(!linkedWriterGroup)
  515. return UA_STATUSCODE_BADNOTFOUND;
  516. linkedWriterGroup->writersCount--;
  517. #ifdef UA_ENABLE_PUBSUB_INFORMATIONMODEL
  518. removeDataSetWriterRepresentation(server, dataSetWriter);
  519. #endif
  520. //remove DataSetWriter from group
  521. UA_DataSetWriter_deleteMembers(server, dataSetWriter);
  522. UA_free(dataSetWriter);
  523. return UA_STATUSCODE_GOOD;
  524. }
  525. /**********************************************/
  526. /* DataSetField */
  527. /**********************************************/
  528. UA_StatusCode
  529. UA_DataSetFieldConfig_copy(const UA_DataSetFieldConfig *src, UA_DataSetFieldConfig *dst){
  530. memcpy(dst, src, sizeof(UA_DataSetFieldConfig));
  531. if(src->dataSetFieldType == UA_PUBSUB_DATASETFIELD_VARIABLE) {
  532. UA_String_copy(&src->field.variable.fieldNameAlias, &dst->field.variable.fieldNameAlias);
  533. UA_PublishedVariableDataType_copy(&src->field.variable.publishParameters,
  534. &dst->field.variable.publishParameters);
  535. } else {
  536. return UA_STATUSCODE_BADNOTSUPPORTED;
  537. }
  538. return UA_STATUSCODE_GOOD;
  539. }
  540. UA_StatusCode
  541. UA_Server_getDataSetFieldConfig(UA_Server *server, const UA_NodeId dsf,
  542. UA_DataSetFieldConfig *config) {
  543. UA_StatusCode retVal = UA_STATUSCODE_GOOD;
  544. if(!config)
  545. return UA_STATUSCODE_BADINVALIDARGUMENT;
  546. UA_DataSetField *currentDataSetField = UA_DataSetField_findDSFbyId(server, dsf);
  547. if(!currentDataSetField)
  548. return UA_STATUSCODE_BADNOTFOUND;
  549. UA_DataSetFieldConfig tmpFieldConfig;
  550. //deep copy of the actual config
  551. retVal |= UA_DataSetFieldConfig_copy(&currentDataSetField->config, &tmpFieldConfig);
  552. *config = tmpFieldConfig;
  553. return retVal;
  554. }
  555. UA_DataSetField *
  556. UA_DataSetField_findDSFbyId(UA_Server *server, UA_NodeId identifier) {
  557. for(size_t i = 0; i < server->pubSubManager.publishedDataSetsSize; i++){
  558. UA_DataSetField *tmpField;
  559. LIST_FOREACH(tmpField, &server->pubSubManager.publishedDataSets[i].fields, listEntry){
  560. if(UA_NodeId_equal(&tmpField->identifier, &identifier)){
  561. return tmpField;
  562. }
  563. }
  564. }
  565. return NULL;
  566. }
  567. void
  568. UA_DataSetFieldConfig_deleteMembers(UA_DataSetFieldConfig *dataSetFieldConfig){
  569. if(dataSetFieldConfig->dataSetFieldType == UA_PUBSUB_DATASETFIELD_VARIABLE){
  570. UA_String_deleteMembers(&dataSetFieldConfig->field.variable.fieldNameAlias);
  571. UA_PublishedVariableDataType_deleteMembers(&dataSetFieldConfig->field.variable.publishParameters);
  572. }
  573. }
  574. void UA_DataSetField_deleteMembers(UA_DataSetField *field) {
  575. UA_DataSetFieldConfig_deleteMembers(&field->config);
  576. //delete DataSetField
  577. UA_NodeId_deleteMembers(&field->identifier);
  578. UA_NodeId_deleteMembers(&field->publishedDataSet);
  579. UA_FieldMetaData_deleteMembers(&field->fieldMetaData);
  580. UA_DataValue_deleteMembers(&field->lastValue);
  581. LIST_REMOVE(field, listEntry);
  582. }
  583. /*********************************************************/
  584. /* PublishValues handling */
  585. /*********************************************************/
  586. /**
  587. * Compare two variants. Internally used for value change detection.
  588. *
  589. * @return UA_TRUE if the value has changed
  590. */
  591. static UA_Boolean
  592. valueChangedVariant(UA_Variant *oldValue, UA_Variant *newValue){
  593. if(! (oldValue && newValue))
  594. return UA_FALSE;
  595. UA_ByteString *oldValueEncoding = UA_ByteString_new(), *newValueEncoding = UA_ByteString_new();
  596. size_t oldValueEncodingSize, newValueEncodingSize;
  597. oldValueEncodingSize = UA_calcSizeBinary(oldValue, &UA_TYPES[UA_TYPES_VARIANT]);
  598. newValueEncodingSize = UA_calcSizeBinary(newValue, &UA_TYPES[UA_TYPES_VARIANT]);
  599. if((oldValueEncodingSize == 0) || (newValueEncodingSize == 0))
  600. return UA_FALSE;
  601. if(oldValueEncodingSize != newValueEncodingSize)
  602. return UA_TRUE;
  603. if(UA_ByteString_allocBuffer(oldValueEncoding, oldValueEncodingSize) != UA_STATUSCODE_GOOD)
  604. return UA_FALSE;
  605. if(UA_ByteString_allocBuffer(newValueEncoding, newValueEncodingSize) != UA_STATUSCODE_GOOD)
  606. return UA_FALSE;
  607. UA_Byte *bufPosOldValue = oldValueEncoding->data;
  608. const UA_Byte *bufEndOldValue = &oldValueEncoding->data[oldValueEncoding->length];
  609. UA_Byte *bufPosNewValue = newValueEncoding->data;
  610. const UA_Byte *bufEndNewValue = &newValueEncoding->data[newValueEncoding->length];
  611. if(UA_encodeBinary(oldValue, &UA_TYPES[UA_TYPES_VARIANT],
  612. &bufPosOldValue, &bufEndOldValue, NULL, NULL) != UA_STATUSCODE_GOOD){
  613. return UA_FALSE;
  614. }
  615. if(UA_encodeBinary(newValue, &UA_TYPES[UA_TYPES_VARIANT],
  616. &bufPosNewValue, &bufEndNewValue, NULL, NULL) != UA_STATUSCODE_GOOD){
  617. return UA_FALSE;
  618. }
  619. oldValueEncoding->length = (uintptr_t)bufPosOldValue - (uintptr_t)oldValueEncoding->data;
  620. newValueEncoding->length = (uintptr_t)bufPosNewValue - (uintptr_t)newValueEncoding->data;
  621. UA_Boolean compareResult = !UA_ByteString_equal(oldValueEncoding, newValueEncoding);
  622. UA_ByteString_delete(oldValueEncoding);
  623. UA_ByteString_delete(newValueEncoding);
  624. return compareResult;
  625. }
  626. /**
  627. * Obtain the latest value for a specific DataSetField. This method is currently
  628. * called inside the DataSetMessage generation process.
  629. */
  630. static UA_StatusCode
  631. UA_PubSubDataSetField_sampleValue(UA_Server *server, UA_DataSetField *field) {
  632. /* Read the value */
  633. UA_ReadValueId rvid;
  634. UA_ReadValueId_init(&rvid);
  635. rvid.nodeId = field->config.field.variable.publishParameters.publishedVariable;
  636. rvid.attributeId = field->config.field.variable.publishParameters.attributeId;
  637. rvid.indexRange = field->config.field.variable.publishParameters.indexRange;
  638. UA_DataValue value = UA_Server_read(server, &rvid, UA_TIMESTAMPSTORETURN_BOTH);
  639. UA_DataValue_deleteMembers(&field->lastValue);
  640. field->lastValue = value;
  641. return UA_STATUSCODE_GOOD;
  642. }
  643. static UA_StatusCode
  644. UA_PubSubDataSetWriter_generateKeyFrameMessage(UA_Server *server, UA_DataSetMessage *dataSetMessage,
  645. UA_DataSetWriter *dataSetWriter) {
  646. UA_PublishedDataSet *currentDataSet = UA_PublishedDataSet_findPDSbyId(server, dataSetWriter->connectedDataSet);
  647. if(!currentDataSet)
  648. return UA_STATUSCODE_BADNOTFOUND;
  649. //prepare DataSetMessageContent
  650. dataSetMessage->header.dataSetMessageValid = true;
  651. dataSetMessage->header.dataSetMessageType = UA_DATASETMESSAGE_DATAKEYFRAME;
  652. dataSetMessage->data.keyFrameData.fieldCount = currentDataSet->fieldSize;
  653. dataSetMessage->data.keyFrameData.dataSetFields = (UA_DataValue *)
  654. UA_Array_new(currentDataSet->fieldSize, &UA_TYPES[UA_TYPES_DATAVALUE]);
  655. if(!dataSetMessage->data.keyFrameData.dataSetFields)
  656. return UA_STATUSCODE_BADOUTOFMEMORY;
  657. UA_DataSetField *tmpDataSetField;
  658. size_t counter = 0;
  659. LIST_FOREACH(tmpDataSetField, &currentDataSet->fields, listEntry){
  660. if(UA_PubSubDataSetField_sampleValue(server, tmpDataSetField) == UA_STATUSCODE_GOOD){
  661. //include field into DSM
  662. UA_DataValue_init(&dataSetMessage->data.keyFrameData.dataSetFields[counter]);
  663. UA_DataValue_copy(&tmpDataSetField->lastValue, &dataSetMessage->data.keyFrameData.dataSetFields[counter]);
  664. if((dataSetWriter->config.dataSetFieldContentMask & (unsigned int) UA_DATASETFIELDCONTENTMASK_STATUSCODE) == 0){
  665. dataSetMessage->data.keyFrameData.dataSetFields[counter].hasStatus = UA_FALSE;
  666. }
  667. if((dataSetWriter->config.dataSetFieldContentMask & (unsigned int) UA_DATASETFIELDCONTENTMASK_SOURCETIMESTAMP) == 0){
  668. dataSetMessage->data.keyFrameData.dataSetFields[counter].hasSourceTimestamp = UA_FALSE;
  669. if((dataSetWriter->config.dataSetFieldContentMask & (unsigned int) UA_DATASETFIELDCONTENTMASK_SOURCEPICOSECONDS) == 0){
  670. dataSetMessage->data.keyFrameData.dataSetFields[counter].hasServerPicoseconds = UA_FALSE;
  671. }
  672. }
  673. if((dataSetWriter->config.dataSetFieldContentMask & (unsigned int) UA_DATASETFIELDCONTENTMASK_SERVERTIMESTAMP) == 0){
  674. dataSetMessage->data.keyFrameData.dataSetFields[counter].hasServerTimestamp = UA_FALSE;
  675. }
  676. //Update lastValue store
  677. UA_DataValue_deleteMembers(dataSetWriter->lastSamples[counter].value);
  678. UA_DataValue_copy(&tmpDataSetField->lastValue, dataSetWriter->lastSamples[counter++].value);
  679. }
  680. }
  681. return UA_STATUSCODE_GOOD;
  682. }
  683. static UA_StatusCode
  684. UA_PubSubDataSetWriter_generateDeltaFrameMessage(UA_Server *server, UA_DataSetMessage *dataSetMessage,
  685. UA_DataSetWriter *dataSetWriter) {
  686. UA_PublishedDataSet *currentDataSet = UA_PublishedDataSet_findPDSbyId(server, dataSetWriter->connectedDataSet);
  687. if(!currentDataSet)
  688. return UA_STATUSCODE_BADNOTFOUND;
  689. //prepare DataSetMessageContent
  690. memset(dataSetMessage, 0, sizeof(UA_DataSetMessage));
  691. dataSetMessage->header.dataSetMessageValid = true;
  692. dataSetMessage->header.dataSetMessageType = UA_DATASETMESSAGE_DATADELTAFRAME;
  693. UA_DataSetField *tmpDataSetField;
  694. size_t counter = 0;
  695. LIST_FOREACH(tmpDataSetField, &currentDataSet->fields, listEntry) {
  696. if(UA_PubSubDataSetField_sampleValue(server, tmpDataSetField) == UA_STATUSCODE_GOOD) {
  697. //check if the value has changed
  698. if(valueChangedVariant(&dataSetWriter->lastSamples[counter].value->value, &tmpDataSetField->lastValue.value)){
  699. //increase fieldCount for current delta message
  700. dataSetMessage->data.deltaFrameData.fieldCount++;
  701. dataSetWriter->lastSamples[counter].valeChanged = UA_TRUE;
  702. } else {
  703. dataSetWriter->lastSamples[counter].valeChanged = UA_FALSE;
  704. }
  705. //update last stored sample
  706. UA_DataValue_init(dataSetWriter->lastSamples[counter].value);
  707. UA_DataValue_copy(&tmpDataSetField->lastValue, dataSetWriter->lastSamples[counter++].value);
  708. }
  709. }
  710. //allocate DeltaFrameFields
  711. UA_DataSetMessage_DeltaFrameField * deltaFields = (UA_DataSetMessage_DeltaFrameField *)
  712. UA_calloc(dataSetMessage->data.deltaFrameData.fieldCount, sizeof(UA_DataSetMessage_DeltaFrameField));
  713. if(!deltaFields)
  714. return UA_STATUSCODE_BADOUTOFMEMORY;
  715. dataSetMessage->data.deltaFrameData.deltaFrameFields = deltaFields;
  716. size_t currentDeltaField = 0;
  717. for(size_t i = 0; i < currentDataSet->fieldSize; i++){
  718. if(dataSetWriter->lastSamples[i].valeChanged){
  719. deltaFields[currentDeltaField].fieldIndex = (UA_UInt16) i;
  720. UA_DataValue_copy(dataSetWriter->lastSamples[i].value, &deltaFields[currentDeltaField].fieldValue);
  721. dataSetWriter->lastSamples[i].valeChanged = false;
  722. if((dataSetWriter->config.dataSetFieldContentMask & (unsigned int) UA_DATASETFIELDCONTENTMASK_STATUSCODE) == 0){
  723. dataSetMessage->data.deltaFrameData.deltaFrameFields[currentDeltaField].fieldValue.hasStatus = UA_FALSE;
  724. }
  725. if((dataSetWriter->config.dataSetFieldContentMask & (unsigned int) UA_DATASETFIELDCONTENTMASK_SOURCETIMESTAMP) == 0){
  726. dataSetMessage->data.deltaFrameData.deltaFrameFields[currentDeltaField].fieldValue.hasSourceTimestamp = UA_FALSE;
  727. if((dataSetWriter->config.dataSetFieldContentMask & (unsigned int) UA_DATASETFIELDCONTENTMASK_SOURCEPICOSECONDS) == 0){
  728. dataSetMessage->data.deltaFrameData.deltaFrameFields[currentDeltaField].fieldValue.hasServerPicoseconds = UA_FALSE;
  729. }
  730. }
  731. if((dataSetWriter->config.dataSetFieldContentMask & (unsigned int) UA_DATASETFIELDCONTENTMASK_SERVERTIMESTAMP) == 0){
  732. dataSetMessage->data.deltaFrameData.deltaFrameFields[currentDeltaField].fieldValue.hasServerTimestamp = UA_FALSE;
  733. }
  734. currentDeltaField++;
  735. }
  736. }
  737. return UA_STATUSCODE_GOOD;
  738. }
  739. /**
  740. * Generate a DataSetMessage for the given writer.
  741. *
  742. * @param dataSetWriter ptr to corresponding writer
  743. * @return ptr to generated DataSetMessage
  744. */
  745. static UA_StatusCode
  746. UA_DataSetWriter_generateDataSetMessage(UA_Server *server, UA_DataSetMessage *dataSetMessage,
  747. UA_DataSetWriter *dataSetWriter) {
  748. UA_PublishedDataSet *currentDataSet = UA_PublishedDataSet_findPDSbyId(server, dataSetWriter->connectedDataSet);
  749. if(!currentDataSet)
  750. return UA_STATUSCODE_BADNOTFOUND;
  751. memset(dataSetMessage, 0, sizeof(UA_DataSetMessage));
  752. //currently is only UADP supported. The configuration Flags are included inside the std. defined UA_UadpDataSetWriterMessageDataType
  753. UA_UadpDataSetWriterMessageDataType *dataSetWriterMessageDataType = NULL;
  754. if((dataSetWriter->config.messageSettings.encoding == UA_EXTENSIONOBJECT_DECODED ||
  755. dataSetWriter->config.messageSettings.encoding == UA_EXTENSIONOBJECT_DECODED_NODELETE) &&
  756. (dataSetWriter->config.messageSettings.content.decoded.type == &UA_TYPES[UA_TYPES_UADPDATASETWRITERMESSAGEDATATYPE])) {
  757. dataSetWriterMessageDataType = (UA_UadpDataSetWriterMessageDataType *) dataSetWriter->config.messageSettings.content.decoded.data;
  758. } else {
  759. //create default flag configuration if no UadpDataSetWriterMessageDataType was passed in
  760. UA_UadpDataSetWriterMessageDataType defaultUadpConfiguration;
  761. memset(&defaultUadpConfiguration, 0, sizeof(UA_UadpDataSetWriterMessageDataType));
  762. defaultUadpConfiguration.dataSetMessageContentMask = (UA_UadpDataSetMessageContentMask) ((unsigned int) UA_UADPDATASETMESSAGECONTENTMASK_TIMESTAMP |
  763. (unsigned int) UA_UADPDATASETMESSAGECONTENTMASK_MAJORVERSION |
  764. (unsigned int) UA_UADPDATASETMESSAGECONTENTMASK_MINORVERSION);
  765. dataSetWriterMessageDataType = &defaultUadpConfiguration;
  766. }
  767. if(dataSetWriterMessageDataType->networkMessageNumber != 0 || dataSetWriterMessageDataType->dataSetOffset != 0 ||
  768. dataSetWriterMessageDataType->configuredSize !=0 ){
  769. UA_LOG_WARNING(server->config.logger, UA_LOGCATEGORY_SERVER, "Static DSM configuration not supported. Using defaults");
  770. dataSetWriterMessageDataType->networkMessageNumber = 0;
  771. dataSetWriterMessageDataType->dataSetOffset = 0;
  772. dataSetWriterMessageDataType->configuredSize = 0;
  773. }
  774. //The encoding depends on the flags inside the writer config.
  775. if(dataSetWriter->config.dataSetFieldContentMask & (unsigned int) UA_DATASETFIELDCONTENTMASK_RAWDATAENCODING) {
  776. dataSetMessage->header.fieldEncoding = UA_FIELDENCODING_RAWDATA;
  777. } else if (dataSetWriter->config.dataSetFieldContentMask &
  778. ((unsigned int) UA_DATASETFIELDCONTENTMASK_SOURCETIMESTAMP |
  779. (unsigned int) UA_DATASETFIELDCONTENTMASK_SERVERPICOSECONDS |
  780. (unsigned int) UA_DATASETFIELDCONTENTMASK_SOURCEPICOSECONDS |
  781. (unsigned int) UA_DATASETFIELDCONTENTMASK_STATUSCODE)) {
  782. dataSetMessage->header.fieldEncoding = UA_FIELDENCODING_DATAVALUE;
  783. } else {
  784. dataSetMessage->header.fieldEncoding = UA_FIELDENCODING_VARIANT;
  785. }
  786. //Std: 'The DataSetMessageContentMask defines the flags for the content of the DataSetMessage header.'
  787. if(dataSetWriterMessageDataType->dataSetMessageContentMask & (unsigned int) UA_UADPDATASETMESSAGECONTENTMASK_MAJORVERSION){
  788. dataSetMessage->header.configVersionMajorVersionEnabled = UA_TRUE;
  789. dataSetMessage->header.configVersionMajorVersion = currentDataSet->dataSetMetaData.configurationVersion.majorVersion;
  790. }
  791. if(dataSetWriterMessageDataType->dataSetMessageContentMask & (unsigned int) UA_UADPDATASETMESSAGECONTENTMASK_MINORVERSION){
  792. dataSetMessage->header.configVersionMinorVersionEnabled = UA_TRUE;
  793. dataSetMessage->header.configVersionMinorVersion = currentDataSet->dataSetMetaData.configurationVersion.minorVersion;
  794. }
  795. if(dataSetWriterMessageDataType->dataSetMessageContentMask & (unsigned int) UA_UADPDATASETMESSAGECONTENTMASK_SEQUENCENUMBER) {
  796. dataSetMessage->header.dataSetMessageSequenceNrEnabled = UA_TRUE;
  797. dataSetMessage->header.dataSetMessageSequenceNr = dataSetWriter->actualDataSetMessageSequenceCount;
  798. }
  799. if(dataSetWriterMessageDataType->dataSetMessageContentMask & (unsigned int) UA_UADPDATASETMESSAGECONTENTMASK_TIMESTAMP) {
  800. dataSetMessage->header.timestampEnabled = UA_TRUE;
  801. dataSetMessage->header.timestamp = UA_DateTime_now();
  802. if(dataSetWriterMessageDataType->dataSetMessageContentMask & (unsigned int) UA_UADPDATASETMESSAGECONTENTMASK_PICOSECONDS) {
  803. dataSetMessage->header.picoSecondsIncluded = UA_FALSE;
  804. UA_LOG_WARNING(server->config.logger, UA_LOGCATEGORY_SERVER, "DSM picosecond field is currently not supported. Using defaults");
  805. }
  806. }
  807. if(dataSetWriterMessageDataType->dataSetMessageContentMask & (unsigned int) UA_UADPDATASETMESSAGECONTENTMASK_STATUS){
  808. dataSetMessage->header.statusEnabled = UA_FALSE;
  809. UA_LOG_WARNING(server->config.logger, UA_LOGCATEGORY_SERVER, "DSM status field is currently not supported. Using defaults");
  810. }
  811. if(dataSetWriter->actualDataSetMessageSequenceCount < UA_UINT16_MAX){
  812. dataSetWriter->actualDataSetMessageSequenceCount++;
  813. } else {
  814. dataSetWriter->actualDataSetMessageSequenceCount = 0;
  815. }
  816. //check if the PublishedDataSet version has changed -> if yes flush the lastValue store and send a KeyFrame.
  817. if(dataSetWriter->connectedDataSetVersion.majorVersion != currentDataSet->dataSetMetaData.configurationVersion.majorVersion ||
  818. dataSetWriter->connectedDataSetVersion.minorVersion != currentDataSet->dataSetMetaData.configurationVersion.minorVersion) {
  819. //realloc pds dependent memory
  820. dataSetWriter->lastSamplesCount = currentDataSet->fieldSize;
  821. dataSetWriter->lastSamples = (UA_DataSetWriterSample * ) UA_realloc(dataSetWriter->lastSamples,
  822. sizeof(UA_DataSetWriterSample) * dataSetWriter->lastSamplesCount);
  823. if(!dataSetWriter->lastSamples)
  824. return UA_STATUSCODE_BADOUTOFMEMORY;
  825. for (size_t i = 0; i < dataSetWriter->lastSamplesCount; i++) {
  826. dataSetWriter->lastSamples[i].value = (UA_DataValue *) UA_calloc(1, sizeof(UA_DataValue));
  827. if(!dataSetWriter->lastSamples[i].value)
  828. return UA_STATUSCODE_BADOUTOFMEMORY;
  829. }
  830. dataSetWriter->connectedDataSetVersion = currentDataSet->dataSetMetaData.configurationVersion;
  831. UA_PubSubDataSetWriter_generateKeyFrameMessage(server, dataSetMessage, dataSetWriter);
  832. dataSetWriter->deltaFrameCounter = 0;
  833. } else if (currentDataSet->fieldSize == 1 || dataSetWriter->deltaFrameCounter == 0 || dataSetWriter->deltaFrameCounter > dataSetWriter->config.keyFrameCount){
  834. //@info the standard defines: if a PDS contains only one fields no delta messages should be generated
  835. //because they need more memory than a keyframe with 1 field.
  836. UA_PubSubDataSetWriter_generateKeyFrameMessage(server, dataSetMessage, dataSetWriter);
  837. dataSetWriter->deltaFrameCounter = 1;
  838. } else {
  839. UA_PubSubDataSetWriter_generateDeltaFrameMessage(server, dataSetMessage, dataSetWriter);
  840. dataSetWriter->deltaFrameCounter++;
  841. }
  842. return UA_STATUSCODE_GOOD;
  843. }
  844. /*
  845. * This callback triggers the collection and publish of NetworkMessages and the contained DataSetMessages.
  846. */
  847. void
  848. UA_WriterGroup_publishCallback(UA_Server *server, UA_WriterGroup *writerGroup) {
  849. if(!writerGroup){
  850. UA_LOG_ERROR(server->config.logger, UA_LOGCATEGORY_SERVER, "Publish failed. WriterGroup not found");
  851. return;
  852. }
  853. if(writerGroup->writersCount <= 0)
  854. return;
  855. if(writerGroup->config.encodingMimeType != UA_PUBSUB_ENCODING_UADP) {
  856. UA_LOG_ERROR(server->config.logger, UA_LOGCATEGORY_SERVER, "Unknown encoding type.");
  857. return;
  858. }
  859. //prevent error if the maxEncapsulatedDataSetMessageCount is set to 0->1
  860. writerGroup->config.maxEncapsulatedDataSetMessageCount = (UA_UInt16) (writerGroup->config.maxEncapsulatedDataSetMessageCount == 0 ||
  861. writerGroup->config.maxEncapsulatedDataSetMessageCount > UA_BYTE_MAX
  862. ? 1 : writerGroup->config.maxEncapsulatedDataSetMessageCount);
  863. UA_DataSetMessage *dsmStore = (UA_DataSetMessage *) UA_calloc(writerGroup->writersCount, sizeof(UA_DataSetMessage));
  864. if(!dsmStore) {
  865. UA_LOG_ERROR(server->config.logger, UA_LOGCATEGORY_SERVER, "DataSetMessage allocation failed");
  866. return;
  867. }
  868. memset(dsmStore, 0, sizeof(UA_DataSetMessage) * writerGroup->writersCount);
  869. //The binary DataSetMessage sizes are part of the payload. Memory is allocated on the stack.
  870. UA_STACKARRAY(UA_UInt16, dsmSizes, writerGroup->writersCount);
  871. memset(dsmSizes, 0, writerGroup->writersCount * sizeof(UA_UInt16));
  872. /*
  873. * Calculate the number of needed NetworkMessages. The previous allocated DataSetMessage array is
  874. * filled from left for combined DSM messages and from the right for single DSM.
  875. * Allocated DSM Array
  876. * +----------------------------+
  877. * |DSM1||DSM2||DSM3||DSM4||DSM5|
  878. * +--+----+-----+-----+-----+--+
  879. * | | | | |
  880. * | | | | |
  881. * +--v----v-----v-----v--+--v--+
  882. * | NM1 || NM2 | NM3 |
  883. * +----------------------+-----+
  884. * NetworkMessages
  885. */
  886. UA_UInt16 combinedNetworkMessageCount = 0, singleNetworkMessagesCount = 0;
  887. UA_DataSetWriter *tmpDataSetWriter;
  888. LIST_FOREACH(tmpDataSetWriter, &writerGroup->writers, listEntry){
  889. //if promoted fields are contained in the PublishedDataSet, then this DSM must encapsulated in one NM
  890. UA_PublishedDataSet *tmpPublishedDataSet = UA_PublishedDataSet_findPDSbyId(server, tmpDataSetWriter->connectedDataSet);
  891. if(!tmpPublishedDataSet) {
  892. UA_LOG_ERROR(server->config.logger, UA_LOGCATEGORY_SERVER, "Publish failed. PublishedDataSet not found");
  893. return;
  894. }
  895. if(tmpPublishedDataSet->promotedFieldsCount > 0) {
  896. if(UA_DataSetWriter_generateDataSetMessage(server, &dsmStore[(writerGroup->writersCount - 1) - singleNetworkMessagesCount],
  897. tmpDataSetWriter) != UA_STATUSCODE_GOOD){
  898. UA_LOG_ERROR(server->config.logger, UA_LOGCATEGORY_SERVER, "Publish failed. DataSetMessage creation failed");
  899. return;
  900. };
  901. dsmSizes[(writerGroup->writersCount-1) - singleNetworkMessagesCount] = (UA_UInt16) UA_DataSetMessage_calcSizeBinary(&dsmStore[(writerGroup->writersCount-1)
  902. - singleNetworkMessagesCount]);
  903. singleNetworkMessagesCount++;
  904. } else {
  905. if(UA_DataSetWriter_generateDataSetMessage(server, &dsmStore[combinedNetworkMessageCount], tmpDataSetWriter) != UA_STATUSCODE_GOOD){
  906. UA_LOG_ERROR(server->config.logger, UA_LOGCATEGORY_SERVER, "Publish failed. DataSetMessage creation failed");
  907. return;
  908. };
  909. dsmSizes[combinedNetworkMessageCount] = (UA_UInt16) UA_DataSetMessage_calcSizeBinary(&dsmStore[combinedNetworkMessageCount]);
  910. combinedNetworkMessageCount++;
  911. }
  912. }
  913. UA_UInt32 networkMessageCount = singleNetworkMessagesCount;
  914. if(combinedNetworkMessageCount != 0){
  915. combinedNetworkMessageCount = (UA_UInt16) (
  916. combinedNetworkMessageCount / writerGroup->config.maxEncapsulatedDataSetMessageCount +
  917. (combinedNetworkMessageCount % writerGroup->config.maxEncapsulatedDataSetMessageCount) == 0 ? 0 : 1);
  918. networkMessageCount += combinedNetworkMessageCount;
  919. }
  920. //Alloc memory for the NetworkMessages on the stack
  921. UA_STACKARRAY(UA_NetworkMessage, nmStore, networkMessageCount);
  922. memset(nmStore, 0, networkMessageCount * sizeof(UA_NetworkMessage));
  923. UA_UInt32 currentDSMPosition = 0;
  924. for(UA_UInt32 i = 0; i < networkMessageCount; i++) {
  925. nmStore[i].version = 1;
  926. nmStore[i].networkMessageType = UA_NETWORKMESSAGE_DATASET;
  927. //create combined NetworkMessages
  928. if(i < (networkMessageCount-singleNetworkMessagesCount)){
  929. if(combinedNetworkMessageCount - (i * writerGroup->config.maxEncapsulatedDataSetMessageCount)){
  930. if(combinedNetworkMessageCount == 1){
  931. nmStore[i].payloadHeader.dataSetPayloadHeader.count = (UA_Byte) ((writerGroup->writersCount) - singleNetworkMessagesCount);
  932. currentDSMPosition = 0;
  933. } else {
  934. nmStore[i].payloadHeader.dataSetPayloadHeader.count = (UA_Byte) writerGroup->config.maxEncapsulatedDataSetMessageCount;
  935. currentDSMPosition = i * writerGroup->config.maxEncapsulatedDataSetMessageCount;
  936. }
  937. //nmStore[i].payloadHeader.dataSetPayloadHeader.count = (UA_Byte) writerGroup->config.maxEncapsulatedDataSetMessageCount;
  938. nmStore[i].payload.dataSetPayload.dataSetMessages = &dsmStore[currentDSMPosition];
  939. nmStore->payload.dataSetPayload.sizes = &dsmSizes[currentDSMPosition];
  940. } else {
  941. currentDSMPosition = i * writerGroup->config.maxEncapsulatedDataSetMessageCount;
  942. nmStore[i].payloadHeader.dataSetPayloadHeader.count = (UA_Byte) (currentDSMPosition - ((i - 1) * writerGroup->config.maxEncapsulatedDataSetMessageCount)); //attention cast from uint32 to byte
  943. nmStore[i].payload.dataSetPayload.dataSetMessages = &dsmStore[currentDSMPosition];
  944. nmStore->payload.dataSetPayload.sizes = &dsmSizes[currentDSMPosition];
  945. }
  946. } else {///create single NetworkMessages (1 DSM per NM)
  947. nmStore[i].payloadHeader.dataSetPayloadHeader.count = 1;
  948. currentDSMPosition = (UA_UInt32) combinedNetworkMessageCount + (i - combinedNetworkMessageCount/writerGroup->config.maxEncapsulatedDataSetMessageCount
  949. + (combinedNetworkMessageCount % writerGroup->config.maxEncapsulatedDataSetMessageCount) == 0 ? 0 : 1);
  950. nmStore[i].payload.dataSetPayload.dataSetMessages = &dsmStore[currentDSMPosition];
  951. nmStore->payload.dataSetPayload.sizes = &dsmSizes[currentDSMPosition];
  952. }
  953. UA_PubSubConnection *connection = UA_PubSubConnection_findConnectionbyId(server, writerGroup->linkedConnection);
  954. if(!connection){
  955. UA_LOG_ERROR(server->config.logger, UA_LOGCATEGORY_SERVER, "Publish failed. PubSubConnection invalid.");
  956. return;
  957. }
  958. //send the prepared messages
  959. UA_ByteString buf;
  960. size_t msgSize = UA_NetworkMessage_calcSizeBinary(&nmStore[i]);
  961. if(UA_ByteString_allocBuffer(&buf, msgSize) == UA_STATUSCODE_GOOD) {
  962. UA_Byte *bufPos = buf.data;
  963. memset(bufPos, 0, msgSize);
  964. const UA_Byte *bufEnd = &(buf.data[buf.length]);
  965. if(UA_NetworkMessage_encodeBinary(&nmStore[i], &bufPos, bufEnd) != UA_STATUSCODE_GOOD){
  966. UA_ByteString_deleteMembers(&buf);
  967. return;
  968. };
  969. connection->channel->send(connection->channel, NULL, &buf);
  970. }
  971. nmStore[i].payloadHeaderEnabled = UA_TRUE;
  972. //The stack allocated sizes field must be set to NULL to prevent invalid free.
  973. nmStore[i].payload.dataSetPayload.sizes = NULL;
  974. UA_ByteString_deleteMembers(&buf);
  975. UA_NetworkMessage_deleteMembers(&nmStore[i]);
  976. }
  977. }
  978. /*
  979. * Add new publishCallback. The first execution is triggered directly after creation.
  980. * @Warning - The duration (double) is currently casted to int. -> intervals smaller 1ms are not possible.
  981. */
  982. UA_StatusCode
  983. UA_WriterGroup_addPublishCallback(UA_Server *server, UA_WriterGroup *writerGroup) {
  984. UA_StatusCode retval =
  985. UA_PubSubManager_addRepeatedCallback(server, (UA_ServerCallback) UA_WriterGroup_publishCallback,
  986. writerGroup, (UA_UInt32) writerGroup->config.publishingInterval,
  987. &writerGroup->publishCallbackId);
  988. if(retval == UA_STATUSCODE_GOOD)
  989. writerGroup->publishCallbackIsRegistered = true;
  990. //run once after creation
  991. UA_WriterGroup_publishCallback(server, writerGroup);
  992. return retval;
  993. }
  994. #endif /* UA_ENABLE_PUBSUB */