ua_pubsub.c 59 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179118011811182118311841185118611871188118911901191119211931194119511961197119811991200120112021203120412051206120712081209121012111212121312141215121612171218121912201221122212231224122512261227122812291230123112321233123412351236123712381239124012411242124312441245124612471248124912501251125212531254125512561257125812591260126112621263126412651266126712681269127012711272127312741275127612771278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353
  1. /* This Source Code Form is subject to the terms of the Mozilla Public
  2. * License, v. 2.0. If a copy of the MPL was not distributed with this
  3. * file, You can obtain one at http://mozilla.org/MPL/2.0/.
  4. *
  5. * Copyright (c) 2017-2018 Fraunhofer IOSB (Author: Andreas Ebner)
  6. * Copyright (c) 2019 Fraunhofer IOSB (Author: Julius Pfrommer)
  7. */
  8. #include "server/ua_server_internal.h"
  9. #ifdef UA_ENABLE_PUBSUB /* conditional compilation */
  10. #include "ua_pubsub.h"
  11. #ifdef UA_ENABLE_PUBSUB_INFORMATIONMODEL
  12. #include "ua_pubsub_ns0.h"
  13. #endif
  14. #ifdef UA_ENABLE_PUBSUB_DELTAFRAMES
  15. #include "ua_types_encoding_binary.h"
  16. #endif
  17. #define UA_MAX_STACKBUF 512 /* Max size of network messages on the stack */
  18. /* Forward declaration */
  19. static void
  20. UA_WriterGroup_deleteMembers(UA_Server *server, UA_WriterGroup *writerGroup);
  21. static void
  22. UA_DataSetField_deleteMembers(UA_DataSetField *field);
  23. /**********************************************/
  24. /* Connection */
  25. /**********************************************/
  26. UA_StatusCode
  27. UA_PubSubConnectionConfig_copy(const UA_PubSubConnectionConfig *src,
  28. UA_PubSubConnectionConfig *dst) {
  29. UA_StatusCode retVal = UA_STATUSCODE_GOOD;
  30. memcpy(dst, src, sizeof(UA_PubSubConnectionConfig));
  31. retVal |= UA_String_copy(&src->name, &dst->name);
  32. retVal |= UA_Variant_copy(&src->address, &dst->address);
  33. retVal |= UA_String_copy(&src->transportProfileUri, &dst->transportProfileUri);
  34. retVal |= UA_Variant_copy(&src->connectionTransportSettings, &dst->connectionTransportSettings);
  35. if(src->connectionPropertiesSize > 0){
  36. dst->connectionProperties = (UA_KeyValuePair *)
  37. UA_calloc(src->connectionPropertiesSize, sizeof(UA_KeyValuePair));
  38. if(!dst->connectionProperties){
  39. return UA_STATUSCODE_BADOUTOFMEMORY;
  40. }
  41. for(size_t i = 0; i < src->connectionPropertiesSize; i++){
  42. retVal |= UA_QualifiedName_copy(&src->connectionProperties[i].key,
  43. &dst->connectionProperties[i].key);
  44. retVal |= UA_Variant_copy(&src->connectionProperties[i].value,
  45. &dst->connectionProperties[i].value);
  46. }
  47. }
  48. return retVal;
  49. }
  50. UA_StatusCode
  51. UA_Server_getPubSubConnectionConfig(UA_Server *server, const UA_NodeId connection,
  52. UA_PubSubConnectionConfig *config) {
  53. if(!config)
  54. return UA_STATUSCODE_BADINVALIDARGUMENT;
  55. UA_PubSubConnection *currentPubSubConnection =
  56. UA_PubSubConnection_findConnectionbyId(server, connection);
  57. if(!currentPubSubConnection)
  58. return UA_STATUSCODE_BADNOTFOUND;
  59. UA_PubSubConnectionConfig tmpPubSubConnectionConfig;
  60. //deep copy of the actual config
  61. UA_PubSubConnectionConfig_copy(currentPubSubConnection->config, &tmpPubSubConnectionConfig);
  62. *config = tmpPubSubConnectionConfig;
  63. return UA_STATUSCODE_GOOD;
  64. }
  65. UA_PubSubConnection *
  66. UA_PubSubConnection_findConnectionbyId(UA_Server *server, UA_NodeId connectionIdentifier) {
  67. for(size_t i = 0; i < server->pubSubManager.connectionsSize; i++){
  68. if(UA_NodeId_equal(&connectionIdentifier, &server->pubSubManager.connections[i].identifier)){
  69. return &server->pubSubManager.connections[i];
  70. }
  71. }
  72. return NULL;
  73. }
  74. void
  75. UA_PubSubConnectionConfig_deleteMembers(UA_PubSubConnectionConfig *connectionConfig) {
  76. UA_String_deleteMembers(&connectionConfig->name);
  77. UA_String_deleteMembers(&connectionConfig->transportProfileUri);
  78. UA_Variant_deleteMembers(&connectionConfig->connectionTransportSettings);
  79. UA_Variant_deleteMembers(&connectionConfig->address);
  80. for(size_t i = 0; i < connectionConfig->connectionPropertiesSize; i++){
  81. UA_QualifiedName_deleteMembers(&connectionConfig->connectionProperties[i].key);
  82. UA_Variant_deleteMembers(&connectionConfig->connectionProperties[i].value);
  83. }
  84. UA_free(connectionConfig->connectionProperties);
  85. }
  86. void
  87. UA_PubSubConnection_deleteMembers(UA_Server *server, UA_PubSubConnection *connection) {
  88. //delete connection config
  89. UA_PubSubConnectionConfig_deleteMembers(connection->config);
  90. //remove contained WriterGroups
  91. UA_WriterGroup *writerGroup, *tmpWriterGroup;
  92. LIST_FOREACH_SAFE(writerGroup, &connection->writerGroups, listEntry, tmpWriterGroup){
  93. UA_Server_removeWriterGroup(server, writerGroup->identifier);
  94. }
  95. UA_NodeId_deleteMembers(&connection->identifier);
  96. if(connection->channel){
  97. connection->channel->close(connection->channel);
  98. }
  99. UA_free(connection->config);
  100. }
  101. UA_StatusCode
  102. UA_Server_addWriterGroup(UA_Server *server, const UA_NodeId connection,
  103. const UA_WriterGroupConfig *writerGroupConfig,
  104. UA_NodeId *writerGroupIdentifier) {
  105. UA_StatusCode retVal = UA_STATUSCODE_GOOD;
  106. if(!writerGroupConfig)
  107. return UA_STATUSCODE_BADINVALIDARGUMENT;
  108. //search the connection by the given connectionIdentifier
  109. UA_PubSubConnection *currentConnectionContext =
  110. UA_PubSubConnection_findConnectionbyId(server, connection);
  111. if(!currentConnectionContext)
  112. return UA_STATUSCODE_BADNOTFOUND;
  113. //allocate memory for new WriterGroup
  114. UA_WriterGroup *newWriterGroup = (UA_WriterGroup *) UA_calloc(1, sizeof(UA_WriterGroup));
  115. if (!newWriterGroup)
  116. return UA_STATUSCODE_BADOUTOFMEMORY;
  117. newWriterGroup->linkedConnection = currentConnectionContext->identifier;
  118. UA_PubSubManager_generateUniqueNodeId(server, &newWriterGroup->identifier);
  119. if(writerGroupIdentifier){
  120. UA_NodeId_copy(&newWriterGroup->identifier, writerGroupIdentifier);
  121. }
  122. //deep copy of the config
  123. UA_WriterGroupConfig tmpWriterGroupConfig;
  124. retVal |= UA_WriterGroupConfig_copy(writerGroupConfig, &tmpWriterGroupConfig);
  125. if(!tmpWriterGroupConfig.messageSettings.content.decoded.type) {
  126. UA_UadpWriterGroupMessageDataType *wgm = UA_UadpWriterGroupMessageDataType_new();
  127. tmpWriterGroupConfig.messageSettings.content.decoded.data = wgm;
  128. tmpWriterGroupConfig.messageSettings.content.decoded.type =
  129. &UA_TYPES[UA_TYPES_UADPWRITERGROUPMESSAGEDATATYPE];
  130. tmpWriterGroupConfig.messageSettings.encoding = UA_EXTENSIONOBJECT_DECODED;
  131. }
  132. newWriterGroup->config = tmpWriterGroupConfig;
  133. retVal |= UA_WriterGroup_addPublishCallback(server, newWriterGroup);
  134. LIST_INSERT_HEAD(&currentConnectionContext->writerGroups, newWriterGroup, listEntry);
  135. #ifdef UA_ENABLE_PUBSUB_INFORMATIONMODEL
  136. addWriterGroupRepresentation(server, newWriterGroup);
  137. #endif
  138. return retVal;
  139. }
  140. UA_StatusCode
  141. UA_Server_removeWriterGroup(UA_Server *server, const UA_NodeId writerGroup){
  142. UA_WriterGroup *wg = UA_WriterGroup_findWGbyId(server, writerGroup);
  143. if(!wg)
  144. return UA_STATUSCODE_BADNOTFOUND;
  145. UA_PubSubConnection *connection =
  146. UA_PubSubConnection_findConnectionbyId(server, wg->linkedConnection);
  147. if(!connection)
  148. return UA_STATUSCODE_BADNOTFOUND;
  149. //unregister the publish callback
  150. UA_PubSubManager_removeRepeatedPubSubCallback(server, wg->publishCallbackId);
  151. #ifdef UA_ENABLE_PUBSUB_INFORMATIONMODEL
  152. removeGroupRepresentation(server, wg);
  153. #endif
  154. UA_WriterGroup_deleteMembers(server, wg);
  155. LIST_REMOVE(wg, listEntry);
  156. UA_free(wg);
  157. return UA_STATUSCODE_GOOD;
  158. }
  159. /**********************************************/
  160. /* PublishedDataSet */
  161. /**********************************************/
  162. UA_StatusCode
  163. UA_PublishedDataSetConfig_copy(const UA_PublishedDataSetConfig *src,
  164. UA_PublishedDataSetConfig *dst) {
  165. UA_StatusCode retVal = UA_STATUSCODE_GOOD;
  166. memcpy(dst, src, sizeof(UA_PublishedDataSetConfig));
  167. retVal |= UA_String_copy(&src->name, &dst->name);
  168. switch(src->publishedDataSetType){
  169. case UA_PUBSUB_DATASET_PUBLISHEDITEMS:
  170. //no additional items
  171. break;
  172. case UA_PUBSUB_DATASET_PUBLISHEDITEMS_TEMPLATE:
  173. if (src->config.itemsTemplate.variablesToAddSize > 0){
  174. dst->config.itemsTemplate.variablesToAdd = (UA_PublishedVariableDataType *) UA_calloc(
  175. src->config.itemsTemplate.variablesToAddSize, sizeof(UA_PublishedVariableDataType));
  176. }
  177. for(size_t i = 0; i < src->config.itemsTemplate.variablesToAddSize; i++){
  178. retVal |= UA_PublishedVariableDataType_copy(&src->config.itemsTemplate.variablesToAdd[i],
  179. &dst->config.itemsTemplate.variablesToAdd[i]);
  180. }
  181. retVal |= UA_DataSetMetaDataType_copy(&src->config.itemsTemplate.metaData,
  182. &dst->config.itemsTemplate.metaData);
  183. break;
  184. default:
  185. return UA_STATUSCODE_BADINVALIDARGUMENT;
  186. }
  187. return retVal;
  188. }
  189. UA_StatusCode
  190. UA_Server_getPublishedDataSetConfig(UA_Server *server, const UA_NodeId pds,
  191. UA_PublishedDataSetConfig *config){
  192. if(!config)
  193. return UA_STATUSCODE_BADINVALIDARGUMENT;
  194. UA_PublishedDataSet *currentPublishedDataSet = UA_PublishedDataSet_findPDSbyId(server, pds);
  195. if(!currentPublishedDataSet)
  196. return UA_STATUSCODE_BADNOTFOUND;
  197. UA_PublishedDataSetConfig tmpPublishedDataSetConfig;
  198. //deep copy of the actual config
  199. UA_PublishedDataSetConfig_copy(&currentPublishedDataSet->config, &tmpPublishedDataSetConfig);
  200. *config = tmpPublishedDataSetConfig;
  201. return UA_STATUSCODE_GOOD;
  202. }
  203. UA_PublishedDataSet *
  204. UA_PublishedDataSet_findPDSbyId(UA_Server *server, UA_NodeId identifier){
  205. for(size_t i = 0; i < server->pubSubManager.publishedDataSetsSize; i++){
  206. if(UA_NodeId_equal(&server->pubSubManager.publishedDataSets[i].identifier, &identifier)){
  207. return &server->pubSubManager.publishedDataSets[i];
  208. }
  209. }
  210. return NULL;
  211. }
  212. void
  213. UA_PublishedDataSetConfig_deleteMembers(UA_PublishedDataSetConfig *pdsConfig){
  214. //delete pds config
  215. UA_String_deleteMembers(&pdsConfig->name);
  216. switch (pdsConfig->publishedDataSetType){
  217. case UA_PUBSUB_DATASET_PUBLISHEDITEMS:
  218. //no additional items
  219. break;
  220. case UA_PUBSUB_DATASET_PUBLISHEDITEMS_TEMPLATE:
  221. if (pdsConfig->config.itemsTemplate.variablesToAddSize > 0){
  222. for(size_t i = 0; i < pdsConfig->config.itemsTemplate.variablesToAddSize; i++){
  223. UA_PublishedVariableDataType_deleteMembers(&pdsConfig->config.itemsTemplate.variablesToAdd[i]);
  224. }
  225. UA_free(pdsConfig->config.itemsTemplate.variablesToAdd);
  226. }
  227. UA_DataSetMetaDataType_deleteMembers(&pdsConfig->config.itemsTemplate.metaData);
  228. break;
  229. default:
  230. break;
  231. }
  232. }
  233. void
  234. UA_PublishedDataSet_deleteMembers(UA_Server *server, UA_PublishedDataSet *publishedDataSet){
  235. UA_PublishedDataSetConfig_deleteMembers(&publishedDataSet->config);
  236. //delete PDS
  237. UA_DataSetMetaDataType_deleteMembers(&publishedDataSet->dataSetMetaData);
  238. UA_DataSetField *field, *tmpField;
  239. LIST_FOREACH_SAFE(field, &publishedDataSet->fields, listEntry, tmpField) {
  240. UA_Server_removeDataSetField(server, field->identifier);
  241. }
  242. UA_NodeId_deleteMembers(&publishedDataSet->identifier);
  243. }
  244. UA_DataSetFieldResult
  245. UA_Server_addDataSetField(UA_Server *server, const UA_NodeId publishedDataSet,
  246. const UA_DataSetFieldConfig *fieldConfig,
  247. UA_NodeId *fieldIdentifier) {
  248. UA_StatusCode retVal = UA_STATUSCODE_GOOD;
  249. UA_DataSetFieldResult result = {UA_STATUSCODE_BADINVALIDARGUMENT, {0, 0}};
  250. if(!fieldConfig)
  251. return result;
  252. UA_PublishedDataSet *currentDataSet = UA_PublishedDataSet_findPDSbyId(server, publishedDataSet);
  253. if(currentDataSet == NULL){
  254. result.result = UA_STATUSCODE_BADNOTFOUND;
  255. return result;
  256. }
  257. if(currentDataSet->config.publishedDataSetType != UA_PUBSUB_DATASET_PUBLISHEDITEMS){
  258. result.result = UA_STATUSCODE_BADNOTIMPLEMENTED;
  259. return result;
  260. }
  261. UA_DataSetField *newField = (UA_DataSetField *) UA_calloc(1, sizeof(UA_DataSetField));
  262. if(!newField){
  263. result.result = UA_STATUSCODE_BADINTERNALERROR;
  264. return result;
  265. }
  266. UA_DataSetFieldConfig tmpFieldConfig;
  267. retVal |= UA_DataSetFieldConfig_copy(fieldConfig, &tmpFieldConfig);
  268. newField->config = tmpFieldConfig;
  269. UA_PubSubManager_generateUniqueNodeId(server, &newField->identifier);
  270. if(fieldIdentifier != NULL){
  271. UA_NodeId_copy(&newField->identifier, fieldIdentifier);
  272. }
  273. newField->publishedDataSet = currentDataSet->identifier;
  274. //update major version of parent published data set
  275. currentDataSet->dataSetMetaData.configurationVersion.majorVersion = UA_PubSubConfigurationVersionTimeDifference();
  276. LIST_INSERT_HEAD(&currentDataSet->fields, newField, listEntry);
  277. if(newField->config.field.variable.promotedField)
  278. currentDataSet->promotedFieldsCount++;
  279. currentDataSet->fieldSize++;
  280. result.result = retVal;
  281. result.configurationVersion.majorVersion = currentDataSet->dataSetMetaData.configurationVersion.majorVersion;
  282. result.configurationVersion.minorVersion = currentDataSet->dataSetMetaData.configurationVersion.minorVersion;
  283. return result;
  284. }
  285. UA_DataSetFieldResult
  286. UA_Server_removeDataSetField(UA_Server *server, const UA_NodeId dsf) {
  287. UA_DataSetField *currentField = UA_DataSetField_findDSFbyId(server, dsf);
  288. UA_DataSetFieldResult result = {UA_STATUSCODE_BADNOTFOUND, {0, 0}};
  289. if(!currentField)
  290. return result;
  291. UA_PublishedDataSet *parentPublishedDataSet =
  292. UA_PublishedDataSet_findPDSbyId(server, currentField->publishedDataSet);
  293. if(!parentPublishedDataSet)
  294. return result;
  295. parentPublishedDataSet->fieldSize--;
  296. if(currentField->config.field.variable.promotedField)
  297. parentPublishedDataSet->promotedFieldsCount--;
  298. /* update major version of PublishedDataSet */
  299. parentPublishedDataSet->dataSetMetaData.configurationVersion.majorVersion =
  300. UA_PubSubConfigurationVersionTimeDifference();
  301. UA_DataSetField_deleteMembers(currentField);
  302. LIST_REMOVE(currentField, listEntry);
  303. UA_free(currentField);
  304. result.result = UA_STATUSCODE_GOOD;
  305. result.configurationVersion.majorVersion = parentPublishedDataSet->dataSetMetaData.configurationVersion.majorVersion;
  306. result.configurationVersion.minorVersion = parentPublishedDataSet->dataSetMetaData.configurationVersion.minorVersion;
  307. return result;
  308. }
  309. /**********************************************/
  310. /* DataSetWriter */
  311. /**********************************************/
  312. UA_StatusCode
  313. UA_DataSetWriterConfig_copy(const UA_DataSetWriterConfig *src,
  314. UA_DataSetWriterConfig *dst){
  315. UA_StatusCode retVal = UA_STATUSCODE_GOOD;
  316. memcpy(dst, src, sizeof(UA_DataSetWriterConfig));
  317. retVal |= UA_String_copy(&src->name, &dst->name);
  318. retVal |= UA_String_copy(&src->dataSetName, &dst->dataSetName);
  319. retVal |= UA_ExtensionObject_copy(&src->messageSettings, &dst->messageSettings);
  320. dst->dataSetWriterProperties = (UA_KeyValuePair *)
  321. UA_calloc(src->dataSetWriterPropertiesSize, sizeof(UA_KeyValuePair));
  322. if(!dst->dataSetWriterProperties)
  323. return UA_STATUSCODE_BADOUTOFMEMORY;
  324. for(size_t i = 0; i < src->dataSetWriterPropertiesSize; i++){
  325. retVal |= UA_KeyValuePair_copy(&src->dataSetWriterProperties[i], &dst->dataSetWriterProperties[i]);
  326. }
  327. return retVal;
  328. }
  329. UA_StatusCode
  330. UA_Server_getDataSetWriterConfig(UA_Server *server, const UA_NodeId dsw,
  331. UA_DataSetWriterConfig *config){
  332. UA_StatusCode retVal = UA_STATUSCODE_GOOD;
  333. if(!config)
  334. return UA_STATUSCODE_BADINVALIDARGUMENT;
  335. UA_DataSetWriter *currentDataSetWriter = UA_DataSetWriter_findDSWbyId(server, dsw);
  336. if(!currentDataSetWriter)
  337. return UA_STATUSCODE_BADNOTFOUND;
  338. UA_DataSetWriterConfig tmpWriterConfig;
  339. //deep copy of the actual config
  340. retVal |= UA_DataSetWriterConfig_copy(&currentDataSetWriter->config, &tmpWriterConfig);
  341. *config = tmpWriterConfig;
  342. return retVal;
  343. }
  344. UA_DataSetWriter *
  345. UA_DataSetWriter_findDSWbyId(UA_Server *server, UA_NodeId identifier) {
  346. for(size_t i = 0; i < server->pubSubManager.connectionsSize; i++){
  347. UA_WriterGroup *tmpWriterGroup;
  348. LIST_FOREACH(tmpWriterGroup, &server->pubSubManager.connections[i].writerGroups, listEntry){
  349. UA_DataSetWriter *tmpWriter;
  350. LIST_FOREACH(tmpWriter, &tmpWriterGroup->writers, listEntry){
  351. if(UA_NodeId_equal(&tmpWriter->identifier, &identifier)){
  352. return tmpWriter;
  353. }
  354. }
  355. }
  356. }
  357. return NULL;
  358. }
  359. void
  360. UA_DataSetWriterConfig_deleteMembers(UA_DataSetWriterConfig *pdsConfig) {
  361. UA_String_deleteMembers(&pdsConfig->name);
  362. UA_String_deleteMembers(&pdsConfig->dataSetName);
  363. for(size_t i = 0; i < pdsConfig->dataSetWriterPropertiesSize; i++){
  364. UA_KeyValuePair_deleteMembers(&pdsConfig->dataSetWriterProperties[i]);
  365. }
  366. UA_free(pdsConfig->dataSetWriterProperties);
  367. UA_ExtensionObject_deleteMembers(&pdsConfig->messageSettings);
  368. }
  369. static void
  370. UA_DataSetWriter_deleteMembers(UA_Server *server, UA_DataSetWriter *dataSetWriter) {
  371. UA_DataSetWriterConfig_deleteMembers(&dataSetWriter->config);
  372. //delete DataSetWriter
  373. UA_NodeId_deleteMembers(&dataSetWriter->identifier);
  374. UA_NodeId_deleteMembers(&dataSetWriter->linkedWriterGroup);
  375. UA_NodeId_deleteMembers(&dataSetWriter->connectedDataSet);
  376. #ifdef UA_ENABLE_PUBSUB_DELTAFRAMES
  377. //delete lastSamples store
  378. for(size_t i = 0; i < dataSetWriter->lastSamplesCount; i++) {
  379. UA_DataValue_deleteMembers(&dataSetWriter->lastSamples[i].value);
  380. }
  381. UA_free(dataSetWriter->lastSamples);
  382. dataSetWriter->lastSamples = NULL;
  383. dataSetWriter->lastSamplesCount = 0;
  384. #endif
  385. }
  386. /**********************************************/
  387. /* WriterGroup */
  388. /**********************************************/
  389. UA_StatusCode
  390. UA_WriterGroupConfig_copy(const UA_WriterGroupConfig *src,
  391. UA_WriterGroupConfig *dst){
  392. UA_StatusCode retVal = UA_STATUSCODE_GOOD;
  393. memcpy(dst, src, sizeof(UA_WriterGroupConfig));
  394. retVal |= UA_String_copy(&src->name, &dst->name);
  395. retVal |= UA_ExtensionObject_copy(&src->transportSettings, &dst->transportSettings);
  396. retVal |= UA_ExtensionObject_copy(&src->messageSettings, &dst->messageSettings);
  397. dst->groupProperties = (UA_KeyValuePair *) UA_calloc(src->groupPropertiesSize, sizeof(UA_KeyValuePair));
  398. if(!dst->groupProperties)
  399. return UA_STATUSCODE_BADOUTOFMEMORY;
  400. for(size_t i = 0; i < src->groupPropertiesSize; i++){
  401. retVal |= UA_KeyValuePair_copy(&src->groupProperties[i], &dst->groupProperties[i]);
  402. }
  403. return retVal;
  404. }
  405. UA_StatusCode
  406. UA_Server_getWriterGroupConfig(UA_Server *server, const UA_NodeId writerGroup,
  407. UA_WriterGroupConfig *config){
  408. UA_StatusCode retVal = UA_STATUSCODE_GOOD;
  409. if(!config)
  410. return UA_STATUSCODE_BADINVALIDARGUMENT;
  411. UA_WriterGroup *currentWriterGroup = UA_WriterGroup_findWGbyId(server, writerGroup);
  412. if(!currentWriterGroup){
  413. return UA_STATUSCODE_BADNOTFOUND;
  414. }
  415. UA_WriterGroupConfig tmpWriterGroupConfig;
  416. //deep copy of the actual config
  417. retVal |= UA_WriterGroupConfig_copy(&currentWriterGroup->config, &tmpWriterGroupConfig);
  418. *config = tmpWriterGroupConfig;
  419. return retVal;
  420. }
  421. UA_StatusCode
  422. UA_Server_updateWriterGroupConfig(UA_Server *server, UA_NodeId writerGroupIdentifier,
  423. const UA_WriterGroupConfig *config){
  424. if(!config)
  425. return UA_STATUSCODE_BADINVALIDARGUMENT;
  426. UA_WriterGroup *currentWriterGroup = UA_WriterGroup_findWGbyId(server, writerGroupIdentifier);
  427. if(!currentWriterGroup)
  428. return UA_STATUSCODE_BADNOTFOUND;
  429. //The update functionality will be extended during the next PubSub batches.
  430. //Currently is only a change of the publishing interval possible.
  431. if(currentWriterGroup->config.publishingInterval != config->publishingInterval) {
  432. UA_PubSubManager_removeRepeatedPubSubCallback(server, currentWriterGroup->publishCallbackId);
  433. currentWriterGroup->config.publishingInterval = config->publishingInterval;
  434. UA_WriterGroup_addPublishCallback(server, currentWriterGroup);
  435. } else if(currentWriterGroup->config.priority != config->priority) {
  436. UA_LOG_WARNING(&server->config.logger, UA_LOGCATEGORY_SERVER,
  437. "No or unsupported WriterGroup update.");
  438. }
  439. return UA_STATUSCODE_GOOD;
  440. }
  441. UA_WriterGroup *
  442. UA_WriterGroup_findWGbyId(UA_Server *server, UA_NodeId identifier){
  443. for(size_t i = 0; i < server->pubSubManager.connectionsSize; i++){
  444. UA_WriterGroup *tmpWriterGroup;
  445. LIST_FOREACH(tmpWriterGroup, &server->pubSubManager.connections[i].writerGroups, listEntry) {
  446. if(UA_NodeId_equal(&identifier, &tmpWriterGroup->identifier)){
  447. return tmpWriterGroup;
  448. }
  449. }
  450. }
  451. return NULL;
  452. }
  453. void
  454. UA_WriterGroupConfig_deleteMembers(UA_WriterGroupConfig *writerGroupConfig){
  455. //delete writerGroup config
  456. UA_String_deleteMembers(&writerGroupConfig->name);
  457. UA_ExtensionObject_deleteMembers(&writerGroupConfig->transportSettings);
  458. UA_ExtensionObject_deleteMembers(&writerGroupConfig->messageSettings);
  459. for(size_t i = 0; i < writerGroupConfig->groupPropertiesSize; i++){
  460. UA_KeyValuePair_deleteMembers(&writerGroupConfig->groupProperties[i]);
  461. }
  462. UA_free(writerGroupConfig->groupProperties);
  463. }
  464. static void
  465. UA_WriterGroup_deleteMembers(UA_Server *server, UA_WriterGroup *writerGroup) {
  466. UA_WriterGroupConfig_deleteMembers(&writerGroup->config);
  467. //delete WriterGroup
  468. //delete all writers. Therefore removeDataSetWriter is called from PublishedDataSet
  469. UA_DataSetWriter *dataSetWriter, *tmpDataSetWriter;
  470. LIST_FOREACH_SAFE(dataSetWriter, &writerGroup->writers, listEntry, tmpDataSetWriter){
  471. UA_Server_removeDataSetWriter(server, dataSetWriter->identifier);
  472. }
  473. UA_NodeId_deleteMembers(&writerGroup->linkedConnection);
  474. UA_NodeId_deleteMembers(&writerGroup->identifier);
  475. }
  476. UA_StatusCode
  477. UA_Server_addDataSetWriter(UA_Server *server,
  478. const UA_NodeId writerGroup, const UA_NodeId dataSet,
  479. const UA_DataSetWriterConfig *dataSetWriterConfig,
  480. UA_NodeId *writerIdentifier) {
  481. UA_StatusCode retVal = UA_STATUSCODE_GOOD;
  482. if(!dataSetWriterConfig)
  483. return UA_STATUSCODE_BADINVALIDARGUMENT;
  484. UA_PublishedDataSet *currentDataSetContext = UA_PublishedDataSet_findPDSbyId(server, dataSet);
  485. if(!currentDataSetContext)
  486. return UA_STATUSCODE_BADNOTFOUND;
  487. UA_WriterGroup *wg = UA_WriterGroup_findWGbyId(server, writerGroup);
  488. if(!wg)
  489. return UA_STATUSCODE_BADNOTFOUND;
  490. UA_DataSetWriter *newDataSetWriter = (UA_DataSetWriter *) UA_calloc(1, sizeof(UA_DataSetWriter));
  491. if(!newDataSetWriter)
  492. return UA_STATUSCODE_BADOUTOFMEMORY;
  493. //copy the config into the new dataSetWriter
  494. UA_DataSetWriterConfig tmpDataSetWriterConfig;
  495. retVal |= UA_DataSetWriterConfig_copy(dataSetWriterConfig, &tmpDataSetWriterConfig);
  496. newDataSetWriter->config = tmpDataSetWriterConfig;
  497. //save the current version of the connected PublishedDataSet
  498. newDataSetWriter->connectedDataSetVersion = currentDataSetContext->dataSetMetaData.configurationVersion;
  499. #ifdef UA_ENABLE_PUBSUB_DELTAFRAMES
  500. //initialize the queue for the last values
  501. newDataSetWriter->lastSamples = (UA_DataSetWriterSample * )
  502. UA_calloc(currentDataSetContext->fieldSize, sizeof(UA_DataSetWriterSample));
  503. if(!newDataSetWriter->lastSamples) {
  504. UA_DataSetWriterConfig_deleteMembers(&newDataSetWriter->config);
  505. UA_free(newDataSetWriter);
  506. return UA_STATUSCODE_BADOUTOFMEMORY;
  507. }
  508. newDataSetWriter->lastSamplesCount = currentDataSetContext->fieldSize;
  509. #endif
  510. //connect PublishedDataSet with DataSetWriter
  511. newDataSetWriter->connectedDataSet = currentDataSetContext->identifier;
  512. newDataSetWriter->linkedWriterGroup = wg->identifier;
  513. UA_PubSubManager_generateUniqueNodeId(server, &newDataSetWriter->identifier);
  514. if(writerIdentifier != NULL)
  515. UA_NodeId_copy(&newDataSetWriter->identifier, writerIdentifier);
  516. //add the new writer to the group
  517. LIST_INSERT_HEAD(&wg->writers, newDataSetWriter, listEntry);
  518. wg->writersCount++;
  519. #ifdef UA_ENABLE_PUBSUB_INFORMATIONMODEL
  520. addDataSetWriterRepresentation(server, newDataSetWriter);
  521. #endif
  522. return retVal;
  523. }
  524. UA_StatusCode
  525. UA_Server_removeDataSetWriter(UA_Server *server, const UA_NodeId dsw){
  526. UA_DataSetWriter *dataSetWriter = UA_DataSetWriter_findDSWbyId(server, dsw);
  527. if(!dataSetWriter)
  528. return UA_STATUSCODE_BADNOTFOUND;
  529. UA_WriterGroup *linkedWriterGroup = UA_WriterGroup_findWGbyId(server, dataSetWriter->linkedWriterGroup);
  530. if(!linkedWriterGroup)
  531. return UA_STATUSCODE_BADNOTFOUND;
  532. linkedWriterGroup->writersCount--;
  533. #ifdef UA_ENABLE_PUBSUB_INFORMATIONMODEL
  534. removeDataSetWriterRepresentation(server, dataSetWriter);
  535. #endif
  536. //remove DataSetWriter from group
  537. UA_DataSetWriter_deleteMembers(server, dataSetWriter);
  538. LIST_REMOVE(dataSetWriter, listEntry);
  539. UA_free(dataSetWriter);
  540. return UA_STATUSCODE_GOOD;
  541. }
  542. /**********************************************/
  543. /* DataSetField */
  544. /**********************************************/
  545. UA_StatusCode
  546. UA_DataSetFieldConfig_copy(const UA_DataSetFieldConfig *src, UA_DataSetFieldConfig *dst){
  547. memcpy(dst, src, sizeof(UA_DataSetFieldConfig));
  548. if(src->dataSetFieldType == UA_PUBSUB_DATASETFIELD_VARIABLE) {
  549. UA_String_copy(&src->field.variable.fieldNameAlias, &dst->field.variable.fieldNameAlias);
  550. UA_PublishedVariableDataType_copy(&src->field.variable.publishParameters,
  551. &dst->field.variable.publishParameters);
  552. } else {
  553. return UA_STATUSCODE_BADNOTSUPPORTED;
  554. }
  555. return UA_STATUSCODE_GOOD;
  556. }
  557. UA_StatusCode
  558. UA_Server_getDataSetFieldConfig(UA_Server *server, const UA_NodeId dsf,
  559. UA_DataSetFieldConfig *config) {
  560. UA_StatusCode retVal = UA_STATUSCODE_GOOD;
  561. if(!config)
  562. return UA_STATUSCODE_BADINVALIDARGUMENT;
  563. UA_DataSetField *currentDataSetField = UA_DataSetField_findDSFbyId(server, dsf);
  564. if(!currentDataSetField)
  565. return UA_STATUSCODE_BADNOTFOUND;
  566. UA_DataSetFieldConfig tmpFieldConfig;
  567. //deep copy of the actual config
  568. retVal |= UA_DataSetFieldConfig_copy(&currentDataSetField->config, &tmpFieldConfig);
  569. *config = tmpFieldConfig;
  570. return retVal;
  571. }
  572. UA_DataSetField *
  573. UA_DataSetField_findDSFbyId(UA_Server *server, UA_NodeId identifier) {
  574. for(size_t i = 0; i < server->pubSubManager.publishedDataSetsSize; i++){
  575. UA_DataSetField *tmpField;
  576. LIST_FOREACH(tmpField, &server->pubSubManager.publishedDataSets[i].fields, listEntry){
  577. if(UA_NodeId_equal(&tmpField->identifier, &identifier)){
  578. return tmpField;
  579. }
  580. }
  581. }
  582. return NULL;
  583. }
  584. void
  585. UA_DataSetFieldConfig_deleteMembers(UA_DataSetFieldConfig *dataSetFieldConfig){
  586. if(dataSetFieldConfig->dataSetFieldType == UA_PUBSUB_DATASETFIELD_VARIABLE){
  587. UA_String_deleteMembers(&dataSetFieldConfig->field.variable.fieldNameAlias);
  588. UA_PublishedVariableDataType_deleteMembers(&dataSetFieldConfig->field.variable.publishParameters);
  589. }
  590. }
  591. static void
  592. UA_DataSetField_deleteMembers(UA_DataSetField *field) {
  593. UA_DataSetFieldConfig_deleteMembers(&field->config);
  594. //delete DataSetField
  595. UA_NodeId_deleteMembers(&field->identifier);
  596. UA_NodeId_deleteMembers(&field->publishedDataSet);
  597. UA_FieldMetaData_deleteMembers(&field->fieldMetaData);
  598. }
  599. /*********************************************************/
  600. /* PublishValues handling */
  601. /*********************************************************/
  602. /**
  603. * Compare two variants. Internally used for value change detection.
  604. *
  605. * @return true if the value has changed
  606. */
  607. #ifdef UA_ENABLE_PUBSUB_DELTAFRAMES
  608. static UA_Boolean
  609. valueChangedVariant(UA_Variant *oldValue, UA_Variant *newValue){
  610. if(! (oldValue && newValue))
  611. return false;
  612. UA_ByteString *oldValueEncoding = UA_ByteString_new(), *newValueEncoding = UA_ByteString_new();
  613. size_t oldValueEncodingSize, newValueEncodingSize;
  614. oldValueEncodingSize = UA_calcSizeBinary(oldValue, &UA_TYPES[UA_TYPES_VARIANT]);
  615. newValueEncodingSize = UA_calcSizeBinary(newValue, &UA_TYPES[UA_TYPES_VARIANT]);
  616. if((oldValueEncodingSize == 0) || (newValueEncodingSize == 0))
  617. return false;
  618. if(oldValueEncodingSize != newValueEncodingSize)
  619. return true;
  620. if(UA_ByteString_allocBuffer(oldValueEncoding, oldValueEncodingSize) != UA_STATUSCODE_GOOD)
  621. return false;
  622. if(UA_ByteString_allocBuffer(newValueEncoding, newValueEncodingSize) != UA_STATUSCODE_GOOD)
  623. return false;
  624. UA_Byte *bufPosOldValue = oldValueEncoding->data;
  625. const UA_Byte *bufEndOldValue = &oldValueEncoding->data[oldValueEncoding->length];
  626. UA_Byte *bufPosNewValue = newValueEncoding->data;
  627. const UA_Byte *bufEndNewValue = &newValueEncoding->data[newValueEncoding->length];
  628. if(UA_encodeBinary(oldValue, &UA_TYPES[UA_TYPES_VARIANT],
  629. &bufPosOldValue, &bufEndOldValue, NULL, NULL) != UA_STATUSCODE_GOOD){
  630. return false;
  631. }
  632. if(UA_encodeBinary(newValue, &UA_TYPES[UA_TYPES_VARIANT],
  633. &bufPosNewValue, &bufEndNewValue, NULL, NULL) != UA_STATUSCODE_GOOD){
  634. return false;
  635. }
  636. oldValueEncoding->length = (uintptr_t)bufPosOldValue - (uintptr_t)oldValueEncoding->data;
  637. newValueEncoding->length = (uintptr_t)bufPosNewValue - (uintptr_t)newValueEncoding->data;
  638. UA_Boolean compareResult = !UA_ByteString_equal(oldValueEncoding, newValueEncoding);
  639. UA_ByteString_delete(oldValueEncoding);
  640. UA_ByteString_delete(newValueEncoding);
  641. return compareResult;
  642. }
  643. #endif
  644. /**
  645. * Obtain the latest value for a specific DataSetField. This method is currently
  646. * called inside the DataSetMessage generation process.
  647. */
  648. static void
  649. UA_PubSubDataSetField_sampleValue(UA_Server *server, UA_DataSetField *field,
  650. UA_DataValue *value) {
  651. /* Read the value */
  652. UA_ReadValueId rvid;
  653. UA_ReadValueId_init(&rvid);
  654. rvid.nodeId = field->config.field.variable.publishParameters.publishedVariable;
  655. rvid.attributeId = field->config.field.variable.publishParameters.attributeId;
  656. rvid.indexRange = field->config.field.variable.publishParameters.indexRange;
  657. *value = UA_Server_read(server, &rvid, UA_TIMESTAMPSTORETURN_BOTH);
  658. }
  659. static UA_StatusCode
  660. UA_PubSubDataSetWriter_generateKeyFrameMessage(UA_Server *server, UA_DataSetMessage *dataSetMessage,
  661. UA_DataSetWriter *dataSetWriter) {
  662. UA_PublishedDataSet *currentDataSet =
  663. UA_PublishedDataSet_findPDSbyId(server, dataSetWriter->connectedDataSet);
  664. if(!currentDataSet)
  665. return UA_STATUSCODE_BADNOTFOUND;
  666. /* Prepare DataSetMessageContent */
  667. dataSetMessage->header.dataSetMessageValid = true;
  668. dataSetMessage->header.dataSetMessageType = UA_DATASETMESSAGE_DATAKEYFRAME;
  669. dataSetMessage->data.keyFrameData.fieldCount = currentDataSet->fieldSize;
  670. dataSetMessage->data.keyFrameData.dataSetFields = (UA_DataValue *)
  671. UA_Array_new(currentDataSet->fieldSize, &UA_TYPES[UA_TYPES_DATAVALUE]);
  672. if(!dataSetMessage->data.keyFrameData.dataSetFields)
  673. return UA_STATUSCODE_BADOUTOFMEMORY;
  674. #ifdef UA_ENABLE_JSON_ENCODING
  675. /* json: insert fieldnames used as json keys */
  676. dataSetMessage->data.keyFrameData.fieldNames =
  677. (UA_String *)UA_Array_new(currentDataSet->fieldSize, &UA_TYPES[UA_TYPES_STRING]);
  678. if(!dataSetMessage->data.keyFrameData.fieldNames)
  679. return UA_STATUSCODE_BADOUTOFMEMORY;
  680. #endif
  681. /* Loop over the fields */
  682. size_t counter = 0;
  683. UA_DataSetField *dsf;
  684. LIST_FOREACH(dsf, &currentDataSet->fields, listEntry) {
  685. #ifdef UA_ENABLE_JSON_ENCODING
  686. /* json: store the fieldNameAlias*/
  687. UA_String_copy(&dsf->config.field.variable.fieldNameAlias,
  688. &dataSetMessage->data.keyFrameData.fieldNames[counter]);
  689. #endif
  690. /* Sample the value */
  691. UA_DataValue *dfv = &dataSetMessage->data.keyFrameData.dataSetFields[counter];
  692. UA_PubSubDataSetField_sampleValue(server, dsf, dfv);
  693. /* Deactivate statuscode? */
  694. if((dataSetWriter->config.dataSetFieldContentMask & UA_DATASETFIELDCONTENTMASK_STATUSCODE) == 0)
  695. dfv->hasStatus = false;
  696. /* Deactivate timestamps */
  697. if((dataSetWriter->config.dataSetFieldContentMask & UA_DATASETFIELDCONTENTMASK_SOURCETIMESTAMP) == 0)
  698. dfv->hasSourceTimestamp = false;
  699. if((dataSetWriter->config.dataSetFieldContentMask & UA_DATASETFIELDCONTENTMASK_SOURCEPICOSECONDS) == 0)
  700. dfv->hasSourcePicoseconds = false;
  701. if((dataSetWriter->config.dataSetFieldContentMask & UA_DATASETFIELDCONTENTMASK_SERVERTIMESTAMP) == 0)
  702. dfv->hasServerTimestamp = false;
  703. if((dataSetWriter->config.dataSetFieldContentMask & UA_DATASETFIELDCONTENTMASK_SERVERPICOSECONDS) == 0)
  704. dfv->hasServerPicoseconds = false;
  705. #ifdef UA_ENABLE_PUBSUB_DELTAFRAMES
  706. /* Update lastValue store */
  707. UA_DataValue_deleteMembers(&dataSetWriter->lastSamples[counter].value);
  708. UA_DataValue_copy(dfv, &dataSetWriter->lastSamples[counter].value);
  709. #endif
  710. counter++;
  711. }
  712. return UA_STATUSCODE_GOOD;
  713. }
  714. #ifdef UA_ENABLE_PUBSUB_DELTAFRAMES
  715. static UA_StatusCode
  716. UA_PubSubDataSetWriter_generateDeltaFrameMessage(UA_Server *server,
  717. UA_DataSetMessage *dataSetMessage,
  718. UA_DataSetWriter *dataSetWriter) {
  719. UA_PublishedDataSet *currentDataSet =
  720. UA_PublishedDataSet_findPDSbyId(server, dataSetWriter->connectedDataSet);
  721. if(!currentDataSet)
  722. return UA_STATUSCODE_BADNOTFOUND;
  723. /* Prepare DataSetMessageContent */
  724. memset(dataSetMessage, 0, sizeof(UA_DataSetMessage));
  725. dataSetMessage->header.dataSetMessageValid = true;
  726. dataSetMessage->header.dataSetMessageType = UA_DATASETMESSAGE_DATADELTAFRAME;
  727. UA_DataSetField *dsf;
  728. size_t counter = 0;
  729. LIST_FOREACH(dsf, &currentDataSet->fields, listEntry) {
  730. /* Sample the value */
  731. UA_DataValue value;
  732. UA_DataValue_init(&value);
  733. UA_PubSubDataSetField_sampleValue(server, dsf, &value);
  734. /* Check if the value has changed */
  735. if(valueChangedVariant(&dataSetWriter->lastSamples[counter].value.value, &value.value)) {
  736. /* increase fieldCount for current delta message */
  737. dataSetMessage->data.deltaFrameData.fieldCount++;
  738. dataSetWriter->lastSamples[counter].valueChanged = true;
  739. /* Update last stored sample */
  740. UA_DataValue_deleteMembers(&dataSetWriter->lastSamples[counter].value);
  741. dataSetWriter->lastSamples[counter].value = value;
  742. } else {
  743. UA_DataValue_deleteMembers(&value);
  744. dataSetWriter->lastSamples[counter].valueChanged = false;
  745. }
  746. counter++;
  747. }
  748. /* Allocate DeltaFrameFields */
  749. UA_DataSetMessage_DeltaFrameField *deltaFields = (UA_DataSetMessage_DeltaFrameField *)
  750. UA_calloc(dataSetMessage->data.deltaFrameData.fieldCount, sizeof(UA_DataSetMessage_DeltaFrameField));
  751. if(!deltaFields)
  752. return UA_STATUSCODE_BADOUTOFMEMORY;
  753. dataSetMessage->data.deltaFrameData.deltaFrameFields = deltaFields;
  754. size_t currentDeltaField = 0;
  755. for(size_t i = 0; i < currentDataSet->fieldSize; i++) {
  756. if(!dataSetWriter->lastSamples[i].valueChanged)
  757. continue;
  758. UA_DataSetMessage_DeltaFrameField *dff = &deltaFields[currentDeltaField];
  759. dff->fieldIndex = (UA_UInt16) i;
  760. UA_DataValue_copy(&dataSetWriter->lastSamples[i].value, &dff->fieldValue);
  761. dataSetWriter->lastSamples[i].valueChanged = false;
  762. /* Deactivate statuscode? */
  763. if((dataSetWriter->config.dataSetFieldContentMask & UA_DATASETFIELDCONTENTMASK_STATUSCODE) == 0)
  764. dff->fieldValue.hasStatus = false;
  765. /* Deactivate timestamps? */
  766. if((dataSetWriter->config.dataSetFieldContentMask & UA_DATASETFIELDCONTENTMASK_SOURCETIMESTAMP) == 0)
  767. dff->fieldValue.hasSourceTimestamp = false;
  768. if((dataSetWriter->config.dataSetFieldContentMask & UA_DATASETFIELDCONTENTMASK_SOURCEPICOSECONDS) == 0)
  769. dff->fieldValue.hasServerPicoseconds = false;
  770. if((dataSetWriter->config.dataSetFieldContentMask & UA_DATASETFIELDCONTENTMASK_SERVERTIMESTAMP) == 0)
  771. dff->fieldValue.hasServerTimestamp = false;
  772. if((dataSetWriter->config.dataSetFieldContentMask & UA_DATASETFIELDCONTENTMASK_SERVERPICOSECONDS) == 0)
  773. dff->fieldValue.hasServerPicoseconds = false;
  774. currentDeltaField++;
  775. }
  776. return UA_STATUSCODE_GOOD;
  777. }
  778. #endif
  779. /**
  780. * Generate a DataSetMessage for the given writer.
  781. *
  782. * @param dataSetWriter ptr to corresponding writer
  783. * @return ptr to generated DataSetMessage
  784. */
  785. static UA_StatusCode
  786. UA_DataSetWriter_generateDataSetMessage(UA_Server *server, UA_DataSetMessage *dataSetMessage,
  787. UA_DataSetWriter *dataSetWriter) {
  788. UA_PublishedDataSet *currentDataSet =
  789. UA_PublishedDataSet_findPDSbyId(server, dataSetWriter->connectedDataSet);
  790. if(!currentDataSet)
  791. return UA_STATUSCODE_BADNOTFOUND;
  792. /* Reset the message */
  793. memset(dataSetMessage, 0, sizeof(UA_DataSetMessage));
  794. /* store messageType to switch between json or uadp (default) */
  795. UA_UInt16 messageType = UA_TYPES_UADPDATASETWRITERMESSAGEDATATYPE;
  796. UA_JsonDataSetWriterMessageDataType *jsonDataSetWriterMessageDataType = NULL;
  797. /* The configuration Flags are included
  798. * inside the std. defined UA_UadpDataSetWriterMessageDataType */
  799. UA_UadpDataSetWriterMessageDataType defaultUadpConfiguration;
  800. UA_UadpDataSetWriterMessageDataType *dataSetWriterMessageDataType = NULL;
  801. if((dataSetWriter->config.messageSettings.encoding == UA_EXTENSIONOBJECT_DECODED ||
  802. dataSetWriter->config.messageSettings.encoding == UA_EXTENSIONOBJECT_DECODED_NODELETE) &&
  803. (dataSetWriter->config.messageSettings.content.decoded.type == &UA_TYPES[UA_TYPES_UADPDATASETWRITERMESSAGEDATATYPE])) {
  804. dataSetWriterMessageDataType = (UA_UadpDataSetWriterMessageDataType *)
  805. dataSetWriter->config.messageSettings.content.decoded.data;
  806. /* type is UADP */
  807. messageType = UA_TYPES_UADPDATASETWRITERMESSAGEDATATYPE;
  808. } else if((dataSetWriter->config.messageSettings.encoding == UA_EXTENSIONOBJECT_DECODED ||
  809. dataSetWriter->config.messageSettings.encoding == UA_EXTENSIONOBJECT_DECODED_NODELETE) &&
  810. (dataSetWriter->config.messageSettings.content.decoded.type == &UA_TYPES[UA_TYPES_JSONDATASETWRITERMESSAGEDATATYPE])) {
  811. jsonDataSetWriterMessageDataType = (UA_JsonDataSetWriterMessageDataType *)
  812. dataSetWriter->config.messageSettings.content.decoded.data;
  813. /* type is JSON */
  814. messageType = UA_TYPES_JSONDATASETWRITERMESSAGEDATATYPE;
  815. }else {
  816. /* create default flag configuration if no
  817. * UadpDataSetWriterMessageDataType was passed in */
  818. memset(&defaultUadpConfiguration, 0, sizeof(UA_UadpDataSetWriterMessageDataType));
  819. defaultUadpConfiguration.dataSetMessageContentMask = (UA_UadpDataSetMessageContentMask)
  820. (UA_UADPDATASETMESSAGECONTENTMASK_TIMESTAMP | UA_UADPDATASETMESSAGECONTENTMASK_MAJORVERSION |
  821. UA_UADPDATASETMESSAGECONTENTMASK_MINORVERSION);
  822. dataSetWriterMessageDataType = &defaultUadpConfiguration;
  823. }
  824. /* Sanity-test the configuration */
  825. if(dataSetWriterMessageDataType &&
  826. (dataSetWriterMessageDataType->networkMessageNumber != 0 ||
  827. dataSetWriterMessageDataType->dataSetOffset != 0 ||
  828. dataSetWriterMessageDataType->configuredSize !=0 )) {
  829. UA_LOG_WARNING(&server->config.logger, UA_LOGCATEGORY_SERVER,
  830. "Static DSM configuration not supported. Using defaults");
  831. dataSetWriterMessageDataType->networkMessageNumber = 0;
  832. dataSetWriterMessageDataType->dataSetOffset = 0;
  833. dataSetWriterMessageDataType->configuredSize = 0;
  834. }
  835. /* The field encoding depends on the flags inside the writer config.
  836. * TODO: This can be moved to the encoding layer. */
  837. if(dataSetWriter->config.dataSetFieldContentMask & UA_DATASETFIELDCONTENTMASK_RAWDATAENCODING) {
  838. dataSetMessage->header.fieldEncoding = UA_FIELDENCODING_RAWDATA;
  839. } else if (dataSetWriter->config.dataSetFieldContentMask &
  840. (UA_DATASETFIELDCONTENTMASK_SOURCETIMESTAMP | UA_DATASETFIELDCONTENTMASK_SERVERPICOSECONDS |
  841. UA_DATASETFIELDCONTENTMASK_SOURCEPICOSECONDS | UA_DATASETFIELDCONTENTMASK_STATUSCODE)) {
  842. dataSetMessage->header.fieldEncoding = UA_FIELDENCODING_DATAVALUE;
  843. } else {
  844. dataSetMessage->header.fieldEncoding = UA_FIELDENCODING_VARIANT;
  845. }
  846. if(messageType == UA_TYPES_UADPDATASETWRITERMESSAGEDATATYPE){
  847. /* Std: 'The DataSetMessageContentMask defines the flags for the content of the DataSetMessage header.' */
  848. if(dataSetWriterMessageDataType->dataSetMessageContentMask & UA_UADPDATASETMESSAGECONTENTMASK_MAJORVERSION){
  849. dataSetMessage->header.configVersionMajorVersionEnabled = true;
  850. dataSetMessage->header.configVersionMajorVersion =
  851. currentDataSet->dataSetMetaData.configurationVersion.majorVersion;
  852. }
  853. if(dataSetWriterMessageDataType->dataSetMessageContentMask & UA_UADPDATASETMESSAGECONTENTMASK_MINORVERSION){
  854. dataSetMessage->header.configVersionMinorVersionEnabled = true;
  855. dataSetMessage->header.configVersionMinorVersion =
  856. currentDataSet->dataSetMetaData.configurationVersion.minorVersion;
  857. }
  858. if(dataSetWriterMessageDataType->dataSetMessageContentMask & UA_UADPDATASETMESSAGECONTENTMASK_SEQUENCENUMBER) {
  859. dataSetMessage->header.dataSetMessageSequenceNrEnabled = true;
  860. dataSetMessage->header.dataSetMessageSequenceNr =
  861. dataSetWriter->actualDataSetMessageSequenceCount;
  862. }
  863. if(dataSetWriterMessageDataType->dataSetMessageContentMask & UA_UADPDATASETMESSAGECONTENTMASK_TIMESTAMP) {
  864. dataSetMessage->header.timestampEnabled = true;
  865. dataSetMessage->header.timestamp = UA_DateTime_now();
  866. }
  867. /* TODO: Picoseconds resolution not supported atm */
  868. if(dataSetWriterMessageDataType->dataSetMessageContentMask & UA_UADPDATASETMESSAGECONTENTMASK_PICOSECONDS) {
  869. dataSetMessage->header.picoSecondsIncluded = false;
  870. }
  871. /* TODO: Statuscode not supported yet */
  872. if(dataSetWriterMessageDataType->dataSetMessageContentMask & UA_UADPDATASETMESSAGECONTENTMASK_STATUS){
  873. dataSetMessage->header.statusEnabled = false;
  874. }
  875. }else if(messageType == UA_TYPES_JSONDATASETWRITERMESSAGEDATATYPE){
  876. if(jsonDataSetWriterMessageDataType->dataSetMessageContentMask & UA_JSONDATASETMESSAGECONTENTMASK_METADATAVERSION){
  877. dataSetMessage->header.configVersionMajorVersionEnabled = true;
  878. dataSetMessage->header.configVersionMajorVersion =
  879. currentDataSet->dataSetMetaData.configurationVersion.majorVersion;
  880. }
  881. if(jsonDataSetWriterMessageDataType->dataSetMessageContentMask & UA_JSONDATASETMESSAGECONTENTMASK_METADATAVERSION){
  882. dataSetMessage->header.configVersionMinorVersionEnabled = true;
  883. dataSetMessage->header.configVersionMinorVersion =
  884. currentDataSet->dataSetMetaData.configurationVersion.minorVersion;
  885. }
  886. if(jsonDataSetWriterMessageDataType->dataSetMessageContentMask & UA_JSONDATASETMESSAGECONTENTMASK_SEQUENCENUMBER) {
  887. dataSetMessage->header.dataSetMessageSequenceNrEnabled = true;
  888. dataSetMessage->header.dataSetMessageSequenceNr =
  889. dataSetWriter->actualDataSetMessageSequenceCount;
  890. }
  891. if(jsonDataSetWriterMessageDataType->dataSetMessageContentMask & UA_JSONDATASETMESSAGECONTENTMASK_TIMESTAMP) {
  892. dataSetMessage->header.timestampEnabled = true;
  893. dataSetMessage->header.timestamp = UA_DateTime_now();
  894. }
  895. /* TODO: Statuscode not supported yet */
  896. if(jsonDataSetWriterMessageDataType->dataSetMessageContentMask & UA_JSONDATASETMESSAGECONTENTMASK_STATUS){
  897. dataSetMessage->header.statusEnabled = false;
  898. }
  899. }
  900. /* Set the sequence count. Automatically rolls over to zero */
  901. dataSetWriter->actualDataSetMessageSequenceCount++;
  902. /* JSON does not differ between deltaframes and keyframes, only keyframes are currently used. */
  903. if(messageType != UA_TYPES_JSONDATASETWRITERMESSAGEDATATYPE){
  904. #ifdef UA_ENABLE_PUBSUB_DELTAFRAMES
  905. /* Check if the PublishedDataSet version has changed -> if yes flush the lastValue store and send a KeyFrame */
  906. if(dataSetWriter->connectedDataSetVersion.majorVersion != currentDataSet->dataSetMetaData.configurationVersion.majorVersion ||
  907. dataSetWriter->connectedDataSetVersion.minorVersion != currentDataSet->dataSetMetaData.configurationVersion.minorVersion) {
  908. /* Remove old samples */
  909. for(size_t i = 0; i < dataSetWriter->lastSamplesCount; i++)
  910. UA_DataValue_deleteMembers(&dataSetWriter->lastSamples[i].value);
  911. /* Realloc pds dependent memory */
  912. dataSetWriter->lastSamplesCount = currentDataSet->fieldSize;
  913. UA_DataSetWriterSample *newSamplesArray = (UA_DataSetWriterSample * )
  914. UA_realloc(dataSetWriter->lastSamples, sizeof(UA_DataSetWriterSample) * dataSetWriter->lastSamplesCount);
  915. if(!newSamplesArray)
  916. return UA_STATUSCODE_BADOUTOFMEMORY;
  917. dataSetWriter->lastSamples = newSamplesArray;
  918. memset(dataSetWriter->lastSamples, 0, sizeof(UA_DataSetWriterSample) * dataSetWriter->lastSamplesCount);
  919. dataSetWriter->connectedDataSetVersion = currentDataSet->dataSetMetaData.configurationVersion;
  920. UA_PubSubDataSetWriter_generateKeyFrameMessage(server, dataSetMessage, dataSetWriter);
  921. dataSetWriter->deltaFrameCounter = 0;
  922. return UA_STATUSCODE_GOOD;
  923. }
  924. /* The standard defines: if a PDS contains only one fields no delta messages
  925. * should be generated because they need more memory than a keyframe with 1
  926. * field. */
  927. if(currentDataSet->fieldSize > 1 && dataSetWriter->deltaFrameCounter > 0 &&
  928. dataSetWriter->deltaFrameCounter <= dataSetWriter->config.keyFrameCount) {
  929. UA_PubSubDataSetWriter_generateDeltaFrameMessage(server, dataSetMessage, dataSetWriter);
  930. dataSetWriter->deltaFrameCounter++;
  931. return UA_STATUSCODE_GOOD;
  932. }
  933. dataSetWriter->deltaFrameCounter = 1;
  934. #endif
  935. }
  936. UA_PubSubDataSetWriter_generateKeyFrameMessage(server, dataSetMessage, dataSetWriter);
  937. return UA_STATUSCODE_GOOD;
  938. }
  939. static UA_StatusCode
  940. sendNetworkMessageJson(UA_PubSubConnection *connection, UA_DataSetMessage *dsm,
  941. UA_UInt16 *writerIds, UA_Byte dsmCount, UA_ExtensionObject *transportSettings) {
  942. UA_StatusCode retval = UA_STATUSCODE_BADNOTSUPPORTED;
  943. #ifdef UA_ENABLE_JSON_ENCODING
  944. UA_NetworkMessage nm;
  945. memset(&nm, 0, sizeof(UA_NetworkMessage));
  946. nm.version = 1;
  947. nm.networkMessageType = UA_NETWORKMESSAGE_DATASET;
  948. nm.payloadHeaderEnabled = true;
  949. nm.payloadHeader.dataSetPayloadHeader.count = dsmCount;
  950. nm.payloadHeader.dataSetPayloadHeader.dataSetWriterIds = writerIds;
  951. nm.payload.dataSetPayload.dataSetMessages = dsm;
  952. /* Allocate the buffer. Allocate on the stack if the buffer is small. */
  953. UA_ByteString buf;
  954. size_t msgSize = UA_NetworkMessage_calcSizeJson(&nm, NULL, 0, NULL, 0, true);
  955. size_t stackSize = 1;
  956. if(msgSize <= UA_MAX_STACKBUF)
  957. stackSize = msgSize;
  958. UA_STACKARRAY(UA_Byte, stackBuf, stackSize);
  959. buf.data = stackBuf;
  960. buf.length = msgSize;
  961. if(msgSize > UA_MAX_STACKBUF) {
  962. retval = UA_ByteString_allocBuffer(&buf, msgSize);
  963. if(retval != UA_STATUSCODE_GOOD)
  964. return retval;
  965. }
  966. /* Encode the message */
  967. UA_Byte *bufPos = buf.data;
  968. memset(bufPos, 0, msgSize);
  969. const UA_Byte *bufEnd = &buf.data[buf.length];
  970. retval = UA_NetworkMessage_encodeJson(&nm, &bufPos, &bufEnd, NULL, 0, NULL, 0, true);
  971. if(retval != UA_STATUSCODE_GOOD) {
  972. if(msgSize > UA_MAX_STACKBUF)
  973. UA_ByteString_deleteMembers(&buf);
  974. return retval;
  975. }
  976. /* Send the prepared messages */
  977. retval = connection->channel->send(connection->channel, transportSettings, &buf);
  978. if(msgSize > UA_MAX_STACKBUF)
  979. UA_ByteString_deleteMembers(&buf);
  980. #endif
  981. return retval;
  982. }
  983. static UA_StatusCode
  984. sendNetworkMessage(UA_PubSubConnection *connection, UA_WriterGroup *wg,
  985. UA_DataSetMessage *dsm, UA_UInt16 *writerIds, UA_Byte dsmCount,
  986. UA_ExtensionObject *messageSettings,
  987. UA_ExtensionObject *transportSettings) {
  988. if(messageSettings->content.decoded.type !=
  989. &UA_TYPES[UA_TYPES_UADPWRITERGROUPMESSAGEDATATYPE])
  990. return UA_STATUSCODE_BADINTERNALERROR;
  991. UA_UadpWriterGroupMessageDataType *wgm = (UA_UadpWriterGroupMessageDataType*)
  992. messageSettings->content.decoded.data;
  993. UA_NetworkMessage nm;
  994. memset(&nm, 0, sizeof(UA_NetworkMessage));
  995. nm.publisherIdEnabled =
  996. !!(wgm->networkMessageContentMask & UA_UADPNETWORKMESSAGECONTENTMASK_PUBLISHERID);
  997. nm.groupHeaderEnabled =
  998. !!(wgm->networkMessageContentMask & UA_UADPNETWORKMESSAGECONTENTMASK_GROUPHEADER);
  999. nm.groupHeader.writerGroupIdEnabled =
  1000. !!(wgm->networkMessageContentMask & UA_UADPNETWORKMESSAGECONTENTMASK_WRITERGROUPID);
  1001. nm.groupHeader.groupVersionEnabled =
  1002. !!(wgm->networkMessageContentMask & UA_UADPNETWORKMESSAGECONTENTMASK_GROUPVERSION);
  1003. nm.groupHeader.networkMessageNumberEnabled =
  1004. !!(wgm->networkMessageContentMask & UA_UADPNETWORKMESSAGECONTENTMASK_NETWORKMESSAGENUMBER);
  1005. nm.groupHeader.sequenceNumberEnabled =
  1006. !!(wgm->networkMessageContentMask & UA_UADPNETWORKMESSAGECONTENTMASK_SEQUENCENUMBER);
  1007. nm.payloadHeaderEnabled =
  1008. !!(wgm->networkMessageContentMask & UA_UADPNETWORKMESSAGECONTENTMASK_PAYLOADHEADER);
  1009. nm.timestampEnabled =
  1010. !!(wgm->networkMessageContentMask & UA_UADPNETWORKMESSAGECONTENTMASK_TIMESTAMP);
  1011. nm.picosecondsEnabled =
  1012. !!(wgm->networkMessageContentMask & UA_UADPNETWORKMESSAGECONTENTMASK_PICOSECONDS);
  1013. nm.dataSetClassIdEnabled =
  1014. !!(wgm->networkMessageContentMask & UA_UADPNETWORKMESSAGECONTENTMASK_DATASETCLASSID);
  1015. nm.promotedFieldsEnabled =
  1016. !!(wgm->networkMessageContentMask & UA_UADPNETWORKMESSAGECONTENTMASK_PROMOTEDFIELDS);
  1017. nm.version = 1;
  1018. nm.networkMessageType = UA_NETWORKMESSAGE_DATASET;
  1019. if(connection->config->publisherIdType == UA_PUBSUB_PUBLISHERID_NUMERIC) {
  1020. nm.publisherIdType = UA_PUBLISHERDATATYPE_UINT16;
  1021. nm.publisherId.publisherIdUInt32 = connection->config->publisherId.numeric;
  1022. } else if (connection->config->publisherIdType == UA_PUBSUB_PUBLISHERID_STRING){
  1023. nm.publisherIdType = UA_PUBLISHERDATATYPE_STRING;
  1024. nm.publisherId.publisherIdString = connection->config->publisherId.string;
  1025. }
  1026. /* Compute the length of the dsm separately for the header */
  1027. UA_STACKARRAY(UA_UInt16, dsmLengths, dsmCount);
  1028. for(UA_Byte i = 0; i < dsmCount; i++)
  1029. dsmLengths[i] = (UA_UInt16)UA_DataSetMessage_calcSizeBinary(&dsm[i]);
  1030. nm.payloadHeader.dataSetPayloadHeader.count = dsmCount;
  1031. nm.payloadHeader.dataSetPayloadHeader.dataSetWriterIds = writerIds;
  1032. nm.groupHeader.writerGroupId = wg->config.writerGroupId;
  1033. nm.groupHeader.networkMessageNumber = 1;
  1034. nm.payload.dataSetPayload.sizes = dsmLengths;
  1035. nm.payload.dataSetPayload.dataSetMessages = dsm;
  1036. /* Allocate the buffer. Allocate on the stack if the buffer is small. */
  1037. UA_ByteString buf;
  1038. size_t msgSize = UA_NetworkMessage_calcSizeBinary(&nm);
  1039. size_t stackSize = 1;
  1040. if(msgSize <= UA_MAX_STACKBUF)
  1041. stackSize = msgSize;
  1042. UA_STACKARRAY(UA_Byte, stackBuf, stackSize);
  1043. buf.data = stackBuf;
  1044. buf.length = msgSize;
  1045. UA_StatusCode retval = UA_STATUSCODE_GOOD;
  1046. if(msgSize > UA_MAX_STACKBUF) {
  1047. retval = UA_ByteString_allocBuffer(&buf, msgSize);
  1048. if(retval != UA_STATUSCODE_GOOD)
  1049. return retval;
  1050. }
  1051. /* Encode the message */
  1052. UA_Byte *bufPos = buf.data;
  1053. memset(bufPos, 0, msgSize);
  1054. const UA_Byte *bufEnd = &buf.data[buf.length];
  1055. retval = UA_NetworkMessage_encodeBinary(&nm, &bufPos, bufEnd);
  1056. if(retval != UA_STATUSCODE_GOOD) {
  1057. if(msgSize > UA_MAX_STACKBUF)
  1058. UA_ByteString_deleteMembers(&buf);
  1059. return retval;
  1060. }
  1061. /* Send the prepared messages */
  1062. retval = connection->channel->send(connection->channel, transportSettings, &buf);
  1063. if(msgSize > UA_MAX_STACKBUF)
  1064. UA_ByteString_deleteMembers(&buf);
  1065. return retval;
  1066. }
  1067. /* This callback triggers the collection and publish of NetworkMessages and the
  1068. * contained DataSetMessages. */
  1069. void
  1070. UA_WriterGroup_publishCallback(UA_Server *server, UA_WriterGroup *writerGroup) {
  1071. UA_LOG_DEBUG(&server->config.logger, UA_LOGCATEGORY_SERVER, "Publish Callback");
  1072. if(!writerGroup) {
  1073. UA_LOG_WARNING(&server->config.logger, UA_LOGCATEGORY_SERVER,
  1074. "Publish failed. WriterGroup not found");
  1075. return;
  1076. }
  1077. /* Nothing to do? */
  1078. if(writerGroup->writersCount <= 0)
  1079. return;
  1080. /* Binary or Json encoding? */
  1081. if(writerGroup->config.encodingMimeType != UA_PUBSUB_ENCODING_UADP &&
  1082. writerGroup->config.encodingMimeType != UA_PUBSUB_ENCODING_JSON) {
  1083. UA_LOG_WARNING(&server->config.logger, UA_LOGCATEGORY_SERVER,
  1084. "Publish failed: Unknown encoding type.");
  1085. return;
  1086. }
  1087. /* Find the connection associated with the writer */
  1088. UA_PubSubConnection *connection =
  1089. UA_PubSubConnection_findConnectionbyId(server, writerGroup->linkedConnection);
  1090. if(!connection) {
  1091. UA_LOG_WARNING(&server->config.logger, UA_LOGCATEGORY_SERVER,
  1092. "Publish failed. PubSubConnection invalid.");
  1093. return;
  1094. }
  1095. /* How many DSM can be sent in one NM? */
  1096. UA_Byte maxDSM = (UA_Byte)writerGroup->config.maxEncapsulatedDataSetMessageCount;
  1097. if(writerGroup->config.maxEncapsulatedDataSetMessageCount > UA_BYTE_MAX)
  1098. maxDSM = UA_BYTE_MAX;
  1099. /* If the maxEncapsulatedDataSetMessageCount is set to 0->1 */
  1100. if(maxDSM == 0)
  1101. maxDSM = 1;
  1102. /* It is possible to put several DataSetMessages into one NetworkMessage.
  1103. * But only if they do not contain promoted fields. NM with only DSM are
  1104. * sent out right away. The others are kept in a buffer for "batching". */
  1105. size_t dsmCount = 0;
  1106. UA_DataSetWriter *dsw;
  1107. UA_STACKARRAY(UA_UInt16, dsWriterIds, writerGroup->writersCount);
  1108. UA_STACKARRAY(UA_DataSetMessage, dsmStore, writerGroup->writersCount);
  1109. LIST_FOREACH(dsw, &writerGroup->writers, listEntry) {
  1110. /* Find the dataset */
  1111. UA_PublishedDataSet *pds =
  1112. UA_PublishedDataSet_findPDSbyId(server, dsw->connectedDataSet);
  1113. if(!pds) {
  1114. UA_LOG_WARNING(&server->config.logger, UA_LOGCATEGORY_SERVER,
  1115. "PubSub Publish: PublishedDataSet not found");
  1116. continue;
  1117. }
  1118. /* Generate the DSM */
  1119. UA_StatusCode res =
  1120. UA_DataSetWriter_generateDataSetMessage(server, &dsmStore[dsmCount], dsw);
  1121. if(res != UA_STATUSCODE_GOOD) {
  1122. UA_LOG_WARNING(&server->config.logger, UA_LOGCATEGORY_SERVER,
  1123. "PubSub Publish: DataSetMessage creation failed");
  1124. continue;
  1125. }
  1126. /* Send right away if there is only this DSM in a NM. If promoted fields
  1127. * are contained in the PublishedDataSet, then this DSM must go into a
  1128. * dedicated NM as well. */
  1129. if(pds->promotedFieldsCount > 0 || maxDSM == 1) {
  1130. if(writerGroup->config.encodingMimeType == UA_PUBSUB_ENCODING_UADP){
  1131. res = sendNetworkMessage(connection, writerGroup, &dsmStore[dsmCount],
  1132. &dsw->config.dataSetWriterId, 1,
  1133. &writerGroup->config.messageSettings,
  1134. &writerGroup->config.transportSettings);
  1135. }else if(writerGroup->config.encodingMimeType == UA_PUBSUB_ENCODING_JSON){
  1136. res = sendNetworkMessageJson(connection, &dsmStore[dsmCount],
  1137. &dsw->config.dataSetWriterId, 1, &writerGroup->config.transportSettings);
  1138. }
  1139. if(res != UA_STATUSCODE_GOOD)
  1140. UA_LOG_WARNING(&server->config.logger, UA_LOGCATEGORY_SERVER,
  1141. "PubSub Publish: Could not send a NetworkMessage");
  1142. UA_DataSetMessage_free(&dsmStore[dsmCount]);
  1143. continue;
  1144. }
  1145. dsWriterIds[dsmCount] = dsw->config.dataSetWriterId;
  1146. dsmCount++;
  1147. }
  1148. /* Send the NetworkMessages with batched DataSetMessages */
  1149. size_t nmCount = (dsmCount / maxDSM) + ((dsmCount % maxDSM) == 0 ? 0 : 1);
  1150. for(UA_UInt32 i = 0; i < nmCount; i++) {
  1151. UA_Byte nmDsmCount = maxDSM;
  1152. if(i == nmCount - 1)
  1153. nmDsmCount = (UA_Byte)dsmCount % maxDSM;
  1154. UA_StatusCode res3 = UA_STATUSCODE_GOOD;
  1155. if(writerGroup->config.encodingMimeType == UA_PUBSUB_ENCODING_UADP){
  1156. res3 = sendNetworkMessage(connection, writerGroup, &dsmStore[i * maxDSM],
  1157. &dsWriterIds[i * maxDSM], nmDsmCount,
  1158. &writerGroup->config.messageSettings,
  1159. &writerGroup->config.transportSettings);
  1160. }else if(writerGroup->config.encodingMimeType == UA_PUBSUB_ENCODING_JSON){
  1161. res3 = sendNetworkMessageJson(connection, &dsmStore[i * maxDSM],
  1162. &dsWriterIds[i * maxDSM], nmDsmCount, &writerGroup->config.transportSettings);
  1163. }
  1164. if(res3 != UA_STATUSCODE_GOOD)
  1165. UA_LOG_WARNING(&server->config.logger, UA_LOGCATEGORY_SERVER,
  1166. "PubSub Publish: Sending a NetworkMessage failed");
  1167. }
  1168. /* Clean up DSM */
  1169. for(size_t i = 0; i < dsmCount; i++)
  1170. UA_DataSetMessage_free(&dsmStore[i]);
  1171. }
  1172. /* Add new publishCallback. The first execution is triggered directly after
  1173. * creation. */
  1174. UA_StatusCode
  1175. UA_WriterGroup_addPublishCallback(UA_Server *server, UA_WriterGroup *writerGroup) {
  1176. UA_StatusCode retval =
  1177. UA_PubSubManager_addRepeatedCallback(server,
  1178. (UA_ServerCallback) UA_WriterGroup_publishCallback,
  1179. writerGroup, writerGroup->config.publishingInterval,
  1180. &writerGroup->publishCallbackId);
  1181. if(retval == UA_STATUSCODE_GOOD)
  1182. writerGroup->publishCallbackIsRegistered = true;
  1183. /* Run once after creation */
  1184. UA_WriterGroup_publishCallback(server, writerGroup);
  1185. return retval;
  1186. }
  1187. #endif /* UA_ENABLE_PUBSUB */