ua_pubsub.c 91 KB


  1. /* This Source Code Form is subject to the terms of the Mozilla Public
  2. * License, v. 2.0. If a copy of the MPL was not distributed with this
  3. * file, You can obtain one at http://mozilla.org/MPL/2.0/.
  4. *
  5. * Copyright (c) 2017-2018 Fraunhofer IOSB (Author: Andreas Ebner)
  6. * Copyright (c) 2019 Fraunhofer IOSB (Author: Julius Pfrommer)
  7. * Copyright (c) 2019 Kalycito Infotech Private Limited
  8. */
  9. #include "server/ua_server_internal.h"
  10. #ifdef UA_ENABLE_PUBSUB /* conditional compilation */
  11. #include "ua_pubsub.h"
  12. #ifdef UA_ENABLE_PUBSUB_INFORMATIONMODEL
  13. #include "ua_pubsub_ns0.h"
  14. #endif
  15. #ifdef UA_ENABLE_PUBSUB_DELTAFRAMES
  16. #include "ua_types_encoding_binary.h"
  17. #endif
  18. #define UA_MAX_STACKBUF 512 /* Max size of network messages on the stack */
  19. #define UA_MAX_SIZENAME 64 /* Max size of Qualified Name of Subscribed Variable */
  20. /* Forward declaration */
  21. static void
  22. UA_WriterGroup_deleteMembers(UA_Server *server, UA_WriterGroup *writerGroup);
  23. static void
  24. UA_DataSetField_deleteMembers(UA_DataSetField *field);
  25. /**********************************************/
  26. /* Connection */
  27. /**********************************************/
  28. UA_StatusCode
  29. UA_PubSubConnectionConfig_copy(const UA_PubSubConnectionConfig *src,
  30. UA_PubSubConnectionConfig *dst) {
  31. UA_StatusCode retVal = UA_STATUSCODE_GOOD;
  32. memcpy(dst, src, sizeof(UA_PubSubConnectionConfig));
  33. retVal |= UA_String_copy(&src->name, &dst->name);
  34. retVal |= UA_Variant_copy(&src->address, &dst->address);
  35. retVal |= UA_String_copy(&src->transportProfileUri, &dst->transportProfileUri);
  36. retVal |= UA_Variant_copy(&src->connectionTransportSettings, &dst->connectionTransportSettings);
  37. if(src->connectionPropertiesSize > 0){
  38. dst->connectionProperties = (UA_KeyValuePair *)
  39. UA_calloc(src->connectionPropertiesSize, sizeof(UA_KeyValuePair));
  40. if(!dst->connectionProperties){
  41. return UA_STATUSCODE_BADOUTOFMEMORY;
  42. }
  43. for(size_t i = 0; i < src->connectionPropertiesSize; i++){
  44. retVal |= UA_QualifiedName_copy(&src->connectionProperties[i].key,
  45. &dst->connectionProperties[i].key);
  46. retVal |= UA_Variant_copy(&src->connectionProperties[i].value,
  47. &dst->connectionProperties[i].value);
  48. }
  49. }
  50. return retVal;
  51. }
  52. UA_StatusCode
  53. UA_Server_getPubSubConnectionConfig(UA_Server *server, const UA_NodeId connection,
  54. UA_PubSubConnectionConfig *config) {
  55. if(!config)
  56. return UA_STATUSCODE_BADINVALIDARGUMENT;
  57. UA_PubSubConnection *currentPubSubConnection =
  58. UA_PubSubConnection_findConnectionbyId(server, connection);
  59. if(!currentPubSubConnection)
  60. return UA_STATUSCODE_BADNOTFOUND;
  61. UA_PubSubConnectionConfig tmpPubSubConnectionConfig;
  62. //deep copy of the actual config
  63. UA_PubSubConnectionConfig_copy(currentPubSubConnection->config, &tmpPubSubConnectionConfig);
  64. *config = tmpPubSubConnectionConfig;
  65. return UA_STATUSCODE_GOOD;
  66. }
  67. UA_PubSubConnection *
  68. UA_PubSubConnection_findConnectionbyId(UA_Server *server, UA_NodeId connectionIdentifier) {
  69. for(size_t i = 0; i < server->pubSubManager.connectionsSize; i++){
  70. if(UA_NodeId_equal(&connectionIdentifier, &server->pubSubManager.connections[i].identifier)){
  71. return &server->pubSubManager.connections[i];
  72. }
  73. }
  74. return NULL;
  75. }
  76. void
  77. UA_PubSubConnectionConfig_deleteMembers(UA_PubSubConnectionConfig *connectionConfig) {
  78. UA_String_deleteMembers(&connectionConfig->name);
  79. UA_String_deleteMembers(&connectionConfig->transportProfileUri);
  80. UA_Variant_deleteMembers(&connectionConfig->connectionTransportSettings);
  81. UA_Variant_deleteMembers(&connectionConfig->address);
  82. for(size_t i = 0; i < connectionConfig->connectionPropertiesSize; i++){
  83. UA_QualifiedName_deleteMembers(&connectionConfig->connectionProperties[i].key);
  84. UA_Variant_deleteMembers(&connectionConfig->connectionProperties[i].value);
  85. }
  86. UA_free(connectionConfig->connectionProperties);
  87. }
  88. void
  89. UA_PubSubConnection_deleteMembers(UA_Server *server, UA_PubSubConnection *connection) {
  90. //delete connection config
  91. UA_PubSubConnectionConfig_deleteMembers(connection->config);
  92. //remove contained WriterGroups
  93. UA_WriterGroup *writerGroup, *tmpWriterGroup;
  94. LIST_FOREACH_SAFE(writerGroup, &connection->writerGroups, listEntry, tmpWriterGroup){
  95. UA_Server_removeWriterGroup(server, writerGroup->identifier);
  96. }
  97. /* remove contained ReaderGroups */
  98. UA_ReaderGroup *readerGroups, *tmpReaderGroup;
  99. LIST_FOREACH_SAFE(readerGroups, &connection->readerGroups, listEntry, tmpReaderGroup){
  100. UA_Server_removeReaderGroup(server, readerGroups->identifier);
  101. }
  102. UA_NodeId_deleteMembers(&connection->identifier);
  103. if(connection->channel){
  104. connection->channel->close(connection->channel);
  105. }
  106. UA_free(connection->config);
  107. }
  108. /**
  109. * Regist connection given by connectionIdentifier
  110. *
  111. * @param server
  112. * @param connectionIdentifier
  113. */
  114. UA_StatusCode
  115. UA_PubSubConnection_regist(UA_Server *server, UA_NodeId *connectionIdentifier) {
  116. UA_PubSubConnection *connection = UA_PubSubConnection_findConnectionbyId(server, *connectionIdentifier);
  117. UA_StatusCode retval = UA_STATUSCODE_GOOD;
  118. if(connection == NULL) {
  119. return UA_STATUSCODE_BADNOTFOUND;
  120. }
  121. retval = connection->channel->regist(connection->channel, NULL, NULL);
  122. if (retval != UA_STATUSCODE_GOOD) {
  123. UA_LOG_WARNING(&server->config.logger, UA_LOGCATEGORY_SERVER, "register channel failed: 0x%x!", retval);
  124. }
  125. return retval;
  126. }
  127. UA_StatusCode
  128. UA_Server_addWriterGroup(UA_Server *server, const UA_NodeId connection,
  129. const UA_WriterGroupConfig *writerGroupConfig,
  130. UA_NodeId *writerGroupIdentifier) {
  131. UA_StatusCode retVal = UA_STATUSCODE_GOOD;
  132. if(!writerGroupConfig)
  133. return UA_STATUSCODE_BADINVALIDARGUMENT;
  134. //search the connection by the given connectionIdentifier
  135. UA_PubSubConnection *currentConnectionContext =
  136. UA_PubSubConnection_findConnectionbyId(server, connection);
  137. if(!currentConnectionContext)
  138. return UA_STATUSCODE_BADNOTFOUND;
  139. //allocate memory for new WriterGroup
  140. UA_WriterGroup *newWriterGroup = (UA_WriterGroup *) UA_calloc(1, sizeof(UA_WriterGroup));
  141. if(!newWriterGroup)
  142. return UA_STATUSCODE_BADOUTOFMEMORY;
  143. newWriterGroup->linkedConnection = currentConnectionContext->identifier;
  144. UA_PubSubManager_generateUniqueNodeId(server, &newWriterGroup->identifier);
  145. if(writerGroupIdentifier){
  146. UA_NodeId_copy(&newWriterGroup->identifier, writerGroupIdentifier);
  147. }
  148. //deep copy of the config
  149. UA_WriterGroupConfig tmpWriterGroupConfig;
  150. retVal |= UA_WriterGroupConfig_copy(writerGroupConfig, &tmpWriterGroupConfig);
  151. if(!tmpWriterGroupConfig.messageSettings.content.decoded.type) {
  152. UA_UadpWriterGroupMessageDataType *wgm = UA_UadpWriterGroupMessageDataType_new();
  153. tmpWriterGroupConfig.messageSettings.content.decoded.data = wgm;
  154. tmpWriterGroupConfig.messageSettings.content.decoded.type =
  155. &UA_TYPES[UA_TYPES_UADPWRITERGROUPMESSAGEDATATYPE];
  156. tmpWriterGroupConfig.messageSettings.encoding = UA_EXTENSIONOBJECT_DECODED;
  157. }
  158. newWriterGroup->config = tmpWriterGroupConfig;
  159. retVal |= UA_WriterGroup_addPublishCallback(server, newWriterGroup);
  160. LIST_INSERT_HEAD(&currentConnectionContext->writerGroups, newWriterGroup, listEntry);
  161. #ifdef UA_ENABLE_PUBSUB_INFORMATIONMODEL
  162. addWriterGroupRepresentation(server, newWriterGroup);
  163. #endif
  164. return retVal;
  165. }
  166. UA_StatusCode
  167. UA_Server_removeWriterGroup(UA_Server *server, const UA_NodeId writerGroup){
  168. UA_WriterGroup *wg = UA_WriterGroup_findWGbyId(server, writerGroup);
  169. if(!wg)
  170. return UA_STATUSCODE_BADNOTFOUND;
  171. UA_PubSubConnection *connection =
  172. UA_PubSubConnection_findConnectionbyId(server, wg->linkedConnection);
  173. if(!connection)
  174. return UA_STATUSCODE_BADNOTFOUND;
  175. //unregister the publish callback
  176. UA_PubSubManager_removeRepeatedPubSubCallback(server, wg->publishCallbackId);
  177. #ifdef UA_ENABLE_PUBSUB_INFORMATIONMODEL
  178. removeGroupRepresentation(server, wg);
  179. #endif
  180. UA_WriterGroup_deleteMembers(server, wg);
  181. LIST_REMOVE(wg, listEntry);
  182. UA_free(wg);
  183. return UA_STATUSCODE_GOOD;
  184. }
  185. /**********************************************/
  186. /* ReaderGroup */
  187. /**********************************************/
  188. /**
  189. * Add ReaderGroup to connection.
  190. *
  191. * @param server
  192. * @param connectionIdentifier
  193. * @param readerGroupConfiguration
  194. * @param readerGroupIdentifier
  195. * @return UA_STATUSCODE_GOOD on success
  196. */
  197. UA_StatusCode
  198. UA_Server_addReaderGroup(UA_Server *server, UA_NodeId connectionIdentifier,
  199. const UA_ReaderGroupConfig *readerGroupConfig,
  200. UA_NodeId *readerGroupIdentifier) {
  201. UA_StatusCode retval = UA_STATUSCODE_GOOD;
  202. UA_ReaderGroupConfig tmpReaderGroupConfig;
  203. /* Check for valid readergroup configuration */
  204. if(!readerGroupConfig) {
  205. return UA_STATUSCODE_BADINVALIDARGUMENT;
  206. }
  207. /* Search the connection by the given connectionIdentifier */
  208. UA_PubSubConnection *currentConnectionContext = UA_PubSubConnection_findConnectionbyId(server, connectionIdentifier);
  209. if(!currentConnectionContext) {
  210. return UA_STATUSCODE_BADNOTFOUND;
  211. }
  212. /* Allocate memory for new reader group */
  213. UA_ReaderGroup *newGroup = (UA_ReaderGroup *)UA_calloc(1, sizeof(UA_ReaderGroup));
  214. if(!newGroup) {
  215. return UA_STATUSCODE_BADOUTOFMEMORY;
  216. }
  217. /* Generate nodeid for the readergroup identifier */
  218. newGroup->linkedConnection = currentConnectionContext->identifier;
  219. UA_PubSubManager_generateUniqueNodeId(server, &newGroup->identifier);
  220. if(readerGroupIdentifier) {
  221. UA_NodeId_copy(&newGroup->identifier, readerGroupIdentifier);
  222. }
  223. /* Deep copy of the config */
  224. retval |= UA_ReaderGroupConfig_copy(readerGroupConfig, &tmpReaderGroupConfig);
  225. newGroup->config = tmpReaderGroupConfig;
  226. retval |= UA_ReaderGroup_addSubscribeCallback(server, newGroup);
  227. LIST_INSERT_HEAD(&currentConnectionContext->readerGroups, newGroup, listEntry);
  228. currentConnectionContext->readerGroupsSize++;
  229. #ifdef UA_ENABLE_PUBSUB_INFORMATIONMODEL
  230. addReaderGroupRepresentation(server, newGroup);
  231. #endif
  232. return retval;
  233. }
  234. /**
  235. * Remove ReaderGroup from connection and delete contained readers.
  236. *
  237. * @param server
  238. * @param groupIdentifier
  239. * @return UA_STATUSCODE_GOOD on success
  240. */
  241. UA_StatusCode
  242. UA_Server_removeReaderGroup(UA_Server *server, UA_NodeId groupIdentifier) {
  243. UA_ReaderGroup* readerGroup = UA_ReaderGroup_findRGbyId(server, groupIdentifier);
  244. if(readerGroup == NULL) {
  245. return UA_STATUSCODE_BADNOTFOUND;
  246. }
  247. /* Search the connection to which the given readergroup is connected to */
  248. UA_PubSubConnection *connection = UA_PubSubConnection_findConnectionbyId(server, readerGroup->linkedConnection);
  249. if(connection == NULL) {
  250. return UA_STATUSCODE_BADNOTFOUND;
  251. }
  252. /* Unregister subscribe callback */
  253. UA_PubSubManager_removeRepeatedPubSubCallback(server, readerGroup->subscribeCallbackId);
  254. #ifdef UA_ENABLE_PUBSUB_INFORMATIONMODEL
  255. /* To Do:RemoveGroupRepresentation(server, &readerGroup->identifier) */
  256. #endif
  257. /* UA_Server_ReaderGroup_delete also removes itself from the list */
  258. UA_Server_ReaderGroup_delete(server, readerGroup);
  259. /* Remove readerGroup from Connection */
  260. LIST_REMOVE(readerGroup, listEntry);
  261. UA_free(readerGroup);
  262. return UA_STATUSCODE_GOOD;
  263. }
  264. /**
  265. * To Do:
  266. * Update ReaderGroup configuration.
  267. *
  268. * @param server
  269. * @param readerGroupIdentifier
  270. * @param readerGroupConfiguration
  271. * @return UA_STATUSCODE_GOOD on success
  272. */
  273. UA_StatusCode
  274. UA_Server_ReaderGroup_updateConfig(UA_Server *server, UA_NodeId readerGroupIdentifier,
  275. const UA_ReaderGroupConfig *config) {
  276. return UA_STATUSCODE_BADNOTIMPLEMENTED;
  277. }
  278. /**
  279. * Get ReaderGroup configuration.
  280. *
  281. * @param server
  282. * @param groupIdentifier
  283. * @param readerGroupConfiguration
  284. * @return UA_STATUSCODE_GOOD on success
  285. */
  286. UA_StatusCode
  287. UA_Server_ReaderGroup_getConfig(UA_Server *server, UA_NodeId readerGroupIdentifier,
  288. UA_ReaderGroupConfig *config) {
  289. if(!config) {
  290. return UA_STATUSCODE_BADINVALIDARGUMENT;
  291. }
  292. /* Identify the readergroup through the readerGroupIdentifier */
  293. UA_ReaderGroup *currentReaderGroup = UA_ReaderGroup_findRGbyId(server, readerGroupIdentifier);
  294. if(!currentReaderGroup) {
  295. return UA_STATUSCODE_BADNOTFOUND;
  296. }
  297. UA_ReaderGroupConfig tmpReaderGroupConfig;
  298. /* deep copy of the actual config */
  299. UA_ReaderGroupConfig_copy(&currentReaderGroup->config, &tmpReaderGroupConfig);
  300. *config = tmpReaderGroupConfig;
  301. return UA_STATUSCODE_GOOD;
  302. }
  303. /* To Do UA_ReaderGroupConfig delete */
  304. /**
  305. * Delete ReaderGroup.
  306. *
  307. * @param server
  308. * @param groupIdentifier
  309. */
  310. void UA_Server_ReaderGroup_delete(UA_Server* server, UA_ReaderGroup *readerGroup) {
  311. /* To Do Call UA_ReaderGroupConfig_delete */
  312. UA_DataSetReader *dataSetReader, *tmpDataSetReader;
  313. LIST_FOREACH_SAFE(dataSetReader, &readerGroup->readers, listEntry, tmpDataSetReader) {
  314. UA_DataSetReader_delete(server, dataSetReader);
  315. }
  316. UA_PubSubConnection* pConn = UA_PubSubConnection_findConnectionbyId(server, readerGroup->linkedConnection);
  317. if(pConn != NULL) {
  318. pConn->readerGroupsSize--;
  319. }
  320. /* Delete ReaderGroup and its members */
  321. UA_String_deleteMembers(&readerGroup->config.name);
  322. UA_NodeId_deleteMembers(&readerGroup->linkedConnection);
  323. UA_NodeId_deleteMembers(&readerGroup->identifier);
  324. }
  325. /**
  326. * Copy ReaderGroup configuration.
  327. *
  328. * @param source
  329. * @param destination
  330. * @return UA_STATUSCODE_GOOD on success
  331. */
  332. UA_StatusCode
  333. UA_ReaderGroupConfig_copy(const UA_ReaderGroupConfig *src,
  334. UA_ReaderGroupConfig *dst) {
  335. UA_String_copy(&src->name, &dst->name);
  336. /* Currently simple memcpy only */
  337. memcpy(&dst->securityParameters, &src->securityParameters, sizeof(UA_PubSubSecurityParameters));
  338. return UA_STATUSCODE_GOOD;
  339. }
  340. static UA_DataSetReader *
  341. getReaderFromIdentifier(UA_Server *server, UA_NetworkMessage *pMsg, UA_PubSubConnection *pConnection) {
  342. if(pConnection->readerGroupsSize == 1) {
  343. if(LIST_FIRST(&pConnection->readerGroups)->readersCount == 1) {
  344. UA_LOG_INFO(&server->config.logger, UA_LOGCATEGORY_SERVER, "only 1 DataSetReader available. This one will be used.");
  345. return LIST_FIRST(&LIST_FIRST(&pConnection->readerGroups)->readers);
  346. }
  347. }
  348. if(!pMsg->publisherIdEnabled)
  349. return NULL;
  350. UA_ReaderGroup* readerGroup;
  351. LIST_FOREACH(readerGroup, &pConnection->readerGroups, listEntry) {
  352. UA_DataSetReader *tmpReader;
  353. LIST_FOREACH(tmpReader, &readerGroup->readers, listEntry) {
  354. switch (pMsg->publisherIdType) {
  355. case UA_PUBLISHERDATATYPE_BYTE:
  356. if(tmpReader->config.publisherId.type == &UA_TYPES[UA_TYPES_BYTE] &&
  357. pMsg->publisherIdType == UA_PUBLISHERDATATYPE_BYTE &&
  358. pMsg->publisherId.publisherIdByte == *(UA_Byte*)tmpReader->config.publisherId.data) {
  359. return tmpReader;
  360. }
  361. break;
  362. case UA_PUBLISHERDATATYPE_UINT16:
  363. if(tmpReader->config.publisherId.type == &UA_TYPES[UA_TYPES_UINT16] &&
  364. pMsg->publisherIdType == UA_PUBLISHERDATATYPE_UINT16 &&
  365. pMsg->publisherId.publisherIdUInt16 == *(UA_UInt16*)tmpReader->config.publisherId.data) {
  366. return tmpReader;
  367. }
  368. break;
  369. case UA_PUBLISHERDATATYPE_UINT32:
  370. if(tmpReader->config.publisherId.type == &UA_TYPES[UA_TYPES_UINT32] &&
  371. pMsg->publisherIdType == UA_PUBLISHERDATATYPE_UINT32 &&
  372. pMsg->publisherId.publisherIdUInt32 == *(UA_UInt32*)tmpReader->config.publisherId.data) {
  373. return tmpReader;
  374. }
  375. break;
  376. case UA_PUBLISHERDATATYPE_UINT64:
  377. if(tmpReader->config.publisherId.type == &UA_TYPES[UA_TYPES_UINT64] &&
  378. pMsg->publisherIdType == UA_PUBLISHERDATATYPE_UINT64 &&
  379. pMsg->publisherId.publisherIdUInt64 == *(UA_UInt64*)tmpReader->config.publisherId.data) {
  380. return tmpReader;
  381. }
  382. break;
  383. case UA_PUBLISHERDATATYPE_STRING:
  384. if(tmpReader->config.publisherId.type == &UA_TYPES[UA_TYPES_STRING] &&
  385. pMsg->publisherIdType == UA_PUBLISHERDATATYPE_STRING &&
  386. UA_String_equal(&pMsg->publisherId.publisherIdString, (UA_String*)tmpReader->config.publisherId.data)) {
  387. return tmpReader;
  388. }
  389. break;
  390. default:
  391. return NULL;
  392. }
  393. }
  394. }
  395. return NULL;
  396. }
  397. /**
  398. * Process NetworkMessage.
  399. *
  400. * @param server
  401. * @param networkmessage
  402. * @return UA_STATUSCODE_GOOD on success
  403. */
  404. UA_StatusCode
  405. UA_Server_processNetworkMessage(UA_Server *server, UA_NetworkMessage *pMsg,
  406. UA_PubSubConnection *pConnection) {
  407. if(!pMsg || !pConnection)
  408. return UA_STATUSCODE_BADINVALIDARGUMENT;
  409. /* To Do The condition with dataSetWriterIdAvailable and WriterGroupIdAvailable to be handled
  410. * when pMsg->groupHeaderEnabled, pMsg->dataSetClassIdEnabled, pMsg->payloadHeaderEnabled
  411. * Here some filtering is possible */
  412. UA_DataSetReader* dataSetReaderErg = getReaderFromIdentifier(server, pMsg, pConnection);
  413. /* No Reader with the specified id found */
  414. if(!dataSetReaderErg) {
  415. UA_LOG_INFO(&server->config.logger, UA_LOGCATEGORY_SERVER, "No DataSetReader found with PublisherId");
  416. return UA_STATUSCODE_BADNOTFOUND; /* TODO: Check the return code */
  417. }
  418. UA_LOG_DEBUG(&server->config.logger, UA_LOGCATEGORY_SERVER, "DataSetReader found with PublisherId");
  419. UA_Byte anzDataSets = 1;
  420. if(pMsg->payloadHeaderEnabled)
  421. anzDataSets = pMsg->payloadHeader.dataSetPayloadHeader.count;
  422. for(UA_Byte iterator = 0; iterator < anzDataSets; iterator++) {
  423. UA_LOG_DEBUG(&server->config.logger, UA_LOGCATEGORY_SERVER, "Process Msg with DataSetReader!");
  424. UA_Server_DataSetReader_process(server, dataSetReaderErg, &pMsg->payload.dataSetPayload.dataSetMessages[iterator]);
  425. }
  426. /* To Do the condition with dataSetWriterId and WriterGroupId
  427. * else condition for dataSetWriterIdAvailable and writerGroupIdAvailable) */
  428. return UA_STATUSCODE_GOOD;
  429. }
  430. /**
  431. * Find ReaderGroup with its identifier.
  432. *
  433. * @param server
  434. * @param groupIdentifier
  435. * @return the ReaderGroup or NULL if not found
  436. */
  437. UA_ReaderGroup * UA_ReaderGroup_findRGbyId(UA_Server *server, UA_NodeId identifier) {
  438. for (size_t iteratorConn = 0; iteratorConn < server->pubSubManager.connectionsSize; iteratorConn++) {
  439. UA_ReaderGroup* readerGroup = NULL;
  440. LIST_FOREACH(readerGroup, &server->pubSubManager.connections[iteratorConn].readerGroups, listEntry) {
  441. if(UA_NodeId_equal(&identifier, &readerGroup->identifier)) {
  442. return readerGroup;
  443. }
  444. }
  445. }
  446. return NULL;
  447. }
  448. /**
  449. * Find a DataSetReader with its identifier
  450. *
  451. * @param server
  452. * @param identifier
  453. * @return the DataSetReader or NULL if not found
  454. */
  455. UA_DataSetReader *UA_ReaderGroup_findDSRbyId(UA_Server *server, UA_NodeId identifier) {
  456. for (size_t iteratorConn = 0; iteratorConn < server->pubSubManager.connectionsSize; iteratorConn++) {
  457. UA_ReaderGroup* readerGroup = NULL;
  458. LIST_FOREACH(readerGroup, &server->pubSubManager.connections[iteratorConn].readerGroups, listEntry) {
  459. UA_DataSetReader *tmpReader;
  460. LIST_FOREACH(tmpReader, &readerGroup->readers, listEntry) {
  461. if(UA_NodeId_equal(&tmpReader->identifier, &identifier)) {
  462. return tmpReader;
  463. }
  464. }
  465. }
  466. }
  467. return NULL;
  468. }
  469. /**********************************************/
  470. /* DataSetReader */
  471. /**********************************************/
  472. /**
  473. * Add a DataSetReader to ReaderGroup
  474. *
  475. * @param server
  476. * @param readerGroupIdentifier
  477. * @param dataSetReaderConfig
  478. * @param readerIdentifier
  479. * @return UA_STATUSCODE_GOOD on success
  480. */
  481. UA_StatusCode
  482. UA_Server_addDataSetReader(UA_Server *server, UA_NodeId readerGroupIdentifier,
  483. const UA_DataSetReaderConfig *dataSetReaderConfig,
  484. UA_NodeId *readerIdentifier) {
  485. /* Search the reader group by the given readerGroupIdentifier */
  486. UA_ReaderGroup *readerGroup = UA_ReaderGroup_findRGbyId(server, readerGroupIdentifier);
  487. if(!dataSetReaderConfig) {
  488. return UA_STATUSCODE_BADNOTFOUND;
  489. }
  490. if(readerGroup == NULL) {
  491. return UA_STATUSCODE_BADNOTFOUND;
  492. }
  493. /* Allocate memory for new DataSetReader */
  494. UA_DataSetReader *newDataSetReader = (UA_DataSetReader *)UA_calloc(1, sizeof(UA_DataSetReader));
  495. /* Copy the config into the new dataSetReader */
  496. UA_DataSetReaderConfig_copy(dataSetReaderConfig, &newDataSetReader->config);
  497. newDataSetReader->linkedReaderGroup = readerGroup->identifier;
  498. UA_PubSubManager_generateUniqueNodeId(server, &newDataSetReader->identifier);
  499. if(readerIdentifier != NULL) {
  500. UA_NodeId_copy(&newDataSetReader->identifier, readerIdentifier);
  501. }
  502. /* Add the new reader to the group */
  503. LIST_INSERT_HEAD(&readerGroup->readers, newDataSetReader, listEntry);
  504. readerGroup->readersCount++;
  505. #ifdef UA_ENABLE_PUBSUB_INFORMATIONMODEL
  506. addDataSetReaderRepresentation(server, newDataSetReader);
  507. #endif
  508. return UA_STATUSCODE_GOOD;
  509. }
  510. /**
  511. * Remove a DataSetReader from ReaderGroup
  512. *
  513. * @param server
  514. * @param readerGroupIdentifier
  515. * @return UA_STATUSCODE_GOOD on success
  516. */
  517. UA_StatusCode
  518. UA_Server_removeDataSetReader(UA_Server *server, UA_NodeId readerIdentifier) {
  519. /* Remove datasetreader given by the identifier */
  520. UA_DataSetReader *dataSetReader = UA_ReaderGroup_findDSRbyId(server, readerIdentifier);
  521. if(!dataSetReader) {
  522. return UA_STATUSCODE_BADNOTFOUND;
  523. }
  524. #ifdef UA_ENABLE_PUBSUB_INFORMATIONMODEL
  525. removeDataSetReaderRepresentation(server, dataSetReader);
  526. #endif
  527. UA_DataSetReader_delete(server, dataSetReader);
  528. return UA_STATUSCODE_GOOD;
  529. }
  530. /**
  531. * Update the config of the DataSetReader.
  532. *
  533. * @param server
  534. * @param dataSetReaderIdentifier
  535. * @param readerGroupIdentifier
  536. * @param config
  537. * @return UA_STATUSCODE_GOOD on success
  538. */
  539. UA_StatusCode
  540. UA_Server_DataSetReader_updateConfig(UA_Server *server, UA_NodeId dataSetReaderIdentifier, UA_NodeId readerGroupIdentifier,
  541. const UA_DataSetReaderConfig *config) {
  542. if(config == NULL) {
  543. return UA_STATUSCODE_BADINVALIDARGUMENT;
  544. }
  545. UA_DataSetReader *currentDataSetReader = UA_ReaderGroup_findDSRbyId(server, dataSetReaderIdentifier);
  546. UA_ReaderGroup *currentReaderGroup = UA_ReaderGroup_findRGbyId(server, readerGroupIdentifier);
  547. if(!currentDataSetReader) {
  548. return UA_STATUSCODE_BADNOTFOUND;
  549. }
  550. /* The update functionality will be extended during the next PubSub batches.
  551. * Currently is only a change of the publishing interval possible. */
  552. if(currentDataSetReader->config.writerGroupId != config->writerGroupId) {
  553. UA_PubSubManager_removeRepeatedPubSubCallback(server, currentReaderGroup->subscribeCallbackId);
  554. currentDataSetReader->config.writerGroupId = config->writerGroupId;
  555. UA_ReaderGroup_subscribeCallback(server, currentReaderGroup);
  556. }
  557. else {
  558. UA_LOG_WARNING(&server->config.logger, UA_LOGCATEGORY_SERVER,
  559. "No or unsupported ReaderGroup update.");
  560. }
  561. return UA_STATUSCODE_GOOD;
  562. }
  563. /**
  564. * Get the current config of the UA_DataSetReader.
  565. *
  566. * @param server
  567. * @param dataSetReaderIdentifier
  568. * @param config
  569. * @return UA_STATUSCODE_GOOD on success
  570. */
  571. UA_StatusCode
  572. UA_Server_DataSetReader_getConfig(UA_Server *server, UA_NodeId dataSetReaderIdentifier,
  573. UA_DataSetReaderConfig *config) {
  574. if(!config) {
  575. return UA_STATUSCODE_BADINVALIDARGUMENT;
  576. }
  577. UA_DataSetReader *currentDataSetReader = UA_ReaderGroup_findDSRbyId(server, dataSetReaderIdentifier);
  578. if(!currentDataSetReader) {
  579. return UA_STATUSCODE_BADNOTFOUND;
  580. }
  581. UA_DataSetReaderConfig tmpReaderConfig;
  582. /* Deep copy of the actual config */
  583. UA_DataSetReaderConfig_copy(&currentDataSetReader->config, &tmpReaderConfig);
  584. *config = tmpReaderConfig;
  585. return UA_STATUSCODE_GOOD;
  586. }
  587. /**
  588. * This Method is used to initially set the SubscribedDataSet to TargetVariablesType and to create the list of target Variables of a SubscribedDataSetType.
  589. *
  590. * @param server
  591. * @param dataSetReaderIdentifier
  592. * @param targetVariables
  593. * @return UA_STATUSCODE_GOOD on success
  594. */
  595. UA_StatusCode
  596. UA_Server_DataSetReader_createTargetVariables(UA_Server *server, UA_NodeId dataSetReaderIdentifier, UA_TargetVariablesDataType *targetVariables) {
  597. UA_StatusCode retval = UA_STATUSCODE_BADUNEXPECTEDERROR;
  598. UA_DataSetReader* pDS = UA_ReaderGroup_findDSRbyId(server, dataSetReaderIdentifier);
  599. if(pDS == NULL) {
  600. return UA_STATUSCODE_BADINVALIDARGUMENT;
  601. }
  602. if(pDS->subscribedDataSetTarget.targetVariablesSize > 0) {
  603. UA_TargetVariablesDataType_deleteMembers(&pDS->subscribedDataSetTarget);
  604. pDS->subscribedDataSetTarget.targetVariablesSize = 0;
  605. pDS->subscribedDataSetTarget.targetVariables = NULL;
  606. }
  607. /* Set subscribed dataset to TargetVariableType */
  608. pDS->subscribedDataSetType = UA_PUBSUB_SDS_TARGET;
  609. retval = UA_TargetVariablesDataType_copy(targetVariables, &pDS->subscribedDataSetTarget);
  610. return retval;
  611. }
  612. /**
  613. * Adds Subscribed Variables from the DataSetMetaData for the given DataSet into the given parent node
  614. * and creates the corresponding data in the targetVariables of the DataSetReader
  615. *
  616. * @param server
  617. * @param parentNode
  618. * @param dataSetReaderIdentifier
  619. * @return UA_STATUSCODE_GOOD on success
  620. */
  621. UA_StatusCode UA_Server_DataSetReader_addTargetVariables(UA_Server *server, UA_NodeId *parentNode, UA_NodeId dataSetReaderIdentifier, UA_SubscribedDataSetEnumType sdsType) {
  622. if((server == NULL) || (parentNode == NULL)) {
  623. return UA_STATUSCODE_BADINVALIDARGUMENT;
  624. }
  625. UA_StatusCode retval = UA_STATUSCODE_GOOD;
  626. UA_DataSetReader* pDataSetReader = UA_ReaderGroup_findDSRbyId(server, dataSetReaderIdentifier);
  627. if(pDataSetReader == NULL) {
  628. return UA_STATUSCODE_BADINVALIDARGUMENT;
  629. }
  630. UA_TargetVariablesDataType targetVars;
  631. targetVars.targetVariablesSize = pDataSetReader->config.dataSetMetaData.fieldsSize;
  632. targetVars.targetVariables = (UA_FieldTargetDataType *)UA_calloc(targetVars.targetVariablesSize, sizeof(UA_FieldTargetDataType));
  633. for (size_t iteratorField = 0; iteratorField < pDataSetReader->config.dataSetMetaData.fieldsSize; iteratorField++) {
  634. UA_VariableAttributes vAttr = UA_VariableAttributes_default;
  635. vAttr.valueRank = pDataSetReader->config.dataSetMetaData.fields[iteratorField].valueRank;
  636. if(pDataSetReader->config.dataSetMetaData.fields[iteratorField].arrayDimensionsSize > 0) {
  637. retval = UA_Array_copy(pDataSetReader->config.dataSetMetaData.fields[iteratorField].arrayDimensions, pDataSetReader->config.dataSetMetaData.fields[iteratorField].arrayDimensionsSize, (void**)&vAttr.arrayDimensions, &UA_TYPES[UA_TYPES_UINT32]);
  638. if(retval == UA_STATUSCODE_GOOD) {
  639. vAttr.arrayDimensionsSize = pDataSetReader->config.dataSetMetaData.fields[iteratorField].arrayDimensionsSize;
  640. }
  641. }
  642. vAttr.dataType = pDataSetReader->config.dataSetMetaData.fields[iteratorField].dataType;
  643. vAttr.accessLevel = UA_ACCESSLEVELMASK_READ;
  644. UA_LocalizedText_copy(&pDataSetReader->config.dataSetMetaData.fields[iteratorField].description, &vAttr.description);
  645. UA_QualifiedName qn;
  646. UA_QualifiedName_init(&qn);
  647. char szTmpName[UA_MAX_SIZENAME];
  648. if(pDataSetReader->config.dataSetMetaData.fields[iteratorField].name.length > 0) {
  649. UA_UInt16 slen = UA_MAX_SIZENAME -1;
  650. vAttr.displayName.locale = UA_STRING("en-US");
  651. vAttr.displayName.text = pDataSetReader->config.dataSetMetaData.fields[iteratorField].name;
  652. if(pDataSetReader->config.dataSetMetaData.fields[iteratorField].name.length < slen) {
  653. slen = (UA_UInt16)pDataSetReader->config.dataSetMetaData.fields[iteratorField].name.length;
  654. UA_snprintf(szTmpName, sizeof(szTmpName), "%s", (const char*)pDataSetReader->config.dataSetMetaData.fields[iteratorField].name.data);
  655. }
  656. szTmpName[slen] = '\0';
  657. qn = UA_QUALIFIEDNAME(1, szTmpName);
  658. }
  659. else {
  660. strcpy(szTmpName, "SubscribedVariable");
  661. vAttr.displayName = UA_LOCALIZEDTEXT("en-US", szTmpName);
  662. qn = UA_QUALIFIEDNAME(1, "SubscribedVariable");
  663. }
  664. /* Add variable to the given parent node */
  665. UA_NodeId newNode;
  666. retval = UA_Server_addVariableNode(server, UA_NODEID_NULL, *parentNode,
  667. UA_NODEID_NUMERIC(0, UA_NS0ID_HASCOMPONENT), qn,
  668. UA_NODEID_NUMERIC(0, UA_NS0ID_BASEDATAVARIABLETYPE), vAttr, NULL, &newNode);
  669. if(retval == UA_STATUSCODE_GOOD) {
  670. UA_LOG_INFO(&server->config.logger, UA_LOGCATEGORY_USERLAND, "addVariableNode %s succeeded", szTmpName);
  671. }
  672. else {
  673. UA_LOG_WARNING(&server->config.logger, UA_LOGCATEGORY_USERLAND, "addVariableNode: error 0x%x", retval);
  674. }
  675. UA_FieldTargetDataType_init(&targetVars.targetVariables[iteratorField]);
  676. targetVars.targetVariables[iteratorField].attributeId = UA_ATTRIBUTEID_VALUE;
  677. UA_NodeId_copy(&newNode, &targetVars.targetVariables[iteratorField].targetNodeId);
  678. UA_NodeId_deleteMembers(&newNode);
  679. if(vAttr.arrayDimensionsSize > 0) {
  680. UA_Array_delete(vAttr.arrayDimensions, vAttr.arrayDimensionsSize, &UA_TYPES[UA_TYPES_UINT32]);
  681. }
  682. }
  683. if(sdsType == UA_PUBSUB_SDS_TARGET) {
  684. retval = UA_Server_DataSetReader_createTargetVariables(server, pDataSetReader->identifier, &targetVars);
  685. }
  686. UA_TargetVariablesDataType_deleteMembers(&targetVars);
  687. return retval;
  688. }
  689. /**
  690. * Process a NetworkMessage with a DataSetReader.
  691. *
  692. * @param server
  693. * @param dataSetReader
  694. * @param dataSetMsg
  695. */
  696. void UA_Server_DataSetReader_process(UA_Server *server, UA_DataSetReader *dataSetReader, UA_DataSetMessage* dataSetMsg) {
  697. if((dataSetReader == NULL) || (dataSetMsg == NULL) || (server == NULL)) {
  698. return;
  699. }
  700. if(!dataSetMsg->header.dataSetMessageValid) {
  701. UA_LOG_INFO(&server->config.logger, UA_LOGCATEGORY_SERVER, "DataSetMessage is discarded: message is not valid");
  702. /* To Do check ConfigurationVersion*/
  703. /*if(dataSetMsg->header.configVersionMajorVersionEnabled)
  704. * {
  705. * if(dataSetMsg->header.configVersionMajorVersion != dataSetReader->config.dataSetMetaData.configurationVersion.majorVersion)
  706. * {
  707. * UA_LOG_WARNING(server->config.logger, UA_LOGCATEGORY_SERVER, "DataSetMessage is discarded: ConfigurationVersion MajorVersion does not match");
  708. * return;
  709. * }
  710. } */
  711. }
  712. else {
  713. if(dataSetMsg->header.dataSetMessageType == UA_DATASETMESSAGE_DATAKEYFRAME) {
  714. if(dataSetMsg->header.fieldEncoding != UA_FIELDENCODING_RAWDATA) {
  715. size_t anzFields = dataSetMsg->data.keyFrameData.fieldCount;
  716. if(dataSetReader->config.dataSetMetaData.fieldsSize < anzFields) {
  717. anzFields = dataSetReader->config.dataSetMetaData.fieldsSize;
  718. }
  719. if(dataSetReader->subscribedDataSetTarget.targetVariablesSize < anzFields) {
  720. anzFields = dataSetReader->subscribedDataSetTarget.targetVariablesSize;
  721. }
  722. UA_StatusCode retVal = UA_STATUSCODE_GOOD;
  723. for (UA_UInt16 iteratorField = 0; iteratorField < anzFields; iteratorField++) {
  724. if(dataSetMsg->data.keyFrameData.dataSetFields[iteratorField].hasValue) {
  725. if(dataSetReader->subscribedDataSetTarget.targetVariables[iteratorField].attributeId == UA_ATTRIBUTEID_VALUE) {
  726. retVal = UA_Server_writeValue(server, dataSetReader->subscribedDataSetTarget.targetVariables[iteratorField].targetNodeId, dataSetMsg->data.keyFrameData.dataSetFields[iteratorField].value);
  727. if(retVal != UA_STATUSCODE_GOOD) {
  728. UA_LOG_INFO(&server->config.logger, UA_LOGCATEGORY_SERVER, "Error Write Value KF %u: 0x%x", iteratorField, retVal);
  729. }
  730. }
  731. else {
  732. UA_WriteValue writeVal;
  733. UA_WriteValue_init(&writeVal);
  734. writeVal.attributeId = dataSetReader->subscribedDataSetTarget.targetVariables[iteratorField].attributeId;
  735. writeVal.indexRange = dataSetReader->subscribedDataSetTarget.targetVariables[iteratorField].receiverIndexRange;
  736. writeVal.nodeId = dataSetReader->subscribedDataSetTarget.targetVariables[iteratorField].targetNodeId;
  737. UA_DataValue_copy(&dataSetMsg->data.keyFrameData.dataSetFields[iteratorField], &writeVal.value);
  738. retVal = UA_Server_write(server, &writeVal);
  739. if(retVal != UA_STATUSCODE_GOOD) {
  740. UA_LOG_INFO(&server->config.logger, UA_LOGCATEGORY_SERVER, "Error Write KF %u: 0x%x", iteratorField, retVal);
  741. }
  742. }
  743. }
  744. }
  745. }
  746. }
  747. }
  748. }
  749. /**
  750. * Copy the config of the DataSetReader.
  751. *
  752. * @param src
  753. * @param dst
  754. * @return UA_STATUSCODE_GOOD on success
  755. */
  756. UA_StatusCode
  757. UA_DataSetReaderConfig_copy(const UA_DataSetReaderConfig *src,
  758. UA_DataSetReaderConfig *dst) {
  759. memset(dst, 0, sizeof(UA_DataSetReaderConfig));
  760. UA_StatusCode retVal = UA_String_copy(&src->name, &dst->name);
  761. if(retVal != UA_STATUSCODE_GOOD) {
  762. return retVal;
  763. }
  764. retVal = UA_Variant_copy(&src->publisherId, &dst->publisherId);
  765. if(retVal != UA_STATUSCODE_GOOD) {
  766. return retVal;
  767. }
  768. dst->writerGroupId = src->writerGroupId;
  769. dst->dataSetWriterId = src->dataSetWriterId;
  770. retVal = UA_DataSetMetaDataType_copy(&src->dataSetMetaData, &dst->dataSetMetaData);
  771. if(retVal != UA_STATUSCODE_GOOD) {
  772. return retVal;
  773. }
  774. dst->dataSetFieldContentMask = src->dataSetFieldContentMask;
  775. dst->messageReceiveTimeout = src->messageReceiveTimeout;
  776. /* Currently memcpy is used to copy the securityParameters */
  777. memcpy(&dst->securityParameters, &src->securityParameters, sizeof(UA_PubSubSecurityParameters));
  778. retVal = UA_UadpDataSetReaderMessageDataType_copy(&src->messageSettings, &dst->messageSettings);
  779. if(retVal != UA_STATUSCODE_GOOD) {
  780. return retVal;
  781. }
  782. return UA_STATUSCODE_GOOD;
  783. }
  784. /**
  785. * Delete the DataSetReader.
  786. *
  787. * @param server
  788. * @param dataSetReader
  789. */
  790. void UA_DataSetReader_delete(UA_Server *server, UA_DataSetReader *dataSetReader) {
  791. /* Delete DataSetReader config */
  792. UA_String_deleteMembers(&dataSetReader->config.name);
  793. UA_Variant_deleteMembers(&dataSetReader->config.publisherId);
  794. UA_DataSetMetaDataType_deleteMembers(&dataSetReader->config.dataSetMetaData);
  795. UA_UadpDataSetReaderMessageDataType_deleteMembers(&dataSetReader->config.messageSettings);
  796. UA_TargetVariablesDataType_deleteMembers(&dataSetReader->subscribedDataSetTarget);
  797. /* Delete DataSetReader */
  798. UA_ReaderGroup* pGroup = UA_ReaderGroup_findRGbyId(server, dataSetReader->linkedReaderGroup);
  799. if(pGroup != NULL) {
  800. pGroup->readersCount--;
  801. }
  802. UA_NodeId_deleteMembers(&dataSetReader->identifier);
  803. UA_NodeId_deleteMembers(&dataSetReader->linkedReaderGroup);
  804. /* Remove DataSetReader from group */
  805. LIST_REMOVE(dataSetReader, listEntry);
  806. /* Free memory allocated for DataSetReader */
  807. UA_free(dataSetReader);
  808. }
  809. /**********************************************/
  810. /* PublishedDataSet */
  811. /**********************************************/
  812. UA_StatusCode
  813. UA_PublishedDataSetConfig_copy(const UA_PublishedDataSetConfig *src,
  814. UA_PublishedDataSetConfig *dst) {
  815. UA_StatusCode retVal = UA_STATUSCODE_GOOD;
  816. memcpy(dst, src, sizeof(UA_PublishedDataSetConfig));
  817. retVal |= UA_String_copy(&src->name, &dst->name);
  818. switch(src->publishedDataSetType){
  819. case UA_PUBSUB_DATASET_PUBLISHEDITEMS:
  820. //no additional items
  821. break;
  822. case UA_PUBSUB_DATASET_PUBLISHEDITEMS_TEMPLATE:
  823. if(src->config.itemsTemplate.variablesToAddSize > 0){
  824. dst->config.itemsTemplate.variablesToAdd = (UA_PublishedVariableDataType *) UA_calloc(
  825. src->config.itemsTemplate.variablesToAddSize, sizeof(UA_PublishedVariableDataType));
  826. }
  827. for(size_t i = 0; i < src->config.itemsTemplate.variablesToAddSize; i++){
  828. retVal |= UA_PublishedVariableDataType_copy(&src->config.itemsTemplate.variablesToAdd[i],
  829. &dst->config.itemsTemplate.variablesToAdd[i]);
  830. }
  831. retVal |= UA_DataSetMetaDataType_copy(&src->config.itemsTemplate.metaData,
  832. &dst->config.itemsTemplate.metaData);
  833. break;
  834. default:
  835. return UA_STATUSCODE_BADINVALIDARGUMENT;
  836. }
  837. return retVal;
  838. }
  839. UA_StatusCode
  840. UA_Server_getPublishedDataSetConfig(UA_Server *server, const UA_NodeId pds,
  841. UA_PublishedDataSetConfig *config){
  842. if(!config)
  843. return UA_STATUSCODE_BADINVALIDARGUMENT;
  844. UA_PublishedDataSet *currentPublishedDataSet = UA_PublishedDataSet_findPDSbyId(server, pds);
  845. if(!currentPublishedDataSet)
  846. return UA_STATUSCODE_BADNOTFOUND;
  847. UA_PublishedDataSetConfig tmpPublishedDataSetConfig;
  848. //deep copy of the actual config
  849. UA_PublishedDataSetConfig_copy(&currentPublishedDataSet->config, &tmpPublishedDataSetConfig);
  850. *config = tmpPublishedDataSetConfig;
  851. return UA_STATUSCODE_GOOD;
  852. }
  853. UA_PublishedDataSet *
  854. UA_PublishedDataSet_findPDSbyId(UA_Server *server, UA_NodeId identifier){
  855. for(size_t i = 0; i < server->pubSubManager.publishedDataSetsSize; i++){
  856. if(UA_NodeId_equal(&server->pubSubManager.publishedDataSets[i].identifier, &identifier)){
  857. return &server->pubSubManager.publishedDataSets[i];
  858. }
  859. }
  860. return NULL;
  861. }
  862. void
  863. UA_PublishedDataSetConfig_deleteMembers(UA_PublishedDataSetConfig *pdsConfig){
  864. //delete pds config
  865. UA_String_deleteMembers(&pdsConfig->name);
  866. switch (pdsConfig->publishedDataSetType){
  867. case UA_PUBSUB_DATASET_PUBLISHEDITEMS:
  868. //no additional items
  869. break;
  870. case UA_PUBSUB_DATASET_PUBLISHEDITEMS_TEMPLATE:
  871. if(pdsConfig->config.itemsTemplate.variablesToAddSize > 0){
  872. for(size_t i = 0; i < pdsConfig->config.itemsTemplate.variablesToAddSize; i++){
  873. UA_PublishedVariableDataType_deleteMembers(&pdsConfig->config.itemsTemplate.variablesToAdd[i]);
  874. }
  875. UA_free(pdsConfig->config.itemsTemplate.variablesToAdd);
  876. }
  877. UA_DataSetMetaDataType_deleteMembers(&pdsConfig->config.itemsTemplate.metaData);
  878. break;
  879. default:
  880. break;
  881. }
  882. }
  883. void
  884. UA_PublishedDataSet_deleteMembers(UA_Server *server, UA_PublishedDataSet *publishedDataSet){
  885. UA_PublishedDataSetConfig_deleteMembers(&publishedDataSet->config);
  886. //delete PDS
  887. UA_DataSetMetaDataType_deleteMembers(&publishedDataSet->dataSetMetaData);
  888. UA_DataSetField *field, *tmpField;
  889. LIST_FOREACH_SAFE(field, &publishedDataSet->fields, listEntry, tmpField) {
  890. UA_Server_removeDataSetField(server, field->identifier);
  891. }
  892. UA_NodeId_deleteMembers(&publishedDataSet->identifier);
  893. }
  894. UA_DataSetFieldResult
  895. UA_Server_addDataSetField(UA_Server *server, const UA_NodeId publishedDataSet,
  896. const UA_DataSetFieldConfig *fieldConfig,
  897. UA_NodeId *fieldIdentifier) {
  898. UA_StatusCode retVal = UA_STATUSCODE_GOOD;
  899. UA_DataSetFieldResult result = {UA_STATUSCODE_BADINVALIDARGUMENT, {0, 0}};
  900. if(!fieldConfig)
  901. return result;
  902. UA_PublishedDataSet *currentDataSet = UA_PublishedDataSet_findPDSbyId(server, publishedDataSet);
  903. if(currentDataSet == NULL){
  904. result.result = UA_STATUSCODE_BADNOTFOUND;
  905. return result;
  906. }
  907. if(currentDataSet->config.publishedDataSetType != UA_PUBSUB_DATASET_PUBLISHEDITEMS){
  908. result.result = UA_STATUSCODE_BADNOTIMPLEMENTED;
  909. return result;
  910. }
  911. UA_DataSetField *newField = (UA_DataSetField *) UA_calloc(1, sizeof(UA_DataSetField));
  912. if(!newField){
  913. result.result = UA_STATUSCODE_BADINTERNALERROR;
  914. return result;
  915. }
  916. UA_DataSetFieldConfig tmpFieldConfig;
  917. retVal |= UA_DataSetFieldConfig_copy(fieldConfig, &tmpFieldConfig);
  918. newField->config = tmpFieldConfig;
  919. UA_PubSubManager_generateUniqueNodeId(server, &newField->identifier);
  920. if(fieldIdentifier != NULL){
  921. UA_NodeId_copy(&newField->identifier, fieldIdentifier);
  922. }
  923. newField->publishedDataSet = currentDataSet->identifier;
  924. //update major version of parent published data set
  925. currentDataSet->dataSetMetaData.configurationVersion.majorVersion = UA_PubSubConfigurationVersionTimeDifference();
  926. LIST_INSERT_HEAD(&currentDataSet->fields, newField, listEntry);
  927. if(newField->config.field.variable.promotedField)
  928. currentDataSet->promotedFieldsCount++;
  929. currentDataSet->fieldSize++;
  930. result.result = retVal;
  931. result.configurationVersion.majorVersion = currentDataSet->dataSetMetaData.configurationVersion.majorVersion;
  932. result.configurationVersion.minorVersion = currentDataSet->dataSetMetaData.configurationVersion.minorVersion;
  933. return result;
  934. }
  935. UA_DataSetFieldResult
  936. UA_Server_removeDataSetField(UA_Server *server, const UA_NodeId dsf) {
  937. UA_DataSetField *currentField = UA_DataSetField_findDSFbyId(server, dsf);
  938. UA_DataSetFieldResult result = {UA_STATUSCODE_BADNOTFOUND, {0, 0}};
  939. if(!currentField)
  940. return result;
  941. UA_PublishedDataSet *parentPublishedDataSet =
  942. UA_PublishedDataSet_findPDSbyId(server, currentField->publishedDataSet);
  943. if(!parentPublishedDataSet)
  944. return result;
  945. parentPublishedDataSet->fieldSize--;
  946. if(currentField->config.field.variable.promotedField)
  947. parentPublishedDataSet->promotedFieldsCount--;
  948. /* update major version of PublishedDataSet */
  949. parentPublishedDataSet->dataSetMetaData.configurationVersion.majorVersion =
  950. UA_PubSubConfigurationVersionTimeDifference();
  951. UA_DataSetField_deleteMembers(currentField);
  952. LIST_REMOVE(currentField, listEntry);
  953. UA_free(currentField);
  954. result.result = UA_STATUSCODE_GOOD;
  955. result.configurationVersion.majorVersion = parentPublishedDataSet->dataSetMetaData.configurationVersion.majorVersion;
  956. result.configurationVersion.minorVersion = parentPublishedDataSet->dataSetMetaData.configurationVersion.minorVersion;
  957. return result;
  958. }
  959. /**********************************************/
  960. /* DataSetWriter */
  961. /**********************************************/
  962. UA_StatusCode
  963. UA_DataSetWriterConfig_copy(const UA_DataSetWriterConfig *src,
  964. UA_DataSetWriterConfig *dst){
  965. UA_StatusCode retVal = UA_STATUSCODE_GOOD;
  966. memcpy(dst, src, sizeof(UA_DataSetWriterConfig));
  967. retVal |= UA_String_copy(&src->name, &dst->name);
  968. retVal |= UA_String_copy(&src->dataSetName, &dst->dataSetName);
  969. retVal |= UA_ExtensionObject_copy(&src->messageSettings, &dst->messageSettings);
  970. dst->dataSetWriterProperties = (UA_KeyValuePair *)
  971. UA_calloc(src->dataSetWriterPropertiesSize, sizeof(UA_KeyValuePair));
  972. if(!dst->dataSetWriterProperties)
  973. return UA_STATUSCODE_BADOUTOFMEMORY;
  974. for(size_t i = 0; i < src->dataSetWriterPropertiesSize; i++){
  975. retVal |= UA_KeyValuePair_copy(&src->dataSetWriterProperties[i], &dst->dataSetWriterProperties[i]);
  976. }
  977. return retVal;
  978. }
  979. UA_StatusCode
  980. UA_Server_getDataSetWriterConfig(UA_Server *server, const UA_NodeId dsw,
  981. UA_DataSetWriterConfig *config){
  982. UA_StatusCode retVal = UA_STATUSCODE_GOOD;
  983. if(!config)
  984. return UA_STATUSCODE_BADINVALIDARGUMENT;
  985. UA_DataSetWriter *currentDataSetWriter = UA_DataSetWriter_findDSWbyId(server, dsw);
  986. if(!currentDataSetWriter)
  987. return UA_STATUSCODE_BADNOTFOUND;
  988. UA_DataSetWriterConfig tmpWriterConfig;
  989. //deep copy of the actual config
  990. retVal |= UA_DataSetWriterConfig_copy(&currentDataSetWriter->config, &tmpWriterConfig);
  991. *config = tmpWriterConfig;
  992. return retVal;
  993. }
  994. UA_DataSetWriter *
  995. UA_DataSetWriter_findDSWbyId(UA_Server *server, UA_NodeId identifier) {
  996. for(size_t i = 0; i < server->pubSubManager.connectionsSize; i++){
  997. UA_WriterGroup *tmpWriterGroup;
  998. LIST_FOREACH(tmpWriterGroup, &server->pubSubManager.connections[i].writerGroups, listEntry){
  999. UA_DataSetWriter *tmpWriter;
  1000. LIST_FOREACH(tmpWriter, &tmpWriterGroup->writers, listEntry){
  1001. if(UA_NodeId_equal(&tmpWriter->identifier, &identifier)){
  1002. return tmpWriter;
  1003. }
  1004. }
  1005. }
  1006. }
  1007. return NULL;
  1008. }
  1009. void
  1010. UA_DataSetWriterConfig_deleteMembers(UA_DataSetWriterConfig *pdsConfig) {
  1011. UA_String_deleteMembers(&pdsConfig->name);
  1012. UA_String_deleteMembers(&pdsConfig->dataSetName);
  1013. for(size_t i = 0; i < pdsConfig->dataSetWriterPropertiesSize; i++){
  1014. UA_KeyValuePair_deleteMembers(&pdsConfig->dataSetWriterProperties[i]);
  1015. }
  1016. UA_free(pdsConfig->dataSetWriterProperties);
  1017. UA_ExtensionObject_deleteMembers(&pdsConfig->messageSettings);
  1018. }
  1019. static void
  1020. UA_DataSetWriter_deleteMembers(UA_Server *server, UA_DataSetWriter *dataSetWriter) {
  1021. UA_DataSetWriterConfig_deleteMembers(&dataSetWriter->config);
  1022. //delete DataSetWriter
  1023. UA_NodeId_deleteMembers(&dataSetWriter->identifier);
  1024. UA_NodeId_deleteMembers(&dataSetWriter->linkedWriterGroup);
  1025. UA_NodeId_deleteMembers(&dataSetWriter->connectedDataSet);
  1026. #ifdef UA_ENABLE_PUBSUB_DELTAFRAMES
  1027. //delete lastSamples store
  1028. for(size_t i = 0; i < dataSetWriter->lastSamplesCount; i++) {
  1029. UA_DataValue_deleteMembers(&dataSetWriter->lastSamples[i].value);
  1030. }
  1031. UA_free(dataSetWriter->lastSamples);
  1032. dataSetWriter->lastSamples = NULL;
  1033. dataSetWriter->lastSamplesCount = 0;
  1034. #endif
  1035. }
  1036. /**********************************************/
  1037. /* WriterGroup */
  1038. /**********************************************/
  1039. UA_StatusCode
  1040. UA_WriterGroupConfig_copy(const UA_WriterGroupConfig *src,
  1041. UA_WriterGroupConfig *dst){
  1042. UA_StatusCode retVal = UA_STATUSCODE_GOOD;
  1043. memcpy(dst, src, sizeof(UA_WriterGroupConfig));
  1044. retVal |= UA_String_copy(&src->name, &dst->name);
  1045. retVal |= UA_ExtensionObject_copy(&src->transportSettings, &dst->transportSettings);
  1046. retVal |= UA_ExtensionObject_copy(&src->messageSettings, &dst->messageSettings);
  1047. dst->groupProperties = (UA_KeyValuePair *) UA_calloc(src->groupPropertiesSize, sizeof(UA_KeyValuePair));
  1048. if(!dst->groupProperties)
  1049. return UA_STATUSCODE_BADOUTOFMEMORY;
  1050. for(size_t i = 0; i < src->groupPropertiesSize; i++){
  1051. retVal |= UA_KeyValuePair_copy(&src->groupProperties[i], &dst->groupProperties[i]);
  1052. }
  1053. return retVal;
  1054. }
  1055. UA_StatusCode
  1056. UA_Server_getWriterGroupConfig(UA_Server *server, const UA_NodeId writerGroup,
  1057. UA_WriterGroupConfig *config){
  1058. UA_StatusCode retVal = UA_STATUSCODE_GOOD;
  1059. if(!config)
  1060. return UA_STATUSCODE_BADINVALIDARGUMENT;
  1061. UA_WriterGroup *currentWriterGroup = UA_WriterGroup_findWGbyId(server, writerGroup);
  1062. if(!currentWriterGroup){
  1063. return UA_STATUSCODE_BADNOTFOUND;
  1064. }
  1065. UA_WriterGroupConfig tmpWriterGroupConfig;
  1066. //deep copy of the actual config
  1067. retVal |= UA_WriterGroupConfig_copy(&currentWriterGroup->config, &tmpWriterGroupConfig);
  1068. *config = tmpWriterGroupConfig;
  1069. return retVal;
  1070. }
  1071. UA_StatusCode
  1072. UA_Server_updateWriterGroupConfig(UA_Server *server, UA_NodeId writerGroupIdentifier,
  1073. const UA_WriterGroupConfig *config){
  1074. if(!config)
  1075. return UA_STATUSCODE_BADINVALIDARGUMENT;
  1076. UA_WriterGroup *currentWriterGroup = UA_WriterGroup_findWGbyId(server, writerGroupIdentifier);
  1077. if(!currentWriterGroup)
  1078. return UA_STATUSCODE_BADNOTFOUND;
  1079. //The update functionality will be extended during the next PubSub batches.
  1080. //Currently is only a change of the publishing interval possible.
  1081. if(currentWriterGroup->config.maxEncapsulatedDataSetMessageCount != config->maxEncapsulatedDataSetMessageCount){
  1082. currentWriterGroup->config.maxEncapsulatedDataSetMessageCount = config->maxEncapsulatedDataSetMessageCount;
  1083. if(currentWriterGroup->config.messageSettings.encoding == UA_EXTENSIONOBJECT_ENCODED_NOBODY) {
  1084. UA_LOG_WARNING(&server->config.logger, UA_LOGCATEGORY_SERVER,
  1085. "MaxEncapsulatedDataSetMessag need enabled 'PayloadHeader' within the message settings.");
  1086. }
  1087. }
  1088. if(currentWriterGroup->config.publishingInterval != config->publishingInterval) {
  1089. UA_PubSubManager_removeRepeatedPubSubCallback(server, currentWriterGroup->publishCallbackId);
  1090. currentWriterGroup->config.publishingInterval = config->publishingInterval;
  1091. UA_WriterGroup_addPublishCallback(server, currentWriterGroup);
  1092. }
  1093. if(currentWriterGroup->config.priority != config->priority) {
  1094. UA_LOG_WARNING(&server->config.logger, UA_LOGCATEGORY_SERVER,
  1095. "No or unsupported WriterGroup update.");
  1096. }
  1097. return UA_STATUSCODE_GOOD;
  1098. }
  1099. UA_WriterGroup *
  1100. UA_WriterGroup_findWGbyId(UA_Server *server, UA_NodeId identifier){
  1101. for(size_t i = 0; i < server->pubSubManager.connectionsSize; i++){
  1102. UA_WriterGroup *tmpWriterGroup;
  1103. LIST_FOREACH(tmpWriterGroup, &server->pubSubManager.connections[i].writerGroups, listEntry) {
  1104. if(UA_NodeId_equal(&identifier, &tmpWriterGroup->identifier)){
  1105. return tmpWriterGroup;
  1106. }
  1107. }
  1108. }
  1109. return NULL;
  1110. }
  1111. void
  1112. UA_WriterGroupConfig_deleteMembers(UA_WriterGroupConfig *writerGroupConfig){
  1113. //delete writerGroup config
  1114. UA_String_deleteMembers(&writerGroupConfig->name);
  1115. UA_ExtensionObject_deleteMembers(&writerGroupConfig->transportSettings);
  1116. UA_ExtensionObject_deleteMembers(&writerGroupConfig->messageSettings);
  1117. for(size_t i = 0; i < writerGroupConfig->groupPropertiesSize; i++){
  1118. UA_KeyValuePair_deleteMembers(&writerGroupConfig->groupProperties[i]);
  1119. }
  1120. UA_free(writerGroupConfig->groupProperties);
  1121. }
  1122. static void
  1123. UA_WriterGroup_deleteMembers(UA_Server *server, UA_WriterGroup *writerGroup) {
  1124. UA_WriterGroupConfig_deleteMembers(&writerGroup->config);
  1125. //delete WriterGroup
  1126. //delete all writers. Therefore removeDataSetWriter is called from PublishedDataSet
  1127. UA_DataSetWriter *dataSetWriter, *tmpDataSetWriter;
  1128. LIST_FOREACH_SAFE(dataSetWriter, &writerGroup->writers, listEntry, tmpDataSetWriter){
  1129. UA_Server_removeDataSetWriter(server, dataSetWriter->identifier);
  1130. }
  1131. UA_NodeId_deleteMembers(&writerGroup->linkedConnection);
  1132. UA_NodeId_deleteMembers(&writerGroup->identifier);
  1133. }
  1134. UA_StatusCode
  1135. UA_Server_addDataSetWriter(UA_Server *server,
  1136. const UA_NodeId writerGroup, const UA_NodeId dataSet,
  1137. const UA_DataSetWriterConfig *dataSetWriterConfig,
  1138. UA_NodeId *writerIdentifier) {
  1139. UA_StatusCode retVal = UA_STATUSCODE_GOOD;
  1140. if(!dataSetWriterConfig)
  1141. return UA_STATUSCODE_BADINVALIDARGUMENT;
  1142. UA_PublishedDataSet *currentDataSetContext = UA_PublishedDataSet_findPDSbyId(server, dataSet);
  1143. if(!currentDataSetContext)
  1144. return UA_STATUSCODE_BADNOTFOUND;
  1145. UA_WriterGroup *wg = UA_WriterGroup_findWGbyId(server, writerGroup);
  1146. if(!wg)
  1147. return UA_STATUSCODE_BADNOTFOUND;
  1148. UA_DataSetWriter *newDataSetWriter = (UA_DataSetWriter *) UA_calloc(1, sizeof(UA_DataSetWriter));
  1149. if(!newDataSetWriter)
  1150. return UA_STATUSCODE_BADOUTOFMEMORY;
  1151. //copy the config into the new dataSetWriter
  1152. UA_DataSetWriterConfig tmpDataSetWriterConfig;
  1153. retVal |= UA_DataSetWriterConfig_copy(dataSetWriterConfig, &tmpDataSetWriterConfig);
  1154. newDataSetWriter->config = tmpDataSetWriterConfig;
  1155. //save the current version of the connected PublishedDataSet
  1156. newDataSetWriter->connectedDataSetVersion = currentDataSetContext->dataSetMetaData.configurationVersion;
  1157. #ifdef UA_ENABLE_PUBSUB_DELTAFRAMES
  1158. //initialize the queue for the last values
  1159. newDataSetWriter->lastSamples = (UA_DataSetWriterSample * )
  1160. UA_calloc(currentDataSetContext->fieldSize, sizeof(UA_DataSetWriterSample));
  1161. if(!newDataSetWriter->lastSamples) {
  1162. UA_DataSetWriterConfig_deleteMembers(&newDataSetWriter->config);
  1163. UA_free(newDataSetWriter);
  1164. return UA_STATUSCODE_BADOUTOFMEMORY;
  1165. }
  1166. newDataSetWriter->lastSamplesCount = currentDataSetContext->fieldSize;
  1167. #endif
  1168. //connect PublishedDataSet with DataSetWriter
  1169. newDataSetWriter->connectedDataSet = currentDataSetContext->identifier;
  1170. newDataSetWriter->linkedWriterGroup = wg->identifier;
  1171. UA_PubSubManager_generateUniqueNodeId(server, &newDataSetWriter->identifier);
  1172. if(writerIdentifier != NULL)
  1173. UA_NodeId_copy(&newDataSetWriter->identifier, writerIdentifier);
  1174. //add the new writer to the group
  1175. LIST_INSERT_HEAD(&wg->writers, newDataSetWriter, listEntry);
  1176. wg->writersCount++;
  1177. #ifdef UA_ENABLE_PUBSUB_INFORMATIONMODEL
  1178. addDataSetWriterRepresentation(server, newDataSetWriter);
  1179. #endif
  1180. return retVal;
  1181. }
  1182. UA_StatusCode
  1183. UA_Server_removeDataSetWriter(UA_Server *server, const UA_NodeId dsw){
  1184. UA_DataSetWriter *dataSetWriter = UA_DataSetWriter_findDSWbyId(server, dsw);
  1185. if(!dataSetWriter)
  1186. return UA_STATUSCODE_BADNOTFOUND;
  1187. UA_WriterGroup *linkedWriterGroup = UA_WriterGroup_findWGbyId(server, dataSetWriter->linkedWriterGroup);
  1188. if(!linkedWriterGroup)
  1189. return UA_STATUSCODE_BADNOTFOUND;
  1190. linkedWriterGroup->writersCount--;
  1191. #ifdef UA_ENABLE_PUBSUB_INFORMATIONMODEL
  1192. removeDataSetWriterRepresentation(server, dataSetWriter);
  1193. #endif
  1194. //remove DataSetWriter from group
  1195. UA_DataSetWriter_deleteMembers(server, dataSetWriter);
  1196. LIST_REMOVE(dataSetWriter, listEntry);
  1197. UA_free(dataSetWriter);
  1198. return UA_STATUSCODE_GOOD;
  1199. }
  1200. /**********************************************/
  1201. /* DataSetField */
  1202. /**********************************************/
  1203. UA_StatusCode
  1204. UA_DataSetFieldConfig_copy(const UA_DataSetFieldConfig *src, UA_DataSetFieldConfig *dst){
  1205. memcpy(dst, src, sizeof(UA_DataSetFieldConfig));
  1206. if(src->dataSetFieldType == UA_PUBSUB_DATASETFIELD_VARIABLE) {
  1207. UA_String_copy(&src->field.variable.fieldNameAlias, &dst->field.variable.fieldNameAlias);
  1208. UA_PublishedVariableDataType_copy(&src->field.variable.publishParameters,
  1209. &dst->field.variable.publishParameters);
  1210. } else {
  1211. return UA_STATUSCODE_BADNOTSUPPORTED;
  1212. }
  1213. return UA_STATUSCODE_GOOD;
  1214. }
  1215. UA_StatusCode
  1216. UA_Server_getDataSetFieldConfig(UA_Server *server, const UA_NodeId dsf,
  1217. UA_DataSetFieldConfig *config) {
  1218. UA_StatusCode retVal = UA_STATUSCODE_GOOD;
  1219. if(!config)
  1220. return UA_STATUSCODE_BADINVALIDARGUMENT;
  1221. UA_DataSetField *currentDataSetField = UA_DataSetField_findDSFbyId(server, dsf);
  1222. if(!currentDataSetField)
  1223. return UA_STATUSCODE_BADNOTFOUND;
  1224. UA_DataSetFieldConfig tmpFieldConfig;
  1225. //deep copy of the actual config
  1226. retVal |= UA_DataSetFieldConfig_copy(&currentDataSetField->config, &tmpFieldConfig);
  1227. *config = tmpFieldConfig;
  1228. return retVal;
  1229. }
  1230. UA_DataSetField *
  1231. UA_DataSetField_findDSFbyId(UA_Server *server, UA_NodeId identifier) {
  1232. for(size_t i = 0; i < server->pubSubManager.publishedDataSetsSize; i++){
  1233. UA_DataSetField *tmpField;
  1234. LIST_FOREACH(tmpField, &server->pubSubManager.publishedDataSets[i].fields, listEntry){
  1235. if(UA_NodeId_equal(&tmpField->identifier, &identifier)){
  1236. return tmpField;
  1237. }
  1238. }
  1239. }
  1240. return NULL;
  1241. }
  1242. void
  1243. UA_DataSetFieldConfig_deleteMembers(UA_DataSetFieldConfig *dataSetFieldConfig){
  1244. if(dataSetFieldConfig->dataSetFieldType == UA_PUBSUB_DATASETFIELD_VARIABLE){
  1245. UA_String_deleteMembers(&dataSetFieldConfig->field.variable.fieldNameAlias);
  1246. UA_PublishedVariableDataType_deleteMembers(&dataSetFieldConfig->field.variable.publishParameters);
  1247. }
  1248. }
  1249. static void
  1250. UA_DataSetField_deleteMembers(UA_DataSetField *field) {
  1251. UA_DataSetFieldConfig_deleteMembers(&field->config);
  1252. //delete DataSetField
  1253. UA_NodeId_deleteMembers(&field->identifier);
  1254. UA_NodeId_deleteMembers(&field->publishedDataSet);
  1255. UA_FieldMetaData_deleteMembers(&field->fieldMetaData);
  1256. }
  1257. /*********************************************************/
  1258. /* PublishValues handling */
  1259. /*********************************************************/
  /**
   * Compare two variants. Internally used for value change detection.
   *
   * The variants are compared through their binary encoding: a different
   * encoded size already counts as a change, equal sizes are compared byte by
   * byte. The encoding buffers live in stack-allocated ByteStrings and are
   * released on every path.
   *
   * @param oldValue Previously stored value
   * @param newValue Freshly sampled value
   * @return true if the value has changed; false if it is unchanged, if
   *         either argument is NULL, or if the comparison could not be
   *         carried out (zero encoded size, allocation or encoding failure)
   */
  #ifdef UA_ENABLE_PUBSUB_DELTAFRAMES
  static UA_Boolean
  valueChangedVariant(UA_Variant *oldValue, UA_Variant *newValue){
      if(! (oldValue && newValue))
          return false;

      size_t oldValueEncodingSize = UA_calcSizeBinary(oldValue, &UA_TYPES[UA_TYPES_VARIANT]);
      size_t newValueEncodingSize = UA_calcSizeBinary(newValue, &UA_TYPES[UA_TYPES_VARIANT]);
      if((oldValueEncodingSize == 0) || (newValueEncodingSize == 0))
          return false;
      /* Nothing has been allocated yet, so these early exits cannot leak */
      if(oldValueEncodingSize != newValueEncodingSize)
          return true;

      UA_ByteString oldValueEncoding;
      UA_ByteString newValueEncoding;
      UA_ByteString_init(&oldValueEncoding);
      UA_ByteString_init(&newValueEncoding);

      UA_Boolean compareResult = false;
      if(UA_ByteString_allocBuffer(&oldValueEncoding, oldValueEncodingSize) == UA_STATUSCODE_GOOD &&
         UA_ByteString_allocBuffer(&newValueEncoding, newValueEncodingSize) == UA_STATUSCODE_GOOD) {
          UA_Byte *bufPosOldValue = oldValueEncoding.data;
          const UA_Byte *bufEndOldValue = &oldValueEncoding.data[oldValueEncoding.length];
          UA_Byte *bufPosNewValue = newValueEncoding.data;
          const UA_Byte *bufEndNewValue = &newValueEncoding.data[newValueEncoding.length];

          if(UA_encodeBinary(oldValue, &UA_TYPES[UA_TYPES_VARIANT],
                             &bufPosOldValue, &bufEndOldValue, NULL, NULL) == UA_STATUSCODE_GOOD &&
             UA_encodeBinary(newValue, &UA_TYPES[UA_TYPES_VARIANT],
                             &bufPosNewValue, &bufEndNewValue, NULL, NULL) == UA_STATUSCODE_GOOD) {
              /* Compare only the bytes that were actually written */
              oldValueEncoding.length = (uintptr_t)bufPosOldValue - (uintptr_t)oldValueEncoding.data;
              newValueEncoding.length = (uintptr_t)bufPosNewValue - (uintptr_t)newValueEncoding.data;
              compareResult = !UA_ByteString_equal(&oldValueEncoding, &newValueEncoding);
          }
      }

      /* Single exit: both buffers are released whether or not encoding worked */
      UA_ByteString_deleteMembers(&oldValueEncoding);
      UA_ByteString_deleteMembers(&newValueEncoding);
      return compareResult;
  }
  #endif
  1302. /**
  1303. * Obtain the latest value for a specific DataSetField. This method is currently
  1304. * called inside the DataSetMessage generation process.
  1305. */
  1306. static void
  1307. UA_PubSubDataSetField_sampleValue(UA_Server *server, UA_DataSetField *field,
  1308. UA_DataValue *value) {
  1309. /* Read the value */
  1310. UA_ReadValueId rvid;
  1311. UA_ReadValueId_init(&rvid);
  1312. rvid.nodeId = field->config.field.variable.publishParameters.publishedVariable;
  1313. rvid.attributeId = field->config.field.variable.publishParameters.attributeId;
  1314. rvid.indexRange = field->config.field.variable.publishParameters.indexRange;
  1315. *value = UA_Server_read(server, &rvid, UA_TIMESTAMPSTORETURN_BOTH);
  1316. }
  1317. static UA_StatusCode
  1318. UA_PubSubDataSetWriter_generateKeyFrameMessage(UA_Server *server, UA_DataSetMessage *dataSetMessage,
  1319. UA_DataSetWriter *dataSetWriter) {
  1320. UA_PublishedDataSet *currentDataSet =
  1321. UA_PublishedDataSet_findPDSbyId(server, dataSetWriter->connectedDataSet);
  1322. if(!currentDataSet)
  1323. return UA_STATUSCODE_BADNOTFOUND;
  1324. /* Prepare DataSetMessageContent */
  1325. dataSetMessage->header.dataSetMessageValid = true;
  1326. dataSetMessage->header.dataSetMessageType = UA_DATASETMESSAGE_DATAKEYFRAME;
  1327. dataSetMessage->data.keyFrameData.fieldCount = currentDataSet->fieldSize;
  1328. dataSetMessage->data.keyFrameData.dataSetFields = (UA_DataValue *)
  1329. UA_Array_new(currentDataSet->fieldSize, &UA_TYPES[UA_TYPES_DATAVALUE]);
  1330. if(!dataSetMessage->data.keyFrameData.dataSetFields)
  1331. return UA_STATUSCODE_BADOUTOFMEMORY;
  1332. #ifdef UA_ENABLE_JSON_ENCODING
  1333. /* json: insert fieldnames used as json keys */
  1334. dataSetMessage->data.keyFrameData.fieldNames =
  1335. (UA_String *)UA_Array_new(currentDataSet->fieldSize, &UA_TYPES[UA_TYPES_STRING]);
  1336. if(!dataSetMessage->data.keyFrameData.fieldNames)
  1337. return UA_STATUSCODE_BADOUTOFMEMORY;
  1338. #endif
  1339. /* Loop over the fields */
  1340. size_t counter = 0;
  1341. UA_DataSetField *dsf;
  1342. LIST_FOREACH(dsf, &currentDataSet->fields, listEntry) {
  1343. #ifdef UA_ENABLE_JSON_ENCODING
  1344. /* json: store the fieldNameAlias*/
  1345. UA_String_copy(&dsf->config.field.variable.fieldNameAlias,
  1346. &dataSetMessage->data.keyFrameData.fieldNames[counter]);
  1347. #endif
  1348. /* Sample the value */
  1349. UA_DataValue *dfv = &dataSetMessage->data.keyFrameData.dataSetFields[counter];
  1350. UA_PubSubDataSetField_sampleValue(server, dsf, dfv);
  1351. /* Deactivate statuscode? */
  1352. if(((u64)dataSetWriter->config.dataSetFieldContentMask & (u64)UA_DATASETFIELDCONTENTMASK_STATUSCODE) == 0)
  1353. dfv->hasStatus = false;
  1354. /* Deactivate timestamps */
  1355. if(((u64)dataSetWriter->config.dataSetFieldContentMask &
  1356. (u64)UA_DATASETFIELDCONTENTMASK_SOURCETIMESTAMP) == 0)
  1357. dfv->hasSourceTimestamp = false;
  1358. if(((u64)dataSetWriter->config.dataSetFieldContentMask &
  1359. (u64)UA_DATASETFIELDCONTENTMASK_SOURCEPICOSECONDS) == 0)
  1360. dfv->hasSourcePicoseconds = false;
  1361. if(((u64)dataSetWriter->config.dataSetFieldContentMask &
  1362. (u64)UA_DATASETFIELDCONTENTMASK_SERVERTIMESTAMP) == 0)
  1363. dfv->hasServerTimestamp = false;
  1364. if(((u64)dataSetWriter->config.dataSetFieldContentMask &
  1365. (u64)UA_DATASETFIELDCONTENTMASK_SERVERPICOSECONDS) == 0)
  1366. dfv->hasServerPicoseconds = false;
  1367. #ifdef UA_ENABLE_PUBSUB_DELTAFRAMES
  1368. /* Update lastValue store */
  1369. UA_DataValue_deleteMembers(&dataSetWriter->lastSamples[counter].value);
  1370. UA_DataValue_copy(dfv, &dataSetWriter->lastSamples[counter].value);
  1371. #endif
  1372. counter++;
  1373. }
  1374. return UA_STATUSCODE_GOOD;
  1375. }
  1376. #ifdef UA_ENABLE_PUBSUB_DELTAFRAMES
  1377. static UA_StatusCode
  1378. UA_PubSubDataSetWriter_generateDeltaFrameMessage(UA_Server *server,
  1379. UA_DataSetMessage *dataSetMessage,
  1380. UA_DataSetWriter *dataSetWriter) {
  1381. UA_PublishedDataSet *currentDataSet =
  1382. UA_PublishedDataSet_findPDSbyId(server, dataSetWriter->connectedDataSet);
  1383. if(!currentDataSet)
  1384. return UA_STATUSCODE_BADNOTFOUND;
  1385. /* Prepare DataSetMessageContent */
  1386. memset(dataSetMessage, 0, sizeof(UA_DataSetMessage));
  1387. dataSetMessage->header.dataSetMessageValid = true;
  1388. dataSetMessage->header.dataSetMessageType = UA_DATASETMESSAGE_DATADELTAFRAME;
  1389. UA_DataSetField *dsf;
  1390. size_t counter = 0;
  1391. LIST_FOREACH(dsf, &currentDataSet->fields, listEntry) {
  1392. /* Sample the value */
  1393. UA_DataValue value;
  1394. UA_DataValue_init(&value);
  1395. UA_PubSubDataSetField_sampleValue(server, dsf, &value);
  1396. /* Check if the value has changed */
  1397. if(valueChangedVariant(&dataSetWriter->lastSamples[counter].value.value, &value.value)) {
  1398. /* increase fieldCount for current delta message */
  1399. dataSetMessage->data.deltaFrameData.fieldCount++;
  1400. dataSetWriter->lastSamples[counter].valueChanged = true;
  1401. /* Update last stored sample */
  1402. UA_DataValue_deleteMembers(&dataSetWriter->lastSamples[counter].value);
  1403. dataSetWriter->lastSamples[counter].value = value;
  1404. } else {
  1405. UA_DataValue_deleteMembers(&value);
  1406. dataSetWriter->lastSamples[counter].valueChanged = false;
  1407. }
  1408. counter++;
  1409. }
  1410. /* Allocate DeltaFrameFields */
  1411. UA_DataSetMessage_DeltaFrameField *deltaFields = (UA_DataSetMessage_DeltaFrameField *)
  1412. UA_calloc(dataSetMessage->data.deltaFrameData.fieldCount, sizeof(UA_DataSetMessage_DeltaFrameField));
  1413. if(!deltaFields)
  1414. return UA_STATUSCODE_BADOUTOFMEMORY;
  1415. dataSetMessage->data.deltaFrameData.deltaFrameFields = deltaFields;
  1416. size_t currentDeltaField = 0;
  1417. for(size_t i = 0; i < currentDataSet->fieldSize; i++) {
  1418. if(!dataSetWriter->lastSamples[i].valueChanged)
  1419. continue;
  1420. UA_DataSetMessage_DeltaFrameField *dff = &deltaFields[currentDeltaField];
  1421. dff->fieldIndex = (UA_UInt16) i;
  1422. UA_DataValue_copy(&dataSetWriter->lastSamples[i].value, &dff->fieldValue);
  1423. dataSetWriter->lastSamples[i].valueChanged = false;
  1424. /* Deactivate statuscode? */
  1425. if(((u64)dataSetWriter->config.dataSetFieldContentMask & (u64)UA_DATASETFIELDCONTENTMASK_STATUSCODE) == 0)
  1426. dff->fieldValue.hasStatus = false;
  1427. /* Deactivate timestamps? */
  1428. if(((u64)dataSetWriter->config.dataSetFieldContentMask & (u64)UA_DATASETFIELDCONTENTMASK_SOURCETIMESTAMP) == 0)
  1429. dff->fieldValue.hasSourceTimestamp = false;
  1430. if(((u64)dataSetWriter->config.dataSetFieldContentMask & (u64)UA_DATASETFIELDCONTENTMASK_SOURCEPICOSECONDS) == 0)
  1431. dff->fieldValue.hasServerPicoseconds = false;
  1432. if(((u64)dataSetWriter->config.dataSetFieldContentMask & (u64)UA_DATASETFIELDCONTENTMASK_SERVERTIMESTAMP) == 0)
  1433. dff->fieldValue.hasServerTimestamp = false;
  1434. if(((u64)dataSetWriter->config.dataSetFieldContentMask & (u64)UA_DATASETFIELDCONTENTMASK_SERVERPICOSECONDS) == 0)
  1435. dff->fieldValue.hasServerPicoseconds = false;
  1436. currentDeltaField++;
  1437. }
  1438. return UA_STATUSCODE_GOOD;
  1439. }
  1440. #endif
  1441. /**
  1442. * Generate a DataSetMessage for the given writer.
  1443. *
  1444. * @param dataSetWriter ptr to corresponding writer
  1445. * @return ptr to generated DataSetMessage
  1446. */
  1447. static UA_StatusCode
  1448. UA_DataSetWriter_generateDataSetMessage(UA_Server *server, UA_DataSetMessage *dataSetMessage,
  1449. UA_DataSetWriter *dataSetWriter) {
  1450. UA_PublishedDataSet *currentDataSet =
  1451. UA_PublishedDataSet_findPDSbyId(server, dataSetWriter->connectedDataSet);
  1452. if(!currentDataSet)
  1453. return UA_STATUSCODE_BADNOTFOUND;
  1454. /* Reset the message */
  1455. memset(dataSetMessage, 0, sizeof(UA_DataSetMessage));
  1456. /* store messageType to switch between json or uadp (default) */
  1457. UA_UInt16 messageType = UA_TYPES_UADPDATASETWRITERMESSAGEDATATYPE;
  1458. UA_JsonDataSetWriterMessageDataType *jsonDataSetWriterMessageDataType = NULL;
  1459. /* The configuration Flags are included
  1460. * inside the std. defined UA_UadpDataSetWriterMessageDataType */
  1461. UA_UadpDataSetWriterMessageDataType defaultUadpConfiguration;
  1462. UA_UadpDataSetWriterMessageDataType *dataSetWriterMessageDataType = NULL;
  1463. if((dataSetWriter->config.messageSettings.encoding == UA_EXTENSIONOBJECT_DECODED ||
  1464. dataSetWriter->config.messageSettings.encoding == UA_EXTENSIONOBJECT_DECODED_NODELETE) &&
  1465. (dataSetWriter->config.messageSettings.content.decoded.type ==
  1466. &UA_TYPES[UA_TYPES_UADPDATASETWRITERMESSAGEDATATYPE])) {
  1467. dataSetWriterMessageDataType = (UA_UadpDataSetWriterMessageDataType *)
  1468. dataSetWriter->config.messageSettings.content.decoded.data;
  1469. /* type is UADP */
  1470. messageType = UA_TYPES_UADPDATASETWRITERMESSAGEDATATYPE;
  1471. } else if((dataSetWriter->config.messageSettings.encoding == UA_EXTENSIONOBJECT_DECODED ||
  1472. dataSetWriter->config.messageSettings.encoding == UA_EXTENSIONOBJECT_DECODED_NODELETE) &&
  1473. (dataSetWriter->config.messageSettings.content.decoded.type ==
  1474. &UA_TYPES[UA_TYPES_JSONDATASETWRITERMESSAGEDATATYPE])) {
  1475. jsonDataSetWriterMessageDataType = (UA_JsonDataSetWriterMessageDataType *)
  1476. dataSetWriter->config.messageSettings.content.decoded.data;
  1477. /* type is JSON */
  1478. messageType = UA_TYPES_JSONDATASETWRITERMESSAGEDATATYPE;
  1479. } else {
  1480. /* create default flag configuration if no
  1481. * UadpDataSetWriterMessageDataType was passed in */
  1482. memset(&defaultUadpConfiguration, 0, sizeof(UA_UadpDataSetWriterMessageDataType));
  1483. defaultUadpConfiguration.dataSetMessageContentMask = (UA_UadpDataSetMessageContentMask)
  1484. ((u64)UA_UADPDATASETMESSAGECONTENTMASK_TIMESTAMP | (u64)UA_UADPDATASETMESSAGECONTENTMASK_MAJORVERSION |
  1485. (u64)UA_UADPDATASETMESSAGECONTENTMASK_MINORVERSION);
  1486. dataSetWriterMessageDataType = &defaultUadpConfiguration;
  1487. }
  1488. /* Sanity-test the configuration */
  1489. if(dataSetWriterMessageDataType &&
  1490. (dataSetWriterMessageDataType->networkMessageNumber != 0 ||
  1491. dataSetWriterMessageDataType->dataSetOffset != 0 ||
  1492. dataSetWriterMessageDataType->configuredSize != 0)) {
  1493. UA_LOG_WARNING(&server->config.logger, UA_LOGCATEGORY_SERVER,
  1494. "Static DSM configuration not supported. Using defaults");
  1495. dataSetWriterMessageDataType->networkMessageNumber = 0;
  1496. dataSetWriterMessageDataType->dataSetOffset = 0;
  1497. dataSetWriterMessageDataType->configuredSize = 0;
  1498. }
  1499. /* The field encoding depends on the flags inside the writer config.
  1500. * TODO: This can be moved to the encoding layer. */
  1501. if(dataSetWriter->config.dataSetFieldContentMask & (u64)UA_DATASETFIELDCONTENTMASK_RAWDATA
  1502. ) {
  1503. dataSetMessage->header.fieldEncoding = UA_FIELDENCODING_RAWDATA;
  1504. } else if((u64)dataSetWriter->config.dataSetFieldContentMask &
  1505. ((u64)UA_DATASETFIELDCONTENTMASK_SOURCETIMESTAMP | (u64)UA_DATASETFIELDCONTENTMASK_SERVERPICOSECONDS |
  1506. (u64)UA_DATASETFIELDCONTENTMASK_SOURCEPICOSECONDS | (u64)UA_DATASETFIELDCONTENTMASK_STATUSCODE)) {
  1507. dataSetMessage->header.fieldEncoding = UA_FIELDENCODING_DATAVALUE;
  1508. } else {
  1509. dataSetMessage->header.fieldEncoding = UA_FIELDENCODING_VARIANT;
  1510. }
  1511. if(messageType == UA_TYPES_UADPDATASETWRITERMESSAGEDATATYPE) {
  1512. /* Std: 'The DataSetMessageContentMask defines the flags for the content of the DataSetMessage header.' */
  1513. if((u64)dataSetWriterMessageDataType->dataSetMessageContentMask &
  1514. (u64)UA_UADPDATASETMESSAGECONTENTMASK_MAJORVERSION) {
  1515. dataSetMessage->header.configVersionMajorVersionEnabled = true;
  1516. dataSetMessage->header.configVersionMajorVersion =
  1517. currentDataSet->dataSetMetaData.configurationVersion.majorVersion;
  1518. }
  1519. if((u64)dataSetWriterMessageDataType->dataSetMessageContentMask &
  1520. (u64)UA_UADPDATASETMESSAGECONTENTMASK_MINORVERSION) {
  1521. dataSetMessage->header.configVersionMinorVersionEnabled = true;
  1522. dataSetMessage->header.configVersionMinorVersion =
  1523. currentDataSet->dataSetMetaData.configurationVersion.minorVersion;
  1524. }
  1525. if((u64)dataSetWriterMessageDataType->dataSetMessageContentMask &
  1526. (u64)UA_UADPDATASETMESSAGECONTENTMASK_SEQUENCENUMBER) {
  1527. dataSetMessage->header.dataSetMessageSequenceNrEnabled = true;
  1528. dataSetMessage->header.dataSetMessageSequenceNr =
  1529. dataSetWriter->actualDataSetMessageSequenceCount;
  1530. }
  1531. if((u64)dataSetWriterMessageDataType->dataSetMessageContentMask &
  1532. (u64)UA_UADPDATASETMESSAGECONTENTMASK_TIMESTAMP) {
  1533. dataSetMessage->header.timestampEnabled = true;
  1534. dataSetMessage->header.timestamp = UA_DateTime_now();
  1535. }
  1536. /* TODO: Picoseconds resolution not supported atm */
  1537. if((u64)dataSetWriterMessageDataType->dataSetMessageContentMask &
  1538. (u64)UA_UADPDATASETMESSAGECONTENTMASK_PICOSECONDS) {
  1539. dataSetMessage->header.picoSecondsIncluded = false;
  1540. }
  1541. /* TODO: Statuscode not supported yet */
  1542. if((u64)dataSetWriterMessageDataType->dataSetMessageContentMask &
  1543. (u64)UA_UADPDATASETMESSAGECONTENTMASK_STATUS) {
  1544. dataSetMessage->header.statusEnabled = false;
  1545. }
  1546. } else if(messageType == UA_TYPES_JSONDATASETWRITERMESSAGEDATATYPE) {
  1547. if((u64)jsonDataSetWriterMessageDataType->dataSetMessageContentMask &
  1548. (u64)UA_JSONDATASETMESSAGECONTENTMASK_METADATAVERSION) {
  1549. dataSetMessage->header.configVersionMajorVersionEnabled = true;
  1550. dataSetMessage->header.configVersionMajorVersion =
  1551. currentDataSet->dataSetMetaData.configurationVersion.majorVersion;
  1552. }
  1553. if((u64)jsonDataSetWriterMessageDataType->dataSetMessageContentMask &
  1554. (u64)UA_JSONDATASETMESSAGECONTENTMASK_METADATAVERSION) {
  1555. dataSetMessage->header.configVersionMinorVersionEnabled = true;
  1556. dataSetMessage->header.configVersionMinorVersion =
  1557. currentDataSet->dataSetMetaData.configurationVersion.minorVersion;
  1558. }
  1559. if((u64)jsonDataSetWriterMessageDataType->dataSetMessageContentMask &
  1560. (u64)UA_JSONDATASETMESSAGECONTENTMASK_SEQUENCENUMBER) {
  1561. dataSetMessage->header.dataSetMessageSequenceNrEnabled = true;
  1562. dataSetMessage->header.dataSetMessageSequenceNr =
  1563. dataSetWriter->actualDataSetMessageSequenceCount;
  1564. }
  1565. if((u64)jsonDataSetWriterMessageDataType->dataSetMessageContentMask &
  1566. (u64)UA_JSONDATASETMESSAGECONTENTMASK_TIMESTAMP) {
  1567. dataSetMessage->header.timestampEnabled = true;
  1568. dataSetMessage->header.timestamp = UA_DateTime_now();
  1569. }
  1570. /* TODO: Statuscode not supported yet */
  1571. if((u64)jsonDataSetWriterMessageDataType->dataSetMessageContentMask &
  1572. (u64)UA_JSONDATASETMESSAGECONTENTMASK_STATUS) {
  1573. dataSetMessage->header.statusEnabled = false;
  1574. }
  1575. }
  1576. /* Set the sequence count. Automatically rolls over to zero */
  1577. dataSetWriter->actualDataSetMessageSequenceCount++;
  1578. /* JSON does not differ between deltaframes and keyframes, only keyframes are currently used. */
  1579. if(messageType != UA_TYPES_JSONDATASETWRITERMESSAGEDATATYPE){
  1580. #ifdef UA_ENABLE_PUBSUB_DELTAFRAMES
  1581. /* Check if the PublishedDataSet version has changed -> if yes flush the lastValue store and send a KeyFrame */
  1582. if(dataSetWriter->connectedDataSetVersion.majorVersion != currentDataSet->dataSetMetaData.configurationVersion.majorVersion ||
  1583. dataSetWriter->connectedDataSetVersion.minorVersion != currentDataSet->dataSetMetaData.configurationVersion.minorVersion) {
  1584. /* Remove old samples */
  1585. for(size_t i = 0; i < dataSetWriter->lastSamplesCount; i++)
  1586. UA_DataValue_deleteMembers(&dataSetWriter->lastSamples[i].value);
  1587. /* Realloc pds dependent memory */
  1588. dataSetWriter->lastSamplesCount = currentDataSet->fieldSize;
  1589. UA_DataSetWriterSample *newSamplesArray = (UA_DataSetWriterSample * )
  1590. UA_realloc(dataSetWriter->lastSamples, sizeof(UA_DataSetWriterSample) * dataSetWriter->lastSamplesCount);
  1591. if(!newSamplesArray)
  1592. return UA_STATUSCODE_BADOUTOFMEMORY;
  1593. dataSetWriter->lastSamples = newSamplesArray;
  1594. memset(dataSetWriter->lastSamples, 0, sizeof(UA_DataSetWriterSample) * dataSetWriter->lastSamplesCount);
  1595. dataSetWriter->connectedDataSetVersion = currentDataSet->dataSetMetaData.configurationVersion;
  1596. UA_PubSubDataSetWriter_generateKeyFrameMessage(server, dataSetMessage, dataSetWriter);
  1597. dataSetWriter->deltaFrameCounter = 0;
  1598. return UA_STATUSCODE_GOOD;
  1599. }
  1600. /* The standard defines: if a PDS contains only one fields no delta messages
  1601. * should be generated because they need more memory than a keyframe with 1
  1602. * field. */
  1603. if(currentDataSet->fieldSize > 1 && dataSetWriter->deltaFrameCounter > 0 &&
  1604. dataSetWriter->deltaFrameCounter <= dataSetWriter->config.keyFrameCount) {
  1605. UA_PubSubDataSetWriter_generateDeltaFrameMessage(server, dataSetMessage, dataSetWriter);
  1606. dataSetWriter->deltaFrameCounter++;
  1607. return UA_STATUSCODE_GOOD;
  1608. }
  1609. dataSetWriter->deltaFrameCounter = 1;
  1610. #endif
  1611. }
  1612. UA_PubSubDataSetWriter_generateKeyFrameMessage(server, dataSetMessage, dataSetWriter);
  1613. return UA_STATUSCODE_GOOD;
  1614. }
  1615. static UA_StatusCode
  1616. sendNetworkMessageJson(UA_PubSubConnection *connection, UA_DataSetMessage *dsm,
  1617. UA_UInt16 *writerIds, UA_Byte dsmCount, UA_ExtensionObject *transportSettings) {
  1618. UA_StatusCode retval = UA_STATUSCODE_BADNOTSUPPORTED;
  1619. #ifdef UA_ENABLE_JSON_ENCODING
  1620. UA_NetworkMessage nm;
  1621. memset(&nm, 0, sizeof(UA_NetworkMessage));
  1622. nm.version = 1;
  1623. nm.networkMessageType = UA_NETWORKMESSAGE_DATASET;
  1624. nm.payloadHeaderEnabled = true;
  1625. nm.payloadHeader.dataSetPayloadHeader.count = dsmCount;
  1626. nm.payloadHeader.dataSetPayloadHeader.dataSetWriterIds = writerIds;
  1627. nm.payload.dataSetPayload.dataSetMessages = dsm;
  1628. /* Allocate the buffer. Allocate on the stack if the buffer is small. */
  1629. UA_ByteString buf;
  1630. size_t msgSize = UA_NetworkMessage_calcSizeJson(&nm, NULL, 0, NULL, 0, true);
  1631. size_t stackSize = 1;
  1632. if(msgSize <= UA_MAX_STACKBUF)
  1633. stackSize = msgSize;
  1634. UA_STACKARRAY(UA_Byte, stackBuf, stackSize);
  1635. buf.data = stackBuf;
  1636. buf.length = msgSize;
  1637. if(msgSize > UA_MAX_STACKBUF) {
  1638. retval = UA_ByteString_allocBuffer(&buf, msgSize);
  1639. if(retval != UA_STATUSCODE_GOOD)
  1640. return retval;
  1641. }
  1642. /* Encode the message */
  1643. UA_Byte *bufPos = buf.data;
  1644. memset(bufPos, 0, msgSize);
  1645. const UA_Byte *bufEnd = &buf.data[buf.length];
  1646. retval = UA_NetworkMessage_encodeJson(&nm, &bufPos, &bufEnd, NULL, 0, NULL, 0, true);
  1647. if(retval != UA_STATUSCODE_GOOD) {
  1648. if(msgSize > UA_MAX_STACKBUF)
  1649. UA_ByteString_deleteMembers(&buf);
  1650. return retval;
  1651. }
  1652. /* Send the prepared messages */
  1653. retval = connection->channel->send(connection->channel, transportSettings, &buf);
  1654. if(msgSize > UA_MAX_STACKBUF)
  1655. UA_ByteString_deleteMembers(&buf);
  1656. #endif
  1657. return retval;
  1658. }
  1659. static UA_StatusCode
  1660. sendNetworkMessage(UA_PubSubConnection *connection, UA_WriterGroup *wg,
  1661. UA_DataSetMessage *dsm, UA_UInt16 *writerIds, UA_Byte dsmCount,
  1662. UA_ExtensionObject *messageSettings,
  1663. UA_ExtensionObject *transportSettings) {
  1664. if(messageSettings->content.decoded.type !=
  1665. &UA_TYPES[UA_TYPES_UADPWRITERGROUPMESSAGEDATATYPE])
  1666. return UA_STATUSCODE_BADINTERNALERROR;
  1667. UA_UadpWriterGroupMessageDataType *wgm = (UA_UadpWriterGroupMessageDataType*)
  1668. messageSettings->content.decoded.data;
  1669. UA_NetworkMessage nm;
  1670. memset(&nm, 0, sizeof(UA_NetworkMessage));
  1671. nm.publisherIdEnabled =
  1672. ((u64)wgm->networkMessageContentMask & (u64)UA_UADPNETWORKMESSAGECONTENTMASK_PUBLISHERID) != 0;
  1673. nm.groupHeaderEnabled =
  1674. ((u64)wgm->networkMessageContentMask & (u64)UA_UADPNETWORKMESSAGECONTENTMASK_GROUPHEADER) != 0;
  1675. nm.groupHeader.writerGroupIdEnabled =
  1676. ((u64)wgm->networkMessageContentMask & (u64)UA_UADPNETWORKMESSAGECONTENTMASK_WRITERGROUPID) != 0;
  1677. nm.groupHeader.groupVersionEnabled =
  1678. ((u64)wgm->networkMessageContentMask & (u64)UA_UADPNETWORKMESSAGECONTENTMASK_GROUPVERSION) != 0;
  1679. nm.groupHeader.networkMessageNumberEnabled =
  1680. ((u64)wgm->networkMessageContentMask & (u64)UA_UADPNETWORKMESSAGECONTENTMASK_NETWORKMESSAGENUMBER) != 0;
  1681. nm.groupHeader.sequenceNumberEnabled =
  1682. ((u64)wgm->networkMessageContentMask & (u64)UA_UADPNETWORKMESSAGECONTENTMASK_SEQUENCENUMBER) != 0;
  1683. nm.payloadHeaderEnabled =
  1684. ((u64)wgm->networkMessageContentMask & (u64)UA_UADPNETWORKMESSAGECONTENTMASK_PAYLOADHEADER) != 0;
  1685. nm.timestampEnabled =
  1686. ((u64)wgm->networkMessageContentMask & (u64)UA_UADPNETWORKMESSAGECONTENTMASK_TIMESTAMP) != 0;
  1687. nm.picosecondsEnabled =
  1688. ((u64)wgm->networkMessageContentMask & (u64)UA_UADPNETWORKMESSAGECONTENTMASK_PICOSECONDS) != 0;
  1689. nm.dataSetClassIdEnabled =
  1690. ((u64)wgm->networkMessageContentMask & (u64)UA_UADPNETWORKMESSAGECONTENTMASK_DATASETCLASSID) != 0;
  1691. nm.promotedFieldsEnabled =
  1692. ((u64)wgm->networkMessageContentMask & (u64)UA_UADPNETWORKMESSAGECONTENTMASK_PROMOTEDFIELDS) != 0;
  1693. nm.version = 1;
  1694. nm.networkMessageType = UA_NETWORKMESSAGE_DATASET;
  1695. if(connection->config->publisherIdType == UA_PUBSUB_PUBLISHERID_NUMERIC) {
  1696. nm.publisherIdType = UA_PUBLISHERDATATYPE_UINT16;
  1697. nm.publisherId.publisherIdUInt32 = connection->config->publisherId.numeric;
  1698. } else if(connection->config->publisherIdType == UA_PUBSUB_PUBLISHERID_STRING){
  1699. nm.publisherIdType = UA_PUBLISHERDATATYPE_STRING;
  1700. nm.publisherId.publisherIdString = connection->config->publisherId.string;
  1701. }
  1702. /* Compute the length of the dsm separately for the header */
  1703. UA_STACKARRAY(UA_UInt16, dsmLengths, dsmCount);
  1704. for(UA_Byte i = 0; i < dsmCount; i++)
  1705. dsmLengths[i] = (UA_UInt16)UA_DataSetMessage_calcSizeBinary(&dsm[i]);
  1706. nm.payloadHeader.dataSetPayloadHeader.count = dsmCount;
  1707. nm.payloadHeader.dataSetPayloadHeader.dataSetWriterIds = writerIds;
  1708. nm.groupHeader.writerGroupId = wg->config.writerGroupId;
  1709. nm.groupHeader.networkMessageNumber = 1;
  1710. nm.payload.dataSetPayload.sizes = dsmLengths;
  1711. nm.payload.dataSetPayload.dataSetMessages = dsm;
  1712. /* Allocate the buffer. Allocate on the stack if the buffer is small. */
  1713. UA_ByteString buf;
  1714. size_t msgSize = UA_NetworkMessage_calcSizeBinary(&nm);
  1715. size_t stackSize = 1;
  1716. if(msgSize <= UA_MAX_STACKBUF)
  1717. stackSize = msgSize;
  1718. UA_STACKARRAY(UA_Byte, stackBuf, stackSize);
  1719. buf.data = stackBuf;
  1720. buf.length = msgSize;
  1721. UA_StatusCode retval;
  1722. if(msgSize > UA_MAX_STACKBUF) {
  1723. retval = UA_ByteString_allocBuffer(&buf, msgSize);
  1724. if(retval != UA_STATUSCODE_GOOD)
  1725. return retval;
  1726. }
  1727. /* Encode the message */
  1728. UA_Byte *bufPos = buf.data;
  1729. memset(bufPos, 0, msgSize);
  1730. const UA_Byte *bufEnd = &buf.data[buf.length];
  1731. retval = UA_NetworkMessage_encodeBinary(&nm, &bufPos, bufEnd);
  1732. if(retval != UA_STATUSCODE_GOOD) {
  1733. if(msgSize > UA_MAX_STACKBUF)
  1734. UA_ByteString_deleteMembers(&buf);
  1735. return retval;
  1736. }
  1737. /* Send the prepared messages */
  1738. retval = connection->channel->send(connection->channel, transportSettings, &buf);
  1739. if(msgSize > UA_MAX_STACKBUF)
  1740. UA_ByteString_deleteMembers(&buf);
  1741. return retval;
  1742. }
  1743. /* This callback triggers the collection and publish of NetworkMessages and the
  1744. * contained DataSetMessages. */
  1745. void
  1746. UA_WriterGroup_publishCallback(UA_Server *server, UA_WriterGroup *writerGroup) {
  1747. UA_LOG_DEBUG(&server->config.logger, UA_LOGCATEGORY_SERVER, "Publish Callback");
  1748. if(!writerGroup) {
  1749. UA_LOG_WARNING(&server->config.logger, UA_LOGCATEGORY_SERVER,
  1750. "Publish failed. WriterGroup not found");
  1751. return;
  1752. }
  1753. /* Nothing to do? */
  1754. if(writerGroup->writersCount <= 0)
  1755. return;
  1756. /* Binary or Json encoding? */
  1757. if(writerGroup->config.encodingMimeType != UA_PUBSUB_ENCODING_UADP &&
  1758. writerGroup->config.encodingMimeType != UA_PUBSUB_ENCODING_JSON) {
  1759. UA_LOG_WARNING(&server->config.logger, UA_LOGCATEGORY_SERVER,
  1760. "Publish failed: Unknown encoding type.");
  1761. return;
  1762. }
  1763. /* Find the connection associated with the writer */
  1764. UA_PubSubConnection *connection =
  1765. UA_PubSubConnection_findConnectionbyId(server, writerGroup->linkedConnection);
  1766. if(!connection) {
  1767. UA_LOG_WARNING(&server->config.logger, UA_LOGCATEGORY_SERVER,
  1768. "Publish failed. PubSubConnection invalid.");
  1769. return;
  1770. }
  1771. /* How many DSM can be sent in one NM? */
  1772. UA_Byte maxDSM = (UA_Byte)writerGroup->config.maxEncapsulatedDataSetMessageCount;
  1773. if(writerGroup->config.maxEncapsulatedDataSetMessageCount > UA_BYTE_MAX)
  1774. maxDSM = UA_BYTE_MAX;
  1775. /* If the maxEncapsulatedDataSetMessageCount is set to 0->1 */
  1776. if(maxDSM == 0)
  1777. maxDSM = 1;
  1778. /* It is possible to put several DataSetMessages into one NetworkMessage.
  1779. * But only if they do not contain promoted fields. NM with only DSM are
  1780. * sent out right away. The others are kept in a buffer for "batching". */
  1781. size_t dsmCount = 0;
  1782. UA_DataSetWriter *dsw;
  1783. UA_STACKARRAY(UA_UInt16, dsWriterIds, writerGroup->writersCount);
  1784. UA_STACKARRAY(UA_DataSetMessage, dsmStore, writerGroup->writersCount);
  1785. LIST_FOREACH(dsw, &writerGroup->writers, listEntry) {
  1786. /* Find the dataset */
  1787. UA_PublishedDataSet *pds =
  1788. UA_PublishedDataSet_findPDSbyId(server, dsw->connectedDataSet);
  1789. if(!pds) {
  1790. UA_LOG_WARNING(&server->config.logger, UA_LOGCATEGORY_SERVER,
  1791. "PubSub Publish: PublishedDataSet not found");
  1792. continue;
  1793. }
  1794. /* Generate the DSM */
  1795. UA_StatusCode res =
  1796. UA_DataSetWriter_generateDataSetMessage(server, &dsmStore[dsmCount], dsw);
  1797. if(res != UA_STATUSCODE_GOOD) {
  1798. UA_LOG_WARNING(&server->config.logger, UA_LOGCATEGORY_SERVER,
  1799. "PubSub Publish: DataSetMessage creation failed");
  1800. continue;
  1801. }
  1802. /* Send right away if there is only this DSM in a NM. If promoted fields
  1803. * are contained in the PublishedDataSet, then this DSM must go into a
  1804. * dedicated NM as well. */
  1805. if(pds->promotedFieldsCount > 0 || maxDSM == 1) {
  1806. if(writerGroup->config.encodingMimeType == UA_PUBSUB_ENCODING_UADP){
  1807. res = sendNetworkMessage(connection, writerGroup, &dsmStore[dsmCount],
  1808. &dsw->config.dataSetWriterId, 1,
  1809. &writerGroup->config.messageSettings,
  1810. &writerGroup->config.transportSettings);
  1811. }else if(writerGroup->config.encodingMimeType == UA_PUBSUB_ENCODING_JSON){
  1812. res = sendNetworkMessageJson(connection, &dsmStore[dsmCount],
  1813. &dsw->config.dataSetWriterId, 1, &writerGroup->config.transportSettings);
  1814. }
  1815. if(res != UA_STATUSCODE_GOOD)
  1816. UA_LOG_WARNING(&server->config.logger, UA_LOGCATEGORY_SERVER,
  1817. "PubSub Publish: Could not send a NetworkMessage");
  1818. UA_DataSetMessage_free(&dsmStore[dsmCount]);
  1819. continue;
  1820. }
  1821. dsWriterIds[dsmCount] = dsw->config.dataSetWriterId;
  1822. dsmCount++;
  1823. }
  1824. /* Send the NetworkMessages with batched DataSetMessages */
  1825. size_t nmCount = (dsmCount / maxDSM) + ((dsmCount % maxDSM) == 0 ? 0 : 1);
  1826. for(UA_UInt32 i = 0; i < nmCount; i++) {
  1827. UA_Byte nmDsmCount = maxDSM;
  1828. if(i == nmCount - 1 && (dsmCount % maxDSM))
  1829. nmDsmCount = (UA_Byte)dsmCount % maxDSM;
  1830. UA_StatusCode res3 = UA_STATUSCODE_GOOD;
  1831. if(writerGroup->config.encodingMimeType == UA_PUBSUB_ENCODING_UADP){
  1832. res3 = sendNetworkMessage(connection, writerGroup, &dsmStore[i * maxDSM],
  1833. &dsWriterIds[i * maxDSM], nmDsmCount,
  1834. &writerGroup->config.messageSettings,
  1835. &writerGroup->config.transportSettings);
  1836. }else if(writerGroup->config.encodingMimeType == UA_PUBSUB_ENCODING_JSON){
  1837. res3 = sendNetworkMessageJson(connection, &dsmStore[i * maxDSM],
  1838. &dsWriterIds[i * maxDSM], nmDsmCount, &writerGroup->config.transportSettings);
  1839. }
  1840. if(res3 != UA_STATUSCODE_GOOD)
  1841. UA_LOG_WARNING(&server->config.logger, UA_LOGCATEGORY_SERVER,
  1842. "PubSub Publish: Sending a NetworkMessage failed");
  1843. }
  1844. /* Clean up DSM */
  1845. for(size_t i = 0; i < dsmCount; i++)
  1846. UA_DataSetMessage_free(&dsmStore[i]);
  1847. }
  1848. /* Add new publishCallback. The first execution is triggered directly after
  1849. * creation. */
  1850. UA_StatusCode
  1851. UA_WriterGroup_addPublishCallback(UA_Server *server, UA_WriterGroup *writerGroup) {
  1852. UA_StatusCode retval =
  1853. UA_PubSubManager_addRepeatedCallback(server,
  1854. (UA_ServerCallback) UA_WriterGroup_publishCallback,
  1855. writerGroup, writerGroup->config.publishingInterval,
  1856. &writerGroup->publishCallbackId);
  1857. if(retval == UA_STATUSCODE_GOOD)
  1858. writerGroup->publishCallbackIsRegistered = true;
  1859. /* Run once after creation */
  1860. UA_WriterGroup_publishCallback(server, writerGroup);
  1861. return retval;
  1862. }
  1863. /* This callback triggers the collection and reception of NetworkMessages and the
  1864. * contained DataSetMessages. */
  1865. void UA_ReaderGroup_subscribeCallback(UA_Server *server, UA_ReaderGroup *readerGroup) {
  1866. UA_PubSubConnection *connection = UA_PubSubConnection_findConnectionbyId(server, readerGroup->linkedConnection);
  1867. UA_ByteString buffer;
  1868. if(UA_ByteString_allocBuffer(&buffer, 512) != UA_STATUSCODE_GOOD) {
  1869. UA_LOG_INFO(&server->config.logger, UA_LOGCATEGORY_SERVER, "Message buffer alloc failed!");
  1870. return;
  1871. }
  1872. connection->channel->receive(connection->channel, &buffer, NULL, 300000);
  1873. if(buffer.length > 0) {
  1874. UA_LOG_INFO(&server->config.logger, UA_LOGCATEGORY_USERLAND, "Message received:");
  1875. UA_NetworkMessage currentNetworkMessage;
  1876. memset(&currentNetworkMessage, 0, sizeof(UA_NetworkMessage));
  1877. size_t currentPosition = 0;
  1878. UA_NetworkMessage_decodeBinary(&buffer, &currentPosition, &currentNetworkMessage);
  1879. UA_Server_processNetworkMessage(server, &currentNetworkMessage, connection);
  1880. UA_NetworkMessage_deleteMembers(&currentNetworkMessage);
  1881. }
  1882. UA_ByteString_deleteMembers(&buffer);
  1883. }
  1884. /* Add new subscribeCallback. The first execution is triggered directly after
  1885. * creation. */
  1886. UA_StatusCode
  1887. UA_ReaderGroup_addSubscribeCallback(UA_Server *server, UA_ReaderGroup *readerGroup) {
  1888. UA_StatusCode retval = UA_STATUSCODE_GOOD;
  1889. retval |= UA_PubSubManager_addRepeatedCallback(server, (UA_ServerCallback) UA_ReaderGroup_subscribeCallback,
  1890. readerGroup, 5,
  1891. &readerGroup->subscribeCallbackId);
  1892. if(retval == UA_STATUSCODE_GOOD) {
  1893. readerGroup->subscribeCallbackIsRegistered = true;
  1894. }
  1895. /* Run once after creation */
  1896. UA_ReaderGroup_subscribeCallback(server, readerGroup);
  1897. return retval;
  1898. }
  1899. #endif /* UA_ENABLE_PUBSUB */