ua_client_subscriptions.c 32 KB


  1. /* This Source Code Form is subject to the terms of the Mozilla Public
  2. * License, v. 2.0. If a copy of the MPL was not distributed with this
  3. * file, You can obtain one at http://mozilla.org/MPL/2.0/.
  4. *
  5. * Copyright 2015-2018 (c) Fraunhofer IOSB (Author: Julius Pfrommer)
  6. * Copyright 2015 (c) Oleksiy Vasylyev
  7. * Copyright 2016 (c) Sten Grüner
  8. * Copyright 2017-2018 (c) Thomas Stalder, Blue Time Concept SA
  9. * Copyright 2016-2017 (c) Florian Palm
  10. * Copyright 2017 (c) Frank Meerkötter
  11. * Copyright 2017 (c) Stefan Profanter, fortiss GmbH
  12. */
  13. #include "ua_client_highlevel.h"
  14. #include "ua_client_internal.h"
  15. #include "ua_util.h"
  16. #ifdef UA_ENABLE_SUBSCRIPTIONS /* conditional compilation */
  17. /*****************/
  18. /* Subscriptions */
  19. /*****************/
  20. UA_CreateSubscriptionResponse UA_EXPORT
  21. UA_Client_Subscriptions_create(UA_Client *client,
  22. const UA_CreateSubscriptionRequest request,
  23. void *subscriptionContext,
  24. UA_Client_StatusChangeNotificationCallback statusChangeCallback,
  25. UA_Client_DeleteSubscriptionCallback deleteCallback) {
  26. UA_CreateSubscriptionResponse response;
  27. UA_CreateSubscriptionResponse_init(&response);
  28. /* Allocate the internal representation */
  29. UA_Client_Subscription *newSub = (UA_Client_Subscription*)
  30. UA_malloc(sizeof(UA_Client_Subscription));
  31. if(!newSub) {
  32. response.responseHeader.serviceResult = UA_STATUSCODE_BADOUTOFMEMORY;
  33. return response;
  34. }
  35. /* Send the request as a synchronous service call */
  36. __UA_Client_Service(client,
  37. &request, &UA_TYPES[UA_TYPES_CREATESUBSCRIPTIONREQUEST],
  38. &response, &UA_TYPES[UA_TYPES_CREATESUBSCRIPTIONRESPONSE]);
  39. if(response.responseHeader.serviceResult != UA_STATUSCODE_GOOD) {
  40. UA_free(newSub);
  41. return response;
  42. }
  43. /* Prepare the internal representation */
  44. newSub->context = subscriptionContext;
  45. newSub->subscriptionId = response.subscriptionId;
  46. newSub->sequenceNumber = 0;
  47. newSub->lastActivity = UA_DateTime_nowMonotonic();
  48. newSub->statusChangeCallback = statusChangeCallback;
  49. newSub->deleteCallback = deleteCallback;
  50. newSub->publishingInterval = response.revisedPublishingInterval;
  51. newSub->maxKeepAliveCount = response.revisedMaxKeepAliveCount;
  52. LIST_INIT(&newSub->monitoredItems);
  53. LIST_INSERT_HEAD(&client->subscriptions, newSub, listEntry);
  54. return response;
  55. }
  56. static UA_Client_Subscription *
  57. findSubscription(const UA_Client *client, UA_UInt32 subscriptionId) {
  58. UA_Client_Subscription *sub = NULL;
  59. LIST_FOREACH(sub, &client->subscriptions, listEntry) {
  60. if(sub->subscriptionId == subscriptionId)
  61. break;
  62. }
  63. return sub;
  64. }
  65. UA_ModifySubscriptionResponse UA_EXPORT
  66. UA_Client_Subscriptions_modify(UA_Client *client, const UA_ModifySubscriptionRequest request) {
  67. UA_ModifySubscriptionResponse response;
  68. UA_ModifySubscriptionResponse_init(&response);
  69. /* Find the internal representation */
  70. UA_Client_Subscription *sub = findSubscription(client, request.subscriptionId);
  71. if(!sub) {
  72. response.responseHeader.serviceResult = UA_STATUSCODE_BADSUBSCRIPTIONIDINVALID;
  73. return response;
  74. }
  75. /* Call the service */
  76. __UA_Client_Service(client,
  77. &request, &UA_TYPES[UA_TYPES_MODIFYSUBSCRIPTIONREQUEST],
  78. &response, &UA_TYPES[UA_TYPES_MODIFYSUBSCRIPTIONRESPONSE]);
  79. /* Adjust the internal representation */
  80. sub->publishingInterval = response.revisedPublishingInterval;
  81. sub->maxKeepAliveCount = response.revisedMaxKeepAliveCount;
  82. return response;
  83. }
  84. static void
  85. UA_Client_Subscription_deleteInternal(UA_Client *client, UA_Client_Subscription *sub) {
  86. /* Remove the MonitoredItems */
  87. UA_Client_MonitoredItem *mon, *mon_tmp;
  88. LIST_FOREACH_SAFE(mon, &sub->monitoredItems, listEntry, mon_tmp)
  89. UA_Client_MonitoredItem_remove(client, sub, mon);
  90. /* Call the delete callback */
  91. if(sub->deleteCallback)
  92. sub->deleteCallback(client, sub->subscriptionId, sub->context);
  93. /* Remove */
  94. LIST_REMOVE(sub, listEntry);
  95. UA_free(sub);
  96. }
  97. UA_DeleteSubscriptionsResponse UA_EXPORT
  98. UA_Client_Subscriptions_delete(UA_Client *client, const UA_DeleteSubscriptionsRequest request) {
  99. UA_STACKARRAY(UA_Client_Subscription*, subs, request.subscriptionIdsSize);
  100. memset(subs, 0, sizeof(void*) * request.subscriptionIdsSize);
  101. /* temporary remove the subscriptions from the list */
  102. for(size_t i = 0; i < request.subscriptionIdsSize; i++) {
  103. subs[i] = findSubscription(client, request.subscriptionIds[i]);
  104. if (subs[i])
  105. LIST_REMOVE(subs[i], listEntry);
  106. }
  107. /* Send the request */
  108. UA_DeleteSubscriptionsResponse response;
  109. __UA_Client_Service(client,
  110. &request, &UA_TYPES[UA_TYPES_DELETESUBSCRIPTIONSREQUEST],
  111. &response, &UA_TYPES[UA_TYPES_DELETESUBSCRIPTIONSRESPONSE]);
  112. if(response.responseHeader.serviceResult != UA_STATUSCODE_GOOD)
  113. goto cleanup;
  114. if(request.subscriptionIdsSize != response.resultsSize) {
  115. response.responseHeader.serviceResult = UA_STATUSCODE_BADINTERNALERROR;
  116. goto cleanup;
  117. }
  118. /* Loop over the removed subscriptions and remove internally */
  119. for(size_t i = 0; i < request.subscriptionIdsSize; i++) {
  120. if(response.results[i] != UA_STATUSCODE_GOOD &&
  121. response.results[i] != UA_STATUSCODE_BADSUBSCRIPTIONIDINVALID) {
  122. /* Something was wrong, reinsert the subscription in the list */
  123. if (subs[i])
  124. LIST_INSERT_HEAD(&client->subscriptions, subs[i], listEntry);
  125. continue;
  126. }
  127. if(!subs[i]) {
  128. UA_LOG_INFO(&client->config.logger, UA_LOGCATEGORY_CLIENT,
  129. "No internal representation of subscription %u",
  130. request.subscriptionIds[i]);
  131. continue;
  132. } else {
  133. LIST_INSERT_HEAD(&client->subscriptions, subs[i], listEntry);
  134. }
  135. UA_Client_Subscription_deleteInternal(client, subs[i]);
  136. }
  137. return response;
  138. cleanup:
  139. for(size_t i = 0; i < request.subscriptionIdsSize; i++) {
  140. if (subs[i]) {
  141. LIST_INSERT_HEAD(&client->subscriptions, subs[i], listEntry);
  142. }
  143. }
  144. return response;
  145. }
  146. UA_StatusCode UA_EXPORT
  147. UA_Client_Subscriptions_deleteSingle(UA_Client *client, UA_UInt32 subscriptionId) {
  148. UA_DeleteSubscriptionsRequest request;
  149. UA_DeleteSubscriptionsRequest_init(&request);
  150. request.subscriptionIds = &subscriptionId;
  151. request.subscriptionIdsSize = 1;
  152. UA_DeleteSubscriptionsResponse response =
  153. UA_Client_Subscriptions_delete(client, request);
  154. UA_StatusCode retval = response.responseHeader.serviceResult;
  155. if(retval != UA_STATUSCODE_GOOD) {
  156. UA_DeleteSubscriptionsResponse_deleteMembers(&response);
  157. return retval;
  158. }
  159. if(response.resultsSize != 1) {
  160. UA_DeleteSubscriptionsResponse_deleteMembers(&response);
  161. return UA_STATUSCODE_BADINTERNALERROR;
  162. }
  163. retval = response.results[0];
  164. UA_DeleteSubscriptionsResponse_deleteMembers(&response);
  165. return retval;
  166. }
  167. /******************/
  168. /* MonitoredItems */
  169. /******************/
  170. void
  171. UA_Client_MonitoredItem_remove(UA_Client *client, UA_Client_Subscription *sub,
  172. UA_Client_MonitoredItem *mon) {
  173. LIST_REMOVE(mon, listEntry);
  174. if(mon->deleteCallback)
  175. mon->deleteCallback(client, sub->subscriptionId, sub->context,
  176. mon->monitoredItemId, mon->context);
  177. UA_free(mon);
  178. }
  179. static void
  180. __UA_Client_MonitoredItems_create(UA_Client *client,
  181. const UA_CreateMonitoredItemsRequest *request,
  182. void **contexts, void **handlingCallbacks,
  183. UA_Client_DeleteMonitoredItemCallback *deleteCallbacks,
  184. UA_CreateMonitoredItemsResponse *response) {
  185. UA_CreateMonitoredItemsResponse_init(response);
  186. if (!request->itemsToCreateSize) {
  187. response->responseHeader.serviceResult = UA_STATUSCODE_BADINTERNALERROR;
  188. return;
  189. }
  190. /* Fix clang warning */
  191. size_t itemsToCreateSize = request->itemsToCreateSize;
  192. UA_Client_Subscription *sub = NULL;
  193. /* Allocate the memory for internal representations */
  194. UA_STACKARRAY(UA_Client_MonitoredItem*, mis, itemsToCreateSize);
  195. memset(mis, 0, sizeof(void*) * itemsToCreateSize);
  196. for(size_t i = 0; i < itemsToCreateSize; i++) {
  197. mis[i] = (UA_Client_MonitoredItem*)UA_malloc(sizeof(UA_Client_MonitoredItem));
  198. if(!mis[i]) {
  199. response->responseHeader.serviceResult = UA_STATUSCODE_BADOUTOFMEMORY;
  200. goto cleanup;
  201. }
  202. }
  203. /* Get the subscription */
  204. sub = findSubscription(client, request->subscriptionId);
  205. if(!sub) {
  206. response->responseHeader.serviceResult = UA_STATUSCODE_BADSUBSCRIPTIONIDINVALID;
  207. goto cleanup;
  208. }
  209. /* Set the clientHandle */
  210. for(size_t i = 0; i < itemsToCreateSize; i++)
  211. request->itemsToCreate[i].requestedParameters.clientHandle = ++(client->monitoredItemHandles);
  212. /* Call the service */
  213. __UA_Client_Service(client, request, &UA_TYPES[UA_TYPES_CREATEMONITOREDITEMSREQUEST],
  214. response, &UA_TYPES[UA_TYPES_CREATEMONITOREDITEMSRESPONSE]);
  215. if(response->responseHeader.serviceResult != UA_STATUSCODE_GOOD)
  216. goto cleanup;
  217. if(response->resultsSize != itemsToCreateSize) {
  218. response->responseHeader.serviceResult = UA_STATUSCODE_BADINTERNALERROR;
  219. goto cleanup;
  220. }
  221. /* Add internally */
  222. for(size_t i = 0; i < itemsToCreateSize; i++) {
  223. if(response->results[i].statusCode != UA_STATUSCODE_GOOD) {
  224. if (deleteCallbacks[i])
  225. deleteCallbacks[i](client, sub->subscriptionId, sub->context, 0, contexts[i]);
  226. UA_free(mis[i]);
  227. mis[i] = NULL;
  228. continue;
  229. }
  230. UA_Client_MonitoredItem *newMon = mis[i];
  231. newMon->clientHandle = request->itemsToCreate[i].requestedParameters.clientHandle;
  232. newMon->monitoredItemId = response->results[i].monitoredItemId;
  233. newMon->context = contexts[i];
  234. newMon->deleteCallback = deleteCallbacks[i];
  235. newMon->handler.dataChangeCallback =
  236. (UA_Client_DataChangeNotificationCallback)(uintptr_t)handlingCallbacks[i];
  237. newMon->isEventMonitoredItem =
  238. (request->itemsToCreate[i].itemToMonitor.attributeId == UA_ATTRIBUTEID_EVENTNOTIFIER);
  239. LIST_INSERT_HEAD(&sub->monitoredItems, newMon, listEntry);
  240. UA_LOG_DEBUG(&client->config.logger, UA_LOGCATEGORY_CLIENT,
  241. "Subscription %u | Added a MonitoredItem with handle %u",
  242. sub->subscriptionId, newMon->clientHandle);
  243. }
  244. return;
  245. cleanup:
  246. for(size_t i = 0; i < itemsToCreateSize; i++) {
  247. if (deleteCallbacks[i]) {
  248. if (sub)
  249. deleteCallbacks[i](client, sub->subscriptionId, sub->context, 0, contexts[i]);
  250. else
  251. deleteCallbacks[i](client, 0, NULL, 0, contexts[i]);
  252. }
  253. if(mis[i])
  254. UA_free(mis[i]);
  255. }
  256. }
  257. UA_CreateMonitoredItemsResponse UA_EXPORT
  258. UA_Client_MonitoredItems_createDataChanges(UA_Client *client,
  259. const UA_CreateMonitoredItemsRequest request, void **contexts,
  260. UA_Client_DataChangeNotificationCallback *callbacks,
  261. UA_Client_DeleteMonitoredItemCallback *deleteCallbacks) {
  262. UA_CreateMonitoredItemsResponse response;
  263. __UA_Client_MonitoredItems_create(client, &request, contexts,
  264. (void**)(uintptr_t)callbacks, deleteCallbacks, &response);
  265. return response;
  266. }
  267. UA_MonitoredItemCreateResult UA_EXPORT
  268. UA_Client_MonitoredItems_createDataChange(UA_Client *client, UA_UInt32 subscriptionId,
  269. UA_TimestampsToReturn timestampsToReturn, const UA_MonitoredItemCreateRequest item,
  270. void *context, UA_Client_DataChangeNotificationCallback callback,
  271. UA_Client_DeleteMonitoredItemCallback deleteCallback) {
  272. UA_CreateMonitoredItemsRequest request;
  273. UA_CreateMonitoredItemsRequest_init(&request);
  274. request.subscriptionId = subscriptionId;
  275. request.timestampsToReturn = timestampsToReturn;
  276. request.itemsToCreate = (UA_MonitoredItemCreateRequest*)(uintptr_t)&item;
  277. request.itemsToCreateSize = 1;
  278. UA_CreateMonitoredItemsResponse response =
  279. UA_Client_MonitoredItems_createDataChanges(client, request, &context,
  280. &callback, &deleteCallback);
  281. UA_MonitoredItemCreateResult result;
  282. UA_MonitoredItemCreateResult_init(&result);
  283. if(response.responseHeader.serviceResult != UA_STATUSCODE_GOOD)
  284. result.statusCode = response.responseHeader.serviceResult;
  285. if(result.statusCode == UA_STATUSCODE_GOOD &&
  286. response.resultsSize != 1)
  287. result.statusCode = UA_STATUSCODE_BADINTERNALERROR;
  288. if(result.statusCode == UA_STATUSCODE_GOOD)
  289. UA_MonitoredItemCreateResult_copy(&response.results[0] , &result);
  290. UA_CreateMonitoredItemsResponse_deleteMembers(&response);
  291. return result;
  292. }
  293. UA_CreateMonitoredItemsResponse UA_EXPORT
  294. UA_Client_MonitoredItems_createEvents(UA_Client *client,
  295. const UA_CreateMonitoredItemsRequest request, void **contexts,
  296. UA_Client_EventNotificationCallback *callbacks,
  297. UA_Client_DeleteMonitoredItemCallback *deleteCallbacks) {
  298. UA_CreateMonitoredItemsResponse response;
  299. __UA_Client_MonitoredItems_create(client, &request, contexts,
  300. (void**)(uintptr_t)callbacks, deleteCallbacks, &response);
  301. return response;
  302. }
  303. UA_MonitoredItemCreateResult UA_EXPORT
  304. UA_Client_MonitoredItems_createEvent(UA_Client *client, UA_UInt32 subscriptionId,
  305. UA_TimestampsToReturn timestampsToReturn, const UA_MonitoredItemCreateRequest item,
  306. void *context, UA_Client_EventNotificationCallback callback,
  307. UA_Client_DeleteMonitoredItemCallback deleteCallback) {
  308. UA_CreateMonitoredItemsRequest request;
  309. UA_CreateMonitoredItemsRequest_init(&request);
  310. request.subscriptionId = subscriptionId;
  311. request.timestampsToReturn = timestampsToReturn;
  312. request.itemsToCreate = (UA_MonitoredItemCreateRequest*)(uintptr_t)&item;
  313. request.itemsToCreateSize = 1;
  314. UA_CreateMonitoredItemsResponse response =
  315. UA_Client_MonitoredItems_createEvents(client, request, &context,
  316. &callback, &deleteCallback);
  317. UA_MonitoredItemCreateResult result;
  318. UA_MonitoredItemCreateResult_copy(response.results , &result);
  319. UA_CreateMonitoredItemsResponse_deleteMembers(&response);
  320. return result;
  321. }
  322. UA_DeleteMonitoredItemsResponse UA_EXPORT
  323. UA_Client_MonitoredItems_delete(UA_Client *client, const UA_DeleteMonitoredItemsRequest request) {
  324. /* Send the request */
  325. UA_DeleteMonitoredItemsResponse response;
  326. __UA_Client_Service(client, &request, &UA_TYPES[UA_TYPES_DELETEMONITOREDITEMSREQUEST],
  327. &response, &UA_TYPES[UA_TYPES_DELETEMONITOREDITEMSRESPONSE]);
  328. if(response.responseHeader.serviceResult != UA_STATUSCODE_GOOD)
  329. return response;
  330. UA_Client_Subscription *sub = findSubscription(client, request.subscriptionId);
  331. if(!sub) {
  332. UA_LOG_INFO(&client->config.logger, UA_LOGCATEGORY_CLIENT,
  333. "No internal representation of subscription %u",
  334. request.subscriptionId);
  335. return response;
  336. }
  337. /* Loop over deleted MonitoredItems */
  338. for(size_t i = 0; i < response.resultsSize; i++) {
  339. if(response.results[i] != UA_STATUSCODE_GOOD &&
  340. response.results[i] != UA_STATUSCODE_BADMONITOREDITEMIDINVALID) {
  341. continue;
  342. }
  343. #ifndef __clang_analyzer__
  344. /* Delete the internal representation */
  345. UA_Client_MonitoredItem *mon;
  346. LIST_FOREACH(mon, &sub->monitoredItems, listEntry) {
  347. if(mon->monitoredItemId == request.monitoredItemIds[i]) {
  348. UA_Client_MonitoredItem_remove(client, sub, mon);
  349. break;
  350. }
  351. }
  352. #endif
  353. }
  354. return response;
  355. }
  356. UA_StatusCode UA_EXPORT
  357. UA_Client_MonitoredItems_deleteSingle(UA_Client *client, UA_UInt32 subscriptionId, UA_UInt32 monitoredItemId) {
  358. UA_DeleteMonitoredItemsRequest request;
  359. UA_DeleteMonitoredItemsRequest_init(&request);
  360. request.subscriptionId = subscriptionId;
  361. request.monitoredItemIds = &monitoredItemId;
  362. request.monitoredItemIdsSize = 1;
  363. UA_DeleteMonitoredItemsResponse response =
  364. UA_Client_MonitoredItems_delete(client, request);
  365. UA_StatusCode retval = response.responseHeader.serviceResult;
  366. if(retval != UA_STATUSCODE_GOOD) {
  367. UA_DeleteMonitoredItemsResponse_deleteMembers(&response);
  368. return retval;
  369. }
  370. if(response.resultsSize != 1) {
  371. UA_DeleteMonitoredItemsResponse_deleteMembers(&response);
  372. return UA_STATUSCODE_BADINTERNALERROR;
  373. }
  374. retval = response.results[0];
  375. UA_DeleteMonitoredItemsResponse_deleteMembers(&response);
  376. return retval;
  377. }
  378. /*************************************/
  379. /* Async Processing of Notifications */
  380. /*************************************/
  381. /* Assume the request is already initialized */
  382. UA_StatusCode
  383. UA_Client_preparePublishRequest(UA_Client *client, UA_PublishRequest *request) {
  384. /* Count acks */
  385. UA_Client_NotificationsAckNumber *ack;
  386. LIST_FOREACH(ack, &client->pendingNotificationsAcks, listEntry)
  387. ++request->subscriptionAcknowledgementsSize;
  388. /* Create the array. Returns a sentinel pointer if the length is zero. */
  389. request->subscriptionAcknowledgements = (UA_SubscriptionAcknowledgement*)
  390. UA_Array_new(request->subscriptionAcknowledgementsSize,
  391. &UA_TYPES[UA_TYPES_SUBSCRIPTIONACKNOWLEDGEMENT]);
  392. if(!request->subscriptionAcknowledgements) {
  393. request->subscriptionAcknowledgementsSize = 0;
  394. return UA_STATUSCODE_BADOUTOFMEMORY;
  395. }
  396. size_t i = 0;
  397. UA_Client_NotificationsAckNumber *ack_tmp;
  398. LIST_FOREACH_SAFE(ack, &client->pendingNotificationsAcks, listEntry, ack_tmp) {
  399. request->subscriptionAcknowledgements[i].sequenceNumber = ack->subAck.sequenceNumber;
  400. request->subscriptionAcknowledgements[i].subscriptionId = ack->subAck.subscriptionId;
  401. ++i;
  402. LIST_REMOVE(ack, listEntry);
  403. UA_free(ack);
  404. }
  405. return UA_STATUSCODE_GOOD;
  406. }
  407. /* According to OPC Unified Architecture, Part 4 5.13.1.1 i) */
  408. /* The value 0 is never used for the sequence number */
  409. static UA_UInt32
  410. UA_Client_Subscriptions_nextSequenceNumber(UA_UInt32 sequenceNumber) {
  411. UA_UInt32 nextSequenceNumber = sequenceNumber + 1;
  412. if(nextSequenceNumber == 0)
  413. nextSequenceNumber = 1;
  414. return nextSequenceNumber;
  415. }
  416. static void
  417. processDataChangeNotification(UA_Client *client, UA_Client_Subscription *sub,
  418. UA_DataChangeNotification *dataChangeNotification) {
  419. for(size_t j = 0; j < dataChangeNotification->monitoredItemsSize; ++j) {
  420. UA_MonitoredItemNotification *min = &dataChangeNotification->monitoredItems[j];
  421. /* Find the MonitoredItem */
  422. UA_Client_MonitoredItem *mon;
  423. LIST_FOREACH(mon, &sub->monitoredItems, listEntry) {
  424. if(mon->clientHandle == min->clientHandle)
  425. break;
  426. }
  427. if(!mon) {
  428. UA_LOG_DEBUG(&client->config.logger, UA_LOGCATEGORY_CLIENT,
  429. "Could not process a notification with clienthandle %u on subscription %u",
  430. min->clientHandle, sub->subscriptionId);
  431. continue;
  432. }
  433. if(mon->isEventMonitoredItem) {
  434. UA_LOG_DEBUG(&client->config.logger, UA_LOGCATEGORY_CLIENT,
  435. "MonitoredItem is configured for Events. But received a "
  436. "DataChangeNotification.");
  437. continue;
  438. }
  439. mon->handler.dataChangeCallback(client, sub->subscriptionId, sub->context,
  440. mon->monitoredItemId, mon->context,
  441. &min->value);
  442. }
  443. }
  444. static void
  445. processEventNotification(UA_Client *client, UA_Client_Subscription *sub,
  446. UA_EventNotificationList *eventNotificationList) {
  447. for(size_t j = 0; j < eventNotificationList->eventsSize; ++j) {
  448. UA_EventFieldList *eventFieldList = &eventNotificationList->events[j];
  449. /* Find the MonitoredItem */
  450. UA_Client_MonitoredItem *mon;
  451. LIST_FOREACH(mon, &sub->monitoredItems, listEntry) {
  452. if(mon->monitoredItemId == eventFieldList->clientHandle)
  453. break;
  454. }
  455. if(!mon) {
  456. UA_LOG_DEBUG(&client->config.logger, UA_LOGCATEGORY_CLIENT,
  457. "Could not process a notification with clienthandle %u on subscription %u",
  458. eventFieldList->clientHandle, sub->subscriptionId);
  459. continue;
  460. }
  461. if(!mon->isEventMonitoredItem) {
  462. UA_LOG_DEBUG(&client->config.logger, UA_LOGCATEGORY_CLIENT,
  463. "MonitoredItem is configured for DataChanges. But received a "
  464. "EventNotification.");
  465. continue;
  466. }
  467. mon->handler.eventCallback(client, sub->subscriptionId, sub->context,
  468. mon->monitoredItemId, mon->context,
  469. eventFieldList->eventFieldsSize,
  470. eventFieldList->eventFields);
  471. }
  472. }
  473. static void
  474. processNotificationMessage(UA_Client *client, UA_Client_Subscription *sub,
  475. UA_ExtensionObject *msg) {
  476. if(msg->encoding != UA_EXTENSIONOBJECT_DECODED)
  477. return;
  478. /* Handle DataChangeNotification */
  479. if(msg->content.decoded.type == &UA_TYPES[UA_TYPES_DATACHANGENOTIFICATION]) {
  480. UA_DataChangeNotification *dataChangeNotification =
  481. (UA_DataChangeNotification *)msg->content.decoded.data;
  482. processDataChangeNotification(client, sub, dataChangeNotification);
  483. return;
  484. }
  485. /* Handle EventNotification */
  486. if(msg->content.decoded.type == &UA_TYPES[UA_TYPES_EVENTNOTIFICATIONLIST]) {
  487. UA_EventNotificationList *eventNotificationList =
  488. (UA_EventNotificationList *)msg->content.decoded.data;
  489. processEventNotification(client, sub, eventNotificationList);
  490. return;
  491. }
  492. /* Handle StatusChangeNotification */
  493. if(msg->content.decoded.type == &UA_TYPES[UA_TYPES_STATUSCHANGENOTIFICATION]) {
  494. if(sub->statusChangeCallback) {
  495. sub->statusChangeCallback(client, sub->subscriptionId, sub->context,
  496. (UA_StatusChangeNotification*)msg->content.decoded.data);
  497. } else {
  498. UA_LOG_WARNING(&client->config.logger, UA_LOGCATEGORY_CLIENT,
  499. "Dropped a StatusChangeNotification since no callback is registered");
  500. }
  501. return;
  502. }
  503. UA_LOG_WARNING(&client->config.logger, UA_LOGCATEGORY_CLIENT,
  504. "Unknown notification message type");
  505. }
  506. void
  507. UA_Client_Subscriptions_processPublishResponse(UA_Client *client, UA_PublishRequest *request,
  508. UA_PublishResponse *response) {
  509. UA_NotificationMessage *msg = &response->notificationMessage;
  510. client->currentlyOutStandingPublishRequests--;
  511. if(response->responseHeader.serviceResult == UA_STATUSCODE_BADTOOMANYPUBLISHREQUESTS) {
  512. if(client->config.outStandingPublishRequests > 1) {
  513. client->config.outStandingPublishRequests--;
  514. UA_LOG_WARNING(&client->config.logger, UA_LOGCATEGORY_CLIENT,
  515. "Too many publishrequest, reduce outStandingPublishRequests to %d",
  516. client->config.outStandingPublishRequests);
  517. } else {
  518. UA_LOG_ERROR(&client->config.logger, UA_LOGCATEGORY_CLIENT,
  519. "Too many publishrequest when outStandingPublishRequests = 1");
  520. UA_Client_Subscriptions_deleteSingle(client, response->subscriptionId);
  521. }
  522. return;
  523. }
  524. if(response->responseHeader.serviceResult == UA_STATUSCODE_BADSHUTDOWN)
  525. return;
  526. if(!LIST_FIRST(&client->subscriptions)) {
  527. response->responseHeader.serviceResult = UA_STATUSCODE_BADNOSUBSCRIPTION;
  528. return;
  529. }
  530. if(response->responseHeader.serviceResult == UA_STATUSCODE_BADSESSIONCLOSED) {
  531. if(client->state >= UA_CLIENTSTATE_SESSION) {
  532. UA_LOG_WARNING(&client->config.logger, UA_LOGCATEGORY_CLIENT,
  533. "Received Publish Response with code %s",
  534. UA_StatusCode_name(response->responseHeader.serviceResult));
  535. UA_Client_Subscription* sub = findSubscription(client, response->subscriptionId);
  536. if (sub != NULL)
  537. UA_Client_Subscription_deleteInternal(client, sub);
  538. }
  539. return;
  540. }
  541. if(response->responseHeader.serviceResult == UA_STATUSCODE_BADSESSIONIDINVALID) {
  542. UA_Client_disconnect(client); /* TODO: This should be handled before the process callback */
  543. UA_LOG_WARNING(&client->config.logger, UA_LOGCATEGORY_CLIENT,
  544. "Received BadSessionIdInvalid");
  545. return;
  546. }
  547. if(response->responseHeader.serviceResult != UA_STATUSCODE_GOOD) {
  548. UA_LOG_WARNING(&client->config.logger, UA_LOGCATEGORY_CLIENT,
  549. "Received Publish Response with code %s",
  550. UA_StatusCode_name(response->responseHeader.serviceResult));
  551. return;
  552. }
  553. UA_Client_Subscription *sub = findSubscription(client, response->subscriptionId);
  554. if(!sub) {
  555. response->responseHeader.serviceResult = UA_STATUSCODE_BADINTERNALERROR;
  556. UA_LOG_WARNING(&client->config.logger, UA_LOGCATEGORY_CLIENT,
  557. "Received Publish Response for a non-existant subscription");
  558. return;
  559. }
  560. sub->lastActivity = UA_DateTime_nowMonotonic();
  561. /* Detect missing message - OPC Unified Architecture, Part 4 5.13.1.1 e) */
  562. if(UA_Client_Subscriptions_nextSequenceNumber(sub->sequenceNumber) != msg->sequenceNumber) {
  563. UA_LOG_WARNING(&client->config.logger, UA_LOGCATEGORY_CLIENT,
  564. "Invalid subscription sequence number: expected %u but got %u",
  565. UA_Client_Subscriptions_nextSequenceNumber(sub->sequenceNumber),
  566. msg->sequenceNumber);
  567. /* This is an error. But we do not abort the connection. Some server
  568. * SDKs misbehave from time to time and send out-of-order sequence
  569. * numbers. (Probably some multi-threading synchronization issue.) */
  570. /* UA_Client_disconnect(client);
  571. return; */
  572. }
  573. /* According to f), a keep-alive message contains no notifications and has the sequence number
  574. * of the next NotificationMessage that is to be sent => More than one consecutive keep-alive
  575. * message or a NotificationMessage following a keep-alive message will share the same sequence
  576. * number. */
  577. if (msg->notificationDataSize)
  578. sub->sequenceNumber = msg->sequenceNumber;
  579. /* Process the notification messages */
  580. for(size_t k = 0; k < msg->notificationDataSize; ++k)
  581. processNotificationMessage(client, sub, &msg->notificationData[k]);
  582. /* Add to the list of pending acks */
  583. for(size_t i = 0; i < response->availableSequenceNumbersSize; i++) {
  584. if(response->availableSequenceNumbers[i] != msg->sequenceNumber)
  585. continue;
  586. UA_Client_NotificationsAckNumber *tmpAck = (UA_Client_NotificationsAckNumber*)
  587. UA_malloc(sizeof(UA_Client_NotificationsAckNumber));
  588. if(!tmpAck) {
  589. UA_LOG_WARNING(&client->config.logger, UA_LOGCATEGORY_CLIENT,
  590. "Not enough memory to store the acknowledgement for a publish "
  591. "message on subscription %u", sub->subscriptionId);
  592. break;
  593. }
  594. tmpAck->subAck.sequenceNumber = msg->sequenceNumber;
  595. tmpAck->subAck.subscriptionId = sub->subscriptionId;
  596. LIST_INSERT_HEAD(&client->pendingNotificationsAcks, tmpAck, listEntry);
  597. break;
  598. }
  599. }
  600. static void
  601. processPublishResponseAsync(UA_Client *client, void *userdata, UA_UInt32 requestId,
  602. void *response) {
  603. UA_PublishRequest *req = (UA_PublishRequest*)userdata;
  604. UA_PublishResponse *res = (UA_PublishResponse*)response;
  605. /* Process the response */
  606. UA_Client_Subscriptions_processPublishResponse(client, req, res);
  607. /* Delete the cached request */
  608. UA_PublishRequest_delete(req);
  609. /* Fill up the outstanding publish requests */
  610. UA_Client_Subscriptions_backgroundPublish(client);
  611. }
  612. void
  613. UA_Client_Subscriptions_clean(UA_Client *client) {
  614. UA_Client_NotificationsAckNumber *n, *tmp;
  615. LIST_FOREACH_SAFE(n, &client->pendingNotificationsAcks, listEntry, tmp) {
  616. LIST_REMOVE(n, listEntry);
  617. UA_free(n);
  618. }
  619. UA_Client_Subscription *sub, *tmps;
  620. LIST_FOREACH_SAFE(sub, &client->subscriptions, listEntry, tmps)
  621. UA_Client_Subscription_deleteInternal(client, sub); /* force local removal */
  622. client->monitoredItemHandles = 0;
  623. }
  624. void
  625. UA_Client_Subscriptions_backgroundPublishInactivityCheck(UA_Client *client) {
  626. if(client->state < UA_CLIENTSTATE_SESSION)
  627. return;
  628. /* Is the lack of responses the client's fault? */
  629. if(client->currentlyOutStandingPublishRequests == 0)
  630. return;
  631. UA_Client_Subscription *sub;
  632. LIST_FOREACH(sub, &client->subscriptions, listEntry) {
  633. UA_DateTime maxSilence = (UA_DateTime)
  634. ((sub->publishingInterval * sub->maxKeepAliveCount) +
  635. client->config.timeout) * UA_DATETIME_MSEC;
  636. if(maxSilence + sub->lastActivity < UA_DateTime_nowMonotonic()) {
  637. /* Reset activity */
  638. sub->lastActivity = UA_DateTime_nowMonotonic();
  639. if(client->config.subscriptionInactivityCallback)
  640. client->config.subscriptionInactivityCallback(client, sub->subscriptionId,
  641. sub->context);
  642. UA_LOG_ERROR(&client->config.logger, UA_LOGCATEGORY_CLIENT,
  643. "Inactivity for Subscription %u.", sub->subscriptionId);
  644. }
  645. }
  646. }
  647. UA_StatusCode
  648. UA_Client_Subscriptions_backgroundPublish(UA_Client *client) {
  649. if(client->state < UA_CLIENTSTATE_SESSION)
  650. return UA_STATUSCODE_BADSERVERNOTCONNECTED;
  651. /* The session must have at least one subscription */
  652. if(!LIST_FIRST(&client->subscriptions))
  653. return UA_STATUSCODE_GOOD;
  654. while(client->currentlyOutStandingPublishRequests < client->config.outStandingPublishRequests) {
  655. UA_PublishRequest *request = UA_PublishRequest_new();
  656. if (!request)
  657. return UA_STATUSCODE_BADOUTOFMEMORY;
  658. request->requestHeader.timeoutHint=60000;
  659. UA_StatusCode retval = UA_Client_preparePublishRequest(client, request);
  660. if(retval != UA_STATUSCODE_GOOD) {
  661. UA_PublishRequest_delete(request);
  662. return retval;
  663. }
  664. UA_UInt32 requestId;
  665. client->currentlyOutStandingPublishRequests++;
  666. /* Disable the timeout, it is treat in UA_Client_Subscriptions_backgroundPublishInactivityCheck */
  667. retval = __UA_Client_AsyncServiceEx(client, request, &UA_TYPES[UA_TYPES_PUBLISHREQUEST],
  668. processPublishResponseAsync,
  669. &UA_TYPES[UA_TYPES_PUBLISHRESPONSE],
  670. (void*)request, &requestId, 0);
  671. if(retval != UA_STATUSCODE_GOOD) {
  672. UA_PublishRequest_delete(request);
  673. return retval;
  674. }
  675. }
  676. return UA_STATUSCODE_GOOD;
  677. }
  678. #endif /* UA_ENABLE_SUBSCRIPTIONS */