ua_pubsub_reader.c 29 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711
  1. /* This Source Code Form is subject to the terms of the Mozilla Public
  2. * License, v. 2.0. If a copy of the MPL was not distributed with this
  3. * file, You can obtain one at http://mozilla.org/MPL/2.0/.
  4. *
  5. * Copyright (c) 2017-2018 Fraunhofer IOSB (Author: Andreas Ebner)
  6. * Copyright (c) 2019 Fraunhofer IOSB (Author: Julius Pfrommer)
  7. * Copyright (c) 2019 Kalycito Infotech Private Limited
  8. */
  9. #include <open62541/server_pubsub.h>
  10. #include "server/ua_server_internal.h"
  11. #ifdef UA_ENABLE_PUBSUB /* conditional compilation */
  12. #include "ua_pubsub.h"
  13. #ifdef UA_ENABLE_PUBSUB_INFORMATIONMODEL
  14. #include "ua_pubsub_ns0.h"
  15. #endif
  16. #ifdef UA_ENABLE_PUBSUB_DELTAFRAMES
  17. #include "ua_types_encoding_binary.h"
  18. #endif
  19. #define UA_MAX_SIZENAME 64 /* Max size of Qualified Name of Subscribed Variable */
  20. /***************/
  21. /* ReaderGroup */
  22. /***************/
  23. UA_StatusCode
  24. UA_Server_addReaderGroup(UA_Server *server, UA_NodeId connectionIdentifier,
  25. const UA_ReaderGroupConfig *readerGroupConfig,
  26. UA_NodeId *readerGroupIdentifier) {
  27. UA_StatusCode retval = UA_STATUSCODE_GOOD;
  28. UA_ReaderGroupConfig tmpReaderGroupConfig;
  29. /* Check for valid readergroup configuration */
  30. if(!readerGroupConfig) {
  31. return UA_STATUSCODE_BADINVALIDARGUMENT;
  32. }
  33. /* Search the connection by the given connectionIdentifier */
  34. UA_PubSubConnection *currentConnectionContext =
  35. UA_PubSubConnection_findConnectionbyId(server, connectionIdentifier);
  36. if(!currentConnectionContext) {
  37. return UA_STATUSCODE_BADNOTFOUND;
  38. }
  39. /* Allocate memory for new reader group */
  40. UA_ReaderGroup *newGroup = (UA_ReaderGroup *)UA_calloc(1, sizeof(UA_ReaderGroup));
  41. if(!newGroup) {
  42. return UA_STATUSCODE_BADOUTOFMEMORY;
  43. }
  44. /* Generate nodeid for the readergroup identifier */
  45. newGroup->linkedConnection = currentConnectionContext->identifier;
  46. UA_PubSubManager_generateUniqueNodeId(server, &newGroup->identifier);
  47. if(readerGroupIdentifier) {
  48. UA_NodeId_copy(&newGroup->identifier, readerGroupIdentifier);
  49. }
  50. /* Deep copy of the config */
  51. retval |= UA_ReaderGroupConfig_copy(readerGroupConfig, &tmpReaderGroupConfig);
  52. newGroup->config = tmpReaderGroupConfig;
  53. retval |= UA_ReaderGroup_addSubscribeCallback(server, newGroup);
  54. LIST_INSERT_HEAD(&currentConnectionContext->readerGroups, newGroup, listEntry);
  55. currentConnectionContext->readerGroupsSize++;
  56. #ifdef UA_ENABLE_PUBSUB_INFORMATIONMODEL
  57. addReaderGroupRepresentation(server, newGroup);
  58. #endif
  59. return retval;
  60. }
  61. UA_StatusCode
  62. UA_Server_removeReaderGroup(UA_Server *server, UA_NodeId groupIdentifier) {
  63. UA_ReaderGroup* readerGroup = UA_ReaderGroup_findRGbyId(server, groupIdentifier);
  64. if(readerGroup == NULL) {
  65. return UA_STATUSCODE_BADNOTFOUND;
  66. }
  67. /* Search the connection to which the given readergroup is connected to */
  68. UA_PubSubConnection *connection =
  69. UA_PubSubConnection_findConnectionbyId(server, readerGroup->linkedConnection);
  70. if(connection == NULL) {
  71. return UA_STATUSCODE_BADNOTFOUND;
  72. }
  73. /* Unregister subscribe callback */
  74. UA_PubSubManager_removeRepeatedPubSubCallback(server, readerGroup->subscribeCallbackId);
  75. #ifdef UA_ENABLE_PUBSUB_INFORMATIONMODEL
  76. /* To Do:RemoveGroupRepresentation(server, &readerGroup->identifier) */
  77. #endif
  78. /* UA_Server_ReaderGroup_delete also removes itself from the list */
  79. UA_Server_ReaderGroup_delete(server, readerGroup);
  80. /* Remove readerGroup from Connection */
  81. LIST_REMOVE(readerGroup, listEntry);
  82. UA_free(readerGroup);
  83. return UA_STATUSCODE_GOOD;
  84. }
  85. /* TODO: Implement
  86. UA_StatusCode
  87. UA_Server_ReaderGroup_updateConfig(UA_Server *server, UA_NodeId readerGroupIdentifier,
  88. const UA_ReaderGroupConfig *config) {
  89. return UA_STATUSCODE_BADNOTIMPLEMENTED;
  90. }
  91. */
  92. UA_StatusCode
  93. UA_Server_ReaderGroup_getConfig(UA_Server *server, UA_NodeId readerGroupIdentifier,
  94. UA_ReaderGroupConfig *config) {
  95. if(!config) {
  96. return UA_STATUSCODE_BADINVALIDARGUMENT;
  97. }
  98. /* Identify the readergroup through the readerGroupIdentifier */
  99. UA_ReaderGroup *currentReaderGroup = UA_ReaderGroup_findRGbyId(server, readerGroupIdentifier);
  100. if(!currentReaderGroup) {
  101. return UA_STATUSCODE_BADNOTFOUND;
  102. }
  103. UA_ReaderGroupConfig tmpReaderGroupConfig;
  104. /* deep copy of the actual config */
  105. UA_ReaderGroupConfig_copy(&currentReaderGroup->config, &tmpReaderGroupConfig);
  106. *config = tmpReaderGroupConfig;
  107. return UA_STATUSCODE_GOOD;
  108. }
  109. void
  110. UA_Server_ReaderGroup_delete(UA_Server* server, UA_ReaderGroup *readerGroup) {
  111. /* To Do Call UA_ReaderGroupConfig_delete */
  112. UA_DataSetReader *dataSetReader, *tmpDataSetReader;
  113. LIST_FOREACH_SAFE(dataSetReader, &readerGroup->readers, listEntry, tmpDataSetReader) {
  114. UA_DataSetReader_delete(server, dataSetReader);
  115. }
  116. UA_PubSubConnection* pConn =
  117. UA_PubSubConnection_findConnectionbyId(server, readerGroup->linkedConnection);
  118. if(pConn != NULL) {
  119. pConn->readerGroupsSize--;
  120. }
  121. /* Delete ReaderGroup and its members */
  122. UA_String_deleteMembers(&readerGroup->config.name);
  123. UA_NodeId_deleteMembers(&readerGroup->linkedConnection);
  124. UA_NodeId_deleteMembers(&readerGroup->identifier);
  125. }
  126. UA_StatusCode
  127. UA_ReaderGroupConfig_copy(const UA_ReaderGroupConfig *src,
  128. UA_ReaderGroupConfig *dst) {
  129. /* Currently simple memcpy only */
  130. memcpy(&dst->securityParameters, &src->securityParameters, sizeof(UA_PubSubSecurityParameters));
  131. UA_String_copy(&src->name, &dst->name);
  132. return UA_STATUSCODE_GOOD;
  133. }
  134. static UA_StatusCode
  135. checkReaderIdentifier(UA_Server *server, UA_NetworkMessage *pMsg, UA_DataSetReader *reader) {
  136. if(!pMsg->groupHeaderEnabled &&
  137. !pMsg->groupHeader.writerGroupIdEnabled &&
  138. !pMsg->payloadHeaderEnabled) {
  139. UA_LOG_INFO(&server->config.logger, UA_LOGCATEGORY_SERVER,
  140. "Cannot process DataSetReader without WriterGroup"
  141. "and DataSetWriter identifiers");
  142. return UA_STATUSCODE_BADNOTIMPLEMENTED;
  143. }
  144. if((reader->config.writerGroupId == pMsg->groupHeader.writerGroupId) &&
  145. (reader->config.dataSetWriterId == *pMsg->payloadHeader.dataSetPayloadHeader.dataSetWriterIds)) {
  146. UA_LOG_DEBUG(&server->config.logger, UA_LOGCATEGORY_SERVER,
  147. "DataSetReader found. Process NetworkMessage");
  148. return UA_STATUSCODE_GOOD;
  149. }
  150. return UA_STATUSCODE_BADNOTFOUND;
  151. }
  152. static UA_StatusCode
  153. getReaderFromIdentifier(UA_Server *server, UA_NetworkMessage *pMsg,
  154. UA_DataSetReader **dataSetReader, UA_PubSubConnection *pConnection) {
  155. UA_StatusCode retval = UA_STATUSCODE_BADNOTFOUND;
  156. if(!pMsg->publisherIdEnabled) {
  157. UA_LOG_INFO(&server->config.logger, UA_LOGCATEGORY_SERVER,
  158. "Cannot process DataSetReader without PublisherId");
  159. return UA_STATUSCODE_BADNOTIMPLEMENTED; /* TODO: Handle DSR without PublisherId */
  160. }
  161. UA_ReaderGroup* readerGroup;
  162. LIST_FOREACH(readerGroup, &pConnection->readerGroups, listEntry) {
  163. UA_DataSetReader *tmpReader;
  164. LIST_FOREACH(tmpReader, &readerGroup->readers, listEntry) {
  165. switch (pMsg->publisherIdType) {
  166. case UA_PUBLISHERDATATYPE_BYTE:
  167. if(tmpReader->config.publisherId.type == &UA_TYPES[UA_TYPES_BYTE] &&
  168. pMsg->publisherIdType == UA_PUBLISHERDATATYPE_BYTE &&
  169. pMsg->publisherId.publisherIdByte == *(UA_Byte*)tmpReader->config.publisherId.data) {
  170. retval = checkReaderIdentifier(server, pMsg, tmpReader);
  171. }
  172. break;
  173. case UA_PUBLISHERDATATYPE_UINT16:
  174. if(tmpReader->config.publisherId.type == &UA_TYPES[UA_TYPES_UINT16] &&
  175. pMsg->publisherIdType == UA_PUBLISHERDATATYPE_UINT16 &&
  176. pMsg->publisherId.publisherIdUInt16 == *(UA_UInt16*) tmpReader->config.publisherId.data) {
  177. retval = checkReaderIdentifier(server, pMsg, tmpReader);
  178. }
  179. break;
  180. case UA_PUBLISHERDATATYPE_UINT32:
  181. if(tmpReader->config.publisherId.type == &UA_TYPES[UA_TYPES_UINT32] &&
  182. pMsg->publisherIdType == UA_PUBLISHERDATATYPE_UINT32 &&
  183. pMsg->publisherId.publisherIdUInt32 == *(UA_UInt32*)tmpReader->config.publisherId.data) {
  184. retval = checkReaderIdentifier(server, pMsg, tmpReader);
  185. }
  186. break;
  187. case UA_PUBLISHERDATATYPE_UINT64:
  188. if(tmpReader->config.publisherId.type == &UA_TYPES[UA_TYPES_UINT64] &&
  189. pMsg->publisherIdType == UA_PUBLISHERDATATYPE_UINT64 &&
  190. pMsg->publisherId.publisherIdUInt64 == *(UA_UInt64*)tmpReader->config.publisherId.data) {
  191. retval = checkReaderIdentifier(server, pMsg, tmpReader);
  192. }
  193. break;
  194. case UA_PUBLISHERDATATYPE_STRING:
  195. if(tmpReader->config.publisherId.type == &UA_TYPES[UA_TYPES_STRING] &&
  196. pMsg->publisherIdType == UA_PUBLISHERDATATYPE_STRING &&
  197. UA_String_equal(&pMsg->publisherId.publisherIdString,
  198. (UA_String*)tmpReader->config.publisherId.data)) {
  199. retval = checkReaderIdentifier(server, pMsg, tmpReader);
  200. }
  201. break;
  202. default:
  203. return UA_STATUSCODE_BADINTERNALERROR;
  204. }
  205. if(retval == UA_STATUSCODE_GOOD) {
  206. *dataSetReader = tmpReader;
  207. return UA_STATUSCODE_GOOD;
  208. }
  209. }
  210. }
  211. UA_LOG_INFO(&server->config.logger, UA_LOGCATEGORY_SERVER,
  212. "Dataset reader not found. Check PublisherID, WriterGroupID and DatasetWriterID");
  213. return UA_STATUSCODE_BADNOTFOUND;
  214. }
  215. UA_ReaderGroup *
  216. UA_ReaderGroup_findRGbyId(UA_Server *server, UA_NodeId identifier) {
  217. UA_PubSubConnection *pubSubConnection;
  218. TAILQ_FOREACH(pubSubConnection, &server->pubSubManager.connections, listEntry){
  219. UA_ReaderGroup* readerGroup = NULL;
  220. LIST_FOREACH(readerGroup, &pubSubConnection->readerGroups, listEntry) {
  221. if(UA_NodeId_equal(&identifier, &readerGroup->identifier)) {
  222. return readerGroup;
  223. }
  224. }
  225. }
  226. return NULL;
  227. }
  228. UA_DataSetReader *UA_ReaderGroup_findDSRbyId(UA_Server *server, UA_NodeId identifier) {
  229. UA_PubSubConnection *pubSubConnection;
  230. TAILQ_FOREACH(pubSubConnection, &server->pubSubManager.connections, listEntry){
  231. UA_ReaderGroup* readerGroup = NULL;
  232. LIST_FOREACH(readerGroup, &pubSubConnection->readerGroups, listEntry) {
  233. UA_DataSetReader *tmpReader;
  234. LIST_FOREACH(tmpReader, &readerGroup->readers, listEntry) {
  235. if(UA_NodeId_equal(&tmpReader->identifier, &identifier)) {
  236. return tmpReader;
  237. }
  238. }
  239. }
  240. }
  241. return NULL;
  242. }
  243. /* This callback triggers the collection and reception of NetworkMessages and the
  244. * contained DataSetMessages. */
  245. void UA_ReaderGroup_subscribeCallback(UA_Server *server, UA_ReaderGroup *readerGroup) {
  246. UA_PubSubConnection *connection =
  247. UA_PubSubConnection_findConnectionbyId(server, readerGroup->linkedConnection);
  248. UA_ByteString buffer;
  249. if(UA_ByteString_allocBuffer(&buffer, 512) != UA_STATUSCODE_GOOD) {
  250. UA_LOG_INFO(&server->config.logger, UA_LOGCATEGORY_SERVER, "Message buffer alloc failed!");
  251. return;
  252. }
  253. // TFR: changed the timeout from 1000 (1ms) to 1 (1us).
  254. // TFR: blocking the OPC UA Server to wait for a UDP PubSub message reduces performance and should be avoided
  255. connection->channel->receive(connection->channel, &buffer, NULL, 1);
  256. if(buffer.length > 0) {
  257. UA_LOG_INFO(&server->config.logger, UA_LOGCATEGORY_USERLAND, "Message received:");
  258. UA_NetworkMessage currentNetworkMessage;
  259. memset(&currentNetworkMessage, 0, sizeof(UA_NetworkMessage));
  260. size_t currentPosition = 0;
  261. UA_NetworkMessage_decodeBinary(&buffer, &currentPosition, &currentNetworkMessage);
  262. UA_Server_processNetworkMessage(server, &currentNetworkMessage, connection);
  263. UA_NetworkMessage_deleteMembers(&currentNetworkMessage);
  264. }
  265. UA_ByteString_deleteMembers(&buffer);
  266. }
  267. /* Add new subscribeCallback. The first execution is triggered directly after
  268. * creation. */
  269. UA_StatusCode
  270. UA_ReaderGroup_addSubscribeCallback(UA_Server *server, UA_ReaderGroup *readerGroup) {
  271. UA_StatusCode retval = UA_STATUSCODE_GOOD;
  272. retval |= UA_PubSubManager_addRepeatedCallback(server,
  273. (UA_ServerCallback) UA_ReaderGroup_subscribeCallback,
  274. readerGroup, 5, &readerGroup->subscribeCallbackId);
  275. if(retval == UA_STATUSCODE_GOOD) {
  276. readerGroup->subscribeCallbackIsRegistered = true;
  277. }
  278. /* Run once after creation */
  279. UA_ReaderGroup_subscribeCallback(server, readerGroup);
  280. return retval;
  281. }
  282. /**********/
  283. /* Reader */
  284. /**********/
  285. UA_StatusCode
  286. UA_Server_addDataSetReader(UA_Server *server, UA_NodeId readerGroupIdentifier,
  287. const UA_DataSetReaderConfig *dataSetReaderConfig,
  288. UA_NodeId *readerIdentifier) {
  289. /* Search the reader group by the given readerGroupIdentifier */
  290. UA_ReaderGroup *readerGroup = UA_ReaderGroup_findRGbyId(server, readerGroupIdentifier);
  291. if(!dataSetReaderConfig) {
  292. return UA_STATUSCODE_BADNOTFOUND;
  293. }
  294. if(readerGroup == NULL) {
  295. return UA_STATUSCODE_BADNOTFOUND;
  296. }
  297. /* Allocate memory for new DataSetReader */
  298. UA_DataSetReader *newDataSetReader = (UA_DataSetReader *)UA_calloc(1, sizeof(UA_DataSetReader));
  299. /* Copy the config into the new dataSetReader */
  300. UA_DataSetReaderConfig_copy(dataSetReaderConfig, &newDataSetReader->config);
  301. newDataSetReader->linkedReaderGroup = readerGroup->identifier;
  302. UA_PubSubManager_generateUniqueNodeId(server, &newDataSetReader->identifier);
  303. if(readerIdentifier != NULL) {
  304. UA_NodeId_copy(&newDataSetReader->identifier, readerIdentifier);
  305. }
  306. /* Add the new reader to the group */
  307. LIST_INSERT_HEAD(&readerGroup->readers, newDataSetReader, listEntry);
  308. readerGroup->readersCount++;
  309. #ifdef UA_ENABLE_PUBSUB_INFORMATIONMODEL
  310. addDataSetReaderRepresentation(server, newDataSetReader);
  311. #endif
  312. return UA_STATUSCODE_GOOD;
  313. }
  314. UA_StatusCode
  315. UA_Server_removeDataSetReader(UA_Server *server, UA_NodeId readerIdentifier) {
  316. /* Remove datasetreader given by the identifier */
  317. UA_DataSetReader *dataSetReader = UA_ReaderGroup_findDSRbyId(server, readerIdentifier);
  318. if(!dataSetReader) {
  319. return UA_STATUSCODE_BADNOTFOUND;
  320. }
  321. #ifdef UA_ENABLE_PUBSUB_INFORMATIONMODEL
  322. removeDataSetReaderRepresentation(server, dataSetReader);
  323. #endif
  324. UA_DataSetReader_delete(server, dataSetReader);
  325. return UA_STATUSCODE_GOOD;
  326. }
  327. UA_StatusCode
  328. UA_Server_DataSetReader_updateConfig(UA_Server *server, UA_NodeId dataSetReaderIdentifier,
  329. UA_NodeId readerGroupIdentifier,
  330. const UA_DataSetReaderConfig *config) {
  331. if(config == NULL) {
  332. return UA_STATUSCODE_BADINVALIDARGUMENT;
  333. }
  334. UA_DataSetReader *currentDataSetReader =
  335. UA_ReaderGroup_findDSRbyId(server, dataSetReaderIdentifier);
  336. UA_ReaderGroup *currentReaderGroup =
  337. UA_ReaderGroup_findRGbyId(server, readerGroupIdentifier);
  338. if(!currentDataSetReader) {
  339. return UA_STATUSCODE_BADNOTFOUND;
  340. }
  341. /* The update functionality will be extended during the next PubSub batches.
  342. * Currently is only a change of the publishing interval possible. */
  343. if(currentDataSetReader->config.writerGroupId != config->writerGroupId) {
  344. UA_PubSubManager_removeRepeatedPubSubCallback(server, currentReaderGroup->subscribeCallbackId);
  345. currentDataSetReader->config.writerGroupId = config->writerGroupId;
  346. UA_ReaderGroup_subscribeCallback(server, currentReaderGroup);
  347. }
  348. else {
  349. UA_LOG_WARNING(&server->config.logger, UA_LOGCATEGORY_SERVER,
  350. "No or unsupported ReaderGroup update.");
  351. }
  352. return UA_STATUSCODE_GOOD;
  353. }
  354. UA_StatusCode
  355. UA_Server_DataSetReader_getConfig(UA_Server *server, UA_NodeId dataSetReaderIdentifier,
  356. UA_DataSetReaderConfig *config) {
  357. if(!config) {
  358. return UA_STATUSCODE_BADINVALIDARGUMENT;
  359. }
  360. UA_DataSetReader *currentDataSetReader =
  361. UA_ReaderGroup_findDSRbyId(server, dataSetReaderIdentifier);
  362. if(!currentDataSetReader) {
  363. return UA_STATUSCODE_BADNOTFOUND;
  364. }
  365. UA_DataSetReaderConfig tmpReaderConfig;
  366. /* Deep copy of the actual config */
  367. UA_DataSetReaderConfig_copy(&currentDataSetReader->config, &tmpReaderConfig);
  368. *config = tmpReaderConfig;
  369. return UA_STATUSCODE_GOOD;
  370. }
  371. UA_StatusCode
  372. UA_DataSetReaderConfig_copy(const UA_DataSetReaderConfig *src,
  373. UA_DataSetReaderConfig *dst) {
  374. memset(dst, 0, sizeof(UA_DataSetReaderConfig));
  375. UA_StatusCode retVal = UA_String_copy(&src->name, &dst->name);
  376. if(retVal != UA_STATUSCODE_GOOD) {
  377. return retVal;
  378. }
  379. retVal = UA_Variant_copy(&src->publisherId, &dst->publisherId);
  380. if(retVal != UA_STATUSCODE_GOOD) {
  381. return retVal;
  382. }
  383. dst->writerGroupId = src->writerGroupId;
  384. dst->dataSetWriterId = src->dataSetWriterId;
  385. retVal = UA_DataSetMetaDataType_copy(&src->dataSetMetaData, &dst->dataSetMetaData);
  386. if(retVal != UA_STATUSCODE_GOOD) {
  387. return retVal;
  388. }
  389. dst->dataSetFieldContentMask = src->dataSetFieldContentMask;
  390. dst->messageReceiveTimeout = src->messageReceiveTimeout;
  391. /* Currently memcpy is used to copy the securityParameters */
  392. memcpy(&dst->securityParameters, &src->securityParameters, sizeof(UA_PubSubSecurityParameters));
  393. retVal = UA_UadpDataSetReaderMessageDataType_copy(&src->messageSettings, &dst->messageSettings);
  394. if(retVal != UA_STATUSCODE_GOOD) {
  395. return retVal;
  396. }
  397. return UA_STATUSCODE_GOOD;
  398. }
  399. /* This Method is used to initially set the SubscribedDataSet to
  400. * TargetVariablesType and to create the list of target Variables of a
  401. * SubscribedDataSetType. */
  402. UA_StatusCode
  403. UA_Server_DataSetReader_createTargetVariables(UA_Server *server,
  404. UA_NodeId dataSetReaderIdentifier,
  405. UA_TargetVariablesDataType *targetVariables) {
  406. UA_StatusCode retval = UA_STATUSCODE_BADUNEXPECTEDERROR;
  407. UA_DataSetReader* pDS = UA_ReaderGroup_findDSRbyId(server, dataSetReaderIdentifier);
  408. if(pDS == NULL) {
  409. return UA_STATUSCODE_BADINVALIDARGUMENT;
  410. }
  411. if(pDS->subscribedDataSetTarget.targetVariablesSize > 0) {
  412. UA_TargetVariablesDataType_deleteMembers(&pDS->subscribedDataSetTarget);
  413. pDS->subscribedDataSetTarget.targetVariablesSize = 0;
  414. pDS->subscribedDataSetTarget.targetVariables = NULL;
  415. }
  416. /* Set subscribed dataset to TargetVariableType */
  417. pDS->subscribedDataSetType = UA_PUBSUB_SDS_TARGET;
  418. retval = UA_TargetVariablesDataType_copy(targetVariables, &pDS->subscribedDataSetTarget);
  419. return retval;
  420. }
  421. /* Adds Subscribed Variables from the DataSetMetaData for the given DataSet into
  422. * the given parent node and creates the corresponding data in the
  423. * targetVariables of the DataSetReader */
  424. UA_StatusCode
  425. UA_Server_DataSetReader_addTargetVariables(UA_Server *server, UA_NodeId *parentNode,
  426. UA_NodeId dataSetReaderIdentifier,
  427. UA_SubscribedDataSetEnumType sdsType) {
  428. if((server == NULL) || (parentNode == NULL)) {
  429. return UA_STATUSCODE_BADINVALIDARGUMENT;
  430. }
  431. UA_StatusCode retval = UA_STATUSCODE_GOOD;
  432. UA_DataSetReader* pDataSetReader = UA_ReaderGroup_findDSRbyId(server, dataSetReaderIdentifier);
  433. if(pDataSetReader == NULL) {
  434. return UA_STATUSCODE_BADINVALIDARGUMENT;
  435. }
  436. UA_TargetVariablesDataType targetVars;
  437. targetVars.targetVariablesSize = pDataSetReader->config.dataSetMetaData.fieldsSize;
  438. targetVars.targetVariables = (UA_FieldTargetDataType*)
  439. UA_calloc(targetVars.targetVariablesSize, sizeof(UA_FieldTargetDataType));
  440. for (size_t i = 0; i < pDataSetReader->config.dataSetMetaData.fieldsSize; i++) {
  441. UA_VariableAttributes vAttr = UA_VariableAttributes_default;
  442. vAttr.valueRank = pDataSetReader->config.dataSetMetaData.fields[i].valueRank;
  443. if(pDataSetReader->config.dataSetMetaData.fields[i].arrayDimensionsSize > 0) {
  444. retval = UA_Array_copy(pDataSetReader->config.dataSetMetaData.fields[i].arrayDimensions,
  445. pDataSetReader->config.dataSetMetaData.fields[i].arrayDimensionsSize,
  446. (void**)&vAttr.arrayDimensions, &UA_TYPES[UA_TYPES_UINT32]);
  447. if(retval == UA_STATUSCODE_GOOD) {
  448. vAttr.arrayDimensionsSize =
  449. pDataSetReader->config.dataSetMetaData.fields[i].arrayDimensionsSize;
  450. }
  451. }
  452. vAttr.dataType = pDataSetReader->config.dataSetMetaData.fields[i].dataType;
  453. vAttr.accessLevel = UA_ACCESSLEVELMASK_READ;
  454. UA_LocalizedText_copy(&pDataSetReader->config.dataSetMetaData.fields[i].description,
  455. &vAttr.description);
  456. UA_QualifiedName qn;
  457. UA_QualifiedName_init(&qn);
  458. char szTmpName[UA_MAX_SIZENAME];
  459. if(pDataSetReader->config.dataSetMetaData.fields[i].name.length > 0) {
  460. UA_UInt16 slen = UA_MAX_SIZENAME -1;
  461. vAttr.displayName.locale = UA_STRING("en-US");
  462. vAttr.displayName.text = pDataSetReader->config.dataSetMetaData.fields[i].name;
  463. if(pDataSetReader->config.dataSetMetaData.fields[i].name.length < slen) {
  464. slen = (UA_UInt16)pDataSetReader->config.dataSetMetaData.fields[i].name.length;
  465. UA_snprintf(szTmpName, sizeof(szTmpName), "%.*s", (int)slen,
  466. (const char*)pDataSetReader->config.dataSetMetaData.fields[i].name.data);
  467. }
  468. szTmpName[slen] = '\0';
  469. qn = UA_QUALIFIEDNAME(1, szTmpName);
  470. }
  471. else {
  472. strcpy(szTmpName, "SubscribedVariable");
  473. vAttr.displayName = UA_LOCALIZEDTEXT("en-US", szTmpName);
  474. qn = UA_QUALIFIEDNAME(1, "SubscribedVariable");
  475. }
  476. /* Add variable to the given parent node */
  477. UA_NodeId newNode;
  478. retval = UA_Server_addVariableNode(server, UA_NODEID_NULL, *parentNode,
  479. UA_NODEID_NUMERIC(0, UA_NS0ID_HASCOMPONENT), qn,
  480. UA_NODEID_NUMERIC(0, UA_NS0ID_BASEDATAVARIABLETYPE),
  481. vAttr, NULL, &newNode);
  482. if(retval == UA_STATUSCODE_GOOD) {
  483. UA_LOG_INFO(&server->config.logger, UA_LOGCATEGORY_USERLAND,
  484. "addVariableNode %s succeeded", szTmpName);
  485. }
  486. else {
  487. UA_LOG_ERROR(&server->config.logger, UA_LOGCATEGORY_USERLAND,
  488. "addVariableNode: error 0x%x", retval);
  489. }
  490. UA_FieldTargetDataType_init(&targetVars.targetVariables[i]);
  491. targetVars.targetVariables[i].attributeId = UA_ATTRIBUTEID_VALUE;
  492. UA_NodeId_copy(&newNode, &targetVars.targetVariables[i].targetNodeId);
  493. UA_NodeId_deleteMembers(&newNode);
  494. if(vAttr.arrayDimensionsSize > 0) {
  495. UA_Array_delete(vAttr.arrayDimensions, vAttr.arrayDimensionsSize,
  496. &UA_TYPES[UA_TYPES_UINT32]);
  497. }
  498. }
  499. if(sdsType == UA_PUBSUB_SDS_TARGET) {
  500. retval = UA_Server_DataSetReader_createTargetVariables(server, pDataSetReader->identifier,
  501. &targetVars);
  502. }
  503. UA_TargetVariablesDataType_deleteMembers(&targetVars);
  504. return retval;
  505. }
  506. void
  507. UA_Server_DataSetReader_process(UA_Server *server, UA_DataSetReader *dataSetReader,
  508. UA_DataSetMessage* dataSetMsg) {
  509. if((dataSetReader == NULL) || (dataSetMsg == NULL) || (server == NULL)) {
  510. return;
  511. }
  512. if(!dataSetMsg->header.dataSetMessageValid) {
  513. UA_LOG_INFO(&server->config.logger, UA_LOGCATEGORY_SERVER,
  514. "DataSetMessage is discarded: message is not valid");
  515. /* To Do check ConfigurationVersion*/
  516. /*if(dataSetMsg->header.configVersionMajorVersionEnabled)
  517. * {
  518. * if(dataSetMsg->header.configVersionMajorVersion != dataSetReader->config.dataSetMetaData.configurationVersion.majorVersion)
  519. * {
  520. * UA_LOG_WARNING(server->config.logger, UA_LOGCATEGORY_SERVER, "DataSetMessage is discarded: ConfigurationVersion MajorVersion does not match");
  521. * return;
  522. * }
  523. } */
  524. return;
  525. }
  526. if(dataSetMsg->header.dataSetMessageType == UA_DATASETMESSAGE_DATAKEYFRAME) {
  527. if(dataSetMsg->header.fieldEncoding != UA_FIELDENCODING_RAWDATA) {
  528. size_t anzFields = dataSetMsg->data.keyFrameData.fieldCount;
  529. if(dataSetReader->config.dataSetMetaData.fieldsSize < anzFields) {
  530. anzFields = dataSetReader->config.dataSetMetaData.fieldsSize;
  531. }
  532. if(dataSetReader->subscribedDataSetTarget.targetVariablesSize < anzFields) {
  533. anzFields = dataSetReader->subscribedDataSetTarget.targetVariablesSize;
  534. }
  535. UA_StatusCode retVal = UA_STATUSCODE_GOOD;
  536. for(UA_UInt16 i = 0; i < anzFields; i++) {
  537. if(dataSetMsg->data.keyFrameData.dataSetFields[i].hasValue) {
  538. if(dataSetReader->subscribedDataSetTarget.targetVariables[i].attributeId == UA_ATTRIBUTEID_VALUE) {
  539. retVal = UA_Server_writeValue(server, dataSetReader->subscribedDataSetTarget.targetVariables[i].targetNodeId, dataSetMsg->data.keyFrameData.dataSetFields[i].value);
  540. if(retVal != UA_STATUSCODE_GOOD) {
  541. UA_LOG_INFO(&server->config.logger, UA_LOGCATEGORY_SERVER, "Error Write Value KF %u: 0x%x", i, retVal);
  542. }
  543. }
  544. else {
  545. UA_WriteValue writeVal;
  546. UA_WriteValue_init(&writeVal);
  547. writeVal.attributeId = dataSetReader->subscribedDataSetTarget.targetVariables[i].attributeId;
  548. writeVal.indexRange = dataSetReader->subscribedDataSetTarget.targetVariables[i].receiverIndexRange;
  549. writeVal.nodeId = dataSetReader->subscribedDataSetTarget.targetVariables[i].targetNodeId;
  550. UA_DataValue_copy(&dataSetMsg->data.keyFrameData.dataSetFields[i], &writeVal.value);
  551. retVal = UA_Server_write(server, &writeVal);
  552. if(retVal != UA_STATUSCODE_GOOD) {
  553. UA_LOG_INFO(&server->config.logger, UA_LOGCATEGORY_SERVER, "Error Write KF %u: 0x%x", i, retVal);
  554. }
  555. }
  556. }
  557. }
  558. }
  559. }
  560. }
  561. void UA_DataSetReader_delete(UA_Server *server, UA_DataSetReader *dataSetReader) {
  562. /* Delete DataSetReader config */
  563. UA_String_deleteMembers(&dataSetReader->config.name);
  564. UA_Variant_deleteMembers(&dataSetReader->config.publisherId);
  565. UA_DataSetMetaDataType_deleteMembers(&dataSetReader->config.dataSetMetaData);
  566. UA_UadpDataSetReaderMessageDataType_deleteMembers(&dataSetReader->config.messageSettings);
  567. UA_TargetVariablesDataType_deleteMembers(&dataSetReader->subscribedDataSetTarget);
  568. /* Delete DataSetReader */
  569. UA_ReaderGroup* pGroup = UA_ReaderGroup_findRGbyId(server, dataSetReader->linkedReaderGroup);
  570. if(pGroup != NULL) {
  571. pGroup->readersCount--;
  572. }
  573. UA_NodeId_deleteMembers(&dataSetReader->identifier);
  574. UA_NodeId_deleteMembers(&dataSetReader->linkedReaderGroup);
  575. /* Remove DataSetReader from group */
  576. LIST_REMOVE(dataSetReader, listEntry);
  577. /* Free memory allocated for DataSetReader */
  578. UA_free(dataSetReader);
  579. }
  580. UA_StatusCode
  581. UA_Server_processNetworkMessage(UA_Server *server, UA_NetworkMessage *pMsg,
  582. UA_PubSubConnection *pConnection) {
  583. UA_StatusCode retval = UA_STATUSCODE_GOOD;
  584. if(!pMsg || !pConnection)
  585. return UA_STATUSCODE_BADINVALIDARGUMENT;
  586. /* To Do Handle multiple DataSetMessage for one NetworkMessage */
  587. /* To Do The condition pMsg->dataSetClassIdEnabled
  588. * Here some filtering is possible */
  589. UA_DataSetReader *dataSetReader;
  590. retval = getReaderFromIdentifier(server, pMsg, &dataSetReader, pConnection);
  591. if(retval != UA_STATUSCODE_GOOD) {
  592. return retval;
  593. }
  594. UA_LOG_DEBUG(&server->config.logger, UA_LOGCATEGORY_SERVER,
  595. "DataSetReader found with PublisherId");
  596. UA_Byte anzDataSets = 1;
  597. if(pMsg->payloadHeaderEnabled)
  598. anzDataSets = pMsg->payloadHeader.dataSetPayloadHeader.count;
  599. for(UA_Byte iterator = 0; iterator < anzDataSets; iterator++) {
  600. UA_LOG_DEBUG(&server->config.logger, UA_LOGCATEGORY_SERVER, "Process Msg with DataSetReader!");
  601. UA_Server_DataSetReader_process(server, dataSetReader,
  602. &pMsg->payload.dataSetPayload.dataSetMessages[iterator]);
  603. }
  604. /* To Do Handle when dataSetReader parameters are null for publisherId
  605. * and zero for WriterGroupId and DataSetWriterId */
  606. return UA_STATUSCODE_GOOD;
  607. }
  608. #endif /* UA_ENABLE_PUBSUB */