ua_client_subscriptions.c 30 KB


  1. /* This Source Code Form is subject to the terms of the Mozilla Public
  2. * License, v. 2.0. If a copy of the MPL was not distributed with this
  3. * file, You can obtain one at http://mozilla.org/MPL/2.0/.
  4. *
  5. * Copyright 2015-2018 (c) Fraunhofer IOSB (Author: Julius Pfrommer)
  6. * Copyright 2015 (c) Oleksiy Vasylyev
  7. * Copyright 2016 (c) Sten Grüner
  8. * Copyright 2017-2018 (c) Thomas Stalder, Blue Time Concept SA
  9. * Copyright 2016-2017 (c) Florian Palm
  10. * Copyright 2017 (c) Frank Meerkötter
  11. * Copyright 2017 (c) Stefan Profanter, fortiss GmbH
  12. */
  13. #include "ua_client_highlevel.h"
  14. #include "ua_client_internal.h"
  15. #include "ua_util.h"
  16. #ifdef UA_ENABLE_SUBSCRIPTIONS /* conditional compilation */
  17. /*****************/
  18. /* Subscriptions */
  19. /*****************/
  20. UA_CreateSubscriptionResponse UA_EXPORT
  21. UA_Client_Subscriptions_create(UA_Client *client,
  22. const UA_CreateSubscriptionRequest request,
  23. void *subscriptionContext,
  24. UA_Client_StatusChangeNotificationCallback statusChangeCallback,
  25. UA_Client_DeleteSubscriptionCallback deleteCallback) {
  26. UA_CreateSubscriptionResponse response;
  27. UA_CreateSubscriptionResponse_init(&response);
  28. /* Allocate the internal representation */
  29. UA_Client_Subscription *newSub = (UA_Client_Subscription*)
  30. UA_malloc(sizeof(UA_Client_Subscription));
  31. if(!newSub) {
  32. response.responseHeader.serviceResult = UA_STATUSCODE_BADOUTOFMEMORY;
  33. return response;
  34. }
  35. /* Send the request as a synchronous service call */
  36. __UA_Client_Service(client,
  37. &request, &UA_TYPES[UA_TYPES_CREATESUBSCRIPTIONREQUEST],
  38. &response, &UA_TYPES[UA_TYPES_CREATESUBSCRIPTIONRESPONSE]);
  39. if(response.responseHeader.serviceResult != UA_STATUSCODE_GOOD) {
  40. UA_free(newSub);
  41. return response;
  42. }
  43. /* Prepare the internal representation */
  44. newSub->context = subscriptionContext;
  45. newSub->subscriptionId = response.subscriptionId;
  46. newSub->sequenceNumber = 0;
  47. newSub->lastActivity = UA_DateTime_nowMonotonic();
  48. newSub->statusChangeCallback = statusChangeCallback;
  49. newSub->deleteCallback = deleteCallback;
  50. newSub->publishingInterval = response.revisedPublishingInterval;
  51. newSub->maxKeepAliveCount = response.revisedMaxKeepAliveCount;
  52. LIST_INIT(&newSub->monitoredItems);
  53. LIST_INSERT_HEAD(&client->subscriptions, newSub, listEntry);
  54. return response;
  55. }
  56. static UA_Client_Subscription *
  57. findSubscription(const UA_Client *client, UA_UInt32 subscriptionId) {
  58. UA_Client_Subscription *sub = NULL;
  59. LIST_FOREACH(sub, &client->subscriptions, listEntry) {
  60. if(sub->subscriptionId == subscriptionId)
  61. break;
  62. }
  63. return sub;
  64. }
  65. UA_ModifySubscriptionResponse UA_EXPORT
  66. UA_Client_Subscriptions_modify(UA_Client *client, const UA_ModifySubscriptionRequest request) {
  67. UA_ModifySubscriptionResponse response;
  68. UA_ModifySubscriptionResponse_init(&response);
  69. /* Find the internal representation */
  70. UA_Client_Subscription *sub = findSubscription(client, request.subscriptionId);
  71. if(!sub) {
  72. response.responseHeader.serviceResult = UA_STATUSCODE_BADSUBSCRIPTIONIDINVALID;
  73. return response;
  74. }
  75. /* Call the service */
  76. __UA_Client_Service(client,
  77. &request, &UA_TYPES[UA_TYPES_MODIFYSUBSCRIPTIONREQUEST],
  78. &response, &UA_TYPES[UA_TYPES_MODIFYSUBSCRIPTIONRESPONSE]);
  79. /* Adjust the internal representation */
  80. sub->publishingInterval = response.revisedPublishingInterval;
  81. sub->maxKeepAliveCount = response.revisedMaxKeepAliveCount;
  82. return response;
  83. }
  84. static void
  85. UA_Client_Subscription_deleteInternal(UA_Client *client, UA_Client_Subscription *sub) {
  86. /* Remove the MonitoredItems */
  87. UA_Client_MonitoredItem *mon, *mon_tmp;
  88. LIST_FOREACH_SAFE(mon, &sub->monitoredItems, listEntry, mon_tmp)
  89. UA_Client_MonitoredItem_remove(client, sub, mon);
  90. /* Call the delete callback */
  91. if(sub->deleteCallback)
  92. sub->deleteCallback(client, sub->subscriptionId, sub->context);
  93. /* Remove */
  94. LIST_REMOVE(sub, listEntry);
  95. UA_free(sub);
  96. }
  97. UA_DeleteSubscriptionsResponse UA_EXPORT
  98. UA_Client_Subscriptions_delete(UA_Client *client, const UA_DeleteSubscriptionsRequest request) {
  99. UA_STACKARRAY(UA_Client_Subscription*, subs, request.subscriptionIdsSize);
  100. memset(subs, 0, sizeof(void*) * request.subscriptionIdsSize);
  101. /* temporary remove the subscriptions from the list */
  102. for(size_t i = 0; i < request.subscriptionIdsSize; i++) {
  103. subs[i] = findSubscription(client, request.subscriptionIds[i]);
  104. if (subs[i])
  105. LIST_REMOVE(subs[i], listEntry);
  106. }
  107. /* Send the request */
  108. UA_DeleteSubscriptionsResponse response;
  109. __UA_Client_Service(client,
  110. &request, &UA_TYPES[UA_TYPES_DELETESUBSCRIPTIONSREQUEST],
  111. &response, &UA_TYPES[UA_TYPES_DELETESUBSCRIPTIONSRESPONSE]);
  112. if(response.responseHeader.serviceResult != UA_STATUSCODE_GOOD)
  113. goto cleanup;
  114. if(request.subscriptionIdsSize != response.resultsSize) {
  115. response.responseHeader.serviceResult = UA_STATUSCODE_BADINTERNALERROR;
  116. goto cleanup;
  117. }
  118. /* Loop over the removed subscriptions and remove internally */
  119. for(size_t i = 0; i < request.subscriptionIdsSize; i++) {
  120. if(response.results[i] != UA_STATUSCODE_GOOD && response.results[i] != UA_STATUSCODE_BADSUBSCRIPTIONIDINVALID) {
  121. /* Something was wrong, reinsert the subscription in the list */
  122. if (subs[i])
  123. LIST_INSERT_HEAD(&client->subscriptions, subs[i], listEntry);
  124. continue;
  125. }
  126. if(!subs[i]) {
  127. UA_LOG_INFO(client->config.logger, UA_LOGCATEGORY_CLIENT,
  128. "No internal representation of subscription %u",
  129. request.subscriptionIds[i]);
  130. continue;
  131. } else {
  132. LIST_INSERT_HEAD(&client->subscriptions, subs[i], listEntry);
  133. }
  134. UA_Client_Subscription_deleteInternal(client, subs[i]);
  135. }
  136. return response;
  137. cleanup:
  138. for(size_t i = 0; i < request.subscriptionIdsSize; i++) {
  139. if (subs[i]) {
  140. LIST_INSERT_HEAD(&client->subscriptions, subs[i], listEntry);
  141. }
  142. }
  143. return response;
  144. }
  145. UA_StatusCode UA_EXPORT
  146. UA_Client_Subscriptions_deleteSingle(UA_Client *client, UA_UInt32 subscriptionId) {
  147. UA_DeleteSubscriptionsRequest request;
  148. UA_DeleteSubscriptionsRequest_init(&request);
  149. request.subscriptionIds = &subscriptionId;
  150. request.subscriptionIdsSize = 1;
  151. UA_DeleteSubscriptionsResponse response =
  152. UA_Client_Subscriptions_delete(client, request);
  153. UA_StatusCode retval = response.responseHeader.serviceResult;
  154. if(retval != UA_STATUSCODE_GOOD) {
  155. UA_DeleteSubscriptionsResponse_deleteMembers(&response);
  156. return retval;
  157. }
  158. if(response.resultsSize != 1) {
  159. UA_DeleteSubscriptionsResponse_deleteMembers(&response);
  160. return UA_STATUSCODE_BADINTERNALERROR;
  161. }
  162. retval = response.results[0];
  163. UA_DeleteSubscriptionsResponse_deleteMembers(&response);
  164. return retval;
  165. }
  166. /******************/
  167. /* MonitoredItems */
  168. /******************/
  169. void
  170. UA_Client_MonitoredItem_remove(UA_Client *client, UA_Client_Subscription *sub,
  171. UA_Client_MonitoredItem *mon) {
  172. LIST_REMOVE(mon, listEntry);
  173. if(mon->deleteCallback)
  174. mon->deleteCallback(client, sub->subscriptionId, sub->context,
  175. mon->monitoredItemId, mon->context);
  176. UA_free(mon);
  177. }
  178. static void
  179. __UA_Client_MonitoredItems_create(UA_Client *client,
  180. const UA_CreateMonitoredItemsRequest *request,
  181. void **contexts, void **handlingCallbacks,
  182. UA_Client_DeleteMonitoredItemCallback *deleteCallbacks,
  183. UA_CreateMonitoredItemsResponse *response) {
  184. UA_CreateMonitoredItemsResponse_init(response);
  185. if (!request->itemsToCreateSize) {
  186. response->responseHeader.serviceResult = UA_STATUSCODE_BADINTERNALERROR;
  187. return;
  188. }
  189. /* Fix clang warning */
  190. size_t itemsToCreateSize = request->itemsToCreateSize;
  191. UA_Client_Subscription *sub = NULL;
  192. /* Allocate the memory for internal representations */
  193. UA_STACKARRAY(UA_Client_MonitoredItem*, mis, itemsToCreateSize);
  194. memset(mis, 0, sizeof(void*) * itemsToCreateSize);
  195. for(size_t i = 0; i < itemsToCreateSize; i++) {
  196. mis[i] = (UA_Client_MonitoredItem*)UA_malloc(sizeof(UA_Client_MonitoredItem));
  197. if(!mis[i]) {
  198. response->responseHeader.serviceResult = UA_STATUSCODE_BADOUTOFMEMORY;
  199. goto cleanup;
  200. }
  201. }
  202. /* Get the subscription */
  203. sub = findSubscription(client, request->subscriptionId);
  204. if(!sub) {
  205. response->responseHeader.serviceResult = UA_STATUSCODE_BADSUBSCRIPTIONIDINVALID;
  206. goto cleanup;
  207. }
  208. /* Set the clientHandle */
  209. for(size_t i = 0; i < itemsToCreateSize; i++)
  210. request->itemsToCreate[i].requestedParameters.clientHandle = ++(client->monitoredItemHandles);
  211. /* Call the service */
  212. __UA_Client_Service(client, request, &UA_TYPES[UA_TYPES_CREATEMONITOREDITEMSREQUEST],
  213. response, &UA_TYPES[UA_TYPES_CREATEMONITOREDITEMSRESPONSE]);
  214. if(response->responseHeader.serviceResult != UA_STATUSCODE_GOOD)
  215. goto cleanup;
  216. if(response->resultsSize != itemsToCreateSize) {
  217. response->responseHeader.serviceResult = UA_STATUSCODE_BADINTERNALERROR;
  218. goto cleanup;
  219. }
  220. /* Add internally */
  221. for(size_t i = 0; i < itemsToCreateSize; i++) {
  222. if(response->results[i].statusCode != UA_STATUSCODE_GOOD) {
  223. if (deleteCallbacks[i])
  224. deleteCallbacks[i](client, sub->subscriptionId, sub->context, 0, contexts[i]);
  225. UA_free(mis[i]);
  226. mis[i] = NULL;
  227. continue;
  228. }
  229. UA_Client_MonitoredItem *newMon = mis[i];
  230. newMon->clientHandle = request->itemsToCreate[i].requestedParameters.clientHandle;
  231. newMon->monitoredItemId = response->results[i].monitoredItemId;
  232. newMon->context = contexts[i];
  233. newMon->deleteCallback = deleteCallbacks[i];
  234. newMon->handler.dataChangeCallback =
  235. (UA_Client_DataChangeNotificationCallback)(uintptr_t)handlingCallbacks[i];
  236. newMon->isEventMonitoredItem =
  237. (request->itemsToCreate[i].itemToMonitor.attributeId == UA_ATTRIBUTEID_EVENTNOTIFIER);
  238. LIST_INSERT_HEAD(&sub->monitoredItems, newMon, listEntry);
  239. }
  240. return;
  241. cleanup:
  242. for(size_t i = 0; i < itemsToCreateSize; i++) {
  243. if (deleteCallbacks[i]) {
  244. if (sub)
  245. deleteCallbacks[i](client, sub->subscriptionId, sub->context, 0, contexts[i]);
  246. else
  247. deleteCallbacks[i](client, 0, NULL, 0, contexts[i]);
  248. }
  249. if(mis[i])
  250. UA_free(mis[i]);
  251. }
  252. }
  253. UA_CreateMonitoredItemsResponse UA_EXPORT
  254. UA_Client_MonitoredItems_createDataChanges(UA_Client *client,
  255. const UA_CreateMonitoredItemsRequest request, void **contexts,
  256. UA_Client_DataChangeNotificationCallback *callbacks,
  257. UA_Client_DeleteMonitoredItemCallback *deleteCallbacks) {
  258. UA_CreateMonitoredItemsResponse response;
  259. __UA_Client_MonitoredItems_create(client, &request, contexts,
  260. (void**)(uintptr_t)callbacks, deleteCallbacks, &response);
  261. return response;
  262. }
  263. UA_MonitoredItemCreateResult UA_EXPORT
  264. UA_Client_MonitoredItems_createDataChange(UA_Client *client, UA_UInt32 subscriptionId,
  265. UA_TimestampsToReturn timestampsToReturn, const UA_MonitoredItemCreateRequest item,
  266. void *context, UA_Client_DataChangeNotificationCallback callback,
  267. UA_Client_DeleteMonitoredItemCallback deleteCallback) {
  268. UA_CreateMonitoredItemsRequest request;
  269. UA_CreateMonitoredItemsRequest_init(&request);
  270. request.subscriptionId = subscriptionId;
  271. request.timestampsToReturn = timestampsToReturn;
  272. request.itemsToCreate = (UA_MonitoredItemCreateRequest*)(uintptr_t)&item;
  273. request.itemsToCreateSize = 1;
  274. UA_CreateMonitoredItemsResponse response =
  275. UA_Client_MonitoredItems_createDataChanges(client, request, &context,
  276. &callback, &deleteCallback);
  277. UA_MonitoredItemCreateResult result;
  278. UA_MonitoredItemCreateResult_init(&result);
  279. if(response.responseHeader.serviceResult != UA_STATUSCODE_GOOD)
  280. result.statusCode = response.responseHeader.serviceResult;
  281. if(result.statusCode == UA_STATUSCODE_GOOD &&
  282. response.resultsSize != 1)
  283. result.statusCode = UA_STATUSCODE_BADINTERNALERROR;
  284. if(result.statusCode == UA_STATUSCODE_GOOD)
  285. UA_MonitoredItemCreateResult_copy(&response.results[0] , &result);
  286. UA_CreateMonitoredItemsResponse_deleteMembers(&response);
  287. return result;
  288. }
  289. UA_CreateMonitoredItemsResponse UA_EXPORT
  290. UA_Client_MonitoredItems_createEvents(UA_Client *client,
  291. const UA_CreateMonitoredItemsRequest request, void **contexts,
  292. UA_Client_EventNotificationCallback *callbacks,
  293. UA_Client_DeleteMonitoredItemCallback *deleteCallbacks) {
  294. UA_CreateMonitoredItemsResponse response;
  295. __UA_Client_MonitoredItems_create(client, &request, contexts,
  296. (void**)(uintptr_t)callbacks, deleteCallbacks, &response);
  297. return response;
  298. }
  299. UA_MonitoredItemCreateResult UA_EXPORT
  300. UA_Client_MonitoredItems_createEvent(UA_Client *client, UA_UInt32 subscriptionId,
  301. UA_TimestampsToReturn timestampsToReturn, const UA_MonitoredItemCreateRequest item,
  302. void *context, UA_Client_EventNotificationCallback callback,
  303. UA_Client_DeleteMonitoredItemCallback deleteCallback) {
  304. UA_CreateMonitoredItemsRequest request;
  305. UA_CreateMonitoredItemsRequest_init(&request);
  306. request.subscriptionId = subscriptionId;
  307. request.timestampsToReturn = timestampsToReturn;
  308. request.itemsToCreate = (UA_MonitoredItemCreateRequest*)(uintptr_t)&item;
  309. request.itemsToCreateSize = 1;
  310. UA_CreateMonitoredItemsResponse response =
  311. UA_Client_MonitoredItems_createEvents(client, request, &context,
  312. &callback, &deleteCallback);
  313. UA_MonitoredItemCreateResult result;
  314. UA_MonitoredItemCreateResult_copy(response.results , &result);
  315. UA_CreateMonitoredItemsResponse_deleteMembers(&response);
  316. return result;
  317. }
  318. UA_DeleteMonitoredItemsResponse UA_EXPORT
  319. UA_Client_MonitoredItems_delete(UA_Client *client, const UA_DeleteMonitoredItemsRequest request) {
  320. /* Send the request */
  321. UA_DeleteMonitoredItemsResponse response;
  322. __UA_Client_Service(client, &request, &UA_TYPES[UA_TYPES_DELETEMONITOREDITEMSREQUEST],
  323. &response, &UA_TYPES[UA_TYPES_DELETEMONITOREDITEMSRESPONSE]);
  324. if(response.responseHeader.serviceResult != UA_STATUSCODE_GOOD)
  325. return response;
  326. UA_Client_Subscription *sub = findSubscription(client, request.subscriptionId);
  327. if(!sub) {
  328. UA_LOG_INFO(client->config.logger, UA_LOGCATEGORY_CLIENT,
  329. "No internal representation of subscription %u",
  330. request.subscriptionId);
  331. return response;
  332. }
  333. /* Loop over deleted MonitoredItems */
  334. for(size_t i = 0; i < response.resultsSize; i++) {
  335. if(response.results[i] != UA_STATUSCODE_GOOD &&
  336. response.results[i] != UA_STATUSCODE_BADMONITOREDITEMIDINVALID) {
  337. continue;
  338. }
  339. #ifndef __clang_analyzer__
  340. /* Delete the internal representation */
  341. UA_Client_MonitoredItem *mon;
  342. LIST_FOREACH(mon, &sub->monitoredItems, listEntry) {
  343. if(mon->monitoredItemId == request.monitoredItemIds[i]) {
  344. UA_Client_MonitoredItem_remove(client, sub, mon);
  345. break;
  346. }
  347. }
  348. #endif
  349. }
  350. return response;
  351. }
  352. UA_StatusCode UA_EXPORT
  353. UA_Client_MonitoredItems_deleteSingle(UA_Client *client, UA_UInt32 subscriptionId, UA_UInt32 monitoredItemId) {
  354. UA_DeleteMonitoredItemsRequest request;
  355. UA_DeleteMonitoredItemsRequest_init(&request);
  356. request.subscriptionId = subscriptionId;
  357. request.monitoredItemIds = &monitoredItemId;
  358. request.monitoredItemIdsSize = 1;
  359. UA_DeleteMonitoredItemsResponse response =
  360. UA_Client_MonitoredItems_delete(client, request);
  361. UA_StatusCode retval = response.responseHeader.serviceResult;
  362. if(retval != UA_STATUSCODE_GOOD) {
  363. UA_DeleteMonitoredItemsResponse_deleteMembers(&response);
  364. return retval;
  365. }
  366. if(response.resultsSize != 1) {
  367. UA_DeleteMonitoredItemsResponse_deleteMembers(&response);
  368. return UA_STATUSCODE_BADINTERNALERROR;
  369. }
  370. retval = response.results[0];
  371. UA_DeleteMonitoredItemsResponse_deleteMembers(&response);
  372. return retval;
  373. }
  374. /*************************************/
  375. /* Async Processing of Notifications */
  376. /*************************************/
  377. /* Assume the request is already initialized */
  378. UA_StatusCode
  379. UA_Client_preparePublishRequest(UA_Client *client, UA_PublishRequest *request) {
  380. /* Count acks */
  381. UA_Client_NotificationsAckNumber *ack;
  382. LIST_FOREACH(ack, &client->pendingNotificationsAcks, listEntry)
  383. ++request->subscriptionAcknowledgementsSize;
  384. /* Create the array. Returns a sentinel pointer if the length is zero. */
  385. request->subscriptionAcknowledgements = (UA_SubscriptionAcknowledgement*)
  386. UA_Array_new(request->subscriptionAcknowledgementsSize,
  387. &UA_TYPES[UA_TYPES_SUBSCRIPTIONACKNOWLEDGEMENT]);
  388. if(!request->subscriptionAcknowledgements) {
  389. request->subscriptionAcknowledgementsSize = 0;
  390. return UA_STATUSCODE_BADOUTOFMEMORY;
  391. }
  392. size_t i = 0;
  393. UA_Client_NotificationsAckNumber *ack_tmp;
  394. LIST_FOREACH_SAFE(ack, &client->pendingNotificationsAcks, listEntry, ack_tmp) {
  395. request->subscriptionAcknowledgements[i].sequenceNumber = ack->subAck.sequenceNumber;
  396. request->subscriptionAcknowledgements[i].subscriptionId = ack->subAck.subscriptionId;
  397. ++i;
  398. LIST_REMOVE(ack, listEntry);
  399. UA_free(ack);
  400. }
  401. return UA_STATUSCODE_GOOD;
  402. }
  403. /* According to OPC Unified Architecture, Part 4 5.13.1.1 i) */
  404. /* The value 0 is never used for the sequence number */
  405. static UA_UInt32
  406. UA_Client_Subscriptions_nextSequenceNumber(UA_UInt32 sequenceNumber) {
  407. UA_UInt32 nextSequenceNumber = sequenceNumber + 1;
  408. if(nextSequenceNumber == 0)
  409. nextSequenceNumber = 1;
  410. return nextSequenceNumber;
  411. }
  412. static void
  413. processDataChangeNotification(UA_Client *client, UA_Client_Subscription *sub,
  414. UA_DataChangeNotification *dataChangeNotification) {
  415. for(size_t j = 0; j < dataChangeNotification->monitoredItemsSize; ++j) {
  416. UA_MonitoredItemNotification *min = &dataChangeNotification->monitoredItems[j];
  417. /* Find the MonitoredItem */
  418. UA_Client_MonitoredItem *mon;
  419. LIST_FOREACH(mon, &sub->monitoredItems, listEntry) {
  420. if(mon->clientHandle == min->clientHandle)
  421. break;
  422. }
  423. if(!mon) {
  424. UA_LOG_DEBUG(client->config.logger, UA_LOGCATEGORY_CLIENT,
  425. "Could not process a notification with clienthandle %u on subscription %u",
  426. min->clientHandle, sub->subscriptionId);
  427. continue;
  428. }
  429. if(mon->isEventMonitoredItem) {
  430. UA_LOG_DEBUG(client->config.logger, UA_LOGCATEGORY_CLIENT,
  431. "MonitoredItem is configured for Events. But received a "
  432. "DataChangeNotification.");
  433. continue;
  434. }
  435. mon->handler.dataChangeCallback(client, sub->subscriptionId, sub->context,
  436. mon->monitoredItemId, mon->context,
  437. &min->value);
  438. }
  439. }
  440. static void
  441. processEventNotification(UA_Client *client, UA_Client_Subscription *sub,
  442. UA_EventNotificationList *eventNotificationList) {
  443. for(size_t j = 0; j < eventNotificationList->eventsSize; ++j) {
  444. UA_EventFieldList *eventFieldList = &eventNotificationList->events[j];
  445. /* Find the MonitoredItem */
  446. UA_Client_MonitoredItem *mon;
  447. LIST_FOREACH(mon, &sub->monitoredItems, listEntry) {
  448. if(mon->monitoredItemId == eventFieldList->clientHandle)
  449. break;
  450. }
  451. if(!mon) {
  452. UA_LOG_DEBUG(client->config.logger, UA_LOGCATEGORY_CLIENT,
  453. "Could not process a notification with clienthandle %u on subscription %u",
  454. eventFieldList->clientHandle, sub->subscriptionId);
  455. continue;
  456. }
  457. if(!mon->isEventMonitoredItem) {
  458. UA_LOG_DEBUG(client->config.logger, UA_LOGCATEGORY_CLIENT,
  459. "MonitoredItem is configured for DataChanges. But received a "
  460. "EventNotification.");
  461. continue;
  462. }
  463. mon->handler.eventCallback(client, sub->subscriptionId, sub->context,
  464. mon->monitoredItemId, mon->context,
  465. eventFieldList->eventFieldsSize,
  466. eventFieldList->eventFields);
  467. }
  468. }
  469. static void
  470. processNotificationMessage(UA_Client *client, UA_Client_Subscription *sub,
  471. UA_ExtensionObject *msg) {
  472. if(msg->encoding != UA_EXTENSIONOBJECT_DECODED)
  473. return;
  474. /* Handle DataChangeNotification */
  475. if(msg->content.decoded.type == &UA_TYPES[UA_TYPES_DATACHANGENOTIFICATION]) {
  476. UA_DataChangeNotification *dataChangeNotification =
  477. (UA_DataChangeNotification *)msg->content.decoded.data;
  478. processDataChangeNotification(client, sub, dataChangeNotification);
  479. return;
  480. }
  481. /* Handle EventNotification */
  482. if(msg->content.decoded.type == &UA_TYPES[UA_TYPES_EVENTNOTIFICATIONLIST]) {
  483. UA_EventNotificationList *eventNotificationList =
  484. (UA_EventNotificationList *)msg->content.decoded.data;
  485. processEventNotification(client, sub, eventNotificationList);
  486. return;
  487. }
  488. /* Handle StatusChangeNotification */
  489. if(msg->content.decoded.type == &UA_TYPES[UA_TYPES_STATUSCHANGENOTIFICATION]) {
  490. if(sub->statusChangeCallback) {
  491. sub->statusChangeCallback(client, sub->subscriptionId, sub->context,
  492. (UA_StatusChangeNotification*)msg->content.decoded.data);
  493. } else {
  494. UA_LOG_WARNING(client->config.logger, UA_LOGCATEGORY_CLIENT,
  495. "Dropped a StatusChangeNotification since no callback is registered");
  496. }
  497. return;
  498. }
  499. UA_LOG_WARNING(client->config.logger, UA_LOGCATEGORY_CLIENT,
  500. "Unknown notification message type");
  501. }
  502. void
  503. UA_Client_Subscriptions_processPublishResponse(UA_Client *client, UA_PublishRequest *request,
  504. UA_PublishResponse *response) {
  505. UA_NotificationMessage *msg = &response->notificationMessage;
  506. client->currentlyOutStandingPublishRequests--;
  507. if(response->responseHeader.serviceResult == UA_STATUSCODE_BADTOOMANYPUBLISHREQUESTS) {
  508. if(client->config.outStandingPublishRequests > 1) {
  509. client->config.outStandingPublishRequests--;
  510. UA_LOG_WARNING(client->config.logger, UA_LOGCATEGORY_CLIENT,
  511. "Too many publishrequest, reduce outStandingPublishRequests to %d",
  512. client->config.outStandingPublishRequests);
  513. } else {
  514. UA_LOG_ERROR(client->config.logger, UA_LOGCATEGORY_CLIENT,
  515. "Too many publishrequest when outStandingPublishRequests = 1");
  516. UA_Client_Subscriptions_deleteSingle(client, response->subscriptionId);
  517. }
  518. return;
  519. }
  520. if(response->responseHeader.serviceResult == UA_STATUSCODE_BADSHUTDOWN)
  521. return;
  522. if(!LIST_FIRST(&client->subscriptions)) {
  523. response->responseHeader.serviceResult = UA_STATUSCODE_BADNOSUBSCRIPTION;
  524. return;
  525. }
  526. if(response->responseHeader.serviceResult == UA_STATUSCODE_BADSESSIONIDINVALID) {
  527. UA_Client_close(client); /* TODO: This should be handled before the process callback */
  528. UA_LOG_WARNING(client->config.logger, UA_LOGCATEGORY_CLIENT,
  529. "Received BadSessionIdInvalid");
  530. return;
  531. }
  532. if(response->responseHeader.serviceResult != UA_STATUSCODE_GOOD) {
  533. UA_LOG_WARNING(client->config.logger, UA_LOGCATEGORY_CLIENT,
  534. "Received Publish Response with code %s",
  535. UA_StatusCode_name(response->responseHeader.serviceResult));
  536. return;
  537. }
  538. UA_Client_Subscription *sub = findSubscription(client, response->subscriptionId);
  539. if(!sub) {
  540. response->responseHeader.serviceResult = UA_STATUSCODE_BADINTERNALERROR;
  541. UA_LOG_WARNING(client->config.logger, UA_LOGCATEGORY_CLIENT,
  542. "Received Publish Response for a non-existant subscription");
  543. return;
  544. }
  545. sub->lastActivity = UA_DateTime_nowMonotonic();
  546. /* Detect missing message - OPC Unified Architecture, Part 4 5.13.1.1 e) */
  547. if((sub->sequenceNumber != msg->sequenceNumber) && (msg->sequenceNumber != 0) &&
  548. (UA_Client_Subscriptions_nextSequenceNumber(sub->sequenceNumber) != msg->sequenceNumber)) {
  549. UA_LOG_ERROR(client->config.logger, UA_LOGCATEGORY_CLIENT,
  550. "Invalid subscritpion sequenceNumber");
  551. UA_Client_close(client);
  552. return;
  553. }
  554. sub->sequenceNumber = msg->sequenceNumber;
  555. /* Process the notification messages */
  556. for(size_t k = 0; k < msg->notificationDataSize; ++k)
  557. processNotificationMessage(client, sub, &msg->notificationData[k]);
  558. /* Add to the list of pending acks */
  559. for(size_t i = 0; i < response->availableSequenceNumbersSize; i++) {
  560. if(response->availableSequenceNumbers[i] != msg->sequenceNumber)
  561. continue;
  562. UA_Client_NotificationsAckNumber *tmpAck = (UA_Client_NotificationsAckNumber*)
  563. UA_malloc(sizeof(UA_Client_NotificationsAckNumber));
  564. if(!tmpAck) {
  565. UA_LOG_WARNING(client->config.logger, UA_LOGCATEGORY_CLIENT,
  566. "Not enough memory to store the acknowledgement for a publish "
  567. "message on subscription %u", sub->subscriptionId);
  568. break;
  569. }
  570. tmpAck->subAck.sequenceNumber = msg->sequenceNumber;
  571. tmpAck->subAck.subscriptionId = sub->subscriptionId;
  572. LIST_INSERT_HEAD(&client->pendingNotificationsAcks, tmpAck, listEntry);
  573. break;
  574. }
  575. }
  576. static void
  577. processPublishResponseAsync(UA_Client *client, void *userdata, UA_UInt32 requestId,
  578. void *response, const UA_DataType *responseType) {
  579. UA_PublishRequest *req = (UA_PublishRequest*)userdata;
  580. UA_PublishResponse *res = (UA_PublishResponse*)response;
  581. /* Process the response */
  582. UA_Client_Subscriptions_processPublishResponse(client, req, res);
  583. /* Delete the cached request */
  584. UA_PublishRequest_delete(req);
  585. /* Fill up the outstanding publish requests */
  586. UA_Client_Subscriptions_backgroundPublish(client);
  587. }
  588. void
  589. UA_Client_Subscriptions_clean(UA_Client *client) {
  590. UA_Client_NotificationsAckNumber *n, *tmp;
  591. LIST_FOREACH_SAFE(n, &client->pendingNotificationsAcks, listEntry, tmp) {
  592. LIST_REMOVE(n, listEntry);
  593. UA_free(n);
  594. }
  595. UA_Client_Subscription *sub, *tmps;
  596. LIST_FOREACH_SAFE(sub, &client->subscriptions, listEntry, tmps)
  597. UA_Client_Subscription_deleteInternal(client, sub); /* force local removal */
  598. client->monitoredItemHandles = 0;
  599. }
  600. void
  601. UA_Client_Subscriptions_backgroundPublishInactivityCheck(UA_Client *client) {
  602. if(client->state < UA_CLIENTSTATE_SESSION)
  603. return;
  604. /* Is the lack of responses the client's fault? */
  605. if(client->currentlyOutStandingPublishRequests == 0)
  606. return;
  607. UA_Client_Subscription *sub;
  608. LIST_FOREACH(sub, &client->subscriptions, listEntry) {
  609. UA_DateTime maxSilence = (UA_DateTime)
  610. ((sub->publishingInterval * sub->maxKeepAliveCount) +
  611. client->config.timeout) * UA_DATETIME_MSEC;
  612. if(maxSilence + sub->lastActivity < UA_DateTime_nowMonotonic()) {
  613. /* Reset activity */
  614. sub->lastActivity = UA_DateTime_nowMonotonic();
  615. if (client->config.subscriptionInactivityCallback)
  616. client->config.subscriptionInactivityCallback(client, sub->subscriptionId, sub->context);
  617. UA_LOG_ERROR(client->config.logger, UA_LOGCATEGORY_CLIENT,
  618. "Inactivity for Subscription %u.", sub->subscriptionId);
  619. }
  620. }
  621. }
  622. UA_StatusCode
  623. UA_Client_Subscriptions_backgroundPublish(UA_Client *client) {
  624. if(client->state < UA_CLIENTSTATE_SESSION)
  625. return UA_STATUSCODE_BADSERVERNOTCONNECTED;
  626. /* The session must have at least one subscription */
  627. if(!LIST_FIRST(&client->subscriptions))
  628. return UA_STATUSCODE_GOOD;
  629. while(client->currentlyOutStandingPublishRequests < client->config.outStandingPublishRequests) {
  630. UA_PublishRequest *request = UA_PublishRequest_new();
  631. if (!request)
  632. return UA_STATUSCODE_BADOUTOFMEMORY;
  633. UA_StatusCode retval = UA_Client_preparePublishRequest(client, request);
  634. if(retval != UA_STATUSCODE_GOOD) {
  635. UA_PublishRequest_delete(request);
  636. return retval;
  637. }
  638. UA_UInt32 requestId;
  639. client->currentlyOutStandingPublishRequests++;
  640. retval = __UA_Client_AsyncService(client, request, &UA_TYPES[UA_TYPES_PUBLISHREQUEST],
  641. processPublishResponseAsync,
  642. &UA_TYPES[UA_TYPES_PUBLISHRESPONSE],
  643. (void*)request, &requestId);
  644. if(retval != UA_STATUSCODE_GOOD) {
  645. UA_PublishRequest_delete(request);
  646. return retval;
  647. }
  648. }
  649. return UA_STATUSCODE_GOOD;
  650. }
  651. #endif /* UA_ENABLE_SUBSCRIPTIONS */