ua_client_highlevel_subscriptions.c 23 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549
  1. /* This Source Code Form is subject to the terms of the Mozilla Public
  2. * License, v. 2.0. If a copy of the MPL was not distributed with this
  3. * file, You can obtain one at http://mozilla.org/MPL/2.0/.
  4. *
  5. * Copyright 2015-2018 (c) Julius Pfrommer, Fraunhofer IOSB
  6. * Copyright 2015 (c) Oleksiy Vasylyev
  7. * Copyright 2016 (c) Sten Grüner
  8. * Copyright 2017-2018 (c) Thomas Stalder
  9. * Copyright 2016-2017 (c) Florian Palm
  10. * Copyright 2017 (c) Frank Meerkötter
  11. * Copyright 2017 (c) Stefan Profanter, fortiss GmbH
  12. */
  13. #include "ua_client_highlevel.h"
  14. #include "ua_client_internal.h"
  15. #include "ua_util.h"
  16. #ifdef UA_ENABLE_SUBSCRIPTIONS /* conditional compilation */
  17. UA_StatusCode
  18. UA_Client_Subscriptions_new(UA_Client *client, UA_SubscriptionSettings settings,
  19. UA_UInt32 *newSubscriptionId) {
  20. UA_CreateSubscriptionRequest request;
  21. UA_CreateSubscriptionRequest_init(&request);
  22. request.requestedPublishingInterval = settings.requestedPublishingInterval;
  23. request.requestedLifetimeCount = settings.requestedLifetimeCount;
  24. request.requestedMaxKeepAliveCount = settings.requestedMaxKeepAliveCount;
  25. request.maxNotificationsPerPublish = settings.maxNotificationsPerPublish;
  26. request.publishingEnabled = settings.publishingEnabled;
  27. request.priority = settings.priority;
  28. UA_CreateSubscriptionResponse response = UA_Client_Service_createSubscription(client, request);
  29. UA_StatusCode retval = response.responseHeader.serviceResult;
  30. if(retval != UA_STATUSCODE_GOOD) {
  31. UA_CreateSubscriptionResponse_deleteMembers(&response);
  32. return retval;
  33. }
  34. UA_Client_Subscription *newSub = (UA_Client_Subscription *)UA_malloc(sizeof(UA_Client_Subscription));
  35. if(!newSub) {
  36. UA_CreateSubscriptionResponse_deleteMembers(&response);
  37. return UA_STATUSCODE_BADOUTOFMEMORY;
  38. }
  39. LIST_INIT(&newSub->monitoredItems);
  40. newSub->lifeTime = response.revisedLifetimeCount;
  41. newSub->keepAliveCount = response.revisedMaxKeepAliveCount;
  42. newSub->publishingInterval = response.revisedPublishingInterval;
  43. newSub->subscriptionID = response.subscriptionId;
  44. newSub->notificationsPerPublish = request.maxNotificationsPerPublish;
  45. newSub->priority = request.priority;
  46. LIST_INSERT_HEAD(&client->subscriptions, newSub, listEntry);
  47. if(newSubscriptionId)
  48. *newSubscriptionId = newSub->subscriptionID;
  49. UA_CreateSubscriptionResponse_deleteMembers(&response);
  50. return UA_STATUSCODE_GOOD;
  51. }
  52. static UA_Client_Subscription *findSubscription(const UA_Client *client, UA_UInt32 subscriptionId)
  53. {
  54. UA_Client_Subscription *sub = NULL;
  55. LIST_FOREACH(sub, &client->subscriptions, listEntry) {
  56. if(sub->subscriptionID == subscriptionId)
  57. break;
  58. }
  59. return sub;
  60. }
  61. /* remove the subscription remotely */
  62. UA_StatusCode
  63. UA_Client_Subscriptions_remove(UA_Client *client, UA_UInt32 subscriptionId) {
  64. UA_Client_Subscription *sub = findSubscription(client, subscriptionId);
  65. if(!sub)
  66. return UA_STATUSCODE_BADSUBSCRIPTIONIDINVALID;
  67. UA_StatusCode retval = UA_STATUSCODE_GOOD;
  68. UA_Client_MonitoredItem *mon, *tmpmon;
  69. LIST_FOREACH_SAFE(mon, &sub->monitoredItems, listEntry, tmpmon) {
  70. retval =
  71. UA_Client_Subscriptions_removeMonitoredItem(client, sub->subscriptionID,
  72. mon->monitoredItemId);
  73. if(retval != UA_STATUSCODE_GOOD)
  74. return retval;
  75. }
  76. /* remove the subscription remotely */
  77. UA_DeleteSubscriptionsRequest request;
  78. UA_DeleteSubscriptionsRequest_init(&request);
  79. request.subscriptionIdsSize = 1;
  80. request.subscriptionIds = &sub->subscriptionID;
  81. UA_DeleteSubscriptionsResponse response = UA_Client_Service_deleteSubscriptions(client, request);
  82. retval = response.responseHeader.serviceResult;
  83. if(retval == UA_STATUSCODE_GOOD && response.resultsSize > 0)
  84. retval = response.results[0];
  85. UA_DeleteSubscriptionsResponse_deleteMembers(&response);
  86. if(retval != UA_STATUSCODE_GOOD && retval != UA_STATUSCODE_BADSUBSCRIPTIONIDINVALID) {
  87. UA_LOG_INFO(client->config.logger, UA_LOGCATEGORY_CLIENT,
  88. "Could not remove subscription %u with error code %s",
  89. sub->subscriptionID, UA_StatusCode_name(retval));
  90. return retval;
  91. }
  92. UA_Client_Subscriptions_forceDelete(client, sub);
  93. return UA_STATUSCODE_GOOD;
  94. }
  95. void
  96. UA_Client_Subscriptions_forceDelete(UA_Client *client,
  97. UA_Client_Subscription *sub) {
  98. UA_Client_MonitoredItem *mon, *mon_tmp;
  99. LIST_FOREACH_SAFE(mon, &sub->monitoredItems, listEntry, mon_tmp) {
  100. UA_NodeId_deleteMembers(&mon->monitoredNodeId);
  101. LIST_REMOVE(mon, listEntry);
  102. UA_free(mon);
  103. }
  104. LIST_REMOVE(sub, listEntry);
  105. UA_free(sub);
  106. }
  107. static UA_StatusCode
  108. addMonitoredItems(UA_Client *client, const UA_UInt32 subscriptionId,
  109. UA_MonitoredItemCreateRequest *items, size_t itemsSize,
  110. void **hfs, void **hfContexts, UA_StatusCode *itemResults,
  111. UA_UInt32 *newMonitoredItemIds, UA_Boolean isEventMonitoredItem) {
  112. UA_Client_Subscription *sub = findSubscription(client, subscriptionId);
  113. if(!sub)
  114. return UA_STATUSCODE_BADSUBSCRIPTIONIDINVALID;
  115. UA_CreateMonitoredItemsRequest request;
  116. UA_CreateMonitoredItemsRequest_init(&request);
  117. UA_CreateMonitoredItemsResponse response;
  118. UA_CreateMonitoredItemsResponse_init(&response);
  119. /* Create the handlers */
  120. UA_StatusCode retval = UA_STATUSCODE_GOOD;
  121. UA_Client_MonitoredItem **mis = (UA_Client_MonitoredItem**)
  122. UA_alloca(sizeof(void*) * itemsSize);
  123. memset(mis, 0, sizeof(void*) * itemsSize);
  124. for(size_t i = 0; i < itemsSize; i++) {
  125. mis[i] = (UA_Client_MonitoredItem*)UA_malloc(sizeof(UA_Client_MonitoredItem));
  126. if(!mis[i]) {
  127. retval = UA_STATUSCODE_BADOUTOFMEMORY;
  128. goto cleanup;
  129. }
  130. }
  131. /* Set the clientHandle */
  132. for(size_t i = 0; i < itemsSize; i++)
  133. items[i].requestedParameters.clientHandle = ++(client->monitoredItemHandles);
  134. /* Initialize the request */
  135. request.subscriptionId = subscriptionId;
  136. request.itemsToCreate = items;
  137. request.itemsToCreateSize = itemsSize;
  138. /* Send the request */
  139. response = UA_Client_Service_createMonitoredItems(client, request);
  140. /* Remove for _deleteMembers */
  141. request.itemsToCreate = NULL;
  142. request.itemsToCreateSize = 0;
  143. retval = response.responseHeader.serviceResult;
  144. if(retval != UA_STATUSCODE_GOOD)
  145. goto cleanup;
  146. if(response.resultsSize != itemsSize) {
  147. retval = UA_STATUSCODE_BADINTERNALERROR;
  148. goto cleanup;
  149. }
  150. for(size_t i = 0; i < itemsSize; i++) {
  151. UA_MonitoredItemCreateResult *result = &response.results[i];
  152. UA_Client_MonitoredItem *newMon = mis[i];
  153. itemResults[i] = result->statusCode;
  154. if(result->statusCode != UA_STATUSCODE_GOOD) {
  155. UA_free(newMon);
  156. continue;
  157. }
  158. /* Set the internal representation */
  159. newMon->monitoringMode = UA_MONITORINGMODE_REPORTING;
  160. UA_NodeId_copy(&items[i].itemToMonitor.nodeId, &newMon->monitoredNodeId);
  161. newMon->attributeID = items[i].itemToMonitor.attributeId;
  162. newMon->clientHandle = items[i].requestedParameters.clientHandle;
  163. newMon->samplingInterval = result->revisedSamplingInterval;
  164. newMon->queueSize = result->revisedQueueSize;
  165. newMon->discardOldest = items[i].requestedParameters.discardOldest;
  166. newMon->monitoredItemId = response.results[i].monitoredItemId;
  167. newMon->isEventMonitoredItem = isEventMonitoredItem;
  168. /* eventHandler is at the same position in the union */
  169. newMon->handler.dataChangeHandler = (UA_MonitoredItemHandlingFunction)(uintptr_t)hfs[i];
  170. newMon->handlerContext = hfContexts[i];
  171. LIST_INSERT_HEAD(&sub->monitoredItems, newMon, listEntry);
  172. newMonitoredItemIds[i] = newMon->monitoredItemId;
  173. UA_LOG_DEBUG(client->config.logger, UA_LOGCATEGORY_CLIENT,
  174. "Created a monitored item with client handle %u",
  175. newMon->clientHandle);
  176. }
  177. cleanup:
  178. if(retval != UA_STATUSCODE_GOOD) {
  179. for(size_t i = 0; i < itemsSize; i++)
  180. UA_free(mis[i]);
  181. }
  182. UA_CreateMonitoredItemsRequest_deleteMembers(&request);
  183. UA_CreateMonitoredItemsResponse_deleteMembers(&response);
  184. return retval;
  185. }
  186. UA_StatusCode
  187. UA_Client_Subscriptions_addMonitoredItems(UA_Client *client, const UA_UInt32 subscriptionId,
  188. UA_MonitoredItemCreateRequest *items, size_t itemsSize,
  189. UA_MonitoredItemHandlingFunction *hfs,
  190. void **hfContexts, UA_StatusCode *itemResults,
  191. UA_UInt32 *newMonitoredItemIds) {
  192. return addMonitoredItems(client, subscriptionId, items, itemsSize, (void**)hfs,
  193. hfContexts, itemResults, newMonitoredItemIds, false);
  194. }
  195. UA_StatusCode UA_EXPORT
  196. UA_Client_Subscriptions_addMonitoredItem(UA_Client *client, UA_UInt32 subscriptionId,
  197. UA_NodeId nodeId, UA_UInt32 attributeID,
  198. UA_MonitoredItemHandlingFunction hf, void *hfContext,
  199. UA_UInt32 *newMonitoredItemId, UA_Double samplingInterval) {
  200. UA_MonitoredItemCreateRequest item;
  201. UA_MonitoredItemCreateRequest_init(&item);
  202. item.itemToMonitor.nodeId = nodeId;
  203. item.itemToMonitor.attributeId = attributeID;
  204. item.monitoringMode = UA_MONITORINGMODE_REPORTING;
  205. item.requestedParameters.samplingInterval = samplingInterval;
  206. item.requestedParameters.discardOldest = true;
  207. item.requestedParameters.queueSize = 1;
  208. UA_StatusCode retval_item = UA_STATUSCODE_GOOD;
  209. UA_StatusCode retval = addMonitoredItems(client, subscriptionId, &item, 1,
  210. (void**)(uintptr_t)&hf, &hfContext,
  211. &retval_item, newMonitoredItemId, false);
  212. return retval | retval_item;
  213. }
  214. UA_StatusCode
  215. UA_Client_Subscriptions_addMonitoredEvents(UA_Client *client, const UA_UInt32 subscriptionId,
  216. UA_MonitoredItemCreateRequest *items, size_t itemsSize,
  217. UA_MonitoredEventHandlingFunction *hfs,
  218. void **hfContexts, UA_StatusCode *itemResults,
  219. UA_UInt32 *newMonitoredItemIds) {
  220. return addMonitoredItems(client, subscriptionId, items, itemsSize, (void**)hfs,
  221. hfContexts, itemResults, newMonitoredItemIds, true);
  222. }
  223. UA_StatusCode
  224. UA_Client_Subscriptions_addMonitoredEvent(UA_Client *client, UA_UInt32 subscriptionId,
  225. const UA_NodeId nodeId, UA_UInt32 attributeID,
  226. const UA_SimpleAttributeOperand *selectClauses,
  227. size_t selectClausesSize,
  228. const UA_ContentFilterElement *whereClauses,
  229. size_t whereClausesSize,
  230. const UA_MonitoredEventHandlingFunction hf,
  231. void *hfContext, UA_UInt32 *newMonitoredItemId) {
  232. UA_MonitoredItemCreateRequest item;
  233. UA_MonitoredItemCreateRequest_init(&item);
  234. item.itemToMonitor.nodeId = nodeId;
  235. item.itemToMonitor.attributeId = attributeID;
  236. item.monitoringMode = UA_MONITORINGMODE_REPORTING;
  237. item.requestedParameters.samplingInterval = 0;
  238. item.requestedParameters.discardOldest = false;
  239. UA_EventFilter *evFilter = UA_EventFilter_new();
  240. if(!evFilter)
  241. return UA_STATUSCODE_BADOUTOFMEMORY;
  242. UA_EventFilter_init(evFilter);
  243. evFilter->selectClausesSize = selectClausesSize;
  244. evFilter->selectClauses = (UA_SimpleAttributeOperand*)(uintptr_t)selectClauses;
  245. evFilter->whereClause.elementsSize = whereClausesSize;
  246. evFilter->whereClause.elements = (UA_ContentFilterElement*)(uintptr_t)whereClauses;
  247. item.requestedParameters.filter.encoding = UA_EXTENSIONOBJECT_DECODED_NODELETE;
  248. item.requestedParameters.filter.content.decoded.type = &UA_TYPES[UA_TYPES_EVENTFILTER];
  249. item.requestedParameters.filter.content.decoded.data = evFilter;
  250. UA_StatusCode retval_item = UA_STATUSCODE_GOOD;
  251. UA_StatusCode retval = addMonitoredItems(client, subscriptionId, &item, 1,
  252. (void**)(uintptr_t)&hf, &hfContext,
  253. &retval_item, newMonitoredItemId, true);
  254. UA_free(evFilter);
  255. return retval | retval_item;
  256. }
  257. UA_StatusCode
  258. UA_Client_Subscriptions_removeMonitoredItems(UA_Client *client, UA_UInt32 subscriptionId,
  259. UA_UInt32 *monitoredItemId, size_t itemsSize,
  260. UA_StatusCode *itemResults) {
  261. UA_Client_Subscription *sub = findSubscription(client, subscriptionId);
  262. if(!sub)
  263. return UA_STATUSCODE_BADSUBSCRIPTIONIDINVALID;
  264. /* remove the monitoreditem remotely */
  265. UA_DeleteMonitoredItemsRequest request;
  266. UA_DeleteMonitoredItemsRequest_init(&request);
  267. request.subscriptionId = sub->subscriptionID;
  268. request.monitoredItemIdsSize = itemsSize;
  269. request.monitoredItemIds = monitoredItemId;
  270. UA_DeleteMonitoredItemsResponse response = UA_Client_Service_deleteMonitoredItems(client, request);
  271. UA_StatusCode retval = response.responseHeader.serviceResult;
  272. if(retval != UA_STATUSCODE_GOOD)
  273. goto cleanup;
  274. if(response.resultsSize != itemsSize) {
  275. retval = UA_STATUSCODE_BADINTERNALERROR;
  276. goto cleanup;
  277. }
  278. for(size_t i = 0; i < itemsSize; i++) {
  279. itemResults[i] = response.results[i];
  280. if(response.results[i] != UA_STATUSCODE_GOOD &&
  281. response.results[i] != UA_STATUSCODE_BADMONITOREDITEMIDINVALID) {
  282. UA_LOG_INFO(client->config.logger, UA_LOGCATEGORY_CLIENT,
  283. "Could not remove monitoreditem %u with error code %s",
  284. monitoredItemId[i], UA_StatusCode_name(response.results[i]));
  285. continue;
  286. }
  287. UA_Client_MonitoredItem *mon;
  288. LIST_FOREACH(mon, &sub->monitoredItems, listEntry) {
  289. if(mon->monitoredItemId == monitoredItemId[i]) {
  290. LIST_REMOVE(mon, listEntry);
  291. UA_NodeId_deleteMembers(&mon->monitoredNodeId);
  292. UA_free(mon);
  293. break;
  294. }
  295. }
  296. }
  297. cleanup:
  298. /* Remove for _deleteMembers */
  299. request.monitoredItemIdsSize = 0;
  300. request.monitoredItemIds = NULL;
  301. UA_DeleteMonitoredItemsRequest_deleteMembers(&request);
  302. UA_DeleteMonitoredItemsResponse_deleteMembers(&response);
  303. return retval;
  304. }
  305. UA_StatusCode
  306. UA_Client_Subscriptions_removeMonitoredItem(UA_Client *client, UA_UInt32 subscriptionId,
  307. UA_UInt32 monitoredItemId) {
  308. UA_StatusCode retval_item = UA_STATUSCODE_GOOD;
  309. UA_StatusCode retval = UA_Client_Subscriptions_removeMonitoredItems(client, subscriptionId,
  310. &monitoredItemId, 1,
  311. &retval_item);
  312. return retval | retval_item;
  313. }
  314. static void
  315. UA_Client_processPublishResponse(UA_Client *client, UA_PublishRequest *request,
  316. UA_PublishResponse *response) {
  317. if(response->responseHeader.serviceResult != UA_STATUSCODE_GOOD)
  318. return;
  319. UA_Client_Subscription *sub = findSubscription(client, response->subscriptionId);
  320. if(!sub)
  321. return;
  322. /* Check if the server has acknowledged any of the sent ACKs */
  323. for(size_t i = 0; i < response->resultsSize && i < request->subscriptionAcknowledgementsSize; ++i) {
  324. /* remove also acks that are unknown to the server */
  325. if(response->results[i] != UA_STATUSCODE_GOOD &&
  326. response->results[i] != UA_STATUSCODE_BADSEQUENCENUMBERUNKNOWN)
  327. continue;
  328. /* Remove the ack from the list */
  329. UA_SubscriptionAcknowledgement *orig_ack = &request->subscriptionAcknowledgements[i];
  330. UA_Client_NotificationsAckNumber *ack;
  331. LIST_FOREACH(ack, &client->pendingNotificationsAcks, listEntry) {
  332. if(ack->subAck.subscriptionId == orig_ack->subscriptionId &&
  333. ack->subAck.sequenceNumber == orig_ack->sequenceNumber) {
  334. LIST_REMOVE(ack, listEntry);
  335. UA_free(ack);
  336. UA_assert(ack != LIST_FIRST(&client->pendingNotificationsAcks));
  337. break;
  338. }
  339. }
  340. }
  341. /* Process the notification messages */
  342. UA_NotificationMessage *msg = &response->notificationMessage;
  343. for(size_t k = 0; k < msg->notificationDataSize; ++k) {
  344. if(msg->notificationData[k].encoding != UA_EXTENSIONOBJECT_DECODED)
  345. continue;
  346. /* Handle DataChangeNotification */
  347. if(msg->notificationData[k].content.decoded.type == &UA_TYPES[UA_TYPES_DATACHANGENOTIFICATION]) {
  348. UA_DataChangeNotification *dataChangeNotification =
  349. (UA_DataChangeNotification *)msg->notificationData[k].content.decoded.data;
  350. for(size_t j = 0; j < dataChangeNotification->monitoredItemsSize; ++j) {
  351. UA_MonitoredItemNotification *mitemNot = &dataChangeNotification->monitoredItems[j];
  352. /* Find the MonitoredItem */
  353. UA_Client_MonitoredItem *mon;
  354. LIST_FOREACH(mon, &sub->monitoredItems, listEntry) {
  355. if(mon->clientHandle == mitemNot->clientHandle)
  356. break;
  357. }
  358. if(!mon) {
  359. UA_LOG_DEBUG(client->config.logger, UA_LOGCATEGORY_CLIENT,
  360. "Could not process a notification with clienthandle %u on subscription %u",
  361. mitemNot->clientHandle, sub->subscriptionID);
  362. continue;
  363. }
  364. if(mon->isEventMonitoredItem) {
  365. UA_LOG_DEBUG(client->config.logger, UA_LOGCATEGORY_CLIENT,
  366. "MonitoredItem is configured for Events. But received a "
  367. "DataChangeNotification.");
  368. continue;
  369. }
  370. mon->handler.dataChangeHandler(client, mon->monitoredItemId,
  371. &mitemNot->value, mon->handlerContext);
  372. }
  373. continue;
  374. }
  375. /* Handle EventNotification */
  376. if(msg->notificationData[k].content.decoded.type == &UA_TYPES[UA_TYPES_EVENTNOTIFICATIONLIST]) {
  377. UA_EventNotificationList *eventNotificationList =
  378. (UA_EventNotificationList *)msg->notificationData[k].content.decoded.data;
  379. for(size_t j = 0; j < eventNotificationList->eventsSize; ++j) {
  380. UA_EventFieldList *eventFieldList = &eventNotificationList->events[j];
  381. /* Find the MonitoredItem */
  382. UA_Client_MonitoredItem *mon;
  383. LIST_FOREACH(mon, &sub->monitoredItems, listEntry) {
  384. if(mon->clientHandle == eventFieldList->clientHandle)
  385. break;
  386. }
  387. if(!mon) {
  388. UA_LOG_DEBUG(client->config.logger, UA_LOGCATEGORY_CLIENT,
  389. "Could not process a notification with clienthandle %u on subscription %u",
  390. eventFieldList->clientHandle, sub->subscriptionID);
  391. continue;
  392. }
  393. if(!mon->isEventMonitoredItem) {
  394. UA_LOG_DEBUG(client->config.logger, UA_LOGCATEGORY_CLIENT,
  395. "MonitoredItem is configured for DataChanges. But received a "
  396. "EventNotification.");
  397. continue;
  398. }
  399. mon->handler.eventHandler(client, mon->monitoredItemId, eventFieldList->eventFieldsSize,
  400. eventFieldList->eventFields, mon->handlerContext);
  401. }
  402. }
  403. }
  404. /* Add to the list of pending acks */
  405. UA_Client_NotificationsAckNumber *tmpAck =
  406. (UA_Client_NotificationsAckNumber*)UA_malloc(sizeof(UA_Client_NotificationsAckNumber));
  407. if(!tmpAck) {
  408. UA_LOG_WARNING(client->config.logger, UA_LOGCATEGORY_CLIENT,
  409. "Not enough memory to store the acknowledgement for a publish "
  410. "message on subscription %u", sub->subscriptionID);
  411. return;
  412. }
  413. tmpAck->subAck.sequenceNumber = msg->sequenceNumber;
  414. tmpAck->subAck.subscriptionId = sub->subscriptionID;
  415. LIST_INSERT_HEAD(&client->pendingNotificationsAcks, tmpAck, listEntry);
  416. }
  417. UA_StatusCode
  418. UA_Client_Subscriptions_manuallySendPublishRequest(UA_Client *client) {
  419. if(client->state < UA_CLIENTSTATE_SESSION)
  420. return UA_STATUSCODE_BADSERVERNOTCONNECTED;
  421. UA_StatusCode retval = UA_STATUSCODE_GOOD;
  422. UA_DateTime now = UA_DateTime_nowMonotonic();
  423. UA_DateTime maxDate = now + (UA_DateTime)(client->config.timeout * UA_DATETIME_MSEC);
  424. UA_Boolean moreNotifications = true;
  425. while(moreNotifications) {
  426. UA_PublishRequest request;
  427. UA_PublishRequest_init(&request);
  428. request.subscriptionAcknowledgementsSize = 0;
  429. UA_Client_NotificationsAckNumber *ack;
  430. LIST_FOREACH(ack, &client->pendingNotificationsAcks, listEntry)
  431. ++request.subscriptionAcknowledgementsSize;
  432. if(request.subscriptionAcknowledgementsSize > 0) {
  433. request.subscriptionAcknowledgements = (UA_SubscriptionAcknowledgement*)
  434. UA_malloc(sizeof(UA_SubscriptionAcknowledgement) * request.subscriptionAcknowledgementsSize);
  435. if(!request.subscriptionAcknowledgements)
  436. return UA_STATUSCODE_BADOUTOFMEMORY;
  437. }
  438. int i = 0;
  439. LIST_FOREACH(ack, &client->pendingNotificationsAcks, listEntry) {
  440. request.subscriptionAcknowledgements[i].sequenceNumber = ack->subAck.sequenceNumber;
  441. request.subscriptionAcknowledgements[i].subscriptionId = ack->subAck.subscriptionId;
  442. ++i;
  443. }
  444. UA_PublishResponse response = UA_Client_Service_publish(client, request);
  445. UA_Client_processPublishResponse(client, &request, &response);
  446. now = UA_DateTime_nowMonotonic();
  447. if(now > maxDate) {
  448. moreNotifications = UA_FALSE;
  449. retval = UA_STATUSCODE_GOODNONCRITICALTIMEOUT;
  450. } else {
  451. moreNotifications = response.moreNotifications;
  452. }
  453. UA_PublishResponse_deleteMembers(&response);
  454. UA_PublishRequest_deleteMembers(&request);
  455. }
  456. if(client->state < UA_CLIENTSTATE_SESSION)
  457. return UA_STATUSCODE_BADSERVERNOTCONNECTED;
  458. return retval;
  459. }
  460. void
  461. UA_Client_Subscriptions_clean(UA_Client *client) {
  462. UA_Client_NotificationsAckNumber *n, *tmp;
  463. LIST_FOREACH_SAFE(n, &client->pendingNotificationsAcks, listEntry, tmp) {
  464. LIST_REMOVE(n, listEntry);
  465. UA_free(n);
  466. }
  467. UA_Client_Subscription *sub, *tmps;
  468. LIST_FOREACH_SAFE(sub, &client->subscriptions, listEntry, tmps)
  469. UA_Client_Subscriptions_forceDelete(client, sub); /* force local removal */
  470. }
  471. #endif /* UA_ENABLE_SUBSCRIPTIONS */