ua_pubsub.c 104 KB


  1. /* This Source Code Form is subject to the terms of the Mozilla Public
  2. * License, v. 2.0. If a copy of the MPL was not distributed with this
  3. * file, You can obtain one at http://mozilla.org/MPL/2.0/.
  4. *
  5. * Copyright (c) 2017-2018 Fraunhofer IOSB (Author: Andreas Ebner)
  6. * Copyright (c) 2019 Fraunhofer IOSB (Author: Julius Pfrommer)
  7. * Copyright (c) 2019 Kalycito Infotech Private Limited
  8. */
  9. #include <open62541/server_config_default.h>
  10. #include "server/ua_server_internal.h"
  11. #ifdef UA_ENABLE_PUBSUB /* conditional compilation */
  12. #include "ua_pubsub.h"
  13. #ifdef UA_ENABLE_PUBSUB_INFORMATIONMODEL
  14. #include "ua_pubsub_ns0.h"
  15. #endif
  16. #ifdef UA_ENABLE_PUBSUB_DELTAFRAMES
  17. #include "ua_types_encoding_binary.h"
  18. #endif
  19. #define UA_MAX_STACKBUF 512 /* Max size of network messages on the stack */
  20. #define UA_MAX_SIZENAME 64 /* Max size of Qualified Name of Subscribed Variable */
  21. /* Forward declaration */
  22. static size_t
  23. iterateVariableAttrDimension(UA_VariableAttributes varAttr);
  24. static void
  25. UA_WriterGroup_deleteMembers(UA_Server *server, UA_WriterGroup *writerGroup);
  26. static void
  27. UA_DataSetField_deleteMembers(UA_DataSetField *field);
  28. /**********************************************/
  29. /* Connection */
  30. /**********************************************/
  31. UA_StatusCode
  32. UA_PubSubConnectionConfig_copy(const UA_PubSubConnectionConfig *src,
  33. UA_PubSubConnectionConfig *dst) {
  34. UA_StatusCode retVal = UA_STATUSCODE_GOOD;
  35. memcpy(dst, src, sizeof(UA_PubSubConnectionConfig));
  36. retVal |= UA_String_copy(&src->name, &dst->name);
  37. retVal |= UA_Variant_copy(&src->address, &dst->address);
  38. retVal |= UA_String_copy(&src->transportProfileUri, &dst->transportProfileUri);
  39. retVal |= UA_Variant_copy(&src->connectionTransportSettings, &dst->connectionTransportSettings);
  40. if(src->connectionPropertiesSize > 0){
  41. dst->connectionProperties = (UA_KeyValuePair *)
  42. UA_calloc(src->connectionPropertiesSize, sizeof(UA_KeyValuePair));
  43. if(!dst->connectionProperties){
  44. return UA_STATUSCODE_BADOUTOFMEMORY;
  45. }
  46. for(size_t i = 0; i < src->connectionPropertiesSize; i++){
  47. retVal |= UA_QualifiedName_copy(&src->connectionProperties[i].key,
  48. &dst->connectionProperties[i].key);
  49. retVal |= UA_Variant_copy(&src->connectionProperties[i].value,
  50. &dst->connectionProperties[i].value);
  51. }
  52. }
  53. return retVal;
  54. }
  55. UA_StatusCode
  56. UA_Server_getPubSubConnectionConfig(UA_Server *server, const UA_NodeId connection,
  57. UA_PubSubConnectionConfig *config) {
  58. if(!config)
  59. return UA_STATUSCODE_BADINVALIDARGUMENT;
  60. UA_PubSubConnection *currentPubSubConnection =
  61. UA_PubSubConnection_findConnectionbyId(server, connection);
  62. if(!currentPubSubConnection)
  63. return UA_STATUSCODE_BADNOTFOUND;
  64. UA_PubSubConnectionConfig tmpPubSubConnectionConfig;
  65. //deep copy of the actual config
  66. UA_PubSubConnectionConfig_copy(currentPubSubConnection->config, &tmpPubSubConnectionConfig);
  67. *config = tmpPubSubConnectionConfig;
  68. return UA_STATUSCODE_GOOD;
  69. }
  70. UA_PubSubConnection *
  71. UA_PubSubConnection_findConnectionbyId(UA_Server *server, UA_NodeId connectionIdentifier) {
  72. for(size_t i = 0; i < server->pubSubManager.connectionsSize; i++){
  73. if(UA_NodeId_equal(&connectionIdentifier, &server->pubSubManager.connections[i].identifier)){
  74. return &server->pubSubManager.connections[i];
  75. }
  76. }
  77. return NULL;
  78. }
  79. void
  80. UA_PubSubConnectionConfig_deleteMembers(UA_PubSubConnectionConfig *connectionConfig) {
  81. UA_String_deleteMembers(&connectionConfig->name);
  82. UA_String_deleteMembers(&connectionConfig->transportProfileUri);
  83. UA_Variant_deleteMembers(&connectionConfig->connectionTransportSettings);
  84. UA_Variant_deleteMembers(&connectionConfig->address);
  85. for(size_t i = 0; i < connectionConfig->connectionPropertiesSize; i++){
  86. UA_QualifiedName_deleteMembers(&connectionConfig->connectionProperties[i].key);
  87. UA_Variant_deleteMembers(&connectionConfig->connectionProperties[i].value);
  88. }
  89. UA_free(connectionConfig->connectionProperties);
  90. }
  91. void
  92. UA_PubSubConnection_deleteMembers(UA_Server *server, UA_PubSubConnection *connection) {
  93. //delete connection config
  94. UA_PubSubConnectionConfig_deleteMembers(connection->config);
  95. //remove contained WriterGroups
  96. UA_WriterGroup *writerGroup, *tmpWriterGroup;
  97. LIST_FOREACH_SAFE(writerGroup, &connection->writerGroups, listEntry, tmpWriterGroup){
  98. UA_Server_removeWriterGroup(server, writerGroup->identifier);
  99. }
  100. /* remove contained ReaderGroups */
  101. UA_ReaderGroup *readerGroups, *tmpReaderGroup;
  102. LIST_FOREACH_SAFE(readerGroups, &connection->readerGroups, listEntry, tmpReaderGroup){
  103. UA_Server_removeReaderGroup(server, readerGroups->identifier);
  104. }
  105. UA_NodeId_deleteMembers(&connection->identifier);
  106. if(connection->channel){
  107. connection->channel->close(connection->channel);
  108. }
  109. UA_free(connection->config);
  110. }
  111. UA_StatusCode
  112. UA_Server_addWriterGroup(UA_Server *server, const UA_NodeId connection,
  113. const UA_WriterGroupConfig *writerGroupConfig,
  114. UA_NodeId *writerGroupIdentifier) {
  115. UA_StatusCode retVal = UA_STATUSCODE_GOOD;
  116. if(!writerGroupConfig)
  117. return UA_STATUSCODE_BADINVALIDARGUMENT;
  118. //search the connection by the given connectionIdentifier
  119. UA_PubSubConnection *currentConnectionContext =
  120. UA_PubSubConnection_findConnectionbyId(server, connection);
  121. if(!currentConnectionContext)
  122. return UA_STATUSCODE_BADNOTFOUND;
  123. //allocate memory for new WriterGroup
  124. UA_WriterGroup *newWriterGroup = (UA_WriterGroup *) UA_calloc(1, sizeof(UA_WriterGroup));
  125. if(!newWriterGroup)
  126. return UA_STATUSCODE_BADOUTOFMEMORY;
  127. newWriterGroup->linkedConnection = currentConnectionContext->identifier;
  128. UA_PubSubManager_generateUniqueNodeId(server, &newWriterGroup->identifier);
  129. if(writerGroupIdentifier){
  130. UA_NodeId_copy(&newWriterGroup->identifier, writerGroupIdentifier);
  131. }
  132. //deep copy of the config
  133. UA_WriterGroupConfig tmpWriterGroupConfig;
  134. retVal |= UA_WriterGroupConfig_copy(writerGroupConfig, &tmpWriterGroupConfig);
  135. if(!tmpWriterGroupConfig.messageSettings.content.decoded.type) {
  136. UA_UadpWriterGroupMessageDataType *wgm = UA_UadpWriterGroupMessageDataType_new();
  137. tmpWriterGroupConfig.messageSettings.content.decoded.data = wgm;
  138. tmpWriterGroupConfig.messageSettings.content.decoded.type =
  139. &UA_TYPES[UA_TYPES_UADPWRITERGROUPMESSAGEDATATYPE];
  140. tmpWriterGroupConfig.messageSettings.encoding = UA_EXTENSIONOBJECT_DECODED;
  141. }
  142. newWriterGroup->config = tmpWriterGroupConfig;
  143. retVal |= UA_WriterGroup_addPublishCallback(server, newWriterGroup);
  144. LIST_INSERT_HEAD(&currentConnectionContext->writerGroups, newWriterGroup, listEntry);
  145. #ifdef UA_ENABLE_PUBSUB_INFORMATIONMODEL
  146. addWriterGroupRepresentation(server, newWriterGroup);
  147. #endif
  148. return retVal;
  149. }
  150. UA_StatusCode
  151. UA_Server_removeWriterGroup(UA_Server *server, const UA_NodeId writerGroup){
  152. UA_WriterGroup *wg = UA_WriterGroup_findWGbyId(server, writerGroup);
  153. if(!wg)
  154. return UA_STATUSCODE_BADNOTFOUND;
  155. UA_PubSubConnection *connection =
  156. UA_PubSubConnection_findConnectionbyId(server, wg->linkedConnection);
  157. if(!connection)
  158. return UA_STATUSCODE_BADNOTFOUND;
  159. //unregister the publish callback
  160. UA_PubSubManager_removeRepeatedPubSubCallback(server, wg->publishCallbackId);
  161. #ifdef UA_ENABLE_PUBSUB_INFORMATIONMODEL
  162. removeGroupRepresentation(server, wg);
  163. #endif
  164. UA_WriterGroup_deleteMembers(server, wg);
  165. LIST_REMOVE(wg, listEntry);
  166. UA_free(wg);
  167. return UA_STATUSCODE_GOOD;
  168. }
  169. /**********************************************/
  170. /* ReaderGroup */
  171. /**********************************************/
  172. /**
  173. * Add ReaderGroup to connection.
  174. *
  175. * @param server
  176. * @param connectionIdentifier
  177. * @param readerGroupConfiguration
  178. * @param readerGroupIdentifier
  179. * @return UA_STATUSCODE_GOOD on success
  180. */
  181. UA_StatusCode
  182. UA_Server_addReaderGroup(UA_Server *server, UA_NodeId connectionIdentifier,
  183. const UA_ReaderGroupConfig *readerGroupConfig,
  184. UA_NodeId *readerGroupIdentifier) {
  185. UA_StatusCode retval = UA_STATUSCODE_GOOD;
  186. UA_ReaderGroupConfig tmpReaderGroupConfig;
  187. /* Search the connection by the given connectionIdentifier */
  188. if(!readerGroupConfig) {
  189. return UA_STATUSCODE_BADINVALIDARGUMENT;
  190. }
  191. /* Search the connection by the given connectionIdentifier */
  192. UA_PubSubConnection *currentConnectionContext = UA_PubSubConnection_findConnectionbyId(server, connectionIdentifier);
  193. if(!currentConnectionContext) {
  194. return UA_STATUSCODE_BADNOTFOUND;
  195. }
  196. /* Allocate memory for new reader group */
  197. UA_ReaderGroup *newGroup = (UA_ReaderGroup *)UA_calloc(1, sizeof(UA_ReaderGroup));
  198. if(!newGroup) {
  199. return UA_STATUSCODE_BADOUTOFMEMORY;
  200. }
  201. /* Generate nodeid for the readergroup identifier */
  202. newGroup->linkedConnection = currentConnectionContext->identifier;
  203. UA_PubSubManager_generateUniqueNodeId(server, &newGroup->identifier);
  204. if(readerGroupIdentifier) {
  205. UA_NodeId_copy(&newGroup->identifier, readerGroupIdentifier);
  206. }
  207. /* Deep copy of the config */
  208. retval |= UA_ReaderGroupConfig_copy(readerGroupConfig, &tmpReaderGroupConfig);
  209. newGroup->config = tmpReaderGroupConfig;
  210. retval |= UA_ReaderGroup_addSubscribeCallback(server, newGroup);
  211. LIST_INSERT_HEAD(&currentConnectionContext->readerGroups, newGroup, listEntry);
  212. currentConnectionContext->readerGroupsSize++;
  213. #ifdef UA_ENABLE_PUBSUB_INFORMATIONMODEL
  214. addReaderGroupRepresentation(server, newGroup);
  215. #endif
  216. return retval;
  217. }
  218. /**
  219. * Remove ReaderGroup from connection and delete contained readers.
  220. *
  221. * @param server
  222. * @param groupIdentifier
  223. * @return UA_STATUSCODE_GOOD on success
  224. */
  225. UA_StatusCode
  226. UA_Server_removeReaderGroup(UA_Server *server, UA_NodeId groupIdentifier) {
  227. UA_ReaderGroup* readerGroup = UA_ReaderGroup_findRGbyId(server, groupIdentifier);
  228. if(readerGroup == NULL) {
  229. return UA_STATUSCODE_BADNOTFOUND;
  230. }
  231. /* Search the connection to which the given readergroup is connected to */
  232. UA_PubSubConnection *connection = UA_PubSubConnection_findConnectionbyId(server, readerGroup->linkedConnection);
  233. if(connection == NULL) {
  234. return UA_STATUSCODE_BADNOTFOUND;
  235. }
  236. #ifdef UA_ENABLE_PUBSUB_INFORMATIONMODEL
  237. /* To Do:RemoveGroupRepresentation(server, &readerGroup->identifier) */
  238. #endif
  239. /* UA_Server_ReaderGroup_delete also removes itself from the list */
  240. UA_Server_ReaderGroup_delete(server, readerGroup);
  241. /* Remove readerGroup from Connection */
  242. LIST_REMOVE(readerGroup, listEntry);
  243. UA_free(readerGroup);
  244. return UA_STATUSCODE_GOOD;
  245. }
  246. /**
  247. * To Do:
  248. * Update ReaderGroup configuration.
  249. *
  250. * @param server
  251. * @param readerGroupIdentifier
  252. * @param readerGroupConfiguration
  253. * @return UA_STATUSCODE_GOOD on success
  254. */
  255. UA_StatusCode
  256. UA_Server_ReaderGroup_updateConfig(UA_Server *server, UA_NodeId readerGroupIdentifier,
  257. const UA_ReaderGroupConfig *config) {
  258. return UA_STATUSCODE_BADNOTIMPLEMENTED;
  259. }
  260. /**
  261. * Get ReaderGroup configuration.
  262. *
  263. * @param server
  264. * @param groupIdentifier
  265. * @param readerGroupConfiguration
  266. * @return UA_STATUSCODE_GOOD on success
  267. */
  268. UA_StatusCode
  269. UA_Server_ReaderGroup_getConfig(UA_Server *server, UA_NodeId readerGroupIdentifier,
  270. UA_ReaderGroupConfig *config) {
  271. if(!config) {
  272. return UA_STATUSCODE_BADINVALIDARGUMENT;
  273. }
  274. /* Identify the readergroup through the readerGroupIdentifier */
  275. UA_ReaderGroup *currentReaderGroup = UA_ReaderGroup_findRGbyId(server, readerGroupIdentifier);
  276. if(!currentReaderGroup) {
  277. return UA_STATUSCODE_BADNOTFOUND;
  278. }
  279. UA_ReaderGroupConfig tmpReaderGroupConfig;
  280. /* deep copy of the actual config */
  281. UA_ReaderGroupConfig_copy(&currentReaderGroup->config, &tmpReaderGroupConfig);
  282. *config = tmpReaderGroupConfig;
  283. return UA_STATUSCODE_GOOD;
  284. }
  285. /* To Do UA_ReaderGroupConfig delete */
  286. /**
  287. * Delete ReaderGroup.
  288. *
  289. * @param server
  290. * @param groupIdentifier
  291. */
  292. void UA_Server_ReaderGroup_delete(UA_Server* server, UA_ReaderGroup *readerGroup) {
  293. /* To Do Call UA_ReaderGroupConfig_delete */
  294. UA_DataSetReader *dataSetReader, *tmpDataSetReader;
  295. LIST_FOREACH_SAFE(dataSetReader, &readerGroup->readers, listEntry, tmpDataSetReader) {
  296. UA_DataSetReader_delete(server, dataSetReader);
  297. }
  298. UA_PubSubConnection* pConn = UA_PubSubConnection_findConnectionbyId(server, readerGroup->linkedConnection);
  299. if(pConn != NULL) {
  300. pConn->readerGroupsSize--;
  301. }
  302. /* Delete ReaderGroup and its members */
  303. UA_String_deleteMembers(&readerGroup->config.name);
  304. UA_NodeId_deleteMembers(&readerGroup->linkedConnection);
  305. UA_NodeId_deleteMembers(&readerGroup->identifier);
  306. }
  307. /**
  308. * Copy ReaderGroup configuration.
  309. *
  310. * @param source
  311. * @param destination
  312. * @return UA_STATUSCODE_GOOD on success
  313. */
  314. UA_StatusCode
  315. UA_ReaderGroupConfig_copy(const UA_ReaderGroupConfig *src,
  316. UA_ReaderGroupConfig *dst) {
  317. UA_String_copy(&src->name, &dst->name);
  318. /* Currently simple memcpy only */
  319. memcpy(&dst->securityParameters, &src->securityParameters, sizeof(UA_PubSubSecurityParameters));
  320. return UA_STATUSCODE_GOOD;
  321. }
  322. static UA_DataSetReader *
  323. getReaderFromIdentifier(UA_Server *server, UA_NetworkMessage *pMsg, UA_PubSubConnection *pConnection) {
  324. if(pConnection->readerGroupsSize == 1) {
  325. if(LIST_FIRST(&pConnection->readerGroups)->readersCount == 1) {
  326. UA_LOG_INFO(&server->config.logger, UA_LOGCATEGORY_SERVER, "only 1 DataSetReader available. This one will be used.");
  327. return LIST_FIRST(&LIST_FIRST(&pConnection->readerGroups)->readers);
  328. }
  329. }
  330. if(!pMsg->publisherIdEnabled)
  331. return NULL;
  332. UA_ReaderGroup* readerGroup;
  333. LIST_FOREACH(readerGroup, &pConnection->readerGroups, listEntry) {
  334. UA_DataSetReader *tmpReader;
  335. LIST_FOREACH(tmpReader, &readerGroup->readers, listEntry) {
  336. switch (pMsg->publisherIdType) {
  337. case UA_PUBLISHERDATATYPE_BYTE:
  338. if(tmpReader->config.publisherId.type == &UA_TYPES[UA_TYPES_BYTE] &&
  339. pMsg->publisherIdType == UA_PUBLISHERDATATYPE_BYTE &&
  340. pMsg->publisherId.publisherIdByte == *(UA_Byte*)tmpReader->config.publisherId.data) {
  341. return tmpReader;
  342. }
  343. break;
  344. case UA_PUBLISHERDATATYPE_UINT16:
  345. if(tmpReader->config.publisherId.type == &UA_TYPES[UA_TYPES_UINT16] &&
  346. pMsg->publisherIdType == UA_PUBLISHERDATATYPE_UINT16 &&
  347. pMsg->publisherId.publisherIdUInt16 == *(UA_UInt16*)tmpReader->config.publisherId.data) {
  348. return tmpReader;
  349. }
  350. break;
  351. case UA_PUBLISHERDATATYPE_UINT32:
  352. if(tmpReader->config.publisherId.type == &UA_TYPES[UA_TYPES_UINT32] &&
  353. pMsg->publisherIdType == UA_PUBLISHERDATATYPE_UINT32 &&
  354. pMsg->publisherId.publisherIdUInt32 == *(UA_UInt32*)tmpReader->config.publisherId.data) {
  355. return tmpReader;
  356. }
  357. break;
  358. case UA_PUBLISHERDATATYPE_UINT64:
  359. if(tmpReader->config.publisherId.type == &UA_TYPES[UA_TYPES_UINT64] &&
  360. pMsg->publisherIdType == UA_PUBLISHERDATATYPE_UINT64 &&
  361. pMsg->publisherId.publisherIdUInt64 == *(UA_UInt64*)tmpReader->config.publisherId.data) {
  362. return tmpReader;
  363. }
  364. break;
  365. case UA_PUBLISHERDATATYPE_STRING:
  366. if(tmpReader->config.publisherId.type == &UA_TYPES[UA_TYPES_STRING] &&
  367. pMsg->publisherIdType == UA_PUBLISHERDATATYPE_STRING &&
  368. UA_String_equal(&pMsg->publisherId.publisherIdString, (UA_String*)tmpReader->config.publisherId.data)) {
  369. return tmpReader;
  370. }
  371. break;
  372. default:
  373. return NULL;
  374. }
  375. }
  376. }
  377. return NULL;
  378. }
  379. /**
  380. * Process NetworkMessage.
  381. *
  382. * @param server
  383. * @param networkmessage
  384. * @return UA_STATUSCODE_GOOD on success
  385. */
  386. UA_StatusCode
  387. UA_Server_processNetworkMessage(UA_Server *server, UA_NetworkMessage *pMsg,
  388. UA_PubSubConnection *pConnection) {
  389. if(!pMsg || !pConnection)
  390. return UA_STATUSCODE_BADINVALIDARGUMENT;
  391. /* To Do The condition with dataSetWriterIdAvailable and WriterGroupIdAvailable to be handled
  392. * when pMsg->groupHeaderEnabled, pMsg->dataSetClassIdEnabled, pMsg->payloadHeaderEnabled
  393. * Here some filtering is possible */
  394. UA_DataSetReader* dataSetReaderErg = getReaderFromIdentifier(server, pMsg, pConnection);
  395. /* No Reader with the specified id found */
  396. if(!dataSetReaderErg) {
  397. UA_LOG_INFO(&server->config.logger, UA_LOGCATEGORY_SERVER, "No DataSetReader found with PublisherId");
  398. return UA_STATUSCODE_BADNOTFOUND; /* TODO: Check the return code */
  399. }
  400. UA_LOG_DEBUG(&server->config.logger, UA_LOGCATEGORY_SERVER, "DataSetReader found with PublisherId");
  401. UA_Byte anzDataSets = 1;
  402. if(pMsg->payloadHeaderEnabled)
  403. anzDataSets = pMsg->payloadHeader.dataSetPayloadHeader.count;
  404. for(UA_Byte iterator = 0; iterator < anzDataSets; iterator++) {
  405. UA_LOG_DEBUG(&server->config.logger, UA_LOGCATEGORY_SERVER, "Process Msg with DataSetReader!");
  406. UA_Server_DataSetReader_process(server, dataSetReaderErg, &pMsg->payload.dataSetPayload.dataSetMessages[iterator]);
  407. }
  408. /* To Do the condition with dataSetWriterId and WriterGroupId
  409. * else condition for dataSetWriterIdAvailable and writerGroupIdAvailable) */
  410. return UA_STATUSCODE_GOOD;
  411. }
  412. /**
  413. * Find size iterator - array dimension for VariableAttributes
  414. *
  415. * @param VariableAttributes
  416. * @return global variable size iterator
  417. */
  418. static size_t
  419. iterateVariableAttrDimension (UA_VariableAttributes varAttr) {
  420. size_t sizeIterator = 0;
  421. for(size_t indexIterator = 0; indexIterator < varAttr.arrayDimensionsSize; indexIterator++) {
  422. sizeIterator += varAttr.arrayDimensions[indexIterator];
  423. }
  424. return sizeIterator;
  425. }
  426. /**
  427. * Find ReaderGroup with its identifier.
  428. *
  429. * @param server
  430. * @param groupIdentifier
  431. * @return the ReaderGroup or NULL if not found
  432. */
  433. UA_ReaderGroup * UA_ReaderGroup_findRGbyId(UA_Server *server, UA_NodeId identifier) {
  434. for (size_t iteratorConn = 0; iteratorConn < server->pubSubManager.connectionsSize; iteratorConn++) {
  435. UA_ReaderGroup* readerGroup = NULL;
  436. LIST_FOREACH(readerGroup, &server->pubSubManager.connections[iteratorConn].readerGroups, listEntry) {
  437. if(UA_NodeId_equal(&identifier, &readerGroup->identifier)) {
  438. return readerGroup;
  439. }
  440. }
  441. }
  442. return NULL;
  443. }
  444. /**
  445. * Find a DataSetReader with its identifier
  446. *
  447. * @param server
  448. * @param identifier
  449. * @return the DataSetReader or NULL if not found
  450. */
  451. UA_DataSetReader *UA_ReaderGroup_findDSRbyId(UA_Server *server, UA_NodeId identifier) {
  452. for (size_t iteratorConn = 0; iteratorConn < server->pubSubManager.connectionsSize; iteratorConn++) {
  453. UA_ReaderGroup* readerGroup = NULL;
  454. LIST_FOREACH(readerGroup, &server->pubSubManager.connections[iteratorConn].readerGroups, listEntry) {
  455. UA_DataSetReader *tmpReader;
  456. LIST_FOREACH(tmpReader, &readerGroup->readers, listEntry) {
  457. if(UA_NodeId_equal(&tmpReader->identifier, &identifier)) {
  458. return tmpReader;
  459. }
  460. }
  461. }
  462. }
  463. return NULL;
  464. }
  465. /**********************************************/
  466. /* DataSetReader */
  467. /**********************************************/
  468. /**
  469. * Add a DataSetReader to ReaderGroup
  470. *
  471. * @param server
  472. * @param readerGroupIdentifier
  473. * @param dataSetReaderConfig
  474. * @param readerIdentifier
  475. * @return UA_STATUSCODE_GOOD on success
  476. */
  477. UA_StatusCode
  478. UA_Server_addDataSetReader(UA_Server *server, UA_NodeId readerGroupIdentifier,
  479. const UA_DataSetReaderConfig *dataSetReaderConfig,
  480. UA_NodeId *readerIdentifier) {
  481. /* Search the reader group by the given readerGroupIdentifier */
  482. UA_ReaderGroup *readerGroup = UA_ReaderGroup_findRGbyId(server, readerGroupIdentifier);
  483. if(!dataSetReaderConfig) {
  484. return UA_STATUSCODE_BADNOTFOUND;
  485. }
  486. if(readerGroup == NULL) {
  487. return UA_STATUSCODE_BADNOTFOUND;
  488. }
  489. /* Allocate memory for new DataSetReader */
  490. UA_DataSetReader *newDataSetReader = (UA_DataSetReader *)UA_calloc(1, sizeof(UA_DataSetReader));
  491. /* Copy the config into the new dataSetReader */
  492. UA_DataSetReaderConfig_copy(dataSetReaderConfig, &newDataSetReader->config);
  493. newDataSetReader->linkedReaderGroup = readerGroup->identifier;
  494. UA_PubSubManager_generateUniqueNodeId(server, &newDataSetReader->identifier);
  495. if(readerIdentifier != NULL) {
  496. UA_NodeId_copy(&newDataSetReader->identifier, readerIdentifier);
  497. }
  498. /* Add the new reader to the group */
  499. LIST_INSERT_HEAD(&readerGroup->readers, newDataSetReader, listEntry);
  500. readerGroup->readersCount++;
  501. #ifdef UA_ENABLE_PUBSUB_INFORMATIONMODEL
  502. addDataSetReaderRepresentation(server, newDataSetReader);
  503. #endif
  504. return UA_STATUSCODE_GOOD;
  505. }
  506. /**
  507. * Remove a DataSetReader from ReaderGroup
  508. *
  509. * @param server
  510. * @param readerGroupIdentifier
  511. * @return UA_STATUSCODE_GOOD on success
  512. */
  513. UA_StatusCode
  514. UA_Server_removeDataSetReader(UA_Server *server, UA_NodeId readerIdentifier) {
  515. /* Remove datasetreader given by the identifier */
  516. UA_DataSetReader *dataSetReader = UA_ReaderGroup_findDSRbyId(server, readerIdentifier);
  517. if(!dataSetReader) {
  518. return UA_STATUSCODE_BADNOTFOUND;
  519. }
  520. #ifdef UA_ENABLE_PUBSUB_INFORMATIONMODEL
  521. removeDataSetReaderRepresentation(server, dataSetReader);
  522. #endif
  523. UA_DataSetReader_delete(server, dataSetReader);
  524. return UA_STATUSCODE_GOOD;
  525. }
  526. /**
  527. * Update the config of the DataSetReader.
  528. *
  529. * @param server
  530. * @param dataSetReaderIdentifier
  531. * @param readerGroupIdentifier
  532. * @param config
  533. * @return UA_STATUSCODE_GOOD on success
  534. */
  535. UA_StatusCode
  536. UA_Server_DataSetReader_updateConfig(UA_Server *server, UA_NodeId dataSetReaderIdentifier, UA_NodeId readerGroupIdentifier,
  537. const UA_DataSetReaderConfig *config) {
  538. if(config == NULL) {
  539. return UA_STATUSCODE_BADINVALIDARGUMENT;
  540. }
  541. UA_DataSetReader *currentDataSetReader = UA_ReaderGroup_findDSRbyId(server, dataSetReaderIdentifier);
  542. UA_ReaderGroup *currentReaderGroup = UA_ReaderGroup_findRGbyId(server, readerGroupIdentifier);
  543. if(!currentDataSetReader) {
  544. return UA_STATUSCODE_BADNOTFOUND;
  545. }
  546. /* The update functionality will be extended during the next PubSub batches.
  547. * Currently is only a change of the publishing interval possible. */
  548. if(currentDataSetReader->config.writerGroupId != config->writerGroupId) {
  549. UA_PubSubManager_removeRepeatedPubSubCallback(server, currentReaderGroup->subscribeCallbackId);
  550. currentDataSetReader->config.writerGroupId = config->writerGroupId;
  551. UA_ReaderGroup_subscribeCallback(server, currentReaderGroup);
  552. }
  553. else {
  554. UA_LOG_WARNING(&server->config.logger, UA_LOGCATEGORY_SERVER,
  555. "No or unsupported ReaderGroup update.");
  556. }
  557. return UA_STATUSCODE_GOOD;
  558. }
  559. /**
  560. * Get the current config of the UA_DataSetReader.
  561. *
  562. * @param server
  563. * @param dataSetReaderIdentifier
  564. * @param config
  565. * @return UA_STATUSCODE_GOOD on success
  566. */
  567. UA_StatusCode
  568. UA_Server_DataSetReader_getConfig(UA_Server *server, UA_NodeId dataSetReaderIdentifier,
  569. UA_DataSetReaderConfig *config) {
  570. if(!config) {
  571. return UA_STATUSCODE_BADINVALIDARGUMENT;
  572. }
  573. UA_DataSetReader *currentDataSetReader = UA_ReaderGroup_findDSRbyId(server, dataSetReaderIdentifier);
  574. if(!currentDataSetReader) {
  575. return UA_STATUSCODE_BADNOTFOUND;
  576. }
  577. UA_DataSetReaderConfig tmpReaderConfig;
  578. /* Deep copy of the actual config */
  579. UA_DataSetReaderConfig_copy(&currentDataSetReader->config, &tmpReaderConfig);
  580. *config = tmpReaderConfig;
  581. return UA_STATUSCODE_GOOD;
  582. }
  583. /**
  584. * This Method is used to initially set the SubscribedDataSet to TargetVariablesType and to create the list of target Variables of a SubscribedDataSetType.
  585. *
  586. * @param server
  587. * @param dataSetReaderIdentifier
  588. * @param targetVariables
  589. * @return UA_STATUSCODE_GOOD on success
  590. */
  591. UA_StatusCode
  592. UA_Server_DataSetReader_createTargetVariables(UA_Server *server, UA_NodeId dataSetReaderIdentifier, UA_TargetVariablesDataType *targetVariables) {
  593. UA_StatusCode retval = UA_STATUSCODE_BADUNEXPECTEDERROR;
  594. UA_DataSetReader* pDS = UA_ReaderGroup_findDSRbyId(server, dataSetReaderIdentifier);
  595. if(pDS == NULL) {
  596. return UA_STATUSCODE_BADINVALIDARGUMENT;
  597. }
  598. if(pDS->subscribedDataSetTarget.targetVariablesSize > 0) {
  599. UA_TargetVariablesDataType_deleteMembers(&pDS->subscribedDataSetTarget);
  600. pDS->subscribedDataSetTarget.targetVariablesSize = 0;
  601. pDS->subscribedDataSetTarget.targetVariables = NULL;
  602. }
  603. /* Set subscribed dataset to TargetVariableType */
  604. pDS->subscribedDataSetType = UA_PUBSUB_SDS_TARGET;
  605. retval = UA_TargetVariablesDataType_copy(targetVariables, &pDS->subscribedDataSetTarget);
  606. return retval;
  607. }
  608. /**
  609. * Adds Subscribed Variables from the DataSetMetaData for the given DataSet into the given parent node
  610. * and creates the corresponding data in the targetVariables of the DataSetReader
  611. *
  612. * @param server
  613. * @param parentNode
  614. * @param dataSetReaderIdentifier
  615. * @return UA_STATUSCODE_GOOD on success
  616. */
  617. UA_StatusCode UA_Server_DataSetReader_addTargetVariables(UA_Server *server, UA_NodeId *parentNode, UA_NodeId dataSetReaderIdentifier, UA_SubscribedDataSetEnumType sdsType) {
  618. if((server == NULL) || (parentNode == NULL)) {
  619. return UA_STATUSCODE_BADINVALIDARGUMENT;
  620. }
  621. UA_StatusCode retval = UA_STATUSCODE_GOOD;
  622. UA_DataSetReader* pDataSetReader = UA_ReaderGroup_findDSRbyId(server, dataSetReaderIdentifier);
  623. if(pDataSetReader == NULL) {
  624. return UA_STATUSCODE_BADINVALIDARGUMENT;
  625. }
  626. /* To Do Declare for all other type */
  627. UA_DateTime dateTimeTypeVal = 0;
  628. UA_TargetVariablesDataType targetVars;
  629. targetVars.targetVariablesSize = pDataSetReader->config.dataSetMetaData.fieldsSize;
  630. targetVars.targetVariables = (UA_FieldTargetDataType *)UA_calloc(targetVars.targetVariablesSize, sizeof(UA_FieldTargetDataType));
  631. UA_StatusCode retVal = UA_STATUSCODE_GOOD;
  632. for (size_t iteratorField = 0; iteratorField < pDataSetReader->config.dataSetMetaData.fieldsSize; iteratorField++) {
  633. UA_VariableAttributes vAttr = UA_VariableAttributes_default;
  634. vAttr.valueRank = pDataSetReader->config.dataSetMetaData.fields[iteratorField].valueRank;
  635. if(pDataSetReader->config.dataSetMetaData.fields[iteratorField].arrayDimensionsSize > 0) {
  636. retVal = UA_Array_copy(pDataSetReader->config.dataSetMetaData.fields[iteratorField].arrayDimensions, pDataSetReader->config.dataSetMetaData.fields[iteratorField].arrayDimensionsSize, (void**)&vAttr.arrayDimensions, &UA_TYPES[UA_TYPES_UINT32]);
  637. if(retVal == UA_STATUSCODE_GOOD) {
  638. vAttr.arrayDimensionsSize = pDataSetReader->config.dataSetMetaData.fields[iteratorField].arrayDimensionsSize;
  639. }
  640. }
  641. switch (pDataSetReader->config.dataSetMetaData.fields[iteratorField].builtInType) {
  642. case UA_NS0ID_BOOLEAN:
  643. if(vAttr.arrayDimensionsSize > 0) {
  644. size_t sizeIterator = iterateVariableAttrDimension(vAttr);
  645. UA_Boolean *boolArray = (UA_Boolean*)UA_Array_new(sizeIterator, &UA_TYPES[UA_TYPES_BOOLEAN]);
  646. UA_Variant_setArrayCopy(&vAttr.value, boolArray, sizeIterator, &UA_TYPES[UA_TYPES_BOOLEAN]);
  647. UA_Array_delete(boolArray, sizeIterator, &UA_TYPES[UA_TYPES_BOOLEAN]);
  648. }
  649. else {
  650. UA_Boolean boolTypeVal = false;
  651. UA_Variant_setScalar(&vAttr.value, &boolTypeVal, &UA_TYPES[UA_TYPES_BOOLEAN]);
  652. }
  653. UA_NodeId_copy(&pDataSetReader->config.dataSetMetaData.fields[iteratorField].dataType, &vAttr.dataType);
  654. break;
  655. case UA_NS0ID_SBYTE:
  656. if(vAttr.arrayDimensionsSize > 0) {
  657. size_t sizeIterator = iterateVariableAttrDimension(vAttr);
  658. UA_SByte *sbyteArray = (UA_SByte*)UA_Array_new(sizeIterator, &UA_TYPES[UA_TYPES_SBYTE]);
  659. UA_Variant_setArrayCopy(&vAttr.value, sbyteArray, sizeIterator, &UA_TYPES[UA_TYPES_SBYTE]);
  660. UA_Array_delete(sbyteArray, sizeIterator, &UA_TYPES[UA_TYPES_SBYTE]);
  661. }
  662. else {
  663. UA_SByte sbyteTypeVal = 0;
  664. UA_Variant_setScalar(&vAttr.value, &sbyteTypeVal, &UA_TYPES[UA_TYPES_SBYTE]);
  665. }
  666. UA_NodeId_copy(&pDataSetReader->config.dataSetMetaData.fields[iteratorField].dataType, &vAttr.dataType);
  667. break;
  668. case UA_NS0ID_BYTE:
  669. if(vAttr.arrayDimensionsSize > 0) {
  670. size_t sizeIterator = iterateVariableAttrDimension(vAttr);
  671. UA_Byte *byteArray = (UA_Byte*)UA_Array_new(sizeIterator, &UA_TYPES[UA_TYPES_BYTE]);
  672. UA_Variant_setArrayCopy(&vAttr.value, byteArray, sizeIterator, &UA_TYPES[UA_TYPES_BYTE]);
  673. UA_Array_delete(byteArray, sizeIterator, &UA_TYPES[UA_TYPES_BYTE]);
  674. }
  675. else {
  676. UA_Byte byteTypeVal = 0;
  677. UA_Variant_setScalar(&vAttr.value, &byteTypeVal, &UA_TYPES[UA_TYPES_BYTE]);
  678. }
  679. UA_NodeId_copy(&pDataSetReader->config.dataSetMetaData.fields[iteratorField].dataType, &vAttr.dataType);
  680. break;
  681. case UA_NS0ID_INT16:
  682. if(vAttr.arrayDimensionsSize > 0) {
  683. size_t sizeIterator = iterateVariableAttrDimension(vAttr);
  684. UA_Int16 *int16Array = (UA_Int16*)UA_Array_new(sizeIterator, &UA_TYPES[UA_TYPES_INT16]);
  685. UA_Variant_setArrayCopy(&vAttr.value, int16Array, sizeIterator, &UA_TYPES[UA_TYPES_INT16]);
  686. UA_Array_delete(int16Array, sizeIterator, &UA_TYPES[UA_TYPES_INT16]);
  687. }
  688. else {
  689. UA_Int16 int16TypeVal = 0;
  690. UA_Variant_setScalar(&vAttr.value, &int16TypeVal, &UA_TYPES[UA_TYPES_INT16]);
  691. }
  692. UA_NodeId_copy(&pDataSetReader->config.dataSetMetaData.fields[iteratorField].dataType, &vAttr.dataType);
  693. break;
  694. case UA_NS0ID_UINT16:
  695. if(vAttr.arrayDimensionsSize > 0) {
  696. size_t sizeIterator = iterateVariableAttrDimension(vAttr);
  697. UA_UInt16 *uint16Array = (UA_UInt16*)UA_Array_new(sizeIterator, &UA_TYPES[UA_TYPES_UINT16]);
  698. UA_Variant_setArrayCopy(&vAttr.value, uint16Array, sizeIterator, &UA_TYPES[UA_TYPES_UINT16]);
  699. UA_Array_delete(uint16Array, sizeIterator, &UA_TYPES[UA_TYPES_UINT16]);
  700. }
  701. else {
  702. UA_UInt16 uint16TypeVal = 0;
  703. UA_Variant_setScalar(&vAttr.value, &uint16TypeVal, &UA_TYPES[UA_TYPES_UINT16]);
  704. }
  705. UA_NodeId_copy(&pDataSetReader->config.dataSetMetaData.fields[iteratorField].dataType, &vAttr.dataType);
  706. break;
  707. case UA_NS0ID_INT32:
  708. if(vAttr.arrayDimensionsSize > 0) {
  709. size_t sizeIterator = iterateVariableAttrDimension(vAttr);
  710. UA_Int32 *int32Array = (UA_Int32*)UA_Array_new(sizeIterator, &UA_TYPES[UA_TYPES_INT32]);
  711. UA_Variant_setArrayCopy(&vAttr.value, int32Array, sizeIterator, &UA_TYPES[UA_TYPES_INT32]);
  712. UA_Array_delete(int32Array, sizeIterator, &UA_TYPES[UA_TYPES_INT32]);
  713. }
  714. else {
  715. UA_Int32 int32TypeVal = 0;
  716. UA_Variant_setScalar(&vAttr.value, &int32TypeVal, &UA_TYPES[UA_TYPES_INT32]);
  717. }
  718. UA_NodeId_copy(&pDataSetReader->config.dataSetMetaData.fields[iteratorField].dataType, &vAttr.dataType);
  719. break;
  720. case UA_NS0ID_UINT32:
  721. if(vAttr.arrayDimensionsSize > 0) {
  722. size_t sizeIterator = iterateVariableAttrDimension(vAttr);
  723. UA_UInt32 *uint32Array = (UA_UInt32*)UA_Array_new(sizeIterator, &UA_TYPES[UA_TYPES_UINT32]);
  724. UA_Variant_setArrayCopy(&vAttr.value, uint32Array, sizeIterator, &UA_TYPES[UA_TYPES_UINT32]);
  725. UA_Array_delete(uint32Array, sizeIterator, &UA_TYPES[UA_TYPES_UINT32]);
  726. }
  727. else {
  728. UA_UInt32 uint32TypeVal = 0;
  729. UA_Variant_setScalar(&vAttr.value, &uint32TypeVal, &UA_TYPES[UA_TYPES_UINT32]);
  730. }
  731. UA_NodeId_copy(&pDataSetReader->config.dataSetMetaData.fields[iteratorField].dataType, &vAttr.dataType);
  732. break;
  733. case UA_NS0ID_INT64:
  734. if(vAttr.arrayDimensionsSize > 0) {
  735. size_t sizeIterator = iterateVariableAttrDimension(vAttr);
  736. UA_Int64 *int64Array = (UA_Int64*)UA_Array_new(sizeIterator, &UA_TYPES[UA_TYPES_INT64]);
  737. UA_Variant_setArrayCopy(&vAttr.value, int64Array, sizeIterator, &UA_TYPES[UA_TYPES_INT64]);
  738. UA_Array_delete(int64Array, sizeIterator, &UA_TYPES[UA_TYPES_INT64]);
  739. }
  740. else {
  741. UA_Int64 int64TypeVal = 0;
  742. UA_Variant_setScalar(&vAttr.value, &int64TypeVal, &UA_TYPES[UA_TYPES_INT64]);
  743. }
  744. UA_NodeId_copy(&pDataSetReader->config.dataSetMetaData.fields[iteratorField].dataType, &vAttr.dataType);
  745. break;
  746. case UA_NS0ID_UINT64:
  747. if(vAttr.arrayDimensionsSize > 0) {
  748. size_t sizeIterator = iterateVariableAttrDimension(vAttr);
  749. UA_UInt64 *uint64Array = (UA_UInt64*)UA_Array_new(sizeIterator, &UA_TYPES[UA_TYPES_UINT64]);
  750. UA_Variant_setArrayCopy(&vAttr.value, uint64Array, sizeIterator, &UA_TYPES[UA_TYPES_UINT64]);
  751. UA_Array_delete(uint64Array, sizeIterator, &UA_TYPES[UA_TYPES_UINT64]);
  752. }
  753. else {
  754. UA_UInt64 uint64TypeVal = 0;
  755. UA_Variant_setScalar(&vAttr.value, &uint64TypeVal, &UA_TYPES[UA_TYPES_UINT64]);
  756. }
  757. UA_NodeId_copy(&pDataSetReader->config.dataSetMetaData.fields[iteratorField].dataType, &vAttr.dataType);
  758. break;
  759. case UA_NS0ID_FLOAT:
  760. if(vAttr.arrayDimensionsSize > 0) {
  761. size_t sizeIterator = iterateVariableAttrDimension(vAttr);
  762. UA_Float *floatArray = (UA_Float*)UA_Array_new(sizeIterator, &UA_TYPES[UA_TYPES_FLOAT]);
  763. UA_Variant_setArrayCopy(&vAttr.value, floatArray, sizeIterator, &UA_TYPES[UA_TYPES_FLOAT]);
  764. UA_Array_delete(floatArray, sizeIterator, &UA_TYPES[UA_TYPES_FLOAT]);
  765. }
  766. else {
  767. UA_Float floatTypeVal = 0;
  768. UA_Variant_setScalar(&vAttr.value, &floatTypeVal, &UA_TYPES[UA_TYPES_FLOAT]);
  769. }
  770. UA_NodeId_copy(&pDataSetReader->config.dataSetMetaData.fields[iteratorField].dataType, &vAttr.dataType);
  771. break;
  772. case UA_NS0ID_DOUBLE:
  773. if(vAttr.arrayDimensionsSize > 0) {
  774. size_t sizeIterator = iterateVariableAttrDimension(vAttr);
  775. UA_Double *doubleArray = (UA_Double*)UA_Array_new(sizeIterator, &UA_TYPES[UA_TYPES_DOUBLE]);
  776. UA_Variant_setArrayCopy(&vAttr.value, doubleArray, sizeIterator, &UA_TYPES[UA_TYPES_DOUBLE]);
  777. UA_Array_delete(doubleArray, sizeIterator, &UA_TYPES[UA_TYPES_DOUBLE]);
  778. }
  779. else {
  780. UA_Double doubleTypeVal = 0.0;
  781. UA_Variant_setScalar(&vAttr.value, &doubleTypeVal, &UA_TYPES[UA_TYPES_DOUBLE]);
  782. }
  783. UA_NodeId_copy(&pDataSetReader->config.dataSetMetaData.fields[iteratorField].dataType, &vAttr.dataType);
  784. break;
  785. case UA_NS0ID_STRING:
  786. if(vAttr.arrayDimensionsSize > 0) {
  787. size_t sizeIterator = iterateVariableAttrDimension(vAttr);
  788. UA_String *stringArray = (UA_String*)UA_Array_new(sizeIterator, &UA_TYPES[UA_TYPES_STRING]);
  789. UA_Variant_setArrayCopy(&vAttr.value, stringArray, sizeIterator, &UA_TYPES[UA_TYPES_STRING]);
  790. UA_Array_delete(stringArray, sizeIterator, &UA_TYPES[UA_TYPES_STRING]);
  791. }
  792. else {
  793. UA_String stringTypeVal = UA_STRING_NULL;
  794. UA_Variant_setScalar(&vAttr.value, &stringTypeVal, &UA_TYPES[UA_TYPES_STRING]);
  795. }
  796. UA_NodeId_copy(&pDataSetReader->config.dataSetMetaData.fields[iteratorField].dataType, &vAttr.dataType);
  797. break;
  798. case UA_NS0ID_DATETIME:
  799. if(vAttr.arrayDimensionsSize > 0) {
  800. size_t sizeIterator = iterateVariableAttrDimension(vAttr);
  801. UA_DateTime *dateTimeArray = (UA_DateTime*)UA_Array_new(sizeIterator, &UA_TYPES[UA_TYPES_DATETIME]);
  802. UA_Variant_setArrayCopy(&vAttr.value, dateTimeArray, sizeIterator, &UA_TYPES[UA_TYPES_DATETIME]);
  803. UA_Array_delete(dateTimeArray, sizeIterator, &UA_TYPES[UA_TYPES_DATETIME]);
  804. }
  805. else {
  806. dateTimeTypeVal = 0;
  807. UA_Variant_setScalar(&vAttr.value, &dateTimeTypeVal, &UA_TYPES[UA_TYPES_DATETIME]);
  808. }
  809. UA_NodeId_copy(&pDataSetReader->config.dataSetMetaData.fields[iteratorField].dataType, &vAttr.dataType);
  810. break;
  811. case UA_NS0ID_GUID:
  812. if(vAttr.arrayDimensionsSize > 0) {
  813. size_t sizeIterator = iterateVariableAttrDimension(vAttr);
  814. UA_Guid *guidArray = (UA_Guid*)UA_Array_new(sizeIterator, &UA_TYPES[UA_TYPES_GUID]);
  815. UA_Variant_setArrayCopy(&vAttr.value, guidArray, sizeIterator, &UA_TYPES[UA_TYPES_GUID]);
  816. UA_Array_delete(guidArray, sizeIterator, &UA_TYPES[UA_TYPES_GUID]);
  817. }
  818. else {
  819. UA_Guid guidTypeVal = UA_GUID_NULL;
  820. UA_Variant_setScalar(&vAttr.value, &guidTypeVal, &UA_TYPES[UA_TYPES_GUID]);
  821. }
  822. UA_NodeId_copy(&pDataSetReader->config.dataSetMetaData.fields[iteratorField].dataType, &vAttr.dataType);
  823. break;
  824. case UA_NS0ID_NODEID:
  825. if(vAttr.arrayDimensionsSize > 0) {
  826. size_t sizeIterator = iterateVariableAttrDimension(vAttr);
  827. UA_NodeId *nodeidArray = (UA_NodeId*)UA_Array_new(sizeIterator, &UA_TYPES[UA_TYPES_NODEID]);
  828. UA_Variant_setArrayCopy(&vAttr.value, nodeidArray, sizeIterator, &UA_TYPES[UA_TYPES_NODEID]);
  829. UA_Array_delete(nodeidArray, sizeIterator, &UA_TYPES[UA_TYPES_NODEID]);
  830. }
  831. else {
  832. UA_NodeId nodeidTypeVal = UA_NODEID_NULL;
  833. UA_Variant_setScalar(&vAttr.value, &nodeidTypeVal, &UA_TYPES[UA_TYPES_NODEID]);
  834. }
  835. UA_NodeId_copy(&pDataSetReader->config.dataSetMetaData.fields[iteratorField].dataType, &vAttr.dataType);
  836. break;
  837. case UA_NS0ID_BYTESTRING:
  838. if(vAttr.arrayDimensionsSize > 0) {
  839. size_t sizeIterator = iterateVariableAttrDimension(vAttr);
  840. UA_ByteString *byteStringArray = (UA_ByteString*)UA_Array_new(sizeIterator, &UA_TYPES[UA_TYPES_BYTESTRING]);
  841. UA_Variant_setArrayCopy(&vAttr.value, byteStringArray, sizeIterator, &UA_TYPES[UA_TYPES_BYTESTRING]);
  842. UA_Array_delete(byteStringArray, sizeIterator, &UA_TYPES[UA_TYPES_BYTESTRING]);
  843. }
  844. else {
  845. UA_ByteString byteStringTypeVal = UA_BYTESTRING_NULL;
  846. UA_Variant_setScalar(&vAttr.value, &byteStringTypeVal, &UA_TYPES[UA_TYPES_BYTESTRING]);
  847. }
  848. UA_NodeId_copy(&pDataSetReader->config.dataSetMetaData.fields[iteratorField].dataType, &vAttr.dataType);
  849. break;
  850. default:
  851. /* Type not supported */
  852. UA_LOG_INFO(&server->config.logger, UA_LOGCATEGORY_USERLAND, "Type %u not supported, create empty variable.", pDataSetReader->config.dataSetMetaData.fields[iteratorField].builtInType);
  853. break;
  854. }
  855. vAttr.accessLevel = UA_ACCESSLEVELMASK_READ;
  856. UA_LocalizedText_copy(&pDataSetReader->config.dataSetMetaData.fields[iteratorField].description, &vAttr.description);
  857. UA_QualifiedName qn;
  858. UA_QualifiedName_init(&qn);
  859. char szTmpName[UA_MAX_SIZENAME];
  860. if(pDataSetReader->config.dataSetMetaData.fields[iteratorField].name.length > 0) {
  861. UA_UInt16 slen = UA_MAX_SIZENAME -1;
  862. vAttr.displayName.locale = UA_STRING("en-US");
  863. vAttr.displayName.text = pDataSetReader->config.dataSetMetaData.fields[iteratorField].name;
  864. if(pDataSetReader->config.dataSetMetaData.fields[iteratorField].name.length < slen) {
  865. slen = (UA_UInt16)pDataSetReader->config.dataSetMetaData.fields[iteratorField].name.length;
  866. UA_snprintf(szTmpName, sizeof(szTmpName), "%s", (const char*)pDataSetReader->config.dataSetMetaData.fields[iteratorField].name.data);
  867. }
  868. szTmpName[slen] = '\0';
  869. qn = UA_QUALIFIEDNAME(1, szTmpName);
  870. }
  871. else {
  872. strcpy(szTmpName, "SubscribedVariable");
  873. vAttr.displayName = UA_LOCALIZEDTEXT("en-US", szTmpName);
  874. qn = UA_QUALIFIEDNAME(1, "SubscribedVariable");
  875. }
  876. /* Add variable to the given parent node */
  877. UA_NodeId newNode;
  878. retVal = UA_Server_addVariableNode(server, UA_NODEID_NULL, *parentNode,
  879. UA_NODEID_NUMERIC(0, UA_NS0ID_HASCOMPONENT), qn,
  880. UA_NODEID_NUMERIC(0, UA_NS0ID_BASEDATAVARIABLETYPE), vAttr, NULL, &newNode);
  881. if(retVal == UA_STATUSCODE_GOOD) {
  882. UA_LOG_INFO(&server->config.logger, UA_LOGCATEGORY_USERLAND, "addVariableNode %s succeeded", szTmpName);
  883. }
  884. else {
  885. retval = retVal;
  886. UA_LOG_WARNING(&server->config.logger, UA_LOGCATEGORY_USERLAND, "addVariableNode: error 0x%x", retVal);
  887. }
  888. UA_FieldTargetDataType_init(&targetVars.targetVariables[iteratorField]);
  889. targetVars.targetVariables[iteratorField].attributeId = UA_ATTRIBUTEID_VALUE;
  890. UA_NodeId_copy(&newNode, &targetVars.targetVariables[iteratorField].targetNodeId);
  891. UA_NodeId_deleteMembers(&newNode);
  892. if(vAttr.arrayDimensionsSize > 0) {
  893. UA_Variant_deleteMembers(&vAttr.value);
  894. UA_Array_delete(vAttr.arrayDimensions, vAttr.arrayDimensionsSize, &UA_TYPES[UA_TYPES_UINT32]);
  895. }
  896. }
  897. if(sdsType == UA_PUBSUB_SDS_TARGET) {
  898. retval = UA_Server_DataSetReader_createTargetVariables(server, pDataSetReader->identifier, &targetVars);
  899. }
  900. UA_TargetVariablesDataType_deleteMembers(&targetVars);
  901. return retval;
  902. }
  903. /**
  904. * Process a NetworkMessage with a DataSetReader.
  905. *
  906. * @param server
  907. * @param dataSetReader
  908. * @param dataSetMsg
  909. */
  910. void UA_Server_DataSetReader_process(UA_Server *server, UA_DataSetReader *dataSetReader, UA_DataSetMessage* dataSetMsg) {
  911. if((dataSetReader == NULL) || (dataSetMsg == NULL) || (server == NULL)) {
  912. return;
  913. }
  914. if(!dataSetMsg->header.dataSetMessageValid) {
  915. UA_LOG_INFO(&server->config.logger, UA_LOGCATEGORY_SERVER, "DataSetMessage is discarded: message is not valid");
  916. /* To Do check ConfigurationVersion*/
  917. /*if(dataSetMsg->header.configVersionMajorVersionEnabled)
  918. * {
  919. * if(dataSetMsg->header.configVersionMajorVersion != dataSetReader->config.dataSetMetaData.configurationVersion.majorVersion)
  920. * {
  921. * UA_LOG_WARNING(server->config.logger, UA_LOGCATEGORY_SERVER, "DataSetMessage is discarded: ConfigurationVersion MajorVersion does not match");
  922. * return;
  923. * }
  924. } */
  925. }
  926. else {
  927. if(dataSetMsg->header.dataSetMessageType == UA_DATASETMESSAGE_DATAKEYFRAME) {
  928. if(dataSetMsg->header.fieldEncoding != UA_FIELDENCODING_RAWDATA) {
  929. size_t anzFields = dataSetMsg->data.keyFrameData.fieldCount;
  930. if(dataSetReader->config.dataSetMetaData.fieldsSize < anzFields) {
  931. anzFields = dataSetReader->config.dataSetMetaData.fieldsSize;
  932. }
  933. if(dataSetReader->subscribedDataSetTarget.targetVariablesSize < anzFields) {
  934. anzFields = dataSetReader->subscribedDataSetTarget.targetVariablesSize;
  935. }
  936. UA_StatusCode retVal = UA_STATUSCODE_GOOD;
  937. for (UA_UInt16 iteratorField = 0; iteratorField < anzFields; iteratorField++) {
  938. if(dataSetMsg->data.keyFrameData.dataSetFields[iteratorField].hasValue) {
  939. if(dataSetReader->subscribedDataSetTarget.targetVariables[iteratorField].attributeId == UA_ATTRIBUTEID_VALUE) {
  940. retVal = UA_Server_writeValue(server, dataSetReader->subscribedDataSetTarget.targetVariables[iteratorField].targetNodeId, dataSetMsg->data.keyFrameData.dataSetFields[iteratorField].value);
  941. if(retVal != UA_STATUSCODE_GOOD) {
  942. UA_LOG_INFO(&server->config.logger, UA_LOGCATEGORY_SERVER, "Error Write Value KF %u: 0x%x", iteratorField, retVal);
  943. }
  944. }
  945. else {
  946. UA_WriteValue writeVal;
  947. UA_WriteValue_init(&writeVal);
  948. writeVal.attributeId = dataSetReader->subscribedDataSetTarget.targetVariables[iteratorField].attributeId;
  949. writeVal.indexRange = dataSetReader->subscribedDataSetTarget.targetVariables[iteratorField].receiverIndexRange;
  950. writeVal.nodeId = dataSetReader->subscribedDataSetTarget.targetVariables[iteratorField].targetNodeId;
  951. UA_DataValue_copy(&dataSetMsg->data.keyFrameData.dataSetFields[iteratorField], &writeVal.value);
  952. retVal = UA_Server_write(server, &writeVal);
  953. if(retVal != UA_STATUSCODE_GOOD) {
  954. UA_LOG_INFO(&server->config.logger, UA_LOGCATEGORY_SERVER, "Error Write KF %u: 0x%x", iteratorField, retVal);
  955. }
  956. }
  957. }
  958. }
  959. }
  960. }
  961. }
  962. }
  963. /**
  964. * Copy the config of the DataSetReader.
  965. *
  966. * @param src
  967. * @param dst
  968. * @return UA_STATUSCODE_GOOD on success
  969. */
  970. UA_StatusCode
  971. UA_DataSetReaderConfig_copy(const UA_DataSetReaderConfig *src,
  972. UA_DataSetReaderConfig *dst) {
  973. memset(dst, 0, sizeof(UA_DataSetReaderConfig));
  974. UA_StatusCode retVal = UA_String_copy(&src->name, &dst->name);
  975. if(retVal != UA_STATUSCODE_GOOD) {
  976. return retVal;
  977. }
  978. retVal = UA_Variant_copy(&src->publisherId, &dst->publisherId);
  979. if(retVal != UA_STATUSCODE_GOOD) {
  980. return retVal;
  981. }
  982. dst->writerGroupId = src->writerGroupId;
  983. dst->dataSetWriterId = src->dataSetWriterId;
  984. retVal = UA_DataSetMetaDataType_copy(&src->dataSetMetaData, &dst->dataSetMetaData);
  985. if(retVal != UA_STATUSCODE_GOOD) {
  986. return retVal;
  987. }
  988. dst->dataSetFieldContentMask = src->dataSetFieldContentMask;
  989. dst->messageReceiveTimeout = src->messageReceiveTimeout;
  990. /* Currently memcpy is used to copy the securityParameters */
  991. memcpy(&dst->securityParameters, &src->securityParameters, sizeof(UA_PubSubSecurityParameters));
  992. retVal = UA_UadpDataSetReaderMessageDataType_copy(&src->messageSettings, &dst->messageSettings);
  993. if(retVal != UA_STATUSCODE_GOOD) {
  994. return retVal;
  995. }
  996. return UA_STATUSCODE_GOOD;
  997. }
  998. /**
  999. * Delete the DataSetReader.
  1000. *
  1001. * @param server
  1002. * @param dataSetReader
  1003. */
  1004. void UA_DataSetReader_delete(UA_Server *server, UA_DataSetReader *dataSetReader) {
  1005. /* Delete DataSetReader config */
  1006. UA_String_deleteMembers(&dataSetReader->config.name);
  1007. UA_Variant_deleteMembers(&dataSetReader->config.publisherId);
  1008. UA_DataSetMetaDataType_deleteMembers(&dataSetReader->config.dataSetMetaData);
  1009. UA_UadpDataSetReaderMessageDataType_deleteMembers(&dataSetReader->config.messageSettings);
  1010. UA_TargetVariablesDataType_deleteMembers(&dataSetReader->subscribedDataSetTarget);
  1011. /* Delete DataSetReader */
  1012. UA_ReaderGroup* pGroup = UA_ReaderGroup_findRGbyId(server, dataSetReader->linkedReaderGroup);
  1013. if(pGroup != NULL) {
  1014. pGroup->readersCount--;
  1015. }
  1016. UA_NodeId_deleteMembers(&dataSetReader->identifier);
  1017. UA_NodeId_deleteMembers(&dataSetReader->linkedReaderGroup);
  1018. /* Remove DataSetReader from group */
  1019. LIST_REMOVE(dataSetReader, listEntry);
  1020. /* Free memory allocated for DataSetReader */
  1021. UA_free(dataSetReader);
  1022. }
  1023. /**********************************************/
  1024. /* PublishedDataSet */
  1025. /**********************************************/
  1026. UA_StatusCode
  1027. UA_PublishedDataSetConfig_copy(const UA_PublishedDataSetConfig *src,
  1028. UA_PublishedDataSetConfig *dst) {
  1029. UA_StatusCode retVal = UA_STATUSCODE_GOOD;
  1030. memcpy(dst, src, sizeof(UA_PublishedDataSetConfig));
  1031. retVal |= UA_String_copy(&src->name, &dst->name);
  1032. switch(src->publishedDataSetType){
  1033. case UA_PUBSUB_DATASET_PUBLISHEDITEMS:
  1034. //no additional items
  1035. break;
  1036. case UA_PUBSUB_DATASET_PUBLISHEDITEMS_TEMPLATE:
  1037. if(src->config.itemsTemplate.variablesToAddSize > 0){
  1038. dst->config.itemsTemplate.variablesToAdd = (UA_PublishedVariableDataType *) UA_calloc(
  1039. src->config.itemsTemplate.variablesToAddSize, sizeof(UA_PublishedVariableDataType));
  1040. }
  1041. for(size_t i = 0; i < src->config.itemsTemplate.variablesToAddSize; i++){
  1042. retVal |= UA_PublishedVariableDataType_copy(&src->config.itemsTemplate.variablesToAdd[i],
  1043. &dst->config.itemsTemplate.variablesToAdd[i]);
  1044. }
  1045. retVal |= UA_DataSetMetaDataType_copy(&src->config.itemsTemplate.metaData,
  1046. &dst->config.itemsTemplate.metaData);
  1047. break;
  1048. default:
  1049. return UA_STATUSCODE_BADINVALIDARGUMENT;
  1050. }
  1051. return retVal;
  1052. }
  1053. UA_StatusCode
  1054. UA_Server_getPublishedDataSetConfig(UA_Server *server, const UA_NodeId pds,
  1055. UA_PublishedDataSetConfig *config){
  1056. if(!config)
  1057. return UA_STATUSCODE_BADINVALIDARGUMENT;
  1058. UA_PublishedDataSet *currentPublishedDataSet = UA_PublishedDataSet_findPDSbyId(server, pds);
  1059. if(!currentPublishedDataSet)
  1060. return UA_STATUSCODE_BADNOTFOUND;
  1061. UA_PublishedDataSetConfig tmpPublishedDataSetConfig;
  1062. //deep copy of the actual config
  1063. UA_PublishedDataSetConfig_copy(&currentPublishedDataSet->config, &tmpPublishedDataSetConfig);
  1064. *config = tmpPublishedDataSetConfig;
  1065. return UA_STATUSCODE_GOOD;
  1066. }
  1067. UA_PublishedDataSet *
  1068. UA_PublishedDataSet_findPDSbyId(UA_Server *server, UA_NodeId identifier){
  1069. for(size_t i = 0; i < server->pubSubManager.publishedDataSetsSize; i++){
  1070. if(UA_NodeId_equal(&server->pubSubManager.publishedDataSets[i].identifier, &identifier)){
  1071. return &server->pubSubManager.publishedDataSets[i];
  1072. }
  1073. }
  1074. return NULL;
  1075. }
  1076. void
  1077. UA_PublishedDataSetConfig_deleteMembers(UA_PublishedDataSetConfig *pdsConfig){
  1078. //delete pds config
  1079. UA_String_deleteMembers(&pdsConfig->name);
  1080. switch (pdsConfig->publishedDataSetType){
  1081. case UA_PUBSUB_DATASET_PUBLISHEDITEMS:
  1082. //no additional items
  1083. break;
  1084. case UA_PUBSUB_DATASET_PUBLISHEDITEMS_TEMPLATE:
  1085. if(pdsConfig->config.itemsTemplate.variablesToAddSize > 0){
  1086. for(size_t i = 0; i < pdsConfig->config.itemsTemplate.variablesToAddSize; i++){
  1087. UA_PublishedVariableDataType_deleteMembers(&pdsConfig->config.itemsTemplate.variablesToAdd[i]);
  1088. }
  1089. UA_free(pdsConfig->config.itemsTemplate.variablesToAdd);
  1090. }
  1091. UA_DataSetMetaDataType_deleteMembers(&pdsConfig->config.itemsTemplate.metaData);
  1092. break;
  1093. default:
  1094. break;
  1095. }
  1096. }
  1097. void
  1098. UA_PublishedDataSet_deleteMembers(UA_Server *server, UA_PublishedDataSet *publishedDataSet){
  1099. UA_PublishedDataSetConfig_deleteMembers(&publishedDataSet->config);
  1100. //delete PDS
  1101. UA_DataSetMetaDataType_deleteMembers(&publishedDataSet->dataSetMetaData);
  1102. UA_DataSetField *field, *tmpField;
  1103. LIST_FOREACH_SAFE(field, &publishedDataSet->fields, listEntry, tmpField) {
  1104. UA_Server_removeDataSetField(server, field->identifier);
  1105. }
  1106. UA_NodeId_deleteMembers(&publishedDataSet->identifier);
  1107. }
  1108. UA_DataSetFieldResult
  1109. UA_Server_addDataSetField(UA_Server *server, const UA_NodeId publishedDataSet,
  1110. const UA_DataSetFieldConfig *fieldConfig,
  1111. UA_NodeId *fieldIdentifier) {
  1112. UA_StatusCode retVal = UA_STATUSCODE_GOOD;
  1113. UA_DataSetFieldResult result = {UA_STATUSCODE_BADINVALIDARGUMENT, {0, 0}};
  1114. if(!fieldConfig)
  1115. return result;
  1116. UA_PublishedDataSet *currentDataSet = UA_PublishedDataSet_findPDSbyId(server, publishedDataSet);
  1117. if(currentDataSet == NULL){
  1118. result.result = UA_STATUSCODE_BADNOTFOUND;
  1119. return result;
  1120. }
  1121. if(currentDataSet->config.publishedDataSetType != UA_PUBSUB_DATASET_PUBLISHEDITEMS){
  1122. result.result = UA_STATUSCODE_BADNOTIMPLEMENTED;
  1123. return result;
  1124. }
  1125. UA_DataSetField *newField = (UA_DataSetField *) UA_calloc(1, sizeof(UA_DataSetField));
  1126. if(!newField){
  1127. result.result = UA_STATUSCODE_BADINTERNALERROR;
  1128. return result;
  1129. }
  1130. UA_DataSetFieldConfig tmpFieldConfig;
  1131. retVal |= UA_DataSetFieldConfig_copy(fieldConfig, &tmpFieldConfig);
  1132. newField->config = tmpFieldConfig;
  1133. UA_PubSubManager_generateUniqueNodeId(server, &newField->identifier);
  1134. if(fieldIdentifier != NULL){
  1135. UA_NodeId_copy(&newField->identifier, fieldIdentifier);
  1136. }
  1137. newField->publishedDataSet = currentDataSet->identifier;
  1138. //update major version of parent published data set
  1139. currentDataSet->dataSetMetaData.configurationVersion.majorVersion = UA_PubSubConfigurationVersionTimeDifference();
  1140. LIST_INSERT_HEAD(&currentDataSet->fields, newField, listEntry);
  1141. if(newField->config.field.variable.promotedField)
  1142. currentDataSet->promotedFieldsCount++;
  1143. currentDataSet->fieldSize++;
  1144. result.result = retVal;
  1145. result.configurationVersion.majorVersion = currentDataSet->dataSetMetaData.configurationVersion.majorVersion;
  1146. result.configurationVersion.minorVersion = currentDataSet->dataSetMetaData.configurationVersion.minorVersion;
  1147. return result;
  1148. }
  1149. UA_DataSetFieldResult
  1150. UA_Server_removeDataSetField(UA_Server *server, const UA_NodeId dsf) {
  1151. UA_DataSetField *currentField = UA_DataSetField_findDSFbyId(server, dsf);
  1152. UA_DataSetFieldResult result = {UA_STATUSCODE_BADNOTFOUND, {0, 0}};
  1153. if(!currentField)
  1154. return result;
  1155. UA_PublishedDataSet *parentPublishedDataSet =
  1156. UA_PublishedDataSet_findPDSbyId(server, currentField->publishedDataSet);
  1157. if(!parentPublishedDataSet)
  1158. return result;
  1159. parentPublishedDataSet->fieldSize--;
  1160. if(currentField->config.field.variable.promotedField)
  1161. parentPublishedDataSet->promotedFieldsCount--;
  1162. /* update major version of PublishedDataSet */
  1163. parentPublishedDataSet->dataSetMetaData.configurationVersion.majorVersion =
  1164. UA_PubSubConfigurationVersionTimeDifference();
  1165. UA_DataSetField_deleteMembers(currentField);
  1166. LIST_REMOVE(currentField, listEntry);
  1167. UA_free(currentField);
  1168. result.result = UA_STATUSCODE_GOOD;
  1169. result.configurationVersion.majorVersion = parentPublishedDataSet->dataSetMetaData.configurationVersion.majorVersion;
  1170. result.configurationVersion.minorVersion = parentPublishedDataSet->dataSetMetaData.configurationVersion.minorVersion;
  1171. return result;
  1172. }
  1173. /**********************************************/
  1174. /* DataSetWriter */
  1175. /**********************************************/
  1176. UA_StatusCode
  1177. UA_DataSetWriterConfig_copy(const UA_DataSetWriterConfig *src,
  1178. UA_DataSetWriterConfig *dst){
  1179. UA_StatusCode retVal = UA_STATUSCODE_GOOD;
  1180. memcpy(dst, src, sizeof(UA_DataSetWriterConfig));
  1181. retVal |= UA_String_copy(&src->name, &dst->name);
  1182. retVal |= UA_String_copy(&src->dataSetName, &dst->dataSetName);
  1183. retVal |= UA_ExtensionObject_copy(&src->messageSettings, &dst->messageSettings);
  1184. dst->dataSetWriterProperties = (UA_KeyValuePair *)
  1185. UA_calloc(src->dataSetWriterPropertiesSize, sizeof(UA_KeyValuePair));
  1186. if(!dst->dataSetWriterProperties)
  1187. return UA_STATUSCODE_BADOUTOFMEMORY;
  1188. for(size_t i = 0; i < src->dataSetWriterPropertiesSize; i++){
  1189. retVal |= UA_KeyValuePair_copy(&src->dataSetWriterProperties[i], &dst->dataSetWriterProperties[i]);
  1190. }
  1191. return retVal;
  1192. }
  1193. UA_StatusCode
  1194. UA_Server_getDataSetWriterConfig(UA_Server *server, const UA_NodeId dsw,
  1195. UA_DataSetWriterConfig *config){
  1196. UA_StatusCode retVal = UA_STATUSCODE_GOOD;
  1197. if(!config)
  1198. return UA_STATUSCODE_BADINVALIDARGUMENT;
  1199. UA_DataSetWriter *currentDataSetWriter = UA_DataSetWriter_findDSWbyId(server, dsw);
  1200. if(!currentDataSetWriter)
  1201. return UA_STATUSCODE_BADNOTFOUND;
  1202. UA_DataSetWriterConfig tmpWriterConfig;
  1203. //deep copy of the actual config
  1204. retVal |= UA_DataSetWriterConfig_copy(&currentDataSetWriter->config, &tmpWriterConfig);
  1205. *config = tmpWriterConfig;
  1206. return retVal;
  1207. }
  1208. UA_DataSetWriter *
  1209. UA_DataSetWriter_findDSWbyId(UA_Server *server, UA_NodeId identifier) {
  1210. for(size_t i = 0; i < server->pubSubManager.connectionsSize; i++){
  1211. UA_WriterGroup *tmpWriterGroup;
  1212. LIST_FOREACH(tmpWriterGroup, &server->pubSubManager.connections[i].writerGroups, listEntry){
  1213. UA_DataSetWriter *tmpWriter;
  1214. LIST_FOREACH(tmpWriter, &tmpWriterGroup->writers, listEntry){
  1215. if(UA_NodeId_equal(&tmpWriter->identifier, &identifier)){
  1216. return tmpWriter;
  1217. }
  1218. }
  1219. }
  1220. }
  1221. return NULL;
  1222. }
  1223. void
  1224. UA_DataSetWriterConfig_deleteMembers(UA_DataSetWriterConfig *pdsConfig) {
  1225. UA_String_deleteMembers(&pdsConfig->name);
  1226. UA_String_deleteMembers(&pdsConfig->dataSetName);
  1227. for(size_t i = 0; i < pdsConfig->dataSetWriterPropertiesSize; i++){
  1228. UA_KeyValuePair_deleteMembers(&pdsConfig->dataSetWriterProperties[i]);
  1229. }
  1230. UA_free(pdsConfig->dataSetWriterProperties);
  1231. UA_ExtensionObject_deleteMembers(&pdsConfig->messageSettings);
  1232. }
  1233. static void
  1234. UA_DataSetWriter_deleteMembers(UA_Server *server, UA_DataSetWriter *dataSetWriter) {
  1235. UA_DataSetWriterConfig_deleteMembers(&dataSetWriter->config);
  1236. //delete DataSetWriter
  1237. UA_NodeId_deleteMembers(&dataSetWriter->identifier);
  1238. UA_NodeId_deleteMembers(&dataSetWriter->linkedWriterGroup);
  1239. UA_NodeId_deleteMembers(&dataSetWriter->connectedDataSet);
  1240. #ifdef UA_ENABLE_PUBSUB_DELTAFRAMES
  1241. //delete lastSamples store
  1242. for(size_t i = 0; i < dataSetWriter->lastSamplesCount; i++) {
  1243. UA_DataValue_deleteMembers(&dataSetWriter->lastSamples[i].value);
  1244. }
  1245. UA_free(dataSetWriter->lastSamples);
  1246. dataSetWriter->lastSamples = NULL;
  1247. dataSetWriter->lastSamplesCount = 0;
  1248. #endif
  1249. }
  1250. /**********************************************/
  1251. /* WriterGroup */
  1252. /**********************************************/
  1253. UA_StatusCode
  1254. UA_WriterGroupConfig_copy(const UA_WriterGroupConfig *src,
  1255. UA_WriterGroupConfig *dst){
  1256. UA_StatusCode retVal = UA_STATUSCODE_GOOD;
  1257. memcpy(dst, src, sizeof(UA_WriterGroupConfig));
  1258. retVal |= UA_String_copy(&src->name, &dst->name);
  1259. retVal |= UA_ExtensionObject_copy(&src->transportSettings, &dst->transportSettings);
  1260. retVal |= UA_ExtensionObject_copy(&src->messageSettings, &dst->messageSettings);
  1261. dst->groupProperties = (UA_KeyValuePair *) UA_calloc(src->groupPropertiesSize, sizeof(UA_KeyValuePair));
  1262. if(!dst->groupProperties)
  1263. return UA_STATUSCODE_BADOUTOFMEMORY;
  1264. for(size_t i = 0; i < src->groupPropertiesSize; i++){
  1265. retVal |= UA_KeyValuePair_copy(&src->groupProperties[i], &dst->groupProperties[i]);
  1266. }
  1267. return retVal;
  1268. }
  1269. UA_StatusCode
  1270. UA_Server_getWriterGroupConfig(UA_Server *server, const UA_NodeId writerGroup,
  1271. UA_WriterGroupConfig *config){
  1272. UA_StatusCode retVal = UA_STATUSCODE_GOOD;
  1273. if(!config)
  1274. return UA_STATUSCODE_BADINVALIDARGUMENT;
  1275. UA_WriterGroup *currentWriterGroup = UA_WriterGroup_findWGbyId(server, writerGroup);
  1276. if(!currentWriterGroup){
  1277. return UA_STATUSCODE_BADNOTFOUND;
  1278. }
  1279. UA_WriterGroupConfig tmpWriterGroupConfig;
  1280. //deep copy of the actual config
  1281. retVal |= UA_WriterGroupConfig_copy(&currentWriterGroup->config, &tmpWriterGroupConfig);
  1282. *config = tmpWriterGroupConfig;
  1283. return retVal;
  1284. }
  1285. UA_StatusCode
  1286. UA_Server_updateWriterGroupConfig(UA_Server *server, UA_NodeId writerGroupIdentifier,
  1287. const UA_WriterGroupConfig *config){
  1288. if(!config)
  1289. return UA_STATUSCODE_BADINVALIDARGUMENT;
  1290. UA_WriterGroup *currentWriterGroup = UA_WriterGroup_findWGbyId(server, writerGroupIdentifier);
  1291. if(!currentWriterGroup)
  1292. return UA_STATUSCODE_BADNOTFOUND;
  1293. //The update functionality will be extended during the next PubSub batches.
  1294. //Currently is only a change of the publishing interval possible.
  1295. if(currentWriterGroup->config.publishingInterval != config->publishingInterval) {
  1296. UA_PubSubManager_removeRepeatedPubSubCallback(server, currentWriterGroup->publishCallbackId);
  1297. currentWriterGroup->config.publishingInterval = config->publishingInterval;
  1298. UA_WriterGroup_addPublishCallback(server, currentWriterGroup);
  1299. } else if(currentWriterGroup->config.priority != config->priority) {
  1300. UA_LOG_WARNING(&server->config.logger, UA_LOGCATEGORY_SERVER,
  1301. "No or unsupported WriterGroup update.");
  1302. }
  1303. return UA_STATUSCODE_GOOD;
  1304. }
  1305. UA_WriterGroup *
  1306. UA_WriterGroup_findWGbyId(UA_Server *server, UA_NodeId identifier){
  1307. for(size_t i = 0; i < server->pubSubManager.connectionsSize; i++){
  1308. UA_WriterGroup *tmpWriterGroup;
  1309. LIST_FOREACH(tmpWriterGroup, &server->pubSubManager.connections[i].writerGroups, listEntry) {
  1310. if(UA_NodeId_equal(&identifier, &tmpWriterGroup->identifier)){
  1311. return tmpWriterGroup;
  1312. }
  1313. }
  1314. }
  1315. return NULL;
  1316. }
  1317. void
  1318. UA_WriterGroupConfig_deleteMembers(UA_WriterGroupConfig *writerGroupConfig){
  1319. //delete writerGroup config
  1320. UA_String_deleteMembers(&writerGroupConfig->name);
  1321. UA_ExtensionObject_deleteMembers(&writerGroupConfig->transportSettings);
  1322. UA_ExtensionObject_deleteMembers(&writerGroupConfig->messageSettings);
  1323. for(size_t i = 0; i < writerGroupConfig->groupPropertiesSize; i++){
  1324. UA_KeyValuePair_deleteMembers(&writerGroupConfig->groupProperties[i]);
  1325. }
  1326. UA_free(writerGroupConfig->groupProperties);
  1327. }
  1328. static void
  1329. UA_WriterGroup_deleteMembers(UA_Server *server, UA_WriterGroup *writerGroup) {
  1330. UA_WriterGroupConfig_deleteMembers(&writerGroup->config);
  1331. //delete WriterGroup
  1332. //delete all writers. Therefore removeDataSetWriter is called from PublishedDataSet
  1333. UA_DataSetWriter *dataSetWriter, *tmpDataSetWriter;
  1334. LIST_FOREACH_SAFE(dataSetWriter, &writerGroup->writers, listEntry, tmpDataSetWriter){
  1335. UA_Server_removeDataSetWriter(server, dataSetWriter->identifier);
  1336. }
  1337. UA_NodeId_deleteMembers(&writerGroup->linkedConnection);
  1338. UA_NodeId_deleteMembers(&writerGroup->identifier);
  1339. }
  1340. UA_StatusCode
  1341. UA_Server_addDataSetWriter(UA_Server *server,
  1342. const UA_NodeId writerGroup, const UA_NodeId dataSet,
  1343. const UA_DataSetWriterConfig *dataSetWriterConfig,
  1344. UA_NodeId *writerIdentifier) {
  1345. UA_StatusCode retVal = UA_STATUSCODE_GOOD;
  1346. if(!dataSetWriterConfig)
  1347. return UA_STATUSCODE_BADINVALIDARGUMENT;
  1348. UA_PublishedDataSet *currentDataSetContext = UA_PublishedDataSet_findPDSbyId(server, dataSet);
  1349. if(!currentDataSetContext)
  1350. return UA_STATUSCODE_BADNOTFOUND;
  1351. UA_WriterGroup *wg = UA_WriterGroup_findWGbyId(server, writerGroup);
  1352. if(!wg)
  1353. return UA_STATUSCODE_BADNOTFOUND;
  1354. UA_DataSetWriter *newDataSetWriter = (UA_DataSetWriter *) UA_calloc(1, sizeof(UA_DataSetWriter));
  1355. if(!newDataSetWriter)
  1356. return UA_STATUSCODE_BADOUTOFMEMORY;
  1357. //copy the config into the new dataSetWriter
  1358. UA_DataSetWriterConfig tmpDataSetWriterConfig;
  1359. retVal |= UA_DataSetWriterConfig_copy(dataSetWriterConfig, &tmpDataSetWriterConfig);
  1360. newDataSetWriter->config = tmpDataSetWriterConfig;
  1361. //save the current version of the connected PublishedDataSet
  1362. newDataSetWriter->connectedDataSetVersion = currentDataSetContext->dataSetMetaData.configurationVersion;
  1363. #ifdef UA_ENABLE_PUBSUB_DELTAFRAMES
  1364. //initialize the queue for the last values
  1365. newDataSetWriter->lastSamples = (UA_DataSetWriterSample * )
  1366. UA_calloc(currentDataSetContext->fieldSize, sizeof(UA_DataSetWriterSample));
  1367. if(!newDataSetWriter->lastSamples) {
  1368. UA_DataSetWriterConfig_deleteMembers(&newDataSetWriter->config);
  1369. UA_free(newDataSetWriter);
  1370. return UA_STATUSCODE_BADOUTOFMEMORY;
  1371. }
  1372. newDataSetWriter->lastSamplesCount = currentDataSetContext->fieldSize;
  1373. #endif
  1374. //connect PublishedDataSet with DataSetWriter
  1375. newDataSetWriter->connectedDataSet = currentDataSetContext->identifier;
  1376. newDataSetWriter->linkedWriterGroup = wg->identifier;
  1377. UA_PubSubManager_generateUniqueNodeId(server, &newDataSetWriter->identifier);
  1378. if(writerIdentifier != NULL)
  1379. UA_NodeId_copy(&newDataSetWriter->identifier, writerIdentifier);
  1380. //add the new writer to the group
  1381. LIST_INSERT_HEAD(&wg->writers, newDataSetWriter, listEntry);
  1382. wg->writersCount++;
  1383. #ifdef UA_ENABLE_PUBSUB_INFORMATIONMODEL
  1384. addDataSetWriterRepresentation(server, newDataSetWriter);
  1385. #endif
  1386. return retVal;
  1387. }
  1388. UA_StatusCode
  1389. UA_Server_removeDataSetWriter(UA_Server *server, const UA_NodeId dsw){
  1390. UA_DataSetWriter *dataSetWriter = UA_DataSetWriter_findDSWbyId(server, dsw);
  1391. if(!dataSetWriter)
  1392. return UA_STATUSCODE_BADNOTFOUND;
  1393. UA_WriterGroup *linkedWriterGroup = UA_WriterGroup_findWGbyId(server, dataSetWriter->linkedWriterGroup);
  1394. if(!linkedWriterGroup)
  1395. return UA_STATUSCODE_BADNOTFOUND;
  1396. linkedWriterGroup->writersCount--;
  1397. #ifdef UA_ENABLE_PUBSUB_INFORMATIONMODEL
  1398. removeDataSetWriterRepresentation(server, dataSetWriter);
  1399. #endif
  1400. //remove DataSetWriter from group
  1401. UA_DataSetWriter_deleteMembers(server, dataSetWriter);
  1402. LIST_REMOVE(dataSetWriter, listEntry);
  1403. UA_free(dataSetWriter);
  1404. return UA_STATUSCODE_GOOD;
  1405. }
  1406. /**********************************************/
  1407. /* DataSetField */
  1408. /**********************************************/
  1409. UA_StatusCode
  1410. UA_DataSetFieldConfig_copy(const UA_DataSetFieldConfig *src, UA_DataSetFieldConfig *dst){
  1411. memcpy(dst, src, sizeof(UA_DataSetFieldConfig));
  1412. if(src->dataSetFieldType == UA_PUBSUB_DATASETFIELD_VARIABLE) {
  1413. UA_String_copy(&src->field.variable.fieldNameAlias, &dst->field.variable.fieldNameAlias);
  1414. UA_PublishedVariableDataType_copy(&src->field.variable.publishParameters,
  1415. &dst->field.variable.publishParameters);
  1416. } else {
  1417. return UA_STATUSCODE_BADNOTSUPPORTED;
  1418. }
  1419. return UA_STATUSCODE_GOOD;
  1420. }
  1421. UA_StatusCode
  1422. UA_Server_getDataSetFieldConfig(UA_Server *server, const UA_NodeId dsf,
  1423. UA_DataSetFieldConfig *config) {
  1424. UA_StatusCode retVal = UA_STATUSCODE_GOOD;
  1425. if(!config)
  1426. return UA_STATUSCODE_BADINVALIDARGUMENT;
  1427. UA_DataSetField *currentDataSetField = UA_DataSetField_findDSFbyId(server, dsf);
  1428. if(!currentDataSetField)
  1429. return UA_STATUSCODE_BADNOTFOUND;
  1430. UA_DataSetFieldConfig tmpFieldConfig;
  1431. //deep copy of the actual config
  1432. retVal |= UA_DataSetFieldConfig_copy(&currentDataSetField->config, &tmpFieldConfig);
  1433. *config = tmpFieldConfig;
  1434. return retVal;
  1435. }
  1436. UA_DataSetField *
  1437. UA_DataSetField_findDSFbyId(UA_Server *server, UA_NodeId identifier) {
  1438. for(size_t i = 0; i < server->pubSubManager.publishedDataSetsSize; i++){
  1439. UA_DataSetField *tmpField;
  1440. LIST_FOREACH(tmpField, &server->pubSubManager.publishedDataSets[i].fields, listEntry){
  1441. if(UA_NodeId_equal(&tmpField->identifier, &identifier)){
  1442. return tmpField;
  1443. }
  1444. }
  1445. }
  1446. return NULL;
  1447. }
  1448. void
  1449. UA_DataSetFieldConfig_deleteMembers(UA_DataSetFieldConfig *dataSetFieldConfig){
  1450. if(dataSetFieldConfig->dataSetFieldType == UA_PUBSUB_DATASETFIELD_VARIABLE){
  1451. UA_String_deleteMembers(&dataSetFieldConfig->field.variable.fieldNameAlias);
  1452. UA_PublishedVariableDataType_deleteMembers(&dataSetFieldConfig->field.variable.publishParameters);
  1453. }
  1454. }
  1455. static void
  1456. UA_DataSetField_deleteMembers(UA_DataSetField *field) {
  1457. UA_DataSetFieldConfig_deleteMembers(&field->config);
  1458. //delete DataSetField
  1459. UA_NodeId_deleteMembers(&field->identifier);
  1460. UA_NodeId_deleteMembers(&field->publishedDataSet);
  1461. UA_FieldMetaData_deleteMembers(&field->fieldMetaData);
  1462. }
  1463. /*********************************************************/
  1464. /* PublishValues handling */
  1465. /*********************************************************/
  /**
   * Compare two variants. Internally used for value change detection.
   *
   * Both variants are binary-encoded into temporary buffers and the encodings
   * are compared byte-wise. Encodings of different size count as a change
   * without encoding anything.
   *
   * @param oldValue previously stored value
   * @param newValue freshly sampled value
   * @return true if the value has changed. false if the values are equal, if
   *         one of the pointers is NULL, or if the comparison could not be
   *         carried out (size calculation, buffer allocation or encoding
   *         failed).
   */
  #ifdef UA_ENABLE_PUBSUB_DELTAFRAMES
  static UA_Boolean
  valueChangedVariant(UA_Variant *oldValue, UA_Variant *newValue) {
      if(!(oldValue && newValue))
          return false;

      /* Compare the sizes before anything is allocated, so the early exits
       * below have nothing to clean up. */
      size_t oldValueEncodingSize = UA_calcSizeBinary(oldValue, &UA_TYPES[UA_TYPES_VARIANT]);
      size_t newValueEncodingSize = UA_calcSizeBinary(newValue, &UA_TYPES[UA_TYPES_VARIANT]);
      if((oldValueEncodingSize == 0) || (newValueEncodingSize == 0))
          return false;
      if(oldValueEncodingSize != newValueEncodingSize)
          return true;

      /* The ByteStrings live on the stack; only their buffers are on the heap
       * and get released on every path through the common exit below. */
      UA_ByteString oldValueEncoding;
      UA_ByteString newValueEncoding;
      UA_ByteString_init(&oldValueEncoding);
      UA_ByteString_init(&newValueEncoding);

      UA_Boolean compareResult = false;
      if(UA_ByteString_allocBuffer(&oldValueEncoding, oldValueEncodingSize) == UA_STATUSCODE_GOOD &&
         UA_ByteString_allocBuffer(&newValueEncoding, newValueEncodingSize) == UA_STATUSCODE_GOOD) {
          UA_Byte *bufPosOldValue = oldValueEncoding.data;
          const UA_Byte *bufEndOldValue = &oldValueEncoding.data[oldValueEncoding.length];
          UA_Byte *bufPosNewValue = newValueEncoding.data;
          const UA_Byte *bufEndNewValue = &newValueEncoding.data[newValueEncoding.length];

          if(UA_encodeBinary(oldValue, &UA_TYPES[UA_TYPES_VARIANT],
                             &bufPosOldValue, &bufEndOldValue, NULL, NULL) == UA_STATUSCODE_GOOD &&
             UA_encodeBinary(newValue, &UA_TYPES[UA_TYPES_VARIANT],
                             &bufPosNewValue, &bufEndNewValue, NULL, NULL) == UA_STATUSCODE_GOOD) {
              /* Only compare the bytes that were actually written */
              oldValueEncoding.length = (uintptr_t)bufPosOldValue - (uintptr_t)oldValueEncoding.data;
              newValueEncoding.length = (uintptr_t)bufPosNewValue - (uintptr_t)newValueEncoding.data;
              compareResult = !UA_ByteString_equal(&oldValueEncoding, &newValueEncoding);
          }
      }

      UA_ByteString_deleteMembers(&oldValueEncoding);
      UA_ByteString_deleteMembers(&newValueEncoding);
      return compareResult;
  }
  #endif
  1508. /**
  1509. * Obtain the latest value for a specific DataSetField. This method is currently
  1510. * called inside the DataSetMessage generation process.
  1511. */
  1512. static void
  1513. UA_PubSubDataSetField_sampleValue(UA_Server *server, UA_DataSetField *field,
  1514. UA_DataValue *value) {
  1515. /* Read the value */
  1516. UA_ReadValueId rvid;
  1517. UA_ReadValueId_init(&rvid);
  1518. rvid.nodeId = field->config.field.variable.publishParameters.publishedVariable;
  1519. rvid.attributeId = field->config.field.variable.publishParameters.attributeId;
  1520. rvid.indexRange = field->config.field.variable.publishParameters.indexRange;
  1521. *value = UA_Server_read(server, &rvid, UA_TIMESTAMPSTORETURN_BOTH);
  1522. }
  1523. static UA_StatusCode
  1524. UA_PubSubDataSetWriter_generateKeyFrameMessage(UA_Server *server, UA_DataSetMessage *dataSetMessage,
  1525. UA_DataSetWriter *dataSetWriter) {
  1526. UA_PublishedDataSet *currentDataSet =
  1527. UA_PublishedDataSet_findPDSbyId(server, dataSetWriter->connectedDataSet);
  1528. if(!currentDataSet)
  1529. return UA_STATUSCODE_BADNOTFOUND;
  1530. /* Prepare DataSetMessageContent */
  1531. dataSetMessage->header.dataSetMessageValid = true;
  1532. dataSetMessage->header.dataSetMessageType = UA_DATASETMESSAGE_DATAKEYFRAME;
  1533. dataSetMessage->data.keyFrameData.fieldCount = currentDataSet->fieldSize;
  1534. dataSetMessage->data.keyFrameData.dataSetFields = (UA_DataValue *)
  1535. UA_Array_new(currentDataSet->fieldSize, &UA_TYPES[UA_TYPES_DATAVALUE]);
  1536. if(!dataSetMessage->data.keyFrameData.dataSetFields)
  1537. return UA_STATUSCODE_BADOUTOFMEMORY;
  1538. #ifdef UA_ENABLE_JSON_ENCODING
  1539. /* json: insert fieldnames used as json keys */
  1540. dataSetMessage->data.keyFrameData.fieldNames =
  1541. (UA_String *)UA_Array_new(currentDataSet->fieldSize, &UA_TYPES[UA_TYPES_STRING]);
  1542. if(!dataSetMessage->data.keyFrameData.fieldNames)
  1543. return UA_STATUSCODE_BADOUTOFMEMORY;
  1544. #endif
  1545. /* Loop over the fields */
  1546. size_t counter = 0;
  1547. UA_DataSetField *dsf;
  1548. LIST_FOREACH(dsf, &currentDataSet->fields, listEntry) {
  1549. #ifdef UA_ENABLE_JSON_ENCODING
  1550. /* json: store the fieldNameAlias*/
  1551. UA_String_copy(&dsf->config.field.variable.fieldNameAlias,
  1552. &dataSetMessage->data.keyFrameData.fieldNames[counter]);
  1553. #endif
  1554. /* Sample the value */
  1555. UA_DataValue *dfv = &dataSetMessage->data.keyFrameData.dataSetFields[counter];
  1556. UA_PubSubDataSetField_sampleValue(server, dsf, dfv);
  1557. /* Deactivate statuscode? */
  1558. if(((u64)dataSetWriter->config.dataSetFieldContentMask & (u64)UA_DATASETFIELDCONTENTMASK_STATUSCODE) == 0)
  1559. dfv->hasStatus = false;
  1560. /* Deactivate timestamps */
  1561. if(((u64)dataSetWriter->config.dataSetFieldContentMask &
  1562. (u64)UA_DATASETFIELDCONTENTMASK_SOURCETIMESTAMP) == 0)
  1563. dfv->hasSourceTimestamp = false;
  1564. if(((u64)dataSetWriter->config.dataSetFieldContentMask &
  1565. (u64)UA_DATASETFIELDCONTENTMASK_SOURCEPICOSECONDS) == 0)
  1566. dfv->hasSourcePicoseconds = false;
  1567. if(((u64)dataSetWriter->config.dataSetFieldContentMask &
  1568. (u64)UA_DATASETFIELDCONTENTMASK_SERVERTIMESTAMP) == 0)
  1569. dfv->hasServerTimestamp = false;
  1570. if(((u64)dataSetWriter->config.dataSetFieldContentMask &
  1571. (u64)UA_DATASETFIELDCONTENTMASK_SERVERPICOSECONDS) == 0)
  1572. dfv->hasServerPicoseconds = false;
  1573. #ifdef UA_ENABLE_PUBSUB_DELTAFRAMES
  1574. /* Update lastValue store */
  1575. UA_DataValue_deleteMembers(&dataSetWriter->lastSamples[counter].value);
  1576. UA_DataValue_copy(dfv, &dataSetWriter->lastSamples[counter].value);
  1577. #endif
  1578. counter++;
  1579. }
  1580. return UA_STATUSCODE_GOOD;
  1581. }
  #ifdef UA_ENABLE_PUBSUB_DELTAFRAMES
  /* Fill `dataSetMessage` with a delta frame: only the fields whose value
   * differs from the sample stored in the writer's lastSamples store.
   *
   * Every field of the connected PublishedDataSet is sampled and compared
   * with valueChangedVariant. Changed values replace the stored sample and are
   * copied into the message together with their field index. The parts of
   * each DataValue that are not enabled in the writer's
   * dataSetFieldContentMask are switched off.
   *
   * Returns UA_STATUSCODE_BADNOTFOUND if the connected PublishedDataSet does
   * not exist and UA_STATUSCODE_BADOUTOFMEMORY if the delta field array
   * cannot be allocated. */
  static UA_StatusCode
  UA_PubSubDataSetWriter_generateDeltaFrameMessage(UA_Server *server,
                                                   UA_DataSetMessage *dataSetMessage,
                                                   UA_DataSetWriter *dataSetWriter) {
      UA_PublishedDataSet *currentDataSet =
          UA_PublishedDataSet_findPDSbyId(server, dataSetWriter->connectedDataSet);
      if(!currentDataSet)
          return UA_STATUSCODE_BADNOTFOUND;

      /* Prepare DataSetMessageContent.
       * NOTE(review): this memset also wipes the header fields that
       * UA_DataSetWriter_generateDataSetMessage filled in before calling this
       * function (field encoding, version, sequence number, timestamp) --
       * confirm that this is intended. */
      memset(dataSetMessage, 0, sizeof(UA_DataSetMessage));
      dataSetMessage->header.dataSetMessageValid = true;
      dataSetMessage->header.dataSetMessageType = UA_DATASETMESSAGE_DATADELTAFRAME;

      UA_DataSetField *dsf;
      size_t counter = 0;
      LIST_FOREACH(dsf, &currentDataSet->fields, listEntry) {
          /* Never index past the end of the lastSamples store */
          if(counter >= dataSetWriter->lastSamplesCount)
              break;

          /* Sample the value */
          UA_DataValue value;
          UA_DataValue_init(&value);
          UA_PubSubDataSetField_sampleValue(server, dsf, &value);

          /* Check if the value has changed */
          if(valueChangedVariant(&dataSetWriter->lastSamples[counter].value.value, &value.value)) {
              /* increase fieldCount for current delta message */
              dataSetMessage->data.deltaFrameData.fieldCount++;
              dataSetWriter->lastSamples[counter].valueChanged = true;

              /* Update last stored sample; ownership of `value` moves into
               * the store */
              UA_DataValue_deleteMembers(&dataSetWriter->lastSamples[counter].value);
              dataSetWriter->lastSamples[counter].value = value;
          } else {
              UA_DataValue_deleteMembers(&value);
              dataSetWriter->lastSamples[counter].valueChanged = false;
          }

          counter++;
      }

      /* Allocate DeltaFrameFields. With no changed field the requested size
       * is zero and calloc may legitimately return NULL, so NULL only counts
       * as out-of-memory when fields were actually requested. */
      UA_DataSetMessage_DeltaFrameField *deltaFields = (UA_DataSetMessage_DeltaFrameField *)
          UA_calloc(dataSetMessage->data.deltaFrameData.fieldCount, sizeof(UA_DataSetMessage_DeltaFrameField));
      if(!deltaFields && dataSetMessage->data.deltaFrameData.fieldCount > 0) {
          /* Do not advertise fields that were never allocated */
          dataSetMessage->data.deltaFrameData.fieldCount = 0;
          return UA_STATUSCODE_BADOUTOFMEMORY;
      }

      dataSetMessage->data.deltaFrameData.deltaFrameFields = deltaFields;

      const u64 fieldContentMask = (u64)dataSetWriter->config.dataSetFieldContentMask;
      size_t currentDeltaField = 0;
      for(size_t i = 0; i < currentDataSet->fieldSize && i < dataSetWriter->lastSamplesCount; i++) {
          if(!dataSetWriter->lastSamples[i].valueChanged)
              continue;

          UA_DataSetMessage_DeltaFrameField *dff = &deltaFields[currentDeltaField];

          dff->fieldIndex = (UA_UInt16) i;
          UA_DataValue_copy(&dataSetWriter->lastSamples[i].value, &dff->fieldValue);
          dataSetWriter->lastSamples[i].valueChanged = false;

          /* Deactivate statuscode? */
          if((fieldContentMask & (u64)UA_DATASETFIELDCONTENTMASK_STATUSCODE) == 0)
              dff->fieldValue.hasStatus = false;

          /* Deactivate timestamps? Each mask bit controls its own flag; the
           * source picoseconds bit must switch off hasSourcePicoseconds. */
          if((fieldContentMask & (u64)UA_DATASETFIELDCONTENTMASK_SOURCETIMESTAMP) == 0)
              dff->fieldValue.hasSourceTimestamp = false;
          if((fieldContentMask & (u64)UA_DATASETFIELDCONTENTMASK_SOURCEPICOSECONDS) == 0)
              dff->fieldValue.hasSourcePicoseconds = false;
          if((fieldContentMask & (u64)UA_DATASETFIELDCONTENTMASK_SERVERTIMESTAMP) == 0)
              dff->fieldValue.hasServerTimestamp = false;
          if((fieldContentMask & (u64)UA_DATASETFIELDCONTENTMASK_SERVERPICOSECONDS) == 0)
              dff->fieldValue.hasServerPicoseconds = false;

          currentDeltaField++;
      }
      return UA_STATUSCODE_GOOD;
  }
  #endif
  1647. /**
  1648. * Generate a DataSetMessage for the given writer.
  1649. *
  1650. * @param dataSetWriter ptr to corresponding writer
  1651. * @return ptr to generated DataSetMessage
  1652. */
  1653. static UA_StatusCode
  1654. UA_DataSetWriter_generateDataSetMessage(UA_Server *server, UA_DataSetMessage *dataSetMessage,
  1655. UA_DataSetWriter *dataSetWriter) {
  1656. UA_PublishedDataSet *currentDataSet =
  1657. UA_PublishedDataSet_findPDSbyId(server, dataSetWriter->connectedDataSet);
  1658. if(!currentDataSet)
  1659. return UA_STATUSCODE_BADNOTFOUND;
  1660. /* Reset the message */
  1661. memset(dataSetMessage, 0, sizeof(UA_DataSetMessage));
  1662. /* store messageType to switch between json or uadp (default) */
  1663. UA_UInt16 messageType = UA_TYPES_UADPDATASETWRITERMESSAGEDATATYPE;
  1664. UA_JsonDataSetWriterMessageDataType *jsonDataSetWriterMessageDataType = NULL;
  1665. /* The configuration Flags are included
  1666. * inside the std. defined UA_UadpDataSetWriterMessageDataType */
  1667. UA_UadpDataSetWriterMessageDataType defaultUadpConfiguration;
  1668. UA_UadpDataSetWriterMessageDataType *dataSetWriterMessageDataType = NULL;
  1669. if((dataSetWriter->config.messageSettings.encoding == UA_EXTENSIONOBJECT_DECODED ||
  1670. dataSetWriter->config.messageSettings.encoding == UA_EXTENSIONOBJECT_DECODED_NODELETE) &&
  1671. (dataSetWriter->config.messageSettings.content.decoded.type ==
  1672. &UA_TYPES[UA_TYPES_UADPDATASETWRITERMESSAGEDATATYPE])) {
  1673. dataSetWriterMessageDataType = (UA_UadpDataSetWriterMessageDataType *)
  1674. dataSetWriter->config.messageSettings.content.decoded.data;
  1675. /* type is UADP */
  1676. messageType = UA_TYPES_UADPDATASETWRITERMESSAGEDATATYPE;
  1677. } else if((dataSetWriter->config.messageSettings.encoding == UA_EXTENSIONOBJECT_DECODED ||
  1678. dataSetWriter->config.messageSettings.encoding == UA_EXTENSIONOBJECT_DECODED_NODELETE) &&
  1679. (dataSetWriter->config.messageSettings.content.decoded.type ==
  1680. &UA_TYPES[UA_TYPES_JSONDATASETWRITERMESSAGEDATATYPE])) {
  1681. jsonDataSetWriterMessageDataType = (UA_JsonDataSetWriterMessageDataType *)
  1682. dataSetWriter->config.messageSettings.content.decoded.data;
  1683. /* type is JSON */
  1684. messageType = UA_TYPES_JSONDATASETWRITERMESSAGEDATATYPE;
  1685. } else {
  1686. /* create default flag configuration if no
  1687. * UadpDataSetWriterMessageDataType was passed in */
  1688. memset(&defaultUadpConfiguration, 0, sizeof(UA_UadpDataSetWriterMessageDataType));
  1689. defaultUadpConfiguration.dataSetMessageContentMask = (UA_UadpDataSetMessageContentMask)
  1690. ((u64)UA_UADPDATASETMESSAGECONTENTMASK_TIMESTAMP | (u64)UA_UADPDATASETMESSAGECONTENTMASK_MAJORVERSION |
  1691. (u64)UA_UADPDATASETMESSAGECONTENTMASK_MINORVERSION);
  1692. dataSetWriterMessageDataType = &defaultUadpConfiguration;
  1693. }
  1694. /* Sanity-test the configuration */
  1695. if(dataSetWriterMessageDataType &&
  1696. (dataSetWriterMessageDataType->networkMessageNumber != 0 ||
  1697. dataSetWriterMessageDataType->dataSetOffset != 0 ||
  1698. dataSetWriterMessageDataType->configuredSize != 0)) {
  1699. UA_LOG_WARNING(&server->config.logger, UA_LOGCATEGORY_SERVER,
  1700. "Static DSM configuration not supported. Using defaults");
  1701. dataSetWriterMessageDataType->networkMessageNumber = 0;
  1702. dataSetWriterMessageDataType->dataSetOffset = 0;
  1703. dataSetWriterMessageDataType->configuredSize = 0;
  1704. }
  1705. /* The field encoding depends on the flags inside the writer config.
  1706. * TODO: This can be moved to the encoding layer. */
  1707. if(dataSetWriter->config.dataSetFieldContentMask & (u64)UA_DATASETFIELDCONTENTMASK_RAWDATA
  1708. ) {
  1709. dataSetMessage->header.fieldEncoding = UA_FIELDENCODING_RAWDATA;
  1710. } else if((u64)dataSetWriter->config.dataSetFieldContentMask &
  1711. ((u64)UA_DATASETFIELDCONTENTMASK_SOURCETIMESTAMP | (u64)UA_DATASETFIELDCONTENTMASK_SERVERPICOSECONDS |
  1712. (u64)UA_DATASETFIELDCONTENTMASK_SOURCEPICOSECONDS | (u64)UA_DATASETFIELDCONTENTMASK_STATUSCODE)) {
  1713. dataSetMessage->header.fieldEncoding = UA_FIELDENCODING_DATAVALUE;
  1714. } else {
  1715. dataSetMessage->header.fieldEncoding = UA_FIELDENCODING_VARIANT;
  1716. }
  1717. if(messageType == UA_TYPES_UADPDATASETWRITERMESSAGEDATATYPE) {
  1718. /* Std: 'The DataSetMessageContentMask defines the flags for the content of the DataSetMessage header.' */
  1719. if((u64)dataSetWriterMessageDataType->dataSetMessageContentMask &
  1720. (u64)UA_UADPDATASETMESSAGECONTENTMASK_MAJORVERSION) {
  1721. dataSetMessage->header.configVersionMajorVersionEnabled = true;
  1722. dataSetMessage->header.configVersionMajorVersion =
  1723. currentDataSet->dataSetMetaData.configurationVersion.majorVersion;
  1724. }
  1725. if((u64)dataSetWriterMessageDataType->dataSetMessageContentMask &
  1726. (u64)UA_UADPDATASETMESSAGECONTENTMASK_MINORVERSION) {
  1727. dataSetMessage->header.configVersionMinorVersionEnabled = true;
  1728. dataSetMessage->header.configVersionMinorVersion =
  1729. currentDataSet->dataSetMetaData.configurationVersion.minorVersion;
  1730. }
  1731. if((u64)dataSetWriterMessageDataType->dataSetMessageContentMask &
  1732. (u64)UA_UADPDATASETMESSAGECONTENTMASK_SEQUENCENUMBER) {
  1733. dataSetMessage->header.dataSetMessageSequenceNrEnabled = true;
  1734. dataSetMessage->header.dataSetMessageSequenceNr =
  1735. dataSetWriter->actualDataSetMessageSequenceCount;
  1736. }
  1737. if((u64)dataSetWriterMessageDataType->dataSetMessageContentMask &
  1738. (u64)UA_UADPDATASETMESSAGECONTENTMASK_TIMESTAMP) {
  1739. dataSetMessage->header.timestampEnabled = true;
  1740. dataSetMessage->header.timestamp = UA_DateTime_now();
  1741. }
  1742. /* TODO: Picoseconds resolution not supported atm */
  1743. if((u64)dataSetWriterMessageDataType->dataSetMessageContentMask &
  1744. (u64)UA_UADPDATASETMESSAGECONTENTMASK_PICOSECONDS) {
  1745. dataSetMessage->header.picoSecondsIncluded = false;
  1746. }
  1747. /* TODO: Statuscode not supported yet */
  1748. if((u64)dataSetWriterMessageDataType->dataSetMessageContentMask &
  1749. (u64)UA_UADPDATASETMESSAGECONTENTMASK_STATUS) {
  1750. dataSetMessage->header.statusEnabled = false;
  1751. }
  1752. } else if(messageType == UA_TYPES_JSONDATASETWRITERMESSAGEDATATYPE) {
  1753. if((u64)jsonDataSetWriterMessageDataType->dataSetMessageContentMask &
  1754. (u64)UA_JSONDATASETMESSAGECONTENTMASK_METADATAVERSION) {
  1755. dataSetMessage->header.configVersionMajorVersionEnabled = true;
  1756. dataSetMessage->header.configVersionMajorVersion =
  1757. currentDataSet->dataSetMetaData.configurationVersion.majorVersion;
  1758. }
  1759. if((u64)jsonDataSetWriterMessageDataType->dataSetMessageContentMask &
  1760. (u64)UA_JSONDATASETMESSAGECONTENTMASK_METADATAVERSION) {
  1761. dataSetMessage->header.configVersionMinorVersionEnabled = true;
  1762. dataSetMessage->header.configVersionMinorVersion =
  1763. currentDataSet->dataSetMetaData.configurationVersion.minorVersion;
  1764. }
  1765. if((u64)jsonDataSetWriterMessageDataType->dataSetMessageContentMask &
  1766. (u64)UA_JSONDATASETMESSAGECONTENTMASK_SEQUENCENUMBER) {
  1767. dataSetMessage->header.dataSetMessageSequenceNrEnabled = true;
  1768. dataSetMessage->header.dataSetMessageSequenceNr =
  1769. dataSetWriter->actualDataSetMessageSequenceCount;
  1770. }
  1771. if((u64)jsonDataSetWriterMessageDataType->dataSetMessageContentMask &
  1772. (u64)UA_JSONDATASETMESSAGECONTENTMASK_TIMESTAMP) {
  1773. dataSetMessage->header.timestampEnabled = true;
  1774. dataSetMessage->header.timestamp = UA_DateTime_now();
  1775. }
  1776. /* TODO: Statuscode not supported yet */
  1777. if((u64)jsonDataSetWriterMessageDataType->dataSetMessageContentMask &
  1778. (u64)UA_JSONDATASETMESSAGECONTENTMASK_STATUS) {
  1779. dataSetMessage->header.statusEnabled = false;
  1780. }
  1781. }
  1782. /* Set the sequence count. Automatically rolls over to zero */
  1783. dataSetWriter->actualDataSetMessageSequenceCount++;
  1784. /* JSON does not differ between deltaframes and keyframes, only keyframes are currently used. */
  1785. if(messageType != UA_TYPES_JSONDATASETWRITERMESSAGEDATATYPE){
  1786. #ifdef UA_ENABLE_PUBSUB_DELTAFRAMES
  1787. /* Check if the PublishedDataSet version has changed -> if yes flush the lastValue store and send a KeyFrame */
  1788. if(dataSetWriter->connectedDataSetVersion.majorVersion != currentDataSet->dataSetMetaData.configurationVersion.majorVersion ||
  1789. dataSetWriter->connectedDataSetVersion.minorVersion != currentDataSet->dataSetMetaData.configurationVersion.minorVersion) {
  1790. /* Remove old samples */
  1791. for(size_t i = 0; i < dataSetWriter->lastSamplesCount; i++)
  1792. UA_DataValue_deleteMembers(&dataSetWriter->lastSamples[i].value);
  1793. /* Realloc pds dependent memory */
  1794. dataSetWriter->lastSamplesCount = currentDataSet->fieldSize;
  1795. UA_DataSetWriterSample *newSamplesArray = (UA_DataSetWriterSample * )
  1796. UA_realloc(dataSetWriter->lastSamples, sizeof(UA_DataSetWriterSample) * dataSetWriter->lastSamplesCount);
  1797. if(!newSamplesArray)
  1798. return UA_STATUSCODE_BADOUTOFMEMORY;
  1799. dataSetWriter->lastSamples = newSamplesArray;
  1800. memset(dataSetWriter->lastSamples, 0, sizeof(UA_DataSetWriterSample) * dataSetWriter->lastSamplesCount);
  1801. dataSetWriter->connectedDataSetVersion = currentDataSet->dataSetMetaData.configurationVersion;
  1802. UA_PubSubDataSetWriter_generateKeyFrameMessage(server, dataSetMessage, dataSetWriter);
  1803. dataSetWriter->deltaFrameCounter = 0;
  1804. return UA_STATUSCODE_GOOD;
  1805. }
  1806. /* The standard defines: if a PDS contains only one fields no delta messages
  1807. * should be generated because they need more memory than a keyframe with 1
  1808. * field. */
  1809. if(currentDataSet->fieldSize > 1 && dataSetWriter->deltaFrameCounter > 0 &&
  1810. dataSetWriter->deltaFrameCounter <= dataSetWriter->config.keyFrameCount) {
  1811. UA_PubSubDataSetWriter_generateDeltaFrameMessage(server, dataSetMessage, dataSetWriter);
  1812. dataSetWriter->deltaFrameCounter++;
  1813. return UA_STATUSCODE_GOOD;
  1814. }
  1815. dataSetWriter->deltaFrameCounter = 1;
  1816. #endif
  1817. }
  1818. UA_PubSubDataSetWriter_generateKeyFrameMessage(server, dataSetMessage, dataSetWriter);
  1819. return UA_STATUSCODE_GOOD;
  1820. }
  1821. static UA_StatusCode
  1822. sendNetworkMessageJson(UA_PubSubConnection *connection, UA_DataSetMessage *dsm,
  1823. UA_UInt16 *writerIds, UA_Byte dsmCount, UA_ExtensionObject *transportSettings) {
  1824. UA_StatusCode retval = UA_STATUSCODE_BADNOTSUPPORTED;
  1825. #ifdef UA_ENABLE_JSON_ENCODING
  1826. UA_NetworkMessage nm;
  1827. memset(&nm, 0, sizeof(UA_NetworkMessage));
  1828. nm.version = 1;
  1829. nm.networkMessageType = UA_NETWORKMESSAGE_DATASET;
  1830. nm.payloadHeaderEnabled = true;
  1831. nm.payloadHeader.dataSetPayloadHeader.count = dsmCount;
  1832. nm.payloadHeader.dataSetPayloadHeader.dataSetWriterIds = writerIds;
  1833. nm.payload.dataSetPayload.dataSetMessages = dsm;
  1834. /* Allocate the buffer. Allocate on the stack if the buffer is small. */
  1835. UA_ByteString buf;
  1836. size_t msgSize = UA_NetworkMessage_calcSizeJson(&nm, NULL, 0, NULL, 0, true);
  1837. size_t stackSize = 1;
  1838. if(msgSize <= UA_MAX_STACKBUF)
  1839. stackSize = msgSize;
  1840. UA_STACKARRAY(UA_Byte, stackBuf, stackSize);
  1841. buf.data = stackBuf;
  1842. buf.length = msgSize;
  1843. if(msgSize > UA_MAX_STACKBUF) {
  1844. retval = UA_ByteString_allocBuffer(&buf, msgSize);
  1845. if(retval != UA_STATUSCODE_GOOD)
  1846. return retval;
  1847. }
  1848. /* Encode the message */
  1849. UA_Byte *bufPos = buf.data;
  1850. memset(bufPos, 0, msgSize);
  1851. const UA_Byte *bufEnd = &buf.data[buf.length];
  1852. retval = UA_NetworkMessage_encodeJson(&nm, &bufPos, &bufEnd, NULL, 0, NULL, 0, true);
  1853. if(retval != UA_STATUSCODE_GOOD) {
  1854. if(msgSize > UA_MAX_STACKBUF)
  1855. UA_ByteString_deleteMembers(&buf);
  1856. return retval;
  1857. }
  1858. /* Send the prepared messages */
  1859. retval = connection->channel->send(connection->channel, transportSettings, &buf);
  1860. if(msgSize > UA_MAX_STACKBUF)
  1861. UA_ByteString_deleteMembers(&buf);
  1862. #endif
  1863. return retval;
  1864. }
  1865. static UA_StatusCode
  1866. sendNetworkMessage(UA_PubSubConnection *connection, UA_WriterGroup *wg,
  1867. UA_DataSetMessage *dsm, UA_UInt16 *writerIds, UA_Byte dsmCount,
  1868. UA_ExtensionObject *messageSettings,
  1869. UA_ExtensionObject *transportSettings) {
  1870. if(messageSettings->content.decoded.type !=
  1871. &UA_TYPES[UA_TYPES_UADPWRITERGROUPMESSAGEDATATYPE])
  1872. return UA_STATUSCODE_BADINTERNALERROR;
  1873. UA_UadpWriterGroupMessageDataType *wgm = (UA_UadpWriterGroupMessageDataType*)
  1874. messageSettings->content.decoded.data;
  1875. UA_NetworkMessage nm;
  1876. memset(&nm, 0, sizeof(UA_NetworkMessage));
  1877. nm.publisherIdEnabled =
  1878. ((u64)wgm->networkMessageContentMask & (u64)UA_UADPNETWORKMESSAGECONTENTMASK_PUBLISHERID) != 0;
  1879. nm.groupHeaderEnabled =
  1880. ((u64)wgm->networkMessageContentMask & (u64)UA_UADPNETWORKMESSAGECONTENTMASK_GROUPHEADER) != 0;
  1881. nm.groupHeader.writerGroupIdEnabled =
  1882. ((u64)wgm->networkMessageContentMask & (u64)UA_UADPNETWORKMESSAGECONTENTMASK_WRITERGROUPID) != 0;
  1883. nm.groupHeader.groupVersionEnabled =
  1884. ((u64)wgm->networkMessageContentMask & (u64)UA_UADPNETWORKMESSAGECONTENTMASK_GROUPVERSION) != 0;
  1885. nm.groupHeader.networkMessageNumberEnabled =
  1886. ((u64)wgm->networkMessageContentMask & (u64)UA_UADPNETWORKMESSAGECONTENTMASK_NETWORKMESSAGENUMBER) != 0;
  1887. nm.groupHeader.sequenceNumberEnabled =
  1888. ((u64)wgm->networkMessageContentMask & (u64)UA_UADPNETWORKMESSAGECONTENTMASK_SEQUENCENUMBER) != 0;
  1889. nm.payloadHeaderEnabled =
  1890. ((u64)wgm->networkMessageContentMask & (u64)UA_UADPNETWORKMESSAGECONTENTMASK_PAYLOADHEADER) != 0;
  1891. nm.timestampEnabled =
  1892. ((u64)wgm->networkMessageContentMask & (u64)UA_UADPNETWORKMESSAGECONTENTMASK_TIMESTAMP) != 0;
  1893. nm.picosecondsEnabled =
  1894. ((u64)wgm->networkMessageContentMask & (u64)UA_UADPNETWORKMESSAGECONTENTMASK_PICOSECONDS) != 0;
  1895. nm.dataSetClassIdEnabled =
  1896. ((u64)wgm->networkMessageContentMask & (u64)UA_UADPNETWORKMESSAGECONTENTMASK_DATASETCLASSID) != 0;
  1897. nm.promotedFieldsEnabled =
  1898. ((u64)wgm->networkMessageContentMask & (u64)UA_UADPNETWORKMESSAGECONTENTMASK_PROMOTEDFIELDS) != 0;
  1899. nm.version = 1;
  1900. nm.networkMessageType = UA_NETWORKMESSAGE_DATASET;
  1901. if(connection->config->publisherIdType == UA_PUBSUB_PUBLISHERID_NUMERIC) {
  1902. nm.publisherIdType = UA_PUBLISHERDATATYPE_UINT16;
  1903. nm.publisherId.publisherIdUInt32 = connection->config->publisherId.numeric;
  1904. } else if(connection->config->publisherIdType == UA_PUBSUB_PUBLISHERID_STRING){
  1905. nm.publisherIdType = UA_PUBLISHERDATATYPE_STRING;
  1906. nm.publisherId.publisherIdString = connection->config->publisherId.string;
  1907. }
  1908. /* Compute the length of the dsm separately for the header */
  1909. UA_STACKARRAY(UA_UInt16, dsmLengths, dsmCount);
  1910. for(UA_Byte i = 0; i < dsmCount; i++)
  1911. dsmLengths[i] = (UA_UInt16)UA_DataSetMessage_calcSizeBinary(&dsm[i]);
  1912. nm.payloadHeader.dataSetPayloadHeader.count = dsmCount;
  1913. nm.payloadHeader.dataSetPayloadHeader.dataSetWriterIds = writerIds;
  1914. nm.groupHeader.writerGroupId = wg->config.writerGroupId;
  1915. nm.groupHeader.networkMessageNumber = 1;
  1916. nm.payload.dataSetPayload.sizes = dsmLengths;
  1917. nm.payload.dataSetPayload.dataSetMessages = dsm;
  1918. /* Allocate the buffer. Allocate on the stack if the buffer is small. */
  1919. UA_ByteString buf;
  1920. size_t msgSize = UA_NetworkMessage_calcSizeBinary(&nm);
  1921. size_t stackSize = 1;
  1922. if(msgSize <= UA_MAX_STACKBUF)
  1923. stackSize = msgSize;
  1924. UA_STACKARRAY(UA_Byte, stackBuf, stackSize);
  1925. buf.data = stackBuf;
  1926. buf.length = msgSize;
  1927. UA_StatusCode retval;
  1928. if(msgSize > UA_MAX_STACKBUF) {
  1929. retval = UA_ByteString_allocBuffer(&buf, msgSize);
  1930. if(retval != UA_STATUSCODE_GOOD)
  1931. return retval;
  1932. }
  1933. /* Encode the message */
  1934. UA_Byte *bufPos = buf.data;
  1935. memset(bufPos, 0, msgSize);
  1936. const UA_Byte *bufEnd = &buf.data[buf.length];
  1937. retval = UA_NetworkMessage_encodeBinary(&nm, &bufPos, bufEnd);
  1938. if(retval != UA_STATUSCODE_GOOD) {
  1939. if(msgSize > UA_MAX_STACKBUF)
  1940. UA_ByteString_deleteMembers(&buf);
  1941. return retval;
  1942. }
  1943. /* Send the prepared messages */
  1944. retval = connection->channel->send(connection->channel, transportSettings, &buf);
  1945. if(msgSize > UA_MAX_STACKBUF)
  1946. UA_ByteString_deleteMembers(&buf);
  1947. return retval;
  1948. }
  1949. /* This callback triggers the collection and publish of NetworkMessages and the
  1950. * contained DataSetMessages. */
  1951. void
  1952. UA_WriterGroup_publishCallback(UA_Server *server, UA_WriterGroup *writerGroup) {
  1953. UA_LOG_DEBUG(&server->config.logger, UA_LOGCATEGORY_SERVER, "Publish Callback");
  1954. if(!writerGroup) {
  1955. UA_LOG_WARNING(&server->config.logger, UA_LOGCATEGORY_SERVER,
  1956. "Publish failed. WriterGroup not found");
  1957. return;
  1958. }
  1959. /* Nothing to do? */
  1960. if(writerGroup->writersCount <= 0)
  1961. return;
  1962. /* Binary or Json encoding? */
  1963. if(writerGroup->config.encodingMimeType != UA_PUBSUB_ENCODING_UADP &&
  1964. writerGroup->config.encodingMimeType != UA_PUBSUB_ENCODING_JSON) {
  1965. UA_LOG_WARNING(&server->config.logger, UA_LOGCATEGORY_SERVER,
  1966. "Publish failed: Unknown encoding type.");
  1967. return;
  1968. }
  1969. /* Find the connection associated with the writer */
  1970. UA_PubSubConnection *connection =
  1971. UA_PubSubConnection_findConnectionbyId(server, writerGroup->linkedConnection);
  1972. if(!connection) {
  1973. UA_LOG_WARNING(&server->config.logger, UA_LOGCATEGORY_SERVER,
  1974. "Publish failed. PubSubConnection invalid.");
  1975. return;
  1976. }
  1977. /* How many DSM can be sent in one NM? */
  1978. UA_Byte maxDSM = (UA_Byte)writerGroup->config.maxEncapsulatedDataSetMessageCount;
  1979. if(writerGroup->config.maxEncapsulatedDataSetMessageCount > UA_BYTE_MAX)
  1980. maxDSM = UA_BYTE_MAX;
  1981. /* If the maxEncapsulatedDataSetMessageCount is set to 0->1 */
  1982. if(maxDSM == 0)
  1983. maxDSM = 1;
  1984. /* It is possible to put several DataSetMessages into one NetworkMessage.
  1985. * But only if they do not contain promoted fields. NM with only DSM are
  1986. * sent out right away. The others are kept in a buffer for "batching". */
  1987. size_t dsmCount = 0;
  1988. UA_DataSetWriter *dsw;
  1989. UA_STACKARRAY(UA_UInt16, dsWriterIds, writerGroup->writersCount);
  1990. UA_STACKARRAY(UA_DataSetMessage, dsmStore, writerGroup->writersCount);
  1991. LIST_FOREACH(dsw, &writerGroup->writers, listEntry) {
  1992. /* Find the dataset */
  1993. UA_PublishedDataSet *pds =
  1994. UA_PublishedDataSet_findPDSbyId(server, dsw->connectedDataSet);
  1995. if(!pds) {
  1996. UA_LOG_WARNING(&server->config.logger, UA_LOGCATEGORY_SERVER,
  1997. "PubSub Publish: PublishedDataSet not found");
  1998. continue;
  1999. }
  2000. /* Generate the DSM */
  2001. UA_StatusCode res =
  2002. UA_DataSetWriter_generateDataSetMessage(server, &dsmStore[dsmCount], dsw);
  2003. if(res != UA_STATUSCODE_GOOD) {
  2004. UA_LOG_WARNING(&server->config.logger, UA_LOGCATEGORY_SERVER,
  2005. "PubSub Publish: DataSetMessage creation failed");
  2006. continue;
  2007. }
  2008. /* Send right away if there is only this DSM in a NM. If promoted fields
  2009. * are contained in the PublishedDataSet, then this DSM must go into a
  2010. * dedicated NM as well. */
  2011. if(pds->promotedFieldsCount > 0 || maxDSM == 1) {
  2012. if(writerGroup->config.encodingMimeType == UA_PUBSUB_ENCODING_UADP){
  2013. res = sendNetworkMessage(connection, writerGroup, &dsmStore[dsmCount],
  2014. &dsw->config.dataSetWriterId, 1,
  2015. &writerGroup->config.messageSettings,
  2016. &writerGroup->config.transportSettings);
  2017. }else if(writerGroup->config.encodingMimeType == UA_PUBSUB_ENCODING_JSON){
  2018. res = sendNetworkMessageJson(connection, &dsmStore[dsmCount],
  2019. &dsw->config.dataSetWriterId, 1, &writerGroup->config.transportSettings);
  2020. }
  2021. if(res != UA_STATUSCODE_GOOD)
  2022. UA_LOG_WARNING(&server->config.logger, UA_LOGCATEGORY_SERVER,
  2023. "PubSub Publish: Could not send a NetworkMessage");
  2024. UA_DataSetMessage_free(&dsmStore[dsmCount]);
  2025. continue;
  2026. }
  2027. dsWriterIds[dsmCount] = dsw->config.dataSetWriterId;
  2028. dsmCount++;
  2029. }
  2030. /* Send the NetworkMessages with batched DataSetMessages */
  2031. size_t nmCount = (dsmCount / maxDSM) + ((dsmCount % maxDSM) == 0 ? 0 : 1);
  2032. for(UA_UInt32 i = 0; i < nmCount; i++) {
  2033. UA_Byte nmDsmCount = maxDSM;
  2034. if(i == nmCount - 1)
  2035. nmDsmCount = (UA_Byte)dsmCount % maxDSM;
  2036. UA_StatusCode res3 = UA_STATUSCODE_GOOD;
  2037. if(writerGroup->config.encodingMimeType == UA_PUBSUB_ENCODING_UADP){
  2038. res3 = sendNetworkMessage(connection, writerGroup, &dsmStore[i * maxDSM],
  2039. &dsWriterIds[i * maxDSM], nmDsmCount,
  2040. &writerGroup->config.messageSettings,
  2041. &writerGroup->config.transportSettings);
  2042. }else if(writerGroup->config.encodingMimeType == UA_PUBSUB_ENCODING_JSON){
  2043. res3 = sendNetworkMessageJson(connection, &dsmStore[i * maxDSM],
  2044. &dsWriterIds[i * maxDSM], nmDsmCount, &writerGroup->config.transportSettings);
  2045. }
  2046. if(res3 != UA_STATUSCODE_GOOD)
  2047. UA_LOG_WARNING(&server->config.logger, UA_LOGCATEGORY_SERVER,
  2048. "PubSub Publish: Sending a NetworkMessage failed");
  2049. }
  2050. /* Clean up DSM */
  2051. for(size_t i = 0; i < dsmCount; i++)
  2052. UA_DataSetMessage_free(&dsmStore[i]);
  2053. }
  2054. /* Add new publishCallback. The first execution is triggered directly after
  2055. * creation. */
  2056. UA_StatusCode
  2057. UA_WriterGroup_addPublishCallback(UA_Server *server, UA_WriterGroup *writerGroup) {
  2058. UA_StatusCode retval =
  2059. UA_PubSubManager_addRepeatedCallback(server,
  2060. (UA_ServerCallback) UA_WriterGroup_publishCallback,
  2061. writerGroup, writerGroup->config.publishingInterval,
  2062. &writerGroup->publishCallbackId);
  2063. if(retval == UA_STATUSCODE_GOOD)
  2064. writerGroup->publishCallbackIsRegistered = true;
  2065. /* Run once after creation */
  2066. UA_WriterGroup_publishCallback(server, writerGroup);
  2067. return retval;
  2068. }
  2069. /* This callback triggers the collection and reception of NetworkMessages and the
  2070. * contained DataSetMessages. */
  2071. void UA_ReaderGroup_subscribeCallback(UA_Server *server, UA_ReaderGroup *readerGroup) {
  2072. UA_PubSubConnection *connection = UA_PubSubConnection_findConnectionbyId(server, readerGroup->linkedConnection);
  2073. UA_ByteString buffer;
  2074. if(UA_ByteString_allocBuffer(&buffer, 512) != UA_STATUSCODE_GOOD) {
  2075. UA_LOG_INFO(&server->config.logger, UA_LOGCATEGORY_SERVER, "Message buffer alloc failed!");
  2076. return;
  2077. }
  2078. connection->channel->receive(connection->channel, &buffer, NULL, 300000);
  2079. if(buffer.length > 0) {
  2080. UA_LOG_INFO(&server->config.logger, UA_LOGCATEGORY_USERLAND, "Message received:");
  2081. UA_NetworkMessage currentNetworkMessage;
  2082. memset(&currentNetworkMessage, 0, sizeof(UA_NetworkMessage));
  2083. size_t currentPosition = 0;
  2084. UA_NetworkMessage_decodeBinary(&buffer, &currentPosition, &currentNetworkMessage);
  2085. UA_Server_processNetworkMessage(server, &currentNetworkMessage, connection);
  2086. UA_NetworkMessage_deleteMembers(&currentNetworkMessage);
  2087. }
  2088. UA_ByteString_deleteMembers(&buffer);
  2089. }
  2090. /* Add new subscribeCallback. The first execution is triggered directly after
  2091. * creation. */
  2092. UA_StatusCode
  2093. UA_ReaderGroup_addSubscribeCallback(UA_Server *server, UA_ReaderGroup *readerGroup) {
  2094. UA_StatusCode retval = UA_STATUSCODE_GOOD;
  2095. UA_PubSubConnection *connection = UA_PubSubConnection_findConnectionbyId(server, readerGroup->linkedConnection);
  2096. if(connection != NULL) {
  2097. retval = connection->channel->regist(connection->channel, NULL, NULL);
  2098. if(retval == UA_STATUSCODE_GOOD) {
  2099. retval = UA_PubSubManager_addRepeatedCallback(server,
  2100. (UA_ServerCallback) UA_ReaderGroup_subscribeCallback,
  2101. readerGroup, 5,
  2102. &readerGroup->subscribeCallbackId);
  2103. }
  2104. else {
  2105. UA_LOG_WARNING(&server->config.logger, UA_LOGCATEGORY_SERVER, "register channel failed: 0x%x!", retval);
  2106. }
  2107. }
  2108. if(retval == UA_STATUSCODE_GOOD) {
  2109. readerGroup->subscribeCallbackIsRegistered = true;
  2110. }
  2111. /* Run once after creation */
  2112. UA_ReaderGroup_subscribeCallback(server, readerGroup);
  2113. return retval;
  2114. }
  2115. #endif /* UA_ENABLE_PUBSUB */