ua_pubsub_writer.c 83 KB


  1. /* This Source Code Form is subject to the terms of the Mozilla Public
  2. * License, v. 2.0. If a copy of the MPL was not distributed with this
  3. * file, You can obtain one at http://mozilla.org/MPL/2.0/.
  4. *
  5. * Copyright (c) 2017-2018 Fraunhofer IOSB (Author: Andreas Ebner)
  6. * Copyright (c) 2019 Fraunhofer IOSB (Author: Julius Pfrommer)
  7. * Copyright (c) 2019 Kalycito Infotech Private Limited
  8. */
  9. #include <open62541/server_pubsub.h>
  10. #include "server/ua_server_internal.h"
  11. #ifdef UA_ENABLE_PUBSUB /* conditional compilation */
  12. #include "ua_pubsub.h"
  13. #ifdef UA_ENABLE_PUBSUB_INFORMATIONMODEL
  14. #include "ua_pubsub_ns0.h"
  15. #endif
  16. #ifdef UA_ENABLE_PUBSUB_DELTAFRAMES
  17. #include "ua_types_encoding_binary.h"
  18. #endif
  19. #define UA_MAX_STACKBUF 512 /* Max size of network messages on the stack */
  20. /* Forward declaration */
  21. static void
  22. UA_WriterGroup_clear(UA_Server *server, UA_WriterGroup *writerGroup);
  23. static void
  24. UA_DataSetField_clear(UA_DataSetField *field);
  25. /**********************************************/
  26. /* Connection */
  27. /**********************************************/
  28. UA_StatusCode
  29. UA_PubSubConnectionConfig_copy(const UA_PubSubConnectionConfig *src,
  30. UA_PubSubConnectionConfig *dst) {
  31. UA_StatusCode retVal = UA_STATUSCODE_GOOD;
  32. memcpy(dst, src, sizeof(UA_PubSubConnectionConfig));
  33. retVal |= UA_String_copy(&src->name, &dst->name);
  34. retVal |= UA_Variant_copy(&src->address, &dst->address);
  35. retVal |= UA_String_copy(&src->transportProfileUri, &dst->transportProfileUri);
  36. retVal |= UA_Variant_copy(&src->connectionTransportSettings, &dst->connectionTransportSettings);
  37. if(src->connectionPropertiesSize > 0){
  38. dst->connectionProperties = (UA_KeyValuePair *)
  39. UA_calloc(src->connectionPropertiesSize, sizeof(UA_KeyValuePair));
  40. if(!dst->connectionProperties){
  41. return UA_STATUSCODE_BADOUTOFMEMORY;
  42. }
  43. for(size_t i = 0; i < src->connectionPropertiesSize; i++){
  44. retVal |= UA_QualifiedName_copy(&src->connectionProperties[i].key,
  45. &dst->connectionProperties[i].key);
  46. retVal |= UA_Variant_copy(&src->connectionProperties[i].value,
  47. &dst->connectionProperties[i].value);
  48. }
  49. }
  50. return retVal;
  51. }
  52. UA_StatusCode
  53. UA_Server_getPubSubConnectionConfig(UA_Server *server, const UA_NodeId connection,
  54. UA_PubSubConnectionConfig *config) {
  55. if(!config)
  56. return UA_STATUSCODE_BADINVALIDARGUMENT;
  57. UA_PubSubConnection *currentPubSubConnection =
  58. UA_PubSubConnection_findConnectionbyId(server, connection);
  59. if(!currentPubSubConnection)
  60. return UA_STATUSCODE_BADNOTFOUND;
  61. UA_PubSubConnectionConfig tmpPubSubConnectionConfig;
  62. //deep copy of the actual config
  63. UA_PubSubConnectionConfig_copy(currentPubSubConnection->config, &tmpPubSubConnectionConfig);
  64. *config = tmpPubSubConnectionConfig;
  65. return UA_STATUSCODE_GOOD;
  66. }
  67. UA_PubSubConnection *
  68. UA_PubSubConnection_findConnectionbyId(UA_Server *server, UA_NodeId connectionIdentifier) {
  69. for(size_t i = 0; i < server->pubSubManager.connectionsSize; i++){
  70. if(UA_NodeId_equal(&connectionIdentifier, &server->pubSubManager.connections[i].identifier)){
  71. return &server->pubSubManager.connections[i];
  72. }
  73. }
  74. return NULL;
  75. }
  76. void
  77. UA_PubSubConnectionConfig_clear(UA_PubSubConnectionConfig *connectionConfig) {
  78. UA_String_clear(&connectionConfig->name);
  79. UA_String_clear(&connectionConfig->transportProfileUri);
  80. UA_Variant_clear(&connectionConfig->connectionTransportSettings);
  81. UA_Variant_clear(&connectionConfig->address);
  82. for(size_t i = 0; i < connectionConfig->connectionPropertiesSize; i++){
  83. UA_QualifiedName_clear(&connectionConfig->connectionProperties[i].key);
  84. UA_Variant_clear(&connectionConfig->connectionProperties[i].value);
  85. }
  86. UA_free(connectionConfig->connectionProperties);
  87. }
  88. void
  89. UA_PubSubConnection_clear(UA_Server *server, UA_PubSubConnection *connection) {
  90. //delete connection config
  91. UA_PubSubConnectionConfig_clear(connection->config);
  92. //remove contained WriterGroups
  93. UA_WriterGroup *writerGroup, *tmpWriterGroup;
  94. LIST_FOREACH_SAFE(writerGroup, &connection->writerGroups, listEntry, tmpWriterGroup){
  95. UA_Server_removeWriterGroup(server, writerGroup->identifier);
  96. }
  97. /* remove contained ReaderGroups */
  98. UA_ReaderGroup *readerGroups, *tmpReaderGroup;
  99. LIST_FOREACH_SAFE(readerGroups, &connection->readerGroups, listEntry, tmpReaderGroup){
  100. UA_Server_removeReaderGroup(server, readerGroups->identifier);
  101. }
  102. UA_NodeId_clear(&connection->identifier);
  103. if(connection->channel){
  104. connection->channel->close(connection->channel);
  105. }
  106. UA_free(connection->config);
  107. }
  108. UA_StatusCode
  109. UA_PubSubConnection_regist(UA_Server *server, UA_NodeId *connectionIdentifier) {
  110. UA_PubSubConnection *connection =
  111. UA_PubSubConnection_findConnectionbyId(server, *connectionIdentifier);
  112. UA_StatusCode retval = UA_STATUSCODE_GOOD;
  113. if(connection == NULL) {
  114. return UA_STATUSCODE_BADNOTFOUND;
  115. }
  116. retval = connection->channel->regist(connection->channel, NULL, NULL);
  117. if(retval != UA_STATUSCODE_GOOD) {
  118. UA_LOG_WARNING(&server->config.logger, UA_LOGCATEGORY_SERVER,
  119. "register channel failed: 0x%x!", retval);
  120. }
  121. return retval;
  122. }
  123. UA_StatusCode
  124. UA_Server_addWriterGroup(UA_Server *server, const UA_NodeId connection,
  125. const UA_WriterGroupConfig *writerGroupConfig,
  126. UA_NodeId *writerGroupIdentifier) {
  127. UA_StatusCode retVal = UA_STATUSCODE_GOOD;
  128. if(!writerGroupConfig)
  129. return UA_STATUSCODE_BADINVALIDARGUMENT;
  130. //search the connection by the given connectionIdentifier
  131. UA_PubSubConnection *currentConnectionContext =
  132. UA_PubSubConnection_findConnectionbyId(server, connection);
  133. if(!currentConnectionContext)
  134. return UA_STATUSCODE_BADNOTFOUND;
  135. if(currentConnectionContext->config->configurationFrozen){
  136. UA_LOG_WARNING(&server->config.logger, UA_LOGCATEGORY_SERVER,
  137. "Adding WriterGroup failed. PubSubConnection is frozen.");
  138. return UA_STATUSCODE_BADCONFIGURATIONERROR;
  139. }
  140. //allocate memory for new WriterGroup
  141. UA_WriterGroup *newWriterGroup = (UA_WriterGroup *) UA_calloc(1, sizeof(UA_WriterGroup));
  142. if(!newWriterGroup)
  143. return UA_STATUSCODE_BADOUTOFMEMORY;
  144. newWriterGroup->linkedConnection = currentConnectionContext->identifier;
  145. UA_PubSubManager_generateUniqueNodeId(server, &newWriterGroup->identifier);
  146. if(writerGroupIdentifier){
  147. UA_NodeId_copy(&newWriterGroup->identifier, writerGroupIdentifier);
  148. }
  149. //deep copy of the config
  150. UA_WriterGroupConfig tmpWriterGroupConfig;
  151. retVal |= UA_WriterGroupConfig_copy(writerGroupConfig, &tmpWriterGroupConfig);
  152. if(!tmpWriterGroupConfig.messageSettings.content.decoded.type) {
  153. UA_UadpWriterGroupMessageDataType *wgm = UA_UadpWriterGroupMessageDataType_new();
  154. tmpWriterGroupConfig.messageSettings.content.decoded.data = wgm;
  155. tmpWriterGroupConfig.messageSettings.content.decoded.type =
  156. &UA_TYPES[UA_TYPES_UADPWRITERGROUPMESSAGEDATATYPE];
  157. tmpWriterGroupConfig.messageSettings.encoding = UA_EXTENSIONOBJECT_DECODED;
  158. }
  159. newWriterGroup->config = tmpWriterGroupConfig;
  160. LIST_INSERT_HEAD(&currentConnectionContext->writerGroups, newWriterGroup, listEntry);
  161. #ifdef UA_ENABLE_PUBSUB_INFORMATIONMODEL
  162. addWriterGroupRepresentation(server, newWriterGroup);
  163. #endif
  164. return retVal;
  165. }
  166. UA_StatusCode
  167. UA_Server_removeWriterGroup(UA_Server *server, const UA_NodeId writerGroup){
  168. UA_WriterGroup *wg = UA_WriterGroup_findWGbyId(server, writerGroup);
  169. if(!wg)
  170. return UA_STATUSCODE_BADNOTFOUND;
  171. if(wg->config.configurationFrozen){
  172. UA_LOG_WARNING(&server->config.logger, UA_LOGCATEGORY_SERVER,
  173. "Delete WriterGroup failed. WriterGroup is frozen.");
  174. return UA_STATUSCODE_BADCONFIGURATIONERROR;
  175. }
  176. UA_PubSubConnection *connection =
  177. UA_PubSubConnection_findConnectionbyId(server, wg->linkedConnection);
  178. if(!connection)
  179. return UA_STATUSCODE_BADNOTFOUND;
  180. if(connection->config->configurationFrozen){
  181. UA_LOG_WARNING(&server->config.logger, UA_LOGCATEGORY_SERVER,
  182. "Delete WriterGroup failed. PubSubConnection is frozen.");
  183. return UA_STATUSCODE_BADCONFIGURATIONERROR;
  184. }
  185. if(wg->state == UA_PUBSUBSTATE_OPERATIONAL){
  186. //unregister the publish callback
  187. UA_PubSubManager_removeRepeatedPubSubCallback(server, wg->publishCallbackId);
  188. }
  189. #ifdef UA_ENABLE_PUBSUB_INFORMATIONMODEL
  190. removeGroupRepresentation(server, wg);
  191. #endif
  192. UA_WriterGroup_clear(server, wg);
  193. LIST_REMOVE(wg, listEntry);
  194. UA_free(wg);
  195. return UA_STATUSCODE_GOOD;
  196. }
  197. UA_StatusCode
  198. UA_Server_freezeWriterGroupConfiguration(UA_Server *server, const UA_NodeId writerGroup){
  199. UA_WriterGroup *wg = UA_WriterGroup_findWGbyId(server, writerGroup);
  200. if(!wg)
  201. return UA_STATUSCODE_BADNOTFOUND;
  202. //PubSubConnection freezeCounter++
  203. UA_PubSubConnection *pubSubConnection = UA_PubSubConnection_findConnectionbyId(server, wg->linkedConnection);
  204. pubSubConnection->configurationFreezeCounter++;
  205. pubSubConnection->config->configurationFrozen = UA_TRUE;
  206. //WriterGroup freeze
  207. wg->config.configurationFrozen = UA_TRUE;
  208. //DataSetWriter freeze
  209. UA_DataSetWriter *dataSetWriter;
  210. LIST_FOREACH(dataSetWriter, &wg->writers, listEntry){
  211. dataSetWriter->config.configurationFrozen = UA_TRUE;
  212. //PublishedDataSet freezeCounter++
  213. UA_PublishedDataSet *publishedDataSet = UA_PublishedDataSet_findPDSbyId(server, dataSetWriter->connectedDataSet);
  214. publishedDataSet->configurationFreezeCounter++;
  215. publishedDataSet->config.configurationFrozen = UA_TRUE;
  216. //DataSetFields freeze
  217. UA_DataSetField *dataSetField;
  218. TAILQ_FOREACH(dataSetField, &publishedDataSet->fields, listEntry){
  219. dataSetField->config.configurationFrozen = UA_TRUE;
  220. }
  221. }
  222. //if(wg->config.rtLevel == UA_PUBSUB_RT_FIXED_SIZE){
  223. //UA_NetworkMessage_calculateBufferAndOffets(server, w)
  224. //}
  225. return UA_STATUSCODE_GOOD;
  226. }
  227. UA_StatusCode
  228. UA_Server_unfreezeWriterGroupConfiguration(UA_Server *server, const UA_NodeId writerGroup){
  229. UA_WriterGroup *wg = UA_WriterGroup_findWGbyId(server, writerGroup);
  230. if(!wg)
  231. return UA_STATUSCODE_BADNOTFOUND;
  232. //if(wg->config.rtLevel == UA_PUBSUB_RT_NONE){
  233. // UA_LOG_WARNING(&server->config.logger, UA_LOGCATEGORY_SERVER,
  234. // "PubSub configuration freeze without RT configuration has no effect.");
  235. // return UA_STATUSCODE_BADCONFIGURATIONERROR;
  236. //}
  237. //PubSubConnection freezeCounter--
  238. UA_PubSubConnection *pubSubConnection = UA_PubSubConnection_findConnectionbyId(server, wg->linkedConnection);
  239. pubSubConnection->configurationFreezeCounter--;
  240. if(pubSubConnection->configurationFreezeCounter == 0){
  241. pubSubConnection->config->configurationFrozen = UA_FALSE;
  242. }
  243. //WriterGroup unfreeze
  244. wg->config.configurationFrozen = UA_FALSE;
  245. //DataSetWriter unfreeze
  246. UA_DataSetWriter *dataSetWriter;
  247. LIST_FOREACH(dataSetWriter, &wg->writers, listEntry){
  248. UA_PublishedDataSet *publishedDataSet = UA_PublishedDataSet_findPDSbyId(server, dataSetWriter->connectedDataSet);
  249. //PublishedDataSet freezeCounter--
  250. publishedDataSet->configurationFreezeCounter--;
  251. if(publishedDataSet->configurationFreezeCounter == 0){
  252. publishedDataSet->config.configurationFrozen = UA_FALSE;
  253. UA_DataSetField *dataSetField;
  254. TAILQ_FOREACH(dataSetField, &publishedDataSet->fields, listEntry){
  255. dataSetField->config.configurationFrozen = UA_FALSE;
  256. }
  257. }
  258. dataSetWriter->config.configurationFrozen = UA_FALSE;
  259. }
  260. return UA_STATUSCODE_GOOD;
  261. }
  262. UA_StatusCode UA_EXPORT
  263. UA_Server_setWriterGroupOperational(UA_Server *server, const UA_NodeId writerGroup){
  264. UA_WriterGroup *wg = UA_WriterGroup_findWGbyId(server, writerGroup);
  265. if(!wg)
  266. return UA_STATUSCODE_BADNOTFOUND;
  267. return UA_WriterGroup_setPubSubState(server, UA_PUBSUBSTATE_OPERATIONAL, wg);
  268. }
  269. UA_StatusCode UA_EXPORT
  270. UA_Server_setWriterGroupDisabled(UA_Server *server, const UA_NodeId writerGroup){
  271. UA_WriterGroup *wg = UA_WriterGroup_findWGbyId(server, writerGroup);
  272. if(!wg)
  273. return UA_STATUSCODE_BADNOTFOUND;
  274. return UA_WriterGroup_setPubSubState(server, UA_PUBSUBSTATE_DISABLED, wg);
  275. }
  276. /**********************************************/
  277. /* PublishedDataSet */
  278. /**********************************************/
  279. UA_StatusCode
  280. UA_PublishedDataSetConfig_copy(const UA_PublishedDataSetConfig *src,
  281. UA_PublishedDataSetConfig *dst) {
  282. UA_StatusCode retVal = UA_STATUSCODE_GOOD;
  283. memcpy(dst, src, sizeof(UA_PublishedDataSetConfig));
  284. retVal |= UA_String_copy(&src->name, &dst->name);
  285. switch(src->publishedDataSetType){
  286. case UA_PUBSUB_DATASET_PUBLISHEDITEMS:
  287. //no additional items
  288. break;
  289. case UA_PUBSUB_DATASET_PUBLISHEDITEMS_TEMPLATE:
  290. if(src->config.itemsTemplate.variablesToAddSize > 0){
  291. dst->config.itemsTemplate.variablesToAdd = (UA_PublishedVariableDataType *) UA_calloc(
  292. src->config.itemsTemplate.variablesToAddSize, sizeof(UA_PublishedVariableDataType));
  293. }
  294. for(size_t i = 0; i < src->config.itemsTemplate.variablesToAddSize; i++){
  295. retVal |= UA_PublishedVariableDataType_copy(&src->config.itemsTemplate.variablesToAdd[i],
  296. &dst->config.itemsTemplate.variablesToAdd[i]);
  297. }
  298. retVal |= UA_DataSetMetaDataType_copy(&src->config.itemsTemplate.metaData,
  299. &dst->config.itemsTemplate.metaData);
  300. break;
  301. default:
  302. return UA_STATUSCODE_BADINVALIDARGUMENT;
  303. }
  304. return retVal;
  305. }
  306. UA_StatusCode
  307. UA_Server_getPublishedDataSetConfig(UA_Server *server, const UA_NodeId pds,
  308. UA_PublishedDataSetConfig *config){
  309. if(!config)
  310. return UA_STATUSCODE_BADINVALIDARGUMENT;
  311. UA_PublishedDataSet *currentPublishedDataSet = UA_PublishedDataSet_findPDSbyId(server, pds);
  312. if(!currentPublishedDataSet)
  313. return UA_STATUSCODE_BADNOTFOUND;
  314. UA_PublishedDataSetConfig tmpPublishedDataSetConfig;
  315. //deep copy of the actual config
  316. UA_PublishedDataSetConfig_copy(&currentPublishedDataSet->config, &tmpPublishedDataSetConfig);
  317. *config = tmpPublishedDataSetConfig;
  318. return UA_STATUSCODE_GOOD;
  319. }
  320. UA_StatusCode
  321. UA_Server_getPublishedDataSetMetaData(UA_Server *server, const UA_NodeId pds, UA_DataSetMetaDataType *metaData){
  322. if(!metaData)
  323. return UA_STATUSCODE_BADINVALIDARGUMENT;
  324. UA_PublishedDataSet *currentPublishedDataSet = UA_PublishedDataSet_findPDSbyId(server, pds);
  325. if(!currentPublishedDataSet)
  326. return UA_STATUSCODE_BADNOTFOUND;
  327. UA_DataSetMetaDataType tmpDataSetMetaData;
  328. if(UA_DataSetMetaDataType_copy(&currentPublishedDataSet->dataSetMetaData, &tmpDataSetMetaData) != UA_STATUSCODE_GOOD)
  329. return UA_STATUSCODE_BADINTERNALERROR;
  330. *metaData = tmpDataSetMetaData;
  331. return UA_STATUSCODE_GOOD;
  332. }
  333. UA_PublishedDataSet *
  334. UA_PublishedDataSet_findPDSbyId(UA_Server *server, UA_NodeId identifier){
  335. for(size_t i = 0; i < server->pubSubManager.publishedDataSetsSize; i++){
  336. if(UA_NodeId_equal(&server->pubSubManager.publishedDataSets[i].identifier, &identifier)){
  337. return &server->pubSubManager.publishedDataSets[i];
  338. }
  339. }
  340. return NULL;
  341. }
  342. void
  343. UA_PublishedDataSetConfig_clear(UA_PublishedDataSetConfig *pdsConfig){
  344. //delete pds config
  345. UA_String_clear(&pdsConfig->name);
  346. switch (pdsConfig->publishedDataSetType){
  347. case UA_PUBSUB_DATASET_PUBLISHEDITEMS:
  348. //no additional items
  349. break;
  350. case UA_PUBSUB_DATASET_PUBLISHEDITEMS_TEMPLATE:
  351. if(pdsConfig->config.itemsTemplate.variablesToAddSize > 0){
  352. for(size_t i = 0; i < pdsConfig->config.itemsTemplate.variablesToAddSize; i++){
  353. UA_PublishedVariableDataType_clear(&pdsConfig->config.itemsTemplate.variablesToAdd[i]);
  354. }
  355. UA_free(pdsConfig->config.itemsTemplate.variablesToAdd);
  356. }
  357. UA_DataSetMetaDataType_clear(&pdsConfig->config.itemsTemplate.metaData);
  358. break;
  359. default:
  360. break;
  361. }
  362. }
  363. void
  364. UA_PublishedDataSet_clear(UA_Server *server, UA_PublishedDataSet *publishedDataSet){
  365. UA_PublishedDataSetConfig_clear(&publishedDataSet->config);
  366. //delete PDS
  367. UA_DataSetField *field, *tmpField;
  368. TAILQ_FOREACH_SAFE(field, &publishedDataSet->fields, listEntry, tmpField) {
  369. UA_Server_removeDataSetField(server, field->identifier);
  370. }
  371. UA_DataSetMetaDataType_clear(&publishedDataSet->dataSetMetaData);
  372. UA_NodeId_clear(&publishedDataSet->identifier);
  373. }
  374. static UA_StatusCode
  375. generateFieldMetaData(UA_Server *server, UA_DataSetField *field, UA_FieldMetaData *fieldMetaData){
  376. switch (field->config.dataSetFieldType){
  377. case UA_PUBSUB_DATASETFIELD_VARIABLE:
  378. if(UA_String_copy(&field->config.field.variable.fieldNameAlias, &fieldMetaData->name) != UA_STATUSCODE_GOOD)
  379. return UA_STATUSCODE_BADINTERNALERROR;
  380. fieldMetaData->description = UA_LOCALIZEDTEXT_ALLOC("", "");
  381. fieldMetaData->dataSetFieldId = UA_GUID_NULL;
  382. //ToDo after freeze PR, the value source must be checked (other behavior for static value source)
  383. if(field->config.field.variable.staticValueSourceEnabled){
  384. fieldMetaData->arrayDimensions = (UA_UInt32 *) UA_calloc(
  385. field->config.field.variable.staticValueSource.value.arrayDimensionsSize, sizeof(UA_UInt32));
  386. if(fieldMetaData->arrayDimensions == NULL)
  387. return UA_STATUSCODE_BADOUTOFMEMORY;
  388. memcpy(fieldMetaData->arrayDimensions, field->config.field.variable.staticValueSource.value.arrayDimensions,
  389. sizeof(UA_UInt32) *field->config.field.variable.staticValueSource.value.arrayDimensionsSize);
  390. fieldMetaData->arrayDimensionsSize = field->config.field.variable.staticValueSource.value.arrayDimensionsSize;
  391. if(UA_NodeId_copy(&field->config.field.variable.staticValueSource.value.type->typeId,
  392. &fieldMetaData->dataType) != UA_STATUSCODE_GOOD){
  393. if(fieldMetaData->arrayDimensions){
  394. UA_free(fieldMetaData->arrayDimensions);
  395. return UA_STATUSCODE_BADINTERNALERROR;
  396. }
  397. }
  398. fieldMetaData->properties = NULL;
  399. fieldMetaData->propertiesSize = 0;
  400. //TODO collect value rank for the static field source
  401. fieldMetaData->fieldFlags = UA_DATASETFIELDFLAGS_NONE;
  402. return UA_STATUSCODE_GOOD;
  403. }
  404. UA_Variant value;
  405. UA_Variant_init(&value);
  406. if(UA_Server_readArrayDimensions(server, field->config.field.variable.publishParameters.publishedVariable,
  407. &value) != UA_STATUSCODE_GOOD){
  408. UA_LOG_WARNING(&server->config.logger, UA_LOGCATEGORY_SERVER,
  409. "PubSub meta data generation. Reading ArrayDimension failed.");
  410. } else {
  411. fieldMetaData->arrayDimensions = (UA_UInt32 *) UA_calloc(value.arrayDimensionsSize, sizeof(UA_UInt32));
  412. if(fieldMetaData->arrayDimensions == NULL)
  413. return UA_STATUSCODE_BADOUTOFMEMORY;
  414. memcpy(fieldMetaData->arrayDimensions, value.arrayDimensions, sizeof(UA_UInt32)*value.arrayDimensionsSize);
  415. fieldMetaData->arrayDimensionsSize = value.arrayDimensionsSize;
  416. }
  417. if(UA_Server_readDataType(server, field->config.field.variable.publishParameters.publishedVariable,
  418. &fieldMetaData->dataType) != UA_STATUSCODE_GOOD){
  419. if(fieldMetaData->arrayDimensions){
  420. UA_free(fieldMetaData->arrayDimensions);
  421. return UA_STATUSCODE_BADINTERNALERROR;
  422. }
  423. }
  424. fieldMetaData->properties = NULL;
  425. fieldMetaData->propertiesSize = 0;
  426. UA_Int32 valueRank;
  427. if(UA_Server_readValueRank(server, field->config.field.variable.publishParameters.publishedVariable,
  428. &valueRank) != UA_STATUSCODE_GOOD){
  429. if(fieldMetaData->arrayDimensions){
  430. UA_free(fieldMetaData->arrayDimensions);
  431. return UA_STATUSCODE_BADINTERNALERROR;
  432. }
  433. }
  434. fieldMetaData->valueRank = valueRank;
  435. if(field->config.field.variable.promotedField){
  436. fieldMetaData->fieldFlags = UA_DATASETFIELDFLAGS_PROMOTEDFIELD;
  437. } else {
  438. fieldMetaData->fieldFlags = UA_DATASETFIELDFLAGS_NONE;
  439. }
  440. //TODO collect the following fields
  441. //fieldMetaData.builtInType
  442. //fieldMetaData.maxStringLength
  443. return UA_STATUSCODE_GOOD;
  444. case UA_PUBSUB_DATASETFIELD_EVENT:
  445. return UA_STATUSCODE_BADNOTSUPPORTED;
  446. default:
  447. return UA_STATUSCODE_BADNOTSUPPORTED;
  448. }
  449. }
  450. UA_DataSetFieldResult
  451. UA_Server_addDataSetField(UA_Server *server, const UA_NodeId publishedDataSet,
  452. const UA_DataSetFieldConfig *fieldConfig,
  453. UA_NodeId *fieldIdentifier) {
  454. UA_StatusCode retVal = UA_STATUSCODE_GOOD;
  455. UA_DataSetFieldResult result = {UA_STATUSCODE_BADINVALIDARGUMENT, {0, 0}};
  456. if(!fieldConfig)
  457. return result;
  458. UA_PublishedDataSet *currentDataSet = UA_PublishedDataSet_findPDSbyId(server, publishedDataSet);
  459. if(currentDataSet == NULL){
  460. result.result = UA_STATUSCODE_BADNOTFOUND;
  461. return result;
  462. }
  463. if(currentDataSet->config.configurationFrozen){
  464. UA_LOG_WARNING(&server->config.logger, UA_LOGCATEGORY_SERVER,
  465. "Adding DataSetField failed. PublishedDataSet is frozen.");
  466. result.result = UA_STATUSCODE_BADCONFIGURATIONERROR;
  467. return result;
  468. }
  469. if(currentDataSet->config.publishedDataSetType != UA_PUBSUB_DATASET_PUBLISHEDITEMS){
  470. result.result = UA_STATUSCODE_BADNOTIMPLEMENTED;
  471. return result;
  472. }
  473. UA_DataSetField *newField = (UA_DataSetField *) UA_calloc(1, sizeof(UA_DataSetField));
  474. if(!newField){
  475. result.result = UA_STATUSCODE_BADINTERNALERROR;
  476. return result;
  477. }
  478. UA_DataSetFieldConfig tmpFieldConfig;
  479. retVal |= UA_DataSetFieldConfig_copy(fieldConfig, &tmpFieldConfig);
  480. newField->config = tmpFieldConfig;
  481. UA_PubSubManager_generateUniqueNodeId(server, &newField->identifier);
  482. if(fieldIdentifier != NULL){
  483. UA_NodeId_copy(&newField->identifier, fieldIdentifier);
  484. }
  485. newField->publishedDataSet = currentDataSet->identifier;
  486. //update major version of parent published data set
  487. currentDataSet->dataSetMetaData.configurationVersion.majorVersion = UA_PubSubConfigurationVersionTimeDifference();
  488. /* The order of DataSetFields should be the same in both creating and publishing.
  489. * So adding DataSetFields at the the end of the DataSets using TAILQ structure */
  490. if (currentDataSet->fieldSize != 0)
  491. TAILQ_INSERT_TAIL(&currentDataSet->fields, newField, listEntry);
  492. else
  493. TAILQ_INSERT_HEAD(&currentDataSet->fields, newField, listEntry);
  494. if(newField->config.field.variable.promotedField)
  495. currentDataSet->promotedFieldsCount++;
  496. currentDataSet->fieldSize++;
  497. //generate fieldMetadata within the DataSetMetaData
  498. currentDataSet->dataSetMetaData.fieldsSize++;
  499. UA_FieldMetaData *fieldMetaData = (UA_FieldMetaData *) UA_realloc(currentDataSet->dataSetMetaData.fields, currentDataSet->dataSetMetaData.fieldsSize *
  500. sizeof(UA_FieldMetaData));
  501. if(!fieldMetaData){
  502. result.result = UA_STATUSCODE_BADOUTOFMEMORY;
  503. return result;
  504. }
  505. currentDataSet->dataSetMetaData.fields = fieldMetaData;
  506. UA_FieldMetaData_init(&fieldMetaData[currentDataSet->fieldSize-1]);
  507. if(generateFieldMetaData(server, newField, &fieldMetaData[currentDataSet->fieldSize-1]) != UA_STATUSCODE_GOOD){
  508. UA_Server_removeDataSetField(server, newField->identifier);
  509. result.result = UA_STATUSCODE_BADINTERNALERROR;
  510. return result;
  511. }
  512. result.result = retVal;
  513. result.configurationVersion.majorVersion = currentDataSet->dataSetMetaData.configurationVersion.majorVersion;
  514. result.configurationVersion.minorVersion = currentDataSet->dataSetMetaData.configurationVersion.minorVersion;
  515. return result;
  516. }
  517. UA_DataSetFieldResult
  518. UA_Server_removeDataSetField(UA_Server *server, const UA_NodeId dsf) {
  519. UA_DataSetField *currentField = UA_DataSetField_findDSFbyId(server, dsf);
  520. UA_DataSetFieldResult result = {UA_STATUSCODE_BADNOTFOUND, {0, 0}};
  521. if(!currentField)
  522. return result;
  523. if(currentField->config.configurationFrozen){
  524. UA_LOG_WARNING(&server->config.logger, UA_LOGCATEGORY_SERVER,
  525. "Remove DataSetField failed. DataSetField is frozen.");
  526. result.result = UA_STATUSCODE_BADCONFIGURATIONERROR;
  527. return result;
  528. }
  529. UA_PublishedDataSet *parentPublishedDataSet =
  530. UA_PublishedDataSet_findPDSbyId(server, currentField->publishedDataSet);
  531. if(!parentPublishedDataSet)
  532. return result;
  533. if(parentPublishedDataSet->config.configurationFrozen){
  534. UA_LOG_WARNING(&server->config.logger, UA_LOGCATEGORY_SERVER,
  535. "Remove DataSetField failed. PublishedDataSet is frozen.");
  536. result.result = UA_STATUSCODE_BADCONFIGURATIONERROR;
  537. return result;
  538. }
  539. parentPublishedDataSet->fieldSize--;
  540. if(currentField->config.field.variable.promotedField)
  541. parentPublishedDataSet->promotedFieldsCount--;
  542. /* update major version of PublishedDataSet */
  543. parentPublishedDataSet->dataSetMetaData.configurationVersion.majorVersion =
  544. UA_PubSubConfigurationVersionTimeDifference();
  545. currentField->fieldMetaData.arrayDimensions = NULL;
  546. currentField->fieldMetaData.properties = NULL;
  547. currentField->fieldMetaData.name = UA_STRING_NULL;
  548. currentField->fieldMetaData.description.locale = UA_STRING_NULL;
  549. currentField->fieldMetaData.description.text = UA_STRING_NULL;
  550. UA_DataSetField_clear(currentField);
  551. TAILQ_REMOVE(&parentPublishedDataSet->fields, currentField, listEntry);
  552. UA_free(currentField);
  553. result.result = UA_STATUSCODE_GOOD;
  554. //regenerate DataSetMetaData
  555. parentPublishedDataSet->dataSetMetaData.fieldsSize--;
  556. if(parentPublishedDataSet->dataSetMetaData.fieldsSize > 0){
  557. for(size_t i = 0; i < parentPublishedDataSet->dataSetMetaData.fieldsSize+1; i++) {
  558. UA_FieldMetaData_clear(&parentPublishedDataSet->dataSetMetaData.fields[i]);
  559. }
  560. UA_free(parentPublishedDataSet->dataSetMetaData.fields);
  561. UA_FieldMetaData *fieldMetaData = (UA_FieldMetaData *) UA_calloc(parentPublishedDataSet->dataSetMetaData.fieldsSize,
  562. sizeof(UA_FieldMetaData));
  563. if(!fieldMetaData){
  564. result.result = UA_STATUSCODE_BADOUTOFMEMORY;
  565. return result;
  566. }
  567. UA_DataSetField *tmpDSF;
  568. size_t counter = 0;
  569. TAILQ_FOREACH(tmpDSF, &parentPublishedDataSet->fields, listEntry){
  570. UA_FieldMetaData tmpFieldMetaData;
  571. UA_FieldMetaData_init(&tmpFieldMetaData);
  572. if(generateFieldMetaData(server, tmpDSF, &tmpFieldMetaData) != UA_STATUSCODE_GOOD){
  573. UA_LOG_WARNING(&server->config.logger, UA_LOGCATEGORY_SERVER,
  574. "PubSub MetaData generation failed!");
  575. //TODO how to ensure consistency if the metadata regeneration fails
  576. result.result = UA_STATUSCODE_BADINTERNALERROR;
  577. }
  578. fieldMetaData[counter++] = tmpFieldMetaData;
  579. }
  580. parentPublishedDataSet->dataSetMetaData.fields = fieldMetaData;
  581. } else {
  582. UA_FieldMetaData_delete(parentPublishedDataSet->dataSetMetaData.fields);
  583. parentPublishedDataSet->dataSetMetaData.fields = NULL;
  584. }
  585. result.configurationVersion.majorVersion = parentPublishedDataSet->dataSetMetaData.configurationVersion.majorVersion;
  586. result.configurationVersion.minorVersion = parentPublishedDataSet->dataSetMetaData.configurationVersion.minorVersion;
  587. return result;
  588. }
  589. /**********************************************/
  590. /* DataSetWriter */
  591. /**********************************************/
  592. UA_StatusCode
  593. UA_DataSetWriterConfig_copy(const UA_DataSetWriterConfig *src,
  594. UA_DataSetWriterConfig *dst){
  595. UA_StatusCode retVal = UA_STATUSCODE_GOOD;
  596. memcpy(dst, src, sizeof(UA_DataSetWriterConfig));
  597. retVal |= UA_String_copy(&src->name, &dst->name);
  598. retVal |= UA_String_copy(&src->dataSetName, &dst->dataSetName);
  599. retVal |= UA_ExtensionObject_copy(&src->messageSettings, &dst->messageSettings);
  600. if (src->dataSetWriterPropertiesSize > 0) {
  601. dst->dataSetWriterProperties = (UA_KeyValuePair *)
  602. UA_calloc(src->dataSetWriterPropertiesSize, sizeof(UA_KeyValuePair));
  603. if(!dst->dataSetWriterProperties)
  604. return UA_STATUSCODE_BADOUTOFMEMORY;
  605. for(size_t i = 0; i < src->dataSetWriterPropertiesSize; i++){
  606. retVal |= UA_KeyValuePair_copy(&src->dataSetWriterProperties[i], &dst->dataSetWriterProperties[i]);
  607. }
  608. }
  609. return retVal;
  610. }
  611. UA_StatusCode
  612. UA_Server_getDataSetWriterConfig(UA_Server *server, const UA_NodeId dsw,
  613. UA_DataSetWriterConfig *config){
  614. UA_StatusCode retVal = UA_STATUSCODE_GOOD;
  615. if(!config)
  616. return UA_STATUSCODE_BADINVALIDARGUMENT;
  617. UA_DataSetWriter *currentDataSetWriter = UA_DataSetWriter_findDSWbyId(server, dsw);
  618. if(!currentDataSetWriter)
  619. return UA_STATUSCODE_BADNOTFOUND;
  620. UA_DataSetWriterConfig tmpWriterConfig;
  621. //deep copy of the actual config
  622. retVal |= UA_DataSetWriterConfig_copy(&currentDataSetWriter->config, &tmpWriterConfig);
  623. *config = tmpWriterConfig;
  624. return retVal;
  625. }
  626. UA_DataSetWriter *
  627. UA_DataSetWriter_findDSWbyId(UA_Server *server, UA_NodeId identifier) {
  628. for(size_t i = 0; i < server->pubSubManager.connectionsSize; i++){
  629. UA_WriterGroup *tmpWriterGroup;
  630. LIST_FOREACH(tmpWriterGroup, &server->pubSubManager.connections[i].writerGroups, listEntry){
  631. UA_DataSetWriter *tmpWriter;
  632. LIST_FOREACH(tmpWriter, &tmpWriterGroup->writers, listEntry){
  633. if(UA_NodeId_equal(&tmpWriter->identifier, &identifier)){
  634. return tmpWriter;
  635. }
  636. }
  637. }
  638. }
  639. return NULL;
  640. }
  641. void
  642. UA_DataSetWriterConfig_clear(UA_DataSetWriterConfig *pdsConfig) {
  643. UA_String_clear(&pdsConfig->name);
  644. UA_String_clear(&pdsConfig->dataSetName);
  645. for(size_t i = 0; i < pdsConfig->dataSetWriterPropertiesSize; i++){
  646. UA_KeyValuePair_clear(&pdsConfig->dataSetWriterProperties[i]);
  647. }
  648. UA_free(pdsConfig->dataSetWriterProperties);
  649. UA_ExtensionObject_clear(&pdsConfig->messageSettings);
  650. }
  651. static void
  652. UA_DataSetWriter_clear(UA_Server *server, UA_DataSetWriter *dataSetWriter) {
  653. UA_DataSetWriterConfig_clear(&dataSetWriter->config);
  654. //delete DataSetWriter
  655. UA_NodeId_clear(&dataSetWriter->identifier);
  656. UA_NodeId_clear(&dataSetWriter->linkedWriterGroup);
  657. UA_NodeId_clear(&dataSetWriter->connectedDataSet);
  658. #ifdef UA_ENABLE_PUBSUB_DELTAFRAMES
  659. //delete lastSamples store
  660. for(size_t i = 0; i < dataSetWriter->lastSamplesCount; i++) {
  661. UA_DataValue_clear(&dataSetWriter->lastSamples[i].value);
  662. }
  663. UA_free(dataSetWriter->lastSamples);
  664. dataSetWriter->lastSamples = NULL;
  665. dataSetWriter->lastSamplesCount = 0;
  666. #endif
  667. }
  668. //state machine methods not part of the open62541 state machine API
  669. UA_StatusCode
  670. UA_DataSetWriter_setPubSubState(UA_Server *server, UA_PubSubState state, UA_DataSetWriter *dataSetWriter){
  671. switch(state){
  672. case UA_PUBSUBSTATE_DISABLED:
  673. switch (dataSetWriter->state){
  674. case UA_PUBSUBSTATE_DISABLED:
  675. return UA_STATUSCODE_GOOD;
  676. case UA_PUBSUBSTATE_PAUSED:
  677. dataSetWriter->state = UA_PUBSUBSTATE_DISABLED;
  678. //no further action is required
  679. break;
  680. case UA_PUBSUBSTATE_OPERATIONAL:
  681. dataSetWriter->state = UA_PUBSUBSTATE_DISABLED;
  682. break;
  683. case UA_PUBSUBSTATE_ERROR:
  684. break;
  685. default:
  686. UA_LOG_WARNING(&server->config.logger, UA_LOGCATEGORY_SERVER,
  687. "Received unknown PubSub state!");
  688. }
  689. break;
  690. case UA_PUBSUBSTATE_PAUSED:
  691. switch (dataSetWriter->state){
  692. case UA_PUBSUBSTATE_DISABLED:
  693. break;
  694. case UA_PUBSUBSTATE_PAUSED:
  695. return UA_STATUSCODE_GOOD;
  696. case UA_PUBSUBSTATE_OPERATIONAL:
  697. break;
  698. case UA_PUBSUBSTATE_ERROR:
  699. break;
  700. default:
  701. UA_LOG_WARNING(&server->config.logger, UA_LOGCATEGORY_SERVER,
  702. "Received unknown PubSub state!");
  703. }
  704. break;
  705. case UA_PUBSUBSTATE_OPERATIONAL:
  706. switch (dataSetWriter->state){
  707. case UA_PUBSUBSTATE_DISABLED:
  708. dataSetWriter->state = UA_PUBSUBSTATE_OPERATIONAL;
  709. break;
  710. case UA_PUBSUBSTATE_PAUSED:
  711. break;
  712. case UA_PUBSUBSTATE_OPERATIONAL:
  713. return UA_STATUSCODE_GOOD;
  714. case UA_PUBSUBSTATE_ERROR:
  715. break;
  716. default:
  717. UA_LOG_WARNING(&server->config.logger, UA_LOGCATEGORY_SERVER,
  718. "Received unknown PubSub state!");
  719. }
  720. break;
  721. case UA_PUBSUBSTATE_ERROR:
  722. switch (dataSetWriter->state){
  723. case UA_PUBSUBSTATE_DISABLED:
  724. break;
  725. case UA_PUBSUBSTATE_PAUSED:
  726. break;
  727. case UA_PUBSUBSTATE_OPERATIONAL:
  728. break;
  729. case UA_PUBSUBSTATE_ERROR:
  730. return UA_STATUSCODE_GOOD;
  731. default:
  732. UA_LOG_WARNING(&server->config.logger, UA_LOGCATEGORY_SERVER,
  733. "Received unknown PubSub state!");
  734. }
  735. break;
  736. default:
  737. UA_LOG_WARNING(&server->config.logger, UA_LOGCATEGORY_SERVER,
  738. "Received unknown PubSub state!");
  739. }
  740. return UA_STATUSCODE_GOOD;
  741. }
  742. /**********************************************/
  743. /* WriterGroup */
  744. /**********************************************/
  745. UA_StatusCode
  746. UA_WriterGroupConfig_copy(const UA_WriterGroupConfig *src,
  747. UA_WriterGroupConfig *dst){
  748. UA_StatusCode retVal = UA_STATUSCODE_GOOD;
  749. memcpy(dst, src, sizeof(UA_WriterGroupConfig));
  750. retVal |= UA_String_copy(&src->name, &dst->name);
  751. retVal |= UA_ExtensionObject_copy(&src->transportSettings, &dst->transportSettings);
  752. retVal |= UA_ExtensionObject_copy(&src->messageSettings, &dst->messageSettings);
  753. if (src->groupPropertiesSize > 0) {
  754. dst->groupProperties = (UA_KeyValuePair *) UA_calloc(src->groupPropertiesSize, sizeof(UA_KeyValuePair));
  755. if(!dst->groupProperties)
  756. return UA_STATUSCODE_BADOUTOFMEMORY;
  757. for(size_t i = 0; i < src->groupPropertiesSize; i++){
  758. retVal |= UA_KeyValuePair_copy(&src->groupProperties[i], &dst->groupProperties[i]);
  759. }
  760. }
  761. return retVal;
  762. }
  763. UA_StatusCode
  764. UA_Server_getWriterGroupConfig(UA_Server *server, const UA_NodeId writerGroup,
  765. UA_WriterGroupConfig *config){
  766. UA_StatusCode retVal = UA_STATUSCODE_GOOD;
  767. if(!config)
  768. return UA_STATUSCODE_BADINVALIDARGUMENT;
  769. UA_WriterGroup *currentWriterGroup = UA_WriterGroup_findWGbyId(server, writerGroup);
  770. if(!currentWriterGroup){
  771. return UA_STATUSCODE_BADNOTFOUND;
  772. }
  773. UA_WriterGroupConfig tmpWriterGroupConfig;
  774. //deep copy of the actual config
  775. retVal |= UA_WriterGroupConfig_copy(&currentWriterGroup->config, &tmpWriterGroupConfig);
  776. *config = tmpWriterGroupConfig;
  777. return retVal;
  778. }
  779. UA_StatusCode
  780. UA_Server_updateWriterGroupConfig(UA_Server *server, UA_NodeId writerGroupIdentifier,
  781. const UA_WriterGroupConfig *config){
  782. if(!config)
  783. return UA_STATUSCODE_BADINVALIDARGUMENT;
  784. UA_WriterGroup *currentWriterGroup = UA_WriterGroup_findWGbyId(server, writerGroupIdentifier);
  785. if(!currentWriterGroup)
  786. return UA_STATUSCODE_BADNOTFOUND;
  787. if(currentWriterGroup->config.configurationFrozen){
  788. UA_LOG_WARNING(&server->config.logger, UA_LOGCATEGORY_SERVER,
  789. "Modify WriterGroup failed. WriterGroup is frozen.");
  790. return UA_STATUSCODE_BADCONFIGURATIONERROR;
  791. }
  792. //The update functionality will be extended during the next PubSub batches.
  793. //Currently is only a change of the publishing interval possible.
  794. if(currentWriterGroup->config.maxEncapsulatedDataSetMessageCount != config->maxEncapsulatedDataSetMessageCount){
  795. currentWriterGroup->config.maxEncapsulatedDataSetMessageCount = config->maxEncapsulatedDataSetMessageCount;
  796. if(currentWriterGroup->config.messageSettings.encoding == UA_EXTENSIONOBJECT_ENCODED_NOBODY) {
  797. UA_LOG_WARNING(&server->config.logger, UA_LOGCATEGORY_SERVER,
  798. "MaxEncapsulatedDataSetMessag need enabled 'PayloadHeader' within the message settings.");
  799. }
  800. }
  801. if(currentWriterGroup->config.publishingInterval != config->publishingInterval) {
  802. if(currentWriterGroup->config.rtLevel == UA_PUBSUB_RT_NONE && currentWriterGroup->state == UA_PUBSUBSTATE_OPERATIONAL){
  803. UA_PubSubManager_removeRepeatedPubSubCallback(server, currentWriterGroup->publishCallbackId);
  804. currentWriterGroup->config.publishingInterval = config->publishingInterval;
  805. UA_WriterGroup_addPublishCallback(server, currentWriterGroup);
  806. } else {
  807. currentWriterGroup->config.publishingInterval = config->publishingInterval;
  808. }
  809. }
  810. if(currentWriterGroup->config.priority != config->priority) {
  811. UA_LOG_WARNING(&server->config.logger, UA_LOGCATEGORY_SERVER,
  812. "No or unsupported WriterGroup update.");
  813. }
  814. return UA_STATUSCODE_GOOD;
  815. }
  816. UA_WriterGroup *
  817. UA_WriterGroup_findWGbyId(UA_Server *server, UA_NodeId identifier){
  818. for(size_t i = 0; i < server->pubSubManager.connectionsSize; i++){
  819. UA_WriterGroup *tmpWriterGroup;
  820. LIST_FOREACH(tmpWriterGroup, &server->pubSubManager.connections[i].writerGroups, listEntry) {
  821. if(UA_NodeId_equal(&identifier, &tmpWriterGroup->identifier)){
  822. return tmpWriterGroup;
  823. }
  824. }
  825. }
  826. return NULL;
  827. }
  828. void
  829. UA_WriterGroupConfig_clear(UA_WriterGroupConfig *writerGroupConfig){
  830. //delete writerGroup config
  831. UA_String_clear(&writerGroupConfig->name);
  832. UA_ExtensionObject_clear(&writerGroupConfig->transportSettings);
  833. UA_ExtensionObject_clear(&writerGroupConfig->messageSettings);
  834. for(size_t i = 0; i < writerGroupConfig->groupPropertiesSize; i++){
  835. UA_KeyValuePair_clear(&writerGroupConfig->groupProperties[i]);
  836. }
  837. UA_free(writerGroupConfig->groupProperties);
  838. }
  839. static void
  840. UA_WriterGroup_clear(UA_Server *server, UA_WriterGroup *writerGroup) {
  841. UA_WriterGroupConfig_clear(&writerGroup->config);
  842. //delete WriterGroup
  843. //delete all writers. Therefore removeDataSetWriter is called from PublishedDataSet
  844. UA_DataSetWriter *dataSetWriter, *tmpDataSetWriter;
  845. LIST_FOREACH_SAFE(dataSetWriter, &writerGroup->writers, listEntry, tmpDataSetWriter){
  846. UA_Server_removeDataSetWriter(server, dataSetWriter->identifier);
  847. }
  848. UA_NodeId_clear(&writerGroup->linkedConnection);
  849. UA_NodeId_clear(&writerGroup->identifier);
  850. }
  851. UA_StatusCode
  852. UA_WriterGroup_setPubSubState(UA_Server *server, UA_PubSubState state, UA_WriterGroup *writerGroup){
  853. UA_DataSetWriter *dataSetWriter;
  854. switch(state){
  855. case UA_PUBSUBSTATE_DISABLED:
  856. switch (writerGroup->state){
  857. case UA_PUBSUBSTATE_DISABLED:
  858. return UA_STATUSCODE_GOOD;
  859. case UA_PUBSUBSTATE_PAUSED:
  860. break;
  861. case UA_PUBSUBSTATE_OPERATIONAL:
  862. UA_PubSubManager_removeRepeatedPubSubCallback(server, writerGroup->publishCallbackId);
  863. LIST_FOREACH(dataSetWriter, &writerGroup->writers, listEntry){
  864. UA_DataSetWriter_setPubSubState(server, UA_PUBSUBSTATE_DISABLED, dataSetWriter);
  865. }
  866. writerGroup->state = UA_PUBSUBSTATE_DISABLED;
  867. break;
  868. case UA_PUBSUBSTATE_ERROR:
  869. break;
  870. default:
  871. UA_LOG_WARNING(&server->config.logger, UA_LOGCATEGORY_SERVER,
  872. "Received unknown PubSub state!");
  873. }
  874. break;
  875. case UA_PUBSUBSTATE_PAUSED:
  876. switch (writerGroup->state){
  877. case UA_PUBSUBSTATE_DISABLED:
  878. break;
  879. case UA_PUBSUBSTATE_PAUSED:
  880. return UA_STATUSCODE_GOOD;
  881. case UA_PUBSUBSTATE_OPERATIONAL:
  882. break;
  883. case UA_PUBSUBSTATE_ERROR:
  884. break;
  885. default:
  886. UA_LOG_WARNING(&server->config.logger, UA_LOGCATEGORY_SERVER,
  887. "Received unknown PubSub state!");
  888. }
  889. break;
  890. case UA_PUBSUBSTATE_OPERATIONAL:
  891. switch (writerGroup->state){
  892. case UA_PUBSUBSTATE_DISABLED:
  893. writerGroup->state = UA_PUBSUBSTATE_OPERATIONAL;
  894. UA_PubSubManager_removeRepeatedPubSubCallback(server, writerGroup->publishCallbackId);
  895. LIST_FOREACH(dataSetWriter, &writerGroup->writers, listEntry){
  896. UA_DataSetWriter_setPubSubState(server, UA_PUBSUBSTATE_OPERATIONAL, dataSetWriter);
  897. }
  898. UA_WriterGroup_addPublishCallback(server, writerGroup);
  899. break;
  900. case UA_PUBSUBSTATE_PAUSED:
  901. break;
  902. case UA_PUBSUBSTATE_OPERATIONAL:
  903. return UA_STATUSCODE_GOOD;
  904. case UA_PUBSUBSTATE_ERROR:
  905. break;
  906. default:
  907. UA_LOG_WARNING(&server->config.logger, UA_LOGCATEGORY_SERVER,
  908. "Received unknown PubSub state!");
  909. }
  910. break;
  911. case UA_PUBSUBSTATE_ERROR:
  912. switch (writerGroup->state){
  913. case UA_PUBSUBSTATE_DISABLED:
  914. break;
  915. case UA_PUBSUBSTATE_PAUSED:
  916. break;
  917. case UA_PUBSUBSTATE_OPERATIONAL:
  918. break;
  919. case UA_PUBSUBSTATE_ERROR:
  920. return UA_STATUSCODE_GOOD;
  921. default:
  922. UA_LOG_WARNING(&server->config.logger, UA_LOGCATEGORY_SERVER,
  923. "Received unknown PubSub state!");
  924. }
  925. break;
  926. default:
  927. UA_LOG_WARNING(&server->config.logger, UA_LOGCATEGORY_SERVER,
  928. "Received unknown PubSub state!");
  929. }
  930. return UA_STATUSCODE_GOOD;
  931. }
  932. UA_StatusCode
  933. UA_Server_addDataSetWriter(UA_Server *server,
  934. const UA_NodeId writerGroup, const UA_NodeId dataSet,
  935. const UA_DataSetWriterConfig *dataSetWriterConfig,
  936. UA_NodeId *writerIdentifier) {
  937. UA_StatusCode retVal = UA_STATUSCODE_GOOD;
  938. if(!dataSetWriterConfig)
  939. return UA_STATUSCODE_BADINVALIDARGUMENT;
  940. UA_PublishedDataSet *currentDataSetContext = UA_PublishedDataSet_findPDSbyId(server, dataSet);
  941. if(!currentDataSetContext)
  942. return UA_STATUSCODE_BADNOTFOUND;
  943. if(currentDataSetContext->config.configurationFrozen){
  944. UA_LOG_WARNING(&server->config.logger, UA_LOGCATEGORY_SERVER,
  945. "Adding DataSetWriter failed. PublishedDataSet is frozen.");
  946. return UA_STATUSCODE_BADCONFIGURATIONERROR;
  947. }
  948. UA_WriterGroup *wg = UA_WriterGroup_findWGbyId(server, writerGroup);
  949. if(!wg)
  950. return UA_STATUSCODE_BADNOTFOUND;
  951. if(wg->config.configurationFrozen){
  952. UA_LOG_WARNING(&server->config.logger, UA_LOGCATEGORY_SERVER,
  953. "Adding DataSetWriter failed. WriterGroup is frozen.");
  954. return UA_STATUSCODE_BADCONFIGURATIONERROR;
  955. }
  956. UA_DataSetWriter *newDataSetWriter = (UA_DataSetWriter *) UA_calloc(1, sizeof(UA_DataSetWriter));
  957. if(!newDataSetWriter)
  958. return UA_STATUSCODE_BADOUTOFMEMORY;
  959. //copy the config into the new dataSetWriter
  960. UA_DataSetWriterConfig tmpDataSetWriterConfig;
  961. retVal |= UA_DataSetWriterConfig_copy(dataSetWriterConfig, &tmpDataSetWriterConfig);
  962. newDataSetWriter->config = tmpDataSetWriterConfig;
  963. //save the current version of the connected PublishedDataSet
  964. newDataSetWriter->connectedDataSetVersion = currentDataSetContext->dataSetMetaData.configurationVersion;
  965. #ifdef UA_ENABLE_PUBSUB_DELTAFRAMES
  966. //initialize the queue for the last values
  967. if (currentDataSetContext->fieldSize > 0) {
  968. newDataSetWriter->lastSamples = (UA_DataSetWriterSample * )
  969. UA_calloc(currentDataSetContext->fieldSize, sizeof(UA_DataSetWriterSample));
  970. if(!newDataSetWriter->lastSamples) {
  971. UA_DataSetWriterConfig_clear(&newDataSetWriter->config);
  972. UA_free(newDataSetWriter);
  973. return UA_STATUSCODE_BADOUTOFMEMORY;
  974. }
  975. newDataSetWriter->lastSamplesCount = currentDataSetContext->fieldSize;
  976. }
  977. #endif
  978. //connect PublishedDataSet with DataSetWriter
  979. newDataSetWriter->connectedDataSet = currentDataSetContext->identifier;
  980. newDataSetWriter->linkedWriterGroup = wg->identifier;
  981. UA_PubSubManager_generateUniqueNodeId(server, &newDataSetWriter->identifier);
  982. if(writerIdentifier != NULL)
  983. UA_NodeId_copy(&newDataSetWriter->identifier, writerIdentifier);
  984. //add the new writer to the group
  985. LIST_INSERT_HEAD(&wg->writers, newDataSetWriter, listEntry);
  986. wg->writersCount++;
  987. #ifdef UA_ENABLE_PUBSUB_INFORMATIONMODEL
  988. addDataSetWriterRepresentation(server, newDataSetWriter);
  989. #endif
  990. return retVal;
  991. }
  992. UA_StatusCode
  993. UA_Server_removeDataSetWriter(UA_Server *server, const UA_NodeId dsw){
  994. UA_DataSetWriter *dataSetWriter = UA_DataSetWriter_findDSWbyId(server, dsw);
  995. if(!dataSetWriter)
  996. return UA_STATUSCODE_BADNOTFOUND;
  997. if(dataSetWriter->config.configurationFrozen){
  998. UA_LOG_WARNING(&server->config.logger, UA_LOGCATEGORY_SERVER,
  999. "Remove DataSetWriter failed. DataSetWriter is frozen.");
  1000. return UA_STATUSCODE_BADCONFIGURATIONERROR;
  1001. }
  1002. UA_WriterGroup *linkedWriterGroup = UA_WriterGroup_findWGbyId(server, dataSetWriter->linkedWriterGroup);
  1003. if(!linkedWriterGroup)
  1004. return UA_STATUSCODE_BADNOTFOUND;
  1005. if(linkedWriterGroup->config.configurationFrozen){
  1006. UA_LOG_WARNING(&server->config.logger, UA_LOGCATEGORY_SERVER,
  1007. "Remove DataSetWriter failed. WriterGroup is frozen.");
  1008. return UA_STATUSCODE_BADCONFIGURATIONERROR;
  1009. }
  1010. UA_PublishedDataSet *publishedDataSet = UA_PublishedDataSet_findPDSbyId(server, dataSetWriter->connectedDataSet);
  1011. if(!publishedDataSet)
  1012. return UA_STATUSCODE_BADNOTFOUND;
  1013. linkedWriterGroup->writersCount--;
  1014. #ifdef UA_ENABLE_PUBSUB_INFORMATIONMODEL
  1015. removeDataSetWriterRepresentation(server, dataSetWriter);
  1016. #endif
  1017. //remove DataSetWriter from group
  1018. UA_DataSetWriter_clear(server, dataSetWriter);
  1019. LIST_REMOVE(dataSetWriter, listEntry);
  1020. UA_free(dataSetWriter);
  1021. return UA_STATUSCODE_GOOD;
  1022. }
  1023. /**********************************************/
  1024. /* DataSetField */
  1025. /**********************************************/
  1026. UA_StatusCode
  1027. UA_DataSetFieldConfig_copy(const UA_DataSetFieldConfig *src, UA_DataSetFieldConfig *dst){
  1028. memcpy(dst, src, sizeof(UA_DataSetFieldConfig));
  1029. if(src->dataSetFieldType == UA_PUBSUB_DATASETFIELD_VARIABLE) {
  1030. UA_String_copy(&src->field.variable.fieldNameAlias, &dst->field.variable.fieldNameAlias);
  1031. UA_PublishedVariableDataType_copy(&src->field.variable.publishParameters,
  1032. &dst->field.variable.publishParameters);
  1033. } else {
  1034. return UA_STATUSCODE_BADNOTSUPPORTED;
  1035. }
  1036. return UA_STATUSCODE_GOOD;
  1037. }
  1038. UA_StatusCode
  1039. UA_Server_getDataSetFieldConfig(UA_Server *server, const UA_NodeId dsf,
  1040. UA_DataSetFieldConfig *config) {
  1041. UA_StatusCode retVal = UA_STATUSCODE_GOOD;
  1042. if(!config)
  1043. return UA_STATUSCODE_BADINVALIDARGUMENT;
  1044. UA_DataSetField *currentDataSetField = UA_DataSetField_findDSFbyId(server, dsf);
  1045. if(!currentDataSetField)
  1046. return UA_STATUSCODE_BADNOTFOUND;
  1047. UA_DataSetFieldConfig tmpFieldConfig;
  1048. //deep copy of the actual config
  1049. retVal |= UA_DataSetFieldConfig_copy(&currentDataSetField->config, &tmpFieldConfig);
  1050. *config = tmpFieldConfig;
  1051. return retVal;
  1052. }
  1053. UA_DataSetField *
  1054. UA_DataSetField_findDSFbyId(UA_Server *server, UA_NodeId identifier) {
  1055. for(size_t i = 0; i < server->pubSubManager.publishedDataSetsSize; i++){
  1056. UA_DataSetField *tmpField;
  1057. TAILQ_FOREACH(tmpField, &server->pubSubManager.publishedDataSets[i].fields, listEntry){
  1058. if(UA_NodeId_equal(&tmpField->identifier, &identifier)){
  1059. return tmpField;
  1060. }
  1061. }
  1062. }
  1063. return NULL;
  1064. }
  1065. void
  1066. UA_DataSetFieldConfig_clear(UA_DataSetFieldConfig *dataSetFieldConfig){
  1067. if(dataSetFieldConfig->dataSetFieldType == UA_PUBSUB_DATASETFIELD_VARIABLE){
  1068. UA_String_clear(&dataSetFieldConfig->field.variable.fieldNameAlias);
  1069. UA_PublishedVariableDataType_clear(&dataSetFieldConfig->field.variable.publishParameters);
  1070. }
  1071. }
  1072. static void
  1073. UA_DataSetField_clear(UA_DataSetField *field) {
  1074. UA_DataSetFieldConfig_clear(&field->config);
  1075. //delete DataSetField
  1076. UA_NodeId_clear(&field->identifier);
  1077. UA_NodeId_clear(&field->publishedDataSet);
  1078. UA_FieldMetaData_clear(&field->fieldMetaData);
  1079. }
  1080. /*********************************************************/
  1081. /* PublishValues handling */
  1082. /*********************************************************/
  1083. /**
  1084. * Compare two variants. Internally used for value change detection.
  1085. *
  1086. * @return true if the value has changed
  1087. */
  1088. #ifdef UA_ENABLE_PUBSUB_DELTAFRAMES
  1089. static UA_Boolean
  1090. valueChangedVariant(UA_Variant *oldValue, UA_Variant *newValue){
  1091. if(! (oldValue && newValue))
  1092. return false;
  1093. UA_ByteString *oldValueEncoding = UA_ByteString_new(), *newValueEncoding = UA_ByteString_new();
  1094. size_t oldValueEncodingSize, newValueEncodingSize;
  1095. oldValueEncodingSize = UA_calcSizeBinary(oldValue, &UA_TYPES[UA_TYPES_VARIANT]);
  1096. newValueEncodingSize = UA_calcSizeBinary(newValue, &UA_TYPES[UA_TYPES_VARIANT]);
  1097. if((oldValueEncodingSize == 0) || (newValueEncodingSize == 0))
  1098. return false;
  1099. if(oldValueEncodingSize != newValueEncodingSize)
  1100. return true;
  1101. if(UA_ByteString_allocBuffer(oldValueEncoding, oldValueEncodingSize) != UA_STATUSCODE_GOOD)
  1102. return false;
  1103. if(UA_ByteString_allocBuffer(newValueEncoding, newValueEncodingSize) != UA_STATUSCODE_GOOD)
  1104. return false;
  1105. UA_Byte *bufPosOldValue = oldValueEncoding->data;
  1106. const UA_Byte *bufEndOldValue = &oldValueEncoding->data[oldValueEncoding->length];
  1107. UA_Byte *bufPosNewValue = newValueEncoding->data;
  1108. const UA_Byte *bufEndNewValue = &newValueEncoding->data[newValueEncoding->length];
  1109. if(UA_encodeBinary(oldValue, &UA_TYPES[UA_TYPES_VARIANT],
  1110. &bufPosOldValue, &bufEndOldValue, NULL, NULL) != UA_STATUSCODE_GOOD){
  1111. return false;
  1112. }
  1113. if(UA_encodeBinary(newValue, &UA_TYPES[UA_TYPES_VARIANT],
  1114. &bufPosNewValue, &bufEndNewValue, NULL, NULL) != UA_STATUSCODE_GOOD){
  1115. return false;
  1116. }
  1117. oldValueEncoding->length = (uintptr_t)bufPosOldValue - (uintptr_t)oldValueEncoding->data;
  1118. newValueEncoding->length = (uintptr_t)bufPosNewValue - (uintptr_t)newValueEncoding->data;
  1119. UA_Boolean compareResult = !UA_ByteString_equal(oldValueEncoding, newValueEncoding);
  1120. UA_ByteString_delete(oldValueEncoding);
  1121. UA_ByteString_delete(newValueEncoding);
  1122. return compareResult;
  1123. }
  1124. #endif
  1125. /**
  1126. * Obtain the latest value for a specific DataSetField. This method is currently
  1127. * called inside the DataSetMessage generation process.
  1128. */
  1129. static void
  1130. UA_PubSubDataSetField_sampleValue(UA_Server *server, UA_DataSetField *field,
  1131. UA_DataValue *value) {
  1132. /* Read the value */
  1133. if(field->config.field.variable.staticValueSourceEnabled == UA_FALSE){
  1134. UA_ReadValueId rvid;
  1135. UA_ReadValueId_init(&rvid);
  1136. rvid.nodeId = field->config.field.variable.publishParameters.publishedVariable;
  1137. rvid.attributeId = field->config.field.variable.publishParameters.attributeId;
  1138. rvid.indexRange = field->config.field.variable.publishParameters.indexRange;
  1139. *value = UA_Server_read(server, &rvid, UA_TIMESTAMPSTORETURN_BOTH);
  1140. } else {
  1141. value->value.storageType = UA_VARIANT_DATA_NODELETE;
  1142. *value = field->config.field.variable.staticValueSource;
  1143. }
  1144. }
  1145. static UA_StatusCode
  1146. UA_PubSubDataSetWriter_generateKeyFrameMessage(UA_Server *server, UA_DataSetMessage *dataSetMessage,
  1147. UA_DataSetWriter *dataSetWriter) {
  1148. UA_PublishedDataSet *currentDataSet =
  1149. UA_PublishedDataSet_findPDSbyId(server, dataSetWriter->connectedDataSet);
  1150. if(!currentDataSet)
  1151. return UA_STATUSCODE_BADNOTFOUND;
  1152. /* Prepare DataSetMessageContent */
  1153. dataSetMessage->header.dataSetMessageValid = true;
  1154. dataSetMessage->header.dataSetMessageType = UA_DATASETMESSAGE_DATAKEYFRAME;
  1155. dataSetMessage->data.keyFrameData.fieldCount = currentDataSet->fieldSize;
  1156. dataSetMessage->data.keyFrameData.dataSetFields = (UA_DataValue *)
  1157. UA_Array_new(currentDataSet->fieldSize, &UA_TYPES[UA_TYPES_DATAVALUE]);
  1158. if(!dataSetMessage->data.keyFrameData.dataSetFields)
  1159. return UA_STATUSCODE_BADOUTOFMEMORY;
  1160. #ifdef UA_ENABLE_JSON_ENCODING
  1161. /* json: insert fieldnames used as json keys */
  1162. dataSetMessage->data.keyFrameData.fieldNames =
  1163. (UA_String *)UA_Array_new(currentDataSet->fieldSize, &UA_TYPES[UA_TYPES_STRING]);
  1164. if(!dataSetMessage->data.keyFrameData.fieldNames)
  1165. return UA_STATUSCODE_BADOUTOFMEMORY;
  1166. #endif
  1167. /* Loop over the fields */
  1168. size_t counter = 0;
  1169. UA_DataSetField *dsf;
  1170. TAILQ_FOREACH(dsf, &currentDataSet->fields, listEntry) {
  1171. #ifdef UA_ENABLE_JSON_ENCODING
  1172. /* json: store the fieldNameAlias*/
  1173. UA_String_copy(&dsf->config.field.variable.fieldNameAlias,
  1174. &dataSetMessage->data.keyFrameData.fieldNames[counter]);
  1175. #endif
  1176. /* Sample the value */
  1177. UA_DataValue *dfv = &dataSetMessage->data.keyFrameData.dataSetFields[counter];
  1178. UA_PubSubDataSetField_sampleValue(server, dsf, dfv);
  1179. /* Deactivate statuscode? */
  1180. if(((u64)dataSetWriter->config.dataSetFieldContentMask & (u64)UA_DATASETFIELDCONTENTMASK_STATUSCODE) == 0)
  1181. dfv->hasStatus = false;
  1182. /* Deactivate timestamps */
  1183. if(((u64)dataSetWriter->config.dataSetFieldContentMask &
  1184. (u64)UA_DATASETFIELDCONTENTMASK_SOURCETIMESTAMP) == 0)
  1185. dfv->hasSourceTimestamp = false;
  1186. if(((u64)dataSetWriter->config.dataSetFieldContentMask &
  1187. (u64)UA_DATASETFIELDCONTENTMASK_SOURCEPICOSECONDS) == 0)
  1188. dfv->hasSourcePicoseconds = false;
  1189. if(((u64)dataSetWriter->config.dataSetFieldContentMask &
  1190. (u64)UA_DATASETFIELDCONTENTMASK_SERVERTIMESTAMP) == 0)
  1191. dfv->hasServerTimestamp = false;
  1192. if(((u64)dataSetWriter->config.dataSetFieldContentMask &
  1193. (u64)UA_DATASETFIELDCONTENTMASK_SERVERPICOSECONDS) == 0)
  1194. dfv->hasServerPicoseconds = false;
  1195. #ifdef UA_ENABLE_PUBSUB_DELTAFRAMES
  1196. /* Update lastValue store */
  1197. UA_DataValue_clear(&dataSetWriter->lastSamples[counter].value);
  1198. UA_DataValue_copy(dfv, &dataSetWriter->lastSamples[counter].value);
  1199. #endif
  1200. counter++;
  1201. }
  1202. return UA_STATUSCODE_GOOD;
  1203. }
  1204. #ifdef UA_ENABLE_PUBSUB_DELTAFRAMES
  1205. static UA_StatusCode
  1206. UA_PubSubDataSetWriter_generateDeltaFrameMessage(UA_Server *server,
  1207. UA_DataSetMessage *dataSetMessage,
  1208. UA_DataSetWriter *dataSetWriter) {
  1209. UA_PublishedDataSet *currentDataSet =
  1210. UA_PublishedDataSet_findPDSbyId(server, dataSetWriter->connectedDataSet);
  1211. if(!currentDataSet)
  1212. return UA_STATUSCODE_BADNOTFOUND;
  1213. /* Prepare DataSetMessageContent */
  1214. memset(dataSetMessage, 0, sizeof(UA_DataSetMessage));
  1215. dataSetMessage->header.dataSetMessageValid = true;
  1216. dataSetMessage->header.dataSetMessageType = UA_DATASETMESSAGE_DATADELTAFRAME;
  1217. if (currentDataSet->fieldSize == 0) {
  1218. return UA_STATUSCODE_GOOD;
  1219. }
  1220. UA_DataSetField *dsf;
  1221. size_t counter = 0;
  1222. TAILQ_FOREACH(dsf, &currentDataSet->fields, listEntry) {
  1223. /* Sample the value */
  1224. UA_DataValue value;
  1225. UA_DataValue_init(&value);
  1226. UA_PubSubDataSetField_sampleValue(server, dsf, &value);
  1227. /* Check if the value has changed */
  1228. if(valueChangedVariant(&dataSetWriter->lastSamples[counter].value.value, &value.value)) {
  1229. /* increase fieldCount for current delta message */
  1230. dataSetMessage->data.deltaFrameData.fieldCount++;
  1231. dataSetWriter->lastSamples[counter].valueChanged = true;
  1232. /* Update last stored sample */
  1233. UA_DataValue_clear(&dataSetWriter->lastSamples[counter].value);
  1234. dataSetWriter->lastSamples[counter].value = value;
  1235. } else {
  1236. UA_DataValue_clear(&value);
  1237. dataSetWriter->lastSamples[counter].valueChanged = false;
  1238. }
  1239. counter++;
  1240. }
  1241. /* Allocate DeltaFrameFields */
  1242. UA_DataSetMessage_DeltaFrameField *deltaFields = (UA_DataSetMessage_DeltaFrameField *)
  1243. UA_calloc(dataSetMessage->data.deltaFrameData.fieldCount, sizeof(UA_DataSetMessage_DeltaFrameField));
  1244. if(!deltaFields)
  1245. return UA_STATUSCODE_BADOUTOFMEMORY;
  1246. dataSetMessage->data.deltaFrameData.deltaFrameFields = deltaFields;
  1247. size_t currentDeltaField = 0;
  1248. for(size_t i = 0; i < currentDataSet->fieldSize; i++) {
  1249. if(!dataSetWriter->lastSamples[i].valueChanged)
  1250. continue;
  1251. UA_DataSetMessage_DeltaFrameField *dff = &deltaFields[currentDeltaField];
  1252. dff->fieldIndex = (UA_UInt16) i;
  1253. UA_DataValue_copy(&dataSetWriter->lastSamples[i].value, &dff->fieldValue);
  1254. dataSetWriter->lastSamples[i].valueChanged = false;
  1255. /* Deactivate statuscode? */
  1256. if(((u64)dataSetWriter->config.dataSetFieldContentMask & (u64)UA_DATASETFIELDCONTENTMASK_STATUSCODE) == 0)
  1257. dff->fieldValue.hasStatus = false;
  1258. /* Deactivate timestamps? */
  1259. if(((u64)dataSetWriter->config.dataSetFieldContentMask & (u64)UA_DATASETFIELDCONTENTMASK_SOURCETIMESTAMP) == 0)
  1260. dff->fieldValue.hasSourceTimestamp = false;
  1261. if(((u64)dataSetWriter->config.dataSetFieldContentMask & (u64)UA_DATASETFIELDCONTENTMASK_SOURCEPICOSECONDS) == 0)
  1262. dff->fieldValue.hasServerPicoseconds = false;
  1263. if(((u64)dataSetWriter->config.dataSetFieldContentMask & (u64)UA_DATASETFIELDCONTENTMASK_SERVERTIMESTAMP) == 0)
  1264. dff->fieldValue.hasServerTimestamp = false;
  1265. if(((u64)dataSetWriter->config.dataSetFieldContentMask & (u64)UA_DATASETFIELDCONTENTMASK_SERVERPICOSECONDS) == 0)
  1266. dff->fieldValue.hasServerPicoseconds = false;
  1267. currentDeltaField++;
  1268. }
  1269. return UA_STATUSCODE_GOOD;
  1270. }
  1271. #endif
  1272. /**
  1273. * Generate a DataSetMessage for the given writer.
  1274. *
  1275. * @param dataSetWriter ptr to corresponding writer
  1276. * @return ptr to generated DataSetMessage
  1277. */
  1278. static UA_StatusCode
  1279. UA_DataSetWriter_generateDataSetMessage(UA_Server *server, UA_DataSetMessage *dataSetMessage,
  1280. UA_DataSetWriter *dataSetWriter) {
  1281. UA_PublishedDataSet *currentDataSet =
  1282. UA_PublishedDataSet_findPDSbyId(server, dataSetWriter->connectedDataSet);
  1283. if(!currentDataSet)
  1284. return UA_STATUSCODE_BADNOTFOUND;
  1285. /* Reset the message */
  1286. memset(dataSetMessage, 0, sizeof(UA_DataSetMessage));
  1287. /* store messageType to switch between json or uadp (default) */
  1288. UA_UInt16 messageType = UA_TYPES_UADPDATASETWRITERMESSAGEDATATYPE;
  1289. UA_JsonDataSetWriterMessageDataType *jsonDataSetWriterMessageDataType = NULL;
  1290. /* The configuration Flags are included
  1291. * inside the std. defined UA_UadpDataSetWriterMessageDataType */
  1292. UA_UadpDataSetWriterMessageDataType defaultUadpConfiguration;
  1293. UA_UadpDataSetWriterMessageDataType *dataSetWriterMessageDataType = NULL;
  1294. if((dataSetWriter->config.messageSettings.encoding == UA_EXTENSIONOBJECT_DECODED ||
  1295. dataSetWriter->config.messageSettings.encoding == UA_EXTENSIONOBJECT_DECODED_NODELETE) &&
  1296. (dataSetWriter->config.messageSettings.content.decoded.type ==
  1297. &UA_TYPES[UA_TYPES_UADPDATASETWRITERMESSAGEDATATYPE])) {
  1298. dataSetWriterMessageDataType = (UA_UadpDataSetWriterMessageDataType *)
  1299. dataSetWriter->config.messageSettings.content.decoded.data;
  1300. /* type is UADP */
  1301. messageType = UA_TYPES_UADPDATASETWRITERMESSAGEDATATYPE;
  1302. } else if((dataSetWriter->config.messageSettings.encoding == UA_EXTENSIONOBJECT_DECODED ||
  1303. dataSetWriter->config.messageSettings.encoding == UA_EXTENSIONOBJECT_DECODED_NODELETE) &&
  1304. (dataSetWriter->config.messageSettings.content.decoded.type ==
  1305. &UA_TYPES[UA_TYPES_JSONDATASETWRITERMESSAGEDATATYPE])) {
  1306. jsonDataSetWriterMessageDataType = (UA_JsonDataSetWriterMessageDataType *)
  1307. dataSetWriter->config.messageSettings.content.decoded.data;
  1308. /* type is JSON */
  1309. messageType = UA_TYPES_JSONDATASETWRITERMESSAGEDATATYPE;
  1310. } else {
  1311. /* create default flag configuration if no
  1312. * UadpDataSetWriterMessageDataType was passed in */
  1313. memset(&defaultUadpConfiguration, 0, sizeof(UA_UadpDataSetWriterMessageDataType));
  1314. defaultUadpConfiguration.dataSetMessageContentMask = (UA_UadpDataSetMessageContentMask)
  1315. ((u64)UA_UADPDATASETMESSAGECONTENTMASK_TIMESTAMP | (u64)UA_UADPDATASETMESSAGECONTENTMASK_MAJORVERSION |
  1316. (u64)UA_UADPDATASETMESSAGECONTENTMASK_MINORVERSION);
  1317. dataSetWriterMessageDataType = &defaultUadpConfiguration;
  1318. }
  1319. /* Sanity-test the configuration */
  1320. if(dataSetWriterMessageDataType &&
  1321. (dataSetWriterMessageDataType->networkMessageNumber != 0 ||
  1322. dataSetWriterMessageDataType->dataSetOffset != 0 ||
  1323. dataSetWriterMessageDataType->configuredSize != 0)) {
  1324. UA_LOG_WARNING(&server->config.logger, UA_LOGCATEGORY_SERVER,
  1325. "Static DSM configuration not supported. Using defaults");
  1326. dataSetWriterMessageDataType->networkMessageNumber = 0;
  1327. dataSetWriterMessageDataType->dataSetOffset = 0;
  1328. dataSetWriterMessageDataType->configuredSize = 0;
  1329. }
  1330. /* The field encoding depends on the flags inside the writer config.
  1331. * TODO: This can be moved to the encoding layer. */
  1332. if(dataSetWriter->config.dataSetFieldContentMask & (u64)UA_DATASETFIELDCONTENTMASK_RAWDATA
  1333. ) {
  1334. dataSetMessage->header.fieldEncoding = UA_FIELDENCODING_RAWDATA;
  1335. } else if((u64)dataSetWriter->config.dataSetFieldContentMask &
  1336. ((u64)UA_DATASETFIELDCONTENTMASK_SOURCETIMESTAMP | (u64)UA_DATASETFIELDCONTENTMASK_SERVERPICOSECONDS |
  1337. (u64)UA_DATASETFIELDCONTENTMASK_SOURCEPICOSECONDS | (u64)UA_DATASETFIELDCONTENTMASK_STATUSCODE)) {
  1338. dataSetMessage->header.fieldEncoding = UA_FIELDENCODING_DATAVALUE;
  1339. } else {
  1340. dataSetMessage->header.fieldEncoding = UA_FIELDENCODING_VARIANT;
  1341. }
  1342. if(messageType == UA_TYPES_UADPDATASETWRITERMESSAGEDATATYPE) {
  1343. /* Std: 'The DataSetMessageContentMask defines the flags for the content of the DataSetMessage header.' */
  1344. if((u64)dataSetWriterMessageDataType->dataSetMessageContentMask &
  1345. (u64)UA_UADPDATASETMESSAGECONTENTMASK_MAJORVERSION) {
  1346. dataSetMessage->header.configVersionMajorVersionEnabled = true;
  1347. dataSetMessage->header.configVersionMajorVersion =
  1348. currentDataSet->dataSetMetaData.configurationVersion.majorVersion;
  1349. }
  1350. if((u64)dataSetWriterMessageDataType->dataSetMessageContentMask &
  1351. (u64)UA_UADPDATASETMESSAGECONTENTMASK_MINORVERSION) {
  1352. dataSetMessage->header.configVersionMinorVersionEnabled = true;
  1353. dataSetMessage->header.configVersionMinorVersion =
  1354. currentDataSet->dataSetMetaData.configurationVersion.minorVersion;
  1355. }
  1356. if((u64)dataSetWriterMessageDataType->dataSetMessageContentMask &
  1357. (u64)UA_UADPDATASETMESSAGECONTENTMASK_SEQUENCENUMBER) {
  1358. dataSetMessage->header.dataSetMessageSequenceNrEnabled = true;
  1359. dataSetMessage->header.dataSetMessageSequenceNr =
  1360. dataSetWriter->actualDataSetMessageSequenceCount;
  1361. }
  1362. if((u64)dataSetWriterMessageDataType->dataSetMessageContentMask &
  1363. (u64)UA_UADPDATASETMESSAGECONTENTMASK_TIMESTAMP) {
  1364. dataSetMessage->header.timestampEnabled = true;
  1365. dataSetMessage->header.timestamp = UA_DateTime_now();
  1366. }
  1367. /* TODO: Picoseconds resolution not supported atm */
  1368. if((u64)dataSetWriterMessageDataType->dataSetMessageContentMask &
  1369. (u64)UA_UADPDATASETMESSAGECONTENTMASK_PICOSECONDS) {
  1370. dataSetMessage->header.picoSecondsIncluded = false;
  1371. }
  1372. /* TODO: Statuscode not supported yet */
  1373. if((u64)dataSetWriterMessageDataType->dataSetMessageContentMask &
  1374. (u64)UA_UADPDATASETMESSAGECONTENTMASK_STATUS) {
  1375. dataSetMessage->header.statusEnabled = false;
  1376. }
  1377. } else if(messageType == UA_TYPES_JSONDATASETWRITERMESSAGEDATATYPE) {
  1378. if((u64)jsonDataSetWriterMessageDataType->dataSetMessageContentMask &
  1379. (u64)UA_JSONDATASETMESSAGECONTENTMASK_METADATAVERSION) {
  1380. dataSetMessage->header.configVersionMajorVersionEnabled = true;
  1381. dataSetMessage->header.configVersionMajorVersion =
  1382. currentDataSet->dataSetMetaData.configurationVersion.majorVersion;
  1383. }
  1384. if((u64)jsonDataSetWriterMessageDataType->dataSetMessageContentMask &
  1385. (u64)UA_JSONDATASETMESSAGECONTENTMASK_METADATAVERSION) {
  1386. dataSetMessage->header.configVersionMinorVersionEnabled = true;
  1387. dataSetMessage->header.configVersionMinorVersion =
  1388. currentDataSet->dataSetMetaData.configurationVersion.minorVersion;
  1389. }
  1390. if((u64)jsonDataSetWriterMessageDataType->dataSetMessageContentMask &
  1391. (u64)UA_JSONDATASETMESSAGECONTENTMASK_SEQUENCENUMBER) {
  1392. dataSetMessage->header.dataSetMessageSequenceNrEnabled = true;
  1393. dataSetMessage->header.dataSetMessageSequenceNr =
  1394. dataSetWriter->actualDataSetMessageSequenceCount;
  1395. }
  1396. if((u64)jsonDataSetWriterMessageDataType->dataSetMessageContentMask &
  1397. (u64)UA_JSONDATASETMESSAGECONTENTMASK_TIMESTAMP) {
  1398. dataSetMessage->header.timestampEnabled = true;
  1399. dataSetMessage->header.timestamp = UA_DateTime_now();
  1400. }
  1401. /* TODO: Statuscode not supported yet */
  1402. if((u64)jsonDataSetWriterMessageDataType->dataSetMessageContentMask &
  1403. (u64)UA_JSONDATASETMESSAGECONTENTMASK_STATUS) {
  1404. dataSetMessage->header.statusEnabled = false;
  1405. }
  1406. }
  1407. /* Set the sequence count. Automatically rolls over to zero */
  1408. dataSetWriter->actualDataSetMessageSequenceCount++;
  1409. /* JSON does not differ between deltaframes and keyframes, only keyframes are currently used. */
  1410. if(messageType != UA_TYPES_JSONDATASETWRITERMESSAGEDATATYPE){
  1411. #ifdef UA_ENABLE_PUBSUB_DELTAFRAMES
  1412. /* Check if the PublishedDataSet version has changed -> if yes flush the lastValue store and send a KeyFrame */
  1413. if(dataSetWriter->connectedDataSetVersion.majorVersion != currentDataSet->dataSetMetaData.configurationVersion.majorVersion ||
  1414. dataSetWriter->connectedDataSetVersion.minorVersion != currentDataSet->dataSetMetaData.configurationVersion.minorVersion) {
  1415. /* Remove old samples */
  1416. for(size_t i = 0; i < dataSetWriter->lastSamplesCount; i++)
  1417. UA_DataValue_clear(&dataSetWriter->lastSamples[i].value);
  1418. /* Realloc pds dependent memory */
  1419. dataSetWriter->lastSamplesCount = currentDataSet->fieldSize;
  1420. UA_DataSetWriterSample *newSamplesArray = (UA_DataSetWriterSample * )
  1421. UA_realloc(dataSetWriter->lastSamples, sizeof(UA_DataSetWriterSample) * dataSetWriter->lastSamplesCount);
  1422. if(!newSamplesArray)
  1423. return UA_STATUSCODE_BADOUTOFMEMORY;
  1424. dataSetWriter->lastSamples = newSamplesArray;
  1425. memset(dataSetWriter->lastSamples, 0, sizeof(UA_DataSetWriterSample) * dataSetWriter->lastSamplesCount);
  1426. dataSetWriter->connectedDataSetVersion = currentDataSet->dataSetMetaData.configurationVersion;
  1427. UA_PubSubDataSetWriter_generateKeyFrameMessage(server, dataSetMessage, dataSetWriter);
  1428. dataSetWriter->deltaFrameCounter = 0;
  1429. return UA_STATUSCODE_GOOD;
  1430. }
  1431. /* The standard defines: if a PDS contains only one fields no delta messages
  1432. * should be generated because they need more memory than a keyframe with 1
  1433. * field. */
  1434. if(currentDataSet->fieldSize > 1 && dataSetWriter->deltaFrameCounter > 0 &&
  1435. dataSetWriter->deltaFrameCounter <= dataSetWriter->config.keyFrameCount) {
  1436. UA_PubSubDataSetWriter_generateDeltaFrameMessage(server, dataSetMessage, dataSetWriter);
  1437. dataSetWriter->deltaFrameCounter++;
  1438. return UA_STATUSCODE_GOOD;
  1439. }
  1440. dataSetWriter->deltaFrameCounter = 1;
  1441. #endif
  1442. }
  1443. UA_PubSubDataSetWriter_generateKeyFrameMessage(server, dataSetMessage, dataSetWriter);
  1444. return UA_STATUSCODE_GOOD;
  1445. }
  1446. static UA_StatusCode
  1447. sendNetworkMessageJson(UA_PubSubConnection *connection, UA_DataSetMessage *dsm,
  1448. UA_UInt16 *writerIds, UA_Byte dsmCount, UA_ExtensionObject *transportSettings) {
  1449. UA_StatusCode retval = UA_STATUSCODE_BADNOTSUPPORTED;
  1450. #ifdef UA_ENABLE_JSON_ENCODING
  1451. UA_NetworkMessage nm;
  1452. memset(&nm, 0, sizeof(UA_NetworkMessage));
  1453. nm.version = 1;
  1454. nm.networkMessageType = UA_NETWORKMESSAGE_DATASET;
  1455. nm.payloadHeaderEnabled = true;
  1456. nm.payloadHeader.dataSetPayloadHeader.count = dsmCount;
  1457. nm.payloadHeader.dataSetPayloadHeader.dataSetWriterIds = writerIds;
  1458. nm.payload.dataSetPayload.dataSetMessages = dsm;
  1459. /* Allocate the buffer. Allocate on the stack if the buffer is small. */
  1460. UA_ByteString buf;
  1461. size_t msgSize = UA_NetworkMessage_calcSizeJson(&nm, NULL, 0, NULL, 0, true);
  1462. size_t stackSize = 1;
  1463. if(msgSize <= UA_MAX_STACKBUF)
  1464. stackSize = msgSize;
  1465. UA_STACKARRAY(UA_Byte, stackBuf, stackSize);
  1466. buf.data = stackBuf;
  1467. buf.length = msgSize;
  1468. if(msgSize > UA_MAX_STACKBUF) {
  1469. retval = UA_ByteString_allocBuffer(&buf, msgSize);
  1470. if(retval != UA_STATUSCODE_GOOD)
  1471. return retval;
  1472. }
  1473. /* Encode the message */
  1474. UA_Byte *bufPos = buf.data;
  1475. memset(bufPos, 0, msgSize);
  1476. const UA_Byte *bufEnd = &buf.data[buf.length];
  1477. retval = UA_NetworkMessage_encodeJson(&nm, &bufPos, &bufEnd, NULL, 0, NULL, 0, true);
  1478. if(retval != UA_STATUSCODE_GOOD) {
  1479. if(msgSize > UA_MAX_STACKBUF)
  1480. UA_ByteString_clear(&buf);
  1481. return retval;
  1482. }
  1483. /* Send the prepared messages */
  1484. retval = connection->channel->send(connection->channel, transportSettings, &buf);
  1485. if(msgSize > UA_MAX_STACKBUF)
  1486. UA_ByteString_clear(&buf);
  1487. #endif
  1488. return retval;
  1489. }
  1490. static UA_StatusCode
  1491. sendNetworkMessage(UA_PubSubConnection *connection, UA_WriterGroup *wg,
  1492. UA_DataSetMessage *dsm, UA_UInt16 *writerIds, UA_Byte dsmCount,
  1493. UA_ExtensionObject *messageSettings,
  1494. UA_ExtensionObject *transportSettings) {
  1495. if(messageSettings->content.decoded.type !=
  1496. &UA_TYPES[UA_TYPES_UADPWRITERGROUPMESSAGEDATATYPE])
  1497. return UA_STATUSCODE_BADINTERNALERROR;
  1498. UA_UadpWriterGroupMessageDataType *wgm = (UA_UadpWriterGroupMessageDataType*)
  1499. messageSettings->content.decoded.data;
  1500. UA_NetworkMessage nm;
  1501. memset(&nm, 0, sizeof(UA_NetworkMessage));
  1502. nm.publisherIdEnabled =
  1503. ((u64)wgm->networkMessageContentMask & (u64)UA_UADPNETWORKMESSAGECONTENTMASK_PUBLISHERID) != 0;
  1504. nm.groupHeaderEnabled =
  1505. ((u64)wgm->networkMessageContentMask & (u64)UA_UADPNETWORKMESSAGECONTENTMASK_GROUPHEADER) != 0;
  1506. nm.groupHeader.writerGroupIdEnabled =
  1507. ((u64)wgm->networkMessageContentMask & (u64)UA_UADPNETWORKMESSAGECONTENTMASK_WRITERGROUPID) != 0;
  1508. nm.groupHeader.groupVersionEnabled =
  1509. ((u64)wgm->networkMessageContentMask & (u64)UA_UADPNETWORKMESSAGECONTENTMASK_GROUPVERSION) != 0;
  1510. nm.groupHeader.networkMessageNumberEnabled =
  1511. ((u64)wgm->networkMessageContentMask & (u64)UA_UADPNETWORKMESSAGECONTENTMASK_NETWORKMESSAGENUMBER) != 0;
  1512. nm.groupHeader.sequenceNumberEnabled =
  1513. ((u64)wgm->networkMessageContentMask & (u64)UA_UADPNETWORKMESSAGECONTENTMASK_SEQUENCENUMBER) != 0;
  1514. nm.payloadHeaderEnabled =
  1515. ((u64)wgm->networkMessageContentMask & (u64)UA_UADPNETWORKMESSAGECONTENTMASK_PAYLOADHEADER) != 0;
  1516. nm.timestampEnabled =
  1517. ((u64)wgm->networkMessageContentMask & (u64)UA_UADPNETWORKMESSAGECONTENTMASK_TIMESTAMP) != 0;
  1518. nm.picosecondsEnabled =
  1519. ((u64)wgm->networkMessageContentMask & (u64)UA_UADPNETWORKMESSAGECONTENTMASK_PICOSECONDS) != 0;
  1520. nm.dataSetClassIdEnabled =
  1521. ((u64)wgm->networkMessageContentMask & (u64)UA_UADPNETWORKMESSAGECONTENTMASK_DATASETCLASSID) != 0;
  1522. nm.promotedFieldsEnabled =
  1523. ((u64)wgm->networkMessageContentMask & (u64)UA_UADPNETWORKMESSAGECONTENTMASK_PROMOTEDFIELDS) != 0;
  1524. nm.version = 1;
  1525. nm.networkMessageType = UA_NETWORKMESSAGE_DATASET;
  1526. if(connection->config->publisherIdType == UA_PUBSUB_PUBLISHERID_NUMERIC) {
  1527. nm.publisherIdType = UA_PUBLISHERDATATYPE_UINT16;
  1528. nm.publisherId.publisherIdUInt32 = connection->config->publisherId.numeric;
  1529. } else if(connection->config->publisherIdType == UA_PUBSUB_PUBLISHERID_STRING){
  1530. nm.publisherIdType = UA_PUBLISHERDATATYPE_STRING;
  1531. nm.publisherId.publisherIdString = connection->config->publisherId.string;
  1532. }
  1533. /* Compute the length of the dsm separately for the header */
  1534. UA_STACKARRAY(UA_UInt16, dsmLengths, dsmCount);
  1535. for(UA_Byte i = 0; i < dsmCount; i++)
  1536. dsmLengths[i] = (UA_UInt16)UA_DataSetMessage_calcSizeBinary(&dsm[i]);
  1537. nm.payloadHeader.dataSetPayloadHeader.count = dsmCount;
  1538. nm.payloadHeader.dataSetPayloadHeader.dataSetWriterIds = writerIds;
  1539. nm.groupHeader.writerGroupId = wg->config.writerGroupId;
  1540. nm.groupHeader.networkMessageNumber = 1;
  1541. nm.payload.dataSetPayload.sizes = dsmLengths;
  1542. nm.payload.dataSetPayload.dataSetMessages = dsm;
  1543. /* Allocate the buffer. Allocate on the stack if the buffer is small. */
  1544. UA_ByteString buf;
  1545. size_t msgSize = UA_NetworkMessage_calcSizeBinary(&nm);
  1546. size_t stackSize = 1;
  1547. if(msgSize <= UA_MAX_STACKBUF)
  1548. stackSize = msgSize;
  1549. UA_STACKARRAY(UA_Byte, stackBuf, stackSize);
  1550. buf.data = stackBuf;
  1551. buf.length = msgSize;
  1552. UA_StatusCode retval;
  1553. if(msgSize > UA_MAX_STACKBUF) {
  1554. retval = UA_ByteString_allocBuffer(&buf, msgSize);
  1555. if(retval != UA_STATUSCODE_GOOD)
  1556. return retval;
  1557. }
  1558. /* Encode the message */
  1559. UA_Byte *bufPos = buf.data;
  1560. memset(bufPos, 0, msgSize);
  1561. const UA_Byte *bufEnd = &buf.data[buf.length];
  1562. retval = UA_NetworkMessage_encodeBinary(&nm, &bufPos, bufEnd);
  1563. if(retval != UA_STATUSCODE_GOOD) {
  1564. if(msgSize > UA_MAX_STACKBUF)
  1565. UA_ByteString_clear(&buf);
  1566. return retval;
  1567. }
  1568. /* Send the prepared messages */
  1569. retval = connection->channel->send(connection->channel, transportSettings, &buf);
  1570. if(msgSize > UA_MAX_STACKBUF)
  1571. UA_ByteString_clear(&buf);
  1572. return retval;
  1573. }
  1574. /* This callback triggers the collection and publish of NetworkMessages and the
  1575. * contained DataSetMessages. */
  1576. void
  1577. UA_WriterGroup_publishCallback(UA_Server *server, UA_WriterGroup *writerGroup) {
  1578. UA_LOG_DEBUG(&server->config.logger, UA_LOGCATEGORY_SERVER, "Publish Callback");
  1579. if(!writerGroup) {
  1580. UA_LOG_WARNING(&server->config.logger, UA_LOGCATEGORY_SERVER,
  1581. "Publish failed. WriterGroup not found");
  1582. return;
  1583. }
  1584. /* Nothing to do? */
  1585. if(writerGroup->writersCount <= 0)
  1586. return;
  1587. /* Binary or Json encoding? */
  1588. if(writerGroup->config.encodingMimeType != UA_PUBSUB_ENCODING_UADP &&
  1589. writerGroup->config.encodingMimeType != UA_PUBSUB_ENCODING_JSON) {
  1590. UA_LOG_WARNING(&server->config.logger, UA_LOGCATEGORY_SERVER,
  1591. "Publish failed: Unknown encoding type.");
  1592. return;
  1593. }
  1594. /* Find the connection associated with the writer */
  1595. UA_PubSubConnection *connection =
  1596. UA_PubSubConnection_findConnectionbyId(server, writerGroup->linkedConnection);
  1597. if(!connection) {
  1598. UA_LOG_WARNING(&server->config.logger, UA_LOGCATEGORY_SERVER,
  1599. "Publish failed. PubSubConnection invalid.");
  1600. return;
  1601. }
  1602. /* How many DSM can be sent in one NM? */
  1603. UA_Byte maxDSM = (UA_Byte)writerGroup->config.maxEncapsulatedDataSetMessageCount;
  1604. if(writerGroup->config.maxEncapsulatedDataSetMessageCount > UA_BYTE_MAX)
  1605. maxDSM = UA_BYTE_MAX;
  1606. /* If the maxEncapsulatedDataSetMessageCount is set to 0->1 */
  1607. if(maxDSM == 0)
  1608. maxDSM = 1;
  1609. /* It is possible to put several DataSetMessages into one NetworkMessage.
  1610. * But only if they do not contain promoted fields. NM with only DSM are
  1611. * sent out right away. The others are kept in a buffer for "batching". */
  1612. size_t dsmCount = 0;
  1613. UA_DataSetWriter *dsw;
  1614. UA_STACKARRAY(UA_UInt16, dsWriterIds, writerGroup->writersCount);
  1615. UA_STACKARRAY(UA_DataSetMessage, dsmStore, writerGroup->writersCount);
  1616. LIST_FOREACH(dsw, &writerGroup->writers, listEntry) {
  1617. /* Find the dataset */
  1618. UA_PublishedDataSet *pds =
  1619. UA_PublishedDataSet_findPDSbyId(server, dsw->connectedDataSet);
  1620. if(!pds) {
  1621. UA_LOG_WARNING(&server->config.logger, UA_LOGCATEGORY_SERVER,
  1622. "PubSub Publish: PublishedDataSet not found");
  1623. continue;
  1624. }
  1625. /* Generate the DSM */
  1626. UA_StatusCode res =
  1627. UA_DataSetWriter_generateDataSetMessage(server, &dsmStore[dsmCount], dsw);
  1628. if(res != UA_STATUSCODE_GOOD) {
  1629. UA_LOG_WARNING(&server->config.logger, UA_LOGCATEGORY_SERVER,
  1630. "PubSub Publish: DataSetMessage creation failed");
  1631. continue;
  1632. }
  1633. /* Send right away if there is only this DSM in a NM. If promoted fields
  1634. * are contained in the PublishedDataSet, then this DSM must go into a
  1635. * dedicated NM as well. */
  1636. if(pds->promotedFieldsCount > 0 || maxDSM == 1) {
  1637. if(writerGroup->config.encodingMimeType == UA_PUBSUB_ENCODING_UADP){
  1638. res = sendNetworkMessage(connection, writerGroup, &dsmStore[dsmCount],
  1639. &dsw->config.dataSetWriterId, 1,
  1640. &writerGroup->config.messageSettings,
  1641. &writerGroup->config.transportSettings);
  1642. }else if(writerGroup->config.encodingMimeType == UA_PUBSUB_ENCODING_JSON){
  1643. res = sendNetworkMessageJson(connection, &dsmStore[dsmCount],
  1644. &dsw->config.dataSetWriterId, 1, &writerGroup->config.transportSettings);
  1645. }
  1646. if(res != UA_STATUSCODE_GOOD)
  1647. UA_LOG_WARNING(&server->config.logger, UA_LOGCATEGORY_SERVER,
  1648. "PubSub Publish: Could not send a NetworkMessage");
  1649. UA_DataSetMessage_free(&dsmStore[dsmCount]);
  1650. continue;
  1651. }
  1652. dsWriterIds[dsmCount] = dsw->config.dataSetWriterId;
  1653. dsmCount++;
  1654. }
  1655. /* Send the NetworkMessages with batched DataSetMessages */
  1656. size_t nmCount = (dsmCount / maxDSM) + ((dsmCount % maxDSM) == 0 ? 0 : 1);
  1657. for(UA_UInt32 i = 0; i < nmCount; i++) {
  1658. UA_Byte nmDsmCount = maxDSM;
  1659. if(i == nmCount - 1 && (dsmCount % maxDSM))
  1660. nmDsmCount = (UA_Byte)dsmCount % maxDSM;
  1661. UA_StatusCode res3 = UA_STATUSCODE_GOOD;
  1662. if(writerGroup->config.encodingMimeType == UA_PUBSUB_ENCODING_UADP){
  1663. res3 = sendNetworkMessage(connection, writerGroup, &dsmStore[i * maxDSM],
  1664. &dsWriterIds[i * maxDSM], nmDsmCount,
  1665. &writerGroup->config.messageSettings,
  1666. &writerGroup->config.transportSettings);
  1667. }else if(writerGroup->config.encodingMimeType == UA_PUBSUB_ENCODING_JSON){
  1668. res3 = sendNetworkMessageJson(connection, &dsmStore[i * maxDSM],
  1669. &dsWriterIds[i * maxDSM], nmDsmCount, &writerGroup->config.transportSettings);
  1670. }
  1671. if(res3 != UA_STATUSCODE_GOOD)
  1672. UA_LOG_WARNING(&server->config.logger, UA_LOGCATEGORY_SERVER,
  1673. "PubSub Publish: Sending a NetworkMessage failed");
  1674. }
  1675. /* Clean up DSM */
  1676. for(size_t i = 0; i < dsmCount; i++)
  1677. UA_DataSetMessage_free(&dsmStore[i]);
  1678. }
  1679. /* Add new publishCallback. The first execution is triggered directly after
  1680. * creation. */
  1681. UA_StatusCode
  1682. UA_WriterGroup_addPublishCallback(UA_Server *server, UA_WriterGroup *writerGroup) {
  1683. UA_StatusCode retval =
  1684. UA_PubSubManager_addRepeatedCallback(server,
  1685. (UA_ServerCallback) UA_WriterGroup_publishCallback,
  1686. writerGroup, writerGroup->config.publishingInterval,
  1687. &writerGroup->publishCallbackId);
  1688. if(retval == UA_STATUSCODE_GOOD)
  1689. writerGroup->publishCallbackIsRegistered = true;
  1690. /* Run once after creation */
  1691. UA_WriterGroup_publishCallback(server, writerGroup);
  1692. return retval;
  1693. }
  1694. #endif /* UA_ENABLE_PUBSUB */