ua_pubsub.c 59 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353
  1. /* This Source Code Form is subject to the terms of the Mozilla Public
  2. * License, v. 2.0. If a copy of the MPL was not distributed with this
  3. * file, You can obtain one at http://mozilla.org/MPL/2.0/.
  4. *
  5. * Copyright (c) 2017-2018 Fraunhofer IOSB (Author: Andreas Ebner)
  6. * Copyright (c) 2019 Fraunhofer IOSB (Author: Julius Pfrommer)
  7. */
  8. #include "server/ua_server_internal.h"
  9. #include "ua_types_encoding_binary.h"
  10. #ifdef UA_ENABLE_PUBSUB /* conditional compilation */
  11. #include "ua_server_pubsub.h"
  12. #include "ua_pubsub.h"
  13. #include "ua_pubsub_manager.h"
  14. #include "ua_pubsub_networkmessage.h"
  15. #ifdef UA_ENABLE_PUBSUB_INFORMATIONMODEL
  16. #include "ua_pubsub_ns0.h"
  17. #endif
  18. #define UA_MAX_STACKBUF 512 /* Max size of network messages on the stack */
  19. /* Forward declaration */
  20. static void
  21. UA_WriterGroup_deleteMembers(UA_Server *server, UA_WriterGroup *writerGroup);
  22. static void
  23. UA_DataSetField_deleteMembers(UA_DataSetField *field);
  24. /**********************************************/
  25. /* Connection */
  26. /**********************************************/
  27. UA_StatusCode
  28. UA_PubSubConnectionConfig_copy(const UA_PubSubConnectionConfig *src,
  29. UA_PubSubConnectionConfig *dst) {
  30. UA_StatusCode retVal = UA_STATUSCODE_GOOD;
  31. memcpy(dst, src, sizeof(UA_PubSubConnectionConfig));
  32. retVal |= UA_String_copy(&src->name, &dst->name);
  33. retVal |= UA_Variant_copy(&src->address, &dst->address);
  34. retVal |= UA_String_copy(&src->transportProfileUri, &dst->transportProfileUri);
  35. retVal |= UA_Variant_copy(&src->connectionTransportSettings, &dst->connectionTransportSettings);
  36. if(src->connectionPropertiesSize > 0){
  37. dst->connectionProperties = (UA_KeyValuePair *)
  38. UA_calloc(src->connectionPropertiesSize, sizeof(UA_KeyValuePair));
  39. if(!dst->connectionProperties){
  40. return UA_STATUSCODE_BADOUTOFMEMORY;
  41. }
  42. for(size_t i = 0; i < src->connectionPropertiesSize; i++){
  43. retVal |= UA_QualifiedName_copy(&src->connectionProperties[i].key,
  44. &dst->connectionProperties[i].key);
  45. retVal |= UA_Variant_copy(&src->connectionProperties[i].value,
  46. &dst->connectionProperties[i].value);
  47. }
  48. }
  49. return retVal;
  50. }
  51. UA_StatusCode
  52. UA_Server_getPubSubConnectionConfig(UA_Server *server, const UA_NodeId connection,
  53. UA_PubSubConnectionConfig *config) {
  54. if(!config)
  55. return UA_STATUSCODE_BADINVALIDARGUMENT;
  56. UA_PubSubConnection *currentPubSubConnection =
  57. UA_PubSubConnection_findConnectionbyId(server, connection);
  58. if(!currentPubSubConnection)
  59. return UA_STATUSCODE_BADNOTFOUND;
  60. UA_PubSubConnectionConfig tmpPubSubConnectionConfig;
  61. //deep copy of the actual config
  62. UA_PubSubConnectionConfig_copy(currentPubSubConnection->config, &tmpPubSubConnectionConfig);
  63. *config = tmpPubSubConnectionConfig;
  64. return UA_STATUSCODE_GOOD;
  65. }
  66. UA_PubSubConnection *
  67. UA_PubSubConnection_findConnectionbyId(UA_Server *server, UA_NodeId connectionIdentifier) {
  68. for(size_t i = 0; i < server->pubSubManager.connectionsSize; i++){
  69. if(UA_NodeId_equal(&connectionIdentifier, &server->pubSubManager.connections[i].identifier)){
  70. return &server->pubSubManager.connections[i];
  71. }
  72. }
  73. return NULL;
  74. }
  75. void
  76. UA_PubSubConnectionConfig_deleteMembers(UA_PubSubConnectionConfig *connectionConfig) {
  77. UA_String_deleteMembers(&connectionConfig->name);
  78. UA_String_deleteMembers(&connectionConfig->transportProfileUri);
  79. UA_Variant_deleteMembers(&connectionConfig->connectionTransportSettings);
  80. UA_Variant_deleteMembers(&connectionConfig->address);
  81. for(size_t i = 0; i < connectionConfig->connectionPropertiesSize; i++){
  82. UA_QualifiedName_deleteMembers(&connectionConfig->connectionProperties[i].key);
  83. UA_Variant_deleteMembers(&connectionConfig->connectionProperties[i].value);
  84. }
  85. UA_free(connectionConfig->connectionProperties);
  86. }
  87. void
  88. UA_PubSubConnection_deleteMembers(UA_Server *server, UA_PubSubConnection *connection) {
  89. //delete connection config
  90. UA_PubSubConnectionConfig_deleteMembers(connection->config);
  91. //remove contained WriterGroups
  92. UA_WriterGroup *writerGroup, *tmpWriterGroup;
  93. LIST_FOREACH_SAFE(writerGroup, &connection->writerGroups, listEntry, tmpWriterGroup){
  94. UA_Server_removeWriterGroup(server, writerGroup->identifier);
  95. }
  96. UA_NodeId_deleteMembers(&connection->identifier);
  97. if(connection->channel){
  98. connection->channel->close(connection->channel);
  99. }
  100. UA_free(connection->config);
  101. }
  102. UA_StatusCode
  103. UA_Server_addWriterGroup(UA_Server *server, const UA_NodeId connection,
  104. const UA_WriterGroupConfig *writerGroupConfig,
  105. UA_NodeId *writerGroupIdentifier) {
  106. UA_StatusCode retVal = UA_STATUSCODE_GOOD;
  107. if(!writerGroupConfig)
  108. return UA_STATUSCODE_BADINVALIDARGUMENT;
  109. //search the connection by the given connectionIdentifier
  110. UA_PubSubConnection *currentConnectionContext =
  111. UA_PubSubConnection_findConnectionbyId(server, connection);
  112. if(!currentConnectionContext)
  113. return UA_STATUSCODE_BADNOTFOUND;
  114. //allocate memory for new WriterGroup
  115. UA_WriterGroup *newWriterGroup = (UA_WriterGroup *) UA_calloc(1, sizeof(UA_WriterGroup));
  116. if (!newWriterGroup)
  117. return UA_STATUSCODE_BADOUTOFMEMORY;
  118. newWriterGroup->linkedConnection = currentConnectionContext->identifier;
  119. UA_PubSubManager_generateUniqueNodeId(server, &newWriterGroup->identifier);
  120. if(writerGroupIdentifier){
  121. UA_NodeId_copy(&newWriterGroup->identifier, writerGroupIdentifier);
  122. }
  123. //deep copy of the config
  124. UA_WriterGroupConfig tmpWriterGroupConfig;
  125. retVal |= UA_WriterGroupConfig_copy(writerGroupConfig, &tmpWriterGroupConfig);
  126. if(!tmpWriterGroupConfig.messageSettings.content.decoded.type) {
  127. UA_UadpWriterGroupMessageDataType *wgm = UA_UadpWriterGroupMessageDataType_new();
  128. tmpWriterGroupConfig.messageSettings.content.decoded.data = wgm;
  129. tmpWriterGroupConfig.messageSettings.content.decoded.type =
  130. &UA_TYPES[UA_TYPES_UADPWRITERGROUPMESSAGEDATATYPE];
  131. tmpWriterGroupConfig.messageSettings.encoding = UA_EXTENSIONOBJECT_DECODED;
  132. }
  133. newWriterGroup->config = tmpWriterGroupConfig;
  134. retVal |= UA_WriterGroup_addPublishCallback(server, newWriterGroup);
  135. LIST_INSERT_HEAD(&currentConnectionContext->writerGroups, newWriterGroup, listEntry);
  136. #ifdef UA_ENABLE_PUBSUB_INFORMATIONMODEL
  137. addWriterGroupRepresentation(server, newWriterGroup);
  138. #endif
  139. return retVal;
  140. }
  141. UA_StatusCode
  142. UA_Server_removeWriterGroup(UA_Server *server, const UA_NodeId writerGroup){
  143. UA_WriterGroup *wg = UA_WriterGroup_findWGbyId(server, writerGroup);
  144. if(!wg)
  145. return UA_STATUSCODE_BADNOTFOUND;
  146. UA_PubSubConnection *connection =
  147. UA_PubSubConnection_findConnectionbyId(server, wg->linkedConnection);
  148. if(!connection)
  149. return UA_STATUSCODE_BADNOTFOUND;
  150. //unregister the publish callback
  151. UA_PubSubManager_removeRepeatedPubSubCallback(server, wg->publishCallbackId);
  152. #ifdef UA_ENABLE_PUBSUB_INFORMATIONMODEL
  153. removeGroupRepresentation(server, wg);
  154. #endif
  155. UA_WriterGroup_deleteMembers(server, wg);
  156. LIST_REMOVE(wg, listEntry);
  157. UA_free(wg);
  158. return UA_STATUSCODE_GOOD;
  159. }
  160. /**********************************************/
  161. /* PublishedDataSet */
  162. /**********************************************/
  163. UA_StatusCode
  164. UA_PublishedDataSetConfig_copy(const UA_PublishedDataSetConfig *src,
  165. UA_PublishedDataSetConfig *dst) {
  166. UA_StatusCode retVal = UA_STATUSCODE_GOOD;
  167. memcpy(dst, src, sizeof(UA_PublishedDataSetConfig));
  168. retVal |= UA_String_copy(&src->name, &dst->name);
  169. switch(src->publishedDataSetType){
  170. case UA_PUBSUB_DATASET_PUBLISHEDITEMS:
  171. //no additional items
  172. break;
  173. case UA_PUBSUB_DATASET_PUBLISHEDITEMS_TEMPLATE:
  174. if (src->config.itemsTemplate.variablesToAddSize > 0){
  175. dst->config.itemsTemplate.variablesToAdd = (UA_PublishedVariableDataType *) UA_calloc(
  176. src->config.itemsTemplate.variablesToAddSize, sizeof(UA_PublishedVariableDataType));
  177. }
  178. for(size_t i = 0; i < src->config.itemsTemplate.variablesToAddSize; i++){
  179. retVal |= UA_PublishedVariableDataType_copy(&src->config.itemsTemplate.variablesToAdd[i],
  180. &dst->config.itemsTemplate.variablesToAdd[i]);
  181. }
  182. retVal |= UA_DataSetMetaDataType_copy(&src->config.itemsTemplate.metaData,
  183. &dst->config.itemsTemplate.metaData);
  184. break;
  185. default:
  186. return UA_STATUSCODE_BADINVALIDARGUMENT;
  187. }
  188. return retVal;
  189. }
  190. UA_StatusCode
  191. UA_Server_getPublishedDataSetConfig(UA_Server *server, const UA_NodeId pds,
  192. UA_PublishedDataSetConfig *config){
  193. if(!config)
  194. return UA_STATUSCODE_BADINVALIDARGUMENT;
  195. UA_PublishedDataSet *currentPublishedDataSet = UA_PublishedDataSet_findPDSbyId(server, pds);
  196. if(!currentPublishedDataSet)
  197. return UA_STATUSCODE_BADNOTFOUND;
  198. UA_PublishedDataSetConfig tmpPublishedDataSetConfig;
  199. //deep copy of the actual config
  200. UA_PublishedDataSetConfig_copy(&currentPublishedDataSet->config, &tmpPublishedDataSetConfig);
  201. *config = tmpPublishedDataSetConfig;
  202. return UA_STATUSCODE_GOOD;
  203. }
  204. UA_PublishedDataSet *
  205. UA_PublishedDataSet_findPDSbyId(UA_Server *server, UA_NodeId identifier){
  206. for(size_t i = 0; i < server->pubSubManager.publishedDataSetsSize; i++){
  207. if(UA_NodeId_equal(&server->pubSubManager.publishedDataSets[i].identifier, &identifier)){
  208. return &server->pubSubManager.publishedDataSets[i];
  209. }
  210. }
  211. return NULL;
  212. }
  213. void
  214. UA_PublishedDataSetConfig_deleteMembers(UA_PublishedDataSetConfig *pdsConfig){
  215. //delete pds config
  216. UA_String_deleteMembers(&pdsConfig->name);
  217. switch (pdsConfig->publishedDataSetType){
  218. case UA_PUBSUB_DATASET_PUBLISHEDITEMS:
  219. //no additional items
  220. break;
  221. case UA_PUBSUB_DATASET_PUBLISHEDITEMS_TEMPLATE:
  222. if (pdsConfig->config.itemsTemplate.variablesToAddSize > 0){
  223. for(size_t i = 0; i < pdsConfig->config.itemsTemplate.variablesToAddSize; i++){
  224. UA_PublishedVariableDataType_deleteMembers(&pdsConfig->config.itemsTemplate.variablesToAdd[i]);
  225. }
  226. UA_free(pdsConfig->config.itemsTemplate.variablesToAdd);
  227. }
  228. UA_DataSetMetaDataType_deleteMembers(&pdsConfig->config.itemsTemplate.metaData);
  229. break;
  230. default:
  231. break;
  232. }
  233. }
  234. void
  235. UA_PublishedDataSet_deleteMembers(UA_Server *server, UA_PublishedDataSet *publishedDataSet){
  236. UA_PublishedDataSetConfig_deleteMembers(&publishedDataSet->config);
  237. //delete PDS
  238. UA_DataSetMetaDataType_deleteMembers(&publishedDataSet->dataSetMetaData);
  239. UA_DataSetField *field, *tmpField;
  240. LIST_FOREACH_SAFE(field, &publishedDataSet->fields, listEntry, tmpField) {
  241. UA_Server_removeDataSetField(server, field->identifier);
  242. }
  243. UA_NodeId_deleteMembers(&publishedDataSet->identifier);
  244. }
  245. UA_DataSetFieldResult
  246. UA_Server_addDataSetField(UA_Server *server, const UA_NodeId publishedDataSet,
  247. const UA_DataSetFieldConfig *fieldConfig,
  248. UA_NodeId *fieldIdentifier) {
  249. UA_StatusCode retVal = UA_STATUSCODE_GOOD;
  250. UA_DataSetFieldResult result = {UA_STATUSCODE_BADINVALIDARGUMENT, {0, 0}};
  251. if(!fieldConfig)
  252. return result;
  253. UA_PublishedDataSet *currentDataSet = UA_PublishedDataSet_findPDSbyId(server, publishedDataSet);
  254. if(currentDataSet == NULL){
  255. result.result = UA_STATUSCODE_BADNOTFOUND;
  256. return result;
  257. }
  258. if(currentDataSet->config.publishedDataSetType != UA_PUBSUB_DATASET_PUBLISHEDITEMS){
  259. result.result = UA_STATUSCODE_BADNOTIMPLEMENTED;
  260. return result;
  261. }
  262. UA_DataSetField *newField = (UA_DataSetField *) UA_calloc(1, sizeof(UA_DataSetField));
  263. if(!newField){
  264. result.result = UA_STATUSCODE_BADINTERNALERROR;
  265. return result;
  266. }
  267. UA_DataSetFieldConfig tmpFieldConfig;
  268. retVal |= UA_DataSetFieldConfig_copy(fieldConfig, &tmpFieldConfig);
  269. newField->config = tmpFieldConfig;
  270. UA_PubSubManager_generateUniqueNodeId(server, &newField->identifier);
  271. if(fieldIdentifier != NULL){
  272. UA_NodeId_copy(&newField->identifier, fieldIdentifier);
  273. }
  274. newField->publishedDataSet = currentDataSet->identifier;
  275. //update major version of parent published data set
  276. currentDataSet->dataSetMetaData.configurationVersion.majorVersion = UA_PubSubConfigurationVersionTimeDifference();
  277. LIST_INSERT_HEAD(&currentDataSet->fields, newField, listEntry);
  278. if(newField->config.field.variable.promotedField)
  279. currentDataSet->promotedFieldsCount++;
  280. currentDataSet->fieldSize++;
  281. result.result = retVal;
  282. result.configurationVersion.majorVersion = currentDataSet->dataSetMetaData.configurationVersion.majorVersion;
  283. result.configurationVersion.minorVersion = currentDataSet->dataSetMetaData.configurationVersion.minorVersion;
  284. return result;
  285. }
  286. UA_DataSetFieldResult
  287. UA_Server_removeDataSetField(UA_Server *server, const UA_NodeId dsf) {
  288. UA_DataSetField *currentField = UA_DataSetField_findDSFbyId(server, dsf);
  289. UA_DataSetFieldResult result = {UA_STATUSCODE_BADNOTFOUND, {0, 0}};
  290. if(!currentField)
  291. return result;
  292. UA_PublishedDataSet *parentPublishedDataSet =
  293. UA_PublishedDataSet_findPDSbyId(server, currentField->publishedDataSet);
  294. if(!parentPublishedDataSet)
  295. return result;
  296. parentPublishedDataSet->fieldSize--;
  297. if(currentField->config.field.variable.promotedField)
  298. parentPublishedDataSet->promotedFieldsCount--;
  299. /* update major version of PublishedDataSet */
  300. parentPublishedDataSet->dataSetMetaData.configurationVersion.majorVersion =
  301. UA_PubSubConfigurationVersionTimeDifference();
  302. UA_DataSetField_deleteMembers(currentField);
  303. LIST_REMOVE(currentField, listEntry);
  304. UA_free(currentField);
  305. result.result = UA_STATUSCODE_GOOD;
  306. result.configurationVersion.majorVersion = parentPublishedDataSet->dataSetMetaData.configurationVersion.majorVersion;
  307. result.configurationVersion.minorVersion = parentPublishedDataSet->dataSetMetaData.configurationVersion.minorVersion;
  308. return result;
  309. }
  310. /**********************************************/
  311. /* DataSetWriter */
  312. /**********************************************/
  313. UA_StatusCode
  314. UA_DataSetWriterConfig_copy(const UA_DataSetWriterConfig *src,
  315. UA_DataSetWriterConfig *dst){
  316. UA_StatusCode retVal = UA_STATUSCODE_GOOD;
  317. memcpy(dst, src, sizeof(UA_DataSetWriterConfig));
  318. retVal |= UA_String_copy(&src->name, &dst->name);
  319. retVal |= UA_String_copy(&src->dataSetName, &dst->dataSetName);
  320. retVal |= UA_ExtensionObject_copy(&src->messageSettings, &dst->messageSettings);
  321. dst->dataSetWriterProperties = (UA_KeyValuePair *)
  322. UA_calloc(src->dataSetWriterPropertiesSize, sizeof(UA_KeyValuePair));
  323. if(!dst->dataSetWriterProperties)
  324. return UA_STATUSCODE_BADOUTOFMEMORY;
  325. for(size_t i = 0; i < src->dataSetWriterPropertiesSize; i++){
  326. retVal |= UA_KeyValuePair_copy(&src->dataSetWriterProperties[i], &dst->dataSetWriterProperties[i]);
  327. }
  328. return retVal;
  329. }
  330. UA_StatusCode
  331. UA_Server_getDataSetWriterConfig(UA_Server *server, const UA_NodeId dsw,
  332. UA_DataSetWriterConfig *config){
  333. UA_StatusCode retVal = UA_STATUSCODE_GOOD;
  334. if(!config)
  335. return UA_STATUSCODE_BADINVALIDARGUMENT;
  336. UA_DataSetWriter *currentDataSetWriter = UA_DataSetWriter_findDSWbyId(server, dsw);
  337. if(!currentDataSetWriter)
  338. return UA_STATUSCODE_BADNOTFOUND;
  339. UA_DataSetWriterConfig tmpWriterConfig;
  340. //deep copy of the actual config
  341. retVal |= UA_DataSetWriterConfig_copy(&currentDataSetWriter->config, &tmpWriterConfig);
  342. *config = tmpWriterConfig;
  343. return retVal;
  344. }
  345. UA_DataSetWriter *
  346. UA_DataSetWriter_findDSWbyId(UA_Server *server, UA_NodeId identifier) {
  347. for(size_t i = 0; i < server->pubSubManager.connectionsSize; i++){
  348. UA_WriterGroup *tmpWriterGroup;
  349. LIST_FOREACH(tmpWriterGroup, &server->pubSubManager.connections[i].writerGroups, listEntry){
  350. UA_DataSetWriter *tmpWriter;
  351. LIST_FOREACH(tmpWriter, &tmpWriterGroup->writers, listEntry){
  352. if(UA_NodeId_equal(&tmpWriter->identifier, &identifier)){
  353. return tmpWriter;
  354. }
  355. }
  356. }
  357. }
  358. return NULL;
  359. }
  360. void
  361. UA_DataSetWriterConfig_deleteMembers(UA_DataSetWriterConfig *pdsConfig) {
  362. UA_String_deleteMembers(&pdsConfig->name);
  363. UA_String_deleteMembers(&pdsConfig->dataSetName);
  364. for(size_t i = 0; i < pdsConfig->dataSetWriterPropertiesSize; i++){
  365. UA_KeyValuePair_deleteMembers(&pdsConfig->dataSetWriterProperties[i]);
  366. }
  367. UA_free(pdsConfig->dataSetWriterProperties);
  368. UA_ExtensionObject_deleteMembers(&pdsConfig->messageSettings);
  369. }
  370. static void
  371. UA_DataSetWriter_deleteMembers(UA_Server *server, UA_DataSetWriter *dataSetWriter) {
  372. UA_DataSetWriterConfig_deleteMembers(&dataSetWriter->config);
  373. //delete DataSetWriter
  374. UA_NodeId_deleteMembers(&dataSetWriter->identifier);
  375. UA_NodeId_deleteMembers(&dataSetWriter->linkedWriterGroup);
  376. UA_NodeId_deleteMembers(&dataSetWriter->connectedDataSet);
  377. #ifdef UA_ENABLE_PUBSUB_DELTAFRAMES
  378. //delete lastSamples store
  379. for(size_t i = 0; i < dataSetWriter->lastSamplesCount; i++) {
  380. UA_DataValue_deleteMembers(&dataSetWriter->lastSamples[i].value);
  381. }
  382. UA_free(dataSetWriter->lastSamples);
  383. dataSetWriter->lastSamples = NULL;
  384. dataSetWriter->lastSamplesCount = 0;
  385. #endif
  386. }
  387. /**********************************************/
  388. /* WriterGroup */
  389. /**********************************************/
  390. UA_StatusCode
  391. UA_WriterGroupConfig_copy(const UA_WriterGroupConfig *src,
  392. UA_WriterGroupConfig *dst){
  393. UA_StatusCode retVal = UA_STATUSCODE_GOOD;
  394. memcpy(dst, src, sizeof(UA_WriterGroupConfig));
  395. retVal |= UA_String_copy(&src->name, &dst->name);
  396. retVal |= UA_ExtensionObject_copy(&src->transportSettings, &dst->transportSettings);
  397. retVal |= UA_ExtensionObject_copy(&src->messageSettings, &dst->messageSettings);
  398. dst->groupProperties = (UA_KeyValuePair *) UA_calloc(src->groupPropertiesSize, sizeof(UA_KeyValuePair));
  399. if(!dst->groupProperties)
  400. return UA_STATUSCODE_BADOUTOFMEMORY;
  401. for(size_t i = 0; i < src->groupPropertiesSize; i++){
  402. retVal |= UA_KeyValuePair_copy(&src->groupProperties[i], &dst->groupProperties[i]);
  403. }
  404. return retVal;
  405. }
  406. UA_StatusCode
  407. UA_Server_getWriterGroupConfig(UA_Server *server, const UA_NodeId writerGroup,
  408. UA_WriterGroupConfig *config){
  409. UA_StatusCode retVal = UA_STATUSCODE_GOOD;
  410. if(!config)
  411. return UA_STATUSCODE_BADINVALIDARGUMENT;
  412. UA_WriterGroup *currentWriterGroup = UA_WriterGroup_findWGbyId(server, writerGroup);
  413. if(!currentWriterGroup){
  414. return UA_STATUSCODE_BADNOTFOUND;
  415. }
  416. UA_WriterGroupConfig tmpWriterGroupConfig;
  417. //deep copy of the actual config
  418. retVal |= UA_WriterGroupConfig_copy(&currentWriterGroup->config, &tmpWriterGroupConfig);
  419. *config = tmpWriterGroupConfig;
  420. return retVal;
  421. }
  422. UA_StatusCode
  423. UA_Server_updateWriterGroupConfig(UA_Server *server, UA_NodeId writerGroupIdentifier,
  424. const UA_WriterGroupConfig *config){
  425. if(!config)
  426. return UA_STATUSCODE_BADINVALIDARGUMENT;
  427. UA_WriterGroup *currentWriterGroup = UA_WriterGroup_findWGbyId(server, writerGroupIdentifier);
  428. if(!currentWriterGroup)
  429. return UA_STATUSCODE_BADNOTFOUND;
  430. //The update functionality will be extended during the next PubSub batches.
  431. //Currently is only a change of the publishing interval possible.
  432. if(currentWriterGroup->config.publishingInterval != config->publishingInterval) {
  433. UA_PubSubManager_removeRepeatedPubSubCallback(server, currentWriterGroup->publishCallbackId);
  434. currentWriterGroup->config.publishingInterval = config->publishingInterval;
  435. UA_WriterGroup_addPublishCallback(server, currentWriterGroup);
  436. } else if(currentWriterGroup->config.priority != config->priority) {
  437. UA_LOG_WARNING(&server->config.logger, UA_LOGCATEGORY_SERVER,
  438. "No or unsupported WriterGroup update.");
  439. }
  440. return UA_STATUSCODE_GOOD;
  441. }
  442. UA_WriterGroup *
  443. UA_WriterGroup_findWGbyId(UA_Server *server, UA_NodeId identifier){
  444. for(size_t i = 0; i < server->pubSubManager.connectionsSize; i++){
  445. UA_WriterGroup *tmpWriterGroup;
  446. LIST_FOREACH(tmpWriterGroup, &server->pubSubManager.connections[i].writerGroups, listEntry) {
  447. if(UA_NodeId_equal(&identifier, &tmpWriterGroup->identifier)){
  448. return tmpWriterGroup;
  449. }
  450. }
  451. }
  452. return NULL;
  453. }
  454. void
  455. UA_WriterGroupConfig_deleteMembers(UA_WriterGroupConfig *writerGroupConfig){
  456. //delete writerGroup config
  457. UA_String_deleteMembers(&writerGroupConfig->name);
  458. UA_ExtensionObject_deleteMembers(&writerGroupConfig->transportSettings);
  459. UA_ExtensionObject_deleteMembers(&writerGroupConfig->messageSettings);
  460. for(size_t i = 0; i < writerGroupConfig->groupPropertiesSize; i++){
  461. UA_KeyValuePair_deleteMembers(&writerGroupConfig->groupProperties[i]);
  462. }
  463. UA_free(writerGroupConfig->groupProperties);
  464. }
  465. static void
  466. UA_WriterGroup_deleteMembers(UA_Server *server, UA_WriterGroup *writerGroup) {
  467. UA_WriterGroupConfig_deleteMembers(&writerGroup->config);
  468. //delete WriterGroup
  469. //delete all writers. Therefore removeDataSetWriter is called from PublishedDataSet
  470. UA_DataSetWriter *dataSetWriter, *tmpDataSetWriter;
  471. LIST_FOREACH_SAFE(dataSetWriter, &writerGroup->writers, listEntry, tmpDataSetWriter){
  472. UA_Server_removeDataSetWriter(server, dataSetWriter->identifier);
  473. }
  474. UA_NodeId_deleteMembers(&writerGroup->linkedConnection);
  475. UA_NodeId_deleteMembers(&writerGroup->identifier);
  476. }
  477. UA_StatusCode
  478. UA_Server_addDataSetWriter(UA_Server *server,
  479. const UA_NodeId writerGroup, const UA_NodeId dataSet,
  480. const UA_DataSetWriterConfig *dataSetWriterConfig,
  481. UA_NodeId *writerIdentifier) {
  482. UA_StatusCode retVal = UA_STATUSCODE_GOOD;
  483. if(!dataSetWriterConfig)
  484. return UA_STATUSCODE_BADINVALIDARGUMENT;
  485. UA_PublishedDataSet *currentDataSetContext = UA_PublishedDataSet_findPDSbyId(server, dataSet);
  486. if(!currentDataSetContext)
  487. return UA_STATUSCODE_BADNOTFOUND;
  488. UA_WriterGroup *wg = UA_WriterGroup_findWGbyId(server, writerGroup);
  489. if(!wg)
  490. return UA_STATUSCODE_BADNOTFOUND;
  491. UA_DataSetWriter *newDataSetWriter = (UA_DataSetWriter *) UA_calloc(1, sizeof(UA_DataSetWriter));
  492. if(!newDataSetWriter)
  493. return UA_STATUSCODE_BADOUTOFMEMORY;
  494. //copy the config into the new dataSetWriter
  495. UA_DataSetWriterConfig tmpDataSetWriterConfig;
  496. retVal |= UA_DataSetWriterConfig_copy(dataSetWriterConfig, &tmpDataSetWriterConfig);
  497. newDataSetWriter->config = tmpDataSetWriterConfig;
  498. //save the current version of the connected PublishedDataSet
  499. newDataSetWriter->connectedDataSetVersion = currentDataSetContext->dataSetMetaData.configurationVersion;
  500. #ifdef UA_ENABLE_PUBSUB_DELTAFRAMES
  501. //initialize the queue for the last values
  502. newDataSetWriter->lastSamples = (UA_DataSetWriterSample * )
  503. UA_calloc(currentDataSetContext->fieldSize, sizeof(UA_DataSetWriterSample));
  504. if(!newDataSetWriter->lastSamples) {
  505. UA_DataSetWriterConfig_deleteMembers(&newDataSetWriter->config);
  506. UA_free(newDataSetWriter);
  507. return UA_STATUSCODE_BADOUTOFMEMORY;
  508. }
  509. newDataSetWriter->lastSamplesCount = currentDataSetContext->fieldSize;
  510. #endif
  511. //connect PublishedDataSet with DataSetWriter
  512. newDataSetWriter->connectedDataSet = currentDataSetContext->identifier;
  513. newDataSetWriter->linkedWriterGroup = wg->identifier;
  514. UA_PubSubManager_generateUniqueNodeId(server, &newDataSetWriter->identifier);
  515. if(writerIdentifier != NULL)
  516. UA_NodeId_copy(&newDataSetWriter->identifier, writerIdentifier);
  517. //add the new writer to the group
  518. LIST_INSERT_HEAD(&wg->writers, newDataSetWriter, listEntry);
  519. wg->writersCount++;
  520. #ifdef UA_ENABLE_PUBSUB_INFORMATIONMODEL
  521. addDataSetWriterRepresentation(server, newDataSetWriter);
  522. #endif
  523. return retVal;
  524. }
  525. UA_StatusCode
  526. UA_Server_removeDataSetWriter(UA_Server *server, const UA_NodeId dsw){
  527. UA_DataSetWriter *dataSetWriter = UA_DataSetWriter_findDSWbyId(server, dsw);
  528. if(!dataSetWriter)
  529. return UA_STATUSCODE_BADNOTFOUND;
  530. UA_WriterGroup *linkedWriterGroup = UA_WriterGroup_findWGbyId(server, dataSetWriter->linkedWriterGroup);
  531. if(!linkedWriterGroup)
  532. return UA_STATUSCODE_BADNOTFOUND;
  533. linkedWriterGroup->writersCount--;
  534. #ifdef UA_ENABLE_PUBSUB_INFORMATIONMODEL
  535. removeDataSetWriterRepresentation(server, dataSetWriter);
  536. #endif
  537. //remove DataSetWriter from group
  538. UA_DataSetWriter_deleteMembers(server, dataSetWriter);
  539. LIST_REMOVE(dataSetWriter, listEntry);
  540. UA_free(dataSetWriter);
  541. return UA_STATUSCODE_GOOD;
  542. }
  543. /**********************************************/
  544. /* DataSetField */
  545. /**********************************************/
  546. UA_StatusCode
  547. UA_DataSetFieldConfig_copy(const UA_DataSetFieldConfig *src, UA_DataSetFieldConfig *dst){
  548. memcpy(dst, src, sizeof(UA_DataSetFieldConfig));
  549. if(src->dataSetFieldType == UA_PUBSUB_DATASETFIELD_VARIABLE) {
  550. UA_String_copy(&src->field.variable.fieldNameAlias, &dst->field.variable.fieldNameAlias);
  551. UA_PublishedVariableDataType_copy(&src->field.variable.publishParameters,
  552. &dst->field.variable.publishParameters);
  553. } else {
  554. return UA_STATUSCODE_BADNOTSUPPORTED;
  555. }
  556. return UA_STATUSCODE_GOOD;
  557. }
  558. UA_StatusCode
  559. UA_Server_getDataSetFieldConfig(UA_Server *server, const UA_NodeId dsf,
  560. UA_DataSetFieldConfig *config) {
  561. UA_StatusCode retVal = UA_STATUSCODE_GOOD;
  562. if(!config)
  563. return UA_STATUSCODE_BADINVALIDARGUMENT;
  564. UA_DataSetField *currentDataSetField = UA_DataSetField_findDSFbyId(server, dsf);
  565. if(!currentDataSetField)
  566. return UA_STATUSCODE_BADNOTFOUND;
  567. UA_DataSetFieldConfig tmpFieldConfig;
  568. //deep copy of the actual config
  569. retVal |= UA_DataSetFieldConfig_copy(&currentDataSetField->config, &tmpFieldConfig);
  570. *config = tmpFieldConfig;
  571. return retVal;
  572. }
  573. UA_DataSetField *
  574. UA_DataSetField_findDSFbyId(UA_Server *server, UA_NodeId identifier) {
  575. for(size_t i = 0; i < server->pubSubManager.publishedDataSetsSize; i++){
  576. UA_DataSetField *tmpField;
  577. LIST_FOREACH(tmpField, &server->pubSubManager.publishedDataSets[i].fields, listEntry){
  578. if(UA_NodeId_equal(&tmpField->identifier, &identifier)){
  579. return tmpField;
  580. }
  581. }
  582. }
  583. return NULL;
  584. }
  585. void
  586. UA_DataSetFieldConfig_deleteMembers(UA_DataSetFieldConfig *dataSetFieldConfig){
  587. if(dataSetFieldConfig->dataSetFieldType == UA_PUBSUB_DATASETFIELD_VARIABLE){
  588. UA_String_deleteMembers(&dataSetFieldConfig->field.variable.fieldNameAlias);
  589. UA_PublishedVariableDataType_deleteMembers(&dataSetFieldConfig->field.variable.publishParameters);
  590. }
  591. }
  592. static void
  593. UA_DataSetField_deleteMembers(UA_DataSetField *field) {
  594. UA_DataSetFieldConfig_deleteMembers(&field->config);
  595. //delete DataSetField
  596. UA_NodeId_deleteMembers(&field->identifier);
  597. UA_NodeId_deleteMembers(&field->publishedDataSet);
  598. UA_FieldMetaData_deleteMembers(&field->fieldMetaData);
  599. }
  600. /*********************************************************/
  601. /* PublishValues handling */
  602. /*********************************************************/
  /**
   * Compare two variants. Internally used for value change detection.
   *
   * Both variants are binary-encoded and the encodings are compared. Differing
   * encoded sizes count as a change without encoding anything.
   *
   * @param oldValue previously stored value
   * @param newValue freshly sampled value
   * @return true if the value has changed; false if it is unchanged, if either
   *         pointer is NULL, if an encoded size is 0, or if allocation or
   *         encoding fails
   */
  #ifdef UA_ENABLE_PUBSUB_DELTAFRAMES
  static UA_Boolean
  valueChangedVariant(UA_Variant *oldValue, UA_Variant *newValue) {
      if(!(oldValue && newValue))
          return false;

      size_t oldValueEncodingSize = UA_calcSizeBinary(oldValue, &UA_TYPES[UA_TYPES_VARIANT]);
      size_t newValueEncodingSize = UA_calcSizeBinary(newValue, &UA_TYPES[UA_TYPES_VARIANT]);
      if((oldValueEncodingSize == 0) || (newValueEncodingSize == 0))
          return false;
      if(oldValueEncodingSize != newValueEncodingSize)
          return true;

      /* The ByteStrings live on the stack and are released on every path below,
       * so no early exit can leak an encoding buffer. */
      UA_Boolean compareResult = false;
      UA_ByteString oldValueEncoding;
      UA_ByteString newValueEncoding;
      UA_ByteString_init(&oldValueEncoding);
      UA_ByteString_init(&newValueEncoding);

      if(UA_ByteString_allocBuffer(&oldValueEncoding, oldValueEncodingSize) != UA_STATUSCODE_GOOD)
          goto cleanup;
      if(UA_ByteString_allocBuffer(&newValueEncoding, newValueEncodingSize) != UA_STATUSCODE_GOOD)
          goto cleanup;

      UA_Byte *bufPosOldValue = oldValueEncoding.data;
      const UA_Byte *bufEndOldValue = &oldValueEncoding.data[oldValueEncoding.length];
      UA_Byte *bufPosNewValue = newValueEncoding.data;
      const UA_Byte *bufEndNewValue = &newValueEncoding.data[newValueEncoding.length];

      if(UA_encodeBinary(oldValue, &UA_TYPES[UA_TYPES_VARIANT],
                         &bufPosOldValue, &bufEndOldValue, NULL, NULL) != UA_STATUSCODE_GOOD)
          goto cleanup;
      if(UA_encodeBinary(newValue, &UA_TYPES[UA_TYPES_VARIANT],
                         &bufPosNewValue, &bufEndNewValue, NULL, NULL) != UA_STATUSCODE_GOOD)
          goto cleanup;

      /* Compare only the bytes that were actually written by the encoder */
      oldValueEncoding.length = (uintptr_t)bufPosOldValue - (uintptr_t)oldValueEncoding.data;
      newValueEncoding.length = (uintptr_t)bufPosNewValue - (uintptr_t)newValueEncoding.data;
      compareResult = !UA_ByteString_equal(&oldValueEncoding, &newValueEncoding);

  cleanup:
      UA_ByteString_deleteMembers(&oldValueEncoding);
      UA_ByteString_deleteMembers(&newValueEncoding);
      return compareResult;
  }
  #endif
  645. /**
  646. * Obtain the latest value for a specific DataSetField. This method is currently
  647. * called inside the DataSetMessage generation process.
  648. */
  649. static void
  650. UA_PubSubDataSetField_sampleValue(UA_Server *server, UA_DataSetField *field,
  651. UA_DataValue *value) {
  652. /* Read the value */
  653. UA_ReadValueId rvid;
  654. UA_ReadValueId_init(&rvid);
  655. rvid.nodeId = field->config.field.variable.publishParameters.publishedVariable;
  656. rvid.attributeId = field->config.field.variable.publishParameters.attributeId;
  657. rvid.indexRange = field->config.field.variable.publishParameters.indexRange;
  658. *value = UA_Server_read(server, &rvid, UA_TIMESTAMPSTORETURN_BOTH);
  659. }
  660. static UA_StatusCode
  661. UA_PubSubDataSetWriter_generateKeyFrameMessage(UA_Server *server, UA_DataSetMessage *dataSetMessage,
  662. UA_DataSetWriter *dataSetWriter) {
  663. UA_PublishedDataSet *currentDataSet =
  664. UA_PublishedDataSet_findPDSbyId(server, dataSetWriter->connectedDataSet);
  665. if(!currentDataSet)
  666. return UA_STATUSCODE_BADNOTFOUND;
  667. /* Prepare DataSetMessageContent */
  668. dataSetMessage->header.dataSetMessageValid = true;
  669. dataSetMessage->header.dataSetMessageType = UA_DATASETMESSAGE_DATAKEYFRAME;
  670. dataSetMessage->data.keyFrameData.fieldCount = currentDataSet->fieldSize;
  671. dataSetMessage->data.keyFrameData.dataSetFields = (UA_DataValue *)
  672. UA_Array_new(currentDataSet->fieldSize, &UA_TYPES[UA_TYPES_DATAVALUE]);
  673. if(!dataSetMessage->data.keyFrameData.dataSetFields)
  674. return UA_STATUSCODE_BADOUTOFMEMORY;
  675. #ifdef UA_ENABLE_JSON_ENCODING
  676. /* json: insert fieldnames used as json keys */
  677. dataSetMessage->data.keyFrameData.fieldNames =
  678. (UA_String *)UA_Array_new(currentDataSet->fieldSize, &UA_TYPES[UA_TYPES_STRING]);
  679. if(!dataSetMessage->data.keyFrameData.fieldNames)
  680. return UA_STATUSCODE_BADOUTOFMEMORY;
  681. #endif
  682. /* Loop over the fields */
  683. size_t counter = 0;
  684. UA_DataSetField *dsf;
  685. LIST_FOREACH(dsf, &currentDataSet->fields, listEntry) {
  686. #ifdef UA_ENABLE_JSON_ENCODING
  687. /* json: store the fieldNameAlias*/
  688. UA_String_copy(&dsf->config.field.variable.fieldNameAlias,
  689. &dataSetMessage->data.keyFrameData.fieldNames[counter]);
  690. #endif
  691. /* Sample the value */
  692. UA_DataValue *dfv = &dataSetMessage->data.keyFrameData.dataSetFields[counter];
  693. UA_PubSubDataSetField_sampleValue(server, dsf, dfv);
  694. /* Deactivate statuscode? */
  695. if((dataSetWriter->config.dataSetFieldContentMask & UA_DATASETFIELDCONTENTMASK_STATUSCODE) == 0)
  696. dfv->hasStatus = false;
  697. /* Deactivate timestamps */
  698. if((dataSetWriter->config.dataSetFieldContentMask & UA_DATASETFIELDCONTENTMASK_SOURCETIMESTAMP) == 0)
  699. dfv->hasSourceTimestamp = false;
  700. if((dataSetWriter->config.dataSetFieldContentMask & UA_DATASETFIELDCONTENTMASK_SOURCEPICOSECONDS) == 0)
  701. dfv->hasSourcePicoseconds = false;
  702. if((dataSetWriter->config.dataSetFieldContentMask & UA_DATASETFIELDCONTENTMASK_SERVERTIMESTAMP) == 0)
  703. dfv->hasServerTimestamp = false;
  704. if((dataSetWriter->config.dataSetFieldContentMask & UA_DATASETFIELDCONTENTMASK_SERVERPICOSECONDS) == 0)
  705. dfv->hasServerPicoseconds = false;
  706. #ifdef UA_ENABLE_PUBSUB_DELTAFRAMES
  707. /* Update lastValue store */
  708. UA_DataValue_deleteMembers(&dataSetWriter->lastSamples[counter].value);
  709. UA_DataValue_copy(dfv, &dataSetWriter->lastSamples[counter].value);
  710. #endif
  711. counter++;
  712. }
  713. return UA_STATUSCODE_GOOD;
  714. }
  #ifdef UA_ENABLE_PUBSUB_DELTAFRAMES
  /**
   * Fill `dataSetMessage` with a delta frame: only the fields whose sampled value
   * differs from the writer's last stored sample are included.
   *
   * The message is zeroed first. Every field is then sampled and compared with
   * the stored sample; changed values replace the stored sample. In a second
   * pass the changed samples are copied into the delta frame, and the DataValue
   * members not selected in the writer's dataSetFieldContentMask are switched
   * off.
   *
   * NOTE(review): zeroing the message here also discards whatever the caller
   * stored in the header beforehand - confirm this is intended.
   *
   * @param server server owning the PublishedDataSet
   * @param dataSetMessage message to fill
   * @param dataSetWriter writer the message is generated for
   * @return UA_STATUSCODE_GOOD on success, UA_STATUSCODE_BADNOTFOUND if the
   *         connected PublishedDataSet does not exist,
   *         UA_STATUSCODE_BADOUTOFMEMORY if the field array cannot be allocated
   */
  static UA_StatusCode
  UA_PubSubDataSetWriter_generateDeltaFrameMessage(UA_Server *server,
                                                   UA_DataSetMessage *dataSetMessage,
                                                   UA_DataSetWriter *dataSetWriter) {
      UA_PublishedDataSet *currentDataSet =
          UA_PublishedDataSet_findPDSbyId(server, dataSetWriter->connectedDataSet);
      if(!currentDataSet)
          return UA_STATUSCODE_BADNOTFOUND;

      /* Prepare DataSetMessageContent */
      memset(dataSetMessage, 0, sizeof(UA_DataSetMessage));
      dataSetMessage->header.dataSetMessageValid = true;
      dataSetMessage->header.dataSetMessageType = UA_DATASETMESSAGE_DATADELTAFRAME;

      UA_DataSetField *dsf;
      size_t counter = 0;
      LIST_FOREACH(dsf, &currentDataSet->fields, listEntry) {
          /* Sample the value */
          UA_DataValue value;
          UA_DataValue_init(&value);
          UA_PubSubDataSetField_sampleValue(server, dsf, &value);

          /* Check if the value has changed */
          if(valueChangedVariant(&dataSetWriter->lastSamples[counter].value.value, &value.value)) {
              /* increase fieldCount for current delta message */
              dataSetMessage->data.deltaFrameData.fieldCount++;
              dataSetWriter->lastSamples[counter].valueChanged = true;

              /* Update last stored sample; ownership of `value` moves to the store */
              UA_DataValue_deleteMembers(&dataSetWriter->lastSamples[counter].value);
              dataSetWriter->lastSamples[counter].value = value;
          } else {
              UA_DataValue_deleteMembers(&value);
              dataSetWriter->lastSamples[counter].valueChanged = false;
          }
          counter++;
      }

      /* Allocate DeltaFrameFields. Request at least one element: calloc with a
       * zero count may legally return NULL, which must not be mistaken for an
       * out-of-memory condition when simply nothing has changed. */
      size_t changedCount = dataSetMessage->data.deltaFrameData.fieldCount;
      UA_DataSetMessage_DeltaFrameField *deltaFields = (UA_DataSetMessage_DeltaFrameField *)
          UA_calloc(changedCount > 0 ? changedCount : 1,
                    sizeof(UA_DataSetMessage_DeltaFrameField));
      if(!deltaFields)
          return UA_STATUSCODE_BADOUTOFMEMORY;

      dataSetMessage->data.deltaFrameData.deltaFrameFields = deltaFields;
      size_t currentDeltaField = 0;
      for(size_t i = 0; i < currentDataSet->fieldSize; i++) {
          if(!dataSetWriter->lastSamples[i].valueChanged)
              continue;

          UA_DataSetMessage_DeltaFrameField *dff = &deltaFields[currentDeltaField];
          dff->fieldIndex = (UA_UInt16) i;
          UA_DataValue_copy(&dataSetWriter->lastSamples[i].value, &dff->fieldValue);
          dataSetWriter->lastSamples[i].valueChanged = false;

          /* Deactivate statuscode? */
          if((dataSetWriter->config.dataSetFieldContentMask & UA_DATASETFIELDCONTENTMASK_STATUSCODE) == 0)
              dff->fieldValue.hasStatus = false;

          /* Deactivate timestamps? */
          if((dataSetWriter->config.dataSetFieldContentMask & UA_DATASETFIELDCONTENTMASK_SOURCETIMESTAMP) == 0)
              dff->fieldValue.hasSourceTimestamp = false;
          /* The source picoseconds mask controls the source picoseconds flag,
           * matching the key frame generation. */
          if((dataSetWriter->config.dataSetFieldContentMask & UA_DATASETFIELDCONTENTMASK_SOURCEPICOSECONDS) == 0)
              dff->fieldValue.hasSourcePicoseconds = false;
          if((dataSetWriter->config.dataSetFieldContentMask & UA_DATASETFIELDCONTENTMASK_SERVERTIMESTAMP) == 0)
              dff->fieldValue.hasServerTimestamp = false;
          if((dataSetWriter->config.dataSetFieldContentMask & UA_DATASETFIELDCONTENTMASK_SERVERPICOSECONDS) == 0)
              dff->fieldValue.hasServerPicoseconds = false;

          currentDeltaField++;
      }
      return UA_STATUSCODE_GOOD;
  }
  #endif
  780. /**
  781. * Generate a DataSetMessage for the given writer.
  782. *
  783. * @param dataSetWriter ptr to corresponding writer
  784. * @return ptr to generated DataSetMessage
  785. */
  786. static UA_StatusCode
  787. UA_DataSetWriter_generateDataSetMessage(UA_Server *server, UA_DataSetMessage *dataSetMessage,
  788. UA_DataSetWriter *dataSetWriter) {
  789. UA_PublishedDataSet *currentDataSet =
  790. UA_PublishedDataSet_findPDSbyId(server, dataSetWriter->connectedDataSet);
  791. if(!currentDataSet)
  792. return UA_STATUSCODE_BADNOTFOUND;
  793. /* Reset the message */
  794. memset(dataSetMessage, 0, sizeof(UA_DataSetMessage));
  795. /* store messageType to switch between json or uadp (default) */
  796. UA_UInt16 messageType = UA_TYPES_UADPDATASETWRITERMESSAGEDATATYPE;
  797. UA_JsonDataSetWriterMessageDataType *jsonDataSetWriterMessageDataType = NULL;
  798. /* The configuration Flags are included
  799. * inside the std. defined UA_UadpDataSetWriterMessageDataType */
  800. UA_UadpDataSetWriterMessageDataType defaultUadpConfiguration;
  801. UA_UadpDataSetWriterMessageDataType *dataSetWriterMessageDataType = NULL;
  802. if((dataSetWriter->config.messageSettings.encoding == UA_EXTENSIONOBJECT_DECODED ||
  803. dataSetWriter->config.messageSettings.encoding == UA_EXTENSIONOBJECT_DECODED_NODELETE) &&
  804. (dataSetWriter->config.messageSettings.content.decoded.type == &UA_TYPES[UA_TYPES_UADPDATASETWRITERMESSAGEDATATYPE])) {
  805. dataSetWriterMessageDataType = (UA_UadpDataSetWriterMessageDataType *)
  806. dataSetWriter->config.messageSettings.content.decoded.data;
  807. /* type is UADP */
  808. messageType = UA_TYPES_UADPDATASETWRITERMESSAGEDATATYPE;
  809. } else if((dataSetWriter->config.messageSettings.encoding == UA_EXTENSIONOBJECT_DECODED ||
  810. dataSetWriter->config.messageSettings.encoding == UA_EXTENSIONOBJECT_DECODED_NODELETE) &&
  811. (dataSetWriter->config.messageSettings.content.decoded.type == &UA_TYPES[UA_TYPES_JSONDATASETWRITERMESSAGEDATATYPE])) {
  812. jsonDataSetWriterMessageDataType = (UA_JsonDataSetWriterMessageDataType *)
  813. dataSetWriter->config.messageSettings.content.decoded.data;
  814. /* type is JSON */
  815. messageType = UA_TYPES_JSONDATASETWRITERMESSAGEDATATYPE;
  816. }else {
  817. /* create default flag configuration if no
  818. * UadpDataSetWriterMessageDataType was passed in */
  819. memset(&defaultUadpConfiguration, 0, sizeof(UA_UadpDataSetWriterMessageDataType));
  820. defaultUadpConfiguration.dataSetMessageContentMask = (UA_UadpDataSetMessageContentMask)
  821. (UA_UADPDATASETMESSAGECONTENTMASK_TIMESTAMP | UA_UADPDATASETMESSAGECONTENTMASK_MAJORVERSION |
  822. UA_UADPDATASETMESSAGECONTENTMASK_MINORVERSION);
  823. dataSetWriterMessageDataType = &defaultUadpConfiguration;
  824. }
  825. /* Sanity-test the configuration */
  826. if(dataSetWriterMessageDataType &&
  827. (dataSetWriterMessageDataType->networkMessageNumber != 0 ||
  828. dataSetWriterMessageDataType->dataSetOffset != 0 ||
  829. dataSetWriterMessageDataType->configuredSize !=0 )) {
  830. UA_LOG_WARNING(&server->config.logger, UA_LOGCATEGORY_SERVER,
  831. "Static DSM configuration not supported. Using defaults");
  832. dataSetWriterMessageDataType->networkMessageNumber = 0;
  833. dataSetWriterMessageDataType->dataSetOffset = 0;
  834. dataSetWriterMessageDataType->configuredSize = 0;
  835. }
  836. /* The field encoding depends on the flags inside the writer config.
  837. * TODO: This can be moved to the encoding layer. */
  838. if(dataSetWriter->config.dataSetFieldContentMask & UA_DATASETFIELDCONTENTMASK_RAWDATAENCODING) {
  839. dataSetMessage->header.fieldEncoding = UA_FIELDENCODING_RAWDATA;
  840. } else if (dataSetWriter->config.dataSetFieldContentMask &
  841. (UA_DATASETFIELDCONTENTMASK_SOURCETIMESTAMP | UA_DATASETFIELDCONTENTMASK_SERVERPICOSECONDS |
  842. UA_DATASETFIELDCONTENTMASK_SOURCEPICOSECONDS | UA_DATASETFIELDCONTENTMASK_STATUSCODE)) {
  843. dataSetMessage->header.fieldEncoding = UA_FIELDENCODING_DATAVALUE;
  844. } else {
  845. dataSetMessage->header.fieldEncoding = UA_FIELDENCODING_VARIANT;
  846. }
  847. if(messageType == UA_TYPES_UADPDATASETWRITERMESSAGEDATATYPE){
  848. /* Std: 'The DataSetMessageContentMask defines the flags for the content of the DataSetMessage header.' */
  849. if(dataSetWriterMessageDataType->dataSetMessageContentMask & UA_UADPDATASETMESSAGECONTENTMASK_MAJORVERSION){
  850. dataSetMessage->header.configVersionMajorVersionEnabled = true;
  851. dataSetMessage->header.configVersionMajorVersion =
  852. currentDataSet->dataSetMetaData.configurationVersion.majorVersion;
  853. }
  854. if(dataSetWriterMessageDataType->dataSetMessageContentMask & UA_UADPDATASETMESSAGECONTENTMASK_MINORVERSION){
  855. dataSetMessage->header.configVersionMinorVersionEnabled = true;
  856. dataSetMessage->header.configVersionMinorVersion =
  857. currentDataSet->dataSetMetaData.configurationVersion.minorVersion;
  858. }
  859. if(dataSetWriterMessageDataType->dataSetMessageContentMask & UA_UADPDATASETMESSAGECONTENTMASK_SEQUENCENUMBER) {
  860. dataSetMessage->header.dataSetMessageSequenceNrEnabled = true;
  861. dataSetMessage->header.dataSetMessageSequenceNr =
  862. dataSetWriter->actualDataSetMessageSequenceCount;
  863. }
  864. if(dataSetWriterMessageDataType->dataSetMessageContentMask & UA_UADPDATASETMESSAGECONTENTMASK_TIMESTAMP) {
  865. dataSetMessage->header.timestampEnabled = true;
  866. dataSetMessage->header.timestamp = UA_DateTime_now();
  867. }
  868. /* TODO: Picoseconds resolution not supported atm */
  869. if(dataSetWriterMessageDataType->dataSetMessageContentMask & UA_UADPDATASETMESSAGECONTENTMASK_PICOSECONDS) {
  870. dataSetMessage->header.picoSecondsIncluded = false;
  871. }
  872. /* TODO: Statuscode not supported yet */
  873. if(dataSetWriterMessageDataType->dataSetMessageContentMask & UA_UADPDATASETMESSAGECONTENTMASK_STATUS){
  874. dataSetMessage->header.statusEnabled = false;
  875. }
  876. }else if(messageType == UA_TYPES_JSONDATASETWRITERMESSAGEDATATYPE){
  877. if(jsonDataSetWriterMessageDataType->dataSetMessageContentMask & UA_JSONDATASETMESSAGECONTENTMASK_METADATAVERSION){
  878. dataSetMessage->header.configVersionMajorVersionEnabled = true;
  879. dataSetMessage->header.configVersionMajorVersion =
  880. currentDataSet->dataSetMetaData.configurationVersion.majorVersion;
  881. }
  882. if(jsonDataSetWriterMessageDataType->dataSetMessageContentMask & UA_JSONDATASETMESSAGECONTENTMASK_METADATAVERSION){
  883. dataSetMessage->header.configVersionMinorVersionEnabled = true;
  884. dataSetMessage->header.configVersionMinorVersion =
  885. currentDataSet->dataSetMetaData.configurationVersion.minorVersion;
  886. }
  887. if(jsonDataSetWriterMessageDataType->dataSetMessageContentMask & UA_JSONDATASETMESSAGECONTENTMASK_SEQUENCENUMBER) {
  888. dataSetMessage->header.dataSetMessageSequenceNrEnabled = true;
  889. dataSetMessage->header.dataSetMessageSequenceNr =
  890. dataSetWriter->actualDataSetMessageSequenceCount;
  891. }
  892. if(jsonDataSetWriterMessageDataType->dataSetMessageContentMask & UA_JSONDATASETMESSAGECONTENTMASK_TIMESTAMP) {
  893. dataSetMessage->header.timestampEnabled = true;
  894. dataSetMessage->header.timestamp = UA_DateTime_now();
  895. }
  896. /* TODO: Statuscode not supported yet */
  897. if(jsonDataSetWriterMessageDataType->dataSetMessageContentMask & UA_JSONDATASETMESSAGECONTENTMASK_STATUS){
  898. dataSetMessage->header.statusEnabled = false;
  899. }
  900. }
  901. /* Set the sequence count. Automatically rolls over to zero */
  902. dataSetWriter->actualDataSetMessageSequenceCount++;
  903. /* JSON does not differ between deltaframes and keyframes, only keyframes are currently used. */
  904. if(messageType != UA_TYPES_JSONDATASETWRITERMESSAGEDATATYPE){
  905. #ifdef UA_ENABLE_PUBSUB_DELTAFRAMES
  906. /* Check if the PublishedDataSet version has changed -> if yes flush the lastValue store and send a KeyFrame */
  907. if(dataSetWriter->connectedDataSetVersion.majorVersion != currentDataSet->dataSetMetaData.configurationVersion.majorVersion ||
  908. dataSetWriter->connectedDataSetVersion.minorVersion != currentDataSet->dataSetMetaData.configurationVersion.minorVersion) {
  909. /* Remove old samples */
  910. for(size_t i = 0; i < dataSetWriter->lastSamplesCount; i++)
  911. UA_DataValue_deleteMembers(&dataSetWriter->lastSamples[i].value);
  912. /* Realloc pds dependent memory */
  913. dataSetWriter->lastSamplesCount = currentDataSet->fieldSize;
  914. UA_DataSetWriterSample *newSamplesArray = (UA_DataSetWriterSample * )
  915. UA_realloc(dataSetWriter->lastSamples, sizeof(UA_DataSetWriterSample) * dataSetWriter->lastSamplesCount);
  916. if(!newSamplesArray)
  917. return UA_STATUSCODE_BADOUTOFMEMORY;
  918. dataSetWriter->lastSamples = newSamplesArray;
  919. memset(dataSetWriter->lastSamples, 0, sizeof(UA_DataSetWriterSample) * dataSetWriter->lastSamplesCount);
  920. dataSetWriter->connectedDataSetVersion = currentDataSet->dataSetMetaData.configurationVersion;
  921. UA_PubSubDataSetWriter_generateKeyFrameMessage(server, dataSetMessage, dataSetWriter);
  922. dataSetWriter->deltaFrameCounter = 0;
  923. return UA_STATUSCODE_GOOD;
  924. }
  925. /* The standard defines: if a PDS contains only one fields no delta messages
  926. * should be generated because they need more memory than a keyframe with 1
  927. * field. */
  928. if(currentDataSet->fieldSize > 1 && dataSetWriter->deltaFrameCounter > 0 &&
  929. dataSetWriter->deltaFrameCounter <= dataSetWriter->config.keyFrameCount) {
  930. UA_PubSubDataSetWriter_generateDeltaFrameMessage(server, dataSetMessage, dataSetWriter);
  931. dataSetWriter->deltaFrameCounter++;
  932. return UA_STATUSCODE_GOOD;
  933. }
  934. dataSetWriter->deltaFrameCounter = 1;
  935. #endif
  936. }
  937. UA_PubSubDataSetWriter_generateKeyFrameMessage(server, dataSetMessage, dataSetWriter);
  938. return UA_STATUSCODE_GOOD;
  939. }
  940. static UA_StatusCode
  941. sendNetworkMessageJson(UA_PubSubConnection *connection, UA_DataSetMessage *dsm,
  942. UA_UInt16 *writerIds, UA_Byte dsmCount, UA_ExtensionObject *transportSettings) {
  943. UA_StatusCode retval = UA_STATUSCODE_BADNOTSUPPORTED;
  944. #ifdef UA_ENABLE_JSON_ENCODING
  945. UA_NetworkMessage nm;
  946. memset(&nm, 0, sizeof(UA_NetworkMessage));
  947. nm.version = 1;
  948. nm.networkMessageType = UA_NETWORKMESSAGE_DATASET;
  949. nm.payloadHeaderEnabled = true;
  950. nm.payloadHeader.dataSetPayloadHeader.count = dsmCount;
  951. nm.payloadHeader.dataSetPayloadHeader.dataSetWriterIds = writerIds;
  952. nm.payload.dataSetPayload.dataSetMessages = dsm;
  953. /* Allocate the buffer. Allocate on the stack if the buffer is small. */
  954. UA_ByteString buf;
  955. size_t msgSize = UA_NetworkMessage_calcSizeJson(&nm, NULL, 0, NULL, 0, true);
  956. size_t stackSize = 1;
  957. if(msgSize <= UA_MAX_STACKBUF)
  958. stackSize = msgSize;
  959. UA_STACKARRAY(UA_Byte, stackBuf, stackSize);
  960. buf.data = stackBuf;
  961. buf.length = msgSize;
  962. if(msgSize > UA_MAX_STACKBUF) {
  963. retval = UA_ByteString_allocBuffer(&buf, msgSize);
  964. if(retval != UA_STATUSCODE_GOOD)
  965. return retval;
  966. }
  967. /* Encode the message */
  968. UA_Byte *bufPos = buf.data;
  969. memset(bufPos, 0, msgSize);
  970. const UA_Byte *bufEnd = &buf.data[buf.length];
  971. retval = UA_NetworkMessage_encodeJson(&nm, &bufPos, &bufEnd, NULL, 0, NULL, 0, true);
  972. if(retval != UA_STATUSCODE_GOOD) {
  973. if(msgSize > UA_MAX_STACKBUF)
  974. UA_ByteString_deleteMembers(&buf);
  975. return retval;
  976. }
  977. /* Send the prepared messages */
  978. retval = connection->channel->send(connection->channel, transportSettings, &buf);
  979. if(msgSize > UA_MAX_STACKBUF)
  980. UA_ByteString_deleteMembers(&buf);
  981. #endif
  982. return retval;
  983. }
  984. static UA_StatusCode
  985. sendNetworkMessage(UA_PubSubConnection *connection, UA_WriterGroup *wg,
  986. UA_DataSetMessage *dsm, UA_UInt16 *writerIds, UA_Byte dsmCount,
  987. UA_ExtensionObject *messageSettings,
  988. UA_ExtensionObject *transportSettings) {
  989. if(messageSettings->content.decoded.type !=
  990. &UA_TYPES[UA_TYPES_UADPWRITERGROUPMESSAGEDATATYPE])
  991. return UA_STATUSCODE_BADINTERNALERROR;
  992. UA_UadpWriterGroupMessageDataType *wgm = (UA_UadpWriterGroupMessageDataType*)
  993. messageSettings->content.decoded.data;
  994. UA_NetworkMessage nm;
  995. memset(&nm, 0, sizeof(UA_NetworkMessage));
  996. nm.publisherIdEnabled =
  997. !!(wgm->networkMessageContentMask & UA_UADPNETWORKMESSAGECONTENTMASK_PUBLISHERID);
  998. nm.groupHeaderEnabled =
  999. !!(wgm->networkMessageContentMask & UA_UADPNETWORKMESSAGECONTENTMASK_GROUPHEADER);
  1000. nm.groupHeader.writerGroupIdEnabled =
  1001. !!(wgm->networkMessageContentMask & UA_UADPNETWORKMESSAGECONTENTMASK_WRITERGROUPID);
  1002. nm.groupHeader.groupVersionEnabled =
  1003. !!(wgm->networkMessageContentMask & UA_UADPNETWORKMESSAGECONTENTMASK_GROUPVERSION);
  1004. nm.groupHeader.networkMessageNumberEnabled =
  1005. !!(wgm->networkMessageContentMask & UA_UADPNETWORKMESSAGECONTENTMASK_NETWORKMESSAGENUMBER);
  1006. nm.groupHeader.sequenceNumberEnabled =
  1007. !!(wgm->networkMessageContentMask & UA_UADPNETWORKMESSAGECONTENTMASK_SEQUENCENUMBER);
  1008. nm.payloadHeaderEnabled =
  1009. !!(wgm->networkMessageContentMask & UA_UADPNETWORKMESSAGECONTENTMASK_PAYLOADHEADER);
  1010. nm.timestampEnabled =
  1011. !!(wgm->networkMessageContentMask & UA_UADPNETWORKMESSAGECONTENTMASK_TIMESTAMP);
  1012. nm.picosecondsEnabled =
  1013. !!(wgm->networkMessageContentMask & UA_UADPNETWORKMESSAGECONTENTMASK_PICOSECONDS);
  1014. nm.dataSetClassIdEnabled =
  1015. !!(wgm->networkMessageContentMask & UA_UADPNETWORKMESSAGECONTENTMASK_DATASETCLASSID);
  1016. nm.promotedFieldsEnabled =
  1017. !!(wgm->networkMessageContentMask & UA_UADPNETWORKMESSAGECONTENTMASK_PROMOTEDFIELDS);
  1018. nm.version = 1;
  1019. nm.networkMessageType = UA_NETWORKMESSAGE_DATASET;
  1020. if(connection->config->publisherIdType == UA_PUBSUB_PUBLISHERID_NUMERIC) {
  1021. nm.publisherIdType = UA_PUBLISHERDATATYPE_UINT16;
  1022. nm.publisherId.publisherIdUInt32 = connection->config->publisherId.numeric;
  1023. } else if (connection->config->publisherIdType == UA_PUBSUB_PUBLISHERID_STRING){
  1024. nm.publisherIdType = UA_PUBLISHERDATATYPE_STRING;
  1025. nm.publisherId.publisherIdString = connection->config->publisherId.string;
  1026. }
  1027. /* Compute the length of the dsm separately for the header */
  1028. UA_STACKARRAY(UA_UInt16, dsmLengths, dsmCount);
  1029. for(UA_Byte i = 0; i < dsmCount; i++)
  1030. dsmLengths[i] = (UA_UInt16)UA_DataSetMessage_calcSizeBinary(&dsm[i]);
  1031. nm.payloadHeader.dataSetPayloadHeader.count = dsmCount;
  1032. nm.payloadHeader.dataSetPayloadHeader.dataSetWriterIds = writerIds;
  1033. nm.groupHeader.writerGroupId = wg->config.writerGroupId;
  1034. nm.groupHeader.networkMessageNumber = 1;
  1035. nm.payload.dataSetPayload.sizes = dsmLengths;
  1036. nm.payload.dataSetPayload.dataSetMessages = dsm;
  1037. /* Allocate the buffer. Allocate on the stack if the buffer is small. */
  1038. UA_ByteString buf;
  1039. size_t msgSize = UA_NetworkMessage_calcSizeBinary(&nm);
  1040. size_t stackSize = 1;
  1041. if(msgSize <= UA_MAX_STACKBUF)
  1042. stackSize = msgSize;
  1043. UA_STACKARRAY(UA_Byte, stackBuf, stackSize);
  1044. buf.data = stackBuf;
  1045. buf.length = msgSize;
  1046. UA_StatusCode retval = UA_STATUSCODE_GOOD;
  1047. if(msgSize > UA_MAX_STACKBUF) {
  1048. retval = UA_ByteString_allocBuffer(&buf, msgSize);
  1049. if(retval != UA_STATUSCODE_GOOD)
  1050. return retval;
  1051. }
  1052. /* Encode the message */
  1053. UA_Byte *bufPos = buf.data;
  1054. memset(bufPos, 0, msgSize);
  1055. const UA_Byte *bufEnd = &buf.data[buf.length];
  1056. retval = UA_NetworkMessage_encodeBinary(&nm, &bufPos, bufEnd);
  1057. if(retval != UA_STATUSCODE_GOOD) {
  1058. if(msgSize > UA_MAX_STACKBUF)
  1059. UA_ByteString_deleteMembers(&buf);
  1060. return retval;
  1061. }
  1062. /* Send the prepared messages */
  1063. retval = connection->channel->send(connection->channel, transportSettings, &buf);
  1064. if(msgSize > UA_MAX_STACKBUF)
  1065. UA_ByteString_deleteMembers(&buf);
  1066. return retval;
  1067. }
  1068. /* This callback triggers the collection and publish of NetworkMessages and the
  1069. * contained DataSetMessages. */
  1070. void
  1071. UA_WriterGroup_publishCallback(UA_Server *server, UA_WriterGroup *writerGroup) {
  1072. UA_LOG_DEBUG(&server->config.logger, UA_LOGCATEGORY_SERVER, "Publish Callback");
  1073. if(!writerGroup) {
  1074. UA_LOG_WARNING(&server->config.logger, UA_LOGCATEGORY_SERVER,
  1075. "Publish failed. WriterGroup not found");
  1076. return;
  1077. }
  1078. /* Nothing to do? */
  1079. if(writerGroup->writersCount <= 0)
  1080. return;
  1081. /* Binary or Json encoding? */
  1082. if(writerGroup->config.encodingMimeType != UA_PUBSUB_ENCODING_UADP &&
  1083. writerGroup->config.encodingMimeType != UA_PUBSUB_ENCODING_JSON) {
  1084. UA_LOG_WARNING(&server->config.logger, UA_LOGCATEGORY_SERVER,
  1085. "Publish failed: Unknown encoding type.");
  1086. return;
  1087. }
  1088. /* Find the connection associated with the writer */
  1089. UA_PubSubConnection *connection =
  1090. UA_PubSubConnection_findConnectionbyId(server, writerGroup->linkedConnection);
  1091. if(!connection) {
  1092. UA_LOG_WARNING(&server->config.logger, UA_LOGCATEGORY_SERVER,
  1093. "Publish failed. PubSubConnection invalid.");
  1094. return;
  1095. }
  1096. /* How many DSM can be sent in one NM? */
  1097. UA_Byte maxDSM = (UA_Byte)writerGroup->config.maxEncapsulatedDataSetMessageCount;
  1098. if(writerGroup->config.maxEncapsulatedDataSetMessageCount > UA_BYTE_MAX)
  1099. maxDSM = UA_BYTE_MAX;
  1100. /* If the maxEncapsulatedDataSetMessageCount is set to 0->1 */
  1101. if(maxDSM == 0)
  1102. maxDSM = 1;
  1103. /* It is possible to put several DataSetMessages into one NetworkMessage.
  1104. * But only if they do not contain promoted fields. NM with only DSM are
  1105. * sent out right away. The others are kept in a buffer for "batching". */
  1106. size_t dsmCount = 0;
  1107. UA_DataSetWriter *dsw;
  1108. UA_STACKARRAY(UA_UInt16, dsWriterIds, writerGroup->writersCount);
  1109. UA_STACKARRAY(UA_DataSetMessage, dsmStore, writerGroup->writersCount);
  1110. LIST_FOREACH(dsw, &writerGroup->writers, listEntry) {
  1111. /* Find the dataset */
  1112. UA_PublishedDataSet *pds =
  1113. UA_PublishedDataSet_findPDSbyId(server, dsw->connectedDataSet);
  1114. if(!pds) {
  1115. UA_LOG_WARNING(&server->config.logger, UA_LOGCATEGORY_SERVER,
  1116. "PubSub Publish: PublishedDataSet not found");
  1117. continue;
  1118. }
  1119. /* Generate the DSM */
  1120. UA_StatusCode res =
  1121. UA_DataSetWriter_generateDataSetMessage(server, &dsmStore[dsmCount], dsw);
  1122. if(res != UA_STATUSCODE_GOOD) {
  1123. UA_LOG_WARNING(&server->config.logger, UA_LOGCATEGORY_SERVER,
  1124. "PubSub Publish: DataSetMessage creation failed");
  1125. continue;
  1126. }
  1127. /* Send right away if there is only this DSM in a NM. If promoted fields
  1128. * are contained in the PublishedDataSet, then this DSM must go into a
  1129. * dedicated NM as well. */
  1130. if(pds->promotedFieldsCount > 0 || maxDSM == 1) {
  1131. if(writerGroup->config.encodingMimeType == UA_PUBSUB_ENCODING_UADP){
  1132. res = sendNetworkMessage(connection, writerGroup, &dsmStore[dsmCount],
  1133. &dsw->config.dataSetWriterId, 1,
  1134. &writerGroup->config.messageSettings,
  1135. &writerGroup->config.transportSettings);
  1136. }else if(writerGroup->config.encodingMimeType == UA_PUBSUB_ENCODING_JSON){
  1137. res = sendNetworkMessageJson(connection, &dsmStore[dsmCount],
  1138. &dsw->config.dataSetWriterId, 1, &writerGroup->config.transportSettings);
  1139. }
  1140. if(res != UA_STATUSCODE_GOOD)
  1141. UA_LOG_WARNING(&server->config.logger, UA_LOGCATEGORY_SERVER,
  1142. "PubSub Publish: Could not send a NetworkMessage");
  1143. UA_DataSetMessage_free(&dsmStore[dsmCount]);
  1144. continue;
  1145. }
  1146. dsWriterIds[dsmCount] = dsw->config.dataSetWriterId;
  1147. dsmCount++;
  1148. }
  1149. /* Send the NetworkMessages with batched DataSetMessages */
  1150. size_t nmCount = (dsmCount / maxDSM) + ((dsmCount % maxDSM) == 0 ? 0 : 1);
  1151. for(UA_UInt32 i = 0; i < nmCount; i++) {
  1152. UA_Byte nmDsmCount = maxDSM;
  1153. if(i == nmCount - 1)
  1154. nmDsmCount = (UA_Byte)dsmCount % maxDSM;
  1155. UA_StatusCode res3 = UA_STATUSCODE_GOOD;
  1156. if(writerGroup->config.encodingMimeType == UA_PUBSUB_ENCODING_UADP){
  1157. res3 = sendNetworkMessage(connection, writerGroup, &dsmStore[i * maxDSM],
  1158. &dsWriterIds[i * maxDSM], nmDsmCount,
  1159. &writerGroup->config.messageSettings,
  1160. &writerGroup->config.transportSettings);
  1161. }else if(writerGroup->config.encodingMimeType == UA_PUBSUB_ENCODING_JSON){
  1162. res3 = sendNetworkMessageJson(connection, &dsmStore[i * maxDSM],
  1163. &dsWriterIds[i * maxDSM], nmDsmCount, &writerGroup->config.transportSettings);
  1164. }
  1165. if(res3 != UA_STATUSCODE_GOOD)
  1166. UA_LOG_WARNING(&server->config.logger, UA_LOGCATEGORY_SERVER,
  1167. "PubSub Publish: Sending a NetworkMessage failed");
  1168. }
  1169. /* Clean up DSM */
  1170. for(size_t i = 0; i < dsmCount; i++)
  1171. UA_DataSetMessage_free(&dsmStore[i]);
  1172. }
  1173. /* Add new publishCallback. The first execution is triggered directly after
  1174. * creation. */
  1175. UA_StatusCode
  1176. UA_WriterGroup_addPublishCallback(UA_Server *server, UA_WriterGroup *writerGroup) {
  1177. UA_StatusCode retval =
  1178. UA_PubSubManager_addRepeatedCallback(server,
  1179. (UA_ServerCallback) UA_WriterGroup_publishCallback,
  1180. writerGroup, writerGroup->config.publishingInterval,
  1181. &writerGroup->publishCallbackId);
  1182. if(retval == UA_STATUSCODE_GOOD)
  1183. writerGroup->publishCallbackIsRegistered = true;
  1184. /* Run once after creation */
  1185. UA_WriterGroup_publishCallback(server, writerGroup);
  1186. return retval;
  1187. }
  1188. #endif /* UA_ENABLE_PUBSUB */