ua_pubsub_reader.c 29 KB


  1. /* This Source Code Form is subject to the terms of the Mozilla Public
  2. * License, v. 2.0. If a copy of the MPL was not distributed with this
  3. * file, You can obtain one at http://mozilla.org/MPL/2.0/.
  4. *
  5. * Copyright (c) 2017-2018 Fraunhofer IOSB (Author: Andreas Ebner)
  6. * Copyright (c) 2019 Fraunhofer IOSB (Author: Julius Pfrommer)
  7. * Copyright (c) 2019 Kalycito Infotech Private Limited
  8. */
  9. #include <open62541/server_pubsub.h>
  10. #include "server/ua_server_internal.h"
  11. #ifdef UA_ENABLE_PUBSUB /* conditional compilation */
  12. #include "ua_pubsub.h"
  13. #ifdef UA_ENABLE_PUBSUB_INFORMATIONMODEL
  14. #include "ua_pubsub_ns0.h"
  15. #endif
  16. #ifdef UA_ENABLE_PUBSUB_DELTAFRAMES
  17. #include "ua_types_encoding_binary.h"
  18. #endif
  19. #define UA_MAX_SIZENAME 64 /* Max size of Qualified Name of Subscribed Variable */
  20. /***************/
  21. /* ReaderGroup */
  22. /***************/
  23. UA_StatusCode
  24. UA_Server_addReaderGroup(UA_Server *server, UA_NodeId connectionIdentifier,
  25. const UA_ReaderGroupConfig *readerGroupConfig,
  26. UA_NodeId *readerGroupIdentifier) {
  27. UA_StatusCode retval = UA_STATUSCODE_GOOD;
  28. UA_ReaderGroupConfig tmpReaderGroupConfig;
  29. /* Check for valid readergroup configuration */
  30. if(!readerGroupConfig) {
  31. return UA_STATUSCODE_BADINVALIDARGUMENT;
  32. }
  33. /* Search the connection by the given connectionIdentifier */
  34. UA_PubSubConnection *currentConnectionContext =
  35. UA_PubSubConnection_findConnectionbyId(server, connectionIdentifier);
  36. if(!currentConnectionContext) {
  37. return UA_STATUSCODE_BADNOTFOUND;
  38. }
  39. /* Allocate memory for new reader group */
  40. UA_ReaderGroup *newGroup = (UA_ReaderGroup *)UA_calloc(1, sizeof(UA_ReaderGroup));
  41. if(!newGroup) {
  42. return UA_STATUSCODE_BADOUTOFMEMORY;
  43. }
  44. /* Generate nodeid for the readergroup identifier */
  45. newGroup->linkedConnection = currentConnectionContext->identifier;
  46. UA_PubSubManager_generateUniqueNodeId(server, &newGroup->identifier);
  47. if(readerGroupIdentifier) {
  48. UA_NodeId_copy(&newGroup->identifier, readerGroupIdentifier);
  49. }
  50. /* Deep copy of the config */
  51. retval |= UA_ReaderGroupConfig_copy(readerGroupConfig, &tmpReaderGroupConfig);
  52. newGroup->config = tmpReaderGroupConfig;
  53. retval |= UA_ReaderGroup_addSubscribeCallback(server, newGroup);
  54. LIST_INSERT_HEAD(&currentConnectionContext->readerGroups, newGroup, listEntry);
  55. currentConnectionContext->readerGroupsSize++;
  56. #ifdef UA_ENABLE_PUBSUB_INFORMATIONMODEL
  57. addReaderGroupRepresentation(server, newGroup);
  58. #endif
  59. return retval;
  60. }
  61. UA_StatusCode
  62. UA_Server_removeReaderGroup(UA_Server *server, UA_NodeId groupIdentifier) {
  63. UA_ReaderGroup* readerGroup = UA_ReaderGroup_findRGbyId(server, groupIdentifier);
  64. if(readerGroup == NULL) {
  65. return UA_STATUSCODE_BADNOTFOUND;
  66. }
  67. /* Search the connection to which the given readergroup is connected to */
  68. UA_PubSubConnection *connection =
  69. UA_PubSubConnection_findConnectionbyId(server, readerGroup->linkedConnection);
  70. if(connection == NULL) {
  71. return UA_STATUSCODE_BADNOTFOUND;
  72. }
  73. /* Unregister subscribe callback */
  74. UA_PubSubManager_removeRepeatedPubSubCallback(server, readerGroup->subscribeCallbackId);
  75. #ifdef UA_ENABLE_PUBSUB_INFORMATIONMODEL
  76. /* To Do:RemoveGroupRepresentation(server, &readerGroup->identifier) */
  77. #endif
  78. /* UA_Server_ReaderGroup_delete also removes itself from the list */
  79. UA_Server_ReaderGroup_delete(server, readerGroup);
  80. /* Remove readerGroup from Connection */
  81. LIST_REMOVE(readerGroup, listEntry);
  82. UA_free(readerGroup);
  83. return UA_STATUSCODE_GOOD;
  84. }
  85. /* TODO: Implement
  86. UA_StatusCode
  87. UA_Server_ReaderGroup_updateConfig(UA_Server *server, UA_NodeId readerGroupIdentifier,
  88. const UA_ReaderGroupConfig *config) {
  89. return UA_STATUSCODE_BADNOTIMPLEMENTED;
  90. }
  91. */
  92. UA_StatusCode
  93. UA_Server_ReaderGroup_getConfig(UA_Server *server, UA_NodeId readerGroupIdentifier,
  94. UA_ReaderGroupConfig *config) {
  95. if(!config) {
  96. return UA_STATUSCODE_BADINVALIDARGUMENT;
  97. }
  98. /* Identify the readergroup through the readerGroupIdentifier */
  99. UA_ReaderGroup *currentReaderGroup = UA_ReaderGroup_findRGbyId(server, readerGroupIdentifier);
  100. if(!currentReaderGroup) {
  101. return UA_STATUSCODE_BADNOTFOUND;
  102. }
  103. UA_ReaderGroupConfig tmpReaderGroupConfig;
  104. /* deep copy of the actual config */
  105. UA_ReaderGroupConfig_copy(&currentReaderGroup->config, &tmpReaderGroupConfig);
  106. *config = tmpReaderGroupConfig;
  107. return UA_STATUSCODE_GOOD;
  108. }
  109. void
  110. UA_Server_ReaderGroup_delete(UA_Server* server, UA_ReaderGroup *readerGroup) {
  111. /* To Do Call UA_ReaderGroupConfig_delete */
  112. UA_DataSetReader *dataSetReader, *tmpDataSetReader;
  113. LIST_FOREACH_SAFE(dataSetReader, &readerGroup->readers, listEntry, tmpDataSetReader) {
  114. UA_DataSetReader_delete(server, dataSetReader);
  115. }
  116. UA_PubSubConnection* pConn =
  117. UA_PubSubConnection_findConnectionbyId(server, readerGroup->linkedConnection);
  118. if(pConn != NULL) {
  119. pConn->readerGroupsSize--;
  120. }
  121. /* Delete ReaderGroup and its members */
  122. UA_String_deleteMembers(&readerGroup->config.name);
  123. UA_NodeId_deleteMembers(&readerGroup->linkedConnection);
  124. UA_NodeId_deleteMembers(&readerGroup->identifier);
  125. }
  126. UA_StatusCode
  127. UA_ReaderGroupConfig_copy(const UA_ReaderGroupConfig *src,
  128. UA_ReaderGroupConfig *dst) {
  129. /* Currently simple memcpy only */
  130. memcpy(&dst->securityParameters, &src->securityParameters, sizeof(UA_PubSubSecurityParameters));
  131. UA_String_copy(&src->name, &dst->name);
  132. return UA_STATUSCODE_GOOD;
  133. }
  134. static UA_StatusCode
  135. checkReaderIdentifier(UA_Server *server, UA_NetworkMessage *pMsg, UA_DataSetReader *reader) {
  136. if(!pMsg->groupHeaderEnabled &&
  137. !pMsg->groupHeader.writerGroupIdEnabled &&
  138. !pMsg->payloadHeaderEnabled) {
  139. UA_LOG_INFO(&server->config.logger, UA_LOGCATEGORY_SERVER,
  140. "Cannot process DataSetReader without WriterGroup"
  141. "and DataSetWriter identifiers");
  142. return UA_STATUSCODE_BADNOTIMPLEMENTED;
  143. }
  144. if((reader->config.writerGroupId == pMsg->groupHeader.writerGroupId) &&
  145. (reader->config.dataSetWriterId == *pMsg->payloadHeader.dataSetPayloadHeader.dataSetWriterIds)) {
  146. UA_LOG_DEBUG(&server->config.logger, UA_LOGCATEGORY_SERVER,
  147. "DataSetReader found. Process NetworkMessage");
  148. return UA_STATUSCODE_GOOD;
  149. }
  150. return UA_STATUSCODE_BADNOTFOUND;
  151. }
  152. static UA_StatusCode
  153. getReaderFromIdentifier(UA_Server *server, UA_NetworkMessage *pMsg,
  154. UA_DataSetReader **dataSetReader, UA_PubSubConnection *pConnection) {
  155. UA_StatusCode retval = UA_STATUSCODE_BADNOTFOUND;
  156. if(!pMsg->publisherIdEnabled) {
  157. UA_LOG_INFO(&server->config.logger, UA_LOGCATEGORY_SERVER,
  158. "Cannot process DataSetReader without PublisherId");
  159. return UA_STATUSCODE_BADNOTIMPLEMENTED; /* TODO: Handle DSR without PublisherId */
  160. }
  161. UA_ReaderGroup* readerGroup;
  162. LIST_FOREACH(readerGroup, &pConnection->readerGroups, listEntry) {
  163. UA_DataSetReader *tmpReader;
  164. LIST_FOREACH(tmpReader, &readerGroup->readers, listEntry) {
  165. switch (pMsg->publisherIdType) {
  166. case UA_PUBLISHERDATATYPE_BYTE:
  167. if(tmpReader->config.publisherId.type == &UA_TYPES[UA_TYPES_BYTE] &&
  168. pMsg->publisherIdType == UA_PUBLISHERDATATYPE_BYTE &&
  169. pMsg->publisherId.publisherIdByte == *(UA_Byte*)tmpReader->config.publisherId.data) {
  170. retval = checkReaderIdentifier(server, pMsg, tmpReader);
  171. }
  172. break;
  173. case UA_PUBLISHERDATATYPE_UINT16:
  174. if(tmpReader->config.publisherId.type == &UA_TYPES[UA_TYPES_UINT16] &&
  175. pMsg->publisherIdType == UA_PUBLISHERDATATYPE_UINT16 &&
  176. pMsg->publisherId.publisherIdUInt16 == *(UA_UInt16*) tmpReader->config.publisherId.data) {
  177. retval = checkReaderIdentifier(server, pMsg, tmpReader);
  178. }
  179. break;
  180. case UA_PUBLISHERDATATYPE_UINT32:
  181. if(tmpReader->config.publisherId.type == &UA_TYPES[UA_TYPES_UINT32] &&
  182. pMsg->publisherIdType == UA_PUBLISHERDATATYPE_UINT32 &&
  183. pMsg->publisherId.publisherIdUInt32 == *(UA_UInt32*)tmpReader->config.publisherId.data) {
  184. retval = checkReaderIdentifier(server, pMsg, tmpReader);
  185. }
  186. break;
  187. case UA_PUBLISHERDATATYPE_UINT64:
  188. if(tmpReader->config.publisherId.type == &UA_TYPES[UA_TYPES_UINT64] &&
  189. pMsg->publisherIdType == UA_PUBLISHERDATATYPE_UINT64 &&
  190. pMsg->publisherId.publisherIdUInt64 == *(UA_UInt64*)tmpReader->config.publisherId.data) {
  191. retval = checkReaderIdentifier(server, pMsg, tmpReader);
  192. }
  193. break;
  194. case UA_PUBLISHERDATATYPE_STRING:
  195. if(tmpReader->config.publisherId.type == &UA_TYPES[UA_TYPES_STRING] &&
  196. pMsg->publisherIdType == UA_PUBLISHERDATATYPE_STRING &&
  197. UA_String_equal(&pMsg->publisherId.publisherIdString,
  198. (UA_String*)tmpReader->config.publisherId.data)) {
  199. retval = checkReaderIdentifier(server, pMsg, tmpReader);
  200. }
  201. break;
  202. default:
  203. return UA_STATUSCODE_BADINTERNALERROR;
  204. }
  205. if(retval == UA_STATUSCODE_GOOD) {
  206. *dataSetReader = tmpReader;
  207. return UA_STATUSCODE_GOOD;
  208. }
  209. }
  210. }
  211. UA_LOG_INFO(&server->config.logger, UA_LOGCATEGORY_SERVER,
  212. "Dataset reader not found. Check PublisherID, WriterGroupID and DatasetWriterID");
  213. return UA_STATUSCODE_BADNOTFOUND;
  214. }
  215. UA_ReaderGroup *
  216. UA_ReaderGroup_findRGbyId(UA_Server *server, UA_NodeId identifier) {
  217. for(size_t iteratorConn = 0; iteratorConn < server->pubSubManager.connectionsSize; iteratorConn++) {
  218. UA_ReaderGroup* readerGroup = NULL;
  219. LIST_FOREACH(readerGroup, &server->pubSubManager.connections[iteratorConn].readerGroups, listEntry) {
  220. if(UA_NodeId_equal(&identifier, &readerGroup->identifier)) {
  221. return readerGroup;
  222. }
  223. }
  224. }
  225. return NULL;
  226. }
  227. UA_DataSetReader *UA_ReaderGroup_findDSRbyId(UA_Server *server, UA_NodeId identifier) {
  228. for(size_t i = 0; i < server->pubSubManager.connectionsSize; i++) {
  229. UA_ReaderGroup* readerGroup = NULL;
  230. LIST_FOREACH(readerGroup, &server->pubSubManager.connections[i].readerGroups, listEntry) {
  231. UA_DataSetReader *tmpReader;
  232. LIST_FOREACH(tmpReader, &readerGroup->readers, listEntry) {
  233. if(UA_NodeId_equal(&tmpReader->identifier, &identifier)) {
  234. return tmpReader;
  235. }
  236. }
  237. }
  238. }
  239. return NULL;
  240. }
  241. /* This callback triggers the collection and reception of NetworkMessages and the
  242. * contained DataSetMessages. */
  243. void UA_ReaderGroup_subscribeCallback(UA_Server *server, UA_ReaderGroup *readerGroup) {
  244. UA_PubSubConnection *connection =
  245. UA_PubSubConnection_findConnectionbyId(server, readerGroup->linkedConnection);
  246. UA_ByteString buffer;
  247. if(UA_ByteString_allocBuffer(&buffer, 512) != UA_STATUSCODE_GOOD) {
  248. UA_LOG_INFO(&server->config.logger, UA_LOGCATEGORY_SERVER, "Message buffer alloc failed!");
  249. return;
  250. }
  251. connection->channel->receive(connection->channel, &buffer, NULL, 1000);
  252. if(buffer.length > 0) {
  253. UA_LOG_INFO(&server->config.logger, UA_LOGCATEGORY_USERLAND, "Message received:");
  254. UA_NetworkMessage currentNetworkMessage;
  255. memset(&currentNetworkMessage, 0, sizeof(UA_NetworkMessage));
  256. size_t currentPosition = 0;
  257. UA_NetworkMessage_decodeBinary(&buffer, &currentPosition, &currentNetworkMessage);
  258. UA_Server_processNetworkMessage(server, &currentNetworkMessage, connection);
  259. UA_NetworkMessage_deleteMembers(&currentNetworkMessage);
  260. }
  261. UA_ByteString_deleteMembers(&buffer);
  262. }
  263. /* Add new subscribeCallback. The first execution is triggered directly after
  264. * creation. */
  265. UA_StatusCode
  266. UA_ReaderGroup_addSubscribeCallback(UA_Server *server, UA_ReaderGroup *readerGroup) {
  267. UA_StatusCode retval = UA_STATUSCODE_GOOD;
  268. retval |= UA_PubSubManager_addRepeatedCallback(server,
  269. (UA_ServerCallback) UA_ReaderGroup_subscribeCallback,
  270. readerGroup, 5, &readerGroup->subscribeCallbackId);
  271. if(retval == UA_STATUSCODE_GOOD) {
  272. readerGroup->subscribeCallbackIsRegistered = true;
  273. }
  274. /* Run once after creation */
  275. UA_ReaderGroup_subscribeCallback(server, readerGroup);
  276. return retval;
  277. }
  278. /**********/
  279. /* Reader */
  280. /**********/
  281. UA_StatusCode
  282. UA_Server_addDataSetReader(UA_Server *server, UA_NodeId readerGroupIdentifier,
  283. const UA_DataSetReaderConfig *dataSetReaderConfig,
  284. UA_NodeId *readerIdentifier) {
  285. /* Search the reader group by the given readerGroupIdentifier */
  286. UA_ReaderGroup *readerGroup = UA_ReaderGroup_findRGbyId(server, readerGroupIdentifier);
  287. if(!dataSetReaderConfig) {
  288. return UA_STATUSCODE_BADNOTFOUND;
  289. }
  290. if(readerGroup == NULL) {
  291. return UA_STATUSCODE_BADNOTFOUND;
  292. }
  293. /* Allocate memory for new DataSetReader */
  294. UA_DataSetReader *newDataSetReader = (UA_DataSetReader *)UA_calloc(1, sizeof(UA_DataSetReader));
  295. /* Copy the config into the new dataSetReader */
  296. UA_DataSetReaderConfig_copy(dataSetReaderConfig, &newDataSetReader->config);
  297. newDataSetReader->linkedReaderGroup = readerGroup->identifier;
  298. UA_PubSubManager_generateUniqueNodeId(server, &newDataSetReader->identifier);
  299. if(readerIdentifier != NULL) {
  300. UA_NodeId_copy(&newDataSetReader->identifier, readerIdentifier);
  301. }
  302. /* Add the new reader to the group */
  303. LIST_INSERT_HEAD(&readerGroup->readers, newDataSetReader, listEntry);
  304. readerGroup->readersCount++;
  305. #ifdef UA_ENABLE_PUBSUB_INFORMATIONMODEL
  306. addDataSetReaderRepresentation(server, newDataSetReader);
  307. #endif
  308. return UA_STATUSCODE_GOOD;
  309. }
  310. UA_StatusCode
  311. UA_Server_removeDataSetReader(UA_Server *server, UA_NodeId readerIdentifier) {
  312. /* Remove datasetreader given by the identifier */
  313. UA_DataSetReader *dataSetReader = UA_ReaderGroup_findDSRbyId(server, readerIdentifier);
  314. if(!dataSetReader) {
  315. return UA_STATUSCODE_BADNOTFOUND;
  316. }
  317. #ifdef UA_ENABLE_PUBSUB_INFORMATIONMODEL
  318. removeDataSetReaderRepresentation(server, dataSetReader);
  319. #endif
  320. UA_DataSetReader_delete(server, dataSetReader);
  321. return UA_STATUSCODE_GOOD;
  322. }
  323. UA_StatusCode
  324. UA_Server_DataSetReader_updateConfig(UA_Server *server, UA_NodeId dataSetReaderIdentifier,
  325. UA_NodeId readerGroupIdentifier,
  326. const UA_DataSetReaderConfig *config) {
  327. if(config == NULL) {
  328. return UA_STATUSCODE_BADINVALIDARGUMENT;
  329. }
  330. UA_DataSetReader *currentDataSetReader =
  331. UA_ReaderGroup_findDSRbyId(server, dataSetReaderIdentifier);
  332. UA_ReaderGroup *currentReaderGroup =
  333. UA_ReaderGroup_findRGbyId(server, readerGroupIdentifier);
  334. if(!currentDataSetReader) {
  335. return UA_STATUSCODE_BADNOTFOUND;
  336. }
  337. /* The update functionality will be extended during the next PubSub batches.
  338. * Currently is only a change of the publishing interval possible. */
  339. if(currentDataSetReader->config.writerGroupId != config->writerGroupId) {
  340. UA_PubSubManager_removeRepeatedPubSubCallback(server, currentReaderGroup->subscribeCallbackId);
  341. currentDataSetReader->config.writerGroupId = config->writerGroupId;
  342. UA_ReaderGroup_subscribeCallback(server, currentReaderGroup);
  343. }
  344. else {
  345. UA_LOG_WARNING(&server->config.logger, UA_LOGCATEGORY_SERVER,
  346. "No or unsupported ReaderGroup update.");
  347. }
  348. return UA_STATUSCODE_GOOD;
  349. }
  350. UA_StatusCode
  351. UA_Server_DataSetReader_getConfig(UA_Server *server, UA_NodeId dataSetReaderIdentifier,
  352. UA_DataSetReaderConfig *config) {
  353. if(!config) {
  354. return UA_STATUSCODE_BADINVALIDARGUMENT;
  355. }
  356. UA_DataSetReader *currentDataSetReader =
  357. UA_ReaderGroup_findDSRbyId(server, dataSetReaderIdentifier);
  358. if(!currentDataSetReader) {
  359. return UA_STATUSCODE_BADNOTFOUND;
  360. }
  361. UA_DataSetReaderConfig tmpReaderConfig;
  362. /* Deep copy of the actual config */
  363. UA_DataSetReaderConfig_copy(&currentDataSetReader->config, &tmpReaderConfig);
  364. *config = tmpReaderConfig;
  365. return UA_STATUSCODE_GOOD;
  366. }
  367. UA_StatusCode
  368. UA_DataSetReaderConfig_copy(const UA_DataSetReaderConfig *src,
  369. UA_DataSetReaderConfig *dst) {
  370. memset(dst, 0, sizeof(UA_DataSetReaderConfig));
  371. UA_StatusCode retVal = UA_String_copy(&src->name, &dst->name);
  372. if(retVal != UA_STATUSCODE_GOOD) {
  373. return retVal;
  374. }
  375. retVal = UA_Variant_copy(&src->publisherId, &dst->publisherId);
  376. if(retVal != UA_STATUSCODE_GOOD) {
  377. return retVal;
  378. }
  379. dst->writerGroupId = src->writerGroupId;
  380. dst->dataSetWriterId = src->dataSetWriterId;
  381. retVal = UA_DataSetMetaDataType_copy(&src->dataSetMetaData, &dst->dataSetMetaData);
  382. if(retVal != UA_STATUSCODE_GOOD) {
  383. return retVal;
  384. }
  385. dst->dataSetFieldContentMask = src->dataSetFieldContentMask;
  386. dst->messageReceiveTimeout = src->messageReceiveTimeout;
  387. /* Currently memcpy is used to copy the securityParameters */
  388. memcpy(&dst->securityParameters, &src->securityParameters, sizeof(UA_PubSubSecurityParameters));
  389. retVal = UA_UadpDataSetReaderMessageDataType_copy(&src->messageSettings, &dst->messageSettings);
  390. if(retVal != UA_STATUSCODE_GOOD) {
  391. return retVal;
  392. }
  393. return UA_STATUSCODE_GOOD;
  394. }
  395. /* This Method is used to initially set the SubscribedDataSet to
  396. * TargetVariablesType and to create the list of target Variables of a
  397. * SubscribedDataSetType. */
  398. UA_StatusCode
  399. UA_Server_DataSetReader_createTargetVariables(UA_Server *server,
  400. UA_NodeId dataSetReaderIdentifier,
  401. UA_TargetVariablesDataType *targetVariables) {
  402. UA_StatusCode retval = UA_STATUSCODE_BADUNEXPECTEDERROR;
  403. UA_DataSetReader* pDS = UA_ReaderGroup_findDSRbyId(server, dataSetReaderIdentifier);
  404. if(pDS == NULL) {
  405. return UA_STATUSCODE_BADINVALIDARGUMENT;
  406. }
  407. if(pDS->subscribedDataSetTarget.targetVariablesSize > 0) {
  408. UA_TargetVariablesDataType_deleteMembers(&pDS->subscribedDataSetTarget);
  409. pDS->subscribedDataSetTarget.targetVariablesSize = 0;
  410. pDS->subscribedDataSetTarget.targetVariables = NULL;
  411. }
  412. /* Set subscribed dataset to TargetVariableType */
  413. pDS->subscribedDataSetType = UA_PUBSUB_SDS_TARGET;
  414. retval = UA_TargetVariablesDataType_copy(targetVariables, &pDS->subscribedDataSetTarget);
  415. return retval;
  416. }
  417. /* Adds Subscribed Variables from the DataSetMetaData for the given DataSet into
  418. * the given parent node and creates the corresponding data in the
  419. * targetVariables of the DataSetReader */
  420. UA_StatusCode
  421. UA_Server_DataSetReader_addTargetVariables(UA_Server *server, UA_NodeId *parentNode,
  422. UA_NodeId dataSetReaderIdentifier,
  423. UA_SubscribedDataSetEnumType sdsType) {
  424. if((server == NULL) || (parentNode == NULL)) {
  425. return UA_STATUSCODE_BADINVALIDARGUMENT;
  426. }
  427. UA_StatusCode retval = UA_STATUSCODE_GOOD;
  428. UA_DataSetReader* pDataSetReader = UA_ReaderGroup_findDSRbyId(server, dataSetReaderIdentifier);
  429. if(pDataSetReader == NULL) {
  430. return UA_STATUSCODE_BADINVALIDARGUMENT;
  431. }
  432. UA_TargetVariablesDataType targetVars;
  433. targetVars.targetVariablesSize = pDataSetReader->config.dataSetMetaData.fieldsSize;
  434. targetVars.targetVariables = (UA_FieldTargetDataType*)
  435. UA_calloc(targetVars.targetVariablesSize, sizeof(UA_FieldTargetDataType));
  436. for (size_t i = 0; i < pDataSetReader->config.dataSetMetaData.fieldsSize; i++) {
  437. UA_VariableAttributes vAttr = UA_VariableAttributes_default;
  438. vAttr.valueRank = pDataSetReader->config.dataSetMetaData.fields[i].valueRank;
  439. if(pDataSetReader->config.dataSetMetaData.fields[i].arrayDimensionsSize > 0) {
  440. retval = UA_Array_copy(pDataSetReader->config.dataSetMetaData.fields[i].arrayDimensions,
  441. pDataSetReader->config.dataSetMetaData.fields[i].arrayDimensionsSize,
  442. (void**)&vAttr.arrayDimensions, &UA_TYPES[UA_TYPES_UINT32]);
  443. if(retval == UA_STATUSCODE_GOOD) {
  444. vAttr.arrayDimensionsSize =
  445. pDataSetReader->config.dataSetMetaData.fields[i].arrayDimensionsSize;
  446. }
  447. }
  448. vAttr.dataType = pDataSetReader->config.dataSetMetaData.fields[i].dataType;
  449. vAttr.accessLevel = UA_ACCESSLEVELMASK_READ;
  450. UA_LocalizedText_copy(&pDataSetReader->config.dataSetMetaData.fields[i].description,
  451. &vAttr.description);
  452. UA_QualifiedName qn;
  453. UA_QualifiedName_init(&qn);
  454. char szTmpName[UA_MAX_SIZENAME];
  455. if(pDataSetReader->config.dataSetMetaData.fields[i].name.length > 0) {
  456. UA_UInt16 slen = UA_MAX_SIZENAME -1;
  457. vAttr.displayName.locale = UA_STRING("en-US");
  458. vAttr.displayName.text = pDataSetReader->config.dataSetMetaData.fields[i].name;
  459. if(pDataSetReader->config.dataSetMetaData.fields[i].name.length < slen) {
  460. slen = (UA_UInt16)pDataSetReader->config.dataSetMetaData.fields[i].name.length;
  461. UA_snprintf(szTmpName, sizeof(szTmpName), "%.*s", (int)slen,
  462. (const char*)pDataSetReader->config.dataSetMetaData.fields[i].name.data);
  463. }
  464. szTmpName[slen] = '\0';
  465. qn = UA_QUALIFIEDNAME(1, szTmpName);
  466. }
  467. else {
  468. strcpy(szTmpName, "SubscribedVariable");
  469. vAttr.displayName = UA_LOCALIZEDTEXT("en-US", szTmpName);
  470. qn = UA_QUALIFIEDNAME(1, "SubscribedVariable");
  471. }
  472. /* Add variable to the given parent node */
  473. UA_NodeId newNode;
  474. retval = UA_Server_addVariableNode(server, UA_NODEID_NULL, *parentNode,
  475. UA_NODEID_NUMERIC(0, UA_NS0ID_HASCOMPONENT), qn,
  476. UA_NODEID_NUMERIC(0, UA_NS0ID_BASEDATAVARIABLETYPE),
  477. vAttr, NULL, &newNode);
  478. if(retval == UA_STATUSCODE_GOOD) {
  479. UA_LOG_INFO(&server->config.logger, UA_LOGCATEGORY_USERLAND,
  480. "addVariableNode %s succeeded", szTmpName);
  481. }
  482. else {
  483. UA_LOG_ERROR(&server->config.logger, UA_LOGCATEGORY_USERLAND,
  484. "addVariableNode: error 0x%x", retval);
  485. }
  486. UA_FieldTargetDataType_init(&targetVars.targetVariables[i]);
  487. targetVars.targetVariables[i].attributeId = UA_ATTRIBUTEID_VALUE;
  488. UA_NodeId_copy(&newNode, &targetVars.targetVariables[i].targetNodeId);
  489. UA_NodeId_deleteMembers(&newNode);
  490. if(vAttr.arrayDimensionsSize > 0) {
  491. UA_Array_delete(vAttr.arrayDimensions, vAttr.arrayDimensionsSize,
  492. &UA_TYPES[UA_TYPES_UINT32]);
  493. }
  494. }
  495. if(sdsType == UA_PUBSUB_SDS_TARGET) {
  496. retval = UA_Server_DataSetReader_createTargetVariables(server, pDataSetReader->identifier,
  497. &targetVars);
  498. }
  499. UA_TargetVariablesDataType_deleteMembers(&targetVars);
  500. return retval;
  501. }
  502. void
  503. UA_Server_DataSetReader_process(UA_Server *server, UA_DataSetReader *dataSetReader,
  504. UA_DataSetMessage* dataSetMsg) {
  505. if((dataSetReader == NULL) || (dataSetMsg == NULL) || (server == NULL)) {
  506. return;
  507. }
  508. if(!dataSetMsg->header.dataSetMessageValid) {
  509. UA_LOG_INFO(&server->config.logger, UA_LOGCATEGORY_SERVER,
  510. "DataSetMessage is discarded: message is not valid");
  511. /* To Do check ConfigurationVersion*/
  512. /*if(dataSetMsg->header.configVersionMajorVersionEnabled)
  513. * {
  514. * if(dataSetMsg->header.configVersionMajorVersion != dataSetReader->config.dataSetMetaData.configurationVersion.majorVersion)
  515. * {
  516. * UA_LOG_WARNING(server->config.logger, UA_LOGCATEGORY_SERVER, "DataSetMessage is discarded: ConfigurationVersion MajorVersion does not match");
  517. * return;
  518. * }
  519. } */
  520. return;
  521. }
  522. if(dataSetMsg->header.dataSetMessageType == UA_DATASETMESSAGE_DATAKEYFRAME) {
  523. if(dataSetMsg->header.fieldEncoding != UA_FIELDENCODING_RAWDATA) {
  524. size_t anzFields = dataSetMsg->data.keyFrameData.fieldCount;
  525. if(dataSetReader->config.dataSetMetaData.fieldsSize < anzFields) {
  526. anzFields = dataSetReader->config.dataSetMetaData.fieldsSize;
  527. }
  528. if(dataSetReader->subscribedDataSetTarget.targetVariablesSize < anzFields) {
  529. anzFields = dataSetReader->subscribedDataSetTarget.targetVariablesSize;
  530. }
  531. UA_StatusCode retVal = UA_STATUSCODE_GOOD;
  532. for(UA_UInt16 i = 0; i < anzFields; i++) {
  533. if(dataSetMsg->data.keyFrameData.dataSetFields[i].hasValue) {
  534. if(dataSetReader->subscribedDataSetTarget.targetVariables[i].attributeId == UA_ATTRIBUTEID_VALUE) {
  535. retVal = UA_Server_writeValue(server, dataSetReader->subscribedDataSetTarget.targetVariables[i].targetNodeId, dataSetMsg->data.keyFrameData.dataSetFields[i].value);
  536. if(retVal != UA_STATUSCODE_GOOD) {
  537. UA_LOG_INFO(&server->config.logger, UA_LOGCATEGORY_SERVER, "Error Write Value KF %u: 0x%x", i, retVal);
  538. }
  539. }
  540. else {
  541. UA_WriteValue writeVal;
  542. UA_WriteValue_init(&writeVal);
  543. writeVal.attributeId = dataSetReader->subscribedDataSetTarget.targetVariables[i].attributeId;
  544. writeVal.indexRange = dataSetReader->subscribedDataSetTarget.targetVariables[i].receiverIndexRange;
  545. writeVal.nodeId = dataSetReader->subscribedDataSetTarget.targetVariables[i].targetNodeId;
  546. UA_DataValue_copy(&dataSetMsg->data.keyFrameData.dataSetFields[i], &writeVal.value);
  547. retVal = UA_Server_write(server, &writeVal);
  548. if(retVal != UA_STATUSCODE_GOOD) {
  549. UA_LOG_INFO(&server->config.logger, UA_LOGCATEGORY_SERVER, "Error Write KF %u: 0x%x", i, retVal);
  550. }
  551. }
  552. }
  553. }
  554. }
  555. }
  556. }
  557. void UA_DataSetReader_delete(UA_Server *server, UA_DataSetReader *dataSetReader) {
  558. /* Delete DataSetReader config */
  559. UA_String_deleteMembers(&dataSetReader->config.name);
  560. UA_Variant_deleteMembers(&dataSetReader->config.publisherId);
  561. UA_DataSetMetaDataType_deleteMembers(&dataSetReader->config.dataSetMetaData);
  562. UA_UadpDataSetReaderMessageDataType_deleteMembers(&dataSetReader->config.messageSettings);
  563. UA_TargetVariablesDataType_deleteMembers(&dataSetReader->subscribedDataSetTarget);
  564. /* Delete DataSetReader */
  565. UA_ReaderGroup* pGroup = UA_ReaderGroup_findRGbyId(server, dataSetReader->linkedReaderGroup);
  566. if(pGroup != NULL) {
  567. pGroup->readersCount--;
  568. }
  569. UA_NodeId_deleteMembers(&dataSetReader->identifier);
  570. UA_NodeId_deleteMembers(&dataSetReader->linkedReaderGroup);
  571. /* Remove DataSetReader from group */
  572. LIST_REMOVE(dataSetReader, listEntry);
  573. /* Free memory allocated for DataSetReader */
  574. UA_free(dataSetReader);
  575. }
  576. UA_StatusCode
  577. UA_Server_processNetworkMessage(UA_Server *server, UA_NetworkMessage *pMsg,
  578. UA_PubSubConnection *pConnection) {
  579. UA_StatusCode retval = UA_STATUSCODE_GOOD;
  580. if(!pMsg || !pConnection)
  581. return UA_STATUSCODE_BADINVALIDARGUMENT;
  582. /* To Do Handle multiple DataSetMessage for one NetworkMessage */
  583. /* To Do The condition pMsg->dataSetClassIdEnabled
  584. * Here some filtering is possible */
  585. UA_DataSetReader *dataSetReader;
  586. retval = getReaderFromIdentifier(server, pMsg, &dataSetReader, pConnection);
  587. if(retval != UA_STATUSCODE_GOOD) {
  588. return retval;
  589. }
  590. UA_LOG_DEBUG(&server->config.logger, UA_LOGCATEGORY_SERVER,
  591. "DataSetReader found with PublisherId");
  592. UA_Byte anzDataSets = 1;
  593. if(pMsg->payloadHeaderEnabled)
  594. anzDataSets = pMsg->payloadHeader.dataSetPayloadHeader.count;
  595. for(UA_Byte iterator = 0; iterator < anzDataSets; iterator++) {
  596. UA_LOG_DEBUG(&server->config.logger, UA_LOGCATEGORY_SERVER, "Process Msg with DataSetReader!");
  597. UA_Server_DataSetReader_process(server, dataSetReader,
  598. &pMsg->payload.dataSetPayload.dataSetMessages[iterator]);
  599. }
  600. /* To Do Handle when dataSetReader parameters are null for publisherId
  601. * and zero for WriterGroupId and DataSetWriterId */
  602. return UA_STATUSCODE_GOOD;
  603. }
  604. #endif /* UA_ENABLE_PUBSUB */