ua_pubsub_reader.c 29 KB


  1. /* This Source Code Form is subject to the terms of the Mozilla Public
  2. * License, v. 2.0. If a copy of the MPL was not distributed with this
  3. * file, You can obtain one at http://mozilla.org/MPL/2.0/.
  4. *
  5. * Copyright (c) 2017-2018 Fraunhofer IOSB (Author: Andreas Ebner)
  6. * Copyright (c) 2019 Fraunhofer IOSB (Author: Julius Pfrommer)
  7. * Copyright (c) 2019 Kalycito Infotech Private Limited
  8. */
  9. #include <open62541/server_pubsub.h>
  10. #include "server/ua_server_internal.h"
  11. #ifdef UA_ENABLE_PUBSUB /* conditional compilation */
  12. #include "ua_pubsub.h"
  13. #ifdef UA_ENABLE_PUBSUB_INFORMATIONMODEL
  14. #include "ua_pubsub_ns0.h"
  15. #endif
  16. #ifdef UA_ENABLE_PUBSUB_DELTAFRAMES
  17. #include "ua_types_encoding_binary.h"
  18. #endif
  19. #define UA_MAX_SIZENAME 64 /* Max size of Qualified Name of Subscribed Variable */
  20. /***************/
  21. /* ReaderGroup */
  22. /***************/
  23. UA_StatusCode
  24. UA_Server_addReaderGroup(UA_Server *server, UA_NodeId connectionIdentifier,
  25. const UA_ReaderGroupConfig *readerGroupConfig,
  26. UA_NodeId *readerGroupIdentifier) {
  27. UA_StatusCode retval = UA_STATUSCODE_GOOD;
  28. UA_ReaderGroupConfig tmpReaderGroupConfig;
  29. /* Check for valid readergroup configuration */
  30. if(!readerGroupConfig) {
  31. return UA_STATUSCODE_BADINVALIDARGUMENT;
  32. }
  33. /* Search the connection by the given connectionIdentifier */
  34. UA_PubSubConnection *currentConnectionContext =
  35. UA_PubSubConnection_findConnectionbyId(server, connectionIdentifier);
  36. if(!currentConnectionContext) {
  37. return UA_STATUSCODE_BADNOTFOUND;
  38. }
  39. /* Allocate memory for new reader group */
  40. UA_ReaderGroup *newGroup = (UA_ReaderGroup *)UA_calloc(1, sizeof(UA_ReaderGroup));
  41. if(!newGroup) {
  42. return UA_STATUSCODE_BADOUTOFMEMORY;
  43. }
  44. /* Generate nodeid for the readergroup identifier */
  45. newGroup->linkedConnection = currentConnectionContext->identifier;
  46. UA_PubSubManager_generateUniqueNodeId(server, &newGroup->identifier);
  47. if(readerGroupIdentifier) {
  48. UA_NodeId_copy(&newGroup->identifier, readerGroupIdentifier);
  49. }
  50. /* Deep copy of the config */
  51. retval |= UA_ReaderGroupConfig_copy(readerGroupConfig, &tmpReaderGroupConfig);
  52. newGroup->config = tmpReaderGroupConfig;
  53. retval |= UA_ReaderGroup_addSubscribeCallback(server, newGroup);
  54. LIST_INSERT_HEAD(&currentConnectionContext->readerGroups, newGroup, listEntry);
  55. currentConnectionContext->readerGroupsSize++;
  56. #ifdef UA_ENABLE_PUBSUB_INFORMATIONMODEL
  57. addReaderGroupRepresentation(server, newGroup);
  58. #endif
  59. return retval;
  60. }
  61. UA_StatusCode
  62. UA_Server_removeReaderGroup(UA_Server *server, UA_NodeId groupIdentifier) {
  63. UA_ReaderGroup* readerGroup = UA_ReaderGroup_findRGbyId(server, groupIdentifier);
  64. if(readerGroup == NULL) {
  65. return UA_STATUSCODE_BADNOTFOUND;
  66. }
  67. /* Search the connection to which the given readergroup is connected to */
  68. UA_PubSubConnection *connection =
  69. UA_PubSubConnection_findConnectionbyId(server, readerGroup->linkedConnection);
  70. if(connection == NULL) {
  71. return UA_STATUSCODE_BADNOTFOUND;
  72. }
  73. /* Unregister subscribe callback */
  74. UA_PubSubManager_removeRepeatedPubSubCallback(server, readerGroup->subscribeCallbackId);
  75. #ifdef UA_ENABLE_PUBSUB_INFORMATIONMODEL
  76. /* To Do:RemoveGroupRepresentation(server, &readerGroup->identifier) */
  77. #endif
  78. /* UA_Server_ReaderGroup_delete also removes itself from the list */
  79. UA_Server_ReaderGroup_delete(server, readerGroup);
  80. /* Remove readerGroup from Connection */
  81. LIST_REMOVE(readerGroup, listEntry);
  82. UA_free(readerGroup);
  83. return UA_STATUSCODE_GOOD;
  84. }
  85. /* TODO: Implement
  86. UA_StatusCode
  87. UA_Server_ReaderGroup_updateConfig(UA_Server *server, UA_NodeId readerGroupIdentifier,
  88. const UA_ReaderGroupConfig *config) {
  89. return UA_STATUSCODE_BADNOTIMPLEMENTED;
  90. }
  91. */
  92. UA_StatusCode
  93. UA_Server_ReaderGroup_getConfig(UA_Server *server, UA_NodeId readerGroupIdentifier,
  94. UA_ReaderGroupConfig *config) {
  95. if(!config) {
  96. return UA_STATUSCODE_BADINVALIDARGUMENT;
  97. }
  98. /* Identify the readergroup through the readerGroupIdentifier */
  99. UA_ReaderGroup *currentReaderGroup = UA_ReaderGroup_findRGbyId(server, readerGroupIdentifier);
  100. if(!currentReaderGroup) {
  101. return UA_STATUSCODE_BADNOTFOUND;
  102. }
  103. UA_ReaderGroupConfig tmpReaderGroupConfig;
  104. /* deep copy of the actual config */
  105. UA_ReaderGroupConfig_copy(&currentReaderGroup->config, &tmpReaderGroupConfig);
  106. *config = tmpReaderGroupConfig;
  107. return UA_STATUSCODE_GOOD;
  108. }
  109. void
  110. UA_Server_ReaderGroup_delete(UA_Server* server, UA_ReaderGroup *readerGroup) {
  111. /* To Do Call UA_ReaderGroupConfig_delete */
  112. UA_DataSetReader *dataSetReader, *tmpDataSetReader;
  113. LIST_FOREACH_SAFE(dataSetReader, &readerGroup->readers, listEntry, tmpDataSetReader) {
  114. UA_DataSetReader_delete(server, dataSetReader);
  115. }
  116. UA_PubSubConnection* pConn =
  117. UA_PubSubConnection_findConnectionbyId(server, readerGroup->linkedConnection);
  118. if(pConn != NULL) {
  119. pConn->readerGroupsSize--;
  120. }
  121. /* Delete ReaderGroup and its members */
  122. UA_String_deleteMembers(&readerGroup->config.name);
  123. UA_NodeId_deleteMembers(&readerGroup->linkedConnection);
  124. UA_NodeId_deleteMembers(&readerGroup->identifier);
  125. }
  126. UA_StatusCode
  127. UA_ReaderGroupConfig_copy(const UA_ReaderGroupConfig *src,
  128. UA_ReaderGroupConfig *dst) {
  129. /* Currently simple memcpy only */
  130. memcpy(&dst->securityParameters, &src->securityParameters, sizeof(UA_PubSubSecurityParameters));
  131. UA_String_copy(&src->name, &dst->name);
  132. return UA_STATUSCODE_GOOD;
  133. }
  134. static UA_StatusCode
  135. checkReaderIdentifier(UA_Server *server, UA_NetworkMessage *pMsg, UA_DataSetReader *reader) {
  136. if(!pMsg->groupHeaderEnabled &&
  137. !pMsg->groupHeader.writerGroupIdEnabled &&
  138. !pMsg->payloadHeaderEnabled) {
  139. UA_LOG_INFO(&server->config.logger, UA_LOGCATEGORY_SERVER,
  140. "Cannot process DataSetReader without WriterGroup"
  141. "and DataSetWriter identifiers");
  142. return UA_STATUSCODE_BADNOTIMPLEMENTED;
  143. }
  144. if((reader->config.writerGroupId == pMsg->groupHeader.writerGroupId) &&
  145. (reader->config.dataSetWriterId == *pMsg->payloadHeader.dataSetPayloadHeader.dataSetWriterIds)) {
  146. UA_LOG_DEBUG(&server->config.logger, UA_LOGCATEGORY_SERVER,
  147. "DataSetReader found. Process NetworkMessage");
  148. return UA_STATUSCODE_GOOD;
  149. }
  150. return UA_STATUSCODE_BADNOTFOUND;
  151. }
  152. static UA_StatusCode
  153. getReaderFromIdentifier(UA_Server *server, UA_NetworkMessage *pMsg,
  154. UA_DataSetReader **dataSetReader, UA_PubSubConnection *pConnection) {
  155. UA_StatusCode retval = UA_STATUSCODE_BADNOTFOUND;
  156. if(!pMsg->publisherIdEnabled) {
  157. UA_LOG_INFO(&server->config.logger, UA_LOGCATEGORY_SERVER,
  158. "Cannot process DataSetReader without PublisherId");
  159. return UA_STATUSCODE_BADNOTIMPLEMENTED; /* TODO: Handle DSR without PublisherId */
  160. }
  161. UA_ReaderGroup* readerGroup;
  162. LIST_FOREACH(readerGroup, &pConnection->readerGroups, listEntry) {
  163. UA_DataSetReader *tmpReader;
  164. LIST_FOREACH(tmpReader, &readerGroup->readers, listEntry) {
  165. switch (pMsg->publisherIdType) {
  166. case UA_PUBLISHERDATATYPE_BYTE:
  167. if(tmpReader->config.publisherId.type == &UA_TYPES[UA_TYPES_BYTE] &&
  168. pMsg->publisherIdType == UA_PUBLISHERDATATYPE_BYTE &&
  169. pMsg->publisherId.publisherIdByte == *(UA_Byte*)tmpReader->config.publisherId.data) {
  170. retval = checkReaderIdentifier(server, pMsg, tmpReader);
  171. }
  172. break;
  173. case UA_PUBLISHERDATATYPE_UINT16:
  174. if(tmpReader->config.publisherId.type == &UA_TYPES[UA_TYPES_UINT16] &&
  175. pMsg->publisherIdType == UA_PUBLISHERDATATYPE_UINT16 &&
  176. pMsg->publisherId.publisherIdUInt16 == *(UA_UInt16*) tmpReader->config.publisherId.data) {
  177. retval = checkReaderIdentifier(server, pMsg, tmpReader);
  178. }
  179. break;
  180. case UA_PUBLISHERDATATYPE_UINT32:
  181. if(tmpReader->config.publisherId.type == &UA_TYPES[UA_TYPES_UINT32] &&
  182. pMsg->publisherIdType == UA_PUBLISHERDATATYPE_UINT32 &&
  183. pMsg->publisherId.publisherIdUInt32 == *(UA_UInt32*)tmpReader->config.publisherId.data) {
  184. retval = checkReaderIdentifier(server, pMsg, tmpReader);
  185. }
  186. break;
  187. case UA_PUBLISHERDATATYPE_UINT64:
  188. if(tmpReader->config.publisherId.type == &UA_TYPES[UA_TYPES_UINT64] &&
  189. pMsg->publisherIdType == UA_PUBLISHERDATATYPE_UINT64 &&
  190. pMsg->publisherId.publisherIdUInt64 == *(UA_UInt64*)tmpReader->config.publisherId.data) {
  191. retval = checkReaderIdentifier(server, pMsg, tmpReader);
  192. }
  193. break;
  194. case UA_PUBLISHERDATATYPE_STRING:
  195. if(tmpReader->config.publisherId.type == &UA_TYPES[UA_TYPES_STRING] &&
  196. pMsg->publisherIdType == UA_PUBLISHERDATATYPE_STRING &&
  197. UA_String_equal(&pMsg->publisherId.publisherIdString,
  198. (UA_String*)tmpReader->config.publisherId.data)) {
  199. retval = checkReaderIdentifier(server, pMsg, tmpReader);
  200. }
  201. break;
  202. default:
  203. return UA_STATUSCODE_BADINTERNALERROR;
  204. }
  205. if(retval == UA_STATUSCODE_GOOD) {
  206. *dataSetReader = tmpReader;
  207. return UA_STATUSCODE_GOOD;
  208. }
  209. }
  210. }
  211. UA_LOG_INFO(&server->config.logger, UA_LOGCATEGORY_SERVER,
  212. "Dataset reader not found. Check PublisherID, WriterGroupID and DatasetWriterID");
  213. return UA_STATUSCODE_BADNOTFOUND;
  214. }
  215. UA_ReaderGroup *
  216. UA_ReaderGroup_findRGbyId(UA_Server *server, UA_NodeId identifier) {
  217. for(size_t iteratorConn = 0; iteratorConn < server->pubSubManager.connectionsSize; iteratorConn++) {
  218. UA_ReaderGroup* readerGroup = NULL;
  219. LIST_FOREACH(readerGroup, &server->pubSubManager.connections[iteratorConn].readerGroups, listEntry) {
  220. if(UA_NodeId_equal(&identifier, &readerGroup->identifier)) {
  221. return readerGroup;
  222. }
  223. }
  224. }
  225. return NULL;
  226. }
  227. UA_DataSetReader *UA_ReaderGroup_findDSRbyId(UA_Server *server, UA_NodeId identifier) {
  228. for(size_t i = 0; i < server->pubSubManager.connectionsSize; i++) {
  229. UA_ReaderGroup* readerGroup = NULL;
  230. LIST_FOREACH(readerGroup, &server->pubSubManager.connections[i].readerGroups, listEntry) {
  231. UA_DataSetReader *tmpReader;
  232. LIST_FOREACH(tmpReader, &readerGroup->readers, listEntry) {
  233. if(UA_NodeId_equal(&tmpReader->identifier, &identifier)) {
  234. return tmpReader;
  235. }
  236. }
  237. }
  238. }
  239. return NULL;
  240. }
  241. /* This callback triggers the collection and reception of NetworkMessages and the
  242. * contained DataSetMessages. */
  243. void UA_ReaderGroup_subscribeCallback(UA_Server *server, UA_ReaderGroup *readerGroup) {
  244. UA_PubSubConnection *connection =
  245. UA_PubSubConnection_findConnectionbyId(server, readerGroup->linkedConnection);
  246. UA_ByteString buffer;
  247. if(UA_ByteString_allocBuffer(&buffer, 512) != UA_STATUSCODE_GOOD) {
  248. UA_LOG_INFO(&server->config.logger, UA_LOGCATEGORY_SERVER, "Message buffer alloc failed!");
  249. return;
  250. }
  251. // TFR: changed the timeout from 1000 (1ms) to 1 (1us).
  252. // TFR: blocking the OPC UA Server to wait for a UDP PubSub message reduces performance and should be avoided
  253. connection->channel->receive(connection->channel, &buffer, NULL, 1);
  254. if(buffer.length > 0) {
  255. UA_LOG_INFO(&server->config.logger, UA_LOGCATEGORY_USERLAND, "Message received:");
  256. UA_NetworkMessage currentNetworkMessage;
  257. memset(&currentNetworkMessage, 0, sizeof(UA_NetworkMessage));
  258. size_t currentPosition = 0;
  259. UA_NetworkMessage_decodeBinary(&buffer, &currentPosition, &currentNetworkMessage);
  260. UA_Server_processNetworkMessage(server, &currentNetworkMessage, connection);
  261. UA_NetworkMessage_deleteMembers(&currentNetworkMessage);
  262. }
  263. UA_ByteString_deleteMembers(&buffer);
  264. }
  265. /* Add new subscribeCallback. The first execution is triggered directly after
  266. * creation. */
  267. UA_StatusCode
  268. UA_ReaderGroup_addSubscribeCallback(UA_Server *server, UA_ReaderGroup *readerGroup) {
  269. UA_StatusCode retval = UA_STATUSCODE_GOOD;
  270. retval |= UA_PubSubManager_addRepeatedCallback(server,
  271. (UA_ServerCallback) UA_ReaderGroup_subscribeCallback,
  272. readerGroup, 5, &readerGroup->subscribeCallbackId);
  273. if(retval == UA_STATUSCODE_GOOD) {
  274. readerGroup->subscribeCallbackIsRegistered = true;
  275. }
  276. /* Run once after creation */
  277. UA_ReaderGroup_subscribeCallback(server, readerGroup);
  278. return retval;
  279. }
  280. /**********/
  281. /* Reader */
  282. /**********/
  283. UA_StatusCode
  284. UA_Server_addDataSetReader(UA_Server *server, UA_NodeId readerGroupIdentifier,
  285. const UA_DataSetReaderConfig *dataSetReaderConfig,
  286. UA_NodeId *readerIdentifier) {
  287. /* Search the reader group by the given readerGroupIdentifier */
  288. UA_ReaderGroup *readerGroup = UA_ReaderGroup_findRGbyId(server, readerGroupIdentifier);
  289. if(!dataSetReaderConfig) {
  290. return UA_STATUSCODE_BADNOTFOUND;
  291. }
  292. if(readerGroup == NULL) {
  293. return UA_STATUSCODE_BADNOTFOUND;
  294. }
  295. /* Allocate memory for new DataSetReader */
  296. UA_DataSetReader *newDataSetReader = (UA_DataSetReader *)UA_calloc(1, sizeof(UA_DataSetReader));
  297. /* Copy the config into the new dataSetReader */
  298. UA_DataSetReaderConfig_copy(dataSetReaderConfig, &newDataSetReader->config);
  299. newDataSetReader->linkedReaderGroup = readerGroup->identifier;
  300. UA_PubSubManager_generateUniqueNodeId(server, &newDataSetReader->identifier);
  301. if(readerIdentifier != NULL) {
  302. UA_NodeId_copy(&newDataSetReader->identifier, readerIdentifier);
  303. }
  304. /* Add the new reader to the group */
  305. LIST_INSERT_HEAD(&readerGroup->readers, newDataSetReader, listEntry);
  306. readerGroup->readersCount++;
  307. #ifdef UA_ENABLE_PUBSUB_INFORMATIONMODEL
  308. addDataSetReaderRepresentation(server, newDataSetReader);
  309. #endif
  310. return UA_STATUSCODE_GOOD;
  311. }
  312. UA_StatusCode
  313. UA_Server_removeDataSetReader(UA_Server *server, UA_NodeId readerIdentifier) {
  314. /* Remove datasetreader given by the identifier */
  315. UA_DataSetReader *dataSetReader = UA_ReaderGroup_findDSRbyId(server, readerIdentifier);
  316. if(!dataSetReader) {
  317. return UA_STATUSCODE_BADNOTFOUND;
  318. }
  319. #ifdef UA_ENABLE_PUBSUB_INFORMATIONMODEL
  320. removeDataSetReaderRepresentation(server, dataSetReader);
  321. #endif
  322. UA_DataSetReader_delete(server, dataSetReader);
  323. return UA_STATUSCODE_GOOD;
  324. }
  325. UA_StatusCode
  326. UA_Server_DataSetReader_updateConfig(UA_Server *server, UA_NodeId dataSetReaderIdentifier,
  327. UA_NodeId readerGroupIdentifier,
  328. const UA_DataSetReaderConfig *config) {
  329. if(config == NULL) {
  330. return UA_STATUSCODE_BADINVALIDARGUMENT;
  331. }
  332. UA_DataSetReader *currentDataSetReader =
  333. UA_ReaderGroup_findDSRbyId(server, dataSetReaderIdentifier);
  334. UA_ReaderGroup *currentReaderGroup =
  335. UA_ReaderGroup_findRGbyId(server, readerGroupIdentifier);
  336. if(!currentDataSetReader) {
  337. return UA_STATUSCODE_BADNOTFOUND;
  338. }
  339. /* The update functionality will be extended during the next PubSub batches.
  340. * Currently is only a change of the publishing interval possible. */
  341. if(currentDataSetReader->config.writerGroupId != config->writerGroupId) {
  342. UA_PubSubManager_removeRepeatedPubSubCallback(server, currentReaderGroup->subscribeCallbackId);
  343. currentDataSetReader->config.writerGroupId = config->writerGroupId;
  344. UA_ReaderGroup_subscribeCallback(server, currentReaderGroup);
  345. }
  346. else {
  347. UA_LOG_WARNING(&server->config.logger, UA_LOGCATEGORY_SERVER,
  348. "No or unsupported ReaderGroup update.");
  349. }
  350. return UA_STATUSCODE_GOOD;
  351. }
  352. UA_StatusCode
  353. UA_Server_DataSetReader_getConfig(UA_Server *server, UA_NodeId dataSetReaderIdentifier,
  354. UA_DataSetReaderConfig *config) {
  355. if(!config) {
  356. return UA_STATUSCODE_BADINVALIDARGUMENT;
  357. }
  358. UA_DataSetReader *currentDataSetReader =
  359. UA_ReaderGroup_findDSRbyId(server, dataSetReaderIdentifier);
  360. if(!currentDataSetReader) {
  361. return UA_STATUSCODE_BADNOTFOUND;
  362. }
  363. UA_DataSetReaderConfig tmpReaderConfig;
  364. /* Deep copy of the actual config */
  365. UA_DataSetReaderConfig_copy(&currentDataSetReader->config, &tmpReaderConfig);
  366. *config = tmpReaderConfig;
  367. return UA_STATUSCODE_GOOD;
  368. }
  369. UA_StatusCode
  370. UA_DataSetReaderConfig_copy(const UA_DataSetReaderConfig *src,
  371. UA_DataSetReaderConfig *dst) {
  372. memset(dst, 0, sizeof(UA_DataSetReaderConfig));
  373. UA_StatusCode retVal = UA_String_copy(&src->name, &dst->name);
  374. if(retVal != UA_STATUSCODE_GOOD) {
  375. return retVal;
  376. }
  377. retVal = UA_Variant_copy(&src->publisherId, &dst->publisherId);
  378. if(retVal != UA_STATUSCODE_GOOD) {
  379. return retVal;
  380. }
  381. dst->writerGroupId = src->writerGroupId;
  382. dst->dataSetWriterId = src->dataSetWriterId;
  383. retVal = UA_DataSetMetaDataType_copy(&src->dataSetMetaData, &dst->dataSetMetaData);
  384. if(retVal != UA_STATUSCODE_GOOD) {
  385. return retVal;
  386. }
  387. dst->dataSetFieldContentMask = src->dataSetFieldContentMask;
  388. dst->messageReceiveTimeout = src->messageReceiveTimeout;
  389. /* Currently memcpy is used to copy the securityParameters */
  390. memcpy(&dst->securityParameters, &src->securityParameters, sizeof(UA_PubSubSecurityParameters));
  391. retVal = UA_UadpDataSetReaderMessageDataType_copy(&src->messageSettings, &dst->messageSettings);
  392. if(retVal != UA_STATUSCODE_GOOD) {
  393. return retVal;
  394. }
  395. return UA_STATUSCODE_GOOD;
  396. }
  397. /* This Method is used to initially set the SubscribedDataSet to
  398. * TargetVariablesType and to create the list of target Variables of a
  399. * SubscribedDataSetType. */
  400. UA_StatusCode
  401. UA_Server_DataSetReader_createTargetVariables(UA_Server *server,
  402. UA_NodeId dataSetReaderIdentifier,
  403. UA_TargetVariablesDataType *targetVariables) {
  404. UA_StatusCode retval = UA_STATUSCODE_BADUNEXPECTEDERROR;
  405. UA_DataSetReader* pDS = UA_ReaderGroup_findDSRbyId(server, dataSetReaderIdentifier);
  406. if(pDS == NULL) {
  407. return UA_STATUSCODE_BADINVALIDARGUMENT;
  408. }
  409. if(pDS->subscribedDataSetTarget.targetVariablesSize > 0) {
  410. UA_TargetVariablesDataType_deleteMembers(&pDS->subscribedDataSetTarget);
  411. pDS->subscribedDataSetTarget.targetVariablesSize = 0;
  412. pDS->subscribedDataSetTarget.targetVariables = NULL;
  413. }
  414. /* Set subscribed dataset to TargetVariableType */
  415. pDS->subscribedDataSetType = UA_PUBSUB_SDS_TARGET;
  416. retval = UA_TargetVariablesDataType_copy(targetVariables, &pDS->subscribedDataSetTarget);
  417. return retval;
  418. }
  419. /* Adds Subscribed Variables from the DataSetMetaData for the given DataSet into
  420. * the given parent node and creates the corresponding data in the
  421. * targetVariables of the DataSetReader */
  422. UA_StatusCode
  423. UA_Server_DataSetReader_addTargetVariables(UA_Server *server, UA_NodeId *parentNode,
  424. UA_NodeId dataSetReaderIdentifier,
  425. UA_SubscribedDataSetEnumType sdsType) {
  426. if((server == NULL) || (parentNode == NULL)) {
  427. return UA_STATUSCODE_BADINVALIDARGUMENT;
  428. }
  429. UA_StatusCode retval = UA_STATUSCODE_GOOD;
  430. UA_DataSetReader* pDataSetReader = UA_ReaderGroup_findDSRbyId(server, dataSetReaderIdentifier);
  431. if(pDataSetReader == NULL) {
  432. return UA_STATUSCODE_BADINVALIDARGUMENT;
  433. }
  434. UA_TargetVariablesDataType targetVars;
  435. targetVars.targetVariablesSize = pDataSetReader->config.dataSetMetaData.fieldsSize;
  436. targetVars.targetVariables = (UA_FieldTargetDataType*)
  437. UA_calloc(targetVars.targetVariablesSize, sizeof(UA_FieldTargetDataType));
  438. for (size_t i = 0; i < pDataSetReader->config.dataSetMetaData.fieldsSize; i++) {
  439. UA_VariableAttributes vAttr = UA_VariableAttributes_default;
  440. vAttr.valueRank = pDataSetReader->config.dataSetMetaData.fields[i].valueRank;
  441. if(pDataSetReader->config.dataSetMetaData.fields[i].arrayDimensionsSize > 0) {
  442. retval = UA_Array_copy(pDataSetReader->config.dataSetMetaData.fields[i].arrayDimensions,
  443. pDataSetReader->config.dataSetMetaData.fields[i].arrayDimensionsSize,
  444. (void**)&vAttr.arrayDimensions, &UA_TYPES[UA_TYPES_UINT32]);
  445. if(retval == UA_STATUSCODE_GOOD) {
  446. vAttr.arrayDimensionsSize =
  447. pDataSetReader->config.dataSetMetaData.fields[i].arrayDimensionsSize;
  448. }
  449. }
  450. vAttr.dataType = pDataSetReader->config.dataSetMetaData.fields[i].dataType;
  451. vAttr.accessLevel = UA_ACCESSLEVELMASK_READ;
  452. UA_LocalizedText_copy(&pDataSetReader->config.dataSetMetaData.fields[i].description,
  453. &vAttr.description);
  454. UA_QualifiedName qn;
  455. UA_QualifiedName_init(&qn);
  456. char szTmpName[UA_MAX_SIZENAME];
  457. if(pDataSetReader->config.dataSetMetaData.fields[i].name.length > 0) {
  458. UA_UInt16 slen = UA_MAX_SIZENAME -1;
  459. vAttr.displayName.locale = UA_STRING("en-US");
  460. vAttr.displayName.text = pDataSetReader->config.dataSetMetaData.fields[i].name;
  461. if(pDataSetReader->config.dataSetMetaData.fields[i].name.length < slen) {
  462. slen = (UA_UInt16)pDataSetReader->config.dataSetMetaData.fields[i].name.length;
  463. UA_snprintf(szTmpName, sizeof(szTmpName), "%.*s", (int)slen,
  464. (const char*)pDataSetReader->config.dataSetMetaData.fields[i].name.data);
  465. }
  466. szTmpName[slen] = '\0';
  467. qn = UA_QUALIFIEDNAME(1, szTmpName);
  468. }
  469. else {
  470. strcpy(szTmpName, "SubscribedVariable");
  471. vAttr.displayName = UA_LOCALIZEDTEXT("en-US", szTmpName);
  472. qn = UA_QUALIFIEDNAME(1, "SubscribedVariable");
  473. }
  474. /* Add variable to the given parent node */
  475. UA_NodeId newNode;
  476. retval = UA_Server_addVariableNode(server, UA_NODEID_NULL, *parentNode,
  477. UA_NODEID_NUMERIC(0, UA_NS0ID_HASCOMPONENT), qn,
  478. UA_NODEID_NUMERIC(0, UA_NS0ID_BASEDATAVARIABLETYPE),
  479. vAttr, NULL, &newNode);
  480. if(retval == UA_STATUSCODE_GOOD) {
  481. UA_LOG_INFO(&server->config.logger, UA_LOGCATEGORY_USERLAND,
  482. "addVariableNode %s succeeded", szTmpName);
  483. }
  484. else {
  485. UA_LOG_ERROR(&server->config.logger, UA_LOGCATEGORY_USERLAND,
  486. "addVariableNode: error 0x%x", retval);
  487. }
  488. UA_FieldTargetDataType_init(&targetVars.targetVariables[i]);
  489. targetVars.targetVariables[i].attributeId = UA_ATTRIBUTEID_VALUE;
  490. UA_NodeId_copy(&newNode, &targetVars.targetVariables[i].targetNodeId);
  491. UA_NodeId_deleteMembers(&newNode);
  492. if(vAttr.arrayDimensionsSize > 0) {
  493. UA_Array_delete(vAttr.arrayDimensions, vAttr.arrayDimensionsSize,
  494. &UA_TYPES[UA_TYPES_UINT32]);
  495. }
  496. }
  497. if(sdsType == UA_PUBSUB_SDS_TARGET) {
  498. retval = UA_Server_DataSetReader_createTargetVariables(server, pDataSetReader->identifier,
  499. &targetVars);
  500. }
  501. UA_TargetVariablesDataType_deleteMembers(&targetVars);
  502. return retval;
  503. }
  504. void
  505. UA_Server_DataSetReader_process(UA_Server *server, UA_DataSetReader *dataSetReader,
  506. UA_DataSetMessage* dataSetMsg) {
  507. if((dataSetReader == NULL) || (dataSetMsg == NULL) || (server == NULL)) {
  508. return;
  509. }
  510. if(!dataSetMsg->header.dataSetMessageValid) {
  511. UA_LOG_INFO(&server->config.logger, UA_LOGCATEGORY_SERVER,
  512. "DataSetMessage is discarded: message is not valid");
  513. /* To Do check ConfigurationVersion*/
  514. /*if(dataSetMsg->header.configVersionMajorVersionEnabled)
  515. * {
  516. * if(dataSetMsg->header.configVersionMajorVersion != dataSetReader->config.dataSetMetaData.configurationVersion.majorVersion)
  517. * {
  518. * UA_LOG_WARNING(server->config.logger, UA_LOGCATEGORY_SERVER, "DataSetMessage is discarded: ConfigurationVersion MajorVersion does not match");
  519. * return;
  520. * }
  521. } */
  522. return;
  523. }
  524. if(dataSetMsg->header.dataSetMessageType == UA_DATASETMESSAGE_DATAKEYFRAME) {
  525. if(dataSetMsg->header.fieldEncoding != UA_FIELDENCODING_RAWDATA) {
  526. size_t anzFields = dataSetMsg->data.keyFrameData.fieldCount;
  527. if(dataSetReader->config.dataSetMetaData.fieldsSize < anzFields) {
  528. anzFields = dataSetReader->config.dataSetMetaData.fieldsSize;
  529. }
  530. if(dataSetReader->subscribedDataSetTarget.targetVariablesSize < anzFields) {
  531. anzFields = dataSetReader->subscribedDataSetTarget.targetVariablesSize;
  532. }
  533. UA_StatusCode retVal = UA_STATUSCODE_GOOD;
  534. for(UA_UInt16 i = 0; i < anzFields; i++) {
  535. if(dataSetMsg->data.keyFrameData.dataSetFields[i].hasValue) {
  536. if(dataSetReader->subscribedDataSetTarget.targetVariables[i].attributeId == UA_ATTRIBUTEID_VALUE) {
  537. retVal = UA_Server_writeValue(server, dataSetReader->subscribedDataSetTarget.targetVariables[i].targetNodeId, dataSetMsg->data.keyFrameData.dataSetFields[i].value);
  538. if(retVal != UA_STATUSCODE_GOOD) {
  539. UA_LOG_INFO(&server->config.logger, UA_LOGCATEGORY_SERVER, "Error Write Value KF %u: 0x%x", i, retVal);
  540. }
  541. }
  542. else {
  543. UA_WriteValue writeVal;
  544. UA_WriteValue_init(&writeVal);
  545. writeVal.attributeId = dataSetReader->subscribedDataSetTarget.targetVariables[i].attributeId;
  546. writeVal.indexRange = dataSetReader->subscribedDataSetTarget.targetVariables[i].receiverIndexRange;
  547. writeVal.nodeId = dataSetReader->subscribedDataSetTarget.targetVariables[i].targetNodeId;
  548. UA_DataValue_copy(&dataSetMsg->data.keyFrameData.dataSetFields[i], &writeVal.value);
  549. retVal = UA_Server_write(server, &writeVal);
  550. if(retVal != UA_STATUSCODE_GOOD) {
  551. UA_LOG_INFO(&server->config.logger, UA_LOGCATEGORY_SERVER, "Error Write KF %u: 0x%x", i, retVal);
  552. }
  553. }
  554. }
  555. }
  556. }
  557. }
  558. }
  559. void UA_DataSetReader_delete(UA_Server *server, UA_DataSetReader *dataSetReader) {
  560. /* Delete DataSetReader config */
  561. UA_String_deleteMembers(&dataSetReader->config.name);
  562. UA_Variant_deleteMembers(&dataSetReader->config.publisherId);
  563. UA_DataSetMetaDataType_deleteMembers(&dataSetReader->config.dataSetMetaData);
  564. UA_UadpDataSetReaderMessageDataType_deleteMembers(&dataSetReader->config.messageSettings);
  565. UA_TargetVariablesDataType_deleteMembers(&dataSetReader->subscribedDataSetTarget);
  566. /* Delete DataSetReader */
  567. UA_ReaderGroup* pGroup = UA_ReaderGroup_findRGbyId(server, dataSetReader->linkedReaderGroup);
  568. if(pGroup != NULL) {
  569. pGroup->readersCount--;
  570. }
  571. UA_NodeId_deleteMembers(&dataSetReader->identifier);
  572. UA_NodeId_deleteMembers(&dataSetReader->linkedReaderGroup);
  573. /* Remove DataSetReader from group */
  574. LIST_REMOVE(dataSetReader, listEntry);
  575. /* Free memory allocated for DataSetReader */
  576. UA_free(dataSetReader);
  577. }
  578. UA_StatusCode
  579. UA_Server_processNetworkMessage(UA_Server *server, UA_NetworkMessage *pMsg,
  580. UA_PubSubConnection *pConnection) {
  581. UA_StatusCode retval = UA_STATUSCODE_GOOD;
  582. if(!pMsg || !pConnection)
  583. return UA_STATUSCODE_BADINVALIDARGUMENT;
  584. /* To Do Handle multiple DataSetMessage for one NetworkMessage */
  585. /* To Do The condition pMsg->dataSetClassIdEnabled
  586. * Here some filtering is possible */
  587. UA_DataSetReader *dataSetReader;
  588. retval = getReaderFromIdentifier(server, pMsg, &dataSetReader, pConnection);
  589. if(retval != UA_STATUSCODE_GOOD) {
  590. return retval;
  591. }
  592. UA_LOG_DEBUG(&server->config.logger, UA_LOGCATEGORY_SERVER,
  593. "DataSetReader found with PublisherId");
  594. UA_Byte anzDataSets = 1;
  595. if(pMsg->payloadHeaderEnabled)
  596. anzDataSets = pMsg->payloadHeader.dataSetPayloadHeader.count;
  597. for(UA_Byte iterator = 0; iterator < anzDataSets; iterator++) {
  598. UA_LOG_DEBUG(&server->config.logger, UA_LOGCATEGORY_SERVER, "Process Msg with DataSetReader!");
  599. UA_Server_DataSetReader_process(server, dataSetReader,
  600. &pMsg->payload.dataSetPayload.dataSetMessages[iterator]);
  601. }
  602. /* To Do Handle when dataSetReader parameters are null for publisherId
  603. * and zero for WriterGroupId and DataSetWriterId */
  604. return UA_STATUSCODE_GOOD;
  605. }
  606. #endif /* UA_ENABLE_PUBSUB */