ua_pubsub.c 105 KB


  1. /* This Source Code Form is subject to the terms of the Mozilla Public
  2. * License, v. 2.0. If a copy of the MPL was not distributed with this
  3. * file, You can obtain one at http://mozilla.org/MPL/2.0/.
  4. *
  5. * Copyright (c) 2017-2018 Fraunhofer IOSB (Author: Andreas Ebner)
  6. * Copyright (c) 2019 Fraunhofer IOSB (Author: Julius Pfrommer)
  7. * Copyright (c) 2019 Kalycito Infotech Private Limited
  8. */
  9. #include <open62541/server_config_default.h>
  10. #include "server/ua_server_internal.h"
  11. #ifdef UA_ENABLE_PUBSUB /* conditional compilation */
  12. #include "ua_pubsub.h"
  13. #ifdef UA_ENABLE_PUBSUB_INFORMATIONMODEL
  14. #include "ua_pubsub_ns0.h"
  15. #endif
  16. #ifdef UA_ENABLE_PUBSUB_DELTAFRAMES
  17. #include "ua_types_encoding_binary.h"
  18. #endif
  19. #define UA_MAX_STACKBUF 512 /* Max size of network messages on the stack */
  20. #define UA_MAX_SIZENAME 64 /* Max size of Qualified Name of Subscribed Variable */
  21. /* Forward declaration */
  22. static size_t
  23. iterateVariableAttrDimension(UA_VariableAttributes varAttr);
  24. static void
  25. UA_WriterGroup_deleteMembers(UA_Server *server, UA_WriterGroup *writerGroup);
  26. static void
  27. UA_DataSetField_deleteMembers(UA_DataSetField *field);
  28. /**********************************************/
  29. /* Connection */
  30. /**********************************************/
  31. UA_StatusCode
  32. UA_PubSubConnectionConfig_copy(const UA_PubSubConnectionConfig *src,
  33. UA_PubSubConnectionConfig *dst) {
  34. UA_StatusCode retVal = UA_STATUSCODE_GOOD;
  35. memcpy(dst, src, sizeof(UA_PubSubConnectionConfig));
  36. retVal |= UA_String_copy(&src->name, &dst->name);
  37. retVal |= UA_Variant_copy(&src->address, &dst->address);
  38. retVal |= UA_String_copy(&src->transportProfileUri, &dst->transportProfileUri);
  39. retVal |= UA_Variant_copy(&src->connectionTransportSettings, &dst->connectionTransportSettings);
  40. if(src->connectionPropertiesSize > 0){
  41. dst->connectionProperties = (UA_KeyValuePair *)
  42. UA_calloc(src->connectionPropertiesSize, sizeof(UA_KeyValuePair));
  43. if(!dst->connectionProperties){
  44. return UA_STATUSCODE_BADOUTOFMEMORY;
  45. }
  46. for(size_t i = 0; i < src->connectionPropertiesSize; i++){
  47. retVal |= UA_QualifiedName_copy(&src->connectionProperties[i].key,
  48. &dst->connectionProperties[i].key);
  49. retVal |= UA_Variant_copy(&src->connectionProperties[i].value,
  50. &dst->connectionProperties[i].value);
  51. }
  52. }
  53. return retVal;
  54. }
  55. UA_StatusCode
  56. UA_Server_getPubSubConnectionConfig(UA_Server *server, const UA_NodeId connection,
  57. UA_PubSubConnectionConfig *config) {
  58. if(!config)
  59. return UA_STATUSCODE_BADINVALIDARGUMENT;
  60. UA_PubSubConnection *currentPubSubConnection =
  61. UA_PubSubConnection_findConnectionbyId(server, connection);
  62. if(!currentPubSubConnection)
  63. return UA_STATUSCODE_BADNOTFOUND;
  64. UA_PubSubConnectionConfig tmpPubSubConnectionConfig;
  65. //deep copy of the actual config
  66. UA_PubSubConnectionConfig_copy(currentPubSubConnection->config, &tmpPubSubConnectionConfig);
  67. *config = tmpPubSubConnectionConfig;
  68. return UA_STATUSCODE_GOOD;
  69. }
  70. UA_PubSubConnection *
  71. UA_PubSubConnection_findConnectionbyId(UA_Server *server, UA_NodeId connectionIdentifier) {
  72. for(size_t i = 0; i < server->pubSubManager.connectionsSize; i++){
  73. if(UA_NodeId_equal(&connectionIdentifier, &server->pubSubManager.connections[i].identifier)){
  74. return &server->pubSubManager.connections[i];
  75. }
  76. }
  77. return NULL;
  78. }
  79. void
  80. UA_PubSubConnectionConfig_deleteMembers(UA_PubSubConnectionConfig *connectionConfig) {
  81. UA_String_deleteMembers(&connectionConfig->name);
  82. UA_String_deleteMembers(&connectionConfig->transportProfileUri);
  83. UA_Variant_deleteMembers(&connectionConfig->connectionTransportSettings);
  84. UA_Variant_deleteMembers(&connectionConfig->address);
  85. for(size_t i = 0; i < connectionConfig->connectionPropertiesSize; i++){
  86. UA_QualifiedName_deleteMembers(&connectionConfig->connectionProperties[i].key);
  87. UA_Variant_deleteMembers(&connectionConfig->connectionProperties[i].value);
  88. }
  89. UA_free(connectionConfig->connectionProperties);
  90. }
  91. void
  92. UA_PubSubConnection_deleteMembers(UA_Server *server, UA_PubSubConnection *connection) {
  93. //delete connection config
  94. UA_PubSubConnectionConfig_deleteMembers(connection->config);
  95. //remove contained WriterGroups
  96. UA_WriterGroup *writerGroup, *tmpWriterGroup;
  97. LIST_FOREACH_SAFE(writerGroup, &connection->writerGroups, listEntry, tmpWriterGroup){
  98. UA_Server_removeWriterGroup(server, writerGroup->identifier);
  99. }
  100. /* remove contained ReaderGroups */
  101. UA_ReaderGroup *readerGroups, *tmpReaderGroup;
  102. LIST_FOREACH_SAFE(readerGroups, &connection->readerGroups, listEntry, tmpReaderGroup){
  103. UA_Server_removeReaderGroup(server, readerGroups->identifier);
  104. }
  105. UA_NodeId_deleteMembers(&connection->identifier);
  106. if(connection->channel){
  107. connection->channel->close(connection->channel);
  108. }
  109. UA_free(connection->config);
  110. }
  111. UA_StatusCode
  112. UA_Server_addWriterGroup(UA_Server *server, const UA_NodeId connection,
  113. const UA_WriterGroupConfig *writerGroupConfig,
  114. UA_NodeId *writerGroupIdentifier) {
  115. UA_StatusCode retVal = UA_STATUSCODE_GOOD;
  116. if(!writerGroupConfig)
  117. return UA_STATUSCODE_BADINVALIDARGUMENT;
  118. //search the connection by the given connectionIdentifier
  119. UA_PubSubConnection *currentConnectionContext =
  120. UA_PubSubConnection_findConnectionbyId(server, connection);
  121. if(!currentConnectionContext)
  122. return UA_STATUSCODE_BADNOTFOUND;
  123. //allocate memory for new WriterGroup
  124. UA_WriterGroup *newWriterGroup = (UA_WriterGroup *) UA_calloc(1, sizeof(UA_WriterGroup));
  125. if(!newWriterGroup)
  126. return UA_STATUSCODE_BADOUTOFMEMORY;
  127. newWriterGroup->linkedConnection = currentConnectionContext->identifier;
  128. UA_PubSubManager_generateUniqueNodeId(server, &newWriterGroup->identifier);
  129. if(writerGroupIdentifier){
  130. UA_NodeId_copy(&newWriterGroup->identifier, writerGroupIdentifier);
  131. }
  132. //deep copy of the config
  133. UA_WriterGroupConfig tmpWriterGroupConfig;
  134. retVal |= UA_WriterGroupConfig_copy(writerGroupConfig, &tmpWriterGroupConfig);
  135. if(!tmpWriterGroupConfig.messageSettings.content.decoded.type) {
  136. UA_UadpWriterGroupMessageDataType *wgm = UA_UadpWriterGroupMessageDataType_new();
  137. tmpWriterGroupConfig.messageSettings.content.decoded.data = wgm;
  138. tmpWriterGroupConfig.messageSettings.content.decoded.type =
  139. &UA_TYPES[UA_TYPES_UADPWRITERGROUPMESSAGEDATATYPE];
  140. tmpWriterGroupConfig.messageSettings.encoding = UA_EXTENSIONOBJECT_DECODED;
  141. }
  142. newWriterGroup->config = tmpWriterGroupConfig;
  143. retVal |= UA_WriterGroup_addPublishCallback(server, newWriterGroup);
  144. LIST_INSERT_HEAD(&currentConnectionContext->writerGroups, newWriterGroup, listEntry);
  145. #ifdef UA_ENABLE_PUBSUB_INFORMATIONMODEL
  146. addWriterGroupRepresentation(server, newWriterGroup);
  147. #endif
  148. return retVal;
  149. }
  150. UA_StatusCode
  151. UA_Server_removeWriterGroup(UA_Server *server, const UA_NodeId writerGroup){
  152. UA_WriterGroup *wg = UA_WriterGroup_findWGbyId(server, writerGroup);
  153. if(!wg)
  154. return UA_STATUSCODE_BADNOTFOUND;
  155. UA_PubSubConnection *connection =
  156. UA_PubSubConnection_findConnectionbyId(server, wg->linkedConnection);
  157. if(!connection)
  158. return UA_STATUSCODE_BADNOTFOUND;
  159. //unregister the publish callback
  160. UA_PubSubManager_removeRepeatedPubSubCallback(server, wg->publishCallbackId);
  161. #ifdef UA_ENABLE_PUBSUB_INFORMATIONMODEL
  162. removeGroupRepresentation(server, wg);
  163. #endif
  164. UA_WriterGroup_deleteMembers(server, wg);
  165. LIST_REMOVE(wg, listEntry);
  166. UA_free(wg);
  167. return UA_STATUSCODE_GOOD;
  168. }
  169. /**********************************************/
  170. /* ReaderGroup */
  171. /**********************************************/
  172. /**
  173. * Add ReaderGroup to connection.
  174. *
  175. * @param server
  176. * @param connectionIdentifier
  177. * @param readerGroupConfiguration
  178. * @param readerGroupIdentifier
  179. * @return UA_STATUSCODE_GOOD on success
  180. */
  181. UA_StatusCode
  182. UA_Server_addReaderGroup(UA_Server *server, UA_NodeId connectionIdentifier,
  183. const UA_ReaderGroupConfig *readerGroupConfig,
  184. UA_NodeId *readerGroupIdentifier) {
  185. UA_StatusCode retval = UA_STATUSCODE_GOOD;
  186. UA_ReaderGroupConfig tmpReaderGroupConfig;
  187. /* Search the connection by the given connectionIdentifier */
  188. if(!readerGroupConfig) {
  189. return UA_STATUSCODE_BADINVALIDARGUMENT;
  190. }
  191. /* Search the connection by the given connectionIdentifier */
  192. UA_PubSubConnection *currentConnectionContext = UA_PubSubConnection_findConnectionbyId(server, connectionIdentifier);
  193. if(!currentConnectionContext) {
  194. return UA_STATUSCODE_BADNOTFOUND;
  195. }
  196. /* Allocate memory for new reader group */
  197. UA_ReaderGroup *newGroup = (UA_ReaderGroup *)UA_calloc(1, sizeof(UA_ReaderGroup));
  198. if(!newGroup) {
  199. return UA_STATUSCODE_BADOUTOFMEMORY;
  200. }
  201. /* Generate nodeid for the readergroup identifier */
  202. newGroup->linkedConnection = currentConnectionContext->identifier;
  203. UA_PubSubManager_generateUniqueNodeId(server, &newGroup->identifier);
  204. if(readerGroupIdentifier) {
  205. UA_NodeId_copy(&newGroup->identifier, readerGroupIdentifier);
  206. }
  207. /* Deep copy of the config */
  208. retval |= UA_ReaderGroupConfig_copy(readerGroupConfig, &tmpReaderGroupConfig);
  209. newGroup->config = tmpReaderGroupConfig;
  210. retval |= UA_ReaderGroup_addSubscribeCallback(server, newGroup);
  211. LIST_INSERT_HEAD(&currentConnectionContext->readerGroups, newGroup, listEntry);
  212. currentConnectionContext->readerGroupsSize++;
  213. #ifdef UA_ENABLE_PUBSUB_INFORMATIONMODEL
  214. addReaderGroupRepresentation(server, newGroup);
  215. #endif
  216. return retval;
  217. }
  218. /**
  219. * Remove ReaderGroup from connection and delete contained readers.
  220. *
  221. * @param server
  222. * @param groupIdentifier
  223. * @return UA_STATUSCODE_GOOD on success
  224. */
  225. UA_StatusCode
  226. UA_Server_removeReaderGroup(UA_Server *server, UA_NodeId groupIdentifier) {
  227. UA_ReaderGroup* readerGroup = UA_ReaderGroup_findRGbyId(server, groupIdentifier);
  228. if(readerGroup == NULL) {
  229. return UA_STATUSCODE_BADNOTFOUND;
  230. }
  231. /* Search the connection to which the given readergroup is connected to */
  232. UA_PubSubConnection *connection = UA_PubSubConnection_findConnectionbyId(server, readerGroup->linkedConnection);
  233. if(connection == NULL) {
  234. return UA_STATUSCODE_BADNOTFOUND;
  235. }
  236. #ifdef UA_ENABLE_PUBSUB_INFORMATIONMODEL
  237. /* To Do:RemoveGroupRepresentation(server, &readerGroup->identifier) */
  238. #endif
  239. /* UA_Server_ReaderGroup_delete also removes itself from the list */
  240. UA_Server_ReaderGroup_delete(server, readerGroup);
  241. /* Remove readerGroup from Connection */
  242. LIST_REMOVE(readerGroup, listEntry);
  243. UA_free(readerGroup);
  244. return UA_STATUSCODE_GOOD;
  245. }
  246. /**
  247. * To Do:
  248. * Update ReaderGroup configuration.
  249. *
  250. * @param server
  251. * @param readerGroupIdentifier
  252. * @param readerGroupConfiguration
  253. * @return UA_STATUSCODE_GOOD on success
  254. */
  255. UA_StatusCode
  256. UA_Server_ReaderGroup_updateConfig(UA_Server *server, UA_NodeId readerGroupIdentifier,
  257. const UA_ReaderGroupConfig *config) {
  258. return UA_STATUSCODE_BADNOTIMPLEMENTED;
  259. }
  260. /**
  261. * Get ReaderGroup configuration.
  262. *
  263. * @param server
  264. * @param groupIdentifier
  265. * @param readerGroupConfiguration
  266. * @return UA_STATUSCODE_GOOD on success
  267. */
  268. UA_StatusCode
  269. UA_Server_ReaderGroup_getConfig(UA_Server *server, UA_NodeId readerGroupIdentifier,
  270. UA_ReaderGroupConfig *config) {
  271. if(!config) {
  272. return UA_STATUSCODE_BADINVALIDARGUMENT;
  273. }
  274. /* Identify the readergroup through the readerGroupIdentifier */
  275. UA_ReaderGroup *currentReaderGroup = UA_ReaderGroup_findRGbyId(server, readerGroupIdentifier);
  276. if(!currentReaderGroup) {
  277. return UA_STATUSCODE_BADNOTFOUND;
  278. }
  279. UA_ReaderGroupConfig tmpReaderGroupConfig;
  280. /* deep copy of the actual config */
  281. UA_ReaderGroupConfig_copy(&currentReaderGroup->config, &tmpReaderGroupConfig);
  282. *config = tmpReaderGroupConfig;
  283. return UA_STATUSCODE_GOOD;
  284. }
  285. /* To Do UA_ReaderGroupConfig delete */
  286. /**
  287. * Delete ReaderGroup.
  288. *
  289. * @param server
  290. * @param groupIdentifier
  291. */
  292. void UA_Server_ReaderGroup_delete(UA_Server* server, UA_ReaderGroup *readerGroup) {
  293. /* To Do Call UA_ReaderGroupConfig_delete */
  294. UA_DataSetReader *dataSetReader, *tmpDataSetReader;
  295. LIST_FOREACH_SAFE(dataSetReader, &readerGroup->readers, listEntry, tmpDataSetReader) {
  296. UA_DataSetReader_delete(server, dataSetReader);
  297. }
  298. UA_PubSubConnection* pConn = UA_PubSubConnection_findConnectionbyId(server, readerGroup->linkedConnection);
  299. if(pConn != NULL) {
  300. pConn->readerGroupsSize--;
  301. }
  302. /* Delete ReaderGroup and its members */
  303. UA_String_deleteMembers(&readerGroup->config.name);
  304. UA_NodeId_deleteMembers(&readerGroup->linkedConnection);
  305. UA_NodeId_deleteMembers(&readerGroup->identifier);
  306. }
  307. /**
  308. * Copy ReaderGroup configuration.
  309. *
  310. * @param source
  311. * @param destination
  312. * @return UA_STATUSCODE_GOOD on success
  313. */
  314. UA_StatusCode
  315. UA_ReaderGroupConfig_copy(const UA_ReaderGroupConfig *src,
  316. UA_ReaderGroupConfig *dst) {
  317. UA_String_copy(&src->name, &dst->name);
  318. /* Currently simple memcpy only */
  319. memcpy(&dst->securityParameters, &src->securityParameters, sizeof(UA_PubSubSecurityParameters));
  320. return UA_STATUSCODE_GOOD;
  321. }
  322. /**
  323. * Process NetworkMessage.
  324. *
  325. * @param server
  326. * @param networkmessage
  327. * @return UA_STATUSCODE_GOOD on success
  328. */
  329. UA_StatusCode
  330. UA_Server_processNetworkMessage(UA_Server *server, UA_NetworkMessage *pMsg,
  331. UA_PubSubConnection *pConnection) {
  332. UA_StatusCode retval = UA_STATUSCODE_BADNOTIMPLEMENTED;
  333. if((pMsg == NULL) || (pConnection == NULL)) {
  334. return UA_STATUSCODE_BADINVALIDARGUMENT;
  335. }
  336. UA_Boolean publisherIdAvailable = false;
  337. if(pMsg->publisherIdEnabled) {
  338. publisherIdAvailable = true;
  339. }
  340. /* To Do The condition with dataSetWriterIdAvailable and WriterGroupIdAvailable to be handled
  341. * when pMsg->groupHeaderEnabled, pMsg->dataSetClassIdEnabled, pMsg->payloadHeaderEnabled
  342. * Here some filtering is possible */
  343. UA_Byte anzDataSets = 1;
  344. if(pMsg->payloadHeaderEnabled) {
  345. anzDataSets = pMsg->payloadHeader.dataSetPayloadHeader.count;
  346. }
  347. UA_DataSetReader* dataSetReaderErg = NULL;
  348. if(pConnection->readerGroupsSize == 1) {
  349. if(LIST_FIRST(&pConnection->readerGroups)->readersCount == 1) {
  350. UA_LOG_INFO(&server->config.logger, UA_LOGCATEGORY_SERVER, "only 1 DataSetReader available. This one will be used.");
  351. dataSetReaderErg = LIST_FIRST(&LIST_FIRST(&pConnection->readerGroups)->readers);
  352. }
  353. }
  354. else {
  355. UA_DataSetReader *tmpReader;
  356. if(publisherIdAvailable) {
  357. UA_ReaderGroup* readerGroup = NULL;
  358. LIST_FOREACH(readerGroup, &pConnection->readerGroups, listEntry) {
  359. LIST_FOREACH(tmpReader, &readerGroup->readers, listEntry) {
  360. if(publisherIdAvailable && (tmpReader->config.publisherId.type != NULL)) {
  361. switch (pMsg->publisherIdType) {
  362. case UA_PUBLISHERDATATYPE_BYTE:
  363. if(tmpReader->config.publisherId.type == &UA_TYPES[UA_TYPES_BYTE] &&
  364. pMsg->publisherIdType == UA_PUBLISHERDATATYPE_BYTE &&
  365. pMsg->publisherId.publisherIdByte == *(UA_Byte*)tmpReader->config.publisherId.data) {
  366. dataSetReaderErg = tmpReader;
  367. }
  368. break;
  369. case UA_PUBLISHERDATATYPE_UINT16:
  370. if(tmpReader->config.publisherId.type == &UA_TYPES[UA_TYPES_UINT16] &&
  371. pMsg->publisherIdType == UA_PUBLISHERDATATYPE_UINT16 &&
  372. pMsg->publisherId.publisherIdUInt16 == *(UA_UInt16*)tmpReader->config.publisherId.data) {
  373. UA_LOG_INFO(&server->config.logger, UA_LOGCATEGORY_SERVER, "DataSetReader found with PublisherId");
  374. dataSetReaderErg = tmpReader;
  375. }
  376. break;
  377. case UA_PUBLISHERDATATYPE_UINT32:
  378. if(tmpReader->config.publisherId.type == &UA_TYPES[UA_TYPES_UINT32] &&
  379. pMsg->publisherIdType == UA_PUBLISHERDATATYPE_UINT32 &&
  380. pMsg->publisherId.publisherIdUInt32 == *(UA_UInt32*)tmpReader->config.publisherId.data) {
  381. UA_LOG_INFO(&server->config.logger, UA_LOGCATEGORY_SERVER, "DataSetReader found with PublisherId");
  382. dataSetReaderErg = tmpReader;
  383. }
  384. break;
  385. case UA_PUBLISHERDATATYPE_UINT64:
  386. if(tmpReader->config.publisherId.type == &UA_TYPES[UA_TYPES_UINT64] &&
  387. pMsg->publisherIdType == UA_PUBLISHERDATATYPE_UINT64 &&
  388. pMsg->publisherId.publisherIdUInt64 == *(UA_UInt64*)tmpReader->config.publisherId.data) {
  389. UA_LOG_INFO(&server->config.logger, UA_LOGCATEGORY_SERVER, "DataSetReader found with PublisherId");
  390. dataSetReaderErg = tmpReader;
  391. }
  392. break;
  393. case UA_PUBLISHERDATATYPE_STRING:
  394. if(tmpReader->config.publisherId.type == &UA_TYPES[UA_TYPES_STRING] &&
  395. pMsg->publisherIdType == UA_PUBLISHERDATATYPE_STRING &&
  396. UA_String_equal(&pMsg->publisherId.publisherIdString, (UA_String*)tmpReader->config.publisherId.data)) {
  397. UA_LOG_INFO(&server->config.logger, UA_LOGCATEGORY_SERVER, "DataSetReader found with PublisherId");
  398. dataSetReaderErg = tmpReader;
  399. }
  400. break;
  401. }
  402. }
  403. /* to break out of LIST_FOREACH loop */
  404. if(dataSetReaderErg!= NULL) {
  405. break;
  406. }
  407. }
  408. }
  409. }
  410. }
  411. for(UA_Byte iterator = 0; iterator < anzDataSets; iterator++) {
  412. UA_LOG_INFO(&server->config.logger, UA_LOGCATEGORY_SERVER, "Process Msg with DataSetReader!");
  413. UA_Server_DataSetReader_process(server, dataSetReaderErg, &pMsg->payload.dataSetPayload.dataSetMessages[iterator]);
  414. }
  415. /* To Do the condition with dataSetWriterId and WriterGroupId
  416. * else condition for dataSetWriterIdAvailable and writerGroupIdAvailable) */
  417. return retval;
  418. }
  419. /**
  420. * Find size iterator - array dimension for VariableAttributes
  421. *
  422. * @param VariableAttributes
  423. * @return global variable size iterator
  424. */
  425. static size_t
  426. iterateVariableAttrDimension (UA_VariableAttributes varAttr) {
  427. size_t sizeIterator = 0;
  428. for(size_t indexIterator = 0; indexIterator < varAttr.arrayDimensionsSize; indexIterator++) {
  429. sizeIterator += varAttr.arrayDimensions[indexIterator];
  430. }
  431. return sizeIterator;
  432. }
  433. /**
  434. * Find ReaderGroup with its identifier.
  435. *
  436. * @param server
  437. * @param groupIdentifier
  438. * @return the ReaderGroup or NULL if not found
  439. */
  440. UA_ReaderGroup * UA_ReaderGroup_findRGbyId(UA_Server *server, UA_NodeId identifier) {
  441. for (size_t iteratorConn = 0; iteratorConn < server->pubSubManager.connectionsSize; iteratorConn++) {
  442. UA_ReaderGroup* readerGroup = NULL;
  443. LIST_FOREACH(readerGroup, &server->pubSubManager.connections[iteratorConn].readerGroups, listEntry) {
  444. if(UA_NodeId_equal(&identifier, &readerGroup->identifier)) {
  445. return readerGroup;
  446. }
  447. }
  448. }
  449. return NULL;
  450. }
  451. /**
  452. * Find a DataSetReader with its identifier
  453. *
  454. * @param server
  455. * @param identifier
  456. * @return the DataSetReader or NULL if not found
  457. */
  458. UA_DataSetReader *UA_ReaderGroup_findDSRbyId(UA_Server *server, UA_NodeId identifier) {
  459. for (size_t iteratorConn = 0; iteratorConn < server->pubSubManager.connectionsSize; iteratorConn++) {
  460. UA_ReaderGroup* readerGroup = NULL;
  461. LIST_FOREACH(readerGroup, &server->pubSubManager.connections[iteratorConn].readerGroups, listEntry) {
  462. UA_DataSetReader *tmpReader;
  463. LIST_FOREACH(tmpReader, &readerGroup->readers, listEntry) {
  464. if(UA_NodeId_equal(&tmpReader->identifier, &identifier)) {
  465. return tmpReader;
  466. }
  467. }
  468. }
  469. }
  470. return NULL;
  471. }
  472. /**********************************************/
  473. /* DataSetReader */
  474. /**********************************************/
  475. /**
  476. * Add a DataSetReader to ReaderGroup
  477. *
  478. * @param server
  479. * @param readerGroupIdentifier
  480. * @param dataSetReaderConfig
  481. * @param readerIdentifier
  482. * @return UA_STATUSCODE_GOOD on success
  483. */
  484. UA_StatusCode
  485. UA_Server_addDataSetReader(UA_Server *server, UA_NodeId readerGroupIdentifier,
  486. const UA_DataSetReaderConfig *dataSetReaderConfig,
  487. UA_NodeId *readerIdentifier) {
  488. /* Search the reader group by the given readerGroupIdentifier */
  489. UA_ReaderGroup *readerGroup = UA_ReaderGroup_findRGbyId(server, readerGroupIdentifier);
  490. if(!dataSetReaderConfig) {
  491. return UA_STATUSCODE_BADNOTFOUND;
  492. }
  493. if(readerGroup == NULL) {
  494. return UA_STATUSCODE_BADNOTFOUND;
  495. }
  496. /* Allocate memory for new DataSetReader */
  497. UA_DataSetReader *newDataSetReader = (UA_DataSetReader *)UA_calloc(1, sizeof(UA_DataSetReader));
  498. /* Copy the config into the new dataSetReader */
  499. UA_DataSetReaderConfig_copy(dataSetReaderConfig, &newDataSetReader->config);
  500. newDataSetReader->linkedReaderGroup = readerGroup->identifier;
  501. UA_PubSubManager_generateUniqueNodeId(server, &newDataSetReader->identifier);
  502. if(readerIdentifier != NULL) {
  503. UA_NodeId_copy(&newDataSetReader->identifier, readerIdentifier);
  504. }
  505. /* Add the new reader to the group */
  506. LIST_INSERT_HEAD(&readerGroup->readers, newDataSetReader, listEntry);
  507. readerGroup->readersCount++;
  508. #ifdef UA_ENABLE_PUBSUB_INFORMATIONMODEL
  509. addDataSetReaderRepresentation(server, newDataSetReader);
  510. #endif
  511. return UA_STATUSCODE_GOOD;
  512. }
  513. /**
  514. * Remove a DataSetReader from ReaderGroup
  515. *
  516. * @param server
  517. * @param readerGroupIdentifier
  518. * @return UA_STATUSCODE_GOOD on success
  519. */
  520. UA_StatusCode
  521. UA_Server_removeDataSetReader(UA_Server *server, UA_NodeId readerIdentifier) {
  522. /* Remove datasetreader given by the identifier */
  523. UA_DataSetReader *dataSetReader = UA_ReaderGroup_findDSRbyId(server, readerIdentifier);
  524. if(!dataSetReader) {
  525. return UA_STATUSCODE_BADNOTFOUND;
  526. }
  527. #ifdef UA_ENABLE_PUBSUB_INFORMATIONMODEL
  528. removeDataSetReaderRepresentation(server, dataSetReader);
  529. #endif
  530. UA_DataSetReader_delete(server, dataSetReader);
  531. return UA_STATUSCODE_GOOD;
  532. }
  533. /**
  534. * Update the config of the DataSetReader.
  535. *
  536. * @param server
  537. * @param dataSetReaderIdentifier
  538. * @param readerGroupIdentifier
  539. * @param config
  540. * @return UA_STATUSCODE_GOOD on success
  541. */
  542. UA_StatusCode
  543. UA_Server_DataSetReader_updateConfig(UA_Server *server, UA_NodeId dataSetReaderIdentifier, UA_NodeId readerGroupIdentifier,
  544. const UA_DataSetReaderConfig *config) {
  545. if(config == NULL) {
  546. return UA_STATUSCODE_BADINVALIDARGUMENT;
  547. }
  548. UA_DataSetReader *currentDataSetReader = UA_ReaderGroup_findDSRbyId(server, dataSetReaderIdentifier);
  549. UA_ReaderGroup *currentReaderGroup = UA_ReaderGroup_findRGbyId(server, readerGroupIdentifier);
  550. if(!currentDataSetReader) {
  551. return UA_STATUSCODE_BADNOTFOUND;
  552. }
  553. /* The update functionality will be extended during the next PubSub batches.
  554. * Currently is only a change of the publishing interval possible. */
  555. if(currentDataSetReader->config.writerGroupId != config->writerGroupId) {
  556. UA_PubSubManager_removeRepeatedPubSubCallback(server, currentReaderGroup->subscribeCallbackId);
  557. currentDataSetReader->config.writerGroupId = config->writerGroupId;
  558. UA_ReaderGroup_subscribeCallback(server, currentReaderGroup);
  559. }
  560. else {
  561. UA_LOG_WARNING(&server->config.logger, UA_LOGCATEGORY_SERVER,
  562. "No or unsupported ReaderGroup update.");
  563. }
  564. return UA_STATUSCODE_GOOD;
  565. }
  566. /**
  567. * Get the current config of the UA_DataSetReader.
  568. *
  569. * @param server
  570. * @param dataSetReaderIdentifier
  571. * @param config
  572. * @return UA_STATUSCODE_GOOD on success
  573. */
  574. UA_StatusCode
  575. UA_Server_DataSetReader_getConfig(UA_Server *server, UA_NodeId dataSetReaderIdentifier,
  576. UA_DataSetReaderConfig *config) {
  577. if(!config) {
  578. return UA_STATUSCODE_BADINVALIDARGUMENT;
  579. }
  580. UA_DataSetReader *currentDataSetReader = UA_ReaderGroup_findDSRbyId(server, dataSetReaderIdentifier);
  581. if(!currentDataSetReader) {
  582. return UA_STATUSCODE_BADNOTFOUND;
  583. }
  584. UA_DataSetReaderConfig tmpReaderConfig;
  585. /* Deep copy of the actual config */
  586. UA_DataSetReaderConfig_copy(&currentDataSetReader->config, &tmpReaderConfig);
  587. *config = tmpReaderConfig;
  588. return UA_STATUSCODE_GOOD;
  589. }
  590. /**
  591. * This Method is used to initially set the SubscribedDataSet to TargetVariablesType and to create the list of target Variables of a SubscribedDataSetType.
  592. *
  593. * @param server
  594. * @param dataSetReaderIdentifier
  595. * @param targetVariables
  596. * @return UA_STATUSCODE_GOOD on success
  597. */
  598. UA_StatusCode
  599. UA_Server_DataSetReader_createTargetVariables(UA_Server *server, UA_NodeId dataSetReaderIdentifier, UA_TargetVariablesDataType *targetVariables) {
  600. UA_StatusCode retval = UA_STATUSCODE_BADUNEXPECTEDERROR;
  601. UA_DataSetReader* pDS = UA_ReaderGroup_findDSRbyId(server, dataSetReaderIdentifier);
  602. if(pDS == NULL) {
  603. return UA_STATUSCODE_BADINVALIDARGUMENT;
  604. }
  605. if(pDS->subscribedDataSetTarget.targetVariablesSize > 0) {
  606. UA_TargetVariablesDataType_deleteMembers(&pDS->subscribedDataSetTarget);
  607. pDS->subscribedDataSetTarget.targetVariablesSize = 0;
  608. pDS->subscribedDataSetTarget.targetVariables = NULL;
  609. }
  610. /* Set subscribed dataset to TargetVariableType */
  611. pDS->subscribedDataSetType = UA_PUBSUB_SDS_TARGET;
  612. retval = UA_TargetVariablesDataType_copy(targetVariables, &pDS->subscribedDataSetTarget);
  613. return retval;
  614. }
  615. /**
  616. * Adds Subscribed Variables from the DataSetMetaData for the given DataSet into the given parent node
  617. * and creates the corresponding data in the targetVariables of the DataSetReader
  618. *
  619. * @param server
  620. * @param parentNode
  621. * @param dataSetReaderIdentifier
  622. * @return UA_STATUSCODE_GOOD on success
  623. */
  624. UA_StatusCode UA_Server_DataSetReader_addTargetVariables(UA_Server *server, UA_NodeId *parentNode, UA_NodeId dataSetReaderIdentifier, UA_SubscribedDataSetEnumType sdsType) {
  625. if((server == NULL) || (parentNode == NULL)) {
  626. return UA_STATUSCODE_BADINVALIDARGUMENT;
  627. }
  628. UA_StatusCode retval = UA_STATUSCODE_GOOD;
  629. UA_DataSetReader* pDataSetReader = UA_ReaderGroup_findDSRbyId(server, dataSetReaderIdentifier);
  630. if(pDataSetReader == NULL) {
  631. return UA_STATUSCODE_BADINVALIDARGUMENT;
  632. }
  633. /* To Do Declare for all other type */
  634. UA_DateTime dateTimeTypeVal = 0;
  635. UA_TargetVariablesDataType targetVars;
  636. targetVars.targetVariablesSize = pDataSetReader->config.dataSetMetaData.fieldsSize;
  637. targetVars.targetVariables = (UA_FieldTargetDataType *)UA_calloc(targetVars.targetVariablesSize, sizeof(UA_FieldTargetDataType));
  638. UA_StatusCode retVal = UA_STATUSCODE_GOOD;
  639. for (size_t iteratorField = 0; iteratorField < pDataSetReader->config.dataSetMetaData.fieldsSize; iteratorField++) {
  640. UA_VariableAttributes vAttr = UA_VariableAttributes_default;
  641. vAttr.valueRank = pDataSetReader->config.dataSetMetaData.fields[iteratorField].valueRank;
  642. if(pDataSetReader->config.dataSetMetaData.fields[iteratorField].arrayDimensionsSize > 0) {
  643. retVal = UA_Array_copy(pDataSetReader->config.dataSetMetaData.fields[iteratorField].arrayDimensions, pDataSetReader->config.dataSetMetaData.fields[iteratorField].arrayDimensionsSize, (void**)&vAttr.arrayDimensions, &UA_TYPES[UA_TYPES_UINT32]);
  644. if(retVal == UA_STATUSCODE_GOOD) {
  645. vAttr.arrayDimensionsSize = pDataSetReader->config.dataSetMetaData.fields[iteratorField].arrayDimensionsSize;
  646. }
  647. }
  648. switch (pDataSetReader->config.dataSetMetaData.fields[iteratorField].builtInType) {
  649. case UA_NS0ID_BOOLEAN:
  650. if(vAttr.arrayDimensionsSize > 0) {
  651. size_t sizeIterator = iterateVariableAttrDimension(vAttr);
  652. UA_Boolean *boolArray = (UA_Boolean*)UA_Array_new(sizeIterator, &UA_TYPES[UA_TYPES_BOOLEAN]);
  653. UA_Variant_setArrayCopy(&vAttr.value, boolArray, sizeIterator, &UA_TYPES[UA_TYPES_BOOLEAN]);
  654. UA_Array_delete(boolArray, sizeIterator, &UA_TYPES[UA_TYPES_BOOLEAN]);
  655. }
  656. else {
  657. UA_Boolean boolTypeVal = false;
  658. UA_Variant_setScalar(&vAttr.value, &boolTypeVal, &UA_TYPES[UA_TYPES_BOOLEAN]);
  659. }
  660. UA_NodeId_copy(&pDataSetReader->config.dataSetMetaData.fields[iteratorField].dataType, &vAttr.dataType);
  661. break;
  662. case UA_NS0ID_SBYTE:
  663. if(vAttr.arrayDimensionsSize > 0) {
  664. size_t sizeIterator = iterateVariableAttrDimension(vAttr);
  665. UA_SByte *sbyteArray = (UA_SByte*)UA_Array_new(sizeIterator, &UA_TYPES[UA_TYPES_SBYTE]);
  666. UA_Variant_setArrayCopy(&vAttr.value, sbyteArray, sizeIterator, &UA_TYPES[UA_TYPES_SBYTE]);
  667. UA_Array_delete(sbyteArray, sizeIterator, &UA_TYPES[UA_TYPES_SBYTE]);
  668. }
  669. else {
  670. UA_SByte sbyteTypeVal = 0;
  671. UA_Variant_setScalar(&vAttr.value, &sbyteTypeVal, &UA_TYPES[UA_TYPES_SBYTE]);
  672. }
  673. UA_NodeId_copy(&pDataSetReader->config.dataSetMetaData.fields[iteratorField].dataType, &vAttr.dataType);
  674. break;
  675. case UA_NS0ID_BYTE:
  676. if(vAttr.arrayDimensionsSize > 0) {
  677. size_t sizeIterator = iterateVariableAttrDimension(vAttr);
  678. UA_Byte *byteArray = (UA_Byte*)UA_Array_new(sizeIterator, &UA_TYPES[UA_TYPES_BYTE]);
  679. UA_Variant_setArrayCopy(&vAttr.value, byteArray, sizeIterator, &UA_TYPES[UA_TYPES_BYTE]);
  680. UA_Array_delete(byteArray, sizeIterator, &UA_TYPES[UA_TYPES_BYTE]);
  681. }
  682. else {
  683. UA_Byte byteTypeVal = 0;
  684. UA_Variant_setScalar(&vAttr.value, &byteTypeVal, &UA_TYPES[UA_TYPES_BYTE]);
  685. }
  686. UA_NodeId_copy(&pDataSetReader->config.dataSetMetaData.fields[iteratorField].dataType, &vAttr.dataType);
  687. break;
  688. case UA_NS0ID_INT16:
  689. if(vAttr.arrayDimensionsSize > 0) {
  690. size_t sizeIterator = iterateVariableAttrDimension(vAttr);
  691. UA_Int16 *int16Array = (UA_Int16*)UA_Array_new(sizeIterator, &UA_TYPES[UA_TYPES_INT16]);
  692. UA_Variant_setArrayCopy(&vAttr.value, int16Array, sizeIterator, &UA_TYPES[UA_TYPES_INT16]);
  693. UA_Array_delete(int16Array, sizeIterator, &UA_TYPES[UA_TYPES_INT16]);
  694. }
  695. else {
  696. UA_Int16 int16TypeVal = 0;
  697. UA_Variant_setScalar(&vAttr.value, &int16TypeVal, &UA_TYPES[UA_TYPES_INT16]);
  698. }
  699. UA_NodeId_copy(&pDataSetReader->config.dataSetMetaData.fields[iteratorField].dataType, &vAttr.dataType);
  700. break;
  701. case UA_NS0ID_UINT16:
  702. if(vAttr.arrayDimensionsSize > 0) {
  703. size_t sizeIterator = iterateVariableAttrDimension(vAttr);
  704. UA_UInt16 *uint16Array = (UA_UInt16*)UA_Array_new(sizeIterator, &UA_TYPES[UA_TYPES_UINT16]);
  705. UA_Variant_setArrayCopy(&vAttr.value, uint16Array, sizeIterator, &UA_TYPES[UA_TYPES_UINT16]);
  706. UA_Array_delete(uint16Array, sizeIterator, &UA_TYPES[UA_TYPES_UINT16]);
  707. }
  708. else {
  709. UA_UInt16 uint16TypeVal = 0;
  710. UA_Variant_setScalar(&vAttr.value, &uint16TypeVal, &UA_TYPES[UA_TYPES_UINT16]);
  711. }
  712. UA_NodeId_copy(&pDataSetReader->config.dataSetMetaData.fields[iteratorField].dataType, &vAttr.dataType);
  713. break;
  714. case UA_NS0ID_INT32:
  715. if(vAttr.arrayDimensionsSize > 0) {
  716. size_t sizeIterator = iterateVariableAttrDimension(vAttr);
  717. UA_Int32 *int32Array = (UA_Int32*)UA_Array_new(sizeIterator, &UA_TYPES[UA_TYPES_INT32]);
  718. UA_Variant_setArrayCopy(&vAttr.value, int32Array, sizeIterator, &UA_TYPES[UA_TYPES_INT32]);
  719. UA_Array_delete(int32Array, sizeIterator, &UA_TYPES[UA_TYPES_INT32]);
  720. }
  721. else {
  722. UA_Int32 int32TypeVal = 0;
  723. UA_Variant_setScalar(&vAttr.value, &int32TypeVal, &UA_TYPES[UA_TYPES_INT32]);
  724. }
  725. UA_NodeId_copy(&pDataSetReader->config.dataSetMetaData.fields[iteratorField].dataType, &vAttr.dataType);
  726. break;
  727. case UA_NS0ID_UINT32:
  728. if(vAttr.arrayDimensionsSize > 0) {
  729. size_t sizeIterator = iterateVariableAttrDimension(vAttr);
  730. UA_UInt32 *uint32Array = (UA_UInt32*)UA_Array_new(sizeIterator, &UA_TYPES[UA_TYPES_UINT32]);
  731. UA_Variant_setArrayCopy(&vAttr.value, uint32Array, sizeIterator, &UA_TYPES[UA_TYPES_UINT32]);
  732. UA_Array_delete(uint32Array, sizeIterator, &UA_TYPES[UA_TYPES_UINT32]);
  733. }
  734. else {
  735. UA_UInt32 uint32TypeVal = 0;
  736. UA_Variant_setScalar(&vAttr.value, &uint32TypeVal, &UA_TYPES[UA_TYPES_UINT32]);
  737. }
  738. UA_NodeId_copy(&pDataSetReader->config.dataSetMetaData.fields[iteratorField].dataType, &vAttr.dataType);
  739. break;
  740. case UA_NS0ID_INT64:
  741. if(vAttr.arrayDimensionsSize > 0) {
  742. size_t sizeIterator = iterateVariableAttrDimension(vAttr);
  743. UA_Int64 *int64Array = (UA_Int64*)UA_Array_new(sizeIterator, &UA_TYPES[UA_TYPES_INT64]);
  744. UA_Variant_setArrayCopy(&vAttr.value, int64Array, sizeIterator, &UA_TYPES[UA_TYPES_INT64]);
  745. UA_Array_delete(int64Array, sizeIterator, &UA_TYPES[UA_TYPES_INT64]);
  746. }
  747. else {
  748. UA_Int64 int64TypeVal = 0;
  749. UA_Variant_setScalar(&vAttr.value, &int64TypeVal, &UA_TYPES[UA_TYPES_INT64]);
  750. }
  751. UA_NodeId_copy(&pDataSetReader->config.dataSetMetaData.fields[iteratorField].dataType, &vAttr.dataType);
  752. break;
  753. case UA_NS0ID_UINT64:
  754. if(vAttr.arrayDimensionsSize > 0) {
  755. size_t sizeIterator = iterateVariableAttrDimension(vAttr);
  756. UA_UInt64 *uint64Array = (UA_UInt64*)UA_Array_new(sizeIterator, &UA_TYPES[UA_TYPES_UINT64]);
  757. UA_Variant_setArrayCopy(&vAttr.value, uint64Array, sizeIterator, &UA_TYPES[UA_TYPES_UINT64]);
  758. UA_Array_delete(uint64Array, sizeIterator, &UA_TYPES[UA_TYPES_UINT64]);
  759. }
  760. else {
  761. UA_UInt64 uint64TypeVal = 0;
  762. UA_Variant_setScalar(&vAttr.value, &uint64TypeVal, &UA_TYPES[UA_TYPES_UINT64]);
  763. }
  764. UA_NodeId_copy(&pDataSetReader->config.dataSetMetaData.fields[iteratorField].dataType, &vAttr.dataType);
  765. break;
  766. case UA_NS0ID_FLOAT:
  767. if(vAttr.arrayDimensionsSize > 0) {
  768. size_t sizeIterator = iterateVariableAttrDimension(vAttr);
  769. UA_Float *floatArray = (UA_Float*)UA_Array_new(sizeIterator, &UA_TYPES[UA_TYPES_FLOAT]);
  770. UA_Variant_setArrayCopy(&vAttr.value, floatArray, sizeIterator, &UA_TYPES[UA_TYPES_FLOAT]);
  771. UA_Array_delete(floatArray, sizeIterator, &UA_TYPES[UA_TYPES_FLOAT]);
  772. }
  773. else {
  774. UA_Float floatTypeVal = 0;
  775. UA_Variant_setScalar(&vAttr.value, &floatTypeVal, &UA_TYPES[UA_TYPES_FLOAT]);
  776. }
  777. UA_NodeId_copy(&pDataSetReader->config.dataSetMetaData.fields[iteratorField].dataType, &vAttr.dataType);
  778. break;
  779. case UA_NS0ID_DOUBLE:
  780. if(vAttr.arrayDimensionsSize > 0) {
  781. size_t sizeIterator = iterateVariableAttrDimension(vAttr);
  782. UA_Double *doubleArray = (UA_Double*)UA_Array_new(sizeIterator, &UA_TYPES[UA_TYPES_DOUBLE]);
  783. UA_Variant_setArrayCopy(&vAttr.value, doubleArray, sizeIterator, &UA_TYPES[UA_TYPES_DOUBLE]);
  784. UA_Array_delete(doubleArray, sizeIterator, &UA_TYPES[UA_TYPES_DOUBLE]);
  785. }
  786. else {
  787. UA_Double doubleTypeVal = 0.0;
  788. UA_Variant_setScalar(&vAttr.value, &doubleTypeVal, &UA_TYPES[UA_TYPES_DOUBLE]);
  789. }
  790. UA_NodeId_copy(&pDataSetReader->config.dataSetMetaData.fields[iteratorField].dataType, &vAttr.dataType);
  791. break;
  792. case UA_NS0ID_STRING:
  793. if(vAttr.arrayDimensionsSize > 0) {
  794. size_t sizeIterator = iterateVariableAttrDimension(vAttr);
  795. UA_String *stringArray = (UA_String*)UA_Array_new(sizeIterator, &UA_TYPES[UA_TYPES_STRING]);
  796. UA_Variant_setArrayCopy(&vAttr.value, stringArray, sizeIterator, &UA_TYPES[UA_TYPES_STRING]);
  797. UA_Array_delete(stringArray, sizeIterator, &UA_TYPES[UA_TYPES_STRING]);
  798. }
  799. else {
  800. UA_String stringTypeVal = UA_STRING_NULL;
  801. UA_Variant_setScalar(&vAttr.value, &stringTypeVal, &UA_TYPES[UA_TYPES_STRING]);
  802. }
  803. UA_NodeId_copy(&pDataSetReader->config.dataSetMetaData.fields[iteratorField].dataType, &vAttr.dataType);
  804. break;
  805. case UA_NS0ID_DATETIME:
  806. if(vAttr.arrayDimensionsSize > 0) {
  807. size_t sizeIterator = iterateVariableAttrDimension(vAttr);
  808. UA_DateTime *dateTimeArray = (UA_DateTime*)UA_Array_new(sizeIterator, &UA_TYPES[UA_TYPES_DATETIME]);
  809. UA_Variant_setArrayCopy(&vAttr.value, dateTimeArray, sizeIterator, &UA_TYPES[UA_TYPES_DATETIME]);
  810. UA_Array_delete(dateTimeArray, sizeIterator, &UA_TYPES[UA_TYPES_DATETIME]);
  811. }
  812. else {
  813. dateTimeTypeVal = 0;
  814. UA_Variant_setScalar(&vAttr.value, &dateTimeTypeVal, &UA_TYPES[UA_TYPES_DATETIME]);
  815. }
  816. UA_NodeId_copy(&pDataSetReader->config.dataSetMetaData.fields[iteratorField].dataType, &vAttr.dataType);
  817. break;
  818. case UA_NS0ID_GUID:
  819. if(vAttr.arrayDimensionsSize > 0) {
  820. size_t sizeIterator = iterateVariableAttrDimension(vAttr);
  821. UA_Guid *guidArray = (UA_Guid*)UA_Array_new(sizeIterator, &UA_TYPES[UA_TYPES_GUID]);
  822. UA_Variant_setArrayCopy(&vAttr.value, guidArray, sizeIterator, &UA_TYPES[UA_TYPES_GUID]);
  823. UA_Array_delete(guidArray, sizeIterator, &UA_TYPES[UA_TYPES_GUID]);
  824. }
  825. else {
  826. UA_Guid guidTypeVal = UA_GUID_NULL;
  827. UA_Variant_setScalar(&vAttr.value, &guidTypeVal, &UA_TYPES[UA_TYPES_GUID]);
  828. }
  829. UA_NodeId_copy(&pDataSetReader->config.dataSetMetaData.fields[iteratorField].dataType, &vAttr.dataType);
  830. break;
  831. case UA_NS0ID_NODEID:
  832. if(vAttr.arrayDimensionsSize > 0) {
  833. size_t sizeIterator = iterateVariableAttrDimension(vAttr);
  834. UA_NodeId *nodeidArray = (UA_NodeId*)UA_Array_new(sizeIterator, &UA_TYPES[UA_TYPES_NODEID]);
  835. UA_Variant_setArrayCopy(&vAttr.value, nodeidArray, sizeIterator, &UA_TYPES[UA_TYPES_NODEID]);
  836. UA_Array_delete(nodeidArray, sizeIterator, &UA_TYPES[UA_TYPES_NODEID]);
  837. }
  838. else {
  839. UA_NodeId nodeidTypeVal = UA_NODEID_NULL;
  840. UA_Variant_setScalar(&vAttr.value, &nodeidTypeVal, &UA_TYPES[UA_TYPES_NODEID]);
  841. }
  842. UA_NodeId_copy(&pDataSetReader->config.dataSetMetaData.fields[iteratorField].dataType, &vAttr.dataType);
  843. break;
  844. case UA_NS0ID_BYTESTRING:
  845. if(vAttr.arrayDimensionsSize > 0) {
  846. size_t sizeIterator = iterateVariableAttrDimension(vAttr);
  847. UA_ByteString *byteStringArray = (UA_ByteString*)UA_Array_new(sizeIterator, &UA_TYPES[UA_TYPES_BYTESTRING]);
  848. UA_Variant_setArrayCopy(&vAttr.value, byteStringArray, sizeIterator, &UA_TYPES[UA_TYPES_BYTESTRING]);
  849. UA_Array_delete(byteStringArray, sizeIterator, &UA_TYPES[UA_TYPES_BYTESTRING]);
  850. }
  851. else {
  852. UA_ByteString byteStringTypeVal = UA_BYTESTRING_NULL;
  853. UA_Variant_setScalar(&vAttr.value, &byteStringTypeVal, &UA_TYPES[UA_TYPES_BYTESTRING]);
  854. }
  855. UA_NodeId_copy(&pDataSetReader->config.dataSetMetaData.fields[iteratorField].dataType, &vAttr.dataType);
  856. break;
  857. default:
  858. /* Type not supported */
  859. UA_LOG_INFO(&server->config.logger, UA_LOGCATEGORY_USERLAND, "Type %u not supported, create empty variable.", pDataSetReader->config.dataSetMetaData.fields[iteratorField].builtInType);
  860. break;
  861. }
  862. vAttr.accessLevel = UA_ACCESSLEVELMASK_READ;
  863. UA_LocalizedText_copy(&pDataSetReader->config.dataSetMetaData.fields[iteratorField].description, &vAttr.description);
  864. UA_QualifiedName qn;
  865. UA_QualifiedName_init(&qn);
  866. char szTmpName[UA_MAX_SIZENAME];
  867. if(pDataSetReader->config.dataSetMetaData.fields[iteratorField].name.length > 0) {
  868. UA_UInt16 slen = UA_MAX_SIZENAME -1;
  869. vAttr.displayName.locale = UA_STRING("en-US");
  870. vAttr.displayName.text = pDataSetReader->config.dataSetMetaData.fields[iteratorField].name;
  871. if(pDataSetReader->config.dataSetMetaData.fields[iteratorField].name.length < slen) {
  872. slen = (UA_UInt16)pDataSetReader->config.dataSetMetaData.fields[iteratorField].name.length;
  873. UA_snprintf(szTmpName, sizeof(szTmpName), "%s", (const char*)pDataSetReader->config.dataSetMetaData.fields[iteratorField].name.data);
  874. }
  875. szTmpName[slen] = '\0';
  876. qn = UA_QUALIFIEDNAME(1, szTmpName);
  877. }
  878. else {
  879. strcpy(szTmpName, "SubscribedVariable");
  880. vAttr.displayName = UA_LOCALIZEDTEXT("en-US", szTmpName);
  881. qn = UA_QUALIFIEDNAME(1, "SubscribedVariable");
  882. }
  883. /* Add variable to the given parent node */
  884. UA_NodeId newNode;
  885. retVal = UA_Server_addVariableNode(server, UA_NODEID_NULL, *parentNode,
  886. UA_NODEID_NUMERIC(0, UA_NS0ID_HASCOMPONENT), qn,
  887. UA_NODEID_NUMERIC(0, UA_NS0ID_BASEDATAVARIABLETYPE), vAttr, NULL, &newNode);
  888. if(retVal == UA_STATUSCODE_GOOD) {
  889. UA_LOG_INFO(&server->config.logger, UA_LOGCATEGORY_USERLAND, "addVariableNode %s succeeded", szTmpName);
  890. }
  891. else {
  892. retval = retVal;
  893. UA_LOG_WARNING(&server->config.logger, UA_LOGCATEGORY_USERLAND, "addVariableNode: error 0x%x", retVal);
  894. }
  895. UA_FieldTargetDataType_init(&targetVars.targetVariables[iteratorField]);
  896. targetVars.targetVariables[iteratorField].attributeId = UA_ATTRIBUTEID_VALUE;
  897. UA_NodeId_copy(&newNode, &targetVars.targetVariables[iteratorField].targetNodeId);
  898. UA_NodeId_deleteMembers(&newNode);
  899. if(vAttr.arrayDimensionsSize > 0) {
  900. UA_Variant_deleteMembers(&vAttr.value);
  901. UA_Array_delete(vAttr.arrayDimensions, vAttr.arrayDimensionsSize, &UA_TYPES[UA_TYPES_UINT32]);
  902. }
  903. }
  904. if(sdsType == UA_PUBSUB_SDS_TARGET) {
  905. retval = UA_Server_DataSetReader_createTargetVariables(server, pDataSetReader->identifier, &targetVars);
  906. }
  907. UA_TargetVariablesDataType_deleteMembers(&targetVars);
  908. return retval;
  909. }
  910. /**
  911. * Process a NetworkMessage with a DataSetReader.
  912. *
  913. * @param server
  914. * @param dataSetReader
  915. * @param dataSetMsg
  916. */
  917. void UA_Server_DataSetReader_process(UA_Server *server, UA_DataSetReader *dataSetReader, UA_DataSetMessage* dataSetMsg) {
  918. if((dataSetReader == NULL) || (dataSetMsg == NULL) || (server == NULL)) {
  919. return;
  920. }
  921. if(!dataSetMsg->header.dataSetMessageValid) {
  922. UA_LOG_INFO(&server->config.logger, UA_LOGCATEGORY_SERVER, "DataSetMessage is discarded: message is not valid");
  923. /* To Do check ConfigurationVersion*/
  924. /*if(dataSetMsg->header.configVersionMajorVersionEnabled)
  925. * {
  926. * if(dataSetMsg->header.configVersionMajorVersion != dataSetReader->config.dataSetMetaData.configurationVersion.majorVersion)
  927. * {
  928. * UA_LOG_WARNING(server->config.logger, UA_LOGCATEGORY_SERVER, "DataSetMessage is discarded: ConfigurationVersion MajorVersion does not match");
  929. * return;
  930. * }
  931. } */
  932. }
  933. else {
  934. if(dataSetMsg->header.dataSetMessageType == UA_DATASETMESSAGE_DATAKEYFRAME) {
  935. if(dataSetMsg->header.fieldEncoding != UA_FIELDENCODING_RAWDATA) {
  936. size_t anzFields = dataSetMsg->data.keyFrameData.fieldCount;
  937. if(dataSetReader->config.dataSetMetaData.fieldsSize < anzFields) {
  938. anzFields = dataSetReader->config.dataSetMetaData.fieldsSize;
  939. }
  940. if(dataSetReader->subscribedDataSetTarget.targetVariablesSize < anzFields) {
  941. anzFields = dataSetReader->subscribedDataSetTarget.targetVariablesSize;
  942. }
  943. UA_StatusCode retVal = UA_STATUSCODE_GOOD;
  944. for (UA_UInt16 iteratorField = 0; iteratorField < anzFields; iteratorField++) {
  945. if(dataSetMsg->data.keyFrameData.dataSetFields[iteratorField].hasValue) {
  946. if(dataSetReader->subscribedDataSetTarget.targetVariables[iteratorField].attributeId == UA_ATTRIBUTEID_VALUE) {
  947. retVal = UA_Server_writeValue(server, dataSetReader->subscribedDataSetTarget.targetVariables[iteratorField].targetNodeId, dataSetMsg->data.keyFrameData.dataSetFields[iteratorField].value);
  948. if(retVal != UA_STATUSCODE_GOOD) {
  949. UA_LOG_INFO(&server->config.logger, UA_LOGCATEGORY_SERVER, "Error Write Value KF %u: 0x%x", iteratorField, retVal);
  950. }
  951. }
  952. else {
  953. UA_WriteValue writeVal;
  954. UA_WriteValue_init(&writeVal);
  955. writeVal.attributeId = dataSetReader->subscribedDataSetTarget.targetVariables[iteratorField].attributeId;
  956. writeVal.indexRange = dataSetReader->subscribedDataSetTarget.targetVariables[iteratorField].receiverIndexRange;
  957. writeVal.nodeId = dataSetReader->subscribedDataSetTarget.targetVariables[iteratorField].targetNodeId;
  958. UA_DataValue_copy(&dataSetMsg->data.keyFrameData.dataSetFields[iteratorField], &writeVal.value);
  959. retVal = UA_Server_write(server, &writeVal);
  960. if(retVal != UA_STATUSCODE_GOOD) {
  961. UA_LOG_INFO(&server->config.logger, UA_LOGCATEGORY_SERVER, "Error Write KF %u: 0x%x", iteratorField, retVal);
  962. }
  963. }
  964. }
  965. }
  966. }
  967. }
  968. }
  969. }
  970. /**
  971. * Copy the config of the DataSetReader.
  972. *
  973. * @param src
  974. * @param dst
  975. * @return UA_STATUSCODE_GOOD on success
  976. */
  977. UA_StatusCode
  978. UA_DataSetReaderConfig_copy(const UA_DataSetReaderConfig *src,
  979. UA_DataSetReaderConfig *dst) {
  980. memset(dst, 0, sizeof(UA_DataSetReaderConfig));
  981. UA_StatusCode retVal = UA_String_copy(&src->name, &dst->name);
  982. if(retVal != UA_STATUSCODE_GOOD) {
  983. return retVal;
  984. }
  985. retVal = UA_Variant_copy(&src->publisherId, &dst->publisherId);
  986. if(retVal != UA_STATUSCODE_GOOD) {
  987. return retVal;
  988. }
  989. dst->writerGroupId = src->writerGroupId;
  990. dst->dataSetWriterId = src->dataSetWriterId;
  991. retVal = UA_DataSetMetaDataType_copy(&src->dataSetMetaData, &dst->dataSetMetaData);
  992. if(retVal != UA_STATUSCODE_GOOD) {
  993. return retVal;
  994. }
  995. dst->dataSetFieldContentMask = src->dataSetFieldContentMask;
  996. dst->messageReceiveTimeout = src->messageReceiveTimeout;
  997. /* Currently memcpy is used to copy the securityParameters */
  998. memcpy(&dst->securityParameters, &src->securityParameters, sizeof(UA_PubSubSecurityParameters));
  999. retVal = UA_UadpDataSetReaderMessageDataType_copy(&src->messageSettings, &dst->messageSettings);
  1000. if(retVal != UA_STATUSCODE_GOOD) {
  1001. return retVal;
  1002. }
  1003. return UA_STATUSCODE_GOOD;
  1004. }
  1005. /**
  1006. * Delete the DataSetReader.
  1007. *
  1008. * @param server
  1009. * @param dataSetReader
  1010. */
  1011. void UA_DataSetReader_delete(UA_Server *server, UA_DataSetReader *dataSetReader) {
  1012. /* Delete DataSetReader config */
  1013. UA_String_deleteMembers(&dataSetReader->config.name);
  1014. UA_Variant_deleteMembers(&dataSetReader->config.publisherId);
  1015. UA_DataSetMetaDataType_deleteMembers(&dataSetReader->config.dataSetMetaData);
  1016. UA_UadpDataSetReaderMessageDataType_deleteMembers(&dataSetReader->config.messageSettings);
  1017. UA_TargetVariablesDataType_deleteMembers(&dataSetReader->subscribedDataSetTarget);
  1018. /* Delete DataSetReader */
  1019. UA_ReaderGroup* pGroup = UA_ReaderGroup_findRGbyId(server, dataSetReader->linkedReaderGroup);
  1020. if(pGroup != NULL) {
  1021. pGroup->readersCount--;
  1022. }
  1023. UA_NodeId_deleteMembers(&dataSetReader->identifier);
  1024. UA_NodeId_deleteMembers(&dataSetReader->linkedReaderGroup);
  1025. /* Remove DataSetReader from group */
  1026. LIST_REMOVE(dataSetReader, listEntry);
  1027. /* Free memory allocated for DataSetReader */
  1028. UA_free(dataSetReader);
  1029. }
  1030. /**********************************************/
  1031. /* PublishedDataSet */
  1032. /**********************************************/
  1033. UA_StatusCode
  1034. UA_PublishedDataSetConfig_copy(const UA_PublishedDataSetConfig *src,
  1035. UA_PublishedDataSetConfig *dst) {
  1036. UA_StatusCode retVal = UA_STATUSCODE_GOOD;
  1037. memcpy(dst, src, sizeof(UA_PublishedDataSetConfig));
  1038. retVal |= UA_String_copy(&src->name, &dst->name);
  1039. switch(src->publishedDataSetType){
  1040. case UA_PUBSUB_DATASET_PUBLISHEDITEMS:
  1041. //no additional items
  1042. break;
  1043. case UA_PUBSUB_DATASET_PUBLISHEDITEMS_TEMPLATE:
  1044. if(src->config.itemsTemplate.variablesToAddSize > 0){
  1045. dst->config.itemsTemplate.variablesToAdd = (UA_PublishedVariableDataType *) UA_calloc(
  1046. src->config.itemsTemplate.variablesToAddSize, sizeof(UA_PublishedVariableDataType));
  1047. }
  1048. for(size_t i = 0; i < src->config.itemsTemplate.variablesToAddSize; i++){
  1049. retVal |= UA_PublishedVariableDataType_copy(&src->config.itemsTemplate.variablesToAdd[i],
  1050. &dst->config.itemsTemplate.variablesToAdd[i]);
  1051. }
  1052. retVal |= UA_DataSetMetaDataType_copy(&src->config.itemsTemplate.metaData,
  1053. &dst->config.itemsTemplate.metaData);
  1054. break;
  1055. default:
  1056. return UA_STATUSCODE_BADINVALIDARGUMENT;
  1057. }
  1058. return retVal;
  1059. }
  1060. UA_StatusCode
  1061. UA_Server_getPublishedDataSetConfig(UA_Server *server, const UA_NodeId pds,
  1062. UA_PublishedDataSetConfig *config){
  1063. if(!config)
  1064. return UA_STATUSCODE_BADINVALIDARGUMENT;
  1065. UA_PublishedDataSet *currentPublishedDataSet = UA_PublishedDataSet_findPDSbyId(server, pds);
  1066. if(!currentPublishedDataSet)
  1067. return UA_STATUSCODE_BADNOTFOUND;
  1068. UA_PublishedDataSetConfig tmpPublishedDataSetConfig;
  1069. //deep copy of the actual config
  1070. UA_PublishedDataSetConfig_copy(&currentPublishedDataSet->config, &tmpPublishedDataSetConfig);
  1071. *config = tmpPublishedDataSetConfig;
  1072. return UA_STATUSCODE_GOOD;
  1073. }
  1074. UA_PublishedDataSet *
  1075. UA_PublishedDataSet_findPDSbyId(UA_Server *server, UA_NodeId identifier){
  1076. for(size_t i = 0; i < server->pubSubManager.publishedDataSetsSize; i++){
  1077. if(UA_NodeId_equal(&server->pubSubManager.publishedDataSets[i].identifier, &identifier)){
  1078. return &server->pubSubManager.publishedDataSets[i];
  1079. }
  1080. }
  1081. return NULL;
  1082. }
  1083. void
  1084. UA_PublishedDataSetConfig_deleteMembers(UA_PublishedDataSetConfig *pdsConfig){
  1085. //delete pds config
  1086. UA_String_deleteMembers(&pdsConfig->name);
  1087. switch (pdsConfig->publishedDataSetType){
  1088. case UA_PUBSUB_DATASET_PUBLISHEDITEMS:
  1089. //no additional items
  1090. break;
  1091. case UA_PUBSUB_DATASET_PUBLISHEDITEMS_TEMPLATE:
  1092. if(pdsConfig->config.itemsTemplate.variablesToAddSize > 0){
  1093. for(size_t i = 0; i < pdsConfig->config.itemsTemplate.variablesToAddSize; i++){
  1094. UA_PublishedVariableDataType_deleteMembers(&pdsConfig->config.itemsTemplate.variablesToAdd[i]);
  1095. }
  1096. UA_free(pdsConfig->config.itemsTemplate.variablesToAdd);
  1097. }
  1098. UA_DataSetMetaDataType_deleteMembers(&pdsConfig->config.itemsTemplate.metaData);
  1099. break;
  1100. default:
  1101. break;
  1102. }
  1103. }
  1104. void
  1105. UA_PublishedDataSet_deleteMembers(UA_Server *server, UA_PublishedDataSet *publishedDataSet){
  1106. UA_PublishedDataSetConfig_deleteMembers(&publishedDataSet->config);
  1107. //delete PDS
  1108. UA_DataSetMetaDataType_deleteMembers(&publishedDataSet->dataSetMetaData);
  1109. UA_DataSetField *field, *tmpField;
  1110. LIST_FOREACH_SAFE(field, &publishedDataSet->fields, listEntry, tmpField) {
  1111. UA_Server_removeDataSetField(server, field->identifier);
  1112. }
  1113. UA_NodeId_deleteMembers(&publishedDataSet->identifier);
  1114. }
  1115. UA_DataSetFieldResult
  1116. UA_Server_addDataSetField(UA_Server *server, const UA_NodeId publishedDataSet,
  1117. const UA_DataSetFieldConfig *fieldConfig,
  1118. UA_NodeId *fieldIdentifier) {
  1119. UA_StatusCode retVal = UA_STATUSCODE_GOOD;
  1120. UA_DataSetFieldResult result = {UA_STATUSCODE_BADINVALIDARGUMENT, {0, 0}};
  1121. if(!fieldConfig)
  1122. return result;
  1123. UA_PublishedDataSet *currentDataSet = UA_PublishedDataSet_findPDSbyId(server, publishedDataSet);
  1124. if(currentDataSet == NULL){
  1125. result.result = UA_STATUSCODE_BADNOTFOUND;
  1126. return result;
  1127. }
  1128. if(currentDataSet->config.publishedDataSetType != UA_PUBSUB_DATASET_PUBLISHEDITEMS){
  1129. result.result = UA_STATUSCODE_BADNOTIMPLEMENTED;
  1130. return result;
  1131. }
  1132. UA_DataSetField *newField = (UA_DataSetField *) UA_calloc(1, sizeof(UA_DataSetField));
  1133. if(!newField){
  1134. result.result = UA_STATUSCODE_BADINTERNALERROR;
  1135. return result;
  1136. }
  1137. UA_DataSetFieldConfig tmpFieldConfig;
  1138. retVal |= UA_DataSetFieldConfig_copy(fieldConfig, &tmpFieldConfig);
  1139. newField->config = tmpFieldConfig;
  1140. UA_PubSubManager_generateUniqueNodeId(server, &newField->identifier);
  1141. if(fieldIdentifier != NULL){
  1142. UA_NodeId_copy(&newField->identifier, fieldIdentifier);
  1143. }
  1144. newField->publishedDataSet = currentDataSet->identifier;
  1145. //update major version of parent published data set
  1146. currentDataSet->dataSetMetaData.configurationVersion.majorVersion = UA_PubSubConfigurationVersionTimeDifference();
  1147. LIST_INSERT_HEAD(&currentDataSet->fields, newField, listEntry);
  1148. if(newField->config.field.variable.promotedField)
  1149. currentDataSet->promotedFieldsCount++;
  1150. currentDataSet->fieldSize++;
  1151. result.result = retVal;
  1152. result.configurationVersion.majorVersion = currentDataSet->dataSetMetaData.configurationVersion.majorVersion;
  1153. result.configurationVersion.minorVersion = currentDataSet->dataSetMetaData.configurationVersion.minorVersion;
  1154. return result;
  1155. }
  1156. UA_DataSetFieldResult
  1157. UA_Server_removeDataSetField(UA_Server *server, const UA_NodeId dsf) {
  1158. UA_DataSetField *currentField = UA_DataSetField_findDSFbyId(server, dsf);
  1159. UA_DataSetFieldResult result = {UA_STATUSCODE_BADNOTFOUND, {0, 0}};
  1160. if(!currentField)
  1161. return result;
  1162. UA_PublishedDataSet *parentPublishedDataSet =
  1163. UA_PublishedDataSet_findPDSbyId(server, currentField->publishedDataSet);
  1164. if(!parentPublishedDataSet)
  1165. return result;
  1166. parentPublishedDataSet->fieldSize--;
  1167. if(currentField->config.field.variable.promotedField)
  1168. parentPublishedDataSet->promotedFieldsCount--;
  1169. /* update major version of PublishedDataSet */
  1170. parentPublishedDataSet->dataSetMetaData.configurationVersion.majorVersion =
  1171. UA_PubSubConfigurationVersionTimeDifference();
  1172. UA_DataSetField_deleteMembers(currentField);
  1173. LIST_REMOVE(currentField, listEntry);
  1174. UA_free(currentField);
  1175. result.result = UA_STATUSCODE_GOOD;
  1176. result.configurationVersion.majorVersion = parentPublishedDataSet->dataSetMetaData.configurationVersion.majorVersion;
  1177. result.configurationVersion.minorVersion = parentPublishedDataSet->dataSetMetaData.configurationVersion.minorVersion;
  1178. return result;
  1179. }
  1180. /**********************************************/
  1181. /* DataSetWriter */
  1182. /**********************************************/
  1183. UA_StatusCode
  1184. UA_DataSetWriterConfig_copy(const UA_DataSetWriterConfig *src,
  1185. UA_DataSetWriterConfig *dst){
  1186. UA_StatusCode retVal = UA_STATUSCODE_GOOD;
  1187. memcpy(dst, src, sizeof(UA_DataSetWriterConfig));
  1188. retVal |= UA_String_copy(&src->name, &dst->name);
  1189. retVal |= UA_String_copy(&src->dataSetName, &dst->dataSetName);
  1190. retVal |= UA_ExtensionObject_copy(&src->messageSettings, &dst->messageSettings);
  1191. dst->dataSetWriterProperties = (UA_KeyValuePair *)
  1192. UA_calloc(src->dataSetWriterPropertiesSize, sizeof(UA_KeyValuePair));
  1193. if(!dst->dataSetWriterProperties)
  1194. return UA_STATUSCODE_BADOUTOFMEMORY;
  1195. for(size_t i = 0; i < src->dataSetWriterPropertiesSize; i++){
  1196. retVal |= UA_KeyValuePair_copy(&src->dataSetWriterProperties[i], &dst->dataSetWriterProperties[i]);
  1197. }
  1198. return retVal;
  1199. }
  1200. UA_StatusCode
  1201. UA_Server_getDataSetWriterConfig(UA_Server *server, const UA_NodeId dsw,
  1202. UA_DataSetWriterConfig *config){
  1203. UA_StatusCode retVal = UA_STATUSCODE_GOOD;
  1204. if(!config)
  1205. return UA_STATUSCODE_BADINVALIDARGUMENT;
  1206. UA_DataSetWriter *currentDataSetWriter = UA_DataSetWriter_findDSWbyId(server, dsw);
  1207. if(!currentDataSetWriter)
  1208. return UA_STATUSCODE_BADNOTFOUND;
  1209. UA_DataSetWriterConfig tmpWriterConfig;
  1210. //deep copy of the actual config
  1211. retVal |= UA_DataSetWriterConfig_copy(&currentDataSetWriter->config, &tmpWriterConfig);
  1212. *config = tmpWriterConfig;
  1213. return retVal;
  1214. }
  1215. UA_DataSetWriter *
  1216. UA_DataSetWriter_findDSWbyId(UA_Server *server, UA_NodeId identifier) {
  1217. for(size_t i = 0; i < server->pubSubManager.connectionsSize; i++){
  1218. UA_WriterGroup *tmpWriterGroup;
  1219. LIST_FOREACH(tmpWriterGroup, &server->pubSubManager.connections[i].writerGroups, listEntry){
  1220. UA_DataSetWriter *tmpWriter;
  1221. LIST_FOREACH(tmpWriter, &tmpWriterGroup->writers, listEntry){
  1222. if(UA_NodeId_equal(&tmpWriter->identifier, &identifier)){
  1223. return tmpWriter;
  1224. }
  1225. }
  1226. }
  1227. }
  1228. return NULL;
  1229. }
  1230. void
  1231. UA_DataSetWriterConfig_deleteMembers(UA_DataSetWriterConfig *pdsConfig) {
  1232. UA_String_deleteMembers(&pdsConfig->name);
  1233. UA_String_deleteMembers(&pdsConfig->dataSetName);
  1234. for(size_t i = 0; i < pdsConfig->dataSetWriterPropertiesSize; i++){
  1235. UA_KeyValuePair_deleteMembers(&pdsConfig->dataSetWriterProperties[i]);
  1236. }
  1237. UA_free(pdsConfig->dataSetWriterProperties);
  1238. UA_ExtensionObject_deleteMembers(&pdsConfig->messageSettings);
  1239. }
  1240. static void
  1241. UA_DataSetWriter_deleteMembers(UA_Server *server, UA_DataSetWriter *dataSetWriter) {
  1242. UA_DataSetWriterConfig_deleteMembers(&dataSetWriter->config);
  1243. //delete DataSetWriter
  1244. UA_NodeId_deleteMembers(&dataSetWriter->identifier);
  1245. UA_NodeId_deleteMembers(&dataSetWriter->linkedWriterGroup);
  1246. UA_NodeId_deleteMembers(&dataSetWriter->connectedDataSet);
  1247. #ifdef UA_ENABLE_PUBSUB_DELTAFRAMES
  1248. //delete lastSamples store
  1249. for(size_t i = 0; i < dataSetWriter->lastSamplesCount; i++) {
  1250. UA_DataValue_deleteMembers(&dataSetWriter->lastSamples[i].value);
  1251. }
  1252. UA_free(dataSetWriter->lastSamples);
  1253. dataSetWriter->lastSamples = NULL;
  1254. dataSetWriter->lastSamplesCount = 0;
  1255. #endif
  1256. }
  1257. /**********************************************/
  1258. /* WriterGroup */
  1259. /**********************************************/
  1260. UA_StatusCode
  1261. UA_WriterGroupConfig_copy(const UA_WriterGroupConfig *src,
  1262. UA_WriterGroupConfig *dst){
  1263. UA_StatusCode retVal = UA_STATUSCODE_GOOD;
  1264. memcpy(dst, src, sizeof(UA_WriterGroupConfig));
  1265. retVal |= UA_String_copy(&src->name, &dst->name);
  1266. retVal |= UA_ExtensionObject_copy(&src->transportSettings, &dst->transportSettings);
  1267. retVal |= UA_ExtensionObject_copy(&src->messageSettings, &dst->messageSettings);
  1268. dst->groupProperties = (UA_KeyValuePair *) UA_calloc(src->groupPropertiesSize, sizeof(UA_KeyValuePair));
  1269. if(!dst->groupProperties)
  1270. return UA_STATUSCODE_BADOUTOFMEMORY;
  1271. for(size_t i = 0; i < src->groupPropertiesSize; i++){
  1272. retVal |= UA_KeyValuePair_copy(&src->groupProperties[i], &dst->groupProperties[i]);
  1273. }
  1274. return retVal;
  1275. }
  1276. UA_StatusCode
  1277. UA_Server_getWriterGroupConfig(UA_Server *server, const UA_NodeId writerGroup,
  1278. UA_WriterGroupConfig *config){
  1279. UA_StatusCode retVal = UA_STATUSCODE_GOOD;
  1280. if(!config)
  1281. return UA_STATUSCODE_BADINVALIDARGUMENT;
  1282. UA_WriterGroup *currentWriterGroup = UA_WriterGroup_findWGbyId(server, writerGroup);
  1283. if(!currentWriterGroup){
  1284. return UA_STATUSCODE_BADNOTFOUND;
  1285. }
  1286. UA_WriterGroupConfig tmpWriterGroupConfig;
  1287. //deep copy of the actual config
  1288. retVal |= UA_WriterGroupConfig_copy(&currentWriterGroup->config, &tmpWriterGroupConfig);
  1289. *config = tmpWriterGroupConfig;
  1290. return retVal;
  1291. }
  1292. UA_StatusCode
  1293. UA_Server_updateWriterGroupConfig(UA_Server *server, UA_NodeId writerGroupIdentifier,
  1294. const UA_WriterGroupConfig *config){
  1295. if(!config)
  1296. return UA_STATUSCODE_BADINVALIDARGUMENT;
  1297. UA_WriterGroup *currentWriterGroup = UA_WriterGroup_findWGbyId(server, writerGroupIdentifier);
  1298. if(!currentWriterGroup)
  1299. return UA_STATUSCODE_BADNOTFOUND;
  1300. //The update functionality will be extended during the next PubSub batches.
  1301. //Currently is only a change of the publishing interval possible.
  1302. if(currentWriterGroup->config.publishingInterval != config->publishingInterval) {
  1303. UA_PubSubManager_removeRepeatedPubSubCallback(server, currentWriterGroup->publishCallbackId);
  1304. currentWriterGroup->config.publishingInterval = config->publishingInterval;
  1305. UA_WriterGroup_addPublishCallback(server, currentWriterGroup);
  1306. } else if(currentWriterGroup->config.priority != config->priority) {
  1307. UA_LOG_WARNING(&server->config.logger, UA_LOGCATEGORY_SERVER,
  1308. "No or unsupported WriterGroup update.");
  1309. }
  1310. return UA_STATUSCODE_GOOD;
  1311. }
  1312. UA_WriterGroup *
  1313. UA_WriterGroup_findWGbyId(UA_Server *server, UA_NodeId identifier){
  1314. for(size_t i = 0; i < server->pubSubManager.connectionsSize; i++){
  1315. UA_WriterGroup *tmpWriterGroup;
  1316. LIST_FOREACH(tmpWriterGroup, &server->pubSubManager.connections[i].writerGroups, listEntry) {
  1317. if(UA_NodeId_equal(&identifier, &tmpWriterGroup->identifier)){
  1318. return tmpWriterGroup;
  1319. }
  1320. }
  1321. }
  1322. return NULL;
  1323. }
  1324. void
  1325. UA_WriterGroupConfig_deleteMembers(UA_WriterGroupConfig *writerGroupConfig){
  1326. //delete writerGroup config
  1327. UA_String_deleteMembers(&writerGroupConfig->name);
  1328. UA_ExtensionObject_deleteMembers(&writerGroupConfig->transportSettings);
  1329. UA_ExtensionObject_deleteMembers(&writerGroupConfig->messageSettings);
  1330. for(size_t i = 0; i < writerGroupConfig->groupPropertiesSize; i++){
  1331. UA_KeyValuePair_deleteMembers(&writerGroupConfig->groupProperties[i]);
  1332. }
  1333. UA_free(writerGroupConfig->groupProperties);
  1334. }
  1335. static void
  1336. UA_WriterGroup_deleteMembers(UA_Server *server, UA_WriterGroup *writerGroup) {
  1337. UA_WriterGroupConfig_deleteMembers(&writerGroup->config);
  1338. //delete WriterGroup
  1339. //delete all writers. Therefore removeDataSetWriter is called from PublishedDataSet
  1340. UA_DataSetWriter *dataSetWriter, *tmpDataSetWriter;
  1341. LIST_FOREACH_SAFE(dataSetWriter, &writerGroup->writers, listEntry, tmpDataSetWriter){
  1342. UA_Server_removeDataSetWriter(server, dataSetWriter->identifier);
  1343. }
  1344. UA_NodeId_deleteMembers(&writerGroup->linkedConnection);
  1345. UA_NodeId_deleteMembers(&writerGroup->identifier);
  1346. }
  1347. UA_StatusCode
  1348. UA_Server_addDataSetWriter(UA_Server *server,
  1349. const UA_NodeId writerGroup, const UA_NodeId dataSet,
  1350. const UA_DataSetWriterConfig *dataSetWriterConfig,
  1351. UA_NodeId *writerIdentifier) {
  1352. UA_StatusCode retVal = UA_STATUSCODE_GOOD;
  1353. if(!dataSetWriterConfig)
  1354. return UA_STATUSCODE_BADINVALIDARGUMENT;
  1355. UA_PublishedDataSet *currentDataSetContext = UA_PublishedDataSet_findPDSbyId(server, dataSet);
  1356. if(!currentDataSetContext)
  1357. return UA_STATUSCODE_BADNOTFOUND;
  1358. UA_WriterGroup *wg = UA_WriterGroup_findWGbyId(server, writerGroup);
  1359. if(!wg)
  1360. return UA_STATUSCODE_BADNOTFOUND;
  1361. UA_DataSetWriter *newDataSetWriter = (UA_DataSetWriter *) UA_calloc(1, sizeof(UA_DataSetWriter));
  1362. if(!newDataSetWriter)
  1363. return UA_STATUSCODE_BADOUTOFMEMORY;
  1364. //copy the config into the new dataSetWriter
  1365. UA_DataSetWriterConfig tmpDataSetWriterConfig;
  1366. retVal |= UA_DataSetWriterConfig_copy(dataSetWriterConfig, &tmpDataSetWriterConfig);
  1367. newDataSetWriter->config = tmpDataSetWriterConfig;
  1368. //save the current version of the connected PublishedDataSet
  1369. newDataSetWriter->connectedDataSetVersion = currentDataSetContext->dataSetMetaData.configurationVersion;
  1370. #ifdef UA_ENABLE_PUBSUB_DELTAFRAMES
  1371. //initialize the queue for the last values
  1372. newDataSetWriter->lastSamples = (UA_DataSetWriterSample * )
  1373. UA_calloc(currentDataSetContext->fieldSize, sizeof(UA_DataSetWriterSample));
  1374. if(!newDataSetWriter->lastSamples) {
  1375. UA_DataSetWriterConfig_deleteMembers(&newDataSetWriter->config);
  1376. UA_free(newDataSetWriter);
  1377. return UA_STATUSCODE_BADOUTOFMEMORY;
  1378. }
  1379. newDataSetWriter->lastSamplesCount = currentDataSetContext->fieldSize;
  1380. #endif
  1381. //connect PublishedDataSet with DataSetWriter
  1382. newDataSetWriter->connectedDataSet = currentDataSetContext->identifier;
  1383. newDataSetWriter->linkedWriterGroup = wg->identifier;
  1384. UA_PubSubManager_generateUniqueNodeId(server, &newDataSetWriter->identifier);
  1385. if(writerIdentifier != NULL)
  1386. UA_NodeId_copy(&newDataSetWriter->identifier, writerIdentifier);
  1387. //add the new writer to the group
  1388. LIST_INSERT_HEAD(&wg->writers, newDataSetWriter, listEntry);
  1389. wg->writersCount++;
  1390. #ifdef UA_ENABLE_PUBSUB_INFORMATIONMODEL
  1391. addDataSetWriterRepresentation(server, newDataSetWriter);
  1392. #endif
  1393. return retVal;
  1394. }
  1395. UA_StatusCode
  1396. UA_Server_removeDataSetWriter(UA_Server *server, const UA_NodeId dsw){
  1397. UA_DataSetWriter *dataSetWriter = UA_DataSetWriter_findDSWbyId(server, dsw);
  1398. if(!dataSetWriter)
  1399. return UA_STATUSCODE_BADNOTFOUND;
  1400. UA_WriterGroup *linkedWriterGroup = UA_WriterGroup_findWGbyId(server, dataSetWriter->linkedWriterGroup);
  1401. if(!linkedWriterGroup)
  1402. return UA_STATUSCODE_BADNOTFOUND;
  1403. linkedWriterGroup->writersCount--;
  1404. #ifdef UA_ENABLE_PUBSUB_INFORMATIONMODEL
  1405. removeDataSetWriterRepresentation(server, dataSetWriter);
  1406. #endif
  1407. //remove DataSetWriter from group
  1408. UA_DataSetWriter_deleteMembers(server, dataSetWriter);
  1409. LIST_REMOVE(dataSetWriter, listEntry);
  1410. UA_free(dataSetWriter);
  1411. return UA_STATUSCODE_GOOD;
  1412. }
  1413. /**********************************************/
  1414. /* DataSetField */
  1415. /**********************************************/
  1416. UA_StatusCode
  1417. UA_DataSetFieldConfig_copy(const UA_DataSetFieldConfig *src, UA_DataSetFieldConfig *dst){
  1418. memcpy(dst, src, sizeof(UA_DataSetFieldConfig));
  1419. if(src->dataSetFieldType == UA_PUBSUB_DATASETFIELD_VARIABLE) {
  1420. UA_String_copy(&src->field.variable.fieldNameAlias, &dst->field.variable.fieldNameAlias);
  1421. UA_PublishedVariableDataType_copy(&src->field.variable.publishParameters,
  1422. &dst->field.variable.publishParameters);
  1423. } else {
  1424. return UA_STATUSCODE_BADNOTSUPPORTED;
  1425. }
  1426. return UA_STATUSCODE_GOOD;
  1427. }
  1428. UA_StatusCode
  1429. UA_Server_getDataSetFieldConfig(UA_Server *server, const UA_NodeId dsf,
  1430. UA_DataSetFieldConfig *config) {
  1431. UA_StatusCode retVal = UA_STATUSCODE_GOOD;
  1432. if(!config)
  1433. return UA_STATUSCODE_BADINVALIDARGUMENT;
  1434. UA_DataSetField *currentDataSetField = UA_DataSetField_findDSFbyId(server, dsf);
  1435. if(!currentDataSetField)
  1436. return UA_STATUSCODE_BADNOTFOUND;
  1437. UA_DataSetFieldConfig tmpFieldConfig;
  1438. //deep copy of the actual config
  1439. retVal |= UA_DataSetFieldConfig_copy(&currentDataSetField->config, &tmpFieldConfig);
  1440. *config = tmpFieldConfig;
  1441. return retVal;
  1442. }
  1443. UA_DataSetField *
  1444. UA_DataSetField_findDSFbyId(UA_Server *server, UA_NodeId identifier) {
  1445. for(size_t i = 0; i < server->pubSubManager.publishedDataSetsSize; i++){
  1446. UA_DataSetField *tmpField;
  1447. LIST_FOREACH(tmpField, &server->pubSubManager.publishedDataSets[i].fields, listEntry){
  1448. if(UA_NodeId_equal(&tmpField->identifier, &identifier)){
  1449. return tmpField;
  1450. }
  1451. }
  1452. }
  1453. return NULL;
  1454. }
  1455. void
  1456. UA_DataSetFieldConfig_deleteMembers(UA_DataSetFieldConfig *dataSetFieldConfig){
  1457. if(dataSetFieldConfig->dataSetFieldType == UA_PUBSUB_DATASETFIELD_VARIABLE){
  1458. UA_String_deleteMembers(&dataSetFieldConfig->field.variable.fieldNameAlias);
  1459. UA_PublishedVariableDataType_deleteMembers(&dataSetFieldConfig->field.variable.publishParameters);
  1460. }
  1461. }
  1462. static void
  1463. UA_DataSetField_deleteMembers(UA_DataSetField *field) {
  1464. UA_DataSetFieldConfig_deleteMembers(&field->config);
  1465. //delete DataSetField
  1466. UA_NodeId_deleteMembers(&field->identifier);
  1467. UA_NodeId_deleteMembers(&field->publishedDataSet);
  1468. UA_FieldMetaData_deleteMembers(&field->fieldMetaData);
  1469. }
  1470. /*********************************************************/
  1471. /* PublishValues handling */
  1472. /*********************************************************/
  1473. /**
  1474. * Compare two variants. Internally used for value change detection.
  1475. *
  1476. * @return true if the value has changed
  1477. */
  1478. #ifdef UA_ENABLE_PUBSUB_DELTAFRAMES
  1479. static UA_Boolean
  1480. valueChangedVariant(UA_Variant *oldValue, UA_Variant *newValue){
  1481. if(! (oldValue && newValue))
  1482. return false;
  1483. UA_ByteString *oldValueEncoding = UA_ByteString_new(), *newValueEncoding = UA_ByteString_new();
  1484. size_t oldValueEncodingSize, newValueEncodingSize;
  1485. oldValueEncodingSize = UA_calcSizeBinary(oldValue, &UA_TYPES[UA_TYPES_VARIANT]);
  1486. newValueEncodingSize = UA_calcSizeBinary(newValue, &UA_TYPES[UA_TYPES_VARIANT]);
  1487. if((oldValueEncodingSize == 0) || (newValueEncodingSize == 0))
  1488. return false;
  1489. if(oldValueEncodingSize != newValueEncodingSize)
  1490. return true;
  1491. if(UA_ByteString_allocBuffer(oldValueEncoding, oldValueEncodingSize) != UA_STATUSCODE_GOOD)
  1492. return false;
  1493. if(UA_ByteString_allocBuffer(newValueEncoding, newValueEncodingSize) != UA_STATUSCODE_GOOD)
  1494. return false;
  1495. UA_Byte *bufPosOldValue = oldValueEncoding->data;
  1496. const UA_Byte *bufEndOldValue = &oldValueEncoding->data[oldValueEncoding->length];
  1497. UA_Byte *bufPosNewValue = newValueEncoding->data;
  1498. const UA_Byte *bufEndNewValue = &newValueEncoding->data[newValueEncoding->length];
  1499. if(UA_encodeBinary(oldValue, &UA_TYPES[UA_TYPES_VARIANT],
  1500. &bufPosOldValue, &bufEndOldValue, NULL, NULL) != UA_STATUSCODE_GOOD){
  1501. return false;
  1502. }
  1503. if(UA_encodeBinary(newValue, &UA_TYPES[UA_TYPES_VARIANT],
  1504. &bufPosNewValue, &bufEndNewValue, NULL, NULL) != UA_STATUSCODE_GOOD){
  1505. return false;
  1506. }
  1507. oldValueEncoding->length = (uintptr_t)bufPosOldValue - (uintptr_t)oldValueEncoding->data;
  1508. newValueEncoding->length = (uintptr_t)bufPosNewValue - (uintptr_t)newValueEncoding->data;
  1509. UA_Boolean compareResult = !UA_ByteString_equal(oldValueEncoding, newValueEncoding);
  1510. UA_ByteString_delete(oldValueEncoding);
  1511. UA_ByteString_delete(newValueEncoding);
  1512. return compareResult;
  1513. }
  1514. #endif
  1515. /**
  1516. * Obtain the latest value for a specific DataSetField. This method is currently
  1517. * called inside the DataSetMessage generation process.
  1518. */
  1519. static void
  1520. UA_PubSubDataSetField_sampleValue(UA_Server *server, UA_DataSetField *field,
  1521. UA_DataValue *value) {
  1522. /* Read the value */
  1523. UA_ReadValueId rvid;
  1524. UA_ReadValueId_init(&rvid);
  1525. rvid.nodeId = field->config.field.variable.publishParameters.publishedVariable;
  1526. rvid.attributeId = field->config.field.variable.publishParameters.attributeId;
  1527. rvid.indexRange = field->config.field.variable.publishParameters.indexRange;
  1528. *value = UA_Server_read(server, &rvid, UA_TIMESTAMPSTORETURN_BOTH);
  1529. }
  1530. static UA_StatusCode
  1531. UA_PubSubDataSetWriter_generateKeyFrameMessage(UA_Server *server, UA_DataSetMessage *dataSetMessage,
  1532. UA_DataSetWriter *dataSetWriter) {
  1533. UA_PublishedDataSet *currentDataSet =
  1534. UA_PublishedDataSet_findPDSbyId(server, dataSetWriter->connectedDataSet);
  1535. if(!currentDataSet)
  1536. return UA_STATUSCODE_BADNOTFOUND;
  1537. /* Prepare DataSetMessageContent */
  1538. dataSetMessage->header.dataSetMessageValid = true;
  1539. dataSetMessage->header.dataSetMessageType = UA_DATASETMESSAGE_DATAKEYFRAME;
  1540. dataSetMessage->data.keyFrameData.fieldCount = currentDataSet->fieldSize;
  1541. dataSetMessage->data.keyFrameData.dataSetFields = (UA_DataValue *)
  1542. UA_Array_new(currentDataSet->fieldSize, &UA_TYPES[UA_TYPES_DATAVALUE]);
  1543. if(!dataSetMessage->data.keyFrameData.dataSetFields)
  1544. return UA_STATUSCODE_BADOUTOFMEMORY;
  1545. #ifdef UA_ENABLE_JSON_ENCODING
  1546. /* json: insert fieldnames used as json keys */
  1547. dataSetMessage->data.keyFrameData.fieldNames =
  1548. (UA_String *)UA_Array_new(currentDataSet->fieldSize, &UA_TYPES[UA_TYPES_STRING]);
  1549. if(!dataSetMessage->data.keyFrameData.fieldNames)
  1550. return UA_STATUSCODE_BADOUTOFMEMORY;
  1551. #endif
  1552. /* Loop over the fields */
  1553. size_t counter = 0;
  1554. UA_DataSetField *dsf;
  1555. LIST_FOREACH(dsf, &currentDataSet->fields, listEntry) {
  1556. #ifdef UA_ENABLE_JSON_ENCODING
  1557. /* json: store the fieldNameAlias*/
  1558. UA_String_copy(&dsf->config.field.variable.fieldNameAlias,
  1559. &dataSetMessage->data.keyFrameData.fieldNames[counter]);
  1560. #endif
  1561. /* Sample the value */
  1562. UA_DataValue *dfv = &dataSetMessage->data.keyFrameData.dataSetFields[counter];
  1563. UA_PubSubDataSetField_sampleValue(server, dsf, dfv);
  1564. /* Deactivate statuscode? */
  1565. if(((u64)dataSetWriter->config.dataSetFieldContentMask & (u64)UA_DATASETFIELDCONTENTMASK_STATUSCODE) == 0)
  1566. dfv->hasStatus = false;
  1567. /* Deactivate timestamps */
  1568. if(((u64)dataSetWriter->config.dataSetFieldContentMask &
  1569. (u64)UA_DATASETFIELDCONTENTMASK_SOURCETIMESTAMP) == 0)
  1570. dfv->hasSourceTimestamp = false;
  1571. if(((u64)dataSetWriter->config.dataSetFieldContentMask &
  1572. (u64)UA_DATASETFIELDCONTENTMASK_SOURCEPICOSECONDS) == 0)
  1573. dfv->hasSourcePicoseconds = false;
  1574. if(((u64)dataSetWriter->config.dataSetFieldContentMask &
  1575. (u64)UA_DATASETFIELDCONTENTMASK_SERVERTIMESTAMP) == 0)
  1576. dfv->hasServerTimestamp = false;
  1577. if(((u64)dataSetWriter->config.dataSetFieldContentMask &
  1578. (u64)UA_DATASETFIELDCONTENTMASK_SERVERPICOSECONDS) == 0)
  1579. dfv->hasServerPicoseconds = false;
  1580. #ifdef UA_ENABLE_PUBSUB_DELTAFRAMES
  1581. /* Update lastValue store */
  1582. UA_DataValue_deleteMembers(&dataSetWriter->lastSamples[counter].value);
  1583. UA_DataValue_copy(dfv, &dataSetWriter->lastSamples[counter].value);
  1584. #endif
  1585. counter++;
  1586. }
  1587. return UA_STATUSCODE_GOOD;
  1588. }
  1589. #ifdef UA_ENABLE_PUBSUB_DELTAFRAMES
  1590. static UA_StatusCode
  1591. UA_PubSubDataSetWriter_generateDeltaFrameMessage(UA_Server *server,
  1592. UA_DataSetMessage *dataSetMessage,
  1593. UA_DataSetWriter *dataSetWriter) {
  1594. UA_PublishedDataSet *currentDataSet =
  1595. UA_PublishedDataSet_findPDSbyId(server, dataSetWriter->connectedDataSet);
  1596. if(!currentDataSet)
  1597. return UA_STATUSCODE_BADNOTFOUND;
  1598. /* Prepare DataSetMessageContent */
  1599. memset(dataSetMessage, 0, sizeof(UA_DataSetMessage));
  1600. dataSetMessage->header.dataSetMessageValid = true;
  1601. dataSetMessage->header.dataSetMessageType = UA_DATASETMESSAGE_DATADELTAFRAME;
  1602. UA_DataSetField *dsf;
  1603. size_t counter = 0;
  1604. LIST_FOREACH(dsf, &currentDataSet->fields, listEntry) {
  1605. /* Sample the value */
  1606. UA_DataValue value;
  1607. UA_DataValue_init(&value);
  1608. UA_PubSubDataSetField_sampleValue(server, dsf, &value);
  1609. /* Check if the value has changed */
  1610. if(valueChangedVariant(&dataSetWriter->lastSamples[counter].value.value, &value.value)) {
  1611. /* increase fieldCount for current delta message */
  1612. dataSetMessage->data.deltaFrameData.fieldCount++;
  1613. dataSetWriter->lastSamples[counter].valueChanged = true;
  1614. /* Update last stored sample */
  1615. UA_DataValue_deleteMembers(&dataSetWriter->lastSamples[counter].value);
  1616. dataSetWriter->lastSamples[counter].value = value;
  1617. } else {
  1618. UA_DataValue_deleteMembers(&value);
  1619. dataSetWriter->lastSamples[counter].valueChanged = false;
  1620. }
  1621. counter++;
  1622. }
  1623. /* Allocate DeltaFrameFields */
  1624. UA_DataSetMessage_DeltaFrameField *deltaFields = (UA_DataSetMessage_DeltaFrameField *)
  1625. UA_calloc(dataSetMessage->data.deltaFrameData.fieldCount, sizeof(UA_DataSetMessage_DeltaFrameField));
  1626. if(!deltaFields)
  1627. return UA_STATUSCODE_BADOUTOFMEMORY;
  1628. dataSetMessage->data.deltaFrameData.deltaFrameFields = deltaFields;
  1629. size_t currentDeltaField = 0;
  1630. for(size_t i = 0; i < currentDataSet->fieldSize; i++) {
  1631. if(!dataSetWriter->lastSamples[i].valueChanged)
  1632. continue;
  1633. UA_DataSetMessage_DeltaFrameField *dff = &deltaFields[currentDeltaField];
  1634. dff->fieldIndex = (UA_UInt16) i;
  1635. UA_DataValue_copy(&dataSetWriter->lastSamples[i].value, &dff->fieldValue);
  1636. dataSetWriter->lastSamples[i].valueChanged = false;
  1637. /* Deactivate statuscode? */
  1638. if(((u64)dataSetWriter->config.dataSetFieldContentMask & (u64)UA_DATASETFIELDCONTENTMASK_STATUSCODE) == 0)
  1639. dff->fieldValue.hasStatus = false;
  1640. /* Deactivate timestamps? */
  1641. if(((u64)dataSetWriter->config.dataSetFieldContentMask & (u64)UA_DATASETFIELDCONTENTMASK_SOURCETIMESTAMP) == 0)
  1642. dff->fieldValue.hasSourceTimestamp = false;
  1643. if(((u64)dataSetWriter->config.dataSetFieldContentMask & (u64)UA_DATASETFIELDCONTENTMASK_SOURCEPICOSECONDS) == 0)
  1644. dff->fieldValue.hasServerPicoseconds = false;
  1645. if(((u64)dataSetWriter->config.dataSetFieldContentMask & (u64)UA_DATASETFIELDCONTENTMASK_SERVERTIMESTAMP) == 0)
  1646. dff->fieldValue.hasServerTimestamp = false;
  1647. if(((u64)dataSetWriter->config.dataSetFieldContentMask & (u64)UA_DATASETFIELDCONTENTMASK_SERVERPICOSECONDS) == 0)
  1648. dff->fieldValue.hasServerPicoseconds = false;
  1649. currentDeltaField++;
  1650. }
  1651. return UA_STATUSCODE_GOOD;
  1652. }
  1653. #endif
  1654. /**
  1655. * Generate a DataSetMessage for the given writer.
  1656. *
  1657. * @param dataSetWriter ptr to corresponding writer
  1658. * @return ptr to generated DataSetMessage
  1659. */
  1660. static UA_StatusCode
  1661. UA_DataSetWriter_generateDataSetMessage(UA_Server *server, UA_DataSetMessage *dataSetMessage,
  1662. UA_DataSetWriter *dataSetWriter) {
  1663. UA_PublishedDataSet *currentDataSet =
  1664. UA_PublishedDataSet_findPDSbyId(server, dataSetWriter->connectedDataSet);
  1665. if(!currentDataSet)
  1666. return UA_STATUSCODE_BADNOTFOUND;
  1667. /* Reset the message */
  1668. memset(dataSetMessage, 0, sizeof(UA_DataSetMessage));
  1669. /* store messageType to switch between json or uadp (default) */
  1670. UA_UInt16 messageType = UA_TYPES_UADPDATASETWRITERMESSAGEDATATYPE;
  1671. UA_JsonDataSetWriterMessageDataType *jsonDataSetWriterMessageDataType = NULL;
  1672. /* The configuration Flags are included
  1673. * inside the std. defined UA_UadpDataSetWriterMessageDataType */
  1674. UA_UadpDataSetWriterMessageDataType defaultUadpConfiguration;
  1675. UA_UadpDataSetWriterMessageDataType *dataSetWriterMessageDataType = NULL;
  1676. if((dataSetWriter->config.messageSettings.encoding == UA_EXTENSIONOBJECT_DECODED ||
  1677. dataSetWriter->config.messageSettings.encoding == UA_EXTENSIONOBJECT_DECODED_NODELETE) &&
  1678. (dataSetWriter->config.messageSettings.content.decoded.type ==
  1679. &UA_TYPES[UA_TYPES_UADPDATASETWRITERMESSAGEDATATYPE])) {
  1680. dataSetWriterMessageDataType = (UA_UadpDataSetWriterMessageDataType *)
  1681. dataSetWriter->config.messageSettings.content.decoded.data;
  1682. /* type is UADP */
  1683. messageType = UA_TYPES_UADPDATASETWRITERMESSAGEDATATYPE;
  1684. } else if((dataSetWriter->config.messageSettings.encoding == UA_EXTENSIONOBJECT_DECODED ||
  1685. dataSetWriter->config.messageSettings.encoding == UA_EXTENSIONOBJECT_DECODED_NODELETE) &&
  1686. (dataSetWriter->config.messageSettings.content.decoded.type ==
  1687. &UA_TYPES[UA_TYPES_JSONDATASETWRITERMESSAGEDATATYPE])) {
  1688. jsonDataSetWriterMessageDataType = (UA_JsonDataSetWriterMessageDataType *)
  1689. dataSetWriter->config.messageSettings.content.decoded.data;
  1690. /* type is JSON */
  1691. messageType = UA_TYPES_JSONDATASETWRITERMESSAGEDATATYPE;
  1692. } else {
  1693. /* create default flag configuration if no
  1694. * UadpDataSetWriterMessageDataType was passed in */
  1695. memset(&defaultUadpConfiguration, 0, sizeof(UA_UadpDataSetWriterMessageDataType));
  1696. defaultUadpConfiguration.dataSetMessageContentMask = (UA_UadpDataSetMessageContentMask)
  1697. ((u64)UA_UADPDATASETMESSAGECONTENTMASK_TIMESTAMP | (u64)UA_UADPDATASETMESSAGECONTENTMASK_MAJORVERSION |
  1698. (u64)UA_UADPDATASETMESSAGECONTENTMASK_MINORVERSION);
  1699. dataSetWriterMessageDataType = &defaultUadpConfiguration;
  1700. }
  1701. /* Sanity-test the configuration */
  1702. if(dataSetWriterMessageDataType &&
  1703. (dataSetWriterMessageDataType->networkMessageNumber != 0 ||
  1704. dataSetWriterMessageDataType->dataSetOffset != 0 ||
  1705. dataSetWriterMessageDataType->configuredSize != 0)) {
  1706. UA_LOG_WARNING(&server->config.logger, UA_LOGCATEGORY_SERVER,
  1707. "Static DSM configuration not supported. Using defaults");
  1708. dataSetWriterMessageDataType->networkMessageNumber = 0;
  1709. dataSetWriterMessageDataType->dataSetOffset = 0;
  1710. dataSetWriterMessageDataType->configuredSize = 0;
  1711. }
  1712. /* The field encoding depends on the flags inside the writer config.
  1713. * TODO: This can be moved to the encoding layer. */
  1714. if(dataSetWriter->config.dataSetFieldContentMask & (u64)UA_DATASETFIELDCONTENTMASK_RAWDATA
  1715. ) {
  1716. dataSetMessage->header.fieldEncoding = UA_FIELDENCODING_RAWDATA;
  1717. } else if((u64)dataSetWriter->config.dataSetFieldContentMask &
  1718. ((u64)UA_DATASETFIELDCONTENTMASK_SOURCETIMESTAMP | (u64)UA_DATASETFIELDCONTENTMASK_SERVERPICOSECONDS |
  1719. (u64)UA_DATASETFIELDCONTENTMASK_SOURCEPICOSECONDS | (u64)UA_DATASETFIELDCONTENTMASK_STATUSCODE)) {
  1720. dataSetMessage->header.fieldEncoding = UA_FIELDENCODING_DATAVALUE;
  1721. } else {
  1722. dataSetMessage->header.fieldEncoding = UA_FIELDENCODING_VARIANT;
  1723. }
  1724. if(messageType == UA_TYPES_UADPDATASETWRITERMESSAGEDATATYPE) {
  1725. /* Std: 'The DataSetMessageContentMask defines the flags for the content of the DataSetMessage header.' */
  1726. if((u64)dataSetWriterMessageDataType->dataSetMessageContentMask &
  1727. (u64)UA_UADPDATASETMESSAGECONTENTMASK_MAJORVERSION) {
  1728. dataSetMessage->header.configVersionMajorVersionEnabled = true;
  1729. dataSetMessage->header.configVersionMajorVersion =
  1730. currentDataSet->dataSetMetaData.configurationVersion.majorVersion;
  1731. }
  1732. if((u64)dataSetWriterMessageDataType->dataSetMessageContentMask &
  1733. (u64)UA_UADPDATASETMESSAGECONTENTMASK_MINORVERSION) {
  1734. dataSetMessage->header.configVersionMinorVersionEnabled = true;
  1735. dataSetMessage->header.configVersionMinorVersion =
  1736. currentDataSet->dataSetMetaData.configurationVersion.minorVersion;
  1737. }
  1738. if((u64)dataSetWriterMessageDataType->dataSetMessageContentMask &
  1739. (u64)UA_UADPDATASETMESSAGECONTENTMASK_SEQUENCENUMBER) {
  1740. dataSetMessage->header.dataSetMessageSequenceNrEnabled = true;
  1741. dataSetMessage->header.dataSetMessageSequenceNr =
  1742. dataSetWriter->actualDataSetMessageSequenceCount;
  1743. }
  1744. if((u64)dataSetWriterMessageDataType->dataSetMessageContentMask &
  1745. (u64)UA_UADPDATASETMESSAGECONTENTMASK_TIMESTAMP) {
  1746. dataSetMessage->header.timestampEnabled = true;
  1747. dataSetMessage->header.timestamp = UA_DateTime_now();
  1748. }
  1749. /* TODO: Picoseconds resolution not supported atm */
  1750. if((u64)dataSetWriterMessageDataType->dataSetMessageContentMask &
  1751. (u64)UA_UADPDATASETMESSAGECONTENTMASK_PICOSECONDS) {
  1752. dataSetMessage->header.picoSecondsIncluded = false;
  1753. }
  1754. /* TODO: Statuscode not supported yet */
  1755. if((u64)dataSetWriterMessageDataType->dataSetMessageContentMask &
  1756. (u64)UA_UADPDATASETMESSAGECONTENTMASK_STATUS) {
  1757. dataSetMessage->header.statusEnabled = false;
  1758. }
  1759. } else if(messageType == UA_TYPES_JSONDATASETWRITERMESSAGEDATATYPE) {
  1760. if((u64)jsonDataSetWriterMessageDataType->dataSetMessageContentMask &
  1761. (u64)UA_JSONDATASETMESSAGECONTENTMASK_METADATAVERSION) {
  1762. dataSetMessage->header.configVersionMajorVersionEnabled = true;
  1763. dataSetMessage->header.configVersionMajorVersion =
  1764. currentDataSet->dataSetMetaData.configurationVersion.majorVersion;
  1765. }
  1766. if((u64)jsonDataSetWriterMessageDataType->dataSetMessageContentMask &
  1767. (u64)UA_JSONDATASETMESSAGECONTENTMASK_METADATAVERSION) {
  1768. dataSetMessage->header.configVersionMinorVersionEnabled = true;
  1769. dataSetMessage->header.configVersionMinorVersion =
  1770. currentDataSet->dataSetMetaData.configurationVersion.minorVersion;
  1771. }
  1772. if((u64)jsonDataSetWriterMessageDataType->dataSetMessageContentMask &
  1773. (u64)UA_JSONDATASETMESSAGECONTENTMASK_SEQUENCENUMBER) {
  1774. dataSetMessage->header.dataSetMessageSequenceNrEnabled = true;
  1775. dataSetMessage->header.dataSetMessageSequenceNr =
  1776. dataSetWriter->actualDataSetMessageSequenceCount;
  1777. }
  1778. if((u64)jsonDataSetWriterMessageDataType->dataSetMessageContentMask &
  1779. (u64)UA_JSONDATASETMESSAGECONTENTMASK_TIMESTAMP) {
  1780. dataSetMessage->header.timestampEnabled = true;
  1781. dataSetMessage->header.timestamp = UA_DateTime_now();
  1782. }
  1783. /* TODO: Statuscode not supported yet */
  1784. if((u64)jsonDataSetWriterMessageDataType->dataSetMessageContentMask &
  1785. (u64)UA_JSONDATASETMESSAGECONTENTMASK_STATUS) {
  1786. dataSetMessage->header.statusEnabled = false;
  1787. }
  1788. }
  1789. /* Set the sequence count. Automatically rolls over to zero */
  1790. dataSetWriter->actualDataSetMessageSequenceCount++;
  1791. /* JSON does not differ between deltaframes and keyframes, only keyframes are currently used. */
  1792. if(messageType != UA_TYPES_JSONDATASETWRITERMESSAGEDATATYPE){
  1793. #ifdef UA_ENABLE_PUBSUB_DELTAFRAMES
  1794. /* Check if the PublishedDataSet version has changed -> if yes flush the lastValue store and send a KeyFrame */
  1795. if(dataSetWriter->connectedDataSetVersion.majorVersion != currentDataSet->dataSetMetaData.configurationVersion.majorVersion ||
  1796. dataSetWriter->connectedDataSetVersion.minorVersion != currentDataSet->dataSetMetaData.configurationVersion.minorVersion) {
  1797. /* Remove old samples */
  1798. for(size_t i = 0; i < dataSetWriter->lastSamplesCount; i++)
  1799. UA_DataValue_deleteMembers(&dataSetWriter->lastSamples[i].value);
  1800. /* Realloc pds dependent memory */
  1801. dataSetWriter->lastSamplesCount = currentDataSet->fieldSize;
  1802. UA_DataSetWriterSample *newSamplesArray = (UA_DataSetWriterSample * )
  1803. UA_realloc(dataSetWriter->lastSamples, sizeof(UA_DataSetWriterSample) * dataSetWriter->lastSamplesCount);
  1804. if(!newSamplesArray)
  1805. return UA_STATUSCODE_BADOUTOFMEMORY;
  1806. dataSetWriter->lastSamples = newSamplesArray;
  1807. memset(dataSetWriter->lastSamples, 0, sizeof(UA_DataSetWriterSample) * dataSetWriter->lastSamplesCount);
  1808. dataSetWriter->connectedDataSetVersion = currentDataSet->dataSetMetaData.configurationVersion;
  1809. UA_PubSubDataSetWriter_generateKeyFrameMessage(server, dataSetMessage, dataSetWriter);
  1810. dataSetWriter->deltaFrameCounter = 0;
  1811. return UA_STATUSCODE_GOOD;
  1812. }
  1813. /* The standard defines: if a PDS contains only one fields no delta messages
  1814. * should be generated because they need more memory than a keyframe with 1
  1815. * field. */
  1816. if(currentDataSet->fieldSize > 1 && dataSetWriter->deltaFrameCounter > 0 &&
  1817. dataSetWriter->deltaFrameCounter <= dataSetWriter->config.keyFrameCount) {
  1818. UA_PubSubDataSetWriter_generateDeltaFrameMessage(server, dataSetMessage, dataSetWriter);
  1819. dataSetWriter->deltaFrameCounter++;
  1820. return UA_STATUSCODE_GOOD;
  1821. }
  1822. dataSetWriter->deltaFrameCounter = 1;
  1823. #endif
  1824. }
  1825. UA_PubSubDataSetWriter_generateKeyFrameMessage(server, dataSetMessage, dataSetWriter);
  1826. return UA_STATUSCODE_GOOD;
  1827. }
  1828. static UA_StatusCode
  1829. sendNetworkMessageJson(UA_PubSubConnection *connection, UA_DataSetMessage *dsm,
  1830. UA_UInt16 *writerIds, UA_Byte dsmCount, UA_ExtensionObject *transportSettings) {
  1831. UA_StatusCode retval = UA_STATUSCODE_BADNOTSUPPORTED;
  1832. #ifdef UA_ENABLE_JSON_ENCODING
  1833. UA_NetworkMessage nm;
  1834. memset(&nm, 0, sizeof(UA_NetworkMessage));
  1835. nm.version = 1;
  1836. nm.networkMessageType = UA_NETWORKMESSAGE_DATASET;
  1837. nm.payloadHeaderEnabled = true;
  1838. nm.payloadHeader.dataSetPayloadHeader.count = dsmCount;
  1839. nm.payloadHeader.dataSetPayloadHeader.dataSetWriterIds = writerIds;
  1840. nm.payload.dataSetPayload.dataSetMessages = dsm;
  1841. /* Allocate the buffer. Allocate on the stack if the buffer is small. */
  1842. UA_ByteString buf;
  1843. size_t msgSize = UA_NetworkMessage_calcSizeJson(&nm, NULL, 0, NULL, 0, true);
  1844. size_t stackSize = 1;
  1845. if(msgSize <= UA_MAX_STACKBUF)
  1846. stackSize = msgSize;
  1847. UA_STACKARRAY(UA_Byte, stackBuf, stackSize);
  1848. buf.data = stackBuf;
  1849. buf.length = msgSize;
  1850. if(msgSize > UA_MAX_STACKBUF) {
  1851. retval = UA_ByteString_allocBuffer(&buf, msgSize);
  1852. if(retval != UA_STATUSCODE_GOOD)
  1853. return retval;
  1854. }
  1855. /* Encode the message */
  1856. UA_Byte *bufPos = buf.data;
  1857. memset(bufPos, 0, msgSize);
  1858. const UA_Byte *bufEnd = &buf.data[buf.length];
  1859. retval = UA_NetworkMessage_encodeJson(&nm, &bufPos, &bufEnd, NULL, 0, NULL, 0, true);
  1860. if(retval != UA_STATUSCODE_GOOD) {
  1861. if(msgSize > UA_MAX_STACKBUF)
  1862. UA_ByteString_deleteMembers(&buf);
  1863. return retval;
  1864. }
  1865. /* Send the prepared messages */
  1866. retval = connection->channel->send(connection->channel, transportSettings, &buf);
  1867. if(msgSize > UA_MAX_STACKBUF)
  1868. UA_ByteString_deleteMembers(&buf);
  1869. #endif
  1870. return retval;
  1871. }
  1872. static UA_StatusCode
  1873. sendNetworkMessage(UA_PubSubConnection *connection, UA_WriterGroup *wg,
  1874. UA_DataSetMessage *dsm, UA_UInt16 *writerIds, UA_Byte dsmCount,
  1875. UA_ExtensionObject *messageSettings,
  1876. UA_ExtensionObject *transportSettings) {
  1877. if(messageSettings->content.decoded.type !=
  1878. &UA_TYPES[UA_TYPES_UADPWRITERGROUPMESSAGEDATATYPE])
  1879. return UA_STATUSCODE_BADINTERNALERROR;
  1880. UA_UadpWriterGroupMessageDataType *wgm = (UA_UadpWriterGroupMessageDataType*)
  1881. messageSettings->content.decoded.data;
  1882. UA_NetworkMessage nm;
  1883. memset(&nm, 0, sizeof(UA_NetworkMessage));
  1884. nm.publisherIdEnabled =
  1885. ((u64)wgm->networkMessageContentMask & (u64)UA_UADPNETWORKMESSAGECONTENTMASK_PUBLISHERID) != 0;
  1886. nm.groupHeaderEnabled =
  1887. ((u64)wgm->networkMessageContentMask & (u64)UA_UADPNETWORKMESSAGECONTENTMASK_GROUPHEADER) != 0;
  1888. nm.groupHeader.writerGroupIdEnabled =
  1889. ((u64)wgm->networkMessageContentMask & (u64)UA_UADPNETWORKMESSAGECONTENTMASK_WRITERGROUPID) != 0;
  1890. nm.groupHeader.groupVersionEnabled =
  1891. ((u64)wgm->networkMessageContentMask & (u64)UA_UADPNETWORKMESSAGECONTENTMASK_GROUPVERSION) != 0;
  1892. nm.groupHeader.networkMessageNumberEnabled =
  1893. ((u64)wgm->networkMessageContentMask & (u64)UA_UADPNETWORKMESSAGECONTENTMASK_NETWORKMESSAGENUMBER) != 0;
  1894. nm.groupHeader.sequenceNumberEnabled =
  1895. ((u64)wgm->networkMessageContentMask & (u64)UA_UADPNETWORKMESSAGECONTENTMASK_SEQUENCENUMBER) != 0;
  1896. nm.payloadHeaderEnabled =
  1897. ((u64)wgm->networkMessageContentMask & (u64)UA_UADPNETWORKMESSAGECONTENTMASK_PAYLOADHEADER) != 0;
  1898. nm.timestampEnabled =
  1899. ((u64)wgm->networkMessageContentMask & (u64)UA_UADPNETWORKMESSAGECONTENTMASK_TIMESTAMP) != 0;
  1900. nm.picosecondsEnabled =
  1901. ((u64)wgm->networkMessageContentMask & (u64)UA_UADPNETWORKMESSAGECONTENTMASK_PICOSECONDS) != 0;
  1902. nm.dataSetClassIdEnabled =
  1903. ((u64)wgm->networkMessageContentMask & (u64)UA_UADPNETWORKMESSAGECONTENTMASK_DATASETCLASSID) != 0;
  1904. nm.promotedFieldsEnabled =
  1905. ((u64)wgm->networkMessageContentMask & (u64)UA_UADPNETWORKMESSAGECONTENTMASK_PROMOTEDFIELDS) != 0;
  1906. nm.version = 1;
  1907. nm.networkMessageType = UA_NETWORKMESSAGE_DATASET;
  1908. if(connection->config->publisherIdType == UA_PUBSUB_PUBLISHERID_NUMERIC) {
  1909. nm.publisherIdType = UA_PUBLISHERDATATYPE_UINT16;
  1910. nm.publisherId.publisherIdUInt32 = connection->config->publisherId.numeric;
  1911. } else if(connection->config->publisherIdType == UA_PUBSUB_PUBLISHERID_STRING){
  1912. nm.publisherIdType = UA_PUBLISHERDATATYPE_STRING;
  1913. nm.publisherId.publisherIdString = connection->config->publisherId.string;
  1914. }
  1915. /* Compute the length of the dsm separately for the header */
  1916. UA_STACKARRAY(UA_UInt16, dsmLengths, dsmCount);
  1917. for(UA_Byte i = 0; i < dsmCount; i++)
  1918. dsmLengths[i] = (UA_UInt16)UA_DataSetMessage_calcSizeBinary(&dsm[i]);
  1919. nm.payloadHeader.dataSetPayloadHeader.count = dsmCount;
  1920. nm.payloadHeader.dataSetPayloadHeader.dataSetWriterIds = writerIds;
  1921. nm.groupHeader.writerGroupId = wg->config.writerGroupId;
  1922. nm.groupHeader.networkMessageNumber = 1;
  1923. nm.payload.dataSetPayload.sizes = dsmLengths;
  1924. nm.payload.dataSetPayload.dataSetMessages = dsm;
  1925. /* Allocate the buffer. Allocate on the stack if the buffer is small. */
  1926. UA_ByteString buf;
  1927. size_t msgSize = UA_NetworkMessage_calcSizeBinary(&nm);
  1928. size_t stackSize = 1;
  1929. if(msgSize <= UA_MAX_STACKBUF)
  1930. stackSize = msgSize;
  1931. UA_STACKARRAY(UA_Byte, stackBuf, stackSize);
  1932. buf.data = stackBuf;
  1933. buf.length = msgSize;
  1934. UA_StatusCode retval;
  1935. if(msgSize > UA_MAX_STACKBUF) {
  1936. retval = UA_ByteString_allocBuffer(&buf, msgSize);
  1937. if(retval != UA_STATUSCODE_GOOD)
  1938. return retval;
  1939. }
  1940. /* Encode the message */
  1941. UA_Byte *bufPos = buf.data;
  1942. memset(bufPos, 0, msgSize);
  1943. const UA_Byte *bufEnd = &buf.data[buf.length];
  1944. retval = UA_NetworkMessage_encodeBinary(&nm, &bufPos, bufEnd);
  1945. if(retval != UA_STATUSCODE_GOOD) {
  1946. if(msgSize > UA_MAX_STACKBUF)
  1947. UA_ByteString_deleteMembers(&buf);
  1948. return retval;
  1949. }
  1950. /* Send the prepared messages */
  1951. retval = connection->channel->send(connection->channel, transportSettings, &buf);
  1952. if(msgSize > UA_MAX_STACKBUF)
  1953. UA_ByteString_deleteMembers(&buf);
  1954. return retval;
  1955. }
  1956. /* This callback triggers the collection and publish of NetworkMessages and the
  1957. * contained DataSetMessages. */
  1958. void
  1959. UA_WriterGroup_publishCallback(UA_Server *server, UA_WriterGroup *writerGroup) {
  1960. UA_LOG_DEBUG(&server->config.logger, UA_LOGCATEGORY_SERVER, "Publish Callback");
  1961. if(!writerGroup) {
  1962. UA_LOG_WARNING(&server->config.logger, UA_LOGCATEGORY_SERVER,
  1963. "Publish failed. WriterGroup not found");
  1964. return;
  1965. }
  1966. /* Nothing to do? */
  1967. if(writerGroup->writersCount <= 0)
  1968. return;
  1969. /* Binary or Json encoding? */
  1970. if(writerGroup->config.encodingMimeType != UA_PUBSUB_ENCODING_UADP &&
  1971. writerGroup->config.encodingMimeType != UA_PUBSUB_ENCODING_JSON) {
  1972. UA_LOG_WARNING(&server->config.logger, UA_LOGCATEGORY_SERVER,
  1973. "Publish failed: Unknown encoding type.");
  1974. return;
  1975. }
  1976. /* Find the connection associated with the writer */
  1977. UA_PubSubConnection *connection =
  1978. UA_PubSubConnection_findConnectionbyId(server, writerGroup->linkedConnection);
  1979. if(!connection) {
  1980. UA_LOG_WARNING(&server->config.logger, UA_LOGCATEGORY_SERVER,
  1981. "Publish failed. PubSubConnection invalid.");
  1982. return;
  1983. }
  1984. /* How many DSM can be sent in one NM? */
  1985. UA_Byte maxDSM = (UA_Byte)writerGroup->config.maxEncapsulatedDataSetMessageCount;
  1986. if(writerGroup->config.maxEncapsulatedDataSetMessageCount > UA_BYTE_MAX)
  1987. maxDSM = UA_BYTE_MAX;
  1988. /* If the maxEncapsulatedDataSetMessageCount is set to 0->1 */
  1989. if(maxDSM == 0)
  1990. maxDSM = 1;
  1991. /* It is possible to put several DataSetMessages into one NetworkMessage.
  1992. * But only if they do not contain promoted fields. NM with only DSM are
  1993. * sent out right away. The others are kept in a buffer for "batching". */
  1994. size_t dsmCount = 0;
  1995. UA_DataSetWriter *dsw;
  1996. UA_STACKARRAY(UA_UInt16, dsWriterIds, writerGroup->writersCount);
  1997. UA_STACKARRAY(UA_DataSetMessage, dsmStore, writerGroup->writersCount);
  1998. LIST_FOREACH(dsw, &writerGroup->writers, listEntry) {
  1999. /* Find the dataset */
  2000. UA_PublishedDataSet *pds =
  2001. UA_PublishedDataSet_findPDSbyId(server, dsw->connectedDataSet);
  2002. if(!pds) {
  2003. UA_LOG_WARNING(&server->config.logger, UA_LOGCATEGORY_SERVER,
  2004. "PubSub Publish: PublishedDataSet not found");
  2005. continue;
  2006. }
  2007. /* Generate the DSM */
  2008. UA_StatusCode res =
  2009. UA_DataSetWriter_generateDataSetMessage(server, &dsmStore[dsmCount], dsw);
  2010. if(res != UA_STATUSCODE_GOOD) {
  2011. UA_LOG_WARNING(&server->config.logger, UA_LOGCATEGORY_SERVER,
  2012. "PubSub Publish: DataSetMessage creation failed");
  2013. continue;
  2014. }
  2015. /* Send right away if there is only this DSM in a NM. If promoted fields
  2016. * are contained in the PublishedDataSet, then this DSM must go into a
  2017. * dedicated NM as well. */
  2018. if(pds->promotedFieldsCount > 0 || maxDSM == 1) {
  2019. if(writerGroup->config.encodingMimeType == UA_PUBSUB_ENCODING_UADP){
  2020. res = sendNetworkMessage(connection, writerGroup, &dsmStore[dsmCount],
  2021. &dsw->config.dataSetWriterId, 1,
  2022. &writerGroup->config.messageSettings,
  2023. &writerGroup->config.transportSettings);
  2024. }else if(writerGroup->config.encodingMimeType == UA_PUBSUB_ENCODING_JSON){
  2025. res = sendNetworkMessageJson(connection, &dsmStore[dsmCount],
  2026. &dsw->config.dataSetWriterId, 1, &writerGroup->config.transportSettings);
  2027. }
  2028. if(res != UA_STATUSCODE_GOOD)
  2029. UA_LOG_WARNING(&server->config.logger, UA_LOGCATEGORY_SERVER,
  2030. "PubSub Publish: Could not send a NetworkMessage");
  2031. UA_DataSetMessage_free(&dsmStore[dsmCount]);
  2032. continue;
  2033. }
  2034. dsWriterIds[dsmCount] = dsw->config.dataSetWriterId;
  2035. dsmCount++;
  2036. }
  2037. /* Send the NetworkMessages with batched DataSetMessages */
  2038. size_t nmCount = (dsmCount / maxDSM) + ((dsmCount % maxDSM) == 0 ? 0 : 1);
  2039. for(UA_UInt32 i = 0; i < nmCount; i++) {
  2040. UA_Byte nmDsmCount = maxDSM;
  2041. if(i == nmCount - 1)
  2042. nmDsmCount = (UA_Byte)dsmCount % maxDSM;
  2043. UA_StatusCode res3 = UA_STATUSCODE_GOOD;
  2044. if(writerGroup->config.encodingMimeType == UA_PUBSUB_ENCODING_UADP){
  2045. res3 = sendNetworkMessage(connection, writerGroup, &dsmStore[i * maxDSM],
  2046. &dsWriterIds[i * maxDSM], nmDsmCount,
  2047. &writerGroup->config.messageSettings,
  2048. &writerGroup->config.transportSettings);
  2049. }else if(writerGroup->config.encodingMimeType == UA_PUBSUB_ENCODING_JSON){
  2050. res3 = sendNetworkMessageJson(connection, &dsmStore[i * maxDSM],
  2051. &dsWriterIds[i * maxDSM], nmDsmCount, &writerGroup->config.transportSettings);
  2052. }
  2053. if(res3 != UA_STATUSCODE_GOOD)
  2054. UA_LOG_WARNING(&server->config.logger, UA_LOGCATEGORY_SERVER,
  2055. "PubSub Publish: Sending a NetworkMessage failed");
  2056. }
  2057. /* Clean up DSM */
  2058. for(size_t i = 0; i < dsmCount; i++)
  2059. UA_DataSetMessage_free(&dsmStore[i]);
  2060. }
  2061. /* Add new publishCallback. The first execution is triggered directly after
  2062. * creation. */
  2063. UA_StatusCode
  2064. UA_WriterGroup_addPublishCallback(UA_Server *server, UA_WriterGroup *writerGroup) {
  2065. UA_StatusCode retval =
  2066. UA_PubSubManager_addRepeatedCallback(server,
  2067. (UA_ServerCallback) UA_WriterGroup_publishCallback,
  2068. writerGroup, writerGroup->config.publishingInterval,
  2069. &writerGroup->publishCallbackId);
  2070. if(retval == UA_STATUSCODE_GOOD)
  2071. writerGroup->publishCallbackIsRegistered = true;
  2072. /* Run once after creation */
  2073. UA_WriterGroup_publishCallback(server, writerGroup);
  2074. return retval;
  2075. }
  2076. /* This callback triggers the collection and reception of NetworkMessages and the
  2077. * contained DataSetMessages. */
  2078. void UA_ReaderGroup_subscribeCallback(UA_Server *server, UA_ReaderGroup *readerGroup) {
  2079. UA_PubSubConnection *connection = UA_PubSubConnection_findConnectionbyId(server, readerGroup->linkedConnection);
  2080. UA_ByteString buffer;
  2081. if(UA_ByteString_allocBuffer(&buffer, 512) != UA_STATUSCODE_GOOD) {
  2082. UA_LOG_INFO(&server->config.logger, UA_LOGCATEGORY_SERVER, "Message buffer alloc failed!");
  2083. return;
  2084. }
  2085. connection->channel->receive(connection->channel, &buffer, NULL, 300000);
  2086. if(buffer.length > 0) {
  2087. UA_LOG_INFO(&server->config.logger, UA_LOGCATEGORY_USERLAND, "Message received:");
  2088. UA_NetworkMessage currentNetworkMessage;
  2089. memset(&currentNetworkMessage, 0, sizeof(UA_NetworkMessage));
  2090. size_t currentPosition = 0;
  2091. UA_NetworkMessage_decodeBinary(&buffer, &currentPosition, &currentNetworkMessage);
  2092. UA_Server_processNetworkMessage(server, &currentNetworkMessage, connection);
  2093. UA_NetworkMessage_deleteMembers(&currentNetworkMessage);
  2094. }
  2095. UA_ByteString_deleteMembers(&buffer);
  2096. }
  2097. /* Add new subscribeCallback. The first execution is triggered directly after
  2098. * creation. */
  2099. UA_StatusCode
  2100. UA_ReaderGroup_addSubscribeCallback(UA_Server *server, UA_ReaderGroup *readerGroup) {
  2101. UA_StatusCode retval = UA_STATUSCODE_GOOD;
  2102. UA_PubSubConnection *connection = UA_PubSubConnection_findConnectionbyId(server, readerGroup->linkedConnection);
  2103. if(connection != NULL) {
  2104. retval = connection->channel->regist(connection->channel, NULL, NULL);
  2105. if(retval == UA_STATUSCODE_GOOD) {
  2106. retval = UA_PubSubManager_addRepeatedCallback(server,
  2107. (UA_ServerCallback) UA_ReaderGroup_subscribeCallback,
  2108. readerGroup, 5,
  2109. &readerGroup->subscribeCallbackId);
  2110. }
  2111. else {
  2112. UA_LOG_WARNING(&server->config.logger, UA_LOGCATEGORY_SERVER, "register channel failed: 0x%x!", retval);
  2113. }
  2114. }
  2115. if(retval == UA_STATUSCODE_GOOD) {
  2116. readerGroup->subscribeCallbackIsRegistered = true;
  2117. }
  2118. /* Run once after creation */
  2119. UA_ReaderGroup_subscribeCallback(server, readerGroup);
  2120. return retval;
  2121. }
  2122. #endif /* UA_ENABLE_PUBSUB */