ua_client_subscriptions.c 31 KB


  1. /* This Source Code Form is subject to the terms of the Mozilla Public
  2. * License, v. 2.0. If a copy of the MPL was not distributed with this
  3. * file, You can obtain one at http://mozilla.org/MPL/2.0/.
  4. *
  5. * Copyright 2015-2018 (c) Fraunhofer IOSB (Author: Julius Pfrommer)
  6. * Copyright 2015 (c) Oleksiy Vasylyev
  7. * Copyright 2016 (c) Sten Grüner
  8. * Copyright 2017-2018 (c) Thomas Stalder, Blue Time Concept SA
  9. * Copyright 2016-2017 (c) Florian Palm
  10. * Copyright 2017 (c) Frank Meerkötter
  11. * Copyright 2017 (c) Stefan Profanter, fortiss GmbH
  12. */
  13. #include "ua_client_highlevel.h"
  14. #include "ua_client_internal.h"
  15. #include "ua_util.h"
  16. #ifdef UA_ENABLE_SUBSCRIPTIONS /* conditional compilation */
  17. /*****************/
  18. /* Subscriptions */
  19. /*****************/
  20. UA_CreateSubscriptionResponse UA_EXPORT
  21. UA_Client_Subscriptions_create(UA_Client *client,
  22. const UA_CreateSubscriptionRequest request,
  23. void *subscriptionContext,
  24. UA_Client_StatusChangeNotificationCallback statusChangeCallback,
  25. UA_Client_DeleteSubscriptionCallback deleteCallback) {
  26. UA_CreateSubscriptionResponse response;
  27. UA_CreateSubscriptionResponse_init(&response);
  28. /* Allocate the internal representation */
  29. UA_Client_Subscription *newSub = (UA_Client_Subscription*)
  30. UA_malloc(sizeof(UA_Client_Subscription));
  31. if(!newSub) {
  32. response.responseHeader.serviceResult = UA_STATUSCODE_BADOUTOFMEMORY;
  33. return response;
  34. }
  35. /* Send the request as a synchronous service call */
  36. __UA_Client_Service(client,
  37. &request, &UA_TYPES[UA_TYPES_CREATESUBSCRIPTIONREQUEST],
  38. &response, &UA_TYPES[UA_TYPES_CREATESUBSCRIPTIONRESPONSE]);
  39. if(response.responseHeader.serviceResult != UA_STATUSCODE_GOOD) {
  40. UA_free(newSub);
  41. return response;
  42. }
  43. /* Prepare the internal representation */
  44. newSub->context = subscriptionContext;
  45. newSub->subscriptionId = response.subscriptionId;
  46. newSub->sequenceNumber = 0;
  47. newSub->lastActivity = UA_DateTime_nowMonotonic();
  48. newSub->statusChangeCallback = statusChangeCallback;
  49. newSub->deleteCallback = deleteCallback;
  50. newSub->publishingInterval = response.revisedPublishingInterval;
  51. newSub->maxKeepAliveCount = response.revisedMaxKeepAliveCount;
  52. LIST_INIT(&newSub->monitoredItems);
  53. LIST_INSERT_HEAD(&client->subscriptions, newSub, listEntry);
  54. return response;
  55. }
  56. static UA_Client_Subscription *
  57. findSubscription(const UA_Client *client, UA_UInt32 subscriptionId) {
  58. UA_Client_Subscription *sub = NULL;
  59. LIST_FOREACH(sub, &client->subscriptions, listEntry) {
  60. if(sub->subscriptionId == subscriptionId)
  61. break;
  62. }
  63. return sub;
  64. }
  65. UA_ModifySubscriptionResponse UA_EXPORT
  66. UA_Client_Subscriptions_modify(UA_Client *client, const UA_ModifySubscriptionRequest request) {
  67. UA_ModifySubscriptionResponse response;
  68. UA_ModifySubscriptionResponse_init(&response);
  69. /* Find the internal representation */
  70. UA_Client_Subscription *sub = findSubscription(client, request.subscriptionId);
  71. if(!sub) {
  72. response.responseHeader.serviceResult = UA_STATUSCODE_BADSUBSCRIPTIONIDINVALID;
  73. return response;
  74. }
  75. /* Call the service */
  76. __UA_Client_Service(client,
  77. &request, &UA_TYPES[UA_TYPES_MODIFYSUBSCRIPTIONREQUEST],
  78. &response, &UA_TYPES[UA_TYPES_MODIFYSUBSCRIPTIONRESPONSE]);
  79. /* Adjust the internal representation */
  80. sub->publishingInterval = response.revisedPublishingInterval;
  81. sub->maxKeepAliveCount = response.revisedMaxKeepAliveCount;
  82. return response;
  83. }
  84. static void
  85. UA_Client_Subscription_deleteInternal(UA_Client *client, UA_Client_Subscription *sub) {
  86. /* Remove the MonitoredItems */
  87. UA_Client_MonitoredItem *mon, *mon_tmp;
  88. LIST_FOREACH_SAFE(mon, &sub->monitoredItems, listEntry, mon_tmp)
  89. UA_Client_MonitoredItem_remove(client, sub, mon);
  90. /* Call the delete callback */
  91. if(sub->deleteCallback)
  92. sub->deleteCallback(client, sub->subscriptionId, sub->context);
  93. /* Remove */
  94. LIST_REMOVE(sub, listEntry);
  95. UA_free(sub);
  96. }
  97. UA_DeleteSubscriptionsResponse UA_EXPORT
  98. UA_Client_Subscriptions_delete(UA_Client *client, const UA_DeleteSubscriptionsRequest request) {
  99. UA_STACKARRAY(UA_Client_Subscription*, subs, request.subscriptionIdsSize);
  100. memset(subs, 0, sizeof(void*) * request.subscriptionIdsSize);
  101. /* temporary remove the subscriptions from the list */
  102. for(size_t i = 0; i < request.subscriptionIdsSize; i++) {
  103. subs[i] = findSubscription(client, request.subscriptionIds[i]);
  104. if (subs[i])
  105. LIST_REMOVE(subs[i], listEntry);
  106. }
  107. /* Send the request */
  108. UA_DeleteSubscriptionsResponse response;
  109. __UA_Client_Service(client,
  110. &request, &UA_TYPES[UA_TYPES_DELETESUBSCRIPTIONSREQUEST],
  111. &response, &UA_TYPES[UA_TYPES_DELETESUBSCRIPTIONSRESPONSE]);
  112. if(response.responseHeader.serviceResult != UA_STATUSCODE_GOOD)
  113. goto cleanup;
  114. if(request.subscriptionIdsSize != response.resultsSize) {
  115. response.responseHeader.serviceResult = UA_STATUSCODE_BADINTERNALERROR;
  116. goto cleanup;
  117. }
  118. /* Loop over the removed subscriptions and remove internally */
  119. for(size_t i = 0; i < request.subscriptionIdsSize; i++) {
  120. if(response.results[i] != UA_STATUSCODE_GOOD && response.results[i] != UA_STATUSCODE_BADSUBSCRIPTIONIDINVALID) {
  121. /* Something was wrong, reinsert the subscription in the list */
  122. if (subs[i])
  123. LIST_INSERT_HEAD(&client->subscriptions, subs[i], listEntry);
  124. continue;
  125. }
  126. if(!subs[i]) {
  127. UA_LOG_INFO(client->config.logger, UA_LOGCATEGORY_CLIENT,
  128. "No internal representation of subscription %u",
  129. request.subscriptionIds[i]);
  130. continue;
  131. } else {
  132. LIST_INSERT_HEAD(&client->subscriptions, subs[i], listEntry);
  133. }
  134. UA_Client_Subscription_deleteInternal(client, subs[i]);
  135. }
  136. return response;
  137. cleanup:
  138. for(size_t i = 0; i < request.subscriptionIdsSize; i++) {
  139. if (subs[i]) {
  140. LIST_INSERT_HEAD(&client->subscriptions, subs[i], listEntry);
  141. }
  142. }
  143. return response;
  144. }
  145. UA_StatusCode UA_EXPORT
  146. UA_Client_Subscriptions_deleteSingle(UA_Client *client, UA_UInt32 subscriptionId) {
  147. UA_DeleteSubscriptionsRequest request;
  148. UA_DeleteSubscriptionsRequest_init(&request);
  149. request.subscriptionIds = &subscriptionId;
  150. request.subscriptionIdsSize = 1;
  151. UA_DeleteSubscriptionsResponse response =
  152. UA_Client_Subscriptions_delete(client, request);
  153. UA_StatusCode retval = response.responseHeader.serviceResult;
  154. if(retval != UA_STATUSCODE_GOOD) {
  155. UA_DeleteSubscriptionsResponse_deleteMembers(&response);
  156. return retval;
  157. }
  158. if(response.resultsSize != 1) {
  159. UA_DeleteSubscriptionsResponse_deleteMembers(&response);
  160. return UA_STATUSCODE_BADINTERNALERROR;
  161. }
  162. retval = response.results[0];
  163. UA_DeleteSubscriptionsResponse_deleteMembers(&response);
  164. return retval;
  165. }
  166. /******************/
  167. /* MonitoredItems */
  168. /******************/
  169. void
  170. UA_Client_MonitoredItem_remove(UA_Client *client, UA_Client_Subscription *sub,
  171. UA_Client_MonitoredItem *mon) {
  172. LIST_REMOVE(mon, listEntry);
  173. if(mon->deleteCallback)
  174. mon->deleteCallback(client, sub->subscriptionId, sub->context,
  175. mon->monitoredItemId, mon->context);
  176. UA_free(mon);
  177. }
  178. static void
  179. __UA_Client_MonitoredItems_create(UA_Client *client,
  180. const UA_CreateMonitoredItemsRequest *request,
  181. void **contexts, void **handlingCallbacks,
  182. UA_Client_DeleteMonitoredItemCallback *deleteCallbacks,
  183. UA_CreateMonitoredItemsResponse *response) {
  184. UA_CreateMonitoredItemsResponse_init(response);
  185. if (!request->itemsToCreateSize) {
  186. response->responseHeader.serviceResult = UA_STATUSCODE_BADINTERNALERROR;
  187. return;
  188. }
  189. /* Fix clang warning */
  190. size_t itemsToCreateSize = request->itemsToCreateSize;
  191. UA_Client_Subscription *sub = NULL;
  192. /* Allocate the memory for internal representations */
  193. UA_STACKARRAY(UA_Client_MonitoredItem*, mis, itemsToCreateSize);
  194. memset(mis, 0, sizeof(void*) * itemsToCreateSize);
  195. for(size_t i = 0; i < itemsToCreateSize; i++) {
  196. mis[i] = (UA_Client_MonitoredItem*)UA_malloc(sizeof(UA_Client_MonitoredItem));
  197. if(!mis[i]) {
  198. response->responseHeader.serviceResult = UA_STATUSCODE_BADOUTOFMEMORY;
  199. goto cleanup;
  200. }
  201. }
  202. /* Get the subscription */
  203. sub = findSubscription(client, request->subscriptionId);
  204. if(!sub) {
  205. response->responseHeader.serviceResult = UA_STATUSCODE_BADSUBSCRIPTIONIDINVALID;
  206. goto cleanup;
  207. }
  208. /* Set the clientHandle */
  209. for(size_t i = 0; i < itemsToCreateSize; i++)
  210. request->itemsToCreate[i].requestedParameters.clientHandle = ++(client->monitoredItemHandles);
  211. /* Call the service */
  212. __UA_Client_Service(client, request, &UA_TYPES[UA_TYPES_CREATEMONITOREDITEMSREQUEST],
  213. response, &UA_TYPES[UA_TYPES_CREATEMONITOREDITEMSRESPONSE]);
  214. if(response->responseHeader.serviceResult != UA_STATUSCODE_GOOD)
  215. goto cleanup;
  216. if(response->resultsSize != itemsToCreateSize) {
  217. response->responseHeader.serviceResult = UA_STATUSCODE_BADINTERNALERROR;
  218. goto cleanup;
  219. }
  220. /* Add internally */
  221. for(size_t i = 0; i < itemsToCreateSize; i++) {
  222. if(response->results[i].statusCode != UA_STATUSCODE_GOOD) {
  223. if (deleteCallbacks[i])
  224. deleteCallbacks[i](client, sub->subscriptionId, sub->context, 0, contexts[i]);
  225. UA_free(mis[i]);
  226. mis[i] = NULL;
  227. continue;
  228. }
  229. UA_Client_MonitoredItem *newMon = mis[i];
  230. newMon->clientHandle = request->itemsToCreate[i].requestedParameters.clientHandle;
  231. newMon->monitoredItemId = response->results[i].monitoredItemId;
  232. newMon->context = contexts[i];
  233. newMon->deleteCallback = deleteCallbacks[i];
  234. newMon->handler.dataChangeCallback =
  235. (UA_Client_DataChangeNotificationCallback)(uintptr_t)handlingCallbacks[i];
  236. newMon->isEventMonitoredItem =
  237. (request->itemsToCreate[i].itemToMonitor.attributeId == UA_ATTRIBUTEID_EVENTNOTIFIER);
  238. LIST_INSERT_HEAD(&sub->monitoredItems, newMon, listEntry);
  239. UA_LOG_DEBUG(client->config.logger, UA_LOGCATEGORY_CLIENT,
  240. "Subscription %u | Added a MonitoredItem with handle %u",
  241. sub->subscriptionId, newMon->clientHandle);
  242. }
  243. return;
  244. cleanup:
  245. for(size_t i = 0; i < itemsToCreateSize; i++) {
  246. if (deleteCallbacks[i]) {
  247. if (sub)
  248. deleteCallbacks[i](client, sub->subscriptionId, sub->context, 0, contexts[i]);
  249. else
  250. deleteCallbacks[i](client, 0, NULL, 0, contexts[i]);
  251. }
  252. if(mis[i])
  253. UA_free(mis[i]);
  254. }
  255. }
  256. UA_CreateMonitoredItemsResponse UA_EXPORT
  257. UA_Client_MonitoredItems_createDataChanges(UA_Client *client,
  258. const UA_CreateMonitoredItemsRequest request, void **contexts,
  259. UA_Client_DataChangeNotificationCallback *callbacks,
  260. UA_Client_DeleteMonitoredItemCallback *deleteCallbacks) {
  261. UA_CreateMonitoredItemsResponse response;
  262. __UA_Client_MonitoredItems_create(client, &request, contexts,
  263. (void**)(uintptr_t)callbacks, deleteCallbacks, &response);
  264. return response;
  265. }
  266. UA_MonitoredItemCreateResult UA_EXPORT
  267. UA_Client_MonitoredItems_createDataChange(UA_Client *client, UA_UInt32 subscriptionId,
  268. UA_TimestampsToReturn timestampsToReturn, const UA_MonitoredItemCreateRequest item,
  269. void *context, UA_Client_DataChangeNotificationCallback callback,
  270. UA_Client_DeleteMonitoredItemCallback deleteCallback) {
  271. UA_CreateMonitoredItemsRequest request;
  272. UA_CreateMonitoredItemsRequest_init(&request);
  273. request.subscriptionId = subscriptionId;
  274. request.timestampsToReturn = timestampsToReturn;
  275. request.itemsToCreate = (UA_MonitoredItemCreateRequest*)(uintptr_t)&item;
  276. request.itemsToCreateSize = 1;
  277. UA_CreateMonitoredItemsResponse response =
  278. UA_Client_MonitoredItems_createDataChanges(client, request, &context,
  279. &callback, &deleteCallback);
  280. UA_MonitoredItemCreateResult result;
  281. UA_MonitoredItemCreateResult_init(&result);
  282. if(response.responseHeader.serviceResult != UA_STATUSCODE_GOOD)
  283. result.statusCode = response.responseHeader.serviceResult;
  284. if(result.statusCode == UA_STATUSCODE_GOOD &&
  285. response.resultsSize != 1)
  286. result.statusCode = UA_STATUSCODE_BADINTERNALERROR;
  287. if(result.statusCode == UA_STATUSCODE_GOOD)
  288. UA_MonitoredItemCreateResult_copy(&response.results[0] , &result);
  289. UA_CreateMonitoredItemsResponse_deleteMembers(&response);
  290. return result;
  291. }
  292. UA_CreateMonitoredItemsResponse UA_EXPORT
  293. UA_Client_MonitoredItems_createEvents(UA_Client *client,
  294. const UA_CreateMonitoredItemsRequest request, void **contexts,
  295. UA_Client_EventNotificationCallback *callbacks,
  296. UA_Client_DeleteMonitoredItemCallback *deleteCallbacks) {
  297. UA_CreateMonitoredItemsResponse response;
  298. __UA_Client_MonitoredItems_create(client, &request, contexts,
  299. (void**)(uintptr_t)callbacks, deleteCallbacks, &response);
  300. return response;
  301. }
  302. UA_MonitoredItemCreateResult UA_EXPORT
  303. UA_Client_MonitoredItems_createEvent(UA_Client *client, UA_UInt32 subscriptionId,
  304. UA_TimestampsToReturn timestampsToReturn, const UA_MonitoredItemCreateRequest item,
  305. void *context, UA_Client_EventNotificationCallback callback,
  306. UA_Client_DeleteMonitoredItemCallback deleteCallback) {
  307. UA_CreateMonitoredItemsRequest request;
  308. UA_CreateMonitoredItemsRequest_init(&request);
  309. request.subscriptionId = subscriptionId;
  310. request.timestampsToReturn = timestampsToReturn;
  311. request.itemsToCreate = (UA_MonitoredItemCreateRequest*)(uintptr_t)&item;
  312. request.itemsToCreateSize = 1;
  313. UA_CreateMonitoredItemsResponse response =
  314. UA_Client_MonitoredItems_createEvents(client, request, &context,
  315. &callback, &deleteCallback);
  316. UA_MonitoredItemCreateResult result;
  317. UA_MonitoredItemCreateResult_copy(response.results , &result);
  318. UA_CreateMonitoredItemsResponse_deleteMembers(&response);
  319. return result;
  320. }
  321. UA_DeleteMonitoredItemsResponse UA_EXPORT
  322. UA_Client_MonitoredItems_delete(UA_Client *client, const UA_DeleteMonitoredItemsRequest request) {
  323. /* Send the request */
  324. UA_DeleteMonitoredItemsResponse response;
  325. __UA_Client_Service(client, &request, &UA_TYPES[UA_TYPES_DELETEMONITOREDITEMSREQUEST],
  326. &response, &UA_TYPES[UA_TYPES_DELETEMONITOREDITEMSRESPONSE]);
  327. if(response.responseHeader.serviceResult != UA_STATUSCODE_GOOD)
  328. return response;
  329. UA_Client_Subscription *sub = findSubscription(client, request.subscriptionId);
  330. if(!sub) {
  331. UA_LOG_INFO(client->config.logger, UA_LOGCATEGORY_CLIENT,
  332. "No internal representation of subscription %u",
  333. request.subscriptionId);
  334. return response;
  335. }
  336. /* Loop over deleted MonitoredItems */
  337. for(size_t i = 0; i < response.resultsSize; i++) {
  338. if(response.results[i] != UA_STATUSCODE_GOOD &&
  339. response.results[i] != UA_STATUSCODE_BADMONITOREDITEMIDINVALID) {
  340. continue;
  341. }
  342. #ifndef __clang_analyzer__
  343. /* Delete the internal representation */
  344. UA_Client_MonitoredItem *mon;
  345. LIST_FOREACH(mon, &sub->monitoredItems, listEntry) {
  346. if(mon->monitoredItemId == request.monitoredItemIds[i]) {
  347. UA_Client_MonitoredItem_remove(client, sub, mon);
  348. break;
  349. }
  350. }
  351. #endif
  352. }
  353. return response;
  354. }
  355. UA_StatusCode UA_EXPORT
  356. UA_Client_MonitoredItems_deleteSingle(UA_Client *client, UA_UInt32 subscriptionId, UA_UInt32 monitoredItemId) {
  357. UA_DeleteMonitoredItemsRequest request;
  358. UA_DeleteMonitoredItemsRequest_init(&request);
  359. request.subscriptionId = subscriptionId;
  360. request.monitoredItemIds = &monitoredItemId;
  361. request.monitoredItemIdsSize = 1;
  362. UA_DeleteMonitoredItemsResponse response =
  363. UA_Client_MonitoredItems_delete(client, request);
  364. UA_StatusCode retval = response.responseHeader.serviceResult;
  365. if(retval != UA_STATUSCODE_GOOD) {
  366. UA_DeleteMonitoredItemsResponse_deleteMembers(&response);
  367. return retval;
  368. }
  369. if(response.resultsSize != 1) {
  370. UA_DeleteMonitoredItemsResponse_deleteMembers(&response);
  371. return UA_STATUSCODE_BADINTERNALERROR;
  372. }
  373. retval = response.results[0];
  374. UA_DeleteMonitoredItemsResponse_deleteMembers(&response);
  375. return retval;
  376. }
  377. /*************************************/
  378. /* Async Processing of Notifications */
  379. /*************************************/
  380. /* Assume the request is already initialized */
  381. UA_StatusCode
  382. UA_Client_preparePublishRequest(UA_Client *client, UA_PublishRequest *request) {
  383. /* Count acks */
  384. UA_Client_NotificationsAckNumber *ack;
  385. LIST_FOREACH(ack, &client->pendingNotificationsAcks, listEntry)
  386. ++request->subscriptionAcknowledgementsSize;
  387. /* Create the array. Returns a sentinel pointer if the length is zero. */
  388. request->subscriptionAcknowledgements = (UA_SubscriptionAcknowledgement*)
  389. UA_Array_new(request->subscriptionAcknowledgementsSize,
  390. &UA_TYPES[UA_TYPES_SUBSCRIPTIONACKNOWLEDGEMENT]);
  391. if(!request->subscriptionAcknowledgements) {
  392. request->subscriptionAcknowledgementsSize = 0;
  393. return UA_STATUSCODE_BADOUTOFMEMORY;
  394. }
  395. size_t i = 0;
  396. UA_Client_NotificationsAckNumber *ack_tmp;
  397. LIST_FOREACH_SAFE(ack, &client->pendingNotificationsAcks, listEntry, ack_tmp) {
  398. request->subscriptionAcknowledgements[i].sequenceNumber = ack->subAck.sequenceNumber;
  399. request->subscriptionAcknowledgements[i].subscriptionId = ack->subAck.subscriptionId;
  400. ++i;
  401. LIST_REMOVE(ack, listEntry);
  402. UA_free(ack);
  403. }
  404. return UA_STATUSCODE_GOOD;
  405. }
  406. /* According to OPC Unified Architecture, Part 4 5.13.1.1 i) */
  407. /* The value 0 is never used for the sequence number */
  408. static UA_UInt32
  409. UA_Client_Subscriptions_nextSequenceNumber(UA_UInt32 sequenceNumber) {
  410. UA_UInt32 nextSequenceNumber = sequenceNumber + 1;
  411. if(nextSequenceNumber == 0)
  412. nextSequenceNumber = 1;
  413. return nextSequenceNumber;
  414. }
  415. static void
  416. processDataChangeNotification(UA_Client *client, UA_Client_Subscription *sub,
  417. UA_DataChangeNotification *dataChangeNotification) {
  418. for(size_t j = 0; j < dataChangeNotification->monitoredItemsSize; ++j) {
  419. UA_MonitoredItemNotification *min = &dataChangeNotification->monitoredItems[j];
  420. /* Find the MonitoredItem */
  421. UA_Client_MonitoredItem *mon;
  422. LIST_FOREACH(mon, &sub->monitoredItems, listEntry) {
  423. if(mon->clientHandle == min->clientHandle)
  424. break;
  425. }
  426. if(!mon) {
  427. UA_LOG_DEBUG(client->config.logger, UA_LOGCATEGORY_CLIENT,
  428. "Could not process a notification with clienthandle %u on subscription %u",
  429. min->clientHandle, sub->subscriptionId);
  430. continue;
  431. }
  432. if(mon->isEventMonitoredItem) {
  433. UA_LOG_DEBUG(client->config.logger, UA_LOGCATEGORY_CLIENT,
  434. "MonitoredItem is configured for Events. But received a "
  435. "DataChangeNotification.");
  436. continue;
  437. }
  438. mon->handler.dataChangeCallback(client, sub->subscriptionId, sub->context,
  439. mon->monitoredItemId, mon->context,
  440. &min->value);
  441. }
  442. }
  443. static void
  444. processEventNotification(UA_Client *client, UA_Client_Subscription *sub,
  445. UA_EventNotificationList *eventNotificationList) {
  446. for(size_t j = 0; j < eventNotificationList->eventsSize; ++j) {
  447. UA_EventFieldList *eventFieldList = &eventNotificationList->events[j];
  448. /* Find the MonitoredItem */
  449. UA_Client_MonitoredItem *mon;
  450. LIST_FOREACH(mon, &sub->monitoredItems, listEntry) {
  451. if(mon->monitoredItemId == eventFieldList->clientHandle)
  452. break;
  453. }
  454. if(!mon) {
  455. UA_LOG_DEBUG(client->config.logger, UA_LOGCATEGORY_CLIENT,
  456. "Could not process a notification with clienthandle %u on subscription %u",
  457. eventFieldList->clientHandle, sub->subscriptionId);
  458. continue;
  459. }
  460. if(!mon->isEventMonitoredItem) {
  461. UA_LOG_DEBUG(client->config.logger, UA_LOGCATEGORY_CLIENT,
  462. "MonitoredItem is configured for DataChanges. But received a "
  463. "EventNotification.");
  464. continue;
  465. }
  466. mon->handler.eventCallback(client, sub->subscriptionId, sub->context,
  467. mon->monitoredItemId, mon->context,
  468. eventFieldList->eventFieldsSize,
  469. eventFieldList->eventFields);
  470. }
  471. }
  472. static void
  473. processNotificationMessage(UA_Client *client, UA_Client_Subscription *sub,
  474. UA_ExtensionObject *msg) {
  475. if(msg->encoding != UA_EXTENSIONOBJECT_DECODED)
  476. return;
  477. /* Handle DataChangeNotification */
  478. if(msg->content.decoded.type == &UA_TYPES[UA_TYPES_DATACHANGENOTIFICATION]) {
  479. UA_DataChangeNotification *dataChangeNotification =
  480. (UA_DataChangeNotification *)msg->content.decoded.data;
  481. processDataChangeNotification(client, sub, dataChangeNotification);
  482. return;
  483. }
  484. /* Handle EventNotification */
  485. if(msg->content.decoded.type == &UA_TYPES[UA_TYPES_EVENTNOTIFICATIONLIST]) {
  486. UA_EventNotificationList *eventNotificationList =
  487. (UA_EventNotificationList *)msg->content.decoded.data;
  488. processEventNotification(client, sub, eventNotificationList);
  489. return;
  490. }
  491. /* Handle StatusChangeNotification */
  492. if(msg->content.decoded.type == &UA_TYPES[UA_TYPES_STATUSCHANGENOTIFICATION]) {
  493. if(sub->statusChangeCallback) {
  494. sub->statusChangeCallback(client, sub->subscriptionId, sub->context,
  495. (UA_StatusChangeNotification*)msg->content.decoded.data);
  496. } else {
  497. UA_LOG_WARNING(client->config.logger, UA_LOGCATEGORY_CLIENT,
  498. "Dropped a StatusChangeNotification since no callback is registered");
  499. }
  500. return;
  501. }
  502. UA_LOG_WARNING(client->config.logger, UA_LOGCATEGORY_CLIENT,
  503. "Unknown notification message type");
  504. }
  505. void
  506. UA_Client_Subscriptions_processPublishResponse(UA_Client *client, UA_PublishRequest *request,
  507. UA_PublishResponse *response) {
  508. UA_NotificationMessage *msg = &response->notificationMessage;
  509. client->currentlyOutStandingPublishRequests--;
  510. if(response->responseHeader.serviceResult == UA_STATUSCODE_BADTOOMANYPUBLISHREQUESTS) {
  511. if(client->config.outStandingPublishRequests > 1) {
  512. client->config.outStandingPublishRequests--;
  513. UA_LOG_WARNING(client->config.logger, UA_LOGCATEGORY_CLIENT,
  514. "Too many publishrequest, reduce outStandingPublishRequests to %d",
  515. client->config.outStandingPublishRequests);
  516. } else {
  517. UA_LOG_ERROR(client->config.logger, UA_LOGCATEGORY_CLIENT,
  518. "Too many publishrequest when outStandingPublishRequests = 1");
  519. UA_Client_Subscriptions_deleteSingle(client, response->subscriptionId);
  520. }
  521. return;
  522. }
  523. if(response->responseHeader.serviceResult == UA_STATUSCODE_BADSHUTDOWN)
  524. return;
  525. if(!LIST_FIRST(&client->subscriptions)) {
  526. response->responseHeader.serviceResult = UA_STATUSCODE_BADNOSUBSCRIPTION;
  527. return;
  528. }
  529. if(response->responseHeader.serviceResult == UA_STATUSCODE_BADSESSIONCLOSED) {
  530. if(client->state >= UA_CLIENTSTATE_SESSION) {
  531. UA_LOG_WARNING(client->config.logger, UA_LOGCATEGORY_CLIENT,
  532. "Received Publish Response with code %s",
  533. UA_StatusCode_name(response->responseHeader.serviceResult));
  534. }
  535. return;
  536. }
  537. if(response->responseHeader.serviceResult == UA_STATUSCODE_BADSESSIONIDINVALID) {
  538. UA_Client_close(client); /* TODO: This should be handled before the process callback */
  539. UA_LOG_WARNING(client->config.logger, UA_LOGCATEGORY_CLIENT,
  540. "Received BadSessionIdInvalid");
  541. return;
  542. }
  543. if(response->responseHeader.serviceResult != UA_STATUSCODE_GOOD) {
  544. UA_LOG_WARNING(client->config.logger, UA_LOGCATEGORY_CLIENT,
  545. "Received Publish Response with code %s",
  546. UA_StatusCode_name(response->responseHeader.serviceResult));
  547. return;
  548. }
  549. UA_Client_Subscription *sub = findSubscription(client, response->subscriptionId);
  550. if(!sub) {
  551. response->responseHeader.serviceResult = UA_STATUSCODE_BADINTERNALERROR;
  552. UA_LOG_WARNING(client->config.logger, UA_LOGCATEGORY_CLIENT,
  553. "Received Publish Response for a non-existant subscription");
  554. return;
  555. }
  556. sub->lastActivity = UA_DateTime_nowMonotonic();
  557. /* Detect missing message - OPC Unified Architecture, Part 4 5.13.1.1 e) */
  558. if(UA_Client_Subscriptions_nextSequenceNumber(sub->sequenceNumber) != msg->sequenceNumber) {
  559. UA_LOG_WARNING(client->config.logger, UA_LOGCATEGORY_CLIENT,
  560. "Invalid subscription sequence number: expected %u but got %u",
  561. UA_Client_Subscriptions_nextSequenceNumber(sub->sequenceNumber),
  562. msg->sequenceNumber);
  563. /* This is an error. But we do not abort the connection. Some server
  564. * SDKs misbehave from time to time and send out-of-order sequence
  565. * numbers. (Probably some multi-threading synchronization issue.) */
  566. /* UA_Client_close(client);
  567. return; */
  568. }
  569. /* According to f), a keep-alive message contains no notifications and has the sequence number
  570. * of the next NotificationMessage that is to be sent => More than one consecutive keep-alive
  571. * message or a NotificationMessage following a keep-alive message will share the same sequence
  572. * number. */
  573. if (msg->notificationDataSize)
  574. sub->sequenceNumber = msg->sequenceNumber;
  575. /* Process the notification messages */
  576. for(size_t k = 0; k < msg->notificationDataSize; ++k)
  577. processNotificationMessage(client, sub, &msg->notificationData[k]);
  578. /* Add to the list of pending acks */
  579. for(size_t i = 0; i < response->availableSequenceNumbersSize; i++) {
  580. if(response->availableSequenceNumbers[i] != msg->sequenceNumber)
  581. continue;
  582. UA_Client_NotificationsAckNumber *tmpAck = (UA_Client_NotificationsAckNumber*)
  583. UA_malloc(sizeof(UA_Client_NotificationsAckNumber));
  584. if(!tmpAck) {
  585. UA_LOG_WARNING(client->config.logger, UA_LOGCATEGORY_CLIENT,
  586. "Not enough memory to store the acknowledgement for a publish "
  587. "message on subscription %u", sub->subscriptionId);
  588. break;
  589. }
  590. tmpAck->subAck.sequenceNumber = msg->sequenceNumber;
  591. tmpAck->subAck.subscriptionId = sub->subscriptionId;
  592. LIST_INSERT_HEAD(&client->pendingNotificationsAcks, tmpAck, listEntry);
  593. break;
  594. }
  595. }
  596. static void
  597. processPublishResponseAsync(UA_Client *client, void *userdata, UA_UInt32 requestId,
  598. void *response) {
  599. UA_PublishRequest *req = (UA_PublishRequest*)userdata;
  600. UA_PublishResponse *res = (UA_PublishResponse*)response;
  601. /* Process the response */
  602. UA_Client_Subscriptions_processPublishResponse(client, req, res);
  603. /* Delete the cached request */
  604. UA_PublishRequest_delete(req);
  605. /* Fill up the outstanding publish requests */
  606. UA_Client_Subscriptions_backgroundPublish(client);
  607. }
  608. void
  609. UA_Client_Subscriptions_clean(UA_Client *client) {
  610. UA_Client_NotificationsAckNumber *n, *tmp;
  611. LIST_FOREACH_SAFE(n, &client->pendingNotificationsAcks, listEntry, tmp) {
  612. LIST_REMOVE(n, listEntry);
  613. UA_free(n);
  614. }
  615. UA_Client_Subscription *sub, *tmps;
  616. LIST_FOREACH_SAFE(sub, &client->subscriptions, listEntry, tmps)
  617. UA_Client_Subscription_deleteInternal(client, sub); /* force local removal */
  618. client->monitoredItemHandles = 0;
  619. }
  620. void
  621. UA_Client_Subscriptions_backgroundPublishInactivityCheck(UA_Client *client) {
  622. if(client->state < UA_CLIENTSTATE_SESSION)
  623. return;
  624. /* Is the lack of responses the client's fault? */
  625. if(client->currentlyOutStandingPublishRequests == 0)
  626. return;
  627. UA_Client_Subscription *sub;
  628. LIST_FOREACH(sub, &client->subscriptions, listEntry) {
  629. UA_DateTime maxSilence = (UA_DateTime)
  630. ((sub->publishingInterval * sub->maxKeepAliveCount) +
  631. client->config.timeout) * UA_DATETIME_MSEC;
  632. if(maxSilence + sub->lastActivity < UA_DateTime_nowMonotonic()) {
  633. /* Reset activity */
  634. sub->lastActivity = UA_DateTime_nowMonotonic();
  635. if(client->config.subscriptionInactivityCallback)
  636. client->config.subscriptionInactivityCallback(client, sub->subscriptionId, sub->context);
  637. UA_LOG_ERROR(client->config.logger, UA_LOGCATEGORY_CLIENT,
  638. "Inactivity for Subscription %u.", sub->subscriptionId);
  639. }
  640. }
  641. }
  642. UA_StatusCode
  643. UA_Client_Subscriptions_backgroundPublish(UA_Client *client) {
  644. if(client->state < UA_CLIENTSTATE_SESSION)
  645. return UA_STATUSCODE_BADSERVERNOTCONNECTED;
  646. /* The session must have at least one subscription */
  647. if(!LIST_FIRST(&client->subscriptions))
  648. return UA_STATUSCODE_GOOD;
  649. while(client->currentlyOutStandingPublishRequests < client->config.outStandingPublishRequests) {
  650. UA_PublishRequest *request = UA_PublishRequest_new();
  651. if (!request)
  652. return UA_STATUSCODE_BADOUTOFMEMORY;
  653. UA_StatusCode retval = UA_Client_preparePublishRequest(client, request);
  654. if(retval != UA_STATUSCODE_GOOD) {
  655. UA_PublishRequest_delete(request);
  656. return retval;
  657. }
  658. UA_UInt32 requestId;
  659. client->currentlyOutStandingPublishRequests++;
  660. /* Disable the timeout, it is treat in UA_Client_Subscriptions_backgroundPublishInactivityCheck */
  661. retval = __UA_Client_AsyncServiceEx(client, request, &UA_TYPES[UA_TYPES_PUBLISHREQUEST],
  662. processPublishResponseAsync,
  663. &UA_TYPES[UA_TYPES_PUBLISHRESPONSE],
  664. (void*)request, &requestId, 0);
  665. if(retval != UA_STATUSCODE_GOOD) {
  666. UA_PublishRequest_delete(request);
  667. return retval;
  668. }
  669. }
  670. return UA_STATUSCODE_GOOD;
  671. }
  672. #endif /* UA_ENABLE_SUBSCRIPTIONS */