ua_pubsub.c 104 KB


  1. /* This Source Code Form is subject to the terms of the Mozilla Public
  2. * License, v. 2.0. If a copy of the MPL was not distributed with this
  3. * file, You can obtain one at http://mozilla.org/MPL/2.0/.
  4. *
  5. * Copyright (c) 2017-2018 Fraunhofer IOSB (Author: Andreas Ebner)
  6. * Copyright (c) 2019 Fraunhofer IOSB (Author: Julius Pfrommer)
  7. * Copyright (c) 2019 Kalycito Infotech Private Limited
  8. */
  9. #include "server/ua_server_internal.h"
  10. #ifdef UA_ENABLE_PUBSUB /* conditional compilation */
  11. #include "ua_pubsub.h"
  12. #ifdef UA_ENABLE_PUBSUB_INFORMATIONMODEL
  13. #include "ua_pubsub_ns0.h"
  14. #endif
  15. #ifdef UA_ENABLE_PUBSUB_DELTAFRAMES
  16. #include "ua_types_encoding_binary.h"
  17. #endif
  18. #define UA_MAX_STACKBUF 512 /* Max size of network messages on the stack */
  19. #define UA_MAX_SIZENAME 64 /* Max size of Qualified Name of Subscribed Variable */
  20. /* Forward declaration */
  21. static size_t
  22. iterateVariableAttrDimension(UA_VariableAttributes varAttr);
  23. static void
  24. UA_WriterGroup_deleteMembers(UA_Server *server, UA_WriterGroup *writerGroup);
  25. static void
  26. UA_DataSetField_deleteMembers(UA_DataSetField *field);
  27. /**********************************************/
  28. /* Connection */
  29. /**********************************************/
  30. UA_StatusCode
  31. UA_PubSubConnectionConfig_copy(const UA_PubSubConnectionConfig *src,
  32. UA_PubSubConnectionConfig *dst) {
  33. UA_StatusCode retVal = UA_STATUSCODE_GOOD;
  34. memcpy(dst, src, sizeof(UA_PubSubConnectionConfig));
  35. retVal |= UA_String_copy(&src->name, &dst->name);
  36. retVal |= UA_Variant_copy(&src->address, &dst->address);
  37. retVal |= UA_String_copy(&src->transportProfileUri, &dst->transportProfileUri);
  38. retVal |= UA_Variant_copy(&src->connectionTransportSettings, &dst->connectionTransportSettings);
  39. if(src->connectionPropertiesSize > 0){
  40. dst->connectionProperties = (UA_KeyValuePair *)
  41. UA_calloc(src->connectionPropertiesSize, sizeof(UA_KeyValuePair));
  42. if(!dst->connectionProperties){
  43. return UA_STATUSCODE_BADOUTOFMEMORY;
  44. }
  45. for(size_t i = 0; i < src->connectionPropertiesSize; i++){
  46. retVal |= UA_QualifiedName_copy(&src->connectionProperties[i].key,
  47. &dst->connectionProperties[i].key);
  48. retVal |= UA_Variant_copy(&src->connectionProperties[i].value,
  49. &dst->connectionProperties[i].value);
  50. }
  51. }
  52. return retVal;
  53. }
  54. UA_StatusCode
  55. UA_Server_getPubSubConnectionConfig(UA_Server *server, const UA_NodeId connection,
  56. UA_PubSubConnectionConfig *config) {
  57. if(!config)
  58. return UA_STATUSCODE_BADINVALIDARGUMENT;
  59. UA_PubSubConnection *currentPubSubConnection =
  60. UA_PubSubConnection_findConnectionbyId(server, connection);
  61. if(!currentPubSubConnection)
  62. return UA_STATUSCODE_BADNOTFOUND;
  63. UA_PubSubConnectionConfig tmpPubSubConnectionConfig;
  64. //deep copy of the actual config
  65. UA_PubSubConnectionConfig_copy(currentPubSubConnection->config, &tmpPubSubConnectionConfig);
  66. *config = tmpPubSubConnectionConfig;
  67. return UA_STATUSCODE_GOOD;
  68. }
  69. UA_PubSubConnection *
  70. UA_PubSubConnection_findConnectionbyId(UA_Server *server, UA_NodeId connectionIdentifier) {
  71. for(size_t i = 0; i < server->pubSubManager.connectionsSize; i++){
  72. if(UA_NodeId_equal(&connectionIdentifier, &server->pubSubManager.connections[i].identifier)){
  73. return &server->pubSubManager.connections[i];
  74. }
  75. }
  76. return NULL;
  77. }
  78. void
  79. UA_PubSubConnectionConfig_deleteMembers(UA_PubSubConnectionConfig *connectionConfig) {
  80. UA_String_deleteMembers(&connectionConfig->name);
  81. UA_String_deleteMembers(&connectionConfig->transportProfileUri);
  82. UA_Variant_deleteMembers(&connectionConfig->connectionTransportSettings);
  83. UA_Variant_deleteMembers(&connectionConfig->address);
  84. for(size_t i = 0; i < connectionConfig->connectionPropertiesSize; i++){
  85. UA_QualifiedName_deleteMembers(&connectionConfig->connectionProperties[i].key);
  86. UA_Variant_deleteMembers(&connectionConfig->connectionProperties[i].value);
  87. }
  88. UA_free(connectionConfig->connectionProperties);
  89. }
  90. void
  91. UA_PubSubConnection_deleteMembers(UA_Server *server, UA_PubSubConnection *connection) {
  92. //delete connection config
  93. UA_PubSubConnectionConfig_deleteMembers(connection->config);
  94. //remove contained WriterGroups
  95. UA_WriterGroup *writerGroup, *tmpWriterGroup;
  96. LIST_FOREACH_SAFE(writerGroup, &connection->writerGroups, listEntry, tmpWriterGroup){
  97. UA_Server_removeWriterGroup(server, writerGroup->identifier);
  98. }
  99. /* remove contained ReaderGroups */
  100. UA_ReaderGroup *readerGroups, *tmpReaderGroup;
  101. LIST_FOREACH_SAFE(readerGroups, &connection->readerGroups, listEntry, tmpReaderGroup){
  102. UA_Server_removeReaderGroup(server, readerGroups->identifier);
  103. }
  104. UA_NodeId_deleteMembers(&connection->identifier);
  105. if(connection->channel){
  106. connection->channel->close(connection->channel);
  107. }
  108. UA_free(connection->config);
  109. }
  110. UA_StatusCode
  111. UA_Server_addWriterGroup(UA_Server *server, const UA_NodeId connection,
  112. const UA_WriterGroupConfig *writerGroupConfig,
  113. UA_NodeId *writerGroupIdentifier) {
  114. UA_StatusCode retVal = UA_STATUSCODE_GOOD;
  115. if(!writerGroupConfig)
  116. return UA_STATUSCODE_BADINVALIDARGUMENT;
  117. //search the connection by the given connectionIdentifier
  118. UA_PubSubConnection *currentConnectionContext =
  119. UA_PubSubConnection_findConnectionbyId(server, connection);
  120. if(!currentConnectionContext)
  121. return UA_STATUSCODE_BADNOTFOUND;
  122. //allocate memory for new WriterGroup
  123. UA_WriterGroup *newWriterGroup = (UA_WriterGroup *) UA_calloc(1, sizeof(UA_WriterGroup));
  124. if(!newWriterGroup)
  125. return UA_STATUSCODE_BADOUTOFMEMORY;
  126. newWriterGroup->linkedConnection = currentConnectionContext->identifier;
  127. UA_PubSubManager_generateUniqueNodeId(server, &newWriterGroup->identifier);
  128. if(writerGroupIdentifier){
  129. UA_NodeId_copy(&newWriterGroup->identifier, writerGroupIdentifier);
  130. }
  131. //deep copy of the config
  132. UA_WriterGroupConfig tmpWriterGroupConfig;
  133. retVal |= UA_WriterGroupConfig_copy(writerGroupConfig, &tmpWriterGroupConfig);
  134. if(!tmpWriterGroupConfig.messageSettings.content.decoded.type) {
  135. UA_UadpWriterGroupMessageDataType *wgm = UA_UadpWriterGroupMessageDataType_new();
  136. tmpWriterGroupConfig.messageSettings.content.decoded.data = wgm;
  137. tmpWriterGroupConfig.messageSettings.content.decoded.type =
  138. &UA_TYPES[UA_TYPES_UADPWRITERGROUPMESSAGEDATATYPE];
  139. tmpWriterGroupConfig.messageSettings.encoding = UA_EXTENSIONOBJECT_DECODED;
  140. }
  141. newWriterGroup->config = tmpWriterGroupConfig;
  142. retVal |= UA_WriterGroup_addPublishCallback(server, newWriterGroup);
  143. LIST_INSERT_HEAD(&currentConnectionContext->writerGroups, newWriterGroup, listEntry);
  144. #ifdef UA_ENABLE_PUBSUB_INFORMATIONMODEL
  145. addWriterGroupRepresentation(server, newWriterGroup);
  146. #endif
  147. return retVal;
  148. }
  149. UA_StatusCode
  150. UA_Server_removeWriterGroup(UA_Server *server, const UA_NodeId writerGroup){
  151. UA_WriterGroup *wg = UA_WriterGroup_findWGbyId(server, writerGroup);
  152. if(!wg)
  153. return UA_STATUSCODE_BADNOTFOUND;
  154. UA_PubSubConnection *connection =
  155. UA_PubSubConnection_findConnectionbyId(server, wg->linkedConnection);
  156. if(!connection)
  157. return UA_STATUSCODE_BADNOTFOUND;
  158. //unregister the publish callback
  159. UA_PubSubManager_removeRepeatedPubSubCallback(server, wg->publishCallbackId);
  160. #ifdef UA_ENABLE_PUBSUB_INFORMATIONMODEL
  161. removeGroupRepresentation(server, wg);
  162. #endif
  163. UA_WriterGroup_deleteMembers(server, wg);
  164. LIST_REMOVE(wg, listEntry);
  165. UA_free(wg);
  166. return UA_STATUSCODE_GOOD;
  167. }
  168. /**********************************************/
  169. /* ReaderGroup */
  170. /**********************************************/
  171. /**
  172. * Add ReaderGroup to connection.
  173. *
  174. * @param server
  175. * @param connectionIdentifier
  176. * @param readerGroupConfiguration
  177. * @param readerGroupIdentifier
  178. * @return UA_STATUSCODE_GOOD on success
  179. */
  180. UA_StatusCode
  181. UA_Server_addReaderGroup(UA_Server *server, UA_NodeId connectionIdentifier,
  182. const UA_ReaderGroupConfig *readerGroupConfig,
  183. UA_NodeId *readerGroupIdentifier) {
  184. UA_StatusCode retval = UA_STATUSCODE_GOOD;
  185. UA_ReaderGroupConfig tmpReaderGroupConfig;
  186. /* Search the connection by the given connectionIdentifier */
  187. if(!readerGroupConfig) {
  188. return UA_STATUSCODE_BADINVALIDARGUMENT;
  189. }
  190. /* Search the connection by the given connectionIdentifier */
  191. UA_PubSubConnection *currentConnectionContext = UA_PubSubConnection_findConnectionbyId(server, connectionIdentifier);
  192. if(!currentConnectionContext) {
  193. return UA_STATUSCODE_BADNOTFOUND;
  194. }
  195. /* Allocate memory for new reader group */
  196. UA_ReaderGroup *newGroup = (UA_ReaderGroup *)UA_calloc(1, sizeof(UA_ReaderGroup));
  197. if(!newGroup) {
  198. return UA_STATUSCODE_BADOUTOFMEMORY;
  199. }
  200. /* Generate nodeid for the readergroup identifier */
  201. newGroup->linkedConnection = currentConnectionContext->identifier;
  202. UA_PubSubManager_generateUniqueNodeId(server, &newGroup->identifier);
  203. if(readerGroupIdentifier) {
  204. UA_NodeId_copy(&newGroup->identifier, readerGroupIdentifier);
  205. }
  206. /* Deep copy of the config */
  207. retval |= UA_ReaderGroupConfig_copy(readerGroupConfig, &tmpReaderGroupConfig);
  208. newGroup->config = tmpReaderGroupConfig;
  209. retval |= UA_ReaderGroup_addSubscribeCallback(server, newGroup);
  210. LIST_INSERT_HEAD(&currentConnectionContext->readerGroups, newGroup, listEntry);
  211. currentConnectionContext->readerGroupsSize++;
  212. #ifdef UA_ENABLE_PUBSUB_INFORMATIONMODEL
  213. addReaderGroupRepresentation(server, newGroup);
  214. #endif
  215. return retval;
  216. }
  217. /**
  218. * Remove ReaderGroup from connection and delete contained readers.
  219. *
  220. * @param server
  221. * @param groupIdentifier
  222. * @return UA_STATUSCODE_GOOD on success
  223. */
  224. UA_StatusCode
  225. UA_Server_removeReaderGroup(UA_Server *server, UA_NodeId groupIdentifier) {
  226. UA_ReaderGroup* readerGroup = UA_ReaderGroup_findRGbyId(server, groupIdentifier);
  227. if(readerGroup == NULL) {
  228. return UA_STATUSCODE_BADNOTFOUND;
  229. }
  230. /* Search the connection to which the given readergroup is connected to */
  231. UA_PubSubConnection *connection = UA_PubSubConnection_findConnectionbyId(server, readerGroup->linkedConnection);
  232. if(connection == NULL) {
  233. return UA_STATUSCODE_BADNOTFOUND;
  234. }
  235. /* Unregister subscribe callback */
  236. UA_PubSubManager_removeRepeatedPubSubCallback(server, readerGroup->subscribeCallbackId);
  237. #ifdef UA_ENABLE_PUBSUB_INFORMATIONMODEL
  238. /* To Do:RemoveGroupRepresentation(server, &readerGroup->identifier) */
  239. #endif
  240. /* UA_Server_ReaderGroup_delete also removes itself from the list */
  241. UA_Server_ReaderGroup_delete(server, readerGroup);
  242. /* Remove readerGroup from Connection */
  243. LIST_REMOVE(readerGroup, listEntry);
  244. UA_free(readerGroup);
  245. return UA_STATUSCODE_GOOD;
  246. }
  247. /**
  248. * To Do:
  249. * Update ReaderGroup configuration.
  250. *
  251. * @param server
  252. * @param readerGroupIdentifier
  253. * @param readerGroupConfiguration
  254. * @return UA_STATUSCODE_GOOD on success
  255. */
  256. UA_StatusCode
  257. UA_Server_ReaderGroup_updateConfig(UA_Server *server, UA_NodeId readerGroupIdentifier,
  258. const UA_ReaderGroupConfig *config) {
  259. return UA_STATUSCODE_BADNOTIMPLEMENTED;
  260. }
  261. /**
  262. * Get ReaderGroup configuration.
  263. *
  264. * @param server
  265. * @param groupIdentifier
  266. * @param readerGroupConfiguration
  267. * @return UA_STATUSCODE_GOOD on success
  268. */
  269. UA_StatusCode
  270. UA_Server_ReaderGroup_getConfig(UA_Server *server, UA_NodeId readerGroupIdentifier,
  271. UA_ReaderGroupConfig *config) {
  272. if(!config) {
  273. return UA_STATUSCODE_BADINVALIDARGUMENT;
  274. }
  275. /* Identify the readergroup through the readerGroupIdentifier */
  276. UA_ReaderGroup *currentReaderGroup = UA_ReaderGroup_findRGbyId(server, readerGroupIdentifier);
  277. if(!currentReaderGroup) {
  278. return UA_STATUSCODE_BADNOTFOUND;
  279. }
  280. UA_ReaderGroupConfig tmpReaderGroupConfig;
  281. /* deep copy of the actual config */
  282. UA_ReaderGroupConfig_copy(&currentReaderGroup->config, &tmpReaderGroupConfig);
  283. *config = tmpReaderGroupConfig;
  284. return UA_STATUSCODE_GOOD;
  285. }
  286. /* To Do UA_ReaderGroupConfig delete */
  287. /**
  288. * Delete ReaderGroup.
  289. *
  290. * @param server
  291. * @param groupIdentifier
  292. */
  293. void UA_Server_ReaderGroup_delete(UA_Server* server, UA_ReaderGroup *readerGroup) {
  294. /* To Do Call UA_ReaderGroupConfig_delete */
  295. UA_DataSetReader *dataSetReader, *tmpDataSetReader;
  296. LIST_FOREACH_SAFE(dataSetReader, &readerGroup->readers, listEntry, tmpDataSetReader) {
  297. UA_DataSetReader_delete(server, dataSetReader);
  298. }
  299. UA_PubSubConnection* pConn = UA_PubSubConnection_findConnectionbyId(server, readerGroup->linkedConnection);
  300. if(pConn != NULL) {
  301. pConn->readerGroupsSize--;
  302. }
  303. /* Delete ReaderGroup and its members */
  304. UA_String_deleteMembers(&readerGroup->config.name);
  305. UA_NodeId_deleteMembers(&readerGroup->linkedConnection);
  306. UA_NodeId_deleteMembers(&readerGroup->identifier);
  307. }
  308. /**
  309. * Copy ReaderGroup configuration.
  310. *
  311. * @param source
  312. * @param destination
  313. * @return UA_STATUSCODE_GOOD on success
  314. */
  315. UA_StatusCode
  316. UA_ReaderGroupConfig_copy(const UA_ReaderGroupConfig *src,
  317. UA_ReaderGroupConfig *dst) {
  318. UA_String_copy(&src->name, &dst->name);
  319. /* Currently simple memcpy only */
  320. memcpy(&dst->securityParameters, &src->securityParameters, sizeof(UA_PubSubSecurityParameters));
  321. return UA_STATUSCODE_GOOD;
  322. }
  323. static UA_DataSetReader *
  324. getReaderFromIdentifier(UA_Server *server, UA_NetworkMessage *pMsg, UA_PubSubConnection *pConnection) {
  325. if(pConnection->readerGroupsSize == 1) {
  326. if(LIST_FIRST(&pConnection->readerGroups)->readersCount == 1) {
  327. UA_LOG_INFO(&server->config.logger, UA_LOGCATEGORY_SERVER, "only 1 DataSetReader available. This one will be used.");
  328. return LIST_FIRST(&LIST_FIRST(&pConnection->readerGroups)->readers);
  329. }
  330. }
  331. if(!pMsg->publisherIdEnabled)
  332. return NULL;
  333. UA_ReaderGroup* readerGroup;
  334. LIST_FOREACH(readerGroup, &pConnection->readerGroups, listEntry) {
  335. UA_DataSetReader *tmpReader;
  336. LIST_FOREACH(tmpReader, &readerGroup->readers, listEntry) {
  337. switch (pMsg->publisherIdType) {
  338. case UA_PUBLISHERDATATYPE_BYTE:
  339. if(tmpReader->config.publisherId.type == &UA_TYPES[UA_TYPES_BYTE] &&
  340. pMsg->publisherIdType == UA_PUBLISHERDATATYPE_BYTE &&
  341. pMsg->publisherId.publisherIdByte == *(UA_Byte*)tmpReader->config.publisherId.data) {
  342. return tmpReader;
  343. }
  344. break;
  345. case UA_PUBLISHERDATATYPE_UINT16:
  346. if(tmpReader->config.publisherId.type == &UA_TYPES[UA_TYPES_UINT16] &&
  347. pMsg->publisherIdType == UA_PUBLISHERDATATYPE_UINT16 &&
  348. pMsg->publisherId.publisherIdUInt16 == *(UA_UInt16*)tmpReader->config.publisherId.data) {
  349. return tmpReader;
  350. }
  351. break;
  352. case UA_PUBLISHERDATATYPE_UINT32:
  353. if(tmpReader->config.publisherId.type == &UA_TYPES[UA_TYPES_UINT32] &&
  354. pMsg->publisherIdType == UA_PUBLISHERDATATYPE_UINT32 &&
  355. pMsg->publisherId.publisherIdUInt32 == *(UA_UInt32*)tmpReader->config.publisherId.data) {
  356. return tmpReader;
  357. }
  358. break;
  359. case UA_PUBLISHERDATATYPE_UINT64:
  360. if(tmpReader->config.publisherId.type == &UA_TYPES[UA_TYPES_UINT64] &&
  361. pMsg->publisherIdType == UA_PUBLISHERDATATYPE_UINT64 &&
  362. pMsg->publisherId.publisherIdUInt64 == *(UA_UInt64*)tmpReader->config.publisherId.data) {
  363. return tmpReader;
  364. }
  365. break;
  366. case UA_PUBLISHERDATATYPE_STRING:
  367. if(tmpReader->config.publisherId.type == &UA_TYPES[UA_TYPES_STRING] &&
  368. pMsg->publisherIdType == UA_PUBLISHERDATATYPE_STRING &&
  369. UA_String_equal(&pMsg->publisherId.publisherIdString, (UA_String*)tmpReader->config.publisherId.data)) {
  370. return tmpReader;
  371. }
  372. break;
  373. default:
  374. return NULL;
  375. }
  376. }
  377. }
  378. return NULL;
  379. }
  380. /**
  381. * Process NetworkMessage.
  382. *
  383. * @param server
  384. * @param networkmessage
  385. * @return UA_STATUSCODE_GOOD on success
  386. */
  387. UA_StatusCode
  388. UA_Server_processNetworkMessage(UA_Server *server, UA_NetworkMessage *pMsg,
  389. UA_PubSubConnection *pConnection) {
  390. if(!pMsg || !pConnection)
  391. return UA_STATUSCODE_BADINVALIDARGUMENT;
  392. /* To Do The condition with dataSetWriterIdAvailable and WriterGroupIdAvailable to be handled
  393. * when pMsg->groupHeaderEnabled, pMsg->dataSetClassIdEnabled, pMsg->payloadHeaderEnabled
  394. * Here some filtering is possible */
  395. UA_DataSetReader* dataSetReaderErg = getReaderFromIdentifier(server, pMsg, pConnection);
  396. /* No Reader with the specified id found */
  397. if(!dataSetReaderErg) {
  398. UA_LOG_INFO(&server->config.logger, UA_LOGCATEGORY_SERVER, "No DataSetReader found with PublisherId");
  399. return UA_STATUSCODE_BADNOTFOUND; /* TODO: Check the return code */
  400. }
  401. UA_LOG_DEBUG(&server->config.logger, UA_LOGCATEGORY_SERVER, "DataSetReader found with PublisherId");
  402. UA_Byte anzDataSets = 1;
  403. if(pMsg->payloadHeaderEnabled)
  404. anzDataSets = pMsg->payloadHeader.dataSetPayloadHeader.count;
  405. for(UA_Byte iterator = 0; iterator < anzDataSets; iterator++) {
  406. UA_LOG_DEBUG(&server->config.logger, UA_LOGCATEGORY_SERVER, "Process Msg with DataSetReader!");
  407. UA_Server_DataSetReader_process(server, dataSetReaderErg, &pMsg->payload.dataSetPayload.dataSetMessages[iterator]);
  408. }
  409. /* To Do the condition with dataSetWriterId and WriterGroupId
  410. * else condition for dataSetWriterIdAvailable and writerGroupIdAvailable) */
  411. return UA_STATUSCODE_GOOD;
  412. }
  413. /**
  414. * Find size iterator - array dimension for VariableAttributes
  415. *
  416. * @param VariableAttributes
  417. * @return global variable size iterator
  418. */
  419. static size_t
  420. iterateVariableAttrDimension (UA_VariableAttributes varAttr) {
  421. size_t sizeIterator = 0;
  422. for(size_t indexIterator = 0; indexIterator < varAttr.arrayDimensionsSize; indexIterator++) {
  423. sizeIterator += varAttr.arrayDimensions[indexIterator];
  424. }
  425. return sizeIterator;
  426. }
  427. /**
  428. * Find ReaderGroup with its identifier.
  429. *
  430. * @param server
  431. * @param groupIdentifier
  432. * @return the ReaderGroup or NULL if not found
  433. */
  434. UA_ReaderGroup * UA_ReaderGroup_findRGbyId(UA_Server *server, UA_NodeId identifier) {
  435. for (size_t iteratorConn = 0; iteratorConn < server->pubSubManager.connectionsSize; iteratorConn++) {
  436. UA_ReaderGroup* readerGroup = NULL;
  437. LIST_FOREACH(readerGroup, &server->pubSubManager.connections[iteratorConn].readerGroups, listEntry) {
  438. if(UA_NodeId_equal(&identifier, &readerGroup->identifier)) {
  439. return readerGroup;
  440. }
  441. }
  442. }
  443. return NULL;
  444. }
  445. /**
  446. * Find a DataSetReader with its identifier
  447. *
  448. * @param server
  449. * @param identifier
  450. * @return the DataSetReader or NULL if not found
  451. */
  452. UA_DataSetReader *UA_ReaderGroup_findDSRbyId(UA_Server *server, UA_NodeId identifier) {
  453. for (size_t iteratorConn = 0; iteratorConn < server->pubSubManager.connectionsSize; iteratorConn++) {
  454. UA_ReaderGroup* readerGroup = NULL;
  455. LIST_FOREACH(readerGroup, &server->pubSubManager.connections[iteratorConn].readerGroups, listEntry) {
  456. UA_DataSetReader *tmpReader;
  457. LIST_FOREACH(tmpReader, &readerGroup->readers, listEntry) {
  458. if(UA_NodeId_equal(&tmpReader->identifier, &identifier)) {
  459. return tmpReader;
  460. }
  461. }
  462. }
  463. }
  464. return NULL;
  465. }
  466. /**********************************************/
  467. /* DataSetReader */
  468. /**********************************************/
  469. /**
  470. * Add a DataSetReader to ReaderGroup
  471. *
  472. * @param server
  473. * @param readerGroupIdentifier
  474. * @param dataSetReaderConfig
  475. * @param readerIdentifier
  476. * @return UA_STATUSCODE_GOOD on success
  477. */
  478. UA_StatusCode
  479. UA_Server_addDataSetReader(UA_Server *server, UA_NodeId readerGroupIdentifier,
  480. const UA_DataSetReaderConfig *dataSetReaderConfig,
  481. UA_NodeId *readerIdentifier) {
  482. /* Search the reader group by the given readerGroupIdentifier */
  483. UA_ReaderGroup *readerGroup = UA_ReaderGroup_findRGbyId(server, readerGroupIdentifier);
  484. if(!dataSetReaderConfig) {
  485. return UA_STATUSCODE_BADNOTFOUND;
  486. }
  487. if(readerGroup == NULL) {
  488. return UA_STATUSCODE_BADNOTFOUND;
  489. }
  490. /* Allocate memory for new DataSetReader */
  491. UA_DataSetReader *newDataSetReader = (UA_DataSetReader *)UA_calloc(1, sizeof(UA_DataSetReader));
  492. /* Copy the config into the new dataSetReader */
  493. UA_DataSetReaderConfig_copy(dataSetReaderConfig, &newDataSetReader->config);
  494. newDataSetReader->linkedReaderGroup = readerGroup->identifier;
  495. UA_PubSubManager_generateUniqueNodeId(server, &newDataSetReader->identifier);
  496. if(readerIdentifier != NULL) {
  497. UA_NodeId_copy(&newDataSetReader->identifier, readerIdentifier);
  498. }
  499. /* Add the new reader to the group */
  500. LIST_INSERT_HEAD(&readerGroup->readers, newDataSetReader, listEntry);
  501. readerGroup->readersCount++;
  502. #ifdef UA_ENABLE_PUBSUB_INFORMATIONMODEL
  503. addDataSetReaderRepresentation(server, newDataSetReader);
  504. #endif
  505. return UA_STATUSCODE_GOOD;
  506. }
  507. /**
  508. * Remove a DataSetReader from ReaderGroup
  509. *
  510. * @param server
  511. * @param readerGroupIdentifier
  512. * @return UA_STATUSCODE_GOOD on success
  513. */
  514. UA_StatusCode
  515. UA_Server_removeDataSetReader(UA_Server *server, UA_NodeId readerIdentifier) {
  516. /* Remove datasetreader given by the identifier */
  517. UA_DataSetReader *dataSetReader = UA_ReaderGroup_findDSRbyId(server, readerIdentifier);
  518. if(!dataSetReader) {
  519. return UA_STATUSCODE_BADNOTFOUND;
  520. }
  521. #ifdef UA_ENABLE_PUBSUB_INFORMATIONMODEL
  522. removeDataSetReaderRepresentation(server, dataSetReader);
  523. #endif
  524. UA_DataSetReader_delete(server, dataSetReader);
  525. return UA_STATUSCODE_GOOD;
  526. }
  527. /**
  528. * Update the config of the DataSetReader.
  529. *
  530. * @param server
  531. * @param dataSetReaderIdentifier
  532. * @param readerGroupIdentifier
  533. * @param config
  534. * @return UA_STATUSCODE_GOOD on success
  535. */
  536. UA_StatusCode
  537. UA_Server_DataSetReader_updateConfig(UA_Server *server, UA_NodeId dataSetReaderIdentifier, UA_NodeId readerGroupIdentifier,
  538. const UA_DataSetReaderConfig *config) {
  539. if(config == NULL) {
  540. return UA_STATUSCODE_BADINVALIDARGUMENT;
  541. }
  542. UA_DataSetReader *currentDataSetReader = UA_ReaderGroup_findDSRbyId(server, dataSetReaderIdentifier);
  543. UA_ReaderGroup *currentReaderGroup = UA_ReaderGroup_findRGbyId(server, readerGroupIdentifier);
  544. if(!currentDataSetReader) {
  545. return UA_STATUSCODE_BADNOTFOUND;
  546. }
  547. /* The update functionality will be extended during the next PubSub batches.
  548. * Currently is only a change of the publishing interval possible. */
  549. if(currentDataSetReader->config.writerGroupId != config->writerGroupId) {
  550. UA_PubSubManager_removeRepeatedPubSubCallback(server, currentReaderGroup->subscribeCallbackId);
  551. currentDataSetReader->config.writerGroupId = config->writerGroupId;
  552. UA_ReaderGroup_subscribeCallback(server, currentReaderGroup);
  553. }
  554. else {
  555. UA_LOG_WARNING(&server->config.logger, UA_LOGCATEGORY_SERVER,
  556. "No or unsupported ReaderGroup update.");
  557. }
  558. return UA_STATUSCODE_GOOD;
  559. }
  560. /**
  561. * Get the current config of the UA_DataSetReader.
  562. *
  563. * @param server
  564. * @param dataSetReaderIdentifier
  565. * @param config
  566. * @return UA_STATUSCODE_GOOD on success
  567. */
  568. UA_StatusCode
  569. UA_Server_DataSetReader_getConfig(UA_Server *server, UA_NodeId dataSetReaderIdentifier,
  570. UA_DataSetReaderConfig *config) {
  571. if(!config) {
  572. return UA_STATUSCODE_BADINVALIDARGUMENT;
  573. }
  574. UA_DataSetReader *currentDataSetReader = UA_ReaderGroup_findDSRbyId(server, dataSetReaderIdentifier);
  575. if(!currentDataSetReader) {
  576. return UA_STATUSCODE_BADNOTFOUND;
  577. }
  578. UA_DataSetReaderConfig tmpReaderConfig;
  579. /* Deep copy of the actual config */
  580. UA_DataSetReaderConfig_copy(&currentDataSetReader->config, &tmpReaderConfig);
  581. *config = tmpReaderConfig;
  582. return UA_STATUSCODE_GOOD;
  583. }
  584. /**
  585. * This Method is used to initially set the SubscribedDataSet to TargetVariablesType and to create the list of target Variables of a SubscribedDataSetType.
  586. *
  587. * @param server
  588. * @param dataSetReaderIdentifier
  589. * @param targetVariables
  590. * @return UA_STATUSCODE_GOOD on success
  591. */
  592. UA_StatusCode
  593. UA_Server_DataSetReader_createTargetVariables(UA_Server *server, UA_NodeId dataSetReaderIdentifier, UA_TargetVariablesDataType *targetVariables) {
  594. UA_StatusCode retval = UA_STATUSCODE_BADUNEXPECTEDERROR;
  595. UA_DataSetReader* pDS = UA_ReaderGroup_findDSRbyId(server, dataSetReaderIdentifier);
  596. if(pDS == NULL) {
  597. return UA_STATUSCODE_BADINVALIDARGUMENT;
  598. }
  599. if(pDS->subscribedDataSetTarget.targetVariablesSize > 0) {
  600. UA_TargetVariablesDataType_deleteMembers(&pDS->subscribedDataSetTarget);
  601. pDS->subscribedDataSetTarget.targetVariablesSize = 0;
  602. pDS->subscribedDataSetTarget.targetVariables = NULL;
  603. }
  604. /* Set subscribed dataset to TargetVariableType */
  605. pDS->subscribedDataSetType = UA_PUBSUB_SDS_TARGET;
  606. retval = UA_TargetVariablesDataType_copy(targetVariables, &pDS->subscribedDataSetTarget);
  607. return retval;
  608. }
  609. /**
  610. * Adds Subscribed Variables from the DataSetMetaData for the given DataSet into the given parent node
  611. * and creates the corresponding data in the targetVariables of the DataSetReader
  612. *
  613. * @param server
  614. * @param parentNode
  615. * @param dataSetReaderIdentifier
  616. * @return UA_STATUSCODE_GOOD on success
  617. */
  618. UA_StatusCode UA_Server_DataSetReader_addTargetVariables(UA_Server *server, UA_NodeId *parentNode, UA_NodeId dataSetReaderIdentifier, UA_SubscribedDataSetEnumType sdsType) {
  619. if((server == NULL) || (parentNode == NULL)) {
  620. return UA_STATUSCODE_BADINVALIDARGUMENT;
  621. }
  622. UA_StatusCode retval = UA_STATUSCODE_GOOD;
  623. UA_DataSetReader* pDataSetReader = UA_ReaderGroup_findDSRbyId(server, dataSetReaderIdentifier);
  624. if(pDataSetReader == NULL) {
  625. return UA_STATUSCODE_BADINVALIDARGUMENT;
  626. }
  627. /* To Do Declare for all other type */
  628. UA_DateTime dateTimeTypeVal = 0;
  629. UA_TargetVariablesDataType targetVars;
  630. targetVars.targetVariablesSize = pDataSetReader->config.dataSetMetaData.fieldsSize;
  631. targetVars.targetVariables = (UA_FieldTargetDataType *)UA_calloc(targetVars.targetVariablesSize, sizeof(UA_FieldTargetDataType));
  632. UA_StatusCode retVal = UA_STATUSCODE_GOOD;
  633. for (size_t iteratorField = 0; iteratorField < pDataSetReader->config.dataSetMetaData.fieldsSize; iteratorField++) {
  634. UA_VariableAttributes vAttr = UA_VariableAttributes_default;
  635. vAttr.valueRank = pDataSetReader->config.dataSetMetaData.fields[iteratorField].valueRank;
  636. if(pDataSetReader->config.dataSetMetaData.fields[iteratorField].arrayDimensionsSize > 0) {
  637. retVal = UA_Array_copy(pDataSetReader->config.dataSetMetaData.fields[iteratorField].arrayDimensions, pDataSetReader->config.dataSetMetaData.fields[iteratorField].arrayDimensionsSize, (void**)&vAttr.arrayDimensions, &UA_TYPES[UA_TYPES_UINT32]);
  638. if(retVal == UA_STATUSCODE_GOOD) {
  639. vAttr.arrayDimensionsSize = pDataSetReader->config.dataSetMetaData.fields[iteratorField].arrayDimensionsSize;
  640. }
  641. }
  642. switch (pDataSetReader->config.dataSetMetaData.fields[iteratorField].builtInType) {
  643. case UA_NS0ID_BOOLEAN:
  644. if(vAttr.arrayDimensionsSize > 0) {
  645. size_t sizeIterator = iterateVariableAttrDimension(vAttr);
  646. UA_Boolean *boolArray = (UA_Boolean*)UA_Array_new(sizeIterator, &UA_TYPES[UA_TYPES_BOOLEAN]);
  647. UA_Variant_setArrayCopy(&vAttr.value, boolArray, sizeIterator, &UA_TYPES[UA_TYPES_BOOLEAN]);
  648. UA_Array_delete(boolArray, sizeIterator, &UA_TYPES[UA_TYPES_BOOLEAN]);
  649. }
  650. else {
  651. UA_Boolean boolTypeVal = false;
  652. UA_Variant_setScalar(&vAttr.value, &boolTypeVal, &UA_TYPES[UA_TYPES_BOOLEAN]);
  653. }
  654. UA_NodeId_copy(&pDataSetReader->config.dataSetMetaData.fields[iteratorField].dataType, &vAttr.dataType);
  655. break;
  656. case UA_NS0ID_SBYTE:
  657. if(vAttr.arrayDimensionsSize > 0) {
  658. size_t sizeIterator = iterateVariableAttrDimension(vAttr);
  659. UA_SByte *sbyteArray = (UA_SByte*)UA_Array_new(sizeIterator, &UA_TYPES[UA_TYPES_SBYTE]);
  660. UA_Variant_setArrayCopy(&vAttr.value, sbyteArray, sizeIterator, &UA_TYPES[UA_TYPES_SBYTE]);
  661. UA_Array_delete(sbyteArray, sizeIterator, &UA_TYPES[UA_TYPES_SBYTE]);
  662. }
  663. else {
  664. UA_SByte sbyteTypeVal = 0;
  665. UA_Variant_setScalar(&vAttr.value, &sbyteTypeVal, &UA_TYPES[UA_TYPES_SBYTE]);
  666. }
  667. UA_NodeId_copy(&pDataSetReader->config.dataSetMetaData.fields[iteratorField].dataType, &vAttr.dataType);
  668. break;
  669. case UA_NS0ID_BYTE:
  670. if(vAttr.arrayDimensionsSize > 0) {
  671. size_t sizeIterator = iterateVariableAttrDimension(vAttr);
  672. UA_Byte *byteArray = (UA_Byte*)UA_Array_new(sizeIterator, &UA_TYPES[UA_TYPES_BYTE]);
  673. UA_Variant_setArrayCopy(&vAttr.value, byteArray, sizeIterator, &UA_TYPES[UA_TYPES_BYTE]);
  674. UA_Array_delete(byteArray, sizeIterator, &UA_TYPES[UA_TYPES_BYTE]);
  675. }
  676. else {
  677. UA_Byte byteTypeVal = 0;
  678. UA_Variant_setScalar(&vAttr.value, &byteTypeVal, &UA_TYPES[UA_TYPES_BYTE]);
  679. }
  680. UA_NodeId_copy(&pDataSetReader->config.dataSetMetaData.fields[iteratorField].dataType, &vAttr.dataType);
  681. break;
  682. case UA_NS0ID_INT16:
  683. if(vAttr.arrayDimensionsSize > 0) {
  684. size_t sizeIterator = iterateVariableAttrDimension(vAttr);
  685. UA_Int16 *int16Array = (UA_Int16*)UA_Array_new(sizeIterator, &UA_TYPES[UA_TYPES_INT16]);
  686. UA_Variant_setArrayCopy(&vAttr.value, int16Array, sizeIterator, &UA_TYPES[UA_TYPES_INT16]);
  687. UA_Array_delete(int16Array, sizeIterator, &UA_TYPES[UA_TYPES_INT16]);
  688. }
  689. else {
  690. UA_Int16 int16TypeVal = 0;
  691. UA_Variant_setScalar(&vAttr.value, &int16TypeVal, &UA_TYPES[UA_TYPES_INT16]);
  692. }
  693. UA_NodeId_copy(&pDataSetReader->config.dataSetMetaData.fields[iteratorField].dataType, &vAttr.dataType);
  694. break;
  695. case UA_NS0ID_UINT16:
  696. if(vAttr.arrayDimensionsSize > 0) {
  697. size_t sizeIterator = iterateVariableAttrDimension(vAttr);
  698. UA_UInt16 *uint16Array = (UA_UInt16*)UA_Array_new(sizeIterator, &UA_TYPES[UA_TYPES_UINT16]);
  699. UA_Variant_setArrayCopy(&vAttr.value, uint16Array, sizeIterator, &UA_TYPES[UA_TYPES_UINT16]);
  700. UA_Array_delete(uint16Array, sizeIterator, &UA_TYPES[UA_TYPES_UINT16]);
  701. }
  702. else {
  703. UA_UInt16 uint16TypeVal = 0;
  704. UA_Variant_setScalar(&vAttr.value, &uint16TypeVal, &UA_TYPES[UA_TYPES_UINT16]);
  705. }
  706. UA_NodeId_copy(&pDataSetReader->config.dataSetMetaData.fields[iteratorField].dataType, &vAttr.dataType);
  707. break;
  708. case UA_NS0ID_INT32:
  709. if(vAttr.arrayDimensionsSize > 0) {
  710. size_t sizeIterator = iterateVariableAttrDimension(vAttr);
  711. UA_Int32 *int32Array = (UA_Int32*)UA_Array_new(sizeIterator, &UA_TYPES[UA_TYPES_INT32]);
  712. UA_Variant_setArrayCopy(&vAttr.value, int32Array, sizeIterator, &UA_TYPES[UA_TYPES_INT32]);
  713. UA_Array_delete(int32Array, sizeIterator, &UA_TYPES[UA_TYPES_INT32]);
  714. }
  715. else {
  716. UA_Int32 int32TypeVal = 0;
  717. UA_Variant_setScalar(&vAttr.value, &int32TypeVal, &UA_TYPES[UA_TYPES_INT32]);
  718. }
  719. UA_NodeId_copy(&pDataSetReader->config.dataSetMetaData.fields[iteratorField].dataType, &vAttr.dataType);
  720. break;
  721. case UA_NS0ID_UINT32:
  722. if(vAttr.arrayDimensionsSize > 0) {
  723. size_t sizeIterator = iterateVariableAttrDimension(vAttr);
  724. UA_UInt32 *uint32Array = (UA_UInt32*)UA_Array_new(sizeIterator, &UA_TYPES[UA_TYPES_UINT32]);
  725. UA_Variant_setArrayCopy(&vAttr.value, uint32Array, sizeIterator, &UA_TYPES[UA_TYPES_UINT32]);
  726. UA_Array_delete(uint32Array, sizeIterator, &UA_TYPES[UA_TYPES_UINT32]);
  727. }
  728. else {
  729. UA_UInt32 uint32TypeVal = 0;
  730. UA_Variant_setScalar(&vAttr.value, &uint32TypeVal, &UA_TYPES[UA_TYPES_UINT32]);
  731. }
  732. UA_NodeId_copy(&pDataSetReader->config.dataSetMetaData.fields[iteratorField].dataType, &vAttr.dataType);
  733. break;
  734. case UA_NS0ID_INT64:
  735. if(vAttr.arrayDimensionsSize > 0) {
  736. size_t sizeIterator = iterateVariableAttrDimension(vAttr);
  737. UA_Int64 *int64Array = (UA_Int64*)UA_Array_new(sizeIterator, &UA_TYPES[UA_TYPES_INT64]);
  738. UA_Variant_setArrayCopy(&vAttr.value, int64Array, sizeIterator, &UA_TYPES[UA_TYPES_INT64]);
  739. UA_Array_delete(int64Array, sizeIterator, &UA_TYPES[UA_TYPES_INT64]);
  740. }
  741. else {
  742. UA_Int64 int64TypeVal = 0;
  743. UA_Variant_setScalar(&vAttr.value, &int64TypeVal, &UA_TYPES[UA_TYPES_INT64]);
  744. }
  745. UA_NodeId_copy(&pDataSetReader->config.dataSetMetaData.fields[iteratorField].dataType, &vAttr.dataType);
  746. break;
  747. case UA_NS0ID_UINT64:
  748. if(vAttr.arrayDimensionsSize > 0) {
  749. size_t sizeIterator = iterateVariableAttrDimension(vAttr);
  750. UA_UInt64 *uint64Array = (UA_UInt64*)UA_Array_new(sizeIterator, &UA_TYPES[UA_TYPES_UINT64]);
  751. UA_Variant_setArrayCopy(&vAttr.value, uint64Array, sizeIterator, &UA_TYPES[UA_TYPES_UINT64]);
  752. UA_Array_delete(uint64Array, sizeIterator, &UA_TYPES[UA_TYPES_UINT64]);
  753. }
  754. else {
  755. UA_UInt64 uint64TypeVal = 0;
  756. UA_Variant_setScalar(&vAttr.value, &uint64TypeVal, &UA_TYPES[UA_TYPES_UINT64]);
  757. }
  758. UA_NodeId_copy(&pDataSetReader->config.dataSetMetaData.fields[iteratorField].dataType, &vAttr.dataType);
  759. break;
  760. case UA_NS0ID_FLOAT:
  761. if(vAttr.arrayDimensionsSize > 0) {
  762. size_t sizeIterator = iterateVariableAttrDimension(vAttr);
  763. UA_Float *floatArray = (UA_Float*)UA_Array_new(sizeIterator, &UA_TYPES[UA_TYPES_FLOAT]);
  764. UA_Variant_setArrayCopy(&vAttr.value, floatArray, sizeIterator, &UA_TYPES[UA_TYPES_FLOAT]);
  765. UA_Array_delete(floatArray, sizeIterator, &UA_TYPES[UA_TYPES_FLOAT]);
  766. }
  767. else {
  768. UA_Float floatTypeVal = 0;
  769. UA_Variant_setScalar(&vAttr.value, &floatTypeVal, &UA_TYPES[UA_TYPES_FLOAT]);
  770. }
  771. UA_NodeId_copy(&pDataSetReader->config.dataSetMetaData.fields[iteratorField].dataType, &vAttr.dataType);
  772. break;
  773. case UA_NS0ID_DOUBLE:
  774. if(vAttr.arrayDimensionsSize > 0) {
  775. size_t sizeIterator = iterateVariableAttrDimension(vAttr);
  776. UA_Double *doubleArray = (UA_Double*)UA_Array_new(sizeIterator, &UA_TYPES[UA_TYPES_DOUBLE]);
  777. UA_Variant_setArrayCopy(&vAttr.value, doubleArray, sizeIterator, &UA_TYPES[UA_TYPES_DOUBLE]);
  778. UA_Array_delete(doubleArray, sizeIterator, &UA_TYPES[UA_TYPES_DOUBLE]);
  779. }
  780. else {
  781. UA_Double doubleTypeVal = 0.0;
  782. UA_Variant_setScalar(&vAttr.value, &doubleTypeVal, &UA_TYPES[UA_TYPES_DOUBLE]);
  783. }
  784. UA_NodeId_copy(&pDataSetReader->config.dataSetMetaData.fields[iteratorField].dataType, &vAttr.dataType);
  785. break;
  786. case UA_NS0ID_STRING:
  787. if(vAttr.arrayDimensionsSize > 0) {
  788. size_t sizeIterator = iterateVariableAttrDimension(vAttr);
  789. UA_String *stringArray = (UA_String*)UA_Array_new(sizeIterator, &UA_TYPES[UA_TYPES_STRING]);
  790. UA_Variant_setArrayCopy(&vAttr.value, stringArray, sizeIterator, &UA_TYPES[UA_TYPES_STRING]);
  791. UA_Array_delete(stringArray, sizeIterator, &UA_TYPES[UA_TYPES_STRING]);
  792. }
  793. else {
  794. UA_String stringTypeVal = UA_STRING_NULL;
  795. UA_Variant_setScalar(&vAttr.value, &stringTypeVal, &UA_TYPES[UA_TYPES_STRING]);
  796. }
  797. UA_NodeId_copy(&pDataSetReader->config.dataSetMetaData.fields[iteratorField].dataType, &vAttr.dataType);
  798. break;
  799. case UA_NS0ID_DATETIME:
  800. if(vAttr.arrayDimensionsSize > 0) {
  801. size_t sizeIterator = iterateVariableAttrDimension(vAttr);
  802. UA_DateTime *dateTimeArray = (UA_DateTime*)UA_Array_new(sizeIterator, &UA_TYPES[UA_TYPES_DATETIME]);
  803. UA_Variant_setArrayCopy(&vAttr.value, dateTimeArray, sizeIterator, &UA_TYPES[UA_TYPES_DATETIME]);
  804. UA_Array_delete(dateTimeArray, sizeIterator, &UA_TYPES[UA_TYPES_DATETIME]);
  805. }
  806. else {
  807. dateTimeTypeVal = 0;
  808. UA_Variant_setScalar(&vAttr.value, &dateTimeTypeVal, &UA_TYPES[UA_TYPES_DATETIME]);
  809. }
  810. UA_NodeId_copy(&pDataSetReader->config.dataSetMetaData.fields[iteratorField].dataType, &vAttr.dataType);
  811. break;
  812. case UA_NS0ID_GUID:
  813. if(vAttr.arrayDimensionsSize > 0) {
  814. size_t sizeIterator = iterateVariableAttrDimension(vAttr);
  815. UA_Guid *guidArray = (UA_Guid*)UA_Array_new(sizeIterator, &UA_TYPES[UA_TYPES_GUID]);
  816. UA_Variant_setArrayCopy(&vAttr.value, guidArray, sizeIterator, &UA_TYPES[UA_TYPES_GUID]);
  817. UA_Array_delete(guidArray, sizeIterator, &UA_TYPES[UA_TYPES_GUID]);
  818. }
  819. else {
  820. UA_Guid guidTypeVal = UA_GUID_NULL;
  821. UA_Variant_setScalar(&vAttr.value, &guidTypeVal, &UA_TYPES[UA_TYPES_GUID]);
  822. }
  823. UA_NodeId_copy(&pDataSetReader->config.dataSetMetaData.fields[iteratorField].dataType, &vAttr.dataType);
  824. break;
  825. case UA_NS0ID_NODEID:
  826. if(vAttr.arrayDimensionsSize > 0) {
  827. size_t sizeIterator = iterateVariableAttrDimension(vAttr);
  828. UA_NodeId *nodeidArray = (UA_NodeId*)UA_Array_new(sizeIterator, &UA_TYPES[UA_TYPES_NODEID]);
  829. UA_Variant_setArrayCopy(&vAttr.value, nodeidArray, sizeIterator, &UA_TYPES[UA_TYPES_NODEID]);
  830. UA_Array_delete(nodeidArray, sizeIterator, &UA_TYPES[UA_TYPES_NODEID]);
  831. }
  832. else {
  833. UA_NodeId nodeidTypeVal = UA_NODEID_NULL;
  834. UA_Variant_setScalar(&vAttr.value, &nodeidTypeVal, &UA_TYPES[UA_TYPES_NODEID]);
  835. }
  836. UA_NodeId_copy(&pDataSetReader->config.dataSetMetaData.fields[iteratorField].dataType, &vAttr.dataType);
  837. break;
  838. case UA_NS0ID_BYTESTRING:
  839. if(vAttr.arrayDimensionsSize > 0) {
  840. size_t sizeIterator = iterateVariableAttrDimension(vAttr);
  841. UA_ByteString *byteStringArray = (UA_ByteString*)UA_Array_new(sizeIterator, &UA_TYPES[UA_TYPES_BYTESTRING]);
  842. UA_Variant_setArrayCopy(&vAttr.value, byteStringArray, sizeIterator, &UA_TYPES[UA_TYPES_BYTESTRING]);
  843. UA_Array_delete(byteStringArray, sizeIterator, &UA_TYPES[UA_TYPES_BYTESTRING]);
  844. }
  845. else {
  846. UA_ByteString byteStringTypeVal = UA_BYTESTRING_NULL;
  847. UA_Variant_setScalar(&vAttr.value, &byteStringTypeVal, &UA_TYPES[UA_TYPES_BYTESTRING]);
  848. }
  849. UA_NodeId_copy(&pDataSetReader->config.dataSetMetaData.fields[iteratorField].dataType, &vAttr.dataType);
  850. break;
  851. default:
  852. /* Type not supported */
  853. UA_LOG_INFO(&server->config.logger, UA_LOGCATEGORY_USERLAND, "Type %u not supported, create empty variable.", pDataSetReader->config.dataSetMetaData.fields[iteratorField].builtInType);
  854. break;
  855. }
  856. vAttr.accessLevel = UA_ACCESSLEVELMASK_READ;
  857. UA_LocalizedText_copy(&pDataSetReader->config.dataSetMetaData.fields[iteratorField].description, &vAttr.description);
  858. UA_QualifiedName qn;
  859. UA_QualifiedName_init(&qn);
  860. char szTmpName[UA_MAX_SIZENAME];
  861. if(pDataSetReader->config.dataSetMetaData.fields[iteratorField].name.length > 0) {
  862. UA_UInt16 slen = UA_MAX_SIZENAME -1;
  863. vAttr.displayName.locale = UA_STRING("en-US");
  864. vAttr.displayName.text = pDataSetReader->config.dataSetMetaData.fields[iteratorField].name;
  865. if(pDataSetReader->config.dataSetMetaData.fields[iteratorField].name.length < slen) {
  866. slen = (UA_UInt16)pDataSetReader->config.dataSetMetaData.fields[iteratorField].name.length;
  867. UA_snprintf(szTmpName, sizeof(szTmpName), "%s", (const char*)pDataSetReader->config.dataSetMetaData.fields[iteratorField].name.data);
  868. }
  869. szTmpName[slen] = '\0';
  870. qn = UA_QUALIFIEDNAME(1, szTmpName);
  871. }
  872. else {
  873. strcpy(szTmpName, "SubscribedVariable");
  874. vAttr.displayName = UA_LOCALIZEDTEXT("en-US", szTmpName);
  875. qn = UA_QUALIFIEDNAME(1, "SubscribedVariable");
  876. }
  877. /* Add variable to the given parent node */
  878. UA_NodeId newNode;
  879. retVal = UA_Server_addVariableNode(server, UA_NODEID_NULL, *parentNode,
  880. UA_NODEID_NUMERIC(0, UA_NS0ID_HASCOMPONENT), qn,
  881. UA_NODEID_NUMERIC(0, UA_NS0ID_BASEDATAVARIABLETYPE), vAttr, NULL, &newNode);
  882. if(retVal == UA_STATUSCODE_GOOD) {
  883. UA_LOG_INFO(&server->config.logger, UA_LOGCATEGORY_USERLAND, "addVariableNode %s succeeded", szTmpName);
  884. }
  885. else {
  886. retval = retVal;
  887. UA_LOG_WARNING(&server->config.logger, UA_LOGCATEGORY_USERLAND, "addVariableNode: error 0x%x", retVal);
  888. }
  889. UA_FieldTargetDataType_init(&targetVars.targetVariables[iteratorField]);
  890. targetVars.targetVariables[iteratorField].attributeId = UA_ATTRIBUTEID_VALUE;
  891. UA_NodeId_copy(&newNode, &targetVars.targetVariables[iteratorField].targetNodeId);
  892. UA_NodeId_deleteMembers(&newNode);
  893. if(vAttr.arrayDimensionsSize > 0) {
  894. UA_Variant_deleteMembers(&vAttr.value);
  895. UA_Array_delete(vAttr.arrayDimensions, vAttr.arrayDimensionsSize, &UA_TYPES[UA_TYPES_UINT32]);
  896. }
  897. }
  898. if(sdsType == UA_PUBSUB_SDS_TARGET) {
  899. retval = UA_Server_DataSetReader_createTargetVariables(server, pDataSetReader->identifier, &targetVars);
  900. }
  901. UA_TargetVariablesDataType_deleteMembers(&targetVars);
  902. return retval;
  903. }
  904. /**
  905. * Process a NetworkMessage with a DataSetReader.
  906. *
  907. * @param server
  908. * @param dataSetReader
  909. * @param dataSetMsg
  910. */
  911. void UA_Server_DataSetReader_process(UA_Server *server, UA_DataSetReader *dataSetReader, UA_DataSetMessage* dataSetMsg) {
  912. if((dataSetReader == NULL) || (dataSetMsg == NULL) || (server == NULL)) {
  913. return;
  914. }
  915. if(!dataSetMsg->header.dataSetMessageValid) {
  916. UA_LOG_INFO(&server->config.logger, UA_LOGCATEGORY_SERVER, "DataSetMessage is discarded: message is not valid");
  917. /* To Do check ConfigurationVersion*/
  918. /*if(dataSetMsg->header.configVersionMajorVersionEnabled)
  919. * {
  920. * if(dataSetMsg->header.configVersionMajorVersion != dataSetReader->config.dataSetMetaData.configurationVersion.majorVersion)
  921. * {
  922. * UA_LOG_WARNING(server->config.logger, UA_LOGCATEGORY_SERVER, "DataSetMessage is discarded: ConfigurationVersion MajorVersion does not match");
  923. * return;
  924. * }
  925. } */
  926. }
  927. else {
  928. if(dataSetMsg->header.dataSetMessageType == UA_DATASETMESSAGE_DATAKEYFRAME) {
  929. if(dataSetMsg->header.fieldEncoding != UA_FIELDENCODING_RAWDATA) {
  930. size_t anzFields = dataSetMsg->data.keyFrameData.fieldCount;
  931. if(dataSetReader->config.dataSetMetaData.fieldsSize < anzFields) {
  932. anzFields = dataSetReader->config.dataSetMetaData.fieldsSize;
  933. }
  934. if(dataSetReader->subscribedDataSetTarget.targetVariablesSize < anzFields) {
  935. anzFields = dataSetReader->subscribedDataSetTarget.targetVariablesSize;
  936. }
  937. UA_StatusCode retVal = UA_STATUSCODE_GOOD;
  938. for (UA_UInt16 iteratorField = 0; iteratorField < anzFields; iteratorField++) {
  939. if(dataSetMsg->data.keyFrameData.dataSetFields[iteratorField].hasValue) {
  940. if(dataSetReader->subscribedDataSetTarget.targetVariables[iteratorField].attributeId == UA_ATTRIBUTEID_VALUE) {
  941. retVal = UA_Server_writeValue(server, dataSetReader->subscribedDataSetTarget.targetVariables[iteratorField].targetNodeId, dataSetMsg->data.keyFrameData.dataSetFields[iteratorField].value);
  942. if(retVal != UA_STATUSCODE_GOOD) {
  943. UA_LOG_INFO(&server->config.logger, UA_LOGCATEGORY_SERVER, "Error Write Value KF %u: 0x%x", iteratorField, retVal);
  944. }
  945. }
  946. else {
  947. UA_WriteValue writeVal;
  948. UA_WriteValue_init(&writeVal);
  949. writeVal.attributeId = dataSetReader->subscribedDataSetTarget.targetVariables[iteratorField].attributeId;
  950. writeVal.indexRange = dataSetReader->subscribedDataSetTarget.targetVariables[iteratorField].receiverIndexRange;
  951. writeVal.nodeId = dataSetReader->subscribedDataSetTarget.targetVariables[iteratorField].targetNodeId;
  952. UA_DataValue_copy(&dataSetMsg->data.keyFrameData.dataSetFields[iteratorField], &writeVal.value);
  953. retVal = UA_Server_write(server, &writeVal);
  954. if(retVal != UA_STATUSCODE_GOOD) {
  955. UA_LOG_INFO(&server->config.logger, UA_LOGCATEGORY_SERVER, "Error Write KF %u: 0x%x", iteratorField, retVal);
  956. }
  957. }
  958. }
  959. }
  960. }
  961. }
  962. }
  963. }
  964. /**
  965. * Copy the config of the DataSetReader.
  966. *
  967. * @param src
  968. * @param dst
  969. * @return UA_STATUSCODE_GOOD on success
  970. */
  971. UA_StatusCode
  972. UA_DataSetReaderConfig_copy(const UA_DataSetReaderConfig *src,
  973. UA_DataSetReaderConfig *dst) {
  974. memset(dst, 0, sizeof(UA_DataSetReaderConfig));
  975. UA_StatusCode retVal = UA_String_copy(&src->name, &dst->name);
  976. if(retVal != UA_STATUSCODE_GOOD) {
  977. return retVal;
  978. }
  979. retVal = UA_Variant_copy(&src->publisherId, &dst->publisherId);
  980. if(retVal != UA_STATUSCODE_GOOD) {
  981. return retVal;
  982. }
  983. dst->writerGroupId = src->writerGroupId;
  984. dst->dataSetWriterId = src->dataSetWriterId;
  985. retVal = UA_DataSetMetaDataType_copy(&src->dataSetMetaData, &dst->dataSetMetaData);
  986. if(retVal != UA_STATUSCODE_GOOD) {
  987. return retVal;
  988. }
  989. dst->dataSetFieldContentMask = src->dataSetFieldContentMask;
  990. dst->messageReceiveTimeout = src->messageReceiveTimeout;
  991. /* Currently memcpy is used to copy the securityParameters */
  992. memcpy(&dst->securityParameters, &src->securityParameters, sizeof(UA_PubSubSecurityParameters));
  993. retVal = UA_UadpDataSetReaderMessageDataType_copy(&src->messageSettings, &dst->messageSettings);
  994. if(retVal != UA_STATUSCODE_GOOD) {
  995. return retVal;
  996. }
  997. return UA_STATUSCODE_GOOD;
  998. }
  999. /**
  1000. * Delete the DataSetReader.
  1001. *
  1002. * @param server
  1003. * @param dataSetReader
  1004. */
  1005. void UA_DataSetReader_delete(UA_Server *server, UA_DataSetReader *dataSetReader) {
  1006. /* Delete DataSetReader config */
  1007. UA_String_deleteMembers(&dataSetReader->config.name);
  1008. UA_Variant_deleteMembers(&dataSetReader->config.publisherId);
  1009. UA_DataSetMetaDataType_deleteMembers(&dataSetReader->config.dataSetMetaData);
  1010. UA_UadpDataSetReaderMessageDataType_deleteMembers(&dataSetReader->config.messageSettings);
  1011. UA_TargetVariablesDataType_deleteMembers(&dataSetReader->subscribedDataSetTarget);
  1012. /* Delete DataSetReader */
  1013. UA_ReaderGroup* pGroup = UA_ReaderGroup_findRGbyId(server, dataSetReader->linkedReaderGroup);
  1014. if(pGroup != NULL) {
  1015. pGroup->readersCount--;
  1016. }
  1017. UA_NodeId_deleteMembers(&dataSetReader->identifier);
  1018. UA_NodeId_deleteMembers(&dataSetReader->linkedReaderGroup);
  1019. /* Remove DataSetReader from group */
  1020. LIST_REMOVE(dataSetReader, listEntry);
  1021. /* Free memory allocated for DataSetReader */
  1022. UA_free(dataSetReader);
  1023. }
  1024. /**********************************************/
  1025. /* PublishedDataSet */
  1026. /**********************************************/
  1027. UA_StatusCode
  1028. UA_PublishedDataSetConfig_copy(const UA_PublishedDataSetConfig *src,
  1029. UA_PublishedDataSetConfig *dst) {
  1030. UA_StatusCode retVal = UA_STATUSCODE_GOOD;
  1031. memcpy(dst, src, sizeof(UA_PublishedDataSetConfig));
  1032. retVal |= UA_String_copy(&src->name, &dst->name);
  1033. switch(src->publishedDataSetType){
  1034. case UA_PUBSUB_DATASET_PUBLISHEDITEMS:
  1035. //no additional items
  1036. break;
  1037. case UA_PUBSUB_DATASET_PUBLISHEDITEMS_TEMPLATE:
  1038. if(src->config.itemsTemplate.variablesToAddSize > 0){
  1039. dst->config.itemsTemplate.variablesToAdd = (UA_PublishedVariableDataType *) UA_calloc(
  1040. src->config.itemsTemplate.variablesToAddSize, sizeof(UA_PublishedVariableDataType));
  1041. }
  1042. for(size_t i = 0; i < src->config.itemsTemplate.variablesToAddSize; i++){
  1043. retVal |= UA_PublishedVariableDataType_copy(&src->config.itemsTemplate.variablesToAdd[i],
  1044. &dst->config.itemsTemplate.variablesToAdd[i]);
  1045. }
  1046. retVal |= UA_DataSetMetaDataType_copy(&src->config.itemsTemplate.metaData,
  1047. &dst->config.itemsTemplate.metaData);
  1048. break;
  1049. default:
  1050. return UA_STATUSCODE_BADINVALIDARGUMENT;
  1051. }
  1052. return retVal;
  1053. }
  1054. UA_StatusCode
  1055. UA_Server_getPublishedDataSetConfig(UA_Server *server, const UA_NodeId pds,
  1056. UA_PublishedDataSetConfig *config){
  1057. if(!config)
  1058. return UA_STATUSCODE_BADINVALIDARGUMENT;
  1059. UA_PublishedDataSet *currentPublishedDataSet = UA_PublishedDataSet_findPDSbyId(server, pds);
  1060. if(!currentPublishedDataSet)
  1061. return UA_STATUSCODE_BADNOTFOUND;
  1062. UA_PublishedDataSetConfig tmpPublishedDataSetConfig;
  1063. //deep copy of the actual config
  1064. UA_PublishedDataSetConfig_copy(&currentPublishedDataSet->config, &tmpPublishedDataSetConfig);
  1065. *config = tmpPublishedDataSetConfig;
  1066. return UA_STATUSCODE_GOOD;
  1067. }
  1068. UA_PublishedDataSet *
  1069. UA_PublishedDataSet_findPDSbyId(UA_Server *server, UA_NodeId identifier){
  1070. for(size_t i = 0; i < server->pubSubManager.publishedDataSetsSize; i++){
  1071. if(UA_NodeId_equal(&server->pubSubManager.publishedDataSets[i].identifier, &identifier)){
  1072. return &server->pubSubManager.publishedDataSets[i];
  1073. }
  1074. }
  1075. return NULL;
  1076. }
  1077. void
  1078. UA_PublishedDataSetConfig_deleteMembers(UA_PublishedDataSetConfig *pdsConfig){
  1079. //delete pds config
  1080. UA_String_deleteMembers(&pdsConfig->name);
  1081. switch (pdsConfig->publishedDataSetType){
  1082. case UA_PUBSUB_DATASET_PUBLISHEDITEMS:
  1083. //no additional items
  1084. break;
  1085. case UA_PUBSUB_DATASET_PUBLISHEDITEMS_TEMPLATE:
  1086. if(pdsConfig->config.itemsTemplate.variablesToAddSize > 0){
  1087. for(size_t i = 0; i < pdsConfig->config.itemsTemplate.variablesToAddSize; i++){
  1088. UA_PublishedVariableDataType_deleteMembers(&pdsConfig->config.itemsTemplate.variablesToAdd[i]);
  1089. }
  1090. UA_free(pdsConfig->config.itemsTemplate.variablesToAdd);
  1091. }
  1092. UA_DataSetMetaDataType_deleteMembers(&pdsConfig->config.itemsTemplate.metaData);
  1093. break;
  1094. default:
  1095. break;
  1096. }
  1097. }
  1098. void
  1099. UA_PublishedDataSet_deleteMembers(UA_Server *server, UA_PublishedDataSet *publishedDataSet){
  1100. UA_PublishedDataSetConfig_deleteMembers(&publishedDataSet->config);
  1101. //delete PDS
  1102. UA_DataSetMetaDataType_deleteMembers(&publishedDataSet->dataSetMetaData);
  1103. UA_DataSetField *field, *tmpField;
  1104. LIST_FOREACH_SAFE(field, &publishedDataSet->fields, listEntry, tmpField) {
  1105. UA_Server_removeDataSetField(server, field->identifier);
  1106. }
  1107. UA_NodeId_deleteMembers(&publishedDataSet->identifier);
  1108. }
  1109. UA_DataSetFieldResult
  1110. UA_Server_addDataSetField(UA_Server *server, const UA_NodeId publishedDataSet,
  1111. const UA_DataSetFieldConfig *fieldConfig,
  1112. UA_NodeId *fieldIdentifier) {
  1113. UA_StatusCode retVal = UA_STATUSCODE_GOOD;
  1114. UA_DataSetFieldResult result = {UA_STATUSCODE_BADINVALIDARGUMENT, {0, 0}};
  1115. if(!fieldConfig)
  1116. return result;
  1117. UA_PublishedDataSet *currentDataSet = UA_PublishedDataSet_findPDSbyId(server, publishedDataSet);
  1118. if(currentDataSet == NULL){
  1119. result.result = UA_STATUSCODE_BADNOTFOUND;
  1120. return result;
  1121. }
  1122. if(currentDataSet->config.publishedDataSetType != UA_PUBSUB_DATASET_PUBLISHEDITEMS){
  1123. result.result = UA_STATUSCODE_BADNOTIMPLEMENTED;
  1124. return result;
  1125. }
  1126. UA_DataSetField *newField = (UA_DataSetField *) UA_calloc(1, sizeof(UA_DataSetField));
  1127. if(!newField){
  1128. result.result = UA_STATUSCODE_BADINTERNALERROR;
  1129. return result;
  1130. }
  1131. UA_DataSetFieldConfig tmpFieldConfig;
  1132. retVal |= UA_DataSetFieldConfig_copy(fieldConfig, &tmpFieldConfig);
  1133. newField->config = tmpFieldConfig;
  1134. UA_PubSubManager_generateUniqueNodeId(server, &newField->identifier);
  1135. if(fieldIdentifier != NULL){
  1136. UA_NodeId_copy(&newField->identifier, fieldIdentifier);
  1137. }
  1138. newField->publishedDataSet = currentDataSet->identifier;
  1139. //update major version of parent published data set
  1140. currentDataSet->dataSetMetaData.configurationVersion.majorVersion = UA_PubSubConfigurationVersionTimeDifference();
  1141. LIST_INSERT_HEAD(&currentDataSet->fields, newField, listEntry);
  1142. if(newField->config.field.variable.promotedField)
  1143. currentDataSet->promotedFieldsCount++;
  1144. currentDataSet->fieldSize++;
  1145. result.result = retVal;
  1146. result.configurationVersion.majorVersion = currentDataSet->dataSetMetaData.configurationVersion.majorVersion;
  1147. result.configurationVersion.minorVersion = currentDataSet->dataSetMetaData.configurationVersion.minorVersion;
  1148. return result;
  1149. }
  1150. UA_DataSetFieldResult
  1151. UA_Server_removeDataSetField(UA_Server *server, const UA_NodeId dsf) {
  1152. UA_DataSetField *currentField = UA_DataSetField_findDSFbyId(server, dsf);
  1153. UA_DataSetFieldResult result = {UA_STATUSCODE_BADNOTFOUND, {0, 0}};
  1154. if(!currentField)
  1155. return result;
  1156. UA_PublishedDataSet *parentPublishedDataSet =
  1157. UA_PublishedDataSet_findPDSbyId(server, currentField->publishedDataSet);
  1158. if(!parentPublishedDataSet)
  1159. return result;
  1160. parentPublishedDataSet->fieldSize--;
  1161. if(currentField->config.field.variable.promotedField)
  1162. parentPublishedDataSet->promotedFieldsCount--;
  1163. /* update major version of PublishedDataSet */
  1164. parentPublishedDataSet->dataSetMetaData.configurationVersion.majorVersion =
  1165. UA_PubSubConfigurationVersionTimeDifference();
  1166. UA_DataSetField_deleteMembers(currentField);
  1167. LIST_REMOVE(currentField, listEntry);
  1168. UA_free(currentField);
  1169. result.result = UA_STATUSCODE_GOOD;
  1170. result.configurationVersion.majorVersion = parentPublishedDataSet->dataSetMetaData.configurationVersion.majorVersion;
  1171. result.configurationVersion.minorVersion = parentPublishedDataSet->dataSetMetaData.configurationVersion.minorVersion;
  1172. return result;
  1173. }
  1174. /**********************************************/
  1175. /* DataSetWriter */
  1176. /**********************************************/
  1177. UA_StatusCode
  1178. UA_DataSetWriterConfig_copy(const UA_DataSetWriterConfig *src,
  1179. UA_DataSetWriterConfig *dst){
  1180. UA_StatusCode retVal = UA_STATUSCODE_GOOD;
  1181. memcpy(dst, src, sizeof(UA_DataSetWriterConfig));
  1182. retVal |= UA_String_copy(&src->name, &dst->name);
  1183. retVal |= UA_String_copy(&src->dataSetName, &dst->dataSetName);
  1184. retVal |= UA_ExtensionObject_copy(&src->messageSettings, &dst->messageSettings);
  1185. dst->dataSetWriterProperties = (UA_KeyValuePair *)
  1186. UA_calloc(src->dataSetWriterPropertiesSize, sizeof(UA_KeyValuePair));
  1187. if(!dst->dataSetWriterProperties)
  1188. return UA_STATUSCODE_BADOUTOFMEMORY;
  1189. for(size_t i = 0; i < src->dataSetWriterPropertiesSize; i++){
  1190. retVal |= UA_KeyValuePair_copy(&src->dataSetWriterProperties[i], &dst->dataSetWriterProperties[i]);
  1191. }
  1192. return retVal;
  1193. }
  1194. UA_StatusCode
  1195. UA_Server_getDataSetWriterConfig(UA_Server *server, const UA_NodeId dsw,
  1196. UA_DataSetWriterConfig *config){
  1197. UA_StatusCode retVal = UA_STATUSCODE_GOOD;
  1198. if(!config)
  1199. return UA_STATUSCODE_BADINVALIDARGUMENT;
  1200. UA_DataSetWriter *currentDataSetWriter = UA_DataSetWriter_findDSWbyId(server, dsw);
  1201. if(!currentDataSetWriter)
  1202. return UA_STATUSCODE_BADNOTFOUND;
  1203. UA_DataSetWriterConfig tmpWriterConfig;
  1204. //deep copy of the actual config
  1205. retVal |= UA_DataSetWriterConfig_copy(&currentDataSetWriter->config, &tmpWriterConfig);
  1206. *config = tmpWriterConfig;
  1207. return retVal;
  1208. }
  1209. UA_DataSetWriter *
  1210. UA_DataSetWriter_findDSWbyId(UA_Server *server, UA_NodeId identifier) {
  1211. for(size_t i = 0; i < server->pubSubManager.connectionsSize; i++){
  1212. UA_WriterGroup *tmpWriterGroup;
  1213. LIST_FOREACH(tmpWriterGroup, &server->pubSubManager.connections[i].writerGroups, listEntry){
  1214. UA_DataSetWriter *tmpWriter;
  1215. LIST_FOREACH(tmpWriter, &tmpWriterGroup->writers, listEntry){
  1216. if(UA_NodeId_equal(&tmpWriter->identifier, &identifier)){
  1217. return tmpWriter;
  1218. }
  1219. }
  1220. }
  1221. }
  1222. return NULL;
  1223. }
  1224. void
  1225. UA_DataSetWriterConfig_deleteMembers(UA_DataSetWriterConfig *pdsConfig) {
  1226. UA_String_deleteMembers(&pdsConfig->name);
  1227. UA_String_deleteMembers(&pdsConfig->dataSetName);
  1228. for(size_t i = 0; i < pdsConfig->dataSetWriterPropertiesSize; i++){
  1229. UA_KeyValuePair_deleteMembers(&pdsConfig->dataSetWriterProperties[i]);
  1230. }
  1231. UA_free(pdsConfig->dataSetWriterProperties);
  1232. UA_ExtensionObject_deleteMembers(&pdsConfig->messageSettings);
  1233. }
  1234. static void
  1235. UA_DataSetWriter_deleteMembers(UA_Server *server, UA_DataSetWriter *dataSetWriter) {
  1236. UA_DataSetWriterConfig_deleteMembers(&dataSetWriter->config);
  1237. //delete DataSetWriter
  1238. UA_NodeId_deleteMembers(&dataSetWriter->identifier);
  1239. UA_NodeId_deleteMembers(&dataSetWriter->linkedWriterGroup);
  1240. UA_NodeId_deleteMembers(&dataSetWriter->connectedDataSet);
  1241. #ifdef UA_ENABLE_PUBSUB_DELTAFRAMES
  1242. //delete lastSamples store
  1243. for(size_t i = 0; i < dataSetWriter->lastSamplesCount; i++) {
  1244. UA_DataValue_deleteMembers(&dataSetWriter->lastSamples[i].value);
  1245. }
  1246. UA_free(dataSetWriter->lastSamples);
  1247. dataSetWriter->lastSamples = NULL;
  1248. dataSetWriter->lastSamplesCount = 0;
  1249. #endif
  1250. }
  1251. /**********************************************/
  1252. /* WriterGroup */
  1253. /**********************************************/
  1254. UA_StatusCode
  1255. UA_WriterGroupConfig_copy(const UA_WriterGroupConfig *src,
  1256. UA_WriterGroupConfig *dst){
  1257. UA_StatusCode retVal = UA_STATUSCODE_GOOD;
  1258. memcpy(dst, src, sizeof(UA_WriterGroupConfig));
  1259. retVal |= UA_String_copy(&src->name, &dst->name);
  1260. retVal |= UA_ExtensionObject_copy(&src->transportSettings, &dst->transportSettings);
  1261. retVal |= UA_ExtensionObject_copy(&src->messageSettings, &dst->messageSettings);
  1262. dst->groupProperties = (UA_KeyValuePair *) UA_calloc(src->groupPropertiesSize, sizeof(UA_KeyValuePair));
  1263. if(!dst->groupProperties)
  1264. return UA_STATUSCODE_BADOUTOFMEMORY;
  1265. for(size_t i = 0; i < src->groupPropertiesSize; i++){
  1266. retVal |= UA_KeyValuePair_copy(&src->groupProperties[i], &dst->groupProperties[i]);
  1267. }
  1268. return retVal;
  1269. }
  1270. UA_StatusCode
  1271. UA_Server_getWriterGroupConfig(UA_Server *server, const UA_NodeId writerGroup,
  1272. UA_WriterGroupConfig *config){
  1273. UA_StatusCode retVal = UA_STATUSCODE_GOOD;
  1274. if(!config)
  1275. return UA_STATUSCODE_BADINVALIDARGUMENT;
  1276. UA_WriterGroup *currentWriterGroup = UA_WriterGroup_findWGbyId(server, writerGroup);
  1277. if(!currentWriterGroup){
  1278. return UA_STATUSCODE_BADNOTFOUND;
  1279. }
  1280. UA_WriterGroupConfig tmpWriterGroupConfig;
  1281. //deep copy of the actual config
  1282. retVal |= UA_WriterGroupConfig_copy(&currentWriterGroup->config, &tmpWriterGroupConfig);
  1283. *config = tmpWriterGroupConfig;
  1284. return retVal;
  1285. }
  1286. UA_StatusCode
  1287. UA_Server_updateWriterGroupConfig(UA_Server *server, UA_NodeId writerGroupIdentifier,
  1288. const UA_WriterGroupConfig *config){
  1289. if(!config)
  1290. return UA_STATUSCODE_BADINVALIDARGUMENT;
  1291. UA_WriterGroup *currentWriterGroup = UA_WriterGroup_findWGbyId(server, writerGroupIdentifier);
  1292. if(!currentWriterGroup)
  1293. return UA_STATUSCODE_BADNOTFOUND;
  1294. //The update functionality will be extended during the next PubSub batches.
  1295. //Currently is only a change of the publishing interval possible.
  1296. if(currentWriterGroup->config.maxEncapsulatedDataSetMessageCount != config->maxEncapsulatedDataSetMessageCount){
  1297. currentWriterGroup->config.maxEncapsulatedDataSetMessageCount = config->maxEncapsulatedDataSetMessageCount;
  1298. if(currentWriterGroup->config.messageSettings.encoding == UA_EXTENSIONOBJECT_ENCODED_NOBODY) {
  1299. UA_LOG_WARNING(&server->config.logger, UA_LOGCATEGORY_SERVER,
  1300. "MaxEncapsulatedDataSetMessag need enabled 'PayloadHeader' within the message settings.");
  1301. }
  1302. }
  1303. if(currentWriterGroup->config.publishingInterval != config->publishingInterval) {
  1304. UA_PubSubManager_removeRepeatedPubSubCallback(server, currentWriterGroup->publishCallbackId);
  1305. currentWriterGroup->config.publishingInterval = config->publishingInterval;
  1306. UA_WriterGroup_addPublishCallback(server, currentWriterGroup);
  1307. }
  1308. if(currentWriterGroup->config.priority != config->priority) {
  1309. UA_LOG_WARNING(&server->config.logger, UA_LOGCATEGORY_SERVER,
  1310. "No or unsupported WriterGroup update.");
  1311. }
  1312. return UA_STATUSCODE_GOOD;
  1313. }
  1314. UA_WriterGroup *
  1315. UA_WriterGroup_findWGbyId(UA_Server *server, UA_NodeId identifier){
  1316. for(size_t i = 0; i < server->pubSubManager.connectionsSize; i++){
  1317. UA_WriterGroup *tmpWriterGroup;
  1318. LIST_FOREACH(tmpWriterGroup, &server->pubSubManager.connections[i].writerGroups, listEntry) {
  1319. if(UA_NodeId_equal(&identifier, &tmpWriterGroup->identifier)){
  1320. return tmpWriterGroup;
  1321. }
  1322. }
  1323. }
  1324. return NULL;
  1325. }
  1326. void
  1327. UA_WriterGroupConfig_deleteMembers(UA_WriterGroupConfig *writerGroupConfig){
  1328. //delete writerGroup config
  1329. UA_String_deleteMembers(&writerGroupConfig->name);
  1330. UA_ExtensionObject_deleteMembers(&writerGroupConfig->transportSettings);
  1331. UA_ExtensionObject_deleteMembers(&writerGroupConfig->messageSettings);
  1332. for(size_t i = 0; i < writerGroupConfig->groupPropertiesSize; i++){
  1333. UA_KeyValuePair_deleteMembers(&writerGroupConfig->groupProperties[i]);
  1334. }
  1335. UA_free(writerGroupConfig->groupProperties);
  1336. }
  1337. static void
  1338. UA_WriterGroup_deleteMembers(UA_Server *server, UA_WriterGroup *writerGroup) {
  1339. UA_WriterGroupConfig_deleteMembers(&writerGroup->config);
  1340. //delete WriterGroup
  1341. //delete all writers. Therefore removeDataSetWriter is called from PublishedDataSet
  1342. UA_DataSetWriter *dataSetWriter, *tmpDataSetWriter;
  1343. LIST_FOREACH_SAFE(dataSetWriter, &writerGroup->writers, listEntry, tmpDataSetWriter){
  1344. UA_Server_removeDataSetWriter(server, dataSetWriter->identifier);
  1345. }
  1346. UA_NodeId_deleteMembers(&writerGroup->linkedConnection);
  1347. UA_NodeId_deleteMembers(&writerGroup->identifier);
  1348. }
  1349. UA_StatusCode
  1350. UA_Server_addDataSetWriter(UA_Server *server,
  1351. const UA_NodeId writerGroup, const UA_NodeId dataSet,
  1352. const UA_DataSetWriterConfig *dataSetWriterConfig,
  1353. UA_NodeId *writerIdentifier) {
  1354. UA_StatusCode retVal = UA_STATUSCODE_GOOD;
  1355. if(!dataSetWriterConfig)
  1356. return UA_STATUSCODE_BADINVALIDARGUMENT;
  1357. UA_PublishedDataSet *currentDataSetContext = UA_PublishedDataSet_findPDSbyId(server, dataSet);
  1358. if(!currentDataSetContext)
  1359. return UA_STATUSCODE_BADNOTFOUND;
  1360. UA_WriterGroup *wg = UA_WriterGroup_findWGbyId(server, writerGroup);
  1361. if(!wg)
  1362. return UA_STATUSCODE_BADNOTFOUND;
  1363. UA_DataSetWriter *newDataSetWriter = (UA_DataSetWriter *) UA_calloc(1, sizeof(UA_DataSetWriter));
  1364. if(!newDataSetWriter)
  1365. return UA_STATUSCODE_BADOUTOFMEMORY;
  1366. //copy the config into the new dataSetWriter
  1367. UA_DataSetWriterConfig tmpDataSetWriterConfig;
  1368. retVal |= UA_DataSetWriterConfig_copy(dataSetWriterConfig, &tmpDataSetWriterConfig);
  1369. newDataSetWriter->config = tmpDataSetWriterConfig;
  1370. //save the current version of the connected PublishedDataSet
  1371. newDataSetWriter->connectedDataSetVersion = currentDataSetContext->dataSetMetaData.configurationVersion;
  1372. #ifdef UA_ENABLE_PUBSUB_DELTAFRAMES
  1373. //initialize the queue for the last values
  1374. newDataSetWriter->lastSamples = (UA_DataSetWriterSample * )
  1375. UA_calloc(currentDataSetContext->fieldSize, sizeof(UA_DataSetWriterSample));
  1376. if(!newDataSetWriter->lastSamples) {
  1377. UA_DataSetWriterConfig_deleteMembers(&newDataSetWriter->config);
  1378. UA_free(newDataSetWriter);
  1379. return UA_STATUSCODE_BADOUTOFMEMORY;
  1380. }
  1381. newDataSetWriter->lastSamplesCount = currentDataSetContext->fieldSize;
  1382. #endif
  1383. //connect PublishedDataSet with DataSetWriter
  1384. newDataSetWriter->connectedDataSet = currentDataSetContext->identifier;
  1385. newDataSetWriter->linkedWriterGroup = wg->identifier;
  1386. UA_PubSubManager_generateUniqueNodeId(server, &newDataSetWriter->identifier);
  1387. if(writerIdentifier != NULL)
  1388. UA_NodeId_copy(&newDataSetWriter->identifier, writerIdentifier);
  1389. //add the new writer to the group
  1390. LIST_INSERT_HEAD(&wg->writers, newDataSetWriter, listEntry);
  1391. wg->writersCount++;
  1392. #ifdef UA_ENABLE_PUBSUB_INFORMATIONMODEL
  1393. addDataSetWriterRepresentation(server, newDataSetWriter);
  1394. #endif
  1395. return retVal;
  1396. }
  1397. UA_StatusCode
  1398. UA_Server_removeDataSetWriter(UA_Server *server, const UA_NodeId dsw){
  1399. UA_DataSetWriter *dataSetWriter = UA_DataSetWriter_findDSWbyId(server, dsw);
  1400. if(!dataSetWriter)
  1401. return UA_STATUSCODE_BADNOTFOUND;
  1402. UA_WriterGroup *linkedWriterGroup = UA_WriterGroup_findWGbyId(server, dataSetWriter->linkedWriterGroup);
  1403. if(!linkedWriterGroup)
  1404. return UA_STATUSCODE_BADNOTFOUND;
  1405. linkedWriterGroup->writersCount--;
  1406. #ifdef UA_ENABLE_PUBSUB_INFORMATIONMODEL
  1407. removeDataSetWriterRepresentation(server, dataSetWriter);
  1408. #endif
  1409. //remove DataSetWriter from group
  1410. UA_DataSetWriter_deleteMembers(server, dataSetWriter);
  1411. LIST_REMOVE(dataSetWriter, listEntry);
  1412. UA_free(dataSetWriter);
  1413. return UA_STATUSCODE_GOOD;
  1414. }
  1415. /**********************************************/
  1416. /* DataSetField */
  1417. /**********************************************/
  1418. UA_StatusCode
  1419. UA_DataSetFieldConfig_copy(const UA_DataSetFieldConfig *src, UA_DataSetFieldConfig *dst){
  1420. memcpy(dst, src, sizeof(UA_DataSetFieldConfig));
  1421. if(src->dataSetFieldType == UA_PUBSUB_DATASETFIELD_VARIABLE) {
  1422. UA_String_copy(&src->field.variable.fieldNameAlias, &dst->field.variable.fieldNameAlias);
  1423. UA_PublishedVariableDataType_copy(&src->field.variable.publishParameters,
  1424. &dst->field.variable.publishParameters);
  1425. } else {
  1426. return UA_STATUSCODE_BADNOTSUPPORTED;
  1427. }
  1428. return UA_STATUSCODE_GOOD;
  1429. }
  1430. UA_StatusCode
  1431. UA_Server_getDataSetFieldConfig(UA_Server *server, const UA_NodeId dsf,
  1432. UA_DataSetFieldConfig *config) {
  1433. UA_StatusCode retVal = UA_STATUSCODE_GOOD;
  1434. if(!config)
  1435. return UA_STATUSCODE_BADINVALIDARGUMENT;
  1436. UA_DataSetField *currentDataSetField = UA_DataSetField_findDSFbyId(server, dsf);
  1437. if(!currentDataSetField)
  1438. return UA_STATUSCODE_BADNOTFOUND;
  1439. UA_DataSetFieldConfig tmpFieldConfig;
  1440. //deep copy of the actual config
  1441. retVal |= UA_DataSetFieldConfig_copy(&currentDataSetField->config, &tmpFieldConfig);
  1442. *config = tmpFieldConfig;
  1443. return retVal;
  1444. }
  1445. UA_DataSetField *
  1446. UA_DataSetField_findDSFbyId(UA_Server *server, UA_NodeId identifier) {
  1447. for(size_t i = 0; i < server->pubSubManager.publishedDataSetsSize; i++){
  1448. UA_DataSetField *tmpField;
  1449. LIST_FOREACH(tmpField, &server->pubSubManager.publishedDataSets[i].fields, listEntry){
  1450. if(UA_NodeId_equal(&tmpField->identifier, &identifier)){
  1451. return tmpField;
  1452. }
  1453. }
  1454. }
  1455. return NULL;
  1456. }
  1457. void
  1458. UA_DataSetFieldConfig_deleteMembers(UA_DataSetFieldConfig *dataSetFieldConfig){
  1459. if(dataSetFieldConfig->dataSetFieldType == UA_PUBSUB_DATASETFIELD_VARIABLE){
  1460. UA_String_deleteMembers(&dataSetFieldConfig->field.variable.fieldNameAlias);
  1461. UA_PublishedVariableDataType_deleteMembers(&dataSetFieldConfig->field.variable.publishParameters);
  1462. }
  1463. }
  1464. static void
  1465. UA_DataSetField_deleteMembers(UA_DataSetField *field) {
  1466. UA_DataSetFieldConfig_deleteMembers(&field->config);
  1467. //delete DataSetField
  1468. UA_NodeId_deleteMembers(&field->identifier);
  1469. UA_NodeId_deleteMembers(&field->publishedDataSet);
  1470. UA_FieldMetaData_deleteMembers(&field->fieldMetaData);
  1471. }
  1472. /*********************************************************/
  1473. /* PublishValues handling */
  1474. /*********************************************************/
  /**
   * Compare two variants. Internally used for value change detection.
   *
   * Both variants are binary-encoded and the encodings are compared byte-wise.
   * Differing encoded sizes are reported as a change without encoding anything.
   *
   * @param oldValue previously stored value
   * @param newValue freshly sampled value
   * @return true if the value has changed; false if it is unchanged, if either
   *         pointer is NULL, if an encoded size is zero, or if allocating or
   *         encoding a comparison buffer fails
   */
  #ifdef UA_ENABLE_PUBSUB_DELTAFRAMES
  static UA_Boolean
  valueChangedVariant(UA_Variant *oldValue, UA_Variant *newValue){
      if(! (oldValue && newValue))
          return false;

      /* Decide on the sizes before allocating anything, so the early exits
       * below cannot leak */
      size_t oldValueEncodingSize = UA_calcSizeBinary(oldValue, &UA_TYPES[UA_TYPES_VARIANT]);
      size_t newValueEncodingSize = UA_calcSizeBinary(newValue, &UA_TYPES[UA_TYPES_VARIANT]);
      if((oldValueEncodingSize == 0) || (newValueEncodingSize == 0))
          return false;
      if(oldValueEncodingSize != newValueEncodingSize)
          return true;

      /* The ByteString headers live on the stack; only the buffers are on the
       * heap and are released at the single exit point */
      UA_Boolean compareResult = false;
      UA_ByteString oldValueEncoding;
      UA_ByteString newValueEncoding;
      UA_ByteString_init(&oldValueEncoding);
      UA_ByteString_init(&newValueEncoding);

      if(UA_ByteString_allocBuffer(&oldValueEncoding, oldValueEncodingSize) != UA_STATUSCODE_GOOD)
          goto cleanup;
      if(UA_ByteString_allocBuffer(&newValueEncoding, newValueEncodingSize) != UA_STATUSCODE_GOOD)
          goto cleanup;

      UA_Byte *bufPosOldValue = oldValueEncoding.data;
      const UA_Byte *bufEndOldValue = &oldValueEncoding.data[oldValueEncoding.length];
      UA_Byte *bufPosNewValue = newValueEncoding.data;
      const UA_Byte *bufEndNewValue = &newValueEncoding.data[newValueEncoding.length];

      if(UA_encodeBinary(oldValue, &UA_TYPES[UA_TYPES_VARIANT],
                         &bufPosOldValue, &bufEndOldValue, NULL, NULL) != UA_STATUSCODE_GOOD)
          goto cleanup;
      if(UA_encodeBinary(newValue, &UA_TYPES[UA_TYPES_VARIANT],
                         &bufPosNewValue, &bufEndNewValue, NULL, NULL) != UA_STATUSCODE_GOOD)
          goto cleanup;

      /* Compare only the bytes that were actually written */
      oldValueEncoding.length = (uintptr_t)bufPosOldValue - (uintptr_t)oldValueEncoding.data;
      newValueEncoding.length = (uintptr_t)bufPosNewValue - (uintptr_t)newValueEncoding.data;
      compareResult = !UA_ByteString_equal(&oldValueEncoding, &newValueEncoding);

  cleanup:
      UA_ByteString_deleteMembers(&oldValueEncoding);
      UA_ByteString_deleteMembers(&newValueEncoding);
      return compareResult;
  }
  #endif
  1517. /**
  1518. * Obtain the latest value for a specific DataSetField. This method is currently
  1519. * called inside the DataSetMessage generation process.
  1520. */
  1521. static void
  1522. UA_PubSubDataSetField_sampleValue(UA_Server *server, UA_DataSetField *field,
  1523. UA_DataValue *value) {
  1524. /* Read the value */
  1525. UA_ReadValueId rvid;
  1526. UA_ReadValueId_init(&rvid);
  1527. rvid.nodeId = field->config.field.variable.publishParameters.publishedVariable;
  1528. rvid.attributeId = field->config.field.variable.publishParameters.attributeId;
  1529. rvid.indexRange = field->config.field.variable.publishParameters.indexRange;
  1530. *value = UA_Server_read(server, &rvid, UA_TIMESTAMPSTORETURN_BOTH);
  1531. }
  1532. static UA_StatusCode
  1533. UA_PubSubDataSetWriter_generateKeyFrameMessage(UA_Server *server, UA_DataSetMessage *dataSetMessage,
  1534. UA_DataSetWriter *dataSetWriter) {
  1535. UA_PublishedDataSet *currentDataSet =
  1536. UA_PublishedDataSet_findPDSbyId(server, dataSetWriter->connectedDataSet);
  1537. if(!currentDataSet)
  1538. return UA_STATUSCODE_BADNOTFOUND;
  1539. /* Prepare DataSetMessageContent */
  1540. dataSetMessage->header.dataSetMessageValid = true;
  1541. dataSetMessage->header.dataSetMessageType = UA_DATASETMESSAGE_DATAKEYFRAME;
  1542. dataSetMessage->data.keyFrameData.fieldCount = currentDataSet->fieldSize;
  1543. dataSetMessage->data.keyFrameData.dataSetFields = (UA_DataValue *)
  1544. UA_Array_new(currentDataSet->fieldSize, &UA_TYPES[UA_TYPES_DATAVALUE]);
  1545. if(!dataSetMessage->data.keyFrameData.dataSetFields)
  1546. return UA_STATUSCODE_BADOUTOFMEMORY;
  1547. #ifdef UA_ENABLE_JSON_ENCODING
  1548. /* json: insert fieldnames used as json keys */
  1549. dataSetMessage->data.keyFrameData.fieldNames =
  1550. (UA_String *)UA_Array_new(currentDataSet->fieldSize, &UA_TYPES[UA_TYPES_STRING]);
  1551. if(!dataSetMessage->data.keyFrameData.fieldNames)
  1552. return UA_STATUSCODE_BADOUTOFMEMORY;
  1553. #endif
  1554. /* Loop over the fields */
  1555. size_t counter = 0;
  1556. UA_DataSetField *dsf;
  1557. LIST_FOREACH(dsf, &currentDataSet->fields, listEntry) {
  1558. #ifdef UA_ENABLE_JSON_ENCODING
  1559. /* json: store the fieldNameAlias*/
  1560. UA_String_copy(&dsf->config.field.variable.fieldNameAlias,
  1561. &dataSetMessage->data.keyFrameData.fieldNames[counter]);
  1562. #endif
  1563. /* Sample the value */
  1564. UA_DataValue *dfv = &dataSetMessage->data.keyFrameData.dataSetFields[counter];
  1565. UA_PubSubDataSetField_sampleValue(server, dsf, dfv);
  1566. /* Deactivate statuscode? */
  1567. if(((u64)dataSetWriter->config.dataSetFieldContentMask & (u64)UA_DATASETFIELDCONTENTMASK_STATUSCODE) == 0)
  1568. dfv->hasStatus = false;
  1569. /* Deactivate timestamps */
  1570. if(((u64)dataSetWriter->config.dataSetFieldContentMask &
  1571. (u64)UA_DATASETFIELDCONTENTMASK_SOURCETIMESTAMP) == 0)
  1572. dfv->hasSourceTimestamp = false;
  1573. if(((u64)dataSetWriter->config.dataSetFieldContentMask &
  1574. (u64)UA_DATASETFIELDCONTENTMASK_SOURCEPICOSECONDS) == 0)
  1575. dfv->hasSourcePicoseconds = false;
  1576. if(((u64)dataSetWriter->config.dataSetFieldContentMask &
  1577. (u64)UA_DATASETFIELDCONTENTMASK_SERVERTIMESTAMP) == 0)
  1578. dfv->hasServerTimestamp = false;
  1579. if(((u64)dataSetWriter->config.dataSetFieldContentMask &
  1580. (u64)UA_DATASETFIELDCONTENTMASK_SERVERPICOSECONDS) == 0)
  1581. dfv->hasServerPicoseconds = false;
  1582. #ifdef UA_ENABLE_PUBSUB_DELTAFRAMES
  1583. /* Update lastValue store */
  1584. UA_DataValue_deleteMembers(&dataSetWriter->lastSamples[counter].value);
  1585. UA_DataValue_copy(dfv, &dataSetWriter->lastSamples[counter].value);
  1586. #endif
  1587. counter++;
  1588. }
  1589. return UA_STATUSCODE_GOOD;
  1590. }
  #ifdef UA_ENABLE_PUBSUB_DELTAFRAMES
  /* Fill `dataSetMessage` with a delta frame: only those fields of the connected
   * PublishedDataSet whose value differs from the writer's last stored sample.
   *
   * The message is zeroed first. Every field is sampled; changed samples replace
   * the stored ones and are then copied into the delta field array together with
   * their field index. DataValue members not enabled in the writer's
   * dataSetFieldContentMask are switched off in the copies.
   *
   * @return UA_STATUSCODE_BADNOTFOUND if the connected data set is gone,
   *         UA_STATUSCODE_BADOUTOFMEMORY if the delta field array cannot be
   *         allocated, else UA_STATUSCODE_GOOD. */
  static UA_StatusCode
  UA_PubSubDataSetWriter_generateDeltaFrameMessage(UA_Server *server,
                                                   UA_DataSetMessage *dataSetMessage,
                                                   UA_DataSetWriter *dataSetWriter) {
      UA_PublishedDataSet *currentDataSet =
          UA_PublishedDataSet_findPDSbyId(server, dataSetWriter->connectedDataSet);
      if(!currentDataSet)
          return UA_STATUSCODE_BADNOTFOUND;

      /* Prepare DataSetMessageContent */
      memset(dataSetMessage, 0, sizeof(UA_DataSetMessage));
      dataSetMessage->header.dataSetMessageValid = true;
      dataSetMessage->header.dataSetMessageType = UA_DATASETMESSAGE_DATADELTAFRAME;

      /* First pass: sample every field and mark the ones that changed */
      UA_DataSetField *dsf;
      size_t counter = 0;
      LIST_FOREACH(dsf, &currentDataSet->fields, listEntry) {
          /* Sample the value */
          UA_DataValue value;
          UA_DataValue_init(&value);
          UA_PubSubDataSetField_sampleValue(server, dsf, &value);

          /* Check if the value has changed */
          if(valueChangedVariant(&dataSetWriter->lastSamples[counter].value.value, &value.value)) {
              /* Increase fieldCount for the current delta message */
              dataSetMessage->data.deltaFrameData.fieldCount++;
              dataSetWriter->lastSamples[counter].valueChanged = true;

              /* Update the last stored sample; ownership of `value` moves
               * into the store */
              UA_DataValue_deleteMembers(&dataSetWriter->lastSamples[counter].value);
              dataSetWriter->lastSamples[counter].value = value;
          } else {
              UA_DataValue_deleteMembers(&value);
              dataSetWriter->lastSamples[counter].valueChanged = false;
          }
          counter++;
      }

      /* Allocate DeltaFrameFields */
      UA_DataSetMessage_DeltaFrameField *deltaFields = (UA_DataSetMessage_DeltaFrameField *)
          UA_calloc(dataSetMessage->data.deltaFrameData.fieldCount, sizeof(UA_DataSetMessage_DeltaFrameField));
      if(!deltaFields)
          return UA_STATUSCODE_BADOUTOFMEMORY;

      dataSetMessage->data.deltaFrameData.deltaFrameFields = deltaFields;

      /* Second pass: copy the changed samples into the message */
      size_t currentDeltaField = 0;
      for(size_t i = 0; i < currentDataSet->fieldSize; i++) {
          if(!dataSetWriter->lastSamples[i].valueChanged)
              continue;

          UA_DataSetMessage_DeltaFrameField *dff = &deltaFields[currentDeltaField];

          dff->fieldIndex = (UA_UInt16) i;
          UA_DataValue_copy(&dataSetWriter->lastSamples[i].value, &dff->fieldValue);
          dataSetWriter->lastSamples[i].valueChanged = false;

          /* Deactivate statuscode? */
          if(((u64)dataSetWriter->config.dataSetFieldContentMask & (u64)UA_DATASETFIELDCONTENTMASK_STATUSCODE) == 0)
              dff->fieldValue.hasStatus = false;

          /* Deactivate timestamps? */
          if(((u64)dataSetWriter->config.dataSetFieldContentMask & (u64)UA_DATASETFIELDCONTENTMASK_SOURCETIMESTAMP) == 0)
              dff->fieldValue.hasSourceTimestamp = false;
          /* The source-picoseconds flag controls the SOURCE picoseconds member,
           * matching the key frame generation */
          if(((u64)dataSetWriter->config.dataSetFieldContentMask & (u64)UA_DATASETFIELDCONTENTMASK_SOURCEPICOSECONDS) == 0)
              dff->fieldValue.hasSourcePicoseconds = false;
          if(((u64)dataSetWriter->config.dataSetFieldContentMask & (u64)UA_DATASETFIELDCONTENTMASK_SERVERTIMESTAMP) == 0)
              dff->fieldValue.hasServerTimestamp = false;
          if(((u64)dataSetWriter->config.dataSetFieldContentMask & (u64)UA_DATASETFIELDCONTENTMASK_SERVERPICOSECONDS) == 0)
              dff->fieldValue.hasServerPicoseconds = false;

          currentDeltaField++;
      }
      return UA_STATUSCODE_GOOD;
  }
  #endif
  1656. /**
  1657. * Generate a DataSetMessage for the given writer.
  1658. *
  1659. * @param dataSetWriter ptr to corresponding writer
  1660. * @return ptr to generated DataSetMessage
  1661. */
  1662. static UA_StatusCode
  1663. UA_DataSetWriter_generateDataSetMessage(UA_Server *server, UA_DataSetMessage *dataSetMessage,
  1664. UA_DataSetWriter *dataSetWriter) {
  1665. UA_PublishedDataSet *currentDataSet =
  1666. UA_PublishedDataSet_findPDSbyId(server, dataSetWriter->connectedDataSet);
  1667. if(!currentDataSet)
  1668. return UA_STATUSCODE_BADNOTFOUND;
  1669. /* Reset the message */
  1670. memset(dataSetMessage, 0, sizeof(UA_DataSetMessage));
  1671. /* store messageType to switch between json or uadp (default) */
  1672. UA_UInt16 messageType = UA_TYPES_UADPDATASETWRITERMESSAGEDATATYPE;
  1673. UA_JsonDataSetWriterMessageDataType *jsonDataSetWriterMessageDataType = NULL;
  1674. /* The configuration Flags are included
  1675. * inside the std. defined UA_UadpDataSetWriterMessageDataType */
  1676. UA_UadpDataSetWriterMessageDataType defaultUadpConfiguration;
  1677. UA_UadpDataSetWriterMessageDataType *dataSetWriterMessageDataType = NULL;
  1678. if((dataSetWriter->config.messageSettings.encoding == UA_EXTENSIONOBJECT_DECODED ||
  1679. dataSetWriter->config.messageSettings.encoding == UA_EXTENSIONOBJECT_DECODED_NODELETE) &&
  1680. (dataSetWriter->config.messageSettings.content.decoded.type ==
  1681. &UA_TYPES[UA_TYPES_UADPDATASETWRITERMESSAGEDATATYPE])) {
  1682. dataSetWriterMessageDataType = (UA_UadpDataSetWriterMessageDataType *)
  1683. dataSetWriter->config.messageSettings.content.decoded.data;
  1684. /* type is UADP */
  1685. messageType = UA_TYPES_UADPDATASETWRITERMESSAGEDATATYPE;
  1686. } else if((dataSetWriter->config.messageSettings.encoding == UA_EXTENSIONOBJECT_DECODED ||
  1687. dataSetWriter->config.messageSettings.encoding == UA_EXTENSIONOBJECT_DECODED_NODELETE) &&
  1688. (dataSetWriter->config.messageSettings.content.decoded.type ==
  1689. &UA_TYPES[UA_TYPES_JSONDATASETWRITERMESSAGEDATATYPE])) {
  1690. jsonDataSetWriterMessageDataType = (UA_JsonDataSetWriterMessageDataType *)
  1691. dataSetWriter->config.messageSettings.content.decoded.data;
  1692. /* type is JSON */
  1693. messageType = UA_TYPES_JSONDATASETWRITERMESSAGEDATATYPE;
  1694. } else {
  1695. /* create default flag configuration if no
  1696. * UadpDataSetWriterMessageDataType was passed in */
  1697. memset(&defaultUadpConfiguration, 0, sizeof(UA_UadpDataSetWriterMessageDataType));
  1698. defaultUadpConfiguration.dataSetMessageContentMask = (UA_UadpDataSetMessageContentMask)
  1699. ((u64)UA_UADPDATASETMESSAGECONTENTMASK_TIMESTAMP | (u64)UA_UADPDATASETMESSAGECONTENTMASK_MAJORVERSION |
  1700. (u64)UA_UADPDATASETMESSAGECONTENTMASK_MINORVERSION);
  1701. dataSetWriterMessageDataType = &defaultUadpConfiguration;
  1702. }
  1703. /* Sanity-test the configuration */
  1704. if(dataSetWriterMessageDataType &&
  1705. (dataSetWriterMessageDataType->networkMessageNumber != 0 ||
  1706. dataSetWriterMessageDataType->dataSetOffset != 0 ||
  1707. dataSetWriterMessageDataType->configuredSize != 0)) {
  1708. UA_LOG_WARNING(&server->config.logger, UA_LOGCATEGORY_SERVER,
  1709. "Static DSM configuration not supported. Using defaults");
  1710. dataSetWriterMessageDataType->networkMessageNumber = 0;
  1711. dataSetWriterMessageDataType->dataSetOffset = 0;
  1712. dataSetWriterMessageDataType->configuredSize = 0;
  1713. }
  1714. /* The field encoding depends on the flags inside the writer config.
  1715. * TODO: This can be moved to the encoding layer. */
  1716. if(dataSetWriter->config.dataSetFieldContentMask & (u64)UA_DATASETFIELDCONTENTMASK_RAWDATA
  1717. ) {
  1718. dataSetMessage->header.fieldEncoding = UA_FIELDENCODING_RAWDATA;
  1719. } else if((u64)dataSetWriter->config.dataSetFieldContentMask &
  1720. ((u64)UA_DATASETFIELDCONTENTMASK_SOURCETIMESTAMP | (u64)UA_DATASETFIELDCONTENTMASK_SERVERPICOSECONDS |
  1721. (u64)UA_DATASETFIELDCONTENTMASK_SOURCEPICOSECONDS | (u64)UA_DATASETFIELDCONTENTMASK_STATUSCODE)) {
  1722. dataSetMessage->header.fieldEncoding = UA_FIELDENCODING_DATAVALUE;
  1723. } else {
  1724. dataSetMessage->header.fieldEncoding = UA_FIELDENCODING_VARIANT;
  1725. }
  1726. if(messageType == UA_TYPES_UADPDATASETWRITERMESSAGEDATATYPE) {
  1727. /* Std: 'The DataSetMessageContentMask defines the flags for the content of the DataSetMessage header.' */
  1728. if((u64)dataSetWriterMessageDataType->dataSetMessageContentMask &
  1729. (u64)UA_UADPDATASETMESSAGECONTENTMASK_MAJORVERSION) {
  1730. dataSetMessage->header.configVersionMajorVersionEnabled = true;
  1731. dataSetMessage->header.configVersionMajorVersion =
  1732. currentDataSet->dataSetMetaData.configurationVersion.majorVersion;
  1733. }
  1734. if((u64)dataSetWriterMessageDataType->dataSetMessageContentMask &
  1735. (u64)UA_UADPDATASETMESSAGECONTENTMASK_MINORVERSION) {
  1736. dataSetMessage->header.configVersionMinorVersionEnabled = true;
  1737. dataSetMessage->header.configVersionMinorVersion =
  1738. currentDataSet->dataSetMetaData.configurationVersion.minorVersion;
  1739. }
  1740. if((u64)dataSetWriterMessageDataType->dataSetMessageContentMask &
  1741. (u64)UA_UADPDATASETMESSAGECONTENTMASK_SEQUENCENUMBER) {
  1742. dataSetMessage->header.dataSetMessageSequenceNrEnabled = true;
  1743. dataSetMessage->header.dataSetMessageSequenceNr =
  1744. dataSetWriter->actualDataSetMessageSequenceCount;
  1745. }
  1746. if((u64)dataSetWriterMessageDataType->dataSetMessageContentMask &
  1747. (u64)UA_UADPDATASETMESSAGECONTENTMASK_TIMESTAMP) {
  1748. dataSetMessage->header.timestampEnabled = true;
  1749. dataSetMessage->header.timestamp = UA_DateTime_now();
  1750. }
  1751. /* TODO: Picoseconds resolution not supported atm */
  1752. if((u64)dataSetWriterMessageDataType->dataSetMessageContentMask &
  1753. (u64)UA_UADPDATASETMESSAGECONTENTMASK_PICOSECONDS) {
  1754. dataSetMessage->header.picoSecondsIncluded = false;
  1755. }
  1756. /* TODO: Statuscode not supported yet */
  1757. if((u64)dataSetWriterMessageDataType->dataSetMessageContentMask &
  1758. (u64)UA_UADPDATASETMESSAGECONTENTMASK_STATUS) {
  1759. dataSetMessage->header.statusEnabled = false;
  1760. }
  1761. } else if(messageType == UA_TYPES_JSONDATASETWRITERMESSAGEDATATYPE) {
  1762. if((u64)jsonDataSetWriterMessageDataType->dataSetMessageContentMask &
  1763. (u64)UA_JSONDATASETMESSAGECONTENTMASK_METADATAVERSION) {
  1764. dataSetMessage->header.configVersionMajorVersionEnabled = true;
  1765. dataSetMessage->header.configVersionMajorVersion =
  1766. currentDataSet->dataSetMetaData.configurationVersion.majorVersion;
  1767. }
  1768. if((u64)jsonDataSetWriterMessageDataType->dataSetMessageContentMask &
  1769. (u64)UA_JSONDATASETMESSAGECONTENTMASK_METADATAVERSION) {
  1770. dataSetMessage->header.configVersionMinorVersionEnabled = true;
  1771. dataSetMessage->header.configVersionMinorVersion =
  1772. currentDataSet->dataSetMetaData.configurationVersion.minorVersion;
  1773. }
  1774. if((u64)jsonDataSetWriterMessageDataType->dataSetMessageContentMask &
  1775. (u64)UA_JSONDATASETMESSAGECONTENTMASK_SEQUENCENUMBER) {
  1776. dataSetMessage->header.dataSetMessageSequenceNrEnabled = true;
  1777. dataSetMessage->header.dataSetMessageSequenceNr =
  1778. dataSetWriter->actualDataSetMessageSequenceCount;
  1779. }
  1780. if((u64)jsonDataSetWriterMessageDataType->dataSetMessageContentMask &
  1781. (u64)UA_JSONDATASETMESSAGECONTENTMASK_TIMESTAMP) {
  1782. dataSetMessage->header.timestampEnabled = true;
  1783. dataSetMessage->header.timestamp = UA_DateTime_now();
  1784. }
  1785. /* TODO: Statuscode not supported yet */
  1786. if((u64)jsonDataSetWriterMessageDataType->dataSetMessageContentMask &
  1787. (u64)UA_JSONDATASETMESSAGECONTENTMASK_STATUS) {
  1788. dataSetMessage->header.statusEnabled = false;
  1789. }
  1790. }
  1791. /* Set the sequence count. Automatically rolls over to zero */
  1792. dataSetWriter->actualDataSetMessageSequenceCount++;
  1793. /* JSON does not differ between deltaframes and keyframes, only keyframes are currently used. */
  1794. if(messageType != UA_TYPES_JSONDATASETWRITERMESSAGEDATATYPE){
  1795. #ifdef UA_ENABLE_PUBSUB_DELTAFRAMES
  1796. /* Check if the PublishedDataSet version has changed -> if yes flush the lastValue store and send a KeyFrame */
  1797. if(dataSetWriter->connectedDataSetVersion.majorVersion != currentDataSet->dataSetMetaData.configurationVersion.majorVersion ||
  1798. dataSetWriter->connectedDataSetVersion.minorVersion != currentDataSet->dataSetMetaData.configurationVersion.minorVersion) {
  1799. /* Remove old samples */
  1800. for(size_t i = 0; i < dataSetWriter->lastSamplesCount; i++)
  1801. UA_DataValue_deleteMembers(&dataSetWriter->lastSamples[i].value);
  1802. /* Realloc pds dependent memory */
  1803. dataSetWriter->lastSamplesCount = currentDataSet->fieldSize;
  1804. UA_DataSetWriterSample *newSamplesArray = (UA_DataSetWriterSample * )
  1805. UA_realloc(dataSetWriter->lastSamples, sizeof(UA_DataSetWriterSample) * dataSetWriter->lastSamplesCount);
  1806. if(!newSamplesArray)
  1807. return UA_STATUSCODE_BADOUTOFMEMORY;
  1808. dataSetWriter->lastSamples = newSamplesArray;
  1809. memset(dataSetWriter->lastSamples, 0, sizeof(UA_DataSetWriterSample) * dataSetWriter->lastSamplesCount);
  1810. dataSetWriter->connectedDataSetVersion = currentDataSet->dataSetMetaData.configurationVersion;
  1811. UA_PubSubDataSetWriter_generateKeyFrameMessage(server, dataSetMessage, dataSetWriter);
  1812. dataSetWriter->deltaFrameCounter = 0;
  1813. return UA_STATUSCODE_GOOD;
  1814. }
  1815. /* The standard defines: if a PDS contains only one fields no delta messages
  1816. * should be generated because they need more memory than a keyframe with 1
  1817. * field. */
  1818. if(currentDataSet->fieldSize > 1 && dataSetWriter->deltaFrameCounter > 0 &&
  1819. dataSetWriter->deltaFrameCounter <= dataSetWriter->config.keyFrameCount) {
  1820. UA_PubSubDataSetWriter_generateDeltaFrameMessage(server, dataSetMessage, dataSetWriter);
  1821. dataSetWriter->deltaFrameCounter++;
  1822. return UA_STATUSCODE_GOOD;
  1823. }
  1824. dataSetWriter->deltaFrameCounter = 1;
  1825. #endif
  1826. }
  1827. UA_PubSubDataSetWriter_generateKeyFrameMessage(server, dataSetMessage, dataSetWriter);
  1828. return UA_STATUSCODE_GOOD;
  1829. }
  1830. static UA_StatusCode
  1831. sendNetworkMessageJson(UA_PubSubConnection *connection, UA_DataSetMessage *dsm,
  1832. UA_UInt16 *writerIds, UA_Byte dsmCount, UA_ExtensionObject *transportSettings) {
  1833. UA_StatusCode retval = UA_STATUSCODE_BADNOTSUPPORTED;
  1834. #ifdef UA_ENABLE_JSON_ENCODING
  1835. UA_NetworkMessage nm;
  1836. memset(&nm, 0, sizeof(UA_NetworkMessage));
  1837. nm.version = 1;
  1838. nm.networkMessageType = UA_NETWORKMESSAGE_DATASET;
  1839. nm.payloadHeaderEnabled = true;
  1840. nm.payloadHeader.dataSetPayloadHeader.count = dsmCount;
  1841. nm.payloadHeader.dataSetPayloadHeader.dataSetWriterIds = writerIds;
  1842. nm.payload.dataSetPayload.dataSetMessages = dsm;
  1843. /* Allocate the buffer. Allocate on the stack if the buffer is small. */
  1844. UA_ByteString buf;
  1845. size_t msgSize = UA_NetworkMessage_calcSizeJson(&nm, NULL, 0, NULL, 0, true);
  1846. size_t stackSize = 1;
  1847. if(msgSize <= UA_MAX_STACKBUF)
  1848. stackSize = msgSize;
  1849. UA_STACKARRAY(UA_Byte, stackBuf, stackSize);
  1850. buf.data = stackBuf;
  1851. buf.length = msgSize;
  1852. if(msgSize > UA_MAX_STACKBUF) {
  1853. retval = UA_ByteString_allocBuffer(&buf, msgSize);
  1854. if(retval != UA_STATUSCODE_GOOD)
  1855. return retval;
  1856. }
  1857. /* Encode the message */
  1858. UA_Byte *bufPos = buf.data;
  1859. memset(bufPos, 0, msgSize);
  1860. const UA_Byte *bufEnd = &buf.data[buf.length];
  1861. retval = UA_NetworkMessage_encodeJson(&nm, &bufPos, &bufEnd, NULL, 0, NULL, 0, true);
  1862. if(retval != UA_STATUSCODE_GOOD) {
  1863. if(msgSize > UA_MAX_STACKBUF)
  1864. UA_ByteString_deleteMembers(&buf);
  1865. return retval;
  1866. }
  1867. /* Send the prepared messages */
  1868. retval = connection->channel->send(connection->channel, transportSettings, &buf);
  1869. if(msgSize > UA_MAX_STACKBUF)
  1870. UA_ByteString_deleteMembers(&buf);
  1871. #endif
  1872. return retval;
  1873. }
  1874. static UA_StatusCode
  1875. sendNetworkMessage(UA_PubSubConnection *connection, UA_WriterGroup *wg,
  1876. UA_DataSetMessage *dsm, UA_UInt16 *writerIds, UA_Byte dsmCount,
  1877. UA_ExtensionObject *messageSettings,
  1878. UA_ExtensionObject *transportSettings) {
  1879. if(messageSettings->content.decoded.type !=
  1880. &UA_TYPES[UA_TYPES_UADPWRITERGROUPMESSAGEDATATYPE])
  1881. return UA_STATUSCODE_BADINTERNALERROR;
  1882. UA_UadpWriterGroupMessageDataType *wgm = (UA_UadpWriterGroupMessageDataType*)
  1883. messageSettings->content.decoded.data;
  1884. UA_NetworkMessage nm;
  1885. memset(&nm, 0, sizeof(UA_NetworkMessage));
  1886. nm.publisherIdEnabled =
  1887. ((u64)wgm->networkMessageContentMask & (u64)UA_UADPNETWORKMESSAGECONTENTMASK_PUBLISHERID) != 0;
  1888. nm.groupHeaderEnabled =
  1889. ((u64)wgm->networkMessageContentMask & (u64)UA_UADPNETWORKMESSAGECONTENTMASK_GROUPHEADER) != 0;
  1890. nm.groupHeader.writerGroupIdEnabled =
  1891. ((u64)wgm->networkMessageContentMask & (u64)UA_UADPNETWORKMESSAGECONTENTMASK_WRITERGROUPID) != 0;
  1892. nm.groupHeader.groupVersionEnabled =
  1893. ((u64)wgm->networkMessageContentMask & (u64)UA_UADPNETWORKMESSAGECONTENTMASK_GROUPVERSION) != 0;
  1894. nm.groupHeader.networkMessageNumberEnabled =
  1895. ((u64)wgm->networkMessageContentMask & (u64)UA_UADPNETWORKMESSAGECONTENTMASK_NETWORKMESSAGENUMBER) != 0;
  1896. nm.groupHeader.sequenceNumberEnabled =
  1897. ((u64)wgm->networkMessageContentMask & (u64)UA_UADPNETWORKMESSAGECONTENTMASK_SEQUENCENUMBER) != 0;
  1898. nm.payloadHeaderEnabled =
  1899. ((u64)wgm->networkMessageContentMask & (u64)UA_UADPNETWORKMESSAGECONTENTMASK_PAYLOADHEADER) != 0;
  1900. nm.timestampEnabled =
  1901. ((u64)wgm->networkMessageContentMask & (u64)UA_UADPNETWORKMESSAGECONTENTMASK_TIMESTAMP) != 0;
  1902. nm.picosecondsEnabled =
  1903. ((u64)wgm->networkMessageContentMask & (u64)UA_UADPNETWORKMESSAGECONTENTMASK_PICOSECONDS) != 0;
  1904. nm.dataSetClassIdEnabled =
  1905. ((u64)wgm->networkMessageContentMask & (u64)UA_UADPNETWORKMESSAGECONTENTMASK_DATASETCLASSID) != 0;
  1906. nm.promotedFieldsEnabled =
  1907. ((u64)wgm->networkMessageContentMask & (u64)UA_UADPNETWORKMESSAGECONTENTMASK_PROMOTEDFIELDS) != 0;
  1908. nm.version = 1;
  1909. nm.networkMessageType = UA_NETWORKMESSAGE_DATASET;
  1910. if(connection->config->publisherIdType == UA_PUBSUB_PUBLISHERID_NUMERIC) {
  1911. nm.publisherIdType = UA_PUBLISHERDATATYPE_UINT16;
  1912. nm.publisherId.publisherIdUInt32 = connection->config->publisherId.numeric;
  1913. } else if(connection->config->publisherIdType == UA_PUBSUB_PUBLISHERID_STRING){
  1914. nm.publisherIdType = UA_PUBLISHERDATATYPE_STRING;
  1915. nm.publisherId.publisherIdString = connection->config->publisherId.string;
  1916. }
  1917. /* Compute the length of the dsm separately for the header */
  1918. UA_STACKARRAY(UA_UInt16, dsmLengths, dsmCount);
  1919. for(UA_Byte i = 0; i < dsmCount; i++)
  1920. dsmLengths[i] = (UA_UInt16)UA_DataSetMessage_calcSizeBinary(&dsm[i]);
  1921. nm.payloadHeader.dataSetPayloadHeader.count = dsmCount;
  1922. nm.payloadHeader.dataSetPayloadHeader.dataSetWriterIds = writerIds;
  1923. nm.groupHeader.writerGroupId = wg->config.writerGroupId;
  1924. nm.groupHeader.networkMessageNumber = 1;
  1925. nm.payload.dataSetPayload.sizes = dsmLengths;
  1926. nm.payload.dataSetPayload.dataSetMessages = dsm;
  1927. /* Allocate the buffer. Allocate on the stack if the buffer is small. */
  1928. UA_ByteString buf;
  1929. size_t msgSize = UA_NetworkMessage_calcSizeBinary(&nm);
  1930. size_t stackSize = 1;
  1931. if(msgSize <= UA_MAX_STACKBUF)
  1932. stackSize = msgSize;
  1933. UA_STACKARRAY(UA_Byte, stackBuf, stackSize);
  1934. buf.data = stackBuf;
  1935. buf.length = msgSize;
  1936. UA_StatusCode retval;
  1937. if(msgSize > UA_MAX_STACKBUF) {
  1938. retval = UA_ByteString_allocBuffer(&buf, msgSize);
  1939. if(retval != UA_STATUSCODE_GOOD)
  1940. return retval;
  1941. }
  1942. /* Encode the message */
  1943. UA_Byte *bufPos = buf.data;
  1944. memset(bufPos, 0, msgSize);
  1945. const UA_Byte *bufEnd = &buf.data[buf.length];
  1946. retval = UA_NetworkMessage_encodeBinary(&nm, &bufPos, bufEnd);
  1947. if(retval != UA_STATUSCODE_GOOD) {
  1948. if(msgSize > UA_MAX_STACKBUF)
  1949. UA_ByteString_deleteMembers(&buf);
  1950. return retval;
  1951. }
  1952. /* Send the prepared messages */
  1953. retval = connection->channel->send(connection->channel, transportSettings, &buf);
  1954. if(msgSize > UA_MAX_STACKBUF)
  1955. UA_ByteString_deleteMembers(&buf);
  1956. return retval;
  1957. }
  1958. /* This callback triggers the collection and publish of NetworkMessages and the
  1959. * contained DataSetMessages. */
  1960. void
  1961. UA_WriterGroup_publishCallback(UA_Server *server, UA_WriterGroup *writerGroup) {
  1962. UA_LOG_DEBUG(&server->config.logger, UA_LOGCATEGORY_SERVER, "Publish Callback");
  1963. if(!writerGroup) {
  1964. UA_LOG_WARNING(&server->config.logger, UA_LOGCATEGORY_SERVER,
  1965. "Publish failed. WriterGroup not found");
  1966. return;
  1967. }
  1968. /* Nothing to do? */
  1969. if(writerGroup->writersCount <= 0)
  1970. return;
  1971. /* Binary or Json encoding? */
  1972. if(writerGroup->config.encodingMimeType != UA_PUBSUB_ENCODING_UADP &&
  1973. writerGroup->config.encodingMimeType != UA_PUBSUB_ENCODING_JSON) {
  1974. UA_LOG_WARNING(&server->config.logger, UA_LOGCATEGORY_SERVER,
  1975. "Publish failed: Unknown encoding type.");
  1976. return;
  1977. }
  1978. /* Find the connection associated with the writer */
  1979. UA_PubSubConnection *connection =
  1980. UA_PubSubConnection_findConnectionbyId(server, writerGroup->linkedConnection);
  1981. if(!connection) {
  1982. UA_LOG_WARNING(&server->config.logger, UA_LOGCATEGORY_SERVER,
  1983. "Publish failed. PubSubConnection invalid.");
  1984. return;
  1985. }
  1986. /* How many DSM can be sent in one NM? */
  1987. UA_Byte maxDSM = (UA_Byte)writerGroup->config.maxEncapsulatedDataSetMessageCount;
  1988. if(writerGroup->config.maxEncapsulatedDataSetMessageCount > UA_BYTE_MAX)
  1989. maxDSM = UA_BYTE_MAX;
  1990. /* If the maxEncapsulatedDataSetMessageCount is set to 0->1 */
  1991. if(maxDSM == 0)
  1992. maxDSM = 1;
  1993. /* It is possible to put several DataSetMessages into one NetworkMessage.
  1994. * But only if they do not contain promoted fields. NM with only DSM are
  1995. * sent out right away. The others are kept in a buffer for "batching". */
  1996. size_t dsmCount = 0;
  1997. UA_DataSetWriter *dsw;
  1998. UA_STACKARRAY(UA_UInt16, dsWriterIds, writerGroup->writersCount);
  1999. UA_STACKARRAY(UA_DataSetMessage, dsmStore, writerGroup->writersCount);
  2000. LIST_FOREACH(dsw, &writerGroup->writers, listEntry) {
  2001. /* Find the dataset */
  2002. UA_PublishedDataSet *pds =
  2003. UA_PublishedDataSet_findPDSbyId(server, dsw->connectedDataSet);
  2004. if(!pds) {
  2005. UA_LOG_WARNING(&server->config.logger, UA_LOGCATEGORY_SERVER,
  2006. "PubSub Publish: PublishedDataSet not found");
  2007. continue;
  2008. }
  2009. /* Generate the DSM */
  2010. UA_StatusCode res =
  2011. UA_DataSetWriter_generateDataSetMessage(server, &dsmStore[dsmCount], dsw);
  2012. if(res != UA_STATUSCODE_GOOD) {
  2013. UA_LOG_WARNING(&server->config.logger, UA_LOGCATEGORY_SERVER,
  2014. "PubSub Publish: DataSetMessage creation failed");
  2015. continue;
  2016. }
  2017. /* Send right away if there is only this DSM in a NM. If promoted fields
  2018. * are contained in the PublishedDataSet, then this DSM must go into a
  2019. * dedicated NM as well. */
  2020. if(pds->promotedFieldsCount > 0 || maxDSM == 1) {
  2021. if(writerGroup->config.encodingMimeType == UA_PUBSUB_ENCODING_UADP){
  2022. res = sendNetworkMessage(connection, writerGroup, &dsmStore[dsmCount],
  2023. &dsw->config.dataSetWriterId, 1,
  2024. &writerGroup->config.messageSettings,
  2025. &writerGroup->config.transportSettings);
  2026. }else if(writerGroup->config.encodingMimeType == UA_PUBSUB_ENCODING_JSON){
  2027. res = sendNetworkMessageJson(connection, &dsmStore[dsmCount],
  2028. &dsw->config.dataSetWriterId, 1, &writerGroup->config.transportSettings);
  2029. }
  2030. if(res != UA_STATUSCODE_GOOD)
  2031. UA_LOG_WARNING(&server->config.logger, UA_LOGCATEGORY_SERVER,
  2032. "PubSub Publish: Could not send a NetworkMessage");
  2033. UA_DataSetMessage_free(&dsmStore[dsmCount]);
  2034. continue;
  2035. }
  2036. dsWriterIds[dsmCount] = dsw->config.dataSetWriterId;
  2037. dsmCount++;
  2038. }
  2039. /* Send the NetworkMessages with batched DataSetMessages */
  2040. size_t nmCount = (dsmCount / maxDSM) + ((dsmCount % maxDSM) == 0 ? 0 : 1);
  2041. for(UA_UInt32 i = 0; i < nmCount; i++) {
  2042. UA_Byte nmDsmCount = maxDSM;
  2043. if(i == nmCount - 1 && (dsmCount % maxDSM))
  2044. nmDsmCount = (UA_Byte)dsmCount % maxDSM;
  2045. UA_StatusCode res3 = UA_STATUSCODE_GOOD;
  2046. if(writerGroup->config.encodingMimeType == UA_PUBSUB_ENCODING_UADP){
  2047. res3 = sendNetworkMessage(connection, writerGroup, &dsmStore[i * maxDSM],
  2048. &dsWriterIds[i * maxDSM], nmDsmCount,
  2049. &writerGroup->config.messageSettings,
  2050. &writerGroup->config.transportSettings);
  2051. }else if(writerGroup->config.encodingMimeType == UA_PUBSUB_ENCODING_JSON){
  2052. res3 = sendNetworkMessageJson(connection, &dsmStore[i * maxDSM],
  2053. &dsWriterIds[i * maxDSM], nmDsmCount, &writerGroup->config.transportSettings);
  2054. }
  2055. if(res3 != UA_STATUSCODE_GOOD)
  2056. UA_LOG_WARNING(&server->config.logger, UA_LOGCATEGORY_SERVER,
  2057. "PubSub Publish: Sending a NetworkMessage failed");
  2058. }
  2059. /* Clean up DSM */
  2060. for(size_t i = 0; i < dsmCount; i++)
  2061. UA_DataSetMessage_free(&dsmStore[i]);
  2062. }
  2063. /* Add new publishCallback. The first execution is triggered directly after
  2064. * creation. */
  2065. UA_StatusCode
  2066. UA_WriterGroup_addPublishCallback(UA_Server *server, UA_WriterGroup *writerGroup) {
  2067. UA_StatusCode retval =
  2068. UA_PubSubManager_addRepeatedCallback(server,
  2069. (UA_ServerCallback) UA_WriterGroup_publishCallback,
  2070. writerGroup, writerGroup->config.publishingInterval,
  2071. &writerGroup->publishCallbackId);
  2072. if(retval == UA_STATUSCODE_GOOD)
  2073. writerGroup->publishCallbackIsRegistered = true;
  2074. /* Run once after creation */
  2075. UA_WriterGroup_publishCallback(server, writerGroup);
  2076. return retval;
  2077. }
  2078. /* This callback triggers the collection and reception of NetworkMessages and the
  2079. * contained DataSetMessages. */
  2080. void UA_ReaderGroup_subscribeCallback(UA_Server *server, UA_ReaderGroup *readerGroup) {
  2081. UA_PubSubConnection *connection = UA_PubSubConnection_findConnectionbyId(server, readerGroup->linkedConnection);
  2082. UA_ByteString buffer;
  2083. if(UA_ByteString_allocBuffer(&buffer, 512) != UA_STATUSCODE_GOOD) {
  2084. UA_LOG_INFO(&server->config.logger, UA_LOGCATEGORY_SERVER, "Message buffer alloc failed!");
  2085. return;
  2086. }
  2087. connection->channel->receive(connection->channel, &buffer, NULL, 300000);
  2088. if(buffer.length > 0) {
  2089. UA_LOG_INFO(&server->config.logger, UA_LOGCATEGORY_USERLAND, "Message received:");
  2090. UA_NetworkMessage currentNetworkMessage;
  2091. memset(&currentNetworkMessage, 0, sizeof(UA_NetworkMessage));
  2092. size_t currentPosition = 0;
  2093. UA_NetworkMessage_decodeBinary(&buffer, &currentPosition, &currentNetworkMessage);
  2094. UA_Server_processNetworkMessage(server, &currentNetworkMessage, connection);
  2095. UA_NetworkMessage_deleteMembers(&currentNetworkMessage);
  2096. }
  2097. UA_ByteString_deleteMembers(&buffer);
  2098. }
  2099. /* Add new subscribeCallback. The first execution is triggered directly after
  2100. * creation. */
  2101. UA_StatusCode
  2102. UA_ReaderGroup_addSubscribeCallback(UA_Server *server, UA_ReaderGroup *readerGroup) {
  2103. UA_StatusCode retval = UA_STATUSCODE_GOOD;
  2104. UA_PubSubConnection *connection = UA_PubSubConnection_findConnectionbyId(server, readerGroup->linkedConnection);
  2105. if(connection != NULL) {
  2106. retval = connection->channel->regist(connection->channel, NULL, NULL);
  2107. if(retval == UA_STATUSCODE_GOOD) {
  2108. retval = UA_PubSubManager_addRepeatedCallback(server,
  2109. (UA_ServerCallback) UA_ReaderGroup_subscribeCallback,
  2110. readerGroup, 5,
  2111. &readerGroup->subscribeCallbackId);
  2112. }
  2113. else {
  2114. UA_LOG_WARNING(&server->config.logger, UA_LOGCATEGORY_SERVER, "register channel failed: 0x%x!", retval);
  2115. }
  2116. }
  2117. if(retval == UA_STATUSCODE_GOOD) {
  2118. readerGroup->subscribeCallbackIsRegistered = true;
  2119. }
  2120. /* Run once after creation */
  2121. UA_ReaderGroup_subscribeCallback(server, readerGroup);
  2122. return retval;
  2123. }
  2124. #endif /* UA_ENABLE_PUBSUB */