ua_client_subscriptions.c 32 KB


  1. /* This Source Code Form is subject to the terms of the Mozilla Public
  2. * License, v. 2.0. If a copy of the MPL was not distributed with this
  3. * file, You can obtain one at http://mozilla.org/MPL/2.0/.
  4. *
  5. * Copyright 2015-2018 (c) Fraunhofer IOSB (Author: Julius Pfrommer)
  6. * Copyright 2015 (c) Oleksiy Vasylyev
  7. * Copyright 2016 (c) Sten Grüner
  8. * Copyright 2017-2018 (c) Thomas Stalder, Blue Time Concept SA
  9. * Copyright 2016-2017 (c) Florian Palm
  10. * Copyright 2017 (c) Frank Meerkötter
  11. * Copyright 2017 (c) Stefan Profanter, fortiss GmbH
  12. */
  13. #include <open62541/client_highlevel.h>
  14. #include <open62541/client_highlevel_async.h>
  15. #include "ua_client_internal.h"
  16. #ifdef UA_ENABLE_SUBSCRIPTIONS /* conditional compilation */
  17. /*****************/
  18. /* Subscriptions */
  19. /*****************/
  20. UA_CreateSubscriptionResponse UA_EXPORT
  21. UA_Client_Subscriptions_create(UA_Client *client,
  22. const UA_CreateSubscriptionRequest request,
  23. void *subscriptionContext,
  24. UA_Client_StatusChangeNotificationCallback statusChangeCallback,
  25. UA_Client_DeleteSubscriptionCallback deleteCallback) {
  26. UA_CreateSubscriptionResponse response;
  27. UA_CreateSubscriptionResponse_init(&response);
  28. /* Allocate the internal representation */
  29. UA_Client_Subscription *newSub = (UA_Client_Subscription*)
  30. UA_malloc(sizeof(UA_Client_Subscription));
  31. if(!newSub) {
  32. response.responseHeader.serviceResult = UA_STATUSCODE_BADOUTOFMEMORY;
  33. return response;
  34. }
  35. /* Send the request as a synchronous service call */
  36. __UA_Client_Service(client,
  37. &request, &UA_TYPES[UA_TYPES_CREATESUBSCRIPTIONREQUEST],
  38. &response, &UA_TYPES[UA_TYPES_CREATESUBSCRIPTIONRESPONSE]);
  39. if(response.responseHeader.serviceResult != UA_STATUSCODE_GOOD) {
  40. UA_free(newSub);
  41. return response;
  42. }
  43. /* Prepare the internal representation */
  44. newSub->context = subscriptionContext;
  45. newSub->subscriptionId = response.subscriptionId;
  46. newSub->sequenceNumber = 0;
  47. newSub->lastActivity = UA_DateTime_nowMonotonic();
  48. newSub->statusChangeCallback = statusChangeCallback;
  49. newSub->deleteCallback = deleteCallback;
  50. newSub->publishingInterval = response.revisedPublishingInterval;
  51. newSub->maxKeepAliveCount = response.revisedMaxKeepAliveCount;
  52. LIST_INIT(&newSub->monitoredItems);
  53. LIST_INSERT_HEAD(&client->subscriptions, newSub, listEntry);
  54. return response;
  55. }
  56. static UA_Client_Subscription *
  57. findSubscription(const UA_Client *client, UA_UInt32 subscriptionId) {
  58. UA_Client_Subscription *sub = NULL;
  59. LIST_FOREACH(sub, &client->subscriptions, listEntry) {
  60. if(sub->subscriptionId == subscriptionId)
  61. break;
  62. }
  63. return sub;
  64. }
  65. UA_ModifySubscriptionResponse UA_EXPORT
  66. UA_Client_Subscriptions_modify(UA_Client *client, const UA_ModifySubscriptionRequest request) {
  67. UA_ModifySubscriptionResponse response;
  68. UA_ModifySubscriptionResponse_init(&response);
  69. /* Find the internal representation */
  70. UA_Client_Subscription *sub = findSubscription(client, request.subscriptionId);
  71. if(!sub) {
  72. response.responseHeader.serviceResult = UA_STATUSCODE_BADSUBSCRIPTIONIDINVALID;
  73. return response;
  74. }
  75. /* Call the service */
  76. __UA_Client_Service(client,
  77. &request, &UA_TYPES[UA_TYPES_MODIFYSUBSCRIPTIONREQUEST],
  78. &response, &UA_TYPES[UA_TYPES_MODIFYSUBSCRIPTIONRESPONSE]);
  79. /* Adjust the internal representation */
  80. sub->publishingInterval = response.revisedPublishingInterval;
  81. sub->maxKeepAliveCount = response.revisedMaxKeepAliveCount;
  82. return response;
  83. }
  84. static void
  85. UA_Client_Subscription_deleteInternal(UA_Client *client, UA_Client_Subscription *sub) {
  86. /* Remove the MonitoredItems */
  87. UA_Client_MonitoredItem *mon, *mon_tmp;
  88. LIST_FOREACH_SAFE(mon, &sub->monitoredItems, listEntry, mon_tmp)
  89. UA_Client_MonitoredItem_remove(client, sub, mon);
  90. /* Call the delete callback */
  91. if(sub->deleteCallback)
  92. sub->deleteCallback(client, sub->subscriptionId, sub->context);
  93. /* Remove */
  94. LIST_REMOVE(sub, listEntry);
  95. UA_free(sub);
  96. }
  97. UA_DeleteSubscriptionsResponse UA_EXPORT
  98. UA_Client_Subscriptions_delete(UA_Client *client, const UA_DeleteSubscriptionsRequest request) {
  99. UA_STACKARRAY(UA_Client_Subscription*, subs, request.subscriptionIdsSize);
  100. memset(subs, 0, sizeof(void*) * request.subscriptionIdsSize);
  101. /* temporary remove the subscriptions from the list */
  102. for(size_t i = 0; i < request.subscriptionIdsSize; i++) {
  103. subs[i] = findSubscription(client, request.subscriptionIds[i]);
  104. if (subs[i])
  105. LIST_REMOVE(subs[i], listEntry);
  106. }
  107. /* Send the request */
  108. UA_DeleteSubscriptionsResponse response;
  109. __UA_Client_Service(client,
  110. &request, &UA_TYPES[UA_TYPES_DELETESUBSCRIPTIONSREQUEST],
  111. &response, &UA_TYPES[UA_TYPES_DELETESUBSCRIPTIONSRESPONSE]);
  112. if(response.responseHeader.serviceResult != UA_STATUSCODE_GOOD)
  113. goto cleanup;
  114. if(request.subscriptionIdsSize != response.resultsSize) {
  115. response.responseHeader.serviceResult = UA_STATUSCODE_BADINTERNALERROR;
  116. goto cleanup;
  117. }
  118. /* Loop over the removed subscriptions and remove internally */
  119. for(size_t i = 0; i < request.subscriptionIdsSize; i++) {
  120. if(response.results[i] != UA_STATUSCODE_GOOD &&
  121. response.results[i] != UA_STATUSCODE_BADSUBSCRIPTIONIDINVALID) {
  122. /* Something was wrong, reinsert the subscription in the list */
  123. if (subs[i])
  124. LIST_INSERT_HEAD(&client->subscriptions, subs[i], listEntry);
  125. continue;
  126. }
  127. if(!subs[i]) {
  128. UA_LOG_INFO(&client->config.logger, UA_LOGCATEGORY_CLIENT,
  129. "No internal representation of subscription %u",
  130. request.subscriptionIds[i]);
  131. continue;
  132. }
  133. LIST_INSERT_HEAD(&client->subscriptions, subs[i], listEntry);
  134. UA_Client_Subscription_deleteInternal(client, subs[i]);
  135. }
  136. return response;
  137. cleanup:
  138. for(size_t i = 0; i < request.subscriptionIdsSize; i++) {
  139. if (subs[i]) {
  140. LIST_INSERT_HEAD(&client->subscriptions, subs[i], listEntry);
  141. }
  142. }
  143. return response;
  144. }
  145. UA_StatusCode UA_EXPORT
  146. UA_Client_Subscriptions_deleteSingle(UA_Client *client, UA_UInt32 subscriptionId) {
  147. UA_DeleteSubscriptionsRequest request;
  148. UA_DeleteSubscriptionsRequest_init(&request);
  149. request.subscriptionIds = &subscriptionId;
  150. request.subscriptionIdsSize = 1;
  151. UA_DeleteSubscriptionsResponse response =
  152. UA_Client_Subscriptions_delete(client, request);
  153. UA_StatusCode retval = response.responseHeader.serviceResult;
  154. if(retval != UA_STATUSCODE_GOOD) {
  155. UA_DeleteSubscriptionsResponse_deleteMembers(&response);
  156. return retval;
  157. }
  158. if(response.resultsSize != 1) {
  159. UA_DeleteSubscriptionsResponse_deleteMembers(&response);
  160. return UA_STATUSCODE_BADINTERNALERROR;
  161. }
  162. retval = response.results[0];
  163. UA_DeleteSubscriptionsResponse_deleteMembers(&response);
  164. return retval;
  165. }
  166. /******************/
  167. /* MonitoredItems */
  168. /******************/
  169. void
  170. UA_Client_MonitoredItem_remove(UA_Client *client, UA_Client_Subscription *sub,
  171. UA_Client_MonitoredItem *mon) {
  172. // NOLINTNEXTLINE
  173. LIST_REMOVE(mon, listEntry);
  174. if(mon->deleteCallback)
  175. mon->deleteCallback(client, sub->subscriptionId, sub->context,
  176. mon->monitoredItemId, mon->context);
  177. UA_free(mon);
  178. }
  179. static void
  180. __UA_Client_MonitoredItems_create(UA_Client *client,
  181. const UA_CreateMonitoredItemsRequest *request,
  182. void **contexts, void **handlingCallbacks,
  183. UA_Client_DeleteMonitoredItemCallback *deleteCallbacks,
  184. UA_CreateMonitoredItemsResponse *response) {
  185. UA_CreateMonitoredItemsResponse_init(response);
  186. if (!request->itemsToCreateSize) {
  187. response->responseHeader.serviceResult = UA_STATUSCODE_BADINTERNALERROR;
  188. return;
  189. }
  190. /* Fix clang warning */
  191. size_t itemsToCreateSize = request->itemsToCreateSize;
  192. UA_Client_Subscription *sub = NULL;
  193. /* Allocate the memory for internal representations */
  194. UA_STACKARRAY(UA_Client_MonitoredItem*, mis, itemsToCreateSize);
  195. memset(mis, 0, sizeof(void*) * itemsToCreateSize);
  196. for(size_t i = 0; i < itemsToCreateSize; i++) {
  197. mis[i] = (UA_Client_MonitoredItem*)UA_malloc(sizeof(UA_Client_MonitoredItem));
  198. if(!mis[i]) {
  199. response->responseHeader.serviceResult = UA_STATUSCODE_BADOUTOFMEMORY;
  200. goto cleanup;
  201. }
  202. }
  203. /* Get the subscription */
  204. sub = findSubscription(client, request->subscriptionId);
  205. if(!sub) {
  206. response->responseHeader.serviceResult = UA_STATUSCODE_BADSUBSCRIPTIONIDINVALID;
  207. goto cleanup;
  208. }
  209. /* Set the clientHandle */
  210. for(size_t i = 0; i < itemsToCreateSize; i++)
  211. request->itemsToCreate[i].requestedParameters.clientHandle = ++(client->monitoredItemHandles);
  212. /* Call the service */
  213. __UA_Client_Service(client, request, &UA_TYPES[UA_TYPES_CREATEMONITOREDITEMSREQUEST],
  214. response, &UA_TYPES[UA_TYPES_CREATEMONITOREDITEMSRESPONSE]);
  215. if(response->responseHeader.serviceResult != UA_STATUSCODE_GOOD)
  216. goto cleanup;
  217. if(response->resultsSize != itemsToCreateSize) {
  218. response->responseHeader.serviceResult = UA_STATUSCODE_BADINTERNALERROR;
  219. goto cleanup;
  220. }
  221. /* Add internally */
  222. for(size_t i = 0; i < itemsToCreateSize; i++) {
  223. if(response->results[i].statusCode != UA_STATUSCODE_GOOD) {
  224. if (deleteCallbacks[i])
  225. deleteCallbacks[i](client, sub->subscriptionId, sub->context, 0, contexts[i]);
  226. UA_free(mis[i]);
  227. mis[i] = NULL;
  228. continue;
  229. }
  230. UA_Client_MonitoredItem *newMon = mis[i];
  231. newMon->clientHandle = request->itemsToCreate[i].requestedParameters.clientHandle;
  232. newMon->monitoredItemId = response->results[i].monitoredItemId;
  233. newMon->context = contexts[i];
  234. newMon->deleteCallback = deleteCallbacks[i];
  235. newMon->handler.dataChangeCallback =
  236. (UA_Client_DataChangeNotificationCallback)(uintptr_t)handlingCallbacks[i];
  237. newMon->isEventMonitoredItem =
  238. (request->itemsToCreate[i].itemToMonitor.attributeId == UA_ATTRIBUTEID_EVENTNOTIFIER);
  239. LIST_INSERT_HEAD(&sub->monitoredItems, newMon, listEntry);
  240. UA_LOG_DEBUG(&client->config.logger, UA_LOGCATEGORY_CLIENT,
  241. "Subscription %u | Added a MonitoredItem with handle %u",
  242. sub->subscriptionId, newMon->clientHandle);
  243. }
  244. return;
  245. cleanup:
  246. for(size_t i = 0; i < itemsToCreateSize; i++) {
  247. if (deleteCallbacks[i]) {
  248. if (sub)
  249. deleteCallbacks[i](client, sub->subscriptionId, sub->context, 0, contexts[i]);
  250. else
  251. deleteCallbacks[i](client, 0, NULL, 0, contexts[i]);
  252. }
  253. if(mis[i])
  254. UA_free(mis[i]);
  255. }
  256. }
  257. UA_CreateMonitoredItemsResponse UA_EXPORT
  258. UA_Client_MonitoredItems_createDataChanges(UA_Client *client,
  259. const UA_CreateMonitoredItemsRequest request, void **contexts,
  260. UA_Client_DataChangeNotificationCallback *callbacks,
  261. UA_Client_DeleteMonitoredItemCallback *deleteCallbacks) {
  262. UA_CreateMonitoredItemsResponse response;
  263. __UA_Client_MonitoredItems_create(client, &request, contexts,
  264. (void**)(uintptr_t)callbacks, deleteCallbacks, &response);
  265. return response;
  266. }
  267. UA_MonitoredItemCreateResult UA_EXPORT
  268. UA_Client_MonitoredItems_createDataChange(UA_Client *client, UA_UInt32 subscriptionId,
  269. UA_TimestampsToReturn timestampsToReturn, const UA_MonitoredItemCreateRequest item,
  270. void *context, UA_Client_DataChangeNotificationCallback callback,
  271. UA_Client_DeleteMonitoredItemCallback deleteCallback) {
  272. UA_CreateMonitoredItemsRequest request;
  273. UA_CreateMonitoredItemsRequest_init(&request);
  274. request.subscriptionId = subscriptionId;
  275. request.timestampsToReturn = timestampsToReturn;
  276. request.itemsToCreate = (UA_MonitoredItemCreateRequest*)(uintptr_t)&item;
  277. request.itemsToCreateSize = 1;
  278. UA_CreateMonitoredItemsResponse response =
  279. UA_Client_MonitoredItems_createDataChanges(client, request, &context,
  280. &callback, &deleteCallback);
  281. UA_MonitoredItemCreateResult result;
  282. UA_MonitoredItemCreateResult_init(&result);
  283. if(response.responseHeader.serviceResult != UA_STATUSCODE_GOOD)
  284. result.statusCode = response.responseHeader.serviceResult;
  285. if(result.statusCode == UA_STATUSCODE_GOOD &&
  286. response.resultsSize != 1)
  287. result.statusCode = UA_STATUSCODE_BADINTERNALERROR;
  288. if(result.statusCode == UA_STATUSCODE_GOOD)
  289. UA_MonitoredItemCreateResult_copy(&response.results[0] , &result);
  290. UA_CreateMonitoredItemsResponse_deleteMembers(&response);
  291. return result;
  292. }
  293. UA_CreateMonitoredItemsResponse UA_EXPORT
  294. UA_Client_MonitoredItems_createEvents(UA_Client *client,
  295. const UA_CreateMonitoredItemsRequest request, void **contexts,
  296. UA_Client_EventNotificationCallback *callback,
  297. UA_Client_DeleteMonitoredItemCallback *deleteCallback) {
  298. UA_CreateMonitoredItemsResponse response;
  299. __UA_Client_MonitoredItems_create(client, &request, contexts,
  300. (void**)(uintptr_t)callback, deleteCallback, &response);
  301. return response;
  302. }
  303. UA_MonitoredItemCreateResult UA_EXPORT
  304. UA_Client_MonitoredItems_createEvent(UA_Client *client, UA_UInt32 subscriptionId,
  305. UA_TimestampsToReturn timestampsToReturn, const UA_MonitoredItemCreateRequest item,
  306. void *context, UA_Client_EventNotificationCallback callback,
  307. UA_Client_DeleteMonitoredItemCallback deleteCallback) {
  308. UA_CreateMonitoredItemsRequest request;
  309. UA_CreateMonitoredItemsRequest_init(&request);
  310. request.subscriptionId = subscriptionId;
  311. request.timestampsToReturn = timestampsToReturn;
  312. request.itemsToCreate = (UA_MonitoredItemCreateRequest*)(uintptr_t)&item;
  313. request.itemsToCreateSize = 1;
  314. UA_CreateMonitoredItemsResponse response =
  315. UA_Client_MonitoredItems_createEvents(client, request, &context,
  316. &callback, &deleteCallback);
  317. UA_MonitoredItemCreateResult result;
  318. UA_MonitoredItemCreateResult_copy(response.results , &result);
  319. UA_CreateMonitoredItemsResponse_deleteMembers(&response);
  320. return result;
  321. }
  322. UA_DeleteMonitoredItemsResponse UA_EXPORT
  323. UA_Client_MonitoredItems_delete(UA_Client *client, const UA_DeleteMonitoredItemsRequest request) {
  324. /* Send the request */
  325. UA_DeleteMonitoredItemsResponse response;
  326. __UA_Client_Service(client, &request, &UA_TYPES[UA_TYPES_DELETEMONITOREDITEMSREQUEST],
  327. &response, &UA_TYPES[UA_TYPES_DELETEMONITOREDITEMSRESPONSE]);
  328. if(response.responseHeader.serviceResult != UA_STATUSCODE_GOOD)
  329. return response;
  330. UA_Client_Subscription *sub = findSubscription(client, request.subscriptionId);
  331. if(!sub) {
  332. UA_LOG_INFO(&client->config.logger, UA_LOGCATEGORY_CLIENT,
  333. "No internal representation of subscription %u",
  334. request.subscriptionId);
  335. return response;
  336. }
  337. /* Loop over deleted MonitoredItems */
  338. for(size_t i = 0; i < response.resultsSize; i++) {
  339. if(response.results[i] != UA_STATUSCODE_GOOD &&
  340. response.results[i] != UA_STATUSCODE_BADMONITOREDITEMIDINVALID) {
  341. continue;
  342. }
  343. #ifndef __clang_analyzer__
  344. /* Delete the internal representation */
  345. UA_Client_MonitoredItem *mon;
  346. LIST_FOREACH(mon, &sub->monitoredItems, listEntry) {
  347. // NOLINTNEXTLINE
  348. if (mon->monitoredItemId == request.monitoredItemIds[i]) {
  349. UA_Client_MonitoredItem_remove(client, sub, mon);
  350. break;
  351. }
  352. }
  353. #endif
  354. }
  355. return response;
  356. }
  357. UA_StatusCode UA_EXPORT
  358. UA_Client_MonitoredItems_deleteSingle(UA_Client *client, UA_UInt32 subscriptionId, UA_UInt32 monitoredItemId) {
  359. UA_DeleteMonitoredItemsRequest request;
  360. UA_DeleteMonitoredItemsRequest_init(&request);
  361. request.subscriptionId = subscriptionId;
  362. request.monitoredItemIds = &monitoredItemId;
  363. request.monitoredItemIdsSize = 1;
  364. UA_DeleteMonitoredItemsResponse response =
  365. UA_Client_MonitoredItems_delete(client, request);
  366. UA_StatusCode retval = response.responseHeader.serviceResult;
  367. if(retval != UA_STATUSCODE_GOOD) {
  368. UA_DeleteMonitoredItemsResponse_deleteMembers(&response);
  369. return retval;
  370. }
  371. if(response.resultsSize != 1) {
  372. UA_DeleteMonitoredItemsResponse_deleteMembers(&response);
  373. return UA_STATUSCODE_BADINTERNALERROR;
  374. }
  375. retval = response.results[0];
  376. UA_DeleteMonitoredItemsResponse_deleteMembers(&response);
  377. return retval;
  378. }
  379. /*************************************/
  380. /* Async Processing of Notifications */
  381. /*************************************/
  382. /* Assume the request is already initialized */
  383. UA_StatusCode
  384. UA_Client_preparePublishRequest(UA_Client *client, UA_PublishRequest *request) {
  385. /* Count acks */
  386. UA_Client_NotificationsAckNumber *ack;
  387. LIST_FOREACH(ack, &client->pendingNotificationsAcks, listEntry)
  388. ++request->subscriptionAcknowledgementsSize;
  389. /* Create the array. Returns a sentinel pointer if the length is zero. */
  390. request->subscriptionAcknowledgements = (UA_SubscriptionAcknowledgement*)
  391. UA_Array_new(request->subscriptionAcknowledgementsSize,
  392. &UA_TYPES[UA_TYPES_SUBSCRIPTIONACKNOWLEDGEMENT]);
  393. if(!request->subscriptionAcknowledgements) {
  394. request->subscriptionAcknowledgementsSize = 0;
  395. return UA_STATUSCODE_BADOUTOFMEMORY;
  396. }
  397. size_t i = 0;
  398. UA_Client_NotificationsAckNumber *ack_tmp;
  399. LIST_FOREACH_SAFE(ack, &client->pendingNotificationsAcks, listEntry, ack_tmp) {
  400. request->subscriptionAcknowledgements[i].sequenceNumber = ack->subAck.sequenceNumber;
  401. request->subscriptionAcknowledgements[i].subscriptionId = ack->subAck.subscriptionId;
  402. ++i;
  403. LIST_REMOVE(ack, listEntry);
  404. UA_free(ack);
  405. }
  406. return UA_STATUSCODE_GOOD;
  407. }
  408. /* According to OPC Unified Architecture, Part 4 5.13.1.1 i) */
  409. /* The value 0 is never used for the sequence number */
  410. static UA_UInt32
  411. UA_Client_Subscriptions_nextSequenceNumber(UA_UInt32 sequenceNumber) {
  412. UA_UInt32 nextSequenceNumber = sequenceNumber + 1;
  413. if(nextSequenceNumber == 0)
  414. nextSequenceNumber = 1;
  415. return nextSequenceNumber;
  416. }
  417. static void
  418. processDataChangeNotification(UA_Client *client, UA_Client_Subscription *sub,
  419. UA_DataChangeNotification *dataChangeNotification) {
  420. for(size_t j = 0; j < dataChangeNotification->monitoredItemsSize; ++j) {
  421. UA_MonitoredItemNotification *min = &dataChangeNotification->monitoredItems[j];
  422. /* Find the MonitoredItem */
  423. UA_Client_MonitoredItem *mon;
  424. LIST_FOREACH(mon, &sub->monitoredItems, listEntry) {
  425. if(mon->clientHandle == min->clientHandle)
  426. break;
  427. }
  428. if(!mon) {
  429. UA_LOG_DEBUG(&client->config.logger, UA_LOGCATEGORY_CLIENT,
  430. "Could not process a notification with clienthandle %u on subscription %u",
  431. min->clientHandle, sub->subscriptionId);
  432. continue;
  433. }
  434. if(mon->isEventMonitoredItem) {
  435. UA_LOG_DEBUG(&client->config.logger, UA_LOGCATEGORY_CLIENT,
  436. "MonitoredItem is configured for Events. But received a "
  437. "DataChangeNotification.");
  438. continue;
  439. }
  440. mon->handler.dataChangeCallback(client, sub->subscriptionId, sub->context,
  441. mon->monitoredItemId, mon->context,
  442. &min->value);
  443. }
  444. }
  445. static void
  446. processEventNotification(UA_Client *client, UA_Client_Subscription *sub,
  447. UA_EventNotificationList *eventNotificationList) {
  448. for(size_t j = 0; j < eventNotificationList->eventsSize; ++j) {
  449. UA_EventFieldList *eventFieldList = &eventNotificationList->events[j];
  450. /* Find the MonitoredItem */
  451. UA_Client_MonitoredItem *mon;
  452. LIST_FOREACH(mon, &sub->monitoredItems, listEntry) {
  453. if(mon->monitoredItemId == eventFieldList->clientHandle)
  454. break;
  455. }
  456. if(!mon) {
  457. UA_LOG_DEBUG(&client->config.logger, UA_LOGCATEGORY_CLIENT,
  458. "Could not process a notification with clienthandle %u on subscription %u",
  459. eventFieldList->clientHandle, sub->subscriptionId);
  460. continue;
  461. }
  462. if(!mon->isEventMonitoredItem) {
  463. UA_LOG_DEBUG(&client->config.logger, UA_LOGCATEGORY_CLIENT,
  464. "MonitoredItem is configured for DataChanges. But received a "
  465. "EventNotification.");
  466. continue;
  467. }
  468. mon->handler.eventCallback(client, sub->subscriptionId, sub->context,
  469. mon->monitoredItemId, mon->context,
  470. eventFieldList->eventFieldsSize,
  471. eventFieldList->eventFields);
  472. }
  473. }
  474. static void
  475. processNotificationMessage(UA_Client *client, UA_Client_Subscription *sub,
  476. UA_ExtensionObject *msg) {
  477. if(msg->encoding != UA_EXTENSIONOBJECT_DECODED)
  478. return;
  479. /* Handle DataChangeNotification */
  480. if(msg->content.decoded.type == &UA_TYPES[UA_TYPES_DATACHANGENOTIFICATION]) {
  481. UA_DataChangeNotification *dataChangeNotification =
  482. (UA_DataChangeNotification *)msg->content.decoded.data;
  483. processDataChangeNotification(client, sub, dataChangeNotification);
  484. return;
  485. }
  486. /* Handle EventNotification */
  487. if(msg->content.decoded.type == &UA_TYPES[UA_TYPES_EVENTNOTIFICATIONLIST]) {
  488. UA_EventNotificationList *eventNotificationList =
  489. (UA_EventNotificationList *)msg->content.decoded.data;
  490. processEventNotification(client, sub, eventNotificationList);
  491. return;
  492. }
  493. /* Handle StatusChangeNotification */
  494. if(msg->content.decoded.type == &UA_TYPES[UA_TYPES_STATUSCHANGENOTIFICATION]) {
  495. if(sub->statusChangeCallback) {
  496. sub->statusChangeCallback(client, sub->subscriptionId, sub->context,
  497. (UA_StatusChangeNotification*)msg->content.decoded.data);
  498. } else {
  499. UA_LOG_WARNING(&client->config.logger, UA_LOGCATEGORY_CLIENT,
  500. "Dropped a StatusChangeNotification since no callback is registered");
  501. }
  502. return;
  503. }
  504. UA_LOG_WARNING(&client->config.logger, UA_LOGCATEGORY_CLIENT,
  505. "Unknown notification message type");
  506. }
  507. void
  508. UA_Client_Subscriptions_processPublishResponse(UA_Client *client, UA_PublishRequest *request,
  509. UA_PublishResponse *response) {
  510. UA_NotificationMessage *msg = &response->notificationMessage;
  511. client->currentlyOutStandingPublishRequests--;
  512. if(response->responseHeader.serviceResult == UA_STATUSCODE_BADTOOMANYPUBLISHREQUESTS) {
  513. if(client->config.outStandingPublishRequests > 1) {
  514. client->config.outStandingPublishRequests--;
  515. UA_LOG_WARNING(&client->config.logger, UA_LOGCATEGORY_CLIENT,
  516. "Too many publishrequest, reduce outStandingPublishRequests to %d",
  517. client->config.outStandingPublishRequests);
  518. } else {
  519. UA_LOG_ERROR(&client->config.logger, UA_LOGCATEGORY_CLIENT,
  520. "Too many publishrequest when outStandingPublishRequests = 1");
  521. UA_Client_Subscriptions_deleteSingle(client, response->subscriptionId);
  522. }
  523. return;
  524. }
  525. if(response->responseHeader.serviceResult == UA_STATUSCODE_BADSHUTDOWN)
  526. return;
  527. if(!LIST_FIRST(&client->subscriptions)) {
  528. response->responseHeader.serviceResult = UA_STATUSCODE_BADNOSUBSCRIPTION;
  529. return;
  530. }
  531. if(response->responseHeader.serviceResult == UA_STATUSCODE_BADSESSIONCLOSED) {
  532. if(client->state >= UA_CLIENTSTATE_SESSION) {
  533. UA_LOG_WARNING(&client->config.logger, UA_LOGCATEGORY_CLIENT,
  534. "Received Publish Response with code %s",
  535. UA_StatusCode_name(response->responseHeader.serviceResult));
  536. UA_Client_Subscription* sub = findSubscription(client, response->subscriptionId);
  537. if (sub != NULL)
  538. UA_Client_Subscription_deleteInternal(client, sub);
  539. }
  540. return;
  541. }
  542. if(response->responseHeader.serviceResult == UA_STATUSCODE_BADSESSIONIDINVALID) {
  543. UA_Client_disconnect(client); /* TODO: This should be handled before the process callback */
  544. UA_LOG_WARNING(&client->config.logger, UA_LOGCATEGORY_CLIENT,
  545. "Received BadSessionIdInvalid");
  546. return;
  547. }
  548. if(response->responseHeader.serviceResult != UA_STATUSCODE_GOOD) {
  549. UA_LOG_WARNING(&client->config.logger, UA_LOGCATEGORY_CLIENT,
  550. "Received Publish Response with code %s",
  551. UA_StatusCode_name(response->responseHeader.serviceResult));
  552. return;
  553. }
  554. UA_Client_Subscription *sub = findSubscription(client, response->subscriptionId);
  555. if(!sub) {
  556. response->responseHeader.serviceResult = UA_STATUSCODE_BADINTERNALERROR;
  557. UA_LOG_WARNING(&client->config.logger, UA_LOGCATEGORY_CLIENT,
  558. "Received Publish Response for a non-existant subscription");
  559. return;
  560. }
  561. sub->lastActivity = UA_DateTime_nowMonotonic();
  562. /* Detect missing message - OPC Unified Architecture, Part 4 5.13.1.1 e) */
  563. if(UA_Client_Subscriptions_nextSequenceNumber(sub->sequenceNumber) != msg->sequenceNumber) {
  564. UA_LOG_WARNING(&client->config.logger, UA_LOGCATEGORY_CLIENT,
  565. "Invalid subscription sequence number: expected %u but got %u",
  566. UA_Client_Subscriptions_nextSequenceNumber(sub->sequenceNumber),
  567. msg->sequenceNumber);
  568. /* This is an error. But we do not abort the connection. Some server
  569. * SDKs misbehave from time to time and send out-of-order sequence
  570. * numbers. (Probably some multi-threading synchronization issue.) */
  571. /* UA_Client_disconnect(client);
  572. return; */
  573. }
  574. /* According to f), a keep-alive message contains no notifications and has the sequence number
  575. * of the next NotificationMessage that is to be sent => More than one consecutive keep-alive
  576. * message or a NotificationMessage following a keep-alive message will share the same sequence
  577. * number. */
  578. if (msg->notificationDataSize)
  579. sub->sequenceNumber = msg->sequenceNumber;
  580. /* Process the notification messages */
  581. for(size_t k = 0; k < msg->notificationDataSize; ++k)
  582. processNotificationMessage(client, sub, &msg->notificationData[k]);
  583. /* Add to the list of pending acks */
  584. for(size_t i = 0; i < response->availableSequenceNumbersSize; i++) {
  585. if(response->availableSequenceNumbers[i] != msg->sequenceNumber)
  586. continue;
  587. UA_Client_NotificationsAckNumber *tmpAck = (UA_Client_NotificationsAckNumber*)
  588. UA_malloc(sizeof(UA_Client_NotificationsAckNumber));
  589. if(!tmpAck) {
  590. UA_LOG_WARNING(&client->config.logger, UA_LOGCATEGORY_CLIENT,
  591. "Not enough memory to store the acknowledgement for a publish "
  592. "message on subscription %u", sub->subscriptionId);
  593. break;
  594. }
  595. tmpAck->subAck.sequenceNumber = msg->sequenceNumber;
  596. tmpAck->subAck.subscriptionId = sub->subscriptionId;
  597. LIST_INSERT_HEAD(&client->pendingNotificationsAcks, tmpAck, listEntry);
  598. break;
  599. }
  600. }
  601. static void
  602. processPublishResponseAsync(UA_Client *client, void *userdata, UA_UInt32 requestId,
  603. void *response) {
  604. UA_PublishRequest *req = (UA_PublishRequest*)userdata;
  605. UA_PublishResponse *res = (UA_PublishResponse*)response;
  606. /* Process the response */
  607. UA_Client_Subscriptions_processPublishResponse(client, req, res);
  608. /* Delete the cached request */
  609. UA_PublishRequest_delete(req);
  610. /* Fill up the outstanding publish requests */
  611. UA_Client_Subscriptions_backgroundPublish(client);
  612. }
  613. void
  614. UA_Client_Subscriptions_clean(UA_Client *client) {
  615. UA_Client_NotificationsAckNumber *n, *tmp;
  616. LIST_FOREACH_SAFE(n, &client->pendingNotificationsAcks, listEntry, tmp) {
  617. LIST_REMOVE(n, listEntry);
  618. UA_free(n);
  619. }
  620. UA_Client_Subscription *sub, *tmps;
  621. LIST_FOREACH_SAFE(sub, &client->subscriptions, listEntry, tmps)
  622. UA_Client_Subscription_deleteInternal(client, sub); /* force local removal */
  623. client->monitoredItemHandles = 0;
  624. }
  625. void
  626. UA_Client_Subscriptions_backgroundPublishInactivityCheck(UA_Client *client) {
  627. if(client->state < UA_CLIENTSTATE_SESSION)
  628. return;
  629. /* Is the lack of responses the client's fault? */
  630. if(client->currentlyOutStandingPublishRequests == 0)
  631. return;
  632. UA_Client_Subscription *sub;
  633. LIST_FOREACH(sub, &client->subscriptions, listEntry) {
  634. UA_DateTime maxSilence = (UA_DateTime)
  635. ((sub->publishingInterval * sub->maxKeepAliveCount) +
  636. client->config.timeout) * UA_DATETIME_MSEC;
  637. if(maxSilence + sub->lastActivity < UA_DateTime_nowMonotonic()) {
  638. /* Reset activity */
  639. sub->lastActivity = UA_DateTime_nowMonotonic();
  640. if(client->config.subscriptionInactivityCallback)
  641. client->config.subscriptionInactivityCallback(client, sub->subscriptionId,
  642. sub->context);
  643. UA_LOG_ERROR(&client->config.logger, UA_LOGCATEGORY_CLIENT,
  644. "Inactivity for Subscription %u.", sub->subscriptionId);
  645. }
  646. }
  647. }
  648. UA_StatusCode
  649. UA_Client_Subscriptions_backgroundPublish(UA_Client *client) {
  650. if(client->state < UA_CLIENTSTATE_SESSION)
  651. return UA_STATUSCODE_BADSERVERNOTCONNECTED;
  652. /* The session must have at least one subscription */
  653. if(!LIST_FIRST(&client->subscriptions))
  654. return UA_STATUSCODE_GOOD;
  655. while(client->currentlyOutStandingPublishRequests < client->config.outStandingPublishRequests) {
  656. UA_PublishRequest *request = UA_PublishRequest_new();
  657. if (!request)
  658. return UA_STATUSCODE_BADOUTOFMEMORY;
  659. request->requestHeader.timeoutHint=60000;
  660. UA_StatusCode retval = UA_Client_preparePublishRequest(client, request);
  661. if(retval != UA_STATUSCODE_GOOD) {
  662. UA_PublishRequest_delete(request);
  663. return retval;
  664. }
  665. UA_UInt32 requestId;
  666. client->currentlyOutStandingPublishRequests++;
  667. /* Disable the timeout, it is treat in UA_Client_Subscriptions_backgroundPublishInactivityCheck */
  668. retval = __UA_Client_AsyncServiceEx(client, request, &UA_TYPES[UA_TYPES_PUBLISHREQUEST],
  669. processPublishResponseAsync,
  670. &UA_TYPES[UA_TYPES_PUBLISHRESPONSE],
  671. (void*)request, &requestId, 0);
  672. if(retval != UA_STATUSCODE_GOOD) {
  673. UA_PublishRequest_delete(request);
  674. return retval;
  675. }
  676. }
  677. return UA_STATUSCODE_GOOD;
  678. }
  679. #endif /* UA_ENABLE_SUBSCRIPTIONS */