ua_client_highlevel_subscriptions.c 23 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539
  1. /* This Source Code Form is subject to the terms of the Mozilla Public
  2. * License, v. 2.0. If a copy of the MPL was not distributed with this
  3. * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
  4. #include "ua_client_highlevel.h"
  5. #include "ua_client_internal.h"
  6. #include "ua_util.h"
  7. #ifdef UA_ENABLE_SUBSCRIPTIONS /* conditional compilation */
  8. UA_StatusCode
  9. UA_Client_Subscriptions_new(UA_Client *client, UA_SubscriptionSettings settings,
  10. UA_UInt32 *newSubscriptionId) {
  11. UA_CreateSubscriptionRequest request;
  12. UA_CreateSubscriptionRequest_init(&request);
  13. request.requestedPublishingInterval = settings.requestedPublishingInterval;
  14. request.requestedLifetimeCount = settings.requestedLifetimeCount;
  15. request.requestedMaxKeepAliveCount = settings.requestedMaxKeepAliveCount;
  16. request.maxNotificationsPerPublish = settings.maxNotificationsPerPublish;
  17. request.publishingEnabled = settings.publishingEnabled;
  18. request.priority = settings.priority;
  19. UA_CreateSubscriptionResponse response = UA_Client_Service_createSubscription(client, request);
  20. UA_StatusCode retval = response.responseHeader.serviceResult;
  21. if(retval != UA_STATUSCODE_GOOD) {
  22. UA_CreateSubscriptionResponse_deleteMembers(&response);
  23. return retval;
  24. }
  25. UA_Client_Subscription *newSub = (UA_Client_Subscription *)UA_malloc(sizeof(UA_Client_Subscription));
  26. if(!newSub) {
  27. UA_CreateSubscriptionResponse_deleteMembers(&response);
  28. return UA_STATUSCODE_BADOUTOFMEMORY;
  29. }
  30. LIST_INIT(&newSub->monitoredItems);
  31. newSub->lifeTime = response.revisedLifetimeCount;
  32. newSub->keepAliveCount = response.revisedMaxKeepAliveCount;
  33. newSub->publishingInterval = response.revisedPublishingInterval;
  34. newSub->subscriptionID = response.subscriptionId;
  35. newSub->notificationsPerPublish = request.maxNotificationsPerPublish;
  36. newSub->priority = request.priority;
  37. LIST_INSERT_HEAD(&client->subscriptions, newSub, listEntry);
  38. if(newSubscriptionId)
  39. *newSubscriptionId = newSub->subscriptionID;
  40. UA_CreateSubscriptionResponse_deleteMembers(&response);
  41. return UA_STATUSCODE_GOOD;
  42. }
  43. static UA_Client_Subscription *findSubscription(const UA_Client *client, UA_UInt32 subscriptionId)
  44. {
  45. UA_Client_Subscription *sub = NULL;
  46. LIST_FOREACH(sub, &client->subscriptions, listEntry) {
  47. if(sub->subscriptionID == subscriptionId)
  48. break;
  49. }
  50. return sub;
  51. }
  52. /* remove the subscription remotely */
  53. UA_StatusCode
  54. UA_Client_Subscriptions_remove(UA_Client *client, UA_UInt32 subscriptionId) {
  55. UA_Client_Subscription *sub = findSubscription(client, subscriptionId);
  56. if(!sub)
  57. return UA_STATUSCODE_BADSUBSCRIPTIONIDINVALID;
  58. UA_StatusCode retval = UA_STATUSCODE_GOOD;
  59. UA_Client_MonitoredItem *mon, *tmpmon;
  60. LIST_FOREACH_SAFE(mon, &sub->monitoredItems, listEntry, tmpmon) {
  61. retval =
  62. UA_Client_Subscriptions_removeMonitoredItem(client, sub->subscriptionID,
  63. mon->monitoredItemId);
  64. if(retval != UA_STATUSCODE_GOOD)
  65. return retval;
  66. }
  67. /* remove the subscription remotely */
  68. UA_DeleteSubscriptionsRequest request;
  69. UA_DeleteSubscriptionsRequest_init(&request);
  70. request.subscriptionIdsSize = 1;
  71. request.subscriptionIds = &sub->subscriptionID;
  72. UA_DeleteSubscriptionsResponse response = UA_Client_Service_deleteSubscriptions(client, request);
  73. retval = response.responseHeader.serviceResult;
  74. if(retval == UA_STATUSCODE_GOOD && response.resultsSize > 0)
  75. retval = response.results[0];
  76. UA_DeleteSubscriptionsResponse_deleteMembers(&response);
  77. if(retval != UA_STATUSCODE_GOOD && retval != UA_STATUSCODE_BADSUBSCRIPTIONIDINVALID) {
  78. UA_LOG_INFO(client->config.logger, UA_LOGCATEGORY_CLIENT,
  79. "Could not remove subscription %u with error code %s",
  80. sub->subscriptionID, UA_StatusCode_name(retval));
  81. return retval;
  82. }
  83. UA_Client_Subscriptions_forceDelete(client, sub);
  84. return UA_STATUSCODE_GOOD;
  85. }
  86. void
  87. UA_Client_Subscriptions_forceDelete(UA_Client *client,
  88. UA_Client_Subscription *sub) {
  89. UA_Client_MonitoredItem *mon, *mon_tmp;
  90. LIST_FOREACH_SAFE(mon, &sub->monitoredItems, listEntry, mon_tmp) {
  91. UA_NodeId_deleteMembers(&mon->monitoredNodeId);
  92. LIST_REMOVE(mon, listEntry);
  93. UA_free(mon);
  94. }
  95. LIST_REMOVE(sub, listEntry);
  96. UA_free(sub);
  97. }
  98. static UA_StatusCode
  99. addMonitoredItems(UA_Client *client, const UA_UInt32 subscriptionId,
  100. UA_MonitoredItemCreateRequest *items, size_t itemsSize,
  101. void **hfs, void **hfContexts, UA_StatusCode *itemResults,
  102. UA_UInt32 *newMonitoredItemIds, UA_Boolean isEventMonitoredItem) {
  103. UA_Client_Subscription *sub = findSubscription(client, subscriptionId);
  104. if(!sub)
  105. return UA_STATUSCODE_BADSUBSCRIPTIONIDINVALID;
  106. UA_CreateMonitoredItemsRequest request;
  107. UA_CreateMonitoredItemsRequest_init(&request);
  108. UA_CreateMonitoredItemsResponse response;
  109. UA_CreateMonitoredItemsResponse_init(&response);
  110. /* Create the handlers */
  111. UA_StatusCode retval = UA_STATUSCODE_GOOD;
  112. UA_Client_MonitoredItem **mis = (UA_Client_MonitoredItem**)
  113. UA_alloca(sizeof(void*) * itemsSize);
  114. memset(mis, 0, sizeof(void*) * itemsSize);
  115. for(size_t i = 0; i < itemsSize; i++) {
  116. mis[i] = (UA_Client_MonitoredItem*)UA_malloc(sizeof(UA_Client_MonitoredItem));
  117. if(!mis[i]) {
  118. retval = UA_STATUSCODE_BADOUTOFMEMORY;
  119. goto cleanup;
  120. }
  121. }
  122. /* Set the clientHandle */
  123. for(size_t i = 0; i < itemsSize; i++)
  124. items[i].requestedParameters.clientHandle = ++(client->monitoredItemHandles);
  125. /* Initialize the request */
  126. request.subscriptionId = subscriptionId;
  127. request.itemsToCreate = items;
  128. request.itemsToCreateSize = itemsSize;
  129. /* Send the request */
  130. response = UA_Client_Service_createMonitoredItems(client, request);
  131. /* Remove for _deleteMembers */
  132. request.itemsToCreate = NULL;
  133. request.itemsToCreateSize = 0;
  134. retval = response.responseHeader.serviceResult;
  135. if(retval != UA_STATUSCODE_GOOD)
  136. goto cleanup;
  137. if(response.resultsSize != itemsSize) {
  138. retval = UA_STATUSCODE_BADINTERNALERROR;
  139. goto cleanup;
  140. }
  141. for(size_t i = 0; i < itemsSize; i++) {
  142. UA_MonitoredItemCreateResult *result = &response.results[i];
  143. UA_Client_MonitoredItem *newMon = mis[i];
  144. itemResults[i] = result->statusCode;
  145. if(result->statusCode != UA_STATUSCODE_GOOD) {
  146. UA_free(newMon);
  147. continue;
  148. }
  149. /* Set the internal representation */
  150. newMon->monitoringMode = UA_MONITORINGMODE_REPORTING;
  151. UA_NodeId_copy(&items[i].itemToMonitor.nodeId, &newMon->monitoredNodeId);
  152. newMon->attributeID = items[i].itemToMonitor.attributeId;
  153. newMon->clientHandle = items[i].requestedParameters.clientHandle;
  154. newMon->samplingInterval = result->revisedSamplingInterval;
  155. newMon->queueSize = result->revisedQueueSize;
  156. newMon->discardOldest = items[i].requestedParameters.discardOldest;
  157. newMon->monitoredItemId = response.results[i].monitoredItemId;
  158. newMon->isEventMonitoredItem = isEventMonitoredItem;
  159. /* eventHandler is at the same position in the union */
  160. newMon->handler.dataChangeHandler = (UA_MonitoredItemHandlingFunction)(uintptr_t)hfs[i];
  161. newMon->handlerContext = hfContexts[i];
  162. LIST_INSERT_HEAD(&sub->monitoredItems, newMon, listEntry);
  163. newMonitoredItemIds[i] = newMon->monitoredItemId;
  164. UA_LOG_DEBUG(client->config.logger, UA_LOGCATEGORY_CLIENT,
  165. "Created a monitored item with client handle %u",
  166. client->monitoredItemHandles);
  167. }
  168. cleanup:
  169. if(retval != UA_STATUSCODE_GOOD) {
  170. for(size_t i = 0; i < itemsSize; i++)
  171. UA_free(mis[i]);
  172. }
  173. UA_CreateMonitoredItemsRequest_deleteMembers(&request);
  174. UA_CreateMonitoredItemsResponse_deleteMembers(&response);
  175. return retval;
  176. }
  177. UA_StatusCode
  178. UA_Client_Subscriptions_addMonitoredItems(UA_Client *client, const UA_UInt32 subscriptionId,
  179. UA_MonitoredItemCreateRequest *items, size_t itemsSize,
  180. UA_MonitoredItemHandlingFunction *hfs,
  181. void **hfContexts, UA_StatusCode *itemResults,
  182. UA_UInt32 *newMonitoredItemIds) {
  183. return addMonitoredItems(client, subscriptionId, items, itemsSize, (void**)hfs,
  184. hfContexts, itemResults, newMonitoredItemIds, false);
  185. }
  186. UA_StatusCode UA_EXPORT
  187. UA_Client_Subscriptions_addMonitoredItem(UA_Client *client, UA_UInt32 subscriptionId,
  188. UA_NodeId nodeId, UA_UInt32 attributeID,
  189. UA_MonitoredItemHandlingFunction hf, void *hfContext,
  190. UA_UInt32 *newMonitoredItemId, UA_Double samplingInterval) {
  191. UA_MonitoredItemCreateRequest item;
  192. UA_MonitoredItemCreateRequest_init(&item);
  193. item.itemToMonitor.nodeId = nodeId;
  194. item.itemToMonitor.attributeId = attributeID;
  195. item.monitoringMode = UA_MONITORINGMODE_REPORTING;
  196. item.requestedParameters.samplingInterval = samplingInterval;
  197. item.requestedParameters.discardOldest = true;
  198. item.requestedParameters.queueSize = 1;
  199. UA_StatusCode retval_item = UA_STATUSCODE_GOOD;
  200. UA_StatusCode retval = addMonitoredItems(client, subscriptionId, &item, 1,
  201. (void**)(uintptr_t)&hf, &hfContext,
  202. &retval_item, newMonitoredItemId, false);
  203. return retval | retval_item;
  204. }
  205. UA_StatusCode
  206. UA_Client_Subscriptions_addMonitoredEvents(UA_Client *client, const UA_UInt32 subscriptionId,
  207. UA_MonitoredItemCreateRequest *items, size_t itemsSize,
  208. UA_MonitoredEventHandlingFunction *hfs,
  209. void **hfContexts, UA_StatusCode *itemResults,
  210. UA_UInt32 *newMonitoredItemIds) {
  211. return addMonitoredItems(client, subscriptionId, items, itemsSize, (void**)hfs,
  212. hfContexts, itemResults, newMonitoredItemIds, true);
  213. }
  214. UA_StatusCode
  215. UA_Client_Subscriptions_addMonitoredEvent(UA_Client *client, UA_UInt32 subscriptionId,
  216. const UA_NodeId nodeId, UA_UInt32 attributeID,
  217. const UA_SimpleAttributeOperand *selectClauses,
  218. size_t selectClausesSize,
  219. const UA_ContentFilterElement *whereClauses,
  220. size_t whereClausesSize,
  221. const UA_MonitoredEventHandlingFunction hf,
  222. void *hfContext, UA_UInt32 *newMonitoredItemId) {
  223. UA_MonitoredItemCreateRequest item;
  224. UA_MonitoredItemCreateRequest_init(&item);
  225. item.itemToMonitor.nodeId = nodeId;
  226. item.itemToMonitor.attributeId = attributeID;
  227. item.monitoringMode = UA_MONITORINGMODE_REPORTING;
  228. item.requestedParameters.samplingInterval = 0;
  229. item.requestedParameters.discardOldest = false;
  230. UA_EventFilter *evFilter = UA_EventFilter_new();
  231. if(!evFilter)
  232. return UA_STATUSCODE_BADOUTOFMEMORY;
  233. UA_EventFilter_init(evFilter);
  234. evFilter->selectClausesSize = selectClausesSize;
  235. evFilter->selectClauses = (UA_SimpleAttributeOperand*)(uintptr_t)selectClauses;
  236. evFilter->whereClause.elementsSize = whereClausesSize;
  237. evFilter->whereClause.elements = (UA_ContentFilterElement*)(uintptr_t)whereClauses;
  238. item.requestedParameters.filter.encoding = UA_EXTENSIONOBJECT_DECODED_NODELETE;
  239. item.requestedParameters.filter.content.decoded.type = &UA_TYPES[UA_TYPES_EVENTFILTER];
  240. item.requestedParameters.filter.content.decoded.data = evFilter;
  241. UA_StatusCode retval_item = UA_STATUSCODE_GOOD;
  242. UA_StatusCode retval = addMonitoredItems(client, subscriptionId, &item, 1,
  243. (void**)(uintptr_t)&hf, &hfContext,
  244. &retval_item, newMonitoredItemId, true);
  245. UA_free(evFilter);
  246. return retval | retval_item;
  247. }
  248. UA_StatusCode
  249. UA_Client_Subscriptions_removeMonitoredItems(UA_Client *client, UA_UInt32 subscriptionId,
  250. UA_UInt32 *monitoredItemId, size_t itemsSize,
  251. UA_StatusCode *itemResults) {
  252. UA_Client_Subscription *sub = findSubscription(client, subscriptionId);
  253. if(!sub)
  254. return UA_STATUSCODE_BADSUBSCRIPTIONIDINVALID;
  255. /* remove the monitoreditem remotely */
  256. UA_DeleteMonitoredItemsRequest request;
  257. UA_DeleteMonitoredItemsRequest_init(&request);
  258. request.subscriptionId = sub->subscriptionID;
  259. request.monitoredItemIdsSize = itemsSize;
  260. request.monitoredItemIds = monitoredItemId;
  261. UA_DeleteMonitoredItemsResponse response = UA_Client_Service_deleteMonitoredItems(client, request);
  262. UA_StatusCode retval = response.responseHeader.serviceResult;
  263. if(retval != UA_STATUSCODE_GOOD)
  264. goto cleanup;
  265. if(response.resultsSize != itemsSize) {
  266. retval = UA_STATUSCODE_BADINTERNALERROR;
  267. goto cleanup;
  268. }
  269. for(size_t i = 0; i < itemsSize; i++) {
  270. itemResults[i] = response.results[i];
  271. if(response.results[i] != UA_STATUSCODE_GOOD &&
  272. response.results[i] != UA_STATUSCODE_BADMONITOREDITEMIDINVALID) {
  273. UA_LOG_INFO(client->config.logger, UA_LOGCATEGORY_CLIENT,
  274. "Could not remove monitoreditem %u with error code %s",
  275. monitoredItemId[i], UA_StatusCode_name(response.results[i]));
  276. continue;
  277. }
  278. UA_Client_MonitoredItem *mon;
  279. LIST_FOREACH(mon, &sub->monitoredItems, listEntry) {
  280. if(mon->monitoredItemId == monitoredItemId[i]){
  281. LIST_REMOVE(mon, listEntry);
  282. UA_NodeId_deleteMembers(&mon->monitoredNodeId);
  283. UA_free(mon);
  284. break;
  285. }
  286. }
  287. }
  288. cleanup:
  289. /* Remove for _deleteMembers */
  290. request.monitoredItemIdsSize = 0;
  291. request.monitoredItemIds = NULL;
  292. UA_DeleteMonitoredItemsRequest_deleteMembers(&request);
  293. UA_DeleteMonitoredItemsResponse_deleteMembers(&response);
  294. return retval;
  295. }
  296. UA_StatusCode
  297. UA_Client_Subscriptions_removeMonitoredItem(UA_Client *client, UA_UInt32 subscriptionId,
  298. UA_UInt32 monitoredItemId) {
  299. UA_StatusCode retval_item = UA_STATUSCODE_GOOD;
  300. UA_StatusCode retval = UA_Client_Subscriptions_removeMonitoredItems(client, subscriptionId,
  301. &monitoredItemId, 1,
  302. &retval_item);
  303. return retval | retval_item;
  304. }
  305. static void
  306. UA_Client_processPublishResponse(UA_Client *client, UA_PublishRequest *request,
  307. UA_PublishResponse *response) {
  308. if(response->responseHeader.serviceResult != UA_STATUSCODE_GOOD)
  309. return;
  310. UA_Client_Subscription *sub = findSubscription(client, response->subscriptionId);
  311. if(!sub)
  312. return;
  313. /* Check if the server has acknowledged any of the sent ACKs */
  314. for(size_t i = 0; i < response->resultsSize && i < request->subscriptionAcknowledgementsSize; ++i) {
  315. /* remove also acks that are unknown to the server */
  316. if(response->results[i] != UA_STATUSCODE_GOOD &&
  317. response->results[i] != UA_STATUSCODE_BADSEQUENCENUMBERUNKNOWN)
  318. continue;
  319. /* Remove the ack from the list */
  320. UA_SubscriptionAcknowledgement *orig_ack = &request->subscriptionAcknowledgements[i];
  321. UA_Client_NotificationsAckNumber *ack;
  322. LIST_FOREACH(ack, &client->pendingNotificationsAcks, listEntry) {
  323. if(ack->subAck.subscriptionId == orig_ack->subscriptionId &&
  324. ack->subAck.sequenceNumber == orig_ack->sequenceNumber) {
  325. LIST_REMOVE(ack, listEntry);
  326. UA_free(ack);
  327. UA_assert(ack != LIST_FIRST(&client->pendingNotificationsAcks));
  328. break;
  329. }
  330. }
  331. }
  332. /* Process the notification messages */
  333. UA_NotificationMessage *msg = &response->notificationMessage;
  334. for(size_t k = 0; k < msg->notificationDataSize; ++k) {
  335. if(msg->notificationData[k].encoding != UA_EXTENSIONOBJECT_DECODED)
  336. continue;
  337. /* Handle DataChangeNotification */
  338. if(msg->notificationData[k].content.decoded.type == &UA_TYPES[UA_TYPES_DATACHANGENOTIFICATION]) {
  339. UA_DataChangeNotification *dataChangeNotification =
  340. (UA_DataChangeNotification *)msg->notificationData[k].content.decoded.data;
  341. for(size_t j = 0; j < dataChangeNotification->monitoredItemsSize; ++j) {
  342. UA_MonitoredItemNotification *mitemNot = &dataChangeNotification->monitoredItems[j];
  343. /* Find the MonitoredItem */
  344. UA_Client_MonitoredItem *mon;
  345. LIST_FOREACH(mon, &sub->monitoredItems, listEntry) {
  346. if(mon->clientHandle == mitemNot->clientHandle)
  347. break;
  348. }
  349. if(!mon) {
  350. UA_LOG_DEBUG(client->config.logger, UA_LOGCATEGORY_CLIENT,
  351. "Could not process a notification with clienthandle %u on subscription %u",
  352. mitemNot->clientHandle, sub->subscriptionID);
  353. continue;
  354. }
  355. if(mon->isEventMonitoredItem) {
  356. UA_LOG_DEBUG(client->config.logger, UA_LOGCATEGORY_CLIENT,
  357. "MonitoredItem is configured for Events. But received a "
  358. "DataChangeNotification.");
  359. continue;
  360. }
  361. mon->handler.dataChangeHandler(mon->monitoredItemId, &mitemNot->value, mon->handlerContext);
  362. }
  363. continue;
  364. }
  365. /* Handle EventNotification */
  366. if(msg->notificationData[k].content.decoded.type == &UA_TYPES[UA_TYPES_EVENTNOTIFICATIONLIST]) {
  367. UA_EventNotificationList *eventNotificationList =
  368. (UA_EventNotificationList *)msg->notificationData[k].content.decoded.data;
  369. for (size_t j = 0; j < eventNotificationList->eventsSize; ++j) {
  370. UA_EventFieldList *eventFieldList = &eventNotificationList->events[j];
  371. /* Find the MonitoredItem */
  372. UA_Client_MonitoredItem *mon;
  373. LIST_FOREACH(mon, &sub->monitoredItems, listEntry) {
  374. if(mon->clientHandle == eventFieldList->clientHandle)
  375. break;
  376. }
  377. if(!mon) {
  378. UA_LOG_DEBUG(client->config.logger, UA_LOGCATEGORY_CLIENT,
  379. "Could not process a notification with clienthandle %u on subscription %u",
  380. eventFieldList->clientHandle, sub->subscriptionID);
  381. continue;
  382. }
  383. if(!mon->isEventMonitoredItem) {
  384. UA_LOG_DEBUG(client->config.logger, UA_LOGCATEGORY_CLIENT,
  385. "MonitoredItem is configured for DataChanges. But received a "
  386. "EventNotification.");
  387. continue;
  388. }
  389. mon->handler.eventHandler(mon->monitoredItemId, eventFieldList->eventFieldsSize,
  390. eventFieldList->eventFields, mon->handlerContext);
  391. }
  392. }
  393. }
  394. /* Add to the list of pending acks */
  395. UA_Client_NotificationsAckNumber *tmpAck =
  396. (UA_Client_NotificationsAckNumber*)UA_malloc(sizeof(UA_Client_NotificationsAckNumber));
  397. if(!tmpAck) {
  398. UA_LOG_WARNING(client->config.logger, UA_LOGCATEGORY_CLIENT,
  399. "Not enough memory to store the acknowledgement for a publish "
  400. "message on subscription %u", sub->subscriptionID);
  401. return;
  402. }
  403. tmpAck->subAck.sequenceNumber = msg->sequenceNumber;
  404. tmpAck->subAck.subscriptionId = sub->subscriptionID;
  405. LIST_INSERT_HEAD(&client->pendingNotificationsAcks, tmpAck, listEntry);
  406. }
  407. UA_StatusCode
  408. UA_Client_Subscriptions_manuallySendPublishRequest(UA_Client *client) {
  409. if(client->state < UA_CLIENTSTATE_SESSION)
  410. return UA_STATUSCODE_BADSERVERNOTCONNECTED;
  411. UA_StatusCode retval = UA_STATUSCODE_GOOD;
  412. UA_DateTime now = UA_DateTime_nowMonotonic();
  413. UA_DateTime maxDate = now + (UA_DateTime)(client->config.timeout * UA_DATETIME_MSEC);
  414. UA_Boolean moreNotifications = true;
  415. while(moreNotifications) {
  416. UA_PublishRequest request;
  417. UA_PublishRequest_init(&request);
  418. request.subscriptionAcknowledgementsSize = 0;
  419. UA_Client_NotificationsAckNumber *ack;
  420. LIST_FOREACH(ack, &client->pendingNotificationsAcks, listEntry)
  421. ++request.subscriptionAcknowledgementsSize;
  422. if(request.subscriptionAcknowledgementsSize > 0) {
  423. request.subscriptionAcknowledgements = (UA_SubscriptionAcknowledgement*)
  424. UA_malloc(sizeof(UA_SubscriptionAcknowledgement) * request.subscriptionAcknowledgementsSize);
  425. if(!request.subscriptionAcknowledgements)
  426. return UA_STATUSCODE_BADOUTOFMEMORY;
  427. }
  428. int i = 0;
  429. LIST_FOREACH(ack, &client->pendingNotificationsAcks, listEntry) {
  430. request.subscriptionAcknowledgements[i].sequenceNumber = ack->subAck.sequenceNumber;
  431. request.subscriptionAcknowledgements[i].subscriptionId = ack->subAck.subscriptionId;
  432. ++i;
  433. }
  434. UA_PublishResponse response = UA_Client_Service_publish(client, request);
  435. UA_Client_processPublishResponse(client, &request, &response);
  436. now = UA_DateTime_nowMonotonic();
  437. if (now > maxDate){
  438. moreNotifications = UA_FALSE;
  439. retval = UA_STATUSCODE_GOODNONCRITICALTIMEOUT;
  440. }else{
  441. moreNotifications = response.moreNotifications;
  442. }
  443. UA_PublishResponse_deleteMembers(&response);
  444. UA_PublishRequest_deleteMembers(&request);
  445. }
  446. if(client->state < UA_CLIENTSTATE_SESSION)
  447. return UA_STATUSCODE_BADSERVERNOTCONNECTED;
  448. return retval;
  449. }
  450. void
  451. UA_Client_Subscriptions_clean(UA_Client *client){
  452. UA_Client_NotificationsAckNumber *n, *tmp;
  453. LIST_FOREACH_SAFE(n, &client->pendingNotificationsAcks, listEntry, tmp) {
  454. LIST_REMOVE(n, listEntry);
  455. UA_free(n);
  456. }
  457. UA_Client_Subscription *sub, *tmps;
  458. LIST_FOREACH_SAFE(sub, &client->subscriptions, listEntry, tmps)
  459. UA_Client_Subscriptions_forceDelete(client, sub); /* force local removal */
  460. }
  461. #endif /* UA_ENABLE_SUBSCRIPTIONS */